diff options
Diffstat (limited to 'plugins/ommongodb/ommongodb.c')
-rw-r--r-- | plugins/ommongodb/ommongodb.c | 316 |
1 files changed, 189 insertions, 127 deletions
diff --git a/plugins/ommongodb/ommongodb.c b/plugins/ommongodb/ommongodb.c index 8e19105f..2763eecf 100644 --- a/plugins/ommongodb/ommongodb.c +++ b/plugins/ommongodb/ommongodb.c @@ -1,3 +1,28 @@ +/* ommongodb.c + * Output module for mongodb. + * Note: this module uses the libmongo-client library. The original 10gen + * mongodb C interface is crap. Obtain the library here: + * https://github.com/algernon/libmongo-client + * + * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" #include <stdio.h> #include <string.h> #include <stdlib.h> @@ -6,9 +31,8 @@ #include <assert.h> #include <signal.h> #include <time.h> -#include "bson.h" -#include "mongo.h" -#include "config.h" +#include <mongo.h> + #include "rsyslog.h" #include "conf.h" #include "syslogd-types.h" @@ -17,7 +41,6 @@ #include "module-template.h" #include "errmsg.h" #include "cfsysline.h" -#include "mongo-c-driver/src/mongo.h" #define countof(X) ( (size_t) ( sizeof(X)/sizeof*(X) ) ) @@ -26,29 +49,50 @@ #define DEFAULT_COLLECTION "log" #define DEFAULT_DB_COLLECTION "syslog.log" -//i just defined some constants, i couldt not find the limit -#define MONGO_DB_NAME_SIZE 128 -#define MONGO_COLLECTION_NAME_SIZE 128 - MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("ommongodb") /* internal structures */ DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) typedef struct _instanceData { - mongo_connection conn[1]; /* ptr */ - mongo_connection_options opts[1]; - mongo_conn_return status; - char db[MONGO_DB_NAME_SIZE]; - char collection[MONGO_COLLECTION_NAME_SIZE]; - char dbcollection[MONGO_DB_NAME_SIZE + MONGO_COLLECTION_NAME_SIZE + 1]; + mongo_sync_connection *conn; + uchar *server; + int port; + uchar *db; + uchar *collection; + uchar *uid; + uchar *pwd; unsigned uLastMongoDBErrno; - //unsigned iSrvPort; /* sample: server port */ + uchar *tplName; } instanceData; -char db[_DB_MAXDBLEN+2]; -static int iSrvPort = 27017; + +/* tables for interfacing with the v6 config system */ +/* action (instance) parameters */ +static struct cnfparamdescr actpdescr[] = { + { "server", eCmdHdlrGetWord, 0 }, + { "serverport", eCmdHdlrInt, 0 }, + { "db", eCmdHdlrGetWord, 0 }, + { "collection", eCmdHdlrGetWord, 0 }, + { "uid", eCmdHdlrGetWord, 0 }, + { "pwd", eCmdHdlrGetWord, 0 }, + { "template", eCmdHdlrGetWord, 1 } +}; +static struct cnfparamblk actpblk = + { CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr + }; + + +BEGINinitConfVars /* (re)set config variables to default values */ +CODESTARTinitConfVars +ENDinitConfVars + + BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance @@ -69,44 +113,28 @@ static void closeMongoDB(instanceData *pData) ASSERT(pData != NULL); if(pData->conn != NULL) { - mongo_destroy( pData->conn ); - memset(pData->conn,0x00,sizeof(mongo_connection)); + mongo_sync_disconnect(pData->conn); + pData->conn = NULL; } } BEGINfreeInstance CODESTARTfreeInstance closeMongoDB(pData); + free(pData->server); + free(pData->db); + free(pData->collection); + free(pData->uid); + free(pData->pwd); + free(pData->tplName); ENDfreeInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo /* nothing special here */ ENDdbgPrintInstInfo -/* log a database error with descriptive message. - * We check if we have a valid MongoDB handle. If not, we simply - * report an error - */ -static void reportDBError(instanceData *pData, int bSilent) -{ - char errMsg[512]; - bson ErrObj; - - ASSERT(pData != NULL); - - /* output log message */ - errno = 0; - if(pData->conn == NULL) { - errmsg.LogError(0, NO_ERRCODE, "unknown DB error occured - could not obtain MongoDB handle"); - } else { /* we can ask mysql for the error description... */ - //we should handle the error. if bSilent is set then we should print as debug - mongo_cmd_get_last_error(pData->conn, pData->db, &ErrObj); - bson_destroy(&ErrObj); - } - - return; -} /* The following function is responsible for initializing a * MySQL connection. @@ -114,101 +142,65 @@ static void reportDBError(instanceData *pData, int bSilent) */ static rsRetVal initMongoDB(instanceData *pData, int bSilent) { + char *server; DEFiRet; - ASSERT(pData != NULL); - ASSERT(pData->conn == NULL); - - //I'm trying to fallback to a default here - if(pData->opts->port == 0) - pData->opts->port = 27017; - +#if 0 if(pData->opts->host == 0x00) strcpy(pData->opts->host,DEFAULT_SERVER); if(pData->dbcollection == 0x00) strcpy(pData->dbcollection,DEFAULT_DB_COLLECTION); +#endif + server = (pData->server == NULL) ? "127.0.0.1" : (char*) pData->server; - pData->status = mongo_connect(pData->conn, pData->opts ); - - switch (pData->status) { - case mongo_conn_success: - fprintf(stderr, "connection succeeded\n" ); - iRet = RS_RET_OK; - break; - case mongo_conn_bad_arg: - errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle"); - fprintf(stderr, "bad arguments\n" ); - iRet = RS_RET_SUSPENDED; - break; - case mongo_conn_no_socket: - errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle"); - fprintf(stderr, "no socket\n" ); - iRet = RS_RET_SUSPENDED; - break; - case mongo_conn_fail: - errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle"); - fprintf(stderr, "connection failed\n" ); - iRet = RS_RET_SUSPENDED; - break; - case mongo_conn_not_master: + DBGPRINTF("ommongodb: trying connect to '%s' at port %d\n", server, pData->port); + pData->conn = mongo_sync_connect(server, pData->port, TRUE); + if(pData->conn == NULL) { errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle"); - fprintf(stderr, "not master\n" ); - iRet = RS_RET_SUSPENDED; - break; - } + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: RETiRet; } -//we must implement it rsRetVal writeMongoDB(uchar *psz, instanceData *pData) { - char mydate[32]; - char **szParams; - bson b[1]; - bson_buffer buf[1]; - bson_buffer_init( buf ); - bson_append_new_oid(buf, "_id" ); - memset(mydate,0x00,32); - - - DEFiRet; - - ASSERT(psz != NULL); - ASSERT(pData != NULL); - - - /* see if we are ready to proceed */ - if(pData->conn == NULL) { - CHKiRet(initMongoDB(pData, 0)); - } - -szParams = (char**)(void*) psz; -//We can make it beter -//if you change the fields in your template, we must update it here -//there is any C_metaprogramming_ninja there? :-) -if(countof(szParams) > 0) -{ - bson_append_string( buf, "msg", szParams[0]); - bson_append_string( buf, "facility",szParams[1]); - bson_append_string( buf, "hostname", szParams[2] ); - bson_append_string(buf, "priority",szParams[3]); - bson_append_int(buf,"count",countof(szParams)); - bson_from_buffer( b, buf ); - mongo_insert(pData->conn, pData->dbcollection, b ); -} + bson *doc; + char **szParams; + DEFiRet; -if(b) - bson_destroy(b); + /* see if we are ready to proceed */ + if(pData->conn == NULL) { + CHKiRet(initMongoDB(pData, 0)); + } + szParams = (char**)(void*) psz; + doc = bson_build(BSON_TYPE_STRING, "msg", szParams[0], -1, + BSON_TYPE_STRING, "facility", szParams[1], -1, + BSON_TYPE_STRING, "hostname", szParams[2], -1, + BSON_TYPE_STRING, "priority", szParams[3], -1, + BSON_TYPE_NONE); + if(doc == NULL) { + dbgprintf("ommongodb: error creating BSON doc\n"); + ABORT_FINALIZE(RS_RET_ERR); + } + bson_finish(doc); + if(!mongo_sync_cmd_insert(pData->conn, "syslog.doc", doc, NULL)) { + perror ("mongo_sync_cmd_insert()"); + dbgprintf("ommongodb: insert error\n"); + ABORT_FINALIZE(RS_RET_ERR); + } + bson_free(doc); - finalize_it: +finalize_it: if(iRet == RS_RET_OK) { pData->uLastMongoDBErrno = 0; /* reset error for error supression */ } - RETiRet; + RETiRet; } BEGINtryResume @@ -223,11 +215,80 @@ CODESTARTdoAction iRet = writeMongoDB(ppString[0], pData); ENDdoAction + +static inline void +setInstParamDefaults(instanceData *pData) +{ + pData->server = NULL; + pData->port = 27017; + pData->db = NULL; + pData->collection= NULL; + pData->uid = NULL; + pData->pwd = NULL; + pData->tplName = NULL; +} + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; +CODESTARTnewActInst +dbgprintf("ommongodb: enter newActInst\n"); + if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } +dbgprintf("ommongodb: newActInst 10\n"); + + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); +dbgprintf("ommongodb: newActInst 20\n"); + + CODE_STD_STRING_REQUESTparseSelectorAct(1) + for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "server")) { + pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "serverport")) { + pData->port = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "db")) { + pData->db = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "collection")) { + pData->collection = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "uid")) { + pData->uid = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "pwd")) { + pData->pwd = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + dbgprintf("ommysql: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + } + + if(pData->tplName == NULL) { +dbgprintf("ommongodb: using default template\n"); + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*) strdup(" StdDBFmt"), + OMSR_TPL_AS_ARRAY)); + } else { +dbgprintf("ommongodb: using configured template '%s'\n", pData->tplName); + CHKiRet(OMSRsetEntry(*ppOMSR, 0, + (uchar*) strdup((char*) pData->tplName), + OMSR_TPL_AS_ARRAY)); + } + +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); +ENDnewActInst + + BEGINparseSelectorAct - //int iMongoDBPropErr = 0; CODESTARTparseSelectorAct CODE_STD_STRING_REQUESTparseSelectorAct(1) - + char tmpBuf[256]; +// ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); + /* don't use old config interface! */ +#if 1 if(!strncmp((char*) p, ":ommongodb:", sizeof(":ommongodb:") - 1)) { p += sizeof(":ommongodb:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ } else { @@ -236,14 +297,16 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) CHKiRet(createInstance(&pData)); +#if 0 if(getSubString(&p, pData->opts->host, MAXHOSTNAMELEN+1, ',')) strcpy(pData->opts->host,DEFAULT_SERVER); +#endif //we must define the max db name - if(getSubString(&p,pData->db,255,',')) - strcpy(pData->db,DEFAULT_DATABASE); - if(getSubString(&p,pData->collection,255,';')) - strcpy(pData->collection,DEFAULT_COLLECTION); + if(getSubString(&p,tmpBuf,255,',')) + ;//strcpy(pData->db,DEFAULT_DATABASE); + if(getSubString(&p,tmpBuf,255,';')) + ;//strcpy(pData->collection,DEFAULT_COLLECTION); if(*(p-1) == ';') --p; @@ -251,10 +314,8 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_TPL_AS_ARRAY, (uchar*) " StdMongoDBFmt")); - pData->opts->port = (unsigned) iSrvPort; /* set configured port */ - sprintf(pData->dbcollection,"%s.%s",pData->db,pData->collection); CHKiRet(initMongoDB(pData, 0)); - +#endif CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -267,6 +328,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt BEGINmodInit() @@ -275,6 +337,6 @@ CODESTARTmodInit CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); - DBGPRINTF("ompgsql: module compiled with rsyslog version %s.\n", VERSION); - DBGPRINTF("ompgsql: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not "); -ENDmodInit
\ No newline at end of file + DBGPRINTF("ommongodb: module compiled with rsyslog version %s.\n", VERSION); + //DBGPRINTF("ommongodb: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not "); +ENDmodInit |