summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog36
-rw-r--r--action.c34
-rw-r--r--action.h1
-rw-r--r--configure.ac2
-rw-r--r--dirty.h9
-rw-r--r--doc/imklog.html66
-rw-r--r--doc/imptcp.html15
-rw-r--r--doc/imtcp.html11
-rw-r--r--doc/imudp.html7
-rw-r--r--doc/manual.html2
-rw-r--r--doc/omfile.html10
-rw-r--r--doc/omlibdbi.html46
-rw-r--r--doc/v7compatibility.html37
-rw-r--r--plugins/imdiag/imdiag.c10
-rw-r--r--plugins/imfile/imfile.c22
-rw-r--r--plugins/imklog/bsd.c25
-rw-r--r--plugins/imklog/imklog.c13
-rw-r--r--plugins/imklog/imklog.h1
-rw-r--r--plugins/imkmsg/imkmsg.c2
-rw-r--r--plugins/impstats/impstats.c3
-rw-r--r--plugins/imptcp/imptcp.c36
-rw-r--r--plugins/imsolaris/imsolaris.c2
-rw-r--r--plugins/imtcp/imtcp.c20
-rw-r--r--plugins/imudp/imudp.c40
-rw-r--r--plugins/imuxsock/imuxsock.c121
-rw-r--r--plugins/omlibdbi/omlibdbi.c151
-rw-r--r--plugins/ommongodb/ommongodb.c10
-rw-r--r--plugins/omruleset/omruleset.c6
-rw-r--r--runtime/Makefile.am2
-rw-r--r--runtime/debug.c6
-rw-r--r--runtime/msg.c96
-rw-r--r--runtime/msg.h2
-rw-r--r--runtime/queue.c2
-rw-r--r--runtime/ratelimit.c359
-rw-r--r--runtime/ratelimit.h51
-rw-r--r--runtime/rsconf.c4
-rw-r--r--runtime/rsyslog.h1
-rw-r--r--runtime/ruleset.c3
-rw-r--r--runtime/stream.c140
-rw-r--r--runtime/stream.h7
-rw-r--r--runtime/typedefs.h1
-rw-r--r--tcps_sess.c16
-rw-r--r--tcpsrv.c19
-rw-r--r--tcpsrv.h7
-rw-r--r--template.c71
-rw-r--r--template.h8
-rw-r--r--tools/omfile.c15
-rw-r--r--tools/syslogd.c84
48 files changed, 1192 insertions, 440 deletions
diff --git a/ChangeLog b/ChangeLog
index 5dc5584b..9081f07d 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,37 @@
-----------------------------------------------------------------------------
+---------------------------------------------------------------------------
+Version 7.3.2 [devel] 2012-10-??
+- imtcp: support for Linux-Type ratelimiting added
+- imptcp: support for Linux-Type ratelimiting added
+- imudp enhancements:
+ * support for input batching added (performance improvement)
+ * support for Linux-Type ratelimiting added
+---------------------------------------------------------------------------
+Version 7.3.1 [devel] 2012-10-19
+- optimized template processing performance, especially for $NOW family
+ of properties
+- change lumberjack cookie to "@cee:" from "@cee: "
+ CEE originally specified the cookie with SP, whereas other lumberjack
+ tools used it without space. In order to keep interop with lumberjack,
+ we now use the cookie without space as well. I hope this can be changed
+ in CEE as well when it is released at a later time.
+ Thanks to Miloslav Trmač for pointing this out and a similiar v7 patch.
+- bugfix: imuxsock and imklog truncated head of received message
+ This happened only under some circumstances. Thanks to Marius
+ Tomaschwesky, Florian Piekert and Milan Bartos for their help in
+ solving this issue.
+- bugfix: imuxsock did not properly honor $LocalHostIPIF
+---------------------------------------------------------------------------
+Version 7.3.0 [devel] 2012-10-09
+- omlibdbi improvements, added
+ * support for config load phases & module() parameters
+ * support for default templates
+ * driverdirectory is now cleanly a global parameter, but can no longer
+ be specified as an action paramter. Note that in previous versions
+ this parameter was ignored in all but the first action definition
+- improved omfile zip writer to increase compression
+ This was achieved by somewhat reducing the robustness of the zip archive.
+ This is controlled by the new action parameter "VeryReliableZip".
+---------------------------------------------------------------------------
Version 7.1.13 [beta] 2012-10-??
- bugfix: imuxsock did not properly honor $LocalHostIPIF
----------------------------------------------------------------------------
@@ -15,6 +48,7 @@ Version 7.1.12 [beta] 2012-10-18
This happened only under some circumstances. Thanks to Marius
Tomaschwesky and Florian Piekert for their help in solving this issue.
----------------------------------------------------------------------------
+>>>>>>> b151584d0929759284c0fb0399709e5ca0e29d60
Version 7.1.11 [beta] 2012-10-16
- bugfix: imuxsock truncated head of received message
This happened only under some circumstances. Thanks to Marius
diff --git a/action.c b/action.c
index 9c06f61e..cf010d01 100644
--- a/action.c
+++ b/action.c
@@ -809,7 +809,8 @@ rsRetVal actionDbgPrint(action_t *pThis)
/* prepare the calling parameters for doAction()
* rgerhards, 2009-05-07
*/
-static rsRetVal prepareDoActionParams(action_t *pAction, batch_obj_t *pElem)
+static rsRetVal
+prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow)
{
int i;
msg_t *pMsg;
@@ -825,17 +826,17 @@ static rsRetVal prepareDoActionParams(action_t *pAction, batch_obj_t *pElem)
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]),
- &pElem->staticLenStrings[i]));
+ &pElem->staticLenStrings[i], ttNow));
pElem->staticActParams[i] = pElem->staticActStrings[i];
break;
case ACT_ARRAY_PASSING:
- CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i])));
+ CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow));
break;
case ACT_MSG_PASSING:
pElem->staticActParams[i] = (void*) pMsg;
break;
case ACT_JSON_PASSING:
- CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json));
+ CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow));
pElem->staticActParams[i] = (void*) json;
break;
default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n",
@@ -1226,14 +1227,19 @@ prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustR
{
int i;
batch_obj_t *pElem;
+ struct syslogTime ttNow;
DEFiRet;
+ if(pAction->requiresDateCall) {
+ datetime.getCurrTime(&ttNow, NULL);
+ }
+
pBatch->iDoneUpTo = 0;
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
pElem = &(pBatch->pElem[i]);
if(batchIsValidElem(pBatch, i)) {
pElem->state = BATCH_STATE_RDY;
- if(prepareDoActionParams(pAction, pElem) != RS_RET_OK) {
+ if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) {
/* make sure we have our copy of "active" array */
if(!*bMustRestoreActivePtr) {
*activeSave = pBatch->active;
@@ -1875,6 +1881,23 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals)
return RS_RET_OK;
}
+/* check if the templates used in this action require a date call
+ * ($NOW family of properties).
+ */
+static inline int
+actionRequiresDateCall(action_t *pAction)
+{
+ int i;
+ int r = 0;
+
+ for(i = 0 ; i < pAction->iNumTpls ; ++i) {
+ if(tplRequiresDateCall(pAction->ppTpl[i])) {
+ r = 1;
+ break;
+ }
+ }
+ return r;
+}
/* add an Action to the current selector
@@ -1980,6 +2003,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
pAction->f_ReduceRepeated = 0;
}
pAction->eState = ACT_STATE_RDY; /* action is enabled */
+ pAction->requiresDateCall = actionRequiresDateCall(pAction);
if(bSuspended)
actionSuspend(pAction, datetime.GetTime(NULL)); /* "good" time call, only during init and unavoidable */
diff --git a/action.h b/action.h
index bce36b4c..177fd682 100644
--- a/action.h
+++ b/action.h
@@ -70,6 +70,7 @@ struct action_s {
void *pModData; /* pointer to module data - content is module-specific */
sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */
short f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */
+ sbool requiresDateCall;/* do we need to do a date call before creating templates? */
int f_prevcount; /* repetition cnt of prevline */
int f_repeatcount; /* number of "repeated" msgs */
rsRetVal (*submitToActQ)(action_t *, batch_t *);/* function submit message to action queue */
diff --git a/configure.ac b/configure.ac
index 3c5b6764..660c7ee9 100644
--- a/configure.ac
+++ b/configure.ac
@@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ(2.61)
-AC_INIT([rsyslog],[7.1.12],[rsyslog@lists.adiscon.com])
+AC_INIT([rsyslog],[7.3.1],[rsyslog@lists.adiscon.com])
AM_INIT_AUTOMAKE
m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])])
diff --git a/dirty.h b/dirty.h
index a3940cb9..30b30bec 100644
--- a/dirty.h
+++ b/dirty.h
@@ -27,10 +27,13 @@
#ifndef DIRTY_H_INCLUDED
#define DIRTY_H_INCLUDED 1
-rsRetVal multiSubmitMsg(multi_submit_t *pMultiSub);
-rsRetVal submitMsg(msg_t *pMsg);
+rsRetVal __attribute__((deprecated)) multiSubmitMsg(multi_submit_t *pMultiSub);
+rsRetVal multiSubmitMsg2(multi_submit_t *pMultiSub); /* friends only! */
+rsRetVal submitMsg2(msg_t *pMsg);
+rsRetVal __attribute__((deprecated)) submitMsg(msg_t *pMsg);
+rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub);
rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags);
-rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset);
+rsRetVal __attribute__((deprecated)) parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset);
rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */
rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName);
diff --git a/doc/imklog.html b/doc/imklog.html
index 2e3b3bc2..b62f4f92 100644
--- a/doc/imklog.html
+++ b/doc/imklog.html
@@ -15,7 +15,10 @@
syslog engine.</p>
<p><b>Configuration Directives</b>:</p>
<ul>
-<li><strong>$KLogInternalMsgFacility
+<li><strong>LogPath</strong><br>
+The path to the Kernel log. This value should only be changed if you really know what
+you are doing.</li>
+<li><strong>InternalMsgFacility
&lt;facility&gt;</strong><br>
The facility which messages internally generated by imklog will have.
imklog generates some messages of itself (e.g. on problems, startup and
@@ -26,13 +29,47 @@ need to specify this configuratin directive - it is included primarily
for few limited cases where it is needed for good reason. Bottom line:
if you don't have a good idea why you should use this setting, do not
touch it.</li>
-<li><span style="font-weight: bold;">$KLogPermitNonKernelFacility
+<li><span style="font-weight: bold;">PermitNonKernelFacility
[on/<span style="font-style: italic;">off</span>]<br>
</span>At least under BSD the kernel log may contain entries
with non-kernel facilities. This setting controls how those are
handled. The default is "off", in which case these messages are
ignored. Switch it to on to submit non-kernel messages to rsyslog
processing.<span style="font-weight: bold;"></span></li>
+<li><b>KeepKernelTimeStamp</b> [on/<b>off</b>] --
+<br>if the Kernel creates a timestamp for its log messages, this timestamp will be preserved.
+If disabled, the time when the message is received will be used.<br>
+<li><b>ConsoleLogLevel</b> [<i>number</i>]
+(former klogd -c option) -- sets the console log level. If specified, only messages with
+up to the specified level are printed to the console. The default is -1, which means that
+the current settings are not modified. To get this behavior, do not specify
+ConsoleLogLevel in the configuration file. Note that this is a global parameter. Each time
+it is changed, the previous definition is re-set. The one activate will be that one that is
+active when imklog actually starts processing. In short words: do not specify this
+directive more than once!
+</ul>
+<b>Caveats/Known Bugs:</b>
+<p>This is obviously platform specific and requires platform
+drivers.
+Currently, imklog functionality is available on Linux and BSD.</p>
+<p>This module is <b>not supported on Solaris</b> and not needed there.
+For Solaris kernel input, use <a href="imsolaris.html">imsolaris</a>.</p>
+<p><b>Sample:</b></p>
+<p>The following sample pulls messages from the kernel log. All
+parameters are left by default, which is usually a good idea. Please
+note that loading the plugin is sufficient to activate it. No directive
+is needed to start pulling kernel messages.<br>
+</p>
+<textarea rows="15" cols="60">module(load="imklog")
+</textarea>
+<p><b>Legacy Configuration Directives</b>:</p>
+<ul>
+<li><strong>$KLogInternalMsgFacility
+&lt;facility&gt;</strong><br>
+equivalent to: InternalMsgFacility</li>
+<li><span style="font-weight: bold;">$KLogPermitNonKernelFacility
+[on/<span style="font-style: italic;">off</span>]<br>
+equivalent to: PermitNonKernelFacility</li>
<li><span style="font-weight: bold;"></span>$DebugPrintKernelSymbols
[on/<b>off</b>]<br>
Linux only, ignored on other platforms (but may be specified)</li>
@@ -50,14 +87,7 @@ it except if you have a very good reason. If you have one, let us know
because otherwise new versions will no longer support it.<br>
Linux only, ignored on other platforms (but may be specified)</li>
<li><b>$klogConsoleLogLevel</b> [<i>number</i>]
-(former klogd -c option) -- sets the console log level. If specified, only messages with
-up to the specified level are printed to the console. The default is -1, which means that
-the current settings are not modified. To get this behavior, do not specify
-$klogConsoleLogLevel in the configuration file. Note that this is a global parameter. Each time
-it is changed, the previous definition is re-set. The one activate will be that one that is
-active when imklog actually starts processing. In short words: do not specify this
-directive more than once!
-<br><b>Linux only</b>, ignored on other platforms (but may be specified)</li>
+<br>equivalent to: ConsoleLogLevel</li>
<li><b>$klogUseSyscallInterface</b> [on/<b>off</b>]
-- former klogd -s option<br>
Linux only, ignored on other platforms (but may be specified)</li>
@@ -66,26 +96,12 @@ former klogd -2 option<br>
Linux only, ignored on other platforms (but may be specified)<br style="font-weight: bold;">
</li>
</ul>
-<b>Caveats/Known Bugs:</b>
-<p>This is obviously platform specific and requires platform
-drivers.
-Currently, imklog functionality is available on Linux and BSD.</p>
-<p>This module is <b>not supported on Solaris</b> and not needed there.
-For Solaris kernel input, use <a href="imsolaris.html">imsolaris</a>.</p>
-<p><b>Sample:</b></p>
-<p>The following sample pulls messages from the kernel log. All
-parameters are left by default, which is usually a good idea. Please
-note that loading the plugin is sufficient to activate it. No directive
-is needed to start pulling kernel messages.<br>
-</p>
-<textarea rows="15" cols="60">$ModLoad imklog
-</textarea>
<p>[<a href="rsyslog_conf.html">rsyslog.conf overview</a>]
[<a href="manual.html">manual index</a>] [<a href="http://www.rsyslog.com/">rsyslog site</a>]</p>
<p><font size="2">This documentation is part of the
<a href="http://www.rsyslog.com/">rsyslog</a>
project.<br>
-Copyright &copy; 2008-2009 by <a href="http://www.gerhards.net/rainer">Rainer
+Copyright &copy; 2008-2012 by <a href="http://www.gerhards.net/rainer">Rainer
Gerhards</a> and
<a href="http://www.adiscon.com/">Adiscon</a>.
Released under the GNU GPL version 3 or higher.</font></p>
diff --git a/doc/imptcp.html b/doc/imptcp.html
index 7e712afa..33b8b13b 100644
--- a/doc/imptcp.html
+++ b/doc/imptcp.html
@@ -13,18 +13,12 @@
<p><b>Description</b>:</p>
<p>Provides the ability to receive syslog messages via plain TCP syslog.
This is a specialised input plugin tailored for high performance on Linux. It will
-probably not run on any other platform. Also, it does no provide TLS services.
+probably not run on any other platform. Also, it does not provide TLS services.
Encryption can be provided by using <a href="rsyslog_stunnel.html">stunnel</a>.
<p>This module has no limit on the number of listeners and sessions that can be used.
-<p>Multiple receivers may be configured by
-specifying $InputPTCPServerRun multiple times.
</p>
<p><b>Configuration Directives</b>:</p>
-<p>This plugin has config directives similar named as imtcp, but they all have <b>P</b>TCP in
-their name instead of just TCP. Note that only a subset of the parameters are supported.
-<ul>
-
<p><b>Global Directives</b>:</p>
<ul>
<li>Threads &lt;number&gt;<br>
@@ -91,6 +85,13 @@ the message was received from.
Binds specified ruleset to next server defined.
<li><b>Address</b> &lt;name&gt;<br>
On multi-homed machines, specifies to which local address the listerner should be bound.
+<li><b>RateLimit.Interval</b> [number] - (available since 7.3.1) specifies the rate-limiting
+interval in seconds. Default value is 0, which turns off rate limiting. Set it to a number
+of seconds (5 recommended) to activate rate-limiting.
+</li>
+<li><b>RateLimit.Burst</b> [number] - (available since 7.3.1) specifies the rate-limiting
+burst in number of messages. Default is 10,000.
+</li>
</ul>
<b>Caveats/Known Bugs:</b>
<ul>
diff --git a/doc/imtcp.html b/doc/imtcp.html
index 01ea2802..4bda46ba 100644
--- a/doc/imtcp.html
+++ b/doc/imtcp.html
@@ -17,10 +17,6 @@
Encryption is natively provided by selecting the approprioate network stream driver and
can also be provided by using <a href="rsyslog_stunnel.html">stunnel</a>
(an alternative is the use the <a href="imgssapi.html">imgssapi</a> module).</p>
-<p>Multiple receivers may be configured by specifying
-$InputTCPServerRun multiple times. This is available since version 4.3.1, earlier
-versions do NOT support it.
-</p>
<p><b>Configuration Directives</b>:</p>
<p><b>Global Directives</b>:</p>
@@ -100,6 +96,13 @@ activated. This is the default and should be left unchanged until you know
very well what you do. It may be useful to turn it off, if you know this framing
is not used and some senders emit multi-line messages into the message stream.
</li>
+<li><b>RateLimit.Interval</b> [number] - (available since 7.3.1) specifies the rate-limiting
+interval in seconds. Default value is 0, which turns off rate limiting. Set it to a number
+of seconds (5 recommended) to activate rate-limiting.
+</li>
+<li><b>RateLimit.Burst</b> [number] - (available since 7.3.1) specifies the rate-limiting
+burst in number of messages. Default is 10,000.
+</li>
</ul>
<b>Caveats/Known Bugs:</b>
<ul>
diff --git a/doc/imudp.html b/doc/imudp.html
index b1a3ecc9..e32f9ecf 100644
--- a/doc/imudp.html
+++ b/doc/imudp.html
@@ -47,6 +47,13 @@ default 514, start UDP server on this port. Either a single port can be specifie
<br>Array of ports: Port=["514","515","10514","..."]</li>
<li><b>Ruleset</b> &lt;ruleset&gt;<br>
Binds the listener to a specific <a href="multi_ruleset.html">ruleset</a>.</li>
+<li><b>RateLimit.Interval</b> [number] - (available since 7.3.1) specifies the rate-limiting
+interval in seconds. Default value is 0, which turns off rate limiting. Set it to a number
+of seconds (5 recommended) to activate rate-limiting.
+</li>
+<li><b>RateLimit.Burst</b> [number] - (available since 7.3.1) specifies the rate-limiting
+burst in number of messages. Default is 10,000.
+</li>
</ul>
<b>Caveats/Known Bugs:</b>
<ul>
diff --git a/doc/manual.html b/doc/manual.html
index 45ba771b..e3a5e62b 100644
--- a/doc/manual.html
+++ b/doc/manual.html
@@ -19,7 +19,7 @@ rsyslog support</a> available directly from the source!</p>
<p><b>Please visit the <a href="http://www.rsyslog.com/sponsors">rsyslog sponsor's page</a>
to honor the project sponsors or become one yourself!</b> We are very grateful for any help towards the
project goals.</p>
-<p><b>This documentation is for version 7.1.12 (beta branch) of rsyslog.</b>
+<p><b>This documentation is for version 7.3.1 (devel branch) of rsyslog.</b>
Visit the <i><a href="http://www.rsyslog.com/status">rsyslog status page</a></i></b>
to obtain current version information and project status.
</p><p><b>If you like rsyslog, you might
diff --git a/doc/omfile.html b/doc/omfile.html
index 23ecc034..e58ea3b0 100644
--- a/doc/omfile.html
+++ b/doc/omfile.html
@@ -28,6 +28,16 @@
<li><strong>ZipLevel </strong>0..9 [default 0]<br>
if greater 0, turns on gzip compression of the output file. The higher the number, the better the compression, but also the more CPU is required for zipping.<br></li><br>
+ <li><b>VeryReliableZip</b> [<b>on</b>/off] (v7.3.0+) - if ZipLevel is greater 0,
+ then this setting controls if extra headers are written to make the resulting file
+ extra hardened against malfunction. If set to off, data appended to previously unclean
+ closed files may not be accessible without extra tools.
+ Note that this risk is usually expected to be bearable, and thus "off" is the default mode.
+ The extra headers considerably
+ degrade compression, files with this option set to "on" may be four to five times as
+ large as files processed in "off" mode.
+ </li><br>
+
<li><strong>FlushInterval </strong>(not mandatory, default will be used)<br>
Defines a template to be used for the output. <br></li><br>
diff --git a/doc/omlibdbi.html b/doc/omlibdbi.html
index 008dcb81..e47c7f57 100644
--- a/doc/omlibdbi.html
+++ b/doc/omlibdbi.html
@@ -54,7 +54,23 @@ dlopen()ed plugin (as omlibdbi is). So in short, you probably save you
a lot of headache if you make sure you have at least libdbi version
0.8.3 on your system.
</p>
-<p><b>Action Parameters</b>:</p>
+<p><b>Module Parameters</b></p>
+<ul>
+<li><b>template</b><br>
+The default template to use. This template is used when no template is
+explicitely specified in the action() statement.
+<li><b>driverdirectory</b><br>
+Path to the libdbi drivers. Usually,
+you do not need to set it. If you installed libdbi-drivers at a
+non-standard location, you may need to specify the directory here. If
+you are unsure, do <b>not</b> use this configuration directive.
+Usually, everything works just fine.
+Note that this was an action() paramter in rsyslog versions below 7.3.0.
+However, only the first action's driverdirectory parameter was actually used.
+This has been cleaned up in 7.3.0, where this now is a module paramter.
+</li>
+</ul>
+<p><b>Action Parameters</b></p>
<ul>
<li><b>server</b><br>Name or address of the MySQL server
<li><b>db</b><br>Database to use
@@ -68,24 +84,18 @@ writiting "mysql" (suggest to use ommysql instead), "firebird" (Firbird
and InterBase), "ingres", "msql", "Oracle", "sqlite", "sqlite3",
"freetds" (for Microsoft SQL and Sybase) and "pgsql" (suggest to use
ompgsql instead).</li>
-<li><b>driverdirectory</b><br>
-Path to the libdbi drivers. Usually,
-you do not need to set it. If you installed libdbi-drivers at a
-non-standard location, you may need to specify the directory here. If
-you are unsure, do <b>not</b> use this configuration directive.
-Usually, everything works just fine.</li>
</ul>
<p><b>Legacy (pre-v6) Configuration Directives</b>:</p>
+<p>It is strongly recommended NOT to use legacy format.
<ul>
-<li><b>$ActionLibdbiDriverDirectory /path/to/dbd/drivers</b>
+<li><i>$ActionLibdbiDriverDirectory /path/to/dbd/drivers</i>
- like the driverdirectory action parameter.
-<li><strong>$ActionLibdbiDriver drivername</strong><br> - like the drivername action parameter.
-<li><span style="font-weight: bold;">$ActionLibdbiHost hostname</span> - like the server action parameter
-The host to connect to.</li>
-<li><b>$ActionLibdbiUserName user</b> - like the uid action parameter
-<li><b>$ActionlibdbiPassword</b> - like the pwd action parameter
-<li><b>$ActionlibdbiDBName db</b> - like the db action parameter
-<li><b>selector line: :omlibdbi:<i>;template</i></b><br>
+<li><i>$ActionLibdbiDriver drivername</i> - like the drivername action parameter
+<li><i>$ActionLibdbiHost hostname</i> - like the server action parameter
+<li><i>$ActionLibdbiUserName user</i> - like the uid action parameter
+<li><i>$ActionlibdbiPassword</i> - like the pwd action parameter
+<li><i>$ActionlibdbiDBName db</i> - like the db action parameter
+<li><i>selector line: :omlibdbi:<code>;template</code></i><br>
executes the recently configured omlibdbi action. The ;template part is
optional. If no template is provided, a default template is used (which
is currently optimized for MySQL - sorry, folks...)</li>
@@ -114,14 +124,14 @@ database "syslog_db" on mysqlsever.example.com. The server is MySQL and
being accessed under the account of "user" with password "pwd" (if you
have empty passwords, just remove the $ActionLibdbiPassword line).<br>
</p>
-<textarea rows="5" cols="60">$ModLoad omlibdbi
+<textarea rows="5" cols="60">module(load="omlibdbi")
*.* action(type="omlibdbi" driver="mysql"
server="mysqlserver.example.com" db="syslog_db"
uid="user" pwd="pwd"
</textarea>
-<p><b>Sample:</b></p>
+<p><b>Legacy Sample:</b></p>
<p>The same as above, but in legacy config format (pre rsyslog-v6):
-<textarea rows="10" cols="60">$ModLoad omlibdbi
+<textarea rows="8" cols="60">$ModLoad omlibdbi
$ActionLibdbiDriver mysql
$ActionLibdbiHost mysqlserver.example.com
$ActionLibdbiUserName user
diff --git a/doc/v7compatibility.html b/doc/v7compatibility.html
index 692a4fe1..932e2076 100644
--- a/doc/v7compatibility.html
+++ b/doc/v7compatibility.html
@@ -42,6 +42,43 @@ They tell that the construct is deprecated and which statement is to be used
as replacement. This does <b>not</b> affect operations: both modules are still
fully operational and will not be removed in the v7 timeframe.
+<h2>"last message repeated n times" Processing</h2>
+<p>This processing has been optimized and moved to the input side. This results
+in usually far better performance and also de-couples different sources
+from the same
+processing. It is now also integrated in to the more generic rate-limiting
+processing.
+<h3>User-Noticable Changes</h3>
+The code works almost as before, with two exceptions:
+<ul>
+<li>The supression amount can be different, as the new algorithm
+ precisely check's a single source, and while that source is being
+ read. The previous algorithm worked on a set of mixed messages
+ from multiple sources.
+<li>The previous algorithm wrote a "last message repeated n times" message
+ at least every 60 seconds. For performance reasons, we do no longer do
+ this but write this message only when a new message arrives or rsyslog
+ is shut down.
+</ul>
+<p>Note that the new algorithms needs support from input modules. If old
+modules which do not have the necessary support are used, duplicate
+messages will most probably not be detected. Upgrading the module code is
+simple, and all rsyslog-provided plugins support the new method, so this
+should not be a real problem (crafting a solution would result in rather
+complex code - for a case that most probably would never happen).
+<h3>Performance Implications</h3>
+<p>In general, the new method enables far faster output procesing. However, it
+needs to be noted that the "last message repeated n" processing needs parsed
+messages in order to detect duplicated. Consequently, if it is enabled the
+parser step cannot be deferred to the main queue processing thread and
+thus must be done during input processing. The changes workload distribution
+and may have (good or bad) effect on the overall performance. If you have
+a very high performance installation, it is suggested to check the performance
+profile before deploying the new version. Note: for high-performance
+environments it is highly recommended NOT to use "last message repeated n times"
+processing but rather the other (more efficient) rate-limiting methods. These
+also do NOT require the parsing step to be done during input processing.
+
<p><font size="2">This documentation is part of the
<a href="http://www.rsyslog.com/">rsyslog</a> project.<br>
Copyright &copy; 2011-2012 by <a href="http://www.gerhards.net/rainer">Rainer Gerhards</a> and
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c
index 09742537..15948215 100644
--- a/plugins/imdiag/imdiag.c
+++ b/plugins/imdiag/imdiag.c
@@ -53,6 +53,7 @@
#include "srUtils.h"
#include "msg.h"
#include "datetime.h"
+#include "ratelimit.h"
#include "net.h" /* for permittedPeers, may be removed when this is removed */
MODULE_TYPE_INPUT
@@ -200,7 +201,7 @@ finalize_it:
/* actually submit a message to the rsyslog core
*/
static rsRetVal
-doInjectMsg(int iNum)
+doInjectMsg(int iNum, ratelimit_t *ratelimiter)
{
uchar szMsg[1024];
msg_t *pMsg;
@@ -220,7 +221,7 @@ doInjectMsg(int iNum)
pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
MsgSetRcvFrom(pMsg, pRcvDummy);
CHKiRet(MsgSetRcvFromIP(pMsg, pRcvIPDummy));
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(ratelimitAddMsg(ratelimiter, NULL, pMsg));
finalize_it:
RETiRet;
@@ -238,6 +239,7 @@ injectMsg(uchar *pszCmd, tcps_sess_t *pSess)
int iFrom;
int nMsgs;
int i;
+ ratelimit_t *ratelimit;
DEFiRet;
/* we do not check errors here! */
@@ -245,13 +247,15 @@ injectMsg(uchar *pszCmd, tcps_sess_t *pSess)
iFrom = atoi((char*)wordBuf);
getFirstWord(&pszCmd, wordBuf, sizeof(wordBuf)/sizeof(uchar), TO_LOWERCASE);
nMsgs = atoi((char*)wordBuf);
+ ratelimitNew(&ratelimit, "imdiag", "injectmsg");
for(i = 0 ; i < nMsgs ; ++i) {
- doInjectMsg(i + iFrom);
+ doInjectMsg(i + iFrom, ratelimit);
}
CHKiRet(sendResponse(pSess, "%d messages injected\n", nMsgs));
DBGPRINTF("imdiag: %d messages injected\n", nMsgs);
+ ratelimitDestruct(ratelimit);
finalize_it:
RETiRet;
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index 453b6b05..d50f917e 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -48,6 +48,7 @@
#include "prop.h"
#include "stringbuf.h"
#include "ruleset.h"
+#include "ratelimit.h"
MODULE_TYPE_INPUT /* must be present for input modules, do not remove */
MODULE_TYPE_NOKEEP
@@ -82,6 +83,7 @@ typedef struct fileInfo_s {
strm_t *pStrm; /* its stream (NULL if not assigned) */
int readMode; /* which mode to use in ReadMulteLine call? */
ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ ratelimit_t *ratelimiter;
multi_submit_t multiSub;
} fileInfo_t;
@@ -189,9 +191,7 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine)
pMsg->iFacility = LOG_FAC(pInfo->iFacility);
pMsg->iSeverity = LOG_PRI(pInfo->iSeverity);
MsgSetRuleset(pMsg, pInfo->pRuleset);
- pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg;
- if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem)
- CHKiRet(multiSubmitMsg(&pInfo->multiSub));
+ ratelimitAddMsg(pInfo->ratelimiter, &pInfo->multiSub, pMsg);
finalize_it:
RETiRet;
}
@@ -304,18 +304,7 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
}
finalize_it:
- if(pThis->multiSub.nElem > 0) {
- /* submit everything that was not yet submitted */
- CHKiRet(multiSubmitMsg(&pThis->multiSub));
- }
- ; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */
- /* Note: the problem above is that pthread:cleanup_pop() is a macro which
- * evaluates to something like "} while(0);". So the code would become
- * "finalize_it: }", that is a label without a statement. The C standard does
- * not permit this. So we add an empty statement "finalize_it: ; }" and
- * everybody is happy. Note that without the ;, an error is reported only
- * on some platforms/compiler versions. -- rgerhards, 2008-08-15
- */
+ multiSubmitFlush(&pThis->multiSub);
pthread_cleanup_pop(0);
if(pCStr != NULL) {
@@ -423,6 +412,7 @@ addListner(instanceConf_t *inst)
pThis->lenTag = ustrlen(pThis->pszTag);
pThis->pszStateFile = (uchar*) strdup((char*) inst->pszStateFile);
+ CHKiRet(ratelimitNew(&pThis->ratelimiter, "imfile", (char*)inst->pszFileName));
CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(inst->nMultiSub * sizeof(msg_t*)));
pThis->multiSub.maxElem = inst->nMultiSub;
pThis->multiSub.nElem = 0;
@@ -773,6 +763,8 @@ CODESTARTafterRun
persistStrmState(&files[i]);
strm.Destruct(&(files[i].pStrm));
}
+ ratelimitDestruct(files[i].ratelimiter);
+ free(files[i].multiSub.ppMsgs);
free(files[i].pszFileName);
free(files[i].pszTag);
free(files[i].pszStateFile);
diff --git a/plugins/imklog/bsd.c b/plugins/imklog/bsd.c
index d4f9f773..ad194b58 100644
--- a/plugins/imklog/bsd.c
+++ b/plugins/imklog/bsd.c
@@ -58,9 +58,6 @@ static int fklog = -1; /* kernel log fd */
#ifdef OS_LINUX
/* submit a message to imklog Syslog() API. In this function, we check if
* a kernel timestamp is present and, if so, extract and strip it.
- * Note: this is an extra processing step. We should revisit the whole
- * idea in v6 and remove all that old stuff that we do not longer need
- * (like symbol resolution). <-- TODO
* Note that this is heavily Linux specific and thus is not compiled or
* used for BSD.
* Special thanks to Lennart Poettering for suggesting on how to convert
@@ -73,7 +70,7 @@ static int fklog = -1; /* kernel log fd */
* rgerhards, 2011-06-24
*/
static void
-submitSyslog(int pri, uchar *buf)
+submitSyslog(modConfData_t *pModConf, int pri, uchar *buf)
{
long secs;
long nsecs;
@@ -119,8 +116,10 @@ submitSyslog(int pri, uchar *buf)
/* we have a timestamp */
DBGPRINTF("kernel timestamp is %ld %ld\n", secs, nsecs);
- bufsize= strlen((char*)buf);
- memmove(buf+3, buf+i, bufsize - i + 1);
+ if(!pModConf->bKeepKernelStamp) {
+ bufsize= strlen((char*)buf);
+ memmove(buf+3, buf+i, bufsize - i + 1);
+ }
clock_gettime(CLOCK_MONOTONIC, &monotonic);
clock_gettime(CLOCK_REALTIME, &realtime);
@@ -146,7 +145,7 @@ done:
}
#else /* now comes the BSD "code" (just a shim) */
static void
-submitSyslog(int pri, uchar *buf)
+submitSyslog(modConfData_t *pModConf, int pri, uchar *buf)
{
Syslog(pri, buf, NULL);
}
@@ -196,7 +195,7 @@ finalize_it:
/* Read kernel log while data are available, split into lines.
*/
static void
-readklog(void)
+readklog(modConfData_t *pModConf)
{
char *p, *q;
int len, i;
@@ -238,18 +237,18 @@ readklog(void)
for (p = (char*)pRcv; (q = strchr(p, '\n')) != NULL; p = q + 1) {
*q = '\0';
- submitSyslog(LOG_INFO, (uchar*) p);
+ submitSyslog(pModConf, LOG_INFO, (uchar*) p);
}
len = strlen(p);
if (len >= iMaxLine - 1) {
- submitSyslog(LOG_INFO, (uchar*)p);
+ submitSyslog(pModConf, LOG_INFO, (uchar*)p);
len = 0;
}
if(len > 0)
memmove(pRcv, p, len + 1);
}
if (len > 0)
- submitSyslog(LOG_INFO, pRcv);
+ submitSyslog(pModConf, LOG_INFO, pRcv);
if(pRcv != NULL && (size_t) iMaxLine >= sizeof(bufRcv) - 1)
free(pRcv);
@@ -278,10 +277,10 @@ rsRetVal klogAfterRun(modConfData_t *pModConf)
* "message pull" mechanism.
* rgerhards, 2008-04-09
*/
-rsRetVal klogLogKMsg(modConfData_t __attribute__((unused)) *pModConf)
+rsRetVal klogLogKMsg(modConfData_t *pModConf)
{
DEFiRet;
- readklog();
+ readklog(pModConf);
RETiRet;
}
diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c
index 93323707..6eed33fe 100644
--- a/plugins/imklog/imklog.c
+++ b/plugins/imklog/imklog.c
@@ -91,6 +91,7 @@ static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config para
static struct cnfparamdescr modpdescr[] = {
{ "logpath", eCmdHdlrGetWord, 0 },
{ "permitnonkernelfacility", eCmdHdlrBinary, 0 },
+ { "keepkerneltimestamp", eCmdHdlrBinary, 0 },
{ "consoleloglevel", eCmdHdlrInt, 0 },
{ "internalmsgfacility", eCmdHdlrFacility, 0 }
};
@@ -100,11 +101,8 @@ static struct cnfparamblk modpblk =
modpdescr
};
-
-
static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this module */
-static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 */
-
+static prop_t *pLocalHostIP = NULL;
static inline void
initConfigSettings(void)
@@ -147,7 +145,8 @@ enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval *
MsgSetTAG(pMsg, pszTag, ustrlen(pszTag));
pMsg->iFacility = iFacility;
pMsg->iSeverity = iSeverity;
- CHKiRet(submitMsg(pMsg));
+ /* note: we do NOT use rate-limiting, as the kernel itself does rate-limiting */
+ CHKiRet(submitMsg2(pMsg));
finalize_it:
RETiRet;
@@ -289,6 +288,7 @@ CODESTARTbeginCnfLoad
pModConf->pszPath = NULL;
pModConf->bPermitNonKernel = 0;
pModConf->console_log_level = -1;
+ pModConf->bKeepKernelStamp = 0;
pModConf->iFacilIntMsg = klogFacilIntMsg();
loadModConf->configSetViaV2Method = 0;
bLegacyCnfModGlobalsPermitted = 1;
@@ -322,6 +322,8 @@ CODESTARTsetModCnf
loadModConf->bPermitNonKernel = (int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "consoleloglevel")) {
loadModConf->console_log_level= (int) pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "keepkerneltimestamp")) {
+ loadModConf->bKeepKernelStamp = (int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "internalmsgfacility")) {
loadModConf->iFacilIntMsg = (int) pvals[i].val.d.n;
} else {
@@ -347,6 +349,7 @@ CODESTARTendCnfLoad
loadModConf->bPermitNonKernel = cs.bPermitNonKernel;
loadModConf->iFacilIntMsg = cs.iFacilIntMsg;
loadModConf->console_log_level = cs.console_log_level;
+ loadModConf->bKeepKernelStamp = 0;
if((cs.pszPath == NULL) || (cs.pszPath[0] == '\0')) {
loadModConf->pszPath = NULL;
if(cs.pszPath != NULL)
diff --git a/plugins/imklog/imklog.h b/plugins/imklog/imklog.h
index acfb50ab..6cd97c37 100644
--- a/plugins/imklog/imklog.h
+++ b/plugins/imklog/imklog.h
@@ -36,6 +36,7 @@ struct modConfData_s {
uchar *pszPath;
int console_log_level;
sbool bPermitNonKernel;
+ sbool bKeepKernelStamp; /* keep kernel timestamp instead of interpreting it */
sbool configSetViaV2Method;
};
diff --git a/plugins/imkmsg/imkmsg.c b/plugins/imkmsg/imkmsg.c
index 2a97f82d..d1a83879 100644
--- a/plugins/imkmsg/imkmsg.c
+++ b/plugins/imkmsg/imkmsg.c
@@ -113,7 +113,7 @@ enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval *
pMsg->iFacility = iFacility;
pMsg->iSeverity = iSeverity;
pMsg->json = json;
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(submitMsg(pMsg, NULL));
finalize_it:
RETiRet;
diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c
index 62599969..9bd11556 100644
--- a/plugins/impstats/impstats.c
+++ b/plugins/impstats/impstats.c
@@ -138,7 +138,8 @@ doSubmitMsg(uchar *line)
pMsg->iSeverity = runModConf->iSeverity;
pMsg->msgFlags = 0;
- submitMsg(pMsg);
+ /* we do not use rate-limiting, as the stats message always need to be emitted */
+ submitMsg2(pMsg);
finalize_it:
RETiRet;
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 8150fc33..0475e219 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -67,6 +67,7 @@
#include "ruleset.h"
#include "msg.h"
#include "statsobj.h"
+#include "ratelimit.h"
#include "net.h" /* for permittedPeers, may be removed when this is removed */
/* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */
@@ -121,6 +122,8 @@ struct instanceConf_s {
uchar *pszBindRuleset; /* name of ruleset to bind to */
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ int ratelimitInterval;
+ int ratelimitBurst;
struct instanceConf_s *next;
};
@@ -158,6 +161,8 @@ static struct cnfparamdescr inppdescr[] = {
{ "keepalive.time", eCmdHdlrInt, 0 },
{ "keepalive.interval", eCmdHdlrInt, 0 },
{ "addtlframedelimiter", eCmdHdlrInt, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -195,6 +200,7 @@ struct ptcpsrv_s {
sbool bKeepAlive; /* support keep-alive packets */
sbool bEmitMsgOnClose;
sbool bSuppOctetFram;
+ ratelimit_t *ratelimiter;
};
/* the ptcp session object. Describes a single active session.
@@ -295,6 +301,7 @@ destructSess(ptcpsess_t *pSess)
static void
destructSrv(ptcpsrv_t *pSrv)
{
+ ratelimitDestruct(pSrv->ratelimiter);
prop.Destruct(&pSrv->pInputName);
pthread_mutex_destroy(&pSrv->mutSessLst);
free(pSrv->pszInputName);
@@ -679,14 +686,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult
MsgSetRuleset(pMsg, pSrv->pRuleset);
STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit);
- if(pMultiSub == NULL) {
- CHKiRet(submitMsg(pMsg));
- } else {
- pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
- if(pMultiSub->nElem == pMultiSub->maxElem)
- CHKiRet(multiSubmitMsg(pMultiSub));
- }
-
+ ratelimitAddMsg(pSrv->ratelimiter, pMultiSub, pMsg);
finalize_it:
/* reset status variables */
@@ -805,12 +805,11 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG
* we have just received a bunch of data! -- rgerhards, 2009-06-16
* EXTRACT from tcps_sess.c
*/
-#define NUM_MULTISUB 1024
static rsRetVal
DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
{
multi_submit_t multiSub;
- msg_t *pMsgs[NUM_MULTISUB];
+ msg_t *pMsgs[CONF_NUM_MULTISUB];
struct syslogTime stTime;
time_t ttGenTime;
char *pEnd;
@@ -821,7 +820,7 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
datetime.getCurrTime(&stTime, &ttGenTime);
multiSub.ppMsgs = pMsgs;
- multiSub.maxElem = NUM_MULTISUB;
+ multiSub.maxElem = CONF_NUM_MULTISUB;
multiSub.nElem = 0;
/* We now copy the message to the session buffer. */
@@ -831,15 +830,11 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub));
}
- if(multiSub.nElem > 0) {
- /* submit anything that was not yet submitted */
- CHKiRet(multiSubmitMsg(&multiSub));
- }
+ iRet = multiSubmitFlush(&multiSub);
finalize_it:
RETiRet;
}
-#undef NUM_MULTISUB
/****************************************** --END-- TCP SUPPORT FUNCTIONS ***********************************/
@@ -1051,6 +1046,8 @@ createInstance(instanceConf_t **pinst)
inst->bEmitMsgOnClose = 0;
inst->iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
inst->pBindRuleset = NULL;
+ inst->ratelimitBurst = 10000; /* arbitrary high limit */
+ inst->ratelimitInterval = 0; /* off */
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -1130,6 +1127,9 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
pSrv->iKeepAliveProbes = inst->iKeepAliveProbes;
pSrv->iKeepAliveTime = inst->iKeepAliveTime;
pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose;
+ CHKiRet(ratelimitNew(&pSrv->ratelimiter, "imtcp", (char*)inst->pszBindPort));
+ ratelimitSetLinuxLike(pSrv->ratelimiter, inst->ratelimitInterval, inst->ratelimitBurst);
+ ratelimitSetThreadSafe(pSrv->ratelimiter);
CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort));
pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim;
if(inst->pszBindAddr == NULL)
@@ -1458,6 +1458,10 @@ CODESTARTnewInpInst
inst->iAddtlFrameDelim = (int) pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "notifyonconnectionclose")) {
inst->bEmitMsgOnClose = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
+ inst->ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) {
+ inst->ratelimitInterval = (int) pvals[i].val.d.n;
} else {
dbgprintf("imptcp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
diff --git a/plugins/imsolaris/imsolaris.c b/plugins/imsolaris/imsolaris.c
index a220e72a..1e7d9b0f 100644
--- a/plugins/imsolaris/imsolaris.c
+++ b/plugins/imsolaris/imsolaris.c
@@ -212,7 +212,7 @@ readLog(int fd, uchar *pRcv, int iMaxLine)
pMsg->iFacility = LOG_FAC(hdr.pri);
pMsg->iSeverity = LOG_PRI(hdr.pri);
pMsg->msgFlags = NEEDS_PARSING | NO_PRI_IN_RAW;
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(submitMsg(pMsg, NULL));
}
finalize_it:
diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c
index eaf9a213..8d71d5f2 100644
--- a/plugins/imtcp/imtcp.c
+++ b/plugins/imtcp/imtcp.c
@@ -105,6 +105,8 @@ struct instanceConf_s {
uchar *pszBindRuleset; /* name of ruleset to bind to */
ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
+ int ratelimitInterval;
+ int ratelimitBurst;
int bSuppOctetFram;
struct instanceConf_s *next;
};
@@ -155,7 +157,9 @@ static struct cnfparamdescr inppdescr[] = {
{ "port", eCmdHdlrString, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */
{ "name", eCmdHdlrString, 0 },
{ "ruleset", eCmdHdlrString, 0 },
- { "supportOctetCountedFraming", eCmdHdlrBinary, 0 }
+ { "supportOctetCountedFraming", eCmdHdlrBinary, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -251,6 +255,8 @@ createInstance(instanceConf_t **pinst)
inst->pszBindRuleset = NULL;
inst->pszInputName = NULL;
inst->bSuppOctetFram = 1;
+ inst->ratelimitInterval = 0;
+ inst->ratelimitBurst = 10000;
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -334,6 +340,7 @@ addListner(modConfData_t *modConf, instanceConf_t *inst)
CHKiRet(tcpsrv.SetRuleset(pOurTcpsrv, inst->pBindRuleset));
CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, inst->pszInputName == NULL ?
UCHAR_CONSTANT("imtcp") : inst->pszInputName));
+ CHKiRet(tcpsrv.SetLinuxLikeRatelimiters(pOurTcpsrv, inst->ratelimitInterval, inst->ratelimitBurst));
tcpsrv.configureTCPListen(pOurTcpsrv, inst->pszBindPort, inst->bSuppOctetFram);
finalize_it:
@@ -376,6 +383,10 @@ CODESTARTnewInpInst
inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(inppblk.descr[i].name, "supportOctetCountedFraming")) {
inst->bSuppOctetFram = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
+ inst->ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) {
+ inst->ratelimitInterval = (int) pvals[i].val.d.n;
} else {
dbgprintf("imtcp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
@@ -580,7 +591,9 @@ ENDwillRun
BEGINafterRun
CODESTARTafterRun
- /* do cleanup here */
+ if(pOurTcpsrv != NULL)
+ iRet = tcpsrv.Destruct(&pOurTcpsrv);
+
net.clearAllowedSenders(UCHAR_CONSTANT("TCP"));
ENDafterRun
@@ -594,9 +607,6 @@ ENDisCompatibleWithFeature
BEGINmodExit
CODESTARTmodExit
- if(pOurTcpsrv != NULL)
- iRet = tcpsrv.Destruct(&pOurTcpsrv);
-
if(pPermPeersRoot != NULL) {
net.DestructPermittedPeers(&pPermPeersRoot);
}
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 782d7bee..9b6409c1 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -4,8 +4,6 @@
* NOTE: read comments in module-template.h to understand how this file
* works!
*
- * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c)
- *
* Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
@@ -53,6 +51,7 @@
#include "prop.h"
#include "ruleset.h"
#include "statsobj.h"
+#include "ratelimit.h"
#include "unicode-helper.h"
MODULE_TYPE_INPUT
@@ -77,6 +76,7 @@ static struct lstn_s {
int sock; /* socket */
ruleset_t *pRuleset; /* bound ruleset */
statsobj_t *stats; /* listener stats */
+ ratelimit_t *ratelimiter;
STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
} *lcnfRoot = NULL, *lcnfLast = NULL;
@@ -109,6 +109,8 @@ struct instanceConf_s {
uchar *pszBindPort; /* Port to bind socket to */
uchar *pszBindRuleset; /* name of ruleset to bind to */
ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ int ratelimitInterval;
+ int ratelimitBurst;
struct instanceConf_s *next;
};
@@ -140,7 +142,9 @@ static struct cnfparamblk modpblk =
static struct cnfparamdescr inppdescr[] = {
{ "port", eCmdHdlrArray, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */
{ "address", eCmdHdlrString, 0 },
- { "ruleset", eCmdHdlrString, 0 }
+ { "ruleset", eCmdHdlrString, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -165,6 +169,8 @@ createInstance(instanceConf_t **pinst)
inst->pszBindPort = NULL;
inst->pszBindAddr = NULL;
inst->pszBindRuleset = NULL;
+ inst->ratelimitBurst = 10000; /* arbitrary high limit */
+ inst->ratelimitInterval = 0; /* off */
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -223,7 +229,7 @@ addListner(instanceConf_t *inst)
struct lstn_s *newlcnfinfo;
uchar *bindName;
uchar *port;
- uchar statname[64];
+ uchar dispname[64];
/* check which address to bind to. We could do this more compact, but have not
* done so in order to make the code more readable. -- rgerhards, 2007-12-27
@@ -248,11 +254,14 @@ addListner(instanceConf_t *inst)
newlcnfinfo->next = NULL;
newlcnfinfo->sock = newSocks[iSrc];
newlcnfinfo->pRuleset = inst->pBindRuleset;
+ snprintf((char*)dispname, sizeof(dispname), "imudp(%s:%s)", bindName, port);
+ dispname[sizeof(dispname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter, (char*)dispname, NULL));
+ ratelimitSetLinuxLike(newlcnfinfo->ratelimiter, inst->ratelimitInterval,
+ inst->ratelimitBurst);
/* support statistics gathering */
CHKiRet(statsobj.Construct(&(newlcnfinfo->stats)));
- snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port);
- statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
- CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname));
+ CHKiRet(statsobj.SetName(newlcnfinfo->stats, dispname));
STATSCOUNTER_INIT(newlcnfinfo->ctrSubmit, newlcnfinfo->mutCtrSubmit);
CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"),
ctrType_IntCtr, &(newlcnfinfo->ctrSubmit)));
@@ -304,7 +313,6 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta
static inline rsRetVal
processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted)
{
- DEFiRet;
int iNbrTimeUsed;
time_t ttGenTime;
struct syslogTime stTime;
@@ -314,9 +322,15 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
msg_t *pMsg;
prop_t *propFromHost = NULL;
prop_t *propFromHostIP = NULL;
+ multi_submit_t multiSub;
+ msg_t *pMsgs[CONF_NUM_MULTISUB];
char errStr[1024];
+ DEFiRet;
assert(pThrd != NULL);
+ multiSub.ppMsgs = pMsgs;
+ multiSub.maxElem = CONF_NUM_MULTISUB;
+ multiSub.nElem = 0;
iNbrTimeUsed = 0;
while(1) { /* loop is terminated if we have a bad receive, done below in the body */
if(pThrd->bShallStop == RSTRUE)
@@ -383,12 +397,15 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
if(*pbIsPermitted == 2)
pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */
CHKiRet(msgSetFromSockinfo(pMsg, &frominet));
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(ratelimitAddMsg(lstn->ratelimiter, &multiSub, pMsg));
STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit);
}
}
+
finalize_it:
+ multiSubmitFlush(&multiSub);
+
if(propFromHost != NULL)
prop.Destruct(&propFromHost);
if(propFromHostIP != NULL)
@@ -682,6 +699,10 @@ createListner(es_str_t *port, struct cnfparamvals *pvals)
inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(inppblk.descr[i].name, "ruleset")) {
inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
+ inst->ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) {
+ inst->ratelimitInterval = (int) pvals[i].val.d.n;
} else {
dbgprintf("imudp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
@@ -884,6 +905,7 @@ CODESTARTafterRun
net.clearAllowedSenders((uchar*)"UDP");
for(lstn = lcnfRoot ; lstn != NULL ; ) {
statsobj.Destruct(&(lstn->stats));
+ ratelimitDestruct(lstn->ratelimiter);
close(lstn->sock);
lstnDel = lstn;
lstn = lstn->next;
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index d5e4bb31..1409c24a 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -55,6 +55,7 @@
#include "statsobj.h"
#include "datetime.h"
#include "hashtable.h"
+#include "ratelimit.h"
MODULE_TYPE_INPUT
MODULE_TYPE_NOKEEP
@@ -105,15 +106,6 @@ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
STATSCOUNTER_DEF(ctrLostRatelimit, mutCtrLostRatelimit)
STATSCOUNTER_DEF(ctrNumRatelimiters, mutCtrNumRatelimiters)
-struct rs_ratelimit_state {
- unsigned short interval;
- unsigned short burst;
- unsigned done;
- unsigned missed;
- time_t begin;
-};
-typedef struct rs_ratelimit_state rs_ratelimit_state_t;
-
/* a very simple "hash function" for process IDs - we simply use the
* pid itself: it is quite expected that all pids may log some time, but
@@ -271,74 +263,9 @@ static struct cnfparamblk inppblk =
/* we do not use this, because we do not bind to a ruleset so far
* enable when this is changed: #include "im-helper.h" */ /* must be included AFTER the type definitions! */
-static void
-initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsigned short burst)
-{
- rs->interval = interval;
- rs->burst = burst;
- rs->done = 0;
- rs->missed = 0;
- rs->begin = 0;
-}
-
static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
-/* ratelimiting support, modelled after the linux kernel
- * returns 1 if message is within rate limit and shall be
- * processed, 0 otherwise.
- * This implementation is NOT THREAD-SAFE and must not
- * be called concurrently.
- */
-static inline int
-withinRatelimit(struct rs_ratelimit_state *rs, time_t tt, pid_t pid)
-{
- int ret;
- uchar msgbuf[1024];
-
- if(rs->interval == 0) {
- ret = 1;
- goto finalize_it;
- }
-
- assert(rs->burst != 0);
-
- if(rs->begin == 0)
- rs->begin = tt;
-
- /* resume if we go out of out time window */
- if(tt > rs->begin + rs->interval) {
- if(rs->missed) {
- snprintf((char*)msgbuf, sizeof(msgbuf),
- "imuxsock lost %u messages from pid %lu due to rate-limiting",
- rs->missed, (unsigned long) pid);
- logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
- rs->missed = 0;
- }
- rs->begin = 0;
- rs->done = 0;
- }
-
- /* do actual limit check */
- if(rs->burst > rs->done) {
- rs->done++;
- ret = 1;
- } else {
- if(rs->missed == 0) {
- snprintf((char*)msgbuf, sizeof(msgbuf),
- "imuxsock begins to drop messages from pid %lu due to rate-limiting",
- (unsigned long) pid);
- logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
- }
- rs->missed++;
- ret = 0;
- }
-
-finalize_it:
- return ret;
-}
-
-
/* create input instance, set default paramters, and
* add it to the list of instances.
*/
@@ -445,7 +372,8 @@ addListner(instanceConf_t *inst)
CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName));
}
if(inst->ratelimitInterval > 0) {
- if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL)) == NULL) {
+ if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn,
+ (void(*)(void*))ratelimitDestruct)) == NULL) {
/* in this case, we simply turn off rate-limiting */
DBGPRINTF("imuxsock: turning off rate limiting because we could not "
"create hash table\n");
@@ -604,19 +532,22 @@ finalize_it:
* listener (the latter being a performance enhancement).
*/
static inline rsRetVal
-findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
+findRatelimiter(lstn_t *pLstn, struct ucred *cred, ratelimit_t **prl)
{
- rs_ratelimit_state_t *rl;
+ ratelimit_t *rl;
int r;
pid_t *keybuf;
+ char pidbuf[256];
DEFiRet;
if(cred == NULL)
FINALIZE;
+#if 0 // TODO: check deactivated?
if(pLstn->ratelimitInterval == 0) {
*prl = NULL;
FINALIZE;
}
+#endif
rl = hashtable_search(pLstn->ht, &cred->pid);
if(rl == NULL) {
@@ -624,10 +555,13 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
DBGPRINTF("imuxsock: no ratelimiter for pid %lu, creating one\n",
(unsigned long) cred->pid);
STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters);
- CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t)));
+ snprintf(pidbuf, sizeof(pidbuf), "pid %lu",
+ (unsigned long) cred->pid);
+ pidbuf[sizeof(pidbuf)-1] = '\0'; /* to be on safe side */
+ CHKiRet(ratelimitNew(&rl, "imuxsock", pidbuf));
+ ratelimitSetLinuxLike(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst);
CHKmalloc(keybuf = malloc(sizeof(pid_t)));
*keybuf = cred->pid;
- initRatelimitState(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst);
r = hashtable_insert(pLstn->ht, keybuf, rl);
if(r == 0)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -762,28 +696,6 @@ copyescaped(uchar *dstbuf, uchar *inbuf, int inlen)
}
-#if 0
-/* Creates new field to be added to event
- * used for SystemLogParseTrusted parsing
- */
-struct ee_field *
-createNewField(char *fieldname, char *value, int lenValue) {
- es_str_t *newStr;
- struct ee_value *newVal;
- struct ee_field *newField;
-
- newStr = es_newStrFromBuf(value, (es_size_t) lenValue);
-
- newVal = ee_newValue(ctxee);
- ee_setStrValue(newVal, newStr);
-
- newField = ee_newFieldFromNV(ctxee, fieldname, newVal);
-
- return newField;
-}
-#endif
-
-
/* submit received message to the queue engine
* We now parse the message according to expected format so that we
* can also mangle it if necessary.
@@ -802,8 +714,8 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
uchar bufParseTAG[CONF_TAG_MAXSIZE];
struct syslogTime st;
time_t tt;
- rs_ratelimit_state_t *ratelimiter = NULL;
int lenProp;
+ ratelimit_t *ratelimiter = NULL;
uchar propBuf[1024];
uchar msgbuf[8192];
uchar *pmsgbuf;
@@ -842,10 +754,12 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
tt = ts->tv_sec;
}
+#if 0 // TODO: think about stats counters (or wait for request...?)
if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) {
STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit);
FINALIZE;
}
+#endif
/* created trusted properties */
if(cred != NULL && pLstn->bAnnotate) {
@@ -976,8 +890,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName);
CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP));
- CHKiRet(submitMsg(pMsg));
-
+ ratelimitAddMsg(ratelimiter, NULL, pMsg);
STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit);
finalize_it:
RETiRet;
diff --git a/plugins/omlibdbi/omlibdbi.c b/plugins/omlibdbi/omlibdbi.c
index 8f5fa944..d7f3cf41 100644
--- a/plugins/omlibdbi/omlibdbi.c
+++ b/plugins/omlibdbi/omlibdbi.c
@@ -81,15 +81,36 @@ typedef struct configSettings_s {
uchar *dbName; /* database to use */
} configSettings_t;
static configSettings_t cs;
+uchar *pszFileDfltTplName; /* name of the default template to use */
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+ uchar *dbiDrvrDir; /* where do the dbi drivers reside? */
+ uchar *tplName; /* default template */
+};
+
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
+static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
+
/* tables for interfacing with the v6 config system */
+/* module-global parameters */
+static struct cnfparamdescr modpdescr[] = {
+ { "template", eCmdHdlrGetWord, 0 },
+ { "driverdirectory", eCmdHdlrGetWord, 0 }
+};
+static struct cnfparamblk modpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+ };
/* action (instance) parameters */
static struct cnfparamdescr actpdescr[] = {
{ "server", eCmdHdlrGetWord, 1 },
{ "db", eCmdHdlrGetWord, 1 },
{ "uid", eCmdHdlrGetWord, 1 },
{ "pwd", eCmdHdlrGetWord, 1 },
- { "driverdirectory", eCmdHdlrGetWord, 0 },
{ "driver", eCmdHdlrGetWord, 1 },
{ "template", eCmdHdlrGetWord, 0 }
};
@@ -99,6 +120,20 @@ static struct cnfparamblk actpblk =
actpdescr
};
+/* this function gets the default template. It coordinates action between
+ * old-style and new-style configuration parts.
+ */
+static inline uchar*
+getDfltTpl(void)
+{
+ if(loadModConf != NULL && loadModConf->tplName != NULL)
+ return loadModConf->tplName;
+ else if(pszFileDfltTplName == NULL)
+ return (uchar*)" StdDBFmt";
+ else
+ return pszFileDfltTplName;
+}
+
BEGINinitConfVars /* (re)set config variables to default values */
CODESTARTinitConfVars
@@ -144,7 +179,6 @@ static void closeConn(instanceData *pData)
BEGINfreeInstance
CODESTARTfreeInstance
closeConn(pData);
- free(pData->dbiDrvrDir);
free(pData->drvrName);
free(pData->host);
free(pData->usrName);
@@ -302,6 +336,79 @@ CODESTARTdoAction
ENDdoAction
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ pModConf->tplName = NULL;
+ bLegacyCnfModGlobalsPermitted = 1;
+ENDbeginCnfLoad
+
+BEGINsetModCnf
+ struct cnfparamvals *pvals = NULL;
+ int i;
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if(pvals == NULL) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "omlibdbi: error processing "
+ "module config parameters [module(...)]");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("module (global) param blk for omlibdbi:\n");
+ cnfparamsPrint(&modpblk, pvals);
+ }
+
+ for(i = 0 ; i < modpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(modpblk.descr[i].name, "template")) {
+ loadModConf->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ if(pszFileDfltTplName != NULL) {
+ errmsg.LogError(0, RS_RET_DUP_PARAM, "omlibdbi: warning: default template "
+ "was already set via legacy directive - may lead to inconsistent "
+ "results.");
+ }
+ } else if(!strcmp(modpblk.descr[i].name, "driverdirectory")) {
+ loadModConf->dbiDrvrDir = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else {
+ dbgprintf("omlibdbi: program error, non-handled "
+ "param '%s' in beginCnfLoad\n", modpblk.descr[i].name);
+ }
+ }
+ bLegacyCnfModGlobalsPermitted = 0;
+finalize_it:
+ if(pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ loadModConf = NULL; /* done loading */
+ /* free legacy config vars */
+ free(pszFileDfltTplName);
+ pszFileDfltTplName = NULL;
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ free(pModConf->tplName);
+ free(pModConf->dbiDrvrDir);
+ENDfreeCnf
+
+
+
+
static inline void
setInstParamDefaults(instanceData *pData)
{
@@ -311,6 +418,7 @@ setInstParamDefaults(instanceData *pData)
BEGINnewActInst
struct cnfparamvals *pvals;
+ uchar *tplToUse;
int i;
CODESTARTnewActInst
if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
@@ -332,28 +440,19 @@ CODESTARTnewActInst
pData->usrName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "pwd")) {
pData->pwd = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
- } else if(!strcmp(actpblk.descr[i].name, "driverdirectory")) {
- pData->dbiDrvrDir = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "driver")) {
pData->drvrName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "template")) {
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else {
- dbgprintf("ommysql: program error, non-handled "
+ dbgprintf("omlibdbi: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
}
}
- if(pData->tplName == NULL) {
- CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*) strdup(" StdDBFmt"),
- OMSR_RQD_TPL_OPT_SQL));
- } else {
- CHKiRet(OMSRsetEntry(*ppOMSR, 0,
- (uchar*) strdup((char*) pData->tplName),
- OMSR_RQD_TPL_OPT_SQL));
- }
+ tplToUse = (pData->tplName == NULL) ? (uchar*)strdup((char*)getDfltTpl()) : pData->tplName;
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, tplToUse, OMSR_RQD_TPL_OPT_SQL));
CODE_STD_FINALIZERnewActInst
-dbgprintf("XXXX: added param, iRet %d\n", iRet);
cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst
@@ -380,19 +479,17 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* NULL values are supported because drivers have different needs.
* They will err out on connect. -- rgerhards, 2008-02-15
*/
- if(cs.host != NULL)
- if((pData->host = (uchar*) strdup((char*)cs.host)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ if(cs.host != NULL)
+ CHKmalloc(pData->host = (uchar*) strdup((char*)cs.host));
if(cs.usrName != NULL)
- if((pData->usrName = (uchar*) strdup((char*)cs.usrName)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- if(cs.dbName != NULL)
- if((pData->dbName = (uchar*) strdup((char*)cs.dbName)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- if(cs.pwd != NULL)
- if((pData->pwd = (uchar*) strdup((char*)cs.pwd)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ CHKmalloc(pData->usrName = (uchar*) strdup((char*)cs.usrName));
+ if(cs.dbName != NULL)
+ CHKmalloc(pData->dbName = (uchar*) strdup((char*)cs.dbName));
+ if(cs.pwd != NULL)
+ CHKmalloc(pData->pwd = (uchar*) strdup((char*)cs.pwd));
if(cs.dbiDrvrDir != NULL)
- if((pData->dbiDrvrDir = (uchar*) strdup((char*)cs.dbiDrvrDir)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
-
- CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_RQD_TPL_OPT_SQL, (uchar*) " StdDBFmt"));
-
+ CHKmalloc(loadModConf->dbiDrvrDir = (uchar*) strdup((char*)cs.dbiDrvrDir));
+ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_RQD_TPL_OPT_SQL, getDfltTpl()));
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -413,6 +510,8 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
@@ -444,7 +543,7 @@ INITLegCnfVars
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionlibdbidriverdirectory", 0, eCmdHdlrGetWord, NULL, &cs.dbiDrvrDir, STD_LOADABLE_MODULE_ID));
+ CHKiRet(regCfSysLineHdlr2((uchar *)"actionlibdbidriverdirectory", 0, eCmdHdlrGetWord, NULL, &cs.dbiDrvrDir, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionlibdbidriver", 0, eCmdHdlrGetWord, NULL, &cs.drvrName, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionlibdbihost", 0, eCmdHdlrGetWord, NULL, &cs.host, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionlibdbiusername", 0, eCmdHdlrGetWord, NULL, &cs.usrName, STD_LOADABLE_MODULE_ID));
diff --git a/plugins/ommongodb/ommongodb.c b/plugins/ommongodb/ommongodb.c
index 2c65f275..57ef20b2 100644
--- a/plugins/ommongodb/ommongodb.c
+++ b/plugins/ommongodb/ommongodb.c
@@ -233,11 +233,11 @@ getDefaultBSON(msg_t *pMsg)
gint64 ts_gen, ts_rcv; /* timestamps: generated, received */
int secfrac;
- procid = MsgGetProp(pMsg, NULL, PROP_PROGRAMNAME, NULL, &procid_len, &procid_free);
- tag = MsgGetProp(pMsg, NULL, PROP_SYSLOGTAG, NULL, &tag_len, &tag_free);
- pid = MsgGetProp(pMsg, NULL, PROP_PROCID, NULL, &pid_len, &pid_free);
- sys = MsgGetProp(pMsg, NULL, PROP_HOSTNAME, NULL, &sys_len, &sys_free);
- msg = MsgGetProp(pMsg, NULL, PROP_MSG, NULL, &msg_len, &msg_free);
+ procid = MsgGetProp(pMsg, NULL, PROP_PROGRAMNAME, NULL, &procid_len, &procid_free, NULL);
+ tag = MsgGetProp(pMsg, NULL, PROP_SYSLOGTAG, NULL, &tag_len, &tag_free, NULL);
+ pid = MsgGetProp(pMsg, NULL, PROP_PROCID, NULL, &pid_len, &pid_free, NULL);
+ sys = MsgGetProp(pMsg, NULL, PROP_HOSTNAME, NULL, &sys_len, &sys_free, NULL);
+ msg = MsgGetProp(pMsg, NULL, PROP_MSG, NULL, &msg_len, &msg_free, NULL);
// TODO: move to datetime? Refactor in any case! rgerhards, 2012-03-30
ts_gen = (gint64) datetime.syslogTime2time_t(&pMsg->tTIMESTAMP) * 1000; /* ms! */
diff --git a/plugins/omruleset/omruleset.c b/plugins/omruleset/omruleset.c
index 6c770c94..fd002265 100644
--- a/plugins/omruleset/omruleset.c
+++ b/plugins/omruleset/omruleset.c
@@ -120,7 +120,11 @@ CODESTARTdoAction
(char*) pData->pszRulesetName, pData->pRuleset);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
MsgSetRuleset(pMsg, pData->pRuleset);
- submitMsg(pMsg);
+ /* Note: we intentionally use submitMsg2() here, as we process messages
+ * that were already run through the rate-limiter. So it is (at least)
+ * questionable if they were rate-limited again.
+ */
+ submitMsg2(pMsg);
finalize_it:
ENDdoAction
diff --git a/runtime/Makefile.am b/runtime/Makefile.am
index 7af26d2b..7abbc258 100644
--- a/runtime/Makefile.am
+++ b/runtime/Makefile.am
@@ -65,6 +65,8 @@ librsyslog_la_SOURCES = \
ruleset.h \
prop.c \
prop.h \
+ ratelimit.c \
+ ratelimit.h \
cfsysline.c \
cfsysline.h \
sd-daemon.c \
diff --git a/runtime/debug.c b/runtime/debug.c
index edc4a255..307a8bb8 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -927,12 +927,12 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
pszObjName = obj.GetName(pObj);
}
-// pthread_mutex_lock(&mutdbgprint);
-// pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint);
+ pthread_mutex_lock(&mutdbgprint);
+ pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint);
do_dbgprint(pszObjName, pszMsg, lenMsg);
-// pthread_cleanup_pop(1);
+ pthread_cleanup_pop(1);
}
#pragma GCC diagnostic warning "-Wempty-body"
diff --git a/runtime/msg.c b/runtime/msg.c
index d874178b..b0b93f98 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -2373,40 +2373,38 @@ char *textpri(char *pRes, size_t pResLen, int pri)
*/
typedef enum ENOWType { NOW_NOW, NOW_YEAR, NOW_MONTH, NOW_DAY, NOW_HOUR, NOW_HHOUR, NOW_QHOUR, NOW_MINUTE } eNOWType;
#define tmpBUFSIZE 16 /* size of formatting buffer */
-static uchar *getNOW(eNOWType eNow)
+static uchar *getNOW(eNOWType eNow, struct syslogTime *t)
{
uchar *pBuf;
- struct syslogTime t;
if((pBuf = (uchar*) MALLOC(sizeof(uchar) * tmpBUFSIZE)) == NULL) {
return NULL;
}
- datetime.getCurrTime(&t, NULL);
switch(eNow) {
case NOW_NOW:
- snprintf((char*) pBuf, tmpBUFSIZE, "%4.4d-%2.2d-%2.2d", t.year, t.month, t.day);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%4.4d-%2.2d-%2.2d", t->year, t->month, t->day);
break;
case NOW_YEAR:
- snprintf((char*) pBuf, tmpBUFSIZE, "%4.4d", t.year);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%4.4d", t->year);
break;
case NOW_MONTH:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.month);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->month);
break;
case NOW_DAY:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.day);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->day);
break;
case NOW_HOUR:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.hour);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->hour);
break;
case NOW_HHOUR:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.minute / 30);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->minute / 30);
break;
case NOW_QHOUR:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.minute / 15);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->minute / 15);
break;
case NOW_MINUTE:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.minute);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->minute);
break;
}
@@ -2673,7 +2671,7 @@ finalize_it:
* Parameter "bMustBeFreed" is set by this function. It tells the
* caller whether or not the string returned must be freed by the
* caller itself. It is is 0, the caller MUST NOT free it. If it is
- * 1, the caller MUST free 1. Handling this wrongly leads to either
+ * 1, the caller MUST free it. Handling this wrongly leads to either
* a memory leak of a program abort (do to double-frees or frees on
* the constant memory pool). So be careful to do it right.
* rgerhards 2004-11-23
@@ -2690,7 +2688,7 @@ finalize_it:
return(UCHAR_CONSTANT("**OUT OF MEMORY**"));}
uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
propid_t propid, es_str_t *propName, rs_size_t *pPropLen,
- unsigned short *pbMustBeFreed)
+ unsigned short *pbMustBeFreed, struct syslogTime *ttNow)
{
uchar *pRes; /* result pointer */
rs_size_t bufLen = -1; /* length of string or -1, if not known */
@@ -2803,52 +2801,68 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
pRes = (uchar*)getParseSuccess(pMsg);
break;
case PROP_SYS_NOW:
- if((pRes = getNOW(NOW_NOW)) == NULL) {
+ if((pRes = getNOW(NOW_NOW, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 10;
+ }
break;
case PROP_SYS_YEAR:
- if((pRes = getNOW(NOW_YEAR)) == NULL) {
+ if((pRes = getNOW(NOW_YEAR, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 4;
+ }
break;
case PROP_SYS_MONTH:
- if((pRes = getNOW(NOW_MONTH)) == NULL) {
+ if((pRes = getNOW(NOW_MONTH, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_DAY:
- if((pRes = getNOW(NOW_DAY)) == NULL) {
+ if((pRes = getNOW(NOW_DAY, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_HOUR:
- if((pRes = getNOW(NOW_HOUR)) == NULL) {
+ if((pRes = getNOW(NOW_HOUR, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_HHOUR:
- if((pRes = getNOW(NOW_HHOUR)) == NULL) {
+ if((pRes = getNOW(NOW_HHOUR, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_QHOUR:
- if((pRes = getNOW(NOW_QHOUR)) == NULL) {
+ if((pRes = getNOW(NOW_QHOUR, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_MINUTE:
- if((pRes = getNOW(NOW_MINUTE)) == NULL) {
+ if((pRes = getNOW(NOW_MINUTE, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_MYHOSTNAME:
pRes = glbl.GetLocalHostName();
@@ -2907,7 +2921,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
}
/* If we did not receive a template pointer, we are already done... */
- if(pTpe == NULL) {
+ if(pTpe == NULL || !pTpe->bComplexProcessing) {
*pPropLen = (bufLen == -1) ? ustrlen(pRes) : bufLen;
return pRes;
}
@@ -3494,9 +3508,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
jsonField(pTpe, &pRes, pbMustBeFreed, &bufLen);
}
- if(bufLen == -1)
- bufLen = ustrlen(pRes);
- *pPropLen = bufLen;
+ *pPropLen = (bufLen == -1) ? ustrlen(pRes) : bufLen;
ENDfunc
return(pRes);
@@ -3553,7 +3565,7 @@ msgGetMsgVarNew(msg_t *pThis, uchar *name)
/* always call MsgGetProp() without a template specifier */
/* TODO: optimize propNameToID() call -- rgerhards, 2009-06-26 */
propNameStrToID(name, &propid);
- pszProp = (uchar*) MsgGetProp(pThis, NULL, propid, NULL, &propLen, &bMustBeFreed);
+ pszProp = (uchar*) MsgGetProp(pThis, NULL, propid, NULL, &propLen, &bMustBeFreed, NULL);
estr = es_newStrFromCStr((char*)pszProp, propLen);
if(bMustBeFreed)
diff --git a/runtime/msg.h b/runtime/msg.h
index 396e861f..172ae0da 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -173,7 +173,7 @@ void MsgSetRawMsg(msg_t *pMsg, char* pszRawMsg, size_t lenMsg);
rsRetVal MsgReplaceMSG(msg_t *pThis, uchar* pszMSG, int lenMSG);
uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
propid_t propid, es_str_t *propName,
- rs_size_t *pPropLen, unsigned short *pbMustBeFreed);
+ rs_size_t *pPropLen, unsigned short *pbMustBeFreed, struct syslogTime *ttNow);
char *textpri(char *pRes, size_t pResLen, int pri);
rsRetVal msgGetMsgVar(msg_t *pThis, cstr_t *pstrPropName, var_t **ppVar);
es_str_t* msgGetMsgVarNew(msg_t *pThis, uchar *name);
diff --git a/runtime/queue.c b/runtime/queue.c
index 0cd33701..fbf77108 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -127,7 +127,7 @@ static struct cnfparamblk pblk =
};
/* debug aid */
-static void displayBatchState(batch_t *pBatch)
+static inline void displayBatchState(batch_t *pBatch)
{
int i;
for(i = 0 ; i < pBatch->nElem ; ++i) {
diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c
new file mode 100644
index 00000000..4b618fb5
--- /dev/null
+++ b/runtime/ratelimit.c
@@ -0,0 +1,359 @@
+/* ratelimit.c
+ * support for rate-limiting sources, including "last message
+ * repeated n times" processing.
+ *
+ * Copyright 2012 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+#include "rsyslog.h"
+#include "errmsg.h"
+#include "ratelimit.h"
+#include "datetime.h"
+#include "parser.h"
+#include "unicode-helper.h"
+#include "msg.h"
+#include "rsconf.h"
+#include "dirty.h"
+
+/* definitions for objects we access */
+DEFobjStaticHelpers
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(datetime)
+DEFobjCurrIf(parser)
+
+/* static data */
+
+/* generate a "repeated n times" message */
+static inline msg_t *
+ratelimitGenRepMsg(ratelimit_t *ratelimit)
+{
+ msg_t *repMsg;
+ size_t lenRepMsg;
+ uchar szRepMsg[1024];
+
+ if(ratelimit->nsupp == 1) { /* we simply use the original message! */
+ repMsg = MsgAddRef(ratelimit->pMsg);
+ } else {/* we need to duplicate, original message may still be in use in other
+ * parts of the system! */
+ if((repMsg = MsgDup(ratelimit->pMsg)) == NULL) {
+ DBGPRINTF("Message duplication failed, dropping repeat message.\n");
+ goto done;
+ }
+ lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg),
+ " message repeated %d times: [%.800s]",
+ ratelimit->nsupp, getMSG(ratelimit->pMsg));
+ MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg);
+ }
+
+done: return repMsg;
+}
+
+static inline rsRetVal
+doLastMessageRepeatedNTimes(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg)
+{
+ int bNeedUnlockMutex = 0;
+ rsRetVal localRet;
+ DEFiRet;
+
+ if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
+ if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
+ DBGPRINTF("Message discarded, parsing error %d\n", localRet);
+ ABORT_FINALIZE(RS_RET_DISCARDMSG);
+ }
+ }
+
+ if(ratelimit->bThreadSafe) {
+ pthread_mutex_lock(&ratelimit->mut);
+ bNeedUnlockMutex = 1;
+ }
+
+ if( ratelimit->pMsg != NULL &&
+ getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) &&
+ !ustrcmp(getMSG(pMsg), getMSG(ratelimit->pMsg)) &&
+ !strcmp(getHOSTNAME(pMsg), getHOSTNAME(ratelimit->pMsg)) &&
+ !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(ratelimit->pMsg, LOCK_MUTEX)) &&
+ !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(ratelimit->pMsg, LOCK_MUTEX))) {
+ ratelimit->nsupp++;
+ DBGPRINTF("msg repeated %d times\n", ratelimit->nsupp);
+ /* use current message, so we have the new timestamp
+ * (means we need to discard previous one) */
+ msgDestruct(&ratelimit->pMsg);
+ ratelimit->pMsg = pMsg;
+ ABORT_FINALIZE(RS_RET_DISCARDMSG);
+ } else {/* new message, do "repeat processing" & save it */
+ if(ratelimit->pMsg != NULL) {
+ if(ratelimit->nsupp > 0) {
+ *ppRepMsg = ratelimitGenRepMsg(ratelimit);
+ ratelimit->nsupp = 0;
+ }
+ msgDestruct(&ratelimit->pMsg);
+ }
+ ratelimit->pMsg = MsgAddRef(pMsg);
+ }
+
+finalize_it:
+ if(bNeedUnlockMutex)
+ pthread_mutex_unlock(&ratelimit->mut);
+ RETiRet;
+}
+
+
+/* helper: tell how many messages we lost due to linux-like ratelimiting */
+static inline void
+tellLostCnt(ratelimit_t *ratelimit)
+{
+ uchar msgbuf[1024];
+ if(ratelimit->missed) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "%s: %u messages lost due to rate-limiting",
+ ratelimit->name, ratelimit->missed);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ ratelimit->missed = 0;
+ }
+}
+
+/* Linux-like ratelimiting, modelled after the linux kernel
+ * returns 1 if message is within rate limit and shall be
+ * processed, 0 otherwise.
+ * This implementation is NOT THREAD-SAFE and must not
+ * be called concurrently.
+ */
+static inline int
+withinRatelimit(ratelimit_t *ratelimit, time_t tt)
+{
+ int ret;
+ uchar msgbuf[1024];
+
+ if(ratelimit->interval == 0) {
+ ret = 1;
+ goto finalize_it;
+ }
+
+ assert(ratelimit->burst != 0);
+
+ if(ratelimit->begin == 0)
+ ratelimit->begin = tt;
+
+ /* resume if we go out of out time window */
+ if(tt > ratelimit->begin + ratelimit->interval) {
+ tellLostCnt(ratelimit);
+ ratelimit->begin = 0;
+ ratelimit->done = 0;
+ }
+
+ /* do actual limit check */
+ if(ratelimit->burst > ratelimit->done) {
+ ratelimit->done++;
+ ret = 1;
+ } else {
+ if(ratelimit->missed == 0) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "%s: begin to drop messages due to rate-limiting",
+ ratelimit->name);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ }
+ ratelimit->missed++;
+ ret = 0;
+ }
+
+finalize_it:
+ return ret;
+}
+
+
+/* ratelimit a message, that means:
+ * - handle "last message repeated n times" logic
+ * - handle actual (discarding) rate-limiting
+ * This function returns RS_RET_OK, if the caller shall process
+ * the message regularly and RS_RET_DISCARD if the caller must
+ * discard the message. The caller should also discard the message
+ * if another return status occurs. This places some burden on the
+ * caller logic, but provides best performance. Demanding this
+ * cooperative mode can enable a faulty caller to thrash up part
+ * of the system, but we accept that risk (a faulty caller can
+ * always do all sorts of evil, so...)
+ * If *ppRepMsg != NULL on return, the caller must enqueue that
+ * message before the original message.
+ */
+rsRetVal
+ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg)
+{
+ DEFiRet;
+
+ *ppRepMsg = NULL;
+ if(ratelimit->interval) {
+ if(withinRatelimit(ratelimit, pMsg->ttGenTime) == 0) {
+ msgDestruct(&pMsg);
+ ABORT_FINALIZE(RS_RET_DISCARDMSG);
+ }
+ }
+ if(ratelimit->bReduceRepeatMsgs) {
+ CHKiRet(doLastMessageRepeatedNTimes(ratelimit, pMsg, ppRepMsg));
+ }
+finalize_it:
+ RETiRet;
+}
+
+/* returns 1, if the ratelimiter performs any checks and 0 otherwise */
+int
+ratelimitChecked(ratelimit_t *ratelimit)
+{
+ return ratelimit->interval || ratelimit->bReduceRepeatMsgs;
+}
+
+
+/* add a message to a ratelimiter/multisubmit structure.
+ * ratelimiting is automatically handled according to the ratelimit
+ * settings.
+ * if pMultiSub == NULL, a single-message enqueue happens (under reconsideration)
+ */
+rsRetVal
+ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg)
+{
+ rsRetVal localRet;
+ msg_t *repMsg;
+ DEFiRet;
+
+ if(pMultiSub == NULL) {
+ localRet = ratelimitMsg(ratelimit, pMsg, &repMsg);
+ if(repMsg != NULL)
+ CHKiRet(submitMsg2(repMsg));
+ if(localRet == RS_RET_OK)
+ CHKiRet(submitMsg2(pMsg));
+ } else {
+ localRet = ratelimitMsg(ratelimit, pMsg, &repMsg);
+ if(repMsg != NULL) {
+ pMultiSub->ppMsgs[pMultiSub->nElem++] = repMsg;
+ if(pMultiSub->nElem == pMultiSub->maxElem)
+ CHKiRet(multiSubmitMsg2(pMultiSub));
+ }
+ if(localRet == RS_RET_OK) {
+ pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
+ if(pMultiSub->nElem == pMultiSub->maxElem)
+ CHKiRet(multiSubmitMsg2(pMultiSub));
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* modname must be a static name (usually expected to be the module
+ * name and MUST be present. dynname may be NULL and can be used for
+ * dynamic information, e.g. PID or listener IP, ...
+ * Both values should be kept brief.
+ */
+rsRetVal
+ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname)
+{
+ ratelimit_t *pThis;
+ char namebuf[256];
+ DEFiRet;
+
+ CHKmalloc(pThis = calloc(1, sizeof(ratelimit_t)));
+ if(modname == NULL)
+ modname ="*ERROR:MODULE NAME MISSING*";
+
+ if(dynname == NULL) {
+ pThis->name = strdup(modname);
+ } else {
+ snprintf(namebuf, sizeof(namebuf), "%s[%s]",
+ modname, dynname);
+ namebuf[sizeof(namebuf)-1] = '\0'; /* to be on safe side */
+ pThis->name = strdup(namebuf);
+ }
+ pThis->bReduceRepeatMsgs = loadConf->globals.bReduceRepeatMsgs;
+ *ppThis = pThis;
+finalize_it:
+ RETiRet;
+}
+
+
+/* enable linux-like ratelimiting */
+void
+ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned short burst)
+{
+ ratelimit->interval = interval;
+ ratelimit->burst = burst;
+ ratelimit->done = 0;
+ ratelimit->missed = 0;
+ ratelimit->begin = 0;
+}
+
+
+/* enable thread-safe operations mode. This make sure that
+ * a single ratelimiter can be called from multiple threads. As
+ * this causes some overhead and is not always required, it needs
+ * to be explicitely enabled. This operation cannot be undone
+ * (think: why should one do that???)
+ */
+void
+ratelimitSetThreadSafe(ratelimit_t *ratelimit)
+{
+ ratelimit->bThreadSafe = 1;
+ pthread_mutex_init(&ratelimit->mut, NULL);
+}
+
+void
+ratelimitDestruct(ratelimit_t *ratelimit)
+{
+ msg_t *pMsg;
+ if(ratelimit->pMsg != NULL) {
+ if(ratelimit->nsupp > 0) {
+ pMsg = ratelimitGenRepMsg(ratelimit);
+ if(pMsg != NULL)
+ submitMsg2(pMsg);
+ }
+ msgDestruct(&ratelimit->pMsg);
+ }
+ tellLostCnt(ratelimit);
+ if(ratelimit->bThreadSafe)
+ pthread_mutex_destroy(&ratelimit->mut);
+ free(ratelimit->name);
+ free(ratelimit);
+}
+
+void
+ratelimitModExit(void)
+{
+ objRelease(datetime, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(parser, CORE_COMPONENT);
+}
+
+rsRetVal
+ratelimitModInit(void)
+{
+ DEFiRet;
+ CHKiRet(objGetObjInterface(&obj));
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(parser, CORE_COMPONENT));
+finalize_it:
+ RETiRet;
+}
+
diff --git a/runtime/ratelimit.h b/runtime/ratelimit.h
new file mode 100644
index 00000000..820817bc
--- /dev/null
+++ b/runtime/ratelimit.h
@@ -0,0 +1,51 @@
+/* header for ratelimit.c
+ *
+ * Copyright 2012 Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef INCLUDED_RATELIMIT_H
+#define INCLUDED_RATELIMIT_H
+
+struct ratelimit_s {
+ char *name; /**< rate limiter name, e.g. for user messages */
+ /* support for Linux kernel-type ratelimiting */
+ unsigned short interval;
+ unsigned short burst;
+ unsigned done;
+ unsigned missed;
+ time_t begin;
+ /* support for "last message repeated n times */
+ int bReduceRepeatMsgs; /**< shall we do "last message repeated n times" processing? */
+ unsigned nsupp; /**< nbr of msgs suppressed */
+ msg_t *pMsg;
+ sbool bThreadSafe; /**< do we need to operate in Thread-Safe mode? */
+ pthread_mutex_t mut; /**< mutex if thread-safe operation desired */
+};
+
+/* prototypes */
+rsRetVal ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname);
+void ratelimitSetThreadSafe(ratelimit_t *ratelimit);
+void ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned short burst);
+rsRetVal ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRep);
+rsRetVal ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg);
+void ratelimitDestruct(ratelimit_t *pThis);
+int ratelimitChecked(ratelimit_t *ratelimit);
+rsRetVal ratelimitModInit(void);
+void ratelimitModExit(void);
+
+#endif /* #ifndef INCLUDED_RATELIMIT_H */
diff --git a/runtime/rsconf.c b/runtime/rsconf.c
index ad588832..cf29c720 100644
--- a/runtime/rsconf.c
+++ b/runtime/rsconf.c
@@ -114,8 +114,8 @@ void cnfDoCfsysline(char *ln);
*/
BEGINobjConstruct(rsconf) /* be sure to specify the object type also in END macro! */
pThis->globals.bDebugPrintTemplateList = 1;
- pThis->globals.bDebugPrintModuleList = 1;
- pThis->globals.bDebugPrintCfSysLineHandlerList = 1;
+ pThis->globals.bDebugPrintModuleList = 0;
+ pThis->globals.bDebugPrintCfSysLineHandlerList = 0;
pThis->globals.bLogStatusMsgs = DFLT_bLogStatusMsgs;
pThis->globals.bErrMsgToStderr = 1;
pThis->globals.umask = -1;
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 07d58d68..c02db53c 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -69,6 +69,7 @@
* approach taken here is considered appropriate.
* rgerhards, 2010-06-24
*/
+#define CONF_NUM_MULTISUB 1024 /* default number of messages per multisub structure */
/* ############################################################# *
* # End Config Settings # *
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index bdeb61b7..d2c21424 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -378,7 +378,8 @@ evalPROPFILT(struct cnfstmt *stmt, msg_t *pMsg)
goto done;
pszPropVal = MsgGetProp(pMsg, NULL, stmt->d.s_propfilt.propID,
- stmt->d.s_propfilt.propName, &propLen, &pbMustBeFreed);
+ stmt->d.s_propfilt.propName, &propLen,
+ &pbMustBeFreed, NULL);
/* Now do the compares (short list currently ;)) */
switch(stmt->d.s_propfilt.operation ) {
diff --git a/runtime/stream.c b/runtime/stream.c
index 742799d2..064cb42a 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -74,11 +74,12 @@ DEFobjStaticHelpers
DEFobjCurrIf(zlibw)
/* forward definitions */
-static rsRetVal strmFlushInternal(strm_t *pThis);
+static rsRetVal strmFlushInternal(strm_t *pThis, int bFlushZip);
static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
static rsRetVal strmCloseFile(strm_t *pThis);
static void *asyncWriterThread(void *pPtr);
-static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
+static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush);
+static rsRetVal doZipFinish(strm_t *pThis);
static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
@@ -341,7 +342,7 @@ static rsRetVal strmCloseFile(strm_t *pThis)
(pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName);
if(pThis->tOperationsMode != STREAMMODE_READ) {
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
if(pThis->bAsyncWrite) {
strmWaitAsyncWriterDone(pThis);
}
@@ -675,6 +676,7 @@ BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro!
pThis->fd = -1;
pThis->fdDir = -1;
pThis->iUngetC = -1;
+ pThis->bVeryReliableZip = 0;
pThis->sType = STREAMTYPE_FILE_SINGLE;
pThis->sIOBufSize = glblGetIOBufSize();
pThis->tOpenMode = 0600;
@@ -777,6 +779,10 @@ stopWriter(strm_t *pThis)
BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */
int i;
CODESTARTobjDestruct(strm)
+ /* we need to stop the ZIP writer */
+ if(pThis->iZipLevel) {
+ doZipFinish(pThis);
+ }
if(pThis->bAsyncWrite)
/* Note: mutex will be unlocked in stopWriter! */
d_pthread_mutex_lock(&pThis->mut);
@@ -919,14 +925,14 @@ finalize_it:
/* write memory buffer to a stream object.
*/
static inline rsRetVal
-doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush)
{
DEFiRet;
ASSERT(pThis != NULL);
if(pThis->iZipLevel) {
- CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
+ CHKiRet(doZipWrite(pThis, pBuf, lenBuf, bFlush));
} else {
/* write without zipping */
CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
@@ -971,7 +977,7 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf)
* the background thread. -- rgerhards, 2009-07-07
*/
static rsRetVal
-strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlushZip)
{
DEFiRet;
@@ -990,7 +996,7 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
if(pThis->bAsyncWrite) {
CHKiRet(doAsyncWriteInternal(pThis, lenBuf));
} else {
- CHKiRet(doWriteInternal(pThis, pBuf, lenBuf));
+ CHKiRet(doWriteInternal(pThis, pBuf, lenBuf, bFlushZip));
}
@@ -1030,7 +1036,7 @@ asyncWriterThread(void *pPtr)
}
if(bTimedOut && pThis->iBufPtr > 0) {
/* if we timed out, we need to flush pending data */
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
bTimedOut = 0;
continue; /* now we should have data */
}
@@ -1056,7 +1062,7 @@ asyncWriterThread(void *pPtr)
bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
- doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf);
+ doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf, 0); // TODO: flush state
// TODO: error check????? 2009-07-06
--pThis->iCnt;
@@ -1166,63 +1172,101 @@ finalize_it:
* rgerhards, 2009-06-04
*/
static rsRetVal
-doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush)
{
- z_stream zstrm;
int zRet; /* zlib return state */
sbool bzInitDone = RSFALSE;
DEFiRet;
+ unsigned outavail;
assert(pThis != NULL);
assert(pBuf != NULL);
- /* allocate deflate state */
- zstrm.zalloc = Z_NULL;
- zstrm.zfree = Z_NULL;
- zstrm.opaque = Z_NULL;
- zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */
- /* see note in file header for the params we use with deflateInit2() */
- zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
- if(zRet != Z_OK) {
- DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet);
- ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ if(!pThis->bzInitDone) {
+ /* allocate deflate state */
+ pThis->zstrm.zalloc = Z_NULL;
+ pThis->zstrm.zfree = Z_NULL;
+ pThis->zstrm.opaque = Z_NULL;
+ /* see note in file header for the params we use with deflateInit2() */
+ zRet = zlibw.DeflateInit2(&pThis->zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
+ if(zRet != Z_OK) {
+ DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet);
+ ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ }
+ pThis->bzInitDone = RSTRUE;
}
bzInitDone = RSTRUE;
/* now doing the compression */
- zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */
- zstrm.avail_in = lenBuf;
+ pThis->zstrm.next_in = (Bytef*) pBuf;
+ pThis->zstrm.avail_in = lenBuf;
/* run deflate() on buffer until everything has been compressed */
do {
- DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in);
- zstrm.avail_out = pThis->sIOBufSize;
- zstrm.next_out = pThis->pZipBuf;
- zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */
- DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out);
- assert(zRet != Z_STREAM_ERROR); /* state not clobbered */
- if(zstrm.avail_out == pThis->sIOBufSize)
- break; /* this is valid, indicates end of compression --> see zlib howto */
- CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out));
- } while (zstrm.avail_out == 0);
- assert(zstrm.avail_in == 0); /* all input will be used */
+ DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
+ pThis->zstrm.avail_out = pThis->sIOBufSize;
+ pThis->zstrm.next_out = pThis->pZipBuf;
+ zRet = zlibw.Deflate(&pThis->zstrm, bFlush ? Z_SYNC_FLUSH : Z_NO_FLUSH); /* no bad return value */
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
+ outavail =pThis->sIOBufSize - pThis->zstrm.avail_out;
+ if(outavail != 0) {
+ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, outavail));
+ }
+ } while (pThis->zstrm.avail_out == 0);
finalize_it:
- if(bzInitDone) {
- zRet = zlibw.DeflateEnd(&zstrm);
- if(zRet != Z_OK) {
- DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
+ if(pThis->bzInitDone && pThis->bVeryReliableZip) {
+ doZipFinish(pThis);
+ }
+ RETiRet;
+}
+
+
+
+/* finish zlib buffer, to be called before closing the ZIP file (if
+ * running in stream mode).
+ */
+static rsRetVal
+doZipFinish(strm_t *pThis)
+{
+ int zRet; /* zlib return state */
+ DEFiRet;
+ unsigned outavail;
+ assert(pThis != NULL);
+
+ if(!pThis->bzInitDone) {
+ FINALIZE;
+ }
+
+dbgprintf("AAAA: doZipFinish() called\n");
+ pThis->zstrm.avail_in = 0;
+ /* run deflate() on buffer until everything has been compressed */
+ do {
+ DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
+ pThis->zstrm.avail_out = pThis->sIOBufSize;
+ pThis->zstrm.next_out = pThis->pZipBuf;
+ zRet = zlibw.Deflate(&pThis->zstrm, Z_FINISH); /* no bad return value */
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
+ outavail = pThis->sIOBufSize - pThis->zstrm.avail_out;
+ if(outavail != 0) {
+ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, outavail));
}
+ } while (pThis->zstrm.avail_out == 0);
+
+finalize_it:
+ zRet = zlibw.DeflateEnd(&pThis->zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
}
+ pThis->bzInitDone = 0;
RETiRet;
}
-
/* flush stream output buffer to persistent storage. This can be called at any time
* and is automatically called when the output buffer is full.
* rgerhards, 2008-01-10
*/
static rsRetVal
-strmFlushInternal(strm_t *pThis)
+strmFlushInternal(strm_t *pThis, int bFlushZip)
{
DEFiRet;
@@ -1232,7 +1276,7 @@ strmFlushInternal(strm_t *pThis)
(long) pThis->iBufPtr, (pThis->iBufPtr == 0) ? " (no need to flush)" : "");
if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) {
- iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr);
+ iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr, bFlushZip);
}
RETiRet;
@@ -1254,7 +1298,7 @@ strmFlush(strm_t *pThis)
if(pThis->bAsyncWrite)
d_pthread_mutex_lock(&pThis->mut);
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 1));
finalize_it:
if(pThis->bAsyncWrite)
@@ -1277,7 +1321,7 @@ static rsRetVal strmSeek(strm_t *pThis, off64_t offs)
if(pThis->fd == -1) {
CHKiRet(strmOpenFile(pThis));
} else {
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 0));
}
long long i;
DBGOPRINT((obj_t*) pThis, "file %d seek, pos %llu\n", pThis->fd, (long long unsigned) offs);
@@ -1320,7 +1364,7 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
/* if the buffer is full, we need to flush before we can write */
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 0));
}
/* we now always have space for one character, so we simply copy it */
*(pThis->pIOBuf + pThis->iBufPtr) = c;
@@ -1390,7 +1434,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
iOffset = 0;
do {
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */
+ CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */
}
iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
if(iWrite > lenBuf)
@@ -1405,7 +1449,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
* write it. This seems more natural than waiting (hours?) for the next message...
*/
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */
+ CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */
}
finalize_it:
@@ -1433,6 +1477,7 @@ DEFpropSetMeth(strm, tOperationsMode, int)
DEFpropSetMeth(strm, tOpenMode, mode_t)
DEFpropSetMeth(strm, sType, strmType_t)
DEFpropSetMeth(strm, iZipLevel, int)
+DEFpropSetMeth(strm, bVeryReliableZip, int)
DEFpropSetMeth(strm, bSync, int)
DEFpropSetMeth(strm, sIOBufSize, size_t)
DEFpropSetMeth(strm, iSizeLimit, off_t)
@@ -1564,7 +1609,7 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
ISOBJ_TYPE_assert(pThis, strm);
ISOBJ_TYPE_assert(pStrm, strm);
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
CHKiRet(obj.BeginSerialize(pStrm, (obj_t*) pThis));
objSerializeSCALAR(pStrm, iCurrFNum, INT);
@@ -1757,6 +1802,7 @@ CODESTARTobjQueryInterface(strm)
pIf->SettOpenMode = strmSettOpenMode;
pIf->SetsType = strmSetsType;
pIf->SetiZipLevel = strmSetiZipLevel;
+ pIf->SetbVeryReliableZip = strmSetbVeryReliableZip;
pIf->SetbSync = strmSetbSync;
pIf->SetsIOBufSize = strmSetsIOBufSize;
pIf->SetiSizeLimit = strmSetiSizeLimit;
diff --git a/runtime/stream.h b/runtime/stream.h
index a01929f2..fdfefaa3 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -124,6 +124,8 @@ typedef struct strm_s {
sbool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */
sbool bStopWriter; /* shall writer thread terminate? */
sbool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */
+ sbool bzInitDone; /* did we do an init of zstrm already? */
+ sbool bVeryReliableZip; /* shall we write interim headers to create a very reliable ZIP file? */
int iFlushInterval; /* flush in which interval - 0, no flushing */
pthread_mutex_t mut;/* mutex for flush in async mode */
pthread_cond_t notFull;
@@ -132,6 +134,7 @@ typedef struct strm_s {
unsigned short iEnq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */
unsigned short iDeq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */
short iCnt; /* current nbr of elements in buffer */
+ z_stream zstrm; /* zip stream to use */
struct {
uchar *pBuf;
size_t lenBuf;
@@ -181,8 +184,10 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*);
/* v6 added */
rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr, int mode);
+ /* v7 added 2012-09-14 */
+ INTERFACEpropSetMeth(strm, bVeryReliableZip, int);
ENDinterface(strm)
-#define strmCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */
+#define strmCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */
/* prototypes */
diff --git a/runtime/typedefs.h b/runtime/typedefs.h
index ccae08b2..1e0cb466 100644
--- a/runtime/typedefs.h
+++ b/runtime/typedefs.h
@@ -92,6 +92,7 @@ typedef struct cfgmodules_etry_s cfgmodules_etry_t;
typedef struct outchannels_s outchannels_t;
typedef struct modConfData_s modConfData_t;
typedef struct instanceConf_s instanceConf_t;
+typedef struct ratelimit_s ratelimit_t;
typedef int rs_size_t; /* we do never need more than 2Gig strings, signed permits to
* use -1 as a special flag. */
typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */
diff --git a/tcps_sess.c b/tcps_sess.c
index e7149cb7..16fd94f5 100644
--- a/tcps_sess.c
+++ b/tcps_sess.c
@@ -47,6 +47,7 @@
#include "msg.h"
#include "datetime.h"
#include "prop.h"
+#include "ratelimit.h"
#include "debug.h"
@@ -264,14 +265,7 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG
MsgSetRuleset(pMsg, pThis->pLstnInfo->pRuleset);
STATSCOUNTER_INC(pThis->pLstnInfo->ctrSubmit, pThis->pLstnInfo->mutCtrSubmit);
- if(pMultiSub == NULL) {
- CHKiRet(submitMsg(pMsg));
- } else {
- pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
- if(pMultiSub->nElem == pMultiSub->maxElem)
- CHKiRet(multiSubmitMsg(pMultiSub));
- }
-
+ ratelimitAddMsg(pThis->pLstnInfo->ratelimiter, pMultiSub, pMsg);
finalize_it:
/* reset status variables */
@@ -487,11 +481,7 @@ DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen)
while(pData < pEnd) {
CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub));
}
-
- if(multiSub.nElem > 0) {
- /* submit anything that was not yet submitted */
- CHKiRet(multiSubmitMsg(&multiSub));
- }
+ iRet = multiSubmitFlush(&multiSub);
finalize_it:
RETiRet;
diff --git a/tcpsrv.c b/tcpsrv.c
index bf12f1fa..7ba557e0 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -72,6 +72,7 @@
#include "nspoll.h"
#include "errmsg.h"
#include "ruleset.h"
+#include "ratelimit.h"
#include "unicode-helper.h"
@@ -151,6 +152,9 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort, int bSuppOctetFram)
snprintf((char*)statname, sizeof(statname), "%s(%s)", pThis->pszInputName, pszPort);
statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
CHKiRet(statsobj.SetName(pEntry->stats, statname));
+ CHKiRet(ratelimitNew(&pEntry->ratelimiter, "tcperver", NULL));
+ ratelimitSetLinuxLike(pEntry->ratelimiter, pThis->ratelimitInterval, pThis->ratelimitBurst);
+ ratelimitSetThreadSafe(pEntry->ratelimiter);
STATSCOUNTER_INIT(pEntry->ctrSubmit, pEntry->mutCtrSubmit);
CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"),
ctrType_IntCtr, &(pEntry->ctrSubmit)));
@@ -295,6 +299,7 @@ static void deinit_tcp_listener(tcpsrv_t *pThis)
while(pEntry != NULL) {
free(pEntry->pszPort);
prop.Destruct(&pEntry->pInputName);
+ ratelimitDestruct(pEntry->ratelimiter);
pDel = pEntry;
pEntry = pEntry->pNext;
free(pDel);
@@ -913,6 +918,8 @@ BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macr
pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
pThis->bDisableLFDelim = 0;
pThis->OnMsgReceive = NULL;
+ pThis->ratelimitInterval = 0;
+ pThis->ratelimitBurst = 10000;
pThis->bUseFlowControl = 1;
ENDobjConstruct(tcpsrv)
@@ -1120,6 +1127,17 @@ finalize_it:
}
+/* Set the linux-like ratelimiter settings */
+static rsRetVal
+SetLinuxLikeRatelimiters(tcpsrv_t *pThis, int ratelimitInterval, int ratelimitBurst)
+{
+ DEFiRet;
+ pThis->ratelimitInterval = ratelimitInterval;
+ pThis->ratelimitBurst = ratelimitBurst;
+ RETiRet;
+}
+
+
/* Set the ruleset (ptr) to use */
static rsRetVal
SetRuleset(tcpsrv_t *pThis, ruleset_t *pRuleset)
@@ -1270,6 +1288,7 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->SetCBOnErrClose = SetCBOnErrClose;
pIf->SetOnMsgReceive = SetOnMsgReceive;
pIf->SetRuleset = SetRuleset;
+ pIf->SetLinuxLikeRatelimiters = SetLinuxLikeRatelimiters;
pIf->SetNotificationOnRemoteClose = SetNotificationOnRemoteClose;
finalize_it:
diff --git a/tcpsrv.h b/tcpsrv.h
index d66f682c..93e472c9 100644
--- a/tcpsrv.h
+++ b/tcpsrv.h
@@ -42,6 +42,7 @@ struct tcpLstnPortList_s {
ruleset_t *pRuleset; /**< associated ruleset */
statsobj_t *stats; /**< associated stats object */
sbool bSuppOctetFram; /**< do we support octect-counted framing? (if no->legay only!)*/
+ ratelimit_t *ratelimiter;
STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
tcpLstnPortList_t *pNext; /**< next port or NULL */
};
@@ -70,6 +71,8 @@ struct tcpsrv_s {
int addtlFrameDelim; /**< additional frame delimiter for plain TCP syslog framing (e.g. to handle NetScreen) */
int bDisableLFDelim; /**< if 1, standard LF frame delimiter is disabled (*very dangerous*) */
+ int ratelimitInterval;
+ int ratelimitBurst;
tcps_sess_t **pSessions;/**< array of all of our sessions */
void *pUsr; /**< a user-settable pointer (provides extensibility for "derived classes")*/
/* callbacks */
@@ -142,8 +145,10 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetUseFlowControl)(tcpsrv_t*, int);
/* added v11 -- rgerhards, 2011-05-09 */
rsRetVal (*SetKeepAlive)(tcpsrv_t*, int);
+ /* added v13 -- rgerhards, 2012-10-15 */
+ rsRetVal (*SetLinuxLikeRatelimiters)(tcpsrv_t *pThis, int interval, int burst);
ENDinterface(tcpsrv)
-#define tcpsrvCURR_IF_VERSION 12 /* increment whenever you change the interface structure! */
+#define tcpsrvCURR_IF_VERSION 13 /* increment whenever you change the interface structure! */
/* change for v4:
* - SetAddtlFrameDelim() added -- rgerhards, 2008-12-10
* - SetInputName() added -- rgerhards, 2008-12-10
diff --git a/template.c b/template.c
index 2fc85e55..986dbfd6 100644
--- a/template.c
+++ b/template.c
@@ -141,7 +141,9 @@ finalize_it:
* offers big performance improvements.
* rewritten 2009-06-19 rgerhards
*/
-rsRetVal tplToString(struct template *pTpl, msg_t *pMsg, uchar **ppBuf, size_t *pLenBuf)
+rsRetVal
+tplToString(struct template *pTpl, msg_t *pMsg, uchar **ppBuf, size_t *pLenBuf,
+ struct syslogTime *ttNow)
{
DEFiRet;
struct templateEntry *pTpe;
@@ -191,7 +193,8 @@ rsRetVal tplToString(struct template *pTpl, msg_t *pMsg, uchar **ppBuf, size_t *
bMustBeFreed = 0;
} else if(pTpe->eEntryType == FIELD) {
pVal = (uchar*) MsgGetProp(pMsg, pTpe, pTpe->data.field.propid,
- pTpe->data.field.propName, &iLenVal, &bMustBeFreed);
+ pTpe->data.field.propName, &iLenVal,
+ &bMustBeFreed, ttNow);
/* we now need to check if we should use SQL option. In this case,
* we must go over the generated string and escape '\'' characters.
* rgerhards, 2005-09-22: the option values below look somewhat misplaced,
@@ -245,7 +248,8 @@ finalize_it:
* is indicated by a NULL pointer.
* rgerhards, 2009-04-03
*/
-rsRetVal tplToArray(struct template *pTpl, msg_t *pMsg, uchar*** ppArr)
+rsRetVal
+tplToArray(struct template *pTpl, msg_t *pMsg, uchar*** ppArr, struct syslogTime *ttNow)
{
DEFiRet;
struct templateEntry *pTpe;
@@ -286,7 +290,8 @@ rsRetVal tplToArray(struct template *pTpl, msg_t *pMsg, uchar*** ppArr)
CHKmalloc(pArr[iArr] = (uchar*)strdup((char*) pTpe->data.constant.pConstant));
} else if(pTpe->eEntryType == FIELD) {
pVal = (uchar*) MsgGetProp(pMsg, pTpe, pTpe->data.field.propid,
- pTpe->data.field.propName, &propLen, &bMustBeFreed);
+ pTpe->data.field.propName, &propLen,
+ &bMustBeFreed, ttNow);
if(bMustBeFreed) { /* if it must be freed, it is our own private copy... */
pArr[iArr] = pVal; /* ... so we can use it! */
} else {
@@ -310,7 +315,7 @@ finalize_it:
* rgerhards, 2012-08-29
*/
rsRetVal
-tplToJSON(struct template *pTpl, msg_t *pMsg, struct json_object **pjson)
+tplToJSON(struct template *pTpl, msg_t *pMsg, struct json_object **pjson, struct syslogTime *ttNow)
{
struct templateEntry *pTpe;
rs_size_t propLen;
@@ -353,7 +358,7 @@ tplToJSON(struct template *pTpl, msg_t *pMsg, struct json_object **pjson)
} else {
pVal = (uchar*) MsgGetProp(pMsg, pTpe, pTpe->data.field.propid,
pTpe->data.field.propName, &propLen,
- &bMustBeFreed);
+ &bMustBeFreed, ttNow);
if(pTpe->data.field.options.bMandatory || propLen > 0) {
jsonf = json_object_new_string_len((char*)pVal, propLen);
json_object_object_add(json, (char*)pTpe->fieldName, jsonf);
@@ -371,6 +376,38 @@ finalize_it:
}
+/* Check if the template requires a date call (actually a cached
+ * date structure). This currently is the case for the $NOW family
+ * of properties.
+ */
+int
+tplRequiresDateCall(struct template *pTpl)
+{
+ struct templateEntry *pTpe;
+ int r = 0;
+
+ if(pTpl->subtree != NULL)
+ goto done;
+
+ for(pTpe = pTpl->pEntryRoot ; pTpe != NULL ; pTpe = pTpe->pNext) {
+ switch(pTpe->data.field.propid) {
+ case PROP_SYS_NOW:
+ case PROP_SYS_YEAR:
+ case PROP_SYS_MONTH:
+ case PROP_SYS_DAY:
+ case PROP_SYS_HOUR:
+ case PROP_SYS_HHOUR:
+ case PROP_SYS_QHOUR:
+ case PROP_SYS_MINUTE:
+ r = 1;
+ goto done;
+ default:break;
+ }
+ }
+done: return r;
+}
+
+
/* Helper to doEscape. This is called if doEscape
* runs out of memory allocating the escaped string.
* Then we are in trouble. We can
@@ -791,6 +828,7 @@ static int do_Parameter(unsigned char **pp, struct template *pTpl)
/* Check frompos, if it has an R, then topos should be a regex */
if(*p == ':') {
+ pTpe->bComplexProcessing = 1;
++p; /* eat ':' */
#ifdef FEATURE_REGEXP
if(*p == 'R') {
@@ -1351,6 +1389,7 @@ createPropertyTpe(struct template *pTpl, struct cnfobj *o)
int fielddelim = 9; /* default is HT (USACSII 9) */
int re_matchToUse = 0;
int re_submatchToUse = 0;
+ int bComplexProcessing = 0;
char *re_expr = NULL;
struct cnfparamvals *pvals = NULL;
enum {F_NONE, F_CSV, F_JSON, F_JSONF} formatType = F_NONE;
@@ -1376,23 +1415,31 @@ createPropertyTpe(struct template *pTpl, struct cnfobj *o)
free(tmpstr);
} else if(!strcmp(pblkProperty.descr[i].name, "droplastlf")) {
droplastlf = pvals[i].val.d.n;
+ bComplexProcessing = 1;
} else if(!strcmp(pblkProperty.descr[i].name, "mandatory")) {
mandatory = pvals[i].val.d.n;
} else if(!strcmp(pblkProperty.descr[i].name, "spifno1stsp")) {
spifno1stsp = pvals[i].val.d.n;
+ bComplexProcessing = 1;
} else if(!strcmp(pblkProperty.descr[i].name, "outname")) {
outname = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(pblkProperty.descr[i].name, "position.from")) {
frompos = pvals[i].val.d.n;
+ bComplexProcessing = 1;
} else if(!strcmp(pblkProperty.descr[i].name, "position.to")) {
topos = pvals[i].val.d.n;
+ bComplexProcessing = 1;
} else if(!strcmp(pblkProperty.descr[i].name, "field.number")) {
fieldnum = pvals[i].val.d.n;
+ bComplexProcessing = 1;
} else if(!strcmp(pblkProperty.descr[i].name, "field.delimiter")) {
fielddelim = pvals[i].val.d.n;
+ bComplexProcessing = 1;
} else if(!strcmp(pblkProperty.descr[i].name, "regex.expression")) {
re_expr = es_str2cstr(pvals[i].val.d.estr, NULL);
+ bComplexProcessing = 1;
} else if(!strcmp(pblkProperty.descr[i].name, "regex.type")) {
+ bComplexProcessing = 1;
if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"BRE", sizeof("BRE")-1)) {
re_type = TPL_REGEX_BRE;
} else if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"ERE", sizeof("ERE")-1)) {
@@ -1405,6 +1452,7 @@ createPropertyTpe(struct template *pTpl, struct cnfobj *o)
ABORT_FINALIZE(RS_RET_ERR);
}
} else if(!strcmp(pblkProperty.descr[i].name, "regex.nomatchmode")) {
+ bComplexProcessing = 1;
if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"DFLT", sizeof("DFLT")-1)) {
re_nomatchType = TPL_REGEX_NOMATCH_USE_DFLTSTR;
} else if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"BLANK", sizeof("BLANK")-1)) {
@@ -1421,10 +1469,13 @@ createPropertyTpe(struct template *pTpl, struct cnfobj *o)
ABORT_FINALIZE(RS_RET_ERR);
}
} else if(!strcmp(pblkProperty.descr[i].name, "regex.match")) {
+ bComplexProcessing = 1;
re_matchToUse = pvals[i].val.d.n;
} else if(!strcmp(pblkProperty.descr[i].name, "regex.submatch")) {
+ bComplexProcessing = 1;
re_submatchToUse = pvals[i].val.d.n;
} else if(!strcmp(pblkProperty.descr[i].name, "format")) {
+ bComplexProcessing = 1;
if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"csv", sizeof("csv")-1)) {
formatType = F_CSV;
} else if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"json", sizeof("json")-1)) {
@@ -1439,6 +1490,7 @@ createPropertyTpe(struct template *pTpl, struct cnfobj *o)
ABORT_FINALIZE(RS_RET_ERR);
}
} else if(!strcmp(pblkProperty.descr[i].name, "controlcharacters")) {
+ bComplexProcessing = 1;
if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"escape", sizeof("escape")-1)) {
controlchr = CC_ESCAPE;
} else if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"space", sizeof("space")-1)) {
@@ -1453,6 +1505,7 @@ createPropertyTpe(struct template *pTpl, struct cnfobj *o)
ABORT_FINALIZE(RS_RET_ERR);
}
} else if(!strcmp(pblkProperty.descr[i].name, "securepath")) {
+ bComplexProcessing = 1;
if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"drop", sizeof("drop")-1)) {
secpath = SP_DROP;
} else if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"replace", sizeof("replace")-1)) {
@@ -1465,6 +1518,7 @@ createPropertyTpe(struct template *pTpl, struct cnfobj *o)
ABORT_FINALIZE(RS_RET_ERR);
}
} else if(!strcmp(pblkProperty.descr[i].name, "caseconversion")) {
+ bComplexProcessing = 1;
if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"lower", sizeof("lower")-1)) {
caseconv = tplCaseConvLower;
} else if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"upper", sizeof("upper")-1)) {
@@ -1583,6 +1637,7 @@ createPropertyTpe(struct template *pTpl, struct cnfobj *o)
pTpe->fieldName = outname;
if(outname != NULL)
pTpe->lenFieldName = ustrlen(outname);
+ pTpe->bComplexProcessing = bComplexProcessing;
pTpe->data.field.eDateFormat = datefmt;
if(fieldnum != -1) {
pTpe->data.field.has_fields = 1;
@@ -2117,6 +2172,8 @@ void tplPrintList(rsconf_t *conf)
}
break;
}
+ if(pTpe->bComplexProcessing)
+ dbgprintf("[COMPLEX]");
dbgprintf("\n");
pTpe = pTpe->pNext;
}
@@ -2130,8 +2187,6 @@ int tplGetEntryCount(struct template *pTpl)
return(pTpl->tpenElements);
}
-/* our init function. TODO: remove once converted to a class
- */
rsRetVal templateInit()
{
DEFiRet;
diff --git a/template.h b/template.h
index 5a35d274..018e2f52 100644
--- a/template.h
+++ b/template.h
@@ -72,6 +72,7 @@ struct templateEntry {
enum EntryTypes eEntryType;
uchar *fieldName; /**< field name to be used for structured output */
int lenFieldName;
+ sbool bComplexProcessing; /**< set if complex processing (options, etc) is required */
union {
struct {
uchar *pConstant; /* pointer to constant value */
@@ -142,14 +143,15 @@ void tplDeleteNew(rsconf_t *conf);
void tplPrintList(rsconf_t *conf);
void tplLastStaticInit(rsconf_t *conf, struct template *tpl);
rsRetVal ExtendBuf(uchar **pBuf, size_t *pLenBuf, size_t iMinSize);
+int tplRequiresDateCall(struct template *pTpl);
/* note: if a compiler warning for undefined type tells you to look at this
* code line below, the actual cause is that you currently MUST include template.h
* BEFORE msg.h, even if your code file does not actually need it.
* rgerhards, 2007-08-06
*/
-rsRetVal tplToArray(struct template *pTpl, msg_t *pMsg, uchar*** ppArr);
-rsRetVal tplToString(struct template *pTpl, msg_t *pMsg, uchar** ppSz, size_t *);
-rsRetVal tplToJSON(struct template *pTpl, msg_t *pMsg, struct json_object **);
+rsRetVal tplToArray(struct template *pTpl, msg_t *pMsg, uchar*** ppArr, struct syslogTime *ttNow);
+rsRetVal tplToString(struct template *pTpl, msg_t *pMsg, uchar** ppSz, size_t *, struct syslogTime *ttNow);
+rsRetVal tplToJSON(struct template *pTpl, msg_t *pMsg, struct json_object **, struct syslogTime *ttNow);
rsRetVal doEscape(uchar **pp, rs_size_t *pLen, unsigned short *pbMustBeFreed, int escapeMode);
rsRetVal templateInit();
diff --git a/tools/omfile.c b/tools/omfile.c
index 5b0bfb46..c7e0dc25 100644
--- a/tools/omfile.c
+++ b/tools/omfile.c
@@ -156,6 +156,7 @@ typedef struct _instanceData {
int iFlushInterval; /* how fast flush buffer on inactivity? */
sbool bFlushOnTXEnd; /* flush write buffers when transaction has ended? */
sbool bUseAsyncWriter; /* use async stream writer? */
+ sbool bVeryRobustZip;
} instanceData;
@@ -205,6 +206,7 @@ static struct cnfparamdescr actpdescr[] = {
{ "ziplevel", eCmdHdlrInt, 0 }, /* legacy: omfileziplevel */
{ "flushinterval", eCmdHdlrInt, 0 }, /* legacy: omfileflushinterval */
{ "asyncwriting", eCmdHdlrBinary, 0 }, /* legacy: omfileasyncwriting */
+ { "veryrobustzip", eCmdHdlrBinary, 0 },
{ "flushontxend", eCmdHdlrBinary, 0 }, /* legacy: omfileflushontxend */
{ "iobuffersize", eCmdHdlrSize, 0 }, /* legacy: omfileiobuffersize */
{ "dirowner", eCmdHdlrUID, 0 }, /* legacy: dirowner */
@@ -269,7 +271,8 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tflush on TX end=%d\n", pData->bFlushOnTXEnd);
dbgprintf("\tflush interval=%d\n", pData->iFlushInterval);
dbgprintf("\tfile cache size=%d\n", pData->iDynaFileCacheSize);
- dbgprintf("\tcreate directories: %s\n", pData->bCreateDirs ? "yes" : "no");
+ dbgprintf("\tcreate directories: %s\n", pData->bCreateDirs ? "on" : "off");
+ dbgprintf("\tvery robust zip: %s\n", pData->bCreateDirs ? "on" : "off");
dbgprintf("\tfile owner %d, group %d\n", (int) pData->fileUID, (int) pData->fileGID);
dbgprintf("\tdirectory owner %d, group %d\n", (int) pData->dirUID, (int) pData->dirGID);
dbgprintf("\tdir create mode 0%3.3o, file create mode 0%3.3o\n",
@@ -292,7 +295,7 @@ setLegacyDfltTpl(void __attribute__((unused)) *pVal, uchar* newVal)
if(loadModConf != NULL && loadModConf->tplName != NULL) {
free(newVal);
- errmsg.LogError(0, RS_RET_ERR, "omfile default template already set via module "
+ errmsg.LogError(0, RS_RET_ERR, "omfile: default template already set via module "
"global parameter - can no longer be changed");
ABORT_FINALIZE(RS_RET_ERR);
}
@@ -536,6 +539,7 @@ prepareFile(instanceData *pData, uchar *newFileName)
CHKiRet(strm.SetFName(pData->pStrm, szBaseName, ustrlen(szBaseName)));
CHKiRet(strm.SetDir(pData->pStrm, szDirName, ustrlen(szDirName)));
CHKiRet(strm.SetiZipLevel(pData->pStrm, pData->iZipLevel));
+ CHKiRet(strm.SetbVeryReliableZip(pData->pStrm, pData->bVeryRobustZip));
CHKiRet(strm.SetsIOBufSize(pData->pStrm, (size_t) pData->iIOBufSize));
CHKiRet(strm.SettOperationsMode(pData->pStrm, STREAMMODE_WRITE_APPEND));
CHKiRet(strm.SettOpenMode(pData->pStrm, cs.fCreateMode));
@@ -843,6 +847,7 @@ BEGINendTransaction
CODESTARTendTransaction
/* Note: pStrm may be NULL if there was an error opening the stream */
if(pData->bFlushOnTXEnd && pData->pStrm != NULL) {
+dbgprintf("AAAA: flusing stream, endTx\n");
CHKiRet(strm.Flush(pData->pStrm));
}
finalize_it:
@@ -854,6 +859,7 @@ CODESTARTdoAction
DBGPRINTF("file to log to: %s\n", pData->f_fname);
CHKiRet(writeFile(ppString, iMsgOpts, pData));
if(!bCoreSupportsBatching && pData->bFlushOnTXEnd) {
+dbgprintf("AAAA: flusing stream, in Tx\n");
CHKiRet(strm.Flush(pData->pStrm));
}
finalize_it:
@@ -878,6 +884,7 @@ setInstParamDefaults(instanceData *pData)
pData->bCreateDirs = 1;
pData->bSyncFile = 0;
pData->iZipLevel = 0;
+ pData->bVeryRobustZip = 0;
pData->bFlushOnTXEnd = FLUSHONTX_DFLT;
pData->iIOBufSize = IOBUF_DFLT_SIZE;
pData->iFlushInterval = FLUSH_INTRVL_DFLT;
@@ -915,6 +922,8 @@ CODESTARTnewActInst
pData->iZipLevel = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "flushinterval")) {
pData->iFlushInterval = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "veryrobustzip")) {
+ pData->bVeryRobustZip = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "asyncwriting")) {
pData->bUseAsyncWriter = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "flushontxend")) {
@@ -1064,6 +1073,7 @@ CODESTARTparseSelectorAct
pData->iIOBufSize = (int) cs.iIOBufSize;
pData->iFlushInterval = cs.iFlushInterval;
pData->bUseAsyncWriter = cs.bUseAsyncWriter;
+ pData->bVeryRobustZip = 0; /* cannot be specified via legacy conf */
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -1090,7 +1100,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
cs.bUseAsyncWriter = USE_ASYNCWRITER_DFLT;
free(pszFileDfltTplName);
pszFileDfltTplName = NULL;
-
return RS_RET_OK;
}
diff --git a/tools/syslogd.c b/tools/syslogd.c
index edb546a1..cb6a47cd 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -125,6 +125,7 @@
#include "dnscache.h"
#include "sd-daemon.h"
#include "rainerscript.h"
+#include "ratelimit.h"
/* definitions for objects we access */
DEFobjCurrIf(obj)
@@ -220,6 +221,7 @@ struct queuefilenames_s {
} *queuefilenames = NULL;
+static ratelimit_t *dflt_ratelimiter = NULL; /* ratelimiter for submits without explicit one */
int MarkInterval = 20 * 60; /* interval between marks in seconds - read-only after startup */
int send_to_all = 0; /* send message to all IPv4/IPv6 addresses */
static int NoFork = 0; /* don't fork - don't run in daemon mode - read-only after startup */
@@ -411,7 +413,7 @@ parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int fla
CHKiRet(prop.Destruct(&pProp));
CHKiRet(MsgSetRcvFromIPStr(pMsg, hnameIP, ustrlen(hnameIP), &pProp));
CHKiRet(prop.Destruct(&pProp));
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(submitMsg2(pMsg));
finalize_it:
RETiRet;
@@ -431,6 +433,12 @@ submitErrMsg(int iErr, uchar *msg)
}
+static inline rsRetVal
+submitMsgWithDfltRatelimiter(msg_t *pMsg)
+{
+ return ratelimitAddMsg(dflt_ratelimiter, NULL, pMsg);
+}
+
/* rgerhards 2004-11-09: the following is a function that can be used
* to log a message orginating from the syslogd itself.
*/
@@ -482,50 +490,12 @@ logmsgInternal(int iErr, int pri, uchar *msg, int flags)
/* we have the queue, so we can simply provide the
* message to the queue engine.
*/
- submitMsg(pMsg);
+ submitMsgWithDfltRatelimiter(pMsg);
}
finalize_it:
RETiRet;
}
-/* check message against ACL set
- * rgerhards, 2009-11-16
- */
-#if 0
-static inline rsRetVal
-chkMsgAgainstACL() {
- /* if we reach this point, we had a good receive and can process the packet received */
- /* check if we have a different sender than before, if so, we need to query some new values */
- if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) {
- CHKiRet(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP));
- memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */
- /* Here we check if a host is permitted to send us
- * syslog messages. If it isn't, we do not further
- * process the message but log a warning (if we are
- * configured to do this).
- * rgerhards, 2005-09-26
- */
- *pbIsPermitted = net.isAllowedSender((uchar*)"UDP",
- (struct sockaddr *)&frominet, (char*)fromHostFQDN);
-
- if(!*pbIsPermitted) {
- DBGPRINTF("%s is not an allowed sender\n", (char*)fromHostFQDN);
- if(glbl.GetOption_DisallowWarning) {
- time_t tt;
-
- datetime.GetTime(&tt);
- if(tt > ttLastDiscard + 60) {
- ttLastDiscard = tt;
- errmsg.LogError(0, NO_ERRCODE,
- "UDP message from disallowed sender %s discarded",
- (char*)fromHost);
- }
- }
- }
- }
-}
-#endif
-
/* preprocess a batch of messages, that is ready them for actual processing. This is done
* as a first stage and totally in parallel to any other worker active in the system. So
@@ -618,7 +588,7 @@ int i;
* rgerhards, 2008-02-13
*/
rsRetVal
-submitMsg(msg_t *pMsg)
+submitMsg2(msg_t *pMsg)
{
qqueue_t *pQueue;
ruleset_t *pRuleset;
@@ -631,7 +601,7 @@ submitMsg(msg_t *pMsg)
/* if a plugin logs a message during shutdown, the queue may no longer exist */
if(pQueue == NULL) {
- DBGPRINTF("submitMsg() could not submit message - "
+ DBGPRINTF("submitMsg2() could not submit message - "
"queue does (no longer?) exist - ignored\n");
FINALIZE;
}
@@ -643,13 +613,19 @@ finalize_it:
RETiRet;
}
+rsRetVal
+submitMsg(msg_t *pMsg)
+{
+ return submitMsgWithDfltRatelimiter(pMsg);
+}
+
/* submit multiple messages at once, very similar to submitMsg, just
* for multi_submit_t. All messages need to go into the SAME queue!
* rgerhards, 2009-06-16
*/
rsRetVal
-multiSubmitMsg(multi_submit_t *pMultiSub)
+multiSubmitMsg2(multi_submit_t *pMultiSub)
{
int i;
qqueue_t *pQueue;
@@ -680,8 +656,23 @@ multiSubmitMsg(multi_submit_t *pMultiSub)
finalize_it:
RETiRet;
}
+rsRetVal
+multiSubmitMsg(multi_submit_t *pMultiSub) /* backward compat. level */
+{
+ return multiSubmitMsg2(pMultiSub);
+}
+/* flush multiSubmit, e.g. at end of read records */
+rsRetVal
+multiSubmitFlush(multi_submit_t *pMultiSub)
+{
+ DEFiRet;
+ if(pMultiSub->nElem > 0) {
+ iRet = multiSubmitMsg2(pMultiSub);
+ }
+ RETiRet;
+}
static void
@@ -1262,7 +1253,7 @@ static inline void processImInternal(void)
msg_t *pMsg;
while(iminternalRemoveMsg(&pMsg) == RS_RET_OK) {
- submitMsg(pMsg);
+ submitMsgWithDfltRatelimiter(pMsg);
}
}
@@ -1467,6 +1458,7 @@ InitGlobalClasses(void)
CHKiRet(objUse(net, LM_NET_FILENAME));
dnscacheInit();
initRainerscript();
+ ratelimitModInit();
finalize_it:
if(iRet != RS_RET_OK) {
@@ -1505,6 +1497,7 @@ GlobalClassExit(void)
/* TODO: implement the rest of the deinit */
/* dummy "classes */
strExit();
+ ratelimitModExit();
#if 0
CHKiRet(objGetObjInterface(&obj)); /* this provides the root pointer for all other queries */
@@ -2037,6 +2030,9 @@ int realMain(int argc, char **argv)
}
CHKiRet(localRet);
+ CHKiRet(ratelimitNew(&dflt_ratelimiter, "rsyslogd", NULL));
+ /* TODO: add linux-type limiting capability */
+
if(bChDirRoot) {
if(chdir("/") != 0)
fprintf(stderr, "Can not do 'cd /' - still trying to run\n");