From 88fa72fef9279aa20f4e52080131f06af7e9cc53 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 Nov 2013 17:33:41 +0100 Subject: queue: warn if high watermark is set too low --- runtime/queue.c | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index e0d60249..56fd9905 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2054,6 +2054,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ uchar pszBuf[64]; uchar pszQIFNam[MAXFNAME]; int wrk; + int goodval; /* a "good value" to use for comparisons (different objects) */ uchar *qName; size_t lenBuf; @@ -2115,6 +2116,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ /* we need to do a quick check if our water marks are set plausible. If not, * we correct the most important shortcomings. */ + goodval = (pThis->iMaxQueueSize / 100) * 60; + if(pThis->iHighWtrMrk < goodval) { + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": high water mark " + "is set quite low at %d. You should only set it below " + "60%% (%d) if you have a good reason for this.", + obj.GetName((obj_t*) pThis), pThis->iHighWtrMrk, goodval); + } if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) pThis->iFullDlyMrk = pThis->iMaxQueueSize - (pThis->iMaxQueueSize / 100) * 3; /* default 97% */ -- cgit v1.2.3 From cdce0bde4b3b2f77ecdf8a9ba2e4772fb92ff20d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 Nov 2013 17:46:19 +0100 Subject: queue: auto-adjust watermarks --- runtime/queue.c | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 56fd9905..c9d064d6 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1373,8 +1373,8 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) pThis->qType = QUEUETYPE_DIRECT; /* type of the main message queue above */ pThis->iMaxQueueSize = 1000; /* size of the main message queue above */ pThis->iDeqBatchSize = 128; /* default batch size */ - pThis->iHighWtrMrk = 800; /* high water mark for disk-assisted queues */ - pThis->iLowWtrMrk = 200; /* low water mark for disk-assisted queues */ + pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */ + pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */ pThis->iDiscardMrk = 980; /* begin to discard messages */ pThis->iDiscardSeverity = 8; /* turn off */ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ @@ -1403,8 +1403,8 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */ pThis->iMaxQueueSize = 50000; /* size of the main message queue above */ pThis->iDeqBatchSize = 1024; /* default batch size */ - pThis->iHighWtrMrk = 45000; /* high water mark for disk-assisted queues */ - pThis->iLowWtrMrk = 20000; /* low water mark for disk-assisted queues */ + pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */ + pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */ pThis->iDiscardMrk = 49500; /* begin to discard messages */ pThis->iDiscardSeverity = 8; /* turn off */ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ @@ -2117,18 +2117,22 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ * we correct the most important shortcomings. */ goodval = (pThis->iMaxQueueSize / 100) * 60; - if(pThis->iHighWtrMrk < goodval) { + if(pThis->iHighWtrMrk != -1 && pThis->iHighWtrMrk < goodval) { errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": high water mark " "is set quite low at %d. You should only set it below " "60%% (%d) if you have a good reason for this.", obj.GetName((obj_t*) pThis), pThis->iHighWtrMrk, goodval); } + if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) + pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90; + if( pThis->iLowWtrMrk < 2 + || pThis->iLowWtrMrk > pThis->iMaxQueueSize + || pThis->iLowWtrMrk > pThis->iHighWtrMrk ) + pThis->iLowWtrMrk = (pThis->iMaxQueueSize / 100) * 70; if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) - pThis->iFullDlyMrk = pThis->iMaxQueueSize - - (pThis->iMaxQueueSize / 100) * 3; /* default 97% */ + pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97; if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) - pThis->iLightDlyMrk = pThis->iMaxQueueSize - - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */ + pThis->iLightDlyMrk = (pThis->iMaxQueueSize / 100) * 70; if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) pThis->iDeqBatchSize = pThis->iMaxQueueSize; @@ -2163,7 +2167,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ } DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, maxQSize %d, lqsize %d, pqsize %d, child %d, " - "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d\n" + "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d, " "max wrkr %d, min msgs f. wrkr %d\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iMaxQueueSize, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), -- cgit v1.2.3 From 020fe0dcc4ae9eae05e36dcf54ac043c5ae856ae Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 Nov 2013 18:18:10 +0100 Subject: bugfix: legacy directive $ActionQueueWorkerThreads was not honored --- runtime/queue.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index c9d064d6..78a7a79e 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2123,6 +2123,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ "60%% (%d) if you have a good reason for this.", obj.GetName((obj_t*) pThis), pThis->iHighWtrMrk, goodval); } + if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90; if( pThis->iLowWtrMrk < 2 @@ -2937,6 +2938,7 @@ DEFpropSetMeth(qqueue, iFullDlyMrk, int) DEFpropSetMeth(qqueue, iDiscardSeverity, int) DEFpropSetMeth(qqueue, iLightDlyMrk, int) DEFpropSetMeth(qqueue, bIsDA, int) +DEFpropSetMeth(qqueue, iNumWorkerThreads, int) DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) DEFpropSetMeth(qqueue, bSaveOnShutdown, int) DEFpropSetMeth(qqueue, pAction, action_t*) -- cgit v1.2.3 From 98e5663055891ad786406584039d4ded4b2ccc46 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 Nov 2013 18:20:18 +0100 Subject: queue.workerThreadMinimumMessage set to queue.size / num workers --- runtime/queue.c | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 78a7a79e..c594af7c 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1385,7 +1385,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */ pThis->toEnq = 2000; /* timeout for queue enque */ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */ - pThis->iMinMsgsPerWrkr = 100; /* minimum messages per worker needed to start a new one */ + pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ pThis->sizeOnDiskMax = 0; /* unlimited */ pThis->iDeqSlowdown = 0; @@ -1415,7 +1415,7 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */ pThis->toEnq = 2000; /* timeout for queue enque */ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */ - pThis->iMinMsgsPerWrkr = 1000; /* minimum messages per worker needed to start a new one */ + pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ pThis->sizeOnDiskMax = 0; /* unlimited */ pThis->iDeqSlowdown = 0; @@ -2124,12 +2124,28 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ obj.GetName((obj_t*) pThis), pThis->iHighWtrMrk, goodval); } + if(pThis->iNumWorkerThreads > 1) { + goodval = (pThis->iMaxQueueSize / 100) * 10; + if(pThis->iMinMsgsPerWrkr != -1 && pThis->iMinMsgsPerWrkr < goodval) { + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": " + "queue.workerThreadMinimumMessage " + "is set quite low at %d. You should only set it below " + "10%% (%d) if you have a good reason for this.", + obj.GetName((obj_t*) pThis), pThis->iMinMsgsPerWrkr, goodval); + } + } + if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90; if( pThis->iLowWtrMrk < 2 || pThis->iLowWtrMrk > pThis->iMaxQueueSize || pThis->iLowWtrMrk > pThis->iHighWtrMrk ) pThis->iLowWtrMrk = (pThis->iMaxQueueSize / 100) * 70; + if(pThis->iNumWorkerThreads > 1) { + if( pThis->iMinMsgsPerWrkr < 1 + || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) + pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads; + } if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97; if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) -- cgit v1.2.3