summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imdiag/imdiag.c10
-rw-r--r--plugins/imfile/imfile.c22
-rw-r--r--plugins/imkmsg/imkmsg.c2
-rw-r--r--plugins/imptcp/imptcp.c36
-rw-r--r--plugins/imsolaris/imsolaris.c2
-rw-r--r--plugins/imtcp/imtcp.c20
-rw-r--r--plugins/imudp/imudp.c40
-rw-r--r--plugins/imuxsock/imuxsock.c121
-rw-r--r--plugins/omruleset/omruleset.c6
9 files changed, 104 insertions, 155 deletions
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c
index 09742537..15948215 100644
--- a/plugins/imdiag/imdiag.c
+++ b/plugins/imdiag/imdiag.c
@@ -53,6 +53,7 @@
#include "srUtils.h"
#include "msg.h"
#include "datetime.h"
+#include "ratelimit.h"
#include "net.h" /* for permittedPeers, may be removed when this is removed */
MODULE_TYPE_INPUT
@@ -200,7 +201,7 @@ finalize_it:
/* actually submit a message to the rsyslog core
*/
static rsRetVal
-doInjectMsg(int iNum)
+doInjectMsg(int iNum, ratelimit_t *ratelimiter)
{
uchar szMsg[1024];
msg_t *pMsg;
@@ -220,7 +221,7 @@ doInjectMsg(int iNum)
pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
MsgSetRcvFrom(pMsg, pRcvDummy);
CHKiRet(MsgSetRcvFromIP(pMsg, pRcvIPDummy));
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(ratelimitAddMsg(ratelimiter, NULL, pMsg));
finalize_it:
RETiRet;
@@ -238,6 +239,7 @@ injectMsg(uchar *pszCmd, tcps_sess_t *pSess)
int iFrom;
int nMsgs;
int i;
+ ratelimit_t *ratelimit;
DEFiRet;
/* we do not check errors here! */
@@ -245,13 +247,15 @@ injectMsg(uchar *pszCmd, tcps_sess_t *pSess)
iFrom = atoi((char*)wordBuf);
getFirstWord(&pszCmd, wordBuf, sizeof(wordBuf)/sizeof(uchar), TO_LOWERCASE);
nMsgs = atoi((char*)wordBuf);
+ ratelimitNew(&ratelimit, "imdiag", "injectmsg");
for(i = 0 ; i < nMsgs ; ++i) {
- doInjectMsg(i + iFrom);
+ doInjectMsg(i + iFrom, ratelimit);
}
CHKiRet(sendResponse(pSess, "%d messages injected\n", nMsgs));
DBGPRINTF("imdiag: %d messages injected\n", nMsgs);
+ ratelimitDestruct(ratelimit);
finalize_it:
RETiRet;
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index 453b6b05..d50f917e 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -48,6 +48,7 @@
#include "prop.h"
#include "stringbuf.h"
#include "ruleset.h"
+#include "ratelimit.h"
MODULE_TYPE_INPUT /* must be present for input modules, do not remove */
MODULE_TYPE_NOKEEP
@@ -82,6 +83,7 @@ typedef struct fileInfo_s {
strm_t *pStrm; /* its stream (NULL if not assigned) */
int readMode; /* which mode to use in ReadMulteLine call? */
ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ ratelimit_t *ratelimiter;
multi_submit_t multiSub;
} fileInfo_t;
@@ -189,9 +191,7 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine)
pMsg->iFacility = LOG_FAC(pInfo->iFacility);
pMsg->iSeverity = LOG_PRI(pInfo->iSeverity);
MsgSetRuleset(pMsg, pInfo->pRuleset);
- pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg;
- if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem)
- CHKiRet(multiSubmitMsg(&pInfo->multiSub));
+ ratelimitAddMsg(pInfo->ratelimiter, &pInfo->multiSub, pMsg);
finalize_it:
RETiRet;
}
@@ -304,18 +304,7 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
}
finalize_it:
- if(pThis->multiSub.nElem > 0) {
- /* submit everything that was not yet submitted */
- CHKiRet(multiSubmitMsg(&pThis->multiSub));
- }
- ; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */
- /* Note: the problem above is that pthread:cleanup_pop() is a macro which
- * evaluates to something like "} while(0);". So the code would become
- * "finalize_it: }", that is a label without a statement. The C standard does
- * not permit this. So we add an empty statement "finalize_it: ; }" and
- * everybody is happy. Note that without the ;, an error is reported only
- * on some platforms/compiler versions. -- rgerhards, 2008-08-15
- */
+ multiSubmitFlush(&pThis->multiSub);
pthread_cleanup_pop(0);
if(pCStr != NULL) {
@@ -423,6 +412,7 @@ addListner(instanceConf_t *inst)
pThis->lenTag = ustrlen(pThis->pszTag);
pThis->pszStateFile = (uchar*) strdup((char*) inst->pszStateFile);
+ CHKiRet(ratelimitNew(&pThis->ratelimiter, "imfile", (char*)inst->pszFileName));
CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(inst->nMultiSub * sizeof(msg_t*)));
pThis->multiSub.maxElem = inst->nMultiSub;
pThis->multiSub.nElem = 0;
@@ -773,6 +763,8 @@ CODESTARTafterRun
persistStrmState(&files[i]);
strm.Destruct(&(files[i].pStrm));
}
+ ratelimitDestruct(files[i].ratelimiter);
+ free(files[i].multiSub.ppMsgs);
free(files[i].pszFileName);
free(files[i].pszTag);
free(files[i].pszStateFile);
diff --git a/plugins/imkmsg/imkmsg.c b/plugins/imkmsg/imkmsg.c
index 2a97f82d..d1a83879 100644
--- a/plugins/imkmsg/imkmsg.c
+++ b/plugins/imkmsg/imkmsg.c
@@ -113,7 +113,7 @@ enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval *
pMsg->iFacility = iFacility;
pMsg->iSeverity = iSeverity;
pMsg->json = json;
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(submitMsg(pMsg, NULL));
finalize_it:
RETiRet;
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 8150fc33..0475e219 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -67,6 +67,7 @@
#include "ruleset.h"
#include "msg.h"
#include "statsobj.h"
+#include "ratelimit.h"
#include "net.h" /* for permittedPeers, may be removed when this is removed */
/* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */
@@ -121,6 +122,8 @@ struct instanceConf_s {
uchar *pszBindRuleset; /* name of ruleset to bind to */
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ int ratelimitInterval;
+ int ratelimitBurst;
struct instanceConf_s *next;
};
@@ -158,6 +161,8 @@ static struct cnfparamdescr inppdescr[] = {
{ "keepalive.time", eCmdHdlrInt, 0 },
{ "keepalive.interval", eCmdHdlrInt, 0 },
{ "addtlframedelimiter", eCmdHdlrInt, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -195,6 +200,7 @@ struct ptcpsrv_s {
sbool bKeepAlive; /* support keep-alive packets */
sbool bEmitMsgOnClose;
sbool bSuppOctetFram;
+ ratelimit_t *ratelimiter;
};
/* the ptcp session object. Describes a single active session.
@@ -295,6 +301,7 @@ destructSess(ptcpsess_t *pSess)
static void
destructSrv(ptcpsrv_t *pSrv)
{
+ ratelimitDestruct(pSrv->ratelimiter);
prop.Destruct(&pSrv->pInputName);
pthread_mutex_destroy(&pSrv->mutSessLst);
free(pSrv->pszInputName);
@@ -679,14 +686,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult
MsgSetRuleset(pMsg, pSrv->pRuleset);
STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit);
- if(pMultiSub == NULL) {
- CHKiRet(submitMsg(pMsg));
- } else {
- pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
- if(pMultiSub->nElem == pMultiSub->maxElem)
- CHKiRet(multiSubmitMsg(pMultiSub));
- }
-
+ ratelimitAddMsg(pSrv->ratelimiter, pMultiSub, pMsg);
finalize_it:
/* reset status variables */
@@ -805,12 +805,11 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG
* we have just received a bunch of data! -- rgerhards, 2009-06-16
* EXTRACT from tcps_sess.c
*/
-#define NUM_MULTISUB 1024
static rsRetVal
DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
{
multi_submit_t multiSub;
- msg_t *pMsgs[NUM_MULTISUB];
+ msg_t *pMsgs[CONF_NUM_MULTISUB];
struct syslogTime stTime;
time_t ttGenTime;
char *pEnd;
@@ -821,7 +820,7 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
datetime.getCurrTime(&stTime, &ttGenTime);
multiSub.ppMsgs = pMsgs;
- multiSub.maxElem = NUM_MULTISUB;
+ multiSub.maxElem = CONF_NUM_MULTISUB;
multiSub.nElem = 0;
/* We now copy the message to the session buffer. */
@@ -831,15 +830,11 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub));
}
- if(multiSub.nElem > 0) {
- /* submit anything that was not yet submitted */
- CHKiRet(multiSubmitMsg(&multiSub));
- }
+ iRet = multiSubmitFlush(&multiSub);
finalize_it:
RETiRet;
}
-#undef NUM_MULTISUB
/****************************************** --END-- TCP SUPPORT FUNCTIONS ***********************************/
@@ -1051,6 +1046,8 @@ createInstance(instanceConf_t **pinst)
inst->bEmitMsgOnClose = 0;
inst->iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
inst->pBindRuleset = NULL;
+ inst->ratelimitBurst = 10000; /* arbitrary high limit */
+ inst->ratelimitInterval = 0; /* off */
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -1130,6 +1127,9 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
pSrv->iKeepAliveProbes = inst->iKeepAliveProbes;
pSrv->iKeepAliveTime = inst->iKeepAliveTime;
pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose;
+ CHKiRet(ratelimitNew(&pSrv->ratelimiter, "imtcp", (char*)inst->pszBindPort));
+ ratelimitSetLinuxLike(pSrv->ratelimiter, inst->ratelimitInterval, inst->ratelimitBurst);
+ ratelimitSetThreadSafe(pSrv->ratelimiter);
CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort));
pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim;
if(inst->pszBindAddr == NULL)
@@ -1458,6 +1458,10 @@ CODESTARTnewInpInst
inst->iAddtlFrameDelim = (int) pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "notifyonconnectionclose")) {
inst->bEmitMsgOnClose = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
+ inst->ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) {
+ inst->ratelimitInterval = (int) pvals[i].val.d.n;
} else {
dbgprintf("imptcp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
diff --git a/plugins/imsolaris/imsolaris.c b/plugins/imsolaris/imsolaris.c
index a220e72a..1e7d9b0f 100644
--- a/plugins/imsolaris/imsolaris.c
+++ b/plugins/imsolaris/imsolaris.c
@@ -212,7 +212,7 @@ readLog(int fd, uchar *pRcv, int iMaxLine)
pMsg->iFacility = LOG_FAC(hdr.pri);
pMsg->iSeverity = LOG_PRI(hdr.pri);
pMsg->msgFlags = NEEDS_PARSING | NO_PRI_IN_RAW;
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(submitMsg(pMsg, NULL));
}
finalize_it:
diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c
index eaf9a213..8d71d5f2 100644
--- a/plugins/imtcp/imtcp.c
+++ b/plugins/imtcp/imtcp.c
@@ -105,6 +105,8 @@ struct instanceConf_s {
uchar *pszBindRuleset; /* name of ruleset to bind to */
ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
+ int ratelimitInterval;
+ int ratelimitBurst;
int bSuppOctetFram;
struct instanceConf_s *next;
};
@@ -155,7 +157,9 @@ static struct cnfparamdescr inppdescr[] = {
{ "port", eCmdHdlrString, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */
{ "name", eCmdHdlrString, 0 },
{ "ruleset", eCmdHdlrString, 0 },
- { "supportOctetCountedFraming", eCmdHdlrBinary, 0 }
+ { "supportOctetCountedFraming", eCmdHdlrBinary, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -251,6 +255,8 @@ createInstance(instanceConf_t **pinst)
inst->pszBindRuleset = NULL;
inst->pszInputName = NULL;
inst->bSuppOctetFram = 1;
+ inst->ratelimitInterval = 0;
+ inst->ratelimitBurst = 10000;
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -334,6 +340,7 @@ addListner(modConfData_t *modConf, instanceConf_t *inst)
CHKiRet(tcpsrv.SetRuleset(pOurTcpsrv, inst->pBindRuleset));
CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, inst->pszInputName == NULL ?
UCHAR_CONSTANT("imtcp") : inst->pszInputName));
+ CHKiRet(tcpsrv.SetLinuxLikeRatelimiters(pOurTcpsrv, inst->ratelimitInterval, inst->ratelimitBurst));
tcpsrv.configureTCPListen(pOurTcpsrv, inst->pszBindPort, inst->bSuppOctetFram);
finalize_it:
@@ -376,6 +383,10 @@ CODESTARTnewInpInst
inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(inppblk.descr[i].name, "supportOctetCountedFraming")) {
inst->bSuppOctetFram = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
+ inst->ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) {
+ inst->ratelimitInterval = (int) pvals[i].val.d.n;
} else {
dbgprintf("imtcp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
@@ -580,7 +591,9 @@ ENDwillRun
BEGINafterRun
CODESTARTafterRun
- /* do cleanup here */
+ if(pOurTcpsrv != NULL)
+ iRet = tcpsrv.Destruct(&pOurTcpsrv);
+
net.clearAllowedSenders(UCHAR_CONSTANT("TCP"));
ENDafterRun
@@ -594,9 +607,6 @@ ENDisCompatibleWithFeature
BEGINmodExit
CODESTARTmodExit
- if(pOurTcpsrv != NULL)
- iRet = tcpsrv.Destruct(&pOurTcpsrv);
-
if(pPermPeersRoot != NULL) {
net.DestructPermittedPeers(&pPermPeersRoot);
}
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 782d7bee..9b6409c1 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -4,8 +4,6 @@
* NOTE: read comments in module-template.h to understand how this file
* works!
*
- * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c)
- *
* Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
@@ -53,6 +51,7 @@
#include "prop.h"
#include "ruleset.h"
#include "statsobj.h"
+#include "ratelimit.h"
#include "unicode-helper.h"
MODULE_TYPE_INPUT
@@ -77,6 +76,7 @@ static struct lstn_s {
int sock; /* socket */
ruleset_t *pRuleset; /* bound ruleset */
statsobj_t *stats; /* listener stats */
+ ratelimit_t *ratelimiter;
STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
} *lcnfRoot = NULL, *lcnfLast = NULL;
@@ -109,6 +109,8 @@ struct instanceConf_s {
uchar *pszBindPort; /* Port to bind socket to */
uchar *pszBindRuleset; /* name of ruleset to bind to */
ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ int ratelimitInterval;
+ int ratelimitBurst;
struct instanceConf_s *next;
};
@@ -140,7 +142,9 @@ static struct cnfparamblk modpblk =
static struct cnfparamdescr inppdescr[] = {
{ "port", eCmdHdlrArray, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */
{ "address", eCmdHdlrString, 0 },
- { "ruleset", eCmdHdlrString, 0 }
+ { "ruleset", eCmdHdlrString, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -165,6 +169,8 @@ createInstance(instanceConf_t **pinst)
inst->pszBindPort = NULL;
inst->pszBindAddr = NULL;
inst->pszBindRuleset = NULL;
+ inst->ratelimitBurst = 10000; /* arbitrary high limit */
+ inst->ratelimitInterval = 0; /* off */
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -223,7 +229,7 @@ addListner(instanceConf_t *inst)
struct lstn_s *newlcnfinfo;
uchar *bindName;
uchar *port;
- uchar statname[64];
+ uchar dispname[64];
/* check which address to bind to. We could do this more compact, but have not
* done so in order to make the code more readable. -- rgerhards, 2007-12-27
@@ -248,11 +254,14 @@ addListner(instanceConf_t *inst)
newlcnfinfo->next = NULL;
newlcnfinfo->sock = newSocks[iSrc];
newlcnfinfo->pRuleset = inst->pBindRuleset;
+ snprintf((char*)dispname, sizeof(dispname), "imudp(%s:%s)", bindName, port);
+ dispname[sizeof(dispname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter, (char*)dispname, NULL));
+ ratelimitSetLinuxLike(newlcnfinfo->ratelimiter, inst->ratelimitInterval,
+ inst->ratelimitBurst);
/* support statistics gathering */
CHKiRet(statsobj.Construct(&(newlcnfinfo->stats)));
- snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port);
- statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
- CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname));
+ CHKiRet(statsobj.SetName(newlcnfinfo->stats, dispname));
STATSCOUNTER_INIT(newlcnfinfo->ctrSubmit, newlcnfinfo->mutCtrSubmit);
CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"),
ctrType_IntCtr, &(newlcnfinfo->ctrSubmit)));
@@ -304,7 +313,6 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta
static inline rsRetVal
processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted)
{
- DEFiRet;
int iNbrTimeUsed;
time_t ttGenTime;
struct syslogTime stTime;
@@ -314,9 +322,15 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
msg_t *pMsg;
prop_t *propFromHost = NULL;
prop_t *propFromHostIP = NULL;
+ multi_submit_t multiSub;
+ msg_t *pMsgs[CONF_NUM_MULTISUB];
char errStr[1024];
+ DEFiRet;
assert(pThrd != NULL);
+ multiSub.ppMsgs = pMsgs;
+ multiSub.maxElem = CONF_NUM_MULTISUB;
+ multiSub.nElem = 0;
iNbrTimeUsed = 0;
while(1) { /* loop is terminated if we have a bad receive, done below in the body */
if(pThrd->bShallStop == RSTRUE)
@@ -383,12 +397,15 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
if(*pbIsPermitted == 2)
pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */
CHKiRet(msgSetFromSockinfo(pMsg, &frominet));
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(ratelimitAddMsg(lstn->ratelimiter, &multiSub, pMsg));
STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit);
}
}
+
finalize_it:
+ multiSubmitFlush(&multiSub);
+
if(propFromHost != NULL)
prop.Destruct(&propFromHost);
if(propFromHostIP != NULL)
@@ -682,6 +699,10 @@ createListner(es_str_t *port, struct cnfparamvals *pvals)
inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(inppblk.descr[i].name, "ruleset")) {
inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
+ inst->ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) {
+ inst->ratelimitInterval = (int) pvals[i].val.d.n;
} else {
dbgprintf("imudp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
@@ -884,6 +905,7 @@ CODESTARTafterRun
net.clearAllowedSenders((uchar*)"UDP");
for(lstn = lcnfRoot ; lstn != NULL ; ) {
statsobj.Destruct(&(lstn->stats));
+ ratelimitDestruct(lstn->ratelimiter);
close(lstn->sock);
lstnDel = lstn;
lstn = lstn->next;
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index d5e4bb31..1409c24a 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -55,6 +55,7 @@
#include "statsobj.h"
#include "datetime.h"
#include "hashtable.h"
+#include "ratelimit.h"
MODULE_TYPE_INPUT
MODULE_TYPE_NOKEEP
@@ -105,15 +106,6 @@ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
STATSCOUNTER_DEF(ctrLostRatelimit, mutCtrLostRatelimit)
STATSCOUNTER_DEF(ctrNumRatelimiters, mutCtrNumRatelimiters)
-struct rs_ratelimit_state {
- unsigned short interval;
- unsigned short burst;
- unsigned done;
- unsigned missed;
- time_t begin;
-};
-typedef struct rs_ratelimit_state rs_ratelimit_state_t;
-
/* a very simple "hash function" for process IDs - we simply use the
* pid itself: it is quite expected that all pids may log some time, but
@@ -271,74 +263,9 @@ static struct cnfparamblk inppblk =
/* we do not use this, because we do not bind to a ruleset so far
* enable when this is changed: #include "im-helper.h" */ /* must be included AFTER the type definitions! */
-static void
-initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsigned short burst)
-{
- rs->interval = interval;
- rs->burst = burst;
- rs->done = 0;
- rs->missed = 0;
- rs->begin = 0;
-}
-
static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
-/* ratelimiting support, modelled after the linux kernel
- * returns 1 if message is within rate limit and shall be
- * processed, 0 otherwise.
- * This implementation is NOT THREAD-SAFE and must not
- * be called concurrently.
- */
-static inline int
-withinRatelimit(struct rs_ratelimit_state *rs, time_t tt, pid_t pid)
-{
- int ret;
- uchar msgbuf[1024];
-
- if(rs->interval == 0) {
- ret = 1;
- goto finalize_it;
- }
-
- assert(rs->burst != 0);
-
- if(rs->begin == 0)
- rs->begin = tt;
-
- /* resume if we go out of out time window */
- if(tt > rs->begin + rs->interval) {
- if(rs->missed) {
- snprintf((char*)msgbuf, sizeof(msgbuf),
- "imuxsock lost %u messages from pid %lu due to rate-limiting",
- rs->missed, (unsigned long) pid);
- logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
- rs->missed = 0;
- }
- rs->begin = 0;
- rs->done = 0;
- }
-
- /* do actual limit check */
- if(rs->burst > rs->done) {
- rs->done++;
- ret = 1;
- } else {
- if(rs->missed == 0) {
- snprintf((char*)msgbuf, sizeof(msgbuf),
- "imuxsock begins to drop messages from pid %lu due to rate-limiting",
- (unsigned long) pid);
- logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
- }
- rs->missed++;
- ret = 0;
- }
-
-finalize_it:
- return ret;
-}
-
-
/* create input instance, set default paramters, and
* add it to the list of instances.
*/
@@ -445,7 +372,8 @@ addListner(instanceConf_t *inst)
CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName));
}
if(inst->ratelimitInterval > 0) {
- if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL)) == NULL) {
+ if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn,
+ (void(*)(void*))ratelimitDestruct)) == NULL) {
/* in this case, we simply turn off rate-limiting */
DBGPRINTF("imuxsock: turning off rate limiting because we could not "
"create hash table\n");
@@ -604,19 +532,22 @@ finalize_it:
* listener (the latter being a performance enhancement).
*/
static inline rsRetVal
-findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
+findRatelimiter(lstn_t *pLstn, struct ucred *cred, ratelimit_t **prl)
{
- rs_ratelimit_state_t *rl;
+ ratelimit_t *rl;
int r;
pid_t *keybuf;
+ char pidbuf[256];
DEFiRet;
if(cred == NULL)
FINALIZE;
+#if 0 // TODO: check deactivated?
if(pLstn->ratelimitInterval == 0) {
*prl = NULL;
FINALIZE;
}
+#endif
rl = hashtable_search(pLstn->ht, &cred->pid);
if(rl == NULL) {
@@ -624,10 +555,13 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
DBGPRINTF("imuxsock: no ratelimiter for pid %lu, creating one\n",
(unsigned long) cred->pid);
STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters);
- CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t)));
+ snprintf(pidbuf, sizeof(pidbuf), "pid %lu",
+ (unsigned long) cred->pid);
+ pidbuf[sizeof(pidbuf)-1] = '\0'; /* to be on safe side */
+ CHKiRet(ratelimitNew(&rl, "imuxsock", pidbuf));
+ ratelimitSetLinuxLike(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst);
CHKmalloc(keybuf = malloc(sizeof(pid_t)));
*keybuf = cred->pid;
- initRatelimitState(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst);
r = hashtable_insert(pLstn->ht, keybuf, rl);
if(r == 0)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -762,28 +696,6 @@ copyescaped(uchar *dstbuf, uchar *inbuf, int inlen)
}
-#if 0
-/* Creates new field to be added to event
- * used for SystemLogParseTrusted parsing
- */
-struct ee_field *
-createNewField(char *fieldname, char *value, int lenValue) {
- es_str_t *newStr;
- struct ee_value *newVal;
- struct ee_field *newField;
-
- newStr = es_newStrFromBuf(value, (es_size_t) lenValue);
-
- newVal = ee_newValue(ctxee);
- ee_setStrValue(newVal, newStr);
-
- newField = ee_newFieldFromNV(ctxee, fieldname, newVal);
-
- return newField;
-}
-#endif
-
-
/* submit received message to the queue engine
* We now parse the message according to expected format so that we
* can also mangle it if necessary.
@@ -802,8 +714,8 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
uchar bufParseTAG[CONF_TAG_MAXSIZE];
struct syslogTime st;
time_t tt;
- rs_ratelimit_state_t *ratelimiter = NULL;
int lenProp;
+ ratelimit_t *ratelimiter = NULL;
uchar propBuf[1024];
uchar msgbuf[8192];
uchar *pmsgbuf;
@@ -842,10 +754,12 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
tt = ts->tv_sec;
}
+#if 0 // TODO: think about stats counters (or wait for request...?)
if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) {
STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit);
FINALIZE;
}
+#endif
/* created trusted properties */
if(cred != NULL && pLstn->bAnnotate) {
@@ -976,8 +890,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName);
CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP));
- CHKiRet(submitMsg(pMsg));
-
+ ratelimitAddMsg(ratelimiter, NULL, pMsg);
STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit);
finalize_it:
RETiRet;
diff --git a/plugins/omruleset/omruleset.c b/plugins/omruleset/omruleset.c
index 6c770c94..fd002265 100644
--- a/plugins/omruleset/omruleset.c
+++ b/plugins/omruleset/omruleset.c
@@ -120,7 +120,11 @@ CODESTARTdoAction
(char*) pData->pszRulesetName, pData->pRuleset);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
MsgSetRuleset(pMsg, pData->pRuleset);
- submitMsg(pMsg);
+ /* Note: we intentionally use submitMsg2() here, as we process messages
+ * that were already run through the rate-limiter. So it is (at least)
+ * questionable if they were rate-limited again.
+ */
+ submitMsg2(pMsg);
finalize_it:
ENDdoAction