diff options
-rw-r--r-- | action.c | 737 | ||||
-rw-r--r-- | action.h | 19 | ||||
-rw-r--r-- | runtime/batch.h | 14 | ||||
-rw-r--r-- | runtime/conf.c | 1 | ||||
-rw-r--r-- | runtime/queue.c | 4 | ||||
-rw-r--r-- | runtime/ruleset.c | 33 | ||||
-rw-r--r-- | runtime/wti.h | 74 |
7 files changed, 314 insertions, 568 deletions
@@ -14,7 +14,6 @@ * * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval * - doSubmitToActionQComplexBatch - * - helperSubmitToActionQComplexBatch * - doActionCallAction * handles mark message reduction, but in essence calls * - actionWriteToAction @@ -36,9 +35,6 @@ * * After dequeue, processing is as follows: * - processBatchMain - * - processAction - * - submitBatch - * - tryDoAction * - ... * * MORE ON PROCESSING, QUEUES and FILTERING @@ -69,7 +65,7 @@ * beast. * rgerhards, 2011-06-15 * - * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -120,9 +116,9 @@ /* forward definitions */ static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*); -static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); -static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); -static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); +static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t*); +static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t*); +static rsRetVal doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t*); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -496,9 +492,9 @@ rsRetVal actionSetGlobalResumeInterval(int iNewVal) * returned string must not be modified. * rgerhards, 2009-05-07 */ -static uchar *getActStateName(action_t *pThis) +static uchar *getActStateName(action_t *pThis, wti_t *pWti) { - switch(pThis->eState) { + switch(getActionState(pWti, pThis)) { case ACT_STATE_RDY: return (uchar*) "rdy"; case ACT_STATE_ITX: @@ -520,12 +516,12 @@ static uchar *getActStateName(action_t *pThis) /* returns a suitable return code based on action state * rgerhards, 2009-05-07 */ -static rsRetVal getReturnCode(action_t *pThis) +static rsRetVal getReturnCode(action_t *pThis, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); - switch(pThis->eState) { + switch(getActionState(pWti, pThis)) { case ACT_STATE_RDY: iRet = RS_RET_OK; break; @@ -545,8 +541,8 @@ static rsRetVal getReturnCode(action_t *pThis) iRet = RS_RET_ACTION_FAILED; break; default: - DBGPRINTF("Invalid action engine state %d, program error\n", - (int) pThis->eState); + DBGPRINTF("Invalid action engine state %u, program error\n", + getActionState(pWti, pThis)); iRet = RS_RET_ERR; break; } @@ -558,29 +554,31 @@ static rsRetVal getReturnCode(action_t *pThis) /* set the action to a new state * rgerhards, 2007-08-02 */ -static inline void actionSetState(action_t *pThis, action_state_t newState) +static inline void +actionSetState(action_t *pThis, wti_t *pWti, uint8_t newState) { - pThis->eState = newState; - DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis)); + setActionState(pWti, pThis, newState); + DBGPRINTF("Action %d transitioned to state: %s\n", + pThis->iActionNbr, getActStateName(pThis, pWti)); } /* Handles the transient commit state. So far, this is * mostly a dummy... * rgerhards, 2007-08-02 */ -static void actionCommitted(action_t *pThis) +static void actionCommitted(action_t *pThis, wti_t *pWti) { - actionSetState(pThis, ACT_STATE_RDY); + actionSetState(pThis, pWti, ACT_STATE_RDY); } /* set action to "rtry" state. * rgerhards, 2007-08-02 */ -static void actionRetry(action_t *pThis) +static void actionRetry(action_t *pThis, wti_t *pWti) { - actionSetState(pThis, ACT_STATE_RTRY); - pThis->iResumeOKinRow++; + actionSetState(pThis, pWti, ACT_STATE_RTRY); + incActionResumeInRow(pWti, pThis); } @@ -589,9 +587,9 @@ static void actionRetry(action_t *pThis) * depends on output module. * rgerhards, 2007-08-02 */ -static void actionDisable(action_t *pThis) +static void actionDisable(action_t *pThis, wti_t *pWti) { - actionSetState(pThis, ACT_STATE_DIED); + actionSetState(pThis, pWti, ACT_STATE_DIED); } @@ -603,7 +601,7 @@ static void actionDisable(action_t *pThis) * CPU time. TODO: maybe a config option for that? * rgerhards, 2007-08-02 */ -static inline void actionSuspend(action_t *pThis) +static inline void actionSuspend(action_t *pThis, wti_t *pWti) { time_t ttNow; @@ -611,8 +609,9 @@ static inline void actionSuspend(action_t *pThis) * since caching, and this would break logic (and it actually did so!) */ datetime.GetTime(&ttNow); - pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); - actionSetState(pThis, ACT_STATE_SUSP); + pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * + (getActionNbrResRtry(pWti, pThis) / 10 + 1); + actionSetState(pThis, pWti, ACT_STATE_SUSP); DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry); } @@ -633,7 +632,7 @@ static inline void actionSuspend(action_t *pThis) * of its inability to recover. -- rgerhards, 2010-04-26. */ static inline rsRetVal -actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) +actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) { int iRetries; int iSleepPeriod; @@ -643,27 +642,27 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) ASSERT(pThis != NULL); iRetries = 0; - while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { + while((*pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries); iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet); - if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) { + if((getActionResumeInRow(pWti, pThis) > 9) && (getActionResumeInRow(pWti, pThis) % 10 == 0)) { bTreatOKasSusp = 1; - pThis->iResumeOKinRow = 0; + setActionResumeInRow(pWti, pThis, 0); } else { bTreatOKasSusp = 0; } if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet); - actionSetState(pThis, ACT_STATE_RDY); + actionSetState(pThis, pWti, ACT_STATE_RDY); } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { /* max retries reached? */ DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n", pThis->iResumeRetryCount, iRetries); if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { - actionSuspend(pThis); + actionSuspend(pThis, pWti); } else { - ++pThis->iNbrResRtry; + incActionNbrResRtry(pWti, pThis); ++iRetries; iSleepPeriod = pThis->iResumeInterval; srSleep(iSleepPeriod, 0); @@ -672,12 +671,12 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) } } } else if(iRet == RS_RET_DISABLE_ACTION) { - actionDisable(pThis); + actionDisable(pThis, pWti); } } - if(pThis->eState == ACT_STATE_RDY) { - pThis->iNbrResRtry = 0; + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { + setActionNbrResRtry(pWti, pThis, 0); } finalize_it: @@ -696,6 +695,7 @@ dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), pThis->pModData)); pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; + setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */ } finalize_it: RETiRet; @@ -704,14 +704,13 @@ finalize_it: /* try to resume an action -- rgerhards, 2007-08-02 * changed to new action state engine -- rgerhards, 2009-05-07 */ -static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) +static rsRetVal +actionTryResume(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) { DEFiRet; time_t ttNow = NO_TIME_PROVIDED; - ASSERT(pThis != NULL); - - if(pThis->eState == ACT_STATE_SUSP) { + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { /* if we are suspended, we need to check if the timeout expired. * for this handling, we must always obtain a fresh timestamp. We used * to use the action timestamp, but in this case we will never reach a @@ -721,19 +720,19 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate, wti_t */ datetime.GetTime(&ttNow); /* cache "now" */ if(ttNow >= pThis->ttResumeRtry) { - actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ + actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */ } } - if(pThis->eState == ACT_STATE_RTRY) { + if(getActionState(pWti, pThis) == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ datetime.GetTime(&ttNow); - CHKiRet(actionDoRetry(pThis, pbShutdownImmediate, pWti)); + CHKiRet(actionDoRetry(pThis, pWti, pbShutdownImmediate)); } - if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { + if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) { DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n", - pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + pThis, getActStateName(pThis, pWti), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); } finalize_it: @@ -745,28 +744,30 @@ finalize_it: * depending on its current state. * rgerhards, 2009-05-07 */ -static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) +static inline rsRetVal +actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; assert(pThis != NULL); CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); - CHKiRet(actionTryResume(pThis, pbShutdownImmediate, pWti)); + CHKiRet(actionTryResume(pThis, pWti, pbShutdownImmediate)); /* if we are now ready, we initialize the transaction and advance * action state accordingly */ - if(pThis->eState == ACT_STATE_RDY) { + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { +dbgprintf("DDDDD: calling beginTransaction for action %d\n", pThis->iActionNbr); iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionSetState(pThis, ACT_STATE_ITX); + actionSetState(pThis, pWti, ACT_STATE_ITX); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: - actionDisable(pThis); + actionDisable(pThis, pWti); break; default:FINALIZE; } @@ -780,7 +781,7 @@ finalize_it: /* debug-print the contents of an action object * rgerhards, 2007-08-02 */ -rsRetVal actionDbgPrint(action_t *pThis) +static rsRetVal actionDbgPrint(action_t *pThis) { DEFiRet; char *sz; @@ -790,11 +791,12 @@ rsRetVal actionDbgPrint(action_t *pThis) dbgprintf("\n"); dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); - if(pThis->eState == ACT_STATE_SUSP) { +#if 0 // do we need this ??? + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { dbgprintf("\tresume next retry: %u, number retries: %d", (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); } - dbgprintf("\tState: %s\n", getActStateName(pThis)); +#endif dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); if(pThis->submitToActQ == doSubmitToActionQComplexBatch) { sz = "slow, but feature-rich"; @@ -816,34 +818,31 @@ rsRetVal actionDbgPrint(action_t *pThis) * rgerhards, 2009-05-07 */ static rsRetVal -prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow) +prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow) { int i; - msg_t *pMsg; struct json_object *json; + actWrkrInfo_t *pWrkrInfo; DEFiRet; - ASSERT(pAction != NULL); - ASSERT(pElem != NULL); - - pMsg = pElem->pMsg; + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]), - &pElem->staticLenStrings[i], ttNow)); - pElem->staticActParams[i] = pElem->staticActStrings[i]; + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pWrkrInfo->staticActStrings[i]), + &pWrkrInfo->staticLenStrings[i], ttNow)); + pWrkrInfo->staticActParams[i] = pWrkrInfo->staticActStrings[i]; break; case ACT_ARRAY_PASSING: - CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow)); + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pWrkrInfo->staticActParams[i]), ttNow)); break; case ACT_MSG_PASSING: - pElem->staticActParams[i] = (void*) pMsg; + pWrkrInfo->staticActParams[i] = (void*) pMsg; break; case ACT_JSON_PASSING: CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow)); - pElem->staticActParams[i] = (void*) json; + pWrkrInfo->staticActParams[i] = (void*) json; break; default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n", (int) pAction->eParamPassing); @@ -857,69 +856,55 @@ finalize_it: } -/* free a batches ressources, but not string buffers (because they will - * most probably be reused). String buffers are only deleted upon final - * destruction of the batch. - * This function here must be called only when the batch is actually no - * longer used, also not for retrying actions or such. It invalidates - * buffers. - * rgerhards, 2010-12-17 - */ -static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) +static void +releaseDoActionParams(action_t *pAction, wti_t *pWti) { int jArr; - int i, j; - batch_obj_t *pElem; + int j; + actWrkrInfo_t *pWrkrInfo; uchar ***ppMsgs; - DEFiRet; - - ASSERT(pAction != NULL); if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING) goto done; /* we need to do nothing with these types! */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pElem = &(pBatch->pElem[i]); - if(batchIsValidElem(pBatch, i)) { - switch(pAction->eParamPassing) { - case ACT_ARRAY_PASSING: - ppMsgs = (uchar***) pElem->staticActParams; - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - if(((uchar**)ppMsgs)[j] != NULL) { - jArr = 0; - while(ppMsgs[j][jArr] != NULL) { - d_free(ppMsgs[j][jArr]); - ppMsgs[j][jArr] = NULL; - ++jArr; - } - d_free(((uchar**)ppMsgs)[j]); - ((uchar**)ppMsgs)[j] = NULL; - } - } - break; - case ACT_JSON_PASSING: - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - json_object_put((struct json_object*) - pElem->staticActParams[j]); - pElem->staticActParams[j] = NULL; + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + switch(pAction->eParamPassing) { + case ACT_ARRAY_PASSING: + ppMsgs = (uchar***) pWrkrInfo->staticActParams; + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + if(((uchar**)ppMsgs)[j] != NULL) { + jArr = 0; + while(ppMsgs[j][jArr] != NULL) { + free(ppMsgs[j][jArr]); + ppMsgs[j][jArr] = NULL; + ++jArr; } - break; - case ACT_STRING_PASSING: - case ACT_MSG_PASSING: - /* can never happen, just to keep compiler happy! */ - break; + free(((uchar**)ppMsgs)[j]); + ((uchar**)ppMsgs)[j] = NULL; } } + break; + case ACT_JSON_PASSING: + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + json_object_put((struct json_object*) + pWrkrInfo->staticActParams[j]); + pWrkrInfo->staticActParams[j] = NULL; + } + break; + case ACT_STRING_PASSING: + case ACT_MSG_PASSING: + /* can never happen, just to keep compiler happy! */ + break; } -done: RETiRet; +done: return; } /* call the DoAction output plugin entry point * rgerhards, 2008-01-28 */ -rsRetVal +static rsRetVal actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) { DEFiRet; @@ -928,7 +913,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) ISOBJ_TYPE_assert(pMsg, msg); DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", - getActStateName(pThis), pThis->iActionNbr); + getActStateName(pThis, pWti), pThis->iActionNbr); CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); @@ -937,30 +922,30 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionCommitted(pThis); - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + actionCommitted(pThis, pWti); + setActionResumeInRow(pWti, pThis, 0); break; case RS_RET_DEFER_COMMIT: - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + setActionResumeInRow(pWti, pThis, 0); /* we are done, action state remains the same */ break; case RS_RET_PREVIOUS_COMMITTED: /* action state remains the same, but we had a commit. */ pThis->bHadAutoCommit = 1; - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + setActionResumeInRow(pWti, pThis, 0); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: - actionDisable(pThis); + actionDisable(pThis, pWti); break; default:/* permanent failure of this message - no sense in retrying. This is * not yet handled (but easy TODO) */ FINALIZE; } - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; @@ -982,59 +967,49 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutd CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); - if(pThis->eState == ACT_STATE_ITX) + if(getActionState(pWti, pThis) == ACT_STATE_ITX) CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti)); - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; } -/* finish processing a batch. Most importantly, that means we commit if we - * need to do so. - * rgerhards, 2008-01-28 - */ +/* Commit action after processing. */ static rsRetVal -finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) +actionCommit(action_t *pThis, wti_t *pWti) { - int i; + int pbShutdownImmediate = 1; DEFiRet; - ASSERT(pThis != NULL); - - if(pThis->eState == ACT_STATE_RDY) { - /* we just need to flag the batch as commited */ + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { FINALIZE; /* nothing to do */ } - CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti)); - if(pThis->eState == ACT_STATE_ITX) { + CHKiRet(actionPrepare(pThis, &pbShutdownImmediate, pWti)); + if(getActionState(pWti, pThis) == ACT_STATE_ITX) { +dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr); iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionCommitted(pThis); - /* flag messages as committed */ - for(i = 0 ; i < pBatch->nElem ; ++i) { - batchSetElemState(pBatch, i, BATCH_STATE_COMM); - pBatch->pElem[i].bPrevWasSuspended = 0; /* we had success! */ - } + actionCommitted(pThis, pWti); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: - actionDisable(pThis); + actionDisable(pThis, pWti); break; case RS_RET_DEFER_COMMIT: DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT " "- ignored\n"); - actionCommitted(pThis); + actionCommitted(pThis, pWti); break; case RS_RET_PREVIOUS_COMMITTED: DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED " "- ignored\n"); - actionCommitted(pThis); + actionCommitted(pThis, pWti); break; default:/* permanent failure of this message - no sense in retrying. This is * not yet handled (but easy TODO) @@ -1042,241 +1017,46 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) FINALIZE; } } - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; } -/* try to submit a partial batch of elements. - * rgerhards, 2009-05-12 - */ -static inline rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, wti_t *pWti) -{ - int i; - int iElemProcessed; - int iCommittedUpTo; - msg_t *pMsg; - rsRetVal localRet; - DEFiRet; - - assert(pBatch != NULL); - assert(pnElem != NULL); - - i = pBatch->iDoneUpTo; /* all messages below that index are processed */ - iElemProcessed = 0; - iCommittedUpTo = i; - DBGPRINTF("tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem); - while(iElemProcessed <= *pnElem && i < pBatch->nElem) { - if(*(pBatch->pbShutdownImmediate)) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - /* NOTE: do NOT extend the filter below! Anything else must be done on the - * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15 - */ - if(batchIsValidElem(pBatch, i)) { - pMsg = pBatch->pElem[i].pMsg; - localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, - pBatch->pbShutdownImmediate, pWti); - DBGPRINTF("action %p call returned %d\n", pAction, localRet); - /* Note: we directly modify the batch object state, because we know that - * wo do not overwrite BATCH_STATE_DISC indicators! - */ - if(localRet == RS_RET_OK) { - /* mark messages as committed */ - while(iCommittedUpTo <= i) { - pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); - ++iCommittedUpTo; - //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; - } - } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { - /* mark messages as committed */ - while(iCommittedUpTo < i) { - pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); - ++iCommittedUpTo; - //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; - } - pBatch->eltState[i] = BATCH_STATE_SUB; - } else if(localRet == RS_RET_DEFER_COMMIT) { - pBatch->eltState[i] = BATCH_STATE_SUB; - } else if(localRet == RS_RET_DISCARDMSG) { - pBatch->eltState[i] = BATCH_STATE_DISC; - } else { - dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n", - localRet, *pnElem, iCommittedUpTo); - iRet = localRet; - FINALIZE; - } - } - ++i; - ++iElemProcessed; - } - -finalize_it: - if(pBatch->iDoneUpTo != iCommittedUpTo) { - pBatch->iDoneUpTo = iCommittedUpTo; - } - RETiRet; -} - -/* submit a batch for actual action processing. - * The first nElem elements are processed. This function calls itself - * recursively if it needs to handle errors. - * Note: we don't need the number of the first message to be processed as a parameter, - * because this is kept track of inside the batch itself (iDoneUpTo). - * rgerhards, 2009-05-12 - */ -static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem, wti_t *pWti) -{ - int i; - int bDone; - rsRetVal localRet; - int wasDoneTo; - DEFiRet; - - assert(pBatch != NULL); - - DBGPRINTF("submitBatch: enter, nElem %d\n", nElem); - wasDoneTo = pBatch->iDoneUpTo; - bDone = 0; - do { - localRet = tryDoAction(pAction, pBatch, &nElem, pWti); - if(localRet == RS_RET_FORCE_TERM) { - ABORT_FINALIZE(RS_RET_FORCE_TERM); - } - if( localRet == RS_RET_OK - || localRet == RS_RET_PREVIOUS_COMMITTED - || localRet == RS_RET_DEFER_COMMIT) { - /* try commit transaction, once done, we can simply do so as if - * that return state was returned from tryDoAction(). - */ - localRet = finishBatch(pAction, pBatch, pWti); - } - - if( localRet == RS_RET_OK - || localRet == RS_RET_PREVIOUS_COMMITTED - || localRet == RS_RET_DEFER_COMMIT) { - bDone = 1; - } else if(localRet == RS_RET_SUSPENDED) { - DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n"); - /* do nothing, this will retry the full batch */ - } else if(localRet == RS_RET_ACTION_FAILED) { - /* in this case, everything not yet committed is BAD */ - for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && pBatch->eltState[i] != BATCH_STATE_COMM ) { - pBatch->eltState[i] = BATCH_STATE_BAD; - pBatch->pElem[i].bPrevWasSuspended = 1; - STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail); - } - } - bDone = 1; - } else { - if(nElem == 1) { - batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD); - bDone = 1; - } else { - /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ - DBGPRINTF("submitBatch recursing trying to find and exclude the culprit " - "for iRet %d\n", localRet); - submitBatch(pAction, pBatch, nElem / 2, pWti); - submitBatch(pAction, pBatch, nElem - (nElem / 2), pWti); - bDone = 1; - } - } - } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */ - - if(*(pBatch->pbShutdownImmediate)) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - -finalize_it: - RETiRet; -} - - -/* copy "active" array of batch, as we need to modify it. The caller - * must make sure the new array is freed and the orginal batch - * pointer is restored (thus the caller must save it). If active - * is currently NULL, this is properly handled. - * Note: the batches active pointer is modified, so it must be - * saved BEFORE calling this function! - * rgerhards, 2012-09-12 +/* process a single message. This is both called if we run from the + * cosumer side of an action queue as well as directly from the main + * queue thread if the action queue is set to "direct". */ static rsRetVal -copyActive(batch_t *pBatch) +processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow, int *pbShutdownImmediate) { - sbool *active; DEFiRet; - CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); - if(pBatch->active == NULL) - memset(active, 1, batchNumMsgs(pBatch)); - else - memcpy(active, pBatch->active, batchNumMsgs(pBatch)); - pBatch->active = active; -finalize_it: +dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRawMsg); + // TODO: check error return states! + iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow); + iRet = actionProcessMessage(pAction, pMsg, + pWti->actWrkrInfo[pAction->iActionNbr].staticActParams, + pbShutdownImmediate, pWti); + releaseDoActionParams(pAction, pWti); RETiRet; } -/* The following function prepares a batch for processing, that it is - * reinitializes batch states, generates strings and does everything else - * that needs to be done in order to make the batch ready for submission to - * the actual output module. Note that we look at the precomputed - * filter OK condition and process only those messages, that actually matched - * the filter. - * rgerhards, 2010-06-14 - */ -static inline rsRetVal -prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr) +/* Commit all active transactions */ +rsRetVal +actionCommitAll(wti_t *pWti) { int i; - batch_obj_t *pElem; - struct syslogTime ttNow; - DEFiRet; - - /* indicate we have not yet read the date */ - ttNow.year = 0; - - pBatch->iDoneUpTo = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pElem = &(pBatch->pElem[i]); - if(batchIsValidElem(pBatch, i)) { - pBatch->eltState[i] = BATCH_STATE_RDY; - if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) { - /* make sure we have our copy of "active" array */ - if(!*bMustRestoreActivePtr) { - *activeSave = pBatch->active; - copyActive(pBatch); - } - pBatch->active[i] = RSFALSE; - } + for(i = 0 ; i < iActionNbr ; ++i) { + DBGPRINTF("DDDD: actionCommitall action %d state %u\n", + i, getActionStateByNbr(pWti, i)); + if(getActionStateByNbr(pWti, i) != ACT_STATE_RDY) { + actionCommit(pWti->actWrkrInfo[i].pAction, pWti); } } - RETiRet; -} - - -/* receive a batch and process it. This includes retry handling. - * rgerhards, 2009-05-12 - */ -static inline rsRetVal -processAction(action_t *pAction, batch_t *pBatch, wti_t* pWti) -{ - DEFiRet; - - assert(pBatch != NULL); - CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pWti)); - iRet = finishBatch(pAction, pBatch, pWti); - -finalize_it: - RETiRet; } - /* receive an array of to-process user pointers and submit them * for processing. * rgerhards, 2009-04-22 @@ -1284,41 +1064,30 @@ finalize_it: static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate) { - int *pbShutdownImmdtSave; - sbool *activeSave; - int bMustRestoreActivePtr = 0; - rsRetVal localRet; action_t *pAction = (action_t*) pVoid; + msg_t *pMsg; + int i; + struct syslogTime ttNow; DEFiRet; - assert(pBatch != NULL); - - if(pbShutdownImmediate != NULL) { - pbShutdownImmdtSave = pBatch->pbShutdownImmediate; - pBatch->pbShutdownImmediate = pbShutdownImmediate; + if(pbShutdownImmediate == NULL) { + pbShutdownImmediate = pBatch->pbShutdownImmediate; } - CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); - - iRet = processAction(pAction, pBatch, pWti); - /* even if processAction failed, we need to release the batch (else we - * have a memory leak). So we do this first, and then check if we need to - * return an error code. If so, the code from processAction has priority. - * rgerhards, 2010-12-17 - */ - localRet = releaseBatch(pAction, pBatch); + /* indicate we have not yet read the date */ + ttNow.year = 0; - if(iRet == RS_RET_OK) - iRet = localRet; - - if(bMustRestoreActivePtr) { - free(pBatch->active); - pBatch->active = activeSave; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*pbShutdownImmediate ; ++i) { + if(batchIsValidElem(pBatch, i)) { + pMsg = pBatch->pElem[i].pMsg; + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, pbShutdownImmediate); + // TODO: we must refactor this! flag messages as committed + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + } } -finalize_it: - if(pbShutdownImmediate != NULL) - pBatch->pbShutdownImmediate = pbShutdownImmdtSave; + iRet = actionCommit(pAction, pWti); +dbgprintf("DDDD: processBatchMain - end\n"); RETiRet; } @@ -1381,11 +1150,12 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT * rgerhards, 2010-06-08 */ static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti) +doSubmitToActionQ(action_t *pAction, wti_t *pWti, msg_t *pMsg) { DEFiRet; - if(pAction->eState == ACT_STATE_DIED) { + // TODO: bug? Isn't that supposed to be checked in direct mode as well??? + if(getActionState(pWti, pAction) == ACT_STATE_DIED) { DBGPRINTF("action %p died, do NOT execute\n", pAction); FINALIZE; } @@ -1464,7 +1234,7 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti) /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = doSubmitToActionQ(pAction, pMsg, pWti); + iRet = doSubmitToActionQ(pAction, pWti, pMsg); finalize_it: RETiRet; @@ -1475,12 +1245,10 @@ finalize_it: * pthread_cleanup_push() POSIX macro... */ static inline rsRetVal -doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti) +doActionCallAction(action_t *pAction, wti_t *pWti, msg_t *pMsg) { - msg_t *pMsg; DEFiRet; - pMsg = pBatch->pElem[idxBtch].pMsg; pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ /* don't output marks to recently written outputs */ @@ -1493,13 +1261,6 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti) iRet = actionWriteToAction(pAction, pMsg, pWti); finalize_it: - /* we need to update the batch to handle failover processing correctly */ - if(iRet == RS_RET_OK) { - pBatch->pElem[idxBtch].bPrevWasSuspended = 0; - } else if(iRet == RS_RET_ACTION_FAILED) { - pBatch->pElem[idxBtch].bPrevWasSuspended = 1; - } - RETiRet; } @@ -1518,7 +1279,8 @@ DEFFUNC_llExecFunc(doActivateActions) errmsg.LogError(0, localRet, "file prefix (work directory?) " "is missing"); } - actionDisable(pThis); +#warning think how to handle this: + //actionDisable(pThis); } DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod), pThis, pThis->pQueue); @@ -1550,113 +1312,55 @@ activateActions(void) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { time_t now = 0; + int doProcess = 1; time_t lastAct; - int i; - sbool *activeSave; DEFiRet; - activeSave = pBatch->active; - copyActive(pBatch); - - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i]) - continue; - if(now == 0) { - now = datetime.GetTime(NULL); /* good time call - the only one done */ - } - /* CAS loop, we write back a bit early, but that's OK... */ - /* we use reception time, not dequeue time - this is considered more appropriate and - * also faster ;) -- rgerhards, 2008-09-17 */ - do { - lastAct = pAction->f_time; - if(pBatch->pElem[i].pMsg->msgFlags & MARK) { - if((now - lastAct) < MarkInterval / 2) { - pBatch->active[i] = 0; - DBGPRINTF("batch item %d: action was recently called, ignoring " - "mark message\n", i); - break; /* do not update timestamp for non-written mark messages */ - } + if(now == 0) { // TODO: do in caller! + now = datetime.GetTime(NULL); /* good time call - the only one done */ + } + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if(pMsg->msgFlags & MARK) { + if((now - lastAct) < MarkInterval / 2) { + doProcess = 0; + DBGPRINTF("action was recently called, ignoring mark message\n"); + break; /* do not update timestamp for non-written mark messages */ } - } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, - pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0); - if(pBatch->active[i]) { - DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n", - i, module.GetStateName(pAction->pMod)); } - } - - iRet = doSubmitToActionQBatch(pAction, pBatch, pWti); + } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, + pMsg->ttGenTime, &pAction->mutCAS) == 0); - free(pBatch->active); - pBatch->active = activeSave; + if(doProcess) { + DBGPRINTF("Called action(NotAllMark), processing via '%s'\n", + module.GetStateName(pAction->pMod)); + iRet = doSubmitToActionQBatch(pAction, pWti, pMsg); + } RETiRet; } -static inline void -countStatsBatchEnq(action_t *pAction, batch_t *pBatch) -{ - int i; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( batchIsValidElem(pBatch, i)) { - STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - } - } -} - /* enqueue a batch in direct mode. We have put this into its own function just to avoid * cluttering the actual submit function. * rgerhards, 2011-06-16 */ static inline rsRetVal -doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { - sbool bNeedSubmit; - sbool *activeSave; - int i; +int pbShutdownImmediate = 0; // TODO: implement + struct syslogTime ttNow; DEFiRet; - - activeSave = pBatch->active; - copyActive(pBatch); - - /* note: for direct mode, we need to adjust the filter property. For non-direct - * this is not necessary, because in that case we enqueue only what actually needs - * to be processed. - */ - if(pAction->bExecWhenPrevSusp) { - bNeedSubmit = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if(!pBatch->pElem[i].bPrevWasSuspended) { - DBGPRINTF("action enq stage: change active to 0 due to " - "failover case in elem %d\n", i); - pBatch->active[i] = 0; - } - if(batchIsValidElem(pBatch, i)) { - STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - bNeedSubmit = 1; - } - DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, i, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - } - if(bNeedSubmit) { - /* note: stats were already computed above */ - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); - } else { - DBGPRINTF("no need to submit batch, all invalid\n"); - } - } else { - if(GatherStats) - countStatsBatchEnq(pAction, pBatch); - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); - } - - free(pBatch->active); - pBatch->active = activeSave; + if(GatherStats) + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); + ttNow.year = 0; + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, &pbShutdownImmediate); RETiRet; } @@ -1666,72 +1370,38 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { - int i; DEFiRet; - DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { - iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti); + iRet = doQueueEnqObjDirectBatch(pAction, pWti, pMsg); } else {/* in this case, we do single submits to the queue. * TODO: optimize this, we may do at least a multi-submit! */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( batchIsValidElem(pBatch, i) - && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { - doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg, pWti); - } - } + doSubmitToActionQ(pAction, pWti, pMsg); } RETiRet; } - -/* Helper to submit a batch of actions to the engine. Note that we have rather - * complicated processing here, so we need to do this one message after another. - * rgerhards, 2010-06-23 - */ -static inline rsRetVal -helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) -{ - int i; - DEFiRet; - - DBGPRINTF("Called action %p (complex case), logging to %s\n", - pAction, module.GetStateName(pAction->pMod)); - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( batchIsValidElem(pBatch, i) - && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { - doActionCallAction(pAction, pBatch, i, pWti); - } - } - - RETiRet; -} - /* Call configured action, most complex case with all features supported (and thus slow). * rgerhards, 2010-06-08 */ #pragma GCC diagnostic ignored "-Wempty-body" static rsRetVal -doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { DEFiRet; d_pthread_mutex_lock(&pAction->mutAction); -dbgprintf("DDDD: locked mutAction\n"); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); - iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti); + DBGPRINTF("Called action %p (complex case), logging to %s\n", + pAction, module.GetStateName(pAction->pMod)); + doActionCallAction(pAction, pWti, pMsg); d_pthread_mutex_unlock(&pAction->mutAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ @@ -1876,11 +1546,11 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->pMod = pMod; pAction->pModData = pModData; - /* check if the module is compatible with select features (currently no such features exist) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ - if(bSuspended) - actionSuspend(pAction); +#warning we need to look at the following + // Probably the core init needs to be done during createWrkrInstance() + //if(bSuspended) + // actionSuspend(pAction, pWti); CHKiRet(actionConstructFinalize(pAction, lst)); @@ -1975,7 +1645,6 @@ actionNewInst(struct nvlst *lst, action_t **ppAction) (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { /* check if the module is compatible with select features * (currently no such features exist) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ loadConf->actions.nbrActions++; /* one more active action! */ } *ppAction = pAction; @@ -35,15 +35,6 @@ extern int glbliActionResumeRetryCount; -typedef enum { - ACT_STATE_DIED = 0, /* action permanently failed and now disabled - MUST BE ZERO! */ - ACT_STATE_RDY = 1, /* action ready, waiting for new transaction */ - ACT_STATE_ITX = 2, /* transaction active, waiting for new data or commit */ - ACT_STATE_COMM = 3, /* transaction finished (a transient state) */ - ACT_STATE_RTRY = 4, /* failure occured, trying to restablish ready state */ - ACT_STATE_SUSP = 5 /* suspended due to failure (return fail until timeout expired) */ -} action_state_t; - /* the following struct defines the action object data structure */ struct action_s { @@ -55,13 +46,10 @@ struct action_s { sbool bExecWhenPrevSusp;/* execute only when previous action is suspended? */ sbool bWriteAllMarkMsgs;/* should all mark msgs be written (not matter how recent the action was executed)? */ int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */ - action_state_t eState; /* current state of action */ sbool bHadAutoCommit; /* did an auto-commit happen during doAction()? */ time_t ttResumeRtry; /* when is it time to retry the resume? */ - int iResumeOKinRow; /* number of times in a row that resume said OK with an immediate failure following */ int iResumeInterval;/* resume interval for this action */ int iResumeRetryCount;/* how often shall we retry a suspended action? (-1 --> eternal) */ - int iNbrResRtry; /* number of retries since last suspend */ int iNbrNoExec; /* number of matches that did not yet yield to an exec */ int iExecEveryNthOccur;/* execute this action only every n-th occurence (with n=0,1 -> always) */ int iExecEveryNthOccurTO;/* timeout for n-th occurence feature */ @@ -69,7 +57,7 @@ struct action_s { struct modInfo_s *pMod;/* pointer to output module handling this selector */ void *pModData; /* pointer to module data - content is module-specific */ sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */ - rsRetVal (*submitToActQ)(action_t *, batch_t *, wti_t*);/* function submit message to action queue */ + rsRetVal (*submitToActQ)(action_t *, wti_t*, msg_t*);/* function submit message to action queue */ rsRetVal (*qConstruct)(struct queue_s *pThis); enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING = 2, ACT_JSON_PASSING = 3} @@ -79,7 +67,7 @@ struct action_s { * in this order. */ qqueue_t *pQueue; /* action queue */ pthread_mutex_t mutAction; /* primary action mutex */ - uchar *pszName; /* action name (for documentation) */ + uchar *pszName; /* action name */ DEF_ATOMIC_HELPER_MUT(mutCAS); /* for statistics subsystem */ statsobj_t *statsobj; @@ -93,7 +81,7 @@ struct action_s { rsRetVal actionConstruct(action_t **ppThis); rsRetVal actionConstructFinalize(action_t *pThis, struct nvlst *lst); rsRetVal actionDestruct(action_t *pThis); -rsRetVal actionDbgPrint(action_t *pThis); +//rsRetVal actionDbgPrint(action_t *pThis); rsRetVal actionSetGlobalResumeInterval(int iNewVal); rsRetVal actionDoAction(action_t *pAction); rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t*); @@ -103,6 +91,7 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr rsRetVal activateActions(void); rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction); rsRetVal actionProcessCnf(struct cnfobj *o); +rsRetVal actionCommitAll(wti_t *pWti); /* external data */ extern int iActionNbr; diff --git a/runtime/batch.h b/runtime/batch.h index 2ec07670..fac0158e 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -46,17 +46,6 @@ typedef unsigned char batch_state_t; */ struct batch_obj_s { msg_t *pMsg; - /* work variables for action processing; these are reused for each action (or block of - * actions) - */ - sbool bPrevWasSuspended; - /* following are caches to save allocs if not absolutely necessary */ - uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */ - /* a cache to save malloc(), if not absolutely necessary */ - void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */ - size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; - /* and the same for the message length (if used) */ - /* end action work variables */ }; /* the batch @@ -152,7 +141,7 @@ batchFree(batch_t *pBatch) { /* staticActParams MUST be freed immediately (if required), * so we do not need to do that! */ - free(pBatch->pElem[i].staticActStrings[j]); + //TODO: do this in wti! free(pBatch->pElem[i].staticActStrings[j]); } } free(pBatch->pElem); @@ -171,7 +160,6 @@ batchInit(batch_t *pBatch, int maxElem) { pBatch->maxElem = maxElem; CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t))); CHKmalloc(pBatch->eltState = calloc((size_t)maxElem, sizeof(batch_state_t))); - // TODO: replace calloc by inidividual writes? finalize_it: RETiRet; } diff --git a/runtime/conf.c b/runtime/conf.c index 2b000c60..1544e364 100644 --- a/runtime/conf.c +++ b/runtime/conf.c @@ -523,7 +523,6 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction) (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { /* here check if the module is compatible with select features * (currently, we have no such features!) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ conf->actions.nbrActions++; /* one more active action! */ } break; diff --git a/runtime/queue.c b/runtime/queue.c index 968c016e..93661e41 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -987,10 +987,6 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) singleBatch.eltState = &batchState; singleBatch.active = &active; iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate); - /* delete the batch string params: TODO: create its own "class" for this */ - for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { - free(batchObj.staticActStrings[i]); - } msgDestruct(&pMsg); RETiRet; diff --git a/runtime/ruleset.c b/runtime/ruleset.c index db253d28..2ad21170 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -228,10 +228,40 @@ static inline void freeActive(sbool *active) { free(active); } static rsRetVal execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { + int i; DEFiRet; dbgprintf("RRRR: execAct [%s]: batch of %d elements, active %p\n", modGetName(stmt->d.act->pMod), batchNumMsgs(pBatch), active); pBatch->active = active; - stmt->d.act->submitToActQ(stmt->d.act, pBatch, pWti); +// TODO: check here if bPrevWasSuspsended was required and, if so +// if we actually are permitted to execute this action. + //if(pAction->bExecWhenPrevSusp) { + + + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + DBGPRINTF("action %d: valid:%d state:%d execWhenPrev:%d\n", + stmt->d.act->iActionNbr, batchIsValidElem(pBatch, i), pBatch->eltState[i], + stmt->d.act->bExecWhenPrevSusp); + if(batchIsValidElem(pBatch, i)) { + stmt->d.act->submitToActQ(stmt->d.act, pWti, pBatch->pElem[i].pMsg); + // TODO: we must refactor this! flag messages as committed + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + } + } + + +#warning implement action return code checking +// we should store the return code and make it available +// to users via a special function (or maybe variable) +// internally, we can use this for bPrevWasSuspended checking +// to implement this system, we need to keep a kind of +// "execution state" when running the rule engine. This most +// probably is best done inside the wti object. +// I think in v7 there was a bug, so that bPrevWasSuspended did +// not properly make it onto the next batch (because it was +// stored within the batch state) -- but even if so, the +// exposure window was minimal, as the action would probably +// fail the next time again. [TODO: check if batch object survived +// end of batch, in which case it was probably correctly handled] RETiRet; } @@ -582,6 +612,7 @@ processBatch(batch_t *pBatch, wti_t *pWti) CHKiRet(processBatchMultiRuleset(pBatch, pWti)); } + actionCommitAll(pWti); finalize_it: DBGPRINTF("ruleset.ProcessMsg() returns %d\n", iRet); RETiRet; diff --git a/runtime/wti.h b/runtime/wti.h index bb4f56bc..79b62102 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -26,11 +26,32 @@ #include "wtp.h" #include "obj.h" #include "batch.h" +#include "action.h" +#define ACT_STATE_RDY 0 /* action ready, waiting for new transaction */ +#define ACT_STATE_ITX 1 /* transaction active, waiting for new data or commit */ +#define ACT_STATE_COMM 2 /* transaction finished (a transient state) */ +#define ACT_STATE_RTRY 3 /* failure occured, trying to restablish ready state */ +#define ACT_STATE_SUSP 4 /* suspended due to failure (return fail until timeout expired) */ +#define ACT_STATE_DIED 7 /* action permanently failed and now disabled */ +/* note: 3 bit bit field --> highest value is 7! */ + typedef struct actWrkrInfo { action_t *pAction; void *actWrkrData; + uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an immediate failure following */ + int iNbrResRtry; /* number of retries since last suspend */ + struct { + unsigned actState : 3; + } flags; + /* following are caches to save allocs if not absolutely necessary */ + uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */ + /* a cache to save malloc(), if not absolutely necessary */ + void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */ + size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; + /* and the same for the message length (if used) */ + /* end action work variables */ } actWrkrInfo_t; /* the worker thread instance class */ @@ -62,4 +83,57 @@ PROTOTYPEObjClassInit(wti); PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*); PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*); +static inline uint8_t +getActionStateByNbr(wti_t *pWti, int iActNbr) +{ + return((uint8_t) pWti->actWrkrInfo[iActNbr].flags.actState); +} + +static inline uint8_t +getActionState(wti_t *pWti, action_t *pAction) +{ + return((uint8_t) pWti->actWrkrInfo[pAction->iActionNbr].flags.actState); +} + +static inline void +setActionState(wti_t *pWti, action_t *pAction, uint8_t newState) +{ + pWti->actWrkrInfo[pAction->iActionNbr].flags.actState = newState; +} + +static inline uint16_t +getActionResumeInRow(wti_t *pWti, action_t *pAction) +{ + return(pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow); +} + +static inline void +setActionResumeInRow(wti_t *pWti, action_t *pAction, uint16_t val) +{ + pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow = val; +} + +static inline void +incActionResumeInRow(wti_t *pWti, action_t *pAction) +{ + pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow++; +} + +static inline int +getActionNbrResRtry(wti_t *pWti, action_t *pAction) +{ + return(pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry); +} + +static inline void +setActionNbrResRtry(wti_t *pWti, action_t *pAction, uint16_t val) +{ + pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry = val; +} + +static inline void +incActionNbrResRtry(wti_t *pWti, action_t *pAction) +{ + pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry++; +} #endif /* #ifndef WTI_H_INCLUDED */ |