diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2013-10-11 15:44:03 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2013-10-11 15:44:03 +0200 |
commit | 3f1fade3558f74e71233d51a14b1d0f1ae7716eb (patch) | |
tree | 0120e56fd9821aac021395aab70b9770d18f2310 | |
parent | 97bda43e372a506671cb7007b6041e4160a02b04 (diff) | |
download | rsyslog-3f1fade3558f74e71233d51a14b1d0f1ae7716eb.tar.gz rsyslog-3f1fade3558f74e71233d51a14b1d0f1ae7716eb.tar.bz2 rsyslog-3f1fade3558f74e71233d51a14b1d0f1ae7716eb.zip |
imudp: support for multiple receiver threads added
-rw-r--r-- | ChangeLog | 1 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 125 |
2 files changed, 105 insertions, 21 deletions
@@ -1,5 +1,6 @@ --------------------------------------------------------------------------- Version 7.5.5 [devel] 2013-10-?? +- imudp: support for multiple receiver threads added - imudp: add "dfltTZ" input config parameter - bugfix: memory leak in mmnormalize - bugfix: mmutf8fix did not properly handle invalid UTF-8 at END of message diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 20675b0d..310221b2 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -32,6 +32,7 @@ #include <unistd.h> #include <netdb.h> #include <sys/socket.h> +#include <pthread.h> #if HAVE_SYS_EPOLL_H # include <sys/epoll.h> #endif @@ -60,6 +61,7 @@ MODULE_TYPE_NOKEEP MODULE_CNFNAME("imudp") /* defines */ +#define MAX_WRKR_THREADS 32 /* Module static data */ DEF_IMOD_STATIC_DATA @@ -126,6 +128,19 @@ struct instanceConf_s { sbool bAppendPortToInpname; }; +/* The following structure controls the worker threads. Global data is + * needed for their access. + */ +static struct wrkrInfo_s { + pthread_t tid; /* the worker's thread ID */ + int id; + thrdInfo_t *pThrd; + statsobj_t *stats; /* worker thread stats */ + STATSCOUNTER_DEF(ctrCall_recvmmsg, mutCtrCall_recvmmsg) + STATSCOUNTER_DEF(ctrCall_recvmsg, mutCtrCall_recvmsg) + STATSCOUNTER_DEF(ctrMsgsRcvd, mutCtrMsgsRcvd) +} wrkrInfo[MAX_WRKR_THREADS]; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ instanceConf_t *root, *tail; @@ -134,6 +149,7 @@ struct modConfData_s { int iSchedPrio; /* scheduling priority */ int iTimeRequery; /* how often is time to be queried inside tight recv loop? 0=always */ int batchSize; /* max nbr of input batch --> also recvmmsg() max count */ + int8_t wrkrMax; /* max nbr of worker threads */ sbool configSetViaV2Method; }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ @@ -144,6 +160,7 @@ static struct cnfparamdescr modpdescr[] = { { "schedulingpolicy", eCmdHdlrGetWord, 0 }, { "schedulingpriority", eCmdHdlrInt, 0 }, { "batchsize", eCmdHdlrInt, 0 }, + { "threads", eCmdHdlrPositiveInt, 0 }, { "timerequery", eCmdHdlrInt, 0 } }; static struct cnfparamblk modpblk = @@ -416,7 +433,7 @@ finalize_it: */ #ifdef HAVE_RECVMMSG static inline rsRetVal -processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) +processSocket(struct wrkrInfo_s *pWrkr, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) { DEFiRet; int iNbrTimeUsed; @@ -429,13 +446,12 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f int nelem; int i; - assert(pThrd != NULL); multiSub.ppMsgs = pMsgs; multiSub.maxElem = CONF_NUM_MULTISUB; multiSub.nElem = 0; iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a "bad" receive, done below in the body */ - if(pThrd->bShallStop == RSTRUE) + if(pWrkr->pThrd->bShallStop == RSTRUE) ABORT_FINALIZE(RS_RET_FORCE_TERM); memset(recvmsg_iov, 0, runModConf->batchSize * sizeof(struct iovec)); memset(recvmsg_mmh, 0, runModConf->batchSize * sizeof(struct mmsghdr)); @@ -448,11 +464,13 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f recvmsg_mmh[i].msg_hdr.msg_iovlen = 1; } nelem = recvmmsg(lstn->sock, recvmsg_mmh, runModConf->batchSize, 0, NULL); + STATSCOUNTER_INC(pWrkr->ctrCall_recvmmsg, pWrkr->mutCtrCall_recvmmsg); dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, sizeof(errStr))); if(nelem < 0 && errno == ENOSYS) { /* be careful: some versions of valgrind do not support recvmmsg()! */ DBGPRINTF("imudp: error ENOSYS on call to recvmmsg() - fall back to recvmsg\n"); nelem = recvmsg(lstn->sock, &recvmsg_mmh[0].msg_hdr, 0); + STATSCOUNTER_INC(pWrkr->ctrCall_recvmsg, pWrkr->mutCtrCall_recvmsg); if(nelem >= 0) { recvmsg_mmh[0].msg_len = nelem; nelem = 1; @@ -471,8 +489,9 @@ dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, datetime.getCurrTime(&stTime, &ttGenTime); } + pWrkr->ctrMsgsRcvd += nelem; for(i = 0 ; i < nelem ; ++i) { - processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, recvmsg_mmh[i].msg_hdr.msg_iov->iov_base, + processPacket(pWrkr->pThrd, lstn, frominetPrev, pbIsPermitted, recvmsg_mmh[i].msg_hdr.msg_iov->iov_base, recvmsg_mmh[i].msg_len, &stTime, ttGenTime, &frominet, recvmsg_mmh[i].msg_hdr.msg_namelen, &multiSub); } @@ -498,7 +517,7 @@ finalize_it: * on scheduling order. -- rgerhards, 2008-10-02 */ static inline rsRetVal -processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) +processSocket(struct wrkrInfo_s *pWrkr, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) { int iNbrTimeUsed; time_t ttGenTime; @@ -515,13 +534,12 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f struct iovec iov[1]; DEFiRet; - assert(pThrd != NULL); multiSub.ppMsgs = pMsgs; multiSub.maxElem = CONF_NUM_MULTISUB; multiSub.nElem = 0; iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a bad receive, done below in the body */ - if(pThrd->bShallStop == RSTRUE) + if(pWrkr->pThrd->bShallStop == RSTRUE) ABORT_FINALIZE(RS_RET_FORCE_TERM); memset(iov, 0, sizeof(iov)); iov[0].iov_base = pRcvBuf; @@ -532,6 +550,7 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f mh.msg_iov = iov; mh.msg_iovlen = 1; lenRcvBuf = recvmsg(lstn->sock, &mh, 0); + STATSCOUNTER_INC(pWrkr->ctrCall_recvmsg, pWrkr->mutCtrCall_recvmsg); if(lenRcvBuf < 0) { if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); @@ -541,11 +560,12 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller! } + ++pWrkr->ctrMsgsRcvd; if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) { datetime.getCurrTime(&stTime, &ttGenTime); } - CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, pRcvBuf, lenRcvBuf, &stTime, + CHKiRet(processPacket(pWrkr->pThrd, lstn, frominetPrev, pbIsPermitted, pRcvBuf, lenRcvBuf, &stTime, ttGenTime, &frominet, mh.msg_namelen, &multiSub)); } @@ -685,7 +705,7 @@ finalize_it: */ #if defined(HAVE_EPOLL_CREATE1) || defined(HAVE_EPOLL_CREATE) #define NUM_EPOLL_EVENTS 10 -rsRetVal rcvMainLoop(thrdInfo_t *pThrd) +rsRetVal rcvMainLoop(struct wrkrInfo_s *pWrkr) { DEFiRet; int nfds; @@ -748,11 +768,11 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1); DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds); - if(pThrd->bShallStop == RSTRUE) + if(pWrkr->pThrd->bShallStop == RSTRUE) break; /* terminate input! */ for(i = 0 ; i < nfds ; ++i) { - processSocket(pThrd, currEvt[i].data.ptr, &frominetPrev, &bIsPermitted); + processSocket(pWrkr, currEvt[i].data.ptr, &frominetPrev, &bIsPermitted); } } @@ -764,7 +784,7 @@ finalize_it: } #else /* #if HAVE_EPOLL_CREATE1 */ /* this is the code for the select() interface */ -rsRetVal rcvMainLoop(thrdInfo_t *pThrd) +rsRetVal rcvMainLoop(thrdInfo_t *pWrkr) { DEFiRet; int maxfds; @@ -811,7 +831,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) for(lstn = lcnfRoot ; nfds && lstn != NULL ; lstn = lstn->next) { if(FD_ISSET(lstn->sock, &readfds)) { - processSocket(pThrd, lstn, &frominetPrev, &bIsPermitted); + processSocket(pWrkr, lstn, &frominetPrev, &bIsPermitted); --nfds; /* indicate we have processed one descriptor */ } } @@ -896,6 +916,7 @@ CODESTARTbeginCnfLoad pModConf->pConf = pConf; /* init our settings */ loadModConf->configSetViaV2Method = 0; + loadModConf->wrkrMax = 1; /* conservative, but least msg reordering */ loadModConf->batchSize = BATCH_SIZE_DFLT; loadModConf->iTimeRequery = TIME_REQUERY_DFLT; loadModConf->iSchedPrio = SCHED_PRIO_UNSET; @@ -913,6 +934,7 @@ ENDbeginCnfLoad BEGINsetModCnf struct cnfparamvals *pvals = NULL; int i; + int wrkrMax; CODESTARTsetModCnf pvals = nvlstGetParams(lst, &modpblk, NULL); if(pvals == NULL) { @@ -937,6 +959,16 @@ CODESTARTsetModCnf loadModConf->iSchedPrio = (int) pvals[i].val.d.n; } else if(!strcmp(modpblk.descr[i].name, "schedulingpolicy")) { loadModConf->pszSchedPolicy = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(modpblk.descr[i].name, "threads")) { + wrkrMax = (int) pvals[i].val.d.n; + if(wrkrMax > MAX_WRKR_THREADS) { + errmsg.LogError(0, RS_RET_PARAM_ERROR, "imudp: configured for %d" + "worker threads, but maximum permitted is %d", + wrkrMax, MAX_WRKR_THREADS); + loadModConf->wrkrMax = MAX_WRKR_THREADS; + } else { + loadModConf->wrkrMax = wrkrMax; + } } else { dbgprintf("imudp: program error, non-handled " "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); @@ -1038,14 +1070,27 @@ CODESTARTfreeCnf } ENDfreeCnf -/* This function is called to gather input. - * Note that sock must be non-NULL because otherwise we would not have - * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 - */ -BEGINrunInput -CODESTARTrunInput + +static void * +wrkr(void *myself) +{ + struct wrkrInfo_s *pWrkr = (struct wrkrInfo_s*) myself; +# if HAVE_PRCTL && defined PR_SET_NAME + uchar *pszDbgHdr; +# endif + uchar thrdName[32]; + + snprintf((char*)thrdName, sizeof(thrdName), "imudp(w%d)", pWrkr->id); +# if HAVE_PRCTL && defined PR_SET_NAME + /* set thread name - we ignore if the call fails, has no harsh consequences... */ + if(prctl(PR_SET_NAME, thrdName, 0, 0, 0) != 0) { + DBGPRINTF("prctl failed, not setting thread name for '%s'\n", thrdName); + } +# endif + dbgOutputTID((char*)thrdName); + /* Note well: the setting of scheduling parameters will not work - * when we dropped privileges (if the user is not sufficently + * when we dropped privileges (if the user is not sufficiently * privileged, of course). Howerver, we can't change the * scheduling params in PrePrivDrop(), as at that point our thread * is not yet created. So at least as an interim solution, we do @@ -1053,7 +1098,45 @@ CODESTARTrunInput * privileges within the same instance. */ setSchedParams(runModConf); - iRet = rcvMainLoop(pThrd); + + /* support statistics gathering */ + statsobj.Construct(&(pWrkr->stats)); + statsobj.SetName(pWrkr->stats, thrdName); + STATSCOUNTER_INIT(pWrkr->ctrCall_recvmmsg, pWrkr->mutCtrCall_recvmmsg); + statsobj.AddCounter(pWrkr->stats, UCHAR_CONSTANT("called.recvmmsg"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkr->ctrCall_recvmmsg)); + STATSCOUNTER_INIT(pWrkr->ctrCall_recvmsg, pWrkr->mutCtrCall_recvmsg); + statsobj.AddCounter(pWrkr->stats, UCHAR_CONSTANT("called.recvmsg"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkr->ctrCall_recvmsg)); + STATSCOUNTER_INIT(pWrkr->ctrMsgsRcvd, pWrkr->mutCtrMsgsRcvd); + statsobj.AddCounter(pWrkr->stats, UCHAR_CONSTANT("msgs.received"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkr->ctrMsgsRcvd)); + statsobj.ConstructFinalize(pWrkr->stats); + + rcvMainLoop(pWrkr); + + /* cleanup */ + return NULL; +} + +/* This function is called to gather input. + * In essence, it just starts the pool of workers. + */ +BEGINrunInput + int i; + pthread_attr_t wrkrThrdAttr; +CODESTARTrunInput + pthread_attr_init(&wrkrThrdAttr); + pthread_attr_setstacksize(&wrkrThrdAttr, 4096*1024); + for(i = 0 ; i < runModConf->wrkrMax ; ++i) { + wrkrInfo[i].pThrd = pThrd; + wrkrInfo[i].id = i; + pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i])); + } + pthread_attr_destroy(&wrkrThrdAttr); + for(i = 0 ; i < runModConf->wrkrMax ; ++i) { + pthread_join(wrkrInfo[i].tid, NULL); + } ENDrunInput |