summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--runtime/queue.c24
1 files changed, 12 insertions, 12 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 699e2a66..728eb133 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1043,7 +1043,7 @@ qqueueAdd(qqueue_t *pThis, msg_t *pMsg)
if(pThis->qType != QUEUETYPE_DIRECT) {
ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
- DBGOPRINT((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n",
+ DBGOPRINT((obj_t*) pThis, "qqueueAdd: entry added, size now log %d, phys %d entries\n",
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
}
@@ -1488,7 +1488,7 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
*/
if(bytesDel != 0) {
pThis->tVars.disk.sizeOnDisk -= bytesDel;
- DBGOPRINT((obj_t*) pThis, "a %lld octet file has been deleted, now %lld octets disk "
+ DBGOPRINT((obj_t*) pThis, "doDeleteBatch: a %lld octet file has been deleted, now %lld octets disk "
"space used\n", bytesDel, pThis->tVars.disk.sizeOnDisk);
/* awake possibly waiting enq process */
pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
@@ -1502,7 +1502,7 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize);
ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq);
- DBGPRINTF("delete batch from store, new sizes: log %d, phys %d\n",
+ DBGPRINTF("doDeleteBatch: delete batch from store, new sizes: log %d, phys %d\n",
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
++pThis->deqIDDel; /* one more batch dequeued */
@@ -1572,13 +1572,13 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
++nEnqueued;
if(localRet != RS_RET_OK) {
- DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
+ DBGPRINTF("DeleteProcessedBatch: error %d re-enqueuing unprocessed data element - discarded\n", localRet);
}
}
msgDestruct(&pMsg);
}
- DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued);
+ DBGPRINTF("DeleteProcessedBatch: we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued);
if(nEnqueued > 0)
qqueueChkPersist(pThis, nEnqueued);
@@ -2520,7 +2520,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg)
* In any case, this was the old code (if we do the TODO):
* pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut);
*/
- DBGOPRINT((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message "
+ DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: FullDelay mark reached for full delayable message "
"- blocking, queue size is %d.\n", pThis->iQueueSize);
timeoutComp(&t, 1000);
err = pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t);
@@ -2537,7 +2537,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg)
}
} else if(flowCtlType == eFLOWCTL_LIGHT_DELAY && !glbl.GetGlobalInputTermState()) {
if(pThis->iQueueSize >= pThis->iLightDlyMrk) {
- DBGOPRINT((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light "
+ DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: LightDelay mark reached for light "
"delayable message - blocking a bit.\n");
timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */
err = pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t);
@@ -2560,24 +2560,24 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg)
&& pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull);
if(pThis->toEnq == 0 || pThis->bEnqOnly) {
- DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n");
+ DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL - configured for immediate discarding QueueSize=%d MaxQueueSize=%d sizeOnDisk=%d sizeOnDiskMax=%d\n", pThis->iQueueSize, pThis->iMaxQueueSize, pThis->tVars.disk.sizeOnDisk, pThis->sizeOnDiskMax);
STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
- DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting %dms to drain.\n", pThis->toEnq);
+ DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL - waiting %dms to drain.\n", pThis->toEnq);
if(glbl.GetGlobalInputTermState()) {
- DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL, discard due to FORCE_TERM.\n");
+ DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL, discard due to FORCE_TERM.\n");
ABORT_FINALIZE(RS_RET_FORCE_TERM);
}
timeoutComp(&t, pThis->toEnq);
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
- DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
+ DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: cond timeout, dropping message!\n");
STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
- dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n");
+ dbgoprint((obj_t*) pThis, "doEnqSingleObject: wait solved queue full condition, enqueing\n");
}
}