diff options
-rw-r--r-- | dirty.h | 2 | ||||
-rw-r--r-- | doc/v7compatibility.html | 19 | ||||
-rw-r--r-- | plugins/imptcp/imptcp.c | 19 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 5 | ||||
-rw-r--r-- | runtime/ratelimit.c | 49 | ||||
-rw-r--r-- | tools/syslogd.c | 43 |
6 files changed, 111 insertions, 26 deletions
@@ -31,6 +31,8 @@ rsRetVal __attribute__((deprecated)) multiSubmitMsg(multi_submit_t *pMultiSub); rsRetVal __attribute__((deprecated)) submitMsg(msg_t *pMsg); rsRetVal multiSubmitMsg2(multi_submit_t *pMultiSub, ratelimit_t *ratelimit); rsRetVal submitMsg2(msg_t *pMsg, ratelimit_t *ratelimit); +rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub); +rsRetVal multiSubmitAddMsg(multi_submit_t *pMultiSub, msg_t *pMsg, ratelimit_t *ratelimit); rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags); rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset); rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */ diff --git a/doc/v7compatibility.html b/doc/v7compatibility.html index bf4c0eba..e954ca1d 100644 --- a/doc/v7compatibility.html +++ b/doc/v7compatibility.html @@ -27,9 +27,12 @@ problems to anyone, especially as in v6 this was announced as a missing feature. <h2>"last message repeated n times" Processing</h2> <p>This processing has been optimized and moved to the input side. This results -in far better performance and also de-couples different sources from the same +in usually far better performance and also de-couples different sources +from the same processing. It is now also integrated in to the more generic rate-limiting -processing. The code works almost as before, with two exceptions: +processing. +<h3>User-Noticable Changes</h3> +The code works almost as before, with two exceptions: <ul> <li>The supression amount can be different, as the new algorithm precisely check's a single source, and while that source is being @@ -46,6 +49,18 @@ messages will most probably not be detected. Upgrading the module code is simple, and all rsyslog-provided plugins support the new method, so this should not be a real problem (crafting a solution would result in rather complex code - for a case that most probably would never happen). +<h3>Performance Implications</h3> +<p>In general, the new method enables far faster output procesing. However, it +needs to be noted that the "last message repeated n" processing needs parsed +messages in order to detect duplicated. Consequently, if it is enabled the +parser step cannot be deferred to the main queue processing thread and +thus must be done during input processing. The changes workload distribution +and may have (good or bad) effect on the overall performance. If you have +a very high performance installation, it is suggested to check the performance +profike before deploying the new version. Note: for high-performance +environments it is highly recommended NOT to use "last message repeated n times" +processing but rather the other (more efficient) rate-limiting methods. These +also do NOT require the parsing step to be done during input processing. <p><font size="2">This documentation is part of the <a href="http://www.rsyslog.com/">rsyslog</a> project.<br> diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index f804a70e..6d2ecd2f 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -67,6 +67,7 @@ #include "ruleset.h" #include "msg.h" #include "statsobj.h" +#include "ratelimit.h" #include "net.h" /* for permittedPeers, may be removed when this is removed */ /* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */ @@ -195,6 +196,7 @@ struct ptcpsrv_s { sbool bKeepAlive; /* support keep-alive packets */ sbool bEmitMsgOnClose; sbool bSuppOctetFram; + ratelimit_t *ratelimiter; }; /* the ptcp session object. Describes a single active session. @@ -295,6 +297,7 @@ destructSess(ptcpsess_t *pSess) static void destructSrv(ptcpsrv_t *pSrv) { + ratelimitDestruct(pSrv->ratelimiter); prop.Destruct(&pSrv->pInputName); pthread_mutex_destroy(&pSrv->mutSessLst); free(pSrv->pszInputName); @@ -661,6 +664,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult msg_t *pMsg; ptcpsrv_t *pSrv; DEFiRet; +dbgprintf("DDDD: in imptcp doSubmitMSg()\n"); if(pThis->iMsg == 0) { DBGPRINTF("discarding zero-sized message\n"); @@ -679,14 +683,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult MsgSetRuleset(pMsg, pSrv->pRuleset); STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit); - if(pMultiSub == NULL) { - CHKiRet(submitMsg2(pMsg, NULL)); - } else { - pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; - if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg2(pMultiSub, NULL)); - } - + multiSubmitAddMsg(pMultiSub, pMsg, pSrv->ratelimiter); finalize_it: /* reset status variables */ @@ -831,10 +828,7 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub)); } - if(multiSub.nElem > 0) { - /* submit anything that was not yet submitted */ - CHKiRet(multiSubmitMsg2(&multiSub, NULL)); - } + iRet = multiSubmitFlush(&multiSub); finalize_it: RETiRet; @@ -1130,6 +1124,7 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) pSrv->iKeepAliveProbes = inst->iKeepAliveProbes; pSrv->iKeepAliveTime = inst->iKeepAliveTime; pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose; + CHKiRet(ratelimitNew(&pSrv->ratelimiter)); CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort)); pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim; if(inst->pszBindAddr == NULL) diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 392709a0..1ff89353 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -812,6 +812,11 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim struct syslogTime dummyTS; struct json_object *json = NULL, *jval; DEFiRet; +#warning experimental code needs to be made production-ready! +/* we need to decide how many ratelimiters we use --> hashtable + also remove current homegrown ratelimiting functionality and + replace it with the new one. + */ rsRetVal localRet; static ratelimit_t *ratelimit = NULL; if(ratelimit == NULL) diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index 6372602f..03b9631e 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -29,14 +29,17 @@ #include "errmsg.h" #include "ratelimit.h" #include "datetime.h" +#include "parser.h" #include "unicode-helper.h" #include "msg.h" +#include "dirty.h" /* definitions for objects we access */ DEFobjStaticHelpers DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) DEFobjCurrIf(datetime) +DEFobjCurrIf(parser) /* static data */ @@ -58,8 +61,17 @@ DEFobjCurrIf(datetime) rsRetVal ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) { + rsRetVal localRet; DEFiRet; + if((pMsg->msgFlags & NEEDS_PARSING) != 0) { + if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { + DBGPRINTF("Message discarded, parsing error %d\n", localRet); + ABORT_FINALIZE(RS_RET_DISCARDMSG); + } + } + + /* suppress duplicate messages */ if( ratelimit->pMsg != NULL && getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) && @@ -94,6 +106,7 @@ ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) } finalize_it: +dbgprintf("DDDD: in ratelimitMsg(): %d\n", iRet); RETiRet; } @@ -119,16 +132,9 @@ dbgprintf("DDDD: in ratelimitGetRepeatMsg()\n"); goto done; } -#warning remove/enable after mailing list feedback -#if 0 - if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */ - lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " last message repeated %d times", - pAction->f_prevcount); - } else { -#endif - lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " message repeated %d times: [%.800s]", - ratelimit->nsupp, getMSG(ratelimit->repMsg)); -// } + lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), + " message repeated %d times: [%.800s]", + ratelimit->nsupp, getMSG(ratelimit->repMsg)); /* We now need to update the other message properties. Please note that digital * signatures inside the message are invalidated. */ @@ -143,6 +149,7 @@ dbgprintf("DDDD: in ratelimitGetRepeatMsg()\n"); done: return repMsg; } + rsRetVal ratelimitNew(ratelimit_t **ppThis) { @@ -156,9 +163,25 @@ finalize_it: } void -ratelimitDestruct(ratelimit_t *pThis) +ratelimitDestruct(ratelimit_t *ratelimit) { - free(pThis); + msg_t *pMsg; + if(ratelimit->pMsg != NULL) { + if(ratelimit->nsupp > 0) { + if(ratelimit->repMsg != NULL) { + dbgprintf("ratelimiter/destuct: call sequence error, have " + "previous repeat message - discarding\n"); + msgDestruct(&ratelimit->repMsg); + } + ratelimit->repMsg = ratelimit->pMsg; + pMsg = ratelimitGetRepeatMsg(ratelimit); + if(pMsg != NULL) + submitMsg(pMsg); + } + } else { + msgDestruct(&ratelimit->pMsg); + } + free(ratelimit); } void @@ -167,6 +190,7 @@ ratelimitModExit(void) objRelease(datetime, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); objRelease(errmsg, CORE_COMPONENT); + objRelease(parser, CORE_COMPONENT); } rsRetVal @@ -177,6 +201,7 @@ ratelimitModInit(void) CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(parser, CORE_COMPONENT)); finalize_it: RETiRet; } diff --git a/tools/syslogd.c b/tools/syslogd.c index bfdb5081..e8957e13 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -699,6 +699,49 @@ multiSubmitMsg(multi_submit_t *pMultiSub) /* backward compat. level */ } +/* add a message to a multisubmit structure. This handles ratelimiting. IF + * pMultiSub == NULL, a single-message enqueue happens. */ +rsRetVal +multiSubmitAddMsg(multi_submit_t *pMultiSub, msg_t *pMsg, ratelimit_t *ratelimit) +{ + rsRetVal localRet; + DEFiRet; + + if(pMultiSub == NULL) { +dbgprintf("DDDD: multiSubmitAddMsg, not checking ratelimiter for single submit!\n"); + CHKiRet(submitMsg(pMsg)); + } else { +dbgprintf("DDDD: have multisub!\n"); + localRet = ratelimitMsg(pMsg, ratelimit); + if(localRet == RS_RET_OK_HAVE_REPMSG) { +dbgprintf("DDDD: doing repeat submit!\n"); + pMultiSub->ppMsgs[pMultiSub->nElem++] = ratelimitGetRepeatMsg(ratelimit); + if(pMultiSub->nElem == pMultiSub->maxElem) + CHKiRet(multiSubmitMsg2(pMultiSub, NULL)); + localRet = RS_RET_OK; + } + if(localRet == RS_RET_OK) { + pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; + if(pMultiSub->nElem == pMultiSub->maxElem) + CHKiRet(multiSubmitMsg2(pMultiSub, NULL)); + } + } + +finalize_it: + RETiRet; +} + +/* flush multiSubmit, e.g. at end of read records */ +rsRetVal +multiSubmitFlush(multi_submit_t *pMultiSub) +{ + DEFiRet; +dbgprintf("DDDD: multiSubmitFlish, nElem %d\n", pMultiSub->nElem); + if(pMultiSub->nElem > 0) { + iRet = multiSubmitMsg(pMultiSub); + } + RETiRet; +} static void |