diff options
-rw-r--r-- | action.c | 34 | ||||
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 10 |
2 files changed, 28 insertions, 16 deletions
@@ -685,6 +685,22 @@ finalize_it: } +static rsRetVal +actionCheckAndCreateWrkrInstance(action_t *pThis, wti_t *pWti) +{ + DEFiRet; + if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) { +dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis->iActionNbr); + DBGPRINTF("we need to create a new action worker instance for " + "action %d\n", pThis->iActionNbr); + CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), + pThis->pModData)); + pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; + } +finalize_it: + RETiRet; +} + /* try to resume an action -- rgerhards, 2007-08-02 * changed to new action state engine -- rgerhards, 2009-05-07 */ @@ -729,18 +745,19 @@ finalize_it: * depending on its current state. * rgerhards, 2009-05-07 */ -static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate) +static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; assert(pThis != NULL); + CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); CHKiRet(actionTryResume(pThis, pbShutdownImmediate)); /* if we are now ready, we initialize the transaction and advance * action state accordingly */ if(pThis->eState == ACT_STATE_RDY) { - iRet = pThis->pMod->mod.om.beginTransaction(pThis->pModData); + iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: actionSetState(pThis, ACT_STATE_ITX); @@ -913,14 +930,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", getActStateName(pThis), pThis->iActionNbr); - if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) { -dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis->iActionNbr); - DBGPRINTF("we need to create a new action worker instance for " - "action %d\n", pThis->iActionNbr); - CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), - pThis->pModData)); - pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; - } + CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); pThis->bHadAutoCommit = 0; iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, @@ -969,7 +979,7 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutd ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); - CHKiRet(actionPrepare(pThis, pbShutdownImmediate)); + CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); if(pThis->eState == ACT_STATE_ITX) @@ -998,7 +1008,7 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) FINALIZE; /* nothing to do */ } - CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate)); + CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti)); if(pThis->eState == ACT_STATE_ITX) { iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 12a71cc9..b878050d 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -122,7 +122,7 @@ static struct cnfparamdescr actpdescr[] = { { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, { "errorfile", eCmdHdlrGetWord, 0 }, - { "template", eCmdHdlrGetWord, 1 }, + { "template", eCmdHdlrGetWord, 0 }, { "dynbulkid", eCmdHdlrBinary, 0 }, { "bulkid", eCmdHdlrGetWord, 0 }, }; @@ -142,6 +142,7 @@ ENDcreateInstance BEGINcreateWrkrInstance CODESTARTcreateWrkrInstance +dbgprintf("omelasticsearch: createWrkrInstance\n"); pWrkrData->restURL = NULL; if(pData->bulkmode) { pWrkrData->batch.currTpl1 = NULL; @@ -154,6 +155,7 @@ CODESTARTcreateWrkrInstance } CHKiRet(curlSetup(pWrkrData, pWrkrData->pData)); finalize_it: +dbgprintf("DDDD: createWrkrInstance,pData %p/%p, pWrkrData %p\n", pData, pWrkrData->pData, pWrkrData); ENDcreateWrkrInstance BEGINisCompatibleWithFeature @@ -666,7 +668,7 @@ finalize_it: BEGINbeginTransaction CODESTARTbeginTransaction -dbgprintf("omelasticsearch: beginTransaction\n"); +dbgprintf("omelasticsearch: beginTransaction, pWrkrData %p, pData %p\n", pWrkrData, pWrkrData->pData); if(!pWrkrData->pData->bulkmode) { FINALIZE; } @@ -680,14 +682,14 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); - if(pData->bulkmode) { + if(pWrkrData->pData->bulkmode) { CHKiRet(buildBatch(pWrkrData, ppString[0], ppString)); } else { CHKiRet(curlPost(pWrkrData, ppString[0], strlen((char*)ppString[0]), ppString, 1)); } finalize_it: -dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); +dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pWrkrData->pData->bulkmode); ENDdoAction |