summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c6
-rw-r--r--action.h2
-rw-r--r--runtime/batch.h1
-rw-r--r--runtime/ruleset.c5
-rw-r--r--runtime/wti.h1
-rw-r--r--tools/syslogd.c8
6 files changed, 12 insertions, 11 deletions
diff --git a/action.c b/action.c
index abd8818e..790ea4ce 100644
--- a/action.c
+++ b/action.c
@@ -1061,7 +1061,7 @@ actionCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
/* Commit all active transactions in *DIRECT mode* */
void
-actionCommitAllDirect(wti_t *pWti, int *pbShutdownImmediate)
+actionCommitAllDirect(wti_t *pWti)
{
int i;
action_t *pAction;
@@ -1071,7 +1071,7 @@ actionCommitAllDirect(wti_t *pWti, int *pbShutdownImmediate)
i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo[i].iparamRoot);
pAction = pWti->actWrkrInfo[i].pAction;
if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT)
- actionCommit(pWti->actWrkrInfo[i].pAction, pWti, pbShutdownImmediate);
+ actionCommit(pWti->actWrkrInfo[i].pAction, pWti, pWti->pbShutdownImmediate);
}
}
@@ -1115,7 +1115,7 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmed
DEFiRet;
if(pbShutdownImmediate == NULL) {
- pbShutdownImmediate = pBatch->pbShutdownImmediate;
+ pbShutdownImmediate = pWti->pbShutdownImmediate;
}
/* indicate we have not yet read the date */
diff --git a/action.h b/action.h
index 5216430c..666542ac 100644
--- a/action.h
+++ b/action.h
@@ -91,7 +91,7 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr
rsRetVal activateActions(void);
rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction);
rsRetVal actionProcessCnf(struct cnfobj *o);
-void actionCommitAllDirect(wti_t *pWti, int *pbShutdownImmediate);
+void actionCommitAllDirect(wti_t *pWti);
/* external data */
extern int iActionNbr;
diff --git a/runtime/batch.h b/runtime/batch.h
index e8268b1c..5c855521 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -68,7 +68,6 @@ struct batch_s {
int nElemDeq; /* actual number of elements dequeued (and thus to be deleted) - see comment above! */
int iDoneUpTo; /* all messages below this index have state other than RDY */
qDeqID deqID; /* ID of dequeue operation that generated this batch */
- int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
batch_obj_t *pElem; /* batch elements */
batch_state_t *eltState;/* state (array!) for individual objects.
NOTE: we have moved this out of batch_obj_t because we
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 3af78927..c54715fc 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -48,6 +48,7 @@
#include "rainerscript.h"
#include "srUtils.h"
#include "modules.h"
+#include "wti.h"
#include "dirty.h" /* for main ruleset queue creation */
/* static data */
@@ -392,7 +393,7 @@ processBatch(batch_t *pBatch, wti_t *pWti)
DBGPRINTF("processBATCH: batch of %d elements must be processed\n", pBatch->nElem);
/* execution phase */
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pWti->pbShutdownImmediate) ; ++i) {
pMsg = pBatch->pElem[i].pMsg;
DBGPRINTF("processBATCH: next msg %d: %.128s\n", i, pMsg->pszRawMsg);
pRuleset = (pMsg->pRuleset == NULL) ? ourConf->rulesets.pDflt : pMsg->pRuleset;
@@ -405,7 +406,7 @@ processBatch(batch_t *pBatch, wti_t *pWti)
/* commit phase */
dbgprintf("END batch execution phase, entering to commit phase\n");
- actionCommitAllDirect(pWti, pBatch->pbShutdownImmediate);
+ actionCommitAllDirect(pWti);
DBGPRINTF("processBATCH: batch of %d elements has been processed\n", pBatch->nElem);
RETiRet;
diff --git a/runtime/wti.h b/runtime/wti.h
index cd2fefab..813237fc 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -74,6 +74,7 @@ struct wti_s {
pthread_t thrdID; /* thread ID */
int bIsRunning; /* is this thread currently running? (must be int for atomic op!) */
sbool bAlwaysRunning; /* should this thread always run? */
+ int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
uchar *pszDbgHdr; /* header string for debug messages */
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 0fb9a9fe..c27d79b7 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -497,7 +497,7 @@ finalize_it:
* rgerhards, 2010-06-09
*/
static inline rsRetVal
-preprocessBatch(batch_t *pBatch) {
+preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) {
prop_t *ip;
prop_t *fqdn;
prop_t *localName;
@@ -509,7 +509,7 @@ preprocessBatch(batch_t *pBatch) {
rsRetVal localRet;
DEFiRet;
- for(i = 0 ; i < pBatch->nElem && !*(pBatch->pbShutdownImmediate) ; i++) {
+ for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
pMsg = pBatch->pElem[i].pMsg;
if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) {
DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n");
@@ -555,8 +555,8 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, wti_t *pWt
{
DEFiRet;
assert(pBatch != NULL);
- pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */
- preprocessBatch(pBatch);
+ pWti->pbShutdownImmediate = pbShutdownImmediate;
+ preprocessBatch(pBatch, pWti->pbShutdownImmediate);
ruleset.ProcessBatch(pBatch, pWti);
//TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we
//do not have this yet and so we emulate -- 2010-06-10