diff options
-rw-r--r-- | action.c | 10 | ||||
-rw-r--r-- | runtime/queue.c | 23 | ||||
-rw-r--r-- | runtime/queue.h | 4 |
3 files changed, 18 insertions, 19 deletions
@@ -1396,7 +1396,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti) STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) - iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg)); + iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg), pWti); else iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); @@ -1617,7 +1617,7 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch) * rgerhards, 2011-06-16 */ static inline rsRetVal -doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) +doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) { sbool bNeedSubmit; sbool *activeSave; @@ -1649,14 +1649,14 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) } if(bNeedSubmit) { /* note: stats were already computed above */ - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); } else { DBGPRINTF("no need to submit batch, all invalid\n"); } } else { if(GatherStats) countStatsBatchEnq(pAction, pBatch); - iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti); } free(pBatch->active); @@ -1678,7 +1678,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti) DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { - iRet = doQueueEnqObjDirectBatch(pAction, pBatch); + iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti); } else {/* in this case, we do single submits to the queue. * TODO: optimize this, we may do at least a multi-submit! */ diff --git a/runtime/queue.c b/runtime/queue.c index 92ac2425..968c016e 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -81,8 +81,8 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); +static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti); +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); @@ -959,7 +959,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) { batch_t singleBatch; batch_obj_t batchObj; @@ -986,7 +986,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) singleBatch.pElem = &batchObj; singleBatch.eltState = &batchState; singleBatch.active = &active; - iRet = pThis->pConsumer(pThis->pAction, &singleBatch, NULL, &pThis->bShutdownImmediate); + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate); /* delete the batch string params: TODO: create its own "class" for this */ for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { free(batchObj.staticActStrings[i]); @@ -999,7 +999,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) /* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead * otherwise incured. -- rgerhards, ~2010-06-23 */ -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) +rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti) { DEFiRet; @@ -1013,8 +1013,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ -#warning TODO: handle wti ptr! - iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL, NULL); + iRet = pThis->pConsumer(pThis->pAction, pBatch, pWti, NULL); RETiRet; } @@ -2676,7 +2675,7 @@ finalize_it: /* now, the same function, but for direct mode */ static rsRetVal -qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) +qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti) { int i; DEFiRet; @@ -2685,7 +2684,7 @@ qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) assert(pMultiSub != NULL); for(i = 0 ; i < pMultiSub->nElem ; ++i) { - CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i])); + CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i], pWti)); } finalize_it: @@ -2700,16 +2699,16 @@ finalize_it: * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg) +qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pMsg); + iRet = qAddDirect(pThis, pMsg, pWti); RETiRet; } -/* enqueue a new user data element +/* enqueue a new user data element * Enqueues the new element and awakes worker thread. */ rsRetVal diff --git a/runtime/queue.h b/runtime/queue.h index 01b4f351..91900b30 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -195,14 +195,14 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); -rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg); +rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti); rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *, int*)); -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); +rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti); int queueCnfParamsSet(struct nvlst *lst); rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst); void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis); |