From 71121003e273dc9fb2d6dd2aac427fb2c6329d23 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 25 Jul 2012 18:30:18 +0200 Subject: omelasticsearch: parse JSON response (in regard to data errors) note: bulkmode response processing is still mostly missing --- plugins/omelasticsearch/omelasticsearch.c | 102 +++++++++++++++++++----------- 1 file changed, 65 insertions(+), 37 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index f77caeca..b49caf38 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -34,6 +34,7 @@ #include #include #include +#include "cjson.h" #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" @@ -64,8 +65,9 @@ STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) */ typedef struct curl_slist HEADER; typedef struct _instanceData { - uchar *server; int port; + int replyLen; + uchar *server; uchar *uid; uchar *pwd; uchar *searchIndex; @@ -73,6 +75,7 @@ typedef struct _instanceData { uchar *parent; uchar *tplName; uchar *timeout; + char *reply; sbool dynSrchIdx; sbool dynSrchType; sbool dynParent; @@ -139,6 +142,7 @@ CODESTARTfreeInstance free(pData->searchType); free(pData->parent); free(pData->tplName); + free(pData->timeout); ENDfreeInstance BEGINdbgPrintInstInfo @@ -202,12 +206,16 @@ checkConn(instanceData *pData) curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); + pData->reply = NULL; + pData->replyLen = 0; + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); res = curl_easy_perform(curl); if(res != CURLE_OK) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " "failed: %s\n", curl_easy_strerror(res)); ABORT_FINALIZE(RS_RET_SUSPENDED); } + free(pData->reply); DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); finalize_it: @@ -373,22 +381,51 @@ finalize_it: RETiRet; } +static inline rsRetVal +checkResult(instanceData *pData) +{ + cJSON *root; + cJSON *ok; + DEFiRet; + + root = cJSON_Parse(pData->reply); + if(root == NULL) { + DBGPRINTF("omelasticsearch: could not parse JSON result \n"); + ABORT_FINALIZE(RS_RET_ERR); + } + + if(pData->bulkmode) { + //TODO: implement + //iRet = checkResultBulkmode(pData, root); + } else { + ok = cJSON_GetObjectItem(root, "ok"); + if(ok == NULL || ok->type != cJSON_True) { + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + } +finalize_it: + cJSON_Delete(root); + RETiRet; +} + + static rsRetVal -curlPost(instanceData *instance, uchar *message, int msglen, uchar **tpls) +curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) { CURLcode code; - CURL *curl = instance->curlHandle; + CURL *curl = pData->curlHandle; DEFiRet; - if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent) - CHKiRet(setCurlURL(instance, tpls)); + pData->reply = NULL; + pData->replyLen = 0; - curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); + if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent) + CHKiRet(setCurlURL(pData, tpls)); + + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); -dbgprintf("omelasticsearch: do curl_easy_perform()\n"); code = curl_easy_perform(curl); -DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: @@ -398,12 +435,18 @@ DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) co DBGPRINTF("omelasticsearch: we are suspending ourselfs due " "to failure %lld of curl_easy_perform()\n", (long long) code); - return RS_RET_SUSPENDED; + ABORT_FINALIZE(RS_RET_SUSPENDED); default: STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); - return RS_RET_OK; + break; } + + pData->reply[pData->replyLen] = '\0'; /* byte has been reserved in malloc */ + DBGPRINTF("omelasticsearch: es reply: '%s'\n", pData->reply); + + CHKiRet(checkResult(pData)); finalize_it: + free(pData->reply); RETiRet; } @@ -449,35 +492,20 @@ ENDendTransaction size_t curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) { - unsigned int i; char *p = (char *)ptr; - char *jsonData = (char *)userdata; - static char ok[] = "{\"ok\":true,"; - - ASSERT(size == 1); -DBGPRINTF("omelasticsearch request: %s\n", jsonData); -DBGPRINTF("omelasticsearch result: "); -for (i = 0; i < nmemb; i++) - DBGPRINTF("%c", p[i]); -DBGPRINTF("\n"); - - if (size == 1 && - nmemb > sizeof(ok)-1 && - strncmp(p, ok, sizeof(ok)-1) == 0) { - STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); -dbgprintf("omelasticsearch ok\n"); - } else { -dbgprintf("omelasticsearch fail\n"); - STATSCOUNTER_INC(indexFailed, mutIndexFailed); - if (Debug) { - DBGPRINTF("omelasticsearch (fail) request: %s\n", jsonData); - DBGPRINTF("omelasticsearch (fail) result: "); - for (i = 0; i < nmemb; i++) - DBGPRINTF("%c", p[i]); - DBGPRINTF("\n"); - } + instanceData *pData = (instanceData*) userdata; + char *buf; + size_t newlen; + + newlen = pData->replyLen + size*nmemb; + if((buf = realloc(pData->reply, newlen + 1)) == NULL) { + DBGPRINTF("omelasticsearch: realloc failed in curlResult\n"); + return 0; /* abort due to failure */ } - return size * nmemb; + memcpy(buf+pData->replyLen, p, size*nmemb); + pData->replyLen = newlen; + pData->reply = buf; + return size*nmemb; } -- cgit v1.2.3 From 575686bd68a47fc5d9f59c1ed610f6680e1bb6fa Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 30 Jul 2012 11:01:11 +0200 Subject: omelasticsearch: mileston: bulk reply is parsed --- plugins/omelasticsearch/omelasticsearch.c | 50 +++++++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 2 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index b49caf38..04113170 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -381,6 +381,53 @@ finalize_it: RETiRet; } +static inline rsRetVal +checkResultBulkmode(instanceData *pData, cJSON *root) +{ + int i; + int numitems; + cJSON *items; + cJSON *item; + cJSON *create; + cJSON *ok; + DEFiRet; + + items = cJSON_GetObjectItem(root, "items"); + if(items == NULL || items->type != cJSON_Array) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "bulkmode insert does not return array, reply is: %s\n", + pData->reply); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + numitems = cJSON_GetArraySize(items); +DBGPRINTF("omelasticsearch: %d items in reply\n", numitems); + for(i = 0 ; i < numitems ; ++i) { + item = cJSON_GetArrayItem(items, i); + if(item == NULL) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "cannot obtain reply array item %d\n", i); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + create = cJSON_GetObjectItem(item, "create"); + if(create == NULL || create->type != cJSON_Object) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "cannot obtain 'create' item for #%d\n", i); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + ok = cJSON_GetObjectItem(create, "ok"); + if(ok == NULL || ok->type != cJSON_True) { + DBGPRINTF("omelasticsearch: error in elasticsearch reply: " + "item ok (%p) not ok\n", ok); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } +dbgprintf("omelasticsearch: item %d is OK\n", i); + } + +finalize_it: + RETiRet; +} + + static inline rsRetVal checkResult(instanceData *pData) { @@ -395,8 +442,7 @@ checkResult(instanceData *pData) } if(pData->bulkmode) { - //TODO: implement - //iRet = checkResultBulkmode(pData, root); + iRet = checkResultBulkmode(pData, root); } else { ok = cJSON_GetObjectItem(root, "ok"); if(ok == NULL || ok->type != cJSON_True) { -- cgit v1.2.3 From 36f7fbd5f458c3e8b2924a72f74621e055f319b2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 30 Jul 2012 12:40:49 +0200 Subject: omelasticsearch: support for writing data errors to local file added --- plugins/omelasticsearch/omelasticsearch.c | 109 ++++++++++++++++++++++++++---- 1 file changed, 95 insertions(+), 14 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 04113170..fea85f22 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -34,6 +34,9 @@ #include #include #include +#include +#include +#include #include "cjson.h" #include "conf.h" #include "syslogd-types.h" @@ -67,6 +70,7 @@ typedef struct curl_slist HEADER; typedef struct _instanceData { int port; int replyLen; + int fdErrFile; /* error file fd or -1 if not open */ uchar *server; uchar *uid; uchar *pwd; @@ -75,6 +79,8 @@ typedef struct _instanceData { uchar *parent; uchar *tplName; uchar *timeout; + uchar *restURL; /* last used URL for error reporting */ + uchar *errorFile; char *reply; sbool dynSrchIdx; sbool dynSrchType; @@ -107,6 +113,7 @@ static struct cnfparamdescr actpdescr[] = { { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, + { "errorfile", eCmdHdlrGetWord, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = @@ -117,6 +124,8 @@ static struct cnfparamblk actpblk = BEGINcreateInstance CODESTARTcreateInstance + pData->restURL = NULL; + pData->fdErrFile = -1; ENDcreateInstance BEGINisCompatibleWithFeature @@ -135,6 +144,8 @@ CODESTARTfreeInstance curl_easy_cleanup(pData->curlHandle); pData->curlHandle = NULL; } + if(pData->fdErrFile != -1) + close(pData->fdErrFile); free(pData->server); free(pData->uid); free(pData->pwd); @@ -143,6 +154,8 @@ CODESTARTfreeInstance free(pData->parent); free(pData->tplName); free(pData->timeout); + free(pData->restURL); + free(pData->errorFile); ENDfreeInstance BEGINdbgPrintInstInfo @@ -162,6 +175,8 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tdynamic parent=%d\n", pData->dynParent); dbgprintf("\tasync replication=%d\n", pData->asyncRepl); dbgprintf("\tbulkmode=%d\n", pData->bulkmode); + dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ? + (uchar*)"(not configured)" : pData->errorFile); ENDdbgPrintInstInfo @@ -279,7 +294,6 @@ static rsRetVal setCurlURL(instanceData *pData, uchar **tpls) { char authBuf[1024]; - char *restURL; uchar *searchIndex; uchar *searchType; uchar *parent; @@ -313,11 +327,12 @@ setCurlURL(instanceData *pData, uchar **tpls) if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1); if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); } - restURL = es_str2cstr(url, NULL); - curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + + free(pData->restURL); + pData->restURL = (uchar*)es_str2cstr(url, NULL); + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); es_deleteStr(url); - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); - free(restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL); if(pData->uid != NULL) { rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -355,9 +370,6 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) # define META_END "\"}}\n" getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); -dbgprintf("AAA: searchIndex: '%s'\n", searchIndex); -dbgprintf("AAA: searchType: '%s'\n", searchType); -dbgprintf("AAA: parent: '%s'\n", parent); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); @@ -381,6 +393,55 @@ finalize_it: RETiRet; } + +/* write data error request/replies to separate error file + * Note: we open the file but never close it before exit. If it + * needs to be closed, HUP must be sent. + */ +static inline rsRetVal +writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) +{ + char *rendered = NULL; + cJSON *errRoot; + cJSON *req; + cJSON *replyRoot = *pReplyRoot; + char errStr[1024]; + DEFiRet; + + if(pData->errorFile == NULL) + FINALIZE; + + if(pData->fdErrFile == -1) { + pData->fdErrFile = open((char*)pData->errorFile, + O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, + S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); + if(pData->fdErrFile == -1) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + DBGPRINTF("omelasticsearch: error opening error file: %s\n", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + } + if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL)); + cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg)); + + if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + cJSON_AddItemToObject(errRoot, "request", req); + cJSON_AddItemToObject(errRoot, "reply", replyRoot); + rendered = cJSON_Print(errRoot); +DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered); + write(pData->fdErrFile, rendered, strlen(rendered)); + free(rendered); + cJSON_Delete(errRoot); + *pReplyRoot = NULL; /* tell caller not to delete once again! */ + +finalize_it: + if(rendered != NULL) + free(rendered); + RETiRet; +} + + static inline rsRetVal checkResultBulkmode(instanceData *pData, cJSON *root) { @@ -420,7 +481,6 @@ DBGPRINTF("omelasticsearch: %d items in reply\n", numitems); "item ok (%p) not ok\n", ok); ABORT_FINALIZE(RS_RET_DATAFAIL); } -dbgprintf("omelasticsearch: item %d is OK\n", i); } finalize_it: @@ -429,7 +489,7 @@ finalize_it: static inline rsRetVal -checkResult(instanceData *pData) +checkResult(instanceData *pData, uchar *reqmsg) { cJSON *root; cJSON *ok; @@ -446,11 +506,21 @@ checkResult(instanceData *pData) } else { ok = cJSON_GetObjectItem(root, "ok"); if(ok == NULL || ok->type != cJSON_True) { - ABORT_FINALIZE(RS_RET_DATAFAIL); + iRet = RS_RET_DATAFAIL; } } + + /* Note: we ignore errors writing the error file, as we cannot handle + * these in any case. + */ + if(iRet == RS_RET_DATAFAIL) { + writeDataError(pData, &root, reqmsg); + iRet = RS_RET_OK; /* we have handled the problem! */ + } + finalize_it: - cJSON_Delete(root); + if(root != NULL) + cJSON_Delete(root); RETiRet; } @@ -490,7 +560,7 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) pData->reply[pData->replyLen] = '\0'; /* byte has been reserved in malloc */ DBGPRINTF("omelasticsearch: es reply: '%s'\n", pData->reply); - CHKiRet(checkResult(pData)); + CHKiRet(checkResult(pData, message)); finalize_it: free(pData->reply); RETiRet; @@ -513,7 +583,6 @@ CODESTARTdoAction if(pData->bulkmode) { CHKiRet(buildBatch(pData, ppString[0], ppString)); } else { -dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), ppString)); } @@ -607,6 +676,7 @@ setInstParamDefaults(instanceData *pData) pData->asyncRepl = 0; pData->bulkmode = 0; pData->tplName = NULL; + pData->errorFile = NULL; } BEGINnewActInst @@ -626,6 +696,8 @@ CODESTARTnewActInst continue; if(!strcmp(actpblk.descr[i].name, "server")) { pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "errorfile")) { + pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; } else if(!strcmp(actpblk.descr[i].name, "uid")) { @@ -767,6 +839,14 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct +BEGINdoHUP +CODESTARTdoHUP + if(pData->fdErrFile != -1) { + close(pData->fdErrFile); + pData->fdErrFile = -1; + } +ENDdoHUP + BEGINmodExit CODESTARTmodExit @@ -781,6 +861,7 @@ CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_doHUP CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt -- cgit v1.2.3 From cb12ed5dd211f9910860e86fd937180895c459de Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 30 Jul 2012 13:06:44 +0200 Subject: omelasticsearch: improved debug logging --- plugins/omelasticsearch/omelasticsearch.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'plugins/omelasticsearch/omelasticsearch.c') diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index fea85f22..48cfef7d 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -408,8 +408,11 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) char errStr[1024]; DEFiRet; - if(pData->errorFile == NULL) + if(pData->errorFile == NULL) { + DBGPRINTF("omelasticsearch: no local error logger defined - " + "ignoring ES error information\n"); FINALIZE; + } if(pData->fdErrFile == -1) { pData->fdErrFile = open((char*)pData->errorFile, @@ -478,7 +481,7 @@ DBGPRINTF("omelasticsearch: %d items in reply\n", numitems); ok = cJSON_GetObjectItem(create, "ok"); if(ok == NULL || ok->type != cJSON_True) { DBGPRINTF("omelasticsearch: error in elasticsearch reply: " - "item ok (%p) not ok\n", ok); + "item %d, prop ok (%p) not ok\n", i, ok); ABORT_FINALIZE(RS_RET_DATAFAIL); } } -- cgit v1.2.3