summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog26
-rw-r--r--doc/rainerscript.html13
-rw-r--r--grammar/lexer.l2
-rw-r--r--grammar/rainerscript.h4
-rw-r--r--runtime/glbl.c30
-rw-r--r--runtime/glbl.h3
-rw-r--r--runtime/msg.c8
-rw-r--r--runtime/queue.c15
-rw-r--r--runtime/rsconf.c21
-rw-r--r--runtime/stream.c2
-rw-r--r--runtime/stream.h3
-rw-r--r--tests/chkseq.c41
12 files changed, 146 insertions, 22 deletions
diff --git a/ChangeLog b/ChangeLog
index eeed6dcd..7373032d 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,9 @@
---------------------------------------------------------------------------
Version 7.5.3 [devel] 2013-07-??
+- bugfix: queue file size was not correctly processed
+ this could lead to using one queue file per message for sizes >2GiB
+ Thanks to Tomas Heinrich for the patch.
+- add main_queue() configuration object to configure main message queue
- bugfix: stream compression in imptcp caused timestamp to be corrupted
- imudp: add ability to specify SO_RCVBUF size (rcvbufSize parameter)
- impstats: add process resource usage counters [via getrusage()]
@@ -68,7 +72,21 @@ Version 7.5.0 [devel] 2013-06-11
connections
Thanks to Axel Rau for the patch.
---------------------------------------------------------------------------
-Version 7.4.3 [v7.4-stable] 2013-07-??
+Version 7.4.3 [v7.4-stable] 2013-07-18
+- bugfix: queue file size was not correctly processed
+ this could lead to using one queue file per message for sizes >2GiB
+ Thanks to Tomas Heinrich for the patch.
+- bugfix: $QHOUR/$HHOUR were always "00" or "01"
+ regression some time between v5 and here
+ Thanks to forum user rjmcinty for reporting this bug
+- bugfix: testbench tool chkseq did improperly report invalid file
+ This happened when permitted duplicate values existed in the very
+ last lines, right before end-of-file.
+ Thanks to Radu Gheorghe for reporting this bug.
+---------------------------------------------------------------------------
+Version 7.4.3 [v7.4-stable] 2013-07-18
+- bugfix: memory leak if disk queues were used and json data present
+- bugfix: CEE/json data was lost during disk queue operation
- bugfix: potential segfault during startup on invalid config
could happen if invalid actions were present, which could lead
to improper handling in optimizer.
@@ -76,6 +94,9 @@ Version 7.4.3 [v7.4-stable] 2013-07-??
- bugfix: omlibdbi did not properly close connection on some errors
This happened to errors occuring in Begin/End Transaction entry
points.
+- cosmetic bugfix: file name buffer was not freed on disk queue destruction
+ This was an extremely small one-time per run memleak, so nothing of
+ concern. However, it bugs under valgrind and similar memory debuggers.
- fix build on FreeBSD
Thanks to Christiano Rolim for the patch
---------------------------------------------------------------------------
@@ -1536,6 +1557,9 @@ expected that interfaces, even new ones, break during the initial
[ported from v4]
---------------------------------------------------------------------------
Version 5.10.2 [V5-STABLE], 201?-??-??
+- bugfix: queue file size was not correctly processed
+ this could lead to using one queue file per message for sizes >2GiB
+ Thanks to Tomas Heinrich for the patch.
- updated systemd files to match current systemd source
- bugfix: spurios error messages from imuxsock about (non-error) EAGAIN
Thanks to Marius Tomaschewski for the patch.
diff --git a/doc/rainerscript.html b/doc/rainerscript.html
index 7cbbfa9f..d1d9f0a2 100644
--- a/doc/rainerscript.html
+++ b/doc/rainerscript.html
@@ -34,6 +34,19 @@ return a valid result, as you can't really add two letters (to
concatenate them, use the concatenation operator &).
 However, all type conversions are automatically done by the
