From c2976fcbd63fd82dc9c9d73d234c751937129e7d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 11 Jul 2013 11:09:10 +0200 Subject: imudp: refactor in preparation of recvmmsg() enhancement --- plugins/imudp/imudp.c | 137 ++++++++++++++++++++++++++++---------------------- 1 file changed, 77 insertions(+), 60 deletions(-) diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index ec128d14..878d61e5 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -290,6 +290,80 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta } +/* This function processes received data. It provides unified handling + * in cases where recvmmsg() is available and not. + */ +static inline rsRetVal +processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, + ssize_t lenRcvBuf, struct syslogTime *stTime, time_t ttGenTime) +{ + DEFiRet; + socklen_t socklen; + struct sockaddr_storage frominet; + msg_t *pMsg; + + assert(pThrd != NULL); + + if(lenRcvBuf == 0) + FINALIZE; /* this looks a bit strange, but practice shows it happens... */ + + /* if we reach this point, we had a good receive and can process the packet received */ + /* check if we have a different sender than before, if so, we need to query some new values */ + if(bDoACLCheck) { + socklen = sizeof(struct sockaddr_storage); + if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) { + memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */ + /* Here we check if a host is permitted to send us syslog messages. If it isn't, + * we do not further process the message but log a warning (if we are + * configured to do this). However, if the check would require name resolution, + * it is postponed to the main queue. See also my blog post at + * http://blog.gerhards.net/2009/11/acls-imudp-and-accepting-messages.html + * rgerhards, 2009-11-16 + */ + *pbIsPermitted = net.isAllowedSender2((uchar*)"UDP", + (struct sockaddr *)&frominet, "", 0); + + if(*pbIsPermitted == 0) { + DBGPRINTF("msg is not from an allowed sender\n"); + if(glbl.GetOption_DisallowWarning) { + time_t tt; + datetime.GetTime(&tt); + if(tt > ttLastDiscard + 60) { + ttLastDiscard = tt; + errmsg.LogError(0, NO_ERRCODE, + "UDP message from disallowed sender discarded"); + } + } + } + } + } else { + *pbIsPermitted = 1; /* no check -> everything permitted */ + } + + DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); + + if(*pbIsPermitted != 0) { + /* we now create our own message object and submit it to the queue */ + CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); + MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); + MsgSetInputName(pMsg, pInputName); + MsgSetRuleset(pMsg, lstn->pRuleset); + MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); + pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; + if(*pbIsPermitted == 2) + pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ + CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); + CHKiRet(submitMsg(pMsg)); + STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); + } + +finalize_it: + RETiRet; +} + + + + /* This function is a helper to runInput. I have extracted it * from the main loop just so that we do not have that large amount of code * in a single place. This function takes a socket and pulls messages from @@ -314,9 +388,6 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f socklen_t socklen; ssize_t lenRcvBuf; struct sockaddr_storage frominet; - msg_t *pMsg; - prop_t *propFromHost = NULL; - prop_t *propFromHostIP = NULL; char errStr[1024]; assert(pThrd != NULL); @@ -335,68 +406,14 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller! } - if(lenRcvBuf == 0) - continue; /* this looks a bit strange, but practice shows it happens... */ - - /* if we reach this point, we had a good receive and can process the packet received */ - /* check if we have a different sender than before, if so, we need to query some new values */ - if(bDoACLCheck) { - if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) { - memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */ - /* Here we check if a host is permitted to send us syslog messages. If it isn't, - * we do not further process the message but log a warning (if we are - * configured to do this). However, if the check would require name resolution, - * it is postponed to the main queue. See also my blog post at - * http://blog.gerhards.net/2009/11/acls-imudp-and-accepting-messages.html - * rgerhards, 2009-11-16 - */ - *pbIsPermitted = net.isAllowedSender2((uchar*)"UDP", - (struct sockaddr *)&frominet, "", 0); - - if(*pbIsPermitted == 0) { - DBGPRINTF("msg is not from an allowed sender\n"); - if(glbl.GetOption_DisallowWarning) { - time_t tt; - datetime.GetTime(&tt); - if(tt > ttLastDiscard + 60) { - ttLastDiscard = tt; - errmsg.LogError(0, NO_ERRCODE, - "UDP message from disallowed sender discarded"); - } - } - } - } - } else { - *pbIsPermitted = 1; /* no check -> everything permitted */ + if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) { + datetime.getCurrTime(&stTime, &ttGenTime); } - DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); - - if(*pbIsPermitted != 0) { - if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) { - datetime.getCurrTime(&stTime, &ttGenTime); - } - /* we now create our own message object and submit it to the queue */ - CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); - MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); - MsgSetInputName(pMsg, pInputName); - MsgSetRuleset(pMsg, lstn->pRuleset); - MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); - pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; - if(*pbIsPermitted == 2) - pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ - CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); - CHKiRet(submitMsg(pMsg)); - STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); - } + CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, lenRcvBuf, &stTime, ttGenTime)); } finalize_it: - if(propFromHost != NULL) - prop.Destruct(&propFromHost); - if(propFromHostIP != NULL) - prop.Destruct(&propFromHostIP); - RETiRet; } -- cgit v1.2.3 From ef4c7fe7e6c413494b52330a3c62158b6f2ca884 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 11 Jul 2013 11:43:00 +0200 Subject: imudp: refactor to use recvmsg() in preparation for recvmmsg() --- plugins/imudp/imudp.c | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 878d61e5..a7e9cff3 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -33,6 +33,7 @@ #include #include #include +#include #if HAVE_SYS_EPOLL_H # include #endif @@ -295,11 +296,9 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta */ static inline rsRetVal processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, - ssize_t lenRcvBuf, struct syslogTime *stTime, time_t ttGenTime) + ssize_t lenRcvBuf, struct syslogTime *stTime, time_t ttGenTime, struct sockaddr_storage *frominet, socklen_t socklen) { DEFiRet; - socklen_t socklen; - struct sockaddr_storage frominet; msg_t *pMsg; assert(pThrd != NULL); @@ -311,8 +310,8 @@ processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f /* check if we have a different sender than before, if so, we need to query some new values */ if(bDoACLCheck) { socklen = sizeof(struct sockaddr_storage); - if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) { - memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */ + if(net.CmpHost(frominet, frominetPrev, socklen) != 0) { + memcpy(frominetPrev, frominet, socklen); /* update cache indicator */ /* Here we check if a host is permitted to send us syslog messages. If it isn't, * we do not further process the message but log a warning (if we are * configured to do this). However, if the check would require name resolution, @@ -321,7 +320,7 @@ processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f * rgerhards, 2009-11-16 */ *pbIsPermitted = net.isAllowedSender2((uchar*)"UDP", - (struct sockaddr *)&frominet, "", 0); + (struct sockaddr *)frominet, "", 0); if(*pbIsPermitted == 0) { DBGPRINTF("msg is not from an allowed sender\n"); @@ -352,7 +351,7 @@ processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; if(*pbIsPermitted == 2) pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ - CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); + CHKiRet(msgSetFromSockinfo(pMsg, frominet)); CHKiRet(submitMsg(pMsg)); STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); } @@ -389,14 +388,23 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f ssize_t lenRcvBuf; struct sockaddr_storage frominet; char errStr[1024]; + struct msghdr mh; + struct iovec iov[1]; assert(pThrd != NULL); iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a bad receive, done below in the body */ if(pThrd->bShallStop == RSTRUE) ABORT_FINALIZE(RS_RET_FORCE_TERM); - socklen = sizeof(struct sockaddr_storage); - lenRcvBuf = recvfrom(lstn->sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); + memset(iov, 0, sizeof(iov)); + iov[0].iov_base = pRcvBuf; + iov[0].iov_len = iMaxLine; + memset(&mh, 0, sizeof(mh)); + mh.msg_name = &frominet; + mh.msg_namelen = sizeof(struct sockaddr_storage); + mh.msg_iov = iov; + mh.msg_iovlen = 1; + lenRcvBuf = recvmsg(lstn->sock, &mh, 0); if(lenRcvBuf < 0) { if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); @@ -410,7 +418,7 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f datetime.getCurrTime(&stTime, &ttGenTime); } - CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, lenRcvBuf, &stTime, ttGenTime)); + CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, lenRcvBuf, &stTime, ttGenTime, &frominet, mh.msg_namelen)); } finalize_it: -- cgit v1.2.3 From dc309b7fa48bf013c28c120fa26110c23726047d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 11 Jul 2013 11:45:10 +0200 Subject: imudp: fix error message on socket error condition --- plugins/imudp/imudp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index a7e9cff3..a0b793ac 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -409,7 +409,7 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); - errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet"); + errmsg.LogError(errno, NO_ERRCODE, "imudp: error receving on socket: %s", errStr); } ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller! } -- cgit v1.2.3 From 88193b17381681fe0098ab3c959fae6c2c1d4632 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 11 Jul 2013 12:33:18 +0200 Subject: imudp milestone: recvmmsg() now supported, but reads only a single msg obviously, as an end-result that would be brainless ;) --- plugins/imudp/imudp.c | 64 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index a0b793ac..0d0ab059 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -363,6 +363,66 @@ finalize_it: +/* The following "two" functions are helpers to runInput. Actually, it is + * just one function. Depending on whether or not we have recvmmsg(), + * an appropriate version is compiled (as such we need to maintain both!). + */ +#ifdef HAVE_RECVMMSG +static inline rsRetVal +processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) +{ + DEFiRet; + int iNbrTimeUsed; + time_t ttGenTime; + struct syslogTime stTime; + struct sockaddr_storage frominet; + char errStr[1024]; + struct mmsghdr mmh[1]; + struct iovec iov[1]; + int nelem; + + assert(pThrd != NULL); + iNbrTimeUsed = 0; + while(1) { /* loop is terminated if we have a "bad" receive, done below in the body */ + if(pThrd->bShallStop == RSTRUE) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + memset(iov, 0, sizeof(iov)); + iov[0].iov_base = pRcvBuf; + iov[0].iov_len = iMaxLine; + memset(mmh, 0, sizeof(mmh)); + mmh[0].msg_hdr.msg_name = &frominet; + mmh[0].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); + mmh[0].msg_hdr.msg_iov = iov; + mmh[0].msg_hdr.msg_iovlen = 1; + nelem = recvmmsg(lstn->sock, mmh, 1, 0, NULL); +dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, sizeof(errStr))); + if(nelem < 0 && errno == ENOSYS) { + /* be careful: some versions of valgrind do not support recvmmsg()! */ + DBGPRINTF("imudp: error ENOSYS on call to recvmmsg() - fall back to recvmsg\n"); + nelem = recvmsg(lstn->sock, &mmh[0].msg_hdr, 0); + if(nelem >= 0) + mmh[0].msg_len = nelem; + } + if(nelem < 0) { + if(errno != EINTR && errno != EAGAIN) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); + errmsg.LogError(errno, NO_ERRCODE, "imudp: error receiving on socket: %s", errStr); + } + ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller! + } + + if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) { + datetime.getCurrTime(&stTime, &ttGenTime); + } + + CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, mmh[0].msg_len, &stTime, ttGenTime, &frominet, mmh[0].msg_hdr.msg_namelen)); + } + +finalize_it: + RETiRet; +} +#else /* we do not have recvmmsg() */ /* This function is a helper to runInput. I have extracted it * from the main loop just so that we do not have that large amount of code * in a single place. This function takes a socket and pulls messages from @@ -384,7 +444,6 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f int iNbrTimeUsed; time_t ttGenTime; struct syslogTime stTime; - socklen_t socklen; ssize_t lenRcvBuf; struct sockaddr_storage frominet; char errStr[1024]; @@ -409,7 +468,7 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); - errmsg.LogError(errno, NO_ERRCODE, "imudp: error receving on socket: %s", errStr); + errmsg.LogError(errno, NO_ERRCODE, "imudp: error receiving on socket: %s", errStr); } ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller! } @@ -424,6 +483,7 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f finalize_it: RETiRet; } +#endif /* #ifdef HAVE_RECVMMSG */ /* check configured scheduling priority. -- cgit v1.2.3 From cd6e7dab26f6fcfb772240cce76b62384a162cf8 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 11 Jul 2013 18:28:11 +0200 Subject: imudp: add support for recvmmsg() --- configure.ac | 2 +- doc/imudp.html | 12 ++++++++- plugins/imudp/imudp.c | 67 ++++++++++++++++++++++++++++++++++++--------------- 3 files changed, 59 insertions(+), 22 deletions(-) diff --git a/configure.ac b/configure.ac index 3be549e3..1524beaa 100644 --- a/configure.ac +++ b/configure.ac @@ -120,7 +120,7 @@ AC_TYPE_SIGNAL AC_FUNC_STAT AC_FUNC_STRERROR_R AC_FUNC_VPRINTF -AC_CHECK_FUNCS([flock basename alarm clock_gettime getifaddrs gethostbyname gethostname gettimeofday localtime_r memset mkdir regcomp select setid socket strcasecmp strchr strdup strerror strndup strnlen strrchr strstr strtol strtoul uname ttyname_r getline malloc_trim prctl epoll_create epoll_create1 fdatasync lseek64]) +AC_CHECK_FUNCS([flock recvmmsg basename alarm clock_gettime getifaddrs gethostbyname gethostname gettimeofday localtime_r memset mkdir regcomp select setid socket strcasecmp strchr strdup strerror strndup strnlen strrchr strstr strtol strtoul uname ttyname_r getline malloc_trim prctl epoll_create epoll_create1 fdatasync lseek64]) # the check below is probably ugly. If someone knows how to do it in a better way, please # let me know! -- rgerhards, 2010-10-06 diff --git a/doc/imudp.html b/doc/imudp.html index 961bbeba..c71e1556 100644 --- a/doc/imudp.html +++ b/doc/imudp.html @@ -36,8 +36,18 @@ is provided by the platform. Most useful to select "fifo" for real-time processing under Linux (and thus reduce chance of packet loss).
  • SchedulingPriority <number>
    Scheduling priority to use. +
  • batchSize <number>
    +This parameter is only meaningful if the system support recvmmsg() (newer Linux +OSs do this). The parameter is silently ignored if the system does not support +it. If supported, it sets the maximum number of UDP messages that can be obtained +with a single OS call. For systems with high UDP traffic, a relatively high batch +size can reduce system overhead and improve performance. However, this parameter +should not be overdone. For each buffer, max message size bytes are statically +required. Also, a too-high number leads to reduced efficiency, as some structures +need to be completely initialized before the OS call is done. We would suggest to not +set it above a value of 128, except if experimental results show that this is useful. -

    Action Parameters:

    +

    Input Parameters:

    • Address <IP>
      local IP address (or name) the UDP listens should bind to
    • diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 0d0ab059..c64c06ca 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -81,6 +81,11 @@ static struct lstn_s { STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) } *lcnfRoot = NULL, *lcnfLast = NULL; +# ifdef HAVE_RECVMMSG +static struct mmsghdr *recvmsg_mmh; +static struct iovec *recvmsg_iov; +#endif + static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */ static int iMaxLine; /* maximum UDP message size supported */ @@ -94,6 +99,7 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a */ static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */ +#define BATCH_SIZE_DFLT 32 /* do not overdo, has heavy toll on memory, especially with large msgs */ #define TIME_REQUERY_DFLT 2 #define SCHED_PRIO_UNSET -12345678 /* a value that indicates that the scheduling priority has not been set */ /* config vars for legacy config system */ @@ -121,6 +127,7 @@ struct modConfData_s { int iSchedPolicy; /* scheduling policy as SCHED_xxx */ int iSchedPrio; /* scheduling priority */ int iTimeRequery; /* how often is time to be queried inside tight recv loop? 0=always */ + int batchSize; /* max nbr of input batch --> also recvmmsg() max count */ sbool configSetViaV2Method; }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ @@ -130,6 +137,7 @@ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current lo static struct cnfparamdescr modpdescr[] = { { "schedulingpolicy", eCmdHdlrGetWord, 0 }, { "schedulingpriority", eCmdHdlrInt, 0 }, + { "batchsize", eCmdHdlrInt, 0 }, { "timerequery", eCmdHdlrInt, 0 } }; static struct cnfparamblk modpblk = @@ -296,7 +304,8 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta */ static inline rsRetVal processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, - ssize_t lenRcvBuf, struct syslogTime *stTime, time_t ttGenTime, struct sockaddr_storage *frominet, socklen_t socklen) + uchar *rcvBuf, ssize_t lenRcvBuf, struct syslogTime *stTime, time_t ttGenTime, + struct sockaddr_storage *frominet, socklen_t socklen) { DEFiRet; msg_t *pMsg; @@ -339,12 +348,12 @@ processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f *pbIsPermitted = 1; /* no check -> everything permitted */ } - DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); + DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, rcvBuf); if(*pbIsPermitted != 0) { /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); - MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); + MsgSetRawMsg(pMsg, (char*)rcvBuf, lenRcvBuf); MsgSetInputName(pMsg, pInputName); MsgSetRuleset(pMsg, lstn->pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); @@ -377,31 +386,34 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f struct syslogTime stTime; struct sockaddr_storage frominet; char errStr[1024]; - struct mmsghdr mmh[1]; - struct iovec iov[1]; int nelem; + int i; assert(pThrd != NULL); iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a "bad" receive, done below in the body */ if(pThrd->bShallStop == RSTRUE) ABORT_FINALIZE(RS_RET_FORCE_TERM); - memset(iov, 0, sizeof(iov)); - iov[0].iov_base = pRcvBuf; - iov[0].iov_len = iMaxLine; - memset(mmh, 0, sizeof(mmh)); - mmh[0].msg_hdr.msg_name = &frominet; - mmh[0].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); - mmh[0].msg_hdr.msg_iov = iov; - mmh[0].msg_hdr.msg_iovlen = 1; - nelem = recvmmsg(lstn->sock, mmh, 1, 0, NULL); + memset(recvmsg_iov, 0, runModConf->batchSize * sizeof(struct iovec)); + memset(recvmsg_mmh, 0, runModConf->batchSize * sizeof(struct mmsghdr)); + for(i = 0 ; i < runModConf->batchSize ; ++i) { + recvmsg_iov[i].iov_base = pRcvBuf+(i*(iMaxLine+1)); + recvmsg_iov[i].iov_len = iMaxLine; + recvmsg_mmh[i].msg_hdr.msg_name = &frominet; + recvmsg_mmh[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); + recvmsg_mmh[i].msg_hdr.msg_iov = &(recvmsg_iov[i]); + recvmsg_mmh[i].msg_hdr.msg_iovlen = 1; + } + nelem = recvmmsg(lstn->sock, recvmsg_mmh, runModConf->batchSize, 0, NULL); dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, sizeof(errStr))); if(nelem < 0 && errno == ENOSYS) { /* be careful: some versions of valgrind do not support recvmmsg()! */ DBGPRINTF("imudp: error ENOSYS on call to recvmmsg() - fall back to recvmsg\n"); - nelem = recvmsg(lstn->sock, &mmh[0].msg_hdr, 0); - if(nelem >= 0) - mmh[0].msg_len = nelem; + nelem = recvmsg(lstn->sock, &recvmsg_mmh[0].msg_hdr, 0); + if(nelem >= 0) { + recvmsg_mmh[0].msg_len = nelem; + nelem = 1; + } } if(nelem < 0) { if(errno != EINTR && errno != EAGAIN) { @@ -416,7 +428,10 @@ dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, datetime.getCurrTime(&stTime, &ttGenTime); } - CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, mmh[0].msg_len, &stTime, ttGenTime, &frominet, mmh[0].msg_hdr.msg_namelen)); + for(i = 0 ; i < nelem ; ++i) { + processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, recvmsg_mmh[i].msg_hdr.msg_iov->iov_base, + recvmsg_mmh[i].msg_len, &stTime, ttGenTime, &frominet, recvmsg_mmh[i].msg_hdr.msg_namelen); + } } finalize_it: @@ -477,7 +492,8 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f datetime.getCurrTime(&stTime, &ttGenTime); } - CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, lenRcvBuf, &stTime, ttGenTime, &frominet, mh.msg_namelen)); + CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, pRcvBuf, lenRcvBuf, &stTime, + ttGenTime, &frominet, mh.msg_namelen)); } finalize_it: @@ -818,6 +834,7 @@ CODESTARTbeginCnfLoad pModConf->pConf = pConf; /* init our settings */ loadModConf->configSetViaV2Method = 0; + loadModConf->batchSize = BATCH_SIZE_DFLT; loadModConf->iTimeRequery = TIME_REQUERY_DFLT; loadModConf->iSchedPrio = SCHED_PRIO_UNSET; loadModConf->pszSchedPolicy = NULL; @@ -852,6 +869,8 @@ CODESTARTsetModCnf continue; if(!strcmp(modpblk.descr[i].name, "timerequery")) { loadModConf->iTimeRequery = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "batchsize")) { + loadModConf->batchSize = (int) pvals[i].val.d.n; } else if(!strcmp(modpblk.descr[i].name, "schedulingpriority")) { loadModConf->iSchedPrio = (int) pvals[i].val.d.n; } else if(!strcmp(modpblk.descr[i].name, "schedulingpolicy")) { @@ -927,10 +946,18 @@ ENDactivateCnfPrePrivDrop BEGINactivateCnf + int lenRcvBuf; CODESTARTactivateCnf /* caching various settings */ iMaxLine = glbl.GetMaxLine(); - CHKmalloc(pRcvBuf = MALLOC((iMaxLine + 1) * sizeof(char))); + lenRcvBuf = (iMaxLine + 1) * sizeof(char); +# ifdef HAVE_RECVMMSG + lenRcvBuf *= runModConf->batchSize; + CHKmalloc(recvmsg_iov = MALLOC(runModConf->batchSize * sizeof(struct iovec))); + CHKmalloc(recvmsg_mmh = MALLOC(runModConf->batchSize * sizeof(struct mmsghdr))); +# endif +dbgprintf("DDDD: sizing for batch size %d\n", runModConf->batchSize); + CHKmalloc(pRcvBuf = MALLOC(lenRcvBuf)); finalize_it: ENDactivateCnf -- cgit v1.2.3