From f8019d52f83884acb5e8f8755fe976d1592b4ccb Mon Sep 17 00:00:00 2001 From: Nathan Scott Date: Mon, 15 Aug 2011 22:22:13 +1000 Subject: add elasticsearch output module Add support for sending events to elasticsearch - a distributed, RESTful, search engine built on Lucene (www.elasticsearch.org). The output module is enabled via a configure option, and uses libcurl to send the messages from rsyslog to elasticsearch. This patch makes use of the earlier JSON quoting patch to ensure valid JSON strings are sent to the server. Signed-off-by: Nathan Scott --- plugins/omelasticsearch/Makefile.am | 8 + plugins/omelasticsearch/omelasticsearch.c | 271 ++++++++++++++++++++++++++++++ 2 files changed, 279 insertions(+) create mode 100644 plugins/omelasticsearch/Makefile.am create mode 100644 plugins/omelasticsearch/omelasticsearch.c (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/Makefile.am b/plugins/omelasticsearch/Makefile.am new file mode 100644 index 00000000..a574c72f --- /dev/null +++ b/plugins/omelasticsearch/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = omelasticsearch.la + +omelasticsearch_la_SOURCES = omelasticsearch.c +omelasticsearch_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) +omelasticsearch_la_LDFLAGS = -module -avoid-version +omelasticsearch_la_LIBADD = $(CURL_LIBS) + +EXTRA_DIST = diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c new file mode 100644 index 00000000..ce2e0c1f --- /dev/null +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -0,0 +1,271 @@ +/* omelasticsearch.c + * This is the http://www.elasticsearch.org/ output module. + * + * NOTE: read comments in module-template.h for more specifics! + * + * Copyright 2011 Nathan Scott. + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ +#include "config.h" +#include "rsyslog.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "statsobj.h" +#include "cfsysline.h" + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP + +/* internal structures */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(statsobj) + +statsobj_t *indexStats; +STATSCOUNTER_DEF(indexConFail, mutIndexConFail) +STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit) +STATSCOUNTER_DEF(indexFailed, mutIndexFailed) +STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) + +/* REST API for elasticsearch hits this URL: + * http://:// + */ +typedef struct curl_slist HEADER; +typedef struct _instanceData { + CURL *curlHandle; /* libcurl session handle */ + HEADER *postHeader; /* json POST request info */ +} instanceData; + +/* config variables */ +static int restPort = 9200; +static char *hostName = "localhost"; +static char *searchIndex = "system"; +static char *searchType = "events"; + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + +BEGINfreeInstance +CODESTARTfreeInstance + if (pData->postHeader) { + curl_slist_free_all(pData->postHeader); + pData->postHeader = NULL; + } + if (pData->curlHandle) { + curl_easy_cleanup(pData->curlHandle); + pData->curlHandle = NULL; + } +ENDfreeInstance + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + +BEGINtryResume +CODESTARTtryResume +ENDtryResume + +rsRetVal +curlPost(instanceData *instance, uchar *message) +{ + CURLcode code; + CURL *curl = instance->curlHandle; + int length = strlen((char *)message); + + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length); + code = curl_easy_perform(curl); + switch (code) { + case CURLE_COULDNT_RESOLVE_HOST: + case CURLE_COULDNT_RESOLVE_PROXY: + case CURLE_COULDNT_CONNECT: + case CURLE_WRITE_ERROR: + STATSCOUNTER_INC(indexConFail, mutIndexConFail); + return RS_RET_SUSPENDED; + default: + STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); + return RS_RET_OK; + } +} + +BEGINdoAction +CODESTARTdoAction + CHKiRet(curlPost(pData, ppString[0])); +finalize_it: +ENDdoAction + +/* elasticsearch POST result string ... useful for debugging */ +size_t +curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) +{ + unsigned int i; + char *p = (char *)ptr; + char *jsonData = (char *)userdata; + static char ok[] = "{\"ok\":true,"; + + ASSERT(size == 1); + + if (size == 1 && + nmemb > sizeof(ok)-1 && + strncmp(p, ok, sizeof(ok)-1) == 0) { + STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); + } else { + STATSCOUNTER_INC(indexFailed, mutIndexFailed); + if (Debug) { + DBGPRINTF("omelasticsearch request: %s\n", jsonData); + DBGPRINTF("omelasticsearch result: "); + for (i = 0; i < nmemb; i++) + DBGPRINTF("%c", p[i]); + DBGPRINTF("\n"); + } + } + return size * nmemb; +} + +static rsRetVal +curlSetup(instanceData *instance) +{ + char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + HEADER *header; + CURL *handle; + + handle = curl_easy_init(); + if (handle == NULL) { + return RS_RET_OBJ_CREATION_FAILED; + } + + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + hostName, restPort, searchIndex, searchType); + header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + + curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); + curl_easy_setopt(handle, CURLOPT_URL, restURL); + curl_easy_setopt(handle, CURLOPT_POST, 1); + + instance->curlHandle = handle; + instance->postHeader = header; + + DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL); + return RS_RET_OK; +} + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); + } + p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + + /* check if a non-standard template is to be applied */ + if(*(p-1) == ';') + --p; + CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt")); + + /* all good, we can now initialise our private data */ + CHKiRet(curlSetup(pData)); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + +BEGINmodExit +CODESTARTmodExit + curl_global_cleanup(); + statsobj.Destruct(&indexStats); + objRelease(errmsg, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); +ENDmodExit + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + DEFiRet; + restPort = 9200; + hostName = "localhost"; + searchIndex = "system"; + searchType = "events"; + RETiRet; +} + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + + /* register config file handlers */ + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); + + if (curl_global_init(CURL_GLOBAL_ALL) != 0) { + errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled"); + ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); + } + + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&indexStats)); + CHKiRet(statsobj.SetName(indexStats, (uchar *)"elasticsearch")); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"connfail", + ctrType_IntCtr, &indexConFail)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submits", + ctrType_IntCtr, &indexSubmit)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed", + ctrType_IntCtr, &indexFailed)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"success", + ctrType_IntCtr, &indexSuccess)); + CHKiRet(statsobj.ConstructFinalize(indexStats)); +ENDmodInit + +/* vi:set ai: + */ -- cgit v1.2.3 From 9911dacf59e898bae2c8c6619b85348bf54ddc23 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 17 Jan 2012 10:42:04 +0100 Subject: elasticsearch: move to asl 2.0 email conversation Nathan/Rainer 2012-01-16&17 --- plugins/omelasticsearch/omelasticsearch.c | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index ce2e0c1f..3bec1838 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -8,20 +8,19 @@ * * This file is part of rsyslog. * - * Rsyslog is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Rsyslog is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Rsyslog. If not, see . - * - * A copy of the GPL can be found in the file "COPYING" in this distribution. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ #include "config.h" #include "rsyslog.h" -- cgit v1.2.3 From 24179166f748464f4d556a80bd0c8dcc4cbd57b0 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 5 Apr 2012 17:26:16 +0200 Subject: omelasticsearch: moved to v6 config system --- plugins/omelasticsearch/omelasticsearch.c | 126 +++++++++++++++++++++--------- 1 file changed, 91 insertions(+), 35 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 3bec1838..8965d40b 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -29,7 +29,7 @@ #include #include #include -#include +//#include #include #include #include @@ -46,6 +46,7 @@ MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omelasticsearch") /* internal structures */ DEF_OMOD_STATIC_DATA @@ -63,15 +64,30 @@ STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) */ typedef struct curl_slist HEADER; typedef struct _instanceData { + uchar *server; + int port; + uchar *searchIndex; + uchar *searchType; + uchar *tplName; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; -/* config variables */ -static int restPort = 9200; -static char *hostName = "localhost"; -static char *searchIndex = "system"; -static char *searchType = "events"; + +/* tables for interfacing with the v6 config system */ +/* action (instance) parameters */ +static struct cnfparamdescr actpdescr[] = { + { "server", eCmdHdlrGetWord, 0 }, + { "serverport", eCmdHdlrInt, 0 }, + { "searchindex", eCmdHdlrGetWord, 0 }, + { "searchtype", eCmdHdlrGetWord, 0 }, + { "template", eCmdHdlrGetWord, 1 } +}; +static struct cnfparamblk actpblk = + { CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr + }; BEGINcreateInstance CODESTARTcreateInstance @@ -93,6 +109,10 @@ CODESTARTfreeInstance curl_easy_cleanup(pData->curlHandle); pData->curlHandle = NULL; } + free(pData->server); + free(pData->searchIndex); + free(pData->searchType); + free(pData->tplName); ENDfreeInstance BEGINdbgPrintInstInfo @@ -174,7 +194,7 @@ curlSetup(instanceData *instance) } snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - hostName, restPort, searchIndex, searchType); + instance->server, instance->port, instance->searchIndex, instance->searchType); header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); @@ -189,25 +209,77 @@ curlSetup(instanceData *instance) return RS_RET_OK; } -BEGINparseSelectorAct -CODESTARTparseSelectorAct -CODE_STD_STRING_REQUESTparseSelectorAct(1) - if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { - ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); +static inline void +setInstParamDefaults(instanceData *pData) +{ + pData->server = NULL; + pData->port = 9200; + pData->searchIndex = NULL; + pData->searchType = NULL; + pData->tplName = NULL; +} + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; +CODESTARTnewActInst + if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } - p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); + + CODE_STD_STRING_REQUESTparseSelectorAct(1) + for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "server")) { + pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "serverport")) { + pData->port = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "searchIndex")) { + pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "searchType")) { + pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + dbgprintf("omelasticsearch: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + } + + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? + " StdJSONFmt" : (char*)pData->tplName), + OMSR_NO_RQD_TPL_OPTS)); - /* check if a non-standard template is to be applied */ - if(*(p-1) == ';') - --p; - CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt")); + if(pData->server == NULL) + pData->server = (uchar*) strdup("localhost"); + if(pData->searchIndex == NULL) + pData->searchIndex = (uchar*) strdup("system"); + if(pData->searchType == NULL) + pData->searchType = (uchar*) strdup("events"); - /* all good, we can now initialise our private data */ +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); CHKiRet(curlSetup(pData)); +ENDnewActInst + + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(!strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch supports only v6 config format, use: " + "action(type=\"omelasticsearch\" server=...)"); + } + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct + BEGINmodExit CODESTARTmodExit curl_global_cleanup(); @@ -220,18 +292,9 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt -static rsRetVal -resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) -{ - DEFiRet; - restPort = 9200; - hostName = "localhost"; - searchIndex = "system"; - searchType = "events"; - RETiRet; -} BEGINmodInit() CODESTARTmodInit @@ -240,13 +303,6 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); - /* register config file handlers */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); - if (curl_global_init(CURL_GLOBAL_ALL) != 0) { errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled"); ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); -- cgit v1.2.3 From f9bb245a4ae06151897a4a68d6c22ba88a6ecee8 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 13 Apr 2012 12:12:39 +0200 Subject: omelasticsearch: permit dynamic index/type parameters (just like dynafile) --- plugins/omelasticsearch/omelasticsearch.c | 123 ++++++++++++++++++++++++++---- 1 file changed, 107 insertions(+), 16 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 8965d40b..1143de06 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -43,6 +43,7 @@ #include "errmsg.h" #include "statsobj.h" #include "cfsysline.h" +#include "unicode-helper.h" MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP @@ -69,6 +70,8 @@ typedef struct _instanceData { uchar *searchIndex; uchar *searchType; uchar *tplName; + sbool dynSrchIdx; + sbool dynSrchType; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; @@ -81,6 +84,8 @@ static struct cnfparamdescr actpdescr[] = { { "serverport", eCmdHdlrInt, 0 }, { "searchindex", eCmdHdlrGetWord, 0 }, { "searchtype", eCmdHdlrGetWord, 0 }, + { "dynsearchindex", eCmdHdlrBinary, 0 }, + { "dynsearchtype", eCmdHdlrBinary, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = @@ -123,12 +128,44 @@ BEGINtryResume CODESTARTtryResume ENDtryResume -rsRetVal -curlPost(instanceData *instance, uchar *message) + +static rsRetVal +setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) +{ + char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + uchar *searchIndex; + uchar *searchType; + + if(pData->dynSrchIdx) { + searchIndex = tpl1; + if(pData->dynSrchType) + searchType = tpl2; + else + searchType = pData->searchType; + } else { + searchIndex = pData->searchIndex; + if(pData->dynSrchType) + searchType = tpl1; + else + searchType = pData->searchType; + } + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + pData->server, pData->port, searchIndex, searchType); + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); + return RS_RET_OK; +} + +static rsRetVal +curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) { CURLcode code; CURL *curl = instance->curlHandle; int length = strlen((char *)message); + DEFiRet; + + if(instance->dynSrchIdx || instance->dynSrchType) + CHKiRet(setCurlURL(instance, tpl1, tpl2)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); @@ -145,11 +182,13 @@ curlPost(instanceData *instance, uchar *message) STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); return RS_RET_OK; } +finalize_it: + RETiRet; } BEGINdoAction CODESTARTdoAction - CHKiRet(curlPost(pData, ppString[0])); + CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); finalize_it: ENDdoAction @@ -181,10 +220,10 @@ curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) return size * nmemb; } + static rsRetVal -curlSetup(instanceData *instance) +curlSetup(instanceData *pData) { - char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ HEADER *header; CURL *handle; @@ -193,19 +232,26 @@ curlSetup(instanceData *instance) return RS_RET_OBJ_CREATION_FAILED; } - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - instance->server, instance->port, instance->searchIndex, instance->searchType); header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); - curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); - curl_easy_setopt(handle, CURLOPT_URL, restURL); curl_easy_setopt(handle, CURLOPT_POST, 1); - instance->curlHandle = handle; - instance->postHeader = header; + pData->curlHandle = handle; + pData->postHeader = header; - DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL); + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) { + /* in this case, we know no tpls are involved --> NULL OK! */ + setCurlURL(pData, NULL, NULL); + } + + if(Debug) { + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) + dbgprintf("omelasticsearch setup, using static REST URL\n"); + else + dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n"); + } return RS_RET_OK; } @@ -216,12 +262,15 @@ setInstParamDefaults(instanceData *pData) pData->port = 9200; pData->searchIndex = NULL; pData->searchType = NULL; + pData->dynSrchIdx = 0; + pData->dynSrchType = 0; pData->tplName = NULL; } BEGINnewActInst struct cnfparamvals *pvals; int i; + int iNumTpls; CODESTARTnewActInst if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); @@ -230,7 +279,6 @@ CODESTARTnewActInst CHKiRet(createInstance(&pData)); setInstParamDefaults(pData); - CODE_STD_STRING_REQUESTparseSelectorAct(1) for(i = 0 ; i < actpblk.nParams ; ++i) { if(!pvals[i].bUsed) continue; @@ -238,10 +286,14 @@ CODESTARTnewActInst pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; - } else if(!strcmp(actpblk.descr[i].name, "searchIndex")) { + } else if(!strcmp(actpblk.descr[i].name, "searchindex")) { pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); - } else if(!strcmp(actpblk.descr[i].name, "searchType")) { + } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) { + pData->dynSrchIdx = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { + pData->dynSrchType = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { @@ -249,11 +301,49 @@ CODESTARTnewActInst "param '%s'\n", actpblk.descr[i].name); } } + + if(pData->dynSrchIdx && pData->searchIndex == NULL) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch: requested dynamic search index, but no " + "name for index template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + } + if(pData->dynSrchType && pData->searchType == NULL) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch: requested dynamic search type, but no " + "name for type template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + } + + iNumTpls = 1; + if(pData->dynSrchIdx) ++iNumTpls; + if(pData->dynSrchType) ++iNumTpls; + DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); + CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? " StdJSONFmt" : (char*)pData->tplName), OMSR_NO_RQD_TPL_OPTS)); + + /* we need to request additional templates. If we have a dynamic search index, + * it will always be string 1. Type may be 1 or 2, depending on whether search + * index is dynamic as well. Rule needs to be followed throughout the module. + */ + if(pData->dynSrchIdx) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchIndex), + OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynSrchType) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynSrchType) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType), + OMSR_NO_RQD_TPL_OPTS)); + } + } + if(pData->server == NULL) pData->server = (uchar*) strdup("localhost"); if(pData->searchIndex == NULL) @@ -261,9 +351,10 @@ CODESTARTnewActInst if(pData->searchType == NULL) pData->searchType = (uchar*) strdup("events"); + CHKiRet(curlSetup(pData)); + CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); - CHKiRet(curlSetup(pData)); ENDnewActInst -- cgit v1.2.3 From 7d8ac48a4a214da917f7455f24b4c292c3e6d015 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 16 Apr 2012 16:49:07 +0200 Subject: omelasticsearch: added asnycRepl and timeout config params --- plugins/omelasticsearch/omelasticsearch.c | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 1143de06..0b2e0bf1 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -4,7 +4,7 @@ * NOTE: read comments in module-template.h for more specifics! * * Copyright 2011 Nathan Scott. - * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2009-2012 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -70,8 +70,10 @@ typedef struct _instanceData { uchar *searchIndex; uchar *searchType; uchar *tplName; + uchar *timeout; sbool dynSrchIdx; sbool dynSrchType; + sbool asyncRepl; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; @@ -86,6 +88,8 @@ static struct cnfparamdescr actpdescr[] = { { "searchtype", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "asyncrepl", eCmdHdlrBinary, 0 }, + { "timeout", eCmdHdlrGetWord, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = @@ -149,8 +153,21 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) else searchType = pData->searchType; } - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - pData->server, pData->port, searchIndex, searchType); + if(pData->asyncRepl) { + if(pData->timeout != NULL) { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + "replication=async&timeout=%s", + pData->server, pData->port, searchIndex, searchType, + pData->timeout); + } else { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + "replication=async", + pData->server, pData->port, searchIndex, searchType); + } + } else { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + pData->server, pData->port, searchIndex, searchType); + } curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); return RS_RET_OK; @@ -262,8 +279,10 @@ setInstParamDefaults(instanceData *pData) pData->port = 9200; pData->searchIndex = NULL; pData->searchType = NULL; + pData->timeout = NULL; pData->dynSrchIdx = 0; pData->dynSrchType = 0; + pData->asyncRepl = 0; pData->tplName = NULL; } @@ -294,6 +313,10 @@ CODESTARTnewActInst pData->dynSrchIdx = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "timeout")) { + pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "asyncrepl")) { + pData->asyncRepl = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { -- cgit v1.2.3 From 1acde66c3274c57d084146621d4124eaaef9ccf9 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 16 Apr 2012 17:46:46 +0200 Subject: make "old" omelasticsearch compile this module is to be replaced by branch currently in development. But I want to be able to compile the previous one, at least ;) --- plugins/omelasticsearch/omelasticsearch.c | 1 - 1 file changed, 1 deletion(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 3bec1838..2b451eb4 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include -- cgit v1.2.3 From 54966fd878bcf1c52019e0ec977da4d7b0a9f52a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 25 Apr 2012 15:05:01 +0200 Subject: omelasticsearch: provide authentication support (UNTESTED) --- plugins/omelasticsearch/omelasticsearch.c | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 79a24ffe..c20c67c1 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -66,6 +66,8 @@ typedef struct curl_slist HEADER; typedef struct _instanceData { uchar *server; int port; + uchar *uid; + uchar *pwd; uchar *searchIndex; uchar *searchType; uchar *tplName; @@ -83,6 +85,8 @@ typedef struct _instanceData { static struct cnfparamdescr actpdescr[] = { { "server", eCmdHdlrGetWord, 0 }, { "serverport", eCmdHdlrInt, 0 }, + { "uid", eCmdHdlrGetWord, 0 }, + { "pwd", eCmdHdlrGetWord, 0 }, { "searchindex", eCmdHdlrGetWord, 0 }, { "searchtype", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, @@ -118,6 +122,8 @@ CODESTARTfreeInstance pData->curlHandle = NULL; } free(pData->server); + free(pData->uid); + free(pData->pwd); free(pData->searchIndex); free(pData->searchType); free(pData->tplName); @@ -125,6 +131,7 @@ ENDfreeInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo + dbgprintf("omelasticsearch, target server %s", pData->server); ENDdbgPrintInstInfo BEGINtryResume @@ -136,6 +143,7 @@ static rsRetVal setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) { char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + char authBuf[1024]; uchar *searchIndex; uchar *searchType; @@ -168,6 +176,14 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) pData->server, pData->port, searchIndex, searchType); } curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + + if(pData->uid != NULL) { + snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, + (pData->pwd == NULL) ? "" : (char*)pData->pwd); + //TODO: create better code, check errors! + curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); + curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); + } DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); return RS_RET_OK; } @@ -276,6 +292,8 @@ setInstParamDefaults(instanceData *pData) { pData->server = NULL; pData->port = 9200; + pData->uid = NULL; + pData->pwd = NULL; pData->searchIndex = NULL; pData->searchType = NULL; pData->timeout = NULL; @@ -304,6 +322,10 @@ CODESTARTnewActInst pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "uid")) { + pData->uid = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "pwd")) { + pData->pwd = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "searchindex")) { pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { @@ -324,6 +346,12 @@ CODESTARTnewActInst } } + if(pData->pwd != NULL && pData->uid == NULL) { + errmsg.LogError(0, RS_RET_UID_MISSING, + "omelasticsearch: password is provided, but no uid " + "- action definition invalid"); + ABORT_FINALIZE(RS_RET_UID_MISSING); + } if(pData->dynSrchIdx && pData->searchIndex == NULL) { errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, "omelasticsearch: requested dynamic search index, but no " -- cgit v1.2.3 From a4c0d08c78ef27891b192973174b997a0c7c4aa1 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 24 May 2012 16:39:39 +0200 Subject: omelasticsearch: added transactional interface & better debug output --- plugins/omelasticsearch/omelasticsearch.c | 37 ++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index c20c67c1..7642d603 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -74,6 +74,7 @@ typedef struct _instanceData { uchar *timeout; sbool dynSrchIdx; sbool dynSrchType; + sbool bulkmode; sbool asyncRepl; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ @@ -91,6 +92,7 @@ static struct cnfparamdescr actpdescr[] = { { "searchtype", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, { "template", eCmdHdlrGetWord, 1 } @@ -131,7 +133,19 @@ ENDfreeInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo - dbgprintf("omelasticsearch, target server %s", pData->server); + dbgprintf("omelasticsearch\n"); + dbgprintf("\ttemplate='%s'\n", pData->tplName); + dbgprintf("\tserver='%s'\n", pData->server); + dbgprintf("\tserverport=%d\n", pData->port); + dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid); + dbgprintf("\tpwd=(%s configured)\n", pData->pwd == NULL ? "not " : ""); + dbgprintf("\tsearch index='%s'\n", pData->searchIndex); + dbgprintf("\tsearch index='%s'\n", pData->searchType); + dbgprintf("\ttimeout='%s'\n", pData->timeout); + dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx); + dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType); + dbgprintf("\tasync replication=%d\n", pData->asyncRepl); + dbgprintf("\tbulkmode=%d\n", pData->bulkmode); ENDdbgPrintInstInfo BEGINtryResume @@ -218,12 +232,30 @@ finalize_it: RETiRet; } +BEGINbeginTransaction +CODESTARTbeginTransaction +dbgprintf("omelasticsearch: beginTransaction\n"); +ENDbeginTransaction + + BEGINdoAction CODESTARTdoAction CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); finalize_it: ENDdoAction + +BEGINendTransaction +CODESTARTendTransaction +dbgprintf("elasticsearch: endTransaction\n"); +#if 0 + if(pData->offsSndBuf != 0) { + iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); + pData->offsSndBuf = 0; + } +#endif +ENDendTransaction + /* elasticsearch POST result string ... useful for debugging */ size_t curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) @@ -334,6 +366,8 @@ CODESTARTnewActInst pData->dynSrchIdx = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "bulkmode")) { + pData->bulkmode = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "timeout")) { pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "asyncrepl")) { @@ -434,6 +468,7 @@ CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt -- cgit v1.2.3 From c7ca67a37586164a028c52a4d0cd9328b09e8697 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 25 May 2012 08:03:19 +0200 Subject: omelasticsearch: test commit, first shot at bulk interface This obviously does not work correctly. So expect problems if you set bulkmode="on". --- plugins/omelasticsearch/omelasticsearch.c | 96 +++++++++++++++++++++++++++---- 1 file changed, 85 insertions(+), 11 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 7642d603..1705c605 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -76,6 +76,11 @@ typedef struct _instanceData { sbool dynSrchType; sbool bulkmode; sbool asyncRepl; + struct { + es_str_t *data; + uchar *currTpl1; + uchar *currTpl2; + } batch; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; @@ -160,7 +165,9 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) char authBuf[1024]; uchar *searchIndex; uchar *searchType; + uchar *bulkpart; + bulkpart = (pData->bulkmode) ? "/_bulk" : ""; if(pData->dynSrchIdx) { searchIndex = tpl1; if(pData->dynSrchType) @@ -176,18 +183,19 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) } if(pData->asyncRepl) { if(pData->timeout != NULL) { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" "replication=async&timeout=%s", pData->server, pData->port, searchIndex, searchType, - pData->timeout); + bulkpart, pData->timeout); } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" "replication=async", - pData->server, pData->port, searchIndex, searchType); + pData->server, pData->port, searchIndex, searchType, + bulkpart); } } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - pData->server, pData->port, searchIndex, searchType); + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s", + pData->server, pData->port, searchIndex, searchType, bulkpart); } curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); @@ -202,12 +210,44 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) return RS_RET_OK; } + +/* this method does not directly submit but builds a batch instead. It + * may submit, if we have dynamic index/type and the current type or + * index changes. + */ +static rsRetVal +buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) +{ + int length = strlen((char *)message); + int r; + DEFiRet; + +#if 0 + if(instance->dynSrchIdx || instance->dynSrchType) + CHKiRet(setCurlURL(instance, tpl1, tpl2)); +#endif + + //if(pData->batch.currTpl1 == NULL + r = es_addBuf(&pData->batch.data, "{\"index\":", sizeof("{\"index\":")-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); + if(r == 0) r = es_addBuf(&pData->batch.data, "}\n", sizeof("}\n")-1); + + if(r == 0) r = es_addBuf(&pData->batch.data, "{ \"field1\" : \"value1\" }\n", sizeof("{ \"field1\" : \"value1\" }\n")-1); + if(r != 0) { + DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); + ABORT_FINALIZE(RS_RET_ERR); + } + iRet = RS_RET_DEFER_COMMIT; + +finalize_it: + RETiRet; +} + static rsRetVal -curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) +curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar *tpl2) { CURLcode code; CURL *curl = instance->curlHandle; - int length = strlen((char *)message); DEFiRet; if(instance->dynSrchIdx || instance->dynSrchType) @@ -215,8 +255,9 @@ curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); +dbgprintf("curl_easy_perform result: %d\n", code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: @@ -235,25 +276,43 @@ finalize_it: BEGINbeginTransaction CODESTARTbeginTransaction dbgprintf("omelasticsearch: beginTransaction\n"); + if(!pData->bulkmode) { + FINALIZE; + } + + es_emptyStr(pData->batch.data); +finalize_it: ENDbeginTransaction BEGINdoAction CODESTARTdoAction - CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); + if(pData->bulkmode) { + CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2])); + } else { + CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), + ppString[1], ppString[2])); + } finalize_it: +dbgprintf("omelasticsearch: result doAction: %d\n", iRet); ENDdoAction BEGINendTransaction + char *cstr; CODESTARTendTransaction -dbgprintf("elasticsearch: endTransaction\n"); + cstr = es_str2cstr(pData->batch.data, NULL); + dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), + NULL, NULL)); #if 0 if(pData->offsSndBuf != 0) { iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); pData->offsSndBuf = 0; } #endif +finalize_it: + free(cstr); ENDendTransaction /* elasticsearch POST result string ... useful for debugging */ @@ -266,6 +325,11 @@ curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) static char ok[] = "{\"ok\":true,"; ASSERT(size == 1); +DBGPRINTF("omelasticsearch request: %s\n", jsonData); +DBGPRINTF("omelasticsearch result: "); +for (i = 0; i < nmemb; i++) + DBGPRINTF("%c", p[i]); +DBGPRINTF("\n"); if (size == 1 && nmemb > sizeof(ok)-1 && @@ -399,6 +463,16 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); } + if(pData->bulkmode) { + pData->batch.currTpl1 = NULL; + pData->batch.currTpl2 = NULL; + if((pData->batch.data = es_newStr(1024)) == NULL) { + DBGPRINTF("omelasticsearch: error creating batch string " + "turned off bulk mode\n"); + pData->bulkmode = 0; /* at least it works */ + } + } + iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; -- cgit v1.2.3 From 5f85909b17920172121d2ff8367c8185623f1409 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 25 May 2012 18:55:29 +0200 Subject: omelasticsearc: milestone, bulk insert basically works dynamic index&type is not yet used, but easy to add (did not manage to get it in today...) --- plugins/omelasticsearch/omelasticsearch.c | 115 +++++++++++++++++------------- 1 file changed, 65 insertions(+), 50 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 1705c605..704c9950 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -158,46 +158,67 @@ CODESTARTtryResume ENDtryResume +/* get the current index and type for this message */ +static inline void +getIndexAndType(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar **srchIndex, + uchar **srchType) +{ + if(pData->dynSrchIdx) { + *srchIndex = tpl1; + if(pData->dynSrchType) + *srchType = tpl2; + else + *srchType = pData->searchType; + } else { + *srchIndex = pData->searchIndex; + if(pData->dynSrchType) + *srchType = tpl1; + else + *srchType = pData->searchType; + } +} + + static rsRetVal setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) { - char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ char authBuf[1024]; + char portBuf[64]; + char *restURL; uchar *searchIndex; uchar *searchType; - uchar *bulkpart; + es_str_t *url; + int r; - bulkpart = (pData->bulkmode) ? "/_bulk" : ""; - if(pData->dynSrchIdx) { - searchIndex = tpl1; - if(pData->dynSrchType) - searchType = tpl2; - else - searchType = pData->searchType; + getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); + url = es_newStr(128); + snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + + r = es_addBuf(&url, "http://", sizeof("http://")-1); + if(r == 0) r = es_addBuf(&url, (char*)pData->server, strlen((char*)pData->server)); + if(r == 0) r = es_addChar(&url, ':'); + if(r == 0) r = es_addBuf(&url, portBuf, strlen(portBuf)); + if(r == 0) r = es_addChar(&url, '/'); + if(pData->bulkmode) { + if(r == 0) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); } else { - searchIndex = pData->searchIndex; - if(pData->dynSrchType) - searchType = tpl1; - else - searchType = pData->searchType; + if(r == 0) r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); + if(r == 0) r = es_addChar(&url, '/'); + if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); } + if(r == 0) r = es_addChar(&url, '?'); if(pData->asyncRepl) { - if(pData->timeout != NULL) { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" - "replication=async&timeout=%s", - pData->server, pData->port, searchIndex, searchType, - bulkpart, pData->timeout); - } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" - "replication=async", - pData->server, pData->port, searchIndex, searchType, - bulkpart); - } - } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s", - pData->server, pData->port, searchIndex, searchType, bulkpart); + if(r == 0) r = es_addBuf(&url, "replication=async&", + sizeof("replication=async&")-1); } + if(pData->timeout != NULL) { + if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1); + if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout)); + } + restURL = es_str2cstr(url, NULL); curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + es_deleteStr(url); + free(restURL); if(pData->uid != NULL) { snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -221,18 +242,20 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) int length = strlen((char *)message); int r; DEFiRet; - -#if 0 - if(instance->dynSrchIdx || instance->dynSrchType) - CHKiRet(setCurlURL(instance, tpl1, tpl2)); -#endif - - //if(pData->batch.currTpl1 == NULL - r = es_addBuf(&pData->batch.data, "{\"index\":", sizeof("{\"index\":")-1); +# define META_STRT "{\"index\":{\"_index\": \"" +# define META_TYPE "\",\"_type\":\"" +# define META_END "\"}}\n" + +#warning TODO: use dynamic index/type! + r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchIndex, + ustrlen(pData->searchIndex)); + if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchType, + ustrlen(pData->searchType)); + if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); - if(r == 0) r = es_addBuf(&pData->batch.data, "}\n", sizeof("}\n")-1); - - if(r == 0) r = es_addBuf(&pData->batch.data, "{ \"field1\" : \"value1\" }\n", sizeof("{ \"field1\" : \"value1\" }\n")-1); + if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); if(r != 0) { DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); ABORT_FINALIZE(RS_RET_ERR); @@ -257,7 +280,6 @@ curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); -dbgprintf("curl_easy_perform result: %d\n", code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: @@ -303,14 +325,7 @@ BEGINendTransaction CODESTARTendTransaction cstr = es_str2cstr(pData->batch.data, NULL); dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), - NULL, NULL)); -#if 0 - if(pData->offsSndBuf != 0) { - iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); - pData->offsSndBuf = 0; - } -#endif + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL)); finalize_it: free(cstr); ENDendTransaction @@ -369,8 +384,8 @@ curlSetup(instanceData *pData) pData->curlHandle = handle; pData->postHeader = header; - if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) { - /* in this case, we know no tpls are involved --> NULL OK! */ + if(pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0)) { + /* in this case, we know no tpls are involved in the request-->NULL OK! */ setCurlURL(pData, NULL, NULL); } -- cgit v1.2.3 From 0314f370a4c15ce2190febfccee6664ecccab788 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 28 May 2012 15:14:16 +0200 Subject: omelasticsearch: dyn index&type now also supported in bulk mode --- plugins/omelasticsearch/omelasticsearch.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 704c9950..a09851b1 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -241,18 +241,21 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) { int length = strlen((char *)message); int r; + uchar *searchIndex; + uchar *searchType; DEFiRet; # define META_STRT "{\"index\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" # define META_END "\"}}\n" #warning TODO: use dynamic index/type! + getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchIndex, - ustrlen(pData->searchIndex)); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, + ustrlen(searchIndex)); if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchType, - ustrlen(pData->searchType)); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, + ustrlen(searchType)); if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); -- cgit v1.2.3 From b0a3e85e102c3e549574bf8f418ca643109f2884 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 13 Jun 2012 12:49:25 +0200 Subject: omelasticsearch: bugfix: bulkmode setting was not properly initialized --- plugins/omelasticsearch/omelasticsearch.c | 1 + 1 file changed, 1 insertion(+) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index a09851b1..d8db7307 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -414,6 +414,7 @@ setInstParamDefaults(instanceData *pData) pData->dynSrchIdx = 0; pData->dynSrchType = 0; pData->asyncRepl = 0; + pData->bulkmode = 0; pData->tplName = NULL; } -- cgit v1.2.3 From ebaf375ed108b14c5a5a3af62067df988506bfec Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 9 Jul 2012 17:28:40 +0200 Subject: omelasticsearch: better debug instrumentation --- plugins/omelasticsearch/omelasticsearch.c | 48 +++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 15 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index d8db7307..c18c1c52 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -143,7 +143,7 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tserver='%s'\n", pData->server); dbgprintf("\tserverport=%d\n", pData->port); dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid); - dbgprintf("\tpwd=(%s configured)\n", pData->pwd == NULL ? "not " : ""); + dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : ""); dbgprintf("\tsearch index='%s'\n", pData->searchIndex); dbgprintf("\tsearch index='%s'\n", pData->searchType); dbgprintf("\ttimeout='%s'\n", pData->timeout); @@ -155,6 +155,7 @@ ENDdbgPrintInstInfo BEGINtryResume CODESTARTtryResume + DBGPRINTF("omelasticsearch: tryResume called\n"); ENDtryResume @@ -188,7 +189,9 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) uchar *searchIndex; uchar *searchType; es_str_t *url; + int rLocal; int r; + DEFiRet; getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); url = es_newStr(128); @@ -218,17 +221,23 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) restURL = es_str2cstr(url, NULL); curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); es_deleteStr(url); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); free(restURL); if(pData->uid != NULL) { - snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, - (pData->pwd == NULL) ? "" : (char*)pData->pwd); - //TODO: create better code, check errors! + rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, + (pData->pwd == NULL) ? "" : (char*)pData->pwd); + if(rLocal != (int) es_strlen(url)) { + errmsg.LogError(0, RS_RET_ERR, "omelasticsearch: snprintf failed " + "when trying to build auth string (return %d)\n", + rLocal); + ABORT_FINALIZE(RS_RET_ERR); + } curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); - return RS_RET_OK; +finalize_it: + RETiRet; } @@ -248,7 +257,6 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) # define META_TYPE "\",\"_type\":\"" # define META_END "\"}}\n" -#warning TODO: use dynamic index/type! getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, @@ -282,13 +290,18 @@ curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); +dbgprintf("omelasticsearch: do curl_easy_perform()\n"); code = curl_easy_perform(curl); +DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: case CURLE_COULDNT_CONNECT: case CURLE_WRITE_ERROR: STATSCOUNTER_INC(indexConFail, mutIndexConFail); + DBGPRINTF("omelasticsearch: we are suspending ourselfs due " + "to failure %lld of curl_easy_perform()\n", + (long long) code); return RS_RET_SUSPENDED; default: STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); @@ -315,22 +328,25 @@ CODESTARTdoAction if(pData->bulkmode) { CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2])); } else { +dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), ppString[1], ppString[2])); } finalize_it: -dbgprintf("omelasticsearch: result doAction: %d\n", iRet); +dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); ENDdoAction BEGINendTransaction char *cstr; CODESTARTendTransaction +dbgprintf("omelasticsearch: endTransaction init\n"); cstr = es_str2cstr(pData->batch.data, NULL); - dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); + dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL)); finalize_it: free(cstr); +dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); ENDendTransaction /* elasticsearch POST result string ... useful for debugging */ @@ -353,11 +369,13 @@ DBGPRINTF("\n"); nmemb > sizeof(ok)-1 && strncmp(p, ok, sizeof(ok)-1) == 0) { STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); +dbgprintf("omelasticsearch ok\n"); } else { +dbgprintf("omelasticsearch fail\n"); STATSCOUNTER_INC(indexFailed, mutIndexFailed); if (Debug) { - DBGPRINTF("omelasticsearch request: %s\n", jsonData); - DBGPRINTF("omelasticsearch result: "); + DBGPRINTF("omelasticsearch (fail) request: %s\n", jsonData); + DBGPRINTF("omelasticsearch (fail) result: "); for (i = 0; i < nmemb; i++) DBGPRINTF("%c", p[i]); DBGPRINTF("\n"); @@ -470,16 +488,16 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_UID_MISSING); } if(pData->dynSrchIdx && pData->searchIndex == NULL) { - errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omelasticsearch: requested dynamic search index, but no " "name for index template given - action definition invalid"); - ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } if(pData->dynSrchType && pData->searchType == NULL) { - errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omelasticsearch: requested dynamic search type, but no " "name for type template given - action definition invalid"); - ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } if(pData->bulkmode) { -- cgit v1.2.3 From 4d453967cbff1f09becab38a2ad10b05df476eaf Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 10 Jul 2012 12:10:34 +0200 Subject: omelasticsearch: implement retry via request to es server homepage --- plugins/omelasticsearch/omelasticsearch.c | 70 ++++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 10 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index c18c1c52..a1f3b8ab 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -153,9 +153,66 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tbulkmode=%d\n", pData->bulkmode); ENDdbgPrintInstInfo + +/* Build basic URL part, which includes hostname and port as follows: + * http://hostname:port/ + * Newly creates an estr for this purpose. + */ +static rsRetVal +setBaseURL(instanceData *pData, es_str_t **url) +{ + char portBuf[64]; + int r; + DEFiRet; + + *url = es_newStr(128); + snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + r = es_addBuf(url, "http://", sizeof("http://")-1); + if(r == 0) r = es_addBuf(url, (char*)pData->server, strlen((char*)pData->server)); + if(r == 0) r = es_addChar(url, ':'); + if(r == 0) r = es_addBuf(url, portBuf, strlen(portBuf)); + if(r == 0) r = es_addChar(url, '/'); + RETiRet; +} + + +static inline rsRetVal +checkConn(instanceData *pData) +{ + es_str_t *url; + CURL *curl = NULL; + CURLcode res; + char *cstr; + DEFiRet; + + setBaseURL(pData, &url); + curl = curl_easy_init(); + if(curl == NULL) { + DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + cstr = es_str2cstr(url, NULL); + curl_easy_setopt(curl, CURLOPT_URL, cstr); + free(cstr); + + res = curl_easy_perform(curl); + if(res != CURLE_OK) { + dbgprintf("omelasticsearch: checkConn() curl_easy_perform() " + "failed: %s\n", curl_easy_strerror(res)); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: + if(curl != NULL) + curl_easy_cleanup(curl); + RETiRet; +} + + BEGINtryResume CODESTARTtryResume DBGPRINTF("omelasticsearch: tryResume called\n"); + iRet = checkConn(pData); ENDtryResume @@ -184,7 +241,6 @@ static rsRetVal setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) { char authBuf[1024]; - char portBuf[64]; char *restURL; uchar *searchIndex; uchar *searchType; @@ -194,18 +250,12 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) DEFiRet; getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); - url = es_newStr(128); - snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + setBaseURL(pData, &url); - r = es_addBuf(&url, "http://", sizeof("http://")-1); - if(r == 0) r = es_addBuf(&url, (char*)pData->server, strlen((char*)pData->server)); - if(r == 0) r = es_addChar(&url, ':'); - if(r == 0) r = es_addBuf(&url, portBuf, strlen(portBuf)); - if(r == 0) r = es_addChar(&url, '/'); if(pData->bulkmode) { - if(r == 0) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); + r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); } else { - if(r == 0) r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); + r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addChar(&url, '/'); if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); } -- cgit v1.2.3 From 68056a6128b9ebc8d65791b2647030d36c73f014 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 10 Jul 2012 17:15:03 +0200 Subject: omelasticsearch: support for parameters parent & dynparent added --- plugins/omelasticsearch/omelasticsearch.c | 115 ++++++++++++++++++++++++------ 1 file changed, 95 insertions(+), 20 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index a1f3b8ab..5ddb66da 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -70,10 +70,12 @@ typedef struct _instanceData { uchar *pwd; uchar *searchIndex; uchar *searchType; + uchar *parent; uchar *tplName; uchar *timeout; sbool dynSrchIdx; sbool dynSrchType; + sbool dynParent; sbool bulkmode; sbool asyncRepl; struct { @@ -95,8 +97,10 @@ static struct cnfparamdescr actpdescr[] = { { "pwd", eCmdHdlrGetWord, 0 }, { "searchindex", eCmdHdlrGetWord, 0 }, { "searchtype", eCmdHdlrGetWord, 0 }, + { "parent", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "dynparent", eCmdHdlrBinary, 0 }, { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, @@ -133,6 +137,7 @@ CODESTARTfreeInstance free(pData->pwd); free(pData->searchIndex); free(pData->searchType); + free(pData->parent); free(pData->tplName); ENDfreeInstance @@ -146,9 +151,11 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : ""); dbgprintf("\tsearch index='%s'\n", pData->searchIndex); dbgprintf("\tsearch index='%s'\n", pData->searchType); + dbgprintf("\tparent='%s'\n", pData->parent); dbgprintf("\ttimeout='%s'\n", pData->timeout); dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx); dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType); + dbgprintf("\tdynamic parent=%d\n", pData->dynParent); dbgprintf("\tasync replication=%d\n", pData->asyncRepl); dbgprintf("\tbulkmode=%d\n", pData->bulkmode); ENDdbgPrintInstInfo @@ -197,10 +204,11 @@ checkConn(instanceData *pData) res = curl_easy_perform(curl); if(res != CURLE_OK) { - dbgprintf("omelasticsearch: checkConn() curl_easy_perform() " + DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " "failed: %s\n", curl_easy_strerror(res)); ABORT_FINALIZE(RS_RET_SUSPENDED); } + DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); finalize_it: if(curl != NULL) @@ -218,38 +226,62 @@ ENDtryResume /* get the current index and type for this message */ static inline void -getIndexAndType(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar **srchIndex, - uchar **srchType) +getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3, + uchar **srchIndex, uchar **srchType, uchar **parent) { if(pData->dynSrchIdx) { *srchIndex = tpl1; - if(pData->dynSrchType) + if(pData->dynSrchType) { *srchType = tpl2; - else + if(pData->dynParent) { + *parent = tpl3; + } else { + *parent = pData->parent; + } + } else { *srchType = pData->searchType; + if(pData->dynParent) { + *parent = tpl2; + } else { + *parent = pData->parent; + } + } } else { *srchIndex = pData->searchIndex; - if(pData->dynSrchType) + if(pData->dynSrchType) { *srchType = tpl1; - else + if(pData->dynParent) { + *parent = tpl2; + } else { + *parent = pData->parent; + } + } else { *srchType = pData->searchType; + if(pData->dynParent) { + *parent = tpl1; + } else { + *parent = pData->parent; + } + } } } static rsRetVal -setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) +setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3) { char authBuf[1024]; char *restURL; uchar *searchIndex; uchar *searchType; + uchar *parent; es_str_t *url; int rLocal; int r; DEFiRet; - getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); + getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, + &searchIndex, &searchType, &parent); setBaseURL(pData, &url); if(pData->bulkmode) { @@ -267,6 +299,11 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) if(pData->timeout != NULL) { if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1); if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout)); + if(r == 0) r = es_addChar(&url, '&'); + } + if(parent != NULL) { + if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1); + if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); } restURL = es_str2cstr(url, NULL); curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); @@ -296,24 +333,29 @@ finalize_it: * index changes. */ static rsRetVal -buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) +buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar *tpl3) { int length = strlen((char *)message); int r; uchar *searchIndex; uchar *searchType; + uchar *parent; DEFiRet; # define META_STRT "{\"index\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" +# define META_PARENT "\",\"_parent\":\"" # define META_END "\"}}\n" - getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); + getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, + &searchIndex, &searchType, &parent); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, ustrlen(searchType)); + if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); @@ -328,14 +370,15 @@ finalize_it: } static rsRetVal -curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar *tpl2) +curlPost(instanceData *instance, uchar *message, int msglen, + uchar *tpl1, uchar *tpl2, uchar *tpl3) { CURLcode code; CURL *curl = instance->curlHandle; DEFiRet; - if(instance->dynSrchIdx || instance->dynSrchType) - CHKiRet(setCurlURL(instance, tpl1, tpl2)); + if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent) + CHKiRet(setCurlURL(instance, tpl1, tpl2, tpl3)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); @@ -376,11 +419,11 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction if(pData->bulkmode) { - CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2])); + CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2], ppString[3])); } else { dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), - ppString[1], ppString[2])); + ppString[1], ppString[2], ppString[3])); } finalize_it: dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); @@ -393,7 +436,7 @@ CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); cstr = es_str2cstr(pData->batch.data, NULL); dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL)); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL, NULL)); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -455,13 +498,14 @@ curlSetup(instanceData *pData) pData->curlHandle = handle; pData->postHeader = header; - if(pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0)) { + if( pData->bulkmode + || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) { /* in this case, we know no tpls are involved in the request-->NULL OK! */ - setCurlURL(pData, NULL, NULL); + setCurlURL(pData, NULL, NULL, NULL); } if(Debug) { - if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0) dbgprintf("omelasticsearch setup, using static REST URL\n"); else dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n"); @@ -478,9 +522,11 @@ setInstParamDefaults(instanceData *pData) pData->pwd = NULL; pData->searchIndex = NULL; pData->searchType = NULL; + pData->parent = NULL; pData->timeout = NULL; pData->dynSrchIdx = 0; pData->dynSrchType = 0; + pData->dynParent = 0; pData->asyncRepl = 0; pData->bulkmode = 0; pData->tplName = NULL; @@ -513,10 +559,14 @@ CODESTARTnewActInst pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "parent")) { + pData->parent = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) { pData->dynSrchIdx = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynparent")) { + pData->dynParent = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "bulkmode")) { pData->bulkmode = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "timeout")) { @@ -549,6 +599,12 @@ CODESTARTnewActInst "name for type template given - action definition invalid"); ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } + if(pData->dynParent && pData->parent == NULL) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic parent, but no " + "name for parent template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } if(pData->bulkmode) { pData->batch.currTpl1 = NULL; @@ -563,6 +619,7 @@ CODESTARTnewActInst iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; + if(pData->dynParent) ++iNumTpls; DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) @@ -581,11 +638,29 @@ CODESTARTnewActInst if(pData->dynSrchType) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynSrchType) { CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + } } } -- cgit v1.2.3 From fdbc4cb666b4fc92562ece1fba97227c40237e04 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 11 Jul 2012 17:12:34 +0200 Subject: omelasticsearch: regression from "parent" feature could case aborts this was not present in any released version --- plugins/omelasticsearch/omelasticsearch.c | 49 ++++++++++++++++--------------- 1 file changed, 26 insertions(+), 23 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 5ddb66da..f77caeca 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -226,22 +226,22 @@ ENDtryResume /* get the current index and type for this message */ static inline void -getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3, +getIndexTypeAndParent(instanceData *pData, uchar **tpls, uchar **srchIndex, uchar **srchType, uchar **parent) { if(pData->dynSrchIdx) { - *srchIndex = tpl1; + *srchIndex = tpls[1]; if(pData->dynSrchType) { - *srchType = tpl2; + *srchType = tpls[2]; if(pData->dynParent) { - *parent = tpl3; + *parent = tpls[3]; } else { *parent = pData->parent; } } else { *srchType = pData->searchType; if(pData->dynParent) { - *parent = tpl2; + *parent = tpls[2]; } else { *parent = pData->parent; } @@ -249,16 +249,16 @@ getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3 } else { *srchIndex = pData->searchIndex; if(pData->dynSrchType) { - *srchType = tpl1; + *srchType = tpls[1]; if(pData->dynParent) { - *parent = tpl2; + *parent = tpls[2]; } else { *parent = pData->parent; } } else { *srchType = pData->searchType; if(pData->dynParent) { - *parent = tpl1; + *parent = tpls[1]; } else { *parent = pData->parent; } @@ -268,7 +268,7 @@ getIndexTypeAndParent(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3 static rsRetVal -setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3) +setCurlURL(instanceData *pData, uchar **tpls) { char authBuf[1024]; char *restURL; @@ -280,13 +280,13 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar *tpl3) int r; DEFiRet; - getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, - &searchIndex, &searchType, &parent); setBaseURL(pData, &url); if(pData->bulkmode) { r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); + parent = NULL; } else { + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addChar(&url, '/'); if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); @@ -333,7 +333,7 @@ finalize_it: * index changes. */ static rsRetVal -buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar *tpl3) +buildBatch(instanceData *pData, uchar *message, uchar **tpls) { int length = strlen((char *)message); int r; @@ -346,16 +346,20 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2, uchar # define META_PARENT "\",\"_parent\":\"" # define META_END "\"}}\n" - getIndexTypeAndParent(pData, tpl1, tpl2, tpl3, - &searchIndex, &searchType, &parent); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); +dbgprintf("AAA: searchIndex: '%s'\n", searchIndex); +dbgprintf("AAA: searchType: '%s'\n", searchType); +dbgprintf("AAA: parent: '%s'\n", parent); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, ustrlen(searchType)); - if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); + if(parent != NULL) { + if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); + } if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); @@ -370,15 +374,14 @@ finalize_it: } static rsRetVal -curlPost(instanceData *instance, uchar *message, int msglen, - uchar *tpl1, uchar *tpl2, uchar *tpl3) +curlPost(instanceData *instance, uchar *message, int msglen, uchar **tpls) { CURLcode code; CURL *curl = instance->curlHandle; DEFiRet; if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent) - CHKiRet(setCurlURL(instance, tpl1, tpl2, tpl3)); + CHKiRet(setCurlURL(instance, tpls)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); @@ -419,11 +422,11 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction if(pData->bulkmode) { - CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2], ppString[3])); + CHKiRet(buildBatch(pData, ppString[0], ppString)); } else { dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), - ppString[1], ppString[2], ppString[3])); + ppString)); } finalize_it: dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); @@ -436,7 +439,7 @@ CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); cstr = es_str2cstr(pData->batch.data, NULL); dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL, NULL)); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL)); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -501,7 +504,7 @@ curlSetup(instanceData *pData) if( pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) { /* in this case, we know no tpls are involved in the request-->NULL OK! */ - setCurlURL(pData, NULL, NULL, NULL); + setCurlURL(pData, NULL); } if(Debug) { -- cgit v1.2.3 From 71121003e273dc9fb2d6dd2aac427fb2c6329d23 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 25 Jul 2012 18:30:18 +0200 Subject: omelasticsearch: parse JSON response (in regard to data errors) note: bulkmode response processing is still mostly missing --- plugins/omelasticsearch/omelasticsearch.c | 102 +++++++++++++++++++----------- 1 file changed, 65 insertions(+), 37 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index f77caeca..b49caf38 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -34,6 +34,7 @@ #include #include #include +#include "cjson.h" #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" @@ -64,8 +65,9 @@ STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) */ typedef struct curl_slist HEADER; typedef struct _instanceData { - uchar *server; int port; + int replyLen; + uchar *server; uchar *uid; uchar *pwd; uchar *searchIndex; @@ -73,6 +75,7 @@ typedef struct _instanceData { uchar *parent; uchar *tplName; uchar *timeout; + char *reply; sbool dynSrchIdx; sbool dynSrchType; sbool dynParent; @@ -139,6 +142,7 @@ CODESTARTfreeInstance free(pData->searchType); free(pData->parent); free(pData->tplName); + free(pData->timeout); ENDfreeInstance BEGINdbgPrintInstInfo @@ -202,12 +206,16 @@ checkConn(instanceData *pData) curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); + pData->reply = NULL; + pData->replyLen = 0; + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); res = curl_easy_perform(curl); if(res != CURLE_OK) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " "failed: %s\n", curl_easy_strerror(res)); ABORT_FINALIZE(RS_RET_SUSPENDED); } + free(pData->reply); DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); finalize_it: @@ -373,22 +381,51 @@ finalize_it: RETiRet; } +static inline rsRetVal +checkResult(instanceData *pData) +{ + cJSON *root; + cJSON *ok; + DEFiRet; + + root = cJSON_Parse(pData->reply); + if(root == NULL) { + DBGPRINTF("omelasticsearch: could not parse JSON result \n"); + ABORT_FINALIZE(RS_RET_ERR); + } + + if(pData->bulkmode) { + //TODO: implement + //iRet = checkResultBulkmode(pData, root); + } else { + ok = cJSON_GetObjectItem(root, "ok"); + if(ok == NULL || ok->type != cJSON_True) { + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + } +finalize_it: + cJSON_Delete(root); + RETiRet; +} + + static rsRetVal -curlPost(instanceData *instance, uchar *message, int msglen, uchar **tpls) +curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) { CURLcode code; - CURL *curl = instance->curlHandle; + CURL *curl = pData->curlHandle; DEFiRet; - if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent) - CHKiRet(setCurlURL(instance, tpls)); + pData->reply = NULL; + pData->replyLen = 0; - curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); + if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent) + CHKiRet(setCurlURL(pData, tpls)); + + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); -dbgprintf("omelasticsearch: do curl_easy_perform()\n"); code = curl_easy_perform(curl); -DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: @@ -398,12 +435,18 @@ DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) co DBGPRINTF("omelasticsearch: we are suspending ourselfs due " "to failure %lld of curl_easy_perform()\n", (long long) code); - return RS_RET_SUSPENDED; + ABORT_FINALIZE(RS_RET_SUSPENDED); default: STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); - return RS_RET_OK; + break; } + + pData->reply[pData->replyLen] = '\0'; /* byte has been reserved in malloc */ + DBGPRINTF("omelasticsearch: es reply: '%s'\n", pData->reply); + + CHKiRet(checkResult(pData)); finalize_it: + free(pData->reply); RETiRet; } @@ -449,35 +492,20 @@ ENDendTransaction size_t curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) { - unsigned int i; char *p = (char *)ptr; - char *jsonData = (char *)userdata; - static char ok[] = "{\"ok\":true,"; - - ASSERT(size == 1); -DBGPRINTF("omelasticsearch request: %s\n", jsonData); -DBGPRINTF("omelasticsearch result: "); -for (i = 0; i < nmemb; i++) - DBGPRINTF("%c", p[i]); -DBGPRINTF("\n"); - - if (size == 1 && - nmemb > sizeof(ok)-1 && - strncmp(p, ok, sizeof(ok)-1) == 0) { - STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); -dbgprintf("omelasticsearch ok\n"); - } else { -dbgprintf("omelasticsearch fail\n"); - STATSCOUNTER_INC(indexFailed, mutIndexFailed); - if (Debug) { - DBGPRINTF("omelasticsearch (fail) request: %s\n", jsonData); - DBGPRINTF("omelasticsearch (fail) result: "); - for (i = 0; i < nmemb; i++) - DBGPRINTF("%c", p[i]); - DBGPRINTF("\n"); - } + instanceData *pData = (instanceData*) userdata; + char *buf; + size_t newlen; + + newlen = pData->replyLen + size*nmemb; + if((buf = realloc(pData->reply, newlen + 1)) == NULL) { + DBGPRINTF("omelasticsearch: realloc failed in curlResult\n"); + return 0; /* abort due to failure */ } - return size * nmemb; + memcpy(buf+pData->replyLen, p, size*nmemb); + pData->replyLen = newlen; + pData->reply = buf; + return size*nmemb; } -- cgit v1.2.3 From 575686bd68a47fc5d9f59c1ed610f6680e1bb6fa Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 30 Jul 2012 11:01:11 +0200 Subject: omelasticsearch: mileston: bulk reply is parsed --- plugins/omelasticsearch/omelasticsearch.c | 50 +++++++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 2 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index b49caf38..04113170 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -381,6 +381,53 @@ finalize_it: RETiRet; } +static inline rsRetVal +checkResultBulkmode(instanceData *pData, cJSON *root) +{ + int i; + int numitems; + cJSON *items; + cJSON *item; + cJSON *create; + cJSON *ok; + DEFiRet; + + items = cJSON_GetObjectItem(root, "items"); + if(items == NULL || items->type != cJSON_Array) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "bulkmode insert does not return array, reply is: %s\n", + pData->reply); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + numitems = cJSON_GetArraySize(items); +DBGPRINTF("omelasticsearch: %d items in reply\n", numitems); + for(i = 0 ; i < numitems ; ++i) { + item = cJSON_GetArrayItem(items, i); + if(item == NULL) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "cannot obtain reply array item %d\n", i); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + create = cJSON_GetObjectItem(item, "create"); + if(create == NULL || create->type != cJSON_Object) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "cannot obtain 'create' item for #%d\n", i); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + ok = cJSON_GetObjectItem(create, "ok"); + if(ok == NULL || ok->type != cJSON_True) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "item ok (%p) not ok\n", ok); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } +dbgprintf("omelasticsearch: item %d is OK\n", i); + } + +finalize_it: + RETiRet; +} + + static inline rsRetVal checkResult(instanceData *pData) { @@ -395,8 +442,7 @@ checkResult(instanceData *pData) } if(pData->bulkmode) { - //TODO: implement - //iRet = checkResultBulkmode(pData, root); + iRet = checkResultBulkmode(pData, root); } else { ok = cJSON_GetObjectItem(root, "ok"); if(ok == NULL || ok->type != cJSON_True) { -- cgit v1.2.3 From 36f7fbd5f458c3e8b2924a72f74621e055f319b2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 30 Jul 2012 12:40:49 +0200 Subject: omelasticsearch: support for writing data errors to local file added --- plugins/omelasticsearch/README | 17 +++++ plugins/omelasticsearch/omelasticsearch.c | 109 ++++++++++++++++++++++++++---- 2 files changed, 112 insertions(+), 14 deletions(-) create mode 100644 plugins/omelasticsearch/README (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/README b/plugins/omelasticsearch/README new file mode 100644 index 00000000..9021bc0e --- /dev/null +++ b/plugins/omelasticsearch/README @@ -0,0 +1,17 @@ +How to produce an error: +======================== +It's quite easy to get 400, if you put a wrong mapping to your +index. That would be easy to reproduce in "normal" omelasticsearch usage +conditions, by only altering the ES configuration: + +1. Make your index first. Let's call it "testindex": +$ curl -XPUT localhost:9200/testindex/ + +2. Put your mapping for a search type called "mytype", where you specify +that date property should be an integer: +$ curl -XPUT localhost:9200/testindex/mytype/_mapping -d '{"mytype":{"properties": {"timegenerated":{"type":"integer"}}}}' + +3. Now try to insert something where date is not an integer: +$ curl -XPOST localhost:9200/testindex/mytype/ -d '{"timegenerated":"bla"}' +{"error":"MapperParsingException[Failed to parse [date]]; nested: NumberFormatException[For input string: \"bla\"]; ","status":400} + diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 04113170..fea85f22 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -34,6 +34,9 @@ #include #include #include +#include +#include +#include #include "cjson.h" #include "conf.h" #include "syslogd-types.h" @@ -67,6 +70,7 @@ typedef struct curl_slist HEADER; typedef struct _instanceData { int port; int replyLen; + int fdErrFile; /* error file fd or -1 if not open */ uchar *server; uchar *uid; uchar *pwd; @@ -75,6 +79,8 @@ typedef struct _instanceData { uchar *parent; uchar *tplName; uchar *timeout; + uchar *restURL; /* last used URL for error reporting */ + uchar *errorFile; char *reply; sbool dynSrchIdx; sbool dynSrchType; @@ -107,6 +113,7 @@ static struct cnfparamdescr actpdescr[] = { { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, + { "errorfile", eCmdHdlrGetWord, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = @@ -117,6 +124,8 @@ static struct cnfparamblk actpblk = BEGINcreateInstance CODESTARTcreateInstance + pData->restURL = NULL; + pData->fdErrFile = -1; ENDcreateInstance BEGINisCompatibleWithFeature @@ -135,6 +144,8 @@ CODESTARTfreeInstance curl_easy_cleanup(pData->curlHandle); pData->curlHandle = NULL; } + if(pData->fdErrFile != -1) + close(pData->fdErrFile); free(pData->server); free(pData->uid); free(pData->pwd); @@ -143,6 +154,8 @@ CODESTARTfreeInstance free(pData->parent); free(pData->tplName); free(pData->timeout); + free(pData->restURL); + free(pData->errorFile); ENDfreeInstance BEGINdbgPrintInstInfo @@ -162,6 +175,8 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tdynamic parent=%d\n", pData->dynParent); dbgprintf("\tasync replication=%d\n", pData->asyncRepl); dbgprintf("\tbulkmode=%d\n", pData->bulkmode); + dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ? + (uchar*)"(not configured)" : pData->errorFile); ENDdbgPrintInstInfo @@ -279,7 +294,6 @@ static rsRetVal setCurlURL(instanceData *pData, uchar **tpls) { char authBuf[1024]; - char *restURL; uchar *searchIndex; uchar *searchType; uchar *parent; @@ -313,11 +327,12 @@ setCurlURL(instanceData *pData, uchar **tpls) if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1); if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); } - restURL = es_str2cstr(url, NULL); - curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + + free(pData->restURL); + pData->restURL = (uchar*)es_str2cstr(url, NULL); + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); es_deleteStr(url); - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); - free(restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL); if(pData->uid != NULL) { rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -355,9 +370,6 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) # define META_END "\"}}\n" getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); -dbgprintf("AAA: searchIndex: '%s'\n", searchIndex); -dbgprintf("AAA: searchType: '%s'\n", searchType); -dbgprintf("AAA: parent: '%s'\n", parent); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); @@ -381,6 +393,55 @@ finalize_it: RETiRet; } + +/* write data error request/replies to separate error file + * Note: we open the file but never close it before exit. If it + * needs to be closed, HUP must be sent. + */ +static inline rsRetVal +writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) +{ + char *rendered = NULL; + cJSON *errRoot; + cJSON *req; + cJSON *replyRoot = *pReplyRoot; + char errStr[1024]; + DEFiRet; + + if(pData->errorFile == NULL) + FINALIZE; + + if(pData->fdErrFile == -1) { + pData->fdErrFile = open((char*)pData->errorFile, + O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, + S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); + if(pData->fdErrFile == -1) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + DBGPRINTF("omelasticsearch: error opening error file: %s\n", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + } + if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL)); + cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg)); + + if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + cJSON_AddItemToObject(errRoot, "request", req); + cJSON_AddItemToObject(errRoot, "reply", replyRoot); + rendered = cJSON_Print(errRoot); +DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered); + write(pData->fdErrFile, rendered, strlen(rendered)); + free(rendered); + cJSON_Delete(errRoot); + *pReplyRoot = NULL; /* tell caller not to delete once again! */ + +finalize_it: + if(rendered != NULL) + free(rendered); + RETiRet; +} + + static inline rsRetVal checkResultBulkmode(instanceData *pData, cJSON *root) { @@ -420,7 +481,6 @@ DBGPRINTF("omelasticsearch: %d items in reply\n", numitems); "item ok (%p) not ok\n", ok); ABORT_FINALIZE(RS_RET_DATAFAIL); } -dbgprintf("omelasticsearch: item %d is OK\n", i); } finalize_it: @@ -429,7 +489,7 @@ finalize_it: static inline rsRetVal -checkResult(instanceData *pData) +checkResult(instanceData *pData, uchar *reqmsg) { cJSON *root; cJSON *ok; @@ -446,11 +506,21 @@ checkResult(instanceData *pData) } else { ok = cJSON_GetObjectItem(root, "ok"); if(ok == NULL || ok->type != cJSON_True) { - ABORT_FINALIZE(RS_RET_DATAFAIL); + iRet = RS_RET_DATAFAIL; } } + + /* Note: we ignore errors writing the error file, as we cannot handle + * these in any case. + */ + if(iRet == RS_RET_DATAFAIL) { + writeDataError(pData, &root, reqmsg); + iRet = RS_RET_OK; /* we have handled the problem! */ + } + finalize_it: - cJSON_Delete(root); + if(root != NULL) + cJSON_Delete(root); RETiRet; } @@ -490,7 +560,7 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) pData->reply[pData->replyLen] = '\0'; /* byte has been reserved in malloc */ DBGPRINTF("omelasticsearch: es reply: '%s'\n", pData->reply); - CHKiRet(checkResult(pData)); + CHKiRet(checkResult(pData, message)); finalize_it: free(pData->reply); RETiRet; @@ -513,7 +583,6 @@ CODESTARTdoAction if(pData->bulkmode) { CHKiRet(buildBatch(pData, ppString[0], ppString)); } else { -dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), ppString)); } @@ -607,6 +676,7 @@ setInstParamDefaults(instanceData *pData) pData->asyncRepl = 0; pData->bulkmode = 0; pData->tplName = NULL; + pData->errorFile = NULL; } BEGINnewActInst @@ -626,6 +696,8 @@ CODESTARTnewActInst continue; if(!strcmp(actpblk.descr[i].name, "server")) { pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "errorfile")) { + pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; } else if(!strcmp(actpblk.descr[i].name, "uid")) { @@ -767,6 +839,14 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct +BEGINdoHUP +CODESTARTdoHUP + if(pData->fdErrFile != -1) { + close(pData->fdErrFile); + pData->fdErrFile = -1; + } +ENDdoHUP + BEGINmodExit CODESTARTmodExit @@ -781,6 +861,7 @@ CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_doHUP CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt -- cgit v1.2.3 From cb12ed5dd211f9910860e86fd937180895c459de Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 30 Jul 2012 13:06:44 +0200 Subject: omelasticsearch: improved debug logging --- plugins/omelasticsearch/omelasticsearch.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index fea85f22..48cfef7d 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -408,8 +408,11 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) char errStr[1024]; DEFiRet; - if(pData->errorFile == NULL) + if(pData->errorFile == NULL) { + DBGPRINTF("omelasticsearch: no local error logger defined - " + "ignoring ES error information\n"); FINALIZE; + } if(pData->fdErrFile == -1) { pData->fdErrFile = open((char*)pData->errorFile, @@ -478,7 +481,7 @@ DBGPRINTF("omelasticsearch: %d items in reply\n", numitems); ok = cJSON_GetObjectItem(create, "ok"); if(ok == NULL || ok->type != cJSON_True) { DBGPRINTF("omelasticsearch: error in elasticsearch reply: " - "item ok (%p) not ok\n", ok); + "item %d, prop ok (%p) not ok\n", i, ok); ABORT_FINALIZE(RS_RET_DATAFAIL); } } -- cgit v1.2.3 From b11c85aac8b41eb74fb14c486f88940df8819bbc Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 29 Oct 2012 11:49:07 +0100 Subject: move cJSON to omelasticsearch the rsyslog runtime itself now uses json-c, but omelasticsearch currently depends on cJSON. We will change this, but not yet. Let's merge as is and see that it works well ;) --- plugins/omelasticsearch/Makefile.am | 3 +- plugins/omelasticsearch/cJSON/README | 247 ++++++++++++++ plugins/omelasticsearch/cJSON/cjson.c | 514 ++++++++++++++++++++++++++++++ plugins/omelasticsearch/cJSON/cjson.h | 130 ++++++++ plugins/omelasticsearch/cJSON/test.c | 156 +++++++++ plugins/omelasticsearch/cJSON/tests/test1 | 22 ++ plugins/omelasticsearch/cJSON/tests/test2 | 11 + plugins/omelasticsearch/cJSON/tests/test3 | 26 ++ plugins/omelasticsearch/cJSON/tests/test4 | 88 +++++ plugins/omelasticsearch/cJSON/tests/test5 | 27 ++ plugins/omelasticsearch/omelasticsearch.c | 2 +- 11 files changed, 1224 insertions(+), 2 deletions(-) create mode 100644 plugins/omelasticsearch/cJSON/README create mode 100644 plugins/omelasticsearch/cJSON/cjson.c create mode 100644 plugins/omelasticsearch/cJSON/cjson.h create mode 100644 plugins/omelasticsearch/cJSON/test.c create mode 100644 plugins/omelasticsearch/cJSON/tests/test1 create mode 100644 plugins/omelasticsearch/cJSON/tests/test2 create mode 100644 plugins/omelasticsearch/cJSON/tests/test3 create mode 100644 plugins/omelasticsearch/cJSON/tests/test4 create mode 100644 plugins/omelasticsearch/cJSON/tests/test5 (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/Makefile.am b/plugins/omelasticsearch/Makefile.am index a574c72f..059bdf8a 100644 --- a/plugins/omelasticsearch/Makefile.am +++ b/plugins/omelasticsearch/Makefile.am @@ -1,6 +1,7 @@ pkglib_LTLIBRARIES = omelasticsearch.la -omelasticsearch_la_SOURCES = omelasticsearch.c +# TODO: replace cJSON +omelasticsearch_la_SOURCES = omelasticsearch.c cJSON/cjson.c cJSON/cjson.h omelasticsearch_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) omelasticsearch_la_LDFLAGS = -module -avoid-version omelasticsearch_la_LIBADD = $(CURL_LIBS) diff --git a/plugins/omelasticsearch/cJSON/README b/plugins/omelasticsearch/cJSON/README new file mode 100644 index 00000000..7531c049 --- /dev/null +++ b/plugins/omelasticsearch/cJSON/README @@ -0,0 +1,247 @@ +/* + Copyright (c) 2009 Dave Gamble + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +*/ + +Welcome to cJSON. + +cJSON aims to be the dumbest possible parser that you can get your job done with. +It's a single file of C, and a single header file. + +JSON is described best here: http://www.json.org/ +It's like XML, but fat-free. You use it to move data around, store things, or just +generally represent your program's state. + + +First up, how do I build? +Add cJSON.c to your project, and put cJSON.h somewhere in the header search path. +For example, to build the test app: + +gcc cJSON.c test.c -o test -lm +./test + + +As a library, cJSON exists to take away as much legwork as it can, but not get in your way. +As a point of pragmatism (i.e. ignoring the truth), I'm going to say that you can use it +in one of two modes: Auto and Manual. Let's have a quick run-through. + + +I lifted some JSON from this page: http://www.json.org/fatfree.html +That page inspired me to write cJSON, which is a parser that tries to share the same +philosophy as JSON itself. Simple, dumb, out of the way. + +Some JSON: +{ + "name": "Jack (\"Bee\") Nimble", + "format": { + "type": "rect", + "width": 1920, + "height": 1080, + "interlace": false, + "frame rate": 24 + } +} + +Assume that you got this from a file, a webserver, or magic JSON elves, whatever, +you have a char * to it. Everything is a cJSON struct. +Get it parsed: + cJSON *root = cJSON_Parse(my_json_string); + +This is an object. We're in C. We don't have objects. But we do have structs. +What's the framerate? + + cJSON *format = cJSON_GetObjectItem(root,"format"); + int framerate = cJSON_GetObjectItem(format,"frame rate")->valueint; + + +Want to change the framerate? + cJSON_GetObjectItem(format,"frame rate")->valueint=25; + +Back to disk? + char *rendered=cJSON_Print(root); + +Finished? Delete the root (this takes care of everything else). + cJSON_Delete(root); + +That's AUTO mode. If you're going to use Auto mode, you really ought to check pointers +before you dereference them. If you want to see how you'd build this struct in code? + cJSON *root,*fmt; + root=cJSON_CreateObject(); + cJSON_AddItemToObject(root, "name", cJSON_CreateString("Jack (\"Bee\") Nimble")); + cJSON_AddItemToObject(root, "format", fmt=cJSON_CreateObject()); + cJSON_AddStringToObject(fmt,"type", "rect"); + cJSON_AddNumberToObject(fmt,"width", 1920); + cJSON_AddNumberToObject(fmt,"height", 1080); + cJSON_AddFalseToObject (fmt,"interlace"); + cJSON_AddNumberToObject(fmt,"frame rate", 24); + +Hopefully we can agree that's not a lot of code? There's no overhead, no unnecessary setup. +Look at test.c for a bunch of nice examples, mostly all ripped off the json.org site, and +a few from elsewhere. + +What about manual mode? First up you need some detail. +Let's cover how the cJSON objects represent the JSON data. +cJSON doesn't distinguish arrays from objects in handling; just type. +Each cJSON has, potentially, a child, siblings, value, a name. + +The root object has: Object Type and a Child +The Child has name "name", with value "Jack ("Bee") Nimble", and a sibling: +Sibling has type Object, name "format", and a child. +That child has type String, name "type", value "rect", and a sibling: +Sibling has type Number, name "width", value 1920, and a sibling: +Sibling has type Number, name "height", value 1080, and a sibling: +Sibling hs type False, name "interlace", and a sibling: +Sibling has type Number, name "frame rate", value 24 + +Here's the structure: +typedef struct cJSON { + struct cJSON *next,*prev; + struct cJSON *child; + + int type; + + char *valuestring; + int valueint; + double valuedouble; + + char *string; +} cJSON; + +By default all values are 0 unless set by virtue of being meaningful. + +next/prev is a doubly linked list of siblings. next takes you to your sibling, +prev takes you back from your sibling to you. +Only objects and arrays have a "child", and it's the head of the doubly linked list. +A "child" entry will have prev==0, but next potentially points on. The last sibling has next=0. +The type expresses Null/True/False/Number/String/Array/Object, all of which are #defined in +cJSON.h + +A Number has valueint and valuedouble. If you're expecting an int, read valueint, if not read +valuedouble. + +Any entry which is in the linked list which is the child of an object will have a "string" +which is the "name" of the entry. When I said "name" in the above example, that's "string". +"string" is the JSON name for the 'variable name' if you will. + +Now you can trivially walk the lists, recursively, and parse as you please. +You can invoke cJSON_Parse to get cJSON to parse for you, and then you can take +the root object, and traverse the structure (which is, formally, an N-tree), +and tokenise as you please. If you wanted to build a callback style parser, this is how +you'd do it (just an example, since these things are very specific): + +void parse_and_callback(cJSON *item,const char *prefix) +{ + while (item) + { + char *newprefix=malloc(strlen(prefix)+strlen(item->name)+2); + sprintf(newprefix,"%s/%s",prefix,item->name); + int dorecurse=callback(newprefix, item->type, item); + if (item->child && dorecurse) parse_and_callback(item->child,newprefix); + item=item->next; + free(newprefix); + } +} + +The prefix process will build you a separated list, to simplify your callback handling. +The 'dorecurse' flag would let the callback decide to handle sub-arrays on it's own, or +let you invoke it per-item. For the item above, your callback might look like this: + +int callback(const char *name,int type,cJSON *item) +{ + if (!strcmp(name,"name")) { /* populate name */ } + else if (!strcmp(name,"format/type") { /* handle "rect" */ } + else if (!strcmp(name,"format/width") { /* 800 */ } + else if (!strcmp(name,"format/height") { /* 600 */ } + else if (!strcmp(name,"format/interlace") { /* false */ } + else if (!strcmp(name,"format/frame rate") { /* 24 */ } + return 1; +} + +Alternatively, you might like to parse iteratively. +You'd use: + +void parse_object(cJSON *item) +{ + int i; for (i=0;ichild; + while (subitem) + { + // handle subitem + if (subitem->child) parse_object(subitem->child); + + subitem=subitem->next; + } +} + +Of course, this should look familiar, since this is just a stripped-down version +of the callback-parser. + +This should cover most uses you'll find for parsing. The rest should be possible +to infer.. and if in doubt, read the source! There's not a lot of it! ;) + + +In terms of constructing JSON data, the example code above is the right way to do it. +You can, of course, hand your sub-objects to other functions to populate. +Also, if you find a use for it, you can manually build the objects. +For instance, suppose you wanted to build an array of objects? + +cJSON *objects[24]; + +cJSON *Create_array_of_anything(cJSON **items,int num) +{ + int i;cJSON *prev, *root=cJSON_CreateArray(); + for (i=0;i<24;i++) + { + if (!i) root->child=objects[i]; + else prev->next=objects[i], objects[i]->prev=prev; + prev=objects[i]; + } + return root; +} + +and simply: Create_array_of_anything(objects,24); + +cJSON doesn't make any assumptions about what order you create things in. +You can attach the objects, as above, and later add children to each +of those objects. + +As soon as you call cJSON_Print, it renders the structure to text. + + + +The test.c code shows how to handle a bunch of typical cases. If you uncomment +the code, it'll load, parse and print a bunch of test files, also from json.org, +which are more complex than I'd care to try and stash into a const char array[]. + + +Enjoy cJSON! + + +- Dave Gamble, Aug 2009 diff --git a/plugins/omelasticsearch/cJSON/cjson.c b/plugins/omelasticsearch/cJSON/cjson.c new file mode 100644 index 00000000..99a831e9 --- /dev/null +++ b/plugins/omelasticsearch/cJSON/cjson.c @@ -0,0 +1,514 @@ +/* + Copyright (c) 2009 Dave Gamble + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +*/ + +/* cJSON */ +/* JSON parser in C. */ + +#include +#include +#include +#include +#include +#include +#include +#include "cjson.h" + +static const char *ep; + +const char *cJSON_GetErrorPtr() {return ep;} + +static int cJSON_strcasecmp(const char *s1,const char *s2) +{ + if (!s1) return (s1==s2)?0:1;if (!s2) return 1; + for(; tolower(*s1) == tolower(*s2); ++s1, ++s2) if(*s1 == 0) return 0; + return tolower(*(const unsigned char *)s1) - tolower(*(const unsigned char *)s2); +} + +static void *(*cJSON_malloc)(size_t sz) = malloc; +static void (*cJSON_free)(void *ptr) = free; + +static char* cJSON_strdup(const char* str) +{ + size_t len; + char* copy; + + len = strlen(str) + 1; + if (!(copy = (char*)cJSON_malloc(len))) return 0; + memcpy(copy,str,len); + return copy; +} + +void cJSON_InitHooks(cJSON_Hooks* hooks) +{ + if (!hooks) { /* Reset hooks */ + cJSON_malloc = malloc; + cJSON_free = free; + return; + } + + cJSON_malloc = (hooks->malloc_fn)?hooks->malloc_fn:malloc; + cJSON_free = (hooks->free_fn)?hooks->free_fn:free; +} + +/* Internal constructor. */ +static cJSON *cJSON_New_Item() +{ + cJSON* node = (cJSON*)cJSON_malloc(sizeof(cJSON)); + if (node) memset(node,0,sizeof(cJSON)); + return node; +} + +/* Delete a cJSON structure. */ +void cJSON_Delete(cJSON *c) +{ + cJSON *next; + while (c) + { + next=c->next; + if (!(c->type&cJSON_IsReference) && c->child) cJSON_Delete(c->child); + if (!(c->type&cJSON_IsReference) && c->valuestring) cJSON_free(c->valuestring); + if (c->string) cJSON_free(c->string); + cJSON_free(c); + c=next; + } +} + +/* Parse the input text to generate a number, and populate the result into item. */ +static const char *parse_number(cJSON *item,const char *num) +{ + double n=0,sign=1,scale=0;int subscale=0,signsubscale=1; + + /* Could use sscanf for this? */ + if (*num=='-') sign=-1,num++; /* Has sign? */ + if (*num=='0') num++; /* is zero */ + if (*num>='1' && *num<='9') do n=(n*10.0)+(*num++ -'0'); while (*num>='0' && *num<='9'); /* Number? */ + if (*num=='.' && num[1]>='0' && num[1]<='9') {num++; do n=(n*10.0)+(*num++ -'0'),scale--; while (*num>='0' && *num<='9');} /* Fractional part? */ + if (*num=='e' || *num=='E') /* Exponent? */ + { num++;if (*num=='+') num++; else if (*num=='-') signsubscale=-1,num++; /* With sign? */ + while (*num>='0' && *num<='9') subscale=(subscale*10)+(*num++ - '0'); /* Number? */ + } + + n=sign*n*pow(10.0,(scale+subscale*signsubscale)); /* number = +/- number.fraction * 10^+/- exponent */ + + item->valuedouble=n; + item->valueint=(int)n; + item->type=cJSON_Number; + return num; +} + +/* Render the number nicely from the given item into a string. */ +char *cJSON_print_number(cJSON *item) +{ + char *str; + double d=item->valuedouble; + if (fabs(((double)item->valueint)-d)<=DBL_EPSILON && d<=INT_MAX && d>=INT_MIN) + { + str=(char*)cJSON_malloc(21); /* 2^64+1 can be represented in 21 chars. */ + if (str) sprintf(str,"%d",item->valueint); + } + else + { + str=(char*)cJSON_malloc(64); /* This is a nice tradeoff. */ + if (str) + { + if (fabs(floor(d)-d)<=DBL_EPSILON) sprintf(str,"%.0f",d); + else if (fabs(d)<1.0e-6 || fabs(d)>1.0e9) sprintf(str,"%e",d); + else sprintf(str,"%f",d); + } + } + return str; +} + +/* Parse the input text into an unescaped cstring, and populate item. */ +static const unsigned char firstByteMark[7] = { 0x00, 0x00, 0xC0, 0xE0, 0xF0, 0xF8, 0xFC }; +static const char *parse_string(cJSON *item,const char *str) +{ + const char *ptr=str+1;char *ptr2;char *out;int len=0;unsigned uc,uc2; + if (*str!='\"') {ep=str;return 0;} /* not a string! */ + + while (*ptr!='\"' && *ptr && ++len) if (*ptr++ == '\\') ptr++; /* Skip escaped quotes. */ + + out=(char*)cJSON_malloc(len+1); /* This is how long we need for the string, roughly. */ + if (!out) return 0; + + ptr=str+1;ptr2=out; + while (*ptr!='\"' && *ptr) + { + if (*ptr!='\\') *ptr2++=*ptr++; + else + { + ptr++; + switch (*ptr) + { + case 'b': *ptr2++='\b'; break; + case 'f': *ptr2++='\f'; break; + case 'n': *ptr2++='\n'; break; + case 'r': *ptr2++='\r'; break; + case 't': *ptr2++='\t'; break; + case 'u': /* transcode utf16 to utf8. */ + sscanf(ptr+1,"%4x",&uc);ptr+=4; /* get the unicode char. */ + + if ((uc>=0xDC00 && uc<=0xDFFF) || uc==0) break; // check for invalid. + + if (uc>=0xD800 && uc<=0xDBFF) // UTF16 surrogate pairs. + { + if (ptr[1]!='\\' || ptr[2]!='u') break; // missing second-half of surrogate. + sscanf(ptr+3,"%4x",&uc2);ptr+=6; + if (uc2<0xDC00 || uc2>0xDFFF) break; // invalid second-half of surrogate. + uc=0x10000 | ((uc&0x3FF)<<10) | (uc2&0x3FF); + } + + len=4;if (uc<0x80) len=1;else if (uc<0x800) len=2;else if (uc<0x10000) len=3; ptr2+=len; + + switch (len) { + case 4: *--ptr2 =((uc | 0x80) & 0xBF); uc >>= 6; + case 3: *--ptr2 =((uc | 0x80) & 0xBF); uc >>= 6; + case 2: *--ptr2 =((uc | 0x80) & 0xBF); uc >>= 6; + case 1: *--ptr2 =(uc | firstByteMark[len]); + } + ptr2+=len; + break; + default: *ptr2++=*ptr; break; + } + ptr++; + } + } + *ptr2=0; + if (*ptr=='\"') ptr++; + item->valuestring=out; + item->type=cJSON_String; + return ptr; +} + +/* Render the cstring provided to an escaped version that can be printed. */ +static char *print_string_ptr(const char *str) +{ + const char *ptr;char *ptr2,*out;int len=0;unsigned char token; + + if (!str) return cJSON_strdup(""); + ptr=str;while ((token=*ptr) && ++len) {if (strchr("\"\\\b\f\n\r\t",token)) len++; else if (token<32) len+=5;ptr++;} + + out=(char*)cJSON_malloc(len+3); + if (!out) return 0; + + ptr2=out;ptr=str; + *ptr2++='\"'; + while (*ptr) + { + if ((unsigned char)*ptr>31 && *ptr!='\"' && *ptr!='\\') *ptr2++=*ptr++; + else + { + *ptr2++='\\'; + switch (token=*ptr++) + { + case '\\': *ptr2++='\\'; break; + case '\"': *ptr2++='\"'; break; + case '\b': *ptr2++='b'; break; + case '\f': *ptr2++='f'; break; + case '\n': *ptr2++='n'; break; + case '\r': *ptr2++='r'; break; + case '\t': *ptr2++='t'; break; + default: sprintf(ptr2,"u%04x",token);ptr2+=5; break; /* escape and print */ + } + } + } + *ptr2++='\"';*ptr2++=0; + return out; +} +/* Invote print_string_ptr (which is useful) on an item. */ +static char *print_string(cJSON *item) {return print_string_ptr(item->valuestring);} + +/* Predeclare these prototypes. */ +static const char *parse_value(cJSON *item,const char *value); +static char *print_value(cJSON *item,int depth,int fmt); +static const char *parse_array(cJSON *item,const char *value); +static char *print_array(cJSON *item,int depth,int fmt); +static const char *parse_object(cJSON *item,const char *value); +static char *print_object(cJSON *item,int depth,int fmt); + +/* Utility to jump whitespace and cr/lf */ +static const char *skip(const char *in) {while (in && *in && (unsigned char)*in<=32) in++; return in;} + +/* Parse an object - create a new root, and populate. */ +cJSON *cJSON_Parse(const char *value) +{ + cJSON *c=cJSON_New_Item(); + ep=0; + if (!c) return 0; /* memory fail */ + + if (!parse_value(c,skip(value))) {cJSON_Delete(c);return 0;} + return c; +} + +/* Render a cJSON item/entity/structure to text. */ +char *cJSON_Print(cJSON *item) {return print_value(item,0,1);} +char *cJSON_PrintUnformatted(cJSON *item) {return print_value(item,0,0);} + +/* Parser core - when encountering text, process appropriately. */ +static const char *parse_value(cJSON *item,const char *value) +{ + if (!value) return 0; /* Fail on null. */ + if (!strncmp(value,"null",4)) { item->type=cJSON_NULL; return value+4; } + if (!strncmp(value,"false",5)) { item->type=cJSON_False; return value+5; } + if (!strncmp(value,"true",4)) { item->type=cJSON_True; item->valueint=1; return value+4; } + if (*value=='\"') { return parse_string(item,value); } + if (*value=='-' || (*value>='0' && *value<='9')) { return parse_number(item,value); } + if (*value=='[') { return parse_array(item,value); } + if (*value=='{') { return parse_object(item,value); } + + ep=value;return 0; /* failure. */ +} + +/* Render a value to text. */ +static char *print_value(cJSON *item,int depth,int fmt) +{ + char *out=0; + if (!item) return 0; + switch ((item->type)&255) + { + case cJSON_NULL: out=cJSON_strdup("null"); break; + case cJSON_False: out=cJSON_strdup("false");break; + case cJSON_True: out=cJSON_strdup("true"); break; + case cJSON_Number: out=cJSON_print_number(item);break; + case cJSON_String: out=print_string(item);break; + case cJSON_Array: out=print_array(item,depth,fmt);break; + case cJSON_Object: out=print_object(item,depth,fmt);break; + } + return out; +} + +/* Build an array from input text. */ +static const char *parse_array(cJSON *item,const char *value) +{ + cJSON *child; + if (*value!='[') {ep=value;return 0;} /* not an array! */ + + item->type=cJSON_Array; + value=skip(value+1); + if (*value==']') return value+1; /* empty array. */ + + item->child=child=cJSON_New_Item(); + if (!item->child) return 0; /* memory fail */ + value=skip(parse_value(child,skip(value))); /* skip any spacing, get the value. */ + if (!value) return 0; + + while (*value==',') + { + cJSON *new_item; + if (!(new_item=cJSON_New_Item())) return 0; /* memory fail */ + child->next=new_item;new_item->prev=child;child=new_item; + value=skip(parse_value(child,skip(value+1))); + if (!value) return 0; /* memory fail */ + } + + if (*value==']') return value+1; /* end of array */ + ep=value;return 0; /* malformed. */ +} + +/* Render an array to text */ +static char *print_array(cJSON *item,int depth,int fmt) +{ + char **entries; + char *out=0,*ptr,*ret;int len=5; + cJSON *child=item->child; + int numentries=0,i=0,fail=0; + + /* How many entries in the array? */ + while (child) numentries++,child=child->next; + /* Allocate an array to hold the values for each */ + entries=(char**)cJSON_malloc(numentries*sizeof(char*)); + if (!entries) return 0; + memset(entries,0,numentries*sizeof(char*)); + /* Retrieve all the results: */ + child=item->child; + while (child && !fail) + { + ret=print_value(child,depth+1,fmt); + entries[i++]=ret; + if (ret) len+=strlen(ret)+2+(fmt?1:0); else fail=1; + child=child->next; + } + + /* If we didn't fail, try to malloc the output string */ + if (!fail) out=(char*)cJSON_malloc(len); + /* If that fails, we fail. */ + if (!out) fail=1; + + /* Handle failure. */ + if (fail) + { + for (i=0;itype=cJSON_Object; + value=skip(value+1); + if (*value=='}') return value+1; /* empty array. */ + + item->child=child=cJSON_New_Item(); + if (!item->child) return 0; + value=skip(parse_string(child,skip(value))); + if (!value) return 0; + child->string=child->valuestring;child->valuestring=0; + if (*value!=':') {ep=value;return 0;} /* fail! */ + value=skip(parse_value(child,skip(value+1))); /* skip any spacing, get the value. */ + if (!value) return 0; + + while (*value==',') + { + cJSON *new_item; + if (!(new_item=cJSON_New_Item())) return 0; /* memory fail */ + child->next=new_item;new_item->prev=child;child=new_item; + value=skip(parse_string(child,skip(value+1))); + if (!value) return 0; + child->string=child->valuestring;child->valuestring=0; + if (*value!=':') {ep=value;return 0;} /* fail! */ + value=skip(parse_value(child,skip(value+1))); /* skip any spacing, get the value. */ + if (!value) return 0; + } + + if (*value=='}') return value+1; /* end of array */ + ep=value;return 0; /* malformed. */ +} + +/* Render an object to text. */ +static char *print_object(cJSON *item,int depth,int fmt) +{ + char **entries=0,**names=0; + char *out=0,*ptr,*ret,*str;int len=7,i=0,j; + cJSON *child=item->child; + int numentries=0,fail=0; + /* Count the number of entries. */ + while (child) numentries++,child=child->next; + /* Allocate space for the names and the objects */ + entries=(char**)cJSON_malloc(numentries*sizeof(char*)); + if (!entries) return 0; + names=(char**)cJSON_malloc(numentries*sizeof(char*)); + if (!names) {cJSON_free(entries);return 0;} + memset(entries,0,sizeof(char*)*numentries); + memset(names,0,sizeof(char*)*numentries); + + /* Collect all the results into our arrays: */ + child=item->child;depth++;if (fmt) len+=depth; + while (child) + { + names[i]=str=print_string_ptr(child->string); + entries[i++]=ret=print_value(child,depth,fmt); + if (str && ret) len+=strlen(ret)+strlen(str)+2+(fmt?2+depth:0); else fail=1; + child=child->next; + } + + /* Try to allocate the output string */ + if (!fail) out=(char*)cJSON_malloc(len); + if (!out) fail=1; + + /* Handle failure */ + if (fail) + { + for (i=0;ichild;int i=0;while(c)i++,c=c->next;return i;} +cJSON *cJSON_GetArrayItem(cJSON *array,int item) {cJSON *c=array->child; while (c && item>0) item--,c=c->next; return c;} +cJSON *cJSON_GetObjectItem(cJSON *object,const char *string) {cJSON *c=object->child; while (c && cJSON_strcasecmp(c->string,string)) c=c->next; return c;} + +/* Utility for array list handling. */ +static void suffix_object(cJSON *prev,cJSON *item) {prev->next=item;item->prev=prev;} +/* Utility for handling references. */ +static cJSON *create_reference(cJSON *item) {cJSON *ref=cJSON_New_Item();if (!ref) return 0;memcpy(ref,item,sizeof(cJSON));ref->string=0;ref->type|=cJSON_IsReference;ref->next=ref->prev=0;return ref;} + +/* Add item to array/object. */ +void cJSON_AddItemToArray(cJSON *array, cJSON *item) {cJSON *c=array->child;if (!item) return; if (!c) {array->child=item;} else {while (c && c->next) c=c->next; suffix_object(c,item);}} +void cJSON_AddItemToObject(cJSON *object,const char *string,cJSON *item) {if (!item) return; if (item->string) cJSON_free(item->string);item->string=cJSON_strdup(string);cJSON_AddItemToArray(object,item);} +void cJSON_AddItemReferenceToArray(cJSON *array, cJSON *item) {cJSON_AddItemToArray(array,create_reference(item));} +void cJSON_AddItemReferenceToObject(cJSON *object,const char *string,cJSON *item) {cJSON_AddItemToObject(object,string,create_reference(item));} + +cJSON *cJSON_DetachItemFromArray(cJSON *array,int which) {cJSON *c=array->child;while (c && which>0) c=c->next,which--;if (!c) return 0; + if (c->prev) c->prev->next=c->next;if (c->next) c->next->prev=c->prev;if (c==array->child) array->child=c->next;c->prev=c->next=0;return c;} +void cJSON_DeleteItemFromArray(cJSON *array,int which) {cJSON_Delete(cJSON_DetachItemFromArray(array,which));} +cJSON *cJSON_DetachItemFromObject(cJSON *object,const char *string) {int i=0;cJSON *c=object->child;while (c && cJSON_strcasecmp(c->string,string)) i++,c=c->next;if (c) return cJSON_DetachItemFromArray(object,i);return 0;} +void cJSON_DeleteItemFromObject(cJSON *object,const char *string) {cJSON_Delete(cJSON_DetachItemFromObject(object,string));} + +/* Replace array/object items with new ones. */ +void cJSON_ReplaceItemInArray(cJSON *array,int which,cJSON *newitem) {cJSON *c=array->child;while (c && which>0) c=c->next,which--;if (!c) return; + newitem->next=c->next;newitem->prev=c->prev;if (newitem->next) newitem->next->prev=newitem; + if (c==array->child) array->child=newitem; else newitem->prev->next=newitem;c->next=c->prev=0;cJSON_Delete(c);} +void cJSON_ReplaceItemInObject(cJSON *object,const char *string,cJSON *newitem){int i=0;cJSON *c=object->child;while(c && cJSON_strcasecmp(c->string,string))i++,c=c->next;if(c){newitem->string=cJSON_strdup(string);cJSON_ReplaceItemInArray(object,i,newitem);}} + +/* Create basic types: */ +cJSON *cJSON_CreateNull() {cJSON *item=cJSON_New_Item();if(item)item->type=cJSON_NULL;return item;} +cJSON *cJSON_CreateTrue() {cJSON *item=cJSON_New_Item();if(item)item->type=cJSON_True;return item;} +cJSON *cJSON_CreateFalse() {cJSON *item=cJSON_New_Item();if(item)item->type=cJSON_False;return item;} +cJSON *cJSON_CreateBool(int b) {cJSON *item=cJSON_New_Item();if(item)item->type=b?cJSON_True:cJSON_False;return item;} +cJSON *cJSON_CreateNumber(double num) {cJSON *item=cJSON_New_Item();if(item){item->type=cJSON_Number;item->valuedouble=num;item->valueint=(int)num;}return item;} +cJSON *cJSON_CreateString(const char *string) {cJSON *item=cJSON_New_Item();if(item){item->type=cJSON_String;item->valuestring=cJSON_strdup(string);}return item;} +cJSON *cJSON_CreateArray() {cJSON *item=cJSON_New_Item();if(item)item->type=cJSON_Array;return item;} +cJSON *cJSON_CreateObject() {cJSON *item=cJSON_New_Item();if(item)item->type=cJSON_Object;return item;} + +/* Create Arrays: */ +cJSON *cJSON_CreateIntArray(int *numbers,int count) {int i;cJSON *n=0,*p=0,*a=cJSON_CreateArray();for(i=0;a && ichild=n;else suffix_object(p,n);p=n;}return a;} +cJSON *cJSON_CreateFloatArray(float *numbers,int count) {int i;cJSON *n=0,*p=0,*a=cJSON_CreateArray();for(i=0;a && ichild=n;else suffix_object(p,n);p=n;}return a;} +cJSON *cJSON_CreateDoubleArray(double *numbers,int count) {int i;cJSON *n=0,*p=0,*a=cJSON_CreateArray();for(i=0;a && ichild=n;else suffix_object(p,n);p=n;}return a;} +cJSON *cJSON_CreateStringArray(const char **strings,int count) {int i;cJSON *n=0,*p=0,*a=cJSON_CreateArray();for(i=0;a && ichild=n;else suffix_object(p,n);p=n;}return a;} diff --git a/plugins/omelasticsearch/cJSON/cjson.h b/plugins/omelasticsearch/cJSON/cjson.h new file mode 100644 index 00000000..a621720c --- /dev/null +++ b/plugins/omelasticsearch/cJSON/cjson.h @@ -0,0 +1,130 @@ +/* + Copyright (c) 2009 Dave Gamble + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +*/ + +#ifndef cJSON__h +#define cJSON__h + +#ifdef __cplusplus +extern "C" +{ +#endif + +/* cJSON Types: */ +#define cJSON_False 0 +#define cJSON_True 1 +#define cJSON_NULL 2 +#define cJSON_Number 3 +#define cJSON_String 4 +#define cJSON_Array 5 +#define cJSON_Object 6 + +#define cJSON_IsReference 256 + +/* The cJSON structure: */ +typedef struct cJSON { + struct cJSON *next,*prev; /* next/prev allow you to walk array/object chains. Alternatively, use GetArraySize/GetArrayItem/GetObjectItem */ + struct cJSON *child; /* An array or object item will have a child pointer pointing to a chain of the items in the array/object. */ + + int type; /* The type of the item, as above. */ + + char *valuestring; /* The item's string, if type==cJSON_String */ + int valueint; /* The item's number, if type==cJSON_Number */ + double valuedouble; /* The item's number, if type==cJSON_Number */ + + char *string; /* The item's name string, if this item is the child of, or is in the list of subitems of an object. */ +} cJSON; + +typedef struct cJSON_Hooks { + void *(*malloc_fn)(size_t sz); + void (*free_fn)(void *ptr); +} cJSON_Hooks; + +/* Supply malloc, realloc and free functions to cJSON */ +extern void cJSON_InitHooks(cJSON_Hooks* hooks); + + +/* Supply a block of JSON, and this returns a cJSON object you can interrogate. Call cJSON_Delete when finished. */ +extern cJSON *cJSON_Parse(const char *value); +/* Render a cJSON entity to text for transfer/storage. Free the char* when finished. */ +extern char *cJSON_Print(cJSON *item); +/* Render a cJSON entity to text for transfer/storage without any formatting. Free the char* when finished. */ +extern char *cJSON_PrintUnformatted(cJSON *item); +/* Delete a cJSON entity and all subentities. */ +extern void cJSON_Delete(cJSON *c); + +/* Returns the number of items in an array (or object). */ +extern int cJSON_GetArraySize(cJSON *array); +/* Retrieve item number "item" from array "array". Returns NULL if unsuccessful. */ +extern cJSON *cJSON_GetArrayItem(cJSON *array,int item); +/* Get item "string" from object. Case insensitive. */ +extern cJSON *cJSON_GetObjectItem(cJSON *object,const char *string); + +/* For analysing failed parses. This returns a pointer to the parse error. You'll probably need to look a few chars back to make sense of it. Defined when cJSON_Parse() returns 0. 0 when cJSON_Parse() succeeds. */ +extern const char *cJSON_GetErrorPtr(); + +/* These calls create a cJSON item of the appropriate type. */ +extern cJSON *cJSON_CreateNull(); +extern cJSON *cJSON_CreateTrue(); +extern cJSON *cJSON_CreateFalse(); +extern cJSON *cJSON_CreateBool(int b); +extern cJSON *cJSON_CreateNumber(double num); +extern cJSON *cJSON_CreateString(const char *string); +extern cJSON *cJSON_CreateArray(); +extern cJSON *cJSON_CreateObject(); + +/* These utilities create an Array of count items. */ +extern cJSON *cJSON_CreateIntArray(int *numbers,int count); +extern cJSON *cJSON_CreateFloatArray(float *numbers,int count); +extern cJSON *cJSON_CreateDoubleArray(double *numbers,int count); +extern cJSON *cJSON_CreateStringArray(const char **strings,int count); + +/* Append item to the specified array/object. */ +extern void cJSON_AddItemToArray(cJSON *array, cJSON *item); +extern void cJSON_AddItemToObject(cJSON *object,const char *string,cJSON *item); +/* Append reference to item to the specified array/object. Use this when you want to add an existing cJSON to a new cJSON, but don't want to corrupt your existing cJSON. */ +extern void cJSON_AddItemReferenceToArray(cJSON *array, cJSON *item); +extern void cJSON_AddItemReferenceToObject(cJSON *object,const char *string,cJSON *item); + +/* Remove/Detatch items from Arrays/Objects. */ +extern cJSON *cJSON_DetachItemFromArray(cJSON *array,int which); +extern void cJSON_DeleteItemFromArray(cJSON *array,int which); +extern cJSON *cJSON_DetachItemFromObject(cJSON *object,const char *string); +extern void cJSON_DeleteItemFromObject(cJSON *object,const char *string); + +/* Update array items. */ +extern void cJSON_ReplaceItemInArray(cJSON *array,int which,cJSON *newitem); +extern void cJSON_ReplaceItemInObject(cJSON *object,const char *string,cJSON *newitem); + +/* rger: added helpers */ + +char *cJSON_print_number(cJSON *item); +#define cJSON_AddNullToObject(object,name) cJSON_AddItemToObject(object, name, cJSON_CreateNull()) +#define cJSON_AddTrueToObject(object,name) cJSON_AddItemToObject(object, name, cJSON_CreateTrue()) +#define cJSON_AddFalseToObject(object,name) cJSON_AddItemToObject(object, name, cJSON_CreateFalse()) +#define cJSON_AddNumberToObject(object,name,n) cJSON_AddItemToObject(object, name, cJSON_CreateNumber(n)) +#define cJSON_AddStringToObject(object,name,s) cJSON_AddItemToObject(object, name, cJSON_CreateString(s)) + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/plugins/omelasticsearch/cJSON/test.c b/plugins/omelasticsearch/cJSON/test.c new file mode 100644 index 00000000..2cab632a --- /dev/null +++ b/plugins/omelasticsearch/cJSON/test.c @@ -0,0 +1,156 @@ +/* + Copyright (c) 2009 Dave Gamble + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +*/ + +#include +#include +#include "cJSON.h" + +/* Parse text to JSON, then render back to text, and print! */ +void doit(char *text) +{ + char *out;cJSON *json; + + json=cJSON_Parse(text); + if (!json) {printf("Error before: [%s]\n",cJSON_GetErrorPtr());} + else + { + out=cJSON_Print(json); + cJSON_Delete(json); + printf("%s\n",out); + free(out); + } +} + +/* Read a file, parse, render back, etc. */ +void dofile(char *filename) +{ + FILE *f=fopen(filename,"rb");fseek(f,0,SEEK_END);long len=ftell(f);fseek(f,0,SEEK_SET); + char *data=malloc(len+1);fread(data,1,len,f);fclose(f); + doit(data); + free(data); +} + +/* Used by some code below as an example datatype. */ +struct record {const char *precision;double lat,lon;const char *address,*city,*state,*zip,*country; }; + +/* Create a bunch of objects as demonstration. */ +void create_objects() +{ + cJSON *root,*fmt,*img,*thm,*fld;char *out;int i; /* declare a few. */ + + /* Here we construct some JSON standards, from the JSON site. */ + + /* Our "Video" datatype: */ + root=cJSON_CreateObject(); + cJSON_AddItemToObject(root, "name", cJSON_CreateString("Jack (\"Bee\") Nimble")); + cJSON_AddItemToObject(root, "format", fmt=cJSON_CreateObject()); + cJSON_AddStringToObject(fmt,"type", "rect"); + cJSON_AddNumberToObject(fmt,"width", 1920); + cJSON_AddNumberToObject(fmt,"height", 1080); + cJSON_AddFalseToObject (fmt,"interlace"); + cJSON_AddNumberToObject(fmt,"frame rate", 24); + + out=cJSON_Print(root); cJSON_Delete(root); printf("%s\n",out); free(out); /* Print to text, Delete the cJSON, print it, release the string. + + /* Our "days of the week" array: */ + const char *strings[7]={"Sunday","Monday","Tuesday","Wednesday","Thursday","Friday","Saturday"}; + root=cJSON_CreateStringArray(strings,7); + + out=cJSON_Print(root); cJSON_Delete(root); printf("%s\n",out); free(out); + + /* Our matrix: */ + int numbers[3][3]={{0,-1,0},{1,0,0},{0,0,1}}; + root=cJSON_CreateArray(); + for (i=0;i<3;i++) cJSON_AddItemToArray(root,cJSON_CreateIntArray(numbers[i],3)); + +/* cJSON_ReplaceItemInArray(root,1,cJSON_CreateString("Replacement")); */ + + out=cJSON_Print(root); cJSON_Delete(root); printf("%s\n",out); free(out); + + + /* Our "gallery" item: */ + int ids[4]={116,943,234,38793}; + root=cJSON_CreateObject(); + cJSON_AddItemToObject(root, "Image", img=cJSON_CreateObject()); + cJSON_AddNumberToObject(img,"Width",800); + cJSON_AddNumberToObject(img,"Height",600); + cJSON_AddStringToObject(img,"Title","View from 15th Floor"); + cJSON_AddItemToObject(img, "Thumbnail", thm=cJSON_CreateObject()); + cJSON_AddStringToObject(thm, "Url", "http:/*www.example.com/image/481989943"); + cJSON_AddNumberToObject(thm,"Height",125); + cJSON_AddStringToObject(thm,"Width","100"); + cJSON_AddItemToObject(img,"IDs", cJSON_CreateIntArray(ids,4)); + + out=cJSON_Print(root); cJSON_Delete(root); printf("%s\n",out); free(out); + + /* Our array of "records": */ + struct record fields[2]={ + {"zip",37.7668,-1.223959e+2,"","SAN FRANCISCO","CA","94107","US"}, + {"zip",37.371991,-1.22026e+2,"","SUNNYVALE","CA","94085","US"}}; + + root=cJSON_CreateArray(); + for (i=0;i<2;i++) + { + cJSON_AddItemToArray(root,fld=cJSON_CreateObject()); + cJSON_AddStringToObject(fld, "precision", fields[i].precision); + cJSON_AddNumberToObject(fld, "Latitude", fields[i].lat); + cJSON_AddNumberToObject(fld, "Longitude", fields[i].lon); + cJSON_AddStringToObject(fld, "Address", fields[i].address); + cJSON_AddStringToObject(fld, "City", fields[i].city); + cJSON_AddStringToObject(fld, "State", fields[i].state); + cJSON_AddStringToObject(fld, "Zip", fields[i].zip); + cJSON_AddStringToObject(fld, "Country", fields[i].country); + } + +/* cJSON_ReplaceItemInObject(cJSON_GetArrayItem(root,1),"City",cJSON_CreateIntArray(ids,4)); */ + + out=cJSON_Print(root); cJSON_Delete(root); printf("%s\n",out); free(out); + +} + +int main (int argc, const char * argv[]) { + /* a bunch of json: */ + char text1[]="{\n\"name\": \"Jack (\\\"Bee\\\") Nimble\", \n\"format\": {\"type\": \"rect\", \n\"width\": 1920, \n\"height\": 1080, \n\"interlace\": false,\"frame rate\": 24\n}\n}"; + char text2[]="[\"Sunday\", \"Monday\", \"Tuesday\", \"Wednesday\", \"Thursday\", \"Friday\", \"Saturday\"]"; + char text3[]="[\n [0, -1, 0],\n [1, 0, 0],\n [0, 0, 1]\n ]\n"; + char text4[]="{\n \"Image\": {\n \"Width\": 800,\n \"Height\": 600,\n \"Title\": \"View from 15th Floor\",\n \"Thumbnail\": {\n \"Url\": \"http:/*www.example.com/image/481989943\",\n \"Height\": 125,\n \"Width\": \"100\"\n },\n \"IDs\": [116, 943, 234, 38793]\n }\n }"; + char text5[]="[\n {\n \"precision\": \"zip\",\n \"Latitude\": 37.7668,\n \"Longitude\": -122.3959,\n \"Address\": \"\",\n \"City\": \"SAN FRANCISCO\",\n \"State\": \"CA\",\n \"Zip\": \"94107\",\n \"Country\": \"US\"\n },\n {\n \"precision\": \"zip\",\n \"Latitude\": 37.371991,\n \"Longitude\": -122.026020,\n \"Address\": \"\",\n \"City\": \"SUNNYVALE\",\n \"State\": \"CA\",\n \"Zip\": \"94085\",\n \"Country\": \"US\"\n }\n ]"; + + /* Process each json textblock by parsing, then rebuilding: */ + doit(text1); + doit(text2); + doit(text3); + doit(text4); + doit(text5); + + /* Parse standard testfiles: +/* dofile("../../tests/test1"); */ +/* dofile("../../tests/test2"); */ +/* dofile("../../tests/test3"); */ +/* dofile("../../tests/test4"); */ +/* dofile("../../tests/test5"); */ + + /* Now some samplecode for building objects concisely: */ + create_objects(); + + return 0; +} diff --git a/plugins/omelasticsearch/cJSON/tests/test1 b/plugins/omelasticsearch/cJSON/tests/test1 new file mode 100644 index 00000000..eacfbf5e --- /dev/null +++ b/plugins/omelasticsearch/cJSON/tests/test1 @@ -0,0 +1,22 @@ +{ + "glossary": { + "title": "example glossary", + "GlossDiv": { + "title": "S", + "GlossList": { + "GlossEntry": { + "ID": "SGML", + "SortAs": "SGML", + "GlossTerm": "Standard Generalized Markup Language", + "Acronym": "SGML", + "Abbrev": "ISO 8879:1986", + "GlossDef": { + "para": "A meta-markup language, used to create markup languages such as DocBook.", + "GlossSeeAlso": ["GML", "XML"] + }, + "GlossSee": "markup" + } + } + } + } +} diff --git a/plugins/omelasticsearch/cJSON/tests/test2 b/plugins/omelasticsearch/cJSON/tests/test2 new file mode 100644 index 00000000..5600991a --- /dev/null +++ b/plugins/omelasticsearch/cJSON/tests/test2 @@ -0,0 +1,11 @@ +{"menu": { + "id": "file", + "value": "File", + "popup": { + "menuitem": [ + {"value": "New", "onclick": "CreateNewDoc()"}, + {"value": "Open", "onclick": "OpenDoc()"}, + {"value": "Close", "onclick": "CloseDoc()"} + ] + } +}} diff --git a/plugins/omelasticsearch/cJSON/tests/test3 b/plugins/omelasticsearch/cJSON/tests/test3 new file mode 100644 index 00000000..5662b377 --- /dev/null +++ b/plugins/omelasticsearch/cJSON/tests/test3 @@ -0,0 +1,26 @@ +{"widget": { + "debug": "on", + "window": { + "title": "Sample Konfabulator Widget", + "name": "main_window", + "width": 500, + "height": 500 + }, + "image": { + "src": "Images/Sun.png", + "name": "sun1", + "hOffset": 250, + "vOffset": 250, + "alignment": "center" + }, + "text": { + "data": "Click Here", + "size": 36, + "style": "bold", + "name": "text1", + "hOffset": 250, + "vOffset": 100, + "alignment": "center", + "onMouseUp": "sun1.opacity = (sun1.opacity / 100) * 90;" + } +}} \ No newline at end of file diff --git a/plugins/omelasticsearch/cJSON/tests/test4 b/plugins/omelasticsearch/cJSON/tests/test4 new file mode 100644 index 00000000..d540b57f --- /dev/null +++ b/plugins/omelasticsearch/cJSON/tests/test4 @@ -0,0 +1,88 @@ +{"web-app": { + "servlet": [ + { + "servlet-name": "cofaxCDS", + "servlet-class": "org.cofax.cds.CDSServlet", + "init-param": { + "configGlossary:installationAt": "Philadelphia, PA", + "configGlossary:adminEmail": "ksm@pobox.com", + "configGlossary:poweredBy": "Cofax", + "configGlossary:poweredByIcon": "/images/cofax.gif", + "configGlossary:staticPath": "/content/static", + "templateProcessorClass": "org.cofax.WysiwygTemplate", + "templateLoaderClass": "org.cofax.FilesTemplateLoader", + "templatePath": "templates", + "templateOverridePath": "", + "defaultListTemplate": "listTemplate.htm", + "defaultFileTemplate": "articleTemplate.htm", + "useJSP": false, + "jspListTemplate": "listTemplate.jsp", + "jspFileTemplate": "articleTemplate.jsp", + "cachePackageTagsTrack": 200, + "cachePackageTagsStore": 200, + "cachePackageTagsRefresh": 60, + "cacheTemplatesTrack": 100, + "cacheTemplatesStore": 50, + "cacheTemplatesRefresh": 15, + "cachePagesTrack": 200, + "cachePagesStore": 100, + "cachePagesRefresh": 10, + "cachePagesDirtyRead": 10, + "searchEngineListTemplate": "forSearchEnginesList.htm", + "searchEngineFileTemplate": "forSearchEngines.htm", + "searchEngineRobotsDb": "WEB-INF/robots.db", + "useDataStore": true, + "dataStoreClass": "org.cofax.SqlDataStore", + "redirectionClass": "org.cofax.SqlRedirection", + "dataStoreName": "cofax", + "dataStoreDriver": "com.microsoft.jdbc.sqlserver.SQLServerDriver", + "dataStoreUrl": "jdbc:microsoft:sqlserver://LOCALHOST:1433;DatabaseName=goon", + "dataStoreUser": "sa", + "dataStorePassword": "dataStoreTestQuery", + "dataStoreTestQuery": "SET NOCOUNT ON;select test='test';", + "dataStoreLogFile": "/usr/local/tomcat/logs/datastore.log", + "dataStoreInitConns": 10, + "dataStoreMaxConns": 100, + "dataStoreConnUsageLimit": 100, + "dataStoreLogLevel": "debug", + "maxUrlLength": 500}}, + { + "servlet-name": "cofaxEmail", + "servlet-class": "org.cofax.cds.EmailServlet", + "init-param": { + "mailHost": "mail1", + "mailHostOverride": "mail2"}}, + { + "servlet-name": "cofaxAdmin", + "servlet-class": "org.cofax.cds.AdminServlet"}, + + { + "servlet-name": "fileServlet", + "servlet-class": "org.cofax.cds.FileServlet"}, + { + "servlet-name": "cofaxTools", + "servlet-class": "org.cofax.cms.CofaxToolsServlet", + "init-param": { + "templatePath": "toolstemplates/", + "log": 1, + "logLocation": "/usr/local/tomcat/logs/CofaxTools.log", + "logMaxSize": "", + "dataLog": 1, + "dataLogLocation": "/usr/local/tomcat/logs/dataLog.log", + "dataLogMaxSize": "", + "removePageCache": "/content/admin/remove?cache=pages&id=", + "removeTemplateCache": "/content/admin/remove?cache=templates&id=", + "fileTransferFolder": "/usr/local/tomcat/webapps/content/fileTransferFolder", + "lookInContext": 1, + "adminGroupID": 4, + "betaServer": true}}], + "servlet-mapping": { + "cofaxCDS": "/", + "cofaxEmail": "/cofaxutil/aemail/*", + "cofaxAdmin": "/admin/*", + "fileServlet": "/static/*", + "cofaxTools": "/tools/*"}, + + "taglib": { + "taglib-uri": "cofax.tld", + "taglib-location": "/WEB-INF/tlds/cofax.tld"}}} \ No newline at end of file diff --git a/plugins/omelasticsearch/cJSON/tests/test5 b/plugins/omelasticsearch/cJSON/tests/test5 new file mode 100644 index 00000000..49980ca2 --- /dev/null +++ b/plugins/omelasticsearch/cJSON/tests/test5 @@ -0,0 +1,27 @@ +{"menu": { + "header": "SVG Viewer", + "items": [ + {"id": "Open"}, + {"id": "OpenNew", "label": "Open New"}, + null, + {"id": "ZoomIn", "label": "Zoom In"}, + {"id": "ZoomOut", "label": "Zoom Out"}, + {"id": "OriginalView", "label": "Original View"}, + null, + {"id": "Quality"}, + {"id": "Pause"}, + {"id": "Mute"}, + null, + {"id": "Find", "label": "Find..."}, + {"id": "FindAgain", "label": "Find Again"}, + {"id": "Copy"}, + {"id": "CopyAgain", "label": "Copy Again"}, + {"id": "CopySVG", "label": "Copy SVG"}, + {"id": "ViewSVG", "label": "View SVG"}, + {"id": "ViewSource", "label": "View Source"}, + {"id": "SaveAs", "label": "Save As"}, + null, + {"id": "Help"}, + {"id": "About", "label": "About Adobe CVG Viewer..."} + ] +}} diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 48cfef7d..a5833b15 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -37,7 +37,7 @@ #include #include #include -#include "cjson.h" +#include "cJSON/cjson.h" #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" -- cgit v1.2.3 From e5ef73eb25c8dda2e19c593ad2fc0a960aa8873b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 1 Nov 2012 17:05:07 +0100 Subject: bugfix: invalid rsyslog-internal macro API use This had no bad effect, because the macro did the same as the one that should have been used. --- plugins/omelasticsearch/omelasticsearch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index a5833b15..982f4318 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -773,7 +773,7 @@ CODESTARTnewActInst if(pData->dynSrchType) ++iNumTpls; if(pData->dynParent) ++iNumTpls; DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); - CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) + CODE_STD_STRING_REQUESTnewActInst(iNumTpls) CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? " StdJSONFmt" : (char*)pData->tplName), -- cgit v1.2.3 From 206908e2887cedf78d12d70b29591c52276d685a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 1 Nov 2012 17:05:07 +0100 Subject: bugfix: invalid rsyslog-internal macro API use This had no bad effect, because the macro did the same as the one that should have been used. --- plugins/omelasticsearch/omelasticsearch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index f77caeca..50acdf11 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -624,7 +624,7 @@ CODESTARTnewActInst if(pData->dynSrchType) ++iNumTpls; if(pData->dynParent) ++iNumTpls; DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); - CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) + CODE_STD_STRING_REQUESTnewActInst(iNumTpls) CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? " StdJSONFmt" : (char*)pData->tplName), -- cgit v1.2.3 From 2ab69c601923374f3d0ecb51e3f8a021d2d7e519 Mon Sep 17 00:00:00 2001 From: Michael Biebl Date: Thu, 13 Dec 2012 16:02:22 +0100 Subject: build: link omelasticsearch against -lm Use LT_LIB_M to find the math library which is needed for pow(). --- plugins/omelasticsearch/Makefile.am | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/Makefile.am b/plugins/omelasticsearch/Makefile.am index a574c72f..2fadb74d 100644 --- a/plugins/omelasticsearch/Makefile.am +++ b/plugins/omelasticsearch/Makefile.am @@ -3,6 +3,6 @@ pkglib_LTLIBRARIES = omelasticsearch.la omelasticsearch_la_SOURCES = omelasticsearch.c omelasticsearch_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) omelasticsearch_la_LDFLAGS = -module -avoid-version -omelasticsearch_la_LIBADD = $(CURL_LIBS) +omelasticsearch_la_LIBADD = $(CURL_LIBS) $(LIBM) EXTRA_DIST = -- cgit v1.2.3 From 3352d2c605567c29840cc93a358d60881c865cb7 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 14 Dec 2012 09:20:51 +0100 Subject: minor cleanup --- plugins/omelasticsearch/omelasticsearch.c | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 982f4318..499a2bb2 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -405,6 +405,8 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) cJSON *errRoot; cJSON *req; cJSON *replyRoot = *pReplyRoot; + size_t toWrite; + ssize_t wrRet; char errStr[1024]; DEFiRet; @@ -432,8 +434,16 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) cJSON_AddItemToObject(errRoot, "request", req); cJSON_AddItemToObject(errRoot, "reply", replyRoot); rendered = cJSON_Print(errRoot); -DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered); - write(pData->fdErrFile, rendered, strlen(rendered)); + /* we do not do real error-handling on the err file, as this finally complicates + * things way to much. + */ + DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered); + toWrite = strlen(rendered); + wrRet = write(pData->fdErrFile, rendered, toWrite); + if(wrRet != (ssize_t) toWrite) { + DBGPRINTF("omelasticsearch: error %d writing error file, write returns %lld\n", + errno, (long long) wrRet); + } free(rendered); cJSON_Delete(errRoot); *pReplyRoot = NULL; /* tell caller not to delete once again! */ -- cgit v1.2.3 From 614b1fba0a56629579ec9bf1cc07cc19bba35e0c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sun, 13 Jan 2013 11:30:48 +0100 Subject: bugfix: omelasticsearch failed when authentication data was provided ... at least in most cases it emitted an error message: "snprintf failed when trying to build auth string" Thanks to Joerg Heinemann for alerting us. closes: http://bugzilla.adiscon.com/show_bug.cgi?id=404 --- plugins/omelasticsearch/omelasticsearch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 50acdf11..00e4dbac 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -314,7 +314,7 @@ setCurlURL(instanceData *pData, uchar **tpls) if(pData->uid != NULL) { rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, (pData->pwd == NULL) ? "" : (char*)pData->pwd); - if(rLocal != (int) es_strlen(url)) { + if(rLocal < 1) { errmsg.LogError(0, RS_RET_ERR, "omelasticsearch: snprintf failed " "when trying to build auth string (return %d)\n", rLocal); -- cgit v1.2.3 From 2f68d5f0d8a5c3ffef6bf52f27abc726ad27d764 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Renard?= Date: Wed, 24 Apr 2013 10:56:52 +0200 Subject: omelasticsearch: _id field support for bulk operations also max number of templates for plugin use has been increased to five closes: http://bugzilla.adiscon.com/show_bug.cgi?id=392 --- plugins/omelasticsearch/omelasticsearch.c | 122 +++++++++++++++++++++++++----- 1 file changed, 105 insertions(+), 17 deletions(-) (limited to 'plugins/omelasticsearch') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index f27fe62b..33e58c1a 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -11,11 +11,11 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * -or- * see COPYING.ASL20 in the source distribution - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -79,12 +79,14 @@ typedef struct _instanceData { uchar *parent; uchar *tplName; uchar *timeout; + uchar *bulkId; uchar *restURL; /* last used URL for error reporting */ uchar *errorFile; char *reply; sbool dynSrchIdx; sbool dynSrchType; sbool dynParent; + sbool dynBulkId; sbool bulkmode; sbool asyncRepl; struct { @@ -114,7 +116,9 @@ static struct cnfparamdescr actpdescr[] = { { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, { "errorfile", eCmdHdlrGetWord, 0 }, - { "template", eCmdHdlrGetWord, 1 } + { "template", eCmdHdlrGetWord, 1 }, + { "dynbulkid", eCmdHdlrBinary, 0 }, + { "bulkid", eCmdHdlrGetWord, 0 }, }; static struct cnfparamblk actpblk = { CNFPARAMBLK_VERSION, @@ -156,6 +160,7 @@ CODESTARTfreeInstance free(pData->timeout); free(pData->restURL); free(pData->errorFile); + free(pData->bulkId); ENDfreeInstance BEGINdbgPrintInstInfo @@ -177,6 +182,8 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tbulkmode=%d\n", pData->bulkmode); dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ? (uchar*)"(not configured)" : pData->errorFile); + dbgprintf("\tdynbulkid=%d\n", pData->dynBulkId); + dbgprintf("\tbulkid='%s'\n", pData->bulkId); ENDdbgPrintInstInfo @@ -220,7 +227,7 @@ checkConn(instanceData *pData) cstr = es_str2cstr(url, NULL); curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); - + pData->reply = NULL; pData->replyLen = 0; curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); @@ -250,7 +257,8 @@ ENDtryResume /* get the current index and type for this message */ static inline void getIndexTypeAndParent(instanceData *pData, uchar **tpls, - uchar **srchIndex, uchar **srchType, uchar **parent) + uchar **srchIndex, uchar **srchType, uchar **parent, + uchar **bulkId) { if(pData->dynSrchIdx) { *srchIndex = tpls[1]; @@ -258,15 +266,27 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, *srchType = tpls[2]; if(pData->dynParent) { *parent = tpls[3]; + if(pData->dynBulkId) { + *bulkId = tpls[4]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } } else { *srchType = pData->searchType; if(pData->dynParent) { *parent = tpls[2]; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } } } else { @@ -275,15 +295,27 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, *srchType = tpls[1]; if(pData->dynParent) { *parent = tpls[2]; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } } else { *srchType = pData->searchType; if(pData->dynParent) { *parent = tpls[1]; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[1]; + } } } } @@ -297,6 +329,7 @@ setCurlURL(instanceData *pData, uchar **tpls) uchar *searchIndex; uchar *searchType; uchar *parent; + uchar *bulkId; es_str_t *url; int rLocal; int r; @@ -308,7 +341,7 @@ setCurlURL(instanceData *pData, uchar **tpls) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); parent = NULL; } else { - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addChar(&url, '/'); if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); @@ -330,7 +363,7 @@ setCurlURL(instanceData *pData, uchar **tpls) free(pData->restURL); pData->restURL = (uchar*)es_str2cstr(url, NULL); - curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); es_deleteStr(url); DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL); @@ -343,7 +376,7 @@ setCurlURL(instanceData *pData, uchar **tpls) rLocal); ABORT_FINALIZE(RS_RET_ERR); } - curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); + curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } finalize_it: @@ -363,13 +396,15 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) uchar *searchIndex; uchar *searchType; uchar *parent; + uchar *bulkId = NULL; DEFiRet; # define META_STRT "{\"index\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" # define META_PARENT "\",\"_parent\":\"" +# define META_ID "\", \"_id\":\"" # define META_END "\"}}\n" - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); @@ -380,6 +415,10 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); } + if(bulkId != NULL) { + if(r == 0) r = es_addBuf(&pData->batch.data, META_ID, sizeof(META_ID)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)bulkId, ustrlen(bulkId)); + } if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); @@ -409,7 +448,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) ssize_t wrRet; char errStr[1024]; DEFiRet; - + if(pData->errorFile == NULL) { DBGPRINTF("omelasticsearch: no local error logger defined - " "ignoring ES error information\n"); @@ -524,7 +563,7 @@ checkResult(instanceData *pData, uchar *reqmsg) } /* Note: we ignore errors writing the error file, as we cannot handle - * these in any case. + * these in any case. */ if(iRet == RS_RET_DATAFAIL) { writeDataError(pData, &root, reqmsg); @@ -552,8 +591,8 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) CHKiRet(setCurlURL(pData, tpls)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: @@ -649,10 +688,10 @@ curlSetup(instanceData *pData) } header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); - curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); - curl_easy_setopt(handle, CURLOPT_POST, 1); + curl_easy_setopt(handle, CURLOPT_POST, 1); pData->curlHandle = handle; pData->postHeader = header; @@ -690,6 +729,8 @@ setInstParamDefaults(instanceData *pData) pData->bulkmode = 0; pData->tplName = NULL; pData->errorFile = NULL; + pData->dynBulkId= 0; + pData->bulkId = NULL; } BEGINnewActInst @@ -737,12 +778,16 @@ CODESTARTnewActInst pData->asyncRepl = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynbulkid")) { + pData->dynBulkId = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "bulkid")) { + pData->bulkId = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { dbgprintf("omelasticsearch: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); } } - + if(pData->pwd != NULL && pData->uid == NULL) { errmsg.LogError(0, RS_RET_UID_MISSING, "omelasticsearch: password is provided, but no uid " @@ -767,6 +812,12 @@ CODESTARTnewActInst "name for parent template given - action definition invalid"); ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } + if(pData->dynBulkId && pData->bulkId == NULL) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic bulkid, but no " + "name for bulkid template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } if(pData->bulkmode) { pData->batch.currTpl1 = NULL; @@ -782,6 +833,7 @@ CODESTARTnewActInst if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; if(pData->dynParent) ++iNumTpls; + if(pData->dynBulkId) ++iNumTpls; DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); CODE_STD_STRING_REQUESTnewActInst(iNumTpls) @@ -803,11 +855,29 @@ CODESTARTnewActInst if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 4, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } } else { @@ -817,12 +887,30 @@ CODESTARTnewActInst if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); - } + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } } } -- cgit v1.2.3