From 46fbfee41e88034135725beb4136d44b94388ede Mon Sep 17 00:00:00 2001
From: Rainer Gerhards
Date: Thu, 3 Apr 2008 13:19:48 +0000
Subject: added the capability to specify a processing (actually dequeue)
timeframe with queues - so things can be configured to be done at
off-peak hours
---
ChangeLog | 3 ++
action.c | 15 ++++++--
doc/features.html | 6 +--
doc/queues.html | 12 +++++-
doc/rsyslog_ng_comparison.html | 13 ++++++-
queue.c | 85 +++++++++++++++++++++++++++++++++---------
queue.h | 2 +-
stringbuf.h | 1 +
syslogd.c | 23 ++++--------
9 files changed, 114 insertions(+), 46 deletions(-)
diff --git a/ChangeLog b/ChangeLog
index 4648f791..f6291a67 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -5,6 +5,9 @@ Version 3.17.0 (rgerhards), 2008-04-??
- bugfix: memory leaks in script engine
- properties are now case-insensitive everywhere (script, filters,
templates)
+- added the capability to specify a processing (actually dequeue)
+ timeframe with queues - so things can be configured to be done
+ at off-peak hours
---------------------------------------------------------------------------
Version 3.15.1 (rgerhards), 2008-04-??
- disabled atomic operations for the time being because they introduce some
diff --git a/action.c b/action.c
index 99ae8b32..30bf3c92 100644
--- a/action.c
+++ b/action.c
@@ -74,8 +74,10 @@ static int iActionQtoEnq = 2000; /* timeout for queue enque */
static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
-static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
static int64 iActionQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */
+static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
+static int iActionQueueDeqtWinFromHr = 0; /* hour begin of time frame when queue is to be dequeued */
+static int iActionQueueDeqtWinToHr = 25; /* hour begin of time frame when queue is to be dequeued */
/* the counter below counts actions created. It is used to obtain unique IDs for the action. They
* should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice
@@ -113,8 +115,10 @@ actionResetQueueParams(void)
iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
- iActionQueueDeqSlowdown = 0;
iActionQueMaxDiskSpace = 0;
+ iActionQueueDeqSlowdown = 0;
+ iActionQueueDeqtWinFromHr = 0;
+ iActionQueueDeqtWinToHr = 25; /* 25 disables time windowed dequeuing */
glbliActionResumeRetryCount = 0; /* I guess it is smart to reset this one, too */
@@ -237,7 +241,9 @@ actionConstructFinalize(action_t *pThis)
setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
- setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
+ setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
+ setQPROP(queueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr);
+ setQPROP(queueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr);
# undef setQPROP
# undef setQPROPstr
@@ -680,6 +686,8 @@ actionAddCfSysLineHdrl(void)
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL));
finalize_it:
RETiRet;
@@ -800,6 +808,5 @@ finalize_it:
RETiRet;
}
-
/* vi:set ai:
*/
diff --git a/doc/features.html b/doc/features.html
index 9573030e..f74f2aaf 100644
--- a/doc/features.html
+++ b/doc/features.html
@@ -1,7 +1,5 @@
-rsyslog features
-
-
+rsyslog features
RSyslog - Features
This page lists both current features as well as
@@ -31,7 +29,7 @@ reliability
support for sending and receiving compressed syslog messages
support for on-demand on-disk spooling of messages that can
not be processed fast enough (a great feature for writing massive
-amounts of syslog messages to a database)
+amounts of syslog messages to a database)support for selectively processing messages only during specific timeframes and spooling them to disk otherwise
ability to monitor text files and convert their contents
into syslog messages (one per line)
ability to configure backup syslog/database servers - if
diff --git a/doc/queues.html b/doc/queues.html
index 80641d8c..a2074d36 100644
--- a/doc/queues.html
+++ b/doc/queues.html
@@ -288,7 +288,17 @@ directive allows to specify how long (in microseconds) dequeueing should be
delayed. While simple, it still is powerful. For example, using a
DequeueSlowdown delay of 1,000 microseconds on a UDP send action ensures that no
more than 1,000 messages can be sent within a second (actually less, as there is
-also some time needed for the processing itself).
+also some time needed for the processing itself).
Processing Timeframes
Queues
+can be set to dequeue (process) messages only during certain
+timeframes. This is useful if you, for example, would like to transfer
+the bulk of messages only during off-peak hours, e.g. when you have
+only limited bandwidth on the network path the the central server.
Currently,
+only a single timeframe is supported and, even worse, it can only be
+specified by the hour. It is not hard to extend rsyslog's capabilities
+in this regard - it was just not requested so far. So if you need more
+fine-grained control, let us know and we'll probably implement it.
+There are two configuration directives, both should be used together or
+results are unpredictable:" $<object>QueueDequeueTimeBegin <hour>" and "$<object>QueueDequeueTimeEnd <hour>". The hour parameter must be specified in 24-hour format (so 10pm is 22). A use case for this parameter can be found in the rsyslog wiki.
Terminating Queues
Terminating a process sounds easy, but can be complex.
diff --git a/doc/rsyslog_ng_comparison.html b/doc/rsyslog_ng_comparison.html
index 852ab4ed..60eeee74 100644
--- a/doc/rsyslog_ng_comparison.html
+++ b/doc/rsyslog_ng_comparison.html
@@ -1,6 +1,6 @@
-rsyslog vs. syslog-ng - a comparison
+rsyslog vs. syslog-ng - a comparison
@@ -122,7 +122,9 @@ based framing on syslog/tcp connections
yes |
-syslog over RELP truly reliable message delivery (Why is plain tcp syslog not reliable?) |
+syslog over RELP
+truly reliable message delivery (Why
+is plain tcp syslog not reliable?) |
yes |
no |
@@ -337,6 +339,13 @@ be placed on different disk
no |
+ability to process spooled
+messages only during a configured timeframe (e.g. process messages only
+during off-peak hours, during peak hours they are enqueued only) |
+yes (can independently be configured for the main queue and each action queue) |
+no |
+
+
ability to configure backup
syslog/database servers |
yes |
diff --git a/queue.c b/queue.c
index 57484e60..7456f4a6 100644
--- a/queue.c
+++ b/queue.c
@@ -1,16 +1,16 @@
/* queue.c
-*
-* This file implements the queue object and its several queueing methods.
-*
-* File begun on 2008-01-03 by RGerhards
-*
-* There is some in-depth documentation available in doc/dev_queue.html
-* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
-* if you are getting aquainted to the object.
-*
-* Copyright 2008 Rainer Gerhards and Adiscon GmbH.
-*
-* This file is part of rsyslog.
+ *
+ * This file implements the queue object and its several queueing methods.
+ *
+ * File begun on 2008-01-03 by RGerhards
+ *
+ * There is some in-depth documentation available in doc/dev_queue.html
+ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
+ * if you are getting aquainted to the object.
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
*
* Rsyslog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -38,6 +38,7 @@
#include
#include
#include /* required for HP UX */
+#include
#include
#include "rsyslog.h"
@@ -272,6 +273,8 @@ queueStartDA(queue_t *pThis)
CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
+ CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
+ CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
if(pThis->toQShutdown == 0) {
@@ -342,7 +345,6 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
- CHKiRet(wtpSetpfRateLimiter (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueRateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA));
@@ -1269,6 +1271,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
pThis->iMaxQueueSize = iMaxQueueSize;
pThis->pConsumer = pConsumer;
pThis->iNumWorkerThreads = iWorkerThreads;
+ pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
@@ -1486,20 +1489,67 @@ finalize_it:
* }
*
* Bottom line: we need to check which type of window we have and need to adjust our
- * logic accordingly. Of course, sleep calculations need to be done up to the minute,
+ * logic accordingly. Of course, sleep calculations need to be done up to the minute,
* but you get the idea from the code above.
*/
static rsRetVal
queueRateLimiter(queue_t *pThis)
{
DEFiRet;
+ int iDelay;
+ int iHrCurr;
+ time_t tCurr;
+ struct tm m;
ISOBJ_TYPE_assert(pThis, queue);
dbgoprint((obj_t*) pThis, "entering rate limiter\n");
- srSleep(2, 0);
-finalize_it:
+ iDelay = 0;
+dbgprintf("deq win from %d to %d\n", pThis->iDeqtWinFromHr, pThis->iDeqtWinToHr);
+ if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */
+ /* time calls are expensive, so only do them when needed */
+ time(&tCurr);
+ localtime_r(&tCurr, &m);
+ iHrCurr = m.tm_hour;
+RUNLOG_VAR("%d", iHrCurr);
+
+ if(pThis->iDeqtWinToHr < pThis->iDeqtWinFromHr) {
+ if(iHrCurr < pThis->iDeqtWinToHr || iHrCurr > pThis->iDeqtWinFromHr) {
+ ; /* do not delay */
+ } else {
+ iDelay = (pThis->iDeqtWinFromHr - iHrCurr) * 3600;
+ /* this time, we are already into the next hour, so we need
+ * to subtract our current minute and seconds.
+ */
+ iDelay -= m.tm_min * 60;
+ iDelay -= m.tm_sec;
+ }
+ } else {
+ if(iHrCurr >= pThis->iDeqtWinFromHr && iHrCurr < pThis->iDeqtWinToHr) {
+ ; /* do not delay */
+ } else {
+ if(iHrCurr < pThis->iDeqtWinFromHr) {
+ iDelay = (pThis->iDeqtWinFromHr - iHrCurr - 1) * 3600; /* -1 as we are already in the hour */
+ iDelay += (60 - m.tm_min) * 60;
+ iDelay += 60 - m.tm_sec;
+ } else {
+ iDelay = (24 - iHrCurr + pThis->iDeqtWinFromHr) * 3600;
+ /* this time, we are already into the next hour, so we need
+ * to subtract our current minute and seconds.
+ */
+ iDelay -= m.tm_min * 60;
+ iDelay -= m.tm_sec;
+ }
+ }
+ }
+ }
+
+ if(iDelay > 0) {
+ dbgoprint((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay);
+ srSleep(iDelay, 0);
+ }
+
dbgoprint((obj_t*) pThis, "rate limiter returns with iRet %d\n", iRet);
RETiRet;
}
@@ -2272,6 +2322,5 @@ BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE)
OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
ENDObjClassInit(queue)
-/*
- * vi:set ai:
+/* vi:set ai:
*/
diff --git a/queue.h b/queue.h
index ecac6ee4..7dfeb226 100644
--- a/queue.h
+++ b/queue.h
@@ -87,7 +87,7 @@ typedef struct queue_s {
/* end rate limiting */
/* dequeue time window settings (may also be expanded) */
int iDeqtWinFromHr; /* begin of dequeue time window (hour only) */
- int iDeqtWinToHr; /* end of dequeue time window (hour only) */
+ int iDeqtWinToHr; /* end of dequeue time window (hour only), set to 25 to disable deq window! */
/* note that begin and end have specific semantics. It is a big difference if we have
* begin 4, end 22 or begin 22, end 4. In the later case, dequeuing will run from 10p,
* throughout the night and stop at 4 in the morning. In the first case, it will start
diff --git a/stringbuf.h b/stringbuf.h
index aa31884e..3475b8f6 100755
--- a/stringbuf.h
+++ b/stringbuf.h
@@ -121,6 +121,7 @@ void rsCStrSetAllocIncrement(cstr_t *pThis, int iNewIncrement);
rsRetVal rsCStrAppendInt(cstr_t *pThis, long i);
+rsRetVal strExit(void); /* TODO: remove once we have a real object interface! */
uchar* rsCStrGetSzStr(cstr_t *pThis);
uchar* rsCStrGetSzStrNoNULL(cstr_t *pThis);
rsRetVal rsCStrSetSzStr(cstr_t *pThis, uchar *pszNew);
diff --git a/syslogd.c b/syslogd.c
index b5554e5f..e6dae301 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -338,8 +338,10 @@ static int iMainMsgQtoEnq = 2000; /* timeout for queue enque */
static int iMainMsgQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
static int iMainMsgQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
static int iMainMsgQDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
-static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
static int64 iMainMsgQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */
+static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+static int iMainMsgQueueDeqtWinFromHr = 0; /* hour begin of time frame when queue is to be dequeued */
+static int iMainMsgQueueDeqtWinToHr = 25; /* hour begin of time frame when queue is to be dequeued */
/* support for simple textual representation of FIOP names
@@ -2323,6 +2325,8 @@ init(void)
setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs);
setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown);
setQPROP(queueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown);
+ setQPROP(queueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", iMainMsgQueueDeqtWinFromHr);
+ setQPROP(queueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", iMainMsgQueueDeqtWinToHr);
# undef setQPROP
# undef setQPROPstr
@@ -2687,6 +2691,8 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxDiskSpace, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iMainMsgQueueDeqtWinFromHr, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iMainMsgQueueDeqtWinToHr, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeinterval", 0, eCmdHdlrInt, setActionResumeInterval, NULL, NULL));
@@ -2766,21 +2772,6 @@ static void mainThread()
BEGINfunc
uchar *pTmp;
-#if 0 // code moved back to main()
- /* doing some core initializations */
- if((iRet = modInitIminternal()) != RS_RET_OK) {
- fprintf(stderr, "fatal error: could not initialize errbuf object (error code %d).\n",
- iRet);
- exit(1); /* "good" exit, leaving at init for fatal error */
- }
-
- if((iRet = loadBuildInModules()) != RS_RET_OK) {
- fprintf(stderr, "fatal error: could not activate built-in modules. Error code %d.\n",
- iRet);
- exit(1); /* "good" exit, leaving at init for fatal error */
- }
-#endif
-
/* Note: signals MUST be processed by the thread this code is running in. The reason
* is that we need to interrupt the select() system call. -- rgerhards, 2007-10-17
*/
--
cgit v1.2.3