summaryrefslogtreecommitdiffstats
path: root/plugins/imudp/imudp.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imudp/imudp.c')
-rw-r--r--plugins/imudp/imudp.c67
1 files changed, 47 insertions, 20 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 0d0ab059..c64c06ca 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -81,6 +81,11 @@ static struct lstn_s {
STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
} *lcnfRoot = NULL, *lcnfLast = NULL;
+# ifdef HAVE_RECVMMSG
+static struct mmsghdr *recvmsg_mmh;
+static struct iovec *recvmsg_iov;
+#endif
+
static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */
static int iMaxLine; /* maximum UDP message size supported */
@@ -94,6 +99,7 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a
*/
static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */
+#define BATCH_SIZE_DFLT 32 /* do not overdo, has heavy toll on memory, especially with large msgs */
#define TIME_REQUERY_DFLT 2
#define SCHED_PRIO_UNSET -12345678 /* a value that indicates that the scheduling priority has not been set */
/* config vars for legacy config system */
@@ -121,6 +127,7 @@ struct modConfData_s {
int iSchedPolicy; /* scheduling policy as SCHED_xxx */
int iSchedPrio; /* scheduling priority */
int iTimeRequery; /* how often is time to be queried inside tight recv loop? 0=always */
+ int batchSize; /* max nbr of input batch --> also recvmmsg() max count */
sbool configSetViaV2Method;
};
static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
@@ -130,6 +137,7 @@ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current lo
static struct cnfparamdescr modpdescr[] = {
{ "schedulingpolicy", eCmdHdlrGetWord, 0 },
{ "schedulingpriority", eCmdHdlrInt, 0 },
+ { "batchsize", eCmdHdlrInt, 0 },
{ "timerequery", eCmdHdlrInt, 0 }
};
static struct cnfparamblk modpblk =
@@ -296,7 +304,8 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta
*/
static inline rsRetVal
processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
- ssize_t lenRcvBuf, struct syslogTime *stTime, time_t ttGenTime, struct sockaddr_storage *frominet, socklen_t socklen)
+ uchar *rcvBuf, ssize_t lenRcvBuf, struct syslogTime *stTime, time_t ttGenTime,
+ struct sockaddr_storage *frominet, socklen_t socklen)
{
DEFiRet;
msg_t *pMsg;
@@ -339,12 +348,12 @@ processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
*pbIsPermitted = 1; /* no check -> everything permitted */
}
- DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf);
+ DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, rcvBuf);
if(*pbIsPermitted != 0) {
/* we now create our own message object and submit it to the queue */
CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime));
- MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf);
+ MsgSetRawMsg(pMsg, (char*)rcvBuf, lenRcvBuf);
MsgSetInputName(pMsg, pInputName);
MsgSetRuleset(pMsg, lstn->pRuleset);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
@@ -377,31 +386,34 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
struct syslogTime stTime;
struct sockaddr_storage frominet;
char errStr[1024];
- struct mmsghdr mmh[1];
- struct iovec iov[1];
int nelem;
+ int i;
assert(pThrd != NULL);
iNbrTimeUsed = 0;
while(1) { /* loop is terminated if we have a "bad" receive, done below in the body */
if(pThrd->bShallStop == RSTRUE)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
- memset(iov, 0, sizeof(iov));
- iov[0].iov_base = pRcvBuf;
- iov[0].iov_len = iMaxLine;
- memset(mmh, 0, sizeof(mmh));
- mmh[0].msg_hdr.msg_name = &frominet;
- mmh[0].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
- mmh[0].msg_hdr.msg_iov = iov;
- mmh[0].msg_hdr.msg_iovlen = 1;
- nelem = recvmmsg(lstn->sock, mmh, 1, 0, NULL);
+ memset(recvmsg_iov, 0, runModConf->batchSize * sizeof(struct iovec));
+ memset(recvmsg_mmh, 0, runModConf->batchSize * sizeof(struct mmsghdr));
+ for(i = 0 ; i < runModConf->batchSize ; ++i) {
+ recvmsg_iov[i].iov_base = pRcvBuf+(i*(iMaxLine+1));
+ recvmsg_iov[i].iov_len = iMaxLine;
+ recvmsg_mmh[i].msg_hdr.msg_name = &frominet;
+ recvmsg_mmh[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ recvmsg_mmh[i].msg_hdr.msg_iov = &(recvmsg_iov[i]);
+ recvmsg_mmh[i].msg_hdr.msg_iovlen = 1;
+ }
+ nelem = recvmmsg(lstn->sock, recvmsg_mmh, runModConf->batchSize, 0, NULL);
dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, sizeof(errStr)));
if(nelem < 0 && errno == ENOSYS) {
/* be careful: some versions of valgrind do not support recvmmsg()! */
DBGPRINTF("imudp: error ENOSYS on call to recvmmsg() - fall back to recvmsg\n");
- nelem = recvmsg(lstn->sock, &mmh[0].msg_hdr, 0);
- if(nelem >= 0)
- mmh[0].msg_len = nelem;
+ nelem = recvmsg(lstn->sock, &recvmsg_mmh[0].msg_hdr, 0);
+ if(nelem >= 0) {
+ recvmsg_mmh[0].msg_len = nelem;
+ nelem = 1;
+ }
}
if(nelem < 0) {
if(errno != EINTR && errno != EAGAIN) {
@@ -416,7 +428,10 @@ dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr,
datetime.getCurrTime(&stTime, &ttGenTime);
}
- CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, mmh[0].msg_len, &stTime, ttGenTime, &frominet, mmh[0].msg_hdr.msg_namelen));
+ for(i = 0 ; i < nelem ; ++i) {
+ processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, recvmsg_mmh[i].msg_hdr.msg_iov->iov_base,
+ recvmsg_mmh[i].msg_len, &stTime, ttGenTime, &frominet, recvmsg_mmh[i].msg_hdr.msg_namelen);
+ }
}
finalize_it:
@@ -477,7 +492,8 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
datetime.getCurrTime(&stTime, &ttGenTime);
}
- CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, lenRcvBuf, &stTime, ttGenTime, &frominet, mh.msg_namelen));
+ CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, pRcvBuf, lenRcvBuf, &stTime,
+ ttGenTime, &frominet, mh.msg_namelen));
}
finalize_it:
@@ -818,6 +834,7 @@ CODESTARTbeginCnfLoad
pModConf->pConf = pConf;
/* init our settings */
loadModConf->configSetViaV2Method = 0;
+ loadModConf->batchSize = BATCH_SIZE_DFLT;
loadModConf->iTimeRequery = TIME_REQUERY_DFLT;
loadModConf->iSchedPrio = SCHED_PRIO_UNSET;
loadModConf->pszSchedPolicy = NULL;
@@ -852,6 +869,8 @@ CODESTARTsetModCnf
continue;
if(!strcmp(modpblk.descr[i].name, "timerequery")) {
loadModConf->iTimeRequery = (int) pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "batchsize")) {
+ loadModConf->batchSize = (int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "schedulingpriority")) {
loadModConf->iSchedPrio = (int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "schedulingpolicy")) {
@@ -927,10 +946,18 @@ ENDactivateCnfPrePrivDrop
BEGINactivateCnf
+ int lenRcvBuf;
CODESTARTactivateCnf
/* caching various settings */
iMaxLine = glbl.GetMaxLine();
- CHKmalloc(pRcvBuf = MALLOC((iMaxLine + 1) * sizeof(char)));
+ lenRcvBuf = (iMaxLine + 1) * sizeof(char);
+# ifdef HAVE_RECVMMSG
+ lenRcvBuf *= runModConf->batchSize;
+ CHKmalloc(recvmsg_iov = MALLOC(runModConf->batchSize * sizeof(struct iovec)));
+ CHKmalloc(recvmsg_mmh = MALLOC(runModConf->batchSize * sizeof(struct mmsghdr)));
+# endif
+dbgprintf("DDDD: sizing for batch size %d\n", runModConf->batchSize);
+ CHKmalloc(pRcvBuf = MALLOC(lenRcvBuf));
finalize_it:
ENDactivateCnf