summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/Makefile.am1
-rw-r--r--runtime/atomic.h86
-rw-r--r--runtime/cfsysline.c15
-rw-r--r--runtime/conf.c4
-rw-r--r--runtime/ctok.c4
-rw-r--r--runtime/datetime.c2
-rw-r--r--runtime/debug.c31
-rw-r--r--runtime/debug.h5
-rw-r--r--runtime/glbl.c38
-rw-r--r--runtime/glbl.h11
-rw-r--r--runtime/msg.c46
-rw-r--r--runtime/msg.h4
-rw-r--r--runtime/net.c10
-rw-r--r--runtime/nsd_gtls.c13
-rw-r--r--runtime/nsdsel_gtls.c17
-rw-r--r--runtime/nsdsel_ptcp.c51
-rw-r--r--runtime/nsdsel_ptcp.h5
-rw-r--r--runtime/parser.c46
-rw-r--r--runtime/prop.c6
-rw-r--r--runtime/prop.h2
-rw-r--r--runtime/queue.c23
-rw-r--r--runtime/queue.h1
-rw-r--r--runtime/rsyslog.c2
-rw-r--r--runtime/rsyslog.h27
-rw-r--r--runtime/ruleset.c1
-rw-r--r--runtime/stream.c60
-rw-r--r--runtime/stringbuf.c2
-rw-r--r--runtime/strmsrv.c6
-rw-r--r--runtime/unlimited_select.h45
-rw-r--r--runtime/vm.c44
-rw-r--r--runtime/wti.c11
-rw-r--r--runtime/wti.h1
-rw-r--r--runtime/wtp.c26
-rw-r--r--runtime/wtp.h3
34 files changed, 501 insertions, 148 deletions
diff --git a/runtime/Makefile.am b/runtime/Makefile.am
index 14abe722..c1a15198 100644
--- a/runtime/Makefile.am
+++ b/runtime/Makefile.am
@@ -15,6 +15,7 @@ librsyslog_la_SOURCES = \
nsd.h \
glbl.h \
glbl.c \
+ unlimited_select.h \
conf.c \
conf.h \
parser.h \
diff --git a/runtime/atomic.h b/runtime/atomic.h
index d5aaf56b..fc3e0b2d 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -31,8 +31,6 @@
* A copy of the GPL can be found in the file "COPYING" in this distribution.
* A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
*/
-#include "config.h" /* autotools! */
-
#ifndef INCLUDED_ATOMIC_H
#define INCLUDED_ATOMIC_H
@@ -41,17 +39,28 @@
* They simply came in too late. -- rgerhards, 2008-04-02
*/
#ifdef HAVE_ATOMIC_BUILTINS
-# define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1))
+# define ATOMIC_INC(data, phlpmut) ((void) __sync_fetch_and_add(data, 1))
# define ATOMIC_INC_AND_FETCH(data) __sync_fetch_and_add(&(data), 1)
-# define ATOMIC_DEC(data) ((void) __sync_sub_and_fetch(&(data), 1))
-# define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1)
+# define ATOMIC_DEC(data, phlpmut) ((void) __sync_sub_and_fetch(data, 1))
+# define ATOMIC_DEC_AND_FETCH(data, phlpmut) __sync_sub_and_fetch(data, 1)
# define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff))
# define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1)
-# define ATOMIC_STORE_0_TO_INT(data) __sync_fetch_and_and(&(data), 0)
-# define ATOMIC_STORE_1_TO_INT(data) __sync_fetch_and_or(&(data), 1)
+# define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0)
+# define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1)
# define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val))
# define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal));
-# define ATOMIC_CAS_VAL(data, oldVal, newVal) __sync_val_compare_and_swap(&(data), (oldVal), (newVal));
+# define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal));
+
+ /* functions below are not needed if we have atomics */
+# define DEF_ATOMIC_HELPER_MUT(x)
+# define INIT_ATOMIC_HELPER_MUT(x)
+# define DESTROY_ATOMIC_HELPER_MUT(x)
+
+ /* the following operations should preferrably be done atomic, but it is
+ * not fatal if not -- that means we can live with some missed updates. So be
+ * sure to use these macros only if that really does not matter!
+ */
+# define PREFER_ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1))
#else
/* note that we gained parctical proof that theoretical problems DO occur
* if we do not properly address them. See this blog post for details:
@@ -60,12 +69,63 @@
* simply go ahead and do without them - use mutexes or other things. The
* code needs to be checked against all those cases. -- rgerhards, 2009-01-30
*/
+ #include <pthread.h>
+# define ATOMIC_INC(data, phlpmut) { \
+ pthread_mutex_lock(phlpmut); \
+ ++(*(data)); \
+ pthread_mutex_unlock(phlpmut); \
+ }
+
+# define ATOMIC_STORE_0_TO_INT(data, hlpmut) { \
+ pthread_mutex_lock(&hlpmut); \
+ *(data) = 0; \
+ pthread_mutex_unlock(&hlpmut); \
+ }
+
+# define ATOMIC_STORE_1_TO_INT(data, hlpmut) { \
+ pthread_mutex_lock(&hlpmut); \
+ *(data) = 1; \
+ pthread_mutex_unlock(&hlpmut); \
+ }
+
+ static inline int
+ ATOMIC_CAS_VAL(int *data, int oldVal, int newVal, pthread_mutex_t *phlpmut) {
+ int val;
+ pthread_mutex_lock(phlpmut);
+ if(*data == oldVal) {
+ *data = newVal;
+ }
+ val = *data;
+ pthread_mutex_unlock(phlpmut);
+ return(val);
+ }
+
+# define ATOMIC_DEC(data, phlpmut) { \
+ pthread_mutex_lock(phlpmut); \
+ --(*(data)); \
+ pthread_mutex_unlock(phlpmut); \
+ }
+
+ static inline int
+ ATOMIC_DEC_AND_FETCH(int *data, pthread_mutex_t *phlpmut) {
+ int val;
+ pthread_mutex_lock(phlpmut);
+ val = --(*data);
+ pthread_mutex_unlock(phlpmut);
+ return(val);
+ }
+#if 0
# warning "atomic builtins not available, using nul operations - rsyslogd will probably be racy!"
-# define ATOMIC_INC(data) (++(data))
-# define ATOMIC_DEC(data) (--(data))
-# define ATOMIC_DEC_AND_FETCH(data) (--(data))
-# define ATOMIC_FETCH_32BIT(data) (data)
-# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1
+# define ATOMIC_INC_AND_FETCH(data) (++(data))
+# define ATOMIC_FETCH_32BIT(data) (data) // TODO: del
+# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 // TODO: del
+#endif
+# define DEF_ATOMIC_HELPER_MUT(x) pthread_mutex_t x
+# define INIT_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL)
+# define DESTROY_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL)
+
+# define PREFER_ATOMIC_INC(data) ((void) ++data)
+
#endif
#endif /* #ifndef INCLUDED_ATOMIC_H */
diff --git a/runtime/cfsysline.c b/runtime/cfsysline.c
index 184c0d87..037e9f84 100644
--- a/runtime/cfsysline.c
+++ b/runtime/cfsysline.c
@@ -217,9 +217,11 @@ static rsRetVal doGetSize(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *
case 'K': i *= 1000; ++(*pp); break;
case 'M': i *= 1000000; ++(*pp); break;
case 'G': i *= 1000000000; ++(*pp); break;
- case 'T': i *= 1000000000000; ++(*pp); break; /* tera */
- case 'P': i *= 1000000000000000; ++(*pp); break; /* peta */
- case 'E': i *= 1000000000000000000; ++(*pp); break; /* exa */
+ /* we need to use the multiplication below because otherwise
+ * the compiler gets an error during constant parsing */
+ case 'T': i *= (int64) 1000 * 1000000000; ++(*pp); break; /* tera */
+ case 'P': i *= (int64) 1000000 * 1000000000; ++(*pp); break; /* peta */
+ case 'E': i *= (int64) 1000000000 * 1000000000; ++(*pp); break; /* exa */
}
/* done */
@@ -951,8 +953,6 @@ finalize_it:
*/
void dbgPrintCfSysLineHandlers(void)
{
- DEFiRet;
-
cslCmd_t *pCmd;
cslCmdHdlr_t *pCmdHdlr;
linkedListCookie_t llCookieCmd;
@@ -961,11 +961,11 @@ void dbgPrintCfSysLineHandlers(void)
dbgprintf("Sytem Line Configuration Commands:\n");
llCookieCmd = NULL;
- while((iRet = llGetNextElt(&llCmdList, &llCookieCmd, (void*)&pCmd)) == RS_RET_OK) {
+ while(llGetNextElt(&llCmdList, &llCookieCmd, (void*)&pCmd) == RS_RET_OK) {
llGetKey(llCookieCmd, (void*) &pKey); /* TODO: using the cookie is NOT clean! */
dbgprintf("\tCommand '%s':\n", pKey);
llCookieCmdHdlr = NULL;
- while((iRet = llGetNextElt(&pCmd->llCmdHdlrs, &llCookieCmdHdlr, (void*)&pCmdHdlr)) == RS_RET_OK) {
+ while(llGetNextElt(&pCmd->llCmdHdlrs, &llCookieCmdHdlr, (void*)&pCmdHdlr) == RS_RET_OK) {
dbgprintf("\t\ttype : %d\n", pCmdHdlr->eType);
dbgprintf("\t\tpData: 0x%lx\n", (unsigned long) pCmdHdlr->pData);
dbgprintf("\t\tHdlr : 0x%lx\n", (unsigned long) pCmdHdlr->cslCmdHdlr);
@@ -974,7 +974,6 @@ void dbgPrintCfSysLineHandlers(void)
}
}
dbgprintf("\n");
- ENDfunc
}
diff --git a/runtime/conf.c b/runtime/conf.c
index ef795237..bbd2147a 100644
--- a/runtime/conf.c
+++ b/runtime/conf.c
@@ -1015,7 +1015,7 @@ static rsRetVal cflineProcessTagSelector(uchar **pline)
if(**pline != '\0' && **pline == '*' && *(*pline+1) == '\0') {
dbgprintf("resetting programname filter\n");
if(pDfltProgNameCmp != NULL) {
- CHKiRet(rsCStrSetSzStr(pDfltProgNameCmp, NULL));
+ rsCStrDestruct(&pDfltProgNameCmp);
}
} else {
dbgprintf("setting programname filter to '%s'\n", *pline);
@@ -1084,7 +1084,7 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction)
DEFiRet;
modInfo_t *pMod;
omodStringRequest_t *pOMSR;
- action_t *pAction;
+ action_t *pAction = NULL;
void *pModData;
ASSERT(p != NULL);
diff --git a/runtime/ctok.c b/runtime/ctok.c
index 18ddaed2..99b0e095 100644
--- a/runtime/ctok.c
+++ b/runtime/ctok.c
@@ -1,4 +1,4 @@
-/* cfgtok.c - helper class to tokenize an input stream - which surprisingly
+/* ctok.c - helper class to tokenize an input stream - which surprisingly
* currently does not work with streams but with string. But that will
* probably change over time ;) This class was originally written to support
* the expression module but may evolve when (if) the expression module is
@@ -267,7 +267,7 @@ ctokGetVar(ctok_t *pThis, ctok_token_t *pToken)
{
DEFiRet;
uchar c;
- cstr_t *pstrVal;
+ cstr_t *pstrVal = NULL;
ISOBJ_TYPE_assert(pThis, ctok);
ASSERT(pToken != NULL);
diff --git a/runtime/datetime.c b/runtime/datetime.c
index eff72f91..593c3d5c 100644
--- a/runtime/datetime.c
+++ b/runtime/datetime.c
@@ -122,7 +122,7 @@ static void getCurrTime(struct syslogTime *t, time_t *ttSeconds)
else
t->OffsetMode = '+';
t->OffsetHour = lBias / 3600;
- t->OffsetMinute = lBias % 3600;
+ t->OffsetMinute = (lBias % 3600) / 60;
t->timeType = TIME_TYPE_RFC5424; /* we have a high precision timestamp */
}
diff --git a/runtime/debug.c b/runtime/debug.c
index 9b7c2952..81b45d41 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -154,7 +154,9 @@ static pthread_key_t keyCallStack;
*/
static void dbgMutexCancelCleanupHdlr(void *pmut)
{
- pthread_mutex_unlock((pthread_mutex_t*) pmut);
+ int ret;
+ ret = pthread_mutex_unlock((pthread_mutex_t*) pmut);
+ assert(ret == 0);
}
@@ -433,14 +435,13 @@ dbgMutLog_t *dbgMutLogFindHolder(pthread_mutex_t *pmut)
static inline void dbgMutexPreLockLog(pthread_mutex_t *pmut, dbgFuncDB_t *pFuncDB, int ln)
{
dbgMutLog_t *pHolder;
- dbgMutLog_t *pLog;
char pszBuf[128];
char pszHolderThrdName[64];
char *pszHolder;
pthread_mutex_lock(&mutMutLog);
pHolder = dbgMutLogFindHolder(pmut);
- pLog = dbgMutLogAddEntry(pmut, MUTOP_LOCKWAIT, pFuncDB, ln);
+ dbgMutLogAddEntry(pmut, MUTOP_LOCKWAIT, pFuncDB, ln);
if(pHolder == NULL)
pszHolder = "[NONE]";
@@ -481,14 +482,13 @@ static inline void dbgMutexLockLog(pthread_mutex_t *pmut, dbgFuncDB_t *pFuncDB,
static inline void dbgMutexPreTryLockLog(pthread_mutex_t *pmut, dbgFuncDB_t *pFuncDB, int ln)
{
dbgMutLog_t *pHolder;
- dbgMutLog_t *pLog;
char pszBuf[128];
char pszHolderThrdName[64];
char *pszHolder;
pthread_mutex_lock(&mutMutLog);
pHolder = dbgMutLogFindHolder(pmut);
- pLog = dbgMutLogAddEntry(pmut, MUTOP_TRYLOCK, pFuncDB, ln);
+ dbgMutLogAddEntry(pmut, MUTOP_TRYLOCK, pFuncDB, ln);
if(pHolder == NULL)
pszHolder = "[NONE]";
@@ -846,6 +846,7 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
size_t lenWriteBuf;
struct timespec t;
uchar *pszObjName = NULL;
+ int ret;
/* we must get the object name before we lock the mutex, because the object
* potentially calls back into us. If we locked the mutex, we would deadlock
@@ -857,7 +858,8 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
pszObjName = obj.GetName(pObj);
}
- pthread_mutex_lock(&mutdbgprint);
+ ret = pthread_mutex_lock(&mutdbgprint);
+ assert(ret == 0); /* make sure mutex operation does not fail */
pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint);
/* The bWasNL handler does not really work. It works if no thread
@@ -941,6 +943,15 @@ dbgoprint(obj_t *pObj, char *fmt, ...)
va_start(ap, fmt);
lenWriteBuf = vsnprintf(pszWriteBuf, sizeof(pszWriteBuf), fmt, ap);
va_end(ap);
+ if(lenWriteBuf >= sizeof(pszWriteBuf)) {
+ /* prevent buffer overrruns and garbagge display */
+ pszWriteBuf[sizeof(pszWriteBuf) - 5] = '.';
+ pszWriteBuf[sizeof(pszWriteBuf) - 4] = '.';
+ pszWriteBuf[sizeof(pszWriteBuf) - 3] = '.';
+ pszWriteBuf[sizeof(pszWriteBuf) - 2] = '\n';
+ pszWriteBuf[sizeof(pszWriteBuf) - 1] = '\0';
+ lenWriteBuf = sizeof(pszWriteBuf);
+ }
dbgprint(pObj, pszWriteBuf, lenWriteBuf);
}
@@ -952,7 +963,7 @@ void
dbgprintf(char *fmt, ...)
{
va_list ap;
- char pszWriteBuf[20480];
+ char pszWriteBuf[32*1024];
size_t lenWriteBuf;
if(!(Debug && debugging_on))
@@ -1060,7 +1071,7 @@ int dbgEntrFunc(dbgFuncDB_t **ppFuncDB, const char *file, const char *func, int
}
/* when we reach this point, we have a fully-initialized FuncDB! */
- ATOMIC_INC(pFuncDB->nTimesCalled);
+ PREFER_ATOMIC_INC(pFuncDB->nTimesCalled);
if(bLogFuncFlow && dbgPrintNameIsInList((const uchar*)pFuncDB->file, printNameFileRoot))
dbgprintf("%s:%d: %s: enter\n", pFuncDB->file, pFuncDB->line, pFuncDB->func);
if(pThrd->stackPtr >= (int) (sizeof(pThrd->callStack) / sizeof(dbgFuncDB_t*))) {
@@ -1280,11 +1291,11 @@ dbgGetRuntimeOptions(void)
/* this is earlier in the process than the -d option, as such it
* allows us to spit out debug messages from the very beginning.
*/
- Debug = 1;
+ Debug = DEBUG_FULL;
debugging_on = 1;
} else if(!strcasecmp((char*)optname, "debugondemand")) {
/* Enables debugging, but turns off debug output */
- Debug = 1;
+ Debug = DEBUG_ONDEMAND;
debugging_on = 1;
dbgprintf("Note: debug on demand turned on via configuraton file, "
"use USR1 signal to activate.\n");
diff --git a/runtime/debug.h b/runtime/debug.h
index dcbfb930..cfdf819c 100644
--- a/runtime/debug.h
+++ b/runtime/debug.h
@@ -29,6 +29,11 @@
#include <pthread.h>
#include "obj-types.h"
+/* some settings for various debug modes */
+#define DEBUG_OFF 0
+#define DEBUG_ONDEMAND 1
+#define DEBUG_FULL 2
+
/* external static data elements (some time to be replaced) */
extern int Debug; /* debug flag - read-only after startup */
extern int debugging_on; /* read-only, except on sig USR1 */
diff --git a/runtime/glbl.c b/runtime/glbl.c
index 7fa61963..59d1fb0f 100644
--- a/runtime/glbl.c
+++ b/runtime/glbl.c
@@ -64,6 +64,7 @@ static int option_DisallowWarning = 1; /* complain if message from disallowed se
static int bDisableDNS = 0; /* don't look up IP addresses of remote messages */
static prop_t *propLocalHostName = NULL;/* our hostname as FQDN - read-only after startup */
static uchar *LocalHostName = NULL;/* our hostname - read-only after startup */
+static uchar *LocalHostNameOverride = NULL;/* user-overridden hostname - read-only after startup */
static uchar *LocalFQDNName = NULL;/* our hostname as FQDN - read-only after startup */
static uchar *LocalDomain; /* our local domain name - read-only after startup */
static char **StripDomains = NULL;/* these domains may be stripped before writing logs - r/o after s.u., never touched by init */
@@ -72,6 +73,9 @@ static uchar *pszDfltNetstrmDrvr = NULL; /* module name of default netstream dri
static uchar *pszDfltNetstrmDrvrCAF = NULL; /* default CA file for the netstrm driver */
static uchar *pszDfltNetstrmDrvrKeyFile = NULL; /* default key file for the netstrm driver (server) */
static uchar *pszDfltNetstrmDrvrCertFile = NULL; /* default cert file for the netstrm driver (server) */
+#ifdef USE_UNLIMITED_SELECT
+static int iFdSetSize = howmany(FD_SETSIZE, __NFDBITS) * sizeof (fd_mask); /* size of select() bitmask in bytes */
+#endif
/* define a macro for the simple properties' set and get functions
@@ -104,6 +108,9 @@ SIMP_PROP(DisableDNS, bDisableDNS, int)
SIMP_PROP(LocalDomain, LocalDomain, uchar*)
SIMP_PROP(StripDomains, StripDomains, char**)
SIMP_PROP(LocalHosts, LocalHosts, char**)
+#ifdef USE_UNLIMITED_SELECT
+SIMP_PROP(FdSetSize, iFdSetSize, int)
+#endif
SIMP_PROP_SET(LocalFQDNName, LocalFQDNName, uchar*)
SIMP_PROP_SET(LocalHostName, LocalHostName, uchar*)
@@ -150,14 +157,19 @@ GenerateLocalHostNameProperty(void)
prop.Destruct(&propLocalHostName);
CHKiRet(prop.Construct(&propLocalHostName));
- if(LocalHostName == NULL)
- pszName = (uchar*) "[localhost]";
- else {
- if(GetPreserveFQDN() == 1)
- pszName = LocalFQDNName;
- else
- pszName = LocalHostName;
+ if(LocalHostNameOverride == NULL) {
+ if(LocalHostName == NULL)
+ pszName = (uchar*) "[localhost]";
+ else {
+ if(GetPreserveFQDN() == 1)
+ pszName = LocalFQDNName;
+ else
+ pszName = LocalHostName;
+ }
+ } else { /* local hostname is overriden via config */
+ pszName = LocalHostNameOverride;
}
+ DBGPRINTF("GenerateLocalHostName uses '%s'\n", pszName);
CHKiRet(prop.SetString(propLocalHostName, pszName, ustrlen(pszName)));
CHKiRet(prop.ConstructFinalize(propLocalHostName));
@@ -261,6 +273,9 @@ CODESTARTobjQueryInterface(glbl)
SIMP_PROP(DfltNetstrmDrvrCAF)
SIMP_PROP(DfltNetstrmDrvrKeyFile)
SIMP_PROP(DfltNetstrmDrvrCertFile)
+#ifdef USE_UNLIMITED_SELECT
+ SIMP_PROP(FdSetSize)
+#endif
#undef SIMP_PROP
finalize_it:
ENDobjQueryInterface(glbl)
@@ -287,6 +302,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
free(pszDfltNetstrmDrvrCertFile);
pszDfltNetstrmDrvrCertFile = NULL;
}
+ if(LocalHostNameOverride != NULL) {
+ free(LocalHostNameOverride);
+ LocalHostNameOverride = NULL;
+ }
if(pszWorkDir != NULL) {
free(pszWorkDir);
pszWorkDir = NULL;
@@ -295,6 +314,9 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
bOptimizeUniProc = 1;
bHUPisRestart = 0;
bPreserveFQDN = 0;
+#ifdef USE_UNLIMITED_SELECT
+ iFdSetSize = howmany(FD_SETSIZE, __NFDBITS) * sizeof (fd_mask);
+#endif
return RS_RET_OK;
}
@@ -315,6 +337,7 @@ BEGINAbstractObjClassInit(glbl, 1, OBJ_IS_CORE_MODULE) /* class, version */
CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdrivercafile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrCAF, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdriverkeyfile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrKeyFile, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdrivercertfile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrCertFile, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"localhostname", 0, eCmdHdlrGetWord, NULL, &LocalHostNameOverride, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"optimizeforuniprocessor", 0, eCmdHdlrBinary, NULL, &bOptimizeUniProc, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"hupisrestart", 0, eCmdHdlrBinary, NULL, &bHUPisRestart, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"preservefqdn", 0, eCmdHdlrBinary, NULL, &bPreserveFQDN, NULL));
@@ -338,6 +361,7 @@ BEGINObjClassExit(glbl, OBJ_IS_CORE_MODULE) /* class, version */
free(pszWorkDir);
if(LocalHostName != NULL)
free(LocalHostName);
+ free(LocalHostNameOverride);
if(LocalFQDNName != NULL)
free(LocalFQDNName);
objRelease(prop, CORE_COMPONENT);
diff --git a/runtime/glbl.h b/runtime/glbl.h
index dcfb6d5f..6a332576 100644
--- a/runtime/glbl.h
+++ b/runtime/glbl.h
@@ -62,9 +62,18 @@ BEGINinterface(glbl) /* name must also be changed in ENDinterface macro! */
/* added v3, 2009-06-30 */
rsRetVal (*GenerateLocalHostNameProperty)(void);
prop_t* (*GetLocalHostNameProp)(void);
+ /* note: v4, v5 are already used by more recent versions, so we need to skip them! */
+ /* added v6, 2009-11-16 as part of varmojfekoj's "unlimited select()" patch
+ * Note that it must be always present, otherwise the interface would have different
+ * versions depending on compile settings, what is not acceptable.
+ * Use this property with care, it is only truly available if UNLIMITED_SELECT is enabled
+ * (I did not yet further investigate the details, because that code hopefully can be removed
+ * at some later stage).
+ */
+ SIMP_PROP(FdSetSize, int)
#undef SIMP_PROP
ENDinterface(glbl)
-#define glblCURR_IF_VERSION 3 /* increment whenever you change the interface structure! */
+#define glblCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */
/* version 2 had PreserveFQDN added - rgerhards, 2008-12-08 */
/* the remaining prototypes */
diff --git a/runtime/msg.c b/runtime/msg.c
index 2ce7843a..6335f462 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -748,7 +748,7 @@ BEGINobjDestruct(msg) /* be sure to specify the object type also in END and CODE
CODESTARTobjDestruct(msg)
/* DEV Debugging only ! dbgprintf("msgDestruct\t0x%lx, Ref now: %d\n", (unsigned long)pThis, pThis->iRefCount - 1); */
# ifdef HAVE_ATOMIC_BUILTINS
- currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount);
+ currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, NULL);
# else
MsgLock(pThis);
currRefCount = --pThis->iRefCount;
@@ -800,9 +800,18 @@ CODESTARTobjDestruct(msg)
* that we trim too often when the counter wraps.
*/
static unsigned iTrimCtr = 1;
+# ifdef HAVE_ATOMICS
if(ATOMIC_INC_AND_FETCH(iTrimCtr) % 100000 == 0) {
malloc_trim(128*1024);
}
+# else
+static pthread_mutex_t mutTrimCtr = PTHREAD_MUTEX_INITIALIZER;
+ d_pthread_mutex_lock(&mutTrimCtr);
+ if(iTrimCtr++ % 100000 == 0) {
+ malloc_trim(128*1024);
+ }
+ d_pthread_mutex_unlock(&mutTrimCtr);
+# endif
}
# endif
} else {
@@ -888,7 +897,7 @@ msg_t* MsgDup(msg_t* pOld)
*/
if(pOld->iLenTAG > 0) {
if(pOld->iLenTAG < CONF_TAG_BUFSIZE) {
- memcpy(pNew->TAG.szBuf, pOld->TAG.szBuf, pOld->iLenTAG);
+ memcpy(pNew->TAG.szBuf, pOld->TAG.szBuf, pOld->iLenTAG + 1);
} else {
if((pNew->TAG.pszTAG = srUtilStrDup(pOld->TAG.pszTAG, pOld->iLenTAG)) == NULL) {
msgDestruct(&pNew);
@@ -1002,7 +1011,7 @@ msg_t *MsgAddRef(msg_t *pM)
{
assert(pM != NULL);
# ifdef HAVE_ATOMIC_BUILTINS
- ATOMIC_INC(pM->iRefCount);
+ ATOMIC_INC(&pM->iRefCount, NULL);
# else
MsgLock(pM);
pM->iRefCount++;
@@ -1476,7 +1485,7 @@ rsRetVal MsgSetPROCID(msg_t *pMsg, char* pszPROCID)
CHKiRet(cstrConstruct(&pMsg->pCSPROCID));
}
/* if we reach this point, we have the object */
- iRet = rsCStrSetSzStr(pMsg->pCSPROCID, (uchar*) pszPROCID);
+ CHKiRet(rsCStrSetSzStr(pMsg->pCSPROCID, (uchar*) pszPROCID));
CHKiRet(cstrFinalize(pMsg->pCSPROCID));
finalize_it:
@@ -2010,6 +2019,8 @@ finalize_it:
/* set raw message in message object. Size of message is provided.
+ * The function makes sure that the stored rawmsg is properly
+ * terminated by '\0'.
* rgerhards, 2009-06-16
*/
void MsgSetRawMsg(msg_t *pThis, char* pszRawMsg, size_t lenMsg)
@@ -2319,13 +2330,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*pPropLen = sizeof("**INVALID PROPERTY NAME**") - 1;
return UCHAR_CONSTANT("**INVALID PROPERTY NAME**");
}
- /* the following line fixes the symptom, but not the root cause -- at least MSG sometimes
- * returns a size of one too less. To prevent all troubles, we recalculate the sizes based
- * on what we actually got. TODO: remove once root cause is found.
- * rgerhards, 2010-03-23
- */
- bufLen = ustrlen(pRes);
-
/* If we did not receive a template pointer, we are already done... */
if(pTpe == NULL) {
@@ -2489,7 +2493,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
bFound = 1;
} else {
dbgprintf("regex found at offset %d, new offset %d, tries %d\n",
- iOffs, iOffs + pmatch[0].rm_eo, iTry);
+ iOffs, (int) (iOffs + pmatch[0].rm_eo), iTry);
iOffs += pmatch[0].rm_eo;
++iTry;
}
@@ -2823,7 +2827,13 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
}
/* check for "." and ".." (note the parenthesis in the if condition!) */
- if((*pRes == '.') && (*(pRes + 1) == '\0' || (*(pRes + 1) == '.' && *(pRes + 2) == '\0'))) {
+ if(*pRes == '\0') {
+ if(*pbMustBeFreed == 1)
+ free(pRes);
+ pRes = UCHAR_CONSTANT("_");
+ bufLen = 1;
+ *pbMustBeFreed = 0;
+ } else if((*pRes == '.') && (*(pRes + 1) == '\0' || (*(pRes + 1) == '.' && *(pRes + 2) == '\0'))) {
uchar *pTmp = pRes;
if(*(pRes + 1) == '\0')
@@ -2833,12 +2843,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(*pbMustBeFreed == 1)
free(pTmp);
*pbMustBeFreed = 0;
- } else if(*pRes == '\0') {
- if(*pbMustBeFreed == 1)
- free(pRes);
- pRes = UCHAR_CONSTANT("_");
- bufLen = 1;
- *pbMustBeFreed = 0;
}
}
@@ -2877,7 +2881,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(pTpe->data.field.options.bCSV) {
/* we need to obtain a private copy, as we need to at least add the double quotes */
int iBufLen;
- int i;
uchar *pBStart;
uchar *pDst;
uchar *pSrc;
@@ -2892,7 +2895,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
RET_OUT_OF_MEMORY;
}
pSrc = pRes;
- i = 0;
*pDst++ = '"'; /* starting quote */
while(*pSrc) {
if(*pSrc == '"')
@@ -3053,7 +3055,7 @@ static rsRetVal msgConstructFinalizer(msg_t *pThis)
* rgerhards, 2008-01-14
*/
static rsRetVal
-MsgGetSeverity(obj_t *pThis, int *piSeverity)
+MsgGetSeverity(obj_t_ptr pThis, int *piSeverity)
{
ISOBJ_TYPE_assert(pThis, msg);
assert(piSeverity != NULL);
diff --git a/runtime/msg.h b/runtime/msg.h
index 3a02365b..a6923d52 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -32,6 +32,7 @@
#include "obj.h"
#include "syslogd-types.h"
#include "template.h"
+#include "atomic.h"
/* rgerhards 2004-11-08: The following structure represents a
@@ -59,9 +60,9 @@ struct msg {
flowControl_t flowCtlType; /**< type of flow control we can apply, for enqueueing, needs not to be persisted because
once data has entered the queue, this property is no longer needed. */
pthread_mutex_t mut;
+ int iRefCount; /* reference counter (0 = unused) */
bool bDoLock; /* use the mutex? */
bool bParseHOSTNAME; /* should the hostname be parsed from the message? */
- short iRefCount; /* reference counter (0 = unused) */
/* background: the hostname is not present on "regular" messages
* received via UNIX domain sockets from the same machine. However,
* it is available when we have a forwarder (e.g. rfc3195d) using local
@@ -130,6 +131,7 @@ struct msg {
#define MARK 0x008 /* this message is a mark */
#define NEEDS_PARSING 0x010 /* raw message, must be parsed before processing can be done */
#define PARSE_HOSTNAME 0x020 /* parse the hostname during message parsing */
+#define NO_PRI_IN_RAW 0x100 /* rawmsg does not include a PRI (Solaris!), but PRI is already set correctly in the msg object */
/* function prototypes
diff --git a/runtime/net.c b/runtime/net.c
index fe6eef5b..8fd8cc2b 100644
--- a/runtime/net.c
+++ b/runtime/net.c
@@ -502,8 +502,8 @@ static inline void MaskIP4 (struct in_addr *addr, uint8_t bits) {
addr->s_addr &= htonl(0xffffffff << (32 - bits));
}
-#define SIN(sa) ((struct sockaddr_in *)(sa))
-#define SIN6(sa) ((struct sockaddr_in6 *)(sa))
+#define SIN(sa) ((struct sockaddr_in *)(void*)(sa))
+#define SIN6(sa) ((struct sockaddr_in6 *)(void*)(sa))
/* This is a cancel-safe getnameinfo() version, because we learned
@@ -721,7 +721,7 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS
SIN(allowIP.addr.NetAddr)->sin_port = 0;
memcpy(&(SIN(allowIP.addr.NetAddr)->sin_addr.s_addr),
&(SIN6(res->ai_addr)->sin6_addr.s6_addr32[3]),
- sizeof (struct sockaddr_in));
+ sizeof (in_addr_t));
if((iRet = AddAllowedSenderEntry(ppRoot, ppLast, &allowIP,
iSignificantBits))
@@ -1165,12 +1165,12 @@ void debugListenInfo(int fd, char *type)
switch(sa.sa_family) {
case PF_INET:
szFamily = "IPv4";
- ipv4 = (struct sockaddr_in*) &sa;
+ ipv4 = (struct sockaddr_in*)(void*) &sa;
port = ntohs(ipv4->sin_port);
break;
case PF_INET6:
szFamily = "IPv6";
- ipv6 = (struct sockaddr_in6*) &sa;
+ ipv6 = (struct sockaddr_in6*)(void*) &sa;
port = ntohs(ipv6->sin6_port);
break;
default:
diff --git a/runtime/nsd_gtls.c b/runtime/nsd_gtls.c
index fb2e219d..744020e9 100644
--- a/runtime/nsd_gtls.c
+++ b/runtime/nsd_gtls.c
@@ -112,7 +112,7 @@ readFile(uchar *pszFile, gnutls_datum_t *pBuf)
pBuf->data = NULL;
- if((fd = open((char*)pszFile, 0)) == -1) {
+ if((fd = open((char*)pszFile, O_RDONLY)) == -1) {
errmsg.LogError(0, RS_RET_FILE_NOT_FOUND, "can not read file '%s'", pszFile);
ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
@@ -201,10 +201,14 @@ finalize_it:
if(iRet != RS_RET_OK) {
if(data.data != NULL)
free(data.data);
- if(pThis->bOurCertIsInit)
+ if(pThis->bOurCertIsInit) {
gnutls_x509_crt_deinit(pThis->ourCert);
- if(pThis->bOurKeyIsInit)
+ pThis->bOurCertIsInit = 0;
+ }
+ if(pThis->bOurKeyIsInit) {
gnutls_x509_privkey_deinit(pThis->ourKey);
+ pThis->bOurKeyIsInit = 0;
+ }
}
RETiRet;
}
@@ -1118,6 +1122,7 @@ gtlsEndSess(nsd_gtls_t *pThis)
}
}
gnutls_deinit(pThis->sess);
+ pThis->bHaveSess = 0;
}
RETiRet;
}
@@ -1171,6 +1176,8 @@ CODESTARTobjDestruct(nsd_gtls)
gnutls_x509_crt_deinit(pThis->ourCert);
if(pThis->bOurKeyIsInit)
gnutls_x509_privkey_deinit(pThis->ourKey);
+ if(pThis->bHaveSess)
+ gnutls_deinit(pThis->sess);
ENDobjDestruct(nsd_gtls)
diff --git a/runtime/nsdsel_gtls.c b/runtime/nsdsel_gtls.c
index c3a93bee..1a389a00 100644
--- a/runtime/nsdsel_gtls.c
+++ b/runtime/nsdsel_gtls.c
@@ -76,6 +76,9 @@ Add(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp)
if(pNsdGTLS->iMode == 1) {
if(waitOp == NSDSEL_RD && gtlsHasRcvInBuffer(pNsdGTLS)) {
++pThis->iBufferRcvReady;
+ dbgprintf("nsdsel_gtls: data already present in buffer, initiating "
+ "dummy select %p->iBufferRcvReady=%d\n",
+ pThis, pThis->iBufferRcvReady);
FINALIZE;
}
if(pNsdGTLS->rtryCall != gtlsRtry_None) {
@@ -109,6 +112,7 @@ Select(nsdsel_t *pNsdsel, int *piNumReady)
if(pThis->iBufferRcvReady > 0) {
/* we still have data ready! */
*piNumReady = pThis->iBufferRcvReady;
+ dbgprintf("nsdsel_gtls: doing dummy select, data present\n");
} else {
iRet = nsdsel_ptcp.Select(pThis->pTcp, piNumReady);
}
@@ -190,6 +194,9 @@ IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady)
if(pNsdGTLS->iMode == 1) {
if(waitOp == NSDSEL_RD && gtlsHasRcvInBuffer(pNsdGTLS)) {
*pbIsReady = 1;
+ --pThis->iBufferRcvReady; /* one "pseudo-read" less */
+ dbgprintf("nsdl_gtls: dummy read, decermenting %p->iBufRcvReady, now %d\n",
+ pThis, pThis->iBufferRcvReady);
FINALIZE;
}
if(pNsdGTLS->rtryCall != gtlsRtry_None) {
@@ -200,6 +207,16 @@ IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady)
*pbIsReady = 0;
FINALIZE;
}
+ /* now we must ensure that we do not fall back to PTCP if we have
+ * done a "dummy" select. In that case, we know when the predicate
+ * is not matched here, we do not have data available for this
+ * socket. -- rgerhards, 2010-11-20
+ */
+ if(pThis->iBufferRcvReady) {
+ dbgprintf("nsd_gtls: dummy read, buffer not available for this FD\n");
+ *pbIsReady = 0;
+ FINALIZE;
+ }
}
CHKiRet(nsdsel_ptcp.IsReady(pThis->pTcp, pNsdGTLS->pTcp, waitOp, pbIsReady));
diff --git a/runtime/nsdsel_ptcp.c b/runtime/nsdsel_ptcp.c
index 41b85e0c..e2cfca7c 100644
--- a/runtime/nsdsel_ptcp.c
+++ b/runtime/nsdsel_ptcp.c
@@ -36,6 +36,7 @@
#include "errmsg.h"
#include "nsd_ptcp.h"
#include "nsdsel_ptcp.h"
+#include "unlimited_select.h"
/* static data */
DEFobjStaticHelpers
@@ -47,14 +48,23 @@ DEFobjCurrIf(glbl)
*/
BEGINobjConstruct(nsdsel_ptcp) /* be sure to specify the object type also in END macro! */
pThis->maxfds = 0;
+#ifdef USE_UNLIMITED_SELECT
+ pThis->pReadfds = calloc(1, glbl.GetFdSetSize());
+ pThis->pWritefds = calloc(1, glbl.GetFdSetSize());
+#else
FD_ZERO(&pThis->readfds);
FD_ZERO(&pThis->writefds);
+#endif
ENDobjConstruct(nsdsel_ptcp)
/* destructor for the nsdsel_ptcp object */
BEGINobjDestruct(nsdsel_ptcp) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(nsdsel_ptcp)
+#ifdef USE_UNLIMITED_SELECT
+ freeFdSet(pThis->pReadfds);
+ freeFdSet(pThis->pWritefds);
+#endif
ENDobjDestruct(nsdsel_ptcp)
@@ -65,20 +75,27 @@ Add(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp)
DEFiRet;
nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel;
nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd;
+#ifdef USE_UNLIMITED_SELECT
+ fd_set *pReadfds = pThis->pReadfds;
+ fd_set *pWritefds = pThis->pWritefds;
+#else
+ fd_set *pReadfds = &pThis->readfds;
+ fd_set *pWritefds = &pThis->writefds;
+#endif
ISOBJ_TYPE_assert(pSock, nsd_ptcp);
ISOBJ_TYPE_assert(pThis, nsdsel_ptcp);
switch(waitOp) {
case NSDSEL_RD:
- FD_SET(pSock->sock, &pThis->readfds);
+ FD_SET(pSock->sock, pReadfds);
break;
case NSDSEL_WR:
- FD_SET(pSock->sock, &pThis->writefds);
+ FD_SET(pSock->sock, pWritefds);
break;
case NSDSEL_RDWR:
- FD_SET(pSock->sock, &pThis->readfds);
- FD_SET(pSock->sock, &pThis->writefds);
+ FD_SET(pSock->sock, pReadfds);
+ FD_SET(pSock->sock, pWritefds);
break;
}
@@ -98,6 +115,13 @@ Select(nsdsel_t *pNsdsel, int *piNumReady)
DEFiRet;
int i;
nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel;
+#ifdef USE_UNLIMITED_SELECT
+ fd_set *pReadfds = pThis->pReadfds;
+ fd_set *pWritefds = pThis->pWritefds;
+#else
+ fd_set *pReadfds = &pThis->readfds;
+ fd_set *pWritefds = &pThis->writefds;
+#endif
ISOBJ_TYPE_assert(pThis, nsdsel_ptcp);
assert(piNumReady != NULL);
@@ -106,13 +130,13 @@ Select(nsdsel_t *pNsdsel, int *piNumReady)
// TODO: name in dbgprintf!
dbgprintf("--------<NSDSEL_PTCP> calling select, active fds (max %d): ", pThis->maxfds);
for(i = 0; i <= pThis->maxfds; ++i)
- if(FD_ISSET(i, &pThis->readfds) || FD_ISSET(i, &pThis->writefds))
+ if(FD_ISSET(i, pReadfds) || FD_ISSET(i, pWritefds))
dbgprintf("%d ", i);
dbgprintf("\n");
}
/* now do the select */
- *piNumReady = select(pThis->maxfds+1, &pThis->readfds, &pThis->writefds, NULL, NULL);
+ *piNumReady = select(pThis->maxfds+1, pReadfds, pWritefds, NULL, NULL);
RETiRet;
}
@@ -125,6 +149,13 @@ IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady)
DEFiRet;
nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel;
nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd;
+#ifdef USE_UNLIMITED_SELECT
+ fd_set *pReadfds = pThis->pReadfds;
+ fd_set *pWritefds = pThis->pWritefds;
+#else
+ fd_set *pReadfds = &pThis->readfds;
+ fd_set *pWritefds = &pThis->writefds;
+#endif
ISOBJ_TYPE_assert(pThis, nsdsel_ptcp);
ISOBJ_TYPE_assert(pSock, nsd_ptcp);
@@ -132,14 +163,14 @@ IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady)
switch(waitOp) {
case NSDSEL_RD:
- *pbIsReady = FD_ISSET(pSock->sock, &pThis->readfds);
+ *pbIsReady = FD_ISSET(pSock->sock, pReadfds);
break;
case NSDSEL_WR:
- *pbIsReady = FD_ISSET(pSock->sock, &pThis->writefds);
+ *pbIsReady = FD_ISSET(pSock->sock, pWritefds);
break;
case NSDSEL_RDWR:
- *pbIsReady = FD_ISSET(pSock->sock, &pThis->readfds)
- | FD_ISSET(pSock->sock, &pThis->writefds);
+ *pbIsReady = FD_ISSET(pSock->sock, pReadfds)
+ | FD_ISSET(pSock->sock, pWritefds);
break;
}
diff --git a/runtime/nsdsel_ptcp.h b/runtime/nsdsel_ptcp.h
index 6c0c7fa7..f9ec8210 100644
--- a/runtime/nsdsel_ptcp.h
+++ b/runtime/nsdsel_ptcp.h
@@ -31,8 +31,13 @@ typedef nsdsel_if_t nsdsel_ptcp_if_t; /* we just *implement* this interface */
struct nsdsel_ptcp_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
int maxfds;
+#ifdef USE_UNLIMITED_SELECT
+ fd_set *pReadfds;
+ fd_set *pWritefds;
+#else
fd_set readfds;
fd_set writefds;
+#endif
};
/* interface is defined in nsd.h, we just implement it! */
diff --git a/runtime/parser.c b/runtime/parser.c
index 57b7bf8f..8428ea0f 100644
--- a/runtime/parser.c
+++ b/runtime/parser.c
@@ -176,7 +176,10 @@ sanitizeMessage(msg_t *pMsg)
pszMsg = pMsg->pszRawMsg;
lenMsg = pMsg->iLenRawMsg;
- /* remove NUL character at end of message (see comment in function header) */
+ /* remove NUL character at end of message (see comment in function header)
+ * Note that we do not need to add a NUL character in this case, because it
+ * is already present ;)
+ */
if(pszMsg[lenMsg-1] == '\0') {
DBGPRINTF("dropped NUL at very end of message\n");
bUpdatedLen = TRUE;
@@ -190,8 +193,9 @@ sanitizeMessage(msg_t *pMsg)
*/
if(bDropTrailingLF && pszMsg[lenMsg-1] == '\n') {
DBGPRINTF("dropped LF at very end of message (DropTrailingLF is set)\n");
- bUpdatedLen = TRUE;
lenMsg--;
+ pszMsg[lenMsg] = '\0';
+ bUpdatedLen = TRUE;
}
/* it is much quicker to sweep over the message and see if it actually
@@ -249,6 +253,7 @@ sanitizeMessage(msg_t *pMsg)
}
++iSrc;
}
+ pDst[iDst] = '\0';
MsgSetRawMsg(pMsg, (char*)pDst, iDst); /* save sanitized string */
@@ -271,7 +276,6 @@ rsRetVal parseMsg(msg_t *pMsg)
uchar *msg;
int pri;
int lenMsg;
- int iPriText;
if(pMsg->iLenRawMsg == 0)
ABORT_FINALIZE(RS_RET_EMPTY_MSG);
@@ -285,24 +289,28 @@ rsRetVal parseMsg(msg_t *pMsg)
lenMsg = pMsg->iLenRawMsg;
msg = pMsg->pszRawMsg;
pri = DEFUPRI;
- iPriText = 0;
- if(*msg == '<') {
- /* while we process the PRI, we also fill the PRI textual representation
- * inside the msg object. This may not be ideal from an OOP point of view,
- * but it offers us performance...
- */
- pri = 0;
- while(--lenMsg > 0 && isdigit((int) *++msg)) {
- pri = 10 * pri + (*msg - '0');
+ if(pMsg->msgFlags & NO_PRI_IN_RAW) {
+ /* In this case, simply do so as if the pri would be right at top */
+ MsgSetAfterPRIOffs(pMsg, 0);
+ } else {
+ if(*msg == '<') {
+ /* while we process the PRI, we also fill the PRI textual representation
+ * inside the msg object. This may not be ideal from an OOP point of view,
+ * but it offers us performance...
+ */
+ pri = 0;
+ while(--lenMsg > 0 && isdigit((int) *++msg)) {
+ pri = 10 * pri + (*msg - '0');
+ }
+ if(*msg == '>')
+ ++msg;
+ if(pri & ~(LOG_FACMASK|LOG_PRIMASK))
+ pri = DEFUPRI;
}
- if(*msg == '>')
- ++msg;
- if(pri & ~(LOG_FACMASK|LOG_PRIMASK))
- pri = DEFUPRI;
+ pMsg->iFacility = LOG_FAC(pri);
+ pMsg->iSeverity = LOG_PRI(pri);
+ MsgSetAfterPRIOffs(pMsg, msg - pMsg->pszRawMsg);
}
- pMsg->iFacility = LOG_FAC(pri);
- pMsg->iSeverity = LOG_PRI(pri);
- MsgSetAfterPRIOffs(pMsg, msg - pMsg->pszRawMsg);
/* rger 2005-11-24 (happy thanksgiving!): we now need to check if we have
* a traditional syslog message or one formatted according to syslog-protocol.
diff --git a/runtime/prop.c b/runtime/prop.c
index d188b2ed..7f2a56ff 100644
--- a/runtime/prop.c
+++ b/runtime/prop.c
@@ -53,6 +53,7 @@ DEFobjStaticHelpers
*/
BEGINobjConstruct(prop) /* be sure to specify the object type also in END macro! */
pThis->iRefCount = 1;
+ INIT_ATOMIC_HELPER_MUT(pThis->mutRefCount);
ENDobjConstruct(prop)
@@ -60,11 +61,12 @@ ENDobjConstruct(prop)
BEGINobjDestruct(prop) /* be sure to specify the object type also in END and CODESTART macros! */
int currRefCount;
CODESTARTobjDestruct(prop)
- currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount);
+ currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, &pThis->mutRefCount);
if(currRefCount == 0) {
/* (only) in this case we need to actually destruct the object */
if(pThis->len >= CONF_PROP_BUFSIZE)
free(pThis->szVal.psz);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutRefCount);
} else {
pThis = NULL; /* tell framework NOT to destructing the object! */
}
@@ -132,7 +134,7 @@ propConstructFinalize(prop_t __attribute__((unused)) *pThis)
*/
static rsRetVal AddRef(prop_t *pThis)
{
- ATOMIC_INC(pThis->iRefCount);
+ ATOMIC_INC(&pThis->iRefCount, &pThis->mutRefCount);
return RS_RET_OK;
}
diff --git a/runtime/prop.h b/runtime/prop.h
index e3519664..07b2ab7e 100644
--- a/runtime/prop.h
+++ b/runtime/prop.h
@@ -24,6 +24,7 @@
*/
#ifndef INCLUDED_PROP_H
#define INCLUDED_PROP_H
+#include "atomic.h"
/* the prop object */
struct prop_s {
@@ -34,6 +35,7 @@ struct prop_s {
uchar sz[CONF_PROP_BUFSIZE];
} szVal;
int len; /* we use int intentionally, otherwise we may get some troubles... */
+ DEF_ATOMIC_HELPER_MUT(mutRefCount);
};
/* interfaces */
diff --git a/runtime/queue.c b/runtime/queue.c
index 9d7a9058..9c7f96d0 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -110,7 +110,7 @@ static inline void queueDrain(qqueue_t *pThis)
ASSERT(pThis != NULL);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) {
+ while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) {
pThis->qDel(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
@@ -685,7 +685,6 @@ qqueueHaveQIF(qqueue_t *pThis)
{
DEFiRet;
uchar pszQIFNam[MAXFNAME];
- size_t lenQIFNam;
struct stat stat_buf;
ISOBJ_TYPE_assert(pThis, qqueue);
@@ -694,8 +693,8 @@ qqueueHaveQIF(qqueue_t *pThis)
ABORT_FINALIZE(RS_RET_NO_FILEPREFIX);
/* Construct file name */
- lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
- (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
+ snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
+ (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
/* check if the file exists */
if(stat((char*) pszQIFNam, &stat_buf) == -1) {
@@ -1028,7 +1027,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr)
CHKiRet(pThis->qAdd(pThis, pUsr));
if(pThis->qType != QUEUETYPE_DIRECT) {
- ATOMIC_INC(pThis->iQueueSize);
+ ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize);
}
@@ -1057,7 +1056,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
- ATOMIC_DEC(pThis->iQueueSize);
+ ATOMIC_DEC(&pThis->iQueueSize, &pThis->mutQueueSize);
}
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
@@ -1345,6 +1344,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
break;
}
+ INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+
finalize_it:
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
RETiRet;
@@ -2065,6 +2066,8 @@ CODESTARTobjDestruct(qqueue)
pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
@@ -2206,6 +2209,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
+ dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n");
}
/* and finally enqueue the message */
@@ -2292,6 +2296,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
+ dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n");
}
/* and finally enqueue the message */
@@ -2315,6 +2320,7 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
{
int iCancelStateSave;
int i;
+ rsRetVal localRet;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
@@ -2326,8 +2332,9 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
}
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
-dbgprintf("queueMultiEnq: %d\n", i);
- CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]));
+ localRet = doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]);
+ if(localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL)
+ ABORT_FINALIZE(localRet);
}
finalize_it:
diff --git a/runtime/queue.h b/runtime/queue.h
index 1d82d8d9..aafdaa45 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -160,6 +160,7 @@ typedef struct queue_s {
strm_t *pRead; /* current file to be read */
} disk;
} tVars;
+ DEF_ATOMIC_HELPER_MUT(mutQueueSize);
} qqueue_t;
/* some symbolic constants for easier reference */
diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c
index 443d0f41..c209ae30 100644
--- a/runtime/rsyslog.c
+++ b/runtime/rsyslog.c
@@ -80,6 +80,7 @@
#include "prop.h"
#include "rule.h"
#include "ruleset.h"
+#include "atomic.h"
/* forward definitions */
static rsRetVal dfltErrLogger(int, uchar *errMsg);
@@ -215,6 +216,7 @@ rsrtExit(void)
glblClassExit();
rulesetClassExit();
ruleClassExit();
+
objClassExit(); /* *THIS* *MUST/SHOULD?* always be the first class initilizer being called (except debug)! */
}
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 8979893a..03f5120d 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -61,6 +61,7 @@
/* define some base data types */
+
typedef unsigned char uchar;/* get rid of the unhandy "unsigned char" */
typedef struct thrdInfo thrdInfo_t;
typedef struct obj_s obj_t;
@@ -78,8 +79,6 @@ typedef struct nsd_gsspi_s nsd_gsspi_t;
typedef struct nsd_nss_s nsd_nss_t;
typedef struct nsdsel_ptcp_s nsdsel_ptcp_t;
typedef struct nsdsel_gtls_s nsdsel_gtls_t;
-typedef obj_t nsd_t;
-typedef obj_t nsdsel_t;
typedef struct msg msg_t;
typedef struct prop_s prop_t;
typedef struct interface_s interface_t;
@@ -98,6 +97,21 @@ typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript f
typedef struct tcpLstnPortList_s tcpLstnPortList_t; // TODO: rename?
typedef struct strmLstnPortList_s strmLstnPortList_t; // TODO: rename?
+/* under Solaris (actually only SPARC), we need to redefine some types
+ * to be void, so that we get void* pointers. Otherwise, we will see
+ * alignment errors.
+ */
+#ifdef OS_SOLARIS
+ typedef void * obj_t_ptr;
+ typedef void nsd_t;
+ typedef void nsdsel_t;
+#else
+ typedef obj_t *obj_t_ptr;
+ typedef obj_t nsd_t;
+ typedef obj_t nsdsel_t;
+#endif
+
+
/* some universal 64 bit define... */
typedef long long int64;
typedef long long unsigned uint64;
@@ -360,6 +374,15 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_VAR_NOT_FOUND = -2142, /**< variable not found */
RS_RET_EMPTY_MSG = -2143, /**< provided (raw) MSG is empty */
RS_RET_PEER_CLOSED_CONN = -2144, /**< remote peer closed connection (information, no error) */
+ RS_RET_ERR_OPEN_KLOG = -2145, /**< error opening the kernel log socket (primarily solaris) */
+ RS_RET_ERR_AQ_CONLOG = -2146, /**< error aquiring console log (on solaris) */
+ RS_RET_ERR_DOOR = -2147, /**< some problems with handling the Solaris door functionality */
+ RS_RET_NO_SOCK_CONFIGURED = -2166, /**< no socket (name) was configured where one is required */
+ RS_RET_NO_LSTN_DEFINED = -2172, /**< no listener defined (e.g. inside an input module) */
+ RS_RET_EPOLL_CR_FAILED = -2173, /**< epoll_create() failed */
+ RS_RET_EPOLL_CTL_FAILED = -2174, /**< epoll_ctl() failed */
+ RS_RET_INTERNAL_ERROR = -2175, /**< rsyslogd internal error, unexpected code path reached */
+ RS_RET_OUTDATED_STMT = -2184, /**< some outdated statement/functionality is being used in conf file */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index d98b4217..af61f24f 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -348,6 +348,7 @@ destructAllActions(void)
CHKiRet(llDestroy(&llRulesets));
CHKiRet(llInit(&llRulesets, rulesetDestructForLinkedList, keyDestruct, strcasecmp));
+ pDfltRuleset = NULL;
finalize_it:
RETiRet;
diff --git a/runtime/stream.c b/runtime/stream.c
index e8805a40..44b24ee2 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -60,7 +60,14 @@
# include <sys/prctl.h>
#endif
-#define inline
+/* some platforms do not have large file support :( */
+#ifndef O_LARGEFILE
+# define O_LARGEFILE 0
+#endif
+#ifndef HAVE_LSEEK64
+ typedef off_t off64_t;
+# define lseek64(fd, offset, whence) lseek(fd, offset, whence)
+#endif
/* static data */
DEFobjStaticHelpers
@@ -214,8 +221,9 @@ doPhysOpen(strm_t *pThis)
iFlags |= O_NONBLOCK;
}
- pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode);
- DBGPRINTF("file '%s' opened as #%d with mode %d\n", pThis->pszCurrFName, pThis->fd, pThis->tOpenMode);
+ pThis->fd = open((char*)pThis->pszCurrFName, iFlags | O_LARGEFILE, pThis->tOpenMode);
+ DBGPRINTF("file '%s' opened as #%d with mode %d\n", pThis->pszCurrFName,
+ pThis->fd, (int) pThis->tOpenMode);
if(pThis->fd == -1) {
char errStr[1024];
int err = errno;
@@ -251,6 +259,7 @@ static rsRetVal strmOpenFile(strm_t *pThis)
if(pThis->fd != -1)
ABORT_FINALIZE(RS_RET_OK);
+ pThis->pszCurrFName = NULL; /* used to prevent mem leak in case of error */
if(pThis->pszFName == NULL)
ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
@@ -282,6 +291,16 @@ static rsRetVal strmOpenFile(strm_t *pThis)
(pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", pThis->fd);
finalize_it:
+ if(iRet != RS_RET_OK) {
+ if(pThis->pszCurrFName != NULL) {
+ free(pThis->pszCurrFName);
+ pThis->pszCurrFName = NULL; /* just to prevent mis-adressing down the road... */
+ }
+ if(pThis->fd != -1) {
+ close(pThis->fd);
+ pThis->fd = -1;
+ }
+ }
RETiRet;
}
@@ -393,6 +412,12 @@ finalize_it:
* If we are monitoring a file, someone may have rotated it. In this case, we
* also need to close it and reopen it under the same name.
* rgerhards, 2008-02-13
+ * The previous code also did a check for file truncation, in which case the
+ * file was considered rewritten. However, this potential border case turned
+ * out to be a big trouble spot on busy systems. It caused massive message
+ * duplication (I guess stat() can return a too-low number under some
+ * circumstances). So starting as of now, we only check the inode number and
+ * a file change is detected only if the inode changes. -- rgerhards, 2011-01-10
*/
static rsRetVal
strmHandleEOFMonitor(strm_t *pThis)
@@ -402,23 +427,18 @@ strmHandleEOFMonitor(strm_t *pThis)
struct stat statName;
ISOBJ_TYPE_assert(pThis, strm);
- /* find inodes of both current descriptor as well as file now in file
- * system. If they are different, the file has been rotated (or
- * otherwise rewritten). We also check the size, because the inode
- * does not change if the file is truncated (this, BTW, is also a case
- * where we actually loose log lines, because we can not do anything
- * against truncation...). We do NOT rely on the time of last
- * modificaton because that may not be available under all
- * circumstances. -- rgerhards, 2008-02-13
- */
if(fstat(pThis->fd, &statOpen) == -1)
ABORT_FINALIZE(RS_RET_IO_ERROR);
if(stat((char*) pThis->pszCurrFName, &statName) == -1)
ABORT_FINALIZE(RS_RET_IO_ERROR);
- if(statOpen.st_ino == statName.st_ino && pThis->iCurrOffs == statName.st_size) {
+ DBGPRINTF("stream checking for file change on '%s', inode %u/%u",
+ pThis->pszCurrFName, (unsigned) statOpen.st_ino,
+ (unsigned) statName.st_ino);
+ if(statOpen.st_ino == statName.st_ino) {
ABORT_FINALIZE(RS_RET_EOF);
} else {
/* we had a file change! */
+ DBGPRINTF("we had a file change on '%s'\n", pThis->pszCurrFName);
CHKiRet(strmCloseFile(pThis));
CHKiRet(strmOpenFile(pThis));
}
@@ -1188,7 +1208,7 @@ finalize_it:
* is invalidated.
* rgerhards, 2008-01-12
*/
-static rsRetVal strmSeek(strm_t *pThis, off_t offs)
+static rsRetVal strmSeek(strm_t *pThis, off64_t offs)
{
DEFiRet;
@@ -1198,9 +1218,9 @@ static rsRetVal strmSeek(strm_t *pThis, off_t offs)
strmOpenFile(pThis);
else
strmFlushInternal(pThis);
- int i;
- DBGOPRINT((obj_t*) pThis, "file %d seek, pos %ld\n", pThis->fd, (long) offs);
- i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error!
+ long long i;
+ DBGOPRINT((obj_t*) pThis, "file %d seek, pos %llu\n", pThis->fd, (long long unsigned) offs);
+ i = lseek64(pThis->fd, offs, SEEK_SET); // TODO: check error!
pThis->iCurrOffs = offs; /* we are now at *this* offset */
pThis->iBufPtr = 0; /* buffer invalidated */
@@ -1477,7 +1497,7 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
{
DEFiRet;
int i;
- long l;
+ int64 l;
ISOBJ_TYPE_assert(pThis, strm);
ISOBJ_TYPE_assert(pStrm, strm);
@@ -1499,8 +1519,8 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
i = pThis->tOpenMode;
objSerializeSCALAR_VAR(pStrm, tOpenMode, INT, i);
- l = (long) pThis->iCurrOffs;
- objSerializeSCALAR_VAR(pStrm, iCurrOffs, LONG, l);
+ l = pThis->iCurrOffs;
+ objSerializeSCALAR_VAR(pStrm, iCurrOffs, INT64, l);
CHKiRet(obj.EndSerialize(pStrm));
diff --git a/runtime/stringbuf.c b/runtime/stringbuf.c
index 93995b38..8b2fe455 100644
--- a/runtime/stringbuf.c
+++ b/runtime/stringbuf.c
@@ -156,7 +156,7 @@ rsRetVal
rsCStrExtendBuf(cstr_t *pThis, size_t iMinNeeded)
{
uchar *pNewBuf;
- unsigned short iNewSize;
+ size_t iNewSize;
DEFiRet;
/* first compute the new size needed */
diff --git a/runtime/strmsrv.c b/runtime/strmsrv.c
index 3dc53a97..8cebf810 100644
--- a/runtime/strmsrv.c
+++ b/runtime/strmsrv.c
@@ -520,6 +520,7 @@ Run(strmsrv_t *pThis)
strms_sess_t *pNewSess;
nssel_t *pSel;
ssize_t iRcvd;
+ rsRetVal localRet;
ISOBJ_TYPE_assert(pThis, strmsrv);
@@ -579,11 +580,12 @@ Run(strmsrv_t *pThis)
break;
case RS_RET_OK:
/* valid data received, process it! */
- if(strms_sess.DataRcvd(pThis->pSessions[iSTRMSess], buf, iRcvd) != RS_RET_OK) {
+ localRet = strms_sess.DataRcvd(pThis->pSessions[iSTRMSess], buf, iRcvd);
+ if(localRet != RS_RET_OK) {
/* in this case, something went awfully wrong.
* We are instructed to terminate the session.
*/
- errmsg.LogError(0, NO_ERRCODE, "Tearing down STRM Session %d - see "
+ errmsg.LogError(0, localRet, "Tearing down STRM Session %d - see "
"previous messages for reason(s)\n", iSTRMSess);
pThis->pOnErrClose(pThis->pSessions[iSTRMSess]);
strms_sess.Destruct(&pThis->pSessions[iSTRMSess]);
diff --git a/runtime/unlimited_select.h b/runtime/unlimited_select.h
new file mode 100644
index 00000000..32dadc03
--- /dev/null
+++ b/runtime/unlimited_select.h
@@ -0,0 +1,45 @@
+/* unlimited_select.h
+ * Tweak the macros for accessing fd_set so that the select() syscall
+ * won't be limited to a particular number of file descriptors.
+ *
+ * Copyright 2009 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+
+#ifndef UNLIMITED_SELECT_H_INCLUDED
+
+#include <string.h>
+#include <stdlib.h>
+#include <sys/select.h>
+#include "glbl.h"
+
+#ifdef USE_UNLIMITED_SELECT
+# undef FD_ZERO
+# define FD_ZERO(set) memset((set), 0, glbl.GetFdSetSize());
+#endif
+
+#ifdef USE_UNLIMITED_SELECT
+void freeFdSet(fd_set *p) {
+ free(p);
+}
+#else
+# define freeFdSet(x)
+#endif
+
+#endif /* #ifndef UNLIMITED_SELECT_H_INCLUDED */
diff --git a/runtime/vm.c b/runtime/vm.c
index aaf3c879..0ed174d1 100644
--- a/runtime/vm.c
+++ b/runtime/vm.c
@@ -34,6 +34,7 @@
#include "vm.h"
#include "sysvar.h"
#include "stringbuf.h"
+#include "unicode-helper.h"
/* static data */
DEFobjStaticHelpers
@@ -41,6 +42,8 @@ DEFobjCurrIf(vmstk)
DEFobjCurrIf(var)
DEFobjCurrIf(sysvar)
+static pthread_mutex_t mutGetenv; /* we need to make this global because otherwise we can not guarantee proper init! */
+
/* ------------------------------ function registry code and structures ------------------------------ */
/* we maintain a registry of known functions */
@@ -542,6 +545,42 @@ finalize_it:
}
+/* The getenv function. Note that we guard the OS call by a mutex, as that
+ * function is not guaranteed to be thread-safe. This implementation here is far from
+ * being optimal, at least we should cache the result. This is left TODO for
+ * a later revision.
+ * rgerhards, 2009-11-03
+ */
+static rsRetVal
+rsf_getenv(vmstk_t *pStk, int numOperands)
+{
+ DEFiRet;
+ var_t *operand1;
+ char *envResult;
+ cstr_t *pCstr;
+
+ if(numOperands != 1)
+ ABORT_FINALIZE(RS_RET_INVLD_NBR_ARGUMENTS);
+
+ /* pop args and do operaton (trivial case here...) */
+ vmstk.PopString(pStk, &operand1);
+ d_pthread_mutex_lock(&mutGetenv);
+ envResult = getenv((char*) rsCStrGetSzStr(operand1->val.pStr));
+ DBGPRINTF("rsf_getenv(): envvar '%s', return '%s'\n", rsCStrGetSzStr(operand1->val.pStr),
+ envResult == NULL ? "(NULL)" : envResult);
+ iRet = rsCStrConstructFromszStr(&pCstr, (envResult == NULL) ? UCHAR_CONSTANT("") : (uchar*)envResult);
+ d_pthread_mutex_unlock(&mutGetenv);
+ if(iRet != RS_RET_OK)
+ FINALIZE; /* need to do this after mutex is unlocked! */
+
+ /* Store result and cleanup */
+ var.SetString(operand1, pCstr);
+ vmstk.Push(pStk, operand1);
+finalize_it:
+ RETiRet;
+}
+
+
/* The "tolower" function, which converts its sole argument to lower case.
* Quite honestly, currently this is primarily a test driver for me...
* rgerhards, 2009-04-06
@@ -759,6 +798,8 @@ BEGINObjClassExit(vm, OBJ_IS_CORE_MODULE) /* class, version */
objRelease(sysvar, CORE_COMPONENT);
objRelease(var, CORE_COMPONENT);
objRelease(vmstk, CORE_COMPONENT);
+
+ pthread_mutex_destroy(&mutGetenv);
ENDObjClassExit(vm)
@@ -779,6 +820,9 @@ BEGINObjClassInit(vm, 1, OBJ_IS_CORE_MODULE) /* class, version */
/* register built-in functions // TODO: move to its own module */
CHKiRet(rsfrAddFunction((uchar*)"strlen", rsf_strlen));
CHKiRet(rsfrAddFunction((uchar*)"tolower", rsf_tolower));
+ CHKiRet(rsfrAddFunction((uchar*)"getenv", rsf_getenv));
+
+ pthread_mutex_init(&mutGetenv, NULL);
ENDObjClassInit(vm)
diff --git a/runtime/wti.c b/runtime/wti.c
index abdf4add..90bb14ed 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -146,7 +146,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
break;
}
/* apply the new state */
- unsigned val = ATOMIC_CAS_VAL(pThis->tCurrCmd, tCurrCmd, tCmd);
+ unsigned val = ATOMIC_CAS_VAL((int*)&pThis->tCurrCmd, tCurrCmd, tCmd, &pThis->mutCurrCmd);
if(val != tCurrCmd) {
DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd);
}
@@ -178,7 +178,7 @@ wtiCancelThrd(wti_t *pThis)
dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd);
pthread_cancel(pThis->thrdID);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- ATOMIC_STORE_1_TO_INT(pThis->pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
+ wtpSetThrdStateChanged(pThis->pWtp, 1); /* indicate change, so harverster will be called */
}
d_pthread_mutex_unlock(&pThis->mut);
@@ -209,6 +209,7 @@ CODESTARTobjDestruct(wti)
/* actual destruction */
pthread_cond_destroy(&pThis->condExitDone);
pthread_mutex_destroy(&pThis->mut);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurrCmd);
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -219,6 +220,7 @@ ENDobjDestruct(wti)
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
pthread_cond_init(&pThis->condExitDone, NULL);
pthread_mutex_init(&pThis->mut, NULL);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutCurrCmd);
ENDobjConstruct(wti)
@@ -326,8 +328,7 @@ wtiWorkerCancelCleanup(void *arg)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pWtp->mut);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
- ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
+ wtpSetThrdStateChanged(pWtp, 1); /* indicate change, so harverster will be called */
d_pthread_mutex_unlock(&pWtp->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -414,7 +415,7 @@ wtiWorker(wti_t *pThis)
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
+ wtpSetThrdStateChanged(pWtp, 1); /* indicate change, so harverster will be called */
d_pthread_mutex_unlock(&pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
diff --git a/runtime/wti.h b/runtime/wti.h
index 72653b15..d81672f3 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -39,6 +39,7 @@ typedef struct wti_s {
pthread_mutex_t mut;
bool bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */
uchar *pszDbgHdr; /* header string for debug messages */
+ DEF_ATOMIC_HELPER_MUT(mutCurrCmd);
} wti_t;
/* some symbolic constants for easier reference */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 0c66dd11..b4fd2e04 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -97,6 +97,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pThis->pfOnWorkerCancel = NotImplementedDummy;
pThis->pfOnWorkerStartup = NotImplementedDummy;
pThis->pfOnWorkerShutdown = NotImplementedDummy;
+ INIT_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged);
ENDobjConstruct(wtp)
@@ -153,6 +154,7 @@ CODESTARTobjDestruct(wtp)
pthread_cond_destroy(&pThis->condThrdTrm);
pthread_mutex_destroy(&pThis->mut);
pthread_mutex_destroy(&pThis->mutThrdShutdwn);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged);
free(pThis->pszDbgHdr);
ENDobjDestruct(wtp)
@@ -186,6 +188,20 @@ wtpWakeupAllWrkr(wtp_t *pThis)
}
+/* set the bThrdStateChanged in an atomic way. Note that
+ * val may only be 0 or 1.
+ */
+void
+wtpSetThrdStateChanged(wtp_t *pThis, int val)
+{
+ if(val == 0) {
+ ATOMIC_STORE_0_TO_INT(&pThis->bThrdStateChanged, pThis->mutThrdStateChanged);
+ } else {
+ ATOMIC_STORE_1_TO_INT(&pThis->bThrdStateChanged, pThis->mutThrdStateChanged);
+ }
+}
+
+
/* check if we had any worker thread changes and, if so, act
* on them. At a minimum, terminated threads are harvested (joined).
* This function MUST NEVER block on the queue mutex!
@@ -216,7 +232,7 @@ wtpProcessThrdChanges(wtp_t *pThis)
*/
do {
/* reset the change marker */
- ATOMIC_STORE_0_TO_INT(pThis->bThrdStateChanged);
+ wtpSetThrdStateChanged(pThis, 0);
/* go through all threads */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX);
@@ -421,13 +437,15 @@ wtpWrkrExecCancelCleanup(void *arg)
static void *
wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */
{
- uchar *pszDbgHdr;
- uchar thrdName[32] = "rs:";
DEFiRet;
DEFVARS_mutexProtection;
wti_t *pWti = (wti_t*) arg;
wtp_t *pThis;
sigset_t sigSet;
+# if HAVE_PRCTL && defined PR_SET_NAME
+ uchar *pszDbgHdr;
+ uchar thrdName[32] = "rs:";
+# endif
ISOBJ_TYPE_assert(pWti, wti);
pThis = pWti->pWtp;
@@ -460,7 +478,7 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
do {
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
- iRet = wtiWorker(pWti); /* just to make sure: this is NOT protected by the mutex! */
+ wtiWorker(pWti); /* just to make sure: this is NOT protected by the mutex! */
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
} while(pThis->iCurNumWrkThrd == 1 && pThis->bInactivityGuard == 1);
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 1ce171cc..640c3320 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -26,6 +26,7 @@
#include <pthread.h>
#include "obj.h"
+#include "atomic.h"
/* commands and states for worker threads. */
typedef enum {
@@ -79,6 +80,7 @@ typedef struct wtp_s {
rsRetVal (*pfOnWorkerShutdown)(void *pUsr);
/* end user objects */
uchar *pszDbgHdr; /* header string for debug messages */
+ DEF_ATOMIC_HELPER_MUT(mutThrdStateChanged);
} wtp_t;
/* some symbolic constants for easier reference */
@@ -100,6 +102,7 @@ rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg);
rsRetVal wtpSignalWrkrTermination(wtp_t *pWtp);
rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout);
int wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex);
+void wtpSetThrdStateChanged(wtp_t *pThis, int val);
PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));