diff options
-rw-r--r-- | action.c | 790 | ||||
-rw-r--r-- | action.h | 19 | ||||
-rw-r--r-- | runtime/batch.h | 38 | ||||
-rw-r--r-- | runtime/conf.c | 1 | ||||
-rw-r--r-- | runtime/queue.c | 6 | ||||
-rw-r--r-- | runtime/ruleset.c | 364 | ||||
-rw-r--r-- | runtime/wti.c | 1 | ||||
-rw-r--r-- | runtime/wti.h | 113 | ||||
-rw-r--r-- | tools/syslogd.c | 17 |
9 files changed, 469 insertions, 880 deletions
@@ -14,7 +14,6 @@ * * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval * - doSubmitToActionQComplexBatch - * - helperSubmitToActionQComplexBatch * - doActionCallAction * handles mark message reduction, but in essence calls * - actionWriteToAction @@ -36,9 +35,6 @@ * * After dequeue, processing is as follows: * - processBatchMain - * - processAction - * - submitBatch - * - tryDoAction * - ... * * MORE ON PROCESSING, QUEUES and FILTERING @@ -69,7 +65,7 @@ * beast. * rgerhards, 2011-06-15 * - * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -120,9 +116,9 @@ /* forward definitions */ static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*); -static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); -static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); -static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); +static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t*); +static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t*); +static rsRetVal doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t*); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -496,9 +492,9 @@ rsRetVal actionSetGlobalResumeInterval(int iNewVal) * returned string must not be modified. * rgerhards, 2009-05-07 */ -static uchar *getActStateName(action_t *pThis) +static uchar *getActStateName(action_t *pThis, wti_t *pWti) { - switch(pThis->eState) { + switch(getActionState(pWti, pThis)) { case ACT_STATE_RDY: return (uchar*) "rdy"; case ACT_STATE_ITX: @@ -520,12 +516,12 @@ static uchar *getActStateName(action_t *pThis) /* returns a suitable return code based on action state * rgerhards, 2009-05-07 */ -static rsRetVal getReturnCode(action_t *pThis) +static rsRetVal getReturnCode(action_t *pThis, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); - switch(pThis->eState) { + switch(getActionState(pWti, pThis)) { case ACT_STATE_RDY: iRet = RS_RET_OK; break; @@ -545,8 +541,8 @@ static rsRetVal getReturnCode(action_t *pThis) iRet = RS_RET_ACTION_FAILED; break; default: - DBGPRINTF("Invalid action engine state %d, program error\n", - (int) pThis->eState); + DBGPRINTF("Invalid action engine state %u, program error\n", + getActionState(pWti, pThis)); iRet = RS_RET_ERR; break; } @@ -558,29 +554,31 @@ static rsRetVal getReturnCode(action_t *pThis) /* set the action to a new state * rgerhards, 2007-08-02 */ -static inline void actionSetState(action_t *pThis, action_state_t newState) +static inline void +actionSetState(action_t *pThis, wti_t *pWti, uint8_t newState) { - pThis->eState = newState; - DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis)); + setActionState(pWti, pThis, newState); + DBGPRINTF("Action %d transitioned to state: %s\n", + pThis->iActionNbr, getActStateName(pThis, pWti)); } /* Handles the transient commit state. So far, this is * mostly a dummy... * rgerhards, 2007-08-02 */ -static void actionCommitted(action_t *pThis) +static void actionCommitted(action_t *pThis, wti_t *pWti) { - actionSetState(pThis, ACT_STATE_RDY); + actionSetState(pThis, pWti, ACT_STATE_RDY); } /* set action to "rtry" state. * rgerhards, 2007-08-02 */ -static void actionRetry(action_t *pThis) +static void actionRetry(action_t *pThis, wti_t *pWti) { - actionSetState(pThis, ACT_STATE_RTRY); - pThis->iResumeOKinRow++; + actionSetState(pThis, pWti, ACT_STATE_RTRY); + incActionResumeInRow(pWti, pThis); } @@ -589,9 +587,9 @@ static void actionRetry(action_t *pThis) * depends on output module. * rgerhards, 2007-08-02 */ -static void actionDisable(action_t *pThis) +static void actionDisable(action_t *pThis, wti_t *pWti) { - actionSetState(pThis, ACT_STATE_DIED); + actionSetState(pThis, pWti, ACT_STATE_DIED); } @@ -603,7 +601,7 @@ static void actionDisable(action_t *pThis) * CPU time. TODO: maybe a config option for that? * rgerhards, 2007-08-02 */ -static inline void actionSuspend(action_t *pThis) +static inline void actionSuspend(action_t *pThis, wti_t *pWti) { time_t ttNow; @@ -611,8 +609,9 @@ static inline void actionSuspend(action_t *pThis) * since caching, and this would break logic (and it actually did so!) */ datetime.GetTime(&ttNow); - pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); - actionSetState(pThis, ACT_STATE_SUSP); + pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * + (getActionNbrResRtry(pWti, pThis) / 10 + 1); + actionSetState(pThis, pWti, ACT_STATE_SUSP); DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry); } @@ -633,7 +632,7 @@ static inline void actionSuspend(action_t *pThis) * of its inability to recover. -- rgerhards, 2010-04-26. */ static inline rsRetVal -actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) +actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) { int iRetries; int iSleepPeriod; @@ -643,27 +642,27 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) ASSERT(pThis != NULL); iRetries = 0; - while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { + while((*pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries); iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet); - if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) { + if((getActionResumeInRow(pWti, pThis) > 9) && (getActionResumeInRow(pWti, pThis) % 10 == 0)) { bTreatOKasSusp = 1; - pThis->iResumeOKinRow = 0; + setActionResumeInRow(pWti, pThis, 0); } else { bTreatOKasSusp = 0; } if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet); - actionSetState(pThis, ACT_STATE_RDY); + actionSetState(pThis, pWti, ACT_STATE_RDY); } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { /* max retries reached? */ DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n", pThis->iResumeRetryCount, iRetries); if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { - actionSuspend(pThis); + actionSuspend(pThis, pWti); } else { - ++pThis->iNbrResRtry; + incActionNbrResRtry(pWti, pThis); ++iRetries; iSleepPeriod = pThis->iResumeInterval; srSleep(iSleepPeriod, 0); @@ -672,12 +671,12 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) } } } else if(iRet == RS_RET_DISABLE_ACTION) { - actionDisable(pThis); + actionDisable(pThis, pWti); } } - if(pThis->eState == ACT_STATE_RDY) { - pThis->iNbrResRtry = 0; + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { + setActionNbrResRtry(pWti, pThis, 0); } finalize_it: @@ -696,6 +695,7 @@ dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), pThis->pModData)); pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; + setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */ } finalize_it: RETiRet; @@ -704,14 +704,13 @@ finalize_it: /* try to resume an action -- rgerhards, 2007-08-02 * changed to new action state engine -- rgerhards, 2009-05-07 */ -static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) +static rsRetVal +actionTryResume(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) { DEFiRet; time_t ttNow = NO_TIME_PROVIDED; - ASSERT(pThis != NULL); - - if(pThis->eState == ACT_STATE_SUSP) { + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { /* if we are suspended, we need to check if the timeout expired. * for this handling, we must always obtain a fresh timestamp. We used * to use the action timestamp, but in this case we will never reach a @@ -721,19 +720,19 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate, wti_t */ datetime.GetTime(&ttNow); /* cache "now" */ if(ttNow >= pThis->ttResumeRtry) { - actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ + actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */ } } - if(pThis->eState == ACT_STATE_RTRY) { + if(getActionState(pWti, pThis) == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ datetime.GetTime(&ttNow); - CHKiRet(actionDoRetry(pThis, pbShutdownImmediate, pWti)); + CHKiRet(actionDoRetry(pThis, pWti, pbShutdownImmediate)); } - if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { + if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) { DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n", - pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + pThis, getActStateName(pThis, pWti), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); } finalize_it: @@ -745,28 +744,30 @@ finalize_it: * depending on its current state. * rgerhards, 2009-05-07 */ -static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) +static inline rsRetVal +actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; assert(pThis != NULL); CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); - CHKiRet(actionTryResume(pThis, pbShutdownImmediate, pWti)); + CHKiRet(actionTryResume(pThis, pWti, pbShutdownImmediate)); /* if we are now ready, we initialize the transaction and advance * action state accordingly */ - if(pThis->eState == ACT_STATE_RDY) { + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { +dbgprintf("DDDDD: calling beginTransaction for action %d\n", pThis->iActionNbr); iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionSetState(pThis, ACT_STATE_ITX); + actionSetState(pThis, pWti, ACT_STATE_ITX); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: - actionDisable(pThis); + actionDisable(pThis, pWti); break; default:FINALIZE; } @@ -777,10 +778,11 @@ finalize_it: } +#if 0 // TODO: remove? /* debug-print the contents of an action object * rgerhards, 2007-08-02 */ -rsRetVal actionDbgPrint(action_t *pThis) +static rsRetVal actionDbgPrint(action_t *pThis) { DEFiRet; char *sz; @@ -790,11 +792,12 @@ rsRetVal actionDbgPrint(action_t *pThis) dbgprintf("\n"); dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); - if(pThis->eState == ACT_STATE_SUSP) { +#if 0 // do we need this ??? + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { dbgprintf("\tresume next retry: %u, number retries: %d", (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); } - dbgprintf("\tState: %s\n", getActStateName(pThis)); +#endif dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); if(pThis->submitToActQ == doSubmitToActionQComplexBatch) { sz = "slow, but feature-rich"; @@ -810,40 +813,41 @@ rsRetVal actionDbgPrint(action_t *pThis) RETiRet; } +#endif /* prepare the calling parameters for doAction() * rgerhards, 2009-05-07 */ static rsRetVal -prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow) +prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow) { int i; - msg_t *pMsg; struct json_object *json; + actWrkrIParams_t *iparams; + actWrkrInfo_t *pWrkrInfo; DEFiRet; - ASSERT(pAction != NULL); - ASSERT(pElem != NULL); - - pMsg = pElem->pMsg; + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]), - &pElem->staticLenStrings[i], ttNow)); - pElem->staticActParams[i] = pElem->staticActStrings[i]; + CHKiRet(wtiNewIParam(pWti, pAction, &iparams)); + iparams->msgFlags = pMsg->msgFlags; + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(iparams->staticActStrings[i]), + &iparams->staticLenStrings[i], ttNow)); + iparams->staticActParams[i] = iparams->staticActStrings[i]; break; case ACT_ARRAY_PASSING: - CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow)); + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pWrkrInfo->staticActParams[i]), ttNow)); break; case ACT_MSG_PASSING: - pElem->staticActParams[i] = (void*) pMsg; + pWrkrInfo->staticActParams[i] = (void*) pMsg; break; case ACT_JSON_PASSING: CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow)); - pElem->staticActParams[i] = (void*) json; + pWrkrInfo->staticActParams[i] = (void*) json; break; default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n", (int) pAction->eParamPassing); @@ -857,110 +861,95 @@ finalize_it: } -/* free a batches ressources, but not string buffers (because they will - * most probably be reused). String buffers are only deleted upon final - * destruction of the batch. - * This function here must be called only when the batch is actually no - * longer used, also not for retrying actions or such. It invalidates - * buffers. - * rgerhards, 2010-12-17 - */ -static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) +static void +releaseDoActionParams(action_t *pAction, wti_t *pWti) { int jArr; - int i, j; - batch_obj_t *pElem; + int j; + actWrkrInfo_t *pWrkrInfo; uchar ***ppMsgs; - DEFiRet; - - ASSERT(pAction != NULL); if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING) goto done; /* we need to do nothing with these types! */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pElem = &(pBatch->pElem[i]); - if(batchIsValidElem(pBatch, i)) { - switch(pAction->eParamPassing) { - case ACT_ARRAY_PASSING: - ppMsgs = (uchar***) pElem->staticActParams; - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - if(((uchar**)ppMsgs)[j] != NULL) { - jArr = 0; - while(ppMsgs[j][jArr] != NULL) { - d_free(ppMsgs[j][jArr]); - ppMsgs[j][jArr] = NULL; - ++jArr; - } - d_free(((uchar**)ppMsgs)[j]); - ((uchar**)ppMsgs)[j] = NULL; - } - } - break; - case ACT_JSON_PASSING: - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - json_object_put((struct json_object*) - pElem->staticActParams[j]); - pElem->staticActParams[j] = NULL; + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + switch(pAction->eParamPassing) { + case ACT_ARRAY_PASSING: + ppMsgs = (uchar***) pWrkrInfo->staticActParams; + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + if(((uchar**)ppMsgs)[j] != NULL) { + jArr = 0; + while(ppMsgs[j][jArr] != NULL) { + free(ppMsgs[j][jArr]); + ppMsgs[j][jArr] = NULL; + ++jArr; } - break; - case ACT_STRING_PASSING: - case ACT_MSG_PASSING: - /* can never happen, just to keep compiler happy! */ - break; + free(((uchar**)ppMsgs)[j]); + ((uchar**)ppMsgs)[j] = NULL; } } + break; + case ACT_JSON_PASSING: + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + json_object_put((struct json_object*) + pWrkrInfo->staticActParams[j]); + pWrkrInfo->staticActParams[j] = NULL; + } + break; + case ACT_STRING_PASSING: + case ACT_MSG_PASSING: + /* can never happen, just to keep compiler happy! */ + break; } -done: RETiRet; +done: return; } /* call the DoAction output plugin entry point * rgerhards, 2008-01-28 */ -rsRetVal -actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) +static rsRetVal +actionCallDoAction(action_t *pThis, int msgFlags, void *actParams, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); - ISOBJ_TYPE_assert(pMsg, msg); DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", - getActStateName(pThis), pThis->iActionNbr); + getActStateName(pThis, pWti), pThis->iActionNbr); CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); pThis->bHadAutoCommit = 0; - iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, + iRet = pThis->pMod->mod.om.doAction(actParams, msgFlags, pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionCommitted(pThis); - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + actionCommitted(pThis, pWti); + setActionResumeInRow(pWti, pThis, 0); break; case RS_RET_DEFER_COMMIT: - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + setActionResumeInRow(pWti, pThis, 0); /* we are done, action state remains the same */ break; case RS_RET_PREVIOUS_COMMITTED: /* action state remains the same, but we had a commit. */ pThis->bHadAutoCommit = 1; - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + setActionResumeInRow(pWti, pThis, 0); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: - actionDisable(pThis); + actionDisable(pThis, pWti); break; default:/* permanent failure of this message - no sense in retrying. This is * not yet handled (but easy TODO) */ FINALIZE; } - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; @@ -971,70 +960,70 @@ finalize_it: * this readies the action and then calls doAction() * rgerhards, 2008-01-28 */ -static rsRetVal -actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate, wti_t *pWti) +rsRetVal +actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; - ASSERT(pThis != NULL); - ISOBJ_TYPE_assert(pMsg, msg); - CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); - if(pThis->eState == ACT_STATE_ITX) - CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti)); + if(getActionState(pWti, pThis) == ACT_STATE_ITX) + CHKiRet(actionCallDoAction(pThis, msgFlags, actParams, pWti)); - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; } -/* finish processing a batch. Most importantly, that means we commit if we - * need to do so. - * rgerhards, 2008-01-28 - */ +/* Commit try committing (do not handle retry processing and such) */ static rsRetVal -finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) +actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) { - int i; + actWrkrInfo_t *wrkrInfo; + actWrkrIParams_t *iparamCurr, *iparamDel; DEFiRet; - ASSERT(pThis != NULL); - - if(pThis->eState == ACT_STATE_RDY) { - /* we just need to flag the batch as commited */ - FINALIZE; /* nothing to do */ + wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); + dbgprintf("DDDD: actionCommit: action %d, root %p\n", pThis->iActionNbr, wrkrInfo->iparamRoot); + if(wrkrInfo->iparamRoot != NULL) { + iparamCurr = wrkrInfo->iparamRoot; + while(iparamCurr != NULL) { + iRet = actionProcessMessage(pThis, iparamCurr->msgFlags, + iparamCurr->staticActParams, + pbShutdownImmediate, pWti); + releaseDoActionParams(pThis, pWti); + iparamDel = iparamCurr; + iparamCurr = iparamCurr->next; + free(iparamDel); // TODO: memleak strings! + } + wrkrInfo->iparamLast = NULL; } - CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti)); - if(pThis->eState == ACT_STATE_ITX) { + CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); + if(getActionState(pWti, pThis) == ACT_STATE_ITX) { +dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr); iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionCommitted(pThis); - /* flag messages as committed */ - for(i = 0 ; i < pBatch->nElem ; ++i) { - batchSetElemState(pBatch, i, BATCH_STATE_COMM); - pBatch->pElem[i].bPrevWasSuspended = 0; /* we had success! */ - } + actionCommitted(pThis, pWti); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: - actionDisable(pThis); + actionDisable(pThis, pWti); break; case RS_RET_DEFER_COMMIT: DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT " "- ignored\n"); - actionCommitted(pThis); + actionCommitted(pThis, pWti); break; case RS_RET_PREVIOUS_COMMITTED: DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED " "- ignored\n"); - actionCommitted(pThis); + actionCommitted(pThis, pWti); break; default:/* permanent failure of this message - no sense in retrying. This is * not yet handled (but easy TODO) @@ -1042,241 +1031,76 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) FINALIZE; } } - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; } - -/* try to submit a partial batch of elements. - * rgerhards, 2009-05-12 - */ -static inline rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, wti_t *pWti) -{ - int i; - int iElemProcessed; - int iCommittedUpTo; - msg_t *pMsg; - rsRetVal localRet; - DEFiRet; - - assert(pBatch != NULL); - assert(pnElem != NULL); - - i = pBatch->iDoneUpTo; /* all messages below that index are processed */ - iElemProcessed = 0; - iCommittedUpTo = i; - DBGPRINTF("tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem); - while(iElemProcessed <= *pnElem && i < pBatch->nElem) { - if(*(pBatch->pbShutdownImmediate)) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - /* NOTE: do NOT extend the filter below! Anything else must be done on the - * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15 - */ - if(batchIsValidElem(pBatch, i)) { - pMsg = pBatch->pElem[i].pMsg; - localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, - pBatch->pbShutdownImmediate, pWti); - DBGPRINTF("action %p call returned %d\n", pAction, localRet); - /* Note: we directly modify the batch object state, because we know that - * wo do not overwrite BATCH_STATE_DISC indicators! - */ - if(localRet == RS_RET_OK) { - /* mark messages as committed */ - while(iCommittedUpTo <= i) { - pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); - ++iCommittedUpTo; - //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; - } - } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { - /* mark messages as committed */ - while(iCommittedUpTo < i) { - pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); - ++iCommittedUpTo; - //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; - } - pBatch->eltState[i] = BATCH_STATE_SUB; - } else if(localRet == RS_RET_DEFER_COMMIT) { - pBatch->eltState[i] = BATCH_STATE_SUB; - } else if(localRet == RS_RET_DISCARDMSG) { - pBatch->eltState[i] = BATCH_STATE_DISC; - } else { - dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n", - localRet, *pnElem, iCommittedUpTo); - iRet = localRet; - FINALIZE; - } - } - ++i; - ++iElemProcessed; - } - -finalize_it: - if(pBatch->iDoneUpTo != iCommittedUpTo) { - pBatch->iDoneUpTo = iCommittedUpTo; - } - RETiRet; -} - -/* submit a batch for actual action processing. - * The first nElem elements are processed. This function calls itself - * recursively if it needs to handle errors. - * Note: we don't need the number of the first message to be processed as a parameter, - * because this is kept track of inside the batch itself (iDoneUpTo). - * rgerhards, 2009-05-12 - */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem, wti_t *pWti) +actionCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) { - int i; - int bDone; - rsRetVal localRet; - int wasDoneTo; DEFiRet; - - assert(pBatch != NULL); - - DBGPRINTF("submitBatch: enter, nElem %d\n", nElem); - wasDoneTo = pBatch->iDoneUpTo; - bDone = 0; - do { - localRet = tryDoAction(pAction, pBatch, &nElem, pWti); - if(localRet == RS_RET_FORCE_TERM) { - ABORT_FINALIZE(RS_RET_FORCE_TERM); - } - if( localRet == RS_RET_OK - || localRet == RS_RET_PREVIOUS_COMMITTED - || localRet == RS_RET_DEFER_COMMIT) { - /* try commit transaction, once done, we can simply do so as if - * that return state was returned from tryDoAction(). - */ - localRet = finishBatch(pAction, pBatch, pWti); - } - - if( localRet == RS_RET_OK - || localRet == RS_RET_PREVIOUS_COMMITTED - || localRet == RS_RET_DEFER_COMMIT) { - bDone = 1; - } else if(localRet == RS_RET_SUSPENDED) { - DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n"); - /* do nothing, this will retry the full batch */ - } else if(localRet == RS_RET_ACTION_FAILED) { - /* in this case, everything not yet committed is BAD */ - for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && pBatch->eltState[i] != BATCH_STATE_COMM ) { - pBatch->eltState[i] = BATCH_STATE_BAD; - pBatch->pElem[i].bPrevWasSuspended = 1; - STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail); - } - } - bDone = 1; - } else { - if(nElem == 1) { - batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD); - bDone = 1; - } else { - /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ - DBGPRINTF("submitBatch recursing trying to find and exclude the culprit " - "for iRet %d\n", localRet); - submitBatch(pAction, pBatch, nElem / 2, pWti); - submitBatch(pAction, pBatch, nElem - (nElem / 2), pWti); - bDone = 1; - } - } - } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */ - - if(*(pBatch->pbShutdownImmediate)) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - -finalize_it: - RETiRet; -} - - -/* copy "active" array of batch, as we need to modify it. The caller - * must make sure the new array is freed and the orginal batch - * pointer is restored (thus the caller must save it). If active - * is currently NULL, this is properly handled. - * Note: the batches active pointer is modified, so it must be - * saved BEFORE calling this function! - * rgerhards, 2012-09-12 - */ -static rsRetVal -copyActive(batch_t *pBatch) -{ - sbool *active; - DEFiRet; - - CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); - if(pBatch->active == NULL) - memset(active, 1, batchNumMsgs(pBatch)); - else - memcpy(active, pBatch->active, batchNumMsgs(pBatch)); - pBatch->active = active; -finalize_it: +// TODO: #warning do we really need to return something? + /* even more TODO: + This is the place where retry processing needs to go in. If the action + permanently fails, we should - as a new feature - add the capability to + write an error file. This is already done be omelasticsearch, and IMHO + pretty useful. + For the time being, I do NOT implement all of this (not even retry!) + as I want to get the rest of the engine to SISD (non-SIMD ;)) so that + I know any potential suprises and complications that arise out of this. + When this is done, I can come back here and complete this work. Obviously, + many features do not work in the mean time (but it is not planned to release + any of these partial implementations). + rgerhards, 2013-11-04 + */ + iRet = actionTryCommit(pThis, pWti, pbShutdownImmediate); RETiRet; } -/* The following function prepares a batch for processing, that it is - * reinitializes batch states, generates strings and does everything else - * that needs to be done in order to make the batch ready for submission to - * the actual output module. Note that we look at the precomputed - * filter OK condition and process only those messages, that actually matched - * the filter. - * rgerhards, 2010-06-14 - */ -static inline rsRetVal -prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr) +/* Commit all active transactions in *DIRECT mode* */ +void +actionCommitAllDirect(wti_t *pWti) { int i; - batch_obj_t *pElem; - struct syslogTime ttNow; - DEFiRet; - - /* indicate we have not yet read the date */ - ttNow.year = 0; + action_t *pAction; - pBatch->iDoneUpTo = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pElem = &(pBatch->pElem[i]); - if(batchIsValidElem(pBatch, i)) { - pBatch->eltState[i] = BATCH_STATE_RDY; - if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) { - /* make sure we have our copy of "active" array */ - if(!*bMustRestoreActivePtr) { - *activeSave = pBatch->active; - copyActive(pBatch); - } - pBatch->active[i] = RSFALSE; - } - } + for(i = 0 ; i < iActionNbr ; ++i) { + dbgprintf("DDDD: actionCommitAll: action %d, state %u, root %p\n", + i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo[i].iparamRoot); + pAction = pWti->actWrkrInfo[i].pAction; + if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT) + actionCommit(pWti->actWrkrInfo[i].pAction, pWti, pWti->pbShutdownImmediate); } - RETiRet; } - -/* receive a batch and process it. This includes retry handling. - * rgerhards, 2009-05-12 +/* process a single message. This is both called if we run from the + * cosumer side of an action queue as well as directly from the main + * queue thread if the action queue is set to "direct". */ -static inline rsRetVal -processAction(action_t *pAction, batch_t *pBatch, wti_t* pWti) +static rsRetVal +processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow, int *pbShutdownImmediate) { DEFiRet; - assert(pBatch != NULL); - CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pWti)); - iRet = finishBatch(pAction, pBatch, pWti); +dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRawMsg); + // TODO: check error return states! + iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow); + if(pAction->eParamPassing == ACT_STRING_PASSING) { + pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction; + dbgprintf("DDDD: action %d is string passing - executing in commit phase\n", pAction->iActionNbr); + FINALIZE; + } + iRet = actionProcessMessage(pAction, pMsg->msgFlags, + pWti->actWrkrInfo[pAction->iActionNbr].staticActParams, + pbShutdownImmediate, pWti); + releaseDoActionParams(pAction, pWti); finalize_it: RETiRet; } - /* receive an array of to-process user pointers and submit them * for processing. * rgerhards, 2009-04-22 @@ -1284,41 +1108,30 @@ finalize_it: static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate) { - int *pbShutdownImmdtSave; - sbool *activeSave; - int bMustRestoreActivePtr = 0; - rsRetVal localRet; action_t *pAction = (action_t*) pVoid; + msg_t *pMsg; + int i; + struct syslogTime ttNow; DEFiRet; - assert(pBatch != NULL); - - if(pbShutdownImmediate != NULL) { - pbShutdownImmdtSave = pBatch->pbShutdownImmediate; - pBatch->pbShutdownImmediate = pbShutdownImmediate; + if(pbShutdownImmediate == NULL) { + pbShutdownImmediate = pWti->pbShutdownImmediate; } - CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); - iRet = processAction(pAction, pBatch, pWti); - - /* even if processAction failed, we need to release the batch (else we - * have a memory leak). So we do this first, and then check if we need to - * return an error code. If so, the code from processAction has priority. - * rgerhards, 2010-12-17 - */ - localRet = releaseBatch(pAction, pBatch); + /* indicate we have not yet read the date */ + ttNow.year = 0; - if(iRet == RS_RET_OK) - iRet = localRet; - - if(bMustRestoreActivePtr) { - free(pBatch->active); - pBatch->active = activeSave; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*pbShutdownImmediate ; ++i) { + if(batchIsValidElem(pBatch, i)) { + pMsg = pBatch->pElem[i].pMsg; + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, pbShutdownImmediate); + // TODO: we must refactor this! flag messages as committed + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + } } -finalize_it: - if(pbShutdownImmediate != NULL) - pBatch->pbShutdownImmediate = pbShutdownImmdtSave; + iRet = actionCommit(pAction, pWti, pbShutdownImmediate); +dbgprintf("DDDD: processBatchMain - end\n"); RETiRet; } @@ -1381,11 +1194,12 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT * rgerhards, 2010-06-08 */ static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti) +doSubmitToActionQ(action_t *pAction, wti_t *pWti, msg_t *pMsg) { DEFiRet; - if(pAction->eState == ACT_STATE_DIED) { + // TODO: bug? Isn't that supposed to be checked in direct mode as well??? + if(getActionState(pWti, pAction) == ACT_STATE_DIED) { DBGPRINTF("action %p died, do NOT execute\n", pAction); FINALIZE; } @@ -1464,7 +1278,7 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti) /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = doSubmitToActionQ(pAction, pMsg, pWti); + iRet = doSubmitToActionQ(pAction, pWti, pMsg); finalize_it: RETiRet; @@ -1475,12 +1289,10 @@ finalize_it: * pthread_cleanup_push() POSIX macro... */ static inline rsRetVal -doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti) +doActionCallAction(action_t *pAction, wti_t *pWti, msg_t *pMsg) { - msg_t *pMsg; DEFiRet; - pMsg = pBatch->pElem[idxBtch].pMsg; pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ /* don't output marks to recently written outputs */ @@ -1493,13 +1305,6 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti) iRet = actionWriteToAction(pAction, pMsg, pWti); finalize_it: - /* we need to update the batch to handle failover processing correctly */ - if(iRet == RS_RET_OK) { - pBatch->pElem[idxBtch].bPrevWasSuspended = 0; - } else if(iRet == RS_RET_ACTION_FAILED) { - pBatch->pElem[idxBtch].bPrevWasSuspended = 1; - } - RETiRet; } @@ -1518,7 +1323,8 @@ DEFFUNC_llExecFunc(doActivateActions) errmsg.LogError(0, localRet, "file prefix (work directory?) " "is missing"); } - actionDisable(pThis); +#warning think how to handle this: + //actionDisable(pThis); } DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod), pThis, pThis->pQueue); @@ -1550,113 +1356,56 @@ activateActions(void) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { time_t now = 0; + int doProcess = 1; time_t lastAct; - int i; - sbool *activeSave; DEFiRet; - activeSave = pBatch->active; - copyActive(pBatch); - - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i]) - continue; - if(now == 0) { - now = datetime.GetTime(NULL); /* good time call - the only one done */ - } - /* CAS loop, we write back a bit early, but that's OK... */ - /* we use reception time, not dequeue time - this is considered more appropriate and - * also faster ;) -- rgerhards, 2008-09-17 */ - do { - lastAct = pAction->f_time; - if(pBatch->pElem[i].pMsg->msgFlags & MARK) { - if((now - lastAct) < MarkInterval / 2) { - pBatch->active[i] = 0; - DBGPRINTF("batch item %d: action was recently called, ignoring " - "mark message\n", i); - break; /* do not update timestamp for non-written mark messages */ - } + if(now == 0) { // TODO: do in caller! + now = datetime.GetTime(NULL); /* good time call - the only one done */ + } + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if(pMsg->msgFlags & MARK) { + if((now - lastAct) < MarkInterval / 2) { + doProcess = 0; + DBGPRINTF("action was recently called, ignoring mark message\n"); + break; /* do not update timestamp for non-written mark messages */ } - } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, - pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0); - if(pBatch->active[i]) { - DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n", - i, module.GetStateName(pAction->pMod)); } - } - - iRet = doSubmitToActionQBatch(pAction, pBatch, pWti); + } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, + pMsg->ttGenTime, &pAction->mutCAS) == 0); - free(pBatch->active); - pBatch->active = activeSave; + if(doProcess) { + DBGPRINTF("Called action(NotAllMark), processing via '%s'\n", + module.GetStateName(pAction->pMod)); + iRet = doSubmitToActionQBatch(pAction, pWti, pMsg); + } RETiRet; } -static inline void -countStatsBatchEnq(action_t *pAction, batch_t *pBatch) -{ - int i; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( batchIsValidElem(pBatch, i)) { - STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - } - } -} - /* enqueue a batch in direct mode. We have put this into its own function just to avoid * cluttering the actual submit function. * rgerhards, 2011-06-16 */ static inline rsRetVal -doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { - sbool bNeedSubmit; - sbool *activeSave; - int i; +int pbShutdownImmediate = 0; // TODO: implement +dbgprintf("DDDD: doQueueEnqObjDirectBatch: %s\n", pMsg->pszRawMsg); + struct syslogTime ttNow; DEFiRet; - - activeSave = pBatch->active; - copyActive(pBatch); - - /* note: for direct mode, we need to adjust the filter property. For non-direct - * this is not necessary, because in that case we enqueue only what actually needs - * to be processed. - */ - if(pAction->bExecWhenPrevSusp) { - bNeedSubmit = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if(!pBatch->pElem[i].bPrevWasSuspended) { - DBGPRINTF("action enq stage: change active to 0 due to " - "failover case in elem %d\n", i); - pBatch->active[i] = 0; - } - if(batchIsValidElem(pBatch, i)) { - STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - bNeedSubmit = 1; - } - DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, i, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - } - if(bNeedSubmit) { - /* note: stats were already computed above */ - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); - } else { - DBGPRINTF("no need to submit batch, all invalid\n"); - } - } else { - if(GatherStats) - countStatsBatchEnq(pAction, pBatch); - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); - } - - free(pBatch->active); - pBatch->active = activeSave; + if(GatherStats) + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); + ttNow.year = 0; + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, &pbShutdownImmediate); RETiRet; } @@ -1666,72 +1415,38 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { - int i; DEFiRet; - DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { - iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti); + iRet = doQueueEnqObjDirectBatch(pAction, pWti, pMsg); } else {/* in this case, we do single submits to the queue. * TODO: optimize this, we may do at least a multi-submit! */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( batchIsValidElem(pBatch, i) - && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { - doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg, pWti); - } - } + doSubmitToActionQ(pAction, pWti, pMsg); } RETiRet; } - -/* Helper to submit a batch of actions to the engine. Note that we have rather - * complicated processing here, so we need to do this one message after another. - * rgerhards, 2010-06-23 - */ -static inline rsRetVal -helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) -{ - int i; - DEFiRet; - - DBGPRINTF("Called action %p (complex case), logging to %s\n", - pAction, module.GetStateName(pAction->pMod)); - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( batchIsValidElem(pBatch, i) - && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { - doActionCallAction(pAction, pBatch, i, pWti); - } - } - - RETiRet; -} - /* Call configured action, most complex case with all features supported (and thus slow). * rgerhards, 2010-06-08 */ #pragma GCC diagnostic ignored "-Wempty-body" static rsRetVal -doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { DEFiRet; d_pthread_mutex_lock(&pAction->mutAction); -dbgprintf("DDDD: locked mutAction\n"); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); - iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti); + DBGPRINTF("Called action %p (complex case), logging to %s\n", + pAction, module.GetStateName(pAction->pMod)); + doActionCallAction(pAction, pWti, pMsg); d_pthread_mutex_unlock(&pAction->mutAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ @@ -1876,11 +1591,11 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->pMod = pMod; pAction->pModData = pModData; - /* check if the module is compatible with select features (currently no such features exist) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ - if(bSuspended) - actionSuspend(pAction); +// TODO #warning we need to look at the following + // Probably the core init needs to be done during createWrkrInstance() + //if(bSuspended) + // actionSuspend(pAction, pWti); CHKiRet(actionConstructFinalize(pAction, lst)); @@ -1975,7 +1690,6 @@ actionNewInst(struct nvlst *lst, action_t **ppAction) (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { /* check if the module is compatible with select features * (currently no such features exist) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ loadConf->actions.nbrActions++; /* one more active action! */ } *ppAction = pAction; @@ -35,15 +35,6 @@ extern int glbliActionResumeRetryCount; -typedef enum { - ACT_STATE_DIED = 0, /* action permanently failed and now disabled - MUST BE ZERO! */ - ACT_STATE_RDY = 1, /* action ready, waiting for new transaction */ - ACT_STATE_ITX = 2, /* transaction active, waiting for new data or commit */ - ACT_STATE_COMM = 3, /* transaction finished (a transient state) */ - ACT_STATE_RTRY = 4, /* failure occured, trying to restablish ready state */ - ACT_STATE_SUSP = 5 /* suspended due to failure (return fail until timeout expired) */ -} action_state_t; - /* the following struct defines the action object data structure */ struct action_s { @@ -55,13 +46,10 @@ struct action_s { sbool bExecWhenPrevSusp;/* execute only when previous action is suspended? */ sbool bWriteAllMarkMsgs;/* should all mark msgs be written (not matter how recent the action was executed)? */ int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */ - action_state_t eState; /* current state of action */ sbool bHadAutoCommit; /* did an auto-commit happen during doAction()? */ time_t ttResumeRtry; /* when is it time to retry the resume? */ - int iResumeOKinRow; /* number of times in a row that resume said OK with an immediate failure following */ int iResumeInterval;/* resume interval for this action */ int iResumeRetryCount;/* how often shall we retry a suspended action? (-1 --> eternal) */ - int iNbrResRtry; /* number of retries since last suspend */ int iNbrNoExec; /* number of matches that did not yet yield to an exec */ int iExecEveryNthOccur;/* execute this action only every n-th occurence (with n=0,1 -> always) */ int iExecEveryNthOccurTO;/* timeout for n-th occurence feature */ @@ -69,7 +57,7 @@ struct action_s { struct modInfo_s *pMod;/* pointer to output module handling this selector */ void *pModData; /* pointer to module data - content is module-specific */ sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */ - rsRetVal (*submitToActQ)(action_t *, batch_t *, wti_t*);/* function submit message to action queue */ + rsRetVal (*submitToActQ)(action_t *, wti_t*, msg_t*);/* function submit message to action queue */ rsRetVal (*qConstruct)(struct queue_s *pThis); enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING = 2, ACT_JSON_PASSING = 3} @@ -79,7 +67,7 @@ struct action_s { * in this order. */ qqueue_t *pQueue; /* action queue */ pthread_mutex_t mutAction; /* primary action mutex */ - uchar *pszName; /* action name (for documentation) */ + uchar *pszName; /* action name */ DEF_ATOMIC_HELPER_MUT(mutCAS); /* for statistics subsystem */ statsobj_t *statsobj; @@ -93,7 +81,7 @@ struct action_s { rsRetVal actionConstruct(action_t **ppThis); rsRetVal actionConstructFinalize(action_t *pThis, struct nvlst *lst); rsRetVal actionDestruct(action_t *pThis); -rsRetVal actionDbgPrint(action_t *pThis); +//rsRetVal actionDbgPrint(action_t *pThis); rsRetVal actionSetGlobalResumeInterval(int iNewVal); rsRetVal actionDoAction(action_t *pAction); rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t*); @@ -103,6 +91,7 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr rsRetVal activateActions(void); rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction); rsRetVal actionProcessCnf(struct cnfobj *o); +void actionCommitAllDirect(wti_t *pWti); /* external data */ extern int iActionNbr; diff --git a/runtime/batch.h b/runtime/batch.h index 2ec07670..5c855521 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -46,17 +46,6 @@ typedef unsigned char batch_state_t; */ struct batch_obj_s { msg_t *pMsg; - /* work variables for action processing; these are reused for each action (or block of - * actions) - */ - sbool bPrevWasSuspended; - /* following are caches to save allocs if not absolutely necessary */ - uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */ - /* a cache to save malloc(), if not absolutely necessary */ - void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */ - size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; - /* and the same for the message length (if used) */ - /* end action work variables */ }; /* the batch @@ -79,9 +68,6 @@ struct batch_s { int nElemDeq; /* actual number of elements dequeued (and thus to be deleted) - see comment above! */ int iDoneUpTo; /* all messages below this index have state other than RDY */ qDeqID deqID; /* ID of dequeue operation that generated this batch */ - int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */ - sbool *active; /* which messages are active for processing, NULL=all */ - sbool bSingleRuleset; /* do all msgs of this batch use a single ruleset? */ batch_obj_t *pElem; /* batch elements */ batch_state_t *eltState;/* state (array!) for individual objects. NOTE: we have moved this out of batch_obj_t because we @@ -93,24 +79,6 @@ struct batch_s { }; -/* some inline functions (we may move this off to an object .. or not) */ -static inline void -batchSetSingleRuleset(batch_t *pBatch, sbool val) { - pBatch->bSingleRuleset = val; -} - -/* get the batches ruleset (if we have a single ruleset) */ -static inline ruleset_t* -batchGetRuleset(batch_t *pBatch) { - return (pBatch->nElem > 0) ? pBatch->pElem[0].pMsg->pRuleset : NULL; -} - -/* get the ruleset of a specifc element of the batch (index not verified!) */ -static inline ruleset_t* -batchElemGetRuleset(batch_t *pBatch, int i) { - return pBatch->pElem[i].pMsg->pRuleset; -} - /* get number of msgs for this batch */ static inline int batchNumMsgs(batch_t *pBatch) { @@ -134,8 +102,7 @@ batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) { */ static inline int batchIsValidElem(batch_t *pBatch, int i) { - return( (pBatch->eltState[i] != BATCH_STATE_DISC) - && (pBatch->active == NULL || pBatch->active[i])); + return(pBatch->eltState[i] != BATCH_STATE_DISC); } @@ -152,7 +119,7 @@ batchFree(batch_t *pBatch) { /* staticActParams MUST be freed immediately (if required), * so we do not need to do that! */ - free(pBatch->pElem[i].staticActStrings[j]); + //TODO: do this in wti! free(pBatch->pElem[i].staticActStrings[j]); } } free(pBatch->pElem); @@ -171,7 +138,6 @@ batchInit(batch_t *pBatch, int maxElem) { pBatch->maxElem = maxElem; CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t))); CHKmalloc(pBatch->eltState = calloc((size_t)maxElem, sizeof(batch_state_t))); - // TODO: replace calloc by inidividual writes? finalize_it: RETiRet; } diff --git a/runtime/conf.c b/runtime/conf.c index 2b000c60..1544e364 100644 --- a/runtime/conf.c +++ b/runtime/conf.c @@ -523,7 +523,6 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction) (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { /* here check if the module is compatible with select features * (currently, we have no such features!) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ conf->actions.nbrActions++; /* one more active action! */ } break; diff --git a/runtime/queue.c b/runtime/queue.c index 968c016e..18f62ffa 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -964,7 +964,6 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) batch_t singleBatch; batch_obj_t batchObj; batch_state_t batchState = BATCH_STATE_RDY; - sbool active = 1; int i; DEFiRet; @@ -985,12 +984,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.eltState = &batchState; - singleBatch.active = &active; iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate); - /* delete the batch string params: TODO: create its own "class" for this */ - for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { - free(batchObj.staticActStrings[i]); - } msgDestruct(&pMsg); RETiRet; diff --git a/runtime/ruleset.c b/runtime/ruleset.c index e39a1889..4ac0039a 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -48,6 +48,7 @@ #include "rainerscript.h" #include "srUtils.h" #include "modules.h" +#include "wti.h" #include "dirty.h" /* for main ruleset queue creation */ /* static data */ @@ -68,7 +69,7 @@ static struct cnfparamblk rspblk = /* forward definitions */ static rsRetVal processBatch(batch_t *pBatch, wti_t *pWti); -static rsRetVal scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti); +static void scriptExec(struct cnfstmt *root, msg_t *pMsg, wti_t *pWti); /* ---------- linked-list key handling functions (ruleset) ---------- */ @@ -161,249 +162,87 @@ finalize_it: } -/* This function is similar to processBatch(), but works on a batch that - * contains rules from multiple rulesets. In this case, we can not push - * the whole batch through the ruleset. Instead, we examine it and - * partition it into sub-rulesets which we then push through the system. - * rgerhards, 2010-06-15 - */ -static inline rsRetVal -processBatchMultiRuleset(batch_t *pBatch, wti_t *pWti) -{ - ruleset_t *currRuleset; - batch_t snglRuleBatch; - int i; - int iStart; /* start index of partial batch */ - int iNew; /* index for new (temporary) batch */ - int bHaveUnprocessed; /* do we (still) have unprocessed entries? (loop term predicate) */ - DEFiRet; - - do { - bHaveUnprocessed = 0; - /* search for first unprocessed element */ - for(iStart = 0 ; iStart < pBatch->nElem && pBatch->eltState[iStart] == BATCH_STATE_DISC ; ++iStart) - /* just search, no action */; - if(iStart == pBatch->nElem) - break; /* everything processed */ - - /* prepare temporary batch */ - CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem)); - snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate; - currRuleset = batchElemGetRuleset(pBatch, iStart); - iNew = 0; - for(i = iStart ; i < pBatch->nElem ; ++i) { - if(batchElemGetRuleset(pBatch, i) == currRuleset) { - /* for performance reasons, we copy only those members that we actually need */ - snglRuleBatch.pElem[iNew].pMsg = pBatch->pElem[i].pMsg; - snglRuleBatch.eltState[iNew] = pBatch->eltState[i]; - ++iNew; - /* We indicate the element also as done, so it will not be processed again */ - pBatch->eltState[i] = BATCH_STATE_DISC; - } else { - bHaveUnprocessed = 1; - } - } - snglRuleBatch.nElem = iNew; /* was left just right by the for loop */ - batchSetSingleRuleset(&snglRuleBatch, 1); - /* process temp batch */ - processBatch(&snglRuleBatch, pWti); - batchFree(&snglRuleBatch); - } while(bHaveUnprocessed == 1); - -finalize_it: - RETiRet; -} - -/* return a new "active" structure for the batch. Free with freeActive(). */ -static inline sbool *newActive(batch_t *pBatch) +static void +execAct(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - return malloc(sizeof(sbool) * batchNumMsgs(pBatch)); - -} -static inline void freeActive(sbool *active) { free(active); } +// TODO: check here if bPrevWasSuspsended was required and, if so +// if we actually are permitted to execute this action. +// NOTE: this will primarily be handled by end-of-batch processing - -/* for details, see scriptExec() header comment! */ -/* call action for all messages with filter on */ -static rsRetVal -execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) -{ - DEFiRet; -dbgprintf("RRRR: execAct [%s]: batch of %d elements, active %p\n", modGetName(stmt->d.act->pMod), batchNumMsgs(pBatch), active); - pBatch->active = active; - stmt->d.act->submitToActQ(stmt->d.act, pBatch, pWti); - RETiRet; + DBGPRINTF("executing action %d\n", stmt->d.act->iActionNbr); + stmt->d.act->submitToActQ(stmt->d.act, pWti, pMsg); } -static rsRetVal -execSet(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +static void +execSet(struct cnfstmt *stmt, msg_t *pMsg) { - int i; struct var result; - DEFiRet; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) { - cnfexprEval(stmt->d.s_set.expr, &result, pBatch->pElem[i].pMsg); - msgSetJSONFromVar(pBatch->pElem[i].pMsg, stmt->d.s_set.varname, - &result); - varDelete(&result); - } - } - RETiRet; + cnfexprEval(stmt->d.s_set.expr, &result, pMsg); + msgSetJSONFromVar(pMsg, stmt->d.s_set.varname, &result); + varDelete(&result); } -static rsRetVal -execUnset(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +static void +execUnset(struct cnfstmt *stmt, msg_t *pMsg) { - int i; - DEFiRet; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) { - msgDelJSON(pBatch->pElem[i].pMsg, stmt->d.s_unset.varname); - } - } - RETiRet; + msgDelJSON(pMsg, stmt->d.s_unset.varname); } -/* for details, see scriptExec() header comment! */ -/* "stop" simply discards the filtered items - it's just a (hopefully more intuitive - * shortcut for users. - */ static rsRetVal -execStop(batch_t *pBatch, sbool *active) +execCall(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - int i; - DEFiRet; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) { - pBatch->eltState[i] = BATCH_STATE_DISC; - } - } - RETiRet; -} -static rsRetVal -execCall(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) -{ - msg_t *pMsg; - int i; DEFiRet; if(stmt->d.s_call.ruleset == NULL) { - scriptExec(stmt->d.s_call.stmt, pBatch, active, pWti); + scriptExec(stmt->d.s_call.stmt, pMsg, pWti); } else { - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - CHKmalloc(pMsg = MsgDup((msg_t*) pBatch->pElem[i].pMsg)); - DBGPRINTF("CALL: forwarding message %d to async ruleset %p\n", - i, stmt->d.s_call.ruleset->pQueue); - MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); - MsgSetRuleset(pMsg, stmt->d.s_call.ruleset); - /* Note: we intentionally use submitMsg2() here, as we process messages - * that were already run through the rate-limiter. - */ - submitMsg2(pMsg); - } + CHKmalloc(pMsg = MsgDup((msg_t*) pMsg)); + DBGPRINTF("CALL: forwarding message to async ruleset %p\n", + stmt->d.s_call.ruleset->pQueue); + MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); + MsgSetRuleset(pMsg, stmt->d.s_call.ruleset); + /* Note: we intentionally use submitMsg2() here, as we process messages + * that were already run through the rate-limiter. + */ + submitMsg2(pMsg); } finalize_it: RETiRet; } -/* for details, see scriptExec() header comment! */ -// save current filter, evaluate new one -// perform then (if any message) -// if ELSE given: -// set new filter, inverted -// perform else (if any messages) -static rsRetVal -execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) +static void +execIf(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - sbool *newAct; - int i; sbool bRet; - sbool allInactive = 1; - DEFiRet; - newAct = newActive(pBatch); - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - FINALIZE; - if(pBatch->eltState[i] == BATCH_STATE_DISC) - continue; /* will be ignored in any case */ - if(active == NULL || active[i]) { - bRet = cnfexprEvalBool(stmt->d.s_if.expr, pBatch->pElem[i].pMsg); - allInactive = 0; - } else - bRet = 0; - newAct[i] = bRet; - DBGPRINTF("batch: item %d: expr eval: %d\n", i, bRet); - } - - if(allInactive) { - DBGPRINTF("execIf: all batch elements are inactive, holding execution\n"); - freeActive(newAct); - FINALIZE; - } - - if(stmt->d.s_if.t_then != NULL) { - scriptExec(stmt->d.s_if.t_then, pBatch, newAct, pWti); - } - if(stmt->d.s_if.t_else != NULL) { - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - FINALIZE; - if(pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) - newAct[i] = !newAct[i]; - } - scriptExec(stmt->d.s_if.t_else, pBatch, newAct, pWti); + bRet = cnfexprEvalBool(stmt->d.s_if.expr, pMsg); + DBGPRINTF("if condition result is %d\n", bRet); + if(bRet) { + if(stmt->d.s_if.t_then != NULL) + scriptExec(stmt->d.s_if.t_then, pMsg, pWti); + } else { + if(stmt->d.s_if.t_else != NULL) + scriptExec(stmt->d.s_if.t_else, pMsg, pWti); } - freeActive(newAct); -finalize_it: - RETiRet; } -/* for details, see scriptExec() header comment! */ static void -execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) +execPRIFILT(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - sbool *newAct; - msg_t *pMsg; int bRet; - int i; - newAct = newActive(pBatch); - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - return; - if(pBatch->eltState[i] == BATCH_STATE_DISC) - continue; /* will be ignored in any case */ - pMsg = pBatch->pElem[i].pMsg; - if(active == NULL || active[i]) { - if( (stmt->d.s_prifilt.pmask[pMsg->iFacility] == TABLE_NOPRI) || - ((stmt->d.s_prifilt.pmask[pMsg->iFacility] - & (1<<pMsg->iSeverity)) == 0) ) - bRet = 0; - else - bRet = 1; - } else - bRet = 0; - newAct[i] = bRet; - DBGPRINTF("batch: item %d PRIFILT %d\n", i, newAct[i]); - } - - if(stmt->d.s_prifilt.t_then != NULL) { - scriptExec(stmt->d.s_prifilt.t_then, pBatch, newAct, pWti); - } - if(stmt->d.s_prifilt.t_else != NULL) { - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - return; - if(pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) - newAct[i] = !newAct[i]; - } - scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct, pWti); + if( (stmt->d.s_prifilt.pmask[pMsg->iFacility] == TABLE_NOPRI) || + ((stmt->d.s_prifilt.pmask[pMsg->iFacility] + & (1<<pMsg->iSeverity)) == 0) ) + bRet = 0; + else + bRet = 1; + + DBGPRINTF("PRIFILT condition result is %d\n", bRet); + if(bRet) { + if(stmt->d.s_prifilt.t_then != NULL) + scriptExec(stmt->d.s_prifilt.t_then, pMsg, pWti); + } else { + if(stmt->d.s_prifilt.t_else != NULL) + scriptExec(stmt->d.s_prifilt.t_else, pMsg, pWti); } - freeActive(newAct); } @@ -498,79 +337,63 @@ done: return bRet; } -/* for details, see scriptExec() header comment! */ static void -execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) +execPROPFILT(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - sbool *thenAct; sbool bRet; - int i; - thenAct = newActive(pBatch); - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - return; - if(pBatch->eltState[i] == BATCH_STATE_DISC) - continue; /* will be ignored in any case */ - if(active == NULL || active[i]) { - bRet = evalPROPFILT(stmt, pBatch->pElem[i].pMsg); - } else - bRet = 0; - thenAct[i] = bRet; - DBGPRINTF("batch: item %d PROPFILT %d\n", i, thenAct[i]); - } - scriptExec(stmt->d.s_propfilt.t_then, pBatch, thenAct, pWti); - freeActive(thenAct); + bRet = evalPROPFILT(stmt, pMsg); + DBGPRINTF("PROPFILT condition result is %d\n", bRet); + if(bRet) + scriptExec(stmt->d.s_propfilt.t_then, pMsg, pWti); } /* The rainerscript execution engine. It is debatable if that would be better * contained in grammer/rainerscript.c, HOWEVER, that file focusses primarily * on the parsing and object creation part. So as an actual executor, it is * better suited here. - * param active: if NULL, all messages are active (to be processed), if non-null - * this is an array of the same size as the batch. If 1, the message - * is to be processed, otherwise not. - * NOTE: this function must receive batches which contain a single ruleset ONLY! * rgerhards, 2012-09-04 */ -static rsRetVal -scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti) +static void +scriptExec(struct cnfstmt *root, msg_t *pMsg, wti_t *pWti) { - DEFiRet; struct cnfstmt *stmt; for(stmt = root ; stmt != NULL ; stmt = stmt->next) { + if(*pWti->pbShutdownImmediate) { + DBGPRINTF("scriptExec: ShutdownImmediate set, " + "force terminating\n"); + goto done; + } if(Debug) { - dbgprintf("scriptExec: batch of %d elements, active %p, active[0]:%d\n", - batchNumMsgs(pBatch), active, (active == NULL ? 1 : active[0])); cnfstmtPrintOnly(stmt, 2, 0); } switch(stmt->nodetype) { case S_NOP: break; case S_STOP: - execStop(pBatch, active); + goto done; break; case S_ACT: - execAct(stmt, pBatch, active, pWti); + execAct(stmt, pMsg, pWti); break; case S_SET: - execSet(stmt, pBatch, active); + execSet(stmt, pMsg); break; case S_UNSET: - execUnset(stmt, pBatch, active); + execUnset(stmt, pMsg); break; case S_CALL: - execCall(stmt, pBatch, active, pWti); + execCall(stmt, pMsg, pWti); break; case S_IF: - execIf(stmt, pBatch, active, pWti); + execIf(stmt, pMsg, pWti); break; case S_PRIFILT: - execPRIFILT(stmt, pBatch, active, pWti); + execPRIFILT(stmt, pMsg, pWti); break; case S_PROPFILT: - execPROPFILT(stmt, pBatch, active, pWti); + execPROPFILT(stmt, pMsg, pWti); break; default: dbgprintf("error: unknown stmt type %u during exec\n", @@ -578,36 +401,39 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti) break; } } - RETiRet; +done: return; } /* Process (consume) a batch of messages. Calls the actions configured. - * If the whole batch uses a singel ruleset, we can process the batch as - * a whole. Otherwise, we need to process it slower, on a message-by-message - * basis (what can be optimized to a per-ruleset basis) - * rgerhards, 2005-10-13 */ static rsRetVal processBatch(batch_t *pBatch, wti_t *pWti) { - ruleset_t *pThis; + int i; + msg_t *pMsg; + ruleset_t *pRuleset; DEFiRet; - assert(pBatch != NULL); - - DBGPRINTF("processBatch: batch of %d elements must be processed\n", pBatch->nElem); - if(pBatch->bSingleRuleset) { - pThis = batchGetRuleset(pBatch); - if(pThis == NULL) - pThis = ourConf->rulesets.pDflt; - ISOBJ_TYPE_assert(pThis, ruleset); - CHKiRet(scriptExec(pThis->root, pBatch, NULL, pWti)); - } else { - CHKiRet(processBatchMultiRuleset(pBatch, pWti)); + + DBGPRINTF("processBATCH: batch of %d elements must be processed\n", pBatch->nElem); + + /* execution phase */ + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pWti->pbShutdownImmediate) ; ++i) { + pMsg = pBatch->pElem[i].pMsg; + DBGPRINTF("processBATCH: next msg %d: %.128s\n", i, pMsg->pszRawMsg); + pRuleset = (pMsg->pRuleset == NULL) ? ourConf->rulesets.pDflt : pMsg->pRuleset; + scriptExec(pRuleset->root, pMsg, pWti); + // TODO: think if we need a return state of scriptExec - most probably + // the answer is "no", as we need to process the batch in any case! + // TODO: we must refactor this! flag messages as committed + batchSetElemState(pBatch, i, BATCH_STATE_COMM); } -finalize_it: - DBGPRINTF("ruleset.ProcessMsg() returns %d\n", iRet); + /* commit phase */ + dbgprintf("END batch execution phase, entering to commit phase\n"); + actionCommitAllDirect(pWti); + + DBGPRINTF("processBATCH: batch of %d elements has been processed\n", pBatch->nElem); RETiRet; } diff --git a/runtime/wti.c b/runtime/wti.c index df77bc19..ddffd81a 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -391,7 +391,6 @@ finalize_it: RETiRet; } - /* dummy */ rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; } diff --git a/runtime/wti.h b/runtime/wti.h index bb4f56bc..813237fc 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -1,6 +1,6 @@ /* Definition of the worker thread instance (wti) class. * - * Copyright 2008-2012 Adiscon GmbH. + * Copyright 2008-2013 Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -26,11 +26,46 @@ #include "wtp.h" #include "obj.h" #include "batch.h" +#include "action.h" +#define ACT_STATE_RDY 0 /* action ready, waiting for new transaction */ +#define ACT_STATE_ITX 1 /* transaction active, waiting for new data or commit */ +#define ACT_STATE_COMM 2 /* transaction finished (a transient state) */ +#define ACT_STATE_RTRY 3 /* failure occured, trying to restablish ready state */ +#define ACT_STATE_SUSP 4 /* suspended due to failure (return fail until timeout expired) */ +#define ACT_STATE_DIED 7 /* action permanently failed and now disabled */ +/* note: 3 bit bit field --> highest value is 7! */ + +/* The following structure defines immutable parameters which need to + * be passed as action parameters. Note that the current implementation + * does NOT focus on performance, but on a simple PoC in order to get + * things going. TODO: Once it works, revisit this code and think about + * an array implementation. We also need to support other passing modes + * as well. -- gerhards, 2013-11-04 + */ +typedef struct actWrkrIParams { + struct actWrkrIParams *next; + int msgFlags; + /* following are caches to save allocs if not absolutely necessary */ + uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */ + /* a cache to save malloc(), if not absolutely necessary */ + unsigned staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; + /* and the same for the message length (if used) */ + void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; +} actWrkrIParams_t; + typedef struct actWrkrInfo { action_t *pAction; void *actWrkrData; + uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an immediate failure following */ + int iNbrResRtry; /* number of retries since last suspend */ + struct { + unsigned actState : 3; + } flags; + actWrkrIParams_t *iparamRoot; + actWrkrIParams_t *iparamLast; + void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /* for non-strings */ } actWrkrInfo_t; /* the worker thread instance class */ @@ -39,6 +74,7 @@ struct wti_s { pthread_t thrdID; /* thread ID */ int bIsRunning; /* is this thread currently running? (must be int for atomic op!) */ sbool bAlwaysRunning; /* should this thread always run? */ + int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ @@ -62,4 +98,79 @@ PROTOTYPEObjClassInit(wti); PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*); PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*); +static inline uint8_t +getActionStateByNbr(wti_t *pWti, int iActNbr) +{ + return((uint8_t) pWti->actWrkrInfo[iActNbr].flags.actState); +} + +static inline uint8_t +getActionState(wti_t *pWti, action_t *pAction) +{ + return((uint8_t) pWti->actWrkrInfo[pAction->iActionNbr].flags.actState); +} + +static inline void +setActionState(wti_t *pWti, action_t *pAction, uint8_t newState) +{ + pWti->actWrkrInfo[pAction->iActionNbr].flags.actState = newState; +} + +static inline uint16_t +getActionResumeInRow(wti_t *pWti, action_t *pAction) +{ + return(pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow); +} + +static inline void +setActionResumeInRow(wti_t *pWti, action_t *pAction, uint16_t val) +{ + pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow = val; +} + +static inline void +incActionResumeInRow(wti_t *pWti, action_t *pAction) +{ + pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow++; +} + +static inline int +getActionNbrResRtry(wti_t *pWti, action_t *pAction) +{ + return(pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry); +} + +static inline void +setActionNbrResRtry(wti_t *pWti, action_t *pAction, uint16_t val) +{ + pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry = val; +} + +static inline void +incActionNbrResRtry(wti_t *pWti, action_t *pAction) +{ + pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry++; +} + +static inline rsRetVal +wtiNewIParam(wti_t *pWti, action_t *pAction, actWrkrIParams_t **piparams) +{ + actWrkrInfo_t *wrkrInfo; + actWrkrIParams_t *iparams; + DEFiRet; + + wrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); +dbgprintf("DDDD: adding param for action %d\n", pAction->iActionNbr); + CHKmalloc(iparams = calloc(1, sizeof(actWrkrIParams_t))); + if(wrkrInfo->iparamLast == NULL) { + wrkrInfo->iparamLast = wrkrInfo->iparamRoot = iparams; + } else { + wrkrInfo->iparamLast->next = iparams; + wrkrInfo->iparamLast = iparams; + } + *piparams = iparams; + +finalize_it: + RETiRet; +} #endif /* #ifndef WTI_H_INCLUDED */ diff --git a/tools/syslogd.c b/tools/syslogd.c index 7597b05d..c27d79b7 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -497,24 +497,19 @@ finalize_it: * rgerhards, 2010-06-09 */ static inline rsRetVal -preprocessBatch(batch_t *pBatch) { +preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) { prop_t *ip; prop_t *fqdn; prop_t *localName; prop_t *propFromHost = NULL; prop_t *propFromHostIP = NULL; - int bSingleRuleset; - ruleset_t *batchRuleset; /* the ruleset used for all message inside the batch, if there is a single one */ int bIsPermitted; msg_t *pMsg; int i; rsRetVal localRet; DEFiRet; - bSingleRuleset = 1; - batchRuleset = (pBatch->nElem > 0) ? pBatch->pElem[0].pMsg->pRuleset : NULL; - - for(i = 0 ; i < pBatch->nElem && !*(pBatch->pbShutdownImmediate) ; i++) { + for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { pMsg = pBatch->pElem[i].pMsg; if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) { DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n"); @@ -539,12 +534,8 @@ preprocessBatch(batch_t *pBatch) { pBatch->eltState[i] = BATCH_STATE_DISC; } } - if(pMsg->pRuleset != batchRuleset) - bSingleRuleset = 0; } - batchSetSingleRuleset(pBatch, bSingleRuleset); - finalize_it: if(propFromHost != NULL) prop.Destruct(&propFromHost); @@ -564,8 +555,8 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, wti_t *pWt { DEFiRet; assert(pBatch != NULL); - pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */ - preprocessBatch(pBatch); + pWti->pbShutdownImmediate = pbShutdownImmediate; + preprocessBatch(pBatch, pWti->pbShutdownImmediate); ruleset.ProcessBatch(pBatch, pWti); //TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we //do not have this yet and so we emulate -- 2010-06-10 |