summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c69
1 files changed, 47 insertions, 22 deletions
diff --git a/action.c b/action.c
index 71ffd788..f57a7399 100644
--- a/action.c
+++ b/action.c
@@ -778,6 +778,7 @@ finalize_it:
}
+#if 0 // TODO: remove?
/* debug-print the contents of an action object
* rgerhards, 2007-08-02
*/
@@ -812,6 +813,7 @@ static rsRetVal actionDbgPrint(action_t *pThis)
RETiRet;
}
+#endif
/* prepare the calling parameters for doAction()
@@ -822,6 +824,7 @@ prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslog
{
int i;
struct json_object *json;
+ actWrkrIParams_t *iparams;
actWrkrInfo_t *pWrkrInfo;
DEFiRet;
@@ -830,9 +833,11 @@ prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslog
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pWrkrInfo->staticActStrings[i]),
- &pWrkrInfo->staticLenStrings[i], ttNow));
- pWrkrInfo->staticActParams[i] = pWrkrInfo->staticActStrings[i];
+ CHKiRet(wtiNewIParam(pWti, pAction, &iparams));
+ iparams->msgFlags = pMsg->msgFlags;
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(iparams->staticActStrings[i]),
+ &iparams->staticLenStrings[i], ttNow));
+ iparams->staticActParams[i] = iparams->staticActStrings[i];
break;
case ACT_ARRAY_PASSING:
CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pWrkrInfo->staticActParams[i]), ttNow));
@@ -905,12 +910,11 @@ done: return;
* rgerhards, 2008-01-28
*/
static rsRetVal
-actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
+actionCallDoAction(action_t *pThis, int msgFlags, void *actParams, wti_t *pWti)
{
DEFiRet;
ASSERT(pThis != NULL);
- ISOBJ_TYPE_assert(pMsg, msg);
DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n",
getActStateName(pThis, pWti), pThis->iActionNbr);
@@ -918,7 +922,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
pThis->bHadAutoCommit = 0;
- iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags,
+ iRet = pThis->pMod->mod.om.doAction(actParams, msgFlags,
pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
@@ -956,19 +960,16 @@ finalize_it:
* this readies the action and then calls doAction()
* rgerhards, 2008-01-28
*/
-static rsRetVal
-actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate, wti_t *pWti)
+rsRetVal
+actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, int *pbShutdownImmediate, wti_t *pWti)
{
DEFiRet;
- ASSERT(pThis != NULL);
- ISOBJ_TYPE_assert(pMsg, msg);
-
CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti));
if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL)
pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate);
if(getActionState(pWti, pThis) == ACT_STATE_ITX)
- CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti));
+ CHKiRet(actionCallDoAction(pThis, msgFlags, actParams, pWti));
iRet = getReturnCode(pThis, pWti);
finalize_it:
@@ -981,10 +982,24 @@ static rsRetVal
actionCommit(action_t *pThis, wti_t *pWti)
{
int pbShutdownImmediate = 1;
+ actWrkrInfo_t *wrkrInfo;
+ actWrkrIParams_t *iparamCurr, *iparamDel;
DEFiRet;
- if(getActionState(pWti, pThis) == ACT_STATE_RDY) {
- FINALIZE; /* nothing to do */
+ wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]);
+ dbgprintf("DDDD: actionCommit: action %d, root %p\n", pThis->iActionNbr, wrkrInfo->iparamRoot);
+ if(wrkrInfo->iparamRoot != NULL) {
+ iparamCurr = wrkrInfo->iparamRoot;
+ while(iparamCurr != NULL) {
+ iRet = actionProcessMessage(pThis, iparamCurr->msgFlags,
+ iparamCurr->staticActParams,
+ &pbShutdownImmediate, pWti);
+ releaseDoActionParams(pThis, pWti);
+ iparamDel = iparamCurr;
+ iparamCurr = iparamCurr->next;
+ free(iparamDel); // TODO: memleak strings!
+ }
+ wrkrInfo->iparamLast = NULL;
}
CHKiRet(actionPrepare(pThis, &pbShutdownImmediate, pWti));
@@ -1036,24 +1051,33 @@ processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *t
dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRawMsg);
// TODO: check error return states!
iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow);
- iRet = actionProcessMessage(pAction, pMsg,
+ if(pAction->eParamPassing == ACT_STRING_PASSING) {
+ pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction;
+ dbgprintf("DDDD: action %d is string passing - executing in commit phase\n", pAction->iActionNbr);
+ FINALIZE;
+ }
+
+ iRet = actionProcessMessage(pAction, pMsg->msgFlags,
pWti->actWrkrInfo[pAction->iActionNbr].staticActParams,
pbShutdownImmediate, pWti);
releaseDoActionParams(pAction, pWti);
+finalize_it:
RETiRet;
}
-/* Commit all active transactions */
-rsRetVal
+/* Commit all active transactions in direct mode */
+void
actionCommitAll(wti_t *pWti)
{
int i;
+ action_t *pAction;
+
for(i = 0 ; i < iActionNbr ; ++i) {
- DBGPRINTF("DDDD: actionCommitall action %d state %u\n",
- i, getActionStateByNbr(pWti, i));
- if(getActionStateByNbr(pWti, i) != ACT_STATE_RDY) {
+ dbgprintf("DDDD: actionCommitAll: action %d, state %u, root %p\n",
+ i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo[i].iparamRoot);
+ pAction = pWti->actWrkrInfo[i].pAction;
+ if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT)
actionCommit(pWti->actWrkrInfo[i].pAction, pWti);
- }
}
}
@@ -1355,6 +1379,7 @@ static inline rsRetVal
doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
int pbShutdownImmediate = 0; // TODO: implement
+dbgprintf("DDDD: doQueueEnqObjDirectBatch: %s\n", pMsg->pszRawMsg);
struct syslogTime ttNow;
DEFiRet;
if(GatherStats)
@@ -1547,7 +1572,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
pAction->pMod = pMod;
pAction->pModData = pModData;
-#warning we need to look at the following
+// TODO #warning we need to look at the following
// Probably the core init needs to be done during createWrkrInstance()
//if(bSuspended)
// actionSuspend(pAction, pWti);