From c82f2781d2ed372e918ea5b5529ead57ff95fc2a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 18 Nov 2009 14:16:08 +0100 Subject: added skeleton for supporting epoll() API in netstream subsystem --- doc/manual.html | 2 +- doc/src/tls.dia | Bin 4656 -> 5173 bytes runtime/Makefile.am | 9 ++- runtime/nsd.h | 9 +++ runtime/nsdpoll_ptcp.c | 120 ++++++++++++++++++++++++++++++ runtime/nsdpoll_ptcp.h | 41 +++++++++++ runtime/nspoll.c | 196 +++++++++++++++++++++++++++++++++++++++++++++++++ runtime/nspoll.h | 55 ++++++++++++++ runtime/rsyslog.h | 5 ++ 9 files changed, 434 insertions(+), 3 deletions(-) create mode 100644 runtime/nsdpoll_ptcp.c create mode 100644 runtime/nsdpoll_ptcp.h create mode 100644 runtime/nspoll.c create mode 100644 runtime/nspoll.h diff --git a/doc/manual.html b/doc/manual.html index c24d1080..480bf56c 100644 --- a/doc/manual.html +++ b/doc/manual.html @@ -19,7 +19,7 @@ rsyslog support available directly from the source!

Please visit the rsyslog sponsor's page to honor the project sponsors or become one yourself! We are very grateful for any help towards the project goals.

-

This documentation is for version 5.5.0 (devel branch) of rsyslog. +

This documentation is for version 5.5.1 (devel branch) of rsyslog. Visit the rsyslog status page to obtain current version information and project status.

