diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 99fb5fbd..6df1c95e 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -130,7 +130,7 @@ static inline void displayBatchState(batch_t *pBatch) { int i; for(i = 0 ; i < pBatch->nElem ; ++i) { - DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state); + DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->eltState[i]); } } @@ -941,6 +941,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) { batch_t singleBatch; batch_obj_t batchObj; + batch_state_t batchState = BATCH_STATE_RDY; sbool active = 1; int i; DEFiRet; @@ -958,10 +959,10 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) */ memset(&batchObj, 0, sizeof(batch_obj_t)); memset(&singleBatch, 0, sizeof(batch_t)); - batchObj.state = BATCH_STATE_RDY; batchObj.pMsg = pMsg; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; + singleBatch.eltState = &batchState; singleBatch.active = &active; iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate); /* delete the batch string params: TODO: create its own "class" for this */ @@ -1546,8 +1547,8 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) for(i = 0 ; i < pBatch->nElem ; ++i) { pMsg = pBatch->pElem[i].pMsg; - if( pBatch->pElem[i].state == BATCH_STATE_RDY - || pBatch->pElem[i].state == BATCH_STATE_SUB) { + if( pBatch->eltState[i] == BATCH_STATE_RDY + || pBatch->eltState[i] == BATCH_STATE_SUB) { localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); ++nEnqueued; if(localRet != RS_RET_OK) { @@ -1611,7 +1612,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz /* all well, use this element */ pWti->batch.pElem[nDequeued].pMsg = pMsg; - pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; + pWti->batch.eltState[nDequeued] = BATCH_STATE_RDY; ++nDequeued; } @@ -1915,7 +1916,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, MsgAddRef(pWti->batch.pElem[i].pMsg))); - pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ + pWti->batch.eltState[i] = BATCH_STATE_COMM; /* commited to other queue! */ } /* but now cancellation is no longer permitted */ |