summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c126
-rw-r--r--action.h12
-rw-r--r--runtime/batch.h1
-rw-r--r--runtime/conf.c1
-rw-r--r--runtime/wti.h23
5 files changed, 88 insertions, 75 deletions
diff --git a/action.c b/action.c
index 76305478..7e314003 100644
--- a/action.c
+++ b/action.c
@@ -496,9 +496,9 @@ rsRetVal actionSetGlobalResumeInterval(int iNewVal)
* returned string must not be modified.
* rgerhards, 2009-05-07
*/
-static uchar *getActStateName(action_t *pThis)
+static uchar *getActStateName(action_t *pThis, wti_t *pWti)
{
- switch(pThis->eState) {
+ switch(getActionState(pWti, pThis)) {
case ACT_STATE_RDY:
return (uchar*) "rdy";
case ACT_STATE_ITX:
@@ -520,12 +520,12 @@ static uchar *getActStateName(action_t *pThis)
/* returns a suitable return code based on action state
* rgerhards, 2009-05-07
*/
-static rsRetVal getReturnCode(action_t *pThis)
+static rsRetVal getReturnCode(action_t *pThis, wti_t *pWti)
{
DEFiRet;
ASSERT(pThis != NULL);
- switch(pThis->eState) {
+ switch(getActionState(pWti, pThis)) {
case ACT_STATE_RDY:
iRet = RS_RET_OK;
break;
@@ -545,8 +545,8 @@ static rsRetVal getReturnCode(action_t *pThis)
iRet = RS_RET_ACTION_FAILED;
break;
default:
- DBGPRINTF("Invalid action engine state %d, program error\n",
- (int) pThis->eState);
+ DBGPRINTF("Invalid action engine state %u, program error\n",
+ getActionState(pWti, pThis));
iRet = RS_RET_ERR;
break;
}
@@ -558,28 +558,29 @@ static rsRetVal getReturnCode(action_t *pThis)
/* set the action to a new state
* rgerhards, 2007-08-02
*/
-static inline void actionSetState(action_t *pThis, action_state_t newState)
+static inline void
+actionSetState(action_t *pThis, wti_t *pWti, uint8_t newState)
{
- pThis->eState = newState;
- DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis));
+ setActionState(pWti, pThis, newState);
+ DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis, pWti));
}
/* Handles the transient commit state. So far, this is
* mostly a dummy...
* rgerhards, 2007-08-02
*/
-static void actionCommitted(action_t *pThis)
+static void actionCommitted(action_t *pThis, wti_t *pWti)
{
- actionSetState(pThis, ACT_STATE_RDY);
+ actionSetState(pThis, pWti, ACT_STATE_RDY);
}
/* set action to "rtry" state.
* rgerhards, 2007-08-02
*/
-static void actionRetry(action_t *pThis)
+static void actionRetry(action_t *pThis, wti_t *pWti)
{
- actionSetState(pThis, ACT_STATE_RTRY);
+ actionSetState(pThis, pWti, ACT_STATE_RTRY);
pThis->iResumeOKinRow++;
}
@@ -589,9 +590,9 @@ static void actionRetry(action_t *pThis)
* depends on output module.
* rgerhards, 2007-08-02
*/
-static void actionDisable(action_t *pThis)
+static void actionDisable(action_t *pThis, wti_t *pWti)
{
- actionSetState(pThis, ACT_STATE_DIED);
+ actionSetState(pThis, pWti, ACT_STATE_DIED);
}
@@ -603,7 +604,7 @@ static void actionDisable(action_t *pThis)
* CPU time. TODO: maybe a config option for that?
* rgerhards, 2007-08-02
*/
-static inline void actionSuspend(action_t *pThis)
+static inline void actionSuspend(action_t *pThis, wti_t *pWti)
{
time_t ttNow;
@@ -612,7 +613,7 @@ static inline void actionSuspend(action_t *pThis)
*/
datetime.GetTime(&ttNow);
pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
- actionSetState(pThis, ACT_STATE_SUSP);
+ actionSetState(pThis, pWti, ACT_STATE_SUSP);
DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry);
}
@@ -633,7 +634,7 @@ static inline void actionSuspend(action_t *pThis)
* of its inability to recover. -- rgerhards, 2010-04-26.
*/
static inline rsRetVal
-actionDoRetry(action_t *pThis, int *pbShutdownImmediate)
+actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
{
int iRetries;
int iSleepPeriod;
@@ -643,7 +644,7 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate)
ASSERT(pThis != NULL);
iRetries = 0;
- while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) {
+ while((*pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) {
DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries);
iRet = pThis->pMod->tryResume(pThis->pModData);
DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet);
@@ -655,13 +656,13 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate)
}
if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) {
DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet);
- actionSetState(pThis, ACT_STATE_RDY);
+ actionSetState(pThis, pWti, ACT_STATE_RDY);
} else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) {
/* max retries reached? */
DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n",
pThis->iResumeRetryCount, iRetries);
if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
- actionSuspend(pThis);
+ actionSuspend(pThis, pWti);
} else {
++pThis->iNbrResRtry;
++iRetries;
@@ -672,11 +673,11 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate)
}
}
} else if(iRet == RS_RET_DISABLE_ACTION) {
- actionDisable(pThis);
+ actionDisable(pThis, pWti);
}
}
- if(pThis->eState == ACT_STATE_RDY) {
+ if(getActionState(pWti, pThis) == ACT_STATE_RDY) {
pThis->iNbrResRtry = 0;
}
@@ -696,6 +697,7 @@ dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis
CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData),
pThis->pModData));
pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis;
+ setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */
}
finalize_it:
RETiRet;
@@ -704,14 +706,13 @@ finalize_it:
/* try to resume an action -- rgerhards, 2007-08-02
* changed to new action state engine -- rgerhards, 2009-05-07
*/
-static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate)
+static rsRetVal
+actionTryResume(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
{
DEFiRet;
time_t ttNow = NO_TIME_PROVIDED;
- ASSERT(pThis != NULL);
-
- if(pThis->eState == ACT_STATE_SUSP) {
+ if(getActionState(pWti, pThis) == ACT_STATE_SUSP) {
/* if we are suspended, we need to check if the timeout expired.
* for this handling, we must always obtain a fresh timestamp. We used
* to use the action timestamp, but in this case we will never reach a
@@ -721,19 +722,19 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate)
*/
datetime.GetTime(&ttNow); /* cache "now" */
if(ttNow >= pThis->ttResumeRtry) {
- actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */
+ actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */
}
}
- if(pThis->eState == ACT_STATE_RTRY) {
+ if(getActionState(pWti, pThis) == ACT_STATE_RTRY) {
if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */
datetime.GetTime(&ttNow);
- CHKiRet(actionDoRetry(pThis, pbShutdownImmediate));
+ CHKiRet(actionDoRetry(pThis, pWti, pbShutdownImmediate));
}
- if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) {
+ if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) {
DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n",
- pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+ pThis, getActStateName(pThis, pWti), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
}
finalize_it:
@@ -751,22 +752,22 @@ static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate,
assert(pThis != NULL);
CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
- CHKiRet(actionTryResume(pThis, pbShutdownImmediate));
+ CHKiRet(actionTryResume(pThis, pWti, pbShutdownImmediate));
/* if we are now ready, we initialize the transaction and advance
* action state accordingly
*/
- if(pThis->eState == ACT_STATE_RDY) {
+ if(getActionState(pWti, pThis) == ACT_STATE_RDY) {
iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
- actionSetState(pThis, ACT_STATE_ITX);
+ actionSetState(pThis, pWti, ACT_STATE_ITX);
break;
case RS_RET_SUSPENDED:
- actionRetry(pThis);
+ actionRetry(pThis, pWti);
break;
case RS_RET_DISABLE_ACTION:
- actionDisable(pThis);
+ actionDisable(pThis, pWti);
break;
default:FINALIZE;
}
@@ -780,7 +781,7 @@ finalize_it:
/* debug-print the contents of an action object
* rgerhards, 2007-08-02
*/
-rsRetVal actionDbgPrint(action_t *pThis)
+static rsRetVal actionDbgPrint(action_t *pThis)
{
DEFiRet;
char *sz;
@@ -790,11 +791,12 @@ rsRetVal actionDbgPrint(action_t *pThis)
dbgprintf("\n");
dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData);
dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval);
- if(pThis->eState == ACT_STATE_SUSP) {
+#if 0 // do we need this ???
+ if(getActionState(pWti, pThis) == ACT_STATE_SUSP) {
dbgprintf("\tresume next retry: %u, number retries: %d",
(unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry);
}
- dbgprintf("\tState: %s\n", getActStateName(pThis));
+#endif
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
if(pThis->submitToActQ == doSubmitToActionQComplexBatch) {
sz = "slow, but feature-rich";
@@ -928,7 +930,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
ISOBJ_TYPE_assert(pMsg, msg);
DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n",
- getActStateName(pThis), pThis->iActionNbr);
+ getActStateName(pThis, pWti), pThis->iActionNbr);
CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
@@ -937,7 +939,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
- actionCommitted(pThis);
+ actionCommitted(pThis, pWti);
pThis->iResumeOKinRow = 0; /* we had a successful call! */
break;
case RS_RET_DEFER_COMMIT:
@@ -950,17 +952,17 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
pThis->iResumeOKinRow = 0; /* we had a successful call! */
break;
case RS_RET_SUSPENDED:
- actionRetry(pThis);
+ actionRetry(pThis, pWti);
break;
case RS_RET_DISABLE_ACTION:
- actionDisable(pThis);
+ actionDisable(pThis, pWti);
break;
default:/* permanent failure of this message - no sense in retrying. This is
* not yet handled (but easy TODO)
*/
FINALIZE;
}
- iRet = getReturnCode(pThis);
+ iRet = getReturnCode(pThis, pWti);
finalize_it:
RETiRet;
@@ -982,10 +984,10 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutd
CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti));
if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL)
pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate);
- if(pThis->eState == ACT_STATE_ITX)
+ if(getActionState(pWti, pThis) == ACT_STATE_ITX)
CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti));
- iRet = getReturnCode(pThis);
+ iRet = getReturnCode(pThis, pWti);
finalize_it:
RETiRet;
}
@@ -1003,17 +1005,17 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti)
ASSERT(pThis != NULL);
- if(pThis->eState == ACT_STATE_RDY) {
+ if(getActionState(pWti, pThis) == ACT_STATE_RDY) {
/* we just need to flag the batch as commited */
FINALIZE; /* nothing to do */
}
CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti));
- if(pThis->eState == ACT_STATE_ITX) {
+ if(getActionState(pWti, pThis) == ACT_STATE_ITX) {
iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
- actionCommitted(pThis);
+ actionCommitted(pThis, pWti);
/* flag messages as committed */
for(i = 0 ; i < pBatch->nElem ; ++i) {
batchSetElemState(pBatch, i, BATCH_STATE_COMM);
@@ -1021,20 +1023,20 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti)
}
break;
case RS_RET_SUSPENDED:
- actionRetry(pThis);
+ actionRetry(pThis, pWti);
break;
case RS_RET_DISABLE_ACTION:
- actionDisable(pThis);
+ actionDisable(pThis, pWti);
break;
case RS_RET_DEFER_COMMIT:
DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT "
"- ignored\n");
- actionCommitted(pThis);
+ actionCommitted(pThis, pWti);
break;
case RS_RET_PREVIOUS_COMMITTED:
DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED "
"- ignored\n");
- actionCommitted(pThis);
+ actionCommitted(pThis, pWti);
break;
default:/* permanent failure of this message - no sense in retrying. This is
* not yet handled (but easy TODO)
@@ -1042,7 +1044,7 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti)
FINALIZE;
}
}
- iRet = getReturnCode(pThis);
+ iRet = getReturnCode(pThis, pWti);
finalize_it:
RETiRet;
@@ -1383,7 +1385,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti)
{
DEFiRet;
- if(pAction->eState == ACT_STATE_DIED) {
+ if(getActionState(pWti, pAction) == ACT_STATE_DIED) {
DBGPRINTF("action %p died, do NOT execute\n", pAction);
FINALIZE;
}
@@ -1516,7 +1518,8 @@ DEFFUNC_llExecFunc(doActivateActions)
errmsg.LogError(0, localRet, "file prefix (work directory?) "
"is missing");
}
- actionDisable(pThis);
+#warning think how to handle this:
+ //actionDisable(pThis);
}
DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod),
pThis, pThis->pQueue);
@@ -1874,11 +1877,11 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
pAction->pMod = pMod;
pAction->pModData = pModData;
- /* check if the module is compatible with select features (currently no such features exist) */
- pAction->eState = ACT_STATE_RDY; /* action is enabled */
- if(bSuspended)
- actionSuspend(pAction);
+#warning we need to look at the following
+ // Probably the core init needs to be done during createWrkrInstance()
+ //if(bSuspended)
+ // actionSuspend(pAction, pWti);
CHKiRet(actionConstructFinalize(pAction, lst));
@@ -1973,7 +1976,6 @@ actionNewInst(struct nvlst *lst, action_t **ppAction)
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
/* check if the module is compatible with select features
* (currently no such features exist) */
- pAction->eState = ACT_STATE_RDY; /* action is enabled */
loadConf->actions.nbrActions++; /* one more active action! */
}
*ppAction = pAction;
diff --git a/action.h b/action.h
index 5c88b052..243f2f84 100644
--- a/action.h
+++ b/action.h
@@ -35,15 +35,6 @@
extern int glbliActionResumeRetryCount;
-typedef enum {
- ACT_STATE_DIED = 0, /* action permanently failed and now disabled - MUST BE ZERO! */
- ACT_STATE_RDY = 1, /* action ready, waiting for new transaction */
- ACT_STATE_ITX = 2, /* transaction active, waiting for new data or commit */
- ACT_STATE_COMM = 3, /* transaction finished (a transient state) */
- ACT_STATE_RTRY = 4, /* failure occured, trying to restablish ready state */
- ACT_STATE_SUSP = 5 /* suspended due to failure (return fail until timeout expired) */
-} action_state_t;
-
/* the following struct defines the action object data structure
*/
struct action_s {
@@ -55,7 +46,6 @@ struct action_s {
sbool bExecWhenPrevSusp;/* execute only when previous action is suspended? */
sbool bWriteAllMarkMsgs;/* should all mark msgs be written (not matter how recent the action was executed)? */
int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */
- action_state_t eState; /* current state of action */
sbool bHadAutoCommit; /* did an auto-commit happen during doAction()? */
time_t ttResumeRtry; /* when is it time to retry the resume? */
int iResumeOKinRow; /* number of times in a row that resume said OK with an immediate failure following */
@@ -93,7 +83,7 @@ struct action_s {
rsRetVal actionConstruct(action_t **ppThis);
rsRetVal actionConstructFinalize(action_t *pThis, struct nvlst *lst);
rsRetVal actionDestruct(action_t *pThis);
-rsRetVal actionDbgPrint(action_t *pThis);
+//rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
rsRetVal actionDoAction(action_t *pAction);
rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t*);
diff --git a/runtime/batch.h b/runtime/batch.h
index 2ec07670..4120cc51 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -171,7 +171,6 @@ batchInit(batch_t *pBatch, int maxElem) {
pBatch->maxElem = maxElem;
CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t)));
CHKmalloc(pBatch->eltState = calloc((size_t)maxElem, sizeof(batch_state_t)));
- // TODO: replace calloc by inidividual writes?
finalize_it:
RETiRet;
}
diff --git a/runtime/conf.c b/runtime/conf.c
index 2b000c60..1544e364 100644
--- a/runtime/conf.c
+++ b/runtime/conf.c
@@ -523,7 +523,6 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction)
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
/* here check if the module is compatible with select features
* (currently, we have no such features!) */
- pAction->eState = ACT_STATE_RDY; /* action is enabled */
conf->actions.nbrActions++; /* one more active action! */
}
break;
diff --git a/runtime/wti.h b/runtime/wti.h
index bb4f56bc..53f1c9fd 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -26,11 +26,22 @@
#include "wtp.h"
#include "obj.h"
#include "batch.h"
+#include "action.h"
+#define ACT_STATE_DIED 0 /* action permanently failed and now disabled - MUST BE ZERO! */
+#define ACT_STATE_RDY 1 /* action ready, waiting for new transaction */
+#define ACT_STATE_ITX 2 /* transaction active, waiting for new data or commit */
+#define ACT_STATE_COMM 3 /* transaction finished (a transient state) */
+#define ACT_STATE_RTRY 4 /* failure occured, trying to restablish ready state */
+#define ACT_STATE_SUSP 5 /* suspended due to failure (return fail until timeout expired) */
+
typedef struct actWrkrInfo {
action_t *pAction;
void *actWrkrData;
+ struct {
+ unsigned actState : 3;
+ } flags;
} actWrkrInfo_t;
/* the worker thread instance class */
@@ -62,4 +73,16 @@ PROTOTYPEObjClassInit(wti);
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*);
+static inline uint8_t
+getActionState(wti_t *pWti, action_t *pAction)
+{
+ return((uint8_t) pWti->actWrkrInfo[pAction->iActionNbr].flags.actState);
+}
+
+static inline void
+setActionState(wti_t *pWti, action_t *pAction, uint8_t newState)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].flags.actState = newState;
+}
+
#endif /* #ifndef WTI_H_INCLUDED */