summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imudp/imudp.c117
-rw-r--r--plugins/imuxsock/imuxsock.c14
2 files changed, 109 insertions, 22 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index b9db1875..735042a4 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -32,6 +32,9 @@
#include <errno.h>
#include <unistd.h>
#include <netdb.h>
+#if HAVE_SYS_EPOLL_H
+# include <sys/epoll.h>
+#endif
#include "rsyslog.h"
#include "dirty.h"
#include "net.h"
@@ -113,7 +116,7 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
} else {
/* we need to add them */
if((tmpSocks = malloc(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0]))) == NULL) {
- dbgprintf("out of memory trying to allocate udp listen socket array\n");
+ DBGPRINTF("out of memory trying to allocate udp listen socket array\n");
/* in this case, we discard the new sockets but continue with what we
* already have
*/
@@ -205,7 +208,7 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr);
errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet");
}
- ABORT_FINALIZE(RS_RET_ERR);
+ ABORT_FINALIZE(RS_RET_ERR); // this most often is NOT an error, state is not checked by caller!
}
if(lenRcvBuf == 0)
@@ -270,22 +273,19 @@ finalize_it:
}
-/* This function is called to gather input.
- * Note that udpLstnSocks must be non-NULL because otherwise we would not have
- * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02
- * rgerhards, 2008-10-07: I have implemented a very simple, yet in most cases probably
- * highly efficient "name caching". Before querying a name, I now check if the name to be
- * queried is the same as the one queried in the last message processed. If that is the
- * case, we can simple re-use the previous value. This algorithm works quite well with
- * few sender, especially if they emit messages in bursts. The more sender and the
- * more intermixed messages arrive, the less this algorithm works, but the overhead
- * is so minimal (a simple memory compare and move) that this does not hurt. Even
- * with a real name lookup cache, this optimization here is useful as it is quicker
- * than even a cache lookup).
+/* This function implements the main reception loop. Depending on the environment,
+ * we either use the traditional (but slower) select() or the Linux-specific epoll()
+ * interface. ./configure settings control which one is used.
+ * rgerhards, 2009-09-09
*/
-BEGINrunInput
+#if HAVE_EPOLL_CREATE1
+#define NUM_EPOLL_EVENTS 10
+rsRetVal rcvMainLoop()
+{
+ DEFiRet;
int maxfds;
int nfds;
+ int efd;
int i;
fd_set readfds;
struct sockaddr_storage frominetPrev;
@@ -293,16 +293,79 @@ BEGINrunInput
uchar fromHost[NI_MAXHOST];
uchar fromHostIP[NI_MAXHOST];
uchar fromHostFQDN[NI_MAXHOST];
-CODESTARTrunInput
+ struct epoll_event *udpEPollEvt = NULL;
+ struct epoll_event currEvt[NUM_EPOLL_EVENTS];
+ char errStr[1024];
+
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
*/
bIsPermitted = 0;
memset(&frominetPrev, 0, sizeof(frominetPrev));
- /* this is an endless loop - it is terminated when the thread is
- * signalled to do so. This, however, is handled by the framework,
- * right into the sleep below.
+
+ CHKmalloc(udpEPollEvt = calloc(udpLstnSocks[0], sizeof(struct epoll_event)));
+
+ efd = epoll_create1(EPOLL_CLOEXEC);
+ if(efd < 0) {
+ DBGPRINTF("epoll_create1() could not create fd\n");
+ // TODO: "good" error message
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ }
+
+ /* fill the epoll set - we need to do this only once, as the set
+ * can not change dyamically.
+ */
+ maxfds = 0;
+ FD_ZERO (&readfds);
+
+ /* Add the UDP listen sockets to the list of read descriptors. */
+ for (i = 0; i < *udpLstnSocks; i++) {
+ if (udpLstnSocks[i+1] != -1) {
+ udpEPollEvt[i].events = EPOLLIN | EPOLLET;
+ udpEPollEvt[i].data.fd = udpLstnSocks[i+1];
+ if(epoll_ctl(efd, EPOLL_CTL_ADD, udpLstnSocks[i+1], &(udpEPollEvt[i])) < 0) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n",
+ udpLstnSocks[i+1], errStr);
+ }
+ }
+ }
+
+ while(1) {
+ /* wait for io to become ready */
+ nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1);
+ DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds);
+
+ for(i = 0 ; i < nfds ; ++i) {
+ processSocket(currEvt[i].data.fd, &frominetPrev, &bIsPermitted,
+ fromHost, fromHostFQDN, fromHostIP);
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+#else /* #if HAVE_EPOLL_CREATE1 */
+/* this is the code for the select() interface */
+rsRetVal rcvMainLoop()
+{
+ DEFiRet;
+ int maxfds;
+ int nfds;
+ int i;
+ fd_set readfds;
+ struct sockaddr_storage frominetPrev;
+ int bIsPermitted;
+ uchar fromHost[NI_MAXHOST];
+ uchar fromHostIP[NI_MAXHOST];
+ uchar fromHostFQDN[NI_MAXHOST];
+
+ /* start "name caching" algo by making sure the previous system indicator
+ * is invalidated.
*/
+ bIsPermitted = 0;
+ memset(&frominetPrev, 0, sizeof(frominetPrev));
+
while(1) {
/* Add the Unix Domain Sockets to the list of read
* descriptors.
@@ -345,7 +408,21 @@ CODESTARTrunInput
/* end of a run, back to loop for next recv() */
}
- return iRet;
+ RETiRet;
+}
+#endif /* #if HAVE_EPOLL_CREATE1 */
+
+/* This function is called to gather input.
+ * Note that udpLstnSocks must be non-NULL because otherwise we would not have
+ * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02
+ */
+BEGINrunInput
+CODESTARTrunInput
+ /* this is an endless loop - it is terminated when the thread is
+ * signalled to do so. This, however, is handled by the framework,
+ * right into the sleep below.
+ */
+ iRet = rcvMainLoop();
ENDrunInput
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index c099be56..b8da4966 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -77,6 +77,7 @@ static int startIndexUxLocalSockets; /* process funix from that index on (used t
*/
static int funixParseHost[MAXFUNIX] = { 0, }; /* should parser parse host name? read-only after startup */
static int funixFlags[MAXFUNIX] = { IGNDATE, }; /* should parser parse host name? read-only after startup */
+static int funixCreateSockPath[MAXFUNIX] = { 0, }; /* auto-creation of socket directory? */
static uchar *funixn[MAXFUNIX] = { (uchar*) _PATH_LOG }; /* read-only after startup */
static uchar *funixHName[MAXFUNIX] = { NULL, }; /* host-name override - if set, use this instead of actual name */
static int funixFlowCtl[MAXFUNIX] = { eFLOWCTL_NO_DELAY, }; /* flow control settings for this socket */
@@ -89,6 +90,8 @@ static uchar *pLogSockName = NULL;
static uchar *pLogHostName = NULL; /* host name to use with this socket */
static int bUseFlowCtl = 0; /* use flow control or not (if yes, only LIGHT is used! */
static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */
+#define DFLT_bCreateSockPath 0
+static int bCreateSockPath = DFLT_bCreateSockPath; /* auto-create socket path? */
/* set the timestamp ignore / not ignore option for the system
@@ -132,6 +135,7 @@ static rsRetVal addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNe
pLogHostName = NULL; /* re-init for next, not freed because funixHName[] now owns it */
funixFlowCtl[nfunix] = bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY;
funixFlags[nfunix] = bIgnoreTimestamp ? IGNDATE : NOFLAG;
+ funixCreateSockPath[nfunix] = bCreateSockPath;
funixn[nfunix++] = pNewVal;
}
else {
@@ -165,7 +169,7 @@ static rsRetVal discardFunixn(void)
}
-static int create_unix_socket(const char *path)
+static int create_unix_socket(const char *path, int bCreatePath)
{
struct sockaddr_un sunx;
int fd;
@@ -177,6 +181,9 @@ static int create_unix_socket(const char *path)
memset(&sunx, 0, sizeof(sunx));
sunx.sun_family = AF_UNIX;
+ if(bCreatePath) {
+ makeFileParentDirs((uchar*)path, strlen(path), 0755, -1, -1, 0);
+ }
(void) strncpy(sunx.sun_path, path, sizeof(sunx.sun_path));
fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (fd < 0 || bind(fd, (struct sockaddr *) &sunx, SUN_LEN(&sunx)) < 0 ||
@@ -308,7 +315,7 @@ CODESTARTwillRun
/* initialize and return if will run or not */
for (i = startIndexUxLocalSockets ; i < nfunix ; i++) {
- if ((funix[i] = create_unix_socket((char*) funixn[i])) != -1)
+ if ((funix[i] = create_unix_socket((char*) funixn[i], funixCreateSockPath[i])) != -1)
dbgprintf("Opened UNIX socket '%s' (fd %d).\n", funixn[i], funix[i]);
}
@@ -383,6 +390,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
nfunix = 1;
bIgnoreTimestamp = 1;
bUseFlowCtl = 0;
+ bCreateSockPath = DFLT_bCreateSockPath;
return RS_RET_OK;
}
@@ -416,6 +424,8 @@ CODEmodInit_QueryRegCFSLineHdlr
NULL, &pLogHostName, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketflowcontrol", 0, eCmdHdlrBinary,
NULL, &bUseFlowCtl, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketcreatepath", 0, eCmdHdlrBinary,
+ NULL, &bCreateSockPath, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"addunixlistensocket", 0, eCmdHdlrGetWord,
addLstnSocketName, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,