summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c13
1 files changed, 7 insertions, 6 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 99fb5fbd..6df1c95e 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -130,7 +130,7 @@ static inline void displayBatchState(batch_t *pBatch)
{
int i;
for(i = 0 ; i < pBatch->nElem ; ++i) {
- DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state);
+ DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->eltState[i]);
}
}
@@ -941,6 +941,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
{
batch_t singleBatch;
batch_obj_t batchObj;
+ batch_state_t batchState = BATCH_STATE_RDY;
sbool active = 1;
int i;
DEFiRet;
@@ -958,10 +959,10 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
*/
memset(&batchObj, 0, sizeof(batch_obj_t));
memset(&singleBatch, 0, sizeof(batch_t));
- batchObj.state = BATCH_STATE_RDY;
batchObj.pMsg = pMsg;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
+ singleBatch.eltState = &batchState;
singleBatch.active = &active;
iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate);
/* delete the batch string params: TODO: create its own "class" for this */
@@ -1546,8 +1547,8 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
for(i = 0 ; i < pBatch->nElem ; ++i) {
pMsg = pBatch->pElem[i].pMsg;
- if( pBatch->pElem[i].state == BATCH_STATE_RDY
- || pBatch->pElem[i].state == BATCH_STATE_SUB) {
+ if( pBatch->eltState[i] == BATCH_STATE_RDY
+ || pBatch->eltState[i] == BATCH_STATE_SUB) {
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
++nEnqueued;
if(localRet != RS_RET_OK) {
@@ -1611,7 +1612,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
/* all well, use this element */
pWti->batch.pElem[nDequeued].pMsg = pMsg;
- pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
+ pWti->batch.eltState[nDequeued] = BATCH_STATE_RDY;
++nDequeued;
}
@@ -1915,7 +1916,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY,
MsgAddRef(pWti->batch.pElem[i].pMsg)));
- pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
+ pWti->batch.eltState[i] = BATCH_STATE_COMM; /* commited to other queue! */
}
/* but now cancellation is no longer permitted */