summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog69
-rw-r--r--action.c8
-rw-r--r--configure.ac27
-rw-r--r--dirty.h2
-rw-r--r--grammar/rainerscript.c24
-rw-r--r--grammar/rainerscript.h1
-rw-r--r--plugins/imuxsock/imuxsock.c10
-rw-r--r--plugins/omelasticsearch/Makefile.am2
-rw-r--r--runtime/datetime.c7
-rw-r--r--runtime/msg.c221
-rw-r--r--runtime/msg.h12
-rw-r--r--runtime/obj.c4
-rw-r--r--runtime/queue.c43
-rw-r--r--runtime/queue.h2
-rw-r--r--runtime/rsconf.c8
-rw-r--r--runtime/ruleset.c25
-rw-r--r--template.c8
-rwxr-xr-xtests/queue-persist-drvr.sh1
-rw-r--r--tools/pidfile.c6
-rw-r--r--tools/syslogd.c134
20 files changed, 351 insertions, 263 deletions
diff --git a/ChangeLog b/ChangeLog
index 38364d92..d86526e9 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -21,6 +21,21 @@ Version 7.3.5 [devel] 2012-11-??
- bugfix(kind of): script optimizer did not work for complex boolean
expressions
- doc bugfix: corrections and improvements in mmnormalize html doc page
+- bugfix: some message properties could be garbled due to race condition
+ This happened only on very high volume systems, if the same message was
+ being processed by two different actions. This was a regression caused
+ by the new config processor, which did no longer properly enable msg
+ locking in multithreaded cases. The bugfix is actually a refactoring of
+ the msg locking code - we no longer do unlocked operations, as the use
+ case for it has mostly gone away. It is potentially possible only at
+ very low-end systems, and there the small additional overhead of doing
+ the locking does not really hurt. Instead, the removal of that
+ capability can actually slightly improve performance in common cases,
+ as the code path is smaller and requires slightly less memory writes.
+ That probably outperforms the extra locking overhead (which in the
+ low-end case always happens in user space, without need for kernel
+ support as we can always directly aquire the lock - there is no
+ contention at all).
---------------------------------------------------------------------------
Version 7.3.4 [devel] 2012-11-23
- further (and rather drastically) improved disk queue performance
@@ -96,9 +111,26 @@ Version 7.3.0 [devel] 2012-10-09
This was achieved by somewhat reducing the robustness of the zip archive.
This is controlled by the new action parameter "VeryReliableZip".
----------------------------------------------------------------------------
-Version 7.2.4 [v7-stable] 2012-10-??
+Version 7.2.5 [v7-stable] 2013-01-??
+- build system cleanup (thanks to Michael Biebl for this!)
+- bugfix: omelasticsearch did not properly compile on some platforms
+ due to missing libmath. Thanks to Michael Biebl for the fix
+- bugfix: invalid DST handling under Solaris
+ Thanks to Scott Severtson for the patch.
+----------------------------------------------------------------------------
+Version 7.2.4 [v7-stable] 2012-12-07
+- enhance: permit RFC3339 timestamp in local log socket messages
+ Thanks to Sebastien Ponce for the patch.
- imklog: added ParseKernelTimestamp parameter (import from 5.10.2)
Thanks to Marius Tomaschewski for the patch.
+- fix missing functionality: ruleset(){} could not specify ruleset queue
+ The "queue.xxx" parameter set was not supported, and legacy ruleset
+ config statements did not work (by intention). The fix introduces the
+ "queue.xxx" parameter set. It has some regression potential, but only
+ for the new functionality. Note that using that interface it is possible
+ to specify duplicate queue file names, which will cause trouble. This
+ will be solved in v7.3, because there is a too-large regression
+ potential for the v7.2 stable branch.
- imklog: added KeepKernelTimestamp parameter (import from 5.10.2)
Thanks to Marius Tomaschewski for the patch.
- bugfix: imklog mistakenly took kernel timestamp subseconds as nanoseconds
@@ -112,6 +144,21 @@ Version 7.2.4 [v7-stable] 2012-10-??
That invalid error message could be quite misleading and actually stop
people from addressing the real problem (aka "go nuts" ;))
- bugfix: template "type" parameter is mandatory (but was not)
+- bugfix: some message properties could be garbled due to race condition
+ This happened only on very high volume systems, if the same message was
+ being processed by two different actions. This was a regression caused
+ by the new config processor, which did no longer properly enable msg
+ locking in multithreaded cases. The bugfix is actually a refactoring of
+ the msg locking code - we no longer do unlocked operations, as the use
+ case for it has mostly gone away. It is potentially possible only at
+ very low-end systems, and there the small additional overhead of doing
+ the locking does not really hurt. Instead, the removal of that
+ capability can actually slightly improve performance in common cases,
+ as the code path is smaller and requires slightly less memory writes.
+ That probably outperforms the extra locking overhead (which in the
+ low-end case always happens in user space, without need for kernel
+ support as we can always directly aquire the lock - there is no
+ contention at all).
----------------------------------------------------------------------------
Version 7.2.3 [v7-stable] 2012-10-21
- regression fix: rsyslogd terminated when wild-card $IncludeConfig did not
@@ -206,7 +253,6 @@ Version 7.1.12 [beta] 2012-10-18
This happened only under some circumstances. Thanks to Marius
Tomaschewski and Florian Piekert for their help in solving this issue.
----------------------------------------------------------------------------
->>>>>>> b151584d0929759284c0fb0399709e5ca0e29d60
Version 7.1.11 [beta] 2012-10-16
- bugfix: imuxsock truncated head of received message
This happened only under some circumstances. Thanks to Marius
@@ -362,6 +408,23 @@ Version 6.6.1 [v6-stable] 2012-10-??
- bugfix: hostname set in rsyslog.conf was not picked up until HUP
which could also mean "never" or "not for a very long time".
Thanks to oxpa for providing analysis and a patch
+- bugfix: some message properties could be garbled due to race condition
+ This happened only on very high volume systems, if the same message was
+ being processed by two different actions. This was a regression caused
+ by the new config processor, which did no longer properly enable msg
+ locking in multithreaded cases. The bugfix is actually a refactoring of
+ the msg locking code - we no longer do unlocked operations, as the use
+ case for it has mostly gone away. It is potentially possible only at
+ very low-end systems, and there the small additional overhead of doing
+ the locking does not really hurt. Instead, the removal of that
+ capability can actually slightly improve performance in common cases,
+ as the code path is smaller and requires slightly less memory writes.
+ That probably outperforms the extra locking overhead (which in the
+ low-end case always happens in user space, without need for kernel
+ support as we can always directly aquire the lock - there is no
+ contention at all).
+- bugfix: invalid DST handling under Solaris
+ Thanks to Scott Severtson for the patch.
---------------------------------------------------------------------------
Version 6.6.0 [v6-stable] 2012-10-22
This starts a new stable branch, based on the 6.5.x series, plus:
@@ -1035,6 +1098,8 @@ Version 5.10.2 [V5-STABLE], 201?-??-??
... actually, they are microseconds. So the fractional part of the
timestamp was not properly formatted.
Thanks to Marius Tomaschewski for the bug report and the patch idea.
+- bugfix: invalid DST handling under Solaris
+ Thanks to Scott Severtson for the patch.
---------------------------------------------------------------------------
Version 5.10.1 [V5-STABLE], 2012-10-17
- bugfix: imuxsock and imklog truncated head of received message
diff --git a/action.c b/action.c
index 856488d3..ccc07061 100644
--- a/action.c
+++ b/action.c
@@ -421,14 +421,6 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
pThis->submitToActQ = doSubmitToActionQBatch;
}
- /* we need to make a safety check: if the queue is NOT in direct mode, a single
- * message object may be accessed by multiple threads. As such, we need to enable
- * msg object thread safety in this case (this costs a bit performance and thus
- * is not enabled by default. -- rgerhards, 2008-02-20
- */
- if(cs.ActionQueType != QUEUETYPE_DIRECT)
- MsgEnableThreadSafety();
-
/* create queue */
/* action queues always (for now) have just one worker. This may change when
* we begin to implement an interface the enable output modules to request
diff --git a/configure.ac b/configure.ac
index 6646f8ab..f2fffc34 100644
--- a/configure.ac
+++ b/configure.ac
@@ -243,22 +243,7 @@ AM_CONDITIONAL(ENABLE_GSSAPI, test x$enable_gssapi_krb5 = xyes)
# multithreading via pthreads
-AC_ARG_ENABLE(pthreads,
- [AS_HELP_STRING([--enable-pthreads],[Enable multithreading via pthreads @<:@default=yes@:>@])],
- [case "${enableval}" in
- yes) enable_pthreads="yes" ;;
- no) enable_pthreads="no" ;;
- *) AC_MSG_ERROR(bad value ${enableval} for --enable-pthreads) ;;
- esac],
- [enable_pthreads=yes]
-)
-
-if test "x$enable_pthreads" = "xno"; then
- AC_MSG_ERROR(rsyslog v3+ does no longer support single threading mode -- use a previous version for that);
-fi
-
-if test "x$enable_pthreads" != "xno"; then
- AC_CHECK_HEADERS(
+AC_CHECK_HEADERS(
[pthread.h],
[
AC_CHECK_LIB(
@@ -279,8 +264,7 @@ if test "x$enable_pthreads" != "xno"; then
)
],
[AC_MSG_FAILURE([pthread is missing])]
- )
-fi
+)
AC_CHECK_FUNCS(
[pthread_setschedparam],
@@ -711,13 +695,13 @@ AC_SUBST(SNMP_LIBS)
AC_ARG_ENABLE(uuid,
[AS_HELP_STRING([--enable-uuid],[Enable support for uuid generation @<:@default=yes@:>@])],
[case "${enableval}" in
- yes) enable_elasticsearch="yes" ;;
- no) enable_elasticsearch="no" ;;
+ yes) enable_uuid="yes" ;;
+ no) enable_uuid="no" ;;
*) AC_MSG_ERROR(bad value ${enableval} for --enable-uuid) ;;
esac],
[enable_uuid=yes]
)
-if test "x$enable_elasticsearch" = "xyes"; then
+if test "x$enable_uuid" = "xyes"; then
PKG_CHECK_MODULES([LIBUUID], [uuid])
AC_DEFINE(USE_LIBUUID, 1, [Define if you want to enable libuuid support])
fi
@@ -736,6 +720,7 @@ AC_ARG_ENABLE(elasticsearch,
)
if test "x$enable_elasticsearch" = "xyes"; then
PKG_CHECK_MODULES([CURL], [libcurl])
+ LT_LIB_M
fi
AM_CONDITIONAL(ENABLE_ELASTICSEARCH, test x$enable_elasticsearch = xyes)
diff --git a/dirty.h b/dirty.h
index 743632a2..4db2e3ce 100644
--- a/dirty.h
+++ b/dirty.h
@@ -35,7 +35,7 @@ rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub);
rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags);
rsRetVal __attribute__((deprecated)) parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset);
rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */
-rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName);
+rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName, struct cnfparamvals *queueParams);
extern int MarkInterval;
extern qqueue_t *pMsgQueue; /* the main message queue */
diff --git a/grammar/rainerscript.c b/grammar/rainerscript.c
index 8f20bfcd..bacf851b 100644
--- a/grammar/rainerscript.c
+++ b/grammar/rainerscript.c
@@ -953,6 +953,30 @@ nvlstGetParams(struct nvlst *lst, struct cnfparamblk *params,
}
+/* check if at least one cnfparamval is actually set
+ * returns 1 if so, 0 otherwise
+ */
+int
+cnfparamvalsIsSet(struct cnfparamblk *params, struct cnfparamvals *vals)
+{
+ int i;
+
+ if(vals == NULL)
+ return 0;
+ if(params->version != CNFPARAMBLK_VERSION) {
+ dbgprintf("nvlstGetParams: invalid param block version "
+ "%d, expected %d\n",
+ params->version, CNFPARAMBLK_VERSION);
+ return 0;
+ }
+ for(i = 0 ; i < params->nParams ; ++i) {
+ if(vals[i].bUsed)
+ return 1;
+ }
+ return 0;
+}
+
+
void
cnfparamsPrint(struct cnfparamblk *params, struct cnfparamvals *vals)
{
diff --git a/grammar/rainerscript.h b/grammar/rainerscript.h
index 5cfce795..59ce53f3 100644
--- a/grammar/rainerscript.h
+++ b/grammar/rainerscript.h
@@ -313,6 +313,7 @@ int cnfparamGetIdx(struct cnfparamblk *params, char *name);
struct cnfparamvals* nvlstGetParams(struct nvlst *lst, struct cnfparamblk *params,
struct cnfparamvals *vals);
void cnfparamsPrint(struct cnfparamblk *params, struct cnfparamvals *vals);
+int cnfparamvalsIsSet(struct cnfparamblk *params, struct cnfparamvals *vals);
void varDelete(struct var *v);
void cnfparamvalsDestruct(struct cnfparamvals *paramvals, struct cnfparamblk *blk);
struct cnfstmt * cnfstmtNew(unsigned s_type);
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index d0055b7a..66a1c648 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -859,15 +859,19 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
/* in this case, we still need to find out if we have a valid
* datestamp or not .. and advance the parse pointer accordingly.
*/
- datetime.ParseTIMESTAMP3164(&dummyTS, &parse, &lenMsg);
+ if (datetime.ParseTIMESTAMP3339(&dummyTS, &parse, &lenMsg) != RS_RET_OK) {
+ datetime.ParseTIMESTAMP3164(&dummyTS, &parse, &lenMsg);
+ }
} else {
- if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) {
+ if(datetime.ParseTIMESTAMP3339(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK &&
+ datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) {
DBGPRINTF("we have a problem, invalid timestamp in msg!\n");
}
}
} else { /* if we pulled the time from the system, we need to update the message text */
uchar *tmpParse = parse; /* just to check correctness of TS */
- if(datetime.ParseTIMESTAMP3164(&dummyTS, &tmpParse, &lenMsg) == RS_RET_OK) {
+ if(datetime.ParseTIMESTAMP3339(&dummyTS, &tmpParse, &lenMsg) == RS_RET_OK ||
+ datetime.ParseTIMESTAMP3164(&dummyTS, &tmpParse, &lenMsg) == RS_RET_OK) {
/* We modify the message only if it contained a valid timestamp,
* otherwise we do not touch it at all. */
datetime.formatTimestamp3164(&st, (char*)parse, 0);
diff --git a/plugins/omelasticsearch/Makefile.am b/plugins/omelasticsearch/Makefile.am
index 059bdf8a..ba85a896 100644
--- a/plugins/omelasticsearch/Makefile.am
+++ b/plugins/omelasticsearch/Makefile.am
@@ -4,6 +4,6 @@ pkglib_LTLIBRARIES = omelasticsearch.la
omelasticsearch_la_SOURCES = omelasticsearch.c cJSON/cjson.c cJSON/cjson.h
omelasticsearch_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
omelasticsearch_la_LDFLAGS = -module -avoid-version
-omelasticsearch_la_LIBADD = $(CURL_LIBS)
+omelasticsearch_la_LIBADD = $(CURL_LIBS) $(LIBM)
EXTRA_DIST =
diff --git a/runtime/datetime.c b/runtime/datetime.c
index 10ab3c64..c03abab9 100644
--- a/runtime/datetime.c
+++ b/runtime/datetime.c
@@ -77,7 +77,7 @@ timeval2syslogTime(struct timeval *tp, struct syslogTime *t)
/* Solaris uses a different method of exporting the time zone.
* It is UTC - localtime, which is the opposite sign of mins east of GMT.
*/
- lBias = -(daylight ? altzone : timezone);
+ lBias = -(tm->tm_isdst ? altzone : timezone);
# elif defined(__hpux)
lBias = tz.tz_dsttime ? - tz.tz_minuteswest : 0;
# else
@@ -900,6 +900,11 @@ time_t syslogTime2time_t(struct syslogTime *ts)
case 12:
MonthInDays = 334; //until 01 of December
break;
+ default: /* this cannot happen (and would be a program error)
+ * but we need the code to keep the compiler silent.
+ */
+ MonthInDays = 0; /* any value fits ;) */
+ break;
}
diff --git a/runtime/msg.c b/runtime/msg.c
index d648bb92..ce863299 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -302,120 +302,20 @@ static uchar * jsonPathGetLeaf(uchar *name, int lenName);
static struct json_object *jsonDeepCopy(struct json_object *src);
-/* The following functions will support advanced output module
- * multithreading, once this is implemented. Currently, we
- * include them as hooks only. The idea is that we need to guard
- * some msg objects data fields against concurrent access if
- * we run on multiple threads. Please note that in any case this
- * is not necessary for calls from INPUT modules, because they
- * construct the message object and do this serially. Only when
- * the message is in the processing queue, multiple threads may
- * access a single object. Consequently, there are no guard functions
- * for "set" methods, as these are called during input. Only "get"
- * functions that modify important structures have them.
- * rgerhards, 2007-07-20
- * We now support locked and non-locked operations, depending on
- * the configuration of rsyslog. To support this, we use function
- * pointers. Initially, we start in non-locked mode. There, all
- * locking operations call into dummy functions. When locking is
- * enabled, the function pointers are changed to functions doing
- * actual work. We also introduced another MsgPrepareEnqueue() function
- * which initializes the locking structures, if needed. This is
- * necessary because internal messages during config file startup
- * processing are always created in non-locking mode. So we can
- * not initialize locking structures during constructions. We now
- * postpone this until when the message is fully constructed and
- * enqueued. Then we know the status of locking. This has a nice
- * side effect, and that is that during the initial creation of
- * the Msg object no locking needs to be done, which results in better
- * performance. -- rgerhards, 2008-01-05
- */
-static void (*funcLock)(msg_t *pMsg);
-static void (*funcUnlock)(msg_t *pMsg);
-static void (*funcDeleteMutex)(msg_t *pMsg);
-void (*funcMsgPrepareEnqueue)(msg_t *pMsg);
-#if 1 /* This is a debug aid */
-#define MsgLock(pMsg) funcLock(pMsg)
-#define MsgUnlock(pMsg) funcUnlock(pMsg)
-#else
-#define MsgLock(pMsg) {dbgprintf("MsgLock line %d\n - ", __LINE__); funcLock(pMsg);; }
-#define MsgUnlock(pMsg) {dbgprintf("MsgUnlock line %d - ", __LINE__); funcUnlock(pMsg); }
-#endif
-
-/* the next function is a dummy to be used by the looking functions
- * when the class is not yet running in an environment where locking
- * is necessary. Please note that the need to lock can (and will) change
- * during a single run. Typically, this is depending on the operation mode
- * of the message queues (which is operator-configurable). -- rgerhards, 2008-01-05
- */
-static void MsgLockingDummy(msg_t __attribute__((unused)) *pMsg)
-{
- /* empty be design */
-}
-
-
-/* The following function prepares a message for enqueue into the queue. This is
- * where a message may be accessed by multiple threads. This implementation here
- * is the version for multiple concurrent acces. It initializes the locking
- * structures.
- * TODO: change to an iRet interface! -- rgerhards, 2008-07-14
- */
-static void MsgPrepareEnqueueLockingCase(msg_t *pThis)
-{
- BEGINfunc
- assert(pThis != NULL);
- pthread_mutex_init(&pThis->mut, NULL);
- pThis->bDoLock = 1;
- ENDfunc
-}
-
-
-/* ... and now the locking and unlocking implementations: */
-static void MsgLockLockingCase(msg_t *pThis)
+/* the locking and unlocking implementations: */
+static inline void
+MsgLock(msg_t *pThis)
{
/* DEV debug only! dbgprintf("MsgLock(0x%lx)\n", (unsigned long) pThis); */
- assert(pThis != NULL);
- if(pThis->bDoLock == 1) /* TODO: this is a testing hack, we should find a way with better performance! -- rgerhards, 2009-01-27 */
- pthread_mutex_lock(&pThis->mut);
+ pthread_mutex_lock(&pThis->mut);
}
-
-static void MsgUnlockLockingCase(msg_t *pThis)
+static inline void
+MsgUnlock(msg_t *pThis)
{
/* DEV debug only! dbgprintf("MsgUnlock(0x%lx)\n", (unsigned long) pThis); */
- assert(pThis != NULL);
- if(pThis->bDoLock == 1) /* TODO: this is a testing hack, we should find a way with better performance! -- rgerhards, 2009-01-27 */
- pthread_mutex_unlock(&pThis->mut);
+ pthread_mutex_unlock(&pThis->mut);
}
-/* delete the mutex object on message destruction (locking case)
- */
-static void MsgDeleteMutexLockingCase(msg_t *pThis)
-{
- assert(pThis != NULL);
- pthread_mutex_destroy(&pThis->mut);
-}
-
-/* enable multiple concurrent access on the message object
- * This works on a class-wide basis and can bot be undone.
- * That is, if it is once enabled, it can not be disabled during
- * the same run. When this function is called, no other thread
- * must manipulate message objects. Then we would have race conditions,
- * but guarding against this is counter-productive because it
- * would cost additional time. Plus, it would be a programming error.
- * rgerhards, 2008-01-05
- */
-rsRetVal MsgEnableThreadSafety(void)
-{
- DEFiRet;
- funcLock = MsgLockLockingCase;
- funcUnlock = MsgUnlockLockingCase;
- funcMsgPrepareEnqueue = MsgPrepareEnqueueLockingCase;
- funcDeleteMutex = MsgDeleteMutexLockingCase;
- RETiRet;
-}
-
-/* end locking functions */
-
/* rgerhards 2012-04-18: set associated ruleset (by ruleset name)
* If ruleset cannot be found, no update is done.
@@ -728,8 +628,6 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis)
/* initialize members in ORDER they appear in structure (think "cache line"!) */
pM->flowCtlType = 0;
- pM->bDoLock = 0;
- pM->bAlreadyFreed = 0;
pM->bParseSuccess = 0;
pM->iRefCount = 1;
pM->iSeverity = -1;
@@ -772,6 +670,7 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis)
pM->pszTIMESTAMP_Unix[0] = '\0';
pM->pszRcvdAt_Unix[0] = '\0';
pM->pszUUID = NULL;
+ pthread_mutex_init(&pM->mut, NULL);
/* DEV debugging only! dbgprintf("msgConstruct\t0x%x, ref 1\n", (int)pM);*/
@@ -874,15 +773,6 @@ CODESTARTobjDestruct(msg)
if(currRefCount == 0)
{
/* DEV Debugging Only! dbgprintf("msgDestruct\t0x%lx, RefCount now 0, doing DESTROY\n", (unsigned long)pThis); */
- /* The if below is included to try to nail down a well-hidden bug causing
- * segfaults. I hope that do to the test code the problem is sooner detected and
- * thus we get better data for debugging and resolving it. -- rgerhards, 2011-02-23.
- * TODO: remove when no longer needed.
- */
- if(pThis->bAlreadyFreed)
- abort();
- pThis->bAlreadyFreed = 1;
- /* end debug code */
if(pThis->pszRawMsg != pThis->szRawMsg)
free(pThis->pszRawMsg);
freeTAG(pThis);
@@ -920,7 +810,7 @@ CODESTARTobjDestruct(msg)
# ifndef HAVE_ATOMIC_BUILTINS
MsgUnlock(pThis);
# endif
- funcDeleteMutex(pThis);
+ pthread_mutex_destroy(&pThis->mut);
/* now we need to do our own optimization. Testing has shown that at least the glibc
* malloc() subsystem returns memory to the OS far too late in our case. So we need
* to help it a bit, by calling malloc_trim(), which will tell the alloc subsystem
@@ -3759,17 +3649,89 @@ msgGetMsgVarNew(msg_t *pThis, uchar *name)
}
-/* This is a construction finalizer that must be called after all properties
- * have been set. It does some final work on the message object. After this
- * is done, the object is considered ready for full processing.
- * rgerhards, 2008-07-08
+/* This function can be used as a generic way to set properties.
+ * We have to handle a lot of legacy, so our return value is not always
+ * 100% correct (called functions do not always provide one, should
+ * change over time).
+ * rgerhards, 2008-01-07
*/
-rsRetVal
-msgConstructFinalizer(msg_t *pThis)
+#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1)
+rsRetVal MsgSetProperty(msg_t *pThis, var_t *pProp)
{
- MsgPrepareEnqueue(pThis);
- return RS_RET_OK;
+ prop_t *myProp;
+ prop_t *propRcvFrom = NULL;
+ prop_t *propRcvFromIP = NULL;
+ struct json_tokener *tokener;
+ struct json_object *json;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, msg);
+ assert(pProp != NULL);
+
+ if(isProp("iProtocolVersion")) {
+ setProtocolVersion(pThis, pProp->val.num);
+ } else if(isProp("iSeverity")) {
+ pThis->iSeverity = pProp->val.num;
+ } else if(isProp("iFacility")) {
+ pThis->iFacility = pProp->val.num;
+ } else if(isProp("msgFlags")) {
+ pThis->msgFlags = pProp->val.num;
+ } else if(isProp("offMSG")) {
+ MsgSetMSGoffs(pThis, pProp->val.num);
+ } else if(isProp("pszRawMsg")) {
+ MsgSetRawMsg(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr), cstrLen(pProp->val.pStr));
+ } else if(isProp("pszUxTradMsg")) {
+ /*IGNORE*/; /* this *was* a property, but does no longer exist */
+ } else if(isProp("pszTAG")) {
+ MsgSetTAG(pThis, rsCStrGetSzStrNoNULL(pProp->val.pStr), cstrLen(pProp->val.pStr));
+ } else if(isProp("pszInputName")) {
+ /* we need to create a property */
+ CHKiRet(prop.Construct(&myProp));
+ CHKiRet(prop.SetString(myProp, rsCStrGetSzStrNoNULL(pProp->val.pStr), rsCStrLen(pProp->val.pStr)));
+ CHKiRet(prop.ConstructFinalize(myProp));
+ MsgSetInputName(pThis, myProp);
+ prop.Destruct(&myProp);
+ } else if(isProp("pszRcvFromIP")) {
+ MsgSetRcvFromIPStr(pThis, rsCStrGetSzStrNoNULL(pProp->val.pStr), rsCStrLen(pProp->val.pStr), &propRcvFromIP);
+ prop.Destruct(&propRcvFromIP);
+ } else if(isProp("pszRcvFrom")) {
+ MsgSetRcvFromStr(pThis, rsCStrGetSzStrNoNULL(pProp->val.pStr), rsCStrLen(pProp->val.pStr), &propRcvFrom);
+ prop.Destruct(&propRcvFrom);
+ } else if(isProp("pszHOSTNAME")) {
+ MsgSetHOSTNAME(pThis, rsCStrGetSzStrNoNULL(pProp->val.pStr), rsCStrLen(pProp->val.pStr));
+ } else if(isProp("pCSStrucData")) {
+ MsgSetStructuredData(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr));
+ } else if(isProp("pCSAPPNAME")) {
+ MsgSetAPPNAME(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr));
+ } else if(isProp("pCSPROCID")) {
+ MsgSetPROCID(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr));
+ } else if(isProp("pCSMSGID")) {
+ MsgSetMSGID(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr));
+ } else if(isProp("ttGenTime")) {
+ pThis->ttGenTime = pProp->val.num;
+ } else if(isProp("tRcvdAt")) {
+ memcpy(&pThis->tRcvdAt, &pProp->val.vSyslogTime, sizeof(struct syslogTime));
+ } else if(isProp("tTIMESTAMP")) {
+ memcpy(&pThis->tTIMESTAMP, &pProp->val.vSyslogTime, sizeof(struct syslogTime));
+ } else if(isProp("pszRuleset")) {
+ MsgSetRulesetByName(pThis, pProp->val.pStr);
+ } else if(isProp("pszMSG")) {
+ dbgprintf("no longer supported property pszMSG silently ignored\n");
+ } else if(isProp("json")) {
+ tokener = json_tokener_new();
+ json = json_tokener_parse_ex(tokener, (char*)rsCStrGetSzStrNoNULL(pProp->val.pStr),
+ cstrLen(pProp->val.pStr));
+ json_tokener_free(tokener);
+ msgAddJSON(pThis, (uchar*)"!", json);
+ } else {
+ dbgprintf("unknown supported property '%s' silently ignored\n",
+ rsCStrGetSzStrNoNULL(pProp->pcsName));
+ }
+
+finalize_it:
+ RETiRet;
}
+#undef isProp
/* get the severity - this is an entry point that
@@ -4079,11 +4041,6 @@ BEGINObjClassInit(msg, 1, OBJ_IS_CORE_MODULE)
/* set our own handlers */
OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize);
- /* initially, we have no need to lock message objects */
- funcLock = MsgLockingDummy;
- funcUnlock = MsgLockingDummy;
- funcDeleteMutex = MsgLockingDummy;
- funcMsgPrepareEnqueue = MsgLockingDummy;
/* some more inits */
# if HAVE_MALLOC_TRIM
INIT_ATOMIC_HELPER_MUT(mutTrimCtr);
diff --git a/runtime/msg.h b/runtime/msg.h
index 338139be..c3acebd1 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -64,7 +64,6 @@ struct msg {
once data has entered the queue, this property is no longer needed. */
pthread_mutex_t mut;
int iRefCount; /* reference counter (0 = unused) */
- sbool bDoLock; /* use the mutex? */
sbool bAlreadyFreed; /* aid to help detect a well-hidden bad bug -- TODO: remove when no longer needed */
sbool bParseSuccess; /* set to reflect state of last executed higher level parser */
short iSeverity; /* the severity 0..7 */
@@ -179,7 +178,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
char *textpri(char *pRes, size_t pResLen, int pri);
rsRetVal msgGetMsgVar(msg_t *pThis, cstr_t *pstrPropName, var_t **ppVar);
es_str_t* msgGetMsgVarNew(msg_t *pThis, uchar *name);
-rsRetVal MsgEnableThreadSafety(void);
uchar *getRcvFrom(msg_t *pM);
void getTAG(msg_t *pM, uchar **ppBuf, int *piLen);
char *getTimeReported(msg_t *pM, enum tplFormatTypes eFmt);
@@ -217,16 +215,6 @@ msgUnsetJSON(msg_t *pMsg, uchar *varname) {
}
-/* The MsgPrepareEnqueue() function is a macro for performance reasons.
- * It needs one global variable to work. This is acceptable, as it gains
- * us quite some performance and is fully abstracted using this header file.
- * The important thing is that no other module is permitted to actually
- * access that global variable! -- rgerhards, 2008-01-05
- */
-extern void (*funcMsgPrepareEnqueue)(msg_t *pMsg);
-#define MsgPrepareEnqueue(pMsg) funcMsgPrepareEnqueue(pMsg)
-
-
/* ------------------------------ some inline functions ------------------------------ */
/* set raw message size. This is needed in some cases where a trunctation is necessary
diff --git a/runtime/obj.c b/runtime/obj.c
index 03d25cdc..99ccc923 100644
--- a/runtime/obj.c
+++ b/runtime/obj.c
@@ -921,7 +921,9 @@ objDeserializeWithMethods(void *ppObj, uchar *pszTypeExpected, int lenTypeExpect
CHKiRet(fFixup(pObj, pUsr));
/* we have a valid object, let's finalize our work and return */
- CHKiRet(objConstructFinalize(pObj));
+ if(objConstructFinalize != NULL) {
+ CHKiRet(objConstructFinalize(pObj));
+ }
*((obj_t**) ppObj) = pObj;
diff --git a/runtime/queue.c b/runtime/queue.c
index 1ee34335..99fb5fbd 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -920,7 +920,7 @@ static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg)
{
DEFiRet;
iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", 3, pThis->tVars.disk.pReadDeq, NULL,
- NULL, msgConstructForDeserializer, msgConstructFinalizer, MsgDeserialize);
+ NULL, msgConstructForDeserializer, NULL, MsgDeserialize);
RETiRet;
}
@@ -1340,7 +1340,7 @@ finalize_it:
}
-/* set default inisde queue object suitable for action queues.
+/* set default inside queue object suitable for action queues.
* This shall be called directly after queue construction. This functions has
* been added in support of the new v6 config system. It expect properly pre-initialized
* objects, but we need to differentiate between ruleset main and action queues.
@@ -1374,6 +1374,36 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
}
+/* set defaults inside queue object suitable for main/ruleset queues.
+ * See queueSetDefaultsActionQueue() for more details and background.
+ */
+void
+qqueueSetDefaultsRulesetQueue(qqueue_t *pThis)
+{
+ pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */
+ pThis->iMaxQueueSize = 50000; /* size of the main message queue above */
+ pThis->iDeqBatchSize = 1024; /* default batch size */
+ pThis->iHighWtrMrk = 45000; /* high water mark for disk-assisted queues */
+ pThis->iLowWtrMrk = 20000; /* low water mark for disk-assisted queues */
+ pThis->iDiscardMrk = 49500; /* begin to discard messages */
+ pThis->iDiscardSeverity = 8; /* turn off */
+ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
+ pThis->iMaxFileSize = 16*1024*1024;
+ pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */
+ pThis->bSyncQueueFiles = 0;
+ pThis->toQShutdown = 1500; /* queue shutdown */
+ pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */
+ pThis->toEnq = 2000; /* timeout for queue enque */
+ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */
+ pThis->iMinMsgsPerWrkr = 1000; /* minimum messages per worker needed to start a new one */
+ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+ pThis->sizeOnDiskMax = 0; /* unlimited */
+ pThis->iDeqSlowdown = 0;
+ pThis->iDeqtWinFromHr = 0;
+ pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
+}
+
+
/* This function checks if the provided message shall be discarded and does so, if needed.
* In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
* provide real-time creation of spool files.
@@ -2655,6 +2685,15 @@ qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals)
return RS_RET_OK;
}
+
+/* are any queue params set at all? 1 - yes, 0 - no */
+int
+queueCnfParamsSet(struct cnfparamvals *pvals)
+{
+ return cnfparamvalsIsSet(&pblk, pvals);
+}
+
+
/* apply all params from param block to queue. Must be called before
* finalizing. This supports the v6 config system. Defaults were already
* set during queue creation. The pvals object is destructed by this
diff --git a/runtime/queue.h b/runtime/queue.h
index 7db2d90d..886fac8d 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -198,7 +198,9 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals);
+int queueCnfParamsSet(struct cnfparamvals *pvals);
rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals);
+void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis);
void qqueueSetDefaultsActionQueue(qqueue_t *pThis);
void qqueueDbgPrint(qqueue_t *pThis);
diff --git a/runtime/rsconf.c b/runtime/rsconf.c
index d15d37d7..d8b81f1b 100644
--- a/runtime/rsconf.c
+++ b/runtime/rsconf.c
@@ -292,6 +292,9 @@ getNOW(eNOWType eNow, es_str_t **estr)
case NOW_MINUTE:
len = snprintf((char*) szBuf, sizeof(szBuf)/sizeof(uchar), "%2.2d", t.minute);
break;
+ default:
+ len = snprintf((char*) szBuf, sizeof(szBuf)/sizeof(uchar), "*invld eNow*");
+ break;
}
/* now create a string object out of it and hand that over to the var */
@@ -477,6 +480,9 @@ cnfGetVar(char *name, void *usrptr)
estr = msgGetCEEVarNew((msg_t*) usrptr, name+2);
else
estr = msgGetMsgVarNew((msg_t*) usrptr, (uchar*)name+1);
+ } else { /* if this happens, we have a program logic error */
+ estr = es_newStrFromCStr("err: var must start with $",
+ strlen("err: var must start with $"));
}
if(Debug) {
char *s;
@@ -753,7 +759,7 @@ activateMainQueue()
{
DEFiRet;
/* create message queue */
- CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"))) {
+ CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"), NULL)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
FINALIZE;
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index bc8f5234..c8cb87fd 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -844,7 +844,7 @@ doRulesetCreateQueue(rsconf_t *conf, int *pNewVal)
rsname = (conf->rulesets.pCurr->pszName == NULL) ? (uchar*) "[ruleset]" : conf->rulesets.pCurr->pszName;
DBGPRINTF("adding a ruleset-specific \"main\" queue for ruleset '%s'\n", rsname);
- CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rsname));
+ CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rsname, NULL));
finalize_it:
RETiRet;
@@ -904,6 +904,7 @@ rsRetVal
rulesetProcessCnf(struct cnfobj *o)
{
struct cnfparamvals *pvals;
+ struct cnfparamvals *queueParams;
rsRetVal localRet;
uchar *rsName = NULL;
uchar *parserName;
@@ -911,6 +912,7 @@ rulesetProcessCnf(struct cnfobj *o)
ruleset_t *pRuleset;
struct cnfarray *ar;
int i;
+ uchar *rsname;
DEFiRet;
pvals = nvlstGetParams(o->nvlst, &rspblk, NULL);
@@ -938,14 +940,21 @@ rulesetProcessCnf(struct cnfobj *o)
/* we have only two params, so we do NOT do the usual param loop */
parserIdx = cnfparamGetIdx(&rspblk, "parser");
- if(parserIdx == -1 || !pvals[parserIdx].bUsed)
- FINALIZE;
+ if(parserIdx != -1 && pvals[parserIdx].bUsed) {
+ ar = pvals[parserIdx].val.d.ar;
+ for(i = 0 ; i < ar->nmemb ; ++i) {
+ parserName = (uchar*)es_str2cstr(ar->arr[i], NULL);
+ doRulesetAddParser(pRuleset, parserName);
+ free(parserName);
+ }
+ }
- ar = pvals[parserIdx].val.d.ar;
- for(i = 0 ; i < ar->nmemb ; ++i) {
- parserName = (uchar*)es_str2cstr(ar->arr[i], NULL);
- doRulesetAddParser(pRuleset, parserName);
- free(parserName);
+ /* pick up ruleset queue parameters */
+ qqueueDoCnfParams(o->nvlst, &queueParams);
+ if(queueCnfParamsSet(queueParams)) {
+ rsname = (pRuleset->pszName == NULL) ? (uchar*) "[ruleset]" : pRuleset->pszName;
+ DBGPRINTF("adding a ruleset-specific \"main\" queue for ruleset '%s'\n", rsname);
+ CHKiRet(createMainQueue(&pRuleset->pQueue, rsname, queueParams));
}
finalize_it:
diff --git a/template.c b/template.c
index 015d2c04..35b16831 100644
--- a/template.c
+++ b/template.c
@@ -2114,8 +2114,14 @@ void tplPrintList(rsconf_t *conf)
case tplFmtUnixDate:
dbgprintf("[Format as Unix timestamp] ");
break;
+ case tplFmtSecFrac:
+ dbgprintf("[fractional seconds, only] ");
+ break;
+ case tplFmtRFC3164BuggyDate:
+ dbgprintf("[Format as buggy RFC3164-Date] ");
+ break;
default:
- dbgprintf("[INVALID eDateFormat %d] ", pTpe->data.field.eDateFormat);
+ dbgprintf("[UNKNOWN eDateFormat %d] ", pTpe->data.field.eDateFormat);
}
switch(pTpe->data.field.eCaseConv) {
case tplCaseConvNo:
diff --git a/tests/queue-persist-drvr.sh b/tests/queue-persist-drvr.sh
index 53fbcb8b..de597308 100755
--- a/tests/queue-persist-drvr.sh
+++ b/tests/queue-persist-drvr.sh
@@ -24,6 +24,7 @@ source $srcdir/diag.sh check-mainq-spool
echo "#" > work-delay.conf
source $srcdir/diag.sh startup queue-persist.conf
source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
+./msleep 500
$srcdir/diag.sh wait-shutdown
source $srcdir/diag.sh seq-check 0 4999
source $srcdir/diag.sh exit
diff --git a/tools/pidfile.c b/tools/pidfile.c
index e9601232..8298b94e 100644
--- a/tools/pidfile.c
+++ b/tools/pidfile.c
@@ -55,7 +55,8 @@ int read_pid (char *pidfile)
if (!(f=fopen(pidfile,"r")))
return 0;
- fscanf(f,"%d", &pid);
+ if(fscanf(f,"%d", &pid) != 1)
+ pid = 0;
fclose(f);
return pid;
}
@@ -113,7 +114,8 @@ int write_pid (char *pidfile)
#if HAVE_FLOCK
if (flock(fd, LOCK_EX|LOCK_NB) == -1) {
- fscanf(f, "%d", &pid);
+ if(fscanf(f, "%d", &pid) != 1)
+ pid = 0;
fclose(f);
printf("Can't lock, lock is held by pid %d.\n", pid);
return 0;
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 5a620f80..4be8536d 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -602,7 +602,6 @@ submitMsg2(msg_t *pMsg)
FINALIZE;
}
- MsgPrepareEnqueue(pMsg);
qqueueEnqMsg(pQueue, pMsg->flowCtlType, pMsg);
finalize_it:
@@ -642,10 +641,6 @@ multiSubmitMsg2(multi_submit_t *pMultiSub)
FINALIZE;
}
- for(i = 0 ; i < pMultiSub->nElem ; ++i) {
- MsgPrepareEnqueue(pMultiSub->ppMsgs[i]);
- }
-
iRet = pQueue->MultiEnq(pQueue, pMultiSub);
pMultiSub->nElem = 0;
@@ -727,8 +722,11 @@ static void debug_switch()
* a minimal delay, but it is much cleaner than the approach of doing everything
* inside the signal handler.
* rgerhards, 2005-10-26
- * Note: we do not call DBGPRINTF() as this may cause us to block in case something
- * with the threading is wrong.
+ * Note:
+ * - we do not call DBGPRINTF() as this may cause us to block in case something
+ * with the threading is wrong.
+ * - we do not really care about the return state of write(), but we need this
+ * strange check we do to silence compiler warnings (thanks, Ubuntu!)
*/
static void doDie(int sig)
{
@@ -736,11 +734,13 @@ static void doDie(int sig)
# define MSG2 "DoDie called 5 times - unconditional exit\n"
static int iRetries = 0; /* debug aid */
dbgprintf(MSG1);
- if(Debug == DEBUG_FULL)
- write(1, MSG1, sizeof(MSG1) - 1);
+ if(Debug == DEBUG_FULL) {
+ if(write(1, MSG1, sizeof(MSG1) - 1)) {}
+ }
if(iRetries++ == 4) {
- if(Debug == DEBUG_FULL)
- write(1, MSG2, sizeof(MSG2) - 1);
+ if(Debug == DEBUG_FULL) {
+ if(write(1, MSG2, sizeof(MSG2) - 1)) {}
+ }
abort();
}
bFinished = sig;
@@ -1059,7 +1059,7 @@ finalize_it:
* the time being (remember that we want to restructure config processing at large!).
* rgerhards, 2009-10-27
*/
-rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName)
+rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName, struct cnfparamvals *queueParams)
{
struct queuefilenames_s *qfn;
uchar *qfname = NULL;
@@ -1067,11 +1067,6 @@ rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName)
uchar qfrenamebuf[1024];
DEFiRet;
- /* switch the message object to threaded operation, if necessary */
- if(ourConf->globals.mainQ.MainMsgQueType == QUEUETYPE_DIRECT || ourConf->globals.mainQ.iMainMsgQueueNumWorkers > 1) {
- MsgEnableThreadSafety();
- }
-
/* create message queue */
CHKiRet_Hdlr(qqueueConstruct(ppQueue, ourConf->globals.mainQ.MainMsgQueType, ourConf->globals.mainQ.iMainMsgQueueNumWorkers, ourConf->globals.mainQ.iMainMsgQueueSize, msgConsumer)) {
/* no queue is fatal, we need to give up in that case... */
@@ -1080,60 +1075,65 @@ rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName)
/* name our main queue object (it's not fatal if it fails...) */
obj.SetName((obj_t*) (*ppQueue), pszQueueName);
- /* ... set some properties ... */
-# define setQPROP(func, directive, data) \
- CHKiRet_Hdlr(func(*ppQueue, data)) { \
- errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
- }
-# define setQPROPstr(func, directive, data) \
- CHKiRet_Hdlr(func(*ppQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \
- errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
- }
+ if(queueParams == NULL) { /* use legacy parameters? */
+ /* ... set some properties ... */
+ # define setQPROP(func, directive, data) \
+ CHKiRet_Hdlr(func(*ppQueue, data)) { \
+ errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
+ }
+ # define setQPROPstr(func, directive, data) \
+ CHKiRet_Hdlr(func(*ppQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \
+ errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
+ }
- if(ourConf->globals.mainQ.pszMainMsgQFName != NULL) {
- /* check if the queue file name is unique, else emit an error */
- for(qfn = queuefilenames ; qfn != NULL ; qfn = qfn->next) {
- dbgprintf("check queue file name '%s' vs '%s'\n", qfn->name, ourConf->globals.mainQ.pszMainMsgQFName );
- if(!ustrcmp(qfn->name, ourConf->globals.mainQ.pszMainMsgQFName)) {
- snprintf((char*)qfrenamebuf, sizeof(qfrenamebuf), "%d-%s-%s",
- ++qfn_renamenum, ourConf->globals.mainQ.pszMainMsgQFName,
- (pszQueueName == NULL) ? "NONAME" : (char*)pszQueueName);
- qfname = ustrdup(qfrenamebuf);
- errmsg.LogError(0, NO_ERRCODE, "Error: queue file name '%s' already in use "
- " - using '%s' instead", ourConf->globals.mainQ.pszMainMsgQFName, qfname);
- break;
+ if(ourConf->globals.mainQ.pszMainMsgQFName != NULL) {
+ /* check if the queue file name is unique, else emit an error */
+ for(qfn = queuefilenames ; qfn != NULL ; qfn = qfn->next) {
+ dbgprintf("check queue file name '%s' vs '%s'\n", qfn->name, ourConf->globals.mainQ.pszMainMsgQFName );
+ if(!ustrcmp(qfn->name, ourConf->globals.mainQ.pszMainMsgQFName)) {
+ snprintf((char*)qfrenamebuf, sizeof(qfrenamebuf), "%d-%s-%s",
+ ++qfn_renamenum, ourConf->globals.mainQ.pszMainMsgQFName,
+ (pszQueueName == NULL) ? "NONAME" : (char*)pszQueueName);
+ qfname = ustrdup(qfrenamebuf);
+ errmsg.LogError(0, NO_ERRCODE, "Error: queue file name '%s' already in use "
+ " - using '%s' instead", ourConf->globals.mainQ.pszMainMsgQFName, qfname);
+ break;
+ }
}
+ if(qfname == NULL)
+ qfname = ustrdup(ourConf->globals.mainQ.pszMainMsgQFName);
+ qfn = malloc(sizeof(struct queuefilenames_s));
+ qfn->name = qfname;
+ qfn->next = queuefilenames;
+ queuefilenames = qfn;
}
- if(qfname == NULL)
- qfname = ustrdup(ourConf->globals.mainQ.pszMainMsgQFName);
- qfn = malloc(sizeof(struct queuefilenames_s));
- qfn->name = qfname;
- qfn->next = queuefilenames;
- queuefilenames = qfn;
- }
- setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", ourConf->globals.mainQ.iMainMsgQueMaxFileSize);
- setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", ourConf->globals.mainQ.iMainMsgQueMaxDiskSpace);
- setQPROP(qqueueSetiDeqBatchSize, "$MainMsgQueueDequeueBatchSize", ourConf->globals.mainQ.iMainMsgQueDeqBatchSize);
- setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", qfname);
- setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", ourConf->globals.mainQ.iMainMsgQPersistUpdCnt);
- setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", ourConf->globals.mainQ.bMainMsgQSyncQeueFiles);
- setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", ourConf->globals.mainQ.iMainMsgQtoQShutdown );
- setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", ourConf->globals.mainQ.iMainMsgQtoActShutdown);
- setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", ourConf->globals.mainQ.iMainMsgQtoWrkShutdown);
- setQPROP(qqueueSettoEnq, "$MainMsgQueueTimeoutEnqueue", ourConf->globals.mainQ.iMainMsgQtoEnq);
- setQPROP(qqueueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", ourConf->globals.mainQ.iMainMsgQHighWtrMark);
- setQPROP(qqueueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", ourConf->globals.mainQ.iMainMsgQLowWtrMark);
- setQPROP(qqueueSetiDiscardMrk, "$MainMsgQueueDiscardMark", ourConf->globals.mainQ.iMainMsgQDiscardMark);
- setQPROP(qqueueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", ourConf->globals.mainQ.iMainMsgQDiscardSeverity);
- setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", ourConf->globals.mainQ.iMainMsgQWrkMinMsgs);
- setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", ourConf->globals.mainQ.bMainMsgQSaveOnShutdown);
- setQPROP(qqueueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", ourConf->globals.mainQ.iMainMsgQDeqSlowdown);
- setQPROP(qqueueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", ourConf->globals.mainQ.iMainMsgQueueDeqtWinFromHr);
- setQPROP(qqueueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", ourConf->globals.mainQ.iMainMsgQueueDeqtWinToHr);
-
-# undef setQPROP
-# undef setQPROPstr
+ setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", ourConf->globals.mainQ.iMainMsgQueMaxFileSize);
+ setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", ourConf->globals.mainQ.iMainMsgQueMaxDiskSpace);
+ setQPROP(qqueueSetiDeqBatchSize, "$MainMsgQueueDequeueBatchSize", ourConf->globals.mainQ.iMainMsgQueDeqBatchSize);
+ setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", qfname);
+ setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", ourConf->globals.mainQ.iMainMsgQPersistUpdCnt);
+ setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", ourConf->globals.mainQ.bMainMsgQSyncQeueFiles);
+ setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", ourConf->globals.mainQ.iMainMsgQtoQShutdown );
+ setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", ourConf->globals.mainQ.iMainMsgQtoActShutdown);
+ setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", ourConf->globals.mainQ.iMainMsgQtoWrkShutdown);
+ setQPROP(qqueueSettoEnq, "$MainMsgQueueTimeoutEnqueue", ourConf->globals.mainQ.iMainMsgQtoEnq);
+ setQPROP(qqueueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", ourConf->globals.mainQ.iMainMsgQHighWtrMark);
+ setQPROP(qqueueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", ourConf->globals.mainQ.iMainMsgQLowWtrMark);
+ setQPROP(qqueueSetiDiscardMrk, "$MainMsgQueueDiscardMark", ourConf->globals.mainQ.iMainMsgQDiscardMark);
+ setQPROP(qqueueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", ourConf->globals.mainQ.iMainMsgQDiscardSeverity);
+ setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", ourConf->globals.mainQ.iMainMsgQWrkMinMsgs);
+ setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", ourConf->globals.mainQ.bMainMsgQSaveOnShutdown);
+ setQPROP(qqueueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", ourConf->globals.mainQ.iMainMsgQDeqSlowdown);
+ setQPROP(qqueueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", ourConf->globals.mainQ.iMainMsgQueueDeqtWinFromHr);
+ setQPROP(qqueueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", ourConf->globals.mainQ.iMainMsgQueueDeqtWinToHr);
+
+ # undef setQPROP
+ # undef setQPROPstr
+ } else { /* use new style config! */
+ qqueueSetDefaultsRulesetQueue(*ppQueue);
+ qqueueApplyCnfParam(*ppQueue, queueParams);
+ }
/* ... and finally start the queue! */
CHKiRet_Hdlr(qqueueStart(*ppQueue)) {