summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c87
1 files changed, 31 insertions, 56 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 8c610b11..8fb734e7 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -81,11 +81,10 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
-static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti);
-static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti);
+static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
-static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDestructDisk(qqueue_t *pThis);
/* some constants for queuePersist () */
@@ -705,9 +704,13 @@ queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError)
pThis->qType = QUEUETYPE_DIRECT;
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
+ /* these entry points shall not be used in direct mode
+ * To catch program errors, make us abort if that happens!
+ * rgerhards, 2013-11-05
+ */
pThis->qAdd = qAddDirect;
- pThis->qDel = qDelDirect;
pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ pThis->qDel = NULL;
if(pThis->pqParent != NULL) {
DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n");
pThis->pqParent->bIsDA = 0;
@@ -957,13 +960,11 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
-static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
+static rsRetVal qAddDirectWithWti(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
{
batch_t singleBatch;
batch_obj_t batchObj;
batch_state_t batchState = BATCH_STATE_RDY;
- sbool active = 1;
- int i;
DEFiRet;
//TODO: init batchObj (states _OK and new fields -- CHECK)
@@ -983,46 +984,29 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
singleBatch.eltState = &batchState;
- singleBatch.active = &active;
- iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate);
- /* delete the batch string params: TODO: create its own "class" for this */
- for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
- free(batchObj.staticActStrings[i]);
- }
+ iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti);
msgDestruct(&pMsg);
RETiRet;
}
-/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead
- * otherwise incured. -- rgerhards, ~2010-06-23
+/* this is called if we do not have a pWti. This currently only happens
+ * when we are called from a main queue in direct mode. If so, we need
+ * to obtain a dummy pWti.
*/
-rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti)
+static rsRetVal
+qAddDirect(qqueue_t *pThis, msg_t* pMsg)
{
+ wti_t *pWti;
DEFiRet;
- ASSERT(pThis != NULL);
-
- /* calling the consumer is quite different here than it is from a worker thread */
- /* we need to provide the consumer's return value back to the caller because in direct
- * mode the consumer probably has a lot to convey (which get's lost in the other modes
- * because they are asynchronous. But direct mode is deliberately synchronous.
- * rgerhards, 2008-02-12
- * We use our knowledge about the batch_t structure below, but without that, we
- * pay a too-large performance toll... -- rgerhards, 2009-04-22
- */
- iRet = pThis->pConsumer(pThis->pAction, pBatch, pWti, NULL);
-
+ pWti = wtiGetDummy();
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
+ iRet = qAddDirectWithWti(pThis, pMsg, pWti);
RETiRet;
}
-static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
-{
- return RS_RET_OK;
-}
-
-
/* --------------- end type-specific handlers -------------------- */
@@ -1317,7 +1301,7 @@ finalize_it:
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*, int*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1876,7 +1860,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
- CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti, &pThis->bShutdownImmediate));
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
+ CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -2098,9 +2083,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
case QUEUETYPE_DIRECT:
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
+ /* these entry points shall not be used in direct mode
+ * To catch program errors, make us abort if that happens!
+ * rgerhards, 2013-11-05
+ */
pThis->qAdd = qAddDirect;
- pThis->qDel = qDelDirect;
pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ pThis->qDel = NULL;
break;
}
@@ -2679,16 +2668,17 @@ finalize_it:
/* now, the same function, but for direct mode */
static rsRetVal
-qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti)
+qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
{
int i;
+ wti_t *pWti;
DEFiRet;
- ISOBJ_TYPE_assert(pThis, qqueue);
- assert(pMultiSub != NULL);
+ pWti = wtiGetDummy();
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
- CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i], pWti));
+ CHKiRet(qAddDirectWithWti(pThis, (void*)pMultiSub->ppMsgs[i], pWti));
}
finalize_it:
@@ -2697,21 +2687,6 @@ finalize_it:
/* ------------------------------ END multi-enqueue functions ------------------------------ */
-/* enqueue a new user data element in direct mode
- * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better
- * code later on (like multi submit!) 2010-06-10
- * Enqueues the new element and awakes worker thread.
- */
-rsRetVal
-qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti)
-{
- DEFiRet;
- ISOBJ_TYPE_assert(pThis, qqueue);
- iRet = qAddDirect(pThis, pMsg, pWti);
- RETiRet;
-}
-
-
/* enqueue a new user data element
* Enqueues the new element and awakes worker thread.
*/