script interpreter when there is need to do so.<br>
+<h2>configuration objects</h2>
+<h3>main_queue()</h3>
+<p><i>This object is available since 7.5.3.</i>
+This permits to specify parameters for the main message queue. Note that
+only <a href="queue_parameters.html">queue-parameters</a> are permitted for this
+config object. This permits to set the same options like in ruleset and action
+queues. A special statement is needed for the main queue, because it is a
+different object and cannot be configured via any other object.
+<p>Note that when the main_queue() object is configured, the legacy
+$MainMsgQ... statements are ignored.
+<p>Example:</p>
+<textarea rows="2" cols="60">main_queue(queue.size="100000" queue.type="LinkedList")
+</textarea>
<h2>Expressions</h2>
The language supports arbitrary complex expressions. All usual
operators are supported. The precedence of operations is as follows
diff --git a/grammar/lexer.l b/grammar/lexer.l
index bcf39a7f..44606977 100644
--- a/grammar/lexer.l
+++ b/grammar/lexer.l
@@ -176,6 +176,8 @@ int fileno(FILE *stream);
<INCL>[^ \t\n]+ { if(cnfDoInclude(yytext) != 0)
yyterminate();
BEGIN INITIAL; }
+"main_queue"[ \n\t]*"(" { yylval.objType = CNFOBJ_MAINQ;
+ BEGIN INOBJ; return BEGINOBJ; }
"global"[ \n\t]*"(" { yylval.objType = CNFOBJ_GLOBAL;
BEGIN INOBJ; return BEGINOBJ; }
"template"[ \n\t]*"(" { yylval.objType = CNFOBJ_TPL;
diff --git a/grammar/rainerscript.h b/grammar/rainerscript.h
index d00cc4c3..3e59fc60 100644
--- a/grammar/rainerscript.h
+++ b/grammar/rainerscript.h
@@ -24,6 +24,7 @@ enum cnfobjType {
CNFOBJ_TPL,
CNFOBJ_PROPERTY,
CNFOBJ_CONSTANT,
+ CNFOBJ_MAINQ,
CNFOBJ_INVALID = 0
};
@@ -55,6 +56,9 @@ cnfobjType2str(enum cnfobjType ot)
case CNFOBJ_CONSTANT:
return "constant";
break;
+ case CNFOBJ_MAINQ:
+ return "main_queue";
+ break;
default:return "error: invalid cnfobjType";
}
}
diff --git a/runtime/glbl.c b/runtime/glbl.c
index ccb978ba..6af38e01 100644
--- a/runtime/glbl.c
+++ b/runtime/glbl.c
@@ -60,6 +60,7 @@ DEFobjCurrIf(net)
* For this object, these variables are obviously what makes the "meat" of the
* class...
*/
+static struct cnfobj *mainqCnfObj = NULL;/* main queue object, to be used later in startup sequence */
static uchar *pszWorkDir = NULL;
static int bOptimizeUniProc = 1; /* enable uniprocessor optimizations */
static int bParseHOSTNAMEandTAG = 1; /* parser modification (based on startup params!) */
@@ -137,6 +138,7 @@ static dataType Get##nameFunc(void) \
SIMP_PROP(ParseHOSTNAMEandTAG, bParseHOSTNAMEandTAG, int)
SIMP_PROP(OptimizeUniProc, bOptimizeUniProc, int)
SIMP_PROP(PreserveFQDN, bPreserveFQDN, int)
+SIMP_PROP(mainqCnfObj, mainqCnfObj, struct cnfobj *)
SIMP_PROP(MaxLine, iMaxLine, int)
SIMP_PROP(DefPFFamily, iDefPFFamily, int) /* note that in the future we may check the family argument */
SIMP_PROP(DropMalPTRMsgs, bDropMalPTRMsgs, int)
@@ -529,6 +531,7 @@ CODESTARTobjQueryInterface(glbl)
SIMP_PROP(DropMalPTRMsgs);
SIMP_PROP(Option_DisallowWarning);
SIMP_PROP(DisableDNS);
+ SIMP_PROP(mainqCnfObj);
SIMP_PROP(LocalFQDNName)
SIMP_PROP(LocalHostName)
SIMP_PROP(LocalDomain)
@@ -579,6 +582,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
void
glblPrepCnf(void)
{
+ free(mainqCnfObj);
+ mainqCnfObj = NULL;
free(cnfparamvals);
cnfparamvals = NULL;
}
@@ -596,6 +601,31 @@ glblProcessCnf(struct cnfobj *o)
cnfparamsPrint(&paramblk, cnfparamvals);
}
+/* Set mainq parameters. Note that when this is not called, we'll use the
+ * legacy parameter config. mainq parameters can only be set once.
+ */
+void
+glblProcessMainQCnf(struct cnfobj *o)
+{
+ if(mainqCnfObj == NULL) {
+ mainqCnfObj = o;
+ } else {
+ errmsg.LogError(0, RS_RET_ERR, "main_queue() object can only be specified "
+ "once - all but first ignored\n");
+ }
+}
+
+/* destruct the main q cnf object after it is no longer needed. This is
+ * also used to do some final checks.
+ */
+void
+glblDestructMainqCnfObj()
+{
+ nvlstChkUnused(mainqCnfObj->nvlst);
+ cnfobjDestruct(mainqCnfObj);
+ mainqCnfObj = NULL;
+}
+
void
glblDoneLoadCnf(void)
{
diff --git a/runtime/glbl.h b/runtime/glbl.h
index 2c7f3b31..4b1cd9f8 100644
--- a/runtime/glbl.h
+++ b/runtime/glbl.h
@@ -52,6 +52,7 @@ BEGINinterface(glbl) /* name must also be changed in ENDinterface macro! */
SIMP_PROP(Option_DisallowWarning, int)
SIMP_PROP(DisableDNS, int)
SIMP_PROP(LocalFQDNName, uchar*)
+ SIMP_PROP(mainqCnfObj, struct cnfobj*)
SIMP_PROP(LocalHostName, uchar*)
SIMP_PROP(LocalDomain, uchar*)
SIMP_PROP(StripDomains, char**)
@@ -96,6 +97,8 @@ static inline void glblSetOurPid(pid_t pid) { glbl_ourpid = pid; }
void glblPrepCnf(void);
void glblProcessCnf(struct cnfobj *o);
+void glblProcessMainQCnf(struct cnfobj *o);
+void glblDestructMainqCnfObj();
void glblDoneLoadCnf(void);
#endif /* #ifndef GLBL_H_INCLUDED */
diff --git a/runtime/msg.c b/runtime/msg.c
index 58a74498..73fa0367 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -1127,7 +1127,6 @@ MsgDeserialize(msg_t *pMsg, strm_t *pStrm)
prop_t *propRcvFrom = NULL;
prop_t *propRcvFromIP = NULL;
struct json_tokener *tokener;
- struct json_object *json;
var_t *pVar = NULL;
DEFiRet;
@@ -1211,8 +1210,9 @@ MsgDeserialize(msg_t *pMsg, strm_t *pStrm)
}
if(isProp("json")) {
tokener = json_tokener_new();
- json = json_tokener_parse_ex(tokener, (char*)rsCStrGetSzStrNoNULL(pVar->val.pStr),
+ pMsg->json = json_tokener_parse_ex(tokener, (char*)rsCStrGetSzStrNoNULL(pVar->val.pStr),
cstrLen(pVar->val.pStr));
+ json_tokener_free(tokener);
reinitVar(pVar);
CHKiRet(objDeserializeProperty(pVar, pStrm));
}
@@ -2517,10 +2517,10 @@ static uchar *getNOW(eNOWType eNow, struct syslogTime *t)
memcpy(pBuf, two_digits[(int)t->hour], 3);
break;
case NOW_HHOUR:
- memcpy(pBuf, two_digits[t->hour/30], 3);
+ memcpy(pBuf, two_digits[t->minute/30], 3);
break;
case NOW_QHOUR:
- memcpy(pBuf, two_digits[t->hour/15], 3);
+ memcpy(pBuf, two_digits[t->minute/15], 3);
break;
case NOW_MINUTE:
memcpy(pBuf, two_digits[(int)t->minute], 3);
diff --git a/runtime/queue.c b/runtime/queue.c
index c9c236fd..5666c465 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -898,7 +898,8 @@ static rsRetVal qDestructDisk(qqueue_t *pThis)
DEFiRet;
ASSERT(pThis != NULL);
-
+
+ free(pThis->pszQIFNam);
if(pThis->tVars.disk.pWrite != NULL)
strm.Destruct(&pThis->tVars.disk.pWrite);
if(pThis->tVars.disk.pReadDeq != NULL)
@@ -2147,12 +2148,12 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->iFullDlyMrk = wrk;
}
- DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, "
- "full delay %d, light delay %d, deq batch size %d starting\n",
- pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
+ DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, maxQSize %d, lqsize %d, pqsize %d, child %d, "
+ "full delay %d, light delay %d, deq batch size %d starting, high wtrrmrk %d, low wtrmrk %d\n",
+ pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iMaxQueueSize,
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis),
pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk,
- pThis->iDeqBatchSize);
+ pThis->iDeqBatchSize, pThis->iHighWtrMrk, pThis->iLowWtrMrk);
pThis->bQueueStarted = 1;
if(pThis->qType == QUEUETYPE_DIRECT)
@@ -2596,7 +2597,9 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg)
&& pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull);
if(pThis->toEnq == 0 || pThis->bEnqOnly) {
- DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL - configured for immediate discarding QueueSize=%d MaxQueueSize=%d sizeOnDisk=%d sizeOnDiskMax=%d\n", pThis->iQueueSize, pThis->iMaxQueueSize, pThis->tVars.disk.sizeOnDisk, pThis->sizeOnDiskMax);
+ DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL - configured for immediate discarding QueueSize=%d "
+ "MaxQueueSize=%d sizeOnDisk=%lld sizeOnDiskMax=%lld\n", pThis->iQueueSize, pThis->iMaxQueueSize,
+ pThis->tVars.disk.sizeOnDisk, pThis->sizeOnDiskMax);
STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
diff --git a/runtime/rsconf.c b/runtime/rsconf.c
index d8b81f1b..b13e5cc7 100644
--- a/runtime/rsconf.c
+++ b/runtime/rsconf.c
@@ -399,6 +399,7 @@ yyerror(char *s)
}
void cnfDoObj(struct cnfobj *o)
{
+ int bDestructObj = 1;
int bChkUnuse = 1;
dbgprintf("cnf:global:obj: ");
@@ -407,6 +408,10 @@ void cnfDoObj(struct cnfobj *o)
case CNFOBJ_GLOBAL:
glblProcessCnf(o);
break;
+ case CNFOBJ_MAINQ:
+ glblProcessMainQCnf(o);
+ bDestructObj = 0;
+ break;
case CNFOBJ_MODULE:
modulesProcessCnf(o);
break;
@@ -430,9 +435,11 @@ void cnfDoObj(struct cnfobj *o)
o->objType);
break;
}
- if(bChkUnuse)
- nvlstChkUnused(o->nvlst);
- cnfobjDestruct(o);
+ if(bDestructObj) {
+ if(bChkUnuse)
+ nvlstChkUnused(o->nvlst);
+ cnfobjDestruct(o);
+ }
}
void cnfDoScript(struct cnfstmt *script)
@@ -757,9 +764,14 @@ startInputModules(void)
static inline rsRetVal
activateMainQueue()
{
+ struct cnfobj *mainqCnfObj;
DEFiRet;
+
+ mainqCnfObj = glbl.GetmainqCnfObj();
+ DBGPRINTF("activateMainQueue: mainq cnf obj ptr is %p\n", mainqCnfObj);
/* create message queue */
- CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"), NULL)) {
+ CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"),
+ (mainqCnfObj == NULL) ? NULL : mainqCnfObj->nvlst)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
FINALIZE;
@@ -768,6 +780,7 @@ activateMainQueue()
bHaveMainQueue = (ourConf->globals.mainQ.MainMsgQueType == QUEUETYPE_DIRECT) ? 0 : 1;
DBGPRINTF("Main processing queue is initialized and running\n");
finalize_it:
+ glblDestructMainqCnfObj();
RETiRet;
}
diff --git a/runtime/stream.c b/runtime/stream.c
index 1af5b974..444a33ff 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -1655,7 +1655,7 @@ finalize_it:
/* property set methods */
/* simple ones first */
-DEFpropSetMeth(strm, iMaxFileSize, int)
+DEFpropSetMeth(strm, iMaxFileSize, int64)
DEFpropSetMeth(strm, iFileNumDigits, int)
DEFpropSetMeth(strm, tOperationsMode, int)
DEFpropSetMeth(strm, tOpenMode, mode_t)
diff --git a/runtime/stream.h b/runtime/stream.h
index 61d5ede2..4f4a4301 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -158,7 +158,6 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
rsRetVal (*Construct)(strm_t **ppThis);
rsRetVal (*ConstructFinalize)(strm_t *pThis);
rsRetVal (*Destruct)(strm_t **ppThis);
- rsRetVal (*SetMaxFileSize)(strm_t *pThis, int64 iMaxFileSize);
rsRetVal (*SetFileName)(strm_t *pThis, uchar *pszName, size_t iLenName);
rsRetVal (*ReadChar)(strm_t *pThis, uchar *pC);
rsRetVal (*UnreadChar)(strm_t *pThis, uchar c);
@@ -176,7 +175,7 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetWCntr)(strm_t *pThis, number_t *pWCnt);
rsRetVal (*Dup)(strm_t *pThis, strm_t **ppNew);
INTERFACEpropSetMeth(strm, bDeleteOnClose, int);
- INTERFACEpropSetMeth(strm, iMaxFileSize, int);
+ INTERFACEpropSetMeth(strm, iMaxFileSize, int64);
INTERFACEpropSetMeth(strm, iMaxFiles, int);
INTERFACEpropSetMeth(strm, iFileNumDigits, int);
INTERFACEpropSetMeth(strm, tOperationsMode, int);
diff --git a/tests/chkseq.c b/tests/chkseq.c
index b22c8992..bea9f83a 100644
--- a/tests/chkseq.c
+++ b/tests/chkseq.c
@@ -48,6 +48,7 @@ int main(int argc, char *argv[])
int start = 0, end = 0;
int opt;
int nDups = 0;
+ int reachedEOF;
int edLen; /* length of extra data */
static char edBuf[500*1024]; /* buffer for extra data (pretty large to be on the save side...) */
char *file = NULL;
@@ -126,15 +127,47 @@ int main(int argc, char *argv[])
}
}
- if(nDups != 0)
- printf("info: had %d duplicates (this is no error)\n", nDups);
-
if(i - 1 != end) {
printf("only %d records in file, expected %d\n", i - 1, end);
exit(1);
}
- if(!feof(fp)) {
+ if(feof(fp)) {
+ reachedEOF = 1;
+ } else {
+ /* if duplicates are permitted, we need to do a final check if we have duplicates at the
+ * end of file.
+ */
+ if(dupsPermitted) {
+ i = end;
+ while(!feof(fp)) {
+ if(bHaveExtraData) {
+ scanfOK = fscanf(fp, "%d,%d,%s\n", &val, &edLen, edBuf) == 3 ? 1 : 0;
+ if(edLen != (int) strlen(edBuf)) {
+ printf("extra data length specified %d, but actually is %ld in record %d\n",
+ edLen, (long) strlen(edBuf), i);
+ exit(1);
+ }
+ } else {
+ scanfOK = fscanf(fp, "%d\n", &val) == 1 ? 1 : 0;
+ }
+
+ if(val != i) {
+ reachedEOF = 0;
+ goto breakIF;
+ }
+ }
+ reachedEOF = feof(fp) ? 1 : 0;
+ } else {
+ reachedEOF = 0;
+ }
+ }
+
+breakIF:
+ if(nDups != 0)
+ printf("info: had %d duplicates (this is no error)\n", nDups);
+
+ if(!reachedEOF) {
printf("end of processing, but NOT end of file!\n");
exit(1);
}