From 88fa72fef9279aa20f4e52080131f06af7e9cc53 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 Nov 2013 17:33:41 +0100 Subject: queue: warn if high watermark is set too low --- runtime/queue.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/runtime/queue.c b/runtime/queue.c index e0d60249..56fd9905 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2054,6 +2054,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ uchar pszBuf[64]; uchar pszQIFNam[MAXFNAME]; int wrk; + int goodval; /* a "good value" to use for comparisons (different objects) */ uchar *qName; size_t lenBuf; @@ -2115,6 +2116,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ /* we need to do a quick check if our water marks are set plausible. If not, * we correct the most important shortcomings. */ + goodval = (pThis->iMaxQueueSize / 100) * 60; + if(pThis->iHighWtrMrk < goodval) { + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": high water mark " + "is set quite low at %d. You should only set it below " + "60%% (%d) if you have a good reason for this.", + obj.GetName((obj_t*) pThis), pThis->iHighWtrMrk, goodval); + } if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) pThis->iFullDlyMrk = pThis->iMaxQueueSize - (pThis->iMaxQueueSize / 100) * 3; /* default 97% */ -- cgit v1.2.3 From cdce0bde4b3b2f77ecdf8a9ba2e4772fb92ff20d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 Nov 2013 17:46:19 +0100 Subject: queue: auto-adjust watermarks --- ChangeLog | 3 +++ action.c | 4 ++-- runtime/queue.c | 24 ++++++++++++++---------- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/ChangeLog b/ChangeLog index a6daee5f..490af41e 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,8 @@ --------------------------------------------------------------------------- Version 7.5.7 [v7-devel] 2013-11-?? +- queue defaults have changed + * high water mark is now dynamically 90% of queue size + * low water makr is now dynamically 70% of queue size - bugfix: segfault on startup when certain script constructs are used e.g. "if not $msg ..." - bugfix: ommysql lost configfile/section parameters after first close diff --git a/action.c b/action.c index d26199c5..b31eedb7 100644 --- a/action.c +++ b/action.c @@ -254,8 +254,8 @@ actionResetQueueParams(void) cs.ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ cs.iActionQueueSize = 1000; /* size of the main message queue above */ cs.iActionQueueDeqBatchSize = 16; /* default batch size */ - cs.iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ - cs.iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ + cs.iActionQHighWtrMark = -1; /* high water mark for disk-assisted queues */ + cs.iActionQLowWtrMark = -1; /* low water mark for disk-assisted queues */ cs.iActionQDiscardMark = 980; /* begin to discard messages */ cs.iActionQDiscardSeverity = 8; /* discard warning and above */ cs.iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ diff --git a/runtime/queue.c b/runtime/queue.c index 56fd9905..c9d064d6 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1373,8 +1373,8 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) pThis->qType = QUEUETYPE_DIRECT; /* type of the main message queue above */ pThis->iMaxQueueSize = 1000; /* size of the main message queue above */ pThis->iDeqBatchSize = 128; /* default batch size */ - pThis->iHighWtrMrk = 800; /* high water mark for disk-assisted queues */ - pThis->iLowWtrMrk = 200; /* low water mark for disk-assisted queues */ + pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */ + pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */ pThis->iDiscardMrk = 980; /* begin to discard messages */ pThis->iDiscardSeverity = 8; /* turn off */ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ @@ -1403,8 +1403,8 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */ pThis->iMaxQueueSize = 50000; /* size of the main message queue above */ pThis->iDeqBatchSize = 1024; /* default batch size */ - pThis->iHighWtrMrk = 45000; /* high water mark for disk-assisted queues */ - pThis->iLowWtrMrk = 20000; /* low water mark for disk-assisted queues */ + pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */ + pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */ pThis->iDiscardMrk = 49500; /* begin to discard messages */ pThis->iDiscardSeverity = 8; /* turn off */ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ @@ -2117,18 +2117,22 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ * we correct the most important shortcomings. */ goodval = (pThis->iMaxQueueSize / 100) * 60; - if(pThis->iHighWtrMrk < goodval) { + if(pThis->iHighWtrMrk != -1 && pThis->iHighWtrMrk < goodval) { errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": high water mark " "is set quite low at %d. You should only set it below " "60%% (%d) if you have a good reason for this.", obj.GetName((obj_t*) pThis), pThis->iHighWtrMrk, goodval); } + if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) + pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90; + if( pThis->iLowWtrMrk < 2 + || pThis->iLowWtrMrk > pThis->iMaxQueueSize + || pThis->iLowWtrMrk > pThis->iHighWtrMrk ) + pThis->iLowWtrMrk = (pThis->iMaxQueueSize / 100) * 70; if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) - pThis->iFullDlyMrk = pThis->iMaxQueueSize - - (pThis->iMaxQueueSize / 100) * 3; /* default 97% */ + pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97; if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) - pThis->iLightDlyMrk = pThis->iMaxQueueSize - - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */ + pThis->iLightDlyMrk = (pThis->iMaxQueueSize / 100) * 70; if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) pThis->iDeqBatchSize = pThis->iMaxQueueSize; @@ -2163,7 +2167,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ } DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, maxQSize %d, lqsize %d, pqsize %d, child %d, " - "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d\n" + "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d, " "max wrkr %d, min msgs f. wrkr %d\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iMaxQueueSize, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), -- cgit v1.2.3 From 020fe0dcc4ae9eae05e36dcf54ac043c5ae856ae Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 Nov 2013 18:18:10 +0100 Subject: bugfix: legacy directive $ActionQueueWorkerThreads was not honored --- ChangeLog | 1 + action.c | 1 + runtime/queue.c | 2 ++ runtime/queue.h | 1 + 4 files changed, 5 insertions(+) diff --git a/ChangeLog b/ChangeLog index 490af41e..f937f5a8 100644 --- a/ChangeLog +++ b/ChangeLog @@ -3,6 +3,7 @@ Version 7.5.7 [v7-devel] 2013-11-?? - queue defaults have changed * high water mark is now dynamically 90% of queue size * low water makr is now dynamically 70% of queue size +- bugfix: legacy directive $ActionQueueWorkerThreads was not honored - bugfix: segfault on startup when certain script constructs are used e.g. "if not $msg ..." - bugfix: ommysql lost configfile/section parameters after first close diff --git a/action.c b/action.c index b31eedb7..9653eedc 100644 --- a/action.c +++ b/action.c @@ -460,6 +460,7 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst) setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark); setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity); setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs); + setQPROP(qqueueSetiNumWorkerThreads, "$ActionQueueWorkerThreads", cs.iActionQueueNumWorkers); setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown); setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown); setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr); diff --git a/runtime/queue.c b/runtime/queue.c index c9d064d6..78a7a79e 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2123,6 +2123,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ "60%% (%d) if you have a good reason for this.", obj.GetName((obj_t*) pThis), pThis->iHighWtrMrk, goodval); } + if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90; if( pThis->iLowWtrMrk < 2 @@ -2937,6 +2938,7 @@ DEFpropSetMeth(qqueue, iFullDlyMrk, int) DEFpropSetMeth(qqueue, iDiscardSeverity, int) DEFpropSetMeth(qqueue, iLightDlyMrk, int) DEFpropSetMeth(qqueue, bIsDA, int) +DEFpropSetMeth(qqueue, iNumWorkerThreads, int) DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) DEFpropSetMeth(qqueue, bSaveOnShutdown, int) DEFpropSetMeth(qqueue, pAction, action_t*) diff --git a/runtime/queue.h b/runtime/queue.h index 1918e502..19ea735a 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -224,6 +224,7 @@ PROTOTYPEpropSetMeth(qqueue, iLowWtrMrk, int); PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int); PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int); PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int); +PROTOTYPEpropSetMeth(qqueue, iNumWorkerThreads, int); PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int); PROTOTYPEpropSetMeth(qqueue, pAction, action_t*); PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int); -- cgit v1.2.3 From 98e5663055891ad786406584039d4ded4b2ccc46 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 Nov 2013 18:20:18 +0100 Subject: queue.workerThreadMinimumMessage set to queue.size / num workers --- ChangeLog | 1 + action.c | 2 +- runtime/queue.c | 20 ++++++++++++++++++-- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/ChangeLog b/ChangeLog index f937f5a8..b857b5f1 100644 --- a/ChangeLog +++ b/ChangeLog @@ -3,6 +3,7 @@ Version 7.5.7 [v7-devel] 2013-11-?? - queue defaults have changed * high water mark is now dynamically 90% of queue size * low water makr is now dynamically 70% of queue size + * queue.workerThreadMinimumMessage set to queue.size / num workers - bugfix: legacy directive $ActionQueueWorkerThreads was not honored - bugfix: segfault on startup when certain script constructs are used e.g. "if not $msg ..." diff --git a/action.c b/action.c index 9653eedc..d46e1af8 100644 --- a/action.c +++ b/action.c @@ -266,7 +266,7 @@ actionResetQueueParams(void) cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ cs.iActionQtoEnq = 50; /* timeout for queue enque */ cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ - cs.iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ + cs.iActionQWrkMinMsgs = -1; /* minimum messages per worker needed to start a new one */ cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ cs.iActionQueMaxDiskSpace = 0; cs.iActionQueueDeqSlowdown = 0; diff --git a/runtime/queue.c b/runtime/queue.c index 78a7a79e..c594af7c 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1385,7 +1385,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */ pThis->toEnq = 2000; /* timeout for queue enque */ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */ - pThis->iMinMsgsPerWrkr = 100; /* minimum messages per worker needed to start a new one */ + pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ pThis->sizeOnDiskMax = 0; /* unlimited */ pThis->iDeqSlowdown = 0; @@ -1415,7 +1415,7 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */ pThis->toEnq = 2000; /* timeout for queue enque */ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */ - pThis->iMinMsgsPerWrkr = 1000; /* minimum messages per worker needed to start a new one */ + pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ pThis->sizeOnDiskMax = 0; /* unlimited */ pThis->iDeqSlowdown = 0; @@ -2124,12 +2124,28 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ obj.GetName((obj_t*) pThis), pThis->iHighWtrMrk, goodval); } + if(pThis->iNumWorkerThreads > 1) { + goodval = (pThis->iMaxQueueSize / 100) * 10; + if(pThis->iMinMsgsPerWrkr != -1 && pThis->iMinMsgsPerWrkr < goodval) { + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": " + "queue.workerThreadMinimumMessage " + "is set quite low at %d. You should only set it below " + "10%% (%d) if you have a good reason for this.", + obj.GetName((obj_t*) pThis), pThis->iMinMsgsPerWrkr, goodval); + } + } + if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90; if( pThis->iLowWtrMrk < 2 || pThis->iLowWtrMrk > pThis->iMaxQueueSize || pThis->iLowWtrMrk > pThis->iHighWtrMrk ) pThis->iLowWtrMrk = (pThis->iMaxQueueSize / 100) * 70; + if(pThis->iNumWorkerThreads > 1) { + if( pThis->iMinMsgsPerWrkr < 1 + || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) + pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads; + } if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97; if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) -- cgit v1.2.3 From 3ed275481f3fa3da7d9d6e1c113a696ab9280b74 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 Nov 2013 18:18:10 +0100 Subject: bugfix: legacy directive $ActionQueueWorkerThreads was not honored Conflicts: ChangeLog runtime/queue.c --- ChangeLog | 1 + action.c | 1 + runtime/queue.c | 1 + runtime/queue.h | 1 + 4 files changed, 4 insertions(+) diff --git a/ChangeLog b/ChangeLog index dd353683..bd4a0b39 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,6 @@ --------------------------------------------------------------------------- Version 7.4.7 [v7.4-stable] 2013-11-?? +- bugfix: legacy directive $ActionQueueWorkerThreads was not honored - bugfix: segfault on startup when certain script constructs are used e.g. "if not $msg ..." - bugfix: imuxsock: UseSysTimeStamp config parameter did not work correctly diff --git a/action.c b/action.c index 6b52d708..39611594 100644 --- a/action.c +++ b/action.c @@ -459,6 +459,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark); setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity); setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs); + setQPROP(qqueueSetiNumWorkerThreads, "$ActionQueueWorkerThreads", cs.iActionQueueNumWorkers); setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown); setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown); setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr); diff --git a/runtime/queue.c b/runtime/queue.c index 073c6823..6098eeee 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2835,6 +2835,7 @@ DEFpropSetMeth(qqueue, iFullDlyMrk, int) DEFpropSetMeth(qqueue, iDiscardSeverity, int) DEFpropSetMeth(qqueue, iLightDlyMrk, int) DEFpropSetMeth(qqueue, bIsDA, int) +DEFpropSetMeth(qqueue, iNumWorkerThreads, int) DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) DEFpropSetMeth(qqueue, bSaveOnShutdown, int) DEFpropSetMeth(qqueue, pAction, action_t*) diff --git a/runtime/queue.h b/runtime/queue.h index 886fac8d..79771081 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -219,6 +219,7 @@ PROTOTYPEpropSetMeth(qqueue, iLowWtrMrk, int); PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int); PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int); PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int); +PROTOTYPEpropSetMeth(qqueue, iNumWorkerThreads, int); PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int); PROTOTYPEpropSetMeth(qqueue, pAction, action_t*); PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int); -- cgit v1.2.3