From ede2e4ccf225b6f02312ba7d5523dca8268acd67 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 20 Nov 2013 17:01:39 +0100 Subject: regression fix: prevent queue stall if ratelimiter is used Thanks to Pavel Levshin for alerting us. This regression was introduced roughly 2 hours ago and was never released. --- runtime/queue.c | 5 +++++ runtime/wti.c | 9 ++++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/runtime/queue.c b/runtime/queue.c index 5770eae9..0fe95876 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1679,6 +1679,9 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) /* The rate limiter + * + * IMPORTANT: the rate-limiter MUST unlock and re-lock the queue when + * it actually delays processing. Otherwise inputs are stalled. * * Here we may wait if a dequeue time window is defined or if we are * rate-limited. TODO: If we do so, we should also look into the @@ -1765,8 +1768,10 @@ RateLimiter(qqueue_t *pThis) } if(iDelay > 0) { + pthread_mutex_unlock(pThis->mut); DBGOPRINT((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay); srSleep(iDelay, 0); + pthread_mutex_lock(pThis->mut); } RETiRet; diff --git a/runtime/wti.c b/runtime/wti.c index 2c6c5123..c02d0573 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -310,7 +310,12 @@ dbgprintf("DDDD: wti %p: worker starting\n", pThis); * is done, as part of pWtp->pfDoWork() processing. Note that this * function is required to re-lock it when done. We cannot do the * lock/unlock here ourselfs, as pfDoWork() needs to access queue - * structures itself. -- rgerhards, 2013-11-20 + * structures itself. + * The same goes for pfRateLimiter(). While we could unlock/lock when + * we call it, in practice the function is often called without any + * ratelimiting actually done. Only the rate limiter itself knows + * that. As such, it needs to bear the burden of doing the locking + * when required. -- rgerhards, 2013-11-20 */ d_pthread_mutex_lock(pWtp->pmutUsr); while(1) { /* loop will be broken below */ @@ -329,8 +334,6 @@ dbgprintf("DDDD: wti %p: worker starting\n", pThis); } /* try to execute and process whatever we have */ - /* Note that this function releases and re-aquires the mutex. - */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) { -- cgit v1.2.3