summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2013-07-11 18:28:11 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2013-07-11 18:28:11 +0200
commitcd6e7dab26f6fcfb772240cce76b62384a162cf8 (patch)
treeb39d0aee8f2a99a2854a84fbedca1de8a2d259a9
parent88193b17381681fe0098ab3c959fae6c2c1d4632 (diff)
downloadrsyslog-cd6e7dab26f6fcfb772240cce76b62384a162cf8.tar.gz
rsyslog-cd6e7dab26f6fcfb772240cce76b62384a162cf8.tar.bz2
rsyslog-cd6e7dab26f6fcfb772240cce76b62384a162cf8.zip
imudp: add support for recvmmsg()
-rw-r--r--configure.ac2
-rw-r--r--doc/imudp.html12
-rw-r--r--plugins/imudp/imudp.c67
3 files changed, 59 insertions, 22 deletions
diff --git a/configure.ac b/configure.ac
index 3be549e3..1524beaa 100644
--- a/configure.ac
+++ b/configure.ac
@@ -120,7 +120,7 @@ AC_TYPE_SIGNAL
AC_FUNC_STAT
AC_FUNC_STRERROR_R
AC_FUNC_VPRINTF
-AC_CHECK_FUNCS([flock basename alarm clock_gettime getifaddrs gethostbyname gethostname gettimeofday localtime_r memset mkdir regcomp select setid socket strcasecmp strchr strdup strerror strndup strnlen strrchr strstr strtol strtoul uname ttyname_r getline malloc_trim prctl epoll_create epoll_create1 fdatasync lseek64])
+AC_CHECK_FUNCS([flock recvmmsg basename alarm clock_gettime getifaddrs gethostbyname gethostname gettimeofday localtime_r memset mkdir regcomp select setid socket strcasecmp strchr strdup strerror strndup strnlen strrchr strstr strtol strtoul uname ttyname_r getline malloc_trim prctl epoll_create epoll_create1 fdatasync lseek64])
# the check below is probably ugly. If someone knows how to do it in a better way, please
# let me know! -- rgerhards, 2010-10-06
diff --git a/doc/imudp.html b/doc/imudp.html
index 961bbeba..c71e1556 100644
--- a/doc/imudp.html
+++ b/doc/imudp.html
@@ -36,8 +36,18 @@ is provided by the platform. Most useful to select "fifo" for real-time
processing under Linux (and thus reduce chance of packet loss).
<li><b>SchedulingPriority</b> &lt;number&gt;<br>
Scheduling priority to use.
+<li><b>batchSize</b> &lt;number&gt;<br>
+This parameter is only meaningful if the system support recvmmsg() (newer Linux
+OSs do this). The parameter is silently ignored if the system does not support
+it. If supported, it sets the maximum number of UDP messages that can be obtained
+with a single OS call. For systems with high UDP traffic, a relatively high batch
+size can reduce system overhead and improve performance. However, this parameter
+should not be overdone. For each buffer, max message size bytes are statically
+required. Also, a too-high number leads to reduced efficiency, as some structures
+need to be completely initialized before the OS call is done. We would suggest to not
+set it above a value of 128, except if experimental results show that this is useful.
</ul>
-<p><b>Action Parameters</b>:</p>
+<p><b>Input Parameters</b>:</p>
<ul>
<li><b>Address</b> &lt;IP&gt;<br>
local IP address (or name) the UDP listens should bind to</li>
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 0d0ab059..c64c06ca 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -81,6 +81,11 @@ static struct lstn_s {
STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
} *lcnfRoot = NULL, *lcnfLast = NULL;
+# ifdef HAVE_RECVMMSG
+static struct mmsghdr *recvmsg_mmh;
+static struct iovec *recvmsg_iov;
+#endif
+
static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */
static int iMaxLine; /* maximum UDP message size supported */
@@ -94,6 +99,7 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a
*/
static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */
+#define BATCH_SIZE_DFLT 32 /* do not overdo, has heavy toll on memory, especially with large msgs */
#define TIME_REQUERY_DFLT 2
#define SCHED_PRIO_UNSET -12345678 /* a value that indicates that the scheduling priority has not been set */
/* config vars for legacy config system */
@@ -121,6 +127,7 @@ struct modConfData_s {
int iSchedPolicy; /* scheduling policy as SCHED_xxx */
int iSchedPrio; /* scheduling priority */
int iTimeRequery; /* how often is time to be queried inside tight recv loop? 0=always */
+ int batchSize; /* max nbr of input batch --> also recvmmsg() max count */
sbool configSetViaV2Method;
};
static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
@@ -130,6 +137,7 @@ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current lo
static struct cnfparamdescr modpdescr[] = {
{ "schedulingpolicy", eCmdHdlrGetWord, 0 },
{ "schedulingpriority", eCmdHdlrInt, 0 },
+ { "batchsize", eCmdHdlrInt, 0 },
{ "timerequery", eCmdHdlrInt, 0 }
};
static struct cnfparamblk modpblk =
@@ -296,7 +304,8 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta
*/
static inline rsRetVal
processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
- ssize_t lenRcvBuf, struct syslogTime *stTime, time_t ttGenTime, struct sockaddr_storage *frominet, socklen_t socklen)
+ uchar *rcvBuf, ssize_t lenRcvBuf, struct syslogTime *stTime, time_t ttGenTime,
+ struct sockaddr_storage *frominet, socklen_t socklen)
{
DEFiRet;
msg_t *pMsg;
@@ -339,12 +348,12 @@ processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
*pbIsPermitted = 1; /* no check -> everything permitted */
}
- DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf);
+ DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, rcvBuf);
if(*pbIsPermitted != 0) {
/* we now create our own message object and submit it to the queue */
CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime));
- MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf);
+ MsgSetRawMsg(pMsg, (char*)rcvBuf, lenRcvBuf);
MsgSetInputName(pMsg, pInputName);
MsgSetRuleset(pMsg, lstn->pRuleset);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
@@ -377,31 +386,34 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
struct syslogTime stTime;
struct sockaddr_storage frominet;
char errStr[1024];
- struct mmsghdr mmh[1];
- struct iovec iov[1];
int nelem;
+ int i;
assert(pThrd != NULL);
iNbrTimeUsed = 0;
while(1) { /* loop is terminated if we have a "bad" receive, done below in the body */
if(pThrd->bShallStop == RSTRUE)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
- memset(iov, 0, sizeof(iov));
- iov[0].iov_base = pRcvBuf;
- iov[0].iov_len = iMaxLine;
- memset(mmh, 0, sizeof(mmh));
- mmh[0].msg_hdr.msg_name = &frominet;
- mmh[0].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
- mmh[0].msg_hdr.msg_iov = iov;
- mmh[0].msg_hdr.msg_iovlen = 1;
- nelem = recvmmsg(lstn->sock, mmh, 1, 0, NULL);
+ memset(recvmsg_iov, 0, runModConf->batchSize * sizeof(struct iovec));
+ memset(recvmsg_mmh, 0, runModConf->batchSize * sizeof(struct mmsghdr));
+ for(i = 0 ; i < runModConf->batchSize ; ++i) {
+ recvmsg_iov[i].iov_base = pRcvBuf+(i*(iMaxLine+1));
+ recvmsg_iov[i].iov_len = iMaxLine;
+ recvmsg_mmh[i].msg_hdr.msg_name = &frominet;
+ recvmsg_mmh[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ recvmsg_mmh[i].msg_hdr.msg_iov = &(recvmsg_iov[i]);
+ recvmsg_mmh[i].msg_hdr.msg_iovlen = 1;
+ }
+ nelem = recvmmsg(lstn->sock, recvmsg_mmh, runModConf->batchSize, 0, NULL);
dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, sizeof(errStr)));
if(nelem < 0 && errno == ENOSYS) {
/* be careful: some versions of valgrind do not support recvmmsg()! */
DBGPRINTF("imudp: error ENOSYS on call to recvmmsg() - fall back to recvmsg\n");
- nelem = recvmsg(lstn->sock, &mmh[0].msg_hdr, 0);
- if(nelem >= 0)
- mmh[0].msg_len = nelem;
+ nelem = recvmsg(lstn->sock, &recvmsg_mmh[0].msg_hdr, 0);
+ if(nelem >= 0) {
+ recvmsg_mmh[0].msg_len = nelem;
+ nelem = 1;
+ }
}
if(nelem < 0) {
if(errno != EINTR && errno != EAGAIN) {
@@ -416,7 +428,10 @@ dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr,
datetime.getCurrTime(&stTime, &ttGenTime);
}
- CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, mmh[0].msg_len, &stTime, ttGenTime, &frominet, mmh[0].msg_hdr.msg_namelen));
+ for(i = 0 ; i < nelem ; ++i) {
+ processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, recvmsg_mmh[i].msg_hdr.msg_iov->iov_base,
+ recvmsg_mmh[i].msg_len, &stTime, ttGenTime, &frominet, recvmsg_mmh[i].msg_hdr.msg_namelen);
+ }
}
finalize_it:
@@ -477,7 +492,8 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
datetime.getCurrTime(&stTime, &ttGenTime);
}
- CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, lenRcvBuf, &stTime, ttGenTime, &frominet, mh.msg_namelen));
+ CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, pRcvBuf, lenRcvBuf, &stTime,
+ ttGenTime, &frominet, mh.msg_namelen));
}
finalize_it:
@@ -818,6 +834,7 @@ CODESTARTbeginCnfLoad
pModConf->pConf = pConf;
/* init our settings */
loadModConf->configSetViaV2Method = 0;
+ loadModConf->batchSize = BATCH_SIZE_DFLT;
loadModConf->iTimeRequery = TIME_REQUERY_DFLT;
loadModConf->iSchedPrio = SCHED_PRIO_UNSET;
loadModConf->pszSchedPolicy = NULL;
@@ -852,6 +869,8 @@ CODESTARTsetModCnf
continue;
if(!strcmp(modpblk.descr[i].name, "timerequery")) {
loadModConf->iTimeRequery = (int) pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "batchsize")) {
+ loadModConf->batchSize = (int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "schedulingpriority")) {
loadModConf->iSchedPrio = (int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "schedulingpolicy")) {
@@ -927,10 +946,18 @@ ENDactivateCnfPrePrivDrop
BEGINactivateCnf
+ int lenRcvBuf;
CODESTARTactivateCnf
/* caching various settings */
iMaxLine = glbl.GetMaxLine();
- CHKmalloc(pRcvBuf = MALLOC((iMaxLine + 1) * sizeof(char)));
+ lenRcvBuf = (iMaxLine + 1) * sizeof(char);
+# ifdef HAVE_RECVMMSG
+ lenRcvBuf *= runModConf->batchSize;
+ CHKmalloc(recvmsg_iov = MALLOC(runModConf->batchSize * sizeof(struct iovec)));
+ CHKmalloc(recvmsg_mmh = MALLOC(runModConf->batchSize * sizeof(struct mmsghdr)));
+# endif
+dbgprintf("DDDD: sizing for batch size %d\n", runModConf->batchSize);
+ CHKmalloc(pRcvBuf = MALLOC(lenRcvBuf));
finalize_it:
ENDactivateCnf