summaryrefslogtreecommitdiffstats
path: root/tools/syslogd.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-10-27 17:31:27 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2009-10-27 17:31:27 +0100
commit1ada506e2d90377c2475e103340d8986bf8847f9 (patch)
tree214d2276fbfcff39d025eaf114cf5813e5a445cd /tools/syslogd.c
parentf3134f89211ea6a65e72bca1dd2f91bf0a0ae894 (diff)
downloadrsyslog-1ada506e2d90377c2475e103340d8986bf8847f9.tar.gz
rsyslog-1ada506e2d90377c2475e103340d8986bf8847f9.tar.bz2
rsyslog-1ada506e2d90377c2475e103340d8986bf8847f9.zip
added the capability to have ruleset-specific main message queues
This offers considerable additional flexibility AND superior performance (in cases where multiple inputs now can avoid lock contention)
Diffstat (limited to 'tools/syslogd.c')
-rw-r--r--tools/syslogd.c125
1 files changed, 72 insertions, 53 deletions
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 10df10d1..62d33677 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -265,7 +265,7 @@ extern int errno;
static uchar *pszConfDAGFile = NULL; /* name of config DAG file, non-NULL means generate one */
/* main message queue and its configuration parameters */
-static qqueue_t *pMsgQueue = NULL; /* the main message queue */
+qqueue_t *pMsgQueue = NULL; /* the main message queue */
static int iMainMsgQueueSize = 10000; /* size of the main message queue above */
static int iMainMsgQHighWtrMark = 8000; /* high water mark for disk-assisted queues */
static int iMainMsgQLowWtrMark = 2000; /* low water mark for disk-assisted queues */
@@ -640,7 +640,6 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShu
int i;
msg_t *pMsg;
DEFiRet;
- rsRetVal localRet;
assert(pBatch != NULL);
@@ -650,9 +649,7 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShu
if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
parseMsg(pMsg);
}
- localRet =
ruleset.ProcessMsg(pMsg);
-dbgprintf("msgConsumer got iRet %d from ProcessMsg\n", localRet);
/* if we reach this point, the message is considered committed (by definition!) */
pBatch->pElem[i].state = BATCH_STATE_COMM;
}
@@ -1037,12 +1034,17 @@ int parseLegacySyslogMsg(msg_t *pMsg, int flags)
rsRetVal
submitMsg(msg_t *pMsg)
{
+ qqueue_t *pQueue;
+ ruleset_t *pRuleset;
DEFiRet;
ISOBJ_TYPE_assert(pMsg, msg);
+ pRuleset = MsgGetRuleset(pMsg);
+
+ pQueue = (pRuleset == NULL) ? pMsgQueue : ruleset.GetRulesetQueue(pRuleset);
MsgPrepareEnqueue(pMsg);
- qqueueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg);
+ qqueueEnqObj(pQueue, pMsg->flowCtlType, (void*) pMsg);
RETiRet;
}
@@ -1880,6 +1882,70 @@ startInputModules(void)
}
+/* create a main message queue, now also used for ruleset queues. This function
+ * needs to be moved to some other module, but it is considered acceptable for
+ * the time being (remember that we want to restructure config processing at large!).
+ * rgerhards, 2009-10-27
+ */
+rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName)
+{
+ DEFiRet;
+
+ /* switch the message object to threaded operation, if necessary */
+ if(MainMsgQueType == QUEUETYPE_DIRECT || iMainMsgQueueNumWorkers > 1) {
+ MsgEnableThreadSafety();
+ }
+
+ /* create message queue */
+ CHKiRet_Hdlr(qqueueConstruct(ppQueue, MainMsgQueType, iMainMsgQueueNumWorkers, iMainMsgQueueSize, msgConsumer)) {
+ /* no queue is fatal, we need to give up in that case... */
+ errmsg.LogError(0, iRet, "could not create (ruleset) main message queue"); \
+ }
+ /* name our main queue object (it's not fatal if it fails...) */
+ obj.SetName((obj_t*) (*ppQueue), pszQueueName);
+
+ /* ... set some properties ... */
+# define setQPROP(func, directive, data) \
+ CHKiRet_Hdlr(func(*ppQueue, data)) { \
+ errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
+ }
+# define setQPROPstr(func, directive, data) \
+ CHKiRet_Hdlr(func(*ppQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \
+ errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
+ }
+
+ setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize);
+ setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace);
+ setQPROP(qqueueSetiDeqBatchSize, "$MainMsgQueueDequeueBatchSize", iMainMsgQueDeqBatchSize);
+ setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName);
+ setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt);
+ setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", bMainMsgQSyncQeueFiles);
+ setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown );
+ setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown);
+ setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", iMainMsgQtoWrkShutdown);
+ setQPROP(qqueueSettoEnq, "$MainMsgQueueTimeoutEnqueue", iMainMsgQtoEnq);
+ setQPROP(qqueueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", iMainMsgQHighWtrMark);
+ setQPROP(qqueueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", iMainMsgQLowWtrMark);
+ setQPROP(qqueueSetiDiscardMrk, "$MainMsgQueueDiscardMark", iMainMsgQDiscardMark);
+ setQPROP(qqueueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity);
+ setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs);
+ setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown);
+ setQPROP(qqueueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown);
+ setQPROP(qqueueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", iMainMsgQueueDeqtWinFromHr);
+ setQPROP(qqueueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", iMainMsgQueueDeqtWinToHr);
+
+# undef setQPROP
+# undef setQPROPstr
+
+ /* ... and finally start the queue! */
+ CHKiRet_Hdlr(qqueueStart(*ppQueue)) {
+ /* no queue is fatal, we need to give up in that case... */
+ errmsg.LogError(0, iRet, "could not start (ruleset) main message queue"); \
+ }
+ RETiRet;
+}
+
+
/* INIT -- Initialize syslogd
* Note that if iConfigVerify is set, only the config file is verified but nothing
* else happens. -- rgerhards, 2008-07-28
@@ -1988,59 +2054,12 @@ init(void)
exit(2);
}
- /* switch the message object to threaded operation, if necessary */
- if(MainMsgQueType == QUEUETYPE_DIRECT || iMainMsgQueueNumWorkers > 1) {
- MsgEnableThreadSafety();
- }
-
/* create message queue */
- CHKiRet_Hdlr(qqueueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueNumWorkers, iMainMsgQueueSize, msgConsumer)) {
+ CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"))) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
exit(1);
}
- /* name our main queue object (it's not fatal if it fails...) */
- obj.SetName((obj_t*) pMsgQueue, (uchar*) "main Q");
-
- /* ... set some properties ... */
-# define setQPROP(func, directive, data) \
- CHKiRet_Hdlr(func(pMsgQueue, data)) { \
- errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
- }
-# define setQPROPstr(func, directive, data) \
- CHKiRet_Hdlr(func(pMsgQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \
- errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
- }
-
- setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize);
- setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace);
- setQPROP(qqueueSetiDeqBatchSize, "$MainMsgQueueDequeueBatchSize", iMainMsgQueDeqBatchSize);
- setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName);
- setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt);
- setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", bMainMsgQSyncQeueFiles);
- setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown );
- setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown);
- setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", iMainMsgQtoWrkShutdown);
- setQPROP(qqueueSettoEnq, "$MainMsgQueueTimeoutEnqueue", iMainMsgQtoEnq);
- setQPROP(qqueueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", iMainMsgQHighWtrMark);
- setQPROP(qqueueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", iMainMsgQLowWtrMark);
- setQPROP(qqueueSetiDiscardMrk, "$MainMsgQueueDiscardMark", iMainMsgQDiscardMark);
- setQPROP(qqueueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity);
- setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs);
- setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown);
- setQPROP(qqueueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown);
- setQPROP(qqueueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", iMainMsgQueueDeqtWinFromHr);
- setQPROP(qqueueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", iMainMsgQueueDeqtWinToHr);
-
-# undef setQPROP
-# undef setQPROPstr
-
- /* ... and finally start the queue! */
- CHKiRet_Hdlr(qqueueStart(pMsgQueue)) {
- /* no queue is fatal, we need to give up in that case... */
- fprintf(stderr, "fatal error %d: could not start message queue - rsyslogd can not run!\n", iRet);
- exit(1);
- }
bHaveMainQueue = (MainMsgQueType == QUEUETYPE_DIRECT) ? 0 : 1;
DBGPRINTF("Main processing queue is initialized and running\n");