diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2012-11-03 12:51:19 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2012-11-03 12:51:19 +0100 |
commit | f88d4f76f69f52918726eb91a2ce06a163cbf0c6 (patch) | |
tree | 57ba989a0c6a7966cabbfdc1643fb85f5621de5e /runtime/queue.c | |
parent | c7ac716085fab4dbc93bc26e8b8b852991a3c76a (diff) | |
download | rsyslog-f88d4f76f69f52918726eb91a2ce06a163cbf0c6.tar.gz rsyslog-f88d4f76f69f52918726eb91a2ce06a163cbf0c6.tar.bz2 rsyslog-f88d4f76f69f52918726eb91a2ce06a163cbf0c6.zip |
queue: remove unnecessary (obj_t*) redirection from msg ptrs
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 82 |
1 files changed, 40 insertions, 42 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 6e40b0c9..bdca61eb 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -312,16 +312,16 @@ getLogicalQueueSize(qqueue_t *pThis) */ static inline void queueDrain(qqueue_t *pThis) { - msg_t *pUsr; + msg_t *pMsg; ASSERT(pThis != NULL); BEGINfunc DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) { - pThis->qDeq(pThis, &pUsr); - if(pUsr != NULL) { - objDestruct(pUsr); + pThis->qDeq(pThis, &pMsg); + if(pMsg != NULL) { + msgDestruct(&pMsg); } pThis->qDel(pThis); } @@ -621,7 +621,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) RETiRet; } -static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pUsr) +static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pMsg) { qLinkedList_t *pEntry; DEFiRet; @@ -629,7 +629,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pUsr) CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t)))); pEntry->pNext = NULL; - pEntry->pMsg = pUsr; + pEntry->pMsg = pMsg; if(pThis->tVars.linklist.pDelRoot == NULL) { pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry; @@ -647,14 +647,13 @@ finalize_it: } -static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppUsr) +static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppMsg) { qLinkedList_t *pEntry; DEFiRet; pEntry = pThis->tVars.linklist.pDeqRoot; - ISOBJ_TYPE_assert(pEntry->pUsr, msg); - *ppUsr = pEntry->pMsg; + *ppMsg = pEntry->pMsg; pThis->tVars.linklist.pDeqRoot = pEntry->pNext; RETiRet; @@ -889,7 +888,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) RETiRet; } -static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr) +static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pMsg) { DEFiRet; number_t nWriteCount; @@ -897,7 +896,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr) ASSERT(pThis != NULL); CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, &nWriteCount)); - CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite)); + CHKiRet((objSerialize(pMsg))(pMsg, pThis->tVars.disk.pWrite)); CHKiRet(strm.Flush(pThis->tVars.disk.pWrite)); CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */ @@ -907,7 +906,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr) * the in-memory representation. The instance will be re-created upon * dequeue. -- rgerhards, 2008-07-09 */ - objDestruct(pUsr); + msgDestruct(&pMsg); DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n", nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly); @@ -917,10 +916,10 @@ finalize_it: } -static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppUsr) +static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; - iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); + iRet = obj.Deserialize(ppMsg, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); RETiRet; } @@ -970,7 +969,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr) +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) { batch_t singleBatch; batch_obj_t batchObj; @@ -992,7 +991,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr) memset(&batchObj, 0, sizeof(batch_obj_t)); memset(&singleBatch, 0, sizeof(batch_t)); batchObj.state = BATCH_STATE_RDY; - batchObj.pMsg = pUsr; + batchObj.pMsg = pMsg; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.active = &active; @@ -1001,7 +1000,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr) for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { free(batchObj.staticActStrings[i]); } - objDestruct(pUsr); + msgDestruct(&pMsg); RETiRet; } @@ -1044,13 +1043,13 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) * things truely different. -- rgerhards, 2008-02-12 */ static rsRetVal -qqueueAdd(qqueue_t *pThis, msg_t *pUsr) +qqueueAdd(qqueue_t *pThis, msg_t *pMsg) { DEFiRet; ASSERT(pThis != NULL); - CHKiRet(pThis->qAdd(pThis, pUsr)); + CHKiRet(pThis->qAdd(pThis, pMsg)); if(pThis->qType != QUEUETYPE_DIRECT) { ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); @@ -1066,7 +1065,7 @@ finalize_it: /* generic code to dequeue a queue entry */ static rsRetVal -qqueueDeq(qqueue_t *pThis, msg_t **ppUsr) +qqueueDeq(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; @@ -1077,7 +1076,7 @@ qqueueDeq(qqueue_t *pThis, msg_t **ppUsr) * If we decrement, however, we may lose a message. But that is better than * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ - iRet = pThis->qDeq(pThis, ppUsr); + iRet = pThis->qDeq(pThis, ppMsg); ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq); // DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", @@ -1420,7 +1419,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) * the return state! * rgerhards, 2008-01-24 */ -static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pUsr) +static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pMsg) { DEFiRet; rsRetVal iRetLocal; @@ -1429,12 +1428,12 @@ static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pUsr) ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { - iRetLocal = objGetSeverity(pUsr, &iSeverity); + iRetLocal = MsgGetSeverity(pMsg, &iSeverity); if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n", iQueueSize, iSeverity); STATSCOUNTER_INC(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg " @@ -1520,7 +1519,7 @@ static inline rsRetVal DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) { int i; - msg_t *pUsr; + msg_t *pMsg; int nEnqueued = 0; rsRetVal localRet; DEFiRet; @@ -1529,16 +1528,16 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) assert(pBatch != NULL); for(i = 0 ; i < pBatch->nElem ; ++i) { - pUsr = pBatch->pElem[i].pMsg; + pMsg = pBatch->pElem[i].pMsg; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { - localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pUsr)); + localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); ++nEnqueued; if(localRet != RS_RET_OK) { DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); } } - objDestruct(pUsr); + msgDestruct(&pMsg); } DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); @@ -1570,7 +1569,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz int nDiscarded; int nDeleted; int iQueueSize; - msg_t *pUsr; + msg_t *pMsg; rsRetVal localRet; DEFiRet; @@ -1579,10 +1578,10 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDequeued = nDiscarded = 0; while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { - CHKiRet(qqueueDeq(pThis, &pUsr)); + CHKiRet(qqueueDeq(pThis, &pMsg)); /* check if we should discard this element */ - localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr); + localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg); if(localRet == RS_RET_QUEUE_FULL) { ++nDiscarded; continue; @@ -1591,7 +1590,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz } /* all well, use this element */ - pWti->batch.pElem[nDequeued].pMsg = pUsr; + pWti->batch.pElem[nDequeued].pMsg = pMsg; pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; ++nDequeued; } @@ -2001,7 +2000,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->qDestruct = qDestructLinkedList; pThis->qAdd = qAddLinkedList; pThis->qDeq = qDeqLinkedList; - //pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; pThis->qDel = qDelLinkedList; pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; @@ -2431,7 +2429,7 @@ finalize_it: * rgerhards, 2009-06-16 */ static inline rsRetVal -doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) +doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) { DEFiRet; int err; @@ -2440,7 +2438,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) STATSCOUNTER_INC(pThis->ctrEnqueued, pThis->mutCtrEnqueued); /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ - CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg)); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. @@ -2517,7 +2515,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) if(pThis->toEnq == 0 || pThis->bEnqOnly) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n"); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting %dms to drain.\n", pThis->toEnq); @@ -2529,7 +2527,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n"); @@ -2537,7 +2535,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) } /* and finally enqueue the message */ - CHKiRet(qqueueAdd(pThis, pUsr)); + CHKiRet(qqueueAdd(pThis, pMsg)); STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize); finalize_it: @@ -2613,11 +2611,11 @@ finalize_it: * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pUsr) +qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pUsr); + iRet = qAddDirect(pThis, pMsg); RETiRet; } @@ -2626,7 +2624,7 @@ qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pUsr) * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) +qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) { DEFiRet; int iCancelStateSave; @@ -2638,7 +2636,7 @@ qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) d_pthread_mutex_lock(pThis->mut); } - CHKiRet(doEnqSingleObj(pThis, flowCtlType, pUsr)); + CHKiRet(doEnqSingleObj(pThis, flowCtlType, pMsg)); qqueueChkPersist(pThis, 1); |