summaryrefslogtreecommitdiffstats
path: root/plugins/omelasticsearch/omelasticsearch.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/omelasticsearch/omelasticsearch.c')
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c250
1 files changed, 204 insertions, 46 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index 50acdf11..982f4318 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -34,6 +34,10 @@
#include <signal.h>
#include <errno.h>
#include <time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include "cJSON/cjson.h"
#include "conf.h"
#include "syslogd-types.h"
#include "srUtils.h"
@@ -64,8 +68,10 @@ STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
*/
typedef struct curl_slist HEADER;
typedef struct _instanceData {
- uchar *server;
int port;
+ int replyLen;
+ int fdErrFile; /* error file fd or -1 if not open */
+ uchar *server;
uchar *uid;
uchar *pwd;
uchar *searchIndex;
@@ -73,6 +79,9 @@ typedef struct _instanceData {
uchar *parent;
uchar *tplName;
uchar *timeout;
+ uchar *restURL; /* last used URL for error reporting */
+ uchar *errorFile;
+ char *reply;
sbool dynSrchIdx;
sbool dynSrchType;
sbool dynParent;
@@ -104,6 +113,7 @@ static struct cnfparamdescr actpdescr[] = {
{ "bulkmode", eCmdHdlrBinary, 0 },
{ "asyncrepl", eCmdHdlrBinary, 0 },
{ "timeout", eCmdHdlrGetWord, 0 },
+ { "errorfile", eCmdHdlrGetWord, 0 },
{ "template", eCmdHdlrGetWord, 1 }
};
static struct cnfparamblk actpblk =
@@ -114,6 +124,8 @@ static struct cnfparamblk actpblk =
BEGINcreateInstance
CODESTARTcreateInstance
+ pData->restURL = NULL;
+ pData->fdErrFile = -1;
ENDcreateInstance
BEGINisCompatibleWithFeature
@@ -132,6 +144,8 @@ CODESTARTfreeInstance
curl_easy_cleanup(pData->curlHandle);
pData->curlHandle = NULL;
}
+ if(pData->fdErrFile != -1)
+ close(pData->fdErrFile);
free(pData->server);
free(pData->uid);
free(pData->pwd);
@@ -139,6 +153,9 @@ CODESTARTfreeInstance
free(pData->searchType);
free(pData->parent);
free(pData->tplName);
+ free(pData->timeout);
+ free(pData->restURL);
+ free(pData->errorFile);
ENDfreeInstance
BEGINdbgPrintInstInfo
@@ -158,6 +175,8 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tdynamic parent=%d\n", pData->dynParent);
dbgprintf("\tasync replication=%d\n", pData->asyncRepl);
dbgprintf("\tbulkmode=%d\n", pData->bulkmode);
+ dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ?
+ (uchar*)"(not configured)" : pData->errorFile);
ENDdbgPrintInstInfo
@@ -202,12 +221,16 @@ checkConn(instanceData *pData)
curl_easy_setopt(curl, CURLOPT_URL, cstr);
free(cstr);
+ pData->reply = NULL;
+ pData->replyLen = 0;
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData);
res = curl_easy_perform(curl);
if(res != CURLE_OK) {
DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() "
"failed: %s\n", curl_easy_strerror(res));
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
+ free(pData->reply);
DBGPRINTF("omelasticsearch: checkConn() completed with success\n");
finalize_it:
@@ -271,7 +294,6 @@ static rsRetVal
setCurlURL(instanceData *pData, uchar **tpls)
{
char authBuf[1024];
- char *restURL;
uchar *searchIndex;
uchar *searchType;
uchar *parent;
@@ -305,11 +327,12 @@ setCurlURL(instanceData *pData, uchar **tpls)
if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1);
if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent));
}
- restURL = es_str2cstr(url, NULL);
- curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL);
+
+ free(pData->restURL);
+ pData->restURL = (uchar*)es_str2cstr(url, NULL);
+ curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL);
es_deleteStr(url);
- DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL);
- free(restURL);
+ DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL);
if(pData->uid != NULL) {
rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid,
@@ -347,9 +370,6 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls)
# define META_END "\"}}\n"
getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent);
-dbgprintf("AAA: searchIndex: '%s'\n", searchIndex);
-dbgprintf("AAA: searchType: '%s'\n", searchType);
-dbgprintf("AAA: parent: '%s'\n", parent);
r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex,
ustrlen(searchIndex));
@@ -373,22 +393,158 @@ finalize_it:
RETiRet;
}
+
+/* write data error request/replies to separate error file
+ * Note: we open the file but never close it before exit. If it
+ * needs to be closed, HUP must be sent.
+ */
+static inline rsRetVal
+writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
+{
+ char *rendered = NULL;
+ cJSON *errRoot;
+ cJSON *req;
+ cJSON *replyRoot = *pReplyRoot;
+ char errStr[1024];
+ DEFiRet;
+
+ if(pData->errorFile == NULL) {
+ DBGPRINTF("omelasticsearch: no local error logger defined - "
+ "ignoring ES error information\n");
+ FINALIZE;
+ }
+
+ if(pData->fdErrFile == -1) {
+ pData->fdErrFile = open((char*)pData->errorFile,
+ O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC,
+ S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
+ if(pData->fdErrFile == -1) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ DBGPRINTF("omelasticsearch: error opening error file: %s\n", errStr);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ }
+ if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
+ cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL));
+ cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg));
+
+ if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
+ cJSON_AddItemToObject(errRoot, "request", req);
+ cJSON_AddItemToObject(errRoot, "reply", replyRoot);
+ rendered = cJSON_Print(errRoot);
+DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered);
+ write(pData->fdErrFile, rendered, strlen(rendered));
+ free(rendered);
+ cJSON_Delete(errRoot);
+ *pReplyRoot = NULL; /* tell caller not to delete once again! */
+
+finalize_it:
+ if(rendered != NULL)
+ free(rendered);
+ RETiRet;
+}
+
+
+static inline rsRetVal
+checkResultBulkmode(instanceData *pData, cJSON *root)
+{
+ int i;
+ int numitems;
+ cJSON *items;
+ cJSON *item;
+ cJSON *create;
+ cJSON *ok;
+ DEFiRet;
+
+ items = cJSON_GetObjectItem(root, "items");
+ if(items == NULL || items->type != cJSON_Array) {
+ DBGPRINTF("omelasticsearch: error in elasticsearch reply: "
+ "bulkmode insert does not return array, reply is: %s\n",
+ pData->reply);
+ ABORT_FINALIZE(RS_RET_DATAFAIL);
+ }
+ numitems = cJSON_GetArraySize(items);
+DBGPRINTF("omelasticsearch: %d items in reply\n", numitems);
+ for(i = 0 ; i < numitems ; ++i) {
+ item = cJSON_GetArrayItem(items, i);
+ if(item == NULL) {
+ DBGPRINTF("omelasticsearch: error in elasticsearch reply: "
+ "cannot obtain reply array item %d\n", i);
+ ABORT_FINALIZE(RS_RET_DATAFAIL);
+ }
+ create = cJSON_GetObjectItem(item, "create");
+ if(create == NULL || create->type != cJSON_Object) {
+ DBGPRINTF("omelasticsearch: error in elasticsearch reply: "
+ "cannot obtain 'create' item for #%d\n", i);
+ ABORT_FINALIZE(RS_RET_DATAFAIL);
+ }
+ ok = cJSON_GetObjectItem(create, "ok");
+ if(ok == NULL || ok->type != cJSON_True) {
+ DBGPRINTF("omelasticsearch: error in elasticsearch reply: "
+ "item %d, prop ok (%p) not ok\n", i, ok);
+ ABORT_FINALIZE(RS_RET_DATAFAIL);
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+static inline rsRetVal
+checkResult(instanceData *pData, uchar *reqmsg)
+{
+ cJSON *root;
+ cJSON *ok;
+ DEFiRet;
+
+ root = cJSON_Parse(pData->reply);
+ if(root == NULL) {
+ DBGPRINTF("omelasticsearch: could not parse JSON result \n");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ if(pData->bulkmode) {
+ iRet = checkResultBulkmode(pData, root);
+ } else {
+ ok = cJSON_GetObjectItem(root, "ok");
+ if(ok == NULL || ok->type != cJSON_True) {
+ iRet = RS_RET_DATAFAIL;
+ }
+ }
+
+ /* Note: we ignore errors writing the error file, as we cannot handle
+ * these in any case.
+ */
+ if(iRet == RS_RET_DATAFAIL) {
+ writeDataError(pData, &root, reqmsg);
+ iRet = RS_RET_OK; /* we have handled the problem! */
+ }
+
+finalize_it:
+ if(root != NULL)
+ cJSON_Delete(root);
+ RETiRet;
+}
+
+
static rsRetVal
-curlPost(instanceData *instance, uchar *message, int msglen, uchar **tpls)
+curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls)
{
CURLcode code;
- CURL *curl = instance->curlHandle;
+ CURL *curl = pData->curlHandle;
DEFiRet;
- if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent)
- CHKiRet(setCurlURL(instance, tpls));
+ pData->reply = NULL;
+ pData->replyLen = 0;
+
+ if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent)
+ CHKiRet(setCurlURL(pData, tpls));
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen);
-dbgprintf("omelasticsearch: do curl_easy_perform()\n");
code = curl_easy_perform(curl);
-DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) code);
switch (code) {
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_COULDNT_RESOLVE_PROXY:
@@ -398,12 +554,18 @@ DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) co
DBGPRINTF("omelasticsearch: we are suspending ourselfs due "
"to failure %lld of curl_easy_perform()\n",
(long long) code);
- return RS_RET_SUSPENDED;
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
default:
STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
- return RS_RET_OK;
+ break;
}
+
+ pData->reply[pData->replyLen] = '\0'; /* byte has been reserved in malloc */
+ DBGPRINTF("omelasticsearch: es reply: '%s'\n", pData->reply);
+
+ CHKiRet(checkResult(pData, message));
finalize_it:
+ free(pData->reply);
RETiRet;
}
@@ -424,7 +586,6 @@ CODESTARTdoAction
if(pData->bulkmode) {
CHKiRet(buildBatch(pData, ppString[0], ppString));
} else {
-dbgprintf("omelasticsearch: doAction calling curlPost\n");
CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]),
ppString));
}
@@ -449,35 +610,20 @@ ENDendTransaction
size_t
curlResult(void *ptr, size_t size, size_t nmemb, void *userdata)
{
- unsigned int i;
char *p = (char *)ptr;
- char *jsonData = (char *)userdata;
- static char ok[] = "{\"ok\":true,";
-
- ASSERT(size == 1);
-DBGPRINTF("omelasticsearch request: %s\n", jsonData);
-DBGPRINTF("omelasticsearch result: ");
-for (i = 0; i < nmemb; i++)
- DBGPRINTF("%c", p[i]);
-DBGPRINTF("\n");
-
- if (size == 1 &&
- nmemb > sizeof(ok)-1 &&
- strncmp(p, ok, sizeof(ok)-1) == 0) {
- STATSCOUNTER_INC(indexSuccess, mutIndexSuccess);
-dbgprintf("omelasticsearch ok\n");
- } else {
-dbgprintf("omelasticsearch fail\n");
- STATSCOUNTER_INC(indexFailed, mutIndexFailed);
- if (Debug) {
- DBGPRINTF("omelasticsearch (fail) request: %s\n", jsonData);
- DBGPRINTF("omelasticsearch (fail) result: ");
- for (i = 0; i < nmemb; i++)
- DBGPRINTF("%c", p[i]);
- DBGPRINTF("\n");
- }
+ instanceData *pData = (instanceData*) userdata;
+ char *buf;
+ size_t newlen;
+
+ newlen = pData->replyLen + size*nmemb;
+ if((buf = realloc(pData->reply, newlen + 1)) == NULL) {
+ DBGPRINTF("omelasticsearch: realloc failed in curlResult\n");
+ return 0; /* abort due to failure */
}
- return size * nmemb;
+ memcpy(buf+pData->replyLen, p, size*nmemb);
+ pData->replyLen = newlen;
+ pData->reply = buf;
+ return size*nmemb;
}
@@ -533,6 +679,7 @@ setInstParamDefaults(instanceData *pData)
pData->asyncRepl = 0;
pData->bulkmode = 0;
pData->tplName = NULL;
+ pData->errorFile = NULL;
}
BEGINnewActInst
@@ -552,6 +699,8 @@ CODESTARTnewActInst
continue;
if(!strcmp(actpblk.descr[i].name, "server")) {
pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "errorfile")) {
+ pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "serverport")) {
pData->port = (int) pvals[i].val.d.n, NULL;
} else if(!strcmp(actpblk.descr[i].name, "uid")) {
@@ -693,6 +842,14 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
+BEGINdoHUP
+CODESTARTdoHUP
+ if(pData->fdErrFile != -1) {
+ close(pData->fdErrFile);
+ pData->fdErrFile = -1;
+ }
+ENDdoHUP
+
BEGINmodExit
CODESTARTmodExit
@@ -707,6 +864,7 @@ CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_doHUP
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
ENDqueryEtryPt