From 4269e4578118f089021bc15cdd82bb0182a97aaf Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 9 Oct 2012 18:54:25 +0200 Subject: new ratelimit: interface plumbing added no actual implementation yet done --- dirty.h | 6 +++-- plugins/imdiag/imdiag.c | 2 +- plugins/imfile/imfile.c | 4 ++-- plugins/imkmsg/imkmsg.c | 2 +- plugins/impstats/impstats.c | 2 +- plugins/imptcp/imptcp.c | 6 ++--- plugins/imsolaris/imsolaris.c | 2 +- plugins/imudp/imudp.c | 2 +- runtime/Makefile.am | 2 ++ runtime/ratelimit.c | 56 +++++++++++++++++++++++++++++++++++++++++++ runtime/ratelimit.h | 33 +++++++++++++++++++++++++ runtime/typedefs.h | 1 + tcps_sess.c | 6 ++--- tools/syslogd.c | 23 ++++++++++++++---- 14 files changed, 127 insertions(+), 20 deletions(-) create mode 100644 runtime/ratelimit.c create mode 100644 runtime/ratelimit.h diff --git a/dirty.h b/dirty.h index a3940cb9..3c602caa 100644 --- a/dirty.h +++ b/dirty.h @@ -27,8 +27,10 @@ #ifndef DIRTY_H_INCLUDED #define DIRTY_H_INCLUDED 1 -rsRetVal multiSubmitMsg(multi_submit_t *pMultiSub); -rsRetVal submitMsg(msg_t *pMsg); +rsRetVal __attribute__((deprecated)) multiSubmitMsg(multi_submit_t *pMultiSub); +rsRetVal __attribute__((deprecated)) submitMsg(msg_t *pMsg); +rsRetVal multiSubmitMsg2(multi_submit_t *pMultiSub, ratelimit_t *ratelimit); +rsRetVal submitMsg2(msg_t *pMsg, ratelimit_t *ratelimit); rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags); rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset); rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */ diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c index 09742537..460b8472 100644 --- a/plugins/imdiag/imdiag.c +++ b/plugins/imdiag/imdiag.c @@ -220,7 +220,7 @@ doInjectMsg(int iNum) pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; MsgSetRcvFrom(pMsg, pRcvDummy); CHKiRet(MsgSetRcvFromIP(pMsg, pRcvIPDummy)); - CHKiRet(submitMsg(pMsg)); + CHKiRet(submitMsg2(pMsg, NULL)); finalize_it: RETiRet; diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index 453b6b05..64a7e032 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -191,7 +191,7 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine) MsgSetRuleset(pMsg, pInfo->pRuleset); pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg; if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem) - CHKiRet(multiSubmitMsg(&pInfo->multiSub)); + CHKiRet(multiSubmitMsg2(&pInfo->multiSub, NULL)); finalize_it: RETiRet; } @@ -306,7 +306,7 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) finalize_it: if(pThis->multiSub.nElem > 0) { /* submit everything that was not yet submitted */ - CHKiRet(multiSubmitMsg(&pThis->multiSub)); + CHKiRet(multiSubmitMsg2(&pThis->multiSub, NULL)); } ; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */ /* Note: the problem above is that pthread:cleanup_pop() is a macro which diff --git a/plugins/imkmsg/imkmsg.c b/plugins/imkmsg/imkmsg.c index 2a97f82d..d1a83879 100644 --- a/plugins/imkmsg/imkmsg.c +++ b/plugins/imkmsg/imkmsg.c @@ -113,7 +113,7 @@ enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval * pMsg->iFacility = iFacility; pMsg->iSeverity = iSeverity; pMsg->json = json; - CHKiRet(submitMsg(pMsg)); + CHKiRet(submitMsg(pMsg, NULL)); finalize_it: RETiRet; diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c index 62599969..ef2bebf7 100644 --- a/plugins/impstats/impstats.c +++ b/plugins/impstats/impstats.c @@ -138,7 +138,7 @@ doSubmitMsg(uchar *line) pMsg->iSeverity = runModConf->iSeverity; pMsg->msgFlags = 0; - submitMsg(pMsg); + submitMsg2(pMsg, NULL); finalize_it: RETiRet; diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 8150fc33..f804a70e 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -680,11 +680,11 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit); if(pMultiSub == NULL) { - CHKiRet(submitMsg(pMsg)); + CHKiRet(submitMsg2(pMsg, NULL)); } else { pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg(pMultiSub)); + CHKiRet(multiSubmitMsg2(pMultiSub, NULL)); } @@ -833,7 +833,7 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) if(multiSub.nElem > 0) { /* submit anything that was not yet submitted */ - CHKiRet(multiSubmitMsg(&multiSub)); + CHKiRet(multiSubmitMsg2(&multiSub, NULL)); } finalize_it: diff --git a/plugins/imsolaris/imsolaris.c b/plugins/imsolaris/imsolaris.c index a220e72a..1e7d9b0f 100644 --- a/plugins/imsolaris/imsolaris.c +++ b/plugins/imsolaris/imsolaris.c @@ -212,7 +212,7 @@ readLog(int fd, uchar *pRcv, int iMaxLine) pMsg->iFacility = LOG_FAC(hdr.pri); pMsg->iSeverity = LOG_PRI(hdr.pri); pMsg->msgFlags = NEEDS_PARSING | NO_PRI_IN_RAW; - CHKiRet(submitMsg(pMsg)); + CHKiRet(submitMsg(pMsg, NULL)); } finalize_it: diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 0dda30ec..c0448cba 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -383,7 +383,7 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f if(*pbIsPermitted == 2) pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); - CHKiRet(submitMsg(pMsg)); + CHKiRet(submitMsg2(pMsg, NULL)); STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); } } diff --git a/runtime/Makefile.am b/runtime/Makefile.am index 7af26d2b..7abbc258 100644 --- a/runtime/Makefile.am +++ b/runtime/Makefile.am @@ -65,6 +65,8 @@ librsyslog_la_SOURCES = \ ruleset.h \ prop.c \ prop.h \ + ratelimit.c \ + ratelimit.h \ cfsysline.c \ cfsysline.h \ sd-daemon.c \ diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c new file mode 100644 index 00000000..dacbc81d --- /dev/null +++ b/runtime/ratelimit.c @@ -0,0 +1,56 @@ +/* ratelimit.c + * support for rate-limiting sources, including "last message + * repeated n times" processing. + * + * Copyright 2012 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include +#include +#include + +#include "rsyslog.h" +#include "errmsg.h" +#include "ratelimit.h" + +/* definitions for objects we access */ +DEFobjStaticHelpers +DEFobjCurrIf(errmsg) +DEFobjCurrIf(glbl) + +/* static data */ + +void +ratelimitModExit(void) +{ + objRelease(glbl, CORE_COMPONENT); + objRelease(errmsg, CORE_COMPONENT); +} + +rsRetVal +ratelimitModInit(void) +{ + DEFiRet; + CHKiRet(objGetObjInterface(&obj)); + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); +finalize_it: + RETiRet; +} + diff --git a/runtime/ratelimit.h b/runtime/ratelimit.h new file mode 100644 index 00000000..6ebe4f9c --- /dev/null +++ b/runtime/ratelimit.h @@ -0,0 +1,33 @@ +/* header for ratelimit.c + * + * Copyright 2012 Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef INCLUDED_RATELIMIT_H +#define INCLUDED_RATELIMIT_H + +struct ratelimit_s { + unsigned nsupp; /**< nbr of msgs suppressed */ + /* dummy field list - TODO: implement */ +}; + +/* prototypes */ +rsRetVal ratelimitModInit(void); +void ratelimitModExit(void); + +#endif /* #ifndef INCLUDED_RATELIMIT_H */ diff --git a/runtime/typedefs.h b/runtime/typedefs.h index ccae08b2..1e0cb466 100644 --- a/runtime/typedefs.h +++ b/runtime/typedefs.h @@ -92,6 +92,7 @@ typedef struct cfgmodules_etry_s cfgmodules_etry_t; typedef struct outchannels_s outchannels_t; typedef struct modConfData_s modConfData_t; typedef struct instanceConf_s instanceConf_t; +typedef struct ratelimit_s ratelimit_t; typedef int rs_size_t; /* we do never need more than 2Gig strings, signed permits to * use -1 as a special flag. */ typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */ diff --git a/tcps_sess.c b/tcps_sess.c index e7149cb7..be06a4e1 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -265,11 +265,11 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG STATSCOUNTER_INC(pThis->pLstnInfo->ctrSubmit, pThis->pLstnInfo->mutCtrSubmit); if(pMultiSub == NULL) { - CHKiRet(submitMsg(pMsg)); + CHKiRet(submitMsg2(pMsg, NULL)); } else { pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg(pMultiSub)); + CHKiRet(multiSubmitMsg2(pMultiSub, NULL)); } @@ -490,7 +490,7 @@ DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen) if(multiSub.nElem > 0) { /* submit anything that was not yet submitted */ - CHKiRet(multiSubmitMsg(&multiSub)); + CHKiRet(multiSubmitMsg2(&multiSub, NULL)); } finalize_it: diff --git a/tools/syslogd.c b/tools/syslogd.c index 8ba8edd3..bfdb5081 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -125,6 +125,7 @@ #include "dnscache.h" #include "sd-daemon.h" #include "rainerscript.h" +#include "ratelimit.h" /* definitions for objects we access */ DEFobjCurrIf(obj) @@ -417,7 +418,7 @@ parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int fla CHKiRet(prop.Destruct(&pProp)); CHKiRet(MsgSetRcvFromIPStr(pMsg, hnameIP, ustrlen(hnameIP), &pProp)); CHKiRet(prop.Destruct(&pProp)); - CHKiRet(submitMsg(pMsg)); + CHKiRet(submitMsg2(pMsg, NULL)); finalize_it: RETiRet; @@ -488,7 +489,7 @@ logmsgInternal(int iErr, int pri, uchar *msg, int flags) /* we have the queue, so we can simply provide the * message to the queue engine. */ - submitMsg(pMsg); + submitMsg2(pMsg, NULL); } finalize_it: RETiRet; @@ -624,7 +625,7 @@ int i; * rgerhards, 2008-02-13 */ rsRetVal -submitMsg(msg_t *pMsg) +submitMsg2(msg_t *pMsg, ratelimit_t *ratelimit) { qqueue_t *pQueue; ruleset_t *pRuleset; @@ -648,6 +649,11 @@ submitMsg(msg_t *pMsg) finalize_it: RETiRet; } +rsRetVal +submitMsg(msg_t *pMsg) /* backward compat. level */ +{ + return submitMsg2(pMsg, NULL); +} /* submit multiple messages at once, very similar to submitMsg, just @@ -655,7 +661,7 @@ finalize_it: * rgerhards, 2009-06-16 */ rsRetVal -multiSubmitMsg(multi_submit_t *pMultiSub) +multiSubmitMsg2(multi_submit_t *pMultiSub, ratelimit_t *ratelimit) { int i; qqueue_t *pQueue; @@ -686,6 +692,11 @@ multiSubmitMsg(multi_submit_t *pMultiSub) finalize_it: RETiRet; } +rsRetVal +multiSubmitMsg(multi_submit_t *pMultiSub) /* backward compat. level */ +{ + return multiSubmitMsg2(pMultiSub, NULL); +} @@ -1268,7 +1279,7 @@ static inline void processImInternal(void) msg_t *pMsg; while(iminternalRemoveMsg(&pMsg) == RS_RET_OK) { - submitMsg(pMsg); + submitMsg2(pMsg, NULL); } } @@ -1473,6 +1484,7 @@ InitGlobalClasses(void) CHKiRet(objUse(net, LM_NET_FILENAME)); dnscacheInit(); initRainerscript(); + ratelimitModInit(); finalize_it: if(iRet != RS_RET_OK) { @@ -1511,6 +1523,7 @@ GlobalClassExit(void) /* TODO: implement the rest of the deinit */ /* dummy "classes */ strExit(); + ratelimitModExit(); #if 0 CHKiRet(objGetObjInterface(&obj)); /* this provides the root pointer for all other queries */ -- cgit v1.2.3 From c7aa09bbb277fbcdae1a5e612674fd85cb80a513 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 12 Oct 2012 18:30:51 +0200 Subject: milestone: experimental rate limiter used in imuxsock --- doc/v7compatibility.html | 22 ++++++++ plugins/imuxsock/imuxsock.c | 14 ++++- runtime/ratelimit.c | 127 ++++++++++++++++++++++++++++++++++++++++++++ runtime/ratelimit.h | 6 +++ runtime/rsyslog.h | 1 + 5 files changed, 169 insertions(+), 1 deletion(-) diff --git a/doc/v7compatibility.html b/doc/v7compatibility.html index be89f666..bf4c0eba 100644 --- a/doc/v7compatibility.html +++ b/doc/v7compatibility.html @@ -25,6 +25,28 @@ has been implemented. Consequently, situations where the previous behaviour were desired need now to be solved differently. We do not think that this will cause any problems to anyone, especially as in v6 this was announced as a missing feature. +

"last message repeated n times" Processing

+

This processing has been optimized and moved to the input side. This results +in far better performance and also de-couples different sources from the same +processing. It is now also integrated in to the more generic rate-limiting +processing. The code works almost as before, with two exceptions: +

    +
  • The supression amount can be different, as the new algorithm + precisely check's a single source, and while that source is being + read. The previous algorithm worked on a set of mixed messages + from multiple sources. +
  • The previous algorithm wrote a "last message repeated n times" message + at least every 60 seconds. For performance reasons, we do no longer do + this but write this message only when a new message arrives or rsyslog + is shut down. +
+

Note that the new algorithms needs support from input modules. If old +modules which do not have the necessary support are used, duplicate +messages will most probably not be detected. Upgrading the module code is +simple, and all rsyslog-provided plugins support the new method, so this +should not be a real problem (crafting a solution would result in rather +complex code - for a case that most probably would never happen). +

This documentation is part of the rsyslog project.
Copyright © 2011-2012 by Rainer Gerhards and diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index a4933115..392709a0 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -55,6 +55,7 @@ #include "statsobj.h" #include "datetime.h" #include "hashtable.h" +#include "ratelimit.h" MODULE_TYPE_INPUT MODULE_TYPE_NOKEEP @@ -811,6 +812,10 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim struct syslogTime dummyTS; struct json_object *json = NULL, *jval; DEFiRet; +rsRetVal localRet; +static ratelimit_t *ratelimit = NULL; +if(ratelimit == NULL) + ratelimitNew(&ratelimit); /* TODO: handle format errors?? */ /* we need to parse the pri first, because we need the severity for @@ -982,7 +987,14 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName); CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP)); - CHKiRet(submitMsg(pMsg)); + localRet = ratelimitMsg(pMsg, ratelimit); + if(localRet == RS_RET_OK_HAVE_REPMSG) { +dbgprintf("DDDD: doing repeat submit!\n"); + CHKiRet(submitMsg2(ratelimitGetRepeatMsg(ratelimit), NULL)); + localRet = RS_RET_OK; + } + if(localRet == RS_RET_OK) + CHKiRet(submitMsg2(pMsg, NULL)); STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit); finalize_it: diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index dacbc81d..6372602f 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -28,17 +28,143 @@ #include "rsyslog.h" #include "errmsg.h" #include "ratelimit.h" +#include "datetime.h" +#include "unicode-helper.h" +#include "msg.h" /* definitions for objects we access */ DEFobjStaticHelpers DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) +DEFobjCurrIf(datetime) /* static data */ +/* ratelimit a message, that means: + * - handle "last message repeated n times" logic + * - handle actual (discarding) rate-limiting + * This function returns RS_RET_OK, if the caller shall process + * the message regularly and RS_RET_DISCARD if the caller must + * discard the message. The caller should also discard the message + * if another return status occurs. This places some burden on the + * caller logic, but provides best performance. Demanding this + * cooperative mode can enable a faulty caller to thrash up part + * of the system, but we accept that risk (a faulty caller can + * always do all sorts of evil, so...) + * Note that the message pointer may be updated upon return. In + * this case, the ratelimiter is reponsible for handling the + * original message. + */ +rsRetVal +ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) +{ + DEFiRet; + + /* suppress duplicate messages */ + if( ratelimit->pMsg != NULL && + getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) && + !ustrcmp(getMSG(pMsg), getMSG(ratelimit->pMsg)) && + !strcmp(getHOSTNAME(pMsg), getHOSTNAME(ratelimit->pMsg)) && + !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(ratelimit->pMsg, LOCK_MUTEX)) && + !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(ratelimit->pMsg, LOCK_MUTEX))) { + ratelimit->nsupp++; + DBGPRINTF("msg repeated %d times\n", ratelimit->nsupp); + /* use current message, so we have the new timestamp + * (means we need to discard previous one) */ + msgDestruct(&ratelimit->pMsg); + ratelimit->pMsg = MsgAddRef(pMsg); + ABORT_FINALIZE(RS_RET_DISCARDMSG); + } else {/* new message, save it */ + /* first check if we have a previous message stored + * if so, emit and then discard it first + */ + if(ratelimit->pMsg != NULL) { + if(ratelimit->nsupp > 0) { + dbgprintf("DDDD: would need to emit 'repeat' message\n"); + if(ratelimit->repMsg != NULL) { + dbgprintf("ratelimiter: call sequence error, have " + "previous repeat message - discarding\n"); + msgDestruct(&ratelimit->repMsg); + } + ratelimit->repMsg = ratelimit->pMsg; + iRet = RS_RET_OK_HAVE_REPMSG; + } + } + ratelimit->pMsg = MsgAddRef(pMsg); + } + +finalize_it: + RETiRet; +} + +/* return the current repeat message or NULL, if none is present. This MUST + * be called after ratelimitMsg() returned RS_RET_OK_HAVE_REPMSG. It is + * important that the message returned by us is enqueued BEFORE the original + * message, otherwise users my imply different order. + * If a message object is returned, the caller must destruct it if no longer + * needed. + */ +msg_t * +ratelimitGetRepeatMsg(ratelimit_t *ratelimit) +{ + msg_t *repMsg; + size_t lenRepMsg; + uchar szRepMsg[1024]; +dbgprintf("DDDD: in ratelimitGetRepeatMsg()\n"); + + /* we need to duplicate, original message may still be in use in other + * parts of the system! */ + if((repMsg = MsgDup(ratelimit->repMsg)) == NULL) { + DBGPRINTF("Message duplication failed, dropping repeat message.\n"); + goto done; + } + +#warning remove/enable after mailing list feedback +#if 0 + if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */ + lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " last message repeated %d times", + pAction->f_prevcount); + } else { +#endif + lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " message repeated %d times: [%.800s]", + ratelimit->nsupp, getMSG(ratelimit->repMsg)); +// } + + /* We now need to update the other message properties. Please note that digital + * signatures inside the message are invalidated. */ + datetime.getCurrTime(&(repMsg->tRcvdAt), &(repMsg->ttGenTime)); + memcpy(&repMsg->tTIMESTAMP, &repMsg->tRcvdAt, sizeof(struct syslogTime)); + MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg); + + if(ratelimit->repMsg != NULL) { + ratelimit->repMsg = NULL; + ratelimit->nsupp = 0; + } +done: return repMsg; +} + +rsRetVal +ratelimitNew(ratelimit_t **ppThis) +{ + ratelimit_t *pThis; + DEFiRet; + + CHKmalloc(pThis = calloc(1, sizeof(ratelimit_t))); + *ppThis = pThis; +finalize_it: + RETiRet; +} + +void +ratelimitDestruct(ratelimit_t *pThis) +{ + free(pThis); +} + void ratelimitModExit(void) { + objRelease(datetime, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); objRelease(errmsg, CORE_COMPONENT); } @@ -49,6 +175,7 @@ ratelimitModInit(void) DEFiRet; CHKiRet(objGetObjInterface(&obj)); CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); finalize_it: RETiRet; diff --git a/runtime/ratelimit.h b/runtime/ratelimit.h index 6ebe4f9c..ba2c6b23 100644 --- a/runtime/ratelimit.h +++ b/runtime/ratelimit.h @@ -23,10 +23,16 @@ struct ratelimit_s { unsigned nsupp; /**< nbr of msgs suppressed */ + msg_t *pMsg; + msg_t *repMsg; /**< repeat message, temporary buffer */ /* dummy field list - TODO: implement */ }; /* prototypes */ +rsRetVal ratelimitNew(ratelimit_t **ppThis); +rsRetVal ratelimitMsg(msg_t *ppMsg, ratelimit_t *ratelimit); +msg_t * ratelimitGetRepeatMsg(ratelimit_t *ratelimit); +void ratelimitDestruct(ratelimit_t *pThis); rsRetVal ratelimitModInit(void); void ratelimitModExit(void); diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 4404c475..e98a9b8c 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -392,6 +392,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_JNAME_NOTFOUND = -2305, /**< JSON name not found (does not exist) */ RS_RET_INVLD_SETOP = -2305, /**< invalid variable set operation, incompatible type */ RS_RET_RULESET_EXISTS = -2306,/**< ruleset already exists */ + RS_RET_OK_HAVE_REPMSG = -2307,/**< processing was OK, but we do have a repeat message (ratelimiter status) */ /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ -- cgit v1.2.3 From 49fb431e0ce15da6a1e72cb541eadea3a3096ab6 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 08:41:04 +0200 Subject: milestone: ratelimiter used in imptcp --- dirty.h | 2 ++ doc/v7compatibility.html | 19 ++++++++++++++++-- plugins/imptcp/imptcp.c | 19 +++++++----------- plugins/imuxsock/imuxsock.c | 5 +++++ runtime/ratelimit.c | 49 ++++++++++++++++++++++++++++++++++----------- tools/syslogd.c | 43 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 111 insertions(+), 26 deletions(-) diff --git a/dirty.h b/dirty.h index 3c602caa..d82d5524 100644 --- a/dirty.h +++ b/dirty.h @@ -31,6 +31,8 @@ rsRetVal __attribute__((deprecated)) multiSubmitMsg(multi_submit_t *pMultiSub); rsRetVal __attribute__((deprecated)) submitMsg(msg_t *pMsg); rsRetVal multiSubmitMsg2(multi_submit_t *pMultiSub, ratelimit_t *ratelimit); rsRetVal submitMsg2(msg_t *pMsg, ratelimit_t *ratelimit); +rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub); +rsRetVal multiSubmitAddMsg(multi_submit_t *pMultiSub, msg_t *pMsg, ratelimit_t *ratelimit); rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags); rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset); rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */ diff --git a/doc/v7compatibility.html b/doc/v7compatibility.html index bf4c0eba..e954ca1d 100644 --- a/doc/v7compatibility.html +++ b/doc/v7compatibility.html @@ -27,9 +27,12 @@ problems to anyone, especially as in v6 this was announced as a missing feature.

