diff options
-rw-r--r-- | action.c | 950 | ||||
-rw-r--r-- | action.h | 24 | ||||
-rw-r--r-- | doc/Makefile.am | 5 | ||||
-rw-r--r-- | doc/manual.html | 20 | ||||
-rw-r--r-- | doc/rsyslog_conf_actions.html | 10 | ||||
-rw-r--r-- | doc/v8compatibility.html | 82 | ||||
-rw-r--r-- | runtime/batch.h | 40 | ||||
-rw-r--r-- | runtime/conf.c | 6 | ||||
-rw-r--r-- | runtime/queue.c | 87 | ||||
-rw-r--r-- | runtime/queue.h | 9 | ||||
-rw-r--r-- | runtime/rsyslog.h | 1 | ||||
-rw-r--r-- | runtime/ruleset.c | 369 | ||||
-rw-r--r-- | runtime/wti.c | 41 | ||||
-rw-r--r-- | runtime/wti.h | 132 | ||||
-rw-r--r-- | tools/syslogd.c | 20 |
15 files changed, 752 insertions, 1044 deletions
@@ -13,18 +13,15 @@ * The different modes (and calling sequence) are: * * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval - * - doSubmitToActionQComplexBatch - * - helperSubmitToActionQComplexBatch - * - doActionCallAction + * - doSubmitToActionQComplex * handles mark message reduction, but in essence calls * - actionWriteToAction * - qqueueEnqObj * (now queue engine processing) - * if(pThis->bWriteAllMarkMsgs == RSFALSE) - this is the DEFAULT - * - doSubmitToActionQNotAllMarkBatch - * - doSubmitToActionQBatch (and from here like in the else case below!) + * if(pThis->bWriteAllMarkMsgs == RSFALSE) + * - doSubmitToActionQNotAllMark + * - doSubmitToActionQ (and from here like in the else case below!) * else - * - doSubmitToActionQBatch * - doSubmitToActionQ * - qqueueEnqObj * (now queue engine processing) @@ -36,9 +33,6 @@ * * After dequeue, processing is as follows: * - processBatchMain - * - processAction - * - submitBatch - * - tryDoAction * - ... * * MORE ON PROCESSING, QUEUES and FILTERING @@ -69,7 +63,7 @@ * beast. * rgerhards, 2011-06-15 * - * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -119,13 +113,12 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*); -static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); -static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); -static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); +static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti); +static rsRetVal doSubmitToActionQ(action_t *pAction, wti_t *pWti, msg_t*); +static rsRetVal doSubmitToActionQComplex(action_t *pAction, wti_t *pWti, msg_t*); +static rsRetVal doSubmitToActionQNotAllMark(action_t *pAction, wti_t *pWti, msg_t*); /* object static data (once for all instances) */ -/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ DEFobjCurrIf(obj) DEFobjCurrIf(datetime) DEFobjCurrIf(module) @@ -332,12 +325,13 @@ rsRetVal actionConstruct(action_t **ppThis) pThis->iResumeInterval = 30; pThis->iResumeRetryCount = 0; pThis->pszName = NULL; - pThis->bWriteAllMarkMsgs = RSFALSE; + pThis->bWriteAllMarkMsgs = 1; pThis->iExecEveryNthOccur = 0; pThis->iExecEveryNthOccurTO = 0; pThis->iSecsExecOnceInterval = 0; pThis->bExecWhenPrevSusp = 0; pThis->bRepMsgHasMsg = 0; + pThis->bDisabled = 0; pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ pThis->iActionNbr = iActionNbr; pthread_mutex_init(&pThis->mutAction, NULL); @@ -410,13 +404,13 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst) DBGPRINTF("info: firehose mode disabled for action because " "iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n", pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval); - pThis->submitToActQ = doSubmitToActionQComplexBatch; - } else if(pThis->bWriteAllMarkMsgs == RSFALSE) { - /* nearly full-speed submission mode, default case */ - pThis->submitToActQ = doSubmitToActionQNotAllMarkBatch; + pThis->submitToActQ = doSubmitToActionQComplex; + } else if(pThis->bWriteAllMarkMsgs) { + /* full firehose submission mode, default case*/ + pThis->submitToActQ = doSubmitToActionQ; } else { - /* full firehose submission mode */ - pThis->submitToActQ = doSubmitToActionQBatch; + /* nearly full-speed submission mode */ + pThis->submitToActQ = doSubmitToActionQNotAllMark; } /* create queue */ @@ -496,9 +490,9 @@ rsRetVal actionSetGlobalResumeInterval(int iNewVal) * returned string must not be modified. * rgerhards, 2009-05-07 */ -static uchar *getActStateName(action_t *pThis) +static uchar *getActStateName(action_t *pThis, wti_t *pWti) { - switch(pThis->eState) { + switch(getActionState(pWti, pThis)) { case ACT_STATE_RDY: return (uchar*) "rdy"; case ACT_STATE_ITX: @@ -507,8 +501,6 @@ static uchar *getActStateName(action_t *pThis) return (uchar*) "rtry"; case ACT_STATE_SUSP: return (uchar*) "susp"; - case ACT_STATE_DIED: - return (uchar*) "died"; case ACT_STATE_COMM: return (uchar*) "comm"; default: @@ -520,12 +512,12 @@ static uchar *getActStateName(action_t *pThis) /* returns a suitable return code based on action state * rgerhards, 2009-05-07 */ -static rsRetVal getReturnCode(action_t *pThis) +static rsRetVal getReturnCode(action_t *pThis, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); - switch(pThis->eState) { + switch(getActionState(pWti, pThis)) { case ACT_STATE_RDY: iRet = RS_RET_OK; break; @@ -541,12 +533,11 @@ static rsRetVal getReturnCode(action_t *pThis) iRet = RS_RET_SUSPENDED; break; case ACT_STATE_SUSP: - case ACT_STATE_DIED: iRet = RS_RET_ACTION_FAILED; break; default: - DBGPRINTF("Invalid action engine state %d, program error\n", - (int) pThis->eState); + DBGPRINTF("Invalid action engine state %u, program error\n", + getActionState(pWti, pThis)); iRet = RS_RET_ERR; break; } @@ -558,29 +549,31 @@ static rsRetVal getReturnCode(action_t *pThis) /* set the action to a new state * rgerhards, 2007-08-02 */ -static inline void actionSetState(action_t *pThis, action_state_t newState) +static inline void +actionSetState(action_t *pThis, wti_t *pWti, uint8_t newState) { - pThis->eState = newState; - DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis)); + setActionState(pWti, pThis, newState); + DBGPRINTF("Action %d transitioned to state: %s\n", + pThis->iActionNbr, getActStateName(pThis, pWti)); } /* Handles the transient commit state. So far, this is * mostly a dummy... * rgerhards, 2007-08-02 */ -static void actionCommitted(action_t *pThis) +static void actionCommitted(action_t *pThis, wti_t *pWti) { - actionSetState(pThis, ACT_STATE_RDY); + actionSetState(pThis, pWti, ACT_STATE_RDY); } /* set action to "rtry" state. * rgerhards, 2007-08-02 */ -static void actionRetry(action_t *pThis) +static void actionRetry(action_t *pThis, wti_t *pWti) { - actionSetState(pThis, ACT_STATE_RTRY); - pThis->iResumeOKinRow++; + actionSetState(pThis, pWti, ACT_STATE_RTRY); + incActionResumeInRow(pWti, pThis); } @@ -591,7 +584,7 @@ static void actionRetry(action_t *pThis) */ static void actionDisable(action_t *pThis) { - actionSetState(pThis, ACT_STATE_DIED); + pThis->bDisabled = 1; } @@ -603,7 +596,7 @@ static void actionDisable(action_t *pThis) * CPU time. TODO: maybe a config option for that? * rgerhards, 2007-08-02 */ -static inline void actionSuspend(action_t *pThis) +static inline void actionSuspend(action_t *pThis, wti_t *pWti) { time_t ttNow; @@ -611,8 +604,9 @@ static inline void actionSuspend(action_t *pThis) * since caching, and this would break logic (and it actually did so!) */ datetime.GetTime(&ttNow); - pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); - actionSetState(pThis, ACT_STATE_SUSP); + pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * + (getActionNbrResRtry(pWti, pThis) / 10 + 1); + actionSetState(pThis, pWti, ACT_STATE_SUSP); DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry); } @@ -625,15 +619,15 @@ static inline void actionSuspend(action_t *pThis) * entry point. This is invalid, but has harsh consequences: it will cause the rsyslog * engine to go into a tight loop. That obviously is not acceptable. As such, we track the * count of iterations that a tryResume returning RS_RET_OK is immediately followed by - * an unsuccessful call to doAction(). If that happens more than 1,000 times, we assume + * an unsuccessful call to doAction(). If that happens more than 10 times, we assume * the return acutally is a RS_RET_SUSPENDED. In order to go through the various - * resumption stages, we do this for every 1000 requests. This magic number 1000 may + * resumption stages, we do this for every 10 requests. This magic number 10 may * not be the most appropriate, but it should be thought of a "if nothing else helps" * kind of facility: in the first place, the module should return a proper indication * of its inability to recover. -- rgerhards, 2010-04-26. */ -static inline rsRetVal -actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) +static rsRetVal +actionDoRetry(action_t *pThis, wti_t *pWti) { int iRetries; int iSleepPeriod; @@ -643,31 +637,31 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) ASSERT(pThis != NULL); iRetries = 0; - while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { + while((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries); iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet); - if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) { + if((getActionResumeInRow(pWti, pThis) > 9) && (getActionResumeInRow(pWti, pThis) % 10 == 0)) { bTreatOKasSusp = 1; - pThis->iResumeOKinRow = 0; + setActionResumeInRow(pWti, pThis, 0); } else { bTreatOKasSusp = 0; } if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet); - actionSetState(pThis, ACT_STATE_RDY); + actionSetState(pThis, pWti, ACT_STATE_RDY); } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { /* max retries reached? */ DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n", pThis->iResumeRetryCount, iRetries); if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { - actionSuspend(pThis); + actionSuspend(pThis, pWti); } else { - ++pThis->iNbrResRtry; + incActionNbrResRtry(pWti, pThis); ++iRetries; iSleepPeriod = pThis->iResumeInterval; srSleep(iSleepPeriod, 0); - if(*pbShutdownImmediate) { + if(*pWti->pbShutdownImmediate) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } } @@ -676,8 +670,8 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) } } - if(pThis->eState == ACT_STATE_RDY) { - pThis->iNbrResRtry = 0; + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { + setActionNbrResRtry(pWti, pThis, 0); } finalize_it: @@ -696,6 +690,7 @@ dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), pThis->pModData)); pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; + setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */ } finalize_it: RETiRet; @@ -704,14 +699,13 @@ finalize_it: /* try to resume an action -- rgerhards, 2007-08-02 * changed to new action state engine -- rgerhards, 2009-05-07 */ -static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) +static rsRetVal +actionTryResume(action_t *pThis, wti_t *pWti) { DEFiRet; time_t ttNow = NO_TIME_PROVIDED; - ASSERT(pThis != NULL); - - if(pThis->eState == ACT_STATE_SUSP) { + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { /* if we are suspended, we need to check if the timeout expired. * for this handling, we must always obtain a fresh timestamp. We used * to use the action timestamp, but in this case we will never reach a @@ -721,19 +715,19 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate, wti_t */ datetime.GetTime(&ttNow); /* cache "now" */ if(ttNow >= pThis->ttResumeRtry) { - actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ + actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */ } } - if(pThis->eState == ACT_STATE_RTRY) { + if(getActionState(pWti, pThis) == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ datetime.GetTime(&ttNow); - CHKiRet(actionDoRetry(pThis, pbShutdownImmediate, pWti)); + CHKiRet(actionDoRetry(pThis, pWti)); } - if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { + if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) { DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n", - pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + pThis, getActStateName(pThis, pWti), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); } finalize_it: @@ -745,25 +739,27 @@ finalize_it: * depending on its current state. * rgerhards, 2009-05-07 */ -static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) +static inline rsRetVal +actionPrepare(action_t *pThis, wti_t *pWti) { DEFiRet; assert(pThis != NULL); CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); - CHKiRet(actionTryResume(pThis, pbShutdownImmediate, pWti)); + CHKiRet(actionTryResume(pThis, pWti)); /* if we are now ready, we initialize the transaction and advance * action state accordingly */ - if(pThis->eState == ACT_STATE_RDY) { + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { +dbgprintf("DDDDD: calling beginTransaction for action %d\n", pThis->iActionNbr); iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionSetState(pThis, ACT_STATE_ITX); + actionSetState(pThis, pWti, ACT_STATE_ITX); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -777,10 +773,11 @@ finalize_it: } +#if 0 // TODO: remove? /* debug-print the contents of an action object * rgerhards, 2007-08-02 */ -rsRetVal actionDbgPrint(action_t *pThis) +static rsRetVal actionDbgPrint(action_t *pThis) { DEFiRet; char *sz; @@ -790,11 +787,12 @@ rsRetVal actionDbgPrint(action_t *pThis) dbgprintf("\n"); dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); - if(pThis->eState == ACT_STATE_SUSP) { +#if 0 // do we need this ??? + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { dbgprintf("\tresume next retry: %u, number retries: %d", (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); } - dbgprintf("\tState: %s\n", getActStateName(pThis)); +#endif dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); if(pThis->submitToActQ == doSubmitToActionQComplexBatch) { sz = "slow, but feature-rich"; @@ -810,40 +808,41 @@ rsRetVal actionDbgPrint(action_t *pThis) RETiRet; } +#endif /* prepare the calling parameters for doAction() * rgerhards, 2009-05-07 */ static rsRetVal -prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow) +prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow) { int i; - msg_t *pMsg; struct json_object *json; + actWrkrIParams_t *iparams; + actWrkrInfo_t *pWrkrInfo; DEFiRet; - ASSERT(pAction != NULL); - ASSERT(pElem != NULL); - - pMsg = pElem->pMsg; + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]), - &pElem->staticLenStrings[i], ttNow)); - pElem->staticActParams[i] = pElem->staticActStrings[i]; + CHKiRet(wtiNewIParam(pWti, pAction, &iparams)); + iparams->msgFlags = pMsg->msgFlags; + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(iparams->staticActStrings[i]), + &iparams->staticLenStrings[i], ttNow)); + iparams->staticActParams[i] = iparams->staticActStrings[i]; break; case ACT_ARRAY_PASSING: - CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow)); + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pWrkrInfo->staticActParams[i]), ttNow)); break; case ACT_MSG_PASSING: - pElem->staticActParams[i] = (void*) pMsg; + pWrkrInfo->staticActParams[i] = (void*) pMsg; break; case ACT_JSON_PASSING: CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow)); - pElem->staticActParams[i] = (void*) json; + pWrkrInfo->staticActParams[i] = (void*) json; break; default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n", (int) pAction->eParamPassing); @@ -857,100 +856,85 @@ finalize_it: } -/* free a batches ressources, but not string buffers (because they will - * most probably be reused). String buffers are only deleted upon final - * destruction of the batch. - * This function here must be called only when the batch is actually no - * longer used, also not for retrying actions or such. It invalidates - * buffers. - * rgerhards, 2010-12-17 - */ -static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) +static void +releaseDoActionParams(action_t *pAction, wti_t *pWti) { int jArr; - int i, j; - batch_obj_t *pElem; + int j; + actWrkrInfo_t *pWrkrInfo; uchar ***ppMsgs; - DEFiRet; - - ASSERT(pAction != NULL); if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING) goto done; /* we need to do nothing with these types! */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pElem = &(pBatch->pElem[i]); - if(batchIsValidElem(pBatch, i)) { - switch(pAction->eParamPassing) { - case ACT_ARRAY_PASSING: - ppMsgs = (uchar***) pElem->staticActParams; - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - if(((uchar**)ppMsgs)[j] != NULL) { - jArr = 0; - while(ppMsgs[j][jArr] != NULL) { - d_free(ppMsgs[j][jArr]); - ppMsgs[j][jArr] = NULL; - ++jArr; - } - d_free(((uchar**)ppMsgs)[j]); - ((uchar**)ppMsgs)[j] = NULL; - } + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + switch(pAction->eParamPassing) { + case ACT_ARRAY_PASSING: + ppMsgs = (uchar***) pWrkrInfo->staticActParams; + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + if(((uchar**)ppMsgs)[j] != NULL) { + jArr = 0; + while(ppMsgs[j][jArr] != NULL) { + free(ppMsgs[j][jArr]); + ppMsgs[j][jArr] = NULL; + ++jArr; } - break; - case ACT_JSON_PASSING: - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - json_object_put((struct json_object*) - pElem->staticActParams[j]); - pElem->staticActParams[j] = NULL; - } - break; - case ACT_STRING_PASSING: - case ACT_MSG_PASSING: - /* can never happen, just to keep compiler happy! */ - break; + free(((uchar**)ppMsgs)[j]); + ((uchar**)ppMsgs)[j] = NULL; } } + break; + case ACT_JSON_PASSING: + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + json_object_put((struct json_object*) + pWrkrInfo->staticActParams[j]); + pWrkrInfo->staticActParams[j] = NULL; + } + break; + case ACT_STRING_PASSING: + case ACT_MSG_PASSING: + /* can never happen, just to keep compiler happy! */ + break; } -done: RETiRet; +done: return; } /* call the DoAction output plugin entry point * rgerhards, 2008-01-28 */ -rsRetVal -actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) +static rsRetVal +actionCallDoAction(action_t *pThis, int msgFlags, void *actParams, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); - ISOBJ_TYPE_assert(pMsg, msg); DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", - getActStateName(pThis), pThis->iActionNbr); + getActStateName(pThis, pWti), pThis->iActionNbr); CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); pThis->bHadAutoCommit = 0; - iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, + iRet = pThis->pMod->mod.om.doAction(actParams, msgFlags, pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionCommitted(pThis); - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + actionCommitted(pThis, pWti); + setActionResumeInRow(pWti, pThis, 0); break; case RS_RET_DEFER_COMMIT: - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + setActionResumeInRow(pWti, pThis, 0); /* we are done, action state remains the same */ break; case RS_RET_PREVIOUS_COMMITTED: /* action state remains the same, but we had a commit. */ pThis->bHadAutoCommit = 1; - pThis->iResumeOKinRow = 0; /* we had a successful call! */ + setActionResumeInRow(pWti, pThis, 0); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -960,7 +944,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) */ FINALIZE; } - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; @@ -971,57 +955,90 @@ finalize_it: * this readies the action and then calls doAction() * rgerhards, 2008-01-28 */ -static rsRetVal -actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate, wti_t *pWti) +rsRetVal +actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, wti_t *pWti) { DEFiRet; - ASSERT(pThis != NULL); - ISOBJ_TYPE_assert(pMsg, msg); - - CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); + CHKiRet(actionPrepare(pThis, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) - pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); - if(pThis->eState == ACT_STATE_ITX) - CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti)); + pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pWti->pbShutdownImmediate); + if(getActionState(pWti, pThis) == ACT_STATE_ITX) + CHKiRet(actionCallDoAction(pThis, msgFlags, actParams, pWti)); - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; } -/* finish processing a batch. Most importantly, that means we commit if we - * need to do so. - * rgerhards, 2008-01-28 - */ +/* the following functions simulates a potential future new omo callback */ static rsRetVal -finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) +doTransaction(action_t *pThis, wti_t *pWti) { + actWrkrInfo_t *wrkrInfo; + actWrkrIParams_t *iparamCurr; int i; DEFiRet; - ASSERT(pThis != NULL); + wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); + dbgprintf("DDDD: doTransaction: action %d, currIParams %d\n", pThis->iActionNbr, wrkrInfo->currIParam); + for(i = 0 ; i < wrkrInfo->currIParam ; ++i) { + iparamCurr = wrkrInfo->iparams + i; + iRet = actionProcessMessage(pThis, iparamCurr->msgFlags, + iparamCurr->staticActParams, + pWti); + dbgprintf("DDDD: doTransaction loop, iRet %d\n", iRet); + } + RETiRet; +} + - if(pThis->eState == ACT_STATE_RDY) { - /* we just need to flag the batch as commited */ - FINALIZE; /* nothing to do */ +static void +actionFreeParams(action_t *pThis, wti_t *pWti) +{ + actWrkrInfo_t *wrkrInfo; + actWrkrIParams_t *iparamCurr; + int i; + int j; + + wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); + dbgprintf("DDDD: actionFreeParams: action %d, currIParam %d\n", pThis->iActionNbr, wrkrInfo->currIParam); + for(i = 0 ; i < wrkrInfo->currIParam ; ++i) { + iparamCurr = wrkrInfo->iparams + i; + for(j = 0 ; j < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++j) { + /* TODO: we can save time by not freeing everything, + * but that's left for a later optimization. + */ + free(iparamCurr->staticActStrings[j]); + iparamCurr->staticActStrings[j] = NULL; + iparamCurr->staticLenStrings[j] = 0; + } } + wrkrInfo->currIParam = 0; /* reset to beginning */ + releaseDoActionParams(pThis, pWti); +} + + +/* Commit try committing (do not handle retry processing and such) */ +static rsRetVal +actionTryCommit(action_t *pThis, wti_t *pWti) +{ + //actWrkrInfo_t *wrkrInfo; + DEFiRet; - CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti)); - if(pThis->eState == ACT_STATE_ITX) { + doTransaction(pThis, pWti); + + CHKiRet(actionPrepare(pThis, pWti)); + if(getActionState(pWti, pThis) == ACT_STATE_ITX) { +dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr); iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionCommitted(pThis); - /* flag messages as committed */ - for(i = 0 ; i < pBatch->nElem ; ++i) { - batchSetElemState(pBatch, i, BATCH_STATE_COMM); - pBatch->pElem[i].bPrevWasSuspended = 0; /* we had success! */ - } + actionCommitted(pThis, pWti); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -1029,12 +1046,12 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) case RS_RET_DEFER_COMMIT: DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT " "- ignored\n"); - actionCommitted(pThis); + actionCommitted(pThis, pWti); break; case RS_RET_PREVIOUS_COMMITTED: DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED " "- ignored\n"); - actionCommitted(pThis); + actionCommitted(pThis, pWti); break; default:/* permanent failure of this message - no sense in retrying. This is * not yet handled (but easy TODO) @@ -1042,283 +1059,130 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) FINALIZE; } } - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: + actionFreeParams(pThis, pWti); RETiRet; } - -/* try to submit a partial batch of elements. - * rgerhards, 2009-05-12 - */ -static inline rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, wti_t *pWti) -{ - int i; - int iElemProcessed; - int iCommittedUpTo; - msg_t *pMsg; - rsRetVal localRet; - DEFiRet; - - assert(pBatch != NULL); - assert(pnElem != NULL); - - i = pBatch->iDoneUpTo; /* all messages below that index are processed */ - iElemProcessed = 0; - iCommittedUpTo = i; - DBGPRINTF("tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem); - while(iElemProcessed <= *pnElem && i < pBatch->nElem) { - if(*(pBatch->pbShutdownImmediate)) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - /* NOTE: do NOT extend the filter below! Anything else must be done on the - * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15 - */ - if(batchIsValidElem(pBatch, i)) { - pMsg = pBatch->pElem[i].pMsg; - localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, - pBatch->pbShutdownImmediate, pWti); - DBGPRINTF("action %p call returned %d\n", pAction, localRet); - /* Note: we directly modify the batch object state, because we know that - * wo do not overwrite BATCH_STATE_DISC indicators! - */ - if(localRet == RS_RET_OK) { - /* mark messages as committed */ - while(iCommittedUpTo <= i) { - pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); - ++iCommittedUpTo; - //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; - } - } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { - /* mark messages as committed */ - while(iCommittedUpTo < i) { - pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); - ++iCommittedUpTo; - //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; - } - pBatch->eltState[i] = BATCH_STATE_SUB; - } else if(localRet == RS_RET_DEFER_COMMIT) { - pBatch->eltState[i] = BATCH_STATE_SUB; - } else if(localRet == RS_RET_DISCARDMSG) { - pBatch->eltState[i] = BATCH_STATE_DISC; - } else { - dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n", - localRet, *pnElem, iCommittedUpTo); - iRet = localRet; - FINALIZE; - } - } - ++i; - ++iElemProcessed; - } - -finalize_it: - if(pBatch->iDoneUpTo != iCommittedUpTo) { - pBatch->iDoneUpTo = iCommittedUpTo; - } - RETiRet; -} - -/* submit a batch for actual action processing. - * The first nElem elements are processed. This function calls itself - * recursively if it needs to handle errors. - * Note: we don't need the number of the first message to be processed as a parameter, - * because this is kept track of inside the batch itself (iDoneUpTo). - * rgerhards, 2009-05-12 +/* Note: we currently need to return an iRet, as this is used in + * direct mode. TODO: However, it may be worth further investigating this, + * as it looks like there is no ultimate consumer of this code. + * rgerhards, 2013-11-06 */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem, wti_t *pWti) +actionCommit(action_t *pThis, wti_t *pWti) { - int i; - int bDone; - rsRetVal localRet; - int wasDoneTo; + sbool bDone; DEFiRet; - assert(pBatch != NULL); - - DBGPRINTF("submitBatch: enter, nElem %d\n", nElem); - wasDoneTo = pBatch->iDoneUpTo; + /* even more TODO: + This is the place where retry processing needs to go in. If the action + permanently fails, we should - as a new feature - add the capability to + write an error file. This is already done be omelasticsearch, and IMHO + pretty useful. + For the time being, I do NOT implement all of this (not even retry!) + as I want to get the rest of the engine to SISD (non-SIMD ;)) so that + I know any potential suprises and complications that arise out of this. + When this is done, I can come back here and complete this work. Obviously, + many features do not work in the mean time (but it is not planned to release + any of these partial implementations). + rgerhards, 2013-11-04 + */ bDone = 0; do { - localRet = tryDoAction(pAction, pBatch, &nElem, pWti); - if(localRet == RS_RET_FORCE_TERM) { + iRet = actionTryCommit(pThis, pWti); + DBGPRINTF("actionCommit, in retry loop, iRet %d\n", iRet); + if(iRet == RS_RET_FORCE_TERM) { ABORT_FINALIZE(RS_RET_FORCE_TERM); - } - if( localRet == RS_RET_OK - || localRet == RS_RET_PREVIOUS_COMMITTED - || localRet == RS_RET_DEFER_COMMIT) { - /* try commit transaction, once done, we can simply do so as if - * that return state was returned from tryDoAction(). - */ - localRet = finishBatch(pAction, pBatch, pWti); - } - - if( localRet == RS_RET_OK - || localRet == RS_RET_PREVIOUS_COMMITTED - || localRet == RS_RET_DEFER_COMMIT) { + } else if(iRet == RS_RET_OK || iRet == RS_RET_ACTION_FAILED) { bDone = 1; - } else if(localRet == RS_RET_SUSPENDED) { - DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n"); - /* do nothing, this will retry the full batch */ - } else if(localRet == RS_RET_ACTION_FAILED) { - /* in this case, everything not yet committed is BAD */ - for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && pBatch->eltState[i] != BATCH_STATE_COMM ) { - pBatch->eltState[i] = BATCH_STATE_BAD; - pBatch->pElem[i].bPrevWasSuspended = 1; - STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail); - } - } - bDone = 1; - } else { - if(nElem == 1) { - batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD); - bDone = 1; - } else { - /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ - DBGPRINTF("submitBatch recursing trying to find and exclude the culprit " - "for iRet %d\n", localRet); - submitBatch(pAction, pBatch, nElem / 2, pWti); - submitBatch(pAction, pBatch, nElem - (nElem / 2), pWti); - bDone = 1; - } } - } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */ - - if(*(pBatch->pbShutdownImmediate)) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - -finalize_it: - RETiRet; -} - - -/* copy "active" array of batch, as we need to modify it. The caller - * must make sure the new array is freed and the orginal batch - * pointer is restored (thus the caller must save it). If active - * is currently NULL, this is properly handled. - * Note: the batches active pointer is modified, so it must be - * saved BEFORE calling this function! - * rgerhards, 2012-09-12 - */ -static rsRetVal -copyActive(batch_t *pBatch) -{ - sbool *active; - DEFiRet; - - CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); - if(pBatch->active == NULL) - memset(active, 1, batchNumMsgs(pBatch)); - else - memcpy(active, pBatch->active, batchNumMsgs(pBatch)); - pBatch->active = active; + } while(!bDone); finalize_it: RETiRet; } -/* The following function prepares a batch for processing, that it is - * reinitializes batch states, generates strings and does everything else - * that needs to be done in order to make the batch ready for submission to - * the actual output module. Note that we look at the precomputed - * filter OK condition and process only those messages, that actually matched - * the filter. - * rgerhards, 2010-06-14 - */ -static inline rsRetVal -prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr) +/* Commit all active transactions in *DIRECT mode* */ +void +actionCommitAllDirect(wti_t *pWti) { int i; - batch_obj_t *pElem; - struct syslogTime ttNow; - DEFiRet; - - /* indicate we have not yet read the date */ - ttNow.year = 0; + action_t *pAction; - pBatch->iDoneUpTo = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pElem = &(pBatch->pElem[i]); - if(batchIsValidElem(pBatch, i)) { - pBatch->eltState[i] = BATCH_STATE_RDY; - if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) { - /* make sure we have our copy of "active" array */ - if(!*bMustRestoreActivePtr) { - *activeSave = pBatch->active; - copyActive(pBatch); - } - pBatch->active[i] = RSFALSE; - } - } + for(i = 0 ; i < iActionNbr ; ++i) { + dbgprintf("DDDD: actionCommitAll: action %d, state %u, nbr to commit %d\n", + i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo->currIParam); + pAction = pWti->actWrkrInfo[i].pAction; + if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT) + actionCommit(pAction, pWti); } - RETiRet; } - -/* receive a batch and process it. This includes retry handling. - * rgerhards, 2009-05-12 +/* process a single message. This is both called if we run from the + * cosumer side of an action queue as well as directly from the main + * queue thread if the action queue is set to "direct". */ -static inline rsRetVal -processAction(action_t *pAction, batch_t *pBatch, wti_t* pWti) +static rsRetVal +processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow) { DEFiRet; - assert(pBatch != NULL); - CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pWti)); - iRet = finishBatch(pAction, pBatch, pWti); + if(pAction->bExecWhenPrevSusp && !pWti->execState.bPrevWasSuspended) { + DBGPRINTF("action %d: NOT executing, as previous action was " + "not suspended\n", pAction->iActionNbr); + FINALIZE; + } + +dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRawMsg); + iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow); + if(pAction->eParamPassing == ACT_STRING_PASSING) { + pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction; + dbgprintf("DDDD: action %d is string passing - executing in commit phase\n", pAction->iActionNbr); + iRet = getReturnCode(pAction, pWti); + FINALIZE; + } + iRet = actionProcessMessage(pAction, pMsg->msgFlags, + pWti->actWrkrInfo[pAction->iActionNbr].staticActParams, + pWti); + releaseDoActionParams(pAction, pWti); finalize_it: + if(pWti->execState.bDoAutoCommit) + iRet = actionCommit(pAction, pWti); + pWti->execState.bPrevWasSuspended = (iRet == RS_RET_SUSPENDED || iRet == RS_RET_ACTION_FAILED); +dbgprintf("DDDD: bPrevWasSuspended now %d, action state %d\n", (int)pWti->execState.bPrevWasSuspended, getActionState(pWti, pAction)); RETiRet; } - -/* receive an array of to-process user pointers and submit them - * for processing. - * rgerhards, 2009-04-22 +/* This entry point is called by the ACTION queue (not main queue!) */ static rsRetVal -processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate) +processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti) { - int *pbShutdownImmdtSave; - sbool *activeSave; - int bMustRestoreActivePtr = 0; - rsRetVal localRet; action_t *pAction = (action_t*) pVoid; + msg_t *pMsg; + int i; + struct syslogTime ttNow; DEFiRet; - assert(pBatch != NULL); - - if(pbShutdownImmediate != NULL) { - pbShutdownImmdtSave = pBatch->pbShutdownImmediate; - pBatch->pbShutdownImmediate = pbShutdownImmediate; - } - CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); - - iRet = processAction(pAction, pBatch, pWti); - - /* even if processAction failed, we need to release the batch (else we - * have a memory leak). So we do this first, and then check if we need to - * return an error code. If so, the code from processAction has priority. - * rgerhards, 2010-12-17 - */ - localRet = releaseBatch(pAction, pBatch); + wtiResetExecState(pWti, pBatch); + /* indicate we have not yet read the date */ + ttNow.year = 0; - if(iRet == RS_RET_OK) - iRet = localRet; - - if(bMustRestoreActivePtr) { - free(pBatch->active); - pBatch->active = activeSave; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*pWti->pbShutdownImmediate ; ++i) { + if(batchIsValidElem(pBatch, i)) { + pMsg = pBatch->pElem[i].pMsg; + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow); + // TODO: we must refactor this! flag messages as committed + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + } } -finalize_it: - if(pbShutdownImmediate != NULL) - pBatch->pbShutdownImmediate = pbShutdownImmdtSave; + if(!pWti->execState.bDoAutoCommit) + iRet = actionCommit(pAction, pWti); +dbgprintf("DDDD: processBatchMain - end\n"); RETiRet; } @@ -1376,27 +1240,28 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT /* This submits the message to the action queue in case we do NOT need to handle repeat * message processing. That case permits us to gain lots of freedom during processing - * and thus speed. This is also utilized to submit messages in complex case once + * and thus speed. This is also utilized to submit messages in more complex cases once * the complex logic has been applied ;) * rgerhards, 2010-06-08 */ -static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti) +static rsRetVal +doSubmitToActionQ(action_t *pAction, wti_t *pWti, msg_t *pMsg) { + struct syslogTime ttNow; // TODO: think if we can buffer this in pWti DEFiRet; - if(pAction->eState == ACT_STATE_DIED) { - DBGPRINTF("action %p died, do NOT execute\n", pAction); - FINALIZE; - } + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - if(pAction->pQueue->qType == QUEUETYPE_DIRECT) - iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg), pWti); - else + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { + ttNow.year = 0; + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow); + } else {/* in this case, we do single submits to the queue. + * TODO: optimize this, we may do at least a multi-submit! + */ iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); + } -finalize_it: RETiRet; } @@ -1464,27 +1329,32 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti) /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = doSubmitToActionQ(pAction, pMsg, pWti); + iRet = doSubmitToActionQ(pAction, pWti, pMsg); finalize_it: RETiRet; } -/* helper to actonCallAction, mostly needed because of this damn - * pthread_cleanup_push() POSIX macro... +/* Call configured action, most complex case with all features supported (and thus slow). + * rgerhards, 2010-06-08 */ -static inline rsRetVal -doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti) +#pragma GCC diagnostic ignored "-Wempty-body" +static rsRetVal +doSubmitToActionQComplex(action_t *pAction, wti_t *pWti, msg_t *pMsg) { - msg_t *pMsg; DEFiRet; - pMsg = pBatch->pElem[idxBtch].pMsg; + d_pthread_mutex_lock(&pAction->mutAction); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); + DBGPRINTF("Called action %p (complex case), logging to %s\n", + pAction, module.GetStateName(pAction->pMod)); + pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ + // TODO: can we optimize the "now" handling again (was batch, I guess...)? /* don't output marks to recently written outputs */ - if(pAction->bWriteAllMarkMsgs == RSFALSE + if(pAction->bWriteAllMarkMsgs == 0 && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { ABORT_FINALIZE(RS_RET_OK); } @@ -1493,15 +1363,12 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti) iRet = actionWriteToAction(pAction, pMsg, pWti); finalize_it: - /* we need to update the batch to handle failover processing correctly */ - if(iRet == RS_RET_OK) { - pBatch->pElem[idxBtch].bPrevWasSuspended = 0; - } else if(iRet == RS_RET_ACTION_FAILED) { - pBatch->pElem[idxBtch].bPrevWasSuspended = 1; - } + d_pthread_mutex_unlock(&pAction->mutAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ RETiRet; } +#pragma GCC diagnostic warning "-Wempty-body" /* helper to activateActions, it activates a specific action. @@ -1550,195 +1417,41 @@ activateActions(void) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doSubmitToActionQNotAllMark(action_t *pAction, wti_t *pWti, msg_t *pMsg) { - time_t now = 0; + int doProcess = 1; time_t lastAct; - int i; - sbool *activeSave; - DEFiRet; - - activeSave = pBatch->active; - copyActive(pBatch); - - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i]) - continue; - if(now == 0) { - now = datetime.GetTime(NULL); /* good time call - the only one done */ - } - /* CAS loop, we write back a bit early, but that's OK... */ - /* we use reception time, not dequeue time - this is considered more appropriate and - * also faster ;) -- rgerhards, 2008-09-17 */ - do { - lastAct = pAction->f_time; - if(pBatch->pElem[i].pMsg->msgFlags & MARK) { - if((now - lastAct) < MarkInterval / 2) { - pBatch->active[i] = 0; - DBGPRINTF("batch item %d: action was recently called, ignoring " - "mark message\n", i); - break; /* do not update timestamp for non-written mark messages */ - } - } - } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, - pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0); - if(pBatch->active[i]) { - DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n", - i, module.GetStateName(pAction->pMod)); - } - } - - iRet = doSubmitToActionQBatch(pAction, pBatch, pWti); - - free(pBatch->active); - pBatch->active = activeSave; - - RETiRet; -} - -static inline void -countStatsBatchEnq(action_t *pAction, batch_t *pBatch) -{ - int i; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( batchIsValidElem(pBatch, i)) { - STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - } - } -} - - -/* enqueue a batch in direct mode. We have put this into its own function just to avoid - * cluttering the actual submit function. - * rgerhards, 2011-06-16 - */ -static inline rsRetVal -doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) -{ - sbool bNeedSubmit; - sbool *activeSave; - int i; DEFiRet; - activeSave = pBatch->active; - copyActive(pBatch); - - /* note: for direct mode, we need to adjust the filter property. For non-direct - * this is not necessary, because in that case we enqueue only what actually needs - * to be processed. + /* TODO: think about the whole logic. If messages come in out of order, things + * tend to become a bit unreliable. On the other hand, this only happens if we have + * very high traffic, in which this use case here is not really affected (as the + * MarkInterval is pretty corase). */ - if(pAction->bExecWhenPrevSusp) { - bNeedSubmit = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if(!pBatch->pElem[i].bPrevWasSuspended) { - DBGPRINTF("action enq stage: change active to 0 due to " - "failover case in elem %d\n", i); - pBatch->active[i] = 0; - } - if(batchIsValidElem(pBatch, i)) { - STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - bNeedSubmit = 1; - } - DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, i, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - } - if(bNeedSubmit) { - /* note: stats were already computed above */ - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); - } else { - DBGPRINTF("no need to submit batch, all invalid\n"); - } - } else { - if(GatherStats) - countStatsBatchEnq(pAction, pBatch); - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); - } - - free(pBatch->active); - pBatch->active = activeSave; - RETiRet; -} - -/* This submits the message to the action queue in case we do NOT need to handle repeat - * message processing. That case permits us to gain lots of freedom during processing - * and thus speed. - * rgerhards, 2010-06-08 - */ -static rsRetVal -doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) -{ - int i; - DEFiRet; - - DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); - - if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { - iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti); - } else {/* in this case, we do single submits to the queue. - * TODO: optimize this, we may do at least a multi-submit! - */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( batchIsValidElem(pBatch, i) - && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { - doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg, pWti); + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if(pMsg->msgFlags & MARK) { + if((pMsg->ttGenTime - lastAct) < MarkInterval / 2) { + doProcess = 0; + DBGPRINTF("action was recently called, ignoring mark message\n"); + break; /* do not update timestamp for non-written mark messages */ } } - } - - RETiRet; -} - - - -/* Helper to submit a batch of actions to the engine. Note that we have rather - * complicated processing here, so we need to do this one message after another. - * rgerhards, 2010-06-23 - */ -static inline rsRetVal -helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) -{ - int i; - DEFiRet; + } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, + pMsg->ttGenTime, &pAction->mutCAS) == 0); - DBGPRINTF("Called action %p (complex case), logging to %s\n", - pAction, module.GetStateName(pAction->pMod)); - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( batchIsValidElem(pBatch, i) - && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { - doActionCallAction(pAction, pBatch, i, pWti); - } + if(doProcess) { + DBGPRINTF("Called action(NotAllMark), processing via '%s'\n", + module.GetStateName(pAction->pMod)); + iRet = doSubmitToActionQ(pAction, pWti, pMsg); } RETiRet; } -/* Call configured action, most complex case with all features supported (and thus slow). - * rgerhards, 2010-06-08 - */ -#pragma GCC diagnostic ignored "-Wempty-body" -static rsRetVal -doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) -{ - DEFiRet; - - d_pthread_mutex_lock(&pAction->mutAction); -dbgprintf("DDDD: locked mutAction\n"); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); - iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti); - d_pthread_mutex_unlock(&pAction->mutAction); - pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - - RETiRet; -} -#pragma GCC diagnostic warning "-Wempty-body" - /* apply all params from param block to action. This supports the v6 config system. * Defaults must have been set appropriately during action construct! @@ -1789,7 +1502,7 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals) rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, - struct nvlst *lst, int bSuspended) + struct nvlst *lst) { DEFiRet; int i; @@ -1818,7 +1531,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->bRepMsgHasMsg = cs.bActionRepMsgHasMsg; cs.iActExecEveryNthOccur = 0; /* auto-reset */ cs.iActExecEveryNthOccurTO = 0; /* auto-reset */ - cs.bActionWriteAllMarkMsgs = RSFALSE; /* auto-reset */ + cs.bActionWriteAllMarkMsgs = 1; /* auto-reset */ cs.pszActionName = NULL; /* free again! */ } else { actionApplyCnfParam(pAction, actParams); @@ -1876,15 +1589,10 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->pMod = pMod; pAction->pModData = pModData; - /* check if the module is compatible with select features (currently no such features exist) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ - - if(bSuspended) - actionSuspend(pAction); CHKiRet(actionConstructFinalize(pAction, lst)); - /* TODO: if we exit here, we have a memory leak... */ + /* TODO: if we exit here, we have a (quite acceptable...) memory leak */ *ppAction = pAction; /* finally store the action pointer */ @@ -1920,7 +1628,7 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus static inline void initConfigVariables(void) { - cs.bActionWriteAllMarkMsgs = RSFALSE; + cs.bActionWriteAllMarkMsgs = 1; cs.glbliActionResumeRetryCount = 0; cs.bActExecWhenPrevSusp = 0; cs.iActExecOnceInterval = 0; @@ -1959,17 +1667,11 @@ actionNewInst(struct nvlst *lst, action_t **ppAction) errmsg.LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName); ABORT_FINALIZE(RS_RET_MOD_UNKNOWN); } - iRet = pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR); - // TODO: check if RS_RET_SUSPENDED is still valid in v6! - if(iRet != RS_RET_OK && iRet != RS_RET_SUSPENDED) { - FINALIZE; /* iRet is already set to error state */ - } + CHKiRet(pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR)); - if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst, - (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { + if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst)) == RS_RET_OK) { /* check if the module is compatible with select features * (currently no such features exist) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ loadConf->actions.nbrActions++; /* one more active action! */ } *ppAction = pAction; @@ -1980,8 +1682,6 @@ finalize_it: RETiRet; } -/* TODO: we are not yet a real object, the ClassInit here just looks like it is.. - */ rsRetVal actionClassInit(void) { DEFiRet; @@ -35,15 +35,6 @@ extern int glbliActionResumeRetryCount; -typedef enum { - ACT_STATE_DIED = 0, /* action permanently failed and now disabled - MUST BE ZERO! */ - ACT_STATE_RDY = 1, /* action ready, waiting for new transaction */ - ACT_STATE_ITX = 2, /* transaction active, waiting for new data or commit */ - ACT_STATE_COMM = 3, /* transaction finished (a transient state) */ - ACT_STATE_RTRY = 4, /* failure occured, trying to restablish ready state */ - ACT_STATE_SUSP = 5 /* suspended due to failure (return fail until timeout expired) */ -} action_state_t; - /* the following struct defines the action object data structure */ struct action_s { @@ -54,14 +45,12 @@ struct action_s { int iActionNbr; /* this action's number (ID) */ sbool bExecWhenPrevSusp;/* execute only when previous action is suspended? */ sbool bWriteAllMarkMsgs;/* should all mark msgs be written (not matter how recent the action was executed)? */ - int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */ - action_state_t eState; /* current state of action */ sbool bHadAutoCommit; /* did an auto-commit happen during doAction()? */ + sbool bDisabled; + int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */ time_t ttResumeRtry; /* when is it time to retry the resume? */ - int iResumeOKinRow; /* number of times in a row that resume said OK with an immediate failure following */ int iResumeInterval;/* resume interval for this action */ int iResumeRetryCount;/* how often shall we retry a suspended action? (-1 --> eternal) */ - int iNbrResRtry; /* number of retries since last suspend */ int iNbrNoExec; /* number of matches that did not yet yield to an exec */ int iExecEveryNthOccur;/* execute this action only every n-th occurence (with n=0,1 -> always) */ int iExecEveryNthOccurTO;/* timeout for n-th occurence feature */ @@ -69,7 +58,7 @@ struct action_s { struct modInfo_s *pMod;/* pointer to output module handling this selector */ void *pModData; /* pointer to module data - content is module-specific */ sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */ - rsRetVal (*submitToActQ)(action_t *, batch_t *, wti_t*);/* function submit message to action queue */ + rsRetVal (*submitToActQ)(action_t *, wti_t*, msg_t*);/* function submit message to action queue */ rsRetVal (*qConstruct)(struct queue_s *pThis); enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING = 2, ACT_JSON_PASSING = 3} @@ -79,7 +68,7 @@ struct action_s { * in this order. */ qqueue_t *pQueue; /* action queue */ pthread_mutex_t mutAction; /* primary action mutex */ - uchar *pszName; /* action name (for documentation) */ + uchar *pszName; /* action name */ DEF_ATOMIC_HELPER_MUT(mutCAS); /* for statistics subsystem */ statsobj_t *statsobj; @@ -93,16 +82,17 @@ struct action_s { rsRetVal actionConstruct(action_t **ppThis); rsRetVal actionConstructFinalize(action_t *pThis, struct nvlst *lst); rsRetVal actionDestruct(action_t *pThis); -rsRetVal actionDbgPrint(action_t *pThis); +//rsRetVal actionDbgPrint(action_t *pThis); rsRetVal actionSetGlobalResumeInterval(int iNewVal); rsRetVal actionDoAction(action_t *pAction); rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t*); rsRetVal actionCallHUPHdlr(action_t *pAction); rsRetVal actionClassInit(void); -rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, struct nvlst *lst, int bSuspended); +rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, struct nvlst *lst); rsRetVal activateActions(void); rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction); rsRetVal actionProcessCnf(struct cnfobj *o); +void actionCommitAllDirect(wti_t *pWti); /* external data */ extern int iActionNbr; diff --git a/doc/Makefile.am b/doc/Makefile.am index 867a63b7..f7a9efe0 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -121,6 +121,9 @@ html_files = \ v3compatibility.html \ v4compatibility.html \ v5compatibility.html \ + v6compatibility.html \ + v7compatibility.html \ + v8compatibility.html \ im3195.html \ netstream.html \ ns_gtls.html \ @@ -157,8 +160,6 @@ html_files = \ rsconf1_omfileforcechown.html \ rsyslog_queue_pointers.jpeg \ rsyslog_queue_pointers2.jpeg \ - v6compatibility.html \ - v7compatibility.html \ rsyslog_conf_basic_structure.html \ rsyslog_conf_sysklogd_compatibility.html \ imkmsg.html \ diff --git a/doc/manual.html b/doc/manual.html index 3e3625d9..7ca40491 100644 --- a/doc/manual.html +++ b/doc/manual.html @@ -27,16 +27,18 @@ want to lend us a helping hand. </b>It doesn't require a lot of time - even a single mouse click helps. Learn <a href="how2help.html">how to help the rsyslog project</a>. Due to popular demand, there is now a <a href="rsyslog_ng_comparison.html">side-by-side comparison between rsyslog and syslog-ng</a>.</p> -<p>If you are upgrading from rsyslog v2 or stock sysklogd, -<a href="v3compatibility.html">be sure to read the rsyslog v3 compatibility notes</a>, -and if you are upgrading from v3, read the -<a href="v4compatibility.html">rsyslog v4 compatibility notes</a>, -if you upgrade from v4, read the -<a href="v5compatibility.html">rsyslog v5 compatibility notes</a>, and -if you upgrade from v5, read the -<a href="v6compatibility.html">rsyslog v6 compatibility notes</a>. +If you upgrade from v7, read the +<a href="v8compatibility.html">rsyslog v8 compatibility notes</a>. if you upgrade from v6, read the <a href="v7compatibility.html">rsyslog v7 compatibility notes</a>. +if you upgrade from v5, read the +<a href="v6compatibility.html">rsyslog v6 compatibility notes</a>. +if you upgrade from v4, read the +<a href="v5compatibility.html">rsyslog v5 compatibility notes</a>. +if you upgrade from v3, read the +<a href="v4compatibility.html">rsyslog v4 compatibility notes</a>. +<p>If you are upgrading from rsyslog v2 or stock sysklogd, +<a href="v3compatibility.html">be sure to read the rsyslog v3 compatibility notes</a>, <p>Rsyslog will work even if you do not read the doc, but doing so will definitely improve your experience.</p> <p><b>Follow the links below for the</b></p> @@ -44,7 +46,7 @@ if you do not read the doc, but doing so will definitely improve your experience <li><a href="troubleshoot.html">troubleshooting rsyslog problems</a></li> <li><a href="rsyslog_conf.html">configuration file format (rsyslog.conf)</a></li> <li><a href="http://www.rsyslog.com/tool-regex">a regular expression checker/generator tool for rsyslog</a></li> -<li> <a href="property_replacer.html">property replacer, an important core component</a></li> +<li><a href="property_replacer.html">property replacer, an important core component</a></li> <li><a href="bugs.html">rsyslog bug list</a></li> <li><a href="messageparser.html">understanding rsyslog message parsers</a></li> <li><a href="generic_design.html">backgrounder on generic syslog application design</a></li> diff --git a/doc/rsyslog_conf_actions.html b/doc/rsyslog_conf_actions.html index adca540b..260d1f2b 100644 --- a/doc/rsyslog_conf_actions.html +++ b/doc/rsyslog_conf_actions.html @@ -31,8 +31,14 @@ implemented via <a href="rsyslog_conf_modules.html#om">output modules</a>. <br>used for statistics gathering and documentation <li><b>type</b> string <br>Mandatory parameter for every action. The name of the module that should be used. </li> - <li><b>action.writeAllMarkMessages</b> on/off - <br>Normally, mark messages are written to actions only if the action was not recently executed (by default, recently means within the past 20 minutes). If this setting is switched to "on", mark messages are always sent to actions, no matter how recently they have been executed. In this mode, mark messages can be used as a kind of heartbeat.</li> + <li><b>action.writeAllMarkMessages</b> <i>on</i>/off + <br>This setting tells if mark messages are always written ("on", the default) or only + if the action was not recently executed ("off"). By default, recently means within the + past 20 minutes. If this setting is "on", mark messages are always sent to actions, no + matter how recently they have been executed. In this mode, mark messages can be used as + a kind of heartbeat. This mode also enables faster processing inside the rule engine. So + it should be set to "off" only when there is a good reason to do so. + </li> <li><b>action.execOnlyEveryNthTime</b> integer <br>If configured, the next action will only be executed every n-th time. For example, if configured to 3, the first two messages that go into the action will be dropped, the 3rd will actually cause the action to execute, the 4th and 5th will be dropped, the 6th executed under the action, ... and so on.</li> <li><b>action.execOnlyEveryNthTimeout</b> integer diff --git a/doc/v8compatibility.html b/doc/v8compatibility.html new file mode 100644 index 00000000..37e8e31f --- /dev/null +++ b/doc/v8compatibility.html @@ -0,0 +1,82 @@ +<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> +<html><head><title>Compatibility notes for rsyslog v8</title> +</head> +<body> +<h1>Compatibility Notes for rsyslog v8</h1> +This document describes things to keep in mind when moving from v7 to v8. It +does not list enhancements nor does it talk about compatibility concerns introduced +by earlier versions (for this, see their respective compatibility documents). Its focus +is primarily on what you need to know if you used v7 and want to use v8 without hassle. +<p>Version 8 offers a completely rewritten core rsyslog engine. This resulted in +a number of changes that are visible to users and (plugin) developers. +Most importantly, pre-v8 plugins <b>do not longer work</b> and need to +be updated to support the new calling interfaces. If you developed a plugin, +be sure to review the developer section below. +</p> +<h2>Mark Messages</h2> +<p>In previous versions, mark messages were by default only processed if an +action was not executed for some time. The default has now changed, and mark +messages are now always processed. Note that this enables faster processing +inside rsyslog. To change to previous behaviour, you need to add +action.writeAllMarkMessages="off" to the actions in question. +<h2>What Developers need to Know</h2> +<h3>output plugin interface</h3> +<p>To support the new core engine, the output interface has been considerably +changed. It is suggested to review some of the project-provided plugins for +full details. In this doc, we describe the most important changes from a high +level perspective. +<p><b>Multi-thread awareness required</b></p> +<p>The new engine activates one <b>worker</b>instance of output actions on +each worker thread. This means an action has now three types of data: +<ul> +<li>global +<li>action-instance - previously known pData, one for each action inside the config +<li>worker-action-instance - one for each worker thread (called pWrkrData), note +that this is specific to exactly one pData +</ul> +The plugin <b>must</b> now by multi-threading aware. It may be called by multiple +threads concurrently, but it is guaranteed that each call is for a unique +pWrkrData structure. This still permits to write plugins easily, but enables the +engine to work with much higher performance. Note that plugin developers +should assume it is the norm that multiple concurrent worker action instances +are active a the some time. + +<p><b>New required entry points</b></p> +<p>In order to support the new threading model, new entry points are required. +Most importantly, only the plugin knows which data must be present in pData and +pWrkrData, so it must created and destroy these data structures on request of +the engine. Note that pWrkrData may be destroyed at any time and new ones +re-created later. Depending on workload structure and configuration, this can happen +frequently. +<p>New entry points are: +<ul> +<li>createWrkrInstance +<li>freeWrkrInstance +</ul> + +The calling interface for these entry points has changed. Basically, +they now receive a pWrkrData object instead pData. It is assumed that +createWrkrInstance populates pWrkrData->pData appropriately. +<ul> +<li>beginTransaction +<li>doAction +<li>endTransaction +</ul> + +<p><b>RS_RET_SUSPENDED is no longer supported when creating an action instance</b> +<p>This means a plugin must not try to establish any connections or the like +before any of its processing entry points (like beginTransaction or doAction) +is called. This was generally also the case von v7, but was not enforced in +all cases. In v8, creating action creation fails if anything but RS_RET_OK +is returned. + +<a href="http://www.rsyslog.com/g/BSD">rsyslog BSD blocks info</a> +page for more information and how to upgrade your config. +<p>[<a href="manual.html">manual index</a>] [<a href="http://www.rsyslog.com/">rsyslog site</a>]</p> + +<p><font size="2">This documentation is part of the +<a href="http://www.rsyslog.com/">rsyslog</a> project.<br> +Copyright © 2013 by <a href="http://www.gerhards.net/rainer">Rainer Gerhards</a> and +<a href="http://www.adiscon.com/">Adiscon</a>. Released under the GNU GPL +version 2 or higher.</font></p> +</body></html> diff --git a/runtime/batch.h b/runtime/batch.h index 2ec07670..b0aa8574 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -46,17 +46,6 @@ typedef unsigned char batch_state_t; */ struct batch_obj_s { msg_t *pMsg; - /* work variables for action processing; these are reused for each action (or block of - * actions) - */ - sbool bPrevWasSuspended; - /* following are caches to save allocs if not absolutely necessary */ - uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */ - /* a cache to save malloc(), if not absolutely necessary */ - void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */ - size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; - /* and the same for the message length (if used) */ - /* end action work variables */ }; /* the batch @@ -77,11 +66,7 @@ struct batch_s { int maxElem; /* maximum number of elements that this batch supports */ int nElem; /* actual number of element in this entry */ int nElemDeq; /* actual number of elements dequeued (and thus to be deleted) - see comment above! */ - int iDoneUpTo; /* all messages below this index have state other than RDY */ qDeqID deqID; /* ID of dequeue operation that generated this batch */ - int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */ - sbool *active; /* which messages are active for processing, NULL=all */ - sbool bSingleRuleset; /* do all msgs of this batch use a single ruleset? */ batch_obj_t *pElem; /* batch elements */ batch_state_t *eltState;/* state (array!) for individual objects. NOTE: we have moved this out of batch_obj_t because we @@ -93,24 +78,6 @@ struct batch_s { }; -/* some inline functions (we may move this off to an object .. or not) */ -static inline void -batchSetSingleRuleset(batch_t *pBatch, sbool val) { - pBatch->bSingleRuleset = val; -} - -/* get the batches ruleset (if we have a single ruleset) */ -static inline ruleset_t* -batchGetRuleset(batch_t *pBatch) { - return (pBatch->nElem > 0) ? pBatch->pElem[0].pMsg->pRuleset : NULL; -} - -/* get the ruleset of a specifc element of the batch (index not verified!) */ -static inline ruleset_t* -batchElemGetRuleset(batch_t *pBatch, int i) { - return pBatch->pElem[i].pMsg->pRuleset; -} - /* get number of msgs for this batch */ static inline int batchNumMsgs(batch_t *pBatch) { @@ -134,8 +101,7 @@ batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) { */ static inline int batchIsValidElem(batch_t *pBatch, int i) { - return( (pBatch->eltState[i] != BATCH_STATE_DISC) - && (pBatch->active == NULL || pBatch->active[i])); + return(pBatch->eltState[i] != BATCH_STATE_DISC); } @@ -152,7 +118,7 @@ batchFree(batch_t *pBatch) { /* staticActParams MUST be freed immediately (if required), * so we do not need to do that! */ - free(pBatch->pElem[i].staticActStrings[j]); + //TODO: do this in wti! free(pBatch->pElem[i].staticActStrings[j]); } } free(pBatch->pElem); @@ -167,11 +133,9 @@ batchFree(batch_t *pBatch) { static inline rsRetVal batchInit(batch_t *pBatch, int maxElem) { DEFiRet; - pBatch->iDoneUpTo = 0; pBatch->maxElem = maxElem; CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t))); CHKmalloc(pBatch->eltState = calloc((size_t)maxElem, sizeof(batch_state_t))); - // TODO: replace calloc by inidividual writes? finalize_it: RETiRet; } diff --git a/runtime/conf.c b/runtime/conf.c index 2b000c60..83931bca 100644 --- a/runtime/conf.c +++ b/runtime/conf.c @@ -518,12 +518,10 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction) bHadWarning = 1; iRet = RS_RET_OK; } - if(iRet == RS_RET_OK || iRet == RS_RET_SUSPENDED) { - if((iRet = addAction(&pAction, pMod, pModData, pOMSR, NULL, NULL, - (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { + if(iRet == RS_RET_OK) { + if((iRet = addAction(&pAction, pMod, pModData, pOMSR, NULL, NULL)) == RS_RET_OK) { /* here check if the module is compatible with select features * (currently, we have no such features!) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ conf->actions.nbrActions++; /* one more active action! */ } break; diff --git a/runtime/queue.c b/runtime/queue.c index 8c610b11..8fb734e7 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -81,11 +81,10 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti); -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti); +static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); -static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDestructDisk(qqueue_t *pThis); /* some constants for queuePersist () */ @@ -705,9 +704,13 @@ queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError) pThis->qType = QUEUETYPE_DIRECT; pThis->qConstruct = qConstructDirect; pThis->qDestruct = qDestructDirect; + /* these entry points shall not be used in direct mode + * To catch program errors, make us abort if that happens! + * rgerhards, 2013-11-05 + */ pThis->qAdd = qAddDirect; - pThis->qDel = qDelDirect; pThis->MultiEnq = qqueueMultiEnqObjDirect; + pThis->qDel = NULL; if(pThis->pqParent != NULL) { DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n"); pThis->pqParent->bIsDA = 0; @@ -957,13 +960,11 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) +static rsRetVal qAddDirectWithWti(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) { batch_t singleBatch; batch_obj_t batchObj; batch_state_t batchState = BATCH_STATE_RDY; - sbool active = 1; - int i; DEFiRet; //TODO: init batchObj (states _OK and new fields -- CHECK) @@ -983,46 +984,29 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.eltState = &batchState; - singleBatch.active = &active; - iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate); - /* delete the batch string params: TODO: create its own "class" for this */ - for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { - free(batchObj.staticActStrings[i]); - } + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti); msgDestruct(&pMsg); RETiRet; } -/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead - * otherwise incured. -- rgerhards, ~2010-06-23 +/* this is called if we do not have a pWti. This currently only happens + * when we are called from a main queue in direct mode. If so, we need + * to obtain a dummy pWti. */ -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti) +static rsRetVal +qAddDirect(qqueue_t *pThis, msg_t* pMsg) { + wti_t *pWti; DEFiRet; - ASSERT(pThis != NULL); - - /* calling the consumer is quite different here than it is from a worker thread */ - /* we need to provide the consumer's return value back to the caller because in direct - * mode the consumer probably has a lot to convey (which get's lost in the other modes - * because they are asynchronous. But direct mode is deliberately synchronous. - * rgerhards, 2008-02-12 - * We use our knowledge about the batch_t structure below, but without that, we - * pay a too-large performance toll... -- rgerhards, 2009-04-22 - */ - iRet = pThis->pConsumer(pThis->pAction, pBatch, pWti, NULL); - + pWti = wtiGetDummy(); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + iRet = qAddDirectWithWti(pThis, pMsg, pWti); RETiRet; } -static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) -{ - return RS_RET_OK; -} - - /* --------------- end type-specific handlers -------------------- */ @@ -1317,7 +1301,7 @@ finalize_it: * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*, int*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*)) { DEFiRet; qqueue_t *pThis; @@ -1876,7 +1860,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); - CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti, &pThis->bShutdownImmediate)); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -2098,9 +2083,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ case QUEUETYPE_DIRECT: pThis->qConstruct = qConstructDirect; pThis->qDestruct = qDestructDirect; + /* these entry points shall not be used in direct mode + * To catch program errors, make us abort if that happens! + * rgerhards, 2013-11-05 + */ pThis->qAdd = qAddDirect; - pThis->qDel = qDelDirect; pThis->MultiEnq = qqueueMultiEnqObjDirect; + pThis->qDel = NULL; break; } @@ -2679,16 +2668,17 @@ finalize_it: /* now, the same function, but for direct mode */ static rsRetVal -qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti) +qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) { int i; + wti_t *pWti; DEFiRet; - ISOBJ_TYPE_assert(pThis, qqueue); - assert(pMultiSub != NULL); + pWti = wtiGetDummy(); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; for(i = 0 ; i < pMultiSub->nElem ; ++i) { - CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i], pWti)); + CHKiRet(qAddDirectWithWti(pThis, (void*)pMultiSub->ppMsgs[i], pWti)); } finalize_it: @@ -2697,21 +2687,6 @@ finalize_it: /* ------------------------------ END multi-enqueue functions ------------------------------ */ -/* enqueue a new user data element in direct mode - * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better - * code later on (like multi submit!) 2010-06-10 - * Enqueues the new element and awakes worker thread. - */ -rsRetVal -qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti) -{ - DEFiRet; - ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pMsg, pWti); - RETiRet; -} - - /* enqueue a new user data element * Enqueues the new element and awakes worker thread. */ diff --git a/runtime/queue.h b/runtime/queue.h index 8e789ebd..86107d24 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -103,11 +103,10 @@ struct queue_s { * the user really wanted...). -- rgerhards, 2008-04-02 */ /* end dequeue time window */ - rsRetVal (*pConsumer)(void *,batch_t*, wti_t*,int*); /* user-supplied consumer function for dequeued messages */ + rsRetVal (*pConsumer)(void *,batch_t*, wti_t*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 - * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero - * during normal operations and one if the consumer must urgently shut down. + * is pointer to an array of message message pointers) */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); @@ -195,14 +194,12 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); -rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti); rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *, int*)); -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *)); int queueCnfParamsSet(struct nvlst *lst); rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst); void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis); diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index edf0c593..ae56f08f 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -49,6 +49,7 @@ #define CONF_PROGNAME_BUFSIZE 16 #define CONF_HOSTNAME_BUFSIZE 32 #define CONF_PROP_BUFSIZE 16 /* should be close to sizeof(ptr) or lighly above it */ +#define CONF_IPARAMS_BUFSIZE 16 /* initial size of iparams array in wti (is automatically extended) */ #define CONF_MIN_SIZE_FOR_COMPRESS 60 /* config param: minimum message size to try compression. The smaller * the message, the less likely is any compression gain. We check for * gain before we submit the message. But to do so we still need to diff --git a/runtime/ruleset.c b/runtime/ruleset.c index e39a1889..2e8d1f0f 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -48,6 +48,7 @@ #include "rainerscript.h" #include "srUtils.h" #include "modules.h" +#include "wti.h" #include "dirty.h" /* for main ruleset queue creation */ /* static data */ @@ -68,7 +69,7 @@ static struct cnfparamblk rspblk = /* forward definitions */ static rsRetVal processBatch(batch_t *pBatch, wti_t *pWti); -static rsRetVal scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti); +static void scriptExec(struct cnfstmt *root, msg_t *pMsg, wti_t *pWti); /* ---------- linked-list key handling functions (ruleset) ---------- */ @@ -161,249 +162,89 @@ finalize_it: } -/* This function is similar to processBatch(), but works on a batch that - * contains rules from multiple rulesets. In this case, we can not push - * the whole batch through the ruleset. Instead, we examine it and - * partition it into sub-rulesets which we then push through the system. - * rgerhards, 2010-06-15 - */ -static inline rsRetVal -processBatchMultiRuleset(batch_t *pBatch, wti_t *pWti) -{ - ruleset_t *currRuleset; - batch_t snglRuleBatch; - int i; - int iStart; /* start index of partial batch */ - int iNew; /* index for new (temporary) batch */ - int bHaveUnprocessed; /* do we (still) have unprocessed entries? (loop term predicate) */ - DEFiRet; - - do { - bHaveUnprocessed = 0; - /* search for first unprocessed element */ - for(iStart = 0 ; iStart < pBatch->nElem && pBatch->eltState[iStart] == BATCH_STATE_DISC ; ++iStart) - /* just search, no action */; - if(iStart == pBatch->nElem) - break; /* everything processed */ - - /* prepare temporary batch */ - CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem)); - snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate; - currRuleset = batchElemGetRuleset(pBatch, iStart); - iNew = 0; - for(i = iStart ; i < pBatch->nElem ; ++i) { - if(batchElemGetRuleset(pBatch, i) == currRuleset) { - /* for performance reasons, we copy only those members that we actually need */ - snglRuleBatch.pElem[iNew].pMsg = pBatch->pElem[i].pMsg; - snglRuleBatch.eltState[iNew] = pBatch->eltState[i]; - ++iNew; - /* We indicate the element also as done, so it will not be processed again */ - pBatch->eltState[i] = BATCH_STATE_DISC; - } else { - bHaveUnprocessed = 1; - } - } - snglRuleBatch.nElem = iNew; /* was left just right by the for loop */ - batchSetSingleRuleset(&snglRuleBatch, 1); - /* process temp batch */ - processBatch(&snglRuleBatch, pWti); - batchFree(&snglRuleBatch); - } while(bHaveUnprocessed == 1); - -finalize_it: - RETiRet; -} - -/* return a new "active" structure for the batch. Free with freeActive(). */ -static inline sbool *newActive(batch_t *pBatch) +static void +execAct(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - return malloc(sizeof(sbool) * batchNumMsgs(pBatch)); - -} -static inline void freeActive(sbool *active) { free(active); } - + if(stmt->d.act->bDisabled) { + DBGPRINTF("action %d died, do NOT execute\n", stmt->d.act->iActionNbr); + goto done; + } -/* for details, see scriptExec() header comment! */ -/* call action for all messages with filter on */ -static rsRetVal -execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) -{ - DEFiRet; -dbgprintf("RRRR: execAct [%s]: batch of %d elements, active %p\n", modGetName(stmt->d.act->pMod), batchNumMsgs(pBatch), active); - pBatch->active = active; - stmt->d.act->submitToActQ(stmt->d.act, pBatch, pWti); - RETiRet; + DBGPRINTF("executing action %d\n", stmt->d.act->iActionNbr); + stmt->d.act->submitToActQ(stmt->d.act, pWti, pMsg); +done: return; } -static rsRetVal -execSet(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +static void +execSet(struct cnfstmt *stmt, msg_t *pMsg) { - int i; struct var result; - DEFiRet; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) { - cnfexprEval(stmt->d.s_set.expr, &result, pBatch->pElem[i].pMsg); - msgSetJSONFromVar(pBatch->pElem[i].pMsg, stmt->d.s_set.varname, - &result); - varDelete(&result); - } - } - RETiRet; + cnfexprEval(stmt->d.s_set.expr, &result, pMsg); + msgSetJSONFromVar(pMsg, stmt->d.s_set.varname, &result); + varDelete(&result); } -static rsRetVal -execUnset(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +static void +execUnset(struct cnfstmt *stmt, msg_t *pMsg) { - int i; - DEFiRet; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) { - msgDelJSON(pBatch->pElem[i].pMsg, stmt->d.s_unset.varname); - } - } - RETiRet; + msgDelJSON(pMsg, stmt->d.s_unset.varname); } -/* for details, see scriptExec() header comment! */ -/* "stop" simply discards the filtered items - it's just a (hopefully more intuitive - * shortcut for users. - */ -static rsRetVal -execStop(batch_t *pBatch, sbool *active) -{ - int i; - DEFiRet; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) { - pBatch->eltState[i] = BATCH_STATE_DISC; - } - } - RETiRet; -} static rsRetVal -execCall(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) +execCall(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - msg_t *pMsg; - int i; DEFiRet; if(stmt->d.s_call.ruleset == NULL) { - scriptExec(stmt->d.s_call.stmt, pBatch, active, pWti); + scriptExec(stmt->d.s_call.stmt, pMsg, pWti); } else { - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - CHKmalloc(pMsg = MsgDup((msg_t*) pBatch->pElem[i].pMsg)); - DBGPRINTF("CALL: forwarding message %d to async ruleset %p\n", - i, stmt->d.s_call.ruleset->pQueue); - MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); - MsgSetRuleset(pMsg, stmt->d.s_call.ruleset); - /* Note: we intentionally use submitMsg2() here, as we process messages - * that were already run through the rate-limiter. - */ - submitMsg2(pMsg); - } + CHKmalloc(pMsg = MsgDup((msg_t*) pMsg)); + DBGPRINTF("CALL: forwarding message to async ruleset %p\n", + stmt->d.s_call.ruleset->pQueue); + MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); + MsgSetRuleset(pMsg, stmt->d.s_call.ruleset); + /* Note: we intentionally use submitMsg2() here, as we process messages + * that were already run through the rate-limiter. + */ + submitMsg2(pMsg); } finalize_it: RETiRet; } -/* for details, see scriptExec() header comment! */ -// save current filter, evaluate new one -// perform then (if any message) -// if ELSE given: -// set new filter, inverted -// perform else (if any messages) -static rsRetVal -execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) +static void +execIf(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - sbool *newAct; - int i; sbool bRet; - sbool allInactive = 1; - DEFiRet; - newAct = newActive(pBatch); - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - FINALIZE; - if(pBatch->eltState[i] == BATCH_STATE_DISC) - continue; /* will be ignored in any case */ - if(active == NULL || active[i]) { - bRet = cnfexprEvalBool(stmt->d.s_if.expr, pBatch->pElem[i].pMsg); - allInactive = 0; - } else - bRet = 0; - newAct[i] = bRet; - DBGPRINTF("batch: item %d: expr eval: %d\n", i, bRet); - } - - if(allInactive) { - DBGPRINTF("execIf: all batch elements are inactive, holding execution\n"); - freeActive(newAct); - FINALIZE; - } - - if(stmt->d.s_if.t_then != NULL) { - scriptExec(stmt->d.s_if.t_then, pBatch, newAct, pWti); - } - if(stmt->d.s_if.t_else != NULL) { - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - FINALIZE; - if(pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) - newAct[i] = !newAct[i]; - } - scriptExec(stmt->d.s_if.t_else, pBatch, newAct, pWti); + bRet = cnfexprEvalBool(stmt->d.s_if.expr, pMsg); + DBGPRINTF("if condition result is %d\n", bRet); + if(bRet) { + if(stmt->d.s_if.t_then != NULL) + scriptExec(stmt->d.s_if.t_then, pMsg, pWti); + } else { + if(stmt->d.s_if.t_else != NULL) + scriptExec(stmt->d.s_if.t_else, pMsg, pWti); } - freeActive(newAct); -finalize_it: - RETiRet; } -/* for details, see scriptExec() header comment! */ static void -execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) +execPRIFILT(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - sbool *newAct; - msg_t *pMsg; int bRet; - int i; - newAct = newActive(pBatch); - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - return; - if(pBatch->eltState[i] == BATCH_STATE_DISC) - continue; /* will be ignored in any case */ - pMsg = pBatch->pElem[i].pMsg; - if(active == NULL || active[i]) { - if( (stmt->d.s_prifilt.pmask[pMsg->iFacility] == TABLE_NOPRI) || - ((stmt->d.s_prifilt.pmask[pMsg->iFacility] - & (1<<pMsg->iSeverity)) == 0) ) - bRet = 0; - else - bRet = 1; - } else - bRet = 0; - newAct[i] = bRet; - DBGPRINTF("batch: item %d PRIFILT %d\n", i, newAct[i]); - } - - if(stmt->d.s_prifilt.t_then != NULL) { - scriptExec(stmt->d.s_prifilt.t_then, pBatch, newAct, pWti); - } - if(stmt->d.s_prifilt.t_else != NULL) { - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - return; - if(pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) - newAct[i] = !newAct[i]; - } - scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct, pWti); + if( (stmt->d.s_prifilt.pmask[pMsg->iFacility] == TABLE_NOPRI) || + ((stmt->d.s_prifilt.pmask[pMsg->iFacility] + & (1<<pMsg->iSeverity)) == 0) ) + bRet = 0; + else + bRet = 1; + + DBGPRINTF("PRIFILT condition result is %d\n", bRet); + if(bRet) { + if(stmt->d.s_prifilt.t_then != NULL) + scriptExec(stmt->d.s_prifilt.t_then, pMsg, pWti); + } else { + if(stmt->d.s_prifilt.t_else != NULL) + scriptExec(stmt->d.s_prifilt.t_else, pMsg, pWti); } - freeActive(newAct); } @@ -498,79 +339,63 @@ done: return bRet; } -/* for details, see scriptExec() header comment! */ static void -execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) +execPROPFILT(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - sbool *thenAct; sbool bRet; - int i; - thenAct = newActive(pBatch); - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - return; - if(pBatch->eltState[i] == BATCH_STATE_DISC) - continue; /* will be ignored in any case */ - if(active == NULL || active[i]) { - bRet = evalPROPFILT(stmt, pBatch->pElem[i].pMsg); - } else - bRet = 0; - thenAct[i] = bRet; - DBGPRINTF("batch: item %d PROPFILT %d\n", i, thenAct[i]); - } - scriptExec(stmt->d.s_propfilt.t_then, pBatch, thenAct, pWti); - freeActive(thenAct); + bRet = evalPROPFILT(stmt, pMsg); + DBGPRINTF("PROPFILT condition result is %d\n", bRet); + if(bRet) + scriptExec(stmt->d.s_propfilt.t_then, pMsg, pWti); } /* The rainerscript execution engine. It is debatable if that would be better * contained in grammer/rainerscript.c, HOWEVER, that file focusses primarily * on the parsing and object creation part. So as an actual executor, it is * better suited here. - * param active: if NULL, all messages are active (to be processed), if non-null - * this is an array of the same size as the batch. If 1, the message - * is to be processed, otherwise not. - * NOTE: this function must receive batches which contain a single ruleset ONLY! * rgerhards, 2012-09-04 */ -static rsRetVal -scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti) +static void +scriptExec(struct cnfstmt *root, msg_t *pMsg, wti_t *pWti) { - DEFiRet; struct cnfstmt *stmt; for(stmt = root ; stmt != NULL ; stmt = stmt->next) { + if(*pWti->pbShutdownImmediate) { + DBGPRINTF("scriptExec: ShutdownImmediate set, " + "force terminating\n"); + goto done; + } if(Debug) { - dbgprintf("scriptExec: batch of %d elements, active %p, active[0]:%d\n", - batchNumMsgs(pBatch), active, (active == NULL ? 1 : active[0])); cnfstmtPrintOnly(stmt, 2, 0); } switch(stmt->nodetype) { case S_NOP: break; case S_STOP: - execStop(pBatch, active); + goto done; break; case S_ACT: - execAct(stmt, pBatch, active, pWti); + execAct(stmt, pMsg, pWti); break; case S_SET: - execSet(stmt, pBatch, active); + execSet(stmt, pMsg); break; case S_UNSET: - execUnset(stmt, pBatch, active); + execUnset(stmt, pMsg); break; case S_CALL: - execCall(stmt, pBatch, active, pWti); + execCall(stmt, pMsg, pWti); break; case S_IF: - execIf(stmt, pBatch, active, pWti); + execIf(stmt, pMsg, pWti); break; case S_PRIFILT: - execPRIFILT(stmt, pBatch, active, pWti); + execPRIFILT(stmt, pMsg, pWti); break; case S_PROPFILT: - execPROPFILT(stmt, pBatch, active, pWti); + execPROPFILT(stmt, pMsg, pWti); break; default: dbgprintf("error: unknown stmt type %u during exec\n", @@ -578,36 +403,42 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti) break; } } - RETiRet; +done: return; } /* Process (consume) a batch of messages. Calls the actions configured. - * If the whole batch uses a singel ruleset, we can process the batch as - * a whole. Otherwise, we need to process it slower, on a message-by-message - * basis (what can be optimized to a per-ruleset basis) - * rgerhards, 2005-10-13 + * This is called by MAIN queues. */ static rsRetVal processBatch(batch_t *pBatch, wti_t *pWti) { - ruleset_t *pThis; + int i; + msg_t *pMsg; + ruleset_t *pRuleset; DEFiRet; - assert(pBatch != NULL); - - DBGPRINTF("processBatch: batch of %d elements must be processed\n", pBatch->nElem); - if(pBatch->bSingleRuleset) { - pThis = batchGetRuleset(pBatch); - if(pThis == NULL) - pThis = ourConf->rulesets.pDflt; - ISOBJ_TYPE_assert(pThis, ruleset); - CHKiRet(scriptExec(pThis->root, pBatch, NULL, pWti)); - } else { - CHKiRet(processBatchMultiRuleset(pBatch, pWti)); + + DBGPRINTF("processBATCH: batch of %d elements must be processed\n", pBatch->nElem); + + wtiResetExecState(pWti, pBatch); + + /* execution phase */ + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pWti->pbShutdownImmediate) ; ++i) { + pMsg = pBatch->pElem[i].pMsg; + DBGPRINTF("processBATCH: next msg %d: %.128s\n", i, pMsg->pszRawMsg); + pRuleset = (pMsg->pRuleset == NULL) ? ourConf->rulesets.pDflt : pMsg->pRuleset; + scriptExec(pRuleset->root, pMsg, pWti); + // TODO: think if we need a return state of scriptExec - most probably + // the answer is "no", as we need to process the batch in any case! + // TODO: we must refactor this! flag messages as committed + batchSetElemState(pBatch, i, BATCH_STATE_COMM); } -finalize_it: - DBGPRINTF("ruleset.ProcessMsg() returns %d\n", iRet); + /* commit phase */ + dbgprintf("END batch execution phase, entering to commit phase\n"); + actionCommitAllDirect(pWti); + + DBGPRINTF("processBATCH: batch of %d elements has been processed\n", pBatch->nElem); RETiRet; } diff --git a/runtime/wti.c b/runtime/wti.c index c2077a51..6b5d82c8 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -51,6 +51,8 @@ DEFobjStaticHelpers DEFobjCurrIf(glbl) +pthread_key_t thrd_wti_key; + /* forward-definitions */ /* methods */ @@ -208,6 +210,11 @@ wtiConstructFinalize(wti_t *pThis) /* must use calloc as we need zero-init */ CHKmalloc(pThis->actWrkrInfo = calloc(iActionNbr, sizeof(actWrkrInfo_t))); + if(pThis->pWtp == NULL) { + dbgprintf("wtiConstructFinalize: pWtp not set, this may be intentional\n"); + FINALIZE; + } + /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); CHKiRet(batchInit(&pThis->batch, iDeqBatchSize)); @@ -394,6 +401,33 @@ finalize_it: } +/* This function returns (and creates if necessary) a dummy wti suitable + * for use by the rule engine. It is intended to be used for direct-mode + * main queues (folks, don't do that!). Once created, data is stored in + * thread-specific storage. + * Note: we do NOT do error checking -- if this functions fails, all the + * rest will fail as well... (also, it will only fail under OOM, so...). + * Memleak: we leak pWti's when run in direct mode. However, this is only + * a cosmetic leak, as we need them until all inputs are terminated, + * what means essentially until rsyslog itself is terminated. So we + * don't care -- it's just not nice in valgrind, but that's it. + */ +wti_t * +wtiGetDummy(void) +{ + wti_t *pWti; + + pWti = (wti_t*) pthread_getspecific(thrd_wti_key); + if(pWti == NULL) { + wtiConstruct(&pWti); + wtiConstructFinalize(pWti); + if(pthread_setspecific(thrd_wti_key, pWti) != 0) { + DBGPRINTF("wtiGetDummy: error setspecific thrd_wti_key\n"); + } + } + return pWti; +} + /* dummy */ rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; } @@ -403,6 +437,7 @@ BEGINObjClassExit(wti, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */ CODESTARTObjClassExit(nsdsel_gtls) /* release objects we no longer need */ objRelease(glbl, CORE_COMPONENT); + pthread_key_delete(thrd_wti_key); ENDObjClassExit(wti) @@ -411,8 +446,14 @@ ENDObjClassExit(wti) * rgerhards, 2008-01-09 */ BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */ + int r; /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); + r = pthread_key_create(&thrd_wti_key, NULL); + if(r != 0) { + dbgprintf("wti.c: pthread_key_create failed\n"); + iRet = RS_RET_ERR; + } ENDObjClassInit(wti) /* vi:set ai: diff --git a/runtime/wti.h b/runtime/wti.h index 8c43c22e..adc7897c 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -1,6 +1,6 @@ /* Definition of the worker thread instance (wti) class. * - * Copyright 2008-2012 Adiscon GmbH. + * Copyright 2008-2013 Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -26,11 +26,45 @@ #include "wtp.h" #include "obj.h" #include "batch.h" +#include "action.h" +#define ACT_STATE_RDY 0 /* action ready, waiting for new transaction */ +#define ACT_STATE_ITX 1 /* transaction active, waiting for new data or commit */ +#define ACT_STATE_COMM 2 /* transaction finished (a transient state) */ +#define ACT_STATE_RTRY 3 /* failure occured, trying to restablish ready state */ +#define ACT_STATE_SUSP 4 /* suspended due to failure (return fail until timeout expired) */ +/* note: 3 bit bit field --> highest value is 7! */ + +/* The following structure defines immutable parameters which need to + * be passed as action parameters. Note that the current implementation + * does NOT focus on performance, but on a simple PoC in order to get + * things going. TODO: Once it works, revisit this code and think about + * an array implementation. We also need to support other passing modes + * as well. -- gerhards, 2013-11-04 + */ +typedef struct actWrkrIParams { + int msgFlags; + /* following are caches to save allocs if not absolutely necessary */ + uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */ + /* a cache to save malloc(), if not absolutely necessary */ + unsigned staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; + /* and the same for the message length (if used) */ + void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; +} actWrkrIParams_t; + typedef struct actWrkrInfo { action_t *pAction; void *actWrkrData; + uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an immediate failure following */ + int iNbrResRtry; /* number of retries since last suspend */ + struct { + unsigned actState : 3; + } flags; + actWrkrIParams_t *iparams; + int currIParam; + int maxIParams; /* current max */ + void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /* for non-strings */ } actWrkrInfo_t; /* the worker thread instance class */ @@ -39,12 +73,20 @@ struct wti_s { pthread_t thrdID; /* thread ID */ int bIsRunning; /* is this thread currently running? (must be int for atomic op!) */ sbool bAlwaysRunning; /* should this thread always run? */ + int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions (sized for max nbr of actions in config!) */ pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */ DEF_ATOMIC_HELPER_MUT(mutIsRunning); + struct { + uint8_t bPrevWasSuspended; + uint8_t bDoAutoCommit; /* do a commit after each message + * this is usually set for batches with 0 element, but may + * also be added as a user-selectable option (not implemented yet) + */ + } execState; /* state for the execution engine */ }; @@ -59,8 +101,96 @@ rsRetVal wtiSetAlwaysRunning(wti_t *pThis); rsRetVal wtiSetState(wti_t *pThis, sbool bNew); rsRetVal wtiWakeupThrd(wti_t *pThis); sbool wtiGetState(wti_t *pThis); +wti_t *wtiGetDummy(void); PROTOTYPEObjClassInit(wti); PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*); PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*); +static inline uint8_t +getActionStateByNbr(wti_t *pWti, int iActNbr) +{ + return((uint8_t) pWti->actWrkrInfo[iActNbr].flags.actState); +} + +static inline uint8_t +getActionState(wti_t *pWti, action_t *pAction) +{ + return((uint8_t) pWti->actWrkrInfo[pAction->iActionNbr].flags.actState); +} + +static inline void +setActionState(wti_t *pWti, action_t *pAction, uint8_t newState) +{ + pWti->actWrkrInfo[pAction->iActionNbr].flags.actState = newState; +} + +static inline uint16_t +getActionResumeInRow(wti_t *pWti, action_t *pAction) +{ + return(pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow); +} + +static inline void +setActionResumeInRow(wti_t *pWti, action_t *pAction, uint16_t val) +{ + pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow = val; +} + +static inline void +incActionResumeInRow(wti_t *pWti, action_t *pAction) +{ + pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow++; +} + +static inline int +getActionNbrResRtry(wti_t *pWti, action_t *pAction) +{ + return(pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry); +} + +static inline void +setActionNbrResRtry(wti_t *pWti, action_t *pAction, uint16_t val) +{ + pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry = val; +} + +static inline void +incActionNbrResRtry(wti_t *pWti, action_t *pAction) +{ + pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry++; +} + +static inline rsRetVal +wtiNewIParam(wti_t *pWti, action_t *pAction, actWrkrIParams_t **piparams) +{ + actWrkrInfo_t *wrkrInfo; + actWrkrIParams_t *iparams; + int newMax; + DEFiRet; + + wrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + if(wrkrInfo->currIParam == wrkrInfo->maxIParams) { + /* we need to extend */ + dbgprintf("DDDD: extending iparams, max %d\n", wrkrInfo->maxIParams); + newMax = (wrkrInfo->maxIParams == 0) ? CONF_IPARAMS_BUFSIZE : 2 * wrkrInfo->maxIParams; + CHKmalloc(iparams = realloc(wrkrInfo->iparams, sizeof(actWrkrIParams_t) * newMax)); + wrkrInfo->iparams = iparams; + wrkrInfo->maxIParams = newMax; + } +dbgprintf("DDDD: adding param %d for action %d\n", wrkrInfo->currIParam, pAction->iActionNbr); + iparams = wrkrInfo->iparams + wrkrInfo->currIParam; + memset(iparams, 0, sizeof(actWrkrIParams_t)); + *piparams = iparams; + ++wrkrInfo->currIParam; + +finalize_it: + RETiRet; +} + +static inline void +wtiResetExecState(wti_t *pWti, batch_t *pBatch) +{ + pWti->execState.bPrevWasSuspended = 0; + pWti->execState.bDoAutoCommit = (batchNumMsgs(pBatch) == 1); +} #endif /* #ifndef WTI_H_INCLUDED */ diff --git a/tools/syslogd.c b/tools/syslogd.c index 7597b05d..7543d338 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -497,24 +497,19 @@ finalize_it: * rgerhards, 2010-06-09 */ static inline rsRetVal -preprocessBatch(batch_t *pBatch) { +preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) { prop_t *ip; prop_t *fqdn; prop_t *localName; prop_t *propFromHost = NULL; prop_t *propFromHostIP = NULL; - int bSingleRuleset; - ruleset_t *batchRuleset; /* the ruleset used for all message inside the batch, if there is a single one */ int bIsPermitted; msg_t *pMsg; int i; rsRetVal localRet; DEFiRet; - bSingleRuleset = 1; - batchRuleset = (pBatch->nElem > 0) ? pBatch->pElem[0].pMsg->pRuleset : NULL; - - for(i = 0 ; i < pBatch->nElem && !*(pBatch->pbShutdownImmediate) ; i++) { + for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { pMsg = pBatch->pElem[i].pMsg; if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) { DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n"); @@ -539,12 +534,8 @@ preprocessBatch(batch_t *pBatch) { pBatch->eltState[i] = BATCH_STATE_DISC; } } - if(pMsg->pRuleset != batchRuleset) - bSingleRuleset = 0; } - batchSetSingleRuleset(pBatch, bSingleRuleset); - finalize_it: if(propFromHost != NULL) prop.Destruct(&propFromHost); @@ -560,17 +551,16 @@ finalize_it: * for the main queue. */ static rsRetVal -msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate) +msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, wti_t *pWti) { DEFiRet; assert(pBatch != NULL); - pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */ - preprocessBatch(pBatch); + preprocessBatch(pBatch, pWti->pbShutdownImmediate); ruleset.ProcessBatch(pBatch, pWti); //TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we //do not have this yet and so we emulate -- 2010-06-10 int i; - for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { + for(i = 0 ; i < pBatch->nElem && !*pWti->pbShutdownImmediate ; i++) { pBatch->eltState[i] = BATCH_STATE_COMM; } RETiRet; |