summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c59
1 files changed, 29 insertions, 30 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index eaf33e00..6e40b0c9 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -74,7 +74,7 @@ DEFobjCurrIf(datetime)
DEFobjCurrIf(statsobj)
/* forward-definitions */
-static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr);
+static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg);
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
@@ -83,7 +83,7 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
-static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr);
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
@@ -312,7 +312,7 @@ getLogicalQueueSize(qqueue_t *pThis)
*/
static inline void queueDrain(qqueue_t *pThis)
{
- void *pUsr;
+ msg_t *pUsr;
ASSERT(pThis != NULL);
BEGINfunc
@@ -546,7 +546,7 @@ static rsRetVal qDestructFixedArray(qqueue_t *pThis)
}
-static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
+static rsRetVal qAddFixedArray(qqueue_t *pThis, msg_t* in)
{
DEFiRet;
@@ -560,7 +560,7 @@ static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
}
-static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out)
+static rsRetVal qDeqFixedArray(qqueue_t *pThis, msg_t **out)
{
DEFiRet;
@@ -621,7 +621,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis)
RETiRet;
}
-static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
+static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pUsr)
{
qLinkedList_t *pEntry;
DEFiRet;
@@ -629,7 +629,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t))));
pEntry->pNext = NULL;
- pEntry->pUsr = pUsr;
+ pEntry->pMsg = pUsr;
if(pThis->tVars.linklist.pDelRoot == NULL) {
pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry;
@@ -647,14 +647,14 @@ finalize_it:
}
-static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr)
+static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppUsr)
{
qLinkedList_t *pEntry;
DEFiRet;
pEntry = pThis->tVars.linklist.pDeqRoot;
ISOBJ_TYPE_assert(pEntry->pUsr, msg);
- *ppUsr = pEntry->pUsr;
+ *ppUsr = pEntry->pMsg;
pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
RETiRet;
@@ -889,7 +889,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis)
RETiRet;
}
-static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
+static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr)
{
DEFiRet;
number_t nWriteCount;
@@ -917,7 +917,7 @@ finalize_it:
}
-static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr)
+static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppUsr)
{
DEFiRet;
iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL);
@@ -970,7 +970,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
-static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr)
{
batch_t singleBatch;
batch_obj_t batchObj;
@@ -992,7 +992,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
memset(&batchObj, 0, sizeof(batch_obj_t));
memset(&singleBatch, 0, sizeof(batch_t));
batchObj.state = BATCH_STATE_RDY;
- batchObj.pUsrp = (obj_t*) pUsr;
+ batchObj.pMsg = pUsr;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
singleBatch.active = &active;
@@ -1044,7 +1044,7 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
* things truely different. -- rgerhards, 2008-02-12
*/
static rsRetVal
-qqueueAdd(qqueue_t *pThis, void *pUsr)
+qqueueAdd(qqueue_t *pThis, msg_t *pUsr)
{
DEFiRet;
@@ -1066,7 +1066,7 @@ finalize_it:
/* generic code to dequeue a queue entry
*/
static rsRetVal
-qqueueDeq(qqueue_t *pThis, void **ppUsr)
+qqueueDeq(qqueue_t *pThis, msg_t **ppUsr)
{
DEFiRet;
@@ -1420,14 +1420,13 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
* the return state!
* rgerhards, 2008-01-24
*/
-static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr)
+static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pUsr)
{
DEFiRet;
rsRetVal iRetLocal;
int iSeverity;
ISOBJ_TYPE_assert(pThis, qqueue);
- ISOBJ_assert(pUsr);
if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) {
iRetLocal = objGetSeverity(pUsr, &iSeverity);
@@ -1521,7 +1520,7 @@ static inline rsRetVal
DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
{
int i;
- void *pUsr;
+ msg_t *pUsr;
int nEnqueued = 0;
rsRetVal localRet;
DEFiRet;
@@ -1530,11 +1529,10 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
assert(pBatch != NULL);
for(i = 0 ; i < pBatch->nElem ; ++i) {
- pUsr = pBatch->pElem[i].pUsrp;
+ pUsr = pBatch->pElem[i].pMsg;
if( pBatch->pElem[i].state == BATCH_STATE_RDY
|| pBatch->pElem[i].state == BATCH_STATE_SUB) {
- localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
- (obj_t*)MsgAddRef((msg_t*) pUsr));
+ localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pUsr));
++nEnqueued;
if(localRet != RS_RET_OK) {
DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
@@ -1572,7 +1570,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
int nDiscarded;
int nDeleted;
int iQueueSize;
- void *pUsr;
+ msg_t *pUsr;
rsRetVal localRet;
DEFiRet;
@@ -1593,7 +1591,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
}
/* all well, use this element */
- pWti->batch.pElem[nDequeued].pUsrp = pUsr;
+ pWti->batch.pElem[nDequeued].pMsg = pUsr;
pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
++nDequeued;
}
@@ -1897,8 +1895,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
* rgerhards, 2009-05-28
*/
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY,
- (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
+ CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY,
+ MsgAddRef(pWti->batch.pElem[i].pMsg)));
pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
}
@@ -2002,8 +2000,9 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->qConstruct = qConstructLinkedList;
pThis->qDestruct = qDestructLinkedList;
pThis->qAdd = qAddLinkedList;
- pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
- pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->qDeq = qDeqLinkedList;
+ //pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->qDel = qDelLinkedList;
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_DISK:
@@ -2432,7 +2431,7 @@ finalize_it:
* rgerhards, 2009-06-16
*/
static inline rsRetVal
-doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr)
{
DEFiRet;
int err;
@@ -2614,7 +2613,7 @@ finalize_it:
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr)
+qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pUsr)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
@@ -2627,7 +2626,7 @@ qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr)
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr)
{
DEFiRet;
int iCancelStateSave;