summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog6
-rw-r--r--action.c7
-rw-r--r--runtime/queue.c52
-rw-r--r--runtime/queue.h1
4 files changed, 52 insertions, 14 deletions
diff --git a/ChangeLog b/ChangeLog
index 06a66bef..0874d44a 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -40,6 +40,11 @@ Version 8.1.0 [devel] 2013-11-15
just explicitely be given.
---------------------------------------------------------------------------
Version 7.5.7 [v7-devel] 2013-11-??
+- queue defaults have changed
+ * high water mark is now dynamically 90% of queue size
+ * low water makr is now dynamically 70% of queue size
+ * queue.workerThreadMinimumMessage set to queue.size / num workers
+- bugfix: legacy directive $ActionQueueWorkerThreads was not honored
- bugfix: segfault on startup when certain script constructs are used
e.g. "if not $msg ..."
- bugfix: ommysql lost configfile/section parameters after first close
@@ -262,6 +267,7 @@ Version 7.5.0 [devel] 2013-06-11
Thanks to Axel Rau for the patch.
---------------------------------------------------------------------------
Version 7.4.7 [v7.4-stable] 2013-11-??
+- bugfix: legacy directive $ActionQueueWorkerThreads was not honored
- bugfix: segfault on startup when certain script constructs are used
e.g. "if not $msg ..."
- bugfix: imuxsock: UseSysTimeStamp config parameter did not work correctly
diff --git a/action.c b/action.c
index 211af544..fde3bcfe 100644
--- a/action.c
+++ b/action.c
@@ -246,8 +246,8 @@ actionResetQueueParams(void)
cs.ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
cs.iActionQueueSize = 1000; /* size of the main message queue above */
cs.iActionQueueDeqBatchSize = 16; /* default batch size */
- cs.iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */
- cs.iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */
+ cs.iActionQHighWtrMark = -1; /* high water mark for disk-assisted queues */
+ cs.iActionQLowWtrMark = -1; /* low water mark for disk-assisted queues */
cs.iActionQDiscardMark = 980; /* begin to discard messages */
cs.iActionQDiscardSeverity = 8; /* discard warning and above */
cs.iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */
@@ -258,7 +258,7 @@ actionResetQueueParams(void)
cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
cs.iActionQtoEnq = 50; /* timeout for queue enque */
cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
- cs.iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
+ cs.iActionQWrkMinMsgs = -1; /* minimum messages per worker needed to start a new one */
cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
cs.iActionQueMaxDiskSpace = 0;
cs.iActionQueueDeqSlowdown = 0;
@@ -452,6 +452,7 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst)
setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark);
setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity);
setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs);
+ setQPROP(qqueueSetiNumWorkerThreads, "$ActionQueueWorkerThreads", cs.iActionQueueNumWorkers);
setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown);
setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown);
setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr);
diff --git a/runtime/queue.c b/runtime/queue.c
index 8fb734e7..63f2db74 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1357,8 +1357,8 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
pThis->qType = QUEUETYPE_DIRECT; /* type of the main message queue above */
pThis->iMaxQueueSize = 1000; /* size of the main message queue above */
pThis->iDeqBatchSize = 128; /* default batch size */
- pThis->iHighWtrMrk = 800; /* high water mark for disk-assisted queues */
- pThis->iLowWtrMrk = 200; /* low water mark for disk-assisted queues */
+ pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */
+ pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */
pThis->iDiscardMrk = 980; /* begin to discard messages */
pThis->iDiscardSeverity = 8; /* turn off */
pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
@@ -1369,7 +1369,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */
pThis->toEnq = 2000; /* timeout for queue enque */
pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */
- pThis->iMinMsgsPerWrkr = 100; /* minimum messages per worker needed to start a new one */
+ pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */
pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
pThis->sizeOnDiskMax = 0; /* unlimited */
pThis->iDeqSlowdown = 0;
@@ -1387,8 +1387,8 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis)
pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */
pThis->iMaxQueueSize = 50000; /* size of the main message queue above */
pThis->iDeqBatchSize = 1024; /* default batch size */
- pThis->iHighWtrMrk = 45000; /* high water mark for disk-assisted queues */
- pThis->iLowWtrMrk = 20000; /* low water mark for disk-assisted queues */
+ pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */
+ pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */
pThis->iDiscardMrk = 49500; /* begin to discard messages */
pThis->iDiscardSeverity = 8; /* turn off */
pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
@@ -1399,7 +1399,7 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis)
pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */
pThis->toEnq = 2000; /* timeout for queue enque */
pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */
- pThis->iMinMsgsPerWrkr = 1000; /* minimum messages per worker needed to start a new one */
+ pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */
pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
pThis->sizeOnDiskMax = 0; /* unlimited */
pThis->iDeqSlowdown = 0;
@@ -2039,6 +2039,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
uchar pszBuf[64];
uchar pszQIFNam[MAXFNAME];
int wrk;
+ int goodval; /* a "good value" to use for comparisons (different objects) */
uchar *qName;
size_t lenBuf;
@@ -2104,12 +2105,40 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
/* we need to do a quick check if our water marks are set plausible. If not,
* we correct the most important shortcomings.
*/
+ goodval = (pThis->iMaxQueueSize / 100) * 60;
+ if(pThis->iHighWtrMrk != -1 && pThis->iHighWtrMrk < goodval) {
+ errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": high water mark "
+ "is set quite low at %d. You should only set it below "
+ "60%% (%d) if you have a good reason for this.",
+ obj.GetName((obj_t*) pThis), pThis->iHighWtrMrk, goodval);
+ }
+
+ if(pThis->iNumWorkerThreads > 1) {
+ goodval = (pThis->iMaxQueueSize / 100) * 10;
+ if(pThis->iMinMsgsPerWrkr != -1 && pThis->iMinMsgsPerWrkr < goodval) {
+ errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": "
+ "queue.workerThreadMinimumMessage "
+ "is set quite low at %d. You should only set it below "
+ "10%% (%d) if you have a good reason for this.",
+ obj.GetName((obj_t*) pThis), pThis->iMinMsgsPerWrkr, goodval);
+ }
+ }
+
+ if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize)
+ pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90;
+ if( pThis->iLowWtrMrk < 2
+ || pThis->iLowWtrMrk > pThis->iMaxQueueSize
+ || pThis->iLowWtrMrk > pThis->iHighWtrMrk )
+ pThis->iLowWtrMrk = (pThis->iMaxQueueSize / 100) * 70;
+ if(pThis->iNumWorkerThreads > 1) {
+ if( pThis->iMinMsgsPerWrkr < 1
+ || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize )
+ pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads;
+ }
if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize)
- pThis->iFullDlyMrk = pThis->iMaxQueueSize
- - (pThis->iMaxQueueSize / 100) * 3; /* default 97% */
+ pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97;
if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize)
- pThis->iLightDlyMrk = pThis->iMaxQueueSize
- - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */
+ pThis->iLightDlyMrk = (pThis->iMaxQueueSize / 100) * 70;
if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize)
pThis->iDeqBatchSize = pThis->iMaxQueueSize;
@@ -2144,7 +2173,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
}
DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, maxQSize %d, lqsize %d, pqsize %d, child %d, "
- "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d\n"
+ "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d, "
"max wrkr %d, min msgs f. wrkr %d\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iMaxQueueSize,
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis),
@@ -2900,6 +2929,7 @@ DEFpropSetMeth(qqueue, iFullDlyMrk, int)
DEFpropSetMeth(qqueue, iDiscardSeverity, int)
DEFpropSetMeth(qqueue, iLightDlyMrk, int)
DEFpropSetMeth(qqueue, bIsDA, int)
+DEFpropSetMeth(qqueue, iNumWorkerThreads, int)
DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
DEFpropSetMeth(qqueue, pAction, action_t*)
diff --git a/runtime/queue.h b/runtime/queue.h
index 86107d24..468e19bc 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -221,6 +221,7 @@ PROTOTYPEpropSetMeth(qqueue, iLowWtrMrk, int);
PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int);
PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int);
PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int);
+PROTOTYPEpropSetMeth(qqueue, iNumWorkerThreads, int);
PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int);
PROTOTYPEpropSetMeth(qqueue, pAction, action_t*);
PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int);