diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 38 |
1 files changed, 22 insertions, 16 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 2b017747..34935403 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -131,7 +131,7 @@ static void displayBatchState(batch_t *pBatch) { int i; for(i = 0 ; i < pBatch->nElem ; ++i) { - dbgprintf("XXXXX: displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state); + DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state); } } @@ -1418,7 +1418,8 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize); ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq); -dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + DBGPRINTF("delete batch from store, new sizes: log %d, phys %d\n", + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ RETiRet; @@ -1454,7 +1455,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) DoDeleteBatchFromQStore(pThis, pBatch->nElem); } else { /* can not delete, insert into to-delete list */ - dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); + DBGPRINTF("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem)); } @@ -1484,7 +1485,6 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) pUsr = pBatch->pElem[i].pUsrp; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { -dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch->nElem, pBatch->pElem[i].state); localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*) pUsr)); ++nEnqueued; @@ -1495,7 +1495,7 @@ dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch objDestruct(pUsr); } - dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); + DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); if(nEnqueued > 0) qqueueChkPersist(pThis, nEnqueued); @@ -2462,21 +2462,27 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0 && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); - if(glbl.GetGlobalInputTermState()) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL, discard due to FORCE_TERM.\n"); - ABORT_FINALIZE(RS_RET_FORCE_TERM); - } - timeoutComp(&t, pThis->toEnq); STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull); -// TODO : handle enqOnly => discard! - if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); + if(pThis->toEnq == 0 || pThis->bEnqOnly) { + DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n"); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); - } + } else { + DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting %dms to drain.\n", pThis->toEnq); + if(glbl.GetGlobalInputTermState()) { + DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL, discard due to FORCE_TERM.\n"); + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } + timeoutComp(&t, pThis->toEnq); + if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { + DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); + STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); + objDestruct(pUsr); + ABORT_FINALIZE(RS_RET_QUEUE_FULL); + } dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n"); + } } /* and finally enqueue the message */ @@ -2674,7 +2680,7 @@ qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals) } else if(!strcmp(pblk.descr[i].name, "queuedequeuetimend.")) { pThis->iDeqtWinToHr = pvals[i].val.d.n; } else { - dbgprintf("queue: program error, non-handled " + DBGPRINTF("queue: program error, non-handled " "param '%s'\n", pblk.descr[i].name); } } |