diff options
-rw-r--r-- | doc/v7compatibility.html | 22 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 14 | ||||
-rw-r--r-- | runtime/ratelimit.c | 127 | ||||
-rw-r--r-- | runtime/ratelimit.h | 6 | ||||
-rw-r--r-- | runtime/rsyslog.h | 1 |
5 files changed, 169 insertions, 1 deletions
diff --git a/doc/v7compatibility.html b/doc/v7compatibility.html index be89f666..bf4c0eba 100644 --- a/doc/v7compatibility.html +++ b/doc/v7compatibility.html @@ -25,6 +25,28 @@ has been implemented. Consequently, situations where the previous behaviour were desired need now to be solved differently. We do not think that this will cause any problems to anyone, especially as in v6 this was announced as a missing feature. +<h2>"last message repeated n times" Processing</h2> +<p>This processing has been optimized and moved to the input side. This results +in far better performance and also de-couples different sources from the same +processing. It is now also integrated in to the more generic rate-limiting +processing. The code works almost as before, with two exceptions: +<ul> +<li>The supression amount can be different, as the new algorithm + precisely check's a single source, and while that source is being + read. The previous algorithm worked on a set of mixed messages + from multiple sources. +<li>The previous algorithm wrote a "last message repeated n times" message + at least every 60 seconds. For performance reasons, we do no longer do + this but write this message only when a new message arrives or rsyslog + is shut down. +</ul> +<p>Note that the new algorithms needs support from input modules. If old +modules which do not have the necessary support are used, duplicate +messages will most probably not be detected. Upgrading the module code is +simple, and all rsyslog-provided plugins support the new method, so this +should not be a real problem (crafting a solution would result in rather +complex code - for a case that most probably would never happen). + <p><font size="2">This documentation is part of the <a href="http://www.rsyslog.com/">rsyslog</a> project.<br> Copyright © 2011-2012 by <a href="http://www.gerhards.net/rainer">Rainer Gerhards</a> and diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index a4933115..392709a0 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -55,6 +55,7 @@ #include "statsobj.h" #include "datetime.h" #include "hashtable.h" +#include "ratelimit.h" MODULE_TYPE_INPUT MODULE_TYPE_NOKEEP @@ -811,6 +812,10 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim struct syslogTime dummyTS; struct json_object *json = NULL, *jval; DEFiRet; +rsRetVal localRet; +static ratelimit_t *ratelimit = NULL; +if(ratelimit == NULL) + ratelimitNew(&ratelimit); /* TODO: handle format errors?? */ /* we need to parse the pri first, because we need the severity for @@ -982,7 +987,14 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName); CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP)); - CHKiRet(submitMsg(pMsg)); + localRet = ratelimitMsg(pMsg, ratelimit); + if(localRet == RS_RET_OK_HAVE_REPMSG) { +dbgprintf("DDDD: doing repeat submit!\n"); + CHKiRet(submitMsg2(ratelimitGetRepeatMsg(ratelimit), NULL)); + localRet = RS_RET_OK; + } + if(localRet == RS_RET_OK) + CHKiRet(submitMsg2(pMsg, NULL)); STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit); finalize_it: diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index dacbc81d..6372602f 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -28,17 +28,143 @@ #include "rsyslog.h" #include "errmsg.h" #include "ratelimit.h" +#include "datetime.h" +#include "unicode-helper.h" +#include "msg.h" /* definitions for objects we access */ DEFobjStaticHelpers DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) +DEFobjCurrIf(datetime) /* static data */ +/* ratelimit a message, that means: + * - handle "last message repeated n times" logic + * - handle actual (discarding) rate-limiting + * This function returns RS_RET_OK, if the caller shall process + * the message regularly and RS_RET_DISCARD if the caller must + * discard the message. The caller should also discard the message + * if another return status occurs. This places some burden on the + * caller logic, but provides best performance. Demanding this + * cooperative mode can enable a faulty caller to thrash up part + * of the system, but we accept that risk (a faulty caller can + * always do all sorts of evil, so...) + * Note that the message pointer may be updated upon return. In + * this case, the ratelimiter is reponsible for handling the + * original message. + */ +rsRetVal +ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) +{ + DEFiRet; + + /* suppress duplicate messages */ + if( ratelimit->pMsg != NULL && + getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) && + !ustrcmp(getMSG(pMsg), getMSG(ratelimit->pMsg)) && + !strcmp(getHOSTNAME(pMsg), getHOSTNAME(ratelimit->pMsg)) && + !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(ratelimit->pMsg, LOCK_MUTEX)) && + !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(ratelimit->pMsg, LOCK_MUTEX))) { + ratelimit->nsupp++; + DBGPRINTF("msg repeated %d times\n", ratelimit->nsupp); + /* use current message, so we have the new timestamp + * (means we need to discard previous one) */ + msgDestruct(&ratelimit->pMsg); + ratelimit->pMsg = MsgAddRef(pMsg); + ABORT_FINALIZE(RS_RET_DISCARDMSG); + } else {/* new message, save it */ + /* first check if we have a previous message stored + * if so, emit and then discard it first + */ + if(ratelimit->pMsg != NULL) { + if(ratelimit->nsupp > 0) { + dbgprintf("DDDD: would need to emit 'repeat' message\n"); + if(ratelimit->repMsg != NULL) { + dbgprintf("ratelimiter: call sequence error, have " + "previous repeat message - discarding\n"); + msgDestruct(&ratelimit->repMsg); + } + ratelimit->repMsg = ratelimit->pMsg; + iRet = RS_RET_OK_HAVE_REPMSG; + } + } + ratelimit->pMsg = MsgAddRef(pMsg); + } + +finalize_it: + RETiRet; +} + +/* return the current repeat message or NULL, if none is present. This MUST + * be called after ratelimitMsg() returned RS_RET_OK_HAVE_REPMSG. It is + * important that the message returned by us is enqueued BEFORE the original + * message, otherwise users my imply different order. + * If a message object is returned, the caller must destruct it if no longer + * needed. + */ +msg_t * +ratelimitGetRepeatMsg(ratelimit_t *ratelimit) +{ + msg_t *repMsg; + size_t lenRepMsg; + uchar szRepMsg[1024]; +dbgprintf("DDDD: in ratelimitGetRepeatMsg()\n"); + + /* we need to duplicate, original message may still be in use in other + * parts of the system! */ + if((repMsg = MsgDup(ratelimit->repMsg)) == NULL) { + DBGPRINTF("Message duplication failed, dropping repeat message.\n"); + goto done; + } + +#warning remove/enable after mailing list feedback +#if 0 + if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */ + lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " last message repeated %d times", + pAction->f_prevcount); + } else { +#endif + lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " message repeated %d times: [%.800s]", + ratelimit->nsupp, getMSG(ratelimit->repMsg)); +// } + + /* We now need to update the other message properties. Please note that digital + * signatures inside the message are invalidated. */ + datetime.getCurrTime(&(repMsg->tRcvdAt), &(repMsg->ttGenTime)); + memcpy(&repMsg->tTIMESTAMP, &repMsg->tRcvdAt, sizeof(struct syslogTime)); + MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg); + + if(ratelimit->repMsg != NULL) { + ratelimit->repMsg = NULL; + ratelimit->nsupp = 0; + } +done: return repMsg; +} + +rsRetVal +ratelimitNew(ratelimit_t **ppThis) +{ + ratelimit_t *pThis; + DEFiRet; + + CHKmalloc(pThis = calloc(1, sizeof(ratelimit_t))); + *ppThis = pThis; +finalize_it: + RETiRet; +} + +void +ratelimitDestruct(ratelimit_t *pThis) +{ + free(pThis); +} + void ratelimitModExit(void) { + objRelease(datetime, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); objRelease(errmsg, CORE_COMPONENT); } @@ -49,6 +175,7 @@ ratelimitModInit(void) DEFiRet; CHKiRet(objGetObjInterface(&obj)); CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); finalize_it: RETiRet; diff --git a/runtime/ratelimit.h b/runtime/ratelimit.h index 6ebe4f9c..ba2c6b23 100644 --- a/runtime/ratelimit.h +++ b/runtime/ratelimit.h @@ -23,10 +23,16 @@ struct ratelimit_s { unsigned nsupp; /**< nbr of msgs suppressed */ + msg_t *pMsg; + msg_t *repMsg; /**< repeat message, temporary buffer */ /* dummy field list - TODO: implement */ }; /* prototypes */ +rsRetVal ratelimitNew(ratelimit_t **ppThis); +rsRetVal ratelimitMsg(msg_t *ppMsg, ratelimit_t *ratelimit); +msg_t * ratelimitGetRepeatMsg(ratelimit_t *ratelimit); +void ratelimitDestruct(ratelimit_t *pThis); rsRetVal ratelimitModInit(void); void ratelimitModExit(void); diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 4404c475..e98a9b8c 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -392,6 +392,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_JNAME_NOTFOUND = -2305, /**< JSON name not found (does not exist) */ RS_RET_INVLD_SETOP = -2305, /**< invalid variable set operation, incompatible type */ RS_RET_RULESET_EXISTS = -2306,/**< ruleset already exists */ + RS_RET_OK_HAVE_REPMSG = -2307,/**< processing was OK, but we do have a repeat message (ratelimiter status) */ /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ |