diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 69 |
1 files changed, 47 insertions, 22 deletions
@@ -778,6 +778,7 @@ finalize_it: } +#if 0 // TODO: remove? /* debug-print the contents of an action object * rgerhards, 2007-08-02 */ @@ -812,6 +813,7 @@ static rsRetVal actionDbgPrint(action_t *pThis) RETiRet; } +#endif /* prepare the calling parameters for doAction() @@ -822,6 +824,7 @@ prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslog { int i; struct json_object *json; + actWrkrIParams_t *iparams; actWrkrInfo_t *pWrkrInfo; DEFiRet; @@ -830,9 +833,11 @@ prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslog for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pWrkrInfo->staticActStrings[i]), - &pWrkrInfo->staticLenStrings[i], ttNow)); - pWrkrInfo->staticActParams[i] = pWrkrInfo->staticActStrings[i]; + CHKiRet(wtiNewIParam(pWti, pAction, &iparams)); + iparams->msgFlags = pMsg->msgFlags; + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(iparams->staticActStrings[i]), + &iparams->staticLenStrings[i], ttNow)); + iparams->staticActParams[i] = iparams->staticActStrings[i]; break; case ACT_ARRAY_PASSING: CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pWrkrInfo->staticActParams[i]), ttNow)); @@ -905,12 +910,11 @@ done: return; * rgerhards, 2008-01-28 */ static rsRetVal -actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) +actionCallDoAction(action_t *pThis, int msgFlags, void *actParams, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); - ISOBJ_TYPE_assert(pMsg, msg); DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", getActStateName(pThis, pWti), pThis->iActionNbr); @@ -918,7 +922,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); pThis->bHadAutoCommit = 0; - iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, + iRet = pThis->pMod->mod.om.doAction(actParams, msgFlags, pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: @@ -956,19 +960,16 @@ finalize_it: * this readies the action and then calls doAction() * rgerhards, 2008-01-28 */ -static rsRetVal -actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate, wti_t *pWti) +rsRetVal +actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; - ASSERT(pThis != NULL); - ISOBJ_TYPE_assert(pMsg, msg); - CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); if(getActionState(pWti, pThis) == ACT_STATE_ITX) - CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti)); + CHKiRet(actionCallDoAction(pThis, msgFlags, actParams, pWti)); iRet = getReturnCode(pThis, pWti); finalize_it: @@ -981,10 +982,24 @@ static rsRetVal actionCommit(action_t *pThis, wti_t *pWti) { int pbShutdownImmediate = 1; + actWrkrInfo_t *wrkrInfo; + actWrkrIParams_t *iparamCurr, *iparamDel; DEFiRet; - if(getActionState(pWti, pThis) == ACT_STATE_RDY) { - FINALIZE; /* nothing to do */ + wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); + dbgprintf("DDDD: actionCommit: action %d, root %p\n", pThis->iActionNbr, wrkrInfo->iparamRoot); + if(wrkrInfo->iparamRoot != NULL) { + iparamCurr = wrkrInfo->iparamRoot; + while(iparamCurr != NULL) { + iRet = actionProcessMessage(pThis, iparamCurr->msgFlags, + iparamCurr->staticActParams, + &pbShutdownImmediate, pWti); + releaseDoActionParams(pThis, pWti); + iparamDel = iparamCurr; + iparamCurr = iparamCurr->next; + free(iparamDel); // TODO: memleak strings! + } + wrkrInfo->iparamLast = NULL; } CHKiRet(actionPrepare(pThis, &pbShutdownImmediate, pWti)); @@ -1036,24 +1051,33 @@ processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *t dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRawMsg); // TODO: check error return states! iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow); - iRet = actionProcessMessage(pAction, pMsg, + if(pAction->eParamPassing == ACT_STRING_PASSING) { + pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction; + dbgprintf("DDDD: action %d is string passing - executing in commit phase\n", pAction->iActionNbr); + FINALIZE; + } + + iRet = actionProcessMessage(pAction, pMsg->msgFlags, pWti->actWrkrInfo[pAction->iActionNbr].staticActParams, pbShutdownImmediate, pWti); releaseDoActionParams(pAction, pWti); +finalize_it: RETiRet; } -/* Commit all active transactions */ -rsRetVal +/* Commit all active transactions in direct mode */ +void actionCommitAll(wti_t *pWti) { int i; + action_t *pAction; + for(i = 0 ; i < iActionNbr ; ++i) { - DBGPRINTF("DDDD: actionCommitall action %d state %u\n", - i, getActionStateByNbr(pWti, i)); - if(getActionStateByNbr(pWti, i) != ACT_STATE_RDY) { + dbgprintf("DDDD: actionCommitAll: action %d, state %u, root %p\n", + i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo[i].iparamRoot); + pAction = pWti->actWrkrInfo[i].pAction; + if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT) actionCommit(pWti->actWrkrInfo[i].pAction, pWti); - } } } @@ -1355,6 +1379,7 @@ static inline rsRetVal doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { int pbShutdownImmediate = 0; // TODO: implement +dbgprintf("DDDD: doQueueEnqObjDirectBatch: %s\n", pMsg->pszRawMsg); struct syslogTime ttNow; DEFiRet; if(GatherStats) @@ -1547,7 +1572,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->pMod = pMod; pAction->pModData = pModData; -#warning we need to look at the following +// TODO #warning we need to look at the following // Probably the core init needs to be done during createWrkrInstance() //if(bSuspended) // actionSuspend(pAction, pWti); |