summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c10
-rw-r--r--runtime/queue.c23
-rw-r--r--runtime/queue.h4
3 files changed, 18 insertions, 19 deletions
diff --git a/action.c b/action.c
index 9ea3aa65..7cc45c37 100644
--- a/action.c
+++ b/action.c
@@ -1396,7 +1396,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti)
STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
- iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg));
+ iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg), pWti);
else
iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
@@ -1617,7 +1617,7 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
* rgerhards, 2011-06-16
*/
static inline rsRetVal
-doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
+doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
{
sbool bNeedSubmit;
sbool *activeSave;
@@ -1649,14 +1649,14 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
}
if(bNeedSubmit) {
/* note: stats were already computed above */
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti);
} else {
DBGPRINTF("no need to submit batch, all invalid\n");
}
} else {
if(GatherStats)
countStatsBatchEnq(pAction, pBatch);
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti);
}
free(pBatch->active);
@@ -1678,7 +1678,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
- iRet = doQueueEnqObjDirectBatch(pAction, pBatch);
+ iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti);
} else {/* in this case, we do single submits to the queue.
* TODO: optimize this, we may do at least a multi-submit!
*/
diff --git a/runtime/queue.c b/runtime/queue.c
index 92ac2425..968c016e 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -81,8 +81,8 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
-static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
-static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
+static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti);
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
@@ -959,7 +959,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
-static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
{
batch_t singleBatch;
batch_obj_t batchObj;
@@ -986,7 +986,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
singleBatch.pElem = &batchObj;
singleBatch.eltState = &batchState;
singleBatch.active = &active;
- iRet = pThis->pConsumer(pThis->pAction, &singleBatch, NULL, &pThis->bShutdownImmediate);
+ iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate);
/* delete the batch string params: TODO: create its own "class" for this */
for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
free(batchObj.staticActStrings[i]);
@@ -999,7 +999,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead
* otherwise incured. -- rgerhards, ~2010-06-23
*/
-rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
+rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti)
{
DEFiRet;
@@ -1013,8 +1013,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
* We use our knowledge about the batch_t structure below, but without that, we
* pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
-#warning TODO: handle wti ptr!
- iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL, NULL);
+ iRet = pThis->pConsumer(pThis->pAction, pBatch, pWti, NULL);
RETiRet;
}
@@ -2676,7 +2675,7 @@ finalize_it:
/* now, the same function, but for direct mode */
static rsRetVal
-qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
+qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti)
{
int i;
DEFiRet;
@@ -2685,7 +2684,7 @@ qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
assert(pMultiSub != NULL);
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
- CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i]));
+ CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i], pWti));
}
finalize_it:
@@ -2700,16 +2699,16 @@ finalize_it:
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg)
+qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
- iRet = qAddDirect(pThis, pMsg);
+ iRet = qAddDirect(pThis, pMsg, pWti);
RETiRet;
}
-/* enqueue a new user data element
+/* enqueue a new user data element
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
diff --git a/runtime/queue.h b/runtime/queue.h
index 01b4f351..91900b30 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -195,14 +195,14 @@ struct queue_s {
/* prototypes */
rsRetVal qqueueDestruct(qqueue_t **ppThis);
-rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg);
+rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti);
rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg);
rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *, int*));
-rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
+rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti);
int queueCnfParamsSet(struct nvlst *lst);
rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst);
void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis);