summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c34
1 files changed, 20 insertions, 14 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index b4f00446..d9942365 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1070,6 +1070,11 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout");
/* instruct workers to finish ASAP, even if still work exists */
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
+ /* now DA queue */
+ if(pThis->bIsDA) {
+ pThis->pqDA->bEnqOnly = 1;
+ pThis->pqDA->bShutdownImmediate = 1;
+ }
// TODO: make sure we have at minimum a 10ms timeout - workers deserve a chance...
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
@@ -1356,14 +1361,10 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue
/* remove messages from the physical queue store that are fully processed. This is
- * controlled via the to-delete list. We can only delete those elements, that are
- * at the current physical tail of the queue. If the batch is from another position,
- * we schedule it for deletion, but actual deletion will happen at a later call
- * of this function here. We always delete as much as possible, which includes
- * picking up things from the to-delete list.
+ * controlled via the to-delete list.
*/
static inline rsRetVal
-DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
+DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
{
toDeleteLst_t *pTdl;
qDeqID deqIDDel;
@@ -1371,11 +1372,10 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
- assert(nDeleted > 0);
pTdl = tdlPeek(pThis); /* get current head element */
if(pTdl == NULL) { /* to-delete list empty */
- DoDeleteBatchFromQStore(pThis, nDeleted);
+ DoDeleteBatchFromQStore(pThis, pBatch->nElem);
} else if(pBatch->deqID == pThis->deqIDDel) {
deqIDDel = pThis->deqIDDel;
pTdl = tdlPeek(pThis);
@@ -1385,10 +1385,12 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
++deqIDDel;
pTdl = tdlPeek(pThis);
}
+ /* old entries deleted, now delete current ones... */
+ DoDeleteBatchFromQStore(pThis, pBatch->nElem);
} else {
/* can not delete, insert into to-delete list */
dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID);
- CHKiRet(tdlAdd(pThis, pBatch->deqID, nDeleted));
+ CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem));
}
finalize_it:
@@ -1408,20 +1410,23 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
void *pUsr;
int nEnqueued = 0;
rsRetVal localRet;
+ int iCancelStateSave;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state);
for(i = 0 ; i < pBatch->nElem ; ++i) {
-dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state);
+dbgprintf("XXX: deleteProcessedBatch delete entry %d, ptr %p, refcnt %d with state %d\n",
+i, pBatch->pElem[i].pUsrp, ((msg_t*)pBatch->pElem[i].pUsrp)->iRefCount, pBatch->pElem[i].state);
pUsr = pBatch->pElem[i].pUsrp;
if( pBatch->pElem[i].state == BATCH_STATE_RDY
|| pBatch->pElem[i].state == BATCH_STATE_SUB) {
-RUNLOG_STR("we need to requeue the entry");
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
(obj_t*)MsgAddRef((msg_t*) pUsr));
+dbgprintf("we need to requeue the entry, refcnt now %d\n", ((msg_t*) pUsr)->iRefCount);
++nEnqueued;
if(localRet != RS_RET_OK) {
DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
@@ -1435,11 +1440,11 @@ dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnque
if(nEnqueued > 0)
qqueueChkPersist(pThis, nEnqueued);
- if(i > 0)
- iRet = DeleteBatchFromQStore(pThis, pBatch, i);
+ iRet = DeleteBatchFromQStore(pThis, pBatch);
pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
+ pthread_setcancelstate(iCancelStateSave, NULL);
RETiRet;
}
@@ -2072,7 +2077,8 @@ CODESTARTobjDestruct(qqueue)
* we need to reset the logical dequeue pointer, persist the queue if configured to do
* so and then destruct everything. -- rgerhards, 2009-05-26
*/
- CHKiRet(pThis->qUnDeqAll(pThis));
+RUNLOG_STR("XXX: NOT undequeueing entries!");
+ //CHKiRet(pThis->qUnDeqAll(pThis));
if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
CHKiRet(DoSaveOnShutdown(pThis));