From f8019d52f83884acb5e8f8755fe976d1592b4ccb Mon Sep 17 00:00:00 2001 From: Nathan Scott Date: Mon, 15 Aug 2011 22:22:13 +1000 Subject: add elasticsearch output module Add support for sending events to elasticsearch - a distributed, RESTful, search engine built on Lucene (www.elasticsearch.org). The output module is enabled via a configure option, and uses libcurl to send the messages from rsyslog to elasticsearch. This patch makes use of the earlier JSON quoting patch to ensure valid JSON strings are sent to the server. Signed-off-by: Nathan Scott --- plugins/omelasticsearch/omelasticsearch.c | 271 ++++++++++++++++++++++++++++++ 1 file changed, 271 insertions(+) create mode 100644 plugins/omelasticsearch/omelasticsearch.c (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c new file mode 100644 index 00000000..ce2e0c1f --- /dev/null +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -0,0 +1,271 @@ +/* omelasticsearch.c + * This is the http://www.elasticsearch.org/ output module. + * + * NOTE: read comments in module-template.h for more specifics! + * + * Copyright 2011 Nathan Scott. + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ +#include "config.h" +#include "rsyslog.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "statsobj.h" +#include "cfsysline.h" + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP + +/* internal structures */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(statsobj) + +statsobj_t *indexStats; +STATSCOUNTER_DEF(indexConFail, mutIndexConFail) +STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit) +STATSCOUNTER_DEF(indexFailed, mutIndexFailed) +STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) + +/* REST API for elasticsearch hits this URL: + * http://:// + */ +typedef struct curl_slist HEADER; +typedef struct _instanceData { + CURL *curlHandle; /* libcurl session handle */ + HEADER *postHeader; /* json POST request info */ +} instanceData; + +/* config variables */ +static int restPort = 9200; +static char *hostName = "localhost"; +static char *searchIndex = "system"; +static char *searchType = "events"; + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + +BEGINfreeInstance +CODESTARTfreeInstance + if (pData->postHeader) { + curl_slist_free_all(pData->postHeader); + pData->postHeader = NULL; + } + if (pData->curlHandle) { + curl_easy_cleanup(pData->curlHandle); + pData->curlHandle = NULL; + } +ENDfreeInstance + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + +BEGINtryResume +CODESTARTtryResume +ENDtryResume + +rsRetVal +curlPost(instanceData *instance, uchar *message) +{ + CURLcode code; + CURL *curl = instance->curlHandle; + int length = strlen((char *)message); + + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length); + code = curl_easy_perform(curl); + switch (code) { + case CURLE_COULDNT_RESOLVE_HOST: + case CURLE_COULDNT_RESOLVE_PROXY: + case CURLE_COULDNT_CONNECT: + case CURLE_WRITE_ERROR: + STATSCOUNTER_INC(indexConFail, mutIndexConFail); + return RS_RET_SUSPENDED; + default: + STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); + return RS_RET_OK; + } +} + +BEGINdoAction +CODESTARTdoAction + CHKiRet(curlPost(pData, ppString[0])); +finalize_it: +ENDdoAction + +/* elasticsearch POST result string ... useful for debugging */ +size_t +curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) +{ + unsigned int i; + char *p = (char *)ptr; + char *jsonData = (char *)userdata; + static char ok[] = "{\"ok\":true,"; + + ASSERT(size == 1); + + if (size == 1 && + nmemb > sizeof(ok)-1 && + strncmp(p, ok, sizeof(ok)-1) == 0) { + STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); + } else { + STATSCOUNTER_INC(indexFailed, mutIndexFailed); + if (Debug) { + DBGPRINTF("omelasticsearch request: %s\n", jsonData); + DBGPRINTF("omelasticsearch result: "); + for (i = 0; i < nmemb; i++) + DBGPRINTF("%c", p[i]); + DBGPRINTF("\n"); + } + } + return size * nmemb; +} + +static rsRetVal +curlSetup(instanceData *instance) +{ + char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + HEADER *header; + CURL *handle; + + handle = curl_easy_init(); + if (handle == NULL) { + return RS_RET_OBJ_CREATION_FAILED; + } + + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + hostName, restPort, searchIndex, searchType); + header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + + curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); + curl_easy_setopt(handle, CURLOPT_URL, restURL); + curl_easy_setopt(handle, CURLOPT_POST, 1); + + instance->curlHandle = handle; + instance->postHeader = header; + + DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL); + return RS_RET_OK; +} + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); + } + p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + + /* check if a non-standard template is to be applied */ + if(*(p-1) == ';') + --p; + CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt")); + + /* all good, we can now initialise our private data */ + CHKiRet(curlSetup(pData)); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + +BEGINmodExit +CODESTARTmodExit + curl_global_cleanup(); + statsobj.Destruct(&indexStats); + objRelease(errmsg, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); +ENDmodExit + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + DEFiRet; + restPort = 9200; + hostName = "localhost"; + searchIndex = "system"; + searchType = "events"; + RETiRet; +} + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + + /* register config file handlers */ + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); + + if (curl_global_init(CURL_GLOBAL_ALL) != 0) { + errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled"); + ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); + } + + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&indexStats)); + CHKiRet(statsobj.SetName(indexStats, (uchar *)"elasticsearch")); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"connfail", + ctrType_IntCtr, &indexConFail)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submits", + ctrType_IntCtr, &indexSubmit)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed", + ctrType_IntCtr, &indexFailed)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"success", + ctrType_IntCtr, &indexSuccess)); + CHKiRet(statsobj.ConstructFinalize(indexStats)); +ENDmodInit + +/* vi:set ai: + */ -- cgit v1.2.3 From 9911dacf59e898bae2c8c6619b85348bf54ddc23 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 17 Jan 2012 10:42:04 +0100 Subject: elasticsearch: move to asl 2.0 email conversation Nathan/Rainer 2012-01-16&17 --- plugins/omelasticsearch/omelasticsearch.c | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index ce2e0c1f..3bec1838 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -8,20 +8,19 @@ * * This file is part of rsyslog. * - * Rsyslog is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Rsyslog is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Rsyslog. If not, see . - * - * A copy of the GPL can be found in the file "COPYING" in this distribution. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ #include "config.h" #include "rsyslog.h" -- cgit v1.2.3 From 24179166f748464f4d556a80bd0c8dcc4cbd57b0 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 5 Apr 2012 17:26:16 +0200 Subject: omelasticsearch: moved to v6 config system --- plugins/omelasticsearch/omelasticsearch.c | 126 +++++++++++++++++++++--------- 1 file changed, 91 insertions(+), 35 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 3bec1838..8965d40b 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -29,7 +29,7 @@ #include #include #include -#include +//#include #include #include #include @@ -46,6 +46,7 @@ MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omelasticsearch") /* internal structures */ DEF_OMOD_STATIC_DATA @@ -63,15 +64,30 @@ STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) */ typedef struct curl_slist HEADER; typedef struct _instanceData { + uchar *server; + int port; + uchar *searchIndex; + uchar *searchType; + uchar *tplName; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; -/* config variables */ -static int restPort = 9200; -static char *hostName = "localhost"; -static char *searchIndex = "system"; -static char *searchType = "events"; + +/* tables for interfacing with the v6 config system */ +/* action (instance) parameters */ +static struct cnfparamdescr actpdescr[] = { + { "server", eCmdHdlrGetWord, 0 }, + { "serverport", eCmdHdlrInt, 0 }, + { "searchindex", eCmdHdlrGetWord, 0 }, + { "searchtype", eCmdHdlrGetWord, 0 }, + { "template", eCmdHdlrGetWord, 1 } +}; +static struct cnfparamblk actpblk = + { CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr + }; BEGINcreateInstance CODESTARTcreateInstance @@ -93,6 +109,10 @@ CODESTARTfreeInstance curl_easy_cleanup(pData->curlHandle); pData->curlHandle = NULL; } + free(pData->server); + free(pData->searchIndex); + free(pData->searchType); + free(pData->tplName); ENDfreeInstance BEGINdbgPrintInstInfo @@ -174,7 +194,7 @@ curlSetup(instanceData *instance) } snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - hostName, restPort, searchIndex, searchType); + instance->server, instance->port, instance->searchIndex, instance->searchType); header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); @@ -189,25 +209,77 @@ curlSetup(instanceData *instance) return RS_RET_OK; } -BEGINparseSelectorAct -CODESTARTparseSelectorAct -CODE_STD_STRING_REQUESTparseSelectorAct(1) - if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { - ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); +static inline void +setInstParamDefaults(instanceData *pData) +{ + pData->server = NULL; + pData->port = 9200; + pData->searchIndex = NULL; + pData->searchType = NULL; + pData->tplName = NULL; +} + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; +CODESTARTnewActInst + if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } - p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); + + CODE_STD_STRING_REQUESTparseSelectorAct(1) + for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "server")) { + pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "serverport")) { + pData->port = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "searchIndex")) { + pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "searchType")) { + pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + dbgprintf("omelasticsearch: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + } + + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? + " StdJSONFmt" : (char*)pData->tplName), + OMSR_NO_RQD_TPL_OPTS)); - /* check if a non-standard template is to be applied */ - if(*(p-1) == ';') - --p; - CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt")); + if(pData->server == NULL) + pData->server = (uchar*) strdup("localhost"); + if(pData->searchIndex == NULL) + pData->searchIndex = (uchar*) strdup("system"); + if(pData->searchType == NULL) + pData->searchType = (uchar*) strdup("events"); - /* all good, we can now initialise our private data */ +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); CHKiRet(curlSetup(pData)); +ENDnewActInst + + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(!strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch supports only v6 config format, use: " + "action(type=\"omelasticsearch\" server=...)"); + } + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct + BEGINmodExit CODESTARTmodExit curl_global_cleanup(); @@ -220,18 +292,9 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt -static rsRetVal -resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) -{ - DEFiRet; - restPort = 9200; - hostName = "localhost"; - searchIndex = "system"; - searchType = "events"; - RETiRet; -} BEGINmodInit() CODESTARTmodInit @@ -240,13 +303,6 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); - /* register config file handlers */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); - if (curl_global_init(CURL_GLOBAL_ALL) != 0) { errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled"); ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); -- cgit v1.2.3 From f9bb245a4ae06151897a4a68d6c22ba88a6ecee8 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 13 Apr 2012 12:12:39 +0200 Subject: omelasticsearch: permit dynamic index/type parameters (just like dynafile) --- plugins/omelasticsearch/omelasticsearch.c | 123 ++++++++++++++++++++++++++---- 1 file changed, 107 insertions(+), 16 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 8965d40b..1143de06 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -43,6 +43,7 @@ #include "errmsg.h" #include "statsobj.h" #include "cfsysline.h" +#include "unicode-helper.h" MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP @@ -69,6 +70,8 @@ typedef struct _instanceData { uchar *searchIndex; uchar *searchType; uchar *tplName; + sbool dynSrchIdx; + sbool dynSrchType; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; @@ -81,6 +84,8 @@ static struct cnfparamdescr actpdescr[] = { { "serverport", eCmdHdlrInt, 0 }, { "searchindex", eCmdHdlrGetWord, 0 }, { "searchtype", eCmdHdlrGetWord, 0 }, + { "dynsearchindex", eCmdHdlrBinary, 0 }, + { "dynsearchtype", eCmdHdlrBinary, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = @@ -123,12 +128,44 @@ BEGINtryResume CODESTARTtryResume ENDtryResume -rsRetVal -curlPost(instanceData *instance, uchar *message) + +static rsRetVal +setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) +{ + char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + uchar *searchIndex; + uchar *searchType; + + if(pData->dynSrchIdx) { + searchIndex = tpl1; + if(pData->dynSrchType) + searchType = tpl2; + else + searchType = pData->searchType; + } else { + searchIndex = pData->searchIndex; + if(pData->dynSrchType) + searchType = tpl1; + else + searchType = pData->searchType; + } + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + pData->server, pData->port, searchIndex, searchType); + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); + return RS_RET_OK; +} + +static rsRetVal +curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) { CURLcode code; CURL *curl = instance->curlHandle; int length = strlen((char *)message); + DEFiRet; + + if(instance->dynSrchIdx || instance->dynSrchType) + CHKiRet(setCurlURL(instance, tpl1, tpl2)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); @@ -145,11 +182,13 @@ curlPost(instanceData *instance, uchar *message) STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); return RS_RET_OK; } +finalize_it: + RETiRet; } BEGINdoAction CODESTARTdoAction - CHKiRet(curlPost(pData, ppString[0])); + CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); finalize_it: ENDdoAction @@ -181,10 +220,10 @@ curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) return size * nmemb; } + static rsRetVal -curlSetup(instanceData *instance) +curlSetup(instanceData *pData) { - char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ HEADER *header; CURL *handle; @@ -193,19 +232,26 @@ curlSetup(instanceData *instance) return RS_RET_OBJ_CREATION_FAILED; } - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - instance->server, instance->port, instance->searchIndex, instance->searchType); header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); - curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); - curl_easy_setopt(handle, CURLOPT_URL, restURL); curl_easy_setopt(handle, CURLOPT_POST, 1); - instance->curlHandle = handle; - instance->postHeader = header; + pData->curlHandle = handle; + pData->postHeader = header; - DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL); + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) { + /* in this case, we know no tpls are involved --> NULL OK! */ + setCurlURL(pData, NULL, NULL); + } + + if(Debug) { + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) + dbgprintf("omelasticsearch setup, using static REST URL\n"); + else + dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n"); + } return RS_RET_OK; } @@ -216,12 +262,15 @@ setInstParamDefaults(instanceData *pData) pData->port = 9200; pData->searchIndex = NULL; pData->searchType = NULL; + pData->dynSrchIdx = 0; + pData->dynSrchType = 0; pData->tplName = NULL; } BEGINnewActInst struct cnfparamvals *pvals; int i; + int iNumTpls; CODESTARTnewActInst if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); @@ -230,7 +279,6 @@ CODESTARTnewActInst CHKiRet(createInstance(&pData)); setInstParamDefaults(pData); - CODE_STD_STRING_REQUESTparseSelectorAct(1) for(i = 0 ; i < actpblk.nParams ; ++i) { if(!pvals[i].bUsed) continue; @@ -238,10 +286,14 @@ CODESTARTnewActInst pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; - } else if(!strcmp(actpblk.descr[i].name, "searchIndex")) { + } else if(!strcmp(actpblk.descr[i].name, "searchindex")) { pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); - } else if(!strcmp(actpblk.descr[i].name, "searchType")) { + } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) { + pData->dynSrchIdx = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { + pData->dynSrchType = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { @@ -249,11 +301,49 @@ CODESTARTnewActInst "param '%s'\n", actpblk.descr[i].name); } } + + if(pData->dynSrchIdx && pData->searchIndex == NULL) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch: requested dynamic search index, but no " + "name for index template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + } + if(pData->dynSrchType && pData->searchType == NULL) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch: requested dynamic search type, but no " + "name for type template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + } + + iNumTpls = 1; + if(pData->dynSrchIdx) ++iNumTpls; + if(pData->dynSrchType) ++iNumTpls; + DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); + CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? " StdJSONFmt" : (char*)pData->tplName), OMSR_NO_RQD_TPL_OPTS)); + + /* we need to request additional templates. If we have a dynamic search index, + * it will always be string 1. Type may be 1 or 2, depending on whether search + * index is dynamic as well. Rule needs to be followed throughout the module. + */ + if(pData->dynSrchIdx) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchIndex), + OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynSrchType) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynSrchType) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType), + OMSR_NO_RQD_TPL_OPTS)); + } + } + if(pData->server == NULL) pData->server = (uchar*) strdup("localhost"); if(pData->searchIndex == NULL) @@ -261,9 +351,10 @@ CODESTARTnewActInst if(pData->searchType == NULL) pData->searchType = (uchar*) strdup("events"); + CHKiRet(curlSetup(pData)); + CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); - CHKiRet(curlSetup(pData)); ENDnewActInst -- cgit v1.2.3 From 7d8ac48a4a214da917f7455f24b4c292c3e6d015 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 16 Apr 2012 16:49:07 +0200 Subject: omelasticsearch: added asnycRepl and timeout config params --- plugins/omelasticsearch/omelasticsearch.c | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 1143de06..0b2e0bf1 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -4,7 +4,7 @@ * NOTE: read comments in module-template.h for more specifics! * * Copyright 2011 Nathan Scott. - * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2009-2012 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -70,8 +70,10 @@ typedef struct _instanceData { uchar *searchIndex; uchar *searchType; uchar *tplName; + uchar *timeout; sbool dynSrchIdx; sbool dynSrchType; + sbool asyncRepl; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; @@ -86,6 +88,8 @@ static struct cnfparamdescr actpdescr[] = { { "searchtype", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "asyncrepl", eCmdHdlrBinary, 0 }, + { "timeout", eCmdHdlrGetWord, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = @@ -149,8 +153,21 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) else searchType = pData->searchType; } - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - pData->server, pData->port, searchIndex, searchType); + if(pData->asyncRepl) { + if(pData->timeout != NULL) { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + "replication=async&timeout=%s", + pData->server, pData->port, searchIndex, searchType, + pData->timeout); + } else { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + "replication=async", + pData->server, pData->port, searchIndex, searchType); + } + } else { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + pData->server, pData->port, searchIndex, searchType); + } curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); return RS_RET_OK; @@ -262,8 +279,10 @@ setInstParamDefaults(instanceData *pData) pData->port = 9200; pData->searchIndex = NULL; pData->searchType = NULL; + pData->timeout = NULL; pData->dynSrchIdx = 0; pData->dynSrchType = 0; + pData->asyncRepl = 0; pData->tplName = NULL; } @@ -294,6 +313,10 @@ CODESTARTnewActInst pData->dynSrchIdx = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "timeout")) { + pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "asyncrepl")) { + pData->asyncRepl = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { -- cgit v1.2.3 From 1acde66c3274c57d084146621d4124eaaef9ccf9 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 16 Apr 2012 17:46:46 +0200 Subject: make "old" omelasticsearch compile this module is to be replaced by branch currently in development. But I want to be able to compile the previous one, at least ;) --- plugins/omelasticsearch/omelasticsearch.c | 1 - 1 file changed, 1 deletion(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 3bec1838..2b451eb4 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include -- cgit v1.2.3 From 54966fd878bcf1c52019e0ec977da4d7b0a9f52a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 25 Apr 2012 15:05:01 +0200 Subject: omelasticsearch: provide authentication support (UNTESTED) --- plugins/omelasticsearch/omelasticsearch.c | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 79a24ffe..c20c67c1 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -66,6 +66,8 @@ typedef struct curl_slist HEADER; typedef struct _instanceData { uchar *server; int port; + uchar *uid; + uchar *pwd; uchar *searchIndex; uchar *searchType; uchar *tplName; @@ -83,6 +85,8 @@ typedef struct _instanceData { static struct cnfparamdescr actpdescr[] = { { "server", eCmdHdlrGetWord, 0 }, { "serverport", eCmdHdlrInt, 0 }, + { "uid", eCmdHdlrGetWord, 0 }, + { "pwd", eCmdHdlrGetWord, 0 }, { "searchindex", eCmdHdlrGetWord, 0 }, { "searchtype", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, @@ -118,6 +122,8 @@ CODESTARTfreeInstance pData->curlHandle = NULL; } free(pData->server); + free(pData->uid); + free(pData->pwd); free(pData->searchIndex); free(pData->searchType); free(pData->tplName); @@ -125,6 +131,7 @@ ENDfreeInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo + dbgprintf("omelasticsearch, target server %s", pData->server); ENDdbgPrintInstInfo BEGINtryResume @@ -136,6 +143,7 @@ static rsRetVal setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) { char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + char authBuf[1024]; uchar *searchIndex; uchar *searchType; @@ -168,6 +176,14 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) pData->server, pData->port, searchIndex, searchType); } curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + + if(pData->uid != NULL) { + snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, + (pData->pwd == NULL) ? "" : (char*)pData->pwd); + //TODO: create better code, check errors! + curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); + curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); + } DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); return RS_RET_OK; } @@ -276,6 +292,8 @@ setInstParamDefaults(instanceData *pData) { pData->server = NULL; pData->port = 9200; + pData->uid = NULL; + pData->pwd = NULL; pData->searchIndex = NULL; pData->searchType = NULL; pData->timeout = NULL; @@ -304,6 +322,10 @@ CODESTARTnewActInst pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "uid")) { + pData->uid = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "pwd")) { + pData->pwd = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "searchindex")) { pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { @@ -324,6 +346,12 @@ CODESTARTnewActInst } } + if(pData->pwd != NULL && pData->uid == NULL) { + errmsg.LogError(0, RS_RET_UID_MISSING, + "omelasticsearch: password is provided, but no uid " + "- action definition invalid"); + ABORT_FINALIZE(RS_RET_UID_MISSING); + } if(pData->dynSrchIdx && pData->searchIndex == NULL) { errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, "omelasticsearch: requested dynamic search index, but no " -- cgit v1.2.3 From a4c0d08c78ef27891b192973174b997a0c7c4aa1 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 24 May 2012 16:39:39 +0200 Subject: omelasticsearch: added transactional interface & better debug output --- plugins/omelasticsearch/omelasticsearch.c | 37 ++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index c20c67c1..7642d603 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -74,6 +74,7 @@ typedef struct _instanceData { uchar *timeout; sbool dynSrchIdx; sbool dynSrchType; + sbool bulkmode; sbool asyncRepl; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ @@ -91,6 +92,7 @@ static struct cnfparamdescr actpdescr[] = { { "searchtype", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, { "template", eCmdHdlrGetWord, 1 } @@ -131,7 +133,19 @@ ENDfreeInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo - dbgprintf("omelasticsearch, target server %s", pData->server); + dbgprintf("omelasticsearch\n"); + dbgprintf("\ttemplate='%s'\n", pData->tplName); + dbgprintf("\tserver='%s'\n", pData->server); + dbgprintf("\tserverport=%d\n", pData->port); + dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid); + dbgprintf("\tpwd=(%s configured)\n", pData->pwd == NULL ? "not " : ""); + dbgprintf("\tsearch index='%s'\n", pData->searchIndex); + dbgprintf("\tsearch index='%s'\n", pData->searchType); + dbgprintf("\ttimeout='%s'\n", pData->timeout); + dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx); + dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType); + dbgprintf("\tasync replication=%d\n", pData->asyncRepl); + dbgprintf("\tbulkmode=%d\n", pData->bulkmode); ENDdbgPrintInstInfo BEGINtryResume @@ -218,12 +232,30 @@ finalize_it: RETiRet; } +BEGINbeginTransaction +CODESTARTbeginTransaction +dbgprintf("omelasticsearch: beginTransaction\n"); +ENDbeginTransaction + + BEGINdoAction CODESTARTdoAction CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); finalize_it: ENDdoAction + +BEGINendTransaction +CODESTARTendTransaction +dbgprintf("elasticsearch: endTransaction\n"); +#if 0 + if(pData->offsSndBuf != 0) { + iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); + pData->offsSndBuf = 0; + } +#endif +ENDendTransaction + /* elasticsearch POST result string ... useful for debugging */ size_t curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) @@ -334,6 +366,8 @@ CODESTARTnewActInst pData->dynSrchIdx = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "bulkmode")) { + pData->bulkmode = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "timeout")) { pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "asyncrepl")) { @@ -434,6 +468,7 @@ CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt -- cgit v1.2.3 From c7ca67a37586164a028c52a4d0cd9328b09e8697 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 25 May 2012 08:03:19 +0200 Subject: omelasticsearch: test commit, first shot at bulk interface This obviously does not work correctly. So expect problems if you set bulkmode="on". --- plugins/omelasticsearch/omelasticsearch.c | 96 +++++++++++++++++++++++++++---- 1 file changed, 85 insertions(+), 11 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 7642d603..1705c605 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -76,6 +76,11 @@ typedef struct _instanceData { sbool dynSrchType; sbool bulkmode; sbool asyncRepl; + struct { + es_str_t *data; + uchar *currTpl1; + uchar *currTpl2; + } batch; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; @@ -160,7 +165,9 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) char authBuf[1024]; uchar *searchIndex; uchar *searchType; + uchar *bulkpart; + bulkpart = (pData->bulkmode) ? "/_bulk" : ""; if(pData->dynSrchIdx) { searchIndex = tpl1; if(pData->dynSrchType) @@ -176,18 +183,19 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) } if(pData->asyncRepl) { if(pData->timeout != NULL) { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" "replication=async&timeout=%s", pData->server, pData->port, searchIndex, searchType, - pData->timeout); + bulkpart, pData->timeout); } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" "replication=async", - pData->server, pData->port, searchIndex, searchType); + pData->server, pData->port, searchIndex, searchType, + bulkpart); } } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - pData->server, pData->port, searchIndex, searchType); + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s", + pData->server, pData->port, searchIndex, searchType, bulkpart); } curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); @@ -202,12 +210,44 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) return RS_RET_OK; } + +/* this method does not directly submit but builds a batch instead. It + * may submit, if we have dynamic index/type and the current type or + * index changes. + */ +static rsRetVal +buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) +{ + int length = strlen((char *)message); + int r; + DEFiRet; + +#if 0 + if(instance->dynSrchIdx || instance->dynSrchType) + CHKiRet(setCurlURL(instance, tpl1, tpl2)); +#endif + + //if(pData->batch.currTpl1 == NULL + r = es_addBuf(&pData->batch.data, "{\"index\":", sizeof("{\"index\":")-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); + if(r == 0) r = es_addBuf(&pData->batch.data, "}\n", sizeof("}\n")-1); + + if(r == 0) r = es_addBuf(&pData->batch.data, "{ \"field1\" : \"value1\" }\n", sizeof("{ \"field1\" : \"value1\" }\n")-1); + if(r != 0) { + DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); + ABORT_FINALIZE(RS_RET_ERR); + } + iRet = RS_RET_DEFER_COMMIT; + +finalize_it: + RETiRet; +} + static rsRetVal -curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) +curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar *tpl2) { CURLcode code; CURL *curl = instance->curlHandle; - int length = strlen((char *)message); DEFiRet; if(instance->dynSrchIdx || instance->dynSrchType) @@ -215,8 +255,9 @@ curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); +dbgprintf("curl_easy_perform result: %d\n", code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: @@ -235,25 +276,43 @@ finalize_it: BEGINbeginTransaction CODESTARTbeginTransaction dbgprintf("omelasticsearch: beginTransaction\n"); + if(!pData->bulkmode) { + FINALIZE; + } + + es_emptyStr(pData->batch.data); +finalize_it: ENDbeginTransaction BEGINdoAction CODESTARTdoAction - CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); + if(pData->bulkmode) { + CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2])); + } else { + CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), + ppString[1], ppString[2])); + } finalize_it: +dbgprintf("omelasticsearch: result doAction: %d\n", iRet); ENDdoAction BEGINendTransaction + char *cstr; CODESTARTendTransaction -dbgprintf("elasticsearch: endTransaction\n"); + cstr = es_str2cstr(pData->batch.data, NULL); + dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), + NULL, NULL)); #if 0 if(pData->offsSndBuf != 0) { iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); pData->offsSndBuf = 0; } #endif +finalize_it: + free(cstr); ENDendTransaction /* elasticsearch POST result string ... useful for debugging */ @@ -266,6 +325,11 @@ curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) static char ok[] = "{\"ok\":true,"; ASSERT(size == 1); +DBGPRINTF("omelasticsearch request: %s\n", jsonData); +DBGPRINTF("omelasticsearch result: "); +for (i = 0; i < nmemb; i++) + DBGPRINTF("%c", p[i]); +DBGPRINTF("\n"); if (size == 1 && nmemb > sizeof(ok)-1 && @@ -399,6 +463,16 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); } + if(pData->bulkmode) { + pData->batch.currTpl1 = NULL; + pData->batch.currTpl2 = NULL; + if((pData->batch.data = es_newStr(1024)) == NULL) { + DBGPRINTF("omelasticsearch: error creating batch string " + "turned off bulk mode\n"); + pData->bulkmode = 0; /* at least it works */ + } + } + iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; -- cgit v1.2.3 From 5f85909b17920172121d2ff8367c8185623f1409 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 25 May 2012 18:55:29 +0200 Subject: omelasticsearc: milestone, bulk insert basically works dynamic index&type is not yet used, but easy to add (did not manage to get it in today...) --- plugins/omelasticsearch/omelasticsearch.c | 115 +++++++++++++++++------------- 1 file changed, 65 insertions(+), 50 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 1705c605..704c9950 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -158,46 +158,67 @@ CODESTARTtryResume ENDtryResume +/* get the current index and type for this message */ +static inline void +getIndexAndType(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar **srchIndex, + uchar **srchType) +{ + if(pData->dynSrchIdx) { + *srchIndex = tpl1; + if(pData->dynSrchType) + *srchType = tpl2; + else + *srchType = pData->searchType; + } else { + *srchIndex = pData->searchIndex; + if(pData->dynSrchType) + *srchType = tpl1; + else + *srchType = pData->searchType; + } +} + + static rsRetVal setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) { - char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ char authBuf[1024]; + char portBuf[64]; + char *restURL; uchar *searchIndex; uchar *searchType; - uchar *bulkpart; + es_str_t *url; + int r; - bulkpart = (pData->bulkmode) ? "/_bulk" : ""; - if(pData->dynSrchIdx) { - searchIndex = tpl1; - if(pData->dynSrchType) - searchType = tpl2; - else - searchType = pData->searchType; + getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); + url = es_newStr(128); + snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + + r = es_addBuf(&url, "http://", sizeof("http://")-1); + if(r == 0) r = es_addBuf(&url, (char*)pData->server, strlen((char*)pData->server)); + if(r == 0) r = es_addChar(&url, ':'); + if(r == 0) r = es_addBuf(&url, portBuf, strlen(portBuf)); + if(r == 0) r = es_addChar(&url, '/'); + if(pData->bulkmode) { + if(r == 0) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); } else { - searchIndex = pData->searchIndex; - if(pData->dynSrchType) - searchType = tpl1; - else - searchType = pData->searchType; + if(r == 0) r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); + if(r == 0) r = es_addChar(&url, '/'); + if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); } + if(r == 0) r = es_addChar(&url, '?'); if(pData->asyncRepl) { - if(pData->timeout != NULL) { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" - "replication=async&timeout=%s", - pData->server, pData->port, searchIndex, searchType, - bulkpart, pData->timeout); - } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" - "replication=async", - pData->server, pData->port, searchIndex, searchType, - bulkpart); - } - } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s", - pData->server, pData->port, searchIndex, searchType, bulkpart); + if(r == 0) r = es_addBuf(&url, "replication=async&", + sizeof("replication=async&")-1); } + if(pData->timeout != NULL) { + if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1); + if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout)); + } + restURL = es_str2cstr(url, NULL); curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + es_deleteStr(url); + free(restURL); if(pData->uid != NULL) { snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -221,18 +242,20 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) int length = strlen((char *)message); int r; DEFiRet; - -#if 0 - if(instance->dynSrchIdx || instance->dynSrchType) - CHKiRet(setCurlURL(instance, tpl1, tpl2)); -#endif - - //if(pData->batch.currTpl1 == NULL - r = es_addBuf(&pData->batch.data, "{\"index\":", sizeof("{\"index\":")-1); +# define META_STRT "{\"index\":{\"_index\": \"" +# define META_TYPE "\",\"_type\":\"" +# define META_END "\"}}\n" + +#warning TODO: use dynamic index/type! + r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchIndex, + ustrlen(pData->searchIndex)); + if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchType, + ustrlen(pData->searchType)); + if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); - if(r == 0) r = es_addBuf(&pData->batch.data, "}\n", sizeof("}\n")-1); - - if(r == 0) r = es_addBuf(&pData->batch.data, "{ \"field1\" : \"value1\" }\n", sizeof("{ \"field1\" : \"value1\" }\n")-1); + if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); if(r != 0) { DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); ABORT_FINALIZE(RS_RET_ERR); @@ -257,7 +280,6 @@ curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); -dbgprintf("curl_easy_perform result: %d\n", code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: @@ -303,14 +325,7 @@ BEGINendTransaction CODESTARTendTransaction cstr = es_str2cstr(pData->batch.data, NULL); dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), - NULL, NULL)); -#if 0 - if(pData->offsSndBuf != 0) { - iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); - pData->offsSndBuf = 0; - } -#endif + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL)); finalize_it: free(cstr); ENDendTransaction @@ -369,8 +384,8 @@ curlSetup(instanceData *pData) pData->curlHandle = handle; pData->postHeader = header; - if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) { - /* in this case, we know no tpls are involved --> NULL OK! */ + if(pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0)) { + /* in this case, we know no tpls are involved in the request-->NULL OK! */ setCurlURL(pData, NULL, NULL); } -- cgit v1.2.3 From 0314f370a4c15ce2190febfccee6664ecccab788 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 28 May 2012 15:14:16 +0200 Subject: omelasticsearch: dyn index&type now also supported in bulk mode --- plugins/omelasticsearch/omelasticsearch.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 704c9950..a09851b1 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -241,18 +241,21 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) { int length = strlen((char *)message); int r; + uchar *searchIndex; + uchar *searchType; DEFiRet; # define META_STRT "{\"index\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" # define META_END "\"}}\n" #warning TODO: use dynamic index/type! + getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchIndex, - ustrlen(pData->searchIndex)); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, + ustrlen(searchIndex)); if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchType, - ustrlen(pData->searchType)); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, + ustrlen(searchType)); if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); -- cgit v1.2.3 From b0a3e85e102c3e549574bf8f418ca643109f2884 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 13 Jun 2012 12:49:25 +0200 Subject: omelasticsearch: bugfix: bulkmode setting was not properly initialized --- plugins/omelasticsearch/omelasticsearch.c | 1 + 1 file changed, 1 insertion(+) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index a09851b1..d8db7307 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -414,6 +414,7 @@ setInstParamDefaults(instanceData *pData) pData->dynSrchIdx = 0; pData->dynSrchType = 0; pData->asyncRepl = 0; + pData->bulkmode = 0; pData->tplName = NULL; } -- cgit v1.2.3 From ebaf375ed108b14c5a5a3af62067df988506bfec Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 9 Jul 2012 17:28:40 +0200 Subject: omelasticsearch: better debug instrumentation --- plugins/omelasticsearch/omelasticsearch.c | 48 +++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 15 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index d8db7307..c18c1c52 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -143,7 +143,7 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tserver='%s'\n", pData->server); dbgprintf("\tserverport=%d\n", pData->port); dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid); - dbgprintf("\tpwd=(%s configured)\n", pData->pwd == NULL ? "not " : ""); + dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : ""); dbgprintf("\tsearch index='%s'\n", pData->searchIndex); dbgprintf("\tsearch index='%s'\n", pData->searchType); dbgprintf("\ttimeout='%s'\n", pData->timeout); @@ -155,6 +155,7 @@ ENDdbgPrintInstInfo BEGINtryResume CODESTARTtryResume + DBGPRINTF("omelasticsearch: tryResume called\n"); ENDtryResume @@ -188,7 +189,9 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) uchar *searchIndex; uchar *searchType; es_str_t *url; + int rLocal; int r; + DEFiRet; getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); url = es_newStr(128); @@ -218,17 +221,23 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) restURL = es_str2cstr(url, NULL); curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); es_deleteStr(url); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); free(restURL); if(pData->uid != NULL) { - snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, - (pData->pwd == NULL) ? "" : (char*)pData->pwd); - //TODO: create better code, check errors! + rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, + (pData->pwd == NULL) ? "" : (char*)pData->pwd); + if(rLocal != (int) es_strlen(url)) { + errmsg.LogError(0, RS_RET_ERR, "omelasticsearch: snprintf failed " + "when trying to build auth string (return %d)\n", + rLocal); + ABORT_FINALIZE(RS_RET_ERR); + } curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); - return RS_RET_OK; +finalize_it: + RETiRet; } @@ -248,7 +257,6 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) # define META_TYPE "\",\"_type\":\"" # define META_END "\"}}\n" -#warning TODO: use dynamic index/type! getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, @@ -282,13 +290,18 @@ curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); +dbgprintf("omelasticsearch: do curl_easy_perform()\n"); code = curl_easy_perform(curl); +DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: case CURLE_COULDNT_CONNECT: case CURLE_WRITE_ERROR: STATSCOUNTER_INC(indexConFail, mutIndexConFail); + DBGPRINTF("omelasticsearch: we are suspending ourselfs due " + "to failure %lld of curl_easy_perform()\n", + (long long) code); return RS_RET_SUSPENDED; default: STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); @@ -315,22 +328,25 @@ CODESTARTdoAction if(pData->bulkmode) { CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2])); } else { +dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), ppString[1], ppString[2])); } finalize_it: -dbgprintf("omelasticsearch: result doAction: %d\n", iRet); +dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); ENDdoAction BEGINendTransaction char *cstr; CODESTARTendTransaction +dbgprintf("omelasticsearch: endTransaction init\n"); cstr = es_str2cstr(pData->batch.data, NULL); - dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); + dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL)); finalize_it: free(cstr); +dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); ENDendTransaction /* elasticsearch POST result string ... useful for debugging */ @@ -353,11 +369,13 @@ DBGPRINTF("\n"); nmemb > sizeof(ok)-1 && strncmp(p, ok, sizeof(ok)-1) == 0) { STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); +dbgprintf("omelasticsearch ok\n"); } else { +dbgprintf("omelasticsearch fail\n"); STATSCOUNTER_INC(indexFailed, mutIndexFailed); if (Debug) { - DBGPRINTF("omelasticsearch request: %s\n", jsonData); - DBGPRINTF("omelasticsearch result: "); + DBGPRINTF("omelasticsearch (fail) request: %s\n", jsonData); + DBGPRINTF("omelasticsearch (fail) result: "); for (i = 0; i < nmemb; i++) DBGPRINTF("%c", p[i]); DBGPRINTF("\n"); @@ -470,16 +488,16 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_UID_MISSING); } if(pData->dynSrchIdx && pData->searchIndex == NULL) { - errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omelasticsearch: requested dynamic search index, but no " "name for index template given - action definition invalid"); - ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } if(pData->dynSrchType && pData->searchType == NULL) { - errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omelasticsearch: requested dynamic search type, but no " "name for type template given - action definition invalid"); - ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } if(pData->bulkmode) { -- cgit v1.2.3 From 4d453967cbff1f09becab38a2ad10b05df476eaf Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 10 Jul 2012 12:10:34 +0200 Subject: omelasticsearch: implement retry via request to es server homepage --- plugins/omelasticsearch/omelasticsearch.c | 70 ++++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 10 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index c18c1c52..a1f3b8ab 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -153,9 +153,66 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tbulkmode=%d\n", pData->bulkmode); ENDdbgPrintInstInfo + +/* Build basic URL part, which includes hostname and port as follows: + * http://hostname:port/ + * Newly creates an estr for this purpose. + */ +static rsRetVal +setBaseURL(instanceData *pData, es_str_t **url) +{ + char portBuf[64]; + int r; + DEFiRet; + + *url = es_newStr(128); + snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + r = es_addBuf(url, "http://", sizeof("http://")-1); + if(r == 0) r = es_addBuf(url, (char*)pData->server, strlen((char*)pData->server)); + if(r == 0) r = es_addChar(url, ':'); + if(r == 0) r = es_addBuf(url, portBuf, strlen(portBuf)); + if(r == 0) r = es_addChar(url, '/'); + RETiRet; +} + + +static inline rsRetVal +checkConn(instanceData *pData) +{ + es_str_t *url; + CURL *curl = NULL; + CURLcode res; + char *cstr; + DEFiRet; + + setBaseURL(pData, &url); + curl = curl_easy_init(); + if(curl == NULL) { + DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + cstr = es_str2cstr(url, NULL); + curl_easy_setopt(curl, CURLOPT_URL, cstr); + free(cstr); + + res = curl_easy_perform(curl); + if(res != CURLE_OK) { + dbgprintf("omelasticsearch: checkConn() curl_easy_perform() " + "failed: %s\n", curl_easy_strerror(res)); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: + if(curl != NULL) + curl_easy_cleanup(curl); + RETiRet; +} + + BEGINtryResume CODESTARTtryResume DBGPRINTF("omelasticsearch: tryResume called\n"); + iRet = checkConn(pData); ENDtryResume @@ -184,7 +241,6 @@ static rsRetVal setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) { char authBuf[1024]; - char portBuf[64]; char *restURL; uchar *searchIndex; uchar *searchType; @@ -194,18 +250,12 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) DEFiRet; getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); - url = es_newStr(128); - snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + setBaseURL(pData, &url); - r = es_addBuf(&url, "http://", sizeof("http://")-1); - if(r == 0) r = es_addBuf(&url, (char*)pData->server, strlen((char*)pData->server)); - if(r == 0) r = es_addChar(&url, ':'); - if(r == 0) r = es_addBuf(&url, portBuf, strlen(portBuf)); - if(r == 0) r = es_addChar(&url, '/'); if(pData->bulkmode) { - if(r == 0) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); + r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); } else { - if(r == 0) r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); + r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addChar(&url, '/'); if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); } -- cgit v1.2.3 From 68056a6128b9ebc8d65791b2647030d36c73f014 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 10 Jul 2012 17:15:03 +0200 Subject: omelasticsearch: support for parameters parent & dynparent added --- plugins/omelasticsearch/omelasticsearch.c | 115 ++++++++++++++++++++++++------ 1 file changed, 95 insertions(+), 20 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index a1f3b8ab..5ddb66da 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -70,10 +70,12 @@ typedef struct _instanceData { uchar *pwd; uchar *searchIndex; uchar *searchType; + uchar *parent; uchar *tplName; uchar *timeout; sbool dynSrchIdx; sbool dynSrchType; + sbool dynParent; sbool bulkmode; sbool asyncRepl; struct { @@ -95,8 +97,10 @@ static struct cnfparamdescr actpdescr[] = { { "pwd", eCmdHdlrGetWord, 0 }, { "searchindex", eCmdHdlrGetWord, 0 }, { "searchtype", eCmdHdlrGetWord, 0 }, + { "parent", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "dynparent", eCmdHdlrBinary, 0 }, { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, @@ -133,6 +137,7 @@ CODESTARTfreeInstance free(pData->pwd); free(pData->searchIndex); free(pData->searchType); + free(pData->parent); free(pData->tplName); ENDfreeInstance @@ -146,9 +151,11 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : ""); dbgprintf("\tsearch index='%s'\n", pData->searchIndex); dbgprintf("\tsearch index='%s'\n", pData->searchType); + dbgprintf("\tparent='%s'\n", pData->parent); dbgprintf("\ttimeout='%s'\n", pData->timeout); dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx); dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType); + dbgprintf("\tdynamic parent=%d\n", pData->dynParent); dbgprintf("\tasync replication=%d\n", pData->asyncRepl); dbgprintf("\tbulkmode=%d\n", pData->bulkmode); ENDdbgPrintInstInfo @@ -197,10 +204,11 @@ checkConn(instanceData *pData) res = curl_easy_perform(curl); if(res != CURLE_OK) { - dbgprintf("omelasticsearch: checkConn() curl_easy_perform() " + DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " "failed: %s\n", curl_easy_strerror(res)); ABORT_FINALIZE(RS_RET_SUSPENDED); } + DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); finalize_it: if(curl != NULL) @@ -218,38 +226,62 @@ ENDtryResume /* get the current index and type for this message */ static inline void -getIndexAndType(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar **srchIndex, - uchar **srchType) +getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3, + uchar **srchIndex, uchar **srchType, uchar **parent) { if(pData->dynSrchIdx) { *srchIndex = tpl1; - if(pData->dynSrchType) + if(pData->dynSrchType) { *srchType = tpl2; - else + if(pData->dynParent) { + *parent = tpl3; + } else { + *parent = pData->parent; + } + } else { *srchType = pData->searchType; + if(pData->dynParent) { + *parent = tpl2; + } else { + *parent = pData->parent; + } + } } else { *srchIndex = pData->searchIndex; - if(pData->dynSrchType) + if(pData->dynSrchType) { *srchType = tpl1; - else + if(pData->dynParent) { + *parent = tpl2; + } else { + *parent = pData->parent; + } + } else { *srchType = pData->searchType; + if(pData->dynParent) { + *parent = tpl1; + } else { + *parent = pData->parent; + } + } } } static rsRetVal -setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) +setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3) { char authBuf[1024]; char *restURL; uchar *searchIndex; uchar *searchType; + uchar *parent; es_str_t *url; int rLocal; int r; DEFiRet; - getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); + getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, + &searchIndex, &searchType, &parent); setBaseURL(pData, &url); if(pData->bulkmode) { @@ -267,6 +299,11 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) if(pData->timeout != NULL) { if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1); if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout)); + if(r == 0) r = es_addChar(&url, '&'); + } + if(parent != NULL) { + if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1); + if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); } restURL = es_str2cstr(url, NULL); curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); @@ -296,24 +333,29 @@ finalize_it: * index changes. */ static rsRetVal -buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) +buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar *tpl3) { int length = strlen((char *)message); int r; uchar *searchIndex; uchar *searchType; + uchar *parent; DEFiRet; # define META_STRT "{\"index\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" +# define META_PARENT "\",\"_parent\":\"" # define META_END "\"}}\n" - getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); + getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, + &searchIndex, &searchType, &parent); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, ustrlen(searchType)); + if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); @@ -328,14 +370,15 @@ finalize_it: } static rsRetVal -curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar *tpl2) +curlPost(instanceData *instance, uchar *message, int msglen, + uchar *tpl1, uchar *tpl2, uchar *tpl3) { CURLcode code; CURL *curl = instance->curlHandle; DEFiRet; - if(instance->dynSrchIdx || instance->dynSrchType) - CHKiRet(setCurlURL(instance, tpl1, tpl2)); + if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent) + CHKiRet(setCurlURL(instance, tpl1, tpl2, tpl3)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); @@ -376,11 +419,11 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction if(pData->bulkmode) { - CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2])); + CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2], ppString[3])); } else { dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), - ppString[1], ppString[2])); + ppString[1], ppString[2], ppString[3])); } finalize_it: dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); @@ -393,7 +436,7 @@ CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); cstr = es_str2cstr(pData->batch.data, NULL); dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL)); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL, NULL)); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -455,13 +498,14 @@ curlSetup(instanceData *pData) pData->curlHandle = handle; pData->postHeader = header; - if(pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0)) { + if( pData->bulkmode + || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) { /* in this case, we know no tpls are involved in the request-->NULL OK! */ - setCurlURL(pData, NULL, NULL); + setCurlURL(pData, NULL, NULL, NULL); } if(Debug) { - if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0) dbgprintf("omelasticsearch setup, using static REST URL\n"); else dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n"); @@ -478,9 +522,11 @@ setInstParamDefaults(instanceData *pData) pData->pwd = NULL; pData->searchIndex = NULL; pData->searchType = NULL; + pData->parent = NULL; pData->timeout = NULL; pData->dynSrchIdx = 0; pData->dynSrchType = 0; + pData->dynParent = 0; pData->asyncRepl = 0; pData->bulkmode = 0; pData->tplName = NULL; @@ -513,10 +559,14 @@ CODESTARTnewActInst pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "parent")) { + pData->parent = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) { pData->dynSrchIdx = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynparent")) { + pData->dynParent = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "bulkmode")) { pData->bulkmode = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "timeout")) { @@ -549,6 +599,12 @@ CODESTARTnewActInst "name for type template given - action definition invalid"); ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } + if(pData->dynParent && pData->parent == NULL) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic parent, but no " + "name for parent template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } if(pData->bulkmode) { pData->batch.currTpl1 = NULL; @@ -563,6 +619,7 @@ CODESTARTnewActInst iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; + if(pData->dynParent) ++iNumTpls; DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) @@ -581,11 +638,29 @@ CODESTARTnewActInst if(pData->dynSrchType) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynSrchType) { CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } } } -- cgit v1.2.3 From fdbc4cb666b4fc92562ece1fba97227c40237e04 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 11 Jul 2012 17:12:34 +0200 Subject: omelasticsearch: regression from "parent" feature could case aborts this was not present in any released version --- plugins/omelasticsearch/omelasticsearch.c | 49 ++++++++++++++++--------------- 1 file changed, 26 insertions(+), 23 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 5ddb66da..f77caeca 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -226,22 +226,22 @@ ENDtryResume /* get the current index and type for this message */ static inline void -getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3, +getIndexTypeAndParent(instanceData *pData, uchar **tpls, uchar **srchIndex, uchar **srchType, uchar **parent) { if(pData->dynSrchIdx) { - *srchIndex = tpl1; + *srchIndex = tpls[1]; if(pData->dynSrchType) { - *srchType = tpl2; + *srchType = tpls[2]; if(pData->dynParent) { - *parent = tpl3; + *parent = tpls[3]; } else { *parent = pData->parent; } } else { *srchType = pData->searchType; if(pData->dynParent) { - *parent = tpl2; + *parent = tpls[2]; } else { *parent = pData->parent; } @@ -249,16 +249,16 @@ getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3 } else { *srchIndex = pData->searchIndex; if(pData->dynSrchType) { - *srchType = tpl1; + *srchType = tpls[1]; if(pData->dynParent) { - *parent = tpl2; + *parent = tpls[2]; } else { *parent = pData->parent; } } else { *srchType = pData->searchType; if(pData->dynParent) { - *parent = tpl1; + *parent = tpls[1]; } else { *parent = pData->parent; } @@ -268,7 +268,7 @@ getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3 static rsRetVal -setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3) +setCurlURL(instanceData *pData, uchar **tpls) { char authBuf[1024]; char *restURL; @@ -280,13 +280,13 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3) int r; DEFiRet; - getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, - &searchIndex, &searchType, &parent); setBaseURL(pData, &url); if(pData->bulkmode) { r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); + parent = NULL; } else { + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addChar(&url, '/'); if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); @@ -333,7 +333,7 @@ finalize_it: * index changes. */ static rsRetVal -buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar *tpl3) +buildBatch(instanceData *pData, uchar *message, uchar **tpls) { int length = strlen((char *)message); int r; @@ -346,16 +346,20 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar # define META_PARENT "\",\"_parent\":\"" # define META_END "\"}}\n" - getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, - &searchIndex, &searchType, &parent); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); +dbgprintf("AAA: searchIndex: '%s'\n", searchIndex); +dbgprintf("AAA: searchType: '%s'\n", searchType); +dbgprintf("AAA: parent: '%s'\n", parent); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, ustrlen(searchType)); - if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); + if(parent != NULL) { + if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); + } if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); @@ -370,15 +374,14 @@ finalize_it: } static rsRetVal -curlPost(instanceData *instance, uchar *message, int msglen, - uchar *tpl1, uchar *tpl2, uchar *tpl3) +curlPost(instanceData *instance, uchar *message, int msglen, uchar **tpls) { CURLcode code; CURL *curl = instance->curlHandle; DEFiRet; if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent) - CHKiRet(setCurlURL(instance, tpl1, tpl2, tpl3)); + CHKiRet(setCurlURL(instance, tpls)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); @@ -419,11 +422,11 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction if(pData->bulkmode) { - CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2], ppString[3])); + CHKiRet(buildBatch(pData, ppString[0], ppString)); } else { dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), - ppString[1], ppString[2], ppString[3])); + ppString)); } finalize_it: dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); @@ -436,7 +439,7 @@ CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); cstr = es_str2cstr(pData->batch.data, NULL); dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL, NULL)); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL)); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -501,7 +504,7 @@ curlSetup(instanceData *pData) if( pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) { /* in this case, we know no tpls are involved in the request-->NULL OK! */ - setCurlURL(pData, NULL, NULL, NULL); + setCurlURL(pData, NULL); } if(Debug) { -- cgit v1.2.3