diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 165 |
1 files changed, 83 insertions, 82 deletions
@@ -119,10 +119,10 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); -static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch); -static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch); -static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*); +static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); +static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); +static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -175,10 +175,9 @@ configSettings_t cs_save; /* our saved (scope!) config settings */ /* the counter below counts actions created. It is used to obtain unique IDs for the action. They * should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice * to have during one instance of an rsyslogd run. For example, I use them to name actions when there - * is no better name available. Note that I do NOT recover previous numbers on HUP - we simply keep - * counting. -- rgerhards, 2008-01-29 + * is no better name available. */ -static int iActionNbr = 0; +int iActionNbr = 0; /* tables for interfacing with the v6 config system */ static struct cnfparamdescr cnfparamdescr[] = { @@ -308,7 +307,6 @@ rsRetVal actionDestruct(action_t *pThis) pThis->pMod->freeInstance(pThis->pModData); pthread_mutex_destroy(&pThis->mutAction); - pthread_mutex_destroy(&pThis->mutActExec); d_free(pThis->pszName); d_free(pThis->ppTpl); @@ -341,7 +339,7 @@ rsRetVal actionConstruct(action_t **ppThis) pThis->bExecWhenPrevSusp = 0; pThis->bRepMsgHasMsg = 0; pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ - pthread_mutex_init(&pThis->mutActExec, NULL); + pThis->iActionNbr = iActionNbr; pthread_mutex_init(&pThis->mutAction, NULL); INIT_ATOMIC_HELPER_MUT(pThis->mutCAS); @@ -428,7 +426,7 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, - (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); + processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszAName); qqueueSetpAction(pThis->pQueue, pThis); @@ -635,7 +633,7 @@ static inline void actionSuspend(action_t *pThis) * of its inability to recover. -- rgerhards, 2010-04-26. */ static inline rsRetVal -actionDoRetry(action_t *pThis, int *pbShutdownImmediate) +actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) { int iRetries; int iSleepPeriod; @@ -647,7 +645,7 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) iRetries = 0; while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries); - iRet = pThis->pMod->tryResume(pThis->pModData); + iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet); if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) { bTreatOKasSusp = 1; @@ -687,10 +685,26 @@ finalize_it: } +static rsRetVal +actionCheckAndCreateWrkrInstance(action_t *pThis, wti_t *pWti) +{ + DEFiRet; + if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) { +dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis->iActionNbr); + DBGPRINTF("we need to create a new action worker instance for " + "action %d\n", pThis->iActionNbr); + CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), + pThis->pModData)); + pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; + } +finalize_it: + RETiRet; +} + /* try to resume an action -- rgerhards, 2007-08-02 * changed to new action state engine -- rgerhards, 2009-05-07 */ -static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) +static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; time_t ttNow = NO_TIME_PROVIDED; @@ -714,7 +728,7 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) if(pThis->eState == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ datetime.GetTime(&ttNow); - CHKiRet(actionDoRetry(pThis, pbShutdownImmediate)); + CHKiRet(actionDoRetry(pThis, pbShutdownImmediate, pWti)); } if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { @@ -731,18 +745,19 @@ finalize_it: * depending on its current state. * rgerhards, 2009-05-07 */ -static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate) +static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; assert(pThis != NULL); - CHKiRet(actionTryResume(pThis, pbShutdownImmediate)); + CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); + CHKiRet(actionTryResume(pThis, pbShutdownImmediate, pWti)); /* if we are now ready, we initialize the transaction and advance * action state accordingly */ if(pThis->eState == ACT_STATE_RDY) { - iRet = pThis->pMod->mod.om.beginTransaction(pThis->pModData); + iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: actionSetState(pThis, ACT_STATE_ITX); @@ -905,17 +920,21 @@ done: RETiRet; * rgerhards, 2008-01-28 */ rsRetVal -actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) +actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); - DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); + DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", + getActStateName(pThis), pThis->iActionNbr); + + CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); pThis->bHadAutoCommit = 0; - iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData); + iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, + pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: actionCommitted(pThis); @@ -952,19 +971,19 @@ finalize_it: * this readies the action and then calls doAction() * rgerhards, 2008-01-28 */ -static inline rsRetVal -actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate) +static rsRetVal +actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); - CHKiRet(actionPrepare(pThis, pbShutdownImmediate)); + CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); if(pThis->eState == ACT_STATE_ITX) - CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); + CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti)); iRet = getReturnCode(pThis); finalize_it: @@ -977,7 +996,7 @@ finalize_it: * rgerhards, 2008-01-28 */ static rsRetVal -finishBatch(action_t *pThis, batch_t *pBatch) +finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) { int i; DEFiRet; @@ -989,9 +1008,9 @@ finishBatch(action_t *pThis, batch_t *pBatch) FINALIZE; /* nothing to do */ } - CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate)); + CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti)); if(pThis->eState == ACT_STATE_ITX) { - iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData); + iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: actionCommitted(pThis); @@ -1034,7 +1053,7 @@ finalize_it: * rgerhards, 2009-05-12 */ static inline rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, wti_t *pWti) { int i; int iElemProcessed; @@ -1059,7 +1078,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) if(batchIsValidElem(pBatch, i)) { pMsg = pBatch->pElem[i].pMsg; localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, - pBatch->pbShutdownImmediate); + pBatch->pbShutdownImmediate, pWti); DBGPRINTF("action %p call returned %d\n", pAction, localRet); /* Note: we directly modify the batch object state, because we know that * wo do not overwrite BATCH_STATE_DISC indicators! @@ -1111,7 +1130,7 @@ finalize_it: * rgerhards, 2009-05-12 */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +submitBatch(action_t *pAction, batch_t *pBatch, int nElem, wti_t *pWti) { int i; int bDone; @@ -1125,7 +1144,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) wasDoneTo = pBatch->iDoneUpTo; bDone = 0; do { - localRet = tryDoAction(pAction, pBatch, &nElem); + localRet = tryDoAction(pAction, pBatch, &nElem, pWti); if(localRet == RS_RET_FORCE_TERM) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } @@ -1135,7 +1154,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) /* try commit transaction, once done, we can simply do so as if * that return state was returned from tryDoAction(). */ - localRet = finishBatch(pAction, pBatch); + localRet = finishBatch(pAction, pBatch, pWti); } if( localRet == RS_RET_OK @@ -1164,8 +1183,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ DBGPRINTF("submitBatch recursing trying to find and exclude the culprit " "for iRet %d\n", localRet); - submitBatch(pAction, pBatch, nElem / 2); - submitBatch(pAction, pBatch, nElem - (nElem / 2)); + submitBatch(pAction, pBatch, nElem / 2, pWti); + submitBatch(pAction, pBatch, nElem - (nElem / 2), pWti); bDone = 1; } } @@ -1245,31 +1264,31 @@ prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustR * rgerhards, 2009-05-12 */ static inline rsRetVal -processAction(action_t *pAction, batch_t *pBatch) +processAction(action_t *pAction, batch_t *pBatch, wti_t* pWti) { DEFiRet; assert(pBatch != NULL); - CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem)); - iRet = finishBatch(pAction, pBatch); + CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pWti)); + iRet = finishBatch(pAction, pBatch, pWti); finalize_it: RETiRet; } -#pragma GCC diagnostic ignored "-Wempty-body" /* receive an array of to-process user pointers and submit them * for processing. * rgerhards, 2009-04-22 */ static rsRetVal -processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) +processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate) { int *pbShutdownImmdtSave; sbool *activeSave; int bMustRestoreActivePtr = 0; rsRetVal localRet; + action_t *pAction = (action_t*) pVoid; DEFiRet; assert(pBatch != NULL); @@ -1280,17 +1299,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) } CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); - /* We now must guard the output module against execution by multiple threads. The - * plugin interface specifies that output modules must not be thread-safe (except - * if they notify us they are - functionality not yet implemented...). - * rgerhards, 2008-01-30 - */ - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - - iRet = processAction(pAction, pBatch); - - pthread_cleanup_pop(1); /* unlock mutex */ + iRet = processAction(pAction, pBatch, pWti); /* even if processAction failed, we need to release the batch (else we * have a memory leak). So we do this first, and then check if we need to @@ -1312,15 +1321,12 @@ finalize_it: pBatch->pbShutdownImmediate = pbShutdownImmdtSave; RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" -/* call the HUP handler for a given action, if such a handler is defined. The - * action mutex is locked, because the HUP handler most probably needs to modify - * some internal state information. - * rgerhards, 2008-10-22 +/* call the HUP handler for a given action, if such a handler is defined. + * Note that the action must be able to service HUP requests concurrently + * to any current doAction() processing. */ -#pragma GCC diagnostic ignored "-Wempty-body" rsRetVal actionCallHUPHdlr(action_t *pAction) { @@ -1329,19 +1335,13 @@ actionCallHUPHdlr(action_t *pAction) ASSERT(pAction != NULL); DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP); - if(pAction->pMod->doHUP == NULL) { - FINALIZE; /* no HUP handler, so we are done ;) */ + if(pAction->pMod->doHUP != NULL) { + CHKiRet(pAction->pMod->doHUP(pAction->pModData)); } - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - CHKiRet(pAction->pMod->doHUP(pAction->pModData)); - pthread_cleanup_pop(1); /* unlock mutex */ - finalize_it: RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" /* set the action message queue mode @@ -1381,7 +1381,7 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT * rgerhards, 2010-06-08 */ static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti) { DEFiRet; @@ -1392,7 +1392,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) - iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg)); + iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg), pWti); else iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); @@ -1409,7 +1409,7 @@ finalize_it: * be filtered out before calling us (what is done currently!). */ rsRetVal -actionWriteToAction(action_t *pAction, msg_t *pMsg) +actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti) { DEFiRet; @@ -1464,7 +1464,7 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg) /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = doSubmitToActionQ(pAction, pMsg); + iRet = doSubmitToActionQ(pAction, pMsg, pWti); finalize_it: RETiRet; @@ -1475,7 +1475,7 @@ finalize_it: * pthread_cleanup_push() POSIX macro... */ static inline rsRetVal -doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) +doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti) { msg_t *pMsg; DEFiRet; @@ -1490,7 +1490,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) } /* call the output driver */ - iRet = actionWriteToAction(pAction, pMsg); + iRet = actionWriteToAction(pAction, pMsg, pWti); finalize_it: /* we need to update the batch to handle failover processing correctly */ @@ -1550,7 +1550,7 @@ activateActions(void) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { time_t now = 0; time_t lastAct; @@ -1588,7 +1588,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) } } - iRet = doSubmitToActionQBatch(pAction, pBatch); + iRet = doSubmitToActionQBatch(pAction, pBatch, pWti); free(pBatch->active); pBatch->active = activeSave; @@ -1613,7 +1613,7 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch) * rgerhards, 2011-06-16 */ static inline rsRetVal -doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) +doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { sbool bNeedSubmit; sbool *activeSave; @@ -1645,14 +1645,14 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) } if(bNeedSubmit) { /* note: stats were already computed above */ - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); } else { DBGPRINTF("no need to submit batch, all invalid\n"); } } else { if(GatherStats) countStatsBatchEnq(pAction, pBatch); - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); } free(pBatch->active); @@ -1666,7 +1666,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { int i; DEFiRet; @@ -1674,7 +1674,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { - iRet = doQueueEnqObjDirectBatch(pAction, pBatch); + iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti); } else {/* in this case, we do single submits to the queue. * TODO: optimize this, we may do at least a multi-submit! */ @@ -1684,7 +1684,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); if( batchIsValidElem(pBatch, i) && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { - doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg); + doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg, pWti); } } } @@ -1699,7 +1699,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) * rgerhards, 2010-06-23 */ static inline rsRetVal -helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { int i; DEFiRet; @@ -1712,7 +1712,7 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); if( batchIsValidElem(pBatch, i) && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { - doActionCallAction(pAction, pBatch, i); + doActionCallAction(pAction, pBatch, i, pWti); } } @@ -1724,13 +1724,14 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) */ #pragma GCC diagnostic ignored "-Wempty-body" static rsRetVal -doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { DEFiRet; d_pthread_mutex_lock(&pAction->mutAction); +dbgprintf("DDDD: locked mutAction\n"); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); - iRet = helperSubmitToActionQComplexBatch(pAction, pBatch); + iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti); d_pthread_mutex_unlock(&pAction->mutAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ |