diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2012-07-30 12:40:49 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2012-07-30 12:40:49 +0200 |
commit | 36f7fbd5f458c3e8b2924a72f74621e055f319b2 (patch) | |
tree | 25f5824779ed297eaf02867f95519578844d5c0c /plugins/omelasticsearch/omelasticsearch.c | |
parent | 575686bd68a47fc5d9f59c1ed610f6680e1bb6fa (diff) | |
download | rsyslog-36f7fbd5f458c3e8b2924a72f74621e055f319b2.tar.gz rsyslog-36f7fbd5f458c3e8b2924a72f74621e055f319b2.tar.bz2 rsyslog-36f7fbd5f458c3e8b2924a72f74621e055f319b2.zip |
omelasticsearch: support for writing data errors to local file added
Diffstat (limited to 'plugins/omelasticsearch/omelasticsearch.c')
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 109 |
1 files changed, 95 insertions, 14 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 04113170..fea85f22 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -34,6 +34,9 @@ #include <signal.h> #include <errno.h> #include <time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> #include "cjson.h" #include "conf.h" #include "syslogd-types.h" @@ -67,6 +70,7 @@ typedef struct curl_slist HEADER; typedef struct _instanceData { int port; int replyLen; + int fdErrFile; /* error file fd or -1 if not open */ uchar *server; uchar *uid; uchar *pwd; @@ -75,6 +79,8 @@ typedef struct _instanceData { uchar *parent; uchar *tplName; uchar *timeout; + uchar *restURL; /* last used URL for error reporting */ + uchar *errorFile; char *reply; sbool dynSrchIdx; sbool dynSrchType; @@ -107,6 +113,7 @@ static struct cnfparamdescr actpdescr[] = { { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, + { "errorfile", eCmdHdlrGetWord, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = @@ -117,6 +124,8 @@ static struct cnfparamblk actpblk = BEGINcreateInstance CODESTARTcreateInstance + pData->restURL = NULL; + pData->fdErrFile = -1; ENDcreateInstance BEGINisCompatibleWithFeature @@ -135,6 +144,8 @@ CODESTARTfreeInstance curl_easy_cleanup(pData->curlHandle); pData->curlHandle = NULL; } + if(pData->fdErrFile != -1) + close(pData->fdErrFile); free(pData->server); free(pData->uid); free(pData->pwd); @@ -143,6 +154,8 @@ CODESTARTfreeInstance free(pData->parent); free(pData->tplName); free(pData->timeout); + free(pData->restURL); + free(pData->errorFile); ENDfreeInstance BEGINdbgPrintInstInfo @@ -162,6 +175,8 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tdynamic parent=%d\n", pData->dynParent); dbgprintf("\tasync replication=%d\n", pData->asyncRepl); dbgprintf("\tbulkmode=%d\n", pData->bulkmode); + dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ? + (uchar*)"(not configured)" : pData->errorFile); ENDdbgPrintInstInfo @@ -279,7 +294,6 @@ static rsRetVal setCurlURL(instanceData *pData, uchar **tpls) { char authBuf[1024]; - char *restURL; uchar *searchIndex; uchar *searchType; uchar *parent; @@ -313,11 +327,12 @@ setCurlURL(instanceData *pData, uchar **tpls) if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1); if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); } - restURL = es_str2cstr(url, NULL); - curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL); + + free(pData->restURL); + pData->restURL = (uchar*)es_str2cstr(url, NULL); + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); es_deleteStr(url); - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL); - free(restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL); if(pData->uid != NULL) { rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -355,9 +370,6 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) # define META_END "\"}}\n" getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); -dbgprintf("AAA: searchIndex: '%s'\n", searchIndex); -dbgprintf("AAA: searchType: '%s'\n", searchType); -dbgprintf("AAA: parent: '%s'\n", parent); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); @@ -381,6 +393,55 @@ finalize_it: RETiRet; } + +/* write data error request/replies to separate error file + * Note: we open the file but never close it before exit. If it + * needs to be closed, HUP must be sent. + */ +static inline rsRetVal +writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) +{ + char *rendered = NULL; + cJSON *errRoot; + cJSON *req; + cJSON *replyRoot = *pReplyRoot; + char errStr[1024]; + DEFiRet; + + if(pData->errorFile == NULL) + FINALIZE; + + if(pData->fdErrFile == -1) { + pData->fdErrFile = open((char*)pData->errorFile, + O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, + S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); + if(pData->fdErrFile == -1) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + DBGPRINTF("omelasticsearch: error opening error file: %s\n", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + } + if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL)); + cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg)); + + if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + cJSON_AddItemToObject(errRoot, "request", req); + cJSON_AddItemToObject(errRoot, "reply", replyRoot); + rendered = cJSON_Print(errRoot); +DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered); + write(pData->fdErrFile, rendered, strlen(rendered)); + free(rendered); + cJSON_Delete(errRoot); + *pReplyRoot = NULL; /* tell caller not to delete once again! */ + +finalize_it: + if(rendered != NULL) + free(rendered); + RETiRet; +} + + static inline rsRetVal checkResultBulkmode(instanceData *pData, cJSON *root) { @@ -420,7 +481,6 @@ DBGPRINTF("omelasticsearch: %d items in reply\n", numitems); "item ok (%p) not ok\n", ok); ABORT_FINALIZE(RS_RET_DATAFAIL); } -dbgprintf("omelasticsearch: item %d is OK\n", i); } finalize_it: @@ -429,7 +489,7 @@ finalize_it: static inline rsRetVal -checkResult(instanceData *pData) +checkResult(instanceData *pData, uchar *reqmsg) { cJSON *root; cJSON *ok; @@ -446,11 +506,21 @@ checkResult(instanceData *pData) } else { ok = cJSON_GetObjectItem(root, "ok"); if(ok == NULL || ok->type != cJSON_True) { - ABORT_FINALIZE(RS_RET_DATAFAIL); + iRet = RS_RET_DATAFAIL; } } + + /* Note: we ignore errors writing the error file, as we cannot handle + * these in any case. + */ + if(iRet == RS_RET_DATAFAIL) { + writeDataError(pData, &root, reqmsg); + iRet = RS_RET_OK; /* we have handled the problem! */ + } + finalize_it: - cJSON_Delete(root); + if(root != NULL) + cJSON_Delete(root); RETiRet; } @@ -490,7 +560,7 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) pData->reply[pData->replyLen] = '\0'; /* byte has been reserved in malloc */ DBGPRINTF("omelasticsearch: es reply: '%s'\n", pData->reply); - CHKiRet(checkResult(pData)); + CHKiRet(checkResult(pData, message)); finalize_it: free(pData->reply); RETiRet; @@ -513,7 +583,6 @@ CODESTARTdoAction if(pData->bulkmode) { CHKiRet(buildBatch(pData, ppString[0], ppString)); } else { -dbgprintf("omelasticsearch: doAction calling curlPost\n"); CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), ppString)); } @@ -607,6 +676,7 @@ setInstParamDefaults(instanceData *pData) pData->asyncRepl = 0; pData->bulkmode = 0; pData->tplName = NULL; + pData->errorFile = NULL; } BEGINnewActInst @@ -626,6 +696,8 @@ CODESTARTnewActInst continue; if(!strcmp(actpblk.descr[i].name, "server")) { pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "errorfile")) { + pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n, NULL; } else if(!strcmp(actpblk.descr[i].name, "uid")) { @@ -767,6 +839,14 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct +BEGINdoHUP +CODESTARTdoHUP + if(pData->fdErrFile != -1) { + close(pData->fdErrFile); + pData->fdErrFile = -1; + } +ENDdoHUP + BEGINmodExit CODESTARTmodExit @@ -781,6 +861,7 @@ CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_doHUP CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt |