summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/ruleset.c3
-rw-r--r--runtime/wti.h11
2 files changed, 14 insertions, 0 deletions
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 6013baf7..822150d9 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -412,6 +412,7 @@ done: return;
/* Process (consume) a batch of messages. Calls the actions configured.
+ * This is called by MAIN queues.
*/
static rsRetVal
processBatch(batch_t *pBatch, wti_t *pWti)
@@ -423,6 +424,8 @@ processBatch(batch_t *pBatch, wti_t *pWti)
DBGPRINTF("processBATCH: batch of %d elements must be processed\n", pBatch->nElem);
+ wtiResetExecState(pWti, pBatch);
+
/* execution phase */
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pWti->pbShutdownImmediate) ; ++i) {
pMsg = pBatch->pElem[i].pMsg;
diff --git a/runtime/wti.h b/runtime/wti.h
index 0c2af12e..2a408c0a 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -82,6 +82,10 @@ struct wti_s {
DEF_ATOMIC_HELPER_MUT(mutIsRunning);
struct {
uint8_t bPrevWasSuspended;
+ uint8_t bDoAutoCommit; /* do a commit after each message
+ * this is usually set for batches with 0 element, but may
+ * also be added as a user-selectable option (not implemented yet)
+ */
} execState; /* state for the execution engine */
};
@@ -177,4 +181,11 @@ dbgprintf("DDDD: adding param for action %d\n", pAction->iActionNbr);
finalize_it:
RETiRet;
}
+
+static inline void
+wtiResetExecState(wti_t *pWti, batch_t *pBatch)
+{
+ pWti->execState.bPrevWasSuspended = 0;
+ pWti->execState.bDoAutoCommit = (batchNumMsgs(pBatch) == 1);
+}
#endif /* #ifndef WTI_H_INCLUDED */