diff options
-rw-r--r-- | ChangeLog | 6 | ||||
-rw-r--r-- | action.c | 7 | ||||
-rw-r--r-- | runtime/queue.c | 52 | ||||
-rw-r--r-- | runtime/queue.h | 1 |
4 files changed, 52 insertions, 14 deletions
@@ -40,6 +40,11 @@ Version 8.1.0 [devel] 2013-11-15 just explicitely be given. --------------------------------------------------------------------------- Version 7.5.7 [v7-devel] 2013-11-?? +- queue defaults have changed + * high water mark is now dynamically 90% of queue size + * low water makr is now dynamically 70% of queue size + * queue.workerThreadMinimumMessage set to queue.size / num workers +- bugfix: legacy directive $ActionQueueWorkerThreads was not honored - bugfix: segfault on startup when certain script constructs are used e.g. "if not $msg ..." - bugfix: ommysql lost configfile/section parameters after first close @@ -262,6 +267,7 @@ Version 7.5.0 [devel] 2013-06-11 Thanks to Axel Rau for the patch. --------------------------------------------------------------------------- Version 7.4.7 [v7.4-stable] 2013-11-?? +- bugfix: legacy directive $ActionQueueWorkerThreads was not honored - bugfix: segfault on startup when certain script constructs are used e.g. "if not $msg ..." - bugfix: imuxsock: UseSysTimeStamp config parameter did not work correctly @@ -246,8 +246,8 @@ actionResetQueueParams(void) cs.ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ cs.iActionQueueSize = 1000; /* size of the main message queue above */ cs.iActionQueueDeqBatchSize = 16; /* default batch size */ - cs.iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ - cs.iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ + cs.iActionQHighWtrMark = -1; /* high water mark for disk-assisted queues */ + cs.iActionQLowWtrMark = -1; /* low water mark for disk-assisted queues */ cs.iActionQDiscardMark = 980; /* begin to discard messages */ cs.iActionQDiscardSeverity = 8; /* discard warning and above */ cs.iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ @@ -258,7 +258,7 @@ actionResetQueueParams(void) cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ cs.iActionQtoEnq = 50; /* timeout for queue enque */ cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ - cs.iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ + cs.iActionQWrkMinMsgs = -1; /* minimum messages per worker needed to start a new one */ cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ cs.iActionQueMaxDiskSpace = 0; cs.iActionQueueDeqSlowdown = 0; @@ -452,6 +452,7 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst) setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark); setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity); setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs); + setQPROP(qqueueSetiNumWorkerThreads, "$ActionQueueWorkerThreads", cs.iActionQueueNumWorkers); setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown); setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown); setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr); diff --git a/runtime/queue.c b/runtime/queue.c index 8fb734e7..63f2db74 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1357,8 +1357,8 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) pThis->qType = QUEUETYPE_DIRECT; /* type of the main message queue above */ pThis->iMaxQueueSize = 1000; /* size of the main message queue above */ pThis->iDeqBatchSize = 128; /* default batch size */ - pThis->iHighWtrMrk = 800; /* high water mark for disk-assisted queues */ - pThis->iLowWtrMrk = 200; /* low water mark for disk-assisted queues */ + pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */ + pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */ pThis->iDiscardMrk = 980; /* begin to discard messages */ pThis->iDiscardSeverity = 8; /* turn off */ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ @@ -1369,7 +1369,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */ pThis->toEnq = 2000; /* timeout for queue enque */ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */ - pThis->iMinMsgsPerWrkr = 100; /* minimum messages per worker needed to start a new one */ + pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ pThis->sizeOnDiskMax = 0; /* unlimited */ pThis->iDeqSlowdown = 0; @@ -1387,8 +1387,8 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */ pThis->iMaxQueueSize = 50000; /* size of the main message queue above */ pThis->iDeqBatchSize = 1024; /* default batch size */ - pThis->iHighWtrMrk = 45000; /* high water mark for disk-assisted queues */ - pThis->iLowWtrMrk = 20000; /* low water mark for disk-assisted queues */ + pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */ + pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */ pThis->iDiscardMrk = 49500; /* begin to discard messages */ pThis->iDiscardSeverity = 8; /* turn off */ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ @@ -1399,7 +1399,7 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */ pThis->toEnq = 2000; /* timeout for queue enque */ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */ - pThis->iMinMsgsPerWrkr = 1000; /* minimum messages per worker needed to start a new one */ + pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ pThis->sizeOnDiskMax = 0; /* unlimited */ pThis->iDeqSlowdown = 0; @@ -2039,6 +2039,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ uchar pszBuf[64]; uchar pszQIFNam[MAXFNAME]; int wrk; + int goodval; /* a "good value" to use for comparisons (different objects) */ uchar *qName; size_t lenBuf; @@ -2104,12 +2105,40 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ /* we need to do a quick check if our water marks are set plausible. If not, * we correct the most important shortcomings. */ + goodval = (pThis->iMaxQueueSize / 100) * 60; + if(pThis->iHighWtrMrk != -1 && pThis->iHighWtrMrk < goodval) { + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": high water mark " + "is set quite low at %d. You should only set it below " + "60%% (%d) if you have a good reason for this.", + obj.GetName((obj_t*) pThis), pThis->iHighWtrMrk, goodval); + } + + if(pThis->iNumWorkerThreads > 1) { + goodval = (pThis->iMaxQueueSize / 100) * 10; + if(pThis->iMinMsgsPerWrkr != -1 && pThis->iMinMsgsPerWrkr < goodval) { + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": " + "queue.workerThreadMinimumMessage " + "is set quite low at %d. You should only set it below " + "10%% (%d) if you have a good reason for this.", + obj.GetName((obj_t*) pThis), pThis->iMinMsgsPerWrkr, goodval); + } + } + + if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) + pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90; + if( pThis->iLowWtrMrk < 2 + || pThis->iLowWtrMrk > pThis->iMaxQueueSize + || pThis->iLowWtrMrk > pThis->iHighWtrMrk ) + pThis->iLowWtrMrk = (pThis->iMaxQueueSize / 100) * 70; + if(pThis->iNumWorkerThreads > 1) { + if( pThis->iMinMsgsPerWrkr < 1 + || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) + pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads; + } if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) - pThis->iFullDlyMrk = pThis->iMaxQueueSize - - (pThis->iMaxQueueSize / 100) * 3; /* default 97% */ + pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97; if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) - pThis->iLightDlyMrk = pThis->iMaxQueueSize - - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */ + pThis->iLightDlyMrk = (pThis->iMaxQueueSize / 100) * 70; if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) pThis->iDeqBatchSize = pThis->iMaxQueueSize; @@ -2144,7 +2173,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ } DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, maxQSize %d, lqsize %d, pqsize %d, child %d, " - "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d\n" + "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d, " "max wrkr %d, min msgs f. wrkr %d\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iMaxQueueSize, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), @@ -2900,6 +2929,7 @@ DEFpropSetMeth(qqueue, iFullDlyMrk, int) DEFpropSetMeth(qqueue, iDiscardSeverity, int) DEFpropSetMeth(qqueue, iLightDlyMrk, int) DEFpropSetMeth(qqueue, bIsDA, int) +DEFpropSetMeth(qqueue, iNumWorkerThreads, int) DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) DEFpropSetMeth(qqueue, bSaveOnShutdown, int) DEFpropSetMeth(qqueue, pAction, action_t*) diff --git a/runtime/queue.h b/runtime/queue.h index 86107d24..468e19bc 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -221,6 +221,7 @@ PROTOTYPEpropSetMeth(qqueue, iLowWtrMrk, int); PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int); PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int); PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int); +PROTOTYPEpropSetMeth(qqueue, iNumWorkerThreads, int); PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int); PROTOTYPEpropSetMeth(qqueue, pAction, action_t*); PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int); |