summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJérôme Renard <jerome.renard@gmail.com>2013-04-24 10:56:52 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2013-04-24 10:56:52 +0200
commit2f68d5f0d8a5c3ffef6bf52f27abc726ad27d764 (patch)
treed7633a2c31ad17215511b7c6487ce13dd0405a53
parentbf52ef7e7e212f12b97eb87b3d21c050f455bc0e (diff)
downloadrsyslog-2f68d5f0d8a5c3ffef6bf52f27abc726ad27d764.tar.gz
rsyslog-2f68d5f0d8a5c3ffef6bf52f27abc726ad27d764.tar.bz2
rsyslog-2f68d5f0d8a5c3ffef6bf52f27abc726ad27d764.zip
omelasticsearch: _id field support for bulk operations
also max number of templates for plugin use has been increased to five closes: http://bugzilla.adiscon.com/show_bug.cgi?id=392
-rw-r--r--ChangeLog4
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c122
-rw-r--r--runtime/rsyslog.h16
3 files changed, 117 insertions, 25 deletions
diff --git a/ChangeLog b/ChangeLog
index 06820a37..34e09d79 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -2,6 +2,10 @@
Version 7.3.12 [devel] 2013-04-??
- added doc for omelasticsearch
Thanks to Radu Gheorghe for the doc contribution.
+- omelasticsearch: _id field support for bulk operations
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=392
+ Thanks to Jérôme Renard for the idea and patches.
+- max number of templates for plugin use has been increased to five
- platform compatibility enhancement: solve compile issue with libgcrypt
do not use GCRY_CIPHER_MODE_AESWRAP where not available
- fix compile on Solaris
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index f27fe62b..33e58c1a 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -11,11 +11,11 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
* -or-
* see COPYING.ASL20 in the source distribution
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -79,12 +79,14 @@ typedef struct _instanceData {
uchar *parent;
uchar *tplName;
uchar *timeout;
+ uchar *bulkId;
uchar *restURL; /* last used URL for error reporting */
uchar *errorFile;
char *reply;
sbool dynSrchIdx;
sbool dynSrchType;
sbool dynParent;
+ sbool dynBulkId;
sbool bulkmode;
sbool asyncRepl;
struct {
@@ -114,7 +116,9 @@ static struct cnfparamdescr actpdescr[] = {
{ "asyncrepl", eCmdHdlrBinary, 0 },
{ "timeout", eCmdHdlrGetWord, 0 },
{ "errorfile", eCmdHdlrGetWord, 0 },
- { "template", eCmdHdlrGetWord, 1 }
+ { "template", eCmdHdlrGetWord, 1 },
+ { "dynbulkid", eCmdHdlrBinary, 0 },
+ { "bulkid", eCmdHdlrGetWord, 0 },
};
static struct cnfparamblk actpblk =
{ CNFPARAMBLK_VERSION,
@@ -156,6 +160,7 @@ CODESTARTfreeInstance
free(pData->timeout);
free(pData->restURL);
free(pData->errorFile);
+ free(pData->bulkId);
ENDfreeInstance
BEGINdbgPrintInstInfo
@@ -177,6 +182,8 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tbulkmode=%d\n", pData->bulkmode);
dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ?
(uchar*)"(not configured)" : pData->errorFile);
+ dbgprintf("\tdynbulkid=%d\n", pData->dynBulkId);
+ dbgprintf("\tbulkid='%s'\n", pData->bulkId);
ENDdbgPrintInstInfo
@@ -220,7 +227,7 @@ checkConn(instanceData *pData)
cstr = es_str2cstr(url, NULL);
curl_easy_setopt(curl, CURLOPT_URL, cstr);
free(cstr);
-
+
pData->reply = NULL;
pData->replyLen = 0;
curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData);
@@ -250,7 +257,8 @@ ENDtryResume
/* get the current index and type for this message */
static inline void
getIndexTypeAndParent(instanceData *pData, uchar **tpls,
- uchar **srchIndex, uchar **srchType, uchar **parent)
+ uchar **srchIndex, uchar **srchType, uchar **parent,
+ uchar **bulkId)
{
if(pData->dynSrchIdx) {
*srchIndex = tpls[1];
@@ -258,15 +266,27 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls,
*srchType = tpls[2];
if(pData->dynParent) {
*parent = tpls[3];
+ if(pData->dynBulkId) {
+ *bulkId = tpls[4];
+ }
} else {
*parent = pData->parent;
+ if(pData->dynBulkId) {
+ *bulkId = tpls[3];
+ }
}
} else {
*srchType = pData->searchType;
if(pData->dynParent) {
*parent = tpls[2];
+ if(pData->dynBulkId) {
+ *bulkId = tpls[3];
+ }
} else {
*parent = pData->parent;
+ if(pData->dynBulkId) {
+ *bulkId = tpls[2];
+ }
}
}
} else {
@@ -275,15 +295,27 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls,
*srchType = tpls[1];
if(pData->dynParent) {
*parent = tpls[2];
+ if(pData->dynBulkId) {
+ *bulkId = tpls[3];
+ }
} else {
*parent = pData->parent;
+ if(pData->dynBulkId) {
+ *bulkId = tpls[2];
+ }
}
} else {
*srchType = pData->searchType;
if(pData->dynParent) {
*parent = tpls[1];
+ if(pData->dynBulkId) {
+ *bulkId = tpls[2];
+ }
} else {
*parent = pData->parent;
+ if(pData->dynBulkId) {
+ *bulkId = tpls[1];
+ }
}
}
}
@@ -297,6 +329,7 @@ setCurlURL(instanceData *pData, uchar **tpls)
uchar *searchIndex;
uchar *searchType;
uchar *parent;
+ uchar *bulkId;
es_str_t *url;
int rLocal;
int r;
@@ -308,7 +341,7 @@ setCurlURL(instanceData *pData, uchar **tpls)
r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1);
parent = NULL;
} else {
- getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent);
+ getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
if(r == 0) r = es_addChar(&url, '/');
if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType));
@@ -330,7 +363,7 @@ setCurlURL(instanceData *pData, uchar **tpls)
free(pData->restURL);
pData->restURL = (uchar*)es_str2cstr(url, NULL);
- curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL);
+ curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL);
es_deleteStr(url);
DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL);
@@ -343,7 +376,7 @@ setCurlURL(instanceData *pData, uchar **tpls)
rLocal);
ABORT_FINALIZE(RS_RET_ERR);
}
- curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf);
+ curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf);
curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
}
finalize_it:
@@ -363,13 +396,15 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls)
uchar *searchIndex;
uchar *searchType;
uchar *parent;
+ uchar *bulkId = NULL;
DEFiRet;
# define META_STRT "{\"index\":{\"_index\": \""
# define META_TYPE "\",\"_type\":\""
# define META_PARENT "\",\"_parent\":\""
+# define META_ID "\", \"_id\":\""
# define META_END "\"}}\n"
- getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent);
+ getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex,
ustrlen(searchIndex));
@@ -380,6 +415,10 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls)
if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent));
}
+ if(bulkId != NULL) {
+ if(r == 0) r = es_addBuf(&pData->batch.data, META_ID, sizeof(META_ID)-1);
+ if(r == 0) r = es_addBuf(&pData->batch.data, (char*)bulkId, ustrlen(bulkId));
+ }
if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length);
if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1);
@@ -409,7 +448,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
ssize_t wrRet;
char errStr[1024];
DEFiRet;
-
+
if(pData->errorFile == NULL) {
DBGPRINTF("omelasticsearch: no local error logger defined - "
"ignoring ES error information\n");
@@ -524,7 +563,7 @@ checkResult(instanceData *pData, uchar *reqmsg)
}
/* Note: we ignore errors writing the error file, as we cannot handle
- * these in any case.
+ * these in any case.
*/
if(iRet == RS_RET_DATAFAIL) {
writeDataError(pData, &root, reqmsg);
@@ -552,8 +591,8 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls)
CHKiRet(setCurlURL(pData, tpls));
curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData);
- curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
- curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen);
+ curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
+ curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen);
code = curl_easy_perform(curl);
switch (code) {
case CURLE_COULDNT_RESOLVE_HOST:
@@ -649,10 +688,10 @@ curlSetup(instanceData *pData)
}
header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
- curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header);
+ curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header);
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult);
- curl_easy_setopt(handle, CURLOPT_POST, 1);
+ curl_easy_setopt(handle, CURLOPT_POST, 1);
pData->curlHandle = handle;
pData->postHeader = header;
@@ -690,6 +729,8 @@ setInstParamDefaults(instanceData *pData)
pData->bulkmode = 0;
pData->tplName = NULL;
pData->errorFile = NULL;
+ pData->dynBulkId= 0;
+ pData->bulkId = NULL;
}
BEGINnewActInst
@@ -737,12 +778,16 @@ CODESTARTnewActInst
pData->asyncRepl = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "template")) {
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "dynbulkid")) {
+ pData->dynBulkId = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "bulkid")) {
+ pData->bulkId = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else {
dbgprintf("omelasticsearch: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
}
}
-
+
if(pData->pwd != NULL && pData->uid == NULL) {
errmsg.LogError(0, RS_RET_UID_MISSING,
"omelasticsearch: password is provided, but no uid "
@@ -767,6 +812,12 @@ CODESTARTnewActInst
"name for parent template given - action definition invalid");
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
+ if(pData->dynBulkId && pData->bulkId == NULL) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR,
+ "omelasticsearch: requested dynamic bulkid, but no "
+ "name for bulkid template given - action definition invalid");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
if(pData->bulkmode) {
pData->batch.currTpl1 = NULL;
@@ -782,6 +833,7 @@ CODESTARTnewActInst
if(pData->dynSrchIdx) ++iNumTpls;
if(pData->dynSrchType) ++iNumTpls;
if(pData->dynParent) ++iNumTpls;
+ if(pData->dynBulkId) ++iNumTpls;
DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls);
CODE_STD_STRING_REQUESTnewActInst(iNumTpls)
@@ -803,11 +855,29 @@ CODESTARTnewActInst
if(pData->dynParent) {
CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent),
OMSR_NO_RQD_TPL_OPTS));
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 4, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ } else {
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
}
} else {
if(pData->dynParent) {
CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent),
OMSR_NO_RQD_TPL_OPTS));
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ } else {
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
}
}
} else {
@@ -817,12 +887,30 @@ CODESTARTnewActInst
if(pData->dynParent) {
CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent),
OMSR_NO_RQD_TPL_OPTS));
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ } else {
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
}
} else {
if(pData->dynParent) {
CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent),
OMSR_NO_RQD_TPL_OPTS));
- }
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ } else {
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ }
}
}
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 9fdf2b0f..a901d2ef 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -61,9 +61,9 @@
* rgerhards, 2006-11-30
*/
-#define CONF_OMOD_NUMSTRINGS_MAXSIZE 3 /* cache for pointers to output module buffer pointers. All
- * rsyslog-provided plugins do NOT need more than three buffers. If
- * more are needed (future developments, third-parties), rsyslog
+#define CONF_OMOD_NUMSTRINGS_MAXSIZE 5 /* cache for pointers to output module buffer pointers. All
+ * rsyslog-provided plugins do NOT need more than five buffers. If
+ * more are needed (future developments, third-parties), rsyslog
* must be recompiled with a larger parameter. Hardcoding this
* saves us some overhead, both in runtime in code complexity. As
* it is doubtful if ever more than 3 parameters are needed, the
@@ -91,7 +91,7 @@
/* the rsyslog core provides information about present feature to plugins
- * asking it. Below are feature-test macros which must be used to query
+ * asking it. Below are feature-test macros which must be used to query
* features. Note that this must be powers of two, so that multiple queries
* can be combined. -- rgerhards, 2009-04-27
*/
@@ -153,7 +153,7 @@ typedef uintTiny propid_t;
*/
enum rsRetVal_ /** return value. All methods return this if not specified otherwise */
{
- /* the first two define are for errmsg.logError(), so that we can use the rsRetVal
+ /* the first two define are for errmsg.logError(), so that we can use the rsRetVal
* as an rsyslog error code. -- rgerhards, 20080-06-27
*/
RS_RET_NO_ERRCODE = -1, /**< RESERVED for NO_ERRCODE errmsg.logError status name */
@@ -448,7 +448,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
/** Object ID. These are for internal checking. Each
* object is assigned a specific ID. This is contained in
- * all Object structs (just like C++ RTTI). We can use
+ * all Object structs (just like C++ RTTI). We can use
* this field to see if we have been passed a correct ID.
* Other than that, there is currently no other use for
* the object id.
@@ -480,7 +480,7 @@ typedef enum rsObjectID rsObjID;
#endif
/**
- * This macro should be used to free objects.
+ * This macro should be used to free objects.
* It aids in interpreting dumps during debugging.
*/
#ifdef NDEBUG
@@ -547,7 +547,7 @@ rsRetVal rsrtSetErrLogger(rsRetVal (*errLogger)(int, uchar*));
/* TODO: remove this -- this is only for transition of the config system */
extern rsconf_t *ourConf; /* defined by syslogd.c, a hack for functions that do not
- yet receive a copy, so that we can incrementially
+ yet receive a copy, so that we can incrementially
compile and change... -- rgerhars, 2011-04-19 */
#endif /* multi-include protection */