summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c165
1 files changed, 83 insertions, 82 deletions
diff --git a/action.c b/action.c
index c46effc6..29725046 100644
--- a/action.c
+++ b/action.c
@@ -119,10 +119,10 @@
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
-static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*);
-static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch);
-static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch);
-static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch);
+static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*);
+static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
+static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
+static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -175,10 +175,9 @@ configSettings_t cs_save; /* our saved (scope!) config settings */
/* the counter below counts actions created. It is used to obtain unique IDs for the action. They
* should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice
* to have during one instance of an rsyslogd run. For example, I use them to name actions when there
- * is no better name available. Note that I do NOT recover previous numbers on HUP - we simply keep
- * counting. -- rgerhards, 2008-01-29
+ * is no better name available.
*/
-static int iActionNbr = 0;
+int iActionNbr = 0;
/* tables for interfacing with the v6 config system */
static struct cnfparamdescr cnfparamdescr[] = {
@@ -308,7 +307,6 @@ rsRetVal actionDestruct(action_t *pThis)
pThis->pMod->freeInstance(pThis->pModData);
pthread_mutex_destroy(&pThis->mutAction);
- pthread_mutex_destroy(&pThis->mutActExec);
d_free(pThis->pszName);
d_free(pThis->ppTpl);
@@ -341,7 +339,7 @@ rsRetVal actionConstruct(action_t **ppThis)
pThis->bExecWhenPrevSusp = 0;
pThis->bRepMsgHasMsg = 0;
pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */
- pthread_mutex_init(&pThis->mutActExec, NULL);
+ pThis->iActionNbr = iActionNbr;
pthread_mutex_init(&pThis->mutAction, NULL);
INIT_ATOMIC_HELPER_MUT(pThis->mutCAS);
@@ -428,7 +426,7 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst)
* spec. -- rgerhards, 2008-01-30
*/
CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize,
- (rsRetVal (*)(void*, batch_t*, int*))processBatchMain));
+ processBatchMain));
obj.SetName((obj_t*) pThis->pQueue, pszAName);
qqueueSetpAction(pThis->pQueue, pThis);
@@ -635,7 +633,7 @@ static inline void actionSuspend(action_t *pThis)
* of its inability to recover. -- rgerhards, 2010-04-26.
*/
static inline rsRetVal
-actionDoRetry(action_t *pThis, int *pbShutdownImmediate)
+actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
{
int iRetries;
int iSleepPeriod;
@@ -647,7 +645,7 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate)
iRetries = 0;
while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) {
DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries);
- iRet = pThis->pMod->tryResume(pThis->pModData);
+ iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet);
if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) {
bTreatOKasSusp = 1;
@@ -687,10 +685,26 @@ finalize_it:
}
+static rsRetVal
+actionCheckAndCreateWrkrInstance(action_t *pThis, wti_t *pWti)
+{
+ DEFiRet;
+ if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) {
+dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis->iActionNbr);
+ DBGPRINTF("we need to create a new action worker instance for "
+ "action %d\n", pThis->iActionNbr);
+ CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData),
+ pThis->pModData));
+ pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis;
+ }
+finalize_it:
+ RETiRet;
+}
+
/* try to resume an action -- rgerhards, 2007-08-02
* changed to new action state engine -- rgerhards, 2009-05-07
*/
-static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate)
+static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
{
DEFiRet;
time_t ttNow = NO_TIME_PROVIDED;
@@ -714,7 +728,7 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate)
if(pThis->eState == ACT_STATE_RTRY) {
if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */
datetime.GetTime(&ttNow);
- CHKiRet(actionDoRetry(pThis, pbShutdownImmediate));
+ CHKiRet(actionDoRetry(pThis, pbShutdownImmediate, pWti));
}
if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) {
@@ -731,18 +745,19 @@ finalize_it:
* depending on its current state.
* rgerhards, 2009-05-07
*/
-static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate)
+static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
{
DEFiRet;
assert(pThis != NULL);
- CHKiRet(actionTryResume(pThis, pbShutdownImmediate));
+ CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
+ CHKiRet(actionTryResume(pThis, pbShutdownImmediate, pWti));
/* if we are now ready, we initialize the transaction and advance
* action state accordingly
*/
if(pThis->eState == ACT_STATE_RDY) {
- iRet = pThis->pMod->mod.om.beginTransaction(pThis->pModData);
+ iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
actionSetState(pThis, ACT_STATE_ITX);
@@ -905,17 +920,21 @@ done: RETiRet;
* rgerhards, 2008-01-28
*/
rsRetVal
-actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams)
+actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
{
DEFiRet;
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
- DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis));
+ DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n",
+ getActStateName(pThis), pThis->iActionNbr);
+
+ CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
pThis->bHadAutoCommit = 0;
- iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData);
+ iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags,
+ pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
actionCommitted(pThis);
@@ -952,19 +971,19 @@ finalize_it:
* this readies the action and then calls doAction()
* rgerhards, 2008-01-28
*/
-static inline rsRetVal
-actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate)
+static rsRetVal
+actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate, wti_t *pWti)
{
DEFiRet;
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
- CHKiRet(actionPrepare(pThis, pbShutdownImmediate));
+ CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti));
if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL)
pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate);
if(pThis->eState == ACT_STATE_ITX)
- CHKiRet(actionCallDoAction(pThis, pMsg, actParams));
+ CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti));
iRet = getReturnCode(pThis);
finalize_it:
@@ -977,7 +996,7 @@ finalize_it:
* rgerhards, 2008-01-28
*/
static rsRetVal
-finishBatch(action_t *pThis, batch_t *pBatch)
+finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti)
{
int i;
DEFiRet;
@@ -989,9 +1008,9 @@ finishBatch(action_t *pThis, batch_t *pBatch)
FINALIZE; /* nothing to do */
}
- CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate));
+ CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti));
if(pThis->eState == ACT_STATE_ITX) {
- iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData);
+ iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
actionCommitted(pThis);
@@ -1034,7 +1053,7 @@ finalize_it:
* rgerhards, 2009-05-12
*/
static inline rsRetVal
-tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
+tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, wti_t *pWti)
{
int i;
int iElemProcessed;
@@ -1059,7 +1078,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
if(batchIsValidElem(pBatch, i)) {
pMsg = pBatch->pElem[i].pMsg;
localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
- pBatch->pbShutdownImmediate);
+ pBatch->pbShutdownImmediate, pWti);
DBGPRINTF("action %p call returned %d\n", pAction, localRet);
/* Note: we directly modify the batch object state, because we know that
* wo do not overwrite BATCH_STATE_DISC indicators!
@@ -1111,7 +1130,7 @@ finalize_it:
* rgerhards, 2009-05-12
*/
static rsRetVal
-submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
+submitBatch(action_t *pAction, batch_t *pBatch, int nElem, wti_t *pWti)
{
int i;
int bDone;
@@ -1125,7 +1144,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
wasDoneTo = pBatch->iDoneUpTo;
bDone = 0;
do {
- localRet = tryDoAction(pAction, pBatch, &nElem);
+ localRet = tryDoAction(pAction, pBatch, &nElem, pWti);
if(localRet == RS_RET_FORCE_TERM) {
ABORT_FINALIZE(RS_RET_FORCE_TERM);
}
@@ -1135,7 +1154,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
/* try commit transaction, once done, we can simply do so as if
* that return state was returned from tryDoAction().
*/
- localRet = finishBatch(pAction, pBatch);
+ localRet = finishBatch(pAction, pBatch, pWti);
}
if( localRet == RS_RET_OK
@@ -1164,8 +1183,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
/* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
DBGPRINTF("submitBatch recursing trying to find and exclude the culprit "
"for iRet %d\n", localRet);
- submitBatch(pAction, pBatch, nElem / 2);
- submitBatch(pAction, pBatch, nElem - (nElem / 2));
+ submitBatch(pAction, pBatch, nElem / 2, pWti);
+ submitBatch(pAction, pBatch, nElem - (nElem / 2), pWti);
bDone = 1;
}
}
@@ -1245,31 +1264,31 @@ prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustR
* rgerhards, 2009-05-12
*/
static inline rsRetVal
-processAction(action_t *pAction, batch_t *pBatch)
+processAction(action_t *pAction, batch_t *pBatch, wti_t* pWti)
{
DEFiRet;
assert(pBatch != NULL);
- CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem));
- iRet = finishBatch(pAction, pBatch);
+ CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pWti));
+ iRet = finishBatch(pAction, pBatch, pWti);
finalize_it:
RETiRet;
}
-#pragma GCC diagnostic ignored "-Wempty-body"
/* receive an array of to-process user pointers and submit them
* for processing.
* rgerhards, 2009-04-22
*/
static rsRetVal
-processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
+processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate)
{
int *pbShutdownImmdtSave;
sbool *activeSave;
int bMustRestoreActivePtr = 0;
rsRetVal localRet;
+ action_t *pAction = (action_t*) pVoid;
DEFiRet;
assert(pBatch != NULL);
@@ -1280,17 +1299,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
}
CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr));
- /* We now must guard the output module against execution by multiple threads. The
- * plugin interface specifies that output modules must not be thread-safe (except
- * if they notify us they are - functionality not yet implemented...).
- * rgerhards, 2008-01-30
- */
- d_pthread_mutex_lock(&pAction->mutActExec);
- pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
-
- iRet = processAction(pAction, pBatch);
-
- pthread_cleanup_pop(1); /* unlock mutex */
+ iRet = processAction(pAction, pBatch, pWti);
/* even if processAction failed, we need to release the batch (else we
* have a memory leak). So we do this first, and then check if we need to
@@ -1312,15 +1321,12 @@ finalize_it:
pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
RETiRet;
}
-#pragma GCC diagnostic warning "-Wempty-body"
-/* call the HUP handler for a given action, if such a handler is defined. The
- * action mutex is locked, because the HUP handler most probably needs to modify
- * some internal state information.
- * rgerhards, 2008-10-22
+/* call the HUP handler for a given action, if such a handler is defined.
+ * Note that the action must be able to service HUP requests concurrently
+ * to any current doAction() processing.
*/
-#pragma GCC diagnostic ignored "-Wempty-body"
rsRetVal
actionCallHUPHdlr(action_t *pAction)
{
@@ -1329,19 +1335,13 @@ actionCallHUPHdlr(action_t *pAction)
ASSERT(pAction != NULL);
DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP);
- if(pAction->pMod->doHUP == NULL) {
- FINALIZE; /* no HUP handler, so we are done ;) */
+ if(pAction->pMod->doHUP != NULL) {
+ CHKiRet(pAction->pMod->doHUP(pAction->pModData));
}
- d_pthread_mutex_lock(&pAction->mutActExec);
- pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
- CHKiRet(pAction->pMod->doHUP(pAction->pModData));
- pthread_cleanup_pop(1); /* unlock mutex */
-
finalize_it:
RETiRet;
}
-#pragma GCC diagnostic warning "-Wempty-body"
/* set the action message queue mode
@@ -1381,7 +1381,7 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
* rgerhards, 2010-06-08
*/
static inline rsRetVal
-doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
+doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti)
{
DEFiRet;
@@ -1392,7 +1392,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
- iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg));
+ iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg), pWti);
else
iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
@@ -1409,7 +1409,7 @@ finalize_it:
* be filtered out before calling us (what is done currently!).
*/
rsRetVal
-actionWriteToAction(action_t *pAction, msg_t *pMsg)
+actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti)
{
DEFiRet;
@@ -1464,7 +1464,7 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg)
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = doSubmitToActionQ(pAction, pMsg);
+ iRet = doSubmitToActionQ(pAction, pMsg, pWti);
finalize_it:
RETiRet;
@@ -1475,7 +1475,7 @@ finalize_it:
* pthread_cleanup_push() POSIX macro...
*/
static inline rsRetVal
-doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
+doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti)
{
msg_t *pMsg;
DEFiRet;
@@ -1490,7 +1490,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
}
/* call the output driver */
- iRet = actionWriteToAction(pAction, pMsg);
+ iRet = actionWriteToAction(pAction, pMsg, pWti);
finalize_it:
/* we need to update the batch to handle failover processing correctly */
@@ -1550,7 +1550,7 @@ activateActions(void)
* rgerhards, 2010-06-08
*/
static rsRetVal
-doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
+doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
{
time_t now = 0;
time_t lastAct;
@@ -1588,7 +1588,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
}
}
- iRet = doSubmitToActionQBatch(pAction, pBatch);
+ iRet = doSubmitToActionQBatch(pAction, pBatch, pWti);
free(pBatch->active);
pBatch->active = activeSave;
@@ -1613,7 +1613,7 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
* rgerhards, 2011-06-16
*/
static inline rsRetVal
-doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
+doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
{
sbool bNeedSubmit;
sbool *activeSave;
@@ -1645,14 +1645,14 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
}
if(bNeedSubmit) {
/* note: stats were already computed above */
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti);
} else {
DBGPRINTF("no need to submit batch, all invalid\n");
}
} else {
if(GatherStats)
countStatsBatchEnq(pAction, pBatch);
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti);
}
free(pBatch->active);
@@ -1666,7 +1666,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
* rgerhards, 2010-06-08
*/
static rsRetVal
-doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
+doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
{
int i;
DEFiRet;
@@ -1674,7 +1674,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
- iRet = doQueueEnqObjDirectBatch(pAction, pBatch);
+ iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti);
} else {/* in this case, we do single submits to the queue.
* TODO: optimize this, we may do at least a multi-submit!
*/
@@ -1684,7 +1684,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
if( batchIsValidElem(pBatch, i)
&& (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) {
- doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg);
+ doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg, pWti);
}
}
}
@@ -1699,7 +1699,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
* rgerhards, 2010-06-23
*/
static inline rsRetVal
-helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
+helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
{
int i;
DEFiRet;
@@ -1712,7 +1712,7 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
if( batchIsValidElem(pBatch, i)
&& ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
- doActionCallAction(pAction, pBatch, i);
+ doActionCallAction(pAction, pBatch, i, pWti);
}
}
@@ -1724,13 +1724,14 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
*/
#pragma GCC diagnostic ignored "-Wempty-body"
static rsRetVal
-doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
+doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
{
DEFiRet;
d_pthread_mutex_lock(&pAction->mutAction);
+dbgprintf("DDDD: locked mutAction\n");
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction);
- iRet = helperSubmitToActionQComplexBatch(pAction, pBatch);
+ iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti);
d_pthread_mutex_unlock(&pAction->mutAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */