From 955312ebf5d59e2ed0d35ba624070439b9fc5273 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 16 Dec 2011 18:51:49 +0100 Subject: experimental: added first stats counter to action object --- action.c | 49 +++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 43 insertions(+), 6 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index af7723e7..45fa6cf7 100644 --- a/action.c +++ b/action.c @@ -112,6 +112,7 @@ #include "datetime.h" #include "unicode-helper.h" #include "atomic.h" +#include "statsobj.h" #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ @@ -127,6 +128,7 @@ DEFobjCurrIf(obj) DEFobjCurrIf(datetime) DEFobjCurrIf(module) DEFobjCurrIf(errmsg) +DEFobjCurrIf(statsobj) static int iActExecOnceInterval = 0; /* execute action once every nn seconds */ static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */ @@ -313,18 +315,38 @@ rsRetVal actionConstructFinalize(action_t *pThis) { DEFiRet; - uchar pszQName[64]; /* friendly name of our queue */ + uchar pszAName[64]; /* friendly name of our action */ ASSERT(pThis != NULL); - /* find a name for our queue */ + /* generate a friendly name for us action stats */ if(pThis->pszName == NULL) { - snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr); + snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr); } else { - ustrncpy(pszQName, pThis->pszName, sizeof(pszQName)); - pszQName[63] = '\0'; /* to be on the save side */ + ustrncpy(pszAName, pThis->pszName, sizeof(pszAName)); + pszAName[63] = '\0'; /* to be on the save side */ } + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&pThis->statsobj)); + CHKiRet(statsobj.SetName(pThis->statsobj, pszAName)); + + STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"), + ctrType_IntCtr, &pThis->ctrProcessed)); + + CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); + + /* create our queue */ + + /* generate a friendly name for the queue */ + if(pThis->pszName == NULL) { + snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d queue", + iActionNbr); + } else { + ustrncpy(pszAName, pThis->pszName, sizeof(pszAName)); + pszAName[63] = '\0'; /* to be on the save side */ + } /* now check if we can run the action in "firehose mode" during stage one of * its processing (that is before messages are enqueued into the action q). * This is only possible if some features, which require strict sequence, are @@ -367,7 +389,7 @@ actionConstructFinalize(action_t *pThis) */ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); - obj.SetName((obj_t*) pThis->pQueue, pszQName); + obj.SetName((obj_t*) pThis->pQueue, pszAName); /* ... set some properties ... */ # define setQPROP(func, directive, data) \ @@ -1266,6 +1288,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) { DEFiRet; + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); else @@ -1542,6 +1565,17 @@ finalize_it: RETiRet; } +static inline void +countStatsBatchEnq(action_t *pAction, batch_t *pBatch) +{ + int i; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if(pBatch->pElem[i].bFilterOK) { + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); + } + } +} + /* enqueue a batch in direct mode. We have put this into its own function just to avoid * cluttering the actual submit function. @@ -1585,6 +1619,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); } if(bNeedSubmit) { + countStatsBatchEnq(pAction, pBatch); iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); } else { DBGPRINTF("no need to submit batch, all bFilterOK==0\n"); @@ -1599,6 +1634,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) } } } else { + countStatsBatchEnq(pAction, pBatch); iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); } @@ -1822,6 +1858,7 @@ rsRetVal actionClassInit(void) CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(module, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); -- cgit v1.2.3 From be2383f2b80d4fb83abd2765e6467e8d7614ab49 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 16 Dec 2011 18:54:59 +0100 Subject: fixing memory leak in new stats code (delete statsobj) --- action.c | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'action.c') diff --git a/action.c b/action.c index 45fa6cf7..84b97e5c 100644 --- a/action.c +++ b/action.c @@ -265,6 +265,12 @@ rsRetVal actionDestruct(action_t *pThis) qqueueDestruct(&pThis->pQueue); } + /* destroy stats object, if we have one (may not always be + * be the case, e.g. if turned off) + */ + if(pThis->statsobj != NULL) + statsobj.Destruct(&pThis->statsobj); + if(pThis->pMod != NULL) pThis->pMod->freeInstance(pThis->pModData); -- cgit v1.2.3 From 5fe837bf7dbdcc245ee233feb1fbcc6d052a4898 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 19 Dec 2011 11:16:07 +0100 Subject: added instrumentation --- action.c | 48 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 45 insertions(+), 3 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 278625ce..49eb3663 100644 --- a/action.c +++ b/action.c @@ -112,6 +112,7 @@ #include "datetime.h" #include "unicode-helper.h" #include "atomic.h" +#include "statsobj.h" #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ @@ -127,6 +128,7 @@ DEFobjCurrIf(obj) DEFobjCurrIf(datetime) DEFobjCurrIf(module) DEFobjCurrIf(errmsg) +DEFobjCurrIf(statsobj) static int iActExecOnceInterval = 0; /* execute action once every nn seconds */ static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */ @@ -263,6 +265,12 @@ rsRetVal actionDestruct(action_t *pThis) qqueueDestruct(&pThis->pQueue); } + /* destroy stats object, if we have one (may not always be + * be the case, e.g. if turned off) + */ + if(pThis->statsobj != NULL) + statsobj.Destruct(&pThis->statsobj); + if(pThis->pMod != NULL) pThis->pMod->freeInstance(pThis->pModData); @@ -313,12 +321,31 @@ rsRetVal actionConstructFinalize(action_t *pThis) { DEFiRet; - uchar pszQName[64]; /* friendly name of our queue */ + uchar pszAName[64]; /* friendly name of our queue */ ASSERT(pThis != NULL); + /* generate a friendly name for us action stats */ + if(pThis->pszName == NULL) { + snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr); + } else { + ustrncpy(pszAName, pThis->pszName, sizeof(pszAName)); + pszAName[63] = '\0'; /* to be on the save side */ + } + + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&pThis->statsobj)); + CHKiRet(statsobj.SetName(pThis->statsobj, pszAName)); + + STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"), + ctrType_IntCtr, &pThis->ctrProcessed)); + + CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); + + /* create our queue */ /* find a name for our queue */ - snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr); + snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d queue", iActionNbr); /* now check if we can run the action in "firehose mode" during stage one of * its processing (that is before messages are enqueued into the action q). @@ -362,7 +389,7 @@ actionConstructFinalize(action_t *pThis) */ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); - obj.SetName((obj_t*) pThis->pQueue, pszQName); + obj.SetName((obj_t*) pThis->pQueue, pszAName); /* ... set some properties ... */ # define setQPROP(func, directive, data) \ @@ -1261,6 +1288,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) { DEFiRet; + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); else @@ -1537,6 +1565,17 @@ finalize_it: RETiRet; } +static inline void +countStatsBatchEnq(action_t *pAction, batch_t *pBatch) +{ + int i; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if(pBatch->pElem[i].bFilterOK) { + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); + } + } +} + /* enqueue a batch in direct mode. We have put this into its own function just to avoid * cluttering the actual submit function. @@ -1580,6 +1619,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); } if(bNeedSubmit) { + countStatsBatchEnq(pAction, pBatch); iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); } else { DBGPRINTF("no need to submit batch, all bFilterOK==0\n"); @@ -1594,6 +1634,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) } } } else { + countStatsBatchEnq(pAction, pBatch); iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); } @@ -1817,6 +1858,7 @@ rsRetVal actionClassInit(void) CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(module, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); -- cgit v1.2.3 From 7837c523b8da7ac90c6efbb3de12855978ecaecf Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 19 Dec 2011 12:34:19 +0100 Subject: additional counter for lost messages due to failed actions added --- action.c | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'action.c') diff --git a/action.c b/action.c index 49eb3663..66d7b55c 100644 --- a/action.c +++ b/action.c @@ -341,6 +341,10 @@ actionConstructFinalize(action_t *pThis) CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"), ctrType_IntCtr, &pThis->ctrProcessed)); + STATSCOUNTER_INIT(pThis->ctrFail, pThis->mutCtrFail); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("failed"), + ctrType_IntCtr, &pThis->ctrFail)); + CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); /* create our queue */ @@ -1099,6 +1103,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) && pBatch->pElem[i].state != BATCH_STATE_COMM ) { pBatch->pElem[i].state = BATCH_STATE_BAD; pBatch->pElem[i].bPrevWasSuspended = 1; + STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail); } } bDone = 1; -- cgit v1.2.3 From 32203be41af9c682a4f81c629f08b0593d755f69 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 19 Dec 2011 14:16:32 +0100 Subject: better performance for action stats & cleanup: removing debug prints --- action.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 66d7b55c..7226e7d6 100644 --- a/action.c +++ b/action.c @@ -1617,14 +1617,16 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) pBatch->pElem[i].bFilterOK = 0; bModifiedFilter = 1; } - if(pBatch->pElem[i].bFilterOK) + if(pBatch->pElem[i].bFilterOK) { + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); bNeedSubmit = 1; + } DBGPRINTF("action %p[%d]: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", pAction, i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state, pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); } if(bNeedSubmit) { - countStatsBatchEnq(pAction, pBatch); + /* note: stats were already computed above */ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); } else { DBGPRINTF("no need to submit batch, all bFilterOK==0\n"); @@ -1639,7 +1641,8 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) } } } else { - countStatsBatchEnq(pAction, pBatch); + if(GatherStats) + countStatsBatchEnq(pAction, pBatch); iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); } -- cgit v1.2.3