summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2013-10-28 16:03:32 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2013-10-28 16:03:32 +0100
commit286bf42f61f5b67daf0d6a3d2234f0b50a2a9c97 (patch)
tree0ac1ec681f91669355c2700e535bdc48109585b4
parent4cf5b6cb49385a3f5cdcbe34c534bbd4b89e4d8c (diff)
downloadrsyslog-286bf42f61f5b67daf0d6a3d2234f0b50a2a9c97.tar.gz
rsyslog-286bf42f61f5b67daf0d6a3d2234f0b50a2a9c97.tar.bz2
rsyslog-286bf42f61f5b67daf0d6a3d2234f0b50a2a9c97.zip
milestone: action engine changed to partially support wrkr instance action interface
-rw-r--r--action.c21
-rw-r--r--grammar/rainerscript.c2
-rw-r--r--runtime/modules.c6
-rw-r--r--runtime/modules.h1
-rw-r--r--tools/omfile.c6
5 files changed, 26 insertions, 10 deletions
diff --git a/action.c b/action.c
index 7cc45c37..f9297a55 100644
--- a/action.c
+++ b/action.c
@@ -915,10 +915,15 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n",
getActStateName(pThis), pThis->iActionNbr);
+ if(pWti->actWrkrData[pThis->iActionNbr] == NULL) {
+ DBGPRINTF("we need to create a new action worker instance for "
+ "action %d\n", pThis->iActionNbr);
+ CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrData[pThis->iActionNbr]), pThis->pModData));
+ }
+
pThis->bHadAutoCommit = 0;
-dbgprintf("DDDD: calling action id %d for wti %p\n", pThis->iActionNbr, pWti);
-#warning TODO: add action instance check here
- iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData);
+ iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags,
+ pWti->actWrkrData[pThis->iActionNbr]);
switch(iRet) {
case RS_RET_OK:
actionCommitted(pThis);
@@ -955,7 +960,7 @@ finalize_it:
* this readies the action and then calls doAction()
* rgerhards, 2008-01-28
*/
-static inline rsRetVal
+static rsRetVal
actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate, wti_t *pWti)
{
DEFiRet;
@@ -980,7 +985,7 @@ finalize_it:
* rgerhards, 2008-01-28
*/
static rsRetVal
-finishBatch(action_t *pThis, batch_t *pBatch)
+finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti)
{
int i;
DEFiRet;
@@ -994,7 +999,7 @@ finishBatch(action_t *pThis, batch_t *pBatch)
CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate));
if(pThis->eState == ACT_STATE_ITX) {
- iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData);
+ iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrData[pThis->iActionNbr]);
switch(iRet) {
case RS_RET_OK:
actionCommitted(pThis);
@@ -1138,7 +1143,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem, wti_t *pWti)
/* try commit transaction, once done, we can simply do so as if
* that return state was returned from tryDoAction().
*/
- localRet = finishBatch(pAction, pBatch);
+ localRet = finishBatch(pAction, pBatch, pWti);
}
if( localRet == RS_RET_OK
@@ -1254,7 +1259,7 @@ processAction(action_t *pAction, batch_t *pBatch, wti_t* pWti)
assert(pBatch != NULL);
CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pWti));
- iRet = finishBatch(pAction, pBatch);
+ iRet = finishBatch(pAction, pBatch, pWti);
finalize_it:
RETiRet;
diff --git a/grammar/rainerscript.c b/grammar/rainerscript.c
index abf9dd34..30c36201 100644
--- a/grammar/rainerscript.c
+++ b/grammar/rainerscript.c
@@ -2427,7 +2427,7 @@ cnfstmtPrintOnly(struct cnfstmt *stmt, int indent, sbool subtree)
free(cstr);
break;
case S_ACT:
- doIndent(indent); dbgprintf("ACTION %p [%s:%s]\n", stmt->d.act,
+ doIndent(indent); dbgprintf("ACTION %d [%s:%s]\n", stmt->d.act->iActionNbr,
modGetName(stmt->d.act->pMod), stmt->printable);
break;
case S_IF:
diff --git a/runtime/modules.c b/runtime/modules.c
index 56606306..eca7e466 100644
--- a/runtime/modules.c
+++ b/runtime/modules.c
@@ -652,6 +652,12 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"doAction", &pNew->mod.om.doAction));
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"parseSelectorAct", &pNew->mod.om.parseSelectorAct));
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"tryResume", &pNew->tryResume));
+
+ /* TODO: 2013-10-28: these will become mandatory but are not to get me started */
+ localRet = (*pNew->modQueryEtryPt)((uchar*)"createWrkrInstance", &pNew->mod.om.createWrkrInstance);
+ if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND)
+ ABORT_FINALIZE(localRet);
+
/* try load optional interfaces */
localRet = (*pNew->modQueryEtryPt)((uchar*)"doHUP", &pNew->doHUP);
if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND)
diff --git a/runtime/modules.h b/runtime/modules.h
index 23df22d6..31e87995 100644
--- a/runtime/modules.h
+++ b/runtime/modules.h
@@ -139,6 +139,7 @@ struct modInfo_s {
rsRetVal (*parseSelectorAct)(uchar**, void**,omodStringRequest_t**);
rsRetVal (*newActInst)(uchar *modName, struct nvlst *lst, void **, omodStringRequest_t **);
rsRetVal (*SetShutdownImmdtPtr)(void *pData, void *pPtr);
+ rsRetVal (*createWrkrInstance)(void*pWrkrData, void*pData);
} om;
struct { /* data for library modules */
char dummy;
diff --git a/tools/omfile.c b/tools/omfile.c
index 424647e0..77f10cb8 100644
--- a/tools/omfile.c
+++ b/tools/omfile.c
@@ -958,7 +958,7 @@ bufferLine(wrkrInstanceData_t *pWrkrData, uchar *filename, uchar *line)
linebuf_t *lb;
DEFiRet;
- dbgprintf("DDDD: buffering line %s\n", line);
+ dbgprintf("DDDD: buffering root %p, line %s\n", pWrkrData->pRoot, line);
CHKmalloc(lb = (linebuf_t*) malloc(sizeof(linebuf_t)));
CHKmalloc(lb->filename = ustrdup(filename));
CHKmalloc(lb->ln = ustrdup(line));
@@ -996,12 +996,16 @@ submitCachedLines(wrkrInstanceData_t *pWrkrData, instanceData *pData)
BEGINdoAction
CODESTARTdoAction
+ pData = pWrkrData->pData;
iRet = bufferLine(pWrkrData, (pData->bDynamicName) ? ppString[1] : pData->f_fname,
ppString[0]);
+ if(iRet == RS_RET_OK)
+ iRet = RS_RET_DEFER_COMMIT;
ENDdoAction
BEGINendTransaction
CODESTARTendTransaction
+ pData = pWrkrData->pData;
submitCachedLines(pWrkrData, pData);
/* Note: pStrm may be NULL if there was an error opening the stream */
if(pData->bFlushOnTXEnd && pData->pStrm != NULL) {