summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/queue.c41
-rw-r--r--runtime/queue.h2
-rw-r--r--runtime/rsconf.c2
-rw-r--r--runtime/ruleset.c25
4 files changed, 60 insertions, 10 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 37809238..bb40e540 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1382,7 +1382,7 @@ finalize_it:
}
-/* set default inisde queue object suitable for action queues.
+/* set default inside queue object suitable for action queues.
* This shall be called directly after queue construction. This functions has
* been added in support of the new v6 config system. It expect properly pre-initialized
* objects, but we need to differentiate between ruleset main and action queues.
@@ -1416,6 +1416,36 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
}
+/* set defaults inside queue object suitable for main/ruleset queues.
+ * See queueSetDefaultsActionQueue() for more details and background.
+ */
+void
+qqueueSetDefaultsRulesetQueue(qqueue_t *pThis)
+{
+ pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */
+ pThis->iMaxQueueSize = 50000; /* size of the main message queue above */
+ pThis->iDeqBatchSize = 1024; /* default batch size */
+ pThis->iHighWtrMrk = 45000; /* high water mark for disk-assisted queues */
+ pThis->iLowWtrMrk = 20000; /* low water mark for disk-assisted queues */
+ pThis->iDiscardMrk = 49500; /* begin to discard messages */
+ pThis->iDiscardSeverity = 8; /* turn off */
+ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
+ pThis->iMaxFileSize = 16*1024*1024;
+ pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */
+ pThis->bSyncQueueFiles = 0;
+ pThis->toQShutdown = 1500; /* queue shutdown */
+ pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */
+ pThis->toEnq = 2000; /* timeout for queue enque */
+ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */
+ pThis->iMinMsgsPerWrkr = 1000; /* minimum messages per worker needed to start a new one */
+ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+ pThis->sizeOnDiskMax = 0; /* unlimited */
+ pThis->iDeqSlowdown = 0;
+ pThis->iDeqtWinFromHr = 0;
+ pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
+}
+
+
/* This function checks if the provided message shall be discarded and does so, if needed.
* In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
* provide real-time creation of spool files.
@@ -2678,6 +2708,15 @@ qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals)
return RS_RET_OK;
}
+
+/* are any queue params set at all? 1 - yes, 0 - no */
+int
+queueCnfParamsSet(struct cnfparamvals *pvals)
+{
+ return cnfparamvalsIsSet(&pblk, pvals);
+}
+
+
/* apply all params from param block to queue. Must be called before
* finalizing. This supports the v6 config system. Defaults were already
* set during queue creation. The pvals object is destructed by this
diff --git a/runtime/queue.h b/runtime/queue.h
index edb770c6..91c100ed 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -193,7 +193,9 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals);
+int queueCnfParamsSet(struct cnfparamvals *pvals);
rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals);
+void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis);
void qqueueSetDefaultsActionQueue(qqueue_t *pThis);
void qqueueDbgPrint(qqueue_t *pThis);
diff --git a/runtime/rsconf.c b/runtime/rsconf.c
index 55fdd3bd..ac9cd800 100644
--- a/runtime/rsconf.c
+++ b/runtime/rsconf.c
@@ -758,7 +758,7 @@ activateMainQueue()
{
DEFiRet;
/* create message queue */
- CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"))) {
+ CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"), NULL)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
FINALIZE;
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 8d2bb924..3459f545 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -844,7 +844,7 @@ doRulesetCreateQueue(rsconf_t *conf, int *pNewVal)
rsname = (conf->rulesets.pCurr->pszName == NULL) ? (uchar*) "[ruleset]" : conf->rulesets.pCurr->pszName;
DBGPRINTF("adding a ruleset-specific \"main\" queue for ruleset '%s'\n", rsname);
- CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rsname));
+ CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rsname, NULL));
finalize_it:
RETiRet;
@@ -904,6 +904,7 @@ rsRetVal
rulesetProcessCnf(struct cnfobj *o)
{
struct cnfparamvals *pvals;
+ struct cnfparamvals *queueParams;
rsRetVal localRet;
uchar *rsName = NULL;
uchar *parserName;
@@ -911,6 +912,7 @@ rulesetProcessCnf(struct cnfobj *o)
ruleset_t *pRuleset;
struct cnfarray *ar;
int i;
+ uchar *rsname;
DEFiRet;
pvals = nvlstGetParams(o->nvlst, &rspblk, NULL);
@@ -938,14 +940,21 @@ rulesetProcessCnf(struct cnfobj *o)
/* we have only two params, so we do NOT do the usual param loop */
parserIdx = cnfparamGetIdx(&rspblk, "parser");
- if(parserIdx == -1 || !pvals[parserIdx].bUsed)
- FINALIZE;
+ if(parserIdx != -1 && pvals[parserIdx].bUsed) {
+ ar = pvals[parserIdx].val.d.ar;
+ for(i = 0 ; i < ar->nmemb ; ++i) {
+ parserName = (uchar*)es_str2cstr(ar->arr[i], NULL);
+ doRulesetAddParser(pRuleset, parserName);
+ free(parserName);
+ }
+ }
- ar = pvals[parserIdx].val.d.ar;
- for(i = 0 ; i < ar->nmemb ; ++i) {
- parserName = (uchar*)es_str2cstr(ar->arr[i], NULL);
- doRulesetAddParser(pRuleset, parserName);
- free(parserName);
+ /* pick up ruleset queue parameters */
+ qqueueDoCnfParams(o->nvlst, &queueParams);
+ if(queueCnfParamsSet(queueParams)) {
+ rsname = (pRuleset->pszName == NULL) ? (uchar*) "[ruleset]" : pRuleset->pszName;
+ DBGPRINTF("adding a ruleset-specific \"main\" queue for ruleset '%s'\n", rsname);
+ CHKiRet(createMainQueue(&pRuleset->pQueue, rsname, queueParams));
}
finalize_it: