diff options
-rw-r--r-- | action.c | 69 | ||||
-rw-r--r-- | action.h | 2 | ||||
-rw-r--r-- | runtime/ruleset.c | 13 | ||||
-rw-r--r-- | runtime/wti.c | 1 | ||||
-rw-r--r-- | runtime/wti.h | 52 |
5 files changed, 103 insertions, 34 deletions
@@ -778,6 +778,7 @@ finalize_it: } +#if 0 // TODO: remove? /* debug-print the contents of an action object * rgerhards, 2007-08-02 */ @@ -812,6 +813,7 @@ static rsRetVal actionDbgPrint(action_t *pThis) RETiRet; } +#endif /* prepare the calling parameters for doAction() @@ -822,6 +824,7 @@ prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslog { int i; struct json_object *json; + actWrkrIParams_t *iparams; actWrkrInfo_t *pWrkrInfo; DEFiRet; @@ -830,9 +833,11 @@ prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslog for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pWrkrInfo->staticActStrings[i]), - &pWrkrInfo->staticLenStrings[i], ttNow)); - pWrkrInfo->staticActParams[i] = pWrkrInfo->staticActStrings[i]; + CHKiRet(wtiNewIParam(pWti, pAction, &iparams)); + iparams->msgFlags = pMsg->msgFlags; + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(iparams->staticActStrings[i]), + &iparams->staticLenStrings[i], ttNow)); + iparams->staticActParams[i] = iparams->staticActStrings[i]; break; case ACT_ARRAY_PASSING: CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pWrkrInfo->staticActParams[i]), ttNow)); @@ -905,12 +910,11 @@ done: return; * rgerhards, 2008-01-28 */ static rsRetVal -actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) +actionCallDoAction(action_t *pThis, int msgFlags, void *actParams, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); - ISOBJ_TYPE_assert(pMsg, msg); DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", getActStateName(pThis, pWti), pThis->iActionNbr); @@ -918,7 +922,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); pThis->bHadAutoCommit = 0; - iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, + iRet = pThis->pMod->mod.om.doAction(actParams, msgFlags, pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: @@ -956,19 +960,16 @@ finalize_it: * this readies the action and then calls doAction() * rgerhards, 2008-01-28 */ -static rsRetVal -actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate, wti_t *pWti) +rsRetVal +actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; - ASSERT(pThis != NULL); - ISOBJ_TYPE_assert(pMsg, msg); - CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); if(getActionState(pWti, pThis) == ACT_STATE_ITX) - CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti)); + CHKiRet(actionCallDoAction(pThis, msgFlags, actParams, pWti)); iRet = getReturnCode(pThis, pWti); finalize_it: @@ -981,10 +982,24 @@ static rsRetVal actionCommit(action_t *pThis, wti_t *pWti) { int pbShutdownImmediate = 1; + actWrkrInfo_t *wrkrInfo; + actWrkrIParams_t *iparamCurr, *iparamDel; DEFiRet; - if(getActionState(pWti, pThis) == ACT_STATE_RDY) { - FINALIZE; /* nothing to do */ + wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); + dbgprintf("DDDD: actionCommit: action %d, root %p\n", pThis->iActionNbr, wrkrInfo->iparamRoot); + if(wrkrInfo->iparamRoot != NULL) { + iparamCurr = wrkrInfo->iparamRoot; + while(iparamCurr != NULL) { + iRet = actionProcessMessage(pThis, iparamCurr->msgFlags, + iparamCurr->staticActParams, + &pbShutdownImmediate, pWti); + releaseDoActionParams(pThis, pWti); + iparamDel = iparamCurr; + iparamCurr = iparamCurr->next; + free(iparamDel); // TODO: memleak strings! + } + wrkrInfo->iparamLast = NULL; } CHKiRet(actionPrepare(pThis, &pbShutdownImmediate, pWti)); @@ -1036,24 +1051,33 @@ processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *t dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRawMsg); // TODO: check error return states! iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow); - iRet = actionProcessMessage(pAction, pMsg, + if(pAction->eParamPassing == ACT_STRING_PASSING) { + pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction; + dbgprintf("DDDD: action %d is string passing - executing in commit phase\n", pAction->iActionNbr); + FINALIZE; + } + + iRet = actionProcessMessage(pAction, pMsg->msgFlags, pWti->actWrkrInfo[pAction->iActionNbr].staticActParams, pbShutdownImmediate, pWti); releaseDoActionParams(pAction, pWti); +finalize_it: RETiRet; } -/* Commit all active transactions */ -rsRetVal +/* Commit all active transactions in direct mode */ +void actionCommitAll(wti_t *pWti) { int i; + action_t *pAction; + for(i = 0 ; i < iActionNbr ; ++i) { - DBGPRINTF("DDDD: actionCommitall action %d state %u\n", - i, getActionStateByNbr(pWti, i)); - if(getActionStateByNbr(pWti, i) != ACT_STATE_RDY) { + dbgprintf("DDDD: actionCommitAll: action %d, state %u, root %p\n", + i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo[i].iparamRoot); + pAction = pWti->actWrkrInfo[i].pAction; + if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT) actionCommit(pWti->actWrkrInfo[i].pAction, pWti); - } } } @@ -1355,6 +1379,7 @@ static inline rsRetVal doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { int pbShutdownImmediate = 0; // TODO: implement +dbgprintf("DDDD: doQueueEnqObjDirectBatch: %s\n", pMsg->pszRawMsg); struct syslogTime ttNow; DEFiRet; if(GatherStats) @@ -1547,7 +1572,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->pMod = pMod; pAction->pModData = pModData; -#warning we need to look at the following +// TODO #warning we need to look at the following // Probably the core init needs to be done during createWrkrInstance() //if(bSuspended) // actionSuspend(pAction, pWti); @@ -91,7 +91,7 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr rsRetVal activateActions(void); rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction); rsRetVal actionProcessCnf(struct cnfobj *o); -rsRetVal actionCommitAll(wti_t *pWti); +void actionCommitAll(wti_t *pWti); /* external data */ extern int iActionNbr; diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 2ad21170..654c6e55 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -588,8 +588,14 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti) } +static void +commitBatch(wti_t *pWti) +{ + actionCommitAll(pWti); +} + /* Process (consume) a batch of messages. Calls the actions configured. - * If the whole batch uses a singel ruleset, we can process the batch as + * If the whole batch uses a single ruleset, we can process the batch as * a whole. Otherwise, we need to process it slower, on a message-by-message * basis (what can be optimized to a per-ruleset basis) * rgerhards, 2005-10-13 @@ -602,6 +608,8 @@ processBatch(batch_t *pBatch, wti_t *pWti) assert(pBatch != NULL); DBGPRINTF("processBatch: batch of %d elements must be processed\n", pBatch->nElem); + + /* execution phase */ if(pBatch->bSingleRuleset) { pThis = batchGetRuleset(pBatch); if(pThis == NULL) @@ -612,7 +620,8 @@ processBatch(batch_t *pBatch, wti_t *pWti) CHKiRet(processBatchMultiRuleset(pBatch, pWti)); } - actionCommitAll(pWti); + /* commit phase */ + commitBatch(pWti); finalize_it: DBGPRINTF("ruleset.ProcessMsg() returns %d\n", iRet); RETiRet; diff --git a/runtime/wti.c b/runtime/wti.c index df77bc19..ddffd81a 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -391,7 +391,6 @@ finalize_it: RETiRet; } - /* dummy */ rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; } diff --git a/runtime/wti.h b/runtime/wti.h index 79b62102..cd2fefab 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -1,6 +1,6 @@ /* Definition of the worker thread instance (wti) class. * - * Copyright 2008-2012 Adiscon GmbH. + * Copyright 2008-2013 Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -37,6 +37,24 @@ #define ACT_STATE_DIED 7 /* action permanently failed and now disabled */ /* note: 3 bit bit field --> highest value is 7! */ +/* The following structure defines immutable parameters which need to + * be passed as action parameters. Note that the current implementation + * does NOT focus on performance, but on a simple PoC in order to get + * things going. TODO: Once it works, revisit this code and think about + * an array implementation. We also need to support other passing modes + * as well. -- gerhards, 2013-11-04 + */ +typedef struct actWrkrIParams { + struct actWrkrIParams *next; + int msgFlags; + /* following are caches to save allocs if not absolutely necessary */ + uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */ + /* a cache to save malloc(), if not absolutely necessary */ + unsigned staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; + /* and the same for the message length (if used) */ + void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; +} actWrkrIParams_t; + typedef struct actWrkrInfo { action_t *pAction; void *actWrkrData; @@ -45,13 +63,9 @@ typedef struct actWrkrInfo { struct { unsigned actState : 3; } flags; - /* following are caches to save allocs if not absolutely necessary */ - uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */ - /* a cache to save malloc(), if not absolutely necessary */ - void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */ - size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; - /* and the same for the message length (if used) */ - /* end action work variables */ + actWrkrIParams_t *iparamRoot; + actWrkrIParams_t *iparamLast; + void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /* for non-strings */ } actWrkrInfo_t; /* the worker thread instance class */ @@ -136,4 +150,26 @@ incActionNbrResRtry(wti_t *pWti, action_t *pAction) { pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry++; } + +static inline rsRetVal +wtiNewIParam(wti_t *pWti, action_t *pAction, actWrkrIParams_t **piparams) +{ + actWrkrInfo_t *wrkrInfo; + actWrkrIParams_t *iparams; + DEFiRet; + + wrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); +dbgprintf("DDDD: adding param for action %d\n", pAction->iActionNbr); + CHKmalloc(iparams = calloc(1, sizeof(actWrkrIParams_t))); + if(wrkrInfo->iparamLast == NULL) { + wrkrInfo->iparamLast = wrkrInfo->iparamRoot = iparams; + } else { + wrkrInfo->iparamLast->next = iparams; + wrkrInfo->iparamLast = iparams; + } + *piparams = iparams; + +finalize_it: + RETiRet; +} #endif /* #ifndef WTI_H_INCLUDED */ |