diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2013-10-18 16:21:27 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2013-10-18 16:21:27 +0200 |
commit | 0c0d2ee0454db1d5984d242385b23438d9609572 (patch) | |
tree | 41515f75448c0bc41f7542c0ad2df5025984d853 | |
parent | 8c25a1fe35429c82779c8eeb829ba17c0ce8b555 (diff) | |
download | rsyslog-0c0d2ee0454db1d5984d242385b23438d9609572.tar.gz rsyslog-0c0d2ee0454db1d5984d242385b23438d9609572.tar.bz2 rsyslog-0c0d2ee0454db1d5984d242385b23438d9609572.zip |
bugfix: running imupd on multiple threads lead to segfault if recvmmsg is available
-rw-r--r-- | ChangeLog | 2 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 77 |
2 files changed, 45 insertions, 34 deletions
@@ -1,5 +1,7 @@ --------------------------------------------------------------------------- Version 7.5.6 [devel] 2013-10-?? +- bugfix: running imupd on multiple threads lead to segfault if recvmmsg + is available - bugfix: segfault if re_extract() function was used and no match found - bugfix: omelasticsearch did not compile on platforms without atomic instructions diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 95f9a9a5..8753bae7 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -85,10 +85,6 @@ static struct lstn_s { STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) } *lcnfRoot = NULL, *lcnfLast = NULL; -# ifdef HAVE_RECVMMSG -static struct mmsghdr *recvmsg_mmh; -static struct iovec *recvmsg_iov; -#endif static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */ @@ -97,11 +93,6 @@ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitte * This shall prevent remote DoS when the "discard on disallowed sender" * message is configured to be logged on occurance of such a case. */ -static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc - * it so that we can check available memory in willRun() and request - * termination if we can not get it. -- rgerhards, 2007-12-27 - */ - #define BATCH_SIZE_DFLT 32 /* do not overdo, has heavy toll on memory, especially with large msgs */ #define TIME_REQUERY_DFLT 2 #define SCHED_PRIO_UNSET -12345678 /* a value that indicates that the scheduling priority has not been set */ @@ -139,6 +130,11 @@ static struct wrkrInfo_s { STATSCOUNTER_DEF(ctrCall_recvmmsg, mutCtrCall_recvmmsg) STATSCOUNTER_DEF(ctrCall_recvmsg, mutCtrCall_recvmsg) STATSCOUNTER_DEF(ctrMsgsRcvd, mutCtrMsgsRcvd) + uchar *pRcvBuf; /* receive buffer (for a single packet) */ +# ifdef HAVE_RECVMMSG + struct mmsghdr *recvmsg_mmh; + struct iovec *recvmsg_iov; +# endif } wrkrInfo[MAX_WRKR_THREADS]; struct modConfData_s { @@ -453,26 +449,26 @@ processSocket(struct wrkrInfo_s *pWrkr, struct lstn_s *lstn, struct sockaddr_sto while(1) { /* loop is terminated if we have a "bad" receive, done below in the body */ if(pWrkr->pThrd->bShallStop == RSTRUE) ABORT_FINALIZE(RS_RET_FORCE_TERM); - memset(recvmsg_iov, 0, runModConf->batchSize * sizeof(struct iovec)); - memset(recvmsg_mmh, 0, runModConf->batchSize * sizeof(struct mmsghdr)); + memset(pWrkr->recvmsg_iov, 0, runModConf->batchSize * sizeof(struct iovec)); + memset(pWrkr->recvmsg_mmh, 0, runModConf->batchSize * sizeof(struct mmsghdr)); for(i = 0 ; i < runModConf->batchSize ; ++i) { - recvmsg_iov[i].iov_base = pRcvBuf+(i*(iMaxLine+1)); - recvmsg_iov[i].iov_len = iMaxLine; - recvmsg_mmh[i].msg_hdr.msg_name = &frominet; - recvmsg_mmh[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); - recvmsg_mmh[i].msg_hdr.msg_iov = &(recvmsg_iov[i]); - recvmsg_mmh[i].msg_hdr.msg_iovlen = 1; + pWrkr->recvmsg_iov[i].iov_base = pWrkr->pRcvBuf+(i*(iMaxLine+1)); + pWrkr->recvmsg_iov[i].iov_len = iMaxLine; + pWrkr->recvmsg_mmh[i].msg_hdr.msg_name = &frominet; + pWrkr->recvmsg_mmh[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); + pWrkr->recvmsg_mmh[i].msg_hdr.msg_iov = &(pWrkr->recvmsg_iov[i]); + pWrkr->recvmsg_mmh[i].msg_hdr.msg_iovlen = 1; } - nelem = recvmmsg(lstn->sock, recvmsg_mmh, runModConf->batchSize, 0, NULL); + nelem = recvmmsg(lstn->sock, pWrkr->recvmsg_mmh, runModConf->batchSize, 0, NULL); STATSCOUNTER_INC(pWrkr->ctrCall_recvmmsg, pWrkr->mutCtrCall_recvmmsg); -dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, sizeof(errStr))); + DBGPRINTF("imudp: recvmmsg returned %d\n", nelem); if(nelem < 0 && errno == ENOSYS) { /* be careful: some versions of valgrind do not support recvmmsg()! */ DBGPRINTF("imudp: error ENOSYS on call to recvmmsg() - fall back to recvmsg\n"); - nelem = recvmsg(lstn->sock, &recvmsg_mmh[0].msg_hdr, 0); + nelem = recvmsg(lstn->sock, &(pWrkr->recvmsg_mmh[0].msg_hdr), 0); STATSCOUNTER_INC(pWrkr->ctrCall_recvmsg, pWrkr->mutCtrCall_recvmsg); if(nelem >= 0) { - recvmsg_mmh[0].msg_len = nelem; + pWrkr->recvmsg_mmh[0].msg_len = nelem; nelem = 1; } } @@ -491,9 +487,9 @@ dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, pWrkr->ctrMsgsRcvd += nelem; for(i = 0 ; i < nelem ; ++i) { - processPacket(pWrkr->pThrd, lstn, frominetPrev, pbIsPermitted, recvmsg_mmh[i].msg_hdr.msg_iov->iov_base, - recvmsg_mmh[i].msg_len, &stTime, ttGenTime, &frominet, - recvmsg_mmh[i].msg_hdr.msg_namelen, &multiSub); + processPacket(pWrkr->pThrd, lstn, frominetPrev, pbIsPermitted, pWrkr->recvmsg_mmh[i].msg_hdr.msg_iov->iov_base, + pWrkr->recvmsg_mmh[i].msg_len, &stTime, ttGenTime, &frominet, + pWrkr->recvmsg_mmh[i].msg_hdr.msg_namelen, &multiSub); } } @@ -542,7 +538,7 @@ processSocket(struct wrkrInfo_s *pWrkr, struct lstn_s *lstn, struct sockaddr_sto if(pWrkr->pThrd->bShallStop == RSTRUE) ABORT_FINALIZE(RS_RET_FORCE_TERM); memset(iov, 0, sizeof(iov)); - iov[0].iov_base = pRcvBuf; + iov[0].iov_base = pWrkr->pRcvBuf; iov[0].iov_len = iMaxLine; memset(&mh, 0, sizeof(mh)); mh.msg_name = &frominet; @@ -565,7 +561,7 @@ processSocket(struct wrkrInfo_s *pWrkr, struct lstn_s *lstn, struct sockaddr_sto datetime.getCurrTime(&stTime, &ttGenTime); } - CHKiRet(processPacket(pWrkr->pThrd, lstn, frominetPrev, pbIsPermitted, pRcvBuf, lenRcvBuf, &stTime, + CHKiRet(processPacket(pWrkr->pThrd, lstn, frominetPrev, pbIsPermitted, pWrkr->pRcvBuf, lenRcvBuf, &stTime, ttGenTime, &frominet, mh.msg_namelen, &multiSub)); } @@ -1040,6 +1036,7 @@ ENDactivateCnfPrePrivDrop BEGINactivateCnf + int i; int lenRcvBuf; CODESTARTactivateCnf /* caching various settings */ @@ -1047,11 +1044,15 @@ CODESTARTactivateCnf lenRcvBuf = (iMaxLine + 1) * sizeof(char); # ifdef HAVE_RECVMMSG lenRcvBuf *= runModConf->batchSize; - CHKmalloc(recvmsg_iov = MALLOC(runModConf->batchSize * sizeof(struct iovec))); - CHKmalloc(recvmsg_mmh = MALLOC(runModConf->batchSize * sizeof(struct mmsghdr))); # endif -dbgprintf("DDDD: sizing for batch size %d\n", runModConf->batchSize); - CHKmalloc(pRcvBuf = MALLOC(lenRcvBuf)); + for(i = 0 ; i < runModConf->wrkrMax ; ++i) { +# ifdef HAVE_RECVMMSG + CHKmalloc(wrkrInfo[i].recvmsg_iov = MALLOC(runModConf->batchSize * sizeof(struct iovec))); + CHKmalloc(wrkrInfo[i].recvmsg_mmh = MALLOC(runModConf->batchSize * sizeof(struct mmsghdr))); +# endif + CHKmalloc(wrkrInfo[i].pRcvBuf = MALLOC(lenRcvBuf)); + wrkrInfo[i].id = i; + } finalize_it: ENDactivateCnf @@ -1132,7 +1133,6 @@ CODESTARTrunInput pthread_attr_setstacksize(&wrkrThrdAttr, 4096*1024); for(i = 0 ; i < runModConf->wrkrMax - 1 ; ++i) { wrkrInfo[i].pThrd = pThrd; - wrkrInfo[i].id = i; pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i])); } pthread_attr_destroy(&wrkrThrdAttr); @@ -1157,6 +1157,7 @@ ENDwillRun BEGINafterRun struct lstn_s *lstn, *lstnDel; + int i; CODESTARTafterRun /* do cleanup here */ net.clearAllowedSenders((uchar*)"UDP"); @@ -1170,9 +1171,12 @@ CODESTARTafterRun free(lstnDel); } lcnfRoot = lcnfLast = NULL; - if(pRcvBuf != NULL) { - free(pRcvBuf); - pRcvBuf = NULL; + for(i = 0 ; i < runModConf->wrkrMax ; ++i) { +# ifdef HAVE_RECVMMSG + free(wrkrInfo[i].recvmsg_iov); + free(wrkrInfo[i].recvmsg_mmh); +# endif + free(wrkrInfo[i].pRcvBuf); } ENDafterRun @@ -1233,6 +1237,11 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(ruleset, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); + DBGPRINTF("imudp: version %s initializing\n", VERSION); +# ifdef HAVE_RECVMMSG + DBGPRINTF("imdup: support for recvmmsg() present\n"); +# endif + /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputudpserverbindruleset", 0, eCmdHdlrGetWord, NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID)); |