diff options
-rw-r--r-- | ChangeLog | 13 | ||||
-rw-r--r-- | configure.ac | 2 | ||||
-rw-r--r-- | doc/impstats.html | 7 | ||||
-rw-r--r-- | doc/mmpstrucdata.html | 23 | ||||
-rw-r--r-- | plugins/mmpstrucdata/mmpstrucdata.c | 6 | ||||
-rw-r--r-- | runtime/queue.c | 122 | ||||
-rw-r--r-- | runtime/wti.c | 30 | ||||
-rw-r--r-- | tests/Makefile.am | 15 | ||||
-rw-r--r-- | tests/chkseq.c | 29 | ||||
-rwxr-xr-x | tests/mmpstrucdata.sh | 12 | ||||
-rwxr-xr-x | tests/rfc5424parser.sh | 12 | ||||
-rw-r--r-- | tests/tcpflood.c | 28 | ||||
-rw-r--r-- | tests/testsuites/mmpstrucdata.conf | 12 | ||||
-rw-r--r-- | tests/testsuites/rfc5424parser.conf | 10 | ||||
-rw-r--r-- | tests/testsuites/stop-localvar.conf | 8 |
15 files changed, 279 insertions, 50 deletions
@@ -3,7 +3,13 @@ Version 8.1.2 [devel] 2013-11-?? - queue defaults have changed * high water mark is now dynamically 90% of queue size * low water makr is now dynamically 70% of queue size + * queue.discardMark is now dynamically 98% of queue size * queue.workerThreadMinimumMessage set to queue.size / num workers + For queues with very low queue.maxSize (< 100), "emergency" defaults + will be used. +- bugfix: disk queues created files in wrong working directory + if the $WorkDirectory was changed multiple times, all queues only + used the last value set. - bugfix: legacy directive $ActionQueueWorkerThreads was not honored --------------------------------------------------------------------------- Version 8.1.1 [devel] 2013-11-19 @@ -50,7 +56,11 @@ Version 7.5.7 [v7-devel] 2013-11-?? - queue defaults have changed * high water mark is now dynamically 90% of queue size * low water makr is now dynamically 70% of queue size + * queue.discardMark is now dynamically 98% of queue size * queue.workerThreadMinimumMessage set to queue.size / num workers + For queues with very low queue.maxSize (< 100), "emergency" defaults + will be used. +- bugfix: mmpstrucdata generated inaccessible properties - bugfix: RainerScript optimizer did not optimize PRI filters things like "if $syslogfacility-text == "local3"" were not converted to PRIFILT. This was a regression introduced in 7.5.6. @@ -277,6 +287,9 @@ Version 7.5.0 [devel] 2013-06-11 Thanks to Axel Rau for the patch. --------------------------------------------------------------------------- Version 7.4.7 [v7.4-stable] 2013-11-?? +- bugfix: disk queues created files in wrong working directory + if the $WorkDirectory was changed multiple times, all queues only + used the last value set. - bugfix: legacy directive $ActionQueueWorkerThreads was not honored - bugfix: segfault on startup when certain script constructs are used e.g. "if not $msg ..." diff --git a/configure.ac b/configure.ac index c79a74ce..8f74ebc7 100644 --- a/configure.ac +++ b/configure.ac @@ -932,7 +932,7 @@ AC_ARG_ENABLE(mmnormalize, ) if test "x$enable_mmnormalize" = "xyes"; then PKG_CHECK_MODULES(LIBEE, libee >= 0.4.0) - PKG_CHECK_MODULES(LIBLOGNORM, lognorm >= 0.3.1) + PKG_CHECK_MODULES(LIBLOGNORM, lognorm >= 0.3.1 lognorm < 1.0.0) fi AM_CONDITIONAL(ENABLE_MMNORMALIZE, test x$enable_mmnormalize = xyes) diff --git a/doc/impstats.html b/doc/impstats.html index 3b206941..c768dbf4 100644 --- a/doc/impstats.html +++ b/doc/impstats.html @@ -24,6 +24,13 @@ settings, this impact may be noticeable (for high-load environments). <p>The rsyslog website has an updated overview of available <a href="http://rsyslog.com/rsyslog-statistic-counter/">rsyslog statistic counters</a>. </p> +<p><b>Note that there is a +<a href="http://www.rsyslog.com/impstats-analyzer/">rsyslog statistics +online analyzer</a> available.</b> It can be given a impstats-generated file and +will return problems it detects. Note that the analyzer cannot replace a +human in getting things right, but it is expected to be a good aid in starting +to understand and gain information from the pstats logs. +<7p> <p><b>Module Confguration Parameters</b>:</p> <p>This module supports module parameters, only. <ul> diff --git a/doc/mmpstrucdata.html b/doc/mmpstrucdata.html index b4003062..8197d94a 100644 --- a/doc/mmpstrucdata.html +++ b/doc/mmpstrucdata.html @@ -13,6 +13,7 @@ <p><b>Description</b>:</p> <p>The mmpstrucdata parses RFC5424 structured data into the message json variable tree. +The data parsed, if available, is stored under "jsonRoot!rfc5424-sd!...". <p> </p> <p><b>Module Configuration Parameters</b>:</p> @@ -33,6 +34,10 @@ Specifies into which json container the data shall be parsed to. <p><b>Caveats/Known Bugs:</b> <ul> <li>this module is currently experimental; feedback is appreciated +<li>property names are treated case-insensitive in rsyslog. As such, +RFC5424 names are treated case-insensitive as well. If such names +only differ in case (what is not recommended anyways), problems will +occur. <li>structured data with duplicate SD-IDs and SD-PARAMS is not properly processed </ul> @@ -48,6 +53,24 @@ template(name="jsondump" type="string" string="%msg%: %$!%\n") action(type="omfile" file="/path/to/log" template="jsondump") </textarea> +<p><b>A more practical one:</b> +<p>Take this example message (inspired by RFC5424 sample;)): +<p><code><34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][id@2 test="tast"] BOM'su root' failed for lonvick on /dev/pts/8</code> +<p>We apply this configuration: +<p><textarea rows="6" cols="120">module(load="mmpstrucdata") +action(type="mmpstrucdata") +template(name="sample2" type="string" + string="ALL: %$!%\nSD: %$!RFC5424-SD%\nIUT:%$!rfc5424-sd!exampleSDID@32473!iut%\nRAWMSG: %rawmsg%\n\n") +action(type="omfile" file="/path/to/log" template="sample2") +</textarea> +<p>This will output: +<p><code>ALL: { "rfc5424-sd": { "examplesdid@32473": { "iut": "3", "eventsource": "Application", "eventid": "1011" }, "id@2": { "test": "tast" } } }</br> +SD: { "examplesdid@32473": { "iut": "3", "eventsource": "Application", "eventid": "1011" }, "id@2": { "test": "tast" } }</br> +IUT:3</br> +RAWMSG: <34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][id@2 test="tast"] BOM'su root' failed for lonvick on /dev/pts/8</code> +<p>As you can seem, you can address each of the individual items. Note that the +case of the RFC5424 parameter names has been converted to lower case. + <p>[<a href="rsyslog_conf.html">rsyslog.conf overview</a>] [<a href="manual.html">manual index</a>] [<a href="http://www.rsyslog.com/">rsyslog site</a>]</p> <p><font size="2">This documentation is part of the diff --git a/plugins/mmpstrucdata/mmpstrucdata.c b/plugins/mmpstrucdata/mmpstrucdata.c index e9e36b27..680ba92b 100644 --- a/plugins/mmpstrucdata/mmpstrucdata.c +++ b/plugins/mmpstrucdata/mmpstrucdata.c @@ -31,6 +31,7 @@ #include <errno.h> #include <unistd.h> #include <stdint.h> +#include <ctype.h> #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" @@ -218,7 +219,8 @@ dbgprintf("DDDD: parseSD_NAME %s\n", sdbuf+*curridx); if( sdbuf[i] == '=' || sdbuf[i] == '"' || sdbuf[i] == ']' || sdbuf[i] == ' ') break; - namebuf[j] = sdbuf[i++]; + namebuf[j] = tolower(sdbuf[i]); + ++i; } namebuf[j] = '\0'; dbgprintf("DDDD: parseSD_NAME, NAME: '%s'\n", namebuf); @@ -349,7 +351,7 @@ dbgprintf("DDDD: json: '%s'\n", json_object_get_string(json)); if(jroot == NULL) { ABORT_FINALIZE(RS_RET_ERR); } - json_object_object_add(jroot, "RFC5424-SD", json); + json_object_object_add(jroot, "rfc5424-sd", json); msgAddJSON(pMsg, pData->jsonRoot, jroot); finalize_it: RETiRet; diff --git a/runtime/queue.c b/runtime/queue.c index 5770eae9..ff1b30f4 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -59,6 +59,7 @@ #include "datetime.h" #include "unicode-helper.h" #include "statsobj.h" +#include "parserif.h" #ifdef OS_SOLARIS # include <sched.h> @@ -86,6 +87,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDestructDisk(qqueue_t *pThis); +rsRetVal qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -94,6 +96,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis); /* tables for interfacing with the v6 config system */ static struct cnfparamdescr cnfpdescr[] = { { "queue.filename", eCmdHdlrGetWord, 0 }, + { "queue.spooldirectory", eCmdHdlrGetWord, 0 }, { "queue.size", eCmdHdlrSize, 0 }, { "queue.dequeuebatchsize", eCmdHdlrInt, 0 }, { "queue.maxdiskspace", eCmdHdlrSize, 0 }, @@ -417,6 +420,7 @@ StartDA(qqueue_t *pThis) CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(qqueueSetSpoolDir(pThis->pqDA, pThis->pszSpoolDir, pThis->lenSpoolDir)); CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles)); CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); @@ -733,7 +737,7 @@ qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pTh DEFiRet; ISOBJ_TYPE_assert(pStrm, strm); ISOBJ_TYPE_assert(pThis, qqueue); - CHKiRet(strm.SetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strm.SetDir(pStrm, pThis->pszSpoolDir, pThis->lenSpoolDir)); finalize_it: RETiRet; } @@ -840,7 +844,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) } else { CHKiRet(strm.Construct(&pThis->tVars.disk.pWrite)); CHKiRet(strm.SetbSync(pThis->tVars.disk.pWrite, pThis->bSyncQueueFiles)); - CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, pThis->pszSpoolDir, pThis->lenSpoolDir)); CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); CHKiRet(strm.SetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); @@ -852,7 +856,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDeq)); CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); - CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDeq, pThis->pszSpoolDir, pThis->lenSpoolDir)); CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ)); CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR)); @@ -865,7 +869,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDel)); CHKiRet(strm.SetbSync(pThis->tVars.disk.pReadDel, pThis->bSyncQueueFiles)); CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1)); - CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDel, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDel, pThis->pszSpoolDir, pThis->lenSpoolDir)); CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ)); CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR)); @@ -1359,7 +1363,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) pThis->iDeqBatchSize = 128; /* default batch size */ pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */ pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */ - pThis->iDiscardMrk = 980; /* begin to discard messages */ + pThis->iDiscardMrk = -1; /* begin to discard messages */ pThis->iDiscardSeverity = 8; /* turn off */ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ pThis->iMaxFileSize = 1024*1024; @@ -1389,7 +1393,7 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) pThis->iDeqBatchSize = 1024; /* default batch size */ pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */ pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */ - pThis->iDiscardMrk = 49500; /* begin to discard messages */ + pThis->iDiscardMrk = -1; /* begin to discard messages */ pThis->iDiscardSeverity = 8; /* turn off */ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ pThis->iMaxFileSize = 16*1024*1024; @@ -1680,6 +1684,9 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) /* The rate limiter * + * IMPORTANT: the rate-limiter MUST unlock and re-lock the queue when + * it actually delays processing. Otherwise inputs are stalled. + * * Here we may wait if a dequeue time window is defined or if we are * rate-limited. TODO: If we do so, we should also look into the * way new worker threads are spawned. Obviously, it doesn't make much @@ -1765,8 +1772,10 @@ RateLimiter(qqueue_t *pThis) } if(iDelay > 0) { + pthread_mutex_unlock(pThis->mut); DBGOPRINT((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay); srSleep(iDelay, 0); + pthread_mutex_lock(pThis->mut); } RETiRet; @@ -2076,7 +2085,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ /* pre-construct file name for .qi file */ pThis->lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), - "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); + "%s/%s.qi", (char*) pThis->pszSpoolDir, (char*)pThis->pszFilePrefix); pThis->pszQIFNam = ustrdup(pszQIFNam); DBGOPRINT((obj_t*) pThis, ".qi file name is '%s', len %d\n", pThis->pszQIFNam, (int) pThis->lenQIFNam); @@ -2124,21 +2133,68 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ } } - if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) + if(pThis->iDiscardMrk > pThis->iMaxQueueSize) { + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": " + "queue.discardMark %d is set larger than queue.size", + obj.GetName((obj_t*) pThis), pThis->iDiscardMrk); + } + + goodval = (pThis->iMaxQueueSize / 100) * 80; + if(pThis->iDiscardMrk != -1 && pThis->iDiscardMrk < goodval) { + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": queue.discardMark " + "is set quite low at %d. You should only set it below " + "80%% (%d) if you have a good reason for this.", + obj.GetName((obj_t*) pThis), pThis->iDiscardMrk, goodval); + } + + + /* now come parameter corrections and defaults */ + if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) { pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90; + if(pThis->iHighWtrMrk == 0) { /* guard against very low max queue sizes! */ + pThis->iHighWtrMrk = pThis->iMaxQueueSize; + } + } if( pThis->iLowWtrMrk < 2 || pThis->iLowWtrMrk > pThis->iMaxQueueSize - || pThis->iLowWtrMrk > pThis->iHighWtrMrk ) + || pThis->iLowWtrMrk > pThis->iHighWtrMrk ) { pThis->iLowWtrMrk = (pThis->iMaxQueueSize / 100) * 70; + if(pThis->iLowWtrMrk == 0) { + pThis->iLowWtrMrk = 1; + } + } + if( pThis->iMinMsgsPerWrkr < 1 - || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) + || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) { pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads; - if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) + } + + if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) { pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97; - if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) + if(pThis->iFullDlyMrk == 0) { + pThis->iFullDlyMrk = + (pThis->iMaxQueueSize == 1) ? 1 : pThis->iMaxQueueSize - 1; + } + } + if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) { pThis->iLightDlyMrk = (pThis->iMaxQueueSize / 100) * 70; - if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) + if(pThis->iLightDlyMrk == 0) { + pThis->iLightDlyMrk = + (pThis->iMaxQueueSize == 1) ? 1 : pThis->iMaxQueueSize - 1; + } + } + + if(pThis->iDiscardMrk < 1 || pThis->iDiscardMrk > pThis->iMaxQueueSize) { + pThis->iDiscardMrk = (pThis->iMaxQueueSize / 100) * 98; + if(pThis->iDiscardMrk == 0) { + /* for very small queues, we disable this by default */ + pThis->iDiscardMrk = pThis->iMaxQueueSize; + } + } + + if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) { pThis->iDeqBatchSize = pThis->iMaxQueueSize; + } /* finalize some initializations that could not yet be done because it is * influenced by properties which might have been set after queueConstruct () @@ -2170,14 +2226,16 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->iFullDlyMrk = wrk; } - DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, maxQSize %d, lqsize %d, pqsize %d, child %d, " - "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d, " - "max wrkr %d, min msgs f. wrkr %d\n", - pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iMaxQueueSize, + DBGOPRINT((obj_t*) pThis, "params: type %d, enq-only %d, disk assisted %d, spoolDir '%s', maxFileSz %lld, " + "maxQSize %d, lqsize %d, pqsize %d, child %d, full delay %d, " + "light delay %d, deq batch size %d, high wtrmrk %d, low wtrmrk %d, " + "discardmrk %d, max wrkr %d, min msgs f. wrkr %d\n", + pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->pszSpoolDir, + pThis->iMaxFileSize, pThis->iMaxQueueSize, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize, pThis->iHighWtrMrk, pThis->iLowWtrMrk, - pThis->iNumWorkerThreads, pThis->iMinMsgsPerWrkr); + pThis->iDiscardMrk, pThis->iNumWorkerThreads, pThis->iMinMsgsPerWrkr); pThis->bQueueStarted = 1; if(pThis->qType == QUEUETYPE_DIRECT) @@ -2484,6 +2542,24 @@ CODESTARTobjDestruct(qqueue) ENDobjDestruct(qqueue) +/* set the queue's spool directory. The directory MUST NOT be NULL. + * The passed-in string is duplicated. So if the caller does not need + * it any longer, it must free it. + */ +rsRetVal +qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir) +{ + DEFiRet; + + free(pThis->pszSpoolDir); + CHKmalloc(pThis->pszSpoolDir = ustrdup(pszSpoolDir)); + pThis->lenSpoolDir = lenSpoolDir; + +finalize_it: + RETiRet; +} + + /* set the queue's file prefix * The passed-in string is duplicated. So if the caller does not need * it any longer, it must free it. @@ -2834,6 +2910,16 @@ qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst) pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr); } else if(!strcmp(pblk.descr[i].name, "queue.cry.provider")) { pThis->cryprovName = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(pblk.descr[i].name, "queue.spooldirectory")) { + free(pThis->pszSpoolDir); + pThis->pszSpoolDir = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); + pThis->lenSpoolDir = es_strlen(pvals[i].val.d.estr); + if(pThis->pszSpoolDir[pThis->lenSpoolDir-1] == '/') { + pThis->pszSpoolDir[pThis->lenSpoolDir-1] = '\0'; + --pThis->lenSpoolDir; + parser_errmsg("queue.spooldirectory must not end with '/', " + "corrected to '%s'", pThis->pszSpoolDir); + } } else if(!strcmp(pblk.descr[i].name, "queue.size")) { pThis->iMaxQueueSize = pvals[i].val.d.n; } else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) { diff --git a/runtime/wti.c b/runtime/wti.c index bbefc537..c02d0573 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -303,15 +303,26 @@ wtiWorker(wti_t *pThis) pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); dbgprintf("DDDD: wti %p: worker starting\n", pThis); - /* now we have our identity, on to real processing */ - while(1) { /* loop will be broken below - need to do mutex locks */ + + /* note: in this loop, the mutex is "never" unlocked. Of course, + * this is not true: it actually is unlocked when the actual processing + * is done, as part of pWtp->pfDoWork() processing. Note that this + * function is required to re-lock it when done. We cannot do the + * lock/unlock here ourselfs, as pfDoWork() needs to access queue + * structures itself. + * The same goes for pfRateLimiter(). While we could unlock/lock when + * we call it, in practice the function is often called without any + * ratelimiting actually done. Only the rate limiter itself knows + * that. As such, it needs to bear the burden of doing the locking + * when required. -- rgerhards, 2013-11-20 + */ + d_pthread_mutex_lock(pWtp->pmutUsr); + while(1) { /* loop will be broken below */ if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */ pWtp->pfRateLimiter(pWtp->pUsr); } - d_pthread_mutex_lock(pWtp->pmutUsr); - /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); if(terminateRet == RS_RET_TERMINATE_NOW) { @@ -319,36 +330,29 @@ dbgprintf("DDDD: wti %p: worker starting\n", pThis); localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); DBGOPRINT((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", localRet); - d_pthread_mutex_unlock(pWtp->pmutUsr); break; } /* try to execute and process whatever we have */ - /* Note that this function releases and re-aquires the mutex. The returned - * information on idle state must be processed before releasing the mutex again. - */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) { - d_pthread_mutex_unlock(pWtp->pmutUsr); break; /* end of loop */ } else if(localRet == RS_RET_IDLE) { if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { - d_pthread_mutex_unlock(pWtp->pmutUsr); DBGOPRINT((obj_t*) pThis, "terminating worker terminateRet=%d, bInactivityTOOccured=%d\n", terminateRet, bInactivityTOOccured); break; /* end of loop */ } doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); - d_pthread_mutex_unlock(pWtp->pmutUsr); continue; /* request next iteration */ } - d_pthread_mutex_unlock(pWtp->pmutUsr); - bInactivityTOOccured = 0; /* reset for next run */ } + d_pthread_mutex_unlock(pWtp->pmutUsr); + DBGPRINTF("DDDD: wti %p: worker cleanup up action instances\n", pThis); for(i = 0 ; i < iActionNbr ; ++i) { dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, pThis->actWrkrInfo[i].actWrkrData); diff --git a/tests/Makefile.am b/tests/Makefile.am index a2548a68..5232e3ef 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -7,6 +7,8 @@ TESTS = $(TESTRUNS) if ENABLE_IMDIAG TESTS += \ stop-localvar.sh \ + stop-msgvar.sh \ + rfc5424parser.sh \ arrayqueue.sh \ global_vars.sh \ da-mainmsg-q.sh \ @@ -122,6 +124,11 @@ TESTS += \ imptcp_conndrop.sh endif +if ENABLE_MMPSTRUCDATA +TESTS += \ + mmpstrucdata.sh +endif + if ENABLE_GNUTLS # TODO: re-enable in newer version #TESTS += \ @@ -298,10 +305,14 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \ testsuites/stop.conf \ stop-localvar.sh \ testsuites/stop-localvar.conf \ + stop-msgvar.sh \ + testsuites/stop-msgvar.conf \ global_vars.sh \ testsuites/global_vars.conf \ + rfc5424parser.sh \ + testsuites/rfc5424parser.conf \ rs_optimizer_pri.sh \ - testsuites/rs_optimizer_pr.conf \ + testsuites/rs_optimizer_pri.conf \ rscript_prifilt.sh \ testsuites/rscript_prifilt.conf \ rscript_optimizer1.sh \ @@ -526,6 +537,8 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \ mysql-asyn.sh \ mysql-asyn-vg.sh \ testsuites/mysql-asyn.conf \ + mmpstrucdata.sh \ + testsuites/mmpstrucdata.conf \ cfg.sh # TODO: re-enable diff --git a/tests/chkseq.c b/tests/chkseq.c index bea9f83a..bd8597e8 100644 --- a/tests/chkseq.c +++ b/tests/chkseq.c @@ -51,6 +51,7 @@ int main(int argc, char *argv[]) int reachedEOF; int edLen; /* length of extra data */ static char edBuf[500*1024]; /* buffer for extra data (pretty large to be on the save side...) */ + static char ioBuf[sizeof(edBuf)+1024]; char *file = NULL; while((opt = getopt(argc, argv, "e:f:ds:vE")) != EOF) { @@ -103,14 +104,22 @@ int main(int argc, char *argv[]) for(i = start ; i < end+1 ; ++i) { if(bHaveExtraData) { - scanfOK = fscanf(fp, "%d,%d,%s\n", &val, &edLen, edBuf) == 3 ? 1 : 0; + if(fgets(ioBuf, sizeof(ioBuf), fp) == NULL) { + scanfOK = 0; + } else { + scanfOK = sscanf(ioBuf, "%d,%d,%s\n", &val, &edLen, edBuf) == 3 ? 1 : 0; + } if(edLen != (int) strlen(edBuf)) { printf("extra data length specified %d, but actually is %ld in record %d\n", edLen, (long) strlen(edBuf), i); exit(1); } } else { - scanfOK = fscanf(fp, "%d\n", &val) == 1 ? 1 : 0; + if(fgets(ioBuf, sizeof(ioBuf), fp) == NULL) { + scanfOK = 0; + } else { + scanfOK = sscanf(ioBuf, "%d\n", &val) == 1 ? 1 : 0; + } } if(!scanfOK) { printf("scanf error in index i=%d\n", i); @@ -132,9 +141,11 @@ int main(int argc, char *argv[]) exit(1); } - if(feof(fp)) { + int c = getc(fp); + if(c == EOF) { reachedEOF = 1; } else { + ungetc(c, fp); /* if duplicates are permitted, we need to do a final check if we have duplicates at the * end of file. */ @@ -142,14 +153,22 @@ int main(int argc, char *argv[]) i = end; while(!feof(fp)) { if(bHaveExtraData) { - scanfOK = fscanf(fp, "%d,%d,%s\n", &val, &edLen, edBuf) == 3 ? 1 : 0; + if(fgets(ioBuf, sizeof(ioBuf), fp) == NULL) { + scanfOK = 0; + } else { + scanfOK = sscanf(ioBuf, "%d,%d,%s\n", &val, &edLen, edBuf) == 3 ? 1 : 0; + } if(edLen != (int) strlen(edBuf)) { printf("extra data length specified %d, but actually is %ld in record %d\n", edLen, (long) strlen(edBuf), i); exit(1); } } else { - scanfOK = fscanf(fp, "%d\n", &val) == 1 ? 1 : 0; + if(fgets(ioBuf, sizeof(ioBuf), fp) == NULL) { + scanfOK = 0; + } else { + scanfOK = sscanf(ioBuf, "%d\n", &val) == 1 ? 1 : 0; + } } if(val != i) { diff --git a/tests/mmpstrucdata.sh b/tests/mmpstrucdata.sh new file mode 100755 index 00000000..62b6ba96 --- /dev/null +++ b/tests/mmpstrucdata.sh @@ -0,0 +1,12 @@ +# This file is part of the rsyslog project, released under ASL 2.0 +# rgerhards, 2013-11-22 +echo =============================================================================== +echo \[mmpstrucdata.sh\]: testing mmpstrucdata +source $srcdir/diag.sh init +source $srcdir/diag.sh startup mmpstrucdata.conf +sleep 1 +source $srcdir/diag.sh tcpflood -m100 -y +source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages +source $srcdir/diag.sh wait-shutdown +source $srcdir/diag.sh seq-check 0 99 +source $srcdir/diag.sh exit diff --git a/tests/rfc5424parser.sh b/tests/rfc5424parser.sh new file mode 100755 index 00000000..3f5be497 --- /dev/null +++ b/tests/rfc5424parser.sh @@ -0,0 +1,12 @@ +# This file is part of the rsyslog project, released under ASL 2.0 +# rgerhards, 2013-11-22 +echo =============================================================================== +echo \[rfc5424parser.sh\]: testing mmpstrucdata +source $srcdir/diag.sh init +source $srcdir/diag.sh startup rfc5424parser.conf +sleep 1 +source $srcdir/diag.sh tcpflood -m100 -y +source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages +source $srcdir/diag.sh wait-shutdown +source $srcdir/diag.sh seq-check 0 99 +source $srcdir/diag.sh exit diff --git a/tests/tcpflood.c b/tests/tcpflood.c index b3cef2e0..f17363f2 100644 --- a/tests/tcpflood.c +++ b/tests/tcpflood.c @@ -48,13 +48,14 @@ * -b number of messages within a batch (default: 100,000,000 millions) * -Y use multiple threads, one per connection (which means 1 if one only connection * is configured!) + * -y use RFC5424 style test message * -z private key file for TLS mode * -Z cert (public key) file for TLS mode * -L loglevel to use for GnuTLS troubleshooting (0-off to 10-all, 0 default) * * Part of the testbench for rsyslog. * - * Copyright 2009, 2010 Rainer Gerhards and Adiscon GmbH. + * Copyright 2009, 2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -111,6 +112,7 @@ static int targetPort = 13514; static int numTargetPorts = 1; static int dynFileIDs = 0; static int extraDataLen = 0; /* amount of extra data to add to message */ +static int useRFC5424Format = 0; /* should the test message be in RFC5424 format? */ static int bRandomizeExtraData = 0; /* randomize amount of extra data added */ static int numMsgsToSend; /* number of messages to send */ static unsigned numConnections = 1; /* number of connections to create */ @@ -363,8 +365,14 @@ genMsg(char *buf, size_t maxBuf, int *pLenBuf, struct instdata *inst) snprintf(dynFileIDBuf, sizeof(dynFileIDBuf), "%d:", rand() % dynFileIDs); } if(extraDataLen == 0) { - *pLenBuf = snprintf(buf, maxBuf, "<%s>Mar 1 01:00:00 172.20.245.8 tag msgnum:%s%8.8d:%c", - msgPRI, dynFileIDBuf, msgNum, frameDelim); + if(useRFC5424Format) { + *pLenBuf = snprintf(buf, maxBuf, "<%s>1 2003-03-01T01:00:00.000Z mymachine.example.com tcpflood " + "- tag [tcpflood@32473 MSGNUM=\"%8.8d\"] msgnum:%s%8.8d:%c", + msgPRI, msgNum, dynFileIDBuf, msgNum, frameDelim); + } else { + *pLenBuf = snprintf(buf, maxBuf, "<%s>Mar 1 01:00:00 172.20.245.8 tag msgnum:%s%8.8d:%c", + msgPRI, dynFileIDBuf, msgNum, frameDelim); + } } else { if(bRandomizeExtraData) edLen = ((long) rand() + extraDataLen) % extraDataLen + 1; @@ -372,8 +380,14 @@ genMsg(char *buf, size_t maxBuf, int *pLenBuf, struct instdata *inst) edLen = extraDataLen; memset(extraData, 'X', edLen); extraData[edLen] = '\0'; - *pLenBuf = snprintf(buf, maxBuf, "<%s>Mar 1 01:00:00 172.20.245.8 tag msgnum:%s%8.8d:%d:%s%c", - msgPRI, dynFileIDBuf, msgNum, edLen, extraData, frameDelim); + if(useRFC5424Format) { + *pLenBuf = snprintf(buf, maxBuf, "<%s>1 2003-03-01T01:00:00.000Z mymachine.example.com tcpflood " + "- tag [tcpflood@32473 MSGNUM=\"%8.8d\"] msgnum:%s%8.8d:%c", + msgPRI, msgNum, dynFileIDBuf, msgNum, frameDelim); + } else { + *pLenBuf = snprintf(buf, maxBuf, "<%s>Mar 1 01:00:00 172.20.245.8 tag msgnum:%s%8.8d:%d:%s%c", + msgPRI, dynFileIDBuf, msgNum, edLen, extraData, frameDelim); + } } } else { /* use fixed message format from command line */ @@ -830,7 +844,7 @@ int main(int argc, char *argv[]) setvbuf(stdout, buf, _IONBF, 48); - while((opt = getopt(argc, argv, "b:ef:F:t:p:c:C:m:i:I:P:d:Dn:L:M:rsBR:S:T:XW:Yz:Z:")) != -1) { + while((opt = getopt(argc, argv, "b:ef:F:t:p:c:C:m:i:I:P:d:Dn:L:M:rsBR:S:T:XW:yYz:Z:")) != -1) { switch (opt) { case 'b': batchsize = atoll(optarg); break; @@ -908,6 +922,8 @@ int main(int argc, char *argv[]) break; case 'Y': runMultithreaded = 1; break; + case 'y': useRFC5424Format = 1; + break; case 'z': tlsKeyFile = optarg; break; case 'Z': tlsCertFile = optarg; diff --git a/tests/testsuites/mmpstrucdata.conf b/tests/testsuites/mmpstrucdata.conf new file mode 100644 index 00000000..fd18fd99 --- /dev/null +++ b/tests/testsuites/mmpstrucdata.conf @@ -0,0 +1,12 @@ +$IncludeConfig diag-common.conf + +module(load="../plugins/mmpstrucdata/.libs/mmpstrucdata") +module(load="../plugins/imtcp/.libs/imtcp") + +template(name="outfmt" type="string" string="%$!rfc5424-sd!tcpflood@32473!msgnum%\n") + +input(type="imtcp" port="13514") + +action(type="mmpstrucdata") +if $msg contains "msgnum" then + action(type="omfile" template="outfmt" file="rsyslog.out.log") diff --git a/tests/testsuites/rfc5424parser.conf b/tests/testsuites/rfc5424parser.conf new file mode 100644 index 00000000..cd90d120 --- /dev/null +++ b/tests/testsuites/rfc5424parser.conf @@ -0,0 +1,10 @@ +$IncludeConfig diag-common.conf + +module(load="../plugins/imtcp/.libs/imtcp") + +template(name="outfmt" type="string" string="%msg:F,58:2%\n") + +input(type="imtcp" port="13514") + +if $msg contains "msgnum" then + action(type="omfile" template="outfmt" file="rsyslog.out.log") diff --git a/tests/testsuites/stop-localvar.conf b/tests/testsuites/stop-localvar.conf index 020ebd87..63df6509 100644 --- a/tests/testsuites/stop-localvar.conf +++ b/tests/testsuites/stop-localvar.conf @@ -5,17 +5,17 @@ * rgerhards, 2013-11-19 */ $IncludeConfig diag-common.conf -template(name="outfmt" type="string" string="%$!nbr%\n") +template(name="outfmt" type="string" string="%$.nbr%\n") module(load="../plugins/imtcp/.libs/imtcp") input(type="imtcp" port="13514") if $msg contains "msgnum:" then { - set $!nbr = field($msg, 58, 2); - if cnum($!nbr) < 100 then + set $.nbr = field($msg, 58, 2); + if cnum($.nbr) < 100 then stop /* check is intentionally more complex than needed! */ - else if not (cnum($!nbr) > 999) then { + else if not (cnum($.nbr) > 999) then { action(type="omfile" file="rsyslog.out.log" template="outfmt") } } |