summaryrefslogtreecommitdiffstats
path: root/plugins/omelasticsearch/omelasticsearch.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2012-07-25 18:30:18 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2012-07-25 18:30:18 +0200
commit71121003e273dc9fb2d6dd2aac427fb2c6329d23 (patch)
treea2fa641d12eccc64f4f012e56ccb79561ff3fd2d /plugins/omelasticsearch/omelasticsearch.c
parent7e725f41f8857c861f1d3d64fd725916cb4bf9b8 (diff)
downloadrsyslog-71121003e273dc9fb2d6dd2aac427fb2c6329d23.tar.gz
rsyslog-71121003e273dc9fb2d6dd2aac427fb2c6329d23.tar.bz2
rsyslog-71121003e273dc9fb2d6dd2aac427fb2c6329d23.zip
omelasticsearch: parse JSON response (in regard to data errors)
note: bulkmode response processing is still mostly missing
Diffstat (limited to 'plugins/omelasticsearch/omelasticsearch.c')
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c102
1 files changed, 65 insertions, 37 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index f77caeca..b49caf38 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -34,6 +34,7 @@
#include <signal.h>
#include <errno.h>
#include <time.h>
+#include "cjson.h"
#include "conf.h"
#include "syslogd-types.h"
#include "srUtils.h"
@@ -64,8 +65,9 @@ STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
*/
typedef struct curl_slist HEADER;
typedef struct _instanceData {
- uchar *server;
int port;
+ int replyLen;
+ uchar *server;
uchar *uid;
uchar *pwd;
uchar *searchIndex;
@@ -73,6 +75,7 @@ typedef struct _instanceData {
uchar *parent;
uchar *tplName;
uchar *timeout;
+ char *reply;
sbool dynSrchIdx;
sbool dynSrchType;
sbool dynParent;
@@ -139,6 +142,7 @@ CODESTARTfreeInstance
free(pData->searchType);
free(pData->parent);
free(pData->tplName);
+ free(pData->timeout);
ENDfreeInstance
BEGINdbgPrintInstInfo
@@ -202,12 +206,16 @@ checkConn(instanceData *pData)
curl_easy_setopt(curl, CURLOPT_URL, cstr);
free(cstr);
+ pData->reply = NULL;
+ pData->replyLen = 0;
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData);
res = curl_easy_perform(curl);
if(res != CURLE_OK) {
DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() "
"failed: %s\n", curl_easy_strerror(res));
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
+ free(pData->reply);
DBGPRINTF("omelasticsearch: checkConn() completed with success\n");
finalize_it:
@@ -373,22 +381,51 @@ finalize_it:
RETiRet;
}
+static inline rsRetVal
+checkResult(instanceData *pData)
+{
+ cJSON *root;
+ cJSON *ok;
+ DEFiRet;
+
+ root = cJSON_Parse(pData->reply);
+ if(root == NULL) {
+ DBGPRINTF("omelasticsearch: could not parse JSON result \n");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ if(pData->bulkmode) {
+ //TODO: implement
+ //iRet = checkResultBulkmode(pData, root);
+ } else {
+ ok = cJSON_GetObjectItem(root, "ok");
+ if(ok == NULL || ok->type != cJSON_True) {
+ ABORT_FINALIZE(RS_RET_DATAFAIL);
+ }
+ }
+finalize_it:
+ cJSON_Delete(root);
+ RETiRet;
+}
+
+
static rsRetVal
-curlPost(instanceData *instance, uchar *message, int msglen, uchar **tpls)
+curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls)
{
CURLcode code;
- CURL *curl = instance->curlHandle;
+ CURL *curl = pData->curlHandle;
DEFiRet;
- if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent)
- CHKiRet(setCurlURL(instance, tpls));
+ pData->reply = NULL;
+ pData->replyLen = 0;
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message);
+ if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent)
+ CHKiRet(setCurlURL(pData, tpls));
+
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen);
-dbgprintf("omelasticsearch: do curl_easy_perform()\n");
code = curl_easy_perform(curl);
-DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) code);
switch (code) {
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_COULDNT_RESOLVE_PROXY:
@@ -398,12 +435,18 @@ DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) co
DBGPRINTF("omelasticsearch: we are suspending ourselfs due "
"to failure %lld of curl_easy_perform()\n",
(long long) code);
- return RS_RET_SUSPENDED;
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
default:
STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
- return RS_RET_OK;
+ break;
}
+
+ pData->reply[pData->replyLen] = '\0'; /* byte has been reserved in malloc */
+ DBGPRINTF("omelasticsearch: es reply: '%s'\n", pData->reply);
+
+ CHKiRet(checkResult(pData));
finalize_it:
+ free(pData->reply);
RETiRet;
}
@@ -449,35 +492,20 @@ ENDendTransaction
size_t
curlResult(void *ptr, size_t size, size_t nmemb, void *userdata)
{
- unsigned int i;
char *p = (char *)ptr;
- char *jsonData = (char *)userdata;
- static char ok[] = "{\"ok\":true,";
-
- ASSERT(size == 1);
-DBGPRINTF("omelasticsearch request: %s\n", jsonData);
-DBGPRINTF("omelasticsearch result: ");
-for (i = 0; i < nmemb; i++)
- DBGPRINTF("%c", p[i]);
-DBGPRINTF("\n");
-
- if (size == 1 &&
- nmemb > sizeof(ok)-1 &&
- strncmp(p, ok, sizeof(ok)-1) == 0) {
- STATSCOUNTER_INC(indexSuccess, mutIndexSuccess);
-dbgprintf("omelasticsearch ok\n");
- } else {
-dbgprintf("omelasticsearch fail\n");
- STATSCOUNTER_INC(indexFailed, mutIndexFailed);
- if (Debug) {
- DBGPRINTF("omelasticsearch (fail) request: %s\n", jsonData);
- DBGPRINTF("omelasticsearch (fail) result: ");
- for (i = 0; i < nmemb; i++)
- DBGPRINTF("%c", p[i]);
- DBGPRINTF("\n");
- }
+ instanceData *pData = (instanceData*) userdata;
+ char *buf;
+ size_t newlen;
+
+ newlen = pData->replyLen + size*nmemb;
+ if((buf = realloc(pData->reply, newlen + 1)) == NULL) {
+ DBGPRINTF("omelasticsearch: realloc failed in curlResult\n");
+ return 0; /* abort due to failure */
}
- return size * nmemb;
+ memcpy(buf+pData->replyLen, p, size*nmemb);
+ pData->replyLen = newlen;
+ pData->reply = buf;
+ return size*nmemb;
}