diff options
Diffstat (limited to 'plugins/omelasticsearch/omelasticsearch.c')
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 422 |
1 files changed, 345 insertions, 77 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 00e4dbac..d49667f9 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -11,11 +11,11 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * -or- * see COPYING.ASL20 in the source distribution - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,6 +34,10 @@ #include <signal.h> #include <errno.h> #include <time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include "cJSON/cjson.h" #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" @@ -54,18 +58,19 @@ DEFobjCurrIf(errmsg) DEFobjCurrIf(statsobj) statsobj_t *indexStats; -STATSCOUNTER_DEF(indexConFail, mutIndexConFail) STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit) -STATSCOUNTER_DEF(indexFailed, mutIndexFailed) -STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) +STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail) +STATSCOUNTER_DEF(indexESFail, mutIndexESFail) /* REST API for elasticsearch hits this URL: * http://<hostName>:<restPort>/<searchIndex>/<searchType> */ typedef struct curl_slist HEADER; typedef struct _instanceData { - uchar *server; int port; + int replyLen; + int fdErrFile; /* error file fd or -1 if not open */ + uchar *server; uchar *uid; uchar *pwd; uchar *searchIndex; @@ -73,9 +78,14 @@ typedef struct _instanceData { uchar *parent; uchar *tplName; uchar *timeout; + uchar *bulkId; + uchar *restURL; /* last used URL for error reporting */ + uchar *errorFile; + char *reply; sbool dynSrchIdx; sbool dynSrchType; sbool dynParent; + sbool dynBulkId; sbool bulkmode; sbool asyncRepl; struct { @@ -104,7 +114,10 @@ static struct cnfparamdescr actpdescr[] = { { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, - { "template", eCmdHdlrGetWord, 1 } + { "errorfile", eCmdHdlrGetWord, 0 }, + { "template", eCmdHdlrGetWord, 1 }, + { "dynbulkid", eCmdHdlrBinary, 0 }, + { "bulkid", eCmdHdlrGetWord, 0 }, }; static struct cnfparamblk actpblk = { CNFPARAMBLK_VERSION, @@ -114,6 +127,8 @@ static struct cnfparamblk actpblk = BEGINcreateInstance CODESTARTcreateInstance + pData->restURL = NULL; + pData->fdErrFile = -1; ENDcreateInstance BEGINisCompatibleWithFeature @@ -132,6 +147,8 @@ CODESTARTfreeInstance curl_easy_cleanup(pData->curlHandle); pData->curlHandle = NULL; } + if(pData->fdErrFile != -1) + close(pData->fdErrFile); free(pData->server); free(pData->uid); free(pData->pwd); @@ -139,6 +156,10 @@ CODESTARTfreeInstance free(pData->searchType); free(pData->parent); free(pData->tplName); + free(pData->timeout); + free(pData->restURL); + free(pData->errorFile); + free(pData->bulkId); ENDfreeInstance BEGINdbgPrintInstInfo @@ -158,6 +179,10 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tdynamic parent=%d\n", pData->dynParent); dbgprintf("\tasync replication=%d\n", pData->asyncRepl); dbgprintf("\tbulkmode=%d\n", pData->bulkmode); + dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ? + (uchar*)"(not configured)" : pData->errorFile); + dbgprintf("\tdynbulkid=%d\n", pData->dynBulkId); + dbgprintf("\tbulkid='%s'\n", pData->bulkId); ENDdbgPrintInstInfo @@ -198,16 +223,26 @@ checkConn(instanceData *pData) DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n"); ABORT_FINALIZE(RS_RET_SUSPENDED); } + /* Bodypart of request not needed, so set curl opt to nobody and httpget, otherwise lib-curl could sigsegv */ + curl_easy_setopt(curl, CURLOPT_HTTPGET, TRUE); + curl_easy_setopt(curl, CURLOPT_NOBODY, TRUE); + /* Only enable for debugging + curl_easy_setopt(curl, CURLOPT_VERBOSE, TRUE); */ + cstr = es_str2cstr(url, NULL); curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); - + + pData->reply = NULL; + pData->replyLen = 0; + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); res = curl_easy_perform(curl); if(res != CURLE_OK) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " "failed: %s\n", curl_easy_strerror(res)); ABORT_FINALIZE(RS_RET_SUSPENDED); } + free(pData->reply); DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); finalize_it: @@ -227,7 +262,8 @@ ENDtryResume /* get the current index and type for this message */ static inline void getIndexTypeAndParent(instanceData *pData, uchar **tpls, - uchar **srchIndex, uchar **srchType, uchar **parent) + uchar **srchIndex, uchar **srchType, uchar **parent, + uchar **bulkId) { if(pData->dynSrchIdx) { *srchIndex = tpls[1]; @@ -235,15 +271,27 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, *srchType = tpls[2]; if(pData->dynParent) { *parent = tpls[3]; + if(pData->dynBulkId) { + *bulkId = tpls[4]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } } else { *srchType = pData->searchType; if(pData->dynParent) { *parent = tpls[2]; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } } } else { @@ -252,15 +300,27 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, *srchType = tpls[1]; if(pData->dynParent) { *parent = tpls[2]; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } } else { *srchType = pData->searchType; if(pData->dynParent) { *parent = tpls[1]; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[1]; + } } } } @@ -271,10 +331,10 @@ static rsRetVal setCurlURL(instanceData *pData, uchar **tpls) { char authBuf[1024]; - char *restURL; uchar *searchIndex; uchar *searchType; uchar *parent; + uchar *bulkId; es_str_t *url; int rLocal; int r; @@ -286,7 +346,7 @@ setCurlURL(instanceData *pData, uchar **tpls) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); parent = NULL; } else { - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addChar(&url, '/'); if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); @@ -305,11 +365,12 @@ setCurlURL(instanceData *pData, uchar **tpls) if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1); if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); } - restURL = es_str2cstr(url, NULL); - curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + + free(pData->restURL); + pData->restURL = (uchar*)es_str2cstr(url, NULL); + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); es_deleteStr(url); - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); - free(restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL); if(pData->uid != NULL) { rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -320,7 +381,7 @@ setCurlURL(instanceData *pData, uchar **tpls) rLocal); ABORT_FINALIZE(RS_RET_ERR); } - curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); + curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } finalize_it: @@ -340,16 +401,15 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) uchar *searchIndex; uchar *searchType; uchar *parent; + uchar *bulkId = NULL; DEFiRet; # define META_STRT "{\"index\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" # define META_PARENT "\",\"_parent\":\"" +# define META_ID "\", \"_id\":\"" # define META_END "\"}}\n" - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); -dbgprintf("AAA: searchIndex: '%s'\n", searchIndex); -dbgprintf("AAA: searchType: '%s'\n", searchType); -dbgprintf("AAA: parent: '%s'\n", parent); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); @@ -360,6 +420,10 @@ dbgprintf("AAA: parent: '%s'\n", parent); if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); } + if(bulkId != NULL) { + if(r == 0) r = es_addBuf(&pData->batch.data, META_ID, sizeof(META_ID)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)bulkId, ustrlen(bulkId)); + } if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); @@ -373,37 +437,192 @@ finalize_it: RETiRet; } + +/* write data error request/replies to separate error file + * Note: we open the file but never close it before exit. If it + * needs to be closed, HUP must be sent. + */ +static inline rsRetVal +writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) +{ + char *rendered = NULL; + cJSON *errRoot; + cJSON *req; + cJSON *replyRoot = *pReplyRoot; + size_t toWrite; + ssize_t wrRet; + char errStr[1024]; + DEFiRet; + + if(pData->errorFile == NULL) { + DBGPRINTF("omelasticsearch: no local error logger defined - " + "ignoring ES error information\n"); + FINALIZE; + } + + if(pData->fdErrFile == -1) { + pData->fdErrFile = open((char*)pData->errorFile, + O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, + S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); + if(pData->fdErrFile == -1) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + DBGPRINTF("omelasticsearch: error opening error file: %s\n", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + } + if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL)); + cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg)); + + if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + cJSON_AddItemToObject(errRoot, "request", req); + cJSON_AddItemToObject(errRoot, "reply", replyRoot); + rendered = cJSON_Print(errRoot); + /* we do not do real error-handling on the err file, as this finally complicates + * things way to much. + */ + DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered); + toWrite = strlen(rendered); + wrRet = write(pData->fdErrFile, rendered, toWrite); + if(wrRet != (ssize_t) toWrite) { + DBGPRINTF("omelasticsearch: error %d writing error file, write returns %lld\n", + errno, (long long) wrRet); + } + cJSON_Delete(errRoot); + *pReplyRoot = NULL; /* tell caller not to delete once again! */ + +finalize_it: + free(rendered); + RETiRet; +} + + +static inline rsRetVal +checkResultBulkmode(instanceData *pData, cJSON *root) +{ + int i; + int numitems; + cJSON *items; + cJSON *item; + cJSON *create; + cJSON *ok; + DEFiRet; + + items = cJSON_GetObjectItem(root, "items"); + if(items == NULL || items->type != cJSON_Array) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "bulkmode insert does not return array, reply is: %s\n", + pData->reply); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + numitems = cJSON_GetArraySize(items); +DBGPRINTF("omelasticsearch: %d items in reply\n", numitems); + for(i = 0 ; i < numitems ; ++i) { + item = cJSON_GetArrayItem(items, i); + if(item == NULL) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "cannot obtain reply array item %d\n", i); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + create = cJSON_GetObjectItem(item, "create"); + if(create == NULL || create->type != cJSON_Object) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "cannot obtain 'create' item for #%d\n", i); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + ok = cJSON_GetObjectItem(create, "ok"); + if(ok == NULL || ok->type != cJSON_True) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "item %d, prop ok (%p) not ok\n", i, ok); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + } + +finalize_it: + if(iRet != RS_RET_OK) { + STATSCOUNTER_INC(indexESFail, mutIndexESFail); + } + RETiRet; +} + + +static inline rsRetVal +checkResult(instanceData *pData, uchar *reqmsg) +{ + cJSON *root; + cJSON *ok; + DEFiRet; + + root = cJSON_Parse(pData->reply); + if(root == NULL) { + DBGPRINTF("omelasticsearch: could not parse JSON result \n"); + ABORT_FINALIZE(RS_RET_ERR); + } + + if(pData->bulkmode) { + iRet = checkResultBulkmode(pData, root); + } else { + ok = cJSON_GetObjectItem(root, "ok"); + if(ok == NULL || ok->type != cJSON_True) { + iRet = RS_RET_DATAFAIL; + } + } + + /* Note: we ignore errors writing the error file, as we cannot handle + * these in any case. + */ + if(iRet == RS_RET_DATAFAIL) { + writeDataError(pData, &root, reqmsg); + iRet = RS_RET_OK; /* we have handled the problem! */ + } + +finalize_it: + if(root != NULL) + cJSON_Delete(root); + RETiRet; +} + + static rsRetVal -curlPost(instanceData *instance, uchar *message, int msglen, uchar **tpls) +curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) { CURLcode code; - CURL *curl = instance->curlHandle; + CURL *curl = pData->curlHandle; DEFiRet; - if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent) - CHKiRet(setCurlURL(instance, tpls)); + pData->reply = NULL; + pData->replyLen = 0; + + if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent) + CHKiRet(setCurlURL(pData, tpls)); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); -dbgprintf("omelasticsearch: do curl_easy_perform()\n"); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); -DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: case CURLE_COULDNT_CONNECT: case CURLE_WRITE_ERROR: - STATSCOUNTER_INC(indexConFail, mutIndexConFail); + STATSCOUNTER_INC(indexHTTPFail, mutIndexHTTPFail); DBGPRINTF("omelasticsearch: we are suspending ourselfs due " "to failure %lld of curl_easy_perform()\n", (long long) code); - return RS_RET_SUSPENDED; + ABORT_FINALIZE(RS_RET_SUSPENDED); default: - STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); - return RS_RET_OK; + break; } + + DBGPRINTF("omelasticsearch: pData replyLen = '%d'\n", pData->replyLen); + if (pData->replyLen > 0) { + pData->reply[pData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */ + } + DBGPRINTF("omelasticsearch: pData reply: '%s'\n", pData->reply); + + CHKiRet(checkResult(pData, message)); finalize_it: + free(pData->reply); RETiRet; } @@ -421,10 +640,10 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction + STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); if(pData->bulkmode) { CHKiRet(buildBatch(pData, ppString[0], ppString)); } else { -dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), ppString)); } @@ -434,12 +653,17 @@ ENDdoAction BEGINendTransaction - char *cstr; + char *cstr = NULL; CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); - cstr = es_str2cstr(pData->batch.data, NULL); - dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL)); + /* End Transaction only if batch data is not empty */ + if (pData->batch.data != NULL ) { + cstr = es_str2cstr(pData->batch.data, NULL); + dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL)); + } + else + dbgprintf("omelasticsearch: endTransaction, pData->batch.data is NULL, nothing to send. \n"); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -449,35 +673,20 @@ ENDendTransaction size_t curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) { - unsigned int i; char *p = (char *)ptr; - char *jsonData = (char *)userdata; - static char ok[] = "{\"ok\":true,"; - - ASSERT(size == 1); -DBGPRINTF("omelasticsearch request: %s\n", jsonData); -DBGPRINTF("omelasticsearch result: "); -for (i = 0; i < nmemb; i++) - DBGPRINTF("%c", p[i]); -DBGPRINTF("\n"); - - if (size == 1 && - nmemb > sizeof(ok)-1 && - strncmp(p, ok, sizeof(ok)-1) == 0) { - STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); -dbgprintf("omelasticsearch ok\n"); - } else { -dbgprintf("omelasticsearch fail\n"); - STATSCOUNTER_INC(indexFailed, mutIndexFailed); - if (Debug) { - DBGPRINTF("omelasticsearch (fail) request: %s\n", jsonData); - DBGPRINTF("omelasticsearch (fail) result: "); - for (i = 0; i < nmemb; i++) - DBGPRINTF("%c", p[i]); - DBGPRINTF("\n"); - } + instanceData *pData = (instanceData*) userdata; + char *buf; + size_t newlen; + + newlen = pData->replyLen + size*nmemb; + if((buf = realloc(pData->reply, newlen + 1)) == NULL) { + DBGPRINTF("omelasticsearch: realloc failed in curlResult\n"); + return 0; /* abort due to failure */ } - return size * nmemb; + memcpy(buf+pData->replyLen, p, size*nmemb); + pData->replyLen = newlen; + pData->reply = buf; + return size*nmemb; } @@ -493,10 +702,10 @@ curlSetup(instanceData *pData) } header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); - curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); - curl_easy_setopt(handle, CURLOPT_POST, 1); + curl_easy_setopt(handle, CURLOPT_POST, 1); pData->curlHandle = handle; pData->postHeader = header; @@ -533,6 +742,9 @@ setInstParamDefaults(instanceData *pData) pData->asyncRepl = 0; pData->bulkmode = 0; pData->tplName = NULL; + pData->errorFile = NULL; + pData->dynBulkId= 0; + pData->bulkId = NULL; } BEGINnewActInst @@ -552,6 +764,8 @@ CODESTARTnewActInst continue; if(!strcmp(actpblk.descr[i].name, "server")) { pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "errorfile")) { + pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; } else if(!strcmp(actpblk.descr[i].name, "uid")) { @@ -578,12 +792,16 @@ CODESTARTnewActInst pData->asyncRepl = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynbulkid")) { + pData->dynBulkId = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "bulkid")) { + pData->bulkId = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { dbgprintf("omelasticsearch: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); } } - + if(pData->pwd != NULL && pData->uid == NULL) { errmsg.LogError(0, RS_RET_UID_MISSING, "omelasticsearch: password is provided, but no uid " @@ -608,6 +826,12 @@ CODESTARTnewActInst "name for parent template given - action definition invalid"); ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } + if(pData->dynBulkId && pData->bulkId == NULL) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic bulkid, but no " + "name for bulkid template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } if(pData->bulkmode) { pData->batch.currTpl1 = NULL; @@ -623,6 +847,7 @@ CODESTARTnewActInst if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; if(pData->dynParent) ++iNumTpls; + if(pData->dynBulkId) ++iNumTpls; DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); CODE_STD_STRING_REQUESTnewActInst(iNumTpls) @@ -644,11 +869,29 @@ CODESTARTnewActInst if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 4, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } } else { @@ -658,12 +901,30 @@ CODESTARTnewActInst if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); - } + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } } } @@ -693,6 +954,14 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct +BEGINdoHUP +CODESTARTdoHUP + if(pData->fdErrFile != -1) { + close(pData->fdErrFile); + pData->fdErrFile = -1; + } +ENDdoHUP + BEGINmodExit CODESTARTmodExit @@ -707,6 +976,7 @@ CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_doHUP CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt @@ -725,15 +995,13 @@ CODEmodInit_QueryRegCFSLineHdlr /* support statistics gathering */ CHKiRet(statsobj.Construct(&indexStats)); - CHKiRet(statsobj.SetName(indexStats, (uchar *)"elasticsearch")); - CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"connfail", - ctrType_IntCtr, &indexConFail)); - CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submits", + CHKiRet(statsobj.SetName(indexStats, (uchar *)"omelasticsearch")); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submitted", ctrType_IntCtr, &indexSubmit)); - CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed", - ctrType_IntCtr, &indexFailed)); - CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"success", - ctrType_IntCtr, &indexSuccess)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.http", + ctrType_IntCtr, &indexHTTPFail)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es", + ctrType_IntCtr, &indexESFail)); CHKiRet(statsobj.ConstructFinalize(indexStats)); ENDmodInit |