summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c13
1 files changed, 8 insertions, 5 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index d49667f9..c1b2abd0 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -4,7 +4,7 @@
* NOTE: read comments in module-template.h for more specifics!
*
* Copyright 2011 Nathan Scott.
- * Copyright 2009-2012 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2009-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -90,6 +90,7 @@ typedef struct _instanceData {
sbool asyncRepl;
struct {
es_str_t *data;
+ int nmemb; /* number of messages in batch (for statistics counting) */
uchar *currTpl1;
uchar *currTpl2;
} batch;
@@ -431,6 +432,7 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls)
DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r);
ABORT_FINALIZE(RS_RET_ERR);
}
+ ++pData->batch.nmemb;
iRet = RS_RET_DEFER_COMMIT;
finalize_it:
@@ -584,7 +586,7 @@ finalize_it:
static rsRetVal
-curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls)
+curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsgs)
{
CURLcode code;
CURL *curl = pData->curlHandle;
@@ -605,7 +607,7 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls)
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_CONNECT:
case CURLE_WRITE_ERROR:
- STATSCOUNTER_INC(indexHTTPFail, mutIndexHTTPFail);
+ indexHTTPFail += nmsgs;
DBGPRINTF("omelasticsearch: we are suspending ourselfs due "
"to failure %lld of curl_easy_perform()\n",
(long long) code);
@@ -634,6 +636,7 @@ dbgprintf("omelasticsearch: beginTransaction\n");
}
es_emptyStr(pData->batch.data);
+ pData->batch.nmemb = 0;
finalize_it:
ENDbeginTransaction
@@ -645,7 +648,7 @@ CODESTARTdoAction
CHKiRet(buildBatch(pData, ppString[0], ppString));
} else {
CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]),
- ppString));
+ ppString, 1));
}
finalize_it:
dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode);
@@ -660,7 +663,7 @@ dbgprintf("omelasticsearch: endTransaction init\n");
if (pData->batch.data != NULL ) {
cstr = es_str2cstr(pData->batch.data, NULL);
dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr);
- CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL));
+ CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, pData->batch.nmemb));
}
else
dbgprintf("omelasticsearch: endTransaction, pData->batch.data is NULL, nothing to send. \n");