diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2012-11-03 12:32:50 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2012-11-03 12:32:50 +0100 |
commit | c55e0a5a06e69106bc346057dd61dcb98688a4aa (patch) | |
tree | 0b9639a89adba961eb8ac2b55854c1aba9bf15a6 /runtime/queue.c | |
parent | feddb2cc8fa213725a14e556d0366aef8d3a4339 (diff) | |
download | rsyslog-c55e0a5a06e69106bc346057dd61dcb98688a4aa.tar.gz rsyslog-c55e0a5a06e69106bc346057dd61dcb98688a4aa.tar.bz2 rsyslog-c55e0a5a06e69106bc346057dd61dcb98688a4aa.zip |
queue: change generic msg ptr (pUsr) to be of msg_t type
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 59 |
1 files changed, 29 insertions, 30 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index eaf33e00..6e40b0c9 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -74,7 +74,7 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(statsobj) /* forward-definitions */ -static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr); +static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg); static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); @@ -83,7 +83,7 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr); +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); @@ -312,7 +312,7 @@ getLogicalQueueSize(qqueue_t *pThis) */ static inline void queueDrain(qqueue_t *pThis) { - void *pUsr; + msg_t *pUsr; ASSERT(pThis != NULL); BEGINfunc @@ -546,7 +546,7 @@ static rsRetVal qDestructFixedArray(qqueue_t *pThis) } -static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) +static rsRetVal qAddFixedArray(qqueue_t *pThis, msg_t* in) { DEFiRet; @@ -560,7 +560,7 @@ static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) } -static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out) +static rsRetVal qDeqFixedArray(qqueue_t *pThis, msg_t **out) { DEFiRet; @@ -621,7 +621,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) RETiRet; } -static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pUsr) { qLinkedList_t *pEntry; DEFiRet; @@ -629,7 +629,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t)))); pEntry->pNext = NULL; - pEntry->pUsr = pUsr; + pEntry->pMsg = pUsr; if(pThis->tVars.linklist.pDelRoot == NULL) { pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry; @@ -647,14 +647,14 @@ finalize_it: } -static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr) +static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppUsr) { qLinkedList_t *pEntry; DEFiRet; pEntry = pThis->tVars.linklist.pDeqRoot; ISOBJ_TYPE_assert(pEntry->pUsr, msg); - *ppUsr = pEntry->pUsr; + *ppUsr = pEntry->pMsg; pThis->tVars.linklist.pDeqRoot = pEntry->pNext; RETiRet; @@ -889,7 +889,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) RETiRet; } -static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr) { DEFiRet; number_t nWriteCount; @@ -917,7 +917,7 @@ finalize_it: } -static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr) +static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppUsr) { DEFiRet; iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); @@ -970,7 +970,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr) { batch_t singleBatch; batch_obj_t batchObj; @@ -992,7 +992,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) memset(&batchObj, 0, sizeof(batch_obj_t)); memset(&singleBatch, 0, sizeof(batch_t)); batchObj.state = BATCH_STATE_RDY; - batchObj.pUsrp = (obj_t*) pUsr; + batchObj.pMsg = pUsr; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.active = &active; @@ -1044,7 +1044,7 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) * things truely different. -- rgerhards, 2008-02-12 */ static rsRetVal -qqueueAdd(qqueue_t *pThis, void *pUsr) +qqueueAdd(qqueue_t *pThis, msg_t *pUsr) { DEFiRet; @@ -1066,7 +1066,7 @@ finalize_it: /* generic code to dequeue a queue entry */ static rsRetVal -qqueueDeq(qqueue_t *pThis, void **ppUsr) +qqueueDeq(qqueue_t *pThis, msg_t **ppUsr) { DEFiRet; @@ -1420,14 +1420,13 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) * the return state! * rgerhards, 2008-01-24 */ -static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr) +static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pUsr) { DEFiRet; rsRetVal iRetLocal; int iSeverity; ISOBJ_TYPE_assert(pThis, qqueue); - ISOBJ_assert(pUsr); if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { iRetLocal = objGetSeverity(pUsr, &iSeverity); @@ -1521,7 +1520,7 @@ static inline rsRetVal DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) { int i; - void *pUsr; + msg_t *pUsr; int nEnqueued = 0; rsRetVal localRet; DEFiRet; @@ -1530,11 +1529,10 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) assert(pBatch != NULL); for(i = 0 ; i < pBatch->nElem ; ++i) { - pUsr = pBatch->pElem[i].pUsrp; + pUsr = pBatch->pElem[i].pMsg; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { - localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, - (obj_t*)MsgAddRef((msg_t*) pUsr)); + localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pUsr)); ++nEnqueued; if(localRet != RS_RET_OK) { DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); @@ -1572,7 +1570,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz int nDiscarded; int nDeleted; int iQueueSize; - void *pUsr; + msg_t *pUsr; rsRetVal localRet; DEFiRet; @@ -1593,7 +1591,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz } /* all well, use this element */ - pWti->batch.pElem[nDequeued].pUsrp = pUsr; + pWti->batch.pElem[nDequeued].pMsg = pUsr; pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; ++nDequeued; } @@ -1897,8 +1895,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 */ - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, - (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); + CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, + MsgAddRef(pWti->batch.pElem[i].pMsg))); pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ } @@ -2002,8 +2000,9 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->qConstruct = qConstructLinkedList; pThis->qDestruct = qDestructLinkedList; pThis->qAdd = qAddLinkedList; - pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; - pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->qDeq = qDeqLinkedList; + //pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->qDel = qDelLinkedList; pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_DISK: @@ -2432,7 +2431,7 @@ finalize_it: * rgerhards, 2009-06-16 */ static inline rsRetVal -doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) +doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) { DEFiRet; int err; @@ -2614,7 +2613,7 @@ finalize_it: * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr) +qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pUsr) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); @@ -2627,7 +2626,7 @@ qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr) * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) +qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) { DEFiRet; int iCancelStateSave; |