diff options
Diffstat (limited to 'plugins/omelasticsearch')
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 73 |
1 files changed, 48 insertions, 25 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 33e58c1a..b82968d0 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -4,7 +4,7 @@ * NOTE: read comments in module-template.h for more specifics! * * Copyright 2011 Nathan Scott. - * Copyright 2009-2012 Rainer Gerhards and Adiscon GmbH. + * Copyright 2009-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -58,10 +58,10 @@ DEFobjCurrIf(errmsg) DEFobjCurrIf(statsobj) statsobj_t *indexStats; -STATSCOUNTER_DEF(indexConFail, mutIndexConFail) STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit) -STATSCOUNTER_DEF(indexFailed, mutIndexFailed) -STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) +STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail) +STATSCOUNTER_DEF(indexHTTPReqFail, mutIndexHTTPReqFail) +STATSCOUNTER_DEF(indexESFail, mutIndexESFail) /* REST API for elasticsearch hits this URL: * http://<hostName>:<restPort>/<searchIndex>/<searchType> @@ -91,6 +91,7 @@ typedef struct _instanceData { sbool asyncRepl; struct { es_str_t *data; + int nmemb; /* number of messages in batch (for statistics counting) */ uchar *currTpl1; uchar *currTpl2; } batch; @@ -224,6 +225,12 @@ checkConn(instanceData *pData) DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n"); ABORT_FINALIZE(RS_RET_SUSPENDED); } + /* Bodypart of request not needed, so set curl opt to nobody and httpget, otherwise lib-curl could sigsegv */ + curl_easy_setopt(curl, CURLOPT_HTTPGET, TRUE); + curl_easy_setopt(curl, CURLOPT_NOBODY, TRUE); + /* Only enable for debugging + curl_easy_setopt(curl, CURLOPT_VERBOSE, TRUE); */ + cstr = es_str2cstr(url, NULL); curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); @@ -426,6 +433,7 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); ABORT_FINALIZE(RS_RET_ERR); } + ++pData->batch.nmemb; iRet = RS_RET_DEFER_COMMIT; finalize_it: @@ -483,13 +491,11 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) DBGPRINTF("omelasticsearch: error %d writing error file, write returns %lld\n", errno, (long long) wrRet); } - free(rendered); cJSON_Delete(errRoot); *pReplyRoot = NULL; /* tell caller not to delete once again! */ finalize_it: - if(rendered != NULL) - free(rendered); + free(rendered); RETiRet; } @@ -573,12 +579,15 @@ checkResult(instanceData *pData, uchar *reqmsg) finalize_it: if(root != NULL) cJSON_Delete(root); + if(iRet != RS_RET_OK) { + STATSCOUNTER_INC(indexESFail, mutIndexESFail); + } RETiRet; } static rsRetVal -curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) +curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsgs) { CURLcode code; CURL *curl = pData->curlHandle; @@ -599,18 +608,21 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) case CURLE_COULDNT_RESOLVE_PROXY: case CURLE_COULDNT_CONNECT: case CURLE_WRITE_ERROR: - STATSCOUNTER_INC(indexConFail, mutIndexConFail); + STATSCOUNTER_INC(indexHTTPReqFail, mutHTTPReqFail); + indexHTTPFail += nmsgs; DBGPRINTF("omelasticsearch: we are suspending ourselfs due " "to failure %lld of curl_easy_perform()\n", (long long) code); ABORT_FINALIZE(RS_RET_SUSPENDED); default: - STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); break; } - pData->reply[pData->replyLen] = '\0'; /* byte has been reserved in malloc */ - DBGPRINTF("omelasticsearch: es reply: '%s'\n", pData->reply); + DBGPRINTF("omelasticsearch: pData replyLen = '%d'\n", pData->replyLen); + if (pData->replyLen > 0) { + pData->reply[pData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */ + } + DBGPRINTF("omelasticsearch: pData reply: '%s'\n", pData->reply); CHKiRet(checkResult(pData, message)); finalize_it: @@ -626,17 +638,19 @@ dbgprintf("omelasticsearch: beginTransaction\n"); } es_emptyStr(pData->batch.data); + pData->batch.nmemb = 0; finalize_it: ENDbeginTransaction BEGINdoAction CODESTARTdoAction + STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); if(pData->bulkmode) { CHKiRet(buildBatch(pData, ppString[0], ppString)); } else { CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), - ppString)); + ppString, 1)); } finalize_it: dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); @@ -644,12 +658,17 @@ ENDdoAction BEGINendTransaction - char *cstr; + char *cstr = NULL; CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); - cstr = es_str2cstr(pData->batch.data, NULL); - dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL)); + /* End Transaction only if batch data is not empty */ + if (pData->batch.data != NULL ) { + cstr = es_str2cstr(pData->batch.data, NULL); + dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); + CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, pData->batch.nmemb)); + } + else + dbgprintf("omelasticsearch: endTransaction, pData->batch.data is NULL, nothing to send. \n"); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -981,15 +1000,19 @@ CODEmodInit_QueryRegCFSLineHdlr /* support statistics gathering */ CHKiRet(statsobj.Construct(&indexStats)); - CHKiRet(statsobj.SetName(indexStats, (uchar *)"elasticsearch")); - CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"connfail", - ctrType_IntCtr, &indexConFail)); - CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submits", + CHKiRet(statsobj.SetName(indexStats, (uchar *)"omelasticsearch")); + STATSCOUNTER_INIT(indexSubmit, mutCtrIndexSubmit); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submitted", ctrType_IntCtr, &indexSubmit)); - CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed", - ctrType_IntCtr, &indexFailed)); - CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"success", - ctrType_IntCtr, &indexSuccess)); + STATSCOUNTER_INIT(indexHTTPFail, mutCtrIndexHTTPFail); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.http", + ctrType_IntCtr, &indexHTTPFail)); + STATSCOUNTER_INIT(indexHTTPReqFail, mutCtrIndexHTTPReqFail); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.httprequests", + ctrType_IntCtr, &indexHTTPReqFail)); + STATSCOUNTER_INIT(indexESFail, mutCtrIndexESFail); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es", + ctrType_IntCtr, &indexESFail)); CHKiRet(statsobj.ConstructFinalize(indexStats)); ENDmodInit |