diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/module-template.h | 55 | ||||
-rw-r--r-- | runtime/modules.c | 3 | ||||
-rw-r--r-- | runtime/modules.h | 2 | ||||
-rw-r--r-- | runtime/queue.c | 26 | ||||
-rw-r--r-- | runtime/queue.h | 8 | ||||
-rw-r--r-- | runtime/rsconf.c | 1 | ||||
-rw-r--r-- | runtime/ruleset.c | 50 | ||||
-rw-r--r-- | runtime/ruleset.h | 5 | ||||
-rw-r--r-- | runtime/wti.c | 20 | ||||
-rw-r--r-- | runtime/wti.h | 6 | ||||
-rw-r--r-- | runtime/wtp.c | 6 |
11 files changed, 130 insertions, 52 deletions
diff --git a/runtime/module-template.h b/runtime/module-template.h index 8a958f90..f65aaf0f 100644 --- a/runtime/module-template.h +++ b/runtime/module-template.h @@ -175,6 +175,44 @@ static rsRetVal freeInstance(void* pModData)\ RETiRet;\ } +/* createWrkrInstance() + */ +#define BEGINcreateWrkrInstance \ +static rsRetVal createWrkrInstance(wrkrInstanceData_t **ppWrkrData, instanceData *pData)\ + {\ + DEFiRet; /* store error code here */\ + wrkrInstanceData_t *pWrkrData; /* use this to point to data elements */ + +#define CODESTARTcreateWrkrInstance \ + if((pWrkrData = calloc(1, sizeof(wrkrInstanceData_t))) == NULL) {\ + *ppWrkrData = NULL;\ + ENDfunc \ + return RS_RET_OUT_OF_MEMORY;\ + } \ + pWrkrData->pData = pData; + +#define ENDcreateWrkrInstance \ + *ppWrkrData = pWrkrData;\ + RETiRet;\ +} + +/* freeWrkrInstance */ +#define BEGINfreeWrkrInstance \ +static rsRetVal freeWrkrInstance(void* pd)\ +{\ + DEFiRet;\ + wrkrInstanceData_t *pWrkrData; + +#define CODESTARTfreeWrkrInstance \ + pWrkrData = (wrkrInstanceData_t*) pd; + +#define ENDfreeWrkrInstance \ + if(pWrkrData != NULL)\ + free(pWrkrData); /* we need to free this in any case */\ + RETiRet;\ +} + + /* isCompatibleWithFeature() */ #define BEGINisCompatibleWithFeature \ @@ -194,7 +232,7 @@ static rsRetVal isCompatibleWithFeature(syslogFeature __attribute__((unused)) eF * introduced in v4.3.3 -- rgerhards, 2009-04-27 */ #define BEGINbeginTransaction \ -static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\ +static rsRetVal beginTransaction(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ DEFiRet; @@ -209,7 +247,7 @@ static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\ * introduced in v4.3.3 -- rgerhards, 2009-04-27 */ #define BEGINendTransaction \ -static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\ +static rsRetVal endTransaction(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ DEFiRet; @@ -223,7 +261,7 @@ static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\ /* doAction() */ #define BEGINdoAction \ -static rsRetVal doAction(uchar __attribute__((unused)) **ppString, unsigned __attribute__((unused)) iMsgOpts, instanceData __attribute__((unused)) *pData)\ +static rsRetVal doAction(uchar __attribute__((unused)) **ppString, unsigned __attribute__((unused)) iMsgOpts, wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ DEFiRet; @@ -382,12 +420,12 @@ static rsRetVal newInpInst(struct nvlst *lst)\ * rgerhard, 2007-08-02 */ #define BEGINtryResume \ -static rsRetVal tryResume(instanceData __attribute__((unused)) *pData)\ +static rsRetVal tryResume(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ DEFiRet; #define CODESTARTtryResume \ - assert(pData != NULL); + assert(pWrkrData != NULL); #define ENDtryResume \ RETiRet;\ @@ -467,6 +505,13 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\ *pEtryPoint = tryResume;\ } +/* standard queries for output module interface in rsyslog v8+ */ +#define CODEqueryEtryPt_STD_OMOD8_QUERIES \ + else if(!strcmp((char*) name, "createWrkrInstance")) {\ + *pEtryPoint = createWrkrInstance;\ + } else if(!strcmp((char*) name, "freeWrkrInstance")) {\ + *pEtryPoint = freeWrkrInstance;\ + } /* the following definition is queryEtryPt block that must be added * if an output module supports the transactional interface. diff --git a/runtime/modules.c b/runtime/modules.c index 56606306..52096082 100644 --- a/runtime/modules.c +++ b/runtime/modules.c @@ -652,6 +652,9 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_ CHKiRet((*pNew->modQueryEtryPt)((uchar*)"doAction", &pNew->mod.om.doAction)); CHKiRet((*pNew->modQueryEtryPt)((uchar*)"parseSelectorAct", &pNew->mod.om.parseSelectorAct)); CHKiRet((*pNew->modQueryEtryPt)((uchar*)"tryResume", &pNew->tryResume)); + CHKiRet((*pNew->modQueryEtryPt)((uchar*)"createWrkrInstance", &pNew->mod.om.createWrkrInstance)); + CHKiRet((*pNew->modQueryEtryPt)((uchar*)"freeWrkrInstance", &pNew->mod.om.freeWrkrInstance)); + /* try load optional interfaces */ localRet = (*pNew->modQueryEtryPt)((uchar*)"doHUP", &pNew->doHUP); if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) diff --git a/runtime/modules.h b/runtime/modules.h index 23df22d6..a0637ac9 100644 --- a/runtime/modules.h +++ b/runtime/modules.h @@ -139,6 +139,8 @@ struct modInfo_s { rsRetVal (*parseSelectorAct)(uchar**, void**,omodStringRequest_t**); rsRetVal (*newActInst)(uchar *modName, struct nvlst *lst, void **, omodStringRequest_t **); rsRetVal (*SetShutdownImmdtPtr)(void *pData, void *pPtr); + rsRetVal (*createWrkrInstance)(void*ppWrkrData, void*pData); + rsRetVal (*freeWrkrInstance)(void*pWrkrData); } om; struct { /* data for library modules */ char dummy; diff --git a/runtime/queue.c b/runtime/queue.c index e0d60249..8c610b11 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -81,8 +81,8 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); +static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti); +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); @@ -957,7 +957,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) { batch_t singleBatch; batch_obj_t batchObj; @@ -984,7 +984,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) singleBatch.pElem = &batchObj; singleBatch.eltState = &batchState; singleBatch.active = &active; - iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate); + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate); /* delete the batch string params: TODO: create its own "class" for this */ for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { free(batchObj.staticActStrings[i]); @@ -997,7 +997,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) /* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead * otherwise incured. -- rgerhards, ~2010-06-23 */ -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) +rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti) { DEFiRet; @@ -1011,7 +1011,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL); + iRet = pThis->pConsumer(pThis->pAction, pBatch, pWti, NULL); RETiRet; } @@ -1317,7 +1317,7 @@ finalize_it: * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*, int*)) { DEFiRet; qqueue_t *pThis; @@ -1876,7 +1876,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); - CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate)); + CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -2679,7 +2679,7 @@ finalize_it: /* now, the same function, but for direct mode */ static rsRetVal -qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) +qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti) { int i; DEFiRet; @@ -2688,7 +2688,7 @@ qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) assert(pMultiSub != NULL); for(i = 0 ; i < pMultiSub->nElem ; ++i) { - CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i])); + CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i], pWti)); } finalize_it: @@ -2703,16 +2703,16 @@ finalize_it: * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg) +qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pMsg); + iRet = qAddDirect(pThis, pMsg, pWti); RETiRet; } -/* enqueue a new user data element +/* enqueue a new user data element * Enqueues the new element and awakes worker thread. */ rsRetVal diff --git a/runtime/queue.h b/runtime/queue.h index 1918e502..8e789ebd 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -103,7 +103,7 @@ struct queue_s { * the user really wanted...). -- rgerhards, 2008-04-02 */ /* end dequeue time window */ - rsRetVal (*pConsumer)(void *,batch_t*,int*); /* user-supplied consumer function for dequeued messages */ + rsRetVal (*pConsumer)(void *,batch_t*, wti_t*,int*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero @@ -195,14 +195,14 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); -rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg); +rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti); rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *, int*)); +rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti); int queueCnfParamsSet(struct nvlst *lst); rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst); void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis); diff --git a/runtime/rsconf.c b/runtime/rsconf.c index b1016568..c855779e 100644 --- a/runtime/rsconf.c +++ b/runtime/rsconf.c @@ -1220,6 +1220,7 @@ ourConf = loadConf; // TODO: remove, once ourConf is gone! ABORT_FINALIZE(RS_RET_NO_ACTIONS); } tellLexEndParsing(); + DBGPRINTF("Number of actions in this configuration: %d\n", iActionNbr); rulesetOptimizeAll(loadConf); tellCoreConfigLoadDone(); diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 3c4d2eed..e39a1889 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -67,8 +67,8 @@ static struct cnfparamblk rspblk = }; /* forward definitions */ -static rsRetVal processBatch(batch_t *pBatch); -static rsRetVal scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active); +static rsRetVal processBatch(batch_t *pBatch, wti_t *pWti); +static rsRetVal scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti); /* ---------- linked-list key handling functions (ruleset) ---------- */ @@ -168,7 +168,7 @@ finalize_it: * rgerhards, 2010-06-15 */ static inline rsRetVal -processBatchMultiRuleset(batch_t *pBatch) +processBatchMultiRuleset(batch_t *pBatch, wti_t *pWti) { ruleset_t *currRuleset; batch_t snglRuleBatch; @@ -206,7 +206,7 @@ processBatchMultiRuleset(batch_t *pBatch) snglRuleBatch.nElem = iNew; /* was left just right by the for loop */ batchSetSingleRuleset(&snglRuleBatch, 1); /* process temp batch */ - processBatch(&snglRuleBatch); + processBatch(&snglRuleBatch, pWti); batchFree(&snglRuleBatch); } while(bHaveUnprocessed == 1); @@ -226,12 +226,12 @@ static inline void freeActive(sbool *active) { free(active); } /* for details, see scriptExec() header comment! */ /* call action for all messages with filter on */ static rsRetVal -execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { DEFiRet; dbgprintf("RRRR: execAct [%s]: batch of %d elements, active %p\n", modGetName(stmt->d.act->pMod), batchNumMsgs(pBatch), active); pBatch->active = active; - stmt->d.act->submitToActQ(stmt->d.act, pBatch); + stmt->d.act->submitToActQ(stmt->d.act, pBatch, pWti); RETiRet; } @@ -285,13 +285,13 @@ execStop(batch_t *pBatch, sbool *active) RETiRet; } static rsRetVal -execCall(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execCall(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { msg_t *pMsg; int i; DEFiRet; if(stmt->d.s_call.ruleset == NULL) { - scriptExec(stmt->d.s_call.stmt, pBatch, active); + scriptExec(stmt->d.s_call.stmt, pBatch, active, pWti); } else { for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { CHKmalloc(pMsg = MsgDup((msg_t*) pBatch->pElem[i].pMsg)); @@ -316,7 +316,7 @@ finalize_it: // set new filter, inverted // perform else (if any messages) static rsRetVal -execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { sbool *newAct; int i; @@ -345,7 +345,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) } if(stmt->d.s_if.t_then != NULL) { - scriptExec(stmt->d.s_if.t_then, pBatch, newAct); + scriptExec(stmt->d.s_if.t_then, pBatch, newAct, pWti); } if(stmt->d.s_if.t_else != NULL) { for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { @@ -355,7 +355,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) && (active == NULL || active[i])) newAct[i] = !newAct[i]; } - scriptExec(stmt->d.s_if.t_else, pBatch, newAct); + scriptExec(stmt->d.s_if.t_else, pBatch, newAct, pWti); } freeActive(newAct); finalize_it: @@ -364,7 +364,7 @@ finalize_it: /* for details, see scriptExec() header comment! */ static void -execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { sbool *newAct; msg_t *pMsg; @@ -391,7 +391,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) } if(stmt->d.s_prifilt.t_then != NULL) { - scriptExec(stmt->d.s_prifilt.t_then, pBatch, newAct); + scriptExec(stmt->d.s_prifilt.t_then, pBatch, newAct, pWti); } if(stmt->d.s_prifilt.t_else != NULL) { for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { @@ -401,7 +401,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) && (active == NULL || active[i])) newAct[i] = !newAct[i]; } - scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct); + scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct, pWti); } freeActive(newAct); } @@ -500,7 +500,7 @@ done: /* for details, see scriptExec() header comment! */ static void -execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { sbool *thenAct; sbool bRet; @@ -519,7 +519,7 @@ execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) DBGPRINTF("batch: item %d PROPFILT %d\n", i, thenAct[i]); } - scriptExec(stmt->d.s_propfilt.t_then, pBatch, thenAct); + scriptExec(stmt->d.s_propfilt.t_then, pBatch, thenAct, pWti); freeActive(thenAct); } @@ -534,7 +534,7 @@ execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) * rgerhards, 2012-09-04 */ static rsRetVal -scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active) +scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti) { DEFiRet; struct cnfstmt *stmt; @@ -552,7 +552,7 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active) execStop(pBatch, active); break; case S_ACT: - execAct(stmt, pBatch, active); + execAct(stmt, pBatch, active, pWti); break; case S_SET: execSet(stmt, pBatch, active); @@ -561,16 +561,16 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active) execUnset(stmt, pBatch, active); break; case S_CALL: - execCall(stmt, pBatch, active); + execCall(stmt, pBatch, active, pWti); break; case S_IF: - execIf(stmt, pBatch, active); + execIf(stmt, pBatch, active, pWti); break; case S_PRIFILT: - execPRIFILT(stmt, pBatch, active); + execPRIFILT(stmt, pBatch, active, pWti); break; case S_PROPFILT: - execPROPFILT(stmt, pBatch, active); + execPROPFILT(stmt, pBatch, active, pWti); break; default: dbgprintf("error: unknown stmt type %u during exec\n", @@ -589,7 +589,7 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active) * rgerhards, 2005-10-13 */ static rsRetVal -processBatch(batch_t *pBatch) +processBatch(batch_t *pBatch, wti_t *pWti) { ruleset_t *pThis; DEFiRet; @@ -601,9 +601,9 @@ processBatch(batch_t *pBatch) if(pThis == NULL) pThis = ourConf->rulesets.pDflt; ISOBJ_TYPE_assert(pThis, ruleset); - CHKiRet(scriptExec(pThis->root, pBatch, NULL)); + CHKiRet(scriptExec(pThis->root, pBatch, NULL, pWti)); } else { - CHKiRet(processBatchMultiRuleset(pBatch)); + CHKiRet(processBatchMultiRuleset(pBatch, pWti)); } finalize_it: diff --git a/runtime/ruleset.h b/runtime/ruleset.h index 9905b53c..0af3578c 100644 --- a/runtime/ruleset.h +++ b/runtime/ruleset.h @@ -46,7 +46,7 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Destruct)(ruleset_t **ppThis); rsRetVal (*DestructAllActions)(rsconf_t *conf); rsRetVal (*SetName)(ruleset_t *pThis, uchar *pszName); - rsRetVal (*ProcessBatch)(batch_t*); + rsRetVal (*ProcessBatch)(batch_t*, wti_t *); rsRetVal (*GetRuleset)(rsconf_t *conf, ruleset_t **ppThis, uchar*); rsRetVal (*SetDefaultRuleset)(rsconf_t *conf, uchar*); rsRetVal (*SetCurrRuleset)(rsconf_t *conf, uchar*); @@ -64,8 +64,9 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */ /* AddRule() removed */ /*TODO:REMOVE*/rsRetVal (*IterateAllActions)(rsconf_t *conf, rsRetVal (*pFunc)(void*, void*), void* pParam); void (*AddScript)(ruleset_t *pThis, struct cnfstmt *script); + /* v8: changed processBatch interface */ ENDinterface(ruleset) -#define rulesetCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */ +#define rulesetCURR_IF_VERSION 8 /* increment whenever you change the interface structure! */ /* prototypes */ diff --git a/runtime/wti.c b/runtime/wti.c index 77197a95..c2077a51 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -44,6 +44,7 @@ #include "wti.h" #include "obj.h" #include "glbl.h" +#include "action.h" #include "atomic.h" /* static data */ @@ -171,6 +172,7 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE CODESTARTobjDestruct(wti) /* actual destruction */ batchFree(&pThis->batch); + free(pThis->actWrkrInfo); pthread_cond_destroy(&pThis->pcondBusy); DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning); @@ -197,11 +199,15 @@ wtiConstructFinalize(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); - DBGPRINTF("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker instance data (for %d actions)\n", + wtiGetDbgHdr(pThis), iActionNbr); /* initialize our thread instance descriptor (no concurrency here) */ pThis->bIsRunning = RSFALSE; + /* must use calloc as we need zero-init */ + CHKmalloc(pThis->actWrkrInfo = calloc(iActionNbr, sizeof(actWrkrInfo_t))); + /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); CHKiRet(batchInit(&pThis->batch, iDeqBatchSize)); @@ -279,6 +285,7 @@ wtiWorker(wti_t *pThis) rsRetVal localRet; rsRetVal terminateRet; int iCancelStateSave; + int i; DEFiRet; ISOBJ_TYPE_assert(pThis, wti); @@ -288,6 +295,7 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); +dbgprintf("DDDD: wti %p: worker starting\n", pThis); /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ @@ -334,9 +342,19 @@ wtiWorker(wti_t *pThis) bInactivityTOOccured = 0; /* reset for next run */ } + DBGPRINTF("DDDD: wti %p: worker cleanup up action instances\n", pThis); + for(i = 0 ; i < iActionNbr ; ++i) { + dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, pThis->actWrkrInfo[i].actWrkrData); + if(pThis->actWrkrInfo[i].actWrkrData != NULL) { + dbgprintf("DDDD: calling freeWrkrData!\n"); + pThis->actWrkrInfo[i].pAction->pMod->mod.om.freeWrkrInstance(pThis->actWrkrInfo[i].actWrkrData); + } + } + /* indicate termination */ pthread_cleanup_pop(0); /* remove cleanup handler */ pthread_setcancelstate(iCancelStateSave, NULL); +dbgprintf("DDDD: wti %p: worker exiting\n", pThis); RETiRet; } diff --git a/runtime/wti.h b/runtime/wti.h index b0dc6c96..8c43c22e 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -28,6 +28,11 @@ #include "batch.h" +typedef struct actWrkrInfo { + action_t *pAction; + void *actWrkrData; +} actWrkrInfo_t; + /* the worker thread instance class */ struct wti_s { BEGINobjInstance; @@ -37,6 +42,7 @@ struct wti_s { wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ + actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions (sized for max nbr of actions in config!) */ pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */ DEF_ATOMIC_HELPER_MUT(mutIsRunning); }; diff --git a/runtime/wtp.c b/runtime/wtp.c index 895c1ffe..66942e6b 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -8,7 +8,7 @@ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it * if you are getting aquainted to the object. * - * Copyright 2008,2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -91,6 +91,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pthread_cond_init(&pThis->condThrdTrm, NULL); pthread_attr_init(&pThis->attrThrd); /* Set thread scheduling policy to default */ +#warning do we need this any longer? I think it was a cure for an already fixed bug.. #ifdef HAVE_PTHREAD_SETSCHEDPARAM pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy); pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param); @@ -121,7 +122,8 @@ wtpConstructFinalize(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - DBGPRINTF("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker thread pool (numworkerThreads %d)\n", + wtpGetDbgHdr(pThis), pThis->iNumWorkerThreads); /* alloc and construct workers - this can only be done in finalizer as we previously do * not know the max number of workers */ |