summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog6
-rw-r--r--action.c137
-rw-r--r--action.h11
-rw-r--r--dirty.h20
-rw-r--r--doc/imptcp.html15
-rw-r--r--doc/imtcp.html11
-rw-r--r--doc/imudp.html7
-rw-r--r--doc/v7compatibility.html37
-rw-r--r--plugins/imdiag/imdiag.c10
-rw-r--r--plugins/imfile/imfile.c22
-rw-r--r--plugins/imklog/imklog.c8
-rw-r--r--plugins/imkmsg/imkmsg.c2
-rw-r--r--plugins/impstats/impstats.c3
-rw-r--r--plugins/imptcp/imptcp.c36
-rw-r--r--plugins/imrelp/imrelp.c24
-rw-r--r--plugins/imsolaris/imsolaris.c2
-rw-r--r--plugins/imtcp/imtcp.c20
-rw-r--r--plugins/imudp/imudp.c40
-rw-r--r--plugins/imuxsock/imuxsock.c131
-rw-r--r--plugins/omruleset/omruleset.c6
-rw-r--r--runtime/Makefile.am2
-rw-r--r--runtime/conf.c9
-rw-r--r--runtime/glbl.c2
-rw-r--r--runtime/queue.c2
-rw-r--r--runtime/ratelimit.c359
-rw-r--r--runtime/ratelimit.h51
-rw-r--r--runtime/rsconf.c4
-rw-r--r--runtime/rsyslog.h1
-rw-r--r--runtime/stream.c2
-rw-r--r--runtime/typedefs.h1
-rw-r--r--tcps_sess.c16
-rw-r--r--tcpsrv.c19
-rw-r--r--tcpsrv.h7
-rw-r--r--tests/runtime-dummy.c1
-rw-r--r--tools/syslogd.c167
35 files changed, 720 insertions, 471 deletions
diff --git a/ChangeLog b/ChangeLog
index 5febb872..6d540565 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,10 @@
---------------------------------------------------------------------------
Version 7.3.2 [devel] 2012-10-??
+- imtcp: support for Linux-Type ratelimiting added
+- imptcp: support for Linux-Type ratelimiting added
+- imudp enhancements:
+ * support for input batching added (performance improvement)
+ * support for Linux-Type ratelimiting added
- permited action-like statements (stop, call, ...) in action lists
- bugfix: segfault on startup when modules using MSG_PASSING mode are used
- omelasticsearch: support for writing data errors to local file added
@@ -64,6 +69,7 @@ Version 7.1.12 [beta] 2012-10-18
This happened only under some circumstances. Thanks to Marius
Tomaschwesky and Florian Piekert for their help in solving this issue.
----------------------------------------------------------------------------
+>>>>>>> b151584d0929759284c0fb0399709e5ca0e29d60
Version 7.1.11 [beta] 2012-10-16
- bugfix: imuxsock truncated head of received message
This happened only under some circumstances. Thanks to Marius
diff --git a/action.c b/action.c
index ae40eadf..58ada67c 100644
--- a/action.c
+++ b/action.c
@@ -12,11 +12,11 @@
* necessary to triple-check that everything works well in *all* modes.
* The different modes (and calling sequence) are:
*
- * if set iExecEveryNthOccur > 1 || f_ReduceRepeated || iSecsExecOnceInterval
+ * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval
* - doSubmitToActionQComplexBatch
* - helperSubmitToActionQComplexBatch
* - doActionCallAction
- * handles duplicate message processing, but in essence calls
+ * handles mark message reduction, but in essence calls
* - actionWriteToAction
* - qqueueEnqObj
* (now queue engine processing)
@@ -307,9 +307,6 @@ rsRetVal actionDestruct(action_t *pThis)
if(pThis->pMod != NULL)
pThis->pMod->freeInstance(pThis->pModData);
- if(pThis->f_pMsg != NULL)
- msgDestruct(&pThis->f_pMsg);
-
pthread_mutex_destroy(&pThis->mutAction);
pthread_mutex_destroy(&pThis->mutActExec);
d_free(pThis->pszName);
@@ -410,16 +407,11 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
* mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08
*/
if( pThis->iExecEveryNthOccur > 1
- || pThis->f_ReduceRepeated
|| pThis->iSecsExecOnceInterval
) {
DBGPRINTF("info: firehose mode disabled for action because "
- "iExecEveryNthOccur=%d, "
- "ReduceRepeated=%d, "
- "iSecsExecOnceInterval=%d\n",
- pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated,
- pThis->iSecsExecOnceInterval
- );
+ "iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n",
+ pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval);
pThis->submitToActQ = doSubmitToActionQComplexBatch;
} else if(pThis->bWriteAllMarkMsgs == RSFALSE) {
/* nearly full-speed submission mode, default case */
@@ -782,7 +774,6 @@ rsRetVal actionDbgPrint(action_t *pThis)
pThis->pMod->dbgPrintInstInfo(pThis->pModData);
dbgprintf("\n");
dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData);
- dbgprintf("\tRepeatedMsgReduction: %d\n", pThis->f_ReduceRepeated);
dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval);
if(pThis->eState == ACT_STATE_SUSP) {
dbgprintf("\tresume next retry: %u, number retries: %d",
@@ -1418,14 +1409,10 @@ finalize_it:
* be filtered out before calling us (what is done currently!).
*/
rsRetVal
-actionWriteToAction(action_t *pAction)
+actionWriteToAction(action_t *pAction, msg_t *pMsg)
{
- msg_t *pMsgSave; /* to save current message pointer, necessary to restore
- it in case it needs to be updated (e.g. repeated msgs) */
DEFiRet;
- pMsgSave = NULL; /* indicate message poiner not saved */
-
/* first, we check if the action should actually be called. The action-specific
* $ActionExecOnlyEveryNthTime permits us to execute an action only every Nth
* time. So we need to check if we need to drop the (otherwise perfectly executable)
@@ -1452,43 +1439,6 @@ actionWriteToAction(action_t *pAction)
}
}
- /* then check if this is a regular message or the repeation of
- * a previous message. If so, we need to change the message text
- * to "last message repeated n times" and then go ahead and write
- * it. Please note that we can not modify the message object, because
- * that would update it in other selectors as well. As such, we first
- * need to create a local copy of the message, which we than can update.
- * rgerhards, 2007-07-10
- */
- if(pAction->f_prevcount > 1) {
- msg_t *pMsg;
- size_t lenRepMsg;
- uchar szRepMsg[1024];
-
- if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) {
- /* it failed - nothing we can do against it... */
- DBGPRINTF("Message duplication failed, dropping repeat message.\n");
- ABORT_FINALIZE(RS_RET_ERR);
- }
-
- if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */
- lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " last message repeated %d times",
- pAction->f_prevcount);
- } else {
- lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " message repeated %d times: [%.800s]",
- pAction->f_prevcount, getMSG(pAction->f_pMsg));
- }
-
- /* We now need to update the other message properties. Please note that digital
- * signatures inside the message are also invalidated.
- */
- datetime.getCurrTime(&(pMsg->tRcvdAt), &(pMsg->ttGenTime));
- memcpy(&pMsg->tTIMESTAMP, &pMsg->tRcvdAt, sizeof(struct syslogTime));
- MsgReplaceMSG(pMsg, szRepMsg, lenRepMsg);
- pMsgSave = pAction->f_pMsg; /* save message pointer for later restoration */
- pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */
- }
-
DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));
/* now check if we need to drop the message because otherwise the action would be too
@@ -1509,31 +1459,14 @@ actionWriteToAction(action_t *pAction)
/* we use reception time, not dequeue time - this is considered more appropriate and also faster ;)
* rgerhards, 2008-09-17 */
pAction->tLastExec = getActNow(pAction); /* re-init time flags */
- pAction->f_time = pAction->f_pMsg->ttGenTime;
+ pAction->f_time = pMsg->ttGenTime;
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = doSubmitToActionQ(pAction, pAction->f_pMsg);
-
- if(iRet == RS_RET_OK)
- pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
+ iRet = doSubmitToActionQ(pAction, pMsg);
finalize_it:
- if(pMsgSave != NULL) {
- /* we had saved the original message pointer. That was
- * done because we needed to create a temporary one
- * (most often for "message repeated n time" handling). If so,
- * we need to restore the original one now, so that procesing
- * can continue as normal. We also need to discard the temporary
- * one, as we do not like memory leaks ;) Please note that the original
- * message object will be discarded by our callers, so this is nothing
- * of our business. rgerhards, 2007-07-10
- */
- msgDestruct(&pAction->f_pMsg);
- pAction->f_pMsg = pMsgSave; /* restore it */
- }
-
RETiRet;
}
@@ -1556,43 +1489,8 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
ABORT_FINALIZE(RS_RET_OK);
}
- /* suppress duplicate messages */
- if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL &&
- (pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) &&
- !ustrcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) &&
- !strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) &&
- !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(pAction->f_pMsg, LOCK_MUTEX)) &&
- !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(pAction->f_pMsg, LOCK_MUTEX))) {
- pAction->f_prevcount++;
- DBGPRINTF("msg repeated %d times, %ld sec of %d.\n",
- pAction->f_prevcount, (long) getActNow(pAction) - pAction->f_time,
- repeatinterval[pAction->f_repeatcount]);
- /* use current message, so we have the new timestamp (means we need to discard previous one) */
- msgDestruct(&pAction->f_pMsg);
- pAction->f_pMsg = MsgAddRef(pMsg);
- /* If domark would have logged this by now, flush it now (so we don't hold
- * isolated messages), but back off so we'll flush less often in the future.
- */
- if(getActNow(pAction) > REPEATTIME(pAction)) {
- iRet = actionWriteToAction(pAction);
- BACKOFF(pAction);
- }
- } else {/* new message, save it */
- /* first check if we have a previous message stored
- * if so, emit and then discard it first
- */
- if(pAction->f_pMsg != NULL) {
- if(pAction->f_prevcount > 0)
- actionWriteToAction(pAction);
- /* we do not care about iRet above - I think it's right but if we have
- * some troubles, you know where to look at ;) -- rgerhards, 2007-08-01
- */
- msgDestruct(&pAction->f_pMsg);
- }
- pAction->f_pMsg = MsgAddRef(pMsg);
- /* call the output driver */
- iRet = actionWriteToAction(pAction);
- }
+ /* call the output driver */
+ iRet = actionWriteToAction(pAction, pMsg);
finalize_it:
/* we need to update the batch to handle failover processing correctly */
@@ -1998,13 +1896,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
pAction->pMod = pMod;
pAction->pModData = pModData;
- /* now check if the module is compatible with select features */
- if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) {
- pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
- } else {
- DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n");
- pAction->f_ReduceRepeated = 0;
- }
+ /* check if the module is compatible with select features (currently no such features exist) */
pAction->eState = ACT_STATE_RDY; /* action is enabled */
pAction->requiresDateCall = actionRequiresDateCall(pAction);
@@ -2105,13 +1997,8 @@ actionNewInst(struct nvlst *lst, action_t **ppAction)
if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, queueParams,
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
- /* now check if the module is compatible with select features */
- if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK)
- pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
- else {
- DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n");
- pAction->f_ReduceRepeated = 0;
- }
+ /* check if the module is compatible with select features
+ * (currently no such features exist) */
pAction->eState = ACT_STATE_RDY; /* action is enabled */
loadConf->actions.nbrActions++; /* one more active action! */
}
diff --git a/action.h b/action.h
index 177fd682..fd84e2f1 100644
--- a/action.h
+++ b/action.h
@@ -48,7 +48,7 @@ typedef enum {
*/
typedef struct action_s action_t;
struct action_s {
- time_t f_time; /* used for "message repeated n times" - be careful, old, old code */
+ time_t f_time; /* used for "max. n messages in m seconds" processing */
time_t tActNow; /* the current time for an action execution. Initially set to -1 and
populated on an as-needed basis. This is a performance optimization. */
time_t tLastExec; /* time this action was last executed */
@@ -69,10 +69,7 @@ struct action_s {
struct modInfo_s *pMod;/* pointer to output module handling this selector */
void *pModData; /* pointer to module data - content is module-specific */
sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */
- short f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */
sbool requiresDateCall;/* do we need to do a date call before creating templates? */
- int f_prevcount; /* repetition cnt of prevline */
- int f_repeatcount; /* number of "repeated" msgs */
rsRetVal (*submitToActQ)(action_t *, batch_t *);/* function submit message to action queue */
rsRetVal (*qConstruct)(struct queue_s *pThis);
enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING = 2,
@@ -81,10 +78,6 @@ struct action_s {
int iNumTpls; /* number of array entries for template element below */
struct template **ppTpl;/* array of template to use - strings must be passed to doAction
* in this order. */
- msg_t *f_pMsg; /* pointer to the message (this will replace the other vars with msg
- * content later). This is preserved after the message has been
- * processed - it is also used to detect duplicates.
- */
qqueue_t *pQueue; /* action queue */
pthread_mutex_t mutAction; /* primary action mutex */
pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */
@@ -105,7 +98,7 @@ rsRetVal actionDestruct(action_t *pThis);
rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
rsRetVal actionDoAction(action_t *pAction);
-rsRetVal actionWriteToAction(action_t *pAction);
+rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg);
rsRetVal actionCallHUPHdlr(action_t *pAction);
rsRetVal actionClassInit(void);
rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, struct cnfparamvals *queueParams, int bSuspended);
diff --git a/dirty.h b/dirty.h
index a3940cb9..743632a2 100644
--- a/dirty.h
+++ b/dirty.h
@@ -27,26 +27,18 @@
#ifndef DIRTY_H_INCLUDED
#define DIRTY_H_INCLUDED 1
-rsRetVal multiSubmitMsg(multi_submit_t *pMultiSub);
-rsRetVal submitMsg(msg_t *pMsg);
+rsRetVal __attribute__((deprecated)) multiSubmitMsg(multi_submit_t *pMultiSub);
+rsRetVal multiSubmitMsg2(multi_submit_t *pMultiSub); /* friends only! */
+rsRetVal submitMsg2(msg_t *pMsg);
+rsRetVal __attribute__((deprecated)) submitMsg(msg_t *pMsg);
+rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub);
rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags);
-rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset);
+rsRetVal __attribute__((deprecated)) parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset);
rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */
rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName);
-/* Intervals at which we flush out "message repeated" messages,
- * in seconds after previous message is logged. After each flush,
- * we move to the next interval until we reach the largest.
- * TODO: move this to action object! Only action.c and syslogd.c use it.
- */
extern int MarkInterval;
-extern int repeatinterval[2];
extern qqueue_t *pMsgQueue; /* the main message queue */
extern int iConfigVerify; /* is this just a config verify run? */
extern int bHaveMainQueue;
-#define MAXREPEAT ((int)((sizeof(repeatinterval) / sizeof(repeatinterval[0])) - 1))
-#define REPEATTIME(f) ((f)->f_time + repeatinterval[(f)->f_repeatcount])
-#define BACKOFF(f) { if (++(f)->f_repeatcount > MAXREPEAT) \
- (f)->f_repeatcount = MAXREPEAT; \
- }
#endif /* #ifndef DIRTY_H_INCLUDED */
diff --git a/doc/imptcp.html b/doc/imptcp.html
index 7e712afa..33b8b13b 100644
--- a/doc/imptcp.html
+++ b/doc/imptcp.html
@@ -13,18 +13,12 @@
<p><b>Description</b>:</p>
<p>Provides the ability to receive syslog messages via plain TCP syslog.
This is a specialised input plugin tailored for high performance on Linux. It will
-probably not run on any other platform. Also, it does no provide TLS services.
+probably not run on any other platform. Also, it does not provide TLS services.
Encryption can be provided by using <a href="rsyslog_stunnel.html">stunnel</a>.
<p>This module has no limit on the number of listeners and sessions that can be used.
-<p>Multiple receivers may be configured by
-specifying $InputPTCPServerRun multiple times.
</p>
<p><b>Configuration Directives</b>:</p>
-<p>This plugin has config directives similar named as imtcp, but they all have <b>P</b>TCP in
-their name instead of just TCP. Note that only a subset of the parameters are supported.
-<ul>
-
<p><b>Global Directives</b>:</p>
<ul>
<li>Threads &lt;number&gt;<br>
@@ -91,6 +85,13 @@ the message was received from.
Binds specified ruleset to next server defined.
<li><b>Address</b> &lt;name&gt;<br>
On multi-homed machines, specifies to which local address the listerner should be bound.
+<li><b>RateLimit.Interval</b> [number] - (available since 7.3.1) specifies the rate-limiting
+interval in seconds. Default value is 0, which turns off rate limiting. Set it to a number
+of seconds (5 recommended) to activate rate-limiting.
+</li>
+<li><b>RateLimit.Burst</b> [number] - (available since 7.3.1) specifies the rate-limiting
+burst in number of messages. Default is 10,000.
+</li>
</ul>
<b>Caveats/Known Bugs:</b>
<ul>
diff --git a/doc/imtcp.html b/doc/imtcp.html
index 01ea2802..4bda46ba 100644
--- a/doc/imtcp.html
+++ b/doc/imtcp.html
@@ -17,10 +17,6 @@
Encryption is natively provided by selecting the approprioate network stream driver and
can also be provided by using <a href="rsyslog_stunnel.html">stunnel</a>
(an alternative is the use the <a href="imgssapi.html">imgssapi</a> module).</p>
-<p>Multiple receivers may be configured by specifying
-$InputTCPServerRun multiple times. This is available since version 4.3.1, earlier
-versions do NOT support it.
-</p>
<p><b>Configuration Directives</b>:</p>
<p><b>Global Directives</b>:</p>
@@ -100,6 +96,13 @@ activated. This is the default and should be left unchanged until you know
very well what you do. It may be useful to turn it off, if you know this framing
is not used and some senders emit multi-line messages into the message stream.
</li>
+<li><b>RateLimit.Interval</b> [number] - (available since 7.3.1) specifies the rate-limiting
+interval in seconds. Default value is 0, which turns off rate limiting. Set it to a number
+of seconds (5 recommended) to activate rate-limiting.
+</li>
+<li><b>RateLimit.Burst</b> [number] - (available since 7.3.1) specifies the rate-limiting
+burst in number of messages. Default is 10,000.
+</li>
</ul>
<b>Caveats/Known Bugs:</b>
<ul>
diff --git a/doc/imudp.html b/doc/imudp.html
index b1a3ecc9..e32f9ecf 100644
--- a/doc/imudp.html
+++ b/doc/imudp.html
@@ -47,6 +47,13 @@ default 514, start UDP server on this port. Either a single port can be specifie
<br>Array of ports: Port=["514","515","10514","..."]</li>
<li><b>Ruleset</b> &lt;ruleset&gt;<br>
Binds the listener to a specific <a href="multi_ruleset.html">ruleset</a>.</li>
+<li><b>RateLimit.Interval</b> [number] - (available since 7.3.1) specifies the rate-limiting
+interval in seconds. Default value is 0, which turns off rate limiting. Set it to a number
+of seconds (5 recommended) to activate rate-limiting.
+</li>
+<li><b>RateLimit.Burst</b> [number] - (available since 7.3.1) specifies the rate-limiting
+burst in number of messages. Default is 10,000.
+</li>
</ul>
<b>Caveats/Known Bugs:</b>
<ul>
diff --git a/doc/v7compatibility.html b/doc/v7compatibility.html
index 8834cd54..24c17d72 100644
--- a/doc/v7compatibility.html
+++ b/doc/v7compatibility.html
@@ -51,6 +51,43 @@ errors and error messages. Starting with 7.2.1, this has been reduced to 10
successive failures. This still gives the plugin a chance to recover. In extreme
cases, a plugin may now enter suspend mode where it previously did not do so.
In practice, we do NOT expect that.
+<h1>Notes for the 7.3/7.4 branch</h1>
+<h2>"last message repeated n times" Processing</h2>
+<p>This processing has been optimized and moved to the input side. This results
+in usually far better performance and also de-couples different sources
+from the same
+processing. It is now also integrated in to the more generic rate-limiting
+processing.
+<h3>User-Noticable Changes</h3>
+The code works almost as before, with two exceptions:
+<ul>
+<li>The supression amount can be different, as the new algorithm
+ precisely check's a single source, and while that source is being
+ read. The previous algorithm worked on a set of mixed messages
+ from multiple sources.
+<li>The previous algorithm wrote a "last message repeated n times" message
+ at least every 60 seconds. For performance reasons, we do no longer do
+ this but write this message only when a new message arrives or rsyslog
+ is shut down.
+</ul>
+<p>Note that the new algorithms needs support from input modules. If old
+modules which do not have the necessary support are used, duplicate
+messages will most probably not be detected. Upgrading the module code is
+simple, and all rsyslog-provided plugins support the new method, so this
+should not be a real problem (crafting a solution would result in rather
+complex code - for a case that most probably would never happen).
+<h3>Performance Implications</h3>
+<p>In general, the new method enables far faster output procesing. However, it
+needs to be noted that the "last message repeated n" processing needs parsed
+messages in order to detect duplicated. Consequently, if it is enabled the
+parser step cannot be deferred to the main queue processing thread and
+thus must be done during input processing. The changes workload distribution
+and may have (good or bad) effect on the overall performance. If you have
+a very high performance installation, it is suggested to check the performance
+profile before deploying the new version. Note: for high-performance
+environments it is highly recommended NOT to use "last message repeated n times"
+processing but rather the other (more efficient) rate-limiting methods. These
+also do NOT require the parsing step to be done during input processing.
<p><font size="2">This documentation is part of the
<a href="http://www.rsyslog.com/">rsyslog</a> project.<br>
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c
index 09742537..15948215 100644
--- a/plugins/imdiag/imdiag.c
+++ b/plugins/imdiag/imdiag.c
@@ -53,6 +53,7 @@
#include "srUtils.h"
#include "msg.h"
#include "datetime.h"
+#include "ratelimit.h"
#include "net.h" /* for permittedPeers, may be removed when this is removed */
MODULE_TYPE_INPUT
@@ -200,7 +201,7 @@ finalize_it:
/* actually submit a message to the rsyslog core
*/
static rsRetVal
-doInjectMsg(int iNum)
+doInjectMsg(int iNum, ratelimit_t *ratelimiter)
{
uchar szMsg[1024];
msg_t *pMsg;
@@ -220,7 +221,7 @@ doInjectMsg(int iNum)
pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
MsgSetRcvFrom(pMsg, pRcvDummy);
CHKiRet(MsgSetRcvFromIP(pMsg, pRcvIPDummy));
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(ratelimitAddMsg(ratelimiter, NULL, pMsg));
finalize_it:
RETiRet;
@@ -238,6 +239,7 @@ injectMsg(uchar *pszCmd, tcps_sess_t *pSess)
int iFrom;
int nMsgs;
int i;
+ ratelimit_t *ratelimit;
DEFiRet;
/* we do not check errors here! */
@@ -245,13 +247,15 @@ injectMsg(uchar *pszCmd, tcps_sess_t *pSess)
iFrom = atoi((char*)wordBuf);
getFirstWord(&pszCmd, wordBuf, sizeof(wordBuf)/sizeof(uchar), TO_LOWERCASE);
nMsgs = atoi((char*)wordBuf);
+ ratelimitNew(&ratelimit, "imdiag", "injectmsg");
for(i = 0 ; i < nMsgs ; ++i) {
- doInjectMsg(i + iFrom);
+ doInjectMsg(i + iFrom, ratelimit);
}
CHKiRet(sendResponse(pSess, "%d messages injected\n", nMsgs));
DBGPRINTF("imdiag: %d messages injected\n", nMsgs);
+ ratelimitDestruct(ratelimit);
finalize_it:
RETiRet;
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index 453b6b05..d50f917e 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -48,6 +48,7 @@
#include "prop.h"
#include "stringbuf.h"
#include "ruleset.h"
+#include "ratelimit.h"
MODULE_TYPE_INPUT /* must be present for input modules, do not remove */
MODULE_TYPE_NOKEEP
@@ -82,6 +83,7 @@ typedef struct fileInfo_s {
strm_t *pStrm; /* its stream (NULL if not assigned) */
int readMode; /* which mode to use in ReadMulteLine call? */
ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ ratelimit_t *ratelimiter;
multi_submit_t multiSub;
} fileInfo_t;
@@ -189,9 +191,7 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine)
pMsg->iFacility = LOG_FAC(pInfo->iFacility);
pMsg->iSeverity = LOG_PRI(pInfo->iSeverity);
MsgSetRuleset(pMsg, pInfo->pRuleset);
- pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg;
- if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem)
- CHKiRet(multiSubmitMsg(&pInfo->multiSub));
+ ratelimitAddMsg(pInfo->ratelimiter, &pInfo->multiSub, pMsg);
finalize_it:
RETiRet;
}
@@ -304,18 +304,7 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
}
finalize_it:
- if(pThis->multiSub.nElem > 0) {
- /* submit everything that was not yet submitted */
- CHKiRet(multiSubmitMsg(&pThis->multiSub));
- }
- ; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */
- /* Note: the problem above is that pthread:cleanup_pop() is a macro which
- * evaluates to something like "} while(0);". So the code would become
- * "finalize_it: }", that is a label without a statement. The C standard does
- * not permit this. So we add an empty statement "finalize_it: ; }" and
- * everybody is happy. Note that without the ;, an error is reported only
- * on some platforms/compiler versions. -- rgerhards, 2008-08-15
- */
+ multiSubmitFlush(&pThis->multiSub);
pthread_cleanup_pop(0);
if(pCStr != NULL) {
@@ -423,6 +412,7 @@ addListner(instanceConf_t *inst)
pThis->lenTag = ustrlen(pThis->pszTag);
pThis->pszStateFile = (uchar*) strdup((char*) inst->pszStateFile);
+ CHKiRet(ratelimitNew(&pThis->ratelimiter, "imfile", (char*)inst->pszFileName));
CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(inst->nMultiSub * sizeof(msg_t*)));
pThis->multiSub.maxElem = inst->nMultiSub;
pThis->multiSub.nElem = 0;
@@ -773,6 +763,8 @@ CODESTARTafterRun
persistStrmState(&files[i]);
strm.Destruct(&(files[i].pStrm));
}
+ ratelimitDestruct(files[i].ratelimiter);
+ free(files[i].multiSub.ppMsgs);
free(files[i].pszFileName);
free(files[i].pszTag);
free(files[i].pszStateFile);
diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c
index 2897d76d..6eed33fe 100644
--- a/plugins/imklog/imklog.c
+++ b/plugins/imklog/imklog.c
@@ -101,11 +101,8 @@ static struct cnfparamblk modpblk =
modpdescr
};
-
-
static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this module */
-static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 */
-
+static prop_t *pLocalHostIP = NULL;
static inline void
initConfigSettings(void)
@@ -148,7 +145,8 @@ enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval *
MsgSetTAG(pMsg, pszTag, ustrlen(pszTag));
pMsg->iFacility = iFacility;
pMsg->iSeverity = iSeverity;
- CHKiRet(submitMsg(pMsg));
+ /* note: we do NOT use rate-limiting, as the kernel itself does rate-limiting */
+ CHKiRet(submitMsg2(pMsg));
finalize_it:
RETiRet;
diff --git a/plugins/imkmsg/imkmsg.c b/plugins/imkmsg/imkmsg.c
index 2a97f82d..d1a83879 100644
--- a/plugins/imkmsg/imkmsg.c
+++ b/plugins/imkmsg/imkmsg.c
@@ -113,7 +113,7 @@ enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval *
pMsg->iFacility = iFacility;
pMsg->iSeverity = iSeverity;
pMsg->json = json;
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(submitMsg(pMsg, NULL));
finalize_it:
RETiRet;
diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c
index 62599969..9bd11556 100644
--- a/plugins/impstats/impstats.c
+++ b/plugins/impstats/impstats.c
@@ -138,7 +138,8 @@ doSubmitMsg(uchar *line)
pMsg->iSeverity = runModConf->iSeverity;
pMsg->msgFlags = 0;
- submitMsg(pMsg);
+ /* we do not use rate-limiting, as the stats message always need to be emitted */
+ submitMsg2(pMsg);
finalize_it:
RETiRet;
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 8150fc33..0475e219 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -67,6 +67,7 @@
#include "ruleset.h"
#include "msg.h"
#include "statsobj.h"
+#include "ratelimit.h"
#include "net.h" /* for permittedPeers, may be removed when this is removed */
/* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */
@@ -121,6 +122,8 @@ struct instanceConf_s {
uchar *pszBindRuleset; /* name of ruleset to bind to */
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ int ratelimitInterval;
+ int ratelimitBurst;
struct instanceConf_s *next;
};
@@ -158,6 +161,8 @@ static struct cnfparamdescr inppdescr[] = {
{ "keepalive.time", eCmdHdlrInt, 0 },
{ "keepalive.interval", eCmdHdlrInt, 0 },
{ "addtlframedelimiter", eCmdHdlrInt, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -195,6 +200,7 @@ struct ptcpsrv_s {
sbool bKeepAlive; /* support keep-alive packets */
sbool bEmitMsgOnClose;
sbool bSuppOctetFram;
+ ratelimit_t *ratelimiter;
};
/* the ptcp session object. Describes a single active session.
@@ -295,6 +301,7 @@ destructSess(ptcpsess_t *pSess)
static void
destructSrv(ptcpsrv_t *pSrv)
{
+ ratelimitDestruct(pSrv->ratelimiter);
prop.Destruct(&pSrv->pInputName);
pthread_mutex_destroy(&pSrv->mutSessLst);
free(pSrv->pszInputName);
@@ -679,14 +686,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult
MsgSetRuleset(pMsg, pSrv->pRuleset);
STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit);
- if(pMultiSub == NULL) {
- CHKiRet(submitMsg(pMsg));
- } else {
- pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
- if(pMultiSub->nElem == pMultiSub->maxElem)
- CHKiRet(multiSubmitMsg(pMultiSub));
- }
-
+ ratelimitAddMsg(pSrv->ratelimiter, pMultiSub, pMsg);
finalize_it:
/* reset status variables */
@@ -805,12 +805,11 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG
* we have just received a bunch of data! -- rgerhards, 2009-06-16
* EXTRACT from tcps_sess.c
*/
-#define NUM_MULTISUB 1024
static rsRetVal
DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
{
multi_submit_t multiSub;
- msg_t *pMsgs[NUM_MULTISUB];
+ msg_t *pMsgs[CONF_NUM_MULTISUB];
struct syslogTime stTime;
time_t ttGenTime;
char *pEnd;
@@ -821,7 +820,7 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
datetime.getCurrTime(&stTime, &ttGenTime);
multiSub.ppMsgs = pMsgs;
- multiSub.maxElem = NUM_MULTISUB;
+ multiSub.maxElem = CONF_NUM_MULTISUB;
multiSub.nElem = 0;
/* We now copy the message to the session buffer. */
@@ -831,15 +830,11 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub));
}
- if(multiSub.nElem > 0) {
- /* submit anything that was not yet submitted */
- CHKiRet(multiSubmitMsg(&multiSub));
- }
+ iRet = multiSubmitFlush(&multiSub);
finalize_it:
RETiRet;
}
-#undef NUM_MULTISUB
/****************************************** --END-- TCP SUPPORT FUNCTIONS ***********************************/
@@ -1051,6 +1046,8 @@ createInstance(instanceConf_t **pinst)
inst->bEmitMsgOnClose = 0;
inst->iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
inst->pBindRuleset = NULL;
+ inst->ratelimitBurst = 10000; /* arbitrary high limit */
+ inst->ratelimitInterval = 0; /* off */
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -1130,6 +1127,9 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
pSrv->iKeepAliveProbes = inst->iKeepAliveProbes;
pSrv->iKeepAliveTime = inst->iKeepAliveTime;
pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose;
+ CHKiRet(ratelimitNew(&pSrv->ratelimiter, "imtcp", (char*)inst->pszBindPort));
+ ratelimitSetLinuxLike(pSrv->ratelimiter, inst->ratelimitInterval, inst->ratelimitBurst);
+ ratelimitSetThreadSafe(pSrv->ratelimiter);
CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort));
pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim;
if(inst->pszBindAddr == NULL)
@@ -1458,6 +1458,10 @@ CODESTARTnewInpInst
inst->iAddtlFrameDelim = (int) pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "notifyonconnectionclose")) {
inst->bEmitMsgOnClose = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
+ inst->ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) {
+ inst->ratelimitInterval = (int) pvals[i].val.d.n;
} else {
dbgprintf("imptcp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c
index fe987a50..31f82b14 100644
--- a/plugins/imrelp/imrelp.c
+++ b/plugins/imrelp/imrelp.c
@@ -113,11 +113,29 @@ static struct cnfparamblk inppblk =
* we will only see the hostname (twice). -- rgerhards, 2009-10-14
*/
static relpRetVal
-onSyslogRcv(uchar *pHostname, uchar *pIP, uchar *pMsg, size_t lenMsg)
+onSyslogRcv(uchar *pHostname, uchar *pIP, uchar *msg, size_t lenMsg)
{
+ prop_t *pProp = NULL;
+ msg_t *pMsg;
DEFiRet;
- parseAndSubmitMessage(pHostname, pIP, pMsg, lenMsg, PARSE_HOSTNAME,
- eFLOWCTL_LIGHT_DELAY, pInputName, NULL, 0, runModConf->pBindRuleset);
+
+ CHKiRet(msgConstruct(&pMsg));
+ MsgSetInputName(pMsg, pInputName);
+ MsgSetRawMsg(pMsg, (char*)msg, lenMsg);
+ MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY);
+ MsgSetRuleset(pMsg, runModConf->pBindRuleset);
+ pMsg->msgFlags = PARSE_HOSTNAME | NEEDS_PARSING;
+
+ /* TODO: optimize this, we can store it inside the session, requires
+ * changes to librelp --> next librelp iteration?. rgerhards, 2012-10-29
+ */
+ MsgSetRcvFromStr(pMsg, pHostname, ustrlen(pHostname), &pProp);
+ CHKiRet(prop.Destruct(&pProp));
+ CHKiRet(MsgSetRcvFromIPStr(pMsg, pIP, ustrlen(pIP), &pProp));
+ CHKiRet(prop.Destruct(&pProp));
+ CHKiRet(submitMsg2(pMsg));
+
+finalize_it:
RETiRet;
}
diff --git a/plugins/imsolaris/imsolaris.c b/plugins/imsolaris/imsolaris.c
index a220e72a..1e7d9b0f 100644
--- a/plugins/imsolaris/imsolaris.c
+++ b/plugins/imsolaris/imsolaris.c
@@ -212,7 +212,7 @@ readLog(int fd, uchar *pRcv, int iMaxLine)
pMsg->iFacility = LOG_FAC(hdr.pri);
pMsg->iSeverity = LOG_PRI(hdr.pri);
pMsg->msgFlags = NEEDS_PARSING | NO_PRI_IN_RAW;
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(submitMsg(pMsg, NULL));
}
finalize_it:
diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c
index eaf9a213..8d71d5f2 100644
--- a/plugins/imtcp/imtcp.c
+++ b/plugins/imtcp/imtcp.c
@@ -105,6 +105,8 @@ struct instanceConf_s {
uchar *pszBindRuleset; /* name of ruleset to bind to */
ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
+ int ratelimitInterval;
+ int ratelimitBurst;
int bSuppOctetFram;
struct instanceConf_s *next;
};
@@ -155,7 +157,9 @@ static struct cnfparamdescr inppdescr[] = {
{ "port", eCmdHdlrString, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */
{ "name", eCmdHdlrString, 0 },
{ "ruleset", eCmdHdlrString, 0 },
- { "supportOctetCountedFraming", eCmdHdlrBinary, 0 }
+ { "supportOctetCountedFraming", eCmdHdlrBinary, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -251,6 +255,8 @@ createInstance(instanceConf_t **pinst)
inst->pszBindRuleset = NULL;
inst->pszInputName = NULL;
inst->bSuppOctetFram = 1;
+ inst->ratelimitInterval = 0;
+ inst->ratelimitBurst = 10000;
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -334,6 +340,7 @@ addListner(modConfData_t *modConf, instanceConf_t *inst)
CHKiRet(tcpsrv.SetRuleset(pOurTcpsrv, inst->pBindRuleset));
CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, inst->pszInputName == NULL ?
UCHAR_CONSTANT("imtcp") : inst->pszInputName));
+ CHKiRet(tcpsrv.SetLinuxLikeRatelimiters(pOurTcpsrv, inst->ratelimitInterval, inst->ratelimitBurst));
tcpsrv.configureTCPListen(pOurTcpsrv, inst->pszBindPort, inst->bSuppOctetFram);
finalize_it:
@@ -376,6 +383,10 @@ CODESTARTnewInpInst
inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(inppblk.descr[i].name, "supportOctetCountedFraming")) {
inst->bSuppOctetFram = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
+ inst->ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) {
+ inst->ratelimitInterval = (int) pvals[i].val.d.n;
} else {
dbgprintf("imtcp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
@@ -580,7 +591,9 @@ ENDwillRun
BEGINafterRun
CODESTARTafterRun
- /* do cleanup here */
+ if(pOurTcpsrv != NULL)
+ iRet = tcpsrv.Destruct(&pOurTcpsrv);
+
net.clearAllowedSenders(UCHAR_CONSTANT("TCP"));
ENDafterRun
@@ -594,9 +607,6 @@ ENDisCompatibleWithFeature
BEGINmodExit
CODESTARTmodExit
- if(pOurTcpsrv != NULL)
- iRet = tcpsrv.Destruct(&pOurTcpsrv);
-
if(pPermPeersRoot != NULL) {
net.DestructPermittedPeers(&pPermPeersRoot);
}
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 782d7bee..9b6409c1 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -4,8 +4,6 @@
* NOTE: read comments in module-template.h to understand how this file
* works!
*
- * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c)
- *
* Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
@@ -53,6 +51,7 @@
#include "prop.h"
#include "ruleset.h"
#include "statsobj.h"
+#include "ratelimit.h"
#include "unicode-helper.h"
MODULE_TYPE_INPUT
@@ -77,6 +76,7 @@ static struct lstn_s {
int sock; /* socket */
ruleset_t *pRuleset; /* bound ruleset */
statsobj_t *stats; /* listener stats */
+ ratelimit_t *ratelimiter;
STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
} *lcnfRoot = NULL, *lcnfLast = NULL;
@@ -109,6 +109,8 @@ struct instanceConf_s {
uchar *pszBindPort; /* Port to bind socket to */
uchar *pszBindRuleset; /* name of ruleset to bind to */
ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ int ratelimitInterval;
+ int ratelimitBurst;
struct instanceConf_s *next;
};
@@ -140,7 +142,9 @@ static struct cnfparamblk modpblk =
static struct cnfparamdescr inppdescr[] = {
{ "port", eCmdHdlrArray, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */
{ "address", eCmdHdlrString, 0 },
- { "ruleset", eCmdHdlrString, 0 }
+ { "ruleset", eCmdHdlrString, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -165,6 +169,8 @@ createInstance(instanceConf_t **pinst)
inst->pszBindPort = NULL;
inst->pszBindAddr = NULL;
inst->pszBindRuleset = NULL;
+ inst->ratelimitBurst = 10000; /* arbitrary high limit */
+ inst->ratelimitInterval = 0; /* off */
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -223,7 +229,7 @@ addListner(instanceConf_t *inst)
struct lstn_s *newlcnfinfo;
uchar *bindName;
uchar *port;
- uchar statname[64];
+ uchar dispname[64];
/* check which address to bind to. We could do this more compact, but have not
* done so in order to make the code more readable. -- rgerhards, 2007-12-27
@@ -248,11 +254,14 @@ addListner(instanceConf_t *inst)
newlcnfinfo->next = NULL;
newlcnfinfo->sock = newSocks[iSrc];
newlcnfinfo->pRuleset = inst->pBindRuleset;
+ snprintf((char*)dispname, sizeof(dispname), "imudp(%s:%s)", bindName, port);
+ dispname[sizeof(dispname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter, (char*)dispname, NULL));
+ ratelimitSetLinuxLike(newlcnfinfo->ratelimiter, inst->ratelimitInterval,
+ inst->ratelimitBurst);
/* support statistics gathering */
CHKiRet(statsobj.Construct(&(newlcnfinfo->stats)));
- snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port);
- statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
- CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname));
+ CHKiRet(statsobj.SetName(newlcnfinfo->stats, dispname));
STATSCOUNTER_INIT(newlcnfinfo->ctrSubmit, newlcnfinfo->mutCtrSubmit);
CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"),
ctrType_IntCtr, &(newlcnfinfo->ctrSubmit)));
@@ -304,7 +313,6 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta
static inline rsRetVal
processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted)
{
- DEFiRet;
int iNbrTimeUsed;
time_t ttGenTime;
struct syslogTime stTime;
@@ -314,9 +322,15 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
msg_t *pMsg;
prop_t *propFromHost = NULL;
prop_t *propFromHostIP = NULL;
+ multi_submit_t multiSub;
+ msg_t *pMsgs[CONF_NUM_MULTISUB];
char errStr[1024];
+ DEFiRet;
assert(pThrd != NULL);
+ multiSub.ppMsgs = pMsgs;
+ multiSub.maxElem = CONF_NUM_MULTISUB;
+ multiSub.nElem = 0;
iNbrTimeUsed = 0;
while(1) { /* loop is terminated if we have a bad receive, done below in the body */
if(pThrd->bShallStop == RSTRUE)
@@ -383,12 +397,15 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
if(*pbIsPermitted == 2)
pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */
CHKiRet(msgSetFromSockinfo(pMsg, &frominet));
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(ratelimitAddMsg(lstn->ratelimiter, &multiSub, pMsg));
STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit);
}
}
+
finalize_it:
+ multiSubmitFlush(&multiSub);
+
if(propFromHost != NULL)
prop.Destruct(&propFromHost);
if(propFromHostIP != NULL)
@@ -682,6 +699,10 @@ createListner(es_str_t *port, struct cnfparamvals *pvals)
inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(inppblk.descr[i].name, "ruleset")) {
inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
+ inst->ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) {
+ inst->ratelimitInterval = (int) pvals[i].val.d.n;
} else {
dbgprintf("imudp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
@@ -884,6 +905,7 @@ CODESTARTafterRun
net.clearAllowedSenders((uchar*)"UDP");
for(lstn = lcnfRoot ; lstn != NULL ; ) {
statsobj.Destruct(&(lstn->stats));
+ ratelimitDestruct(lstn->ratelimiter);
close(lstn->sock);
lstnDel = lstn;
lstn = lstn->next;
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index d5e4bb31..1b7dface 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -55,6 +55,7 @@
#include "statsobj.h"
#include "datetime.h"
#include "hashtable.h"
+#include "ratelimit.h"
MODULE_TYPE_INPUT
MODULE_TYPE_NOKEEP
@@ -105,15 +106,6 @@ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
STATSCOUNTER_DEF(ctrLostRatelimit, mutCtrLostRatelimit)
STATSCOUNTER_DEF(ctrNumRatelimiters, mutCtrNumRatelimiters)
-struct rs_ratelimit_state {
- unsigned short interval;
- unsigned short burst;
- unsigned done;
- unsigned missed;
- time_t begin;
-};
-typedef struct rs_ratelimit_state rs_ratelimit_state_t;
-
/* a very simple "hash function" for process IDs - we simply use the
* pid itself: it is quite expected that all pids may log some time, but
@@ -143,6 +135,7 @@ typedef struct lstn_s {
int flowCtl; /* flow control settings for this socket */
int ratelimitInterval;
int ratelimitBurst;
+ ratelimit_t *dflt_ratelimiter;/*ratelimiter to apply if none else is to be used */
intTiny ratelimitSev; /* severity level (and below) for which rate-limiting shall apply */
struct hashtable *ht; /* our hashtable for rate-limiting */
sbool bParseHost; /* should parser parse host name? read-only after startup */
@@ -271,74 +264,9 @@ static struct cnfparamblk inppblk =
/* we do not use this, because we do not bind to a ruleset so far
* enable when this is changed: #include "im-helper.h" */ /* must be included AFTER the type definitions! */
-static void
-initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsigned short burst)
-{
- rs->interval = interval;
- rs->burst = burst;
- rs->done = 0;
- rs->missed = 0;
- rs->begin = 0;
-}
-
static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
-/* ratelimiting support, modelled after the linux kernel
- * returns 1 if message is within rate limit and shall be
- * processed, 0 otherwise.
- * This implementation is NOT THREAD-SAFE and must not
- * be called concurrently.
- */
-static inline int
-withinRatelimit(struct rs_ratelimit_state *rs, time_t tt, pid_t pid)
-{
- int ret;
- uchar msgbuf[1024];
-
- if(rs->interval == 0) {
- ret = 1;
- goto finalize_it;
- }
-
- assert(rs->burst != 0);
-
- if(rs->begin == 0)
- rs->begin = tt;
-
- /* resume if we go out of out time window */
- if(tt > rs->begin + rs->interval) {
- if(rs->missed) {
- snprintf((char*)msgbuf, sizeof(msgbuf),
- "imuxsock lost %u messages from pid %lu due to rate-limiting",
- rs->missed, (unsigned long) pid);
- logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
- rs->missed = 0;
- }
- rs->begin = 0;
- rs->done = 0;
- }
-
- /* do actual limit check */
- if(rs->burst > rs->done) {
- rs->done++;
- ret = 1;
- } else {
- if(rs->missed == 0) {
- snprintf((char*)msgbuf, sizeof(msgbuf),
- "imuxsock begins to drop messages from pid %lu due to rate-limiting",
- (unsigned long) pid);
- logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
- }
- rs->missed++;
- ret = 0;
- }
-
-finalize_it:
- return ret;
-}
-
-
/* create input instance, set default paramters, and
* add it to the list of instances.
*/
@@ -445,7 +373,8 @@ addListner(instanceConf_t *inst)
CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName));
}
if(inst->ratelimitInterval > 0) {
- if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL)) == NULL) {
+ if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn,
+ (void(*)(void*))ratelimitDestruct)) == NULL) {
/* in this case, we simply turn off rate-limiting */
DBGPRINTF("imuxsock: turning off rate limiting because we could not "
"create hash table\n");
@@ -464,6 +393,10 @@ addListner(instanceConf_t *inst)
listeners[nfd].bParseTrusted = inst->bParseTrusted;
listeners[nfd].bWritePid = inst->bWritePid;
listeners[nfd].bUseSysTimeStamp = inst->bUseSysTimeStamp;
+ CHKiRet(ratelimitNew(&listeners[nfd].dflt_ratelimiter, "imuxsock", NULL));
+ ratelimitSetLinuxLike(listeners[nfd].dflt_ratelimiter,
+ listeners[nfd].ratelimitInterval,
+ listeners[nfd].ratelimitBurst);
nfd++;
} else {
errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n",
@@ -475,7 +408,7 @@ finalize_it:
}
-/* discard all log sockets except for "socket" 0. Data for it comes from
+/* discard/Destruct all log sockets except for "socket" 0. Data for it comes from
* the constant memory pool - and if not, it is freeed via some other pointer.
*/
static rsRetVal discardLogSockets(void)
@@ -493,6 +426,7 @@ static rsRetVal discardLogSockets(void)
if(listeners[i].ht != NULL) {
hashtable_destroy(listeners[i].ht, 1); /* 1 => free all values automatically */
}
+ ratelimitDestruct(listeners[i].dflt_ratelimiter);
}
return RS_RET_OK;
@@ -604,19 +538,22 @@ finalize_it:
* listener (the latter being a performance enhancement).
*/
static inline rsRetVal
-findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
+findRatelimiter(lstn_t *pLstn, struct ucred *cred, ratelimit_t **prl)
{
- rs_ratelimit_state_t *rl;
+ ratelimit_t *rl;
int r;
pid_t *keybuf;
+ char pidbuf[256];
DEFiRet;
if(cred == NULL)
FINALIZE;
+#if 0 // TODO: check deactivated?
if(pLstn->ratelimitInterval == 0) {
*prl = NULL;
FINALIZE;
}
+#endif
rl = hashtable_search(pLstn->ht, &cred->pid);
if(rl == NULL) {
@@ -624,10 +561,13 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
DBGPRINTF("imuxsock: no ratelimiter for pid %lu, creating one\n",
(unsigned long) cred->pid);
STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters);
- CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t)));
+ snprintf(pidbuf, sizeof(pidbuf), "pid %lu",
+ (unsigned long) cred->pid);
+ pidbuf[sizeof(pidbuf)-1] = '\0'; /* to be on safe side */
+ CHKiRet(ratelimitNew(&rl, "imuxsock", pidbuf));
+ ratelimitSetLinuxLike(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst);
CHKmalloc(keybuf = malloc(sizeof(pid_t)));
*keybuf = cred->pid;
- initRatelimitState(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst);
r = hashtable_insert(pLstn->ht, keybuf, rl);
if(r == 0)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -636,6 +576,8 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
*prl = rl;
finalize_it:
+ if(*prl == NULL)
+ *prl = pLstn->dflt_ratelimiter;
RETiRet;
}
@@ -762,28 +704,6 @@ copyescaped(uchar *dstbuf, uchar *inbuf, int inlen)
}
-#if 0
-/* Creates new field to be added to event
- * used for SystemLogParseTrusted parsing
- */
-struct ee_field *
-createNewField(char *fieldname, char *value, int lenValue) {
- es_str_t *newStr;
- struct ee_value *newVal;
- struct ee_field *newField;
-
- newStr = es_newStrFromBuf(value, (es_size_t) lenValue);
-
- newVal = ee_newValue(ctxee);
- ee_setStrValue(newVal, newStr);
-
- newField = ee_newFieldFromNV(ctxee, fieldname, newVal);
-
- return newField;
-}
-#endif
-
-
/* submit received message to the queue engine
* We now parse the message according to expected format so that we
* can also mangle it if necessary.
@@ -802,8 +722,8 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
uchar bufParseTAG[CONF_TAG_MAXSIZE];
struct syslogTime st;
time_t tt;
- rs_ratelimit_state_t *ratelimiter = NULL;
int lenProp;
+ ratelimit_t *ratelimiter = NULL;
uchar propBuf[1024];
uchar msgbuf[8192];
uchar *pmsgbuf;
@@ -842,10 +762,12 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
tt = ts->tv_sec;
}
+#if 0 // TODO: think about stats counters (or wait for request...?)
if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) {
STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit);
FINALIZE;
}
+#endif
/* created trusted properties */
if(cred != NULL && pLstn->bAnnotate) {
@@ -976,8 +898,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName);
CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP));
- CHKiRet(submitMsg(pMsg));
-
+ ratelimitAddMsg(ratelimiter, NULL, pMsg);
STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit);
finalize_it:
RETiRet;
diff --git a/plugins/omruleset/omruleset.c b/plugins/omruleset/omruleset.c
index 6c770c94..fd002265 100644
--- a/plugins/omruleset/omruleset.c
+++ b/plugins/omruleset/omruleset.c
@@ -120,7 +120,11 @@ CODESTARTdoAction
(char*) pData->pszRulesetName, pData->pRuleset);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
MsgSetRuleset(pMsg, pData->pRuleset);
- submitMsg(pMsg);
+ /* Note: we intentionally use submitMsg2() here, as we process messages
+ * that were already run through the rate-limiter. So it is (at least)
+ * questionable if they were rate-limited again.
+ */
+ submitMsg2(pMsg);
finalize_it:
ENDdoAction
diff --git a/runtime/Makefile.am b/runtime/Makefile.am
index c957782f..fbc92d9c 100644
--- a/runtime/Makefile.am
+++ b/runtime/Makefile.am
@@ -65,6 +65,8 @@ librsyslog_la_SOURCES = \
ruleset.h \
prop.c \
prop.h \
+ ratelimit.c \
+ ratelimit.h \
cfsysline.c \
cfsysline.h \
sd-daemon.c \
diff --git a/runtime/conf.c b/runtime/conf.c
index 23fb6bbd..c97391c6 100644
--- a/runtime/conf.c
+++ b/runtime/conf.c
@@ -607,13 +607,8 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction)
if(iRet == RS_RET_OK || iRet == RS_RET_SUSPENDED) {
if((iRet = addAction(&pAction, pMod, pModData, pOMSR, NULL, NULL,
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
- /* now check if the module is compatible with select features */
- if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK)
- pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
- else {
- dbgprintf("module is incompatible with RepeatedMsgReduction - turned off\n");
- pAction->f_ReduceRepeated = 0;
- }
+ /* here check if the module is compatible with select features
+ * (currently, we have no such features!) */
pAction->eState = ACT_STATE_RDY; /* action is enabled */
conf->actions.nbrActions++; /* one more active action! */
}
diff --git a/runtime/glbl.c b/runtime/glbl.c
index a0997829..0e5cac20 100644
--- a/runtime/glbl.c
+++ b/runtime/glbl.c
@@ -210,7 +210,7 @@ setLocalHostIPIF(void __attribute__((unused)) *pVal, uchar *pNewVal)
if(propLocalIPIF != NULL) {
errmsg.LogError(0, RS_RET_ERR, "$LocalHostIPIF is already set "
"and cannot be reset; place it at TOP OF rsyslog.conf!");
- ABORT_FINALIZE(RS_RET_ERR_WRKDIR);
+ ABORT_FINALIZE(RS_RET_ERR);
}
localRet = net.GetIFIPAddr(pNewVal, AF_UNSPEC, myIP, (int) sizeof(myIP));
diff --git a/runtime/queue.c b/runtime/queue.c
index 0cd33701..fbf77108 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -127,7 +127,7 @@ static struct cnfparamblk pblk =
};
/* debug aid */
-static void displayBatchState(batch_t *pBatch)
+static inline void displayBatchState(batch_t *pBatch)
{
int i;
for(i = 0 ; i < pBatch->nElem ; ++i) {
diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c
new file mode 100644
index 00000000..4b618fb5
--- /dev/null
+++ b/runtime/ratelimit.c
@@ -0,0 +1,359 @@
+/* ratelimit.c
+ * support for rate-limiting sources, including "last message
+ * repeated n times" processing.
+ *
+ * Copyright 2012 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+#include "rsyslog.h"
+#include "errmsg.h"
+#include "ratelimit.h"
+#include "datetime.h"
+#include "parser.h"
+#include "unicode-helper.h"
+#include "msg.h"
+#include "rsconf.h"
+#include "dirty.h"
+
+/* definitions for objects we access */
+DEFobjStaticHelpers
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(datetime)
+DEFobjCurrIf(parser)
+
+/* static data */
+
+/* generate a "repeated n times" message */
+static inline msg_t *
+ratelimitGenRepMsg(ratelimit_t *ratelimit)
+{
+ msg_t *repMsg;
+ size_t lenRepMsg;
+ uchar szRepMsg[1024];
+
+ if(ratelimit->nsupp == 1) { /* we simply use the original message! */
+ repMsg = MsgAddRef(ratelimit->pMsg);
+ } else {/* we need to duplicate, original message may still be in use in other
+ * parts of the system! */
+ if((repMsg = MsgDup(ratelimit->pMsg)) == NULL) {
+ DBGPRINTF("Message duplication failed, dropping repeat message.\n");
+ goto done;
+ }
+ lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg),
+ " message repeated %d times: [%.800s]",
+ ratelimit->nsupp, getMSG(ratelimit->pMsg));
+ MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg);
+ }
+
+done: return repMsg;
+}
+
+static inline rsRetVal
+doLastMessageRepeatedNTimes(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg)
+{
+ int bNeedUnlockMutex = 0;
+ rsRetVal localRet;
+ DEFiRet;
+
+ if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
+ if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
+ DBGPRINTF("Message discarded, parsing error %d\n", localRet);
+ ABORT_FINALIZE(RS_RET_DISCARDMSG);
+ }
+ }
+
+ if(ratelimit->bThreadSafe) {
+ pthread_mutex_lock(&ratelimit->mut);
+ bNeedUnlockMutex = 1;
+ }
+
+ if( ratelimit->pMsg != NULL &&
+ getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) &&
+ !ustrcmp(getMSG(pMsg), getMSG(ratelimit->pMsg)) &&
+ !strcmp(getHOSTNAME(pMsg), getHOSTNAME(ratelimit->pMsg)) &&
+ !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(ratelimit->pMsg, LOCK_MUTEX)) &&
+ !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(ratelimit->pMsg, LOCK_MUTEX))) {
+ ratelimit->nsupp++;
+ DBGPRINTF("msg repeated %d times\n", ratelimit->nsupp);
+ /* use current message, so we have the new timestamp
+ * (means we need to discard previous one) */
+ msgDestruct(&ratelimit->pMsg);
+ ratelimit->pMsg = pMsg;
+ ABORT_FINALIZE(RS_RET_DISCARDMSG);
+ } else {/* new message, do "repeat processing" & save it */
+ if(ratelimit->pMsg != NULL) {
+ if(ratelimit->nsupp > 0) {
+ *ppRepMsg = ratelimitGenRepMsg(ratelimit);
+ ratelimit->nsupp = 0;
+ }
+ msgDestruct(&ratelimit->pMsg);
+ }
+ ratelimit->pMsg = MsgAddRef(pMsg);
+ }
+
+finalize_it:
+ if(bNeedUnlockMutex)
+ pthread_mutex_unlock(&ratelimit->mut);
+ RETiRet;
+}
+
+
+/* helper: tell how many messages we lost due to linux-like ratelimiting */
+static inline void
+tellLostCnt(ratelimit_t *ratelimit)
+{
+ uchar msgbuf[1024];
+ if(ratelimit->missed) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "%s: %u messages lost due to rate-limiting",
+ ratelimit->name, ratelimit->missed);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ ratelimit->missed = 0;
+ }
+}
+
+/* Linux-like ratelimiting, modelled after the linux kernel
+ * returns 1 if message is within rate limit and shall be
+ * processed, 0 otherwise.
+ * This implementation is NOT THREAD-SAFE and must not
+ * be called concurrently.
+ */
+static inline int
+withinRatelimit(ratelimit_t *ratelimit, time_t tt)
+{
+ int ret;
+ uchar msgbuf[1024];
+
+ if(ratelimit->interval == 0) {
+ ret = 1;
+ goto finalize_it;
+ }
+
+ assert(ratelimit->burst != 0);
+
+ if(ratelimit->begin == 0)
+ ratelimit->begin = tt;
+
+ /* resume if we go out of out time window */
+ if(tt > ratelimit->begin + ratelimit->interval) {
+ tellLostCnt(ratelimit);
+ ratelimit->begin = 0;
+ ratelimit->done = 0;
+ }
+
+ /* do actual limit check */
+ if(ratelimit->burst > ratelimit->done) {
+ ratelimit->done++;
+ ret = 1;
+ } else {
+ if(ratelimit->missed == 0) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "%s: begin to drop messages due to rate-limiting",
+ ratelimit->name);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ }
+ ratelimit->missed++;
+ ret = 0;
+ }
+
+finalize_it:
+ return ret;
+}
+
+
+/* ratelimit a message, that means:
+ * - handle "last message repeated n times" logic
+ * - handle actual (discarding) rate-limiting
+ * This function returns RS_RET_OK, if the caller shall process
+ * the message regularly and RS_RET_DISCARD if the caller must
+ * discard the message. The caller should also discard the message
+ * if another return status occurs. This places some burden on the
+ * caller logic, but provides best performance. Demanding this
+ * cooperative mode can enable a faulty caller to thrash up part
+ * of the system, but we accept that risk (a faulty caller can
+ * always do all sorts of evil, so...)
+ * If *ppRepMsg != NULL on return, the caller must enqueue that
+ * message before the original message.
+ */
+rsRetVal
+ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg)
+{
+ DEFiRet;
+
+ *ppRepMsg = NULL;
+ if(ratelimit->interval) {
+ if(withinRatelimit(ratelimit, pMsg->ttGenTime) == 0) {
+ msgDestruct(&pMsg);
+ ABORT_FINALIZE(RS_RET_DISCARDMSG);
+ }
+ }
+ if(ratelimit->bReduceRepeatMsgs) {
+ CHKiRet(doLastMessageRepeatedNTimes(ratelimit, pMsg, ppRepMsg));
+ }
+finalize_it:
+ RETiRet;
+}
+
+/* returns 1, if the ratelimiter performs any checks and 0 otherwise */
+int
+ratelimitChecked(ratelimit_t *ratelimit)
+{
+ return ratelimit->interval || ratelimit->bReduceRepeatMsgs;
+}
+
+
+/* add a message to a ratelimiter/multisubmit structure.
+ * ratelimiting is automatically handled according to the ratelimit
+ * settings.
+ * if pMultiSub == NULL, a single-message enqueue happens (under reconsideration)
+ */
+rsRetVal
+ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg)
+{
+ rsRetVal localRet;
+ msg_t *repMsg;
+ DEFiRet;
+
+ if(pMultiSub == NULL) {
+ localRet = ratelimitMsg(ratelimit, pMsg, &repMsg);
+ if(repMsg != NULL)
+ CHKiRet(submitMsg2(repMsg));
+ if(localRet == RS_RET_OK)
+ CHKiRet(submitMsg2(pMsg));
+ } else {
+ localRet = ratelimitMsg(ratelimit, pMsg, &repMsg);
+ if(repMsg != NULL) {
+ pMultiSub->ppMsgs[pMultiSub->nElem++] = repMsg;
+ if(pMultiSub->nElem == pMultiSub->maxElem)
+ CHKiRet(multiSubmitMsg2(pMultiSub));
+ }
+ if(localRet == RS_RET_OK) {
+ pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
+ if(pMultiSub->nElem == pMultiSub->maxElem)
+ CHKiRet(multiSubmitMsg2(pMultiSub));
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* modname must be a static name (usually expected to be the module
+ * name and MUST be present. dynname may be NULL and can be used for
+ * dynamic information, e.g. PID or listener IP, ...
+ * Both values should be kept brief.
+ */
+rsRetVal
+ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname)
+{
+ ratelimit_t *pThis;
+ char namebuf[256];
+ DEFiRet;
+
+ CHKmalloc(pThis = calloc(1, sizeof(ratelimit_t)));
+ if(modname == NULL)
+ modname ="*ERROR:MODULE NAME MISSING*";
+
+ if(dynname == NULL) {
+ pThis->name = strdup(modname);
+ } else {
+ snprintf(namebuf, sizeof(namebuf), "%s[%s]",
+ modname, dynname);
+ namebuf[sizeof(namebuf)-1] = '\0'; /* to be on safe side */
+ pThis->name = strdup(namebuf);
+ }
+ pThis->bReduceRepeatMsgs = loadConf->globals.bReduceRepeatMsgs;
+ *ppThis = pThis;
+finalize_it:
+ RETiRet;
+}
+
+
+/* enable linux-like ratelimiting */
+void
+ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned short burst)
+{
+ ratelimit->interval = interval;
+ ratelimit->burst = burst;
+ ratelimit->done = 0;
+ ratelimit->missed = 0;
+ ratelimit->begin = 0;
+}
+
+
+/* enable thread-safe operations mode. This make sure that
+ * a single ratelimiter can be called from multiple threads. As
+ * this causes some overhead and is not always required, it needs
+ * to be explicitely enabled. This operation cannot be undone
+ * (think: why should one do that???)
+ */
+void
+ratelimitSetThreadSafe(ratelimit_t *ratelimit)
+{
+ ratelimit->bThreadSafe = 1;
+ pthread_mutex_init(&ratelimit->mut, NULL);
+}
+
+void
+ratelimitDestruct(ratelimit_t *ratelimit)
+{
+ msg_t *pMsg;
+ if(ratelimit->pMsg != NULL) {
+ if(ratelimit->nsupp > 0) {
+ pMsg = ratelimitGenRepMsg(ratelimit);
+ if(pMsg != NULL)
+ submitMsg2(pMsg);
+ }
+ msgDestruct(&ratelimit->pMsg);
+ }
+ tellLostCnt(ratelimit);
+ if(ratelimit->bThreadSafe)
+ pthread_mutex_destroy(&ratelimit->mut);
+ free(ratelimit->name);
+ free(ratelimit);
+}
+
+void
+ratelimitModExit(void)
+{
+ objRelease(datetime, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(parser, CORE_COMPONENT);
+}
+
+rsRetVal
+ratelimitModInit(void)
+{
+ DEFiRet;
+ CHKiRet(objGetObjInterface(&obj));
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(parser, CORE_COMPONENT));
+finalize_it:
+ RETiRet;
+}
+
diff --git a/runtime/ratelimit.h b/runtime/ratelimit.h
new file mode 100644
index 00000000..820817bc
--- /dev/null
+++ b/runtime/ratelimit.h
@@ -0,0 +1,51 @@
+/* header for ratelimit.c
+ *
+ * Copyright 2012 Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef INCLUDED_RATELIMIT_H
+#define INCLUDED_RATELIMIT_H
+
+struct ratelimit_s {
+ char *name; /**< rate limiter name, e.g. for user messages */
+ /* support for Linux kernel-type ratelimiting */
+ unsigned short interval;
+ unsigned short burst;
+ unsigned done;
+ unsigned missed;
+ time_t begin;
+ /* support for "last message repeated n times */
+ int bReduceRepeatMsgs; /**< shall we do "last message repeated n times" processing? */
+ unsigned nsupp; /**< nbr of msgs suppressed */
+ msg_t *pMsg;
+ sbool bThreadSafe; /**< do we need to operate in Thread-Safe mode? */
+ pthread_mutex_t mut; /**< mutex if thread-safe operation desired */
+};
+
+/* prototypes */
+rsRetVal ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname);
+void ratelimitSetThreadSafe(ratelimit_t *ratelimit);
+void ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned short burst);
+rsRetVal ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRep);
+rsRetVal ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg);
+void ratelimitDestruct(ratelimit_t *pThis);
+int ratelimitChecked(ratelimit_t *ratelimit);
+rsRetVal ratelimitModInit(void);
+void ratelimitModExit(void);
+
+#endif /* #ifndef INCLUDED_RATELIMIT_H */
diff --git a/runtime/rsconf.c b/runtime/rsconf.c
index dcaa1ad9..0fc67499 100644
--- a/runtime/rsconf.c
+++ b/runtime/rsconf.c
@@ -115,8 +115,8 @@ void cnfDoCfsysline(char *ln);
*/
BEGINobjConstruct(rsconf) /* be sure to specify the object type also in END macro! */
pThis->globals.bDebugPrintTemplateList = 1;
- pThis->globals.bDebugPrintModuleList = 1;
- pThis->globals.bDebugPrintCfSysLineHandlerList = 1;
+ pThis->globals.bDebugPrintModuleList = 0;
+ pThis->globals.bDebugPrintCfSysLineHandlerList = 0;
pThis->globals.bLogStatusMsgs = DFLT_bLogStatusMsgs;
pThis->globals.bErrMsgToStderr = 1;
pThis->globals.umask = -1;
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 7712269a..0bf0642a 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -69,6 +69,7 @@
* approach taken here is considered appropriate.
* rgerhards, 2010-06-24
*/
+#define CONF_NUM_MULTISUB 1024 /* default number of messages per multisub structure */
/* ############################################################# *
* # End Config Settings # *
diff --git a/runtime/stream.c b/runtime/stream.c
index 881f4c74..b8dd5150 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -1176,7 +1176,6 @@ static rsRetVal
doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush)
{
int zRet; /* zlib return state */
- sbool bzInitDone = RSFALSE;
DEFiRet;
unsigned outavail;
assert(pThis != NULL);
@@ -1195,7 +1194,6 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush)
}
pThis->bzInitDone = RSTRUE;
}
- bzInitDone = RSTRUE;
/* now doing the compression */
pThis->zstrm.next_in = (Bytef*) pBuf;
diff --git a/runtime/typedefs.h b/runtime/typedefs.h
index ccae08b2..1e0cb466 100644
--- a/runtime/typedefs.h
+++ b/runtime/typedefs.h
@@ -92,6 +92,7 @@ typedef struct cfgmodules_etry_s cfgmodules_etry_t;
typedef struct outchannels_s outchannels_t;
typedef struct modConfData_s modConfData_t;
typedef struct instanceConf_s instanceConf_t;
+typedef struct ratelimit_s ratelimit_t;
typedef int rs_size_t; /* we do never need more than 2Gig strings, signed permits to
* use -1 as a special flag. */
typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */
diff --git a/tcps_sess.c b/tcps_sess.c
index e7149cb7..16fd94f5 100644
--- a/tcps_sess.c
+++ b/tcps_sess.c
@@ -47,6 +47,7 @@
#include "msg.h"
#include "datetime.h"
#include "prop.h"
+#include "ratelimit.h"
#include "debug.h"
@@ -264,14 +265,7 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG
MsgSetRuleset(pMsg, pThis->pLstnInfo->pRuleset);
STATSCOUNTER_INC(pThis->pLstnInfo->ctrSubmit, pThis->pLstnInfo->mutCtrSubmit);
- if(pMultiSub == NULL) {
- CHKiRet(submitMsg(pMsg));
- } else {
- pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
- if(pMultiSub->nElem == pMultiSub->maxElem)
- CHKiRet(multiSubmitMsg(pMultiSub));
- }
-
+ ratelimitAddMsg(pThis->pLstnInfo->ratelimiter, pMultiSub, pMsg);
finalize_it:
/* reset status variables */
@@ -487,11 +481,7 @@ DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen)
while(pData < pEnd) {
CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub));
}
-
- if(multiSub.nElem > 0) {
- /* submit anything that was not yet submitted */
- CHKiRet(multiSubmitMsg(&multiSub));
- }
+ iRet = multiSubmitFlush(&multiSub);
finalize_it:
RETiRet;
diff --git a/tcpsrv.c b/tcpsrv.c
index bf12f1fa..7ba557e0 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -72,6 +72,7 @@
#include "nspoll.h"
#include "errmsg.h"
#include "ruleset.h"
+#include "ratelimit.h"
#include "unicode-helper.h"
@@ -151,6 +152,9 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort, int bSuppOctetFram)
snprintf((char*)statname, sizeof(statname), "%s(%s)", pThis->pszInputName, pszPort);
statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
CHKiRet(statsobj.SetName(pEntry->stats, statname));
+ CHKiRet(ratelimitNew(&pEntry->ratelimiter, "tcperver", NULL));
+ ratelimitSetLinuxLike(pEntry->ratelimiter, pThis->ratelimitInterval, pThis->ratelimitBurst);
+ ratelimitSetThreadSafe(pEntry->ratelimiter);
STATSCOUNTER_INIT(pEntry->ctrSubmit, pEntry->mutCtrSubmit);
CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"),
ctrType_IntCtr, &(pEntry->ctrSubmit)));
@@ -295,6 +299,7 @@ static void deinit_tcp_listener(tcpsrv_t *pThis)
while(pEntry != NULL) {
free(pEntry->pszPort);
prop.Destruct(&pEntry->pInputName);
+ ratelimitDestruct(pEntry->ratelimiter);
pDel = pEntry;
pEntry = pEntry->pNext;
free(pDel);
@@ -913,6 +918,8 @@ BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macr
pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
pThis->bDisableLFDelim = 0;
pThis->OnMsgReceive = NULL;
+ pThis->ratelimitInterval = 0;
+ pThis->ratelimitBurst = 10000;
pThis->bUseFlowControl = 1;
ENDobjConstruct(tcpsrv)
@@ -1120,6 +1127,17 @@ finalize_it:
}
+/* Set the linux-like ratelimiter settings */
+static rsRetVal
+SetLinuxLikeRatelimiters(tcpsrv_t *pThis, int ratelimitInterval, int ratelimitBurst)
+{
+ DEFiRet;
+ pThis->ratelimitInterval = ratelimitInterval;
+ pThis->ratelimitBurst = ratelimitBurst;
+ RETiRet;
+}
+
+
/* Set the ruleset (ptr) to use */
static rsRetVal
SetRuleset(tcpsrv_t *pThis, ruleset_t *pRuleset)
@@ -1270,6 +1288,7 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->SetCBOnErrClose = SetCBOnErrClose;
pIf->SetOnMsgReceive = SetOnMsgReceive;
pIf->SetRuleset = SetRuleset;
+ pIf->SetLinuxLikeRatelimiters = SetLinuxLikeRatelimiters;
pIf->SetNotificationOnRemoteClose = SetNotificationOnRemoteClose;
finalize_it:
diff --git a/tcpsrv.h b/tcpsrv.h
index d66f682c..93e472c9 100644
--- a/tcpsrv.h
+++ b/tcpsrv.h
@@ -42,6 +42,7 @@ struct tcpLstnPortList_s {
ruleset_t *pRuleset; /**< associated ruleset */
statsobj_t *stats; /**< associated stats object */
sbool bSuppOctetFram; /**< do we support octect-counted framing? (if no->legay only!)*/
+ ratelimit_t *ratelimiter;
STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
tcpLstnPortList_t *pNext; /**< next port or NULL */
};
@@ -70,6 +71,8 @@ struct tcpsrv_s {
int addtlFrameDelim; /**< additional frame delimiter for plain TCP syslog framing (e.g. to handle NetScreen) */
int bDisableLFDelim; /**< if 1, standard LF frame delimiter is disabled (*very dangerous*) */
+ int ratelimitInterval;
+ int ratelimitBurst;
tcps_sess_t **pSessions;/**< array of all of our sessions */
void *pUsr; /**< a user-settable pointer (provides extensibility for "derived classes")*/
/* callbacks */
@@ -142,8 +145,10 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetUseFlowControl)(tcpsrv_t*, int);
/* added v11 -- rgerhards, 2011-05-09 */
rsRetVal (*SetKeepAlive)(tcpsrv_t*, int);
+ /* added v13 -- rgerhards, 2012-10-15 */
+ rsRetVal (*SetLinuxLikeRatelimiters)(tcpsrv_t *pThis, int interval, int burst);
ENDinterface(tcpsrv)
-#define tcpsrvCURR_IF_VERSION 12 /* increment whenever you change the interface structure! */
+#define tcpsrvCURR_IF_VERSION 13 /* increment whenever you change the interface structure! */
/* change for v4:
* - SetAddtlFrameDelim() added -- rgerhards, 2008-12-10
* - SetInputName() added -- rgerhards, 2008-12-10
diff --git a/tests/runtime-dummy.c b/tests/runtime-dummy.c
index 5a9039bf..f6f2d07f 100644
--- a/tests/runtime-dummy.c
+++ b/tests/runtime-dummy.c
@@ -30,7 +30,6 @@
#include "rsyslog.h"
int bReduceRepeatMsgs = 0;
-int repeatinterval = 30;
int bActExecWhenPrevSusp = 0;
int iActExecOnceInterval = 1;
int MarkInterval = 30;
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 05cbfc13..e347794b 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -44,7 +44,6 @@
#include "rsyslog.h"
#define DEFUPRI (LOG_USER|LOG_NOTICE)
-#define TIMERINTVL 30 /* interval for checking flush, mark */
#include <unistd.h>
#include <stdlib.h>
@@ -127,6 +126,7 @@ extern int yydebug; /* interface to flex */
#include "dnscache.h"
#include "sd-daemon.h"
#include "rainerscript.h"
+#include "ratelimit.h"
/* definitions for objects we access */
DEFobjCurrIf(obj)
@@ -205,13 +205,6 @@ static int bFinished = 0; /* used by termination signal handler, read-only excep
*/
int iConfigVerify = 0; /* is this just a config verify run? */
-/* Intervals at which we flush out "message repeated" messages,
- * in seconds after previous message is logged. After each flush,
- * we move to the next interval until we reach the largest.
- * TODO: this shall go into action object! -- rgerhards, 2008-01-29
- */
-int repeatinterval[2] = { 30, 60 }; /* # of secs before flush */
-
#define LIST_DELIMITER ':' /* delimiter between two hosts */
static pid_t ppid; /* This is a quick and dirty hack used for spliting main/startup thread */
@@ -222,6 +215,8 @@ struct queuefilenames_s {
} *queuefilenames = NULL;
+static ratelimit_t *dflt_ratelimiter = NULL; /* ratelimiter for submits without explicit one */
+static ratelimit_t *internalMsg_ratelimiter = NULL; /* ratelimiter for rsyslog-own messages */
int MarkInterval = 20 * 60; /* interval between marks in seconds - read-only after startup */
int send_to_all = 0; /* send message to all IPv4/IPv6 addresses */
static int NoFork = 0; /* don't fork - don't run in daemon mode - read-only after startup */
@@ -413,7 +408,7 @@ parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int fla
CHKiRet(prop.Destruct(&pProp));
CHKiRet(MsgSetRcvFromIPStr(pMsg, hnameIP, ustrlen(hnameIP), &pProp));
CHKiRet(prop.Destruct(&pProp));
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(submitMsg2(pMsg));
finalize_it:
RETiRet;
@@ -433,6 +428,12 @@ submitErrMsg(int iErr, uchar *msg)
}
+static inline rsRetVal
+submitMsgWithDfltRatelimiter(msg_t *pMsg)
+{
+ return ratelimitAddMsg(dflt_ratelimiter, NULL, pMsg);
+}
+
/* rgerhards 2004-11-09: the following is a function that can be used
* to log a message orginating from the syslogd itself.
*/
@@ -484,50 +485,13 @@ logmsgInternal(int iErr, int pri, uchar *msg, int flags)
/* we have the queue, so we can simply provide the
* message to the queue engine.
*/
- submitMsg(pMsg);
+ ratelimitAddMsg(internalMsg_ratelimiter, NULL, pMsg);
+ //submitMsgWithDfltRatelimiter(pMsg);
}
finalize_it:
RETiRet;
}
-/* check message against ACL set
- * rgerhards, 2009-11-16
- */
-#if 0
-static inline rsRetVal
-chkMsgAgainstACL() {
- /* if we reach this point, we had a good receive and can process the packet received */
- /* check if we have a different sender than before, if so, we need to query some new values */
- if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) {
- CHKiRet(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP));
- memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */
- /* Here we check if a host is permitted to send us
- * syslog messages. If it isn't, we do not further
- * process the message but log a warning (if we are
- * configured to do this).
- * rgerhards, 2005-09-26
- */
- *pbIsPermitted = net.isAllowedSender((uchar*)"UDP",
- (struct sockaddr *)&frominet, (char*)fromHostFQDN);
-
- if(!*pbIsPermitted) {
- DBGPRINTF("%s is not an allowed sender\n", (char*)fromHostFQDN);
- if(glbl.GetOption_DisallowWarning) {
- time_t tt;
-
- datetime.GetTime(&tt);
- if(tt > ttLastDiscard + 60) {
- ttLastDiscard = tt;
- errmsg.LogError(0, NO_ERRCODE,
- "UDP message from disallowed sender %s discarded",
- (char*)fromHost);
- }
- }
- }
- }
-}
-#endif
-
/* preprocess a batch of messages, that is ready them for actual processing. This is done
* as a first stage and totally in parallel to any other worker active in the system. So
@@ -620,7 +584,7 @@ int i;
* rgerhards, 2008-02-13
*/
rsRetVal
-submitMsg(msg_t *pMsg)
+submitMsg2(msg_t *pMsg)
{
qqueue_t *pQueue;
ruleset_t *pRuleset;
@@ -633,7 +597,7 @@ submitMsg(msg_t *pMsg)
/* if a plugin logs a message during shutdown, the queue may no longer exist */
if(pQueue == NULL) {
- DBGPRINTF("submitMsg() could not submit message - "
+ DBGPRINTF("submitMsg2() could not submit message - "
"queue does (no longer?) exist - ignored\n");
FINALIZE;
}
@@ -645,13 +609,19 @@ finalize_it:
RETiRet;
}
+rsRetVal
+submitMsg(msg_t *pMsg)
+{
+ return submitMsgWithDfltRatelimiter(pMsg);
+}
+
/* submit multiple messages at once, very similar to submitMsg, just
* for multi_submit_t. All messages need to go into the SAME queue!
* rgerhards, 2009-06-16
*/
rsRetVal
-multiSubmitMsg(multi_submit_t *pMultiSub)
+multiSubmitMsg2(multi_submit_t *pMultiSub)
{
int i;
qqueue_t *pQueue;
@@ -682,8 +652,23 @@ multiSubmitMsg(multi_submit_t *pMultiSub)
finalize_it:
RETiRet;
}
+rsRetVal
+multiSubmitMsg(multi_submit_t *pMultiSub) /* backward compat. level */
+{
+ return multiSubmitMsg2(pMultiSub);
+}
+/* flush multiSubmit, e.g. at end of read records */
+rsRetVal
+multiSubmitFlush(multi_submit_t *pMultiSub)
+{
+ DEFiRet;
+ if(pMultiSub->nElem > 0) {
+ iRet = multiSubmitMsg2(pMultiSub);
+ }
+ RETiRet;
+}
static void
@@ -702,43 +687,6 @@ reapchild()
}
-/* helper to doFlushRptdMsgs() to flush the individual action links via llExecFunc
- * rgerhards, 2007-08-02
- */
-DEFFUNC_llExecFunc(flushRptdMsgsActions)
-{
- action_t *pAction = (action_t*) pData;
- assert(pAction != NULL);
-
- BEGINfunc
- d_pthread_mutex_lock(&pAction->mutAction);
- /* TODO: time() performance: the call below could be moved to
- * the beginn of the llExec(). This makes it slightly less correct, but
- * in an acceptable way. -- rgerhards, 2008-09-16
- */
- if (pAction->f_prevcount && datetime.GetTime(NULL) >= REPEATTIME(pAction)) {
- DBGPRINTF("flush %s: repeated %d times, %d sec.\n",
- module.GetStateName(pAction->pMod), pAction->f_prevcount,
- repeatinterval[pAction->f_repeatcount]);
- actionWriteToAction(pAction);
- BACKOFF(pAction);
- }
- d_pthread_mutex_unlock(&pAction->mutAction);
-
- ENDfunc
- return RS_RET_OK; /* we ignore errors, we can not do anything either way */
-}
-
-
-/* This method flushes repeat messages.
- */
-static void
-doFlushRptdMsgs(void)
-{
- ruleset.IterateAllActions(runConf, flushRptdMsgsActions, NULL);
-}
-
-
static void debug_switch()
{
time_t tTime;
@@ -1264,7 +1212,7 @@ static inline void processImInternal(void)
msg_t *pMsg;
while(iminternalRemoveMsg(&pMsg) == RS_RET_OK) {
- submitMsg(pMsg);
+ submitMsgWithDfltRatelimiter(pMsg);
}
}
@@ -1331,49 +1279,22 @@ mainloop(void)
while(!bFinished){
/* this is now just a wait - please note that we do use a near-"eternal"
- * timeout of 1 day if we do not have repeated message reduction turned on
- * (which it is not by default). This enables us to help safe the environment
+ * timeout of 1 day. This enables us to help safe the environment
* by not unnecessarily awaking rsyslog on a regular tick (just think
* powertop, for example). In that case, we primarily wait for a signal,
* but a once-a-day wakeup should be quite acceptable. -- rgerhards, 2008-06-09
*/
- tvSelectTimeout.tv_sec = (runConf->globals.bReduceRepeatMsgs == 1) ? TIMERINTVL : 86400 /*1 day*/;
- //tvSelectTimeout.tv_sec = TIMERINTVL; /* TODO: change this back to the above code when we have a better solution for apc */
+ tvSelectTimeout.tv_sec = 86400 /*1 day*/;
tvSelectTimeout.tv_usec = 0;
select(1, NULL, NULL, NULL, &tvSelectTimeout);
if(bFinished)
- break; /* exit as quickly as possible - see long comment below */
-
- /* If we received a HUP signal, we call doFlushRptdMsgs() a bit early. This
- * doesn't matter, because doFlushRptdMsgs() checks timestamps. What may happen,
- * however, is that the too-early call may lead to a bit too-late output
- * of "last message repeated n times" messages. But that is quite acceptable.
- * rgerhards, 2007-12-21
- * ... and just to explain, we flush here because that is exactly what the mainloop
- * shall do - provide a periodic interval in which not-yet-flushed messages will
- * be flushed. Be careful, there is a potential race condition: doFlushRptdMsgs()
- * needs to aquire a lock on the action objects. If, however, long-running consumers
- * cause the main queue worker threads to lock them for a long time, we may receive
- * a starvation condition, resulting in the mainloop being held on lock for an extended
- * period of time. That, in turn, could lead to unresponsiveness to termination
- * requests. It is especially important that the bFinished flag is checked before
- * doFlushRptdMsgs() is called (I know because I ran into that situation). I am
- * not yet sure if the remaining probability window of a termination-related
- * problem is large enough to justify changing the code - I would consider it
- * extremely unlikely that the problem ever occurs in practice. Fixing it would
- * require not only a lot of effort but would cost considerable performance. So
- * for the time being, I think the remaining risk can be accepted.
- * rgerhards, 2008-01-10
- */
- if(runConf->globals.bReduceRepeatMsgs == 1)
- doFlushRptdMsgs();
+ break; /* exit as quickly as possible */
if(bHadHUP) {
doHUP();
bHadHUP = 0;
continue;
}
- // TODO: remove execScheduled(); /* handle Apc calls (if any) */
}
ENDfunc
}
@@ -1469,6 +1390,7 @@ InitGlobalClasses(void)
CHKiRet(objUse(net, LM_NET_FILENAME));
dnscacheInit();
initRainerscript();
+ ratelimitModInit();
finalize_it:
if(iRet != RS_RET_OK) {
@@ -1507,6 +1429,7 @@ GlobalClassExit(void)
/* TODO: implement the rest of the deinit */
/* dummy "classes */
strExit();
+ ratelimitModExit();
#if 0
CHKiRet(objGetObjInterface(&obj)); /* this provides the root pointer for all other queries */
@@ -2043,6 +1966,12 @@ int realMain(int argc, char **argv)
}
CHKiRet(localRet);
+ CHKiRet(ratelimitNew(&dflt_ratelimiter, "rsyslogd", "dflt"));
+ /* TODO: add linux-type limiting capability */
+ CHKiRet(ratelimitNew(&internalMsg_ratelimiter, "rsyslogd", "internal_messages"));
+ ratelimitSetLinuxLike(internalMsg_ratelimiter, 5, 500);
+ /* TODO: make internalMsg ratelimit settings configurable */
+
if(bChDirRoot) {
if(chdir("/") != 0)
fprintf(stderr, "Can not do 'cd /' - still trying to run\n");