diff options
-rw-r--r-- | action.c | 206 | ||||
-rw-r--r-- | action.h | 3 | ||||
-rw-r--r-- | runtime/ruleset.c | 17 |
3 files changed, 90 insertions, 136 deletions
@@ -14,7 +14,6 @@ * * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval * - doSubmitToActionQComplexBatch - * - helperSubmitToActionQComplexBatch * - doActionCallAction * handles mark message reduction, but in essence calls * - actionWriteToAction @@ -117,9 +116,9 @@ /* forward definitions */ static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*); -static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); -static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); -static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); +static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t*); +static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t*); +static rsRetVal doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t*); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -977,26 +976,14 @@ finalize_it: } -/* finish processing a batch. Most importantly, that means we commit if we - * need to do so. - * rgerhards, 2008-01-28 - */ +/* Commit action after processing. */ static rsRetVal -finishBatch(action_t *pThis, wti_t *pWti) +actionCommit(action_t *pThis, wti_t *pWti) { - int i; int pbShutdownImmediate = 1; DEFiRet; - ASSERT(pThis != NULL); -// Testing for new engine: -dbgprintf("DDDD: iActionNbr %d\n", iActionNbr); -for(i = 0 ; i < iActionNbr ; ++i) { - dbgprintf("DDDD: finishBatch, act %d state %u\n", i, getActionStateByNbr(pWti, i)); -} - if(getActionState(pWti, pThis) == ACT_STATE_RDY) { - /* we just need to flag the batch as commited */ FINALIZE; /* nothing to do */ } @@ -1037,30 +1024,38 @@ finalize_it: } -/* copy "active" array of batch, as we need to modify it. The caller - * must make sure the new array is freed and the orginal batch - * pointer is restored (thus the caller must save it). If active - * is currently NULL, this is properly handled. - * Note: the batches active pointer is modified, so it must be - * saved BEFORE calling this function! - * rgerhards, 2012-09-12 +/* process a single message. This is both called if we run from the + * cosumer side of an action queue as well as directly from the main + * queue thread if the action queue is set to "direct". */ static rsRetVal -copyActive(batch_t *pBatch) +processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow, int *pbShutdownImmediate) { - sbool *active; DEFiRet; - CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); - if(pBatch->active == NULL) - memset(active, 1, batchNumMsgs(pBatch)); - else - memcpy(active, pBatch->active, batchNumMsgs(pBatch)); - pBatch->active = active; -finalize_it: +dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRawMsg); + // TODO: check error return states! + iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow); + iRet = actionProcessMessage(pAction, pMsg, + pWti->actWrkrInfo[pAction->iActionNbr].staticActParams, + pbShutdownImmediate, pWti); + releaseDoActionParams(pAction, pWti); RETiRet; } +/* Commit all active transactions */ +rsRetVal +actionCommitAll(wti_t *pWti) +{ + int i; + for(i = 0 ; i < iActionNbr ; ++i) { + DBGPRINTF("DDDD: actionCommitall action %d state %u\n", + i, getActionStateByNbr(pWti, i)); + if(getActionStateByNbr(pWti, i) != ACT_STATE_RDY) { + actionCommit(pWti->actWrkrInfo[i].pAction, pWti); + } + } +} /* receive an array of to-process user pointers and submit them * for processing. @@ -1085,19 +1080,13 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmed for(i = 0 ; i < batchNumMsgs(pBatch) && !*pbShutdownImmediate ; ++i) { if(batchIsValidElem(pBatch, i)) { pMsg = pBatch->pElem[i].pMsg; -dbgprintf("DDDD: processBatchMain[act %d], elt %d: %s\n", pAction->iActionNbr, i, pMsg->pszRawMsg); - // TODO: check error return states! - iRet = prepareDoActionParams(pAction, pWti, pMsg, &ttNow); - iRet = actionProcessMessage(pAction, pMsg, - pWti->actWrkrInfo[pAction->iActionNbr].staticActParams, - pbShutdownImmediate, pWti); - releaseDoActionParams(pAction, pWti); - // TODO: we must refactor this! flag messages as committed - batchSetElemState(pBatch, i, BATCH_STATE_COMM); + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, pbShutdownImmediate); + // TODO: we must refactor this! flag messages as committed + batchSetElemState(pBatch, i, BATCH_STATE_COMM); } } - iRet = finishBatch(pAction, pWti); + iRet = actionCommit(pAction, pWti); dbgprintf("DDDD: processBatchMain - end\n"); RETiRet; } @@ -1161,7 +1150,7 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT * rgerhards, 2010-06-08 */ static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti) +doSubmitToActionQ(action_t *pAction, wti_t *pWti, msg_t *pMsg) { DEFiRet; @@ -1245,7 +1234,7 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti) /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = doSubmitToActionQ(pAction, pMsg, pWti); + iRet = doSubmitToActionQ(pAction, pWti, pMsg); finalize_it: RETiRet; @@ -1256,12 +1245,10 @@ finalize_it: * pthread_cleanup_push() POSIX macro... */ static inline rsRetVal -doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti) +doActionCallAction(action_t *pAction, wti_t *pWti, msg_t *pMsg) { - msg_t *pMsg; DEFiRet; - pMsg = pBatch->pElem[idxBtch].pMsg; pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ /* don't output marks to recently written outputs */ @@ -1325,75 +1312,55 @@ activateActions(void) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { time_t now = 0; + int doProcess = 1; time_t lastAct; - int i; - sbool *activeSave; DEFiRet; - activeSave = pBatch->active; - copyActive(pBatch); - - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i]) - continue; - if(now == 0) { - now = datetime.GetTime(NULL); /* good time call - the only one done */ - } - /* CAS loop, we write back a bit early, but that's OK... */ - /* we use reception time, not dequeue time - this is considered more appropriate and - * also faster ;) -- rgerhards, 2008-09-17 */ - do { - lastAct = pAction->f_time; - if(pBatch->pElem[i].pMsg->msgFlags & MARK) { - if((now - lastAct) < MarkInterval / 2) { - pBatch->active[i] = 0; - DBGPRINTF("batch item %d: action was recently called, ignoring " - "mark message\n", i); - break; /* do not update timestamp for non-written mark messages */ - } + if(now == 0) { // TODO: do in caller! + now = datetime.GetTime(NULL); /* good time call - the only one done */ + } + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if(pMsg->msgFlags & MARK) { + if((now - lastAct) < MarkInterval / 2) { + doProcess = 0; + DBGPRINTF("action was recently called, ignoring mark message\n"); + break; /* do not update timestamp for non-written mark messages */ } - } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, - pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0); - if(pBatch->active[i]) { - DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n", - i, module.GetStateName(pAction->pMod)); } - } - - iRet = doSubmitToActionQBatch(pAction, pBatch, pWti); + } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, + pMsg->ttGenTime, &pAction->mutCAS) == 0); - free(pBatch->active); - pBatch->active = activeSave; + if(doProcess) { + DBGPRINTF("Called action(NotAllMark), processing via '%s'\n", + module.GetStateName(pAction->pMod)); + iRet = doSubmitToActionQBatch(pAction, pWti, pMsg); + } RETiRet; } -static inline void -countStatsBatchEnq(action_t *pAction, batch_t *pBatch) -{ - int i; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( batchIsValidElem(pBatch, i)) { - STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - } - } -} - /* enqueue a batch in direct mode. We have put this into its own function just to avoid * cluttering the actual submit function. * rgerhards, 2011-06-16 */ static inline rsRetVal -doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { +int pbShutdownImmediate = 0; // TODO: implement + struct syslogTime ttNow; DEFiRet; if(GatherStats) - countStatsBatchEnq(pAction, pBatch); - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); + ttNow.year = 0; + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, &pbShutdownImmediate); RETiRet; } @@ -1403,67 +1370,38 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { - int i; DEFiRet; - DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { - iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti); + iRet = doQueueEnqObjDirectBatch(pAction, pWti, pMsg); } else {/* in this case, we do single submits to the queue. * TODO: optimize this, we may do at least a multi-submit! */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if(batchIsValidElem(pBatch, i)) { - doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg, pWti); - } - } + doSubmitToActionQ(pAction, pWti, pMsg); } RETiRet; } - -/* Helper to submit a batch of actions to the engine. Note that we have rather - * complicated processing here, so we need to do this one message after another. - * rgerhards, 2010-06-23 - */ -static inline rsRetVal -helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) -{ - int i; - DEFiRet; - - DBGPRINTF("Called action %p (complex case), logging to %s\n", - pAction, module.GetStateName(pAction->pMod)); - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp); - if(batchIsValidElem(pBatch, i)) { - doActionCallAction(pAction, pBatch, i, pWti); - } - } - - RETiRet; -} - /* Call configured action, most complex case with all features supported (and thus slow). * rgerhards, 2010-06-08 */ #pragma GCC diagnostic ignored "-Wempty-body" static rsRetVal -doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { DEFiRet; d_pthread_mutex_lock(&pAction->mutAction); -dbgprintf("DDDD: locked mutAction\n"); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); - iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti); + DBGPRINTF("Called action %p (complex case), logging to %s\n", + pAction, module.GetStateName(pAction->pMod)); + doActionCallAction(pAction, pWti, pMsg); d_pthread_mutex_unlock(&pAction->mutAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ @@ -57,7 +57,7 @@ struct action_s { struct modInfo_s *pMod;/* pointer to output module handling this selector */ void *pModData; /* pointer to module data - content is module-specific */ sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */ - rsRetVal (*submitToActQ)(action_t *, batch_t *, wti_t*);/* function submit message to action queue */ + rsRetVal (*submitToActQ)(action_t *, wti_t*, msg_t*);/* function submit message to action queue */ rsRetVal (*qConstruct)(struct queue_s *pThis); enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING = 2, ACT_JSON_PASSING = 3} @@ -91,6 +91,7 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr rsRetVal activateActions(void); rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction); rsRetVal actionProcessCnf(struct cnfobj *o); +rsRetVal actionCommitAll(wti_t *pWti); /* external data */ extern int iActionNbr; diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 6cc98105..2ad21170 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -228,13 +228,27 @@ static inline void freeActive(sbool *active) { free(active); } static rsRetVal execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { + int i; DEFiRet; dbgprintf("RRRR: execAct [%s]: batch of %d elements, active %p\n", modGetName(stmt->d.act->pMod), batchNumMsgs(pBatch), active); pBatch->active = active; // TODO: check here if bPrevWasSuspsended was required and, if so // if we actually are permitted to execute this action. //if(pAction->bExecWhenPrevSusp) { - stmt->d.act->submitToActQ(stmt->d.act, pBatch, pWti); + + + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + DBGPRINTF("action %d: valid:%d state:%d execWhenPrev:%d\n", + stmt->d.act->iActionNbr, batchIsValidElem(pBatch, i), pBatch->eltState[i], + stmt->d.act->bExecWhenPrevSusp); + if(batchIsValidElem(pBatch, i)) { + stmt->d.act->submitToActQ(stmt->d.act, pWti, pBatch->pElem[i].pMsg); + // TODO: we must refactor this! flag messages as committed + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + } + } + + #warning implement action return code checking // we should store the return code and make it available // to users via a special function (or maybe variable) @@ -598,6 +612,7 @@ processBatch(batch_t *pBatch, wti_t *pWti) CHKiRet(processBatchMultiRuleset(pBatch, pWti)); } + actionCommitAll(pWti); finalize_it: DBGPRINTF("ruleset.ProcessMsg() returns %d\n", iRet); RETiRet; |