diff options
Diffstat (limited to 'plugins/imptcp/imptcp.c')
-rw-r--r-- | plugins/imptcp/imptcp.c | 19 |
1 files changed, 7 insertions, 12 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index f804a70e..6d2ecd2f 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -67,6 +67,7 @@ #include "ruleset.h" #include "msg.h" #include "statsobj.h" +#include "ratelimit.h" #include "net.h" /* for permittedPeers, may be removed when this is removed */ /* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */ @@ -195,6 +196,7 @@ struct ptcpsrv_s { sbool bKeepAlive; /* support keep-alive packets */ sbool bEmitMsgOnClose; sbool bSuppOctetFram; + ratelimit_t *ratelimiter; }; /* the ptcp session object. Describes a single active session. @@ -295,6 +297,7 @@ destructSess(ptcpsess_t *pSess) static void destructSrv(ptcpsrv_t *pSrv) { + ratelimitDestruct(pSrv->ratelimiter); prop.Destruct(&pSrv->pInputName); pthread_mutex_destroy(&pSrv->mutSessLst); free(pSrv->pszInputName); @@ -661,6 +664,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult msg_t *pMsg; ptcpsrv_t *pSrv; DEFiRet; +dbgprintf("DDDD: in imptcp doSubmitMSg()\n"); if(pThis->iMsg == 0) { DBGPRINTF("discarding zero-sized message\n"); @@ -679,14 +683,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult MsgSetRuleset(pMsg, pSrv->pRuleset); STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit); - if(pMultiSub == NULL) { - CHKiRet(submitMsg2(pMsg, NULL)); - } else { - pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; - if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg2(pMultiSub, NULL)); - } - + multiSubmitAddMsg(pMultiSub, pMsg, pSrv->ratelimiter); finalize_it: /* reset status variables */ @@ -831,10 +828,7 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub)); } - if(multiSub.nElem > 0) { - /* submit anything that was not yet submitted */ - CHKiRet(multiSubmitMsg2(&multiSub, NULL)); - } + iRet = multiSubmitFlush(&multiSub); finalize_it: RETiRet; @@ -1130,6 +1124,7 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) pSrv->iKeepAliveProbes = inst->iKeepAliveProbes; pSrv->iKeepAliveTime = inst->iKeepAliveTime; pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose; + CHKiRet(ratelimitNew(&pSrv->ratelimiter)); CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort)); pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim; if(inst->pszBindAddr == NULL) |