diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/Makefile.am | 1 | ||||
-rw-r--r-- | runtime/atomic.h | 86 | ||||
-rw-r--r-- | runtime/cfsysline.c | 15 | ||||
-rw-r--r-- | runtime/conf.c | 4 | ||||
-rw-r--r-- | runtime/ctok.c | 4 | ||||
-rw-r--r-- | runtime/datetime.c | 2 | ||||
-rw-r--r-- | runtime/debug.c | 31 | ||||
-rw-r--r-- | runtime/debug.h | 5 | ||||
-rw-r--r-- | runtime/glbl.c | 38 | ||||
-rw-r--r-- | runtime/glbl.h | 11 | ||||
-rw-r--r-- | runtime/msg.c | 46 | ||||
-rw-r--r-- | runtime/msg.h | 4 | ||||
-rw-r--r-- | runtime/net.c | 10 | ||||
-rw-r--r-- | runtime/nsd_gtls.c | 13 | ||||
-rw-r--r-- | runtime/nsdsel_gtls.c | 17 | ||||
-rw-r--r-- | runtime/nsdsel_ptcp.c | 51 | ||||
-rw-r--r-- | runtime/nsdsel_ptcp.h | 5 | ||||
-rw-r--r-- | runtime/parser.c | 46 | ||||
-rw-r--r-- | runtime/prop.c | 6 | ||||
-rw-r--r-- | runtime/prop.h | 2 | ||||
-rw-r--r-- | runtime/queue.c | 23 | ||||
-rw-r--r-- | runtime/queue.h | 1 | ||||
-rw-r--r-- | runtime/rsyslog.c | 2 | ||||
-rw-r--r-- | runtime/rsyslog.h | 27 | ||||
-rw-r--r-- | runtime/ruleset.c | 1 | ||||
-rw-r--r-- | runtime/stream.c | 60 | ||||
-rw-r--r-- | runtime/stringbuf.c | 2 | ||||
-rw-r--r-- | runtime/strmsrv.c | 6 | ||||
-rw-r--r-- | runtime/unlimited_select.h | 45 | ||||
-rw-r--r-- | runtime/vm.c | 44 | ||||
-rw-r--r-- | runtime/wti.c | 11 | ||||
-rw-r--r-- | runtime/wti.h | 1 | ||||
-rw-r--r-- | runtime/wtp.c | 26 | ||||
-rw-r--r-- | runtime/wtp.h | 3 |
34 files changed, 501 insertions, 148 deletions
diff --git a/runtime/Makefile.am b/runtime/Makefile.am index 14abe722..c1a15198 100644 --- a/runtime/Makefile.am +++ b/runtime/Makefile.am @@ -15,6 +15,7 @@ librsyslog_la_SOURCES = \ nsd.h \ glbl.h \ glbl.c \ + unlimited_select.h \ conf.c \ conf.h \ parser.h \ diff --git a/runtime/atomic.h b/runtime/atomic.h index d5aaf56b..fc3e0b2d 100644 --- a/runtime/atomic.h +++ b/runtime/atomic.h @@ -31,8 +31,6 @@ * A copy of the GPL can be found in the file "COPYING" in this distribution. * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. */ -#include "config.h" /* autotools! */ - #ifndef INCLUDED_ATOMIC_H #define INCLUDED_ATOMIC_H @@ -41,17 +39,28 @@ * They simply came in too late. -- rgerhards, 2008-04-02 */ #ifdef HAVE_ATOMIC_BUILTINS -# define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1)) +# define ATOMIC_INC(data, phlpmut) ((void) __sync_fetch_and_add(data, 1)) # define ATOMIC_INC_AND_FETCH(data) __sync_fetch_and_add(&(data), 1) -# define ATOMIC_DEC(data) ((void) __sync_sub_and_fetch(&(data), 1)) -# define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1) +# define ATOMIC_DEC(data, phlpmut) ((void) __sync_sub_and_fetch(data, 1)) +# define ATOMIC_DEC_AND_FETCH(data, phlpmut) __sync_sub_and_fetch(data, 1) # define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff)) # define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1) -# define ATOMIC_STORE_0_TO_INT(data) __sync_fetch_and_and(&(data), 0) -# define ATOMIC_STORE_1_TO_INT(data) __sync_fetch_and_or(&(data), 1) +# define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0) +# define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1) # define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val)) # define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal)); -# define ATOMIC_CAS_VAL(data, oldVal, newVal) __sync_val_compare_and_swap(&(data), (oldVal), (newVal)); +# define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal)); + + /* functions below are not needed if we have atomics */ +# define DEF_ATOMIC_HELPER_MUT(x) +# define INIT_ATOMIC_HELPER_MUT(x) +# define DESTROY_ATOMIC_HELPER_MUT(x) + + /* the following operations should preferrably be done atomic, but it is + * not fatal if not -- that means we can live with some missed updates. So be + * sure to use these macros only if that really does not matter! + */ +# define PREFER_ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1)) #else /* note that we gained parctical proof that theoretical problems DO occur * if we do not properly address them. See this blog post for details: @@ -60,12 +69,63 @@ * simply go ahead and do without them - use mutexes or other things. The * code needs to be checked against all those cases. -- rgerhards, 2009-01-30 */ + #include <pthread.h> +# define ATOMIC_INC(data, phlpmut) { \ + pthread_mutex_lock(phlpmut); \ + ++(*(data)); \ + pthread_mutex_unlock(phlpmut); \ + } + +# define ATOMIC_STORE_0_TO_INT(data, hlpmut) { \ + pthread_mutex_lock(&hlpmut); \ + *(data) = 0; \ + pthread_mutex_unlock(&hlpmut); \ + } + +# define ATOMIC_STORE_1_TO_INT(data, hlpmut) { \ + pthread_mutex_lock(&hlpmut); \ + *(data) = 1; \ + pthread_mutex_unlock(&hlpmut); \ + } + + static inline int + ATOMIC_CAS_VAL(int *data, int oldVal, int newVal, pthread_mutex_t *phlpmut) { + int val; + pthread_mutex_lock(phlpmut); + if(*data == oldVal) { + *data = newVal; + } + val = *data; + pthread_mutex_unlock(phlpmut); + return(val); + } + +# define ATOMIC_DEC(data, phlpmut) { \ + pthread_mutex_lock(phlpmut); \ + --(*(data)); \ + pthread_mutex_unlock(phlpmut); \ + } + + static inline int + ATOMIC_DEC_AND_FETCH(int *data, pthread_mutex_t *phlpmut) { + int val; + pthread_mutex_lock(phlpmut); + val = --(*data); + pthread_mutex_unlock(phlpmut); + return(val); + } +#if 0 # warning "atomic builtins not available, using nul operations - rsyslogd will probably be racy!" -# define ATOMIC_INC(data) (++(data)) -# define ATOMIC_DEC(data) (--(data)) -# define ATOMIC_DEC_AND_FETCH(data) (--(data)) -# define ATOMIC_FETCH_32BIT(data) (data) -# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 +# define ATOMIC_INC_AND_FETCH(data) (++(data)) +# define ATOMIC_FETCH_32BIT(data) (data) // TODO: del +# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 // TODO: del +#endif +# define DEF_ATOMIC_HELPER_MUT(x) pthread_mutex_t x +# define INIT_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL) +# define DESTROY_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL) + +# define PREFER_ATOMIC_INC(data) ((void) ++data) + #endif #endif /* #ifndef INCLUDED_ATOMIC_H */ diff --git a/runtime/cfsysline.c b/runtime/cfsysline.c index 184c0d87..037e9f84 100644 --- a/runtime/cfsysline.c +++ b/runtime/cfsysline.c @@ -217,9 +217,11 @@ static rsRetVal doGetSize(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void * case 'K': i *= 1000; ++(*pp); break; case 'M': i *= 1000000; ++(*pp); break; case 'G': i *= 1000000000; ++(*pp); break; - case 'T': i *= 1000000000000; ++(*pp); break; /* tera */ - case 'P': i *= 1000000000000000; ++(*pp); break; /* peta */ - case 'E': i *= 1000000000000000000; ++(*pp); break; /* exa */ + /* we need to use the multiplication below because otherwise + * the compiler gets an error during constant parsing */ + case 'T': i *= (int64) 1000 * 1000000000; ++(*pp); break; /* tera */ + case 'P': i *= (int64) 1000000 * 1000000000; ++(*pp); break; /* peta */ + case 'E': i *= (int64) 1000000000 * 1000000000; ++(*pp); break; /* exa */ } /* done */ @@ -951,8 +953,6 @@ finalize_it: */ void dbgPrintCfSysLineHandlers(void) { - DEFiRet; - cslCmd_t *pCmd; cslCmdHdlr_t *pCmdHdlr; linkedListCookie_t llCookieCmd; @@ -961,11 +961,11 @@ void dbgPrintCfSysLineHandlers(void) dbgprintf("Sytem Line Configuration Commands:\n"); llCookieCmd = NULL; - while((iRet = llGetNextElt(&llCmdList, &llCookieCmd, (void*)&pCmd)) == RS_RET_OK) { + while(llGetNextElt(&llCmdList, &llCookieCmd, (void*)&pCmd) == RS_RET_OK) { llGetKey(llCookieCmd, (void*) &pKey); /* TODO: using the cookie is NOT clean! */ dbgprintf("\tCommand '%s':\n", pKey); llCookieCmdHdlr = NULL; - while((iRet = llGetNextElt(&pCmd->llCmdHdlrs, &llCookieCmdHdlr, (void*)&pCmdHdlr)) == RS_RET_OK) { + while(llGetNextElt(&pCmd->llCmdHdlrs, &llCookieCmdHdlr, (void*)&pCmdHdlr) == RS_RET_OK) { dbgprintf("\t\ttype : %d\n", pCmdHdlr->eType); dbgprintf("\t\tpData: 0x%lx\n", (unsigned long) pCmdHdlr->pData); dbgprintf("\t\tHdlr : 0x%lx\n", (unsigned long) pCmdHdlr->cslCmdHdlr); @@ -974,7 +974,6 @@ void dbgPrintCfSysLineHandlers(void) } } dbgprintf("\n"); - ENDfunc } diff --git a/runtime/conf.c b/runtime/conf.c index ef795237..bbd2147a 100644 --- a/runtime/conf.c +++ b/runtime/conf.c @@ -1015,7 +1015,7 @@ static rsRetVal cflineProcessTagSelector(uchar **pline) if(**pline != '\0' && **pline == '*' && *(*pline+1) == '\0') { dbgprintf("resetting programname filter\n"); if(pDfltProgNameCmp != NULL) { - CHKiRet(rsCStrSetSzStr(pDfltProgNameCmp, NULL)); + rsCStrDestruct(&pDfltProgNameCmp); } } else { dbgprintf("setting programname filter to '%s'\n", *pline); @@ -1084,7 +1084,7 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction) DEFiRet; modInfo_t *pMod; omodStringRequest_t *pOMSR; - action_t *pAction; + action_t *pAction = NULL; void *pModData; ASSERT(p != NULL); diff --git a/runtime/ctok.c b/runtime/ctok.c index 18ddaed2..99b0e095 100644 --- a/runtime/ctok.c +++ b/runtime/ctok.c @@ -1,4 +1,4 @@ -/* cfgtok.c - helper class to tokenize an input stream - which surprisingly +/* ctok.c - helper class to tokenize an input stream - which surprisingly * currently does not work with streams but with string. But that will * probably change over time ;) This class was originally written to support * the expression module but may evolve when (if) the expression module is @@ -267,7 +267,7 @@ ctokGetVar(ctok_t *pThis, ctok_token_t *pToken) { DEFiRet; uchar c; - cstr_t *pstrVal; + cstr_t *pstrVal = NULL; ISOBJ_TYPE_assert(pThis, ctok); ASSERT(pToken != NULL); diff --git a/runtime/datetime.c b/runtime/datetime.c index eff72f91..593c3d5c 100644 --- a/runtime/datetime.c +++ b/runtime/datetime.c @@ -122,7 +122,7 @@ static void getCurrTime(struct syslogTime *t, time_t *ttSeconds) else t->OffsetMode = '+'; t->OffsetHour = lBias / 3600; - t->OffsetMinute = lBias % 3600; + t->OffsetMinute = (lBias % 3600) / 60; t->timeType = TIME_TYPE_RFC5424; /* we have a high precision timestamp */ } diff --git a/runtime/debug.c b/runtime/debug.c index 9b7c2952..81b45d41 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -154,7 +154,9 @@ static pthread_key_t keyCallStack; */ static void dbgMutexCancelCleanupHdlr(void *pmut) { - pthread_mutex_unlock((pthread_mutex_t*) pmut); + int ret; + ret = pthread_mutex_unlock((pthread_mutex_t*) pmut); + assert(ret == 0); } @@ -433,14 +435,13 @@ dbgMutLog_t *dbgMutLogFindHolder(pthread_mutex_t *pmut) static inline void dbgMutexPreLockLog(pthread_mutex_t *pmut, dbgFuncDB_t *pFuncDB, int ln) { dbgMutLog_t *pHolder; - dbgMutLog_t *pLog; char pszBuf[128]; char pszHolderThrdName[64]; char *pszHolder; pthread_mutex_lock(&mutMutLog); pHolder = dbgMutLogFindHolder(pmut); - pLog = dbgMutLogAddEntry(pmut, MUTOP_LOCKWAIT, pFuncDB, ln); + dbgMutLogAddEntry(pmut, MUTOP_LOCKWAIT, pFuncDB, ln); if(pHolder == NULL) pszHolder = "[NONE]"; @@ -481,14 +482,13 @@ static inline void dbgMutexLockLog(pthread_mutex_t *pmut, dbgFuncDB_t *pFuncDB, static inline void dbgMutexPreTryLockLog(pthread_mutex_t *pmut, dbgFuncDB_t *pFuncDB, int ln) { dbgMutLog_t *pHolder; - dbgMutLog_t *pLog; char pszBuf[128]; char pszHolderThrdName[64]; char *pszHolder; pthread_mutex_lock(&mutMutLog); pHolder = dbgMutLogFindHolder(pmut); - pLog = dbgMutLogAddEntry(pmut, MUTOP_TRYLOCK, pFuncDB, ln); + dbgMutLogAddEntry(pmut, MUTOP_TRYLOCK, pFuncDB, ln); if(pHolder == NULL) pszHolder = "[NONE]"; @@ -846,6 +846,7 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg) size_t lenWriteBuf; struct timespec t; uchar *pszObjName = NULL; + int ret; /* we must get the object name before we lock the mutex, because the object * potentially calls back into us. If we locked the mutex, we would deadlock @@ -857,7 +858,8 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg) pszObjName = obj.GetName(pObj); } - pthread_mutex_lock(&mutdbgprint); + ret = pthread_mutex_lock(&mutdbgprint); + assert(ret == 0); /* make sure mutex operation does not fail */ pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint); /* The bWasNL handler does not really work. It works if no thread @@ -941,6 +943,15 @@ dbgoprint(obj_t *pObj, char *fmt, ...) va_start(ap, fmt); lenWriteBuf = vsnprintf(pszWriteBuf, sizeof(pszWriteBuf), fmt, ap); va_end(ap); + if(lenWriteBuf >= sizeof(pszWriteBuf)) { + /* prevent buffer overrruns and garbagge display */ + pszWriteBuf[sizeof(pszWriteBuf) - 5] = '.'; + pszWriteBuf[sizeof(pszWriteBuf) - 4] = '.'; + pszWriteBuf[sizeof(pszWriteBuf) - 3] = '.'; + pszWriteBuf[sizeof(pszWriteBuf) - 2] = '\n'; + pszWriteBuf[sizeof(pszWriteBuf) - 1] = '\0'; + lenWriteBuf = sizeof(pszWriteBuf); + } dbgprint(pObj, pszWriteBuf, lenWriteBuf); } @@ -952,7 +963,7 @@ void dbgprintf(char *fmt, ...) { va_list ap; - char pszWriteBuf[20480]; + char pszWriteBuf[32*1024]; size_t lenWriteBuf; if(!(Debug && debugging_on)) @@ -1060,7 +1071,7 @@ int dbgEntrFunc(dbgFuncDB_t **ppFuncDB, const char *file, const char *func, int } /* when we reach this point, we have a fully-initialized FuncDB! */ - ATOMIC_INC(pFuncDB->nTimesCalled); + PREFER_ATOMIC_INC(pFuncDB->nTimesCalled); if(bLogFuncFlow && dbgPrintNameIsInList((const uchar*)pFuncDB->file, printNameFileRoot)) dbgprintf("%s:%d: %s: enter\n", pFuncDB->file, pFuncDB->line, pFuncDB->func); if(pThrd->stackPtr >= (int) (sizeof(pThrd->callStack) / sizeof(dbgFuncDB_t*))) { @@ -1280,11 +1291,11 @@ dbgGetRuntimeOptions(void) /* this is earlier in the process than the -d option, as such it * allows us to spit out debug messages from the very beginning. */ - Debug = 1; + Debug = DEBUG_FULL; debugging_on = 1; } else if(!strcasecmp((char*)optname, "debugondemand")) { /* Enables debugging, but turns off debug output */ - Debug = 1; + Debug = DEBUG_ONDEMAND; debugging_on = 1; dbgprintf("Note: debug on demand turned on via configuraton file, " "use USR1 signal to activate.\n"); diff --git a/runtime/debug.h b/runtime/debug.h index dcbfb930..cfdf819c 100644 --- a/runtime/debug.h +++ b/runtime/debug.h @@ -29,6 +29,11 @@ #include <pthread.h> #include "obj-types.h" +/* some settings for various debug modes */ +#define DEBUG_OFF 0 +#define DEBUG_ONDEMAND 1 +#define DEBUG_FULL 2 + /* external static data elements (some time to be replaced) */ extern int Debug; /* debug flag - read-only after startup */ extern int debugging_on; /* read-only, except on sig USR1 */ diff --git a/runtime/glbl.c b/runtime/glbl.c index 7fa61963..59d1fb0f 100644 --- a/runtime/glbl.c +++ b/runtime/glbl.c @@ -64,6 +64,7 @@ static int option_DisallowWarning = 1; /* complain if message from disallowed se static int bDisableDNS = 0; /* don't look up IP addresses of remote messages */ static prop_t *propLocalHostName = NULL;/* our hostname as FQDN - read-only after startup */ static uchar *LocalHostName = NULL;/* our hostname - read-only after startup */ +static uchar *LocalHostNameOverride = NULL;/* user-overridden hostname - read-only after startup */ static uchar *LocalFQDNName = NULL;/* our hostname as FQDN - read-only after startup */ static uchar *LocalDomain; /* our local domain name - read-only after startup */ static char **StripDomains = NULL;/* these domains may be stripped before writing logs - r/o after s.u., never touched by init */ @@ -72,6 +73,9 @@ static uchar *pszDfltNetstrmDrvr = NULL; /* module name of default netstream dri static uchar *pszDfltNetstrmDrvrCAF = NULL; /* default CA file for the netstrm driver */ static uchar *pszDfltNetstrmDrvrKeyFile = NULL; /* default key file for the netstrm driver (server) */ static uchar *pszDfltNetstrmDrvrCertFile = NULL; /* default cert file for the netstrm driver (server) */ +#ifdef USE_UNLIMITED_SELECT +static int iFdSetSize = howmany(FD_SETSIZE, __NFDBITS) * sizeof (fd_mask); /* size of select() bitmask in bytes */ +#endif /* define a macro for the simple properties' set and get functions @@ -104,6 +108,9 @@ SIMP_PROP(DisableDNS, bDisableDNS, int) SIMP_PROP(LocalDomain, LocalDomain, uchar*) SIMP_PROP(StripDomains, StripDomains, char**) SIMP_PROP(LocalHosts, LocalHosts, char**) +#ifdef USE_UNLIMITED_SELECT +SIMP_PROP(FdSetSize, iFdSetSize, int) +#endif SIMP_PROP_SET(LocalFQDNName, LocalFQDNName, uchar*) SIMP_PROP_SET(LocalHostName, LocalHostName, uchar*) @@ -150,14 +157,19 @@ GenerateLocalHostNameProperty(void) prop.Destruct(&propLocalHostName); CHKiRet(prop.Construct(&propLocalHostName)); - if(LocalHostName == NULL) - pszName = (uchar*) "[localhost]"; - else { - if(GetPreserveFQDN() == 1) - pszName = LocalFQDNName; - else - pszName = LocalHostName; + if(LocalHostNameOverride == NULL) { + if(LocalHostName == NULL) + pszName = (uchar*) "[localhost]"; + else { + if(GetPreserveFQDN() == 1) + pszName = LocalFQDNName; + else + pszName = LocalHostName; + } + } else { /* local hostname is overriden via config */ + pszName = LocalHostNameOverride; } + DBGPRINTF("GenerateLocalHostName uses '%s'\n", pszName); CHKiRet(prop.SetString(propLocalHostName, pszName, ustrlen(pszName))); CHKiRet(prop.ConstructFinalize(propLocalHostName)); @@ -261,6 +273,9 @@ CODESTARTobjQueryInterface(glbl) SIMP_PROP(DfltNetstrmDrvrCAF) SIMP_PROP(DfltNetstrmDrvrKeyFile) SIMP_PROP(DfltNetstrmDrvrCertFile) +#ifdef USE_UNLIMITED_SELECT + SIMP_PROP(FdSetSize) +#endif #undef SIMP_PROP finalize_it: ENDobjQueryInterface(glbl) @@ -287,6 +302,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a free(pszDfltNetstrmDrvrCertFile); pszDfltNetstrmDrvrCertFile = NULL; } + if(LocalHostNameOverride != NULL) { + free(LocalHostNameOverride); + LocalHostNameOverride = NULL; + } if(pszWorkDir != NULL) { free(pszWorkDir); pszWorkDir = NULL; @@ -295,6 +314,9 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a bOptimizeUniProc = 1; bHUPisRestart = 0; bPreserveFQDN = 0; +#ifdef USE_UNLIMITED_SELECT + iFdSetSize = howmany(FD_SETSIZE, __NFDBITS) * sizeof (fd_mask); +#endif return RS_RET_OK; } @@ -315,6 +337,7 @@ BEGINAbstractObjClassInit(glbl, 1, OBJ_IS_CORE_MODULE) /* class, version */ CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdrivercafile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrCAF, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdriverkeyfile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrKeyFile, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdrivercertfile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrCertFile, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"localhostname", 0, eCmdHdlrGetWord, NULL, &LocalHostNameOverride, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"optimizeforuniprocessor", 0, eCmdHdlrBinary, NULL, &bOptimizeUniProc, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"hupisrestart", 0, eCmdHdlrBinary, NULL, &bHUPisRestart, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"preservefqdn", 0, eCmdHdlrBinary, NULL, &bPreserveFQDN, NULL)); @@ -338,6 +361,7 @@ BEGINObjClassExit(glbl, OBJ_IS_CORE_MODULE) /* class, version */ free(pszWorkDir); if(LocalHostName != NULL) free(LocalHostName); + free(LocalHostNameOverride); if(LocalFQDNName != NULL) free(LocalFQDNName); objRelease(prop, CORE_COMPONENT); diff --git a/runtime/glbl.h b/runtime/glbl.h index dcfb6d5f..6a332576 100644 --- a/runtime/glbl.h +++ b/runtime/glbl.h @@ -62,9 +62,18 @@ BEGINinterface(glbl) /* name must also be changed in ENDinterface macro! */ /* added v3, 2009-06-30 */ rsRetVal (*GenerateLocalHostNameProperty)(void); prop_t* (*GetLocalHostNameProp)(void); + /* note: v4, v5 are already used by more recent versions, so we need to skip them! */ + /* added v6, 2009-11-16 as part of varmojfekoj's "unlimited select()" patch + * Note that it must be always present, otherwise the interface would have different + * versions depending on compile settings, what is not acceptable. + * Use this property with care, it is only truly available if UNLIMITED_SELECT is enabled + * (I did not yet further investigate the details, because that code hopefully can be removed + * at some later stage). + */ + SIMP_PROP(FdSetSize, int) #undef SIMP_PROP ENDinterface(glbl) -#define glblCURR_IF_VERSION 3 /* increment whenever you change the interface structure! */ +#define glblCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */ /* version 2 had PreserveFQDN added - rgerhards, 2008-12-08 */ /* the remaining prototypes */ diff --git a/runtime/msg.c b/runtime/msg.c index 2ce7843a..6335f462 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -748,7 +748,7 @@ BEGINobjDestruct(msg) /* be sure to specify the object type also in END and CODE CODESTARTobjDestruct(msg) /* DEV Debugging only ! dbgprintf("msgDestruct\t0x%lx, Ref now: %d\n", (unsigned long)pThis, pThis->iRefCount - 1); */ # ifdef HAVE_ATOMIC_BUILTINS - currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount); + currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, NULL); # else MsgLock(pThis); currRefCount = --pThis->iRefCount; @@ -800,9 +800,18 @@ CODESTARTobjDestruct(msg) * that we trim too often when the counter wraps. */ static unsigned iTrimCtr = 1; +# ifdef HAVE_ATOMICS if(ATOMIC_INC_AND_FETCH(iTrimCtr) % 100000 == 0) { malloc_trim(128*1024); } +# else +static pthread_mutex_t mutTrimCtr = PTHREAD_MUTEX_INITIALIZER; + d_pthread_mutex_lock(&mutTrimCtr); + if(iTrimCtr++ % 100000 == 0) { + malloc_trim(128*1024); + } + d_pthread_mutex_unlock(&mutTrimCtr); +# endif } # endif } else { @@ -888,7 +897,7 @@ msg_t* MsgDup(msg_t* pOld) */ if(pOld->iLenTAG > 0) { if(pOld->iLenTAG < CONF_TAG_BUFSIZE) { - memcpy(pNew->TAG.szBuf, pOld->TAG.szBuf, pOld->iLenTAG); + memcpy(pNew->TAG.szBuf, pOld->TAG.szBuf, pOld->iLenTAG + 1); } else { if((pNew->TAG.pszTAG = srUtilStrDup(pOld->TAG.pszTAG, pOld->iLenTAG)) == NULL) { msgDestruct(&pNew); @@ -1002,7 +1011,7 @@ msg_t *MsgAddRef(msg_t *pM) { assert(pM != NULL); # ifdef HAVE_ATOMIC_BUILTINS - ATOMIC_INC(pM->iRefCount); + ATOMIC_INC(&pM->iRefCount, NULL); # else MsgLock(pM); pM->iRefCount++; @@ -1476,7 +1485,7 @@ rsRetVal MsgSetPROCID(msg_t *pMsg, char* pszPROCID) CHKiRet(cstrConstruct(&pMsg->pCSPROCID)); } /* if we reach this point, we have the object */ - iRet = rsCStrSetSzStr(pMsg->pCSPROCID, (uchar*) pszPROCID); + CHKiRet(rsCStrSetSzStr(pMsg->pCSPROCID, (uchar*) pszPROCID)); CHKiRet(cstrFinalize(pMsg->pCSPROCID)); finalize_it: @@ -2010,6 +2019,8 @@ finalize_it: /* set raw message in message object. Size of message is provided. + * The function makes sure that the stored rawmsg is properly + * terminated by '\0'. * rgerhards, 2009-06-16 */ void MsgSetRawMsg(msg_t *pThis, char* pszRawMsg, size_t lenMsg) @@ -2319,13 +2330,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, *pPropLen = sizeof("**INVALID PROPERTY NAME**") - 1; return UCHAR_CONSTANT("**INVALID PROPERTY NAME**"); } - /* the following line fixes the symptom, but not the root cause -- at least MSG sometimes - * returns a size of one too less. To prevent all troubles, we recalculate the sizes based - * on what we actually got. TODO: remove once root cause is found. - * rgerhards, 2010-03-23 - */ - bufLen = ustrlen(pRes); - /* If we did not receive a template pointer, we are already done... */ if(pTpe == NULL) { @@ -2489,7 +2493,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, bFound = 1; } else { dbgprintf("regex found at offset %d, new offset %d, tries %d\n", - iOffs, iOffs + pmatch[0].rm_eo, iTry); + iOffs, (int) (iOffs + pmatch[0].rm_eo), iTry); iOffs += pmatch[0].rm_eo; ++iTry; } @@ -2823,7 +2827,13 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, } /* check for "." and ".." (note the parenthesis in the if condition!) */ - if((*pRes == '.') && (*(pRes + 1) == '\0' || (*(pRes + 1) == '.' && *(pRes + 2) == '\0'))) { + if(*pRes == '\0') { + if(*pbMustBeFreed == 1) + free(pRes); + pRes = UCHAR_CONSTANT("_"); + bufLen = 1; + *pbMustBeFreed = 0; + } else if((*pRes == '.') && (*(pRes + 1) == '\0' || (*(pRes + 1) == '.' && *(pRes + 2) == '\0'))) { uchar *pTmp = pRes; if(*(pRes + 1) == '\0') @@ -2833,12 +2843,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(*pbMustBeFreed == 1) free(pTmp); *pbMustBeFreed = 0; - } else if(*pRes == '\0') { - if(*pbMustBeFreed == 1) - free(pRes); - pRes = UCHAR_CONSTANT("_"); - bufLen = 1; - *pbMustBeFreed = 0; } } @@ -2877,7 +2881,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, if(pTpe->data.field.options.bCSV) { /* we need to obtain a private copy, as we need to at least add the double quotes */ int iBufLen; - int i; uchar *pBStart; uchar *pDst; uchar *pSrc; @@ -2892,7 +2895,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, RET_OUT_OF_MEMORY; } pSrc = pRes; - i = 0; *pDst++ = '"'; /* starting quote */ while(*pSrc) { if(*pSrc == '"') @@ -3053,7 +3055,7 @@ static rsRetVal msgConstructFinalizer(msg_t *pThis) * rgerhards, 2008-01-14 */ static rsRetVal -MsgGetSeverity(obj_t *pThis, int *piSeverity) +MsgGetSeverity(obj_t_ptr pThis, int *piSeverity) { ISOBJ_TYPE_assert(pThis, msg); assert(piSeverity != NULL); diff --git a/runtime/msg.h b/runtime/msg.h index 3a02365b..a6923d52 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -32,6 +32,7 @@ #include "obj.h" #include "syslogd-types.h" #include "template.h" +#include "atomic.h" /* rgerhards 2004-11-08: The following structure represents a @@ -59,9 +60,9 @@ struct msg { flowControl_t flowCtlType; /**< type of flow control we can apply, for enqueueing, needs not to be persisted because once data has entered the queue, this property is no longer needed. */ pthread_mutex_t mut; + int iRefCount; /* reference counter (0 = unused) */ bool bDoLock; /* use the mutex? */ bool bParseHOSTNAME; /* should the hostname be parsed from the message? */ - short iRefCount; /* reference counter (0 = unused) */ /* background: the hostname is not present on "regular" messages * received via UNIX domain sockets from the same machine. However, * it is available when we have a forwarder (e.g. rfc3195d) using local @@ -130,6 +131,7 @@ struct msg { #define MARK 0x008 /* this message is a mark */ #define NEEDS_PARSING 0x010 /* raw message, must be parsed before processing can be done */ #define PARSE_HOSTNAME 0x020 /* parse the hostname during message parsing */ +#define NO_PRI_IN_RAW 0x100 /* rawmsg does not include a PRI (Solaris!), but PRI is already set correctly in the msg object */ /* function prototypes diff --git a/runtime/net.c b/runtime/net.c index fe6eef5b..8fd8cc2b 100644 --- a/runtime/net.c +++ b/runtime/net.c @@ -502,8 +502,8 @@ static inline void MaskIP4 (struct in_addr *addr, uint8_t bits) { addr->s_addr &= htonl(0xffffffff << (32 - bits)); } -#define SIN(sa) ((struct sockaddr_in *)(sa)) -#define SIN6(sa) ((struct sockaddr_in6 *)(sa)) +#define SIN(sa) ((struct sockaddr_in *)(void*)(sa)) +#define SIN6(sa) ((struct sockaddr_in6 *)(void*)(sa)) /* This is a cancel-safe getnameinfo() version, because we learned @@ -721,7 +721,7 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS SIN(allowIP.addr.NetAddr)->sin_port = 0; memcpy(&(SIN(allowIP.addr.NetAddr)->sin_addr.s_addr), &(SIN6(res->ai_addr)->sin6_addr.s6_addr32[3]), - sizeof (struct sockaddr_in)); + sizeof (in_addr_t)); if((iRet = AddAllowedSenderEntry(ppRoot, ppLast, &allowIP, iSignificantBits)) @@ -1165,12 +1165,12 @@ void debugListenInfo(int fd, char *type) switch(sa.sa_family) { case PF_INET: szFamily = "IPv4"; - ipv4 = (struct sockaddr_in*) &sa; + ipv4 = (struct sockaddr_in*)(void*) &sa; port = ntohs(ipv4->sin_port); break; case PF_INET6: szFamily = "IPv6"; - ipv6 = (struct sockaddr_in6*) &sa; + ipv6 = (struct sockaddr_in6*)(void*) &sa; port = ntohs(ipv6->sin6_port); break; default: diff --git a/runtime/nsd_gtls.c b/runtime/nsd_gtls.c index fb2e219d..744020e9 100644 --- a/runtime/nsd_gtls.c +++ b/runtime/nsd_gtls.c @@ -112,7 +112,7 @@ readFile(uchar *pszFile, gnutls_datum_t *pBuf) pBuf->data = NULL; - if((fd = open((char*)pszFile, 0)) == -1) { + if((fd = open((char*)pszFile, O_RDONLY)) == -1) { errmsg.LogError(0, RS_RET_FILE_NOT_FOUND, "can not read file '%s'", pszFile); ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); @@ -201,10 +201,14 @@ finalize_it: if(iRet != RS_RET_OK) { if(data.data != NULL) free(data.data); - if(pThis->bOurCertIsInit) + if(pThis->bOurCertIsInit) { gnutls_x509_crt_deinit(pThis->ourCert); - if(pThis->bOurKeyIsInit) + pThis->bOurCertIsInit = 0; + } + if(pThis->bOurKeyIsInit) { gnutls_x509_privkey_deinit(pThis->ourKey); + pThis->bOurKeyIsInit = 0; + } } RETiRet; } @@ -1118,6 +1122,7 @@ gtlsEndSess(nsd_gtls_t *pThis) } } gnutls_deinit(pThis->sess); + pThis->bHaveSess = 0; } RETiRet; } @@ -1171,6 +1176,8 @@ CODESTARTobjDestruct(nsd_gtls) gnutls_x509_crt_deinit(pThis->ourCert); if(pThis->bOurKeyIsInit) gnutls_x509_privkey_deinit(pThis->ourKey); + if(pThis->bHaveSess) + gnutls_deinit(pThis->sess); ENDobjDestruct(nsd_gtls) diff --git a/runtime/nsdsel_gtls.c b/runtime/nsdsel_gtls.c index c3a93bee..1a389a00 100644 --- a/runtime/nsdsel_gtls.c +++ b/runtime/nsdsel_gtls.c @@ -76,6 +76,9 @@ Add(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp) if(pNsdGTLS->iMode == 1) { if(waitOp == NSDSEL_RD && gtlsHasRcvInBuffer(pNsdGTLS)) { ++pThis->iBufferRcvReady; + dbgprintf("nsdsel_gtls: data already present in buffer, initiating " + "dummy select %p->iBufferRcvReady=%d\n", + pThis, pThis->iBufferRcvReady); FINALIZE; } if(pNsdGTLS->rtryCall != gtlsRtry_None) { @@ -109,6 +112,7 @@ Select(nsdsel_t *pNsdsel, int *piNumReady) if(pThis->iBufferRcvReady > 0) { /* we still have data ready! */ *piNumReady = pThis->iBufferRcvReady; + dbgprintf("nsdsel_gtls: doing dummy select, data present\n"); } else { iRet = nsdsel_ptcp.Select(pThis->pTcp, piNumReady); } @@ -190,6 +194,9 @@ IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady) if(pNsdGTLS->iMode == 1) { if(waitOp == NSDSEL_RD && gtlsHasRcvInBuffer(pNsdGTLS)) { *pbIsReady = 1; + --pThis->iBufferRcvReady; /* one "pseudo-read" less */ + dbgprintf("nsdl_gtls: dummy read, decermenting %p->iBufRcvReady, now %d\n", + pThis, pThis->iBufferRcvReady); FINALIZE; } if(pNsdGTLS->rtryCall != gtlsRtry_None) { @@ -200,6 +207,16 @@ IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady) *pbIsReady = 0; FINALIZE; } + /* now we must ensure that we do not fall back to PTCP if we have + * done a "dummy" select. In that case, we know when the predicate + * is not matched here, we do not have data available for this + * socket. -- rgerhards, 2010-11-20 + */ + if(pThis->iBufferRcvReady) { + dbgprintf("nsd_gtls: dummy read, buffer not available for this FD\n"); + *pbIsReady = 0; + FINALIZE; + } } CHKiRet(nsdsel_ptcp.IsReady(pThis->pTcp, pNsdGTLS->pTcp, waitOp, pbIsReady)); diff --git a/runtime/nsdsel_ptcp.c b/runtime/nsdsel_ptcp.c index 41b85e0c..e2cfca7c 100644 --- a/runtime/nsdsel_ptcp.c +++ b/runtime/nsdsel_ptcp.c @@ -36,6 +36,7 @@ #include "errmsg.h" #include "nsd_ptcp.h" #include "nsdsel_ptcp.h" +#include "unlimited_select.h" /* static data */ DEFobjStaticHelpers @@ -47,14 +48,23 @@ DEFobjCurrIf(glbl) */ BEGINobjConstruct(nsdsel_ptcp) /* be sure to specify the object type also in END macro! */ pThis->maxfds = 0; +#ifdef USE_UNLIMITED_SELECT + pThis->pReadfds = calloc(1, glbl.GetFdSetSize()); + pThis->pWritefds = calloc(1, glbl.GetFdSetSize()); +#else FD_ZERO(&pThis->readfds); FD_ZERO(&pThis->writefds); +#endif ENDobjConstruct(nsdsel_ptcp) /* destructor for the nsdsel_ptcp object */ BEGINobjDestruct(nsdsel_ptcp) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(nsdsel_ptcp) +#ifdef USE_UNLIMITED_SELECT + freeFdSet(pThis->pReadfds); + freeFdSet(pThis->pWritefds); +#endif ENDobjDestruct(nsdsel_ptcp) @@ -65,20 +75,27 @@ Add(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp) DEFiRet; nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel; nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd; +#ifdef USE_UNLIMITED_SELECT + fd_set *pReadfds = pThis->pReadfds; + fd_set *pWritefds = pThis->pWritefds; +#else + fd_set *pReadfds = &pThis->readfds; + fd_set *pWritefds = &pThis->writefds; +#endif ISOBJ_TYPE_assert(pSock, nsd_ptcp); ISOBJ_TYPE_assert(pThis, nsdsel_ptcp); switch(waitOp) { case NSDSEL_RD: - FD_SET(pSock->sock, &pThis->readfds); + FD_SET(pSock->sock, pReadfds); break; case NSDSEL_WR: - FD_SET(pSock->sock, &pThis->writefds); + FD_SET(pSock->sock, pWritefds); break; case NSDSEL_RDWR: - FD_SET(pSock->sock, &pThis->readfds); - FD_SET(pSock->sock, &pThis->writefds); + FD_SET(pSock->sock, pReadfds); + FD_SET(pSock->sock, pWritefds); break; } @@ -98,6 +115,13 @@ Select(nsdsel_t *pNsdsel, int *piNumReady) DEFiRet; int i; nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel; +#ifdef USE_UNLIMITED_SELECT + fd_set *pReadfds = pThis->pReadfds; + fd_set *pWritefds = pThis->pWritefds; +#else + fd_set *pReadfds = &pThis->readfds; + fd_set *pWritefds = &pThis->writefds; +#endif ISOBJ_TYPE_assert(pThis, nsdsel_ptcp); assert(piNumReady != NULL); @@ -106,13 +130,13 @@ Select(nsdsel_t *pNsdsel, int *piNumReady) // TODO: name in dbgprintf! dbgprintf("--------<NSDSEL_PTCP> calling select, active fds (max %d): ", pThis->maxfds); for(i = 0; i <= pThis->maxfds; ++i) - if(FD_ISSET(i, &pThis->readfds) || FD_ISSET(i, &pThis->writefds)) + if(FD_ISSET(i, pReadfds) || FD_ISSET(i, pWritefds)) dbgprintf("%d ", i); dbgprintf("\n"); } /* now do the select */ - *piNumReady = select(pThis->maxfds+1, &pThis->readfds, &pThis->writefds, NULL, NULL); + *piNumReady = select(pThis->maxfds+1, pReadfds, pWritefds, NULL, NULL); RETiRet; } @@ -125,6 +149,13 @@ IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady) DEFiRet; nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel; nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd; +#ifdef USE_UNLIMITED_SELECT + fd_set *pReadfds = pThis->pReadfds; + fd_set *pWritefds = pThis->pWritefds; +#else + fd_set *pReadfds = &pThis->readfds; + fd_set *pWritefds = &pThis->writefds; +#endif ISOBJ_TYPE_assert(pThis, nsdsel_ptcp); ISOBJ_TYPE_assert(pSock, nsd_ptcp); @@ -132,14 +163,14 @@ IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady) switch(waitOp) { case NSDSEL_RD: - *pbIsReady = FD_ISSET(pSock->sock, &pThis->readfds); + *pbIsReady = FD_ISSET(pSock->sock, pReadfds); break; case NSDSEL_WR: - *pbIsReady = FD_ISSET(pSock->sock, &pThis->writefds); + *pbIsReady = FD_ISSET(pSock->sock, pWritefds); break; case NSDSEL_RDWR: - *pbIsReady = FD_ISSET(pSock->sock, &pThis->readfds) - | FD_ISSET(pSock->sock, &pThis->writefds); + *pbIsReady = FD_ISSET(pSock->sock, pReadfds) + | FD_ISSET(pSock->sock, pWritefds); break; } diff --git a/runtime/nsdsel_ptcp.h b/runtime/nsdsel_ptcp.h index 6c0c7fa7..f9ec8210 100644 --- a/runtime/nsdsel_ptcp.h +++ b/runtime/nsdsel_ptcp.h @@ -31,8 +31,13 @@ typedef nsdsel_if_t nsdsel_ptcp_if_t; /* we just *implement* this interface */ struct nsdsel_ptcp_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ int maxfds; +#ifdef USE_UNLIMITED_SELECT + fd_set *pReadfds; + fd_set *pWritefds; +#else fd_set readfds; fd_set writefds; +#endif }; /* interface is defined in nsd.h, we just implement it! */ diff --git a/runtime/parser.c b/runtime/parser.c index 466066e7..be9304d7 100644 --- a/runtime/parser.c +++ b/runtime/parser.c @@ -176,7 +176,10 @@ sanitizeMessage(msg_t *pMsg) pszMsg = pMsg->pszRawMsg; lenMsg = pMsg->iLenRawMsg; - /* remove NUL character at end of message (see comment in function header) */ + /* remove NUL character at end of message (see comment in function header) + * Note that we do not need to add a NUL character in this case, because it + * is already present ;) + */ if(pszMsg[lenMsg-1] == '\0') { DBGPRINTF("dropped NUL at very end of message\n"); bUpdatedLen = TRUE; @@ -190,8 +193,9 @@ sanitizeMessage(msg_t *pMsg) */ if(bDropTrailingLF && pszMsg[lenMsg-1] == '\n') { DBGPRINTF("dropped LF at very end of message (DropTrailingLF is set)\n"); - bUpdatedLen = TRUE; lenMsg--; + pszMsg[lenMsg] = '\0'; + bUpdatedLen = TRUE; } /* it is much quicker to sweep over the message and see if it actually @@ -245,6 +249,7 @@ sanitizeMessage(msg_t *pMsg) } ++iSrc; } + pDst[iDst] = '\0'; MsgSetRawMsg(pMsg, (char*)pDst, iDst); /* save sanitized string */ @@ -267,7 +272,6 @@ rsRetVal parseMsg(msg_t *pMsg) uchar *msg; int pri; int lenMsg; - int iPriText; if(pMsg->iLenRawMsg == 0) ABORT_FINALIZE(RS_RET_EMPTY_MSG); @@ -281,24 +285,28 @@ rsRetVal parseMsg(msg_t *pMsg) lenMsg = pMsg->iLenRawMsg; msg = pMsg->pszRawMsg; pri = DEFUPRI; - iPriText = 0; - if(*msg == '<') { - /* while we process the PRI, we also fill the PRI textual representation - * inside the msg object. This may not be ideal from an OOP point of view, - * but it offers us performance... - */ - pri = 0; - while(--lenMsg > 0 && isdigit((int) *++msg)) { - pri = 10 * pri + (*msg - '0'); + if(pMsg->msgFlags & NO_PRI_IN_RAW) { + /* In this case, simply do so as if the pri would be right at top */ + MsgSetAfterPRIOffs(pMsg, 0); + } else { + if(*msg == '<') { + /* while we process the PRI, we also fill the PRI textual representation + * inside the msg object. This may not be ideal from an OOP point of view, + * but it offers us performance... + */ + pri = 0; + while(--lenMsg > 0 && isdigit((int) *++msg)) { + pri = 10 * pri + (*msg - '0'); + } + if(*msg == '>') + ++msg; + if(pri & ~(LOG_FACMASK|LOG_PRIMASK)) + pri = DEFUPRI; } - if(*msg == '>') - ++msg; - if(pri & ~(LOG_FACMASK|LOG_PRIMASK)) - pri = DEFUPRI; + pMsg->iFacility = LOG_FAC(pri); + pMsg->iSeverity = LOG_PRI(pri); + MsgSetAfterPRIOffs(pMsg, msg - pMsg->pszRawMsg); } - pMsg->iFacility = LOG_FAC(pri); - pMsg->iSeverity = LOG_PRI(pri); - MsgSetAfterPRIOffs(pMsg, msg - pMsg->pszRawMsg); /* rger 2005-11-24 (happy thanksgiving!): we now need to check if we have * a traditional syslog message or one formatted according to syslog-protocol. diff --git a/runtime/prop.c b/runtime/prop.c index d188b2ed..7f2a56ff 100644 --- a/runtime/prop.c +++ b/runtime/prop.c @@ -53,6 +53,7 @@ DEFobjStaticHelpers */ BEGINobjConstruct(prop) /* be sure to specify the object type also in END macro! */ pThis->iRefCount = 1; + INIT_ATOMIC_HELPER_MUT(pThis->mutRefCount); ENDobjConstruct(prop) @@ -60,11 +61,12 @@ ENDobjConstruct(prop) BEGINobjDestruct(prop) /* be sure to specify the object type also in END and CODESTART macros! */ int currRefCount; CODESTARTobjDestruct(prop) - currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount); + currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, &pThis->mutRefCount); if(currRefCount == 0) { /* (only) in this case we need to actually destruct the object */ if(pThis->len >= CONF_PROP_BUFSIZE) free(pThis->szVal.psz); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutRefCount); } else { pThis = NULL; /* tell framework NOT to destructing the object! */ } @@ -132,7 +134,7 @@ propConstructFinalize(prop_t __attribute__((unused)) *pThis) */ static rsRetVal AddRef(prop_t *pThis) { - ATOMIC_INC(pThis->iRefCount); + ATOMIC_INC(&pThis->iRefCount, &pThis->mutRefCount); return RS_RET_OK; } diff --git a/runtime/prop.h b/runtime/prop.h index e3519664..07b2ab7e 100644 --- a/runtime/prop.h +++ b/runtime/prop.h @@ -24,6 +24,7 @@ */ #ifndef INCLUDED_PROP_H #define INCLUDED_PROP_H +#include "atomic.h" /* the prop object */ struct prop_s { @@ -34,6 +35,7 @@ struct prop_s { uchar sz[CONF_PROP_BUFSIZE]; } szVal; int len; /* we use int intentionally, otherwise we may get some troubles... */ + DEF_ATOMIC_HELPER_MUT(mutRefCount); }; /* interfaces */ diff --git a/runtime/queue.c b/runtime/queue.c index 9d7a9058..9c7f96d0 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -110,7 +110,7 @@ static inline void queueDrain(qqueue_t *pThis) ASSERT(pThis != NULL); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ - while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) { + while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) { pThis->qDel(pThis, &pUsr); if(pUsr != NULL) { objDestruct(pUsr); @@ -685,7 +685,6 @@ qqueueHaveQIF(qqueue_t *pThis) { DEFiRet; uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; struct stat stat_buf; ISOBJ_TYPE_assert(pThis, qqueue); @@ -694,8 +693,8 @@ qqueueHaveQIF(qqueue_t *pThis) ABORT_FINALIZE(RS_RET_NO_FILEPREFIX); /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); + snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", + (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); /* check if the file exists */ if(stat((char*) pszQIFNam, &stat_buf) == -1) { @@ -1028,7 +1027,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr) CHKiRet(pThis->qAdd(pThis, pUsr)); if(pThis->qType != QUEUETYPE_DIRECT) { - ATOMIC_INC(pThis->iQueueSize); + ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize); } @@ -1057,7 +1056,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr) iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr); } else { iRet = pThis->qDel(pThis, pUsr); - ATOMIC_DEC(pThis->iQueueSize); + ATOMIC_DEC(&pThis->iQueueSize, &pThis->mutQueueSize); } dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n", @@ -1345,6 +1344,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread break; } + INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + finalize_it: OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP RETiRet; @@ -2065,6 +2066,8 @@ CODESTARTobjDestruct(qqueue) pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + /* type-specific destructor */ iRet = pThis->qDestruct(pThis); @@ -2206,6 +2209,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } + dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n"); } /* and finally enqueue the message */ @@ -2292,6 +2296,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } + dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n"); } /* and finally enqueue the message */ @@ -2315,6 +2320,7 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) { int iCancelStateSave; int i; + rsRetVal localRet; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); @@ -2326,8 +2332,9 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) } for(i = 0 ; i < pMultiSub->nElem ; ++i) { -dbgprintf("queueMultiEnq: %d\n", i); - CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i])); + localRet = doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]); + if(localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL) + ABORT_FINALIZE(localRet); } finalize_it: diff --git a/runtime/queue.h b/runtime/queue.h index 1d82d8d9..aafdaa45 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -160,6 +160,7 @@ typedef struct queue_s { strm_t *pRead; /* current file to be read */ } disk; } tVars; + DEF_ATOMIC_HELPER_MUT(mutQueueSize); } qqueue_t; /* some symbolic constants for easier reference */ diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c index 443d0f41..c209ae30 100644 --- a/runtime/rsyslog.c +++ b/runtime/rsyslog.c @@ -80,6 +80,7 @@ #include "prop.h" #include "rule.h" #include "ruleset.h" +#include "atomic.h" /* forward definitions */ static rsRetVal dfltErrLogger(int, uchar *errMsg); @@ -215,6 +216,7 @@ rsrtExit(void) glblClassExit(); rulesetClassExit(); ruleClassExit(); + objClassExit(); /* *THIS* *MUST/SHOULD?* always be the first class initilizer being called (except debug)! */ } diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 8979893a..03f5120d 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -61,6 +61,7 @@ /* define some base data types */ + typedef unsigned char uchar;/* get rid of the unhandy "unsigned char" */ typedef struct thrdInfo thrdInfo_t; typedef struct obj_s obj_t; @@ -78,8 +79,6 @@ typedef struct nsd_gsspi_s nsd_gsspi_t; typedef struct nsd_nss_s nsd_nss_t; typedef struct nsdsel_ptcp_s nsdsel_ptcp_t; typedef struct nsdsel_gtls_s nsdsel_gtls_t; -typedef obj_t nsd_t; -typedef obj_t nsdsel_t; typedef struct msg msg_t; typedef struct prop_s prop_t; typedef struct interface_s interface_t; @@ -98,6 +97,21 @@ typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript f typedef struct tcpLstnPortList_s tcpLstnPortList_t; // TODO: rename? typedef struct strmLstnPortList_s strmLstnPortList_t; // TODO: rename? +/* under Solaris (actually only SPARC), we need to redefine some types + * to be void, so that we get void* pointers. Otherwise, we will see + * alignment errors. + */ +#ifdef OS_SOLARIS + typedef void * obj_t_ptr; + typedef void nsd_t; + typedef void nsdsel_t; +#else + typedef obj_t *obj_t_ptr; + typedef obj_t nsd_t; + typedef obj_t nsdsel_t; +#endif + + /* some universal 64 bit define... */ typedef long long int64; typedef long long unsigned uint64; @@ -360,6 +374,15 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_VAR_NOT_FOUND = -2142, /**< variable not found */ RS_RET_EMPTY_MSG = -2143, /**< provided (raw) MSG is empty */ RS_RET_PEER_CLOSED_CONN = -2144, /**< remote peer closed connection (information, no error) */ + RS_RET_ERR_OPEN_KLOG = -2145, /**< error opening the kernel log socket (primarily solaris) */ + RS_RET_ERR_AQ_CONLOG = -2146, /**< error aquiring console log (on solaris) */ + RS_RET_ERR_DOOR = -2147, /**< some problems with handling the Solaris door functionality */ + RS_RET_NO_SOCK_CONFIGURED = -2166, /**< no socket (name) was configured where one is required */ + RS_RET_NO_LSTN_DEFINED = -2172, /**< no listener defined (e.g. inside an input module) */ + RS_RET_EPOLL_CR_FAILED = -2173, /**< epoll_create() failed */ + RS_RET_EPOLL_CTL_FAILED = -2174, /**< epoll_ctl() failed */ + RS_RET_INTERNAL_ERROR = -2175, /**< rsyslogd internal error, unexpected code path reached */ + RS_RET_OUTDATED_STMT = -2184, /**< some outdated statement/functionality is being used in conf file */ /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ diff --git a/runtime/ruleset.c b/runtime/ruleset.c index d98b4217..af61f24f 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -348,6 +348,7 @@ destructAllActions(void) CHKiRet(llDestroy(&llRulesets)); CHKiRet(llInit(&llRulesets, rulesetDestructForLinkedList, keyDestruct, strcasecmp)); + pDfltRuleset = NULL; finalize_it: RETiRet; diff --git a/runtime/stream.c b/runtime/stream.c index e8805a40..44b24ee2 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -60,7 +60,14 @@ # include <sys/prctl.h> #endif -#define inline +/* some platforms do not have large file support :( */ +#ifndef O_LARGEFILE +# define O_LARGEFILE 0 +#endif +#ifndef HAVE_LSEEK64 + typedef off_t off64_t; +# define lseek64(fd, offset, whence) lseek(fd, offset, whence) +#endif /* static data */ DEFobjStaticHelpers @@ -214,8 +221,9 @@ doPhysOpen(strm_t *pThis) iFlags |= O_NONBLOCK; } - pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); - DBGPRINTF("file '%s' opened as #%d with mode %d\n", pThis->pszCurrFName, pThis->fd, pThis->tOpenMode); + pThis->fd = open((char*)pThis->pszCurrFName, iFlags | O_LARGEFILE, pThis->tOpenMode); + DBGPRINTF("file '%s' opened as #%d with mode %d\n", pThis->pszCurrFName, + pThis->fd, (int) pThis->tOpenMode); if(pThis->fd == -1) { char errStr[1024]; int err = errno; @@ -251,6 +259,7 @@ static rsRetVal strmOpenFile(strm_t *pThis) if(pThis->fd != -1) ABORT_FINALIZE(RS_RET_OK); + pThis->pszCurrFName = NULL; /* used to prevent mem leak in case of error */ if(pThis->pszFName == NULL) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); @@ -282,6 +291,16 @@ static rsRetVal strmOpenFile(strm_t *pThis) (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", pThis->fd); finalize_it: + if(iRet != RS_RET_OK) { + if(pThis->pszCurrFName != NULL) { + free(pThis->pszCurrFName); + pThis->pszCurrFName = NULL; /* just to prevent mis-adressing down the road... */ + } + if(pThis->fd != -1) { + close(pThis->fd); + pThis->fd = -1; + } + } RETiRet; } @@ -393,6 +412,12 @@ finalize_it: * If we are monitoring a file, someone may have rotated it. In this case, we * also need to close it and reopen it under the same name. * rgerhards, 2008-02-13 + * The previous code also did a check for file truncation, in which case the + * file was considered rewritten. However, this potential border case turned + * out to be a big trouble spot on busy systems. It caused massive message + * duplication (I guess stat() can return a too-low number under some + * circumstances). So starting as of now, we only check the inode number and + * a file change is detected only if the inode changes. -- rgerhards, 2011-01-10 */ static rsRetVal strmHandleEOFMonitor(strm_t *pThis) @@ -402,23 +427,18 @@ strmHandleEOFMonitor(strm_t *pThis) struct stat statName; ISOBJ_TYPE_assert(pThis, strm); - /* find inodes of both current descriptor as well as file now in file - * system. If they are different, the file has been rotated (or - * otherwise rewritten). We also check the size, because the inode - * does not change if the file is truncated (this, BTW, is also a case - * where we actually loose log lines, because we can not do anything - * against truncation...). We do NOT rely on the time of last - * modificaton because that may not be available under all - * circumstances. -- rgerhards, 2008-02-13 - */ if(fstat(pThis->fd, &statOpen) == -1) ABORT_FINALIZE(RS_RET_IO_ERROR); if(stat((char*) pThis->pszCurrFName, &statName) == -1) ABORT_FINALIZE(RS_RET_IO_ERROR); - if(statOpen.st_ino == statName.st_ino && pThis->iCurrOffs == statName.st_size) { + DBGPRINTF("stream checking for file change on '%s', inode %u/%u", + pThis->pszCurrFName, (unsigned) statOpen.st_ino, + (unsigned) statName.st_ino); + if(statOpen.st_ino == statName.st_ino) { ABORT_FINALIZE(RS_RET_EOF); } else { /* we had a file change! */ + DBGPRINTF("we had a file change on '%s'\n", pThis->pszCurrFName); CHKiRet(strmCloseFile(pThis)); CHKiRet(strmOpenFile(pThis)); } @@ -1188,7 +1208,7 @@ finalize_it: * is invalidated. * rgerhards, 2008-01-12 */ -static rsRetVal strmSeek(strm_t *pThis, off_t offs) +static rsRetVal strmSeek(strm_t *pThis, off64_t offs) { DEFiRet; @@ -1198,9 +1218,9 @@ static rsRetVal strmSeek(strm_t *pThis, off_t offs) strmOpenFile(pThis); else strmFlushInternal(pThis); - int i; - DBGOPRINT((obj_t*) pThis, "file %d seek, pos %ld\n", pThis->fd, (long) offs); - i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error! + long long i; + DBGOPRINT((obj_t*) pThis, "file %d seek, pos %llu\n", pThis->fd, (long long unsigned) offs); + i = lseek64(pThis->fd, offs, SEEK_SET); // TODO: check error! pThis->iCurrOffs = offs; /* we are now at *this* offset */ pThis->iBufPtr = 0; /* buffer invalidated */ @@ -1477,7 +1497,7 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) { DEFiRet; int i; - long l; + int64 l; ISOBJ_TYPE_assert(pThis, strm); ISOBJ_TYPE_assert(pStrm, strm); @@ -1499,8 +1519,8 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) i = pThis->tOpenMode; objSerializeSCALAR_VAR(pStrm, tOpenMode, INT, i); - l = (long) pThis->iCurrOffs; - objSerializeSCALAR_VAR(pStrm, iCurrOffs, LONG, l); + l = pThis->iCurrOffs; + objSerializeSCALAR_VAR(pStrm, iCurrOffs, INT64, l); CHKiRet(obj.EndSerialize(pStrm)); diff --git a/runtime/stringbuf.c b/runtime/stringbuf.c index 93995b38..8b2fe455 100644 --- a/runtime/stringbuf.c +++ b/runtime/stringbuf.c @@ -156,7 +156,7 @@ rsRetVal rsCStrExtendBuf(cstr_t *pThis, size_t iMinNeeded) { uchar *pNewBuf; - unsigned short iNewSize; + size_t iNewSize; DEFiRet; /* first compute the new size needed */ diff --git a/runtime/strmsrv.c b/runtime/strmsrv.c index 3dc53a97..8cebf810 100644 --- a/runtime/strmsrv.c +++ b/runtime/strmsrv.c @@ -520,6 +520,7 @@ Run(strmsrv_t *pThis) strms_sess_t *pNewSess; nssel_t *pSel; ssize_t iRcvd; + rsRetVal localRet; ISOBJ_TYPE_assert(pThis, strmsrv); @@ -579,11 +580,12 @@ Run(strmsrv_t *pThis) break; case RS_RET_OK: /* valid data received, process it! */ - if(strms_sess.DataRcvd(pThis->pSessions[iSTRMSess], buf, iRcvd) != RS_RET_OK) { + localRet = strms_sess.DataRcvd(pThis->pSessions[iSTRMSess], buf, iRcvd); + if(localRet != RS_RET_OK) { /* in this case, something went awfully wrong. * We are instructed to terminate the session. */ - errmsg.LogError(0, NO_ERRCODE, "Tearing down STRM Session %d - see " + errmsg.LogError(0, localRet, "Tearing down STRM Session %d - see " "previous messages for reason(s)\n", iSTRMSess); pThis->pOnErrClose(pThis->pSessions[iSTRMSess]); strms_sess.Destruct(&pThis->pSessions[iSTRMSess]); diff --git a/runtime/unlimited_select.h b/runtime/unlimited_select.h new file mode 100644 index 00000000..32dadc03 --- /dev/null +++ b/runtime/unlimited_select.h @@ -0,0 +1,45 @@ +/* unlimited_select.h + * Tweak the macros for accessing fd_set so that the select() syscall + * won't be limited to a particular number of file descriptors. + * + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ + +#ifndef UNLIMITED_SELECT_H_INCLUDED + +#include <string.h> +#include <stdlib.h> +#include <sys/select.h> +#include "glbl.h" + +#ifdef USE_UNLIMITED_SELECT +# undef FD_ZERO +# define FD_ZERO(set) memset((set), 0, glbl.GetFdSetSize()); +#endif + +#ifdef USE_UNLIMITED_SELECT +void freeFdSet(fd_set *p) { + free(p); +} +#else +# define freeFdSet(x) +#endif + +#endif /* #ifndef UNLIMITED_SELECT_H_INCLUDED */ diff --git a/runtime/vm.c b/runtime/vm.c index aaf3c879..0ed174d1 100644 --- a/runtime/vm.c +++ b/runtime/vm.c @@ -34,6 +34,7 @@ #include "vm.h" #include "sysvar.h" #include "stringbuf.h" +#include "unicode-helper.h" /* static data */ DEFobjStaticHelpers @@ -41,6 +42,8 @@ DEFobjCurrIf(vmstk) DEFobjCurrIf(var) DEFobjCurrIf(sysvar) +static pthread_mutex_t mutGetenv; /* we need to make this global because otherwise we can not guarantee proper init! */ + /* ------------------------------ function registry code and structures ------------------------------ */ /* we maintain a registry of known functions */ @@ -542,6 +545,42 @@ finalize_it: } +/* The getenv function. Note that we guard the OS call by a mutex, as that + * function is not guaranteed to be thread-safe. This implementation here is far from + * being optimal, at least we should cache the result. This is left TODO for + * a later revision. + * rgerhards, 2009-11-03 + */ +static rsRetVal +rsf_getenv(vmstk_t *pStk, int numOperands) +{ + DEFiRet; + var_t *operand1; + char *envResult; + cstr_t *pCstr; + + if(numOperands != 1) + ABORT_FINALIZE(RS_RET_INVLD_NBR_ARGUMENTS); + + /* pop args and do operaton (trivial case here...) */ + vmstk.PopString(pStk, &operand1); + d_pthread_mutex_lock(&mutGetenv); + envResult = getenv((char*) rsCStrGetSzStr(operand1->val.pStr)); + DBGPRINTF("rsf_getenv(): envvar '%s', return '%s'\n", rsCStrGetSzStr(operand1->val.pStr), + envResult == NULL ? "(NULL)" : envResult); + iRet = rsCStrConstructFromszStr(&pCstr, (envResult == NULL) ? UCHAR_CONSTANT("") : (uchar*)envResult); + d_pthread_mutex_unlock(&mutGetenv); + if(iRet != RS_RET_OK) + FINALIZE; /* need to do this after mutex is unlocked! */ + + /* Store result and cleanup */ + var.SetString(operand1, pCstr); + vmstk.Push(pStk, operand1); +finalize_it: + RETiRet; +} + + /* The "tolower" function, which converts its sole argument to lower case. * Quite honestly, currently this is primarily a test driver for me... * rgerhards, 2009-04-06 @@ -759,6 +798,8 @@ BEGINObjClassExit(vm, OBJ_IS_CORE_MODULE) /* class, version */ objRelease(sysvar, CORE_COMPONENT); objRelease(var, CORE_COMPONENT); objRelease(vmstk, CORE_COMPONENT); + + pthread_mutex_destroy(&mutGetenv); ENDObjClassExit(vm) @@ -779,6 +820,9 @@ BEGINObjClassInit(vm, 1, OBJ_IS_CORE_MODULE) /* class, version */ /* register built-in functions // TODO: move to its own module */ CHKiRet(rsfrAddFunction((uchar*)"strlen", rsf_strlen)); CHKiRet(rsfrAddFunction((uchar*)"tolower", rsf_tolower)); + CHKiRet(rsfrAddFunction((uchar*)"getenv", rsf_getenv)); + + pthread_mutex_init(&mutGetenv, NULL); ENDObjClassInit(vm) diff --git a/runtime/wti.c b/runtime/wti.c index abdf4add..90bb14ed 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -146,7 +146,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) break; } /* apply the new state */ - unsigned val = ATOMIC_CAS_VAL(pThis->tCurrCmd, tCurrCmd, tCmd); + unsigned val = ATOMIC_CAS_VAL((int*)&pThis->tCurrCmd, tCurrCmd, tCmd, &pThis->mutCurrCmd); if(val != tCurrCmd) { DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd); } @@ -178,7 +178,7 @@ wtiCancelThrd(wti_t *pThis) dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd); pthread_cancel(pThis->thrdID); wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); - ATOMIC_STORE_1_TO_INT(pThis->pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */ + wtpSetThrdStateChanged(pThis->pWtp, 1); /* indicate change, so harverster will be called */ } d_pthread_mutex_unlock(&pThis->mut); @@ -209,6 +209,7 @@ CODESTARTobjDestruct(wti) /* actual destruction */ pthread_cond_destroy(&pThis->condExitDone); pthread_mutex_destroy(&pThis->mut); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurrCmd); free(pThis->pszDbgHdr); ENDobjDestruct(wti) @@ -219,6 +220,7 @@ ENDobjDestruct(wti) BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ pthread_cond_init(&pThis->condExitDone, NULL); pthread_mutex_init(&pThis->mut, NULL); + INIT_ATOMIC_HELPER_MUT(pThis->mutCurrCmd); ENDobjConstruct(wti) @@ -326,8 +328,7 @@ wtiWorkerCancelCleanup(void *arg) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pWtp->mut); wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); - /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */ - ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */ + wtpSetThrdStateChanged(pWtp, 1); /* indicate change, so harverster will be called */ d_pthread_mutex_unlock(&pWtp->mut); pthread_setcancelstate(iCancelStateSave, NULL); @@ -414,7 +415,7 @@ wtiWorker(wti_t *pThis) pWtp->pfOnWorkerShutdown(pWtp->pUsr); wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); - ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */ + wtpSetThrdStateChanged(pWtp, 1); /* indicate change, so harverster will be called */ d_pthread_mutex_unlock(&pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); diff --git a/runtime/wti.h b/runtime/wti.h index 72653b15..d81672f3 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -39,6 +39,7 @@ typedef struct wti_s { pthread_mutex_t mut; bool bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */ uchar *pszDbgHdr; /* header string for debug messages */ + DEF_ATOMIC_HELPER_MUT(mutCurrCmd); } wti_t; /* some symbolic constants for easier reference */ diff --git a/runtime/wtp.c b/runtime/wtp.c index 0c66dd11..b4fd2e04 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -97,6 +97,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pThis->pfOnWorkerCancel = NotImplementedDummy; pThis->pfOnWorkerStartup = NotImplementedDummy; pThis->pfOnWorkerShutdown = NotImplementedDummy; + INIT_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged); ENDobjConstruct(wtp) @@ -153,6 +154,7 @@ CODESTARTobjDestruct(wtp) pthread_cond_destroy(&pThis->condThrdTrm); pthread_mutex_destroy(&pThis->mut); pthread_mutex_destroy(&pThis->mutThrdShutdwn); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged); free(pThis->pszDbgHdr); ENDobjDestruct(wtp) @@ -186,6 +188,20 @@ wtpWakeupAllWrkr(wtp_t *pThis) } +/* set the bThrdStateChanged in an atomic way. Note that + * val may only be 0 or 1. + */ +void +wtpSetThrdStateChanged(wtp_t *pThis, int val) +{ + if(val == 0) { + ATOMIC_STORE_0_TO_INT(&pThis->bThrdStateChanged, pThis->mutThrdStateChanged); + } else { + ATOMIC_STORE_1_TO_INT(&pThis->bThrdStateChanged, pThis->mutThrdStateChanged); + } +} + + /* check if we had any worker thread changes and, if so, act * on them. At a minimum, terminated threads are harvested (joined). * This function MUST NEVER block on the queue mutex! @@ -216,7 +232,7 @@ wtpProcessThrdChanges(wtp_t *pThis) */ do { /* reset the change marker */ - ATOMIC_STORE_0_TO_INT(pThis->bThrdStateChanged); + wtpSetThrdStateChanged(pThis, 0); /* go through all threads */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX); @@ -421,13 +437,15 @@ wtpWrkrExecCancelCleanup(void *arg) static void * wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */ { - uchar *pszDbgHdr; - uchar thrdName[32] = "rs:"; DEFiRet; DEFVARS_mutexProtection; wti_t *pWti = (wti_t*) arg; wtp_t *pThis; sigset_t sigSet; +# if HAVE_PRCTL && defined PR_SET_NAME + uchar *pszDbgHdr; + uchar thrdName[32] = "rs:"; +# endif ISOBJ_TYPE_assert(pWti, wti); pThis = pWti->pWtp; @@ -460,7 +478,7 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in do { END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - iRet = wtiWorker(pWti); /* just to make sure: this is NOT protected by the mutex! */ + wtiWorker(pWti); /* just to make sure: this is NOT protected by the mutex! */ BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX); } while(pThis->iCurNumWrkThrd == 1 && pThis->bInactivityGuard == 1); diff --git a/runtime/wtp.h b/runtime/wtp.h index 1ce171cc..640c3320 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -26,6 +26,7 @@ #include <pthread.h> #include "obj.h" +#include "atomic.h" /* commands and states for worker threads. */ typedef enum { @@ -79,6 +80,7 @@ typedef struct wtp_s { rsRetVal (*pfOnWorkerShutdown)(void *pUsr); /* end user objects */ uchar *pszDbgHdr; /* header string for debug messages */ + DEF_ATOMIC_HELPER_MUT(mutThrdStateChanged); } wtp_t; /* some symbolic constants for easier reference */ @@ -100,6 +102,7 @@ rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg); rsRetVal wtpSignalWrkrTermination(wtp_t *pWtp); rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout); int wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex); +void wtpSetThrdStateChanged(wtp_t *pThis, int val); PROTOTYPEObjClassInit(wtp); PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)); |