summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imgssapi/imgssapi.c21
-rwxr-xr-x[-rw-r--r--]plugins/imjournal/imjournal.c153
-rw-r--r--plugins/imrelp/imrelp.c86
-rw-r--r--plugins/imuxsock/imuxsock.c4
-rw-r--r--plugins/imzmq3/README59
-rw-r--r--plugins/imzmq3/imzmq3.c781
-rw-r--r--plugins/mmaudit/Makefile.am4
-rw-r--r--plugins/mmaudit/mmaudit.c3
-rw-r--r--plugins/mmcount/Makefile.am8
-rw-r--r--plugins/mmcount/mmcount.c342
-rw-r--r--plugins/mmfields/Makefile.am8
-rw-r--r--plugins/mmfields/mmfields.c265
-rw-r--r--plugins/mmjsonparse/Makefile.am4
-rw-r--r--plugins/mmjsonparse/mmjsonparse.c1
-rw-r--r--plugins/omjournal/omjournal.c2
-rw-r--r--plugins/omrelp/omrelp.c115
-rw-r--r--plugins/omzmq3/README14
-rw-r--r--plugins/omzmq3/omzmq3.c202
18 files changed, 1577 insertions, 495 deletions
diff --git a/plugins/imgssapi/imgssapi.c b/plugins/imgssapi/imgssapi.c
index 4e3a70ab..15d994cc 100644
--- a/plugins/imgssapi/imgssapi.c
+++ b/plugins/imgssapi/imgssapi.c
@@ -72,6 +72,7 @@ MODULE_CNFNAME("imgssapi")
/* some forward definitions - they may go away when we no longer include imtcp.c */
static rsRetVal addGSSListener(void __attribute__((unused)) *pVal, uchar *pNewVal);
+static rsRetVal actGSSListener(uchar *port);
static int TCPSessGSSInit(void);
static void TCPSessGSSClose(tcps_sess_t* pSess);
static rsRetVal TCPSessGSSRecv(tcps_sess_t *pSess, void *buf, size_t buf_len, ssize_t *);
@@ -90,6 +91,7 @@ DEFobjCurrIf(glbl)
static tcpsrv_t *pOurTcpsrv = NULL; /* our TCP server(listener) TODO: change for multiple instances */
static gss_cred_id_t gss_server_creds = GSS_C_NO_CREDENTIAL;
+static uchar *srvPort;
/* our usr structure for the tcpsrv object */
typedef struct gsssrv_s {
@@ -317,6 +319,16 @@ static rsRetVal
addGSSListener(void __attribute__((unused)) *pVal, uchar *pNewVal)
{
DEFiRet;
+
+ srvPort = pNewVal;
+
+ RETiRet;
+}
+
+static rsRetVal
+actGSSListener(uchar *port)
+{
+ DEFiRet;
gsssrv_t *pGSrv;
if(pOurTcpsrv == NULL) {
@@ -340,7 +352,7 @@ addGSSListener(void __attribute__((unused)) *pVal, uchar *pNewVal)
CHKiRet(tcpsrv.SetCBOnRegularClose(pOurTcpsrv, onRegularClose));
CHKiRet(tcpsrv.SetCBOnErrClose(pOurTcpsrv, onErrClose));
CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, UCHAR_CONSTANT("imgssapi")));
- tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal, 1);
+ tcpsrv.configureTCPListen(pOurTcpsrv, port, 1);
CHKiRet(tcpsrv.ConstructFinalize(pOurTcpsrv));
}
@@ -676,6 +688,11 @@ ENDfreeCnf
*/
BEGINrunInput
CODESTARTrunInput
+ /* This will fail if the priviledges are dropped. Should be
+ * moved to the '*activateCnfPrePrivDrop' section eventually.
+ */
+ actGSSListener(srvPort);
+
iRet = tcpsrv.Run(pOurTcpsrv);
ENDrunInput
@@ -683,7 +700,7 @@ ENDrunInput
/* initialize and return if will run or not */
BEGINwillRun
CODESTARTwillRun
- if(pOurTcpsrv == NULL)
+ if(srvPort == NULL)
ABORT_FINALIZE(RS_RET_NO_RUN);
net.PrintAllowedSenders(2); /* TCP */
diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c
index 2af1958e..cce45b9c 100644..100755
--- a/plugins/imjournal/imjournal.c
+++ b/plugins/imjournal/imjournal.c
@@ -30,7 +30,9 @@
#include <ctype.h>
#include <stdlib.h>
#include <time.h>
+#include <sys/poll.h>
#include <sys/socket.h>
+#include <errno.h>
#include "dirty.h"
#include "cfsysline.h"
@@ -43,6 +45,7 @@
#include "glbl.h"
#include "prop.h"
#include "errmsg.h"
+#include "srUtils.h"
#include "unicode-helper.h"
#include <systemd/sd-journal.h>
@@ -136,16 +139,20 @@ readjournal() {
uint64_t timestamp;
struct json_object *json = NULL;
+ int r;
/* Information from messages */
char *message;
+ char *sys_pid;
char *sys_iden;
char *sys_iden_help;
const void *get;
+ const void *pidget;
char *parse;
char *get2;
size_t length;
+ size_t pidlength;
const void *equal_sign;
struct json_object *jval;
@@ -158,13 +165,6 @@ readjournal() {
int priority = 0;
int facility = 0;
- /* Get next journal message, if there is none, wait a second */
- if (sd_journal_next(j) == 0) {
- sleep(1);
- iRet = RS_RET_OK;
- goto ret;
- }
-
/* Get message text */
if (sd_journal_get_data(j, "MESSAGE", &get, &length) < 0) {
logmsgInternal(NO_ERRCODE, LOG_SYSLOG|LOG_INFO, (uchar *)"log message from journal doesn't have MESSAGE", 0);
@@ -203,7 +203,7 @@ readjournal() {
goto free_message;
}
- /* Get message identifier and add ':' */
+ /* Get message identifier, client pid and add ':' */
if (sd_journal_get_data(j, "SYSLOG_IDENTIFIER", &get, &length) >= 0) {
sys_iden = strndup(get+18, length-18);
} else {
@@ -214,19 +214,44 @@ readjournal() {
goto free_message;
}
- asprintf(&sys_iden_help, "%s:", sys_iden);
- if (sys_iden_help == NULL) {
+ if (sd_journal_get_data(j, "SYSLOG_PID", &pidget, &pidlength) >= 0) {
+ sys_pid = strndup(pidget+11, pidlength-11);
+ if (sys_pid == NULL) {
+ iRet = RS_RET_OUT_OF_MEMORY;
+ free (sys_iden);
+ goto free_message;
+ }
+ } else {
+ sys_pid = NULL;
+ }
+
+ if (sys_pid) {
+ r = asprintf(&sys_iden_help, "%s[%s]:", sys_iden, sys_pid);
+ } else {
+ r = asprintf(&sys_iden_help, "%s:", sys_iden);
+ }
+
+ free (sys_iden);
+ free (sys_pid);
+
+ if (-1 == r) {
iRet = RS_RET_OUT_OF_MEMORY;
goto finalize_it;
}
- free (sys_iden);
json = json_object_new_object();
SD_JOURNAL_FOREACH_DATA(j, get, l) {
/* locate equal sign, this is always present */
equal_sign = memchr(get, '=', l);
- assert (equal_sign != NULL);
+
+ /* ... but we know better than to trust the specs */
+ if (equal_sign == NULL) {
+ errmsg.LogError(0, RS_RET_ERR,"SD_JOURNAL_FOREACH_DATA()"
+ " returned a malformed field (has no '='): '%s'",
+ get);
+ continue; /* skip the entry */
+ }
/* get length of journal data prefix */
prefixlen = ((char *)equal_sign - (char *)get);
@@ -337,7 +362,9 @@ persistJournalState () {
char *cursor;
int ret = 0;
- if ((ret = sd_journal_get_cursor(j, &cursor)) > 0) {
+ /* On success, sd_journal_get_cursor() returns 1 in systemd
+ 197 or older and 0 in systemd 198 or newer */
+ if ((ret = sd_journal_get_cursor(j, &cursor)) >= 0) {
if ((sf = fopen(cs.stateFile, "wb")) != NULL) {
if (fprintf(sf, "%s", cursor) < 0) {
iRet = RS_RET_IO_ERROR;
@@ -345,29 +372,92 @@ persistJournalState () {
fclose(sf);
free(cursor);
} else {
+ char errStr[256];
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(0, RS_RET_FOPEN_FAILURE, "fopen() failed: "
+ "'%s', path: '%s'\n", errStr, cs.stateFile);
iRet = RS_RET_FOPEN_FAILURE;
}
} else {
+ char errStr[256];
+ rs_strerror_r(-(ret), errStr, sizeof(errStr));
+ errmsg.LogError(0, RS_RET_ERR, "sd_journal_get_cursor() failed: '%s'\n", errStr);
iRet = RS_RET_ERR;
}
RETiRet;
}
+/* Polls the journal for new messages. Similar to sd_journal_wait()
+ * except for the special handling of EINTR.
+ */
+static rsRetVal
+pollJournal()
+{
+ DEFiRet;
+ struct pollfd pollfd;
+ int r;
+
+ pollfd.fd = sd_journal_get_fd(j);
+ pollfd.events = sd_journal_get_events(j);
+ r = poll(&pollfd, 1, -1);
+ if (r == -1) {
+ if (errno == EINTR) {
+ /* EINTR is also received during termination
+ * so return now to check the term state.
+ */
+ ABORT_FINALIZE(RS_RET_OK);
+ } else {
+ char errStr[256];
+
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(0, RS_RET_ERR,
+ "poll() failed: '%s'", errStr);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ }
+
+ assert(r == 1);
+
+ r = sd_journal_process(j);
+ if (r < 0) {
+ char errStr[256];
+
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(0, RS_RET_ERR,
+ "sd_journal_process() failed: '%s'", errStr);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
BEGINrunInput
CODESTARTrunInput
/* this is an endless loop - it is terminated when the thread is
* signalled to do so. This, however, is handled by the framework,
* right into the sleep below.
*/
- int count = 0;
+ if (cs.stateFile[0] != '/') {
+ char *new_stateFile;
- char readCursor[128 + 1];
- FILE *r_sf;
+ if (-1 == asprintf(&new_stateFile, "%s/%s", (char *)glbl.GetWorkDir(), cs.stateFile)) {
+ errmsg.LogError(0, RS_RET_OUT_OF_MEMORY, "imjournal: asprintf failed\n");
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+ free (cs.stateFile);
+ cs.stateFile = new_stateFile;
+ }
/* if state file exists, set cursor to appropriate position */
if (access(cs.stateFile, F_OK|R_OK) != -1) {
+ FILE *r_sf;
+
if ((r_sf = fopen(cs.stateFile, "rb")) != NULL) {
+ char readCursor[128 + 1];
+
if (fscanf(r_sf, "%128s\n", readCursor) != EOF) {
if (sd_journal_seek_cursor(j, readCursor) != 0) {
errmsg.LogError(0, RS_RET_ERR, "imjournal: "
@@ -390,14 +480,32 @@ CODESTARTrunInput
}
while (glbl.GetGlobalInputTermState() == 0) {
+ int count = 0, r;
+
+ r = sd_journal_next(j);
+ if (r < 0) {
+ char errStr[256];
+
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(0, RS_RET_ERR,
+ "sd_journal_next() failed: '%s'", errStr);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ if (r == 0) {
+ /* No new messages, wait for activity. */
+ CHKiRet(pollJournal());
+ continue;
+ }
+
CHKiRet(readjournal());
+ /* TODO: This could use some finer metric. */
count++;
if (count == cs.iPersistStateInterval) {
count = 0;
persistJournalState();
}
}
- persistJournalState();
finalize_it:
ENDrunInput
@@ -444,6 +552,7 @@ ENDwillRun
/* close journal */
BEGINafterRun
CODESTARTafterRun
+ persistJournalState();
sd_journal_close(j);
ENDafterRun
@@ -500,16 +609,22 @@ finalize_it:
ENDsetModCnf
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
-
-
BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c
index 5e0ae552..7fa98617 100644
--- a/plugins/imrelp/imrelp.c
+++ b/plugins/imrelp/imrelp.c
@@ -4,7 +4,7 @@
*
* File begun on 2008-03-13 by RGerhards
*
- * Copyright 2008-2012 Adiscon GmbH.
+ * Copyright 2008-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -74,6 +74,8 @@ static struct configSettings_s {
struct instanceConf_s {
uchar *pszBindPort; /* port to bind to */
+ sbool bEnableTLS;
+ sbool bEnableTLSZip;
struct instanceConf_s *next;
};
@@ -88,9 +90,21 @@ struct modConfData_s {
static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */
+/* module-global parameters */
+static struct cnfparamdescr modpdescr[] = {
+ { "ruleset", eCmdHdlrGetWord, 0 },
+};
+static struct cnfparamblk modpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+ };
+
/* input instance parameters */
static struct cnfparamdescr inppdescr[] = {
- { "port", eCmdHdlrString, CNFPARAM_REQUIRED }
+ { "port", eCmdHdlrString, CNFPARAM_REQUIRED },
+ { "tls", eCmdHdlrBinary, 0 },
+ { "tls.compression", eCmdHdlrBinary, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -155,6 +169,8 @@ createInstance(instanceConf_t **pinst)
inst->next = NULL;
inst->pszBindPort = NULL;
+ inst->bEnableTLS = 0;
+ inst->bEnableTLSZip = 0;
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -179,7 +195,7 @@ std_checkRuleset_genErrMsg(modConfData_t *modConf, __attribute__((unused)) insta
}
-/* This function is called when a new listener instace shall be added to
+/* This function is called when a new listener instance shall be added to
* the current config object via the legacy config system. It just shuffles
* all parameters to the listener in-memory instance.
* rgerhards, 2011-05-04
@@ -204,6 +220,7 @@ finalize_it:
static rsRetVal
addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
{
+ relpSrv_t *pSrv;
DEFiRet;
if(pRelpEngine == NULL) {
CHKiRet(relpEngineConstruct(&pRelpEngine));
@@ -216,7 +233,15 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
}
}
- CHKiRet(relpEngineAddListner(pRelpEngine, inst->pszBindPort));
+ CHKiRet(relpEngineListnerConstruct(pRelpEngine, &pSrv));
+ CHKiRet(relpSrvSetLstnPort(pSrv, inst->pszBindPort));
+ if(inst->bEnableTLS) {
+ relpSrvEnableTLS(pSrv);
+ if(inst->bEnableTLSZip) {
+ relpSrvEnableTLSZip(pSrv);
+ }
+ }
+ CHKiRet(relpEngineListnerConstructFinalize(pRelpEngine, pSrv));
finalize_it:
RETiRet;
@@ -249,6 +274,10 @@ CODESTARTnewInpInst
continue;
if(!strcmp(inppblk.descr[i].name, "port")) {
inst->pszBindPort = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "tls")) {
+ inst->bEnableTLS = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "tls.compression")) {
+ inst->bEnableTLSZip = (unsigned) pvals[i].val.d.n;
} else {
dbgprintf("imrelp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
@@ -264,19 +293,58 @@ BEGINbeginCnfLoad
CODESTARTbeginCnfLoad
loadModConf = pModConf;
pModConf->pConf = pConf;
+ pModConf->pszBindRuleset = NULL;
+ pModConf->pBindRuleset = NULL;
/* init legacy config variables */
cs.pszBindRuleset = NULL;
ENDbeginCnfLoad
+BEGINsetModCnf
+ struct cnfparamvals *pvals = NULL;
+ int i;
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if(pvals == NULL) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module "
+ "config parameters [module(...)]");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("module (global) param blk for imrelp:\n");
+ cnfparamsPrint(&modpblk, pvals);
+ }
+
+ for(i = 0 ; i < modpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(modpblk.descr[i].name, "ruleset")) {
+ loadModConf->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else {
+ dbgprintf("imrelp: program error, non-handled "
+ "param '%s' in beginCnfLoad\n", modpblk.descr[i].name);
+ }
+ }
+finalize_it:
+ if(pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
BEGINendCnfLoad
CODESTARTendCnfLoad
- if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) {
- loadModConf->pszBindRuleset = NULL;
+ if(loadModConf->pszBindRuleset == NULL) {
+ if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) {
+ loadModConf->pszBindRuleset = NULL;
+ } else {
+ CHKmalloc(loadModConf->pszBindRuleset = ustrdup(cs.pszBindRuleset));
+ }
} else {
- CHKmalloc(loadModConf->pszBindRuleset = ustrdup(cs.pszBindRuleset));
+ if((cs.pszBindRuleset != NULL) && (cs.pszBindRuleset[0] != '\0')) {
+ errmsg.LogError(0, RS_RET_DUP_PARAM, "imrelp: warning: ruleset "
+ "set via legacy directive ignored");
+ }
}
- loadModConf->pBindRuleset = NULL;
finalize_it:
free(cs.pszBindRuleset);
loadModConf = NULL; /* done loading */
@@ -293,6 +361,7 @@ CODESTARTcheckCnf
if(pModConf->pszBindRuleset == NULL) {
pModConf->pBindRuleset = NULL;
} else {
+ DBGPRINTF("imrelp: using ruleset '%s'\n", pModConf->pszBindRuleset);
localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, pModConf->pszBindRuleset);
if(localRet == RS_RET_NOT_FOUND) {
std_checkRuleset_genErrMsg(pModConf, NULL);
@@ -420,6 +489,7 @@ CODEqueryEtryPt_STD_IMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES
CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index c503852c..dad09ab4 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -1284,6 +1284,8 @@ BEGINactivateCnfPrePrivDrop
instanceConf_t *inst;
CODESTARTactivateCnfPrePrivDrop
runModConf = pModConf;
+ if(runModConf->bOmitLocalLogging && nfd == 1)
+ ABORT_FINALIZE(RS_RET_OK);
for(inst = runModConf->root ; inst != NULL ; inst = inst->next) {
addListner(inst);
}
@@ -1325,6 +1327,8 @@ BEGINrunInput
#endif
CODESTARTrunInput
+ if(runModConf->bOmitLocalLogging && nfd == 1)
+ ABORT_FINALIZE(RS_RET_OK);
/* this is an endless loop - it is terminated when the thread is
* signalled to do so. This, however, is handled by the framework,
* right into the sleep below.
diff --git a/plugins/imzmq3/README b/plugins/imzmq3/README
index 88653b83..9a108a01 100644
--- a/plugins/imzmq3/README
+++ b/plugins/imzmq3/README
@@ -1,24 +1,59 @@
ZeroMQ 3.x Input Plugin
Building this plugin:
-Requires libzmq and libczmq. First, install libzmq from the HEAD on github:
-http://github.com/zeromq/libzmq. You can clone the repository, build, then
-install it. The directions for doing so are there in the readme. Then, do
-the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1
-version of libzmq will be released, and a supporting version of libczmq.
-At that time, you could simply download and install the tarballs instead of
-using git to clone the repositories. Those tarballs (when available) can
-be found at http://download.zeromq.org. As of this writing (5/31/2012), the
-most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile
-properly.
+Requires libzmq and libczmq. First, download the tarballs of both libzmq
+and its supporting libczmq from http://download.zeromq.org. As of this
+writing (04/23/2013), the most recent versions of libzmq and czmq are
+3.2.2 and 1.3.2 respectively. Configure, build, and then install both libs.
Imzmq3 allows you to push data into rsyslog from a zeromq socket. The example
below binds a SUB socket to port 7172, and then any messages with the topic
"foo" will be pushed into rsyslog.
+Please note:
+This plugin only supports the newer (v7) config format. Legacy config support
+was removed.
+
Example Rsyslog.conf snippet:
-------------------------------------------------------------------------------
-
-$InputZmq3ServerRun action=BIND,type=SUB,description=tcp://*:7172,subscribe=foo
+module(load="imzmq3" ioThreads="1")
+input(type="imzmq3" action="CONNECT" socktype="SUB" description="tcp://*:7172" subscribe="foo,bar")
-------------------------------------------------------------------------------
+Note you can specify multiple subscriptions with a comma-delimited list, with
+no spaces between values.
+
+The only global parameter for this plugin is ioThreads, which is optional and
+probably best left to the zmq default unless you know exactly what you are
+doing.
+
+The instance-level parameters are:
+
+Required
+description
+subscribe (required if the sockType is SUB)
+
+Optional
+sockType (defaults to SUB)
+action (defaults to BIND
+sndHWM
+rcvHWM
+identity
+sndBuf
+rcvBuf
+linger
+backlog
+sndTimeout
+rcvTimeout
+maxMsgSize
+rate
+recoveryIVL
+multicastHops
+reconnectIVL
+reconnectIVLMax
+ipv4Only
+affinity
+
+These all correspond to zmq optional settings. Except where noted, the defaults
+are the zmq defaults if not set. See http://api.zeromq.org/3-2:zmq-setsockopt
+for info on these.
diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c
index 52c12a53..08b1dbe4 100644
--- a/plugins/imzmq3/imzmq3.c
+++ b/plugins/imzmq3/imzmq3.c
@@ -19,20 +19,21 @@
* License along with this program. If not, see
* <http://www.gnu.org/licenses/>.
*
- * Author: David Kelly
- * <davidk@talksum.com>
+ * Authors:
+ * David Kelly <davidk@talksum.com>
+ * Hongfei Cheng <hongfeic@talksum.com>
*/
+
+#include "config.h"
+#include "rsyslog.h"
+
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
-
-#include "rsyslog.h"
-
#include "cfsysline.h"
-#include "config.h"
#include "dirty.h"
#include "errmsg.h"
#include "glbl.h"
@@ -49,6 +50,7 @@
MODULE_TYPE_INPUT
MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("imzmq3");
/* convienent symbols to denote a socket we want to bind
* vs one we want to just connect to
@@ -83,47 +85,67 @@ typedef struct _poller_data {
thrdInfo_t* thread;
} poller_data;
-typedef struct _socket_info {
- int type;
- int action;
- char* description;
- int sndHWM; /* if you want more than 2^32 messages, */
- int rcvHWM; /* then pass in 0 (the default). */
- char* identity;
- char** subscriptions;
- ruleset_t* ruleset;
- int sndBuf;
- int rcvBuf;
- int linger;
- int backlog;
- int sndTimeout;
- int rcvTimeout;
- int maxMsgSize;
- int rate;
- int recoveryIVL;
- int multicastHops;
- int reconnectIVL;
- int reconnectIVLMax;
- int ipv4Only;
- int affinity;
-
-} socket_info;
+/* a linked-list of subscription topics */
+typedef struct sublist_t {
+ char* subscribe;
+ struct sublist_t* next;
+} sublist;
+
+struct instanceConf_s {
+ int type;
+ int action;
+ char* description;
+ int sndHWM; /* if you want more than 2^32 messages, */
+ int rcvHWM; /* then pass in 0 (the default). */
+ char* identity;
+ sublist* subscriptions;
+ int sndBuf;
+ int rcvBuf;
+ int linger;
+ int backlog;
+ int sndTimeout;
+ int rcvTimeout;
+ int maxMsgSize;
+ int rate;
+ int recoveryIVL;
+ int multicastHops;
+ int reconnectIVL;
+ int reconnectIVLMax;
+ int ipv4Only;
+ int affinity;
+ uchar* pszBindRuleset;
+ ruleset_t* pBindRuleset;
+ struct instanceConf_s* next;
+
+};
+
+struct modConfData_s {
+ rsconf_t* pConf;
+ instanceConf_t* root;
+ instanceConf_t* tail;
+ int io_threads;
+};
+struct lstn_s {
+ struct lstn_s* next;
+ void* sock;
+ ruleset_t* pRuleset;
+};
/* ----------------------------------------------------------------------------
* Static definitions/initializations.
*/
-static socket_info* s_socketInfo = NULL;
-static size_t s_nitems = 0;
-static prop_t * s_namep = NULL;
+static modConfData_t* runModConf = NULL;
+static struct lstn_s* lcnfRoot = NULL;
+static struct lstn_s* lcnfLast = NULL;
+static prop_t* s_namep = NULL;
static zloop_t* s_zloop = NULL;
-static int s_io_threads = 1;
static zctx_t* s_context = NULL;
-static ruleset_t* s_ruleset = NULL;
static socket_type socketTypes[] = {
- {"SUB", ZMQ_SUB },
- {"PULL", ZMQ_PULL },
- {"XSUB", ZMQ_XSUB }
+ {"SUB", ZMQ_SUB },
+ {"PULL", ZMQ_PULL },
+ {"ROUTER", ZMQ_ROUTER },
+ {"XSUB", ZMQ_XSUB }
};
static socket_action socketActions[] = {
@@ -131,6 +153,48 @@ static socket_action socketActions[] = {
{"CONNECT", ACTION_CONNECT},
};
+static struct cnfparamdescr modpdescr[] = {
+ { "ioThreads", eCmdHdlrInt, 0 },
+};
+
+static struct cnfparamblk modpblk = {
+ CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+};
+
+static struct cnfparamdescr inppdescr[] = {
+ { "description", eCmdHdlrGetWord, 0 },
+ { "sockType", eCmdHdlrGetWord, 0 },
+ { "subscribe", eCmdHdlrGetWord, 0 },
+ { "ruleset", eCmdHdlrGetWord, 0 },
+ { "action", eCmdHdlrGetWord, 0 },
+ { "sndHWM", eCmdHdlrInt, 0 },
+ { "rcvHWM", eCmdHdlrInt, 0 },
+ { "identity", eCmdHdlrGetWord, 0 },
+ { "sndBuf", eCmdHdlrInt, 0 },
+ { "rcvBuf", eCmdHdlrInt, 0 },
+ { "linger", eCmdHdlrInt, 0 },
+ { "backlog", eCmdHdlrInt, 0 },
+ { "sndTimeout", eCmdHdlrInt, 0 },
+ { "rcvTimeout", eCmdHdlrInt, 0 },
+ { "maxMsgSize", eCmdHdlrInt, 0 },
+ { "rate", eCmdHdlrInt, 0 },
+ { "recoveryIVL", eCmdHdlrInt, 0 },
+ { "multicastHops", eCmdHdlrInt, 0 },
+ { "reconnectIVL", eCmdHdlrInt, 0 },
+ { "reconnectIVLMax", eCmdHdlrInt, 0 },
+ { "ipv4Only", eCmdHdlrInt, 0 },
+ { "affinity", eCmdHdlrInt, 0 }
+};
+
+static struct cnfparamblk inppblk = {
+ CNFPARAMBLK_VERSION,
+ sizeof(inppdescr)/sizeof(struct cnfparamdescr),
+ inppdescr
+};
+
+#include "im-helper.h" /* must be included AFTER the type definitions! */
/* ----------------------------------------------------------------------------
* Helper functions
@@ -179,15 +243,16 @@ static int getSocketAction(char* name) {
}
-static void setDefaults(socket_info* info) {
- info->type = ZMQ_SUB;
- info->action = ACTION_BIND;
+static void setDefaults(instanceConf_t* info) {
+ info->type = -1;
+ info->action = -1;
info->description = NULL;
- info->sndHWM = 0;
- info->rcvHWM = 0;
+ info->sndHWM = -1;
+ info->rcvHWM = -1;
info->identity = NULL;
info->subscriptions = NULL;
- info->ruleset = NULL;
+ info->pszBindRuleset = NULL;
+ info->pBindRuleset = NULL;
info->sndBuf = -1;
info->rcvBuf = -1;
info->linger = -1;
@@ -202,93 +267,49 @@ static void setDefaults(socket_info* info) {
info->reconnectIVLMax = -1;
info->ipv4Only = -1;
info->affinity = -1;
-
+ info->next = NULL;
};
-
-/* The config string should look like:
- * "action=AAA,type=TTT,description=DDD,sndHWM=SSS,rcvHWM=RRR,subscribe='xxx',subscribe='yyy'"
- *
+/* given a comma separated list of subscriptions, create a char* array of them
+ * to set later
*/
-static rsRetVal parseConfig(char* config, socket_info* info) {
- int nsubs = 0;
-
- char* binding;
- char* ptr1;
- for (binding = strtok_r(config, ",", &ptr1);
- binding != NULL;
- binding = strtok_r(NULL, ",", &ptr1)) {
-
- /* Each binding looks like foo=bar */
- char * sep = strchr(binding, '=');
- if (sep == NULL)
- {
- errmsg.LogError(0, NO_ERRCODE,
- "Invalid argument format %s, ignoring ...",
- binding);
- continue;
- }
+static rsRetVal parseSubscriptions(char* subscribes, sublist** subList){
+ char* tok = strtok(subscribes, ",");
+ sublist* currentSub;
+ sublist* head;
+ DEFiRet;
- /* Replace '=' with '\0'. */
- *sep = '\0';
-
- char * val = sep + 1;
-
- if (strcmp(binding, "action") == 0) {
- info->action = getSocketAction(val);
- } else if (strcmp(binding, "type") == 0) {
- info->type = getSocketType(val);
- } else if (strcmp(binding, "description") == 0) {
- info->description = strdup(val);
- } else if (strcmp(binding, "sndHWM") == 0) {
- info->sndHWM = atoi(val);
- } else if (strcmp(binding, "rcvHWM") == 0) {
- info->sndHWM = atoi(val);
- } else if (strcmp(binding, "subscribe") == 0) {
- /* Add the subscription value to the list.*/
- char * substr = NULL;
- substr = strdup(val);
- info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1);
- info->subscriptions[nsubs] = substr;
- ++nsubs;
- } else if (strcmp(binding, "sndBuf") == 0) {
- info->sndBuf = atoi(val);
- } else if (strcmp(binding, "rcvBuf") == 0) {
- info->rcvBuf = atoi(val);
- } else if (strcmp(binding, "linger") == 0) {
- info->linger = atoi(val);
- } else if (strcmp(binding, "backlog") == 0) {
- info->backlog = atoi(val);
- } else if (strcmp(binding, "sndTimeout") == 0) {
- info->sndTimeout = atoi(val);
- } else if (strcmp(binding, "rcvTimeout") == 0) {
- info->rcvTimeout = atoi(val);
- } else if (strcmp(binding, "maxMsgSize") == 0) {
- info->maxMsgSize = atoi(val);
- } else if (strcmp(binding, "rate") == 0) {
- info->rate = atoi(val);
- } else if (strcmp(binding, "recoveryIVL") == 0) {
- info->recoveryIVL = atoi(val);
- } else if (strcmp(binding, "multicastHops") == 0) {
- info->multicastHops = atoi(val);
- } else if (strcmp(binding, "reconnectIVL") == 0) {
- info->reconnectIVL = atoi(val);
- } else if (strcmp(binding, "reconnectIVLMax") == 0) {
- info->reconnectIVLMax = atoi(val);
- } else if (strcmp(binding, "ipv4Only") == 0) {
- info->ipv4Only = atoi(val);
- } else if (strcmp(binding, "affinity") == 0) {
- info->affinity = atoi(val);
- } else {
- errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding);
- return RS_RET_INVALID_PARAMS;
+ /* create empty list */
+ CHKmalloc(*subList = (sublist*)MALLOC(sizeof(sublist)));
+ head = *subList;
+ head->next = NULL;
+ head->subscribe=NULL;
+ currentSub=head;
+
+ if(tok) {
+ head->subscribe=strdup(tok);
+ for(tok=strtok(NULL, ","); tok!=NULL;tok=strtok(NULL, ",")) {
+ CHKmalloc(currentSub->next = (sublist*)MALLOC(sizeof(sublist)));
+ currentSub=currentSub->next;
+ currentSub->subscribe=strdup(tok);
+ currentSub->next=NULL;
}
+ } else {
+ /* make empty subscription ie subscribe="" */
+ head->subscribe=strdup("");
}
-
- return RS_RET_OK;
+ /* TODO: temporary logging */
+ currentSub = head;
+ DBGPRINTF("imzmq3: Subscriptions:");
+ for(currentSub = head; currentSub != NULL; currentSub=currentSub->next) {
+ DBGPRINTF("'%s'", currentSub->subscribe);
+ }
+ DBGPRINTF("\n");
+finalize_it:
+ RETiRet;
}
-static rsRetVal validateConfig(socket_info* info) {
+static rsRetVal validateConfig(instanceConf_t* info) {
if (info->type == -1) {
errmsg.LogError(0, RS_RET_INVALID_PARAMS,
@@ -307,7 +328,7 @@ static rsRetVal validateConfig(socket_info* info) {
}
if(info->type == ZMQ_SUB && info->subscriptions == NULL) {
errmsg.LogError(0, RS_RET_INVALID_PARAMS,
- "SUB sockets need at least one subscription");
+ "SUB sockets need a subscription");
return RS_RET_INVALID_PARAMS;
}
if(info->type != ZMQ_SUB && info->subscriptions != NULL) {
@@ -320,39 +341,40 @@ static rsRetVal validateConfig(socket_info* info) {
static rsRetVal createContext() {
if (s_context == NULL) {
- errmsg.LogError(0, NO_ERRCODE, "creating zctx.");
+ DBGPRINTF("imzmq3: creating zctx...");
+ zsys_handler_set(NULL);
s_context = zctx_new();
if (s_context == NULL) {
errmsg.LogError(0, RS_RET_INVALID_PARAMS,
"zctx_new failed: %s",
- strerror(errno));
+ zmq_strerror(errno));
/* DK: really should do better than invalid params...*/
return RS_RET_INVALID_PARAMS;
}
-
- if (s_io_threads > 1) {
- errmsg.LogError(0, NO_ERRCODE, "setting io worker threads to %d", s_io_threads);
- zctx_set_iothreads(s_context, s_io_threads);
+ DBGPRINTF("success!\n");
+ if (runModConf->io_threads > 1) {
+ DBGPRINTF("setting io worker threads to %d\n", runModConf->io_threads);
+ zctx_set_iothreads(s_context, runModConf->io_threads);
}
}
return RS_RET_OK;
}
-static rsRetVal createSocket(socket_info* info, void** sock) {
- size_t ii;
+static rsRetVal createSocket(instanceConf_t* info, void** sock) {
int rv;
+ sublist* sub;
*sock = zsocket_new(s_context, info->type);
if (!sock) {
- errmsg.LogError(0,
+ errmsg.LogError(0,
RS_RET_INVALID_PARAMS,
"zsocket_new failed: %s, for type %d",
- strerror(errno),info->type);
- /* DK: invalid params seems right here */
+ zmq_strerror(errno),info->type);
+ /* DK: invalid params seems right here */
return RS_RET_INVALID_PARAMS;
}
-
+ DBGPRINTF("imzmq3: socket of type %d created successfully\n", info->type)
/* Set options *before* the connect/bind. */
if (info->identity) zsocket_set_identity(*sock, info->identity);
if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf);
@@ -369,38 +391,36 @@ static rsRetVal createSocket(socket_info* info, void** sock) {
if (info->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(*sock, info->reconnectIVLMax);
if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only);
if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity);
-
- /* since HWM have defaults, we always set them. No return codes to check, either.*/
- zsocket_set_sndhwm(*sock, info->sndHWM);
- zsocket_set_rcvhwm(*sock, info->rcvHWM);
-
+ if (info->sndHWM > -1 ) zsocket_set_sndhwm(*sock, info->sndHWM);
+ if (info->rcvHWM > -1 ) zsocket_set_rcvhwm(*sock, info->rcvHWM);
/* Set subscriptions.*/
if (info->type == ZMQ_SUB) {
- for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii)
- zsocket_set_subscribe(*sock, info->subscriptions[ii]);
+ for(sub = info->subscriptions; sub!=NULL; sub=sub->next) {
+ zsocket_set_subscribe(*sock, sub->subscribe);
+ }
}
-
-
/* Do the bind/connect... */
if (info->action==ACTION_CONNECT) {
rv = zsocket_connect(*sock, info->description);
- if (rv < 0) {
+ if (rv == -1) {
errmsg.LogError(0,
RS_RET_INVALID_PARAMS,
"zmq_connect using %s failed: %s",
- info->description, strerror(errno));
+ info->description, zmq_strerror(errno));
return RS_RET_INVALID_PARAMS;
}
+ DBGPRINTF("imzmq3: connect for %s successful\n",info->description);
} else {
rv = zsocket_bind(*sock, info->description);
- if (rv <= 0) {
+ if (rv == -1) {
errmsg.LogError(0,
RS_RET_INVALID_PARAMS,
"zmq_bind using %s failed: %s",
- info->description, strerror(errno));
+ info->description, zmq_strerror(errno));
return RS_RET_INVALID_PARAMS;
}
+ DBGPRINTF("imzmq3: bind for %s successful\n",info->description);
}
return RS_RET_OK;
}
@@ -409,89 +429,138 @@ static rsRetVal createSocket(socket_info* info, void** sock) {
* Module endpoints
*/
-/* accept a new ruleset to bind. Checks if it exists and complains, if not. Note
- * that this makes the assumption that after the bind ruleset is called in the config,
- * another call will be made to add an endpoint.
-*/
-static rsRetVal
-set_ruleset(void __attribute__((unused)) *pVal, uchar *pszName) {
- ruleset_t* ruleset_ptr;
- rsRetVal localRet;
- DEFiRet;
-
- localRet = ruleset.GetRuleset(ourConf, &ruleset_ptr, pszName);
- if(localRet == RS_RET_NOT_FOUND) {
- errmsg.LogError(0, NO_ERRCODE, "error: "
- "ruleset '%s' not found - ignored", pszName);
- }
- CHKiRet(localRet);
- s_ruleset = ruleset_ptr;
- DBGPRINTF("imzmq3 current bind ruleset '%s'\n", pszName);
-
-finalize_it:
- free(pszName); /* no longer needed */
- RETiRet;
-}
/* add an actual endpoint
*/
-static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) {
+static rsRetVal createInstance(instanceConf_t** pinst) {
DEFiRet;
+ instanceConf_t* inst;
+ CHKmalloc(inst = MALLOC(sizeof(instanceConf_t)));
- /* increment number of items and store old num items, as it will be handy.*/
- size_t idx = s_nitems++;
-
- /* allocate a new socket_info array to accomidate this new endpoint*/
- socket_info* tmpSocketInfo;
- CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems));
+ /* set defaults into new instance config struct */
+ setDefaults(inst);
- /* copy existing socket_info across into new array, if any, and free old storage*/
- if(idx) {
- memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx);
- free(s_socketInfo);
+ /* add this to the config */
+ if (runModConf->root == NULL || runModConf->tail == NULL) {
+ runModConf->tail = runModConf->root = inst;
+ } else {
+ runModConf->tail->next = inst;
+ runModConf->tail = inst;
}
+ *pinst = inst;
+finalize_it:
+ RETiRet;
+}
- /* set the static to hold the new array */
- s_socketInfo = tmpSocketInfo;
-
- /* point to the new one */
- socket_info* sockInfo = &s_socketInfo[idx];
-
- /* set defaults for the new socket info */
- setDefaults(sockInfo);
-
- /* Make a writeable copy of the string so we can use strtok
- in the parseConfig call */
- char * copy = NULL;
- CHKmalloc(copy = strdup((char *) valp));
+static rsRetVal createListener(struct cnfparamvals* pvals) {
+ instanceConf_t* inst;
+ int i;
+ DEFiRet;
- /* parse the config string */
- CHKiRet(parseConfig(copy, sockInfo));
+ CHKiRet(createInstance(&inst));
+ for(i = 0 ; i < inppblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(inppblk.descr[i].name, "ruleset")) {
+ inst->pszBindRuleset = (uchar *)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "description")) {
+ inst->description = es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "sockType")){
+ inst->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL));
+ } else if(!strcmp(inppblk.descr[i].name, "action")){
+ inst->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL));
+ } else if(!strcmp(inppblk.descr[i].name, "sndHWM")) {
+ inst->sndHWM = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "rcvHWM")) {
+ inst->rcvHWM = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "subscribe")) {
+ CHKiRet(parseSubscriptions(es_str2cstr(pvals[i].val.d.estr, NULL),
+ &inst->subscriptions));
+ } else if(!strcmp(inppblk.descr[i].name, "identity")){
+ inst->identity = es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "sndBuf")) {
+ inst->sndBuf = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "rcvBuf")) {
+ inst->rcvBuf = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "linger")) {
+ inst->linger = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "backlog")) {
+ inst->backlog = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "sndTimeout")) {
+ inst->sndTimeout = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "rcvTimeout")) {
+ inst->rcvTimeout = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "maxMsgSize")) {
+ inst->maxMsgSize = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "rate")) {
+ inst->rate = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "recoveryIVL")) {
+ inst->recoveryIVL = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "multicastHops")) {
+ inst->multicastHops = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "reconnectIVL")) {
+ inst->reconnectIVL = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "reconnectIVLMax")) {
+ inst->reconnectIVLMax = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ipv4Only")) {
+ inst->ipv4Only = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "affinity")) {
+ inst->affinity = (int) pvals[i].val.d.n;
+ } else {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: program error, non-handled "
+ "param '%s'\n", inppblk.descr[i].name);
+ }
- /* validate it */
- CHKiRet(validateConfig(sockInfo));
+ }
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal addListener(instanceConf_t* inst){
+ /* create the socket */
+ void* sock;
+ struct lstn_s* newcnfinfo;
+ DEFiRet;
- /* bind to the current ruleset (if any)*/
- sockInfo->ruleset = s_ruleset;
+ CHKiRet(createSocket(inst, &sock));
+
+ /* now create new lstn_s struct */
+ CHKmalloc(newcnfinfo=(struct lstn_s*)MALLOC(sizeof(struct lstn_s)));
+ newcnfinfo->next = NULL;
+ newcnfinfo->sock = sock;
+ newcnfinfo->pRuleset = inst->pBindRuleset;
+ /* add this struct to the global */
+ if(lcnfRoot == NULL) {
+ lcnfRoot = newcnfinfo;
+ }
+ if(lcnfLast == NULL) {
+ lcnfLast = newcnfinfo;
+ } else {
+ lcnfLast->next = newcnfinfo;
+ lcnfLast = newcnfinfo;
+ }
+
finalize_it:
- free(valp); /* in any case, this is no longer needed */
- RETiRet;
+ RETiRet;
}
-
static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *poller, void* pd) {
- msg_t* logmsg;
+ msg_t* pMsg;
poller_data* pollerData = (poller_data*)pd;
char* buf = zstr_recv(poller->socket);
- if (msgConstruct(&logmsg) == RS_RET_OK) {
- MsgSetRawMsg(logmsg, buf, strlen(buf));
- MsgSetInputName(logmsg, s_namep);
- MsgSetFlowControlType(logmsg, eFLOWCTL_NO_DELAY);
- MsgSetRuleset(logmsg, pollerData->ruleset);
- logmsg->msgFlags = NEEDS_PARSING;
- submitMsg(logmsg);
+ if (msgConstruct(&pMsg) == RS_RET_OK) {
+ MsgSetRawMsg(pMsg, buf, strlen(buf));
+ MsgSetInputName(pMsg, s_namep);
+ MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName()));
+ MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp());
+ MsgSetRcvFromIP(pMsg, glbl.GetLocalHostIP());
+ MsgSetMSGoffs(pMsg, 0);
+ MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
+ MsgSetRuleset(pMsg, pollerData->ruleset);
+ pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
+ submitMsg2(pMsg);
}
/* gotta free the string returned from zstr_recv() */
@@ -510,51 +579,65 @@ static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *po
/* called when runInput is called by rsyslog
*/
static rsRetVal rcv_loop(thrdInfo_t* pThrd){
+ size_t n_items = 0;
size_t i;
int rv;
- zmq_pollitem_t* items;
- poller_data* pollerData;
-
+ zmq_pollitem_t* items = NULL;
+ poller_data* pollerData = NULL;
+ struct lstn_s* current;
+ instanceConf_t* inst;
DEFiRet;
-
- /* create the context*/
- CHKiRet(createContext());
+ /* now add listeners. This actually creates the sockets, etc... */
+ for (inst = runModConf->root; inst != NULL; inst=inst->next) {
+ addListener(inst);
+ }
+ if (lcnfRoot == NULL) {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: no listeners were "
+ "started, input not activated.\n");
+ ABORT_FINALIZE(RS_RET_NO_RUN);
+ }
+
+ /* count the # of items first */
+ for(current=lcnfRoot;current!=NULL;current=current->next)
+ n_items++;
+
+ /* make arrays of pollitems, pollerdata so they are easy to delete later */
+
/* create the poll items*/
- CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems));
+ CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*n_items));
/* create poller data (stuff to pass into the zmq closure called when we get a message)*/
- CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems));
+ CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*n_items));
/* loop through and initialize the poll items and poller_data arrays...*/
- for(i=0; i<s_nitems;++i) {
+ for(i=0, current = lcnfRoot; current != NULL; current = current->next, i++) {
/* create the socket, update items.*/
- createSocket(&s_socketInfo[i], &items[i].socket);
+ items[i].socket=current->sock;
items[i].events = ZMQ_POLLIN;
/* now update the poller_data for this item */
pollerData[i].thread = pThrd;
- pollerData[i].ruleset = s_socketInfo[i].ruleset;
+ pollerData[i].ruleset = current->pRuleset;
}
-
+
s_zloop = zloop_new();
- for(i=0; i<s_nitems; ++i) {
+ for(i=0; i<n_items; ++i) {
rv = zloop_poller(s_zloop, &items[i], handlePoll, &pollerData[i]);
if (rv) {
- errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu", i);
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu: %s", i, zmq_strerror(errno));
}
}
+ DBGPRINTF("imzmq3: zloop_poller starting...");
zloop_start(s_zloop);
zloop_destroy(&s_zloop);
- finalize_it:
- for(i=0; i< s_nitems; ++i) {
- zsocket_destroy(s_context, items[i].socket);
- }
-
+ DBGPRINTF("imzmq3: zloop_poller stopped.");
+finalize_it:
zctx_destroy(&s_context);
free(items);
+ free(pollerData);
RETiRet;
}
@@ -564,7 +647,8 @@ static rsRetVal rcv_loop(thrdInfo_t* pThrd){
BEGINrunInput
CODESTARTrunInput
- iRet = rcv_loop(pThrd);
+ CHKiRet(rcv_loop(pThrd));
+finalize_it:
RETiRet;
ENDrunInput
@@ -572,17 +656,13 @@ ENDrunInput
/* initialize and return if will run or not */
BEGINwillRun
CODESTARTwillRun
- /* we need to create the inputName property (only once during our
+ /* we need to create the inputName property (only once during our
lifetime) */
- CHKiRet(prop.Construct(&s_namep));
- CHKiRet(prop.SetString(s_namep,
+ CHKiRet(prop.Construct(&s_namep));
+ CHKiRet(prop.SetString(s_namep,
UCHAR_CONSTANT("imzmq3"),
sizeof("imzmq3") - 1));
- CHKiRet(prop.ConstructFinalize(s_namep));
-
-/* If there are no endpoints this is pointless ...*/
- if (s_nitems == 0)
- ABORT_FINALIZE(RS_RET_NO_RUN);
+ CHKiRet(prop.ConstructFinalize(s_namep));
finalize_it:
ENDwillRun
@@ -590,70 +670,207 @@ ENDwillRun
BEGINafterRun
CODESTARTafterRun
- /* do cleanup here */
- if(s_namep != NULL)
- prop.Destruct(&s_namep);
+ /* do cleanup here */
+ if (s_namep != NULL)
+ prop.Destruct(&s_namep);
ENDafterRun
BEGINmodExit
CODESTARTmodExit
- /* release what we no longer need */
- objRelease(errmsg, CORE_COMPONENT);
- objRelease(glbl, CORE_COMPONENT);
- objRelease(prop, CORE_COMPONENT);
+ /* release what we no longer need */
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(prop, CORE_COMPONENT);
objRelease(ruleset, CORE_COMPONENT);
ENDmodExit
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
- if(eFeat == sFEATURENonCancelInputTermination)
- iRet = RS_RET_OK;
+ if (eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
ENDisCompatibleWithFeature
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ /* After endCnfLoad() (BEGINendCnfLoad...ENDendCnfLoad) is called,
+ * the pModConf pointer must not be used to change the in-memory
+ * config object. It's safe to use the same pointer for accessing
+ * the config object until freeCnf() (BEGINfreeCnf...ENDfreeCnf). */
+ runModConf = pModConf;
+ runModConf->pConf = pConf;
+ /* init module config */
+ runModConf->io_threads = 0; /* 0 means don't set it */
+ENDbeginCnfLoad
+
+
+BEGINsetModCnf
+ struct cnfparamvals* pvals = NULL;
+ int i;
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if (NULL == pvals) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "imzmq3: error processing module "
+ " config parameters ['module(...)']");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ for (i=0; i < modpblk.nParams; ++i) {
+ if (!pvals[i].bUsed)
+ continue;
+ if (!strcmp(modpblk.descr[i].name, "ioThreads")) {
+ runModConf->io_threads = (int)pvals[i].val.d.n;
+ } else {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS,
+ "imzmq3: config error, unknown "
+ "param %s in setModCnf\n",
+ modpblk.descr[i].name);
+ }
+ }
+
+finalize_it:
+ if (pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ /* Last chance to make changes to the in-memory config object for this
+ * input module. After this call, the config object must no longer be
+ * changed. */
+ if (pModConf != runModConf) {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has "
+ "changed - pModConf=%p, runModConf=%p", pModConf, runModConf);
+ }
+ assert(pModConf == runModConf);
+ENDendCnfLoad
+
+
+/* function to generate error message if framework does not find requested ruleset */
+static inline void
+std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, instanceConf_t *inst)
+{
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: ruleset '%s' for socket %s not found - "
+ "using default ruleset instead", inst->pszBindRuleset,
+ inst->description);
+}
+
+
+BEGINcheckCnf
+instanceConf_t* inst;
+CODESTARTcheckCnf
+ for(inst = pModConf->root; inst!=NULL; inst=inst->next) {
+ std_checkRuleset(pModConf, inst);
+ /* now, validate the instanceConf */
+ CHKiRet(validateConfig(inst));
+ }
+finalize_it:
+ RETiRet;
+ENDcheckCnf
+
+
+BEGINactivateCnfPrePrivDrop
+CODESTARTactivateCnfPrePrivDrop
+ if (pModConf != runModConf) {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has "
+ "changed - pModConf=%p, runModConf=%p", pModConf, runModConf);
+ }
+ assert(pModConf == runModConf);
+
+ /* first create the context */
+ createContext();
+
+ /* could setup context here, and set the global worker threads
+ and so on... */
+ENDactivateCnfPrePrivDrop
+
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ if (pModConf != runModConf) {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has "
+ "changed - pModConf=%p, runModConf=%p", pModConf, runModConf);
+ }
+ assert(pModConf == runModConf);
+ENDactivateCnf
+
+
+BEGINfreeCnf
+ struct lstn_s *lstn, *lstn_r;
+ instanceConf_t *inst, *inst_r;
+ sublist *sub, *sub_r;
+CODESTARTfreeCnf
+ DBGPRINTF("imzmq3: BEGINfreeCnf ...\n");
+ if (pModConf != runModConf) {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has "
+ "changed - pModConf=%p, runModConf=%p", pModConf, runModConf);
+ }
+ for (lstn = lcnfRoot; lstn != NULL; ) {
+ lstn_r = lstn;
+ lstn = lstn_r->next;
+ free(lstn_r);
+ }
+ for (inst = pModConf->root ; inst != NULL ; ) {
+ for (sub = inst->subscriptions; sub != NULL; ) {
+ free(sub->subscribe);
+ sub_r = sub;
+ sub = sub_r->next;
+ free(sub_r);
+ }
+ free(inst->pszBindRuleset);
+ inst_r = inst;
+ inst = inst->next;
+ free(inst_r);
+ }
+ENDfreeCnf
+
+
+BEGINnewInpInst
+ struct cnfparamvals* pvals;
+CODESTARTnewInpInst
+
+ DBGPRINTF("newInpInst (imzmq3)\n");
+ pvals = nvlstGetParams(lst, &inppblk, NULL);
+ if(NULL==pvals) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS,
+ "imzmq3: required parameters are missing\n");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+ DBGPRINTF("imzmq3: input param blk:\n");
+ cnfparamsPrint(&inppblk, pvals);
+
+ /* now, parse the config params and so on... */
+ CHKiRet(createListener(pvals));
+
+finalize_it:
+CODE_STD_FINALIZERnewInpInst
+ cnfparamvalsDestruct(pvals, &inppblk);
+ENDnewInpInst
+
+
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
+CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES
+CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
-static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp,
- void __attribute__((unused)) *pVal) {
- return RS_RET_OK;
-}
-static rsRetVal setGlobalWorkerThreads(uchar __attribute__((unused)) *pp, int val) {
- errmsg.LogError(0, NO_ERRCODE, "setGlobalWorkerThreads called with %d",val);
- s_io_threads = val;
- return RS_RET_OK;
-}
BEGINmodInit()
CODESTARTmodInit
/* we only support the current interface specification */
- *ipIFVersProvided = CURR_MOD_IF_VERSION;
+ *ipIFVersProvided = CURR_MOD_IF_VERSION;
CODEmodInit_QueryRegCFSLineHdlr
- CHKiRet(objUse(errmsg, CORE_COMPONENT));
- CHKiRet(objUse(glbl, CORE_COMPONENT));
- CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(prop, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
-
- /* register config file handlers */
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverbindruleset",
- 0, eCmdHdlrGetWord,
- set_ruleset, NULL,
- STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverrun",
- 0, eCmdHdlrGetWord,
- add_endpoint, NULL,
- STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables",
- 1, eCmdHdlrCustomHandler,
- resetConfigVariables, NULL,
- STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3globalWorkerThreads",
- 1, eCmdHdlrInt,
- setGlobalWorkerThreads, NULL,
- STD_LOADABLE_MODULE_ID));
ENDmodInit
+
+
diff --git a/plugins/mmaudit/Makefile.am b/plugins/mmaudit/Makefile.am
index c64d0822..77b2e85f 100644
--- a/plugins/mmaudit/Makefile.am
+++ b/plugins/mmaudit/Makefile.am
@@ -1,8 +1,8 @@
pkglib_LTLIBRARIES = mmaudit.la
mmaudit_la_SOURCES = mmaudit.c
-mmaudit_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(LIBLOGNORM_CFLAGS) $(LIBEE_CFLAGS)
-mmaudit_la_LDFLAGS = -module -avoid-version $(LIBLOGNORM_LIBS) $(LIBEE_LIBS)
+mmaudit_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
+mmaudit_la_LDFLAGS = -module -avoid-version
mmaudit_la_LIBADD =
EXTRA_DIST =
diff --git a/plugins/mmaudit/mmaudit.c b/plugins/mmaudit/mmaudit.c
index 018e1771..6b6b804c 100644
--- a/plugins/mmaudit/mmaudit.c
+++ b/plugins/mmaudit/mmaudit.c
@@ -43,8 +43,7 @@
#include <errno.h>
#include <unistd.h>
#include <ctype.h>
-#include <libestr.h>
-#include <libee/libee.h>
+#include <json/json.h>
#include "conf.h"
#include "syslogd-types.h"
#include "template.h"
diff --git a/plugins/mmcount/Makefile.am b/plugins/mmcount/Makefile.am
new file mode 100644
index 00000000..9c8c99db
--- /dev/null
+++ b/plugins/mmcount/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = mmcount.la
+
+mmcount_la_SOURCES = mmcount.c
+mmcount_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
+mmcount_la_LDFLAGS = -module -avoid-version
+mmcount_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/mmcount/mmcount.c b/plugins/mmcount/mmcount.c
new file mode 100644
index 00000000..56a4de55
--- /dev/null
+++ b/plugins/mmcount/mmcount.c
@@ -0,0 +1,342 @@
+/* mmcount.c
+ * count messages by priority or json property of given app-name.
+ *
+ * Copyright 2013 Red Hat Inc.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <json/json.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "hashtable.h"
+
+#define JSON_COUNT_NAME "!mmcount"
+#define SEVERITY_COUNT 8
+
+MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("mmcount")
+
+
+DEFobjCurrIf(errmsg);
+DEF_OMOD_STATIC_DATA
+
+/* config variables */
+
+typedef struct _instanceData {
+ char *pszAppName;
+ int severity[SEVERITY_COUNT];
+ char *pszKey;
+ char *pszValue;
+ int valueCounter;
+ struct hashtable *ht;
+} instanceData;
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+};
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
+
+
+/* tables for interfacing with the v6 config system */
+/* action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ { "appname", eCmdHdlrGetWord, 0 },
+ { "key", eCmdHdlrGetWord, 0 },
+ { "value", eCmdHdlrGetWord, 0 },
+};
+static struct cnfparamblk actpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ENDbeginCnfLoad
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ENDisCompatibleWithFeature
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ENDfreeInstance
+
+
+static inline void
+setInstParamDefaults(instanceData *pData)
+{
+ int i;
+
+ pData->pszAppName = NULL;
+ for (i = 0; i < SEVERITY_COUNT; i++)
+ pData->severity[i] = 0;
+ pData->pszKey = NULL;
+ pData->pszValue = NULL;
+ pData->valueCounter = 0;
+ pData->ht = NULL;
+}
+
+static unsigned int
+hash_from_key_fn(void *k)
+{
+ return *(unsigned int *)k;
+}
+
+static int
+key_equals_fn(void *k1, void *k2)
+{
+ return (*(unsigned int *)k1 == *(unsigned int *)k2);
+}
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+CODESTARTnewActInst
+ DBGPRINTF("newActInst (mmcount)\n");
+ if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ CODE_STD_STRING_REQUESTnewActInst(1)
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(actpblk.descr[i].name, "appname")) {
+ pData->pszAppName = es_str2cstr(pvals[i].val.d.estr, NULL);
+ continue;
+ }
+ if(!strcmp(actpblk.descr[i].name, "key")) {
+ pData->pszKey = es_str2cstr(pvals[i].val.d.estr, NULL);
+ continue;
+ }
+ if(!strcmp(actpblk.descr[i].name, "value")) {
+ pData->pszValue = es_str2cstr(pvals[i].val.d.estr, NULL);
+ continue;
+ }
+ dbgprintf("mmcount: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+
+ if(pData->pszAppName == NULL) {
+ dbgprintf("mmcount: action requires a appname");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(pData->pszKey != NULL && pData->pszValue == NULL) {
+ if(NULL == (pData->ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL))) {
+ DBGPRINTF("mmcount: error creating hash table!\n");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ }
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ENDdbgPrintInstInfo
+
+
+BEGINtryResume
+CODESTARTtryResume
+ENDtryResume
+
+static int *
+getCounter(struct hashtable *ht, char *str) {
+ unsigned int key;
+ int *pCounter;
+ unsigned int *pKey;
+
+ /* we dont store str as key, instead we store hash of the str
+ as key to reduce memory usage */
+ key = hash_from_string(str);
+ pCounter = hashtable_search(ht, &key);
+ if(pCounter) {
+ return pCounter;
+ }
+
+ /* counter is not found for the str, so add new entry and
+ return the counter */
+ if(NULL == (pKey = (unsigned int*)malloc(sizeof(unsigned int)))) {
+ DBGPRINTF("mmcount: memory allocation for key failed\n");
+ return NULL;
+ }
+ *pKey = key;
+
+ if(NULL == (pCounter = (int*)malloc(sizeof(int)))) {
+ DBGPRINTF("mmcount: memory allocation for value failed\n");
+ free(pKey);
+ return NULL;
+ }
+ *pCounter = 0;
+
+ if(!hashtable_insert(ht, pKey, pCounter)) {
+ DBGPRINTF("mmcount: inserting element into hashtable failed\n");
+ free(pKey);
+ free(pCounter);
+ return NULL;
+ }
+ return pCounter;
+}
+
+BEGINdoAction
+ msg_t *pMsg;
+ char *appname;
+ struct json_object *json = NULL;
+ es_str_t *estr = NULL;
+ struct json_object *keyjson = NULL;
+ char *pszValue;
+ int *pCounter;
+CODESTARTdoAction
+ pMsg = (msg_t*) ppString[0];
+ appname = getAPPNAME(pMsg, LOCK_MUTEX);
+
+ if(0 != strcmp(appname, pData->pszAppName)) {
+ /* we are not working for this appname. nothing to do */
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ if(!pData->pszKey) {
+ /* no key given for count, so we count severity */
+ if(pMsg->iSeverity <= SEVERITY_COUNT) {
+ pData->severity[pMsg->iSeverity]++;
+ json = json_object_new_int(pData->severity[pMsg->iSeverity]);
+ }
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ /* key is given, so get the property json */
+ estr = es_newStrFromBuf(pData->pszKey, strlen(pData->pszKey));
+ if(msgGetCEEPropJSON(pMsg, estr, &keyjson) != RS_RET_OK) {
+ /* key not found in the message. nothing to do */
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ /* key found, so get the value */
+ pszValue = (char*)json_object_get_string(keyjson);
+
+ if(pData->pszValue) {
+ /* value also given for count */
+ if(!strcmp(pszValue, pData->pszValue)) {
+ /* count for (value and key and appname) matched */
+ pData->valueCounter++;
+ json = json_object_new_int(pData->valueCounter);
+ }
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ /* value is not given, so we count for each value of given key */
+ pCounter = getCounter(pData->ht, pszValue);
+ if(pCounter) {
+ (*pCounter)++;
+ json = json_object_new_int(*pCounter);
+ }
+finalize_it:
+ if(estr) {
+ es_deleteStr(estr);
+ }
+
+ if(json) {
+ msgAddJSON(pMsg, (uchar *)JSON_COUNT_NAME, json);
+ }
+ENDdoAction
+
+
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ if(strncmp((char*) p, ":mmcount:", sizeof(":mmcount:") - 1)) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "mmcount supports only v6+ config format, use: "
+ "action(type=\"mmcount\" ...)");
+ }
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(errmsg, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+ENDqueryEtryPt
+
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ DBGPRINTF("mmcount: module compiled with rsyslog version %s.\n", VERSION);
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ENDmodInit
diff --git a/plugins/mmfields/Makefile.am b/plugins/mmfields/Makefile.am
new file mode 100644
index 00000000..08170d52
--- /dev/null
+++ b/plugins/mmfields/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = mmfields.la
+
+mmfields_la_SOURCES = mmfields.c
+mmfields_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
+mmfields_la_LDFLAGS = -module -avoid-version
+mmfields_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/mmfields/mmfields.c b/plugins/mmfields/mmfields.c
new file mode 100644
index 00000000..99c78916
--- /dev/null
+++ b/plugins/mmfields/mmfields.c
@@ -0,0 +1,265 @@
+/* mmfields.c
+ * Parse all fields of the message into structured data inside the
+ * JSON tree.
+ *
+ * Copyright 2013 Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <unistd.h>
+#include <stdint.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+
+MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("mmfields")
+
+
+DEFobjCurrIf(errmsg);
+DEF_OMOD_STATIC_DATA
+
+/* config variables */
+
+/* define operation modes we have */
+#define SIMPLE_MODE 0 /* just overwrite */
+#define REWRITE_MODE 1 /* rewrite IP address, canoninized */
+typedef struct _instanceData {
+ char separator;
+} instanceData;
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+};
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
+
+
+/* tables for interfacing with the v6 config system */
+/* action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ { "separator", eCmdHdlrGetChar, 0 }
+};
+static struct cnfparamblk actpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ENDbeginCnfLoad
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ENDisCompatibleWithFeature
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ENDfreeInstance
+
+
+static inline void
+setInstParamDefaults(instanceData *pData)
+{
+ pData->separator = ',';
+}
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+CODESTARTnewActInst
+ DBGPRINTF("newActInst (mmfields)\n");
+ if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ CODE_STD_STRING_REQUESTnewActInst(1)
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(actpblk.descr[i].name, "separator")) {
+ pData->separator = es_getBufAddr(pvals[i].val.d.estr)[0];
+ } else {
+ dbgprintf("mmfields: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+ }
+
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ENDdbgPrintInstInfo
+
+
+BEGINtryResume
+CODESTARTtryResume
+ENDtryResume
+
+
+static inline rsRetVal
+extractField(instanceData *pData, uchar *msgtext, int lenMsg, int *curridx, uchar *fieldbuf)
+{
+ int i, j;
+ DEFiRet;
+ i = *curridx;
+ j = 0;
+ while(i < lenMsg && msgtext[i] != pData->separator) {
+ fieldbuf[j++] = msgtext[i++];
+ }
+ fieldbuf[j] = '\0';
+ if(i < lenMsg)
+ ++i;
+ *curridx = i;
+
+ RETiRet;
+}
+
+
+static inline rsRetVal
+parse_fields(instanceData *pData, msg_t *pMsg, uchar *msgtext, int lenMsg)
+{
+ uchar fieldbuf[32*1024];
+ uchar fieldname[512];
+ struct json_object *json;
+ struct json_object *jval;
+ int field;
+ uchar *buf;
+ int currIdx = 0;
+ DEFiRet;
+
+ if(lenMsg < (int) sizeof(fieldbuf)) {
+ buf = fieldbuf;
+ } else {
+ CHKmalloc(buf = malloc(lenMsg+1));
+ }
+
+ json = json_object_new_object();
+ if(json == NULL) {
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ field = 1;
+ while(currIdx < lenMsg) {
+ CHKiRet(extractField(pData, msgtext, lenMsg, &currIdx, buf));
+ DBGPRINTF("mmfields: field %d: '%s'\n", field, buf);
+ snprintf(fieldname, sizeof(fieldname), "f%d", (char*)field);
+ fieldname[sizeof(fieldname)-1] = '\0';
+ jval = json_object_new_string((char*)fieldbuf);
+ json_object_object_add(json, (char*)fieldname, jval);
+ field++;
+ }
+ msgAddJSON(pMsg, (uchar*)"!", json);
+finalize_it:
+ RETiRet;
+}
+
+
+BEGINdoAction
+ msg_t *pMsg;
+ uchar *msg;
+ int lenMsg;
+CODESTARTdoAction
+ pMsg = (msg_t*) ppString[0];
+ lenMsg = getMSGLen(pMsg);
+ msg = getMSG(pMsg);
+ CHKiRet(parse_fields(pData, pMsg, msg, lenMsg));
+finalize_it:
+ENDdoAction
+
+
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ if(strncmp((char*) p, ":mmfields:", sizeof(":mmfields:") - 1)) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "mmfields supports only v6+ config format, use: "
+ "action(type=\"mmfields\" ...)");
+ }
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(errmsg, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+ENDqueryEtryPt
+
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ DBGPRINTF("mmfields: module compiled with rsyslog version %s.\n", VERSION);
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ENDmodInit
diff --git a/plugins/mmjsonparse/Makefile.am b/plugins/mmjsonparse/Makefile.am
index 5175fe81..ef39163e 100644
--- a/plugins/mmjsonparse/Makefile.am
+++ b/plugins/mmjsonparse/Makefile.am
@@ -1,8 +1,8 @@
pkglib_LTLIBRARIES = mmjsonparse.la
mmjsonparse_la_SOURCES = mmjsonparse.c
-mmjsonparse_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(LIBLOGNORM_CFLAGS) $(LIBEE_CFLAGS)
-mmjsonparse_la_LDFLAGS = -module -avoid-version $(LIBLOGNORM_LIBS) $(LIBEE_LIBS)
+mmjsonparse_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
+mmjsonparse_la_LDFLAGS = -module -avoid-version
mmjsonparse_la_LIBADD =
EXTRA_DIST =
diff --git a/plugins/mmjsonparse/mmjsonparse.c b/plugins/mmjsonparse/mmjsonparse.c
index c47aceb6..35f69aab 100644
--- a/plugins/mmjsonparse/mmjsonparse.c
+++ b/plugins/mmjsonparse/mmjsonparse.c
@@ -35,7 +35,6 @@
#include <errno.h>
#include <unistd.h>
#include <ctype.h>
-#include <libestr.h>
#include <json/json.h>
#include "conf.h"
#include "syslogd-types.h"
diff --git a/plugins/omjournal/omjournal.c b/plugins/omjournal/omjournal.c
index c340287f..160c369d 100644
--- a/plugins/omjournal/omjournal.c
+++ b/plugins/omjournal/omjournal.c
@@ -107,6 +107,7 @@ CODESTARTnewActInst
* the lst ptr. However, we will most probably need params in the
* future.
*/
+ (void) lst; /* prevent compiler warning */
DBGPRINTF("newActInst (mmjournal)\n");
CODE_STD_STRING_REQUESTnewActInst(1)
CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
@@ -145,6 +146,7 @@ CODESTARTdoAction
"SYSLOG_IDENTIFIER=%s", tag,
NULL);
/* FIXME: think about what to do with errors ;) */
+ (void) r; /* prevent compiler warning */
ENDdoAction
diff --git a/plugins/omrelp/omrelp.c b/plugins/omrelp/omrelp.c
index c9e32444..50f6f905 100644
--- a/plugins/omrelp/omrelp.c
+++ b/plugins/omrelp/omrelp.c
@@ -55,16 +55,22 @@ DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
+#define DFLT_ENABLE_TLS 0
+#define DFLT_ENABLE_TLSZIP 0
+
static relpEngine_t *pRelpEngine; /* our relp engine */
typedef struct _instanceData {
uchar *target;
- int compressionLevel; /* 0 - no compression, else level for zlib */
uchar *port;
int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
unsigned timeout;
+ unsigned rebindInterval;
+ unsigned nSent;
relpClt_t *pRelpClt; /* relp client for this instance */
+ sbool bEnableTLS;
+ sbool bEnableTLSZip;
uchar *tplName;
} instanceData;
@@ -78,7 +84,10 @@ static configSettings_t __attribute__((unused)) cs;
/* action (instance) parameters */
static struct cnfparamdescr actpdescr[] = {
{ "target", eCmdHdlrGetWord, 1 },
+ { "tls", eCmdHdlrBinary, 0 },
+ { "tls.compression", eCmdHdlrBinary, 0 },
{ "port", eCmdHdlrGetWord, 0 },
+ { "rebindinterval", eCmdHdlrInt, 0 },
{ "timeout", eCmdHdlrInt, 0 },
{ "template", eCmdHdlrGetWord, 1 }
};
@@ -113,6 +122,20 @@ doCreateRelpClient(instanceData *pData)
ABORT_FINALIZE(RS_RET_RELP_ERR);
if(relpCltSetTimeout(pData->pRelpClt, pData->timeout) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(pData->bEnableTLS) {
+ if(relpCltEnableTLS(pData->pRelpClt) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(pData->bEnableTLSZip) {
+ if(relpCltEnableTLSZip(pData->pRelpClt) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ }
+ }
+ if(glbl.GetSourceIPofLocalClient() == NULL) { /* ar Do we have a client IP set? */
+ if(relpCltSetClientIP(pData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ }
+ pData->bInitialConnect = 1;
+ pData->nSent = 0;
finalize_it:
RETiRet;
}
@@ -120,7 +143,10 @@ finalize_it:
BEGINcreateInstance
CODESTARTcreateInstance
- pData->bInitialConnect = 1;
+ pData->timeout = 90;
+ pData->rebindInterval = 0;
+ pData->bEnableTLS = DFLT_ENABLE_TLS;
+ pData->bEnableTLSZip = DFLT_ENABLE_TLSZIP;
ENDcreateInstance
BEGINfreeInstance
@@ -139,6 +165,9 @@ setInstParamDefaults(instanceData *pData)
pData->port = NULL;
pData->tplName = NULL;
pData->timeout = 90;
+ pData->rebindInterval = 0;
+ pData->bEnableTLS = DFLT_ENABLE_TLS;
+ pData->bEnableTLSZip = DFLT_ENABLE_TLSZIP;
}
@@ -164,6 +193,12 @@ CODESTARTnewActInst
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "timeout")) {
pData->timeout = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "rebindinterval")) {
+ pData->rebindInterval = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "tls")) {
+ pData->bEnableTLS = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "tls.compression")) {
+ pData->bEnableTLSZip = (unsigned) pvals[i].val.d.n;
} else {
dbgprintf("omrelp: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
@@ -232,6 +267,17 @@ CODESTARTtryResume
iRet = doConnect(pData);
ENDtryResume
+static inline rsRetVal
+doRebind(instanceData *pData)
+{
+ DEFiRet;
+ DBGPRINTF("omrelp: destructing relp client due to rebindInterval\n");
+ CHKiRet(relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt));
+ pData->bIsConnected = 0;
+ CHKiRet(doCreateRelpClient(pData));
+finalize_it:
+ RETiRet;
+}
BEGINdoAction
uchar *pMsg; /* temporary buffering */
@@ -247,7 +293,7 @@ CODESTARTdoAction
pMsg = ppString[0];
lenMsg = strlen((char*) pMsg); /* TODO: don't we get this? */
- /* TODO: think about handling oversize messages! */
+ /* we need to truncate oversize msgs - no way around that... */
if((int) lenMsg > glbl.GetMaxLine())
lenMsg = glbl.GetMaxLine();
@@ -256,9 +302,13 @@ CODESTARTdoAction
if(ret != RELP_RET_OK) {
/* error! */
dbgprintf("error forwarding via relp, suspending\n");
- iRet = RS_RET_SUSPENDED;
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
}
+ if(pData->rebindInterval != 0 &&
+ (++pData->nSent >= pData->rebindInterval)) {
+ doRebind(pData);
+ }
finalize_it:
ENDdoAction
@@ -279,62 +329,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
if((iRet = createInstance(&pData)) != RS_RET_OK)
FINALIZE;
- /* we are now after the protocol indicator. Now check if we should
- * use compression. We begin to use a new option format for this:
- * @(option,option)host:port
- * The first option defined is "z[0..9]" where the digit indicates
- * the compression level. If it is not given, 9 (best compression) is
- * assumed. An example action statement might be:
- * :omrelp:(z5,o)127.0.0.1:1400
- * Which means send via TCP with medium (5) compresion (z) to the local
- * host on port 1400. The '0' option means that octet-couting (as in
- * IETF I-D syslog-transport-tls) is to be used for framing (this option
- * applies to TCP-based syslog only and is ignored when specified with UDP).
- * That is not yet implemented.
- * rgerhards, 2006-12-07
- * TODO: think of all this in spite of RELP -- rgerhards, 2008-03-13
- */
- if(*p == '(') {
- /* at this position, it *must* be an option indicator */
- do {
- ++p; /* eat '(' or ',' (depending on when called) */
- /* check options */
- if(*p == 'z') { /* compression */
-# ifdef USE_NETZIP
- ++p; /* eat */
- if(isdigit((int) *p)) {
- int iLevel;
- iLevel = *p - '0';
- ++p; /* eat */
- pData->compressionLevel = iLevel;
- } else {
- errmsg.LogError(0, NO_ERRCODE, "Invalid compression level '%c' specified in "
- "forwardig action - NOT turning on compression.",
- *p);
- }
-# else
- errmsg.LogError(0, NO_ERRCODE, "Compression requested, but rsyslogd is not compiled "
- "with compression support - request ignored.");
-# endif /* #ifdef USE_NETZIP */
- } else { /* invalid option! Just skip it... */
- errmsg.LogError(0, NO_ERRCODE, "Invalid option %c in forwarding action - ignoring.", *p);
- ++p; /* eat invalid option */
- }
- /* the option processing is done. We now do a generic skip
- * to either the next option or the end of the option
- * block.
- */
- while(*p && *p != ')' && *p != ',')
- ++p; /* just skip it */
- } while(*p && *p == ','); /* Attention: do.. while() */
- if(*p == ')')
- ++p; /* eat terminator, on to next */
- else
- /* we probably have end of string - leave it for the rest
- * of the code to handle it (but warn the user)
- */
- errmsg.LogError(0, NO_ERRCODE, "Option block not terminated in forwarding action.");
- }
/* extract the host first (we do a trick - we replace the ';' or ':' with a '\0')
* now skip to port and then template name. rgerhards 2005-07-06
*/
@@ -384,7 +378,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
++p;
}
- /* TODO: make this if go away! */
if(*p == ';') {
*p = '\0'; /* trick to obtain hostname (later)! */
CHKmalloc(pData->target = ustrdup(q));
diff --git a/plugins/omzmq3/README b/plugins/omzmq3/README
index ccc96c74..c2a33555 100644
--- a/plugins/omzmq3/README
+++ b/plugins/omzmq3/README
@@ -1,16 +1,10 @@
ZeroMQ 3.x Output Plugin
Building this plugin:
-Requires libzmq and libczmq. First, install libzmq from the HEAD on github:
-http://github.com/zeromq/libzmq. You can clone the repository, build, then
-install it. The directions for doing so are there in the readme. Then, do
-the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1
-version of libzmq will be released, and a supporting version of libczmq.
-At that time, you could simply download and install the tarballs instead of
-using git to clone the repositories. Those tarballs (when available) can
-be found at http://download.zeromq.org. As of this writing (5/31/2012), the
-most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile
-properly.
+Requires libzmq and libczmq. First, download the tarballs of both libzmq
+and its supporting libczmq from http://download.zeromq.org. As of this
+writing (04/23/2013), the most recent versions of libzmq and czmq are
+3.2.2 and 1.3.2 respectively. Configure, build, and then install both libs.
Omzmq3 allows you to push data out of rsyslog from a zeromq socket. The example
below binds a PUB socket to port 7171, and any message fitting the criteria will
diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c
index ee6756b9..c8552f11 100644
--- a/plugins/omzmq3/omzmq3.c
+++ b/plugins/omzmq3/omzmq3.c
@@ -110,9 +110,10 @@ static zctx_t* s_context = NULL;
static int s_workerThreads = -1;
static struct socket_type types[] = {
- {"PUB", ZMQ_PUB },
- {"PUSH", ZMQ_PUSH },
- {"XPUB", ZMQ_XPUB }
+ {"PUB", ZMQ_PUB },
+ {"PUSH", ZMQ_PUSH },
+ {"DEALER", ZMQ_DEALER },
+ {"XPUB", ZMQ_XPUB }
};
static struct socket_action actions[] = {
@@ -201,17 +202,18 @@ static rsRetVal initZMQ(instanceData* pData) {
/* create the context if necessary. */
if (NULL == s_context) {
+ zsys_handler_set(NULL);
s_context = zctx_new();
if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads);
}
pData->socket = zsocket_new(s_context, pData->type);
-
- /* ALWAYS set the HWM as the zmq3 default is 1000 and we default
- to 0 (infinity) */
- zsocket_set_rcvhwm(pData->socket, pData->rcvHWM);
- zsocket_set_sndhwm(pData->socket, pData->sndHWM);
-
+ if (NULL == pData->socket) {
+ errmsg.LogError(0, RS_RET_NO_ERRCODE,
+ "omzmq3: zsocket_new failed for %s: %s",
+ pData->description, zmq_strerror(errno));
+ ABORT_FINALIZE(RS_RET_NO_ERRCODE);
+ }
/* use czmq defaults for these, unless set to non-default values */
if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity);
if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf);
@@ -228,17 +230,26 @@ static rsRetVal initZMQ(instanceData* pData) {
if(pData->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(pData->socket, pData->reconnectIVLMax);
if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only);
if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity);
-
+ if(pData->rcvHWM > -1) zsocket_set_rcvhwm(pData->socket, pData->rcvHWM);
+ if(pData->sndHWM > -1) zsocket_set_sndhwm(pData->socket, pData->sndHWM);
+
/* bind or connect to it */
if (pData->action == ACTION_BIND) {
/* bind asserts, so no need to test return val here
which isn't the greatest api -- oh well */
- zsocket_bind(pData->socket, (char*)pData->description);
+ if(-1 == zsocket_bind(pData->socket, (char*)pData->description)) {
+ errmsg.LogError(0, RS_RET_NO_ERRCODE, "omzmq3: bind failed for %s: %s",
+ pData->description, zmq_strerror(errno));
+ ABORT_FINALIZE(RS_RET_NO_ERRCODE);
+ }
+ DBGPRINTF("omzmq3: bind to %s successful\n",pData->description);
} else {
- if(zsocket_connect(pData->socket, (char*)pData->description) == -1) {
- errmsg.LogError(0, RS_RET_SUSPENDED, "omzmq3: connect failed!");
- ABORT_FINALIZE(RS_RET_SUSPENDED);
+ if(-1 == zsocket_connect(pData->socket, (char*)pData->description)) {
+ errmsg.LogError(0, RS_RET_NO_ERRCODE, "omzmq3: connect failed for %s: %s",
+ pData->description, zmq_strerror(errno));
+ ABORT_FINALIZE(RS_RET_NO_ERRCODE);
}
+ DBGPRINTF("omzmq3: connect to %s successful", pData->description);
}
finalize_it:
RETiRet;
@@ -256,7 +267,7 @@ rsRetVal writeZMQ(uchar* msg, instanceData* pData) {
/* whine if things went wrong */
if (result == -1) {
- errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result);
+ errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed: %s", msg, zmq_strerror(errno));
ABORT_FINALIZE(RS_RET_ERR);
}
finalize_it:
@@ -265,13 +276,13 @@ rsRetVal writeZMQ(uchar* msg, instanceData* pData) {
static inline void
setInstParamDefaults(instanceData* pData) {
- pData->description = (uchar*)"tcp://*:7171";
+ pData->description = NULL;
pData->socket = NULL;
pData->tplName = NULL;
pData->type = ZMQ_PUB;
pData->action = ACTION_BIND;
- pData->sndHWM = 0; /*unlimited*/
- pData->rcvHWM = 0; /*unlimited*/
+ pData->sndHWM = -1;
+ pData->rcvHWM = -1;
pData->identity = NULL;
pData->sndBuf = -1;
pData->rcvBuf = -1;
@@ -314,6 +325,7 @@ CODESTARTfreeInstance
closeZMQ(pData);
free(pData->description);
free(pData->tplName);
+ free(pData->identity);
ENDfreeInstance
BEGINtryResume
@@ -329,88 +341,90 @@ ENDdoAction
BEGINnewActInst
- struct cnfparamvals *pvals;
- int i;
+ struct cnfparamvals *pvals;
+ int i;
CODESTARTnewActInst
-if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
- ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
- }
+ if ((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
CHKiRet(createInstance(&pData));
setInstParamDefaults(pData);
CODE_STD_STRING_REQUESTnewActInst(1)
-for(i = 0 ; i < actpblk.nParams ; ++i) {
- if(!pvals[i].bUsed)
- continue;
- if(!strcmp(actpblk.descr[i].name, "description")) {
- pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
- } else if(!strcmp(actpblk.descr[i].name, "template")) {
- pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
- } else if(!strcmp(actpblk.descr[i].name, "sockType")){
- pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL));
- } else if(!strcmp(actpblk.descr[i].name, "action")){
- pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL));
- } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) {
- pData->sndHWM = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) {
- pData->rcvHWM = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "identity")){
- pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
- } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) {
- pData->sndBuf = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) {
- pData->rcvBuf = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "linger")) {
- pData->linger = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "backlog")) {
- pData->backlog = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) {
- pData->sndTimeout = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) {
- pData->rcvTimeout = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) {
- pData->maxMsgSize = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "rate")) {
- pData->rate = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) {
- pData->recoveryIVL = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) {
- pData->multicastHops = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) {
- pData->reconnectIVL = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) {
- pData->reconnectIVLMax = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) {
- pData->ipv4Only = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "affinity")) {
- pData->affinity = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) {
- s_workerThreads = (int) pvals[i].val.d.n;
- } else {
- errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled "
- "param '%s'\n", actpblk.descr[i].name);
+ for (i = 0; i < actpblk.nParams; ++i) {
+ if (!pvals[i].bUsed)
+ continue;
+ if (!strcmp(actpblk.descr[i].name, "description")) {
+ pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "template")) {
+ pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "sockType")){
+ pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL));
+ } else if (!strcmp(actpblk.descr[i].name, "action")){
+ pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL));
+ } else if (!strcmp(actpblk.descr[i].name, "sndHWM")) {
+ pData->sndHWM = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "rcvHWM")) {
+ pData->rcvHWM = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "identity")){
+ pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "sndBuf")) {
+ pData->sndBuf = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "rcvBuf")) {
+ pData->rcvBuf = (int) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "linger")) {
+ pData->linger = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "backlog")) {
+ pData->backlog = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "sndTimeout")) {
+ pData->sndTimeout = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "rcvTimeout")) {
+ pData->rcvTimeout = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "maxMsgSize")) {
+ pData->maxMsgSize = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "rate")) {
+ pData->rate = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "recoveryIVL")) {
+ pData->recoveryIVL = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "multicastHops")) {
+ pData->multicastHops = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "reconnectIVL")) {
+ pData->reconnectIVL = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) {
+ pData->reconnectIVLMax = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "ipv4Only")) {
+ pData->ipv4Only = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "affinity")) {
+ pData->affinity = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) {
+ s_workerThreads = (int) pvals[i].val.d.n;
+ } else {
+ errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
}
- }
-
-if(pData->tplName == NULL) {
- CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
- } else {
- CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS));
- }
-
-if(pData->type == -1) {
- errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type.");
- ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
- }
-if(pData->action == -1) {
- errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action");
- ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
- }
+ if (pData->tplName == NULL) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup("RSYSLOG_ForwardFormat"), OMSR_NO_RQD_TPL_OPTS));
+ } else {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS));
+ }
+ if (NULL == pData->description) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: you didn't enter a description");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+ if (pData->type == -1) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type.");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+ if (pData->action == -1) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
CODE_STD_FINALIZERnewActInst
- cnfparamvalsDestruct(pvals, &actpblk);
+ cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst
BEGINparseSelectorAct
@@ -433,10 +447,10 @@ ENDinitConfVars
BEGINmodExit
CODESTARTmodExit
-if(NULL != s_context) {
- zctx_destroy(&s_context);
- s_context=NULL;
- }
+ if (NULL != s_context) {
+ zctx_destroy(&s_context);
+ s_context=NULL;
+ }
ENDmodExit