diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 1030 |
1 files changed, 368 insertions, 662 deletions
@@ -13,18 +13,15 @@ * The different modes (and calling sequence) are: * * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval - * - doSubmitToActionQComplexBatch - * - helperSubmitToActionQComplexBatch - * - doActionCallAction + * - doSubmitToActionQComplex * handles mark message reduction, but in essence calls * - actionWriteToAction * - qqueueEnqObj * (now queue engine processing) - * if(pThis->bWriteAllMarkMsgs == RSFALSE) - this is the DEFAULT - * - doSubmitToActionQNotAllMarkBatch - * - doSubmitToActionQBatch (and from here like in the else case below!) + * if(pThis->bWriteAllMarkMsgs == RSFALSE) + * - doSubmitToActionQNotAllMark + * - doSubmitToActionQ (and from here like in the else case below!) * else - * - doSubmitToActionQBatch * - doSubmitToActionQ * - qqueueEnqObj * (now queue engine processing) @@ -36,9 +33,6 @@ * * After dequeue, processing is as follows: * - processBatchMain - * - processAction - * - submitBatch - * - tryDoAction * - ... * * MORE ON PROCESSING, QUEUES and FILTERING @@ -120,13 +114,12 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); -static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch); -static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch); -static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti); +static rsRetVal doSubmitToActionQ(action_t *pAction, wti_t *pWti, msg_t*); +static rsRetVal doSubmitToActionQComplex(action_t *pAction, wti_t *pWti, msg_t*); +static rsRetVal doSubmitToActionQNotAllMark(action_t *pAction, wti_t *pWti, msg_t*); /* object static data (once for all instances) */ -/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ DEFobjCurrIf(obj) DEFobjCurrIf(datetime) DEFobjCurrIf(module) @@ -176,10 +169,9 @@ configSettings_t cs_save; /* our saved (scope!) config settings */ /* the counter below counts actions created. It is used to obtain unique IDs for the action. They * should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice * to have during one instance of an rsyslogd run. For example, I use them to name actions when there - * is no better name available. Note that I do NOT recover previous numbers on HUP - we simply keep - * counting. -- rgerhards, 2008-01-29 + * is no better name available. */ -static int iActionNbr = 0; +int iActionNbr = 0; int bActionReportSuspension = 1; /* tables for interfacing with the v6 config system */ @@ -310,7 +302,6 @@ rsRetVal actionDestruct(action_t *pThis) pThis->pMod->freeInstance(pThis->pModData); pthread_mutex_destroy(&pThis->mutAction); - pthread_mutex_destroy(&pThis->mutActExec); d_free(pThis->pszName); d_free(pThis->ppTpl); @@ -336,14 +327,15 @@ rsRetVal actionConstruct(action_t **ppThis) pThis->iResumeInterval = 30; pThis->iResumeRetryCount = 0; pThis->pszName = NULL; - pThis->bWriteAllMarkMsgs = RSFALSE; + pThis->bWriteAllMarkMsgs = 1; pThis->iExecEveryNthOccur = 0; pThis->iExecEveryNthOccurTO = 0; pThis->iSecsExecOnceInterval = 0; pThis->bExecWhenPrevSusp = 0; pThis->bRepMsgHasMsg = 0; + pThis->bDisabled = 0; pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ - pthread_mutex_init(&pThis->mutActExec, NULL); + pThis->iActionNbr = iActionNbr; pthread_mutex_init(&pThis->mutAction, NULL); INIT_ATOMIC_HELPER_MUT(pThis->mutCAS); @@ -425,13 +417,13 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst) DBGPRINTF("info: firehose mode disabled for action because " "iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n", pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval); - pThis->submitToActQ = doSubmitToActionQComplexBatch; - } else if(pThis->bWriteAllMarkMsgs == RSFALSE) { - /* nearly full-speed submission mode, default case */ - pThis->submitToActQ = doSubmitToActionQNotAllMarkBatch; + pThis->submitToActQ = doSubmitToActionQComplex; + } else if(pThis->bWriteAllMarkMsgs) { + /* full firehose submission mode, default case*/ + pThis->submitToActQ = doSubmitToActionQ; } else { - /* full firehose submission mode */ - pThis->submitToActQ = doSubmitToActionQBatch; + /* nearly full-speed submission mode */ + pThis->submitToActQ = doSubmitToActionQNotAllMark; } /* create queue */ @@ -441,7 +433,7 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, - (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); + processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszAName); qqueueSetpAction(pThis->pQueue, pThis); @@ -518,9 +510,9 @@ rsRetVal actionSetGlobalResumeInterval(int iNewVal) * returned string must not be modified. * rgerhards, 2009-05-07 */ -static uchar *getActStateName(action_t *pThis) +static uchar *getActStateName(action_t *pThis, wti_t *pWti) { - switch(pThis->eState) { + switch(getActionState(pWti, pThis)) { case ACT_STATE_RDY: return (uchar*) "rdy"; case ACT_STATE_ITX: @@ -529,8 +521,6 @@ static uchar *getActStateName(action_t *pThis) return (uchar*) "rtry"; case ACT_STATE_SUSP: return (uchar*) "susp"; - case ACT_STATE_DIED: - return (uchar*) "died"; case ACT_STATE_COMM: return (uchar*) "comm"; default: @@ -542,12 +532,12 @@ static uchar *getActStateName(action_t *pThis) /* returns a suitable return code based on action state * rgerhards, 2009-05-07 */ -static rsRetVal getReturnCode(action_t *pThis) +static rsRetVal getReturnCode(action_t *pThis, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); - switch(pThis->eState) { + switch(getActionState(pWti, pThis)) { case ACT_STATE_RDY: iRet = RS_RET_OK; break; @@ -563,12 +553,11 @@ static rsRetVal getReturnCode(action_t *pThis) iRet = RS_RET_SUSPENDED; break; case ACT_STATE_SUSP: - case ACT_STATE_DIED: iRet = RS_RET_ACTION_FAILED; break; default: - DBGPRINTF("Invalid action engine state %d, program error\n", - (int) pThis->eState); + DBGPRINTF("Invalid action engine state %u, program error\n", + getActionState(pWti, pThis)); iRet = RS_RET_ERR; break; } @@ -580,29 +569,31 @@ static rsRetVal getReturnCode(action_t *pThis) /* set the action to a new state * rgerhards, 2007-08-02 */ -static inline void actionSetState(action_t *pThis, action_state_t newState) +static inline void +actionSetState(action_t *pThis, wti_t *pWti, uint8_t newState) { - pThis->eState = newState; - DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis)); + setActionState(pWti, pThis, newState); + DBGPRINTF("Action %d transitioned to state: %s\n", + pThis->iActionNbr, getActStateName(pThis, pWti)); } /* Handles the transient commit state. So far, this is * mostly a dummy... * rgerhards, 2007-08-02 */ -static void actionCommitted(action_t *pThis) +static void actionCommitted(action_t *pThis, wti_t *pWti) { - actionSetState(pThis, ACT_STATE_RDY); + actionSetState(pThis, pWti, ACT_STATE_RDY); } /* set action to "rtry" state. * rgerhards, 2007-08-02 */ -static void actionRetry(action_t *pThis) +static void actionRetry(action_t *pThis, wti_t *pWti) { - actionSetState(pThis, ACT_STATE_RTRY); - pThis->iResumeOKinRow++; + actionSetState(pThis, pWti, ACT_STATE_RTRY); + incActionResumeInRow(pWti, pThis); } @@ -613,7 +604,7 @@ static void actionRetry(action_t *pThis) */ static void actionDisable(action_t *pThis) { - actionSetState(pThis, ACT_STATE_DIED); + pThis->bDisabled = 1; } @@ -626,7 +617,7 @@ static void actionDisable(action_t *pThis) * rgerhards, 2007-08-02 */ static inline void -actionSuspend(action_t * const pThis) +actionSuspend(action_t * const pThis, wti_t *pWti) { time_t ttNow; int suspendDuration; @@ -636,16 +627,17 @@ actionSuspend(action_t * const pThis) * since caching, and this would break logic (and it actually did so!) */ datetime.GetTime(&ttNow); - suspendDuration = pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); + suspendDuration = pThis->iResumeInterval * (getActionNbrResRtry(pWti, pThis) / 10 + 1); pThis->ttResumeRtry = ttNow + suspendDuration; - actionSetState(pThis, ACT_STATE_SUSP); + actionSetState(pThis, pWti, ACT_STATE_SUSP); pThis->ctrSuspendDuration += suspendDuration; - if(pThis->iNbrResRtry == 0) { + if(getActionNbrResRtry(pWti, pThis) == 0) { STATSCOUNTER_INC(pThis->ctrSuspend, pThis->mutCtrSuspend); } - DBGPRINTF("action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d\n", + DBGPRINTF("action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d, " + "duration %d\n", pThis->pszName, (long long) pThis->ttResumeRtry, (long long) ttNow, - pThis->iNbrResRtry); + getActionNbrResRtry(pWti, pThis), suspendDuration); if(bActionReportSuspension) { ctime_r(&pThis->ttResumeRtry, timebuf); timebuf[strlen(timebuf)-1] = '\0'; /* strip LF */ @@ -664,15 +656,15 @@ actionSuspend(action_t * const pThis) * entry point. This is invalid, but has harsh consequences: it will cause the rsyslog * engine to go into a tight loop. That obviously is not acceptable. As such, we track the * count of iterations that a tryResume returning RS_RET_OK is immediately followed by - * an unsuccessful call to doAction(). If that happens more than 1,000 times, we assume + * an unsuccessful call to doAction(). If that happens more than 10 times, we assume * the return acutally is a RS_RET_SUSPENDED. In order to go through the various - * resumption stages, we do this for every 1000 requests. This magic number 1000 may + * resumption stages, we do this for every 10 requests. This magic number 10 may * not be the most appropriate, but it should be thought of a "if nothing else helps" * kind of facility: in the first place, the module should return a proper indication * of its inability to recover. -- rgerhards, 2010-04-26. */ -static inline rsRetVal -actionDoRetry(action_t *pThis, int *pbShutdownImmediate) +static rsRetVal +actionDoRetry(action_t *pThis, wti_t *pWti) { int iRetries; int iSleepPeriod; @@ -682,13 +674,13 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) ASSERT(pThis != NULL); iRetries = 0; - while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { + while((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { DBGPRINTF("actionDoRetry: %s enter loop, iRetries=%d\n", pThis->pszName, iRetries); - iRet = pThis->pMod->tryResume(pThis->pModData); + iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); DBGPRINTF("actionDoRetry: %s action->tryResume returned %d\n", pThis->pszName, iRet); - if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) { + if((getActionResumeInRow(pWti, pThis) > 9) && (getActionResumeInRow(pWti, pThis) % 10 == 0)) { bTreatOKasSusp = 1; - pThis->iResumeOKinRow = 0; + setActionResumeInRow(pWti, pThis, 0); } else { bTreatOKasSusp = 0; } @@ -699,21 +691,21 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) errmsg.LogMsg(0, RS_RET_OK, LOG_INFO, "action '%s' resumed", pThis->pszName); } - actionSetState(pThis, ACT_STATE_RDY); + actionSetState(pThis, pWti, ACT_STATE_RDY); } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { /* max retries reached? */ DBGPRINTF("actionDoRetry: %s check for max retries, iResumeRetryCount " "%d, iRetries %d\n", pThis->pszName, pThis->iResumeRetryCount, iRetries); if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { - actionSuspend(pThis); - if(pThis->iNbrResRtry < 20) - ++pThis->iNbrResRtry; + actionSuspend(pThis, pWti); + if(getActionNbrResRtry(pWti, pThis) < 20) + incActionNbrResRtry(pWti, pThis); } else { ++iRetries; iSleepPeriod = pThis->iResumeInterval; srSleep(iSleepPeriod, 0); - if(*pbShutdownImmediate) { + if(*pWti->pbShutdownImmediate) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } } @@ -722,8 +714,8 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) } } - if(pThis->eState == ACT_STATE_RDY) { - pThis->iNbrResRtry = 0; + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { + setActionNbrResRtry(pWti, pThis, 0); } finalize_it: @@ -731,17 +723,33 @@ finalize_it: } +static rsRetVal +actionCheckAndCreateWrkrInstance(action_t *pThis, wti_t *pWti) +{ + DEFiRet; + if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) { +dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis->iActionNbr); + DBGPRINTF("we need to create a new action worker instance for " + "action %d\n", pThis->iActionNbr); + CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), + pThis->pModData)); + pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; + setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */ + } +finalize_it: + RETiRet; +} + /* try to resume an action -- rgerhards, 2007-08-02 * changed to new action state engine -- rgerhards, 2009-05-07 */ -static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) +static rsRetVal +actionTryResume(action_t *pThis, wti_t *pWti) { DEFiRet; time_t ttNow = NO_TIME_PROVIDED; - ASSERT(pThis != NULL); - - if(pThis->eState == ACT_STATE_SUSP) { + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { /* if we are suspended, we need to check if the timeout expired. * for this handling, we must always obtain a fresh timestamp. We used * to use the action timestamp, but in this case we will never reach a @@ -751,19 +759,19 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) */ datetime.GetTime(&ttNow); /* cache "now" */ if(ttNow >= pThis->ttResumeRtry) { - actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ + actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */ } } - if(pThis->eState == ACT_STATE_RTRY) { + if(getActionState(pWti, pThis) == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ datetime.GetTime(&ttNow); - CHKiRet(actionDoRetry(pThis, pbShutdownImmediate)); + CHKiRet(actionDoRetry(pThis, pWti)); } - if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { + if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) { DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n", - pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + pThis, getActStateName(pThis, pWti), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); } finalize_it: @@ -775,24 +783,27 @@ finalize_it: * depending on its current state. * rgerhards, 2009-05-07 */ -static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate) +static inline rsRetVal +actionPrepare(action_t *pThis, wti_t *pWti) { DEFiRet; assert(pThis != NULL); - CHKiRet(actionTryResume(pThis, pbShutdownImmediate)); + CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); + CHKiRet(actionTryResume(pThis, pWti)); /* if we are now ready, we initialize the transaction and advance * action state accordingly */ - if(pThis->eState == ACT_STATE_RDY) { - iRet = pThis->pMod->mod.om.beginTransaction(pThis->pModData); + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { +dbgprintf("DDDDD: calling beginTransaction for action %d\n", pThis->iActionNbr); + iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionSetState(pThis, ACT_STATE_ITX); + actionSetState(pThis, pWti, ACT_STATE_ITX); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -806,10 +817,11 @@ finalize_it: } +#if 0 // TODO: remove? /* debug-print the contents of an action object * rgerhards, 2007-08-02 */ -rsRetVal actionDbgPrint(action_t *pThis) +static rsRetVal actionDbgPrint(action_t *pThis) { DEFiRet; char *sz; @@ -819,11 +831,12 @@ rsRetVal actionDbgPrint(action_t *pThis) dbgprintf("\n"); dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); - if(pThis->eState == ACT_STATE_SUSP) { +#if 0 // do we need this ??? + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { dbgprintf("\tresume next retry: %u, number retries: %d", (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); } - dbgprintf("\tState: %s\n", getActStateName(pThis)); +#endif dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); if(pThis->submitToActQ == doSubmitToActionQComplexBatch) { sz = "slow, but feature-rich"; @@ -839,40 +852,45 @@ rsRetVal actionDbgPrint(action_t *pThis) RETiRet; } +#endif /* prepare the calling parameters for doAction() * rgerhards, 2009-05-07 */ static rsRetVal -prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow) +prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow) { int i; - msg_t *pMsg; struct json_object *json; + actWrkrIParams_t *iparams; + actWrkrInfo_t *pWrkrInfo; DEFiRet; - ASSERT(pAction != NULL); - ASSERT(pElem != NULL); + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + if(pAction->eParamPassing == ACT_STRING_PASSING) { + CHKiRet(wtiNewIParam(pWti, pAction, &iparams)); + } - pMsg = pElem->pMsg; /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { +dbgprintf("DDDDD: generating template #%d\n", i); switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]), - &pElem->staticLenStrings[i], ttNow)); - pElem->staticActParams[i] = pElem->staticActStrings[i]; + iparams->msgFlags = pMsg->msgFlags; + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(iparams->staticActStrings[i]), + &iparams->staticLenStrings[i], ttNow)); + iparams->staticActParams[i] = iparams->staticActStrings[i]; break; case ACT_ARRAY_PASSING: - CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow)); + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pWrkrInfo->staticActParams[i]), ttNow)); break; case ACT_MSG_PASSING: - pElem->staticActParams[i] = (void*) pMsg; + pWrkrInfo->staticActParams[i] = (void*) pMsg; break; case ACT_JSON_PASSING: CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow)); - pElem->staticActParams[i] = (void*) json; + pWrkrInfo->staticActParams[i] = (void*) json; break; default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n", (int) pAction->eParamPassing); @@ -886,96 +904,85 @@ finalize_it: } -/* free a batches ressources, but not string buffers (because they will - * most probably be reused). String buffers are only deleted upon final - * destruction of the batch. - * This function here must be called only when the batch is actually no - * longer used, also not for retrying actions or such. It invalidates - * buffers. - * rgerhards, 2010-12-17 - */ -static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) +static void +releaseDoActionParams(action_t *pAction, wti_t *pWti) { int jArr; - int i, j; - batch_obj_t *pElem; + int j; + actWrkrInfo_t *pWrkrInfo; uchar ***ppMsgs; - DEFiRet; - - ASSERT(pAction != NULL); if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING) goto done; /* we need to do nothing with these types! */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pElem = &(pBatch->pElem[i]); - if(batchIsValidElem(pBatch, i)) { - switch(pAction->eParamPassing) { - case ACT_ARRAY_PASSING: - ppMsgs = (uchar***) pElem->staticActParams; - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - if(((uchar**)ppMsgs)[j] != NULL) { - jArr = 0; - while(ppMsgs[j][jArr] != NULL) { - d_free(ppMsgs[j][jArr]); - ppMsgs[j][jArr] = NULL; - ++jArr; - } - d_free(((uchar**)ppMsgs)[j]); - ((uchar**)ppMsgs)[j] = NULL; - } - } - break; - case ACT_JSON_PASSING: - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - json_object_put((struct json_object*) - pElem->staticActParams[j]); - pElem->staticActParams[j] = NULL; + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + switch(pAction->eParamPassing) { + case ACT_ARRAY_PASSING: + ppMsgs = (uchar***) pWrkrInfo->staticActParams; + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + if(((uchar**)ppMsgs)[j] != NULL) { + jArr = 0; + while(ppMsgs[j][jArr] != NULL) { + free(ppMsgs[j][jArr]); + ppMsgs[j][jArr] = NULL; + ++jArr; } - break; - case ACT_STRING_PASSING: - case ACT_MSG_PASSING: - /* can never happen, just to keep compiler happy! */ - break; + free(((uchar**)ppMsgs)[j]); + ((uchar**)ppMsgs)[j] = NULL; } } + break; + case ACT_JSON_PASSING: + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + json_object_put((struct json_object*) + pWrkrInfo->staticActParams[j]); + pWrkrInfo->staticActParams[j] = NULL; + } + break; + case ACT_STRING_PASSING: + case ACT_MSG_PASSING: + /* can never happen, just to keep compiler happy! */ + break; } -done: RETiRet; +done: return; } /* call the DoAction output plugin entry point * rgerhards, 2008-01-28 */ -rsRetVal -actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) +static rsRetVal +actionCallDoAction(action_t *pThis, int msgFlags, void *actParams, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); - ISOBJ_TYPE_assert(pMsg, msg); - DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); + DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", + getActStateName(pThis, pWti), pThis->iActionNbr); + + CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); pThis->bHadAutoCommit = 0; - iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData); + iRet = pThis->pMod->mod.om.doAction(actParams, msgFlags, + pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionCommitted(pThis); - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + actionCommitted(pThis, pWti); + setActionResumeInRow(pWti, pThis, 0); break; case RS_RET_DEFER_COMMIT: - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + setActionResumeInRow(pWti, pThis, 0); /* we are done, action state remains the same */ break; case RS_RET_PREVIOUS_COMMITTED: /* action state remains the same, but we had a commit. */ pThis->bHadAutoCommit = 1; - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + setActionResumeInRow(pWti, pThis, 0); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -985,7 +992,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) */ FINALIZE; } - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; @@ -996,57 +1003,90 @@ finalize_it: * this readies the action and then calls doAction() * rgerhards, 2008-01-28 */ -static inline rsRetVal -actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate) +rsRetVal +actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, wti_t *pWti) { DEFiRet; - ASSERT(pThis != NULL); - ISOBJ_TYPE_assert(pMsg, msg); - - CHKiRet(actionPrepare(pThis, pbShutdownImmediate)); + CHKiRet(actionPrepare(pThis, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) - pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); - if(pThis->eState == ACT_STATE_ITX) - CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); + pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pWti->pbShutdownImmediate); + if(getActionState(pWti, pThis) == ACT_STATE_ITX) + CHKiRet(actionCallDoAction(pThis, msgFlags, actParams, pWti)); - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; } -/* finish processing a batch. Most importantly, that means we commit if we - * need to do so. - * rgerhards, 2008-01-28 - */ +/* the following functions simulates a potential future new omo callback */ static rsRetVal -finishBatch(action_t *pThis, batch_t *pBatch) +doTransaction(action_t *pThis, wti_t *pWti) { + actWrkrInfo_t *wrkrInfo; + actWrkrIParams_t *iparamCurr; int i; DEFiRet; - ASSERT(pThis != NULL); + wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); + dbgprintf("DDDD: doTransaction: action %d, currIParams %d\n", pThis->iActionNbr, wrkrInfo->currIParam); + for(i = 0 ; i < wrkrInfo->currIParam ; ++i) { + iparamCurr = wrkrInfo->iparams + i; + iRet = actionProcessMessage(pThis, iparamCurr->msgFlags, + iparamCurr->staticActParams, + pWti); + dbgprintf("DDDD: doTransaction loop, iRet %d\n", iRet); + } + RETiRet; +} + - if(pThis->eState == ACT_STATE_RDY) { - /* we just need to flag the batch as commited */ - FINALIZE; /* nothing to do */ +static void +actionFreeParams(action_t *pThis, wti_t *pWti) +{ + actWrkrInfo_t *wrkrInfo; + actWrkrIParams_t *iparamCurr; + int i; + int j; + + wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); + dbgprintf("DDDD: actionFreeParams: action %d, currIParam %d\n", pThis->iActionNbr, wrkrInfo->currIParam); + for(i = 0 ; i < wrkrInfo->currIParam ; ++i) { + iparamCurr = wrkrInfo->iparams + i; + for(j = 0 ; j < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++j) { + /* TODO: we can save time by not freeing everything, + * but that's left for a later optimization. + */ + free(iparamCurr->staticActStrings[j]); + iparamCurr->staticActStrings[j] = NULL; + iparamCurr->staticLenStrings[j] = 0; + } } + wrkrInfo->currIParam = 0; /* reset to beginning */ + releaseDoActionParams(pThis, pWti); +} + - CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate)); - if(pThis->eState == ACT_STATE_ITX) { - iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData); +/* Commit try committing (do not handle retry processing and such) */ +static rsRetVal +actionTryCommit(action_t *pThis, wti_t *pWti) +{ + //actWrkrInfo_t *wrkrInfo; + DEFiRet; + + doTransaction(pThis, pWti); + + CHKiRet(actionPrepare(pThis, pWti)); + if(getActionState(pWti, pThis) == ACT_STATE_ITX) { +dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr); + iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionCommitted(pThis); - /* flag messages as committed */ - for(i = 0 ; i < pBatch->nElem ; ++i) { - batchSetElemState(pBatch, i, BATCH_STATE_COMM); - pBatch->pElem[i].bPrevWasSuspended = 0; /* we had success! */ - } + actionCommitted(pThis, pWti); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -1054,12 +1094,12 @@ finishBatch(action_t *pThis, batch_t *pBatch) case RS_RET_DEFER_COMMIT: DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT " "- ignored\n"); - actionCommitted(pThis); + actionCommitted(pThis, pWti); break; case RS_RET_PREVIOUS_COMMITTED: DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED " "- ignored\n"); - actionCommitted(pThis); + actionCommitted(pThis, pWti); break; default:/* permanent failure of this message - no sense in retrying. This is * not yet handled (but easy TODO) @@ -1067,304 +1107,139 @@ finishBatch(action_t *pThis, batch_t *pBatch) FINALIZE; } } - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: + actionFreeParams(pThis, pWti); RETiRet; } - -/* try to submit a partial batch of elements. - * rgerhards, 2009-05-12 - */ -static inline rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) -{ - int i; - int iElemProcessed; - int iCommittedUpTo; - msg_t *pMsg; - rsRetVal localRet; - DEFiRet; - - assert(pBatch != NULL); - assert(pnElem != NULL); - - i = pBatch->iDoneUpTo; /* all messages below that index are processed */ - iElemProcessed = 0; - iCommittedUpTo = i; - DBGPRINTF("tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem); - while(iElemProcessed <= *pnElem && i < pBatch->nElem) { - if(*(pBatch->pbShutdownImmediate)) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - /* NOTE: do NOT extend the filter below! Anything else must be done on the - * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15 - */ - if(batchIsValidElem(pBatch, i)) { - pMsg = pBatch->pElem[i].pMsg; - localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, - pBatch->pbShutdownImmediate); - DBGPRINTF("action %p call returned %d\n", pAction, localRet); - /* Note: we directly modify the batch object state, because we know that - * wo do not overwrite BATCH_STATE_DISC indicators! - */ - if(localRet == RS_RET_OK) { - /* mark messages as committed */ - while(iCommittedUpTo <= i) { - pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); - ++iCommittedUpTo; - //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; - } - } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { - /* mark messages as committed */ - while(iCommittedUpTo < i) { - pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); - ++iCommittedUpTo; - //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; - } - pBatch->eltState[i] = BATCH_STATE_SUB; - } else if(localRet == RS_RET_DEFER_COMMIT) { - pBatch->eltState[i] = BATCH_STATE_SUB; - } else if(localRet == RS_RET_DISCARDMSG) { - pBatch->eltState[i] = BATCH_STATE_DISC; - } else { - dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n", - localRet, *pnElem, iCommittedUpTo); - iRet = localRet; - FINALIZE; - } - } - ++i; - ++iElemProcessed; - } - -finalize_it: - if(pBatch->iDoneUpTo != iCommittedUpTo) { - pBatch->iDoneUpTo = iCommittedUpTo; - } - RETiRet; -} - -/* submit a batch for actual action processing. - * The first nElem elements are processed. This function calls itself - * recursively if it needs to handle errors. - * Note: we don't need the number of the first message to be processed as a parameter, - * because this is kept track of inside the batch itself (iDoneUpTo). - * rgerhards, 2009-05-12 +/* Note: we currently need to return an iRet, as this is used in + * direct mode. TODO: However, it may be worth further investigating this, + * as it looks like there is no ultimate consumer of this code. + * rgerhards, 2013-11-06 */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +actionCommit(action_t *pThis, wti_t *pWti) { - int i; - int bDone; - rsRetVal localRet; - int wasDoneTo; + sbool bDone; DEFiRet; - assert(pBatch != NULL); - - DBGPRINTF("submitBatch: enter, nElem %d\n", nElem); - wasDoneTo = pBatch->iDoneUpTo; + /* even more TODO: + This is the place where retry processing needs to go in. If the action + permanently fails, we should - as a new feature - add the capability to + write an error file. This is already done be omelasticsearch, and IMHO + pretty useful. + For the time being, I do NOT implement all of this (not even retry!) + as I want to get the rest of the engine to SISD (non-SIMD ;)) so that + I know any potential suprises and complications that arise out of this. + When this is done, I can come back here and complete this work. Obviously, + many features do not work in the mean time (but it is not planned to release + any of these partial implementations). + rgerhards, 2013-11-04 + */ bDone = 0; do { - localRet = tryDoAction(pAction, pBatch, &nElem); - if(localRet == RS_RET_FORCE_TERM) { + iRet = actionTryCommit(pThis, pWti); + DBGPRINTF("actionCommit, in retry loop, iRet %d\n", iRet); + if(iRet == RS_RET_FORCE_TERM) { ABORT_FINALIZE(RS_RET_FORCE_TERM); - } - if( localRet == RS_RET_OK - || localRet == RS_RET_PREVIOUS_COMMITTED - || localRet == RS_RET_DEFER_COMMIT) { - /* try commit transaction, once done, we can simply do so as if - * that return state was returned from tryDoAction(). - */ - localRet = finishBatch(pAction, pBatch); - } - - if( localRet == RS_RET_OK - || localRet == RS_RET_PREVIOUS_COMMITTED - || localRet == RS_RET_DEFER_COMMIT) { + } else if(iRet == RS_RET_OK || iRet == RS_RET_ACTION_FAILED) { bDone = 1; - } else if(localRet == RS_RET_SUSPENDED) { - DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n"); - /* do nothing, this will retry the full batch */ - } else if(localRet == RS_RET_ACTION_FAILED) { - /* in this case, everything not yet committed is BAD */ - for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && pBatch->eltState[i] != BATCH_STATE_COMM ) { - pBatch->eltState[i] = BATCH_STATE_BAD; - pBatch->pElem[i].bPrevWasSuspended = 1; - STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail); - } - } - bDone = 1; - } else { - if(nElem == 1) { - batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD); - bDone = 1; - } else { - /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ - DBGPRINTF("submitBatch recursing trying to find and exclude the culprit " - "for iRet %d\n", localRet); - submitBatch(pAction, pBatch, nElem / 2); - submitBatch(pAction, pBatch, nElem - (nElem / 2)); - bDone = 1; - } } - } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */ - - if(*(pBatch->pbShutdownImmediate)) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - -finalize_it: - RETiRet; -} - - -/* copy "active" array of batch, as we need to modify it. The caller - * must make sure the new array is freed and the orginal batch - * pointer is restored (thus the caller must save it). If active - * is currently NULL, this is properly handled. - * Note: the batches active pointer is modified, so it must be - * saved BEFORE calling this function! - * rgerhards, 2012-09-12 - */ -static rsRetVal -copyActive(batch_t *pBatch) -{ - sbool *active; - DEFiRet; - - CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); - if(pBatch->active == NULL) - memset(active, 1, batchNumMsgs(pBatch)); - else - memcpy(active, pBatch->active, batchNumMsgs(pBatch)); - pBatch->active = active; + } while(!bDone); finalize_it: RETiRet; } -/* The following function prepares a batch for processing, that it is - * reinitializes batch states, generates strings and does everything else - * that needs to be done in order to make the batch ready for submission to - * the actual output module. Note that we look at the precomputed - * filter OK condition and process only those messages, that actually matched - * the filter. - * rgerhards, 2010-06-14 - */ -static inline rsRetVal -prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr) +/* Commit all active transactions in *DIRECT mode* */ +void +actionCommitAllDirect(wti_t *pWti) { int i; - batch_obj_t *pElem; - struct syslogTime ttNow; - DEFiRet; - - /* indicate we have not yet read the date */ - ttNow.year = 0; + action_t *pAction; - pBatch->iDoneUpTo = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pElem = &(pBatch->pElem[i]); - if(batchIsValidElem(pBatch, i)) { - pBatch->eltState[i] = BATCH_STATE_RDY; - if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) { - /* make sure we have our copy of "active" array */ - if(!*bMustRestoreActivePtr) { - *activeSave = pBatch->active; - copyActive(pBatch); - } - pBatch->active[i] = RSFALSE; - } - } + for(i = 0 ; i < iActionNbr ; ++i) { + dbgprintf("DDDD: actionCommitAll: action %d, state %u, nbr to commit %d\n", + i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo->currIParam); + pAction = pWti->actWrkrInfo[i].pAction; + if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT) + actionCommit(pAction, pWti); } - RETiRet; } - -/* receive a batch and process it. This includes retry handling. - * rgerhards, 2009-05-12 +/* process a single message. This is both called if we run from the + * cosumer side of an action queue as well as directly from the main + * queue thread if the action queue is set to "direct". */ -static inline rsRetVal -processAction(action_t *pAction, batch_t *pBatch) +static rsRetVal +processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow) { DEFiRet; - assert(pBatch != NULL); - CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem)); - iRet = finishBatch(pAction, pBatch); + if(pAction->bExecWhenPrevSusp && !pWti->execState.bPrevWasSuspended) { + DBGPRINTF("action %d: NOT executing, as previous action was " + "not suspended\n", pAction->iActionNbr); + FINALIZE; + } + +dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRawMsg); + iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow); + if(pAction->eParamPassing == ACT_STRING_PASSING) { + pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction; + dbgprintf("DDDD: action %d is string passing - executing in commit phase\n", pAction->iActionNbr); + actionPrepare(pAction, pWti); + iRet = getReturnCode(pAction, pWti); + FINALIZE; + } + iRet = actionProcessMessage(pAction, pMsg->msgFlags, + pWti->actWrkrInfo[pAction->iActionNbr].staticActParams, + pWti); + releaseDoActionParams(pAction, pWti); finalize_it: + if(pWti->execState.bDoAutoCommit) + iRet = actionCommit(pAction, pWti); + pWti->execState.bPrevWasSuspended = (iRet == RS_RET_SUSPENDED || iRet == RS_RET_ACTION_FAILED); +dbgprintf("DDDD: bPrevWasSuspended now %d, action state %d\n", (int)pWti->execState.bPrevWasSuspended, getActionState(pWti, pAction)); RETiRet; } - -#pragma GCC diagnostic ignored "-Wempty-body" -/* receive an array of to-process user pointers and submit them - * for processing. - * rgerhards, 2009-04-22 +/* This entry point is called by the ACTION queue (not main queue!) */ static rsRetVal -processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) +processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti) { - int *pbShutdownImmdtSave; - sbool *activeSave; - int bMustRestoreActivePtr = 0; - rsRetVal localRet; + action_t *pAction = (action_t*) pVoid; + msg_t *pMsg; + int i; + struct syslogTime ttNow; DEFiRet; - assert(pBatch != NULL); - - if(pbShutdownImmediate != NULL) { - pbShutdownImmdtSave = pBatch->pbShutdownImmediate; - pBatch->pbShutdownImmediate = pbShutdownImmediate; - } - CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); - - /* We now must guard the output module against execution by multiple threads. The - * plugin interface specifies that output modules must not be thread-safe (except - * if they notify us they are - functionality not yet implemented...). - * rgerhards, 2008-01-30 - */ - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - - iRet = processAction(pAction, pBatch); - - pthread_cleanup_pop(1); /* unlock mutex */ - - /* even if processAction failed, we need to release the batch (else we - * have a memory leak). So we do this first, and then check if we need to - * return an error code. If so, the code from processAction has priority. - * rgerhards, 2010-12-17 - */ - localRet = releaseBatch(pAction, pBatch); + wtiResetExecState(pWti, pBatch); + /* indicate we have not yet read the date */ + ttNow.year = 0; - if(iRet == RS_RET_OK) - iRet = localRet; - - if(bMustRestoreActivePtr) { - free(pBatch->active); - pBatch->active = activeSave; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*pWti->pbShutdownImmediate ; ++i) { + if(batchIsValidElem(pBatch, i)) { + pMsg = pBatch->pElem[i].pMsg; + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow); + // TODO: we must refactor this! flag messages as committed + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + } } -finalize_it: - if(pbShutdownImmediate != NULL) - pBatch->pbShutdownImmediate = pbShutdownImmdtSave; + if(!pWti->execState.bDoAutoCommit) + iRet = actionCommit(pAction, pWti); +dbgprintf("DDDD: processBatchMain - end\n"); RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" -/* call the HUP handler for a given action, if such a handler is defined. The - * action mutex is locked, because the HUP handler most probably needs to modify - * some internal state information. - * rgerhards, 2008-10-22 +/* call the HUP handler for a given action, if such a handler is defined. + * Note that the action must be able to service HUP requests concurrently + * to any current doAction() processing. */ -#pragma GCC diagnostic ignored "-Wempty-body" rsRetVal actionCallHUPHdlr(action_t *pAction) { @@ -1373,19 +1248,13 @@ actionCallHUPHdlr(action_t *pAction) ASSERT(pAction != NULL); DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP); - if(pAction->pMod->doHUP == NULL) { - FINALIZE; /* no HUP handler, so we are done ;) */ + if(pAction->pMod->doHUP != NULL) { + CHKiRet(pAction->pMod->doHUP(pAction->pModData)); } - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - CHKiRet(pAction->pMod->doHUP(pAction->pModData)); - pthread_cleanup_pop(1); /* unlock mutex */ - finalize_it: RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" /* set the action message queue mode @@ -1420,27 +1289,28 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT /* This submits the message to the action queue in case we do NOT need to handle repeat * message processing. That case permits us to gain lots of freedom during processing - * and thus speed. This is also utilized to submit messages in complex case once + * and thus speed. This is also utilized to submit messages in more complex cases once * the complex logic has been applied ;) * rgerhards, 2010-06-08 */ -static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +static rsRetVal +doSubmitToActionQ(action_t *pAction, wti_t *pWti, msg_t *pMsg) { + struct syslogTime ttNow; // TODO: think if we can buffer this in pWti DEFiRet; - if(pAction->eState == ACT_STATE_DIED) { - DBGPRINTF("action %p died, do NOT execute\n", pAction); - FINALIZE; - } + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - if(pAction->pQueue->qType == QUEUETYPE_DIRECT) - iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg)); - else + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { + ttNow.year = 0; + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow); + } else {/* in this case, we do single submits to the queue. + * TODO: optimize this, we may do at least a multi-submit! + */ iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); + } -finalize_it: RETiRet; } @@ -1453,7 +1323,7 @@ finalize_it: * be filtered out before calling us (what is done currently!). */ rsRetVal -actionWriteToAction(action_t *pAction, msg_t *pMsg) +actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti) { DEFiRet; @@ -1508,44 +1378,46 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg) /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = doSubmitToActionQ(pAction, pMsg); + iRet = doSubmitToActionQ(pAction, pWti, pMsg); finalize_it: RETiRet; } -/* helper to actonCallAction, mostly needed because of this damn - * pthread_cleanup_push() POSIX macro... +/* Call configured action, most complex case with all features supported (and thus slow). + * rgerhards, 2010-06-08 */ -static inline rsRetVal -doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) +#pragma GCC diagnostic ignored "-Wempty-body" +static rsRetVal +doSubmitToActionQComplex(action_t *pAction, wti_t *pWti, msg_t *pMsg) { - msg_t *pMsg; DEFiRet; - pMsg = pBatch->pElem[idxBtch].pMsg; + d_pthread_mutex_lock(&pAction->mutAction); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); + DBGPRINTF("Called action %p (complex case), logging to %s\n", + pAction, module.GetStateName(pAction->pMod)); + pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ + // TODO: can we optimize the "now" handling again (was batch, I guess...)? /* don't output marks to recently written outputs */ - if(pAction->bWriteAllMarkMsgs == RSFALSE + if(pAction->bWriteAllMarkMsgs == 0 && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { ABORT_FINALIZE(RS_RET_OK); } /* call the output driver */ - iRet = actionWriteToAction(pAction, pMsg); + iRet = actionWriteToAction(pAction, pMsg, pWti); finalize_it: - /* we need to update the batch to handle failover processing correctly */ - if(iRet == RS_RET_OK) { - pBatch->pElem[idxBtch].bPrevWasSuspended = 0; - } else if(iRet == RS_RET_ACTION_FAILED) { - pBatch->pElem[idxBtch].bPrevWasSuspended = 1; - } + d_pthread_mutex_unlock(&pAction->mutAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ RETiRet; } +#pragma GCC diagnostic warning "-Wempty-body" /* helper to activateActions, it activates a specific action. @@ -1594,194 +1466,41 @@ activateActions(void) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQNotAllMark(action_t *pAction, wti_t *pWti, msg_t *pMsg) { - time_t now = 0; + int doProcess = 1; time_t lastAct; - int i; - sbool *activeSave; DEFiRet; - activeSave = pBatch->active; - copyActive(pBatch); - - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i]) - continue; - if(now == 0) { - now = datetime.GetTime(NULL); /* good time call - the only one done */ - } - /* CAS loop, we write back a bit early, but that's OK... */ - /* we use reception time, not dequeue time - this is considered more appropriate and - * also faster ;) -- rgerhards, 2008-09-17 */ - do { - lastAct = pAction->f_time; - if(pBatch->pElem[i].pMsg->msgFlags & MARK) { - if((now - lastAct) < MarkInterval / 2) { - pBatch->active[i] = 0; - DBGPRINTF("batch item %d: action was recently called, ignoring " - "mark message\n", i); - break; /* do not update timestamp for non-written mark messages */ - } - } - } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, - pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0); - if(pBatch->active[i]) { - DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n", - i, module.GetStateName(pAction->pMod)); - } - } - - iRet = doSubmitToActionQBatch(pAction, pBatch); - - free(pBatch->active); - pBatch->active = activeSave; - - RETiRet; -} - -static inline void -countStatsBatchEnq(action_t *pAction, batch_t *pBatch) -{ - int i; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( batchIsValidElem(pBatch, i)) { - STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - } - } -} - - -/* enqueue a batch in direct mode. We have put this into its own function just to avoid - * cluttering the actual submit function. - * rgerhards, 2011-06-16 - */ -static inline rsRetVal -doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) -{ - sbool bNeedSubmit; - sbool *activeSave; - int i; - DEFiRet; - - activeSave = pBatch->active; - copyActive(pBatch); - - /* note: for direct mode, we need to adjust the filter property. For non-direct - * this is not necessary, because in that case we enqueue only what actually needs - * to be processed. + /* TODO: think about the whole logic. If messages come in out of order, things + * tend to become a bit unreliable. On the other hand, this only happens if we have + * very high traffic, in which this use case here is not really affected (as the + * MarkInterval is pretty corase). */ - if(pAction->bExecWhenPrevSusp) { - bNeedSubmit = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if(!pBatch->pElem[i].bPrevWasSuspended) { - DBGPRINTF("action enq stage: change active to 0 due to " - "failover case in elem %d\n", i); - pBatch->active[i] = 0; - } - if(batchIsValidElem(pBatch, i)) { - STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - bNeedSubmit = 1; - } - DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, i, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - } - if(bNeedSubmit) { - /* note: stats were already computed above */ - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); - } else { - DBGPRINTF("no need to submit batch, all invalid\n"); - } - } else { - if(GatherStats) - countStatsBatchEnq(pAction, pBatch); - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); - } - - free(pBatch->active); - pBatch->active = activeSave; - RETiRet; -} - -/* This submits the message to the action queue in case we do NOT need to handle repeat - * message processing. That case permits us to gain lots of freedom during processing - * and thus speed. - * rgerhards, 2010-06-08 - */ -static rsRetVal -doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) -{ - int i; - DEFiRet; - - DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); - - if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { - iRet = doQueueEnqObjDirectBatch(pAction, pBatch); - } else {/* in this case, we do single submits to the queue. - * TODO: optimize this, we may do at least a multi-submit! - */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( batchIsValidElem(pBatch, i) - && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { - doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg); + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if(pMsg->msgFlags & MARK) { + if((pMsg->ttGenTime - lastAct) < MarkInterval / 2) { + doProcess = 0; + DBGPRINTF("action was recently called, ignoring mark message\n"); + break; /* do not update timestamp for non-written mark messages */ } } - } + } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, + pMsg->ttGenTime, &pAction->mutCAS) == 0); - RETiRet; -} - - - -/* Helper to submit a batch of actions to the engine. Note that we have rather - * complicated processing here, so we need to do this one message after another. - * rgerhards, 2010-06-23 - */ -static inline rsRetVal -helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) -{ - int i; - DEFiRet; - - DBGPRINTF("Called action %p (complex case), logging to %s\n", - pAction, module.GetStateName(pAction->pMod)); - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( batchIsValidElem(pBatch, i) - && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { - doActionCallAction(pAction, pBatch, i); - } + if(doProcess) { + DBGPRINTF("Called action(NotAllMark), processing via '%s'\n", + module.GetStateName(pAction->pMod)); + iRet = doSubmitToActionQ(pAction, pWti, pMsg); } RETiRet; } -/* Call configured action, most complex case with all features supported (and thus slow). - * rgerhards, 2010-06-08 - */ -#pragma GCC diagnostic ignored "-Wempty-body" -static rsRetVal -doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) -{ - DEFiRet; - - d_pthread_mutex_lock(&pAction->mutAction); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); - iRet = helperSubmitToActionQComplexBatch(pAction, pBatch); - d_pthread_mutex_unlock(&pAction->mutAction); - pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - - RETiRet; -} -#pragma GCC diagnostic warning "-Wempty-body" - /* apply all params from param block to action. This supports the v6 config system. * Defaults must have been set appropriately during action construct! @@ -1832,7 +1551,7 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals) rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, - struct nvlst *lst, int bSuspended) + struct nvlst *lst) { DEFiRet; int i; @@ -1861,7 +1580,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->bRepMsgHasMsg = cs.bActionRepMsgHasMsg; cs.iActExecEveryNthOccur = 0; /* auto-reset */ cs.iActExecEveryNthOccurTO = 0; /* auto-reset */ - cs.bActionWriteAllMarkMsgs = RSFALSE; /* auto-reset */ + cs.bActionWriteAllMarkMsgs = 1; /* auto-reset */ cs.pszActionName = NULL; /* free again! */ } else { actionApplyCnfParam(pAction, actParams); @@ -1919,15 +1638,10 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->pMod = pMod; pAction->pModData = pModData; - /* check if the module is compatible with select features (currently no such features exist) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ - - if(bSuspended) - actionSuspend(pAction); CHKiRet(actionConstructFinalize(pAction, lst)); - /* TODO: if we exit here, we have a memory leak... */ + /* TODO: if we exit here, we have a (quite acceptable...) memory leak */ *ppAction = pAction; /* finally store the action pointer */ @@ -1963,7 +1677,7 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus static inline void initConfigVariables(void) { - cs.bActionWriteAllMarkMsgs = RSFALSE; + cs.bActionWriteAllMarkMsgs = 1; cs.glbliActionResumeRetryCount = 0; cs.bActExecWhenPrevSusp = 0; cs.iActExecOnceInterval = 0; @@ -2002,17 +1716,11 @@ actionNewInst(struct nvlst *lst, action_t **ppAction) errmsg.LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName); ABORT_FINALIZE(RS_RET_MOD_UNKNOWN); } - iRet = pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR); - // TODO: check if RS_RET_SUSPENDED is still valid in v6! - if(iRet != RS_RET_OK && iRet != RS_RET_SUSPENDED) { - FINALIZE; /* iRet is already set to error state */ - } + CHKiRet(pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR)); - if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst, - (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { + if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst)) == RS_RET_OK) { /* check if the module is compatible with select features * (currently no such features exist) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ loadConf->actions.nbrActions++; /* one more active action! */ } *ppAction = pAction; @@ -2023,8 +1731,6 @@ finalize_it: RETiRet; } -/* TODO: we are not yet a real object, the ClassInit here just looks like it is.. - */ rsRetVal actionClassInit(void) { DEFiRet; |