From 92ec206279e29d12d3d44e51280485d641579e41 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 7 Oct 2009 10:53:05 +0200 Subject: bugfix and testbench improvement made shutdown more reliable by makeing sure that the main queue DA worker is only cancelled if this is actually unavoidable. Also moved down the deletion of rsyslogd's pid file to immediately before termination, so that absence of the file is a proper indication that rsyslogd has finished (in the past, e.g. the testbench accidently ran two intances as the pid file was deleted too early). Also some improvments to the testbench, namely to handle aborts more intelligently (but still not perfect). --- runtime/queue.c | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index cb14b58d..96ebd6d5 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1188,7 +1188,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) if(iRetLocal == RS_RET_TIMED_OUT) { DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n"); } else { - DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down.\n"); + DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down on first try.\n"); } } @@ -1247,13 +1247,31 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA " "queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } + /* and now we need to check the DA worker itself (the one that shuffles data to the disk). This + * is necessary because we may be in a situation where the DA queue regular worker and the + * main queue worker stopped rather quickly. In this case, there is almost no time (and + * probably no thread switch!) between the point where we instructed the main queue DA + * worker to shutdown and this code location. In consequence, it may not even have + * noticed that it should should down, less acutally done this. So we provide it with a + * fixed 100ms timeout to try complete its work, what usually should be sufficient. + * rgerhards, 2009-10-06 + */ + timeoutComp(&tTimeout, 100); + DBGOPRINT((obj_t*) pThis, "last try for regular shutdown of main queue DA worker pool\n"); + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool " + "(this is not good, but probably OK)\n"); + } else { + DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down.\n"); + } } RETiRet; } -/* This function cancels all remenaing regular workers for both the main and the DA +/* This function cancels all remaining regular workers for both the main and the DA * queue. The main queue's DA worker pool continues to run (if it exists and is active). * rgerhards, 2009-05-29 */ -- cgit v1.2.3 From 5625dbd1b6cddb8b84d8a3d8c60f95eaaa49be66 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 7 Oct 2009 18:40:30 +0200 Subject: bugfix and testbench improvements - bugfix: solved potential (temporary) stall of messages when the queue was almost empty and few new data added (caused testbench to sometimes hang!) - fixed some race condition in testbench - added more elaborate diagnostics to parts of the testbench - solved a potential race inside the queue engine --- runtime/queue.c | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 96ebd6d5..101052a1 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1669,7 +1669,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) // TODO: MULTI: check physical queue size? pthread_cond_signal(&pThis->notFull); - d_pthread_mutex_unlock(pThis->mut); /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { @@ -1776,9 +1775,7 @@ RateLimiter(qqueue_t *pThis) } -/* This dequeues the next batch and checks if the queue is empty. If it is - * empty, return RS_RET_IDLE. That will trigger termination of the function - * and tell the upper layer caller to initiate idle processing. +/* This dequeues the next batch. * rgerhards, 2009-05-20 */ static inline rsRetVal @@ -1789,11 +1786,13 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); +dbgprintf("YYY: deqeueu for consumer"); CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) ABORT_FINALIZE(RS_RET_IDLE); + finalize_it: RETiRet; } @@ -1832,6 +1831,10 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueForConsumer(pThis, pWti)); + + /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ + d_pthread_mutex_unlock(pThis->mut); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); /* we now need to check if we should deliberately delay processing a bit @@ -1844,6 +1847,9 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000); } + /* now we are done, but need to re-aquire the mutex */ + d_pthread_mutex_lock(pThis->mut); + finalize_it: dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; @@ -1869,6 +1875,10 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueForConsumer(pThis, pWti)); + + /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ + d_pthread_mutex_unlock(pThis->mut); + /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs @@ -1878,6 +1888,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); } + /* now we are done, but need to re-aquire the mutex */ + d_pthread_mutex_lock(pThis->mut); + finalize_it: DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); RETiRet; @@ -2531,6 +2544,7 @@ finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { /* make sure at least one worker is running. */ qqueueAdviseMaxWorkers(pThis); +dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut); /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); -- cgit v1.2.3