diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2013-07-11 18:28:11 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2013-07-11 18:28:11 +0200 |
commit | cd6e7dab26f6fcfb772240cce76b62384a162cf8 (patch) | |
tree | b39d0aee8f2a99a2854a84fbedca1de8a2d259a9 | |
parent | 88193b17381681fe0098ab3c959fae6c2c1d4632 (diff) | |
download | rsyslog-cd6e7dab26f6fcfb772240cce76b62384a162cf8.tar.gz rsyslog-cd6e7dab26f6fcfb772240cce76b62384a162cf8.tar.bz2 rsyslog-cd6e7dab26f6fcfb772240cce76b62384a162cf8.zip |
imudp: add support for recvmmsg()
-rw-r--r-- | configure.ac | 2 | ||||
-rw-r--r-- | doc/imudp.html | 12 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 67 |
3 files changed, 59 insertions, 22 deletions
diff --git a/configure.ac b/configure.ac index 3be549e3..1524beaa 100644 --- a/configure.ac +++ b/configure.ac @@ -120,7 +120,7 @@ AC_TYPE_SIGNAL AC_FUNC_STAT AC_FUNC_STRERROR_R AC_FUNC_VPRINTF -AC_CHECK_FUNCS([flock basename alarm clock_gettime getifaddrs gethostbyname gethostname gettimeofday localtime_r memset mkdir regcomp select setid socket strcasecmp strchr strdup strerror strndup strnlen strrchr strstr strtol strtoul uname ttyname_r getline malloc_trim prctl epoll_create epoll_create1 fdatasync lseek64]) +AC_CHECK_FUNCS([flock recvmmsg basename alarm clock_gettime getifaddrs gethostbyname gethostname gettimeofday localtime_r memset mkdir regcomp select setid socket strcasecmp strchr strdup strerror strndup strnlen strrchr strstr strtol strtoul uname ttyname_r getline malloc_trim prctl epoll_create epoll_create1 fdatasync lseek64]) # the check below is probably ugly. If someone knows how to do it in a better way, please # let me know! -- rgerhards, 2010-10-06 diff --git a/doc/imudp.html b/doc/imudp.html index 961bbeba..c71e1556 100644 --- a/doc/imudp.html +++ b/doc/imudp.html @@ -36,8 +36,18 @@ is provided by the platform. Most useful to select "fifo" for real-time processing under Linux (and thus reduce chance of packet loss). <li><b>SchedulingPriority</b> <number><br> Scheduling priority to use. +<li><b>batchSize</b> <number><br> +This parameter is only meaningful if the system support recvmmsg() (newer Linux +OSs do this). The parameter is silently ignored if the system does not support +it. If supported, it sets the maximum number of UDP messages that can be obtained +with a single OS call. For systems with high UDP traffic, a relatively high batch +size can reduce system overhead and improve performance. However, this parameter +should not be overdone. For each buffer, max message size bytes are statically +required. Also, a too-high number leads to reduced efficiency, as some structures +need to be completely initialized before the OS call is done. We would suggest to not +set it above a value of 128, except if experimental results show that this is useful. </ul> -<p><b>Action Parameters</b>:</p> +<p><b>Input Parameters</b>:</p> <ul> <li><b>Address</b> <IP><br> local IP address (or name) the UDP listens should bind to</li> diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 0d0ab059..c64c06ca 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -81,6 +81,11 @@ static struct lstn_s { STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) } *lcnfRoot = NULL, *lcnfLast = NULL; +# ifdef HAVE_RECVMMSG +static struct mmsghdr *recvmsg_mmh; +static struct iovec *recvmsg_iov; +#endif + static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */ static int iMaxLine; /* maximum UDP message size supported */ @@ -94,6 +99,7 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a */ static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */ +#define BATCH_SIZE_DFLT 32 /* do not overdo, has heavy toll on memory, especially with large msgs */ #define TIME_REQUERY_DFLT 2 #define SCHED_PRIO_UNSET -12345678 /* a value that indicates that the scheduling priority has not been set */ /* config vars for legacy config system */ @@ -121,6 +127,7 @@ struct modConfData_s { int iSchedPolicy; /* scheduling policy as SCHED_xxx */ int iSchedPrio; /* scheduling priority */ int iTimeRequery; /* how often is time to be queried inside tight recv loop? 0=always */ + int batchSize; /* max nbr of input batch --> also recvmmsg() max count */ sbool configSetViaV2Method; }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ @@ -130,6 +137,7 @@ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current lo static struct cnfparamdescr modpdescr[] = { { "schedulingpolicy", eCmdHdlrGetWord, 0 }, { "schedulingpriority", eCmdHdlrInt, 0 }, + { "batchsize", eCmdHdlrInt, 0 }, { "timerequery", eCmdHdlrInt, 0 } }; static struct cnfparamblk modpblk = @@ -296,7 +304,8 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta */ static inline rsRetVal processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, - ssize_t lenRcvBuf, struct syslogTime *stTime, time_t ttGenTime, struct sockaddr_storage *frominet, socklen_t socklen) + uchar *rcvBuf, ssize_t lenRcvBuf, struct syslogTime *stTime, time_t ttGenTime, + struct sockaddr_storage *frominet, socklen_t socklen) { DEFiRet; msg_t *pMsg; @@ -339,12 +348,12 @@ processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f *pbIsPermitted = 1; /* no check -> everything permitted */ } - DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); + DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, rcvBuf); if(*pbIsPermitted != 0) { /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); - MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); + MsgSetRawMsg(pMsg, (char*)rcvBuf, lenRcvBuf); MsgSetInputName(pMsg, pInputName); MsgSetRuleset(pMsg, lstn->pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); @@ -377,31 +386,34 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f struct syslogTime stTime; struct sockaddr_storage frominet; char errStr[1024]; - struct mmsghdr mmh[1]; - struct iovec iov[1]; int nelem; + int i; assert(pThrd != NULL); iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a "bad" receive, done below in the body */ if(pThrd->bShallStop == RSTRUE) ABORT_FINALIZE(RS_RET_FORCE_TERM); - memset(iov, 0, sizeof(iov)); - iov[0].iov_base = pRcvBuf; - iov[0].iov_len = iMaxLine; - memset(mmh, 0, sizeof(mmh)); - mmh[0].msg_hdr.msg_name = &frominet; - mmh[0].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); - mmh[0].msg_hdr.msg_iov = iov; - mmh[0].msg_hdr.msg_iovlen = 1; - nelem = recvmmsg(lstn->sock, mmh, 1, 0, NULL); + memset(recvmsg_iov, 0, runModConf->batchSize * sizeof(struct iovec)); + memset(recvmsg_mmh, 0, runModConf->batchSize * sizeof(struct mmsghdr)); + for(i = 0 ; i < runModConf->batchSize ; ++i) { + recvmsg_iov[i].iov_base = pRcvBuf+(i*(iMaxLine+1)); + recvmsg_iov[i].iov_len = iMaxLine; + recvmsg_mmh[i].msg_hdr.msg_name = &frominet; + recvmsg_mmh[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); + recvmsg_mmh[i].msg_hdr.msg_iov = &(recvmsg_iov[i]); + recvmsg_mmh[i].msg_hdr.msg_iovlen = 1; + } + nelem = recvmmsg(lstn->sock, recvmsg_mmh, runModConf->batchSize, 0, NULL); dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, sizeof(errStr))); if(nelem < 0 && errno == ENOSYS) { /* be careful: some versions of valgrind do not support recvmmsg()! */ DBGPRINTF("imudp: error ENOSYS on call to recvmmsg() - fall back to recvmsg\n"); - nelem = recvmsg(lstn->sock, &mmh[0].msg_hdr, 0); - if(nelem >= 0) - mmh[0].msg_len = nelem; + nelem = recvmsg(lstn->sock, &recvmsg_mmh[0].msg_hdr, 0); + if(nelem >= 0) { + recvmsg_mmh[0].msg_len = nelem; + nelem = 1; + } } if(nelem < 0) { if(errno != EINTR && errno != EAGAIN) { @@ -416,7 +428,10 @@ dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, datetime.getCurrTime(&stTime, &ttGenTime); } - CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, mmh[0].msg_len, &stTime, ttGenTime, &frominet, mmh[0].msg_hdr.msg_namelen)); + for(i = 0 ; i < nelem ; ++i) { + processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, recvmsg_mmh[i].msg_hdr.msg_iov->iov_base, + recvmsg_mmh[i].msg_len, &stTime, ttGenTime, &frominet, recvmsg_mmh[i].msg_hdr.msg_namelen); + } } finalize_it: @@ -477,7 +492,8 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f datetime.getCurrTime(&stTime, &ttGenTime); } - CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, lenRcvBuf, &stTime, ttGenTime, &frominet, mh.msg_namelen)); + CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, pRcvBuf, lenRcvBuf, &stTime, + ttGenTime, &frominet, mh.msg_namelen)); } finalize_it: @@ -818,6 +834,7 @@ CODESTARTbeginCnfLoad pModConf->pConf = pConf; /* init our settings */ loadModConf->configSetViaV2Method = 0; + loadModConf->batchSize = BATCH_SIZE_DFLT; loadModConf->iTimeRequery = TIME_REQUERY_DFLT; loadModConf->iSchedPrio = SCHED_PRIO_UNSET; loadModConf->pszSchedPolicy = NULL; @@ -852,6 +869,8 @@ CODESTARTsetModCnf continue; if(!strcmp(modpblk.descr[i].name, "timerequery")) { loadModConf->iTimeRequery = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "batchsize")) { + loadModConf->batchSize = (int) pvals[i].val.d.n; } else if(!strcmp(modpblk.descr[i].name, "schedulingpriority")) { loadModConf->iSchedPrio = (int) pvals[i].val.d.n; } else if(!strcmp(modpblk.descr[i].name, "schedulingpolicy")) { @@ -927,10 +946,18 @@ ENDactivateCnfPrePrivDrop BEGINactivateCnf + int lenRcvBuf; CODESTARTactivateCnf /* caching various settings */ iMaxLine = glbl.GetMaxLine(); - CHKmalloc(pRcvBuf = MALLOC((iMaxLine + 1) * sizeof(char))); + lenRcvBuf = (iMaxLine + 1) * sizeof(char); +# ifdef HAVE_RECVMMSG + lenRcvBuf *= runModConf->batchSize; + CHKmalloc(recvmsg_iov = MALLOC(runModConf->batchSize * sizeof(struct iovec))); + CHKmalloc(recvmsg_mmh = MALLOC(runModConf->batchSize * sizeof(struct mmsghdr))); +# endif +dbgprintf("DDDD: sizing for batch size %d\n", runModConf->batchSize); + CHKmalloc(pRcvBuf = MALLOC(lenRcvBuf)); finalize_it: ENDactivateCnf |