diff options
author | Jérôme Renard <jerome.renard@gmail.com> | 2013-04-24 10:56:52 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2013-04-24 10:56:52 +0200 |
commit | 2f68d5f0d8a5c3ffef6bf52f27abc726ad27d764 (patch) | |
tree | d7633a2c31ad17215511b7c6487ce13dd0405a53 | |
parent | bf52ef7e7e212f12b97eb87b3d21c050f455bc0e (diff) | |
download | rsyslog-2f68d5f0d8a5c3ffef6bf52f27abc726ad27d764.tar.gz rsyslog-2f68d5f0d8a5c3ffef6bf52f27abc726ad27d764.tar.bz2 rsyslog-2f68d5f0d8a5c3ffef6bf52f27abc726ad27d764.zip |
omelasticsearch: _id field support for bulk operations
also max number of templates for plugin use has been increased to five
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=392
-rw-r--r-- | ChangeLog | 4 | ||||
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 122 | ||||
-rw-r--r-- | runtime/rsyslog.h | 16 |
3 files changed, 117 insertions, 25 deletions
@@ -2,6 +2,10 @@ Version 7.3.12 [devel] 2013-04-?? - added doc for omelasticsearch Thanks to Radu Gheorghe for the doc contribution. +- omelasticsearch: _id field support for bulk operations + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=392 + Thanks to Jérôme Renard for the idea and patches. +- max number of templates for plugin use has been increased to five - platform compatibility enhancement: solve compile issue with libgcrypt do not use GCRY_CIPHER_MODE_AESWRAP where not available - fix compile on Solaris diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index f27fe62b..33e58c1a 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -11,11 +11,11 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * -or- * see COPYING.ASL20 in the source distribution - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -79,12 +79,14 @@ typedef struct _instanceData { uchar *parent; uchar *tplName; uchar *timeout; + uchar *bulkId; uchar *restURL; /* last used URL for error reporting */ uchar *errorFile; char *reply; sbool dynSrchIdx; sbool dynSrchType; sbool dynParent; + sbool dynBulkId; sbool bulkmode; sbool asyncRepl; struct { @@ -114,7 +116,9 @@ static struct cnfparamdescr actpdescr[] = { { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, { "errorfile", eCmdHdlrGetWord, 0 }, - { "template", eCmdHdlrGetWord, 1 } + { "template", eCmdHdlrGetWord, 1 }, + { "dynbulkid", eCmdHdlrBinary, 0 }, + { "bulkid", eCmdHdlrGetWord, 0 }, }; static struct cnfparamblk actpblk = { CNFPARAMBLK_VERSION, @@ -156,6 +160,7 @@ CODESTARTfreeInstance free(pData->timeout); free(pData->restURL); free(pData->errorFile); + free(pData->bulkId); ENDfreeInstance BEGINdbgPrintInstInfo @@ -177,6 +182,8 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tbulkmode=%d\n", pData->bulkmode); dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ? (uchar*)"(not configured)" : pData->errorFile); + dbgprintf("\tdynbulkid=%d\n", pData->dynBulkId); + dbgprintf("\tbulkid='%s'\n", pData->bulkId); ENDdbgPrintInstInfo @@ -220,7 +227,7 @@ checkConn(instanceData *pData) cstr = es_str2cstr(url, NULL); curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); - + pData->reply = NULL; pData->replyLen = 0; curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); @@ -250,7 +257,8 @@ ENDtryResume /* get the current index and type for this message */ static inline void getIndexTypeAndParent(instanceData *pData, uchar **tpls, - uchar **srchIndex, uchar **srchType, uchar **parent) + uchar **srchIndex, uchar **srchType, uchar **parent, + uchar **bulkId) { if(pData->dynSrchIdx) { *srchIndex = tpls[1]; @@ -258,15 +266,27 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, *srchType = tpls[2]; if(pData->dynParent) { *parent = tpls[3]; + if(pData->dynBulkId) { + *bulkId = tpls[4]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } } else { *srchType = pData->searchType; if(pData->dynParent) { *parent = tpls[2]; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } } } else { @@ -275,15 +295,27 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, *srchType = tpls[1]; if(pData->dynParent) { *parent = tpls[2]; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } } else { *srchType = pData->searchType; if(pData->dynParent) { *parent = tpls[1]; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[1]; + } } } } @@ -297,6 +329,7 @@ setCurlURL(instanceData *pData, uchar **tpls) uchar *searchIndex; uchar *searchType; uchar *parent; + uchar *bulkId; es_str_t *url; int rLocal; int r; @@ -308,7 +341,7 @@ setCurlURL(instanceData *pData, uchar **tpls) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); parent = NULL; } else { - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addChar(&url, '/'); if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); @@ -330,7 +363,7 @@ setCurlURL(instanceData *pData, uchar **tpls) free(pData->restURL); pData->restURL = (uchar*)es_str2cstr(url, NULL); - curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); es_deleteStr(url); DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL); @@ -343,7 +376,7 @@ setCurlURL(instanceData *pData, uchar **tpls) rLocal); ABORT_FINALIZE(RS_RET_ERR); } - curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); + curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } finalize_it: @@ -363,13 +396,15 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) uchar *searchIndex; uchar *searchType; uchar *parent; + uchar *bulkId = NULL; DEFiRet; # define META_STRT "{\"index\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" # define META_PARENT "\",\"_parent\":\"" +# define META_ID "\", \"_id\":\"" # define META_END "\"}}\n" - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); @@ -380,6 +415,10 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); } + if(bulkId != NULL) { + if(r == 0) r = es_addBuf(&pData->batch.data, META_ID, sizeof(META_ID)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)bulkId, ustrlen(bulkId)); + } if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); @@ -409,7 +448,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) ssize_t wrRet; char errStr[1024]; DEFiRet; - + if(pData->errorFile == NULL) { DBGPRINTF("omelasticsearch: no local error logger defined - " "ignoring ES error information\n"); @@ -524,7 +563,7 @@ checkResult(instanceData *pData, uchar *reqmsg) } /* Note: we ignore errors writing the error file, as we cannot handle - * these in any case. + * these in any case. */ if(iRet == RS_RET_DATAFAIL) { writeDataError(pData, &root, reqmsg); @@ -552,8 +591,8 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) CHKiRet(setCurlURL(pData, tpls)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: @@ -649,10 +688,10 @@ curlSetup(instanceData *pData) } header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); - curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); - curl_easy_setopt(handle, CURLOPT_POST, 1); + curl_easy_setopt(handle, CURLOPT_POST, 1); pData->curlHandle = handle; pData->postHeader = header; @@ -690,6 +729,8 @@ setInstParamDefaults(instanceData *pData) pData->bulkmode = 0; pData->tplName = NULL; pData->errorFile = NULL; + pData->dynBulkId= 0; + pData->bulkId = NULL; } BEGINnewActInst @@ -737,12 +778,16 @@ CODESTARTnewActInst pData->asyncRepl = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynbulkid")) { + pData->dynBulkId = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "bulkid")) { + pData->bulkId = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { dbgprintf("omelasticsearch: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); } } - + if(pData->pwd != NULL && pData->uid == NULL) { errmsg.LogError(0, RS_RET_UID_MISSING, "omelasticsearch: password is provided, but no uid " @@ -767,6 +812,12 @@ CODESTARTnewActInst "name for parent template given - action definition invalid"); ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } + if(pData->dynBulkId && pData->bulkId == NULL) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic bulkid, but no " + "name for bulkid template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } if(pData->bulkmode) { pData->batch.currTpl1 = NULL; @@ -782,6 +833,7 @@ CODESTARTnewActInst if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; if(pData->dynParent) ++iNumTpls; + if(pData->dynBulkId) ++iNumTpls; DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); CODE_STD_STRING_REQUESTnewActInst(iNumTpls) @@ -803,11 +855,29 @@ CODESTARTnewActInst if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 4, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } } else { @@ -817,12 +887,30 @@ CODESTARTnewActInst if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); - } + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } } } diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 9fdf2b0f..a901d2ef 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -61,9 +61,9 @@ * rgerhards, 2006-11-30 */ -#define CONF_OMOD_NUMSTRINGS_MAXSIZE 3 /* cache for pointers to output module buffer pointers. All - * rsyslog-provided plugins do NOT need more than three buffers. If - * more are needed (future developments, third-parties), rsyslog +#define CONF_OMOD_NUMSTRINGS_MAXSIZE 5 /* cache for pointers to output module buffer pointers. All + * rsyslog-provided plugins do NOT need more than five buffers. If + * more are needed (future developments, third-parties), rsyslog * must be recompiled with a larger parameter. Hardcoding this * saves us some overhead, both in runtime in code complexity. As * it is doubtful if ever more than 3 parameters are needed, the @@ -91,7 +91,7 @@ /* the rsyslog core provides information about present feature to plugins - * asking it. Below are feature-test macros which must be used to query + * asking it. Below are feature-test macros which must be used to query * features. Note that this must be powers of two, so that multiple queries * can be combined. -- rgerhards, 2009-04-27 */ @@ -153,7 +153,7 @@ typedef uintTiny propid_t; */ enum rsRetVal_ /** return value. All methods return this if not specified otherwise */ { - /* the first two define are for errmsg.logError(), so that we can use the rsRetVal + /* the first two define are for errmsg.logError(), so that we can use the rsRetVal * as an rsyslog error code. -- rgerhards, 20080-06-27 */ RS_RET_NO_ERRCODE = -1, /**< RESERVED for NO_ERRCODE errmsg.logError status name */ @@ -448,7 +448,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth /** Object ID. These are for internal checking. Each * object is assigned a specific ID. This is contained in - * all Object structs (just like C++ RTTI). We can use + * all Object structs (just like C++ RTTI). We can use * this field to see if we have been passed a correct ID. * Other than that, there is currently no other use for * the object id. @@ -480,7 +480,7 @@ typedef enum rsObjectID rsObjID; #endif /** - * This macro should be used to free objects. + * This macro should be used to free objects. * It aids in interpreting dumps during debugging. */ #ifdef NDEBUG @@ -547,7 +547,7 @@ rsRetVal rsrtSetErrLogger(rsRetVal (*errLogger)(int, uchar*)); /* TODO: remove this -- this is only for transition of the config system */ extern rsconf_t *ourConf; /* defined by syslogd.c, a hack for functions that do not - yet receive a copy, so that we can incrementially + yet receive a copy, so that we can incrementially compile and change... -- rgerhars, 2011-04-19 */ #endif /* multi-include protection */ |