If you like rsyslog, you might diff --git a/doc/src/tls.dia b/doc/src/tls.dia index 77e5d185..91b0d706 100644 Binary files a/doc/src/tls.dia and b/doc/src/tls.dia differ diff --git a/runtime/Makefile.am b/runtime/Makefile.am index 7896e71d..9047c83d 100644 --- a/runtime/Makefile.am +++ b/runtime/Makefile.am @@ -137,7 +137,10 @@ lmnet_la_LDFLAGS = -module -avoid-version lmnet_la_LIBADD = # network stream master class and stream factory -lmnetstrms_la_SOURCES = netstrms.c netstrms.h netstrm.c netstrm.h nssel.c nssel.h +lmnetstrms_la_SOURCES = netstrms.c netstrms.h \ + netstrm.c netstrm.h \ + nssel.c nssel.h \ + nspoll.c nspoll.h lmnetstrms_la_CPPFLAGS = $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) lmnetstrms_la_LDFLAGS = -module -avoid-version lmnetstrms_la_LIBADD = @@ -153,7 +156,9 @@ lmstrmsrv_la_LIBADD = # plain tcp driver - main driver pkglib_LTLIBRARIES += lmnsd_ptcp.la -lmnsd_ptcp_la_SOURCES = nsd_ptcp.c nsd_ptcp.h nsdsel_ptcp.c nsdsel_ptcp.h +lmnsd_ptcp_la_SOURCES = nsd_ptcp.c nsd_ptcp.h \ + nsdsel_ptcp.c nsdsel_ptcp.h \ + nsdpoll_ptcp.c nsdpoll_ptcp.h lmnsd_ptcp_la_CPPFLAGS = $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) lmnsd_ptcp_la_LDFLAGS = -module -avoid-version lmnsd_ptcp_la_LIBADD = diff --git a/runtime/nsd.h b/runtime/nsd.h index 8668c934..bcd5a926 100644 --- a/runtime/nsd.h +++ b/runtime/nsd.h @@ -87,4 +87,13 @@ BEGINinterface(nsdsel) /* name must also be changed in ENDinterface macro! */ ENDinterface(nsdsel) #define nsdselCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ +/* interface for the epoll call */ +BEGINinterface(nsdpoll) /* name must also be changed in ENDinterface macro! */ + rsRetVal (*Construct)(nsdpoll_t **ppThis); + rsRetVal (*Destruct)(nsdpoll_t **ppThis); + rsRetVal (*Ctl)(nsdpoll_t *pThis, int fd, int op, epoll_event_t *event); + rsRetVal (*Wait)(nsdpoll_t *pThis, epoll_event_t *events, int maxevents, int timeout); +ENDinterface(nsdpoll) +#define nsdpollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ + #endif /* #ifndef INCLUDED_NSD_H */ diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c new file mode 100644 index 00000000..b6b7af77 --- /dev/null +++ b/runtime/nsdpoll_ptcp.c @@ -0,0 +1,120 @@ +/* nsdpoll_ptcp.c + * + * An implementation of the nsd epoll() interface for plain tcp sockets. + * + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ +#include "config.h" + +#include +#include +#include +#include + +#include "rsyslog.h" +#include "module-template.h" +#include "obj.h" +#include "errmsg.h" +#include "nsd_ptcp.h" +#include "nsdpoll_ptcp.h" +#include "unlimited_select.h" + +/* static data */ +DEFobjStaticHelpers +DEFobjCurrIf(errmsg) +DEFobjCurrIf(glbl) + + +/* Standard-Constructor + */ +BEGINobjConstruct(nsdpoll_ptcp) /* be sure to specify the object type also in END macro! */ +ENDobjConstruct(nsdpoll_ptcp) + + +/* destructor for the nsdpoll_ptcp object */ +BEGINobjDestruct(nsdpoll_ptcp) /* be sure to specify the object type also in END and CODESTART macros! */ +CODESTARTobjDestruct(nsdpoll_ptcp) +ENDobjDestruct(nsdpoll_ptcp) + + +/* Add a socket to the select set */ +static rsRetVal +Ctl(nsdpoll_t *pThis, int fd, int op, epoll_event_t *event) { + DEFiRet; + RETiRet; +} + + +/* Wait for io to become ready. + */ +static rsRetVal +Wait(nsdpoll_t *pThis, epoll_event_t *events, int maxevents, int timeout) { + DEFiRet; + RETiRet; +} + + +/* ------------------------------ end support for the epoll() interface ------------------------------ */ + + +/* queryInterface function */ +BEGINobjQueryInterface(nsdpoll_ptcp) +CODESTARTobjQueryInterface(nsdpoll_ptcp) + if(pIf->ifVersion != nsdCURR_IF_VERSION) {/* check for current version, increment on each change */ + ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED); + } + + /* ok, we have the right interface, so let's fill it + * Please note that we may also do some backwards-compatibility + * work here (if we can support an older interface version - that, + * of course, also affects the "if" above). + */ + pIf->Construct = (rsRetVal(*)(nsdpoll_t**)) nsdpoll_ptcpConstruct; + pIf->Destruct = (rsRetVal(*)(nsdpoll_t**)) nsdpoll_ptcpDestruct; + pIf->Ctl = Ctl; + pIf->Wait = Wait; +finalize_it: +ENDobjQueryInterface(nsdpoll_ptcp) + + +/* exit our class + */ +BEGINObjClassExit(nsdpoll_ptcp, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */ +CODESTARTObjClassExit(nsdpoll_ptcp) + /* release objects we no longer need */ + objRelease(glbl, CORE_COMPONENT); + objRelease(errmsg, CORE_COMPONENT); +ENDObjClassExit(nsdpoll_ptcp) + + +/* Initialize the nsdpoll_ptcp class. Must be called as the very first method + * before anything else is called inside this class. + * rgerhards, 2008-02-19 + */ +BEGINObjClassInit(nsdpoll_ptcp, 1, OBJ_IS_CORE_MODULE) /* class, version */ + /* request objects we use */ + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(glbl, CORE_COMPONENT)); + + /* set our own handlers */ +ENDObjClassInit(nsdpoll_ptcp) +/* vi:set ai: + */ diff --git a/runtime/nsdpoll_ptcp.h b/runtime/nsdpoll_ptcp.h new file mode 100644 index 00000000..36d39da7 --- /dev/null +++ b/runtime/nsdpoll_ptcp.h @@ -0,0 +1,41 @@ +/* An implementation of the nsd poll interface for plain tcp sockets. + * + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ + +#ifndef INCLUDED_NSDPOLL_PTCP_H +#define INCLUDED_NSDPOLL_PTCP_H + +#include "nsd.h" +typedef nsdpoll_if_t nsdpoll_ptcp_if_t; /* we just *implement* this interface */ + +/* the nsdpoll_ptcp object */ +struct nsdpoll_ptcp_s { + BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ +}; + +/* interface is defined in nsd.h, we just implement it! */ +#define nsdpoll_ptcpCURR_IF_VERSION nsdCURR_IF_VERSION + +/* prototypes */ +PROTOTYPEObj(nsdpoll_ptcp); + +#endif /* #ifndef INCLUDED_NSDPOLL_PTCP_H */ diff --git a/runtime/nspoll.c b/runtime/nspoll.c new file mode 100644 index 00000000..ff0b1e92 --- /dev/null +++ b/runtime/nspoll.c @@ -0,0 +1,196 @@ +/* nspoll.c + * + * This is an io waiter interface utilizing the much-more-efficient poll/epoll API. + * Note that it may not always be available for a given driver. If so, that is reported + * back to the upper peer which then should consult a nssel-based io waiter. + * + * Work on this module begun 2009-11-18 by Rainer Gerhards. + * + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ +#include "config.h" + +#include "rsyslog.h" +#include +#include +#include +#include +#include + +#include "rsyslog.h" +#include "obj.h" +#include "module-template.h" +#include "netstrm.h" +#include "nspoll.h" + +/* static data */ +DEFobjStaticHelpers +DEFobjCurrIf(glbl) + + +/* load our low-level driver. This must be done before any + * driver-specific functions (allmost all...) can be carried + * out. Note that the driver's .ifIsLoaded is correctly + * initialized by calloc() and we depend on that. Please note that + * we do some name-mangeling. We know that each nsd driver also needs + * a nspoll driver. So we simply append "sel" to the nsd driver name: This, + * of course, means that the driver name must match these rules, but that + * shouldn't be a real problem. + * WARNING: this code is mostly identical to similar code in + * netstrms.c - TODO: abstract it and move it to some common place. + * rgerhards, 2008-04-28 + */ +static rsRetVal +loadDrvr(nspoll_t *pThis) +{ + DEFiRet; + uchar *pBaseDrvrName; + uchar szDrvrName[48]; /* 48 shall be large enough */ + + pBaseDrvrName = pThis->pBaseDrvrName; + if(pBaseDrvrName == NULL) /* if no drvr name is set, use system default */ + pBaseDrvrName = glbl.GetDfltNetstrmDrvr(); + if(snprintf((char*)szDrvrName, sizeof(szDrvrName), "lmnspoll_%s", pBaseDrvrName) == sizeof(szDrvrName)) + ABORT_FINALIZE(RS_RET_DRVRNAME_TOO_LONG); + CHKmalloc(pThis->pDrvrName = (uchar*) strdup((char*)szDrvrName)); + + pThis->Drvr.ifVersion = nsdCURR_IF_VERSION; + /* The pDrvrName+2 below is a hack to obtain the object name. It + * safes us to have yet another variable with the name without "lm" in + * front of it. If we change the module load interface, we may re-think + * about this hack, but for the time being it is efficient and clean + * enough. -- rgerhards, 2008-04-18 + */ + CHKiRet(obj.UseObj(__FILE__, szDrvrName+2, DONT_LOAD_LIB, (void*) &pThis->Drvr)); + +finalize_it: + if(iRet != RS_RET_OK) { + if(pThis->pDrvrName != NULL) + free(pThis->pDrvrName); + pThis->pDrvrName = NULL; + } + RETiRet; +} + + +/* Standard-Constructor */ +BEGINobjConstruct(nspoll) /* be sure to specify the object type also in END macro! */ +ENDobjConstruct(nspoll) + + +/* destructor for the nspoll object */ +BEGINobjDestruct(nspoll) /* be sure to specify the object type also in END and CODESTART macros! */ +CODESTARTobjDestruct(nspoll) + if(pThis->pDrvrData != NULL) + pThis->Drvr.Destruct(&pThis->pDrvrData); + + /* and now we must release our driver, if we got one. We use the presence of + * a driver name string as load indicator (because we also need that string + * to release the driver + */ + if(pThis->pDrvrName != NULL) { + obj.ReleaseObj(__FILE__, pThis->pDrvrName+2, DONT_LOAD_LIB, (void*) &pThis->Drvr); + free(pThis->pDrvrName); + } +ENDobjDestruct(nspoll) + + +/* ConstructionFinalizer */ +static rsRetVal +ConstructFinalize(nspoll_t *pThis) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, nspoll); + CHKiRet(loadDrvr(pThis)); + CHKiRet(pThis->Drvr.Construct(&pThis->pDrvrData)); +finalize_it: + RETiRet; +} + + +/* Carries out the actual wait (all done in lower layers) + */ +static rsRetVal +Wait(nspoll_t *pThis, epoll_event_t *events, int maxevents, int timeout) { + DEFiRet; + ISOBJ_TYPE_assert(pThis, nspoll); + assert(events != NULL); + assert(maxevents > 0); + iRet = pThis->Drvr.Wait(pThis->pDrvrData, events, maxevents, timeout); + RETiRet; +} + + +/* semantics like the epoll_ctl() function, does the same thing. + * rgerhards, 2009-11-18 + */ +static rsRetVal +Ctl(nspoll_t *pThis, int fd, int op, epoll_event_t *event) { + DEFiRet; + ISOBJ_TYPE_assert(pThis, nspoll); + assert(event != NULL); + iRet = pThis->Drvr.Ctl(pThis->pDrvrData, fd, op, event); + RETiRet; +} + + +/* queryInterface function */ +BEGINobjQueryInterface(nspoll) +CODESTARTobjQueryInterface(nspoll) + if(pIf->ifVersion != nspollCURR_IF_VERSION) {/* check for current version, increment on each change */ + ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED); + } + + /* ok, we have the right interface, so let's fill it + * Please note that we may also do some backwards-compatibility + * work here (if we can support an older interface version - that, + * of course, also affects the "if" above). + */ + pIf->Construct = nspollConstruct; + pIf->ConstructFinalize = ConstructFinalize; + pIf->Destruct = nspollDestruct; + pIf->Wait = Wait; + pIf->Ctl = Ctl; +finalize_it: +ENDobjQueryInterface(nspoll) + + +/* exit our class + */ +BEGINObjClassExit(nspoll, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MACRO! */ +CODESTARTObjClassExit(nspoll) + /* release objects we no longer need */ + objRelease(glbl, CORE_COMPONENT); +ENDObjClassExit(nspoll) + + +/* Initialize the nspoll class. Must be called as the very first method + * before anything else is called inside this class. + * rgerhards, 2008-02-19 + */ +BEGINObjClassInit(nspoll, 1, OBJ_IS_CORE_MODULE) /* class, version */ + /* request objects we use */ + CHKiRet(objUse(glbl, CORE_COMPONENT)); + + /* set our own handlers */ +ENDObjClassInit(nspoll) +/* vi:set ai: + */ diff --git a/runtime/nspoll.h b/runtime/nspoll.h new file mode 100644 index 00000000..19efd38b --- /dev/null +++ b/runtime/nspoll.h @@ -0,0 +1,55 @@ +/* Definitions for the nspoll io activity waiter + * + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ + +#ifndef INCLUDED_NSPOLL_H +#define INCLUDED_NSPOLL_H + +#include "netstrms.h" + +/* the nspoll object */ +struct nspoll_s { + BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ + nsd_t *pDrvrData; /**< the driver's data elements */ + uchar *pBaseDrvrName; /**< nsd base driver name to use, or NULL if system default */ + uchar *pDrvrName; /**< full base driver name (set when driver is loaded) */ + nsdpoll_if_t Drvr; /**< our stream driver */ +}; + + +/* interface */ +BEGINinterface(nspoll) /* name must also be changed in ENDinterface macro! */ + rsRetVal (*Construct)(nspoll_t **ppThis); + rsRetVal (*ConstructFinalize)(nspoll_t *pThis); + rsRetVal (*Destruct)(nspoll_t **ppThis); + rsRetVal (*Ctl)(nspoll_t *pThis, int fd, int op, epoll_event_t *event); + rsRetVal (*Wait)(nspoll_t *pThis, epoll_event_t *events, int maxevents, int timeout); +ENDinterface(nspoll) +#define nspollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ + +/* prototypes */ +PROTOTYPEObj(nspoll); + +/* the name of our library binary */ +#define LM_NSPOLL_FILENAME LM_NETSTRMS_FILENAME + +#endif /* #ifndef INCLUDED_NSPOLL_H */ diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index a5e8cf5d..c5f7a9ea 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -107,6 +107,7 @@ typedef struct NetAddr netAddr_t; typedef struct netstrms_s netstrms_t; typedef struct netstrm_s netstrm_t; typedef struct nssel_s nssel_t; +typedef struct nspoll_s nspoll_t; typedef enum nsdsel_waitOp_e nsdsel_waitOp_t; typedef struct nsd_ptcp_s nsd_ptcp_t; typedef struct nsd_gtls_s nsd_gtls_t; @@ -114,9 +115,11 @@ typedef struct nsd_gsspi_s nsd_gsspi_t; typedef struct nsd_nss_s nsd_nss_t; typedef struct nsdsel_ptcp_s nsdsel_ptcp_t; typedef struct nsdsel_gtls_s nsdsel_gtls_t; +typedef struct nsdpoll_ptcp_s nsdpoll_ptcp_t; typedef struct wti_s wti_t; typedef obj_t nsd_t; typedef obj_t nsdsel_t; +typedef obj_t nsdpoll_t; typedef struct msg msg_t; typedef struct queue_s qqueue_t; typedef struct prop_s prop_t; @@ -148,6 +151,8 @@ typedef unsigned int u_int32_t; /* TODO: is this correct? */ typedef int socklen_t; #endif +typedef struct epoll_event epoll_event_t; + typedef char bool; /* I intentionally use char, to keep it slim so that many fit into the CPU cache! */ /* settings for flow control -- cgit v1.2.3 From 48ac0ffecdf244d04ed6cb4f4c560aeea5ba7f23 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 18 Nov 2009 18:40:14 +0100 Subject: milestone commit: first driver layer call done ... does not really run. We can now call into the epoll driver, but not handle epoll(). The driver also needs more modifications. --- doc/src/tls.dia | Bin 5173 -> 5201 bytes runtime/netstrms.c | 3 ++ runtime/nsd.h | 4 +- runtime/nsd_ptcp.c | 2 + runtime/nsdpoll_ptcp.c | 127 +++++++++++++++++++++++++++++++++++++++++++++++-- runtime/nsdpoll_ptcp.h | 14 ++++++ runtime/nspoll.c | 18 +++---- runtime/nspoll.h | 14 +++++- runtime/nssel.c | 1 + runtime/rsyslog.h | 4 ++ tcpsrv.c | 118 +++++++++++++++++++++++++++++++++++++++++++-- 11 files changed, 285 insertions(+), 20 deletions(-) diff --git a/doc/src/tls.dia b/doc/src/tls.dia index 91b0d706..d7c9811d 100644 Binary files a/doc/src/tls.dia and b/doc/src/tls.dia differ diff --git a/runtime/netstrms.c b/runtime/netstrms.c index 6b28e7ea..e9ff2568 100644 --- a/runtime/netstrms.c +++ b/runtime/netstrms.c @@ -36,6 +36,7 @@ #include "nsd.h" #include "netstrm.h" #include "nssel.h" +#include "nspoll.h" #include "netstrms.h" MODULE_TYPE_LIB @@ -304,6 +305,7 @@ ENDObjClassInit(netstrms) BEGINmodExit CODESTARTmodExit nsselClassExit(); + nspollClassExit(); netstrmsClassExit(); netstrmClassExit(); /* we use this object, so we must exit it after we are finished */ ENDmodExit @@ -322,6 +324,7 @@ CODESTARTmodInit /* Initialize all classes that are in our module - this includes ourselfs */ CHKiRet(netstrmClassInit(pModInfo)); CHKiRet(nsselClassInit(pModInfo)); + CHKiRet(nspollClassInit(pModInfo)); CHKiRet(netstrmsClassInit(pModInfo)); ENDmodInit /* vi:set ai: diff --git a/runtime/nsd.h b/runtime/nsd.h index bcd5a926..fc34ad6e 100644 --- a/runtime/nsd.h +++ b/runtime/nsd.h @@ -91,8 +91,8 @@ ENDinterface(nsdsel) BEGINinterface(nsdpoll) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Construct)(nsdpoll_t **ppThis); rsRetVal (*Destruct)(nsdpoll_t **ppThis); - rsRetVal (*Ctl)(nsdpoll_t *pThis, int fd, int op, epoll_event_t *event); - rsRetVal (*Wait)(nsdpoll_t *pThis, epoll_event_t *events, int maxevents, int timeout); + rsRetVal (*Ctl)(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int op); + rsRetVal (*Wait)(nsdpoll_t *pNsdpoll, int timeout, int *idRdy, void **ppUsr); ENDinterface(nsdpoll) #define nsdpollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ diff --git a/runtime/nsd_ptcp.c b/runtime/nsd_ptcp.c index fe31ab40..6feee71d 100644 --- a/runtime/nsd_ptcp.c +++ b/runtime/nsd_ptcp.c @@ -821,6 +821,7 @@ ENDObjClassInit(nsd_ptcp) BEGINmodExit CODESTARTmodExit + nsdpoll_ptcpClassExit(); nsdsel_ptcpClassExit(); nsd_ptcpClassExit(); ENDmodExit @@ -839,6 +840,7 @@ CODESTARTmodInit /* Initialize all classes that are in our module - this includes ourselfs */ CHKiRet(nsd_ptcpClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */ CHKiRet(nsdsel_ptcpClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */ + CHKiRet(nsdpoll_ptcpClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */ ENDmodInit /* vi:set ai: */ diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c index b6b7af77..ed683acc 100644 --- a/runtime/nsdpoll_ptcp.c +++ b/runtime/nsdpoll_ptcp.c @@ -28,11 +28,16 @@ #include #include #include +#if HAVE_SYS_EPOLL_H +# include +#endif #include "rsyslog.h" #include "module-template.h" #include "obj.h" #include "errmsg.h" +#include "srUtils.h" +#include "nspoll.h" #include "nsd_ptcp.h" #include "nsdpoll_ptcp.h" #include "unlimited_select.h" @@ -43,9 +48,66 @@ DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) +/* -START------------------------- helpers for event list ------------------------------------ */ + +/* add new entry to list. We assume that the fd is not already present and DO NOT check this! + * Returns newly created entry in pEvtLst. + * rgerhards, 2009-11-18 + */ +static inline rsRetVal +addEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, int mode, nsd_ptcp_t *pSock, nsdpoll_epollevt_lst_t **pEvtLst) { + nsdpoll_epollevt_lst_t *pNew; + DEFiRet; + + CHKmalloc(pNew = (nsdpoll_epollevt_lst_t*) malloc(sizeof(nsdpoll_epollevt_lst_t))); + pNew->id = id; + pNew->pUsr = pUsr; + pNew->pSock = pSock; + pNew->event.events = EPOLLET; /* always use edge-triggered mode */ + if(mode & NSDPOLL_IN) + pNew->event.events |= EPOLLIN; + if(mode & NSDPOLL_OUT) + pNew->event.events |= EPOLLOUT; + pNew->event.data.u64 = (uint64) pNew; + pNew->pNext = pThis->pRoot; + pThis->pRoot = pNew; + *pEvtLst = pNew; + +finalize_it: + RETiRet; +} + + +/* remove the entry identified by id/pUsr from the list. + * rgerhards, 2009-11-18 + */ +static inline rsRetVal +delEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr) { + DEFiRet; + // TODO: XXX add code! +#warning delEvent implementation is missing! + RETiRet; +} + + +/* -END--------------------------- helpers for event list ------------------------------------ */ + + /* Standard-Constructor */ BEGINobjConstruct(nsdpoll_ptcp) /* be sure to specify the object type also in END macro! */ +# if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + DBGPRINTF("imudp uses epoll_create1()\n"); + pThis->efd = epoll_create1(EPOLL_CLOEXEC); +# else + DBGPRINTF("imudp uses epoll_create()\n"); + pThis->efd = epoll_create(NUM_EPOLL_EVENTS); +# endif + if(pThis->efd < 0) { + DBGPRINTF("epoll_create1() could not create fd\n"); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } +finalize_it: ENDobjConstruct(nsdpoll_ptcp) @@ -55,19 +117,76 @@ CODESTARTobjDestruct(nsdpoll_ptcp) ENDobjDestruct(nsdpoll_ptcp) -/* Add a socket to the select set */ +/* Modify socket set */ static rsRetVal -Ctl(nsdpoll_t *pThis, int fd, int op, epoll_event_t *event) { +Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode) { + nsdpoll_ptcp_t *pThis = (nsdpoll_ptcp_t*) pNsdpoll; + nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd; + nsdpoll_epollevt_lst_t *pEventLst; + int errSave; + char errStr[512]; DEFiRet; + + if(mode == NSDPOLL_ADD) { + dbgprintf("adding nsdpoll entry %d/%p\n", id, pUsr); + CHKiRet(addEvent(pThis, id, pUsr, mode, pSock, &pEventLst)); + if(epoll_ctl(pThis->efd, EPOLL_CTL_ADD, pSock->sock, &pEventLst->event) < 0) { + errSave = errno; + rs_strerror_r(errSave, errStr, sizeof(errStr)); + errmsg.LogError(errSave, RS_RET_ERR_EPOLL_CTL, + "epoll_ctl failed on fd %d, id %d/%p, op %d with %s\n", + pSock->sock, id, pUsr, mode, errStr); + } + } else if(mode == NSDPOLL_DEL) { + // TODO: XXX : code missing! + dbgprintf("removing nsdpoll entry %d/%p\n", id, pUsr); + } else { + dbgprintf("program error: invalid NSDPOLL_mode %d - ignoring request\n", mode); + ABORT_FINALIZE(RS_RET_ERR); + } + +finalize_it: RETiRet; } -/* Wait for io to become ready. +/* Wait for io to become ready. After the successful call, idRdy contains the + * id set by the caller for that i/o event, ppUsr is a pointer to a location + * where the user pointer shall be stored. + * TODO: this is a trivial implementation that only polls one event at a time. We + * may later extend it to poll for multiple events, what would cause less + * overhead. + * rgerhards, 2009-11-18 */ static rsRetVal -Wait(nsdpoll_t *pThis, epoll_event_t *events, int maxevents, int timeout) { +Wait(nsdpoll_t *pNsdpoll, int timeout, int *idRdy, void **ppUsr) { + nsdpoll_ptcp_t *pThis = (nsdpoll_ptcp_t*) pNsdpoll; + nsdpoll_epollevt_lst_t *pOurEvt; + struct epoll_event event; + int nfds; DEFiRet; + + assert(idRdy != NULL); + assert(ppUsr != NULL); + + nfds = epoll_wait(pThis->efd, &event, 1, timeout); + if(nfds == -1) { + if(errno == EINTR) { + ABORT_FINALIZE(RS_RET_EINTR); + } else { + DBGPRINTF("epoll() returned with error code %d\n", errno); + ABORT_FINALIZE(RS_RET_ERR_EPOLL); + } + } else if(nfds == 0) { + ABORT_FINALIZE(RS_RET_TIMEOUT); + } + + /* we got a valid event, so tell the caller... */ + pOurEvt = (nsdpoll_epollevt_lst_t*) event.data.u64; + *idRdy = pOurEvt->id; + *ppUsr = pOurEvt->pUsr; + +finalize_it: RETiRet; } diff --git a/runtime/nsdpoll_ptcp.h b/runtime/nsdpoll_ptcp.h index 36d39da7..ccdb87f0 100644 --- a/runtime/nsdpoll_ptcp.h +++ b/runtime/nsdpoll_ptcp.h @@ -26,10 +26,24 @@ #include "nsd.h" typedef nsdpoll_if_t nsdpoll_ptcp_if_t; /* we just *implement* this interface */ +/* a helper object to keep track of the epoll event records + * Note that we need to keep track of that list because we need to + * free the events when they are no longer needed. + */ +typedef struct nsdpoll_epollevt_lst_s nsdpoll_epollevt_lst_t; +struct nsdpoll_epollevt_lst_s { + epoll_event_t event; + int id; + void *pUsr; + nsd_ptcp_t *pSock; /* our associated netstream driver data */ + nsdpoll_epollevt_lst_t *pNext; +}; /* the nsdpoll_ptcp object */ struct nsdpoll_ptcp_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ + int efd; /* file descriptor used by epoll */ + nsdpoll_epollevt_lst_t *pRoot; /* Root of the epoll event list */ }; /* interface is defined in nsd.h, we just implement it! */ diff --git a/runtime/nspoll.c b/runtime/nspoll.c index ff0b1e92..4413c2c7 100644 --- a/runtime/nspoll.c +++ b/runtime/nspoll.c @@ -68,7 +68,7 @@ loadDrvr(nspoll_t *pThis) pBaseDrvrName = pThis->pBaseDrvrName; if(pBaseDrvrName == NULL) /* if no drvr name is set, use system default */ pBaseDrvrName = glbl.GetDfltNetstrmDrvr(); - if(snprintf((char*)szDrvrName, sizeof(szDrvrName), "lmnspoll_%s", pBaseDrvrName) == sizeof(szDrvrName)) + if(snprintf((char*)szDrvrName, sizeof(szDrvrName), "lmnsdpoll_%s", pBaseDrvrName) == sizeof(szDrvrName)) ABORT_FINALIZE(RS_RET_DRVRNAME_TOO_LONG); CHKmalloc(pThis->pDrvrName = (uchar*) strdup((char*)szDrvrName)); @@ -79,6 +79,7 @@ loadDrvr(nspoll_t *pThis) * about this hack, but for the time being it is efficient and clean * enough. -- rgerhards, 2008-04-18 */ +RUNLOG_VAR("%s", szDrvrName+2); CHKiRet(obj.UseObj(__FILE__, szDrvrName+2, DONT_LOAD_LIB, (void*) &pThis->Drvr)); finalize_it: @@ -119,9 +120,11 @@ ConstructFinalize(nspoll_t *pThis) { DEFiRet; ISOBJ_TYPE_assert(pThis, nspoll); +RUNLOG_STR("trying to load epoll driver\n"); CHKiRet(loadDrvr(pThis)); CHKiRet(pThis->Drvr.Construct(&pThis->pDrvrData)); finalize_it: +dbgprintf("XXX: done trying to load epoll driver, state %d\n", iRet); RETiRet; } @@ -129,12 +132,11 @@ finalize_it: /* Carries out the actual wait (all done in lower layers) */ static rsRetVal -Wait(nspoll_t *pThis, epoll_event_t *events, int maxevents, int timeout) { +Wait(nspoll_t *pThis, int timeout, int *idRdy, void **ppUsr) { DEFiRet; ISOBJ_TYPE_assert(pThis, nspoll); - assert(events != NULL); - assert(maxevents > 0); - iRet = pThis->Drvr.Wait(pThis->pDrvrData, events, maxevents, timeout); + assert(idRdy != NULL); + iRet = pThis->Drvr.Wait(pThis->pDrvrData, timeout, idRdy, ppUsr); RETiRet; } @@ -143,11 +145,10 @@ Wait(nspoll_t *pThis, epoll_event_t *events, int maxevents, int timeout) { * rgerhards, 2009-11-18 */ static rsRetVal -Ctl(nspoll_t *pThis, int fd, int op, epoll_event_t *event) { +Ctl(nspoll_t *pThis, nsd_t *pNsd, int id, void *pUsr, int op) { DEFiRet; ISOBJ_TYPE_assert(pThis, nspoll); - assert(event != NULL); - iRet = pThis->Drvr.Ctl(pThis->pDrvrData, fd, op, event); + iRet = pThis->Drvr.Ctl(pThis->pDrvrData, pNsd, id, pUsr, op); RETiRet; } @@ -188,6 +189,7 @@ ENDObjClassExit(nspoll) */ BEGINObjClassInit(nspoll, 1, OBJ_IS_CORE_MODULE) /* class, version */ /* request objects we use */ + DBGPRINTF("doing nspollClassInit\n"); CHKiRet(objUse(glbl, CORE_COMPONENT)); /* set our own handlers */ diff --git a/runtime/nspoll.h b/runtime/nspoll.h index 19efd38b..9d39e017 100644 --- a/runtime/nspoll.h +++ b/runtime/nspoll.h @@ -26,6 +26,15 @@ #include "netstrms.h" +/* some operations to be portable when we do not have epoll() available */ +#define NSDPOLL_ADD 1 +#define NSDPOLL_DEL 2 + +/* and some mode specifiers for waiting on input/output */ +#define NSDPOLL_IN 1 /* EPOLLIN */ +#define NSDPOLL_OUT 2 /* EPOLLOUT */ +/* next is 4, 8, 16, ... - must be bit values, as they are ored! */ + /* the nspoll object */ struct nspoll_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ @@ -41,8 +50,9 @@ BEGINinterface(nspoll) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Construct)(nspoll_t **ppThis); rsRetVal (*ConstructFinalize)(nspoll_t *pThis); rsRetVal (*Destruct)(nspoll_t **ppThis); - rsRetVal (*Ctl)(nspoll_t *pThis, int fd, int op, epoll_event_t *event); - rsRetVal (*Wait)(nspoll_t *pThis, epoll_event_t *events, int maxevents, int timeout); + rsRetVal (*Wait)(nspoll_t *pNsdpoll, int timeout, int *idRdy, void **ppUsr); + rsRetVal (*Ctl)(nspoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int op); + rsRetVal (*IsEPollSupported)(void); /* static method */ ENDinterface(nspoll) #define nspollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ diff --git a/runtime/nssel.c b/runtime/nssel.c index d11d5fe1..7c5be3a9 100644 --- a/runtime/nssel.c +++ b/runtime/nssel.c @@ -219,6 +219,7 @@ ENDObjClassExit(nssel) */ BEGINObjClassInit(nssel, 1, OBJ_IS_CORE_MODULE) /* class, version */ /* request objects we use */ + DBGPRINTF("doing nsselClassInit\n"); CHKiRet(objUse(glbl, CORE_COMPONENT)); /* set our own handlers */ diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index c5f7a9ea..b8ebafd9 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -414,6 +414,10 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_NO_RULESET= -2158,/**< no ruleset name as specified where one was needed */ RS_RET_PARSER_NOT_FOUND = -2159,/**< parser with the specified name was not found */ RS_RET_COULD_NOT_PARSE = -2160,/**< (this) parser could not parse the message (no error, means try next one) */ + RS_RET_EINTR = -2161, /**< EINTR occured during a system call (not necessarily an error) */ + RS_RET_ERR_EPOLL = -2162, /**< epoll() returned with an unexpected error code */ + RS_RET_ERR_EPOLL_CTL = -2163, /**< epol_ctll() returned with an unexpected error code */ + RS_RET_TIMEOUT = -2164, /**< timeout occured during operation */ /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ diff --git a/tcpsrv.c b/tcpsrv.c index ef453f3a..75055195 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -68,6 +68,7 @@ #include "netstrms.h" #include "netstrm.h" #include "nssel.h" +#include "nspoll.h" #include "errmsg.h" #include "ruleset.h" #include "unicode-helper.h" @@ -89,6 +90,7 @@ DEFobjCurrIf(net) DEFobjCurrIf(netstrms) DEFobjCurrIf(netstrm) DEFobjCurrIf(nssel) +DEFobjCurrIf(nspoll) DEFobjCurrIf(prop) @@ -521,10 +523,13 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) } -/* This function is called to gather input. */ +/* This function is called to gather input. + * This variant here is only used if we need to work with a netstream driver + * that does not support epoll(). + */ #pragma GCC diagnostic ignored "-Wempty-body" -static rsRetVal -Run(tcpsrv_t *pThis) +static inline rsRetVal +RunSelect(tcpsrv_t *pThis) { DEFiRet; int nfds; @@ -532,7 +537,7 @@ Run(tcpsrv_t *pThis) int iTCPSess; int bIsReady; tcps_sess_t *pNewSess; - nssel_t *pSel; + nssel_t *pSel = NULL; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -606,6 +611,110 @@ finalize_it: /* this is a very special case - this time only we do not exit the #pragma GCC diagnostic warning "-Wempty-body" +/* This function is called to gather input. It tries doing that via the epoll() + * interface. If the driver does not support that, it falls back to calling its + * select() equivalent. + * rgerhards, 2009-11-18 + */ +#pragma GCC diagnostic ignored "-Wempty-body" +static rsRetVal +Run(tcpsrv_t *pThis) +{ + DEFiRet; + int nfds; + int i; + int iTCPSess; + int bIsReady; + tcps_sess_t *pNewSess; + nspoll_t *pPoll; + rsRetVal localRet; + + ISOBJ_TYPE_assert(pThis, tcpsrv); + + /* this is an endless loop - it is terminated by the framework canelling + * this thread. Thus, we also need to instantiate a cancel cleanup handler + * to prevent us from leaking anything. -- rgerhards, 20080-04-24 + */ +#warning implement cancel cleanup handler! + //pthread_cleanup_push(RunCancelCleanup, (void*) &pSel); + if((localRet = nspoll.Construct(&pPoll)) == RS_RET_OK) { + localRet = nspoll.ConstructFinalize(pPoll); + } + if(localRet != RS_RET_OK) { + /* fall back to select */ + dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet); + iRet = RunSelect(pThis); + FINALIZE; + } + + dbgprintf("we would use the poll handler, currently not implemented!\n"); +#if 0 + while(1) { + CHKiRet(nssel.Construct(&pSel)); + // TODO: set driver + CHKiRet(nssel.ConstructFinalize(pSel)); + + /* Add the TCP listen sockets to the list of read descriptors. */ + for(i = 0 ; i < pThis->iLstnCurr ; ++i) { + CHKiRet(nssel.Add(pSel, pThis->ppLstn[i], NSDSEL_RD)); + } + + /* do the sessions */ + iTCPSess = TCPSessGetNxtSess(pThis, -1); + while(iTCPSess != -1) { + /* TODO: access to pNsd is NOT really CLEAN, use method... */ + CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD)); + /* now get next... */ + iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); + } + + /* wait for io to become ready */ + CHKiRet(nssel.Wait(pSel, &nfds)); + if(glbl.GetGlobalInputTermState() == 1) + break; /* terminate input! */ + + for(i = 0 ; i < pThis->iLstnCurr ; ++i) { + if(glbl.GetGlobalInputTermState() == 1) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds)); + if(bIsReady) { + DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); + SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); + --nfds; /* indicate we have processed one */ + } + } + + /* now check the sessions */ + iTCPSess = TCPSessGetNxtSess(pThis, -1); + while(nfds && iTCPSess != -1) { + if(glbl.GetGlobalInputTermState() == 1) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); + if(bIsReady) { + doReceive(pThis, &pThis->pSessions[iTCPSess]); + --nfds; /* indicate we have processed one */ + } + iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); + } + CHKiRet(nssel.Destruct(&pSel)); +finalize_it: /* this is a very special case - this time only we do not exit the function, + * because that would not help us either. So we simply retry it. Let's see + * if that actually is a better idea. Exiting the loop wasn't we always + * crashed, which made sense (the rest of the engine was not prepared for + * that) -- rgerhards, 2008-05-19 + */ + /*EMPTY*/; + } +#endif + /* note that this point is usually not reached */ +// pthread_cleanup_pop(1); /* remove cleanup handler */ + +finalize_it: + RETiRet; +} +#pragma GCC diagnostic warning "-Wempty-body" + + /* Standard-Constructor */ BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */ pThis->iSessMax = TCPSESS_MAX_DEFAULT; @@ -969,6 +1078,7 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME)); CHKiRet(objUse(netstrm, DONT_LOAD_LIB)); CHKiRet(objUse(nssel, DONT_LOAD_LIB)); + CHKiRet(objUse(nspoll, DONT_LOAD_LIB)); CHKiRet(objUse(tcps_sess, DONT_LOAD_LIB)); CHKiRet(objUse(conf, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); -- cgit v1.2.3 From 02e4a98bac7329f6ab4bb3503839aba7e87881e5 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 19 Nov 2009 18:41:37 +0100 Subject: milestone: working towards utilizing epoll interface NOTE: this version does NOT run - tcp-based servers are defunct. --- runtime/nspoll.c | 4 ++-- runtime/nspoll.h | 2 +- tcpsrv.c | 33 ++++++++++++++++++++++++--------- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/runtime/nspoll.c b/runtime/nspoll.c index 4413c2c7..b9a189cb 100644 --- a/runtime/nspoll.c +++ b/runtime/nspoll.c @@ -145,10 +145,10 @@ Wait(nspoll_t *pThis, int timeout, int *idRdy, void **ppUsr) { * rgerhards, 2009-11-18 */ static rsRetVal -Ctl(nspoll_t *pThis, nsd_t *pNsd, int id, void *pUsr, int op) { +Ctl(nspoll_t *pThis, netstrm_t *pStrm, int id, void *pUsr, int op) { DEFiRet; ISOBJ_TYPE_assert(pThis, nspoll); - iRet = pThis->Drvr.Ctl(pThis->pDrvrData, pNsd, id, pUsr, op); + iRet = pThis->Drvr.Ctl(pThis->pDrvrData, pStrm->pDrvrData, id, pUsr, op); RETiRet; } diff --git a/runtime/nspoll.h b/runtime/nspoll.h index 9d39e017..281b8103 100644 --- a/runtime/nspoll.h +++ b/runtime/nspoll.h @@ -51,7 +51,7 @@ BEGINinterface(nspoll) /* name must also be changed in ENDinterface macro! */ rsRetVal (*ConstructFinalize)(nspoll_t *pThis); rsRetVal (*Destruct)(nspoll_t **ppThis); rsRetVal (*Wait)(nspoll_t *pNsdpoll, int timeout, int *idRdy, void **ppUsr); - rsRetVal (*Ctl)(nspoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int op); + rsRetVal (*Ctl)(nspoll_t *pNsdpoll, netstrm_t *pStrm, int id, void *pUsr, int op); rsRetVal (*IsEPollSupported)(void); /* static method */ ENDinterface(nspoll) #define nspollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ diff --git a/tcpsrv.c b/tcpsrv.c index 75055195..e6fdd087 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -15,9 +15,6 @@ * callbacks before the code is run. The tcpsrv then calls back * into the specific input modules at the appropriate time. * - * NOTE: read comments in module-template.h to understand how this file - * works! - * * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) * * Copyright 2007, 2008, 2009 Rainer Gerhards and Adiscon GmbH. @@ -626,7 +623,7 @@ Run(tcpsrv_t *pThis) int iTCPSess; int bIsReady; tcps_sess_t *pNewSess; - nspoll_t *pPoll; + nspoll_t *pPoll = NULL; rsRetVal localRet; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -638,6 +635,7 @@ Run(tcpsrv_t *pThis) #warning implement cancel cleanup handler! //pthread_cleanup_push(RunCancelCleanup, (void*) &pSel); if((localRet = nspoll.Construct(&pPoll)) == RS_RET_OK) { + // TODO: set driver localRet = nspoll.ConstructFinalize(pPoll); } if(localRet != RS_RET_OK) { @@ -648,16 +646,31 @@ Run(tcpsrv_t *pThis) } dbgprintf("we would use the poll handler, currently not implemented!\n"); + + /* Add the TCP listen sockets to the list of sockets to monitor */ + for(i = 0 ; i < pThis->iLstnCurr ; ++i) { + dbgprintf("Trying to add listener %d\n", i); + CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, &pThis->ppLstn, NSDPOLL_IN)); + dbgprintf("Added listener %d\n", i); + } + + while(1) { + localRet = nspoll.Wait(pSel, &nfds); + if(glbl.GetGlobalInputTermState() == 1) + break; /* terminate input! */ + + /* check if we need to ignore the i/o ready state. We do this if we got an invalid + * return state. Validly, this can happen for RS_RET_EINTR, for other cases it may + * not be the right thing, but what is the right thing is really hard at this point... + */ + if(localRet != RS_RET_OK) + continue; + } #if 0 while(1) { CHKiRet(nssel.Construct(&pSel)); - // TODO: set driver CHKiRet(nssel.ConstructFinalize(pSel)); - /* Add the TCP listen sockets to the list of read descriptors. */ - for(i = 0 ; i < pThis->iLstnCurr ; ++i) { - CHKiRet(nssel.Add(pSel, pThis->ppLstn[i], NSDSEL_RD)); - } /* do the sessions */ iTCPSess = TCPSessGetNxtSess(pThis, -1); @@ -710,6 +723,8 @@ finalize_it: /* this is a very special case - this time only we do not exit the // pthread_cleanup_pop(1); /* remove cleanup handler */ finalize_it: + if(pPoll != NULL) + nspoll.Destruct(&pPoll); RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" -- cgit v1.2.3 From e0d77fa90cad334f308da9cbd4369d61f1c97511 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 23 Nov 2009 15:33:52 +0100 Subject: milestone commit: first working version with epoll/tcp ... but not well-tested, so there may be many hidden bugs. --- runtime/nsd.h | 2 +- runtime/nsdpoll_ptcp.c | 60 +++++++++++++++++++++++----- runtime/nspoll.c | 4 +- runtime/nspoll.h | 2 +- tcpsrv.c | 106 +++++++++++++++++-------------------------------- tcpsrv.h | 1 + 6 files changed, 92 insertions(+), 83 deletions(-) diff --git a/runtime/nsd.h b/runtime/nsd.h index fc34ad6e..e5b9320b 100644 --- a/runtime/nsd.h +++ b/runtime/nsd.h @@ -91,7 +91,7 @@ ENDinterface(nsdsel) BEGINinterface(nsdpoll) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Construct)(nsdpoll_t **ppThis); rsRetVal (*Destruct)(nsdpoll_t **ppThis); - rsRetVal (*Ctl)(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int op); + rsRetVal (*Ctl)(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op); rsRetVal (*Wait)(nsdpoll_t *pNsdpoll, int timeout, int *idRdy, void **ppUsr); ENDinterface(nsdpoll) #define nsdpollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c index ed683acc..95fd8aa3 100644 --- a/runtime/nsdpoll_ptcp.c +++ b/runtime/nsdpoll_ptcp.c @@ -78,14 +78,44 @@ finalize_it: } -/* remove the entry identified by id/pUsr from the list. - * rgerhards, 2009-11-18 +/* find and unlink the entry identified by id/pUsr from the list. + * rgerhards, 2009-11-23 + */ +static inline rsRetVal +unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t **ppEvtLst) { + nsdpoll_epollevt_lst_t *pEvtLst; + nsdpoll_epollevt_lst_t *pPrev = NULL; + DEFiRet; + + pEvtLst = pThis->pRoot; + while(pEvtLst != NULL && pEvtLst->id != id && pEvtLst->pUsr != pUsr) { + pPrev = pEvtLst; + pEvtLst = pEvtLst->pNext; + } + if(pEvtLst == NULL) + ABORT_FINALIZE(RS_RET_NOT_FOUND); + + *ppEvtLst = pEvtLst; + + /* unlink */ + if(pPrev == NULL) + pThis->pRoot = pEvtLst->pNext; + else + pPrev->pNext = pEvtLst->pNext; + +finalize_it: + RETiRet; +} + + +/* destruct the provided element. It must already be unlinked from the list. + * rgerhards, 2009-11-23 */ static inline rsRetVal -delEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr) { +delEvent(nsdpoll_epollevt_lst_t **ppEvtLst) { DEFiRet; - // TODO: XXX add code! -#warning delEvent implementation is missing! + free(*ppEvtLst); + *ppEvtLst = NULL; RETiRet; } @@ -119,7 +149,7 @@ ENDobjDestruct(nsdpoll_ptcp) /* Modify socket set */ static rsRetVal -Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode) { +Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op) { nsdpoll_ptcp_t *pThis = (nsdpoll_ptcp_t*) pNsdpoll; nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd; nsdpoll_epollevt_lst_t *pEventLst; @@ -127,7 +157,7 @@ Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode) { char errStr[512]; DEFiRet; - if(mode == NSDPOLL_ADD) { + if(op == NSDPOLL_ADD) { dbgprintf("adding nsdpoll entry %d/%p\n", id, pUsr); CHKiRet(addEvent(pThis, id, pUsr, mode, pSock, &pEventLst)); if(epoll_ctl(pThis->efd, EPOLL_CTL_ADD, pSock->sock, &pEventLst->event) < 0) { @@ -137,11 +167,21 @@ Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode) { "epoll_ctl failed on fd %d, id %d/%p, op %d with %s\n", pSock->sock, id, pUsr, mode, errStr); } - } else if(mode == NSDPOLL_DEL) { - // TODO: XXX : code missing! + } else if(op == NSDPOLL_DEL) { + // TODO: XXX : code missing! del Event dbgprintf("removing nsdpoll entry %d/%p\n", id, pUsr); + CHKiRet(unlinkEvent(pThis, id, pUsr, &pEventLst)); + if(epoll_ctl(pThis->efd, EPOLL_CTL_DEL, pSock->sock, &pEventLst->event) < 0) { + errSave = errno; + rs_strerror_r(errSave, errStr, sizeof(errStr)); + errmsg.LogError(errSave, RS_RET_ERR_EPOLL_CTL, + "epoll_ctl failed on fd %d, id %d/%p, op %d with %s\n", + pSock->sock, id, pUsr, mode, errStr); + ABORT_FINALIZE(RS_RET_ERR_EPOLL_CTL); + } + CHKiRet(delEvent(&pEventLst)); } else { - dbgprintf("program error: invalid NSDPOLL_mode %d - ignoring request\n", mode); + dbgprintf("program error: invalid NSDPOLL_mode %d - ignoring request\n", op); ABORT_FINALIZE(RS_RET_ERR); } diff --git a/runtime/nspoll.c b/runtime/nspoll.c index b9a189cb..f287cd4e 100644 --- a/runtime/nspoll.c +++ b/runtime/nspoll.c @@ -145,10 +145,10 @@ Wait(nspoll_t *pThis, int timeout, int *idRdy, void **ppUsr) { * rgerhards, 2009-11-18 */ static rsRetVal -Ctl(nspoll_t *pThis, netstrm_t *pStrm, int id, void *pUsr, int op) { +Ctl(nspoll_t *pThis, netstrm_t *pStrm, int id, void *pUsr, int mode, int op) { DEFiRet; ISOBJ_TYPE_assert(pThis, nspoll); - iRet = pThis->Drvr.Ctl(pThis->pDrvrData, pStrm->pDrvrData, id, pUsr, op); + iRet = pThis->Drvr.Ctl(pThis->pDrvrData, pStrm->pDrvrData, id, pUsr, mode, op); RETiRet; } diff --git a/runtime/nspoll.h b/runtime/nspoll.h index 281b8103..a77759c0 100644 --- a/runtime/nspoll.h +++ b/runtime/nspoll.h @@ -51,7 +51,7 @@ BEGINinterface(nspoll) /* name must also be changed in ENDinterface macro! */ rsRetVal (*ConstructFinalize)(nspoll_t *pThis); rsRetVal (*Destruct)(nspoll_t **ppThis); rsRetVal (*Wait)(nspoll_t *pNsdpoll, int timeout, int *idRdy, void **ppUsr); - rsRetVal (*Ctl)(nspoll_t *pNsdpoll, netstrm_t *pStrm, int id, void *pUsr, int op); + rsRetVal (*Ctl)(nspoll_t *pNsdpoll, netstrm_t *pStrm, int id, void *pUsr, int mode, int op); rsRetVal (*IsEPollSupported)(void); /* static method */ ENDinterface(nspoll) #define nspollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ diff --git a/tcpsrv.c b/tcpsrv.c index e6fdd087..9d51d83b 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -237,11 +237,13 @@ static void deinit_tcp_listener(tcpsrv_t *pThis) if(pThis->pSessions != NULL) { /* close all TCP connections! */ - i = TCPSessGetNxtSess(pThis, -1); - while(i != -1) { - tcps_sess.Destruct(&pThis->pSessions[i]); - /* now get next... */ - i = TCPSessGetNxtSess(pThis, i); + if(!pThis->bUsingEPoll) { + i = TCPSessGetNxtSess(pThis, -1); + while(i != -1) { + tcps_sess.Destruct(&pThis->pSessions[i]); + /* now get next... */ + i = TCPSessGetNxtSess(pThis, i); + } } /* we are done with the session table - so get rid of it... */ @@ -437,7 +439,8 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess, } *ppSess = pSess; - pThis->pSessions[iSess] = pSess; + if(!pThis->bUsingEPoll) + pThis->pSessions[iSess] = pSess; pSess = NULL; /* this is now also handed over */ finalize_it: @@ -465,10 +468,12 @@ RunCancelCleanup(void *arg) /* process a receive request on one of the streams + * If pPoll is non-NULL, we have a netstream in epoll mode, which means we need + * to remove any descriptor we close from the epoll set. * rgerhards, 2009-07-020 */ static rsRetVal -doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) +doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) { char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */ ssize_t iRcvd; @@ -490,6 +495,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n", (*ppSess)->pStrm, pszPeer); } + if(pPoll != NULL) { + CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL)); + } pThis->pOnRegularClose(*ppSess); tcps_sess.Destruct(ppSess); break; @@ -516,6 +524,8 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) tcps_sess.Destruct(ppSess); break; } + +finalize_it: RETiRet; } @@ -585,7 +595,7 @@ RunSelect(tcpsrv_t *pThis) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - doReceive(pThis, &pThis->pSessions[iTCPSess]); + doReceive(pThis, &pThis->pSessions[iTCPSess], NULL); --nfds; /* indicate we have processed one */ } iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); @@ -613,17 +623,14 @@ finalize_it: /* this is a very special case - this time only we do not exit the * select() equivalent. * rgerhards, 2009-11-18 */ -#pragma GCC diagnostic ignored "-Wempty-body" static rsRetVal Run(tcpsrv_t *pThis) { DEFiRet; - int nfds; int i; - int iTCPSess; - int bIsReady; tcps_sess_t *pNewSess; nspoll_t *pPoll = NULL; + void *pUsr; rsRetVal localRet; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -632,8 +639,6 @@ Run(tcpsrv_t *pThis) * this thread. Thus, we also need to instantiate a cancel cleanup handler * to prevent us from leaking anything. -- rgerhards, 20080-04-24 */ -#warning implement cancel cleanup handler! - //pthread_cleanup_push(RunCancelCleanup, (void*) &pSel); if((localRet = nspoll.Construct(&pPoll)) == RS_RET_OK) { // TODO: set driver localRet = nspoll.ConstructFinalize(pPoll); @@ -647,15 +652,18 @@ Run(tcpsrv_t *pThis) dbgprintf("we would use the poll handler, currently not implemented!\n"); + /* flag that we are in epoll mode */ + pThis->bUsingEPoll = TRUE; + /* Add the TCP listen sockets to the list of sockets to monitor */ for(i = 0 ; i < pThis->iLstnCurr ; ++i) { dbgprintf("Trying to add listener %d\n", i); - CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, &pThis->ppLstn, NSDPOLL_IN)); + CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD)); dbgprintf("Added listener %d\n", i); } while(1) { - localRet = nspoll.Wait(pSel, &nfds); + localRet = nspoll.Wait(pPoll, -1, &i, &pUsr); if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ @@ -665,69 +673,30 @@ Run(tcpsrv_t *pThis) */ if(localRet != RS_RET_OK) continue; - } -#if 0 - while(1) { - CHKiRet(nssel.Construct(&pSel)); - CHKiRet(nssel.ConstructFinalize(pSel)); + dbgprintf("poll returned with i %d, pUsr %p\n", i, pUsr); - /* do the sessions */ - iTCPSess = TCPSessGetNxtSess(pThis, -1); - while(iTCPSess != -1) { - /* TODO: access to pNsd is NOT really CLEAN, use method... */ - CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD)); - /* now get next... */ - iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); - } - - /* wait for io to become ready */ - CHKiRet(nssel.Wait(pSel, &nfds)); - if(glbl.GetGlobalInputTermState() == 1) - break; /* terminate input! */ - - for(i = 0 ; i < pThis->iLstnCurr ; ++i) { - if(glbl.GetGlobalInputTermState() == 1) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds)); - if(bIsReady) { - DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); - SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); - --nfds; /* indicate we have processed one */ - } + if(pUsr == pThis->ppLstn) { + DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); + SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); + CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + DBGPRINTF("New session created with NSD %p.\n", pNewSess); + } else { + pNewSess = (tcps_sess_t*) pUsr; + doReceive(pThis, &pNewSess, pPoll); } + } - /* now check the sessions */ - iTCPSess = TCPSessGetNxtSess(pThis, -1); - while(nfds && iTCPSess != -1) { - if(glbl.GetGlobalInputTermState() == 1) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); - if(bIsReady) { - doReceive(pThis, &pThis->pSessions[iTCPSess]); - --nfds; /* indicate we have processed one */ - } - iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); - } - CHKiRet(nssel.Destruct(&pSel)); -finalize_it: /* this is a very special case - this time only we do not exit the function, - * because that would not help us either. So we simply retry it. Let's see - * if that actually is a better idea. Exiting the loop wasn't we always - * crashed, which made sense (the rest of the engine was not prepared for - * that) -- rgerhards, 2008-05-19 - */ - /*EMPTY*/; + /* remove the tcp listen sockets from the epoll set */ + for(i = 0 ; i < pThis->iLstnCurr ; ++i) { + CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_DEL)); } -#endif - /* note that this point is usually not reached */ -// pthread_cleanup_pop(1); /* remove cleanup handler */ finalize_it: if(pPoll != NULL) nspoll.Destruct(&pPoll); RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" /* Standard-Constructor */ @@ -1032,7 +1001,6 @@ CODESTARTobjQueryInterface(tcpsrv) pIf->ConstructFinalize = tcpsrvConstructFinalize; pIf->Destruct = tcpsrvDestruct; - //pIf->SessAccept = SessAccept; pIf->configureTCPListen = configureTCPListen; pIf->create_tcp_socket = create_tcp_socket; pIf->Run = Run; diff --git a/tcpsrv.h b/tcpsrv.h index b8d82163..e7a95a46 100644 --- a/tcpsrv.h +++ b/tcpsrv.h @@ -55,6 +55,7 @@ struct tcpsrv_s { ruleset_t *pRuleset; /**< ruleset to bind to */ permittedPeers_t *pPermPeers;/**< driver's permitted peers */ bool bEmitMsgOnClose; /**< emit an informational message when the remote peer closes connection */ + bool bUsingEPoll; /**< are we in epoll mode (means we do not need to keep track of sessions!) */ int iLstnCurr; /**< max nbr of listeners currently supported */ netstrm_t **ppLstn; /**< our netstream listners */ tcpLstnPortList_t **ppLstnPort; /**< pointer to relevant listen port description */ -- cgit v1.2.3 From 18749309f333ee83d5fc14d64971e0418eef360d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 25 Nov 2009 13:28:05 +0100 Subject: fixing some potential segfault conditions --- plugins/imdiag/imdiag.c | 5 +++++ runtime/nsd_ptcp.c | 5 ++++- runtime/nsdpoll_ptcp.c | 15 ++++++++++----- runtime/rsyslog.h | 1 + tcpsrv.c | 33 +++++++++++++++++++++------------ 5 files changed, 41 insertions(+), 18 deletions(-) diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c index 2f7e5fee..81b357ef 100644 --- a/plugins/imdiag/imdiag.c +++ b/plugins/imdiag/imdiag.c @@ -270,6 +270,11 @@ waitMainQEmpty(tcps_sess_t *pSess) dbgprintf("imdiag sleeping, wait mainq drain, curr size %d\n", iMsgQueueSize); srSleep(0,2); /* wait a little bit */ CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); + if(iMsgQueueSize == 0) { + /* verify that queue is still empty (else it could just be a race!) */ + srSleep(1,5); /* wait a little bit */ + CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); + } } CHKiRet(sendResponse(pSess, "mainqueue empty\n")); diff --git a/runtime/nsd_ptcp.c b/runtime/nsd_ptcp.c index 6feee71d..96881097 100644 --- a/runtime/nsd_ptcp.c +++ b/runtime/nsd_ptcp.c @@ -562,6 +562,7 @@ finalize_it: static rsRetVal Rcv(nsd_t *pNsd, uchar *pRcvBuf, ssize_t *pLenBuf) { + char errStr[1024]; DEFiRet; nsd_ptcp_t *pThis = (nsd_ptcp_t*) pNsd; ISOBJ_TYPE_assert(pThis, nsd_ptcp); @@ -571,7 +572,9 @@ Rcv(nsd_t *pNsd, uchar *pRcvBuf, ssize_t *pLenBuf) if(*pLenBuf == 0) { ABORT_FINALIZE(RS_RET_CLOSED); } else if (*pLenBuf < 0) { - ABORT_FINALIZE(RS_RET_ERR); + rs_strerror_r(errno, errStr, sizeof(errStr)); + dbgprintf("error during recv on NSD %p: %s\n", pNsd, errStr); + ABORT_FINALIZE(RS_RET_RCV_ERR); } finalize_it: diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c index 95fd8aa3..445ecc71 100644 --- a/runtime/nsdpoll_ptcp.c +++ b/runtime/nsdpoll_ptcp.c @@ -52,6 +52,12 @@ DEFobjCurrIf(glbl) /* add new entry to list. We assume that the fd is not already present and DO NOT check this! * Returns newly created entry in pEvtLst. + * Note that we currently need to use level-triggered mode, because the upper layers do not work + * in parallel. As such, in edge-triggered mode we may not get notified, because new data comes + * in after we have read everything that was present. To use ET mode, we need to change the upper + * peers so that they immediately start a new wait before processing the data read. That obviously + * requires more elaborate redesign and we postpone this until the current more simplictic mode has + * been proven OK in practice. * rgerhards, 2009-11-18 */ static inline rsRetVal @@ -63,7 +69,7 @@ addEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, int mode, nsd_ptcp_t *pSock, pNew->id = id; pNew->pUsr = pUsr; pNew->pSock = pSock; - pNew->event.events = EPOLLET; /* always use edge-triggered mode */ + pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */ if(mode & NSDPOLL_IN) pNew->event.events |= EPOLLIN; if(mode & NSDPOLL_OUT) @@ -88,7 +94,7 @@ unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t ** DEFiRet; pEvtLst = pThis->pRoot; - while(pEvtLst != NULL && pEvtLst->id != id && pEvtLst->pUsr != pUsr) { + while(pEvtLst != NULL && !(pEvtLst->id == id && pEvtLst->pUsr == pUsr)) { pPrev = pEvtLst; pEvtLst = pEvtLst->pNext; } @@ -158,7 +164,7 @@ Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op) { DEFiRet; if(op == NSDPOLL_ADD) { - dbgprintf("adding nsdpoll entry %d/%p\n", id, pUsr); + dbgprintf("adding nsdpoll entry %d/%p, sock %d\n", id, pUsr, pSock->sock); CHKiRet(addEvent(pThis, id, pUsr, mode, pSock, &pEventLst)); if(epoll_ctl(pThis->efd, EPOLL_CTL_ADD, pSock->sock, &pEventLst->event) < 0) { errSave = errno; @@ -168,8 +174,7 @@ Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op) { pSock->sock, id, pUsr, mode, errStr); } } else if(op == NSDPOLL_DEL) { - // TODO: XXX : code missing! del Event - dbgprintf("removing nsdpoll entry %d/%p\n", id, pUsr); + dbgprintf("removing nsdpoll entry %d/%p, sock %d\n", id, pUsr, pSock->sock); CHKiRet(unlinkEvent(pThis, id, pUsr, &pEventLst)); if(epoll_ctl(pThis->efd, EPOLL_CTL_DEL, pSock->sock, &pEventLst->event) < 0) { errSave = errno; diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index b8ebafd9..1431c684 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -418,6 +418,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_ERR_EPOLL = -2162, /**< epoll() returned with an unexpected error code */ RS_RET_ERR_EPOLL_CTL = -2163, /**< epol_ctll() returned with an unexpected error code */ RS_RET_TIMEOUT = -2164, /**< timeout occured during operation */ + RS_RET_RCV_ERR = -2165, /**< error occured during socket rcv operation */ /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ diff --git a/tcpsrv.c b/tcpsrv.c index 9d51d83b..d2ab16f2 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -467,6 +467,22 @@ RunCancelCleanup(void *arg) } +/* helper to close a session. Takes status of poll vs. select into consideration. + * rgerhards, 2009-11-25 + */ +static inline rsRetVal +closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) { + DEFiRet; + if(pPoll != NULL) { + CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL)); + } + pThis->pOnRegularClose(*ppSess); + tcps_sess.Destruct(ppSess); +finalize_it: + RETiRet; +} + + /* process a receive request on one of the streams * If pPoll is non-NULL, we have a netstream in epoll mode, which means we need * to remove any descriptor we close from the epoll set. @@ -482,7 +498,6 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) ISOBJ_TYPE_assert(pThis, tcpsrv); DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm); - /* Receive message */ iRet = pThis->pRcvData(*ppSess, buf, sizeof(buf), &iRcvd); switch(iRet) { @@ -495,11 +510,7 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n", (*ppSess)->pStrm, pszPeer); } - if(pPoll != NULL) { - CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL)); - } - pThis->pOnRegularClose(*ppSess); - tcps_sess.Destruct(ppSess); + CHKiRet(closeSess(pThis, ppSess, pPoll)); break; case RS_RET_RETRY: /* we simply ignore retry - this is not an error, but we also have not received anything */ @@ -512,16 +523,14 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) */ errmsg.LogError(0, localRet, "Tearing down TCP Session - see " "previous messages for reason(s)\n"); - pThis->pOnErrClose(*ppSess); - tcps_sess.Destruct(ppSess); + CHKiRet(closeSess(pThis, ppSess, pPoll)); } break; default: errno = 0; errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n", (*ppSess)->pStrm); - pThis->pOnErrClose(*ppSess); - tcps_sess.Destruct(ppSess); + CHKiRet(closeSess(pThis, ppSess, pPoll)); break; } @@ -650,14 +659,14 @@ Run(tcpsrv_t *pThis) FINALIZE; } - dbgprintf("we would use the poll handler, currently not implemented!\n"); + dbgprintf("tcpsrv uses epoll() interface, nsdpol driver found\n"); /* flag that we are in epoll mode */ pThis->bUsingEPoll = TRUE; /* Add the TCP listen sockets to the list of sockets to monitor */ for(i = 0 ; i < pThis->iLstnCurr ; ++i) { - dbgprintf("Trying to add listener %d\n", i); + dbgprintf("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn); CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD)); dbgprintf("Added listener %d\n", i); } -- cgit v1.2.3 From 41211495e5234c1ed4ccd408a4043915ad075771 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 25 Nov 2009 13:30:18 +0100 Subject: goal reached: epoll() driver for plain tcp syslog receiver available introduced the ablity for netstream drivers to utilize an epoll interface This offers increased performance and removes the select() FDSET size limit from imtcp. Note that we fall back to select() if there is no epoll netstream drivers. So far, an epoll driver has only been implemented for plain tcp syslog, the rest will follow once the code proves well in practice AND there is demand. --- ChangeLog | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ChangeLog b/ChangeLog index bb7228e2..5b256580 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,11 @@ --------------------------------------------------------------------------- Version 5.5.1 [DEVEL] (rgerhards), 2009-11-?? +- introduced the ablity for netstream drivers to utilize an epoll interface + This offers increased performance and removes the select() FDSET size + limit from imtcp. Note that we fall back to select() if there is no + epoll netstream drivers. So far, an epoll driver has only been + implemented for plain tcp syslog, the rest will follow once the code + proves well in practice AND there is demand. --------------------------------------------------------------------------- Version 5.5.0 [DEVEL] (rgerhards), 2009-11-18 - moved DNS resolution code out of imudp and into the backend processing -- cgit v1.2.3