diff options
34 files changed, 730 insertions, 362 deletions
@@ -3,6 +3,12 @@ Version 7.4.7 [v7.4-stable] 2013-11-?? - bugfix: call to ruleset with async queue did not use the queue closes: http://bugzilla.adiscon.com/show_bug.cgi?id=443 --------------------------------------------------------------------------- +Version 8.1.0 [devel] 2013-11-?? +- module omruleset is no longer enabled by default. + Note that it has been deprecated in v7 and been replaced by the "call" + statement. Also, it can still be build without problems, the option must + just explicitely be given. +--------------------------------------------------------------------------- Version 7.5.7 [devel] 2013-11-?? - bugfix: ommysql lost configfile/section parameters after first close This means that when a connection was broken, it was probably @@ -119,10 +119,10 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); -static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch); -static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch); -static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*); +static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); +static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); +static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -175,10 +175,9 @@ configSettings_t cs_save; /* our saved (scope!) config settings */ /* the counter below counts actions created. It is used to obtain unique IDs for the action. They * should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice * to have during one instance of an rsyslogd run. For example, I use them to name actions when there - * is no better name available. Note that I do NOT recover previous numbers on HUP - we simply keep - * counting. -- rgerhards, 2008-01-29 + * is no better name available. */ -static int iActionNbr = 0; +int iActionNbr = 0; /* tables for interfacing with the v6 config system */ static struct cnfparamdescr cnfparamdescr[] = { @@ -308,7 +307,6 @@ rsRetVal actionDestruct(action_t *pThis) pThis->pMod->freeInstance(pThis->pModData); pthread_mutex_destroy(&pThis->mutAction); - pthread_mutex_destroy(&pThis->mutActExec); d_free(pThis->pszName); d_free(pThis->ppTpl); @@ -341,7 +339,7 @@ rsRetVal actionConstruct(action_t **ppThis) pThis->bExecWhenPrevSusp = 0; pThis->bRepMsgHasMsg = 0; pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ - pthread_mutex_init(&pThis->mutActExec, NULL); + pThis->iActionNbr = iActionNbr; pthread_mutex_init(&pThis->mutAction, NULL); INIT_ATOMIC_HELPER_MUT(pThis->mutCAS); @@ -428,7 +426,7 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, - (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); + processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszAName); qqueueSetpAction(pThis->pQueue, pThis); @@ -635,7 +633,7 @@ static inline void actionSuspend(action_t *pThis) * of its inability to recover. -- rgerhards, 2010-04-26. */ static inline rsRetVal -actionDoRetry(action_t *pThis, int *pbShutdownImmediate) +actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) { int iRetries; int iSleepPeriod; @@ -647,7 +645,7 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) iRetries = 0; while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries); - iRet = pThis->pMod->tryResume(pThis->pModData); + iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet); if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) { bTreatOKasSusp = 1; @@ -687,10 +685,26 @@ finalize_it: } +static rsRetVal +actionCheckAndCreateWrkrInstance(action_t *pThis, wti_t *pWti) +{ + DEFiRet; + if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) { +dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis->iActionNbr); + DBGPRINTF("we need to create a new action worker instance for " + "action %d\n", pThis->iActionNbr); + CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), + pThis->pModData)); + pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; + } +finalize_it: + RETiRet; +} + /* try to resume an action -- rgerhards, 2007-08-02 * changed to new action state engine -- rgerhards, 2009-05-07 */ -static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) +static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; time_t ttNow = NO_TIME_PROVIDED; @@ -714,7 +728,7 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) if(pThis->eState == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ datetime.GetTime(&ttNow); - CHKiRet(actionDoRetry(pThis, pbShutdownImmediate)); + CHKiRet(actionDoRetry(pThis, pbShutdownImmediate, pWti)); } if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { @@ -731,18 +745,19 @@ finalize_it: * depending on its current state. * rgerhards, 2009-05-07 */ -static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate) +static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; assert(pThis != NULL); - CHKiRet(actionTryResume(pThis, pbShutdownImmediate)); + CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); + CHKiRet(actionTryResume(pThis, pbShutdownImmediate, pWti)); /* if we are now ready, we initialize the transaction and advance * action state accordingly */ if(pThis->eState == ACT_STATE_RDY) { - iRet = pThis->pMod->mod.om.beginTransaction(pThis->pModData); + iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: actionSetState(pThis, ACT_STATE_ITX); @@ -905,17 +920,21 @@ done: RETiRet; * rgerhards, 2008-01-28 */ rsRetVal -actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) +actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); - DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); + DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", + getActStateName(pThis), pThis->iActionNbr); + + CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); pThis->bHadAutoCommit = 0; - iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData); + iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, + pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: actionCommitted(pThis); @@ -952,19 +971,19 @@ finalize_it: * this readies the action and then calls doAction() * rgerhards, 2008-01-28 */ -static inline rsRetVal -actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate) +static rsRetVal +actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); - CHKiRet(actionPrepare(pThis, pbShutdownImmediate)); + CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); if(pThis->eState == ACT_STATE_ITX) - CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); + CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti)); iRet = getReturnCode(pThis); finalize_it: @@ -977,7 +996,7 @@ finalize_it: * rgerhards, 2008-01-28 */ static rsRetVal -finishBatch(action_t *pThis, batch_t *pBatch) +finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) { int i; DEFiRet; @@ -989,9 +1008,9 @@ finishBatch(action_t *pThis, batch_t *pBatch) FINALIZE; /* nothing to do */ } - CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate)); + CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti)); if(pThis->eState == ACT_STATE_ITX) { - iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData); + iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: actionCommitted(pThis); @@ -1034,7 +1053,7 @@ finalize_it: * rgerhards, 2009-05-12 */ static inline rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, wti_t *pWti) { int i; int iElemProcessed; @@ -1059,7 +1078,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) if(batchIsValidElem(pBatch, i)) { pMsg = pBatch->pElem[i].pMsg; localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, - pBatch->pbShutdownImmediate); + pBatch->pbShutdownImmediate, pWti); DBGPRINTF("action %p call returned %d\n", pAction, localRet); /* Note: we directly modify the batch object state, because we know that * wo do not overwrite BATCH_STATE_DISC indicators! @@ -1111,7 +1130,7 @@ finalize_it: * rgerhards, 2009-05-12 */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +submitBatch(action_t *pAction, batch_t *pBatch, int nElem, wti_t *pWti) { int i; int bDone; @@ -1125,7 +1144,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) wasDoneTo = pBatch->iDoneUpTo; bDone = 0; do { - localRet = tryDoAction(pAction, pBatch, &nElem); + localRet = tryDoAction(pAction, pBatch, &nElem, pWti); if(localRet == RS_RET_FORCE_TERM) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } @@ -1135,7 +1154,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) /* try commit transaction, once done, we can simply do so as if * that return state was returned from tryDoAction(). */ - localRet = finishBatch(pAction, pBatch); + localRet = finishBatch(pAction, pBatch, pWti); } if( localRet == RS_RET_OK @@ -1164,8 +1183,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ DBGPRINTF("submitBatch recursing trying to find and exclude the culprit " "for iRet %d\n", localRet); - submitBatch(pAction, pBatch, nElem / 2); - submitBatch(pAction, pBatch, nElem - (nElem / 2)); + submitBatch(pAction, pBatch, nElem / 2, pWti); + submitBatch(pAction, pBatch, nElem - (nElem / 2), pWti); bDone = 1; } } @@ -1245,31 +1264,31 @@ prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustR * rgerhards, 2009-05-12 */ static inline rsRetVal -processAction(action_t *pAction, batch_t *pBatch) +processAction(action_t *pAction, batch_t *pBatch, wti_t* pWti) { DEFiRet; assert(pBatch != NULL); - CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem)); - iRet = finishBatch(pAction, pBatch); + CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pWti)); + iRet = finishBatch(pAction, pBatch, pWti); finalize_it: RETiRet; } -#pragma GCC diagnostic ignored "-Wempty-body" /* receive an array of to-process user pointers and submit them * for processing. * rgerhards, 2009-04-22 */ static rsRetVal -processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) +processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate) { int *pbShutdownImmdtSave; sbool *activeSave; int bMustRestoreActivePtr = 0; rsRetVal localRet; + action_t *pAction = (action_t*) pVoid; DEFiRet; assert(pBatch != NULL); @@ -1280,17 +1299,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) } CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); - /* We now must guard the output module against execution by multiple threads. The - * plugin interface specifies that output modules must not be thread-safe (except - * if they notify us they are - functionality not yet implemented...). - * rgerhards, 2008-01-30 - */ - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - - iRet = processAction(pAction, pBatch); - - pthread_cleanup_pop(1); /* unlock mutex */ + iRet = processAction(pAction, pBatch, pWti); /* even if processAction failed, we need to release the batch (else we * have a memory leak). So we do this first, and then check if we need to @@ -1312,15 +1321,12 @@ finalize_it: pBatch->pbShutdownImmediate = pbShutdownImmdtSave; RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" -/* call the HUP handler for a given action, if such a handler is defined. The - * action mutex is locked, because the HUP handler most probably needs to modify - * some internal state information. - * rgerhards, 2008-10-22 +/* call the HUP handler for a given action, if such a handler is defined. + * Note that the action must be able to service HUP requests concurrently + * to any current doAction() processing. */ -#pragma GCC diagnostic ignored "-Wempty-body" rsRetVal actionCallHUPHdlr(action_t *pAction) { @@ -1329,19 +1335,13 @@ actionCallHUPHdlr(action_t *pAction) ASSERT(pAction != NULL); DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP); - if(pAction->pMod->doHUP == NULL) { - FINALIZE; /* no HUP handler, so we are done ;) */ + if(pAction->pMod->doHUP != NULL) { + CHKiRet(pAction->pMod->doHUP(pAction->pModData)); } - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - CHKiRet(pAction->pMod->doHUP(pAction->pModData)); - pthread_cleanup_pop(1); /* unlock mutex */ - finalize_it: RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" /* set the action message queue mode @@ -1381,7 +1381,7 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT * rgerhards, 2010-06-08 */ static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti) { DEFiRet; @@ -1392,7 +1392,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) - iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg)); + iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg), pWti); else iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); @@ -1409,7 +1409,7 @@ finalize_it: * be filtered out before calling us (what is done currently!). */ rsRetVal -actionWriteToAction(action_t *pAction, msg_t *pMsg) +actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti) { DEFiRet; @@ -1464,7 +1464,7 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg) /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = doSubmitToActionQ(pAction, pMsg); + iRet = doSubmitToActionQ(pAction, pMsg, pWti); finalize_it: RETiRet; @@ -1475,7 +1475,7 @@ finalize_it: * pthread_cleanup_push() POSIX macro... */ static inline rsRetVal -doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) +doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti) { msg_t *pMsg; DEFiRet; @@ -1490,7 +1490,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) } /* call the output driver */ - iRet = actionWriteToAction(pAction, pMsg); + iRet = actionWriteToAction(pAction, pMsg, pWti); finalize_it: /* we need to update the batch to handle failover processing correctly */ @@ -1550,7 +1550,7 @@ activateActions(void) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { time_t now = 0; time_t lastAct; @@ -1588,7 +1588,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) } } - iRet = doSubmitToActionQBatch(pAction, pBatch); + iRet = doSubmitToActionQBatch(pAction, pBatch, pWti); free(pBatch->active); pBatch->active = activeSave; @@ -1613,7 +1613,7 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch) * rgerhards, 2011-06-16 */ static inline rsRetVal -doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) +doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { sbool bNeedSubmit; sbool *activeSave; @@ -1645,14 +1645,14 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) } if(bNeedSubmit) { /* note: stats were already computed above */ - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); } else { DBGPRINTF("no need to submit batch, all invalid\n"); } } else { if(GatherStats) countStatsBatchEnq(pAction, pBatch); - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); } free(pBatch->active); @@ -1666,7 +1666,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) * rgerhards, 2010-06-08 */ static rsRetVal -doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { int i; DEFiRet; @@ -1674,7 +1674,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { - iRet = doQueueEnqObjDirectBatch(pAction, pBatch); + iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti); } else {/* in this case, we do single submits to the queue. * TODO: optimize this, we may do at least a multi-submit! */ @@ -1684,7 +1684,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); if( batchIsValidElem(pBatch, i) && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { - doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg); + doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg, pWti); } } } @@ -1699,7 +1699,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) * rgerhards, 2010-06-23 */ static inline rsRetVal -helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { int i; DEFiRet; @@ -1712,7 +1712,7 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); if( batchIsValidElem(pBatch, i) && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { - doActionCallAction(pAction, pBatch, i); + doActionCallAction(pAction, pBatch, i, pWti); } } @@ -1724,13 +1724,14 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) */ #pragma GCC diagnostic ignored "-Wempty-body" static rsRetVal -doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { DEFiRet; d_pthread_mutex_lock(&pAction->mutAction); +dbgprintf("DDDD: locked mutAction\n"); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); - iRet = helperSubmitToActionQComplexBatch(pAction, pBatch); + iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti); d_pthread_mutex_unlock(&pAction->mutAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ @@ -51,6 +51,7 @@ struct action_s { time_t tActNow; /* the current time for an action execution. Initially set to -1 and populated on an as-needed basis. This is a performance optimization. */ time_t tLastExec; /* time this action was last executed */ + int iActionNbr; /* this action's number (ID) */ sbool bExecWhenPrevSusp;/* execute only when previous action is suspended? */ sbool bWriteAllMarkMsgs;/* should all mark msgs be written (not matter how recent the action was executed)? */ int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */ @@ -68,7 +69,7 @@ struct action_s { struct modInfo_s *pMod;/* pointer to output module handling this selector */ void *pModData; /* pointer to module data - content is module-specific */ sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */ - rsRetVal (*submitToActQ)(action_t *, batch_t *);/* function submit message to action queue */ + rsRetVal (*submitToActQ)(action_t *, batch_t *, wti_t*);/* function submit message to action queue */ rsRetVal (*qConstruct)(struct queue_s *pThis); enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING = 2, ACT_JSON_PASSING = 3} @@ -78,7 +79,6 @@ struct action_s { * in this order. */ qqueue_t *pQueue; /* action queue */ pthread_mutex_t mutAction; /* primary action mutex */ - pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */ uchar *pszName; /* action name (for documentation) */ DEF_ATOMIC_HELPER_MUT(mutCAS); /* for statistics subsystem */ @@ -96,7 +96,7 @@ rsRetVal actionDestruct(action_t *pThis); rsRetVal actionDbgPrint(action_t *pThis); rsRetVal actionSetGlobalResumeInterval(int iNewVal); rsRetVal actionDoAction(action_t *pAction); -rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg); +rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t*); rsRetVal actionCallHUPHdlr(action_t *pAction); rsRetVal actionClassInit(void); rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, struct nvlst *lst, int bSuspended); @@ -104,4 +104,7 @@ rsRetVal activateActions(void); rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction); rsRetVal actionProcessCnf(struct cnfobj *o); +/* external data */ +extern int iActionNbr; + #endif /* #ifndef ACTION_H_INCLUDED */ diff --git a/configure.ac b/configure.ac index 3abd559e..de4c3ea9 100644 --- a/configure.ac +++ b/configure.ac @@ -1353,7 +1353,7 @@ AC_ARG_ENABLE(omruleset, no) enable_omruleset="no" ;; *) AC_MSG_ERROR(bad value ${enableval} for --enable-omruleset) ;; esac], - [enable_omruleset=yes] + [enable_omruleset=no] ) AM_CONDITIONAL(ENABLE_OMRULESET, test x$enable_omruleset = xyes) diff --git a/doc/multi_ruleset.html b/doc/multi_ruleset.html index 14a761c5..1be2c81e 100644 --- a/doc/multi_ruleset.html +++ b/doc/multi_ruleset.html @@ -89,6 +89,15 @@ modify the ruleset to which the next input is bound but rather provides a system default rule set for those inputs that did not explicitly bind to one. As such, the directive can not be used as a work-around to bind inputs to non-default rulesets that do not support ruleset binding. + +<h2>Rulesets and Main Queues</h2> +By default, rulesets do not have their own queue. It must be activated via +the $RulesetCreateMainQueue directive, or if using rainerscript format, by +specifying queue parameters on the ruleset directive, e.g. +ruleset(name="whatever" queue.type="fixedArray" queue. ...) +See <a href="http://www.rsyslog.com/doc/queue_parameters.html">http://www.rsyslog.com/doc/queue_parameters.html</a> +for more details. + <h2>Examples</h2> <h3>Split local and remote logging</h3> <p>Let's say you have a pretty standard system that logs its local messages to the usual diff --git a/doc/rsyslog_conf_modules.html b/doc/rsyslog_conf_modules.html index 8dc3ed56..99557215 100644 --- a/doc/rsyslog_conf_modules.html +++ b/doc/rsyslog_conf_modules.html @@ -53,28 +53,28 @@ to message generators. and messages be transmitted to various different targets. <ul> <li><a href="omfile.html">omfile</a> - file output module</li> -<li><a href="omfwd.html">omfwd</a> - syslog forwarding output module</li> +<li><a href="omfwd.html">omfwd</a> (does NOT yet work in v8) - syslog forwarding output module</li> <li><a href="omjournal.html">omjournal</a> - Linux journal output module</li> <li><a href="ompipe.html">ompipe</a> - named pipe output module</li> <li><a href="omusrmsg.html">omusrmsg</a> - user message output module</li> -<li><a href="omsnmp.html">omsnmp</a> - SNMP trap output module</li> +<li><a href="omsnmp.html">omsnmp</a> (does NOT yet work in v8) - SNMP trap output module</li> <li><a href="omstdout.html">omtdout</a> - stdout output module (mainly a test tool)</li> -<li><a href="omrelp.html">omrelp</a> - RELP output module</li> +<li><a href="omrelp.html">omrelp</a> (does NOT yet work in v8) - RELP output module</li> <li><a href="omruleset.html">omruleset</a> - forward message to another ruleset</li> -<li>omgssapi - output module for GSS-enabled syslog</li> +<li>omgssapi (does NOT yet work in v8) - output module for GSS-enabled syslog</li> <li><a href="ommysql.html">ommysql</a> - output module for MySQL</li> -<li>ompgsql - output module for PostgreSQL</li> -<li><a href="omlibdbi.html">omlibdbi</a> - +<li>ompgsql (does NOT yet work in v8) - output module for PostgreSQL</li> +<li><a href="omlibdbi.html">omlibdbi</a> (does NOT yet work in v8) - generic database output module (Firebird/Interbase, MS SQL, Sybase, SQLLite, Ingres, Oracle, mSQL)</li> -<li><a href="ommail.html">ommail</a> - +<li><a href="ommail.html">ommail</a> (does NOT yet work in v8) - permits rsyslog to alert folks by mail if something important happens</li> -<li><a href="omprog.html">omprog</a> - permits sending messages to a program for custom processing</li> -<li><a href="omoracle.html">omoracle</a> - output module for Oracle (native OCI interface)</li> -<li><a href="omudpspoof.html">omudpspoof</a> - output module sending UDP syslog messages with a spoofed address</li> -<li><a href="omuxsock.html">omuxsock</a> - output module Unix domain sockets</li> -<li><a href="omhdfs.html">omhdfs</a> - output module for Hadoop's HDFS file system</li> -<li><a href="ommongodb.html">ommongodb</a> - output module for MongoDB</li> +<li><a href="omprog.html">omprog</a> (does NOT yet work in v8) - permits sending messages to a program for custom processing</li> +<li><a href="omoracle.html">omoracle</a> (orphaned) - output module for Oracle (native OCI interface)</li> +<li><a href="omudpspoof.html">omudpspoof</a> (does NOT yet work in v8) - output module sending UDP syslog messages with a spoofed address</li> +<li><a href="omuxsock.html">omuxsock</a> (does NOT yet work in v8) - output module Unix domain sockets</li> +<li><a href="omhdfs.html">omhdfs</a> (does NOT yet work in v8) - output module for Hadoop's HDFS file system</li> +<li><a href="ommongodb.html">ommongodb</a> (does NOT yet work in v8) - output module for MongoDB</li> <li><a href="omelasticsearch.html">omelasticsearch</a> - output module for ElasticSearch</li> </ul> diff --git a/grammar/rainerscript.c b/grammar/rainerscript.c index 0dad86cc..0c695934 100644 --- a/grammar/rainerscript.c +++ b/grammar/rainerscript.c @@ -2428,7 +2428,7 @@ cnfstmtPrintOnly(struct cnfstmt *stmt, int indent, sbool subtree) free(cstr); break; case S_ACT: - doIndent(indent); dbgprintf("ACTION %p [%s:%s]\n", stmt->d.act, + doIndent(indent); dbgprintf("ACTION %d [%s:%s]\n", stmt->d.act->iActionNbr, modGetName(stmt->d.act->pMod), stmt->printable); break; case S_IF: diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c index 694c07c4..a883ef1b 100644 --- a/plugins/impstats/impstats.c +++ b/plugins/impstats/impstats.c @@ -377,7 +377,6 @@ checkRuleset(modConfData_t *modConf) DEFiRet; modConf->pBindRuleset = NULL; /* assume default ruleset */ -dbgprintf("DDDD: impstats ruleset %s\n", modConf->pszBindRuleset); if(modConf->pszBindRuleset == NULL) FINALIZE; @@ -390,7 +389,6 @@ dbgprintf("DDDD: impstats ruleset %s\n", modConf->pszBindRuleset); CHKiRet(localRet); modConf->pBindRuleset = pRuleset; finalize_it: -dbgprintf("DDDD: impstats ruleset ptr %p\n", modConf->pBindRuleset); RETiRet; } diff --git a/plugins/mmanon/mmanon.c b/plugins/mmanon/mmanon.c index 16a4f34b..c714706a 100644 --- a/plugins/mmanon/mmanon.c +++ b/plugins/mmanon/mmanon.c @@ -71,6 +71,10 @@ typedef struct _instanceData { } ipv4; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -119,6 +123,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -130,6 +138,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + static inline void setInstParamDefaults(instanceData *pData) { @@ -354,6 +367,7 @@ BEGINdoAction int lenMsg; int i; CODESTARTdoAction + pData = pWrkrData->pData; pMsg = (msg_t*) ppString[0]; lenMsg = getMSGLen(pMsg); msg = getMSG(pMsg); @@ -387,6 +401,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/omelasticsearch/README b/plugins/omelasticsearch/README index 9021bc0e..b8bf4151 100644 --- a/plugins/omelasticsearch/README +++ b/plugins/omelasticsearch/README @@ -1,3 +1,7 @@ +How to access ElasticSearch on local machine (for testing): +=========================================================== +see: https://github.com/mobz/elasticsearch-head + How to produce an error: ======================== It's quite easy to get 400, if you put a wrong mapping to your diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index eb82c35f..b878050d 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -69,8 +69,8 @@ STATSCOUNTER_DEF(indexESFail, mutIndexESFail) typedef struct curl_slist HEADER; typedef struct _instanceData { int port; - int replyLen; int fdErrFile; /* error file fd or -1 if not open */ + pthread_mutex_t mutErrFile; uchar *server; uchar *uid; uchar *pwd; @@ -80,24 +80,29 @@ typedef struct _instanceData { uchar *tplName; uchar *timeout; uchar *bulkId; - uchar *restURL; /* last used URL for error reporting */ uchar *errorFile; - char *reply; sbool dynSrchIdx; sbool dynSrchType; sbool dynParent; sbool dynBulkId; sbool bulkmode; sbool asyncRepl; +} instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; + int replyLen; + char *reply; + CURL *curlHandle; /* libcurl session handle */ + HEADER *postHeader; /* json POST request info */ + uchar *restURL; /* last used URL for error reporting */ struct { es_str_t *data; int nmemb; /* number of messages in batch (for statistics counting) */ uchar *currTpl1; uchar *currTpl2; } batch; - CURL *curlHandle; /* libcurl session handle */ - HEADER *postHeader; /* json POST request info */ -} instanceData; +} wrkrInstanceData_t; /* tables for interfacing with the v6 config system */ @@ -117,7 +122,7 @@ static struct cnfparamdescr actpdescr[] = { { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, { "errorfile", eCmdHdlrGetWord, 0 }, - { "template", eCmdHdlrGetWord, 1 }, + { "template", eCmdHdlrGetWord, 0 }, { "dynbulkid", eCmdHdlrBinary, 0 }, { "bulkid", eCmdHdlrGetWord, 0 }, }; @@ -127,12 +132,32 @@ static struct cnfparamblk actpblk = actpdescr }; +static rsRetVal curlSetup(wrkrInstanceData_t *pWrkrData, instanceData *pData); + BEGINcreateInstance CODESTARTcreateInstance - pData->restURL = NULL; pData->fdErrFile = -1; + pthread_mutex_init(&pData->mutErrFile, NULL); ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +dbgprintf("omelasticsearch: createWrkrInstance\n"); + pWrkrData->restURL = NULL; + if(pData->bulkmode) { + pWrkrData->batch.currTpl1 = NULL; + pWrkrData->batch.currTpl2 = NULL; + if((pWrkrData->batch.data = es_newStr(1024)) == NULL) { + DBGPRINTF("omelasticsearch: error creating batch string " + "turned off bulk mode\n"); + pData->bulkmode = 0; /* at least it works */ + } + } + CHKiRet(curlSetup(pWrkrData, pWrkrData->pData)); +finalize_it: +dbgprintf("DDDD: createWrkrInstance,pData %p/%p, pWrkrData %p\n", pData, pWrkrData->pData, pWrkrData); +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -141,16 +166,9 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance - if (pData->postHeader) { - curl_slist_free_all(pData->postHeader); - pData->postHeader = NULL; - } - if (pData->curlHandle) { - curl_easy_cleanup(pData->curlHandle); - pData->curlHandle = NULL; - } if(pData->fdErrFile != -1) close(pData->fdErrFile); + pthread_mutex_destroy(&pData->mutErrFile); free(pData->server); free(pData->uid); free(pData->pwd); @@ -159,11 +177,23 @@ CODESTARTfreeInstance free(pData->parent); free(pData->tplName); free(pData->timeout); - free(pData->restURL); free(pData->errorFile); free(pData->bulkId); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + if(pWrkrData->postHeader) { + curl_slist_free_all(pWrkrData->postHeader); + pWrkrData->postHeader = NULL; + } + if(pWrkrData->curlHandle) { + curl_easy_cleanup(pWrkrData->curlHandle); + pWrkrData->curlHandle = NULL; + } + free(pWrkrData->restURL); +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo dbgprintf("omelasticsearch\n"); @@ -211,7 +241,7 @@ setBaseURL(instanceData *pData, es_str_t **url) static inline rsRetVal -checkConn(instanceData *pData) +checkConn(wrkrInstanceData_t *pWrkrData) { es_str_t *url; CURL *curl = NULL; @@ -219,7 +249,7 @@ checkConn(instanceData *pData) char *cstr; DEFiRet; - setBaseURL(pData, &url); + setBaseURL(pWrkrData->pData, &url); curl = curl_easy_init(); if(curl == NULL) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n"); @@ -235,16 +265,16 @@ checkConn(instanceData *pData) curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); - pData->reply = NULL; - pData->replyLen = 0; - curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); + pWrkrData->reply = NULL; + pWrkrData->replyLen = 0; + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pWrkrData); res = curl_easy_perform(curl); if(res != CURLE_OK) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " "failed: %s\n", curl_easy_strerror(res)); ABORT_FINALIZE(RS_RET_SUSPENDED); } - free(pData->reply); + free(pWrkrData->reply); DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); finalize_it: @@ -257,7 +287,7 @@ finalize_it: BEGINtryResume CODESTARTtryResume DBGPRINTF("omelasticsearch: tryResume called\n"); - iRet = checkConn(pData); + iRet = checkConn(pWrkrData); ENDtryResume @@ -330,7 +360,7 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, static rsRetVal -setCurlURL(instanceData *pData, uchar **tpls) +setCurlURL(wrkrInstanceData_t *pWrkrData, instanceData *pData, uchar **tpls) { char authBuf[1024]; uchar *searchIndex; @@ -368,11 +398,11 @@ setCurlURL(instanceData *pData, uchar **tpls) if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); } - free(pData->restURL); - pData->restURL = (uchar*)es_str2cstr(url, NULL); - curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); + free(pWrkrData->restURL); + pWrkrData->restURL = (uchar*)es_str2cstr(url, NULL); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_URL, pWrkrData->restURL); es_deleteStr(url); - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pWrkrData->restURL); if(pData->uid != NULL) { rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -383,8 +413,8 @@ setCurlURL(instanceData *pData, uchar **tpls) rLocal); ABORT_FINALIZE(RS_RET_ERR); } - curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); - curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_USERPWD, authBuf); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } finalize_it: RETiRet; @@ -396,7 +426,7 @@ finalize_it: * index changes. */ static rsRetVal -buildBatch(instanceData *pData, uchar *message, uchar **tpls) +buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls) { int length = strlen((char *)message); int r; @@ -411,29 +441,29 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) # define META_ID "\", \"_id\":\"" # define META_END "\"}}\n" - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); - r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, + getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId); + r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); - if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType, ustrlen(searchType)); if(parent != NULL) { - if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PARENT, sizeof(META_PARENT)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)parent, ustrlen(parent)); } if(bulkId != NULL) { - if(r == 0) r = es_addBuf(&pData->batch.data, META_ID, sizeof(META_ID)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)bulkId, ustrlen(bulkId)); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_ID, sizeof(META_ID)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)bulkId, ustrlen(bulkId)); } - if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); - if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)message, length); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, "\n", sizeof("\n")-1); if(r != 0) { DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); ABORT_FINALIZE(RS_RET_ERR); } - ++pData->batch.nmemb; + ++pWrkrData->batch.nmemb; iRet = RS_RET_DEFER_COMMIT; finalize_it: @@ -446,7 +476,7 @@ finalize_it: * needs to be closed, HUP must be sent. */ static inline rsRetVal -writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) +writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) { char *rendered = NULL; cJSON *errRoot; @@ -454,6 +484,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) cJSON *replyRoot = *pReplyRoot; size_t toWrite; ssize_t wrRet; + sbool bMutLocked = 0; char errStr[1024]; DEFiRet; @@ -463,6 +494,9 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) FINALIZE; } + pthread_mutex_lock(&pData->mutErrFile); + bMutLocked = 1; + if(pData->fdErrFile == -1) { pData->fdErrFile = open((char*)pData->errorFile, O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, @@ -474,7 +508,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) } } if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); - cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL)); + cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pWrkrData->restURL)); cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg)); if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); @@ -495,13 +529,15 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) *pReplyRoot = NULL; /* tell caller not to delete once again! */ finalize_it: + if(bMutLocked) + pthread_mutex_unlock(&pData->mutErrFile); free(rendered); RETiRet; } static inline rsRetVal -checkResultBulkmode(instanceData *pData, cJSON *root) +checkResultBulkmode(wrkrInstanceData_t *pWrkrData, cJSON *root) { int i; int numitems; @@ -515,7 +551,7 @@ checkResultBulkmode(instanceData *pData, cJSON *root) if(items == NULL || items->type != cJSON_Array) { DBGPRINTF("omelasticsearch: error in elasticsearch reply: " "bulkmode insert does not return array, reply is: %s\n", - pData->reply); + pWrkrData->reply); ABORT_FINALIZE(RS_RET_DATAFAIL); } numitems = cJSON_GetArraySize(items); @@ -547,20 +583,20 @@ finalize_it: static inline rsRetVal -checkResult(instanceData *pData, uchar *reqmsg) +checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg) { cJSON *root; cJSON *ok; DEFiRet; - root = cJSON_Parse(pData->reply); + root = cJSON_Parse(pWrkrData->reply); if(root == NULL) { DBGPRINTF("omelasticsearch: could not parse JSON result \n"); ABORT_FINALIZE(RS_RET_ERR); } - if(pData->bulkmode) { - iRet = checkResultBulkmode(pData, root); + if(pWrkrData->pData->bulkmode) { + iRet = checkResultBulkmode(pWrkrData, root); } else { ok = cJSON_GetObjectItem(root, "ok"); if(ok == NULL || ok->type != cJSON_True) { @@ -572,7 +608,7 @@ checkResult(instanceData *pData, uchar *reqmsg) * these in any case. */ if(iRet == RS_RET_DATAFAIL) { - writeDataError(pData, &root, reqmsg); + writeDataError(pWrkrData, pWrkrData->pData, &root, reqmsg); iRet = RS_RET_OK; /* we have handled the problem! */ } @@ -587,19 +623,19 @@ finalize_it: static rsRetVal -curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsgs) +curlPost(wrkrInstanceData_t *pWrkrData, uchar *message, int msglen, uchar **tpls, int nmsgs) { CURLcode code; - CURL *curl = pData->curlHandle; + CURL *curl = pWrkrData->curlHandle; DEFiRet; - pData->reply = NULL; - pData->replyLen = 0; + pWrkrData->reply = NULL; + pWrkrData->replyLen = 0; - if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent) - CHKiRet(setCurlURL(pData, tpls)); + if(pWrkrData->pData->dynSrchIdx || pWrkrData->pData->dynSrchType || pWrkrData->pData->dynParent) + CHKiRet(setCurlURL(pWrkrData, pWrkrData->pData, tpls)); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pWrkrData); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); @@ -618,27 +654,27 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsg break; } - DBGPRINTF("omelasticsearch: pData replyLen = '%d'\n", pData->replyLen); - if (pData->replyLen > 0) { - pData->reply[pData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */ + DBGPRINTF("omelasticsearch: pWrkrData replyLen = '%d'\n", pWrkrData->replyLen); + if(pWrkrData->replyLen > 0) { + pWrkrData->reply[pWrkrData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */ } - DBGPRINTF("omelasticsearch: pData reply: '%s'\n", pData->reply); + DBGPRINTF("omelasticsearch: pWrkrData reply: '%s'\n", pWrkrData->reply); - CHKiRet(checkResult(pData, message)); + CHKiRet(checkResult(pWrkrData, message)); finalize_it: - free(pData->reply); + free(pWrkrData->reply); RETiRet; } BEGINbeginTransaction CODESTARTbeginTransaction -dbgprintf("omelasticsearch: beginTransaction\n"); - if(!pData->bulkmode) { +dbgprintf("omelasticsearch: beginTransaction, pWrkrData %p, pData %p\n", pWrkrData, pWrkrData->pData); + if(!pWrkrData->pData->bulkmode) { FINALIZE; } - es_emptyStr(pData->batch.data); - pData->batch.nmemb = 0; + es_emptyStr(pWrkrData->batch.data); + pWrkrData->batch.nmemb = 0; finalize_it: ENDbeginTransaction @@ -646,14 +682,14 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); - if(pData->bulkmode) { - CHKiRet(buildBatch(pData, ppString[0], ppString)); + if(pWrkrData->pData->bulkmode) { + CHKiRet(buildBatch(pWrkrData, ppString[0], ppString)); } else { - CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), + CHKiRet(curlPost(pWrkrData, ppString[0], strlen((char*)ppString[0]), ppString, 1)); } finalize_it: -dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); +dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pWrkrData->pData->bulkmode); ENDdoAction @@ -662,13 +698,13 @@ BEGINendTransaction CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); /* End Transaction only if batch data is not empty */ - if (pData->batch.data != NULL ) { - cstr = es_str2cstr(pData->batch.data, NULL); + if (pWrkrData->batch.data != NULL ) { + cstr = es_str2cstr(pWrkrData->batch.data, NULL); dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, pData->batch.nmemb)); + CHKiRet(curlPost(pWrkrData, (uchar*) cstr, strlen(cstr), NULL, pWrkrData->batch.nmemb)); } else - dbgprintf("omelasticsearch: endTransaction, pData->batch.data is NULL, nothing to send. \n"); + dbgprintf("omelasticsearch: endTransaction, pWrkrData->batch.data is NULL, nothing to send. \n"); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -679,24 +715,24 @@ size_t curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) { char *p = (char *)ptr; - instanceData *pData = (instanceData*) userdata; + wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t*) userdata; char *buf; size_t newlen; - newlen = pData->replyLen + size*nmemb; - if((buf = realloc(pData->reply, newlen + 1)) == NULL) { + newlen = pWrkrData->replyLen + size*nmemb; + if((buf = realloc(pWrkrData->reply, newlen + 1)) == NULL) { DBGPRINTF("omelasticsearch: realloc failed in curlResult\n"); return 0; /* abort due to failure */ } - memcpy(buf+pData->replyLen, p, size*nmemb); - pData->replyLen = newlen; - pData->reply = buf; + memcpy(buf+pWrkrData->replyLen, p, size*nmemb); + pWrkrData->replyLen = newlen; + pWrkrData->reply = buf; return size*nmemb; } static rsRetVal -curlSetup(instanceData *pData) +curlSetup(wrkrInstanceData_t *pWrkrData, instanceData *pData) { HEADER *header; CURL *handle; @@ -712,13 +748,13 @@ curlSetup(instanceData *pData) curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); curl_easy_setopt(handle, CURLOPT_POST, 1); - pData->curlHandle = handle; - pData->postHeader = header; + pWrkrData->curlHandle = handle; + pWrkrData->postHeader = header; if( pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) { /* in this case, we know no tpls are involved in the request-->NULL OK! */ - setCurlURL(pData, NULL); + setCurlURL(pWrkrData, pData, NULL); } if(Debug) { @@ -838,16 +874,6 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } - if(pData->bulkmode) { - pData->batch.currTpl1 = NULL; - pData->batch.currTpl2 = NULL; - if((pData->batch.data = es_newStr(1024)) == NULL) { - DBGPRINTF("omelasticsearch: error creating batch string " - "turned off bulk mode\n"); - pData->bulkmode = 0; /* at least it works */ - } - } - iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; @@ -939,9 +965,6 @@ CODESTARTnewActInst pData->searchIndex = (uchar*) strdup("system"); if(pData->searchType == NULL) pData->searchType = (uchar*) strdup("events"); - - CHKiRet(curlSetup(pData)); - CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst @@ -979,6 +1002,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_doHUP diff --git a/plugins/omjournal/omjournal.c b/plugins/omjournal/omjournal.c index 160c369d..82fd7bfb 100644 --- a/plugins/omjournal/omjournal.c +++ b/plugins/omjournal/omjournal.c @@ -56,6 +56,10 @@ DEF_OMOD_STATIC_DATA typedef struct _instanceData { } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -91,6 +95,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature ENDisCompatibleWithFeature @@ -101,6 +110,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINnewActInst CODESTARTnewActInst /* Note: we currently do not have any parameters, so we do not need @@ -172,6 +186,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/ommysql/ommysql.c b/plugins/ommysql/ommysql.c index 49079ab1..c004d1c6 100644 --- a/plugins/ommysql/ommysql.c +++ b/plugins/ommysql/ommysql.c @@ -6,7 +6,7 @@ * * File begun on 2007-07-20 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2012 Adiscon GmbH. + * Copyright 2007-2013 Adiscon GmbH. * * This file is part of rsyslog. * Licensed under the Apache License, Version 2.0 (the "License"); @@ -55,18 +55,22 @@ DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) typedef struct _instanceData { - MYSQL *f_hmysql; /* handle to MySQL */ char dbsrv[MAXHOSTNAMELEN+1]; /* IP or hostname of DB server*/ unsigned int dbsrvPort; /* port of MySQL server */ char dbname[_DB_MAXDBLEN+1]; /* DB name */ char dbuid[_DB_MAXUNAMELEN+1]; /* DB user */ char dbpwd[_DB_MAXPWDLEN+1]; /* DB user's password */ - unsigned uLastMySQLErrno; /* last errno returned by MySQL or 0 if all is well */ - uchar * f_configfile; /* MySQL Client Configuration File */ - uchar * f_configsection; /* MySQL Client Configuration Section */ - uchar *tplName; /* format template to use */ + uchar *configfile; /* MySQL Client Configuration File */ + uchar *configsection; /* MySQL Client Configuration Section */ + uchar *tplName; /* format template to use */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; + MYSQL *hmysql; /* handle to MySQL */ + unsigned uLastMySQLErrno; /* last errno returned by MySQL or 0 if all is well */ +} wrkrInstanceData_t; + typedef struct configSettings_s { int iSrvPort; /* database server port */ uchar *pszMySQLConfigFile; /* MySQL Client Configuration File */ @@ -104,6 +108,12 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + pWrkrData->hmysql = NULL; +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -115,25 +125,28 @@ ENDisCompatibleWithFeature * MySQL connection. * Initially added 2004-10-28 */ -static void closeMySQL(instanceData *pData) +static void closeMySQL(wrkrInstanceData_t *pWrkrData) { - ASSERT(pData != NULL); - - if(pData->f_hmysql != NULL) { /* just to be on the safe side... */ - mysql_close(pData->f_hmysql); - pData->f_hmysql = NULL; + if(pWrkrData->hmysql != NULL) { /* just to be on the safe side... */ + mysql_close(pWrkrData->hmysql); + pWrkrData->hmysql = NULL; } } BEGINfreeInstance CODESTARTfreeInstance - free(pData->f_configfile); - free(pData->f_configsection); + free(pData->configfile); + free(pData->configsection); free(pData->tplName); - closeMySQL(pData); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + closeMySQL(pWrkrData); +ENDfreeWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo /* nothing special here */ @@ -144,25 +157,23 @@ ENDdbgPrintInstInfo * We check if we have a valid MySQL handle. If not, we simply * report an error, but can not be specific. RGerhards, 2007-01-30 */ -static void reportDBError(instanceData *pData, int bSilent) +static void reportDBError(wrkrInstanceData_t *pWrkrData, int bSilent) { char errMsg[512]; unsigned uMySQLErrno; - ASSERT(pData != NULL); - /* output log message */ errno = 0; - if(pData->f_hmysql == NULL) { + if(pWrkrData->hmysql == NULL) { errmsg.LogError(0, NO_ERRCODE, "unknown DB error occured - could not obtain MySQL handle"); } else { /* we can ask mysql for the error description... */ - uMySQLErrno = mysql_errno(pData->f_hmysql); + uMySQLErrno = mysql_errno(pWrkrData->hmysql); snprintf(errMsg, sizeof(errMsg)/sizeof(char), "db error (%d): %s\n", uMySQLErrno, - mysql_error(pData->f_hmysql)); - if(bSilent || uMySQLErrno == pData->uLastMySQLErrno) + mysql_error(pWrkrData->hmysql)); + if(bSilent || uMySQLErrno == pWrkrData->uLastMySQLErrno) dbgprintf("mysql, DBError(silent): %s\n", errMsg); else { - pData->uLastMySQLErrno = uMySQLErrno; + pWrkrData->uLastMySQLErrno = uMySQLErrno; errmsg.LogError(0, NO_ERRCODE, "%s", errMsg); } } @@ -175,25 +186,26 @@ static void reportDBError(instanceData *pData, int bSilent) * MySQL connection. * Initially added 2004-10-28 mmeckelein */ -static rsRetVal initMySQL(instanceData *pData, int bSilent) +static rsRetVal initMySQL(wrkrInstanceData_t *pWrkrData, int bSilent) { + instanceData *pData; DEFiRet; - ASSERT(pData != NULL); - ASSERT(pData->f_hmysql == NULL); - pData->f_hmysql = mysql_init(NULL); - if(pData->f_hmysql == NULL) { + ASSERT(pWrkrData->hmysql == NULL); + pData = pWrkrData->pData; + pWrkrData->hmysql = mysql_init(NULL); + if(pWrkrData->hmysql == NULL) { errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MySQL handle"); iRet = RS_RET_SUSPENDED; } else { /* we could get the handle, now on with work... */ - mysql_options(pData->f_hmysql,MYSQL_READ_DEFAULT_GROUP,((pData->f_configsection!=NULL)?(char*)pData->f_configsection:"client")); - if(pData->f_configfile!=NULL){ + mysql_options(pWrkrData->hmysql,MYSQL_READ_DEFAULT_GROUP,((pData->configsection!=NULL)?(char*)pData->configsection:"client")); + if(pData->configfile!=NULL){ FILE * fp; - fp=fopen((char*)pData->f_configfile,"r"); + fp=fopen((char*)pData->configfile,"r"); int err=errno; if(fp==NULL){ char msg[512]; - snprintf(msg,sizeof(msg)/sizeof(char),"Could not open '%s' for reading",pData->f_configfile); + snprintf(msg,sizeof(msg)/sizeof(char),"Could not open '%s' for reading",pData->configfile); if(bSilent) { char errStr[512]; rs_strerror_r(err, errStr, sizeof(errStr)); @@ -202,17 +214,17 @@ static rsRetVal initMySQL(instanceData *pData, int bSilent) errmsg.LogError(err,NO_ERRCODE,"mysql configuration error: %s\n",msg); } else { fclose(fp); - mysql_options(pData->f_hmysql,MYSQL_READ_DEFAULT_FILE,pData->f_configfile); + mysql_options(pWrkrData->hmysql,MYSQL_READ_DEFAULT_FILE,pData->configfile); } } /* Connect to database */ - if(mysql_real_connect(pData->f_hmysql, pData->dbsrv, pData->dbuid, + if(mysql_real_connect(pWrkrData->hmysql, pData->dbsrv, pData->dbuid, pData->dbpwd, pData->dbname, pData->dbsrvPort, NULL, 0) == NULL) { - reportDBError(pData, bSilent); - closeMySQL(pData); /* ignore any error we may get */ + reportDBError(pWrkrData, bSilent); + closeMySQL(pWrkrData); /* ignore any error we may get */ ABORT_FINALIZE(RS_RET_SUSPENDED); } - mysql_autocommit(pData->f_hmysql, 0); + mysql_autocommit(pWrkrData->hmysql, 0); } finalize_it: @@ -224,35 +236,32 @@ finalize_it: * to an established MySQL session. * Initially added 2004-10-28 mmeckelein */ -rsRetVal writeMySQL(uchar *psz, instanceData *pData) +rsRetVal writeMySQL(wrkrInstanceData_t *pWrkrData, uchar *psz) { DEFiRet; - ASSERT(psz != NULL); - ASSERT(pData != NULL); - /* see if we are ready to proceed */ - if(pData->f_hmysql == NULL) { - CHKiRet(initMySQL(pData, 0)); + if(pWrkrData->hmysql == NULL) { + CHKiRet(initMySQL(pWrkrData, 0)); } /* try insert */ - if(mysql_query(pData->f_hmysql, (char*)psz)) { + if(mysql_query(pWrkrData->hmysql, (char*)psz)) { /* error occured, try to re-init connection and retry */ - closeMySQL(pData); /* close the current handle */ - CHKiRet(initMySQL(pData, 0)); /* try to re-open */ - if(mysql_query(pData->f_hmysql, (char*)psz)) { /* re-try insert */ + closeMySQL(pWrkrData); /* close the current handle */ + CHKiRet(initMySQL(pWrkrData, 0)); /* try to re-open */ + if(mysql_query(pWrkrData->hmysql, (char*)psz)) { /* re-try insert */ /* we failed, giving up for now */ - reportDBError(pData, 0); - closeMySQL(pData); /* free ressources */ + reportDBError(pWrkrData, 0); + closeMySQL(pWrkrData); /* free ressources */ ABORT_FINALIZE(RS_RET_SUSPENDED); } } finalize_it: if(iRet == RS_RET_OK) { - pData->uLastMySQLErrno = 0; /* reset error for error supression */ + pWrkrData->uLastMySQLErrno = 0; /* reset error for error supression */ } RETiRet; @@ -261,28 +270,28 @@ finalize_it: BEGINtryResume CODESTARTtryResume - if(pData->f_hmysql == NULL) { - iRet = initMySQL(pData, 1); + if(pWrkrData->hmysql == NULL) { + iRet = initMySQL(pWrkrData, 1); } ENDtryResume BEGINbeginTransaction CODESTARTbeginTransaction - CHKiRet(writeMySQL((uchar*)"START TRANSACTION", pData)); + CHKiRet(writeMySQL(pWrkrData, (uchar*)"START TRANSACTION")); finalize_it: ENDbeginTransaction BEGINdoAction CODESTARTdoAction dbgprintf("\n"); - CHKiRet(writeMySQL(ppString[0], pData)); + CHKiRet(writeMySQL(pWrkrData, ppString[0])); iRet = RS_RET_DEFER_COMMIT; finalize_it: ENDdoAction BEGINendTransaction CODESTARTendTransaction - if (mysql_commit(pData->f_hmysql) != 0) { + if(mysql_commit(pWrkrData->hmysql) != 0) { dbgprintf("mysql server error: transaction not committed\n"); iRet = RS_RET_SUSPENDED; } @@ -293,10 +302,9 @@ static inline void setInstParamDefaults(instanceData *pData) { pData->dbsrvPort = 0; - pData->f_configfile = NULL; - pData->f_configsection = NULL; + pData->configfile = NULL; + pData->configsection = NULL; pData->tplName = NULL; - pData->f_hmysql = NULL; /* initialize, but connect only on first message (important for queued mode!) */ } @@ -338,9 +346,9 @@ CODESTARTnewActInst strncpy(pData->dbpwd, cstr, sizeof(pData->dbpwd)); free(cstr); } else if(!strcmp(actpblk.descr[i].name, "mysqlconfig.file")) { - pData->f_configfile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + pData->configfile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "mysqlconfig.section")) { - pData->f_configsection = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + pData->configsection = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { @@ -424,9 +432,8 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) ABORT_FINALIZE(RS_RET_INVALID_PARAMS); } else { pData->dbsrvPort = (unsigned) cs.iSrvPort; /* set configured port */ - pData->f_configfile = cs.pszMySQLConfigFile; - pData->f_configsection = cs.pszMySQLConfigSection; - pData->f_hmysql = NULL; /* initialize, but connect only on first message (important for queued mode!) */ + pData->configfile = cs.pszMySQLConfigFile; + pData->configsection = cs.pszMySQLConfigSection; } CODE_STD_FINALIZERparseSelectorAct @@ -446,6 +453,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt @@ -484,7 +492,7 @@ CODEmodInit_QueryRegCFSLineHdlr mysql_server_init(0, NULL, NULL) # endif ) { - errmsg.LogError(0, NO_ERRCODE, "ommysql: mysql_server_init() failed, plugin " + errmsg.LogError(0, NO_ERRCODE, "ommysql: intializing mysql client failed, plugin " "can not run"); ABORT_FINALIZE(RS_RET_ERR); } diff --git a/plugins/omruleset/omruleset.c b/plugins/omruleset/omruleset.c index 11765507..73419915 100644 --- a/plugins/omruleset/omruleset.c +++ b/plugins/omruleset/omruleset.c @@ -10,7 +10,7 @@ * * File begun on 2009-11-02 by RGerhards * - * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2009-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -70,6 +70,10 @@ typedef struct _instanceData { uchar *pszRulesetName; /* primarily for debugging/display purposes */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { ruleset_t *pRuleset; /* ruleset to enqueue message to (NULL = Default, not recommended) */ uchar *pszRulesetName; @@ -87,11 +91,21 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature ENDisCompatibleWithFeature +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINfreeInstance CODESTARTfreeInstance free(pData->pszRulesetName); @@ -117,9 +131,9 @@ BEGINdoAction CODESTARTdoAction CHKmalloc(pMsg = MsgDup((msg_t*) ppString[0])); DBGPRINTF(":omruleset: forwarding message %p to ruleset %s[%p]\n", pMsg, - (char*) pData->pszRulesetName, pData->pRuleset); + (char*) pWrkrData->pData->pszRulesetName, pWrkrData->pData->pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); - MsgSetRuleset(pMsg, pData->pRuleset); + MsgSetRuleset(pMsg, pWrkrData->pData->pRuleset); /* Note: we intentionally use submitMsg2() here, as we process messages * that were already run through the rate-limiter. So it is (at least) * questionable if they were rate-limited again. @@ -199,6 +213,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES ENDqueryEtryPt diff --git a/plugins/omstdout/omstdout.c b/plugins/omstdout/omstdout.c index a84a7593..210b0165 100644 --- a/plugins/omstdout/omstdout.c +++ b/plugins/omstdout/omstdout.c @@ -6,7 +6,7 @@ * * File begun on 2009-03-19 by RGerhards * - * Copyright 2009-2012 Adiscon GmbH. + * Copyright 2009-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -60,6 +60,10 @@ typedef struct _instanceData { int bEnsureLFEnding; /* ensure that a linefeed is written at the end of EACH record (test aid for nettester) */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { int bUseArrayInterface; /* shall action use array instead of string template interface? */ int bEnsureLFEnding; /* shall action use array instead of string template interface? */ @@ -76,6 +80,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -88,6 +97,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo ENDdbgPrintInstInfo @@ -107,7 +121,7 @@ BEGINdoAction size_t len; int r; CODESTARTdoAction - if(pData->bUseArrayInterface) { + if(pWrkrData->pData->bUseArrayInterface) { /* if we use array passing, we need to put together a string * ourselves. At this point, please keep in mind that omstdout is * primarily a testing aid. Other modules may do different processing @@ -145,7 +159,7 @@ CODESTARTdoAction DBGPRINTF("omstdout: error %d writing to stdout[%d]: %s\n", r, len, toWrite); } - if(pData->bEnsureLFEnding && toWrite[len-1] != '\n') { + if(pWrkrData->pData->bEnsureLFEnding && toWrite[len-1] != '\n') { if((r = write(1, "\n", 1)) != 1) { /* write missing LF */ DBGPRINTF("omstdout: error %d writing \\n to stdout\n", r); @@ -186,6 +200,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES ENDqueryEtryPt diff --git a/plugins/omtesting/omtesting.c b/plugins/omtesting/omtesting.c index c9f1e06b..2cc1159e 100644 --- a/plugins/omtesting/omtesting.c +++ b/plugins/omtesting/omtesting.c @@ -22,7 +22,7 @@ * NOTE: read comments in module-template.h to understand how this file * works! * - * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -63,7 +63,6 @@ MODULE_CNFNAME("omtesting") */ DEF_OMOD_STATIC_DATA - typedef struct _instanceData { enum { MD_SLEEP, MD_FAIL, MD_RANDFAIL, MD_ALWAYS_SUSPEND } mode; @@ -76,6 +75,10 @@ typedef struct _instanceData { int iCurrRetries; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { int bEchoStdout; /* echo non-failed messages to stdout */ } configSettings_t; @@ -93,6 +96,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo dbgprintf("Action delays rule by %d second(s) and %d millisecond(s)\n", @@ -170,11 +178,11 @@ static rsRetVal doRandFail(void) BEGINtryResume CODESTARTtryResume dbgprintf("omtesting tryResume() called\n"); - switch(pData->mode) { + switch(pWrkrData->pData->mode) { case MD_SLEEP: break; case MD_FAIL: - iRet = doFailOnResume(pData); + iRet = doFailOnResume(pWrkrData->pData); break; case MD_RANDFAIL: iRet = doRandFail(); @@ -187,8 +195,10 @@ ENDtryResume BEGINdoAction + instanceData *pData; CODESTARTdoAction dbgprintf("omtesting received msg '%s'\n", ppString[0]); + pData = pWrkrData->pData; switch(pData->mode) { case MD_SLEEP: iRet = doSleep(pData); @@ -220,6 +230,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINparseSelectorAct int i; uchar szBuf[1024]; @@ -313,6 +328,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES ENDqueryEtryPt diff --git a/runtime/module-template.h b/runtime/module-template.h index 8a958f90..f65aaf0f 100644 --- a/runtime/module-template.h +++ b/runtime/module-template.h @@ -175,6 +175,44 @@ static rsRetVal freeInstance(void* pModData)\ RETiRet;\ } +/* createWrkrInstance() + */ +#define BEGINcreateWrkrInstance \ +static rsRetVal createWrkrInstance(wrkrInstanceData_t **ppWrkrData, instanceData *pData)\ + {\ + DEFiRet; /* store error code here */\ + wrkrInstanceData_t *pWrkrData; /* use this to point to data elements */ + +#define CODESTARTcreateWrkrInstance \ + if((pWrkrData = calloc(1, sizeof(wrkrInstanceData_t))) == NULL) {\ + *ppWrkrData = NULL;\ + ENDfunc \ + return RS_RET_OUT_OF_MEMORY;\ + } \ + pWrkrData->pData = pData; + +#define ENDcreateWrkrInstance \ + *ppWrkrData = pWrkrData;\ + RETiRet;\ +} + +/* freeWrkrInstance */ +#define BEGINfreeWrkrInstance \ +static rsRetVal freeWrkrInstance(void* pd)\ +{\ + DEFiRet;\ + wrkrInstanceData_t *pWrkrData; + +#define CODESTARTfreeWrkrInstance \ + pWrkrData = (wrkrInstanceData_t*) pd; + +#define ENDfreeWrkrInstance \ + if(pWrkrData != NULL)\ + free(pWrkrData); /* we need to free this in any case */\ + RETiRet;\ +} + + /* isCompatibleWithFeature() */ #define BEGINisCompatibleWithFeature \ @@ -194,7 +232,7 @@ static rsRetVal isCompatibleWithFeature(syslogFeature __attribute__((unused)) eF * introduced in v4.3.3 -- rgerhards, 2009-04-27 */ #define BEGINbeginTransaction \ -static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\ +static rsRetVal beginTransaction(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ DEFiRet; @@ -209,7 +247,7 @@ static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\ * introduced in v4.3.3 -- rgerhards, 2009-04-27 */ #define BEGINendTransaction \ -static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\ +static rsRetVal endTransaction(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ DEFiRet; @@ -223,7 +261,7 @@ static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\ /* doAction() */ #define BEGINdoAction \ -static rsRetVal doAction(uchar __attribute__((unused)) **ppString, unsigned __attribute__((unused)) iMsgOpts, instanceData __attribute__((unused)) *pData)\ +static rsRetVal doAction(uchar __attribute__((unused)) **ppString, unsigned __attribute__((unused)) iMsgOpts, wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ DEFiRet; @@ -382,12 +420,12 @@ static rsRetVal newInpInst(struct nvlst *lst)\ * rgerhard, 2007-08-02 */ #define BEGINtryResume \ -static rsRetVal tryResume(instanceData __attribute__((unused)) *pData)\ +static rsRetVal tryResume(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ DEFiRet; #define CODESTARTtryResume \ - assert(pData != NULL); + assert(pWrkrData != NULL); #define ENDtryResume \ RETiRet;\ @@ -467,6 +505,13 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\ *pEtryPoint = tryResume;\ } +/* standard queries for output module interface in rsyslog v8+ */ +#define CODEqueryEtryPt_STD_OMOD8_QUERIES \ + else if(!strcmp((char*) name, "createWrkrInstance")) {\ + *pEtryPoint = createWrkrInstance;\ + } else if(!strcmp((char*) name, "freeWrkrInstance")) {\ + *pEtryPoint = freeWrkrInstance;\ + } /* the following definition is queryEtryPt block that must be added * if an output module supports the transactional interface. diff --git a/runtime/modules.c b/runtime/modules.c index 56606306..52096082 100644 --- a/runtime/modules.c +++ b/runtime/modules.c @@ -652,6 +652,9 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_ CHKiRet((*pNew->modQueryEtryPt)((uchar*)"doAction", &pNew->mod.om.doAction)); CHKiRet((*pNew->modQueryEtryPt)((uchar*)"parseSelectorAct", &pNew->mod.om.parseSelectorAct)); CHKiRet((*pNew->modQueryEtryPt)((uchar*)"tryResume", &pNew->tryResume)); + CHKiRet((*pNew->modQueryEtryPt)((uchar*)"createWrkrInstance", &pNew->mod.om.createWrkrInstance)); + CHKiRet((*pNew->modQueryEtryPt)((uchar*)"freeWrkrInstance", &pNew->mod.om.freeWrkrInstance)); + /* try load optional interfaces */ localRet = (*pNew->modQueryEtryPt)((uchar*)"doHUP", &pNew->doHUP); if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) diff --git a/runtime/modules.h b/runtime/modules.h index 23df22d6..a0637ac9 100644 --- a/runtime/modules.h +++ b/runtime/modules.h @@ -139,6 +139,8 @@ struct modInfo_s { rsRetVal (*parseSelectorAct)(uchar**, void**,omodStringRequest_t**); rsRetVal (*newActInst)(uchar *modName, struct nvlst *lst, void **, omodStringRequest_t **); rsRetVal (*SetShutdownImmdtPtr)(void *pData, void *pPtr); + rsRetVal (*createWrkrInstance)(void*ppWrkrData, void*pData); + rsRetVal (*freeWrkrInstance)(void*pWrkrData); } om; struct { /* data for library modules */ char dummy; diff --git a/runtime/queue.c b/runtime/queue.c index 30269152..e0f4481f 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -81,8 +81,8 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); +static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti); +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); @@ -959,7 +959,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) { batch_t singleBatch; batch_obj_t batchObj; @@ -986,7 +986,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) singleBatch.pElem = &batchObj; singleBatch.eltState = &batchState; singleBatch.active = &active; - iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate); + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate); /* delete the batch string params: TODO: create its own "class" for this */ for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { free(batchObj.staticActStrings[i]); @@ -999,7 +999,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) /* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead * otherwise incured. -- rgerhards, ~2010-06-23 */ -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) +rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti) { DEFiRet; @@ -1013,7 +1013,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL); + iRet = pThis->pConsumer(pThis->pAction, pBatch, pWti, NULL); RETiRet; } @@ -1319,7 +1319,7 @@ finalize_it: * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*, int*)) { DEFiRet; qqueue_t *pThis; @@ -1878,7 +1878,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); - CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate)); + CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -2683,7 +2683,7 @@ finalize_it: /* now, the same function, but for direct mode */ static rsRetVal -qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) +qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti) { int i; DEFiRet; @@ -2692,7 +2692,7 @@ qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) assert(pMultiSub != NULL); for(i = 0 ; i < pMultiSub->nElem ; ++i) { - CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i])); + CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i], pWti)); } finalize_it: @@ -2707,16 +2707,16 @@ finalize_it: * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg) +qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pMsg); + iRet = qAddDirect(pThis, pMsg, pWti); RETiRet; } -/* enqueue a new user data element +/* enqueue a new user data element * Enqueues the new element and awakes worker thread. */ rsRetVal diff --git a/runtime/queue.h b/runtime/queue.h index 844523ad..91900b30 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -103,7 +103,7 @@ struct queue_s { * the user really wanted...). -- rgerhards, 2008-04-02 */ /* end dequeue time window */ - rsRetVal (*pConsumer)(void *,batch_t*,int*); /* user-supplied consumer function for dequeued messages */ + rsRetVal (*pConsumer)(void *,batch_t*, wti_t*,int*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero @@ -195,14 +195,14 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); -rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg); +rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti); rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *, int*)); +rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti); int queueCnfParamsSet(struct nvlst *lst); rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst); void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis); diff --git a/runtime/rsconf.c b/runtime/rsconf.c index 8c808786..abce53b8 100644 --- a/runtime/rsconf.c +++ b/runtime/rsconf.c @@ -1205,6 +1205,7 @@ ourConf = loadConf; // TODO: remove, once ourConf is gone! ABORT_FINALIZE(RS_RET_NO_ACTIONS); } tellLexEndParsing(); + DBGPRINTF("Number of actions in this configuration: %d\n", iActionNbr); rulesetOptimizeAll(loadConf); tellCoreConfigLoadDone(); diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 3c4d2eed..e39a1889 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -67,8 +67,8 @@ static struct cnfparamblk rspblk = }; /* forward definitions */ -static rsRetVal processBatch(batch_t *pBatch); -static rsRetVal scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active); +static rsRetVal processBatch(batch_t *pBatch, wti_t *pWti); +static rsRetVal scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti); /* ---------- linked-list key handling functions (ruleset) ---------- */ @@ -168,7 +168,7 @@ finalize_it: * rgerhards, 2010-06-15 */ static inline rsRetVal -processBatchMultiRuleset(batch_t *pBatch) +processBatchMultiRuleset(batch_t *pBatch, wti_t *pWti) { ruleset_t *currRuleset; batch_t snglRuleBatch; @@ -206,7 +206,7 @@ processBatchMultiRuleset(batch_t *pBatch) snglRuleBatch.nElem = iNew; /* was left just right by the for loop */ batchSetSingleRuleset(&snglRuleBatch, 1); /* process temp batch */ - processBatch(&snglRuleBatch); + processBatch(&snglRuleBatch, pWti); batchFree(&snglRuleBatch); } while(bHaveUnprocessed == 1); @@ -226,12 +226,12 @@ static inline void freeActive(sbool *active) { free(active); } /* for details, see scriptExec() header comment! */ /* call action for all messages with filter on */ static rsRetVal -execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { DEFiRet; dbgprintf("RRRR: execAct [%s]: batch of %d elements, active %p\n", modGetName(stmt->d.act->pMod), batchNumMsgs(pBatch), active); pBatch->active = active; - stmt->d.act->submitToActQ(stmt->d.act, pBatch); + stmt->d.act->submitToActQ(stmt->d.act, pBatch, pWti); RETiRet; } @@ -285,13 +285,13 @@ execStop(batch_t *pBatch, sbool *active) RETiRet; } static rsRetVal -execCall(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execCall(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { msg_t *pMsg; int i; DEFiRet; if(stmt->d.s_call.ruleset == NULL) { - scriptExec(stmt->d.s_call.stmt, pBatch, active); + scriptExec(stmt->d.s_call.stmt, pBatch, active, pWti); } else { for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { CHKmalloc(pMsg = MsgDup((msg_t*) pBatch->pElem[i].pMsg)); @@ -316,7 +316,7 @@ finalize_it: // set new filter, inverted // perform else (if any messages) static rsRetVal -execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { sbool *newAct; int i; @@ -345,7 +345,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) } if(stmt->d.s_if.t_then != NULL) { - scriptExec(stmt->d.s_if.t_then, pBatch, newAct); + scriptExec(stmt->d.s_if.t_then, pBatch, newAct, pWti); } if(stmt->d.s_if.t_else != NULL) { for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { @@ -355,7 +355,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) && (active == NULL || active[i])) newAct[i] = !newAct[i]; } - scriptExec(stmt->d.s_if.t_else, pBatch, newAct); + scriptExec(stmt->d.s_if.t_else, pBatch, newAct, pWti); } freeActive(newAct); finalize_it: @@ -364,7 +364,7 @@ finalize_it: /* for details, see scriptExec() header comment! */ static void -execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { sbool *newAct; msg_t *pMsg; @@ -391,7 +391,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) } if(stmt->d.s_prifilt.t_then != NULL) { - scriptExec(stmt->d.s_prifilt.t_then, pBatch, newAct); + scriptExec(stmt->d.s_prifilt.t_then, pBatch, newAct, pWti); } if(stmt->d.s_prifilt.t_else != NULL) { for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { @@ -401,7 +401,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) && (active == NULL || active[i])) newAct[i] = !newAct[i]; } - scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct); + scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct, pWti); } freeActive(newAct); } @@ -500,7 +500,7 @@ done: /* for details, see scriptExec() header comment! */ static void -execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti) { sbool *thenAct; sbool bRet; @@ -519,7 +519,7 @@ execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) DBGPRINTF("batch: item %d PROPFILT %d\n", i, thenAct[i]); } - scriptExec(stmt->d.s_propfilt.t_then, pBatch, thenAct); + scriptExec(stmt->d.s_propfilt.t_then, pBatch, thenAct, pWti); freeActive(thenAct); } @@ -534,7 +534,7 @@ execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) * rgerhards, 2012-09-04 */ static rsRetVal -scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active) +scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti) { DEFiRet; struct cnfstmt *stmt; @@ -552,7 +552,7 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active) execStop(pBatch, active); break; case S_ACT: - execAct(stmt, pBatch, active); + execAct(stmt, pBatch, active, pWti); break; case S_SET: execSet(stmt, pBatch, active); @@ -561,16 +561,16 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active) execUnset(stmt, pBatch, active); break; case S_CALL: - execCall(stmt, pBatch, active); + execCall(stmt, pBatch, active, pWti); break; case S_IF: - execIf(stmt, pBatch, active); + execIf(stmt, pBatch, active, pWti); break; case S_PRIFILT: - execPRIFILT(stmt, pBatch, active); + execPRIFILT(stmt, pBatch, active, pWti); break; case S_PROPFILT: - execPROPFILT(stmt, pBatch, active); + execPROPFILT(stmt, pBatch, active, pWti); break; default: dbgprintf("error: unknown stmt type %u during exec\n", @@ -589,7 +589,7 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active) * rgerhards, 2005-10-13 */ static rsRetVal -processBatch(batch_t *pBatch) +processBatch(batch_t *pBatch, wti_t *pWti) { ruleset_t *pThis; DEFiRet; @@ -601,9 +601,9 @@ processBatch(batch_t *pBatch) if(pThis == NULL) pThis = ourConf->rulesets.pDflt; ISOBJ_TYPE_assert(pThis, ruleset); - CHKiRet(scriptExec(pThis->root, pBatch, NULL)); + CHKiRet(scriptExec(pThis->root, pBatch, NULL, pWti)); } else { - CHKiRet(processBatchMultiRuleset(pBatch)); + CHKiRet(processBatchMultiRuleset(pBatch, pWti)); } finalize_it: diff --git a/runtime/ruleset.h b/runtime/ruleset.h index 9905b53c..0af3578c 100644 --- a/runtime/ruleset.h +++ b/runtime/ruleset.h @@ -46,7 +46,7 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Destruct)(ruleset_t **ppThis); rsRetVal (*DestructAllActions)(rsconf_t *conf); rsRetVal (*SetName)(ruleset_t *pThis, uchar *pszName); - rsRetVal (*ProcessBatch)(batch_t*); + rsRetVal (*ProcessBatch)(batch_t*, wti_t *); rsRetVal (*GetRuleset)(rsconf_t *conf, ruleset_t **ppThis, uchar*); rsRetVal (*SetDefaultRuleset)(rsconf_t *conf, uchar*); rsRetVal (*SetCurrRuleset)(rsconf_t *conf, uchar*); @@ -64,8 +64,9 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */ /* AddRule() removed */ /*TODO:REMOVE*/rsRetVal (*IterateAllActions)(rsconf_t *conf, rsRetVal (*pFunc)(void*, void*), void* pParam); void (*AddScript)(ruleset_t *pThis, struct cnfstmt *script); + /* v8: changed processBatch interface */ ENDinterface(ruleset) -#define rulesetCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */ +#define rulesetCURR_IF_VERSION 8 /* increment whenever you change the interface structure! */ /* prototypes */ diff --git a/runtime/wti.c b/runtime/wti.c index f91fb5a9..df77bc19 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -44,6 +44,7 @@ #include "wti.h" #include "obj.h" #include "glbl.h" +#include "action.h" #include "atomic.h" /* static data */ @@ -171,6 +172,7 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE CODESTARTobjDestruct(wti) /* actual destruction */ batchFree(&pThis->batch); + free(pThis->actWrkrInfo); DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning); free(pThis->pszDbgHdr); @@ -195,11 +197,15 @@ wtiConstructFinalize(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); - DBGPRINTF("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker instance data (for %d actions)\n", + wtiGetDbgHdr(pThis), iActionNbr); /* initialize our thread instance descriptor (no concurrency here) */ pThis->bIsRunning = RSFALSE; + /* must use calloc as we need zero-init */ + CHKmalloc(pThis->actWrkrInfo = calloc(iActionNbr, sizeof(actWrkrInfo_t))); + /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); CHKiRet(batchInit(&pThis->batch, iDeqBatchSize)); @@ -277,6 +283,7 @@ wtiWorker(wti_t *pThis) rsRetVal localRet; rsRetVal terminateRet; int iCancelStateSave; + int i; DEFiRet; ISOBJ_TYPE_assert(pThis, wti); @@ -286,6 +293,7 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); +dbgprintf("DDDD: wti %p: worker starting\n", pThis); /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ @@ -332,9 +340,19 @@ wtiWorker(wti_t *pThis) bInactivityTOOccured = 0; /* reset for next run */ } + DBGPRINTF("DDDD: wti %p: worker cleanup up action instances\n", pThis); + for(i = 0 ; i < iActionNbr ; ++i) { + dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, pThis->actWrkrInfo[i].actWrkrData); + if(pThis->actWrkrInfo[i].actWrkrData != NULL) { + dbgprintf("DDDD: calling freeWrkrData!\n"); + pThis->actWrkrInfo[i].pAction->pMod->mod.om.freeWrkrInstance(pThis->actWrkrInfo[i].actWrkrData); + } + } + /* indicate termination */ pthread_cleanup_pop(0); /* remove cleanup handler */ pthread_setcancelstate(iCancelStateSave, NULL); +dbgprintf("DDDD: wti %p: worker exiting\n", pThis); RETiRet; } diff --git a/runtime/wti.h b/runtime/wti.h index 014251f0..bb4f56bc 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -28,6 +28,11 @@ #include "batch.h" +typedef struct actWrkrInfo { + action_t *pAction; + void *actWrkrData; +} actWrkrInfo_t; + /* the worker thread instance class */ struct wti_s { BEGINobjInstance; @@ -37,6 +42,7 @@ struct wti_s { wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ + actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions (sized for max nbr of actions in config!) */ DEF_ATOMIC_HELPER_MUT(mutIsRunning); }; diff --git a/runtime/wtp.c b/runtime/wtp.c index 19151e7c..1b960eb9 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -8,7 +8,7 @@ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it * if you are getting aquainted to the object. * - * Copyright 2008,2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -91,6 +91,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pthread_cond_init(&pThis->condThrdTrm, NULL); pthread_attr_init(&pThis->attrThrd); /* Set thread scheduling policy to default */ +#warning do we need this any longer? I think it was a cure for an already fixed bug.. #ifdef HAVE_PTHREAD_SETSCHEDPARAM pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy); pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param); @@ -121,7 +122,8 @@ wtpConstructFinalize(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - DBGPRINTF("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker thread pool (numworkerThreads %d)\n", + wtpGetDbgHdr(pThis), pThis->iNumWorkerThreads); /* alloc and construct workers - this can only be done in finalizer as we previously do * not know the max number of workers */ diff --git a/tools/omdiscard.c b/tools/omdiscard.c index 15c6ea82..a76bcc33 100644 --- a/tools/omdiscard.c +++ b/tools/omdiscard.c @@ -6,7 +6,7 @@ * * File begun on 2007-07-24 by RGerhards * - * Copyright 2007-2012 Adiscon GmbH. + * Copyright 2007-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -49,6 +49,10 @@ typedef struct _instanceData { EMPTY_STRUCT } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* we do not need a createInstance()! BEGINcreateInstance CODESTARTcreateInstance @@ -56,6 +60,11 @@ ENDcreateInstance */ +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo /* do nothing */ @@ -87,6 +96,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINparseSelectorAct CODESTARTparseSelectorAct CODE_STD_STRING_REQUESTparseSelectorAct(0) @@ -114,6 +128,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES ENDqueryEtryPt diff --git a/tools/omfile.c b/tools/omfile.c index fdcf355a..90efe71a 100644 --- a/tools/omfile.c +++ b/tools/omfile.c @@ -133,6 +133,7 @@ typedef struct s_dynaFileCacheEntry dynaFileCacheEntry; typedef struct _instanceData { + pthread_mutex_t mutWrite; /* guard against multiple instances writing to single file */ uchar *f_fname; /* file or template name (display only) */ uchar *tplName; /* name of assigned template */ strm_t *pStrm; /* our output stream */ @@ -181,6 +182,20 @@ typedef struct _instanceData { STATSCOUNTER_DEF(ctrMax, mutCtrMax); } instanceData; +/* to build a linked list for temporary storage of lines while we cannot commit */ +typedef struct linebuf { + uchar *filename; /* for dynafiles, make go away */ + uchar *ln; + unsigned iMsgOpts; + struct linebuf *pNext; +} linebuf_t; + +typedef struct wrkrInstanceData { + instanceData *pData; + linebuf_t *pRoot; + linebuf_t *pLast; +} wrkrInstanceData_t; + typedef struct configSettings_s { int iDynaFileCacheSize; /* max cache for dynamic files */ @@ -786,7 +801,7 @@ finalize_it: /* rgerhards 2004-11-11: write to a file output. */ static rsRetVal -writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) +writeFile(instanceData *pData, linebuf_t *linebuf) { DEFiRet; @@ -796,7 +811,7 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) * check if it still is ok or a new file needs to be created */ if(pData->bDynamicName) { - CHKiRet(prepareDynFile(pData, ppString[1], iMsgOpts)); + CHKiRet(prepareDynFile(pData, linebuf->filename, linebuf->iMsgOpts)); } else { /* "regular", non-dynafile */ if(pData->pStrm == NULL) { CHKiRet(prepareFile(pData, pData->f_fname)); @@ -806,7 +821,7 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) } } - CHKiRet(doWrite(pData, ppString[0], strlen(CHAR_CONVERT(ppString[0])))); + CHKiRet(doWrite(pData, linebuf->ln, ustrlen(linebuf->ln))); finalize_it: RETiRet; @@ -888,9 +903,15 @@ ENDfreeCnf BEGINcreateInstance CODESTARTcreateInstance pData->pStrm = NULL; + pthread_mutex_init(&pData->mutWrite, NULL); ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINfreeInstance CODESTARTfreeInstance free(pData->tplName); @@ -913,9 +934,15 @@ CODESTARTfreeInstance free(pData->cryprovName); free(pData->cryprovNameFull); } + pthread_mutex_destroy(&pData->mutWrite); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINtryResume CODESTARTtryResume ENDtryResume @@ -926,8 +953,68 @@ CODESTARTbeginTransaction ENDbeginTransaction +static rsRetVal +bufferLine(wrkrInstanceData_t *pWrkrData, uchar *filename, uchar *line) +{ + linebuf_t *lb; + DEFiRet; + + CHKmalloc(lb = (linebuf_t*) malloc(sizeof(linebuf_t))); + CHKmalloc(lb->filename = ustrdup(filename)); + CHKmalloc(lb->ln = ustrdup(line)); + lb->pNext = NULL; + if(pWrkrData->pRoot == NULL) { + pWrkrData->pRoot = pWrkrData->pLast = lb; + } else { + pWrkrData->pLast->pNext = lb; + pWrkrData->pLast = lb; + } +finalize_it: + RETiRet; +} + +static void +submitCachedLines(wrkrInstanceData_t *pWrkrData, instanceData *pData) +{ + linebuf_t *curr, *todel; + +dbgprintf("omfile: waiting on write lock (pWrkrData %p)\n", pWrkrData); + pthread_mutex_lock(&pData->mutWrite); +dbgprintf("omfile: aquired write lock (pWrkrData %p)\n", pWrkrData); + + for(curr = pWrkrData->pRoot ; curr != NULL ; ) { + DBGPRINTF("omfile: file to log to: %s\n", curr->filename); + DBGPRINTF("omfile: start of data: '%.128s'\n", curr->ln); + STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests); + writeFile(pData, curr); + + todel = curr; + curr = curr->pNext; + free(todel->filename); + free(todel->ln); + free(todel); + } + pthread_mutex_unlock(&pData->mutWrite); +dbgprintf("omfile: free write lock (pWrkrData %p)\n", pWrkrData); + pWrkrData->pRoot = NULL; +} + + +BEGINdoAction + instanceData *pData; +CODESTARTdoAction + pData = pWrkrData->pData; + iRet = bufferLine(pWrkrData, (pData->bDynamicName) ? ppString[1] : pData->f_fname, + ppString[0]); + if(iRet == RS_RET_OK) + iRet = RS_RET_DEFER_COMMIT; +ENDdoAction + BEGINendTransaction + instanceData *pData; CODESTARTendTransaction + pData = pWrkrData->pData; + submitCachedLines(pWrkrData, pData); /* Note: pStrm may be NULL if there was an error opening the stream */ if(pData->bFlushOnTXEnd && pData->pStrm != NULL) { /* if we have an async writer, it controls the flush via @@ -941,21 +1028,6 @@ finalize_it: ENDendTransaction -BEGINdoAction -CODESTARTdoAction - DBGPRINTF("file to log to: %s\n", - (pData->bDynamicName) ? ppString[1] : pData->f_fname); - DBGPRINTF("omfile: start of data: '%.128s'\n", ppString[0]); - STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests); - CHKiRet(writeFile(ppString, iMsgOpts, pData)); - if(!bCoreSupportsBatching && pData->bFlushOnTXEnd) { - CHKiRet(strm.Flush(pData->pStrm)); - } -finalize_it: - if(iRet == RS_RET_OK) - iRet = RS_RET_DEFER_COMMIT; -ENDdoAction - static inline void setInstParamDefaults(instanceData *pData) @@ -1336,6 +1408,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a BEGINdoHUP CODESTARTdoHUP + pthread_mutex_lock(&pData->mutWrite); if(pData->bDynamicName) { dynaFileFreeCacheEntries(pData); } else { @@ -1343,6 +1416,7 @@ CODESTARTdoHUP closeFile(pData); } } + pthread_mutex_unlock(&pData->mutWrite); ENDdoHUP @@ -1358,6 +1432,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES diff --git a/tools/omfwd.c b/tools/omfwd.c index 6e5cf809..ed0898c9 100644 --- a/tools/omfwd.c +++ b/tools/omfwd.c @@ -112,6 +112,10 @@ typedef struct _instanceData { int errsToReport; /* (remaining) number of errors to report */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* config data */ typedef struct configSettings_s { uchar *pszTplName; /* name of the default template to use */ @@ -335,6 +339,12 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + dbgprintf("DDDD: createWrkrInstance: pWrkrData %p\n", pWrkrData); +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -360,6 +370,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo dbgprintf("%s", pData->target); @@ -720,7 +735,8 @@ finalize_it: BEGINtryResume CODESTARTtryResume - iRet = doTryResume(pData); + dbgprintf("DDDD: tryResume: pWrkrData %p\n", pWrkrData); + iRet = doTryResume(pWrkrData->pData); ENDtryResume @@ -737,7 +753,10 @@ BEGINdoAction # ifdef USE_NETZIP Bytef *out = NULL; /* for compression */ # endif + instanceData *pData; CODESTARTdoAction + dbgprintf("DDDD: doAction: pWrkrData %p\n", pWrkrData); + pData = pWrkrData->pData; CHKiRet(doTryResume(pData)); iMaxLine = glbl.GetMaxLine(); @@ -813,7 +832,9 @@ ENDdoAction BEGINendTransaction + instanceData *pData; CODESTARTendTransaction + pData = pWrkrData->pData; dbgprintf("omfwd: endTransaction, offsSndBuf %u\n", pData->offsSndBuf); if(pData->offsSndBuf != 0) { iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf, IS_FLUSH); @@ -1251,6 +1272,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES diff --git a/tools/ompipe.c b/tools/ompipe.c index df8066b1..79f3ae84 100644 --- a/tools/ompipe.c +++ b/tools/ompipe.c @@ -12,7 +12,7 @@ * NOTE: read comments in module-template.h to understand how this pipe * works! * - * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -69,9 +69,14 @@ typedef struct _instanceData { uchar *pipe; /* pipe or template name (display only) */ uchar *tplName; /* format template to use */ short fd; /* pipe descriptor for (current) pipe */ + pthread_mutex_t mutWrite; /* guard against multiple instances writing to same pipe */ sbool bHadError; /* did we already have/report an error on this pipe? */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { EMPTY_STRUCT } configSettings_t; @@ -276,25 +281,42 @@ CODESTARTcreateInstance pData->pipe = NULL; pData->fd = -1; pData->bHadError = 0; + pthread_mutex_init(&pData->mutWrite, NULL); ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINfreeInstance CODESTARTfreeInstance + pthread_mutex_destroy(&pData->mutWrite); free(pData->pipe); if(pData->fd != -1) close(pData->fd); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINtryResume CODESTARTtryResume ENDtryResume BEGINdoAction + instanceData *pData; CODESTARTdoAction + pData = pWrkrData->pData; DBGPRINTF(" (%s)\n", pData->pipe); + /* this module is single-threaded by nature */ + pthread_mutex_lock(&pData->mutWrite); iRet = writePipe(ppString, pData); + pthread_mutex_unlock(&pData->mutWrite); ENDdoAction @@ -390,6 +412,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_doHUP CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES diff --git a/tools/omshell.c b/tools/omshell.c index ac62fa62..ad6e979f 100644 --- a/tools/omshell.c +++ b/tools/omshell.c @@ -19,7 +19,7 @@ * of the "old" message code without any modifications. However, it * helps to have things at the right place one we go to the meat of it. * - * Copyright 2007-2012 Adiscon GmbH. + * Copyright 2007-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -63,11 +63,20 @@ typedef struct _instanceData { uchar progName[MAXFNAME]; /* program to execute */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -80,6 +89,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo printf("%s", pData->progName); @@ -92,13 +106,9 @@ ENDtryResume BEGINdoAction CODESTARTdoAction - /* TODO: using pData->progName is not clean from the point of - * modularization. We'll change that as we go ahead with modularization. - * rgerhards, 2007-07-20 - */ dbgprintf("\n"); - if(execProg((uchar*) pData->progName, 1, ppString[0]) == 0) - errmsg.LogError(0, NO_ERRCODE, "Executing program '%s' failed", (char*)pData->progName); + if(execProg((uchar*) pWrkrData->pData->progName, 1, ppString[0]) == 0) + errmsg.LogError(0, NO_ERRCODE, "Executing program '%s' failed", (char*)pWrkrData->pData->progName); ENDdoAction @@ -139,6 +149,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES ENDqueryEtryPt diff --git a/tools/omusrmsg.c b/tools/omusrmsg.c index f4cc4094..5d0b088f 100644 --- a/tools/omusrmsg.c +++ b/tools/omusrmsg.c @@ -8,7 +8,7 @@ * File begun on 2007-07-20 by RGerhards (extracted from syslogd.c, which at the * time of the fork from sysklogd was under BSD license) * - * Copyright 2007-2012 Adiscon GmbH. + * Copyright 2007-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -87,6 +87,10 @@ typedef struct _instanceData { uchar *tplName; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { EMPTY_STRUCT } configSettings_t; @@ -115,6 +119,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -128,6 +137,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINdbgPrintInstInfo register int i; CODESTARTdbgPrintInstInfo @@ -276,7 +290,7 @@ ENDtryResume BEGINdoAction CODESTARTdoAction dbgprintf("\n"); - iRet = wallmsg(ppString[0], pData); + iRet = wallmsg(ppString[0], pWrkrData->pData); ENDdoAction @@ -435,6 +449,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt diff --git a/tools/syslogd.c b/tools/syslogd.c index aaeb9866..7597b05d 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -560,13 +560,13 @@ finalize_it: * for the main queue. */ static rsRetVal -msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate) +msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate) { DEFiRet; assert(pBatch != NULL); pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */ preprocessBatch(pBatch); - ruleset.ProcessBatch(pBatch); + ruleset.ProcessBatch(pBatch, pWti); //TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we //do not have this yet and so we emulate -- 2010-06-10 int i; |