diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 435 |
1 files changed, 159 insertions, 276 deletions
@@ -12,11 +12,11 @@ * necessary to triple-check that everything works well in *all* modes. * The different modes (and calling sequence) are: * - * if set iExecEveryNthOccur > 1 || f_ReduceRepeated || iSecsExecOnceInterval + * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval * - doSubmitToActionQComplexBatch * - helperSubmitToActionQComplexBatch * - doActionCallAction - * handles duplicate message processing, but in essence calls + * handles mark message reduction, but in essence calls * - actionWriteToAction * - qqueueEnqObj * (now queue engine processing) @@ -188,7 +188,7 @@ static struct cnfparamdescr cnfparamdescr[] = { { "action.execonlyeverynthtime", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyeverynthtime */ { "action.execonlyeverynthtimetimeout", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyeverynthtimetimeout */ { "action.execonlyonceeveryinterval", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyonceeveryinterval */ - { "action.execonlywhenpreviousissuspended", eCmdHdlrInt, 0 }, /* legacy: actionexeconlywhenpreviousissuspended */ + { "action.execonlywhenpreviousissuspended", eCmdHdlrBinary, 0 }, /* legacy: actionexeconlywhenpreviousissuspended */ { "action.repeatedmsgcontainsoriginalmsg", eCmdHdlrBinary, 0 }, /* legacy: repeatedmsgcontainsoriginalmsg */ { "action.resumeretrycount", eCmdHdlrInt, 0 }, /* legacy: actionresumeretrycount */ { "action.resumeinterval", eCmdHdlrInt, 0 } @@ -289,6 +289,11 @@ rsRetVal actionDestruct(action_t *pThis) DEFiRet; ASSERT(pThis != NULL); + if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) { + /* discard actions will be optimized out */ + FINALIZE; + } + if(pThis->pQueue != NULL) { qqueueDestruct(&pThis->pQueue); } @@ -302,16 +307,13 @@ rsRetVal actionDestruct(action_t *pThis) if(pThis->pMod != NULL) pThis->pMod->freeInstance(pThis->pModData); - if(pThis->f_pMsg != NULL) - msgDestruct(&pThis->f_pMsg); - pthread_mutex_destroy(&pThis->mutAction); pthread_mutex_destroy(&pThis->mutActExec); d_free(pThis->pszName); d_free(pThis->ppTpl); +finalize_it: d_free(pThis); - RETiRet; } @@ -362,6 +364,10 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) ASSERT(pThis != NULL); + if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) { + /* discard actions will be optimized out */ + FINALIZE; + } /* generate a friendly name for us action stats */ if(pThis->pszName == NULL) { snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr); @@ -401,16 +407,11 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08 */ if( pThis->iExecEveryNthOccur > 1 - || pThis->f_ReduceRepeated || pThis->iSecsExecOnceInterval ) { DBGPRINTF("info: firehose mode disabled for action because " - "iExecEveryNthOccur=%d, " - "ReduceRepeated=%d, " - "iSecsExecOnceInterval=%d\n", - pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated, - pThis->iSecsExecOnceInterval - ); + "iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n", + pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval); pThis->submitToActQ = doSubmitToActionQComplexBatch; } else if(pThis->bWriteAllMarkMsgs == RSFALSE) { /* nearly full-speed submission mode, default case */ @@ -429,7 +430,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszAName); - qqueueSetpUsr(pThis->pQueue, pThis); + qqueueSetpAction(pThis->pQueue, pThis); if(queueParams == NULL) { /* use legacy params? */ /* ... set some properties ... */ @@ -604,13 +605,17 @@ static void actionDisable(action_t *pThis) * CPU time. TODO: maybe a config option for that? * rgerhards, 2007-08-02 */ -static inline void actionSuspend(action_t *pThis, time_t ttNow) +static inline void actionSuspend(action_t *pThis) { - if(ttNow == NO_TIME_PROVIDED) - datetime.GetTime(&ttNow); + time_t ttNow; + + /* note: we can NOT use a cached timestamp, as time may have evolved + * since caching, and this would break logic (and it actually did so!) + */ + datetime.GetTime(&ttNow); pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); actionSetState(pThis, ACT_STATE_SUSP); - DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry); + DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry); } @@ -630,7 +635,7 @@ static inline void actionSuspend(action_t *pThis, time_t ttNow) * of its inability to recover. -- rgerhards, 2010-04-26. */ static inline rsRetVal -actionDoRetry(action_t *pThis, time_t ttNow, int *pbShutdownImmediate) +actionDoRetry(action_t *pThis, int *pbShutdownImmediate) { int iRetries; int iSleepPeriod; @@ -641,24 +646,28 @@ actionDoRetry(action_t *pThis, time_t ttNow, int *pbShutdownImmediate) iRetries = 0; while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { + DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries); iRet = pThis->pMod->tryResume(pThis->pModData); - if((pThis->iResumeOKinRow > 999) && (pThis->iResumeOKinRow % 1000 == 0)) { + DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet); + if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) { bTreatOKasSusp = 1; pThis->iResumeOKinRow = 0; } else { bTreatOKasSusp = 0; } if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { + DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet); actionSetState(pThis, ACT_STATE_RDY); } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { /* max retries reached? */ + DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n", + pThis->iResumeRetryCount, iRetries); if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { - actionSuspend(pThis, ttNow); + actionSuspend(pThis); } else { ++pThis->iNbrResRtry; ++iRetries; iSleepPeriod = pThis->iResumeInterval; - ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */ srSleep(iSleepPeriod, 0); if(*pbShutdownImmediate) { ABORT_FINALIZE(RS_RET_FORCE_TERM); @@ -705,7 +714,7 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) if(pThis->eState == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ datetime.GetTime(&ttNow); - CHKiRet(actionDoRetry(pThis, ttNow, pbShutdownImmediate)); + CHKiRet(actionDoRetry(pThis, pbShutdownImmediate)); } if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { @@ -765,7 +774,6 @@ rsRetVal actionDbgPrint(action_t *pThis) pThis->pMod->dbgPrintInstInfo(pThis->pModData); dbgprintf("\n"); dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); - dbgprintf("\tRepeatedMsgReduction: %d\n", pThis->f_ReduceRepeated); dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); if(pThis->eState == ACT_STATE_SUSP) { dbgprintf("\tresume next retry: %u, number retries: %d", @@ -792,7 +800,8 @@ rsRetVal actionDbgPrint(action_t *pThis) /* prepare the calling parameters for doAction() * rgerhards, 2009-05-07 */ -static rsRetVal prepareDoActionParams(action_t *pAction, batch_obj_t *pElem) +static rsRetVal +prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow) { int i; msg_t *pMsg; @@ -802,23 +811,23 @@ static rsRetVal prepareDoActionParams(action_t *pAction, batch_obj_t *pElem) ASSERT(pAction != NULL); ASSERT(pElem != NULL); - pMsg = (msg_t*) pElem->pUsrp; + pMsg = pElem->pMsg; /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]), - &pElem->staticLenStrings[i])); + &pElem->staticLenStrings[i], ttNow)); pElem->staticActParams[i] = pElem->staticActStrings[i]; break; case ACT_ARRAY_PASSING: - CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]))); + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow)); break; case ACT_MSG_PASSING: pElem->staticActParams[i] = (void*) pMsg; break; case ACT_JSON_PASSING: - CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json)); + CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow)); pElem->staticActParams[i] = (void*) json; break; default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n", @@ -851,9 +860,12 @@ static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) ASSERT(pAction != NULL); + if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING) + goto done; /* we need to do nothing with these types! */ + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { pElem = &(pBatch->pElem[i]); - if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) { + if(batchIsValidElem(pBatch, i)) { switch(pAction->eParamPassing) { case ACT_ARRAY_PASSING: ppMsgs = (uchar***) pElem->staticActParams; @@ -870,19 +882,6 @@ static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) } } break; - case ACT_STRING_PASSING: - case ACT_MSG_PASSING: - /* nothing to do in that case */ - /* TODO ... and yet we do something ;) This is considered not - * really needed, but I was not bold enough to remove that while - * fixing the stable. It should be removed in a devel version - * soon (I really don't see a reason why we would need it). - * rgerhards, 2010-12-16 - */ - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - ((uchar**)pElem->staticActParams)[j] = NULL; - } - break; case ACT_JSON_PASSING: for(j = 0 ; j < pAction->iNumTpls ; ++j) { json_object_put((struct json_object*) @@ -890,11 +889,15 @@ static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) pElem->staticActParams[j] = NULL; } break; + case ACT_STRING_PASSING: + case ACT_MSG_PASSING: + /* can never happen, just to keep compiler happy! */ + break; } } } - RETiRet; +done: RETiRet; } @@ -958,6 +961,8 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutd ISOBJ_TYPE_assert(pMsg, msg); CHKiRet(actionPrepare(pThis, pbShutdownImmediate)); + if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) + pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); if(pThis->eState == ACT_STATE_ITX) CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); @@ -1051,9 +1056,8 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) /* NOTE: do NOT extend the filter below! Anything else must be done on the * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15 */ - if( pBatch->pElem[i].bFilterOK - && pBatch->pElem[i].state != BATCH_STATE_DISC) { - pMsg = (msg_t*) pBatch->pElem[i].pUsrp; + if(batchIsValidElem(pBatch, i)) { + pMsg = pBatch->pElem[i].pMsg; localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, pBatch->pbShutdownImmediate); DBGPRINTF("action %p call returned %d\n", pAction, localRet); @@ -1076,11 +1080,11 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) ++iCommittedUpTo; //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; } - pBatch->pElem[i].state = BATCH_STATE_SUB; + pBatch->eltState[i] = BATCH_STATE_SUB; } else if(localRet == RS_RET_DEFER_COMMIT) { - pBatch->pElem[i].state = BATCH_STATE_SUB; + pBatch->eltState[i] = BATCH_STATE_SUB; } else if(localRet == RS_RET_DISCARDMSG) { - pBatch->pElem[i].state = BATCH_STATE_DISC; + pBatch->eltState[i] = BATCH_STATE_DISC; } else { dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n", localRet, *pnElem, iCommittedUpTo); @@ -1117,6 +1121,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) assert(pBatch != NULL); + DBGPRINTF("submitBatch: enter, nElem %d\n", nElem); wasDoneTo = pBatch->iDoneUpTo; bDone = 0; do { @@ -1138,13 +1143,14 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) || localRet == RS_RET_DEFER_COMMIT) { bDone = 1; } else if(localRet == RS_RET_SUSPENDED) { - ; /* do nothing, this will retry the full batch */ + DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n"); + /* do nothing, this will retry the full batch */ } else if(localRet == RS_RET_ACTION_FAILED) { /* in this case, everything not yet committed is BAD */ for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { - if( pBatch->pElem[i].state != BATCH_STATE_DISC - && pBatch->pElem[i].state != BATCH_STATE_COMM ) { - pBatch->pElem[i].state = BATCH_STATE_BAD; + if( pBatch->eltState[i] != BATCH_STATE_DISC + && pBatch->eltState[i] != BATCH_STATE_COMM ) { + pBatch->eltState[i] = BATCH_STATE_BAD; pBatch->pElem[i].bPrevWasSuspended = 1; STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail); } @@ -1173,6 +1179,29 @@ finalize_it: } +/* copy "active" array of batch, as we need to modify it. The caller + * must make sure the new array is freed and the orginal batch + * pointer is restored (thus the caller must save it). If active + * is currently NULL, this is properly handled. + * Note: the batches active pointer is modified, so it must be + * saved BEFORE calling this function! + * rgerhards, 2012-09-12 + */ +static rsRetVal +copyActive(batch_t *pBatch) +{ + sbool *active; + DEFiRet; + + CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); + if(pBatch->active == NULL) + memset(active, 1, batchNumMsgs(pBatch)); + else + memcpy(active, pBatch->active, batchNumMsgs(pBatch)); + pBatch->active = active; +finalize_it: + RETiRet; +} /* The following function prepares a batch for processing, that it is * reinitializes batch states, generates strings and does everything else @@ -1183,19 +1212,29 @@ finalize_it: * rgerhards, 2010-06-14 */ static inline rsRetVal -prepareBatch(action_t *pAction, batch_t *pBatch) +prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr) { int i; batch_obj_t *pElem; + struct syslogTime ttNow; DEFiRet; + /* indicate we have not yet read the date */ + ttNow.year = 0; + pBatch->iDoneUpTo = 0; for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { pElem = &(pBatch->pElem[i]); - if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) { - pElem->state = BATCH_STATE_RDY; - if(prepareDoActionParams(pAction, pElem) != RS_RET_OK) - pElem->bFilterOK = RSFALSE; + if(batchIsValidElem(pBatch, i)) { + pBatch->eltState[i] = BATCH_STATE_RDY; + if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) { + /* make sure we have our copy of "active" array */ + if(!*bMustRestoreActivePtr) { + *activeSave = pBatch->active; + copyActive(pBatch); + } + pBatch->active[i] = RSFALSE; + } } } RETiRet; @@ -1228,14 +1267,18 @@ static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { int *pbShutdownImmdtSave; + sbool *activeSave; + int bMustRestoreActivePtr = 0; rsRetVal localRet; DEFiRet; assert(pBatch != NULL); - pbShutdownImmdtSave = pBatch->pbShutdownImmediate; - pBatch->pbShutdownImmediate = pbShutdownImmediate; - CHKiRet(prepareBatch(pAction, pBatch)); + if(pbShutdownImmediate != NULL) { + pbShutdownImmdtSave = pBatch->pbShutdownImmediate; + pBatch->pbShutdownImmediate = pbShutdownImmediate; + } + CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); /* We now must guard the output module against execution by multiple threads. The * plugin interface specifies that output modules must not be thread-safe (except @@ -1258,9 +1301,15 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) if(iRet == RS_RET_OK) iRet = localRet; + + if(bMustRestoreActivePtr) { + free(pBatch->active); + pBatch->active = activeSave; + } finalize_it: - pBatch->pbShutdownImmediate = pbShutdownImmdtSave; + if(pbShutdownImmediate != NULL) + pBatch->pbShutdownImmediate = pbShutdownImmdtSave; RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" @@ -1343,9 +1392,9 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) - iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); + iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg)); else - iRet = qqueueEnqObj(pAction->pQueue, eFLOWCTL_NO_DELAY, (void*) MsgAddRef(pMsg)); + iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); finalize_it: RETiRet; @@ -1360,14 +1409,10 @@ finalize_it: * be filtered out before calling us (what is done currently!). */ rsRetVal -actionWriteToAction(action_t *pAction) +actionWriteToAction(action_t *pAction, msg_t *pMsg) { - msg_t *pMsgSave; /* to save current message pointer, necessary to restore - it in case it needs to be updated (e.g. repeated msgs) */ DEFiRet; - pMsgSave = NULL; /* indicate message poiner not saved */ - /* first, we check if the action should actually be called. The action-specific * $ActionExecOnlyEveryNthTime permits us to execute an action only every Nth * time. So we need to check if we need to drop the (otherwise perfectly executable) @@ -1394,43 +1439,6 @@ actionWriteToAction(action_t *pAction) } } - /* then check if this is a regular message or the repeation of - * a previous message. If so, we need to change the message text - * to "last message repeated n times" and then go ahead and write - * it. Please note that we can not modify the message object, because - * that would update it in other selectors as well. As such, we first - * need to create a local copy of the message, which we than can update. - * rgerhards, 2007-07-10 - */ - if(pAction->f_prevcount > 1) { - msg_t *pMsg; - size_t lenRepMsg; - uchar szRepMsg[1024]; - - if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) { - /* it failed - nothing we can do against it... */ - DBGPRINTF("Message duplication failed, dropping repeat message.\n"); - ABORT_FINALIZE(RS_RET_ERR); - } - - if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */ - lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " last message repeated %d times", - pAction->f_prevcount); - } else { - lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " message repeated %d times: [%.800s]", - pAction->f_prevcount, getMSG(pAction->f_pMsg)); - } - - /* We now need to update the other message properties. Please note that digital - * signatures inside the message are also invalidated. - */ - datetime.getCurrTime(&(pMsg->tRcvdAt), &(pMsg->ttGenTime)); - memcpy(&pMsg->tTIMESTAMP, &pMsg->tRcvdAt, sizeof(struct syslogTime)); - MsgReplaceMSG(pMsg, szRepMsg, lenRepMsg); - pMsgSave = pAction->f_pMsg; /* save message pointer for later restoration */ - pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */ - } - DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); /* now check if we need to drop the message because otherwise the action would be too @@ -1451,31 +1459,14 @@ actionWriteToAction(action_t *pAction) /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) * rgerhards, 2008-09-17 */ pAction->tLastExec = getActNow(pAction); /* re-init time flags */ - pAction->f_time = pAction->f_pMsg->ttGenTime; + pAction->f_time = pMsg->ttGenTime; /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = doSubmitToActionQ(pAction, pAction->f_pMsg); - - if(iRet == RS_RET_OK) - pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ + iRet = doSubmitToActionQ(pAction, pMsg); finalize_it: - if(pMsgSave != NULL) { - /* we had saved the original message pointer. That was - * done because we needed to create a temporary one - * (most often for "message repeated n time" handling). If so, - * we need to restore the original one now, so that procesing - * can continue as normal. We also need to discard the temporary - * one, as we do not like memory leaks ;) Please note that the original - * message object will be discarded by our callers, so this is nothing - * of our business. rgerhards, 2007-07-10 - */ - msgDestruct(&pAction->f_pMsg); - pAction->f_pMsg = pMsgSave; /* restore it */ - } - RETiRet; } @@ -1489,7 +1480,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) msg_t *pMsg; DEFiRet; - pMsg = (msg_t*)(pBatch->pElem[idxBtch].pUsrp); + pMsg = pBatch->pElem[idxBtch].pMsg; pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ /* don't output marks to recently written outputs */ @@ -1498,43 +1489,8 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) ABORT_FINALIZE(RS_RET_OK); } - /* suppress duplicate messages */ - if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL && - (pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) && - !ustrcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) && - !strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) && - !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(pAction->f_pMsg, LOCK_MUTEX)) && - !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(pAction->f_pMsg, LOCK_MUTEX))) { - pAction->f_prevcount++; - DBGPRINTF("msg repeated %d times, %ld sec of %d.\n", - pAction->f_prevcount, (long) getActNow(pAction) - pAction->f_time, - repeatinterval[pAction->f_repeatcount]); - /* use current message, so we have the new timestamp (means we need to discard previous one) */ - msgDestruct(&pAction->f_pMsg); - pAction->f_pMsg = MsgAddRef(pMsg); - /* If domark would have logged this by now, flush it now (so we don't hold - * isolated messages), but back off so we'll flush less often in the future. - */ - if(getActNow(pAction) > REPEATTIME(pAction)) { - iRet = actionWriteToAction(pAction); - BACKOFF(pAction); - } - } else {/* new message, save it */ - /* first check if we have a previous message stored - * if so, emit and then discard it first - */ - if(pAction->f_pMsg != NULL) { - if(pAction->f_prevcount > 0) - actionWriteToAction(pAction); - /* we do not care about iRet above - I think it's right but if we have - * some troubles, you know where to look at ;) -- rgerhards, 2007-08-01 - */ - msgDestruct(&pAction->f_pMsg); - } - pAction->f_pMsg = MsgAddRef(pMsg); - /* call the output driver */ - iRet = actionWriteToAction(pAction); - } + /* call the output driver */ + iRet = actionWriteToAction(pAction, pMsg); finalize_it: /* we need to update the batch to handle failover processing correctly */ @@ -1564,7 +1520,8 @@ DEFFUNC_llExecFunc(doActivateActions) } actionDisable(pThis); } - DBGPRINTF("Action %p: queue %p started\n", pThis, pThis->pQueue); + DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod), + pThis, pThis->pQueue); ENDfunc return RS_RET_OK; /* we ignore errors, we can not do anything either way */ } @@ -1598,22 +1555,15 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) time_t now = 0; time_t lastAct; int i; - int bModifiedFilter; - sbool FilterSave[1024]; - sbool *pFilterSave; + sbool *activeSave; DEFiRet; - if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) { - pFilterSave = FilterSave; - } else { - CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); - } + activeSave = pBatch->active; + copyActive(pBatch); - bModifiedFilter = 0; for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(!pBatch->pElem[i].bFilterOK) + if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i]) continue; - pFilterSave[i] = pBatch->pElem[i].bFilterOK; if(now == 0) { now = datetime.GetTime(NULL); /* good time call - the only one done */ } @@ -1622,17 +1572,17 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) * also faster ;) -- rgerhards, 2008-09-17 */ do { lastAct = pAction->f_time; - if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) { + if(pBatch->pElem[i].pMsg->msgFlags & MARK) { if((now - lastAct) < MarkInterval / 2) { - pBatch->pElem[i].bFilterOK = 0; - bModifiedFilter = 1; - DBGPRINTF("action was recently called, ignoring mark message\n"); + pBatch->active[i] = 0; + DBGPRINTF("batch item %d: action was recently called, ignoring " + "mark message\n", i); break; /* do not update timestamp for non-written mark messages */ } } } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, - ((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, &pAction->mutCAS) == 0); - if(pBatch->pElem[i].bFilterOK) { + pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0); + if(pBatch->active[i]) { DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n", i, module.GetStateName(pAction->pMod)); } @@ -1640,17 +1590,8 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) iRet = doSubmitToActionQBatch(pAction, pBatch); - if(bModifiedFilter) { - /* in this case, we need to restore previous state */ - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - /* note: clang static code analyzer reports a false positive below */ - pBatch->pElem[i].bFilterOK = pFilterSave[i]; - } - } - -finalize_it: - if(pFilterSave != FilterSave) - free(pFilterSave); + free(pBatch->active); + pBatch->active = activeSave; RETiRet; } @@ -1660,8 +1601,7 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch) { int i; for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( pBatch->pElem[i].bFilterOK - && pBatch->pElem[i].state != BATCH_STATE_DISC) { + if( batchIsValidElem(pBatch, i)) { STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); } } @@ -1675,18 +1615,13 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch) static inline rsRetVal doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) { - sbool FilterSave[1024]; - sbool *pFilterSave; sbool bNeedSubmit; - sbool bModifiedFilter; + sbool *activeSave; int i; DEFiRet; - if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) { - pFilterSave = FilterSave; - } else { - CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); - } + activeSave = pBatch->active; + copyActive(pBatch); /* note: for direct mode, we need to adjust the filter property. For non-direct * this is not necessary, because in that case we enqueue only what actually needs @@ -1694,37 +1629,25 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) */ if(pAction->bExecWhenPrevSusp) { bNeedSubmit = 0; - bModifiedFilter = 0; for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pFilterSave[i] = pBatch->pElem[i].bFilterOK; if(!pBatch->pElem[i].bPrevWasSuspended) { - DBGPRINTF("action enq stage: change bFilterOK to 0 due to " + DBGPRINTF("action enq stage: change active to 0 due to " "failover case in elem %d\n", i); - pBatch->pElem[i].bFilterOK = 0; - bModifiedFilter = 1; + pBatch->active[i] = 0; } - if(pBatch->pElem[i].bFilterOK && pBatch->pElem[i].state != BATCH_STATE_DISC) { + if(batchIsValidElem(pBatch, i)) { STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); bNeedSubmit = 1; } - DBGPRINTF("action %p[%d]: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state, + DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", + pAction, i, batchIsValidElem(pBatch, i), pBatch->eltState[i], pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); } if(bNeedSubmit) { /* note: stats were already computed above */ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); } else { - DBGPRINTF("no need to submit batch, all bFilterOK==0 or discarded\n"); - } - if(bModifiedFilter) { - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state, - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - /* note: clang static code analyzer reports a false positive below */ - pBatch->pElem[i].bFilterOK = pFilterSave[i]; - } + DBGPRINTF("no need to submit batch, all invalid\n"); } } else { if(GatherStats) @@ -1732,7 +1655,8 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); } -finalize_it: + free(pBatch->active); + pBatch->active = activeSave; RETiRet; } @@ -1755,13 +1679,12 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) * TODO: optimize this, we may do at least a multi-submit! */ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state, + DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", + pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( pBatch->pElem[i].bFilterOK - && pBatch->pElem[i].state != BATCH_STATE_DISC + if( batchIsValidElem(pBatch, i) && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { - doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg); } } } @@ -1784,11 +1707,10 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) DBGPRINTF("Called action %p (complex case), logging to %s\n", pAction, module.GetStateName(pAction->pMod)); for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state, + DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", + pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( pBatch->pElem[i].bFilterOK - && pBatch->pElem[i].state != BATCH_STATE_DISC + if( batchIsValidElem(pBatch, i) && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { doActionCallAction(pAction, pBatch, i); } @@ -1858,7 +1780,6 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals) } - /* add an Action to the current selector * The pOMSR is freed, as it is not needed after this function. * Note: this function pulls global data that specifies action config state. @@ -1923,7 +1844,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, && (pAction->ppTpl[i] = tplFind(ourConf, (char*)pTplName, strlen((char*)pTplName))) == NULL) { snprintf(errMsg, sizeof(errMsg) / sizeof(char), - " Could not find template '%s' - action disabled\n", + " Could not find template '%s' - action disabled", pTplName); errno = 0; errmsg.LogError(0, RS_RET_NOT_FOUND, "%s", errMsg); @@ -1954,17 +1875,11 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->pMod = pMod; pAction->pModData = pModData; - /* now check if the module is compatible with select features */ - if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) { - pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs; - } else { - DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n"); - pAction->f_ReduceRepeated = 0; - } + /* check if the module is compatible with select features (currently no such features exist) */ pAction->eState = ACT_STATE_RDY; /* action is enabled */ if(bSuspended) - actionSuspend(pAction, datetime.GetTime(NULL)); /* "good" time call, only during init and unavoidable */ + actionSuspend(pAction); CHKiRet(actionConstructFinalize(pAction, queueParams)); @@ -2060,13 +1975,8 @@ actionNewInst(struct nvlst *lst, action_t **ppAction) if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, queueParams, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { - /* now check if the module is compatible with select features */ - if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) - pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs; - else { - DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n"); - pAction->f_ReduceRepeated = 0; - } + /* check if the module is compatible with select features + * (currently no such features exist) */ pAction->eState = ACT_STATE_RDY; /* action is enabled */ loadConf->actions.nbrActions++; /* one more active action! */ } @@ -2078,33 +1988,6 @@ finalize_it: RETiRet; } - -/* Process a rsyslog v6 action config object (the now-primary config method). - * rgerhards, 2011-07-19 - */ -rsRetVal -actionProcessCnf(struct cnfobj __attribute__((unused)) *o) -{ - DEFiRet; -#if 0 /* we need to check if we actually need this functionality -- later! */ -// This is for STAND-ALONE actions at the conf file TOP level - struct cnfparamvals *paramvals; - - paramvals = nvlstGetParams(o->nvlst, &pblk, NULL); - if(paramvals == NULL) { - iRet = RS_RET_ERR; - goto finalize_it; - } - DBGPRINTF("action param blk after actionProcessCnf:\n"); - cnfparamsPrint(&pblk, paramvals); - - /* now find module to activate */ -finalize_it: -#endif - RETiRet; -} - - /* TODO: we are not yet a real object, the ClassInit here just looks like it is.. */ rsRetVal actionClassInit(void) |