summaryrefslogtreecommitdiffstats
path: root/plugins/imudp/imudp.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imudp/imudp.c')
-rw-r--r--plugins/imudp/imudp.c64
1 files changed, 62 insertions, 2 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index a0b793ac..0d0ab059 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -363,6 +363,66 @@ finalize_it:
+/* The following "two" functions are helpers to runInput. Actually, it is
+ * just one function. Depending on whether or not we have recvmmsg(),
+ * an appropriate version is compiled (as such we need to maintain both!).
+ */
+#ifdef HAVE_RECVMMSG
+static inline rsRetVal
+processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted)
+{
+ DEFiRet;
+ int iNbrTimeUsed;
+ time_t ttGenTime;
+ struct syslogTime stTime;
+ struct sockaddr_storage frominet;
+ char errStr[1024];
+ struct mmsghdr mmh[1];
+ struct iovec iov[1];
+ int nelem;
+
+ assert(pThrd != NULL);
+ iNbrTimeUsed = 0;
+ while(1) { /* loop is terminated if we have a "bad" receive, done below in the body */
+ if(pThrd->bShallStop == RSTRUE)
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ memset(iov, 0, sizeof(iov));
+ iov[0].iov_base = pRcvBuf;
+ iov[0].iov_len = iMaxLine;
+ memset(mmh, 0, sizeof(mmh));
+ mmh[0].msg_hdr.msg_name = &frominet;
+ mmh[0].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ mmh[0].msg_hdr.msg_iov = iov;
+ mmh[0].msg_hdr.msg_iovlen = 1;
+ nelem = recvmmsg(lstn->sock, mmh, 1, 0, NULL);
+dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, sizeof(errStr)));
+ if(nelem < 0 && errno == ENOSYS) {
+ /* be careful: some versions of valgrind do not support recvmmsg()! */
+ DBGPRINTF("imudp: error ENOSYS on call to recvmmsg() - fall back to recvmsg\n");
+ nelem = recvmsg(lstn->sock, &mmh[0].msg_hdr, 0);
+ if(nelem >= 0)
+ mmh[0].msg_len = nelem;
+ }
+ if(nelem < 0) {
+ if(errno != EINTR && errno != EAGAIN) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr);
+ errmsg.LogError(errno, NO_ERRCODE, "imudp: error receiving on socket: %s", errStr);
+ }
+ ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller!
+ }
+
+ if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) {
+ datetime.getCurrTime(&stTime, &ttGenTime);
+ }
+
+ CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, mmh[0].msg_len, &stTime, ttGenTime, &frominet, mmh[0].msg_hdr.msg_namelen));
+ }
+
+finalize_it:
+ RETiRet;
+}
+#else /* we do not have recvmmsg() */
/* This function is a helper to runInput. I have extracted it
* from the main loop just so that we do not have that large amount of code
* in a single place. This function takes a socket and pulls messages from
@@ -384,7 +444,6 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
int iNbrTimeUsed;
time_t ttGenTime;
struct syslogTime stTime;
- socklen_t socklen;
ssize_t lenRcvBuf;
struct sockaddr_storage frominet;
char errStr[1024];
@@ -409,7 +468,7 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
if(errno != EINTR && errno != EAGAIN) {
rs_strerror_r(errno, errStr, sizeof(errStr));
DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr);
- errmsg.LogError(errno, NO_ERRCODE, "imudp: error receving on socket: %s", errStr);
+ errmsg.LogError(errno, NO_ERRCODE, "imudp: error receiving on socket: %s", errStr);
}
ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller!
}
@@ -424,6 +483,7 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
finalize_it:
RETiRet;
}
+#endif /* #ifdef HAVE_RECVMMSG */
/* check configured scheduling priority.