diff options
Diffstat (limited to 'plugins/ommongodb/ommongodb.c')
-rw-r--r-- | plugins/ommongodb/ommongodb.c | 253 |
1 files changed, 206 insertions, 47 deletions
diff --git a/plugins/ommongodb/ommongodb.c b/plugins/ommongodb/ommongodb.c index 39e2e4f9..acc40ab7 100644 --- a/plugins/ommongodb/ommongodb.c +++ b/plugins/ommongodb/ommongodb.c @@ -30,8 +30,12 @@ #include <errno.h> #include <assert.h> #include <signal.h> +#include <stdint.h> #include <time.h> #include <mongo.h> +#include <json/json.h> +/* For struct json_object_iter, should not be necessary in future versions */ +#include <json/json_object_private.h> #include "rsyslog.h" #include "conf.h" @@ -42,6 +46,7 @@ #include "datetime.h" #include "errmsg.h" #include "cfsysline.h" +#include "unicode-helper.h" MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP @@ -54,6 +59,7 @@ DEFobjCurrIf(datetime) typedef struct _instanceData { mongo_sync_connection *conn; + struct json_tokener *json_tokener; /* only if (tplName != NULL) */ uchar *server; int port; uchar *db; @@ -62,6 +68,7 @@ typedef struct _instanceData { uchar *pwd; uchar *dbNcoll; uchar *tplName; + int bErrMsgPermitted; /* only one errmsg permitted per connection */ } instanceData; @@ -108,11 +115,14 @@ static void closeMongoDB(instanceData *pData) BEGINfreeInstance CODESTARTfreeInstance closeMongoDB(pData); + if (pData->json_tokener != NULL) + json_tokener_free(pData->json_tokener); free(pData->server); free(pData->db); free(pData->collection); free(pData->uid); free(pData->pwd); + free(pData->dbNcoll); free(pData->tplName); ENDfreeInstance @@ -120,6 +130,7 @@ ENDfreeInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo /* nothing special here */ + (void)pData; ENDdbgPrintInstInfo @@ -129,17 +140,21 @@ static void reportMongoError(instanceData *pData) { char errStr[1024]; - errmsg.LogError(0, RS_RET_ERR, "ommongodb: error: %s", - rs_strerror_r(errno, errStr, sizeof(errStr))); -#if 0 gchar *err; - if(mongo_sync_cmd_get_last_error(pData->conn, (gchar*)pData->db, &err) == TRUE) { - errmsg.LogError(0, RS_RET_ERR, "ommongodb: error: %s", err); - } else { - errmsg.LogError(0, RS_RET_ERR, "ommongodb: we had an error, but can " - "not obtain specifics"); + int eno; + + if(pData->bErrMsgPermitted) { + eno = errno; + if(mongo_sync_cmd_get_last_error(pData->conn, (gchar*)pData->db, &err) == TRUE) { + errmsg.LogError(0, RS_RET_ERR, "ommongodb: error: %s", err); + } else { + DBGPRINTF("ommongodb: we had an error, but can not obtain specifics, " + "using plain old errno error message generator\n"); + errmsg.LogError(0, RS_RET_ERR, "ommongodb: error: %s", + rs_strerror_r(eno, errStr, sizeof(errStr))); + } + pData->bErrMsgPermitted = 0; } -#endif } @@ -202,35 +217,30 @@ i10pow(int exp) } return r; } -/* write to mongodb in MSG passing mode, that is without a template. +/* Return a BSON document when an user hasn't specified a template. * In this mode, we use the standard document format, which is somewhat * aligned to cee (as described in project lumberjack). Note that this is * a moving target, so we may run out of sync (and stay so to retain * backward compatibility, which we consider pretty important). */ -rsRetVal writeMongoDB_msg(msg_t *pMsg, instanceData *pData) +static bson * +getDefaultBSON(msg_t *pMsg) { bson *doc = NULL; - uchar *procid; short unsigned procid_free; size_t procid_len; - uchar *tag; short unsigned tag_free; size_t tag_len; - uchar *pid; short unsigned pid_free; size_t pid_len; - uchar *sys; short unsigned sys_free; size_t sys_len; - uchar *msg; short unsigned msg_free; size_t msg_len; + uchar *procid; short unsigned procid_free; rs_size_t procid_len; + uchar *tag; short unsigned tag_free; rs_size_t tag_len; + uchar *pid; short unsigned pid_free; rs_size_t pid_len; + uchar *sys; short unsigned sys_free; rs_size_t sys_len; + uchar *msg; short unsigned msg_free; rs_size_t msg_len; int severity, facil; gint64 ts_gen, ts_rcv; /* timestamps: generated, received */ int secfrac; - DEFiRet; - - /* see if we are ready to proceed */ - if(pData->conn == NULL) { - CHKiRet(initMongoDB(pData, 0)); - } - procid = MsgGetProp(pMsg, NULL, PROP_PROGRAMNAME, NULL, &procid_len, &procid_free); - tag = MsgGetProp(pMsg, NULL, PROP_SYSLOGTAG, NULL, &tag_len, &tag_free); - pid = MsgGetProp(pMsg, NULL, PROP_PROCID, NULL, &pid_len, &pid_free); - sys = MsgGetProp(pMsg, NULL, PROP_HOSTNAME, NULL, &sys_len, &sys_free); - msg = MsgGetProp(pMsg, NULL, PROP_MSG, NULL, &msg_len, &msg_free); + procid = MsgGetProp(pMsg, NULL, PROP_PROGRAMNAME, NULL, &procid_len, &procid_free, NULL); + tag = MsgGetProp(pMsg, NULL, PROP_SYSLOGTAG, NULL, &tag_len, &tag_free, NULL); + pid = MsgGetProp(pMsg, NULL, PROP_PROCID, NULL, &pid_len, &pid_free, NULL); + sys = MsgGetProp(pMsg, NULL, PROP_HOSTNAME, NULL, &sys_len, &sys_free, NULL); + msg = MsgGetProp(pMsg, NULL, PROP_MSG, NULL, &msg_len, &msg_free, NULL); // TODO: move to datetime? Refactor in any case! rgerhards, 2012-03-30 ts_gen = (gint64) datetime.syslogTime2time_t(&pMsg->tTIMESTAMP) * 1000; /* ms! */ @@ -276,22 +286,129 @@ dbgprintf("ommongodb: secfrac is %d, precision %d\n", pMsg->tTIMESTAMP.secfrac, if(sys_free) free(sys); if(msg_free) free(msg); - if(doc == NULL) { - reportMongoError(pData); - dbgprintf("ommongodb: error creating BSON doc\n"); - ABORT_FINALIZE(RS_RET_SUSPENDED); - } + if(doc == NULL) + return doc; bson_finish(doc); - if(!mongo_sync_cmd_insert(pData->conn, (char*)pData->dbNcoll, doc, NULL)) { - reportMongoError(pData); - dbgprintf("ommongodb: insert error\n"); - ABORT_FINALIZE(RS_RET_SUSPENDED); + return doc; +} + +static bson *BSONFromJSONArray(struct json_object *json); +static bson *BSONFromJSONObject(struct json_object *json); + +/* Append a BSON variant of json to doc using name. Return TRUE on success */ +static gboolean +BSONAppendJSONObject(bson *doc, const gchar *name, struct json_object *json) +{ + switch(json != NULL ? json_object_get_type(json) : json_type_null) { + case json_type_null: + return bson_append_null(doc, name); + case json_type_boolean: + return bson_append_boolean(doc, name, + json_object_get_boolean(json)); + case json_type_double: + return bson_append_double(doc, name, + json_object_get_double(json)); + case json_type_int: { + int64_t i; + + /* FIXME: the future version will have get_int64 */ + i = json_object_get_int(json); + if (i >= INT32_MIN && i <= INT32_MAX) + return bson_append_int32(doc, name, i); + else + return bson_append_int64(doc, name, i); + } + case json_type_object: { + bson *sub; + gboolean ok; + + sub = BSONFromJSONObject(json); + if (sub == NULL) + return FALSE; + ok = bson_append_document(doc, name, sub); + bson_free(sub); + return ok; } + case json_type_array: { + bson *sub; + gboolean ok; + + sub = BSONFromJSONArray(json); + if (sub == NULL) + return FALSE; + ok = bson_append_document(doc, name, sub); + bson_free(sub); + return ok; + } + case json_type_string: + return bson_append_string(doc, name, + json_object_get_string(json), -1); -finalize_it: + default: + return FALSE; + } +} + +/* Return a BSON variant of json, which must be a json_type_array */ +static bson * +BSONFromJSONArray(struct json_object *json) +{ + /* Way more than necessary */ + bson *doc = NULL; + size_t i, array_len; + + doc = bson_new(); + if(doc == NULL) + goto error; + + array_len = json_object_array_length(json); + for (i = 0; i < array_len; i++) { + char buf[sizeof(size_t) * CHAR_BIT + 1]; + + if ((size_t)snprintf(buf, sizeof(buf), "%zu", i) >= sizeof(buf)) + goto error; + if (BSONAppendJSONObject(doc, buf, + json_object_array_get_idx(json, i)) + == FALSE) + goto error; + } + + if(bson_finish(doc) == FALSE) + goto error; + + return doc; + +error: if(doc != NULL) bson_free(doc); - RETiRet; + return NULL; +} + +/* Return a BSON variant of json, which must be a json_type_object */ +static bson * +BSONFromJSONObject(struct json_object *json) +{ + bson *doc = NULL; + struct json_object_iter it; + + doc = bson_new(); + if(doc == NULL) + goto error; + + json_object_object_foreachC(json, it) { + if (BSONAppendJSONObject(doc, it.key, it.val) == FALSE) + goto error; + } + + if(bson_finish(doc) == FALSE) + goto error; + + return doc; + +error: + if(doc != NULL) + bson_free(doc); + return NULL; } BEGINtryResume @@ -302,10 +419,34 @@ CODESTARTtryResume ENDtryResume BEGINdoAction + bson *doc = NULL; CODESTARTdoAction + /* see if we are ready to proceed */ + if(pData->conn == NULL) { + CHKiRet(initMongoDB(pData, 0)); + } + if(pData->tplName == NULL) { - iRet = writeMongoDB_msg((msg_t*)ppString[0], pData); + doc = getDefaultBSON((msg_t*)ppString[0]); + } else { + doc = BSONFromJSONObject((struct json_object *)ppString[0]); } + if(doc == NULL) { + dbgprintf("ommongodb: error creating BSON doc\n"); + /* FIXME: is this a correct return code? */ + ABORT_FINALIZE(RS_RET_ERR); + } + if(mongo_sync_cmd_insert(pData->conn, (char*)pData->dbNcoll, doc, NULL)) { + pData->bErrMsgPermitted = 1; + } else { + dbgprintf("ommongodb: insert error\n"); + reportMongoError(pData); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: + if(doc != NULL) + bson_free(doc); ENDdoAction @@ -340,7 +481,7 @@ CODESTARTnewActInst if(!strcmp(actpblk.descr[i].name, "server")) { pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { - pData->port = (int) pvals[i].val.d.n, NULL; + pData->port = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "db")) { pData->db = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "collection")) { @@ -360,12 +501,9 @@ CODESTARTnewActInst if(pData->tplName == NULL) { CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); } else { - errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, - "ommongodb: templates are not supported in this version"); - ABORT_FINALIZE(RS_RET_ERR); - CHKiRet(OMSRsetEntry(*ppOMSR, 0, - (uchar*) strdup((char*) pData->tplName), - OMSR_TPL_AS_ARRAY)); + CHKiRet(OMSRsetEntry(*ppOMSR, 0, ustrdup(pData->tplName), + OMSR_TPL_AS_JSON)); + CHKmalloc(pData->json_tokener = json_tokener_new()); } if(pData->db == NULL) @@ -417,6 +555,10 @@ CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt BEGINmodInit() + rsRetVal localRet; + rsRetVal (*pomsrGetSupportedTplOpts)(unsigned long *pOpts); + unsigned long opts; + int bJSONPassingSupported; CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ CODEmodInit_QueryRegCFSLineHdlr @@ -424,5 +566,22 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(datetime, CORE_COMPONENT)); INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); DBGPRINTF("ommongodb: module compiled with rsyslog version %s.\n", VERSION); - //DBGPRINTF("ommongodb: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not "); + + /* check if the rsyslog core supports parameter passing code */ + bJSONPassingSupported = 0; + localRet = pHostQueryEtryPt((uchar*)"OMSRgetSupportedTplOpts", + &pomsrGetSupportedTplOpts); + if(localRet == RS_RET_OK) { + /* found entry point, so let's see if core supports msg passing */ + CHKiRet((*pomsrGetSupportedTplOpts)(&opts)); + if(opts & OMSR_TPL_AS_JSON) + bJSONPassingSupported = 1; + } else if(localRet != RS_RET_ENTRY_POINT_NOT_FOUND) { + ABORT_FINALIZE(localRet); /* Something else went wrong, not acceptable */ + } + if(!bJSONPassingSupported) { + DBGPRINTF("ommongodb: JSON-passing is not supported by rsyslog core, " + "can not continue.\n"); + ABORT_FINALIZE(RS_RET_NO_JSON_PASSING); + } ENDmodInit |