From e86cb62f1299ef18732f7b3b87d45a840ee38f1e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 13 Sep 2010 15:43:56 +0200 Subject: improved statistics-gathering subsystem ... well, actually this is a first real implementation of this subsystem. I have added a counter registry, a way to access the countres (as readable string) and a way to define and maintem them. Also, module impstats has been updated to utilize the new system. Finally, I added some counters. I hope that this sets the baseline for useful future enhancements. --- runtime/queue.c | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 60d17086..387c15c2 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -58,6 +58,7 @@ #include "errmsg.h" #include "datetime.h" #include "unicode-helper.h" +#include "statsobj.h" #include "msg.h" /* TODO: remove once we remove MsgAddRef() call */ #ifdef OS_SOLARIS @@ -70,6 +71,7 @@ DEFobjCurrIf(glbl) DEFobjCurrIf(strm) DEFobjCurrIf(errmsg) DEFobjCurrIf(datetime) +DEFobjCurrIf(statsobj) /* forward-definitions */ static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr); @@ -1817,6 +1819,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; uchar pszBuf[64]; + uchar *qName; size_t lenBuf; ASSERT(pThis != NULL); @@ -1886,6 +1889,27 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ qqueueAdviseMaxWorkers(pThis); pThis->bQueueStarted = 1; + /* support statistics gathering */ + qName = obj.GetName((obj_t*)pThis); + CHKiRet(statsobj.Construct(&pThis->statsobj)); + CHKiRet(statsobj.SetName(pThis->statsobj, qName)); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("size"), + ctrType_Int, &pThis->iQueueSize)); + + STATSCOUNTER_INIT(pThis->ctrEnqueued, pThis->mutCtrEnqueued); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("enqueued"), + ctrType_IntCtr, &pThis->ctrEnqueued)); + + STATSCOUNTER_INIT(pThis->ctrFull, pThis->mutCtrFull); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("full"), + ctrType_IntCtr, &pThis->ctrFull)); + + pThis->ctrMaxqsize = 0; + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("maxqsize"), + ctrType_Int, &pThis->ctrMaxqsize)); + + CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); + finalize_it: RETiRet; } @@ -2119,6 +2143,10 @@ CODESTARTobjDestruct(qqueue) free(pThis->pszFilePrefix); free(pThis->pszSpoolDir); + + /* some queues do not provide stats and thus have no statsobj! */ + if(pThis->statsobj != NULL) + statsobj.Destruct(&pThis->statsobj); ENDobjDestruct(qqueue) @@ -2178,6 +2206,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) DEFiRet; struct timespec t; + STATSCOUNTER_INC(pThis->ctrEnqueued, pThis->mutCtrEnqueued); /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); @@ -2225,6 +2254,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); timeoutComp(&t, pThis->toEnq); + STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull); // TODO : handle enqOnly => discard! if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); @@ -2235,6 +2265,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* and finally enqueue the message */ CHKiRet(qqueueAdd(pThis, pUsr)); + STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize); finalize_it: RETiRet; @@ -2414,6 +2445,7 @@ BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE) CHKiRet(objUse(strm, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); /* now set our own handlers */ OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty); -- cgit v1.2.3 From 121f5ab4ec4e766aeae5671005325a6aef4a1806 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 11 Feb 2011 14:56:02 +0100 Subject: bugfix: queue engine did not properly slow down inputs in FULL_DELAY mode... ...when in disk-assisted mode. This especially affected imfile, which created unnecessarily queue files if a large set of input file data was to process. --- runtime/queue.c | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index e4922f37..cc7c0f54 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -246,6 +246,7 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis) if(!pThis->bEnqOnly) { if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) { + DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n"); wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } else { if(getLogicalQueueSize(pThis) == 0) { @@ -1211,7 +1212,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread /* set some water marks so that we have useful defaults if none are set specifically */ pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */ pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */ - pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir); pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->iQueueSize = 0; @@ -1819,6 +1819,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; uchar pszBuf[64]; + int wrk; uchar *qName; size_t lenBuf; @@ -1850,6 +1851,16 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ + /* re-adjust some params if required */ + if(pThis->bIsDA) { + /* if we are in DA mode, we must make sure full delayable messages do not + * initiate going to disk! + */ + wrk = pThis->iHighWtrMrk - (pThis->iHighWtrMrk / 100) * 50; /* 50% of high water mark */ + if(wrk < pThis->iFullDlyMrk) + pThis->iFullDlyMrk = wrk; + } + DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, " "full delay %d, light delay %d, deq batch size %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, -- cgit v1.2.3 From 6a18d25cbec2676a7910ff038170716293abe89f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 11 Feb 2011 17:06:20 +0100 Subject: removed no longer needed code --- runtime/queue.c | 2 -- 1 file changed, 2 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index cc7c0f54..9f63a338 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1842,7 +1842,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ } pthread_mutex_init(&pThis->mutThrdMgmt, NULL); - pthread_cond_init (&pThis->condDAReady, NULL); pthread_cond_init (&pThis->notFull, NULL); pthread_cond_init (&pThis->notEmpty, NULL); pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL); @@ -2140,7 +2139,6 @@ CODESTARTobjDestruct(qqueue) free(pThis->mut); } pthread_mutex_destroy(&pThis->mutThrdMgmt); - pthread_cond_destroy(&pThis->condDAReady); pthread_cond_destroy(&pThis->notFull); pthread_cond_destroy(&pThis->notEmpty); pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); -- cgit v1.2.3 From f3d354da3e373f9c4890a78e5274a6ba02f1c8cb Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 11 Feb 2011 17:47:30 +0100 Subject: bugfix: very long running actions could prevent shutdown under some circumstances This has now been solved, at least for common situations. --- runtime/queue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 9f63a338..76327f6a 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1497,7 +1497,7 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) * now that we dequeue batches of pointers, this is much less an issue... * rgerhards, 2009-04-22 */ - if(iQueueSize < pThis->iFullDlyMrk / 2) { + if(iQueueSize < pThis->iFullDlyMrk / 2 || glbl.GetGlobalInputTermState() == 1) { pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk); } -- cgit v1.2.3 From 00e1f24187ee814801e6969629b82a7ae030beaf Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 30 Mar 2011 12:13:14 +0200 Subject: stop adding data to DA queue when low watermark has been reached potentially closes: http://bugzilla.adiscon.com/show_bug.cgi?id=241 But needs more verification. --- runtime/queue.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index ef6e843b..50ae307c 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1774,9 +1774,13 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) { DEFiRet; +//DBGPRINTF("XXXX: chkStopWrkrDA called, low watermark %d, phys Size %d\n", pThis->iLowWtrMrk, getPhysicalQueueSize(pThis)); if(pThis->bEnqOnly) { iRet = RS_RET_TERMINATE_WHEN_IDLE; } + if(getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk) { + iRet = RS_RET_TERMINATE_NOW; + } RETiRet; } -- cgit v1.2.3