diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/queue.c | 41 | ||||
-rw-r--r-- | runtime/queue.h | 2 | ||||
-rw-r--r-- | runtime/rsconf.c | 2 | ||||
-rw-r--r-- | runtime/ruleset.c | 25 |
4 files changed, 60 insertions, 10 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 37809238..bb40e540 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1382,7 +1382,7 @@ finalize_it: } -/* set default inisde queue object suitable for action queues. +/* set default inside queue object suitable for action queues. * This shall be called directly after queue construction. This functions has * been added in support of the new v6 config system. It expect properly pre-initialized * objects, but we need to differentiate between ruleset main and action queues. @@ -1416,6 +1416,36 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) } +/* set defaults inside queue object suitable for main/ruleset queues. + * See queueSetDefaultsActionQueue() for more details and background. + */ +void +qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) +{ + pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */ + pThis->iMaxQueueSize = 50000; /* size of the main message queue above */ + pThis->iDeqBatchSize = 1024; /* default batch size */ + pThis->iHighWtrMrk = 45000; /* high water mark for disk-assisted queues */ + pThis->iLowWtrMrk = 20000; /* low water mark for disk-assisted queues */ + pThis->iDiscardMrk = 49500; /* begin to discard messages */ + pThis->iDiscardSeverity = 8; /* turn off */ + pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ + pThis->iMaxFileSize = 16*1024*1024; + pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */ + pThis->bSyncQueueFiles = 0; + pThis->toQShutdown = 1500; /* queue shutdown */ + pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */ + pThis->toEnq = 2000; /* timeout for queue enque */ + pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */ + pThis->iMinMsgsPerWrkr = 1000; /* minimum messages per worker needed to start a new one */ + pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ + pThis->sizeOnDiskMax = 0; /* unlimited */ + pThis->iDeqSlowdown = 0; + pThis->iDeqtWinFromHr = 0; + pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ +} + + /* This function checks if the provided message shall be discarded and does so, if needed. * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to * provide real-time creation of spool files. @@ -2678,6 +2708,15 @@ qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals) return RS_RET_OK; } + +/* are any queue params set at all? 1 - yes, 0 - no */ +int +queueCnfParamsSet(struct cnfparamvals *pvals) +{ + return cnfparamvalsIsSet(&pblk, pvals); +} + + /* apply all params from param block to queue. Must be called before * finalizing. This supports the v6 config system. Defaults were already * set during queue creation. The pvals object is destructed by this diff --git a/runtime/queue.h b/runtime/queue.h index edb770c6..91c100ed 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -193,7 +193,9 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals); +int queueCnfParamsSet(struct cnfparamvals *pvals); rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals); +void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis); void qqueueSetDefaultsActionQueue(qqueue_t *pThis); void qqueueDbgPrint(qqueue_t *pThis); diff --git a/runtime/rsconf.c b/runtime/rsconf.c index 55fdd3bd..ac9cd800 100644 --- a/runtime/rsconf.c +++ b/runtime/rsconf.c @@ -758,7 +758,7 @@ activateMainQueue() { DEFiRet; /* create message queue */ - CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"))) { + CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"), NULL)) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet); FINALIZE; diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 8d2bb924..3459f545 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -844,7 +844,7 @@ doRulesetCreateQueue(rsconf_t *conf, int *pNewVal) rsname = (conf->rulesets.pCurr->pszName == NULL) ? (uchar*) "[ruleset]" : conf->rulesets.pCurr->pszName; DBGPRINTF("adding a ruleset-specific \"main\" queue for ruleset '%s'\n", rsname); - CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rsname)); + CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rsname, NULL)); finalize_it: RETiRet; @@ -904,6 +904,7 @@ rsRetVal rulesetProcessCnf(struct cnfobj *o) { struct cnfparamvals *pvals; + struct cnfparamvals *queueParams; rsRetVal localRet; uchar *rsName = NULL; uchar *parserName; @@ -911,6 +912,7 @@ rulesetProcessCnf(struct cnfobj *o) ruleset_t *pRuleset; struct cnfarray *ar; int i; + uchar *rsname; DEFiRet; pvals = nvlstGetParams(o->nvlst, &rspblk, NULL); @@ -938,14 +940,21 @@ rulesetProcessCnf(struct cnfobj *o) /* we have only two params, so we do NOT do the usual param loop */ parserIdx = cnfparamGetIdx(&rspblk, "parser"); - if(parserIdx == -1 || !pvals[parserIdx].bUsed) - FINALIZE; + if(parserIdx != -1 && pvals[parserIdx].bUsed) { + ar = pvals[parserIdx].val.d.ar; + for(i = 0 ; i < ar->nmemb ; ++i) { + parserName = (uchar*)es_str2cstr(ar->arr[i], NULL); + doRulesetAddParser(pRuleset, parserName); + free(parserName); + } + } - ar = pvals[parserIdx].val.d.ar; - for(i = 0 ; i < ar->nmemb ; ++i) { - parserName = (uchar*)es_str2cstr(ar->arr[i], NULL); - doRulesetAddParser(pRuleset, parserName); - free(parserName); + /* pick up ruleset queue parameters */ + qqueueDoCnfParams(o->nvlst, &queueParams); + if(queueCnfParamsSet(queueParams)) { + rsname = (pRuleset->pszName == NULL) ? (uchar*) "[ruleset]" : pRuleset->pszName; + DBGPRINTF("adding a ruleset-specific \"main\" queue for ruleset '%s'\n", rsname); + CHKiRet(createMainQueue(&pRuleset->pQueue, rsname, queueParams)); } finalize_it: |