summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog1
-rw-r--r--plugins/imzmq3/README59
-rw-r--r--plugins/imzmq3/imzmq3.c735
-rw-r--r--plugins/omzmq3/README14
-rw-r--r--plugins/omzmq3/omzmq3.c202
5 files changed, 614 insertions, 397 deletions
diff --git a/ChangeLog b/ChangeLog
index fc70b515..c102d2a9 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -23,6 +23,7 @@ Version 7.3.16 [beta] 2013-05-??
This is a long-standing bug that was only recently reported by forum
user mc-sim.
Reference: http://kb.monitorware.com/post23448.html
+- 0mq fixes; credits to Hongfei Cheng and Brian Knox
---------------------------------------------------------------------------
Version 7.3.15 [beta] 2013-05-15
- bugfix: problem in build system (especially when cross-compiling)
diff --git a/plugins/imzmq3/README b/plugins/imzmq3/README
index 88653b83..9a108a01 100644
--- a/plugins/imzmq3/README
+++ b/plugins/imzmq3/README
@@ -1,24 +1,59 @@
ZeroMQ 3.x Input Plugin
Building this plugin:
-Requires libzmq and libczmq. First, install libzmq from the HEAD on github:
-http://github.com/zeromq/libzmq. You can clone the repository, build, then
-install it. The directions for doing so are there in the readme. Then, do
-the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1
-version of libzmq will be released, and a supporting version of libczmq.
-At that time, you could simply download and install the tarballs instead of
-using git to clone the repositories. Those tarballs (when available) can
-be found at http://download.zeromq.org. As of this writing (5/31/2012), the
-most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile
-properly.
+Requires libzmq and libczmq. First, download the tarballs of both libzmq
+and its supporting libczmq from http://download.zeromq.org. As of this
+writing (04/23/2013), the most recent versions of libzmq and czmq are
+3.2.2 and 1.3.2 respectively. Configure, build, and then install both libs.
Imzmq3 allows you to push data into rsyslog from a zeromq socket. The example
below binds a SUB socket to port 7172, and then any messages with the topic
"foo" will be pushed into rsyslog.
+Please note:
+This plugin only supports the newer (v7) config format. Legacy config support
+was removed.
+
Example Rsyslog.conf snippet:
-------------------------------------------------------------------------------
-
-$InputZmq3ServerRun action=BIND,type=SUB,description=tcp://*:7172,subscribe=foo
+module(load="imzmq3" ioThreads="1")
+input(type="imzmq3" action="CONNECT" socktype="SUB" description="tcp://*:7172" subscribe="foo,bar")
-------------------------------------------------------------------------------
+Note you can specify multiple subscriptions with a comma-delimited list, with
+no spaces between values.
+
+The only global parameter for this plugin is ioThreads, which is optional and
+probably best left to the zmq default unless you know exactly what you are
+doing.
+
+The instance-level parameters are:
+
+Required
+description
+subscribe (required if the sockType is SUB)
+
+Optional
+sockType (defaults to SUB)
+action (defaults to BIND
+sndHWM
+rcvHWM
+identity
+sndBuf
+rcvBuf
+linger
+backlog
+sndTimeout
+rcvTimeout
+maxMsgSize
+rate
+recoveryIVL
+multicastHops
+reconnectIVL
+reconnectIVLMax
+ipv4Only
+affinity
+
+These all correspond to zmq optional settings. Except where noted, the defaults
+are the zmq defaults if not set. See http://api.zeromq.org/3-2:zmq-setsockopt
+for info on these.
diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c
index 52c12a53..52ca5ebe 100644
--- a/plugins/imzmq3/imzmq3.c
+++ b/plugins/imzmq3/imzmq3.c
@@ -19,20 +19,21 @@
* License along with this program. If not, see
* <http://www.gnu.org/licenses/>.
*
- * Author: David Kelly
- * <davidk@talksum.com>
+ * Authors:
+ * David Kelly <davidk@talksum.com>
+ * Hongfei Cheng <hongfeic@talksum.com>
*/
+
+#include "config.h"
+#include "rsyslog.h"
+
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
-
-#include "rsyslog.h"
-
#include "cfsysline.h"
-#include "config.h"
#include "dirty.h"
#include "errmsg.h"
#include "glbl.h"
@@ -49,6 +50,7 @@
MODULE_TYPE_INPUT
MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("imzmq3");
/* convienent symbols to denote a socket we want to bind
* vs one we want to just connect to
@@ -83,47 +85,68 @@ typedef struct _poller_data {
thrdInfo_t* thread;
} poller_data;
-typedef struct _socket_info {
- int type;
- int action;
- char* description;
- int sndHWM; /* if you want more than 2^32 messages, */
- int rcvHWM; /* then pass in 0 (the default). */
- char* identity;
- char** subscriptions;
- ruleset_t* ruleset;
- int sndBuf;
- int rcvBuf;
- int linger;
- int backlog;
- int sndTimeout;
- int rcvTimeout;
- int maxMsgSize;
- int rate;
- int recoveryIVL;
- int multicastHops;
- int reconnectIVL;
- int reconnectIVLMax;
- int ipv4Only;
- int affinity;
-
-} socket_info;
+/* a linked-list of subscription topics */
+typedef struct sublist_t {
+ char* subscribe;
+ struct sublist_t* next;
+} sublist;
+
+struct instanceConf_s {
+ int type;
+ int action;
+ char* description;
+ int sndHWM; /* if you want more than 2^32 messages, */
+ int rcvHWM; /* then pass in 0 (the default). */
+ char* identity;
+ sublist* subscriptions;
+ int sndBuf;
+ int rcvBuf;
+ int linger;
+ int backlog;
+ int sndTimeout;
+ int rcvTimeout;
+ int maxMsgSize;
+ int rate;
+ int recoveryIVL;
+ int multicastHops;
+ int reconnectIVL;
+ int reconnectIVLMax;
+ int ipv4Only;
+ int affinity;
+ uchar* pszBindRuleset;
+ ruleset_t* pBindRuleset;
+ struct instanceConf_s* next;
+
+};
+
+struct modConfData_s {
+ rsconf_t* pConf;
+ instanceConf_t* root;
+ instanceConf_t* tail;
+ int io_threads;
+};
+struct lstn_s {
+ struct lstn_s* next;
+ void* sock;
+ ruleset_t* pRuleset;
+};
/* ----------------------------------------------------------------------------
* Static definitions/initializations.
*/
-static socket_info* s_socketInfo = NULL;
-static size_t s_nitems = 0;
-static prop_t * s_namep = NULL;
+static modConfData_t* loadModConf = NULL;
+static modConfData_t* runModConf = NULL;
+static struct lstn_s* lcnfRoot = NULL;
+static struct lstn_s* lcnfLast = NULL;
+static prop_t* s_namep = NULL;
static zloop_t* s_zloop = NULL;
-static int s_io_threads = 1;
static zctx_t* s_context = NULL;
-static ruleset_t* s_ruleset = NULL;
static socket_type socketTypes[] = {
- {"SUB", ZMQ_SUB },
- {"PULL", ZMQ_PULL },
- {"XSUB", ZMQ_XSUB }
+ {"SUB", ZMQ_SUB },
+ {"PULL", ZMQ_PULL },
+ {"ROUTER", ZMQ_ROUTER },
+ {"XSUB", ZMQ_XSUB }
};
static socket_action socketActions[] = {
@@ -131,6 +154,48 @@ static socket_action socketActions[] = {
{"CONNECT", ACTION_CONNECT},
};
+static struct cnfparamdescr modpdescr[] = {
+ { "ioThreads", eCmdHdlrInt, 0 },
+};
+
+static struct cnfparamblk modpblk = {
+ CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+};
+
+static struct cnfparamdescr inppdescr[] = {
+ { "description", eCmdHdlrGetWord, 0 },
+ { "sockType", eCmdHdlrGetWord, 0 },
+ { "subscribe", eCmdHdlrGetWord, 0 },
+ { "ruleset", eCmdHdlrGetWord, 0 },
+ { "action", eCmdHdlrGetWord, 0 },
+ { "sndHWM", eCmdHdlrInt, 0 },
+ { "rcvHWM", eCmdHdlrInt, 0 },
+ { "identity", eCmdHdlrGetWord, 0 },
+ { "sndBuf", eCmdHdlrInt, 0 },
+ { "rcvBuf", eCmdHdlrInt, 0 },
+ { "linger", eCmdHdlrInt, 0 },
+ { "backlog", eCmdHdlrInt, 0 },
+ { "sndTimeout", eCmdHdlrInt, 0 },
+ { "rcvTimeout", eCmdHdlrInt, 0 },
+ { "maxMsgSize", eCmdHdlrInt, 0 },
+ { "rate", eCmdHdlrInt, 0 },
+ { "recoveryIVL", eCmdHdlrInt, 0 },
+ { "multicastHops", eCmdHdlrInt, 0 },
+ { "reconnectIVL", eCmdHdlrInt, 0 },
+ { "reconnectIVLMax", eCmdHdlrInt, 0 },
+ { "ipv4Only", eCmdHdlrInt, 0 },
+ { "affinity", eCmdHdlrInt, 0 }
+};
+
+static struct cnfparamblk inppblk = {
+ CNFPARAMBLK_VERSION,
+ sizeof(inppdescr)/sizeof(struct cnfparamdescr),
+ inppdescr
+};
+
+#include "im-helper.h" /* must be included AFTER the type definitions! */
/* ----------------------------------------------------------------------------
* Helper functions
@@ -179,15 +244,16 @@ static int getSocketAction(char* name) {
}
-static void setDefaults(socket_info* info) {
- info->type = ZMQ_SUB;
- info->action = ACTION_BIND;
+static void setDefaults(instanceConf_t* info) {
+ info->type = -1;
+ info->action = -1;
info->description = NULL;
- info->sndHWM = 0;
- info->rcvHWM = 0;
+ info->sndHWM = -1;
+ info->rcvHWM = -1;
info->identity = NULL;
info->subscriptions = NULL;
- info->ruleset = NULL;
+ info->pszBindRuleset = NULL;
+ info->pBindRuleset = NULL;
info->sndBuf = -1;
info->rcvBuf = -1;
info->linger = -1;
@@ -205,90 +271,46 @@ static void setDefaults(socket_info* info) {
};
-
-/* The config string should look like:
- * "action=AAA,type=TTT,description=DDD,sndHWM=SSS,rcvHWM=RRR,subscribe='xxx',subscribe='yyy'"
- *
+/* given a comma separated list of subscriptions, create a char* array of them
+ * to set later
*/
-static rsRetVal parseConfig(char* config, socket_info* info) {
- int nsubs = 0;
-
- char* binding;
- char* ptr1;
- for (binding = strtok_r(config, ",", &ptr1);
- binding != NULL;
- binding = strtok_r(NULL, ",", &ptr1)) {
-
- /* Each binding looks like foo=bar */
- char * sep = strchr(binding, '=');
- if (sep == NULL)
- {
- errmsg.LogError(0, NO_ERRCODE,
- "Invalid argument format %s, ignoring ...",
- binding);
- continue;
- }
+static rsRetVal parseSubscriptions(char* subscribes, sublist** subList){
+ char* tok = strtok(subscribes, ",");
+ sublist* currentSub;
+ sublist* head;
+ DEFiRet;
- /* Replace '=' with '\0'. */
- *sep = '\0';
-
- char * val = sep + 1;
-
- if (strcmp(binding, "action") == 0) {
- info->action = getSocketAction(val);
- } else if (strcmp(binding, "type") == 0) {
- info->type = getSocketType(val);
- } else if (strcmp(binding, "description") == 0) {
- info->description = strdup(val);
- } else if (strcmp(binding, "sndHWM") == 0) {
- info->sndHWM = atoi(val);
- } else if (strcmp(binding, "rcvHWM") == 0) {
- info->sndHWM = atoi(val);
- } else if (strcmp(binding, "subscribe") == 0) {
- /* Add the subscription value to the list.*/
- char * substr = NULL;
- substr = strdup(val);
- info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1);
- info->subscriptions[nsubs] = substr;
- ++nsubs;
- } else if (strcmp(binding, "sndBuf") == 0) {
- info->sndBuf = atoi(val);
- } else if (strcmp(binding, "rcvBuf") == 0) {
- info->rcvBuf = atoi(val);
- } else if (strcmp(binding, "linger") == 0) {
- info->linger = atoi(val);
- } else if (strcmp(binding, "backlog") == 0) {
- info->backlog = atoi(val);
- } else if (strcmp(binding, "sndTimeout") == 0) {
- info->sndTimeout = atoi(val);
- } else if (strcmp(binding, "rcvTimeout") == 0) {
- info->rcvTimeout = atoi(val);
- } else if (strcmp(binding, "maxMsgSize") == 0) {
- info->maxMsgSize = atoi(val);
- } else if (strcmp(binding, "rate") == 0) {
- info->rate = atoi(val);
- } else if (strcmp(binding, "recoveryIVL") == 0) {
- info->recoveryIVL = atoi(val);
- } else if (strcmp(binding, "multicastHops") == 0) {
- info->multicastHops = atoi(val);
- } else if (strcmp(binding, "reconnectIVL") == 0) {
- info->reconnectIVL = atoi(val);
- } else if (strcmp(binding, "reconnectIVLMax") == 0) {
- info->reconnectIVLMax = atoi(val);
- } else if (strcmp(binding, "ipv4Only") == 0) {
- info->ipv4Only = atoi(val);
- } else if (strcmp(binding, "affinity") == 0) {
- info->affinity = atoi(val);
- } else {
- errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding);
- return RS_RET_INVALID_PARAMS;
+ /* create empty list */
+ CHKmalloc(*subList = (sublist*)MALLOC(sizeof(sublist)));
+ head = *subList;
+ head->next = NULL;
+ head->subscribe=NULL;
+ currentSub=head;
+
+ if(tok) {
+ head->subscribe=strdup(tok);
+ for(tok=strtok(NULL, ","); tok!=NULL;tok=strtok(NULL, ",")) {
+ CHKmalloc(currentSub->next = (sublist*)MALLOC(sizeof(sublist)));
+ currentSub=currentSub->next;
+ currentSub->subscribe=strdup(tok);
+ currentSub->next=NULL;
}
+ } else {
+ /* make empty subscription ie subscribe="" */
+ head->subscribe=strdup("");
}
-
- return RS_RET_OK;
+ /* TODO: temporary logging */
+ currentSub = head;
+ DBGPRINTF("imzmq3: Subscriptions:");
+ for(currentSub = head; currentSub != NULL; currentSub=currentSub->next) {
+ DBGPRINTF("'%s'", currentSub->subscribe);
+ }
+ DBGPRINTF("\n");
+finalize_it:
+ RETiRet;
}
-static rsRetVal validateConfig(socket_info* info) {
+static rsRetVal validateConfig(instanceConf_t* info) {
if (info->type == -1) {
errmsg.LogError(0, RS_RET_INVALID_PARAMS,
@@ -307,7 +329,7 @@ static rsRetVal validateConfig(socket_info* info) {
}
if(info->type == ZMQ_SUB && info->subscriptions == NULL) {
errmsg.LogError(0, RS_RET_INVALID_PARAMS,
- "SUB sockets need at least one subscription");
+ "SUB sockets need a subscription");
return RS_RET_INVALID_PARAMS;
}
if(info->type != ZMQ_SUB && info->subscriptions != NULL) {
@@ -320,39 +342,40 @@ static rsRetVal validateConfig(socket_info* info) {
static rsRetVal createContext() {
if (s_context == NULL) {
- errmsg.LogError(0, NO_ERRCODE, "creating zctx.");
+ DBGPRINTF("imzmq3: creating zctx...");
+ zsys_handler_set(NULL);
s_context = zctx_new();
if (s_context == NULL) {
errmsg.LogError(0, RS_RET_INVALID_PARAMS,
"zctx_new failed: %s",
- strerror(errno));
+ zmq_strerror(errno));
/* DK: really should do better than invalid params...*/
return RS_RET_INVALID_PARAMS;
}
-
- if (s_io_threads > 1) {
- errmsg.LogError(0, NO_ERRCODE, "setting io worker threads to %d", s_io_threads);
- zctx_set_iothreads(s_context, s_io_threads);
+ DBGPRINTF("success!\n");
+ if (runModConf->io_threads > 1) {
+ DBGPRINTF("setting io worker threads to %d\n", runModConf->io_threads);
+ zctx_set_iothreads(s_context, runModConf->io_threads);
}
}
return RS_RET_OK;
}
-static rsRetVal createSocket(socket_info* info, void** sock) {
- size_t ii;
+static rsRetVal createSocket(instanceConf_t* info, void** sock) {
int rv;
+ sublist* sub;
*sock = zsocket_new(s_context, info->type);
if (!sock) {
- errmsg.LogError(0,
+ errmsg.LogError(0,
RS_RET_INVALID_PARAMS,
"zsocket_new failed: %s, for type %d",
- strerror(errno),info->type);
- /* DK: invalid params seems right here */
+ zmq_strerror(errno),info->type);
+ /* DK: invalid params seems right here */
return RS_RET_INVALID_PARAMS;
}
-
+ DBGPRINTF("imzmq3: socket of type %d created successfully\n", info->type)
/* Set options *before* the connect/bind. */
if (info->identity) zsocket_set_identity(*sock, info->identity);
if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf);
@@ -369,38 +392,36 @@ static rsRetVal createSocket(socket_info* info, void** sock) {
if (info->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(*sock, info->reconnectIVLMax);
if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only);
if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity);
-
- /* since HWM have defaults, we always set them. No return codes to check, either.*/
- zsocket_set_sndhwm(*sock, info->sndHWM);
- zsocket_set_rcvhwm(*sock, info->rcvHWM);
-
+ if (info->sndHWM > -1 ) zsocket_set_sndhwm(*sock, info->sndHWM);
+ if (info->rcvHWM > -1 ) zsocket_set_rcvhwm(*sock, info->rcvHWM);
/* Set subscriptions.*/
if (info->type == ZMQ_SUB) {
- for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii)
- zsocket_set_subscribe(*sock, info->subscriptions[ii]);
+ for(sub = info->subscriptions; sub!=NULL; sub=sub->next) {
+ zsocket_set_subscribe(*sock, sub->subscribe);
+ }
}
-
-
/* Do the bind/connect... */
if (info->action==ACTION_CONNECT) {
rv = zsocket_connect(*sock, info->description);
- if (rv < 0) {
+ if (rv == -1) {
errmsg.LogError(0,
RS_RET_INVALID_PARAMS,
"zmq_connect using %s failed: %s",
- info->description, strerror(errno));
+ info->description, zmq_strerror(errno));
return RS_RET_INVALID_PARAMS;
}
+ DBGPRINTF("imzmq3: connect for %s successful\n",info->description);
} else {
rv = zsocket_bind(*sock, info->description);
- if (rv <= 0) {
+ if (rv == -1) {
errmsg.LogError(0,
RS_RET_INVALID_PARAMS,
"zmq_bind using %s failed: %s",
- info->description, strerror(errno));
+ info->description, zmq_strerror(errno));
return RS_RET_INVALID_PARAMS;
}
+ DBGPRINTF("imzmq3: bind for %s successful\n",info->description);
}
return RS_RET_OK;
}
@@ -409,89 +430,138 @@ static rsRetVal createSocket(socket_info* info, void** sock) {
* Module endpoints
*/
-/* accept a new ruleset to bind. Checks if it exists and complains, if not. Note
- * that this makes the assumption that after the bind ruleset is called in the config,
- * another call will be made to add an endpoint.
-*/
-static rsRetVal
-set_ruleset(void __attribute__((unused)) *pVal, uchar *pszName) {
- ruleset_t* ruleset_ptr;
- rsRetVal localRet;
- DEFiRet;
-
- localRet = ruleset.GetRuleset(ourConf, &ruleset_ptr, pszName);
- if(localRet == RS_RET_NOT_FOUND) {
- errmsg.LogError(0, NO_ERRCODE, "error: "
- "ruleset '%s' not found - ignored", pszName);
- }
- CHKiRet(localRet);
- s_ruleset = ruleset_ptr;
- DBGPRINTF("imzmq3 current bind ruleset '%s'\n", pszName);
-
-finalize_it:
- free(pszName); /* no longer needed */
- RETiRet;
-}
/* add an actual endpoint
*/
-static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) {
+static rsRetVal createInstance(instanceConf_t** pinst) {
DEFiRet;
+ instanceConf_t* inst;
+ CHKmalloc(inst = MALLOC(sizeof(instanceConf_t)));
- /* increment number of items and store old num items, as it will be handy.*/
- size_t idx = s_nitems++;
-
- /* allocate a new socket_info array to accomidate this new endpoint*/
- socket_info* tmpSocketInfo;
- CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems));
+ /* set defaults into new instance config struct */
+ setDefaults(inst);
- /* copy existing socket_info across into new array, if any, and free old storage*/
- if(idx) {
- memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx);
- free(s_socketInfo);
+ /* add this to the config */
+ if(loadModConf->tail == NULL) {
+ loadModConf->tail = loadModConf->root = inst;
+ } else {
+ loadModConf->tail->next = inst;
+ loadModConf->tail = inst;
}
+ *pinst = inst;
+finalize_it:
+ RETiRet;
+}
- /* set the static to hold the new array */
- s_socketInfo = tmpSocketInfo;
-
- /* point to the new one */
- socket_info* sockInfo = &s_socketInfo[idx];
-
- /* set defaults for the new socket info */
- setDefaults(sockInfo);
-
- /* Make a writeable copy of the string so we can use strtok
- in the parseConfig call */
- char * copy = NULL;
- CHKmalloc(copy = strdup((char *) valp));
+static rsRetVal createListener(struct cnfparamvals* pvals) {
+ instanceConf_t* inst;
+ int i;
+ DEFiRet;
- /* parse the config string */
- CHKiRet(parseConfig(copy, sockInfo));
+ CHKiRet(createInstance(&inst));
+ for(i = 0 ; i < inppblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(inppblk.descr[i].name, "ruleset")) {
+ inst->pszBindRuleset = (uchar *)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "description")) {
+ inst->description = es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "sockType")){
+ inst->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL));
+ } else if(!strcmp(inppblk.descr[i].name, "action")){
+ inst->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL));
+ } else if(!strcmp(inppblk.descr[i].name, "sndHWM")) {
+ inst->sndHWM = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "rcvHWM")) {
+ inst->rcvHWM = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "subscribe")) {
+ CHKiRet(parseSubscriptions(es_str2cstr(pvals[i].val.d.estr, NULL),
+ &inst->subscriptions));
+ } else if(!strcmp(inppblk.descr[i].name, "identity")){
+ inst->identity = es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "sndBuf")) {
+ inst->sndBuf = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "rcvBuf")) {
+ inst->rcvBuf = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "linger")) {
+ inst->linger = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "backlog")) {
+ inst->backlog = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "sndTimeout")) {
+ inst->sndTimeout = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "rcvTimeout")) {
+ inst->rcvTimeout = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "maxMsgSize")) {
+ inst->maxMsgSize = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "rate")) {
+ inst->rate = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "recoveryIVL")) {
+ inst->recoveryIVL = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "multicastHops")) {
+ inst->multicastHops = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "reconnectIVL")) {
+ inst->reconnectIVL = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "reconnectIVLMax")) {
+ inst->reconnectIVLMax = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ipv4Only")) {
+ inst->ipv4Only = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "affinity")) {
+ inst->affinity = (int) pvals[i].val.d.n;
+ } else {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: program error, non-handled "
+ "param '%s'\n", inppblk.descr[i].name);
+ }
- /* validate it */
- CHKiRet(validateConfig(sockInfo));
+ }
+finalize_it:
+ RETiRet;
+}
- /* bind to the current ruleset (if any)*/
- sockInfo->ruleset = s_ruleset;
+static rsRetVal addListener(instanceConf_t* inst){
+ /* create the socket */
+ void* sock;
+ struct lstn_s* newcnfinfo;
+ DEFiRet;
+
+ CHKiRet(createSocket(inst, &sock));
+
+ /* now create new lstn_s struct */
+ CHKmalloc(newcnfinfo=(struct lstn_s*)MALLOC(sizeof(struct lstn_s)));
+ newcnfinfo->next = NULL;
+ newcnfinfo->sock = sock;
+ newcnfinfo->pRuleset = inst->pBindRuleset;
+ /* add this struct to the global */
+ if(lcnfRoot == NULL) {
+ lcnfRoot = newcnfinfo;
+ }
+ if(lcnfLast == NULL) {
+ lcnfLast = newcnfinfo;
+ } else {
+ lcnfLast->next = newcnfinfo;
+ lcnfLast = newcnfinfo;
+ }
+
finalize_it:
- free(valp); /* in any case, this is no longer needed */
- RETiRet;
+ RETiRet;
}
-
static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *poller, void* pd) {
- msg_t* logmsg;
+ msg_t* pMsg;
poller_data* pollerData = (poller_data*)pd;
char* buf = zstr_recv(poller->socket);
- if (msgConstruct(&logmsg) == RS_RET_OK) {
- MsgSetRawMsg(logmsg, buf, strlen(buf));
- MsgSetInputName(logmsg, s_namep);
- MsgSetFlowControlType(logmsg, eFLOWCTL_NO_DELAY);
- MsgSetRuleset(logmsg, pollerData->ruleset);
- logmsg->msgFlags = NEEDS_PARSING;
- submitMsg(logmsg);
+ if (msgConstruct(&pMsg) == RS_RET_OK) {
+ MsgSetRawMsg(pMsg, buf, strlen(buf));
+ MsgSetInputName(pMsg, s_namep);
+ MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName()));
+ MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp());
+ MsgSetRcvFromIP(pMsg, glbl.GetLocalHostIP());
+ MsgSetMSGoffs(pMsg, 0);
+ MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
+ MsgSetRuleset(pMsg, pollerData->ruleset);
+ pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
+ submitMsg2(pMsg);
}
/* gotta free the string returned from zstr_recv() */
@@ -510,51 +580,65 @@ static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *po
/* called when runInput is called by rsyslog
*/
static rsRetVal rcv_loop(thrdInfo_t* pThrd){
+ size_t n_items = 0;
size_t i;
int rv;
- zmq_pollitem_t* items;
- poller_data* pollerData;
-
+ zmq_pollitem_t* items = NULL;
+ poller_data* pollerData = NULL;
+ struct lstn_s* current;
+ instanceConf_t* inst;
DEFiRet;
-
- /* create the context*/
- CHKiRet(createContext());
+ /* now add listeners. This actually creates the sockets, etc... */
+ for (inst = runModConf->root; inst != NULL; inst=inst->next) {
+ addListener(inst);
+ }
+ if (lcnfRoot == NULL) {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: no listeners were "
+ "started, input not activated.\n");
+ ABORT_FINALIZE(RS_RET_NO_RUN);
+ }
+
+ /* count the # of items first */
+ for(current=lcnfRoot;current!=NULL;current=current->next)
+ n_items++;
+
+ /* make arrays of pollitems, pollerdata so they are easy to delete later */
+
/* create the poll items*/
- CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems));
+ CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*n_items));
/* create poller data (stuff to pass into the zmq closure called when we get a message)*/
- CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems));
+ CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*n_items));
/* loop through and initialize the poll items and poller_data arrays...*/
- for(i=0; i<s_nitems;++i) {
+ for(i=0, current = lcnfRoot; current != NULL; current = current->next, i++) {
/* create the socket, update items.*/
- createSocket(&s_socketInfo[i], &items[i].socket);
+ items[i].socket=current->sock;
items[i].events = ZMQ_POLLIN;
/* now update the poller_data for this item */
pollerData[i].thread = pThrd;
- pollerData[i].ruleset = s_socketInfo[i].ruleset;
+ pollerData[i].ruleset = current->pRuleset;
}
-
+
s_zloop = zloop_new();
- for(i=0; i<s_nitems; ++i) {
+ for(i=0; i<n_items; ++i) {
rv = zloop_poller(s_zloop, &items[i], handlePoll, &pollerData[i]);
if (rv) {
- errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu", i);
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu: %s", i, zmq_strerror(errno));
}
}
+ DBGPRINTF("imzmq3: zloop_poller starting...");
zloop_start(s_zloop);
zloop_destroy(&s_zloop);
- finalize_it:
- for(i=0; i< s_nitems; ++i) {
- zsocket_destroy(s_context, items[i].socket);
- }
-
+ DBGPRINTF("imzmq3: zloop_poller stopped.");
+finalize_it:
zctx_destroy(&s_context);
free(items);
+ free(pollerData);
RETiRet;
}
@@ -564,7 +648,8 @@ static rsRetVal rcv_loop(thrdInfo_t* pThrd){
BEGINrunInput
CODESTARTrunInput
- iRet = rcv_loop(pThrd);
+ CHKiRet(rcv_loop(pThrd));
+finalize_it:
RETiRet;
ENDrunInput
@@ -572,17 +657,13 @@ ENDrunInput
/* initialize and return if will run or not */
BEGINwillRun
CODESTARTwillRun
- /* we need to create the inputName property (only once during our
+ /* we need to create the inputName property (only once during our
lifetime) */
- CHKiRet(prop.Construct(&s_namep));
- CHKiRet(prop.SetString(s_namep,
+ CHKiRet(prop.Construct(&s_namep));
+ CHKiRet(prop.SetString(s_namep,
UCHAR_CONSTANT("imzmq3"),
sizeof("imzmq3") - 1));
- CHKiRet(prop.ConstructFinalize(s_namep));
-
-/* If there are no endpoints this is pointless ...*/
- if (s_nitems == 0)
- ABORT_FINALIZE(RS_RET_NO_RUN);
+ CHKiRet(prop.ConstructFinalize(s_namep));
finalize_it:
ENDwillRun
@@ -590,70 +671,162 @@ ENDwillRun
BEGINafterRun
CODESTARTafterRun
- /* do cleanup here */
- if(s_namep != NULL)
- prop.Destruct(&s_namep);
+ /* do cleanup here */
+ if (s_namep != NULL)
+ prop.Destruct(&s_namep);
ENDafterRun
BEGINmodExit
CODESTARTmodExit
- /* release what we no longer need */
- objRelease(errmsg, CORE_COMPONENT);
- objRelease(glbl, CORE_COMPONENT);
- objRelease(prop, CORE_COMPONENT);
+ /* release what we no longer need */
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(prop, CORE_COMPONENT);
objRelease(ruleset, CORE_COMPONENT);
ENDmodExit
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
- if(eFeat == sFEATURENonCancelInputTermination)
- iRet = RS_RET_OK;
+ if (eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
ENDisCompatibleWithFeature
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ /* init module config */
+ loadModConf->io_threads = 0; /* 0 means don't set it */
+ENDbeginCnfLoad
+
+
+BEGINsetModCnf
+ struct cnfparamvals* pvals = NULL;
+ int i;
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if (NULL == pvals) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "imzmq3: error processing module "
+ " config parameters ['module(...)']");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ for (i=0; i < modpblk.nParams; ++i) {
+ if (!pvals[i].bUsed)
+ continue;
+ if (!strcmp(modpblk.descr[i].name, "ioThreads")) {
+ loadModConf->io_threads = (int)pvals[i].val.d.n;
+ } else {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS,
+ "imzmq3: config error, unknown "
+ "param %s in setModCnf\n",
+ modpblk.descr[i].name);
+ }
+ }
+
+finalize_it:
+ if (pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ loadModConf = NULL; /* done loading, so it becomes NULL */
+ENDendCnfLoad
+
+
+/* function to generate error message if framework does not find requested ruleset */
+static inline void
+std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, instanceConf_t *inst)
+{
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: ruleset '%s' for socket %s not found - "
+ "using default ruleset instead", inst->pszBindRuleset,
+ inst->description);
+}
+
+
+BEGINcheckCnf
+instanceConf_t* inst;
+CODESTARTcheckCnf
+ for(inst = pModConf->root; inst!=NULL; inst=inst->next) {
+ std_checkRuleset(pModConf, inst);
+ /* now, validate the instanceConf */
+ CHKiRet(validateConfig(inst));
+ }
+finalize_it:
+ RETiRet;
+ENDcheckCnf
+
+
+BEGINactivateCnfPrePrivDrop
+CODESTARTactivateCnfPrePrivDrop
+ runModConf = pModConf;
+ /* first create the context */
+ createContext();
+
+ /* could setup context here, and set the global worker threads
+ and so on... */
+ENDactivateCnfPrePrivDrop
+
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ENDactivateCnf
+
+
+/*TODO: Fill this in! */
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
+
+
+BEGINnewInpInst
+ struct cnfparamvals* pvals;
+CODESTARTnewInpInst
+
+ DBGPRINTF("newInpInst (imzmq3)\n");
+ pvals = nvlstGetParams(lst, &inppblk, NULL);
+ if(NULL==pvals) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS,
+ "imzmq3: required parameters are missing\n");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+ DBGPRINTF("imzmq3: input param blk:\n");
+ cnfparamsPrint(&inppblk, pvals);
+
+ /* now, parse the config params and so on... */
+ CHKiRet(createListener(pvals));
+
+finalize_it:
+CODE_STD_FINALIZERnewInpInst
+ cnfparamvalsDestruct(pvals, &inppblk);
+ENDnewInpInst
+
+
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
+CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES
+CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
-static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp,
- void __attribute__((unused)) *pVal) {
- return RS_RET_OK;
-}
-static rsRetVal setGlobalWorkerThreads(uchar __attribute__((unused)) *pp, int val) {
- errmsg.LogError(0, NO_ERRCODE, "setGlobalWorkerThreads called with %d",val);
- s_io_threads = val;
- return RS_RET_OK;
-}
BEGINmodInit()
CODESTARTmodInit
/* we only support the current interface specification */
- *ipIFVersProvided = CURR_MOD_IF_VERSION;
+ *ipIFVersProvided = CURR_MOD_IF_VERSION;
CODEmodInit_QueryRegCFSLineHdlr
- CHKiRet(objUse(errmsg, CORE_COMPONENT));
- CHKiRet(objUse(glbl, CORE_COMPONENT));
- CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(prop, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
-
- /* register config file handlers */
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverbindruleset",
- 0, eCmdHdlrGetWord,
- set_ruleset, NULL,
- STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverrun",
- 0, eCmdHdlrGetWord,
- add_endpoint, NULL,
- STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables",
- 1, eCmdHdlrCustomHandler,
- resetConfigVariables, NULL,
- STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3globalWorkerThreads",
- 1, eCmdHdlrInt,
- setGlobalWorkerThreads, NULL,
- STD_LOADABLE_MODULE_ID));
ENDmodInit
+
+
diff --git a/plugins/omzmq3/README b/plugins/omzmq3/README
index ccc96c74..c2a33555 100644
--- a/plugins/omzmq3/README
+++ b/plugins/omzmq3/README
@@ -1,16 +1,10 @@
ZeroMQ 3.x Output Plugin
Building this plugin:
-Requires libzmq and libczmq. First, install libzmq from the HEAD on github:
-http://github.com/zeromq/libzmq. You can clone the repository, build, then
-install it. The directions for doing so are there in the readme. Then, do
-the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1
-version of libzmq will be released, and a supporting version of libczmq.
-At that time, you could simply download and install the tarballs instead of
-using git to clone the repositories. Those tarballs (when available) can
-be found at http://download.zeromq.org. As of this writing (5/31/2012), the
-most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile
-properly.
+Requires libzmq and libczmq. First, download the tarballs of both libzmq
+and its supporting libczmq from http://download.zeromq.org. As of this
+writing (04/23/2013), the most recent versions of libzmq and czmq are
+3.2.2 and 1.3.2 respectively. Configure, build, and then install both libs.
Omzmq3 allows you to push data out of rsyslog from a zeromq socket. The example
below binds a PUB socket to port 7171, and any message fitting the criteria will
diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c
index ee6756b9..c8552f11 100644
--- a/plugins/omzmq3/omzmq3.c
+++ b/plugins/omzmq3/omzmq3.c
@@ -110,9 +110,10 @@ static zctx_t* s_context = NULL;
static int s_workerThreads = -1;
static struct socket_type types[] = {
- {"PUB", ZMQ_PUB },
- {"PUSH", ZMQ_PUSH },
- {"XPUB", ZMQ_XPUB }
+ {"PUB", ZMQ_PUB },
+ {"PUSH", ZMQ_PUSH },
+ {"DEALER", ZMQ_DEALER },
+ {"XPUB", ZMQ_XPUB }
};
static struct socket_action actions[] = {
@@ -201,17 +202,18 @@ static rsRetVal initZMQ(instanceData* pData) {
/* create the context if necessary. */
if (NULL == s_context) {
+ zsys_handler_set(NULL);
s_context = zctx_new();
if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads);
}
pData->socket = zsocket_new(s_context, pData->type);
-
- /* ALWAYS set the HWM as the zmq3 default is 1000 and we default
- to 0 (infinity) */
- zsocket_set_rcvhwm(pData->socket, pData->rcvHWM);
- zsocket_set_sndhwm(pData->socket, pData->sndHWM);
-
+ if (NULL == pData->socket) {
+ errmsg.LogError(0, RS_RET_NO_ERRCODE,
+ "omzmq3: zsocket_new failed for %s: %s",
+ pData->description, zmq_strerror(errno));
+ ABORT_FINALIZE(RS_RET_NO_ERRCODE);
+ }
/* use czmq defaults for these, unless set to non-default values */
if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity);
if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf);
@@ -228,17 +230,26 @@ static rsRetVal initZMQ(instanceData* pData) {
if(pData->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(pData->socket, pData->reconnectIVLMax);
if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only);
if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity);
-
+ if(pData->rcvHWM > -1) zsocket_set_rcvhwm(pData->socket, pData->rcvHWM);
+ if(pData->sndHWM > -1) zsocket_set_sndhwm(pData->socket, pData->sndHWM);
+
/* bind or connect to it */
if (pData->action == ACTION_BIND) {
/* bind asserts, so no need to test return val here
which isn't the greatest api -- oh well */
- zsocket_bind(pData->socket, (char*)pData->description);
+ if(-1 == zsocket_bind(pData->socket, (char*)pData->description)) {
+ errmsg.LogError(0, RS_RET_NO_ERRCODE, "omzmq3: bind failed for %s: %s",
+ pData->description, zmq_strerror(errno));
+ ABORT_FINALIZE(RS_RET_NO_ERRCODE);
+ }
+ DBGPRINTF("omzmq3: bind to %s successful\n",pData->description);
} else {
- if(zsocket_connect(pData->socket, (char*)pData->description) == -1) {
- errmsg.LogError(0, RS_RET_SUSPENDED, "omzmq3: connect failed!");
- ABORT_FINALIZE(RS_RET_SUSPENDED);
+ if(-1 == zsocket_connect(pData->socket, (char*)pData->description)) {
+ errmsg.LogError(0, RS_RET_NO_ERRCODE, "omzmq3: connect failed for %s: %s",
+ pData->description, zmq_strerror(errno));
+ ABORT_FINALIZE(RS_RET_NO_ERRCODE);
}
+ DBGPRINTF("omzmq3: connect to %s successful", pData->description);
}
finalize_it:
RETiRet;
@@ -256,7 +267,7 @@ rsRetVal writeZMQ(uchar* msg, instanceData* pData) {
/* whine if things went wrong */
if (result == -1) {
- errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result);
+ errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed: %s", msg, zmq_strerror(errno));
ABORT_FINALIZE(RS_RET_ERR);
}
finalize_it:
@@ -265,13 +276,13 @@ rsRetVal writeZMQ(uchar* msg, instanceData* pData) {
static inline void
setInstParamDefaults(instanceData* pData) {
- pData->description = (uchar*)"tcp://*:7171";
+ pData->description = NULL;
pData->socket = NULL;
pData->tplName = NULL;
pData->type = ZMQ_PUB;
pData->action = ACTION_BIND;
- pData->sndHWM = 0; /*unlimited*/
- pData->rcvHWM = 0; /*unlimited*/
+ pData->sndHWM = -1;
+ pData->rcvHWM = -1;
pData->identity = NULL;
pData->sndBuf = -1;
pData->rcvBuf = -1;
@@ -314,6 +325,7 @@ CODESTARTfreeInstance
closeZMQ(pData);
free(pData->description);
free(pData->tplName);
+ free(pData->identity);
ENDfreeInstance
BEGINtryResume
@@ -329,88 +341,90 @@ ENDdoAction
BEGINnewActInst
- struct cnfparamvals *pvals;
- int i;
+ struct cnfparamvals *pvals;
+ int i;
CODESTARTnewActInst
-if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
- ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
- }
+ if ((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
CHKiRet(createInstance(&pData));
setInstParamDefaults(pData);
CODE_STD_STRING_REQUESTnewActInst(1)
-for(i = 0 ; i < actpblk.nParams ; ++i) {
- if(!pvals[i].bUsed)
- continue;
- if(!strcmp(actpblk.descr[i].name, "description")) {
- pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
- } else if(!strcmp(actpblk.descr[i].name, "template")) {
- pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
- } else if(!strcmp(actpblk.descr[i].name, "sockType")){
- pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL));
- } else if(!strcmp(actpblk.descr[i].name, "action")){
- pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL));
- } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) {
- pData->sndHWM = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) {
- pData->rcvHWM = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "identity")){
- pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
- } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) {
- pData->sndBuf = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) {
- pData->rcvBuf = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "linger")) {
- pData->linger = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "backlog")) {
- pData->backlog = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) {
- pData->sndTimeout = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) {
- pData->rcvTimeout = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) {
- pData->maxMsgSize = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "rate")) {
- pData->rate = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) {
- pData->recoveryIVL = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) {
- pData->multicastHops = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) {
- pData->reconnectIVL = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) {
- pData->reconnectIVLMax = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) {
- pData->ipv4Only = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "affinity")) {
- pData->affinity = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) {
- s_workerThreads = (int) pvals[i].val.d.n;
- } else {
- errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled "
- "param '%s'\n", actpblk.descr[i].name);
+ for (i = 0; i < actpblk.nParams; ++i) {
+ if (!pvals[i].bUsed)
+ continue;
+ if (!strcmp(actpblk.descr[i].name, "description")) {
+ pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "template")) {
+ pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "sockType")){
+ pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL));
+ } else if (!strcmp(actpblk.descr[i].name, "action")){
+ pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL));
+ } else if (!strcmp(actpblk.descr[i].name, "sndHWM")) {
+ pData->sndHWM = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "rcvHWM")) {
+ pData->rcvHWM = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "identity")){
+ pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "sndBuf")) {
+ pData->sndBuf = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "rcvBuf")) {
+ pData->rcvBuf = (int) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "linger")) {
+ pData->linger = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "backlog")) {
+ pData->backlog = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "sndTimeout")) {
+ pData->sndTimeout = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "rcvTimeout")) {
+ pData->rcvTimeout = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "maxMsgSize")) {
+ pData->maxMsgSize = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "rate")) {
+ pData->rate = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "recoveryIVL")) {
+ pData->recoveryIVL = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "multicastHops")) {
+ pData->multicastHops = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "reconnectIVL")) {
+ pData->reconnectIVL = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) {
+ pData->reconnectIVLMax = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "ipv4Only")) {
+ pData->ipv4Only = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "affinity")) {
+ pData->affinity = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) {
+ s_workerThreads = (int) pvals[i].val.d.n;
+ } else {
+ errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
}
- }
-
-if(pData->tplName == NULL) {
- CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
- } else {
- CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS));
- }
-
-if(pData->type == -1) {
- errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type.");
- ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
- }
-if(pData->action == -1) {
- errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action");
- ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
- }
+ if (pData->tplName == NULL) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup("RSYSLOG_ForwardFormat"), OMSR_NO_RQD_TPL_OPTS));
+ } else {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS));
+ }
+ if (NULL == pData->description) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: you didn't enter a description");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+ if (pData->type == -1) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type.");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+ if (pData->action == -1) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
CODE_STD_FINALIZERnewActInst
- cnfparamvalsDestruct(pvals, &actpblk);
+ cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst
BEGINparseSelectorAct
@@ -433,10 +447,10 @@ ENDinitConfVars
BEGINmodExit
CODESTARTmodExit
-if(NULL != s_context) {
- zctx_destroy(&s_context);
- s_context=NULL;
- }
+ if (NULL != s_context) {
+ zctx_destroy(&s_context);
+ s_context=NULL;
+ }
ENDmodExit