diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 87 |
1 files changed, 31 insertions, 56 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 8c610b11..8fb734e7 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -81,11 +81,10 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti); -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti); +static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); -static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDestructDisk(qqueue_t *pThis); /* some constants for queuePersist () */ @@ -705,9 +704,13 @@ queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError) pThis->qType = QUEUETYPE_DIRECT; pThis->qConstruct = qConstructDirect; pThis->qDestruct = qDestructDirect; + /* these entry points shall not be used in direct mode + * To catch program errors, make us abort if that happens! + * rgerhards, 2013-11-05 + */ pThis->qAdd = qAddDirect; - pThis->qDel = qDelDirect; pThis->MultiEnq = qqueueMultiEnqObjDirect; + pThis->qDel = NULL; if(pThis->pqParent != NULL) { DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n"); pThis->pqParent->bIsDA = 0; @@ -957,13 +960,11 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) +static rsRetVal qAddDirectWithWti(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) { batch_t singleBatch; batch_obj_t batchObj; batch_state_t batchState = BATCH_STATE_RDY; - sbool active = 1; - int i; DEFiRet; //TODO: init batchObj (states _OK and new fields -- CHECK) @@ -983,46 +984,29 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.eltState = &batchState; - singleBatch.active = &active; - iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate); - /* delete the batch string params: TODO: create its own "class" for this */ - for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { - free(batchObj.staticActStrings[i]); - } + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti); msgDestruct(&pMsg); RETiRet; } -/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead - * otherwise incured. -- rgerhards, ~2010-06-23 +/* this is called if we do not have a pWti. This currently only happens + * when we are called from a main queue in direct mode. If so, we need + * to obtain a dummy pWti. */ -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti) +static rsRetVal +qAddDirect(qqueue_t *pThis, msg_t* pMsg) { + wti_t *pWti; DEFiRet; - ASSERT(pThis != NULL); - - /* calling the consumer is quite different here than it is from a worker thread */ - /* we need to provide the consumer's return value back to the caller because in direct - * mode the consumer probably has a lot to convey (which get's lost in the other modes - * because they are asynchronous. But direct mode is deliberately synchronous. - * rgerhards, 2008-02-12 - * We use our knowledge about the batch_t structure below, but without that, we - * pay a too-large performance toll... -- rgerhards, 2009-04-22 - */ - iRet = pThis->pConsumer(pThis->pAction, pBatch, pWti, NULL); - + pWti = wtiGetDummy(); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + iRet = qAddDirectWithWti(pThis, pMsg, pWti); RETiRet; } -static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) -{ - return RS_RET_OK; -} - - /* --------------- end type-specific handlers -------------------- */ @@ -1317,7 +1301,7 @@ finalize_it: * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*, int*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*)) { DEFiRet; qqueue_t *pThis; @@ -1876,7 +1860,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); - CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti, &pThis->bShutdownImmediate)); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -2098,9 +2083,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ case QUEUETYPE_DIRECT: pThis->qConstruct = qConstructDirect; pThis->qDestruct = qDestructDirect; + /* these entry points shall not be used in direct mode + * To catch program errors, make us abort if that happens! + * rgerhards, 2013-11-05 + */ pThis->qAdd = qAddDirect; - pThis->qDel = qDelDirect; pThis->MultiEnq = qqueueMultiEnqObjDirect; + pThis->qDel = NULL; break; } @@ -2679,16 +2668,17 @@ finalize_it: /* now, the same function, but for direct mode */ static rsRetVal -qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti) +qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) { int i; + wti_t *pWti; DEFiRet; - ISOBJ_TYPE_assert(pThis, qqueue); - assert(pMultiSub != NULL); + pWti = wtiGetDummy(); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; for(i = 0 ; i < pMultiSub->nElem ; ++i) { - CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i], pWti)); + CHKiRet(qAddDirectWithWti(pThis, (void*)pMultiSub->ppMsgs[i], pWti)); } finalize_it: @@ -2697,21 +2687,6 @@ finalize_it: /* ------------------------------ END multi-enqueue functions ------------------------------ */ -/* enqueue a new user data element in direct mode - * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better - * code later on (like multi submit!) 2010-06-10 - * Enqueues the new element and awakes worker thread. - */ -rsRetVal -qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti) -{ - DEFiRet; - ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pMsg, pWti); - RETiRet; -} - - /* enqueue a new user data element * Enqueues the new element and awakes worker thread. */ |