diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 44 |
1 files changed, 30 insertions, 14 deletions
@@ -188,7 +188,7 @@ static struct cnfparamdescr cnfparamdescr[] = { { "action.execonlyeverynthtime", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyeverynthtime */ { "action.execonlyeverynthtimetimeout", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyeverynthtimetimeout */ { "action.execonlyonceeveryinterval", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyonceeveryinterval */ - { "action.execonlywhenpreviousissuspended", eCmdHdlrInt, 0 }, /* legacy: actionexeconlywhenpreviousissuspended */ + { "action.execonlywhenpreviousissuspended", eCmdHdlrBinary, 0 }, /* legacy: actionexeconlywhenpreviousissuspended */ { "action.repeatedmsgcontainsoriginalmsg", eCmdHdlrBinary, 0 }, /* legacy: repeatedmsgcontainsoriginalmsg */ { "action.resumeretrycount", eCmdHdlrInt, 0 }, /* legacy: actionresumeretrycount */ { "action.resumeinterval", eCmdHdlrInt, 0 } @@ -605,13 +605,17 @@ static void actionDisable(action_t *pThis) * CPU time. TODO: maybe a config option for that? * rgerhards, 2007-08-02 */ -static inline void actionSuspend(action_t *pThis, time_t ttNow) +static inline void actionSuspend(action_t *pThis) { - if(ttNow == NO_TIME_PROVIDED) - datetime.GetTime(&ttNow); + time_t ttNow; + + /* note: we can NOT use a cached timestamp, as time may have evolved + * since caching, and this would break logic (and it actually did so!) + */ + datetime.GetTime(&ttNow); pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); actionSetState(pThis, ACT_STATE_SUSP); - DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry); + DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry); } @@ -631,7 +635,7 @@ static inline void actionSuspend(action_t *pThis, time_t ttNow) * of its inability to recover. -- rgerhards, 2010-04-26. */ static inline rsRetVal -actionDoRetry(action_t *pThis, time_t ttNow, int *pbShutdownImmediate) +actionDoRetry(action_t *pThis, int *pbShutdownImmediate) { int iRetries; int iSleepPeriod; @@ -642,7 +646,9 @@ actionDoRetry(action_t *pThis, time_t ttNow, int *pbShutdownImmediate) iRetries = 0; while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { + DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries); iRet = pThis->pMod->tryResume(pThis->pModData); + DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet); if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) { bTreatOKasSusp = 1; pThis->iResumeOKinRow = 0; @@ -650,16 +656,18 @@ actionDoRetry(action_t *pThis, time_t ttNow, int *pbShutdownImmediate) bTreatOKasSusp = 0; } if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { + DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet); actionSetState(pThis, ACT_STATE_RDY); } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { /* max retries reached? */ + DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n", + pThis->iResumeRetryCount, iRetries); if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { - actionSuspend(pThis, ttNow); + actionSuspend(pThis); } else { ++pThis->iNbrResRtry; ++iRetries; iSleepPeriod = pThis->iResumeInterval; - ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */ srSleep(iSleepPeriod, 0); if(*pbShutdownImmediate) { ABORT_FINALIZE(RS_RET_FORCE_TERM); @@ -706,7 +714,7 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) if(pThis->eState == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ datetime.GetTime(&ttNow); - CHKiRet(actionDoRetry(pThis, ttNow, pbShutdownImmediate)); + CHKiRet(actionDoRetry(pThis, pbShutdownImmediate)); } if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { @@ -953,6 +961,8 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutd ISOBJ_TYPE_assert(pMsg, msg); CHKiRet(actionPrepare(pThis, pbShutdownImmediate)); + if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) + pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); if(pThis->eState == ACT_STATE_ITX) CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); @@ -1111,6 +1121,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) assert(pBatch != NULL); + DBGPRINTF("submitBatch: enter, nElem %d\n", nElem); wasDoneTo = pBatch->iDoneUpTo; bDone = 0; do { @@ -1132,7 +1143,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) || localRet == RS_RET_DEFER_COMMIT) { bDone = 1; } else if(localRet == RS_RET_SUSPENDED) { - ; /* do nothing, this will retry the full batch */ + DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n"); + /* do nothing, this will retry the full batch */ } else if(localRet == RS_RET_ACTION_FAILED) { /* in this case, everything not yet committed is BAD */ for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { @@ -1262,8 +1274,11 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) assert(pBatch != NULL); - pbShutdownImmdtSave = pBatch->pbShutdownImmediate; - pBatch->pbShutdownImmediate = pbShutdownImmediate; + if(pbShutdownImmediate != NULL) { + pbShutdownImmdtSave = pBatch->pbShutdownImmediate; + pBatch->pbShutdownImmediate = pbShutdownImmediate; +dbgprintf("DDDD: processBatchMain ShutdownImmediate is %p, was %p\n", pBatch->pbShutdownImmediate, pbShutdownImmdtSave); + } CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); /* We now must guard the output module against execution by multiple threads. The @@ -1294,7 +1309,8 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) } finalize_it: - pBatch->pbShutdownImmediate = pbShutdownImmdtSave; + if(pbShutdownImmediate != NULL) + pBatch->pbShutdownImmediate = pbShutdownImmdtSave; RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" @@ -1864,7 +1880,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->eState = ACT_STATE_RDY; /* action is enabled */ if(bSuspended) - actionSuspend(pAction, datetime.GetTime(NULL)); /* "good" time call, only during init and unavoidable */ + actionSuspend(pAction); CHKiRet(actionConstructFinalize(pAction, queueParams)); |