"last message repeated n times" Processing

This processing has been optimized and moved to the input side. This results -in far better performance and also de-couples different sources from the same +in usually far better performance and also de-couples different sources +from the same processing. It is now also integrated in to the more generic rate-limiting -processing. The code works almost as before, with two exceptions: +processing. +

User-Noticable Changes

+The code works almost as before, with two exceptions:
  • The supression amount can be different, as the new algorithm precisely check's a single source, and while that source is being @@ -46,6 +49,18 @@ messages will most probably not be detected. Upgrading the module code is simple, and all rsyslog-provided plugins support the new method, so this should not be a real problem (crafting a solution would result in rather complex code - for a case that most probably would never happen). +

    Performance Implications

    +

    In general, the new method enables far faster output procesing. However, it +needs to be noted that the "last message repeated n" processing needs parsed +messages in order to detect duplicated. Consequently, if it is enabled the +parser step cannot be deferred to the main queue processing thread and +thus must be done during input processing. The changes workload distribution +and may have (good or bad) effect on the overall performance. If you have +a very high performance installation, it is suggested to check the performance +profike before deploying the new version. Note: for high-performance +environments it is highly recommended NOT to use "last message repeated n times" +processing but rather the other (more efficient) rate-limiting methods. These +also do NOT require the parsing step to be done during input processing.

    This documentation is part of the rsyslog project.
    diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index f804a70e..6d2ecd2f 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -67,6 +67,7 @@ #include "ruleset.h" #include "msg.h" #include "statsobj.h" +#include "ratelimit.h" #include "net.h" /* for permittedPeers, may be removed when this is removed */ /* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */ @@ -195,6 +196,7 @@ struct ptcpsrv_s { sbool bKeepAlive; /* support keep-alive packets */ sbool bEmitMsgOnClose; sbool bSuppOctetFram; + ratelimit_t *ratelimiter; }; /* the ptcp session object. Describes a single active session. @@ -295,6 +297,7 @@ destructSess(ptcpsess_t *pSess) static void destructSrv(ptcpsrv_t *pSrv) { + ratelimitDestruct(pSrv->ratelimiter); prop.Destruct(&pSrv->pInputName); pthread_mutex_destroy(&pSrv->mutSessLst); free(pSrv->pszInputName); @@ -661,6 +664,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult msg_t *pMsg; ptcpsrv_t *pSrv; DEFiRet; +dbgprintf("DDDD: in imptcp doSubmitMSg()\n"); if(pThis->iMsg == 0) { DBGPRINTF("discarding zero-sized message\n"); @@ -679,14 +683,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult MsgSetRuleset(pMsg, pSrv->pRuleset); STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit); - if(pMultiSub == NULL) { - CHKiRet(submitMsg2(pMsg, NULL)); - } else { - pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; - if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg2(pMultiSub, NULL)); - } - + multiSubmitAddMsg(pMultiSub, pMsg, pSrv->ratelimiter); finalize_it: /* reset status variables */ @@ -831,10 +828,7 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub)); } - if(multiSub.nElem > 0) { - /* submit anything that was not yet submitted */ - CHKiRet(multiSubmitMsg2(&multiSub, NULL)); - } + iRet = multiSubmitFlush(&multiSub); finalize_it: RETiRet; @@ -1130,6 +1124,7 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) pSrv->iKeepAliveProbes = inst->iKeepAliveProbes; pSrv->iKeepAliveTime = inst->iKeepAliveTime; pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose; + CHKiRet(ratelimitNew(&pSrv->ratelimiter)); CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort)); pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim; if(inst->pszBindAddr == NULL) diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 392709a0..1ff89353 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -812,6 +812,11 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim struct syslogTime dummyTS; struct json_object *json = NULL, *jval; DEFiRet; +#warning experimental code needs to be made production-ready! +/* we need to decide how many ratelimiters we use --> hashtable + also remove current homegrown ratelimiting functionality and + replace it with the new one. + */ rsRetVal localRet; static ratelimit_t *ratelimit = NULL; if(ratelimit == NULL) diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index 6372602f..03b9631e 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -29,14 +29,17 @@ #include "errmsg.h" #include "ratelimit.h" #include "datetime.h" +#include "parser.h" #include "unicode-helper.h" #include "msg.h" +#include "dirty.h" /* definitions for objects we access */ DEFobjStaticHelpers DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) DEFobjCurrIf(datetime) +DEFobjCurrIf(parser) /* static data */ @@ -58,8 +61,17 @@ DEFobjCurrIf(datetime) rsRetVal ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) { + rsRetVal localRet; DEFiRet; + if((pMsg->msgFlags & NEEDS_PARSING) != 0) { + if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { + DBGPRINTF("Message discarded, parsing error %d\n", localRet); + ABORT_FINALIZE(RS_RET_DISCARDMSG); + } + } + + /* suppress duplicate messages */ if( ratelimit->pMsg != NULL && getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) && @@ -94,6 +106,7 @@ ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) } finalize_it: +dbgprintf("DDDD: in ratelimitMsg(): %d\n", iRet); RETiRet; } @@ -119,16 +132,9 @@ dbgprintf("DDDD: in ratelimitGetRepeatMsg()\n"); goto done; } -#warning remove/enable after mailing list feedback -#if 0 - if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */ - lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " last message repeated %d times", - pAction->f_prevcount); - } else { -#endif - lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " message repeated %d times: [%.800s]", - ratelimit->nsupp, getMSG(ratelimit->repMsg)); -// } + lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), + " message repeated %d times: [%.800s]", + ratelimit->nsupp, getMSG(ratelimit->repMsg)); /* We now need to update the other message properties. Please note that digital * signatures inside the message are invalidated. */ @@ -143,6 +149,7 @@ dbgprintf("DDDD: in ratelimitGetRepeatMsg()\n"); done: return repMsg; } + rsRetVal ratelimitNew(ratelimit_t **ppThis) { @@ -156,9 +163,25 @@ finalize_it: } void -ratelimitDestruct(ratelimit_t *pThis) +ratelimitDestruct(ratelimit_t *ratelimit) { - free(pThis); + msg_t *pMsg; + if(ratelimit->pMsg != NULL) { + if(ratelimit->nsupp > 0) { + if(ratelimit->repMsg != NULL) { + dbgprintf("ratelimiter/destuct: call sequence error, have " + "previous repeat message - discarding\n"); + msgDestruct(&ratelimit->repMsg); + } + ratelimit->repMsg = ratelimit->pMsg; + pMsg = ratelimitGetRepeatMsg(ratelimit); + if(pMsg != NULL) + submitMsg(pMsg); + } + } else { + msgDestruct(&ratelimit->pMsg); + } + free(ratelimit); } void @@ -167,6 +190,7 @@ ratelimitModExit(void) objRelease(datetime, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); objRelease(errmsg, CORE_COMPONENT); + objRelease(parser, CORE_COMPONENT); } rsRetVal @@ -177,6 +201,7 @@ ratelimitModInit(void) CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(parser, CORE_COMPONENT)); finalize_it: RETiRet; } diff --git a/tools/syslogd.c b/tools/syslogd.c index bfdb5081..e8957e13 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -699,6 +699,49 @@ multiSubmitMsg(multi_submit_t *pMultiSub) /* backward compat. level */ } +/* add a message to a multisubmit structure. This handles ratelimiting. IF + * pMultiSub == NULL, a single-message enqueue happens. */ +rsRetVal +multiSubmitAddMsg(multi_submit_t *pMultiSub, msg_t *pMsg, ratelimit_t *ratelimit) +{ + rsRetVal localRet; + DEFiRet; + + if(pMultiSub == NULL) { +dbgprintf("DDDD: multiSubmitAddMsg, not checking ratelimiter for single submit!\n"); + CHKiRet(submitMsg(pMsg)); + } else { +dbgprintf("DDDD: have multisub!\n"); + localRet = ratelimitMsg(pMsg, ratelimit); + if(localRet == RS_RET_OK_HAVE_REPMSG) { +dbgprintf("DDDD: doing repeat submit!\n"); + pMultiSub->ppMsgs[pMultiSub->nElem++] = ratelimitGetRepeatMsg(ratelimit); + if(pMultiSub->nElem == pMultiSub->maxElem) + CHKiRet(multiSubmitMsg2(pMultiSub, NULL)); + localRet = RS_RET_OK; + } + if(localRet == RS_RET_OK) { + pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; + if(pMultiSub->nElem == pMultiSub->maxElem) + CHKiRet(multiSubmitMsg2(pMultiSub, NULL)); + } + } + +finalize_it: + RETiRet; +} + +/* flush multiSubmit, e.g. at end of read records */ +rsRetVal +multiSubmitFlush(multi_submit_t *pMultiSub) +{ + DEFiRet; +dbgprintf("DDDD: multiSubmitFlish, nElem %d\n", pMultiSub->nElem); + if(pMultiSub->nElem > 0) { + iRet = multiSubmitMsg(pMultiSub); + } + RETiRet; +} static void -- cgit v1.2.3 From 2a6ff7d53d1c30f0d7efd022cf321736870c5629 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 08:55:12 +0200 Subject: interface cleanup (probably not final) --- dirty.h | 5 ++--- plugins/imdiag/imdiag.c | 2 +- plugins/imfile/imfile.c | 2 +- plugins/impstats/impstats.c | 2 +- plugins/imptcp/imptcp.c | 1 - plugins/imudp/imudp.c | 2 +- plugins/imuxsock/imuxsock.c | 4 ++-- runtime/ratelimit.c | 2 +- tcps_sess.c | 6 +++--- tools/syslogd.c | 23 ++++++++++++----------- 10 files changed, 24 insertions(+), 25 deletions(-) diff --git a/dirty.h b/dirty.h index d82d5524..e49730a1 100644 --- a/dirty.h +++ b/dirty.h @@ -28,13 +28,12 @@ #define DIRTY_H_INCLUDED 1 rsRetVal __attribute__((deprecated)) multiSubmitMsg(multi_submit_t *pMultiSub); +rsRetVal submitMsg2(msg_t *pMsg); rsRetVal __attribute__((deprecated)) submitMsg(msg_t *pMsg); -rsRetVal multiSubmitMsg2(multi_submit_t *pMultiSub, ratelimit_t *ratelimit); -rsRetVal submitMsg2(msg_t *pMsg, ratelimit_t *ratelimit); rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub); rsRetVal multiSubmitAddMsg(multi_submit_t *pMultiSub, msg_t *pMsg, ratelimit_t *ratelimit); rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags); -rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset); +rsRetVal __attribute__((deprecated)) parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset); rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */ rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName); diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c index 460b8472..09742537 100644 --- a/plugins/imdiag/imdiag.c +++ b/plugins/imdiag/imdiag.c @@ -220,7 +220,7 @@ doInjectMsg(int iNum) pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; MsgSetRcvFrom(pMsg, pRcvDummy); CHKiRet(MsgSetRcvFromIP(pMsg, pRcvIPDummy)); - CHKiRet(submitMsg2(pMsg, NULL)); + CHKiRet(submitMsg(pMsg)); finalize_it: RETiRet; diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index 64a7e032..d6e6ad0e 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -191,7 +191,7 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine) MsgSetRuleset(pMsg, pInfo->pRuleset); pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg; if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem) - CHKiRet(multiSubmitMsg2(&pInfo->multiSub, NULL)); + CHKiRet(multiSubmitMsg(&pInfo->multiSub)); finalize_it: RETiRet; } diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c index ef2bebf7..62599969 100644 --- a/plugins/impstats/impstats.c +++ b/plugins/impstats/impstats.c @@ -138,7 +138,7 @@ doSubmitMsg(uchar *line) pMsg->iSeverity = runModConf->iSeverity; pMsg->msgFlags = 0; - submitMsg2(pMsg, NULL); + submitMsg(pMsg); finalize_it: RETiRet; diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 6d2ecd2f..e7650812 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -664,7 +664,6 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult msg_t *pMsg; ptcpsrv_t *pSrv; DEFiRet; -dbgprintf("DDDD: in imptcp doSubmitMSg()\n"); if(pThis->iMsg == 0) { DBGPRINTF("discarding zero-sized message\n"); diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index c0448cba..0dda30ec 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -383,7 +383,7 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f if(*pbIsPermitted == 2) pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); - CHKiRet(submitMsg2(pMsg, NULL)); + CHKiRet(submitMsg(pMsg)); STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); } } diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 1ff89353..2f7daf87 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -995,11 +995,11 @@ if(ratelimit == NULL) localRet = ratelimitMsg(pMsg, ratelimit); if(localRet == RS_RET_OK_HAVE_REPMSG) { dbgprintf("DDDD: doing repeat submit!\n"); - CHKiRet(submitMsg2(ratelimitGetRepeatMsg(ratelimit), NULL)); + CHKiRet(submitMsg2(ratelimitGetRepeatMsg(ratelimit))); localRet = RS_RET_OK; } if(localRet == RS_RET_OK) - CHKiRet(submitMsg2(pMsg, NULL)); + CHKiRet(submitMsg2(pMsg)); STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit); finalize_it: diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index 03b9631e..e6ea6f4e 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -176,7 +176,7 @@ ratelimitDestruct(ratelimit_t *ratelimit) ratelimit->repMsg = ratelimit->pMsg; pMsg = ratelimitGetRepeatMsg(ratelimit); if(pMsg != NULL) - submitMsg(pMsg); + submitMsg2(pMsg); } } else { msgDestruct(&ratelimit->pMsg); diff --git a/tcps_sess.c b/tcps_sess.c index be06a4e1..bc81c299 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -265,11 +265,11 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG STATSCOUNTER_INC(pThis->pLstnInfo->ctrSubmit, pThis->pLstnInfo->mutCtrSubmit); if(pMultiSub == NULL) { - CHKiRet(submitMsg2(pMsg, NULL)); + CHKiRet(submitMsg2(pMsg)); } else { pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg2(pMultiSub, NULL)); + CHKiRet(multiSubmitMsg(pMultiSub)); } @@ -490,7 +490,7 @@ DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen) if(multiSub.nElem > 0) { /* submit anything that was not yet submitted */ - CHKiRet(multiSubmitMsg2(&multiSub, NULL)); + CHKiRet(multiSubmitMsg(&multiSub)); } finalize_it: diff --git a/tools/syslogd.c b/tools/syslogd.c index e8957e13..1eac6ccb 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -418,7 +418,7 @@ parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int fla CHKiRet(prop.Destruct(&pProp)); CHKiRet(MsgSetRcvFromIPStr(pMsg, hnameIP, ustrlen(hnameIP), &pProp)); CHKiRet(prop.Destruct(&pProp)); - CHKiRet(submitMsg2(pMsg, NULL)); + CHKiRet(submitMsg2(pMsg)); finalize_it: RETiRet; @@ -489,7 +489,7 @@ logmsgInternal(int iErr, int pri, uchar *msg, int flags) /* we have the queue, so we can simply provide the * message to the queue engine. */ - submitMsg2(pMsg, NULL); + submitMsg2(pMsg); } finalize_it: RETiRet; @@ -625,7 +625,7 @@ int i; * rgerhards, 2008-02-13 */ rsRetVal -submitMsg2(msg_t *pMsg, ratelimit_t *ratelimit) +submitMsg2(msg_t *pMsg) { qqueue_t *pQueue; ruleset_t *pRuleset; @@ -649,10 +649,11 @@ submitMsg2(msg_t *pMsg, ratelimit_t *ratelimit) finalize_it: RETiRet; } + rsRetVal -submitMsg(msg_t *pMsg) /* backward compat. level */ +submitMsg(msg_t *pMsg) { - return submitMsg2(pMsg, NULL); + return submitMsg2(pMsg); } @@ -661,7 +662,7 @@ submitMsg(msg_t *pMsg) /* backward compat. level */ * rgerhards, 2009-06-16 */ rsRetVal -multiSubmitMsg2(multi_submit_t *pMultiSub, ratelimit_t *ratelimit) +multiSubmitMsg2(multi_submit_t *pMultiSub) { int i; qqueue_t *pQueue; @@ -695,7 +696,7 @@ finalize_it: rsRetVal multiSubmitMsg(multi_submit_t *pMultiSub) /* backward compat. level */ { - return multiSubmitMsg2(pMultiSub, NULL); + return multiSubmitMsg2(pMultiSub); } @@ -717,13 +718,13 @@ dbgprintf("DDDD: have multisub!\n"); dbgprintf("DDDD: doing repeat submit!\n"); pMultiSub->ppMsgs[pMultiSub->nElem++] = ratelimitGetRepeatMsg(ratelimit); if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg2(pMultiSub, NULL)); + CHKiRet(multiSubmitMsg2(pMultiSub)); localRet = RS_RET_OK; } if(localRet == RS_RET_OK) { pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg2(pMultiSub, NULL)); + CHKiRet(multiSubmitMsg2(pMultiSub)); } } @@ -738,7 +739,7 @@ multiSubmitFlush(multi_submit_t *pMultiSub) DEFiRet; dbgprintf("DDDD: multiSubmitFlish, nElem %d\n", pMultiSub->nElem); if(pMultiSub->nElem > 0) { - iRet = multiSubmitMsg(pMultiSub); + iRet = multiSubmitMsg2(pMultiSub); } RETiRet; } @@ -1322,7 +1323,7 @@ static inline void processImInternal(void) msg_t *pMsg; while(iminternalRemoveMsg(&pMsg) == RS_RET_OK) { - submitMsg2(pMsg, NULL); + submitMsg(pMsg); } } -- cgit v1.2.3 From de3b0a786124ba2870a01b431811e5302ae3ce10 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 08:57:08 +0200 Subject: indicate no original code left in imudp --- plugins/imudp/imudp.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 0dda30ec..382ccbc2 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -4,8 +4,6 @@ * NOTE: read comments in module-template.h to understand how this file * works! * - * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) - * * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. -- cgit v1.2.3 From 1577b76e37eff9aa1dc542390223ac42539fbbbf Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 11:06:18 +0200 Subject: Change debug output defaults to match current needs --- runtime/rsconf.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/rsconf.c b/runtime/rsconf.c index ad588832..cf29c720 100644 --- a/runtime/rsconf.c +++ b/runtime/rsconf.c @@ -114,8 +114,8 @@ void cnfDoCfsysline(char *ln); */ BEGINobjConstruct(rsconf) /* be sure to specify the object type also in END macro! */ pThis->globals.bDebugPrintTemplateList = 1; - pThis->globals.bDebugPrintModuleList = 1; - pThis->globals.bDebugPrintCfSysLineHandlerList = 1; + pThis->globals.bDebugPrintModuleList = 0; + pThis->globals.bDebugPrintCfSysLineHandlerList = 0; pThis->globals.bLogStatusMsgs = DFLT_bLogStatusMsgs; pThis->globals.bErrMsgToStderr = 1; pThis->globals.umask = -1; -- cgit v1.2.3 From 3806643baa9fe0d50fb36080e4ab3a078b8a5952 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 11:19:22 +0200 Subject: ratelimiter: enable thread-safe mode & (related) API changes --- dirty.h | 2 +- plugins/imfile/imfile.c | 2 +- plugins/imptcp/imptcp.c | 3 +- plugins/imudp/imudp.c | 4 ++ plugins/imuxsock/imuxsock.c | 12 ++-- runtime/ratelimit.c | 141 ++++++++++++++++++++++++++------------------ runtime/ratelimit.h | 9 +-- runtime/rsyslog.h | 1 - tools/syslogd.c | 32 ---------- 9 files changed, 101 insertions(+), 105 deletions(-) diff --git a/dirty.h b/dirty.h index e49730a1..30b30bec 100644 --- a/dirty.h +++ b/dirty.h @@ -28,10 +28,10 @@ #define DIRTY_H_INCLUDED 1 rsRetVal __attribute__((deprecated)) multiSubmitMsg(multi_submit_t *pMultiSub); +rsRetVal multiSubmitMsg2(multi_submit_t *pMultiSub); /* friends only! */ rsRetVal submitMsg2(msg_t *pMsg); rsRetVal __attribute__((deprecated)) submitMsg(msg_t *pMsg); rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub); -rsRetVal multiSubmitAddMsg(multi_submit_t *pMultiSub, msg_t *pMsg, ratelimit_t *ratelimit); rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags); rsRetVal __attribute__((deprecated)) parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset); rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */ diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index d6e6ad0e..453b6b05 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -306,7 +306,7 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) finalize_it: if(pThis->multiSub.nElem > 0) { /* submit everything that was not yet submitted */ - CHKiRet(multiSubmitMsg2(&pThis->multiSub, NULL)); + CHKiRet(multiSubmitMsg(&pThis->multiSub)); } ; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */ /* Note: the problem above is that pthread:cleanup_pop() is a macro which diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index e7650812..22d3ff25 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -682,7 +682,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult MsgSetRuleset(pMsg, pSrv->pRuleset); STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit); - multiSubmitAddMsg(pMultiSub, pMsg, pSrv->ratelimiter); + ratelimitAddMsg(pSrv->ratelimiter, pMultiSub, pMsg); finalize_it: /* reset status variables */ @@ -1124,6 +1124,7 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) pSrv->iKeepAliveTime = inst->iKeepAliveTime; pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose; CHKiRet(ratelimitNew(&pSrv->ratelimiter)); + ratelimitSetThreadSafe(pSrv->ratelimiter); CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort)); pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim; if(inst->pszBindAddr == NULL) diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 382ccbc2..2ddc10bb 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -51,6 +51,7 @@ #include "prop.h" #include "ruleset.h" #include "statsobj.h" +#include "ratelimit.h" #include "unicode-helper.h" MODULE_TYPE_INPUT @@ -75,6 +76,7 @@ static struct lstn_s { int sock; /* socket */ ruleset_t *pRuleset; /* bound ruleset */ statsobj_t *stats; /* listener stats */ + ratelimit_t *ratelimiter; STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) } *lcnfRoot = NULL, *lcnfLast = NULL; @@ -246,6 +248,7 @@ addListner(instanceConf_t *inst) newlcnfinfo->next = NULL; newlcnfinfo->sock = newSocks[iSrc]; newlcnfinfo->pRuleset = inst->pBindRuleset; + CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter)); /* support statistics gathering */ CHKiRet(statsobj.Construct(&(newlcnfinfo->stats))); snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port); @@ -882,6 +885,7 @@ CODESTARTafterRun net.clearAllowedSenders((uchar*)"UDP"); for(lstn = lcnfRoot ; lstn != NULL ; ) { statsobj.Destruct(&(lstn->stats)); + ratelimitDestruct(lstn->ratelimiter); close(lstn->sock); lstnDel = lstn; lstn = lstn->next; diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 2f7daf87..3421863d 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -811,6 +811,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim int toffs; /* offset for trusted properties */ struct syslogTime dummyTS; struct json_object *json = NULL, *jval; + msg_t *repMsg; DEFiRet; #warning experimental code needs to be made production-ready! /* we need to decide how many ratelimiters we use --> hashtable @@ -992,14 +993,11 @@ if(ratelimit == NULL) MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName); CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP)); - localRet = ratelimitMsg(pMsg, ratelimit); - if(localRet == RS_RET_OK_HAVE_REPMSG) { -dbgprintf("DDDD: doing repeat submit!\n"); - CHKiRet(submitMsg2(ratelimitGetRepeatMsg(ratelimit))); - localRet = RS_RET_OK; - } + localRet = ratelimitMsg(ratelimit, pMsg, &repMsg); + if(repMsg != NULL) + CHKiRet(submitMsg(repMsg)); if(localRet == RS_RET_OK) - CHKiRet(submitMsg2(pMsg)); + CHKiRet(submitMsg(pMsg)); STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit); finalize_it: diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index e6ea6f4e..6b637677 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -43,6 +43,31 @@ DEFobjCurrIf(parser) /* static data */ +/* generate a "repeated n times" message */ +static inline msg_t * +ratelimitGenRepMsg(ratelimit_t *ratelimit) +{ + msg_t *repMsg; + size_t lenRepMsg; + uchar szRepMsg[1024]; + + if(ratelimit->nsupp == 1) { /* we simply use the original message! */ + repMsg = MsgAddRef(ratelimit->pMsg); + } else {/* we need to duplicate, original message may still be in use in other + * parts of the system! */ + if((repMsg = MsgDup(ratelimit->pMsg)) == NULL) { + DBGPRINTF("Message duplication failed, dropping repeat message.\n"); + goto done; + } + lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), + " message repeated %d times: [%.800s]", + ratelimit->nsupp, getMSG(ratelimit->pMsg)); + MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg); + } + +done: return repMsg; +} + /* ratelimit a message, that means: * - handle "last message repeated n times" logic * - handle actual (discarding) rate-limiting @@ -54,14 +79,14 @@ DEFobjCurrIf(parser) * cooperative mode can enable a faulty caller to thrash up part * of the system, but we accept that risk (a faulty caller can * always do all sorts of evil, so...) - * Note that the message pointer may be updated upon return. In - * this case, the ratelimiter is reponsible for handling the - * original message. + * If *ppRepMsg != NULL on return, the caller must enqueue that + * message before the original message. */ rsRetVal -ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) +ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) { rsRetVal localRet; + int bNeedUnlockMutex = 0; DEFiRet; if((pMsg->msgFlags & NEEDS_PARSING) != 0) { @@ -71,7 +96,12 @@ ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) } } + if(ratelimit->bThreadSafe) { + pthread_mutex_lock(&ratelimit->mut); + bNeedUnlockMutex = 1; + } + *ppRepMsg = NULL; /* suppress duplicate messages */ if( ratelimit->pMsg != NULL && getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) && @@ -84,72 +114,59 @@ ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) /* use current message, so we have the new timestamp * (means we need to discard previous one) */ msgDestruct(&ratelimit->pMsg); - ratelimit->pMsg = MsgAddRef(pMsg); + ratelimit->pMsg = pMsg; ABORT_FINALIZE(RS_RET_DISCARDMSG); - } else {/* new message, save it */ - /* first check if we have a previous message stored - * if so, emit and then discard it first - */ + } else {/* new message, do "repeat processing" & save it */ if(ratelimit->pMsg != NULL) { if(ratelimit->nsupp > 0) { - dbgprintf("DDDD: would need to emit 'repeat' message\n"); - if(ratelimit->repMsg != NULL) { - dbgprintf("ratelimiter: call sequence error, have " - "previous repeat message - discarding\n"); - msgDestruct(&ratelimit->repMsg); - } - ratelimit->repMsg = ratelimit->pMsg; - iRet = RS_RET_OK_HAVE_REPMSG; + *ppRepMsg = ratelimitGenRepMsg(ratelimit); + ratelimit->nsupp = 0; } } ratelimit->pMsg = MsgAddRef(pMsg); } finalize_it: -dbgprintf("DDDD: in ratelimitMsg(): %d\n", iRet); + if(bNeedUnlockMutex) + pthread_mutex_unlock(&ratelimit->mut); RETiRet; } -/* return the current repeat message or NULL, if none is present. This MUST - * be called after ratelimitMsg() returned RS_RET_OK_HAVE_REPMSG. It is - * important that the message returned by us is enqueued BEFORE the original - * message, otherwise users my imply different order. - * If a message object is returned, the caller must destruct it if no longer - * needed. + +/* add a message to a ratelimiter/multisubmit structure. + * ratelimiting is automatically handled according to the ratelimit + * settings. + * if pMultiSub == NULL, a single-message enqueue happens (under reconsideration) */ -msg_t * -ratelimitGetRepeatMsg(ratelimit_t *ratelimit) +rsRetVal +ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg) { + rsRetVal localRet; msg_t *repMsg; - size_t lenRepMsg; - uchar szRepMsg[1024]; -dbgprintf("DDDD: in ratelimitGetRepeatMsg()\n"); + DEFiRet; - /* we need to duplicate, original message may still be in use in other - * parts of the system! */ - if((repMsg = MsgDup(ratelimit->repMsg)) == NULL) { - DBGPRINTF("Message duplication failed, dropping repeat message.\n"); - goto done; + if(pMultiSub == NULL) { +dbgprintf("DDDDD: multiSubmitAddMsg, not checking ratelimiter for single submit!\n"); +#warning missing multisub Implementation? + CHKiRet(submitMsg(pMsg)); + } else { + localRet = ratelimitMsg(ratelimit, pMsg, &repMsg); + if(repMsg != NULL) { + pMultiSub->ppMsgs[pMultiSub->nElem++] = repMsg; + if(pMultiSub->nElem == pMultiSub->maxElem) + CHKiRet(multiSubmitMsg2(pMultiSub)); + } + if(localRet == RS_RET_OK) { + pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; + if(pMultiSub->nElem == pMultiSub->maxElem) + CHKiRet(multiSubmitMsg2(pMultiSub)); + } } - lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), - " message repeated %d times: [%.800s]", - ratelimit->nsupp, getMSG(ratelimit->repMsg)); - - /* We now need to update the other message properties. Please note that digital - * signatures inside the message are invalidated. */ - datetime.getCurrTime(&(repMsg->tRcvdAt), &(repMsg->ttGenTime)); - memcpy(&repMsg->tTIMESTAMP, &repMsg->tRcvdAt, sizeof(struct syslogTime)); - MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg); - - if(ratelimit->repMsg != NULL) { - ratelimit->repMsg = NULL; - ratelimit->nsupp = 0; - } -done: return repMsg; +finalize_it: + RETiRet; } - rsRetVal ratelimitNew(ratelimit_t **ppThis) { @@ -162,25 +179,33 @@ finalize_it: RETiRet; } +/* enable thread-safe operations mode. This make sure that + * a single ratelimiter can be called from multiple threads. As + * this causes some overhead and is not always required, it needs + * to be explicitely enabled. This operation cannot be undone + * (think: why should one do that???) + */ +void +ratelimitSetThreadSafe(ratelimit_t *ratelimit) +{ + ratelimit->bThreadSafe = 1; + pthread_mutex_init(&ratelimit->mut, NULL); +} + void ratelimitDestruct(ratelimit_t *ratelimit) { msg_t *pMsg; if(ratelimit->pMsg != NULL) { if(ratelimit->nsupp > 0) { - if(ratelimit->repMsg != NULL) { - dbgprintf("ratelimiter/destuct: call sequence error, have " - "previous repeat message - discarding\n"); - msgDestruct(&ratelimit->repMsg); - } - ratelimit->repMsg = ratelimit->pMsg; - pMsg = ratelimitGetRepeatMsg(ratelimit); + pMsg = ratelimitGenRepMsg(ratelimit); if(pMsg != NULL) submitMsg2(pMsg); } - } else { msgDestruct(&ratelimit->pMsg); } + if(ratelimit->bThreadSafe) + pthread_mutex_destroy(&ratelimit->mut); free(ratelimit); } diff --git a/runtime/ratelimit.h b/runtime/ratelimit.h index ba2c6b23..37dad900 100644 --- a/runtime/ratelimit.h +++ b/runtime/ratelimit.h @@ -24,14 +24,15 @@ struct ratelimit_s { unsigned nsupp; /**< nbr of msgs suppressed */ msg_t *pMsg; - msg_t *repMsg; /**< repeat message, temporary buffer */ - /* dummy field list - TODO: implement */ + sbool bThreadSafe; /**< do we need to operate in Thread-Safe mode? */ + pthread_mutex_t mut; /**< mutex if thread-safe operation desired */ }; /* prototypes */ rsRetVal ratelimitNew(ratelimit_t **ppThis); -rsRetVal ratelimitMsg(msg_t *ppMsg, ratelimit_t *ratelimit); -msg_t * ratelimitGetRepeatMsg(ratelimit_t *ratelimit); +void ratelimitSetThreadSafe(ratelimit_t *ratelimit); +rsRetVal ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRep); +rsRetVal ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg); void ratelimitDestruct(ratelimit_t *pThis); rsRetVal ratelimitModInit(void); void ratelimitModExit(void); diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index e98a9b8c..4404c475 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -392,7 +392,6 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_JNAME_NOTFOUND = -2305, /**< JSON name not found (does not exist) */ RS_RET_INVLD_SETOP = -2305, /**< invalid variable set operation, incompatible type */ RS_RET_RULESET_EXISTS = -2306,/**< ruleset already exists */ - RS_RET_OK_HAVE_REPMSG = -2307,/**< processing was OK, but we do have a repeat message (ratelimiter status) */ /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ diff --git a/tools/syslogd.c b/tools/syslogd.c index 1eac6ccb..a56cea94 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -700,38 +700,6 @@ multiSubmitMsg(multi_submit_t *pMultiSub) /* backward compat. level */ } -/* add a message to a multisubmit structure. This handles ratelimiting. IF - * pMultiSub == NULL, a single-message enqueue happens. */ -rsRetVal -multiSubmitAddMsg(multi_submit_t *pMultiSub, msg_t *pMsg, ratelimit_t *ratelimit) -{ - rsRetVal localRet; - DEFiRet; - - if(pMultiSub == NULL) { -dbgprintf("DDDD: multiSubmitAddMsg, not checking ratelimiter for single submit!\n"); - CHKiRet(submitMsg(pMsg)); - } else { -dbgprintf("DDDD: have multisub!\n"); - localRet = ratelimitMsg(pMsg, ratelimit); - if(localRet == RS_RET_OK_HAVE_REPMSG) { -dbgprintf("DDDD: doing repeat submit!\n"); - pMultiSub->ppMsgs[pMultiSub->nElem++] = ratelimitGetRepeatMsg(ratelimit); - if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg2(pMultiSub)); - localRet = RS_RET_OK; - } - if(localRet == RS_RET_OK) { - pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; - if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg2(pMultiSub)); - } - } - -finalize_it: - RETiRet; -} - /* flush multiSubmit, e.g. at end of read records */ rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub) -- cgit v1.2.3 From 25a8496a6ae79480e8601cbdab5b45ad613a8dcd Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 11:48:43 +0200 Subject: imudp: add input batching & ratelimiter interface --- ChangeLog | 3 +++ plugins/imptcp/imptcp.c | 6 ++---- plugins/imudp/imudp.c | 12 ++++++++++-- runtime/ratelimit.c | 1 + runtime/rsyslog.h | 1 + 5 files changed, 17 insertions(+), 6 deletions(-) diff --git a/ChangeLog b/ChangeLog index 224afae5..6e8dfc97 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,4 +1,7 @@ --------------------------------------------------------------------------- +Version 7.3.1 [devel] 2012-10-?? +- imudp: support for input batching added (performance improvement) +--------------------------------------------------------------------------- Version 7.3.0 [devel] 2012-10-09 - omlibdbi improvements, added * support for config load phases & module() parameters diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 22d3ff25..8495b293 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -801,12 +801,11 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG * we have just received a bunch of data! -- rgerhards, 2009-06-16 * EXTRACT from tcps_sess.c */ -#define NUM_MULTISUB 1024 static rsRetVal DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) { multi_submit_t multiSub; - msg_t *pMsgs[NUM_MULTISUB]; + msg_t *pMsgs[CONF_NUM_MULTISUB]; struct syslogTime stTime; time_t ttGenTime; char *pEnd; @@ -817,7 +816,7 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) datetime.getCurrTime(&stTime, &ttGenTime); multiSub.ppMsgs = pMsgs; - multiSub.maxElem = NUM_MULTISUB; + multiSub.maxElem = CONF_NUM_MULTISUB; multiSub.nElem = 0; /* We now copy the message to the session buffer. */ @@ -832,7 +831,6 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) finalize_it: RETiRet; } -#undef NUM_MULTISUB /****************************************** --END-- TCP SUPPORT FUNCTIONS ***********************************/ diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 2ddc10bb..7f9e00dc 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -305,7 +305,6 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta static inline rsRetVal processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) { - DEFiRet; int iNbrTimeUsed; time_t ttGenTime; struct syslogTime stTime; @@ -315,9 +314,15 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f msg_t *pMsg; prop_t *propFromHost = NULL; prop_t *propFromHostIP = NULL; + multi_submit_t multiSub; + msg_t *pMsgs[CONF_NUM_MULTISUB]; char errStr[1024]; + DEFiRet; assert(pThrd != NULL); + multiSub.ppMsgs = pMsgs; + multiSub.maxElem = CONF_NUM_MULTISUB; + multiSub.nElem = 0; iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a bad receive, done below in the body */ if(pThrd->bShallStop == RSTRUE) @@ -384,12 +389,15 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f if(*pbIsPermitted == 2) pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); - CHKiRet(submitMsg(pMsg)); + CHKiRet(ratelimitAddMsg(lstn->ratelimiter, &multiSub, pMsg)); STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); } } + finalize_it: + multiSubmitFlush(&multiSub); + if(propFromHost != NULL) prop.Destruct(&propFromHost); if(propFromHostIP != NULL) diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index 6b637677..2df5f78a 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -122,6 +122,7 @@ ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) *ppRepMsg = ratelimitGenRepMsg(ratelimit); ratelimit->nsupp = 0; } + msgDestruct(&ratelimit->pMsg); } ratelimit->pMsg = MsgAddRef(pMsg); } diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 4404c475..e6238c23 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -69,6 +69,7 @@ * approach taken here is considered appropriate. * rgerhards, 2010-06-24 */ +#define CONF_NUM_MULTISUB 1024 /* default number of messages per multisub structure */ /* ############################################################# * * # End Config Settings # * -- cgit v1.2.3 From 0d60901e6e404419b06ad0d20429edf7ea3053ce Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 12:09:37 +0200 Subject: ratelimit: imtcp (and gssapi) converted to new interface --- plugins/imtcp/imtcp.c | 7 +++---- tcps_sess.c | 16 +++------------- tcpsrv.c | 3 +++ tcpsrv.h | 1 + 4 files changed, 10 insertions(+), 17 deletions(-) diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c index eaf9a213..beb7d705 100644 --- a/plugins/imtcp/imtcp.c +++ b/plugins/imtcp/imtcp.c @@ -580,7 +580,9 @@ ENDwillRun BEGINafterRun CODESTARTafterRun - /* do cleanup here */ + if(pOurTcpsrv != NULL) + iRet = tcpsrv.Destruct(&pOurTcpsrv); + net.clearAllowedSenders(UCHAR_CONSTANT("TCP")); ENDafterRun @@ -594,9 +596,6 @@ ENDisCompatibleWithFeature BEGINmodExit CODESTARTmodExit - if(pOurTcpsrv != NULL) - iRet = tcpsrv.Destruct(&pOurTcpsrv); - if(pPermPeersRoot != NULL) { net.DestructPermittedPeers(&pPermPeersRoot); } diff --git a/tcps_sess.c b/tcps_sess.c index bc81c299..16fd94f5 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -47,6 +47,7 @@ #include "msg.h" #include "datetime.h" #include "prop.h" +#include "ratelimit.h" #include "debug.h" @@ -264,14 +265,7 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG MsgSetRuleset(pMsg, pThis->pLstnInfo->pRuleset); STATSCOUNTER_INC(pThis->pLstnInfo->ctrSubmit, pThis->pLstnInfo->mutCtrSubmit); - if(pMultiSub == NULL) { - CHKiRet(submitMsg2(pMsg)); - } else { - pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; - if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg(pMultiSub)); - } - + ratelimitAddMsg(pThis->pLstnInfo->ratelimiter, pMultiSub, pMsg); finalize_it: /* reset status variables */ @@ -487,11 +481,7 @@ DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen) while(pData < pEnd) { CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub)); } - - if(multiSub.nElem > 0) { - /* submit anything that was not yet submitted */ - CHKiRet(multiSubmitMsg(&multiSub)); - } + iRet = multiSubmitFlush(&multiSub); finalize_it: RETiRet; diff --git a/tcpsrv.c b/tcpsrv.c index bf12f1fa..6b134017 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -72,6 +72,7 @@ #include "nspoll.h" #include "errmsg.h" #include "ruleset.h" +#include "ratelimit.h" #include "unicode-helper.h" @@ -151,6 +152,7 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort, int bSuppOctetFram) snprintf((char*)statname, sizeof(statname), "%s(%s)", pThis->pszInputName, pszPort); statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ CHKiRet(statsobj.SetName(pEntry->stats, statname)); + CHKiRet(ratelimitNew(&pEntry->ratelimiter)); STATSCOUNTER_INIT(pEntry->ctrSubmit, pEntry->mutCtrSubmit); CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"), ctrType_IntCtr, &(pEntry->ctrSubmit))); @@ -295,6 +297,7 @@ static void deinit_tcp_listener(tcpsrv_t *pThis) while(pEntry != NULL) { free(pEntry->pszPort); prop.Destruct(&pEntry->pInputName); + ratelimitDestruct(pEntry->ratelimiter); pDel = pEntry; pEntry = pEntry->pNext; free(pDel); diff --git a/tcpsrv.h b/tcpsrv.h index d66f682c..4884b34d 100644 --- a/tcpsrv.h +++ b/tcpsrv.h @@ -42,6 +42,7 @@ struct tcpLstnPortList_s { ruleset_t *pRuleset; /**< associated ruleset */ statsobj_t *stats; /**< associated stats object */ sbool bSuppOctetFram; /**< do we support octect-counted framing? (if no->legay only!)*/ + ratelimit_t *ratelimiter; STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) tcpLstnPortList_t *pNext; /**< next port or NULL */ }; -- cgit v1.2.3 From a3a3a322d8c7d40dfed0765457d26a484a624bce Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 12:22:16 +0200 Subject: imfile: converted to new ratelimit interface --- plugins/imfile/imfile.c | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index 453b6b05..99210b76 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -48,6 +48,7 @@ #include "prop.h" #include "stringbuf.h" #include "ruleset.h" +#include "ratelimit.h" MODULE_TYPE_INPUT /* must be present for input modules, do not remove */ MODULE_TYPE_NOKEEP @@ -82,6 +83,7 @@ typedef struct fileInfo_s { strm_t *pStrm; /* its stream (NULL if not assigned) */ int readMode; /* which mode to use in ReadMulteLine call? */ ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + ratelimit_t *ratelimiter; multi_submit_t multiSub; } fileInfo_t; @@ -189,9 +191,7 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine) pMsg->iFacility = LOG_FAC(pInfo->iFacility); pMsg->iSeverity = LOG_PRI(pInfo->iSeverity); MsgSetRuleset(pMsg, pInfo->pRuleset); - pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg; - if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem) - CHKiRet(multiSubmitMsg(&pInfo->multiSub)); + ratelimitAddMsg(pInfo->ratelimiter, &pInfo->multiSub, pMsg); finalize_it: RETiRet; } @@ -304,18 +304,7 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) } finalize_it: - if(pThis->multiSub.nElem > 0) { - /* submit everything that was not yet submitted */ - CHKiRet(multiSubmitMsg(&pThis->multiSub)); - } - ; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */ - /* Note: the problem above is that pthread:cleanup_pop() is a macro which - * evaluates to something like "} while(0);". So the code would become - * "finalize_it: }", that is a label without a statement. The C standard does - * not permit this. So we add an empty statement "finalize_it: ; }" and - * everybody is happy. Note that without the ;, an error is reported only - * on some platforms/compiler versions. -- rgerhards, 2008-08-15 - */ + multiSubmitFlush(&pThis->multiSub); pthread_cleanup_pop(0); if(pCStr != NULL) { @@ -423,6 +412,7 @@ addListner(instanceConf_t *inst) pThis->lenTag = ustrlen(pThis->pszTag); pThis->pszStateFile = (uchar*) strdup((char*) inst->pszStateFile); + CHKiRet(ratelimitNew(&pThis->ratelimiter)); CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(inst->nMultiSub * sizeof(msg_t*))); pThis->multiSub.maxElem = inst->nMultiSub; pThis->multiSub.nElem = 0; @@ -773,6 +763,8 @@ CODESTARTafterRun persistStrmState(&files[i]); strm.Destruct(&(files[i].pStrm)); } + ratelimitDestruct(files[i].ratelimiter); + free(files[i].multiSub.ppMsgs); free(files[i].pszFileName); free(files[i].pszTag); free(files[i].pszStateFile); -- cgit v1.2.3 From 912db8bbacb5372f02f96638102aafff70ba269f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 12:38:49 +0200 Subject: milestone: convert imuxsock to ratelimitAddMsg() interface --- plugins/imuxsock/imuxsock.c | 31 +------------------------------ runtime/ratelimit.c | 8 +++++--- 2 files changed, 6 insertions(+), 33 deletions(-) diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 3421863d..6aa2b1e8 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -763,28 +763,6 @@ copyescaped(uchar *dstbuf, uchar *inbuf, int inlen) } -#if 0 -/* Creates new field to be added to event - * used for SystemLogParseTrusted parsing - */ -struct ee_field * -createNewField(char *fieldname, char *value, int lenValue) { - es_str_t *newStr; - struct ee_value *newVal; - struct ee_field *newField; - - newStr = es_newStrFromBuf(value, (es_size_t) lenValue); - - newVal = ee_newValue(ctxee); - ee_setStrValue(newVal, newStr); - - newField = ee_newFieldFromNV(ctxee, fieldname, newVal); - - return newField; -} -#endif - - /* submit received message to the queue engine * We now parse the message according to expected format so that we * can also mangle it if necessary. @@ -811,14 +789,12 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim int toffs; /* offset for trusted properties */ struct syslogTime dummyTS; struct json_object *json = NULL, *jval; - msg_t *repMsg; DEFiRet; #warning experimental code needs to be made production-ready! /* we need to decide how many ratelimiters we use --> hashtable also remove current homegrown ratelimiting functionality and replace it with the new one. */ -rsRetVal localRet; static ratelimit_t *ratelimit = NULL; if(ratelimit == NULL) ratelimitNew(&ratelimit); @@ -993,12 +969,7 @@ if(ratelimit == NULL) MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName); CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP)); - localRet = ratelimitMsg(ratelimit, pMsg, &repMsg); - if(repMsg != NULL) - CHKiRet(submitMsg(repMsg)); - if(localRet == RS_RET_OK) - CHKiRet(submitMsg(pMsg)); - + ratelimitAddMsg(ratelimit, NULL, pMsg); STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit); finalize_it: RETiRet; diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index 2df5f78a..d3d32b58 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -147,9 +147,11 @@ ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg) DEFiRet; if(pMultiSub == NULL) { -dbgprintf("DDDDD: multiSubmitAddMsg, not checking ratelimiter for single submit!\n"); -#warning missing multisub Implementation? - CHKiRet(submitMsg(pMsg)); + localRet = ratelimitMsg(ratelimit, pMsg, &repMsg); + if(repMsg != NULL) + CHKiRet(submitMsg2(repMsg)); + if(localRet == RS_RET_OK) + CHKiRet(submitMsg2(pMsg)); } else { localRet = ratelimitMsg(ratelimit, pMsg, &repMsg); if(repMsg != NULL) { -- cgit v1.2.3 From 6ab4666622efeb9944bbf7b6e3581d1372465adb Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 13:05:42 +0200 Subject: ratelimit: respect $repeatedmsgreduction setting --- runtime/ratelimit.c | 67 ++++++++++++++++++++++++++++++++--------------------- runtime/ratelimit.h | 8 +++++++ 2 files changed, 49 insertions(+), 26 deletions(-) diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index d3d32b58..e94ad1bc 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -32,6 +32,7 @@ #include "parser.h" #include "unicode-helper.h" #include "msg.h" +#include "rsconf.h" #include "dirty.h" /* definitions for objects we access */ @@ -68,41 +69,17 @@ ratelimitGenRepMsg(ratelimit_t *ratelimit) done: return repMsg; } -/* ratelimit a message, that means: - * - handle "last message repeated n times" logic - * - handle actual (discarding) rate-limiting - * This function returns RS_RET_OK, if the caller shall process - * the message regularly and RS_RET_DISCARD if the caller must - * discard the message. The caller should also discard the message - * if another return status occurs. This places some burden on the - * caller logic, but provides best performance. Demanding this - * cooperative mode can enable a faulty caller to thrash up part - * of the system, but we accept that risk (a faulty caller can - * always do all sorts of evil, so...) - * If *ppRepMsg != NULL on return, the caller must enqueue that - * message before the original message. - */ -rsRetVal -ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) +static inline rsRetVal +doLastMessageRepeatedNTimes(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) { - rsRetVal localRet; int bNeedUnlockMutex = 0; DEFiRet; - if((pMsg->msgFlags & NEEDS_PARSING) != 0) { - if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { - DBGPRINTF("Message discarded, parsing error %d\n", localRet); - ABORT_FINALIZE(RS_RET_DISCARDMSG); - } - } - if(ratelimit->bThreadSafe) { pthread_mutex_lock(&ratelimit->mut); bNeedUnlockMutex = 1; } - *ppRepMsg = NULL; - /* suppress duplicate messages */ if( ratelimit->pMsg != NULL && getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) && !ustrcmp(getMSG(pMsg), getMSG(ratelimit->pMsg)) && @@ -133,6 +110,43 @@ finalize_it: RETiRet; } +/* ratelimit a message, that means: + * - handle "last message repeated n times" logic + * - handle actual (discarding) rate-limiting + * This function returns RS_RET_OK, if the caller shall process + * the message regularly and RS_RET_DISCARD if the caller must + * discard the message. The caller should also discard the message + * if another return status occurs. This places some burden on the + * caller logic, but provides best performance. Demanding this + * cooperative mode can enable a faulty caller to thrash up part + * of the system, but we accept that risk (a faulty caller can + * always do all sorts of evil, so...) + * If *ppRepMsg != NULL on return, the caller must enqueue that + * message before the original message. + */ +rsRetVal +ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) +{ + rsRetVal localRet; + DEFiRet; + +#warning be sure to parse only when actually required! + if((pMsg->msgFlags & NEEDS_PARSING) != 0) { + if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { + DBGPRINTF("Message discarded, parsing error %d\n", localRet); + ABORT_FINALIZE(RS_RET_DISCARDMSG); + } + } + + *ppRepMsg = NULL; + + if(ratelimit->bReduceRepeatMsgs) { + CHKiRet(doLastMessageRepeatedNTimes(ratelimit, pMsg, ppRepMsg)); + } +finalize_it: + RETiRet; +} + /* add a message to a ratelimiter/multisubmit structure. * ratelimiting is automatically handled according to the ratelimit @@ -177,6 +191,7 @@ ratelimitNew(ratelimit_t **ppThis) DEFiRet; CHKmalloc(pThis = calloc(1, sizeof(ratelimit_t))); + pThis->bReduceRepeatMsgs = runConf->globals.bReduceRepeatMsgs; *ppThis = pThis; finalize_it: RETiRet; diff --git a/runtime/ratelimit.h b/runtime/ratelimit.h index 37dad900..5fa884a8 100644 --- a/runtime/ratelimit.h +++ b/runtime/ratelimit.h @@ -22,6 +22,14 @@ #define INCLUDED_RATELIMIT_H struct ratelimit_s { + /* support for Linux kernel-type ratelimiting */ + unsigned short interval; + unsigned short burst; + unsigned done; + unsigned missed; + time_t begin; + /* support for "last message repeated n times */ + int bReduceRepeatMsgs; /**< shall we do "last message repeated n times" processing? */ unsigned nsupp; /**< nbr of msgs suppressed */ msg_t *pMsg; sbool bThreadSafe; /**< do we need to operate in Thread-Safe mode? */ -- cgit v1.2.3 From 34a88a7e9a9501593b6fe9f79cc96963c4da7cde Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 16:39:36 +0200 Subject: ratelimit: added linux-like ratelimiter type --- plugins/imfile/imfile.c | 2 +- plugins/imptcp/imptcp.c | 3 +- plugins/imudp/imudp.c | 10 ++-- plugins/imuxsock/imuxsock.c | 106 ++++++--------------------------------- runtime/msg.c | 3 ++ runtime/ratelimit.c | 119 ++++++++++++++++++++++++++++++++++++++++---- runtime/ratelimit.h | 7 ++- tcpsrv.c | 2 +- tools/syslogd.c | 1 - 9 files changed, 143 insertions(+), 110 deletions(-) diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index 99210b76..9a8a0373 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -412,7 +412,7 @@ addListner(instanceConf_t *inst) pThis->lenTag = ustrlen(pThis->pszTag); pThis->pszStateFile = (uchar*) strdup((char*) inst->pszStateFile); - CHKiRet(ratelimitNew(&pThis->ratelimiter)); + CHKiRet(ratelimitNew(&pThis->ratelimiter, "imfile", inst->pszFileName)); CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(inst->nMultiSub * sizeof(msg_t*))); pThis->multiSub.maxElem = inst->nMultiSub; pThis->multiSub.nElem = 0; diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 8495b293..aba4c439 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -1121,7 +1121,8 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) pSrv->iKeepAliveProbes = inst->iKeepAliveProbes; pSrv->iKeepAliveTime = inst->iKeepAliveTime; pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose; - CHKiRet(ratelimitNew(&pSrv->ratelimiter)); + CHKiRet(ratelimitNew(&pSrv->ratelimiter, "imtcp", (char*)inst->pszBindPort)); +//TODO: add!ratelimitSetLinuxLike(pSrv->ratelimiter, 3, 2); ratelimitSetThreadSafe(pSrv->ratelimiter); CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort)); pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim; diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 7f9e00dc..aba7d69d 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -223,7 +223,7 @@ addListner(instanceConf_t *inst) struct lstn_s *newlcnfinfo; uchar *bindName; uchar *port; - uchar statname[64]; + uchar dispname[64]; /* check which address to bind to. We could do this more compact, but have not * done so in order to make the code more readable. -- rgerhards, 2007-12-27 @@ -248,12 +248,12 @@ addListner(instanceConf_t *inst) newlcnfinfo->next = NULL; newlcnfinfo->sock = newSocks[iSrc]; newlcnfinfo->pRuleset = inst->pBindRuleset; - CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter)); + snprintf((char*)dispname, sizeof(dispname), "imudp(%s:%s)", bindName, port); + dispname[sizeof(dispname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter, dispname, NULL)); /* support statistics gathering */ CHKiRet(statsobj.Construct(&(newlcnfinfo->stats))); - snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port); - statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ - CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname)); + CHKiRet(statsobj.SetName(newlcnfinfo->stats, dispname)); STATSCOUNTER_INIT(newlcnfinfo->ctrSubmit, newlcnfinfo->mutCtrSubmit); CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"), ctrType_IntCtr, &(newlcnfinfo->ctrSubmit))); diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 6aa2b1e8..c871391c 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -106,15 +106,6 @@ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) STATSCOUNTER_DEF(ctrLostRatelimit, mutCtrLostRatelimit) STATSCOUNTER_DEF(ctrNumRatelimiters, mutCtrNumRatelimiters) -struct rs_ratelimit_state { - unsigned short interval; - unsigned short burst; - unsigned done; - unsigned missed; - time_t begin; -}; -typedef struct rs_ratelimit_state rs_ratelimit_state_t; - /* a very simple "hash function" for process IDs - we simply use the * pid itself: it is quite expected that all pids may log some time, but @@ -272,74 +263,9 @@ static struct cnfparamblk inppblk = /* we do not use this, because we do not bind to a ruleset so far * enable when this is changed: #include "im-helper.h" */ /* must be included AFTER the type definitions! */ -static void -initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsigned short burst) -{ - rs->interval = interval; - rs->burst = burst; - rs->done = 0; - rs->missed = 0; - rs->begin = 0; -} - static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ -/* ratelimiting support, modelled after the linux kernel - * returns 1 if message is within rate limit and shall be - * processed, 0 otherwise. - * This implementation is NOT THREAD-SAFE and must not - * be called concurrently. - */ -static inline int -withinRatelimit(struct rs_ratelimit_state *rs, time_t tt, pid_t pid) -{ - int ret; - uchar msgbuf[1024]; - - if(rs->interval == 0) { - ret = 1; - goto finalize_it; - } - - assert(rs->burst != 0); - - if(rs->begin == 0) - rs->begin = tt; - - /* resume if we go out of out time window */ - if(tt > rs->begin + rs->interval) { - if(rs->missed) { - snprintf((char*)msgbuf, sizeof(msgbuf), - "imuxsock lost %u messages from pid %lu due to rate-limiting", - rs->missed, (unsigned long) pid); - logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); - rs->missed = 0; - } - rs->begin = 0; - rs->done = 0; - } - - /* do actual limit check */ - if(rs->burst > rs->done) { - rs->done++; - ret = 1; - } else { - if(rs->missed == 0) { - snprintf((char*)msgbuf, sizeof(msgbuf), - "imuxsock begins to drop messages from pid %lu due to rate-limiting", - (unsigned long) pid); - logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); - } - rs->missed++; - ret = 0; - } - -finalize_it: - return ret; -} - - /* create input instance, set default paramters, and * add it to the list of instances. */ @@ -446,7 +372,8 @@ addListner(instanceConf_t *inst) CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName)); } if(inst->ratelimitInterval > 0) { - if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL)) == NULL) { + if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, + (void(*)(void*))ratelimitDestruct)) == NULL) { /* in this case, we simply turn off rate-limiting */ DBGPRINTF("imuxsock: turning off rate limiting because we could not " "create hash table\n"); @@ -605,19 +532,22 @@ finalize_it: * listener (the latter being a performance enhancement). */ static inline rsRetVal -findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl) +findRatelimiter(lstn_t *pLstn, struct ucred *cred, ratelimit_t **prl) { - rs_ratelimit_state_t *rl; + ratelimit_t *rl; int r; pid_t *keybuf; + char pidbuf[256]; DEFiRet; if(cred == NULL) FINALIZE; +#if 0 // TODO: check deactivated? if(pLstn->ratelimitInterval == 0) { *prl = NULL; FINALIZE; } +#endif rl = hashtable_search(pLstn->ht, &cred->pid); if(rl == NULL) { @@ -625,10 +555,13 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl) DBGPRINTF("imuxsock: no ratelimiter for pid %lu, creating one\n", (unsigned long) cred->pid); STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters); - CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t))); + snprintf(pidbuf, sizeof(pidbuf), "pid %lu", + (unsigned long) cred->pid); + pidbuf[sizeof(pidbuf)-1] = '\0'; /* to be on safe side */ + CHKiRet(ratelimitNew(&rl, "imuxsock", pidbuf)); + ratelimitSetLinuxLike(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst); CHKmalloc(keybuf = malloc(sizeof(pid_t))); *keybuf = cred->pid; - initRatelimitState(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst); r = hashtable_insert(pLstn->ht, keybuf, rl); if(r == 0) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -781,8 +714,8 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim uchar bufParseTAG[CONF_TAG_MAXSIZE]; struct syslogTime st; time_t tt; - rs_ratelimit_state_t *ratelimiter = NULL; int lenProp; + ratelimit_t *ratelimiter = NULL; uchar propBuf[1024]; uchar msgbuf[8192]; uchar *pmsgbuf; @@ -790,14 +723,6 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim struct syslogTime dummyTS; struct json_object *json = NULL, *jval; DEFiRet; -#warning experimental code needs to be made production-ready! -/* we need to decide how many ratelimiters we use --> hashtable - also remove current homegrown ratelimiting functionality and - replace it with the new one. - */ -static ratelimit_t *ratelimit = NULL; -if(ratelimit == NULL) - ratelimitNew(&ratelimit); /* TODO: handle format errors?? */ /* we need to parse the pri first, because we need the severity for @@ -829,10 +754,12 @@ if(ratelimit == NULL) tt = ts->tv_sec; } +#if 0 // TODO: think about stats counters (or wait for request...?) if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) { STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit); FINALIZE; } +#endif /* created trusted properties */ if(cred != NULL && pLstn->bAnnotate) { @@ -939,7 +866,6 @@ if(ratelimit == NULL) parse[15] = ' '; /* re-write \0 from fromatTimestamp3164 by SP */ /* update "counters" to reflect processed timestamp */ parse += 16; - lenMsg -= 16; } } @@ -969,7 +895,7 @@ if(ratelimit == NULL) MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName); CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP)); - ratelimitAddMsg(ratelimit, NULL, pMsg); + ratelimitAddMsg(ratelimiter, NULL, pMsg); STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit); finalize_it: RETiRet; diff --git a/runtime/msg.c b/runtime/msg.c index d874178b..6ba20de6 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -2259,6 +2259,9 @@ void MsgSetMSGoffs(msg_t *pMsg, short offs) ISOBJ_TYPE_assert(pMsg, msg); pMsg->offMSG = offs; if(offs > pMsg->iLenRawMsg) { + if(!(offs - 1 == pMsg->iLenRawMsg)) { + *((char*)(0)) = "abd"; + } assert(offs - 1 == pMsg->iLenRawMsg); pMsg->iLenMSG = 0; } else { diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index e94ad1bc..22e785a8 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -73,8 +73,16 @@ static inline rsRetVal doLastMessageRepeatedNTimes(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) { int bNeedUnlockMutex = 0; + rsRetVal localRet; DEFiRet; + if((pMsg->msgFlags & NEEDS_PARSING) != 0) { + if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { + DBGPRINTF("Message discarded, parsing error %d\n", localRet); + ABORT_FINALIZE(RS_RET_DISCARDMSG); + } + } + if(ratelimit->bThreadSafe) { pthread_mutex_lock(&ratelimit->mut); bNeedUnlockMutex = 1; @@ -110,6 +118,61 @@ finalize_it: RETiRet; } +/* Linux-like ratelimiting, modelled after the linux kernel + * returns 1 if message is within rate limit and shall be + * processed, 0 otherwise. + * This implementation is NOT THREAD-SAFE and must not + * be called concurrently. + */ +static inline int +withinRatelimit(ratelimit_t *ratelimit, time_t tt) +{ + int ret; + uchar msgbuf[1024]; + + if(ratelimit->interval == 0) { + ret = 1; + goto finalize_it; + } + + assert(ratelimit->burst != 0); + + if(ratelimit->begin == 0) + ratelimit->begin = tt; + + /* resume if we go out of out time window */ + if(tt > ratelimit->begin + ratelimit->interval) { + if(ratelimit->missed) { + snprintf((char*)msgbuf, sizeof(msgbuf), + "%s: %u messages lost due to rate-limiting", + ratelimit->name, ratelimit->missed); + logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); + ratelimit->missed = 0; + } + ratelimit->begin = 0; + ratelimit->done = 0; + } + + /* do actual limit check */ + if(ratelimit->burst > ratelimit->done) { + ratelimit->done++; + ret = 1; + } else { + if(ratelimit->missed == 0) { + snprintf((char*)msgbuf, sizeof(msgbuf), + "%s: begin to drop messages due to rate-limiting", + ratelimit->name); + logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); + } + ratelimit->missed++; + ret = 0; + } + +finalize_it: + return ret; +} + + /* ratelimit a message, that means: * - handle "last message repeated n times" logic * - handle actual (discarding) rate-limiting @@ -127,19 +190,13 @@ finalize_it: rsRetVal ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) { - rsRetVal localRet; DEFiRet; -#warning be sure to parse only when actually required! - if((pMsg->msgFlags & NEEDS_PARSING) != 0) { - if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { - DBGPRINTF("Message discarded, parsing error %d\n", localRet); + *ppRepMsg = NULL; + if(ratelimit->bLinuxLike) { + if(withinRatelimit(ratelimit, pMsg->ttGenTime) == 0) ABORT_FINALIZE(RS_RET_DISCARDMSG); - } } - - *ppRepMsg = NULL; - if(ratelimit->bReduceRepeatMsgs) { CHKiRet(doLastMessageRepeatedNTimes(ratelimit, pMsg, ppRepMsg)); } @@ -147,6 +204,13 @@ finalize_it: RETiRet; } +/* returns 1, if the ratelimiter performs any checks and 0 otherwise */ +int +ratelimitChecked(ratelimit_t *ratelimit) +{ + return ratelimit->bLinuxLike || ratelimit->bReduceRepeatMsgs; +} + /* add a message to a ratelimiter/multisubmit structure. * ratelimiting is automatically handled according to the ratelimit @@ -184,19 +248,53 @@ finalize_it: RETiRet; } + +/* modname must be a static name (usually expected to be the module + * name and MUST be present. dynname may be NULL and can be used for + * dynamic information, e.g. PID or listener IP, ... + * Both values should be kept brief. + */ rsRetVal -ratelimitNew(ratelimit_t **ppThis) +ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname) { ratelimit_t *pThis; + char namebuf[256]; DEFiRet; CHKmalloc(pThis = calloc(1, sizeof(ratelimit_t))); + if(modname == NULL) + modname ="*ERROR:MODULE NAME MISSING*"; + + if(dynname == NULL) { + pThis->name = strdup(modname); + } else { + snprintf(namebuf, sizeof(namebuf), "%s[%s]", + modname, dynname); + namebuf[sizeof(namebuf)-1] = '\0'; /* to be on safe side */ + pThis->name = strdup(namebuf); + } pThis->bReduceRepeatMsgs = runConf->globals.bReduceRepeatMsgs; + if(pThis->bReduceRepeatMsgs) + pThis->bActive = 1; *ppThis = pThis; finalize_it: RETiRet; } + +/* enable linux-like ratelimiting */ +void +ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned short burst) +{ + ratelimit->interval = interval; + ratelimit->burst = burst; + ratelimit->done = 0; + ratelimit->missed = 0; + ratelimit->begin = 0; + ratelimit->bLinuxLike = 1; +} + + /* enable thread-safe operations mode. This make sure that * a single ratelimiter can be called from multiple threads. As * this causes some overhead and is not always required, it needs @@ -224,6 +322,7 @@ ratelimitDestruct(ratelimit_t *ratelimit) } if(ratelimit->bThreadSafe) pthread_mutex_destroy(&ratelimit->mut); + free(ratelimit->name); free(ratelimit); } diff --git a/runtime/ratelimit.h b/runtime/ratelimit.h index 5fa884a8..a7959dfe 100644 --- a/runtime/ratelimit.h +++ b/runtime/ratelimit.h @@ -22,7 +22,10 @@ #define INCLUDED_RATELIMIT_H struct ratelimit_s { + int bActive; /**< any rate-limiting at all desired? */ + char *name; /**< rate limiter name, e.g. for user messages */ /* support for Linux kernel-type ratelimiting */ + int bLinuxLike; /**< Linux-like rate limiting enabled? */ unsigned short interval; unsigned short burst; unsigned done; @@ -37,11 +40,13 @@ struct ratelimit_s { }; /* prototypes */ -rsRetVal ratelimitNew(ratelimit_t **ppThis); +rsRetVal ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname); void ratelimitSetThreadSafe(ratelimit_t *ratelimit); +void ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned short burst); rsRetVal ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRep); rsRetVal ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg); void ratelimitDestruct(ratelimit_t *pThis); +int ratelimitChecked(ratelimit_t *ratelimit); rsRetVal ratelimitModInit(void); void ratelimitModExit(void); diff --git a/tcpsrv.c b/tcpsrv.c index 6b134017..89ad7325 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -152,7 +152,7 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort, int bSuppOctetFram) snprintf((char*)statname, sizeof(statname), "%s(%s)", pThis->pszInputName, pszPort); statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ CHKiRet(statsobj.SetName(pEntry->stats, statname)); - CHKiRet(ratelimitNew(&pEntry->ratelimiter)); + CHKiRet(ratelimitNew(&pEntry->ratelimiter, "tcperver", NULL)); STATSCOUNTER_INIT(pEntry->ctrSubmit, pEntry->mutCtrSubmit); CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"), ctrType_IntCtr, &(pEntry->ctrSubmit))); diff --git a/tools/syslogd.c b/tools/syslogd.c index a56cea94..f76fe8cb 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -705,7 +705,6 @@ rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub) { DEFiRet; -dbgprintf("DDDD: multiSubmitFlish, nElem %d\n", pMultiSub->nElem); if(pMultiSub->nElem > 0) { iRet = multiSubmitMsg2(pMultiSub); } -- cgit v1.2.3 From cfa8d01f52c96b18e17fb905092788ad520b39ac Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 17:01:00 +0200 Subject: imptcp: support for Linux-Type ratelimiting added --- ChangeLog | 1 + doc/imptcp.html | 15 ++++++++------- plugins/imptcp/imptcp.c | 12 +++++++++++- runtime/ratelimit.c | 7 ++----- runtime/ratelimit.h | 2 -- 5 files changed, 22 insertions(+), 15 deletions(-) diff --git a/ChangeLog b/ChangeLog index 307abceb..a3ab3f71 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,6 @@ --------------------------------------------------------------------------- Version 7.3.1 [devel] 2012-10-?? +- imptcp: support for Linux-Type ratelimiting added - imudp: support for input batching added (performance improvement) - change lumberjack cookie to "@cee:" from "@cee: " CEE originally specified the cookie with SP, whereas other lumberjack diff --git a/doc/imptcp.html b/doc/imptcp.html index 7e712afa..33b8b13b 100644 --- a/doc/imptcp.html +++ b/doc/imptcp.html @@ -13,18 +13,12 @@

    Description:

    Provides the ability to receive syslog messages via plain TCP syslog. This is a specialised input plugin tailored for high performance on Linux. It will -probably not run on any other platform. Also, it does no provide TLS services. +probably not run on any other platform. Also, it does not provide TLS services. Encryption can be provided by using stunnel.

    This module has no limit on the number of listeners and sessions that can be used. -

    Multiple receivers may be configured by -specifying $InputPTCPServerRun multiple times.

    Configuration Directives:

    -

    This plugin has config directives similar named as imtcp, but they all have PTCP in -their name instead of just TCP. Note that only a subset of the parameters are supported. -

      -

      Global Directives:

      • Threads <number>
        @@ -91,6 +85,13 @@ the message was received from. Binds specified ruleset to next server defined.
      • Address <name>
        On multi-homed machines, specifies to which local address the listerner should be bound. +
      • RateLimit.Interval [number] - (available since 7.3.1) specifies the rate-limiting +interval in seconds. Default value is 0, which turns off rate limiting. Set it to a number +of seconds (5 recommended) to activate rate-limiting. +
      • +
      • RateLimit.Burst [number] - (available since 7.3.1) specifies the rate-limiting +burst in number of messages. Default is 10,000. +
      Caveats/Known Bugs:
        diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index aba4c439..0475e219 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -122,6 +122,8 @@ struct instanceConf_s { uchar *pszBindRuleset; /* name of ruleset to bind to */ uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + int ratelimitInterval; + int ratelimitBurst; struct instanceConf_s *next; }; @@ -159,6 +161,8 @@ static struct cnfparamdescr inppdescr[] = { { "keepalive.time", eCmdHdlrInt, 0 }, { "keepalive.interval", eCmdHdlrInt, 0 }, { "addtlframedelimiter", eCmdHdlrInt, 0 }, + { "ratelimit.interval", eCmdHdlrInt, 0 }, + { "ratelimit.burst", eCmdHdlrInt, 0 } }; static struct cnfparamblk inppblk = { CNFPARAMBLK_VERSION, @@ -1042,6 +1046,8 @@ createInstance(instanceConf_t **pinst) inst->bEmitMsgOnClose = 0; inst->iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; inst->pBindRuleset = NULL; + inst->ratelimitBurst = 10000; /* arbitrary high limit */ + inst->ratelimitInterval = 0; /* off */ /* node created, let's add to config */ if(loadModConf->tail == NULL) { @@ -1122,7 +1128,7 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) pSrv->iKeepAliveTime = inst->iKeepAliveTime; pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose; CHKiRet(ratelimitNew(&pSrv->ratelimiter, "imtcp", (char*)inst->pszBindPort)); -//TODO: add!ratelimitSetLinuxLike(pSrv->ratelimiter, 3, 2); + ratelimitSetLinuxLike(pSrv->ratelimiter, inst->ratelimitInterval, inst->ratelimitBurst); ratelimitSetThreadSafe(pSrv->ratelimiter); CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort)); pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim; @@ -1452,6 +1458,10 @@ CODESTARTnewInpInst inst->iAddtlFrameDelim = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "notifyonconnectionclose")) { inst->bEmitMsgOnClose = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) { + inst->ratelimitBurst = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) { + inst->ratelimitInterval = (int) pvals[i].val.d.n; } else { dbgprintf("imptcp: program error, non-handled " "param '%s'\n", inppblk.descr[i].name); diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index 22e785a8..24152e8d 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -193,7 +193,7 @@ ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) DEFiRet; *ppRepMsg = NULL; - if(ratelimit->bLinuxLike) { + if(ratelimit->interval) { if(withinRatelimit(ratelimit, pMsg->ttGenTime) == 0) ABORT_FINALIZE(RS_RET_DISCARDMSG); } @@ -208,7 +208,7 @@ finalize_it: int ratelimitChecked(ratelimit_t *ratelimit) { - return ratelimit->bLinuxLike || ratelimit->bReduceRepeatMsgs; + return ratelimit->interval || ratelimit->bReduceRepeatMsgs; } @@ -274,8 +274,6 @@ ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname) pThis->name = strdup(namebuf); } pThis->bReduceRepeatMsgs = runConf->globals.bReduceRepeatMsgs; - if(pThis->bReduceRepeatMsgs) - pThis->bActive = 1; *ppThis = pThis; finalize_it: RETiRet; @@ -291,7 +289,6 @@ ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned ratelimit->done = 0; ratelimit->missed = 0; ratelimit->begin = 0; - ratelimit->bLinuxLike = 1; } diff --git a/runtime/ratelimit.h b/runtime/ratelimit.h index a7959dfe..820817bc 100644 --- a/runtime/ratelimit.h +++ b/runtime/ratelimit.h @@ -22,10 +22,8 @@ #define INCLUDED_RATELIMIT_H struct ratelimit_s { - int bActive; /**< any rate-limiting at all desired? */ char *name; /**< rate limiter name, e.g. for user messages */ /* support for Linux kernel-type ratelimiting */ - int bLinuxLike; /**< Linux-like rate limiting enabled? */ unsigned short interval; unsigned short burst; unsigned done; -- cgit v1.2.3 From 411b7fd43d27baa64fc80f717606ca4efa0d4801 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 17:03:39 +0200 Subject: remove debug code went accidently into commit --- runtime/msg.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/runtime/msg.c b/runtime/msg.c index 6ba20de6..d874178b 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -2259,9 +2259,6 @@ void MsgSetMSGoffs(msg_t *pMsg, short offs) ISOBJ_TYPE_assert(pMsg, msg); pMsg->offMSG = offs; if(offs > pMsg->iLenRawMsg) { - if(!(offs - 1 == pMsg->iLenRawMsg)) { - *((char*)(0)) = "abd"; - } assert(offs - 1 == pMsg->iLenRawMsg); pMsg->iLenMSG = 0; } else { -- cgit v1.2.3 From af378ec5dd50699d63bedf11f57b6239fb70896b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 17:12:47 +0200 Subject: imudp: support for Linux-like ratelimiting added --- ChangeLog | 4 +++- doc/imudp.html | 7 +++++++ plugins/imudp/imudp.c | 16 ++++++++++++++-- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/ChangeLog b/ChangeLog index a3ab3f71..aeefee6c 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,7 +1,9 @@ --------------------------------------------------------------------------- Version 7.3.1 [devel] 2012-10-?? - imptcp: support for Linux-Type ratelimiting added -- imudp: support for input batching added (performance improvement) +- imudp enhancements: + * support for input batching added (performance improvement) + * support for Linux-Type ratelimiting added - change lumberjack cookie to "@cee:" from "@cee: " CEE originally specified the cookie with SP, whereas other lumberjack tools used it without space. In order to keep interop with lumberjack, diff --git a/doc/imudp.html b/doc/imudp.html index b1a3ecc9..e32f9ecf 100644 --- a/doc/imudp.html +++ b/doc/imudp.html @@ -47,6 +47,13 @@ default 514, start UDP server on this port. Either a single port can be specifie
        Array of ports: Port=["514","515","10514","..."]
      • Ruleset <ruleset>
        Binds the listener to a specific ruleset.
      • +
      • RateLimit.Interval [number] - (available since 7.3.1) specifies the rate-limiting +interval in seconds. Default value is 0, which turns off rate limiting. Set it to a number +of seconds (5 recommended) to activate rate-limiting. +
      • +
      • RateLimit.Burst [number] - (available since 7.3.1) specifies the rate-limiting +burst in number of messages. Default is 10,000. +
      Caveats/Known Bugs:
        diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index aba7d69d..3b140847 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -109,6 +109,8 @@ struct instanceConf_s { uchar *pszBindPort; /* Port to bind socket to */ uchar *pszBindRuleset; /* name of ruleset to bind to */ ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + int ratelimitInterval; + int ratelimitBurst; struct instanceConf_s *next; }; @@ -140,7 +142,9 @@ static struct cnfparamblk modpblk = static struct cnfparamdescr inppdescr[] = { { "port", eCmdHdlrArray, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */ { "address", eCmdHdlrString, 0 }, - { "ruleset", eCmdHdlrString, 0 } + { "ruleset", eCmdHdlrString, 0 }, + { "ratelimit.interval", eCmdHdlrInt, 0 }, + { "ratelimit.burst", eCmdHdlrInt, 0 } }; static struct cnfparamblk inppblk = { CNFPARAMBLK_VERSION, @@ -165,6 +169,8 @@ createInstance(instanceConf_t **pinst) inst->pszBindPort = NULL; inst->pszBindAddr = NULL; inst->pszBindRuleset = NULL; + inst->ratelimitBurst = 10000; /* arbitrary high limit */ + inst->ratelimitInterval = 0; /* off */ /* node created, let's add to config */ if(loadModConf->tail == NULL) { @@ -250,7 +256,9 @@ addListner(instanceConf_t *inst) newlcnfinfo->pRuleset = inst->pBindRuleset; snprintf((char*)dispname, sizeof(dispname), "imudp(%s:%s)", bindName, port); dispname[sizeof(dispname)-1] = '\0'; /* just to be on the save side... */ - CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter, dispname, NULL)); + CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter, (char*)dispname, NULL)); + ratelimitSetLinuxLike(newlcnfinfo->ratelimiter, inst->ratelimitInterval, + inst->ratelimitBurst); /* support statistics gathering */ CHKiRet(statsobj.Construct(&(newlcnfinfo->stats))); CHKiRet(statsobj.SetName(newlcnfinfo->stats, dispname)); @@ -691,6 +699,10 @@ createListner(es_str_t *port, struct cnfparamvals *pvals) inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "ruleset")) { inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) { + inst->ratelimitBurst = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) { + inst->ratelimitInterval = (int) pvals[i].val.d.n; } else { dbgprintf("imudp: program error, non-handled " "param '%s'\n", inppblk.descr[i].name); -- cgit v1.2.3 From 111706066670306026155ba7845ffffd65860c4e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 17:16:25 +0200 Subject: fix: linux-like ratelimiting did not emit "final" message... on destruction of rate limiter. --- runtime/ratelimit.c | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index 24152e8d..22059ac8 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -118,6 +118,21 @@ finalize_it: RETiRet; } + +/* helper: tell how many messages we lost due to linux-like ratelimiting */ +static inline void +tellLostCnt(ratelimit_t *ratelimit) +{ + uchar msgbuf[1024]; + if(ratelimit->missed) { + snprintf((char*)msgbuf, sizeof(msgbuf), + "%s: %u messages lost due to rate-limiting", + ratelimit->name, ratelimit->missed); + logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); + ratelimit->missed = 0; + } +} + /* Linux-like ratelimiting, modelled after the linux kernel * returns 1 if message is within rate limit and shall be * processed, 0 otherwise. @@ -142,13 +157,7 @@ withinRatelimit(ratelimit_t *ratelimit, time_t tt) /* resume if we go out of out time window */ if(tt > ratelimit->begin + ratelimit->interval) { - if(ratelimit->missed) { - snprintf((char*)msgbuf, sizeof(msgbuf), - "%s: %u messages lost due to rate-limiting", - ratelimit->name, ratelimit->missed); - logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); - ratelimit->missed = 0; - } + tellLostCnt(ratelimit); ratelimit->begin = 0; ratelimit->done = 0; } @@ -317,6 +326,7 @@ ratelimitDestruct(ratelimit_t *ratelimit) } msgDestruct(&ratelimit->pMsg); } + tellLostCnt(ratelimit); if(ratelimit->bThreadSafe) pthread_mutex_destroy(&ratelimit->mut); free(ratelimit->name); -- cgit v1.2.3 From 68414e7815813cf7a433b9b8a8cfbb28ec27c637 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 17:19:12 +0200 Subject: cosmetic: remove compiler warning --- plugins/imfile/imfile.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index 9a8a0373..d50f917e 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -412,7 +412,7 @@ addListner(instanceConf_t *inst) pThis->lenTag = ustrlen(pThis->pszTag); pThis->pszStateFile = (uchar*) strdup((char*) inst->pszStateFile); - CHKiRet(ratelimitNew(&pThis->ratelimiter, "imfile", inst->pszFileName)); + CHKiRet(ratelimitNew(&pThis->ratelimiter, "imfile", (char*)inst->pszFileName)); CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(inst->nMultiSub * sizeof(msg_t*))); pThis->multiSub.maxElem = inst->nMultiSub; pThis->multiSub.nElem = 0; -- cgit v1.2.3 From 048a6b9573aec760129bb1332610b323c7cd8cd7 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 18:11:51 +0200 Subject: fix: we need to use loadConf to access global settings Hint: globals need to be re-done... --- runtime/ratelimit.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index 22059ac8..ccfcfdd2 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -282,7 +282,7 @@ ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname) namebuf[sizeof(namebuf)-1] = '\0'; /* to be on safe side */ pThis->name = strdup(namebuf); } - pThis->bReduceRepeatMsgs = runConf->globals.bReduceRepeatMsgs; + pThis->bReduceRepeatMsgs = loadConf->globals.bReduceRepeatMsgs; *ppThis = pThis; finalize_it: RETiRet; -- cgit v1.2.3 From b6e6cfeff8cdf4c8e2da1e623b66698f4a51856f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 18:17:59 +0200 Subject: imdiag: add support for new (ratelimiting) msg submit interface --- plugins/imdiag/imdiag.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c index 09742537..15948215 100644 --- a/plugins/imdiag/imdiag.c +++ b/plugins/imdiag/imdiag.c @@ -53,6 +53,7 @@ #include "srUtils.h" #include "msg.h" #include "datetime.h" +#include "ratelimit.h" #include "net.h" /* for permittedPeers, may be removed when this is removed */ MODULE_TYPE_INPUT @@ -200,7 +201,7 @@ finalize_it: /* actually submit a message to the rsyslog core */ static rsRetVal -doInjectMsg(int iNum) +doInjectMsg(int iNum, ratelimit_t *ratelimiter) { uchar szMsg[1024]; msg_t *pMsg; @@ -220,7 +221,7 @@ doInjectMsg(int iNum) pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; MsgSetRcvFrom(pMsg, pRcvDummy); CHKiRet(MsgSetRcvFromIP(pMsg, pRcvIPDummy)); - CHKiRet(submitMsg(pMsg)); + CHKiRet(ratelimitAddMsg(ratelimiter, NULL, pMsg)); finalize_it: RETiRet; @@ -238,6 +239,7 @@ injectMsg(uchar *pszCmd, tcps_sess_t *pSess) int iFrom; int nMsgs; int i; + ratelimit_t *ratelimit; DEFiRet; /* we do not check errors here! */ @@ -245,13 +247,15 @@ injectMsg(uchar *pszCmd, tcps_sess_t *pSess) iFrom = atoi((char*)wordBuf); getFirstWord(&pszCmd, wordBuf, sizeof(wordBuf)/sizeof(uchar), TO_LOWERCASE); nMsgs = atoi((char*)wordBuf); + ratelimitNew(&ratelimit, "imdiag", "injectmsg"); for(i = 0 ; i < nMsgs ; ++i) { - doInjectMsg(i + iFrom); + doInjectMsg(i + iFrom, ratelimit); } CHKiRet(sendResponse(pSess, "%d messages injected\n", nMsgs)); DBGPRINTF("imdiag: %d messages injected\n", nMsgs); + ratelimitDestruct(ratelimit); finalize_it: RETiRet; -- cgit v1.2.3 From 9602d83730803899965e0bad780bc4b5a09adae2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 18:46:08 +0200 Subject: fix: memory leak in linux-like ratelimiter --- runtime/ratelimit.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index ccfcfdd2..4b618fb5 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -203,8 +203,10 @@ ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) *ppRepMsg = NULL; if(ratelimit->interval) { - if(withinRatelimit(ratelimit, pMsg->ttGenTime) == 0) + if(withinRatelimit(ratelimit, pMsg->ttGenTime) == 0) { + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_DISCARDMSG); + } } if(ratelimit->bReduceRepeatMsgs) { CHKiRet(doLastMessageRepeatedNTimes(ratelimit, pMsg, ppRepMsg)); -- cgit v1.2.3 From 0dc56f1426315854c09e78b37104ed1e818a75bb Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Oct 2012 19:01:05 +0200 Subject: imtcp: support for Linux-Type ratelimiting added --- ChangeLog | 1 + doc/imtcp.html | 11 +++++++---- plugins/imtcp/imtcp.c | 13 ++++++++++++- tcpsrv.c | 16 ++++++++++++++++ tcpsrv.h | 6 +++++- 5 files changed, 41 insertions(+), 6 deletions(-) diff --git a/ChangeLog b/ChangeLog index aeefee6c..973495b4 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,6 @@ --------------------------------------------------------------------------- Version 7.3.1 [devel] 2012-10-?? +- imtcp: support for Linux-Type ratelimiting added - imptcp: support for Linux-Type ratelimiting added - imudp enhancements: * support for input batching added (performance improvement) diff --git a/doc/imtcp.html b/doc/imtcp.html index 01ea2802..4bda46ba 100644 --- a/doc/imtcp.html +++ b/doc/imtcp.html @@ -17,10 +17,6 @@ Encryption is natively provided by selecting the approprioate network stream driver and can also be provided by using stunnel (an alternative is the use the imgssapi module).

        -

        Multiple receivers may be configured by specifying -$InputTCPServerRun multiple times. This is available since version 4.3.1, earlier -versions do NOT support it. -

        Configuration Directives:

        Global Directives:

        @@ -100,6 +96,13 @@ activated. This is the default and should be left unchanged until you know very well what you do. It may be useful to turn it off, if you know this framing is not used and some senders emit multi-line messages into the message stream. +
      • RateLimit.Interval [number] - (available since 7.3.1) specifies the rate-limiting +interval in seconds. Default value is 0, which turns off rate limiting. Set it to a number +of seconds (5 recommended) to activate rate-limiting. +
      • +
      • RateLimit.Burst [number] - (available since 7.3.1) specifies the rate-limiting +burst in number of messages. Default is 10,000. +
      Caveats/Known Bugs:
        diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c index beb7d705..8d71d5f2 100644 --- a/plugins/imtcp/imtcp.c +++ b/plugins/imtcp/imtcp.c @@ -105,6 +105,8 @@ struct instanceConf_s { uchar *pszBindRuleset; /* name of ruleset to bind to */ ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ + int ratelimitInterval; + int ratelimitBurst; int bSuppOctetFram; struct instanceConf_s *next; }; @@ -155,7 +157,9 @@ static struct cnfparamdescr inppdescr[] = { { "port", eCmdHdlrString, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */ { "name", eCmdHdlrString, 0 }, { "ruleset", eCmdHdlrString, 0 }, - { "supportOctetCountedFraming", eCmdHdlrBinary, 0 } + { "supportOctetCountedFraming", eCmdHdlrBinary, 0 }, + { "ratelimit.interval", eCmdHdlrInt, 0 }, + { "ratelimit.burst", eCmdHdlrInt, 0 } }; static struct cnfparamblk inppblk = { CNFPARAMBLK_VERSION, @@ -251,6 +255,8 @@ createInstance(instanceConf_t **pinst) inst->pszBindRuleset = NULL; inst->pszInputName = NULL; inst->bSuppOctetFram = 1; + inst->ratelimitInterval = 0; + inst->ratelimitBurst = 10000; /* node created, let's add to config */ if(loadModConf->tail == NULL) { @@ -334,6 +340,7 @@ addListner(modConfData_t *modConf, instanceConf_t *inst) CHKiRet(tcpsrv.SetRuleset(pOurTcpsrv, inst->pBindRuleset)); CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, inst->pszInputName == NULL ? UCHAR_CONSTANT("imtcp") : inst->pszInputName)); + CHKiRet(tcpsrv.SetLinuxLikeRatelimiters(pOurTcpsrv, inst->ratelimitInterval, inst->ratelimitBurst)); tcpsrv.configureTCPListen(pOurTcpsrv, inst->pszBindPort, inst->bSuppOctetFram); finalize_it: @@ -376,6 +383,10 @@ CODESTARTnewInpInst inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "supportOctetCountedFraming")) { inst->bSuppOctetFram = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) { + inst->ratelimitBurst = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) { + inst->ratelimitInterval = (int) pvals[i].val.d.n; } else { dbgprintf("imtcp: program error, non-handled " "param '%s'\n", inppblk.descr[i].name); diff --git a/tcpsrv.c b/tcpsrv.c index 89ad7325..7ba557e0 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -153,6 +153,8 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort, int bSuppOctetFram) statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ CHKiRet(statsobj.SetName(pEntry->stats, statname)); CHKiRet(ratelimitNew(&pEntry->ratelimiter, "tcperver", NULL)); + ratelimitSetLinuxLike(pEntry->ratelimiter, pThis->ratelimitInterval, pThis->ratelimitBurst); + ratelimitSetThreadSafe(pEntry->ratelimiter); STATSCOUNTER_INIT(pEntry->ctrSubmit, pEntry->mutCtrSubmit); CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"), ctrType_IntCtr, &(pEntry->ctrSubmit))); @@ -916,6 +918,8 @@ BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macr pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; pThis->bDisableLFDelim = 0; pThis->OnMsgReceive = NULL; + pThis->ratelimitInterval = 0; + pThis->ratelimitBurst = 10000; pThis->bUseFlowControl = 1; ENDobjConstruct(tcpsrv) @@ -1123,6 +1127,17 @@ finalize_it: } +/* Set the linux-like ratelimiter settings */ +static rsRetVal +SetLinuxLikeRatelimiters(tcpsrv_t *pThis, int ratelimitInterval, int ratelimitBurst) +{ + DEFiRet; + pThis->ratelimitInterval = ratelimitInterval; + pThis->ratelimitBurst = ratelimitBurst; + RETiRet; +} + + /* Set the ruleset (ptr) to use */ static rsRetVal SetRuleset(tcpsrv_t *pThis, ruleset_t *pRuleset) @@ -1273,6 +1288,7 @@ CODESTARTobjQueryInterface(tcpsrv) pIf->SetCBOnErrClose = SetCBOnErrClose; pIf->SetOnMsgReceive = SetOnMsgReceive; pIf->SetRuleset = SetRuleset; + pIf->SetLinuxLikeRatelimiters = SetLinuxLikeRatelimiters; pIf->SetNotificationOnRemoteClose = SetNotificationOnRemoteClose; finalize_it: diff --git a/tcpsrv.h b/tcpsrv.h index 4884b34d..93e472c9 100644 --- a/tcpsrv.h +++ b/tcpsrv.h @@ -71,6 +71,8 @@ struct tcpsrv_s { int addtlFrameDelim; /**< additional frame delimiter for plain TCP syslog framing (e.g. to handle NetScreen) */ int bDisableLFDelim; /**< if 1, standard LF frame delimiter is disabled (*very dangerous*) */ + int ratelimitInterval; + int ratelimitBurst; tcps_sess_t **pSessions;/**< array of all of our sessions */ void *pUsr; /**< a user-settable pointer (provides extensibility for "derived classes")*/ /* callbacks */ @@ -143,8 +145,10 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */ rsRetVal (*SetUseFlowControl)(tcpsrv_t*, int); /* added v11 -- rgerhards, 2011-05-09 */ rsRetVal (*SetKeepAlive)(tcpsrv_t*, int); + /* added v13 -- rgerhards, 2012-10-15 */ + rsRetVal (*SetLinuxLikeRatelimiters)(tcpsrv_t *pThis, int interval, int burst); ENDinterface(tcpsrv) -#define tcpsrvCURR_IF_VERSION 12 /* increment whenever you change the interface structure! */ +#define tcpsrvCURR_IF_VERSION 13 /* increment whenever you change the interface structure! */ /* change for v4: * - SetAddtlFrameDelim() added -- rgerhards, 2008-12-10 * - SetInputName() added -- rgerhards, 2008-12-10 -- cgit v1.2.3 From 83d2e1945959b02f63943804ed67ed156d43aa62 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 18 Oct 2012 18:28:05 +0200 Subject: omruleset: update to new message submit APIs/ratelimiter ... with the notable exception, that this module actualy needs no rate-limiter by design! --- plugins/omruleset/omruleset.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/plugins/omruleset/omruleset.c b/plugins/omruleset/omruleset.c index 6c770c94..fd002265 100644 --- a/plugins/omruleset/omruleset.c +++ b/plugins/omruleset/omruleset.c @@ -120,7 +120,11 @@ CODESTARTdoAction (char*) pData->pszRulesetName, pData->pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); MsgSetRuleset(pMsg, pData->pRuleset); - submitMsg(pMsg); + /* Note: we intentionally use submitMsg2() here, as we process messages + * that were already run through the rate-limiter. So it is (at least) + * questionable if they were rate-limited again. + */ + submitMsg2(pMsg); finalize_it: ENDdoAction -- cgit v1.2.3 From de2fd07836accab563ecbf0405749b6bef9e76b4 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 19 Oct 2012 08:45:04 +0200 Subject: cosmetic: get rid of compiler warning on currently unused debug code --- runtime/queue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/queue.c b/runtime/queue.c index 0cd33701..fbf77108 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -127,7 +127,7 @@ static struct cnfparamblk pblk = }; /* debug aid */ -static void displayBatchState(batch_t *pBatch) +static inline void displayBatchState(batch_t *pBatch) { int i; for(i = 0 ; i < pBatch->nElem ; ++i) { -- cgit v1.2.3 From a7c5807346d3f8d4ae54f3dada580a026e4747ec Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 19 Oct 2012 10:03:30 +0200 Subject: imklog, impstats: support for new rate-limiting API added --- plugins/imklog/imklog.c | 8 +++----- plugins/impstats/impstats.c | 3 ++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c index 2897d76d..6eed33fe 100644 --- a/plugins/imklog/imklog.c +++ b/plugins/imklog/imklog.c @@ -101,11 +101,8 @@ static struct cnfparamblk modpblk = modpdescr }; - - static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this module */ -static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 */ - +static prop_t *pLocalHostIP = NULL; static inline void initConfigSettings(void) @@ -148,7 +145,8 @@ enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval * MsgSetTAG(pMsg, pszTag, ustrlen(pszTag)); pMsg->iFacility = iFacility; pMsg->iSeverity = iSeverity; - CHKiRet(submitMsg(pMsg)); + /* note: we do NOT use rate-limiting, as the kernel itself does rate-limiting */ + CHKiRet(submitMsg2(pMsg)); finalize_it: RETiRet; diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c index 62599969..9bd11556 100644 --- a/plugins/impstats/impstats.c +++ b/plugins/impstats/impstats.c @@ -138,7 +138,8 @@ doSubmitMsg(uchar *line) pMsg->iSeverity = runModConf->iSeverity; pMsg->msgFlags = 0; - submitMsg(pMsg); + /* we do not use rate-limiting, as the stats message always need to be emitted */ + submitMsg2(pMsg); finalize_it: RETiRet; -- cgit v1.2.3 From fb53a0420f84400628fea810306a9936933ec875 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 19 Oct 2012 10:04:59 +0200 Subject: cleanup --- tools/syslogd.c | 38 -------------------------------------- 1 file changed, 38 deletions(-) diff --git a/tools/syslogd.c b/tools/syslogd.c index 2e5d08cd..a8c5d887 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -489,44 +489,6 @@ finalize_it: RETiRet; } -/* check message against ACL set - * rgerhards, 2009-11-16 - */ -#if 0 -static inline rsRetVal -chkMsgAgainstACL() { - /* if we reach this point, we had a good receive and can process the packet received */ - /* check if we have a different sender than before, if so, we need to query some new values */ - if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) { - CHKiRet(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP)); - memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */ - /* Here we check if a host is permitted to send us - * syslog messages. If it isn't, we do not further - * process the message but log a warning (if we are - * configured to do this). - * rgerhards, 2005-09-26 - */ - *pbIsPermitted = net.isAllowedSender((uchar*)"UDP", - (struct sockaddr *)&frominet, (char*)fromHostFQDN); - - if(!*pbIsPermitted) { - DBGPRINTF("%s is not an allowed sender\n", (char*)fromHostFQDN); - if(glbl.GetOption_DisallowWarning) { - time_t tt; - - datetime.GetTime(&tt); - if(tt > ttLastDiscard + 60) { - ttLastDiscard = tt; - errmsg.LogError(0, NO_ERRCODE, - "UDP message from disallowed sender %s discarded", - (char*)fromHost); - } - } - } - } -} -#endif - /* preprocess a batch of messages, that is ready them for actual processing. This is done * as a first stage and totally in parallel to any other worker active in the system. So -- cgit v1.2.3 From 9516e8f99f0385eb7948df9b6af87b51d3462ccb Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 19 Oct 2012 11:01:37 +0200 Subject: ratelimit: add default ratelimiter --- tools/syslogd.c | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/tools/syslogd.c b/tools/syslogd.c index a8c5d887..cb6a47cd 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -221,6 +221,7 @@ struct queuefilenames_s { } *queuefilenames = NULL; +static ratelimit_t *dflt_ratelimiter = NULL; /* ratelimiter for submits without explicit one */ int MarkInterval = 20 * 60; /* interval between marks in seconds - read-only after startup */ int send_to_all = 0; /* send message to all IPv4/IPv6 addresses */ static int NoFork = 0; /* don't fork - don't run in daemon mode - read-only after startup */ @@ -432,6 +433,12 @@ submitErrMsg(int iErr, uchar *msg) } +static inline rsRetVal +submitMsgWithDfltRatelimiter(msg_t *pMsg) +{ + return ratelimitAddMsg(dflt_ratelimiter, NULL, pMsg); +} + /* rgerhards 2004-11-09: the following is a function that can be used * to log a message orginating from the syslogd itself. */ @@ -483,7 +490,7 @@ logmsgInternal(int iErr, int pri, uchar *msg, int flags) /* we have the queue, so we can simply provide the * message to the queue engine. */ - submitMsg2(pMsg); + submitMsgWithDfltRatelimiter(pMsg); } finalize_it: RETiRet; @@ -594,7 +601,7 @@ submitMsg2(msg_t *pMsg) /* if a plugin logs a message during shutdown, the queue may no longer exist */ if(pQueue == NULL) { - DBGPRINTF("submitMsg() could not submit message - " + DBGPRINTF("submitMsg2() could not submit message - " "queue does (no longer?) exist - ignored\n"); FINALIZE; } @@ -609,7 +616,7 @@ finalize_it: rsRetVal submitMsg(msg_t *pMsg) { - return submitMsg2(pMsg); + return submitMsgWithDfltRatelimiter(pMsg); } @@ -1246,7 +1253,7 @@ static inline void processImInternal(void) msg_t *pMsg; while(iminternalRemoveMsg(&pMsg) == RS_RET_OK) { - submitMsg(pMsg); + submitMsgWithDfltRatelimiter(pMsg); } } @@ -2023,6 +2030,9 @@ int realMain(int argc, char **argv) } CHKiRet(localRet); + CHKiRet(ratelimitNew(&dflt_ratelimiter, "rsyslogd", NULL)); + /* TODO: add linux-type limiting capability */ + if(bChDirRoot) { if(chdir("/") != 0) fprintf(stderr, "Can not do 'cd /' - still trying to run\n"); -- cgit v1.2.3 From 3d41da36b8e7e3980579d67600cf42128e5c3aee Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 23 Oct 2012 11:22:56 +0200 Subject: remove "last message repeated n times" from rsyslog output part --- action.c | 142 ++++++-------------------------------------------- action.h | 11 +--- dirty.h | 11 ---- runtime/conf.c | 9 +--- tests/diag.sh | 6 +-- tests/runtime-dummy.c | 1 - tools/syslogd.c | 78 ++------------------------- 7 files changed, 26 insertions(+), 232 deletions(-) diff --git a/action.c b/action.c index cf010d01..18bd68bc 100644 --- a/action.c +++ b/action.c @@ -12,11 +12,11 @@ * necessary to triple-check that everything works well in *all* modes. * The different modes (and calling sequence) are: * - * if set iExecEveryNthOccur > 1 || f_ReduceRepeated || iSecsExecOnceInterval + * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval * - doSubmitToActionQComplexBatch * - helperSubmitToActionQComplexBatch * - doActionCallAction - * handles duplicate message processing, but in essence calls + * handles mark message reduction, but in essence calls * - actionWriteToAction * - qqueueEnqObj * (now queue engine processing) @@ -307,9 +307,6 @@ rsRetVal actionDestruct(action_t *pThis) if(pThis->pMod != NULL) pThis->pMod->freeInstance(pThis->pModData); - if(pThis->f_pMsg != NULL) - msgDestruct(&pThis->f_pMsg); - pthread_mutex_destroy(&pThis->mutAction); pthread_mutex_destroy(&pThis->mutActExec); d_free(pThis->pszName); @@ -410,16 +407,11 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08 */ if( pThis->iExecEveryNthOccur > 1 - || pThis->f_ReduceRepeated || pThis->iSecsExecOnceInterval ) { DBGPRINTF("info: firehose mode disabled for action because " - "iExecEveryNthOccur=%d, " - "ReduceRepeated=%d, " - "iSecsExecOnceInterval=%d\n", - pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated, - pThis->iSecsExecOnceInterval - ); + "iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n", + pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval); pThis->submitToActQ = doSubmitToActionQComplexBatch; } else if(pThis->bWriteAllMarkMsgs == RSFALSE) { /* nearly full-speed submission mode, default case */ @@ -782,7 +774,6 @@ rsRetVal actionDbgPrint(action_t *pThis) pThis->pMod->dbgPrintInstInfo(pThis->pModData); dbgprintf("\n"); dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); - dbgprintf("\tRepeatedMsgReduction: %d\n", pThis->f_ReduceRepeated); dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); if(pThis->eState == ACT_STATE_SUSP) { dbgprintf("\tresume next retry: %u, number retries: %d", @@ -1418,14 +1409,10 @@ finalize_it: * be filtered out before calling us (what is done currently!). */ rsRetVal -actionWriteToAction(action_t *pAction) +actionWriteToAction(action_t *pAction, msg_t *pMsg) { - msg_t *pMsgSave; /* to save current message pointer, necessary to restore - it in case it needs to be updated (e.g. repeated msgs) */ DEFiRet; - pMsgSave = NULL; /* indicate message poiner not saved */ - /* first, we check if the action should actually be called. The action-specific * $ActionExecOnlyEveryNthTime permits us to execute an action only every Nth * time. So we need to check if we need to drop the (otherwise perfectly executable) @@ -1452,43 +1439,6 @@ actionWriteToAction(action_t *pAction) } } - /* then check if this is a regular message or the repeation of - * a previous message. If so, we need to change the message text - * to "last message repeated n times" and then go ahead and write - * it. Please note that we can not modify the message object, because - * that would update it in other selectors as well. As such, we first - * need to create a local copy of the message, which we than can update. - * rgerhards, 2007-07-10 - */ - if(pAction->f_prevcount > 1) { - msg_t *pMsg; - size_t lenRepMsg; - uchar szRepMsg[1024]; - - if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) { - /* it failed - nothing we can do against it... */ - DBGPRINTF("Message duplication failed, dropping repeat message.\n"); - ABORT_FINALIZE(RS_RET_ERR); - } - - if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */ - lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " last message repeated %d times", - pAction->f_prevcount); - } else { - lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " message repeated %d times: [%.800s]", - pAction->f_prevcount, getMSG(pAction->f_pMsg)); - } - - /* We now need to update the other message properties. Please note that digital - * signatures inside the message are also invalidated. - */ - datetime.getCurrTime(&(pMsg->tRcvdAt), &(pMsg->ttGenTime)); - memcpy(&pMsg->tTIMESTAMP, &pMsg->tRcvdAt, sizeof(struct syslogTime)); - MsgReplaceMSG(pMsg, szRepMsg, lenRepMsg); - pMsgSave = pAction->f_pMsg; /* save message pointer for later restoration */ - pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */ - } - DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); /* now check if we need to drop the message because otherwise the action would be too @@ -1509,31 +1459,14 @@ actionWriteToAction(action_t *pAction) /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) * rgerhards, 2008-09-17 */ pAction->tLastExec = getActNow(pAction); /* re-init time flags */ - pAction->f_time = pAction->f_pMsg->ttGenTime; + pAction->f_time = pMsg->ttGenTime; /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = doSubmitToActionQ(pAction, pAction->f_pMsg); - - if(iRet == RS_RET_OK) - pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ + iRet = doSubmitToActionQ(pAction, pMsg); finalize_it: - if(pMsgSave != NULL) { - /* we had saved the original message pointer. That was - * done because we needed to create a temporary one - * (most often for "message repeated n time" handling). If so, - * we need to restore the original one now, so that procesing - * can continue as normal. We also need to discard the temporary - * one, as we do not like memory leaks ;) Please note that the original - * message object will be discarded by our callers, so this is nothing - * of our business. rgerhards, 2007-07-10 - */ - msgDestruct(&pAction->f_pMsg); - pAction->f_pMsg = pMsgSave; /* restore it */ - } - RETiRet; } @@ -1556,43 +1489,8 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) ABORT_FINALIZE(RS_RET_OK); } - /* suppress duplicate messages */ - if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL && - (pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) && - !ustrcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) && - !strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) && - !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(pAction->f_pMsg, LOCK_MUTEX)) && - !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(pAction->f_pMsg, LOCK_MUTEX))) { - pAction->f_prevcount++; - DBGPRINTF("msg repeated %d times, %ld sec of %d.\n", - pAction->f_prevcount, (long) getActNow(pAction) - pAction->f_time, - repeatinterval[pAction->f_repeatcount]); - /* use current message, so we have the new timestamp (means we need to discard previous one) */ - msgDestruct(&pAction->f_pMsg); - pAction->f_pMsg = MsgAddRef(pMsg); - /* If domark would have logged this by now, flush it now (so we don't hold - * isolated messages), but back off so we'll flush less often in the future. - */ - if(getActNow(pAction) > REPEATTIME(pAction)) { - iRet = actionWriteToAction(pAction); - BACKOFF(pAction); - } - } else {/* new message, save it */ - /* first check if we have a previous message stored - * if so, emit and then discard it first - */ - if(pAction->f_pMsg != NULL) { - if(pAction->f_prevcount > 0) - actionWriteToAction(pAction); - /* we do not care about iRet above - I think it's right but if we have - * some troubles, you know where to look at ;) -- rgerhards, 2007-08-01 - */ - msgDestruct(&pAction->f_pMsg); - } - pAction->f_pMsg = MsgAddRef(pMsg); - /* call the output driver */ - iRet = actionWriteToAction(pAction); - } + /* call the output driver */ + iRet = actionWriteToAction(pAction, pMsg); finalize_it: /* we need to update the batch to handle failover processing correctly */ @@ -1890,13 +1788,16 @@ actionRequiresDateCall(action_t *pAction) int i; int r = 0; + if(pAction->eParamPassing == ACT_MSG_PASSING) + /* in msg passing mode, we have NO templates! */ + goto done; for(i = 0 ; i < pAction->iNumTpls ; ++i) { if(tplRequiresDateCall(pAction->ppTpl[i])) { r = 1; break; } } - return r; +done: return r; } @@ -1995,13 +1896,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, pAction->pMod = pMod; pAction->pModData = pModData; - /* now check if the module is compatible with select features */ - if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) { - pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs; - } else { - DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n"); - pAction->f_ReduceRepeated = 0; - } + /* check if the module is compatible with select features (currently no such features exist) */ pAction->eState = ACT_STATE_RDY; /* action is enabled */ pAction->requiresDateCall = actionRequiresDateCall(pAction); @@ -2102,13 +1997,8 @@ actionNewInst(struct nvlst *lst, action_t **ppAction) if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, queueParams, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { - /* now check if the module is compatible with select features */ - if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) - pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs; - else { - DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n"); - pAction->f_ReduceRepeated = 0; - } + /* check if the module is compatible with select features + * (currently no such features exist) */ pAction->eState = ACT_STATE_RDY; /* action is enabled */ loadConf->actions.nbrActions++; /* one more active action! */ } diff --git a/action.h b/action.h index 177fd682..fd84e2f1 100644 --- a/action.h +++ b/action.h @@ -48,7 +48,7 @@ typedef enum { */ typedef struct action_s action_t; struct action_s { - time_t f_time; /* used for "message repeated n times" - be careful, old, old code */ + time_t f_time; /* used for "max. n messages in m seconds" processing */ time_t tActNow; /* the current time for an action execution. Initially set to -1 and populated on an as-needed basis. This is a performance optimization. */ time_t tLastExec; /* time this action was last executed */ @@ -69,10 +69,7 @@ struct action_s { struct modInfo_s *pMod;/* pointer to output module handling this selector */ void *pModData; /* pointer to module data - content is module-specific */ sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */ - short f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */ sbool requiresDateCall;/* do we need to do a date call before creating templates? */ - int f_prevcount; /* repetition cnt of prevline */ - int f_repeatcount; /* number of "repeated" msgs */ rsRetVal (*submitToActQ)(action_t *, batch_t *);/* function submit message to action queue */ rsRetVal (*qConstruct)(struct queue_s *pThis); enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING = 2, @@ -81,10 +78,6 @@ struct action_s { int iNumTpls; /* number of array entries for template element below */ struct template **ppTpl;/* array of template to use - strings must be passed to doAction * in this order. */ - msg_t *f_pMsg; /* pointer to the message (this will replace the other vars with msg - * content later). This is preserved after the message has been - * processed - it is also used to detect duplicates. - */ qqueue_t *pQueue; /* action queue */ pthread_mutex_t mutAction; /* primary action mutex */ pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */ @@ -105,7 +98,7 @@ rsRetVal actionDestruct(action_t *pThis); rsRetVal actionDbgPrint(action_t *pThis); rsRetVal actionSetGlobalResumeInterval(int iNewVal); rsRetVal actionDoAction(action_t *pAction); -rsRetVal actionWriteToAction(action_t *pAction); +rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg); rsRetVal actionCallHUPHdlr(action_t *pAction); rsRetVal actionClassInit(void); rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, struct cnfparamvals *queueParams, int bSuspended); diff --git a/dirty.h b/dirty.h index 30b30bec..743632a2 100644 --- a/dirty.h +++ b/dirty.h @@ -37,19 +37,8 @@ rsRetVal __attribute__((deprecated)) parseAndSubmitMessage(uchar *hname, uchar * rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */ rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName); -/* Intervals at which we flush out "message repeated" messages, - * in seconds after previous message is logged. After each flush, - * we move to the next interval until we reach the largest. - * TODO: move this to action object! Only action.c and syslogd.c use it. - */ extern int MarkInterval; -extern int repeatinterval[2]; extern qqueue_t *pMsgQueue; /* the main message queue */ extern int iConfigVerify; /* is this just a config verify run? */ extern int bHaveMainQueue; -#define MAXREPEAT ((int)((sizeof(repeatinterval) / sizeof(repeatinterval[0])) - 1)) -#define REPEATTIME(f) ((f)->f_time + repeatinterval[(f)->f_repeatcount]) -#define BACKOFF(f) { if (++(f)->f_repeatcount > MAXREPEAT) \ - (f)->f_repeatcount = MAXREPEAT; \ - } #endif /* #ifndef DIRTY_H_INCLUDED */ diff --git a/runtime/conf.c b/runtime/conf.c index 23fb6bbd..c97391c6 100644 --- a/runtime/conf.c +++ b/runtime/conf.c @@ -607,13 +607,8 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction) if(iRet == RS_RET_OK || iRet == RS_RET_SUSPENDED) { if((iRet = addAction(&pAction, pMod, pModData, pOMSR, NULL, NULL, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { - /* now check if the module is compatible with select features */ - if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) - pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs; - else { - dbgprintf("module is incompatible with RepeatedMsgReduction - turned off\n"); - pAction->f_ReduceRepeated = 0; - } + /* here check if the module is compatible with select features + * (currently, we have no such features!) */ pAction->eState = ACT_STATE_RDY; /* action is enabled */ conf->actions.nbrActions++; /* one more active action! */ } diff --git a/tests/diag.sh b/tests/diag.sh index b278d2c5..0cbf7abb 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -5,13 +5,13 @@ # not always able to convey back states to the upper-level test driver # begun 2009-05-27 by rgerhards # This file is part of the rsyslog project, released under GPLv3 -#valgrind="valgrind --malloc-fill=ff --free-fill=fe --log-fd=1" +valgrind="valgrind --malloc-fill=ff --free-fill=fe --log-fd=1" #valgrind="valgrind --tool=drd --log-fd=1" #valgrind="valgrind --tool=helgrind --log-fd=1" #valgrind="valgrind --tool=exp-ptrcheck --log-fd=1" #set -o xtrace -#export RSYSLOG_DEBUG="debug nologfuncflow noprintmutexaction nostdout" -#export RSYSLOG_DEBUGLOG="log" +export RSYSLOG_DEBUG="debug nologfuncflow noprintmutexaction nostdout" +export RSYSLOG_DEBUGLOG="log" case $1 in 'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason cp $srcdir/testsuites/diag-common.conf diag-common.conf diff --git a/tests/runtime-dummy.c b/tests/runtime-dummy.c index 5a9039bf..f6f2d07f 100644 --- a/tests/runtime-dummy.c +++ b/tests/runtime-dummy.c @@ -30,7 +30,6 @@ #include "rsyslog.h" int bReduceRepeatMsgs = 0; -int repeatinterval = 30; int bActExecWhenPrevSusp = 0; int iActExecOnceInterval = 1; int MarkInterval = 30; diff --git a/tools/syslogd.c b/tools/syslogd.c index cb6a47cd..3e85108f 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -44,7 +44,6 @@ #include "rsyslog.h" #define DEFUPRI (LOG_USER|LOG_NOTICE) -#define TIMERINTVL 30 /* interval for checking flush, mark */ #include #include @@ -204,13 +203,6 @@ static int bFinished = 0; /* used by termination signal handler, read-only excep */ int iConfigVerify = 0; /* is this just a config verify run? */ -/* Intervals at which we flush out "message repeated" messages, - * in seconds after previous message is logged. After each flush, - * we move to the next interval until we reach the largest. - * TODO: this shall go into action object! -- rgerhards, 2008-01-29 - */ -int repeatinterval[2] = { 30, 60 }; /* # of secs before flush */ - #define LIST_DELIMITER ':' /* delimiter between two hosts */ static pid_t ppid; /* This is a quick and dirty hack used for spliting main/startup thread */ @@ -691,43 +683,6 @@ reapchild() } -/* helper to doFlushRptdMsgs() to flush the individual action links via llExecFunc - * rgerhards, 2007-08-02 - */ -DEFFUNC_llExecFunc(flushRptdMsgsActions) -{ - action_t *pAction = (action_t*) pData; - assert(pAction != NULL); - - BEGINfunc - d_pthread_mutex_lock(&pAction->mutAction); - /* TODO: time() performance: the call below could be moved to - * the beginn of the llExec(). This makes it slightly less correct, but - * in an acceptable way. -- rgerhards, 2008-09-16 - */ - if (pAction->f_prevcount && datetime.GetTime(NULL) >= REPEATTIME(pAction)) { - DBGPRINTF("flush %s: repeated %d times, %d sec.\n", - module.GetStateName(pAction->pMod), pAction->f_prevcount, - repeatinterval[pAction->f_repeatcount]); - actionWriteToAction(pAction); - BACKOFF(pAction); - } - d_pthread_mutex_unlock(&pAction->mutAction); - - ENDfunc - return RS_RET_OK; /* we ignore errors, we can not do anything either way */ -} - - -/* This method flushes repeat messages. - */ -static void -doFlushRptdMsgs(void) -{ - ruleset.IterateAllActions(runConf, flushRptdMsgsActions, NULL); -} - - static void debug_switch() { time_t tTime; @@ -1320,49 +1275,22 @@ mainloop(void) while(!bFinished){ /* this is now just a wait - please note that we do use a near-"eternal" - * timeout of 1 day if we do not have repeated message reduction turned on - * (which it is not by default). This enables us to help safe the environment + * timeout of 1 day. This enables us to help safe the environment * by not unnecessarily awaking rsyslog on a regular tick (just think * powertop, for example). In that case, we primarily wait for a signal, * but a once-a-day wakeup should be quite acceptable. -- rgerhards, 2008-06-09 */ - tvSelectTimeout.tv_sec = (runConf->globals.bReduceRepeatMsgs == 1) ? TIMERINTVL : 86400 /*1 day*/; - //tvSelectTimeout.tv_sec = TIMERINTVL; /* TODO: change this back to the above code when we have a better solution for apc */ + tvSelectTimeout.tv_sec = 86400 /*1 day*/; tvSelectTimeout.tv_usec = 0; select(1, NULL, NULL, NULL, &tvSelectTimeout); if(bFinished) - break; /* exit as quickly as possible - see long comment below */ - - /* If we received a HUP signal, we call doFlushRptdMsgs() a bit early. This - * doesn't matter, because doFlushRptdMsgs() checks timestamps. What may happen, - * however, is that the too-early call may lead to a bit too-late output - * of "last message repeated n times" messages. But that is quite acceptable. - * rgerhards, 2007-12-21 - * ... and just to explain, we flush here because that is exactly what the mainloop - * shall do - provide a periodic interval in which not-yet-flushed messages will - * be flushed. Be careful, there is a potential race condition: doFlushRptdMsgs() - * needs to aquire a lock on the action objects. If, however, long-running consumers - * cause the main queue worker threads to lock them for a long time, we may receive - * a starvation condition, resulting in the mainloop being held on lock for an extended - * period of time. That, in turn, could lead to unresponsiveness to termination - * requests. It is especially important that the bFinished flag is checked before - * doFlushRptdMsgs() is called (I know because I ran into that situation). I am - * not yet sure if the remaining probability window of a termination-related - * problem is large enough to justify changing the code - I would consider it - * extremely unlikely that the problem ever occurs in practice. Fixing it would - * require not only a lot of effort but would cost considerable performance. So - * for the time being, I think the remaining risk can be accepted. - * rgerhards, 2008-01-10 - */ - if(runConf->globals.bReduceRepeatMsgs == 1) - doFlushRptdMsgs(); + break; /* exit as quickly as possible */ if(bHadHUP) { doHUP(); bHadHUP = 0; continue; } - // TODO: remove execScheduled(); /* handle Apc calls (if any) */ } ENDfunc } -- cgit v1.2.3 From 4c7578de4a7febfa70c3f7328b7d102a04cd5938 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 23 Oct 2012 14:16:30 +0200 Subject: nit: valgrind-testaid was accidentely enabled in testbench --- tests/diag.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/diag.sh b/tests/diag.sh index 0cbf7abb..ed0709da 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -5,7 +5,7 @@ # not always able to convey back states to the upper-level test driver # begun 2009-05-27 by rgerhards # This file is part of the rsyslog project, released under GPLv3 -valgrind="valgrind --malloc-fill=ff --free-fill=fe --log-fd=1" +#valgrind="valgrind --malloc-fill=ff --free-fill=fe --log-fd=1" #valgrind="valgrind --tool=drd --log-fd=1" #valgrind="valgrind --tool=helgrind --log-fd=1" #valgrind="valgrind --tool=exp-ptrcheck --log-fd=1" -- cgit v1.2.3 From 7fb43edd3b7914b8c217d11046dd7ebb7798de8c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 23 Oct 2012 15:40:23 +0200 Subject: imuxsock: use a default ratelimiter if none else applies --- plugins/imuxsock/imuxsock.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 1409c24a..1b7dface 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -135,6 +135,7 @@ typedef struct lstn_s { int flowCtl; /* flow control settings for this socket */ int ratelimitInterval; int ratelimitBurst; + ratelimit_t *dflt_ratelimiter;/*ratelimiter to apply if none else is to be used */ intTiny ratelimitSev; /* severity level (and below) for which rate-limiting shall apply */ struct hashtable *ht; /* our hashtable for rate-limiting */ sbool bParseHost; /* should parser parse host name? read-only after startup */ @@ -392,6 +393,10 @@ addListner(instanceConf_t *inst) listeners[nfd].bParseTrusted = inst->bParseTrusted; listeners[nfd].bWritePid = inst->bWritePid; listeners[nfd].bUseSysTimeStamp = inst->bUseSysTimeStamp; + CHKiRet(ratelimitNew(&listeners[nfd].dflt_ratelimiter, "imuxsock", NULL)); + ratelimitSetLinuxLike(listeners[nfd].dflt_ratelimiter, + listeners[nfd].ratelimitInterval, + listeners[nfd].ratelimitBurst); nfd++; } else { errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n", @@ -403,7 +408,7 @@ finalize_it: } -/* discard all log sockets except for "socket" 0. Data for it comes from +/* discard/Destruct all log sockets except for "socket" 0. Data for it comes from * the constant memory pool - and if not, it is freeed via some other pointer. */ static rsRetVal discardLogSockets(void) @@ -421,6 +426,7 @@ static rsRetVal discardLogSockets(void) if(listeners[i].ht != NULL) { hashtable_destroy(listeners[i].ht, 1); /* 1 => free all values automatically */ } + ratelimitDestruct(listeners[i].dflt_ratelimiter); } return RS_RET_OK; @@ -570,6 +576,8 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, ratelimit_t **prl) *prl = rl; finalize_it: + if(*prl == NULL) + *prl = pLstn->dflt_ratelimiter; RETiRet; } -- cgit v1.2.3 From 99d6dc01d22b1283e992cec481e4f2a03ee74340 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 23 Oct 2012 17:12:09 +0200 Subject: rate-limit rsyslog internal messages a special ratelimiter is used; it's linux-type ratelimit settings are currently hardcoded --- tools/syslogd.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tools/syslogd.c b/tools/syslogd.c index 2195e51c..d66bfb81 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -214,6 +214,7 @@ struct queuefilenames_s { static ratelimit_t *dflt_ratelimiter = NULL; /* ratelimiter for submits without explicit one */ +static ratelimit_t *internalMsg_ratelimiter = NULL; /* ratelimiter for rsyslog-own messages */ int MarkInterval = 20 * 60; /* interval between marks in seconds - read-only after startup */ int send_to_all = 0; /* send message to all IPv4/IPv6 addresses */ static int NoFork = 0; /* don't fork - don't run in daemon mode - read-only after startup */ @@ -482,7 +483,8 @@ logmsgInternal(int iErr, int pri, uchar *msg, int flags) /* we have the queue, so we can simply provide the * message to the queue engine. */ - submitMsgWithDfltRatelimiter(pMsg); + ratelimitAddMsg(internalMsg_ratelimiter, NULL, pMsg); + //submitMsgWithDfltRatelimiter(pMsg); } finalize_it: RETiRet; @@ -1958,8 +1960,11 @@ int realMain(int argc, char **argv) } CHKiRet(localRet); - CHKiRet(ratelimitNew(&dflt_ratelimiter, "rsyslogd", NULL)); + CHKiRet(ratelimitNew(&dflt_ratelimiter, "rsyslogd", "dflt")); /* TODO: add linux-type limiting capability */ + CHKiRet(ratelimitNew(&internalMsg_ratelimiter, "rsyslogd", "internal_messages")); + ratelimitSetLinuxLike(internalMsg_ratelimiter, 5, 500); + /* TODO: make internalMsg ratelimit settings configurable */ if(bChDirRoot) { if(chdir("/") != 0) -- cgit v1.2.3 From c18c7a0dc6b38269ee97fa66ce6327693194cb5d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 24 Oct 2012 17:58:09 +0200 Subject: nit: correct invalid error code --- runtime/glbl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/glbl.c b/runtime/glbl.c index a0997829..0e5cac20 100644 --- a/runtime/glbl.c +++ b/runtime/glbl.c @@ -210,7 +210,7 @@ setLocalHostIPIF(void __attribute__((unused)) *pVal, uchar *pNewVal) if(propLocalIPIF != NULL) { errmsg.LogError(0, RS_RET_ERR, "$LocalHostIPIF is already set " "and cannot be reset; place it at TOP OF rsyslog.conf!"); - ABORT_FINALIZE(RS_RET_ERR_WRKDIR); + ABORT_FINALIZE(RS_RET_ERR); } localRet = net.GetIFIPAddr(pNewVal, AF_UNSPEC, myIP, (int) sizeof(myIP)); -- cgit v1.2.3 From 9f74e3521fdfe0d2c68038b07ff64508a3d7d145 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 29 Oct 2012 12:08:43 +0100 Subject: remove legacy API parseAndSubmitMessage() from imrelp --- plugins/imrelp/imrelp.c | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c index fe987a50..31f82b14 100644 --- a/plugins/imrelp/imrelp.c +++ b/plugins/imrelp/imrelp.c @@ -113,11 +113,29 @@ static struct cnfparamblk inppblk = * we will only see the hostname (twice). -- rgerhards, 2009-10-14 */ static relpRetVal -onSyslogRcv(uchar *pHostname, uchar *pIP, uchar *pMsg, size_t lenMsg) +onSyslogRcv(uchar *pHostname, uchar *pIP, uchar *msg, size_t lenMsg) { + prop_t *pProp = NULL; + msg_t *pMsg; DEFiRet; - parseAndSubmitMessage(pHostname, pIP, pMsg, lenMsg, PARSE_HOSTNAME, - eFLOWCTL_LIGHT_DELAY, pInputName, NULL, 0, runModConf->pBindRuleset); + + CHKiRet(msgConstruct(&pMsg)); + MsgSetInputName(pMsg, pInputName); + MsgSetRawMsg(pMsg, (char*)msg, lenMsg); + MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); + MsgSetRuleset(pMsg, runModConf->pBindRuleset); + pMsg->msgFlags = PARSE_HOSTNAME | NEEDS_PARSING; + + /* TODO: optimize this, we can store it inside the session, requires + * changes to librelp --> next librelp iteration?. rgerhards, 2012-10-29 + */ + MsgSetRcvFromStr(pMsg, pHostname, ustrlen(pHostname), &pProp); + CHKiRet(prop.Destruct(&pProp)); + CHKiRet(MsgSetRcvFromIPStr(pMsg, pIP, ustrlen(pIP), &pProp)); + CHKiRet(prop.Destruct(&pProp)); + CHKiRet(submitMsg2(pMsg)); + +finalize_it: RETiRet; } -- cgit v1.2.3