From 24179166f748464f4d556a80bd0c8dcc4cbd57b0 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 5 Apr 2012 17:26:16 +0200 Subject: omelasticsearch: moved to v6 config system --- plugins/omelasticsearch/omelasticsearch.c | 126 +++++++++++++++++++++--------- 1 file changed, 91 insertions(+), 35 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 3bec1838..8965d40b 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -29,7 +29,7 @@ #include #include #include -#include +//#include #include #include #include @@ -46,6 +46,7 @@ MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omelasticsearch") /* internal structures */ DEF_OMOD_STATIC_DATA @@ -63,15 +64,30 @@ STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) */ typedef struct curl_slist HEADER; typedef struct _instanceData { + uchar *server; + int port; + uchar *searchIndex; + uchar *searchType; + uchar *tplName; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; -/* config variables */ -static int restPort = 9200; -static char *hostName = "localhost"; -static char *searchIndex = "system"; -static char *searchType = "events"; + +/* tables for interfacing with the v6 config system */ +/* action (instance) parameters */ +static struct cnfparamdescr actpdescr[] = { + { "server", eCmdHdlrGetWord, 0 }, + { "serverport", eCmdHdlrInt, 0 }, + { "searchindex", eCmdHdlrGetWord, 0 }, + { "searchtype", eCmdHdlrGetWord, 0 }, + { "template", eCmdHdlrGetWord, 1 } +}; +static struct cnfparamblk actpblk = + { CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr + }; BEGINcreateInstance CODESTARTcreateInstance @@ -93,6 +109,10 @@ CODESTARTfreeInstance curl_easy_cleanup(pData->curlHandle); pData->curlHandle = NULL; } + free(pData->server); + free(pData->searchIndex); + free(pData->searchType); + free(pData->tplName); ENDfreeInstance BEGINdbgPrintInstInfo @@ -174,7 +194,7 @@ curlSetup(instanceData *instance) } snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - hostName, restPort, searchIndex, searchType); + instance->server, instance->port, instance->searchIndex, instance->searchType); header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); @@ -189,25 +209,77 @@ curlSetup(instanceData *instance) return RS_RET_OK; } -BEGINparseSelectorAct -CODESTARTparseSelectorAct -CODE_STD_STRING_REQUESTparseSelectorAct(1) - if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { - ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); +static inline void +setInstParamDefaults(instanceData *pData) +{ + pData->server = NULL; + pData->port = 9200; + pData->searchIndex = NULL; + pData->searchType = NULL; + pData->tplName = NULL; +} + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; +CODESTARTnewActInst + if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } - p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); + + CODE_STD_STRING_REQUESTparseSelectorAct(1) + for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "server")) { + pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "serverport")) { + pData->port = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "searchIndex")) { + pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "searchType")) { + pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + dbgprintf("omelasticsearch: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + } + + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? + " StdJSONFmt" : (char*)pData->tplName), + OMSR_NO_RQD_TPL_OPTS)); - /* check if a non-standard template is to be applied */ - if(*(p-1) == ';') - --p; - CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt")); + if(pData->server == NULL) + pData->server = (uchar*) strdup("localhost"); + if(pData->searchIndex == NULL) + pData->searchIndex = (uchar*) strdup("system"); + if(pData->searchType == NULL) + pData->searchType = (uchar*) strdup("events"); - /* all good, we can now initialise our private data */ +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); CHKiRet(curlSetup(pData)); +ENDnewActInst + + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(!strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch supports only v6 config format, use: " + "action(type=\"omelasticsearch\" server=...)"); + } + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct + BEGINmodExit CODESTARTmodExit curl_global_cleanup(); @@ -220,18 +292,9 @@ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt -static rsRetVal -resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) -{ - DEFiRet; - restPort = 9200; - hostName = "localhost"; - searchIndex = "system"; - searchType = "events"; - RETiRet; -} BEGINmodInit() CODESTARTmodInit @@ -240,13 +303,6 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); - /* register config file handlers */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); - if (curl_global_init(CURL_GLOBAL_ALL) != 0) { errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled"); ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); -- cgit v1.2.3 From f9bb245a4ae06151897a4a68d6c22ba88a6ecee8 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 13 Apr 2012 12:12:39 +0200 Subject: omelasticsearch: permit dynamic index/type parameters (just like dynafile) --- plugins/omelasticsearch/omelasticsearch.c | 123 ++++++++++++++++++++++++++---- 1 file changed, 107 insertions(+), 16 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 8965d40b..1143de06 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -43,6 +43,7 @@ #include "errmsg.h" #include "statsobj.h" #include "cfsysline.h" +#include "unicode-helper.h" MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP @@ -69,6 +70,8 @@ typedef struct _instanceData { uchar *searchIndex; uchar *searchType; uchar *tplName; + sbool dynSrchIdx; + sbool dynSrchType; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; @@ -81,6 +84,8 @@ static struct cnfparamdescr actpdescr[] = { { "serverport", eCmdHdlrInt, 0 }, { "searchindex", eCmdHdlrGetWord, 0 }, { "searchtype", eCmdHdlrGetWord, 0 }, + { "dynsearchindex", eCmdHdlrBinary, 0 }, + { "dynsearchtype", eCmdHdlrBinary, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = @@ -123,12 +128,44 @@ BEGINtryResume CODESTARTtryResume ENDtryResume -rsRetVal -curlPost(instanceData *instance, uchar *message) + +static rsRetVal +setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) +{ + char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + uchar *searchIndex; + uchar *searchType; + + if(pData->dynSrchIdx) { + searchIndex = tpl1; + if(pData->dynSrchType) + searchType = tpl2; + else + searchType = pData->searchType; + } else { + searchIndex = pData->searchIndex; + if(pData->dynSrchType) + searchType = tpl1; + else + searchType = pData->searchType; + } + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + pData->server, pData->port, searchIndex, searchType); + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); + return RS_RET_OK; +} + +static rsRetVal +curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) { CURLcode code; CURL *curl = instance->curlHandle; int length = strlen((char *)message); + DEFiRet; + + if(instance->dynSrchIdx || instance->dynSrchType) + CHKiRet(setCurlURL(instance, tpl1, tpl2)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); @@ -145,11 +182,13 @@ curlPost(instanceData *instance, uchar *message) STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); return RS_RET_OK; } +finalize_it: + RETiRet; } BEGINdoAction CODESTARTdoAction - CHKiRet(curlPost(pData, ppString[0])); + CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); finalize_it: ENDdoAction @@ -181,10 +220,10 @@ curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) return size * nmemb; } + static rsRetVal -curlSetup(instanceData *instance) +curlSetup(instanceData *pData) { - char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ HEADER *header; CURL *handle; @@ -193,19 +232,26 @@ curlSetup(instanceData *instance) return RS_RET_OBJ_CREATION_FAILED; } - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - instance->server, instance->port, instance->searchIndex, instance->searchType); header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); - curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); - curl_easy_setopt(handle, CURLOPT_URL, restURL); curl_easy_setopt(handle, CURLOPT_POST, 1); - instance->curlHandle = handle; - instance->postHeader = header; + pData->curlHandle = handle; + pData->postHeader = header; - DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL); + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) { + /* in this case, we know no tpls are involved --> NULL OK! */ + setCurlURL(pData, NULL, NULL); + } + + if(Debug) { + if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) + dbgprintf("omelasticsearch setup, using static REST URL\n"); + else + dbgprintf("omelasticsearch setup, we have a dynamic REST URL\n"); + } return RS_RET_OK; } @@ -216,12 +262,15 @@ setInstParamDefaults(instanceData *pData) pData->port = 9200; pData->searchIndex = NULL; pData->searchType = NULL; + pData->dynSrchIdx = 0; + pData->dynSrchType = 0; pData->tplName = NULL; } BEGINnewActInst struct cnfparamvals *pvals; int i; + int iNumTpls; CODESTARTnewActInst if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); @@ -230,7 +279,6 @@ CODESTARTnewActInst CHKiRet(createInstance(&pData)); setInstParamDefaults(pData); - CODE_STD_STRING_REQUESTparseSelectorAct(1) for(i = 0 ; i < actpblk.nParams ; ++i) { if(!pvals[i].bUsed) continue; @@ -238,10 +286,14 @@ CODESTARTnewActInst pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; - } else if(!strcmp(actpblk.descr[i].name, "searchIndex")) { + } else if(!strcmp(actpblk.descr[i].name, "searchindex")) { pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); - } else if(!strcmp(actpblk.descr[i].name, "searchType")) { + } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) { + pData->dynSrchIdx = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { + pData->dynSrchType = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { @@ -249,11 +301,49 @@ CODESTARTnewActInst "param '%s'\n", actpblk.descr[i].name); } } + + if(pData->dynSrchIdx && pData->searchIndex == NULL) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch: requested dynamic search index, but no " + "name for index template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + } + if(pData->dynSrchType && pData->searchType == NULL) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omelasticsearch: requested dynamic search type, but no " + "name for type template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); + } + + iNumTpls = 1; + if(pData->dynSrchIdx) ++iNumTpls; + if(pData->dynSrchType) ++iNumTpls; + DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); + CODE_STD_STRING_REQUESTparseSelectorAct(iNumTpls) CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? " StdJSONFmt" : (char*)pData->tplName), OMSR_NO_RQD_TPL_OPTS)); + + /* we need to request additional templates. If we have a dynamic search index, + * it will always be string 1. Type may be 1 or 2, depending on whether search + * index is dynamic as well. Rule needs to be followed throughout the module. + */ + if(pData->dynSrchIdx) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchIndex), + OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynSrchType) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynSrchType) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType), + OMSR_NO_RQD_TPL_OPTS)); + } + } + if(pData->server == NULL) pData->server = (uchar*) strdup("localhost"); if(pData->searchIndex == NULL) @@ -261,9 +351,10 @@ CODESTARTnewActInst if(pData->searchType == NULL) pData->searchType = (uchar*) strdup("events"); + CHKiRet(curlSetup(pData)); + CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); - CHKiRet(curlSetup(pData)); ENDnewActInst -- cgit v1.2.3 From 7d8ac48a4a214da917f7455f24b4c292c3e6d015 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 16 Apr 2012 16:49:07 +0200 Subject: omelasticsearch: added asnycRepl and timeout config params --- plugins/omelasticsearch/omelasticsearch.c | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 1143de06..0b2e0bf1 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -4,7 +4,7 @@ * NOTE: read comments in module-template.h for more specifics! * * Copyright 2011 Nathan Scott. - * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2009-2012 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -70,8 +70,10 @@ typedef struct _instanceData { uchar *searchIndex; uchar *searchType; uchar *tplName; + uchar *timeout; sbool dynSrchIdx; sbool dynSrchType; + sbool asyncRepl; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; @@ -86,6 +88,8 @@ static struct cnfparamdescr actpdescr[] = { { "searchtype", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "asyncrepl", eCmdHdlrBinary, 0 }, + { "timeout", eCmdHdlrGetWord, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = @@ -149,8 +153,21 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) else searchType = pData->searchType; } - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - pData->server, pData->port, searchIndex, searchType); + if(pData->asyncRepl) { + if(pData->timeout != NULL) { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + "replication=async&timeout=%s", + pData->server, pData->port, searchIndex, searchType, + pData->timeout); + } else { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + "replication=async", + pData->server, pData->port, searchIndex, searchType); + } + } else { + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + pData->server, pData->port, searchIndex, searchType); + } curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); return RS_RET_OK; @@ -262,8 +279,10 @@ setInstParamDefaults(instanceData *pData) pData->port = 9200; pData->searchIndex = NULL; pData->searchType = NULL; + pData->timeout = NULL; pData->dynSrchIdx = 0; pData->dynSrchType = 0; + pData->asyncRepl = 0; pData->tplName = NULL; } @@ -294,6 +313,10 @@ CODESTARTnewActInst pData->dynSrchIdx = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "timeout")) { + pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "asyncrepl")) { + pData->asyncRepl = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { -- cgit v1.2.3 From 54966fd878bcf1c52019e0ec977da4d7b0a9f52a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 25 Apr 2012 15:05:01 +0200 Subject: omelasticsearch: provide authentication support (UNTESTED) --- plugins/omelasticsearch/omelasticsearch.c | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 79a24ffe..c20c67c1 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -66,6 +66,8 @@ typedef struct curl_slist HEADER; typedef struct _instanceData { uchar *server; int port; + uchar *uid; + uchar *pwd; uchar *searchIndex; uchar *searchType; uchar *tplName; @@ -83,6 +85,8 @@ typedef struct _instanceData { static struct cnfparamdescr actpdescr[] = { { "server", eCmdHdlrGetWord, 0 }, { "serverport", eCmdHdlrInt, 0 }, + { "uid", eCmdHdlrGetWord, 0 }, + { "pwd", eCmdHdlrGetWord, 0 }, { "searchindex", eCmdHdlrGetWord, 0 }, { "searchtype", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, @@ -118,6 +122,8 @@ CODESTARTfreeInstance pData->curlHandle = NULL; } free(pData->server); + free(pData->uid); + free(pData->pwd); free(pData->searchIndex); free(pData->searchType); free(pData->tplName); @@ -125,6 +131,7 @@ ENDfreeInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo + dbgprintf("omelasticsearch, target server %s", pData->server); ENDdbgPrintInstInfo BEGINtryResume @@ -136,6 +143,7 @@ static rsRetVal setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) { char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + char authBuf[1024]; uchar *searchIndex; uchar *searchType; @@ -168,6 +176,14 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) pData->server, pData->port, searchIndex, searchType); } curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + + if(pData->uid != NULL) { + snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, + (pData->pwd == NULL) ? "" : (char*)pData->pwd); + //TODO: create better code, check errors! + curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); + curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); + } DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); return RS_RET_OK; } @@ -276,6 +292,8 @@ setInstParamDefaults(instanceData *pData) { pData->server = NULL; pData->port = 9200; + pData->uid = NULL; + pData->pwd = NULL; pData->searchIndex = NULL; pData->searchType = NULL; pData->timeout = NULL; @@ -304,6 +322,10 @@ CODESTARTnewActInst pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "uid")) { + pData->uid = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "pwd")) { + pData->pwd = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "searchindex")) { pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { @@ -324,6 +346,12 @@ CODESTARTnewActInst } } + if(pData->pwd != NULL && pData->uid == NULL) { + errmsg.LogError(0, RS_RET_UID_MISSING, + "omelasticsearch: password is provided, but no uid " + "- action definition invalid"); + ABORT_FINALIZE(RS_RET_UID_MISSING); + } if(pData->dynSrchIdx && pData->searchIndex == NULL) { errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, "omelasticsearch: requested dynamic search index, but no " -- cgit v1.2.3 From a4c0d08c78ef27891b192973174b997a0c7c4aa1 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 24 May 2012 16:39:39 +0200 Subject: omelasticsearch: added transactional interface & better debug output --- plugins/omelasticsearch/omelasticsearch.c | 37 ++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index c20c67c1..7642d603 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -74,6 +74,7 @@ typedef struct _instanceData { uchar *timeout; sbool dynSrchIdx; sbool dynSrchType; + sbool bulkmode; sbool asyncRepl; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ @@ -91,6 +92,7 @@ static struct cnfparamdescr actpdescr[] = { { "searchtype", eCmdHdlrGetWord, 0 }, { "dynsearchindex", eCmdHdlrBinary, 0 }, { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, { "template", eCmdHdlrGetWord, 1 } @@ -131,7 +133,19 @@ ENDfreeInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo - dbgprintf("omelasticsearch, target server %s", pData->server); + dbgprintf("omelasticsearch\n"); + dbgprintf("\ttemplate='%s'\n", pData->tplName); + dbgprintf("\tserver='%s'\n", pData->server); + dbgprintf("\tserverport=%d\n", pData->port); + dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid); + dbgprintf("\tpwd=(%s configured)\n", pData->pwd == NULL ? "not " : ""); + dbgprintf("\tsearch index='%s'\n", pData->searchIndex); + dbgprintf("\tsearch index='%s'\n", pData->searchType); + dbgprintf("\ttimeout='%s'\n", pData->timeout); + dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx); + dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType); + dbgprintf("\tasync replication=%d\n", pData->asyncRepl); + dbgprintf("\tbulkmode=%d\n", pData->bulkmode); ENDdbgPrintInstInfo BEGINtryResume @@ -218,12 +232,30 @@ finalize_it: RETiRet; } +BEGINbeginTransaction +CODESTARTbeginTransaction +dbgprintf("omelasticsearch: beginTransaction\n"); +ENDbeginTransaction + + BEGINdoAction CODESTARTdoAction CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); finalize_it: ENDdoAction + +BEGINendTransaction +CODESTARTendTransaction +dbgprintf("elasticsearch: endTransaction\n"); +#if 0 + if(pData->offsSndBuf != 0) { + iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); + pData->offsSndBuf = 0; + } +#endif +ENDendTransaction + /* elasticsearch POST result string ... useful for debugging */ size_t curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) @@ -334,6 +366,8 @@ CODESTARTnewActInst pData->dynSrchIdx = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "bulkmode")) { + pData->bulkmode = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "timeout")) { pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "asyncrepl")) { @@ -434,6 +468,7 @@ CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt -- cgit v1.2.3 From c7ca67a37586164a028c52a4d0cd9328b09e8697 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 25 May 2012 08:03:19 +0200 Subject: omelasticsearch: test commit, first shot at bulk interface This obviously does not work correctly. So expect problems if you set bulkmode="on". --- plugins/omelasticsearch/omelasticsearch.c | 96 +++++++++++++++++++++++++++---- 1 file changed, 85 insertions(+), 11 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 7642d603..1705c605 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -76,6 +76,11 @@ typedef struct _instanceData { sbool dynSrchType; sbool bulkmode; sbool asyncRepl; + struct { + es_str_t *data; + uchar *currTpl1; + uchar *currTpl2; + } batch; CURL *curlHandle; /* libcurl session handle */ HEADER *postHeader; /* json POST request info */ } instanceData; @@ -160,7 +165,9 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) char authBuf[1024]; uchar *searchIndex; uchar *searchType; + uchar *bulkpart; + bulkpart = (pData->bulkmode) ? "/_bulk" : ""; if(pData->dynSrchIdx) { searchIndex = tpl1; if(pData->dynSrchType) @@ -176,18 +183,19 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) } if(pData->asyncRepl) { if(pData->timeout != NULL) { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" "replication=async&timeout=%s", pData->server, pData->port, searchIndex, searchType, - pData->timeout); + bulkpart, pData->timeout); } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s?" + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" "replication=async", - pData->server, pData->port, searchIndex, searchType); + pData->server, pData->port, searchIndex, searchType, + bulkpart); } } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", - pData->server, pData->port, searchIndex, searchType); + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s", + pData->server, pData->port, searchIndex, searchType, bulkpart); } curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); @@ -202,12 +210,44 @@ setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) return RS_RET_OK; } + +/* this method does not directly submit but builds a batch instead. It + * may submit, if we have dynamic index/type and the current type or + * index changes. + */ +static rsRetVal +buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) +{ + int length = strlen((char *)message); + int r; + DEFiRet; + +#if 0 + if(instance->dynSrchIdx || instance->dynSrchType) + CHKiRet(setCurlURL(instance, tpl1, tpl2)); +#endif + + //if(pData->batch.currTpl1 == NULL + r = es_addBuf(&pData->batch.data, "{\"index\":", sizeof("{\"index\":")-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); + if(r == 0) r = es_addBuf(&pData->batch.data, "}\n", sizeof("}\n")-1); + + if(r == 0) r = es_addBuf(&pData->batch.data, "{ \"field1\" : \"value1\" }\n", sizeof("{ \"field1\" : \"value1\" }\n")-1); + if(r != 0) { + DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); + ABORT_FINALIZE(RS_RET_ERR); + } + iRet = RS_RET_DEFER_COMMIT; + +finalize_it: + RETiRet; +} + static rsRetVal -curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) +curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar *tpl2) { CURLcode code; CURL *curl = instance->curlHandle; - int length = strlen((char *)message); DEFiRet; if(instance->dynSrchIdx || instance->dynSrchType) @@ -215,8 +255,9 @@ curlPost(instanceData *instance, uchar *message, uchar *tpl1, uchar *tpl2) curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); +dbgprintf("curl_easy_perform result: %d\n", code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: @@ -235,25 +276,43 @@ finalize_it: BEGINbeginTransaction CODESTARTbeginTransaction dbgprintf("omelasticsearch: beginTransaction\n"); + if(!pData->bulkmode) { + FINALIZE; + } + + es_emptyStr(pData->batch.data); +finalize_it: ENDbeginTransaction BEGINdoAction CODESTARTdoAction - CHKiRet(curlPost(pData, ppString[0], ppString[1], ppString[2])); + if(pData->bulkmode) { + CHKiRet(buildBatch(pData, ppString[0], ppString[1], ppString[2])); + } else { + CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), + ppString[1], ppString[2])); + } finalize_it: +dbgprintf("omelasticsearch: result doAction: %d\n", iRet); ENDdoAction BEGINendTransaction + char *cstr; CODESTARTendTransaction -dbgprintf("elasticsearch: endTransaction\n"); + cstr = es_str2cstr(pData->batch.data, NULL); + dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), + NULL, NULL)); #if 0 if(pData->offsSndBuf != 0) { iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); pData->offsSndBuf = 0; } #endif +finalize_it: + free(cstr); ENDendTransaction /* elasticsearch POST result string ... useful for debugging */ @@ -266,6 +325,11 @@ curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) static char ok[] = "{\"ok\":true,"; ASSERT(size == 1); +DBGPRINTF("omelasticsearch request: %s\n", jsonData); +DBGPRINTF("omelasticsearch result: "); +for (i = 0; i < nmemb; i++) + DBGPRINTF("%c", p[i]); +DBGPRINTF("\n"); if (size == 1 && nmemb > sizeof(ok)-1 && @@ -399,6 +463,16 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_LEGA_ACT_NOT_SUPPORTED); } + if(pData->bulkmode) { + pData->batch.currTpl1 = NULL; + pData->batch.currTpl2 = NULL; + if((pData->batch.data = es_newStr(1024)) == NULL) { + DBGPRINTF("omelasticsearch: error creating batch string " + "turned off bulk mode\n"); + pData->bulkmode = 0; /* at least it works */ + } + } + iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; -- cgit v1.2.3 From 5f85909b17920172121d2ff8367c8185623f1409 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 25 May 2012 18:55:29 +0200 Subject: omelasticsearc: milestone, bulk insert basically works dynamic index&type is not yet used, but easy to add (did not manage to get it in today...) --- plugins/omelasticsearch/omelasticsearch.c | 115 +++++++++++++++++------------- 1 file changed, 65 insertions(+), 50 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 1705c605..704c9950 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -158,46 +158,67 @@ CODESTARTtryResume ENDtryResume +/* get the current index and type for this message */ +static inline void +getIndexAndType(instanceData *pData, uchar *tpl1, uchar *tpl2, uchar **srchIndex, + uchar **srchType) +{ + if(pData->dynSrchIdx) { + *srchIndex = tpl1; + if(pData->dynSrchType) + *srchType = tpl2; + else + *srchType = pData->searchType; + } else { + *srchIndex = pData->searchIndex; + if(pData->dynSrchType) + *srchType = tpl1; + else + *srchType = pData->searchType; + } +} + + static rsRetVal setCurlURL(instanceData *pData, uchar *tpl1, uchar *tpl2) { - char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ char authBuf[1024]; + char portBuf[64]; + char *restURL; uchar *searchIndex; uchar *searchType; - uchar *bulkpart; + es_str_t *url; + int r; - bulkpart = (pData->bulkmode) ? "/_bulk" : ""; - if(pData->dynSrchIdx) { - searchIndex = tpl1; - if(pData->dynSrchType) - searchType = tpl2; - else - searchType = pData->searchType; + getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); + url = es_newStr(128); + snprintf(portBuf, sizeof(portBuf), "%d", pData->port); + + r = es_addBuf(&url, "http://", sizeof("http://")-1); + if(r == 0) r = es_addBuf(&url, (char*)pData->server, strlen((char*)pData->server)); + if(r == 0) r = es_addChar(&url, ':'); + if(r == 0) r = es_addBuf(&url, portBuf, strlen(portBuf)); + if(r == 0) r = es_addChar(&url, '/'); + if(pData->bulkmode) { + if(r == 0) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); } else { - searchIndex = pData->searchIndex; - if(pData->dynSrchType) - searchType = tpl1; - else - searchType = pData->searchType; + if(r == 0) r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); + if(r == 0) r = es_addChar(&url, '/'); + if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); } + if(r == 0) r = es_addChar(&url, '?'); if(pData->asyncRepl) { - if(pData->timeout != NULL) { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" - "replication=async&timeout=%s", - pData->server, pData->port, searchIndex, searchType, - bulkpart, pData->timeout); - } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s?" - "replication=async", - pData->server, pData->port, searchIndex, searchType, - bulkpart); - } - } else { - snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s%s", - pData->server, pData->port, searchIndex, searchType, bulkpart); + if(r == 0) r = es_addBuf(&url, "replication=async&", + sizeof("replication=async&")-1); } + if(pData->timeout != NULL) { + if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1); + if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout)); + } + restURL = es_str2cstr(url, NULL); curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + es_deleteStr(url); + free(restURL); if(pData->uid != NULL) { snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -221,18 +242,20 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) int length = strlen((char *)message); int r; DEFiRet; - -#if 0 - if(instance->dynSrchIdx || instance->dynSrchType) - CHKiRet(setCurlURL(instance, tpl1, tpl2)); -#endif - - //if(pData->batch.currTpl1 == NULL - r = es_addBuf(&pData->batch.data, "{\"index\":", sizeof("{\"index\":")-1); +# define META_STRT "{\"index\":{\"_index\": \"" +# define META_TYPE "\",\"_type\":\"" +# define META_END "\"}}\n" + +#warning TODO: use dynamic index/type! + r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchIndex, + ustrlen(pData->searchIndex)); + if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchType, + ustrlen(pData->searchType)); + if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); - if(r == 0) r = es_addBuf(&pData->batch.data, "}\n", sizeof("}\n")-1); - - if(r == 0) r = es_addBuf(&pData->batch.data, "{ \"field1\" : \"value1\" }\n", sizeof("{ \"field1\" : \"value1\" }\n")-1); + if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); if(r != 0) { DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); ABORT_FINALIZE(RS_RET_ERR); @@ -257,7 +280,6 @@ curlPost(instanceData *instance, uchar *message, int msglen, uchar *tpl1, uchar curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); -dbgprintf("curl_easy_perform result: %d\n", code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: @@ -303,14 +325,7 @@ BEGINendTransaction CODESTARTendTransaction cstr = es_str2cstr(pData->batch.data, NULL); dbgprintf("elasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), - NULL, NULL)); -#if 0 - if(pData->offsSndBuf != 0) { - iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); - pData->offsSndBuf = 0; - } -#endif + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, NULL)); finalize_it: free(cstr); ENDendTransaction @@ -369,8 +384,8 @@ curlSetup(instanceData *pData) pData->curlHandle = handle; pData->postHeader = header; - if(pData->dynSrchIdx == 0 && pData->dynSrchType == 0) { - /* in this case, we know no tpls are involved --> NULL OK! */ + if(pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0)) { + /* in this case, we know no tpls are involved in the request-->NULL OK! */ setCurlURL(pData, NULL, NULL); } -- cgit v1.2.3 From 0314f370a4c15ce2190febfccee6664ecccab788 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 28 May 2012 15:14:16 +0200 Subject: omelasticsearch: dyn index&type now also supported in bulk mode --- plugins/omelasticsearch/omelasticsearch.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 704c9950..a09851b1 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -241,18 +241,21 @@ buildBatch(instanceData *pData, uchar *message, uchar *tpl1, uchar *tpl2) { int length = strlen((char *)message); int r; + uchar *searchIndex; + uchar *searchType; DEFiRet; # define META_STRT "{\"index\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" # define META_END "\"}}\n" #warning TODO: use dynamic index/type! + getIndexAndType(pData, tpl1, tpl2, &searchIndex, &searchType); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchIndex, - ustrlen(pData->searchIndex)); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, + ustrlen(searchIndex)); if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)pData->searchType, - ustrlen(pData->searchType)); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, + ustrlen(searchType)); if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); -- cgit v1.2.3 From b0a3e85e102c3e549574bf8f418ca643109f2884 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 13 Jun 2012 12:49:25 +0200 Subject: omelasticsearch: bugfix: bulkmode setting was not properly initialized --- plugins/omelasticsearch/omelasticsearch.c | 1 + 1 file changed, 1 insertion(+) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index a09851b1..d8db7307 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -414,6 +414,7 @@ setInstParamDefaults(instanceData *pData) pData->dynSrchIdx = 0; pData->dynSrchType = 0; pData->asyncRepl = 0; + pData->bulkmode = 0; pData->tplName = NULL; } -- cgit v1.2.3