summaryrefslogtreecommitdiffstats
path: root/plugins/omelasticsearch/omelasticsearch.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2012-07-30 12:40:49 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2012-07-30 12:40:49 +0200
commit36f7fbd5f458c3e8b2924a72f74621e055f319b2 (patch)
tree25f5824779ed297eaf02867f95519578844d5c0c /plugins/omelasticsearch/omelasticsearch.c
parent575686bd68a47fc5d9f59c1ed610f6680e1bb6fa (diff)
downloadrsyslog-36f7fbd5f458c3e8b2924a72f74621e055f319b2.tar.gz
rsyslog-36f7fbd5f458c3e8b2924a72f74621e055f319b2.tar.bz2
rsyslog-36f7fbd5f458c3e8b2924a72f74621e055f319b2.zip
omelasticsearch: support for writing data errors to local file added
Diffstat (limited to 'plugins/omelasticsearch/omelasticsearch.c')
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c109
1 files changed, 95 insertions, 14 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index 04113170..fea85f22 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -34,6 +34,9 @@
#include <signal.h>
#include <errno.h>
#include <time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
#include "cjson.h"
#include "conf.h"
#include "syslogd-types.h"
@@ -67,6 +70,7 @@ typedef struct curl_slist HEADER;
typedef struct _instanceData {
int port;
int replyLen;
+ int fdErrFile; /* error file fd or -1 if not open */
uchar *server;
uchar *uid;
uchar *pwd;
@@ -75,6 +79,8 @@ typedef struct _instanceData {
uchar *parent;
uchar *tplName;
uchar *timeout;
+ uchar *restURL; /* last used URL for error reporting */
+ uchar *errorFile;
char *reply;
sbool dynSrchIdx;
sbool dynSrchType;
@@ -107,6 +113,7 @@ static struct cnfparamdescr actpdescr[] = {
{ "bulkmode", eCmdHdlrBinary, 0 },
{ "asyncrepl", eCmdHdlrBinary, 0 },
{ "timeout", eCmdHdlrGetWord, 0 },
+ { "errorfile", eCmdHdlrGetWord, 0 },
{ "template", eCmdHdlrGetWord, 1 }
};
static struct cnfparamblk actpblk =
@@ -117,6 +124,8 @@ static struct cnfparamblk actpblk =
BEGINcreateInstance
CODESTARTcreateInstance
+ pData->restURL = NULL;
+ pData->fdErrFile = -1;
ENDcreateInstance
BEGINisCompatibleWithFeature
@@ -135,6 +144,8 @@ CODESTARTfreeInstance
curl_easy_cleanup(pData->curlHandle);
pData->curlHandle = NULL;
}
+ if(pData->fdErrFile != -1)
+ close(pData->fdErrFile);
free(pData->server);
free(pData->uid);
free(pData->pwd);
@@ -143,6 +154,8 @@ CODESTARTfreeInstance
free(pData->parent);
free(pData->tplName);
free(pData->timeout);
+ free(pData->restURL);
+ free(pData->errorFile);
ENDfreeInstance
BEGINdbgPrintInstInfo
@@ -162,6 +175,8 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tdynamic parent=%d\n", pData->dynParent);
dbgprintf("\tasync replication=%d\n", pData->asyncRepl);
dbgprintf("\tbulkmode=%d\n", pData->bulkmode);
+ dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ?
+ (uchar*)"(not configured)" : pData->errorFile);
ENDdbgPrintInstInfo
@@ -279,7 +294,6 @@ static rsRetVal
setCurlURL(instanceData *pData, uchar **tpls)
{
char authBuf[1024];
- char *restURL;
uchar *searchIndex;
uchar *searchType;
uchar *parent;
@@ -313,11 +327,12 @@ setCurlURL(instanceData *pData, uchar **tpls)
if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1);
if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent));
}
- restURL = es_str2cstr(url, NULL);
- curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL);
+
+ free(pData->restURL);
+ pData->restURL = (uchar*)es_str2cstr(url, NULL);
+ curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL);
es_deleteStr(url);
- DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL);
- free(restURL);
+ DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL);
if(pData->uid != NULL) {
rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid,
@@ -355,9 +370,6 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls)
# define META_END "\"}}\n"
getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent);
-dbgprintf("AAA: searchIndex: '%s'\n", searchIndex);
-dbgprintf("AAA: searchType: '%s'\n", searchType);
-dbgprintf("AAA: parent: '%s'\n", parent);
r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex,
ustrlen(searchIndex));
@@ -381,6 +393,55 @@ finalize_it:
RETiRet;
}
+
+/* write data error request/replies to separate error file
+ * Note: we open the file but never close it before exit. If it
+ * needs to be closed, HUP must be sent.
+ */
+static inline rsRetVal
+writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
+{
+ char *rendered = NULL;
+ cJSON *errRoot;
+ cJSON *req;
+ cJSON *replyRoot = *pReplyRoot;
+ char errStr[1024];
+ DEFiRet;
+
+ if(pData->errorFile == NULL)
+ FINALIZE;
+
+ if(pData->fdErrFile == -1) {
+ pData->fdErrFile = open((char*)pData->errorFile,
+ O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC,
+ S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
+ if(pData->fdErrFile == -1) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ DBGPRINTF("omelasticsearch: error opening error file: %s\n", errStr);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ }
+ if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
+ cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL));
+ cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg));
+
+ if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
+ cJSON_AddItemToObject(errRoot, "request", req);
+ cJSON_AddItemToObject(errRoot, "reply", replyRoot);
+ rendered = cJSON_Print(errRoot);
+DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered);
+ write(pData->fdErrFile, rendered, strlen(rendered));
+ free(rendered);
+ cJSON_Delete(errRoot);
+ *pReplyRoot = NULL; /* tell caller not to delete once again! */
+
+finalize_it:
+ if(rendered != NULL)
+ free(rendered);
+ RETiRet;
+}
+
+
static inline rsRetVal
checkResultBulkmode(instanceData *pData, cJSON *root)
{
@@ -420,7 +481,6 @@ DBGPRINTF("omelasticsearch: %d items in reply\n", numitems);
"item ok (%p) not ok\n", ok);
ABORT_FINALIZE(RS_RET_DATAFAIL);
}
-dbgprintf("omelasticsearch: item %d is OK\n", i);
}
finalize_it:
@@ -429,7 +489,7 @@ finalize_it:
static inline rsRetVal
-checkResult(instanceData *pData)
+checkResult(instanceData *pData, uchar *reqmsg)
{
cJSON *root;
cJSON *ok;
@@ -446,11 +506,21 @@ checkResult(instanceData *pData)
} else {
ok = cJSON_GetObjectItem(root, "ok");
if(ok == NULL || ok->type != cJSON_True) {
- ABORT_FINALIZE(RS_RET_DATAFAIL);
+ iRet = RS_RET_DATAFAIL;
}
}
+
+ /* Note: we ignore errors writing the error file, as we cannot handle
+ * these in any case.
+ */
+ if(iRet == RS_RET_DATAFAIL) {
+ writeDataError(pData, &root, reqmsg);
+ iRet = RS_RET_OK; /* we have handled the problem! */
+ }
+
finalize_it:
- cJSON_Delete(root);
+ if(root != NULL)
+ cJSON_Delete(root);
RETiRet;
}
@@ -490,7 +560,7 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls)
pData->reply[pData->replyLen] = '\0'; /* byte has been reserved in malloc */
DBGPRINTF("omelasticsearch: es reply: '%s'\n", pData->reply);
- CHKiRet(checkResult(pData));
+ CHKiRet(checkResult(pData, message));
finalize_it:
free(pData->reply);
RETiRet;
@@ -513,7 +583,6 @@ CODESTARTdoAction
if(pData->bulkmode) {
CHKiRet(buildBatch(pData, ppString[0], ppString));
} else {
-dbgprintf("omelasticsearch: doAction calling curlPost\n");
CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]),
ppString));
}
@@ -607,6 +676,7 @@ setInstParamDefaults(instanceData *pData)
pData->asyncRepl = 0;
pData->bulkmode = 0;
pData->tplName = NULL;
+ pData->errorFile = NULL;
}
BEGINnewActInst
@@ -626,6 +696,8 @@ CODESTARTnewActInst
continue;
if(!strcmp(actpblk.descr[i].name, "server")) {
pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "errorfile")) {
+ pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "serverport")) {
pData->port = (int) pvals[i].val.d.n, NULL;
} else if(!strcmp(actpblk.descr[i].name, "uid")) {
@@ -767,6 +839,14 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
+BEGINdoHUP
+CODESTARTdoHUP
+ if(pData->fdErrFile != -1) {
+ close(pData->fdErrFile);
+ pData->fdErrFile = -1;
+ }
+ENDdoHUP
+
BEGINmodExit
CODESTARTmodExit
@@ -781,6 +861,7 @@ CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_doHUP
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
ENDqueryEtryPt