diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 470 |
1 files changed, 325 insertions, 145 deletions
@@ -46,11 +46,15 @@ #include "wti.h" #include "datetime.h" #include "unicode-helper.h" +#include "atomic.h" #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); +static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -187,7 +191,6 @@ actionResetQueueParams(void) */ rsRetVal actionDestruct(action_t *pThis) { - int i; DEFiRet; ASSERT(pThis != NULL); @@ -206,35 +209,6 @@ rsRetVal actionDestruct(action_t *pThis) d_free(pThis->pszName); d_free(pThis->ppTpl); - /* message ptr cleanup */ - for(i = 0 ; i < pThis->iNumTpls ; ++i) { - if(pThis->ppMsgs[i] != NULL) { - switch(pThis->eParamPassing) { - case ACT_ARRAY_PASSING: -#if 0 /* later, as an optimization. So far, we do the cleanup after each message */ - iArr = 0; - while(((char **)pThis->ppMsgs[i])[iArr] != NULL) { - d_free(((char **)pThis->ppMsgs[i])[iArr++]); - ((char **)pThis->ppMsgs[i])[iArr++] = NULL; - } - d_free(pThis->ppMsgs[i]); - pThis->ppMsgs[i] = NULL; -#endif - break; - case ACT_STRING_PASSING: - d_free(pThis->ppMsgs[i]); - break; - case ACT_MSG_PASSING: - /* No cleanup needed in this case */ - break; - default: - assert(0); - } - } - } - d_free(pThis->ppMsgs); - d_free(pThis->lenMsgs); - d_free(pThis); RETiRet; @@ -256,6 +230,7 @@ rsRetVal actionConstruct(action_t **ppThis) pThis->iResumeRetryCount = glbliActionResumeRetryCount; pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ pthread_mutex_init(&pThis->mutActExec, NULL); + INIT_ATOMIC_HELPER_MUT(pThis->mutCAS); SYNC_OBJ_TOOL_INIT(pThis); /* indicate we have a new action */ @@ -280,6 +255,32 @@ actionConstructFinalize(action_t *pThis) /* find a name for our queue */ snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr); + /* now check if we can run the action in "firehose mode" during stage one of + * its processing (that is before messages are enqueued into the action q). + * This is only possible if some features, which require strict sequence, are + * not used. Thankfully, that is usually the case. The benefit of firehose + * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08 + */ + if( pThis->iExecEveryNthOccur > 1 + || pThis->f_ReduceRepeated + || pThis->iSecsExecOnceInterval + ) { + DBGPRINTF("info: firehose mode disabled for action because " + "iExecEveryNthOccur=%d, " + "ReduceRepeated=%d, " + "iSecsExecOnceInterval=%d\n", + pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated, + pThis->iSecsExecOnceInterval + ); + pThis->submitToActQ = doSubmitToActionQComplexBatch; + } else if(pThis->bWriteAllMarkMsgs == FALSE) { + /* nearly full-speed submission mode, default case */ + pThis->submitToActQ = doSubmitToActionQNotAllMarkBatch; + } else { + /* full firehose submission mode */ + pThis->submitToActQ = doSubmitToActionQBatch; + } + /* we need to make a safety check: if the queue is NOT in direct mode, a single * message object may be accessed by multiple threads. As such, we need to enable * msg object thread safety in this case (this costs a bit performance and thus @@ -503,7 +504,7 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) ASSERT(pThis != NULL); iRetries = 0; - while(pThis->eState == ACT_STATE_RTRY) { + while((*pThis->pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { iRet = pThis->pMod->tryResume(pThis->pModData); if((pThis->iResumeOKinRow > 999) && (pThis->iResumeOKinRow % 1000 == 0)) { bTreatOKasSusp = 1; @@ -522,6 +523,9 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) iSleepPeriod = pThis->iResumeInterval; ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */ srSleep(iSleepPeriod, 0); + if(*pThis->pbShutdownImmediate) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } } } else if(iRet == RS_RET_DISABLE_ACTION) { actionDisable(pThis); @@ -532,6 +536,7 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) pThis->iNbrResRtry = 0; } +finalize_it: RETiRet; } @@ -580,7 +585,7 @@ finalize_it: * depending on its current state. * rgerhards, 2009-05-07 */ -static rsRetVal actionPrepare(action_t *pThis) +static inline rsRetVal actionPrepare(action_t *pThis) { DEFiRet; @@ -617,6 +622,7 @@ finalize_it: rsRetVal actionDbgPrint(action_t *pThis) { DEFiRet; + char *sz; dbgprintf("%s: ", module.GetStateName(pThis->pMod)); pThis->pMod->dbgPrintInstInfo(pThis->pModData); @@ -629,6 +635,16 @@ rsRetVal actionDbgPrint(action_t *pThis) } dbgprintf("\tState: %s\n", getActStateName(pThis)); dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); + if(pThis->submitToActQ == doSubmitToActionQComplexBatch) { + sz = "slow, but feature-rich"; + } else if(pThis->submitToActQ == doSubmitToActionQNotAllMarkBatch) { + sz = "fast, but supports partial mark messages"; + } else if(pThis->submitToActQ == doSubmitToActionQBatch) { + sz = "firehose (fastest)"; + } else { + sz = "unknown (need to update debug display?)"; + } + dbgprintf("\tsubmission mode: %s\n", sz); dbgprintf("\n"); RETiRet; @@ -638,7 +654,7 @@ rsRetVal actionDbgPrint(action_t *pThis) /* prepare the calling parameters for doAction() * rgerhards, 2009-05-07 */ -static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg) +static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **ppMsgs, size_t *lenMsgs) { int i; DEFiRet; @@ -649,16 +665,17 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg) for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pAction->ppMsgs[i]), &(pAction->lenMsgs[i]))); + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i])); break; case ACT_ARRAY_PASSING: - CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pAction->ppMsgs[i]))); + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(ppMsgs[i]))); break; case ACT_MSG_PASSING: /* we abuse the uchar* ptr, it now actually is a void*, but we can not * change that other than by chaning the interface, what we don't like... */ - pAction->ppMsgs[i] = (uchar*) pMsg; + ppMsgs[i] = (void*) pMsg; + lenMsgs[i] = 0; /* init for *next* action */ break; default:assert(0); /* software bug if this happens! */ } @@ -672,31 +689,23 @@ finalize_it: /* cleanup doAction calling parameters * rgerhards, 2009-05-07 */ -static rsRetVal cleanupDoActionParams(action_t *pAction) +static rsRetVal cleanupDoActionParams(action_t *pAction, uchar ***ppMsgs) { - int i; int iArr; + int i; DEFiRet; ASSERT(pAction != NULL); + for(i = 0 ; i < pAction->iNumTpls ; ++i) { - if(pAction->ppMsgs[i] != NULL) { - switch(pAction->eParamPassing) { - case ACT_ARRAY_PASSING: - iArr = 0; - while(((char **)pAction->ppMsgs[i])[iArr] != NULL) { - d_free(((char **)pAction->ppMsgs[i])[iArr++]); - ((char **)pAction->ppMsgs[i])[iArr++] = NULL; - } - d_free(pAction->ppMsgs[i]); - pAction->ppMsgs[i] = NULL; - break; - case ACT_MSG_PASSING: - case ACT_STRING_PASSING: - break; - default: - assert(0); + if(((uchar**)ppMsgs)[i] != NULL) { + iArr = 0; + while((((uchar***)ppMsgs)[i][iArr]) != NULL) { + d_free(((uchar ***)ppMsgs)[i][iArr++]); + ((uchar ***)ppMsgs)[i][iArr++] = NULL; } + d_free(((uchar**)ppMsgs)[i]); + ((uchar**)ppMsgs)[i] = NULL; } } @@ -705,29 +714,24 @@ static rsRetVal cleanupDoActionParams(action_t *pAction) /* call the DoAction output plugin entry point - * Performance note: we build the action parameters here in this function. That - * means we do it while we hold the action look, potentially reducing concurrency - * (especially if the action queue is run in DIRECT mode). As an alternative, we - * may generate all params for the batch as whole before aquiring the action. However, - * that requires more memory, for large batches potentially a lot of memory. So for the - * time being, I am doing it here - the performance hit should be very minor and may even - * not be a hit because we may gain CPU cache locality gains with the "fewer memory" - * approach (I'd say that is rater likely). * rgerhards, 2008-01-28 */ rsRetVal -actionCallDoAction(action_t *pThis, msg_t *pMsg) +actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) { + int i; DEFiRet; ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); - CHKiRet(prepareDoActionParams(pThis, pMsg)); pThis->bHadAutoCommit = 0; - iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData); +//d_pthread_mutex_lock(&pThis->mutActExec); +//pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec); + iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData); +//pthread_cleanup_pop(1); /* unlock mutex */ switch(iRet) { case RS_RET_OK: actionCommitted(pThis); @@ -756,7 +760,26 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg) iRet = getReturnCode(pThis); finalize_it: - cleanupDoActionParams(pThis); /* iRet ignored! */ + + /* we need to cleanup the batches string buffers if they have been used + * in a non-standard way. -- rgerhards, 2010-06-15 + * Note that we may do this at the batch level, this would provide a bit + * more concurrency (TODO). + */ + switch(pThis->eParamPassing) { + case ACT_STRING_PASSING: + /* nothing to do in that case */ + break; + case ACT_ARRAY_PASSING: + cleanupDoActionParams(pThis, actParams); /* iRet ignored! */ + break; + case ACT_MSG_PASSING: + /* nothing to do in that case */ + for(i = 0 ; i < pThis->iNumTpls ; ++i) { + ((uchar**)actParams)[i] = NULL; + } + break; + } RETiRet; } @@ -766,18 +789,17 @@ finalize_it: * this readies the action and then calls doAction() * rgerhards, 2008-01-28 */ -rsRetVal -actionProcessMessage(action_t *pThis, msg_t *pMsg) +static inline rsRetVal +actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams) { DEFiRet; ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); -RUNLOG_STR("inside actionProcessMsg()"); CHKiRet(actionPrepare(pThis)); if(pThis->eState == ACT_STATE_ITX) - CHKiRet(actionCallDoAction(pThis, pMsg)); + CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); iRet = getReturnCode(pThis); finalize_it: @@ -797,8 +819,10 @@ finishBatch(action_t *pThis, batch_t *pBatch) ASSERT(pThis != NULL); - if(pThis->eState == ACT_STATE_RDY) + if(pThis->eState == ACT_STATE_RDY) { + /* we just need to flag the batch as commited */ FINALIZE; /* nothing to do */ + } CHKiRet(actionPrepare(pThis)); if(pThis->eState == ACT_STATE_ITX) { @@ -808,7 +832,8 @@ finishBatch(action_t *pThis, batch_t *pBatch) actionCommitted(pThis); /* flag messages as committed */ for(i = 0 ; i < pBatch->nElem ; ++i) { - pBatch->pElem[i].state = BATCH_STATE_COMM; + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + pBatch->pElem[i].bPrevWasSuspended = 0; /* we had success! */ } break; case RS_RET_SUSPENDED: @@ -843,8 +868,8 @@ finalize_it: /* try to submit a partial batch of elements. * rgerhards, 2009-05-12 */ -static rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImmediate) +static inline rsRetVal +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) { int i; int iElemProcessed; @@ -860,12 +885,17 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme iElemProcessed = 0; iCommittedUpTo = i; while(iElemProcessed <= *pnElem && i < pBatch->nElem) { - if(*pbShutdownImmediate) + if(*(pBatch->pbShutdownImmediate)) ABORT_FINALIZE(RS_RET_FORCE_TERM); - pMsg = (msg_t*) pBatch->pElem[i].pUsrp; - if(pBatch->pElem[i].state != BATCH_STATE_DISC) { - localRet = actionProcessMessage(pAction, pMsg); + if( pBatch->pElem[i].bFilterOK + && pBatch->pElem[i].state != BATCH_STATE_DISC + && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { + pMsg = (msg_t*) pBatch->pElem[i].pUsrp; + localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams); DBGPRINTF("action call returned %d\n", localRet); + /* Note: we directly modify the batch object state, because we know that + * wo do not overwrite BATCH_STATE_DISC indicators! + */ if(localRet == RS_RET_OK) { /* mark messages as committed */ while(iCommittedUpTo <= i) { @@ -882,7 +912,8 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme } else if(localRet == RS_RET_DISCARDMSG) { pBatch->pElem[i].state = BATCH_STATE_DISC; } else { - dbgprintf("tryDoAction: unexpected error code %d, finalizing\n", localRet); + dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n", + localRet, *pnElem, iCommittedUpTo); iRet = localRet; FINALIZE; } @@ -892,10 +923,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme } finalize_it: - if(pBatch->nElem == 1 && pBatch->pElem[0].state == BATCH_STATE_DISC) { - iRet = RS_RET_DISCARDMSG; - } else if(pBatch->iDoneUpTo != iCommittedUpTo) { - *pnElem += iCommittedUpTo - pBatch->iDoneUpTo; + if(pBatch->iDoneUpTo != iCommittedUpTo) { pBatch->iDoneUpTo = iCommittedUpTo; } RETiRet; @@ -905,10 +933,12 @@ finalize_it: /* submit a batch for actual action processing. * The first nElem elements are processed. This function calls itself * recursively if it needs to handle errors. + * Note: we don't need the number of the first message to be processed as a parameter, + * because this is kept track of inside the batch itself (iDoneUpTo). * rgerhards, 2009-05-12 */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmediate) +submitBatch(action_t *pAction, batch_t *pBatch, int nElem) { int i; int bDone; @@ -919,47 +949,49 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmedi bDone = 0; do { - localRet = tryDoAction(pAction, pBatch, &nElem, pbShutdownImmediate); - if(localRet == RS_RET_FORCE_TERM) - FINALIZE; + localRet = tryDoAction(pAction, pBatch, &nElem); + if(localRet == RS_RET_FORCE_TERM) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } if( localRet == RS_RET_OK || localRet == RS_RET_PREVIOUS_COMMITTED || localRet == RS_RET_DEFER_COMMIT) { /* try commit transaction, once done, we can simply do so as if * that return state was returned from tryDoAction(). */ - localRet = finishBatch(pAction, pBatch); // TODO: careful, do we need the elem counter? + localRet = finishBatch(pAction, pBatch); } if( localRet == RS_RET_OK || localRet == RS_RET_PREVIOUS_COMMITTED || localRet == RS_RET_DEFER_COMMIT) { bDone = 1; - } else if(localRet == RS_RET_DISCARDMSG) { - iRet = RS_RET_DISCARDMSG; /* TODO: verify this sequence -- rgerhards, 2009-07-30 */ - bDone = 1; } else if(localRet == RS_RET_SUSPENDED) { ; /* do nothing, this will retry the full batch */ } else if(localRet == RS_RET_ACTION_FAILED) { - /* in this case, the whole batch can not be processed */ - for(i = 0 ; i < nElem ; ++i) { - pBatch->pElem[pBatch->iDoneUpTo++].state = BATCH_STATE_BAD; + /* in this case, everything not yet committed is BAD */ + for(i = pBatch->iDoneUpTo ; i < nElem ; ++i) { + if( pBatch->pElem[i].state != BATCH_STATE_DISC + && pBatch->pElem[i].state != BATCH_STATE_COMM ) { + pBatch->pElem[i].state = BATCH_STATE_BAD; + pBatch->pElem[i].bPrevWasSuspended = 1; + } } bDone = 1; } else { if(nElem == 1) { - pBatch->pElem[pBatch->iDoneUpTo++].state = BATCH_STATE_BAD; + batchSetElemState(pBatch, i, BATCH_STATE_BAD); bDone = 1; } else { /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ - submitBatch(pAction, pBatch, nElem / 2, pbShutdownImmediate); - submitBatch(pAction, pBatch, nElem - (nElem / 2), pbShutdownImmediate); + submitBatch(pAction, pBatch, nElem / 2); + submitBatch(pAction, pBatch, nElem - (nElem / 2)); bDone = 1; } } - } while(!bDone && !*pbShutdownImmediate); /* do .. while()! */ + } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */ - if(*pbShutdownImmediate) + if(*(pBatch->pbShutdownImmediate)) ABORT_FINALIZE(RS_RET_FORCE_TERM); finalize_it: @@ -967,17 +999,45 @@ finalize_it: } + +/* The following function prepares a batch for processing, that it is + * reinitializes batch states, generates strings and does everything else + * that needs to be done in order to make the batch ready for submission to + * the actual output module. Note that we look at the precomputed + * filter OK condition and process only those messages, that actually matched + * the filter. + * rgerhards, 2010-06-14 + */ +static inline rsRetVal +prepareBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + batch_obj_t *pElem; + DEFiRet; + + pBatch->iDoneUpTo = 0; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + pElem = &(pBatch->pElem[i]); + if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) { + pElem->state = BATCH_STATE_RDY; + prepareDoActionParams(pAction, (msg_t*) pElem->pUsrp, + (uchar**) &(pElem->staticActParams), pElem->staticLenParams); + } + } + RETiRet; +} + + /* receive a batch and process it. This includes retry handling. * rgerhards, 2009-05-12 */ -static rsRetVal -processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) +static inline rsRetVal +processAction(action_t *pAction, batch_t *pBatch) { DEFiRet; assert(pBatch != NULL); - pBatch->iDoneUpTo = 0; - CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pbShutdownImmediate)); + CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem)); iRet = finishBatch(pAction, pBatch); finalize_it: @@ -993,10 +1053,16 @@ finalize_it: static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { + int *pbShutdownImmdtSave; DEFiRet; assert(pBatch != NULL); + pbShutdownImmdtSave = pBatch->pbShutdownImmediate; + pBatch->pbShutdownImmediate = pbShutdownImmediate; + pAction->pbShutdownImmediate = pBatch->pbShutdownImmediate; + CHKiRet(prepareBatch(pAction, pBatch)); + /* We now must guard the output module against execution by multiple threads. The * plugin interface specifies that output modules must not be thread-safe (except * if they notify us they are - functionality not yet implemented...). @@ -1005,10 +1071,12 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) d_pthread_mutex_lock(&pAction->mutActExec); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - iRet = processAction(pAction, pBatch, pbShutdownImmediate); + iRet = processAction(pAction, pBatch); pthread_cleanup_pop(1); /* unlock mutex */ +finalize_it: + pBatch->pbShutdownImmediate = pbShutdownImmdtSave; RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" @@ -1073,20 +1141,8 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT } -/* rgerhards 2004-11-09: fprintlog() is the actual driver for - * the output channel. It receives the channel description (f) as - * well as the message and outputs them according to the channel - * semantics. The message is typically already contained in the - * channel save buffer (f->f_prevline). This is not only the case - * when a message was already repeated but also when a new message - * arrived. - * rgerhards 2007-08-01: interface changed to use action_t - * rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE - * CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do - * not do this here. Failing to do so results in all kinds of - * "interesting" problems! - * RGERHARDS, 2008-01-29: - * This is now the action caller and has been renamed. +/* This function builds up a batch of messages to be (later) + * submitted to the action queue. */ rsRetVal actionWriteToAction(action_t *pAction) @@ -1212,7 +1268,7 @@ finalize_it: /* helper to actonCallAction, mostly needed because of this damn * pthread_cleanup_push() POSIX macro... */ -static rsRetVal +static inline rsRetVal doActionCallAction(action_t *pAction, msg_t *pMsg) { DEFiRet; @@ -1267,41 +1323,167 @@ finalize_it: RETiRet; } +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. + * rgerhards, 2010-06-08 + */ +static inline rsRetVal +doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; -/* call the configured action. Does all necessary housekeeping. - * rgerhards, 2007-08-01 - * FYI: currently, this function is only called from the queue - * consumer. So we (conceptually) run detached from the input - * threads (which also means we may run much later than when the - * message was generated). + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); + else + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + + RETiRet; +} + + + +/* This submits the message to the action queue in case where we need to handle + * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop + * for the synchronization. Here, we just modify the filter condition to be false when + * a mark message must not be written. However, in this case we must save the previous + * filter as we may need it in the next action (potential future optimization: check if this is + * the last action TODO). + * rgerhards, 2010-06-08 */ -#pragma GCC diagnostic ignored "-Wempty-body" -rsRetVal -actionCallAction(action_t *pAction, msg_t *pMsg) +static rsRetVal +doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) { + time_t now = 0; + time_t lastAct; + int i; + int bProcessMarkMsgs; + int bModifiedFilter; + sbool FilterSave[128]; + sbool *pFilterSave; DEFiRet; - ISOBJ_TYPE_assert(pMsg, msg); - ASSERT(pAction != NULL); + if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) { + pFilterSave = FilterSave; + } else { + CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); + } - /* We need to lock the mutex only for repeated line processing. - * rgerhards, 2009-06-19 - */ - //if(pAction->f_ReduceRepeated == 1) { - LockObj(pAction); - pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); - iRet = doActionCallAction(pAction, pMsg); - UnlockObj(pAction); - pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - //} else { - //iRet = doActionCallAction(pAction, pMsg); - //} + bModifiedFilter = 0; + for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { + pFilterSave[i] = pBatch->pElem[i].bFilterOK; + if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) { + /* check if we need to write or not */ + if(now == 0) { + now = datetime.GetTime(NULL); /* good time call - the only one done */ + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if((now - lastAct) < MarkInterval / 2) { + DBGPRINTF("action was recently called, ignoring mark message\n"); + bProcessMarkMsgs = 0; + } else { + bProcessMarkMsgs = 1; + } + } while(ATOMIC_CAS(&pAction->f_time, lastAct, + ((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, &pAction->mutCAS) == 0); + } + if(bProcessMarkMsgs) { + pBatch->pElem[i].bFilterOK = 0; + bModifiedFilter = 1; + } + } + } + + DBGPRINTF("Called action(NotAllMark), logging to %s\n", module.GetStateName(pAction->pMod)); + + iRet = doSubmitToActionQBatch(pAction, pBatch); + + if(bModifiedFilter) { + /* in this case, we need to restore previous state */ + for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { + pBatch->pElem[i].bFilterOK = pFilterSave[i]; + } + } + +finalize_it: + if(pFilterSave != FilterSave) + free(pFilterSave); RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. + * rgerhards, 2010-06-08 + */ +static rsRetVal +doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + DEFiRet; + + DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + else { /* in this case, we do single submits to the queue. + * TODO: optimize this, we may do at least a multi-submit! + */ + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if(pBatch->pElem[i].bFilterOK) { + doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + } + } + } + + RETiRet; +} + + + +/* Helper to submit a batch of actions to the engine. Note that we have rather + * complicated processing here, so we need to do this one message after another. + * rgerhards, 2010-06-23 + */ +static inline rsRetVal +helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + DEFiRet; + + DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if(pBatch->pElem[i].bFilterOK) { + doActionCallAction(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + } + } + + RETiRet; +} + +/* Call configured action, most complex case with all features supported (and thus slow). + * rgerhards, 2010-06-08 + */ +#pragma GCC diagnostic ignored "-Wempty-body" +static rsRetVal +doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +{ + DEFiRet; + + LockObj(pAction); + pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); + iRet = helperSubmitToActionQComplexBatch(pAction, pBatch); + UnlockObj(pAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ + + RETiRet; +} +#pragma GCC diagnostic warning "-Wempty-body" + /* add an Action to the current selector * The pOMSR is freed, as it is not needed after this function. * Note: this function pulls global data that specifies action config state. @@ -1347,8 +1529,6 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques if(pAction->iNumTpls > 0) { /* we first need to create the template pointer array */ CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *))); - CHKmalloc(pAction->ppMsgs = (uchar**) calloc(pAction->iNumTpls, sizeof(uchar *))); - CHKmalloc(pAction->lenMsgs = (size_t*) calloc(pAction->iNumTpls, sizeof(size_t))); } for(i = 0 ; i < pAction->iNumTpls ; ++i) { |