From f8019d52f83884acb5e8f8755fe976d1592b4ccb Mon Sep 17 00:00:00 2001 From: Nathan Scott Date: Mon, 15 Aug 2011 22:22:13 +1000 Subject: add elasticsearch output module Add support for sending events to elasticsearch - a distributed, RESTful, search engine built on Lucene (www.elasticsearch.org). The output module is enabled via a configure option, and uses libcurl to send the messages from rsyslog to elasticsearch. This patch makes use of the earlier JSON quoting patch to ensure valid JSON strings are sent to the server. Signed-off-by: Nathan Scott --- plugins/omelasticsearch/omelasticsearch.c | 271 ++++++++++++++++++++++++++++++ 1 file changed, 271 insertions(+) create mode 100644 plugins/omelasticsearch/omelasticsearch.c (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c new file mode 100644 index 00000000..ce2e0c1f --- /dev/null +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -0,0 +1,271 @@ +/* omelasticsearch.c + * This is the http://www.elasticsearch.org/ output module. + * + * NOTE: read comments in module-template.h for more specifics! + * + * Copyright 2011 Nathan Scott. + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ +#include "config.h" +#include "rsyslog.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "statsobj.h" +#include "cfsysline.h" + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP + +/* internal structures */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(statsobj) + +statsobj_t *indexStats; +STATSCOUNTER_DEF(indexConFail, mutIndexConFail) +STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit) +STATSCOUNTER_DEF(indexFailed, mutIndexFailed) +STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) + +/* REST API for elasticsearch hits this URL: + * http://:// + */ +typedef struct curl_slist HEADER; +typedef struct _instanceData { + CURL *curlHandle; /* libcurl session handle */ + HEADER *postHeader; /* json POST request info */ +} instanceData; + +/* config variables */ +static int restPort = 9200; +static char *hostName = "localhost"; +static char *searchIndex = "system"; +static char *searchType = "events"; + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + +BEGINfreeInstance +CODESTARTfreeInstance + if (pData->postHeader) { + curl_slist_free_all(pData->postHeader); + pData->postHeader = NULL; + } + if (pData->curlHandle) { + curl_easy_cleanup(pData->curlHandle); + pData->curlHandle = NULL; + } +ENDfreeInstance + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + +BEGINtryResume +CODESTARTtryResume +ENDtryResume + +rsRetVal +curlPost(instanceData *instance, uchar *message) +{ + CURLcode code; + CURL *curl = instance->curlHandle; + int length = strlen((char *)message); + + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length); + code = curl_easy_perform(curl); + switch (code) { + case CURLE_COULDNT_RESOLVE_HOST: + case CURLE_COULDNT_RESOLVE_PROXY: + case CURLE_COULDNT_CONNECT: + case CURLE_WRITE_ERROR: + STATSCOUNTER_INC(indexConFail, mutIndexConFail); + return RS_RET_SUSPENDED; + default: + STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); + return RS_RET_OK; + } +} + +BEGINdoAction +CODESTARTdoAction + CHKiRet(curlPost(pData, ppString[0])); +finalize_it: +ENDdoAction + +/* elasticsearch POST result string ... useful for debugging */ +size_t +curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) +{ + unsigned int i; + char *p = (char *)ptr; + char *jsonData = (char *)userdata; + static char ok[] = "{\"ok\":true,"; + + ASSERT(size == 1); + + if (size == 1 && + nmemb > sizeof(ok)-1 && + strncmp(p, ok, sizeof(ok)-1) == 0) { + STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); + } else { + STATSCOUNTER_INC(indexFailed, mutIndexFailed); + if (Debug) { + DBGPRINTF("omelasticsearch request: %s\n", jsonData); + DBGPRINTF("omelasticsearch result: "); + for (i = 0; i < nmemb; i++) + DBGPRINTF("%c", p[i]); + DBGPRINTF("\n"); + } + } + return size * nmemb; +} + +static rsRetVal +curlSetup(instanceData *instance) +{ + char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + HEADER *header; + CURL *handle; + + handle = curl_easy_init(); + if (handle == NULL) { + return RS_RET_OBJ_CREATION_FAILED; + } + + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + hostName, restPort, searchIndex, searchType); + header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + + curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); + curl_easy_setopt(handle, CURLOPT_URL, restURL); + curl_easy_setopt(handle, CURLOPT_POST, 1); + + instance->curlHandle = handle; + instance->postHeader = header; + + DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL); + return RS_RET_OK; +} + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); + } + p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + + /* check if a non-standard template is to be applied */ + if(*(p-1) == ';') + --p; + CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt")); + + /* all good, we can now initialise our private data */ + CHKiRet(curlSetup(pData)); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + +BEGINmodExit +CODESTARTmodExit + curl_global_cleanup(); + statsobj.Destruct(&indexStats); + objRelease(errmsg, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); +ENDmodExit + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + DEFiRet; + restPort = 9200; + hostName = "localhost"; + searchIndex = "system"; + searchType = "events"; + RETiRet; +} + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + + /* register config file handlers */ + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); + + if (curl_global_init(CURL_GLOBAL_ALL) != 0) { + errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled"); + ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); + } + + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&indexStats)); + CHKiRet(statsobj.SetName(indexStats, (uchar *)"elasticsearch")); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"connfail", + ctrType_IntCtr, &indexConFail)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submits", + ctrType_IntCtr, &indexSubmit)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed", + ctrType_IntCtr, &indexFailed)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"success", + ctrType_IntCtr, &indexSuccess)); + CHKiRet(statsobj.ConstructFinalize(indexStats)); +ENDmodInit + +/* vi:set ai: + */ -- cgit v1.2.3 From 9911dacf59e898bae2c8c6619b85348bf54ddc23 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 17 Jan 2012 10:42:04 +0100 Subject: elasticsearch: move to asl 2.0 email conversation Nathan/Rainer 2012-01-16&17 --- plugins/omelasticsearch/omelasticsearch.c | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index ce2e0c1f..3bec1838 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -8,20 +8,19 @@ * * This file is part of rsyslog. * - * Rsyslog is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Rsyslog is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Rsyslog. If not, see . - * - * A copy of the GPL can be found in the file "COPYING" in this distribution. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ #include "config.h" #include "rsyslog.h" -- cgit v1.2.3 From 24179166f748464f4d556a80bd0c8dcc4cbd57b0 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 5 Apr 2012 17:26:16 +0200 Subject: omelasticsearch: moved to v6 config system --- plugins/omelasticsearch/omelasticsearch.c | 126 +++++++++++++++++++++--------- 1 file changed, 91 insertions(+), 35 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 3bec1838..8965d40b 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -29,7 +29,7 @@ #include #include #include -#include +//#include #include #include #include @@ -46,6 +46,7 @@ MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omelasticsearch") /* internal structures */ DEF_OMOD_STATIC_DATA @@ -63,15 +64,30 @@ STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) */ typedef struct curl_slist HEADER; typedef struct _instanceData { + uchar *server; + int port; + uchar *searchIndex; + uchar *searchType; + uchar *tplName; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; -/* config variables */ -static int restPort = 9200; -static char *hostName = "localhost"; -static char *searchIndex = "system"; -static char *searchType = "events"; + +/* tables for interfacing with the v6 config system */ +/* action (instance) parameters */ +static struct cnfparamdescr actpdescr[] = { + { "server", eCmdHdlrGetWord, 0 }, + { "serverport", eCmdHdlrInt, 0 }, + { "searchindex", eCmdHdlrGetWord, 0 }, + { "searchtype", eCmdHdlrGetWord, 0 }, + { "template", eCmdHdlrGetWord, 1 } +}; +static struct cnfparamblk actpblk = + { CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr + }; BEGINcreateInstance CODESTARTcreateInstance @@ -93,6 +109,10 @@ CODESTARTfreeInstance curl_easy_cleanup(pData->curlHandle); pData->curlHandle = NULL; } + free(pData->server); + free(pData->searchIndex); + free(pData->searchType); + free(pData->tplName); ENDfreeInstance BEGINdbgPrintInstInfo @@ -174,7 +194,7 @@ curlSetup(instanceData *instance) } snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - hostName, restPort, searchIndex, searchType); + instance->server, instance->port, instance->searchIndex, instance->searchType); header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); @@ -189,25 +209,77 @@ curlSetup(instanceData *instance) return RS_RET_OK; } -BEGINparseSelectorAct -CODESTARTparseSelectorAct -CODE_STD_STRING_REQUESTparseSelectorAct(1) - if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { - ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); +static inline void +setInstParamDefaults(instanceData *pData) +{ + pData->server = NULL; + pData->port = 9200; + pData->searchIndex = NULL; + pData->searchType = NULL; + pData->tplName = NULL; +} + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; +CODESTARTnewActInst + if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } - p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); + + CODE_STD_STRING_REQUESTparseSelectorAct(1) + for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "server")) { + pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "serverport")) { + pData->port = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "searchIndex")) { + pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "searchType")) { + pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + dbgprintf("omelasticsearch: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + } + + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? + " StdJSONFmt" : (char*)pData->tplName), + OMSR_NO_RQD_TPL_OPTS)); - /* check if a non-standard template is to be applied */ - if(*(p-1) == ';') - --p; - CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt")); + if(pData->server == NULL) + pData->server = (uchar*) strdup("localhost"); + if(pData->searchIndex == NULL) + pData->searchIndex = (uchar*) strdup("system"); + if(pData->searchType == NULL) + pData->searchType = (uchar*) strdup("events"); - /* all good, we can now initialise our private data */ +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); CHKiRet(curlSetup(pData)); +ENDnewActInst + + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(!strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch supports only v6 config format, use: " + "action(type=\"omelasticsearch\" server=...)"); + } + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct + BEGINmodExit CODESTARTmodExit curl_global_cleanup(); @@ -220,18 +292,9 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt -static rsRetVal -resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) -{ - DEFiRet; - restPort = 9200; - hostName = "localhost"; - searchIndex = "system"; - searchType = "events"; - RETiRet; -} BEGINmodInit() CODESTARTmodInit @@ -240,13 +303,6 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); - /* register config file handlers */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); - if (curl_global_init(CURL_GLOBAL_ALL) != 0) { errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled"); ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); -- cgit v1.2.3 From f9bb245a4ae06151897a4a68d6c22ba88a6ecee8 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 13 Apr 2012 12:12:39 +0200 Subject: omelasticsearch: permit dynamic index/type parameters (just like dynafile) --- plugins/omelasticsearch/omelasticsearch.c | 123 ++++++++++++++++++++++++++---- 1 file changed, 107 insertions(+), 16 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 8965d40b..1143de06 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -43,6 +43,7 @@ #include "errmsg.h" #include "statsobj.h" #include "cfsysline.h" +#include "unicode-helper.h" MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP @@ -69,6 +70,8 @@ typedef struct _instanceData { uchar *searchIndex; uchar *searchType; uchar *tplName; + sbool dynSrchIdx; + sbool dynSrchType; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; @@ -81,6 +84,8 @@ static struct cnfparamdescr actpdescr[] = { { "serverport", eCmdHdlrInt, 0 }, { "searchindex", eCmdHdlrGetWord, 0 }, { "searchtype", eCmdHdlrGetWord, 0 }, + { "dynsearchindex", eCmdHdlrBinary, 0 }, + { "dynsearchtype", eCmdHdlrBinary, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = @@ -123,12 +128,44 @@ BEGINtryResume CODESTARTtryResume ENDtryResume -rsRetVal -curlPost(instanceData *instance, uchar *message) + +static rsRetVal +setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) +{ + char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + uchar *searchIndex; + uchar *searchType; + + if(pData->dynSrchIdx) { + searchIndex = tpl1; + if(pData->dynSrchType) + searchType = tpl2; + else + searchType = pData->searchType; + } else { + searchIndex = pData->searchIndex; + if(pData->dynSrchType) + searchType = tpl1; + else + searchType = pData->searchType; + } + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + pData->server, pData->port, searchIndex, searchType); + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); + return RS_RET_OK; +} + +static rsRetVal +curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) { CURLcode code; CURL *curl = instance->curlHandle; int length = strlen((char *)message); + DEFiRet; + + if(instance->dynSrchIdx || instance->dynSrchType) + CHKiRet(setCurlURL(instance, tpl1, tpl2)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); @@ -145,11 +182,13 @@ curlPost(instanceData *instance, uchar *message) STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); return RS_RET_OK; } +finalize_it: + RETiRet; } BEGINdoAction CODESTARTdoAction - CHKiRet(curlPost(pData, ppString[0])); + CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); finalize_it: ENDdoAction @@ -181,10 +220,10 @@ curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) return size * nmemb; } + static rsRetVal -curlSetup(instanceData *instance) +curlSetup(instanceData *pData) { - char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ HEADER *header; CURL *handle; @@ -193,19 +232,26 @@ curlSetup(instanceData *instance) return RS_RET_OBJ_CREATION_FAILED; } - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - instance->server, instance->port, instance->searchIndex, instance->searchType); header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); - curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); - curl_easy_setopt(handle, CURLOPT_URL, restURL); curl_easy_setopt(handle, CURLOPT_POST, 1); - instance->curlHandle = handle; - instance->postHeader = header; + pData->curlHandle = handle; + pData->postHeader = header; - DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL); + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) { + /* in this case, we know no tpls are involved --> NULL OK! */ + setCurlURL(pData, NULL, NULL); + } + + if(Debug) { + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) + dbgprintf("omelasticsearch setup, using static REST URL\n"); + else + dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n"); + } return RS_RET_OK; } @@ -216,12 +262,15 @@ setInstParamDefaults(instanceData *pData) pData->port = 9200; pData->searchIndex = NULL; pData->searchType = NULL; + pData->dynSrchIdx = 0; + pData->dynSrchType = 0; pData->tplName = NULL; } BEGINnewActInst struct cnfparamvals *pvals; int i; + int iNumTpls; CODESTARTnewActInst if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); @@ -230,7 +279,6 @@ CODESTARTnewActInst CHKiRet(createInstance(&pData)); setInstParamDefaults(pData); - CODE_STD_STRING_REQUESTparseSelectorAct(1) for(i = 0 ; i < actpblk.nParams ; ++i) { if(!pvals[i].bUsed) continue; @@ -238,10 +286,14 @@ CODESTARTnewActInst pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; - } else if(!strcmp(actpblk.descr[i].name, "searchIndex")) { + } else if(!strcmp(actpblk.descr[i].name, "searchindex")) { pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); - } else if(!strcmp(actpblk.descr[i].name, "searchType")) { + } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) { + pData->dynSrchIdx = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { + pData->dynSrchType = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { @@ -249,11 +301,49 @@ CODESTARTnewActInst "param '%s'\n", actpblk.descr[i].name); } } + + if(pData->dynSrchIdx && pData->searchIndex == NULL) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch: requested dynamic search index, but no " + "name for index template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + } + if(pData->dynSrchType && pData->searchType == NULL) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch: requested dynamic search type, but no " + "name for type template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + } + + iNumTpls = 1; + if(pData->dynSrchIdx) ++iNumTpls; + if(pData->dynSrchType) ++iNumTpls; + DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); + CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? " StdJSONFmt" : (char*)pData->tplName), OMSR_NO_RQD_TPL_OPTS)); + + /* we need to request additional templates. If we have a dynamic search index, + * it will always be string 1. Type may be 1 or 2, depending on whether search + * index is dynamic as well. Rule needs to be followed throughout the module. + */ + if(pData->dynSrchIdx) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchIndex), + OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynSrchType) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynSrchType) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType), + OMSR_NO_RQD_TPL_OPTS)); + } + } + if(pData->server == NULL) pData->server = (uchar*) strdup("localhost"); if(pData->searchIndex == NULL) @@ -261,9 +351,10 @@ CODESTARTnewActInst if(pData->searchType == NULL) pData->searchType = (uchar*) strdup("events"); + CHKiRet(curlSetup(pData)); + CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); - CHKiRet(curlSetup(pData)); ENDnewActInst -- cgit v1.2.3 From 7d8ac48a4a214da917f7455f24b4c292c3e6d015 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 16 Apr 2012 16:49:07 +0200 Subject: omelasticsearch: added asnycRepl and timeout config params --- plugins/omelasticsearch/omelasticsearch.c | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 1143de06..0b2e0bf1 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -4,7 +4,7 @@ * NOTE: read comments in module-template.h for more specifics! * * Copyright 2011 Nathan Scott. - * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2009-2012 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -70,8 +70,10 @@ typedef struct _instanceData { uchar *searchIndex; uchar *searchType; uchar *tplName; + uchar *timeout; sbool dynSrchIdx; sbool dynSrchType; + sbool asyncRepl; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; @@ -86,6 +88,8 @@ static struct cnfparamdescr actpdescr[] = { { "searchtype", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "asyncrepl", eCmdHdlrBinary, 0 }, + { "timeout", eCmdHdlrGetWord, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = @@ -149,8 +153,21 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) else searchType = pData->searchType; } - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - pData->server, pData->port, searchIndex, searchType); + if(pData->asyncRepl) { + if(pData->timeout != NULL) { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + "replication=async&timeout=%s", + pData->server, pData->port, searchIndex, searchType, + pData->timeout); + } else { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + "replication=async", + pData->server, pData->port, searchIndex, searchType); + } + } else { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + pData->server, pData->port, searchIndex, searchType); + } curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); return RS_RET_OK; @@ -262,8 +279,10 @@ setInstParamDefaults(instanceData *pData) pData->port = 9200; pData->searchIndex = NULL; pData->searchType = NULL; + pData->timeout = NULL; pData->dynSrchIdx = 0; pData->dynSrchType = 0; + pData->asyncRepl = 0; pData->tplName = NULL; } @@ -294,6 +313,10 @@ CODESTARTnewActInst pData->dynSrchIdx = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "timeout")) { + pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "asyncrepl")) { + pData->asyncRepl = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { -- cgit v1.2.3 From 1acde66c3274c57d084146621d4124eaaef9ccf9 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 16 Apr 2012 17:46:46 +0200 Subject: make "old" omelasticsearch compile this module is to be replaced by branch currently in development. But I want to be able to compile the previous one, at least ;) --- plugins/omelasticsearch/omelasticsearch.c | 1 - 1 file changed, 1 deletion(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 3bec1838..2b451eb4 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include -- cgit v1.2.3 From 54966fd878bcf1c52019e0ec977da4d7b0a9f52a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 25 Apr 2012 15:05:01 +0200 Subject: omelasticsearch: provide authentication support (UNTESTED) --- plugins/omelasticsearch/omelasticsearch.c | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 79a24ffe..c20c67c1 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -66,6 +66,8 @@ typedef struct curl_slist HEADER; typedef struct _instanceData { uchar *server; int port; + uchar *uid; + uchar *pwd; uchar *searchIndex; uchar *searchType; uchar *tplName; @@ -83,6 +85,8 @@ typedef struct _instanceData { static struct cnfparamdescr actpdescr[] = { { "server", eCmdHdlrGetWord, 0 }, { "serverport", eCmdHdlrInt, 0 }, + { "uid", eCmdHdlrGetWord, 0 }, + { "pwd", eCmdHdlrGetWord, 0 }, { "searchindex", eCmdHdlrGetWord, 0 }, { "searchtype", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, @@ -118,6 +122,8 @@ CODESTARTfreeInstance pData->curlHandle = NULL; } free(pData->server); + free(pData->uid); + free(pData->pwd); free(pData->searchIndex); free(pData->searchType); free(pData->tplName); @@ -125,6 +131,7 @@ ENDfreeInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo + dbgprintf("omelasticsearch, target server %s", pData->server); ENDdbgPrintInstInfo BEGINtryResume @@ -136,6 +143,7 @@ static rsRetVal setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) { char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + char authBuf[1024]; uchar *searchIndex; uchar *searchType; @@ -168,6 +176,14 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) pData->server, pData->port, searchIndex, searchType); } curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + + if(pData->uid != NULL) { + snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, + (pData->pwd == NULL) ? "" : (char*)pData->pwd); + //TODO: create better code, check errors! + curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); + curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); + } DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); return RS_RET_OK; } @@ -276,6 +292,8 @@ setInstParamDefaults(instanceData *pData) { pData->server = NULL; pData->port = 9200; + pData->uid = NULL; + pData->pwd = NULL; pData->searchIndex = NULL; pData->searchType = NULL; pData->timeout = NULL; @@ -304,6 +322,10 @@ CODESTARTnewActInst pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "uid")) { + pData->uid = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "pwd")) { + pData->pwd = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "searchindex")) { pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { @@ -324,6 +346,12 @@ CODESTARTnewActInst } } + if(pData->pwd != NULL && pData->uid == NULL) { + errmsg.LogError(0, RS_RET_UID_MISSING, + "omelasticsearch: password is provided, but no uid " + "- action definition invalid"); + ABORT_FINALIZE(RS_RET_UID_MISSING); + } if(pData->dynSrchIdx && pData->searchIndex == NULL) { errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, "omelasticsearch: requested dynamic search index, but no " -- cgit v1.2.3 From a4c0d08c78ef27891b192973174b997a0c7c4aa1 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 24 May 2012 16:39:39 +0200 Subject: omelasticsearch: added transactional interface & better debug output --- plugins/omelasticsearch/omelasticsearch.c | 37 ++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index c20c67c1..7642d603 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -74,6 +74,7 @@ typedef struct _instanceData { uchar *timeout; sbool dynSrchIdx; sbool dynSrchType; + sbool bulkmode; sbool asyncRepl; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ @@ -91,6 +92,7 @@ static struct cnfparamdescr actpdescr[] = { { "searchtype", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, { "template", eCmdHdlrGetWord, 1 } @@ -131,7 +133,19 @@ ENDfreeInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo - dbgprintf("omelasticsearch, target server %s", pData->server); + dbgprintf("omelasticsearch\n"); + dbgprintf("\ttemplate='%s'\n", pData->tplName); + dbgprintf("\tserver='%s'\n", pData->server); + dbgprintf("\tserverport=%d\n", pData->port); + dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid); + dbgprintf("\tpwd=(%s configured)\n", pData->pwd == NULL ? "not " : ""); + dbgprintf("\tsearch index='%s'\n", pData->searchIndex); + dbgprintf("\tsearch index='%s'\n", pData->searchType); + dbgprintf("\ttimeout='%s'\n", pData->timeout); + dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx); + dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType); + dbgprintf("\tasync replication=%d\n", pData->asyncRepl); + dbgprintf("\tbulkmode=%d\n", pData->bulkmode); ENDdbgPrintInstInfo BEGINtryResume @@ -218,12 +232,30 @@ finalize_it: RETiRet; } +BEGINbeginTransaction +CODESTARTbeginTransaction +dbgprintf("omelasticsearch: beginTransaction\n"); +ENDbeginTransaction + + BEGINdoAction CODESTARTdoAction CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); finalize_it: ENDdoAction + +BEGINendTransaction +CODESTARTendTransaction +dbgprintf("elasticsearch: endTransaction\n"); +#if 0 + if(pData->offsSndBuf != 0) { + iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); + pData->offsSndBuf = 0; + } +#endif +ENDendTransaction + /* elasticsearch POST result string ... useful for debugging */ size_t curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) @@ -334,6 +366,8 @@ CODESTARTnewActInst pData->dynSrchIdx = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "bulkmode")) { + pData->bulkmode = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "timeout")) { pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "asyncrepl")) { @@ -434,6 +468,7 @@ CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt -- cgit v1.2.3 From c7ca67a37586164a028c52a4d0cd9328b09e8697 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 25 May 2012 08:03:19 +0200 Subject: omelasticsearch: test commit, first shot at bulk interface This obviously does not work correctly. So expect problems if you set bulkmode="on". --- plugins/omelasticsearch/omelasticsearch.c | 96 +++++++++++++++++++++++++++---- 1 file changed, 85 insertions(+), 11 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 7642d603..1705c605 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -76,6 +76,11 @@ typedef struct _instanceData { sbool dynSrchType; sbool bulkmode; sbool asyncRepl; + struct { + es_str_t *data; + uchar *currTpl1; + uchar *currTpl2; + } batch; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; @@ -160,7 +165,9 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) char authBuf[1024]; uchar *searchIndex; uchar *searchType; + uchar *bulkpart; + bulkpart = (pData->bulkmode) ? "/_bulk" : ""; if(pData->dynSrchIdx) { searchIndex = tpl1; if(pData->dynSrchType) @@ -176,18 +183,19 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) } if(pData->asyncRepl) { if(pData->timeout != NULL) { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" "replication=async&timeout=%s", pData->server, pData->port, searchIndex, searchType, - pData->timeout); + bulkpart, pData->timeout); } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" "replication=async", - pData->server, pData->port, searchIndex, searchType); + pData->server, pData->port, searchIndex, searchType, + bulkpart); } } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - pData->server, pData->port, searchIndex, searchType); + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s", + pData->server, pData->port, searchIndex, searchType, bulkpart); } curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); @@ -202,12 +210,44 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) return RS_RET_OK; } + +/* this method does not directly submit but builds a batch instead. It + * may submit, if we have dynamic index/type and the current type or + * index changes. + */ +static rsRetVal +buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) +{ + int length = strlen((char *)message); + int r; + DEFiRet; + +#if 0 + if(instance->dynSrchIdx || instance->dynSrchType) + CHKiRet(setCurlURL(instance, tpl1, tpl2)); +#endif + + //if(pData->batch.currTpl1 == NULL + r = es_addBuf(&pData->batch.data, "{\"index\":", sizeof("{\"index\":")-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); + if(r == 0) r = es_addBuf(&pData->batch.data, "}\n", sizeof("}\n")-1); + + if(r == 0) r = es_addBuf(&pData->batch.data, "{ \"field1\" : \"value1\" }\n", sizeof("{ \"field1\" : \"value1\" }\n")-1); + if(r != 0) { + DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); + ABORT_FINALIZE(RS_RET_ERR); + } + iRet = RS_RET_DEFER_COMMIT; + +finalize_it: + RETiRet; +} + static rsRetVal -curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) +curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar *tpl2) { CURLcode code; CURL *curl = instance->curlHandle; - int length = strlen((char *)message); DEFiRet; if(instance->dynSrchIdx || instance->dynSrchType) @@ -215,8 +255,9 @@ curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); +dbgprintf("curl_easy_perform result: %d\n", code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: @@ -235,25 +276,43 @@ finalize_it: BEGINbeginTransaction CODESTARTbeginTransaction dbgprintf("omelasticsearch: beginTransaction\n"); + if(!pData->bulkmode) { + FINALIZE; + } + + es_emptyStr(pData->batch.data); +finalize_it: ENDbeginTransaction BEGINdoAction CODESTARTdoAction - CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); + if(pData->bulkmode) { + CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2])); + } else { + CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), + ppString[1], ppString[2])); + } finalize_it: +dbgprintf("omelasticsearch: result doAction: %d\n", iRet); ENDdoAction BEGINendTransaction + char *cstr; CODESTARTendTransaction -dbgprintf("elasticsearch: endTransaction\n"); + cstr = es_str2cstr(pData->batch.data, NULL); + dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), + NULL, NULL)); #if 0 if(pData->offsSndBuf != 0) { iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); pData->offsSndBuf = 0; } #endif +finalize_it: + free(cstr); ENDendTransaction /* elasticsearch POST result string ... useful for debugging */ @@ -266,6 +325,11 @@ curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) static char ok[] = "{\"ok\":true,"; ASSERT(size == 1); +DBGPRINTF("omelasticsearch request: %s\n", jsonData); +DBGPRINTF("omelasticsearch result: "); +for (i = 0; i < nmemb; i++) + DBGPRINTF("%c", p[i]); +DBGPRINTF("\n"); if (size == 1 && nmemb > sizeof(ok)-1 && @@ -399,6 +463,16 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); } + if(pData->bulkmode) { + pData->batch.currTpl1 = NULL; + pData->batch.currTpl2 = NULL; + if((pData->batch.data = es_newStr(1024)) == NULL) { + DBGPRINTF("omelasticsearch: error creating batch string " + "turned off bulk mode\n"); + pData->bulkmode = 0; /* at least it works */ + } + } + iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; -- cgit v1.2.3 From 5f85909b17920172121d2ff8367c8185623f1409 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 25 May 2012 18:55:29 +0200 Subject: omelasticsearc: milestone, bulk insert basically works dynamic index&type is not yet used, but easy to add (did not manage to get it in today...) --- plugins/omelasticsearch/omelasticsearch.c | 115 +++++++++++++++++------------- 1 file changed, 65 insertions(+), 50 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 1705c605..704c9950 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -158,46 +158,67 @@ CODESTARTtryResume ENDtryResume +/* get the current index and type for this message */ +static inline void +getIndexAndType(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar **srchIndex, + uchar **srchType) +{ + if(pData->dynSrchIdx) { + *srchIndex = tpl1; + if(pData->dynSrchType) + *srchType = tpl2; + else + *srchType = pData->searchType; + } else { + *srchIndex = pData->searchIndex; + if(pData->dynSrchType) + *srchType = tpl1; + else + *srchType = pData->searchType; + } +} + + static rsRetVal setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) { - char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ char authBuf[1024]; + char portBuf[64]; + char *restURL; uchar *searchIndex; uchar *searchType; - uchar *bulkpart; + es_str_t *url; + int r; - bulkpart = (pData->bulkmode) ? "/_bulk" : ""; - if(pData->dynSrchIdx) { - searchIndex = tpl1; - if(pData->dynSrchType) - searchType = tpl2; - else - searchType = pData->searchType; + getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); + url = es_newStr(128); + snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + + r = es_addBuf(&url, "http://", sizeof("http://")-1); + if(r == 0) r = es_addBuf(&url, (char*)pData->server, strlen((char*)pData->server)); + if(r == 0) r = es_addChar(&url, ':'); + if(r == 0) r = es_addBuf(&url, portBuf, strlen(portBuf)); + if(r == 0) r = es_addChar(&url, '/'); + if(pData->bulkmode) { + if(r == 0) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); } else { - searchIndex = pData->searchIndex; - if(pData->dynSrchType) - searchType = tpl1; - else - searchType = pData->searchType; + if(r == 0) r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); + if(r == 0) r = es_addChar(&url, '/'); + if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); } + if(r == 0) r = es_addChar(&url, '?'); if(pData->asyncRepl) { - if(pData->timeout != NULL) { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" - "replication=async&timeout=%s", - pData->server, pData->port, searchIndex, searchType, - bulkpart, pData->timeout); - } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" - "replication=async", - pData->server, pData->port, searchIndex, searchType, - bulkpart); - } - } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s", - pData->server, pData->port, searchIndex, searchType, bulkpart); + if(r == 0) r = es_addBuf(&url, "replication=async&", + sizeof("replication=async&")-1); } + if(pData->timeout != NULL) { + if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1); + if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout)); + } + restURL = es_str2cstr(url, NULL); curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + es_deleteStr(url); + free(restURL); if(pData->uid != NULL) { snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -221,18 +242,20 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) int length = strlen((char *)message); int r; DEFiRet; - -#if 0 - if(instance->dynSrchIdx || instance->dynSrchType) - CHKiRet(setCurlURL(instance, tpl1, tpl2)); -#endif - - //if(pData->batch.currTpl1 == NULL - r = es_addBuf(&pData->batch.data, "{\"index\":", sizeof("{\"index\":")-1); +# define META_STRT "{\"index\":{\"_index\": \"" +# define META_TYPE "\",\"_type\":\"" +# define META_END "\"}}\n" + +#warning TODO: use dynamic index/type! + r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchIndex, + ustrlen(pData->searchIndex)); + if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchType, + ustrlen(pData->searchType)); + if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); - if(r == 0) r = es_addBuf(&pData->batch.data, "}\n", sizeof("}\n")-1); - - if(r == 0) r = es_addBuf(&pData->batch.data, "{ \"field1\" : \"value1\" }\n", sizeof("{ \"field1\" : \"value1\" }\n")-1); + if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); if(r != 0) { DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); ABORT_FINALIZE(RS_RET_ERR); @@ -257,7 +280,6 @@ curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); -dbgprintf("curl_easy_perform result: %d\n", code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: @@ -303,14 +325,7 @@ BEGINendTransaction CODESTARTendTransaction cstr = es_str2cstr(pData->batch.data, NULL); dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), - NULL, NULL)); -#if 0 - if(pData->offsSndBuf != 0) { - iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); - pData->offsSndBuf = 0; - } -#endif + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL)); finalize_it: free(cstr); ENDendTransaction @@ -369,8 +384,8 @@ curlSetup(instanceData *pData) pData->curlHandle = handle; pData->postHeader = header; - if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) { - /* in this case, we know no tpls are involved --> NULL OK! */ + if(pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0)) { + /* in this case, we know no tpls are involved in the request-->NULL OK! */ setCurlURL(pData, NULL, NULL); } -- cgit v1.2.3 From 0314f370a4c15ce2190febfccee6664ecccab788 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 28 May 2012 15:14:16 +0200 Subject: omelasticsearch: dyn index&type now also supported in bulk mode --- plugins/omelasticsearch/omelasticsearch.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 704c9950..a09851b1 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -241,18 +241,21 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) { int length = strlen((char *)message); int r; + uchar *searchIndex; + uchar *searchType; DEFiRet; # define META_STRT "{\"index\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" # define META_END "\"}}\n" #warning TODO: use dynamic index/type! + getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchIndex, - ustrlen(pData->searchIndex)); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, + ustrlen(searchIndex)); if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchType, - ustrlen(pData->searchType)); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, + ustrlen(searchType)); if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); -- cgit v1.2.3 From b0a3e85e102c3e549574bf8f418ca643109f2884 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 13 Jun 2012 12:49:25 +0200 Subject: omelasticsearch: bugfix: bulkmode setting was not properly initialized --- plugins/omelasticsearch/omelasticsearch.c | 1 + 1 file changed, 1 insertion(+) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index a09851b1..d8db7307 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -414,6 +414,7 @@ setInstParamDefaults(instanceData *pData) pData->dynSrchIdx = 0; pData->dynSrchType = 0; pData->asyncRepl = 0; + pData->bulkmode = 0; pData->tplName = NULL; } -- cgit v1.2.3 From ebaf375ed108b14c5a5a3af62067df988506bfec Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 9 Jul 2012 17:28:40 +0200 Subject: omelasticsearch: better debug instrumentation --- plugins/omelasticsearch/omelasticsearch.c | 48 +++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 15 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index d8db7307..c18c1c52 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -143,7 +143,7 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tserver='%s'\n", pData->server); dbgprintf("\tserverport=%d\n", pData->port); dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid); - dbgprintf("\tpwd=(%s configured)\n", pData->pwd == NULL ? "not " : ""); + dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : ""); dbgprintf("\tsearch index='%s'\n", pData->searchIndex); dbgprintf("\tsearch index='%s'\n", pData->searchType); dbgprintf("\ttimeout='%s'\n", pData->timeout); @@ -155,6 +155,7 @@ ENDdbgPrintInstInfo BEGINtryResume CODESTARTtryResume + DBGPRINTF("omelasticsearch: tryResume called\n"); ENDtryResume @@ -188,7 +189,9 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) uchar *searchIndex; uchar *searchType; es_str_t *url; + int rLocal; int r; + DEFiRet; getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); url = es_newStr(128); @@ -218,17 +221,23 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) restURL = es_str2cstr(url, NULL); curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); es_deleteStr(url); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); free(restURL); if(pData->uid != NULL) { - snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, - (pData->pwd == NULL) ? "" : (char*)pData->pwd); - //TODO: create better code, check errors! + rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, + (pData->pwd == NULL) ? "" : (char*)pData->pwd); + if(rLocal != (int) es_strlen(url)) { + errmsg.LogError(0, RS_RET_ERR, "omelasticsearch: snprintf failed " + "when trying to build auth string (return %d)\n", + rLocal); + ABORT_FINALIZE(RS_RET_ERR); + } curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); - return RS_RET_OK; +finalize_it: + RETiRet; } @@ -248,7 +257,6 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) # define META_TYPE "\",\"_type\":\"" # define META_END "\"}}\n" -#warning TODO: use dynamic index/type! getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, @@ -282,13 +290,18 @@ curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); +dbgprintf("omelasticsearch: do curl_easy_perform()\n"); code = curl_easy_perform(curl); +DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: case CURLE_COULDNT_CONNECT: case CURLE_WRITE_ERROR: STATSCOUNTER_INC(indexConFail, mutIndexConFail); + DBGPRINTF("omelasticsearch: we are suspending ourselfs due " + "to failure %lld of curl_easy_perform()\n", + (long long) code); return RS_RET_SUSPENDED; default: STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); @@ -315,22 +328,25 @@ CODESTARTdoAction if(pData->bulkmode) { CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2])); } else { +dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), ppString[1], ppString[2])); } finalize_it: -dbgprintf("omelasticsearch: result doAction: %d\n", iRet); +dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); ENDdoAction BEGINendTransaction char *cstr; CODESTARTendTransaction +dbgprintf("omelasticsearch: endTransaction init\n"); cstr = es_str2cstr(pData->batch.data, NULL); - dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); + dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL)); finalize_it: free(cstr); +dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); ENDendTransaction /* elasticsearch POST result string ... useful for debugging */ @@ -353,11 +369,13 @@ DBGPRINTF("\n"); nmemb > sizeof(ok)-1 && strncmp(p, ok, sizeof(ok)-1) == 0) { STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); +dbgprintf("omelasticsearch ok\n"); } else { +dbgprintf("omelasticsearch fail\n"); STATSCOUNTER_INC(indexFailed, mutIndexFailed); if (Debug) { - DBGPRINTF("omelasticsearch request: %s\n", jsonData); - DBGPRINTF("omelasticsearch result: "); + DBGPRINTF("omelasticsearch (fail) request: %s\n", jsonData); + DBGPRINTF("omelasticsearch (fail) result: "); for (i = 0; i < nmemb; i++) DBGPRINTF("%c", p[i]); DBGPRINTF("\n"); @@ -470,16 +488,16 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_UID_MISSING); } if(pData->dynSrchIdx && pData->searchIndex == NULL) { - errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omelasticsearch: requested dynamic search index, but no " "name for index template given - action definition invalid"); - ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } if(pData->dynSrchType && pData->searchType == NULL) { - errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omelasticsearch: requested dynamic search type, but no " "name for type template given - action definition invalid"); - ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } if(pData->bulkmode) { -- cgit v1.2.3 From 4d453967cbff1f09becab38a2ad10b05df476eaf Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 10 Jul 2012 12:10:34 +0200 Subject: omelasticsearch: implement retry via request to es server homepage --- plugins/omelasticsearch/omelasticsearch.c | 70 ++++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 10 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index c18c1c52..a1f3b8ab 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -153,9 +153,66 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tbulkmode=%d\n", pData->bulkmode); ENDdbgPrintInstInfo + +/* Build basic URL part, which includes hostname and port as follows: + * http://hostname:port/ + * Newly creates an estr for this purpose. + */ +static rsRetVal +setBaseURL(instanceData *pData, es_str_t **url) +{ + char portBuf[64]; + int r; + DEFiRet; + + *url = es_newStr(128); + snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + r = es_addBuf(url, "http://", sizeof("http://")-1); + if(r == 0) r = es_addBuf(url, (char*)pData->server, strlen((char*)pData->server)); + if(r == 0) r = es_addChar(url, ':'); + if(r == 0) r = es_addBuf(url, portBuf, strlen(portBuf)); + if(r == 0) r = es_addChar(url, '/'); + RETiRet; +} + + +static inline rsRetVal +checkConn(instanceData *pData) +{ + es_str_t *url; + CURL *curl = NULL; + CURLcode res; + char *cstr; + DEFiRet; + + setBaseURL(pData, &url); + curl = curl_easy_init(); + if(curl == NULL) { + DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + cstr = es_str2cstr(url, NULL); + curl_easy_setopt(curl, CURLOPT_URL, cstr); + free(cstr); + + res = curl_easy_perform(curl); + if(res != CURLE_OK) { + dbgprintf("omelasticsearch: checkConn() curl_easy_perform() " + "failed: %s\n", curl_easy_strerror(res)); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: + if(curl != NULL) + curl_easy_cleanup(curl); + RETiRet; +} + + BEGINtryResume CODESTARTtryResume DBGPRINTF("omelasticsearch: tryResume called\n"); + iRet = checkConn(pData); ENDtryResume @@ -184,7 +241,6 @@ static rsRetVal setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) { char authBuf[1024]; - char portBuf[64]; char *restURL; uchar *searchIndex; uchar *searchType; @@ -194,18 +250,12 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) DEFiRet; getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); - url = es_newStr(128); - snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + setBaseURL(pData, &url); - r = es_addBuf(&url, "http://", sizeof("http://")-1); - if(r == 0) r = es_addBuf(&url, (char*)pData->server, strlen((char*)pData->server)); - if(r == 0) r = es_addChar(&url, ':'); - if(r == 0) r = es_addBuf(&url, portBuf, strlen(portBuf)); - if(r == 0) r = es_addChar(&url, '/'); if(pData->bulkmode) { - if(r == 0) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); + r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); } else { - if(r == 0) r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); + r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addChar(&url, '/'); if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); } -- cgit v1.2.3 From 68056a6128b9ebc8d65791b2647030d36c73f014 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 10 Jul 2012 17:15:03 +0200 Subject: omelasticsearch: support for parameters parent & dynparent added --- plugins/omelasticsearch/omelasticsearch.c | 115 ++++++++++++++++++++++++------ 1 file changed, 95 insertions(+), 20 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index a1f3b8ab..5ddb66da 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -70,10 +70,12 @@ typedef struct _instanceData { uchar *pwd; uchar *searchIndex; uchar *searchType; + uchar *parent; uchar *tplName; uchar *timeout; sbool dynSrchIdx; sbool dynSrchType; + sbool dynParent; sbool bulkmode; sbool asyncRepl; struct { @@ -95,8 +97,10 @@ static struct cnfparamdescr actpdescr[] = { { "pwd", eCmdHdlrGetWord, 0 }, { "searchindex", eCmdHdlrGetWord, 0 }, { "searchtype", eCmdHdlrGetWord, 0 }, + { "parent", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "dynparent", eCmdHdlrBinary, 0 }, { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, @@ -133,6 +137,7 @@ CODESTARTfreeInstance free(pData->pwd); free(pData->searchIndex); free(pData->searchType); + free(pData->parent); free(pData->tplName); ENDfreeInstance @@ -146,9 +151,11 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : ""); dbgprintf("\tsearch index='%s'\n", pData->searchIndex); dbgprintf("\tsearch index='%s'\n", pData->searchType); + dbgprintf("\tparent='%s'\n", pData->parent); dbgprintf("\ttimeout='%s'\n", pData->timeout); dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx); dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType); + dbgprintf("\tdynamic parent=%d\n", pData->dynParent); dbgprintf("\tasync replication=%d\n", pData->asyncRepl); dbgprintf("\tbulkmode=%d\n", pData->bulkmode); ENDdbgPrintInstInfo @@ -197,10 +204,11 @@ checkConn(instanceData *pData) res = curl_easy_perform(curl); if(res != CURLE_OK) { - dbgprintf("omelasticsearch: checkConn() curl_easy_perform() " + DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " "failed: %s\n", curl_easy_strerror(res)); ABORT_FINALIZE(RS_RET_SUSPENDED); } + DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); finalize_it: if(curl != NULL) @@ -218,38 +226,62 @@ ENDtryResume /* get the current index and type for this message */ static inline void -getIndexAndType(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar **srchIndex, - uchar **srchType) +getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3, + uchar **srchIndex, uchar **srchType, uchar **parent) { if(pData->dynSrchIdx) { *srchIndex = tpl1; - if(pData->dynSrchType) + if(pData->dynSrchType) { *srchType = tpl2; - else + if(pData->dynParent) { + *parent = tpl3; + } else { + *parent = pData->parent; + } + } else { *srchType = pData->searchType; + if(pData->dynParent) { + *parent = tpl2; + } else { + *parent = pData->parent; + } + } } else { *srchIndex = pData->searchIndex; - if(pData->dynSrchType) + if(pData->dynSrchType) { *srchType = tpl1; - else + if(pData->dynParent) { + *parent = tpl2; + } else { + *parent = pData->parent; + } + } else { *srchType = pData->searchType; + if(pData->dynParent) { + *parent = tpl1; + } else { + *parent = pData->parent; + } + } } } static rsRetVal -setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) +setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3) { char authBuf[1024]; char *restURL; uchar *searchIndex; uchar *searchType; + uchar *parent; es_str_t *url; int rLocal; int r; DEFiRet; - getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); + getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, + &searchIndex, &searchType, &parent); setBaseURL(pData, &url); if(pData->bulkmode) { @@ -267,6 +299,11 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) if(pData->timeout != NULL) { if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1); if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout)); + if(r == 0) r = es_addChar(&url, '&'); + } + if(parent != NULL) { + if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1); + if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); } restURL = es_str2cstr(url, NULL); curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); @@ -296,24 +333,29 @@ finalize_it: * index changes. */ static rsRetVal -buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) +buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar *tpl3) { int length = strlen((char *)message); int r; uchar *searchIndex; uchar *searchType; + uchar *parent; DEFiRet; # define META_STRT "{\"index\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" +# define META_PARENT "\",\"_parent\":\"" # define META_END "\"}}\n" - getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); + getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, + &searchIndex, &searchType, &parent); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, ustrlen(searchType)); + if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); @@ -328,14 +370,15 @@ finalize_it: } static rsRetVal -curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar *tpl2) +curlPost(instanceData *instance, uchar *message, int msglen, + uchar *tpl1, uchar *tpl2, uchar *tpl3) { CURLcode code; CURL *curl = instance->curlHandle; DEFiRet; - if(instance->dynSrchIdx || instance->dynSrchType) - CHKiRet(setCurlURL(instance, tpl1, tpl2)); + if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent) + CHKiRet(setCurlURL(instance, tpl1, tpl2, tpl3)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); @@ -376,11 +419,11 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction if(pData->bulkmode) { - CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2])); + CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2], ppString[3])); } else { dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), - ppString[1], ppString[2])); + ppString[1], ppString[2], ppString[3])); } finalize_it: dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); @@ -393,7 +436,7 @@ CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); cstr = es_str2cstr(pData->batch.data, NULL); dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL)); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL, NULL)); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -455,13 +498,14 @@ curlSetup(instanceData *pData) pData->curlHandle = handle; pData->postHeader = header; - if(pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0)) { + if( pData->bulkmode + || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) { /* in this case, we know no tpls are involved in the request-->NULL OK! */ - setCurlURL(pData, NULL, NULL); + setCurlURL(pData, NULL, NULL, NULL); } if(Debug) { - if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0) dbgprintf("omelasticsearch setup, using static REST URL\n"); else dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n"); @@ -478,9 +522,11 @@ setInstParamDefaults(instanceData *pData) pData->pwd = NULL; pData->searchIndex = NULL; pData->searchType = NULL; + pData->parent = NULL; pData->timeout = NULL; pData->dynSrchIdx = 0; pData->dynSrchType = 0; + pData->dynParent = 0; pData->asyncRepl = 0; pData->bulkmode = 0; pData->tplName = NULL; @@ -513,10 +559,14 @@ CODESTARTnewActInst pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "parent")) { + pData->parent = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) { pData->dynSrchIdx = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynparent")) { + pData->dynParent = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "bulkmode")) { pData->bulkmode = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "timeout")) { @@ -549,6 +599,12 @@ CODESTARTnewActInst "name for type template given - action definition invalid"); ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } + if(pData->dynParent && pData->parent == NULL) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic parent, but no " + "name for parent template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } if(pData->bulkmode) { pData->batch.currTpl1 = NULL; @@ -563,6 +619,7 @@ CODESTARTnewActInst iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; + if(pData->dynParent) ++iNumTpls; DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) @@ -581,11 +638,29 @@ CODESTARTnewActInst if(pData->dynSrchType) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynSrchType) { CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } } } -- cgit v1.2.3 From fdbc4cb666b4fc92562ece1fba97227c40237e04 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 11 Jul 2012 17:12:34 +0200 Subject: omelasticsearch: regression from "parent" feature could case aborts this was not present in any released version --- plugins/omelasticsearch/omelasticsearch.c | 49 ++++++++++++++++--------------- 1 file changed, 26 insertions(+), 23 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 5ddb66da..f77caeca 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -226,22 +226,22 @@ ENDtryResume /* get the current index and type for this message */ static inline void -getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3, +getIndexTypeAndParent(instanceData *pData, uchar **tpls, uchar **srchIndex, uchar **srchType, uchar **parent) { if(pData->dynSrchIdx) { - *srchIndex = tpl1; + *srchIndex = tpls[1]; if(pData->dynSrchType) { - *srchType = tpl2; + *srchType = tpls[2]; if(pData->dynParent) { - *parent = tpl3; + *parent = tpls[3]; } else { *parent = pData->parent; } } else { *srchType = pData->searchType; if(pData->dynParent) { - *parent = tpl2; + *parent = tpls[2]; } else { *parent = pData->parent; } @@ -249,16 +249,16 @@ getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3 } else { *srchIndex = pData->searchIndex; if(pData->dynSrchType) { - *srchType = tpl1; + *srchType = tpls[1]; if(pData->dynParent) { - *parent = tpl2; + *parent = tpls[2]; } else { *parent = pData->parent; } } else { *srchType = pData->searchType; if(pData->dynParent) { - *parent = tpl1; + *parent = tpls[1]; } else { *parent = pData->parent; } @@ -268,7 +268,7 @@ getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3 static rsRetVal -setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3) +setCurlURL(instanceData *pData, uchar **tpls) { char authBuf[1024]; char *restURL; @@ -280,13 +280,13 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3) int r; DEFiRet; - getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, - &searchIndex, &searchType, &parent); setBaseURL(pData, &url); if(pData->bulkmode) { r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); + parent = NULL; } else { + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addChar(&url, '/'); if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); @@ -333,7 +333,7 @@ finalize_it: * index changes. */ static rsRetVal -buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar *tpl3) +buildBatch(instanceData *pData, uchar *message, uchar **tpls) { int length = strlen((char *)message); int r; @@ -346,16 +346,20 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar # define META_PARENT "\",\"_parent\":\"" # define META_END "\"}}\n" - getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, - &searchIndex, &searchType, &parent); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); +dbgprintf("AAA: searchIndex: '%s'\n", searchIndex); +dbgprintf("AAA: searchType: '%s'\n", searchType); +dbgprintf("AAA: parent: '%s'\n", parent); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, ustrlen(searchType)); - if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); + if(parent != NULL) { + if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); + } if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); @@ -370,15 +374,14 @@ finalize_it: } static rsRetVal -curlPost(instanceData *instance, uchar *message, int msglen, - uchar *tpl1, uchar *tpl2, uchar *tpl3) +curlPost(instanceData *instance, uchar *message, int msglen, uchar **tpls) { CURLcode code; CURL *curl = instance->curlHandle; DEFiRet; if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent) - CHKiRet(setCurlURL(instance, tpl1, tpl2, tpl3)); + CHKiRet(setCurlURL(instance, tpls)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); @@ -419,11 +422,11 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction if(pData->bulkmode) { - CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2], ppString[3])); + CHKiRet(buildBatch(pData, ppString[0], ppString)); } else { dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), - ppString[1], ppString[2], ppString[3])); + ppString)); } finalize_it: dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); @@ -436,7 +439,7 @@ CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); cstr = es_str2cstr(pData->batch.data, NULL); dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL, NULL)); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL)); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -501,7 +504,7 @@ curlSetup(instanceData *pData) if( pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) { /* in this case, we know no tpls are involved in the request-->NULL OK! */ - setCurlURL(pData, NULL, NULL, NULL); + setCurlURL(pData, NULL); } if(Debug) { -- cgit v1.2.3 From 71121003e273dc9fb2d6dd2aac427fb2c6329d23 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 25 Jul 2012 18:30:18 +0200 Subject: omelasticsearch: parse JSON response (in regard to data errors) note: bulkmode response processing is still mostly missing --- plugins/omelasticsearch/omelasticsearch.c | 102 +++++++++++++++++++----------- 1 file changed, 65 insertions(+), 37 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index f77caeca..b49caf38 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -34,6 +34,7 @@ #include #include #include +#include "cjson.h" #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" @@ -64,8 +65,9 @@ STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) */ typedef struct curl_slist HEADER; typedef struct _instanceData { - uchar *server; int port; + int replyLen; + uchar *server; uchar *uid; uchar *pwd; uchar *searchIndex; @@ -73,6 +75,7 @@ typedef struct _instanceData { uchar *parent; uchar *tplName; uchar *timeout; + char *reply; sbool dynSrchIdx; sbool dynSrchType; sbool dynParent; @@ -139,6 +142,7 @@ CODESTARTfreeInstance free(pData->searchType); free(pData->parent); free(pData->tplName); + free(pData->timeout); ENDfreeInstance BEGINdbgPrintInstInfo @@ -202,12 +206,16 @@ checkConn(instanceData *pData) curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); + pData->reply = NULL; + pData->replyLen = 0; + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); res = curl_easy_perform(curl); if(res != CURLE_OK) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " "failed: %s\n", curl_easy_strerror(res)); ABORT_FINALIZE(RS_RET_SUSPENDED); } + free(pData->reply); DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); finalize_it: @@ -373,22 +381,51 @@ finalize_it: RETiRet; } +static inline rsRetVal +checkResult(instanceData *pData) +{ + cJSON *root; + cJSON *ok; + DEFiRet; + + root = cJSON_Parse(pData->reply); + if(root == NULL) { + DBGPRINTF("omelasticsearch: could not parse JSON result \n"); + ABORT_FINALIZE(RS_RET_ERR); + } + + if(pData->bulkmode) { + //TODO: implement + //iRet = checkResultBulkmode(pData, root); + } else { + ok = cJSON_GetObjectItem(root, "ok"); + if(ok == NULL || ok->type != cJSON_True) { + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + } +finalize_it: + cJSON_Delete(root); + RETiRet; +} + + static rsRetVal -curlPost(instanceData *instance, uchar *message, int msglen, uchar **tpls) +curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) { CURLcode code; - CURL *curl = instance->curlHandle; + CURL *curl = pData->curlHandle; DEFiRet; - if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent) - CHKiRet(setCurlURL(instance, tpls)); + pData->reply = NULL; + pData->replyLen = 0; - curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); + if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent) + CHKiRet(setCurlURL(pData, tpls)); + + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); -dbgprintf("omelasticsearch: do curl_easy_perform()\n"); code = curl_easy_perform(curl); -DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: @@ -398,12 +435,18 @@ DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) co DBGPRINTF("omelasticsearch: we are suspending ourselfs due " "to failure %lld of curl_easy_perform()\n", (long long) code); - return RS_RET_SUSPENDED; + ABORT_FINALIZE(RS_RET_SUSPENDED); default: STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); - return RS_RET_OK; + break; } + + pData->reply[pData->replyLen] = '\0'; /* byte has been reserved in malloc */ + DBGPRINTF("omelasticsearch: es reply: '%s'\n", pData->reply); + + CHKiRet(checkResult(pData)); finalize_it: + free(pData->reply); RETiRet; } @@ -449,35 +492,20 @@ ENDendTransaction size_t curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) { - unsigned int i; char *p = (char *)ptr; - char *jsonData = (char *)userdata; - static char ok[] = "{\"ok\":true,"; - - ASSERT(size == 1); -DBGPRINTF("omelasticsearch request: %s\n", jsonData); -DBGPRINTF("omelasticsearch result: "); -for (i = 0; i < nmemb; i++) - DBGPRINTF("%c", p[i]); -DBGPRINTF("\n"); - - if (size == 1 && - nmemb > sizeof(ok)-1 && - strncmp(p, ok, sizeof(ok)-1) == 0) { - STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); -dbgprintf("omelasticsearch ok\n"); - } else { -dbgprintf("omelasticsearch fail\n"); - STATSCOUNTER_INC(indexFailed, mutIndexFailed); - if (Debug) { - DBGPRINTF("omelasticsearch (fail) request: %s\n", jsonData); - DBGPRINTF("omelasticsearch (fail) result: "); - for (i = 0; i < nmemb; i++) - DBGPRINTF("%c", p[i]); - DBGPRINTF("\n"); - } + instanceData *pData = (instanceData*) userdata; + char *buf; + size_t newlen; + + newlen = pData->replyLen + size*nmemb; + if((buf = realloc(pData->reply, newlen + 1)) == NULL) { + DBGPRINTF("omelasticsearch: realloc failed in curlResult\n"); + return 0; /* abort due to failure */ } - return size * nmemb; + memcpy(buf+pData->replyLen, p, size*nmemb); + pData->replyLen = newlen; + pData->reply = buf; + return size*nmemb; } -- cgit v1.2.3 From 575686bd68a47fc5d9f59c1ed610f6680e1bb6fa Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 30 Jul 2012 11:01:11 +0200 Subject: omelasticsearch: mileston: bulk reply is parsed --- plugins/omelasticsearch/omelasticsearch.c | 50 +++++++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 2 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index b49caf38..04113170 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -381,6 +381,53 @@ finalize_it: RETiRet; } +static inline rsRetVal +checkResultBulkmode(instanceData *pData, cJSON *root) +{ + int i; + int numitems; + cJSON *items; + cJSON *item; + cJSON *create; + cJSON *ok; + DEFiRet; + + items = cJSON_GetObjectItem(root, "items"); + if(items == NULL || items->type != cJSON_Array) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "bulkmode insert does not return array, reply is: %s\n", + pData->reply); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + numitems = cJSON_GetArraySize(items); +DBGPRINTF("omelasticsearch: %d items in reply\n", numitems); + for(i = 0 ; i < numitems ; ++i) { + item = cJSON_GetArrayItem(items, i); + if(item == NULL) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "cannot obtain reply array item %d\n", i); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + create = cJSON_GetObjectItem(item, "create"); + if(create == NULL || create->type != cJSON_Object) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "cannot obtain 'create' item for #%d\n", i); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + ok = cJSON_GetObjectItem(create, "ok"); + if(ok == NULL || ok->type != cJSON_True) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "item ok (%p) not ok\n", ok); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } +dbgprintf("omelasticsearch: item %d is OK\n", i); + } + +finalize_it: + RETiRet; +} + + static inline rsRetVal checkResult(instanceData *pData) { @@ -395,8 +442,7 @@ checkResult(instanceData *pData) } if(pData->bulkmode) { - //TODO: implement - //iRet = checkResultBulkmode(pData, root); + iRet = checkResultBulkmode(pData, root); } else { ok = cJSON_GetObjectItem(root, "ok"); if(ok == NULL || ok->type != cJSON_True) { -- cgit v1.2.3 From 36f7fbd5f458c3e8b2924a72f74621e055f319b2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 30 Jul 2012 12:40:49 +0200 Subject: omelasticsearch: support for writing data errors to local file added --- plugins/omelasticsearch/omelasticsearch.c | 109 ++++++++++++++++++++++++++---- 1 file changed, 95 insertions(+), 14 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 04113170..fea85f22 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -34,6 +34,9 @@ #include #include #include +#include +#include +#include #include "cjson.h" #include "conf.h" #include "syslogd-types.h" @@ -67,6 +70,7 @@ typedef struct curl_slist HEADER; typedef struct _instanceData { int port; int replyLen; + int fdErrFile; /* error file fd or -1 if not open */ uchar *server; uchar *uid; uchar *pwd; @@ -75,6 +79,8 @@ typedef struct _instanceData { uchar *parent; uchar *tplName; uchar *timeout; + uchar *restURL; /* last used URL for error reporting */ + uchar *errorFile; char *reply; sbool dynSrchIdx; sbool dynSrchType; @@ -107,6 +113,7 @@ static struct cnfparamdescr actpdescr[] = { { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, + { "errorfile", eCmdHdlrGetWord, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = @@ -117,6 +124,8 @@ static struct cnfparamblk actpblk = BEGINcreateInstance CODESTARTcreateInstance + pData->restURL = NULL; + pData->fdErrFile = -1; ENDcreateInstance BEGINisCompatibleWithFeature @@ -135,6 +144,8 @@ CODESTARTfreeInstance curl_easy_cleanup(pData->curlHandle); pData->curlHandle = NULL; } + if(pData->fdErrFile != -1) + close(pData->fdErrFile); free(pData->server); free(pData->uid); free(pData->pwd); @@ -143,6 +154,8 @@ CODESTARTfreeInstance free(pData->parent); free(pData->tplName); free(pData->timeout); + free(pData->restURL); + free(pData->errorFile); ENDfreeInstance BEGINdbgPrintInstInfo @@ -162,6 +175,8 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tdynamic parent=%d\n", pData->dynParent); dbgprintf("\tasync replication=%d\n", pData->asyncRepl); dbgprintf("\tbulkmode=%d\n", pData->bulkmode); + dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ? + (uchar*)"(not configured)" : pData->errorFile); ENDdbgPrintInstInfo @@ -279,7 +294,6 @@ static rsRetVal setCurlURL(instanceData *pData, uchar **tpls) { char authBuf[1024]; - char *restURL; uchar *searchIndex; uchar *searchType; uchar *parent; @@ -313,11 +327,12 @@ setCurlURL(instanceData *pData, uchar **tpls) if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1); if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); } - restURL = es_str2cstr(url, NULL); - curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + + free(pData->restURL); + pData->restURL = (uchar*)es_str2cstr(url, NULL); + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); es_deleteStr(url); - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); - free(restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL); if(pData->uid != NULL) { rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -355,9 +370,6 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) # define META_END "\"}}\n" getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); -dbgprintf("AAA: searchIndex: '%s'\n", searchIndex); -dbgprintf("AAA: searchType: '%s'\n", searchType); -dbgprintf("AAA: parent: '%s'\n", parent); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); @@ -381,6 +393,55 @@ finalize_it: RETiRet; } + +/* write data error request/replies to separate error file + * Note: we open the file but never close it before exit. If it + * needs to be closed, HUP must be sent. + */ +static inline rsRetVal +writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) +{ + char *rendered = NULL; + cJSON *errRoot; + cJSON *req; + cJSON *replyRoot = *pReplyRoot; + char errStr[1024]; + DEFiRet; + + if(pData->errorFile == NULL) + FINALIZE; + + if(pData->fdErrFile == -1) { + pData->fdErrFile = open((char*)pData->errorFile, + O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, + S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); + if(pData->fdErrFile == -1) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + DBGPRINTF("omelasticsearch: error opening error file: %s\n", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + } + if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL)); + cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg)); + + if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + cJSON_AddItemToObject(errRoot, "request", req); + cJSON_AddItemToObject(errRoot, "reply", replyRoot); + rendered = cJSON_Print(errRoot); +DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered); + write(pData->fdErrFile, rendered, strlen(rendered)); + free(rendered); + cJSON_Delete(errRoot); + *pReplyRoot = NULL; /* tell caller not to delete once again! */ + +finalize_it: + if(rendered != NULL) + free(rendered); + RETiRet; +} + + static inline rsRetVal checkResultBulkmode(instanceData *pData, cJSON *root) { @@ -420,7 +481,6 @@ DBGPRINTF("omelasticsearch: %d items in reply\n", numitems); "item ok (%p) not ok\n", ok); ABORT_FINALIZE(RS_RET_DATAFAIL); } -dbgprintf("omelasticsearch: item %d is OK\n", i); } finalize_it: @@ -429,7 +489,7 @@ finalize_it: static inline rsRetVal -checkResult(instanceData *pData) +checkResult(instanceData *pData, uchar *reqmsg) { cJSON *root; cJSON *ok; @@ -446,11 +506,21 @@ checkResult(instanceData *pData) } else { ok = cJSON_GetObjectItem(root, "ok"); if(ok == NULL || ok->type != cJSON_True) { - ABORT_FINALIZE(RS_RET_DATAFAIL); + iRet = RS_RET_DATAFAIL; } } + + /* Note: we ignore errors writing the error file, as we cannot handle + * these in any case. + */ + if(iRet == RS_RET_DATAFAIL) { + writeDataError(pData, &root, reqmsg); + iRet = RS_RET_OK; /* we have handled the problem! */ + } + finalize_it: - cJSON_Delete(root); + if(root != NULL) + cJSON_Delete(root); RETiRet; } @@ -490,7 +560,7 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) pData->reply[pData->replyLen] = '\0'; /* byte has been reserved in malloc */ DBGPRINTF("omelasticsearch: es reply: '%s'\n", pData->reply); - CHKiRet(checkResult(pData)); + CHKiRet(checkResult(pData, message)); finalize_it: free(pData->reply); RETiRet; @@ -513,7 +583,6 @@ CODESTARTdoAction if(pData->bulkmode) { CHKiRet(buildBatch(pData, ppString[0], ppString)); } else { -dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), ppString)); } @@ -607,6 +676,7 @@ setInstParamDefaults(instanceData *pData) pData->asyncRepl = 0; pData->bulkmode = 0; pData->tplName = NULL; + pData->errorFile = NULL; } BEGINnewActInst @@ -626,6 +696,8 @@ CODESTARTnewActInst continue; if(!strcmp(actpblk.descr[i].name, "server")) { pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "errorfile")) { + pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; } else if(!strcmp(actpblk.descr[i].name, "uid")) { @@ -767,6 +839,14 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct +BEGINdoHUP +CODESTARTdoHUP + if(pData->fdErrFile != -1) { + close(pData->fdErrFile); + pData->fdErrFile = -1; + } +ENDdoHUP + BEGINmodExit CODESTARTmodExit @@ -781,6 +861,7 @@ CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_doHUP CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt -- cgit v1.2.3 From cb12ed5dd211f9910860e86fd937180895c459de Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 30 Jul 2012 13:06:44 +0200 Subject: omelasticsearch: improved debug logging --- plugins/omelasticsearch/omelasticsearch.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index fea85f22..48cfef7d 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -408,8 +408,11 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) char errStr[1024]; DEFiRet; - if(pData->errorFile == NULL) + if(pData->errorFile == NULL) { + DBGPRINTF("omelasticsearch: no local error logger defined - " + "ignoring ES error information\n"); FINALIZE; + } if(pData->fdErrFile == -1) { pData->fdErrFile = open((char*)pData->errorFile, @@ -478,7 +481,7 @@ DBGPRINTF("omelasticsearch: %d items in reply\n", numitems); ok = cJSON_GetObjectItem(create, "ok"); if(ok == NULL || ok->type != cJSON_True) { DBGPRINTF("omelasticsearch: error in elasticsearch reply: " - "item ok (%p) not ok\n", ok); + "item %d, prop ok (%p) not ok\n", i, ok); ABORT_FINALIZE(RS_RET_DATAFAIL); } } -- cgit v1.2.3 From b11c85aac8b41eb74fb14c486f88940df8819bbc Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 29 Oct 2012 11:49:07 +0100 Subject: move cJSON to omelasticsearch the rsyslog runtime itself now uses json-c, but omelasticsearch currently depends on cJSON. We will change this, but not yet. Let's merge as is and see that it works well ;) --- plugins/omelasticsearch/omelasticsearch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 48cfef7d..a5833b15 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -37,7 +37,7 @@ #include #include #include -#include "cjson.h" +#include "cJSON/cjson.h" #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" -- cgit v1.2.3 From e5ef73eb25c8dda2e19c593ad2fc0a960aa8873b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 1 Nov 2012 17:05:07 +0100 Subject: bugfix: invalid rsyslog-internal macro API use This had no bad effect, because the macro did the same as the one that should have been used. --- plugins/omelasticsearch/omelasticsearch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index a5833b15..982f4318 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -773,7 +773,7 @@ CODESTARTnewActInst if(pData->dynSrchType) ++iNumTpls; if(pData->dynParent) ++iNumTpls; DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); - CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) + CODE_STD_STRING_REQUESTnewActInst(iNumTpls) CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? " StdJSONFmt" : (char*)pData->tplName), -- cgit v1.2.3 From 206908e2887cedf78d12d70b29591c52276d685a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 1 Nov 2012 17:05:07 +0100 Subject: bugfix: invalid rsyslog-internal macro API use This had no bad effect, because the macro did the same as the one that should have been used. --- plugins/omelasticsearch/omelasticsearch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index f77caeca..50acdf11 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -624,7 +624,7 @@ CODESTARTnewActInst if(pData->dynSrchType) ++iNumTpls; if(pData->dynParent) ++iNumTpls; DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); - CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) + CODE_STD_STRING_REQUESTnewActInst(iNumTpls) CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? " StdJSONFmt" : (char*)pData->tplName), -- cgit v1.2.3 From 3352d2c605567c29840cc93a358d60881c865cb7 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 14 Dec 2012 09:20:51 +0100 Subject: minor cleanup --- plugins/omelasticsearch/omelasticsearch.c | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 982f4318..499a2bb2 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -405,6 +405,8 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) cJSON *errRoot; cJSON *req; cJSON *replyRoot = *pReplyRoot; + size_t toWrite; + ssize_t wrRet; char errStr[1024]; DEFiRet; @@ -432,8 +434,16 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) cJSON_AddItemToObject(errRoot, "request", req); cJSON_AddItemToObject(errRoot, "reply", replyRoot); rendered = cJSON_Print(errRoot); -DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered); - write(pData->fdErrFile, rendered, strlen(rendered)); + /* we do not do real error-handling on the err file, as this finally complicates + * things way to much. + */ + DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered); + toWrite = strlen(rendered); + wrRet = write(pData->fdErrFile, rendered, toWrite); + if(wrRet != (ssize_t) toWrite) { + DBGPRINTF("omelasticsearch: error %d writing error file, write returns %lld\n", + errno, (long long) wrRet); + } free(rendered); cJSON_Delete(errRoot); *pReplyRoot = NULL; /* tell caller not to delete once again! */ -- cgit v1.2.3 From 614b1fba0a56629579ec9bf1cc07cc19bba35e0c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sun, 13 Jan 2013 11:30:48 +0100 Subject: bugfix: omelasticsearch failed when authentication data was provided ... at least in most cases it emitted an error message: "snprintf failed when trying to build auth string" Thanks to Joerg Heinemann for alerting us. closes: http://bugzilla.adiscon.com/show_bug.cgi?id=404 --- plugins/omelasticsearch/omelasticsearch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 50acdf11..00e4dbac 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -314,7 +314,7 @@ setCurlURL(instanceData *pData, uchar **tpls) if(pData->uid != NULL) { rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, (pData->pwd == NULL) ? "" : (char*)pData->pwd); - if(rLocal != (int) es_strlen(url)) { + if(rLocal < 1) { errmsg.LogError(0, RS_RET_ERR, "omelasticsearch: snprintf failed " "when trying to build auth string (return %d)\n", rLocal); -- cgit v1.2.3 From 2f68d5f0d8a5c3ffef6bf52f27abc726ad27d764 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Renard?= Date: Wed, 24 Apr 2013 10:56:52 +0200 Subject: omelasticsearch: _id field support for bulk operations also max number of templates for plugin use has been increased to five closes: http://bugzilla.adiscon.com/show_bug.cgi?id=392 --- plugins/omelasticsearch/omelasticsearch.c | 122 +++++++++++++++++++++++++----- 1 file changed, 105 insertions(+), 17 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index f27fe62b..33e58c1a 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -11,11 +11,11 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * -or- * see COPYING.ASL20 in the source distribution - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -79,12 +79,14 @@ typedef struct _instanceData { uchar *parent; uchar *tplName; uchar *timeout; + uchar *bulkId; uchar *restURL; /* last used URL for error reporting */ uchar *errorFile; char *reply; sbool dynSrchIdx; sbool dynSrchType; sbool dynParent; + sbool dynBulkId; sbool bulkmode; sbool asyncRepl; struct { @@ -114,7 +116,9 @@ static struct cnfparamdescr actpdescr[] = { { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, { "errorfile", eCmdHdlrGetWord, 0 }, - { "template", eCmdHdlrGetWord, 1 } + { "template", eCmdHdlrGetWord, 1 }, + { "dynbulkid", eCmdHdlrBinary, 0 }, + { "bulkid", eCmdHdlrGetWord, 0 }, }; static struct cnfparamblk actpblk = { CNFPARAMBLK_VERSION, @@ -156,6 +160,7 @@ CODESTARTfreeInstance free(pData->timeout); free(pData->restURL); free(pData->errorFile); + free(pData->bulkId); ENDfreeInstance BEGINdbgPrintInstInfo @@ -177,6 +182,8 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tbulkmode=%d\n", pData->bulkmode); dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ? (uchar*)"(not configured)" : pData->errorFile); + dbgprintf("\tdynbulkid=%d\n", pData->dynBulkId); + dbgprintf("\tbulkid='%s'\n", pData->bulkId); ENDdbgPrintInstInfo @@ -220,7 +227,7 @@ checkConn(instanceData *pData) cstr = es_str2cstr(url, NULL); curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); - + pData->reply = NULL; pData->replyLen = 0; curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); @@ -250,7 +257,8 @@ ENDtryResume /* get the current index and type for this message */ static inline void getIndexTypeAndParent(instanceData *pData, uchar **tpls, - uchar **srchIndex, uchar **srchType, uchar **parent) + uchar **srchIndex, uchar **srchType, uchar **parent, + uchar **bulkId) { if(pData->dynSrchIdx) { *srchIndex = tpls[1]; @@ -258,15 +266,27 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, *srchType = tpls[2]; if(pData->dynParent) { *parent = tpls[3]; + if(pData->dynBulkId) { + *bulkId = tpls[4]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } } else { *srchType = pData->searchType; if(pData->dynParent) { *parent = tpls[2]; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } } } else { @@ -275,15 +295,27 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, *srchType = tpls[1]; if(pData->dynParent) { *parent = tpls[2]; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } } else { *srchType = pData->searchType; if(pData->dynParent) { *parent = tpls[1]; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[1]; + } } } } @@ -297,6 +329,7 @@ setCurlURL(instanceData *pData, uchar **tpls) uchar *searchIndex; uchar *searchType; uchar *parent; + uchar *bulkId; es_str_t *url; int rLocal; int r; @@ -308,7 +341,7 @@ setCurlURL(instanceData *pData, uchar **tpls) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); parent = NULL; } else { - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addChar(&url, '/'); if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); @@ -330,7 +363,7 @@ setCurlURL(instanceData *pData, uchar **tpls) free(pData->restURL); pData->restURL = (uchar*)es_str2cstr(url, NULL); - curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); es_deleteStr(url); DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL); @@ -343,7 +376,7 @@ setCurlURL(instanceData *pData, uchar **tpls) rLocal); ABORT_FINALIZE(RS_RET_ERR); } - curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); + curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } finalize_it: @@ -363,13 +396,15 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) uchar *searchIndex; uchar *searchType; uchar *parent; + uchar *bulkId = NULL; DEFiRet; # define META_STRT "{\"index\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" # define META_PARENT "\",\"_parent\":\"" +# define META_ID "\", \"_id\":\"" # define META_END "\"}}\n" - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); @@ -380,6 +415,10 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); } + if(bulkId != NULL) { + if(r == 0) r = es_addBuf(&pData->batch.data, META_ID, sizeof(META_ID)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)bulkId, ustrlen(bulkId)); + } if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); @@ -409,7 +448,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) ssize_t wrRet; char errStr[1024]; DEFiRet; - + if(pData->errorFile == NULL) { DBGPRINTF("omelasticsearch: no local error logger defined - " "ignoring ES error information\n"); @@ -524,7 +563,7 @@ checkResult(instanceData *pData, uchar *reqmsg) } /* Note: we ignore errors writing the error file, as we cannot handle - * these in any case. + * these in any case. */ if(iRet == RS_RET_DATAFAIL) { writeDataError(pData, &root, reqmsg); @@ -552,8 +591,8 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) CHKiRet(setCurlURL(pData, tpls)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: @@ -649,10 +688,10 @@ curlSetup(instanceData *pData) } header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); - curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); - curl_easy_setopt(handle, CURLOPT_POST, 1); + curl_easy_setopt(handle, CURLOPT_POST, 1); pData->curlHandle = handle; pData->postHeader = header; @@ -690,6 +729,8 @@ setInstParamDefaults(instanceData *pData) pData->bulkmode = 0; pData->tplName = NULL; pData->errorFile = NULL; + pData->dynBulkId= 0; + pData->bulkId = NULL; } BEGINnewActInst @@ -737,12 +778,16 @@ CODESTARTnewActInst pData->asyncRepl = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynbulkid")) { + pData->dynBulkId = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "bulkid")) { + pData->bulkId = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { dbgprintf("omelasticsearch: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); } } - + if(pData->pwd != NULL && pData->uid == NULL) { errmsg.LogError(0, RS_RET_UID_MISSING, "omelasticsearch: password is provided, but no uid " @@ -767,6 +812,12 @@ CODESTARTnewActInst "name for parent template given - action definition invalid"); ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } + if(pData->dynBulkId && pData->bulkId == NULL) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic bulkid, but no " + "name for bulkid template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } if(pData->bulkmode) { pData->batch.currTpl1 = NULL; @@ -782,6 +833,7 @@ CODESTARTnewActInst if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; if(pData->dynParent) ++iNumTpls; + if(pData->dynBulkId) ++iNumTpls; DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); CODE_STD_STRING_REQUESTnewActInst(iNumTpls) @@ -803,11 +855,29 @@ CODESTARTnewActInst if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 4, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } } else { @@ -817,12 +887,30 @@ CODESTARTnewActInst if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); - } + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } } } -- cgit v1.2.3