summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2012-11-03 12:51:19 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2012-11-03 12:51:19 +0100
commitf88d4f76f69f52918726eb91a2ce06a163cbf0c6 (patch)
tree57ba989a0c6a7966cabbfdc1643fb85f5621de5e /runtime/queue.c
parentc7ac716085fab4dbc93bc26e8b8b852991a3c76a (diff)
downloadrsyslog-f88d4f76f69f52918726eb91a2ce06a163cbf0c6.tar.gz
rsyslog-f88d4f76f69f52918726eb91a2ce06a163cbf0c6.tar.bz2
rsyslog-f88d4f76f69f52918726eb91a2ce06a163cbf0c6.zip
queue: remove unnecessary (obj_t*) redirection from msg ptrs
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c82
1 files changed, 40 insertions, 42 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 6e40b0c9..bdca61eb 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -312,16 +312,16 @@ getLogicalQueueSize(qqueue_t *pThis)
*/
static inline void queueDrain(qqueue_t *pThis)
{
- msg_t *pUsr;
+ msg_t *pMsg;
ASSERT(pThis != NULL);
BEGINfunc
DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) {
- pThis->qDeq(pThis, &pUsr);
- if(pUsr != NULL) {
- objDestruct(pUsr);
+ pThis->qDeq(pThis, &pMsg);
+ if(pMsg != NULL) {
+ msgDestruct(&pMsg);
}
pThis->qDel(pThis);
}
@@ -621,7 +621,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis)
RETiRet;
}
-static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pUsr)
+static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pMsg)
{
qLinkedList_t *pEntry;
DEFiRet;
@@ -629,7 +629,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pUsr)
CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t))));
pEntry->pNext = NULL;
- pEntry->pMsg = pUsr;
+ pEntry->pMsg = pMsg;
if(pThis->tVars.linklist.pDelRoot == NULL) {
pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry;
@@ -647,14 +647,13 @@ finalize_it:
}
-static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppUsr)
+static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppMsg)
{
qLinkedList_t *pEntry;
DEFiRet;
pEntry = pThis->tVars.linklist.pDeqRoot;
- ISOBJ_TYPE_assert(pEntry->pUsr, msg);
- *ppUsr = pEntry->pMsg;
+ *ppMsg = pEntry->pMsg;
pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
RETiRet;
@@ -889,7 +888,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis)
RETiRet;
}
-static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr)
+static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pMsg)
{
DEFiRet;
number_t nWriteCount;
@@ -897,7 +896,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr)
ASSERT(pThis != NULL);
CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, &nWriteCount));
- CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite));
+ CHKiRet((objSerialize(pMsg))(pMsg, pThis->tVars.disk.pWrite));
CHKiRet(strm.Flush(pThis->tVars.disk.pWrite));
CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */
@@ -907,7 +906,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr)
* the in-memory representation. The instance will be re-created upon
* dequeue. -- rgerhards, 2008-07-09
*/
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n",
nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly);
@@ -917,10 +916,10 @@ finalize_it:
}
-static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppUsr)
+static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg)
{
DEFiRet;
- iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL);
+ iRet = obj.Deserialize(ppMsg, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL);
RETiRet;
}
@@ -970,7 +969,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
-static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr)
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
{
batch_t singleBatch;
batch_obj_t batchObj;
@@ -992,7 +991,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr)
memset(&batchObj, 0, sizeof(batch_obj_t));
memset(&singleBatch, 0, sizeof(batch_t));
batchObj.state = BATCH_STATE_RDY;
- batchObj.pMsg = pUsr;
+ batchObj.pMsg = pMsg;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
singleBatch.active = &active;
@@ -1001,7 +1000,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr)
for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
free(batchObj.staticActStrings[i]);
}
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
RETiRet;
}
@@ -1044,13 +1043,13 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
* things truely different. -- rgerhards, 2008-02-12
*/
static rsRetVal
-qqueueAdd(qqueue_t *pThis, msg_t *pUsr)
+qqueueAdd(qqueue_t *pThis, msg_t *pMsg)
{
DEFiRet;
ASSERT(pThis != NULL);
- CHKiRet(pThis->qAdd(pThis, pUsr));
+ CHKiRet(pThis->qAdd(pThis, pMsg));
if(pThis->qType != QUEUETYPE_DIRECT) {
ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
@@ -1066,7 +1065,7 @@ finalize_it:
/* generic code to dequeue a queue entry
*/
static rsRetVal
-qqueueDeq(qqueue_t *pThis, msg_t **ppUsr)
+qqueueDeq(qqueue_t *pThis, msg_t **ppMsg)
{
DEFiRet;
@@ -1077,7 +1076,7 @@ qqueueDeq(qqueue_t *pThis, msg_t **ppUsr)
* If we decrement, however, we may lose a message. But that is better than
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
- iRet = pThis->qDeq(pThis, ppUsr);
+ iRet = pThis->qDeq(pThis, ppMsg);
ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq);
// DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
@@ -1420,7 +1419,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
* the return state!
* rgerhards, 2008-01-24
*/
-static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pUsr)
+static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pMsg)
{
DEFiRet;
rsRetVal iRetLocal;
@@ -1429,12 +1428,12 @@ static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pUsr)
ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) {
- iRetLocal = objGetSeverity(pUsr, &iSeverity);
+ iRetLocal = MsgGetSeverity(pMsg, &iSeverity);
if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n",
iQueueSize, iSeverity);
STATSCOUNTER_INC(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd);
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg "
@@ -1520,7 +1519,7 @@ static inline rsRetVal
DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
{
int i;
- msg_t *pUsr;
+ msg_t *pMsg;
int nEnqueued = 0;
rsRetVal localRet;
DEFiRet;
@@ -1529,16 +1528,16 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
assert(pBatch != NULL);
for(i = 0 ; i < pBatch->nElem ; ++i) {
- pUsr = pBatch->pElem[i].pMsg;
+ pMsg = pBatch->pElem[i].pMsg;
if( pBatch->pElem[i].state == BATCH_STATE_RDY
|| pBatch->pElem[i].state == BATCH_STATE_SUB) {
- localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pUsr));
+ localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
++nEnqueued;
if(localRet != RS_RET_OK) {
DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
}
}
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
}
DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued);
@@ -1570,7 +1569,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
int nDiscarded;
int nDeleted;
int iQueueSize;
- msg_t *pUsr;
+ msg_t *pMsg;
rsRetVal localRet;
DEFiRet;
@@ -1579,10 +1578,10 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
nDequeued = nDiscarded = 0;
while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
- CHKiRet(qqueueDeq(pThis, &pUsr));
+ CHKiRet(qqueueDeq(pThis, &pMsg));
/* check if we should discard this element */
- localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr);
+ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg);
if(localRet == RS_RET_QUEUE_FULL) {
++nDiscarded;
continue;
@@ -1591,7 +1590,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
}
/* all well, use this element */
- pWti->batch.pElem[nDequeued].pMsg = pUsr;
+ pWti->batch.pElem[nDequeued].pMsg = pMsg;
pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
++nDequeued;
}
@@ -2001,7 +2000,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->qDestruct = qDestructLinkedList;
pThis->qAdd = qAddLinkedList;
pThis->qDeq = qDeqLinkedList;
- //pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
pThis->qDel = qDelLinkedList;
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
@@ -2431,7 +2429,7 @@ finalize_it:
* rgerhards, 2009-06-16
*/
static inline rsRetVal
-doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr)
+doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg)
{
DEFiRet;
int err;
@@ -2440,7 +2438,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr)
STATSCOUNTER_INC(pThis->ctrEnqueued, pThis->mutCtrEnqueued);
/* first check if we need to discard this message (which will cause CHKiRet() to exit)
*/
- CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr));
+ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg));
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
@@ -2517,7 +2515,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr)
if(pThis->toEnq == 0 || pThis->bEnqOnly) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n");
STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting %dms to drain.\n", pThis->toEnq);
@@ -2529,7 +2527,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr)
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
- objDestruct(pUsr);
+ msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n");
@@ -2537,7 +2535,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr)
}
/* and finally enqueue the message */
- CHKiRet(qqueueAdd(pThis, pUsr));
+ CHKiRet(qqueueAdd(pThis, pMsg));
STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize);
finalize_it:
@@ -2613,11 +2611,11 @@ finalize_it:
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pUsr)
+qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
- iRet = qAddDirect(pThis, pUsr);
+ iRet = qAddDirect(pThis, pMsg);
RETiRet;
}
@@ -2626,7 +2624,7 @@ qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pUsr)
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr)
+qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg)
{
DEFiRet;
int iCancelStateSave;
@@ -2638,7 +2636,7 @@ qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr)
d_pthread_mutex_lock(pThis->mut);
}
- CHKiRet(doEnqSingleObj(pThis, flowCtlType, pUsr));
+ CHKiRet(doEnqSingleObj(pThis, flowCtlType, pMsg));
qqueueChkPersist(pThis, 1);