summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c43
1 files changed, 41 insertions, 2 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 1ee34335..99fb5fbd 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -920,7 +920,7 @@ static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg)
{
DEFiRet;
iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", 3, pThis->tVars.disk.pReadDeq, NULL,
- NULL, msgConstructForDeserializer, msgConstructFinalizer, MsgDeserialize);
+ NULL, msgConstructForDeserializer, NULL, MsgDeserialize);
RETiRet;
}
@@ -1340,7 +1340,7 @@ finalize_it:
}
-/* set default inisde queue object suitable for action queues.
+/* set default inside queue object suitable for action queues.
* This shall be called directly after queue construction. This functions has
* been added in support of the new v6 config system. It expect properly pre-initialized
* objects, but we need to differentiate between ruleset main and action queues.
@@ -1374,6 +1374,36 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
}
+/* set defaults inside queue object suitable for main/ruleset queues.
+ * See queueSetDefaultsActionQueue() for more details and background.
+ */
+void
+qqueueSetDefaultsRulesetQueue(qqueue_t *pThis)
+{
+ pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */
+ pThis->iMaxQueueSize = 50000; /* size of the main message queue above */
+ pThis->iDeqBatchSize = 1024; /* default batch size */
+ pThis->iHighWtrMrk = 45000; /* high water mark for disk-assisted queues */
+ pThis->iLowWtrMrk = 20000; /* low water mark for disk-assisted queues */
+ pThis->iDiscardMrk = 49500; /* begin to discard messages */
+ pThis->iDiscardSeverity = 8; /* turn off */
+ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
+ pThis->iMaxFileSize = 16*1024*1024;
+ pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */
+ pThis->bSyncQueueFiles = 0;
+ pThis->toQShutdown = 1500; /* queue shutdown */
+ pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */
+ pThis->toEnq = 2000; /* timeout for queue enque */
+ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */
+ pThis->iMinMsgsPerWrkr = 1000; /* minimum messages per worker needed to start a new one */
+ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+ pThis->sizeOnDiskMax = 0; /* unlimited */
+ pThis->iDeqSlowdown = 0;
+ pThis->iDeqtWinFromHr = 0;
+ pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
+}
+
+
/* This function checks if the provided message shall be discarded and does so, if needed.
* In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
* provide real-time creation of spool files.
@@ -2655,6 +2685,15 @@ qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals)
return RS_RET_OK;
}
+
+/* are any queue params set at all? 1 - yes, 0 - no */
+int
+queueCnfParamsSet(struct cnfparamvals *pvals)
+{
+ return cnfparamvalsIsSet(&pblk, pvals);
+}
+
+
/* apply all params from param block to queue. Must be called before
* finalizing. This supports the v6 config system. Defaults were already
* set during queue creation. The pvals object is destructed by this