diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2013-11-04 09:12:56 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2013-11-04 09:12:56 +0100 |
commit | b278457d495379dd0eca1d9da8b51fcf7fad4559 (patch) | |
tree | 3261827916c421a5312edfeadf1812d3cc7feb9f /action.c | |
parent | 32b37ecd82b508e707cf1aa0b27acb4ac96a295c (diff) | |
download | rsyslog-b278457d495379dd0eca1d9da8b51fcf7fad4559.tar.gz rsyslog-b278457d495379dd0eca1d9da8b51fcf7fad4559.tar.bz2 rsyslog-b278457d495379dd0eca1d9da8b51fcf7fad4559.zip |
refactor: move batch "unrolling" up one layer
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 206 |
1 files changed, 72 insertions, 134 deletions
@@ -14,7 +14,6 @@ * * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval * - doSubmitToActionQComplexBatch - * - helperSubmitToActionQComplexBatch * - doActionCallAction * handles mark message reduction, but in essence calls * - actionWriteToAction @@ -117,9 +116,9 @@ /* forward definitions */ static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*); -static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); -static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); -static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); +static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t*); +static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t*); +static rsRetVal doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t*); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -977,26 +976,14 @@ finalize_it: } -/* finish processing a batch. Most importantly, that means we commit if we - * need to do so. - * rgerhards, 2008-01-28 - */ +/* Commit action after processing. */ static rsRetVal -finishBatch(action_t *pThis, wti_t *pWti) +actionCommit(action_t *pThis, wti_t *pWti) { - int i; int pbShutdownImmediate = 1; DEFiRet; - ASSERT(pThis != NULL); -// Testing for new engine: -dbgprintf("DDDD: iActionNbr %d\n", iActionNbr); -for(i = 0 ; i < iActionNbr ; ++i) { - dbgprintf("DDDD: finishBatch, act %d state %u\n", i, getActionStateByNbr(pWti, i)); -} - if(getActionState(pWti, pThis) == ACT_STATE_RDY) { - /* we just need to flag the batch as commited */ FINALIZE; /* nothing to do */ } @@ -1037,30 +1024,38 @@ finalize_it: } -/* copy "active" array of batch, as we need to modify it. The caller - * must make sure the new array is freed and the orginal batch - * pointer is restored (thus the caller must save it). If active - * is currently NULL, this is properly handled. - * Note: the batches active pointer is modified, so it must be - * saved BEFORE calling this function! - * rgerhards, 2012-09-12 +/* process a single message. This is both called if we run from the + * cosumer side of an action queue as well as directly from the main + * queue thread if the action queue is set to "direct". */ static rsRetVal -copyActive(batch_t *pBatch) +processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow, int *pbShutdownImmediate) { - sbool *active; DEFiRet; - CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); - if(pBatch->active == NULL) - memset(active, 1, batchNumMsgs(pBatch)); - else - memcpy(active, pBatch->active, batchNumMsgs(pBatch)); - pBatch->active = active; -finalize_it: +dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRawMsg); + // TODO: check error return states! + iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow); + iRet = actionProcessMessage(pAction, pMsg, + pWti->actWrkrInfo[pAction->iActionNbr].staticActParams, + pbShutdownImmediate, pWti); + releaseDoActionParams(pAction, pWti); RETiRet; } +/* Commit all active transactions */ +rsRetVal +actionCommitAll(wti_t *pWti) +{ + int i; + for(i = 0 ; i < iActionNbr ; ++i) { + DBGPRINTF("DDDD: actionCommitall action %d state %u\n", + i, getActionStateByNbr(pWti, i)); + if(getActionStateByNbr(pWti, i) != ACT_STATE_RDY) { + actionCommit(pWti->actWrkrInfo[i].pAction, pWti); + } + } +} /* receive an array of to-process user pointers and submit them * for processing. @@ -1085,19 +1080,13 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmed for(i = 0 ; i < batchNumMsgs(pBatch) && !*pbShutdownImmediate ; ++i) { if(batchIsValidElem(pBatch, i)) { pMsg = pBatch->pElem[i].pMsg; -dbgprintf("DDDD: processBatchMain[act %d], elt %d: %s\n", pAction->iActionNbr, i, pMsg->pszRawMsg); - // TODO: check error return states! - iRet = prepareDoActionParams(pAction, pWti, pMsg, &ttNow); - iRet = actionProcessMessage(pAction, pMsg, - pWti->actWrkrInfo[pAction->iActionNbr].staticActParams, - pbShutdownImmediate, pWti); - releaseDoActionParams(pAction, pWti); - // TODO: we must refactor this! flag messages as committed - batchSetElemState(pBatch, i, BATCH_STATE_COMM); + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, pbShutdownImmediate); + // TODO: we must refactor this! flag messages as committed + batchSetElemState(pBatch, i, BATCH_STATE_COMM); } } - iRet = finishBatch(pAction, pWti); + iRet = actionCommit(pAction, pWti); dbgprintf("DDDD: processBatchMain - end\n"); RETiRet; } @@ -1161,7 +1150,7 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT * rgerhards, 2010-06-08 */ static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti) +doSubmitToActionQ(action_t *pAction, wti_t *pWti, msg_t *pMsg) { DEFiRet; @@ -1245,7 +1234,7 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti) /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = doSubmitToActionQ(pAction, pMsg, pWti); + iRet = doSubmitToActionQ(pAction, pWti, pMsg); finalize_it: RETiRet; @@ -1256,12 +1245,10 @@ finalize_it: * pthread_cleanup_push() POSIX macro... */ static inline rsRetVal -doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti) +doActionCallAction(action_t *pAction, wti_t *pWti, msg_t *pMsg) { - msg_t *pMsg; DEFiRet; - pMsg = pBatch->pElem[idxBtch].pMsg; pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ /* don't output marks to recently written outputs */ @@ -1325,75 +1312,55 @@ activateActions(void) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { time_t now = 0; + int doProcess = 1; time_t lastAct; - int i; - sbool *activeSave; DEFiRet; - activeSave = pBatch->active; - copyActive(pBatch); - - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i]) - continue; - if(now == 0) { - now = datetime.GetTime(NULL); /* good time call - the only one done */ - } - /* CAS loop, we write back a bit early, but that's OK... */ - /* we use reception time, not dequeue time - this is considered more appropriate and - * also faster ;) -- rgerhards, 2008-09-17 */ - do { - lastAct = pAction->f_time; - if(pBatch->pElem[i].pMsg->msgFlags & MARK) { - if((now - lastAct) < MarkInterval / 2) { - pBatch->active[i] = 0; - DBGPRINTF("batch item %d: action was recently called, ignoring " - "mark message\n", i); - break; /* do not update timestamp for non-written mark messages */ - } + if(now == 0) { // TODO: do in caller! + now = datetime.GetTime(NULL); /* good time call - the only one done */ + } + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if(pMsg->msgFlags & MARK) { + if((now - lastAct) < MarkInterval / 2) { + doProcess = 0; + DBGPRINTF("action was recently called, ignoring mark message\n"); + break; /* do not update timestamp for non-written mark messages */ } - } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, - pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0); - if(pBatch->active[i]) { - DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n", - i, module.GetStateName(pAction->pMod)); } - } - - iRet = doSubmitToActionQBatch(pAction, pBatch, pWti); + } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, + pMsg->ttGenTime, &pAction->mutCAS) == 0); - free(pBatch->active); - pBatch->active = activeSave; + if(doProcess) { + DBGPRINTF("Called action(NotAllMark), processing via '%s'\n", + module.GetStateName(pAction->pMod)); + iRet = doSubmitToActionQBatch(pAction, pWti, pMsg); + } RETiRet; } -static inline void -countStatsBatchEnq(action_t *pAction, batch_t *pBatch) -{ - int i; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( batchIsValidElem(pBatch, i)) { - STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); - } - } -} - /* enqueue a batch in direct mode. We have put this into its own function just to avoid * cluttering the actual submit function. * rgerhards, 2011-06-16 */ static inline rsRetVal -doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { +int pbShutdownImmediate = 0; // TODO: implement + struct syslogTime ttNow; DEFiRet; if(GatherStats) - countStatsBatchEnq(pAction, pBatch); - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); + ttNow.year = 0; + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, &pbShutdownImmediate); RETiRet; } @@ -1403,67 +1370,38 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { - int i; DEFiRet; - DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { - iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti); + iRet = doQueueEnqObjDirectBatch(pAction, pWti, pMsg); } else {/* in this case, we do single submits to the queue. * TODO: optimize this, we may do at least a multi-submit! */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if(batchIsValidElem(pBatch, i)) { - doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg, pWti); - } - } + doSubmitToActionQ(pAction, pWti, pMsg); } RETiRet; } - -/* Helper to submit a batch of actions to the engine. Note that we have rather - * complicated processing here, so we need to do this one message after another. - * rgerhards, 2010-06-23 - */ -static inline rsRetVal -helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) -{ - int i; - DEFiRet; - - DBGPRINTF("Called action %p (complex case), logging to %s\n", - pAction, module.GetStateName(pAction->pMod)); - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], - pAction->bExecWhenPrevSusp); - if(batchIsValidElem(pBatch, i)) { - doActionCallAction(pAction, pBatch, i, pWti); - } - } - - RETiRet; -} - /* Call configured action, most complex case with all features supported (and thus slow). * rgerhards, 2010-06-08 */ #pragma GCC diagnostic ignored "-Wempty-body" static rsRetVal -doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) +doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { DEFiRet; d_pthread_mutex_lock(&pAction->mutAction); -dbgprintf("DDDD: locked mutAction\n"); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); - iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti); + DBGPRINTF("Called action %p (complex case), logging to %s\n", + pAction, module.GetStateName(pAction->pMod)); + doActionCallAction(pAction, pWti, pMsg); d_pthread_mutex_unlock(&pAction->mutAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ |