summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/queue.c29
-rw-r--r--runtime/queue.h7
2 files changed, 23 insertions, 13 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 952edb0f..5d9296b1 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -707,9 +707,14 @@ queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError)
pThis->qType = QUEUETYPE_DIRECT;
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
- pThis->qAdd = qAddDirect;
- pThis->qDel = qDelDirect;
- pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ /* these entry points shall not be used in direct mode
+ To catch program errors, make us abort if that happens!
+ TODO: currently main q in direct mode WILL abort!
+ rgerhards, 2013-11-05
+ */
+ pThis->qAdd = NULL;
+ pThis->qDel = NULL;
+ pThis->MultiEnq = NULL;
if(pThis->pqParent != NULL) {
DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n");
pThis->pqParent->bIsDA = 0;
@@ -983,7 +988,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
singleBatch.eltState = &batchState;
- iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate);
+ iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti);
msgDestruct(&pMsg);
RETiRet;
@@ -1289,7 +1294,7 @@ finalize_it:
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*, int*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1848,7 +1853,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
- CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti, &pThis->bShutdownImmediate));
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
+ CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -2070,9 +2076,14 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
case QUEUETYPE_DIRECT:
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
- pThis->qAdd = qAddDirect;
- pThis->qDel = qDelDirect;
- pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ /* these entry points shall not be used in direct mode
+ To catch program errors, make us abort if that happens!
+ TODO: currently main q in direct mode WILL abort!
+ rgerhards, 2013-11-05
+ */
+ pThis->qAdd = NULL;
+ pThis->qDel = NULL;
+ pThis->MultiEnq = NULL;
break;
}
diff --git a/runtime/queue.h b/runtime/queue.h
index 45eac45d..e5eef168 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -103,11 +103,10 @@ struct queue_s {
* the user really wanted...). -- rgerhards, 2008-04-02
*/
/* end dequeue time window */
- rsRetVal (*pConsumer)(void *,batch_t*, wti_t*,int*); /* user-supplied consumer function for dequeued messages */
+ rsRetVal (*pConsumer)(void *,batch_t*, wti_t*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
* user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
- * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero
- * during normal operations and one if the consumer must urgently shut down.
+ * is pointer to an array of message message pointers)
*/
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
@@ -201,7 +200,7 @@ rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *, int*));
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *));
int queueCnfParamsSet(struct nvlst *lst);
rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst);
void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis);