summaryrefslogtreecommitdiffstats
path: root/plugins/omelasticsearch
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/omelasticsearch')
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c21
1 files changed, 16 insertions, 5 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index b6742f0d..7fa06d0f 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -4,7 +4,7 @@
* NOTE: read comments in module-template.h for more specifics!
*
* Copyright 2011 Nathan Scott.
- * Copyright 2009-2012 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2009-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -60,6 +60,7 @@ DEFobjCurrIf(statsobj)
statsobj_t *indexStats;
STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit)
STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail)
+STATSCOUNTER_DEF(indexHTTPReqFail, mutIndexHTTPReqFail)
STATSCOUNTER_DEF(indexESFail, mutIndexESFail)
/* REST API for elasticsearch hits this URL:
@@ -90,6 +91,7 @@ typedef struct _instanceData {
sbool asyncRepl;
struct {
es_str_t *data;
+ int nmemb; /* number of messages in batch (for statistics counting) */
uchar *currTpl1;
uchar *currTpl2;
} batch;
@@ -431,6 +433,7 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls)
DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r);
ABORT_FINALIZE(RS_RET_ERR);
}
+ ++pData->batch.nmemb;
iRet = RS_RET_DEFER_COMMIT;
finalize_it:
@@ -584,7 +587,7 @@ finalize_it:
static rsRetVal
-curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls)
+curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsgs)
{
CURLcode code;
CURL *curl = pData->curlHandle;
@@ -605,7 +608,8 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls)
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_CONNECT:
case CURLE_WRITE_ERROR:
- STATSCOUNTER_INC(indexHTTPFail, mutIndexHTTPFail);
+ STATSCOUNTER_INC(indexHTTPReqFail, mutHTTPReqFail);
+ indexHTTPFail += nmsgs;
DBGPRINTF("omelasticsearch: we are suspending ourselfs due "
"to failure %lld of curl_easy_perform()\n",
(long long) code);
@@ -634,6 +638,7 @@ dbgprintf("omelasticsearch: beginTransaction\n");
}
es_emptyStr(pData->batch.data);
+ pData->batch.nmemb = 0;
finalize_it:
ENDbeginTransaction
@@ -645,7 +650,7 @@ CODESTARTdoAction
CHKiRet(buildBatch(pData, ppString[0], ppString));
} else {
CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]),
- ppString));
+ ppString, 1));
}
finalize_it:
dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode);
@@ -660,7 +665,7 @@ dbgprintf("omelasticsearch: endTransaction init\n");
if (pData->batch.data != NULL ) {
cstr = es_str2cstr(pData->batch.data, NULL);
dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr);
- CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL));
+ CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, pData->batch.nmemb));
}
else
dbgprintf("omelasticsearch: endTransaction, pData->batch.data is NULL, nothing to send. \n");
@@ -996,10 +1001,16 @@ CODEmodInit_QueryRegCFSLineHdlr
/* support statistics gathering */
CHKiRet(statsobj.Construct(&indexStats));
CHKiRet(statsobj.SetName(indexStats, (uchar *)"omelasticsearch"));
+ STATSCOUNTER_INIT(indexSubmit, mutCtrIndexSubmit);
CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submitted",
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSubmit));
+ STATSCOUNTER_INIT(indexHTTPFail, mutCtrIndexHTTPFail);
CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.http",
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexHTTPFail));
+ STATSCOUNTER_INIT(indexHTTPReqFail, mutCtrIndexHTTPReqFail);
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.httprequests",
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexHTTPReqFail));
+ STATSCOUNTER_INIT(indexESFail, mutCtrIndexESFail);
CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es",
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexESFail));
CHKiRet(statsobj.ConstructFinalize(indexStats));