diff options
-rw-r--r-- | runtime/queue.c | 5 | ||||
-rw-r--r-- | runtime/wti.c | 9 |
2 files changed, 11 insertions, 3 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 5770eae9..0fe95876 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1680,6 +1680,9 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) /* The rate limiter * + * IMPORTANT: the rate-limiter MUST unlock and re-lock the queue when + * it actually delays processing. Otherwise inputs are stalled. + * * Here we may wait if a dequeue time window is defined or if we are * rate-limited. TODO: If we do so, we should also look into the * way new worker threads are spawned. Obviously, it doesn't make much @@ -1765,8 +1768,10 @@ RateLimiter(qqueue_t *pThis) } if(iDelay > 0) { + pthread_mutex_unlock(pThis->mut); DBGOPRINT((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay); srSleep(iDelay, 0); + pthread_mutex_lock(pThis->mut); } RETiRet; diff --git a/runtime/wti.c b/runtime/wti.c index 2c6c5123..c02d0573 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -310,7 +310,12 @@ dbgprintf("DDDD: wti %p: worker starting\n", pThis); * is done, as part of pWtp->pfDoWork() processing. Note that this * function is required to re-lock it when done. We cannot do the * lock/unlock here ourselfs, as pfDoWork() needs to access queue - * structures itself. -- rgerhards, 2013-11-20 + * structures itself. + * The same goes for pfRateLimiter(). While we could unlock/lock when + * we call it, in practice the function is often called without any + * ratelimiting actually done. Only the rate limiter itself knows + * that. As such, it needs to bear the burden of doing the locking + * when required. -- rgerhards, 2013-11-20 */ d_pthread_mutex_lock(pWtp->pmutUsr); while(1) { /* loop will be broken below */ @@ -329,8 +334,6 @@ dbgprintf("DDDD: wti %p: worker starting\n", pThis); } /* try to execute and process whatever we have */ - /* Note that this function releases and re-aquires the mutex. - */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) { |