diff options
-rw-r--r-- | configure.ac | 2 | ||||
-rw-r--r-- | doc/imudp.html | 10 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 248 |
3 files changed, 196 insertions, 64 deletions
diff --git a/configure.ac b/configure.ac index f5060f73..aee71664 100644 --- a/configure.ac +++ b/configure.ac @@ -119,7 +119,7 @@ AC_TYPE_SIGNAL AC_FUNC_STAT AC_FUNC_STRERROR_R AC_FUNC_VPRINTF -AC_CHECK_FUNCS([flock basename alarm clock_gettime gethostbyname gethostname gettimeofday localtime_r memset mkdir regcomp select setid socket strcasecmp strchr strdup strerror strndup strnlen strrchr strstr strtol strtoul uname ttyname_r getline malloc_trim prctl epoll_create epoll_create1 fdatasync syscall lseek64]) +AC_CHECK_FUNCS([flock recvmmsg basename alarm clock_gettime gethostbyname gethostname gettimeofday localtime_r memset mkdir regcomp select setid socket strcasecmp strchr strdup strerror strndup strnlen strrchr strstr strtol strtoul uname ttyname_r getline malloc_trim prctl epoll_create epoll_create1 fdatasync syscall lseek64]) # getifaddrs is in libc (mostly) or in libsocket (eg Solaris 11) or not defined (eg Solaris 10) AC_SEARCH_LIBS([getifaddrs], [socket], [AC_DEFINE(HAVE_GETIFADDRS, [1], [set define])]) diff --git a/doc/imudp.html b/doc/imudp.html index 043d774c..c84b7f9c 100644 --- a/doc/imudp.html +++ b/doc/imudp.html @@ -36,6 +36,16 @@ is provided by the platform. Most useful to select "fifo" for real-time processing under Linux (and thus reduce chance of packet loss). <li><b>SchedulingPriority</b> <number><br> Scheduling priority to use. +<li><b>batchSize</b> <number><br> +This parameter is only meaningful if the system support recvmmsg() (newer Linux +OSs do this). The parameter is silently ignored if the system does not support +it. If supported, it sets the maximum number of UDP messages that can be obtained +with a single OS call. For systems with high UDP traffic, a relatively high batch +size can reduce system overhead and improve performance. However, this parameter +should not be overdone. For each buffer, max message size bytes are statically +required. Also, a too-high number leads to reduced efficiency, as some structures +need to be completely initialized before the OS call is done. We would suggest to not +set it above a value of 128, except if experimental results show that this is useful. </ul> <p><b>Input Parameters</b>:</p> <ul> diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 386ef219..87f4f37c 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -31,6 +31,7 @@ #include <errno.h> #include <unistd.h> #include <netdb.h> +#include <sys/socket.h> #if HAVE_SYS_EPOLL_H # include <sys/epoll.h> #endif @@ -81,6 +82,11 @@ static struct lstn_s { STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) } *lcnfRoot = NULL, *lcnfLast = NULL; +# ifdef HAVE_RECVMMSG +static struct mmsghdr *recvmsg_mmh; +static struct iovec *recvmsg_iov; +#endif + static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */ static int iMaxLine; /* maximum UDP message size supported */ @@ -93,6 +99,7 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a * termination if we can not get it. -- rgerhards, 2007-12-27 */ +#define BATCH_SIZE_DFLT 32 /* do not overdo, has heavy toll on memory, especially with large msgs */ #define TIME_REQUERY_DFLT 2 #define SCHED_PRIO_UNSET -12345678 /* a value that indicates that the scheduling priority has not been set */ /* config vars for legacy config system */ @@ -124,6 +131,7 @@ struct modConfData_s { int iSchedPolicy; /* scheduling policy as SCHED_xxx */ int iSchedPrio; /* scheduling priority */ int iTimeRequery; /* how often is time to be queried inside tight recv loop? 0=always */ + int batchSize; /* max nbr of input batch --> also recvmmsg() max count */ sbool configSetViaV2Method; }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ @@ -133,6 +141,7 @@ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current lo static struct cnfparamdescr modpdescr[] = { { "schedulingpolicy", eCmdHdlrGetWord, 0 }, { "schedulingpriority", eCmdHdlrInt, 0 }, + { "batchsize", eCmdHdlrInt, 0 }, { "timerequery", eCmdHdlrInt, 0 } }; static struct cnfparamblk modpblk = @@ -321,6 +330,152 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta } +/* This function processes received data. It provides unified handling + * in cases where recvmmsg() is available and not. + */ +static inline rsRetVal +processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, + uchar *rcvBuf, ssize_t lenRcvBuf, struct syslogTime *stTime, time_t ttGenTime, + struct sockaddr_storage *frominet, socklen_t socklen, multi_submit_t *multiSub) +{ + DEFiRet; + msg_t *pMsg; + + assert(pThrd != NULL); + + if(lenRcvBuf == 0) + FINALIZE; /* this looks a bit strange, but practice shows it happens... */ + + /* if we reach this point, we had a good receive and can process the packet received */ + /* check if we have a different sender than before, if so, we need to query some new values */ + if(bDoACLCheck) { + socklen = sizeof(struct sockaddr_storage); + if(net.CmpHost(frominet, frominetPrev, socklen) != 0) { + memcpy(frominetPrev, frominet, socklen); /* update cache indicator */ + /* Here we check if a host is permitted to send us syslog messages. If it isn't, + * we do not further process the message but log a warning (if we are + * configured to do this). However, if the check would require name resolution, + * it is postponed to the main queue. See also my blog post at + * http://blog.gerhards.net/2009/11/acls-imudp-and-accepting-messages.html + * rgerhards, 2009-11-16 + */ + *pbIsPermitted = net.isAllowedSender2((uchar*)"UDP", + (struct sockaddr *)frominet, "", 0); + + if(*pbIsPermitted == 0) { + DBGPRINTF("msg is not from an allowed sender\n"); + if(glbl.GetOption_DisallowWarning) { + time_t tt; + datetime.GetTime(&tt); + if(tt > ttLastDiscard + 60) { + ttLastDiscard = tt; + errmsg.LogError(0, NO_ERRCODE, + "UDP message from disallowed sender discarded"); + } + } + } + } + } else { + *pbIsPermitted = 1; /* no check -> everything permitted */ + } + + DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, rcvBuf); + + if(*pbIsPermitted != 0) { + /* we now create our own message object and submit it to the queue */ + CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); + MsgSetRawMsg(pMsg, (char*)rcvBuf, lenRcvBuf); + MsgSetInputName(pMsg, lstn->pInputName); + MsgSetRuleset(pMsg, lstn->pRuleset); + MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); + pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; + if(*pbIsPermitted == 2) + pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ + CHKiRet(msgSetFromSockinfo(pMsg, frominet)); + CHKiRet(ratelimitAddMsg(lstn->ratelimiter, multiSub, pMsg)); + STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); + } + +finalize_it: + RETiRet; +} + + + + +/* The following "two" functions are helpers to runInput. Actually, it is + * just one function. Depending on whether or not we have recvmmsg(), + * an appropriate version is compiled (as such we need to maintain both!). + */ +#ifdef HAVE_RECVMMSG +static inline rsRetVal +processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) +{ + DEFiRet; + int iNbrTimeUsed; + time_t ttGenTime; + struct syslogTime stTime; + struct sockaddr_storage frominet; + char errStr[1024]; + msg_t *pMsgs[CONF_NUM_MULTISUB]; + multi_submit_t multiSub; + int nelem; + int i; + + assert(pThrd != NULL); + multiSub.ppMsgs = pMsgs; + multiSub.maxElem = CONF_NUM_MULTISUB; + multiSub.nElem = 0; + iNbrTimeUsed = 0; + while(1) { /* loop is terminated if we have a "bad" receive, done below in the body */ + if(pThrd->bShallStop == RSTRUE) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + memset(recvmsg_iov, 0, runModConf->batchSize * sizeof(struct iovec)); + memset(recvmsg_mmh, 0, runModConf->batchSize * sizeof(struct mmsghdr)); + for(i = 0 ; i < runModConf->batchSize ; ++i) { + recvmsg_iov[i].iov_base = pRcvBuf+(i*(iMaxLine+1)); + recvmsg_iov[i].iov_len = iMaxLine; + recvmsg_mmh[i].msg_hdr.msg_name = &frominet; + recvmsg_mmh[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); + recvmsg_mmh[i].msg_hdr.msg_iov = &(recvmsg_iov[i]); + recvmsg_mmh[i].msg_hdr.msg_iovlen = 1; + } + nelem = recvmmsg(lstn->sock, recvmsg_mmh, runModConf->batchSize, 0, NULL); +dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, sizeof(errStr))); + if(nelem < 0 && errno == ENOSYS) { + /* be careful: some versions of valgrind do not support recvmmsg()! */ + DBGPRINTF("imudp: error ENOSYS on call to recvmmsg() - fall back to recvmsg\n"); + nelem = recvmsg(lstn->sock, &recvmsg_mmh[0].msg_hdr, 0); + if(nelem >= 0) { + recvmsg_mmh[0].msg_len = nelem; + nelem = 1; + } + } + if(nelem < 0) { + if(errno != EINTR && errno != EAGAIN) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); + errmsg.LogError(errno, NO_ERRCODE, "imudp: error receiving on socket: %s", errStr); + } + ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller! + } + + if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) { + datetime.getCurrTime(&stTime, &ttGenTime); + } + + for(i = 0 ; i < nelem ; ++i) { + processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, recvmsg_mmh[i].msg_hdr.msg_iov->iov_base, + recvmsg_mmh[i].msg_len, &stTime, ttGenTime, &frominet, + recvmsg_mmh[i].msg_hdr.msg_namelen, &multiSub); + } + } + +finalize_it: + multiSubmitFlush(&multiSub); + RETiRet; +} +#else /* we do not have recvmmsg() */ /* This function is a helper to runInput. I have extracted it * from the main loop just so that we do not have that large amount of code * in a single place. This function takes a socket and pulls messages from @@ -341,7 +496,6 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f int iNbrTimeUsed; time_t ttGenTime; struct syslogTime stTime; - socklen_t socklen; ssize_t lenRcvBuf; struct sockaddr_storage frominet; msg_t *pMsg; @@ -351,6 +505,9 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f msg_t *pMsgs[CONF_NUM_MULTISUB]; char errStr[1024]; DEFiRet; + char errStr[1024]; + struct msghdr mh; + struct iovec iov[1]; assert(pThrd != NULL); multiSub.ppMsgs = pMsgs; @@ -360,84 +517,38 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f while(1) { /* loop is terminated if we have a bad receive, done below in the body */ if(pThrd->bShallStop == RSTRUE) ABORT_FINALIZE(RS_RET_FORCE_TERM); - socklen = sizeof(struct sockaddr_storage); - lenRcvBuf = recvfrom(lstn->sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); + memset(iov, 0, sizeof(iov)); + iov[0].iov_base = pRcvBuf; + iov[0].iov_len = iMaxLine; + memset(&mh, 0, sizeof(mh)); + mh.msg_name = &frominet; + mh.msg_namelen = sizeof(struct sockaddr_storage); + mh.msg_iov = iov; + mh.msg_iovlen = 1; + lenRcvBuf = recvmsg(lstn->sock, &mh, 0); if(lenRcvBuf < 0) { if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); - errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet"); + errmsg.LogError(errno, NO_ERRCODE, "imudp: error receiving on socket: %s", errStr); } ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller! } - if(lenRcvBuf == 0) - continue; /* this looks a bit strange, but practice shows it happens... */ - - /* if we reach this point, we had a good receive and can process the packet received */ - /* check if we have a different sender than before, if so, we need to query some new values */ - if(bDoACLCheck) { - if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) { - memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */ - /* Here we check if a host is permitted to send us syslog messages. If it isn't, - * we do not further process the message but log a warning (if we are - * configured to do this). However, if the check would require name resolution, - * it is postponed to the main queue. See also my blog post at - * http://blog.gerhards.net/2009/11/acls-imudp-and-accepting-messages.html - * rgerhards, 2009-11-16 - */ - *pbIsPermitted = net.isAllowedSender2((uchar*)"UDP", - (struct sockaddr *)&frominet, "", 0); - - if(*pbIsPermitted == 0) { - DBGPRINTF("msg is not from an allowed sender\n"); - if(glbl.GetOption_DisallowWarning) { - time_t tt; - datetime.GetTime(&tt); - if(tt > ttLastDiscard + 60) { - ttLastDiscard = tt; - errmsg.LogError(0, NO_ERRCODE, - "UDP message from disallowed sender discarded"); - } - } - } - } - } else { - *pbIsPermitted = 1; /* no check -> everything permitted */ + if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) { + datetime.getCurrTime(&stTime, &ttGenTime); } - DBGPRINTF("imudp:recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); - - if(*pbIsPermitted != 0) { - if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) { - datetime.getCurrTime(&stTime, &ttGenTime); - } - /* we now create our own message object and submit it to the queue */ - CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); - MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); - MsgSetInputName(pMsg, lstn->pInputName); - MsgSetRuleset(pMsg, lstn->pRuleset); - MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); - pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; - if(*pbIsPermitted == 2) - pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ - CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); - CHKiRet(ratelimitAddMsg(lstn->ratelimiter, &multiSub, pMsg)); - STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); - } + CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, pRcvBuf, lenRcvBuf, &stTime, + ttGenTime, &frominet, mh.msg_namelen, &multiSub)); } finalize_it: multiSubmitFlush(&multiSub); - - if(propFromHost != NULL) - prop.Destruct(&propFromHost); - if(propFromHostIP != NULL) - prop.Destruct(&propFromHostIP); - RETiRet; } +#endif /* #ifdef HAVE_RECVMMSG */ /* check configured scheduling priority. @@ -777,6 +888,7 @@ CODESTARTbeginCnfLoad pModConf->pConf = pConf; /* init our settings */ loadModConf->configSetViaV2Method = 0; + loadModConf->batchSize = BATCH_SIZE_DFLT; loadModConf->iTimeRequery = TIME_REQUERY_DFLT; loadModConf->iSchedPrio = SCHED_PRIO_UNSET; loadModConf->pszSchedPolicy = NULL; @@ -811,6 +923,8 @@ CODESTARTsetModCnf continue; if(!strcmp(modpblk.descr[i].name, "timerequery")) { loadModConf->iTimeRequery = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "batchsize")) { + loadModConf->batchSize = (int) pvals[i].val.d.n; } else if(!strcmp(modpblk.descr[i].name, "schedulingpriority")) { loadModConf->iSchedPrio = (int) pvals[i].val.d.n; } else if(!strcmp(modpblk.descr[i].name, "schedulingpolicy")) { @@ -886,10 +1000,18 @@ ENDactivateCnfPrePrivDrop BEGINactivateCnf + int lenRcvBuf; CODESTARTactivateCnf /* caching various settings */ iMaxLine = glbl.GetMaxLine(); - CHKmalloc(pRcvBuf = MALLOC((iMaxLine + 1) * sizeof(char))); + lenRcvBuf = (iMaxLine + 1) * sizeof(char); +# ifdef HAVE_RECVMMSG + lenRcvBuf *= runModConf->batchSize; + CHKmalloc(recvmsg_iov = MALLOC(runModConf->batchSize * sizeof(struct iovec))); + CHKmalloc(recvmsg_mmh = MALLOC(runModConf->batchSize * sizeof(struct mmsghdr))); +# endif +dbgprintf("DDDD: sizing for batch size %d\n", runModConf->batchSize); + CHKmalloc(pRcvBuf = MALLOC(lenRcvBuf)); finalize_it: ENDactivateCnf |