diff options
-rw-r--r-- | action.c | 155 | ||||
-rw-r--r-- | action.h | 9 | ||||
-rw-r--r-- | doc/multi_ruleset.html | 9 | ||||
-rw-r--r-- | doc/rsyslog_conf_modules.html | 34 | ||||
-rw-r--r-- | grammar/rainerscript.c | 2 | ||||
-rw-r--r-- | plugins/impstats/impstats.c | 2 | ||||
-rw-r--r-- | plugins/mmanon/mmanon.c | 15 | ||||
-rw-r--r-- | plugins/omelasticsearch/README | 4 | ||||
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 228 | ||||
-rw-r--r-- | plugins/omruleset/omruleset.c | 4 | ||||
-rw-r--r-- | plugins/omtesting/omtesting.c | 9 | ||||
-rw-r--r-- | runtime/module-template.h | 57 | ||||
-rw-r--r-- | runtime/modules.c | 3 | ||||
-rw-r--r-- | runtime/modules.h | 2 | ||||
-rw-r--r-- | runtime/queue.c | 26 | ||||
-rw-r--r-- | runtime/queue.h | 8 | ||||
-rw-r--r-- | runtime/rsconf.c | 1 | ||||
-rw-r--r-- | runtime/ruleset.c | 46 | ||||
-rw-r--r-- | runtime/ruleset.h | 5 | ||||
-rw-r--r-- | runtime/wti.c | 20 | ||||
-rw-r--r-- | runtime/wti.h | 6 | ||||
-rw-r--r-- | runtime/wtp.c | 6 | ||||
-rw-r--r-- | tools/omdiscard.c | 17 | ||||
-rw-r--r-- | tools/omfile.c | 109 | ||||
-rw-r--r-- | tools/omfwd.c | 17 | ||||
-rw-r--r-- | tools/ompipe.c | 23 | ||||
-rw-r--r-- | tools/omshell.c | 22 | ||||
-rw-r--r-- | tools/omusrmsg.c | 17 | ||||
-rw-r--r-- | tools/syslogd.c | 4 |
29 files changed, 576 insertions, 284 deletions
@@ -119,10 +119,10 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); -static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch); -static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch); -static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*); +static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); +static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); +static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -175,10 +175,9 @@ configSettings_t cs_save; /* our saved (scope!) config settings */ /* the counter below counts actions created. It is used to obtain unique IDs for the action. They * should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice * to have during one instance of an rsyslogd run. For example, I use them to name actions when there - * is no better name available. Note that I do NOT recover previous numbers on HUP - we simply keep - * counting. -- rgerhards, 2008-01-29 + * is no better name available. */ -static int iActionNbr = 0; +int iActionNbr = 0; /* tables for interfacing with the v6 config system */ static struct cnfparamdescr cnfparamdescr[] = { @@ -308,7 +307,6 @@ rsRetVal actionDestruct(action_t *pThis) pThis->pMod->freeInstance(pThis->pModData); pthread_mutex_destroy(&pThis->mutAction); - pthread_mutex_destroy(&pThis->mutActExec); d_free(pThis->pszName); d_free(pThis->ppTpl); @@ -341,7 +339,7 @@ rsRetVal actionConstruct(action_t **ppThis) pThis->bExecWhenPrevSusp = 0; pThis->bRepMsgHasMsg = 0; pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ - pthread_mutex_init(&pThis->mutActExec, NULL); + pThis->iActionNbr = iActionNbr; pthread_mutex_init(&pThis->mutAction, NULL); INIT_ATOMIC_HELPER_MUT(pThis->mutCAS); @@ -428,7 +426,7 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, - (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); + processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszAName); qqueueSetpAction(pThis->pQueue, pThis); @@ -687,6 +685,22 @@ finalize_it: } +static rsRetVal +actionCheckAndCreateWrkrInstance(action_t *pThis, wti_t *pWti) +{ + DEFiRet; + if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) { +dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis->iActionNbr); + DBGPRINTF("we need to create a new action worker instance for " + "action %d\n", pThis->iActionNbr); + CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), + pThis->pModData)); + pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; + } +finalize_it: + RETiRet; +} + /* try to resume an action -- rgerhards, 2007-08-02 * changed to new action state engine -- rgerhards, 2009-05-07 */ @@ -731,18 +745,19 @@ finalize_it: * depending on its current state. * rgerhards, 2009-05-07 */ -static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate) +static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; assert(pThis != NULL); + CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); CHKiRet(actionTryResume(pThis, pbShutdownImmediate)); /* if we are now ready, we initialize the transaction and advance * action state accordingly */ if(pThis->eState == ACT_STATE_RDY) { - iRet = pThis->pMod->mod.om.beginTransaction(pThis->pModData); + iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: actionSetState(pThis, ACT_STATE_ITX); @@ -905,17 +920,21 @@ done: RETiRet; * rgerhards, 2008-01-28 */ rsRetVal -actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) +actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); - DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); + DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", + getActStateName(pThis), pThis->iActionNbr); + + CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); pThis->bHadAutoCommit = 0; - iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData); + iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, + pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: actionCommitted(pThis); @@ -952,19 +971,19 @@ finalize_it: * this readies the action and then calls doAction() * rgerhards, 2008-01-28 */ -static inline rsRetVal -actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate) +static rsRetVal +actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); - CHKiRet(actionPrepare(pThis, pbShutdownImmediate)); + CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); if(pThis->eState == ACT_STATE_ITX) - CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); + CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti)); iRet = getReturnCode(pThis); finalize_it: @@ -977,7 +996,7 @@ finalize_it: * rgerhards, 2008-01-28 */ static rsRetVal -finishBatch(action_t *pThis, batch_t *pBatch) +finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) { int i; DEFiRet; @@ -989,9 +1008,9 @@ finishBatch(action_t *pThis, batch_t *pBatch) FINALIZE; /* nothing to do */ } - CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate)); + CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti)); if(pThis->eState == ACT_STATE_ITX) { - iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData); + iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: actionCommitted(pThis); @@ -1034,7 +1053,7 @@ finalize_it: * rgerhards, 2009-05-12 */ static inline rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, wti_t *pWti) { int i; int iElemProcessed; @@ -1059,7 +1078,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) if(batchIsValidElem(pBatch, i)) { pMsg = pBatch->pElem[i].pMsg; localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, - pBatch->pbShutdownImmediate); + pBatch->pbShutdownImmediate, pWti); DBGPRINTF("action %p call returned %d\n", pAction, localRet); /* Note: we directly modify the batch object state, because we know that * wo do not overwrite BATCH_STATE_DISC indicators! @@ -1111,7 +1130,7 @@ finalize_it: * rgerhards, 2009-05-12 */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +submitBatch(action_t *pAction, batch_t *pBatch, int nElem, wti_t *pWti) { int i; int bDone; @@ -1125,7 +1144,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) wasDoneTo = pBatch->iDoneUpTo; bDone = 0; do { - localRet = tryDoAction(pAction, pBatch, &nElem); + localRet = tryDoAction(pAction, pBatch, &nElem, pWti); if(localRet == RS_RET_FORCE_TERM) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } @@ -1135,7 +1154,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) /* try commit transaction, once done, we can simply do so as if * that return state was returned from tryDoAction(). */ - localRet = finishBatch(pAction, pBatch); + localRet = finishBatch(pAction, pBatch, pWti); } if( localRet == RS_RET_OK @@ -1164,8 +1183,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ DBGPRINTF("submitBatch recursing trying to find and exclude the culprit " "for iRet %d\n", localRet); - submitBatch(pAction, pBatch, nElem / 2); - submitBatch(pAction, pBatch, nElem - (nElem / 2)); + submitBatch(pAction, pBatch, nElem / 2, pWti); + submitBatch(pAction, pBatch, nElem - (nElem / 2), pWti); bDone = 1; } } @@ -1245,31 +1264,31 @@ prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustR * rgerhards, 2009-05-12 */ static inline rsRetVal -processAction(action_t *pAction, batch_t *pBatch) +processAction(action_t *pAction, batch_t *pBatch, wti_t* pWti) { DEFiRet; assert(pBatch != NULL); - CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem)); - iRet = finishBatch(pAction, pBatch); + CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pWti)); + iRet = finishBatch(pAction, pBatch, pWti); finalize_it: RETiRet; } -#pragma GCC diagnostic ignored "-Wempty-body" /* receive an array of to-process user pointers and submit them * for processing. * rgerhards, 2009-04-22 */ static rsRetVal -processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) +processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate) { int *pbShutdownImmdtSave; sbool *activeSave; int bMustRestoreActivePtr = 0; rsRetVal localRet; + action_t *pAction = (action_t*) pVoid; DEFiRet; assert(pBatch != NULL); @@ -1280,17 +1299,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) } CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); - /* We now must guard the output module against execution by multiple threads. The - * plugin interface specifies that output modules must not be thread-safe (except - * if they notify us they are - functionality not yet implemented...). - * rgerhards, 2008-01-30 - */ - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - - iRet = processAction(pAction, pBatch); - - pthread_cleanup_pop(1); /* unlock mutex */ + iRet = processAction(pAction, pBatch, pWti); /* even if processAction failed, we need to release the batch (else we * have a memory leak). So we do this first, and then check if we need to @@ -1312,15 +1321,12 @@ finalize_it: pBatch->pbShutdownImmediate = pbShutdownImmdtSave; RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" -/* call the HUP handler for a given action, if such a handler is defined. The - * action mutex is locked, because the HUP handler most probably needs to modify - * some internal state information. - * rgerhards, 2008-10-22 +/* call the HUP handler for a given action, if such a handler is defined. + * Note that the action must be able to service HUP requests concurrently + * to any current doAction() processing. */ -#pragma GCC diagnostic ignored "-Wempty-body" rsRetVal actionCallHUPHdlr(action_t *pAction) { @@ -1329,19 +1335,13 @@ actionCallHUPHdlr(action_t *pAction) ASSERT(pAction != NULL); DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP); - if(pAction->pMod->doHUP == NULL) { - FINALIZE; /* no HUP handler, so we are done ;) */ + if(pAction->pMod->doHUP != NULL) { + CHKiRet(pAction->pMod->doHUP(pAction->pModData)); } - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - CHKiRet(pAction->pMod->doHUP(pAction->pModData)); - pthread_cleanup_pop(1); /* unlock mutex */ - finalize_it: RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" /* set the action message queue mode @@ -1381,7 +1381,7 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT * rgerhards, 2010-06-08 */ static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti) { DEFiRet; @@ -1392,7 +1392,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) - iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg)); + iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg), pWti); else iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); @@ -1409,7 +1409,7 @@ finalize_it: * be filtered out before calling us (what is done currently!). */ rsRetVal -actionWriteToAction(action_t *pAction, msg_t *pMsg) +actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti) { DEFiRet; @@ -1464,7 +1464,7 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg) /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = doSubmitToActionQ(pAction, pMsg); + iRet = doSubmitToActionQ(pAction, pMsg, pWti); finalize_it: RETiRet; @@ -1475,7 +1475,7 @@ finalize_it: * pthread_cleanup_push() POSIX macro... */ static inline rsRetVal -doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) +doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti) { msg_t *pMsg; DEFiRet; @@ -1490,7 +1490,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) } /* call the output driver */ - iRet = actionWriteToAction(pAction, pMsg); + iRet = actionWriteToAction(pAction, pMsg, pWti); finalize_it: /* we need to update the batch to handle failover processing correctly */ @@ -1550,7 +1550,7 @@ activateActions(void) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { time_t now = 0; time_t lastAct; @@ -1588,7 +1588,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) } } - iRet = doSubmitToActionQBatch(pAction, pBatch); + iRet = doSubmitToActionQBatch(pAction, pBatch, pWti); free(pBatch->active); pBatch->active = activeSave; @@ -1613,7 +1613,7 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch) * rgerhards, 2011-06-16 */ static inline rsRetVal -doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) +doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { sbool bNeedSubmit; sbool *activeSave; @@ -1645,14 +1645,14 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) } if(bNeedSubmit) { /* note: stats were already computed above */ - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); } else { DBGPRINTF("no need to submit batch, all invalid\n"); } } else { if(GatherStats) countStatsBatchEnq(pAction, pBatch); - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); } free(pBatch->active); @@ -1666,7 +1666,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { int i; DEFiRet; @@ -1674,7 +1674,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { - iRet = doQueueEnqObjDirectBatch(pAction, pBatch); + iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti); } else {/* in this case, we do single submits to the queue. * TODO: optimize this, we may do at least a multi-submit! */ @@ -1684,7 +1684,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); if( batchIsValidElem(pBatch, i) && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { - doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg); + doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg, pWti); } } } @@ -1699,7 +1699,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) * rgerhards, 2010-06-23 */ static inline rsRetVal -helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { int i; DEFiRet; @@ -1712,7 +1712,7 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); if( batchIsValidElem(pBatch, i) && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { - doActionCallAction(pAction, pBatch, i); + doActionCallAction(pAction, pBatch, i, pWti); } } @@ -1724,13 +1724,14 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) */ #pragma GCC diagnostic ignored "-Wempty-body" static rsRetVal -doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { DEFiRet; d_pthread_mutex_lock(&pAction->mutAction); +dbgprintf("DDDD: locked mutAction\n"); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); - iRet = helperSubmitToActionQComplexBatch(pAction, pBatch); + iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti); d_pthread_mutex_unlock(&pAction->mutAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ @@ -51,6 +51,7 @@ struct action_s { time_t tActNow; /* the current time for an action execution. Initially set to -1 and populated on an as-needed basis. This is a performance optimization. */ time_t tLastExec; /* time this action was last executed */ + int iActionNbr; /* this action's number (ID) */ sbool bExecWhenPrevSusp;/* execute only when previous action is suspended? */ sbool bWriteAllMarkMsgs;/* should all mark msgs be written (not matter how recent the action was executed)? */ int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */ @@ -68,7 +69,7 @@ struct action_s { struct modInfo_s *pMod;/* pointer to output module handling this selector */ void *pModData; /* pointer to module data - content is module-specific */ sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */ - rsRetVal (*submitToActQ)(action_t *, batch_t *);/* function submit message to action queue */ + rsRetVal (*submitToActQ)(action_t *, batch_t *, wti_t*);/* function submit message to action queue */ rsRetVal (*qConstruct)(struct queue_s *pThis); enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING = 2, ACT_JSON_PASSING = 3} @@ -78,7 +79,6 @@ struct action_s { * in this order. */ qqueue_t *pQueue; /* action queue */ pthread_mutex_t mutAction; /* primary action mutex */ - pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */ uchar *pszName; /* action name (for documentation) */ DEF_ATOMIC_HELPER_MUT(mutCAS); /* for statistics subsystem */ @@ -96,7 +96,7 @@ rsRetVal actionDestruct(action_t *pThis); rsRetVal actionDbgPrint(action_t *pThis); rsRetVal actionSetGlobalResumeInterval(int iNewVal); rsRetVal actionDoAction(action_t *pAction); -rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg); +rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t*); rsRetVal actionCallHUPHdlr(action_t *pAction); rsRetVal actionClassInit(void); rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, struct nvlst *lst, int bSuspended); @@ -104,4 +104,7 @@ rsRetVal activateActions(void); rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction); rsRetVal actionProcessCnf(struct cnfobj *o); +/* external data */ +extern int iActionNbr; + #endif /* #ifndef ACTION_H_INCLUDED */ diff --git a/doc/multi_ruleset.html b/doc/multi_ruleset.html index 14a761c5..1be2c81e 100644 --- a/doc/multi_ruleset.html +++ b/doc/multi_ruleset.html @@ -89,6 +89,15 @@ modify the ruleset to which the next input is bound but rather provides a system default rule set for those inputs that did not explicitly bind to one. As such, the directive can not be used as a work-around to bind inputs to non-default rulesets that do not support ruleset binding. + +<h2>Rulesets and Main Queues</h2> +By default, rulesets do not have their own queue. It must be activated via +the $RulesetCreateMainQueue directive, or if using rainerscript format, by +specifying queue parameters on the ruleset directive, e.g. +ruleset(name="whatever" queue.type="fixedArray" queue. ...) +See <a href="http://www.rsyslog.com/doc/queue_parameters.html">http://www.rsyslog.com/doc/queue_parameters.html</a> +for more details. + <h2>Examples</h2> <h3>Split local and remote logging</h3> <p>Let's say you have a pretty standard system that logs its local messages to the usual diff --git a/doc/rsyslog_conf_modules.html b/doc/rsyslog_conf_modules.html index 8dc3ed56..2917ae56 100644 --- a/doc/rsyslog_conf_modules.html +++ b/doc/rsyslog_conf_modules.html @@ -53,28 +53,28 @@ to message generators. and messages be transmitted to various different targets. <ul> <li><a href="omfile.html">omfile</a> - file output module</li> -<li><a href="omfwd.html">omfwd</a> - syslog forwarding output module</li> -<li><a href="omjournal.html">omjournal</a> - Linux journal output module</li> +<li><a href="omfwd.html">omfwd</a> (does NOT yet work in v8) - syslog forwarding output module</li> +<li><a href="omjournal.html">omjournal</a> (does NOT yet work in v8) - Linux journal output module</li> <li><a href="ompipe.html">ompipe</a> - named pipe output module</li> <li><a href="omusrmsg.html">omusrmsg</a> - user message output module</li> -<li><a href="omsnmp.html">omsnmp</a> - SNMP trap output module</li> -<li><a href="omstdout.html">omtdout</a> - stdout output module (mainly a test tool)</li> -<li><a href="omrelp.html">omrelp</a> - RELP output module</li> -<li><a href="omruleset.html">omruleset</a> - forward message to another ruleset</li> -<li>omgssapi - output module for GSS-enabled syslog</li> -<li><a href="ommysql.html">ommysql</a> - output module for MySQL</li> -<li>ompgsql - output module for PostgreSQL</li> -<li><a href="omlibdbi.html">omlibdbi</a> - +<li><a href="omsnmp.html">omsnmp</a> (does NOT yet work in v8) - SNMP trap output module</li> +<li><a href="omstdout.html">omtdout</a> (does NOT yet work in v8) - stdout output module (mainly a test tool)</li> +<li><a href="omrelp.html">omrelp</a> (does NOT yet work in v8) - RELP output module</li> +<li><a href="omruleset.html">omruleset</a> (does NOT yet work in v8) - forward message to another ruleset</li> +<li>omgssapi (does NOT yet work in v8) - output module for GSS-enabled syslog</li> +<li><a href="ommysql.html">ommysql</a> (does NOT yet work in v8) - output module for MySQL</li> +<li>ompgsql (does NOT yet work in v8) - output module for PostgreSQL</li> +<li><a href="omlibdbi.html">omlibdbi</a> (does NOT yet work in v8) - generic database output module (Firebird/Interbase, MS SQL, Sybase, SQLLite, Ingres, Oracle, mSQL)</li> -<li><a href="ommail.html">ommail</a> - +<li><a href="ommail.html">ommail</a> (does NOT yet work in v8) - permits rsyslog to alert folks by mail if something important happens</li> -<li><a href="omprog.html">omprog</a> - permits sending messages to a program for custom processing</li> -<li><a href="omoracle.html">omoracle</a> - output module for Oracle (native OCI interface)</li> -<li><a href="omudpspoof.html">omudpspoof</a> - output module sending UDP syslog messages with a spoofed address</li> -<li><a href="omuxsock.html">omuxsock</a> - output module Unix domain sockets</li> -<li><a href="omhdfs.html">omhdfs</a> - output module for Hadoop's HDFS file system</li> -<li><a href="ommongodb.html">ommongodb</a> - output module for MongoDB</li> +<li><a href="omprog.html">omprog</a> (does NOT yet work in v8) - permits sending messages to a program for custom processing</li> +<li><a href="omoracle.html">omoracle</a> (orphaned) - output module for Oracle (native OCI interface)</li> +<li><a href="omudpspoof.html">omudpspoof</a> (does NOT yet work in v8) - output module sending UDP syslog messages with a spoofed address</li> +<li><a href="omuxsock.html">omuxsock</a> (does NOT yet work in v8) - output module Unix domain sockets</li> +<li><a href="omhdfs.html">omhdfs</a> (does NOT yet work in v8) - output module for Hadoop's HDFS file system</li> +<li><a href="ommongodb.html">ommongodb</a> (does NOT yet work in v8) - output module for MongoDB</li> <li><a href="omelasticsearch.html">omelasticsearch</a> - output module for ElasticSearch</li> </ul> diff --git a/grammar/rainerscript.c b/grammar/rainerscript.c index 53839474..76b91940 100644 --- a/grammar/rainerscript.c +++ b/grammar/rainerscript.c @@ -2427,7 +2427,7 @@ cnfstmtPrintOnly(struct cnfstmt *stmt, int indent, sbool subtree) free(cstr); break; case S_ACT: - doIndent(indent); dbgprintf("ACTION %p [%s:%s]\n", stmt->d.act, + doIndent(indent); dbgprintf("ACTION %d [%s:%s]\n", stmt->d.act->iActionNbr, modGetName(stmt->d.act->pMod), stmt->printable); break; case S_IF: diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c index 694c07c4..a883ef1b 100644 --- a/plugins/impstats/impstats.c +++ b/plugins/impstats/impstats.c @@ -377,7 +377,6 @@ checkRuleset(modConfData_t *modConf) DEFiRet; modConf->pBindRuleset = NULL; /* assume default ruleset */ -dbgprintf("DDDD: impstats ruleset %s\n", modConf->pszBindRuleset); if(modConf->pszBindRuleset == NULL) FINALIZE; @@ -390,7 +389,6 @@ dbgprintf("DDDD: impstats ruleset %s\n", modConf->pszBindRuleset); CHKiRet(localRet); modConf->pBindRuleset = pRuleset; finalize_it: -dbgprintf("DDDD: impstats ruleset ptr %p\n", modConf->pBindRuleset); RETiRet; } diff --git a/plugins/mmanon/mmanon.c b/plugins/mmanon/mmanon.c index 16a4f34b..c714706a 100644 --- a/plugins/mmanon/mmanon.c +++ b/plugins/mmanon/mmanon.c @@ -71,6 +71,10 @@ typedef struct _instanceData { } ipv4; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -119,6 +123,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -130,6 +138,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + static inline void setInstParamDefaults(instanceData *pData) { @@ -354,6 +367,7 @@ BEGINdoAction int lenMsg; int i; CODESTARTdoAction + pData = pWrkrData->pData; pMsg = (msg_t*) ppString[0]; lenMsg = getMSGLen(pMsg); msg = getMSG(pMsg); @@ -387,6 +401,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/omelasticsearch/README b/plugins/omelasticsearch/README index 9021bc0e..b8bf4151 100644 --- a/plugins/omelasticsearch/README +++ b/plugins/omelasticsearch/README @@ -1,3 +1,7 @@ +How to access ElasticSearch on local machine (for testing): +=========================================================== +see: https://github.com/mobz/elasticsearch-head + How to produce an error: ======================== It's quite easy to get 400, if you put a wrong mapping to your diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index eb82c35f..b878050d 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -69,8 +69,8 @@ STATSCOUNTER_DEF(indexESFail, mutIndexESFail) typedef struct curl_slist HEADER; typedef struct _instanceData { int port; - int replyLen; int fdErrFile; /* error file fd or -1 if not open */ + pthread_mutex_t mutErrFile; uchar *server; uchar *uid; uchar *pwd; @@ -80,24 +80,29 @@ typedef struct _instanceData { uchar *tplName; uchar *timeout; uchar *bulkId; - uchar *restURL; /* last used URL for error reporting */ uchar *errorFile; - char *reply; sbool dynSrchIdx; sbool dynSrchType; sbool dynParent; sbool dynBulkId; sbool bulkmode; sbool asyncRepl; +} instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; + int replyLen; + char *reply; + CURL *curlHandle; /* libcurl session handle */ + HEADER *postHeader; /* json POST request info */ + uchar *restURL; /* last used URL for error reporting */ struct { es_str_t *data; int nmemb; /* number of messages in batch (for statistics counting) */ uchar *currTpl1; uchar *currTpl2; } batch; - CURL *curlHandle; /* libcurl session handle */ - HEADER *postHeader; /* json POST request info */ -} instanceData; +} wrkrInstanceData_t; /* tables for interfacing with the v6 config system */ @@ -117,7 +122,7 @@ static struct cnfparamdescr actpdescr[] = { { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, { "errorfile", eCmdHdlrGetWord, 0 }, - { "template", eCmdHdlrGetWord, 1 }, + { "template", eCmdHdlrGetWord, 0 }, { "dynbulkid", eCmdHdlrBinary, 0 }, { "bulkid", eCmdHdlrGetWord, 0 }, }; @@ -127,12 +132,32 @@ static struct cnfparamblk actpblk = actpdescr }; +static rsRetVal curlSetup(wrkrInstanceData_t *pWrkrData, instanceData *pData); + BEGINcreateInstance CODESTARTcreateInstance - pData->restURL = NULL; pData->fdErrFile = -1; + pthread_mutex_init(&pData->mutErrFile, NULL); ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +dbgprintf("omelasticsearch: createWrkrInstance\n"); + pWrkrData->restURL = NULL; + if(pData->bulkmode) { + pWrkrData->batch.currTpl1 = NULL; + pWrkrData->batch.currTpl2 = NULL; + if((pWrkrData->batch.data = es_newStr(1024)) == NULL) { + DBGPRINTF("omelasticsearch: error creating batch string " + "turned off bulk mode\n"); + pData->bulkmode = 0; /* at least it works */ + } + } + CHKiRet(curlSetup(pWrkrData, pWrkrData->pData)); +finalize_it: +dbgprintf("DDDD: createWrkrInstance,pData %p/%p, pWrkrData %p\n", pData, pWrkrData->pData, pWrkrData); +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -141,16 +166,9 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance - if (pData->postHeader) { - curl_slist_free_all(pData->postHeader); - pData->postHeader = NULL; - } - if (pData->curlHandle) { - curl_easy_cleanup(pData->curlHandle); - pData->curlHandle = NULL; - } if(pData->fdErrFile != -1) close(pData->fdErrFile); + pthread_mutex_destroy(&pData->mutErrFile); free(pData->server); free(pData->uid); free(pData->pwd); @@ -159,11 +177,23 @@ CODESTARTfreeInstance free(pData->parent); free(pData->tplName); free(pData->timeout); - free(pData->restURL); free(pData->errorFile); free(pData->bulkId); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + if(pWrkrData->postHeader) { + curl_slist_free_all(pWrkrData->postHeader); + pWrkrData->postHeader = NULL; + } + if(pWrkrData->curlHandle) { + curl_easy_cleanup(pWrkrData->curlHandle); + pWrkrData->curlHandle = NULL; + } + free(pWrkrData->restURL); +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo dbgprintf("omelasticsearch\n"); @@ -211,7 +241,7 @@ setBaseURL(instanceData *pData, es_str_t **url) static inline rsRetVal -checkConn(instanceData *pData) +checkConn(wrkrInstanceData_t *pWrkrData) { es_str_t *url; CURL *curl = NULL; @@ -219,7 +249,7 @@ checkConn(instanceData *pData) char *cstr; DEFiRet; - setBaseURL(pData, &url); + setBaseURL(pWrkrData->pData, &url); curl = curl_easy_init(); if(curl == NULL) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n"); @@ -235,16 +265,16 @@ checkConn(instanceData *pData) curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); - pData->reply = NULL; - pData->replyLen = 0; - curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); + pWrkrData->reply = NULL; + pWrkrData->replyLen = 0; + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pWrkrData); res = curl_easy_perform(curl); if(res != CURLE_OK) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " "failed: %s\n", curl_easy_strerror(res)); ABORT_FINALIZE(RS_RET_SUSPENDED); } - free(pData->reply); + free(pWrkrData->reply); DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); finalize_it: @@ -257,7 +287,7 @@ finalize_it: BEGINtryResume CODESTARTtryResume DBGPRINTF("omelasticsearch: tryResume called\n"); - iRet = checkConn(pData); + iRet = checkConn(pWrkrData); ENDtryResume @@ -330,7 +360,7 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, static rsRetVal -setCurlURL(instanceData *pData, uchar **tpls) +setCurlURL(wrkrInstanceData_t *pWrkrData, instanceData *pData, uchar **tpls) { char authBuf[1024]; uchar *searchIndex; @@ -368,11 +398,11 @@ setCurlURL(instanceData *pData, uchar **tpls) if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); } - free(pData->restURL); - pData->restURL = (uchar*)es_str2cstr(url, NULL); - curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); + free(pWrkrData->restURL); + pWrkrData->restURL = (uchar*)es_str2cstr(url, NULL); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_URL, pWrkrData->restURL); es_deleteStr(url); - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pWrkrData->restURL); if(pData->uid != NULL) { rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -383,8 +413,8 @@ setCurlURL(instanceData *pData, uchar **tpls) rLocal); ABORT_FINALIZE(RS_RET_ERR); } - curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); - curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_USERPWD, authBuf); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } finalize_it: RETiRet; @@ -396,7 +426,7 @@ finalize_it: * index changes. */ static rsRetVal -buildBatch(instanceData *pData, uchar *message, uchar **tpls) +buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls) { int length = strlen((char *)message); int r; @@ -411,29 +441,29 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) # define META_ID "\", \"_id\":\"" # define META_END "\"}}\n" - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); - r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, + getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId); + r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); - if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType, ustrlen(searchType)); if(parent != NULL) { - if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PARENT, sizeof(META_PARENT)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)parent, ustrlen(parent)); } if(bulkId != NULL) { - if(r == 0) r = es_addBuf(&pData->batch.data, META_ID, sizeof(META_ID)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)bulkId, ustrlen(bulkId)); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_ID, sizeof(META_ID)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)bulkId, ustrlen(bulkId)); } - if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); - if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)message, length); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, "\n", sizeof("\n")-1); if(r != 0) { DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); ABORT_FINALIZE(RS_RET_ERR); } - ++pData->batch.nmemb; + ++pWrkrData->batch.nmemb; iRet = RS_RET_DEFER_COMMIT; finalize_it: @@ -446,7 +476,7 @@ finalize_it: * needs to be closed, HUP must be sent. */ static inline rsRetVal -writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) +writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) { char *rendered = NULL; cJSON *errRoot; @@ -454,6 +484,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) cJSON *replyRoot = *pReplyRoot; size_t toWrite; ssize_t wrRet; + sbool bMutLocked = 0; char errStr[1024]; DEFiRet; @@ -463,6 +494,9 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) FINALIZE; } + pthread_mutex_lock(&pData->mutErrFile); + bMutLocked = 1; + if(pData->fdErrFile == -1) { pData->fdErrFile = open((char*)pData->errorFile, O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, @@ -474,7 +508,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) } } if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); - cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL)); + cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pWrkrData->restURL)); cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg)); if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); @@ -495,13 +529,15 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) *pReplyRoot = NULL; /* tell caller not to delete once again! */ finalize_it: + if(bMutLocked) + pthread_mutex_unlock(&pData->mutErrFile); free(rendered); RETiRet; } static inline rsRetVal -checkResultBulkmode(instanceData *pData, cJSON *root) +checkResultBulkmode(wrkrInstanceData_t *pWrkrData, cJSON *root) { int i; int numitems; @@ -515,7 +551,7 @@ checkResultBulkmode(instanceData *pData, cJSON *root) if(items == NULL || items->type != cJSON_Array) { DBGPRINTF("omelasticsearch: error in elasticsearch reply: " "bulkmode insert does not return array, reply is: %s\n", - pData->reply); + pWrkrData->reply); ABORT_FINALIZE(RS_RET_DATAFAIL); } numitems = cJSON_GetArraySize(items); @@ -547,20 +583,20 @@ finalize_it: static inline rsRetVal -checkResult(instanceData *pData, uchar *reqmsg) +checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg) { cJSON *root; cJSON *ok; DEFiRet; - root = cJSON_Parse(pData->reply); + root = cJSON_Parse(pWrkrData->reply); if(root == NULL) { DBGPRINTF("omelasticsearch: could not parse JSON result \n"); ABORT_FINALIZE(RS_RET_ERR); } - if(pData->bulkmode) { - iRet = checkResultBulkmode(pData, root); + if(pWrkrData->pData->bulkmode) { + iRet = checkResultBulkmode(pWrkrData, root); } else { ok = cJSON_GetObjectItem(root, "ok"); if(ok == NULL || ok->type != cJSON_True) { @@ -572,7 +608,7 @@ checkResult(instanceData *pData, uchar *reqmsg) * these in any case. */ if(iRet == RS_RET_DATAFAIL) { - writeDataError(pData, &root, reqmsg); + writeDataError(pWrkrData, pWrkrData->pData, &root, reqmsg); iRet = RS_RET_OK; /* we have handled the problem! */ } @@ -587,19 +623,19 @@ finalize_it: static rsRetVal -curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsgs) +curlPost(wrkrInstanceData_t *pWrkrData, uchar *message, int msglen, uchar **tpls, int nmsgs) { CURLcode code; - CURL *curl = pData->curlHandle; + CURL *curl = pWrkrData->curlHandle; DEFiRet; - pData->reply = NULL; - pData->replyLen = 0; + pWrkrData->reply = NULL; + pWrkrData->replyLen = 0; - if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent) - CHKiRet(setCurlURL(pData, tpls)); + if(pWrkrData->pData->dynSrchIdx || pWrkrData->pData->dynSrchType || pWrkrData->pData->dynParent) + CHKiRet(setCurlURL(pWrkrData, pWrkrData->pData, tpls)); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pWrkrData); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); @@ -618,27 +654,27 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsg break; } - DBGPRINTF("omelasticsearch: pData replyLen = '%d'\n", pData->replyLen); - if (pData->replyLen > 0) { - pData->reply[pData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */ + DBGPRINTF("omelasticsearch: pWrkrData replyLen = '%d'\n", pWrkrData->replyLen); + if(pWrkrData->replyLen > 0) { + pWrkrData->reply[pWrkrData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */ } - DBGPRINTF("omelasticsearch: pData reply: '%s'\n", pData->reply); + DBGPRINTF("omelasticsearch: pWrkrData reply: '%s'\n", pWrkrData->reply); - CHKiRet(checkResult(pData, message)); + CHKiRet(checkResult(pWrkrData, message)); finalize_it: - free(pData->reply); + free(pWrkrData->reply); RETiRet; } BEGINbeginTransaction CODESTARTbeginTransaction -dbgprintf("omelasticsearch: beginTransaction\n"); - if(!pData->bulkmode) { +dbgprintf("omelasticsearch: beginTransaction, pWrkrData %p, pData %p\n", pWrkrData, pWrkrData->pData); + if(!pWrkrData->pData->bulkmode) { FINALIZE; } - es_emptyStr(pData->batch.data); - pData->batch.nmemb = 0; + es_emptyStr(pWrkrData->batch.data); + pWrkrData->batch.nmemb = 0; finalize_it: ENDbeginTransaction @@ -646,14 +682,14 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); - if(pData->bulkmode) { - CHKiRet(buildBatch(pData, ppString[0], ppString)); + if(pWrkrData->pData->bulkmode) { + CHKiRet(buildBatch(pWrkrData, ppString[0], ppString)); } else { - CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), + CHKiRet(curlPost(pWrkrData, ppString[0], strlen((char*)ppString[0]), ppString, 1)); } finalize_it: -dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); +dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pWrkrData->pData->bulkmode); ENDdoAction @@ -662,13 +698,13 @@ BEGINendTransaction CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); /* End Transaction only if batch data is not empty */ - if (pData->batch.data != NULL ) { - cstr = es_str2cstr(pData->batch.data, NULL); + if (pWrkrData->batch.data != NULL ) { + cstr = es_str2cstr(pWrkrData->batch.data, NULL); dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, pData->batch.nmemb)); + CHKiRet(curlPost(pWrkrData, (uchar*) cstr, strlen(cstr), NULL, pWrkrData->batch.nmemb)); } else - dbgprintf("omelasticsearch: endTransaction, pData->batch.data is NULL, nothing to send. \n"); + dbgprintf("omelasticsearch: endTransaction, pWrkrData->batch.data is NULL, nothing to send. \n"); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -679,24 +715,24 @@ size_t curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) { char *p = (char *)ptr; - instanceData *pData = (instanceData*) userdata; + wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t*) userdata; char *buf; size_t newlen; - newlen = pData->replyLen + size*nmemb; - if((buf = realloc(pData->reply, newlen + 1)) == NULL) { + newlen = pWrkrData->replyLen + size*nmemb; + if((buf = realloc(pWrkrData->reply, newlen + 1)) == NULL) { DBGPRINTF("omelasticsearch: realloc failed in curlResult\n"); return 0; /* abort due to failure */ } - memcpy(buf+pData->replyLen, p, size*nmemb); - pData->replyLen = newlen; - pData->reply = buf; + memcpy(buf+pWrkrData->replyLen, p, size*nmemb); + pWrkrData->replyLen = newlen; + pWrkrData->reply = buf; return size*nmemb; } static rsRetVal -curlSetup(instanceData *pData) +curlSetup(wrkrInstanceData_t *pWrkrData, instanceData *pData) { HEADER *header; CURL *handle; @@ -712,13 +748,13 @@ curlSetup(instanceData *pData) curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); curl_easy_setopt(handle, CURLOPT_POST, 1); - pData->curlHandle = handle; - pData->postHeader = header; + pWrkrData->curlHandle = handle; + pWrkrData->postHeader = header; if( pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) { /* in this case, we know no tpls are involved in the request-->NULL OK! */ - setCurlURL(pData, NULL); + setCurlURL(pWrkrData, pData, NULL); } if(Debug) { @@ -838,16 +874,6 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } - if(pData->bulkmode) { - pData->batch.currTpl1 = NULL; - pData->batch.currTpl2 = NULL; - if((pData->batch.data = es_newStr(1024)) == NULL) { - DBGPRINTF("omelasticsearch: error creating batch string " - "turned off bulk mode\n"); - pData->bulkmode = 0; /* at least it works */ - } - } - iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; @@ -939,9 +965,6 @@ CODESTARTnewActInst pData->searchIndex = (uchar*) strdup("system"); if(pData->searchType == NULL) pData->searchType = (uchar*) strdup("events"); - - CHKiRet(curlSetup(pData)); - CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst @@ -979,6 +1002,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_doHUP diff --git a/plugins/omruleset/omruleset.c b/plugins/omruleset/omruleset.c index 11765507..2908095e 100644 --- a/plugins/omruleset/omruleset.c +++ b/plugins/omruleset/omruleset.c @@ -70,6 +70,10 @@ typedef struct _instanceData { uchar *pszRulesetName; /* primarily for debugging/display purposes */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { ruleset_t *pRuleset; /* ruleset to enqueue message to (NULL = Default, not recommended) */ uchar *pszRulesetName; diff --git a/plugins/omtesting/omtesting.c b/plugins/omtesting/omtesting.c index c9f1e06b..ffb1ff8b 100644 --- a/plugins/omtesting/omtesting.c +++ b/plugins/omtesting/omtesting.c @@ -63,7 +63,6 @@ MODULE_CNFNAME("omtesting") */ DEF_OMOD_STATIC_DATA - typedef struct _instanceData { enum { MD_SLEEP, MD_FAIL, MD_RANDFAIL, MD_ALWAYS_SUSPEND } mode; @@ -76,6 +75,10 @@ typedef struct _instanceData { int iCurrRetries; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { int bEchoStdout; /* echo non-failed messages to stdout */ } configSettings_t; @@ -170,11 +173,11 @@ static rsRetVal doRandFail(void) BEGINtryResume CODESTARTtryResume dbgprintf("omtesting tryResume() called\n"); - switch(pData->mode) { + switch(pWrkrData->pData->mode) { case MD_SLEEP: break; case MD_FAIL: - iRet = doFailOnResume(pData); + iRet = doFailOnResume(pWrkrData->pData); break; case MD_RANDFAIL: iRet = doRandFail(); diff --git a/runtime/module-template.h b/runtime/module-template.h index 8a958f90..c3e79570 100644 --- a/runtime/module-template.h +++ b/runtime/module-template.h @@ -175,6 +175,44 @@ static rsRetVal freeInstance(void* pModData)\ RETiRet;\ } +/* createWrkrInstance() + */ +#define BEGINcreateWrkrInstance \ +static rsRetVal createWrkrInstance(wrkrInstanceData_t **ppWrkrData, instanceData *pData)\ + {\ + DEFiRet; /* store error code here */\ + wrkrInstanceData_t *pWrkrData; /* use this to point to data elements */ + +#define CODESTARTcreateWrkrInstance \ + if((pWrkrData = calloc(1, sizeof(wrkrInstanceData_t))) == NULL) {\ + *ppWrkrData = NULL;\ + ENDfunc \ + return RS_RET_OUT_OF_MEMORY;\ + } \ + pWrkrData->pData = pData; + +#define ENDcreateWrkrInstance \ + *ppWrkrData = pWrkrData;\ + RETiRet;\ +} + +/* freeWrkrInstance */ +#define BEGINfreeWrkrInstance \ +static rsRetVal freeWrkrInstance(void* pd)\ +{\ + DEFiRet;\ + wrkrInstanceData_t *pWrkrData; + +#define CODESTARTfreeWrkrInstance \ + pWrkrData = (wrkrInstanceData_t*) pd; + +#define ENDfreeWrkrInstance \ + if(pWrkrData != NULL)\ + free(pWrkrData); /* we need to free this in any case */\ + RETiRet;\ +} + + /* isCompatibleWithFeature() */ #define BEGINisCompatibleWithFeature \ @@ -194,7 +232,7 @@ static rsRetVal isCompatibleWithFeature(syslogFeature __attribute__((unused)) eF * introduced in v4.3.3 -- rgerhards, 2009-04-27 */ #define BEGINbeginTransaction \ -static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\ +static rsRetVal beginTransaction(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ DEFiRet; @@ -209,8 +247,9 @@ static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\ * introduced in v4.3.3 -- rgerhards, 2009-04-27 */ #define BEGINendTransaction \ -static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\ +static rsRetVal endTransaction(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ + instanceData *pData = NULL; /* deliberately make module abort if it does not support new IF */\ DEFiRet; #define CODESTARTendTransaction /* currently empty, but may be extended */ @@ -223,8 +262,9 @@ static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\ /* doAction() */ #define BEGINdoAction \ -static rsRetVal doAction(uchar __attribute__((unused)) **ppString, unsigned __attribute__((unused)) iMsgOpts, instanceData __attribute__((unused)) *pData)\ +static rsRetVal doAction(uchar __attribute__((unused)) **ppString, unsigned __attribute__((unused)) iMsgOpts, wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ + instanceData *pData = NULL; /* deliberately make module abort if it does not support new IF */\ DEFiRet; #define CODESTARTdoAction \ @@ -382,12 +422,12 @@ static rsRetVal newInpInst(struct nvlst *lst)\ * rgerhard, 2007-08-02 */ #define BEGINtryResume \ -static rsRetVal tryResume(instanceData __attribute__((unused)) *pData)\ +static rsRetVal tryResume(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ DEFiRet; #define CODESTARTtryResume \ - assert(pData != NULL); + assert(pWrkrData != NULL); #define ENDtryResume \ RETiRet;\ @@ -467,6 +507,13 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\ *pEtryPoint = tryResume;\ } +/* standard queries for output module interface in rsyslog v8+ */ +#define CODEqueryEtryPt_STD_OMOD8_QUERIES \ + else if(!strcmp((char*) name, "createWrkrInstance")) {\ + *pEtryPoint = createWrkrInstance;\ + } else if(!strcmp((char*) name, "freeWrkrInstance")) {\ + *pEtryPoint = freeWrkrInstance;\ + } /* the following definition is queryEtryPt block that must be added * if an output module supports the transactional interface. diff --git a/runtime/modules.c b/runtime/modules.c index 56606306..52096082 100644 --- a/runtime/modules.c +++ b/runtime/modules.c @@ -652,6 +652,9 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_ CHKiRet((*pNew->modQueryEtryPt)((uchar*)"doAction", &pNew->mod.om.doAction)); CHKiRet((*pNew->modQueryEtryPt)((uchar*)"parseSelectorAct", &pNew->mod.om.parseSelectorAct)); CHKiRet((*pNew->modQueryEtryPt)((uchar*)"tryResume", &pNew->tryResume)); + CHKiRet((*pNew->modQueryEtryPt)((uchar*)"createWrkrInstance", &pNew->mod.om.createWrkrInstance)); + CHKiRet((*pNew->modQueryEtryPt)((uchar*)"freeWrkrInstance", &pNew->mod.om.freeWrkrInstance)); + /* try load optional interfaces */ localRet = (*pNew->modQueryEtryPt)((uchar*)"doHUP", &pNew->doHUP); if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) diff --git a/runtime/modules.h b/runtime/modules.h index 23df22d6..a0637ac9 100644 --- a/runtime/modules.h +++ b/runtime/modules.h @@ -139,6 +139,8 @@ struct modInfo_s { rsRetVal (*parseSelectorAct)(uchar**, void**,omodStringRequest_t**); rsRetVal (*newActInst)(uchar *modName, struct nvlst *lst, void **, omodStringRequest_t **); rsRetVal (*SetShutdownImmdtPtr)(void *pData, void *pPtr); + rsRetVal (*createWrkrInstance)(void*ppWrkrData, void*pData); + rsRetVal (*freeWrkrInstance)(void*pWrkrData); } om; struct { /* data for library modules */ char dummy; diff --git a/runtime/queue.c b/runtime/queue.c index 66cb7218..968c016e 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -81,8 +81,8 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); +static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti); +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); @@ -959,7 +959,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) { batch_t singleBatch; batch_obj_t batchObj; @@ -986,7 +986,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) singleBatch.pElem = &batchObj; singleBatch.eltState = &batchState; singleBatch.active = &active; - iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate); + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate); /* delete the batch string params: TODO: create its own "class" for this */ for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { free(batchObj.staticActStrings[i]); @@ -999,7 +999,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) /* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead * otherwise incured. -- rgerhards, ~2010-06-23 */ -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) +rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti) { DEFiRet; @@ -1013,7 +1013,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL); + iRet = pThis->pConsumer(pThis->pAction, pBatch, pWti, NULL); RETiRet; } @@ -1319,7 +1319,7 @@ finalize_it: * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*, int*)) { DEFiRet; qqueue_t *pThis; @@ -1878,7 +1878,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); - CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate)); + CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -2675,7 +2675,7 @@ finalize_it: /* now, the same function, but for direct mode */ static rsRetVal -qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) +qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti) { int i; DEFiRet; @@ -2684,7 +2684,7 @@ qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) assert(pMultiSub != NULL); for(i = 0 ; i < pMultiSub->nElem ; ++i) { - CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i])); + CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i], pWti)); } finalize_it: @@ -2699,16 +2699,16 @@ finalize_it: * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg) +qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pMsg); + iRet = qAddDirect(pThis, pMsg, pWti); RETiRet; } -/* enqueue a new user data element +/* enqueue a new user data element * Enqueues the new element and awakes worker thread. */ rsRetVal diff --git a/runtime/queue.h b/runtime/queue.h index 844523ad..91900b30 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -103,7 +103,7 @@ struct queue_s { * the user really wanted...). -- rgerhards, 2008-04-02 */ /* end dequeue time window */ - rsRetVal (*pConsumer)(void *,batch_t*,int*); /* user-supplied consumer function for dequeued messages */ + rsRetVal (*pConsumer)(void *,batch_t*, wti_t*,int*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero @@ -195,14 +195,14 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); -rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg); +rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti); rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *, int*)); +rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti); int queueCnfParamsSet(struct nvlst *lst); rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst); void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis); diff --git a/runtime/rsconf.c b/runtime/rsconf.c index 8c808786..abce53b8 100644 --- a/runtime/rsconf.c +++ b/runtime/rsconf.c @@ -1205,6 +1205,7 @@ ourConf = loadConf; // TODO: remove, once ourConf is gone! ABORT_FINALIZE(RS_RET_NO_ACTIONS); } tellLexEndParsing(); + DBGPRINTF("Number of actions in this configuration: %d\n", iActionNbr); rulesetOptimizeAll(loadConf); tellCoreConfigLoadDone(); diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 57cf6afe..db253d28 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -67,8 +67,8 @@ static struct cnfparamblk rspblk = }; /* forward definitions */ -static rsRetVal processBatch(batch_t *pBatch); -static rsRetVal scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active); +static rsRetVal processBatch(batch_t *pBatch, wti_t *pWti); +static rsRetVal scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti); /* ---------- linked-list key handling functions (ruleset) ---------- */ @@ -168,7 +168,7 @@ finalize_it: * rgerhards, 2010-06-15 */ static inline rsRetVal -processBatchMultiRuleset(batch_t *pBatch) +processBatchMultiRuleset(batch_t *pBatch, wti_t *pWti) { ruleset_t *currRuleset; batch_t snglRuleBatch; @@ -206,7 +206,7 @@ processBatchMultiRuleset(batch_t *pBatch) snglRuleBatch.nElem = iNew; /* was left just right by the for loop */ batchSetSingleRuleset(&snglRuleBatch, 1); /* process temp batch */ - processBatch(&snglRuleBatch); + processBatch(&snglRuleBatch, pWti); batchFree(&snglRuleBatch); } while(bHaveUnprocessed == 1); @@ -226,12 +226,12 @@ static inline void freeActive(sbool *active) { free(active); } /* for details, see scriptExec() header comment! */ /* call action for all messages with filter on */ static rsRetVal -execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { DEFiRet; dbgprintf("RRRR: execAct [%s]: batch of %d elements, active %p\n", modGetName(stmt->d.act->pMod), batchNumMsgs(pBatch), active); pBatch->active = active; - stmt->d.act->submitToActQ(stmt->d.act, pBatch); + stmt->d.act->submitToActQ(stmt->d.act, pBatch, pWti); RETiRet; } @@ -292,7 +292,7 @@ execStop(batch_t *pBatch, sbool *active) // set new filter, inverted // perform else (if any messages) static rsRetVal -execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { sbool *newAct; int i; @@ -321,7 +321,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) } if(stmt->d.s_if.t_then != NULL) { - scriptExec(stmt->d.s_if.t_then, pBatch, newAct); + scriptExec(stmt->d.s_if.t_then, pBatch, newAct, pWti); } if(stmt->d.s_if.t_else != NULL) { for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { @@ -331,7 +331,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) && (active == NULL || active[i])) newAct[i] = !newAct[i]; } - scriptExec(stmt->d.s_if.t_else, pBatch, newAct); + scriptExec(stmt->d.s_if.t_else, pBatch, newAct, pWti); } freeActive(newAct); finalize_it: @@ -340,7 +340,7 @@ finalize_it: /* for details, see scriptExec() header comment! */ static void -execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { sbool *newAct; msg_t *pMsg; @@ -367,7 +367,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) } if(stmt->d.s_prifilt.t_then != NULL) { - scriptExec(stmt->d.s_prifilt.t_then, pBatch, newAct); + scriptExec(stmt->d.s_prifilt.t_then, pBatch, newAct, pWti); } if(stmt->d.s_prifilt.t_else != NULL) { for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { @@ -377,7 +377,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) && (active == NULL || active[i])) newAct[i] = !newAct[i]; } - scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct); + scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct, pWti); } freeActive(newAct); } @@ -476,7 +476,7 @@ done: /* for details, see scriptExec() header comment! */ static void -execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { sbool *thenAct; sbool bRet; @@ -495,7 +495,7 @@ execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) DBGPRINTF("batch: item %d PROPFILT %d\n", i, thenAct[i]); } - scriptExec(stmt->d.s_propfilt.t_then, pBatch, thenAct); + scriptExec(stmt->d.s_propfilt.t_then, pBatch, thenAct, pWti); freeActive(thenAct); } @@ -510,7 +510,7 @@ execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) * rgerhards, 2012-09-04 */ static rsRetVal -scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active) +scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti) { DEFiRet; struct cnfstmt *stmt; @@ -528,7 +528,7 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active) execStop(pBatch, active); break; case S_ACT: - execAct(stmt, pBatch, active); + execAct(stmt, pBatch, active, pWti); break; case S_SET: execSet(stmt, pBatch, active); @@ -537,16 +537,16 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active) execUnset(stmt, pBatch, active); break; case S_CALL: - scriptExec(stmt->d.s_call.stmt, pBatch, active); + scriptExec(stmt->d.s_call.stmt, pBatch, active, pWti); break; case S_IF: - execIf(stmt, pBatch, active); + execIf(stmt, pBatch, active, pWti); break; case S_PRIFILT: - execPRIFILT(stmt, pBatch, active); + execPRIFILT(stmt, pBatch, active, pWti); break; case S_PROPFILT: - execPROPFILT(stmt, pBatch, active); + execPROPFILT(stmt, pBatch, active, pWti); break; default: dbgprintf("error: unknown stmt type %u during exec\n", @@ -565,7 +565,7 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active) * rgerhards, 2005-10-13 */ static rsRetVal -processBatch(batch_t *pBatch) +processBatch(batch_t *pBatch, wti_t *pWti) { ruleset_t *pThis; DEFiRet; @@ -577,9 +577,9 @@ processBatch(batch_t *pBatch) if(pThis == NULL) pThis = ourConf->rulesets.pDflt; ISOBJ_TYPE_assert(pThis, ruleset); - CHKiRet(scriptExec(pThis->root, pBatch, NULL)); + CHKiRet(scriptExec(pThis->root, pBatch, NULL, pWti)); } else { - CHKiRet(processBatchMultiRuleset(pBatch)); + CHKiRet(processBatchMultiRuleset(pBatch, pWti)); } finalize_it: diff --git a/runtime/ruleset.h b/runtime/ruleset.h index 8bfd4920..05da144d 100644 --- a/runtime/ruleset.h +++ b/runtime/ruleset.h @@ -46,7 +46,7 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Destruct)(ruleset_t **ppThis); rsRetVal (*DestructAllActions)(rsconf_t *conf); rsRetVal (*SetName)(ruleset_t *pThis, uchar *pszName); - rsRetVal (*ProcessBatch)(batch_t*); + rsRetVal (*ProcessBatch)(batch_t*, wti_t *); rsRetVal (*GetRuleset)(rsconf_t *conf, ruleset_t **ppThis, uchar*); rsRetVal (*SetDefaultRuleset)(rsconf_t *conf, uchar*); rsRetVal (*SetCurrRuleset)(rsconf_t *conf, uchar*); @@ -64,8 +64,9 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */ /* AddRule() removed */ /*TODO:REMOVE*/rsRetVal (*IterateAllActions)(rsconf_t *conf, rsRetVal (*pFunc)(void*, void*), void* pParam); void (*AddScript)(ruleset_t *pThis, struct cnfstmt *script); + /* v8: changed processBatch interface */ ENDinterface(ruleset) -#define rulesetCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */ +#define rulesetCURR_IF_VERSION 8 /* increment whenever you change the interface structure! */ /* prototypes */ diff --git a/runtime/wti.c b/runtime/wti.c index f91fb5a9..df77bc19 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -44,6 +44,7 @@ #include "wti.h" #include "obj.h" #include "glbl.h" +#include "action.h" #include "atomic.h" /* static data */ @@ -171,6 +172,7 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE CODESTARTobjDestruct(wti) /* actual destruction */ batchFree(&pThis->batch); + free(pThis->actWrkrInfo); DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning); free(pThis->pszDbgHdr); @@ -195,11 +197,15 @@ wtiConstructFinalize(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); - DBGPRINTF("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker instance data (for %d actions)\n", + wtiGetDbgHdr(pThis), iActionNbr); /* initialize our thread instance descriptor (no concurrency here) */ pThis->bIsRunning = RSFALSE; + /* must use calloc as we need zero-init */ + CHKmalloc(pThis->actWrkrInfo = calloc(iActionNbr, sizeof(actWrkrInfo_t))); + /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); CHKiRet(batchInit(&pThis->batch, iDeqBatchSize)); @@ -277,6 +283,7 @@ wtiWorker(wti_t *pThis) rsRetVal localRet; rsRetVal terminateRet; int iCancelStateSave; + int i; DEFiRet; ISOBJ_TYPE_assert(pThis, wti); @@ -286,6 +293,7 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); +dbgprintf("DDDD: wti %p: worker starting\n", pThis); /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ @@ -332,9 +340,19 @@ wtiWorker(wti_t *pThis) bInactivityTOOccured = 0; /* reset for next run */ } + DBGPRINTF("DDDD: wti %p: worker cleanup up action instances\n", pThis); + for(i = 0 ; i < iActionNbr ; ++i) { + dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, pThis->actWrkrInfo[i].actWrkrData); + if(pThis->actWrkrInfo[i].actWrkrData != NULL) { + dbgprintf("DDDD: calling freeWrkrData!\n"); + pThis->actWrkrInfo[i].pAction->pMod->mod.om.freeWrkrInstance(pThis->actWrkrInfo[i].actWrkrData); + } + } + /* indicate termination */ pthread_cleanup_pop(0); /* remove cleanup handler */ pthread_setcancelstate(iCancelStateSave, NULL); +dbgprintf("DDDD: wti %p: worker exiting\n", pThis); RETiRet; } diff --git a/runtime/wti.h b/runtime/wti.h index 014251f0..bb4f56bc 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -28,6 +28,11 @@ #include "batch.h" +typedef struct actWrkrInfo { + action_t *pAction; + void *actWrkrData; +} actWrkrInfo_t; + /* the worker thread instance class */ struct wti_s { BEGINobjInstance; @@ -37,6 +42,7 @@ struct wti_s { wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ + actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions (sized for max nbr of actions in config!) */ DEF_ATOMIC_HELPER_MUT(mutIsRunning); }; diff --git a/runtime/wtp.c b/runtime/wtp.c index 19151e7c..1b960eb9 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -8,7 +8,7 @@ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it * if you are getting aquainted to the object. * - * Copyright 2008,2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -91,6 +91,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pthread_cond_init(&pThis->condThrdTrm, NULL); pthread_attr_init(&pThis->attrThrd); /* Set thread scheduling policy to default */ +#warning do we need this any longer? I think it was a cure for an already fixed bug.. #ifdef HAVE_PTHREAD_SETSCHEDPARAM pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy); pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param); @@ -121,7 +122,8 @@ wtpConstructFinalize(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - DBGPRINTF("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker thread pool (numworkerThreads %d)\n", + wtpGetDbgHdr(pThis), pThis->iNumWorkerThreads); /* alloc and construct workers - this can only be done in finalizer as we previously do * not know the max number of workers */ diff --git a/tools/omdiscard.c b/tools/omdiscard.c index 15c6ea82..a76bcc33 100644 --- a/tools/omdiscard.c +++ b/tools/omdiscard.c @@ -6,7 +6,7 @@ * * File begun on 2007-07-24 by RGerhards * - * Copyright 2007-2012 Adiscon GmbH. + * Copyright 2007-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -49,6 +49,10 @@ typedef struct _instanceData { EMPTY_STRUCT } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* we do not need a createInstance()! BEGINcreateInstance CODESTARTcreateInstance @@ -56,6 +60,11 @@ ENDcreateInstance */ +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo /* do nothing */ @@ -87,6 +96,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINparseSelectorAct CODESTARTparseSelectorAct CODE_STD_STRING_REQUESTparseSelectorAct(0) @@ -114,6 +128,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES ENDqueryEtryPt diff --git a/tools/omfile.c b/tools/omfile.c index fdcf355a..e8cd24d3 100644 --- a/tools/omfile.c +++ b/tools/omfile.c @@ -133,6 +133,7 @@ typedef struct s_dynaFileCacheEntry dynaFileCacheEntry; typedef struct _instanceData { + pthread_mutex_t mutWrite; /* guard against multiple instances writing to single file */ uchar *f_fname; /* file or template name (display only) */ uchar *tplName; /* name of assigned template */ strm_t *pStrm; /* our output stream */ @@ -181,6 +182,20 @@ typedef struct _instanceData { STATSCOUNTER_DEF(ctrMax, mutCtrMax); } instanceData; +/* to build a linked list for temporary storage of lines while we cannot commit */ +typedef struct linebuf { + uchar *filename; /* for dynafiles, make go away */ + uchar *ln; + unsigned iMsgOpts; + struct linebuf *pNext; +} linebuf_t; + +typedef struct wrkrInstanceData { + instanceData *pData; + linebuf_t *pRoot; + linebuf_t *pLast; +} wrkrInstanceData_t; + typedef struct configSettings_s { int iDynaFileCacheSize; /* max cache for dynamic files */ @@ -786,7 +801,7 @@ finalize_it: /* rgerhards 2004-11-11: write to a file output. */ static rsRetVal -writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) +writeFile(instanceData *pData, linebuf_t *linebuf) { DEFiRet; @@ -796,7 +811,7 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) * check if it still is ok or a new file needs to be created */ if(pData->bDynamicName) { - CHKiRet(prepareDynFile(pData, ppString[1], iMsgOpts)); + CHKiRet(prepareDynFile(pData, linebuf->filename, linebuf->iMsgOpts)); } else { /* "regular", non-dynafile */ if(pData->pStrm == NULL) { CHKiRet(prepareFile(pData, pData->f_fname)); @@ -806,7 +821,7 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) } } - CHKiRet(doWrite(pData, ppString[0], strlen(CHAR_CONVERT(ppString[0])))); + CHKiRet(doWrite(pData, linebuf->ln, ustrlen(linebuf->ln))); finalize_it: RETiRet; @@ -888,9 +903,15 @@ ENDfreeCnf BEGINcreateInstance CODESTARTcreateInstance pData->pStrm = NULL; + pthread_mutex_init(&pData->mutWrite, NULL); ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINfreeInstance CODESTARTfreeInstance free(pData->tplName); @@ -913,9 +934,15 @@ CODESTARTfreeInstance free(pData->cryprovName); free(pData->cryprovNameFull); } + pthread_mutex_destroy(&pData->mutWrite); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINtryResume CODESTARTtryResume ENDtryResume @@ -926,8 +953,66 @@ CODESTARTbeginTransaction ENDbeginTransaction +static rsRetVal +bufferLine(wrkrInstanceData_t *pWrkrData, uchar *filename, uchar *line) +{ + linebuf_t *lb; + DEFiRet; + + CHKmalloc(lb = (linebuf_t*) malloc(sizeof(linebuf_t))); + CHKmalloc(lb->filename = ustrdup(filename)); + CHKmalloc(lb->ln = ustrdup(line)); + lb->pNext = NULL; + if(pWrkrData->pRoot == NULL) { + pWrkrData->pRoot = pWrkrData->pLast = lb; + } else { + pWrkrData->pLast->pNext = lb; + pWrkrData->pLast = lb; + } +finalize_it: + RETiRet; +} + +static void +submitCachedLines(wrkrInstanceData_t *pWrkrData, instanceData *pData) +{ + linebuf_t *curr, *todel; + +dbgprintf("omfile: waiting on write lock (pWrkrData %p)\n", pWrkrData); + pthread_mutex_lock(&pData->mutWrite); +dbgprintf("omfile: aquired write lock (pWrkrData %p)\n", pWrkrData); + + for(curr = pWrkrData->pRoot ; curr != NULL ; ) { + DBGPRINTF("omfile: file to log to: %s\n", curr->filename); + DBGPRINTF("omfile: start of data: '%.128s'\n", curr->ln); + STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests); + writeFile(pData, curr); + + todel = curr; + curr = curr->pNext; + free(todel->filename); + free(todel->ln); + free(todel); + } + pthread_mutex_unlock(&pData->mutWrite); +dbgprintf("omfile: free write lock (pWrkrData %p)\n", pWrkrData); + pWrkrData->pRoot = NULL; +} + + +BEGINdoAction +CODESTARTdoAction + pData = pWrkrData->pData; + iRet = bufferLine(pWrkrData, (pData->bDynamicName) ? ppString[1] : pData->f_fname, + ppString[0]); + if(iRet == RS_RET_OK) + iRet = RS_RET_DEFER_COMMIT; +ENDdoAction + BEGINendTransaction CODESTARTendTransaction + pData = pWrkrData->pData; + submitCachedLines(pWrkrData, pData); /* Note: pStrm may be NULL if there was an error opening the stream */ if(pData->bFlushOnTXEnd && pData->pStrm != NULL) { /* if we have an async writer, it controls the flush via @@ -941,21 +1026,6 @@ finalize_it: ENDendTransaction -BEGINdoAction -CODESTARTdoAction - DBGPRINTF("file to log to: %s\n", - (pData->bDynamicName) ? ppString[1] : pData->f_fname); - DBGPRINTF("omfile: start of data: '%.128s'\n", ppString[0]); - STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests); - CHKiRet(writeFile(ppString, iMsgOpts, pData)); - if(!bCoreSupportsBatching && pData->bFlushOnTXEnd) { - CHKiRet(strm.Flush(pData->pStrm)); - } -finalize_it: - if(iRet == RS_RET_OK) - iRet = RS_RET_DEFER_COMMIT; -ENDdoAction - static inline void setInstParamDefaults(instanceData *pData) @@ -1336,6 +1406,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a BEGINdoHUP CODESTARTdoHUP + pthread_mutex_lock(&pData->mutWrite); if(pData->bDynamicName) { dynaFileFreeCacheEntries(pData); } else { @@ -1343,6 +1414,7 @@ CODESTARTdoHUP closeFile(pData); } } + pthread_mutex_unlock(&pData->mutWrite); ENDdoHUP @@ -1358,6 +1430,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES diff --git a/tools/omfwd.c b/tools/omfwd.c index 6e5cf809..a7ba4d01 100644 --- a/tools/omfwd.c +++ b/tools/omfwd.c @@ -112,6 +112,10 @@ typedef struct _instanceData { int errsToReport; /* (remaining) number of errors to report */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* config data */ typedef struct configSettings_s { uchar *pszTplName; /* name of the default template to use */ @@ -335,6 +339,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -360,6 +369,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo dbgprintf("%s", pData->target); @@ -720,7 +734,7 @@ finalize_it: BEGINtryResume CODESTARTtryResume - iRet = doTryResume(pData); + iRet = doTryResume(pWrkrData->pData); ENDtryResume @@ -1251,6 +1265,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES diff --git a/tools/ompipe.c b/tools/ompipe.c index df8066b1..8d886509 100644 --- a/tools/ompipe.c +++ b/tools/ompipe.c @@ -12,7 +12,7 @@ * NOTE: read comments in module-template.h to understand how this pipe * works! * - * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -69,9 +69,14 @@ typedef struct _instanceData { uchar *pipe; /* pipe or template name (display only) */ uchar *tplName; /* format template to use */ short fd; /* pipe descriptor for (current) pipe */ + pthread_mutex_t mutWrite; /* guard against multiple instances writing to same pipe */ sbool bHadError; /* did we already have/report an error on this pipe? */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { EMPTY_STRUCT } configSettings_t; @@ -276,17 +281,29 @@ CODESTARTcreateInstance pData->pipe = NULL; pData->fd = -1; pData->bHadError = 0; + pthread_mutex_init(&pData->mutWrite, NULL); ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINfreeInstance CODESTARTfreeInstance + pthread_mutex_destroy(&pData->mutWrite); free(pData->pipe); if(pData->fd != -1) close(pData->fd); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINtryResume CODESTARTtryResume ENDtryResume @@ -294,7 +311,10 @@ ENDtryResume BEGINdoAction CODESTARTdoAction DBGPRINTF(" (%s)\n", pData->pipe); + /* this module is single-threaded by nature */ + pthread_mutex_lock(&pData->mutWrite); iRet = writePipe(ppString, pData); + pthread_mutex_unlock(&pData->mutWrite); ENDdoAction @@ -390,6 +410,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_doHUP CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES diff --git a/tools/omshell.c b/tools/omshell.c index ac62fa62..a8ba1564 100644 --- a/tools/omshell.c +++ b/tools/omshell.c @@ -19,7 +19,7 @@ * of the "old" message code without any modifications. However, it * helps to have things at the right place one we go to the meat of it. * - * Copyright 2007-2012 Adiscon GmbH. + * Copyright 2007-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -63,11 +63,20 @@ typedef struct _instanceData { uchar progName[MAXFNAME]; /* program to execute */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -80,6 +89,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo printf("%s", pData->progName); @@ -92,10 +106,7 @@ ENDtryResume BEGINdoAction CODESTARTdoAction - /* TODO: using pData->progName is not clean from the point of - * modularization. We'll change that as we go ahead with modularization. - * rgerhards, 2007-07-20 - */ + pData = pWrkrData->pData; dbgprintf("\n"); if(execProg((uchar*) pData->progName, 1, ppString[0]) == 0) errmsg.LogError(0, NO_ERRCODE, "Executing program '%s' failed", (char*)pData->progName); @@ -139,6 +150,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES ENDqueryEtryPt diff --git a/tools/omusrmsg.c b/tools/omusrmsg.c index f4cc4094..cab2a4fe 100644 --- a/tools/omusrmsg.c +++ b/tools/omusrmsg.c @@ -8,7 +8,7 @@ * File begun on 2007-07-20 by RGerhards (extracted from syslogd.c, which at the * time of the fork from sysklogd was under BSD license) * - * Copyright 2007-2012 Adiscon GmbH. + * Copyright 2007-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -87,6 +87,10 @@ typedef struct _instanceData { uchar *tplName; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { EMPTY_STRUCT } configSettings_t; @@ -115,6 +119,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -128,6 +137,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINdbgPrintInstInfo register int i; CODESTARTdbgPrintInstInfo @@ -435,6 +449,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt diff --git a/tools/syslogd.c b/tools/syslogd.c index aaeb9866..7597b05d 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -560,13 +560,13 @@ finalize_it: * for the main queue. */ static rsRetVal -msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate) +msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate) { DEFiRet; assert(pBatch != NULL); pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */ preprocessBatch(pBatch); - ruleset.ProcessBatch(pBatch); + ruleset.ProcessBatch(pBatch, pWti); //TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we //do not have this yet and so we emulate -- 2010-06-10 int i; |