diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 53 |
1 files changed, 24 insertions, 29 deletions
@@ -115,7 +115,7 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*); +static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti); static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t*); static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t*); static rsRetVal doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t*); @@ -632,7 +632,7 @@ static inline void actionSuspend(action_t *pThis, wti_t *pWti) * of its inability to recover. -- rgerhards, 2010-04-26. */ static inline rsRetVal -actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) +actionDoRetry(action_t *pThis, wti_t *pWti) { int iRetries; int iSleepPeriod; @@ -642,7 +642,7 @@ actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) ASSERT(pThis != NULL); iRetries = 0; - while((*pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { + while((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries); iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet); @@ -666,7 +666,7 @@ actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) ++iRetries; iSleepPeriod = pThis->iResumeInterval; srSleep(iSleepPeriod, 0); - if(*pbShutdownImmediate) { + if(*pWti->pbShutdownImmediate) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } } @@ -705,7 +705,7 @@ finalize_it: * changed to new action state engine -- rgerhards, 2009-05-07 */ static rsRetVal -actionTryResume(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) +actionTryResume(action_t *pThis, wti_t *pWti) { DEFiRet; time_t ttNow = NO_TIME_PROVIDED; @@ -727,7 +727,7 @@ actionTryResume(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) if(getActionState(pWti, pThis) == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ datetime.GetTime(&ttNow); - CHKiRet(actionDoRetry(pThis, pWti, pbShutdownImmediate)); + CHKiRet(actionDoRetry(pThis, pWti)); } if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) { @@ -745,13 +745,13 @@ finalize_it: * rgerhards, 2009-05-07 */ static inline rsRetVal -actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) +actionPrepare(action_t *pThis, wti_t *pWti) { DEFiRet; assert(pThis != NULL); CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); - CHKiRet(actionTryResume(pThis, pWti, pbShutdownImmediate)); + CHKiRet(actionTryResume(pThis, pWti)); /* if we are now ready, we initialize the transaction and advance * action state accordingly @@ -961,13 +961,13 @@ finalize_it: * rgerhards, 2008-01-28 */ rsRetVal -actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, int *pbShutdownImmediate, wti_t *pWti) +actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, wti_t *pWti) { DEFiRet; - CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); + CHKiRet(actionPrepare(pThis, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) - pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); + pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pWti->pbShutdownImmediate); if(getActionState(pWti, pThis) == ACT_STATE_ITX) CHKiRet(actionCallDoAction(pThis, msgFlags, actParams, pWti)); @@ -979,7 +979,7 @@ finalize_it: /* Commit try committing (do not handle retry processing and such) */ static rsRetVal -actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) +actionTryCommit(action_t *pThis, wti_t *pWti) { actWrkrInfo_t *wrkrInfo; actWrkrIParams_t *iparamCurr, *iparamDel; @@ -992,7 +992,7 @@ actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) while(iparamCurr != NULL) { iRet = actionProcessMessage(pThis, iparamCurr->msgFlags, iparamCurr->staticActParams, - pbShutdownImmediate, pWti); + pWti); releaseDoActionParams(pThis, pWti); iparamDel = iparamCurr; iparamCurr = iparamCurr->next; @@ -1001,7 +1001,7 @@ actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) wrkrInfo->iparamLast = NULL; } - CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); + CHKiRet(actionPrepare(pThis, pWti)); if(getActionState(pWti, pThis) == ACT_STATE_ITX) { dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr); iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); @@ -1038,7 +1038,7 @@ finalize_it: } static rsRetVal -actionCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) +actionCommit(action_t *pThis, wti_t *pWti) { DEFiRet; // TODO: #warning do we really need to return something? @@ -1055,7 +1055,7 @@ actionCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) any of these partial implementations). rgerhards, 2013-11-04 */ - iRet = actionTryCommit(pThis, pWti, pbShutdownImmediate); + iRet = actionTryCommit(pThis, pWti); RETiRet; } @@ -1071,7 +1071,7 @@ actionCommitAllDirect(wti_t *pWti) i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo[i].iparamRoot); pAction = pWti->actWrkrInfo[i].pAction; if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT) - actionCommit(pWti->actWrkrInfo[i].pAction, pWti, pWti->pbShutdownImmediate); + actionCommit(pWti->actWrkrInfo[i].pAction, pWti); } } @@ -1080,7 +1080,7 @@ actionCommitAllDirect(wti_t *pWti) * queue thread if the action queue is set to "direct". */ static rsRetVal -processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow, int *pbShutdownImmediate) +processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow) { DEFiRet; @@ -1095,7 +1095,7 @@ dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRa iRet = actionProcessMessage(pAction, pMsg->msgFlags, pWti->actWrkrInfo[pAction->iActionNbr].staticActParams, - pbShutdownImmediate, pWti); + pWti); releaseDoActionParams(pAction, pWti); finalize_it: RETiRet; @@ -1106,7 +1106,7 @@ finalize_it: * rgerhards, 2009-04-22 */ static rsRetVal -processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate) +processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti) { action_t *pAction = (action_t*) pVoid; msg_t *pMsg; @@ -1114,23 +1114,19 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmed struct syslogTime ttNow; DEFiRet; - if(pbShutdownImmediate == NULL) { - pbShutdownImmediate = pWti->pbShutdownImmediate; - } - /* indicate we have not yet read the date */ ttNow.year = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*pbShutdownImmediate ; ++i) { + for(i = 0 ; i < batchNumMsgs(pBatch) && !*pWti->pbShutdownImmediate ; ++i) { if(batchIsValidElem(pBatch, i)) { pMsg = pBatch->pElem[i].pMsg; - iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, pbShutdownImmediate); + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow); // TODO: we must refactor this! flag messages as committed batchSetElemState(pBatch, i, BATCH_STATE_COMM); } } - iRet = actionCommit(pAction, pWti, pbShutdownImmediate); + iRet = actionCommit(pAction, pWti); dbgprintf("DDDD: processBatchMain - end\n"); RETiRet; } @@ -1398,14 +1394,13 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) static inline rsRetVal doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { -int pbShutdownImmediate = 0; // TODO: implement dbgprintf("DDDD: doQueueEnqObjDirectBatch: %s\n", pMsg->pszRawMsg); struct syslogTime ttNow; DEFiRet; if(GatherStats) STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); ttNow.year = 0; - iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, &pbShutdownImmediate); + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow); RETiRet; } |