summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c22
-rw-r--r--runtime/batch.h15
-rw-r--r--runtime/queue.c13
-rw-r--r--runtime/ruleset.c24
-rw-r--r--tools/syslogd.c6
5 files changed, 44 insertions, 36 deletions
diff --git a/action.c b/action.c
index f6993626..07f3a6f1 100644
--- a/action.c
+++ b/action.c
@@ -1070,11 +1070,11 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
++iCommittedUpTo;
//pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
}
- pBatch->pElem[i].state = BATCH_STATE_SUB;
+ pBatch->eltState[i] = BATCH_STATE_SUB;
} else if(localRet == RS_RET_DEFER_COMMIT) {
- pBatch->pElem[i].state = BATCH_STATE_SUB;
+ pBatch->eltState[i] = BATCH_STATE_SUB;
} else if(localRet == RS_RET_DISCARDMSG) {
- pBatch->pElem[i].state = BATCH_STATE_DISC;
+ pBatch->eltState[i] = BATCH_STATE_DISC;
} else {
dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n",
localRet, *pnElem, iCommittedUpTo);
@@ -1136,9 +1136,9 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
} else if(localRet == RS_RET_ACTION_FAILED) {
/* in this case, everything not yet committed is BAD */
for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) {
- if( pBatch->pElem[i].state != BATCH_STATE_DISC
- && pBatch->pElem[i].state != BATCH_STATE_COMM ) {
- pBatch->pElem[i].state = BATCH_STATE_BAD;
+ if( pBatch->eltState[i] != BATCH_STATE_DISC
+ && pBatch->eltState[i] != BATCH_STATE_COMM ) {
+ pBatch->eltState[i] = BATCH_STATE_BAD;
pBatch->pElem[i].bPrevWasSuspended = 1;
STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail);
}
@@ -1214,7 +1214,7 @@ prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustR
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
pElem = &(pBatch->pElem[i]);
if(batchIsValidElem(pBatch, i)) {
- pElem->state = BATCH_STATE_RDY;
+ pBatch->eltState[i] = BATCH_STATE_RDY;
if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) {
/* make sure we have our copy of "active" array */
if(!*bMustRestoreActivePtr) {
@@ -1547,7 +1547,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
copyActive(pBatch);
for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if((pBatch->pElem[i].state == BATCH_STATE_DISC) || !pBatch->active[i])
+ if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i])
continue;
if(now == 0) {
now = datetime.GetTime(NULL); /* good time call - the only one done */
@@ -1625,7 +1625,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
bNeedSubmit = 1;
}
DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, i, batchIsValidElem(pBatch, i), pBatch->pElem[i].state,
+ pAction, i, batchIsValidElem(pBatch, i), pBatch->eltState[i],
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
}
if(bNeedSubmit) {
@@ -1665,7 +1665,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
*/
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, batchIsValidElem(pBatch, i), pBatch->pElem[i].state,
+ pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i],
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
if( batchIsValidElem(pBatch, i)
&& (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) {
@@ -1693,7 +1693,7 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
pAction, module.GetStateName(pAction->pMod));
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, batchIsValidElem(pBatch, i), pBatch->pElem[i].state,
+ pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i],
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
if( batchIsValidElem(pBatch, i)
&& ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
diff --git a/runtime/batch.h b/runtime/batch.h
index 097e104a..03122bd0 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -46,7 +46,6 @@ typedef unsigned char batch_state_t;
*/
struct batch_obj_s {
msg_t *pMsg;
- batch_state_t state; /* associated state */
/* work variables for action processing; these are reused for each action (or block of
* actions)
*/
@@ -84,6 +83,13 @@ struct batch_s {
sbool *active; /* which messages are active for processing, NULL=all */
sbool bSingleRuleset; /* do all msgs of this batch use a single ruleset? */
batch_obj_t *pElem; /* batch elements */
+ batch_state_t *eltState;/* state (array!) for individual objects.
+ NOTE: we have moved this out of batch_obj_t because we
+ get a *much* better cache hit ratio this way. So do not
+ move it back into this structure! Note that this is really
+ a HUGE saving, even if it doesn't look so (both profiler
+ data as well as practical tests indicate that!).
+ */
};
@@ -118,8 +124,8 @@ batchNumMsgs(batch_t *pBatch) {
*/
static inline void
batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) {
- if(pBatch->pElem[i].state != BATCH_STATE_DISC)
- pBatch->pElem[i].state = newState;
+ if(pBatch->eltState[i] != BATCH_STATE_DISC)
+ pBatch->eltState[i] = newState;
}
@@ -128,7 +134,7 @@ batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) {
*/
static inline int
batchIsValidElem(batch_t *pBatch, int i) {
- return( (pBatch->pElem[i].state != BATCH_STATE_DISC)
+ return( (pBatch->eltState[i] != BATCH_STATE_DISC)
&& (pBatch->active == NULL || pBatch->active[i]));
}
@@ -163,6 +169,7 @@ batchInit(batch_t *pBatch, int maxElem) {
pBatch->iDoneUpTo = 0;
pBatch->maxElem = maxElem;
CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t)));
+ CHKmalloc(pBatch->eltState = calloc((size_t)maxElem, sizeof(batch_state_t)));
// TODO: replace calloc by inidividual writes?
finalize_it:
RETiRet;
diff --git a/runtime/queue.c b/runtime/queue.c
index 99fb5fbd..6df1c95e 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -130,7 +130,7 @@ static inline void displayBatchState(batch_t *pBatch)
{
int i;
for(i = 0 ; i < pBatch->nElem ; ++i) {
- DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state);
+ DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->eltState[i]);
}
}
@@ -941,6 +941,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
{
batch_t singleBatch;
batch_obj_t batchObj;
+ batch_state_t batchState = BATCH_STATE_RDY;
sbool active = 1;
int i;
DEFiRet;
@@ -958,10 +959,10 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
*/
memset(&batchObj, 0, sizeof(batch_obj_t));
memset(&singleBatch, 0, sizeof(batch_t));
- batchObj.state = BATCH_STATE_RDY;
batchObj.pMsg = pMsg;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
+ singleBatch.eltState = &batchState;
singleBatch.active = &active;
iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate);
/* delete the batch string params: TODO: create its own "class" for this */
@@ -1546,8 +1547,8 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
for(i = 0 ; i < pBatch->nElem ; ++i) {
pMsg = pBatch->pElem[i].pMsg;
- if( pBatch->pElem[i].state == BATCH_STATE_RDY
- || pBatch->pElem[i].state == BATCH_STATE_SUB) {
+ if( pBatch->eltState[i] == BATCH_STATE_RDY
+ || pBatch->eltState[i] == BATCH_STATE_SUB) {
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
++nEnqueued;
if(localRet != RS_RET_OK) {
@@ -1611,7 +1612,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
/* all well, use this element */
pWti->batch.pElem[nDequeued].pMsg = pMsg;
- pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
+ pWti->batch.eltState[nDequeued] = BATCH_STATE_RDY;
++nDequeued;
}
@@ -1915,7 +1916,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY,
MsgAddRef(pWti->batch.pElem[i].pMsg)));
- pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
+ pWti->batch.eltState[i] = BATCH_STATE_COMM; /* commited to other queue! */
}
/* but now cancellation is no longer permitted */
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 16cb7cb3..66b38fc9 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -181,7 +181,7 @@ processBatchMultiRuleset(batch_t *pBatch)
do {
bHaveUnprocessed = 0;
/* search for first unprocessed element */
- for(iStart = 0 ; iStart < pBatch->nElem && pBatch->pElem[iStart].state == BATCH_STATE_DISC ; ++iStart)
+ for(iStart = 0 ; iStart < pBatch->nElem && pBatch->eltState[iStart] == BATCH_STATE_DISC ; ++iStart)
/* just search, no action */;
if(iStart == pBatch->nElem)
break; /* everything processed */
@@ -195,10 +195,10 @@ processBatchMultiRuleset(batch_t *pBatch)
if(batchElemGetRuleset(pBatch, i) == currRuleset) {
/* for performance reasons, we copy only those members that we actually need */
snglRuleBatch.pElem[iNew].pMsg = pBatch->pElem[i].pMsg;
- snglRuleBatch.pElem[iNew].state = pBatch->pElem[i].state;
+ snglRuleBatch.eltState[iNew] = pBatch->eltState[i];
++iNew;
/* We indicate the element also as done, so it will not be processed again */
- pBatch->pElem[i].state = BATCH_STATE_DISC;
+ pBatch->eltState[i] = BATCH_STATE_DISC;
} else {
bHaveUnprocessed = 1;
}
@@ -242,7 +242,7 @@ execSet(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
struct var result;
DEFiRet;
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( pBatch->pElem[i].state != BATCH_STATE_DISC
+ if( pBatch->eltState[i] != BATCH_STATE_DISC
&& (active == NULL || active[i])) {
cnfexprEval(stmt->d.s_set.expr, &result, pBatch->pElem[i].pMsg);
msgSetJSONFromVar(pBatch->pElem[i].pMsg, stmt->d.s_set.varname,
@@ -259,7 +259,7 @@ execUnset(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
int i;
DEFiRet;
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( pBatch->pElem[i].state != BATCH_STATE_DISC
+ if( pBatch->eltState[i] != BATCH_STATE_DISC
&& (active == NULL || active[i])) {
msgUnsetJSON(pBatch->pElem[i].pMsg, stmt->d.s_unset.varname);
}
@@ -277,9 +277,9 @@ execStop(batch_t *pBatch, sbool *active)
int i;
DEFiRet;
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( pBatch->pElem[i].state != BATCH_STATE_DISC
+ if( pBatch->eltState[i] != BATCH_STATE_DISC
&& (active == NULL || active[i])) {
- pBatch->pElem[i].state = BATCH_STATE_DISC;
+ pBatch->eltState[i] = BATCH_STATE_DISC;
}
}
RETiRet;
@@ -302,7 +302,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
if(*(pBatch->pbShutdownImmediate))
FINALIZE;
- if(pBatch->pElem[i].state == BATCH_STATE_DISC)
+ if(pBatch->eltState[i] == BATCH_STATE_DISC)
continue; /* will be ignored in any case */
if(active == NULL || active[i]) {
bRet = cnfexprEvalBool(stmt->d.s_if.expr, pBatch->pElem[i].pMsg);
@@ -319,7 +319,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
if(*(pBatch->pbShutdownImmediate))
FINALIZE;
- if(pBatch->pElem[i].state != BATCH_STATE_DISC)
+ if(pBatch->eltState[i] != BATCH_STATE_DISC)
newAct[i] = !newAct[i];
}
scriptExec(stmt->d.s_if.t_else, pBatch, newAct);
@@ -341,7 +341,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
if(*(pBatch->pbShutdownImmediate))
return;
- if(pBatch->pElem[i].state == BATCH_STATE_DISC)
+ if(pBatch->eltState[i] == BATCH_STATE_DISC)
continue; /* will be ignored in any case */
pMsg = pBatch->pElem[i].pMsg;
if(active == NULL || active[i]) {
@@ -364,7 +364,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
if(*(pBatch->pbShutdownImmediate))
return;
- if(pBatch->pElem[i].state != BATCH_STATE_DISC)
+ if(pBatch->eltState[i] != BATCH_STATE_DISC)
newAct[i] = !newAct[i];
}
scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct);
@@ -473,7 +473,7 @@ execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
if(*(pBatch->pbShutdownImmediate))
return;
- if(pBatch->pElem[i].state == BATCH_STATE_DISC)
+ if(pBatch->eltState[i] == BATCH_STATE_DISC)
continue; /* will be ignored in any case */
if(active == NULL || active[i]) {
bRet = evalPROPFILT(stmt, pBatch->pElem[i].pMsg);
diff --git a/tools/syslogd.c b/tools/syslogd.c
index bd5b52ca..a4b53d1f 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -527,7 +527,7 @@ preprocessBatch(batch_t *pBatch) {
if(!bIsPermitted) {
DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n",
fromHostFQDN);
- pBatch->pElem[i].state = BATCH_STATE_DISC;
+ pBatch->eltState[i] = BATCH_STATE_DISC;
} else {
/* save some of the info we obtained */
MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost);
@@ -538,7 +538,7 @@ preprocessBatch(batch_t *pBatch) {
if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
DBGPRINTF("Message discarded, parsing error %d\n", localRet);
- pBatch->pElem[i].state = BATCH_STATE_DISC;
+ pBatch->eltState[i] = BATCH_STATE_DISC;
}
}
if(pMsg->pRuleset != batchRuleset)
@@ -573,7 +573,7 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShu
//do not have this yet and so we emulate -- 2010-06-10
int i;
for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
- pBatch->pElem[i].state = BATCH_STATE_COMM;
+ pBatch->eltState[i] = BATCH_STATE_COMM;
}
RETiRet;
}