diff options
Diffstat (limited to 'plugins/imptcp/imptcp.c')
-rw-r--r-- | plugins/imptcp/imptcp.c | 25 |
1 files changed, 9 insertions, 16 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 8150fc33..8495b293 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -67,6 +67,7 @@ #include "ruleset.h" #include "msg.h" #include "statsobj.h" +#include "ratelimit.h" #include "net.h" /* for permittedPeers, may be removed when this is removed */ /* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */ @@ -195,6 +196,7 @@ struct ptcpsrv_s { sbool bKeepAlive; /* support keep-alive packets */ sbool bEmitMsgOnClose; sbool bSuppOctetFram; + ratelimit_t *ratelimiter; }; /* the ptcp session object. Describes a single active session. @@ -295,6 +297,7 @@ destructSess(ptcpsess_t *pSess) static void destructSrv(ptcpsrv_t *pSrv) { + ratelimitDestruct(pSrv->ratelimiter); prop.Destruct(&pSrv->pInputName); pthread_mutex_destroy(&pSrv->mutSessLst); free(pSrv->pszInputName); @@ -679,14 +682,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult MsgSetRuleset(pMsg, pSrv->pRuleset); STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit); - if(pMultiSub == NULL) { - CHKiRet(submitMsg(pMsg)); - } else { - pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; - if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg(pMultiSub)); - } - + ratelimitAddMsg(pSrv->ratelimiter, pMultiSub, pMsg); finalize_it: /* reset status variables */ @@ -805,12 +801,11 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG * we have just received a bunch of data! -- rgerhards, 2009-06-16 * EXTRACT from tcps_sess.c */ -#define NUM_MULTISUB 1024 static rsRetVal DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) { multi_submit_t multiSub; - msg_t *pMsgs[NUM_MULTISUB]; + msg_t *pMsgs[CONF_NUM_MULTISUB]; struct syslogTime stTime; time_t ttGenTime; char *pEnd; @@ -821,7 +816,7 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) datetime.getCurrTime(&stTime, &ttGenTime); multiSub.ppMsgs = pMsgs; - multiSub.maxElem = NUM_MULTISUB; + multiSub.maxElem = CONF_NUM_MULTISUB; multiSub.nElem = 0; /* We now copy the message to the session buffer. */ @@ -831,15 +826,11 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub)); } - if(multiSub.nElem > 0) { - /* submit anything that was not yet submitted */ - CHKiRet(multiSubmitMsg(&multiSub)); - } + iRet = multiSubmitFlush(&multiSub); finalize_it: RETiRet; } -#undef NUM_MULTISUB /****************************************** --END-- TCP SUPPORT FUNCTIONS ***********************************/ @@ -1130,6 +1121,8 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) pSrv->iKeepAliveProbes = inst->iKeepAliveProbes; pSrv->iKeepAliveTime = inst->iKeepAliveTime; pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose; + CHKiRet(ratelimitNew(&pSrv->ratelimiter)); + ratelimitSetThreadSafe(pSrv->ratelimiter); CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort)); pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim; if(inst->pszBindAddr == NULL) |