diff options
Diffstat (limited to 'plugins/imudp')
-rw-r--r-- | plugins/imudp/Makefile.am | 2 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 500 |
2 files changed, 372 insertions, 130 deletions
diff --git a/plugins/imudp/Makefile.am b/plugins/imudp/Makefile.am index 517b1287..bc64b8c8 100644 --- a/plugins/imudp/Makefile.am +++ b/plugins/imudp/Makefile.am @@ -3,4 +3,4 @@ pkglib_LTLIBRARIES = imudp.la imudp_la_SOURCES = imudp.c imudp_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) imudp_la_LDFLAGS = -module -avoid-version -imudp_la_LIBADD = +imudp_la_LIBADD = $(IMUDP_LIBS) diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index d76f3544..0db6bf9a 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -6,7 +6,7 @@ * * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -26,12 +26,19 @@ * A copy of the GPL can be found in the file "COPYING" in this distribution. */ #include "config.h" +#include <stdio.h> #include <stdlib.h> #include <assert.h> #include <string.h> #include <errno.h> #include <unistd.h> #include <netdb.h> +#if HAVE_SYS_EPOLL_H +# include <sys/epoll.h> +#endif +#ifdef HAVE_SCHED_H +# include <sched.h> +#endif #include "rsyslog.h" #include "dirty.h" #include "net.h" @@ -44,10 +51,12 @@ #include "parser.h" #include "datetime.h" #include "prop.h" +#include "ruleset.h" +#include "statsobj.h" #include "unicode-helper.h" -#include "unlimited_select.h" MODULE_TYPE_INPUT +MODULE_TYPE_NOKEEP /* defines */ @@ -58,26 +67,128 @@ DEFobjCurrIf(glbl) DEFobjCurrIf(net) DEFobjCurrIf(datetime) DEFobjCurrIf(prop) +DEFobjCurrIf(ruleset) +DEFobjCurrIf(statsobj) + + +static struct lstn_s { + struct lstn_s *next; + int sock; /* socket */ + ruleset_t *pRuleset; /* bound ruleset */ + statsobj_t *stats; /* listener stats */ + STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) +} *lcnfRoot = NULL, *lcnfLast = NULL; +static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */ static int iMaxLine; /* maximum UDP message size supported */ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitted sender was last discarded * This shall prevent remote DoS when the "discard on disallowed sender" * message is configured to be logged on occurance of such a case. */ -static int *udpLstnSocks = NULL; /* Internet datagram sockets, first element is nbr of elements - * read-only after init(), but beware of restart! */ + static uchar *pszBindAddr = NULL; /* IP to bind socket to */ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc * it so that we can check available memory in willRun() and request * termination if we can not get it. -- rgerhards, 2007-12-27 */ static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */ -// TODO: static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ +static uchar *pszSchedPolicy = NULL; /* scheduling policy string */ +static int iSchedPolicy; /* scheduling policy as SCHED_xxx */ +static int iSchedPrio; /* scheduling priority */ +static int seen_iSchedPrio = 0; /* have we seen scheduling priority in the config file? */ +static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ #define TIME_REQUERY_DFLT 2 static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */ /* config settings */ +static rsRetVal check_scheduling_priority(int report_error) +{ + DEFiRet; + +#ifdef HAVE_SCHED_GET_PRIORITY_MAX + if (iSchedPrio < sched_get_priority_min(iSchedPolicy) || + iSchedPrio > sched_get_priority_max(iSchedPolicy)) { + if (report_error) + errmsg.LogError(errno, NO_ERRCODE, + "imudp: scheduling priority %d out of range (%d - %d)" + " for scheduling policy '%s' - ignoring settings", + iSchedPrio, + sched_get_priority_min(iSchedPolicy), + sched_get_priority_max(iSchedPolicy), + pszSchedPolicy); + ABORT_FINALIZE(RS_RET_VALIDATION_RUN); + } +#endif + +finalize_it: + RETiRet; +} + +/* Set scheduling priority in the supplied variable (will be iSchedPrio) + * and record that we have seen the directive (in seen_iSchedPrio). + */ +static rsRetVal set_scheduling_priority(void *pVal, int value) +{ + DEFiRet; + + if (seen_iSchedPrio) { + errmsg.LogError(0, NO_ERRCODE, "directive already seen"); + ABORT_FINALIZE(RS_RET_VALIDATION_RUN); + } + *(int *)pVal = value; + seen_iSchedPrio = 1; + if (pszSchedPolicy != NULL) + CHKiRet(check_scheduling_priority(1)); + +finalize_it: + RETiRet; +} + +/* Set scheduling policy in iSchedPolicy */ +static rsRetVal set_scheduling_policy(void *pVal, uchar *pNewVal) +{ + int have_sched_policy = 0; + DEFiRet; + + if (pszSchedPolicy != NULL) { + errmsg.LogError(0, NO_ERRCODE, "directive already seen"); + ABORT_FINALIZE(RS_RET_VALIDATION_RUN); + } + *((uchar**)pVal) = pNewVal; /* pVal is pszSchedPolicy */ + if (0) { /* trick to use conditional compilation */ +#ifdef SCHED_FIFO + } else if (!strcasecmp((char*)pszSchedPolicy, "fifo")) { + iSchedPolicy = SCHED_FIFO; + have_sched_policy = 1; +#endif +#ifdef SCHED_RR + } else if (!strcasecmp((char*)pszSchedPolicy, "rr")) { + iSchedPolicy = SCHED_RR; + have_sched_policy = 1; +#endif +#ifdef SCHED_OTHER + } else if (!strcasecmp((char*)pszSchedPolicy, "other")) { + iSchedPolicy = SCHED_OTHER; + have_sched_policy = 1; +#endif + } else { + errmsg.LogError(errno, NO_ERRCODE, + "imudp: invalid scheduling policy '%s' " + "- ignoring setting", pszSchedPolicy); + } + if (have_sched_policy == 0) { + free(pszSchedPolicy); + pszSchedPolicy = NULL; + ABORT_FINALIZE(RS_RET_VALIDATION_RUN); + } + if (seen_iSchedPrio) + CHKiRet(check_scheduling_priority(1)); + +finalize_it: + RETiRet; +} + /* This function is called when a new listener shall be added. It takes * the configured parameters, tries to bind the socket and, if that @@ -89,8 +200,11 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) DEFiRet; uchar *bindAddr; int *newSocks; - int *tmpSocks; - int iSrc, iDst; + int iSrc; + struct lstn_s *newlcnfinfo; + uchar *bindName; + uchar *port; + uchar statname[64]; /* check which address to bind to. We could do this more compact, but have not * done so in order to make the code more readable. -- rgerhards, 2007-12-27 @@ -101,48 +215,48 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) bindAddr = NULL; else bindAddr = pszBindAddr; + bindName = (bindAddr == NULL) ? (uchar*)"*" : bindAddr; - DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", - (bindAddr == NULL) ? (uchar*)"*" : bindAddr, pNewVal); + DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", bindName, pNewVal); - newSocks = net.create_udp_socket(bindAddr, (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal, 1); + port = (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal; + newSocks = net.create_udp_socket(bindAddr, port, 1); if(newSocks != NULL) { /* we now need to add the new sockets to the existing set */ - if(udpLstnSocks == NULL) { - /* esay, we can just replace it */ - udpLstnSocks = newSocks; - } else { - /* we need to add them */ - if((tmpSocks = malloc(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0]))) == NULL) { - dbgprintf("out of memory trying to allocate udp listen socket array\n"); - /* in this case, we discard the new sockets but continue with what we - * already have - */ - free(newSocks); - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } else { - /* ready to copy */ - iDst = 1; - for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc) - tmpSocks[iDst++] = udpLstnSocks[iSrc]; - for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) - tmpSocks[iDst++] = newSocks[iSrc]; - tmpSocks[0] = udpLstnSocks[0] + newSocks[0]; - free(newSocks); - free(udpLstnSocks); - udpLstnSocks = tmpSocks; - } + /* ready to copy */ + for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) { + CHKmalloc(newlcnfinfo = (struct lstn_s*) MALLOC(sizeof(struct lstn_s))); + newlcnfinfo->next = NULL; + newlcnfinfo->sock = newSocks[iSrc]; + newlcnfinfo->pRuleset = pBindRuleset; + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&(newlcnfinfo->stats))); + snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname)); + CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &(newlcnfinfo->ctrSubmit))); + CHKiRet(statsobj.ConstructFinalize(newlcnfinfo->stats)); + /* link to list. Order must be preserved to take care for + * conflicting matches. + */ + if(lcnfRoot == NULL) + lcnfRoot = newlcnfinfo; + if(lcnfLast == NULL) + lcnfLast = newlcnfinfo; + else + lcnfLast->next = newlcnfinfo; } } finalize_it: free(pNewVal); /* in any case, this is no longer needed */ + free(newSocks); RETiRet; } -#if 0 /* TODO: implement when tehre is time, requires restructure of socket array! */ /* accept a new ruleset to bind. Checks if it exists and complains, if not */ static rsRetVal setRuleset(void __attribute__((unused)) *pVal, uchar *pszName) @@ -163,7 +277,6 @@ finalize_it: free(pszName); /* no longer needed */ RETiRet; } -#endif /* This function is a helper to runInput. I have extracted it @@ -181,8 +294,7 @@ finalize_it: * on scheduling order. -- rgerhards, 2008-10-02 */ static inline rsRetVal -processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, - uchar *fromHost, uchar *fromHostFQDN, uchar *fromHostIP) +processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) { DEFiRet; int iNbrTimeUsed; @@ -196,17 +308,20 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, prop_t *propFromHostIP = NULL; char errStr[1024]; + assert(pThrd != NULL); iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a bad receive, done below in the body */ + if(pThrd->bShallStop == TRUE) + ABORT_FINALIZE(RS_RET_FORCE_TERM); socklen = sizeof(struct sockaddr_storage); - lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); + lenRcvBuf = recvfrom(lstn->sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); if(lenRcvBuf < 0) { if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet"); } - ABORT_FINALIZE(RS_RET_ERR); + ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller! } if(lenRcvBuf == 0) @@ -214,37 +329,39 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, /* if we reach this point, we had a good receive and can process the packet received */ /* check if we have a different sender than before, if so, we need to query some new values */ - if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) { - CHKiRet(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP)); - memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */ - /* Here we check if a host is permitted to send us - * syslog messages. If it isn't, we do not further - * process the message but log a warning (if we are - * configured to do this). - * rgerhards, 2005-09-26 - */ - *pbIsPermitted = net.isAllowedSender((uchar*)"UDP", - (struct sockaddr *)&frominet, (char*)fromHostFQDN); - - if(!*pbIsPermitted) { - DBGPRINTF("%s is not an allowed sender\n", (char*)fromHostFQDN); - if(glbl.GetOption_DisallowWarning) { - time_t tt; - - time(&tt); - if(tt > ttLastDiscard + 60) { - ttLastDiscard = tt; - errmsg.LogError(0, NO_ERRCODE, - "UDP message from disallowed sender %s discarded", - (char*)fromHost); + if(bDoACLCheck) { + if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) { + memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */ + /* Here we check if a host is permitted to send us syslog messages. If it isn't, + * we do not further process the message but log a warning (if we are + * configured to do this). However, if the check would require name resolution, + * it is postponed to the main queue. See also my blog post at + * http://blog.gerhards.net/2009/11/acls-imudp-and-accepting-messages.html + * rgerhards, 2009-11-16 + */ + *pbIsPermitted = net.isAllowedSender2((uchar*)"UDP", + (struct sockaddr *)&frominet, "", 0); + + if(*pbIsPermitted == 0) { + DBGPRINTF("msg is not from an allowed sender\n"); + if(glbl.GetOption_DisallowWarning) { + time_t tt; + datetime.GetTime(&tt); + if(tt > ttLastDiscard + 60) { + ttLastDiscard = tt; + errmsg.LogError(0, NO_ERRCODE, + "UDP message from disallowed sender discarded"); + } } } } + } else { + *pbIsPermitted = 1; /* no check -> everything permitted */ } - DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf); + DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); - if(*pbIsPermitted) { + if(*pbIsPermitted != 0) { if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { datetime.getCurrTime(&stTime, &ttGenTime); } @@ -252,12 +369,14 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); MsgSetInputName(pMsg, pInputName); + MsgSetRuleset(pMsg, lstn->pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); - pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; - pMsg->bParseHOSTNAME = 1; - MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost); - CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP)); + pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; + if(*pbIsPermitted == 2) + pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ + CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); CHKiRet(submitMsg(pMsg)); + STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); } } @@ -270,88 +389,195 @@ finalize_it: RETiRet; } +static void set_thread_schedparam(void) +{ + struct sched_param sparam; + + if (pszSchedPolicy != NULL && seen_iSchedPrio == 0) { + errmsg.LogError(0, NO_ERRCODE, + "imudp: scheduling policy set, but without priority - ignoring settings"); + } else if (pszSchedPolicy == NULL && seen_iSchedPrio != 0) { + errmsg.LogError(0, NO_ERRCODE, + "imudp: scheduling priority set, but without policy - ignoring settings"); + } else if (pszSchedPolicy != NULL && seen_iSchedPrio != 0 && + check_scheduling_priority(0) == 0) { +#ifndef HAVE_PTHREAD_SETSCHEDPARAM + errmsg.LogError(0, NO_ERRCODE, + "imudp: cannot set thread scheduling policy, " + "pthread_setschedparam() not available"); +#else + int err; + + memset(&sparam, 0, sizeof sparam); + sparam.sched_priority = iSchedPrio; + dbgprintf("imudp trying to set sched policy to '%s', prio %d\n", + pszSchedPolicy, iSchedPrio); + err = pthread_setschedparam(pthread_self(), iSchedPolicy, &sparam); + if (err != 0) { + errmsg.LogError(err, NO_ERRCODE, "imudp: pthread_setschedparam() failed"); + } +#endif + } -/* This function is called to gather input. - * Note that udpLstnSocks must be non-NULL because otherwise we would not have - * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 - * rgerhards, 2008-10-07: I have implemented a very simple, yet in most cases probably - * highly efficient "name caching". Before querying a name, I now check if the name to be - * queried is the same as the one queried in the last message processed. If that is the - * case, we can simple re-use the previous value. This algorithm works quite well with - * few sender, especially if they emit messages in bursts. The more sender and the - * more intermixed messages arrive, the less this algorithm works, but the overhead - * is so minimal (a simple memory compare and move) that this does not hurt. Even - * with a real name lookup cache, this optimization here is useful as it is quicker - * than even a cache lookup). + if (pszSchedPolicy != NULL) { + free(pszSchedPolicy); + pszSchedPolicy = NULL; + } +} + +/* This function implements the main reception loop. Depending on the environment, + * we either use the traditional (but slower) select() or the Linux-specific epoll() + * interface. ./configure settings control which one is used. + * rgerhards, 2009-09-09 */ -BEGINrunInput - int maxfds; +#if defined(HAVE_EPOLL_CREATE1) || defined(HAVE_EPOLL_CREATE) +#define NUM_EPOLL_EVENTS 10 +rsRetVal rcvMainLoop(thrdInfo_t *pThrd) +{ + DEFiRet; int nfds; + int efd; int i; struct sockaddr_storage frominetPrev; int bIsPermitted; - uchar fromHost[NI_MAXHOST]; - uchar fromHostIP[NI_MAXHOST]; - uchar fromHostFQDN[NI_MAXHOST]; -#ifdef USE_UNLIMITED_SELECT - fd_set *pReadfds = malloc(glbl.GetFdSetSize()); -#else - fd_set readfds; - fd_set *pReadfds = &readfds; -#endif + struct epoll_event *udpEPollEvt = NULL; + struct epoll_event currEvt[NUM_EPOLL_EVENTS]; + char errStr[1024]; + struct lstn_s *lstn; + int nLstn; -CODESTARTrunInput /* start "name caching" algo by making sure the previous system indicator * is invalidated. */ + set_thread_schedparam(); bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); - /* this is an endless loop - it is terminated when the thread is - * signalled to do so. This, however, is handled by the framework, - * right into the sleep below. + + /* count num listeners -- do it here in order to avoid inconsistency */ + nLstn = 0; + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) + ++nLstn; + CHKmalloc(udpEPollEvt = calloc(nLstn, sizeof(struct epoll_event))); + +#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + DBGPRINTF("imudp uses epoll_create1()\n"); + efd = epoll_create1(EPOLL_CLOEXEC); + if(efd < 0 && errno == ENOSYS) +#endif + { + DBGPRINTF("imudp uses epoll_create()\n"); + efd = epoll_create(NUM_EPOLL_EVENTS); + } + + if(efd < 0) { + DBGPRINTF("epoll_create1() could not create fd\n"); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + + /* fill the epoll set - we need to do this only once, as the set + * can not change dyamically. */ + i = 0; + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) { + if(lstn->sock != -1) { + udpEPollEvt[i].events = EPOLLIN | EPOLLET; + udpEPollEvt[i].data.u64 = (long long unsigned) lstn; + if(epoll_ctl(efd, EPOLL_CTL_ADD, lstn->sock, &(udpEPollEvt[i])) < 0) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n", + lstn->sock, errStr); + } + } + i++; + } + while(1) { - /* Add the Unix Domain Sockets to the list of read - * descriptors. - * rgerhards 2005-08-01: we must now check if there are - * any local sockets to listen to at all. If the -o option - * is given without -a, we do not need to listen at all.. + /* wait for io to become ready */ + nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1); + DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds); + + if(pThrd->bShallStop == TRUE) + break; /* terminate input! */ + + for(i = 0 ; i < nfds ; ++i) { + processSocket(pThrd, (struct lstn_s*)currEvt[i].data.u64, &frominetPrev, &bIsPermitted); + } + } + +finalize_it: + if(udpEPollEvt != NULL) + free(udpEPollEvt); + + RETiRet; +} +#else /* #if HAVE_EPOLL_CREATE1 */ +/* this is the code for the select() interface */ +rsRetVal rcvMainLoop(thrdInfo_t *pThrd) +{ + DEFiRet; + int maxfds; + int nfds; + fd_set readfds; + struct sockaddr_storage frominetPrev; + int bIsPermitted; + struct lstn_s *lstn; + + /* start "name caching" algo by making sure the previous system indicator + * is invalidated. + */ + set_thread_schedparam(); + bIsPermitted = 0; + memset(&frominetPrev, 0, sizeof(frominetPrev)); + DBGPRINTF("imudp uses select()\n"); + + while(1) { + /* Add the Unix Domain Sockets to the list of read descriptors. */ maxfds = 0; - FD_ZERO (pReadfds); + FD_ZERO(&readfds); /* Add the UDP listen sockets to the list of read descriptors. */ - for (i = 0; i < *udpLstnSocks; i++) { - if (udpLstnSocks[i+1] != -1) { + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) { + if (lstn->sock != -1) { if(Debug) - net.debugListenInfo(udpLstnSocks[i+1], "UDP"); - FD_SET(udpLstnSocks[i+1], pReadfds); - if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1]; + net.debugListenInfo(lstn->sock, "UDP"); + FD_SET(lstn->sock, &readfds); + if(lstn->sock>maxfds) maxfds=lstn->sock; } } if(Debug) { dbgprintf("--------imUDP calling select, active file descriptors (max %d): ", maxfds); for (nfds = 0; nfds <= maxfds; ++nfds) - if ( FD_ISSET(nfds, pReadfds) ) + if(FD_ISSET(nfds, &readfds)) dbgprintf("%d ", nfds); dbgprintf("\n"); } /* wait for io to become ready */ - nfds = select(maxfds+1, (fd_set *) pReadfds, NULL, NULL, NULL); + nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL); + if(glbl.GetGlobalInputTermState() == 1) + break; /* terminate input! */ - for(i = 0; nfds && i < *udpLstnSocks; i++) { - if(FD_ISSET(udpLstnSocks[i+1], pReadfds)) { - processSocket(udpLstnSocks[i+1], &frominetPrev, &bIsPermitted, - fromHost, fromHostFQDN, fromHostIP); + for(lstn = lcnfRoot ; nfds && lstn != NULL ; lstn = lstn->next) { + if(FD_ISSET(lstn->sock, &readfds)) { + processSocket(pThrd, lstn, &frominetPrev, &bIsPermitted); --nfds; /* indicate we have processed one descriptor */ } } /* end of a run, back to loop for next recv() */ } - freeFdSet(pReadfds); - return iRet; + RETiRet; +} +#endif /* #if HAVE_EPOLL_CREATE1 */ + +/* This function is called to gather input. + * Note that sock must be non-NULL because otherwise we would not have + * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 + */ +BEGINrunInput +CODESTARTrunInput + iRet = rcvMainLoop(pThrd); ENDrunInput @@ -364,28 +590,34 @@ CODESTARTwillRun CHKiRet(prop.ConstructFinalize(pInputName)); net.PrintAllowedSenders(1); /* UDP */ + net.HasRestrictions(UCHAR_CONSTANT("UDP"), &bDoACLCheck); /* UDP */ /* if we could not set up any listners, there is no point in running... */ - if(udpLstnSocks == NULL) + if(lcnfRoot == NULL) { + DBGPRINTF("imudp: no listeners configured, will not run\n"); ABORT_FINALIZE(RS_RET_NO_RUN); + } iMaxLine = glbl.GetMaxLine(); - if((pRcvBuf = malloc((iMaxLine + 1) * sizeof(char))) == NULL) { - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } + CHKmalloc(pRcvBuf = MALLOC((iMaxLine + 1) * sizeof(char))); finalize_it: ENDwillRun BEGINafterRun + struct lstn_s *lstn, *lstnDel; CODESTARTafterRun /* do cleanup here */ net.clearAllowedSenders((uchar*)"UDP"); - if(udpLstnSocks != NULL) { - net.closeUDPListenSockets(udpLstnSocks); - udpLstnSocks = NULL; + for(lstn = lcnfRoot ; lstn != NULL ; ) { + statsobj.Destruct(&(lstn->stats)); + close(lstn->sock); + lstnDel = lstn; + lstn = lstn->next; + free(lstnDel); } + lcnfRoot = lcnfLast = NULL; if(pRcvBuf != NULL) { free(pRcvBuf); pRcvBuf = NULL; @@ -400,15 +632,25 @@ CODESTARTmodExit /* release what we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); objRelease(datetime, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); + objRelease(ruleset, CORE_COMPONENT); objRelease(net, LM_NET_FILENAME); ENDmodExit +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) @@ -417,10 +659,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a free(pszBindAddr); pszBindAddr = NULL; } - if(udpLstnSocks != NULL) { - net.closeUDPListenSockets(udpLstnSocks); - udpLstnSocks = NULL; - } iTimeRequery = TIME_REQUERY_DFLT;/* the default is to query only every second time */ return RS_RET_OK; } @@ -432,19 +670,23 @@ CODESTARTmodInit CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(ruleset, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); /* register config file handlers */ - /* TODO: add - but this requires more changes, no time right now... - CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverbindruleset", 0, eCmdHdlrGetWord, + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputudpserverbindruleset", 0, eCmdHdlrGetWord, setRuleset, NULL, STD_LOADABLE_MODULE_ID)); - */ CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverrun", 0, eCmdHdlrGetWord, addListner, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord, NULL, &pszBindAddr, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imudpschedulingpolicy", 0, eCmdHdlrGetWord, + &set_scheduling_policy, &pszSchedPolicy, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imudpschedulingpriority", 0, eCmdHdlrInt, + &set_scheduling_priority, &iSchedPrio, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpservertimerequery", 0, eCmdHdlrInt, NULL, &iTimeRequery, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, |