summaryrefslogtreecommitdiffstats
path: root/plugins/imzmq3/imzmq3.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imzmq3/imzmq3.c')
-rw-r--r--plugins/imzmq3/imzmq3.c781
1 files changed, 499 insertions, 282 deletions
diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c
index 52c12a53..08b1dbe4 100644
--- a/plugins/imzmq3/imzmq3.c
+++ b/plugins/imzmq3/imzmq3.c
@@ -19,20 +19,21 @@
* License along with this program. If not, see
* <http://www.gnu.org/licenses/>.
*
- * Author: David Kelly
- * <davidk@talksum.com>
+ * Authors:
+ * David Kelly <davidk@talksum.com>
+ * Hongfei Cheng <hongfeic@talksum.com>
*/
+
+#include "config.h"
+#include "rsyslog.h"
+
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
-
-#include "rsyslog.h"
-
#include "cfsysline.h"
-#include "config.h"
#include "dirty.h"
#include "errmsg.h"
#include "glbl.h"
@@ -49,6 +50,7 @@
MODULE_TYPE_INPUT
MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("imzmq3");
/* convienent symbols to denote a socket we want to bind
* vs one we want to just connect to
@@ -83,47 +85,67 @@ typedef struct _poller_data {
thrdInfo_t* thread;
} poller_data;
-typedef struct _socket_info {
- int type;
- int action;
- char* description;
- int sndHWM; /* if you want more than 2^32 messages, */
- int rcvHWM; /* then pass in 0 (the default). */
- char* identity;
- char** subscriptions;
- ruleset_t* ruleset;
- int sndBuf;
- int rcvBuf;
- int linger;
- int backlog;
- int sndTimeout;
- int rcvTimeout;
- int maxMsgSize;
- int rate;
- int recoveryIVL;
- int multicastHops;
- int reconnectIVL;
- int reconnectIVLMax;
- int ipv4Only;
- int affinity;
-
-} socket_info;
+/* a linked-list of subscription topics */
+typedef struct sublist_t {
+ char* subscribe;
+ struct sublist_t* next;
+} sublist;
+
+struct instanceConf_s {
+ int type;
+ int action;
+ char* description;
+ int sndHWM; /* if you want more than 2^32 messages, */
+ int rcvHWM; /* then pass in 0 (the default). */
+ char* identity;
+ sublist* subscriptions;
+ int sndBuf;
+ int rcvBuf;
+ int linger;
+ int backlog;
+ int sndTimeout;
+ int rcvTimeout;
+ int maxMsgSize;
+ int rate;
+ int recoveryIVL;
+ int multicastHops;
+ int reconnectIVL;
+ int reconnectIVLMax;
+ int ipv4Only;
+ int affinity;
+ uchar* pszBindRuleset;
+ ruleset_t* pBindRuleset;
+ struct instanceConf_s* next;
+
+};
+
+struct modConfData_s {
+ rsconf_t* pConf;
+ instanceConf_t* root;
+ instanceConf_t* tail;
+ int io_threads;
+};
+struct lstn_s {
+ struct lstn_s* next;
+ void* sock;
+ ruleset_t* pRuleset;
+};
/* ----------------------------------------------------------------------------
* Static definitions/initializations.
*/
-static socket_info* s_socketInfo = NULL;
-static size_t s_nitems = 0;
-static prop_t * s_namep = NULL;
+static modConfData_t* runModConf = NULL;
+static struct lstn_s* lcnfRoot = NULL;
+static struct lstn_s* lcnfLast = NULL;
+static prop_t* s_namep = NULL;
static zloop_t* s_zloop = NULL;
-static int s_io_threads = 1;
static zctx_t* s_context = NULL;
-static ruleset_t* s_ruleset = NULL;
static socket_type socketTypes[] = {
- {"SUB", ZMQ_SUB },
- {"PULL", ZMQ_PULL },
- {"XSUB", ZMQ_XSUB }
+ {"SUB", ZMQ_SUB },
+ {"PULL", ZMQ_PULL },
+ {"ROUTER", ZMQ_ROUTER },
+ {"XSUB", ZMQ_XSUB }
};
static socket_action socketActions[] = {
@@ -131,6 +153,48 @@ static socket_action socketActions[] = {
{"CONNECT", ACTION_CONNECT},
};
+static struct cnfparamdescr modpdescr[] = {
+ { "ioThreads", eCmdHdlrInt, 0 },
+};
+
+static struct cnfparamblk modpblk = {
+ CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+};
+
+static struct cnfparamdescr inppdescr[] = {
+ { "description", eCmdHdlrGetWord, 0 },
+ { "sockType", eCmdHdlrGetWord, 0 },
+ { "subscribe", eCmdHdlrGetWord, 0 },
+ { "ruleset", eCmdHdlrGetWord, 0 },
+ { "action", eCmdHdlrGetWord, 0 },
+ { "sndHWM", eCmdHdlrInt, 0 },
+ { "rcvHWM", eCmdHdlrInt, 0 },
+ { "identity", eCmdHdlrGetWord, 0 },
+ { "sndBuf", eCmdHdlrInt, 0 },
+ { "rcvBuf", eCmdHdlrInt, 0 },
+ { "linger", eCmdHdlrInt, 0 },
+ { "backlog", eCmdHdlrInt, 0 },
+ { "sndTimeout", eCmdHdlrInt, 0 },
+ { "rcvTimeout", eCmdHdlrInt, 0 },
+ { "maxMsgSize", eCmdHdlrInt, 0 },
+ { "rate", eCmdHdlrInt, 0 },
+ { "recoveryIVL", eCmdHdlrInt, 0 },
+ { "multicastHops", eCmdHdlrInt, 0 },
+ { "reconnectIVL", eCmdHdlrInt, 0 },
+ { "reconnectIVLMax", eCmdHdlrInt, 0 },
+ { "ipv4Only", eCmdHdlrInt, 0 },
+ { "affinity", eCmdHdlrInt, 0 }
+};
+
+static struct cnfparamblk inppblk = {
+ CNFPARAMBLK_VERSION,
+ sizeof(inppdescr)/sizeof(struct cnfparamdescr),
+ inppdescr
+};
+
+#include "im-helper.h" /* must be included AFTER the type definitions! */
/* ----------------------------------------------------------------------------
* Helper functions
@@ -179,15 +243,16 @@ static int getSocketAction(char* name) {
}
-static void setDefaults(socket_info* info) {
- info->type = ZMQ_SUB;
- info->action = ACTION_BIND;
+static void setDefaults(instanceConf_t* info) {
+ info->type = -1;
+ info->action = -1;
info->description = NULL;
- info->sndHWM = 0;
- info->rcvHWM = 0;
+ info->sndHWM = -1;
+ info->rcvHWM = -1;
info->identity = NULL;
info->subscriptions = NULL;
- info->ruleset = NULL;
+ info->pszBindRuleset = NULL;
+ info->pBindRuleset = NULL;
info->sndBuf = -1;
info->rcvBuf = -1;
info->linger = -1;
@@ -202,93 +267,49 @@ static void setDefaults(socket_info* info) {
info->reconnectIVLMax = -1;
info->ipv4Only = -1;
info->affinity = -1;
-
+ info->next = NULL;
};
-
-/* The config string should look like:
- * "action=AAA,type=TTT,description=DDD,sndHWM=SSS,rcvHWM=RRR,subscribe='xxx',subscribe='yyy'"
- *
+/* given a comma separated list of subscriptions, create a char* array of them
+ * to set later
*/
-static rsRetVal parseConfig(char* config, socket_info* info) {
- int nsubs = 0;
-
- char* binding;
- char* ptr1;
- for (binding = strtok_r(config, ",", &ptr1);
- binding != NULL;
- binding = strtok_r(NULL, ",", &ptr1)) {
-
- /* Each binding looks like foo=bar */
- char * sep = strchr(binding, '=');
- if (sep == NULL)
- {
- errmsg.LogError(0, NO_ERRCODE,
- "Invalid argument format %s, ignoring ...",
- binding);
- continue;
- }
+static rsRetVal parseSubscriptions(char* subscribes, sublist** subList){
+ char* tok = strtok(subscribes, ",");
+ sublist* currentSub;
+ sublist* head;
+ DEFiRet;
- /* Replace '=' with '\0'. */
- *sep = '\0';
-
- char * val = sep + 1;
-
- if (strcmp(binding, "action") == 0) {
- info->action = getSocketAction(val);
- } else if (strcmp(binding, "type") == 0) {
- info->type = getSocketType(val);
- } else if (strcmp(binding, "description") == 0) {
- info->description = strdup(val);
- } else if (strcmp(binding, "sndHWM") == 0) {
- info->sndHWM = atoi(val);
- } else if (strcmp(binding, "rcvHWM") == 0) {
- info->sndHWM = atoi(val);
- } else if (strcmp(binding, "subscribe") == 0) {
- /* Add the subscription value to the list.*/
- char * substr = NULL;
- substr = strdup(val);
- info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1);
- info->subscriptions[nsubs] = substr;
- ++nsubs;
- } else if (strcmp(binding, "sndBuf") == 0) {
- info->sndBuf = atoi(val);
- } else if (strcmp(binding, "rcvBuf") == 0) {
- info->rcvBuf = atoi(val);
- } else if (strcmp(binding, "linger") == 0) {
- info->linger = atoi(val);
- } else if (strcmp(binding, "backlog") == 0) {
- info->backlog = atoi(val);
- } else if (strcmp(binding, "sndTimeout") == 0) {
- info->sndTimeout = atoi(val);
- } else if (strcmp(binding, "rcvTimeout") == 0) {
- info->rcvTimeout = atoi(val);
- } else if (strcmp(binding, "maxMsgSize") == 0) {
- info->maxMsgSize = atoi(val);
- } else if (strcmp(binding, "rate") == 0) {
- info->rate = atoi(val);
- } else if (strcmp(binding, "recoveryIVL") == 0) {
- info->recoveryIVL = atoi(val);
- } else if (strcmp(binding, "multicastHops") == 0) {
- info->multicastHops = atoi(val);
- } else if (strcmp(binding, "reconnectIVL") == 0) {
- info->reconnectIVL = atoi(val);
- } else if (strcmp(binding, "reconnectIVLMax") == 0) {
- info->reconnectIVLMax = atoi(val);
- } else if (strcmp(binding, "ipv4Only") == 0) {
- info->ipv4Only = atoi(val);
- } else if (strcmp(binding, "affinity") == 0) {
- info->affinity = atoi(val);
- } else {
- errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding);
- return RS_RET_INVALID_PARAMS;
+ /* create empty list */
+ CHKmalloc(*subList = (sublist*)MALLOC(sizeof(sublist)));
+ head = *subList;
+ head->next = NULL;
+ head->subscribe=NULL;
+ currentSub=head;
+
+ if(tok) {
+ head->subscribe=strdup(tok);
+ for(tok=strtok(NULL, ","); tok!=NULL;tok=strtok(NULL, ",")) {
+ CHKmalloc(currentSub->next = (sublist*)MALLOC(sizeof(sublist)));
+ currentSub=currentSub->next;
+ currentSub->subscribe=strdup(tok);
+ currentSub->next=NULL;
}
+ } else {
+ /* make empty subscription ie subscribe="" */
+ head->subscribe=strdup("");
}
-
- return RS_RET_OK;
+ /* TODO: temporary logging */
+ currentSub = head;
+ DBGPRINTF("imzmq3: Subscriptions:");
+ for(currentSub = head; currentSub != NULL; currentSub=currentSub->next) {
+ DBGPRINTF("'%s'", currentSub->subscribe);
+ }
+ DBGPRINTF("\n");
+finalize_it:
+ RETiRet;
}
-static rsRetVal validateConfig(socket_info* info) {
+static rsRetVal validateConfig(instanceConf_t* info) {
if (info->type == -1) {
errmsg.LogError(0, RS_RET_INVALID_PARAMS,
@@ -307,7 +328,7 @@ static rsRetVal validateConfig(socket_info* info) {
}
if(info->type == ZMQ_SUB && info->subscriptions == NULL) {
errmsg.LogError(0, RS_RET_INVALID_PARAMS,
- "SUB sockets need at least one subscription");
+ "SUB sockets need a subscription");
return RS_RET_INVALID_PARAMS;
}
if(info->type != ZMQ_SUB && info->subscriptions != NULL) {
@@ -320,39 +341,40 @@ static rsRetVal validateConfig(socket_info* info) {
static rsRetVal createContext() {
if (s_context == NULL) {
- errmsg.LogError(0, NO_ERRCODE, "creating zctx.");
+ DBGPRINTF("imzmq3: creating zctx...");
+ zsys_handler_set(NULL);
s_context = zctx_new();
if (s_context == NULL) {
errmsg.LogError(0, RS_RET_INVALID_PARAMS,
"zctx_new failed: %s",
- strerror(errno));
+ zmq_strerror(errno));
/* DK: really should do better than invalid params...*/
return RS_RET_INVALID_PARAMS;
}
-
- if (s_io_threads > 1) {
- errmsg.LogError(0, NO_ERRCODE, "setting io worker threads to %d", s_io_threads);
- zctx_set_iothreads(s_context, s_io_threads);
+ DBGPRINTF("success!\n");
+ if (runModConf->io_threads > 1) {
+ DBGPRINTF("setting io worker threads to %d\n", runModConf->io_threads);
+ zctx_set_iothreads(s_context, runModConf->io_threads);
}
}
return RS_RET_OK;
}
-static rsRetVal createSocket(socket_info* info, void** sock) {
- size_t ii;
+static rsRetVal createSocket(instanceConf_t* info, void** sock) {
int rv;
+ sublist* sub;
*sock = zsocket_new(s_context, info->type);
if (!sock) {
- errmsg.LogError(0,
+ errmsg.LogError(0,
RS_RET_INVALID_PARAMS,
"zsocket_new failed: %s, for type %d",
- strerror(errno),info->type);
- /* DK: invalid params seems right here */
+ zmq_strerror(errno),info->type);
+ /* DK: invalid params seems right here */
return RS_RET_INVALID_PARAMS;
}
-
+ DBGPRINTF("imzmq3: socket of type %d created successfully\n", info->type)
/* Set options *before* the connect/bind. */
if (info->identity) zsocket_set_identity(*sock, info->identity);
if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf);
@@ -369,38 +391,36 @@ static rsRetVal createSocket(socket_info* info, void** sock) {
if (info->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(*sock, info->reconnectIVLMax);
if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only);
if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity);
-
- /* since HWM have defaults, we always set them. No return codes to check, either.*/
- zsocket_set_sndhwm(*sock, info->sndHWM);
- zsocket_set_rcvhwm(*sock, info->rcvHWM);
-
+ if (info->sndHWM > -1 ) zsocket_set_sndhwm(*sock, info->sndHWM);
+ if (info->rcvHWM > -1 ) zsocket_set_rcvhwm(*sock, info->rcvHWM);
/* Set subscriptions.*/
if (info->type == ZMQ_SUB) {
- for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii)
- zsocket_set_subscribe(*sock, info->subscriptions[ii]);
+ for(sub = info->subscriptions; sub!=NULL; sub=sub->next) {
+ zsocket_set_subscribe(*sock, sub->subscribe);
+ }
}
-
-
/* Do the bind/connect... */
if (info->action==ACTION_CONNECT) {
rv = zsocket_connect(*sock, info->description);
- if (rv < 0) {
+ if (rv == -1) {
errmsg.LogError(0,
RS_RET_INVALID_PARAMS,
"zmq_connect using %s failed: %s",
- info->description, strerror(errno));
+ info->description, zmq_strerror(errno));
return RS_RET_INVALID_PARAMS;
}
+ DBGPRINTF("imzmq3: connect for %s successful\n",info->description);
} else {
rv = zsocket_bind(*sock, info->description);
- if (rv <= 0) {
+ if (rv == -1) {
errmsg.LogError(0,
RS_RET_INVALID_PARAMS,
"zmq_bind using %s failed: %s",
- info->description, strerror(errno));
+ info->description, zmq_strerror(errno));
return RS_RET_INVALID_PARAMS;
}
+ DBGPRINTF("imzmq3: bind for %s successful\n",info->description);
}
return RS_RET_OK;
}
@@ -409,89 +429,138 @@ static rsRetVal createSocket(socket_info* info, void** sock) {
* Module endpoints
*/
-/* accept a new ruleset to bind. Checks if it exists and complains, if not. Note
- * that this makes the assumption that after the bind ruleset is called in the config,
- * another call will be made to add an endpoint.
-*/
-static rsRetVal
-set_ruleset(void __attribute__((unused)) *pVal, uchar *pszName) {
- ruleset_t* ruleset_ptr;
- rsRetVal localRet;
- DEFiRet;
-
- localRet = ruleset.GetRuleset(ourConf, &ruleset_ptr, pszName);
- if(localRet == RS_RET_NOT_FOUND) {
- errmsg.LogError(0, NO_ERRCODE, "error: "
- "ruleset '%s' not found - ignored", pszName);
- }
- CHKiRet(localRet);
- s_ruleset = ruleset_ptr;
- DBGPRINTF("imzmq3 current bind ruleset '%s'\n", pszName);
-
-finalize_it:
- free(pszName); /* no longer needed */
- RETiRet;
-}
/* add an actual endpoint
*/
-static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) {
+static rsRetVal createInstance(instanceConf_t** pinst) {
DEFiRet;
+ instanceConf_t* inst;
+ CHKmalloc(inst = MALLOC(sizeof(instanceConf_t)));
- /* increment number of items and store old num items, as it will be handy.*/
- size_t idx = s_nitems++;
-
- /* allocate a new socket_info array to accomidate this new endpoint*/
- socket_info* tmpSocketInfo;
- CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems));
+ /* set defaults into new instance config struct */
+ setDefaults(inst);
- /* copy existing socket_info across into new array, if any, and free old storage*/
- if(idx) {
- memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx);
- free(s_socketInfo);
+ /* add this to the config */
+ if (runModConf->root == NULL || runModConf->tail == NULL) {
+ runModConf->tail = runModConf->root = inst;
+ } else {
+ runModConf->tail->next = inst;
+ runModConf->tail = inst;
}
+ *pinst = inst;
+finalize_it:
+ RETiRet;
+}
- /* set the static to hold the new array */
- s_socketInfo = tmpSocketInfo;
-
- /* point to the new one */
- socket_info* sockInfo = &s_socketInfo[idx];
-
- /* set defaults for the new socket info */
- setDefaults(sockInfo);
-
- /* Make a writeable copy of the string so we can use strtok
- in the parseConfig call */
- char * copy = NULL;
- CHKmalloc(copy = strdup((char *) valp));
+static rsRetVal createListener(struct cnfparamvals* pvals) {
+ instanceConf_t* inst;
+ int i;
+ DEFiRet;
- /* parse the config string */
- CHKiRet(parseConfig(copy, sockInfo));
+ CHKiRet(createInstance(&inst));
+ for(i = 0 ; i < inppblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(inppblk.descr[i].name, "ruleset")) {
+ inst->pszBindRuleset = (uchar *)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "description")) {
+ inst->description = es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "sockType")){
+ inst->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL));
+ } else if(!strcmp(inppblk.descr[i].name, "action")){
+ inst->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL));
+ } else if(!strcmp(inppblk.descr[i].name, "sndHWM")) {
+ inst->sndHWM = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "rcvHWM")) {
+ inst->rcvHWM = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "subscribe")) {
+ CHKiRet(parseSubscriptions(es_str2cstr(pvals[i].val.d.estr, NULL),
+ &inst->subscriptions));
+ } else if(!strcmp(inppblk.descr[i].name, "identity")){
+ inst->identity = es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "sndBuf")) {
+ inst->sndBuf = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "rcvBuf")) {
+ inst->rcvBuf = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "linger")) {
+ inst->linger = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "backlog")) {
+ inst->backlog = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "sndTimeout")) {
+ inst->sndTimeout = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "rcvTimeout")) {
+ inst->rcvTimeout = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "maxMsgSize")) {
+ inst->maxMsgSize = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "rate")) {
+ inst->rate = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "recoveryIVL")) {
+ inst->recoveryIVL = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "multicastHops")) {
+ inst->multicastHops = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "reconnectIVL")) {
+ inst->reconnectIVL = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "reconnectIVLMax")) {
+ inst->reconnectIVLMax = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ipv4Only")) {
+ inst->ipv4Only = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "affinity")) {
+ inst->affinity = (int) pvals[i].val.d.n;
+ } else {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: program error, non-handled "
+ "param '%s'\n", inppblk.descr[i].name);
+ }
- /* validate it */
- CHKiRet(validateConfig(sockInfo));
+ }
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal addListener(instanceConf_t* inst){
+ /* create the socket */
+ void* sock;
+ struct lstn_s* newcnfinfo;
+ DEFiRet;
- /* bind to the current ruleset (if any)*/
- sockInfo->ruleset = s_ruleset;
+ CHKiRet(createSocket(inst, &sock));
+
+ /* now create new lstn_s struct */
+ CHKmalloc(newcnfinfo=(struct lstn_s*)MALLOC(sizeof(struct lstn_s)));
+ newcnfinfo->next = NULL;
+ newcnfinfo->sock = sock;
+ newcnfinfo->pRuleset = inst->pBindRuleset;
+ /* add this struct to the global */
+ if(lcnfRoot == NULL) {
+ lcnfRoot = newcnfinfo;
+ }
+ if(lcnfLast == NULL) {
+ lcnfLast = newcnfinfo;
+ } else {
+ lcnfLast->next = newcnfinfo;
+ lcnfLast = newcnfinfo;
+ }
+
finalize_it:
- free(valp); /* in any case, this is no longer needed */
- RETiRet;
+ RETiRet;
}
-
static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *poller, void* pd) {
- msg_t* logmsg;
+ msg_t* pMsg;
poller_data* pollerData = (poller_data*)pd;
char* buf = zstr_recv(poller->socket);
- if (msgConstruct(&logmsg) == RS_RET_OK) {
- MsgSetRawMsg(logmsg, buf, strlen(buf));
- MsgSetInputName(logmsg, s_namep);
- MsgSetFlowControlType(logmsg, eFLOWCTL_NO_DELAY);
- MsgSetRuleset(logmsg, pollerData->ruleset);
- logmsg->msgFlags = NEEDS_PARSING;
- submitMsg(logmsg);
+ if (msgConstruct(&pMsg) == RS_RET_OK) {
+ MsgSetRawMsg(pMsg, buf, strlen(buf));
+ MsgSetInputName(pMsg, s_namep);
+ MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName()));
+ MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp());
+ MsgSetRcvFromIP(pMsg, glbl.GetLocalHostIP());
+ MsgSetMSGoffs(pMsg, 0);
+ MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
+ MsgSetRuleset(pMsg, pollerData->ruleset);
+ pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
+ submitMsg2(pMsg);
}
/* gotta free the string returned from zstr_recv() */
@@ -510,51 +579,65 @@ static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *po
/* called when runInput is called by rsyslog
*/
static rsRetVal rcv_loop(thrdInfo_t* pThrd){
+ size_t n_items = 0;
size_t i;
int rv;
- zmq_pollitem_t* items;
- poller_data* pollerData;
-
+ zmq_pollitem_t* items = NULL;
+ poller_data* pollerData = NULL;
+ struct lstn_s* current;
+ instanceConf_t* inst;
DEFiRet;
-
- /* create the context*/
- CHKiRet(createContext());
+ /* now add listeners. This actually creates the sockets, etc... */
+ for (inst = runModConf->root; inst != NULL; inst=inst->next) {
+ addListener(inst);
+ }
+ if (lcnfRoot == NULL) {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: no listeners were "
+ "started, input not activated.\n");
+ ABORT_FINALIZE(RS_RET_NO_RUN);
+ }
+
+ /* count the # of items first */
+ for(current=lcnfRoot;current!=NULL;current=current->next)
+ n_items++;
+
+ /* make arrays of pollitems, pollerdata so they are easy to delete later */
+
/* create the poll items*/
- CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems));
+ CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*n_items));
/* create poller data (stuff to pass into the zmq closure called when we get a message)*/
- CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems));
+ CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*n_items));
/* loop through and initialize the poll items and poller_data arrays...*/
- for(i=0; i<s_nitems;++i) {
+ for(i=0, current = lcnfRoot; current != NULL; current = current->next, i++) {
/* create the socket, update items.*/
- createSocket(&s_socketInfo[i], &items[i].socket);
+ items[i].socket=current->sock;
items[i].events = ZMQ_POLLIN;
/* now update the poller_data for this item */
pollerData[i].thread = pThrd;
- pollerData[i].ruleset = s_socketInfo[i].ruleset;
+ pollerData[i].ruleset = current->pRuleset;
}
-
+
s_zloop = zloop_new();
- for(i=0; i<s_nitems; ++i) {
+ for(i=0; i<n_items; ++i) {
rv = zloop_poller(s_zloop, &items[i], handlePoll, &pollerData[i]);
if (rv) {
- errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu", i);
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu: %s", i, zmq_strerror(errno));
}
}
+ DBGPRINTF("imzmq3: zloop_poller starting...");
zloop_start(s_zloop);
zloop_destroy(&s_zloop);
- finalize_it:
- for(i=0; i< s_nitems; ++i) {
- zsocket_destroy(s_context, items[i].socket);
- }
-
+ DBGPRINTF("imzmq3: zloop_poller stopped.");
+finalize_it:
zctx_destroy(&s_context);
free(items);
+ free(pollerData);
RETiRet;
}
@@ -564,7 +647,8 @@ static rsRetVal rcv_loop(thrdInfo_t* pThrd){
BEGINrunInput
CODESTARTrunInput
- iRet = rcv_loop(pThrd);
+ CHKiRet(rcv_loop(pThrd));
+finalize_it:
RETiRet;
ENDrunInput
@@ -572,17 +656,13 @@ ENDrunInput
/* initialize and return if will run or not */
BEGINwillRun
CODESTARTwillRun
- /* we need to create the inputName property (only once during our
+ /* we need to create the inputName property (only once during our
lifetime) */
- CHKiRet(prop.Construct(&s_namep));
- CHKiRet(prop.SetString(s_namep,
+ CHKiRet(prop.Construct(&s_namep));
+ CHKiRet(prop.SetString(s_namep,
UCHAR_CONSTANT("imzmq3"),
sizeof("imzmq3") - 1));
- CHKiRet(prop.ConstructFinalize(s_namep));
-
-/* If there are no endpoints this is pointless ...*/
- if (s_nitems == 0)
- ABORT_FINALIZE(RS_RET_NO_RUN);
+ CHKiRet(prop.ConstructFinalize(s_namep));
finalize_it:
ENDwillRun
@@ -590,70 +670,207 @@ ENDwillRun
BEGINafterRun
CODESTARTafterRun
- /* do cleanup here */
- if(s_namep != NULL)
- prop.Destruct(&s_namep);
+ /* do cleanup here */
+ if (s_namep != NULL)
+ prop.Destruct(&s_namep);
ENDafterRun
BEGINmodExit
CODESTARTmodExit
- /* release what we no longer need */
- objRelease(errmsg, CORE_COMPONENT);
- objRelease(glbl, CORE_COMPONENT);
- objRelease(prop, CORE_COMPONENT);
+ /* release what we no longer need */
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(prop, CORE_COMPONENT);
objRelease(ruleset, CORE_COMPONENT);
ENDmodExit
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
- if(eFeat == sFEATURENonCancelInputTermination)
- iRet = RS_RET_OK;
+ if (eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
ENDisCompatibleWithFeature
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ /* After endCnfLoad() (BEGINendCnfLoad...ENDendCnfLoad) is called,
+ * the pModConf pointer must not be used to change the in-memory
+ * config object. It's safe to use the same pointer for accessing
+ * the config object until freeCnf() (BEGINfreeCnf...ENDfreeCnf). */
+ runModConf = pModConf;
+ runModConf->pConf = pConf;
+ /* init module config */
+ runModConf->io_threads = 0; /* 0 means don't set it */
+ENDbeginCnfLoad
+
+
+BEGINsetModCnf
+ struct cnfparamvals* pvals = NULL;
+ int i;
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if (NULL == pvals) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "imzmq3: error processing module "
+ " config parameters ['module(...)']");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ for (i=0; i < modpblk.nParams; ++i) {
+ if (!pvals[i].bUsed)
+ continue;
+ if (!strcmp(modpblk.descr[i].name, "ioThreads")) {
+ runModConf->io_threads = (int)pvals[i].val.d.n;
+ } else {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS,
+ "imzmq3: config error, unknown "
+ "param %s in setModCnf\n",
+ modpblk.descr[i].name);
+ }
+ }
+
+finalize_it:
+ if (pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ /* Last chance to make changes to the in-memory config object for this
+ * input module. After this call, the config object must no longer be
+ * changed. */
+ if (pModConf != runModConf) {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has "
+ "changed - pModConf=%p, runModConf=%p", pModConf, runModConf);
+ }
+ assert(pModConf == runModConf);
+ENDendCnfLoad
+
+
+/* function to generate error message if framework does not find requested ruleset */
+static inline void
+std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, instanceConf_t *inst)
+{
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: ruleset '%s' for socket %s not found - "
+ "using default ruleset instead", inst->pszBindRuleset,
+ inst->description);
+}
+
+
+BEGINcheckCnf
+instanceConf_t* inst;
+CODESTARTcheckCnf
+ for(inst = pModConf->root; inst!=NULL; inst=inst->next) {
+ std_checkRuleset(pModConf, inst);
+ /* now, validate the instanceConf */
+ CHKiRet(validateConfig(inst));
+ }
+finalize_it:
+ RETiRet;
+ENDcheckCnf
+
+
+BEGINactivateCnfPrePrivDrop
+CODESTARTactivateCnfPrePrivDrop
+ if (pModConf != runModConf) {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has "
+ "changed - pModConf=%p, runModConf=%p", pModConf, runModConf);
+ }
+ assert(pModConf == runModConf);
+
+ /* first create the context */
+ createContext();
+
+ /* could setup context here, and set the global worker threads
+ and so on... */
+ENDactivateCnfPrePrivDrop
+
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ if (pModConf != runModConf) {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has "
+ "changed - pModConf=%p, runModConf=%p", pModConf, runModConf);
+ }
+ assert(pModConf == runModConf);
+ENDactivateCnf
+
+
+BEGINfreeCnf
+ struct lstn_s *lstn, *lstn_r;
+ instanceConf_t *inst, *inst_r;
+ sublist *sub, *sub_r;
+CODESTARTfreeCnf
+ DBGPRINTF("imzmq3: BEGINfreeCnf ...\n");
+ if (pModConf != runModConf) {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has "
+ "changed - pModConf=%p, runModConf=%p", pModConf, runModConf);
+ }
+ for (lstn = lcnfRoot; lstn != NULL; ) {
+ lstn_r = lstn;
+ lstn = lstn_r->next;
+ free(lstn_r);
+ }
+ for (inst = pModConf->root ; inst != NULL ; ) {
+ for (sub = inst->subscriptions; sub != NULL; ) {
+ free(sub->subscribe);
+ sub_r = sub;
+ sub = sub_r->next;
+ free(sub_r);
+ }
+ free(inst->pszBindRuleset);
+ inst_r = inst;
+ inst = inst->next;
+ free(inst_r);
+ }
+ENDfreeCnf
+
+
+BEGINnewInpInst
+ struct cnfparamvals* pvals;
+CODESTARTnewInpInst
+
+ DBGPRINTF("newInpInst (imzmq3)\n");
+ pvals = nvlstGetParams(lst, &inppblk, NULL);
+ if(NULL==pvals) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS,
+ "imzmq3: required parameters are missing\n");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+ DBGPRINTF("imzmq3: input param blk:\n");
+ cnfparamsPrint(&inppblk, pvals);
+
+ /* now, parse the config params and so on... */
+ CHKiRet(createListener(pvals));
+
+finalize_it:
+CODE_STD_FINALIZERnewInpInst
+ cnfparamvalsDestruct(pvals, &inppblk);
+ENDnewInpInst
+
+
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
+CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES
+CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
-static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp,
- void __attribute__((unused)) *pVal) {
- return RS_RET_OK;
-}
-static rsRetVal setGlobalWorkerThreads(uchar __attribute__((unused)) *pp, int val) {
- errmsg.LogError(0, NO_ERRCODE, "setGlobalWorkerThreads called with %d",val);
- s_io_threads = val;
- return RS_RET_OK;
-}
BEGINmodInit()
CODESTARTmodInit
/* we only support the current interface specification */
- *ipIFVersProvided = CURR_MOD_IF_VERSION;
+ *ipIFVersProvided = CURR_MOD_IF_VERSION;
CODEmodInit_QueryRegCFSLineHdlr
- CHKiRet(objUse(errmsg, CORE_COMPONENT));
- CHKiRet(objUse(glbl, CORE_COMPONENT));
- CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(prop, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
-
- /* register config file handlers */
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverbindruleset",
- 0, eCmdHdlrGetWord,
- set_ruleset, NULL,
- STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverrun",
- 0, eCmdHdlrGetWord,
- add_endpoint, NULL,
- STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables",
- 1, eCmdHdlrCustomHandler,
- resetConfigVariables, NULL,
- STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3globalWorkerThreads",
- 1, eCmdHdlrInt,
- setGlobalWorkerThreads, NULL,
- STD_LOADABLE_MODULE_ID));
ENDmodInit
+
+