summaryrefslogtreecommitdiffstats
path: root/runtime/ratelimit.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2012-10-15 16:39:36 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2012-10-15 16:39:36 +0200
commit34a88a7e9a9501593b6fe9f79cc96963c4da7cde (patch)
treedf742c45987b068beeb12e1cdac8996edd79a86e /runtime/ratelimit.c
parent6ab4666622efeb9944bbf7b6e3581d1372465adb (diff)
downloadrsyslog-34a88a7e9a9501593b6fe9f79cc96963c4da7cde.tar.gz
rsyslog-34a88a7e9a9501593b6fe9f79cc96963c4da7cde.tar.bz2
rsyslog-34a88a7e9a9501593b6fe9f79cc96963c4da7cde.zip
ratelimit: added linux-like ratelimiter type
Diffstat (limited to 'runtime/ratelimit.c')
-rw-r--r--runtime/ratelimit.c119
1 files changed, 109 insertions, 10 deletions
diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c
index e94ad1bc..22e785a8 100644
--- a/runtime/ratelimit.c
+++ b/runtime/ratelimit.c
@@ -73,8 +73,16 @@ static inline rsRetVal
doLastMessageRepeatedNTimes(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg)
{
int bNeedUnlockMutex = 0;
+ rsRetVal localRet;
DEFiRet;
+ if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
+ if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
+ DBGPRINTF("Message discarded, parsing error %d\n", localRet);
+ ABORT_FINALIZE(RS_RET_DISCARDMSG);
+ }
+ }
+
if(ratelimit->bThreadSafe) {
pthread_mutex_lock(&ratelimit->mut);
bNeedUnlockMutex = 1;
@@ -110,6 +118,61 @@ finalize_it:
RETiRet;
}
+/* Linux-like ratelimiting, modelled after the linux kernel
+ * returns 1 if message is within rate limit and shall be
+ * processed, 0 otherwise.
+ * This implementation is NOT THREAD-SAFE and must not
+ * be called concurrently.
+ */
+static inline int
+withinRatelimit(ratelimit_t *ratelimit, time_t tt)
+{
+ int ret;
+ uchar msgbuf[1024];
+
+ if(ratelimit->interval == 0) {
+ ret = 1;
+ goto finalize_it;
+ }
+
+ assert(ratelimit->burst != 0);
+
+ if(ratelimit->begin == 0)
+ ratelimit->begin = tt;
+
+ /* resume if we go out of out time window */
+ if(tt > ratelimit->begin + ratelimit->interval) {
+ if(ratelimit->missed) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "%s: %u messages lost due to rate-limiting",
+ ratelimit->name, ratelimit->missed);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ ratelimit->missed = 0;
+ }
+ ratelimit->begin = 0;
+ ratelimit->done = 0;
+ }
+
+ /* do actual limit check */
+ if(ratelimit->burst > ratelimit->done) {
+ ratelimit->done++;
+ ret = 1;
+ } else {
+ if(ratelimit->missed == 0) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "%s: begin to drop messages due to rate-limiting",
+ ratelimit->name);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ }
+ ratelimit->missed++;
+ ret = 0;
+ }
+
+finalize_it:
+ return ret;
+}
+
+
/* ratelimit a message, that means:
* - handle "last message repeated n times" logic
* - handle actual (discarding) rate-limiting
@@ -127,19 +190,13 @@ finalize_it:
rsRetVal
ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg)
{
- rsRetVal localRet;
DEFiRet;
-#warning be sure to parse only when actually required!
- if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
- if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
- DBGPRINTF("Message discarded, parsing error %d\n", localRet);
+ *ppRepMsg = NULL;
+ if(ratelimit->bLinuxLike) {
+ if(withinRatelimit(ratelimit, pMsg->ttGenTime) == 0)
ABORT_FINALIZE(RS_RET_DISCARDMSG);
- }
}
-
- *ppRepMsg = NULL;
-
if(ratelimit->bReduceRepeatMsgs) {
CHKiRet(doLastMessageRepeatedNTimes(ratelimit, pMsg, ppRepMsg));
}
@@ -147,6 +204,13 @@ finalize_it:
RETiRet;
}
+/* returns 1, if the ratelimiter performs any checks and 0 otherwise */
+int
+ratelimitChecked(ratelimit_t *ratelimit)
+{
+ return ratelimit->bLinuxLike || ratelimit->bReduceRepeatMsgs;
+}
+
/* add a message to a ratelimiter/multisubmit structure.
* ratelimiting is automatically handled according to the ratelimit
@@ -184,19 +248,53 @@ finalize_it:
RETiRet;
}
+
+/* modname must be a static name (usually expected to be the module
+ * name and MUST be present. dynname may be NULL and can be used for
+ * dynamic information, e.g. PID or listener IP, ...
+ * Both values should be kept brief.
+ */
rsRetVal
-ratelimitNew(ratelimit_t **ppThis)
+ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname)
{
ratelimit_t *pThis;
+ char namebuf[256];
DEFiRet;
CHKmalloc(pThis = calloc(1, sizeof(ratelimit_t)));
+ if(modname == NULL)
+ modname ="*ERROR:MODULE NAME MISSING*";
+
+ if(dynname == NULL) {
+ pThis->name = strdup(modname);
+ } else {
+ snprintf(namebuf, sizeof(namebuf), "%s[%s]",
+ modname, dynname);
+ namebuf[sizeof(namebuf)-1] = '\0'; /* to be on safe side */
+ pThis->name = strdup(namebuf);
+ }
pThis->bReduceRepeatMsgs = runConf->globals.bReduceRepeatMsgs;
+ if(pThis->bReduceRepeatMsgs)
+ pThis->bActive = 1;
*ppThis = pThis;
finalize_it:
RETiRet;
}
+
+/* enable linux-like ratelimiting */
+void
+ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned short burst)
+{
+ ratelimit->interval = interval;
+ ratelimit->burst = burst;
+ ratelimit->done = 0;
+ ratelimit->missed = 0;
+ ratelimit->begin = 0;
+ ratelimit->bLinuxLike = 1;
+}
+
+
/* enable thread-safe operations mode. This make sure that
* a single ratelimiter can be called from multiple threads. As
* this causes some overhead and is not always required, it needs
@@ -224,6 +322,7 @@ ratelimitDestruct(ratelimit_t *ratelimit)
}
if(ratelimit->bThreadSafe)
pthread_mutex_destroy(&ratelimit->mut);
+ free(ratelimit->name);
free(ratelimit);
}