summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/ruleset.c13
-rw-r--r--runtime/wti.c1
-rw-r--r--runtime/wti.h52
3 files changed, 55 insertions, 11 deletions
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 2ad21170..654c6e55 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -588,8 +588,14 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti)
}
+static void
+commitBatch(wti_t *pWti)
+{
+ actionCommitAll(pWti);
+}
+
/* Process (consume) a batch of messages. Calls the actions configured.
- * If the whole batch uses a singel ruleset, we can process the batch as
+ * If the whole batch uses a single ruleset, we can process the batch as
* a whole. Otherwise, we need to process it slower, on a message-by-message
* basis (what can be optimized to a per-ruleset basis)
* rgerhards, 2005-10-13
@@ -602,6 +608,8 @@ processBatch(batch_t *pBatch, wti_t *pWti)
assert(pBatch != NULL);
DBGPRINTF("processBatch: batch of %d elements must be processed\n", pBatch->nElem);
+
+ /* execution phase */
if(pBatch->bSingleRuleset) {
pThis = batchGetRuleset(pBatch);
if(pThis == NULL)
@@ -612,7 +620,8 @@ processBatch(batch_t *pBatch, wti_t *pWti)
CHKiRet(processBatchMultiRuleset(pBatch, pWti));
}
- actionCommitAll(pWti);
+ /* commit phase */
+ commitBatch(pWti);
finalize_it:
DBGPRINTF("ruleset.ProcessMsg() returns %d\n", iRet);
RETiRet;
diff --git a/runtime/wti.c b/runtime/wti.c
index df77bc19..ddffd81a 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -391,7 +391,6 @@ finalize_it:
RETiRet;
}
-
/* dummy */
rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
diff --git a/runtime/wti.h b/runtime/wti.h
index 79b62102..cd2fefab 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -1,6 +1,6 @@
/* Definition of the worker thread instance (wti) class.
*
- * Copyright 2008-2012 Adiscon GmbH.
+ * Copyright 2008-2013 Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -37,6 +37,24 @@
#define ACT_STATE_DIED 7 /* action permanently failed and now disabled */
/* note: 3 bit bit field --> highest value is 7! */
+/* The following structure defines immutable parameters which need to
+ * be passed as action parameters. Note that the current implementation
+ * does NOT focus on performance, but on a simple PoC in order to get
+ * things going. TODO: Once it works, revisit this code and think about
+ * an array implementation. We also need to support other passing modes
+ * as well. -- gerhards, 2013-11-04
+ */
+typedef struct actWrkrIParams {
+ struct actWrkrIParams *next;
+ int msgFlags;
+ /* following are caches to save allocs if not absolutely necessary */
+ uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */
+ /* a cache to save malloc(), if not absolutely necessary */
+ unsigned staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE];
+ /* and the same for the message length (if used) */
+ void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE];
+} actWrkrIParams_t;
+
typedef struct actWrkrInfo {
action_t *pAction;
void *actWrkrData;
@@ -45,13 +63,9 @@ typedef struct actWrkrInfo {
struct {
unsigned actState : 3;
} flags;
- /* following are caches to save allocs if not absolutely necessary */
- uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */
- /* a cache to save malloc(), if not absolutely necessary */
- void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */
- size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE];
- /* and the same for the message length (if used) */
- /* end action work variables */
+ actWrkrIParams_t *iparamRoot;
+ actWrkrIParams_t *iparamLast;
+ void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /* for non-strings */
} actWrkrInfo_t;
/* the worker thread instance class */
@@ -136,4 +150,26 @@ incActionNbrResRtry(wti_t *pWti, action_t *pAction)
{
pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry++;
}
+
+static inline rsRetVal
+wtiNewIParam(wti_t *pWti, action_t *pAction, actWrkrIParams_t **piparams)
+{
+ actWrkrInfo_t *wrkrInfo;
+ actWrkrIParams_t *iparams;
+ DEFiRet;
+
+ wrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
+dbgprintf("DDDD: adding param for action %d\n", pAction->iActionNbr);
+ CHKmalloc(iparams = calloc(1, sizeof(actWrkrIParams_t)));
+ if(wrkrInfo->iparamLast == NULL) {
+ wrkrInfo->iparamLast = wrkrInfo->iparamRoot = iparams;
+ } else {
+ wrkrInfo->iparamLast->next = iparams;
+ wrkrInfo->iparamLast = iparams;
+ }
+ *piparams = iparams;
+
+finalize_it:
+ RETiRet;
+}
#endif /* #ifndef WTI_H_INCLUDED */