From 1ada506e2d90377c2475e103340d8986bf8847f9 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Oct 2009 17:31:27 +0100 Subject: added the capability to have ruleset-specific main message queues This offers considerable additional flexibility AND superior performance (in cases where multiple inputs now can avoid lock contention) --- ChangeLog | 3 + dirty.h | 2 + doc/Makefile.am | 1 + doc/imtcp.html | 2 +- doc/multi_ruleset.html | 26 ++++++- doc/rsconf1_rulesetcreatemainqueue.html | 83 +++++++++++++++++++++ doc/rsyslog_conf_global.html | 2 + runtime/msg.h | 10 +++ runtime/queue.h | 4 +- runtime/rsyslog.h | 3 + runtime/ruleset.c | 54 ++++++++++++++ runtime/ruleset.h | 5 +- tools/syslogd.c | 125 ++++++++++++++++++-------------- 13 files changed, 260 insertions(+), 60 deletions(-) create mode 100644 doc/rsconf1_rulesetcreatemainqueue.html diff --git a/ChangeLog b/ChangeLog index 8731d442..c8afe0c9 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,8 @@ --------------------------------------------------------------------------- Version 5.3.4 [DEVEL] (rgerhards), 2009-10-?? +- added the capability to have ruleset-specific main message queues + This offers considerable additional flexibility AND superior performance + (in cases where multiple inputs now can avoid lock contention) --------------------------------------------------------------------------- Version 5.3.3 [DEVEL] (rgerhards), 2009-10-27 - simplified and thus speeded up the queue engine, also fixed some diff --git a/dirty.h b/dirty.h index 0153cb69..79805b60 100644 --- a/dirty.h +++ b/dirty.h @@ -35,6 +35,7 @@ int parseRFCSyslogMsg(msg_t *pMsg, int flags); int parseLegacySyslogMsg(msg_t *pMsg, int flags); rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */ char* getFIOPName(unsigned iFIOP); +rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName); /* Intervals at which we flush out "message repeated" messages, * in seconds after previous message is logged. After each flush, @@ -46,6 +47,7 @@ extern int iActExecOnceInterval; extern int MarkInterval; extern int repeatinterval[2]; extern int bReduceRepeatMsgs; +extern qqueue_t *pMsgQueue; /* the main message queue */ #define MAXREPEAT ((int)((sizeof(repeatinterval) / sizeof(repeatinterval[0])) - 1)) #define REPEATTIME(f) ((f)->f_time + repeatinterval[(f)->f_repeatcount]) #define BACKOFF(f) { if (++(f)->f_repeatcount > MAXREPEAT) \ diff --git a/doc/Makefile.am b/doc/Makefile.am index a447cddf..285ba600 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -90,6 +90,7 @@ html_files = \ rsconf1_moddir.html \ rsconf1_repeatedmsgreduction.html \ rsconf1_resetconfigvariables.html \ + rsconf1_rulesetcreatemainqueue.html \ rsconf1_umask.html \ v3compatibility.html \ v4compatibility.html \ diff --git a/doc/imtcp.html b/doc/imtcp.html index 0671d6d5..434b3903 100644 --- a/doc/imtcp.html +++ b/doc/imtcp.html @@ -73,7 +73,7 @@ Binds the listener to a specific ruleset.
  • can not be loaded together with imgssapi (which includes the functionality of imtcp)
  • -

    Sample:

    +

    Example:

    This sets up a TCP server on port 514 and permits it to accept up to 500 connections:

    +

    Note the positions of the directives. With the current config language, +position is very important. This is ugly, but unfortunately the way it currently +works. +

    + +

    [rsyslog.conf overview] [manual +index] [rsyslog site]

    +

    This documentation is part of the +rsyslog project.
    +Copyright © 2009 by Rainer Gerhards and +Adiscon. Released under the GNU GPL version 2 or higher.

    + + diff --git a/doc/rsyslog_conf_global.html b/doc/rsyslog_conf_global.html index 885dbdc7..1bf02a55 100644 --- a/doc/rsyslog_conf_global.html +++ b/doc/rsyslog_conf_global.html @@ -253,6 +253,8 @@ the name does not yet exist, it is created. To swith back to rsyslog's default ruleset, specify "RSYSLOG_DefaultRuleset") as the name. All following actions belong to that new rule set. It is advised to also read our paper on using multiple rule sets in rsyslog. +
  • $RulesetCreateMainQueue on - creates +a ruleset-specific main queue.
  • $OptimizeForUniprocessor [on/off] - turns on optimizatons which lead to better performance on uniprocessors. If you run on multicore-machiens, turning this off lessens CPU load. The default may change as uniprocessor systems become less common. [available since 4.1.0]
  • diff --git a/runtime/msg.h b/runtime/msg.h index f7d74597..9101cef7 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -208,6 +208,16 @@ MsgSetRawMsgSize(msg_t *pMsg, size_t newLen) } +/* get the ruleset that is associated with the ruleset. + * May be NULL. -- rgerhards, 2009-10-27 + */ +static inline ruleset_t* +MsgGetRuleset(msg_t *pMsg) +{ + return pMsg->pRuleset; +} + + #endif /* #ifndef MSG_H_INCLUDED */ /* vim:set ai: */ diff --git a/runtime/queue.h b/runtime/queue.h index 26c57a50..93573dae 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -55,7 +55,7 @@ typedef struct qLinkedList_S { /* the queue object */ -typedef struct queue_s { +struct queue_s { BEGINobjInstance; queueType_t qType; int nLogDeq; /* number of elements currently logically dequeued */ @@ -160,7 +160,7 @@ typedef struct queue_s { strm_t *pReadDel; /* current file for deleting */ } disk; } tVars; -} qqueue_t; +}; /* the define below is an "eternal" timeout for the timeout settings which require a value. diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index bd1c936e..7c966276 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -107,6 +107,7 @@ typedef struct wti_s wti_t; typedef obj_t nsd_t; typedef obj_t nsdsel_t; typedef struct msg msg_t; +typedef struct queue_s qqueue_t; typedef struct prop_s prop_t; typedef struct interface_s interface_t; typedef struct objInfo_s objInfo_t; @@ -387,6 +388,8 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_HOST_NOT_SPECIFIED = -2151, /**< (target) host was not specified where it was needed */ RS_RET_ERR_LIBNET_INIT = -2152, /**< error initializing libnet */ RS_RET_FORCE_TERM = -2153, /**< thread was forced to terminate by bShallShutdown, a state, not an error */ + RS_RET_RULES_QUEUE_EXISTS = -2154,/**< we were instructed to create a new ruleset queue, but one already exists */ + RS_RET_NO_CURR_RULESET = -2155,/**< no current ruleset exists (but one is required) */ /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ diff --git a/runtime/ruleset.c b/runtime/ruleset.c index d3de672e..3592aafd 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -40,11 +40,13 @@ #include "rsyslog.h" #include "obj.h" +#include "cfsysline.h" #include "msg.h" #include "ruleset.h" #include "rule.h" #include "errmsg.h" #include "unicode-helper.h" +#include "dirty.h" /* for main ruleset queue creation */ /* static data */ DEFobjStaticHelpers @@ -214,6 +216,19 @@ GetCurrent(void) } +/* get main queue associated with ruleset. If no ruleset-specifc main queue + * is set, the primary main message queue is returned. + * We use a non-standard calling interface, as nothing can go wrong and it + * is really much more natural to return the pointer directly. + */ +static qqueue_t* +GetRulesetQueue(ruleset_t *pThis) +{ + ISOBJ_TYPE_assert(pThis, ruleset); + return (pThis->pQueue == NULL) ? pMsgQueue : pThis->pQueue; +} + + /* Find the ruleset with the given name and return a pointer to its object. */ static rsRetVal @@ -384,6 +399,41 @@ debugPrintAll(void) } +/* Create a ruleset-specific "main" queue for this ruleset. If one is already + * defined, an error message is emitted but nothing else is done. + * Note: we use the main message queue parameters for queue creation and access + * syslogd.c directly to obtain these. This is far from being perfect, but + * considered acceptable for the time being. + * rgerhards, 2009-10-27 + */ +static rsRetVal +rulesetCreateQueue(void __attribute__((unused)) *pVal, int *pNewVal) +{ + DEFiRet; + + if(pCurrRuleset == NULL) { + errmsg.LogError(0, RS_RET_NO_CURR_RULESET, "error: currently no specific ruleset specified, thus a " + "queue can not be added to it"); + ABORT_FINALIZE(RS_RET_NO_CURR_RULESET); + } + + if(pCurrRuleset->pQueue != NULL) { + errmsg.LogError(0, RS_RET_RULES_QUEUE_EXISTS, "error: ruleset already has a main queue, can not " + "add another one"); + ABORT_FINALIZE(RS_RET_RULES_QUEUE_EXISTS); + } + + if(pNewVal == 0) + FINALIZE; /* if it is turned off, we do not need to change anything ;) */ + + dbgprintf("adding a ruleset-specific \"main\" queue"); + CHKiRet(createMainQueue(&pCurrRuleset->pQueue, UCHAR_CONSTANT("ruleset"))); + +finalize_it: + RETiRet; +} + + /* queryInterface function * rgerhards, 2008-02-21 */ @@ -413,6 +463,7 @@ CODESTARTobjQueryInterface(ruleset) pIf->GetRuleset = GetRuleset; pIf->SetDefaultRuleset = SetDefaultRuleset; pIf->SetCurrRuleset = SetCurrRuleset; + pIf->GetRulesetQueue = GetRulesetQueue; finalize_it: ENDobjQueryInterface(ruleset) @@ -442,6 +493,9 @@ BEGINObjClassInit(ruleset, 1, OBJ_IS_CORE_MODULE) /* class, version */ /* prepare global data */ CHKiRet(llInit(&llRulesets, rulesetDestructForLinkedList, keyDestruct, strcasecmp)); + + /* config file handlers */ + CHKiRet(regCfSysLineHdlr((uchar *)"rulesetcreatemainqueue", 0, eCmdHdlrBinary, rulesetCreateQueue, NULL, NULL)); ENDObjClassInit(ruleset) /* vi:set ai: diff --git a/runtime/ruleset.h b/runtime/ruleset.h index 32571687..deea9405 100644 --- a/runtime/ruleset.h +++ b/runtime/ruleset.h @@ -25,6 +25,7 @@ #ifndef INCLUDED_RULESET_H #define INCLUDED_RULESET_H +#include "queue.h" #include "linkedlist.h" /* the ruleset object */ @@ -32,6 +33,7 @@ struct ruleset_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ linkedList_t llRules; /* this is NOT a pointer - no typo here ;) */ uchar *pszName; /* name of our ruleset */ + qqueue_t *pQueue; /* "main" message queue, if the ruleset has its own (else NULL) */ }; /* interfaces */ @@ -50,8 +52,9 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */ rsRetVal (*SetDefaultRuleset)(uchar*); rsRetVal (*SetCurrRuleset)(uchar*); ruleset_t* (*GetCurrent)(void); + qqueue_t* (*GetRulesetQueue)(ruleset_t*); ENDinterface(ruleset) -#define rulesetCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ +#define rulesetCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */ /* prototypes */ diff --git a/tools/syslogd.c b/tools/syslogd.c index 10df10d1..62d33677 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -265,7 +265,7 @@ extern int errno; static uchar *pszConfDAGFile = NULL; /* name of config DAG file, non-NULL means generate one */ /* main message queue and its configuration parameters */ -static qqueue_t *pMsgQueue = NULL; /* the main message queue */ +qqueue_t *pMsgQueue = NULL; /* the main message queue */ static int iMainMsgQueueSize = 10000; /* size of the main message queue above */ static int iMainMsgQHighWtrMark = 8000; /* high water mark for disk-assisted queues */ static int iMainMsgQLowWtrMark = 2000; /* low water mark for disk-assisted queues */ @@ -640,7 +640,6 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShu int i; msg_t *pMsg; DEFiRet; - rsRetVal localRet; assert(pBatch != NULL); @@ -650,9 +649,7 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShu if((pMsg->msgFlags & NEEDS_PARSING) != 0) { parseMsg(pMsg); } - localRet = ruleset.ProcessMsg(pMsg); -dbgprintf("msgConsumer got iRet %d from ProcessMsg\n", localRet); /* if we reach this point, the message is considered committed (by definition!) */ pBatch->pElem[i].state = BATCH_STATE_COMM; } @@ -1037,12 +1034,17 @@ int parseLegacySyslogMsg(msg_t *pMsg, int flags) rsRetVal submitMsg(msg_t *pMsg) { + qqueue_t *pQueue; + ruleset_t *pRuleset; DEFiRet; ISOBJ_TYPE_assert(pMsg, msg); + pRuleset = MsgGetRuleset(pMsg); + + pQueue = (pRuleset == NULL) ? pMsgQueue : ruleset.GetRulesetQueue(pRuleset); MsgPrepareEnqueue(pMsg); - qqueueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg); + qqueueEnqObj(pQueue, pMsg->flowCtlType, (void*) pMsg); RETiRet; } @@ -1880,6 +1882,70 @@ startInputModules(void) } +/* create a main message queue, now also used for ruleset queues. This function + * needs to be moved to some other module, but it is considered acceptable for + * the time being (remember that we want to restructure config processing at large!). + * rgerhards, 2009-10-27 + */ +rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName) +{ + DEFiRet; + + /* switch the message object to threaded operation, if necessary */ + if(MainMsgQueType == QUEUETYPE_DIRECT || iMainMsgQueueNumWorkers > 1) { + MsgEnableThreadSafety(); + } + + /* create message queue */ + CHKiRet_Hdlr(qqueueConstruct(ppQueue, MainMsgQueType, iMainMsgQueueNumWorkers, iMainMsgQueueSize, msgConsumer)) { + /* no queue is fatal, we need to give up in that case... */ + errmsg.LogError(0, iRet, "could not create (ruleset) main message queue"); \ + } + /* name our main queue object (it's not fatal if it fails...) */ + obj.SetName((obj_t*) (*ppQueue), pszQueueName); + + /* ... set some properties ... */ +# define setQPROP(func, directive, data) \ + CHKiRet_Hdlr(func(*ppQueue, data)) { \ + errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ + } +# define setQPROPstr(func, directive, data) \ + CHKiRet_Hdlr(func(*ppQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \ + errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ + } + + setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize); + setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace); + setQPROP(qqueueSetiDeqBatchSize, "$MainMsgQueueDequeueBatchSize", iMainMsgQueDeqBatchSize); + setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName); + setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt); + setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", bMainMsgQSyncQeueFiles); + setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown ); + setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown); + setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", iMainMsgQtoWrkShutdown); + setQPROP(qqueueSettoEnq, "$MainMsgQueueTimeoutEnqueue", iMainMsgQtoEnq); + setQPROP(qqueueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", iMainMsgQHighWtrMark); + setQPROP(qqueueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", iMainMsgQLowWtrMark); + setQPROP(qqueueSetiDiscardMrk, "$MainMsgQueueDiscardMark", iMainMsgQDiscardMark); + setQPROP(qqueueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs); + setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown); + setQPROP(qqueueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown); + setQPROP(qqueueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", iMainMsgQueueDeqtWinFromHr); + setQPROP(qqueueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", iMainMsgQueueDeqtWinToHr); + +# undef setQPROP +# undef setQPROPstr + + /* ... and finally start the queue! */ + CHKiRet_Hdlr(qqueueStart(*ppQueue)) { + /* no queue is fatal, we need to give up in that case... */ + errmsg.LogError(0, iRet, "could not start (ruleset) main message queue"); \ + } + RETiRet; +} + + /* INIT -- Initialize syslogd * Note that if iConfigVerify is set, only the config file is verified but nothing * else happens. -- rgerhards, 2008-07-28 @@ -1988,59 +2054,12 @@ init(void) exit(2); } - /* switch the message object to threaded operation, if necessary */ - if(MainMsgQueType == QUEUETYPE_DIRECT || iMainMsgQueueNumWorkers > 1) { - MsgEnableThreadSafety(); - } - /* create message queue */ - CHKiRet_Hdlr(qqueueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueNumWorkers, iMainMsgQueueSize, msgConsumer)) { + CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"))) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet); exit(1); } - /* name our main queue object (it's not fatal if it fails...) */ - obj.SetName((obj_t*) pMsgQueue, (uchar*) "main Q"); - - /* ... set some properties ... */ -# define setQPROP(func, directive, data) \ - CHKiRet_Hdlr(func(pMsgQueue, data)) { \ - errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ - } -# define setQPROPstr(func, directive, data) \ - CHKiRet_Hdlr(func(pMsgQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \ - errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ - } - - setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize); - setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace); - setQPROP(qqueueSetiDeqBatchSize, "$MainMsgQueueDequeueBatchSize", iMainMsgQueDeqBatchSize); - setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName); - setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt); - setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", bMainMsgQSyncQeueFiles); - setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown ); - setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown); - setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", iMainMsgQtoWrkShutdown); - setQPROP(qqueueSettoEnq, "$MainMsgQueueTimeoutEnqueue", iMainMsgQtoEnq); - setQPROP(qqueueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", iMainMsgQHighWtrMark); - setQPROP(qqueueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", iMainMsgQLowWtrMark); - setQPROP(qqueueSetiDiscardMrk, "$MainMsgQueueDiscardMark", iMainMsgQDiscardMark); - setQPROP(qqueueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity); - setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs); - setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown); - setQPROP(qqueueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown); - setQPROP(qqueueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", iMainMsgQueueDeqtWinFromHr); - setQPROP(qqueueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", iMainMsgQueueDeqtWinToHr); - -# undef setQPROP -# undef setQPROPstr - - /* ... and finally start the queue! */ - CHKiRet_Hdlr(qqueueStart(pMsgQueue)) { - /* no queue is fatal, we need to give up in that case... */ - fprintf(stderr, "fatal error %d: could not start message queue - rsyslogd can not run!\n", iRet); - exit(1); - } bHaveMainQueue = (MainMsgQueType == QUEUETYPE_DIRECT) ? 0 : 1; DBGPRINTF("Main processing queue is initialized and running\n"); -- cgit v1.2.3