diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 65 |
1 files changed, 34 insertions, 31 deletions
@@ -119,10 +119,10 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); -static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch); -static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch); -static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*); +static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); +static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); +static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -428,7 +428,7 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, - (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); + processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszAName); qqueueSetpAction(pThis->pQueue, pThis); @@ -905,7 +905,7 @@ done: RETiRet; * rgerhards, 2008-01-28 */ rsRetVal -actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) +actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) { DEFiRet; @@ -916,6 +916,8 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) getActStateName(pThis), pThis->iActionNbr); pThis->bHadAutoCommit = 0; +dbgprintf("DDDD: calling action id %d for wti %p\n", pThis->iActionNbr, pWti); +#warning TODO: add action instance check here iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData); switch(iRet) { case RS_RET_OK: @@ -954,7 +956,7 @@ finalize_it: * rgerhards, 2008-01-28 */ static inline rsRetVal -actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate) +actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; @@ -965,7 +967,7 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutd if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); if(pThis->eState == ACT_STATE_ITX) - CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); + CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti)); iRet = getReturnCode(pThis); finalize_it: @@ -1035,7 +1037,7 @@ finalize_it: * rgerhards, 2009-05-12 */ static inline rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, wti_t *pWti) { int i; int iElemProcessed; @@ -1060,7 +1062,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) if(batchIsValidElem(pBatch, i)) { pMsg = pBatch->pElem[i].pMsg; localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, - pBatch->pbShutdownImmediate); + pBatch->pbShutdownImmediate, pWti); DBGPRINTF("action %p call returned %d\n", pAction, localRet); /* Note: we directly modify the batch object state, because we know that * wo do not overwrite BATCH_STATE_DISC indicators! @@ -1112,7 +1114,7 @@ finalize_it: * rgerhards, 2009-05-12 */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +submitBatch(action_t *pAction, batch_t *pBatch, int nElem, wti_t *pWti) { int i; int bDone; @@ -1126,7 +1128,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) wasDoneTo = pBatch->iDoneUpTo; bDone = 0; do { - localRet = tryDoAction(pAction, pBatch, &nElem); + localRet = tryDoAction(pAction, pBatch, &nElem, pWti); if(localRet == RS_RET_FORCE_TERM) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } @@ -1165,8 +1167,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ DBGPRINTF("submitBatch recursing trying to find and exclude the culprit " "for iRet %d\n", localRet); - submitBatch(pAction, pBatch, nElem / 2); - submitBatch(pAction, pBatch, nElem - (nElem / 2)); + submitBatch(pAction, pBatch, nElem / 2, pWti); + submitBatch(pAction, pBatch, nElem - (nElem / 2), pWti); bDone = 1; } } @@ -1246,12 +1248,12 @@ prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustR * rgerhards, 2009-05-12 */ static inline rsRetVal -processAction(action_t *pAction, batch_t *pBatch) +processAction(action_t *pAction, batch_t *pBatch, wti_t* pWti) { DEFiRet; assert(pBatch != NULL); - CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem)); + CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pWti)); iRet = finishBatch(pAction, pBatch); finalize_it: @@ -1265,12 +1267,13 @@ finalize_it: * rgerhards, 2009-04-22 */ static rsRetVal -processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) +processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate) { int *pbShutdownImmdtSave; sbool *activeSave; int bMustRestoreActivePtr = 0; rsRetVal localRet; + action_t *pAction = (action_t*) pVoid; DEFiRet; assert(pBatch != NULL); @@ -1289,7 +1292,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) d_pthread_mutex_lock(&pAction->mutActExec); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - iRet = processAction(pAction, pBatch); + iRet = processAction(pAction, pBatch, pWti); pthread_cleanup_pop(1); /* unlock mutex */ @@ -1382,7 +1385,7 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT * rgerhards, 2010-06-08 */ static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti) { DEFiRet; @@ -1410,7 +1413,7 @@ finalize_it: * be filtered out before calling us (what is done currently!). */ rsRetVal -actionWriteToAction(action_t *pAction, msg_t *pMsg) +actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti) { DEFiRet; @@ -1465,7 +1468,7 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg) /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = doSubmitToActionQ(pAction, pMsg); + iRet = doSubmitToActionQ(pAction, pMsg, pWti); finalize_it: RETiRet; @@ -1476,7 +1479,7 @@ finalize_it: * pthread_cleanup_push() POSIX macro... */ static inline rsRetVal -doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) +doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti) { msg_t *pMsg; DEFiRet; @@ -1491,7 +1494,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) } /* call the output driver */ - iRet = actionWriteToAction(pAction, pMsg); + iRet = actionWriteToAction(pAction, pMsg, pWti); finalize_it: /* we need to update the batch to handle failover processing correctly */ @@ -1551,7 +1554,7 @@ activateActions(void) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { time_t now = 0; time_t lastAct; @@ -1589,7 +1592,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) } } - iRet = doSubmitToActionQBatch(pAction, pBatch); + iRet = doSubmitToActionQBatch(pAction, pBatch, pWti); free(pBatch->active); pBatch->active = activeSave; @@ -1667,7 +1670,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { int i; DEFiRet; @@ -1685,7 +1688,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); if( batchIsValidElem(pBatch, i) && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { - doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg); + doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg, pWti); } } } @@ -1700,7 +1703,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) * rgerhards, 2010-06-23 */ static inline rsRetVal -helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { int i; DEFiRet; @@ -1713,7 +1716,7 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); if( batchIsValidElem(pBatch, i) && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { - doActionCallAction(pAction, pBatch, i); + doActionCallAction(pAction, pBatch, i, pWti); } } @@ -1725,13 +1728,13 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) */ #pragma GCC diagnostic ignored "-Wempty-body" static rsRetVal -doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { DEFiRet; d_pthread_mutex_lock(&pAction->mutAction); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); - iRet = helperSubmitToActionQComplexBatch(pAction, pBatch); + iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti); d_pthread_mutex_unlock(&pAction->mutAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ |