summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/batch.h5
-rw-r--r--runtime/queue.c4
2 files changed, 5 insertions, 4 deletions
diff --git a/runtime/batch.h b/runtime/batch.h
index 944889bd..fdacb8e2 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -51,7 +51,6 @@ struct batch_obj_s {
/* work variables for action processing; these are reused for each action (or block of
* actions)
*/
- sbool bFilterOK; /* work area for filter processing (per action, reused!) */
sbool bPrevWasSuspended;
/* following are caches to save allocs if not absolutely necessary */
uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */
@@ -83,6 +82,7 @@ struct batch_s {
int iDoneUpTo; /* all messages below this index have state other than RDY */
qDeqID deqID; /* ID of dequeue operation that generated this batch */
int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
+ sbool *active; /* which messages are active for processing, NULL=all */
sbool bSingleRuleset; /* do all msgs of this batch use a single ruleset? */
batch_obj_t *pElem; /* batch elements */
};
@@ -129,7 +129,8 @@ batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) {
*/
static inline int
batchIsValidElem(batch_t *pBatch, int i) {
- return(pBatch->pElem[i].bFilterOK && pBatch->pElem[i].state != BATCH_STATE_DISC);
+ return( (pBatch->active == NULL || pBatch->active[i])
+ && pBatch->pElem[i].state != BATCH_STATE_DISC);
}
diff --git a/runtime/queue.c b/runtime/queue.c
index bb9ea060..2108e231 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -976,6 +976,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
batch_t singleBatch;
batch_obj_t batchObj;
+ sbool active = 1;
int i;
DEFiRet;
@@ -994,9 +995,9 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
memset(&singleBatch, 0, sizeof(batch_t));
batchObj.state = BATCH_STATE_RDY;
batchObj.pUsrp = (obj_t*) pUsr;
- batchObj.bFilterOK = 1;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
+ singleBatch.active = &active;
iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate);
/* delete the batch string params: TODO: create its own "class" for this */
for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
@@ -1596,7 +1597,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
/* all well, use this element */
pWti->batch.pElem[nDequeued].pUsrp = pUsr;
pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
- pWti->batch.pElem[nDequeued].bFilterOK = 1; // TODO: think again if we can handle that with more performance
++nDequeued;
}