From c2757103c99769ff6fa9aaeeaba97d7839ecbb2d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 9 Sep 2009 14:38:33 +0200 Subject: changed imudp to utilize epoll(), where available. This shall provide slightly better performance (just slightly because we called select() rather infrequently on a busy system). --- plugins/imudp/imudp.c | 121 +++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 101 insertions(+), 20 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 0a8920f5..f1a720bc 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -32,6 +32,9 @@ #include #include #include +#if HAVE_SYS_EPOLL_H +# include +#endif #include "rsyslog.h" #include "dirty.h" #include "net.h" @@ -113,7 +116,7 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) } else { /* we need to add them */ if((tmpSocks = malloc(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0]))) == NULL) { - dbgprintf("out of memory trying to allocate udp listen socket array\n"); + DBGPRINTF("out of memory trying to allocate udp listen socket array\n"); /* in this case, we discard the new sockets but continue with what we * already have */ @@ -205,7 +208,7 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet"); } - ABORT_FINALIZE(RS_RET_ERR); + ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller! } /* if we reach this point, we had a good receive and can process the packet received */ @@ -267,22 +270,19 @@ finalize_it: } -/* This function is called to gather input. - * Note that udpLstnSocks must be non-NULL because otherwise we would not have - * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 - * rgerhards, 2008-10-07: I have implemented a very simple, yet in most cases probably - * highly efficient "name caching". Before querying a name, I now check if the name to be - * queried is the same as the one queried in the last message processed. If that is the - * case, we can simple re-use the previous value. This algorithm works quite well with - * few sender, especially if they emit messages in bursts. The more sender and the - * more intermixed messages arrive, the less this algorithm works, but the overhead - * is so minimal (a simple memory compare and move) that this does not hurt. Even - * with a real name lookup cache, this optimization here is useful as it is quicker - * than even a cache lookup). +/* This function implements the main reception loop. Depending on the environment, + * we either use the traditional (but slower) select() or the Linux-specific epoll() + * interface. ./configure settings control which one is used. + * rgerhards, 2009-09-09 */ -BEGINrunInput +#if HAVE_EPOLL_CREATE1 +#define NUM_EPOLL_EVENTS 10 +rsRetVal rcvMainLoop() +{ + DEFiRet; int maxfds; int nfds; + int efd; int i; fd_set readfds; struct sockaddr_storage frominetPrev; @@ -290,16 +290,82 @@ BEGINrunInput uchar fromHost[NI_MAXHOST]; uchar fromHostIP[NI_MAXHOST]; uchar fromHostFQDN[NI_MAXHOST]; -CODESTARTrunInput + struct epoll_event *udpEPollEvt = NULL; + struct epoll_event currEvt[NUM_EPOLL_EVENTS]; + char errStr[1024]; + +RUNLOG_STR("ZZZ: imudp epoll startup"); /* start "name caching" algo by making sure the previous system indicator * is invalidated. */ bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); - /* this is an endless loop - it is terminated when the thread is - * signalled to do so. This, however, is handled by the framework, - * right into the sleep below. + + CHKmalloc(udpEPollEvt = calloc(udpLstnSocks[0], sizeof(struct epoll_event))); + + efd = epoll_create1(EPOLL_CLOEXEC); + if(efd < 0) { + DBGPRINTF("epoll_create1() could not create fd\n"); + // TODO: "good" error message + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + + /* fill the epoll set - we need to do this only once, as the set + * can not change dyamically. + */ + maxfds = 0; + FD_ZERO (&readfds); + + /* Add the UDP listen sockets to the list of read descriptors. */ + for (i = 0; i < *udpLstnSocks; i++) { + if (udpLstnSocks[i+1] != -1) { + udpEPollEvt[i].events = EPOLLIN | EPOLLET; + udpEPollEvt[i].data.fd = udpLstnSocks[i+1]; + if(epoll_ctl(efd, EPOLL_CTL_ADD, udpLstnSocks[i+1], &(udpEPollEvt[i])) < 0) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n", + udpLstnSocks[i+1], errStr); + } + } + } + +RUNLOG_STR("ZZZ: done setting up epoll interface"); + while(1) { + /* wait for io to become ready */ + nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1); + DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds); + + for(i = 0 ; i < nfds ; ++i) { +dbgprintf("ZZZ: imudp processing fd %d\n", currEvt[i].data.fd); + processSocket(currEvt[i].data.fd, &frominetPrev, &bIsPermitted, + fromHost, fromHostFQDN, fromHostIP); + } + } + +finalize_it: + RETiRet; +} +#else /* #if HAVE_EPOLL_CREATE1 */ +/* this is the code for the select() interface */ +rsRetVal rcvMainLoop() +{ + DEFiRet; + int maxfds; + int nfds; + int i; + fd_set readfds; + struct sockaddr_storage frominetPrev; + int bIsPermitted; + uchar fromHost[NI_MAXHOST]; + uchar fromHostIP[NI_MAXHOST]; + uchar fromHostFQDN[NI_MAXHOST]; + + /* start "name caching" algo by making sure the previous system indicator + * is invalidated. */ + bIsPermitted = 0; + memset(&frominetPrev, 0, sizeof(frominetPrev)); + while(1) { /* Add the Unix Domain Sockets to the list of read * descriptors. @@ -342,7 +408,22 @@ CODESTARTrunInput /* end of a run, back to loop for next recv() */ } - return iRet; + RETiRet; +} +#endif /* #if HAVE_EPOLL_CREATE1 */ + +/* This function is called to gather input. + * Note that udpLstnSocks must be non-NULL because otherwise we would not have + * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 + */ +BEGINrunInput +CODESTARTrunInput + /* this is an endless loop - it is terminated when the thread is + * signalled to do so. This, however, is handled by the framework, + * right into the sleep below. + */ +RUNLOG_STR("ZZZ: imudp startup"); + iRet = rcvMainLoop(); ENDrunInput -- cgit v1.2.3 From dcc207af92d8eda54cf68a550fae02518f500f94 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 14 Sep 2009 15:26:18 +0200 Subject: removed some debugging comments --- plugins/imudp/imudp.c | 4 ---- 1 file changed, 4 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index f1a720bc..6cd7cae3 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -294,7 +294,6 @@ rsRetVal rcvMainLoop() struct epoll_event currEvt[NUM_EPOLL_EVENTS]; char errStr[1024]; -RUNLOG_STR("ZZZ: imudp epoll startup"); /* start "name caching" algo by making sure the previous system indicator * is invalidated. */ @@ -329,14 +328,12 @@ RUNLOG_STR("ZZZ: imudp epoll startup"); } } -RUNLOG_STR("ZZZ: done setting up epoll interface"); while(1) { /* wait for io to become ready */ nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1); DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds); for(i = 0 ; i < nfds ; ++i) { -dbgprintf("ZZZ: imudp processing fd %d\n", currEvt[i].data.fd); processSocket(currEvt[i].data.fd, &frominetPrev, &bIsPermitted, fromHost, fromHostFQDN, fromHostIP); } @@ -422,7 +419,6 @@ CODESTARTrunInput * signalled to do so. This, however, is handled by the framework, * right into the sleep below. */ -RUNLOG_STR("ZZZ: imudp startup"); iRet = rcvMainLoop(); ENDrunInput -- cgit v1.2.3 From ec56b763b83677d1e9cd02a7ae610caf62e902bb Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 8 Oct 2009 16:36:17 +0200 Subject: bugfix in debug system and more instrumentation to find an issue bugfix: debug string larger than 1K were improperly displayed. Max size is now 32K, and if a string is even longer it is meaningful truncated. --- plugins/imudp/imudp.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 735042a4..3fabf1a2 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -244,7 +244,8 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, } } - DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%.80s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf); + //DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%.80s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf); + DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf); if(*pbIsPermitted) { if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { -- cgit v1.2.3 From a99297d6a80ae0c3e65d5e6a81d85ef86baf66ba Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 12 Oct 2009 15:17:47 +0200 Subject: changed imuxsock to no longer use deprecated submitAndParseMsg() interface --- plugins/imudp/imudp.c | 1 - 1 file changed, 1 deletion(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 5bfbdd4a..735042a4 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -259,7 +259,6 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, pMsg->bParseHOSTNAME = 1; MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost); CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP)); -dbgprintf("XXX: submitting msg to queue\n"); CHKiRet(submitMsg(pMsg)); } } -- cgit v1.2.3 From 08fca4477cf525cada8c66d309ea1daa2eac88b2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 12 Oct 2009 17:10:04 +0200 Subject: re-enabled input thread termination handling that does avoid thread cancellation ...where possible. This provides a more reliable mode of rsyslogd termination (canceling threads my result in not properly freed resouces and potential later hangs, even though we perform proper cancel handling in our code). This is part of an effort to reduce thread cnacellation as much as possible in rsyslog. NOTE: some comments indicated that there were problems with some code that has been re-activated. Testing did not show any issues. My current assumption is that these issues were related to some other code that has been removed/changed during the previous restructuring events. In any case, if there is a shutdown issue, one should carefully look at this change here! --- plugins/imudp/imudp.c | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 735042a4..a1484e7f 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -336,6 +336,9 @@ rsRetVal rcvMainLoop() nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1); DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds); + if(glbl.GetGlobalInputTermState() == 1) + break; /* terminate input! */ + for(i = 0 ; i < nfds ; ++i) { processSocket(currEvt[i].data.fd, &frominetPrev, &bIsPermitted, fromHost, fromHostFQDN, fromHostIP); @@ -343,6 +346,9 @@ rsRetVal rcvMainLoop() } finalize_it: + if(udpEPollEvt != NULL) + free(udpEPollEvt); + RETiRet; } #else /* #if HAVE_EPOLL_CREATE1 */ -- cgit v1.2.3 From 05b0c4322bbd7621762dc4b9b3a2e1baeabdaa1f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 13 Oct 2009 07:29:11 +0200 Subject: removed no longer needed msg_t property "bParseHOSTNAME" --- plugins/imudp/imudp.c | 1 - 1 file changed, 1 deletion(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index a1484e7f..4f05cd12 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -256,7 +256,6 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, MsgSetInputName(pMsg, pInputName); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; - pMsg->bParseHOSTNAME = 1; MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost); CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP)); CHKiRet(submitMsg(pMsg)); -- cgit v1.2.3 From f7575cb9a81ed80848e21ea0cb31b6657f908f5d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 13 Oct 2009 09:08:40 +0200 Subject: added multi-ruleset support to imudp also bumped version number and corrected ChangeLog, where I merged some post 5.3.1 changes into the 5.3.1 section. --- plugins/imudp/imudp.c | 61 ++++++++++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 28 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 4f05cd12..12946c39 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -47,6 +47,7 @@ #include "parser.h" #include "datetime.h" #include "prop.h" +#include "ruleset.h" #include "unicode-helper.h" MODULE_TYPE_INPUT @@ -60,6 +61,7 @@ DEFobjCurrIf(glbl) DEFobjCurrIf(net) DEFobjCurrIf(datetime) DEFobjCurrIf(prop) +DEFobjCurrIf(ruleset) static int iMaxLine; /* maximum UDP message size supported */ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitted sender was last discarded @@ -68,13 +70,14 @@ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitte */ static int *udpLstnSocks = NULL; /* Internet datagram sockets, first element is nbr of elements * read-only after init(), but beware of restart! */ +static ruleset_t **udpRulesets = NULL; /* ruleset to be used with sockets in question (entry 0 is empty) */ static uchar *pszBindAddr = NULL; /* IP to bind socket to */ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc * it so that we can check available memory in willRun() and request * termination if we can not get it. -- rgerhards, 2007-12-27 */ static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */ -// TODO: static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ +static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ #define TIME_REQUERY_DFLT 2 static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */ @@ -93,6 +96,7 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) int *newSocks; int *tmpSocks; int iSrc, iDst; + ruleset_t **tmpRulesets; /* check which address to bind to. We could do this more compact, but have not * done so in order to make the code more readable. -- rgerhards, 2007-12-27 @@ -113,26 +117,39 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) if(udpLstnSocks == NULL) { /* esay, we can just replace it */ udpLstnSocks = newSocks; + CHKmalloc(udpRulesets = (ruleset_t**) malloc(sizeof(ruleset_t*) * (newSocks[0] + 1))); + for(iDst = 1 ; iDst < newSocks[0] ; ++iDst) + udpRulesets[iDst] = pBindRuleset; } else { /* we need to add them */ - if((tmpSocks = malloc(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0]))) == NULL) { + tmpSocks = (int*) malloc(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0])); + tmpRulesets = (ruleset_t**) malloc(sizeof(ruleset_t*) * (1 + newSocks[0] + udpLstnSocks[0])); + if(tmpSocks == NULL || tmpRulesets == NULL) { DBGPRINTF("out of memory trying to allocate udp listen socket array\n"); /* in this case, we discard the new sockets but continue with what we * already have */ free(newSocks); + free(tmpSocks); + free(tmpRulesets); ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } else { /* ready to copy */ iDst = 1; - for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc) + for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc) { tmpSocks[iDst++] = udpLstnSocks[iSrc]; - for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) + tmpRulesets[iDst++] = udpRulesets[iSrc]; + } + for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) { tmpSocks[iDst++] = newSocks[iSrc]; + tmpRulesets[iDst++] = pBindRuleset; + } tmpSocks[0] = udpLstnSocks[0] + newSocks[0]; free(newSocks); free(udpLstnSocks); udpLstnSocks = tmpSocks; + free(udpRulesets); + udpRulesets = tmpRulesets; } } } @@ -144,7 +161,6 @@ finalize_it: } -#if 0 /* TODO: implement when tehre is time, requires restructure of socket array! */ /* accept a new ruleset to bind. Checks if it exists and complains, if not */ static rsRetVal setRuleset(void __attribute__((unused)) *pVal, uchar *pszName) @@ -165,7 +181,6 @@ finalize_it: free(pszName); /* no longer needed */ RETiRet; } -#endif /* This function is a helper to runInput. I have extracted it @@ -184,7 +199,7 @@ finalize_it: */ static inline rsRetVal processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, - uchar *fromHost, uchar *fromHostFQDN, uchar *fromHostIP) + uchar *fromHost, uchar *fromHostFQDN, uchar *fromHostIP, ruleset_t *pRuleset) { DEFiRet; int iNbrTimeUsed; @@ -254,6 +269,7 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); MsgSetInputName(pMsg, pInputName); + MsgSetRuleset(pMsg, pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost); @@ -282,11 +298,9 @@ finalize_it: rsRetVal rcvMainLoop() { DEFiRet; - int maxfds; int nfds; int efd; int i; - fd_set readfds; struct sockaddr_storage frominetPrev; int bIsPermitted; uchar fromHost[NI_MAXHOST]; @@ -307,21 +321,16 @@ rsRetVal rcvMainLoop() efd = epoll_create1(EPOLL_CLOEXEC); if(efd < 0) { DBGPRINTF("epoll_create1() could not create fd\n"); - // TODO: "good" error message ABORT_FINALIZE(RS_RET_IO_ERROR); } /* fill the epoll set - we need to do this only once, as the set * can not change dyamically. */ - maxfds = 0; - FD_ZERO (&readfds); - - /* Add the UDP listen sockets to the list of read descriptors. */ for (i = 0; i < *udpLstnSocks; i++) { if (udpLstnSocks[i+1] != -1) { udpEPollEvt[i].events = EPOLLIN | EPOLLET; - udpEPollEvt[i].data.fd = udpLstnSocks[i+1]; + udpEPollEvt[i].data.u64 = i+1; if(epoll_ctl(efd, EPOLL_CTL_ADD, udpLstnSocks[i+1], &(udpEPollEvt[i])) < 0) { rs_strerror_r(errno, errStr, sizeof(errStr)); errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n", @@ -339,8 +348,8 @@ rsRetVal rcvMainLoop() break; /* terminate input! */ for(i = 0 ; i < nfds ; ++i) { - processSocket(currEvt[i].data.fd, &frominetPrev, &bIsPermitted, - fromHost, fromHostFQDN, fromHostIP); + processSocket(udpLstnSocks[currEvt[i].data.u64], &frominetPrev, &bIsPermitted, + fromHost, fromHostFQDN, fromHostIP, udpRulesets[currEvt[i].data.u64]); } } @@ -406,7 +415,7 @@ rsRetVal rcvMainLoop() for(i = 0; nfds && i < *udpLstnSocks; i++) { if(FD_ISSET(udpLstnSocks[i+1], &readfds)) { processSocket(udpLstnSocks[i+1], &frominetPrev, &bIsPermitted, - fromHost, fromHostFQDN, fromHostIP); + fromHost, fromHostFQDN, fromHostIP, udpRulesets[i+1]); --nfds; /* indicate we have processed one descriptor */ } } @@ -447,9 +456,7 @@ CODESTARTwillRun iMaxLine = glbl.GetMaxLine(); - if((pRcvBuf = malloc((iMaxLine + 1) * sizeof(char))) == NULL) { - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } + CHKmalloc(pRcvBuf = malloc((iMaxLine + 1) * sizeof(char))); finalize_it: ENDwillRun @@ -461,6 +468,8 @@ CODESTARTafterRun if(udpLstnSocks != NULL) { net.closeUDPListenSockets(udpLstnSocks); udpLstnSocks = NULL; + free(udpRulesets); + udpRulesets = NULL; } if(pRcvBuf != NULL) { free(pRcvBuf); @@ -478,6 +487,7 @@ CODESTARTmodExit objRelease(glbl, CORE_COMPONENT); objRelease(datetime, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); + objRelease(ruleset, CORE_COMPONENT); objRelease(net, LM_NET_FILENAME); ENDmodExit @@ -501,10 +511,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a free(pszBindAddr); pszBindAddr = NULL; } - if(udpLstnSocks != NULL) { - net.closeUDPListenSockets(udpLstnSocks); - udpLstnSocks = NULL; - } iTimeRequery = TIME_REQUERY_DFLT;/* the default is to query only every second time */ return RS_RET_OK; } @@ -518,13 +524,12 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(ruleset, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); /* register config file handlers */ - /* TODO: add - but this requires more changes, no time right now... - CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverbindruleset", 0, eCmdHdlrGetWord, + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputudpserverbindruleset", 0, eCmdHdlrGetWord, setRuleset, NULL, STD_LOADABLE_MODULE_ID)); - */ CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverrun", 0, eCmdHdlrGetWord, addListner, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord, -- cgit v1.2.3 From 5b1eb920914d97aee68b674459b68e99957c80d6 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 15 Oct 2009 13:40:10 +0200 Subject: improved imudp so that epoll can be used in more environments Fixed potential compile time problem if EPOLL_CLOEXEC is not available. --- plugins/imudp/imudp.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 12946c39..3159de4f 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -293,7 +293,7 @@ finalize_it: * interface. ./configure settings control which one is used. * rgerhards, 2009-09-09 */ -#if HAVE_EPOLL_CREATE1 +#if defined(HAVE_EPOLL_CREATE1) || defined(HAVE_EPOLL_CREATE) #define NUM_EPOLL_EVENTS 10 rsRetVal rcvMainLoop() { @@ -318,7 +318,13 @@ rsRetVal rcvMainLoop() CHKmalloc(udpEPollEvt = calloc(udpLstnSocks[0], sizeof(struct epoll_event))); - efd = epoll_create1(EPOLL_CLOEXEC); +# if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + DBGPRINTF("imudp uses epoll_create1()\n"); + efd = epoll_create1(EPOLL_CLOEXEC); +# else + DBGPRINTF("imudp uses epoll_create()\n"); + efd = epoll_create(); +# endif if(efd < 0) { DBGPRINTF("epoll_create1() could not create fd\n"); ABORT_FINALIZE(RS_RET_IO_ERROR); @@ -379,6 +385,7 @@ rsRetVal rcvMainLoop() */ bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); + DBGPRINTF("imudp uses select()\n"); while(1) { /* Add the Unix Domain Sockets to the list of read -- cgit v1.2.3 From e53d91ce666453b114880e0619dd8c4d40072201 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 15 Oct 2009 18:33:33 +0200 Subject: solved a recently introduced race during input thread shutdown This was introduced when we re-enabled non-cancel thread termination a few commits ago. This code has never been released as a tarball, so that is no bugfix for a release but rather a WiP regression fix and thus does not need to be mentioned in the ChangeLog. --- plugins/imudp/imudp.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 3159de4f..f8227fa7 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -295,7 +295,7 @@ finalize_it: */ #if defined(HAVE_EPOLL_CREATE1) || defined(HAVE_EPOLL_CREATE) #define NUM_EPOLL_EVENTS 10 -rsRetVal rcvMainLoop() +rsRetVal rcvMainLoop(thrdInfo_t *pThrd) { DEFiRet; int nfds; @@ -350,7 +350,7 @@ rsRetVal rcvMainLoop() nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1); DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds); - if(glbl.GetGlobalInputTermState() == 1) + if(pThrd->bShallStop == TRUE) break; /* terminate input! */ for(i = 0 ; i < nfds ; ++i) { @@ -367,7 +367,7 @@ finalize_it: } #else /* #if HAVE_EPOLL_CREATE1 */ /* this is the code for the select() interface */ -rsRetVal rcvMainLoop() +rsRetVal rcvMainLoop(thrdInfo_t *pThrd) { DEFiRet; int maxfds; @@ -443,7 +443,7 @@ CODESTARTrunInput * signalled to do so. This, however, is handled by the framework, * right into the sleep below. */ - iRet = rcvMainLoop(); + iRet = rcvMainLoop(pThrd); ENDrunInput -- cgit v1.2.3 From cd118cfcc22ea283c8d0112aeedc3f0d8b42d8a8 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 16 Oct 2009 08:41:59 +0200 Subject: bugfix: compile problem when system provided only epoll_create() I introduced that problem yesterday when I improved epoll support. --- plugins/imudp/imudp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index f8227fa7..269380cf 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -323,7 +323,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) efd = epoll_create1(EPOLL_CLOEXEC); # else DBGPRINTF("imudp uses epoll_create()\n"); - efd = epoll_create(); + efd = epoll_create(NUM_EPOLL_EVENTS); # endif if(efd < 0) { DBGPRINTF("epoll_create1() could not create fd\n"); -- cgit v1.2.3 From f3884d52628b4e43425adad3826c9e324cd60291 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 16 Oct 2009 09:18:51 +0200 Subject: ensure proper imudp shutdown even on a very busy system --- plugins/imudp/imudp.c | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 59d23adb..5a1d9e8b 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -198,7 +198,7 @@ finalize_it: * on scheduling order. -- rgerhards, 2008-10-02 */ static inline rsRetVal -processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, +processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, uchar *fromHost, uchar *fromHostFQDN, uchar *fromHostIP, ruleset_t *pRuleset) { DEFiRet; @@ -213,8 +213,11 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, prop_t *propFromHostIP = NULL; char errStr[1024]; + assert(pThrd != NULL); iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a bad receive, done below in the body */ + if(pThrd->bShallStop == TRUE) + ABORT_FINALIZE(RS_RET_FORCE_TERM); socklen = sizeof(struct sockaddr_storage); lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); if(lenRcvBuf < 0) { @@ -259,8 +262,7 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, } } - //DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%.80s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf); - DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf); + DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%.80s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf); if(*pbIsPermitted) { if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { @@ -355,7 +357,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) break; /* terminate input! */ for(i = 0 ; i < nfds ; ++i) { - processSocket(udpLstnSocks[currEvt[i].data.u64], &frominetPrev, &bIsPermitted, + processSocket(pThrd, udpLstnSocks[currEvt[i].data.u64], &frominetPrev, &bIsPermitted, fromHost, fromHostFQDN, fromHostIP, udpRulesets[currEvt[i].data.u64]); } } @@ -422,7 +424,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) for(i = 0; nfds && i < *udpLstnSocks; i++) { if(FD_ISSET(udpLstnSocks[i+1], &readfds)) { - processSocket(udpLstnSocks[i+1], &frominetPrev, &bIsPermitted, + processSocket(pThrd, udpLstnSocks[i+1], &frominetPrev, &bIsPermitted, fromHost, fromHostFQDN, fromHostIP, udpRulesets[i+1]); --nfds; /* indicate we have processed one descriptor */ } -- cgit v1.2.3 From 244ae5837c5423bb738d21c67e73ca6efe34c7a9 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 19 Oct 2009 14:05:30 +0200 Subject: fixed regression in new multi-ruleset imudp code The new code has not been released so far, so this does not fix any bug known to the "real world". --- plugins/imudp/imudp.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 5a1d9e8b..fddee513 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -136,13 +136,13 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) } else { /* ready to copy */ iDst = 1; - for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc) { - tmpSocks[iDst++] = udpLstnSocks[iSrc]; - tmpRulesets[iDst++] = udpRulesets[iSrc]; + for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc, ++iDst) { + tmpSocks[iDst] = udpLstnSocks[iSrc]; + tmpRulesets[iDst] = udpRulesets[iSrc]; } - for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) { - tmpSocks[iDst++] = newSocks[iSrc]; - tmpRulesets[iDst++] = pBindRuleset; + for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc, ++iDst) { + tmpSocks[iDst] = newSocks[iSrc]; + tmpRulesets[iDst] = pBindRuleset; } tmpSocks[0] = udpLstnSocks[0] + newSocks[0]; free(newSocks); -- cgit v1.2.3 From 2deec9a12944129c9dce9ee237d04e0a1a8b082b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 19 Oct 2009 18:02:46 +0200 Subject: bugfix: first UDP listener was incorrectly assigned its ruleset this was a regression of the recent imudp multi-ruleset enhancement bug was not in any released version --- plugins/imudp/imudp.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index fddee513..6ce8039f 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -117,8 +117,9 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) if(udpLstnSocks == NULL) { /* esay, we can just replace it */ udpLstnSocks = newSocks; +RUNLOG_VAR("%d", newSocks[0]); CHKmalloc(udpRulesets = (ruleset_t**) malloc(sizeof(ruleset_t*) * (newSocks[0] + 1))); - for(iDst = 1 ; iDst < newSocks[0] ; ++iDst) + for(iDst = 1 ; iDst <= newSocks[0] ; ++iDst) udpRulesets[iDst] = pBindRuleset; } else { /* we need to add them */ -- cgit v1.2.3 From e04e1b50025f5fa9c26abd946190dce8f797d08f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 22 Oct 2009 11:33:38 +0200 Subject: enhanced test environment (including testbench) support for enhancing probability of memory addressing failure by using non-NULL default value for malloced memory (optional, only if requested by configure option). This helps to track down some otherwise undetected issues within the testbench and is expected to be very useful in the future. --- plugins/imudp/imudp.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 6ce8039f..5fe82470 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -118,13 +118,13 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) /* esay, we can just replace it */ udpLstnSocks = newSocks; RUNLOG_VAR("%d", newSocks[0]); - CHKmalloc(udpRulesets = (ruleset_t**) malloc(sizeof(ruleset_t*) * (newSocks[0] + 1))); + CHKmalloc(udpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (newSocks[0] + 1))); for(iDst = 1 ; iDst <= newSocks[0] ; ++iDst) udpRulesets[iDst] = pBindRuleset; } else { /* we need to add them */ - tmpSocks = (int*) malloc(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0])); - tmpRulesets = (ruleset_t**) malloc(sizeof(ruleset_t*) * (1 + newSocks[0] + udpLstnSocks[0])); + tmpSocks = (int*) MALLOC(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0])); + tmpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (1 + newSocks[0] + udpLstnSocks[0])); if(tmpSocks == NULL || tmpRulesets == NULL) { DBGPRINTF("out of memory trying to allocate udp listen socket array\n"); /* in this case, we discard the new sockets but continue with what we @@ -467,7 +467,7 @@ CODESTARTwillRun iMaxLine = glbl.GetMaxLine(); - CHKmalloc(pRcvBuf = malloc((iMaxLine + 1) * sizeof(char))); + CHKmalloc(pRcvBuf = MALLOC((iMaxLine + 1) * sizeof(char))); finalize_it: ENDwillRun -- cgit v1.2.3 From 8b246de2a587454f9260ff91192d27a2e168ea2d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 12 Nov 2009 17:12:10 +0100 Subject: some light performance enhancement ...by replacing time() call with much faster (at least under linux) gettimeofday() calls. --- plugins/imudp/imudp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 5fe82470..307b684f 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -252,7 +252,7 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, if(glbl.GetOption_DisallowWarning) { time_t tt; - time(&tt); + datetime.GetTime(&tt); if(tt > ttLastDiscard + 60) { ttLastDiscard = tt; errmsg.LogError(0, NO_ERRCODE, -- cgit v1.2.3