diff options
-rw-r--r-- | ChangeLog | 69 | ||||
-rw-r--r-- | action.c | 8 | ||||
-rw-r--r-- | configure.ac | 27 | ||||
-rw-r--r-- | dirty.h | 2 | ||||
-rw-r--r-- | grammar/rainerscript.c | 24 | ||||
-rw-r--r-- | grammar/rainerscript.h | 1 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 10 | ||||
-rw-r--r-- | plugins/omelasticsearch/Makefile.am | 2 | ||||
-rw-r--r-- | runtime/datetime.c | 7 | ||||
-rw-r--r-- | runtime/msg.c | 221 | ||||
-rw-r--r-- | runtime/msg.h | 12 | ||||
-rw-r--r-- | runtime/obj.c | 4 | ||||
-rw-r--r-- | runtime/queue.c | 43 | ||||
-rw-r--r-- | runtime/queue.h | 2 | ||||
-rw-r--r-- | runtime/rsconf.c | 8 | ||||
-rw-r--r-- | runtime/ruleset.c | 25 | ||||
-rw-r--r-- | template.c | 8 | ||||
-rwxr-xr-x | tests/queue-persist-drvr.sh | 1 | ||||
-rw-r--r-- | tools/pidfile.c | 6 | ||||
-rw-r--r-- | tools/syslogd.c | 134 |
20 files changed, 351 insertions, 263 deletions
@@ -21,6 +21,21 @@ Version 7.3.5 [devel] 2012-11-?? - bugfix(kind of): script optimizer did not work for complex boolean expressions - doc bugfix: corrections and improvements in mmnormalize html doc page +- bugfix: some message properties could be garbled due to race condition + This happened only on very high volume systems, if the same message was + being processed by two different actions. This was a regression caused + by the new config processor, which did no longer properly enable msg + locking in multithreaded cases. The bugfix is actually a refactoring of + the msg locking code - we no longer do unlocked operations, as the use + case for it has mostly gone away. It is potentially possible only at + very low-end systems, and there the small additional overhead of doing + the locking does not really hurt. Instead, the removal of that + capability can actually slightly improve performance in common cases, + as the code path is smaller and requires slightly less memory writes. + That probably outperforms the extra locking overhead (which in the + low-end case always happens in user space, without need for kernel + support as we can always directly aquire the lock - there is no + contention at all). --------------------------------------------------------------------------- Version 7.3.4 [devel] 2012-11-23 - further (and rather drastically) improved disk queue performance @@ -96,9 +111,26 @@ Version 7.3.0 [devel] 2012-10-09 This was achieved by somewhat reducing the robustness of the zip archive. This is controlled by the new action parameter "VeryReliableZip". ---------------------------------------------------------------------------- -Version 7.2.4 [v7-stable] 2012-10-?? +Version 7.2.5 [v7-stable] 2013-01-?? +- build system cleanup (thanks to Michael Biebl for this!) +- bugfix: omelasticsearch did not properly compile on some platforms + due to missing libmath. Thanks to Michael Biebl for the fix +- bugfix: invalid DST handling under Solaris + Thanks to Scott Severtson for the patch. +---------------------------------------------------------------------------- +Version 7.2.4 [v7-stable] 2012-12-07 +- enhance: permit RFC3339 timestamp in local log socket messages + Thanks to Sebastien Ponce for the patch. - imklog: added ParseKernelTimestamp parameter (import from 5.10.2) Thanks to Marius Tomaschewski for the patch. +- fix missing functionality: ruleset(){} could not specify ruleset queue + The "queue.xxx" parameter set was not supported, and legacy ruleset + config statements did not work (by intention). The fix introduces the + "queue.xxx" parameter set. It has some regression potential, but only + for the new functionality. Note that using that interface it is possible + to specify duplicate queue file names, which will cause trouble. This + will be solved in v7.3, because there is a too-large regression + potential for the v7.2 stable branch. - imklog: added KeepKernelTimestamp parameter (import from 5.10.2) Thanks to Marius Tomaschewski for the patch. - bugfix: imklog mistakenly took kernel timestamp subseconds as nanoseconds @@ -112,6 +144,21 @@ Version 7.2.4 [v7-stable] 2012-10-?? That invalid error message could be quite misleading and actually stop people from addressing the real problem (aka "go nuts" ;)) - bugfix: template "type" parameter is mandatory (but was not) +- bugfix: some message properties could be garbled due to race condition + This happened only on very high volume systems, if the same message was + being processed by two different actions. This was a regression caused + by the new config processor, which did no longer properly enable msg + locking in multithreaded cases. The bugfix is actually a refactoring of + the msg locking code - we no longer do unlocked operations, as the use + case for it has mostly gone away. It is potentially possible only at + very low-end systems, and there the small additional overhead of doing + the locking does not really hurt. Instead, the removal of that + capability can actually slightly improve performance in common cases, + as the code path is smaller and requires slightly less memory writes. + That probably outperforms the extra locking overhead (which in the + low-end case always happens in user space, without need for kernel + support as we can always directly aquire the lock - there is no + contention at all). ---------------------------------------------------------------------------- Version 7.2.3 [v7-stable] 2012-10-21 - regression fix: rsyslogd terminated when wild-card $IncludeConfig did not @@ -206,7 +253,6 @@ Version 7.1.12 [beta] 2012-10-18 This happened only under some circumstances. Thanks to Marius Tomaschewski and Florian Piekert for their help in solving this issue. ---------------------------------------------------------------------------- ->>>>>>> b151584d0929759284c0fb0399709e5ca0e29d60 Version 7.1.11 [beta] 2012-10-16 - bugfix: imuxsock truncated head of received message This happened only under some circumstances. Thanks to Marius @@ -362,6 +408,23 @@ Version 6.6.1 [v6-stable] 2012-10-?? - bugfix: hostname set in rsyslog.conf was not picked up until HUP which could also mean "never" or "not for a very long time". Thanks to oxpa for providing analysis and a patch +- bugfix: some message properties could be garbled due to race condition + This happened only on very high volume systems, if the same message was + being processed by two different actions. This was a regression caused + by the new config processor, which did no longer properly enable msg + locking in multithreaded cases. The bugfix is actually a refactoring of + the msg locking code - we no longer do unlocked operations, as the use + case for it has mostly gone away. It is potentially possible only at + very low-end systems, and there the small additional overhead of doing + the locking does not really hurt. Instead, the removal of that + capability can actually slightly improve performance in common cases, + as the code path is smaller and requires slightly less memory writes. + That probably outperforms the extra locking overhead (which in the + low-end case always happens in user space, without need for kernel + support as we can always directly aquire the lock - there is no + contention at all). +- bugfix: invalid DST handling under Solaris + Thanks to Scott Severtson for the patch. --------------------------------------------------------------------------- Version 6.6.0 [v6-stable] 2012-10-22 This starts a new stable branch, based on the 6.5.x series, plus: @@ -1035,6 +1098,8 @@ Version 5.10.2 [V5-STABLE], 201?-??-?? ... actually, they are microseconds. So the fractional part of the timestamp was not properly formatted. Thanks to Marius Tomaschewski for the bug report and the patch idea. +- bugfix: invalid DST handling under Solaris + Thanks to Scott Severtson for the patch. --------------------------------------------------------------------------- Version 5.10.1 [V5-STABLE], 2012-10-17 - bugfix: imuxsock and imklog truncated head of received message @@ -421,14 +421,6 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) pThis->submitToActQ = doSubmitToActionQBatch; } - /* we need to make a safety check: if the queue is NOT in direct mode, a single - * message object may be accessed by multiple threads. As such, we need to enable - * msg object thread safety in this case (this costs a bit performance and thus - * is not enabled by default. -- rgerhards, 2008-02-20 - */ - if(cs.ActionQueType != QUEUETYPE_DIRECT) - MsgEnableThreadSafety(); - /* create queue */ /* action queues always (for now) have just one worker. This may change when * we begin to implement an interface the enable output modules to request diff --git a/configure.ac b/configure.ac index 6646f8ab..f2fffc34 100644 --- a/configure.ac +++ b/configure.ac @@ -243,22 +243,7 @@ AM_CONDITIONAL(ENABLE_GSSAPI, test x$enable_gssapi_krb5 = xyes) # multithreading via pthreads -AC_ARG_ENABLE(pthreads, - [AS_HELP_STRING([--enable-pthreads],[Enable multithreading via pthreads @<:@default=yes@:>@])], - [case "${enableval}" in - yes) enable_pthreads="yes" ;; - no) enable_pthreads="no" ;; - *) AC_MSG_ERROR(bad value ${enableval} for --enable-pthreads) ;; - esac], - [enable_pthreads=yes] -) - -if test "x$enable_pthreads" = "xno"; then - AC_MSG_ERROR(rsyslog v3+ does no longer support single threading mode -- use a previous version for that); -fi - -if test "x$enable_pthreads" != "xno"; then - AC_CHECK_HEADERS( +AC_CHECK_HEADERS( [pthread.h], [ AC_CHECK_LIB( @@ -279,8 +264,7 @@ if test "x$enable_pthreads" != "xno"; then ) ], [AC_MSG_FAILURE([pthread is missing])] - ) -fi +) AC_CHECK_FUNCS( [pthread_setschedparam], @@ -711,13 +695,13 @@ AC_SUBST(SNMP_LIBS) AC_ARG_ENABLE(uuid, [AS_HELP_STRING([--enable-uuid],[Enable support for uuid generation @<:@default=yes@:>@])], [case "${enableval}" in - yes) enable_elasticsearch="yes" ;; - no) enable_elasticsearch="no" ;; + yes) enable_uuid="yes" ;; + no) enable_uuid="no" ;; *) AC_MSG_ERROR(bad value ${enableval} for --enable-uuid) ;; esac], [enable_uuid=yes] ) -if test "x$enable_elasticsearch" = "xyes"; then +if test "x$enable_uuid" = "xyes"; then PKG_CHECK_MODULES([LIBUUID], [uuid]) AC_DEFINE(USE_LIBUUID, 1, [Define if you want to enable libuuid support]) fi @@ -736,6 +720,7 @@ AC_ARG_ENABLE(elasticsearch, ) if test "x$enable_elasticsearch" = "xyes"; then PKG_CHECK_MODULES([CURL], [libcurl]) + LT_LIB_M fi AM_CONDITIONAL(ENABLE_ELASTICSEARCH, test x$enable_elasticsearch = xyes) @@ -35,7 +35,7 @@ rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub); rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags); rsRetVal __attribute__((deprecated)) parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset); rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */ -rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName); +rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName, struct cnfparamvals *queueParams); extern int MarkInterval; extern qqueue_t *pMsgQueue; /* the main message queue */ diff --git a/grammar/rainerscript.c b/grammar/rainerscript.c index 8f20bfcd..bacf851b 100644 --- a/grammar/rainerscript.c +++ b/grammar/rainerscript.c @@ -953,6 +953,30 @@ nvlstGetParams(struct nvlst *lst, struct cnfparamblk *params, } +/* check if at least one cnfparamval is actually set + * returns 1 if so, 0 otherwise + */ +int +cnfparamvalsIsSet(struct cnfparamblk *params, struct cnfparamvals *vals) +{ + int i; + + if(vals == NULL) + return 0; + if(params->version != CNFPARAMBLK_VERSION) { + dbgprintf("nvlstGetParams: invalid param block version " + "%d, expected %d\n", + params->version, CNFPARAMBLK_VERSION); + return 0; + } + for(i = 0 ; i < params->nParams ; ++i) { + if(vals[i].bUsed) + return 1; + } + return 0; +} + + void cnfparamsPrint(struct cnfparamblk *params, struct cnfparamvals *vals) { diff --git a/grammar/rainerscript.h b/grammar/rainerscript.h index 5cfce795..59ce53f3 100644 --- a/grammar/rainerscript.h +++ b/grammar/rainerscript.h @@ -313,6 +313,7 @@ int cnfparamGetIdx(struct cnfparamblk *params, char *name); struct cnfparamvals* nvlstGetParams(struct nvlst *lst, struct cnfparamblk *params, struct cnfparamvals *vals); void cnfparamsPrint(struct cnfparamblk *params, struct cnfparamvals *vals); +int cnfparamvalsIsSet(struct cnfparamblk *params, struct cnfparamvals *vals); void varDelete(struct var *v); void cnfparamvalsDestruct(struct cnfparamvals *paramvals, struct cnfparamblk *blk); struct cnfstmt * cnfstmtNew(unsigned s_type); diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index d0055b7a..66a1c648 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -859,15 +859,19 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim /* in this case, we still need to find out if we have a valid * datestamp or not .. and advance the parse pointer accordingly. */ - datetime.ParseTIMESTAMP3164(&dummyTS, &parse, &lenMsg); + if (datetime.ParseTIMESTAMP3339(&dummyTS, &parse, &lenMsg) != RS_RET_OK) { + datetime.ParseTIMESTAMP3164(&dummyTS, &parse, &lenMsg); + } } else { - if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) { + if(datetime.ParseTIMESTAMP3339(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK && + datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) { DBGPRINTF("we have a problem, invalid timestamp in msg!\n"); } } } else { /* if we pulled the time from the system, we need to update the message text */ uchar *tmpParse = parse; /* just to check correctness of TS */ - if(datetime.ParseTIMESTAMP3164(&dummyTS, &tmpParse, &lenMsg) == RS_RET_OK) { + if(datetime.ParseTIMESTAMP3339(&dummyTS, &tmpParse, &lenMsg) == RS_RET_OK || + datetime.ParseTIMESTAMP3164(&dummyTS, &tmpParse, &lenMsg) == RS_RET_OK) { /* We modify the message only if it contained a valid timestamp, * otherwise we do not touch it at all. */ datetime.formatTimestamp3164(&st, (char*)parse, 0); diff --git a/plugins/omelasticsearch/Makefile.am b/plugins/omelasticsearch/Makefile.am index 059bdf8a..ba85a896 100644 --- a/plugins/omelasticsearch/Makefile.am +++ b/plugins/omelasticsearch/Makefile.am @@ -4,6 +4,6 @@ pkglib_LTLIBRARIES = omelasticsearch.la omelasticsearch_la_SOURCES = omelasticsearch.c cJSON/cjson.c cJSON/cjson.h omelasticsearch_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) omelasticsearch_la_LDFLAGS = -module -avoid-version -omelasticsearch_la_LIBADD = $(CURL_LIBS) +omelasticsearch_la_LIBADD = $(CURL_LIBS) $(LIBM) EXTRA_DIST = diff --git a/runtime/datetime.c b/runtime/datetime.c index 10ab3c64..c03abab9 100644 --- a/runtime/datetime.c +++ b/runtime/datetime.c @@ -77,7 +77,7 @@ timeval2syslogTime(struct timeval *tp, struct syslogTime *t) /* Solaris uses a different method of exporting the time zone. * It is UTC - localtime, which is the opposite sign of mins east of GMT. */ - lBias = -(daylight ? altzone : timezone); + lBias = -(tm->tm_isdst ? altzone : timezone); # elif defined(__hpux) lBias = tz.tz_dsttime ? - tz.tz_minuteswest : 0; # else @@ -900,6 +900,11 @@ time_t syslogTime2time_t(struct syslogTime *ts) case 12: MonthInDays = 334; //until 01 of December break; + default: /* this cannot happen (and would be a program error) + * but we need the code to keep the compiler silent. + */ + MonthInDays = 0; /* any value fits ;) */ + break; } diff --git a/runtime/msg.c b/runtime/msg.c index d648bb92..ce863299 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -302,120 +302,20 @@ static uchar * jsonPathGetLeaf(uchar *name, int lenName); static struct json_object *jsonDeepCopy(struct json_object *src); -/* The following functions will support advanced output module - * multithreading, once this is implemented. Currently, we - * include them as hooks only. The idea is that we need to guard - * some msg objects data fields against concurrent access if - * we run on multiple threads. Please note that in any case this - * is not necessary for calls from INPUT modules, because they - * construct the message object and do this serially. Only when - * the message is in the processing queue, multiple threads may - * access a single object. Consequently, there are no guard functions - * for "set" methods, as these are called during input. Only "get" - * functions that modify important structures have them. - * rgerhards, 2007-07-20 - * We now support locked and non-locked operations, depending on - * the configuration of rsyslog. To support this, we use function - * pointers. Initially, we start in non-locked mode. There, all - * locking operations call into dummy functions. When locking is - * enabled, the function pointers are changed to functions doing - * actual work. We also introduced another MsgPrepareEnqueue() function - * which initializes the locking structures, if needed. This is - * necessary because internal messages during config file startup - * processing are always created in non-locking mode. So we can - * not initialize locking structures during constructions. We now - * postpone this until when the message is fully constructed and - * enqueued. Then we know the status of locking. This has a nice - * side effect, and that is that during the initial creation of - * the Msg object no locking needs to be done, which results in better - * performance. -- rgerhards, 2008-01-05 - */ -static void (*funcLock)(msg_t *pMsg); -static void (*funcUnlock)(msg_t *pMsg); -static void (*funcDeleteMutex)(msg_t *pMsg); -void (*funcMsgPrepareEnqueue)(msg_t *pMsg); -#if 1 /* This is a debug aid */ -#define MsgLock(pMsg) funcLock(pMsg) -#define MsgUnlock(pMsg) funcUnlock(pMsg) -#else -#define MsgLock(pMsg) {dbgprintf("MsgLock line %d\n - ", __LINE__); funcLock(pMsg);; } -#define MsgUnlock(pMsg) {dbgprintf("MsgUnlock line %d - ", __LINE__); funcUnlock(pMsg); } -#endif - -/* the next function is a dummy to be used by the looking functions - * when the class is not yet running in an environment where locking - * is necessary. Please note that the need to lock can (and will) change - * during a single run. Typically, this is depending on the operation mode - * of the message queues (which is operator-configurable). -- rgerhards, 2008-01-05 - */ -static void MsgLockingDummy(msg_t __attribute__((unused)) *pMsg) -{ - /* empty be design */ -} - - -/* The following function prepares a message for enqueue into the queue. This is - * where a message may be accessed by multiple threads. This implementation here - * is the version for multiple concurrent acces. It initializes the locking - * structures. - * TODO: change to an iRet interface! -- rgerhards, 2008-07-14 - */ -static void MsgPrepareEnqueueLockingCase(msg_t *pThis) -{ - BEGINfunc - assert(pThis != NULL); - pthread_mutex_init(&pThis->mut, NULL); - pThis->bDoLock = 1; - ENDfunc -} - - -/* ... and now the locking and unlocking implementations: */ -static void MsgLockLockingCase(msg_t *pThis) +/* the locking and unlocking implementations: */ +static inline void +MsgLock(msg_t *pThis) { /* DEV debug only! dbgprintf("MsgLock(0x%lx)\n", (unsigned long) pThis); */ - assert(pThis != NULL); - if(pThis->bDoLock == 1) /* TODO: this is a testing hack, we should find a way with better performance! -- rgerhards, 2009-01-27 */ - pthread_mutex_lock(&pThis->mut); + pthread_mutex_lock(&pThis->mut); } - -static void MsgUnlockLockingCase(msg_t *pThis) +static inline void +MsgUnlock(msg_t *pThis) { /* DEV debug only! dbgprintf("MsgUnlock(0x%lx)\n", (unsigned long) pThis); */ - assert(pThis != NULL); - if(pThis->bDoLock == 1) /* TODO: this is a testing hack, we should find a way with better performance! -- rgerhards, 2009-01-27 */ - pthread_mutex_unlock(&pThis->mut); + pthread_mutex_unlock(&pThis->mut); } -/* delete the mutex object on message destruction (locking case) - */ -static void MsgDeleteMutexLockingCase(msg_t *pThis) -{ - assert(pThis != NULL); - pthread_mutex_destroy(&pThis->mut); -} - -/* enable multiple concurrent access on the message object - * This works on a class-wide basis and can bot be undone. - * That is, if it is once enabled, it can not be disabled during - * the same run. When this function is called, no other thread - * must manipulate message objects. Then we would have race conditions, - * but guarding against this is counter-productive because it - * would cost additional time. Plus, it would be a programming error. - * rgerhards, 2008-01-05 - */ -rsRetVal MsgEnableThreadSafety(void) -{ - DEFiRet; - funcLock = MsgLockLockingCase; - funcUnlock = MsgUnlockLockingCase; - funcMsgPrepareEnqueue = MsgPrepareEnqueueLockingCase; - funcDeleteMutex = MsgDeleteMutexLockingCase; - RETiRet; -} - -/* end locking functions */ - /* rgerhards 2012-04-18: set associated ruleset (by ruleset name) * If ruleset cannot be found, no update is done. @@ -728,8 +628,6 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis) /* initialize members in ORDER they appear in structure (think "cache line"!) */ pM->flowCtlType = 0; - pM->bDoLock = 0; - pM->bAlreadyFreed = 0; pM->bParseSuccess = 0; pM->iRefCount = 1; pM->iSeverity = -1; @@ -772,6 +670,7 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis) pM->pszTIMESTAMP_Unix[0] = '\0'; pM->pszRcvdAt_Unix[0] = '\0'; pM->pszUUID = NULL; + pthread_mutex_init(&pM->mut, NULL); /* DEV debugging only! dbgprintf("msgConstruct\t0x%x, ref 1\n", (int)pM);*/ @@ -874,15 +773,6 @@ CODESTARTobjDestruct(msg) if(currRefCount == 0) { /* DEV Debugging Only! dbgprintf("msgDestruct\t0x%lx, RefCount now 0, doing DESTROY\n", (unsigned long)pThis); */ - /* The if below is included to try to nail down a well-hidden bug causing - * segfaults. I hope that do to the test code the problem is sooner detected and - * thus we get better data for debugging and resolving it. -- rgerhards, 2011-02-23. - * TODO: remove when no longer needed. - */ - if(pThis->bAlreadyFreed) - abort(); - pThis->bAlreadyFreed = 1; - /* end debug code */ if(pThis->pszRawMsg != pThis->szRawMsg) free(pThis->pszRawMsg); freeTAG(pThis); @@ -920,7 +810,7 @@ CODESTARTobjDestruct(msg) # ifndef HAVE_ATOMIC_BUILTINS MsgUnlock(pThis); # endif - funcDeleteMutex(pThis); + pthread_mutex_destroy(&pThis->mut); /* now we need to do our own optimization. Testing has shown that at least the glibc * malloc() subsystem returns memory to the OS far too late in our case. So we need * to help it a bit, by calling malloc_trim(), which will tell the alloc subsystem @@ -3759,17 +3649,89 @@ msgGetMsgVarNew(msg_t *pThis, uchar *name) } -/* This is a construction finalizer that must be called after all properties - * have been set. It does some final work on the message object. After this - * is done, the object is considered ready for full processing. - * rgerhards, 2008-07-08 +/* This function can be used as a generic way to set properties. + * We have to handle a lot of legacy, so our return value is not always + * 100% correct (called functions do not always provide one, should + * change over time). + * rgerhards, 2008-01-07 */ -rsRetVal -msgConstructFinalizer(msg_t *pThis) +#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1) +rsRetVal MsgSetProperty(msg_t *pThis, var_t *pProp) { - MsgPrepareEnqueue(pThis); - return RS_RET_OK; + prop_t *myProp; + prop_t *propRcvFrom = NULL; + prop_t *propRcvFromIP = NULL; + struct json_tokener *tokener; + struct json_object *json; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, msg); + assert(pProp != NULL); + + if(isProp("iProtocolVersion")) { + setProtocolVersion(pThis, pProp->val.num); + } else if(isProp("iSeverity")) { + pThis->iSeverity = pProp->val.num; + } else if(isProp("iFacility")) { + pThis->iFacility = pProp->val.num; + } else if(isProp("msgFlags")) { + pThis->msgFlags = pProp->val.num; + } else if(isProp("offMSG")) { + MsgSetMSGoffs(pThis, pProp->val.num); + } else if(isProp("pszRawMsg")) { + MsgSetRawMsg(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr), cstrLen(pProp->val.pStr)); + } else if(isProp("pszUxTradMsg")) { + /*IGNORE*/; /* this *was* a property, but does no longer exist */ + } else if(isProp("pszTAG")) { + MsgSetTAG(pThis, rsCStrGetSzStrNoNULL(pProp->val.pStr), cstrLen(pProp->val.pStr)); + } else if(isProp("pszInputName")) { + /* we need to create a property */ + CHKiRet(prop.Construct(&myProp)); + CHKiRet(prop.SetString(myProp, rsCStrGetSzStrNoNULL(pProp->val.pStr), rsCStrLen(pProp->val.pStr))); + CHKiRet(prop.ConstructFinalize(myProp)); + MsgSetInputName(pThis, myProp); + prop.Destruct(&myProp); + } else if(isProp("pszRcvFromIP")) { + MsgSetRcvFromIPStr(pThis, rsCStrGetSzStrNoNULL(pProp->val.pStr), rsCStrLen(pProp->val.pStr), &propRcvFromIP); + prop.Destruct(&propRcvFromIP); + } else if(isProp("pszRcvFrom")) { + MsgSetRcvFromStr(pThis, rsCStrGetSzStrNoNULL(pProp->val.pStr), rsCStrLen(pProp->val.pStr), &propRcvFrom); + prop.Destruct(&propRcvFrom); + } else if(isProp("pszHOSTNAME")) { + MsgSetHOSTNAME(pThis, rsCStrGetSzStrNoNULL(pProp->val.pStr), rsCStrLen(pProp->val.pStr)); + } else if(isProp("pCSStrucData")) { + MsgSetStructuredData(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr)); + } else if(isProp("pCSAPPNAME")) { + MsgSetAPPNAME(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr)); + } else if(isProp("pCSPROCID")) { + MsgSetPROCID(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr)); + } else if(isProp("pCSMSGID")) { + MsgSetMSGID(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr)); + } else if(isProp("ttGenTime")) { + pThis->ttGenTime = pProp->val.num; + } else if(isProp("tRcvdAt")) { + memcpy(&pThis->tRcvdAt, &pProp->val.vSyslogTime, sizeof(struct syslogTime)); + } else if(isProp("tTIMESTAMP")) { + memcpy(&pThis->tTIMESTAMP, &pProp->val.vSyslogTime, sizeof(struct syslogTime)); + } else if(isProp("pszRuleset")) { + MsgSetRulesetByName(pThis, pProp->val.pStr); + } else if(isProp("pszMSG")) { + dbgprintf("no longer supported property pszMSG silently ignored\n"); + } else if(isProp("json")) { + tokener = json_tokener_new(); + json = json_tokener_parse_ex(tokener, (char*)rsCStrGetSzStrNoNULL(pProp->val.pStr), + cstrLen(pProp->val.pStr)); + json_tokener_free(tokener); + msgAddJSON(pThis, (uchar*)"!", json); + } else { + dbgprintf("unknown supported property '%s' silently ignored\n", + rsCStrGetSzStrNoNULL(pProp->pcsName)); + } + +finalize_it: + RETiRet; } +#undef isProp /* get the severity - this is an entry point that @@ -4079,11 +4041,6 @@ BEGINObjClassInit(msg, 1, OBJ_IS_CORE_MODULE) /* set our own handlers */ OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize); - /* initially, we have no need to lock message objects */ - funcLock = MsgLockingDummy; - funcUnlock = MsgLockingDummy; - funcDeleteMutex = MsgLockingDummy; - funcMsgPrepareEnqueue = MsgLockingDummy; /* some more inits */ # if HAVE_MALLOC_TRIM INIT_ATOMIC_HELPER_MUT(mutTrimCtr); diff --git a/runtime/msg.h b/runtime/msg.h index 338139be..c3acebd1 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -64,7 +64,6 @@ struct msg { once data has entered the queue, this property is no longer needed. */ pthread_mutex_t mut; int iRefCount; /* reference counter (0 = unused) */ - sbool bDoLock; /* use the mutex? */ sbool bAlreadyFreed; /* aid to help detect a well-hidden bad bug -- TODO: remove when no longer needed */ sbool bParseSuccess; /* set to reflect state of last executed higher level parser */ short iSeverity; /* the severity 0..7 */ @@ -179,7 +178,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, char *textpri(char *pRes, size_t pResLen, int pri); rsRetVal msgGetMsgVar(msg_t *pThis, cstr_t *pstrPropName, var_t **ppVar); es_str_t* msgGetMsgVarNew(msg_t *pThis, uchar *name); -rsRetVal MsgEnableThreadSafety(void); uchar *getRcvFrom(msg_t *pM); void getTAG(msg_t *pM, uchar **ppBuf, int *piLen); char *getTimeReported(msg_t *pM, enum tplFormatTypes eFmt); @@ -217,16 +215,6 @@ msgUnsetJSON(msg_t *pMsg, uchar *varname) { } -/* The MsgPrepareEnqueue() function is a macro for performance reasons. - * It needs one global variable to work. This is acceptable, as it gains - * us quite some performance and is fully abstracted using this header file. - * The important thing is that no other module is permitted to actually - * access that global variable! -- rgerhards, 2008-01-05 - */ -extern void (*funcMsgPrepareEnqueue)(msg_t *pMsg); -#define MsgPrepareEnqueue(pMsg) funcMsgPrepareEnqueue(pMsg) - - /* ------------------------------ some inline functions ------------------------------ */ /* set raw message size. This is needed in some cases where a trunctation is necessary diff --git a/runtime/obj.c b/runtime/obj.c index 03d25cdc..99ccc923 100644 --- a/runtime/obj.c +++ b/runtime/obj.c @@ -921,7 +921,9 @@ objDeserializeWithMethods(void *ppObj, uchar *pszTypeExpected, int lenTypeExpect CHKiRet(fFixup(pObj, pUsr)); /* we have a valid object, let's finalize our work and return */ - CHKiRet(objConstructFinalize(pObj)); + if(objConstructFinalize != NULL) { + CHKiRet(objConstructFinalize(pObj)); + } *((obj_t**) ppObj) = pObj; diff --git a/runtime/queue.c b/runtime/queue.c index 1ee34335..99fb5fbd 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -920,7 +920,7 @@ static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", 3, pThis->tVars.disk.pReadDeq, NULL, - NULL, msgConstructForDeserializer, msgConstructFinalizer, MsgDeserialize); + NULL, msgConstructForDeserializer, NULL, MsgDeserialize); RETiRet; } @@ -1340,7 +1340,7 @@ finalize_it: } -/* set default inisde queue object suitable for action queues. +/* set default inside queue object suitable for action queues. * This shall be called directly after queue construction. This functions has * been added in support of the new v6 config system. It expect properly pre-initialized * objects, but we need to differentiate between ruleset main and action queues. @@ -1374,6 +1374,36 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) } +/* set defaults inside queue object suitable for main/ruleset queues. + * See queueSetDefaultsActionQueue() for more details and background. + */ +void +qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) +{ + pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */ + pThis->iMaxQueueSize = 50000; /* size of the main message queue above */ + pThis->iDeqBatchSize = 1024; /* default batch size */ + pThis->iHighWtrMrk = 45000; /* high water mark for disk-assisted queues */ + pThis->iLowWtrMrk = 20000; /* low water mark for disk-assisted queues */ + pThis->iDiscardMrk = 49500; /* begin to discard messages */ + pThis->iDiscardSeverity = 8; /* turn off */ + pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ + pThis->iMaxFileSize = 16*1024*1024; + pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */ + pThis->bSyncQueueFiles = 0; + pThis->toQShutdown = 1500; /* queue shutdown */ + pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */ + pThis->toEnq = 2000; /* timeout for queue enque */ + pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */ + pThis->iMinMsgsPerWrkr = 1000; /* minimum messages per worker needed to start a new one */ + pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ + pThis->sizeOnDiskMax = 0; /* unlimited */ + pThis->iDeqSlowdown = 0; + pThis->iDeqtWinFromHr = 0; + pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ +} + + /* This function checks if the provided message shall be discarded and does so, if needed. * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to * provide real-time creation of spool files. @@ -2655,6 +2685,15 @@ qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals) return RS_RET_OK; } + +/* are any queue params set at all? 1 - yes, 0 - no */ +int +queueCnfParamsSet(struct cnfparamvals *pvals) +{ + return cnfparamvalsIsSet(&pblk, pvals); +} + + /* apply all params from param block to queue. Must be called before * finalizing. This supports the v6 config system. Defaults were already * set during queue creation. The pvals object is destructed by this diff --git a/runtime/queue.h b/runtime/queue.h index 7db2d90d..886fac8d 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -198,7 +198,9 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals); +int queueCnfParamsSet(struct cnfparamvals *pvals); rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals); +void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis); void qqueueSetDefaultsActionQueue(qqueue_t *pThis); void qqueueDbgPrint(qqueue_t *pThis); diff --git a/runtime/rsconf.c b/runtime/rsconf.c index d15d37d7..d8b81f1b 100644 --- a/runtime/rsconf.c +++ b/runtime/rsconf.c @@ -292,6 +292,9 @@ getNOW(eNOWType eNow, es_str_t **estr) case NOW_MINUTE: len = snprintf((char*) szBuf, sizeof(szBuf)/sizeof(uchar), "%2.2d", t.minute); break; + default: + len = snprintf((char*) szBuf, sizeof(szBuf)/sizeof(uchar), "*invld eNow*"); + break; } /* now create a string object out of it and hand that over to the var */ @@ -477,6 +480,9 @@ cnfGetVar(char *name, void *usrptr) estr = msgGetCEEVarNew((msg_t*) usrptr, name+2); else estr = msgGetMsgVarNew((msg_t*) usrptr, (uchar*)name+1); + } else { /* if this happens, we have a program logic error */ + estr = es_newStrFromCStr("err: var must start with $", + strlen("err: var must start with $")); } if(Debug) { char *s; @@ -753,7 +759,7 @@ activateMainQueue() { DEFiRet; /* create message queue */ - CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"))) { + CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"), NULL)) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet); FINALIZE; diff --git a/runtime/ruleset.c b/runtime/ruleset.c index bc8f5234..c8cb87fd 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -844,7 +844,7 @@ doRulesetCreateQueue(rsconf_t *conf, int *pNewVal) rsname = (conf->rulesets.pCurr->pszName == NULL) ? (uchar*) "[ruleset]" : conf->rulesets.pCurr->pszName; DBGPRINTF("adding a ruleset-specific \"main\" queue for ruleset '%s'\n", rsname); - CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rsname)); + CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rsname, NULL)); finalize_it: RETiRet; @@ -904,6 +904,7 @@ rsRetVal rulesetProcessCnf(struct cnfobj *o) { struct cnfparamvals *pvals; + struct cnfparamvals *queueParams; rsRetVal localRet; uchar *rsName = NULL; uchar *parserName; @@ -911,6 +912,7 @@ rulesetProcessCnf(struct cnfobj *o) ruleset_t *pRuleset; struct cnfarray *ar; int i; + uchar *rsname; DEFiRet; pvals = nvlstGetParams(o->nvlst, &rspblk, NULL); @@ -938,14 +940,21 @@ rulesetProcessCnf(struct cnfobj *o) /* we have only two params, so we do NOT do the usual param loop */ parserIdx = cnfparamGetIdx(&rspblk, "parser"); - if(parserIdx == -1 || !pvals[parserIdx].bUsed) - FINALIZE; + if(parserIdx != -1 && pvals[parserIdx].bUsed) { + ar = pvals[parserIdx].val.d.ar; + for(i = 0 ; i < ar->nmemb ; ++i) { + parserName = (uchar*)es_str2cstr(ar->arr[i], NULL); + doRulesetAddParser(pRuleset, parserName); + free(parserName); + } + } - ar = pvals[parserIdx].val.d.ar; - for(i = 0 ; i < ar->nmemb ; ++i) { - parserName = (uchar*)es_str2cstr(ar->arr[i], NULL); - doRulesetAddParser(pRuleset, parserName); - free(parserName); + /* pick up ruleset queue parameters */ + qqueueDoCnfParams(o->nvlst, &queueParams); + if(queueCnfParamsSet(queueParams)) { + rsname = (pRuleset->pszName == NULL) ? (uchar*) "[ruleset]" : pRuleset->pszName; + DBGPRINTF("adding a ruleset-specific \"main\" queue for ruleset '%s'\n", rsname); + CHKiRet(createMainQueue(&pRuleset->pQueue, rsname, queueParams)); } finalize_it: @@ -2114,8 +2114,14 @@ void tplPrintList(rsconf_t *conf) case tplFmtUnixDate: dbgprintf("[Format as Unix timestamp] "); break; + case tplFmtSecFrac: + dbgprintf("[fractional seconds, only] "); + break; + case tplFmtRFC3164BuggyDate: + dbgprintf("[Format as buggy RFC3164-Date] "); + break; default: - dbgprintf("[INVALID eDateFormat %d] ", pTpe->data.field.eDateFormat); + dbgprintf("[UNKNOWN eDateFormat %d] ", pTpe->data.field.eDateFormat); } switch(pTpe->data.field.eCaseConv) { case tplCaseConvNo: diff --git a/tests/queue-persist-drvr.sh b/tests/queue-persist-drvr.sh index 53fbcb8b..de597308 100755 --- a/tests/queue-persist-drvr.sh +++ b/tests/queue-persist-drvr.sh @@ -24,6 +24,7 @@ source $srcdir/diag.sh check-mainq-spool echo "#" > work-delay.conf source $srcdir/diag.sh startup queue-persist.conf source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages +./msleep 500 $srcdir/diag.sh wait-shutdown source $srcdir/diag.sh seq-check 0 4999 source $srcdir/diag.sh exit diff --git a/tools/pidfile.c b/tools/pidfile.c index e9601232..8298b94e 100644 --- a/tools/pidfile.c +++ b/tools/pidfile.c @@ -55,7 +55,8 @@ int read_pid (char *pidfile) if (!(f=fopen(pidfile,"r"))) return 0; - fscanf(f,"%d", &pid); + if(fscanf(f,"%d", &pid) != 1) + pid = 0; fclose(f); return pid; } @@ -113,7 +114,8 @@ int write_pid (char *pidfile) #if HAVE_FLOCK if (flock(fd, LOCK_EX|LOCK_NB) == -1) { - fscanf(f, "%d", &pid); + if(fscanf(f, "%d", &pid) != 1) + pid = 0; fclose(f); printf("Can't lock, lock is held by pid %d.\n", pid); return 0; diff --git a/tools/syslogd.c b/tools/syslogd.c index 5a620f80..4be8536d 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -602,7 +602,6 @@ submitMsg2(msg_t *pMsg) FINALIZE; } - MsgPrepareEnqueue(pMsg); qqueueEnqMsg(pQueue, pMsg->flowCtlType, pMsg); finalize_it: @@ -642,10 +641,6 @@ multiSubmitMsg2(multi_submit_t *pMultiSub) FINALIZE; } - for(i = 0 ; i < pMultiSub->nElem ; ++i) { - MsgPrepareEnqueue(pMultiSub->ppMsgs[i]); - } - iRet = pQueue->MultiEnq(pQueue, pMultiSub); pMultiSub->nElem = 0; @@ -727,8 +722,11 @@ static void debug_switch() * a minimal delay, but it is much cleaner than the approach of doing everything * inside the signal handler. * rgerhards, 2005-10-26 - * Note: we do not call DBGPRINTF() as this may cause us to block in case something - * with the threading is wrong. + * Note: + * - we do not call DBGPRINTF() as this may cause us to block in case something + * with the threading is wrong. + * - we do not really care about the return state of write(), but we need this + * strange check we do to silence compiler warnings (thanks, Ubuntu!) */ static void doDie(int sig) { @@ -736,11 +734,13 @@ static void doDie(int sig) # define MSG2 "DoDie called 5 times - unconditional exit\n" static int iRetries = 0; /* debug aid */ dbgprintf(MSG1); - if(Debug == DEBUG_FULL) - write(1, MSG1, sizeof(MSG1) - 1); + if(Debug == DEBUG_FULL) { + if(write(1, MSG1, sizeof(MSG1) - 1)) {} + } if(iRetries++ == 4) { - if(Debug == DEBUG_FULL) - write(1, MSG2, sizeof(MSG2) - 1); + if(Debug == DEBUG_FULL) { + if(write(1, MSG2, sizeof(MSG2) - 1)) {} + } abort(); } bFinished = sig; @@ -1059,7 +1059,7 @@ finalize_it: * the time being (remember that we want to restructure config processing at large!). * rgerhards, 2009-10-27 */ -rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName) +rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName, struct cnfparamvals *queueParams) { struct queuefilenames_s *qfn; uchar *qfname = NULL; @@ -1067,11 +1067,6 @@ rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName) uchar qfrenamebuf[1024]; DEFiRet; - /* switch the message object to threaded operation, if necessary */ - if(ourConf->globals.mainQ.MainMsgQueType == QUEUETYPE_DIRECT || ourConf->globals.mainQ.iMainMsgQueueNumWorkers > 1) { - MsgEnableThreadSafety(); - } - /* create message queue */ CHKiRet_Hdlr(qqueueConstruct(ppQueue, ourConf->globals.mainQ.MainMsgQueType, ourConf->globals.mainQ.iMainMsgQueueNumWorkers, ourConf->globals.mainQ.iMainMsgQueueSize, msgConsumer)) { /* no queue is fatal, we need to give up in that case... */ @@ -1080,60 +1075,65 @@ rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName) /* name our main queue object (it's not fatal if it fails...) */ obj.SetName((obj_t*) (*ppQueue), pszQueueName); - /* ... set some properties ... */ -# define setQPROP(func, directive, data) \ - CHKiRet_Hdlr(func(*ppQueue, data)) { \ - errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ - } -# define setQPROPstr(func, directive, data) \ - CHKiRet_Hdlr(func(*ppQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \ - errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ - } + if(queueParams == NULL) { /* use legacy parameters? */ + /* ... set some properties ... */ + # define setQPROP(func, directive, data) \ + CHKiRet_Hdlr(func(*ppQueue, data)) { \ + errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ + } + # define setQPROPstr(func, directive, data) \ + CHKiRet_Hdlr(func(*ppQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \ + errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ + } - if(ourConf->globals.mainQ.pszMainMsgQFName != NULL) { - /* check if the queue file name is unique, else emit an error */ - for(qfn = queuefilenames ; qfn != NULL ; qfn = qfn->next) { - dbgprintf("check queue file name '%s' vs '%s'\n", qfn->name, ourConf->globals.mainQ.pszMainMsgQFName ); - if(!ustrcmp(qfn->name, ourConf->globals.mainQ.pszMainMsgQFName)) { - snprintf((char*)qfrenamebuf, sizeof(qfrenamebuf), "%d-%s-%s", - ++qfn_renamenum, ourConf->globals.mainQ.pszMainMsgQFName, - (pszQueueName == NULL) ? "NONAME" : (char*)pszQueueName); - qfname = ustrdup(qfrenamebuf); - errmsg.LogError(0, NO_ERRCODE, "Error: queue file name '%s' already in use " - " - using '%s' instead", ourConf->globals.mainQ.pszMainMsgQFName, qfname); - break; + if(ourConf->globals.mainQ.pszMainMsgQFName != NULL) { + /* check if the queue file name is unique, else emit an error */ + for(qfn = queuefilenames ; qfn != NULL ; qfn = qfn->next) { + dbgprintf("check queue file name '%s' vs '%s'\n", qfn->name, ourConf->globals.mainQ.pszMainMsgQFName ); + if(!ustrcmp(qfn->name, ourConf->globals.mainQ.pszMainMsgQFName)) { + snprintf((char*)qfrenamebuf, sizeof(qfrenamebuf), "%d-%s-%s", + ++qfn_renamenum, ourConf->globals.mainQ.pszMainMsgQFName, + (pszQueueName == NULL) ? "NONAME" : (char*)pszQueueName); + qfname = ustrdup(qfrenamebuf); + errmsg.LogError(0, NO_ERRCODE, "Error: queue file name '%s' already in use " + " - using '%s' instead", ourConf->globals.mainQ.pszMainMsgQFName, qfname); + break; + } } + if(qfname == NULL) + qfname = ustrdup(ourConf->globals.mainQ.pszMainMsgQFName); + qfn = malloc(sizeof(struct queuefilenames_s)); + qfn->name = qfname; + qfn->next = queuefilenames; + queuefilenames = qfn; } - if(qfname == NULL) - qfname = ustrdup(ourConf->globals.mainQ.pszMainMsgQFName); - qfn = malloc(sizeof(struct queuefilenames_s)); - qfn->name = qfname; - qfn->next = queuefilenames; - queuefilenames = qfn; - } - setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", ourConf->globals.mainQ.iMainMsgQueMaxFileSize); - setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", ourConf->globals.mainQ.iMainMsgQueMaxDiskSpace); - setQPROP(qqueueSetiDeqBatchSize, "$MainMsgQueueDequeueBatchSize", ourConf->globals.mainQ.iMainMsgQueDeqBatchSize); - setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", qfname); - setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", ourConf->globals.mainQ.iMainMsgQPersistUpdCnt); - setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", ourConf->globals.mainQ.bMainMsgQSyncQeueFiles); - setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", ourConf->globals.mainQ.iMainMsgQtoQShutdown ); - setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", ourConf->globals.mainQ.iMainMsgQtoActShutdown); - setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", ourConf->globals.mainQ.iMainMsgQtoWrkShutdown); - setQPROP(qqueueSettoEnq, "$MainMsgQueueTimeoutEnqueue", ourConf->globals.mainQ.iMainMsgQtoEnq); - setQPROP(qqueueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", ourConf->globals.mainQ.iMainMsgQHighWtrMark); - setQPROP(qqueueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", ourConf->globals.mainQ.iMainMsgQLowWtrMark); - setQPROP(qqueueSetiDiscardMrk, "$MainMsgQueueDiscardMark", ourConf->globals.mainQ.iMainMsgQDiscardMark); - setQPROP(qqueueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", ourConf->globals.mainQ.iMainMsgQDiscardSeverity); - setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", ourConf->globals.mainQ.iMainMsgQWrkMinMsgs); - setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", ourConf->globals.mainQ.bMainMsgQSaveOnShutdown); - setQPROP(qqueueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", ourConf->globals.mainQ.iMainMsgQDeqSlowdown); - setQPROP(qqueueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", ourConf->globals.mainQ.iMainMsgQueueDeqtWinFromHr); - setQPROP(qqueueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", ourConf->globals.mainQ.iMainMsgQueueDeqtWinToHr); - -# undef setQPROP -# undef setQPROPstr + setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", ourConf->globals.mainQ.iMainMsgQueMaxFileSize); + setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", ourConf->globals.mainQ.iMainMsgQueMaxDiskSpace); + setQPROP(qqueueSetiDeqBatchSize, "$MainMsgQueueDequeueBatchSize", ourConf->globals.mainQ.iMainMsgQueDeqBatchSize); + setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", qfname); + setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", ourConf->globals.mainQ.iMainMsgQPersistUpdCnt); + setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", ourConf->globals.mainQ.bMainMsgQSyncQeueFiles); + setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", ourConf->globals.mainQ.iMainMsgQtoQShutdown ); + setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", ourConf->globals.mainQ.iMainMsgQtoActShutdown); + setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", ourConf->globals.mainQ.iMainMsgQtoWrkShutdown); + setQPROP(qqueueSettoEnq, "$MainMsgQueueTimeoutEnqueue", ourConf->globals.mainQ.iMainMsgQtoEnq); + setQPROP(qqueueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", ourConf->globals.mainQ.iMainMsgQHighWtrMark); + setQPROP(qqueueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", ourConf->globals.mainQ.iMainMsgQLowWtrMark); + setQPROP(qqueueSetiDiscardMrk, "$MainMsgQueueDiscardMark", ourConf->globals.mainQ.iMainMsgQDiscardMark); + setQPROP(qqueueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", ourConf->globals.mainQ.iMainMsgQDiscardSeverity); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", ourConf->globals.mainQ.iMainMsgQWrkMinMsgs); + setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", ourConf->globals.mainQ.bMainMsgQSaveOnShutdown); + setQPROP(qqueueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", ourConf->globals.mainQ.iMainMsgQDeqSlowdown); + setQPROP(qqueueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", ourConf->globals.mainQ.iMainMsgQueueDeqtWinFromHr); + setQPROP(qqueueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", ourConf->globals.mainQ.iMainMsgQueueDeqtWinToHr); + + # undef setQPROP + # undef setQPROPstr + } else { /* use new style config! */ + qqueueSetDefaultsRulesetQueue(*ppQueue); + qqueueApplyCnfParam(*ppQueue, queueParams); + } /* ... and finally start the queue! */ CHKiRet_Hdlr(qqueueStart(*ppQueue)) { |