summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2013-10-11 15:44:03 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2013-10-11 15:44:03 +0200
commit3f1fade3558f74e71233d51a14b1d0f1ae7716eb (patch)
tree0120e56fd9821aac021395aab70b9770d18f2310
parent97bda43e372a506671cb7007b6041e4160a02b04 (diff)
downloadrsyslog-3f1fade3558f74e71233d51a14b1d0f1ae7716eb.tar.gz
rsyslog-3f1fade3558f74e71233d51a14b1d0f1ae7716eb.tar.bz2
rsyslog-3f1fade3558f74e71233d51a14b1d0f1ae7716eb.zip
imudp: support for multiple receiver threads added
-rw-r--r--ChangeLog1
-rw-r--r--plugins/imudp/imudp.c125
2 files changed, 105 insertions, 21 deletions
diff --git a/ChangeLog b/ChangeLog
index c0d8ed2a..f1f01188 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,6 @@
---------------------------------------------------------------------------
Version 7.5.5 [devel] 2013-10-??
+- imudp: support for multiple receiver threads added
- imudp: add "dfltTZ" input config parameter
- bugfix: memory leak in mmnormalize
- bugfix: mmutf8fix did not properly handle invalid UTF-8 at END of message
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 20675b0d..310221b2 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -32,6 +32,7 @@
#include <unistd.h>
#include <netdb.h>
#include <sys/socket.h>
+#include <pthread.h>
#if HAVE_SYS_EPOLL_H
# include <sys/epoll.h>
#endif
@@ -60,6 +61,7 @@ MODULE_TYPE_NOKEEP
MODULE_CNFNAME("imudp")
/* defines */
+#define MAX_WRKR_THREADS 32
/* Module static data */
DEF_IMOD_STATIC_DATA
@@ -126,6 +128,19 @@ struct instanceConf_s {
sbool bAppendPortToInpname;
};
+/* The following structure controls the worker threads. Global data is
+ * needed for their access.
+ */
+static struct wrkrInfo_s {
+ pthread_t tid; /* the worker's thread ID */
+ int id;
+ thrdInfo_t *pThrd;
+ statsobj_t *stats; /* worker thread stats */
+ STATSCOUNTER_DEF(ctrCall_recvmmsg, mutCtrCall_recvmmsg)
+ STATSCOUNTER_DEF(ctrCall_recvmsg, mutCtrCall_recvmsg)
+ STATSCOUNTER_DEF(ctrMsgsRcvd, mutCtrMsgsRcvd)
+} wrkrInfo[MAX_WRKR_THREADS];
+
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
instanceConf_t *root, *tail;
@@ -134,6 +149,7 @@ struct modConfData_s {
int iSchedPrio; /* scheduling priority */
int iTimeRequery; /* how often is time to be queried inside tight recv loop? 0=always */
int batchSize; /* max nbr of input batch --> also recvmmsg() max count */
+ int8_t wrkrMax; /* max nbr of worker threads */
sbool configSetViaV2Method;
};
static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
@@ -144,6 +160,7 @@ static struct cnfparamdescr modpdescr[] = {
{ "schedulingpolicy", eCmdHdlrGetWord, 0 },
{ "schedulingpriority", eCmdHdlrInt, 0 },
{ "batchsize", eCmdHdlrInt, 0 },
+ { "threads", eCmdHdlrPositiveInt, 0 },
{ "timerequery", eCmdHdlrInt, 0 }
};
static struct cnfparamblk modpblk =
@@ -416,7 +433,7 @@ finalize_it:
*/
#ifdef HAVE_RECVMMSG
static inline rsRetVal
-processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted)
+processSocket(struct wrkrInfo_s *pWrkr, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted)
{
DEFiRet;
int iNbrTimeUsed;
@@ -429,13 +446,12 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
int nelem;
int i;
- assert(pThrd != NULL);
multiSub.ppMsgs = pMsgs;
multiSub.maxElem = CONF_NUM_MULTISUB;
multiSub.nElem = 0;
iNbrTimeUsed = 0;
while(1) { /* loop is terminated if we have a "bad" receive, done below in the body */
- if(pThrd->bShallStop == RSTRUE)
+ if(pWrkr->pThrd->bShallStop == RSTRUE)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
memset(recvmsg_iov, 0, runModConf->batchSize * sizeof(struct iovec));
memset(recvmsg_mmh, 0, runModConf->batchSize * sizeof(struct mmsghdr));
@@ -448,11 +464,13 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
recvmsg_mmh[i].msg_hdr.msg_iovlen = 1;
}
nelem = recvmmsg(lstn->sock, recvmsg_mmh, runModConf->batchSize, 0, NULL);
+ STATSCOUNTER_INC(pWrkr->ctrCall_recvmmsg, pWrkr->mutCtrCall_recvmmsg);
dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, sizeof(errStr)));
if(nelem < 0 && errno == ENOSYS) {
/* be careful: some versions of valgrind do not support recvmmsg()! */
DBGPRINTF("imudp: error ENOSYS on call to recvmmsg() - fall back to recvmsg\n");
nelem = recvmsg(lstn->sock, &recvmsg_mmh[0].msg_hdr, 0);
+ STATSCOUNTER_INC(pWrkr->ctrCall_recvmsg, pWrkr->mutCtrCall_recvmsg);
if(nelem >= 0) {
recvmsg_mmh[0].msg_len = nelem;
nelem = 1;
@@ -471,8 +489,9 @@ dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr,
datetime.getCurrTime(&stTime, &ttGenTime);
}
+ pWrkr->ctrMsgsRcvd += nelem;
for(i = 0 ; i < nelem ; ++i) {
- processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, recvmsg_mmh[i].msg_hdr.msg_iov->iov_base,
+ processPacket(pWrkr->pThrd, lstn, frominetPrev, pbIsPermitted, recvmsg_mmh[i].msg_hdr.msg_iov->iov_base,
recvmsg_mmh[i].msg_len, &stTime, ttGenTime, &frominet,
recvmsg_mmh[i].msg_hdr.msg_namelen, &multiSub);
}
@@ -498,7 +517,7 @@ finalize_it:
* on scheduling order. -- rgerhards, 2008-10-02
*/
static inline rsRetVal
-processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted)
+processSocket(struct wrkrInfo_s *pWrkr, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted)
{
int iNbrTimeUsed;
time_t ttGenTime;
@@ -515,13 +534,12 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
struct iovec iov[1];
DEFiRet;
- assert(pThrd != NULL);
multiSub.ppMsgs = pMsgs;
multiSub.maxElem = CONF_NUM_MULTISUB;
multiSub.nElem = 0;
iNbrTimeUsed = 0;
while(1) { /* loop is terminated if we have a bad receive, done below in the body */
- if(pThrd->bShallStop == RSTRUE)
+ if(pWrkr->pThrd->bShallStop == RSTRUE)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
memset(iov, 0, sizeof(iov));
iov[0].iov_base = pRcvBuf;
@@ -532,6 +550,7 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
mh.msg_iov = iov;
mh.msg_iovlen = 1;
lenRcvBuf = recvmsg(lstn->sock, &mh, 0);
+ STATSCOUNTER_INC(pWrkr->ctrCall_recvmsg, pWrkr->mutCtrCall_recvmsg);
if(lenRcvBuf < 0) {
if(errno != EINTR && errno != EAGAIN) {
rs_strerror_r(errno, errStr, sizeof(errStr));
@@ -541,11 +560,12 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller!
}
+ ++pWrkr->ctrMsgsRcvd;
if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) {
datetime.getCurrTime(&stTime, &ttGenTime);
}
- CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, pRcvBuf, lenRcvBuf, &stTime,
+ CHKiRet(processPacket(pWrkr->pThrd, lstn, frominetPrev, pbIsPermitted, pRcvBuf, lenRcvBuf, &stTime,
ttGenTime, &frominet, mh.msg_namelen, &multiSub));
}
@@ -685,7 +705,7 @@ finalize_it:
*/
#if defined(HAVE_EPOLL_CREATE1) || defined(HAVE_EPOLL_CREATE)
#define NUM_EPOLL_EVENTS 10
-rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
+rsRetVal rcvMainLoop(struct wrkrInfo_s *pWrkr)
{
DEFiRet;
int nfds;
@@ -748,11 +768,11 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1);
DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds);
- if(pThrd->bShallStop == RSTRUE)
+ if(pWrkr->pThrd->bShallStop == RSTRUE)
break; /* terminate input! */
for(i = 0 ; i < nfds ; ++i) {
- processSocket(pThrd, currEvt[i].data.ptr, &frominetPrev, &bIsPermitted);
+ processSocket(pWrkr, currEvt[i].data.ptr, &frominetPrev, &bIsPermitted);
}
}
@@ -764,7 +784,7 @@ finalize_it:
}
#else /* #if HAVE_EPOLL_CREATE1 */
/* this is the code for the select() interface */
-rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
+rsRetVal rcvMainLoop(thrdInfo_t *pWrkr)
{
DEFiRet;
int maxfds;
@@ -811,7 +831,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
for(lstn = lcnfRoot ; nfds && lstn != NULL ; lstn = lstn->next) {
if(FD_ISSET(lstn->sock, &readfds)) {
- processSocket(pThrd, lstn, &frominetPrev, &bIsPermitted);
+ processSocket(pWrkr, lstn, &frominetPrev, &bIsPermitted);
--nfds; /* indicate we have processed one descriptor */
}
}
@@ -896,6 +916,7 @@ CODESTARTbeginCnfLoad
pModConf->pConf = pConf;
/* init our settings */
loadModConf->configSetViaV2Method = 0;
+ loadModConf->wrkrMax = 1; /* conservative, but least msg reordering */
loadModConf->batchSize = BATCH_SIZE_DFLT;
loadModConf->iTimeRequery = TIME_REQUERY_DFLT;
loadModConf->iSchedPrio = SCHED_PRIO_UNSET;
@@ -913,6 +934,7 @@ ENDbeginCnfLoad
BEGINsetModCnf
struct cnfparamvals *pvals = NULL;
int i;
+ int wrkrMax;
CODESTARTsetModCnf
pvals = nvlstGetParams(lst, &modpblk, NULL);
if(pvals == NULL) {
@@ -937,6 +959,16 @@ CODESTARTsetModCnf
loadModConf->iSchedPrio = (int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "schedulingpolicy")) {
loadModConf->pszSchedPolicy = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(modpblk.descr[i].name, "threads")) {
+ wrkrMax = (int) pvals[i].val.d.n;
+ if(wrkrMax > MAX_WRKR_THREADS) {
+ errmsg.LogError(0, RS_RET_PARAM_ERROR, "imudp: configured for %d"
+ "worker threads, but maximum permitted is %d",
+ wrkrMax, MAX_WRKR_THREADS);
+ loadModConf->wrkrMax = MAX_WRKR_THREADS;
+ } else {
+ loadModConf->wrkrMax = wrkrMax;
+ }
} else {
dbgprintf("imudp: program error, non-handled "
"param '%s' in beginCnfLoad\n", modpblk.descr[i].name);
@@ -1038,14 +1070,27 @@ CODESTARTfreeCnf
}
ENDfreeCnf
-/* This function is called to gather input.
- * Note that sock must be non-NULL because otherwise we would not have
- * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02
- */
-BEGINrunInput
-CODESTARTrunInput
+
+static void *
+wrkr(void *myself)
+{
+ struct wrkrInfo_s *pWrkr = (struct wrkrInfo_s*) myself;
+# if HAVE_PRCTL && defined PR_SET_NAME
+ uchar *pszDbgHdr;
+# endif
+ uchar thrdName[32];
+
+ snprintf((char*)thrdName, sizeof(thrdName), "imudp(w%d)", pWrkr->id);
+# if HAVE_PRCTL && defined PR_SET_NAME
+ /* set thread name - we ignore if the call fails, has no harsh consequences... */
+ if(prctl(PR_SET_NAME, thrdName, 0, 0, 0) != 0) {
+ DBGPRINTF("prctl failed, not setting thread name for '%s'\n", thrdName);
+ }
+# endif
+ dbgOutputTID((char*)thrdName);
+
/* Note well: the setting of scheduling parameters will not work
- * when we dropped privileges (if the user is not sufficently
+ * when we dropped privileges (if the user is not sufficiently
* privileged, of course). Howerver, we can't change the
* scheduling params in PrePrivDrop(), as at that point our thread
* is not yet created. So at least as an interim solution, we do
@@ -1053,7 +1098,45 @@ CODESTARTrunInput
* privileges within the same instance.
*/
setSchedParams(runModConf);
- iRet = rcvMainLoop(pThrd);
+
+ /* support statistics gathering */
+ statsobj.Construct(&(pWrkr->stats));
+ statsobj.SetName(pWrkr->stats, thrdName);
+ STATSCOUNTER_INIT(pWrkr->ctrCall_recvmmsg, pWrkr->mutCtrCall_recvmmsg);
+ statsobj.AddCounter(pWrkr->stats, UCHAR_CONSTANT("called.recvmmsg"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkr->ctrCall_recvmmsg));
+ STATSCOUNTER_INIT(pWrkr->ctrCall_recvmsg, pWrkr->mutCtrCall_recvmsg);
+ statsobj.AddCounter(pWrkr->stats, UCHAR_CONSTANT("called.recvmsg"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkr->ctrCall_recvmsg));
+ STATSCOUNTER_INIT(pWrkr->ctrMsgsRcvd, pWrkr->mutCtrMsgsRcvd);
+ statsobj.AddCounter(pWrkr->stats, UCHAR_CONSTANT("msgs.received"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkr->ctrMsgsRcvd));
+ statsobj.ConstructFinalize(pWrkr->stats);
+
+ rcvMainLoop(pWrkr);
+
+ /* cleanup */
+ return NULL;
+}
+
+/* This function is called to gather input.
+ * In essence, it just starts the pool of workers.
+ */
+BEGINrunInput
+ int i;
+ pthread_attr_t wrkrThrdAttr;
+CODESTARTrunInput
+ pthread_attr_init(&wrkrThrdAttr);
+ pthread_attr_setstacksize(&wrkrThrdAttr, 4096*1024);
+ for(i = 0 ; i < runModConf->wrkrMax ; ++i) {
+ wrkrInfo[i].pThrd = pThrd;
+ wrkrInfo[i].id = i;
+ pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i]));
+ }
+ pthread_attr_destroy(&wrkrThrdAttr);
+ for(i = 0 ; i < runModConf->wrkrMax ; ++i) {
+ pthread_join(wrkrInfo[i].tid, NULL);
+ }
ENDrunInput