summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c58
-rw-r--r--action.h4
-rw-r--r--configure.ac2
-rw-r--r--doc/impstats.html4
-rw-r--r--plugins/impstats/impstats.c2
-rw-r--r--plugins/imudp/imudp.c17
-rw-r--r--runtime/msg.c4
-rw-r--r--runtime/queue.c11
-rw-r--r--runtime/queue.h2
-rw-r--r--tcps_sess.c1
-rw-r--r--tcpsrv.c15
-rw-r--r--tcpsrv.h3
12 files changed, 110 insertions, 13 deletions
diff --git a/action.c b/action.c
index 278625ce..7226e7d6 100644
--- a/action.c
+++ b/action.c
@@ -112,6 +112,7 @@
#include "datetime.h"
#include "unicode-helper.h"
#include "atomic.h"
+#include "statsobj.h"
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
@@ -127,6 +128,7 @@ DEFobjCurrIf(obj)
DEFobjCurrIf(datetime)
DEFobjCurrIf(module)
DEFobjCurrIf(errmsg)
+DEFobjCurrIf(statsobj)
static int iActExecOnceInterval = 0; /* execute action once every nn seconds */
static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */
@@ -263,6 +265,12 @@ rsRetVal actionDestruct(action_t *pThis)
qqueueDestruct(&pThis->pQueue);
}
+ /* destroy stats object, if we have one (may not always be
+ * be the case, e.g. if turned off)
+ */
+ if(pThis->statsobj != NULL)
+ statsobj.Destruct(&pThis->statsobj);
+
if(pThis->pMod != NULL)
pThis->pMod->freeInstance(pThis->pModData);
@@ -313,12 +321,35 @@ rsRetVal
actionConstructFinalize(action_t *pThis)
{
DEFiRet;
- uchar pszQName[64]; /* friendly name of our queue */
+ uchar pszAName[64]; /* friendly name of our queue */
ASSERT(pThis != NULL);
+ /* generate a friendly name for us action stats */
+ if(pThis->pszName == NULL) {
+ snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr);
+ } else {
+ ustrncpy(pszAName, pThis->pszName, sizeof(pszAName));
+ pszAName[63] = '\0'; /* to be on the save side */
+ }
+
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&pThis->statsobj));
+ CHKiRet(statsobj.SetName(pThis->statsobj, pszAName));
+
+ STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"),
+ ctrType_IntCtr, &pThis->ctrProcessed));
+
+ STATSCOUNTER_INIT(pThis->ctrFail, pThis->mutCtrFail);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("failed"),
+ ctrType_IntCtr, &pThis->ctrFail));
+
+ CHKiRet(statsobj.ConstructFinalize(pThis->statsobj));
+
+ /* create our queue */
/* find a name for our queue */
- snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr);
+ snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d queue", iActionNbr);
/* now check if we can run the action in "firehose mode" during stage one of
* its processing (that is before messages are enqueued into the action q).
@@ -362,7 +393,7 @@ actionConstructFinalize(action_t *pThis)
*/
CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize,
(rsRetVal (*)(void*, batch_t*, int*))processBatchMain));
- obj.SetName((obj_t*) pThis->pQueue, pszQName);
+ obj.SetName((obj_t*) pThis->pQueue, pszAName);
/* ... set some properties ... */
# define setQPROP(func, directive, data) \
@@ -1072,6 +1103,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
&& pBatch->pElem[i].state != BATCH_STATE_COMM ) {
pBatch->pElem[i].state = BATCH_STATE_BAD;
pBatch->pElem[i].bPrevWasSuspended = 1;
+ STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail);
}
}
bDone = 1;
@@ -1261,6 +1293,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
+ STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
else
@@ -1537,6 +1570,17 @@ finalize_it:
RETiRet;
}
+static inline void
+countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
+{
+ int i;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ if(pBatch->pElem[i].bFilterOK) {
+ STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
+ }
+ }
+}
+
/* enqueue a batch in direct mode. We have put this into its own function just to avoid
* cluttering the actual submit function.
@@ -1573,13 +1617,16 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
pBatch->pElem[i].bFilterOK = 0;
bModifiedFilter = 1;
}
- if(pBatch->pElem[i].bFilterOK)
+ if(pBatch->pElem[i].bFilterOK) {
+ STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
bNeedSubmit = 1;
+ }
DBGPRINTF("action %p[%d]: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
pAction, i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
}
if(bNeedSubmit) {
+ /* note: stats were already computed above */
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
} else {
DBGPRINTF("no need to submit batch, all bFilterOK==0\n");
@@ -1594,6 +1641,8 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
}
}
} else {
+ if(GatherStats)
+ countStatsBatchEnq(pAction, pBatch);
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
}
@@ -1817,6 +1866,7 @@ rsRetVal actionClassInit(void)
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(module, CORE_COMPONENT));
CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL));
diff --git a/action.h b/action.h
index bae64d31..7a283a4a 100644
--- a/action.h
+++ b/action.h
@@ -89,6 +89,10 @@ struct action_s {
pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */
uchar *pszName; /* action name (for documentation) */
DEF_ATOMIC_HELPER_MUT(mutCAS);
+ /* for statistics subsystem */
+ statsobj_t *statsobj;
+ STATSCOUNTER_DEF(ctrProcessed, mutCtrProcessed);
+ STATSCOUNTER_DEF(ctrFail, mutCtrFail);
};
diff --git a/configure.ac b/configure.ac
index 5a77a8a6..99d7b203 100644
--- a/configure.ac
+++ b/configure.ac
@@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ(2.61)
-AC_INIT([rsyslog],[5.8.6],[rsyslog@lists.adiscon.com])
+AC_INIT([rsyslog],[5.8.6-newstats1],[rsyslog@lists.adiscon.com])
AM_INIT_AUTOMAKE
m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])])
diff --git a/doc/impstats.html b/doc/impstats.html
index cede4874..260c1aa4 100644
--- a/doc/impstats.html
+++ b/doc/impstats.html
@@ -18,7 +18,9 @@ prepared to change your trending scripts when you upgrade to a newer rsyslog ver
output is periodic, with the interval being configurable (default is 5 minutes).
Be sure that your configuration records the counter messages (default is syslog.info).
<p>Note that loading this module has impact on rsyslog performance. Depending on
-settings, this impact may be severe (for high-load environments).
+settings, this impact may be noticable (for high-load environments).
+<p>The rsyslog website has an updated overview of available
+<a href="http://rsyslog.com/rsyslog-statistic-counter/">rsyslog statistic counters</a>.
</p>
<p><b>Configuration Directives</b>:</p>
<ul>
diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c
index cfbbad76..dc2541b8 100644
--- a/plugins/impstats/impstats.c
+++ b/plugins/impstats/impstats.c
@@ -10,7 +10,7 @@
* of the "old" message code without any modifications. However, it
* helps to have things at the right place one we go to the meat of it.
*
- * Copyright 2010 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2010-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index a5002591..32cd89d2 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -51,6 +51,7 @@
#include "datetime.h"
#include "prop.h"
#include "ruleset.h"
+#include "statsobj.h"
#include "unicode-helper.h"
MODULE_TYPE_INPUT
@@ -66,6 +67,10 @@ DEFobjCurrIf(net)
DEFobjCurrIf(datetime)
DEFobjCurrIf(prop)
DEFobjCurrIf(ruleset)
+DEFobjCurrIf(statsobj)
+
+statsobj_t *modStats;
+STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */
static int iMaxLine; /* maximum UDP message size supported */
@@ -377,6 +382,7 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev,
pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */
CHKiRet(msgSetFromSockinfo(pMsg, &frominet));
CHKiRet(submitMsg(pMsg));
+ STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit);
}
}
@@ -622,9 +628,12 @@ ENDafterRun
BEGINmodExit
CODESTARTmodExit
+ statsobj.Destruct(&modStats);
+
/* release what we no longer need */
objRelease(errmsg, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
objRelease(datetime, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
objRelease(ruleset, CORE_COMPONENT);
@@ -662,6 +671,7 @@ CODESTARTmodInit
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
@@ -682,6 +692,13 @@ CODEmodInit_QueryRegCFSLineHdlr
NULL, &iTimeRequery, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
+
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&modStats));
+ CHKiRet(statsobj.SetName(modStats, UCHAR_CONSTANT("imudp")));
+ CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, &ctrSubmit));
+ CHKiRet(statsobj.ConstructFinalize(modStats));
ENDmodInit
/* vim:set ai:
*/
diff --git a/runtime/msg.c b/runtime/msg.c
index 0744edd5..759554d8 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -1671,8 +1671,6 @@ void MsgSetTAG(msg_t *pMsg, uchar* pszBuf, size_t lenBuf)
uchar *pBuf;
assert(pMsg != NULL);
-dbgprintf("MsgSetTAG in: len %d, pszBuf: %s\n", lenBuf, pszBuf);
-
freeTAG(pMsg);
pMsg->iLenTAG = lenBuf;
@@ -1691,8 +1689,6 @@ dbgprintf("MsgSetTAG in: len %d, pszBuf: %s\n", lenBuf, pszBuf);
memcpy(pBuf, pszBuf, pMsg->iLenTAG);
pBuf[pMsg->iLenTAG] = '\0'; /* this also works with truncation! */
-
-dbgprintf("MsgSetTAG exit: pMsg->iLenTAG %d, pMsg->TAG.szBuf: %s\n", pMsg->iLenTAG, pMsg->TAG.szBuf);
}
diff --git a/runtime/queue.c b/runtime/queue.c
index 9012abeb..e97d78e8 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -12,7 +12,7 @@
* function names - this makes it really hard to read and does not provide much
* benefit, at least I (now) think so...
*
- * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -1312,6 +1312,7 @@ static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr)
if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n",
iQueueSize, iSeverity);
+ STATSCOUNTER_INC(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd);
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
@@ -1936,6 +1937,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("full"),
ctrType_IntCtr, &pThis->ctrFull));
+ STATSCOUNTER_INIT(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.full"),
+ ctrType_IntCtr, &pThis->ctrFDscrd));
+ STATSCOUNTER_INIT(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.nf"),
+ ctrType_IntCtr, &pThis->ctrNFDscrd));
+
pThis->ctrMaxqsize = 0;
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("maxqsize"),
ctrType_Int, &pThis->ctrMaxqsize));
@@ -2289,6 +2297,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
// TODO : handle enqOnly => discard!
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
+ STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
diff --git a/runtime/queue.h b/runtime/queue.h
index 97057180..06a58229 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -169,6 +169,8 @@ struct queue_s {
statsobj_t *statsobj;
STATSCOUNTER_DEF(ctrEnqueued, mutCtrEnqueued);
STATSCOUNTER_DEF(ctrFull, mutCtrFull);
+ STATSCOUNTER_DEF(ctrFDscrd, mutCtrFDscrd);
+ STATSCOUNTER_DEF(ctrNFDscrd, mutCtrNFDscrd);
int ctrMaxqsize;
};
diff --git a/tcps_sess.c b/tcps_sess.c
index 99af0cb8..0fd3e713 100644
--- a/tcps_sess.c
+++ b/tcps_sess.c
@@ -259,6 +259,7 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG
CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP));
MsgSetRuleset(pMsg, pThis->pLstnInfo->pRuleset);
+ STATSCOUNTER_INC(pThis->pLstnInfo->ctrSubmit, pThis->pLstnInfo->mutCtrSubmit);
if(pMultiSub == NULL) {
CHKiRet(submitMsg(pMsg));
} else {
diff --git a/tcpsrv.c b/tcpsrv.c
index 0b822511..ca9d4192 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -36,8 +36,8 @@
*
* A copy of the GPL can be found in the file "COPYING" in this distribution.
*/
-
#include "config.h"
+#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
@@ -90,6 +90,7 @@ DEFobjCurrIf(netstrm)
DEFobjCurrIf(nssel)
DEFobjCurrIf(nspoll)
DEFobjCurrIf(prop)
+DEFobjCurrIf(statsobj)
/* add new listener port to listener port list
@@ -99,6 +100,7 @@ static inline rsRetVal
addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort)
{
tcpLstnPortList_t *pEntry;
+ uchar statname[64];
DEFiRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
@@ -118,6 +120,15 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort)
pEntry->pNext = pThis->pLstnPorts;
pThis->pLstnPorts = pEntry;
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&(pEntry->stats)));
+ snprintf((char*)statname, sizeof(statname), "%s(%s)", pThis->pszInputName, pszPort);
+ statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(statsobj.SetName(pEntry->stats, statname));
+ CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, &(pEntry->ctrSubmit)));
+ CHKiRet(statsobj.ConstructFinalize(pEntry->stats));
+
finalize_it:
RETiRet;
}
@@ -1068,6 +1079,7 @@ CODESTARTObjClassExit(tcpsrv)
objRelease(tcps_sess, DONT_LOAD_LIB);
objRelease(conf, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
objRelease(ruleset, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
@@ -1094,6 +1106,7 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
CHKiRet(objUse(conf, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
/* set our own handlers */
diff --git a/tcpsrv.h b/tcpsrv.h
index 57bdf4b1..6dc79897 100644
--- a/tcpsrv.h
+++ b/tcpsrv.h
@@ -25,6 +25,7 @@
#include "obj.h"
#include "prop.h"
#include "tcps_sess.h"
+#include "statsobj.h"
/* support for framing anomalies */
typedef enum ETCPsyslogFramingAnomaly {
@@ -40,6 +41,8 @@ struct tcpLstnPortList_s {
prop_t *pInputName;
tcpsrv_t *pSrv; /**< pointer to higher-level server instance */
ruleset_t *pRuleset; /**< associated ruleset */
+ statsobj_t *stats; /**< associated stats object */
+ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
tcpLstnPortList_t *pNext; /**< next port or NULL */
};