summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c51
1 files changed, 33 insertions, 18 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 5d9296b1..a04e4f4d 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -81,8 +81,8 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
-static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti);
-static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti);
+static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
@@ -708,13 +708,12 @@ queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError)
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
/* these entry points shall not be used in direct mode
- To catch program errors, make us abort if that happens!
- TODO: currently main q in direct mode WILL abort!
- rgerhards, 2013-11-05
+ * To catch program errors, make us abort if that happens!
+ * rgerhards, 2013-11-05
*/
- pThis->qAdd = NULL;
+ pThis->qAdd = qAddDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
pThis->qDel = NULL;
- pThis->MultiEnq = NULL;
if(pThis->pqParent != NULL) {
DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n");
pThis->pqParent->bIsDA = 0;
@@ -964,7 +963,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
-static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
+static rsRetVal qAddDirectWithWti(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
{
batch_t singleBatch;
batch_obj_t batchObj;
@@ -994,6 +993,22 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
RETiRet;
}
+/* this is called if we do not have a pWti. This currently only happens
+ * when we are called from a main queue in direct mode. If so, we need
+ * to obtain a dummy pWti.
+ */
+static rsRetVal
+qAddDirect(qqueue_t *pThis, msg_t* pMsg)
+{
+ wti_t *pWti;
+ DEFiRet;
+
+ pWti = wtiGetDummy();
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
+ iRet = qAddDirectWithWti(pThis, pMsg, pWti);
+ RETiRet;
+}
+
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
{
return RS_RET_OK;
@@ -2077,13 +2092,12 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
/* these entry points shall not be used in direct mode
- To catch program errors, make us abort if that happens!
- TODO: currently main q in direct mode WILL abort!
- rgerhards, 2013-11-05
+ * To catch program errors, make us abort if that happens!
+ * rgerhards, 2013-11-05
*/
- pThis->qAdd = NULL;
+ pThis->qAdd = qAddDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
pThis->qDel = NULL;
- pThis->MultiEnq = NULL;
break;
}
@@ -2656,16 +2670,17 @@ finalize_it:
/* now, the same function, but for direct mode */
static rsRetVal
-qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti)
+qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
{
int i;
+ wti_t *pWti;
DEFiRet;
- ISOBJ_TYPE_assert(pThis, qqueue);
- assert(pMultiSub != NULL);
+ pWti = wtiGetDummy();
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
- CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i], pWti));
+ CHKiRet(qAddDirectWithWti(pThis, (void*)pMultiSub->ppMsgs[i], pWti));
}
finalize_it:
@@ -2684,7 +2699,7 @@ qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
- iRet = qAddDirect(pThis, pMsg, pWti);
+ iRet = qAddDirectWithWti(pThis, pMsg, pWti);
RETiRet;
}