diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/batch.h | 14 | ||||
-rw-r--r-- | runtime/conf.c | 1 | ||||
-rw-r--r-- | runtime/queue.c | 4 | ||||
-rw-r--r-- | runtime/ruleset.c | 33 | ||||
-rw-r--r-- | runtime/wti.h | 74 |
5 files changed, 107 insertions, 19 deletions
diff --git a/runtime/batch.h b/runtime/batch.h index 2ec07670..fac0158e 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -46,17 +46,6 @@ typedef unsigned char batch_state_t; */ struct batch_obj_s { msg_t *pMsg; - /* work variables for action processing; these are reused for each action (or block of - * actions) - */ - sbool bPrevWasSuspended; - /* following are caches to save allocs if not absolutely necessary */ - uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */ - /* a cache to save malloc(), if not absolutely necessary */ - void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */ - size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; - /* and the same for the message length (if used) */ - /* end action work variables */ }; /* the batch @@ -152,7 +141,7 @@ batchFree(batch_t *pBatch) { /* staticActParams MUST be freed immediately (if required), * so we do not need to do that! */ - free(pBatch->pElem[i].staticActStrings[j]); + //TODO: do this in wti! free(pBatch->pElem[i].staticActStrings[j]); } } free(pBatch->pElem); @@ -171,7 +160,6 @@ batchInit(batch_t *pBatch, int maxElem) { pBatch->maxElem = maxElem; CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t))); CHKmalloc(pBatch->eltState = calloc((size_t)maxElem, sizeof(batch_state_t))); - // TODO: replace calloc by inidividual writes? finalize_it: RETiRet; } diff --git a/runtime/conf.c b/runtime/conf.c index 2b000c60..1544e364 100644 --- a/runtime/conf.c +++ b/runtime/conf.c @@ -523,7 +523,6 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction) (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { /* here check if the module is compatible with select features * (currently, we have no such features!) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ conf->actions.nbrActions++; /* one more active action! */ } break; diff --git a/runtime/queue.c b/runtime/queue.c index 968c016e..93661e41 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -987,10 +987,6 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) singleBatch.eltState = &batchState; singleBatch.active = &active; iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate); - /* delete the batch string params: TODO: create its own "class" for this */ - for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { - free(batchObj.staticActStrings[i]); - } msgDestruct(&pMsg); RETiRet; diff --git a/runtime/ruleset.c b/runtime/ruleset.c index db253d28..2ad21170 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -228,10 +228,40 @@ static inline void freeActive(sbool *active) { free(active); } static rsRetVal execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { + int i; DEFiRet; dbgprintf("RRRR: execAct [%s]: batch of %d elements, active %p\n", modGetName(stmt->d.act->pMod), batchNumMsgs(pBatch), active); pBatch->active = active; - stmt->d.act->submitToActQ(stmt->d.act, pBatch, pWti); +// TODO: check here if bPrevWasSuspsended was required and, if so +// if we actually are permitted to execute this action. + //if(pAction->bExecWhenPrevSusp) { + + + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + DBGPRINTF("action %d: valid:%d state:%d execWhenPrev:%d\n", + stmt->d.act->iActionNbr, batchIsValidElem(pBatch, i), pBatch->eltState[i], + stmt->d.act->bExecWhenPrevSusp); + if(batchIsValidElem(pBatch, i)) { + stmt->d.act->submitToActQ(stmt->d.act, pWti, pBatch->pElem[i].pMsg); + // TODO: we must refactor this! flag messages as committed + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + } + } + + +#warning implement action return code checking +// we should store the return code and make it available +// to users via a special function (or maybe variable) +// internally, we can use this for bPrevWasSuspended checking +// to implement this system, we need to keep a kind of +// "execution state" when running the rule engine. This most +// probably is best done inside the wti object. +// I think in v7 there was a bug, so that bPrevWasSuspended did +// not properly make it onto the next batch (because it was +// stored within the batch state) -- but even if so, the +// exposure window was minimal, as the action would probably +// fail the next time again. [TODO: check if batch object survived +// end of batch, in which case it was probably correctly handled] RETiRet; } @@ -582,6 +612,7 @@ processBatch(batch_t *pBatch, wti_t *pWti) CHKiRet(processBatchMultiRuleset(pBatch, pWti)); } + actionCommitAll(pWti); finalize_it: DBGPRINTF("ruleset.ProcessMsg() returns %d\n", iRet); RETiRet; diff --git a/runtime/wti.h b/runtime/wti.h index bb4f56bc..79b62102 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -26,11 +26,32 @@ #include "wtp.h" #include "obj.h" #include "batch.h" +#include "action.h" +#define ACT_STATE_RDY 0 /* action ready, waiting for new transaction */ +#define ACT_STATE_ITX 1 /* transaction active, waiting for new data or commit */ +#define ACT_STATE_COMM 2 /* transaction finished (a transient state) */ +#define ACT_STATE_RTRY 3 /* failure occured, trying to restablish ready state */ +#define ACT_STATE_SUSP 4 /* suspended due to failure (return fail until timeout expired) */ +#define ACT_STATE_DIED 7 /* action permanently failed and now disabled */ +/* note: 3 bit bit field --> highest value is 7! */ + typedef struct actWrkrInfo { action_t *pAction; void *actWrkrData; + uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an immediate failure following */ + int iNbrResRtry; /* number of retries since last suspend */ + struct { + unsigned actState : 3; + } flags; + /* following are caches to save allocs if not absolutely necessary */ + uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */ + /* a cache to save malloc(), if not absolutely necessary */ + void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */ + size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; + /* and the same for the message length (if used) */ + /* end action work variables */ } actWrkrInfo_t; /* the worker thread instance class */ @@ -62,4 +83,57 @@ PROTOTYPEObjClassInit(wti); PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*); PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*); +static inline uint8_t +getActionStateByNbr(wti_t *pWti, int iActNbr) +{ + return((uint8_t) pWti->actWrkrInfo[iActNbr].flags.actState); +} + +static inline uint8_t +getActionState(wti_t *pWti, action_t *pAction) +{ + return((uint8_t) pWti->actWrkrInfo[pAction->iActionNbr].flags.actState); +} + +static inline void +setActionState(wti_t *pWti, action_t *pAction, uint8_t newState) +{ + pWti->actWrkrInfo[pAction->iActionNbr].flags.actState = newState; +} + +static inline uint16_t +getActionResumeInRow(wti_t *pWti, action_t *pAction) +{ + return(pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow); +} + +static inline void +setActionResumeInRow(wti_t *pWti, action_t *pAction, uint16_t val) +{ + pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow = val; +} + +static inline void +incActionResumeInRow(wti_t *pWti, action_t *pAction) +{ + pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow++; +} + +static inline int +getActionNbrResRtry(wti_t *pWti, action_t *pAction) +{ + return(pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry); +} + +static inline void +setActionNbrResRtry(wti_t *pWti, action_t *pAction, uint16_t val) +{ + pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry = val; +} + +static inline void +incActionNbrResRtry(wti_t *pWti, action_t *pAction) +{ + pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry++; +} #endif /* #ifndef WTI_H_INCLUDED */ |