summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c52
1 files changed, 41 insertions, 11 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 8fb734e7..63f2db74 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1357,8 +1357,8 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
pThis->qType = QUEUETYPE_DIRECT; /* type of the main message queue above */
pThis->iMaxQueueSize = 1000; /* size of the main message queue above */
pThis->iDeqBatchSize = 128; /* default batch size */
- pThis->iHighWtrMrk = 800; /* high water mark for disk-assisted queues */
- pThis->iLowWtrMrk = 200; /* low water mark for disk-assisted queues */
+ pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */
+ pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */
pThis->iDiscardMrk = 980; /* begin to discard messages */
pThis->iDiscardSeverity = 8; /* turn off */
pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
@@ -1369,7 +1369,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */
pThis->toEnq = 2000; /* timeout for queue enque */
pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */
- pThis->iMinMsgsPerWrkr = 100; /* minimum messages per worker needed to start a new one */
+ pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */
pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
pThis->sizeOnDiskMax = 0; /* unlimited */
pThis->iDeqSlowdown = 0;
@@ -1387,8 +1387,8 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis)
pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */
pThis->iMaxQueueSize = 50000; /* size of the main message queue above */
pThis->iDeqBatchSize = 1024; /* default batch size */
- pThis->iHighWtrMrk = 45000; /* high water mark for disk-assisted queues */
- pThis->iLowWtrMrk = 20000; /* low water mark for disk-assisted queues */
+ pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */
+ pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */
pThis->iDiscardMrk = 49500; /* begin to discard messages */
pThis->iDiscardSeverity = 8; /* turn off */
pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
@@ -1399,7 +1399,7 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis)
pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */
pThis->toEnq = 2000; /* timeout for queue enque */
pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */
- pThis->iMinMsgsPerWrkr = 1000; /* minimum messages per worker needed to start a new one */
+ pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */
pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
pThis->sizeOnDiskMax = 0; /* unlimited */
pThis->iDeqSlowdown = 0;
@@ -2039,6 +2039,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
uchar pszBuf[64];
uchar pszQIFNam[MAXFNAME];
int wrk;
+ int goodval; /* a "good value" to use for comparisons (different objects) */
uchar *qName;
size_t lenBuf;
@@ -2104,12 +2105,40 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
/* we need to do a quick check if our water marks are set plausible. If not,
* we correct the most important shortcomings.
*/
+ goodval = (pThis->iMaxQueueSize / 100) * 60;
+ if(pThis->iHighWtrMrk != -1 && pThis->iHighWtrMrk < goodval) {
+ errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": high water mark "
+ "is set quite low at %d. You should only set it below "
+ "60%% (%d) if you have a good reason for this.",
+ obj.GetName((obj_t*) pThis), pThis->iHighWtrMrk, goodval);
+ }
+
+ if(pThis->iNumWorkerThreads > 1) {
+ goodval = (pThis->iMaxQueueSize / 100) * 10;
+ if(pThis->iMinMsgsPerWrkr != -1 && pThis->iMinMsgsPerWrkr < goodval) {
+ errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": "
+ "queue.workerThreadMinimumMessage "
+ "is set quite low at %d. You should only set it below "
+ "10%% (%d) if you have a good reason for this.",
+ obj.GetName((obj_t*) pThis), pThis->iMinMsgsPerWrkr, goodval);
+ }
+ }
+
+ if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize)
+ pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90;
+ if( pThis->iLowWtrMrk < 2
+ || pThis->iLowWtrMrk > pThis->iMaxQueueSize
+ || pThis->iLowWtrMrk > pThis->iHighWtrMrk )
+ pThis->iLowWtrMrk = (pThis->iMaxQueueSize / 100) * 70;
+ if(pThis->iNumWorkerThreads > 1) {
+ if( pThis->iMinMsgsPerWrkr < 1
+ || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize )
+ pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads;
+ }
if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize)
- pThis->iFullDlyMrk = pThis->iMaxQueueSize
- - (pThis->iMaxQueueSize / 100) * 3; /* default 97% */
+ pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97;
if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize)
- pThis->iLightDlyMrk = pThis->iMaxQueueSize
- - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */
+ pThis->iLightDlyMrk = (pThis->iMaxQueueSize / 100) * 70;
if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize)
pThis->iDeqBatchSize = pThis->iMaxQueueSize;
@@ -2144,7 +2173,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
}
DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, maxQSize %d, lqsize %d, pqsize %d, child %d, "
- "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d\n"
+ "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d, "
"max wrkr %d, min msgs f. wrkr %d\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iMaxQueueSize,
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis),
@@ -2900,6 +2929,7 @@ DEFpropSetMeth(qqueue, iFullDlyMrk, int)
DEFpropSetMeth(qqueue, iDiscardSeverity, int)
DEFpropSetMeth(qqueue, iLightDlyMrk, int)
DEFpropSetMeth(qqueue, bIsDA, int)
+DEFpropSetMeth(qqueue, iNumWorkerThreads, int)
DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
DEFpropSetMeth(qqueue, pAction, action_t*)