diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imfile/imfile.c | 22 | ||||
-rw-r--r-- | plugins/imkmsg/imkmsg.c | 2 | ||||
-rw-r--r-- | plugins/imptcp/imptcp.c | 25 | ||||
-rw-r--r-- | plugins/imsolaris/imsolaris.c | 2 | ||||
-rw-r--r-- | plugins/imtcp/imtcp.c | 7 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 18 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 17 |
7 files changed, 51 insertions, 42 deletions
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index 453b6b05..99210b76 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -48,6 +48,7 @@ #include "prop.h" #include "stringbuf.h" #include "ruleset.h" +#include "ratelimit.h" MODULE_TYPE_INPUT /* must be present for input modules, do not remove */ MODULE_TYPE_NOKEEP @@ -82,6 +83,7 @@ typedef struct fileInfo_s { strm_t *pStrm; /* its stream (NULL if not assigned) */ int readMode; /* which mode to use in ReadMulteLine call? */ ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + ratelimit_t *ratelimiter; multi_submit_t multiSub; } fileInfo_t; @@ -189,9 +191,7 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine) pMsg->iFacility = LOG_FAC(pInfo->iFacility); pMsg->iSeverity = LOG_PRI(pInfo->iSeverity); MsgSetRuleset(pMsg, pInfo->pRuleset); - pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg; - if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem) - CHKiRet(multiSubmitMsg(&pInfo->multiSub)); + ratelimitAddMsg(pInfo->ratelimiter, &pInfo->multiSub, pMsg); finalize_it: RETiRet; } @@ -304,18 +304,7 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) } finalize_it: - if(pThis->multiSub.nElem > 0) { - /* submit everything that was not yet submitted */ - CHKiRet(multiSubmitMsg(&pThis->multiSub)); - } - ; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */ - /* Note: the problem above is that pthread:cleanup_pop() is a macro which - * evaluates to something like "} while(0);". So the code would become - * "finalize_it: }", that is a label without a statement. The C standard does - * not permit this. So we add an empty statement "finalize_it: ; }" and - * everybody is happy. Note that without the ;, an error is reported only - * on some platforms/compiler versions. -- rgerhards, 2008-08-15 - */ + multiSubmitFlush(&pThis->multiSub); pthread_cleanup_pop(0); if(pCStr != NULL) { @@ -423,6 +412,7 @@ addListner(instanceConf_t *inst) pThis->lenTag = ustrlen(pThis->pszTag); pThis->pszStateFile = (uchar*) strdup((char*) inst->pszStateFile); + CHKiRet(ratelimitNew(&pThis->ratelimiter)); CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(inst->nMultiSub * sizeof(msg_t*))); pThis->multiSub.maxElem = inst->nMultiSub; pThis->multiSub.nElem = 0; @@ -773,6 +763,8 @@ CODESTARTafterRun persistStrmState(&files[i]); strm.Destruct(&(files[i].pStrm)); } + ratelimitDestruct(files[i].ratelimiter); + free(files[i].multiSub.ppMsgs); free(files[i].pszFileName); free(files[i].pszTag); free(files[i].pszStateFile); diff --git a/plugins/imkmsg/imkmsg.c b/plugins/imkmsg/imkmsg.c index 2a97f82d..d1a83879 100644 --- a/plugins/imkmsg/imkmsg.c +++ b/plugins/imkmsg/imkmsg.c @@ -113,7 +113,7 @@ enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval * pMsg->iFacility = iFacility; pMsg->iSeverity = iSeverity; pMsg->json = json; - CHKiRet(submitMsg(pMsg)); + CHKiRet(submitMsg(pMsg, NULL)); finalize_it: RETiRet; diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 8150fc33..8495b293 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -67,6 +67,7 @@ #include "ruleset.h" #include "msg.h" #include "statsobj.h" +#include "ratelimit.h" #include "net.h" /* for permittedPeers, may be removed when this is removed */ /* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */ @@ -195,6 +196,7 @@ struct ptcpsrv_s { sbool bKeepAlive; /* support keep-alive packets */ sbool bEmitMsgOnClose; sbool bSuppOctetFram; + ratelimit_t *ratelimiter; }; /* the ptcp session object. Describes a single active session. @@ -295,6 +297,7 @@ destructSess(ptcpsess_t *pSess) static void destructSrv(ptcpsrv_t *pSrv) { + ratelimitDestruct(pSrv->ratelimiter); prop.Destruct(&pSrv->pInputName); pthread_mutex_destroy(&pSrv->mutSessLst); free(pSrv->pszInputName); @@ -679,14 +682,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult MsgSetRuleset(pMsg, pSrv->pRuleset); STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit); - if(pMultiSub == NULL) { - CHKiRet(submitMsg(pMsg)); - } else { - pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; - if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg(pMultiSub)); - } - + ratelimitAddMsg(pSrv->ratelimiter, pMultiSub, pMsg); finalize_it: /* reset status variables */ @@ -805,12 +801,11 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG * we have just received a bunch of data! -- rgerhards, 2009-06-16 * EXTRACT from tcps_sess.c */ -#define NUM_MULTISUB 1024 static rsRetVal DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) { multi_submit_t multiSub; - msg_t *pMsgs[NUM_MULTISUB]; + msg_t *pMsgs[CONF_NUM_MULTISUB]; struct syslogTime stTime; time_t ttGenTime; char *pEnd; @@ -821,7 +816,7 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) datetime.getCurrTime(&stTime, &ttGenTime); multiSub.ppMsgs = pMsgs; - multiSub.maxElem = NUM_MULTISUB; + multiSub.maxElem = CONF_NUM_MULTISUB; multiSub.nElem = 0; /* We now copy the message to the session buffer. */ @@ -831,15 +826,11 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub)); } - if(multiSub.nElem > 0) { - /* submit anything that was not yet submitted */ - CHKiRet(multiSubmitMsg(&multiSub)); - } + iRet = multiSubmitFlush(&multiSub); finalize_it: RETiRet; } -#undef NUM_MULTISUB /****************************************** --END-- TCP SUPPORT FUNCTIONS ***********************************/ @@ -1130,6 +1121,8 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) pSrv->iKeepAliveProbes = inst->iKeepAliveProbes; pSrv->iKeepAliveTime = inst->iKeepAliveTime; pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose; + CHKiRet(ratelimitNew(&pSrv->ratelimiter)); + ratelimitSetThreadSafe(pSrv->ratelimiter); CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort)); pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim; if(inst->pszBindAddr == NULL) diff --git a/plugins/imsolaris/imsolaris.c b/plugins/imsolaris/imsolaris.c index a220e72a..1e7d9b0f 100644 --- a/plugins/imsolaris/imsolaris.c +++ b/plugins/imsolaris/imsolaris.c @@ -212,7 +212,7 @@ readLog(int fd, uchar *pRcv, int iMaxLine) pMsg->iFacility = LOG_FAC(hdr.pri); pMsg->iSeverity = LOG_PRI(hdr.pri); pMsg->msgFlags = NEEDS_PARSING | NO_PRI_IN_RAW; - CHKiRet(submitMsg(pMsg)); + CHKiRet(submitMsg(pMsg, NULL)); } finalize_it: diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c index eaf9a213..beb7d705 100644 --- a/plugins/imtcp/imtcp.c +++ b/plugins/imtcp/imtcp.c @@ -580,7 +580,9 @@ ENDwillRun BEGINafterRun CODESTARTafterRun - /* do cleanup here */ + if(pOurTcpsrv != NULL) + iRet = tcpsrv.Destruct(&pOurTcpsrv); + net.clearAllowedSenders(UCHAR_CONSTANT("TCP")); ENDafterRun @@ -594,9 +596,6 @@ ENDisCompatibleWithFeature BEGINmodExit CODESTARTmodExit - if(pOurTcpsrv != NULL) - iRet = tcpsrv.Destruct(&pOurTcpsrv); - if(pPermPeersRoot != NULL) { net.DestructPermittedPeers(&pPermPeersRoot); } diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 0dda30ec..7f9e00dc 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -4,8 +4,6 @@ * NOTE: read comments in module-template.h to understand how this file * works! * - * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) - * * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. @@ -53,6 +51,7 @@ #include "prop.h" #include "ruleset.h" #include "statsobj.h" +#include "ratelimit.h" #include "unicode-helper.h" MODULE_TYPE_INPUT @@ -77,6 +76,7 @@ static struct lstn_s { int sock; /* socket */ ruleset_t *pRuleset; /* bound ruleset */ statsobj_t *stats; /* listener stats */ + ratelimit_t *ratelimiter; STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) } *lcnfRoot = NULL, *lcnfLast = NULL; @@ -248,6 +248,7 @@ addListner(instanceConf_t *inst) newlcnfinfo->next = NULL; newlcnfinfo->sock = newSocks[iSrc]; newlcnfinfo->pRuleset = inst->pBindRuleset; + CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter)); /* support statistics gathering */ CHKiRet(statsobj.Construct(&(newlcnfinfo->stats))); snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port); @@ -304,7 +305,6 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta static inline rsRetVal processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) { - DEFiRet; int iNbrTimeUsed; time_t ttGenTime; struct syslogTime stTime; @@ -314,9 +314,15 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f msg_t *pMsg; prop_t *propFromHost = NULL; prop_t *propFromHostIP = NULL; + multi_submit_t multiSub; + msg_t *pMsgs[CONF_NUM_MULTISUB]; char errStr[1024]; + DEFiRet; assert(pThrd != NULL); + multiSub.ppMsgs = pMsgs; + multiSub.maxElem = CONF_NUM_MULTISUB; + multiSub.nElem = 0; iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a bad receive, done below in the body */ if(pThrd->bShallStop == RSTRUE) @@ -383,12 +389,15 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f if(*pbIsPermitted == 2) pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); - CHKiRet(submitMsg(pMsg)); + CHKiRet(ratelimitAddMsg(lstn->ratelimiter, &multiSub, pMsg)); STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); } } + finalize_it: + multiSubmitFlush(&multiSub); + if(propFromHost != NULL) prop.Destruct(&propFromHost); if(propFromHostIP != NULL) @@ -884,6 +893,7 @@ CODESTARTafterRun net.clearAllowedSenders((uchar*)"UDP"); for(lstn = lcnfRoot ; lstn != NULL ; ) { statsobj.Destruct(&(lstn->stats)); + ratelimitDestruct(lstn->ratelimiter); close(lstn->sock); lstnDel = lstn; lstn = lstn->next; diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index a4933115..3421863d 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -55,6 +55,7 @@ #include "statsobj.h" #include "datetime.h" #include "hashtable.h" +#include "ratelimit.h" MODULE_TYPE_INPUT MODULE_TYPE_NOKEEP @@ -810,7 +811,17 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim int toffs; /* offset for trusted properties */ struct syslogTime dummyTS; struct json_object *json = NULL, *jval; + msg_t *repMsg; DEFiRet; +#warning experimental code needs to be made production-ready! +/* we need to decide how many ratelimiters we use --> hashtable + also remove current homegrown ratelimiting functionality and + replace it with the new one. + */ +rsRetVal localRet; +static ratelimit_t *ratelimit = NULL; +if(ratelimit == NULL) + ratelimitNew(&ratelimit); /* TODO: handle format errors?? */ /* we need to parse the pri first, because we need the severity for @@ -982,7 +993,11 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName); CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP)); - CHKiRet(submitMsg(pMsg)); + localRet = ratelimitMsg(ratelimit, pMsg, &repMsg); + if(repMsg != NULL) + CHKiRet(submitMsg(repMsg)); + if(localRet == RS_RET_OK) + CHKiRet(submitMsg(pMsg)); STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit); finalize_it: |