diff options
-rw-r--r-- | action.c | 22 | ||||
-rw-r--r-- | runtime/batch.h | 15 | ||||
-rw-r--r-- | runtime/queue.c | 13 | ||||
-rw-r--r-- | runtime/ruleset.c | 24 | ||||
-rw-r--r-- | tools/syslogd.c | 6 |
5 files changed, 44 insertions, 36 deletions
@@ -1070,11 +1070,11 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) ++iCommittedUpTo; //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; } - pBatch->pElem[i].state = BATCH_STATE_SUB; + pBatch->eltState[i] = BATCH_STATE_SUB; } else if(localRet == RS_RET_DEFER_COMMIT) { - pBatch->pElem[i].state = BATCH_STATE_SUB; + pBatch->eltState[i] = BATCH_STATE_SUB; } else if(localRet == RS_RET_DISCARDMSG) { - pBatch->pElem[i].state = BATCH_STATE_DISC; + pBatch->eltState[i] = BATCH_STATE_DISC; } else { dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n", localRet, *pnElem, iCommittedUpTo); @@ -1136,9 +1136,9 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) } else if(localRet == RS_RET_ACTION_FAILED) { /* in this case, everything not yet committed is BAD */ for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { - if( pBatch->pElem[i].state != BATCH_STATE_DISC - && pBatch->pElem[i].state != BATCH_STATE_COMM ) { - pBatch->pElem[i].state = BATCH_STATE_BAD; + if( pBatch->eltState[i] != BATCH_STATE_DISC + && pBatch->eltState[i] != BATCH_STATE_COMM ) { + pBatch->eltState[i] = BATCH_STATE_BAD; pBatch->pElem[i].bPrevWasSuspended = 1; STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail); } @@ -1214,7 +1214,7 @@ prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustR for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { pElem = &(pBatch->pElem[i]); if(batchIsValidElem(pBatch, i)) { - pElem->state = BATCH_STATE_RDY; + pBatch->eltState[i] = BATCH_STATE_RDY; if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) { /* make sure we have our copy of "active" array */ if(!*bMustRestoreActivePtr) { @@ -1547,7 +1547,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) copyActive(pBatch); for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if((pBatch->pElem[i].state == BATCH_STATE_DISC) || !pBatch->active[i]) + if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i]) continue; if(now == 0) { now = datetime.GetTime(NULL); /* good time call - the only one done */ @@ -1625,7 +1625,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) bNeedSubmit = 1; } DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, i, batchIsValidElem(pBatch, i), pBatch->pElem[i].state, + pAction, i, batchIsValidElem(pBatch, i), pBatch->eltState[i], pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); } if(bNeedSubmit) { @@ -1665,7 +1665,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) */ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->pElem[i].state, + pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); if( batchIsValidElem(pBatch, i) && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { @@ -1693,7 +1693,7 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) pAction, module.GetStateName(pAction->pMod)); for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, batchIsValidElem(pBatch, i), pBatch->pElem[i].state, + pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); if( batchIsValidElem(pBatch, i) && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { diff --git a/runtime/batch.h b/runtime/batch.h index 097e104a..03122bd0 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -46,7 +46,6 @@ typedef unsigned char batch_state_t; */ struct batch_obj_s { msg_t *pMsg; - batch_state_t state; /* associated state */ /* work variables for action processing; these are reused for each action (or block of * actions) */ @@ -84,6 +83,13 @@ struct batch_s { sbool *active; /* which messages are active for processing, NULL=all */ sbool bSingleRuleset; /* do all msgs of this batch use a single ruleset? */ batch_obj_t *pElem; /* batch elements */ + batch_state_t *eltState;/* state (array!) for individual objects. + NOTE: we have moved this out of batch_obj_t because we + get a *much* better cache hit ratio this way. So do not + move it back into this structure! Note that this is really + a HUGE saving, even if it doesn't look so (both profiler + data as well as practical tests indicate that!). + */ }; @@ -118,8 +124,8 @@ batchNumMsgs(batch_t *pBatch) { */ static inline void batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) { - if(pBatch->pElem[i].state != BATCH_STATE_DISC) - pBatch->pElem[i].state = newState; + if(pBatch->eltState[i] != BATCH_STATE_DISC) + pBatch->eltState[i] = newState; } @@ -128,7 +134,7 @@ batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) { */ static inline int batchIsValidElem(batch_t *pBatch, int i) { - return( (pBatch->pElem[i].state != BATCH_STATE_DISC) + return( (pBatch->eltState[i] != BATCH_STATE_DISC) && (pBatch->active == NULL || pBatch->active[i])); } @@ -163,6 +169,7 @@ batchInit(batch_t *pBatch, int maxElem) { pBatch->iDoneUpTo = 0; pBatch->maxElem = maxElem; CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t))); + CHKmalloc(pBatch->eltState = calloc((size_t)maxElem, sizeof(batch_state_t))); // TODO: replace calloc by inidividual writes? finalize_it: RETiRet; diff --git a/runtime/queue.c b/runtime/queue.c index 99fb5fbd..6df1c95e 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -130,7 +130,7 @@ static inline void displayBatchState(batch_t *pBatch) { int i; for(i = 0 ; i < pBatch->nElem ; ++i) { - DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state); + DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->eltState[i]); } } @@ -941,6 +941,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) { batch_t singleBatch; batch_obj_t batchObj; + batch_state_t batchState = BATCH_STATE_RDY; sbool active = 1; int i; DEFiRet; @@ -958,10 +959,10 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) */ memset(&batchObj, 0, sizeof(batch_obj_t)); memset(&singleBatch, 0, sizeof(batch_t)); - batchObj.state = BATCH_STATE_RDY; batchObj.pMsg = pMsg; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; + singleBatch.eltState = &batchState; singleBatch.active = &active; iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate); /* delete the batch string params: TODO: create its own "class" for this */ @@ -1546,8 +1547,8 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) for(i = 0 ; i < pBatch->nElem ; ++i) { pMsg = pBatch->pElem[i].pMsg; - if( pBatch->pElem[i].state == BATCH_STATE_RDY - || pBatch->pElem[i].state == BATCH_STATE_SUB) { + if( pBatch->eltState[i] == BATCH_STATE_RDY + || pBatch->eltState[i] == BATCH_STATE_SUB) { localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); ++nEnqueued; if(localRet != RS_RET_OK) { @@ -1611,7 +1612,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz /* all well, use this element */ pWti->batch.pElem[nDequeued].pMsg = pMsg; - pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; + pWti->batch.eltState[nDequeued] = BATCH_STATE_RDY; ++nDequeued; } @@ -1915,7 +1916,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, MsgAddRef(pWti->batch.pElem[i].pMsg))); - pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ + pWti->batch.eltState[i] = BATCH_STATE_COMM; /* commited to other queue! */ } /* but now cancellation is no longer permitted */ diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 16cb7cb3..66b38fc9 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -181,7 +181,7 @@ processBatchMultiRuleset(batch_t *pBatch) do { bHaveUnprocessed = 0; /* search for first unprocessed element */ - for(iStart = 0 ; iStart < pBatch->nElem && pBatch->pElem[iStart].state == BATCH_STATE_DISC ; ++iStart) + for(iStart = 0 ; iStart < pBatch->nElem && pBatch->eltState[iStart] == BATCH_STATE_DISC ; ++iStart) /* just search, no action */; if(iStart == pBatch->nElem) break; /* everything processed */ @@ -195,10 +195,10 @@ processBatchMultiRuleset(batch_t *pBatch) if(batchElemGetRuleset(pBatch, i) == currRuleset) { /* for performance reasons, we copy only those members that we actually need */ snglRuleBatch.pElem[iNew].pMsg = pBatch->pElem[i].pMsg; - snglRuleBatch.pElem[iNew].state = pBatch->pElem[i].state; + snglRuleBatch.eltState[iNew] = pBatch->eltState[i]; ++iNew; /* We indicate the element also as done, so it will not be processed again */ - pBatch->pElem[i].state = BATCH_STATE_DISC; + pBatch->eltState[i] = BATCH_STATE_DISC; } else { bHaveUnprocessed = 1; } @@ -242,7 +242,7 @@ execSet(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) struct var result; DEFiRet; for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( pBatch->pElem[i].state != BATCH_STATE_DISC + if( pBatch->eltState[i] != BATCH_STATE_DISC && (active == NULL || active[i])) { cnfexprEval(stmt->d.s_set.expr, &result, pBatch->pElem[i].pMsg); msgSetJSONFromVar(pBatch->pElem[i].pMsg, stmt->d.s_set.varname, @@ -259,7 +259,7 @@ execUnset(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) int i; DEFiRet; for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( pBatch->pElem[i].state != BATCH_STATE_DISC + if( pBatch->eltState[i] != BATCH_STATE_DISC && (active == NULL || active[i])) { msgUnsetJSON(pBatch->pElem[i].pMsg, stmt->d.s_unset.varname); } @@ -277,9 +277,9 @@ execStop(batch_t *pBatch, sbool *active) int i; DEFiRet; for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( pBatch->pElem[i].state != BATCH_STATE_DISC + if( pBatch->eltState[i] != BATCH_STATE_DISC && (active == NULL || active[i])) { - pBatch->pElem[i].state = BATCH_STATE_DISC; + pBatch->eltState[i] = BATCH_STATE_DISC; } } RETiRet; @@ -302,7 +302,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { if(*(pBatch->pbShutdownImmediate)) FINALIZE; - if(pBatch->pElem[i].state == BATCH_STATE_DISC) + if(pBatch->eltState[i] == BATCH_STATE_DISC) continue; /* will be ignored in any case */ if(active == NULL || active[i]) { bRet = cnfexprEvalBool(stmt->d.s_if.expr, pBatch->pElem[i].pMsg); @@ -319,7 +319,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { if(*(pBatch->pbShutdownImmediate)) FINALIZE; - if(pBatch->pElem[i].state != BATCH_STATE_DISC) + if(pBatch->eltState[i] != BATCH_STATE_DISC) newAct[i] = !newAct[i]; } scriptExec(stmt->d.s_if.t_else, pBatch, newAct); @@ -341,7 +341,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { if(*(pBatch->pbShutdownImmediate)) return; - if(pBatch->pElem[i].state == BATCH_STATE_DISC) + if(pBatch->eltState[i] == BATCH_STATE_DISC) continue; /* will be ignored in any case */ pMsg = pBatch->pElem[i].pMsg; if(active == NULL || active[i]) { @@ -364,7 +364,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { if(*(pBatch->pbShutdownImmediate)) return; - if(pBatch->pElem[i].state != BATCH_STATE_DISC) + if(pBatch->eltState[i] != BATCH_STATE_DISC) newAct[i] = !newAct[i]; } scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct); @@ -473,7 +473,7 @@ execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { if(*(pBatch->pbShutdownImmediate)) return; - if(pBatch->pElem[i].state == BATCH_STATE_DISC) + if(pBatch->eltState[i] == BATCH_STATE_DISC) continue; /* will be ignored in any case */ if(active == NULL || active[i]) { bRet = evalPROPFILT(stmt, pBatch->pElem[i].pMsg); diff --git a/tools/syslogd.c b/tools/syslogd.c index bd5b52ca..a4b53d1f 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -527,7 +527,7 @@ preprocessBatch(batch_t *pBatch) { if(!bIsPermitted) { DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n", fromHostFQDN); - pBatch->pElem[i].state = BATCH_STATE_DISC; + pBatch->eltState[i] = BATCH_STATE_DISC; } else { /* save some of the info we obtained */ MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost); @@ -538,7 +538,7 @@ preprocessBatch(batch_t *pBatch) { if((pMsg->msgFlags & NEEDS_PARSING) != 0) { if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { DBGPRINTF("Message discarded, parsing error %d\n", localRet); - pBatch->pElem[i].state = BATCH_STATE_DISC; + pBatch->eltState[i] = BATCH_STATE_DISC; } } if(pMsg->pRuleset != batchRuleset) @@ -573,7 +573,7 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShu //do not have this yet and so we emulate -- 2010-06-10 int i; for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { - pBatch->pElem[i].state = BATCH_STATE_COMM; + pBatch->eltState[i] = BATCH_STATE_COMM; } RETiRet; } |