diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 98 |
1 files changed, 66 insertions, 32 deletions
@@ -69,7 +69,7 @@ * beast. * rgerhards, 2011-06-15 * - * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -114,6 +114,7 @@ #include "unicode-helper.h" #include "atomic.h" #include "ruleset.h" +#include "parserif.h" #include "statsobj.h" #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ @@ -179,6 +180,7 @@ configSettings_t cs_save; /* our saved (scope!) config settings */ * counting. -- rgerhards, 2008-01-29 */ static int iActionNbr = 0; +int bActionReportSuspension = 1; /* tables for interfacing with the v6 config system */ static struct cnfparamdescr cnfparamdescr[] = { @@ -253,8 +255,8 @@ actionResetQueueParams(void) cs.ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ cs.iActionQueueSize = 1000; /* size of the main message queue above */ cs.iActionQueueDeqBatchSize = 16; /* default batch size */ - cs.iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ - cs.iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ + cs.iActionQHighWtrMark = -1; /* high water mark for disk-assisted queues */ + cs.iActionQLowWtrMark = -1; /* low water mark for disk-assisted queues */ cs.iActionQDiscardMark = 980; /* begin to discard messages */ cs.iActionQDiscardSeverity = 8; /* discard warning and above */ cs.iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ @@ -265,7 +267,7 @@ actionResetQueueParams(void) cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ cs.iActionQtoEnq = 50; /* timeout for queue enque */ cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ - cs.iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ + cs.iActionQWrkMinMsgs = -1; /* minimum messages per worker needed to start a new one */ cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ cs.iActionQueMaxDiskSpace = 0; cs.iActionQueueDeqSlowdown = 0; @@ -357,7 +359,7 @@ finalize_it: /* action construction finalizer */ rsRetVal -actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) +actionConstructFinalize(action_t *pThis, struct nvlst *lst) { DEFiRet; uchar pszAName[64]; /* friendly name of our action */ @@ -382,11 +384,22 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"), - ctrType_IntCtr, &pThis->ctrProcessed)); + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrProcessed)); STATSCOUNTER_INIT(pThis->ctrFail, pThis->mutCtrFail); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("failed"), - ctrType_IntCtr, &pThis->ctrFail)); + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrFail)); + + STATSCOUNTER_INIT(pThis->ctrSuspend, pThis->mutCtrSuspend); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrSuspend)); + STATSCOUNTER_INIT(pThis->ctrSuspendDuration, pThis->mutCtrSuspendDuration); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended.duration"), + ctrType_IntCtr, 0, &pThis->ctrSuspendDuration)); + + STATSCOUNTER_INIT(pThis->ctrResume, pThis->mutCtrResume); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("resumed"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrResume)); CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); @@ -432,7 +445,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) obj.SetName((obj_t*) pThis->pQueue, pszAName); qqueueSetpAction(pThis->pQueue, pThis); - if(queueParams == NULL) { /* use legacy params? */ + if(lst == NULL) { /* use legacy params? */ /* ... set some properties ... */ # define setQPROP(func, directive, data) \ CHKiRet_Hdlr(func(pThis->pQueue, data)) { \ @@ -467,7 +480,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) } else { /* we have v6-style config params */ qqueueSetDefaultsActionQueue(pThis->pQueue); - qqueueApplyCnfParam(pThis->pQueue, queueParams); + qqueueApplyCnfParam(pThis->pQueue, lst); } # undef setQPROP @@ -476,6 +489,12 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) qqueueDbgPrint(pThis->pQueue); DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue); + + if(pThis->eParamPassing == ACT_MSG_PASSING && pThis->pQueue->qType != QUEUETYPE_DIRECT) { + parser_warnmsg("module %s with message passing mode uses " + "non-direct queue. This most probably leads to undesired " + "results", (char*)modGetName(pThis->pMod)); + } /* and now reset the queue params (see comment in its function header!) */ actionResetQueueParams(); @@ -598,7 +617,7 @@ static void actionDisable(action_t *pThis) } -/* Suspend action, this involves changing the acton state as well +/* Suspend action, this involves changing the action state as well * as setting the next retry time. * if we have more than 10 retries, we prolong the * retry interval. If something is really stalled, it will @@ -606,17 +625,34 @@ static void actionDisable(action_t *pThis) * CPU time. TODO: maybe a config option for that? * rgerhards, 2007-08-02 */ -static inline void actionSuspend(action_t *pThis) +static inline void +actionSuspend(action_t * const pThis) { time_t ttNow; + int suspendDuration; + char timebuf[32]; /* note: we can NOT use a cached timestamp, as time may have evolved * since caching, and this would break logic (and it actually did so!) */ datetime.GetTime(&ttNow); - pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); + suspendDuration = pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); + pThis->ttResumeRtry = ttNow + suspendDuration; actionSetState(pThis, ACT_STATE_SUSP); - DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry); + pThis->ctrSuspendDuration += suspendDuration; + if(pThis->iNbrResRtry == 0) { + STATSCOUNTER_INC(pThis->ctrSuspend, pThis->mutCtrSuspend); + } + DBGPRINTF("action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d\n", + pThis->pszName, (long long) pThis->ttResumeRtry, (long long) ttNow, + pThis->iNbrResRtry); + if(bActionReportSuspension) { + ctime_r(&pThis->ttResumeRtry, timebuf); + timebuf[strlen(timebuf)-1] = '\0'; /* strip LF */ + errmsg.LogMsg(0, RS_RET_NOT_FOUND, LOG_WARNING, + "action '%s' suspended, next retry is %s", + pThis->pszName, timebuf); + } } @@ -647,9 +683,9 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) iRetries = 0; while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { - DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries); + DBGPRINTF("actionDoRetry: %s enter loop, iRetries=%d\n", pThis->pszName, iRetries); iRet = pThis->pMod->tryResume(pThis->pModData); - DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet); + DBGPRINTF("actionDoRetry: %s action->tryResume returned %d\n", pThis->pszName, iRet); if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) { bTreatOKasSusp = 1; pThis->iResumeOKinRow = 0; @@ -657,16 +693,23 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate) bTreatOKasSusp = 0; } if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { - DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet); + DBGPRINTF("actionDoRetry: %s had success RDY again (iRet=%d)\n", + pThis->pszName, iRet); + if(bActionReportSuspension) { + errmsg.LogMsg(0, RS_RET_OK, LOG_INFO, "action '%s' resumed", + pThis->pszName); + } actionSetState(pThis, ACT_STATE_RDY); } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { /* max retries reached? */ - DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n", - pThis->iResumeRetryCount, iRetries); + DBGPRINTF("actionDoRetry: %s check for max retries, iResumeRetryCount " + "%d, iRetries %d\n", + pThis->pszName, pThis->iResumeRetryCount, iRetries); if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { actionSuspend(pThis); + if(pThis->iNbrResRtry < 20) + ++pThis->iNbrResRtry; } else { - ++pThis->iNbrResRtry; ++iRetries; iSleepPeriod = pThis->iResumeInterval; srSleep(iSleepPeriod, 0); @@ -1789,7 +1832,7 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals) rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, - struct cnfparamvals *queueParams, int bSuspended) + struct nvlst *lst, int bSuspended) { DEFiRet; int i; @@ -1882,7 +1925,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, if(bSuspended) actionSuspend(pAction); - CHKiRet(actionConstructFinalize(pAction, queueParams)); + CHKiRet(actionConstructFinalize(pAction, lst)); /* TODO: if we exit here, we have a memory leak... */ @@ -1941,26 +1984,19 @@ rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction) { struct cnfparamvals *paramvals; - struct cnfparamvals *queueParams; modInfo_t *pMod; uchar *cnfModName = NULL; omodStringRequest_t *pOMSR; void *pModData; action_t *pAction; - int typeIdx; DEFiRet; paramvals = nvlstGetParams(lst, &pblk, NULL); if(paramvals == NULL) { - ABORT_FINALIZE(RS_RET_ERR); + ABORT_FINALIZE(RS_RET_PARAM_ERROR); } dbgprintf("action param blk after actionNewInst:\n"); cnfparamsPrint(&pblk, paramvals); - typeIdx = cnfparamGetIdx(&pblk, "type"); - if(paramvals[typeIdx].bUsed == 0) { - errmsg.LogError(0, RS_RET_CONF_RQRD_PARAM_MISSING, "action type missing"); - ABORT_FINALIZE(RS_RET_CONF_RQRD_PARAM_MISSING); // TODO: move this into rainerscript handlers - } cnfModName = (uchar*)es_str2cstr(paramvals[cnfparamGetIdx(&pblk, ("type"))].val.d.estr, NULL); if((pMod = module.FindWithCnfName(loadConf, cnfModName, eMOD_OUT)) == NULL) { errmsg.LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName); @@ -1972,9 +2008,7 @@ actionNewInst(struct nvlst *lst, action_t **ppAction) FINALIZE; /* iRet is already set to error state */ } - qqueueDoCnfParams(lst, &queueParams); - - if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, queueParams, + if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { /* check if the module is compatible with select features * (currently no such features exist) */ |