diff options
Diffstat (limited to 'runtime/wti.h')
-rw-r--r-- | runtime/wti.h | 113 |
1 files changed, 112 insertions, 1 deletions
diff --git a/runtime/wti.h b/runtime/wti.h index bb4f56bc..813237fc 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -1,6 +1,6 @@ /* Definition of the worker thread instance (wti) class. * - * Copyright 2008-2012 Adiscon GmbH. + * Copyright 2008-2013 Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -26,11 +26,46 @@ #include "wtp.h" #include "obj.h" #include "batch.h" +#include "action.h" +#define ACT_STATE_RDY 0 /* action ready, waiting for new transaction */ +#define ACT_STATE_ITX 1 /* transaction active, waiting for new data or commit */ +#define ACT_STATE_COMM 2 /* transaction finished (a transient state) */ +#define ACT_STATE_RTRY 3 /* failure occured, trying to restablish ready state */ +#define ACT_STATE_SUSP 4 /* suspended due to failure (return fail until timeout expired) */ +#define ACT_STATE_DIED 7 /* action permanently failed and now disabled */ +/* note: 3 bit bit field --> highest value is 7! */ + +/* The following structure defines immutable parameters which need to + * be passed as action parameters. Note that the current implementation + * does NOT focus on performance, but on a simple PoC in order to get + * things going. TODO: Once it works, revisit this code and think about + * an array implementation. We also need to support other passing modes + * as well. -- gerhards, 2013-11-04 + */ +typedef struct actWrkrIParams { + struct actWrkrIParams *next; + int msgFlags; + /* following are caches to save allocs if not absolutely necessary */ + uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */ + /* a cache to save malloc(), if not absolutely necessary */ + unsigned staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; + /* and the same for the message length (if used) */ + void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; +} actWrkrIParams_t; + typedef struct actWrkrInfo { action_t *pAction; void *actWrkrData; + uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an immediate failure following */ + int iNbrResRtry; /* number of retries since last suspend */ + struct { + unsigned actState : 3; + } flags; + actWrkrIParams_t *iparamRoot; + actWrkrIParams_t *iparamLast; + void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /* for non-strings */ } actWrkrInfo_t; /* the worker thread instance class */ @@ -39,6 +74,7 @@ struct wti_s { pthread_t thrdID; /* thread ID */ int bIsRunning; /* is this thread currently running? (must be int for atomic op!) */ sbool bAlwaysRunning; /* should this thread always run? */ + int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ @@ -62,4 +98,79 @@ PROTOTYPEObjClassInit(wti); PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*); PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*); +static inline uint8_t +getActionStateByNbr(wti_t *pWti, int iActNbr) +{ + return((uint8_t) pWti->actWrkrInfo[iActNbr].flags.actState); +} + +static inline uint8_t +getActionState(wti_t *pWti, action_t *pAction) +{ + return((uint8_t) pWti->actWrkrInfo[pAction->iActionNbr].flags.actState); +} + +static inline void +setActionState(wti_t *pWti, action_t *pAction, uint8_t newState) +{ + pWti->actWrkrInfo[pAction->iActionNbr].flags.actState = newState; +} + +static inline uint16_t +getActionResumeInRow(wti_t *pWti, action_t *pAction) +{ + return(pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow); +} + +static inline void +setActionResumeInRow(wti_t *pWti, action_t *pAction, uint16_t val) +{ + pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow = val; +} + +static inline void +incActionResumeInRow(wti_t *pWti, action_t *pAction) +{ + pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow++; +} + +static inline int +getActionNbrResRtry(wti_t *pWti, action_t *pAction) +{ + return(pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry); +} + +static inline void +setActionNbrResRtry(wti_t *pWti, action_t *pAction, uint16_t val) +{ + pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry = val; +} + +static inline void +incActionNbrResRtry(wti_t *pWti, action_t *pAction) +{ + pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry++; +} + +static inline rsRetVal +wtiNewIParam(wti_t *pWti, action_t *pAction, actWrkrIParams_t **piparams) +{ + actWrkrInfo_t *wrkrInfo; + actWrkrIParams_t *iparams; + DEFiRet; + + wrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); +dbgprintf("DDDD: adding param for action %d\n", pAction->iActionNbr); + CHKmalloc(iparams = calloc(1, sizeof(actWrkrIParams_t))); + if(wrkrInfo->iparamLast == NULL) { + wrkrInfo->iparamLast = wrkrInfo->iparamRoot = iparams; + } else { + wrkrInfo->iparamLast->next = iparams; + wrkrInfo->iparamLast = iparams; + } + *piparams = iparams; + +finalize_it: + RETiRet; +} #endif /* #ifndef WTI_H_INCLUDED */ |