summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c98
1 files changed, 66 insertions, 32 deletions
diff --git a/action.c b/action.c
index 39611594..145856eb 100644
--- a/action.c
+++ b/action.c
@@ -69,7 +69,7 @@
* beast.
* rgerhards, 2011-06-15
*
- * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -114,6 +114,7 @@
#include "unicode-helper.h"
#include "atomic.h"
#include "ruleset.h"
+#include "parserif.h"
#include "statsobj.h"
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
@@ -179,6 +180,7 @@ configSettings_t cs_save; /* our saved (scope!) config settings */
* counting. -- rgerhards, 2008-01-29
*/
static int iActionNbr = 0;
+int bActionReportSuspension = 1;
/* tables for interfacing with the v6 config system */
static struct cnfparamdescr cnfparamdescr[] = {
@@ -253,8 +255,8 @@ actionResetQueueParams(void)
cs.ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
cs.iActionQueueSize = 1000; /* size of the main message queue above */
cs.iActionQueueDeqBatchSize = 16; /* default batch size */
- cs.iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */
- cs.iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */
+ cs.iActionQHighWtrMark = -1; /* high water mark for disk-assisted queues */
+ cs.iActionQLowWtrMark = -1; /* low water mark for disk-assisted queues */
cs.iActionQDiscardMark = 980; /* begin to discard messages */
cs.iActionQDiscardSeverity = 8; /* discard warning and above */
cs.iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */
@@ -265,7 +267,7 @@ actionResetQueueParams(void)
cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
cs.iActionQtoEnq = 50; /* timeout for queue enque */
cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
- cs.iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
+ cs.iActionQWrkMinMsgs = -1; /* minimum messages per worker needed to start a new one */
cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
cs.iActionQueMaxDiskSpace = 0;
cs.iActionQueueDeqSlowdown = 0;
@@ -357,7 +359,7 @@ finalize_it:
/* action construction finalizer
*/
rsRetVal
-actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
+actionConstructFinalize(action_t *pThis, struct nvlst *lst)
{
DEFiRet;
uchar pszAName[64]; /* friendly name of our action */
@@ -382,11 +384,22 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed);
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"),
- ctrType_IntCtr, &pThis->ctrProcessed));
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrProcessed));
STATSCOUNTER_INIT(pThis->ctrFail, pThis->mutCtrFail);
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("failed"),
- ctrType_IntCtr, &pThis->ctrFail));
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrFail));
+
+ STATSCOUNTER_INIT(pThis->ctrSuspend, pThis->mutCtrSuspend);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrSuspend));
+ STATSCOUNTER_INIT(pThis->ctrSuspendDuration, pThis->mutCtrSuspendDuration);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended.duration"),
+ ctrType_IntCtr, 0, &pThis->ctrSuspendDuration));
+
+ STATSCOUNTER_INIT(pThis->ctrResume, pThis->mutCtrResume);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("resumed"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrResume));
CHKiRet(statsobj.ConstructFinalize(pThis->statsobj));
@@ -432,7 +445,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
obj.SetName((obj_t*) pThis->pQueue, pszAName);
qqueueSetpAction(pThis->pQueue, pThis);
- if(queueParams == NULL) { /* use legacy params? */
+ if(lst == NULL) { /* use legacy params? */
/* ... set some properties ... */
# define setQPROP(func, directive, data) \
CHKiRet_Hdlr(func(pThis->pQueue, data)) { \
@@ -467,7 +480,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
} else {
/* we have v6-style config params */
qqueueSetDefaultsActionQueue(pThis->pQueue);
- qqueueApplyCnfParam(pThis->pQueue, queueParams);
+ qqueueApplyCnfParam(pThis->pQueue, lst);
}
# undef setQPROP
@@ -476,6 +489,12 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
qqueueDbgPrint(pThis->pQueue);
DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue);
+
+ if(pThis->eParamPassing == ACT_MSG_PASSING && pThis->pQueue->qType != QUEUETYPE_DIRECT) {
+ parser_warnmsg("module %s with message passing mode uses "
+ "non-direct queue. This most probably leads to undesired "
+ "results", (char*)modGetName(pThis->pMod));
+ }
/* and now reset the queue params (see comment in its function header!) */
actionResetQueueParams();
@@ -598,7 +617,7 @@ static void actionDisable(action_t *pThis)
}
-/* Suspend action, this involves changing the acton state as well
+/* Suspend action, this involves changing the action state as well
* as setting the next retry time.
* if we have more than 10 retries, we prolong the
* retry interval. If something is really stalled, it will
@@ -606,17 +625,34 @@ static void actionDisable(action_t *pThis)
* CPU time. TODO: maybe a config option for that?
* rgerhards, 2007-08-02
*/
-static inline void actionSuspend(action_t *pThis)
+static inline void
+actionSuspend(action_t * const pThis)
{
time_t ttNow;
+ int suspendDuration;
+ char timebuf[32];
/* note: we can NOT use a cached timestamp, as time may have evolved
* since caching, and this would break logic (and it actually did so!)
*/
datetime.GetTime(&ttNow);
- pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
+ suspendDuration = pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
+ pThis->ttResumeRtry = ttNow + suspendDuration;
actionSetState(pThis, ACT_STATE_SUSP);
- DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry);
+ pThis->ctrSuspendDuration += suspendDuration;
+ if(pThis->iNbrResRtry == 0) {
+ STATSCOUNTER_INC(pThis->ctrSuspend, pThis->mutCtrSuspend);
+ }
+ DBGPRINTF("action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d\n",
+ pThis->pszName, (long long) pThis->ttResumeRtry, (long long) ttNow,
+ pThis->iNbrResRtry);
+ if(bActionReportSuspension) {
+ ctime_r(&pThis->ttResumeRtry, timebuf);
+ timebuf[strlen(timebuf)-1] = '\0'; /* strip LF */
+ errmsg.LogMsg(0, RS_RET_NOT_FOUND, LOG_WARNING,
+ "action '%s' suspended, next retry is %s",
+ pThis->pszName, timebuf);
+ }
}
@@ -647,9 +683,9 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate)
iRetries = 0;
while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) {
- DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries);
+ DBGPRINTF("actionDoRetry: %s enter loop, iRetries=%d\n", pThis->pszName, iRetries);
iRet = pThis->pMod->tryResume(pThis->pModData);
- DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet);
+ DBGPRINTF("actionDoRetry: %s action->tryResume returned %d\n", pThis->pszName, iRet);
if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) {
bTreatOKasSusp = 1;
pThis->iResumeOKinRow = 0;
@@ -657,16 +693,23 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate)
bTreatOKasSusp = 0;
}
if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) {
- DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet);
+ DBGPRINTF("actionDoRetry: %s had success RDY again (iRet=%d)\n",
+ pThis->pszName, iRet);
+ if(bActionReportSuspension) {
+ errmsg.LogMsg(0, RS_RET_OK, LOG_INFO, "action '%s' resumed",
+ pThis->pszName);
+ }
actionSetState(pThis, ACT_STATE_RDY);
} else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) {
/* max retries reached? */
- DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n",
- pThis->iResumeRetryCount, iRetries);
+ DBGPRINTF("actionDoRetry: %s check for max retries, iResumeRetryCount "
+ "%d, iRetries %d\n",
+ pThis->pszName, pThis->iResumeRetryCount, iRetries);
if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
actionSuspend(pThis);
+ if(pThis->iNbrResRtry < 20)
+ ++pThis->iNbrResRtry;
} else {
- ++pThis->iNbrResRtry;
++iRetries;
iSleepPeriod = pThis->iResumeInterval;
srSleep(iSleepPeriod, 0);
@@ -1789,7 +1832,7 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals)
rsRetVal
addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
omodStringRequest_t *pOMSR, struct cnfparamvals *actParams,
- struct cnfparamvals *queueParams, int bSuspended)
+ struct nvlst *lst, int bSuspended)
{
DEFiRet;
int i;
@@ -1882,7 +1925,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
if(bSuspended)
actionSuspend(pAction);
- CHKiRet(actionConstructFinalize(pAction, queueParams));
+ CHKiRet(actionConstructFinalize(pAction, lst));
/* TODO: if we exit here, we have a memory leak... */
@@ -1941,26 +1984,19 @@ rsRetVal
actionNewInst(struct nvlst *lst, action_t **ppAction)
{
struct cnfparamvals *paramvals;
- struct cnfparamvals *queueParams;
modInfo_t *pMod;
uchar *cnfModName = NULL;
omodStringRequest_t *pOMSR;
void *pModData;
action_t *pAction;
- int typeIdx;
DEFiRet;
paramvals = nvlstGetParams(lst, &pblk, NULL);
if(paramvals == NULL) {
- ABORT_FINALIZE(RS_RET_ERR);
+ ABORT_FINALIZE(RS_RET_PARAM_ERROR);
}
dbgprintf("action param blk after actionNewInst:\n");
cnfparamsPrint(&pblk, paramvals);
- typeIdx = cnfparamGetIdx(&pblk, "type");
- if(paramvals[typeIdx].bUsed == 0) {
- errmsg.LogError(0, RS_RET_CONF_RQRD_PARAM_MISSING, "action type missing");
- ABORT_FINALIZE(RS_RET_CONF_RQRD_PARAM_MISSING); // TODO: move this into rainerscript handlers
- }
cnfModName = (uchar*)es_str2cstr(paramvals[cnfparamGetIdx(&pblk, ("type"))].val.d.estr, NULL);
if((pMod = module.FindWithCnfName(loadConf, cnfModName, eMOD_OUT)) == NULL) {
errmsg.LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName);
@@ -1972,9 +2008,7 @@ actionNewInst(struct nvlst *lst, action_t **ppAction)
FINALIZE; /* iRet is already set to error state */
}
- qqueueDoCnfParams(lst, &queueParams);
-
- if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, queueParams,
+ if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst,
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
/* check if the module is compatible with select features
* (currently no such features exist) */