summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c34
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c10
2 files changed, 28 insertions, 16 deletions
diff --git a/action.c b/action.c
index 1e2f95d6..aa9346dd 100644
--- a/action.c
+++ b/action.c
@@ -685,6 +685,22 @@ finalize_it:
}
+static rsRetVal
+actionCheckAndCreateWrkrInstance(action_t *pThis, wti_t *pWti)
+{
+ DEFiRet;
+ if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) {
+dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis->iActionNbr);
+ DBGPRINTF("we need to create a new action worker instance for "
+ "action %d\n", pThis->iActionNbr);
+ CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData),
+ pThis->pModData));
+ pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis;
+ }
+finalize_it:
+ RETiRet;
+}
+
/* try to resume an action -- rgerhards, 2007-08-02
* changed to new action state engine -- rgerhards, 2009-05-07
*/
@@ -729,18 +745,19 @@ finalize_it:
* depending on its current state.
* rgerhards, 2009-05-07
*/
-static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate)
+static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
{
DEFiRet;
assert(pThis != NULL);
+ CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
CHKiRet(actionTryResume(pThis, pbShutdownImmediate));
/* if we are now ready, we initialize the transaction and advance
* action state accordingly
*/
if(pThis->eState == ACT_STATE_RDY) {
- iRet = pThis->pMod->mod.om.beginTransaction(pThis->pModData);
+ iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
actionSetState(pThis, ACT_STATE_ITX);
@@ -913,14 +930,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n",
getActStateName(pThis), pThis->iActionNbr);
- if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) {
-dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis->iActionNbr);
- DBGPRINTF("we need to create a new action worker instance for "
- "action %d\n", pThis->iActionNbr);
- CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData),
- pThis->pModData));
- pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis;
- }
+ CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
pThis->bHadAutoCommit = 0;
iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags,
@@ -969,7 +979,7 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutd
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
- CHKiRet(actionPrepare(pThis, pbShutdownImmediate));
+ CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti));
if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL)
pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate);
if(pThis->eState == ACT_STATE_ITX)
@@ -998,7 +1008,7 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti)
FINALIZE; /* nothing to do */
}
- CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate));
+ CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti));
if(pThis->eState == ACT_STATE_ITX) {
iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index 12a71cc9..b878050d 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -122,7 +122,7 @@ static struct cnfparamdescr actpdescr[] = {
{ "asyncrepl", eCmdHdlrBinary, 0 },
{ "timeout", eCmdHdlrGetWord, 0 },
{ "errorfile", eCmdHdlrGetWord, 0 },
- { "template", eCmdHdlrGetWord, 1 },
+ { "template", eCmdHdlrGetWord, 0 },
{ "dynbulkid", eCmdHdlrBinary, 0 },
{ "bulkid", eCmdHdlrGetWord, 0 },
};
@@ -142,6 +142,7 @@ ENDcreateInstance
BEGINcreateWrkrInstance
CODESTARTcreateWrkrInstance
+dbgprintf("omelasticsearch: createWrkrInstance\n");
pWrkrData->restURL = NULL;
if(pData->bulkmode) {
pWrkrData->batch.currTpl1 = NULL;
@@ -154,6 +155,7 @@ CODESTARTcreateWrkrInstance
}
CHKiRet(curlSetup(pWrkrData, pWrkrData->pData));
finalize_it:
+dbgprintf("DDDD: createWrkrInstance,pData %p/%p, pWrkrData %p\n", pData, pWrkrData->pData, pWrkrData);
ENDcreateWrkrInstance
BEGINisCompatibleWithFeature
@@ -666,7 +668,7 @@ finalize_it:
BEGINbeginTransaction
CODESTARTbeginTransaction
-dbgprintf("omelasticsearch: beginTransaction\n");
+dbgprintf("omelasticsearch: beginTransaction, pWrkrData %p, pData %p\n", pWrkrData, pWrkrData->pData);
if(!pWrkrData->pData->bulkmode) {
FINALIZE;
}
@@ -680,14 +682,14 @@ ENDbeginTransaction
BEGINdoAction
CODESTARTdoAction
STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
- if(pData->bulkmode) {
+ if(pWrkrData->pData->bulkmode) {
CHKiRet(buildBatch(pWrkrData, ppString[0], ppString));
} else {
CHKiRet(curlPost(pWrkrData, ppString[0], strlen((char*)ppString[0]),
ppString, 1));
}
finalize_it:
-dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode);
+dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pWrkrData->pData->bulkmode);
ENDdoAction