summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/module-template.h55
-rw-r--r--runtime/modules.c3
-rw-r--r--runtime/modules.h2
-rw-r--r--runtime/queue.c26
-rw-r--r--runtime/queue.h8
-rw-r--r--runtime/rsconf.c1
-rw-r--r--runtime/ruleset.c50
-rw-r--r--runtime/ruleset.h5
-rw-r--r--runtime/wti.c20
-rw-r--r--runtime/wti.h6
-rw-r--r--runtime/wtp.c6
11 files changed, 130 insertions, 52 deletions
diff --git a/runtime/module-template.h b/runtime/module-template.h
index 8a958f90..f65aaf0f 100644
--- a/runtime/module-template.h
+++ b/runtime/module-template.h
@@ -175,6 +175,44 @@ static rsRetVal freeInstance(void* pModData)\
RETiRet;\
}
+/* createWrkrInstance()
+ */
+#define BEGINcreateWrkrInstance \
+static rsRetVal createWrkrInstance(wrkrInstanceData_t **ppWrkrData, instanceData *pData)\
+ {\
+ DEFiRet; /* store error code here */\
+ wrkrInstanceData_t *pWrkrData; /* use this to point to data elements */
+
+#define CODESTARTcreateWrkrInstance \
+ if((pWrkrData = calloc(1, sizeof(wrkrInstanceData_t))) == NULL) {\
+ *ppWrkrData = NULL;\
+ ENDfunc \
+ return RS_RET_OUT_OF_MEMORY;\
+ } \
+ pWrkrData->pData = pData;
+
+#define ENDcreateWrkrInstance \
+ *ppWrkrData = pWrkrData;\
+ RETiRet;\
+}
+
+/* freeWrkrInstance */
+#define BEGINfreeWrkrInstance \
+static rsRetVal freeWrkrInstance(void* pd)\
+{\
+ DEFiRet;\
+ wrkrInstanceData_t *pWrkrData;
+
+#define CODESTARTfreeWrkrInstance \
+ pWrkrData = (wrkrInstanceData_t*) pd;
+
+#define ENDfreeWrkrInstance \
+ if(pWrkrData != NULL)\
+ free(pWrkrData); /* we need to free this in any case */\
+ RETiRet;\
+}
+
+
/* isCompatibleWithFeature()
*/
#define BEGINisCompatibleWithFeature \
@@ -194,7 +232,7 @@ static rsRetVal isCompatibleWithFeature(syslogFeature __attribute__((unused)) eF
* introduced in v4.3.3 -- rgerhards, 2009-04-27
*/
#define BEGINbeginTransaction \
-static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\
+static rsRetVal beginTransaction(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\
{\
DEFiRet;
@@ -209,7 +247,7 @@ static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\
* introduced in v4.3.3 -- rgerhards, 2009-04-27
*/
#define BEGINendTransaction \
-static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\
+static rsRetVal endTransaction(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\
{\
DEFiRet;
@@ -223,7 +261,7 @@ static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\
/* doAction()
*/
#define BEGINdoAction \
-static rsRetVal doAction(uchar __attribute__((unused)) **ppString, unsigned __attribute__((unused)) iMsgOpts, instanceData __attribute__((unused)) *pData)\
+static rsRetVal doAction(uchar __attribute__((unused)) **ppString, unsigned __attribute__((unused)) iMsgOpts, wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\
{\
DEFiRet;
@@ -382,12 +420,12 @@ static rsRetVal newInpInst(struct nvlst *lst)\
* rgerhard, 2007-08-02
*/
#define BEGINtryResume \
-static rsRetVal tryResume(instanceData __attribute__((unused)) *pData)\
+static rsRetVal tryResume(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\
{\
DEFiRet;
#define CODESTARTtryResume \
- assert(pData != NULL);
+ assert(pWrkrData != NULL);
#define ENDtryResume \
RETiRet;\
@@ -467,6 +505,13 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\
*pEtryPoint = tryResume;\
}
+/* standard queries for output module interface in rsyslog v8+ */
+#define CODEqueryEtryPt_STD_OMOD8_QUERIES \
+ else if(!strcmp((char*) name, "createWrkrInstance")) {\
+ *pEtryPoint = createWrkrInstance;\
+ } else if(!strcmp((char*) name, "freeWrkrInstance")) {\
+ *pEtryPoint = freeWrkrInstance;\
+ }
/* the following definition is queryEtryPt block that must be added
* if an output module supports the transactional interface.
diff --git a/runtime/modules.c b/runtime/modules.c
index 56606306..52096082 100644
--- a/runtime/modules.c
+++ b/runtime/modules.c
@@ -652,6 +652,9 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"doAction", &pNew->mod.om.doAction));
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"parseSelectorAct", &pNew->mod.om.parseSelectorAct));
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"tryResume", &pNew->tryResume));
+ CHKiRet((*pNew->modQueryEtryPt)((uchar*)"createWrkrInstance", &pNew->mod.om.createWrkrInstance));
+ CHKiRet((*pNew->modQueryEtryPt)((uchar*)"freeWrkrInstance", &pNew->mod.om.freeWrkrInstance));
+
/* try load optional interfaces */
localRet = (*pNew->modQueryEtryPt)((uchar*)"doHUP", &pNew->doHUP);
if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND)
diff --git a/runtime/modules.h b/runtime/modules.h
index 23df22d6..a0637ac9 100644
--- a/runtime/modules.h
+++ b/runtime/modules.h
@@ -139,6 +139,8 @@ struct modInfo_s {
rsRetVal (*parseSelectorAct)(uchar**, void**,omodStringRequest_t**);
rsRetVal (*newActInst)(uchar *modName, struct nvlst *lst, void **, omodStringRequest_t **);
rsRetVal (*SetShutdownImmdtPtr)(void *pData, void *pPtr);
+ rsRetVal (*createWrkrInstance)(void*ppWrkrData, void*pData);
+ rsRetVal (*freeWrkrInstance)(void*pWrkrData);
} om;
struct { /* data for library modules */
char dummy;
diff --git a/runtime/queue.c b/runtime/queue.c
index e0d60249..8c610b11 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -81,8 +81,8 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
-static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
-static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
+static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti);
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
@@ -957,7 +957,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
-static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
{
batch_t singleBatch;
batch_obj_t batchObj;
@@ -984,7 +984,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
singleBatch.pElem = &batchObj;
singleBatch.eltState = &batchState;
singleBatch.active = &active;
- iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate);
+ iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate);
/* delete the batch string params: TODO: create its own "class" for this */
for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
free(batchObj.staticActStrings[i]);
@@ -997,7 +997,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead
* otherwise incured. -- rgerhards, ~2010-06-23
*/
-rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
+rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti)
{
DEFiRet;
@@ -1011,7 +1011,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
* We use our knowledge about the batch_t structure below, but without that, we
* pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
- iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL);
+ iRet = pThis->pConsumer(pThis->pAction, pBatch, pWti, NULL);
RETiRet;
}
@@ -1317,7 +1317,7 @@ finalize_it:
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*, int*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1876,7 +1876,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
- CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate));
+ CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti, &pThis->bShutdownImmediate));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -2679,7 +2679,7 @@ finalize_it:
/* now, the same function, but for direct mode */
static rsRetVal
-qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
+qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti)
{
int i;
DEFiRet;
@@ -2688,7 +2688,7 @@ qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
assert(pMultiSub != NULL);
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
- CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i]));
+ CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i], pWti));
}
finalize_it:
@@ -2703,16 +2703,16 @@ finalize_it:
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg)
+qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
- iRet = qAddDirect(pThis, pMsg);
+ iRet = qAddDirect(pThis, pMsg, pWti);
RETiRet;
}
-/* enqueue a new user data element
+/* enqueue a new user data element
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
diff --git a/runtime/queue.h b/runtime/queue.h
index 1918e502..8e789ebd 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -103,7 +103,7 @@ struct queue_s {
* the user really wanted...). -- rgerhards, 2008-04-02
*/
/* end dequeue time window */
- rsRetVal (*pConsumer)(void *,batch_t*,int*); /* user-supplied consumer function for dequeued messages */
+ rsRetVal (*pConsumer)(void *,batch_t*, wti_t*,int*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
* user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
* is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero
@@ -195,14 +195,14 @@ struct queue_s {
/* prototypes */
rsRetVal qqueueDestruct(qqueue_t **ppThis);
-rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg);
+rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti);
rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg);
rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
-rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *, int*));
+rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti);
int queueCnfParamsSet(struct nvlst *lst);
rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst);
void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis);
diff --git a/runtime/rsconf.c b/runtime/rsconf.c
index 8c808786..abce53b8 100644
--- a/runtime/rsconf.c
+++ b/runtime/rsconf.c
@@ -1205,6 +1205,7 @@ ourConf = loadConf; // TODO: remove, once ourConf is gone!
ABORT_FINALIZE(RS_RET_NO_ACTIONS);
}
tellLexEndParsing();
+ DBGPRINTF("Number of actions in this configuration: %d\n", iActionNbr);
rulesetOptimizeAll(loadConf);
tellCoreConfigLoadDone();
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 3c4d2eed..e39a1889 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -67,8 +67,8 @@ static struct cnfparamblk rspblk =
};
/* forward definitions */
-static rsRetVal processBatch(batch_t *pBatch);
-static rsRetVal scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active);
+static rsRetVal processBatch(batch_t *pBatch, wti_t *pWti);
+static rsRetVal scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti);
/* ---------- linked-list key handling functions (ruleset) ---------- */
@@ -168,7 +168,7 @@ finalize_it:
* rgerhards, 2010-06-15
*/
static inline rsRetVal
-processBatchMultiRuleset(batch_t *pBatch)
+processBatchMultiRuleset(batch_t *pBatch, wti_t *pWti)
{
ruleset_t *currRuleset;
batch_t snglRuleBatch;
@@ -206,7 +206,7 @@ processBatchMultiRuleset(batch_t *pBatch)
snglRuleBatch.nElem = iNew; /* was left just right by the for loop */
batchSetSingleRuleset(&snglRuleBatch, 1);
/* process temp batch */
- processBatch(&snglRuleBatch);
+ processBatch(&snglRuleBatch, pWti);
batchFree(&snglRuleBatch);
} while(bHaveUnprocessed == 1);
@@ -226,12 +226,12 @@ static inline void freeActive(sbool *active) { free(active); }
/* for details, see scriptExec() header comment! */
/* call action for all messages with filter on */
static rsRetVal
-execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
+execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti)
{
DEFiRet;
dbgprintf("RRRR: execAct [%s]: batch of %d elements, active %p\n", modGetName(stmt->d.act->pMod), batchNumMsgs(pBatch), active);
pBatch->active = active;
- stmt->d.act->submitToActQ(stmt->d.act, pBatch);
+ stmt->d.act->submitToActQ(stmt->d.act, pBatch, pWti);
RETiRet;
}
@@ -285,13 +285,13 @@ execStop(batch_t *pBatch, sbool *active)
RETiRet;
}
static rsRetVal
-execCall(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
+execCall(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti)
{
msg_t *pMsg;
int i;
DEFiRet;
if(stmt->d.s_call.ruleset == NULL) {
- scriptExec(stmt->d.s_call.stmt, pBatch, active);
+ scriptExec(stmt->d.s_call.stmt, pBatch, active, pWti);
} else {
for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
CHKmalloc(pMsg = MsgDup((msg_t*) pBatch->pElem[i].pMsg));
@@ -316,7 +316,7 @@ finalize_it:
// set new filter, inverted
// perform else (if any messages)
static rsRetVal
-execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
+execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti)
{
sbool *newAct;
int i;
@@ -345,7 +345,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
}
if(stmt->d.s_if.t_then != NULL) {
- scriptExec(stmt->d.s_if.t_then, pBatch, newAct);
+ scriptExec(stmt->d.s_if.t_then, pBatch, newAct, pWti);
}
if(stmt->d.s_if.t_else != NULL) {
for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
@@ -355,7 +355,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
&& (active == NULL || active[i]))
newAct[i] = !newAct[i];
}
- scriptExec(stmt->d.s_if.t_else, pBatch, newAct);
+ scriptExec(stmt->d.s_if.t_else, pBatch, newAct, pWti);
}
freeActive(newAct);
finalize_it:
@@ -364,7 +364,7 @@ finalize_it:
/* for details, see scriptExec() header comment! */
static void
-execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
+execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti)
{
sbool *newAct;
msg_t *pMsg;
@@ -391,7 +391,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
}
if(stmt->d.s_prifilt.t_then != NULL) {
- scriptExec(stmt->d.s_prifilt.t_then, pBatch, newAct);
+ scriptExec(stmt->d.s_prifilt.t_then, pBatch, newAct, pWti);
}
if(stmt->d.s_prifilt.t_else != NULL) {
for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
@@ -401,7 +401,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
&& (active == NULL || active[i]))
newAct[i] = !newAct[i];
}
- scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct);
+ scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct, pWti);
}
freeActive(newAct);
}
@@ -500,7 +500,7 @@ done:
/* for details, see scriptExec() header comment! */
static void
-execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
+execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti)
{
sbool *thenAct;
sbool bRet;
@@ -519,7 +519,7 @@ execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
DBGPRINTF("batch: item %d PROPFILT %d\n", i, thenAct[i]);
}
- scriptExec(stmt->d.s_propfilt.t_then, pBatch, thenAct);
+ scriptExec(stmt->d.s_propfilt.t_then, pBatch, thenAct, pWti);
freeActive(thenAct);
}
@@ -534,7 +534,7 @@ execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
* rgerhards, 2012-09-04
*/
static rsRetVal
-scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active)
+scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti)
{
DEFiRet;
struct cnfstmt *stmt;
@@ -552,7 +552,7 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active)
execStop(pBatch, active);
break;
case S_ACT:
- execAct(stmt, pBatch, active);
+ execAct(stmt, pBatch, active, pWti);
break;
case S_SET:
execSet(stmt, pBatch, active);
@@ -561,16 +561,16 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active)
execUnset(stmt, pBatch, active);
break;
case S_CALL:
- execCall(stmt, pBatch, active);
+ execCall(stmt, pBatch, active, pWti);
break;
case S_IF:
- execIf(stmt, pBatch, active);
+ execIf(stmt, pBatch, active, pWti);
break;
case S_PRIFILT:
- execPRIFILT(stmt, pBatch, active);
+ execPRIFILT(stmt, pBatch, active, pWti);
break;
case S_PROPFILT:
- execPROPFILT(stmt, pBatch, active);
+ execPROPFILT(stmt, pBatch, active, pWti);
break;
default:
dbgprintf("error: unknown stmt type %u during exec\n",
@@ -589,7 +589,7 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active)
* rgerhards, 2005-10-13
*/
static rsRetVal
-processBatch(batch_t *pBatch)
+processBatch(batch_t *pBatch, wti_t *pWti)
{
ruleset_t *pThis;
DEFiRet;
@@ -601,9 +601,9 @@ processBatch(batch_t *pBatch)
if(pThis == NULL)
pThis = ourConf->rulesets.pDflt;
ISOBJ_TYPE_assert(pThis, ruleset);
- CHKiRet(scriptExec(pThis->root, pBatch, NULL));
+ CHKiRet(scriptExec(pThis->root, pBatch, NULL, pWti));
} else {
- CHKiRet(processBatchMultiRuleset(pBatch));
+ CHKiRet(processBatchMultiRuleset(pBatch, pWti));
}
finalize_it:
diff --git a/runtime/ruleset.h b/runtime/ruleset.h
index 9905b53c..0af3578c 100644
--- a/runtime/ruleset.h
+++ b/runtime/ruleset.h
@@ -46,7 +46,7 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */
rsRetVal (*Destruct)(ruleset_t **ppThis);
rsRetVal (*DestructAllActions)(rsconf_t *conf);
rsRetVal (*SetName)(ruleset_t *pThis, uchar *pszName);
- rsRetVal (*ProcessBatch)(batch_t*);
+ rsRetVal (*ProcessBatch)(batch_t*, wti_t *);
rsRetVal (*GetRuleset)(rsconf_t *conf, ruleset_t **ppThis, uchar*);
rsRetVal (*SetDefaultRuleset)(rsconf_t *conf, uchar*);
rsRetVal (*SetCurrRuleset)(rsconf_t *conf, uchar*);
@@ -64,8 +64,9 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */
/* AddRule() removed */
/*TODO:REMOVE*/rsRetVal (*IterateAllActions)(rsconf_t *conf, rsRetVal (*pFunc)(void*, void*), void* pParam);
void (*AddScript)(ruleset_t *pThis, struct cnfstmt *script);
+ /* v8: changed processBatch interface */
ENDinterface(ruleset)
-#define rulesetCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */
+#define rulesetCURR_IF_VERSION 8 /* increment whenever you change the interface structure! */
/* prototypes */
diff --git a/runtime/wti.c b/runtime/wti.c
index 77197a95..c2077a51 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -44,6 +44,7 @@
#include "wti.h"
#include "obj.h"
#include "glbl.h"
+#include "action.h"
#include "atomic.h"
/* static data */
@@ -171,6 +172,7 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE
CODESTARTobjDestruct(wti)
/* actual destruction */
batchFree(&pThis->batch);
+ free(pThis->actWrkrInfo);
pthread_cond_destroy(&pThis->pcondBusy);
DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
@@ -197,11 +199,15 @@ wtiConstructFinalize(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
- DBGPRINTF("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
+ DBGPRINTF("%s: finalizing construction of worker instance data (for %d actions)\n",
+ wtiGetDbgHdr(pThis), iActionNbr);
/* initialize our thread instance descriptor (no concurrency here) */
pThis->bIsRunning = RSFALSE;
+ /* must use calloc as we need zero-init */
+ CHKmalloc(pThis->actWrkrInfo = calloc(iActionNbr, sizeof(actWrkrInfo_t)));
+
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
CHKiRet(batchInit(&pThis->batch, iDeqBatchSize));
@@ -279,6 +285,7 @@ wtiWorker(wti_t *pThis)
rsRetVal localRet;
rsRetVal terminateRet;
int iCancelStateSave;
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
@@ -288,6 +295,7 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+dbgprintf("DDDD: wti %p: worker starting\n", pThis);
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
@@ -334,9 +342,19 @@ wtiWorker(wti_t *pThis)
bInactivityTOOccured = 0; /* reset for next run */
}
+ DBGPRINTF("DDDD: wti %p: worker cleanup up action instances\n", pThis);
+ for(i = 0 ; i < iActionNbr ; ++i) {
+ dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, pThis->actWrkrInfo[i].actWrkrData);
+ if(pThis->actWrkrInfo[i].actWrkrData != NULL) {
+ dbgprintf("DDDD: calling freeWrkrData!\n");
+ pThis->actWrkrInfo[i].pAction->pMod->mod.om.freeWrkrInstance(pThis->actWrkrInfo[i].actWrkrData);
+ }
+ }
+
/* indicate termination */
pthread_cleanup_pop(0); /* remove cleanup handler */
pthread_setcancelstate(iCancelStateSave, NULL);
+dbgprintf("DDDD: wti %p: worker exiting\n", pThis);
RETiRet;
}
diff --git a/runtime/wti.h b/runtime/wti.h
index b0dc6c96..8c43c22e 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -28,6 +28,11 @@
#include "batch.h"
+typedef struct actWrkrInfo {
+ action_t *pAction;
+ void *actWrkrData;
+} actWrkrInfo_t;
+
/* the worker thread instance class */
struct wti_s {
BEGINobjInstance;
@@ -37,6 +42,7 @@ struct wti_s {
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
uchar *pszDbgHdr; /* header string for debug messages */
+ actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions (sized for max nbr of actions in config!) */
pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */
DEF_ATOMIC_HELPER_MUT(mutIsRunning);
};
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 895c1ffe..66942e6b 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -8,7 +8,7 @@
* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
* if you are getting aquainted to the object.
*
- * Copyright 2008,2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -91,6 +91,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pthread_cond_init(&pThis->condThrdTrm, NULL);
pthread_attr_init(&pThis->attrThrd);
/* Set thread scheduling policy to default */
+#warning do we need this any longer? I think it was a cure for an already fixed bug..
#ifdef HAVE_PTHREAD_SETSCHEDPARAM
pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy);
pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param);
@@ -121,7 +122,8 @@ wtpConstructFinalize(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
- DBGPRINTF("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis));
+ DBGPRINTF("%s: finalizing construction of worker thread pool (numworkerThreads %d)\n",
+ wtpGetDbgHdr(pThis), pThis->iNumWorkerThreads);
/* alloc and construct workers - this can only be done in finalizer as we previously do
* not know the max number of workers
*/