summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog13
-rw-r--r--configure.ac2
-rw-r--r--doc/impstats.html7
-rw-r--r--doc/mmpstrucdata.html23
-rw-r--r--plugins/mmpstrucdata/mmpstrucdata.c6
-rw-r--r--runtime/queue.c122
-rw-r--r--runtime/wti.c30
-rw-r--r--tests/Makefile.am15
-rw-r--r--tests/chkseq.c29
-rwxr-xr-xtests/mmpstrucdata.sh12
-rwxr-xr-xtests/rfc5424parser.sh12
-rw-r--r--tests/tcpflood.c28
-rw-r--r--tests/testsuites/mmpstrucdata.conf12
-rw-r--r--tests/testsuites/rfc5424parser.conf10
-rw-r--r--tests/testsuites/stop-localvar.conf8
15 files changed, 279 insertions, 50 deletions
diff --git a/ChangeLog b/ChangeLog
index 702662ad..bdeda7de 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -3,7 +3,13 @@ Version 8.1.2 [devel] 2013-11-??
- queue defaults have changed
* high water mark is now dynamically 90% of queue size
* low water makr is now dynamically 70% of queue size
+ * queue.discardMark is now dynamically 98% of queue size
* queue.workerThreadMinimumMessage set to queue.size / num workers
+ For queues with very low queue.maxSize (< 100), "emergency" defaults
+ will be used.
+- bugfix: disk queues created files in wrong working directory
+ if the $WorkDirectory was changed multiple times, all queues only
+ used the last value set.
- bugfix: legacy directive $ActionQueueWorkerThreads was not honored
---------------------------------------------------------------------------
Version 8.1.1 [devel] 2013-11-19
@@ -50,7 +56,11 @@ Version 7.5.7 [v7-devel] 2013-11-??
- queue defaults have changed
* high water mark is now dynamically 90% of queue size
* low water makr is now dynamically 70% of queue size
+ * queue.discardMark is now dynamically 98% of queue size
* queue.workerThreadMinimumMessage set to queue.size / num workers
+ For queues with very low queue.maxSize (< 100), "emergency" defaults
+ will be used.
+- bugfix: mmpstrucdata generated inaccessible properties
- bugfix: RainerScript optimizer did not optimize PRI filters
things like "if $syslogfacility-text == "local3"" were not converted
to PRIFILT. This was a regression introduced in 7.5.6.
@@ -277,6 +287,9 @@ Version 7.5.0 [devel] 2013-06-11
Thanks to Axel Rau for the patch.
---------------------------------------------------------------------------
Version 7.4.7 [v7.4-stable] 2013-11-??
+- bugfix: disk queues created files in wrong working directory
+ if the $WorkDirectory was changed multiple times, all queues only
+ used the last value set.
- bugfix: legacy directive $ActionQueueWorkerThreads was not honored
- bugfix: segfault on startup when certain script constructs are used
e.g. "if not $msg ..."
diff --git a/configure.ac b/configure.ac
index c79a74ce..8f74ebc7 100644
--- a/configure.ac
+++ b/configure.ac
@@ -932,7 +932,7 @@ AC_ARG_ENABLE(mmnormalize,
)
if test "x$enable_mmnormalize" = "xyes"; then
PKG_CHECK_MODULES(LIBEE, libee >= 0.4.0)
- PKG_CHECK_MODULES(LIBLOGNORM, lognorm >= 0.3.1)
+ PKG_CHECK_MODULES(LIBLOGNORM, lognorm >= 0.3.1 lognorm < 1.0.0)
fi
AM_CONDITIONAL(ENABLE_MMNORMALIZE, test x$enable_mmnormalize = xyes)
diff --git a/doc/impstats.html b/doc/impstats.html
index 3b206941..c768dbf4 100644
--- a/doc/impstats.html
+++ b/doc/impstats.html
@@ -24,6 +24,13 @@ settings, this impact may be noticeable (for high-load environments).
<p>The rsyslog website has an updated overview of available
<a href="http://rsyslog.com/rsyslog-statistic-counter/">rsyslog statistic counters</a>.
</p>
+<p><b>Note that there is a
+<a href="http://www.rsyslog.com/impstats-analyzer/">rsyslog statistics
+online analyzer</a> available.</b> It can be given a impstats-generated file and
+will return problems it detects. Note that the analyzer cannot replace a
+human in getting things right, but it is expected to be a good aid in starting
+to understand and gain information from the pstats logs.
+<7p>
<p><b>Module Confguration Parameters</b>:</p>
<p>This module supports module parameters, only.
<ul>
diff --git a/doc/mmpstrucdata.html b/doc/mmpstrucdata.html
index b4003062..8197d94a 100644
--- a/doc/mmpstrucdata.html
+++ b/doc/mmpstrucdata.html
@@ -13,6 +13,7 @@
<p><b>Description</b>:</p>
<p>The mmpstrucdata parses RFC5424 structured data into the message
json variable tree.
+The data parsed, if available, is stored under "jsonRoot!rfc5424-sd!...".
<p>&nbsp;</p>
<p><b>Module Configuration Parameters</b>:</p>
@@ -33,6 +34,10 @@ Specifies into which json container the data shall be parsed to.
<p><b>Caveats/Known Bugs:</b>
<ul>
<li>this module is currently experimental; feedback is appreciated
+<li>property names are treated case-insensitive in rsyslog. As such,
+RFC5424 names are treated case-insensitive as well. If such names
+only differ in case (what is not recommended anyways), problems will
+occur.
<li>structured data with duplicate SD-IDs and SD-PARAMS is not
properly processed
</ul>
@@ -48,6 +53,24 @@ template(name="jsondump" type="string" string="%msg%: %$!%\n")
action(type="omfile" file="/path/to/log" template="jsondump")
</textarea>
+<p><b>A more practical one:</b>
+<p>Take this example message (inspired by RFC5424 sample;)):
+<p><code><34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][id@2 test="tast"] BOM'su root' failed for lonvick on /dev/pts/8</code>
+<p>We apply this configuration:
+<p><textarea rows="6" cols="120">module(load="mmpstrucdata")
+action(type="mmpstrucdata")
+template(name="sample2" type="string"
+ string="ALL: %$!%\nSD: %$!RFC5424-SD%\nIUT:%$!rfc5424-sd!exampleSDID@32473!iut%\nRAWMSG: %rawmsg%\n\n")
+action(type="omfile" file="/path/to/log" template="sample2")
+</textarea>
+<p>This will output:
+<p><code>ALL: { "rfc5424-sd": { "examplesdid@32473": { "iut": "3", "eventsource": "Application", "eventid": "1011" }, "id@2": { "test": "tast" } } }</br>
+SD: { "examplesdid@32473": { "iut": "3", "eventsource": "Application", "eventid": "1011" }, "id@2": { "test": "tast" } }</br>
+IUT:3</br>
+RAWMSG: <34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][id@2 test="tast"] BOM'su root' failed for lonvick on /dev/pts/8</code>
+<p>As you can seem, you can address each of the individual items. Note that the
+case of the RFC5424 parameter names has been converted to lower case.
+
<p>[<a href="rsyslog_conf.html">rsyslog.conf overview</a>] [<a href="manual.html">manual
index</a>] [<a href="http://www.rsyslog.com/">rsyslog site</a>]</p>
<p><font size="2">This documentation is part of the
diff --git a/plugins/mmpstrucdata/mmpstrucdata.c b/plugins/mmpstrucdata/mmpstrucdata.c
index e9e36b27..680ba92b 100644
--- a/plugins/mmpstrucdata/mmpstrucdata.c
+++ b/plugins/mmpstrucdata/mmpstrucdata.c
@@ -31,6 +31,7 @@
#include <errno.h>
#include <unistd.h>
#include <stdint.h>
+#include <ctype.h>
#include "conf.h"
#include "syslogd-types.h"
#include "srUtils.h"
@@ -218,7 +219,8 @@ dbgprintf("DDDD: parseSD_NAME %s\n", sdbuf+*curridx);
if( sdbuf[i] == '=' || sdbuf[i] == '"'
|| sdbuf[i] == ']' || sdbuf[i] == ' ')
break;
- namebuf[j] = sdbuf[i++];
+ namebuf[j] = tolower(sdbuf[i]);
+ ++i;
}
namebuf[j] = '\0';
dbgprintf("DDDD: parseSD_NAME, NAME: '%s'\n", namebuf);
@@ -349,7 +351,7 @@ dbgprintf("DDDD: json: '%s'\n", json_object_get_string(json));
if(jroot == NULL) {
ABORT_FINALIZE(RS_RET_ERR);
}
- json_object_object_add(jroot, "RFC5424-SD", json);
+ json_object_object_add(jroot, "rfc5424-sd", json);
msgAddJSON(pMsg, pData->jsonRoot, jroot);
finalize_it:
RETiRet;
diff --git a/runtime/queue.c b/runtime/queue.c
index 5770eae9..ff1b30f4 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -59,6 +59,7 @@
#include "datetime.h"
#include "unicode-helper.h"
#include "statsobj.h"
+#include "parserif.h"
#ifdef OS_SOLARIS
# include <sched.h>
@@ -86,6 +87,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDestructDisk(qqueue_t *pThis);
+rsRetVal qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -94,6 +96,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis);
/* tables for interfacing with the v6 config system */
static struct cnfparamdescr cnfpdescr[] = {
{ "queue.filename", eCmdHdlrGetWord, 0 },
+ { "queue.spooldirectory", eCmdHdlrGetWord, 0 },
{ "queue.size", eCmdHdlrSize, 0 },
{ "queue.dequeuebatchsize", eCmdHdlrInt, 0 },
{ "queue.maxdiskspace", eCmdHdlrSize, 0 },
@@ -417,6 +420,7 @@ StartDA(qqueue_t *pThis)
CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(qqueueSetSpoolDir(pThis->pqDA, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles));
CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
@@ -733,7 +737,7 @@ qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pTh
DEFiRet;
ISOBJ_TYPE_assert(pStrm, strm);
ISOBJ_TYPE_assert(pThis, qqueue);
- CHKiRet(strm.SetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetDir(pStrm, pThis->pszSpoolDir, pThis->lenSpoolDir));
finalize_it:
RETiRet;
}
@@ -840,7 +844,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
} else {
CHKiRet(strm.Construct(&pThis->tVars.disk.pWrite));
CHKiRet(strm.SetbSync(pThis->tVars.disk.pWrite, pThis->bSyncQueueFiles));
- CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
CHKiRet(strm.SetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
@@ -852,7 +856,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDeq));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0));
- CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDeq, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ));
CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR));
@@ -865,7 +869,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDel));
CHKiRet(strm.SetbSync(pThis->tVars.disk.pReadDel, pThis->bSyncQueueFiles));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1));
- CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDel, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDel, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ));
CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR));
@@ -1359,7 +1363,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
pThis->iDeqBatchSize = 128; /* default batch size */
pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */
pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */
- pThis->iDiscardMrk = 980; /* begin to discard messages */
+ pThis->iDiscardMrk = -1; /* begin to discard messages */
pThis->iDiscardSeverity = 8; /* turn off */
pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
pThis->iMaxFileSize = 1024*1024;
@@ -1389,7 +1393,7 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis)
pThis->iDeqBatchSize = 1024; /* default batch size */
pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */
pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */
- pThis->iDiscardMrk = 49500; /* begin to discard messages */
+ pThis->iDiscardMrk = -1; /* begin to discard messages */
pThis->iDiscardSeverity = 8; /* turn off */
pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
pThis->iMaxFileSize = 16*1024*1024;
@@ -1680,6 +1684,9 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
/* The rate limiter
*
+ * IMPORTANT: the rate-limiter MUST unlock and re-lock the queue when
+ * it actually delays processing. Otherwise inputs are stalled.
+ *
* Here we may wait if a dequeue time window is defined or if we are
* rate-limited. TODO: If we do so, we should also look into the
* way new worker threads are spawned. Obviously, it doesn't make much
@@ -1765,8 +1772,10 @@ RateLimiter(qqueue_t *pThis)
}
if(iDelay > 0) {
+ pthread_mutex_unlock(pThis->mut);
DBGOPRINT((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay);
srSleep(iDelay, 0);
+ pthread_mutex_lock(pThis->mut);
}
RETiRet;
@@ -2076,7 +2085,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
/* pre-construct file name for .qi file */
pThis->lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar),
- "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
+ "%s/%s.qi", (char*) pThis->pszSpoolDir, (char*)pThis->pszFilePrefix);
pThis->pszQIFNam = ustrdup(pszQIFNam);
DBGOPRINT((obj_t*) pThis, ".qi file name is '%s', len %d\n", pThis->pszQIFNam,
(int) pThis->lenQIFNam);
@@ -2124,21 +2133,68 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
}
}
- if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize)
+ if(pThis->iDiscardMrk > pThis->iMaxQueueSize) {
+ errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": "
+ "queue.discardMark %d is set larger than queue.size",
+ obj.GetName((obj_t*) pThis), pThis->iDiscardMrk);
+ }
+
+ goodval = (pThis->iMaxQueueSize / 100) * 80;
+ if(pThis->iDiscardMrk != -1 && pThis->iDiscardMrk < goodval) {
+ errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": queue.discardMark "
+ "is set quite low at %d. You should only set it below "
+ "80%% (%d) if you have a good reason for this.",
+ obj.GetName((obj_t*) pThis), pThis->iDiscardMrk, goodval);
+ }
+
+
+ /* now come parameter corrections and defaults */
+ if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) {
pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90;
+ if(pThis->iHighWtrMrk == 0) { /* guard against very low max queue sizes! */
+ pThis->iHighWtrMrk = pThis->iMaxQueueSize;
+ }
+ }
if( pThis->iLowWtrMrk < 2
|| pThis->iLowWtrMrk > pThis->iMaxQueueSize
- || pThis->iLowWtrMrk > pThis->iHighWtrMrk )
+ || pThis->iLowWtrMrk > pThis->iHighWtrMrk ) {
pThis->iLowWtrMrk = (pThis->iMaxQueueSize / 100) * 70;
+ if(pThis->iLowWtrMrk == 0) {
+ pThis->iLowWtrMrk = 1;
+ }
+ }
+
if( pThis->iMinMsgsPerWrkr < 1
- || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize )
+ || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) {
pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads;
- if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize)
+ }
+
+ if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) {
pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97;
- if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize)
+ if(pThis->iFullDlyMrk == 0) {
+ pThis->iFullDlyMrk =
+ (pThis->iMaxQueueSize == 1) ? 1 : pThis->iMaxQueueSize - 1;
+ }
+ }
+ if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) {
pThis->iLightDlyMrk = (pThis->iMaxQueueSize / 100) * 70;
- if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize)
+ if(pThis->iLightDlyMrk == 0) {
+ pThis->iLightDlyMrk =
+ (pThis->iMaxQueueSize == 1) ? 1 : pThis->iMaxQueueSize - 1;
+ }
+ }
+
+ if(pThis->iDiscardMrk < 1 || pThis->iDiscardMrk > pThis->iMaxQueueSize) {
+ pThis->iDiscardMrk = (pThis->iMaxQueueSize / 100) * 98;
+ if(pThis->iDiscardMrk == 0) {
+ /* for very small queues, we disable this by default */
+ pThis->iDiscardMrk = pThis->iMaxQueueSize;
+ }
+ }
+
+ if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) {
pThis->iDeqBatchSize = pThis->iMaxQueueSize;
+ }
/* finalize some initializations that could not yet be done because it is
* influenced by properties which might have been set after queueConstruct ()
@@ -2170,14 +2226,16 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->iFullDlyMrk = wrk;
}
- DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, maxQSize %d, lqsize %d, pqsize %d, child %d, "
- "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d, "
- "max wrkr %d, min msgs f. wrkr %d\n",
- pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iMaxQueueSize,
+ DBGOPRINT((obj_t*) pThis, "params: type %d, enq-only %d, disk assisted %d, spoolDir '%s', maxFileSz %lld, "
+ "maxQSize %d, lqsize %d, pqsize %d, child %d, full delay %d, "
+ "light delay %d, deq batch size %d, high wtrmrk %d, low wtrmrk %d, "
+ "discardmrk %d, max wrkr %d, min msgs f. wrkr %d\n",
+ pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->pszSpoolDir,
+ pThis->iMaxFileSize, pThis->iMaxQueueSize,
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis),
pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk,
pThis->iDeqBatchSize, pThis->iHighWtrMrk, pThis->iLowWtrMrk,
- pThis->iNumWorkerThreads, pThis->iMinMsgsPerWrkr);
+ pThis->iDiscardMrk, pThis->iNumWorkerThreads, pThis->iMinMsgsPerWrkr);
pThis->bQueueStarted = 1;
if(pThis->qType == QUEUETYPE_DIRECT)
@@ -2484,6 +2542,24 @@ CODESTARTobjDestruct(qqueue)
ENDobjDestruct(qqueue)
+/* set the queue's spool directory. The directory MUST NOT be NULL.
+ * The passed-in string is duplicated. So if the caller does not need
+ * it any longer, it must free it.
+ */
+rsRetVal
+qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir)
+{
+ DEFiRet;
+
+ free(pThis->pszSpoolDir);
+ CHKmalloc(pThis->pszSpoolDir = ustrdup(pszSpoolDir));
+ pThis->lenSpoolDir = lenSpoolDir;
+
+finalize_it:
+ RETiRet;
+}
+
+
/* set the queue's file prefix
* The passed-in string is duplicated. So if the caller does not need
* it any longer, it must free it.
@@ -2834,6 +2910,16 @@ qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst)
pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr);
} else if(!strcmp(pblk.descr[i].name, "queue.cry.provider")) {
pThis->cryprovName = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(pblk.descr[i].name, "queue.spooldirectory")) {
+ free(pThis->pszSpoolDir);
+ pThis->pszSpoolDir = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
+ pThis->lenSpoolDir = es_strlen(pvals[i].val.d.estr);
+ if(pThis->pszSpoolDir[pThis->lenSpoolDir-1] == '/') {
+ pThis->pszSpoolDir[pThis->lenSpoolDir-1] = '\0';
+ --pThis->lenSpoolDir;
+ parser_errmsg("queue.spooldirectory must not end with '/', "
+ "corrected to '%s'", pThis->pszSpoolDir);
+ }
} else if(!strcmp(pblk.descr[i].name, "queue.size")) {
pThis->iMaxQueueSize = pvals[i].val.d.n;
} else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) {
diff --git a/runtime/wti.c b/runtime/wti.c
index bbefc537..c02d0573 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -303,15 +303,26 @@ wtiWorker(wti_t *pThis)
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
dbgprintf("DDDD: wti %p: worker starting\n", pThis);
-
/* now we have our identity, on to real processing */
- while(1) { /* loop will be broken below - need to do mutex locks */
+
+ /* note: in this loop, the mutex is "never" unlocked. Of course,
+ * this is not true: it actually is unlocked when the actual processing
+ * is done, as part of pWtp->pfDoWork() processing. Note that this
+ * function is required to re-lock it when done. We cannot do the
+ * lock/unlock here ourselfs, as pfDoWork() needs to access queue
+ * structures itself.
+ * The same goes for pfRateLimiter(). While we could unlock/lock when
+ * we call it, in practice the function is often called without any
+ * ratelimiting actually done. Only the rate limiter itself knows
+ * that. As such, it needs to bear the burden of doing the locking
+ * when required. -- rgerhards, 2013-11-20
+ */
+ d_pthread_mutex_lock(pWtp->pmutUsr);
+ while(1) { /* loop will be broken below */
if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */
pWtp->pfRateLimiter(pWtp->pUsr);
}
- d_pthread_mutex_lock(pWtp->pmutUsr);
-
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
if(terminateRet == RS_RET_TERMINATE_NOW) {
@@ -319,36 +330,29 @@ dbgprintf("DDDD: wti %p: worker starting\n", pThis);
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
DBGOPRINT((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n",
localRet);
- d_pthread_mutex_unlock(pWtp->pmutUsr);
break;
}
/* try to execute and process whatever we have */
- /* Note that this function releases and re-aquires the mutex. The returned
- * information on idle state must be processed before releasing the mutex again.
- */
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) {
- d_pthread_mutex_unlock(pWtp->pmutUsr);
break; /* end of loop */
} else if(localRet == RS_RET_IDLE) {
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
- d_pthread_mutex_unlock(pWtp->pmutUsr);
DBGOPRINT((obj_t*) pThis, "terminating worker terminateRet=%d, bInactivityTOOccured=%d\n",
terminateRet, bInactivityTOOccured);
break; /* end of loop */
}
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
- d_pthread_mutex_unlock(pWtp->pmutUsr);
continue; /* request next iteration */
}
- d_pthread_mutex_unlock(pWtp->pmutUsr);
-
bInactivityTOOccured = 0; /* reset for next run */
}
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
+
DBGPRINTF("DDDD: wti %p: worker cleanup up action instances\n", pThis);
for(i = 0 ; i < iActionNbr ; ++i) {
dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, pThis->actWrkrInfo[i].actWrkrData);
diff --git a/tests/Makefile.am b/tests/Makefile.am
index a2548a68..5232e3ef 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -7,6 +7,8 @@ TESTS = $(TESTRUNS)
if ENABLE_IMDIAG
TESTS += \
stop-localvar.sh \
+ stop-msgvar.sh \
+ rfc5424parser.sh \
arrayqueue.sh \
global_vars.sh \
da-mainmsg-q.sh \
@@ -122,6 +124,11 @@ TESTS += \
imptcp_conndrop.sh
endif
+if ENABLE_MMPSTRUCDATA
+TESTS += \
+ mmpstrucdata.sh
+endif
+
if ENABLE_GNUTLS
# TODO: re-enable in newer version
#TESTS += \
@@ -298,10 +305,14 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
testsuites/stop.conf \
stop-localvar.sh \
testsuites/stop-localvar.conf \
+ stop-msgvar.sh \
+ testsuites/stop-msgvar.conf \
global_vars.sh \
testsuites/global_vars.conf \
+ rfc5424parser.sh \
+ testsuites/rfc5424parser.conf \
rs_optimizer_pri.sh \
- testsuites/rs_optimizer_pr.conf \
+ testsuites/rs_optimizer_pri.conf \
rscript_prifilt.sh \
testsuites/rscript_prifilt.conf \
rscript_optimizer1.sh \
@@ -526,6 +537,8 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
mysql-asyn.sh \
mysql-asyn-vg.sh \
testsuites/mysql-asyn.conf \
+ mmpstrucdata.sh \
+ testsuites/mmpstrucdata.conf \
cfg.sh
# TODO: re-enable
diff --git a/tests/chkseq.c b/tests/chkseq.c
index bea9f83a..bd8597e8 100644
--- a/tests/chkseq.c
+++ b/tests/chkseq.c
@@ -51,6 +51,7 @@ int main(int argc, char *argv[])
int reachedEOF;
int edLen; /* length of extra data */
static char edBuf[500*1024]; /* buffer for extra data (pretty large to be on the save side...) */
+ static char ioBuf[sizeof(edBuf)+1024];
char *file = NULL;
while((opt = getopt(argc, argv, "e:f:ds:vE")) != EOF) {
@@ -103,14 +104,22 @@ int main(int argc, char *argv[])
for(i = start ; i < end+1 ; ++i) {
if(bHaveExtraData) {
- scanfOK = fscanf(fp, "%d,%d,%s\n", &val, &edLen, edBuf) == 3 ? 1 : 0;
+ if(fgets(ioBuf, sizeof(ioBuf), fp) == NULL) {
+ scanfOK = 0;
+ } else {
+ scanfOK = sscanf(ioBuf, "%d,%d,%s\n", &val, &edLen, edBuf) == 3 ? 1 : 0;
+ }
if(edLen != (int) strlen(edBuf)) {
printf("extra data length specified %d, but actually is %ld in record %d\n",
edLen, (long) strlen(edBuf), i);
exit(1);
}
} else {
- scanfOK = fscanf(fp, "%d\n", &val) == 1 ? 1 : 0;
+ if(fgets(ioBuf, sizeof(ioBuf), fp) == NULL) {
+ scanfOK = 0;
+ } else {
+ scanfOK = sscanf(ioBuf, "%d\n", &val) == 1 ? 1 : 0;
+ }
}
if(!scanfOK) {
printf("scanf error in index i=%d\n", i);
@@ -132,9 +141,11 @@ int main(int argc, char *argv[])
exit(1);
}
- if(feof(fp)) {
+ int c = getc(fp);
+ if(c == EOF) {
reachedEOF = 1;
} else {
+ ungetc(c, fp);
/* if duplicates are permitted, we need to do a final check if we have duplicates at the
* end of file.
*/
@@ -142,14 +153,22 @@ int main(int argc, char *argv[])
i = end;
while(!feof(fp)) {
if(bHaveExtraData) {
- scanfOK = fscanf(fp, "%d,%d,%s\n", &val, &edLen, edBuf) == 3 ? 1 : 0;
+ if(fgets(ioBuf, sizeof(ioBuf), fp) == NULL) {
+ scanfOK = 0;
+ } else {
+ scanfOK = sscanf(ioBuf, "%d,%d,%s\n", &val, &edLen, edBuf) == 3 ? 1 : 0;
+ }
if(edLen != (int) strlen(edBuf)) {
printf("extra data length specified %d, but actually is %ld in record %d\n",
edLen, (long) strlen(edBuf), i);
exit(1);
}
} else {
- scanfOK = fscanf(fp, "%d\n", &val) == 1 ? 1 : 0;
+ if(fgets(ioBuf, sizeof(ioBuf), fp) == NULL) {
+ scanfOK = 0;
+ } else {
+ scanfOK = sscanf(ioBuf, "%d\n", &val) == 1 ? 1 : 0;
+ }
}
if(val != i) {
diff --git a/tests/mmpstrucdata.sh b/tests/mmpstrucdata.sh
new file mode 100755
index 00000000..62b6ba96
--- /dev/null
+++ b/tests/mmpstrucdata.sh
@@ -0,0 +1,12 @@
+# This file is part of the rsyslog project, released under ASL 2.0
+# rgerhards, 2013-11-22
+echo ===============================================================================
+echo \[mmpstrucdata.sh\]: testing mmpstrucdata
+source $srcdir/diag.sh init
+source $srcdir/diag.sh startup mmpstrucdata.conf
+sleep 1
+source $srcdir/diag.sh tcpflood -m100 -y
+source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
+source $srcdir/diag.sh wait-shutdown
+source $srcdir/diag.sh seq-check 0 99
+source $srcdir/diag.sh exit
diff --git a/tests/rfc5424parser.sh b/tests/rfc5424parser.sh
new file mode 100755
index 00000000..3f5be497
--- /dev/null
+++ b/tests/rfc5424parser.sh
@@ -0,0 +1,12 @@
+# This file is part of the rsyslog project, released under ASL 2.0
+# rgerhards, 2013-11-22
+echo ===============================================================================
+echo \[rfc5424parser.sh\]: testing mmpstrucdata
+source $srcdir/diag.sh init
+source $srcdir/diag.sh startup rfc5424parser.conf
+sleep 1
+source $srcdir/diag.sh tcpflood -m100 -y
+source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
+source $srcdir/diag.sh wait-shutdown
+source $srcdir/diag.sh seq-check 0 99
+source $srcdir/diag.sh exit
diff --git a/tests/tcpflood.c b/tests/tcpflood.c
index b3cef2e0..f17363f2 100644
--- a/tests/tcpflood.c
+++ b/tests/tcpflood.c
@@ -48,13 +48,14 @@
* -b number of messages within a batch (default: 100,000,000 millions)
* -Y use multiple threads, one per connection (which means 1 if one only connection
* is configured!)
+ * -y use RFC5424 style test message
* -z private key file for TLS mode
* -Z cert (public key) file for TLS mode
* -L loglevel to use for GnuTLS troubleshooting (0-off to 10-all, 0 default)
*
* Part of the testbench for rsyslog.
*
- * Copyright 2009, 2010 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2009, 2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -111,6 +112,7 @@ static int targetPort = 13514;
static int numTargetPorts = 1;
static int dynFileIDs = 0;
static int extraDataLen = 0; /* amount of extra data to add to message */
+static int useRFC5424Format = 0; /* should the test message be in RFC5424 format? */
static int bRandomizeExtraData = 0; /* randomize amount of extra data added */
static int numMsgsToSend; /* number of messages to send */
static unsigned numConnections = 1; /* number of connections to create */
@@ -363,8 +365,14 @@ genMsg(char *buf, size_t maxBuf, int *pLenBuf, struct instdata *inst)
snprintf(dynFileIDBuf, sizeof(dynFileIDBuf), "%d:", rand() % dynFileIDs);
}
if(extraDataLen == 0) {
- *pLenBuf = snprintf(buf, maxBuf, "<%s>Mar 1 01:00:00 172.20.245.8 tag msgnum:%s%8.8d:%c",
- msgPRI, dynFileIDBuf, msgNum, frameDelim);
+ if(useRFC5424Format) {
+ *pLenBuf = snprintf(buf, maxBuf, "<%s>1 2003-03-01T01:00:00.000Z mymachine.example.com tcpflood "
+ "- tag [tcpflood@32473 MSGNUM=\"%8.8d\"] msgnum:%s%8.8d:%c",
+ msgPRI, msgNum, dynFileIDBuf, msgNum, frameDelim);
+ } else {
+ *pLenBuf = snprintf(buf, maxBuf, "<%s>Mar 1 01:00:00 172.20.245.8 tag msgnum:%s%8.8d:%c",
+ msgPRI, dynFileIDBuf, msgNum, frameDelim);
+ }
} else {
if(bRandomizeExtraData)
edLen = ((long) rand() + extraDataLen) % extraDataLen + 1;
@@ -372,8 +380,14 @@ genMsg(char *buf, size_t maxBuf, int *pLenBuf, struct instdata *inst)
edLen = extraDataLen;
memset(extraData, 'X', edLen);
extraData[edLen] = '\0';
- *pLenBuf = snprintf(buf, maxBuf, "<%s>Mar 1 01:00:00 172.20.245.8 tag msgnum:%s%8.8d:%d:%s%c",
- msgPRI, dynFileIDBuf, msgNum, edLen, extraData, frameDelim);
+ if(useRFC5424Format) {
+ *pLenBuf = snprintf(buf, maxBuf, "<%s>1 2003-03-01T01:00:00.000Z mymachine.example.com tcpflood "
+ "- tag [tcpflood@32473 MSGNUM=\"%8.8d\"] msgnum:%s%8.8d:%c",
+ msgPRI, msgNum, dynFileIDBuf, msgNum, frameDelim);
+ } else {
+ *pLenBuf = snprintf(buf, maxBuf, "<%s>Mar 1 01:00:00 172.20.245.8 tag msgnum:%s%8.8d:%d:%s%c",
+ msgPRI, dynFileIDBuf, msgNum, edLen, extraData, frameDelim);
+ }
}
} else {
/* use fixed message format from command line */
@@ -830,7 +844,7 @@ int main(int argc, char *argv[])
setvbuf(stdout, buf, _IONBF, 48);
- while((opt = getopt(argc, argv, "b:ef:F:t:p:c:C:m:i:I:P:d:Dn:L:M:rsBR:S:T:XW:Yz:Z:")) != -1) {
+ while((opt = getopt(argc, argv, "b:ef:F:t:p:c:C:m:i:I:P:d:Dn:L:M:rsBR:S:T:XW:yYz:Z:")) != -1) {
switch (opt) {
case 'b': batchsize = atoll(optarg);
break;
@@ -908,6 +922,8 @@ int main(int argc, char *argv[])
break;
case 'Y': runMultithreaded = 1;
break;
+ case 'y': useRFC5424Format = 1;
+ break;
case 'z': tlsKeyFile = optarg;
break;
case 'Z': tlsCertFile = optarg;
diff --git a/tests/testsuites/mmpstrucdata.conf b/tests/testsuites/mmpstrucdata.conf
new file mode 100644
index 00000000..fd18fd99
--- /dev/null
+++ b/tests/testsuites/mmpstrucdata.conf
@@ -0,0 +1,12 @@
+$IncludeConfig diag-common.conf
+
+module(load="../plugins/mmpstrucdata/.libs/mmpstrucdata")
+module(load="../plugins/imtcp/.libs/imtcp")
+
+template(name="outfmt" type="string" string="%$!rfc5424-sd!tcpflood@32473!msgnum%\n")
+
+input(type="imtcp" port="13514")
+
+action(type="mmpstrucdata")
+if $msg contains "msgnum" then
+ action(type="omfile" template="outfmt" file="rsyslog.out.log")
diff --git a/tests/testsuites/rfc5424parser.conf b/tests/testsuites/rfc5424parser.conf
new file mode 100644
index 00000000..cd90d120
--- /dev/null
+++ b/tests/testsuites/rfc5424parser.conf
@@ -0,0 +1,10 @@
+$IncludeConfig diag-common.conf
+
+module(load="../plugins/imtcp/.libs/imtcp")
+
+template(name="outfmt" type="string" string="%msg:F,58:2%\n")
+
+input(type="imtcp" port="13514")
+
+if $msg contains "msgnum" then
+ action(type="omfile" template="outfmt" file="rsyslog.out.log")
diff --git a/tests/testsuites/stop-localvar.conf b/tests/testsuites/stop-localvar.conf
index 020ebd87..63df6509 100644
--- a/tests/testsuites/stop-localvar.conf
+++ b/tests/testsuites/stop-localvar.conf
@@ -5,17 +5,17 @@
* rgerhards, 2013-11-19
*/
$IncludeConfig diag-common.conf
-template(name="outfmt" type="string" string="%$!nbr%\n")
+template(name="outfmt" type="string" string="%$.nbr%\n")
module(load="../plugins/imtcp/.libs/imtcp")
input(type="imtcp" port="13514")
if $msg contains "msgnum:" then {
- set $!nbr = field($msg, 58, 2);
- if cnum($!nbr) < 100 then
+ set $.nbr = field($msg, 58, 2);
+ if cnum($.nbr) < 100 then
stop
/* check is intentionally more complex than needed! */
- else if not (cnum($!nbr) > 999) then {
+ else if not (cnum($.nbr) > 999) then {
action(type="omfile" file="rsyslog.out.log" template="outfmt")
}
}