summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/Makefile.am12
-rw-r--r--runtime/atomic.h134
-rw-r--r--runtime/batch.h122
-rw-r--r--runtime/cfsysline.c8
-rw-r--r--runtime/debug.c32
-rw-r--r--runtime/debug.h5
-rw-r--r--runtime/glbl.c22
-rw-r--r--runtime/glbl.h13
-rw-r--r--runtime/module-template.h43
-rw-r--r--runtime/modules.c66
-rw-r--r--runtime/modules.h10
-rw-r--r--runtime/msg.c172
-rw-r--r--runtime/msg.h34
-rw-r--r--runtime/net.c69
-rw-r--r--runtime/net.h7
-rw-r--r--runtime/netstrms.c3
-rw-r--r--runtime/nsd.h9
-rw-r--r--runtime/nsd_ptcp.c12
-rw-r--r--runtime/nsdpoll_ptcp.c288
-rw-r--r--runtime/nsdpoll_ptcp.h60
-rw-r--r--runtime/nsdsel_ptcp.c51
-rw-r--r--runtime/nsdsel_ptcp.h5
-rw-r--r--runtime/nspoll.c195
-rw-r--r--runtime/nspoll.h65
-rw-r--r--runtime/nssel.c1
-rw-r--r--runtime/parser.c64
-rw-r--r--runtime/parser.h4
-rw-r--r--runtime/prop.c6
-rw-r--r--runtime/prop.h2
-rw-r--r--runtime/queue.c131
-rw-r--r--runtime/queue.h18
-rw-r--r--runtime/rsyslog.c5
-rw-r--r--runtime/rsyslog.h57
-rw-r--r--runtime/rule.c61
-rw-r--r--runtime/rule.h7
-rw-r--r--runtime/ruleset.c90
-rw-r--r--runtime/ruleset.h4
-rw-r--r--runtime/srutils.c27
-rw-r--r--runtime/stream.c12
-rw-r--r--runtime/stream.h16
-rw-r--r--runtime/strgen.c279
-rw-r--r--runtime/strgen.h60
-rw-r--r--runtime/unlimited_select.h45
-rw-r--r--runtime/wti.c57
-rw-r--r--runtime/wti.h8
-rw-r--r--runtime/wtp.c51
-rw-r--r--runtime/wtp.h3
47 files changed, 2170 insertions, 275 deletions
diff --git a/runtime/Makefile.am b/runtime/Makefile.am
index caf7c5ca..f7db3e35 100644
--- a/runtime/Makefile.am
+++ b/runtime/Makefile.am
@@ -16,10 +16,13 @@ librsyslog_la_SOURCES = \
nsd.h \
glbl.h \
glbl.c \
+ unlimited_select.h \
conf.c \
conf.h \
parser.h \
parser.c \
+ strgen.h \
+ strgen.c \
msg.c \
msg.h \
linkedlist.c \
@@ -136,7 +139,10 @@ lmnet_la_LDFLAGS = -module -avoid-version
lmnet_la_LIBADD =
# network stream master class and stream factory
-lmnetstrms_la_SOURCES = netstrms.c netstrms.h netstrm.c netstrm.h nssel.c nssel.h
+lmnetstrms_la_SOURCES = netstrms.c netstrms.h \
+ netstrm.c netstrm.h \
+ nssel.c nssel.h \
+ nspoll.c nspoll.h
lmnetstrms_la_CPPFLAGS = $(PTHREADS_CFLAGS) $(RSRT_CFLAGS)
lmnetstrms_la_LDFLAGS = -module -avoid-version
lmnetstrms_la_LIBADD =
@@ -152,7 +158,9 @@ lmstrmsrv_la_LIBADD =
# plain tcp driver - main driver
pkglib_LTLIBRARIES += lmnsd_ptcp.la
-lmnsd_ptcp_la_SOURCES = nsd_ptcp.c nsd_ptcp.h nsdsel_ptcp.c nsdsel_ptcp.h
+lmnsd_ptcp_la_SOURCES = nsd_ptcp.c nsd_ptcp.h \
+ nsdsel_ptcp.c nsdsel_ptcp.h \
+ nsdpoll_ptcp.c nsdpoll_ptcp.h
lmnsd_ptcp_la_CPPFLAGS = $(PTHREADS_CFLAGS) $(RSRT_CFLAGS)
lmnsd_ptcp_la_LDFLAGS = -module -avoid-version
lmnsd_ptcp_la_LIBADD =
diff --git a/runtime/atomic.h b/runtime/atomic.h
index b507b769..da544c4b 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -31,8 +31,6 @@
* A copy of the GPL can be found in the file "COPYING" in this distribution.
* A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
*/
-#include "config.h" /* autotools! */
-
#ifndef INCLUDED_ATOMIC_H
#define INCLUDED_ATOMIC_H
@@ -41,18 +39,30 @@
* They simply came in too late. -- rgerhards, 2008-04-02
*/
#ifdef HAVE_ATOMIC_BUILTINS
-# define ATOMIC_SUB(data, val) __sync_fetch_and_sub(&(data), val)
+# define ATOMIC_SUB(data, val, phlpmut) __sync_fetch_and_sub(data, val)
# define ATOMIC_ADD(data, val) __sync_fetch_and_add(&(data), val)
-# define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1))
-# define ATOMIC_INC_AND_FETCH(data) __sync_fetch_and_add(&(data), 1)
-# define ATOMIC_DEC(data) ((void) __sync_sub_and_fetch(&(data), 1))
-# define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1)
-# define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff))
+# define ATOMIC_INC(data, phlpmut) ((void) __sync_fetch_and_add(data, 1))
+# define ATOMIC_INC_AND_FETCH(data, phlpmut) __sync_fetch_and_add(data, 1)
+# define ATOMIC_DEC(data, phlpmut) ((void) __sync_sub_and_fetch(data, 1))
+# define ATOMIC_DEC_AND_FETCH(data, phlpmut) __sync_sub_and_fetch(data, 1)
+# define ATOMIC_FETCH_32BIT(data, phlpmut) ((unsigned) __sync_fetch_and_and(data, 0xffffffff))
# define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1)
-# define ATOMIC_STORE_0_TO_INT(data) __sync_fetch_and_and(&(data), 0)
-# define ATOMIC_STORE_1_TO_INT(data) __sync_fetch_and_or(&(data), 1)
-# define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal));
-# define ATOMIC_CAS_VAL(data, oldVal, newVal) __sync_val_compare_and_swap(&(data), (oldVal), (newVal));
+# define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0)
+# define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1)
+# define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val))
+# define ATOMIC_CAS(data, oldVal, newVal, phlpmut) __sync_bool_compare_and_swap(data, (oldVal), (newVal))
+# define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal));
+
+ /* functions below are not needed if we have atomics */
+# define DEF_ATOMIC_HELPER_MUT(x)
+# define INIT_ATOMIC_HELPER_MUT(x)
+# define DESTROY_ATOMIC_HELPER_MUT(x)
+
+ /* the following operations should preferrably be done atomic, but it is
+ * not fatal if not -- that means we can live with some missed updates. So be
+ * sure to use these macros only if that really does not matter!
+ */
+# define PREFER_ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1))
#else
/* note that we gained parctical proof that theoretical problems DO occur
* if we do not properly address them. See this blog post for details:
@@ -61,12 +71,102 @@
* simply go ahead and do without them - use mutexes or other things. The
* code needs to be checked against all those cases. -- rgerhards, 2009-01-30
*/
+ #include <pthread.h>
+# define ATOMIC_INC(data, phlpmut) { \
+ pthread_mutex_lock(phlpmut); \
+ ++(*(data)); \
+ pthread_mutex_unlock(phlpmut); \
+ }
+
+# define ATOMIC_STORE_0_TO_INT(data, hlpmut) { \
+ pthread_mutex_lock(hlpmut); \
+ *(data) = 0; \
+ pthread_mutex_unlock(hlpmut); \
+ }
+
+# define ATOMIC_STORE_1_TO_INT(data, hlpmut) { \
+ pthread_mutex_lock(hlpmut); \
+ *(data) = 1; \
+ pthread_mutex_unlock(hlpmut); \
+ }
+
+ static inline int
+ ATOMIC_CAS(int *data, int oldVal, int newVal, pthread_mutex_t *phlpmut) {
+ int bSuccess;
+ pthread_mutex_lock(phlpmut);
+ if(*data == oldVal) {
+ *data = newVal;
+ bSuccess = 1;
+ } else {
+ bSuccess = 0;
+ }
+ pthread_mutex_unlock(phlpmut);
+ return(bSuccess);
+ }
+
+
+ static inline int
+ ATOMIC_CAS_VAL(int *data, int oldVal, int newVal, pthread_mutex_t *phlpmut) {
+ int val;
+ pthread_mutex_lock(phlpmut);
+ if(*data == oldVal) {
+ *data = newVal;
+ }
+ val = *data;
+ pthread_mutex_unlock(phlpmut);
+ return(val);
+ }
+
+# define ATOMIC_DEC(data, phlpmut) { \
+ pthread_mutex_lock(phlpmut); \
+ --(*(data)); \
+ pthread_mutex_unlock(phlpmut); \
+ }
+
+ static inline int
+ ATOMIC_INC_AND_FETCH(int *data, pthread_mutex_t *phlpmut) {
+ int val;
+ pthread_mutex_lock(phlpmut);
+ val = ++(*data);
+ pthread_mutex_unlock(phlpmut);
+ return(val);
+ }
+
+ static inline int
+ ATOMIC_DEC_AND_FETCH(int *data, pthread_mutex_t *phlpmut) {
+ int val;
+ pthread_mutex_lock(phlpmut);
+ val = --(*data);
+ pthread_mutex_unlock(phlpmut);
+ return(val);
+ }
+
+ static inline int
+ ATOMIC_FETCH_32BIT(int *data, pthread_mutex_t *phlpmut) {
+ int val;
+ pthread_mutex_lock(phlpmut);
+ val = (*data);
+ pthread_mutex_unlock(phlpmut);
+ return(val);
+ }
+
+ static inline void
+ ATOMIC_SUB(int *data, int val, pthread_mutex_t *phlpmut) {
+ pthread_mutex_lock(phlpmut);
+ (*data) -= val;
+ pthread_mutex_unlock(phlpmut);
+ }
+#if 0
# warning "atomic builtins not available, using nul operations - rsyslogd will probably be racy!"
-# define ATOMIC_INC(data) (++(data))
-# define ATOMIC_DEC(data) (--(data))
-# define ATOMIC_DEC_AND_FETCH(data) (--(data))
-# define ATOMIC_FETCH_32BIT(data) (data)
-# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1
+# define ATOMIC_INC_AND_FETCH(data) (++(data))
+# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 // TODO: del
+#endif
+# define DEF_ATOMIC_HELPER_MUT(x) pthread_mutex_t x
+# define INIT_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL)
+# define DESTROY_ATOMIC_HELPER_MUT(x) pthread_mutex_destroy(&(x))
+
+# define PREFER_ATOMIC_INC(data) ((void) ++data)
+
#endif
#endif /* #ifndef INCLUDED_ATOMIC_H */
diff --git a/runtime/batch.h b/runtime/batch.h
index 2b3aa83e..68f48d8b 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -26,6 +26,9 @@
#ifndef BATCH_H_INCLUDED
#define BATCH_H_INCLUDED
+#include <string.h>
+#include "msg.h"
+
/* enum for batch states. Actually, we violate a layer here, in that we assume that a batch is used
* for action processing. So far, this seems acceptable, the status is simply ignored inside the
* main message queue. But over time, it could potentially be useful to split the two.
@@ -45,6 +48,16 @@ typedef enum {
struct batch_obj_s {
obj_t *pUsrp; /* pointer to user object (most often message) */
batch_state_t state; /* associated state */
+ /* work variables for action processing; these are reused for each action (or block of
+ * actions)
+ */
+ sbool bFilterOK; /* work area for filter processing (per action, reused!) */
+ sbool bPrevWasSuspended;
+ void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE];
+ /* a cache to save malloc(), if not absolutely necessary */
+ size_t staticLenParams[CONF_OMOD_NUMSTRINGS_MAXSIZE];
+ /* and the same for the message length (if used) */
+ /* end action work variables */
};
/* the batch
@@ -62,11 +75,120 @@ struct batch_obj_s {
* is completed (else, the whole process does not work correctly).
*/
struct batch_s {
+ int maxElem; /* maximum number of elements that this batch supports */
int nElem; /* actual number of element in this entry */
int nElemDeq; /* actual number of elements dequeued (and thus to be deleted) - see comment above! */
int iDoneUpTo; /* all messages below this index have state other than RDY */
qDeqID deqID; /* ID of dequeue operation that generated this batch */
+ int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
+ sbool bSingleRuleset; /* do all msgs of this batch use a single ruleset? */
batch_obj_t *pElem; /* batch elements */
};
+
+/* some inline functions (we may move this off to an object .. or not) */
+static inline void
+batchSetSingleRuleset(batch_t *pBatch, sbool val) {
+ pBatch->bSingleRuleset = val;
+}
+
+/* get the batches ruleset (if we have a single ruleset) */
+static inline ruleset_t*
+batchGetRuleset(batch_t *pBatch) {
+ return (pBatch->nElem > 0) ? ((msg_t*) pBatch->pElem[0].pUsrp)->pRuleset : NULL;
+}
+
+/* get the ruleset of a specifc element of the batch (index not verified!) */
+static inline ruleset_t*
+batchElemGetRuleset(batch_t *pBatch, int i) {
+ return ((msg_t*) pBatch->pElem[i].pUsrp)->pRuleset;
+}
+
+/* get number of msgs for this batch */
+static inline int
+batchNumMsgs(batch_t *pBatch) {
+ return pBatch->nElem;
+}
+
+
+/* set the status of the i-th batch element. Note that once the status is
+ * DISC, it will never be reset. So this function can NOT be used to initialize
+ * the state table. -- rgerhards, 2010-06-10
+ */
+static inline void
+batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) {
+ if(pBatch->pElem[i].state != BATCH_STATE_DISC)
+ pBatch->pElem[i].state = newState;
+}
+
+
+/* check if an element is a valid entry. We do NOT verify if the
+ * element index is valid. -- rgerhards, 2010-06-10
+ */
+static inline int
+batchIsValidElem(batch_t *pBatch, int i) {
+ return(pBatch->pElem[i].bFilterOK && pBatch->pElem[i].state != BATCH_STATE_DISC);
+}
+
+
+/* copy one batch element to another.
+ * This creates a complete duplicate in those cases where
+ * it is needed. Use duplication only when absolutely necessary!
+ * rgerhards, 2010-06-10
+ */
+static inline void
+batchCopyElem(batch_obj_t *pDest, batch_obj_t *pSrc) {
+ memcpy(pDest, pSrc, sizeof(batch_obj_t));
+}
+
+
+/* free members of a batch "object". Note that we can not do the usual
+ * destruction as the object typically is allocated on the stack and so the
+ * object itself cannot be freed! -- rgerhards, 2010-06-15
+ */
+static inline void
+batchFree(batch_t *pBatch) {
+ int i;
+ int j;
+ for(i = 0 ; i < pBatch->maxElem ; ++i) {
+ for(j = 0 ; j < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++j) {
+ free(pBatch->pElem[i].staticActParams[j]);
+ }
+ }
+ free(pBatch->pElem);
+}
+
+
+/* initialiaze a batch "object". The record must already exist,
+ * we "just" initialize it. The max number of elements must be
+ * provided. -- rgerhards, 2010-06-15
+ */
+static inline rsRetVal
+batchInit(batch_t *pBatch, int maxElem) {
+ DEFiRet;
+ pBatch->maxElem = maxElem;
+ CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t)));
+ // TODO: replace calloc by inidividual writes?
+finalize_it:
+ RETiRet;
+}
+
+
+/* primarily a helper for debug purposes, get human-readble name of state */
+static inline char *
+batchState2String(batch_state_t state) {
+ switch(state) {
+ case BATCH_STATE_RDY:
+ return "BATCH_STATE_RDY";
+ case BATCH_STATE_BAD:
+ return "BATCH_STATE_BAD";
+ case BATCH_STATE_SUB:
+ return "BATCH_STATE_SUB";
+ case BATCH_STATE_COMM:
+ return "BATCH_STATE_COMM";
+ case BATCH_STATE_DISC:
+ return "BATCH_STATE_DISC";
+ }
+ return "ERROR, batch state not known!";
+}
#endif /* #ifndef BATCH_H_INCLUDED */
diff --git a/runtime/cfsysline.c b/runtime/cfsysline.c
index 184c0d87..5df8e64c 100644
--- a/runtime/cfsysline.c
+++ b/runtime/cfsysline.c
@@ -217,9 +217,11 @@ static rsRetVal doGetSize(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *
case 'K': i *= 1000; ++(*pp); break;
case 'M': i *= 1000000; ++(*pp); break;
case 'G': i *= 1000000000; ++(*pp); break;
- case 'T': i *= 1000000000000; ++(*pp); break; /* tera */
- case 'P': i *= 1000000000000000; ++(*pp); break; /* peta */
- case 'E': i *= 1000000000000000000; ++(*pp); break; /* exa */
+ /* we need to use the multiplication below because otherwise
+ * the compiler gets an error during constant parsing */
+ case 'T': i *= (int64) 1000 * 1000000000; ++(*pp); break; /* tera */
+ case 'P': i *= (int64) 1000000 * 1000000000; ++(*pp); break; /* peta */
+ case 'E': i *= (int64) 1000000000 * 1000000000; ++(*pp); break; /* exa */
}
/* done */
diff --git a/runtime/debug.c b/runtime/debug.c
index 8b4950a1..64e251e5 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -46,6 +46,9 @@
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
+#if _POSIX_TIMERS <= 0
+#include <sys/time.h>
+#endif
#include "rsyslog.h"
#include "debug.h"
@@ -154,7 +157,9 @@ static pthread_key_t keyCallStack;
*/
static void dbgMutexCancelCleanupHdlr(void *pmut)
{
- pthread_mutex_unlock((pthread_mutex_t*) pmut);
+ int ret;
+ ret = pthread_mutex_unlock((pthread_mutex_t*) pmut);
+ assert(ret == 0);
}
@@ -844,6 +849,9 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg)
char pszWriteBuf[32*1024];
size_t lenWriteBuf;
struct timespec t;
+# if _POSIX_TIMERS <= 0
+ struct timeval tv;
+# endif
/* The bWasNL handler does not really work. It works if no thread
* switching occurs during non-NL messages. Else, things are messed
@@ -869,7 +877,14 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg)
if(bWasNL) {
if(bPrintTime) {
+# if _POSIX_TIMERS > 0
+ /* this is the "regular" code */
clock_gettime(CLOCK_REALTIME, &t);
+# else
+ gettimeofday(&tv, NULL);
+ t.tv_sec = tv.tv_sec;
+ t.tv_nsec = tv.tv_usec * 1000;
+# endif
lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf),
"%4.4ld.%9.9ld:", (long) (t.tv_sec % 10000), t.tv_nsec);
if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf);
@@ -950,6 +965,15 @@ dbgoprint(obj_t *pObj, char *fmt, ...)
va_start(ap, fmt);
lenWriteBuf = vsnprintf(pszWriteBuf, sizeof(pszWriteBuf), fmt, ap);
va_end(ap);
+ if(lenWriteBuf >= sizeof(pszWriteBuf)) {
+ /* prevent buffer overrruns and garbagge display */
+ pszWriteBuf[sizeof(pszWriteBuf) - 5] = '.';
+ pszWriteBuf[sizeof(pszWriteBuf) - 4] = '.';
+ pszWriteBuf[sizeof(pszWriteBuf) - 3] = '.';
+ pszWriteBuf[sizeof(pszWriteBuf) - 2] = '\n';
+ pszWriteBuf[sizeof(pszWriteBuf) - 1] = '\0';
+ lenWriteBuf = sizeof(pszWriteBuf);
+ }
dbgprint(pObj, pszWriteBuf, lenWriteBuf);
}
@@ -1069,7 +1093,7 @@ int dbgEntrFunc(dbgFuncDB_t **ppFuncDB, const char *file, const char *func, int
}
/* when we reach this point, we have a fully-initialized FuncDB! */
- ATOMIC_INC(pFuncDB->nTimesCalled);
+ PREFER_ATOMIC_INC(pFuncDB->nTimesCalled);
if(bLogFuncFlow && dbgPrintNameIsInList((const uchar*)pFuncDB->file, printNameFileRoot))
if(strcmp(pFuncDB->file, "stringbuf.c")) { /* TODO: make configurable */
dbgprintf("%s:%d: %s: enter\n", pFuncDB->file, pFuncDB->line, pFuncDB->func);
@@ -1307,11 +1331,11 @@ dbgGetRuntimeOptions(void)
/* this is earlier in the process than the -d option, as such it
* allows us to spit out debug messages from the very beginning.
*/
- Debug = 1;
+ Debug = DEBUG_FULL;
debugging_on = 1;
} else if(!strcasecmp((char*)optname, "debugondemand")) {
/* Enables debugging, but turns off debug output */
- Debug = 1;
+ Debug = DEBUG_ONDEMAND;
debugging_on = 1;
dbgprintf("Note: debug on demand turned on via configuraton file, "
"use USR1 signal to activate.\n");
diff --git a/runtime/debug.h b/runtime/debug.h
index 8d9c1ceb..c011dd2d 100644
--- a/runtime/debug.h
+++ b/runtime/debug.h
@@ -29,6 +29,11 @@
#include <pthread.h>
#include "obj-types.h"
+/* some settings for various debug modes */
+#define DEBUG_OFF 0
+#define DEBUG_ONDEMAND 1
+#define DEBUG_FULL 2
+
/* external static data elements (some time to be replaced) */
extern int Debug; /* debug flag - read-only after startup */
extern int debugging_on; /* read-only, except on sig USR1 */
diff --git a/runtime/glbl.c b/runtime/glbl.c
index 71c2ed0d..278bc4e1 100644
--- a/runtime/glbl.c
+++ b/runtime/glbl.c
@@ -74,6 +74,12 @@ static uchar *pszDfltNetstrmDrvrCAF = NULL; /* default CA file for the netstrm d
static uchar *pszDfltNetstrmDrvrKeyFile = NULL; /* default key file for the netstrm driver (server) */
static uchar *pszDfltNetstrmDrvrCertFile = NULL; /* default cert file for the netstrm driver (server) */
static int bTerminateInputs = 0; /* global switch that inputs shall terminate ASAP (1=> terminate) */
+#ifndef HAVE_ATOMIC_BUILTINS
+static DEF_ATOMIC_HELPER_MUT(mutTerminateInputs);
+#endif
+#ifdef USE_UNLIMITED_SELECT
+static int iFdSetSize = howmany(FD_SETSIZE, __NFDBITS) * sizeof (fd_mask); /* size of select() bitmask in bytes */
+#endif
/* define a macro for the simple properties' set and get functions
@@ -106,6 +112,9 @@ SIMP_PROP(DisableDNS, bDisableDNS, int)
SIMP_PROP(LocalDomain, LocalDomain, uchar*)
SIMP_PROP(StripDomains, StripDomains, char**)
SIMP_PROP(LocalHosts, LocalHosts, char**)
+#ifdef USE_UNLIMITED_SELECT
+SIMP_PROP(FdSetSize, iFdSetSize, int)
+#endif
SIMP_PROP_SET(LocalFQDNName, LocalFQDNName, uchar*)
SIMP_PROP_SET(LocalHostName, LocalHostName, uchar*)
@@ -124,7 +133,7 @@ SIMP_PROP_SET(DfltNetstrmDrvrCertFile, pszDfltNetstrmDrvrCertFile, uchar*) /* TO
*/
static int GetGlobalInputTermState(void)
{
- return ATOMIC_FETCH_32BIT(bTerminateInputs);
+ return ATOMIC_FETCH_32BIT(&bTerminateInputs, &mutTerminateInputs);
}
@@ -133,7 +142,7 @@ static int GetGlobalInputTermState(void)
*/
static void SetGlobalInputTermination(void)
{
- ATOMIC_STORE_1_TO_INT(bTerminateInputs);
+ ATOMIC_STORE_1_TO_INT(&bTerminateInputs, &mutTerminateInputs);
}
@@ -284,6 +293,9 @@ CODESTARTobjQueryInterface(glbl)
SIMP_PROP(DfltNetstrmDrvrCAF)
SIMP_PROP(DfltNetstrmDrvrKeyFile)
SIMP_PROP(DfltNetstrmDrvrCertFile)
+#ifdef USE_UNLIMITED_SELECT
+ SIMP_PROP(FdSetSize)
+#endif
#undef SIMP_PROP
finalize_it:
ENDobjQueryInterface(glbl)
@@ -317,6 +329,9 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
bDropMalPTRMsgs = 0;
bOptimizeUniProc = 1;
bPreserveFQDN = 0;
+#ifdef USE_UNLIMITED_SELECT
+ iFdSetSize = howmany(FD_SETSIZE, __NFDBITS) * sizeof (fd_mask);
+#endif
return RS_RET_OK;
}
@@ -340,6 +355,8 @@ BEGINAbstractObjClassInit(glbl, 1, OBJ_IS_CORE_MODULE) /* class, version */
CHKiRet(regCfSysLineHdlr((uchar *)"optimizeforuniprocessor", 0, eCmdHdlrBinary, NULL, &bOptimizeUniProc, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"preservefqdn", 0, eCmdHdlrBinary, NULL, &bPreserveFQDN, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL));
+
+ INIT_ATOMIC_HELPER_MUT(mutTerminateInputs);
ENDObjClassInit(glbl)
@@ -362,6 +379,7 @@ BEGINObjClassExit(glbl, OBJ_IS_CORE_MODULE) /* class, version */
if(LocalFQDNName != NULL)
free(LocalFQDNName);
objRelease(prop, CORE_COMPONENT);
+ DESTROY_ATOMIC_HELPER_MUT(mutTerminateInputs);
ENDObjClassExit(glbl)
/* vi:set ai:
diff --git a/runtime/glbl.h b/runtime/glbl.h
index 7506f16b..4b4bdf83 100644
--- a/runtime/glbl.h
+++ b/runtime/glbl.h
@@ -66,9 +66,20 @@ BEGINinterface(glbl) /* name must also be changed in ENDinterface macro! */
void (*SetGlobalInputTermination)(void);
/* added v5, 2009-11-03 */
SIMP_PROP(ParseHOSTNAMEandTAG, int)
+ /* note: v4, v5 are already used by more recent versions, so we need to skip them! */
+ /* added v6, 2009-11-16 as part of varmojfekoj's "unlimited select()" patch
+ * Note that it must be always present, otherwise the interface would have different
+ * versions depending on compile settings, what is not acceptable.
+ * Use this property with care, it is only truly available if UNLIMITED_SELECT is enabled
+ * (I did not yet further investigate the details, because that code hopefully can be removed
+ * at some later stage).
+ */
+ SIMP_PROP(FdSetSize, int)
+ /* v7: was neeeded to mean v5+v6 - do NOT add anything else for that version! */
+ /* next change is v8! */
#undef SIMP_PROP
ENDinterface(glbl)
-#define glblCURR_IF_VERSION 5 /* increment whenever you change the interface structure! */
+#define glblCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */
/* version 2 had PreserveFQDN added - rgerhards, 2008-12-08 */
/* the remaining prototypes */
diff --git a/runtime/module-template.h b/runtime/module-template.h
index 18aad650..d05ec23c 100644
--- a/runtime/module-template.h
+++ b/runtime/module-template.h
@@ -49,6 +49,9 @@
#define DEF_PMOD_STATIC_DATA \
DEFobjCurrIf(obj) \
DEF_MOD_STATIC_DATA
+#define DEF_SMOD_STATIC_DATA \
+ DEFobjCurrIf(obj) \
+ DEF_MOD_STATIC_DATA
/* Macro to define the module type. Each module can only have a single type. If
@@ -69,6 +72,7 @@ static rsRetVal modGetType(eModType_t *modType) \
#define MODULE_TYPE_INPUT MODULE_TYPE(eMOD_IN)
#define MODULE_TYPE_OUTPUT MODULE_TYPE(eMOD_OUT)
#define MODULE_TYPE_PARSER MODULE_TYPE(eMOD_PARSER)
+#define MODULE_TYPE_STRGEN MODULE_TYPE(eMOD_STRGEN)
#define MODULE_TYPE_LIB \
DEF_LMOD_STATIC_DATA \
MODULE_TYPE(eMOD_LIB)
@@ -416,6 +420,18 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\
*pEtryPoint = GetParserName;\
}
+/* the following definition is the standard block for queryEtryPt for Strgen
+ * modules. This can be used if no specific handling (e.g. to cover version
+ * differences) is needed.
+ */
+#define CODEqueryEtryPt_STD_SMOD_QUERIES \
+ CODEqueryEtryPt_STD_MOD_QUERIES \
+ else if(!strcmp((char*) name, "strgen")) {\
+ *pEtryPoint = strgen;\
+ } else if(!strcmp((char*) name, "GetName")) {\
+ *pEtryPoint = GetStrgenName;\
+ }
+
/* modInit()
* This has an extra parameter, which is the specific name of the modInit
* function. That is needed for built-in modules, which must have unique
@@ -621,6 +637,21 @@ static rsRetVal parse(msg_t *pMsg)\
}
+/* strgen() - main entry point of parser modules
+ */
+#define BEGINstrgen \
+static rsRetVal strgen(msg_t *pMsg, uchar **ppBuf, size_t *pLenBuf) \
+{\
+ DEFiRet;
+
+#define CODESTARTstrgen \
+ assert(pMsg != NULL);
+
+#define ENDstrgen \
+ RETiRet;\
+}
+
+
/* function to specify the parser name. This is done via a single command which
* receives a ANSI string as parameter.
*/
@@ -632,5 +663,17 @@ static rsRetVal GetParserName(uchar **ppSz)\
}
+
+/* function to specify the strgen name. This is done via a single command which
+ * receives a ANSI string as parameter.
+ */
+#define STRGEN_NAME(x) \
+static rsRetVal GetStrgenName(uchar **ppSz)\
+{\
+ *ppSz = UCHAR_CONSTANT(x);\
+ return RS_RET_OK;\
+}
+
+
/* vim:set ai:
*/
diff --git a/runtime/modules.c b/runtime/modules.c
index fd3468d8..d7362753 100644
--- a/runtime/modules.c
+++ b/runtime/modules.c
@@ -58,11 +58,13 @@
#include "modules.h"
#include "errmsg.h"
#include "parser.h"
+#include "strgen.h"
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(errmsg)
DEFobjCurrIf(parser)
+DEFobjCurrIf(strgen)
/* we must ensure that only one thread at one time tries to load or unload
* modules, otherwise we may see race conditions. This first came up with
@@ -406,9 +408,10 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_
{
rsRetVal localRet;
modInfo_t *pNew = NULL;
- uchar *pParserName;
+ uchar *pName;
parser_t *pParser; /* used for parser modules */
- rsRetVal (*GetParserName)(uchar**);
+ strgen_t *pStrgen; /* used for strgen modules */
+ rsRetVal (*GetName)(uchar**);
rsRetVal (*modGetType)(eModType_t *pType);
DEFiRet;
@@ -472,7 +475,6 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_
localRet = (*pNew->modQueryEtryPt)((uchar*)"endTransaction", &pNew->mod.om.endTransaction);
if(localRet == RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) {
pNew->mod.om.endTransaction = dummyEndTransaction;
- //pNew->mod.om.beginTransaction = dummyEndTransaction;
} else if(localRet != RS_RET_OK) {
ABORT_FINALIZE(localRet);
}
@@ -488,8 +490,8 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_
CHKiRet(objUse(parser, CORE_COMPONENT));
/* here, we create a new parser object */
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"parse", &pNew->mod.pm.parse));
- CHKiRet((*pNew->modQueryEtryPt)((uchar*)"GetParserName", &GetParserName));
- CHKiRet(GetParserName(&pParserName));
+ CHKiRet((*pNew->modQueryEtryPt)((uchar*)"GetParserName", &GetName));
+ CHKiRet(GetName(&pName));
CHKiRet(parser.Construct(&pParser));
/* check some features */
@@ -502,10 +504,26 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_
CHKiRet(parser.SetDoPRIParsing(pParser, TRUE));
}
- CHKiRet(parser.SetName(pParser, pParserName));
+ CHKiRet(parser.SetName(pParser, pName));
CHKiRet(parser.SetModPtr(pParser, pNew));
CHKiRet(parser.ConstructFinalize(pParser));
break;
+ case eMOD_STRGEN:
+ /* first, we need to obtain the strgen object. We could not do that during
+ * init as that would have caused class bootstrap issues which are not
+ * absolutely necessary. Note that we can call objUse() multiple times, it
+ * handles that.
+ */
+ CHKiRet(objUse(strgen, CORE_COMPONENT));
+ /* here, we create a new parser object */
+ CHKiRet((*pNew->modQueryEtryPt)((uchar*)"strgen", &pNew->mod.sm.strgen));
+ CHKiRet((*pNew->modQueryEtryPt)((uchar*)"GetName", &GetName));
+ CHKiRet(GetName(&pName));
+ CHKiRet(strgen.Construct(&pStrgen));
+ CHKiRet(strgen.SetName(pStrgen, pName));
+ CHKiRet(strgen.SetModPtr(pStrgen, pNew));
+ CHKiRet(strgen.ConstructFinalize(pStrgen));
+ break;
}
pNew->pszName = (uchar*) strdup((char*)name); /* we do not care if strdup() fails, we can accept that */
@@ -555,14 +573,46 @@ static void modPrintList(void)
case eMOD_PARSER:
dbgprintf("parser");
break;
+ case eMOD_STRGEN:
+ dbgprintf("strgen");
+ break;
}
dbgprintf(" module.\n");
dbgprintf("Entry points:\n");
dbgprintf("\tqueryEtryPt: 0x%lx\n", (unsigned long) pMod->modQueryEtryPt);
- dbgprintf("\tdoAction: 0x%lx\n", (unsigned long) pMod->mod.om.doAction);
- dbgprintf("\tparseSelectorAct: 0x%lx\n", (unsigned long) pMod->mod.om.parseSelectorAct);
dbgprintf("\tdbgPrintInstInfo: 0x%lx\n", (unsigned long) pMod->dbgPrintInstInfo);
dbgprintf("\tfreeInstance: 0x%lx\n", (unsigned long) pMod->freeInstance);
+ switch(pMod->eType) {
+ case eMOD_OUT:
+ dbgprintf("Output Module Entry Points:\n");
+ dbgprintf("\tdoAction: 0x%lx\n", (unsigned long) pMod->mod.om.doAction);
+ dbgprintf("\tparseSelectorAct: 0x%lx\n", (unsigned long) pMod->mod.om.parseSelectorAct);
+ dbgprintf("\ttryResume: 0x%lx\n", (unsigned long) pMod->tryResume);
+ dbgprintf("\tdoHUP: 0x%lx\n", (unsigned long) pMod->doHUP);
+ dbgprintf("\tBeginTransaction: 0x%lx\n", (unsigned long)
+ ((pMod->mod.om.beginTransaction == dummyBeginTransaction) ?
+ 0 : pMod->mod.om.beginTransaction));
+ dbgprintf("\tEndTransaction: 0x%lx\n", (unsigned long)
+ ((pMod->mod.om.endTransaction == dummyEndTransaction) ?
+ 0 : pMod->mod.om.endTransaction));
+ break;
+ case eMOD_IN:
+ dbgprintf("Input Module Entry Points\n");
+ dbgprintf("\trunInput: 0x%lx\n", (unsigned long) pMod->mod.im.runInput);
+ dbgprintf("\twillRun: 0x%lx\n", (unsigned long) pMod->mod.im.willRun);
+ dbgprintf("\tafterRun: 0x%lx\n", (unsigned long) pMod->mod.im.afterRun);
+ break;
+ case eMOD_LIB:
+ break;
+ case eMOD_PARSER:
+ dbgprintf("Parser Module Entry Points\n");
+ dbgprintf("\tparse: 0x%lx\n", (unsigned long) pMod->mod.pm.parse);
+ break;
+ case eMOD_STRGEN:
+ dbgprintf("Strgen Module Entry Points\n");
+ dbgprintf("\tstrgen: 0x%lx\n", (unsigned long) pMod->mod.sm.strgen);
+ break;
+ }
dbgprintf("\n");
pMod = GetNxt(pMod); /* done, go next */
}
diff --git a/runtime/modules.h b/runtime/modules.h
index 62f86ded..49586e8d 100644
--- a/runtime/modules.h
+++ b/runtime/modules.h
@@ -54,7 +54,8 @@ typedef enum eModType_ {
eMOD_IN = 0, /* input module */
eMOD_OUT = 1, /* output module */
eMOD_LIB = 2, /* library module */
- eMOD_PARSER = 3 /* parser module */
+ eMOD_PARSER = 3,/* parser module */
+ eMOD_STRGEN = 4 /* strgen module */
} eModType_t;
@@ -122,6 +123,9 @@ struct modInfo_s {
struct { /* data for parser modules */
rsRetVal (*parse)(msg_t*);
} pm;
+ struct { /* data for strgen modules */
+ rsRetVal (*strgen)(msg_t*, uchar**, size_t *);
+ } sm;
} mod;
void *pModHdlr; /* handler to the dynamic library holding the module */
# ifdef DEBUG
@@ -132,6 +136,7 @@ struct modInfo_s {
# endif
};
+
/* interfaces */
BEGINinterface(module) /* name must also be changed in ENDinterface macro! */
modInfo_t *(*GetNxt)(modInfo_t *pThis);
@@ -154,7 +159,4 @@ PROTOTYPEObj(module);
/* TODO: remove them below (means move the config init code) -- rgerhards, 2008-02-19 */
extern uchar *pModDir; /* read-only after startup */
-
#endif /* #ifndef MODULES_H_INCLUDED */
-/* vi:set ai:
- */
diff --git a/runtime/msg.c b/runtime/msg.c
index 6424b03a..1b188263 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -35,6 +35,8 @@
#include <string.h>
#include <assert.h>
#include <ctype.h>
+#include <sys/socket.h>
+#include <netdb.h>
#if HAVE_MALLOC_H
# include <malloc.h>
#endif
@@ -51,6 +53,7 @@
#include "unicode-helper.h"
#include "ruleset.h"
#include "prop.h"
+#include "net.h"
/* static data */
DEFobjStaticHelpers
@@ -59,6 +62,7 @@ DEFobjCurrIf(datetime)
DEFobjCurrIf(glbl)
DEFobjCurrIf(regexp)
DEFobjCurrIf(prop)
+DEFobjCurrIf(net)
static struct {
uchar *pszName;
@@ -274,8 +278,13 @@ static char *syslog_severity_names[8] = { "emerg", "alert", "crit", "err", "warn
static char *syslog_number_names[24] = { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14",
"15", "16", "17", "18", "19", "20", "21", "22", "23" };
+/* global variables */
+#if defined(HAVE_MALLOC_TRIM) && !defined(HAVE_ATOMIC_BUILTINS)
+static pthread_mutex_t mutTrimCtr; /* mutex to handle malloc trim */
+#endif
+
/* some forward declarations */
-static int getAPPNAMELen(msg_t *pM, bool bLockMutex);
+static int getAPPNAMELen(msg_t *pM, sbool bLockMutex);
static inline int getProtocolVersion(msg_t *pM)
@@ -284,6 +293,41 @@ static inline int getProtocolVersion(msg_t *pM)
}
+/* do a DNS reverse resolution, if not already done, reflect status
+ * rgerhards, 2009-11-16
+ */
+static inline rsRetVal
+resolveDNS(msg_t *pMsg) {
+ rsRetVal localRet;
+ prop_t *propFromHost = NULL;
+ prop_t *propFromHostIP = NULL;
+ uchar fromHost[NI_MAXHOST];
+ uchar fromHostIP[NI_MAXHOST];
+ uchar fromHostFQDN[NI_MAXHOST];
+ DEFiRet;
+
+ CHKiRet(objUse(net, CORE_COMPONENT));
+ if(pMsg->msgFlags & NEEDS_DNSRESOL) {
+ localRet = net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP);
+ if(localRet == RS_RET_OK) {
+ MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost);
+ CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP));
+ }
+ }
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ /* best we can do: remove property */
+ MsgSetRcvFromStr(pMsg, UCHAR_CONSTANT(""), 0, &propFromHost);
+ prop.Destruct(&propFromHost);
+ }
+ if(propFromHost != NULL)
+ prop.Destruct(&propFromHost);
+ if(propFromHostIP != NULL)
+ prop.Destruct(&propFromHostIP);
+ RETiRet;
+}
+
+
static inline void
getInputName(msg_t *pM, uchar **ppsz, int *plen)
{
@@ -307,6 +351,7 @@ getRcvFromIP(msg_t *pM)
if(pM == NULL) {
psz = UCHAR_CONSTANT("");
} else {
+ resolveDNS(pM); /* make sure we have a resolved entry */
if(pM->pRcvFromIP == NULL)
psz = UCHAR_CONSTANT("");
else
@@ -660,7 +705,7 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis)
pM->pCSMSGID = NULL;
pM->pInputName = NULL;
pM->pRcvFromIP = NULL;
- pM->pRcvFrom = NULL;
+ pM->rcvFrom.pRcvFrom = NULL;
pM->pRuleset = NULL;
memset(&pM->tRcvdAt, 0, sizeof(pM->tRcvdAt));
memset(&pM->tTIMESTAMP, 0, sizeof(pM->tTIMESTAMP));
@@ -744,10 +789,13 @@ static inline void freeHOSTNAME(msg_t *pThis)
BEGINobjDestruct(msg) /* be sure to specify the object type also in END and CODESTART macros! */
int currRefCount;
+# if HAVE_MALLOC_TRIM
+ int currCnt;
+# endif
CODESTARTobjDestruct(msg)
/* DEV Debugging only ! dbgprintf("msgDestruct\t0x%lx, Ref now: %d\n", (unsigned long)pThis, pThis->iRefCount - 1); */
# ifdef HAVE_ATOMIC_BUILTINS
- currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount);
+ currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, NULL);
# else
MsgLock(pThis);
currRefCount = --pThis->iRefCount;
@@ -761,8 +809,12 @@ CODESTARTobjDestruct(msg)
freeHOSTNAME(pThis);
if(pThis->pInputName != NULL)
prop.Destruct(&pThis->pInputName);
- if(pThis->pRcvFrom != NULL)
- prop.Destruct(&pThis->pRcvFrom);
+ if((pThis->msgFlags & NEEDS_DNSRESOL) == 0) {
+ if(pThis->rcvFrom.pRcvFrom != NULL)
+ prop.Destruct(&pThis->rcvFrom.pRcvFrom);
+ } else {
+ free(pThis->rcvFrom.pfrominet);
+ }
if(pThis->pRcvFromIP != NULL)
prop.Destruct(&pThis->pRcvFromIP);
free(pThis->pszRcvdAt3164);
@@ -799,7 +851,8 @@ CODESTARTobjDestruct(msg)
* that we trim too often when the counter wraps.
*/
static unsigned iTrimCtr = 1;
- if(ATOMIC_INC_AND_FETCH(iTrimCtr) % 100000 == 0) {
+ currCnt = ATOMIC_INC_AND_FETCH(&iTrimCtr, &mutTrimCtr);
+ if(currCnt % 100000 == 0) {
malloc_trim(128*1024);
}
}
@@ -848,6 +901,7 @@ ENDobjDestruct(msg)
msg_t* MsgDup(msg_t* pOld)
{
msg_t* pNew;
+ rsRetVal localRet;
assert(pOld != NULL);
@@ -868,9 +922,19 @@ msg_t* MsgDup(msg_t* pOld)
pNew->iLenMSG = pOld->iLenMSG;
pNew->iLenTAG = pOld->iLenTAG;
pNew->iLenHOSTNAME = pOld->iLenHOSTNAME;
- if(pOld->pRcvFrom != NULL) {
- pNew->pRcvFrom = pOld->pRcvFrom;
- prop.AddRef(pNew->pRcvFrom);
+ if((pOld->msgFlags & NEEDS_DNSRESOL) == 1) {
+ localRet = msgSetFromSockinfo(pNew, pOld->rcvFrom.pfrominet);
+ if(localRet != RS_RET_OK) {
+ /* if something fails, we accept loss of this property, it is
+ * better than losing the whole message.
+ */
+ pNew->msgFlags &= ~NEEDS_DNSRESOL;
+ }
+ } else {
+ if(pOld->rcvFrom.pRcvFrom != NULL) {
+ pNew->rcvFrom.pRcvFrom = pOld->rcvFrom.pRcvFrom;
+ prop.AddRef(pNew->rcvFrom.pRcvFrom);
+ }
}
if(pOld->pRcvFromIP != NULL) {
pNew->pRcvFromIP = pOld->pRcvFromIP;
@@ -1000,7 +1064,7 @@ msg_t *MsgAddRef(msg_t *pM)
{
assert(pM != NULL);
# ifdef HAVE_ATOMIC_BUILTINS
- ATOMIC_INC(pM->iRefCount);
+ ATOMIC_INC(&pM->iRefCount, NULL);
# else
MsgLock(pM);
pM->iRefCount++;
@@ -1193,7 +1257,8 @@ static int getPRIi(msg_t *pM)
/* Get PRI value in text form
*/
-static inline char *getPRI(msg_t *pM)
+char *
+getPRI(msg_t *pM)
{
/* PRI is a number in the range 0..191. Thus, we use a simple lookup table to obtain the
* string value. It looks a bit clumpsy here in code ;)
@@ -1208,7 +1273,8 @@ static inline char *getPRI(msg_t *pM)
}
-static inline char *getTimeReported(msg_t *pM, enum tplFormatTypes eFmt)
+char *
+getTimeReported(msg_t *pM, enum tplFormatTypes eFmt)
{
BEGINfunc
if(pM == NULL)
@@ -1492,7 +1558,7 @@ finalize_it:
* This must be called WITHOUT the message lock being held.
* rgerhards, 2009-06-26
*/
-static inline void preparePROCID(msg_t *pM, bool bLockMutex)
+static inline void preparePROCID(msg_t *pM, sbool bLockMutex)
{
if(pM->pCSPROCID == NULL) {
if(bLockMutex == LOCK_MUTEX)
@@ -1509,7 +1575,7 @@ static inline void preparePROCID(msg_t *pM, bool bLockMutex)
#if 0
/* rgerhards, 2005-11-24
*/
-static inline int getPROCIDLen(msg_t *pM, bool bLockMutex)
+static inline int getPROCIDLen(msg_t *pM, sbool bLockMutex)
{
assert(pM != NULL);
preparePROCID(pM, bLockMutex);
@@ -1520,7 +1586,7 @@ static inline int getPROCIDLen(msg_t *pM, bool bLockMutex)
/* rgerhards, 2005-11-24
*/
-char *getPROCID(msg_t *pM, bool bLockMutex)
+char *getPROCID(msg_t *pM, sbool bLockMutex)
{
ISOBJ_TYPE_assert(pM, msg);
preparePROCID(pM, bLockMutex);
@@ -1599,7 +1665,7 @@ void MsgSetTAG(msg_t *pMsg, uchar* pszBuf, size_t lenBuf)
* if there is a TAG and, if not, if it can emulate it.
* rgerhards, 2005-11-24
*/
-static inline void tryEmulateTAG(msg_t *pM, bool bLockMutex)
+static inline void tryEmulateTAG(msg_t *pM, sbool bLockMutex)
{
size_t lenTAG;
uchar bufTAG[CONF_TAG_MAXSIZE];
@@ -1627,7 +1693,7 @@ static inline void tryEmulateTAG(msg_t *pM, bool bLockMutex)
}
-static inline void
+void
getTAG(msg_t *pM, uchar **ppBuf, int *piLen)
{
if(pM == NULL) {
@@ -1652,12 +1718,13 @@ int getHOSTNAMELen(msg_t *pM)
if(pM == NULL)
return 0;
else
- if(pM->pszHOSTNAME == NULL)
- if(pM->pRcvFrom == NULL)
+ if(pM->pszHOSTNAME == NULL) {
+ resolveDNS(pM);
+ if(pM->rcvFrom.pRcvFrom == NULL)
return 0;
else
- return prop.GetStringLen(pM->pRcvFrom);
- else
+ return prop.GetStringLen(pM->rcvFrom.pRcvFrom);
+ } else
return pM->iLenHOSTNAME;
}
@@ -1668,12 +1735,13 @@ char *getHOSTNAME(msg_t *pM)
return "";
else
if(pM->pszHOSTNAME == NULL) {
- if(pM->pRcvFrom == NULL) {
+ resolveDNS(pM);
+ if(pM->rcvFrom.pRcvFrom == NULL) {
return "";
} else {
uchar *psz;
int len;
- prop.GetString(pM->pRcvFrom, &psz, &len);
+ prop.GetString(pM->rcvFrom.pRcvFrom, &psz, &len);
return (char*) psz;
}
} else {
@@ -1687,13 +1755,15 @@ uchar *getRcvFrom(msg_t *pM)
uchar *psz;
int len;
BEGINfunc
+
if(pM == NULL) {
psz = UCHAR_CONSTANT("");
} else {
- if(pM->pRcvFrom == NULL)
+ resolveDNS(pM);
+ if(pM->rcvFrom.pRcvFrom == NULL)
psz = UCHAR_CONSTANT("");
else
- prop.GetString(pM->pRcvFrom, &psz, &len);
+ prop.GetString(pM->rcvFrom.pRcvFrom, &psz, &len);
}
ENDfunc
return psz;
@@ -1740,7 +1810,7 @@ static inline char *getStructuredData(msg_t *pM)
/* check if we have a ProgramName, and, if not, try to aquire/emulate it.
* rgerhards, 2009-06-26
*/
-static inline void prepareProgramName(msg_t *pM, bool bLockMutex)
+static inline void prepareProgramName(msg_t *pM, sbool bLockMutex)
{
if(pM->pCSProgName == NULL) {
if(bLockMutex == LOCK_MUTEX)
@@ -1759,7 +1829,7 @@ static inline void prepareProgramName(msg_t *pM, bool bLockMutex)
/* get the length of the "programname" sz string
* rgerhards, 2005-10-19
*/
-int getProgramNameLen(msg_t *pM, bool bLockMutex)
+int getProgramNameLen(msg_t *pM, sbool bLockMutex)
{
assert(pM != NULL);
prepareProgramName(pM, bLockMutex);
@@ -1770,7 +1840,7 @@ int getProgramNameLen(msg_t *pM, bool bLockMutex)
/* get the "programname" as sz string
* rgerhards, 2005-10-19
*/
-uchar *getProgramName(msg_t *pM, bool bLockMutex)
+uchar *getProgramName(msg_t *pM, sbool bLockMutex)
{
prepareProgramName(pM, bLockMutex);
return (pM->pCSProgName == NULL) ? UCHAR_CONSTANT("") : rsCStrGetSzStrNoNULL(pM->pCSProgName);
@@ -1800,7 +1870,7 @@ static void tryEmulateAPPNAME(msg_t *pM)
* This must be called WITHOUT the message lock being held.
* rgerhards, 2009-06-26
*/
-static inline void prepareAPPNAME(msg_t *pM, bool bLockMutex)
+static inline void prepareAPPNAME(msg_t *pM, sbool bLockMutex)
{
if(pM->pCSAPPNAME == NULL) {
if(bLockMutex == LOCK_MUTEX)
@@ -1817,7 +1887,7 @@ static inline void prepareAPPNAME(msg_t *pM, bool bLockMutex)
/* rgerhards, 2005-11-24
*/
-char *getAPPNAME(msg_t *pM, bool bLockMutex)
+char *getAPPNAME(msg_t *pM, sbool bLockMutex)
{
assert(pM != NULL);
prepareAPPNAME(pM, bLockMutex);
@@ -1826,7 +1896,7 @@ char *getAPPNAME(msg_t *pM, bool bLockMutex)
/* rgerhards, 2005-11-24
*/
-static int getAPPNAMELen(msg_t *pM, bool bLockMutex)
+static int getAPPNAMELen(msg_t *pM, sbool bLockMutex)
{
assert(pM != NULL);
prepareAPPNAME(pM, bLockMutex);
@@ -1849,6 +1919,28 @@ void MsgSetInputName(msg_t *pThis, prop_t *inputName)
}
+/* Set the pfrominet socket store, so that we can obtain the peer at some
+ * later time. Note that we do not check if pRcvFrom is already set, so this
+ * function must only be called during message creation.
+ * NOTE: msgFlags is NOT set. While this is somewhat a violation of layers,
+ * it is done because it gains us some performance. So the caller must make
+ * sure the message flags are properly maintained. For all current callers,
+ * this is always the case and without extra effort required.
+ * rgerhards, 2009-11-17
+ */
+rsRetVal
+msgSetFromSockinfo(msg_t *pThis, struct sockaddr_storage *sa){
+ DEFiRet;
+ assert(pThis->rcvFrom.pRcvFrom == NULL);
+
+ CHKmalloc(pThis->rcvFrom.pfrominet = malloc(sizeof(struct sockaddr_storage)));
+ memcpy(pThis->rcvFrom.pfrominet, sa, sizeof(struct sockaddr_storage));
+
+finalize_it:
+ RETiRet;
+}
+
+
/* rgerhards 2008-09-10: set RcvFrom name in msg object. This calls AddRef()
* on the property, because this must be done in all current cases and there
* is no case expected where this may not be necessary.
@@ -1859,9 +1951,15 @@ void MsgSetRcvFrom(msg_t *pThis, prop_t *new)
assert(pThis != NULL);
prop.AddRef(new);
- if(pThis->pRcvFrom != NULL)
- prop.Destruct(&pThis->pRcvFrom);
- pThis->pRcvFrom = new;
+ if(pThis->msgFlags & NEEDS_DNSRESOL) {
+ if(pThis->rcvFrom.pfrominet != NULL)
+ free(pThis->rcvFrom.pfrominet);
+ pThis->msgFlags &= ~NEEDS_DNSRESOL;
+ } else {
+ if(pThis->rcvFrom.pRcvFrom != NULL)
+ prop.Destruct(&pThis->rcvFrom.pRcvFrom);
+ }
+ pThis->rcvFrom.pRcvFrom = new;
}
@@ -3054,7 +3152,7 @@ static rsRetVal msgConstructFinalizer(msg_t *pThis)
* rgerhards, 2008-01-14
*/
static rsRetVal
-MsgGetSeverity(obj_t *pThis, int *piSeverity)
+MsgGetSeverity(obj_t_ptr pThis, int *piSeverity)
{
ISOBJ_TYPE_assert(pThis, msg);
assert(piSeverity != NULL);
@@ -3087,6 +3185,10 @@ BEGINObjClassInit(msg, 1, OBJ_IS_CORE_MODULE)
funcUnlock = MsgLockingDummy;
funcDeleteMutex = MsgLockingDummy;
funcMsgPrepareEnqueue = MsgLockingDummy;
+ /* some more inits */
+# if HAVE_MALLOC_TRIM
+ INIT_ATOMIC_HELPER_MUT(mutTrimCtr);
+# endif
ENDObjClassInit(msg)
/* vim:set ai:
*/
diff --git a/runtime/msg.h b/runtime/msg.h
index 9101cef7..d42f1de2 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -32,6 +32,7 @@
#include "obj.h"
#include "syslogd-types.h"
#include "template.h"
+#include "atomic.h"
/* rgerhards 2004-11-08: The following structure represents a
@@ -59,14 +60,8 @@ struct msg {
flowControl_t flowCtlType; /**< type of flow control we can apply, for enqueueing, needs not to be persisted because
once data has entered the queue, this property is no longer needed. */
pthread_mutex_t mut;
- bool bDoLock; /* use the mutex? */
+ sbool bDoLock; /* use the mutex? */
short iRefCount; /* reference counter (0 = unused) */
- /* background: the hostname is not present on "regular" messages
- * received via UNIX domain sockets from the same machine. However,
- * it is available when we have a forwarder (e.g. rfc3195d) using local
- * sockets. All in all, the parser would need parse templates, that would
- * resolve all these issues... rgerhards, 2005-10-06
- */
short iSeverity; /* the severity 0..7 */
short iFacility; /* Facility code 0 .. 23*/
short offAfterPRI; /* offset, at which raw message WITHOUT PRI part starts in pszRawMsg */
@@ -94,8 +89,12 @@ struct msg {
cstr_t *pCSPROCID; /* PROCID */
cstr_t *pCSMSGID; /* MSGID */
prop_t *pInputName; /* input name property */
- prop_t *pRcvFrom; /* name of system message was received from */
prop_t *pRcvFromIP; /* IP of system message was received from */
+ union {
+ prop_t *pRcvFrom;/* name of system message was received from */
+ struct sockaddr_storage *pfrominet; /* unresolved name */
+ } rcvFrom;
+
ruleset_t *pRuleset; /* ruleset to be used for processing this message */
time_t ttGenTime; /* time msg object was generated, same as tRcvdAt, but a Unix timestamp.
While this field looks redundant, it is required because a Unix timestamp
@@ -113,8 +112,8 @@ struct msg {
uchar *pszTAG; /* pointer to tag value */
uchar szBuf[CONF_TAG_BUFSIZE];
} TAG;
- char pszTimestamp3164[16];
- char pszTimestamp3339[33];
+ char pszTimestamp3164[CONST_LEN_TIMESTAMP_3164 + 1];
+ char pszTimestamp3339[CONST_LEN_TIMESTAMP_3339 + 1];
char pszTIMESTAMP_SecFrac[7]; /* Note: a pointer is 64 bits/8 char, so this is actually fewer than a pointer! */
char pszRcvdAt_SecFrac[7]; /* same as above. Both are fractional seconds for their respective timestamp */
};
@@ -129,6 +128,9 @@ struct msg {
#define MARK 0x008 /* this message is a mark */
#define NEEDS_PARSING 0x010 /* raw message, must be parsed before processing can be done */
#define PARSE_HOSTNAME 0x020 /* parse the hostname during message parsing */
+#define NEEDS_DNSRESOL 0x040 /* fromhost address is unresolved and must be locked up via DNS reverse lookup first */
+#define NEEDS_ACLCHK_U 0x080 /* check UDP ACLs after DNS resolution has been done in main queue consumer */
+#define NO_PRI_IN_RAW 0x100 /* rawmsg does not include a PRI (Solaris!), but PRI is already set correctly in the msg object */
/* function prototypes
@@ -148,6 +150,7 @@ void MsgSetTAG(msg_t *pMsg, uchar* pszBuf, size_t lenBuf);
void MsgSetRuleset(msg_t *pMsg, ruleset_t*);
rsRetVal MsgSetFlowControlType(msg_t *pMsg, flowControl_t eFlowCtl);
rsRetVal MsgSetStructuredData(msg_t *pMsg, char* pszStrucData);
+rsRetVal msgSetFromSockinfo(msg_t *pThis, struct sockaddr_storage *sa);
void MsgSetRcvFrom(msg_t *pMsg, prop_t*);
void MsgSetRcvFromStr(msg_t *pMsg, uchar* pszRcvFrom, int, prop_t **);
rsRetVal MsgSetRcvFromIP(msg_t *pMsg, prop_t*);
@@ -164,19 +167,22 @@ char *textpri(char *pRes, size_t pResLen, int pri);
rsRetVal msgGetMsgVar(msg_t *pThis, cstr_t *pstrPropName, var_t **ppVar);
rsRetVal MsgEnableThreadSafety(void);
uchar *getRcvFrom(msg_t *pM);
+void getTAG(msg_t *pM, uchar **ppBuf, int *piLen);
+char *getTimeReported(msg_t *pM, enum tplFormatTypes eFmt);
+char *getPRI(msg_t *pMsg);
/* TODO: remove these five (so far used in action.c) */
uchar *getMSG(msg_t *pM);
char *getHOSTNAME(msg_t *pM);
-char *getPROCID(msg_t *pM, bool bLockMutex);
-char *getAPPNAME(msg_t *pM, bool bLockMutex);
+char *getPROCID(msg_t *pM, sbool bLockMutex);
+char *getAPPNAME(msg_t *pM, sbool bLockMutex);
int getMSGLen(msg_t *pM);
char *getHOSTNAME(msg_t *pM);
int getHOSTNAMELen(msg_t *pM);
-uchar *getProgramName(msg_t *pM, bool bLockMutex);
-int getProgramNameLen(msg_t *pM, bool bLockMutex);
+uchar *getProgramName(msg_t *pM, sbool bLockMutex);
+int getProgramNameLen(msg_t *pM, sbool bLockMutex);
uchar *getRcvFrom(msg_t *pM);
rsRetVal propNameToID(cstr_t *pCSPropName, propid_t *pPropID);
uchar *propIDToName(propid_t propID);
diff --git a/runtime/net.c b/runtime/net.c
index 85c5cc11..7653ea1d 100644
--- a/runtime/net.c
+++ b/runtime/net.c
@@ -502,8 +502,8 @@ static inline void MaskIP4 (struct in_addr *addr, uint8_t bits) {
addr->s_addr &= htonl(0xffffffff << (32 - bits));
}
-#define SIN(sa) ((struct sockaddr_in *)(sa))
-#define SIN6(sa) ((struct sockaddr_in6 *)(sa))
+#define SIN(sa) ((struct sockaddr_in *)(void*)(sa))
+#define SIN6(sa) ((struct sockaddr_in6 *)(void*)(sa))
/* This is a cancel-safe getnameinfo() version, because we learned
@@ -892,15 +892,18 @@ rsRetVal addAllowedSenderLine(char* pName, uchar** ppRestOfConfLine)
* including IPv4/v6 as well as domain name wildcards.
* This is a helper to isAllowedSender. As it is only called once, it is
* declared inline.
- * Returns 0 if they do not match, something else otherwise.
- * contributed 1007-07-16 by mildew@gmail.com
+ * Returns 0 if they do not match, 1 if they match and 2 if a DNS name would have been required.
+ * contributed 2007-07-16 by mildew@gmail.com
*/
-static inline int MaskCmp(struct NetAddr *pAllow, uint8_t bits, struct sockaddr *pFrom, const char *pszFromHost)
+static inline int
+MaskCmp(struct NetAddr *pAllow, uint8_t bits, struct sockaddr *pFrom, const char *pszFromHost, int bChkDNS)
{
assert(pAllow != NULL);
assert(pFrom != NULL);
if(F_ISSET(pAllow->flags, ADDR_NAME)) {
+ if(bChkDNS == 0)
+ return 2;
dbgprintf("MaskCmp: host=\"%s\"; pattern=\"%s\"\n", pszFromHost, pAllow->addr.HostWildcard);
# if !defined(FNM_CASEFOLD)
@@ -967,18 +970,22 @@ static inline int MaskCmp(struct NetAddr *pAllow, uint8_t bits, struct sockaddr
/* check if a sender is allowed. The root of the the allowed sender.
* list must be proveded by the caller. As such, this function can be
* used to check both UDP and TCP allowed sender lists.
- * returns 1, if the sender is allowed, 0 otherwise.
+ * returns 1, if the sender is allowed, 0 if not and 2 if we could not
+ * obtain a result because we would need a dns name, which we don't have
+ * (2 was added rgerhards, 2009-11-16).
* rgerhards, 2005-09-26
*/
-static int isAllowedSender(uchar *pszType, struct sockaddr *pFrom, const char *pszFromHost)
+static int isAllowedSender2(uchar *pszType, struct sockaddr *pFrom, const char *pszFromHost, int bChkDNS)
{
struct AllowedSenders *pAllow;
struct AllowedSenders *pAllowRoot;
+ int bNeededDNS = 0; /* partial check because we could not resolve DNS? */
+ int ret;
assert(pFrom != NULL);
if(setAllowRoot(&pAllowRoot, pszType) != RS_RET_OK)
- return 0; /* if something went wrong, we denie access - that's the better choice... */
+ return 0; /* if something went wrong, we deny access - that's the better choice... */
if(pAllowRoot == NULL)
return 1; /* checking disabled, everything is valid! */
@@ -990,10 +997,20 @@ static int isAllowedSender(uchar *pszType, struct sockaddr *pFrom, const char *p
* that the sender is disallowed.
*/
for(pAllow = pAllowRoot ; pAllow != NULL ; pAllow = pAllow->pNext) {
- if (MaskCmp (&(pAllow->allowedSender), pAllow->SignificantBits, pFrom, pszFromHost))
+ ret = MaskCmp (&(pAllow->allowedSender), pAllow->SignificantBits, pFrom, pszFromHost, bChkDNS);
+ if(ret == 1)
return 1;
+ else if(ret == 2)
+ bNeededDNS = 2;
}
- return 0;
+ return bNeededDNS;
+}
+
+
+/* legacy API, not to be used any longer */
+static int
+isAllowedSender(uchar *pszType, struct sockaddr *pFrom, const char *pszFromHost) {
+ return isAllowedSender2(pszType, pFrom, pszFromHost, 1);
}
@@ -1165,12 +1182,12 @@ void debugListenInfo(int fd, char *type)
switch(sa.sa_family) {
case PF_INET:
szFamily = "IPv4";
- ipv4 = (struct sockaddr_in*) &sa;
+ ipv4 = (struct sockaddr_in*)(void*) &sa;
port = ntohs(ipv4->sin_port);
break;
case PF_INET6:
szFamily = "IPv6";
- ipv6 = (struct sockaddr_in6*) &sa;
+ ipv6 = (struct sockaddr_in6*)(void*) &sa;
port = ntohs(ipv6->sin6_port);
break;
default:
@@ -1533,12 +1550,36 @@ static int CmpHost(struct sockaddr_storage *s1, struct sockaddr_storage* s2, siz
ret = memcmp(s1, s2, socklen);
}
-dbgprintf("CmpHost returns %d\n", ret);
finalize_it:
return ret;
}
+
+/* check if restrictions (ALCs) exists. The goal of this function is to disable the
+ * somewhat time-consuming ACL checks if no restrictions are defined (the usual case).
+ * This also permits to gain some speedup by using firewall-based ACLs instead of
+ * rsyslog ACLs (the recommended method.
+ * rgerhards, 2009-11-16
+ */
+static rsRetVal
+HasRestrictions(uchar *pszType, int *bHasRestrictions) {
+ struct AllowedSenders *pAllowRoot;
+ DEFiRet;
+
+ CHKiRet(setAllowRoot(&pAllowRoot, pszType));
+
+ *bHasRestrictions = (pAllowRoot == NULL) ? 0 : 1;
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ *bHasRestrictions = 1; /* in this case it is better to check individually */
+ DBGPRINTF("Error %d trying to obtain ACL restriction state of '%s'\n", iRet, pszType);
+ }
+ RETiRet;
+}
+
+
/* queryInterface function
* rgerhards, 2008-03-05
*/
@@ -1562,12 +1603,14 @@ CODESTARTobjQueryInterface(net)
pIf->create_udp_socket = create_udp_socket;
pIf->closeUDPListenSockets = closeUDPListenSockets;
pIf->isAllowedSender = isAllowedSender;
+ pIf->isAllowedSender2 = isAllowedSender2;
pIf->should_use_so_bsdcompat = should_use_so_bsdcompat;
pIf->getLocalHostname = getLocalHostname;
pIf->AddPermittedPeer = AddPermittedPeer;
pIf->DestructPermittedPeers = DestructPermittedPeers;
pIf->PermittedPeerWildcardMatch = PermittedPeerWildcardMatch;
pIf->CmpHost = CmpHost;
+ pIf->HasRestrictions = HasRestrictions;
/* data members */
pIf->pACLAddHostnameOnFail = &ACLAddHostnameOnFail;
pIf->pACLDontResolve = &ACLDontResolve;
diff --git a/runtime/net.h b/runtime/net.h
index ec364b1c..101ce79d 100644
--- a/runtime/net.h
+++ b/runtime/net.h
@@ -139,7 +139,7 @@ BEGINinterface(net) /* name must also be changed in ENDinterface macro! */
void (*debugListenInfo)(int fd, char *type);
int *(*create_udp_socket)(uchar *hostname, uchar *LogPort, int bIsServer);
void (*closeUDPListenSockets)(int *finet);
- int (*isAllowedSender)(uchar *pszType, struct sockaddr *pFrom, const char *pszFromHost);
+ int (*isAllowedSender)(uchar *pszType, struct sockaddr *pFrom, const char *pszFromHost); /* deprecated! */
rsRetVal (*getLocalHostname)(uchar**);
int (*should_use_so_bsdcompat)(void);
/* permitted peer handling should be replaced by something better (see comments above) */
@@ -148,11 +148,14 @@ BEGINinterface(net) /* name must also be changed in ENDinterface macro! */
rsRetVal (*PermittedPeerWildcardMatch)(permittedPeers_t *pPeer, uchar *pszNameToMatch, int *pbIsMatching);
/* v5 interface additions */
int (*CmpHost)(struct sockaddr_storage *, struct sockaddr_storage*, size_t);
+ /* v6 interface additions - 2009-11-16 */
+ rsRetVal (*HasRestrictions)(uchar *, int *bHasRestrictions);
+ int (*isAllowedSender2)(uchar *pszType, struct sockaddr *pFrom, const char *pszFromHost, int bChkDNS);
/* data members - these should go away over time... TODO */
int *pACLAddHostnameOnFail; /* add hostname to acl when DNS resolving has failed */
int *pACLDontResolve; /* add hostname to acl instead of resolving it to IP(s) */
ENDinterface(net)
-#define netCURR_IF_VERSION 5 /* increment whenever you change the interface structure! */
+#define netCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */
/* prototypes */
PROTOTYPEObj(net);
diff --git a/runtime/netstrms.c b/runtime/netstrms.c
index 6b28e7ea..e9ff2568 100644
--- a/runtime/netstrms.c
+++ b/runtime/netstrms.c
@@ -36,6 +36,7 @@
#include "nsd.h"
#include "netstrm.h"
#include "nssel.h"
+#include "nspoll.h"
#include "netstrms.h"
MODULE_TYPE_LIB
@@ -304,6 +305,7 @@ ENDObjClassInit(netstrms)
BEGINmodExit
CODESTARTmodExit
nsselClassExit();
+ nspollClassExit();
netstrmsClassExit();
netstrmClassExit(); /* we use this object, so we must exit it after we are finished */
ENDmodExit
@@ -322,6 +324,7 @@ CODESTARTmodInit
/* Initialize all classes that are in our module - this includes ourselfs */
CHKiRet(netstrmClassInit(pModInfo));
CHKiRet(nsselClassInit(pModInfo));
+ CHKiRet(nspollClassInit(pModInfo));
CHKiRet(netstrmsClassInit(pModInfo));
ENDmodInit
/* vi:set ai:
diff --git a/runtime/nsd.h b/runtime/nsd.h
index 8668c934..e5b9320b 100644
--- a/runtime/nsd.h
+++ b/runtime/nsd.h
@@ -87,4 +87,13 @@ BEGINinterface(nsdsel) /* name must also be changed in ENDinterface macro! */
ENDinterface(nsdsel)
#define nsdselCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
+/* interface for the epoll call */
+BEGINinterface(nsdpoll) /* name must also be changed in ENDinterface macro! */
+ rsRetVal (*Construct)(nsdpoll_t **ppThis);
+ rsRetVal (*Destruct)(nsdpoll_t **ppThis);
+ rsRetVal (*Ctl)(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op);
+ rsRetVal (*Wait)(nsdpoll_t *pNsdpoll, int timeout, int *idRdy, void **ppUsr);
+ENDinterface(nsdpoll)
+#define nsdpollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
+
#endif /* #ifndef INCLUDED_NSD_H */
diff --git a/runtime/nsd_ptcp.c b/runtime/nsd_ptcp.c
index fe31ab40..ca00749c 100644
--- a/runtime/nsd_ptcp.c
+++ b/runtime/nsd_ptcp.c
@@ -48,6 +48,7 @@
#include "netstrms.h"
#include "netstrm.h"
#include "nsdsel_ptcp.h"
+#include "nsdpoll_ptcp.h"
#include "nsd_ptcp.h"
MODULE_TYPE_LIB
@@ -562,6 +563,7 @@ finalize_it:
static rsRetVal
Rcv(nsd_t *pNsd, uchar *pRcvBuf, ssize_t *pLenBuf)
{
+ char errStr[1024];
DEFiRet;
nsd_ptcp_t *pThis = (nsd_ptcp_t*) pNsd;
ISOBJ_TYPE_assert(pThis, nsd_ptcp);
@@ -571,7 +573,9 @@ Rcv(nsd_t *pNsd, uchar *pRcvBuf, ssize_t *pLenBuf)
if(*pLenBuf == 0) {
ABORT_FINALIZE(RS_RET_CLOSED);
} else if (*pLenBuf < 0) {
- ABORT_FINALIZE(RS_RET_ERR);
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ dbgprintf("error during recv on NSD %p: %s\n", pNsd, errStr);
+ ABORT_FINALIZE(RS_RET_RCV_ERR);
}
finalize_it:
@@ -821,6 +825,9 @@ ENDObjClassInit(nsd_ptcp)
BEGINmodExit
CODESTARTmodExit
+# ifdef HAVE_EPOLL_CREATE /* module only available if epoll() is supported! */
+ nsdpoll_ptcpClassExit();
+# endif
nsdsel_ptcpClassExit();
nsd_ptcpClassExit();
ENDmodExit
@@ -839,6 +846,9 @@ CODESTARTmodInit
/* Initialize all classes that are in our module - this includes ourselfs */
CHKiRet(nsd_ptcpClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
CHKiRet(nsdsel_ptcpClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
+# ifdef HAVE_EPOLL_CREATE /* module only available if epoll() is supported! */
+ CHKiRet(nsdpoll_ptcpClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
+# endif
ENDmodInit
/* vi:set ai:
*/
diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c
new file mode 100644
index 00000000..51006707
--- /dev/null
+++ b/runtime/nsdpoll_ptcp.c
@@ -0,0 +1,288 @@
+/* nsdpoll_ptcp.c
+ *
+ * An implementation of the nsd epoll() interface for plain tcp sockets.
+ *
+ * Copyright 2009 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+#include "config.h"
+
+#ifdef HAVE_EPOLL_CREATE /* this module requires epoll! */
+
+#include <stdlib.h>
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+#if HAVE_SYS_EPOLL_H
+# include <sys/epoll.h>
+#endif
+
+#include "rsyslog.h"
+#include "module-template.h"
+#include "obj.h"
+#include "errmsg.h"
+#include "srUtils.h"
+#include "nspoll.h"
+#include "nsd_ptcp.h"
+#include "nsdpoll_ptcp.h"
+#include "unlimited_select.h"
+
+/* static data */
+DEFobjStaticHelpers
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(glbl)
+
+
+/* -START------------------------- helpers for event list ------------------------------------ */
+
+/* add new entry to list. We assume that the fd is not already present and DO NOT check this!
+ * Returns newly created entry in pEvtLst.
+ * Note that we currently need to use level-triggered mode, because the upper layers do not work
+ * in parallel. As such, in edge-triggered mode we may not get notified, because new data comes
+ * in after we have read everything that was present. To use ET mode, we need to change the upper
+ * peers so that they immediately start a new wait before processing the data read. That obviously
+ * requires more elaborate redesign and we postpone this until the current more simplictic mode has
+ * been proven OK in practice.
+ * rgerhards, 2009-11-18
+ */
+static inline rsRetVal
+addEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, int mode, nsd_ptcp_t *pSock, nsdpoll_epollevt_lst_t **pEvtLst) {
+ nsdpoll_epollevt_lst_t *pNew;
+ DEFiRet;
+
+ CHKmalloc(pNew = (nsdpoll_epollevt_lst_t*) malloc(sizeof(nsdpoll_epollevt_lst_t)));
+ pNew->id = id;
+ pNew->pUsr = pUsr;
+ pNew->pSock = pSock;
+ pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */
+ if(mode & NSDPOLL_IN)
+ pNew->event.events |= EPOLLIN;
+ if(mode & NSDPOLL_OUT)
+ pNew->event.events |= EPOLLOUT;
+ pNew->event.data.u64 = (uint64) pNew;
+ pNew->pNext = pThis->pRoot;
+ pThis->pRoot = pNew;
+ *pEvtLst = pNew;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* find and unlink the entry identified by id/pUsr from the list.
+ * rgerhards, 2009-11-23
+ */
+static inline rsRetVal
+unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t **ppEvtLst) {
+ nsdpoll_epollevt_lst_t *pEvtLst;
+ nsdpoll_epollevt_lst_t *pPrev = NULL;
+ DEFiRet;
+
+ pEvtLst = pThis->pRoot;
+ while(pEvtLst != NULL && !(pEvtLst->id == id && pEvtLst->pUsr == pUsr)) {
+ pPrev = pEvtLst;
+ pEvtLst = pEvtLst->pNext;
+ }
+ if(pEvtLst == NULL)
+ ABORT_FINALIZE(RS_RET_NOT_FOUND);
+
+ *ppEvtLst = pEvtLst;
+
+ /* unlink */
+ if(pPrev == NULL)
+ pThis->pRoot = pEvtLst->pNext;
+ else
+ pPrev->pNext = pEvtLst->pNext;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* destruct the provided element. It must already be unlinked from the list.
+ * rgerhards, 2009-11-23
+ */
+static inline rsRetVal
+delEvent(nsdpoll_epollevt_lst_t **ppEvtLst) {
+ DEFiRet;
+ free(*ppEvtLst);
+ *ppEvtLst = NULL;
+ RETiRet;
+}
+
+
+/* -END--------------------------- helpers for event list ------------------------------------ */
+
+
+/* Standard-Constructor
+ */
+BEGINobjConstruct(nsdpoll_ptcp) /* be sure to specify the object type also in END macro! */
+# if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
+ DBGPRINTF("nsdpoll_ptcp uses epoll_create1()\n");
+ pThis->efd = epoll_create1(EPOLL_CLOEXEC);
+# else
+ DBGPRINTF("nsdpoll_ptcp uses epoll_create()\n");
+ pThis->efd = epoll_create(100); /* size is ignored in newer kernels, but 100 is not bad... */
+# endif
+ if(pThis->efd < 0) {
+ DBGPRINTF("epoll_create1() could not create fd\n");
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ }
+finalize_it:
+ENDobjConstruct(nsdpoll_ptcp)
+
+
+/* destructor for the nsdpoll_ptcp object */
+BEGINobjDestruct(nsdpoll_ptcp) /* be sure to specify the object type also in END and CODESTART macros! */
+CODESTARTobjDestruct(nsdpoll_ptcp)
+ENDobjDestruct(nsdpoll_ptcp)
+
+
+/* Modify socket set */
+static rsRetVal
+Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op) {
+ nsdpoll_ptcp_t *pThis = (nsdpoll_ptcp_t*) pNsdpoll;
+ nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd;
+ nsdpoll_epollevt_lst_t *pEventLst;
+ int errSave;
+ char errStr[512];
+ DEFiRet;
+
+ if(op == NSDPOLL_ADD) {
+ dbgprintf("adding nsdpoll entry %d/%p, sock %d\n", id, pUsr, pSock->sock);
+ CHKiRet(addEvent(pThis, id, pUsr, mode, pSock, &pEventLst));
+ if(epoll_ctl(pThis->efd, EPOLL_CTL_ADD, pSock->sock, &pEventLst->event) < 0) {
+ errSave = errno;
+ rs_strerror_r(errSave, errStr, sizeof(errStr));
+ errmsg.LogError(errSave, RS_RET_ERR_EPOLL_CTL,
+ "epoll_ctl failed on fd %d, id %d/%p, op %d with %s\n",
+ pSock->sock, id, pUsr, mode, errStr);
+ }
+ } else if(op == NSDPOLL_DEL) {
+ dbgprintf("removing nsdpoll entry %d/%p, sock %d\n", id, pUsr, pSock->sock);
+ CHKiRet(unlinkEvent(pThis, id, pUsr, &pEventLst));
+ if(epoll_ctl(pThis->efd, EPOLL_CTL_DEL, pSock->sock, &pEventLst->event) < 0) {
+ errSave = errno;
+ rs_strerror_r(errSave, errStr, sizeof(errStr));
+ errmsg.LogError(errSave, RS_RET_ERR_EPOLL_CTL,
+ "epoll_ctl failed on fd %d, id %d/%p, op %d with %s\n",
+ pSock->sock, id, pUsr, mode, errStr);
+ ABORT_FINALIZE(RS_RET_ERR_EPOLL_CTL);
+ }
+ CHKiRet(delEvent(&pEventLst));
+ } else {
+ dbgprintf("program error: invalid NSDPOLL_mode %d - ignoring request\n", op);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* Wait for io to become ready. After the successful call, idRdy contains the
+ * id set by the caller for that i/o event, ppUsr is a pointer to a location
+ * where the user pointer shall be stored.
+ * TODO: this is a trivial implementation that only polls one event at a time. We
+ * may later extend it to poll for multiple events, what would cause less
+ * overhead.
+ * rgerhards, 2009-11-18
+ */
+static rsRetVal
+Wait(nsdpoll_t *pNsdpoll, int timeout, int *idRdy, void **ppUsr) {
+ nsdpoll_ptcp_t *pThis = (nsdpoll_ptcp_t*) pNsdpoll;
+ nsdpoll_epollevt_lst_t *pOurEvt;
+ struct epoll_event event;
+ int nfds;
+ DEFiRet;
+
+ assert(idRdy != NULL);
+ assert(ppUsr != NULL);
+
+ nfds = epoll_wait(pThis->efd, &event, 1, timeout);
+ if(nfds == -1) {
+ if(errno == EINTR) {
+ ABORT_FINALIZE(RS_RET_EINTR);
+ } else {
+ DBGPRINTF("epoll() returned with error code %d\n", errno);
+ ABORT_FINALIZE(RS_RET_ERR_EPOLL);
+ }
+ } else if(nfds == 0) {
+ ABORT_FINALIZE(RS_RET_TIMEOUT);
+ }
+
+ /* we got a valid event, so tell the caller... */
+ pOurEvt = (nsdpoll_epollevt_lst_t*) event.data.u64;
+ *idRdy = pOurEvt->id;
+ *ppUsr = pOurEvt->pUsr;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* ------------------------------ end support for the epoll() interface ------------------------------ */
+
+
+/* queryInterface function */
+BEGINobjQueryInterface(nsdpoll_ptcp)
+CODESTARTobjQueryInterface(nsdpoll_ptcp)
+ if(pIf->ifVersion != nsdCURR_IF_VERSION) {/* check for current version, increment on each change */
+ ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED);
+ }
+
+ /* ok, we have the right interface, so let's fill it
+ * Please note that we may also do some backwards-compatibility
+ * work here (if we can support an older interface version - that,
+ * of course, also affects the "if" above).
+ */
+ pIf->Construct = (rsRetVal(*)(nsdpoll_t**)) nsdpoll_ptcpConstruct;
+ pIf->Destruct = (rsRetVal(*)(nsdpoll_t**)) nsdpoll_ptcpDestruct;
+ pIf->Ctl = Ctl;
+ pIf->Wait = Wait;
+finalize_it:
+ENDobjQueryInterface(nsdpoll_ptcp)
+
+
+/* exit our class
+ */
+BEGINObjClassExit(nsdpoll_ptcp, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */
+CODESTARTObjClassExit(nsdpoll_ptcp)
+ /* release objects we no longer need */
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(errmsg, CORE_COMPONENT);
+ENDObjClassExit(nsdpoll_ptcp)
+
+
+/* Initialize the nsdpoll_ptcp class. Must be called as the very first method
+ * before anything else is called inside this class.
+ * rgerhards, 2008-02-19
+ */
+BEGINObjClassInit(nsdpoll_ptcp, 1, OBJ_IS_CORE_MODULE) /* class, version */
+ /* request objects we use */
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+
+ /* set our own handlers */
+ENDObjClassInit(nsdpoll_ptcp)
+#endif /* #ifdef HAVE_EPOLL_CREATE this module requires epoll! */
+
+/* vi:set ai:
+ */
diff --git a/runtime/nsdpoll_ptcp.h b/runtime/nsdpoll_ptcp.h
new file mode 100644
index 00000000..cea2823d
--- /dev/null
+++ b/runtime/nsdpoll_ptcp.h
@@ -0,0 +1,60 @@
+/* An implementation of the nsd poll interface for plain tcp sockets.
+ *
+ * Copyright 2009 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+
+#ifndef INCLUDED_NSDPOLL_PTCP_H
+#define INCLUDED_NSDPOLL_PTCP_H
+
+#include "nsd.h"
+#if HAVE_SYS_EPOLL_H
+# include <sys/epoll.h>
+#endif
+typedef nsdpoll_if_t nsdpoll_ptcp_if_t; /* we just *implement* this interface */
+/* a helper object to keep track of the epoll event records
+ * Note that we need to keep track of that list because we need to
+ * free the events when they are no longer needed.
+ */
+typedef struct nsdpoll_epollevt_lst_s nsdpoll_epollevt_lst_t;
+struct nsdpoll_epollevt_lst_s {
+#if HAVE_SYS_EPOLL_H
+ epoll_event_t event;
+#endif
+ int id;
+ void *pUsr;
+ nsd_ptcp_t *pSock; /* our associated netstream driver data */
+ nsdpoll_epollevt_lst_t *pNext;
+};
+
+/* the nsdpoll_ptcp object */
+struct nsdpoll_ptcp_s {
+ BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
+ int efd; /* file descriptor used by epoll */
+ nsdpoll_epollevt_lst_t *pRoot; /* Root of the epoll event list */
+};
+
+/* interface is defined in nsd.h, we just implement it! */
+#define nsdpoll_ptcpCURR_IF_VERSION nsdCURR_IF_VERSION
+
+/* prototypes */
+PROTOTYPEObj(nsdpoll_ptcp);
+
+#endif /* #ifndef INCLUDED_NSDPOLL_PTCP_H */
diff --git a/runtime/nsdsel_ptcp.c b/runtime/nsdsel_ptcp.c
index 41b85e0c..e2cfca7c 100644
--- a/runtime/nsdsel_ptcp.c
+++ b/runtime/nsdsel_ptcp.c
@@ -36,6 +36,7 @@
#include "errmsg.h"
#include "nsd_ptcp.h"
#include "nsdsel_ptcp.h"
+#include "unlimited_select.h"
/* static data */
DEFobjStaticHelpers
@@ -47,14 +48,23 @@ DEFobjCurrIf(glbl)
*/
BEGINobjConstruct(nsdsel_ptcp) /* be sure to specify the object type also in END macro! */
pThis->maxfds = 0;
+#ifdef USE_UNLIMITED_SELECT
+ pThis->pReadfds = calloc(1, glbl.GetFdSetSize());
+ pThis->pWritefds = calloc(1, glbl.GetFdSetSize());
+#else
FD_ZERO(&pThis->readfds);
FD_ZERO(&pThis->writefds);
+#endif
ENDobjConstruct(nsdsel_ptcp)
/* destructor for the nsdsel_ptcp object */
BEGINobjDestruct(nsdsel_ptcp) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(nsdsel_ptcp)
+#ifdef USE_UNLIMITED_SELECT
+ freeFdSet(pThis->pReadfds);
+ freeFdSet(pThis->pWritefds);
+#endif
ENDobjDestruct(nsdsel_ptcp)
@@ -65,20 +75,27 @@ Add(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp)
DEFiRet;
nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel;
nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd;
+#ifdef USE_UNLIMITED_SELECT
+ fd_set *pReadfds = pThis->pReadfds;
+ fd_set *pWritefds = pThis->pWritefds;
+#else
+ fd_set *pReadfds = &pThis->readfds;
+ fd_set *pWritefds = &pThis->writefds;
+#endif
ISOBJ_TYPE_assert(pSock, nsd_ptcp);
ISOBJ_TYPE_assert(pThis, nsdsel_ptcp);
switch(waitOp) {
case NSDSEL_RD:
- FD_SET(pSock->sock, &pThis->readfds);
+ FD_SET(pSock->sock, pReadfds);
break;
case NSDSEL_WR:
- FD_SET(pSock->sock, &pThis->writefds);
+ FD_SET(pSock->sock, pWritefds);
break;
case NSDSEL_RDWR:
- FD_SET(pSock->sock, &pThis->readfds);
- FD_SET(pSock->sock, &pThis->writefds);
+ FD_SET(pSock->sock, pReadfds);
+ FD_SET(pSock->sock, pWritefds);
break;
}
@@ -98,6 +115,13 @@ Select(nsdsel_t *pNsdsel, int *piNumReady)
DEFiRet;
int i;
nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel;
+#ifdef USE_UNLIMITED_SELECT
+ fd_set *pReadfds = pThis->pReadfds;
+ fd_set *pWritefds = pThis->pWritefds;
+#else
+ fd_set *pReadfds = &pThis->readfds;
+ fd_set *pWritefds = &pThis->writefds;
+#endif
ISOBJ_TYPE_assert(pThis, nsdsel_ptcp);
assert(piNumReady != NULL);
@@ -106,13 +130,13 @@ Select(nsdsel_t *pNsdsel, int *piNumReady)
// TODO: name in dbgprintf!
dbgprintf("--------<NSDSEL_PTCP> calling select, active fds (max %d): ", pThis->maxfds);
for(i = 0; i <= pThis->maxfds; ++i)
- if(FD_ISSET(i, &pThis->readfds) || FD_ISSET(i, &pThis->writefds))
+ if(FD_ISSET(i, pReadfds) || FD_ISSET(i, pWritefds))
dbgprintf("%d ", i);
dbgprintf("\n");
}
/* now do the select */
- *piNumReady = select(pThis->maxfds+1, &pThis->readfds, &pThis->writefds, NULL, NULL);
+ *piNumReady = select(pThis->maxfds+1, pReadfds, pWritefds, NULL, NULL);
RETiRet;
}
@@ -125,6 +149,13 @@ IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady)
DEFiRet;
nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel;
nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd;
+#ifdef USE_UNLIMITED_SELECT
+ fd_set *pReadfds = pThis->pReadfds;
+ fd_set *pWritefds = pThis->pWritefds;
+#else
+ fd_set *pReadfds = &pThis->readfds;
+ fd_set *pWritefds = &pThis->writefds;
+#endif
ISOBJ_TYPE_assert(pThis, nsdsel_ptcp);
ISOBJ_TYPE_assert(pSock, nsd_ptcp);
@@ -132,14 +163,14 @@ IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady)
switch(waitOp) {
case NSDSEL_RD:
- *pbIsReady = FD_ISSET(pSock->sock, &pThis->readfds);
+ *pbIsReady = FD_ISSET(pSock->sock, pReadfds);
break;
case NSDSEL_WR:
- *pbIsReady = FD_ISSET(pSock->sock, &pThis->writefds);
+ *pbIsReady = FD_ISSET(pSock->sock, pWritefds);
break;
case NSDSEL_RDWR:
- *pbIsReady = FD_ISSET(pSock->sock, &pThis->readfds)
- | FD_ISSET(pSock->sock, &pThis->writefds);
+ *pbIsReady = FD_ISSET(pSock->sock, pReadfds)
+ | FD_ISSET(pSock->sock, pWritefds);
break;
}
diff --git a/runtime/nsdsel_ptcp.h b/runtime/nsdsel_ptcp.h
index 6c0c7fa7..f9ec8210 100644
--- a/runtime/nsdsel_ptcp.h
+++ b/runtime/nsdsel_ptcp.h
@@ -31,8 +31,13 @@ typedef nsdsel_if_t nsdsel_ptcp_if_t; /* we just *implement* this interface */
struct nsdsel_ptcp_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
int maxfds;
+#ifdef USE_UNLIMITED_SELECT
+ fd_set *pReadfds;
+ fd_set *pWritefds;
+#else
fd_set readfds;
fd_set writefds;
+#endif
};
/* interface is defined in nsd.h, we just implement it! */
diff --git a/runtime/nspoll.c b/runtime/nspoll.c
new file mode 100644
index 00000000..64927280
--- /dev/null
+++ b/runtime/nspoll.c
@@ -0,0 +1,195 @@
+/* nspoll.c
+ *
+ * This is an io waiter interface utilizing the much-more-efficient poll/epoll API.
+ * Note that it may not always be available for a given driver. If so, that is reported
+ * back to the upper peer which then should consult a nssel-based io waiter.
+ *
+ * Work on this module begun 2009-11-18 by Rainer Gerhards.
+ *
+ * Copyright 2009 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+#include "config.h"
+
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+
+#include "rsyslog.h"
+#include "obj.h"
+#include "module-template.h"
+#include "netstrm.h"
+#include "nspoll.h"
+
+/* static data */
+DEFobjStaticHelpers
+DEFobjCurrIf(glbl)
+
+
+/* load our low-level driver. This must be done before any
+ * driver-specific functions (allmost all...) can be carried
+ * out. Note that the driver's .ifIsLoaded is correctly
+ * initialized by calloc() and we depend on that. Please note that
+ * we do some name-mangeling. We know that each nsd driver also needs
+ * a nspoll driver. So we simply append "sel" to the nsd driver name: This,
+ * of course, means that the driver name must match these rules, but that
+ * shouldn't be a real problem.
+ * WARNING: this code is mostly identical to similar code in
+ * netstrms.c - TODO: abstract it and move it to some common place.
+ * rgerhards, 2008-04-28
+ */
+static rsRetVal
+loadDrvr(nspoll_t *pThis)
+{
+ DEFiRet;
+ uchar *pBaseDrvrName;
+ uchar szDrvrName[48]; /* 48 shall be large enough */
+
+ pBaseDrvrName = pThis->pBaseDrvrName;
+ if(pBaseDrvrName == NULL) /* if no drvr name is set, use system default */
+ pBaseDrvrName = glbl.GetDfltNetstrmDrvr();
+ if(snprintf((char*)szDrvrName, sizeof(szDrvrName), "lmnsdpoll_%s", pBaseDrvrName) == sizeof(szDrvrName))
+ ABORT_FINALIZE(RS_RET_DRVRNAME_TOO_LONG);
+ CHKmalloc(pThis->pDrvrName = (uchar*) strdup((char*)szDrvrName));
+
+ pThis->Drvr.ifVersion = nsdCURR_IF_VERSION;
+ /* The pDrvrName+2 below is a hack to obtain the object name. It
+ * safes us to have yet another variable with the name without "lm" in
+ * front of it. If we change the module load interface, we may re-think
+ * about this hack, but for the time being it is efficient and clean
+ * enough. -- rgerhards, 2008-04-18
+ */
+ CHKiRet(obj.UseObj(__FILE__, szDrvrName+2, DONT_LOAD_LIB, (void*) &pThis->Drvr));
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ if(pThis->pDrvrName != NULL)
+ free(pThis->pDrvrName);
+ pThis->pDrvrName = NULL;
+ }
+ RETiRet;
+}
+
+
+/* Standard-Constructor */
+BEGINobjConstruct(nspoll) /* be sure to specify the object type also in END macro! */
+ENDobjConstruct(nspoll)
+
+
+/* destructor for the nspoll object */
+BEGINobjDestruct(nspoll) /* be sure to specify the object type also in END and CODESTART macros! */
+CODESTARTobjDestruct(nspoll)
+ if(pThis->pDrvrData != NULL)
+ pThis->Drvr.Destruct(&pThis->pDrvrData);
+
+ /* and now we must release our driver, if we got one. We use the presence of
+ * a driver name string as load indicator (because we also need that string
+ * to release the driver
+ */
+ if(pThis->pDrvrName != NULL) {
+ obj.ReleaseObj(__FILE__, pThis->pDrvrName+2, DONT_LOAD_LIB, (void*) &pThis->Drvr);
+ free(pThis->pDrvrName);
+ }
+ENDobjDestruct(nspoll)
+
+
+/* ConstructionFinalizer */
+static rsRetVal
+ConstructFinalize(nspoll_t *pThis)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, nspoll);
+ CHKiRet(loadDrvr(pThis));
+ CHKiRet(pThis->Drvr.Construct(&pThis->pDrvrData));
+finalize_it:
+ RETiRet;
+}
+
+
+/* Carries out the actual wait (all done in lower layers)
+ */
+static rsRetVal
+Wait(nspoll_t *pThis, int timeout, int *idRdy, void **ppUsr) {
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, nspoll);
+ assert(idRdy != NULL);
+ iRet = pThis->Drvr.Wait(pThis->pDrvrData, timeout, idRdy, ppUsr);
+ RETiRet;
+}
+
+
+/* semantics like the epoll_ctl() function, does the same thing.
+ * rgerhards, 2009-11-18
+ */
+static rsRetVal
+Ctl(nspoll_t *pThis, netstrm_t *pStrm, int id, void *pUsr, int mode, int op) {
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, nspoll);
+ iRet = pThis->Drvr.Ctl(pThis->pDrvrData, pStrm->pDrvrData, id, pUsr, mode, op);
+ RETiRet;
+}
+
+
+/* queryInterface function */
+BEGINobjQueryInterface(nspoll)
+CODESTARTobjQueryInterface(nspoll)
+ if(pIf->ifVersion != nspollCURR_IF_VERSION) {/* check for current version, increment on each change */
+ ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED);
+ }
+
+ /* ok, we have the right interface, so let's fill it
+ * Please note that we may also do some backwards-compatibility
+ * work here (if we can support an older interface version - that,
+ * of course, also affects the "if" above).
+ */
+ pIf->Construct = nspollConstruct;
+ pIf->ConstructFinalize = ConstructFinalize;
+ pIf->Destruct = nspollDestruct;
+ pIf->Wait = Wait;
+ pIf->Ctl = Ctl;
+finalize_it:
+ENDobjQueryInterface(nspoll)
+
+
+/* exit our class
+ */
+BEGINObjClassExit(nspoll, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MACRO! */
+CODESTARTObjClassExit(nspoll)
+ /* release objects we no longer need */
+ objRelease(glbl, CORE_COMPONENT);
+ENDObjClassExit(nspoll)
+
+
+/* Initialize the nspoll class. Must be called as the very first method
+ * before anything else is called inside this class.
+ * rgerhards, 2008-02-19
+ */
+BEGINObjClassInit(nspoll, 1, OBJ_IS_CORE_MODULE) /* class, version */
+ /* request objects we use */
+ DBGPRINTF("doing nspollClassInit\n");
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+
+ /* set our own handlers */
+ENDObjClassInit(nspoll)
+/* vi:set ai:
+ */
diff --git a/runtime/nspoll.h b/runtime/nspoll.h
new file mode 100644
index 00000000..a77759c0
--- /dev/null
+++ b/runtime/nspoll.h
@@ -0,0 +1,65 @@
+/* Definitions for the nspoll io activity waiter
+ *
+ * Copyright 2009 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+
+#ifndef INCLUDED_NSPOLL_H
+#define INCLUDED_NSPOLL_H
+
+#include "netstrms.h"
+
+/* some operations to be portable when we do not have epoll() available */
+#define NSDPOLL_ADD 1
+#define NSDPOLL_DEL 2
+
+/* and some mode specifiers for waiting on input/output */
+#define NSDPOLL_IN 1 /* EPOLLIN */
+#define NSDPOLL_OUT 2 /* EPOLLOUT */
+/* next is 4, 8, 16, ... - must be bit values, as they are ored! */
+
+/* the nspoll object */
+struct nspoll_s {
+ BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
+ nsd_t *pDrvrData; /**< the driver's data elements */
+ uchar *pBaseDrvrName; /**< nsd base driver name to use, or NULL if system default */
+ uchar *pDrvrName; /**< full base driver name (set when driver is loaded) */
+ nsdpoll_if_t Drvr; /**< our stream driver */
+};
+
+
+/* interface */
+BEGINinterface(nspoll) /* name must also be changed in ENDinterface macro! */
+ rsRetVal (*Construct)(nspoll_t **ppThis);
+ rsRetVal (*ConstructFinalize)(nspoll_t *pThis);
+ rsRetVal (*Destruct)(nspoll_t **ppThis);
+ rsRetVal (*Wait)(nspoll_t *pNsdpoll, int timeout, int *idRdy, void **ppUsr);
+ rsRetVal (*Ctl)(nspoll_t *pNsdpoll, netstrm_t *pStrm, int id, void *pUsr, int mode, int op);
+ rsRetVal (*IsEPollSupported)(void); /* static method */
+ENDinterface(nspoll)
+#define nspollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
+
+/* prototypes */
+PROTOTYPEObj(nspoll);
+
+/* the name of our library binary */
+#define LM_NSPOLL_FILENAME LM_NETSTRMS_FILENAME
+
+#endif /* #ifndef INCLUDED_NSPOLL_H */
diff --git a/runtime/nssel.c b/runtime/nssel.c
index d11d5fe1..7c5be3a9 100644
--- a/runtime/nssel.c
+++ b/runtime/nssel.c
@@ -219,6 +219,7 @@ ENDObjClassExit(nssel)
*/
BEGINObjClassInit(nssel, 1, OBJ_IS_CORE_MODULE) /* class, version */
/* request objects we use */
+ DBGPRINTF("doing nsselClassInit\n");
CHKiRet(objUse(glbl, CORE_COMPONENT));
/* set our own handlers */
diff --git a/runtime/parser.c b/runtime/parser.c
index cacbe065..40374ae1 100644
--- a/runtime/parser.c
+++ b/runtime/parser.c
@@ -60,6 +60,8 @@ DEFobjCurrIf(ruleset)
/* config data */
static uchar cCCEscapeChar = '#';/* character to be used to start an escape sequence for control chars */
static int bEscapeCCOnRcv = 1; /* escape control characters on reception: 0 - no, 1 - yes */
+static int bEscape8BitChars = 0; /* escape characters > 127 on reception: 0 - no, 1 - yes */
+static int bEscapeTab = 1; /* escape tab control character when doing CC escapes: 0 - no, 1 - yes */
static int bDropTrailingLF = 1; /* drop trailing LF's on reception? */
/* This is the list of all parsers known to us.
@@ -308,7 +310,7 @@ SanitizeMsg(msg_t *pMsg)
size_t iDst;
size_t iMaxLine;
size_t maxDest;
- bool bUpdatedLen = FALSE;
+ sbool bUpdatedLen = FALSE;
uchar szSanBuf[32*1024]; /* buffer used for sanitizing a string */
assert(pMsg != NULL);
@@ -343,6 +345,11 @@ SanitizeMsg(msg_t *pMsg)
* needs sanitation than to do the sanitation in any case. So we first do
* this and terminate when it is not needed - which is expectedly the case
* for the vast majority of messages. -- rgerhards, 2009-06-15
+ * Note that we do NOT check here if tab characters are to be escaped or
+ * not. I expect this functionality to be seldomly used and thus I do not
+ * like to pay the performance penalty. So the penalty is only with those
+ * that actually use it, because we may call the sanitizer without actual
+ * need below (but it then still will work perfectly well!). -- rgerhards, 2009-11-27
*/
int bNeedSanitize = 0;
for(iSrc = 0 ; iSrc < lenMsg ; iSrc++) {
@@ -351,6 +358,9 @@ SanitizeMsg(msg_t *pMsg)
bNeedSanitize = 1;
break;
}
+ } else if(pszMsg[iSrc] > 127 && bEscape8BitChars) {
+ bNeedSanitize = 1;
+ break;
}
}
@@ -371,7 +381,7 @@ SanitizeMsg(msg_t *pMsg)
CHKmalloc(pDst = MALLOC(sizeof(uchar) * (iMaxLine + 1)));
iSrc = iDst = 0;
while(iSrc < lenMsg && iDst < maxDest - 3) { /* leave some space if last char must be escaped */
- if(iscntrl((int) pszMsg[iSrc])) {
+ if(iscntrl((int) pszMsg[iSrc]) && (pszMsg[iSrc] != '\t' || bEscapeTab)) {
/* note: \0 must always be escaped, the rest of the code currently
* can not handle it! -- rgerhards, 2009-08-26
*/
@@ -385,6 +395,14 @@ SanitizeMsg(msg_t *pMsg)
pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0070) >> 3);
pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0007));
}
+ } else if(pszMsg[iSrc] > 127 && bEscape8BitChars) {
+ /* In this case, we also do the conversion. Note that this most
+ * probably breaks European languages. -- rgerhards, 2010-01-27
+ */
+ pDst[iDst++] = cCCEscapeChar;
+ pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0300) >> 6);
+ pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0070) >> 3);
+ pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0007));
} else {
pDst[iDst++] = pszMsg[iSrc];
}
@@ -419,19 +437,24 @@ ParsePRI(msg_t *pMsg)
msg = pMsg->pszRawMsg;
pri = DEFUPRI;
iPriText = 0;
- if(*msg == '<') {
- /* while we process the PRI, we also fill the PRI textual representation
- * inside the msg object. This may not be ideal from an OOP point of view,
- * but it offers us performance...
- */
- pri = 0;
- while(--lenMsg > 0 && isdigit((int) *++msg)) {
- pri = 10 * pri + (*msg - '0');
+ if(pMsg->msgFlags & NO_PRI_IN_RAW) {
+ /* In this case, simply do so as if the pri would be right at top */
+ MsgSetAfterPRIOffs(pMsg, 0);
+ } else {
+ if(*msg == '<') {
+ /* while we process the PRI, we also fill the PRI textual representation
+ * inside the msg object. This may not be ideal from an OOP point of view,
+ * but it offers us performance...
+ */
+ pri = 0;
+ while(--lenMsg > 0 && isdigit((int) *++msg)) {
+ pri = 10 * pri + (*msg - '0');
+ }
+ if(*msg == '>')
+ ++msg;
+ if(pri & ~(LOG_FACMASK|LOG_PRIMASK))
+ pri = DEFUPRI;
}
- if(*msg == '>')
- ++msg;
- if(pri & ~(LOG_FACMASK|LOG_PRIMASK))
- pri = DEFUPRI;
}
pMsg->iFacility = LOG_FAC(pri);
pMsg->iSeverity = LOG_PRI(pri);
@@ -451,8 +474,8 @@ ParseMsg(msg_t *pMsg)
rsRetVal localRet = RS_RET_ERR;
parserList_t *pParserList;
parser_t *pParser;
- bool bIsSanitized;
- bool bPRIisParsed;
+ sbool bIsSanitized;
+ sbool bPRIisParsed;
static int iErrMsgRateLimiter = 0;
DEFiRet;
@@ -466,8 +489,9 @@ ParseMsg(msg_t *pMsg)
/* we take the risk to print a non-sanitized string, because this is the best we can get
* (and that functionality is too important for debugging to drop it...).
*/
- DBGPRINTF("msg parser: flags %x, from '%s', msg '%.50s'\n", pMsg->msgFlags,
- getRcvFrom(pMsg), pMsg->pszRawMsg);
+ DBGPRINTF("msg parser: flags %x, from '%s', msg '%.60s'\n", pMsg->msgFlags,
+ (pMsg->msgFlags & NEEDS_DNSRESOL) ? UCHAR_CONSTANT("~NOTRESOLVED~") : getRcvFrom(pMsg),
+ pMsg->pszRawMsg);
/* we now need to go through our list of parsers and see which one is capable of
* parsing the message. Note that the first parser that requires message sanitization
@@ -623,6 +647,8 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus
{
cCCEscapeChar = '#';
bEscapeCCOnRcv = 1; /* default is to escape control characters */
+ bEscape8BitChars = 0; /* default is to escape control characters */
+ bEscapeTab = 1; /* default is to escape control characters */
bDropTrailingLF = 1; /* default is to drop trailing LF's on reception */
return RS_RET_OK;
@@ -674,6 +700,8 @@ BEGINObjClassInit(parser, 1, OBJ_IS_CORE_MODULE) /* class, version */
CHKiRet(regCfSysLineHdlr((uchar *)"controlcharacterescapeprefix", 0, eCmdHdlrGetChar, NULL, &cCCEscapeChar, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"droptrailinglfonreception", 0, eCmdHdlrBinary, NULL, &bDropTrailingLF, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscapeCCOnRcv, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"escape8bitcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscape8BitChars, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactertab", 0, eCmdHdlrBinary, NULL, &bEscapeTab, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL));
InitParserList(&pParsLstRoot);
diff --git a/runtime/parser.h b/runtime/parser.h
index c4f63021..bdd572cb 100644
--- a/runtime/parser.h
+++ b/runtime/parser.h
@@ -38,8 +38,8 @@ struct parser_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
uchar *pName; /* name of this parser */
modInfo_t *pModule; /* pointer to parser's module */
- bool bDoSanitazion; /* do standard message sanitazion before calling parser? */
- bool bDoPRIParsing; /* do standard PRI parsing before calling parser? */
+ sbool bDoSanitazion; /* do standard message sanitazion before calling parser? */
+ sbool bDoPRIParsing; /* do standard PRI parsing before calling parser? */
};
/* interfaces */
diff --git a/runtime/prop.c b/runtime/prop.c
index 94d1bd49..d925bb43 100644
--- a/runtime/prop.c
+++ b/runtime/prop.c
@@ -53,6 +53,7 @@ DEFobjStaticHelpers
*/
BEGINobjConstruct(prop) /* be sure to specify the object type also in END macro! */
pThis->iRefCount = 1;
+ INIT_ATOMIC_HELPER_MUT(pThis->mutRefCount);
ENDobjConstruct(prop)
@@ -60,11 +61,12 @@ ENDobjConstruct(prop)
BEGINobjDestruct(prop) /* be sure to specify the object type also in END and CODESTART macros! */
int currRefCount;
CODESTARTobjDestruct(prop)
- currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount);
+ currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, &pThis->mutRefCount);
if(currRefCount == 0) {
/* (only) in this case we need to actually destruct the object */
if(pThis->len >= CONF_PROP_BUFSIZE)
free(pThis->szVal.psz);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutRefCount);
} else {
pThis = NULL; /* tell framework NOT to destructing the object! */
}
@@ -132,7 +134,7 @@ propConstructFinalize(prop_t __attribute__((unused)) *pThis)
*/
static rsRetVal AddRef(prop_t *pThis)
{
- ATOMIC_INC(pThis->iRefCount);
+ ATOMIC_INC(&pThis->iRefCount, &pThis->mutRefCount);
return RS_RET_OK;
}
diff --git a/runtime/prop.h b/runtime/prop.h
index e3519664..07b2ab7e 100644
--- a/runtime/prop.h
+++ b/runtime/prop.h
@@ -24,6 +24,7 @@
*/
#ifndef INCLUDED_PROP_H
#define INCLUDED_PROP_H
+#include "atomic.h"
/* the prop object */
struct prop_s {
@@ -34,6 +35,7 @@ struct prop_s {
uchar sz[CONF_PROP_BUFSIZE];
} szVal;
int len; /* we use int intentionally, otherwise we may get some troubles... */
+ DEF_ATOMIC_HELPER_MUT(mutRefCount);
};
/* interfaces */
diff --git a/runtime/queue.c b/runtime/queue.c
index b29ec7ac..60d17086 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -79,6 +79,8 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis);
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
+static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
+static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -214,7 +216,7 @@ static inline void queueDrain(qqueue_t *pThis)
BEGINfunc
DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) {
+ while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) {
pThis->qDeq(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
@@ -769,8 +771,8 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
*/
objDestruct(pUsr);
- DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n",
- nWriteCount, pThis->tVars.disk.sizeOnDisk);
+ DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n",
+ nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly);
finalize_it:
RETiRet;
@@ -839,6 +841,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
batch_obj_t batchObj;
DEFiRet;
+ //TODO: init batchObj (states _OK and new fields -- CHECK)
ASSERT(pThis != NULL);
/* calling the consumer is quite different here than it is from a worker thread */
@@ -849,8 +852,11 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
* We use our knowledge about the batch_t structure below, but without that, we
* pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
+ memset(&batchObj, 0, sizeof(batch_obj_t));
+ memset(&singleBatch, 0, sizeof(batch_t));
batchObj.state = BATCH_STATE_RDY;
batchObj.pUsrp = (obj_t*) pUsr;
+ batchObj.bFilterOK = 1;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate);
@@ -859,6 +865,28 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
RETiRet;
}
+/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead
+ * otherwise incured. -- rgerhards, ~2010-06-23
+ */
+rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ /* calling the consumer is quite different here than it is from a worker thread */
+ /* we need to provide the consumer's return value back to the caller because in direct
+ * mode the consumer probably has a lot to convey (which get's lost in the other modes
+ * because they are asynchronous. But direct mode is deliberately synchronous.
+ * rgerhards, 2008-02-12
+ * We use our knowledge about the batch_t structure below, but without that, we
+ * pay a too-large performance toll... -- rgerhards, 2009-04-22
+ */
+ iRet = pThis->pConsumer(pThis->pUsr, pBatch, &pThis->bShutdownImmediate);
+
+ RETiRet;
+}
+
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
{
@@ -884,7 +912,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr)
CHKiRet(pThis->qAdd(pThis, pUsr));
if(pThis->qType != QUEUETYPE_DIRECT) {
- ATOMIC_INC(pThis->iQueueSize);
+ ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
DBGOPRINT((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n",
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
}
@@ -909,7 +937,7 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr)
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
iRet = pThis->qDeq(pThis, ppUsr);
- ATOMIC_INC(pThis->nLogDeq);
+ ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq);
// DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
// getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
@@ -944,6 +972,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
d_pthread_mutex_lock(pThis->mut);
/* tell regular queue DA worker to stop shuffling messages to DA queue... */
+ DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode for DA worker\n");
pThis->pqDA->bEnqOnly = 1;
wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE);
wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
@@ -1010,6 +1039,7 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout");
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
/* instruct workers to finish ASAP, even if still work exists */
+ DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n");
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
/* now DA queue */
@@ -1201,6 +1231,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddFixedArray;
pThis->qDeq = qDeqFixedArray;
pThis->qDel = qDelFixedArray;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_LINKEDLIST:
pThis->qConstruct = qConstructLinkedList;
@@ -1208,6 +1239,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddLinkedList;
pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
@@ -1215,6 +1247,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddDisk;
pThis->qDeq = qDeqDisk;
pThis->qDel = qDelDisk;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
break;
@@ -1223,9 +1256,13 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qDestruct = qDestructDirect;
pThis->qAdd = qAddDirect;
pThis->qDel = qDelDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
break;
}
+ INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
+
finalize_it:
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
RETiRet;
@@ -1288,8 +1325,8 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
}
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- ATOMIC_SUB(pThis->iQueueSize, nElem);
- ATOMIC_SUB(pThis->nLogDeq, nElem);
+ ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize);
+ ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq);
dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
++pThis->deqIDDel; /* one more batch dequeued */
@@ -1356,6 +1393,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
pUsr = pBatch->pElem[i].pUsrp;
if( pBatch->pElem[i].state == BATCH_STATE_RDY
|| pBatch->pElem[i].state == BATCH_STATE_SUB) {
+dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch->nElem, pBatch->pElem[i].state);
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
(obj_t*)MsgAddRef((msg_t*) pUsr));
++nEnqueued;
@@ -1373,7 +1411,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
iRet = DeleteBatchFromQStore(pThis, pBatch);
- pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
+ pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ // TODO: more fine init, new fields! 2010-06-14
RETiRet;
}
@@ -1418,6 +1456,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
/* all well, use this element */
pWti->batch.pElem[nDequeued].pUsrp = pUsr;
pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
+ pWti->batch.pElem[nDequeued].bFilterOK = 1; // TODO: think again if we can handle that with more performance
++nDequeued;
}
@@ -1597,6 +1636,12 @@ finalize_it:
/* This is called when a batch is processed and the worker does not
* ask for another batch (e.g. because it is to be terminated)
+ * Note that we must not be terminated while we delete a processed
+ * batch. Otherwise, we may not complete it, and then the cancel
+ * handler also tries to delete the batch. But then it finds some of
+ * the messages already destructed. This was a bug we have seen, especially
+ * with disk mode, where a delete takes rather long. Anyhow, the coneptual
+ * problem exists in all queue modes.
* rgerhards, 2009-05-27
*/
static rsRetVal
@@ -1607,8 +1652,12 @@ batchProcessed(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
+ int iCancelStateSave;
+ /* at this spot, we must not be cancelled */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
DeleteProcessedBatch(pThis, &pWti->batch);
qqueueChkPersist(pThis, pWti->batch.nElemDeq);
+ pthread_setcancelstate(iCancelStateSave, NULL);
RETiRet;
}
@@ -1693,7 +1742,6 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
* rgerhards, 2009-05-28
*/
-dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg);
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY,
(obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
@@ -2063,6 +2111,9 @@ CODESTARTobjDestruct(qqueue)
pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
+
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
@@ -2189,6 +2240,7 @@ finalize_it:
RETiRet;
}
+/* ------------------------------ multi-enqueue functions ------------------------------ */
/* enqueue multiple user data elements at once. The aim is to provide a faster interface
* for object submission. Uses the multi_submit_t helper object.
* Please note that this function is not cancel-safe and consequently
@@ -2196,9 +2248,12 @@ finalize_it:
* during its execution. If that is not done, race conditions occur if the
* thread is canceled (most important use case is input module termination).
* rgerhards, 2009-06-16
+ * Note: there now exists multiple different functions implementing specially
+ * optimized algorithms for different config cases. -- rgerhards, 2010-06-09
*/
-rsRetVal
-qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
+/* now the function for all modes but direct */
+static rsRetVal
+qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
{
int iCancelStateSave;
int i;
@@ -2207,27 +2262,55 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pMultiSub != NULL);
- if(pThis->qType != QUEUETYPE_DIRECT) {
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(pThis->mut);
- }
-
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(pThis->mut);
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]));
}
-
qqueueChkPersist(pThis, pMultiSub->nElem);
finalize_it:
- if(pThis->qType != QUEUETYPE_DIRECT) {
- /* make sure at least one worker is running. */
- qqueueAdviseMaxWorkers(pThis);
- /* and release the mutex */
- d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
- DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n");
+ /* make sure at least one worker is running. */
+ qqueueAdviseMaxWorkers(pThis);
+ /* and release the mutex */
+ d_pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n");
+
+ RETiRet;
+}
+
+/* now, the same function, but for direct mode */
+static rsRetVal
+qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
+{
+ int i;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(pMultiSub != NULL);
+
+ for(i = 0 ; i < pMultiSub->nElem ; ++i) {
+ CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i]));
}
+finalize_it:
+ RETiRet;
+}
+/* ------------------------------ END multi-enqueue functions ------------------------------ */
+
+
+/* enqueue a new user data element in direct mode
+ * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better
+ * code later on (like multi submit!) 2010-06-10
+ * Enqueues the new element and awakes worker thread.
+ */
+rsRetVal
+qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ iRet = qAddDirect(pThis, pUsr);
RETiRet;
}
diff --git a/runtime/queue.h b/runtime/queue.h
index 93573dae..1c758134 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -60,9 +60,9 @@ struct queue_s {
queueType_t qType;
int nLogDeq; /* number of elements currently logically dequeued */
int bShutdownImmediate; /* should all workers cease processing messages? */
- bool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
- bool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
- bool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
+ sbool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
+ sbool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
+ sbool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
int iQueueSize; /* Current number of elements in the queue */
int iMaxQueueSize; /* how large can the queue grow? */
int iNumWorkerThreads;/* number of worker threads to use */
@@ -73,14 +73,14 @@ struct queue_s {
void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
- bool bSyncQueueFiles;/* if working with files, sync them after each write? */
+ sbool bSyncQueueFiles;/* if working with files, sync them after each write? */
int iHighWtrMrk; /* high water mark for disk-assisted memory queues */
int iLowWtrMrk; /* low water mark for disk-assisted memory queues */
int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */
int iFullDlyMrk; /* if the queue is above this mark, FULL_DELAYable message are put on hold */
int iLightDlyMrk; /* if the queue is above this mark, LIGHT_DELAYable message are put on hold */
int iDiscardSeverity;/* messages of this severity above are discarded on too-full queue */
- bool bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */
+ sbool bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */
int toQShutdown; /* timeout for regular queue shutdown in ms */
int toActShutdown; /* timeout for long-running action shutdown in ms */
int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
@@ -114,6 +114,9 @@ struct queue_s {
rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
rsRetVal (*qDel)(struct queue_s *pThis);
/* end type-specific handler */
+ /* public entry points (set during construction, permit to set best algorithm for params selected) */
+ rsRetVal (*MultiEnq)(qqueue_t *pThis, multi_submit_t *pMultiSub);
+ /* end public entry points */
/* synchronization variables */
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */
@@ -160,6 +163,8 @@ struct queue_s {
strm_t *pReadDel; /* current file for deleting */
} disk;
} tVars;
+ DEF_ATOMIC_HELPER_MUT(mutQueueSize);
+ DEF_ATOMIC_HELPER_MUT(mutLogDeq);
};
@@ -172,13 +177,14 @@ struct queue_s {
/* prototypes */
rsRetVal qqueueDestruct(qqueue_t **ppThis);
-rsRetVal qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub);
+rsRetVal qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr);
rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr);
rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
+rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int);
diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c
index a76ae25a..a9794840 100644
--- a/runtime/rsyslog.c
+++ b/runtime/rsyslog.c
@@ -81,6 +81,8 @@
#include "rule.h"
#include "ruleset.h"
#include "parser.h"
+#include "strgen.h"
+#include "atomic.h"
/* forward definitions */
static rsRetVal dfltErrLogger(int, uchar *errMsg);
@@ -184,6 +186,8 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF)
CHKiRet(confClassInit(NULL));
if(ppErrObj != NULL) *ppErrObj = "parser";
CHKiRet(parserClassInit(NULL));
+ if(ppErrObj != NULL) *ppErrObj = "strgen";
+ CHKiRet(strgenClassInit(NULL));
/* dummy "classes" */
if(ppErrObj != NULL) *ppErrObj = "str";
@@ -216,6 +220,7 @@ rsrtExit(void)
glblClassExit();
rulesetClassExit();
ruleClassExit();
+
objClassExit(); /* *THIS* *MUST/SHOULD?* always be the first class initilizer being called (except debug)! */
}
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 1a9186ed..a8b5d401 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -27,6 +27,12 @@
#define INCLUDED_RSYSLOG_H
/* ############################################################# *
+ * # Some constant values # *
+ * ############################################################# */
+#define CONST_LEN_TIMESTAMP_3164 15 /* number of chars (excluding \0!) in a RFC3164 timestamp */
+#define CONST_LEN_TIMESTAMP_3339 32 /* number of chars (excluding \0!) in a RFC3339 timestamp */
+
+/* ############################################################# *
* # Config Settings # *
* ############################################################# */
#define RS_STRINGBUF_ALLOC_INCREMENT 128
@@ -52,6 +58,15 @@
* rgerhards, 2006-11-30
*/
+#define CONF_OMOD_NUMSTRINGS_MAXSIZE 2 /* cache for pointers to output module buffer pointers. All
+ * rsyslog-provided plugins do NOT need more than two buffers. If
+ * more are needed (future developments, third-parties), rsyslog
+ * must be recompiled with a larger parameter. Hardcoding this
+ * saves us some overhead, both in runtime in code complexity. As
+ * it is doubtful if ever more than 2 parameters are needed, the
+ * approach taken here is considered appropriate.
+ * rgerhards, 2010-06-24
+ */
/* ############################################################# *
* # End Config Settings # *
@@ -79,6 +94,10 @@
#define CORE_FEATURE_BATCHING 1
/*#define CORE_FEATURE_whatever 2 ... and so on ... */
+/* under Solaris (actually only SPARC), we need to redefine some types
+ * to be void, so that we get void* pointers. Otherwise, we will see
+ * alignment errors.
+ */
/* some universal fixed size integer defines ... */
typedef long long int64;
typedef long long unsigned uint64;
@@ -87,6 +106,7 @@ typedef char intTiny; /* 0..127! */
typedef unsigned char uintTiny; /* 0..255! */
/* define some base data types */
+
typedef unsigned char uchar;/* get rid of the unhandy "unsigned char" */
typedef struct aUsrp_s aUsrp_t;
typedef struct thrdInfo thrdInfo_t;
@@ -98,6 +118,7 @@ typedef struct NetAddr netAddr_t;
typedef struct netstrms_s netstrms_t;
typedef struct netstrm_s netstrm_t;
typedef struct nssel_s nssel_t;
+typedef struct nspoll_s nspoll_t;
typedef enum nsdsel_waitOp_e nsdsel_waitOp_t;
typedef struct nsd_ptcp_s nsd_ptcp_t;
typedef struct nsd_gtls_s nsd_gtls_t;
@@ -105,9 +126,8 @@ typedef struct nsd_gsspi_s nsd_gsspi_t;
typedef struct nsd_nss_s nsd_nss_t;
typedef struct nsdsel_ptcp_s nsdsel_ptcp_t;
typedef struct nsdsel_gtls_s nsdsel_gtls_t;
+typedef struct nsdpoll_ptcp_s nsdpoll_ptcp_t;
typedef struct wti_s wti_t;
-typedef obj_t nsd_t;
-typedef obj_t nsdsel_t;
typedef struct msg msg_t;
typedef struct queue_s qqueue_t;
typedef struct prop_s prop_t;
@@ -128,18 +148,39 @@ typedef struct wtp_s wtp_t;
typedef struct modInfo_s modInfo_t;
typedef struct parser_s parser_t;
typedef struct parserList_s parserList_t;
+typedef struct strgen_s strgen_t;
+typedef struct strgenList_s strgenList_t;
typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */
typedef uint64 qDeqID; /* queue Dequeue order ID. 32 bits is considered dangerously few */
typedef struct tcpLstnPortList_s tcpLstnPortList_t; // TODO: rename?
typedef struct strmLstnPortList_s strmLstnPortList_t; // TODO: rename?
+/* under Solaris (actually only SPARC), we need to redefine some types
+ * to be void, so that we get void* pointers. Otherwise, we will see
+ * alignment errors.
+ */
+#ifdef OS_SOLARIS
+ typedef void * obj_t_ptr;
+ typedef void nsd_t;
+ typedef void nsdsel_t;
+ typedef void nsdpoll_t;
+#else
+ typedef obj_t *obj_t_ptr;
+ typedef obj_t nsd_t;
+ typedef obj_t nsdsel_t;
+ typedef obj_t nsdpoll_t;
+#endif
+
+
#ifdef __hpux
typedef unsigned int u_int32_t; /* TODO: is this correct? */
typedef int socklen_t;
#endif
-typedef char bool; /* I intentionally use char, to keep it slim so that many fit into the CPU cache! */
+typedef struct epoll_event epoll_event_t;
+
+typedef char sbool; /* (small bool) I intentionally use char, to keep it slim so that many fit into the CPU cache! */
/* settings for flow control
* TODO: is there a better place for them? -- rgerhards, 2008-03-14
@@ -379,7 +420,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_RSCORE_TOO_OLD = -2120, /**< rsyslog core is too old for ... (eg this plugin) */
RS_RET_DEFER_COMMIT = -2121, /**< output plugin status: not yet committed (an OK state!) */
RS_RET_PREVIOUS_COMMITTED = -2122, /**< output plugin status: previous record was committed (an OK state!) */
- RS_RET_ACTION_FAILED = -2123, /**< action failed and is now suspended (consider this permanent for the time being) */
+ RS_RET_ACTION_FAILED = -2123, /**< action failed and is now suspended */
RS_RET_NONFATAL_CONFIG_ERR = -2124, /**< non-fatal error during config processing */
RS_RET_NON_SIZELIMITCMD = -2125, /**< size limit for file defined, but no size limit command given */
RS_RET_SIZELIMITCMD_DIDNT_RESOLVE = -2126, /**< size limit command did not resolve situation */
@@ -389,6 +430,9 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_VAR_NOT_FOUND = -2142, /**< variable not found */
RS_RET_EMPTY_MSG = -2143, /**< provided (raw) MSG is empty */
RS_RET_PEER_CLOSED_CONN = -2144, /**< remote peer closed connection (information, no error) */
+ RS_RET_ERR_OPEN_KLOG = -2145, /**< error opening the kernel log socket (primarily solaris) */
+ RS_RET_ERR_AQ_CONLOG = -2146, /**< error aquiring console log (on solaris) */
+ RS_RET_ERR_DOOR = -2147, /**< some problems with handling the Solaris door functionality */
RS_RET_NO_SRCNAME_TPL = -2150, /**< sourcename template was not specified where one was needed (omudpspoof spoof addr) */
RS_RET_HOST_NOT_SPECIFIED = -2151, /**< (target) host was not specified where it was needed */
RS_RET_ERR_LIBNET_INIT = -2152, /**< error initializing libnet */
@@ -400,6 +444,11 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_NO_RULESET= -2158,/**< no ruleset name as specified where one was needed */
RS_RET_PARSER_NOT_FOUND = -2159,/**< parser with the specified name was not found */
RS_RET_COULD_NOT_PARSE = -2160,/**< (this) parser could not parse the message (no error, means try next one) */
+ RS_RET_EINTR = -2161, /**< EINTR occured during a system call (not necessarily an error) */
+ RS_RET_ERR_EPOLL = -2162, /**< epoll() returned with an unexpected error code */
+ RS_RET_ERR_EPOLL_CTL = -2163, /**< epol_ctll() returned with an unexpected error code */
+ RS_RET_TIMEOUT = -2164, /**< timeout occured during operation */
+ RS_RET_RCV_ERR = -2165, /**< error occured during socket rcv operation */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
diff --git a/runtime/rule.c b/runtime/rule.c
index 65ad071e..42773768 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -39,6 +39,7 @@
#include "vm.h"
#include "var.h"
#include "srUtils.h"
+#include "batch.h"
#include "unicode-helper.h"
/* static data */
@@ -87,40 +88,20 @@ iterateAllActions(rule_t *pThis, rsRetVal (*pFunc)(void*, void*), void* pParam)
}
-
/* helper to processMsg(), used to call the configured actions. It is
* executed from within llExecFunc() of the action list.
* rgerhards, 2007-08-02
*/
-typedef struct processMsgDoActions_s {
- int bPrevWasSuspended; /* was the previous action suspended? */
- msg_t *pMsg;
-} processMsgDoActions_t;
-DEFFUNC_llExecFunc(processMsgDoActions)
+DEFFUNC_llExecFunc(processBatchDoActions)
{
DEFiRet;
rsRetVal iRetMod; /* return value of module - we do not always pass that back */
action_t *pAction = (action_t*) pData;
- processMsgDoActions_t *pDoActData = (processMsgDoActions_t*) pParam;
-
- assert(pAction != NULL);
-
- if((pAction->bExecWhenPrevSusp == 1) && (pDoActData->bPrevWasSuspended == 0)) {
- dbgprintf("not calling action because the previous one is not suspended\n");
- ABORT_FINALIZE(RS_RET_OK);
- }
+ batch_t *pBatch = (batch_t*) pParam;
- iRetMod = actionCallAction(pAction, pDoActData->pMsg);
- if(iRetMod == RS_RET_DISCARDMSG) {
- ABORT_FINALIZE(RS_RET_DISCARDMSG);
- } else if(iRetMod == RS_RET_SUSPENDED) {
- /* indicate suspension for next module to be called */
- pDoActData->bPrevWasSuspended = 1;
- } else {
- pDoActData->bPrevWasSuspended = 0;
- }
+ DBGPRINTF("Processing next action\n");
+ iRetMod = pAction->submitToActQ(pAction, pBatch);
-finalize_it:
RETiRet;
}
@@ -129,7 +110,7 @@ finalize_it:
* provided filter condition.
*/
static rsRetVal
-shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg)
+shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, sbool *bProcessMsg)
{
DEFiRet;
unsigned short pbMustBeFreed;
@@ -166,7 +147,6 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg)
}
}
-RUNLOG_VAR("%p", pRule->pCSProgNameComp);
if(pRule->pCSProgNameComp != NULL) {
int bInv = 0, bEqv = 0, offset = 0;
if(*(rsCStrGetSzStrNoNULL(pRule->pCSProgNameComp)) == '-') {
@@ -279,26 +259,29 @@ finalize_it:
-/* Process (consume) a received message. Calls the actions configured.
+/* Process (consume) a batch of messages. Calls the actions configured.
* rgerhards, 2005-10-13
*/
static rsRetVal
-processMsg(rule_t *pThis, msg_t *pMsg)
+processBatch(rule_t *pThis, batch_t *pBatch)
{
- int bProcessMsg;
- processMsgDoActions_t DoActData;
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, rule);
- assert(pMsg != NULL);
-
- /* first check the filters... */
- CHKiRet(shouldProcessThisMessage(pThis, pMsg, &bProcessMsg));
- if(bProcessMsg) {
- DoActData.pMsg = pMsg;
- DoActData.bPrevWasSuspended = 0;
- CHKiRet(llExecFunc(&pThis->llActList, processMsgDoActions, (void*)&DoActData));
+ assert(pBatch != NULL);
+
+ /* first check the filters and reset status variables */
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ CHKiRet(shouldProcessThisMessage(pThis, (msg_t*)(pBatch->pElem[i].pUsrp),
+ &(pBatch->pElem[i].bFilterOK)));
+ // TODO: really abort on error? 2010-06-10
+ if(pBatch->pElem[i].bFilterOK) {
+ /* re-init only when actually needed (cache write cost!) */
+ pBatch->pElem[i].bPrevWasSuspended = 0;
+ }
}
+ CHKiRet(llExecFunc(&pThis->llActList, processBatchDoActions, pBatch));
finalize_it:
RETiRet;
@@ -441,7 +424,7 @@ CODESTARTobjQueryInterface(rule)
pIf->DebugPrint = ruleDebugPrint;
pIf->IterateAllActions = iterateAllActions;
- pIf->ProcessMsg = processMsg;
+ pIf->ProcessBatch = processBatch;
pIf->SetAssRuleset = setAssRuleset;
pIf->GetAssRuleset = getAssRuleset;
finalize_it:
diff --git a/runtime/rule.h b/runtime/rule.h
index 99ac44e7..309a2ed8 100644
--- a/runtime/rule.h
+++ b/runtime/rule.h
@@ -47,7 +47,7 @@ struct rule_s {
fiop_t operation;
regex_t *regex_cache; /* cache for compiled REs, if such are used */
cstr_t *pCSCompValue; /* value to "compare" against */
- bool isNegated;
+ sbool isNegated;
propid_t propID; /* ID of the requested property */
} prop;
expr_t *f_expr; /* expression object */
@@ -64,11 +64,12 @@ BEGINinterface(rule) /* name must also be changed in ENDinterface macro! */
rsRetVal (*ConstructFinalize)(rule_t __attribute__((unused)) *pThis);
rsRetVal (*Destruct)(rule_t **ppThis);
rsRetVal (*IterateAllActions)(rule_t *pThis, rsRetVal (*pFunc)(void*, void*), void *pParam);
- rsRetVal (*ProcessMsg)(rule_t *pThis, msg_t *pMsg);
+ rsRetVal (*ProcessBatch)(rule_t *pThis, batch_t *pBatch);
rsRetVal (*SetAssRuleset)(rule_t *pThis, ruleset_t*);
ruleset_t* (*GetAssRuleset)(rule_t *pThis);
ENDinterface(rule)
-#define ruleCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
+#define ruleCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */
+/* change for v2: ProcessMsg replaced by ProcessBatch - 2010-06-10 */
/* prototypes */
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 785fbeaa..0584e8d6 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -46,6 +46,7 @@
#include "rule.h"
#include "errmsg.h"
#include "parser.h"
+#include "batch.h"
#include "unicode-helper.h"
#include "dirty.h" /* for main ruleset queue creation */
@@ -59,6 +60,9 @@ linkedList_t llRulesets; /* this is NOT a pointer - no typo here ;) */
ruleset_t *pCurrRuleset = NULL; /* currently "active" ruleset */
ruleset_t *pDfltRuleset = NULL; /* current default ruleset, e.g. for binding to actions which have no other */
+/* forward definitions */
+static rsRetVal processBatch(batch_t *pBatch);
+
/* ---------- linked-list key handling functions ---------- */
/* destructor for linked list keys.
@@ -134,34 +138,96 @@ finalize_it:
-/* helper to processMsg(), used to call the configured actions. It is
+/* helper to processBatch(), used to call the configured actions. It is
* executed from within llExecFunc() of the action list.
* rgerhards, 2007-08-02
*/
-DEFFUNC_llExecFunc(processMsgDoRules)
+DEFFUNC_llExecFunc(processBatchDoRules)
{
rsRetVal iRet;
ISOBJ_TYPE_assert(pData, rule);
- iRet = rule.ProcessMsg((rule_t*) pData, (msg_t*) pParam);
+ dbgprintf("Processing next rule\n");
+ iRet = rule.ProcessBatch((rule_t*) pData, (batch_t*) pParam);
dbgprintf("ruleset: get iRet %d from rule.ProcessMsg()\n", iRet);
return iRet;
}
-/* Process (consume) a received message. Calls the actions configured.
+
+/* This function is similar to processBatch(), but works on a batch that
+ * contains rules from multiple rulesets. In this case, we can not push
+ * the whole batch through the ruleset. Instead, we examine it and
+ * partition it into sub-rulesets which we then push through the system.
+ * Note that when we evaluate which message must be processed, we do NOT need
+ * to look at bFilterOK, because this value is only set in a later processing
+ * stage. Doing so caused a bug during development ;)
+ * rgerhards, 2010-06-15
+ */
+static inline rsRetVal
+processBatchMultiRuleset(batch_t *pBatch)
+{
+ ruleset_t *currRuleset;
+ batch_t snglRuleBatch;
+ int i;
+ int iStart; /* start index of partial batch */
+ int iNew; /* index for new (temporary) batch */
+ DEFiRet;
+
+ CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem));
+ snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate;
+
+ while(1) { /* loop broken inside */
+ /* search for first unprocessed element */
+ for(iStart = 0 ; iStart < pBatch->nElem && pBatch->pElem[iStart].state == BATCH_STATE_DISC ; ++iStart)
+ /* just search, no action */;
+
+ if(iStart == pBatch->nElem)
+ FINALIZE; /* everything processed */
+
+ /* prepare temporary batch */
+ currRuleset = batchElemGetRuleset(pBatch, iStart);
+ iNew = 0;
+ for(i = iStart ; i < pBatch->nElem ; ++i) {
+ if(batchElemGetRuleset(pBatch, i) == currRuleset) {
+ batchCopyElem(&(snglRuleBatch.pElem[iNew++]), &(pBatch->pElem[i]));
+ /* We indicate the element also as done, so it will not be processed again */
+ pBatch->pElem[i].state = BATCH_STATE_DISC;
+ }
+ }
+ snglRuleBatch.nElem = iNew; /* was left just right by the for loop */
+ batchSetSingleRuleset(&snglRuleBatch, 1);
+ /* process temp batch */
+ processBatch(&snglRuleBatch);
+ }
+ batchFree(&snglRuleBatch);
+
+finalize_it:
+ RETiRet;
+}
+
+/* Process (consume) a batch of messages. Calls the actions configured.
+ * If the whole batch uses a singel ruleset, we can process the batch as
+ * a whole. Otherwise, we need to process it slower, on a message-by-message
+ * basis (what can be optimized to a per-ruleset basis)
* rgerhards, 2005-10-13
*/
static rsRetVal
-processMsg(msg_t *pMsg)
+processBatch(batch_t *pBatch)
{
ruleset_t *pThis;
DEFiRet;
- assert(pMsg != NULL);
-
- pThis = (pMsg->pRuleset == NULL) ? pDfltRuleset : pMsg->pRuleset;
- ISOBJ_TYPE_assert(pThis, ruleset);
-
- CHKiRet(llExecFunc(&pThis->llRules, processMsgDoRules, pMsg));
+ assert(pBatch != NULL);
+
+dbgprintf("ZZZ: processBatch: batch of %d elements must be processed\n", pBatch->nElem);
+ if(pBatch->bSingleRuleset) {
+ pThis = batchGetRuleset(pBatch);
+ if(pThis == NULL)
+ pThis = pDfltRuleset;
+ ISOBJ_TYPE_assert(pThis, ruleset);
+ CHKiRet(llExecFunc(&pThis->llRules, processBatchDoRules, pBatch));
+ } else {
+ CHKiRet(processBatchMultiRuleset(pBatch));
+ }
finalize_it:
dbgprintf("ruleset.ProcessMsg() returns %d\n", iRet);
@@ -516,7 +582,7 @@ CODESTARTobjQueryInterface(ruleset)
pIf->IterateAllActions = iterateAllActions;
pIf->DestructAllActions = destructAllActions;
pIf->AddRule = addRule;
- pIf->ProcessMsg = processMsg;
+ pIf->ProcessBatch = processBatch;
pIf->SetName = setName;
pIf->DebugPrintAll = debugPrintAll;
pIf->GetCurrent = GetCurrent;
diff --git a/runtime/ruleset.h b/runtime/ruleset.h
index 222d773e..acebd17a 100644
--- a/runtime/ruleset.h
+++ b/runtime/ruleset.h
@@ -48,7 +48,7 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */
rsRetVal (*DestructAllActions)(void);
rsRetVal (*AddRule)(ruleset_t *pThis, rule_t **ppRule);
rsRetVal (*SetName)(ruleset_t *pThis, uchar *pszName);
- rsRetVal (*ProcessMsg)(msg_t *pMsg);
+ rsRetVal (*ProcessBatch)(batch_t*);
rsRetVal (*GetRuleset)(ruleset_t **ppThis, uchar*);
rsRetVal (*SetDefaultRuleset)(uchar*);
rsRetVal (*SetCurrRuleset)(uchar*);
@@ -57,7 +57,7 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */
/* v3, 2009-11-04 */
parserList_t* (*GetParserList)(msg_t *);
ENDinterface(ruleset)
-#define rulesetCURR_IF_VERSION 3 /* increment whenever you change the interface structure! */
+#define rulesetCURR_IF_VERSION 4 /* increment whenever you change the interface structure! */
/* prototypes */
diff --git a/runtime/srutils.c b/runtime/srutils.c
index 92227399..d357cd77 100644
--- a/runtime/srutils.c
+++ b/runtime/srutils.c
@@ -46,6 +46,9 @@
#include "srUtils.h"
#include "obj.h"
+#if _POSIX_TIMERS <= 0
+#include <sys/time.h>
+#endif
/* here we host some syslog specific names. There currently is no better place
* to do it, but over here is also not ideal... -- rgerhards, 2008-02-14
@@ -392,10 +395,22 @@ int getNumberDigits(long lNum)
rsRetVal
timeoutComp(struct timespec *pt, long iTimeout)
{
+# if _POSIX_TIMERS <= 0
+ struct timeval tv;
+# endif
+
BEGINfunc
assert(pt != NULL);
/* compute timeout */
+
+# if _POSIX_TIMERS > 0
+ /* this is the "regular" code */
clock_gettime(CLOCK_REALTIME, pt);
+# else
+ gettimeofday(&tv, NULL);
+ pt->tv_sec = tv.tv_sec;
+ pt->tv_nsec = tv.tv_usec * 1000;
+# endif
pt->tv_sec += iTimeout / 1000;
pt->tv_nsec += (iTimeout % 1000) * 1000000; /* think INTEGER arithmetic! */
if(pt->tv_nsec > 999999999) { /* overrun? */
@@ -417,11 +432,21 @@ timeoutVal(struct timespec *pt)
{
struct timespec t;
long iTimeout;
- BEGINfunc
+# if _POSIX_TIMERS <= 0
+ struct timeval tv;
+# endif
+ BEGINfunc
assert(pt != NULL);
/* compute timeout */
+# if _POSIX_TIMERS > 0
+ /* this is the "regular" code */
clock_gettime(CLOCK_REALTIME, &t);
+# else
+ gettimeofday(&tv, NULL);
+ t.tv_sec = tv.tv_sec;
+ t.tv_nsec = tv.tv_usec * 1000;
+# endif
iTimeout = (pt->tv_nsec - t.tv_nsec) / 1000000;
iTimeout += (pt->tv_sec - t.tv_sec) * 1000;
diff --git a/runtime/stream.c b/runtime/stream.c
index 0415c25c..b4295762 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -215,7 +215,8 @@ doPhysOpen(strm_t *pThis)
}
pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode);
- DBGPRINTF("file '%s' opened as #%d with mode %d\n", pThis->pszCurrFName, pThis->fd, pThis->tOpenMode);
+ DBGPRINTF("file '%s' opened as #%d with mode %d\n", pThis->pszCurrFName,
+ pThis->fd, (int) pThis->tOpenMode);
if(pThis->fd == -1) {
char errStr[1024];
int err = errno;
@@ -923,7 +924,7 @@ asyncWriterThread(void *pPtr)
{
int iDeq;
struct timespec t;
- bool bTimedOut = 0;
+ sbool bTimedOut = 0;
strm_t *pThis = (strm_t*) pPtr;
ISOBJ_TYPE_assert(pThis, strm);
@@ -936,7 +937,6 @@ asyncWriterThread(void *pPtr)
while(1) { /* loop broken inside */
d_pthread_mutex_lock(&pThis->mut);
-dbgprintf("XXX: asyncWriterThread iterating %s\n", pThis->pszFName);
while(pThis->iCnt == 0) {
if(pThis->bStopWriter) {
pthread_cond_broadcast(&pThis->isEmpty);
@@ -952,7 +952,6 @@ dbgprintf("XXX: asyncWriterThread iterating %s\n", pThis->pszFName);
bTimedOut = 0;
timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ // TODO: check the 2000?!?
if(pThis->bDoTimedWait) {
-dbgprintf("asyncWriter thread going to timeout sleep\n");
if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) {
int err = errno;
if(err == ETIMEDOUT) {
@@ -966,16 +965,13 @@ dbgprintf("asyncWriter thread going to timeout sleep\n");
}
}
} else {
-dbgprintf("asyncWriter thread going to eternal sleep\n");
d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut);
}
-dbgprintf("asyncWriter woke up\n");
}
bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
-dbgprintf("asyncWriter writes data\n");
doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf);
// TODO: error check????? 2009-07-06
@@ -1090,7 +1086,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
z_stream zstrm;
int zRet; /* zlib return state */
- bool bzInitDone = FALSE;
+ sbool bzInitDone = FALSE;
DEFiRet;
assert(pThis != NULL);
assert(pBuf != NULL);
diff --git a/runtime/stream.h b/runtime/stream.h
index e67bcda6..37e9d570 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -102,12 +102,12 @@ typedef struct strm_s {
int64 iMaxFileSize;/* maximum size a file may grow to */
int iMaxFiles; /* maximum number of files if a circular mode is in use */
int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */
- bool bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */
+ sbool bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */
int64 iCurrOffs;/* current offset */
int64 *pUsrWCntr; /* NULL or a user-provided counter that receives the nbr of bytes written since the last CntrSet() */
/* dynamic properties, valid only during file open, not to be persistet */
- bool bDisabled; /* should file no longer be written to? (currently set only if omfile file size limit fails) */
- bool bSync; /* sync this file after every write? */
+ sbool bDisabled; /* should file no longer be written to? (currently set only if omfile file size limit fails) */
+ sbool bSync; /* sync this file after every write? */
size_t sIOBufSize;/* size of IO buffer */
uchar *pszDir; /* Directory */
int lenDir;
@@ -118,13 +118,13 @@ typedef struct strm_s {
size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */
size_t iBufPtr; /* pointer into current buffer */
int iUngetC; /* char set via UngetChar() call or -1 if none set */
- bool bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */
+ sbool bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */
int iZipLevel; /* zip level (0..9). If 0, zip is completely disabled */
Bytef *pZipBuf;
/* support for async flush procesing */
- bool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */
- bool bStopWriter; /* shall writer thread terminate? */
- bool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */
+ sbool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */
+ sbool bStopWriter; /* shall writer thread terminate? */
+ sbool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */
int iFlushInterval; /* flush in which interval - 0, no flushing */
apc_id_t apcID; /* id of current Apc request (used for cancelling) */
pthread_mutex_t mut;/* mutex for flush in async mode */
@@ -143,7 +143,7 @@ typedef struct strm_s {
/* support for omfile size-limiting commands, special counters, NOT persisted! */
off_t iSizeLimit; /* file size limit, 0 = no limit */
uchar *pszSizeLimitCmd; /* command to carry out when size limit is reached */
- bool bIsTTY; /* is this a tty file? */
+ sbool bIsTTY; /* is this a tty file? */
} strm_t;
diff --git a/runtime/strgen.c b/runtime/strgen.c
new file mode 100644
index 00000000..46be1236
--- /dev/null
+++ b/runtime/strgen.c
@@ -0,0 +1,279 @@
+/* strgen.c
+ * Module to handle string generators. These are C modules that receive
+ * the message object and return a custom-built string. The primary purpose
+ * for their existance is performance -- they do the same as template strings, but
+ * potentially faster (if well implmented).
+ *
+ * Module begun 2010-06-01 by Rainer Gerhards
+ *
+ * Copyright 2010 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+#include "config.h"
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+#include "rsyslog.h"
+#include "msg.h"
+#include "obj.h"
+#include "errmsg.h"
+#include "strgen.h"
+#include "ruleset.h"
+#include "unicode-helper.h"
+#include "cfsysline.h"
+
+/* definitions for objects we access */
+DEFobjStaticHelpers
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(ruleset)
+
+/* static data */
+
+/* config data */
+
+/* This is the list of all strgens known to us.
+ * This is also used to unload all modules on shutdown.
+ */
+strgenList_t *pStrgenLstRoot = NULL;
+
+
+/* intialize (but NOT allocate) a strgen list. Primarily meant as a hook
+ * which can be used to extend the list in the future. So far, just sets
+ * it to NULL.
+ */
+static rsRetVal
+InitStrgenList(strgenList_t **pListRoot)
+{
+ *pListRoot = NULL;
+ return RS_RET_OK;
+}
+
+
+/* destruct a strgen list. The list elements are destroyed, but the strgen objects
+ * themselves are not modified. (That is done at a late stage during rsyslogd
+ * shutdown and need not be considered here.)
+ */
+static rsRetVal
+DestructStrgenList(strgenList_t **ppListRoot)
+{
+ strgenList_t *pStrgenLst;
+ strgenList_t *pStrgenLstDel;
+
+ pStrgenLst = *ppListRoot;
+ while(pStrgenLst != NULL) {
+ pStrgenLstDel = pStrgenLst;
+ pStrgenLst = pStrgenLst->pNext;
+ free(pStrgenLstDel);
+ }
+ *ppListRoot = NULL;
+ return RS_RET_OK;
+}
+
+
+/* Add a strgen to the list. We use a VERY simple and ineffcient algorithm,
+ * but it is employed only for a few milliseconds during config processing. So
+ * I prefer to keep it very simple and with simple data structures. Unfortunately,
+ * we need to preserve the order, but I don't like to add a tail pointer as that
+ * would require a container object. So I do the extra work to skip to the tail
+ * when adding elements...
+ */
+static rsRetVal
+AddStrgenToList(strgenList_t **ppListRoot, strgen_t *pStrgen)
+{
+ strgenList_t *pThis;
+ strgenList_t *pTail;
+ DEFiRet;
+
+ CHKmalloc(pThis = MALLOC(sizeof(strgenList_t)));
+ pThis->pStrgen = pStrgen;
+ pThis->pNext = NULL;
+
+ if(*ppListRoot == NULL) {
+ pThis->pNext = *ppListRoot;
+ *ppListRoot = pThis;
+ } else {
+ /* find tail first */
+ for(pTail = *ppListRoot ; pTail->pNext != NULL ; pTail = pTail->pNext)
+ /* just search, do nothing else */;
+ /* add at tail */
+ pTail->pNext = pThis;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* find a strgen based on the provided name */
+static rsRetVal
+FindStrgen(strgen_t **ppStrgen, uchar *pName)
+{
+ strgenList_t *pThis;
+ DEFiRet;
+
+ for(pThis = pStrgenLstRoot ; pThis != NULL ; pThis = pThis->pNext) {
+ if(ustrcmp(pThis->pStrgen->pName, pName) == 0) {
+ *ppStrgen = pThis->pStrgen;
+ FINALIZE; /* found it, iRet still eq. OK! */
+ }
+ }
+
+ iRet = RS_RET_PARSER_NOT_FOUND;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* --- END helper functions for strgen list handling --- */
+
+
+BEGINobjConstruct(strgen) /* be sure to specify the object type also in END macro! */
+ENDobjConstruct(strgen)
+
+/* ConstructionFinalizer. The most important chore is to add the strgen object
+ * to our global list of available strgens.
+ * rgerhards, 2009-11-03
+ */
+rsRetVal strgenConstructFinalize(strgen_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strgen);
+ CHKiRet(AddStrgenToList(&pStrgenLstRoot, pThis));
+ DBGPRINTF("Strgen '%s' added to list of available strgens.\n", pThis->pName);
+
+finalize_it:
+ RETiRet;
+}
+
+BEGINobjDestruct(strgen) /* be sure to specify the object type also in END and CODESTART macros! */
+CODESTARTobjDestruct(strgen)
+ dbgprintf("destructing strgen '%s'\n", pThis->pName);
+ free(pThis->pName);
+ENDobjDestruct(strgen)
+
+/* set the strgen name - string is copied over, call can continue to use it,
+ * but must free it if desired.
+ */
+static rsRetVal
+SetName(strgen_t *pThis, uchar *name)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strgen);
+ assert(name != NULL);
+
+ if(pThis->pName != NULL) {
+ free(pThis->pName);
+ pThis->pName = NULL;
+ }
+
+ CHKmalloc(pThis->pName = ustrdup(name));
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* set a pointer to "our" module. Note that no module
+ * pointer must already be set.
+ */
+static rsRetVal
+SetModPtr(strgen_t *pThis, modInfo_t *pMod)
+{
+ ISOBJ_TYPE_assert(pThis, strgen);
+ assert(pMod != NULL);
+ assert(pThis->pModule == NULL);
+ pThis->pModule = pMod;
+ return RS_RET_OK;
+}
+
+
+/* queryInterface function-- rgerhards, 2009-11-03
+ */
+BEGINobjQueryInterface(strgen)
+CODESTARTobjQueryInterface(strgen)
+ if(pIf->ifVersion != strgenCURR_IF_VERSION) { /* check for current version, increment on each change */
+ ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED);
+ }
+
+ /* ok, we have the right interface, so let's fill it
+ * Please note that we may also do some backwards-compatibility
+ * work here (if we can support an older interface version - that,
+ * of course, also affects the "if" above).
+ */
+ pIf->Construct = strgenConstruct;
+ pIf->ConstructFinalize = strgenConstructFinalize;
+ pIf->Destruct = strgenDestruct;
+ pIf->SetName = SetName;
+ pIf->SetModPtr = SetModPtr;
+ pIf->InitStrgenList = InitStrgenList;
+ pIf->DestructStrgenList = DestructStrgenList;
+ pIf->AddStrgenToList = AddStrgenToList;
+ pIf->FindStrgen = FindStrgen;
+finalize_it:
+ENDobjQueryInterface(strgen)
+
+
+/* This destroys the master strgenlist and all of its strgen entries. MUST only be
+ * done when the module is shut down. Strgen modules are NOT unloaded, rsyslog
+ * does that at a later stage for all dynamically loaded modules.
+ */
+static void
+destroyMasterStrgenList(void)
+{
+ strgenList_t *pStrgenLst;
+ strgenList_t *pStrgenLstDel;
+
+ pStrgenLst = pStrgenLstRoot;
+ while(pStrgenLst != NULL) {
+ strgenDestruct(&pStrgenLst->pStrgen);
+ pStrgenLstDel = pStrgenLst;
+ pStrgenLst = pStrgenLst->pNext;
+ free(pStrgenLstDel);
+ }
+}
+
+/* Exit our class.
+ * rgerhards, 2009-11-04
+ */
+BEGINObjClassExit(strgen, OBJ_IS_CORE_MODULE) /* class, version */
+ destroyMasterStrgenList();
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(ruleset, CORE_COMPONENT);
+ENDObjClassExit(strgen)
+
+
+/* Initialize the strgen class. Must be called as the very first method
+ * before anything else is called inside this class.
+ * rgerhards, 2009-11-02
+ */
+BEGINObjClassInit(strgen, 1, OBJ_IS_CORE_MODULE) /* class, version */
+ /* request objects we use */
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
+ InitStrgenList(&pStrgenLstRoot);
+ENDObjClassInit(strgen)
+
diff --git a/runtime/strgen.h b/runtime/strgen.h
new file mode 100644
index 00000000..3819dccd
--- /dev/null
+++ b/runtime/strgen.h
@@ -0,0 +1,60 @@
+/* header for strgen.c
+ *
+ * Copyright 2010 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+#ifndef INCLUDED_STRGEN_H
+#define INCLUDED_STRGEN_H
+
+
+/* we create a small helper object, a list of strgens, that we can use to
+ * build a chain of them whereever this is needed.
+ */
+struct strgenList_s {
+ strgen_t *pStrgen;
+ strgenList_t *pNext;
+};
+
+
+/* the strgen object, a dummy because we have only static methods */
+struct strgen_s {
+ BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
+ uchar *pName; /* name of this strgen */
+ modInfo_t *pModule; /* pointer to strgen's module */
+};
+
+/* interfaces */
+BEGINinterface(strgen) /* name must also be changed in ENDinterface macro! */
+ rsRetVal (*Construct)(strgen_t **ppThis);
+ rsRetVal (*ConstructFinalize)(strgen_t *pThis);
+ rsRetVal (*Destruct)(strgen_t **ppThis);
+ rsRetVal (*SetName)(strgen_t *pThis, uchar *name);
+ rsRetVal (*SetModPtr)(strgen_t *pThis, modInfo_t *pMod);
+ rsRetVal (*FindStrgen)(strgen_t **ppThis, uchar*name);
+ rsRetVal (*InitStrgenList)(strgenList_t **pListRoot);
+ rsRetVal (*DestructStrgenList)(strgenList_t **pListRoot);
+ rsRetVal (*AddStrgenToList)(strgenList_t **pListRoot, strgen_t *pStrgen);
+ENDinterface(strgen)
+#define strgenCURR_IF_VERSION 1 /* increment whenever you change the interface above! */
+
+
+/* prototypes */
+PROTOTYPEObj(strgen);
+
+#endif /* #ifndef INCLUDED_STRGEN_H */
diff --git a/runtime/unlimited_select.h b/runtime/unlimited_select.h
new file mode 100644
index 00000000..32dadc03
--- /dev/null
+++ b/runtime/unlimited_select.h
@@ -0,0 +1,45 @@
+/* unlimited_select.h
+ * Tweak the macros for accessing fd_set so that the select() syscall
+ * won't be limited to a particular number of file descriptors.
+ *
+ * Copyright 2009 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+
+#ifndef UNLIMITED_SELECT_H_INCLUDED
+
+#include <string.h>
+#include <stdlib.h>
+#include <sys/select.h>
+#include "glbl.h"
+
+#ifdef USE_UNLIMITED_SELECT
+# undef FD_ZERO
+# define FD_ZERO(set) memset((set), 0, glbl.GetFdSetSize());
+#endif
+
+#ifdef USE_UNLIMITED_SELECT
+void freeFdSet(fd_set *p) {
+ free(p);
+}
+#else
+# define freeFdSet(x)
+#endif
+
+#endif /* #ifndef UNLIMITED_SELECT_H_INCLUDED */
diff --git a/runtime/wti.c b/runtime/wti.c
index 288670b6..9343f5c5 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -39,11 +39,6 @@
#include <pthread.h>
#include <errno.h>
-/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20
-//#ifdef OS_SOLARIS
-//# include <sched.h>
-//#endif
-
#include "rsyslog.h"
#include "stringbuf.h"
#include "srUtils.h"
@@ -79,10 +74,10 @@ wtiGetDbgHdr(wti_t *pThis)
/* return the current worker processing state. For the sake of
* simplicity, we do not use the iRet interface. -- rgerhards, 2009-07-17
*/
-bool
+sbool
wtiGetState(wti_t *pThis)
{
- return ATOMIC_FETCH_32BIT(pThis->bIsRunning);
+ return ATOMIC_FETCH_32BIT(&pThis->bIsRunning, &pThis->mutIsRunning);
}
@@ -102,17 +97,39 @@ wtiSetAlwaysRunning(wti_t *pThis)
* is inside wti). -- rgerhards, 2009-07-17
*/
rsRetVal
-wtiSetState(wti_t *pThis, bool bNewVal)
+wtiSetState(wti_t *pThis, sbool bNewVal)
{
ISOBJ_TYPE_assert(pThis, wti);
- if(bNewVal)
- ATOMIC_STORE_1_TO_INT(pThis->bIsRunning);
- else
- ATOMIC_STORE_0_TO_INT(pThis->bIsRunning);
+ if(bNewVal) {
+ ATOMIC_STORE_1_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning);
+ } else {
+ ATOMIC_STORE_0_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning);
+ }
return RS_RET_OK;
}
+/* advise all workers to start by interrupting them. That should unblock all srSleep()
+ * calls.
+ */
+rsRetVal
+wtiWakeupThrd(wti_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, wti);
+
+
+ if(wtiGetState(pThis)) {
+ /* we first try the cooperative "cancel" interface */
+ pthread_kill(pThis->thrdID, SIGTTIN);
+ dbgprintf("sent SIGTTIN to worker thread 0x%x\n", (unsigned) pThis->thrdID);
+ }
+
+ RETiRet;
+}
+
+
/* Cancel the thread. If the thread is not running. But it is save and legal to
* call wtiCancelThrd() in such situations. This function only returns when the
* thread has terminated. Else we may get race conditions all over the code...
@@ -129,7 +146,16 @@ wtiCancelThrd(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
+
+ if(wtiGetState(pThis)) {
+ /* we first try the cooperative "cancel" interface */
+ pthread_kill(pThis->thrdID, SIGTTIN);
+ dbgprintf("sent SIGTTIN to worker thread 0x%x, giving it a chance to terminate\n", (unsigned) pThis->thrdID);
+ srSleep(0, 10000);
+ }
+
if(wtiGetState(pThis)) {
+ dbgprintf("cooperative worker termination failed, using cancellation...\n");
dbgoprint((obj_t*) pThis, "canceling worker thread\n");
pthread_cancel(pThis->thrdID);
/* now wait until the thread terminates... */
@@ -146,7 +172,9 @@ wtiCancelThrd(wti_t *pThis)
BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(wti)
/* actual destruction */
- free(pThis->batch.pElem);
+ batchFree(&pThis->batch);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
+
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -154,6 +182,7 @@ ENDobjDestruct(wti)
/* Standard-Constructor for the wti object
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
+ INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
ENDobjConstruct(wti)
@@ -175,7 +204,7 @@ wtiConstructFinalize(wti_t *pThis)
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
- CHKmalloc(pThis->batch.pElem = calloc((size_t)iDeqBatchSize, sizeof(batch_obj_t)));
+ CHKiRet(batchInit(&pThis->batch, iDeqBatchSize));
finalize_it:
RETiRet;
diff --git a/runtime/wti.h b/runtime/wti.h
index f466a053..51ece4ef 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -35,10 +35,11 @@ struct wti_s {
BEGINobjInstance;
pthread_t thrdID; /* thread ID */
int bIsRunning; /* is this thread currently running? (must be int for atomic op!) */
- bool bAlwaysRunning; /* should this thread always run? */
+ sbool bAlwaysRunning; /* should this thread always run? */
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
uchar *pszDbgHdr; /* header string for debug messages */
+ DEF_ATOMIC_HELPER_MUT(mutIsRunning);
};
@@ -50,8 +51,9 @@ rsRetVal wtiWorker(wti_t *pThis);
rsRetVal wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg);
rsRetVal wtiCancelThrd(wti_t *pThis);
rsRetVal wtiSetAlwaysRunning(wti_t *pThis);
-rsRetVal wtiSetState(wti_t *pThis, bool bNew);
-bool wtiGetState(wti_t *pThis);
+rsRetVal wtiSetState(wti_t *pThis, sbool bNew);
+rsRetVal wtiWakeupThrd(wti_t *pThis);
+sbool wtiGetState(wti_t *pThis);
PROTOTYPEObjClassInit(wti);
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*);
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 060e6627..ece80911 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -96,6 +96,8 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pThis->pfGetDeqBatchSize = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
pThis->pfObjProcessed = NotImplementedDummy;
+ INIT_ATOMIC_HELPER_MUT(pThis->mutCurNumWrkThrd);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutWtpState);
ENDobjConstruct(wtp)
@@ -149,13 +151,15 @@ CODESTARTobjDestruct(wtp)
pthread_cond_destroy(&pThis->condThrdTrm);
pthread_mutex_destroy(&pThis->mutWtp);
pthread_attr_destroy(&pThis->attrThrd);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurNumWrkThrd);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutWtpState);
free(pThis->pszDbgHdr);
ENDobjDestruct(wtp)
/* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21
- * We do not need to do atomic instructions as set operations are only
+ * We do not need to do atomic instructions as set operations are only
* called when terminating the pool, and then in strict sequence. So we
* can never overwrite each other. On the other hand, it also doesn't
* matter if the read operation obtains an older value, as we then simply
@@ -166,7 +170,7 @@ rsRetVal
wtpSetState(wtp_t *pThis, wtpState_t iNewState)
{
ISOBJ_TYPE_assert(pThis, wtp);
- pThis->wtpState = iNewState;
+ pThis->wtpState = iNewState; // TODO: do we need a mutex here? 2010-04-26
return RS_RET_OK;
}
@@ -186,7 +190,7 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex)
/* we need a consistent value, but it doesn't really matter if it is changed
* right after the fetch - then we simply do one more iteration in the worker
*/
- wtpState = ATOMIC_FETCH_32BIT(pThis->wtpState);
+ wtpState = (wtpState_t) ATOMIC_FETCH_32BIT((int*)&pThis->wtpState, &pThis->mutWtpState);
if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
@@ -216,6 +220,7 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
{
DEFiRet;
int bTimedOut;
+ int i;
ISOBJ_TYPE_assert(pThis, wtp);
@@ -223,6 +228,10 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
d_pthread_mutex_lock(pThis->pmutUsr);
wtpSetState(pThis, tShutdownCmd);
pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */
+ /* awake workers in retry loop */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ wtiWakeupThrd(pThis->pWrkr[i]);
+ }
d_pthread_mutex_unlock(pThis->pmutUsr);
/* wait for worker thread termination */
@@ -231,12 +240,19 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n",
- wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd));
+ wtpGetDbgHdr(pThis), timeoutVal(ptTimeout),
+ ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutWtp, ptTimeout) != 0) {
DBGPRINTF("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
bTimedOut = 1; /* we exit the loop on timeout */
}
+
+ /* awake workers in retry loop */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ wtiWakeupThrd(pThis->pWrkr[i]);
+ }
+
}
pthread_cleanup_pop(1);
@@ -285,10 +301,11 @@ wtpWrkrExecCleanup(wti_t *pWti)
/* the order of the next two statements is important! */
wtiSetState(pWti, WRKTHRD_STOPPED);
- ATOMIC_DEC(pThis->iCurNumWrkThrd);
+ ATOMIC_DEC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd);
- DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n",
- wtpGetDbgHdr(pThis), (unsigned long) pWti, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd));
+ DBGPRINTF("%s: Worker thread %lx, terminated, um workers now %d\n",
+ wtpGetDbgHdr(pThis), (unsigned long) pWti,
+ ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
ENDfunc
}
@@ -329,20 +346,28 @@ wtpWrkrExecCancelCleanup(void *arg)
static void *
wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */
{
- uchar *pszDbgHdr;
- uchar thrdName[32] = "rs:";
wti_t *pWti = (wti_t*) arg;
wtp_t *pThis;
sigset_t sigSet;
+# if HAVE_PRCTL && defined PR_SET_NAME
+ uchar *pszDbgHdr;
+ uchar thrdName[32] = "rs:";
+# endif
BEGINfunc
ISOBJ_TYPE_assert(pWti, wti);
pThis = pWti->pWtp;
ISOBJ_TYPE_assert(pThis, wtp);
+ /* block all signals */
sigfillset(&sigSet);
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
+ /* but ignore SIGTTN, which we (ab)use to signal the thread to shutdown -- rgerhards, 2009-07-20 */
+ sigemptyset(&sigSet);
+ sigaddset(&sigSet, SIGTTIN);
+ pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL);
+
# if HAVE_PRCTL && defined PR_SET_NAME
/* set thread name - we ignore if the call fails, has no harsh consequences... */
pszDbgHdr = wtpGetDbgHdr(pThis);
@@ -398,10 +423,11 @@ wtpStartWrkr(wtp_t *pThis)
pWti = pThis->pWrkr[i];
wtiSetState(pWti, WRKTHRD_RUNNING);
iState = pthread_create(&(pWti->thrdID), &pThis->attrThrd, wtpWorker, (void*) pWti);
- ATOMIC_INC(pThis->iCurNumWrkThrd); /* we got one more! */
+ ATOMIC_INC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd); /* we got one more! */
DBGPRINTF("%s: started with state %d, num workers now %d\n",
- wtpGetDbgHdr(pThis), iState, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd));
+ wtpGetDbgHdr(pThis), iState,
+ ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
finalize_it:
d_pthread_mutex_unlock(&pThis->mutWtp);
@@ -432,7 +458,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */
nMaxWrkr = pThis->iNumWorkerThreads;
- nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd);
+ nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd);
if(nMissing > 0) {
DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n",
@@ -442,7 +468,6 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
CHKiRet(wtpStartWrkr(pThis));
}
} else {
-dbgprintf("YYY: wtpAdviseMaxWorkers, sufficient workers, just doing adivse signal cond busy\n");
pthread_cond_signal(pThis->pcondBusy);
}
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 05c02a8c..7e6b4394 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -26,6 +26,7 @@
#include <pthread.h>
#include "obj.h"
+#include "atomic.h"
/* states for worker threads. */
#define WRKTHRD_STOPPED FALSE
@@ -65,6 +66,8 @@ struct wtp_s {
rsRetVal (*pfDoWork)(void *pUsr, void *pWti);
/* end user objects */
uchar *pszDbgHdr; /* header string for debug messages */
+ DEF_ATOMIC_HELPER_MUT(mutCurNumWrkThrd);
+ DEF_ATOMIC_HELPER_MUT(mutWtpState);
};
/* some symbolic constants for easier reference */