diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 93 |
1 files changed, 38 insertions, 55 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 8cbce011..ff1b30f4 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -86,7 +86,6 @@ static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiS static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); -static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDestructDisk(qqueue_t *pThis); rsRetVal qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir); @@ -709,9 +708,13 @@ queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError) pThis->qType = QUEUETYPE_DIRECT; pThis->qConstruct = qConstructDirect; pThis->qDestruct = qDestructDirect; + /* these entry points shall not be used in direct mode + * To catch program errors, make us abort if that happens! + * rgerhards, 2013-11-05 + */ pThis->qAdd = qAddDirect; - pThis->qDel = qDelDirect; pThis->MultiEnq = qqueueMultiEnqObjDirect; + pThis->qDel = NULL; if(pThis->pqParent != NULL) { DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n"); pThis->pqParent->bIsDA = 0; @@ -961,13 +964,11 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) +static rsRetVal qAddDirectWithWti(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) { batch_t singleBatch; batch_obj_t batchObj; batch_state_t batchState = BATCH_STATE_RDY; - sbool active = 1; - int i; DEFiRet; //TODO: init batchObj (states _OK and new fields -- CHECK) @@ -987,46 +988,29 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.eltState = &batchState; - singleBatch.active = &active; - iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate); - /* delete the batch string params: TODO: create its own "class" for this */ - for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { - free(batchObj.staticActStrings[i]); - } + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti); msgDestruct(&pMsg); RETiRet; } -/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead - * otherwise incured. -- rgerhards, ~2010-06-23 +/* this is called if we do not have a pWti. This currently only happens + * when we are called from a main queue in direct mode. If so, we need + * to obtain a dummy pWti. */ -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) +static rsRetVal +qAddDirect(qqueue_t *pThis, msg_t* pMsg) { + wti_t *pWti; DEFiRet; - ASSERT(pThis != NULL); - - /* calling the consumer is quite different here than it is from a worker thread */ - /* we need to provide the consumer's return value back to the caller because in direct - * mode the consumer probably has a lot to convey (which get's lost in the other modes - * because they are asynchronous. But direct mode is deliberately synchronous. - * rgerhards, 2008-02-12 - * We use our knowledge about the batch_t structure below, but without that, we - * pay a too-large performance toll... -- rgerhards, 2009-04-22 - */ - iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL); - + pWti = wtiGetDummy(); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + iRet = qAddDirectWithWti(pThis, pMsg, pWti); RETiRet; } -static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) -{ - return RS_RET_OK; -} - - /* --------------- end type-specific handlers -------------------- */ @@ -1321,7 +1305,7 @@ finalize_it: * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*)) { DEFiRet; qqueue_t *pThis; @@ -1700,6 +1684,9 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) /* The rate limiter * + * IMPORTANT: the rate-limiter MUST unlock and re-lock the queue when + * it actually delays processing. Otherwise inputs are stalled. + * * Here we may wait if a dequeue time window is defined or if we are * rate-limited. TODO: If we do so, we should also look into the * way new worker threads are spawned. Obviously, it doesn't make much @@ -1785,8 +1772,10 @@ RateLimiter(qqueue_t *pThis) } if(iDelay > 0) { + pthread_mutex_unlock(pThis->mut); DBGOPRINT((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay); srSleep(iDelay, 0); + pthread_mutex_lock(pThis->mut); } RETiRet; @@ -1880,7 +1869,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); - CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate)); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -2103,9 +2093,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ case QUEUETYPE_DIRECT: pThis->qConstruct = qConstructDirect; pThis->qDestruct = qDestructDirect; + /* these entry points shall not be used in direct mode + * To catch program errors, make us abort if that happens! + * rgerhards, 2013-11-05 + */ pThis->qAdd = qAddDirect; - pThis->qDel = qDelDirect; pThis->MultiEnq = qqueueMultiEnqObjDirect; + pThis->qDel = NULL; break; } @@ -2169,9 +2163,12 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->iLowWtrMrk = 1; } } + if( pThis->iMinMsgsPerWrkr < 1 - || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) + || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) { pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads; + } + if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) { pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97; if(pThis->iFullDlyMrk == 0) { @@ -2777,13 +2774,14 @@ static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) { int i; + wti_t *pWti; DEFiRet; - ISOBJ_TYPE_assert(pThis, qqueue); - assert(pMultiSub != NULL); + pWti = wtiGetDummy(); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; for(i = 0 ; i < pMultiSub->nElem ; ++i) { - CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i])); + CHKiRet(qAddDirectWithWti(pThis, (void*)pMultiSub->ppMsgs[i], pWti)); } finalize_it: @@ -2792,22 +2790,7 @@ finalize_it: /* ------------------------------ END multi-enqueue functions ------------------------------ */ -/* enqueue a new user data element in direct mode - * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better - * code later on (like multi submit!) 2010-06-10 - * Enqueues the new element and awakes worker thread. - */ -rsRetVal -qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg) -{ - DEFiRet; - ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pMsg); - RETiRet; -} - - -/* enqueue a new user data element +/* enqueue a new user data element * Enqueues the new element and awakes worker thread. */ rsRetVal |