diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2012-07-25 18:30:18 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2012-07-25 18:30:18 +0200 |
commit | 71121003e273dc9fb2d6dd2aac427fb2c6329d23 (patch) | |
tree | a2fa641d12eccc64f4f012e56ccb79561ff3fd2d /plugins/omelasticsearch/omelasticsearch.c | |
parent | 7e725f41f8857c861f1d3d64fd725916cb4bf9b8 (diff) | |
download | rsyslog-71121003e273dc9fb2d6dd2aac427fb2c6329d23.tar.gz rsyslog-71121003e273dc9fb2d6dd2aac427fb2c6329d23.tar.bz2 rsyslog-71121003e273dc9fb2d6dd2aac427fb2c6329d23.zip |
omelasticsearch: parse JSON response (in regard to data errors)
note: bulkmode response processing is still mostly missing
Diffstat (limited to 'plugins/omelasticsearch/omelasticsearch.c')
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 102 |
1 files changed, 65 insertions, 37 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index f77caeca..b49caf38 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -34,6 +34,7 @@ #include <signal.h> #include <errno.h> #include <time.h> +#include "cjson.h" #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" @@ -64,8 +65,9 @@ STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) */ typedef struct curl_slist HEADER; typedef struct _instanceData { - uchar *server; int port; + int replyLen; + uchar *server; uchar *uid; uchar *pwd; uchar *searchIndex; @@ -73,6 +75,7 @@ typedef struct _instanceData { uchar *parent; uchar *tplName; uchar *timeout; + char *reply; sbool dynSrchIdx; sbool dynSrchType; sbool dynParent; @@ -139,6 +142,7 @@ CODESTARTfreeInstance free(pData->searchType); free(pData->parent); free(pData->tplName); + free(pData->timeout); ENDfreeInstance BEGINdbgPrintInstInfo @@ -202,12 +206,16 @@ checkConn(instanceData *pData) curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); + pData->reply = NULL; + pData->replyLen = 0; + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); res = curl_easy_perform(curl); if(res != CURLE_OK) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " "failed: %s\n", curl_easy_strerror(res)); ABORT_FINALIZE(RS_RET_SUSPENDED); } + free(pData->reply); DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); finalize_it: @@ -373,22 +381,51 @@ finalize_it: RETiRet; } +static inline rsRetVal +checkResult(instanceData *pData) +{ + cJSON *root; + cJSON *ok; + DEFiRet; + + root = cJSON_Parse(pData->reply); + if(root == NULL) { + DBGPRINTF("omelasticsearch: could not parse JSON result \n"); + ABORT_FINALIZE(RS_RET_ERR); + } + + if(pData->bulkmode) { + //TODO: implement + //iRet = checkResultBulkmode(pData, root); + } else { + ok = cJSON_GetObjectItem(root, "ok"); + if(ok == NULL || ok->type != cJSON_True) { + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + } +finalize_it: + cJSON_Delete(root); + RETiRet; +} + + static rsRetVal -curlPost(instanceData *instance, uchar *message, int msglen, uchar **tpls) +curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) { CURLcode code; - CURL *curl = instance->curlHandle; + CURL *curl = pData->curlHandle; DEFiRet; - if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent) - CHKiRet(setCurlURL(instance, tpls)); + pData->reply = NULL; + pData->replyLen = 0; - curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); + if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent) + CHKiRet(setCurlURL(pData, tpls)); + + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); -dbgprintf("omelasticsearch: do curl_easy_perform()\n"); code = curl_easy_perform(curl); -DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) code); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_PROXY: @@ -398,12 +435,18 @@ DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) co DBGPRINTF("omelasticsearch: we are suspending ourselfs due " "to failure %lld of curl_easy_perform()\n", (long long) code); - return RS_RET_SUSPENDED; + ABORT_FINALIZE(RS_RET_SUSPENDED); default: STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); - return RS_RET_OK; + break; } + + pData->reply[pData->replyLen] = '\0'; /* byte has been reserved in malloc */ + DBGPRINTF("omelasticsearch: es reply: '%s'\n", pData->reply); + + CHKiRet(checkResult(pData)); finalize_it: + free(pData->reply); RETiRet; } @@ -449,35 +492,20 @@ ENDendTransaction size_t curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) { - unsigned int i; char *p = (char *)ptr; - char *jsonData = (char *)userdata; - static char ok[] = "{\"ok\":true,"; - - ASSERT(size == 1); -DBGPRINTF("omelasticsearch request: %s\n", jsonData); -DBGPRINTF("omelasticsearch result: "); -for (i = 0; i < nmemb; i++) - DBGPRINTF("%c", p[i]); -DBGPRINTF("\n"); - - if (size == 1 && - nmemb > sizeof(ok)-1 && - strncmp(p, ok, sizeof(ok)-1) == 0) { - STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); -dbgprintf("omelasticsearch ok\n"); - } else { -dbgprintf("omelasticsearch fail\n"); - STATSCOUNTER_INC(indexFailed, mutIndexFailed); - if (Debug) { - DBGPRINTF("omelasticsearch (fail) request: %s\n", jsonData); - DBGPRINTF("omelasticsearch (fail) result: "); - for (i = 0; i < nmemb; i++) - DBGPRINTF("%c", p[i]); - DBGPRINTF("\n"); - } + instanceData *pData = (instanceData*) userdata; + char *buf; + size_t newlen; + + newlen = pData->replyLen + size*nmemb; + if((buf = realloc(pData->reply, newlen + 1)) == NULL) { + DBGPRINTF("omelasticsearch: realloc failed in curlResult\n"); + return 0; /* abort due to failure */ } - return size * nmemb; + memcpy(buf+pData->replyLen, p, size*nmemb); + pData->replyLen = newlen; + pData->reply = buf; + return size*nmemb; } |