summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/batch.h14
-rw-r--r--runtime/conf.c1
-rw-r--r--runtime/queue.c4
-rw-r--r--runtime/ruleset.c33
-rw-r--r--runtime/wti.h74
5 files changed, 107 insertions, 19 deletions
diff --git a/runtime/batch.h b/runtime/batch.h
index 2ec07670..fac0158e 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -46,17 +46,6 @@ typedef unsigned char batch_state_t;
*/
struct batch_obj_s {
msg_t *pMsg;
- /* work variables for action processing; these are reused for each action (or block of
- * actions)
- */
- sbool bPrevWasSuspended;
- /* following are caches to save allocs if not absolutely necessary */
- uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */
- /* a cache to save malloc(), if not absolutely necessary */
- void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */
- size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE];
- /* and the same for the message length (if used) */
- /* end action work variables */
};
/* the batch
@@ -152,7 +141,7 @@ batchFree(batch_t *pBatch) {
/* staticActParams MUST be freed immediately (if required),
* so we do not need to do that!
*/
- free(pBatch->pElem[i].staticActStrings[j]);
+ //TODO: do this in wti! free(pBatch->pElem[i].staticActStrings[j]);
}
}
free(pBatch->pElem);
@@ -171,7 +160,6 @@ batchInit(batch_t *pBatch, int maxElem) {
pBatch->maxElem = maxElem;
CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t)));
CHKmalloc(pBatch->eltState = calloc((size_t)maxElem, sizeof(batch_state_t)));
- // TODO: replace calloc by inidividual writes?
finalize_it:
RETiRet;
}
diff --git a/runtime/conf.c b/runtime/conf.c
index 2b000c60..1544e364 100644
--- a/runtime/conf.c
+++ b/runtime/conf.c
@@ -523,7 +523,6 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction)
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
/* here check if the module is compatible with select features
* (currently, we have no such features!) */
- pAction->eState = ACT_STATE_RDY; /* action is enabled */
conf->actions.nbrActions++; /* one more active action! */
}
break;
diff --git a/runtime/queue.c b/runtime/queue.c
index 968c016e..93661e41 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -987,10 +987,6 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
singleBatch.eltState = &batchState;
singleBatch.active = &active;
iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate);
- /* delete the batch string params: TODO: create its own "class" for this */
- for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
- free(batchObj.staticActStrings[i]);
- }
msgDestruct(&pMsg);
RETiRet;
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index db253d28..2ad21170 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -228,10 +228,40 @@ static inline void freeActive(sbool *active) { free(active); }
static rsRetVal
execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti)
{
+ int i;
DEFiRet;
dbgprintf("RRRR: execAct [%s]: batch of %d elements, active %p\n", modGetName(stmt->d.act->pMod), batchNumMsgs(pBatch), active);
pBatch->active = active;
- stmt->d.act->submitToActQ(stmt->d.act, pBatch, pWti);
+// TODO: check here if bPrevWasSuspsended was required and, if so
+// if we actually are permitted to execute this action.
+ //if(pAction->bExecWhenPrevSusp) {
+
+
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ DBGPRINTF("action %d: valid:%d state:%d execWhenPrev:%d\n",
+ stmt->d.act->iActionNbr, batchIsValidElem(pBatch, i), pBatch->eltState[i],
+ stmt->d.act->bExecWhenPrevSusp);
+ if(batchIsValidElem(pBatch, i)) {
+ stmt->d.act->submitToActQ(stmt->d.act, pWti, pBatch->pElem[i].pMsg);
+ // TODO: we must refactor this! flag messages as committed
+ batchSetElemState(pBatch, i, BATCH_STATE_COMM);
+ }
+ }
+
+
+#warning implement action return code checking
+// we should store the return code and make it available
+// to users via a special function (or maybe variable)
+// internally, we can use this for bPrevWasSuspended checking
+// to implement this system, we need to keep a kind of
+// "execution state" when running the rule engine. This most
+// probably is best done inside the wti object.
+// I think in v7 there was a bug, so that bPrevWasSuspended did
+// not properly make it onto the next batch (because it was
+// stored within the batch state) -- but even if so, the
+// exposure window was minimal, as the action would probably
+// fail the next time again. [TODO: check if batch object survived
+// end of batch, in which case it was probably correctly handled]
RETiRet;
}
@@ -582,6 +612,7 @@ processBatch(batch_t *pBatch, wti_t *pWti)
CHKiRet(processBatchMultiRuleset(pBatch, pWti));
}
+ actionCommitAll(pWti);
finalize_it:
DBGPRINTF("ruleset.ProcessMsg() returns %d\n", iRet);
RETiRet;
diff --git a/runtime/wti.h b/runtime/wti.h
index bb4f56bc..79b62102 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -26,11 +26,32 @@
#include "wtp.h"
#include "obj.h"
#include "batch.h"
+#include "action.h"
+#define ACT_STATE_RDY 0 /* action ready, waiting for new transaction */
+#define ACT_STATE_ITX 1 /* transaction active, waiting for new data or commit */
+#define ACT_STATE_COMM 2 /* transaction finished (a transient state) */
+#define ACT_STATE_RTRY 3 /* failure occured, trying to restablish ready state */
+#define ACT_STATE_SUSP 4 /* suspended due to failure (return fail until timeout expired) */
+#define ACT_STATE_DIED 7 /* action permanently failed and now disabled */
+/* note: 3 bit bit field --> highest value is 7! */
+
typedef struct actWrkrInfo {
action_t *pAction;
void *actWrkrData;
+ uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an immediate failure following */
+ int iNbrResRtry; /* number of retries since last suspend */
+ struct {
+ unsigned actState : 3;
+ } flags;
+ /* following are caches to save allocs if not absolutely necessary */
+ uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */
+ /* a cache to save malloc(), if not absolutely necessary */
+ void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */
+ size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE];
+ /* and the same for the message length (if used) */
+ /* end action work variables */
} actWrkrInfo_t;
/* the worker thread instance class */
@@ -62,4 +83,57 @@ PROTOTYPEObjClassInit(wti);
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*);
+static inline uint8_t
+getActionStateByNbr(wti_t *pWti, int iActNbr)
+{
+ return((uint8_t) pWti->actWrkrInfo[iActNbr].flags.actState);
+}
+
+static inline uint8_t
+getActionState(wti_t *pWti, action_t *pAction)
+{
+ return((uint8_t) pWti->actWrkrInfo[pAction->iActionNbr].flags.actState);
+}
+
+static inline void
+setActionState(wti_t *pWti, action_t *pAction, uint8_t newState)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].flags.actState = newState;
+}
+
+static inline uint16_t
+getActionResumeInRow(wti_t *pWti, action_t *pAction)
+{
+ return(pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow);
+}
+
+static inline void
+setActionResumeInRow(wti_t *pWti, action_t *pAction, uint16_t val)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow = val;
+}
+
+static inline void
+incActionResumeInRow(wti_t *pWti, action_t *pAction)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow++;
+}
+
+static inline int
+getActionNbrResRtry(wti_t *pWti, action_t *pAction)
+{
+ return(pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry);
+}
+
+static inline void
+setActionNbrResRtry(wti_t *pWti, action_t *pAction, uint16_t val)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry = val;
+}
+
+static inline void
+incActionNbrResRtry(wti_t *pWti, action_t *pAction)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry++;
+}
#endif /* #ifndef WTI_H_INCLUDED */