summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/Makefile.am3
-rw-r--r--runtime/batch.h22
-rw-r--r--runtime/conf.c9
-rw-r--r--runtime/dnscache.c2
-rw-r--r--runtime/glbl.c2
-rw-r--r--runtime/msg.c320
-rw-r--r--runtime/msg.h6
-rw-r--r--runtime/obj.c111
-rw-r--r--runtime/obj.h6
-rw-r--r--runtime/queue.c227
-rw-r--r--runtime/queue.h23
-rw-r--r--runtime/ratelimit.c359
-rw-r--r--runtime/ratelimit.h51
-rw-r--r--runtime/rsconf.c7
-rw-r--r--runtime/rsyslog.h5
-rw-r--r--runtime/ruleset.c18
-rw-r--r--runtime/srutils.c22
-rw-r--r--runtime/stream.c205
-rw-r--r--runtime/stream.h12
-rw-r--r--runtime/typedefs.h2
20 files changed, 1095 insertions, 317 deletions
diff --git a/runtime/Makefile.am b/runtime/Makefile.am
index 7af26d2b..fbc92d9c 100644
--- a/runtime/Makefile.am
+++ b/runtime/Makefile.am
@@ -65,12 +65,13 @@ librsyslog_la_SOURCES = \
ruleset.h \
prop.c \
prop.h \
+ ratelimit.c \
+ ratelimit.h \
cfsysline.c \
cfsysline.h \
sd-daemon.c \
sd-daemon.h \
\
- \
../action.h \
../action.c \
../threads.c \
diff --git a/runtime/batch.h b/runtime/batch.h
index f743c188..0f19f5bb 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -46,7 +46,7 @@ typedef enum {
/* an object inside a batch, including any information (state!) needed for it to "life".
*/
struct batch_obj_s {
- obj_t *pUsrp; /* pointer to user object (most often message) */
+ msg_t *pMsg;
batch_state_t state; /* associated state */
/* work variables for action processing; these are reused for each action (or block of
* actions)
@@ -97,13 +97,13 @@ batchSetSingleRuleset(batch_t *pBatch, sbool val) {
/* get the batches ruleset (if we have a single ruleset) */
static inline ruleset_t*
batchGetRuleset(batch_t *pBatch) {
- return (pBatch->nElem > 0) ? ((msg_t*) pBatch->pElem[0].pUsrp)->pRuleset : NULL;
+ return (pBatch->nElem > 0) ? pBatch->pElem[0].pMsg->pRuleset : NULL;
}
/* get the ruleset of a specifc element of the batch (index not verified!) */
static inline ruleset_t*
batchElemGetRuleset(batch_t *pBatch, int i) {
- return ((msg_t*) pBatch->pElem[i].pUsrp)->pRuleset;
+ return pBatch->pElem[i].pMsg->pRuleset;
}
/* get number of msgs for this batch */
@@ -134,22 +134,6 @@ batchIsValidElem(batch_t *pBatch, int i) {
}
-/* copy one batch element to another.
- * This creates a complete duplicate in those cases where
- * it is needed. Use duplication only when absolutely necessary!
- * Note that all working fields are reset to zeros. If that were
- * not done, we would have potential problems with invalid
- * or double pointer frees.
- * rgerhards, 2010-06-10
- */
-static inline void
-batchCopyElem(batch_obj_t *pDest, batch_obj_t *pSrc) {
- memset(pDest, 0, sizeof(batch_obj_t));
- pDest->pUsrp = pSrc->pUsrp;
- pDest->state = pSrc->state;
-}
-
-
/* free members of a batch "object". Note that we can not do the usual
* destruction as the object typically is allocated on the stack and so the
* object itself cannot be freed! -- rgerhards, 2010-06-15
diff --git a/runtime/conf.c b/runtime/conf.c
index 23fb6bbd..c97391c6 100644
--- a/runtime/conf.c
+++ b/runtime/conf.c
@@ -607,13 +607,8 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction)
if(iRet == RS_RET_OK || iRet == RS_RET_SUSPENDED) {
if((iRet = addAction(&pAction, pMod, pModData, pOMSR, NULL, NULL,
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
- /* now check if the module is compatible with select features */
- if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK)
- pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
- else {
- dbgprintf("module is incompatible with RepeatedMsgReduction - turned off\n");
- pAction->f_ReduceRepeated = 0;
- }
+ /* here check if the module is compatible with select features
+ * (currently, we have no such features!) */
pAction->eState = ACT_STATE_RDY; /* action is enabled */
conf->actions.nbrActions++; /* one more active action! */
}
diff --git a/runtime/dnscache.c b/runtime/dnscache.c
index 32d6e425..0b89d0bb 100644
--- a/runtime/dnscache.c
+++ b/runtime/dnscache.c
@@ -314,7 +314,7 @@ finalize_it:
* TODO: implement!
*/
static inline rsRetVal
-validateEntry(dnscache_entry_t *etry, struct sockaddr_storage *addr)
+validateEntry(dnscache_entry_t __attribute__((unused)) *etry, struct sockaddr_storage __attribute__((unused)) *addr)
{
return RS_RET_OK;
}
diff --git a/runtime/glbl.c b/runtime/glbl.c
index a0997829..0e5cac20 100644
--- a/runtime/glbl.c
+++ b/runtime/glbl.c
@@ -210,7 +210,7 @@ setLocalHostIPIF(void __attribute__((unused)) *pVal, uchar *pNewVal)
if(propLocalIPIF != NULL) {
errmsg.LogError(0, RS_RET_ERR, "$LocalHostIPIF is already set "
"and cannot be reset; place it at TOP OF rsyslog.conf!");
- ABORT_FINALIZE(RS_RET_ERR_WRKDIR);
+ ABORT_FINALIZE(RS_RET_ERR);
}
localRet = net.GetIFIPAddr(pNewVal, AF_UNSPEC, myIP, (int) sizeof(myIP));
diff --git a/runtime/msg.c b/runtime/msg.c
index 32a02424..d16bbb75 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -63,6 +63,7 @@
#include "ruleset.h"
#include "prop.h"
#include "net.h"
+#include "var.h"
#include "rsconf.h"
/* static data */
@@ -72,6 +73,7 @@ DEFobjCurrIf(glbl)
DEFobjCurrIf(regexp)
DEFobjCurrIf(prop)
DEFobjCurrIf(net)
+DEFobjCurrIf(var)
static struct {
uchar *pszName;
@@ -317,6 +319,16 @@ MsgUnlock(msg_t *pThis)
}
+/* rgerhards 2012-04-18: set associated ruleset (by ruleset name)
+ * If ruleset cannot be found, no update is done.
+ */
+static void
+MsgSetRulesetByName(msg_t *pMsg, cstr_t *rulesetName)
+{
+ rulesetGetRuleset(runConf, &(pMsg->pRuleset), rsCStrGetSzStrNoNULL(rulesetName));
+}
+
+
static inline int getProtocolVersion(msg_t *pM)
{
return(pM->iProtocolVersion);
@@ -719,6 +731,19 @@ finalize_it:
}
+/* Special msg constructor, to be used when an object is deserialized.
+ * we do only the base init as we know the properties will be set in
+ * any case by the deserializer. We still do the "inexpensive" inits
+ * just to be on the safe side. The whole process needs to be
+ * refactored together with the msg serialization subsystem.
+ */
+rsRetVal
+msgConstructForDeserializer(msg_t **ppThis)
+{
+ return msgBaseConstruct(ppThis);
+}
+
+
/* some free handlers for (slightly) complicated cases... All of them may be called
* with an empty element.
*/
@@ -1016,6 +1041,168 @@ finalize_it:
}
+/* This is a helper for MsgDeserialize that re-inits the var object. This
+ * whole construct should be replaced, var is really ready to be retired.
+ * But as an interim help during refactoring let's introduce this function
+ * here (and thus NOT as method of var object!). -- rgerhads, 2012-11-06
+ */
+static inline void
+reinitVar(var_t *pVar)
+{
+ rsCStrDestruct(&pVar->pcsName); /* no longer needed */
+ if(pVar->varType == VARTYPE_STR) {
+ if(pVar->val.pStr != NULL)
+ rsCStrDestruct(&pVar->val.pStr);
+ }
+}
+/* deserialize the message again
+ * we deserialize the properties in the same order that we serialized them. Except
+ * for some checks to cover downlevel version, we do not need to do all these
+ * CPU intense name checkings.
+ */
+#define isProp(name) !rsCStrSzStrCmp(pVar->pcsName, (uchar*) name, sizeof(name) - 1)
+rsRetVal
+MsgDeserialize(msg_t *pMsg, strm_t *pStrm)
+{
+ prop_t *myProp;
+ prop_t *propRcvFrom = NULL;
+ prop_t *propRcvFromIP = NULL;
+ struct json_tokener *tokener;
+ struct json_object *json;
+ var_t *pVar = NULL;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pStrm, strm);
+
+ CHKiRet(var.Construct(&pVar));
+ CHKiRet(var.ConstructFinalize(pVar));
+
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ if(isProp("iProtocolVersion")) {
+ setProtocolVersion(pMsg, pVar->val.num);
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("iSeverity")) {
+ pMsg->iSeverity = pVar->val.num;
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("iFacility")) {
+ pMsg->iFacility = pVar->val.num;
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("msgFlags")) {
+ pMsg->msgFlags = pVar->val.num;
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("ttGenTime")) {
+ pMsg->ttGenTime = pVar->val.num;
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("tRcvdAt")) {
+ memcpy(&pMsg->tRcvdAt, &pVar->val.vSyslogTime, sizeof(struct syslogTime));
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("tTIMESTAMP")) {
+ memcpy(&pMsg->tTIMESTAMP, &pVar->val.vSyslogTime, sizeof(struct syslogTime));
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("pszTAG")) {
+ MsgSetTAG(pMsg, rsCStrGetSzStrNoNULL(pVar->val.pStr), cstrLen(pVar->val.pStr));
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("pszRawMsg")) {
+ MsgSetRawMsg(pMsg, (char*) rsCStrGetSzStrNoNULL(pVar->val.pStr), cstrLen(pVar->val.pStr));
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("pszHOSTNAME")) {
+ MsgSetHOSTNAME(pMsg, rsCStrGetSzStrNoNULL(pVar->val.pStr), rsCStrLen(pVar->val.pStr));
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("pszInputName")) {
+ /* we need to create a property */
+ CHKiRet(prop.Construct(&myProp));
+ CHKiRet(prop.SetString(myProp, rsCStrGetSzStrNoNULL(pVar->val.pStr), rsCStrLen(pVar->val.pStr)));
+ CHKiRet(prop.ConstructFinalize(myProp));
+ MsgSetInputName(pMsg, myProp);
+ prop.Destruct(&myProp);
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("pszRcvFrom")) {
+ MsgSetRcvFromStr(pMsg, rsCStrGetSzStrNoNULL(pVar->val.pStr), rsCStrLen(pVar->val.pStr), &propRcvFrom);
+ prop.Destruct(&propRcvFrom);
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("pszRcvFromIP")) {
+ MsgSetRcvFromIPStr(pMsg, rsCStrGetSzStrNoNULL(pVar->val.pStr), rsCStrLen(pVar->val.pStr), &propRcvFromIP);
+ prop.Destruct(&propRcvFromIP);
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("json")) {
+ tokener = json_tokener_new();
+ json = json_tokener_parse_ex(tokener, (char*)rsCStrGetSzStrNoNULL(pVar->val.pStr),
+ cstrLen(pVar->val.pStr));
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("pCSStrucData")) {
+ MsgSetStructuredData(pMsg, (char*) rsCStrGetSzStrNoNULL(pVar->val.pStr));
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("pCSAPPNAME")) {
+ MsgSetAPPNAME(pMsg, (char*) rsCStrGetSzStrNoNULL(pVar->val.pStr));
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("pCSPROCID")) {
+ MsgSetPROCID(pMsg, (char*) rsCStrGetSzStrNoNULL(pVar->val.pStr));
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("pCSMSGID")) {
+ MsgSetMSGID(pMsg, (char*) rsCStrGetSzStrNoNULL(pVar->val.pStr));
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("pszUUID")) {
+ pMsg->pszUUID = ustrdup(rsCStrGetSzStrNoNULL(pVar->val.pStr));
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ if(isProp("pszRuleset")) {
+ MsgSetRulesetByName(pMsg, pVar->val.pStr);
+ reinitVar(pVar);
+ CHKiRet(objDeserializeProperty(pVar, pStrm));
+ }
+ /* "offMSG" must always be our last field, so we use this as an
+ * indicator if the sequence is correct. This is a bit questionable,
+ * but on the other hand it works decently AND we will probably replace
+ * the whole persisted format soon in any case. -- rgerhards, 2012-11-06
+ */
+ if(!isProp("offMSG"))
+ ABORT_FINALIZE(RS_RET_DS_PROP_SEQ_ERR);
+ MsgSetMSGoffs(pMsg, pVar->val.num);
+finalize_it:
+ if(pVar != NULL)
+ var.Destruct(&pVar);
+ RETiRet;
+}
+#undef isProp
+
+
/* Increment reference count - see description of the "msg"
* structure for details. As a convenience to developers,
* this method returns the msg pointer that is passed to it.
@@ -1684,16 +1871,6 @@ void MsgSetRuleset(msg_t *pMsg, ruleset_t *pRuleset)
}
-/* rgerhards 2012-04-18: set associated ruleset (by ruleset name)
- * If ruleset cannot be found, no update is done.
- */
-static void
-MsgSetRulesetByName(msg_t *pMsg, cstr_t *rulesetName)
-{
- rulesetGetRuleset(runConf, &(pMsg->pRuleset), rsCStrGetSzStrNoNULL(rulesetName));
-}
-
-
/* set TAG in msg object
* (rewritten 2009-06-18 rgerhards)
*/
@@ -2271,40 +2448,42 @@ char *textpri(char *pRes, size_t pResLen, int pri)
*/
typedef enum ENOWType { NOW_NOW, NOW_YEAR, NOW_MONTH, NOW_DAY, NOW_HOUR, NOW_HHOUR, NOW_QHOUR, NOW_MINUTE } eNOWType;
#define tmpBUFSIZE 16 /* size of formatting buffer */
-static uchar *getNOW(eNOWType eNow)
+static uchar *getNOW(eNOWType eNow, struct syslogTime *t)
{
uchar *pBuf;
- struct syslogTime t;
if((pBuf = (uchar*) MALLOC(sizeof(uchar) * tmpBUFSIZE)) == NULL) {
return NULL;
}
- datetime.getCurrTime(&t, NULL);
+ if(t->year == 0) { /* not yet set! */
+ datetime.getCurrTime(t, NULL);
+ }
+
switch(eNow) {
case NOW_NOW:
- snprintf((char*) pBuf, tmpBUFSIZE, "%4.4d-%2.2d-%2.2d", t.year, t.month, t.day);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%4.4d-%2.2d-%2.2d", t->year, t->month, t->day);
break;
case NOW_YEAR:
- snprintf((char*) pBuf, tmpBUFSIZE, "%4.4d", t.year);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%4.4d", t->year);
break;
case NOW_MONTH:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.month);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->month);
break;
case NOW_DAY:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.day);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->day);
break;
case NOW_HOUR:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.hour);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->hour);
break;
case NOW_HHOUR:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.minute / 30);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->minute / 30);
break;
case NOW_QHOUR:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.minute / 15);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->minute / 15);
break;
case NOW_MINUTE:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.minute);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->minute);
break;
}
@@ -2571,7 +2750,7 @@ finalize_it:
* Parameter "bMustBeFreed" is set by this function. It tells the
* caller whether or not the string returned must be freed by the
* caller itself. It is is 0, the caller MUST NOT free it. If it is
- * 1, the caller MUST free 1. Handling this wrongly leads to either
+ * 1, the caller MUST free it. Handling this wrongly leads to either
* a memory leak of a program abort (do to double-frees or frees on
* the constant memory pool). So be careful to do it right.
* rgerhards 2004-11-23
@@ -2588,7 +2767,7 @@ finalize_it:
return(UCHAR_CONSTANT("**OUT OF MEMORY**"));}
uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
propid_t propid, es_str_t *propName, rs_size_t *pPropLen,
- unsigned short *pbMustBeFreed)
+ unsigned short *pbMustBeFreed, struct syslogTime *ttNow)
{
uchar *pRes; /* result pointer */
rs_size_t bufLen = -1; /* length of string or -1, if not known */
@@ -2703,52 +2882,68 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
pRes = (uchar*)getParseSuccess(pMsg);
break;
case PROP_SYS_NOW:
- if((pRes = getNOW(NOW_NOW)) == NULL) {
+ if((pRes = getNOW(NOW_NOW, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 10;
+ }
break;
case PROP_SYS_YEAR:
- if((pRes = getNOW(NOW_YEAR)) == NULL) {
+ if((pRes = getNOW(NOW_YEAR, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 4;
+ }
break;
case PROP_SYS_MONTH:
- if((pRes = getNOW(NOW_MONTH)) == NULL) {
+ if((pRes = getNOW(NOW_MONTH, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_DAY:
- if((pRes = getNOW(NOW_DAY)) == NULL) {
+ if((pRes = getNOW(NOW_DAY, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_HOUR:
- if((pRes = getNOW(NOW_HOUR)) == NULL) {
+ if((pRes = getNOW(NOW_HOUR, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_HHOUR:
- if((pRes = getNOW(NOW_HHOUR)) == NULL) {
+ if((pRes = getNOW(NOW_HHOUR, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_QHOUR:
- if((pRes = getNOW(NOW_QHOUR)) == NULL) {
+ if((pRes = getNOW(NOW_QHOUR, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_MINUTE:
- if((pRes = getNOW(NOW_MINUTE)) == NULL) {
+ if((pRes = getNOW(NOW_MINUTE, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_MYHOSTNAME:
pRes = glbl.GetLocalHostName();
@@ -2810,7 +3005,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
}
/* If we did not receive a template pointer, we are already done... */
- if(pTpe == NULL) {
+ if(pTpe == NULL || !pTpe->bComplexProcessing) {
*pPropLen = (bufLen == -1) ? ustrlen(pRes) : bufLen;
return pRes;
}
@@ -3399,9 +3594,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
jsonField(pTpe, &pRes, pbMustBeFreed, &bufLen);
}
- if(bufLen == -1)
- bufLen = ustrlen(pRes);
- *pPropLen = bufLen;
+ *pPropLen = (bufLen == -1) ? ustrlen(pRes) : bufLen;
ENDfunc
return(pRes);
@@ -3458,7 +3651,7 @@ msgGetMsgVarNew(msg_t *pThis, uchar *name)
/* always call MsgGetProp() without a template specifier */
/* TODO: optimize propNameToID() call -- rgerhards, 2009-06-26 */
propNameStrToID(name, &propid);
- pszProp = (uchar*) MsgGetProp(pThis, NULL, propid, NULL, &propLen, &bMustBeFreed);
+ pszProp = (uchar*) MsgGetProp(pThis, NULL, propid, NULL, &propLen, &bMustBeFreed, NULL);
estr = es_newStrFromCStr((char*)pszProp, propLen);
if(bMustBeFreed)
@@ -3557,12 +3750,10 @@ finalize_it:
* satisfies the base object class getSeverity semantics.
* rgerhards, 2008-01-14
*/
-static rsRetVal
-MsgGetSeverity(obj_t_ptr pThis, int *piSeverity)
+rsRetVal
+MsgGetSeverity(msg_t *pMsg, int *piSeverity)
{
- ISOBJ_TYPE_assert(pThis, msg);
- assert(piSeverity != NULL);
- *piSeverity = ((msg_t*) pThis)->iSeverity;
+ *piSeverity = pMsg->iSeverity;
return RS_RET_OK;
}
@@ -3820,25 +4011,25 @@ done: return dst;
rsRetVal
-msgSetJSONFromVar(msg_t *pMsg, uchar *varname, struct var *var)
+msgSetJSONFromVar(msg_t *pMsg, uchar *varname, struct var *v)
{
struct json_object *json = NULL;
char *cstr;
DEFiRet;
- switch(var->datatype) {
+ switch(v->datatype) {
case 'S':/* string */
- cstr = es_str2cstr(var->d.estr, NULL);
+ cstr = es_str2cstr(v->d.estr, NULL);
json = json_object_new_string(cstr);
free(cstr);
break;
case 'N':/* number (integer) */
- json = json_object_new_int((int) var->d.n);
+ json = json_object_new_int((int) v->d.n);
break;
case 'J':/* native JSON */
- json = jsonDeepCopy(var->d.json);
+ json = jsonDeepCopy(v->d.json);
break;
default:DBGPRINTF("msgSetJSONFromVar: unsupported datatype %c\n",
- var->datatype);
+ v->datatype);
ABORT_FINALIZE(RS_RET_ERR);
}
msgAddJSON(pMsg, varname+1, json);
@@ -3858,11 +4049,10 @@ BEGINObjClassInit(msg, 1, OBJ_IS_CORE_MODULE)
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(var, CORE_COMPONENT));
/* set our own handlers */
OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize);
- OBJSetMethodHandler(objMethod_SETPROPERTY, MsgSetProperty);
- OBJSetMethodHandler(objMethod_GETSEVERITY, MsgGetSeverity);
/* some more inits */
# if HAVE_MALLOC_TRIM
INIT_ATOMIC_HELPER_MUT(mutTrimCtr);
diff --git a/runtime/msg.h b/runtime/msg.h
index ab479001..c3acebd1 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -146,6 +146,8 @@ struct msg {
PROTOTYPEObjClassInit(msg);
rsRetVal msgConstruct(msg_t **ppThis);
rsRetVal msgConstructWithTime(msg_t **ppThis, struct syslogTime *stTime, time_t ttGenTime);
+rsRetVal msgConstructForDeserializer(msg_t **ppThis);
+rsRetVal msgConstructFinalizer(msg_t *pThis);
rsRetVal msgDestruct(msg_t **ppM);
msg_t* MsgDup(msg_t* pOld);
msg_t *MsgAddRef(msg_t *pM);
@@ -172,7 +174,7 @@ void MsgSetRawMsg(msg_t *pMsg, char* pszRawMsg, size_t lenMsg);
rsRetVal MsgReplaceMSG(msg_t *pThis, uchar* pszMSG, int lenMSG);
uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
propid_t propid, es_str_t *propName,
- rs_size_t *pPropLen, unsigned short *pbMustBeFreed);
+ rs_size_t *pPropLen, unsigned short *pbMustBeFreed, struct syslogTime *ttNow);
char *textpri(char *pRes, size_t pResLen, int pri);
rsRetVal msgGetMsgVar(msg_t *pThis, cstr_t *pstrPropName, var_t **ppVar);
es_str_t* msgGetMsgVarNew(msg_t *pThis, uchar *name);
@@ -185,6 +187,8 @@ rsRetVal msgGetCEEVar(msg_t *pThis, cstr_t *propName, var_t **ppVar);
es_str_t* msgGetCEEVarNew(msg_t *pMsg, char *name);
rsRetVal msgAddJSON(msg_t *pM, uchar *name, struct json_object *json);
rsRetVal getCEEPropVal(msg_t *pM, es_str_t *propName, uchar **pRes, rs_size_t *buflen, unsigned short *pbMustBeFreed);
+rsRetVal MsgGetSeverity(msg_t *pThis, int *piSeverity);
+rsRetVal MsgDeserialize(msg_t *pMsg, strm_t *pStrm);
/* TODO: remove these five (so far used in action.c) */
uchar *getMSG(msg_t *pM);
diff --git a/runtime/obj.c b/runtime/obj.c
index 3ecf9ab2..63f1f38c 100644
--- a/runtime/obj.c
+++ b/runtime/obj.c
@@ -604,7 +604,7 @@ finalize_it:
/* Deserialize a single property. Pointer must be positioned at begin of line. Whole line
* up until the \n is read.
*/
-static rsRetVal objDeserializeProperty(var_t *pProp, strm_t *pStrm)
+rsRetVal objDeserializeProperty(var_t *pProp, strm_t *pStrm)
{
DEFiRet;
number_t i;
@@ -767,21 +767,20 @@ finalize_it:
* of the trailer. Header must already have been processed.
* rgerhards, 2008-01-11
*/
-static rsRetVal objDeserializeProperties(obj_t *pObj, objInfo_t *pObjInfo, strm_t *pStrm)
+static rsRetVal objDeserializeProperties(obj_t *pObj, rsRetVal (*objSetProperty)(), strm_t *pStrm)
{
DEFiRet;
var_t *pVar = NULL;
ISOBJ_assert(pObj);
ISOBJ_TYPE_assert(pStrm, strm);
- ASSERT(pObjInfo != NULL);
CHKiRet(var.Construct(&pVar));
CHKiRet(var.ConstructFinalize(pVar));
iRet = objDeserializeProperty(pVar, pStrm);
while(iRet == RS_RET_OK) {
- CHKiRet(pObjInfo->objMethods[objMethod_SETPROPERTY](pObj, pVar));
+ CHKiRet(objSetProperty(pObj, pVar));
/* re-init var object - TODO: method of var! */
rsCStrDestruct(&pVar->pcsName); /* no longer needed */
if(pVar->varType == VARTYPE_STR) {
@@ -848,7 +847,7 @@ Deserialize(void *ppObj, uchar *pszTypeExpected, strm_t *pStrm, rsRetVal (*fFixu
CHKiRet(pObjInfo->objMethods[objMethod_CONSTRUCT](&pObj));
/* we got the object, now we need to fill the properties */
- CHKiRet(objDeserializeProperties(pObj, pObjInfo, pStrm));
+ CHKiRet(objDeserializeProperties(pObj, pObjInfo->objMethods[objMethod_SETPROPERTY], pStrm));
/* check if we need to call a fixup function that modifies the object
* before it is finalized. -- rgerhards, 2008-01-13
@@ -873,6 +872,104 @@ finalize_it:
}
+/* De-Serialize an object, with known constructur and destructor. Params like Deserialize().
+ * rgerhards, 2012-11-03
+ */
+rsRetVal
+objDeserializeWithMethods(void *ppObj, uchar *pszTypeExpected, int lenTypeExpected, strm_t *pStrm, rsRetVal (*fFixup)(obj_t*,void*), void *pUsr, rsRetVal (*objConstruct)(), rsRetVal (*objConstructFinalize)(), rsRetVal (*objDeserialize)())
+{
+ DEFiRet;
+ rsRetVal iRetLocal;
+ obj_t *pObj = NULL;
+ int oVers = 0; /* keep compiler happy, but it is totally useless but takes up some execution time... */
+ cstr_t *pstrID = NULL;
+
+ assert(ppObj != NULL);
+ assert(pszTypeExpected != NULL);
+ ISOBJ_TYPE_assert(pStrm, strm);
+
+ /* we de-serialize the header. if all goes well, we are happy. However, if
+ * we experience a problem, we try to recover. We do this by skipping to
+ * the next object header. This is defined via the line-start cookies. In
+ * worst case, we exhaust the queue, but then we receive EOF return state,
+ * from objDeserializeTryRecover(), what will cause us to ultimately give up.
+ * rgerhards, 2008-07-08
+ */
+ do {
+ iRetLocal = objDeserializeHeader((uchar*) "Obj", &pstrID, &oVers, pStrm);
+ if(iRetLocal != RS_RET_OK) {
+ dbgprintf("objDeserialize error %d during header processing - "
+ "trying to recover\n", iRetLocal);
+ CHKiRet(objDeserializeTryRecover(pStrm));
+ }
+ } while(iRetLocal != RS_RET_OK);
+
+ if(rsCStrSzStrCmp(pstrID, pszTypeExpected, lenTypeExpected))
+ ABORT_FINALIZE(RS_RET_INVALID_OID);
+
+ CHKiRet(objConstruct(&pObj));
+
+ /* we got the object, now we need to fill the properties */
+ CHKiRet(objDeserialize(pObj, pStrm));
+ CHKiRet(objDeserializeTrailer(pStrm)); /* do trailer checks */
+
+ /* check if we need to call a fixup function that modifies the object
+ * before it is finalized. -- rgerhards, 2008-01-13
+ */
+ if(fFixup != NULL)
+ CHKiRet(fFixup(pObj, pUsr));
+
+ /* we have a valid object, let's finalize our work and return */
+ if(objConstructFinalize != NULL) {
+ CHKiRet(objConstructFinalize(pObj));
+ }
+
+ *((obj_t**) ppObj) = pObj;
+
+finalize_it:
+ if(iRet != RS_RET_OK && pObj != NULL)
+ free(pObj); /* TODO: check if we can call destructor 2008-01-13 rger */
+
+ if(pstrID != NULL)
+ rsCStrDestruct(&pstrID);
+
+ RETiRet;
+}
+
+/* This is a dummy deserializer, to be used for the delete queue reader
+ * specifically. This is kind of a hack, but also to be replace (hopefully) soon
+ * by totally different code. So let's make it as simple as possible...
+ * rgerhards, 2012-11-06
+ */
+rsRetVal
+objDeserializeDummy(obj_t __attribute__((unused)) *pObj, strm_t *pStrm)
+{
+ DEFiRet;
+ var_t *pVar = NULL;
+
+ CHKiRet(var.Construct(&pVar));
+ CHKiRet(var.ConstructFinalize(pVar));
+
+ iRet = objDeserializeProperty(pVar, pStrm);
+ while(iRet == RS_RET_OK) {
+ /* this loop does actually NOGHTING but read the file... */
+ /* re-init var object - TODO: method of var! */
+ rsCStrDestruct(&pVar->pcsName); /* no longer needed */
+ if(pVar->varType == VARTYPE_STR) {
+ if(pVar->val.pStr != NULL)
+ rsCStrDestruct(&pVar->val.pStr);
+ }
+ iRet = objDeserializeProperty(pVar, pStrm);
+ }
+finalize_it:
+ if(iRet == RS_RET_NO_PROPLINE)
+ iRet = RS_RET_OK; /* NO_PROPLINE is OK and a kind of EOF! */
+ if(pVar != NULL)
+ var.Destruct(&pVar);
+ RETiRet;
+}
+
+
/* De-Serialize an object, but treat it as property bag.
* rgerhards, 2008-01-11
*/
@@ -909,7 +1006,7 @@ objDeserializeObjAsPropBag(obj_t *pObj, strm_t *pStrm)
CHKiRet(FindObjInfo(pstrID, &pObjInfo));
/* we got the object, now we need to fill the properties */
- CHKiRet(objDeserializeProperties(pObj, pObjInfo, pStrm));
+ CHKiRet(objDeserializeProperties(pObj, pObjInfo->objMethods[objMethod_SETPROPERTY], pStrm));
finalize_it:
if(pstrID != NULL)
@@ -961,7 +1058,7 @@ DeserializePropBag(obj_t *pObj, strm_t *pStrm)
CHKiRet(FindObjInfo(pstrID, &pObjInfo));
/* we got the object, now we need to fill the properties */
- CHKiRet(objDeserializeProperties(pObj, pObjInfo, pStrm));
+ CHKiRet(objDeserializeProperties(pObj, pObjInfo->objMethods[objMethod_SETPROPERTY], pStrm));
finalize_it:
if(pstrID != NULL)
diff --git a/runtime/obj.h b/runtime/obj.h
index 32f7ef09..27d32b7a 100644
--- a/runtime/obj.h
+++ b/runtime/obj.h
@@ -83,10 +83,7 @@
((obj_t*) (pThis))->pObjInfo = pObjInfoOBJ; \
((obj_t*) (pThis))->pszName = NULL
#endif
-#define objDestruct(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_DESTRUCT])(&pThis)
#define objSerialize(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_SERIALIZE])
-#define objGetSeverity(pThis, piSever) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_GETSEVERITY])(pThis, piSever)
-#define objDebugPrint(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_DEBUGPRINT])(pThis)
#define OBJSetMethodHandler(methodID, pHdlr) \
CHKiRet(obj.InfoSetMethod(pObjInfoOBJ, methodID, (rsRetVal (*)(void*)) pHdlr))
@@ -121,6 +118,9 @@ ENDinterface(obj)
rsRetVal objGetObjInterface(obj_if_t *pIf);
PROTOTYPEObjClassInit(obj);
PROTOTYPEObjClassExit(obj);
+rsRetVal objDeserializeWithMethods(void *ppObj, uchar *pszTypeExpected, int lenTypeExpected, strm_t *pStrm, rsRetVal (*fFixup)(obj_t*,void*), void *pUsr, rsRetVal (*objConstruct)(), rsRetVal (*objConstructFinalize)(), rsRetVal (*objDeserialize)());
+rsRetVal objDeserializeProperty(var_t *pProp, strm_t *pStrm);
+rsRetVal objDeserializeDummy(obj_t *pObj, strm_t *pStrm);
/* the following definition is only for "friends" */
diff --git a/runtime/queue.c b/runtime/queue.c
index bb40e540..99fb5fbd 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -59,7 +59,6 @@
#include "datetime.h"
#include "unicode-helper.h"
#include "statsobj.h"
-#include "msg.h" /* TODO: remove once we remove MsgAddRef() call */
#ifdef OS_SOLARIS
# include <sched.h>
@@ -74,7 +73,7 @@ DEFobjCurrIf(datetime)
DEFobjCurrIf(statsobj)
/* forward-definitions */
-static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr);
+static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg);
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
@@ -83,7 +82,7 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
-static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr);
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
@@ -242,8 +241,8 @@ getQueueTypeName(queueType_t t)
case QUEUETYPE_DIRECT:
r = "Direct";
break;
- default:
- r = "unknown queue type";
+ default:
+ r = "invalid/unknown queue mode";
break;
}
return r;
@@ -315,16 +314,16 @@ getLogicalQueueSize(qqueue_t *pThis)
*/
static inline void queueDrain(qqueue_t *pThis)
{
- void *pUsr;
+ msg_t *pMsg;
ASSERT(pThis != NULL);
BEGINfunc
DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) {
- pThis->qDeq(pThis, &pUsr);
- if(pUsr != NULL) {
- objDestruct(pUsr);
+ pThis->qDeq(pThis, &pMsg);
+ if(pMsg != NULL) {
+ msgDestruct(&pMsg);
}
pThis->qDel(pThis);
}
@@ -414,7 +413,7 @@ StartDA(qqueue_t *pThis)
*/
pThis->pqDA->pqParent = pThis;
- CHKiRet(qqueueSetpUsr(pThis->pqDA, pThis->pUsr));
+ CHKiRet(qqueueSetpAction(pThis->pqDA, pThis->pAction));
CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
@@ -549,7 +548,7 @@ static rsRetVal qDestructFixedArray(qqueue_t *pThis)
}
-static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
+static rsRetVal qAddFixedArray(qqueue_t *pThis, msg_t* in)
{
DEFiRet;
@@ -563,7 +562,7 @@ static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
}
-static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out)
+static rsRetVal qDeqFixedArray(qqueue_t *pThis, msg_t **out)
{
DEFiRet;
@@ -624,7 +623,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis)
RETiRet;
}
-static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
+static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pMsg)
{
qLinkedList_t *pEntry;
DEFiRet;
@@ -632,7 +631,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t))));
pEntry->pNext = NULL;
- pEntry->pUsr = pUsr;
+ pEntry->pMsg = pMsg;
if(pThis->tVars.linklist.pDelRoot == NULL) {
pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry;
@@ -650,14 +649,13 @@ finalize_it:
}
-static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr)
+static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppMsg)
{
qLinkedList_t *pEntry;
DEFiRet;
pEntry = pThis->tVars.linklist.pDeqRoot;
- ISOBJ_TYPE_assert(pEntry->pUsr, msg);
- *ppUsr = pEntry->pUsr;
+ *ppMsg = pEntry->pMsg;
pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
RETiRet;
@@ -747,18 +745,12 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
{
DEFiRet;
strm_t *psQIF = NULL;
- uchar pszQIFNam[MAXFNAME];
- size_t lenQIFNam;
struct stat stat_buf;
ISOBJ_TYPE_assert(pThis, qqueue);
- /* Construct file name */
- lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
- (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
-
/* check if the file exists */
- if(stat((char*) pszQIFNam, &stat_buf) == -1) {
+ if(stat((char*) pThis->pszQIFNam, &stat_buf) == -1) {
if(errno == ENOENT) {
DBGOPRINT((obj_t*) pThis, "clean startup, no .qi file found\n");
ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
@@ -773,7 +765,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
CHKiRet(strm.Construct(&psQIF));
CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_READ));
CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE));
- CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam));
+ CHKiRet(strm.SetFName(psQIF, pThis->pszQIFNam, pThis->lenQIFNam));
CHKiRet(strm.ConstructFinalize(psQIF));
/* first, we try to read the property bag for ourselfs */
@@ -785,9 +777,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
- /* create a duplicate for the read "pointer".
- */
-
+ /* create a duplicate for the read "pointer". */
CHKiRet(strm.Dup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq));
@@ -806,7 +796,7 @@ finalize_it:
strm.Destruct(&psQIF);
if(iRet != RS_RET_OK) {
- DBGOPRINT((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n",
+ DBGOPRINT((obj_t*) pThis, "state %d reading .qi file - can not read persisted info (if any)\n",
iRet);
}
@@ -898,7 +888,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis)
RETiRet;
}
-static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
+static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pMsg)
{
DEFiRet;
number_t nWriteCount;
@@ -906,7 +896,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
ASSERT(pThis != NULL);
CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, &nWriteCount));
- CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite));
+ CHKiRet((objSerialize(pMsg))(pMsg, pThis->tVars.disk.pWrite));
CHKiRet(strm.Flush(pThis->tVars.disk.pWrite));
CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */
@@ -916,7 +906,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
* the in-memory representation. The instance will be re-created upon
* dequeue. -- rgerhards, 2008-07-09
*/
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n",
nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly);
@@ -926,43 +916,11 @@ finalize_it:
}
-static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr)
+static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg)
{
DEFiRet;
- iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL);
- RETiRet;
-}
-
-
-static rsRetVal qDelDisk(qqueue_t *pThis)
-{
- obj_t *pDummyObj; /* we need to deserialize it... */
- DEFiRet;
-
- int64 offsIn;
- int64 offsOut;
-
- CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn));
- CHKiRet(obj.Deserialize(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL));
- objDestruct(pDummyObj);
- CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut));
-
- /* This time it is a bit tricky: we free disk space only upon file deletion. So we need
- * to keep track of what we have read until we get an out-offset that is lower than the
- * in-offset (which indicates file change). Then, we can subtract the whole thing from
- * the on-disk size. -- rgerhards, 2008-01-30
- */
- if(offsIn < offsOut) {
- pThis->tVars.disk.bytesRead += offsOut - offsIn;
- } else {
- pThis->tVars.disk.sizeOnDisk -= pThis->tVars.disk.bytesRead;
- pThis->tVars.disk.bytesRead = offsOut;
- DBGOPRINT((obj_t*) pThis, "a file has been deleted, now %lld octets disk space used\n", pThis->tVars.disk.sizeOnDisk);
- /* awake possibly waiting enq process */
- pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
- }
-
-finalize_it:
+ iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", 3, pThis->tVars.disk.pReadDeq, NULL,
+ NULL, msgConstructForDeserializer, NULL, MsgDeserialize);
RETiRet;
}
@@ -979,7 +937,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
-static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
{
batch_t singleBatch;
batch_obj_t batchObj;
@@ -1001,16 +959,16 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
memset(&batchObj, 0, sizeof(batch_obj_t));
memset(&singleBatch, 0, sizeof(batch_t));
batchObj.state = BATCH_STATE_RDY;
- batchObj.pUsrp = (obj_t*) pUsr;
+ batchObj.pMsg = pMsg;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
singleBatch.active = &active;
- iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate);
+ iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate);
/* delete the batch string params: TODO: create its own "class" for this */
for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
free(batchObj.staticActStrings[i]);
}
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
RETiRet;
}
@@ -1032,7 +990,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
* We use our knowledge about the batch_t structure below, but without that, we
* pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
- iRet = pThis->pConsumer(pThis->pUsr, pBatch, &pThis->bShutdownImmediate);
+ iRet = pThis->pConsumer(pThis->pAction, pBatch, &pThis->bShutdownImmediate);
RETiRet;
}
@@ -1053,13 +1011,13 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
* things truely different. -- rgerhards, 2008-02-12
*/
static rsRetVal
-qqueueAdd(qqueue_t *pThis, void *pUsr)
+qqueueAdd(qqueue_t *pThis, msg_t *pMsg)
{
DEFiRet;
ASSERT(pThis != NULL);
- CHKiRet(pThis->qAdd(pThis, pUsr));
+ CHKiRet(pThis->qAdd(pThis, pMsg));
if(pThis->qType != QUEUETYPE_DIRECT) {
ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
@@ -1075,7 +1033,7 @@ finalize_it:
/* generic code to dequeue a queue entry
*/
static rsRetVal
-qqueueDeq(qqueue_t *pThis, void **ppUsr)
+qqueueDeq(qqueue_t *pThis, msg_t **ppMsg)
{
DEFiRet;
@@ -1086,7 +1044,7 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr)
* If we decrement, however, we may lose a message. But that is better than
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
- iRet = pThis->qDeq(pThis, ppUsr);
+ iRet = pThis->qDeq(pThis, ppMsg);
ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq);
// DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
@@ -1184,11 +1142,11 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
rsRetVal iRetLocal;
DEFiRet;
-RUNLOG_STR("trying to shutdown workers within Action Timeout");
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
/* instruct workers to finish ASAP, even if still work exists */
+ DBGOPRINT((obj_t*) pThis, "trying to shutdown workers within Action Timeout");
DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n");
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
@@ -1459,22 +1417,21 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis)
* the return state!
* rgerhards, 2008-01-24
*/
-static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr)
+static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pMsg)
{
DEFiRet;
rsRetVal iRetLocal;
int iSeverity;
ISOBJ_TYPE_assert(pThis, qqueue);
- ISOBJ_assert(pUsr);
if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) {
- iRetLocal = objGetSeverity(pUsr, &iSeverity);
+ iRetLocal = MsgGetSeverity(pMsg, &iSeverity);
if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n",
iQueueSize, iSeverity);
STATSCOUNTER_INC(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd);
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg "
@@ -1493,13 +1450,32 @@ static inline rsRetVal
DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
{
int i;
+ off64_t bytesDel;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
/* now send delete request to storage driver */
- for(i = 0 ; i < nElem ; ++i) {
- pThis->qDel(pThis);
+ if(pThis->qType == QUEUETYPE_DISK) {
+ strmMultiFileSeek(pThis->tVars.disk.pReadDel, pThis->tVars.disk.deqFileNumOut,
+ pThis->tVars.disk.deqOffs, &bytesDel);
+ /* We need to correct the on-disk file size. This time it is a bit tricky:
+ * we free disk space only upon file deletion. So we need to keep track of what we
+ * have read until we get an out-offset that is lower than the in-offset (which
+ * indicates file change). Then, we can subtract the whole thing from the on-disk
+ * size. -- rgerhards, 2008-01-30
+ */
+ if(bytesDel != 0) {
+ pThis->tVars.disk.sizeOnDisk -= bytesDel;
+ DBGOPRINT((obj_t*) pThis, "a %lld octet file has been deleted, now %lld octets disk "
+ "space used\n", bytesDel, pThis->tVars.disk.sizeOnDisk);
+ /* awake possibly waiting enq process */
+ pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
+ }
+ } else { /* memory queue */
+ for(i = 0 ; i < nElem ; ++i) {
+ pThis->qDel(pThis);
+ }
}
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
@@ -1560,7 +1536,7 @@ static inline rsRetVal
DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
{
int i;
- void *pUsr;
+ msg_t *pMsg;
int nEnqueued = 0;
rsRetVal localRet;
DEFiRet;
@@ -1569,17 +1545,16 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
assert(pBatch != NULL);
for(i = 0 ; i < pBatch->nElem ; ++i) {
- pUsr = pBatch->pElem[i].pUsrp;
+ pMsg = pBatch->pElem[i].pMsg;
if( pBatch->pElem[i].state == BATCH_STATE_RDY
|| pBatch->pElem[i].state == BATCH_STATE_SUB) {
- localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
- (obj_t*)MsgAddRef((msg_t*) pUsr));
+ localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
++nEnqueued;
if(localRet != RS_RET_OK) {
DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
}
}
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
}
DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued);
@@ -1611,7 +1586,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
int nDiscarded;
int nDeleted;
int iQueueSize;
- void *pUsr;
+ msg_t *pMsg;
rsRetVal localRet;
DEFiRet;
@@ -1619,11 +1594,14 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
DeleteProcessedBatch(pThis, &pWti->batch);
nDequeued = nDiscarded = 0;
+ if(pThis->qType == QUEUETYPE_DISK) {
+ pThis->tVars.disk.deqFileNumIn = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq);
+ }
while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
- CHKiRet(qqueueDeq(pThis, &pUsr));
+ CHKiRet(qqueueDeq(pThis, &pMsg));
/* check if we should discard this element */
- localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr);
+ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg);
if(localRet == RS_RET_QUEUE_FULL) {
++nDiscarded;
continue;
@@ -1632,11 +1610,16 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
}
/* all well, use this element */
- pWti->batch.pElem[nDequeued].pUsrp = pUsr;
+ pWti->batch.pElem[nDequeued].pMsg = pMsg;
pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
++nDequeued;
}
+ if(pThis->qType == QUEUETYPE_DISK) {
+ strm.GetCurrOffset(pThis->tVars.disk.pReadDeq, &pThis->tVars.disk.deqOffs);
+ pThis->tVars.disk.deqFileNumOut = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq);
+ }
+
/* it is sufficient to persist only when the bulk of work is done */
qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted);
@@ -1644,7 +1627,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
pWti->batch.nElemDeq = nDequeued + nDiscarded;
pWti->batch.deqID = getNextDeqID(pThis);
*piRemainingQueueSize = iQueueSize;
-
finalize_it:
RETiRet;
}
@@ -1680,7 +1662,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
- // TODO: MULTI: check physical queue size?
pthread_cond_signal(&pThis->notFull);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
@@ -1874,7 +1855,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
/* at this spot, we may be cancelled */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
- CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate));
+ CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -1932,12 +1913,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
- /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
- * the message. So far, we simply assume we always have msg_t, what currently is always the case.
- * rgerhards, 2009-05-28
- */
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY,
- (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
+ CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY,
+ MsgAddRef(pWti->batch.pElem[i].pMsg)));
pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
}
@@ -2018,6 +1995,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
uchar pszBuf[64];
+ uchar pszQIFNam[MAXFNAME];
int wrk;
uchar *qName;
size_t lenBuf;
@@ -2040,8 +2018,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->qConstruct = qConstructLinkedList;
pThis->qDestruct = qDestructLinkedList;
pThis->qAdd = qAddLinkedList;
- pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
- pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->qDeq = qDeqLinkedList;
+ pThis->qDel = qDelLinkedList;
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_DISK:
@@ -2049,10 +2027,16 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->qDestruct = qDestructDisk;
pThis->qAdd = qAddDisk;
pThis->qDeq = qDeqDisk;
- pThis->qDel = qDelDisk;
+ pThis->qDel = NULL; /* delete for disk handled via special code! */
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
+ /* pre-construct file name for .qi file */
+ pThis->lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar),
+ "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
+ pThis->pszQIFNam = ustrdup(pszQIFNam);
+ DBGOPRINT((obj_t*) pThis, ".qi file name is '%s', len %d\n", pThis->pszQIFNam,
+ (int) pThis->lenQIFNam);
break;
case QUEUETYPE_DIRECT:
pThis->qConstruct = qConstructDirect;
@@ -2179,7 +2163,7 @@ finalize_it:
}
-/* persist the queue to disk. If we have something to persist, we first
+/* persist the queue to disk (write the .qi file). If we have something to persist, we first
* save the information on the queue properties itself and then we call
* the queue-type specific drivers.
* Variable bIsCheckpoint is set to 1 if the persist is for a checkpoint,
@@ -2190,8 +2174,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
{
DEFiRet;
strm_t *psQIF = NULL; /* Queue Info File */
- uchar pszQIFNam[MAXFNAME];
- size_t lenQIFNam;
ASSERT(pThis != NULL);
@@ -2209,13 +2191,9 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
DBGOPRINT((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis));
- /* Construct file name */
- lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
- (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
-
if((bIsCheckpoint != QUEUE_CHECKPOINT) && (getPhysicalQueueSize(pThis) == 0)) {
if(pThis->bNeedDelQIF) {
- unlink((char*)pszQIFNam);
+ unlink((char*)pThis->pszQIFNam);
pThis->bNeedDelQIF = 0;
}
/* indicate spool file needs to be deleted */
@@ -2228,7 +2206,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE_TRUNC));
CHKiRet(strm.SetbSync(psQIF, pThis->bSyncQueueFiles));
CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE));
- CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam));
+ CHKiRet(strm.SetFName(psQIF, pThis->pszQIFNam, pThis->lenQIFNam));
CHKiRet(strm.ConstructFinalize(psQIF));
/* first, write the property bag for ourselfs
@@ -2240,7 +2218,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64);
- objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64);
CHKiRet(obj.EndSerialize(psQIF));
/* now persist the stream info */
@@ -2470,7 +2447,7 @@ finalize_it:
* rgerhards, 2009-06-16
*/
static inline rsRetVal
-doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg)
{
DEFiRet;
int err;
@@ -2479,7 +2456,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
STATSCOUNTER_INC(pThis->ctrEnqueued, pThis->mutCtrEnqueued);
/* first check if we need to discard this message (which will cause CHKiRet() to exit)
*/
- CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr));
+ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg));
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
@@ -2556,7 +2533,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
if(pThis->toEnq == 0 || pThis->bEnqOnly) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n");
STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting %dms to drain.\n", pThis->toEnq);
@@ -2568,7 +2545,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n");
@@ -2576,7 +2553,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
}
/* and finally enqueue the message */
- CHKiRet(qqueueAdd(pThis, pUsr));
+ CHKiRet(qqueueAdd(pThis, pMsg));
STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize);
finalize_it:
@@ -2652,11 +2629,11 @@ finalize_it:
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr)
+qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
- iRet = qAddDirect(pThis, pUsr);
+ iRet = qAddDirect(pThis, pMsg);
RETiRet;
}
@@ -2665,7 +2642,7 @@ qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr)
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg)
{
DEFiRet;
int iCancelStateSave;
@@ -2677,7 +2654,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
d_pthread_mutex_lock(pThis->mut);
}
- CHKiRet(doEnqSingleObj(pThis, flowCtlType, pUsr));
+ CHKiRet(doEnqSingleObj(pThis, flowCtlType, pMsg));
qqueueChkPersist(pThis, 1);
@@ -2806,7 +2783,7 @@ DEFpropSetMeth(qqueue, iLightDlyMrk, int)
DEFpropSetMeth(qqueue, bIsDA, int)
DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
-DEFpropSetMeth(qqueue, pUsr, void*)
+DEFpropSetMeth(qqueue, pAction, action_t*)
DEFpropSetMeth(qqueue, iDeqSlowdown, int)
DEFpropSetMeth(qqueue, iDeqBatchSize, int)
DEFpropSetMeth(qqueue, sizeOnDiskMax, int64)
@@ -2829,8 +2806,6 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp)
pThis->iQueueSize = pProp->val.num;
} else if(isProp("tVars.disk.sizeOnDisk")) {
pThis->tVars.disk.sizeOnDisk = pProp->val.num;
- } else if(isProp("tVars.disk.bytesRead")) {
- pThis->tVars.disk.bytesRead = pProp->val.num;
} else if(isProp("qType")) {
if(pThis->qType != pProp->val.num)
ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH);
diff --git a/runtime/queue.h b/runtime/queue.h
index 91c100ed..886fac8d 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -51,7 +51,7 @@ typedef enum {
/* list member definition for linked list types of queues: */
typedef struct qLinkedList_S {
struct qLinkedList_S *pNext;
- void *pUsr;
+ msg_t *pMsg;
} qLinkedList_t;
@@ -71,7 +71,7 @@ struct queue_s {
int iMinMsgsPerWrkr;/* minimum nbr of msgs per worker thread, if more, a new worker is started until max wrkrs */
wtp_t *pWtpDA;
wtp_t *pWtpReg;
- void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */
+ action_t *pAction; /* for action queues, ptr to action object; for main queues unused */
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
sbool bSyncQueueFiles;/* if working with files, sync them after each write? */
@@ -111,8 +111,8 @@ struct queue_s {
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
rsRetVal (*qDestruct)(struct queue_s *pThis);
- rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr);
- rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
+ rsRetVal (*qAdd)(struct queue_s *pThis, msg_t *pMsg);
+ rsRetVal (*qDeq)(struct queue_s *pThis, msg_t **ppMsg);
rsRetVal (*qDel)(struct queue_s *pThis);
/* end type-specific handler */
/* public entry points (set during construction, permit to set best algorithm for params selected) */
@@ -135,6 +135,8 @@ struct queue_s {
size_t lenSpoolDir;
uchar *pszFilePrefix;
size_t lenFilePrefix;
+ uchar *pszQIFNam; /* full .qi file name, based on parts above */
+ size_t lenQIFNam;
int iNumberFiles; /* how many files make up the queue? */
int64 iMaxFileSize; /* max size for a single queue file */
int64 sizeOnDiskMax; /* maximum size on disk allowed */
@@ -145,7 +147,8 @@ struct queue_s {
struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
int bDAEnqOnly; /* EnqOnly setting for DA queue */
/* now follow queueing mode specific data elements */
- union { /* different data elements based on queue type (qType) */
+ //union { /* different data elements based on queue type (qType) */
+ struct { /* different data elements based on queue type (qType) */
struct {
long deqhead, head, tail;
void** pBuf; /* the queued user data structure */
@@ -157,7 +160,9 @@ struct queue_s {
} linklist;
struct {
int64 sizeOnDisk; /* current amount of disk space used */
- int64 bytesRead; /* number of bytes read from current (undeleted!) file */
+ int64 deqOffs; /* offset after dequeue batch - used for file deleter */
+ int deqFileNumIn; /* same for the circular file numbers, mainly for */
+ int deqFileNumOut;/* deleting finished files */
strm_t *pWrite; /* current file to be written */
strm_t *pReadDeq; /* current file for dequeueing */
strm_t *pReadDel; /* current file for deleting */
@@ -184,8 +189,8 @@ struct queue_s {
/* prototypes */
rsRetVal qqueueDestruct(qqueue_t **ppThis);
-rsRetVal qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr);
-rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr);
+rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg);
+rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg);
rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
@@ -215,7 +220,7 @@ PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int);
PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int);
PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int);
PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int);
-PROTOTYPEpropSetMeth(qqueue, pUsr, void*);
+PROTOTYPEpropSetMeth(qqueue, pAction, action_t*);
PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int);
PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64);
PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int);
diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c
new file mode 100644
index 00000000..4b618fb5
--- /dev/null
+++ b/runtime/ratelimit.c
@@ -0,0 +1,359 @@
+/* ratelimit.c
+ * support for rate-limiting sources, including "last message
+ * repeated n times" processing.
+ *
+ * Copyright 2012 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+#include "rsyslog.h"
+#include "errmsg.h"
+#include "ratelimit.h"
+#include "datetime.h"
+#include "parser.h"
+#include "unicode-helper.h"
+#include "msg.h"
+#include "rsconf.h"
+#include "dirty.h"
+
+/* definitions for objects we access */
+DEFobjStaticHelpers
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(datetime)
+DEFobjCurrIf(parser)
+
+/* static data */
+
+/* generate a "repeated n times" message */
+static inline msg_t *
+ratelimitGenRepMsg(ratelimit_t *ratelimit)
+{
+ msg_t *repMsg;
+ size_t lenRepMsg;
+ uchar szRepMsg[1024];
+
+ if(ratelimit->nsupp == 1) { /* we simply use the original message! */
+ repMsg = MsgAddRef(ratelimit->pMsg);
+ } else {/* we need to duplicate, original message may still be in use in other
+ * parts of the system! */
+ if((repMsg = MsgDup(ratelimit->pMsg)) == NULL) {
+ DBGPRINTF("Message duplication failed, dropping repeat message.\n");
+ goto done;
+ }
+ lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg),
+ " message repeated %d times: [%.800s]",
+ ratelimit->nsupp, getMSG(ratelimit->pMsg));
+ MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg);
+ }
+
+done: return repMsg;
+}
+
+static inline rsRetVal
+doLastMessageRepeatedNTimes(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg)
+{
+ int bNeedUnlockMutex = 0;
+ rsRetVal localRet;
+ DEFiRet;
+
+ if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
+ if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
+ DBGPRINTF("Message discarded, parsing error %d\n", localRet);
+ ABORT_FINALIZE(RS_RET_DISCARDMSG);
+ }
+ }
+
+ if(ratelimit->bThreadSafe) {
+ pthread_mutex_lock(&ratelimit->mut);
+ bNeedUnlockMutex = 1;
+ }
+
+ if( ratelimit->pMsg != NULL &&
+ getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) &&
+ !ustrcmp(getMSG(pMsg), getMSG(ratelimit->pMsg)) &&
+ !strcmp(getHOSTNAME(pMsg), getHOSTNAME(ratelimit->pMsg)) &&
+ !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(ratelimit->pMsg, LOCK_MUTEX)) &&
+ !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(ratelimit->pMsg, LOCK_MUTEX))) {
+ ratelimit->nsupp++;
+ DBGPRINTF("msg repeated %d times\n", ratelimit->nsupp);
+ /* use current message, so we have the new timestamp
+ * (means we need to discard previous one) */
+ msgDestruct(&ratelimit->pMsg);
+ ratelimit->pMsg = pMsg;
+ ABORT_FINALIZE(RS_RET_DISCARDMSG);
+ } else {/* new message, do "repeat processing" & save it */
+ if(ratelimit->pMsg != NULL) {
+ if(ratelimit->nsupp > 0) {
+ *ppRepMsg = ratelimitGenRepMsg(ratelimit);
+ ratelimit->nsupp = 0;
+ }
+ msgDestruct(&ratelimit->pMsg);
+ }
+ ratelimit->pMsg = MsgAddRef(pMsg);
+ }
+
+finalize_it:
+ if(bNeedUnlockMutex)
+ pthread_mutex_unlock(&ratelimit->mut);
+ RETiRet;
+}
+
+
+/* helper: tell how many messages we lost due to linux-like ratelimiting */
+static inline void
+tellLostCnt(ratelimit_t *ratelimit)
+{
+ uchar msgbuf[1024];
+ if(ratelimit->missed) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "%s: %u messages lost due to rate-limiting",
+ ratelimit->name, ratelimit->missed);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ ratelimit->missed = 0;
+ }
+}
+
+/* Linux-like ratelimiting, modelled after the linux kernel
+ * returns 1 if message is within rate limit and shall be
+ * processed, 0 otherwise.
+ * This implementation is NOT THREAD-SAFE and must not
+ * be called concurrently.
+ */
+static inline int
+withinRatelimit(ratelimit_t *ratelimit, time_t tt)
+{
+ int ret;
+ uchar msgbuf[1024];
+
+ if(ratelimit->interval == 0) {
+ ret = 1;
+ goto finalize_it;
+ }
+
+ assert(ratelimit->burst != 0);
+
+ if(ratelimit->begin == 0)
+ ratelimit->begin = tt;
+
+ /* resume if we go out of out time window */
+ if(tt > ratelimit->begin + ratelimit->interval) {
+ tellLostCnt(ratelimit);
+ ratelimit->begin = 0;
+ ratelimit->done = 0;
+ }
+
+ /* do actual limit check */
+ if(ratelimit->burst > ratelimit->done) {
+ ratelimit->done++;
+ ret = 1;
+ } else {
+ if(ratelimit->missed == 0) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "%s: begin to drop messages due to rate-limiting",
+ ratelimit->name);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ }
+ ratelimit->missed++;
+ ret = 0;
+ }
+
+finalize_it:
+ return ret;
+}
+
+
+/* ratelimit a message, that means:
+ * - handle "last message repeated n times" logic
+ * - handle actual (discarding) rate-limiting
+ * This function returns RS_RET_OK, if the caller shall process
+ * the message regularly and RS_RET_DISCARD if the caller must
+ * discard the message. The caller should also discard the message
+ * if another return status occurs. This places some burden on the
+ * caller logic, but provides best performance. Demanding this
+ * cooperative mode can enable a faulty caller to thrash up part
+ * of the system, but we accept that risk (a faulty caller can
+ * always do all sorts of evil, so...)
+ * If *ppRepMsg != NULL on return, the caller must enqueue that
+ * message before the original message.
+ */
+rsRetVal
+ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg)
+{
+ DEFiRet;
+
+ *ppRepMsg = NULL;
+ if(ratelimit->interval) {
+ if(withinRatelimit(ratelimit, pMsg->ttGenTime) == 0) {
+ msgDestruct(&pMsg);
+ ABORT_FINALIZE(RS_RET_DISCARDMSG);
+ }
+ }
+ if(ratelimit->bReduceRepeatMsgs) {
+ CHKiRet(doLastMessageRepeatedNTimes(ratelimit, pMsg, ppRepMsg));
+ }
+finalize_it:
+ RETiRet;
+}
+
+/* returns 1, if the ratelimiter performs any checks and 0 otherwise */
+int
+ratelimitChecked(ratelimit_t *ratelimit)
+{
+ return ratelimit->interval || ratelimit->bReduceRepeatMsgs;
+}
+
+
+/* add a message to a ratelimiter/multisubmit structure.
+ * ratelimiting is automatically handled according to the ratelimit
+ * settings.
+ * if pMultiSub == NULL, a single-message enqueue happens (under reconsideration)
+ */
+rsRetVal
+ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg)
+{
+ rsRetVal localRet;
+ msg_t *repMsg;
+ DEFiRet;
+
+ if(pMultiSub == NULL) {
+ localRet = ratelimitMsg(ratelimit, pMsg, &repMsg);
+ if(repMsg != NULL)
+ CHKiRet(submitMsg2(repMsg));
+ if(localRet == RS_RET_OK)
+ CHKiRet(submitMsg2(pMsg));
+ } else {
+ localRet = ratelimitMsg(ratelimit, pMsg, &repMsg);
+ if(repMsg != NULL) {
+ pMultiSub->ppMsgs[pMultiSub->nElem++] = repMsg;
+ if(pMultiSub->nElem == pMultiSub->maxElem)
+ CHKiRet(multiSubmitMsg2(pMultiSub));
+ }
+ if(localRet == RS_RET_OK) {
+ pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
+ if(pMultiSub->nElem == pMultiSub->maxElem)
+ CHKiRet(multiSubmitMsg2(pMultiSub));
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* modname must be a static name (usually expected to be the module
+ * name and MUST be present. dynname may be NULL and can be used for
+ * dynamic information, e.g. PID or listener IP, ...
+ * Both values should be kept brief.
+ */
+rsRetVal
+ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname)
+{
+ ratelimit_t *pThis;
+ char namebuf[256];
+ DEFiRet;
+
+ CHKmalloc(pThis = calloc(1, sizeof(ratelimit_t)));
+ if(modname == NULL)
+ modname ="*ERROR:MODULE NAME MISSING*";
+
+ if(dynname == NULL) {
+ pThis->name = strdup(modname);
+ } else {
+ snprintf(namebuf, sizeof(namebuf), "%s[%s]",
+ modname, dynname);
+ namebuf[sizeof(namebuf)-1] = '\0'; /* to be on safe side */
+ pThis->name = strdup(namebuf);
+ }
+ pThis->bReduceRepeatMsgs = loadConf->globals.bReduceRepeatMsgs;
+ *ppThis = pThis;
+finalize_it:
+ RETiRet;
+}
+
+
+/* enable linux-like ratelimiting */
+void
+ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned short burst)
+{
+ ratelimit->interval = interval;
+ ratelimit->burst = burst;
+ ratelimit->done = 0;
+ ratelimit->missed = 0;
+ ratelimit->begin = 0;
+}
+
+
+/* enable thread-safe operations mode. This make sure that
+ * a single ratelimiter can be called from multiple threads. As
+ * this causes some overhead and is not always required, it needs
+ * to be explicitely enabled. This operation cannot be undone
+ * (think: why should one do that???)
+ */
+void
+ratelimitSetThreadSafe(ratelimit_t *ratelimit)
+{
+ ratelimit->bThreadSafe = 1;
+ pthread_mutex_init(&ratelimit->mut, NULL);
+}
+
+void
+ratelimitDestruct(ratelimit_t *ratelimit)
+{
+ msg_t *pMsg;
+ if(ratelimit->pMsg != NULL) {
+ if(ratelimit->nsupp > 0) {
+ pMsg = ratelimitGenRepMsg(ratelimit);
+ if(pMsg != NULL)
+ submitMsg2(pMsg);
+ }
+ msgDestruct(&ratelimit->pMsg);
+ }
+ tellLostCnt(ratelimit);
+ if(ratelimit->bThreadSafe)
+ pthread_mutex_destroy(&ratelimit->mut);
+ free(ratelimit->name);
+ free(ratelimit);
+}
+
+void
+ratelimitModExit(void)
+{
+ objRelease(datetime, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(parser, CORE_COMPONENT);
+}
+
+rsRetVal
+ratelimitModInit(void)
+{
+ DEFiRet;
+ CHKiRet(objGetObjInterface(&obj));
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(parser, CORE_COMPONENT));
+finalize_it:
+ RETiRet;
+}
+
diff --git a/runtime/ratelimit.h b/runtime/ratelimit.h
new file mode 100644
index 00000000..820817bc
--- /dev/null
+++ b/runtime/ratelimit.h
@@ -0,0 +1,51 @@
+/* header for ratelimit.c
+ *
+ * Copyright 2012 Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef INCLUDED_RATELIMIT_H
+#define INCLUDED_RATELIMIT_H
+
+struct ratelimit_s {
+ char *name; /**< rate limiter name, e.g. for user messages */
+ /* support for Linux kernel-type ratelimiting */
+ unsigned short interval;
+ unsigned short burst;
+ unsigned done;
+ unsigned missed;
+ time_t begin;
+ /* support for "last message repeated n times */
+ int bReduceRepeatMsgs; /**< shall we do "last message repeated n times" processing? */
+ unsigned nsupp; /**< nbr of msgs suppressed */
+ msg_t *pMsg;
+ sbool bThreadSafe; /**< do we need to operate in Thread-Safe mode? */
+ pthread_mutex_t mut; /**< mutex if thread-safe operation desired */
+};
+
+/* prototypes */
+rsRetVal ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname);
+void ratelimitSetThreadSafe(ratelimit_t *ratelimit);
+void ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned short burst);
+rsRetVal ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRep);
+rsRetVal ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg);
+void ratelimitDestruct(ratelimit_t *pThis);
+int ratelimitChecked(ratelimit_t *ratelimit);
+rsRetVal ratelimitModInit(void);
+void ratelimitModExit(void);
+
+#endif /* #ifndef INCLUDED_RATELIMIT_H */
diff --git a/runtime/rsconf.c b/runtime/rsconf.c
index ac9cd800..d8b81f1b 100644
--- a/runtime/rsconf.c
+++ b/runtime/rsconf.c
@@ -115,8 +115,8 @@ void cnfDoCfsysline(char *ln);
*/
BEGINobjConstruct(rsconf) /* be sure to specify the object type also in END macro! */
pThis->globals.bDebugPrintTemplateList = 1;
- pThis->globals.bDebugPrintModuleList = 1;
- pThis->globals.bDebugPrintCfSysLineHandlerList = 1;
+ pThis->globals.bDebugPrintModuleList = 0;
+ pThis->globals.bDebugPrintCfSysLineHandlerList = 0;
pThis->globals.bLogStatusMsgs = DFLT_bLogStatusMsgs;
pThis->globals.bErrMsgToStderr = 1;
pThis->globals.umask = -1;
@@ -414,7 +414,8 @@ void cnfDoObj(struct cnfobj *o)
inputProcessCnf(o);
break;
case CNFOBJ_TPL:
- tplProcessCnf(o);
+ if(tplProcessCnf(o) != RS_RET_OK)
+ parser_errmsg("error processing template object");
break;
case CNFOBJ_RULESET:
rulesetProcessCnf(o);
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 60f0d1a7..936a76e2 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -69,6 +69,7 @@
* approach taken here is considered appropriate.
* rgerhards, 2010-06-24
*/
+#define CONF_NUM_MULTISUB 1024 /* default number of messages per multisub structure */
/* ############################################################# *
* # End Config Settings # *
@@ -376,6 +377,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_LEGA_ACT_NOT_SUPPORTED = -2215, /**< the module (no longer) supports legacy action syntax */
RS_RET_MAX_OMSR_REACHED = -2216, /**< max nbr of string requests reached, not supported by core */
RS_RET_UID_MISSING = -2217, /**< a user id is missing (but e.g. a password provided) */
+ RS_RET_DATAFAIL = -2218, /**< data passed to action caused failure */
/* reserved for pre-v6.5 */
RS_RET_DUP_PARAM = -2220, /**< config parameter is given more than once */
RS_RET_MODULE_ALREADY_IN_CONF = -2221, /**< module already in current configuration */
@@ -393,6 +395,9 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_INVLD_SETOP = -2305, /**< invalid variable set operation, incompatible type */
RS_RET_RULESET_EXISTS = -2306,/**< ruleset already exists */
RS_RET_DEPRECATED = -2307,/**< deprecated functionality is used */
+ RS_RET_DS_PROP_SEQ_ERR = -2308,/**< property sequence error deserializing object */
+ RS_RET_TPL_INVLD_PROP = -2309,/**< property name error in template (unknown name) */
+ RS_RET_NO_RULEBASE = -2310,/**< mmnormalize: rulebase can not be found or otherwise invalid */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index b74f8ec8..21c3c337 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -194,7 +194,7 @@ processBatchMultiRuleset(batch_t *pBatch)
for(i = iStart ; i < pBatch->nElem ; ++i) {
if(batchElemGetRuleset(pBatch, i) == currRuleset) {
/* for performance reasons, we copy only those members that we actually need */
- snglRuleBatch.pElem[iNew].pUsrp = pBatch->pElem[i].pUsrp;
+ snglRuleBatch.pElem[iNew].pMsg = pBatch->pElem[i].pMsg;
snglRuleBatch.pElem[iNew].state = pBatch->pElem[i].state;
++iNew;
/* We indicate the element also as done, so it will not be processed again */
@@ -244,8 +244,8 @@ execSet(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
if( pBatch->pElem[i].state != BATCH_STATE_DISC
&& (active == NULL || active[i])) {
- cnfexprEval(stmt->d.s_set.expr, &result, pBatch->pElem[i].pUsrp);
- msgSetJSONFromVar((msg_t*)pBatch->pElem[i].pUsrp, stmt->d.s_set.varname,
+ cnfexprEval(stmt->d.s_set.expr, &result, pBatch->pElem[i].pMsg);
+ msgSetJSONFromVar(pBatch->pElem[i].pMsg, stmt->d.s_set.varname,
&result);
varDelete(&result);
}
@@ -261,7 +261,7 @@ execUnset(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
if( pBatch->pElem[i].state != BATCH_STATE_DISC
&& (active == NULL || active[i])) {
- msgUnsetJSON((msg_t*)pBatch->pElem[i].pUsrp, stmt->d.s_unset.varname);
+ msgUnsetJSON(pBatch->pElem[i].pMsg, stmt->d.s_unset.varname);
}
}
RETiRet;
@@ -305,8 +305,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
if(pBatch->pElem[i].state == BATCH_STATE_DISC)
continue; /* will be ignored in any case */
if(active == NULL || active[i]) {
- bRet = cnfexprEvalBool(stmt->d.s_if.expr,
- (msg_t*)(pBatch->pElem[i].pUsrp));
+ bRet = cnfexprEvalBool(stmt->d.s_if.expr, pBatch->pElem[i].pMsg);
} else
bRet = 0;
newAct[i] = bRet;
@@ -344,7 +343,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
return;
if(pBatch->pElem[i].state == BATCH_STATE_DISC)
continue; /* will be ignored in any case */
- pMsg = (msg_t*)(pBatch->pElem[i].pUsrp);
+ pMsg = pBatch->pElem[i].pMsg;
if(active == NULL || active[i]) {
if( (stmt->d.s_prifilt.pmask[pMsg->iFacility] == TABLE_NOPRI) ||
((stmt->d.s_prifilt.pmask[pMsg->iFacility]
@@ -387,7 +386,8 @@ evalPROPFILT(struct cnfstmt *stmt, msg_t *pMsg)
goto done;
pszPropVal = MsgGetProp(pMsg, NULL, stmt->d.s_propfilt.propID,
- stmt->d.s_propfilt.propName, &propLen, &pbMustBeFreed);
+ stmt->d.s_propfilt.propName, &propLen,
+ &pbMustBeFreed, NULL);
/* Now do the compares (short list currently ;)) */
switch(stmt->d.s_propfilt.operation ) {
@@ -476,7 +476,7 @@ execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
if(pBatch->pElem[i].state == BATCH_STATE_DISC)
continue; /* will be ignored in any case */
if(active == NULL || active[i]) {
- bRet = evalPROPFILT(stmt, (msg_t*)(pBatch->pElem[i].pUsrp));
+ bRet = evalPROPFILT(stmt, pBatch->pElem[i].pMsg);
} else
bRet = 0;
thenAct[i] = bRet;
diff --git a/runtime/srutils.c b/runtime/srutils.c
index 4ce6196a..7b485b23 100644
--- a/runtime/srutils.c
+++ b/runtime/srutils.c
@@ -527,8 +527,7 @@ char *rs_strerror_r(int errnum, char *buf, size_t buflen) {
}
-/* Decode a symbolic name to a numeric value
- */
+/* Decode a symbolic name to a numeric value */
int decodeSyslogName(uchar *name, syslogName_t *codetab)
{
register syslogName_t *c;
@@ -538,22 +537,23 @@ int decodeSyslogName(uchar *name, syslogName_t *codetab)
ASSERT(name != NULL);
ASSERT(codetab != NULL);
- dbgprintf("symbolic name: %s", name);
- if (isdigit((int) *name))
- {
- dbgprintf("\n");
+ DBGPRINTF("symbolic name: %s", name);
+ if(isdigit((int) *name)) {
+ DBGPRINTF("\n");
return (atoi((char*) name));
}
strncpy((char*) buf, (char*) name, 79);
- for (p = buf; *p; p++)
+ for(p = buf; *p; p++) {
if (isupper((int) *p))
*p = tolower((int) *p);
- for (c = codetab; c->c_name; c++)
- if (!strcmp((char*) buf, (char*) c->c_name))
- {
- dbgprintf(" ==> %d\n", c->c_val);
+ }
+ for(c = codetab; c->c_name; c++) {
+ if(!strcmp((char*) buf, (char*) c->c_name)) {
+ DBGPRINTF(" ==> %d\n", c->c_val);
return (c->c_val);
}
+ }
+ DBGPRINTF("\n");
return (-1);
}
diff --git a/runtime/stream.c b/runtime/stream.c
index 193d14db..9f4d3556 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -74,11 +74,12 @@ DEFobjStaticHelpers
DEFobjCurrIf(zlibw)
/* forward definitions */
-static rsRetVal strmFlushInternal(strm_t *pThis);
+static rsRetVal strmFlushInternal(strm_t *pThis, int bFlushZip);
static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
static rsRetVal strmCloseFile(strm_t *pThis);
static void *asyncWriterThread(void *pPtr);
-static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
+static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush);
+static rsRetVal doZipFinish(strm_t *pThis);
static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
@@ -341,7 +342,10 @@ static rsRetVal strmCloseFile(strm_t *pThis)
(pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName);
if(pThis->tOperationsMode != STREAMMODE_READ) {
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
+ if(pThis->iZipLevel) {
+ doZipFinish(pThis);
+ }
if(pThis->bAsyncWrite) {
strmWaitAsyncWriterDone(pThis);
}
@@ -361,7 +365,13 @@ static rsRetVal strmCloseFile(strm_t *pThis)
pThis->fdDir = -1;
}
- if(pThis->bDeleteOnClose && pThis->pszCurrFName != NULL) {
+ if(pThis->bDeleteOnClose) {
+ if(pThis->pszCurrFName == NULL) {
+ CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir,
+ pThis->pszFName, pThis->lenFName, pThis->iCurrFNum,
+ pThis->iFileNumDigits));
+ }
+ DBGPRINTF("strmCloseFile: deleting '%s'\n", pThis->pszCurrFName);
if(unlink((char*) pThis->pszCurrFName) == -1) {
char errStr[1024];
int err = errno;
@@ -369,12 +379,13 @@ static rsRetVal strmCloseFile(strm_t *pThis)
DBGPRINTF("error %d unlinking '%s' - ignored: %s\n",
errno, pThis->pszCurrFName, errStr);
}
- free(pThis->pszCurrFName); /* no longer needed in any case (just for open) */
+ free(pThis->pszCurrFName);
pThis->pszCurrFName = NULL;
}
pThis->iCurrOffs = 0; /* we are back at begin of file */
+finalize_it:
RETiRet;
}
@@ -682,6 +693,7 @@ BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro!
pThis->fd = -1;
pThis->fdDir = -1;
pThis->iUngetC = -1;
+ pThis->bVeryReliableZip = 0;
pThis->sType = STREAMTYPE_FILE_SINGLE;
pThis->sIOBufSize = glblGetIOBufSize();
pThis->tOpenMode = 0600;
@@ -785,6 +797,7 @@ stopWriter(strm_t *pThis)
BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */
int i;
CODESTARTobjDestruct(strm)
+ /* we need to stop the ZIP writer */
if(pThis->bAsyncWrite)
/* Note: mutex will be unlocked in stopWriter! */
d_pthread_mutex_lock(&pThis->mut);
@@ -927,14 +940,14 @@ finalize_it:
/* write memory buffer to a stream object.
*/
static inline rsRetVal
-doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush)
{
DEFiRet;
ASSERT(pThis != NULL);
if(pThis->iZipLevel) {
- CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
+ CHKiRet(doZipWrite(pThis, pBuf, lenBuf, bFlush));
} else {
/* write without zipping */
CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
@@ -979,7 +992,7 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf)
* the background thread. -- rgerhards, 2009-07-07
*/
static rsRetVal
-strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlushZip)
{
DEFiRet;
@@ -998,7 +1011,7 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
if(pThis->bAsyncWrite) {
CHKiRet(doAsyncWriteInternal(pThis, lenBuf));
} else {
- CHKiRet(doWriteInternal(pThis, pBuf, lenBuf));
+ CHKiRet(doWriteInternal(pThis, pBuf, lenBuf, bFlushZip));
}
@@ -1038,7 +1051,7 @@ asyncWriterThread(void *pPtr)
}
if(bTimedOut && pThis->iBufPtr > 0) {
/* if we timed out, we need to flush pending data */
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
bTimedOut = 0;
continue; /* now we should have data */
}
@@ -1064,7 +1077,7 @@ asyncWriterThread(void *pPtr)
bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
- doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf);
+ doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf, 0); // TODO: flush state
// TODO: error check????? 2009-07-06
--pThis->iCnt;
@@ -1175,63 +1188,97 @@ finalize_it:
* rgerhards, 2009-06-04
*/
static rsRetVal
-doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush)
{
- z_stream zstrm;
int zRet; /* zlib return state */
- sbool bzInitDone = RSFALSE;
DEFiRet;
+ unsigned outavail;
assert(pThis != NULL);
assert(pBuf != NULL);
- /* allocate deflate state */
- zstrm.zalloc = Z_NULL;
- zstrm.zfree = Z_NULL;
- zstrm.opaque = Z_NULL;
- zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */
- /* see note in file header for the params we use with deflateInit2() */
- zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
- if(zRet != Z_OK) {
- DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet);
- ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ if(!pThis->bzInitDone) {
+ /* allocate deflate state */
+ pThis->zstrm.zalloc = Z_NULL;
+ pThis->zstrm.zfree = Z_NULL;
+ pThis->zstrm.opaque = Z_NULL;
+ /* see note in file header for the params we use with deflateInit2() */
+ zRet = zlibw.DeflateInit2(&pThis->zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
+ if(zRet != Z_OK) {
+ DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet);
+ ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ }
+ pThis->bzInitDone = RSTRUE;
}
- bzInitDone = RSTRUE;
/* now doing the compression */
- zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */
- zstrm.avail_in = lenBuf;
+ pThis->zstrm.next_in = (Bytef*) pBuf;
+ pThis->zstrm.avail_in = lenBuf;
/* run deflate() on buffer until everything has been compressed */
do {
- DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in);
- zstrm.avail_out = pThis->sIOBufSize;
- zstrm.next_out = pThis->pZipBuf;
- zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */
- DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out);
- assert(zRet != Z_STREAM_ERROR); /* state not clobbered */
- if(zstrm.avail_out == pThis->sIOBufSize)
- break; /* this is valid, indicates end of compression --> see zlib howto */
- CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out));
- } while (zstrm.avail_out == 0);
- assert(zstrm.avail_in == 0); /* all input will be used */
+ DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
+ pThis->zstrm.avail_out = pThis->sIOBufSize;
+ pThis->zstrm.next_out = pThis->pZipBuf;
+ zRet = zlibw.Deflate(&pThis->zstrm, bFlush ? Z_SYNC_FLUSH : Z_NO_FLUSH); /* no bad return value */
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
+ outavail =pThis->sIOBufSize - pThis->zstrm.avail_out;
+ if(outavail != 0) {
+ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, outavail));
+ }
+ } while (pThis->zstrm.avail_out == 0);
finalize_it:
- if(bzInitDone) {
- zRet = zlibw.DeflateEnd(&zstrm);
- if(zRet != Z_OK) {
- DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
- }
+ if(pThis->bzInitDone && pThis->bVeryReliableZip) {
+ doZipFinish(pThis);
}
-
RETiRet;
}
+
+/* finish zlib buffer, to be called before closing the ZIP file (if
+ * running in stream mode).
+ */
+static rsRetVal
+doZipFinish(strm_t *pThis)
+{
+ int zRet; /* zlib return state */
+ DEFiRet;
+ unsigned outavail;
+ assert(pThis != NULL);
+
+ if(!pThis->bzInitDone)
+ goto done;
+
+ pThis->zstrm.avail_in = 0;
+ /* run deflate() on buffer until everything has been compressed */
+ do {
+ DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
+ pThis->zstrm.avail_out = pThis->sIOBufSize;
+ pThis->zstrm.next_out = pThis->pZipBuf;
+ zRet = zlibw.Deflate(&pThis->zstrm, Z_FINISH); /* no bad return value */
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
+ outavail = pThis->sIOBufSize - pThis->zstrm.avail_out;
+ if(outavail != 0) {
+ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, outavail));
+ }
+ } while (pThis->zstrm.avail_out == 0);
+
+finalize_it:
+ zRet = zlibw.DeflateEnd(&pThis->zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
+ }
+
+ pThis->bzInitDone = 0;
+done: RETiRet;
+}
+
/* flush stream output buffer to persistent storage. This can be called at any time
* and is automatically called when the output buffer is full.
* rgerhards, 2008-01-10
*/
static rsRetVal
-strmFlushInternal(strm_t *pThis)
+strmFlushInternal(strm_t *pThis, int bFlushZip)
{
DEFiRet;
@@ -1241,7 +1288,7 @@ strmFlushInternal(strm_t *pThis)
(long) pThis->iBufPtr, (pThis->iBufPtr == 0) ? " (no need to flush)" : "");
if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) {
- iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr);
+ iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr, bFlushZip);
}
RETiRet;
@@ -1263,7 +1310,7 @@ strmFlush(strm_t *pThis)
if(pThis->bAsyncWrite)
d_pthread_mutex_lock(&pThis->mut);
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 1));
finalize_it:
if(pThis->bAsyncWrite)
@@ -1286,7 +1333,7 @@ static rsRetVal strmSeek(strm_t *pThis, off64_t offs)
if(pThis->fd == -1) {
CHKiRet(strmOpenFile(pThis));
} else {
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 0));
}
long long i;
DBGOPRINT((obj_t*) pThis, "file %d seek, pos %llu\n", pThis->fd, (long long unsigned) offs);
@@ -1298,6 +1345,56 @@ finalize_it:
RETiRet;
}
+/* multi-file seek, seeks to file number & offset within file. This
+ * is a support function for the queue, in circular mode. DO NOT USE
+ * IT FOR OTHER NEEDS - it may not work as expected. It will
+ * seek to the new position and delete interim files, as it skips them.
+ * Note: this code can be removed when the queue gets a new disk store
+ * handler (if and when it does ;)).
+ * The output parameter bytesDel receives the number of bytes that have
+ * been deleted (if a file is deleted) or 0 if nothing was deleted.
+ * rgerhards, 2012-11-07
+ */
+rsRetVal
+strmMultiFileSeek(strm_t *pThis, int FNum, off64_t offs, off64_t *bytesDel)
+{
+ struct stat statBuf;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ if(FNum == 0 && offs == 0) { /* happens during queue init */
+ *bytesDel = 0;
+ FINALIZE;
+ }
+
+ if(pThis->iCurrFNum != FNum) {
+ /* Note: we assume that no more than one file is skipped - an
+ * assumption that is being used also by the whole rest of the
+ * code and most notably the queue subsystem.
+ */
+ CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir,
+ pThis->pszFName, pThis->lenFName, pThis->iCurrFNum,
+ pThis->iFileNumDigits));
+ stat((char*)pThis->pszCurrFName, &statBuf);
+ *bytesDel = statBuf.st_size;
+ DBGPRINTF("strmMultiFileSeek: detected new filenum, was %d, new %d, "
+ "deleting '%s' (%lld bytes)\n", pThis->iCurrFNum, FNum,
+ pThis->pszCurrFName, (long long) *bytesDel);
+ unlink((char*)pThis->pszCurrFName);
+ free(pThis->pszCurrFName);
+ pThis->pszCurrFName = NULL;
+ pThis->iCurrFNum = FNum;
+ } else {
+ *bytesDel = 0;
+ }
+ pThis->iCurrOffs = offs;
+
+finalize_it:
+ RETiRet;
+}
+
+
/* seek to current offset. This is primarily a helper to readjust the OS file
* pointer after a strm object has been deserialized.
@@ -1329,7 +1426,7 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
/* if the buffer is full, we need to flush before we can write */
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 0));
}
/* we now always have space for one character, so we simply copy it */
*(pThis->pIOBuf + pThis->iBufPtr) = c;
@@ -1389,7 +1486,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
ASSERT(pThis != NULL);
ASSERT(pBuf != NULL);
-//DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs);
+ /* DEV DEBUG ONLY DBGPRINTF("strmWrite(%p[%s], '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pThis->pszCurrFName, pBuf,(long) lenBuf, pThis->bDisabled, (long) pThis->iSizeLimit, (long long) pThis->iCurrOffs); */
if(pThis->bAsyncWrite)
d_pthread_mutex_lock(&pThis->mut);
@@ -1399,7 +1496,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
iOffset = 0;
do {
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */
+ CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */
}
iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
if(iWrite > lenBuf)
@@ -1414,7 +1511,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
* write it. This seems more natural than waiting (hours?) for the next message...
*/
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */
+ CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */
}
finalize_it:
@@ -1442,6 +1539,7 @@ DEFpropSetMeth(strm, tOperationsMode, int)
DEFpropSetMeth(strm, tOpenMode, mode_t)
DEFpropSetMeth(strm, sType, strmType_t)
DEFpropSetMeth(strm, iZipLevel, int)
+DEFpropSetMeth(strm, bVeryReliableZip, int)
DEFpropSetMeth(strm, bSync, int)
DEFpropSetMeth(strm, sIOBufSize, size_t)
DEFpropSetMeth(strm, iSizeLimit, off_t)
@@ -1573,7 +1671,7 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
ISOBJ_TYPE_assert(pThis, strm);
ISOBJ_TYPE_assert(pStrm, strm);
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
CHKiRet(obj.BeginSerialize(pStrm, (obj_t*) pThis));
objSerializeSCALAR(pStrm, iCurrFNum, INT);
@@ -1770,6 +1868,7 @@ CODESTARTobjQueryInterface(strm)
pIf->SettOpenMode = strmSettOpenMode;
pIf->SetsType = strmSetsType;
pIf->SetiZipLevel = strmSetiZipLevel;
+ pIf->SetbVeryReliableZip = strmSetbVeryReliableZip;
pIf->SetbSync = strmSetbSync;
pIf->SetsIOBufSize = strmSetsIOBufSize;
pIf->SetiSizeLimit = strmSetiSizeLimit;
diff --git a/runtime/stream.h b/runtime/stream.h
index 78dbc0d6..b7e74074 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -124,6 +124,8 @@ typedef struct strm_s {
sbool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */
sbool bStopWriter; /* shall writer thread terminate? */
sbool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */
+ sbool bzInitDone; /* did we do an init of zstrm already? */
+ sbool bVeryReliableZip; /* shall we write interim headers to create a very reliable ZIP file? */
int iFlushInterval; /* flush in which interval - 0, no flushing */
pthread_mutex_t mut;/* mutex for flush in async mode */
pthread_cond_t notFull;
@@ -132,6 +134,7 @@ typedef struct strm_s {
unsigned short iEnq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */
unsigned short iDeq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */
short iCnt; /* current nbr of elements in buffer */
+ z_stream zstrm; /* zip stream to use */
struct {
uchar *pBuf;
size_t lenBuf;
@@ -182,11 +185,18 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*);
/* v6 added */
rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr, int mode);
+ /* v7 added 2012-09-14 */
+ INTERFACEpropSetMeth(strm, bVeryReliableZip, int);
ENDinterface(strm)
-#define strmCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */
+#define strmCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */
+static inline int
+strmGetCurrFileNum(strm_t *pStrm) {
+ return pStrm->iCurrFNum;
+}
/* prototypes */
PROTOTYPEObjClassInit(strm);
+rsRetVal strmMultiFileSeek(strm_t *pThis, int fileNum, off64_t offs, off64_t *bytesDel);
#endif /* #ifndef STREAM_H_INCLUDED */
diff --git a/runtime/typedefs.h b/runtime/typedefs.h
index ccae08b2..5cc24e4a 100644
--- a/runtime/typedefs.h
+++ b/runtime/typedefs.h
@@ -92,6 +92,8 @@ typedef struct cfgmodules_etry_s cfgmodules_etry_t;
typedef struct outchannels_s outchannels_t;
typedef struct modConfData_s modConfData_t;
typedef struct instanceConf_s instanceConf_t;
+typedef struct ratelimit_s ratelimit_t;
+typedef struct action_s action_t;
typedef int rs_size_t; /* we do never need more than 2Gig strings, signed permits to
* use -1 as a special flag. */
typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */