diff options
-rw-r--r-- | plugins/imudp/imudp.c | 64 |
1 files changed, 62 insertions, 2 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index a0b793ac..0d0ab059 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -363,6 +363,66 @@ finalize_it: +/* The following "two" functions are helpers to runInput. Actually, it is + * just one function. Depending on whether or not we have recvmmsg(), + * an appropriate version is compiled (as such we need to maintain both!). + */ +#ifdef HAVE_RECVMMSG +static inline rsRetVal +processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) +{ + DEFiRet; + int iNbrTimeUsed; + time_t ttGenTime; + struct syslogTime stTime; + struct sockaddr_storage frominet; + char errStr[1024]; + struct mmsghdr mmh[1]; + struct iovec iov[1]; + int nelem; + + assert(pThrd != NULL); + iNbrTimeUsed = 0; + while(1) { /* loop is terminated if we have a "bad" receive, done below in the body */ + if(pThrd->bShallStop == RSTRUE) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + memset(iov, 0, sizeof(iov)); + iov[0].iov_base = pRcvBuf; + iov[0].iov_len = iMaxLine; + memset(mmh, 0, sizeof(mmh)); + mmh[0].msg_hdr.msg_name = &frominet; + mmh[0].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); + mmh[0].msg_hdr.msg_iov = iov; + mmh[0].msg_hdr.msg_iovlen = 1; + nelem = recvmmsg(lstn->sock, mmh, 1, 0, NULL); +dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, sizeof(errStr))); + if(nelem < 0 && errno == ENOSYS) { + /* be careful: some versions of valgrind do not support recvmmsg()! */ + DBGPRINTF("imudp: error ENOSYS on call to recvmmsg() - fall back to recvmsg\n"); + nelem = recvmsg(lstn->sock, &mmh[0].msg_hdr, 0); + if(nelem >= 0) + mmh[0].msg_len = nelem; + } + if(nelem < 0) { + if(errno != EINTR && errno != EAGAIN) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); + errmsg.LogError(errno, NO_ERRCODE, "imudp: error receiving on socket: %s", errStr); + } + ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller! + } + + if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) { + datetime.getCurrTime(&stTime, &ttGenTime); + } + + CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, mmh[0].msg_len, &stTime, ttGenTime, &frominet, mmh[0].msg_hdr.msg_namelen)); + } + +finalize_it: + RETiRet; +} +#else /* we do not have recvmmsg() */ /* This function is a helper to runInput. I have extracted it * from the main loop just so that we do not have that large amount of code * in a single place. This function takes a socket and pulls messages from @@ -384,7 +444,6 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f int iNbrTimeUsed; time_t ttGenTime; struct syslogTime stTime; - socklen_t socklen; ssize_t lenRcvBuf; struct sockaddr_storage frominet; char errStr[1024]; @@ -409,7 +468,7 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); - errmsg.LogError(errno, NO_ERRCODE, "imudp: error receving on socket: %s", errStr); + errmsg.LogError(errno, NO_ERRCODE, "imudp: error receiving on socket: %s", errStr); } ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller! } @@ -424,6 +483,7 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f finalize_it: RETiRet; } +#endif /* #ifdef HAVE_RECVMMSG */ /* check configured scheduling priority. |