summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c20
1 files changed, 12 insertions, 8 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 3c91fd02..90ffc0a0 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -131,7 +131,7 @@ static void displayBatchState(batch_t *pBatch)
{
int i;
for(i = 0 ; i < pBatch->nElem ; ++i) {
- dbgprintf("XXXXX: displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state);
+ DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state);
}
}
@@ -232,12 +232,16 @@ getQueueTypeName(queueType_t t)
switch(t) {
case QUEUETYPE_FIXED_ARRAY:
r = "FixedArray";
+ break;
case QUEUETYPE_LINKEDLIST:
r = "LinkedList";
+ break;
case QUEUETYPE_DISK:
r = "Disk";
+ break;
case QUEUETYPE_DIRECT:
r = "Direct";
+ break;
}
return r;
}
@@ -976,6 +980,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
batch_t singleBatch;
batch_obj_t batchObj;
+ sbool active = 1;
int i;
DEFiRet;
@@ -994,9 +999,9 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
memset(&singleBatch, 0, sizeof(batch_t));
batchObj.state = BATCH_STATE_RDY;
batchObj.pUsrp = (obj_t*) pUsr;
- batchObj.bFilterOK = 1;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
+ singleBatch.active = &active;
iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate);
/* delete the batch string params: TODO: create its own "class" for this */
for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
@@ -1467,7 +1472,8 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize);
ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq);
-dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+ DBGPRINTF("delete batch from store, new sizes: log %d, phys %d\n",
+ getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
++pThis->deqIDDel; /* one more batch dequeued */
RETiRet;
@@ -1503,7 +1509,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
DoDeleteBatchFromQStore(pThis, pBatch->nElem);
} else {
/* can not delete, insert into to-delete list */
- dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID);
+ DBGPRINTF("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID);
CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem));
}
@@ -1533,7 +1539,6 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
pUsr = pBatch->pElem[i].pUsrp;
if( pBatch->pElem[i].state == BATCH_STATE_RDY
|| pBatch->pElem[i].state == BATCH_STATE_SUB) {
-dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch->nElem, pBatch->pElem[i].state);
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
(obj_t*)MsgAddRef((msg_t*) pUsr));
++nEnqueued;
@@ -1544,7 +1549,7 @@ dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch
objDestruct(pUsr);
}
- dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued);
+ DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued);
if(nEnqueued > 0)
qqueueChkPersist(pThis, nEnqueued);
@@ -1596,7 +1601,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
/* all well, use this element */
pWti->batch.pElem[nDequeued].pUsrp = pUsr;
pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
- pWti->batch.pElem[nDequeued].bFilterOK = 1; // TODO: think again if we can handle that with more performance
++nDequeued;
}
@@ -2731,7 +2735,7 @@ qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals)
} else if(!strcmp(pblk.descr[i].name, "queuedequeuetimend.")) {
pThis->iDeqtWinToHr = pvals[i].val.d.n;
} else {
- dbgprintf("queue: program error, non-handled "
+ DBGPRINTF("queue: program error, non-handled "
"param '%s'\n", pblk.descr[i].name);
}
}