summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c53
1 files changed, 24 insertions, 29 deletions
diff --git a/action.c b/action.c
index 790ea4ce..54004311 100644
--- a/action.c
+++ b/action.c
@@ -115,7 +115,7 @@
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
-static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*);
+static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti);
static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t*);
static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t*);
static rsRetVal doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t*);
@@ -632,7 +632,7 @@ static inline void actionSuspend(action_t *pThis, wti_t *pWti)
* of its inability to recover. -- rgerhards, 2010-04-26.
*/
static inline rsRetVal
-actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
+actionDoRetry(action_t *pThis, wti_t *pWti)
{
int iRetries;
int iSleepPeriod;
@@ -642,7 +642,7 @@ actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
ASSERT(pThis != NULL);
iRetries = 0;
- while((*pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) {
+ while((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) {
DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries);
iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet);
@@ -666,7 +666,7 @@ actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
++iRetries;
iSleepPeriod = pThis->iResumeInterval;
srSleep(iSleepPeriod, 0);
- if(*pbShutdownImmediate) {
+ if(*pWti->pbShutdownImmediate) {
ABORT_FINALIZE(RS_RET_FORCE_TERM);
}
}
@@ -705,7 +705,7 @@ finalize_it:
* changed to new action state engine -- rgerhards, 2009-05-07
*/
static rsRetVal
-actionTryResume(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
+actionTryResume(action_t *pThis, wti_t *pWti)
{
DEFiRet;
time_t ttNow = NO_TIME_PROVIDED;
@@ -727,7 +727,7 @@ actionTryResume(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
if(getActionState(pWti, pThis) == ACT_STATE_RTRY) {
if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */
datetime.GetTime(&ttNow);
- CHKiRet(actionDoRetry(pThis, pWti, pbShutdownImmediate));
+ CHKiRet(actionDoRetry(pThis, pWti));
}
if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) {
@@ -745,13 +745,13 @@ finalize_it:
* rgerhards, 2009-05-07
*/
static inline rsRetVal
-actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
+actionPrepare(action_t *pThis, wti_t *pWti)
{
DEFiRet;
assert(pThis != NULL);
CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
- CHKiRet(actionTryResume(pThis, pWti, pbShutdownImmediate));
+ CHKiRet(actionTryResume(pThis, pWti));
/* if we are now ready, we initialize the transaction and advance
* action state accordingly
@@ -961,13 +961,13 @@ finalize_it:
* rgerhards, 2008-01-28
*/
rsRetVal
-actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, int *pbShutdownImmediate, wti_t *pWti)
+actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, wti_t *pWti)
{
DEFiRet;
- CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti));
+ CHKiRet(actionPrepare(pThis, pWti));
if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL)
- pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate);
+ pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pWti->pbShutdownImmediate);
if(getActionState(pWti, pThis) == ACT_STATE_ITX)
CHKiRet(actionCallDoAction(pThis, msgFlags, actParams, pWti));
@@ -979,7 +979,7 @@ finalize_it:
/* Commit try committing (do not handle retry processing and such) */
static rsRetVal
-actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
+actionTryCommit(action_t *pThis, wti_t *pWti)
{
actWrkrInfo_t *wrkrInfo;
actWrkrIParams_t *iparamCurr, *iparamDel;
@@ -992,7 +992,7 @@ actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
while(iparamCurr != NULL) {
iRet = actionProcessMessage(pThis, iparamCurr->msgFlags,
iparamCurr->staticActParams,
- pbShutdownImmediate, pWti);
+ pWti);
releaseDoActionParams(pThis, pWti);
iparamDel = iparamCurr;
iparamCurr = iparamCurr->next;
@@ -1001,7 +1001,7 @@ actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
wrkrInfo->iparamLast = NULL;
}
- CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti));
+ CHKiRet(actionPrepare(pThis, pWti));
if(getActionState(pWti, pThis) == ACT_STATE_ITX) {
dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr);
iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
@@ -1038,7 +1038,7 @@ finalize_it:
}
static rsRetVal
-actionCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
+actionCommit(action_t *pThis, wti_t *pWti)
{
DEFiRet;
// TODO: #warning do we really need to return something?
@@ -1055,7 +1055,7 @@ actionCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
any of these partial implementations).
rgerhards, 2013-11-04
*/
- iRet = actionTryCommit(pThis, pWti, pbShutdownImmediate);
+ iRet = actionTryCommit(pThis, pWti);
RETiRet;
}
@@ -1071,7 +1071,7 @@ actionCommitAllDirect(wti_t *pWti)
i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo[i].iparamRoot);
pAction = pWti->actWrkrInfo[i].pAction;
if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT)
- actionCommit(pWti->actWrkrInfo[i].pAction, pWti, pWti->pbShutdownImmediate);
+ actionCommit(pWti->actWrkrInfo[i].pAction, pWti);
}
}
@@ -1080,7 +1080,7 @@ actionCommitAllDirect(wti_t *pWti)
* queue thread if the action queue is set to "direct".
*/
static rsRetVal
-processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow, int *pbShutdownImmediate)
+processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow)
{
DEFiRet;
@@ -1095,7 +1095,7 @@ dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRa
iRet = actionProcessMessage(pAction, pMsg->msgFlags,
pWti->actWrkrInfo[pAction->iActionNbr].staticActParams,
- pbShutdownImmediate, pWti);
+ pWti);
releaseDoActionParams(pAction, pWti);
finalize_it:
RETiRet;
@@ -1106,7 +1106,7 @@ finalize_it:
* rgerhards, 2009-04-22
*/
static rsRetVal
-processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate)
+processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti)
{
action_t *pAction = (action_t*) pVoid;
msg_t *pMsg;
@@ -1114,23 +1114,19 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmed
struct syslogTime ttNow;
DEFiRet;
- if(pbShutdownImmediate == NULL) {
- pbShutdownImmediate = pWti->pbShutdownImmediate;
- }
-
/* indicate we have not yet read the date */
ttNow.year = 0;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*pbShutdownImmediate ; ++i) {
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*pWti->pbShutdownImmediate ; ++i) {
if(batchIsValidElem(pBatch, i)) {
pMsg = pBatch->pElem[i].pMsg;
- iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, pbShutdownImmediate);
+ iRet = processMsgMain(pAction, pWti, pMsg, &ttNow);
// TODO: we must refactor this! flag messages as committed
batchSetElemState(pBatch, i, BATCH_STATE_COMM);
}
}
- iRet = actionCommit(pAction, pWti, pbShutdownImmediate);
+ iRet = actionCommit(pAction, pWti);
dbgprintf("DDDD: processBatchMain - end\n");
RETiRet;
}
@@ -1398,14 +1394,13 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
static inline rsRetVal
doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
-int pbShutdownImmediate = 0; // TODO: implement
dbgprintf("DDDD: doQueueEnqObjDirectBatch: %s\n", pMsg->pszRawMsg);
struct syslogTime ttNow;
DEFiRet;
if(GatherStats)
STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
ttNow.year = 0;
- iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, &pbShutdownImmediate);
+ iRet = processMsgMain(pAction, pWti, pMsg, &ttNow);
RETiRet;
}