diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 34 |
1 files changed, 20 insertions, 14 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index b4f00446..d9942365 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1070,6 +1070,11 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout"); /* instruct workers to finish ASAP, even if still work exists */ pThis->bEnqOnly = 1; pThis->bShutdownImmediate = 1; + /* now DA queue */ + if(pThis->bIsDA) { + pThis->pqDA->bEnqOnly = 1; + pThis->pqDA->bShutdownImmediate = 1; + } // TODO: make sure we have at minimum a 10ms timeout - workers deserve a chance... /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ @@ -1356,14 +1361,10 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue /* remove messages from the physical queue store that are fully processed. This is - * controlled via the to-delete list. We can only delete those elements, that are - * at the current physical tail of the queue. If the batch is from another position, - * we schedule it for deletion, but actual deletion will happen at a later call - * of this function here. We always delete as much as possible, which includes - * picking up things from the to-delete list. + * controlled via the to-delete list. */ static inline rsRetVal -DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted) +DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) { toDeleteLst_t *pTdl; qDeqID deqIDDel; @@ -1371,11 +1372,10 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted) ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); - assert(nDeleted > 0); pTdl = tdlPeek(pThis); /* get current head element */ if(pTdl == NULL) { /* to-delete list empty */ - DoDeleteBatchFromQStore(pThis, nDeleted); + DoDeleteBatchFromQStore(pThis, pBatch->nElem); } else if(pBatch->deqID == pThis->deqIDDel) { deqIDDel = pThis->deqIDDel; pTdl = tdlPeek(pThis); @@ -1385,10 +1385,12 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted) ++deqIDDel; pTdl = tdlPeek(pThis); } + /* old entries deleted, now delete current ones... */ + DoDeleteBatchFromQStore(pThis, pBatch->nElem); } else { /* can not delete, insert into to-delete list */ dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); - CHKiRet(tdlAdd(pThis, pBatch->deqID, nDeleted)); + CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem)); } finalize_it: @@ -1408,20 +1410,23 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) void *pUsr; int nEnqueued = 0; rsRetVal localRet; + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state); for(i = 0 ; i < pBatch->nElem ; ++i) { -dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state); +dbgprintf("XXX: deleteProcessedBatch delete entry %d, ptr %p, refcnt %d with state %d\n", +i, pBatch->pElem[i].pUsrp, ((msg_t*)pBatch->pElem[i].pUsrp)->iRefCount, pBatch->pElem[i].state); pUsr = pBatch->pElem[i].pUsrp; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { -RUNLOG_STR("we need to requeue the entry"); localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*) pUsr)); +dbgprintf("we need to requeue the entry, refcnt now %d\n", ((msg_t*) pUsr)->iRefCount); ++nEnqueued; if(localRet != RS_RET_OK) { DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); @@ -1435,11 +1440,11 @@ dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnque if(nEnqueued > 0) qqueueChkPersist(pThis, nEnqueued); - if(i > 0) - iRet = DeleteBatchFromQStore(pThis, pBatch, i); + iRet = DeleteBatchFromQStore(pThis, pBatch); pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ + pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } @@ -2072,7 +2077,8 @@ CODESTARTobjDestruct(qqueue) * we need to reset the logical dequeue pointer, persist the queue if configured to do * so and then destruct everything. -- rgerhards, 2009-05-26 */ - CHKiRet(pThis->qUnDeqAll(pThis)); +RUNLOG_STR("XXX: NOT undequeueing entries!"); + //CHKiRet(pThis->qUnDeqAll(pThis)); if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { CHKiRet(DoSaveOnShutdown(pThis)); |