summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2012-11-03 12:32:50 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2012-11-03 12:32:50 +0100
commitc55e0a5a06e69106bc346057dd61dcb98688a4aa (patch)
tree0b9639a89adba961eb8ac2b55854c1aba9bf15a6
parentfeddb2cc8fa213725a14e556d0366aef8d3a4339 (diff)
downloadrsyslog-c55e0a5a06e69106bc346057dd61dcb98688a4aa.tar.gz
rsyslog-c55e0a5a06e69106bc346057dd61dcb98688a4aa.tar.bz2
rsyslog-c55e0a5a06e69106bc346057dd61dcb98688a4aa.zip
queue: change generic msg ptr (pUsr) to be of msg_t type
-rw-r--r--action.c16
-rw-r--r--runtime/batch.h22
-rw-r--r--runtime/queue.c59
-rw-r--r--runtime/queue.h10
-rw-r--r--runtime/ruleset.c15
-rw-r--r--tools/syslogd.c6
6 files changed, 55 insertions, 73 deletions
diff --git a/action.c b/action.c
index e31ebd51..67059b21 100644
--- a/action.c
+++ b/action.c
@@ -811,7 +811,7 @@ prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *
ASSERT(pAction != NULL);
ASSERT(pElem != NULL);
- pMsg = (msg_t*) pElem->pUsrp;
+ pMsg = pElem->pMsg;
/* here we must loop to process all requested strings */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
@@ -1061,7 +1061,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
* enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15
*/
if(batchIsValidElem(pBatch, i)) {
- pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
+ pMsg = pBatch->pElem[i].pMsg;
localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
pBatch->pbShutdownImmediate);
DBGPRINTF("action %p call returned %d\n", pAction, localRet);
@@ -1392,9 +1392,9 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
- iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
+ iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg));
else
- iRet = qqueueEnqObj(pAction->pQueue, eFLOWCTL_NO_DELAY, (void*) MsgAddRef(pMsg));
+ iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
finalize_it:
RETiRet;
@@ -1480,7 +1480,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
msg_t *pMsg;
DEFiRet;
- pMsg = (msg_t*)(pBatch->pElem[idxBtch].pUsrp);
+ pMsg = pBatch->pElem[idxBtch].pMsg;
pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
/* don't output marks to recently written outputs */
@@ -1572,7 +1572,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
* also faster ;) -- rgerhards, 2008-09-17 */
do {
lastAct = pAction->f_time;
- if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) {
+ if(pBatch->pElem[i].pMsg->msgFlags & MARK) {
if((now - lastAct) < MarkInterval / 2) {
pBatch->active[i] = 0;
DBGPRINTF("batch item %d: action was recently called, ignoring "
@@ -1581,7 +1581,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
}
}
} while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct,
- ((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, &pAction->mutCAS) == 0);
+ pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0);
if(pBatch->active[i]) {
DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n",
i, module.GetStateName(pAction->pMod));
@@ -1684,7 +1684,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
if( batchIsValidElem(pBatch, i)
&& (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) {
- doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
+ doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg);
}
}
}
diff --git a/runtime/batch.h b/runtime/batch.h
index f743c188..0f19f5bb 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -46,7 +46,7 @@ typedef enum {
/* an object inside a batch, including any information (state!) needed for it to "life".
*/
struct batch_obj_s {
- obj_t *pUsrp; /* pointer to user object (most often message) */
+ msg_t *pMsg;
batch_state_t state; /* associated state */
/* work variables for action processing; these are reused for each action (or block of
* actions)
@@ -97,13 +97,13 @@ batchSetSingleRuleset(batch_t *pBatch, sbool val) {
/* get the batches ruleset (if we have a single ruleset) */
static inline ruleset_t*
batchGetRuleset(batch_t *pBatch) {
- return (pBatch->nElem > 0) ? ((msg_t*) pBatch->pElem[0].pUsrp)->pRuleset : NULL;
+ return (pBatch->nElem > 0) ? pBatch->pElem[0].pMsg->pRuleset : NULL;
}
/* get the ruleset of a specifc element of the batch (index not verified!) */
static inline ruleset_t*
batchElemGetRuleset(batch_t *pBatch, int i) {
- return ((msg_t*) pBatch->pElem[i].pUsrp)->pRuleset;
+ return pBatch->pElem[i].pMsg->pRuleset;
}
/* get number of msgs for this batch */
@@ -134,22 +134,6 @@ batchIsValidElem(batch_t *pBatch, int i) {
}
-/* copy one batch element to another.
- * This creates a complete duplicate in those cases where
- * it is needed. Use duplication only when absolutely necessary!
- * Note that all working fields are reset to zeros. If that were
- * not done, we would have potential problems with invalid
- * or double pointer frees.
- * rgerhards, 2010-06-10
- */
-static inline void
-batchCopyElem(batch_obj_t *pDest, batch_obj_t *pSrc) {
- memset(pDest, 0, sizeof(batch_obj_t));
- pDest->pUsrp = pSrc->pUsrp;
- pDest->state = pSrc->state;
-}
-
-
/* free members of a batch "object". Note that we can not do the usual
* destruction as the object typically is allocated on the stack and so the
* object itself cannot be freed! -- rgerhards, 2010-06-15
diff --git a/runtime/queue.c b/runtime/queue.c
index eaf33e00..6e40b0c9 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -74,7 +74,7 @@ DEFobjCurrIf(datetime)
DEFobjCurrIf(statsobj)
/* forward-definitions */
-static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr);
+static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg);
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
@@ -83,7 +83,7 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
-static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr);
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
@@ -312,7 +312,7 @@ getLogicalQueueSize(qqueue_t *pThis)
*/
static inline void queueDrain(qqueue_t *pThis)
{
- void *pUsr;
+ msg_t *pUsr;
ASSERT(pThis != NULL);
BEGINfunc
@@ -546,7 +546,7 @@ static rsRetVal qDestructFixedArray(qqueue_t *pThis)
}
-static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
+static rsRetVal qAddFixedArray(qqueue_t *pThis, msg_t* in)
{
DEFiRet;
@@ -560,7 +560,7 @@ static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
}
-static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out)
+static rsRetVal qDeqFixedArray(qqueue_t *pThis, msg_t **out)
{
DEFiRet;
@@ -621,7 +621,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis)
RETiRet;
}
-static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
+static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pUsr)
{
qLinkedList_t *pEntry;
DEFiRet;
@@ -629,7 +629,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t))));
pEntry->pNext = NULL;
- pEntry->pUsr = pUsr;
+ pEntry->pMsg = pUsr;
if(pThis->tVars.linklist.pDelRoot == NULL) {
pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry;
@@ -647,14 +647,14 @@ finalize_it:
}
-static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr)
+static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppUsr)
{
qLinkedList_t *pEntry;
DEFiRet;
pEntry = pThis->tVars.linklist.pDeqRoot;
ISOBJ_TYPE_assert(pEntry->pUsr, msg);
- *ppUsr = pEntry->pUsr;
+ *ppUsr = pEntry->pMsg;
pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
RETiRet;
@@ -889,7 +889,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis)
RETiRet;
}
-static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
+static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr)
{
DEFiRet;
number_t nWriteCount;
@@ -917,7 +917,7 @@ finalize_it:
}
-static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr)
+static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppUsr)
{
DEFiRet;
iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL);
@@ -970,7 +970,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
-static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr)
{
batch_t singleBatch;
batch_obj_t batchObj;
@@ -992,7 +992,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
memset(&batchObj, 0, sizeof(batch_obj_t));
memset(&singleBatch, 0, sizeof(batch_t));
batchObj.state = BATCH_STATE_RDY;
- batchObj.pUsrp = (obj_t*) pUsr;
+ batchObj.pMsg = pUsr;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
singleBatch.active = &active;
@@ -1044,7 +1044,7 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
* things truely different. -- rgerhards, 2008-02-12
*/
static rsRetVal
-qqueueAdd(qqueue_t *pThis, void *pUsr)
+qqueueAdd(qqueue_t *pThis, msg_t *pUsr)
{
DEFiRet;
@@ -1066,7 +1066,7 @@ finalize_it:
/* generic code to dequeue a queue entry
*/
static rsRetVal
-qqueueDeq(qqueue_t *pThis, void **ppUsr)
+qqueueDeq(qqueue_t *pThis, msg_t **ppUsr)
{
DEFiRet;
@@ -1420,14 +1420,13 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
* the return state!
* rgerhards, 2008-01-24
*/
-static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr)
+static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pUsr)
{
DEFiRet;
rsRetVal iRetLocal;
int iSeverity;
ISOBJ_TYPE_assert(pThis, qqueue);
- ISOBJ_assert(pUsr);
if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) {
iRetLocal = objGetSeverity(pUsr, &iSeverity);
@@ -1521,7 +1520,7 @@ static inline rsRetVal
DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
{
int i;
- void *pUsr;
+ msg_t *pUsr;
int nEnqueued = 0;
rsRetVal localRet;
DEFiRet;
@@ -1530,11 +1529,10 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
assert(pBatch != NULL);
for(i = 0 ; i < pBatch->nElem ; ++i) {
- pUsr = pBatch->pElem[i].pUsrp;
+ pUsr = pBatch->pElem[i].pMsg;
if( pBatch->pElem[i].state == BATCH_STATE_RDY
|| pBatch->pElem[i].state == BATCH_STATE_SUB) {
- localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
- (obj_t*)MsgAddRef((msg_t*) pUsr));
+ localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pUsr));
++nEnqueued;
if(localRet != RS_RET_OK) {
DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
@@ -1572,7 +1570,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
int nDiscarded;
int nDeleted;
int iQueueSize;
- void *pUsr;
+ msg_t *pUsr;
rsRetVal localRet;
DEFiRet;
@@ -1593,7 +1591,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
}
/* all well, use this element */
- pWti->batch.pElem[nDequeued].pUsrp = pUsr;
+ pWti->batch.pElem[nDequeued].pMsg = pUsr;
pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
++nDequeued;
}
@@ -1897,8 +1895,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
* rgerhards, 2009-05-28
*/
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY,
- (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
+ CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY,
+ MsgAddRef(pWti->batch.pElem[i].pMsg)));
pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
}
@@ -2002,8 +2000,9 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->qConstruct = qConstructLinkedList;
pThis->qDestruct = qDestructLinkedList;
pThis->qAdd = qAddLinkedList;
- pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
- pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->qDeq = qDeqLinkedList;
+ //pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->qDel = qDelLinkedList;
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_DISK:
@@ -2432,7 +2431,7 @@ finalize_it:
* rgerhards, 2009-06-16
*/
static inline rsRetVal
-doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr)
{
DEFiRet;
int err;
@@ -2614,7 +2613,7 @@ finalize_it:
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr)
+qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pUsr)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
@@ -2627,7 +2626,7 @@ qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr)
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr)
{
DEFiRet;
int iCancelStateSave;
diff --git a/runtime/queue.h b/runtime/queue.h
index a5a020ae..e6ccdcdb 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -51,7 +51,7 @@ typedef enum {
/* list member definition for linked list types of queues: */
typedef struct qLinkedList_S {
struct qLinkedList_S *pNext;
- void *pUsr;
+ msg_t *pMsg;
} qLinkedList_t;
@@ -111,8 +111,8 @@ struct queue_s {
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
rsRetVal (*qDestruct)(struct queue_s *pThis);
- rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr);
- rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
+ rsRetVal (*qAdd)(struct queue_s *pThis, msg_t *pMsg);
+ rsRetVal (*qDeq)(struct queue_s *pThis, msg_t **ppMsg);
rsRetVal (*qDel)(struct queue_s *pThis);
/* end type-specific handler */
/* public entry points (set during construction, permit to set best algorithm for params selected) */
@@ -186,8 +186,8 @@ struct queue_s {
/* prototypes */
rsRetVal qqueueDestruct(qqueue_t **ppThis);
-rsRetVal qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr);
-rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr);
+rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg);
+rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg);
rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 24d8279c..bc8f5234 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -194,7 +194,7 @@ processBatchMultiRuleset(batch_t *pBatch)
for(i = iStart ; i < pBatch->nElem ; ++i) {
if(batchElemGetRuleset(pBatch, i) == currRuleset) {
/* for performance reasons, we copy only those members that we actually need */
- snglRuleBatch.pElem[iNew].pUsrp = pBatch->pElem[i].pUsrp;
+ snglRuleBatch.pElem[iNew].pMsg = pBatch->pElem[i].pMsg;
snglRuleBatch.pElem[iNew].state = pBatch->pElem[i].state;
++iNew;
/* We indicate the element also as done, so it will not be processed again */
@@ -244,8 +244,8 @@ execSet(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
if( pBatch->pElem[i].state != BATCH_STATE_DISC
&& (active == NULL || active[i])) {
- cnfexprEval(stmt->d.s_set.expr, &result, pBatch->pElem[i].pUsrp);
- msgSetJSONFromVar((msg_t*)pBatch->pElem[i].pUsrp, stmt->d.s_set.varname,
+ cnfexprEval(stmt->d.s_set.expr, &result, pBatch->pElem[i].pMsg);
+ msgSetJSONFromVar(pBatch->pElem[i].pMsg, stmt->d.s_set.varname,
&result);
varDelete(&result);
}
@@ -261,7 +261,7 @@ execUnset(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
if( pBatch->pElem[i].state != BATCH_STATE_DISC
&& (active == NULL || active[i])) {
- msgUnsetJSON((msg_t*)pBatch->pElem[i].pUsrp, stmt->d.s_unset.varname);
+ msgUnsetJSON(pBatch->pElem[i].pMsg, stmt->d.s_unset.varname);
}
}
RETiRet;
@@ -303,8 +303,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
if(pBatch->pElem[i].state == BATCH_STATE_DISC)
continue; /* will be ignored in any case */
if(active == NULL || active[i]) {
- bRet = cnfexprEvalBool(stmt->d.s_if.expr,
- (msg_t*)(pBatch->pElem[i].pUsrp));
+ bRet = cnfexprEvalBool(stmt->d.s_if.expr, pBatch->pElem[i].pMsg);
} else
bRet = 0;
newAct[i] = bRet;
@@ -337,7 +336,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
if(pBatch->pElem[i].state == BATCH_STATE_DISC)
continue; /* will be ignored in any case */
- pMsg = (msg_t*)(pBatch->pElem[i].pUsrp);
+ pMsg = pBatch->pElem[i].pMsg;
if(active == NULL || active[i]) {
if( (stmt->d.s_prifilt.pmask[pMsg->iFacility] == TABLE_NOPRI) ||
((stmt->d.s_prifilt.pmask[pMsg->iFacility]
@@ -466,7 +465,7 @@ execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
if(pBatch->pElem[i].state == BATCH_STATE_DISC)
continue; /* will be ignored in any case */
if(active == NULL || active[i]) {
- bRet = evalPROPFILT(stmt, (msg_t*)(pBatch->pElem[i].pUsrp));
+ bRet = evalPROPFILT(stmt, pBatch->pElem[i].pMsg);
} else
bRet = 0;
thenAct[i] = bRet;
diff --git a/tools/syslogd.c b/tools/syslogd.c
index e347794b..a2ce6469 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -514,10 +514,10 @@ preprocessBatch(batch_t *pBatch) {
DEFiRet;
bSingleRuleset = 1;
- batchRuleset = (pBatch->nElem > 0) ? ((msg_t*) pBatch->pElem[0].pUsrp)->pRuleset : NULL;
+ batchRuleset = (pBatch->nElem > 0) ? pBatch->pElem[0].pMsg->pRuleset : NULL;
for(i = 0 ; i < pBatch->nElem && !*(pBatch->pbShutdownImmediate) ; i++) {
- pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
+ pMsg = pBatch->pElem[i].pMsg;
if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) {
DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n");
if(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP) != RS_RET_OK)
@@ -603,7 +603,7 @@ submitMsg2(msg_t *pMsg)
}
MsgPrepareEnqueue(pMsg);
- qqueueEnqObj(pQueue, pMsg->flowCtlType, (void*) pMsg);
+ qqueueEnqMsg(pQueue, pMsg->flowCtlType, pMsg);
finalize_it:
RETiRet;