diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2012-10-15 16:39:36 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2012-10-15 16:39:36 +0200 |
commit | 34a88a7e9a9501593b6fe9f79cc96963c4da7cde (patch) | |
tree | df742c45987b068beeb12e1cdac8996edd79a86e /runtime/ratelimit.c | |
parent | 6ab4666622efeb9944bbf7b6e3581d1372465adb (diff) | |
download | rsyslog-34a88a7e9a9501593b6fe9f79cc96963c4da7cde.tar.gz rsyslog-34a88a7e9a9501593b6fe9f79cc96963c4da7cde.tar.bz2 rsyslog-34a88a7e9a9501593b6fe9f79cc96963c4da7cde.zip |
ratelimit: added linux-like ratelimiter type
Diffstat (limited to 'runtime/ratelimit.c')
-rw-r--r-- | runtime/ratelimit.c | 119 |
1 files changed, 109 insertions, 10 deletions
diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index e94ad1bc..22e785a8 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -73,8 +73,16 @@ static inline rsRetVal doLastMessageRepeatedNTimes(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) { int bNeedUnlockMutex = 0; + rsRetVal localRet; DEFiRet; + if((pMsg->msgFlags & NEEDS_PARSING) != 0) { + if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { + DBGPRINTF("Message discarded, parsing error %d\n", localRet); + ABORT_FINALIZE(RS_RET_DISCARDMSG); + } + } + if(ratelimit->bThreadSafe) { pthread_mutex_lock(&ratelimit->mut); bNeedUnlockMutex = 1; @@ -110,6 +118,61 @@ finalize_it: RETiRet; } +/* Linux-like ratelimiting, modelled after the linux kernel + * returns 1 if message is within rate limit and shall be + * processed, 0 otherwise. + * This implementation is NOT THREAD-SAFE and must not + * be called concurrently. + */ +static inline int +withinRatelimit(ratelimit_t *ratelimit, time_t tt) +{ + int ret; + uchar msgbuf[1024]; + + if(ratelimit->interval == 0) { + ret = 1; + goto finalize_it; + } + + assert(ratelimit->burst != 0); + + if(ratelimit->begin == 0) + ratelimit->begin = tt; + + /* resume if we go out of out time window */ + if(tt > ratelimit->begin + ratelimit->interval) { + if(ratelimit->missed) { + snprintf((char*)msgbuf, sizeof(msgbuf), + "%s: %u messages lost due to rate-limiting", + ratelimit->name, ratelimit->missed); + logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); + ratelimit->missed = 0; + } + ratelimit->begin = 0; + ratelimit->done = 0; + } + + /* do actual limit check */ + if(ratelimit->burst > ratelimit->done) { + ratelimit->done++; + ret = 1; + } else { + if(ratelimit->missed == 0) { + snprintf((char*)msgbuf, sizeof(msgbuf), + "%s: begin to drop messages due to rate-limiting", + ratelimit->name); + logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); + } + ratelimit->missed++; + ret = 0; + } + +finalize_it: + return ret; +} + + /* ratelimit a message, that means: * - handle "last message repeated n times" logic * - handle actual (discarding) rate-limiting @@ -127,19 +190,13 @@ finalize_it: rsRetVal ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) { - rsRetVal localRet; DEFiRet; -#warning be sure to parse only when actually required! - if((pMsg->msgFlags & NEEDS_PARSING) != 0) { - if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { - DBGPRINTF("Message discarded, parsing error %d\n", localRet); + *ppRepMsg = NULL; + if(ratelimit->bLinuxLike) { + if(withinRatelimit(ratelimit, pMsg->ttGenTime) == 0) ABORT_FINALIZE(RS_RET_DISCARDMSG); - } } - - *ppRepMsg = NULL; - if(ratelimit->bReduceRepeatMsgs) { CHKiRet(doLastMessageRepeatedNTimes(ratelimit, pMsg, ppRepMsg)); } @@ -147,6 +204,13 @@ finalize_it: RETiRet; } +/* returns 1, if the ratelimiter performs any checks and 0 otherwise */ +int +ratelimitChecked(ratelimit_t *ratelimit) +{ + return ratelimit->bLinuxLike || ratelimit->bReduceRepeatMsgs; +} + /* add a message to a ratelimiter/multisubmit structure. * ratelimiting is automatically handled according to the ratelimit @@ -184,19 +248,53 @@ finalize_it: RETiRet; } + +/* modname must be a static name (usually expected to be the module + * name and MUST be present. dynname may be NULL and can be used for + * dynamic information, e.g. PID or listener IP, ... + * Both values should be kept brief. + */ rsRetVal -ratelimitNew(ratelimit_t **ppThis) +ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname) { ratelimit_t *pThis; + char namebuf[256]; DEFiRet; CHKmalloc(pThis = calloc(1, sizeof(ratelimit_t))); + if(modname == NULL) + modname ="*ERROR:MODULE NAME MISSING*"; + + if(dynname == NULL) { + pThis->name = strdup(modname); + } else { + snprintf(namebuf, sizeof(namebuf), "%s[%s]", + modname, dynname); + namebuf[sizeof(namebuf)-1] = '\0'; /* to be on safe side */ + pThis->name = strdup(namebuf); + } pThis->bReduceRepeatMsgs = runConf->globals.bReduceRepeatMsgs; + if(pThis->bReduceRepeatMsgs) + pThis->bActive = 1; *ppThis = pThis; finalize_it: RETiRet; } + +/* enable linux-like ratelimiting */ +void +ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned short burst) +{ + ratelimit->interval = interval; + ratelimit->burst = burst; + ratelimit->done = 0; + ratelimit->missed = 0; + ratelimit->begin = 0; + ratelimit->bLinuxLike = 1; +} + + /* enable thread-safe operations mode. This make sure that * a single ratelimiter can be called from multiple threads. As * this causes some overhead and is not always required, it needs @@ -224,6 +322,7 @@ ratelimitDestruct(ratelimit_t *ratelimit) } if(ratelimit->bThreadSafe) pthread_mutex_destroy(&ratelimit->mut); + free(ratelimit->name); free(ratelimit); } |