diff options
-rw-r--r-- | action.c | 126 | ||||
-rw-r--r-- | action.h | 12 | ||||
-rw-r--r-- | runtime/batch.h | 1 | ||||
-rw-r--r-- | runtime/conf.c | 1 | ||||
-rw-r--r-- | runtime/wti.h | 23 |
5 files changed, 88 insertions, 75 deletions
@@ -496,9 +496,9 @@ rsRetVal actionSetGlobalResumeInterval(int iNewVal) * returned string must not be modified. * rgerhards, 2009-05-07 */ -static uchar *getActStateName(action_t *pThis) +static uchar *getActStateName(action_t *pThis, wti_t *pWti) { - switch(pThis->eState) { + switch(getActionState(pWti, pThis)) { case ACT_STATE_RDY: return (uchar*) "rdy"; case ACT_STATE_ITX: @@ -520,12 +520,12 @@ static uchar *getActStateName(action_t *pThis) /* returns a suitable return code based on action state * rgerhards, 2009-05-07 */ -static rsRetVal getReturnCode(action_t *pThis) +static rsRetVal getReturnCode(action_t *pThis, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); - switch(pThis->eState) { + switch(getActionState(pWti, pThis)) { case ACT_STATE_RDY: iRet = RS_RET_OK; break; @@ -545,8 +545,8 @@ static rsRetVal getReturnCode(action_t *pThis) iRet = RS_RET_ACTION_FAILED; break; default: - DBGPRINTF("Invalid action engine state %d, program error\n", - (int) pThis->eState); + DBGPRINTF("Invalid action engine state %u, program error\n", + getActionState(pWti, pThis)); iRet = RS_RET_ERR; break; } @@ -558,28 +558,29 @@ static rsRetVal getReturnCode(action_t *pThis) /* set the action to a new state * rgerhards, 2007-08-02 */ -static inline void actionSetState(action_t *pThis, action_state_t newState) +static inline void +actionSetState(action_t *pThis, wti_t *pWti, uint8_t newState) { - pThis->eState = newState; - DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis)); + setActionState(pWti, pThis, newState); + DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis, pWti)); } /* Handles the transient commit state. So far, this is * mostly a dummy... * rgerhards, 2007-08-02 */ -static void actionCommitted(action_t *pThis) +static void actionCommitted(action_t *pThis, wti_t *pWti) { - actionSetState(pThis, ACT_STATE_RDY); + actionSetState(pThis, pWti, ACT_STATE_RDY); } /* set action to "rtry" state. * rgerhards, 2007-08-02 */ -static void actionRetry(action_t *pThis) +static void actionRetry(action_t *pThis, wti_t *pWti) { - actionSetState(pThis, ACT_STATE_RTRY); + actionSetState(pThis, pWti, ACT_STATE_RTRY); pThis->iResumeOKinRow++; } @@ -589,9 +590,9 @@ static void actionRetry(action_t *pThis) * depends on output module. * rgerhards, 2007-08-02 */ -static void actionDisable(action_t *pThis) +static void actionDisable(action_t *pThis, wti_t *pWti) { - actionSetState(pThis, ACT_STATE_DIED); + actionSetState(pThis, pWti, ACT_STATE_DIED); } @@ -603,7 +604,7 @@ static void actionDisable(action_t *pThis) * CPU time. TODO: maybe a config option for that? * rgerhards, 2007-08-02 */ -static inline void actionSuspend(action_t *pThis) +static inline void actionSuspend(action_t *pThis, wti_t *pWti) { time_t ttNow; @@ -612,7 +613,7 @@ static inline void actionSuspend(action_t *pThis) */ datetime.GetTime(&ttNow); pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); - actionSetState(pThis, ACT_STATE_SUSP); + actionSetState(pThis, pWti, ACT_STATE_SUSP); DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry); } @@ -633,7 +634,7 @@ static inline void actionSuspend(action_t *pThis) * of its inability to recover. -- rgerhards, 2010-04-26. */ static inline rsRetVal -actionDoRetry(action_t *pThis, int *pbShutdownImmediate) +actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) { int iRetries; int iSleepPeriod; @@ -643,7 +644,7 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) ASSERT(pThis != NULL); iRetries = 0; - while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { + while((*pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries); iRet = pThis->pMod->tryResume(pThis->pModData); DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet); @@ -655,13 +656,13 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) } if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet); - actionSetState(pThis, ACT_STATE_RDY); + actionSetState(pThis, pWti, ACT_STATE_RDY); } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { /* max retries reached? */ DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n", pThis->iResumeRetryCount, iRetries); if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { - actionSuspend(pThis); + actionSuspend(pThis, pWti); } else { ++pThis->iNbrResRtry; ++iRetries; @@ -672,11 +673,11 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) } } } else if(iRet == RS_RET_DISABLE_ACTION) { - actionDisable(pThis); + actionDisable(pThis, pWti); } } - if(pThis->eState == ACT_STATE_RDY) { + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { pThis->iNbrResRtry = 0; } @@ -696,6 +697,7 @@ dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), pThis->pModData)); pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; + setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */ } finalize_it: RETiRet; @@ -704,14 +706,13 @@ finalize_it: /* try to resume an action -- rgerhards, 2007-08-02 * changed to new action state engine -- rgerhards, 2009-05-07 */ -static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) +static rsRetVal +actionTryResume(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) { DEFiRet; time_t ttNow = NO_TIME_PROVIDED; - ASSERT(pThis != NULL); - - if(pThis->eState == ACT_STATE_SUSP) { + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { /* if we are suspended, we need to check if the timeout expired. * for this handling, we must always obtain a fresh timestamp. We used * to use the action timestamp, but in this case we will never reach a @@ -721,19 +722,19 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) */ datetime.GetTime(&ttNow); /* cache "now" */ if(ttNow >= pThis->ttResumeRtry) { - actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ + actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */ } } - if(pThis->eState == ACT_STATE_RTRY) { + if(getActionState(pWti, pThis) == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ datetime.GetTime(&ttNow); - CHKiRet(actionDoRetry(pThis, pbShutdownImmediate)); + CHKiRet(actionDoRetry(pThis, pWti, pbShutdownImmediate)); } - if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { + if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) { DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n", - pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + pThis, getActStateName(pThis, pWti), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); } finalize_it: @@ -751,22 +752,22 @@ static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate, assert(pThis != NULL); CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); - CHKiRet(actionTryResume(pThis, pbShutdownImmediate)); + CHKiRet(actionTryResume(pThis, pWti, pbShutdownImmediate)); /* if we are now ready, we initialize the transaction and advance * action state accordingly */ - if(pThis->eState == ACT_STATE_RDY) { + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionSetState(pThis, ACT_STATE_ITX); + actionSetState(pThis, pWti, ACT_STATE_ITX); break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: - actionDisable(pThis); + actionDisable(pThis, pWti); break; default:FINALIZE; } @@ -780,7 +781,7 @@ finalize_it: /* debug-print the contents of an action object * rgerhards, 2007-08-02 */ -rsRetVal actionDbgPrint(action_t *pThis) +static rsRetVal actionDbgPrint(action_t *pThis) { DEFiRet; char *sz; @@ -790,11 +791,12 @@ rsRetVal actionDbgPrint(action_t *pThis) dbgprintf("\n"); dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); - if(pThis->eState == ACT_STATE_SUSP) { +#if 0 // do we need this ??? + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { dbgprintf("\tresume next retry: %u, number retries: %d", (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); } - dbgprintf("\tState: %s\n", getActStateName(pThis)); +#endif dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); if(pThis->submitToActQ == doSubmitToActionQComplexBatch) { sz = "slow, but feature-rich"; @@ -928,7 +930,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) ISOBJ_TYPE_assert(pMsg, msg); DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", - getActStateName(pThis), pThis->iActionNbr); + getActStateName(pThis, pWti), pThis->iActionNbr); CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); @@ -937,7 +939,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionCommitted(pThis); + actionCommitted(pThis, pWti); pThis->iResumeOKinRow = 0; /* we had a successful call! */ break; case RS_RET_DEFER_COMMIT: @@ -950,17 +952,17 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) pThis->iResumeOKinRow = 0; /* we had a successful call! */ break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: - actionDisable(pThis); + actionDisable(pThis, pWti); break; default:/* permanent failure of this message - no sense in retrying. This is * not yet handled (but easy TODO) */ FINALIZE; } - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; @@ -982,10 +984,10 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutd CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); - if(pThis->eState == ACT_STATE_ITX) + if(getActionState(pWti, pThis) == ACT_STATE_ITX) CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti)); - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; } @@ -1003,17 +1005,17 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) ASSERT(pThis != NULL); - if(pThis->eState == ACT_STATE_RDY) { + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { /* we just need to flag the batch as commited */ FINALIZE; /* nothing to do */ } CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti)); - if(pThis->eState == ACT_STATE_ITX) { + if(getActionState(pWti, pThis) == ACT_STATE_ITX) { iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: - actionCommitted(pThis); + actionCommitted(pThis, pWti); /* flag messages as committed */ for(i = 0 ; i < pBatch->nElem ; ++i) { batchSetElemState(pBatch, i, BATCH_STATE_COMM); @@ -1021,20 +1023,20 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) } break; case RS_RET_SUSPENDED: - actionRetry(pThis); + actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: - actionDisable(pThis); + actionDisable(pThis, pWti); break; case RS_RET_DEFER_COMMIT: DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT " "- ignored\n"); - actionCommitted(pThis); + actionCommitted(pThis, pWti); break; case RS_RET_PREVIOUS_COMMITTED: DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED " "- ignored\n"); - actionCommitted(pThis); + actionCommitted(pThis, pWti); break; default:/* permanent failure of this message - no sense in retrying. This is * not yet handled (but easy TODO) @@ -1042,7 +1044,7 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) FINALIZE; } } - iRet = getReturnCode(pThis); + iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; @@ -1383,7 +1385,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti) { DEFiRet; - if(pAction->eState == ACT_STATE_DIED) { + if(getActionState(pWti, pAction) == ACT_STATE_DIED) { DBGPRINTF("action %p died, do NOT execute\n", pAction); FINALIZE; } @@ -1516,7 +1518,8 @@ DEFFUNC_llExecFunc(doActivateActions) errmsg.LogError(0, localRet, "file prefix (work directory?) " "is missing"); } - actionDisable(pThis); +#warning think how to handle this: + //actionDisable(pThis); } DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod), pThis, pThis->pQueue); @@ -1874,11 +1877,11 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->pMod = pMod; pAction->pModData = pModData; - /* check if the module is compatible with select features (currently no such features exist) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ - if(bSuspended) - actionSuspend(pAction); +#warning we need to look at the following + // Probably the core init needs to be done during createWrkrInstance() + //if(bSuspended) + // actionSuspend(pAction, pWti); CHKiRet(actionConstructFinalize(pAction, lst)); @@ -1973,7 +1976,6 @@ actionNewInst(struct nvlst *lst, action_t **ppAction) (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { /* check if the module is compatible with select features * (currently no such features exist) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ loadConf->actions.nbrActions++; /* one more active action! */ } *ppAction = pAction; @@ -35,15 +35,6 @@ extern int glbliActionResumeRetryCount; -typedef enum { - ACT_STATE_DIED = 0, /* action permanently failed and now disabled - MUST BE ZERO! */ - ACT_STATE_RDY = 1, /* action ready, waiting for new transaction */ - ACT_STATE_ITX = 2, /* transaction active, waiting for new data or commit */ - ACT_STATE_COMM = 3, /* transaction finished (a transient state) */ - ACT_STATE_RTRY = 4, /* failure occured, trying to restablish ready state */ - ACT_STATE_SUSP = 5 /* suspended due to failure (return fail until timeout expired) */ -} action_state_t; - /* the following struct defines the action object data structure */ struct action_s { @@ -55,7 +46,6 @@ struct action_s { sbool bExecWhenPrevSusp;/* execute only when previous action is suspended? */ sbool bWriteAllMarkMsgs;/* should all mark msgs be written (not matter how recent the action was executed)? */ int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */ - action_state_t eState; /* current state of action */ sbool bHadAutoCommit; /* did an auto-commit happen during doAction()? */ time_t ttResumeRtry; /* when is it time to retry the resume? */ int iResumeOKinRow; /* number of times in a row that resume said OK with an immediate failure following */ @@ -93,7 +83,7 @@ struct action_s { rsRetVal actionConstruct(action_t **ppThis); rsRetVal actionConstructFinalize(action_t *pThis, struct nvlst *lst); rsRetVal actionDestruct(action_t *pThis); -rsRetVal actionDbgPrint(action_t *pThis); +//rsRetVal actionDbgPrint(action_t *pThis); rsRetVal actionSetGlobalResumeInterval(int iNewVal); rsRetVal actionDoAction(action_t *pAction); rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t*); diff --git a/runtime/batch.h b/runtime/batch.h index 2ec07670..4120cc51 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -171,7 +171,6 @@ batchInit(batch_t *pBatch, int maxElem) { pBatch->maxElem = maxElem; CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t))); CHKmalloc(pBatch->eltState = calloc((size_t)maxElem, sizeof(batch_state_t))); - // TODO: replace calloc by inidividual writes? finalize_it: RETiRet; } diff --git a/runtime/conf.c b/runtime/conf.c index 2b000c60..1544e364 100644 --- a/runtime/conf.c +++ b/runtime/conf.c @@ -523,7 +523,6 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction) (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { /* here check if the module is compatible with select features * (currently, we have no such features!) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ conf->actions.nbrActions++; /* one more active action! */ } break; diff --git a/runtime/wti.h b/runtime/wti.h index bb4f56bc..53f1c9fd 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -26,11 +26,22 @@ #include "wtp.h" #include "obj.h" #include "batch.h" +#include "action.h" +#define ACT_STATE_DIED 0 /* action permanently failed and now disabled - MUST BE ZERO! */ +#define ACT_STATE_RDY 1 /* action ready, waiting for new transaction */ +#define ACT_STATE_ITX 2 /* transaction active, waiting for new data or commit */ +#define ACT_STATE_COMM 3 /* transaction finished (a transient state) */ +#define ACT_STATE_RTRY 4 /* failure occured, trying to restablish ready state */ +#define ACT_STATE_SUSP 5 /* suspended due to failure (return fail until timeout expired) */ + typedef struct actWrkrInfo { action_t *pAction; void *actWrkrData; + struct { + unsigned actState : 3; + } flags; } actWrkrInfo_t; /* the worker thread instance class */ @@ -62,4 +73,16 @@ PROTOTYPEObjClassInit(wti); PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*); PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*); +static inline uint8_t +getActionState(wti_t *pWti, action_t *pAction) +{ + return((uint8_t) pWti->actWrkrInfo[pAction->iActionNbr].flags.actState); +} + +static inline void +setActionState(wti_t *pWti, action_t *pAction, uint8_t newState) +{ + pWti->actWrkrInfo[pAction->iActionNbr].flags.actState = newState; +} + #endif /* #ifndef WTI_H_INCLUDED */ |