summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2013-11-06 11:28:44 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2013-11-06 11:28:44 +0100
commit176dea765449df3fc057623cb3762bed2754d8fa (patch)
tree2b4f2ef1c3a6ebc8b94a87a666765ed3c2fe3f2e
parent08c262011d7436dcc189949d759cfad80d928a7e (diff)
downloadrsyslog-176dea765449df3fc057623cb3762bed2754d8fa.tar.gz
rsyslog-176dea765449df3fc057623cb3762bed2754d8fa.tar.bz2
rsyslog-176dea765449df3fc057623cb3762bed2754d8fa.zip
make batches of 1 commit automatically after each action
This improves failover handling and makes it consistent with doc (precise handling for batches of one).
-rw-r--r--action.c13
-rw-r--r--runtime/ruleset.c3
-rw-r--r--runtime/wti.h11
3 files changed, 21 insertions, 6 deletions
diff --git a/action.c b/action.c
index e9395e23..718d9038 100644
--- a/action.c
+++ b/action.c
@@ -996,7 +996,7 @@ actionTryCommit(action_t *pThis, wti_t *pWti)
iparamCurr = iparamCurr->next;
free(iparamDel); // TODO: memleak strings!
}
- wrkrInfo->iparamLast = NULL;
+ wrkrInfo->iparamRoot = wrkrInfo->iparamLast = NULL;
}
CHKiRet(actionPrepare(pThis, pWti));
@@ -1102,14 +1102,14 @@ dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRa
pWti);
releaseDoActionParams(pAction, pWti);
finalize_it:
+ if(pWti->execState.bDoAutoCommit)
+ iRet = actionCommit(pAction, pWti);
pWti->execState.bPrevWasSuspended = (iRet == RS_RET_SUSPENDED || iRet == RS_RET_ACTION_FAILED);
dbgprintf("DDDD: bPrevWasSuspended now %d, action state %d\n", (int)pWti->execState.bPrevWasSuspended, getActionState(pWti, pAction));
RETiRet;
}
-/* receive an array of to-process user pointers and submit them
- * for processing.
- * rgerhards, 2009-04-22
+/* This entry point is called by the ACTION queue (not main queue!)
*/
static rsRetVal
processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti)
@@ -1120,7 +1120,7 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti)
struct syslogTime ttNow;
DEFiRet;
- pWti->execState.bPrevWasSuspended = 0;
+ wtiResetExecState(pWti, pBatch);
/* indicate we have not yet read the date */
ttNow.year = 0;
@@ -1133,7 +1133,8 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti)
}
}
- iRet = actionCommit(pAction, pWti);
+ if(!pWti->execState.bDoAutoCommit)
+ iRet = actionCommit(pAction, pWti);
dbgprintf("DDDD: processBatchMain - end\n");
RETiRet;
}
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 6013baf7..822150d9 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -412,6 +412,7 @@ done: return;
/* Process (consume) a batch of messages. Calls the actions configured.
+ * This is called by MAIN queues.
*/
static rsRetVal
processBatch(batch_t *pBatch, wti_t *pWti)
@@ -423,6 +424,8 @@ processBatch(batch_t *pBatch, wti_t *pWti)
DBGPRINTF("processBATCH: batch of %d elements must be processed\n", pBatch->nElem);
+ wtiResetExecState(pWti, pBatch);
+
/* execution phase */
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pWti->pbShutdownImmediate) ; ++i) {
pMsg = pBatch->pElem[i].pMsg;
diff --git a/runtime/wti.h b/runtime/wti.h
index 0c2af12e..2a408c0a 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -82,6 +82,10 @@ struct wti_s {
DEF_ATOMIC_HELPER_MUT(mutIsRunning);
struct {
uint8_t bPrevWasSuspended;
+ uint8_t bDoAutoCommit; /* do a commit after each message
+ * this is usually set for batches with 0 element, but may
+ * also be added as a user-selectable option (not implemented yet)
+ */
} execState; /* state for the execution engine */
};
@@ -177,4 +181,11 @@ dbgprintf("DDDD: adding param for action %d\n", pAction->iActionNbr);
finalize_it:
RETiRet;
}
+
+static inline void
+wtiResetExecState(wti_t *pWti, batch_t *pBatch)
+{
+ pWti->execState.bPrevWasSuspended = 0;
+ pWti->execState.bDoAutoCommit = (batchNumMsgs(pBatch) == 1);
+}
#endif /* #ifndef WTI_H_INCLUDED */