summaryrefslogtreecommitdiffstats
path: root/plugins/imudp
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imudp')
-rw-r--r--plugins/imudp/imudp.c77
1 files changed, 43 insertions, 34 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 95f9a9a5..8753bae7 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -85,10 +85,6 @@ static struct lstn_s {
STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
} *lcnfRoot = NULL, *lcnfLast = NULL;
-# ifdef HAVE_RECVMMSG
-static struct mmsghdr *recvmsg_mmh;
-static struct iovec *recvmsg_iov;
-#endif
static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */
@@ -97,11 +93,6 @@ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitte
* This shall prevent remote DoS when the "discard on disallowed sender"
* message is configured to be logged on occurance of such a case.
*/
-static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc
- * it so that we can check available memory in willRun() and request
- * termination if we can not get it. -- rgerhards, 2007-12-27
- */
-
#define BATCH_SIZE_DFLT 32 /* do not overdo, has heavy toll on memory, especially with large msgs */
#define TIME_REQUERY_DFLT 2
#define SCHED_PRIO_UNSET -12345678 /* a value that indicates that the scheduling priority has not been set */
@@ -139,6 +130,11 @@ static struct wrkrInfo_s {
STATSCOUNTER_DEF(ctrCall_recvmmsg, mutCtrCall_recvmmsg)
STATSCOUNTER_DEF(ctrCall_recvmsg, mutCtrCall_recvmsg)
STATSCOUNTER_DEF(ctrMsgsRcvd, mutCtrMsgsRcvd)
+ uchar *pRcvBuf; /* receive buffer (for a single packet) */
+# ifdef HAVE_RECVMMSG
+ struct mmsghdr *recvmsg_mmh;
+ struct iovec *recvmsg_iov;
+# endif
} wrkrInfo[MAX_WRKR_THREADS];
struct modConfData_s {
@@ -453,26 +449,26 @@ processSocket(struct wrkrInfo_s *pWrkr, struct lstn_s *lstn, struct sockaddr_sto
while(1) { /* loop is terminated if we have a "bad" receive, done below in the body */
if(pWrkr->pThrd->bShallStop == RSTRUE)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
- memset(recvmsg_iov, 0, runModConf->batchSize * sizeof(struct iovec));
- memset(recvmsg_mmh, 0, runModConf->batchSize * sizeof(struct mmsghdr));
+ memset(pWrkr->recvmsg_iov, 0, runModConf->batchSize * sizeof(struct iovec));
+ memset(pWrkr->recvmsg_mmh, 0, runModConf->batchSize * sizeof(struct mmsghdr));
for(i = 0 ; i < runModConf->batchSize ; ++i) {
- recvmsg_iov[i].iov_base = pRcvBuf+(i*(iMaxLine+1));
- recvmsg_iov[i].iov_len = iMaxLine;
- recvmsg_mmh[i].msg_hdr.msg_name = &frominet;
- recvmsg_mmh[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
- recvmsg_mmh[i].msg_hdr.msg_iov = &(recvmsg_iov[i]);
- recvmsg_mmh[i].msg_hdr.msg_iovlen = 1;
+ pWrkr->recvmsg_iov[i].iov_base = pWrkr->pRcvBuf+(i*(iMaxLine+1));
+ pWrkr->recvmsg_iov[i].iov_len = iMaxLine;
+ pWrkr->recvmsg_mmh[i].msg_hdr.msg_name = &frominet;
+ pWrkr->recvmsg_mmh[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ pWrkr->recvmsg_mmh[i].msg_hdr.msg_iov = &(pWrkr->recvmsg_iov[i]);
+ pWrkr->recvmsg_mmh[i].msg_hdr.msg_iovlen = 1;
}
- nelem = recvmmsg(lstn->sock, recvmsg_mmh, runModConf->batchSize, 0, NULL);
+ nelem = recvmmsg(lstn->sock, pWrkr->recvmsg_mmh, runModConf->batchSize, 0, NULL);
STATSCOUNTER_INC(pWrkr->ctrCall_recvmmsg, pWrkr->mutCtrCall_recvmmsg);
-dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, sizeof(errStr)));
+ DBGPRINTF("imudp: recvmmsg returned %d\n", nelem);
if(nelem < 0 && errno == ENOSYS) {
/* be careful: some versions of valgrind do not support recvmmsg()! */
DBGPRINTF("imudp: error ENOSYS on call to recvmmsg() - fall back to recvmsg\n");
- nelem = recvmsg(lstn->sock, &recvmsg_mmh[0].msg_hdr, 0);
+ nelem = recvmsg(lstn->sock, &(pWrkr->recvmsg_mmh[0].msg_hdr), 0);
STATSCOUNTER_INC(pWrkr->ctrCall_recvmsg, pWrkr->mutCtrCall_recvmsg);
if(nelem >= 0) {
- recvmsg_mmh[0].msg_len = nelem;
+ pWrkr->recvmsg_mmh[0].msg_len = nelem;
nelem = 1;
}
}
@@ -491,9 +487,9 @@ dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr,
pWrkr->ctrMsgsRcvd += nelem;
for(i = 0 ; i < nelem ; ++i) {
- processPacket(pWrkr->pThrd, lstn, frominetPrev, pbIsPermitted, recvmsg_mmh[i].msg_hdr.msg_iov->iov_base,
- recvmsg_mmh[i].msg_len, &stTime, ttGenTime, &frominet,
- recvmsg_mmh[i].msg_hdr.msg_namelen, &multiSub);
+ processPacket(pWrkr->pThrd, lstn, frominetPrev, pbIsPermitted, pWrkr->recvmsg_mmh[i].msg_hdr.msg_iov->iov_base,
+ pWrkr->recvmsg_mmh[i].msg_len, &stTime, ttGenTime, &frominet,
+ pWrkr->recvmsg_mmh[i].msg_hdr.msg_namelen, &multiSub);
}
}
@@ -542,7 +538,7 @@ processSocket(struct wrkrInfo_s *pWrkr, struct lstn_s *lstn, struct sockaddr_sto
if(pWrkr->pThrd->bShallStop == RSTRUE)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
memset(iov, 0, sizeof(iov));
- iov[0].iov_base = pRcvBuf;
+ iov[0].iov_base = pWrkr->pRcvBuf;
iov[0].iov_len = iMaxLine;
memset(&mh, 0, sizeof(mh));
mh.msg_name = &frominet;
@@ -565,7 +561,7 @@ processSocket(struct wrkrInfo_s *pWrkr, struct lstn_s *lstn, struct sockaddr_sto
datetime.getCurrTime(&stTime, &ttGenTime);
}
- CHKiRet(processPacket(pWrkr->pThrd, lstn, frominetPrev, pbIsPermitted, pRcvBuf, lenRcvBuf, &stTime,
+ CHKiRet(processPacket(pWrkr->pThrd, lstn, frominetPrev, pbIsPermitted, pWrkr->pRcvBuf, lenRcvBuf, &stTime,
ttGenTime, &frominet, mh.msg_namelen, &multiSub));
}
@@ -1040,6 +1036,7 @@ ENDactivateCnfPrePrivDrop
BEGINactivateCnf
+ int i;
int lenRcvBuf;
CODESTARTactivateCnf
/* caching various settings */
@@ -1047,11 +1044,15 @@ CODESTARTactivateCnf
lenRcvBuf = (iMaxLine + 1) * sizeof(char);
# ifdef HAVE_RECVMMSG
lenRcvBuf *= runModConf->batchSize;
- CHKmalloc(recvmsg_iov = MALLOC(runModConf->batchSize * sizeof(struct iovec)));
- CHKmalloc(recvmsg_mmh = MALLOC(runModConf->batchSize * sizeof(struct mmsghdr)));
# endif
-dbgprintf("DDDD: sizing for batch size %d\n", runModConf->batchSize);
- CHKmalloc(pRcvBuf = MALLOC(lenRcvBuf));
+ for(i = 0 ; i < runModConf->wrkrMax ; ++i) {
+# ifdef HAVE_RECVMMSG
+ CHKmalloc(wrkrInfo[i].recvmsg_iov = MALLOC(runModConf->batchSize * sizeof(struct iovec)));
+ CHKmalloc(wrkrInfo[i].recvmsg_mmh = MALLOC(runModConf->batchSize * sizeof(struct mmsghdr)));
+# endif
+ CHKmalloc(wrkrInfo[i].pRcvBuf = MALLOC(lenRcvBuf));
+ wrkrInfo[i].id = i;
+ }
finalize_it:
ENDactivateCnf
@@ -1132,7 +1133,6 @@ CODESTARTrunInput
pthread_attr_setstacksize(&wrkrThrdAttr, 4096*1024);
for(i = 0 ; i < runModConf->wrkrMax - 1 ; ++i) {
wrkrInfo[i].pThrd = pThrd;
- wrkrInfo[i].id = i;
pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i]));
}
pthread_attr_destroy(&wrkrThrdAttr);
@@ -1157,6 +1157,7 @@ ENDwillRun
BEGINafterRun
struct lstn_s *lstn, *lstnDel;
+ int i;
CODESTARTafterRun
/* do cleanup here */
net.clearAllowedSenders((uchar*)"UDP");
@@ -1170,9 +1171,12 @@ CODESTARTafterRun
free(lstnDel);
}
lcnfRoot = lcnfLast = NULL;
- if(pRcvBuf != NULL) {
- free(pRcvBuf);
- pRcvBuf = NULL;
+ for(i = 0 ; i < runModConf->wrkrMax ; ++i) {
+# ifdef HAVE_RECVMMSG
+ free(wrkrInfo[i].recvmsg_iov);
+ free(wrkrInfo[i].recvmsg_mmh);
+# endif
+ free(wrkrInfo[i].pRcvBuf);
}
ENDafterRun
@@ -1233,6 +1237,11 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(ruleset, CORE_COMPONENT));
CHKiRet(objUse(net, LM_NET_FILENAME));
+ DBGPRINTF("imudp: version %s initializing\n", VERSION);
+# ifdef HAVE_RECVMMSG
+ DBGPRINTF("imdup: support for recvmmsg() present\n");
+# endif
+
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputudpserverbindruleset", 0, eCmdHdlrGetWord,
NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID));