diff options
-rw-r--r-- | action.c | 13 | ||||
-rw-r--r-- | runtime/ruleset.c | 3 | ||||
-rw-r--r-- | runtime/wti.h | 11 |
3 files changed, 21 insertions, 6 deletions
@@ -996,7 +996,7 @@ actionTryCommit(action_t *pThis, wti_t *pWti) iparamCurr = iparamCurr->next; free(iparamDel); // TODO: memleak strings! } - wrkrInfo->iparamLast = NULL; + wrkrInfo->iparamRoot = wrkrInfo->iparamLast = NULL; } CHKiRet(actionPrepare(pThis, pWti)); @@ -1102,14 +1102,14 @@ dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRa pWti); releaseDoActionParams(pAction, pWti); finalize_it: + if(pWti->execState.bDoAutoCommit) + iRet = actionCommit(pAction, pWti); pWti->execState.bPrevWasSuspended = (iRet == RS_RET_SUSPENDED || iRet == RS_RET_ACTION_FAILED); dbgprintf("DDDD: bPrevWasSuspended now %d, action state %d\n", (int)pWti->execState.bPrevWasSuspended, getActionState(pWti, pAction)); RETiRet; } -/* receive an array of to-process user pointers and submit them - * for processing. - * rgerhards, 2009-04-22 +/* This entry point is called by the ACTION queue (not main queue!) */ static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti) @@ -1120,7 +1120,7 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti) struct syslogTime ttNow; DEFiRet; - pWti->execState.bPrevWasSuspended = 0; + wtiResetExecState(pWti, pBatch); /* indicate we have not yet read the date */ ttNow.year = 0; @@ -1133,7 +1133,8 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti) } } - iRet = actionCommit(pAction, pWti); + if(!pWti->execState.bDoAutoCommit) + iRet = actionCommit(pAction, pWti); dbgprintf("DDDD: processBatchMain - end\n"); RETiRet; } diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 6013baf7..822150d9 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -412,6 +412,7 @@ done: return; /* Process (consume) a batch of messages. Calls the actions configured. + * This is called by MAIN queues. */ static rsRetVal processBatch(batch_t *pBatch, wti_t *pWti) @@ -423,6 +424,8 @@ processBatch(batch_t *pBatch, wti_t *pWti) DBGPRINTF("processBATCH: batch of %d elements must be processed\n", pBatch->nElem); + wtiResetExecState(pWti, pBatch); + /* execution phase */ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pWti->pbShutdownImmediate) ; ++i) { pMsg = pBatch->pElem[i].pMsg; diff --git a/runtime/wti.h b/runtime/wti.h index 0c2af12e..2a408c0a 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -82,6 +82,10 @@ struct wti_s { DEF_ATOMIC_HELPER_MUT(mutIsRunning); struct { uint8_t bPrevWasSuspended; + uint8_t bDoAutoCommit; /* do a commit after each message + * this is usually set for batches with 0 element, but may + * also be added as a user-selectable option (not implemented yet) + */ } execState; /* state for the execution engine */ }; @@ -177,4 +181,11 @@ dbgprintf("DDDD: adding param for action %d\n", pAction->iActionNbr); finalize_it: RETiRet; } + +static inline void +wtiResetExecState(wti_t *pWti, batch_t *pBatch) +{ + pWti->execState.bPrevWasSuspended = 0; + pWti->execState.bDoAutoCommit = (batchNumMsgs(pBatch) == 1); +} #endif /* #ifndef WTI_H_INCLUDED */ |