summaryrefslogtreecommitdiffstats
path: root/action.h
diff options
context:
space:
mode:
Diffstat (limited to 'action.h')
-rw-r--r--action.h31
1 files changed, 12 insertions, 19 deletions
diff --git a/action.h b/action.h
index 20d7807b..7e921c63 100644
--- a/action.h
+++ b/action.h
@@ -33,15 +33,6 @@ extern int glbliActionResumeRetryCount;
extern int bActionReportSuspension;
-typedef enum {
- ACT_STATE_DIED = 0, /* action permanently failed and now disabled - MUST BE ZERO! */
- ACT_STATE_RDY = 1, /* action ready, waiting for new transaction */
- ACT_STATE_ITX = 2, /* transaction active, waiting for new data or commit */
- ACT_STATE_COMM = 3, /* transaction finished (a transient state) */
- ACT_STATE_RTRY = 4, /* failure occured, trying to restablish ready state */
- ACT_STATE_SUSP = 5 /* suspended due to failure (return fail until timeout expired) */
-} action_state_t;
-
/* the following struct defines the action object data structure
*/
struct action_s {
@@ -49,16 +40,15 @@ struct action_s {
time_t tActNow; /* the current time for an action execution. Initially set to -1 and
populated on an as-needed basis. This is a performance optimization. */
time_t tLastExec; /* time this action was last executed */
+ int iActionNbr; /* this action's number (ID) */
sbool bExecWhenPrevSusp;/* execute only when previous action is suspended? */
sbool bWriteAllMarkMsgs;/* should all mark msgs be written (not matter how recent the action was executed)? */
- int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */
- action_state_t eState; /* current state of action */
sbool bHadAutoCommit; /* did an auto-commit happen during doAction()? */
+ sbool bDisabled;
+ int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */
time_t ttResumeRtry; /* when is it time to retry the resume? */
- int iResumeOKinRow; /* number of times in a row that resume said OK with an immediate failure following */
int iResumeInterval;/* resume interval for this action */
int iResumeRetryCount;/* how often shall we retry a suspended action? (-1 --> eternal) */
- int iNbrResRtry; /* number of retries since last suspend */
int iNbrNoExec; /* number of matches that did not yet yield to an exec */
int iExecEveryNthOccur;/* execute this action only every n-th occurence (with n=0,1 -> always) */
int iExecEveryNthOccurTO;/* timeout for n-th occurence feature */
@@ -66,7 +56,7 @@ struct action_s {
struct modInfo_s *pMod;/* pointer to output module handling this selector */
void *pModData; /* pointer to module data - content is module-specific */
sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */
- rsRetVal (*submitToActQ)(action_t *, batch_t *);/* function submit message to action queue */
+ rsRetVal (*submitToActQ)(action_t *, wti_t*, msg_t*);/* function submit message to action queue */
rsRetVal (*qConstruct)(struct queue_s *pThis);
enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING = 2,
ACT_JSON_PASSING = 3}
@@ -76,8 +66,7 @@ struct action_s {
* in this order. */
qqueue_t *pQueue; /* action queue */
pthread_mutex_t mutAction; /* primary action mutex */
- pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */
- uchar *pszName; /* action name (for documentation) */
+ uchar *pszName; /* action name */
DEF_ATOMIC_HELPER_MUT(mutCAS);
/* for statistics subsystem */
statsobj_t *statsobj;
@@ -94,15 +83,19 @@ struct action_s {
rsRetVal actionConstruct(action_t **ppThis);
rsRetVal actionConstructFinalize(action_t *pThis, struct nvlst *lst);
rsRetVal actionDestruct(action_t *pThis);
-rsRetVal actionDbgPrint(action_t *pThis);
+//rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
rsRetVal actionDoAction(action_t *pAction);
-rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg);
+rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t*);
rsRetVal actionCallHUPHdlr(action_t *pAction);
rsRetVal actionClassInit(void);
-rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, struct nvlst *lst, int bSuspended);
+rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, struct nvlst *lst);
rsRetVal activateActions(void);
rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction);
rsRetVal actionProcessCnf(struct cnfobj *o);
+void actionCommitAllDirect(wti_t *pWti);
+
+/* external data */
+extern int iActionNbr;
#endif /* #ifndef ACTION_H_INCLUDED */