diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imgssapi/imgssapi.c | 21 | ||||
-rwxr-xr-x[-rw-r--r--] | plugins/imjournal/imjournal.c | 153 | ||||
-rw-r--r-- | plugins/imrelp/imrelp.c | 86 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 4 | ||||
-rw-r--r-- | plugins/imzmq3/README | 59 | ||||
-rw-r--r-- | plugins/imzmq3/imzmq3.c | 781 | ||||
-rw-r--r-- | plugins/mmaudit/Makefile.am | 4 | ||||
-rw-r--r-- | plugins/mmaudit/mmaudit.c | 3 | ||||
-rw-r--r-- | plugins/mmcount/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/mmcount/mmcount.c | 342 | ||||
-rw-r--r-- | plugins/mmfields/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/mmfields/mmfields.c | 265 | ||||
-rw-r--r-- | plugins/mmjsonparse/Makefile.am | 4 | ||||
-rw-r--r-- | plugins/mmjsonparse/mmjsonparse.c | 1 | ||||
-rw-r--r-- | plugins/omjournal/omjournal.c | 2 | ||||
-rw-r--r-- | plugins/omrelp/omrelp.c | 115 | ||||
-rw-r--r-- | plugins/omzmq3/README | 14 | ||||
-rw-r--r-- | plugins/omzmq3/omzmq3.c | 202 |
18 files changed, 1577 insertions, 495 deletions
diff --git a/plugins/imgssapi/imgssapi.c b/plugins/imgssapi/imgssapi.c index 4e3a70ab..15d994cc 100644 --- a/plugins/imgssapi/imgssapi.c +++ b/plugins/imgssapi/imgssapi.c @@ -72,6 +72,7 @@ MODULE_CNFNAME("imgssapi") /* some forward definitions - they may go away when we no longer include imtcp.c */ static rsRetVal addGSSListener(void __attribute__((unused)) *pVal, uchar *pNewVal); +static rsRetVal actGSSListener(uchar *port); static int TCPSessGSSInit(void); static void TCPSessGSSClose(tcps_sess_t* pSess); static rsRetVal TCPSessGSSRecv(tcps_sess_t *pSess, void *buf, size_t buf_len, ssize_t *); @@ -90,6 +91,7 @@ DEFobjCurrIf(glbl) static tcpsrv_t *pOurTcpsrv = NULL; /* our TCP server(listener) TODO: change for multiple instances */ static gss_cred_id_t gss_server_creds = GSS_C_NO_CREDENTIAL; +static uchar *srvPort; /* our usr structure for the tcpsrv object */ typedef struct gsssrv_s { @@ -317,6 +319,16 @@ static rsRetVal addGSSListener(void __attribute__((unused)) *pVal, uchar *pNewVal) { DEFiRet; + + srvPort = pNewVal; + + RETiRet; +} + +static rsRetVal +actGSSListener(uchar *port) +{ + DEFiRet; gsssrv_t *pGSrv; if(pOurTcpsrv == NULL) { @@ -340,7 +352,7 @@ addGSSListener(void __attribute__((unused)) *pVal, uchar *pNewVal) CHKiRet(tcpsrv.SetCBOnRegularClose(pOurTcpsrv, onRegularClose)); CHKiRet(tcpsrv.SetCBOnErrClose(pOurTcpsrv, onErrClose)); CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, UCHAR_CONSTANT("imgssapi"))); - tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal, 1); + tcpsrv.configureTCPListen(pOurTcpsrv, port, 1); CHKiRet(tcpsrv.ConstructFinalize(pOurTcpsrv)); } @@ -676,6 +688,11 @@ ENDfreeCnf */ BEGINrunInput CODESTARTrunInput + /* This will fail if the priviledges are dropped. Should be + * moved to the '*activateCnfPrePrivDrop' section eventually. + */ + actGSSListener(srvPort); + iRet = tcpsrv.Run(pOurTcpsrv); ENDrunInput @@ -683,7 +700,7 @@ ENDrunInput /* initialize and return if will run or not */ BEGINwillRun CODESTARTwillRun - if(pOurTcpsrv == NULL) + if(srvPort == NULL) ABORT_FINALIZE(RS_RET_NO_RUN); net.PrintAllowedSenders(2); /* TCP */ diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c index 2af1958e..cce45b9c 100644..100755 --- a/plugins/imjournal/imjournal.c +++ b/plugins/imjournal/imjournal.c @@ -30,7 +30,9 @@ #include <ctype.h> #include <stdlib.h> #include <time.h> +#include <sys/poll.h> #include <sys/socket.h> +#include <errno.h> #include "dirty.h" #include "cfsysline.h" @@ -43,6 +45,7 @@ #include "glbl.h" #include "prop.h" #include "errmsg.h" +#include "srUtils.h" #include "unicode-helper.h" #include <systemd/sd-journal.h> @@ -136,16 +139,20 @@ readjournal() { uint64_t timestamp; struct json_object *json = NULL; + int r; /* Information from messages */ char *message; + char *sys_pid; char *sys_iden; char *sys_iden_help; const void *get; + const void *pidget; char *parse; char *get2; size_t length; + size_t pidlength; const void *equal_sign; struct json_object *jval; @@ -158,13 +165,6 @@ readjournal() { int priority = 0; int facility = 0; - /* Get next journal message, if there is none, wait a second */ - if (sd_journal_next(j) == 0) { - sleep(1); - iRet = RS_RET_OK; - goto ret; - } - /* Get message text */ if (sd_journal_get_data(j, "MESSAGE", &get, &length) < 0) { logmsgInternal(NO_ERRCODE, LOG_SYSLOG|LOG_INFO, (uchar *)"log message from journal doesn't have MESSAGE", 0); @@ -203,7 +203,7 @@ readjournal() { goto free_message; } - /* Get message identifier and add ':' */ + /* Get message identifier, client pid and add ':' */ if (sd_journal_get_data(j, "SYSLOG_IDENTIFIER", &get, &length) >= 0) { sys_iden = strndup(get+18, length-18); } else { @@ -214,19 +214,44 @@ readjournal() { goto free_message; } - asprintf(&sys_iden_help, "%s:", sys_iden); - if (sys_iden_help == NULL) { + if (sd_journal_get_data(j, "SYSLOG_PID", &pidget, &pidlength) >= 0) { + sys_pid = strndup(pidget+11, pidlength-11); + if (sys_pid == NULL) { + iRet = RS_RET_OUT_OF_MEMORY; + free (sys_iden); + goto free_message; + } + } else { + sys_pid = NULL; + } + + if (sys_pid) { + r = asprintf(&sys_iden_help, "%s[%s]:", sys_iden, sys_pid); + } else { + r = asprintf(&sys_iden_help, "%s:", sys_iden); + } + + free (sys_iden); + free (sys_pid); + + if (-1 == r) { iRet = RS_RET_OUT_OF_MEMORY; goto finalize_it; } - free (sys_iden); json = json_object_new_object(); SD_JOURNAL_FOREACH_DATA(j, get, l) { /* locate equal sign, this is always present */ equal_sign = memchr(get, '=', l); - assert (equal_sign != NULL); + + /* ... but we know better than to trust the specs */ + if (equal_sign == NULL) { + errmsg.LogError(0, RS_RET_ERR,"SD_JOURNAL_FOREACH_DATA()" + " returned a malformed field (has no '='): '%s'", + get); + continue; /* skip the entry */ + } /* get length of journal data prefix */ prefixlen = ((char *)equal_sign - (char *)get); @@ -337,7 +362,9 @@ persistJournalState () { char *cursor; int ret = 0; - if ((ret = sd_journal_get_cursor(j, &cursor)) > 0) { + /* On success, sd_journal_get_cursor() returns 1 in systemd + 197 or older and 0 in systemd 198 or newer */ + if ((ret = sd_journal_get_cursor(j, &cursor)) >= 0) { if ((sf = fopen(cs.stateFile, "wb")) != NULL) { if (fprintf(sf, "%s", cursor) < 0) { iRet = RS_RET_IO_ERROR; @@ -345,29 +372,92 @@ persistJournalState () { fclose(sf); free(cursor); } else { + char errStr[256]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_FOPEN_FAILURE, "fopen() failed: " + "'%s', path: '%s'\n", errStr, cs.stateFile); iRet = RS_RET_FOPEN_FAILURE; } } else { + char errStr[256]; + rs_strerror_r(-(ret), errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_ERR, "sd_journal_get_cursor() failed: '%s'\n", errStr); iRet = RS_RET_ERR; } RETiRet; } +/* Polls the journal for new messages. Similar to sd_journal_wait() + * except for the special handling of EINTR. + */ +static rsRetVal +pollJournal() +{ + DEFiRet; + struct pollfd pollfd; + int r; + + pollfd.fd = sd_journal_get_fd(j); + pollfd.events = sd_journal_get_events(j); + r = poll(&pollfd, 1, -1); + if (r == -1) { + if (errno == EINTR) { + /* EINTR is also received during termination + * so return now to check the term state. + */ + ABORT_FINALIZE(RS_RET_OK); + } else { + char errStr[256]; + + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_ERR, + "poll() failed: '%s'", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + } + + assert(r == 1); + + r = sd_journal_process(j); + if (r < 0) { + char errStr[256]; + + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_ERR, + "sd_journal_process() failed: '%s'", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + +finalize_it: + RETiRet; +} + + BEGINrunInput CODESTARTrunInput /* this is an endless loop - it is terminated when the thread is * signalled to do so. This, however, is handled by the framework, * right into the sleep below. */ - int count = 0; + if (cs.stateFile[0] != '/') { + char *new_stateFile; - char readCursor[128 + 1]; - FILE *r_sf; + if (-1 == asprintf(&new_stateFile, "%s/%s", (char *)glbl.GetWorkDir(), cs.stateFile)) { + errmsg.LogError(0, RS_RET_OUT_OF_MEMORY, "imjournal: asprintf failed\n"); + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + free (cs.stateFile); + cs.stateFile = new_stateFile; + } /* if state file exists, set cursor to appropriate position */ if (access(cs.stateFile, F_OK|R_OK) != -1) { + FILE *r_sf; + if ((r_sf = fopen(cs.stateFile, "rb")) != NULL) { + char readCursor[128 + 1]; + if (fscanf(r_sf, "%128s\n", readCursor) != EOF) { if (sd_journal_seek_cursor(j, readCursor) != 0) { errmsg.LogError(0, RS_RET_ERR, "imjournal: " @@ -390,14 +480,32 @@ CODESTARTrunInput } while (glbl.GetGlobalInputTermState() == 0) { + int count = 0, r; + + r = sd_journal_next(j); + if (r < 0) { + char errStr[256]; + + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_ERR, + "sd_journal_next() failed: '%s'", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + + if (r == 0) { + /* No new messages, wait for activity. */ + CHKiRet(pollJournal()); + continue; + } + CHKiRet(readjournal()); + /* TODO: This could use some finer metric. */ count++; if (count == cs.iPersistStateInterval) { count = 0; persistJournalState(); } } - persistJournalState(); finalize_it: ENDrunInput @@ -444,6 +552,7 @@ ENDwillRun /* close journal */ BEGINafterRun CODESTARTafterRun + persistJournalState(); sd_journal_close(j); ENDafterRun @@ -500,16 +609,22 @@ finalize_it: ENDsetModCnf +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt - - BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c index 5e0ae552..7fa98617 100644 --- a/plugins/imrelp/imrelp.c +++ b/plugins/imrelp/imrelp.c @@ -4,7 +4,7 @@ * * File begun on 2008-03-13 by RGerhards * - * Copyright 2008-2012 Adiscon GmbH. + * Copyright 2008-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -74,6 +74,8 @@ static struct configSettings_s { struct instanceConf_s { uchar *pszBindPort; /* port to bind to */ + sbool bEnableTLS; + sbool bEnableTLSZip; struct instanceConf_s *next; }; @@ -88,9 +90,21 @@ struct modConfData_s { static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "ruleset", eCmdHdlrGetWord, 0 }, +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + /* input instance parameters */ static struct cnfparamdescr inppdescr[] = { - { "port", eCmdHdlrString, CNFPARAM_REQUIRED } + { "port", eCmdHdlrString, CNFPARAM_REQUIRED }, + { "tls", eCmdHdlrBinary, 0 }, + { "tls.compression", eCmdHdlrBinary, 0 } }; static struct cnfparamblk inppblk = { CNFPARAMBLK_VERSION, @@ -155,6 +169,8 @@ createInstance(instanceConf_t **pinst) inst->next = NULL; inst->pszBindPort = NULL; + inst->bEnableTLS = 0; + inst->bEnableTLSZip = 0; /* node created, let's add to config */ if(loadModConf->tail == NULL) { @@ -179,7 +195,7 @@ std_checkRuleset_genErrMsg(modConfData_t *modConf, __attribute__((unused)) insta } -/* This function is called when a new listener instace shall be added to +/* This function is called when a new listener instance shall be added to * the current config object via the legacy config system. It just shuffles * all parameters to the listener in-memory instance. * rgerhards, 2011-05-04 @@ -204,6 +220,7 @@ finalize_it: static rsRetVal addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) { + relpSrv_t *pSrv; DEFiRet; if(pRelpEngine == NULL) { CHKiRet(relpEngineConstruct(&pRelpEngine)); @@ -216,7 +233,15 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) } } - CHKiRet(relpEngineAddListner(pRelpEngine, inst->pszBindPort)); + CHKiRet(relpEngineListnerConstruct(pRelpEngine, &pSrv)); + CHKiRet(relpSrvSetLstnPort(pSrv, inst->pszBindPort)); + if(inst->bEnableTLS) { + relpSrvEnableTLS(pSrv); + if(inst->bEnableTLSZip) { + relpSrvEnableTLSZip(pSrv); + } + } + CHKiRet(relpEngineListnerConstructFinalize(pRelpEngine, pSrv)); finalize_it: RETiRet; @@ -249,6 +274,10 @@ CODESTARTnewInpInst continue; if(!strcmp(inppblk.descr[i].name, "port")) { inst->pszBindPort = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "tls")) { + inst->bEnableTLS = (unsigned) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "tls.compression")) { + inst->bEnableTLSZip = (unsigned) pvals[i].val.d.n; } else { dbgprintf("imrelp: program error, non-handled " "param '%s'\n", inppblk.descr[i].name); @@ -264,19 +293,58 @@ BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; pModConf->pConf = pConf; + pModConf->pszBindRuleset = NULL; + pModConf->pBindRuleset = NULL; /* init legacy config variables */ cs.pszBindRuleset = NULL; ENDbeginCnfLoad +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for imrelp:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "ruleset")) { + loadModConf->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + dbgprintf("imrelp: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + BEGINendCnfLoad CODESTARTendCnfLoad - if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) { - loadModConf->pszBindRuleset = NULL; + if(loadModConf->pszBindRuleset == NULL) { + if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) { + loadModConf->pszBindRuleset = NULL; + } else { + CHKmalloc(loadModConf->pszBindRuleset = ustrdup(cs.pszBindRuleset)); + } } else { - CHKmalloc(loadModConf->pszBindRuleset = ustrdup(cs.pszBindRuleset)); + if((cs.pszBindRuleset != NULL) && (cs.pszBindRuleset[0] != '\0')) { + errmsg.LogError(0, RS_RET_DUP_PARAM, "imrelp: warning: ruleset " + "set via legacy directive ignored"); + } } - loadModConf->pBindRuleset = NULL; finalize_it: free(cs.pszBindRuleset); loadModConf = NULL; /* done loading */ @@ -293,6 +361,7 @@ CODESTARTcheckCnf if(pModConf->pszBindRuleset == NULL) { pModConf->pBindRuleset = NULL; } else { + DBGPRINTF("imrelp: using ruleset '%s'\n", pModConf->pszBindRuleset); localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, pModConf->pszBindRuleset); if(localRet == RS_RET_NOT_FOUND) { std_checkRuleset_genErrMsg(pModConf, NULL); @@ -420,6 +489,7 @@ CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index c503852c..dad09ab4 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -1284,6 +1284,8 @@ BEGINactivateCnfPrePrivDrop instanceConf_t *inst; CODESTARTactivateCnfPrePrivDrop runModConf = pModConf; + if(runModConf->bOmitLocalLogging && nfd == 1) + ABORT_FINALIZE(RS_RET_OK); for(inst = runModConf->root ; inst != NULL ; inst = inst->next) { addListner(inst); } @@ -1325,6 +1327,8 @@ BEGINrunInput #endif CODESTARTrunInput + if(runModConf->bOmitLocalLogging && nfd == 1) + ABORT_FINALIZE(RS_RET_OK); /* this is an endless loop - it is terminated when the thread is * signalled to do so. This, however, is handled by the framework, * right into the sleep below. diff --git a/plugins/imzmq3/README b/plugins/imzmq3/README index 88653b83..9a108a01 100644 --- a/plugins/imzmq3/README +++ b/plugins/imzmq3/README @@ -1,24 +1,59 @@ ZeroMQ 3.x Input Plugin Building this plugin: -Requires libzmq and libczmq. First, install libzmq from the HEAD on github: -http://github.com/zeromq/libzmq. You can clone the repository, build, then -install it. The directions for doing so are there in the readme. Then, do -the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1 -version of libzmq will be released, and a supporting version of libczmq. -At that time, you could simply download and install the tarballs instead of -using git to clone the repositories. Those tarballs (when available) can -be found at http://download.zeromq.org. As of this writing (5/31/2012), the -most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile -properly. +Requires libzmq and libczmq. First, download the tarballs of both libzmq +and its supporting libczmq from http://download.zeromq.org. As of this +writing (04/23/2013), the most recent versions of libzmq and czmq are +3.2.2 and 1.3.2 respectively. Configure, build, and then install both libs. Imzmq3 allows you to push data into rsyslog from a zeromq socket. The example below binds a SUB socket to port 7172, and then any messages with the topic "foo" will be pushed into rsyslog. +Please note: +This plugin only supports the newer (v7) config format. Legacy config support +was removed. + Example Rsyslog.conf snippet: ------------------------------------------------------------------------------- - -$InputZmq3ServerRun action=BIND,type=SUB,description=tcp://*:7172,subscribe=foo +module(load="imzmq3" ioThreads="1") +input(type="imzmq3" action="CONNECT" socktype="SUB" description="tcp://*:7172" subscribe="foo,bar") ------------------------------------------------------------------------------- +Note you can specify multiple subscriptions with a comma-delimited list, with +no spaces between values. + +The only global parameter for this plugin is ioThreads, which is optional and +probably best left to the zmq default unless you know exactly what you are +doing. + +The instance-level parameters are: + +Required +description +subscribe (required if the sockType is SUB) + +Optional +sockType (defaults to SUB) +action (defaults to BIND +sndHWM +rcvHWM +identity +sndBuf +rcvBuf +linger +backlog +sndTimeout +rcvTimeout +maxMsgSize +rate +recoveryIVL +multicastHops +reconnectIVL +reconnectIVLMax +ipv4Only +affinity + +These all correspond to zmq optional settings. Except where noted, the defaults +are the zmq defaults if not set. See http://api.zeromq.org/3-2:zmq-setsockopt +for info on these. diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c index 52c12a53..08b1dbe4 100644 --- a/plugins/imzmq3/imzmq3.c +++ b/plugins/imzmq3/imzmq3.c @@ -19,20 +19,21 @@ * License along with this program. If not, see * <http://www.gnu.org/licenses/>. * - * Author: David Kelly - * <davidk@talksum.com> + * Authors: + * David Kelly <davidk@talksum.com> + * Hongfei Cheng <hongfeic@talksum.com> */ + +#include "config.h" +#include "rsyslog.h" + #include <assert.h> #include <errno.h> #include <stdlib.h> #include <string.h> #include <unistd.h> - -#include "rsyslog.h" - #include "cfsysline.h" -#include "config.h" #include "dirty.h" #include "errmsg.h" #include "glbl.h" @@ -49,6 +50,7 @@ MODULE_TYPE_INPUT MODULE_TYPE_NOKEEP +MODULE_CNFNAME("imzmq3"); /* convienent symbols to denote a socket we want to bind * vs one we want to just connect to @@ -83,47 +85,67 @@ typedef struct _poller_data { thrdInfo_t* thread; } poller_data; -typedef struct _socket_info { - int type; - int action; - char* description; - int sndHWM; /* if you want more than 2^32 messages, */ - int rcvHWM; /* then pass in 0 (the default). */ - char* identity; - char** subscriptions; - ruleset_t* ruleset; - int sndBuf; - int rcvBuf; - int linger; - int backlog; - int sndTimeout; - int rcvTimeout; - int maxMsgSize; - int rate; - int recoveryIVL; - int multicastHops; - int reconnectIVL; - int reconnectIVLMax; - int ipv4Only; - int affinity; - -} socket_info; +/* a linked-list of subscription topics */ +typedef struct sublist_t { + char* subscribe; + struct sublist_t* next; +} sublist; + +struct instanceConf_s { + int type; + int action; + char* description; + int sndHWM; /* if you want more than 2^32 messages, */ + int rcvHWM; /* then pass in 0 (the default). */ + char* identity; + sublist* subscriptions; + int sndBuf; + int rcvBuf; + int linger; + int backlog; + int sndTimeout; + int rcvTimeout; + int maxMsgSize; + int rate; + int recoveryIVL; + int multicastHops; + int reconnectIVL; + int reconnectIVLMax; + int ipv4Only; + int affinity; + uchar* pszBindRuleset; + ruleset_t* pBindRuleset; + struct instanceConf_s* next; + +}; + +struct modConfData_s { + rsconf_t* pConf; + instanceConf_t* root; + instanceConf_t* tail; + int io_threads; +}; +struct lstn_s { + struct lstn_s* next; + void* sock; + ruleset_t* pRuleset; +}; /* ---------------------------------------------------------------------------- * Static definitions/initializations. */ -static socket_info* s_socketInfo = NULL; -static size_t s_nitems = 0; -static prop_t * s_namep = NULL; +static modConfData_t* runModConf = NULL; +static struct lstn_s* lcnfRoot = NULL; +static struct lstn_s* lcnfLast = NULL; +static prop_t* s_namep = NULL; static zloop_t* s_zloop = NULL; -static int s_io_threads = 1; static zctx_t* s_context = NULL; -static ruleset_t* s_ruleset = NULL; static socket_type socketTypes[] = { - {"SUB", ZMQ_SUB }, - {"PULL", ZMQ_PULL }, - {"XSUB", ZMQ_XSUB } + {"SUB", ZMQ_SUB }, + {"PULL", ZMQ_PULL }, + {"ROUTER", ZMQ_ROUTER }, + {"XSUB", ZMQ_XSUB } }; static socket_action socketActions[] = { @@ -131,6 +153,48 @@ static socket_action socketActions[] = { {"CONNECT", ACTION_CONNECT}, }; +static struct cnfparamdescr modpdescr[] = { + { "ioThreads", eCmdHdlrInt, 0 }, +}; + +static struct cnfparamblk modpblk = { + CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr +}; + +static struct cnfparamdescr inppdescr[] = { + { "description", eCmdHdlrGetWord, 0 }, + { "sockType", eCmdHdlrGetWord, 0 }, + { "subscribe", eCmdHdlrGetWord, 0 }, + { "ruleset", eCmdHdlrGetWord, 0 }, + { "action", eCmdHdlrGetWord, 0 }, + { "sndHWM", eCmdHdlrInt, 0 }, + { "rcvHWM", eCmdHdlrInt, 0 }, + { "identity", eCmdHdlrGetWord, 0 }, + { "sndBuf", eCmdHdlrInt, 0 }, + { "rcvBuf", eCmdHdlrInt, 0 }, + { "linger", eCmdHdlrInt, 0 }, + { "backlog", eCmdHdlrInt, 0 }, + { "sndTimeout", eCmdHdlrInt, 0 }, + { "rcvTimeout", eCmdHdlrInt, 0 }, + { "maxMsgSize", eCmdHdlrInt, 0 }, + { "rate", eCmdHdlrInt, 0 }, + { "recoveryIVL", eCmdHdlrInt, 0 }, + { "multicastHops", eCmdHdlrInt, 0 }, + { "reconnectIVL", eCmdHdlrInt, 0 }, + { "reconnectIVLMax", eCmdHdlrInt, 0 }, + { "ipv4Only", eCmdHdlrInt, 0 }, + { "affinity", eCmdHdlrInt, 0 } +}; + +static struct cnfparamblk inppblk = { + CNFPARAMBLK_VERSION, + sizeof(inppdescr)/sizeof(struct cnfparamdescr), + inppdescr +}; + +#include "im-helper.h" /* must be included AFTER the type definitions! */ /* ---------------------------------------------------------------------------- * Helper functions @@ -179,15 +243,16 @@ static int getSocketAction(char* name) { } -static void setDefaults(socket_info* info) { - info->type = ZMQ_SUB; - info->action = ACTION_BIND; +static void setDefaults(instanceConf_t* info) { + info->type = -1; + info->action = -1; info->description = NULL; - info->sndHWM = 0; - info->rcvHWM = 0; + info->sndHWM = -1; + info->rcvHWM = -1; info->identity = NULL; info->subscriptions = NULL; - info->ruleset = NULL; + info->pszBindRuleset = NULL; + info->pBindRuleset = NULL; info->sndBuf = -1; info->rcvBuf = -1; info->linger = -1; @@ -202,93 +267,49 @@ static void setDefaults(socket_info* info) { info->reconnectIVLMax = -1; info->ipv4Only = -1; info->affinity = -1; - + info->next = NULL; }; - -/* The config string should look like: - * "action=AAA,type=TTT,description=DDD,sndHWM=SSS,rcvHWM=RRR,subscribe='xxx',subscribe='yyy'" - * +/* given a comma separated list of subscriptions, create a char* array of them + * to set later */ -static rsRetVal parseConfig(char* config, socket_info* info) { - int nsubs = 0; - - char* binding; - char* ptr1; - for (binding = strtok_r(config, ",", &ptr1); - binding != NULL; - binding = strtok_r(NULL, ",", &ptr1)) { - - /* Each binding looks like foo=bar */ - char * sep = strchr(binding, '='); - if (sep == NULL) - { - errmsg.LogError(0, NO_ERRCODE, - "Invalid argument format %s, ignoring ...", - binding); - continue; - } +static rsRetVal parseSubscriptions(char* subscribes, sublist** subList){ + char* tok = strtok(subscribes, ","); + sublist* currentSub; + sublist* head; + DEFiRet; - /* Replace '=' with '\0'. */ - *sep = '\0'; - - char * val = sep + 1; - - if (strcmp(binding, "action") == 0) { - info->action = getSocketAction(val); - } else if (strcmp(binding, "type") == 0) { - info->type = getSocketType(val); - } else if (strcmp(binding, "description") == 0) { - info->description = strdup(val); - } else if (strcmp(binding, "sndHWM") == 0) { - info->sndHWM = atoi(val); - } else if (strcmp(binding, "rcvHWM") == 0) { - info->sndHWM = atoi(val); - } else if (strcmp(binding, "subscribe") == 0) { - /* Add the subscription value to the list.*/ - char * substr = NULL; - substr = strdup(val); - info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1); - info->subscriptions[nsubs] = substr; - ++nsubs; - } else if (strcmp(binding, "sndBuf") == 0) { - info->sndBuf = atoi(val); - } else if (strcmp(binding, "rcvBuf") == 0) { - info->rcvBuf = atoi(val); - } else if (strcmp(binding, "linger") == 0) { - info->linger = atoi(val); - } else if (strcmp(binding, "backlog") == 0) { - info->backlog = atoi(val); - } else if (strcmp(binding, "sndTimeout") == 0) { - info->sndTimeout = atoi(val); - } else if (strcmp(binding, "rcvTimeout") == 0) { - info->rcvTimeout = atoi(val); - } else if (strcmp(binding, "maxMsgSize") == 0) { - info->maxMsgSize = atoi(val); - } else if (strcmp(binding, "rate") == 0) { - info->rate = atoi(val); - } else if (strcmp(binding, "recoveryIVL") == 0) { - info->recoveryIVL = atoi(val); - } else if (strcmp(binding, "multicastHops") == 0) { - info->multicastHops = atoi(val); - } else if (strcmp(binding, "reconnectIVL") == 0) { - info->reconnectIVL = atoi(val); - } else if (strcmp(binding, "reconnectIVLMax") == 0) { - info->reconnectIVLMax = atoi(val); - } else if (strcmp(binding, "ipv4Only") == 0) { - info->ipv4Only = atoi(val); - } else if (strcmp(binding, "affinity") == 0) { - info->affinity = atoi(val); - } else { - errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding); - return RS_RET_INVALID_PARAMS; + /* create empty list */ + CHKmalloc(*subList = (sublist*)MALLOC(sizeof(sublist))); + head = *subList; + head->next = NULL; + head->subscribe=NULL; + currentSub=head; + + if(tok) { + head->subscribe=strdup(tok); + for(tok=strtok(NULL, ","); tok!=NULL;tok=strtok(NULL, ",")) { + CHKmalloc(currentSub->next = (sublist*)MALLOC(sizeof(sublist))); + currentSub=currentSub->next; + currentSub->subscribe=strdup(tok); + currentSub->next=NULL; } + } else { + /* make empty subscription ie subscribe="" */ + head->subscribe=strdup(""); } - - return RS_RET_OK; + /* TODO: temporary logging */ + currentSub = head; + DBGPRINTF("imzmq3: Subscriptions:"); + for(currentSub = head; currentSub != NULL; currentSub=currentSub->next) { + DBGPRINTF("'%s'", currentSub->subscribe); + } + DBGPRINTF("\n"); +finalize_it: + RETiRet; } -static rsRetVal validateConfig(socket_info* info) { +static rsRetVal validateConfig(instanceConf_t* info) { if (info->type == -1) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, @@ -307,7 +328,7 @@ static rsRetVal validateConfig(socket_info* info) { } if(info->type == ZMQ_SUB && info->subscriptions == NULL) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, - "SUB sockets need at least one subscription"); + "SUB sockets need a subscription"); return RS_RET_INVALID_PARAMS; } if(info->type != ZMQ_SUB && info->subscriptions != NULL) { @@ -320,39 +341,40 @@ static rsRetVal validateConfig(socket_info* info) { static rsRetVal createContext() { if (s_context == NULL) { - errmsg.LogError(0, NO_ERRCODE, "creating zctx."); + DBGPRINTF("imzmq3: creating zctx..."); + zsys_handler_set(NULL); s_context = zctx_new(); if (s_context == NULL) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "zctx_new failed: %s", - strerror(errno)); + zmq_strerror(errno)); /* DK: really should do better than invalid params...*/ return RS_RET_INVALID_PARAMS; } - - if (s_io_threads > 1) { - errmsg.LogError(0, NO_ERRCODE, "setting io worker threads to %d", s_io_threads); - zctx_set_iothreads(s_context, s_io_threads); + DBGPRINTF("success!\n"); + if (runModConf->io_threads > 1) { + DBGPRINTF("setting io worker threads to %d\n", runModConf->io_threads); + zctx_set_iothreads(s_context, runModConf->io_threads); } } return RS_RET_OK; } -static rsRetVal createSocket(socket_info* info, void** sock) { - size_t ii; +static rsRetVal createSocket(instanceConf_t* info, void** sock) { int rv; + sublist* sub; *sock = zsocket_new(s_context, info->type); if (!sock) { - errmsg.LogError(0, + errmsg.LogError(0, RS_RET_INVALID_PARAMS, "zsocket_new failed: %s, for type %d", - strerror(errno),info->type); - /* DK: invalid params seems right here */ + zmq_strerror(errno),info->type); + /* DK: invalid params seems right here */ return RS_RET_INVALID_PARAMS; } - + DBGPRINTF("imzmq3: socket of type %d created successfully\n", info->type) /* Set options *before* the connect/bind. */ if (info->identity) zsocket_set_identity(*sock, info->identity); if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf); @@ -369,38 +391,36 @@ static rsRetVal createSocket(socket_info* info, void** sock) { if (info->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(*sock, info->reconnectIVLMax); if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only); if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity); - - /* since HWM have defaults, we always set them. No return codes to check, either.*/ - zsocket_set_sndhwm(*sock, info->sndHWM); - zsocket_set_rcvhwm(*sock, info->rcvHWM); - + if (info->sndHWM > -1 ) zsocket_set_sndhwm(*sock, info->sndHWM); + if (info->rcvHWM > -1 ) zsocket_set_rcvhwm(*sock, info->rcvHWM); /* Set subscriptions.*/ if (info->type == ZMQ_SUB) { - for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii) - zsocket_set_subscribe(*sock, info->subscriptions[ii]); + for(sub = info->subscriptions; sub!=NULL; sub=sub->next) { + zsocket_set_subscribe(*sock, sub->subscribe); + } } - - /* Do the bind/connect... */ if (info->action==ACTION_CONNECT) { rv = zsocket_connect(*sock, info->description); - if (rv < 0) { + if (rv == -1) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "zmq_connect using %s failed: %s", - info->description, strerror(errno)); + info->description, zmq_strerror(errno)); return RS_RET_INVALID_PARAMS; } + DBGPRINTF("imzmq3: connect for %s successful\n",info->description); } else { rv = zsocket_bind(*sock, info->description); - if (rv <= 0) { + if (rv == -1) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "zmq_bind using %s failed: %s", - info->description, strerror(errno)); + info->description, zmq_strerror(errno)); return RS_RET_INVALID_PARAMS; } + DBGPRINTF("imzmq3: bind for %s successful\n",info->description); } return RS_RET_OK; } @@ -409,89 +429,138 @@ static rsRetVal createSocket(socket_info* info, void** sock) { * Module endpoints */ -/* accept a new ruleset to bind. Checks if it exists and complains, if not. Note - * that this makes the assumption that after the bind ruleset is called in the config, - * another call will be made to add an endpoint. -*/ -static rsRetVal -set_ruleset(void __attribute__((unused)) *pVal, uchar *pszName) { - ruleset_t* ruleset_ptr; - rsRetVal localRet; - DEFiRet; - - localRet = ruleset.GetRuleset(ourConf, &ruleset_ptr, pszName); - if(localRet == RS_RET_NOT_FOUND) { - errmsg.LogError(0, NO_ERRCODE, "error: " - "ruleset '%s' not found - ignored", pszName); - } - CHKiRet(localRet); - s_ruleset = ruleset_ptr; - DBGPRINTF("imzmq3 current bind ruleset '%s'\n", pszName); - -finalize_it: - free(pszName); /* no longer needed */ - RETiRet; -} /* add an actual endpoint */ -static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) { +static rsRetVal createInstance(instanceConf_t** pinst) { DEFiRet; + instanceConf_t* inst; + CHKmalloc(inst = MALLOC(sizeof(instanceConf_t))); - /* increment number of items and store old num items, as it will be handy.*/ - size_t idx = s_nitems++; - - /* allocate a new socket_info array to accomidate this new endpoint*/ - socket_info* tmpSocketInfo; - CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems)); + /* set defaults into new instance config struct */ + setDefaults(inst); - /* copy existing socket_info across into new array, if any, and free old storage*/ - if(idx) { - memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx); - free(s_socketInfo); + /* add this to the config */ + if (runModConf->root == NULL || runModConf->tail == NULL) { + runModConf->tail = runModConf->root = inst; + } else { + runModConf->tail->next = inst; + runModConf->tail = inst; } + *pinst = inst; +finalize_it: + RETiRet; +} - /* set the static to hold the new array */ - s_socketInfo = tmpSocketInfo; - - /* point to the new one */ - socket_info* sockInfo = &s_socketInfo[idx]; - - /* set defaults for the new socket info */ - setDefaults(sockInfo); - - /* Make a writeable copy of the string so we can use strtok - in the parseConfig call */ - char * copy = NULL; - CHKmalloc(copy = strdup((char *) valp)); +static rsRetVal createListener(struct cnfparamvals* pvals) { + instanceConf_t* inst; + int i; + DEFiRet; - /* parse the config string */ - CHKiRet(parseConfig(copy, sockInfo)); + CHKiRet(createInstance(&inst)); + for(i = 0 ; i < inppblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(inppblk.descr[i].name, "ruleset")) { + inst->pszBindRuleset = (uchar *)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "description")) { + inst->description = es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "sockType")){ + inst->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if(!strcmp(inppblk.descr[i].name, "action")){ + inst->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if(!strcmp(inppblk.descr[i].name, "sndHWM")) { + inst->sndHWM = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "rcvHWM")) { + inst->rcvHWM = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "subscribe")) { + CHKiRet(parseSubscriptions(es_str2cstr(pvals[i].val.d.estr, NULL), + &inst->subscriptions)); + } else if(!strcmp(inppblk.descr[i].name, "identity")){ + inst->identity = es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "sndBuf")) { + inst->sndBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "rcvBuf")) { + inst->rcvBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "linger")) { + inst->linger = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "backlog")) { + inst->backlog = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "sndTimeout")) { + inst->sndTimeout = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "rcvTimeout")) { + inst->rcvTimeout = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "maxMsgSize")) { + inst->maxMsgSize = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "rate")) { + inst->rate = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "recoveryIVL")) { + inst->recoveryIVL = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "multicastHops")) { + inst->multicastHops = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "reconnectIVL")) { + inst->reconnectIVL = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "reconnectIVLMax")) { + inst->reconnectIVLMax = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "ipv4Only")) { + inst->ipv4Only = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "affinity")) { + inst->affinity = (int) pvals[i].val.d.n; + } else { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: program error, non-handled " + "param '%s'\n", inppblk.descr[i].name); + } - /* validate it */ - CHKiRet(validateConfig(sockInfo)); + } +finalize_it: + RETiRet; +} + +static rsRetVal addListener(instanceConf_t* inst){ + /* create the socket */ + void* sock; + struct lstn_s* newcnfinfo; + DEFiRet; - /* bind to the current ruleset (if any)*/ - sockInfo->ruleset = s_ruleset; + CHKiRet(createSocket(inst, &sock)); + + /* now create new lstn_s struct */ + CHKmalloc(newcnfinfo=(struct lstn_s*)MALLOC(sizeof(struct lstn_s))); + newcnfinfo->next = NULL; + newcnfinfo->sock = sock; + newcnfinfo->pRuleset = inst->pBindRuleset; + /* add this struct to the global */ + if(lcnfRoot == NULL) { + lcnfRoot = newcnfinfo; + } + if(lcnfLast == NULL) { + lcnfLast = newcnfinfo; + } else { + lcnfLast->next = newcnfinfo; + lcnfLast = newcnfinfo; + } + finalize_it: - free(valp); /* in any case, this is no longer needed */ - RETiRet; + RETiRet; } - static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *poller, void* pd) { - msg_t* logmsg; + msg_t* pMsg; poller_data* pollerData = (poller_data*)pd; char* buf = zstr_recv(poller->socket); - if (msgConstruct(&logmsg) == RS_RET_OK) { - MsgSetRawMsg(logmsg, buf, strlen(buf)); - MsgSetInputName(logmsg, s_namep); - MsgSetFlowControlType(logmsg, eFLOWCTL_NO_DELAY); - MsgSetRuleset(logmsg, pollerData->ruleset); - logmsg->msgFlags = NEEDS_PARSING; - submitMsg(logmsg); + if (msgConstruct(&pMsg) == RS_RET_OK) { + MsgSetRawMsg(pMsg, buf, strlen(buf)); + MsgSetInputName(pMsg, s_namep); + MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())); + MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp()); + MsgSetRcvFromIP(pMsg, glbl.GetLocalHostIP()); + MsgSetMSGoffs(pMsg, 0); + MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); + MsgSetRuleset(pMsg, pollerData->ruleset); + pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; + submitMsg2(pMsg); } /* gotta free the string returned from zstr_recv() */ @@ -510,51 +579,65 @@ static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *po /* called when runInput is called by rsyslog */ static rsRetVal rcv_loop(thrdInfo_t* pThrd){ + size_t n_items = 0; size_t i; int rv; - zmq_pollitem_t* items; - poller_data* pollerData; - + zmq_pollitem_t* items = NULL; + poller_data* pollerData = NULL; + struct lstn_s* current; + instanceConf_t* inst; DEFiRet; - - /* create the context*/ - CHKiRet(createContext()); + /* now add listeners. This actually creates the sockets, etc... */ + for (inst = runModConf->root; inst != NULL; inst=inst->next) { + addListener(inst); + } + if (lcnfRoot == NULL) { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: no listeners were " + "started, input not activated.\n"); + ABORT_FINALIZE(RS_RET_NO_RUN); + } + + /* count the # of items first */ + for(current=lcnfRoot;current!=NULL;current=current->next) + n_items++; + + /* make arrays of pollitems, pollerdata so they are easy to delete later */ + /* create the poll items*/ - CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems)); + CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*n_items)); /* create poller data (stuff to pass into the zmq closure called when we get a message)*/ - CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems)); + CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*n_items)); /* loop through and initialize the poll items and poller_data arrays...*/ - for(i=0; i<s_nitems;++i) { + for(i=0, current = lcnfRoot; current != NULL; current = current->next, i++) { /* create the socket, update items.*/ - createSocket(&s_socketInfo[i], &items[i].socket); + items[i].socket=current->sock; items[i].events = ZMQ_POLLIN; /* now update the poller_data for this item */ pollerData[i].thread = pThrd; - pollerData[i].ruleset = s_socketInfo[i].ruleset; + pollerData[i].ruleset = current->pRuleset; } - + s_zloop = zloop_new(); - for(i=0; i<s_nitems; ++i) { + for(i=0; i<n_items; ++i) { rv = zloop_poller(s_zloop, &items[i], handlePoll, &pollerData[i]); if (rv) { - errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu", i); + errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu: %s", i, zmq_strerror(errno)); } } + DBGPRINTF("imzmq3: zloop_poller starting..."); zloop_start(s_zloop); zloop_destroy(&s_zloop); - finalize_it: - for(i=0; i< s_nitems; ++i) { - zsocket_destroy(s_context, items[i].socket); - } - + DBGPRINTF("imzmq3: zloop_poller stopped."); +finalize_it: zctx_destroy(&s_context); free(items); + free(pollerData); RETiRet; } @@ -564,7 +647,8 @@ static rsRetVal rcv_loop(thrdInfo_t* pThrd){ BEGINrunInput CODESTARTrunInput - iRet = rcv_loop(pThrd); + CHKiRet(rcv_loop(pThrd)); +finalize_it: RETiRet; ENDrunInput @@ -572,17 +656,13 @@ ENDrunInput /* initialize and return if will run or not */ BEGINwillRun CODESTARTwillRun - /* we need to create the inputName property (only once during our + /* we need to create the inputName property (only once during our lifetime) */ - CHKiRet(prop.Construct(&s_namep)); - CHKiRet(prop.SetString(s_namep, + CHKiRet(prop.Construct(&s_namep)); + CHKiRet(prop.SetString(s_namep, UCHAR_CONSTANT("imzmq3"), sizeof("imzmq3") - 1)); - CHKiRet(prop.ConstructFinalize(s_namep)); - -/* If there are no endpoints this is pointless ...*/ - if (s_nitems == 0) - ABORT_FINALIZE(RS_RET_NO_RUN); + CHKiRet(prop.ConstructFinalize(s_namep)); finalize_it: ENDwillRun @@ -590,70 +670,207 @@ ENDwillRun BEGINafterRun CODESTARTafterRun - /* do cleanup here */ - if(s_namep != NULL) - prop.Destruct(&s_namep); + /* do cleanup here */ + if (s_namep != NULL) + prop.Destruct(&s_namep); ENDafterRun BEGINmodExit CODESTARTmodExit - /* release what we no longer need */ - objRelease(errmsg, CORE_COMPONENT); - objRelease(glbl, CORE_COMPONENT); - objRelease(prop, CORE_COMPONENT); + /* release what we no longer need */ + objRelease(errmsg, CORE_COMPONENT); + objRelease(glbl, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); objRelease(ruleset, CORE_COMPONENT); ENDmodExit BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature - if(eFeat == sFEATURENonCancelInputTermination) - iRet = RS_RET_OK; + if (eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; ENDisCompatibleWithFeature +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + /* After endCnfLoad() (BEGINendCnfLoad...ENDendCnfLoad) is called, + * the pModConf pointer must not be used to change the in-memory + * config object. It's safe to use the same pointer for accessing + * the config object until freeCnf() (BEGINfreeCnf...ENDfreeCnf). */ + runModConf = pModConf; + runModConf->pConf = pConf; + /* init module config */ + runModConf->io_threads = 0; /* 0 means don't set it */ +ENDbeginCnfLoad + + +BEGINsetModCnf + struct cnfparamvals* pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if (NULL == pvals) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "imzmq3: error processing module " + " config parameters ['module(...)']"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + for (i=0; i < modpblk.nParams; ++i) { + if (!pvals[i].bUsed) + continue; + if (!strcmp(modpblk.descr[i].name, "ioThreads")) { + runModConf->io_threads = (int)pvals[i].val.d.n; + } else { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "imzmq3: config error, unknown " + "param %s in setModCnf\n", + modpblk.descr[i].name); + } + } + +finalize_it: + if (pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + +BEGINendCnfLoad +CODESTARTendCnfLoad + /* Last chance to make changes to the in-memory config object for this + * input module. After this call, the config object must no longer be + * changed. */ + if (pModConf != runModConf) { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has " + "changed - pModConf=%p, runModConf=%p", pModConf, runModConf); + } + assert(pModConf == runModConf); +ENDendCnfLoad + + +/* function to generate error message if framework does not find requested ruleset */ +static inline void +std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, instanceConf_t *inst) +{ + errmsg.LogError(0, NO_ERRCODE, "imzmq3: ruleset '%s' for socket %s not found - " + "using default ruleset instead", inst->pszBindRuleset, + inst->description); +} + + +BEGINcheckCnf +instanceConf_t* inst; +CODESTARTcheckCnf + for(inst = pModConf->root; inst!=NULL; inst=inst->next) { + std_checkRuleset(pModConf, inst); + /* now, validate the instanceConf */ + CHKiRet(validateConfig(inst)); + } +finalize_it: + RETiRet; +ENDcheckCnf + + +BEGINactivateCnfPrePrivDrop +CODESTARTactivateCnfPrePrivDrop + if (pModConf != runModConf) { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has " + "changed - pModConf=%p, runModConf=%p", pModConf, runModConf); + } + assert(pModConf == runModConf); + + /* first create the context */ + createContext(); + + /* could setup context here, and set the global worker threads + and so on... */ +ENDactivateCnfPrePrivDrop + + +BEGINactivateCnf +CODESTARTactivateCnf + if (pModConf != runModConf) { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has " + "changed - pModConf=%p, runModConf=%p", pModConf, runModConf); + } + assert(pModConf == runModConf); +ENDactivateCnf + + +BEGINfreeCnf + struct lstn_s *lstn, *lstn_r; + instanceConf_t *inst, *inst_r; + sublist *sub, *sub_r; +CODESTARTfreeCnf + DBGPRINTF("imzmq3: BEGINfreeCnf ...\n"); + if (pModConf != runModConf) { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has " + "changed - pModConf=%p, runModConf=%p", pModConf, runModConf); + } + for (lstn = lcnfRoot; lstn != NULL; ) { + lstn_r = lstn; + lstn = lstn_r->next; + free(lstn_r); + } + for (inst = pModConf->root ; inst != NULL ; ) { + for (sub = inst->subscriptions; sub != NULL; ) { + free(sub->subscribe); + sub_r = sub; + sub = sub_r->next; + free(sub_r); + } + free(inst->pszBindRuleset); + inst_r = inst; + inst = inst->next; + free(inst_r); + } +ENDfreeCnf + + +BEGINnewInpInst + struct cnfparamvals* pvals; +CODESTARTnewInpInst + + DBGPRINTF("newInpInst (imzmq3)\n"); + pvals = nvlstGetParams(lst, &inppblk, NULL); + if(NULL==pvals) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, + "imzmq3: required parameters are missing\n"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + DBGPRINTF("imzmq3: input param blk:\n"); + cnfparamsPrint(&inppblk, pvals); + + /* now, parse the config params and so on... */ + CHKiRet(createListener(pvals)); + +finalize_it: +CODE_STD_FINALIZERnewInpInst + cnfparamvalsDestruct(pvals, &inppblk); +ENDnewInpInst + + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES +CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES +CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt -static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, - void __attribute__((unused)) *pVal) { - return RS_RET_OK; -} -static rsRetVal setGlobalWorkerThreads(uchar __attribute__((unused)) *pp, int val) { - errmsg.LogError(0, NO_ERRCODE, "setGlobalWorkerThreads called with %d",val); - s_io_threads = val; - return RS_RET_OK; -} BEGINmodInit() CODESTARTmodInit /* we only support the current interface specification */ - *ipIFVersProvided = CURR_MOD_IF_VERSION; + *ipIFVersProvided = CURR_MOD_IF_VERSION; CODEmodInit_QueryRegCFSLineHdlr - CHKiRet(objUse(errmsg, CORE_COMPONENT)); - CHKiRet(objUse(glbl, CORE_COMPONENT)); - CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); - - /* register config file handlers */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverbindruleset", - 0, eCmdHdlrGetWord, - set_ruleset, NULL, - STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverrun", - 0, eCmdHdlrGetWord, - add_endpoint, NULL, - STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", - 1, eCmdHdlrCustomHandler, - resetConfigVariables, NULL, - STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3globalWorkerThreads", - 1, eCmdHdlrInt, - setGlobalWorkerThreads, NULL, - STD_LOADABLE_MODULE_ID)); ENDmodInit + + diff --git a/plugins/mmaudit/Makefile.am b/plugins/mmaudit/Makefile.am index c64d0822..77b2e85f 100644 --- a/plugins/mmaudit/Makefile.am +++ b/plugins/mmaudit/Makefile.am @@ -1,8 +1,8 @@ pkglib_LTLIBRARIES = mmaudit.la mmaudit_la_SOURCES = mmaudit.c -mmaudit_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(LIBLOGNORM_CFLAGS) $(LIBEE_CFLAGS) -mmaudit_la_LDFLAGS = -module -avoid-version $(LIBLOGNORM_LIBS) $(LIBEE_LIBS) +mmaudit_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) +mmaudit_la_LDFLAGS = -module -avoid-version mmaudit_la_LIBADD = EXTRA_DIST = diff --git a/plugins/mmaudit/mmaudit.c b/plugins/mmaudit/mmaudit.c index 018e1771..6b6b804c 100644 --- a/plugins/mmaudit/mmaudit.c +++ b/plugins/mmaudit/mmaudit.c @@ -43,8 +43,7 @@ #include <errno.h> #include <unistd.h> #include <ctype.h> -#include <libestr.h> -#include <libee/libee.h> +#include <json/json.h> #include "conf.h" #include "syslogd-types.h" #include "template.h" diff --git a/plugins/mmcount/Makefile.am b/plugins/mmcount/Makefile.am new file mode 100644 index 00000000..9c8c99db --- /dev/null +++ b/plugins/mmcount/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = mmcount.la + +mmcount_la_SOURCES = mmcount.c +mmcount_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) +mmcount_la_LDFLAGS = -module -avoid-version +mmcount_la_LIBADD = + +EXTRA_DIST = diff --git a/plugins/mmcount/mmcount.c b/plugins/mmcount/mmcount.c new file mode 100644 index 00000000..56a4de55 --- /dev/null +++ b/plugins/mmcount/mmcount.c @@ -0,0 +1,342 @@ +/* mmcount.c + * count messages by priority or json property of given app-name. + * + * Copyright 2013 Red Hat Inc. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <unistd.h> +#include <stdint.h> +#include <json/json.h> +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "hashtable.h" + +#define JSON_COUNT_NAME "!mmcount" +#define SEVERITY_COUNT 8 + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("mmcount") + + +DEFobjCurrIf(errmsg); +DEF_OMOD_STATIC_DATA + +/* config variables */ + +typedef struct _instanceData { + char *pszAppName; + int severity[SEVERITY_COUNT]; + char *pszKey; + char *pszValue; + int valueCounter; + struct hashtable *ht; +} instanceData; + +struct modConfData_s { + rsconf_t *pConf; /* our overall config object */ +}; +static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ +static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */ + + +/* tables for interfacing with the v6 config system */ +/* action (instance) parameters */ +static struct cnfparamdescr actpdescr[] = { + { "appname", eCmdHdlrGetWord, 0 }, + { "key", eCmdHdlrGetWord, 0 }, + { "value", eCmdHdlrGetWord, 0 }, +}; +static struct cnfparamblk actpblk = + { CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr + }; + +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; +ENDbeginCnfLoad + +BEGINendCnfLoad +CODESTARTendCnfLoad +ENDendCnfLoad + +BEGINcheckCnf +CODESTARTcheckCnf +ENDcheckCnf + +BEGINactivateCnf +CODESTARTactivateCnf + runModConf = pModConf; +ENDactivateCnf + +BEGINfreeCnf +CODESTARTfreeCnf +ENDfreeCnf + + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature +ENDisCompatibleWithFeature + + +BEGINfreeInstance +CODESTARTfreeInstance +ENDfreeInstance + + +static inline void +setInstParamDefaults(instanceData *pData) +{ + int i; + + pData->pszAppName = NULL; + for (i = 0; i < SEVERITY_COUNT; i++) + pData->severity[i] = 0; + pData->pszKey = NULL; + pData->pszValue = NULL; + pData->valueCounter = 0; + pData->ht = NULL; +} + +static unsigned int +hash_from_key_fn(void *k) +{ + return *(unsigned int *)k; +} + +static int +key_equals_fn(void *k1, void *k2) +{ + return (*(unsigned int *)k1 == *(unsigned int *)k2); +} + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; +CODESTARTnewActInst + DBGPRINTF("newActInst (mmcount)\n"); + if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + CODE_STD_STRING_REQUESTnewActInst(1) + CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); + + for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "appname")) { + pData->pszAppName = es_str2cstr(pvals[i].val.d.estr, NULL); + continue; + } + if(!strcmp(actpblk.descr[i].name, "key")) { + pData->pszKey = es_str2cstr(pvals[i].val.d.estr, NULL); + continue; + } + if(!strcmp(actpblk.descr[i].name, "value")) { + pData->pszValue = es_str2cstr(pvals[i].val.d.estr, NULL); + continue; + } + dbgprintf("mmcount: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + + if(pData->pszAppName == NULL) { + dbgprintf("mmcount: action requires a appname"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(pData->pszKey != NULL && pData->pszValue == NULL) { + if(NULL == (pData->ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL))) { + DBGPRINTF("mmcount: error creating hash table!\n"); + ABORT_FINALIZE(RS_RET_ERR); + } + } +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); +ENDnewActInst + + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + + +BEGINtryResume +CODESTARTtryResume +ENDtryResume + +static int * +getCounter(struct hashtable *ht, char *str) { + unsigned int key; + int *pCounter; + unsigned int *pKey; + + /* we dont store str as key, instead we store hash of the str + as key to reduce memory usage */ + key = hash_from_string(str); + pCounter = hashtable_search(ht, &key); + if(pCounter) { + return pCounter; + } + + /* counter is not found for the str, so add new entry and + return the counter */ + if(NULL == (pKey = (unsigned int*)malloc(sizeof(unsigned int)))) { + DBGPRINTF("mmcount: memory allocation for key failed\n"); + return NULL; + } + *pKey = key; + + if(NULL == (pCounter = (int*)malloc(sizeof(int)))) { + DBGPRINTF("mmcount: memory allocation for value failed\n"); + free(pKey); + return NULL; + } + *pCounter = 0; + + if(!hashtable_insert(ht, pKey, pCounter)) { + DBGPRINTF("mmcount: inserting element into hashtable failed\n"); + free(pKey); + free(pCounter); + return NULL; + } + return pCounter; +} + +BEGINdoAction + msg_t *pMsg; + char *appname; + struct json_object *json = NULL; + es_str_t *estr = NULL; + struct json_object *keyjson = NULL; + char *pszValue; + int *pCounter; +CODESTARTdoAction + pMsg = (msg_t*) ppString[0]; + appname = getAPPNAME(pMsg, LOCK_MUTEX); + + if(0 != strcmp(appname, pData->pszAppName)) { + /* we are not working for this appname. nothing to do */ + ABORT_FINALIZE(RS_RET_OK); + } + + if(!pData->pszKey) { + /* no key given for count, so we count severity */ + if(pMsg->iSeverity <= SEVERITY_COUNT) { + pData->severity[pMsg->iSeverity]++; + json = json_object_new_int(pData->severity[pMsg->iSeverity]); + } + ABORT_FINALIZE(RS_RET_OK); + } + + /* key is given, so get the property json */ + estr = es_newStrFromBuf(pData->pszKey, strlen(pData->pszKey)); + if(msgGetCEEPropJSON(pMsg, estr, &keyjson) != RS_RET_OK) { + /* key not found in the message. nothing to do */ + ABORT_FINALIZE(RS_RET_OK); + } + + /* key found, so get the value */ + pszValue = (char*)json_object_get_string(keyjson); + + if(pData->pszValue) { + /* value also given for count */ + if(!strcmp(pszValue, pData->pszValue)) { + /* count for (value and key and appname) matched */ + pData->valueCounter++; + json = json_object_new_int(pData->valueCounter); + } + ABORT_FINALIZE(RS_RET_OK); + } + + /* value is not given, so we count for each value of given key */ + pCounter = getCounter(pData->ht, pszValue); + if(pCounter) { + (*pCounter)++; + json = json_object_new_int(*pCounter); + } +finalize_it: + if(estr) { + es_deleteStr(estr); + } + + if(json) { + msgAddJSON(pMsg, (uchar *)JSON_COUNT_NAME, json); + } +ENDdoAction + + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(strncmp((char*) p, ":mmcount:", sizeof(":mmcount:") - 1)) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "mmcount supports only v6+ config format, use: " + "action(type=\"mmcount\" ...)"); + } + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + + +BEGINmodExit +CODESTARTmodExit + objRelease(errmsg, CORE_COMPONENT); +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_QUERIES +ENDqueryEtryPt + + + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + DBGPRINTF("mmcount: module compiled with rsyslog version %s.\n", VERSION); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); +ENDmodInit diff --git a/plugins/mmfields/Makefile.am b/plugins/mmfields/Makefile.am new file mode 100644 index 00000000..08170d52 --- /dev/null +++ b/plugins/mmfields/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = mmfields.la + +mmfields_la_SOURCES = mmfields.c +mmfields_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) +mmfields_la_LDFLAGS = -module -avoid-version +mmfields_la_LIBADD = + +EXTRA_DIST = diff --git a/plugins/mmfields/mmfields.c b/plugins/mmfields/mmfields.c new file mode 100644 index 00000000..99c78916 --- /dev/null +++ b/plugins/mmfields/mmfields.c @@ -0,0 +1,265 @@ +/* mmfields.c + * Parse all fields of the message into structured data inside the + * JSON tree. + * + * Copyright 2013 Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <unistd.h> +#include <stdint.h> +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("mmfields") + + +DEFobjCurrIf(errmsg); +DEF_OMOD_STATIC_DATA + +/* config variables */ + +/* define operation modes we have */ +#define SIMPLE_MODE 0 /* just overwrite */ +#define REWRITE_MODE 1 /* rewrite IP address, canoninized */ +typedef struct _instanceData { + char separator; +} instanceData; + +struct modConfData_s { + rsconf_t *pConf; /* our overall config object */ +}; +static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ +static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */ + + +/* tables for interfacing with the v6 config system */ +/* action (instance) parameters */ +static struct cnfparamdescr actpdescr[] = { + { "separator", eCmdHdlrGetChar, 0 } +}; +static struct cnfparamblk actpblk = + { CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr + }; + +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; +ENDbeginCnfLoad + +BEGINendCnfLoad +CODESTARTendCnfLoad +ENDendCnfLoad + +BEGINcheckCnf +CODESTARTcheckCnf +ENDcheckCnf + +BEGINactivateCnf +CODESTARTactivateCnf + runModConf = pModConf; +ENDactivateCnf + +BEGINfreeCnf +CODESTARTfreeCnf +ENDfreeCnf + + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature +ENDisCompatibleWithFeature + + +BEGINfreeInstance +CODESTARTfreeInstance +ENDfreeInstance + + +static inline void +setInstParamDefaults(instanceData *pData) +{ + pData->separator = ','; +} + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; +CODESTARTnewActInst + DBGPRINTF("newActInst (mmfields)\n"); + if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + CODE_STD_STRING_REQUESTnewActInst(1) + CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); + + for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "separator")) { + pData->separator = es_getBufAddr(pvals[i].val.d.estr)[0]; + } else { + dbgprintf("mmfields: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + } + +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); +ENDnewActInst + + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + + +BEGINtryResume +CODESTARTtryResume +ENDtryResume + + +static inline rsRetVal +extractField(instanceData *pData, uchar *msgtext, int lenMsg, int *curridx, uchar *fieldbuf) +{ + int i, j; + DEFiRet; + i = *curridx; + j = 0; + while(i < lenMsg && msgtext[i] != pData->separator) { + fieldbuf[j++] = msgtext[i++]; + } + fieldbuf[j] = '\0'; + if(i < lenMsg) + ++i; + *curridx = i; + + RETiRet; +} + + +static inline rsRetVal +parse_fields(instanceData *pData, msg_t *pMsg, uchar *msgtext, int lenMsg) +{ + uchar fieldbuf[32*1024]; + uchar fieldname[512]; + struct json_object *json; + struct json_object *jval; + int field; + uchar *buf; + int currIdx = 0; + DEFiRet; + + if(lenMsg < (int) sizeof(fieldbuf)) { + buf = fieldbuf; + } else { + CHKmalloc(buf = malloc(lenMsg+1)); + } + + json = json_object_new_object(); + if(json == NULL) { + ABORT_FINALIZE(RS_RET_ERR); + } + field = 1; + while(currIdx < lenMsg) { + CHKiRet(extractField(pData, msgtext, lenMsg, &currIdx, buf)); + DBGPRINTF("mmfields: field %d: '%s'\n", field, buf); + snprintf(fieldname, sizeof(fieldname), "f%d", (char*)field); + fieldname[sizeof(fieldname)-1] = '\0'; + jval = json_object_new_string((char*)fieldbuf); + json_object_object_add(json, (char*)fieldname, jval); + field++; + } + msgAddJSON(pMsg, (uchar*)"!", json); +finalize_it: + RETiRet; +} + + +BEGINdoAction + msg_t *pMsg; + uchar *msg; + int lenMsg; +CODESTARTdoAction + pMsg = (msg_t*) ppString[0]; + lenMsg = getMSGLen(pMsg); + msg = getMSG(pMsg); + CHKiRet(parse_fields(pData, pMsg, msg, lenMsg)); +finalize_it: +ENDdoAction + + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(strncmp((char*) p, ":mmfields:", sizeof(":mmfields:") - 1)) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "mmfields supports only v6+ config format, use: " + "action(type=\"mmfields\" ...)"); + } + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + + +BEGINmodExit +CODESTARTmodExit + objRelease(errmsg, CORE_COMPONENT); +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_QUERIES +ENDqueryEtryPt + + + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + DBGPRINTF("mmfields: module compiled with rsyslog version %s.\n", VERSION); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); +ENDmodInit diff --git a/plugins/mmjsonparse/Makefile.am b/plugins/mmjsonparse/Makefile.am index 5175fe81..ef39163e 100644 --- a/plugins/mmjsonparse/Makefile.am +++ b/plugins/mmjsonparse/Makefile.am @@ -1,8 +1,8 @@ pkglib_LTLIBRARIES = mmjsonparse.la mmjsonparse_la_SOURCES = mmjsonparse.c -mmjsonparse_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(LIBLOGNORM_CFLAGS) $(LIBEE_CFLAGS) -mmjsonparse_la_LDFLAGS = -module -avoid-version $(LIBLOGNORM_LIBS) $(LIBEE_LIBS) +mmjsonparse_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) +mmjsonparse_la_LDFLAGS = -module -avoid-version mmjsonparse_la_LIBADD = EXTRA_DIST = diff --git a/plugins/mmjsonparse/mmjsonparse.c b/plugins/mmjsonparse/mmjsonparse.c index c47aceb6..35f69aab 100644 --- a/plugins/mmjsonparse/mmjsonparse.c +++ b/plugins/mmjsonparse/mmjsonparse.c @@ -35,7 +35,6 @@ #include <errno.h> #include <unistd.h> #include <ctype.h> -#include <libestr.h> #include <json/json.h> #include "conf.h" #include "syslogd-types.h" diff --git a/plugins/omjournal/omjournal.c b/plugins/omjournal/omjournal.c index c340287f..160c369d 100644 --- a/plugins/omjournal/omjournal.c +++ b/plugins/omjournal/omjournal.c @@ -107,6 +107,7 @@ CODESTARTnewActInst * the lst ptr. However, we will most probably need params in the * future. */ + (void) lst; /* prevent compiler warning */ DBGPRINTF("newActInst (mmjournal)\n"); CODE_STD_STRING_REQUESTnewActInst(1) CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); @@ -145,6 +146,7 @@ CODESTARTdoAction "SYSLOG_IDENTIFIER=%s", tag, NULL); /* FIXME: think about what to do with errors ;) */ + (void) r; /* prevent compiler warning */ ENDdoAction diff --git a/plugins/omrelp/omrelp.c b/plugins/omrelp/omrelp.c index c9e32444..50f6f905 100644 --- a/plugins/omrelp/omrelp.c +++ b/plugins/omrelp/omrelp.c @@ -55,16 +55,22 @@ DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) +#define DFLT_ENABLE_TLS 0 +#define DFLT_ENABLE_TLSZIP 0 + static relpEngine_t *pRelpEngine; /* our relp engine */ typedef struct _instanceData { uchar *target; - int compressionLevel; /* 0 - no compression, else level for zlib */ uchar *port; int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */ int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */ unsigned timeout; + unsigned rebindInterval; + unsigned nSent; relpClt_t *pRelpClt; /* relp client for this instance */ + sbool bEnableTLS; + sbool bEnableTLSZip; uchar *tplName; } instanceData; @@ -78,7 +84,10 @@ static configSettings_t __attribute__((unused)) cs; /* action (instance) parameters */ static struct cnfparamdescr actpdescr[] = { { "target", eCmdHdlrGetWord, 1 }, + { "tls", eCmdHdlrBinary, 0 }, + { "tls.compression", eCmdHdlrBinary, 0 }, { "port", eCmdHdlrGetWord, 0 }, + { "rebindinterval", eCmdHdlrInt, 0 }, { "timeout", eCmdHdlrInt, 0 }, { "template", eCmdHdlrGetWord, 1 } }; @@ -113,6 +122,20 @@ doCreateRelpClient(instanceData *pData) ABORT_FINALIZE(RS_RET_RELP_ERR); if(relpCltSetTimeout(pData->pRelpClt, pData->timeout) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); + if(pData->bEnableTLS) { + if(relpCltEnableTLS(pData->pRelpClt) != RELP_RET_OK) + ABORT_FINALIZE(RS_RET_RELP_ERR); + if(pData->bEnableTLSZip) { + if(relpCltEnableTLSZip(pData->pRelpClt) != RELP_RET_OK) + ABORT_FINALIZE(RS_RET_RELP_ERR); + } + } + if(glbl.GetSourceIPofLocalClient() == NULL) { /* ar Do we have a client IP set? */ + if(relpCltSetClientIP(pData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK) + ABORT_FINALIZE(RS_RET_RELP_ERR); + } + pData->bInitialConnect = 1; + pData->nSent = 0; finalize_it: RETiRet; } @@ -120,7 +143,10 @@ finalize_it: BEGINcreateInstance CODESTARTcreateInstance - pData->bInitialConnect = 1; + pData->timeout = 90; + pData->rebindInterval = 0; + pData->bEnableTLS = DFLT_ENABLE_TLS; + pData->bEnableTLSZip = DFLT_ENABLE_TLSZIP; ENDcreateInstance BEGINfreeInstance @@ -139,6 +165,9 @@ setInstParamDefaults(instanceData *pData) pData->port = NULL; pData->tplName = NULL; pData->timeout = 90; + pData->rebindInterval = 0; + pData->bEnableTLS = DFLT_ENABLE_TLS; + pData->bEnableTLSZip = DFLT_ENABLE_TLSZIP; } @@ -164,6 +193,12 @@ CODESTARTnewActInst pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "timeout")) { pData->timeout = (unsigned) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rebindinterval")) { + pData->rebindInterval = (unsigned) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "tls")) { + pData->bEnableTLS = (unsigned) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "tls.compression")) { + pData->bEnableTLSZip = (unsigned) pvals[i].val.d.n; } else { dbgprintf("omrelp: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); @@ -232,6 +267,17 @@ CODESTARTtryResume iRet = doConnect(pData); ENDtryResume +static inline rsRetVal +doRebind(instanceData *pData) +{ + DEFiRet; + DBGPRINTF("omrelp: destructing relp client due to rebindInterval\n"); + CHKiRet(relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt)); + pData->bIsConnected = 0; + CHKiRet(doCreateRelpClient(pData)); +finalize_it: + RETiRet; +} BEGINdoAction uchar *pMsg; /* temporary buffering */ @@ -247,7 +293,7 @@ CODESTARTdoAction pMsg = ppString[0]; lenMsg = strlen((char*) pMsg); /* TODO: don't we get this? */ - /* TODO: think about handling oversize messages! */ + /* we need to truncate oversize msgs - no way around that... */ if((int) lenMsg > glbl.GetMaxLine()) lenMsg = glbl.GetMaxLine(); @@ -256,9 +302,13 @@ CODESTARTdoAction if(ret != RELP_RET_OK) { /* error! */ dbgprintf("error forwarding via relp, suspending\n"); - iRet = RS_RET_SUSPENDED; + ABORT_FINALIZE(RS_RET_SUSPENDED); } + if(pData->rebindInterval != 0 && + (++pData->nSent >= pData->rebindInterval)) { + doRebind(pData); + } finalize_it: ENDdoAction @@ -279,62 +329,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) if((iRet = createInstance(&pData)) != RS_RET_OK) FINALIZE; - /* we are now after the protocol indicator. Now check if we should - * use compression. We begin to use a new option format for this: - * @(option,option)host:port - * The first option defined is "z[0..9]" where the digit indicates - * the compression level. If it is not given, 9 (best compression) is - * assumed. An example action statement might be: - * :omrelp:(z5,o)127.0.0.1:1400 - * Which means send via TCP with medium (5) compresion (z) to the local - * host on port 1400. The '0' option means that octet-couting (as in - * IETF I-D syslog-transport-tls) is to be used for framing (this option - * applies to TCP-based syslog only and is ignored when specified with UDP). - * That is not yet implemented. - * rgerhards, 2006-12-07 - * TODO: think of all this in spite of RELP -- rgerhards, 2008-03-13 - */ - if(*p == '(') { - /* at this position, it *must* be an option indicator */ - do { - ++p; /* eat '(' or ',' (depending on when called) */ - /* check options */ - if(*p == 'z') { /* compression */ -# ifdef USE_NETZIP - ++p; /* eat */ - if(isdigit((int) *p)) { - int iLevel; - iLevel = *p - '0'; - ++p; /* eat */ - pData->compressionLevel = iLevel; - } else { - errmsg.LogError(0, NO_ERRCODE, "Invalid compression level '%c' specified in " - "forwardig action - NOT turning on compression.", - *p); - } -# else - errmsg.LogError(0, NO_ERRCODE, "Compression requested, but rsyslogd is not compiled " - "with compression support - request ignored."); -# endif /* #ifdef USE_NETZIP */ - } else { /* invalid option! Just skip it... */ - errmsg.LogError(0, NO_ERRCODE, "Invalid option %c in forwarding action - ignoring.", *p); - ++p; /* eat invalid option */ - } - /* the option processing is done. We now do a generic skip - * to either the next option or the end of the option - * block. - */ - while(*p && *p != ')' && *p != ',') - ++p; /* just skip it */ - } while(*p && *p == ','); /* Attention: do.. while() */ - if(*p == ')') - ++p; /* eat terminator, on to next */ - else - /* we probably have end of string - leave it for the rest - * of the code to handle it (but warn the user) - */ - errmsg.LogError(0, NO_ERRCODE, "Option block not terminated in forwarding action."); - } /* extract the host first (we do a trick - we replace the ';' or ':' with a '\0') * now skip to port and then template name. rgerhards 2005-07-06 */ @@ -384,7 +378,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) ++p; } - /* TODO: make this if go away! */ if(*p == ';') { *p = '\0'; /* trick to obtain hostname (later)! */ CHKmalloc(pData->target = ustrdup(q)); diff --git a/plugins/omzmq3/README b/plugins/omzmq3/README index ccc96c74..c2a33555 100644 --- a/plugins/omzmq3/README +++ b/plugins/omzmq3/README @@ -1,16 +1,10 @@ ZeroMQ 3.x Output Plugin Building this plugin: -Requires libzmq and libczmq. First, install libzmq from the HEAD on github: -http://github.com/zeromq/libzmq. You can clone the repository, build, then -install it. The directions for doing so are there in the readme. Then, do -the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1 -version of libzmq will be released, and a supporting version of libczmq. -At that time, you could simply download and install the tarballs instead of -using git to clone the repositories. Those tarballs (when available) can -be found at http://download.zeromq.org. As of this writing (5/31/2012), the -most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile -properly. +Requires libzmq and libczmq. First, download the tarballs of both libzmq +and its supporting libczmq from http://download.zeromq.org. As of this +writing (04/23/2013), the most recent versions of libzmq and czmq are +3.2.2 and 1.3.2 respectively. Configure, build, and then install both libs. Omzmq3 allows you to push data out of rsyslog from a zeromq socket. The example below binds a PUB socket to port 7171, and any message fitting the criteria will diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c index ee6756b9..c8552f11 100644 --- a/plugins/omzmq3/omzmq3.c +++ b/plugins/omzmq3/omzmq3.c @@ -110,9 +110,10 @@ static zctx_t* s_context = NULL; static int s_workerThreads = -1; static struct socket_type types[] = { - {"PUB", ZMQ_PUB }, - {"PUSH", ZMQ_PUSH }, - {"XPUB", ZMQ_XPUB } + {"PUB", ZMQ_PUB }, + {"PUSH", ZMQ_PUSH }, + {"DEALER", ZMQ_DEALER }, + {"XPUB", ZMQ_XPUB } }; static struct socket_action actions[] = { @@ -201,17 +202,18 @@ static rsRetVal initZMQ(instanceData* pData) { /* create the context if necessary. */ if (NULL == s_context) { + zsys_handler_set(NULL); s_context = zctx_new(); if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads); } pData->socket = zsocket_new(s_context, pData->type); - - /* ALWAYS set the HWM as the zmq3 default is 1000 and we default - to 0 (infinity) */ - zsocket_set_rcvhwm(pData->socket, pData->rcvHWM); - zsocket_set_sndhwm(pData->socket, pData->sndHWM); - + if (NULL == pData->socket) { + errmsg.LogError(0, RS_RET_NO_ERRCODE, + "omzmq3: zsocket_new failed for %s: %s", + pData->description, zmq_strerror(errno)); + ABORT_FINALIZE(RS_RET_NO_ERRCODE); + } /* use czmq defaults for these, unless set to non-default values */ if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity); if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf); @@ -228,17 +230,26 @@ static rsRetVal initZMQ(instanceData* pData) { if(pData->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(pData->socket, pData->reconnectIVLMax); if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only); if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity); - + if(pData->rcvHWM > -1) zsocket_set_rcvhwm(pData->socket, pData->rcvHWM); + if(pData->sndHWM > -1) zsocket_set_sndhwm(pData->socket, pData->sndHWM); + /* bind or connect to it */ if (pData->action == ACTION_BIND) { /* bind asserts, so no need to test return val here which isn't the greatest api -- oh well */ - zsocket_bind(pData->socket, (char*)pData->description); + if(-1 == zsocket_bind(pData->socket, (char*)pData->description)) { + errmsg.LogError(0, RS_RET_NO_ERRCODE, "omzmq3: bind failed for %s: %s", + pData->description, zmq_strerror(errno)); + ABORT_FINALIZE(RS_RET_NO_ERRCODE); + } + DBGPRINTF("omzmq3: bind to %s successful\n",pData->description); } else { - if(zsocket_connect(pData->socket, (char*)pData->description) == -1) { - errmsg.LogError(0, RS_RET_SUSPENDED, "omzmq3: connect failed!"); - ABORT_FINALIZE(RS_RET_SUSPENDED); + if(-1 == zsocket_connect(pData->socket, (char*)pData->description)) { + errmsg.LogError(0, RS_RET_NO_ERRCODE, "omzmq3: connect failed for %s: %s", + pData->description, zmq_strerror(errno)); + ABORT_FINALIZE(RS_RET_NO_ERRCODE); } + DBGPRINTF("omzmq3: connect to %s successful", pData->description); } finalize_it: RETiRet; @@ -256,7 +267,7 @@ rsRetVal writeZMQ(uchar* msg, instanceData* pData) { /* whine if things went wrong */ if (result == -1) { - errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result); + errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed: %s", msg, zmq_strerror(errno)); ABORT_FINALIZE(RS_RET_ERR); } finalize_it: @@ -265,13 +276,13 @@ rsRetVal writeZMQ(uchar* msg, instanceData* pData) { static inline void setInstParamDefaults(instanceData* pData) { - pData->description = (uchar*)"tcp://*:7171"; + pData->description = NULL; pData->socket = NULL; pData->tplName = NULL; pData->type = ZMQ_PUB; pData->action = ACTION_BIND; - pData->sndHWM = 0; /*unlimited*/ - pData->rcvHWM = 0; /*unlimited*/ + pData->sndHWM = -1; + pData->rcvHWM = -1; pData->identity = NULL; pData->sndBuf = -1; pData->rcvBuf = -1; @@ -314,6 +325,7 @@ CODESTARTfreeInstance closeZMQ(pData); free(pData->description); free(pData->tplName); + free(pData->identity); ENDfreeInstance BEGINtryResume @@ -329,88 +341,90 @@ ENDdoAction BEGINnewActInst - struct cnfparamvals *pvals; - int i; + struct cnfparamvals *pvals; + int i; CODESTARTnewActInst -if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { - ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); - } + if ((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } CHKiRet(createInstance(&pData)); setInstParamDefaults(pData); CODE_STD_STRING_REQUESTnewActInst(1) -for(i = 0 ; i < actpblk.nParams ; ++i) { - if(!pvals[i].bUsed) - continue; - if(!strcmp(actpblk.descr[i].name, "description")) { - pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); - } else if(!strcmp(actpblk.descr[i].name, "template")) { - pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); - } else if(!strcmp(actpblk.descr[i].name, "sockType")){ - pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL)); - } else if(!strcmp(actpblk.descr[i].name, "action")){ - pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); - } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) { - pData->sndHWM = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) { - pData->rcvHWM = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "identity")){ - pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); - } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) { - pData->sndBuf = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) { - pData->rcvBuf = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "linger")) { - pData->linger = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "backlog")) { - pData->backlog = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) { - pData->sndTimeout = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) { - pData->rcvTimeout = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) { - pData->maxMsgSize = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "rate")) { - pData->rate = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) { - pData->recoveryIVL = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) { - pData->multicastHops = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) { - pData->reconnectIVL = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) { - pData->reconnectIVLMax = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) { - pData->ipv4Only = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "affinity")) { - pData->affinity = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) { - s_workerThreads = (int) pvals[i].val.d.n; - } else { - errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled " - "param '%s'\n", actpblk.descr[i].name); + for (i = 0; i < actpblk.nParams; ++i) { + if (!pvals[i].bUsed) + continue; + if (!strcmp(actpblk.descr[i].name, "description")) { + pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "sockType")){ + pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if (!strcmp(actpblk.descr[i].name, "action")){ + pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if (!strcmp(actpblk.descr[i].name, "sndHWM")) { + pData->sndHWM = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "rcvHWM")) { + pData->rcvHWM = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "identity")){ + pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "sndBuf")) { + pData->sndBuf = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "rcvBuf")) { + pData->rcvBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "linger")) { + pData->linger = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "backlog")) { + pData->backlog = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "sndTimeout")) { + pData->sndTimeout = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "rcvTimeout")) { + pData->rcvTimeout = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "maxMsgSize")) { + pData->maxMsgSize = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "rate")) { + pData->rate = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "recoveryIVL")) { + pData->recoveryIVL = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "multicastHops")) { + pData->multicastHops = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "reconnectIVL")) { + pData->reconnectIVL = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) { + pData->reconnectIVLMax = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "ipv4Only")) { + pData->ipv4Only = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "affinity")) { + pData->affinity = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) { + s_workerThreads = (int) pvals[i].val.d.n; + } else { + errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } } - } - -if(pData->tplName == NULL) { - CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); - } else { - CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS)); - } - -if(pData->type == -1) { - errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type."); - ABORT_FINALIZE(RS_RET_CONFIG_ERROR); - } -if(pData->action == -1) { - errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action"); - ABORT_FINALIZE(RS_RET_CONFIG_ERROR); - } + if (pData->tplName == NULL) { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup("RSYSLOG_ForwardFormat"), OMSR_NO_RQD_TPL_OPTS)); + } else { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS)); + } + if (NULL == pData->description) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: you didn't enter a description"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + if (pData->type == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type."); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + if (pData->action == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } CODE_STD_FINALIZERnewActInst - cnfparamvalsDestruct(pvals, &actpblk); + cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst BEGINparseSelectorAct @@ -433,10 +447,10 @@ ENDinitConfVars BEGINmodExit CODESTARTmodExit -if(NULL != s_context) { - zctx_destroy(&s_context); - s_context=NULL; - } + if (NULL != s_context) { + zctx_destroy(&s_context); + s_context=NULL; + } ENDmodExit |