summaryrefslogtreecommitdiffstats
path: root/plugins/ommongodb/ommongodb.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ommongodb/ommongodb.c')
-rw-r--r--plugins/ommongodb/ommongodb.c316
1 files changed, 189 insertions, 127 deletions
diff --git a/plugins/ommongodb/ommongodb.c b/plugins/ommongodb/ommongodb.c
index 8e19105f..2763eecf 100644
--- a/plugins/ommongodb/ommongodb.c
+++ b/plugins/ommongodb/ommongodb.c
@@ -1,3 +1,28 @@
+/* ommongodb.c
+ * Output module for mongodb.
+ * Note: this module uses the libmongo-client library. The original 10gen
+ * mongodb C interface is crap. Obtain the library here:
+ * https://github.com/algernon/libmongo-client
+ *
+ * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
@@ -6,9 +31,8 @@
#include <assert.h>
#include <signal.h>
#include <time.h>
-#include "bson.h"
-#include "mongo.h"
-#include "config.h"
+#include <mongo.h>
+
#include "rsyslog.h"
#include "conf.h"
#include "syslogd-types.h"
@@ -17,7 +41,6 @@
#include "module-template.h"
#include "errmsg.h"
#include "cfsysline.h"
-#include "mongo-c-driver/src/mongo.h"
#define countof(X) ( (size_t) ( sizeof(X)/sizeof*(X) ) )
@@ -26,29 +49,50 @@
#define DEFAULT_COLLECTION "log"
#define DEFAULT_DB_COLLECTION "syslog.log"
-//i just defined some constants, i couldt not find the limit
-#define MONGO_DB_NAME_SIZE 128
-#define MONGO_COLLECTION_NAME_SIZE 128
-
MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("ommongodb")
/* internal structures
*/
DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
typedef struct _instanceData {
- mongo_connection conn[1]; /* ptr */
- mongo_connection_options opts[1];
- mongo_conn_return status;
- char db[MONGO_DB_NAME_SIZE];
- char collection[MONGO_COLLECTION_NAME_SIZE];
- char dbcollection[MONGO_DB_NAME_SIZE + MONGO_COLLECTION_NAME_SIZE + 1];
+ mongo_sync_connection *conn;
+ uchar *server;
+ int port;
+ uchar *db;
+ uchar *collection;
+ uchar *uid;
+ uchar *pwd;
unsigned uLastMongoDBErrno;
- //unsigned iSrvPort; /* sample: server port */
+ uchar *tplName;
} instanceData;
-char db[_DB_MAXDBLEN+2];
-static int iSrvPort = 27017;
+
+/* tables for interfacing with the v6 config system */
+/* action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ { "server", eCmdHdlrGetWord, 0 },
+ { "serverport", eCmdHdlrInt, 0 },
+ { "db", eCmdHdlrGetWord, 0 },
+ { "collection", eCmdHdlrGetWord, 0 },
+ { "uid", eCmdHdlrGetWord, 0 },
+ { "pwd", eCmdHdlrGetWord, 0 },
+ { "template", eCmdHdlrGetWord, 1 }
+};
+static struct cnfparamblk actpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
+
+
+BEGINinitConfVars /* (re)set config variables to default values */
+CODESTARTinitConfVars
+ENDinitConfVars
+
+
BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
@@ -69,44 +113,28 @@ static void closeMongoDB(instanceData *pData)
ASSERT(pData != NULL);
if(pData->conn != NULL) {
- mongo_destroy( pData->conn );
- memset(pData->conn,0x00,sizeof(mongo_connection));
+ mongo_sync_disconnect(pData->conn);
+ pData->conn = NULL;
}
}
BEGINfreeInstance
CODESTARTfreeInstance
closeMongoDB(pData);
+ free(pData->server);
+ free(pData->db);
+ free(pData->collection);
+ free(pData->uid);
+ free(pData->pwd);
+ free(pData->tplName);
ENDfreeInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
/* nothing special here */
ENDdbgPrintInstInfo
-/* log a database error with descriptive message.
- * We check if we have a valid MongoDB handle. If not, we simply
- * report an error
- */
-static void reportDBError(instanceData *pData, int bSilent)
-{
- char errMsg[512];
- bson ErrObj;
-
- ASSERT(pData != NULL);
-
- /* output log message */
- errno = 0;
- if(pData->conn == NULL) {
- errmsg.LogError(0, NO_ERRCODE, "unknown DB error occured - could not obtain MongoDB handle");
- } else { /* we can ask mysql for the error description... */
- //we should handle the error. if bSilent is set then we should print as debug
- mongo_cmd_get_last_error(pData->conn, pData->db, &ErrObj);
- bson_destroy(&ErrObj);
- }
-
- return;
-}
/* The following function is responsible for initializing a
* MySQL connection.
@@ -114,101 +142,65 @@ static void reportDBError(instanceData *pData, int bSilent)
*/
static rsRetVal initMongoDB(instanceData *pData, int bSilent)
{
+ char *server;
DEFiRet;
- ASSERT(pData != NULL);
- ASSERT(pData->conn == NULL);
-
- //I'm trying to fallback to a default here
- if(pData->opts->port == 0)
- pData->opts->port = 27017;
-
+#if 0
if(pData->opts->host == 0x00)
strcpy(pData->opts->host,DEFAULT_SERVER);
if(pData->dbcollection == 0x00)
strcpy(pData->dbcollection,DEFAULT_DB_COLLECTION);
+#endif
+ server = (pData->server == NULL) ? "127.0.0.1" : (char*) pData->server;
- pData->status = mongo_connect(pData->conn, pData->opts );
-
- switch (pData->status) {
- case mongo_conn_success:
- fprintf(stderr, "connection succeeded\n" );
- iRet = RS_RET_OK;
- break;
- case mongo_conn_bad_arg:
- errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle");
- fprintf(stderr, "bad arguments\n" );
- iRet = RS_RET_SUSPENDED;
- break;
- case mongo_conn_no_socket:
- errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle");
- fprintf(stderr, "no socket\n" );
- iRet = RS_RET_SUSPENDED;
- break;
- case mongo_conn_fail:
- errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle");
- fprintf(stderr, "connection failed\n" );
- iRet = RS_RET_SUSPENDED;
- break;
- case mongo_conn_not_master:
+ DBGPRINTF("ommongodb: trying connect to '%s' at port %d\n", server, pData->port);
+ pData->conn = mongo_sync_connect(server, pData->port, TRUE);
+ if(pData->conn == NULL) {
errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle");
- fprintf(stderr, "not master\n" );
- iRet = RS_RET_SUSPENDED;
- break;
- }
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+finalize_it:
RETiRet;
}
-//we must implement it
rsRetVal writeMongoDB(uchar *psz, instanceData *pData)
{
- char mydate[32];
- char **szParams;
- bson b[1];
- bson_buffer buf[1];
- bson_buffer_init( buf );
- bson_append_new_oid(buf, "_id" );
- memset(mydate,0x00,32);
-
-
- DEFiRet;
-
- ASSERT(psz != NULL);
- ASSERT(pData != NULL);
-
-
- /* see if we are ready to proceed */
- if(pData->conn == NULL) {
- CHKiRet(initMongoDB(pData, 0));
- }
-
-szParams = (char**)(void*) psz;
-//We can make it beter
-//if you change the fields in your template, we must update it here
-//there is any C_metaprogramming_ninja there? :-)
-if(countof(szParams) > 0)
-{
- bson_append_string( buf, "msg", szParams[0]);
- bson_append_string( buf, "facility",szParams[1]);
- bson_append_string( buf, "hostname", szParams[2] );
- bson_append_string(buf, "priority",szParams[3]);
- bson_append_int(buf,"count",countof(szParams));
- bson_from_buffer( b, buf );
- mongo_insert(pData->conn, pData->dbcollection, b );
-}
+ bson *doc;
+ char **szParams;
+ DEFiRet;
-if(b)
- bson_destroy(b);
+ /* see if we are ready to proceed */
+ if(pData->conn == NULL) {
+ CHKiRet(initMongoDB(pData, 0));
+ }
+ szParams = (char**)(void*) psz;
+ doc = bson_build(BSON_TYPE_STRING, "msg", szParams[0], -1,
+ BSON_TYPE_STRING, "facility", szParams[1], -1,
+ BSON_TYPE_STRING, "hostname", szParams[2], -1,
+ BSON_TYPE_STRING, "priority", szParams[3], -1,
+ BSON_TYPE_NONE);
+ if(doc == NULL) {
+ dbgprintf("ommongodb: error creating BSON doc\n");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ bson_finish(doc);
+ if(!mongo_sync_cmd_insert(pData->conn, "syslog.doc", doc, NULL)) {
+ perror ("mongo_sync_cmd_insert()");
+ dbgprintf("ommongodb: insert error\n");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ bson_free(doc);
- finalize_it:
+finalize_it:
if(iRet == RS_RET_OK) {
pData->uLastMongoDBErrno = 0; /* reset error for error supression */
}
- RETiRet;
+ RETiRet;
}
BEGINtryResume
@@ -223,11 +215,80 @@ CODESTARTdoAction
iRet = writeMongoDB(ppString[0], pData);
ENDdoAction
+
+static inline void
+setInstParamDefaults(instanceData *pData)
+{
+ pData->server = NULL;
+ pData->port = 27017;
+ pData->db = NULL;
+ pData->collection= NULL;
+ pData->uid = NULL;
+ pData->pwd = NULL;
+ pData->tplName = NULL;
+}
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+CODESTARTnewActInst
+dbgprintf("ommongodb: enter newActInst\n");
+ if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+dbgprintf("ommongodb: newActInst 10\n");
+
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+dbgprintf("ommongodb: newActInst 20\n");
+
+ CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(actpblk.descr[i].name, "server")) {
+ pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "serverport")) {
+ pData->port = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "db")) {
+ pData->db = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "collection")) {
+ pData->collection = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "uid")) {
+ pData->uid = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "pwd")) {
+ pData->pwd = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "template")) {
+ pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else {
+ dbgprintf("ommysql: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+ }
+
+ if(pData->tplName == NULL) {
+dbgprintf("ommongodb: using default template\n");
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*) strdup(" StdDBFmt"),
+ OMSR_TPL_AS_ARRAY));
+ } else {
+dbgprintf("ommongodb: using configured template '%s'\n", pData->tplName);
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0,
+ (uchar*) strdup((char*) pData->tplName),
+ OMSR_TPL_AS_ARRAY));
+ }
+
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+
BEGINparseSelectorAct
- //int iMongoDBPropErr = 0;
CODESTARTparseSelectorAct
CODE_STD_STRING_REQUESTparseSelectorAct(1)
-
+ char tmpBuf[256];
+// ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+ /* don't use old config interface! */
+#if 1
if(!strncmp((char*) p, ":ommongodb:", sizeof(":ommongodb:") - 1)) {
p += sizeof(":ommongodb:") - 1; /* eat indicator sequence (-1 because of '\0'!) */
} else {
@@ -236,14 +297,16 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
CHKiRet(createInstance(&pData));
+#if 0
if(getSubString(&p, pData->opts->host, MAXHOSTNAMELEN+1, ','))
strcpy(pData->opts->host,DEFAULT_SERVER);
+#endif
//we must define the max db name
- if(getSubString(&p,pData->db,255,','))
- strcpy(pData->db,DEFAULT_DATABASE);
- if(getSubString(&p,pData->collection,255,';'))
- strcpy(pData->collection,DEFAULT_COLLECTION);
+ if(getSubString(&p,tmpBuf,255,','))
+ ;//strcpy(pData->db,DEFAULT_DATABASE);
+ if(getSubString(&p,tmpBuf,255,';'))
+ ;//strcpy(pData->collection,DEFAULT_COLLECTION);
if(*(p-1) == ';')
--p;
@@ -251,10 +314,8 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_TPL_AS_ARRAY, (uchar*) " StdMongoDBFmt"));
- pData->opts->port = (unsigned) iSrvPort; /* set configured port */
- sprintf(pData->dbcollection,"%s.%s",pData->db,pData->collection);
CHKiRet(initMongoDB(pData, 0));
-
+#endif
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -267,6 +328,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
BEGINmodInit()
@@ -275,6 +337,6 @@ CODESTARTmodInit
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
- DBGPRINTF("ompgsql: module compiled with rsyslog version %s.\n", VERSION);
- DBGPRINTF("ompgsql: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not ");
-ENDmodInit \ No newline at end of file
+ DBGPRINTF("ommongodb: module compiled with rsyslog version %s.\n", VERSION);
+ //DBGPRINTF("ommongodb: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not ");
+ENDmodInit