summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c69
-rw-r--r--action.h2
-rw-r--r--runtime/ruleset.c13
-rw-r--r--runtime/wti.c1
-rw-r--r--runtime/wti.h52
5 files changed, 103 insertions, 34 deletions
diff --git a/action.c b/action.c
index 71ffd788..f57a7399 100644
--- a/action.c
+++ b/action.c
@@ -778,6 +778,7 @@ finalize_it:
}
+#if 0 // TODO: remove?
/* debug-print the contents of an action object
* rgerhards, 2007-08-02
*/
@@ -812,6 +813,7 @@ static rsRetVal actionDbgPrint(action_t *pThis)
RETiRet;
}
+#endif
/* prepare the calling parameters for doAction()
@@ -822,6 +824,7 @@ prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslog
{
int i;
struct json_object *json;
+ actWrkrIParams_t *iparams;
actWrkrInfo_t *pWrkrInfo;
DEFiRet;
@@ -830,9 +833,11 @@ prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslog
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pWrkrInfo->staticActStrings[i]),
- &pWrkrInfo->staticLenStrings[i], ttNow));
- pWrkrInfo->staticActParams[i] = pWrkrInfo->staticActStrings[i];
+ CHKiRet(wtiNewIParam(pWti, pAction, &iparams));
+ iparams->msgFlags = pMsg->msgFlags;
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(iparams->staticActStrings[i]),
+ &iparams->staticLenStrings[i], ttNow));
+ iparams->staticActParams[i] = iparams->staticActStrings[i];
break;
case ACT_ARRAY_PASSING:
CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pWrkrInfo->staticActParams[i]), ttNow));
@@ -905,12 +910,11 @@ done: return;
* rgerhards, 2008-01-28
*/
static rsRetVal
-actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
+actionCallDoAction(action_t *pThis, int msgFlags, void *actParams, wti_t *pWti)
{
DEFiRet;
ASSERT(pThis != NULL);
- ISOBJ_TYPE_assert(pMsg, msg);
DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n",
getActStateName(pThis, pWti), pThis->iActionNbr);
@@ -918,7 +922,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
pThis->bHadAutoCommit = 0;
- iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags,
+ iRet = pThis->pMod->mod.om.doAction(actParams, msgFlags,
pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
@@ -956,19 +960,16 @@ finalize_it:
* this readies the action and then calls doAction()
* rgerhards, 2008-01-28
*/
-static rsRetVal
-actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate, wti_t *pWti)
+rsRetVal
+actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, int *pbShutdownImmediate, wti_t *pWti)
{
DEFiRet;
- ASSERT(pThis != NULL);
- ISOBJ_TYPE_assert(pMsg, msg);
-
CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti));
if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL)
pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate);
if(getActionState(pWti, pThis) == ACT_STATE_ITX)
- CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti));
+ CHKiRet(actionCallDoAction(pThis, msgFlags, actParams, pWti));
iRet = getReturnCode(pThis, pWti);
finalize_it:
@@ -981,10 +982,24 @@ static rsRetVal
actionCommit(action_t *pThis, wti_t *pWti)
{
int pbShutdownImmediate = 1;
+ actWrkrInfo_t *wrkrInfo;
+ actWrkrIParams_t *iparamCurr, *iparamDel;
DEFiRet;
- if(getActionState(pWti, pThis) == ACT_STATE_RDY) {
- FINALIZE; /* nothing to do */
+ wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]);
+ dbgprintf("DDDD: actionCommit: action %d, root %p\n", pThis->iActionNbr, wrkrInfo->iparamRoot);
+ if(wrkrInfo->iparamRoot != NULL) {
+ iparamCurr = wrkrInfo->iparamRoot;
+ while(iparamCurr != NULL) {
+ iRet = actionProcessMessage(pThis, iparamCurr->msgFlags,
+ iparamCurr->staticActParams,
+ &pbShutdownImmediate, pWti);
+ releaseDoActionParams(pThis, pWti);
+ iparamDel = iparamCurr;
+ iparamCurr = iparamCurr->next;
+ free(iparamDel); // TODO: memleak strings!
+ }
+ wrkrInfo->iparamLast = NULL;
}
CHKiRet(actionPrepare(pThis, &pbShutdownImmediate, pWti));
@@ -1036,24 +1051,33 @@ processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *t
dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRawMsg);
// TODO: check error return states!
iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow);
- iRet = actionProcessMessage(pAction, pMsg,
+ if(pAction->eParamPassing == ACT_STRING_PASSING) {
+ pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction;
+ dbgprintf("DDDD: action %d is string passing - executing in commit phase\n", pAction->iActionNbr);
+ FINALIZE;
+ }
+
+ iRet = actionProcessMessage(pAction, pMsg->msgFlags,
pWti->actWrkrInfo[pAction->iActionNbr].staticActParams,
pbShutdownImmediate, pWti);
releaseDoActionParams(pAction, pWti);
+finalize_it:
RETiRet;
}
-/* Commit all active transactions */
-rsRetVal
+/* Commit all active transactions in direct mode */
+void
actionCommitAll(wti_t *pWti)
{
int i;
+ action_t *pAction;
+
for(i = 0 ; i < iActionNbr ; ++i) {
- DBGPRINTF("DDDD: actionCommitall action %d state %u\n",
- i, getActionStateByNbr(pWti, i));
- if(getActionStateByNbr(pWti, i) != ACT_STATE_RDY) {
+ dbgprintf("DDDD: actionCommitAll: action %d, state %u, root %p\n",
+ i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo[i].iparamRoot);
+ pAction = pWti->actWrkrInfo[i].pAction;
+ if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT)
actionCommit(pWti->actWrkrInfo[i].pAction, pWti);
- }
}
}
@@ -1355,6 +1379,7 @@ static inline rsRetVal
doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
int pbShutdownImmediate = 0; // TODO: implement
+dbgprintf("DDDD: doQueueEnqObjDirectBatch: %s\n", pMsg->pszRawMsg);
struct syslogTime ttNow;
DEFiRet;
if(GatherStats)
@@ -1547,7 +1572,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
pAction->pMod = pMod;
pAction->pModData = pModData;
-#warning we need to look at the following
+// TODO #warning we need to look at the following
// Probably the core init needs to be done during createWrkrInstance()
//if(bSuspended)
// actionSuspend(pAction, pWti);
diff --git a/action.h b/action.h
index 27bb74a0..eb01f6e5 100644
--- a/action.h
+++ b/action.h
@@ -91,7 +91,7 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr
rsRetVal activateActions(void);
rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction);
rsRetVal actionProcessCnf(struct cnfobj *o);
-rsRetVal actionCommitAll(wti_t *pWti);
+void actionCommitAll(wti_t *pWti);
/* external data */
extern int iActionNbr;
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 2ad21170..654c6e55 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -588,8 +588,14 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti)
}
+static void
+commitBatch(wti_t *pWti)
+{
+ actionCommitAll(pWti);
+}
+
/* Process (consume) a batch of messages. Calls the actions configured.
- * If the whole batch uses a singel ruleset, we can process the batch as
+ * If the whole batch uses a single ruleset, we can process the batch as
* a whole. Otherwise, we need to process it slower, on a message-by-message
* basis (what can be optimized to a per-ruleset basis)
* rgerhards, 2005-10-13
@@ -602,6 +608,8 @@ processBatch(batch_t *pBatch, wti_t *pWti)
assert(pBatch != NULL);
DBGPRINTF("processBatch: batch of %d elements must be processed\n", pBatch->nElem);
+
+ /* execution phase */
if(pBatch->bSingleRuleset) {
pThis = batchGetRuleset(pBatch);
if(pThis == NULL)
@@ -612,7 +620,8 @@ processBatch(batch_t *pBatch, wti_t *pWti)
CHKiRet(processBatchMultiRuleset(pBatch, pWti));
}
- actionCommitAll(pWti);
+ /* commit phase */
+ commitBatch(pWti);
finalize_it:
DBGPRINTF("ruleset.ProcessMsg() returns %d\n", iRet);
RETiRet;
diff --git a/runtime/wti.c b/runtime/wti.c
index df77bc19..ddffd81a 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -391,7 +391,6 @@ finalize_it:
RETiRet;
}
-
/* dummy */
rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
diff --git a/runtime/wti.h b/runtime/wti.h
index 79b62102..cd2fefab 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -1,6 +1,6 @@
/* Definition of the worker thread instance (wti) class.
*
- * Copyright 2008-2012 Adiscon GmbH.
+ * Copyright 2008-2013 Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -37,6 +37,24 @@
#define ACT_STATE_DIED 7 /* action permanently failed and now disabled */
/* note: 3 bit bit field --> highest value is 7! */
+/* The following structure defines immutable parameters which need to
+ * be passed as action parameters. Note that the current implementation
+ * does NOT focus on performance, but on a simple PoC in order to get
+ * things going. TODO: Once it works, revisit this code and think about
+ * an array implementation. We also need to support other passing modes
+ * as well. -- gerhards, 2013-11-04
+ */
+typedef struct actWrkrIParams {
+ struct actWrkrIParams *next;
+ int msgFlags;
+ /* following are caches to save allocs if not absolutely necessary */
+ uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */
+ /* a cache to save malloc(), if not absolutely necessary */
+ unsigned staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE];
+ /* and the same for the message length (if used) */
+ void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE];
+} actWrkrIParams_t;
+
typedef struct actWrkrInfo {
action_t *pAction;
void *actWrkrData;
@@ -45,13 +63,9 @@ typedef struct actWrkrInfo {
struct {
unsigned actState : 3;
} flags;
- /* following are caches to save allocs if not absolutely necessary */
- uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */
- /* a cache to save malloc(), if not absolutely necessary */
- void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */
- size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE];
- /* and the same for the message length (if used) */
- /* end action work variables */
+ actWrkrIParams_t *iparamRoot;
+ actWrkrIParams_t *iparamLast;
+ void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /* for non-strings */
} actWrkrInfo_t;
/* the worker thread instance class */
@@ -136,4 +150,26 @@ incActionNbrResRtry(wti_t *pWti, action_t *pAction)
{
pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry++;
}
+
+static inline rsRetVal
+wtiNewIParam(wti_t *pWti, action_t *pAction, actWrkrIParams_t **piparams)
+{
+ actWrkrInfo_t *wrkrInfo;
+ actWrkrIParams_t *iparams;
+ DEFiRet;
+
+ wrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
+dbgprintf("DDDD: adding param for action %d\n", pAction->iActionNbr);
+ CHKmalloc(iparams = calloc(1, sizeof(actWrkrIParams_t)));
+ if(wrkrInfo->iparamLast == NULL) {
+ wrkrInfo->iparamLast = wrkrInfo->iparamRoot = iparams;
+ } else {
+ wrkrInfo->iparamLast->next = iparams;
+ wrkrInfo->iparamLast = iparams;
+ }
+ *piparams = iparams;
+
+finalize_it:
+ RETiRet;
+}
#endif /* #ifndef WTI_H_INCLUDED */