summaryrefslogtreecommitdiffstats
path: root/plugins/imudp/imudp.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imudp/imudp.c')
-rw-r--r--plugins/imudp/imudp.c500
1 files changed, 371 insertions, 129 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index d76f3544..0db6bf9a 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -6,7 +6,7 @@
*
* File begun on 2007-12-21 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -26,12 +26,19 @@
* A copy of the GPL can be found in the file "COPYING" in this distribution.
*/
#include "config.h"
+#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <netdb.h>
+#if HAVE_SYS_EPOLL_H
+# include <sys/epoll.h>
+#endif
+#ifdef HAVE_SCHED_H
+# include <sched.h>
+#endif
#include "rsyslog.h"
#include "dirty.h"
#include "net.h"
@@ -44,10 +51,12 @@
#include "parser.h"
#include "datetime.h"
#include "prop.h"
+#include "ruleset.h"
+#include "statsobj.h"
#include "unicode-helper.h"
-#include "unlimited_select.h"
MODULE_TYPE_INPUT
+MODULE_TYPE_NOKEEP
/* defines */
@@ -58,26 +67,128 @@ DEFobjCurrIf(glbl)
DEFobjCurrIf(net)
DEFobjCurrIf(datetime)
DEFobjCurrIf(prop)
+DEFobjCurrIf(ruleset)
+DEFobjCurrIf(statsobj)
+
+
+static struct lstn_s {
+ struct lstn_s *next;
+ int sock; /* socket */
+ ruleset_t *pRuleset; /* bound ruleset */
+ statsobj_t *stats; /* listener stats */
+ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
+} *lcnfRoot = NULL, *lcnfLast = NULL;
+static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */
static int iMaxLine; /* maximum UDP message size supported */
static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitted sender was last discarded
* This shall prevent remote DoS when the "discard on disallowed sender"
* message is configured to be logged on occurance of such a case.
*/
-static int *udpLstnSocks = NULL; /* Internet datagram sockets, first element is nbr of elements
- * read-only after init(), but beware of restart! */
+
static uchar *pszBindAddr = NULL; /* IP to bind socket to */
static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc
* it so that we can check available memory in willRun() and request
* termination if we can not get it. -- rgerhards, 2007-12-27
*/
static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */
-// TODO: static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
+static uchar *pszSchedPolicy = NULL; /* scheduling policy string */
+static int iSchedPolicy; /* scheduling policy as SCHED_xxx */
+static int iSchedPrio; /* scheduling priority */
+static int seen_iSchedPrio = 0; /* have we seen scheduling priority in the config file? */
+static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
#define TIME_REQUERY_DFLT 2
static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */
/* config settings */
+static rsRetVal check_scheduling_priority(int report_error)
+{
+ DEFiRet;
+
+#ifdef HAVE_SCHED_GET_PRIORITY_MAX
+ if (iSchedPrio < sched_get_priority_min(iSchedPolicy) ||
+ iSchedPrio > sched_get_priority_max(iSchedPolicy)) {
+ if (report_error)
+ errmsg.LogError(errno, NO_ERRCODE,
+ "imudp: scheduling priority %d out of range (%d - %d)"
+ " for scheduling policy '%s' - ignoring settings",
+ iSchedPrio,
+ sched_get_priority_min(iSchedPolicy),
+ sched_get_priority_max(iSchedPolicy),
+ pszSchedPolicy);
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+#endif
+
+finalize_it:
+ RETiRet;
+}
+
+/* Set scheduling priority in the supplied variable (will be iSchedPrio)
+ * and record that we have seen the directive (in seen_iSchedPrio).
+ */
+static rsRetVal set_scheduling_priority(void *pVal, int value)
+{
+ DEFiRet;
+
+ if (seen_iSchedPrio) {
+ errmsg.LogError(0, NO_ERRCODE, "directive already seen");
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+ *(int *)pVal = value;
+ seen_iSchedPrio = 1;
+ if (pszSchedPolicy != NULL)
+ CHKiRet(check_scheduling_priority(1));
+
+finalize_it:
+ RETiRet;
+}
+
+/* Set scheduling policy in iSchedPolicy */
+static rsRetVal set_scheduling_policy(void *pVal, uchar *pNewVal)
+{
+ int have_sched_policy = 0;
+ DEFiRet;
+
+ if (pszSchedPolicy != NULL) {
+ errmsg.LogError(0, NO_ERRCODE, "directive already seen");
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+ *((uchar**)pVal) = pNewVal; /* pVal is pszSchedPolicy */
+ if (0) { /* trick to use conditional compilation */
+#ifdef SCHED_FIFO
+ } else if (!strcasecmp((char*)pszSchedPolicy, "fifo")) {
+ iSchedPolicy = SCHED_FIFO;
+ have_sched_policy = 1;
+#endif
+#ifdef SCHED_RR
+ } else if (!strcasecmp((char*)pszSchedPolicy, "rr")) {
+ iSchedPolicy = SCHED_RR;
+ have_sched_policy = 1;
+#endif
+#ifdef SCHED_OTHER
+ } else if (!strcasecmp((char*)pszSchedPolicy, "other")) {
+ iSchedPolicy = SCHED_OTHER;
+ have_sched_policy = 1;
+#endif
+ } else {
+ errmsg.LogError(errno, NO_ERRCODE,
+ "imudp: invalid scheduling policy '%s' "
+ "- ignoring setting", pszSchedPolicy);
+ }
+ if (have_sched_policy == 0) {
+ free(pszSchedPolicy);
+ pszSchedPolicy = NULL;
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+ if (seen_iSchedPrio)
+ CHKiRet(check_scheduling_priority(1));
+
+finalize_it:
+ RETiRet;
+}
+
/* This function is called when a new listener shall be added. It takes
* the configured parameters, tries to bind the socket and, if that
@@ -89,8 +200,11 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
DEFiRet;
uchar *bindAddr;
int *newSocks;
- int *tmpSocks;
- int iSrc, iDst;
+ int iSrc;
+ struct lstn_s *newlcnfinfo;
+ uchar *bindName;
+ uchar *port;
+ uchar statname[64];
/* check which address to bind to. We could do this more compact, but have not
* done so in order to make the code more readable. -- rgerhards, 2007-12-27
@@ -101,48 +215,48 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
bindAddr = NULL;
else
bindAddr = pszBindAddr;
+ bindName = (bindAddr == NULL) ? (uchar*)"*" : bindAddr;
- DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n",
- (bindAddr == NULL) ? (uchar*)"*" : bindAddr, pNewVal);
+ DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", bindName, pNewVal);
- newSocks = net.create_udp_socket(bindAddr, (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal, 1);
+ port = (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal;
+ newSocks = net.create_udp_socket(bindAddr, port, 1);
if(newSocks != NULL) {
/* we now need to add the new sockets to the existing set */
- if(udpLstnSocks == NULL) {
- /* esay, we can just replace it */
- udpLstnSocks = newSocks;
- } else {
- /* we need to add them */
- if((tmpSocks = malloc(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0]))) == NULL) {
- dbgprintf("out of memory trying to allocate udp listen socket array\n");
- /* in this case, we discard the new sockets but continue with what we
- * already have
- */
- free(newSocks);
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- } else {
- /* ready to copy */
- iDst = 1;
- for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc)
- tmpSocks[iDst++] = udpLstnSocks[iSrc];
- for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc)
- tmpSocks[iDst++] = newSocks[iSrc];
- tmpSocks[0] = udpLstnSocks[0] + newSocks[0];
- free(newSocks);
- free(udpLstnSocks);
- udpLstnSocks = tmpSocks;
- }
+ /* ready to copy */
+ for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) {
+ CHKmalloc(newlcnfinfo = (struct lstn_s*) MALLOC(sizeof(struct lstn_s)));
+ newlcnfinfo->next = NULL;
+ newlcnfinfo->sock = newSocks[iSrc];
+ newlcnfinfo->pRuleset = pBindRuleset;
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&(newlcnfinfo->stats)));
+ snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port);
+ statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname));
+ CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, &(newlcnfinfo->ctrSubmit)));
+ CHKiRet(statsobj.ConstructFinalize(newlcnfinfo->stats));
+ /* link to list. Order must be preserved to take care for
+ * conflicting matches.
+ */
+ if(lcnfRoot == NULL)
+ lcnfRoot = newlcnfinfo;
+ if(lcnfLast == NULL)
+ lcnfLast = newlcnfinfo;
+ else
+ lcnfLast->next = newlcnfinfo;
}
}
finalize_it:
free(pNewVal); /* in any case, this is no longer needed */
+ free(newSocks);
RETiRet;
}
-#if 0 /* TODO: implement when tehre is time, requires restructure of socket array! */
/* accept a new ruleset to bind. Checks if it exists and complains, if not */
static rsRetVal
setRuleset(void __attribute__((unused)) *pVal, uchar *pszName)
@@ -163,7 +277,6 @@ finalize_it:
free(pszName); /* no longer needed */
RETiRet;
}
-#endif
/* This function is a helper to runInput. I have extracted it
@@ -181,8 +294,7 @@ finalize_it:
* on scheduling order. -- rgerhards, 2008-10-02
*/
static inline rsRetVal
-processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
- uchar *fromHost, uchar *fromHostFQDN, uchar *fromHostIP)
+processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted)
{
DEFiRet;
int iNbrTimeUsed;
@@ -196,17 +308,20 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
prop_t *propFromHostIP = NULL;
char errStr[1024];
+ assert(pThrd != NULL);
iNbrTimeUsed = 0;
while(1) { /* loop is terminated if we have a bad receive, done below in the body */
+ if(pThrd->bShallStop == TRUE)
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
socklen = sizeof(struct sockaddr_storage);
- lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen);
+ lenRcvBuf = recvfrom(lstn->sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen);
if(lenRcvBuf < 0) {
if(errno != EINTR && errno != EAGAIN) {
rs_strerror_r(errno, errStr, sizeof(errStr));
DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr);
errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet");
}
- ABORT_FINALIZE(RS_RET_ERR);
+ ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller!
}
if(lenRcvBuf == 0)
@@ -214,37 +329,39 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
/* if we reach this point, we had a good receive and can process the packet received */
/* check if we have a different sender than before, if so, we need to query some new values */
- if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) {
- CHKiRet(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP));
- memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */
- /* Here we check if a host is permitted to send us
- * syslog messages. If it isn't, we do not further
- * process the message but log a warning (if we are
- * configured to do this).
- * rgerhards, 2005-09-26
- */
- *pbIsPermitted = net.isAllowedSender((uchar*)"UDP",
- (struct sockaddr *)&frominet, (char*)fromHostFQDN);
-
- if(!*pbIsPermitted) {
- DBGPRINTF("%s is not an allowed sender\n", (char*)fromHostFQDN);
- if(glbl.GetOption_DisallowWarning) {
- time_t tt;
-
- time(&tt);
- if(tt > ttLastDiscard + 60) {
- ttLastDiscard = tt;
- errmsg.LogError(0, NO_ERRCODE,
- "UDP message from disallowed sender %s discarded",
- (char*)fromHost);
+ if(bDoACLCheck) {
+ if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) {
+ memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */
+ /* Here we check if a host is permitted to send us syslog messages. If it isn't,
+ * we do not further process the message but log a warning (if we are
+ * configured to do this). However, if the check would require name resolution,
+ * it is postponed to the main queue. See also my blog post at
+ * http://blog.gerhards.net/2009/11/acls-imudp-and-accepting-messages.html
+ * rgerhards, 2009-11-16
+ */
+ *pbIsPermitted = net.isAllowedSender2((uchar*)"UDP",
+ (struct sockaddr *)&frominet, "", 0);
+
+ if(*pbIsPermitted == 0) {
+ DBGPRINTF("msg is not from an allowed sender\n");
+ if(glbl.GetOption_DisallowWarning) {
+ time_t tt;
+ datetime.GetTime(&tt);
+ if(tt > ttLastDiscard + 60) {
+ ttLastDiscard = tt;
+ errmsg.LogError(0, NO_ERRCODE,
+ "UDP message from disallowed sender discarded");
+ }
}
}
}
+ } else {
+ *pbIsPermitted = 1; /* no check -> everything permitted */
}
- DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf);
+ DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf);
- if(*pbIsPermitted) {
+ if(*pbIsPermitted != 0) {
if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) {
datetime.getCurrTime(&stTime, &ttGenTime);
}
@@ -252,12 +369,14 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime));
MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf);
MsgSetInputName(pMsg, pInputName);
+ MsgSetRuleset(pMsg, lstn->pRuleset);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
- pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
- pMsg->bParseHOSTNAME = 1;
- MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost);
- CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP));
+ pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL;
+ if(*pbIsPermitted == 2)
+ pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */
+ CHKiRet(msgSetFromSockinfo(pMsg, &frominet));
CHKiRet(submitMsg(pMsg));
+ STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit);
}
}
@@ -270,88 +389,195 @@ finalize_it:
RETiRet;
}
+static void set_thread_schedparam(void)
+{
+ struct sched_param sparam;
+
+ if (pszSchedPolicy != NULL && seen_iSchedPrio == 0) {
+ errmsg.LogError(0, NO_ERRCODE,
+ "imudp: scheduling policy set, but without priority - ignoring settings");
+ } else if (pszSchedPolicy == NULL && seen_iSchedPrio != 0) {
+ errmsg.LogError(0, NO_ERRCODE,
+ "imudp: scheduling priority set, but without policy - ignoring settings");
+ } else if (pszSchedPolicy != NULL && seen_iSchedPrio != 0 &&
+ check_scheduling_priority(0) == 0) {
+#ifndef HAVE_PTHREAD_SETSCHEDPARAM
+ errmsg.LogError(0, NO_ERRCODE,
+ "imudp: cannot set thread scheduling policy, "
+ "pthread_setschedparam() not available");
+#else
+ int err;
+
+ memset(&sparam, 0, sizeof sparam);
+ sparam.sched_priority = iSchedPrio;
+ dbgprintf("imudp trying to set sched policy to '%s', prio %d\n",
+ pszSchedPolicy, iSchedPrio);
+ err = pthread_setschedparam(pthread_self(), iSchedPolicy, &sparam);
+ if (err != 0) {
+ errmsg.LogError(err, NO_ERRCODE, "imudp: pthread_setschedparam() failed");
+ }
+#endif
+ }
-/* This function is called to gather input.
- * Note that udpLstnSocks must be non-NULL because otherwise we would not have
- * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02
- * rgerhards, 2008-10-07: I have implemented a very simple, yet in most cases probably
- * highly efficient "name caching". Before querying a name, I now check if the name to be
- * queried is the same as the one queried in the last message processed. If that is the
- * case, we can simple re-use the previous value. This algorithm works quite well with
- * few sender, especially if they emit messages in bursts. The more sender and the
- * more intermixed messages arrive, the less this algorithm works, but the overhead
- * is so minimal (a simple memory compare and move) that this does not hurt. Even
- * with a real name lookup cache, this optimization here is useful as it is quicker
- * than even a cache lookup).
+ if (pszSchedPolicy != NULL) {
+ free(pszSchedPolicy);
+ pszSchedPolicy = NULL;
+ }
+}
+
+/* This function implements the main reception loop. Depending on the environment,
+ * we either use the traditional (but slower) select() or the Linux-specific epoll()
+ * interface. ./configure settings control which one is used.
+ * rgerhards, 2009-09-09
*/
-BEGINrunInput
- int maxfds;
+#if defined(HAVE_EPOLL_CREATE1) || defined(HAVE_EPOLL_CREATE)
+#define NUM_EPOLL_EVENTS 10
+rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
+{
+ DEFiRet;
int nfds;
+ int efd;
int i;
struct sockaddr_storage frominetPrev;
int bIsPermitted;
- uchar fromHost[NI_MAXHOST];
- uchar fromHostIP[NI_MAXHOST];
- uchar fromHostFQDN[NI_MAXHOST];
-#ifdef USE_UNLIMITED_SELECT
- fd_set *pReadfds = malloc(glbl.GetFdSetSize());
-#else
- fd_set readfds;
- fd_set *pReadfds = &readfds;
-#endif
+ struct epoll_event *udpEPollEvt = NULL;
+ struct epoll_event currEvt[NUM_EPOLL_EVENTS];
+ char errStr[1024];
+ struct lstn_s *lstn;
+ int nLstn;
-CODESTARTrunInput
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
*/
+ set_thread_schedparam();
bIsPermitted = 0;
memset(&frominetPrev, 0, sizeof(frominetPrev));
- /* this is an endless loop - it is terminated when the thread is
- * signalled to do so. This, however, is handled by the framework,
- * right into the sleep below.
+
+ /* count num listeners -- do it here in order to avoid inconsistency */
+ nLstn = 0;
+ for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next)
+ ++nLstn;
+ CHKmalloc(udpEPollEvt = calloc(nLstn, sizeof(struct epoll_event)));
+
+#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
+ DBGPRINTF("imudp uses epoll_create1()\n");
+ efd = epoll_create1(EPOLL_CLOEXEC);
+ if(efd < 0 && errno == ENOSYS)
+#endif
+ {
+ DBGPRINTF("imudp uses epoll_create()\n");
+ efd = epoll_create(NUM_EPOLL_EVENTS);
+ }
+
+ if(efd < 0) {
+ DBGPRINTF("epoll_create1() could not create fd\n");
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ }
+
+ /* fill the epoll set - we need to do this only once, as the set
+ * can not change dyamically.
*/
+ i = 0;
+ for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) {
+ if(lstn->sock != -1) {
+ udpEPollEvt[i].events = EPOLLIN | EPOLLET;
+ udpEPollEvt[i].data.u64 = (long long unsigned) lstn;
+ if(epoll_ctl(efd, EPOLL_CTL_ADD, lstn->sock, &(udpEPollEvt[i])) < 0) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n",
+ lstn->sock, errStr);
+ }
+ }
+ i++;
+ }
+
while(1) {
- /* Add the Unix Domain Sockets to the list of read
- * descriptors.
- * rgerhards 2005-08-01: we must now check if there are
- * any local sockets to listen to at all. If the -o option
- * is given without -a, we do not need to listen at all..
+ /* wait for io to become ready */
+ nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1);
+ DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds);
+
+ if(pThrd->bShallStop == TRUE)
+ break; /* terminate input! */
+
+ for(i = 0 ; i < nfds ; ++i) {
+ processSocket(pThrd, (struct lstn_s*)currEvt[i].data.u64, &frominetPrev, &bIsPermitted);
+ }
+ }
+
+finalize_it:
+ if(udpEPollEvt != NULL)
+ free(udpEPollEvt);
+
+ RETiRet;
+}
+#else /* #if HAVE_EPOLL_CREATE1 */
+/* this is the code for the select() interface */
+rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
+{
+ DEFiRet;
+ int maxfds;
+ int nfds;
+ fd_set readfds;
+ struct sockaddr_storage frominetPrev;
+ int bIsPermitted;
+ struct lstn_s *lstn;
+
+ /* start "name caching" algo by making sure the previous system indicator
+ * is invalidated.
+ */
+ set_thread_schedparam();
+ bIsPermitted = 0;
+ memset(&frominetPrev, 0, sizeof(frominetPrev));
+ DBGPRINTF("imudp uses select()\n");
+
+ while(1) {
+ /* Add the Unix Domain Sockets to the list of read descriptors.
*/
maxfds = 0;
- FD_ZERO (pReadfds);
+ FD_ZERO(&readfds);
/* Add the UDP listen sockets to the list of read descriptors. */
- for (i = 0; i < *udpLstnSocks; i++) {
- if (udpLstnSocks[i+1] != -1) {
+ for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) {
+ if (lstn->sock != -1) {
if(Debug)
- net.debugListenInfo(udpLstnSocks[i+1], "UDP");
- FD_SET(udpLstnSocks[i+1], pReadfds);
- if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1];
+ net.debugListenInfo(lstn->sock, "UDP");
+ FD_SET(lstn->sock, &readfds);
+ if(lstn->sock>maxfds) maxfds=lstn->sock;
}
}
if(Debug) {
dbgprintf("--------imUDP calling select, active file descriptors (max %d): ", maxfds);
for (nfds = 0; nfds <= maxfds; ++nfds)
- if ( FD_ISSET(nfds, pReadfds) )
+ if(FD_ISSET(nfds, &readfds))
dbgprintf("%d ", nfds);
dbgprintf("\n");
}
/* wait for io to become ready */
- nfds = select(maxfds+1, (fd_set *) pReadfds, NULL, NULL, NULL);
+ nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL);
+ if(glbl.GetGlobalInputTermState() == 1)
+ break; /* terminate input! */
- for(i = 0; nfds && i < *udpLstnSocks; i++) {
- if(FD_ISSET(udpLstnSocks[i+1], pReadfds)) {
- processSocket(udpLstnSocks[i+1], &frominetPrev, &bIsPermitted,
- fromHost, fromHostFQDN, fromHostIP);
+ for(lstn = lcnfRoot ; nfds && lstn != NULL ; lstn = lstn->next) {
+ if(FD_ISSET(lstn->sock, &readfds)) {
+ processSocket(pThrd, lstn, &frominetPrev, &bIsPermitted);
--nfds; /* indicate we have processed one descriptor */
}
}
/* end of a run, back to loop for next recv() */
}
- freeFdSet(pReadfds);
- return iRet;
+ RETiRet;
+}
+#endif /* #if HAVE_EPOLL_CREATE1 */
+
+/* This function is called to gather input.
+ * Note that sock must be non-NULL because otherwise we would not have
+ * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02
+ */
+BEGINrunInput
+CODESTARTrunInput
+ iRet = rcvMainLoop(pThrd);
ENDrunInput
@@ -364,28 +590,34 @@ CODESTARTwillRun
CHKiRet(prop.ConstructFinalize(pInputName));
net.PrintAllowedSenders(1); /* UDP */
+ net.HasRestrictions(UCHAR_CONSTANT("UDP"), &bDoACLCheck); /* UDP */
/* if we could not set up any listners, there is no point in running... */
- if(udpLstnSocks == NULL)
+ if(lcnfRoot == NULL) {
+ DBGPRINTF("imudp: no listeners configured, will not run\n");
ABORT_FINALIZE(RS_RET_NO_RUN);
+ }
iMaxLine = glbl.GetMaxLine();
- if((pRcvBuf = malloc((iMaxLine + 1) * sizeof(char))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
+ CHKmalloc(pRcvBuf = MALLOC((iMaxLine + 1) * sizeof(char)));
finalize_it:
ENDwillRun
BEGINafterRun
+ struct lstn_s *lstn, *lstnDel;
CODESTARTafterRun
/* do cleanup here */
net.clearAllowedSenders((uchar*)"UDP");
- if(udpLstnSocks != NULL) {
- net.closeUDPListenSockets(udpLstnSocks);
- udpLstnSocks = NULL;
+ for(lstn = lcnfRoot ; lstn != NULL ; ) {
+ statsobj.Destruct(&(lstn->stats));
+ close(lstn->sock);
+ lstnDel = lstn;
+ lstn = lstn->next;
+ free(lstnDel);
}
+ lcnfRoot = lcnfLast = NULL;
if(pRcvBuf != NULL) {
free(pRcvBuf);
pRcvBuf = NULL;
@@ -400,15 +632,25 @@ CODESTARTmodExit
/* release what we no longer need */
objRelease(errmsg, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
objRelease(datetime, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
+ objRelease(ruleset, CORE_COMPONENT);
objRelease(net, LM_NET_FILENAME);
ENDmodExit
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
@@ -417,10 +659,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
free(pszBindAddr);
pszBindAddr = NULL;
}
- if(udpLstnSocks != NULL) {
- net.closeUDPListenSockets(udpLstnSocks);
- udpLstnSocks = NULL;
- }
iTimeRequery = TIME_REQUERY_DFLT;/* the default is to query only every second time */
return RS_RET_OK;
}
@@ -432,19 +670,23 @@ CODESTARTmodInit
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
CHKiRet(objUse(net, LM_NET_FILENAME));
/* register config file handlers */
- /* TODO: add - but this requires more changes, no time right now...
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverbindruleset", 0, eCmdHdlrGetWord,
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputudpserverbindruleset", 0, eCmdHdlrGetWord,
setRuleset, NULL, STD_LOADABLE_MODULE_ID));
- */
CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverrun", 0, eCmdHdlrGetWord,
addListner, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord,
NULL, &pszBindAddr, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imudpschedulingpolicy", 0, eCmdHdlrGetWord,
+ &set_scheduling_policy, &pszSchedPolicy, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imudpschedulingpriority", 0, eCmdHdlrInt,
+ &set_scheduling_priority, &iSchedPrio, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpservertimerequery", 0, eCmdHdlrInt,
NULL, &iTimeRequery, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,