diff options
-rw-r--r-- | action.c | 6 | ||||
-rw-r--r-- | action.h | 2 | ||||
-rw-r--r-- | runtime/batch.h | 1 | ||||
-rw-r--r-- | runtime/ruleset.c | 5 | ||||
-rw-r--r-- | runtime/wti.h | 1 | ||||
-rw-r--r-- | tools/syslogd.c | 8 |
6 files changed, 12 insertions, 11 deletions
@@ -1061,7 +1061,7 @@ actionCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) /* Commit all active transactions in *DIRECT mode* */ void -actionCommitAllDirect(wti_t *pWti, int *pbShutdownImmediate) +actionCommitAllDirect(wti_t *pWti) { int i; action_t *pAction; @@ -1071,7 +1071,7 @@ actionCommitAllDirect(wti_t *pWti, int *pbShutdownImmediate) i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo[i].iparamRoot); pAction = pWti->actWrkrInfo[i].pAction; if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT) - actionCommit(pWti->actWrkrInfo[i].pAction, pWti, pbShutdownImmediate); + actionCommit(pWti->actWrkrInfo[i].pAction, pWti, pWti->pbShutdownImmediate); } } @@ -1115,7 +1115,7 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmed DEFiRet; if(pbShutdownImmediate == NULL) { - pbShutdownImmediate = pBatch->pbShutdownImmediate; + pbShutdownImmediate = pWti->pbShutdownImmediate; } /* indicate we have not yet read the date */ @@ -91,7 +91,7 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr rsRetVal activateActions(void); rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction); rsRetVal actionProcessCnf(struct cnfobj *o); -void actionCommitAllDirect(wti_t *pWti, int *pbShutdownImmediate); +void actionCommitAllDirect(wti_t *pWti); /* external data */ extern int iActionNbr; diff --git a/runtime/batch.h b/runtime/batch.h index e8268b1c..5c855521 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -68,7 +68,6 @@ struct batch_s { int nElemDeq; /* actual number of elements dequeued (and thus to be deleted) - see comment above! */ int iDoneUpTo; /* all messages below this index have state other than RDY */ qDeqID deqID; /* ID of dequeue operation that generated this batch */ - int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */ batch_obj_t *pElem; /* batch elements */ batch_state_t *eltState;/* state (array!) for individual objects. NOTE: we have moved this out of batch_obj_t because we diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 3af78927..c54715fc 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -48,6 +48,7 @@ #include "rainerscript.h" #include "srUtils.h" #include "modules.h" +#include "wti.h" #include "dirty.h" /* for main ruleset queue creation */ /* static data */ @@ -392,7 +393,7 @@ processBatch(batch_t *pBatch, wti_t *pWti) DBGPRINTF("processBATCH: batch of %d elements must be processed\n", pBatch->nElem); /* execution phase */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pWti->pbShutdownImmediate) ; ++i) { pMsg = pBatch->pElem[i].pMsg; DBGPRINTF("processBATCH: next msg %d: %.128s\n", i, pMsg->pszRawMsg); pRuleset = (pMsg->pRuleset == NULL) ? ourConf->rulesets.pDflt : pMsg->pRuleset; @@ -405,7 +406,7 @@ processBatch(batch_t *pBatch, wti_t *pWti) /* commit phase */ dbgprintf("END batch execution phase, entering to commit phase\n"); - actionCommitAllDirect(pWti, pBatch->pbShutdownImmediate); + actionCommitAllDirect(pWti); DBGPRINTF("processBATCH: batch of %d elements has been processed\n", pBatch->nElem); RETiRet; diff --git a/runtime/wti.h b/runtime/wti.h index cd2fefab..813237fc 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -74,6 +74,7 @@ struct wti_s { pthread_t thrdID; /* thread ID */ int bIsRunning; /* is this thread currently running? (must be int for atomic op!) */ sbool bAlwaysRunning; /* should this thread always run? */ + int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ diff --git a/tools/syslogd.c b/tools/syslogd.c index 0fb9a9fe..c27d79b7 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -497,7 +497,7 @@ finalize_it: * rgerhards, 2010-06-09 */ static inline rsRetVal -preprocessBatch(batch_t *pBatch) { +preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) { prop_t *ip; prop_t *fqdn; prop_t *localName; @@ -509,7 +509,7 @@ preprocessBatch(batch_t *pBatch) { rsRetVal localRet; DEFiRet; - for(i = 0 ; i < pBatch->nElem && !*(pBatch->pbShutdownImmediate) ; i++) { + for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { pMsg = pBatch->pElem[i].pMsg; if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) { DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n"); @@ -555,8 +555,8 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, wti_t *pWt { DEFiRet; assert(pBatch != NULL); - pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */ - preprocessBatch(pBatch); + pWti->pbShutdownImmediate = pbShutdownImmediate; + preprocessBatch(pBatch, pWti->pbShutdownImmediate); ruleset.ProcessBatch(pBatch, pWti); //TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we //do not have this yet and so we emulate -- 2010-06-10 |