summaryrefslogtreecommitdiffstats
path: root/plugins/imptcp/imptcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imptcp/imptcp.c')
-rw-r--r--plugins/imptcp/imptcp.c36
1 files changed, 20 insertions, 16 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 9888086f..5c8bb67a 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -67,6 +67,7 @@
#include "ruleset.h"
#include "msg.h"
#include "statsobj.h"
+#include "ratelimit.h"
#include "net.h" /* for permittedPeers, may be removed when this is removed */
/* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */
@@ -121,6 +122,8 @@ struct instanceConf_s {
uchar *pszBindRuleset; /* name of ruleset to bind to */
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ int ratelimitInterval;
+ int ratelimitBurst;
struct instanceConf_s *next;
};
@@ -158,6 +161,8 @@ static struct cnfparamdescr inppdescr[] = {
{ "keepalive.time", eCmdHdlrInt, 0 },
{ "keepalive.interval", eCmdHdlrInt, 0 },
{ "addtlframedelimiter", eCmdHdlrInt, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -195,6 +200,7 @@ struct ptcpsrv_s {
sbool bKeepAlive; /* support keep-alive packets */
sbool bEmitMsgOnClose;
sbool bSuppOctetFram;
+ ratelimit_t *ratelimiter;
};
/* the ptcp session object. Describes a single active session.
@@ -295,6 +301,7 @@ destructSess(ptcpsess_t *pSess)
static void
destructSrv(ptcpsrv_t *pSrv)
{
+ ratelimitDestruct(pSrv->ratelimiter);
prop.Destruct(&pSrv->pInputName);
pthread_mutex_destroy(&pSrv->mutSessLst);
free(pSrv->pszInputName);
@@ -679,14 +686,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult
MsgSetRuleset(pMsg, pSrv->pRuleset);
STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit);
- if(pMultiSub == NULL) {
- CHKiRet(submitMsg(pMsg));
- } else {
- pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
- if(pMultiSub->nElem == pMultiSub->maxElem)
- CHKiRet(multiSubmitMsg(pMultiSub));
- }
-
+ ratelimitAddMsg(pSrv->ratelimiter, pMultiSub, pMsg);
finalize_it:
/* reset status variables */
@@ -805,12 +805,11 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG
* we have just received a bunch of data! -- rgerhards, 2009-06-16
* EXTRACT from tcps_sess.c
*/
-#define NUM_MULTISUB 1024
static rsRetVal
DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
{
multi_submit_t multiSub;
- msg_t *pMsgs[NUM_MULTISUB];
+ msg_t *pMsgs[CONF_NUM_MULTISUB];
struct syslogTime stTime;
time_t ttGenTime;
char *pEnd;
@@ -821,7 +820,7 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
datetime.getCurrTime(&stTime, &ttGenTime);
multiSub.ppMsgs = pMsgs;
- multiSub.maxElem = NUM_MULTISUB;
+ multiSub.maxElem = CONF_NUM_MULTISUB;
multiSub.nElem = 0;
/* We now copy the message to the session buffer. */
@@ -831,15 +830,11 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub));
}
- if(multiSub.nElem > 0) {
- /* submit anything that was not yet submitted */
- CHKiRet(multiSubmitMsg(&multiSub));
- }
+ iRet = multiSubmitFlush(&multiSub);
finalize_it:
RETiRet;
}
-#undef NUM_MULTISUB
/****************************************** --END-- TCP SUPPORT FUNCTIONS ***********************************/
@@ -1051,6 +1046,8 @@ createInstance(instanceConf_t **pinst)
inst->bEmitMsgOnClose = 0;
inst->iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
inst->pBindRuleset = NULL;
+ inst->ratelimitBurst = 10000; /* arbitrary high limit */
+ inst->ratelimitInterval = 0; /* off */
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -1130,6 +1127,9 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
pSrv->iKeepAliveProbes = inst->iKeepAliveProbes;
pSrv->iKeepAliveTime = inst->iKeepAliveTime;
pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose;
+ CHKiRet(ratelimitNew(&pSrv->ratelimiter, "imtcp", (char*)inst->pszBindPort));
+ ratelimitSetLinuxLike(pSrv->ratelimiter, inst->ratelimitInterval, inst->ratelimitBurst);
+ ratelimitSetThreadSafe(pSrv->ratelimiter);
CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort));
pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim;
if(inst->pszBindAddr == NULL)
@@ -1458,6 +1458,10 @@ CODESTARTnewInpInst
inst->iAddtlFrameDelim = (int) pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "notifyonconnectionclose")) {
inst->bEmitMsgOnClose = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
+ inst->ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) {
+ inst->ratelimitInterval = (int) pvals[i].val.d.n;
} else {
dbgprintf("imptcp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);