diff options
Diffstat (limited to 'plugins/imzmq3/imzmq3.c')
-rw-r--r-- | plugins/imzmq3/imzmq3.c | 783 |
1 files changed, 501 insertions, 282 deletions
diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c index dc1d64d3..08b1dbe4 100644 --- a/plugins/imzmq3/imzmq3.c +++ b/plugins/imzmq3/imzmq3.c @@ -19,20 +19,21 @@ * License along with this program. If not, see * <http://www.gnu.org/licenses/>. * - * Author: David Kelly - * <davidk@talksum.com> + * Authors: + * David Kelly <davidk@talksum.com> + * Hongfei Cheng <hongfeic@talksum.com> */ + +#include "config.h" +#include "rsyslog.h" + #include <assert.h> #include <errno.h> #include <stdlib.h> #include <string.h> #include <unistd.h> - -#include "rsyslog.h" - #include "cfsysline.h" -#include "config.h" #include "dirty.h" #include "errmsg.h" #include "glbl.h" @@ -49,6 +50,7 @@ MODULE_TYPE_INPUT MODULE_TYPE_NOKEEP +MODULE_CNFNAME("imzmq3"); /* convienent symbols to denote a socket we want to bind * vs one we want to just connect to @@ -83,47 +85,67 @@ typedef struct _poller_data { thrdInfo_t* thread; } poller_data; -typedef struct _socket_info { - int type; - int action; - char* description; - int sndHWM; /* if you want more than 2^32 messages, */ - int rcvHWM; /* then pass in 0 (the default). */ - char* identity; - char** subscriptions; - ruleset_t* ruleset; - int sndBuf; - int rcvBuf; - int linger; - int backlog; - int sndTimeout; - int rcvTimeout; - int maxMsgSize; - int rate; - int recoveryIVL; - int multicastHops; - int reconnectIVL; - int reconnectIVLMax; - int ipv4Only; - int affinity; - -} socket_info; +/* a linked-list of subscription topics */ +typedef struct sublist_t { + char* subscribe; + struct sublist_t* next; +} sublist; + +struct instanceConf_s { + int type; + int action; + char* description; + int sndHWM; /* if you want more than 2^32 messages, */ + int rcvHWM; /* then pass in 0 (the default). */ + char* identity; + sublist* subscriptions; + int sndBuf; + int rcvBuf; + int linger; + int backlog; + int sndTimeout; + int rcvTimeout; + int maxMsgSize; + int rate; + int recoveryIVL; + int multicastHops; + int reconnectIVL; + int reconnectIVLMax; + int ipv4Only; + int affinity; + uchar* pszBindRuleset; + ruleset_t* pBindRuleset; + struct instanceConf_s* next; + +}; + +struct modConfData_s { + rsconf_t* pConf; + instanceConf_t* root; + instanceConf_t* tail; + int io_threads; +}; +struct lstn_s { + struct lstn_s* next; + void* sock; + ruleset_t* pRuleset; +}; /* ---------------------------------------------------------------------------- * Static definitions/initializations. */ -static socket_info* s_socketInfo = NULL; -static size_t s_nitems = 0; -static prop_t * s_namep = NULL; +static modConfData_t* runModConf = NULL; +static struct lstn_s* lcnfRoot = NULL; +static struct lstn_s* lcnfLast = NULL; +static prop_t* s_namep = NULL; static zloop_t* s_zloop = NULL; -static int s_io_threads = 1; static zctx_t* s_context = NULL; -static ruleset_t* s_ruleset = NULL; static socket_type socketTypes[] = { - {"SUB", ZMQ_SUB }, - {"PULL", ZMQ_PULL }, - {"XSUB", ZMQ_XSUB } + {"SUB", ZMQ_SUB }, + {"PULL", ZMQ_PULL }, + {"ROUTER", ZMQ_ROUTER }, + {"XSUB", ZMQ_XSUB } }; static socket_action socketActions[] = { @@ -131,6 +153,48 @@ static socket_action socketActions[] = { {"CONNECT", ACTION_CONNECT}, }; +static struct cnfparamdescr modpdescr[] = { + { "ioThreads", eCmdHdlrInt, 0 }, +}; + +static struct cnfparamblk modpblk = { + CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr +}; + +static struct cnfparamdescr inppdescr[] = { + { "description", eCmdHdlrGetWord, 0 }, + { "sockType", eCmdHdlrGetWord, 0 }, + { "subscribe", eCmdHdlrGetWord, 0 }, + { "ruleset", eCmdHdlrGetWord, 0 }, + { "action", eCmdHdlrGetWord, 0 }, + { "sndHWM", eCmdHdlrInt, 0 }, + { "rcvHWM", eCmdHdlrInt, 0 }, + { "identity", eCmdHdlrGetWord, 0 }, + { "sndBuf", eCmdHdlrInt, 0 }, + { "rcvBuf", eCmdHdlrInt, 0 }, + { "linger", eCmdHdlrInt, 0 }, + { "backlog", eCmdHdlrInt, 0 }, + { "sndTimeout", eCmdHdlrInt, 0 }, + { "rcvTimeout", eCmdHdlrInt, 0 }, + { "maxMsgSize", eCmdHdlrInt, 0 }, + { "rate", eCmdHdlrInt, 0 }, + { "recoveryIVL", eCmdHdlrInt, 0 }, + { "multicastHops", eCmdHdlrInt, 0 }, + { "reconnectIVL", eCmdHdlrInt, 0 }, + { "reconnectIVLMax", eCmdHdlrInt, 0 }, + { "ipv4Only", eCmdHdlrInt, 0 }, + { "affinity", eCmdHdlrInt, 0 } +}; + +static struct cnfparamblk inppblk = { + CNFPARAMBLK_VERSION, + sizeof(inppdescr)/sizeof(struct cnfparamdescr), + inppdescr +}; + +#include "im-helper.h" /* must be included AFTER the type definitions! */ /* ---------------------------------------------------------------------------- * Helper functions @@ -179,15 +243,16 @@ static int getSocketAction(char* name) { } -static void setDefaults(socket_info* info) { - info->type = ZMQ_SUB; - info->action = ACTION_BIND; +static void setDefaults(instanceConf_t* info) { + info->type = -1; + info->action = -1; info->description = NULL; - info->sndHWM = 0; - info->rcvHWM = 0; + info->sndHWM = -1; + info->rcvHWM = -1; info->identity = NULL; info->subscriptions = NULL; - info->ruleset = NULL; + info->pszBindRuleset = NULL; + info->pBindRuleset = NULL; info->sndBuf = -1; info->rcvBuf = -1; info->linger = -1; @@ -202,93 +267,49 @@ static void setDefaults(socket_info* info) { info->reconnectIVLMax = -1; info->ipv4Only = -1; info->affinity = -1; - + info->next = NULL; }; - -/* The config string should look like: - * "action=AAA,type=TTT,description=DDD,sndHWM=SSS,rcvHWM=RRR,subscribe='xxx',subscribe='yyy'" - * +/* given a comma separated list of subscriptions, create a char* array of them + * to set later */ -static rsRetVal parseConfig(char* config, socket_info* info) { - int nsubs = 0; - - char* binding; - char* ptr1; - for (binding = strtok_r(config, ",", &ptr1); - binding != NULL; - binding = strtok_r(NULL, ",", &ptr1)) { - - /* Each binding looks like foo=bar */ - char * sep = strchr(binding, '='); - if (sep == NULL) - { - errmsg.LogError(0, NO_ERRCODE, - "Invalid argument format %s, ignoring ...", - binding); - continue; - } +static rsRetVal parseSubscriptions(char* subscribes, sublist** subList){ + char* tok = strtok(subscribes, ","); + sublist* currentSub; + sublist* head; + DEFiRet; - /* Replace '=' with '\0'. */ - *sep = '\0'; - - char * val = sep + 1; - - if (strcmp(binding, "action") == 0) { - info->action = getSocketAction(val); - } else if (strcmp(binding, "type") == 0) { - info->type = getSocketType(val); - } else if (strcmp(binding, "description") == 0) { - info->description = strdup(val); - } else if (strcmp(binding, "sndHWM") == 0) { - info->sndHWM = atoi(val); - } else if (strcmp(binding, "rcvHWM") == 0) { - info->sndHWM = atoi(val); - } else if (strcmp(binding, "subscribe") == 0) { - /* Add the subscription value to the list.*/ - char * substr = NULL; - substr = strdup(val); - info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1); - info->subscriptions[nsubs] = substr; - ++nsubs; - } else if (strcmp(binding, "sndBuf") == 0) { - info->sndBuf = atoi(val); - } else if (strcmp(binding, "rcvBuf") == 0) { - info->rcvBuf = atoi(val); - } else if (strcmp(binding, "linger") == 0) { - info->linger = atoi(val); - } else if (strcmp(binding, "backlog") == 0) { - info->backlog = atoi(val); - } else if (strcmp(binding, "sndTimeout") == 0) { - info->sndTimeout = atoi(val); - } else if (strcmp(binding, "rcvTimeout") == 0) { - info->rcvTimeout = atoi(val); - } else if (strcmp(binding, "maxMsgSize") == 0) { - info->maxMsgSize = atoi(val); - } else if (strcmp(binding, "rate") == 0) { - info->rate = atoi(val); - } else if (strcmp(binding, "recoveryIVL") == 0) { - info->recoveryIVL = atoi(val); - } else if (strcmp(binding, "multicastHops") == 0) { - info->multicastHops = atoi(val); - } else if (strcmp(binding, "reconnectIVL") == 0) { - info->reconnectIVL = atoi(val); - } else if (strcmp(binding, "reconnectIVLMax") == 0) { - info->reconnectIVLMax = atoi(val); - } else if (strcmp(binding, "ipv4Only") == 0) { - info->ipv4Only = atoi(val); - } else if (strcmp(binding, "affinity") == 0) { - info->affinity = atoi(val); - } else { - errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding); - return RS_RET_INVALID_PARAMS; + /* create empty list */ + CHKmalloc(*subList = (sublist*)MALLOC(sizeof(sublist))); + head = *subList; + head->next = NULL; + head->subscribe=NULL; + currentSub=head; + + if(tok) { + head->subscribe=strdup(tok); + for(tok=strtok(NULL, ","); tok!=NULL;tok=strtok(NULL, ",")) { + CHKmalloc(currentSub->next = (sublist*)MALLOC(sizeof(sublist))); + currentSub=currentSub->next; + currentSub->subscribe=strdup(tok); + currentSub->next=NULL; } + } else { + /* make empty subscription ie subscribe="" */ + head->subscribe=strdup(""); } - - return RS_RET_OK; + /* TODO: temporary logging */ + currentSub = head; + DBGPRINTF("imzmq3: Subscriptions:"); + for(currentSub = head; currentSub != NULL; currentSub=currentSub->next) { + DBGPRINTF("'%s'", currentSub->subscribe); + } + DBGPRINTF("\n"); +finalize_it: + RETiRet; } -static rsRetVal validateConfig(socket_info* info) { +static rsRetVal validateConfig(instanceConf_t* info) { if (info->type == -1) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, @@ -307,7 +328,7 @@ static rsRetVal validateConfig(socket_info* info) { } if(info->type == ZMQ_SUB && info->subscriptions == NULL) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, - "SUB sockets need at least one subscription"); + "SUB sockets need a subscription"); return RS_RET_INVALID_PARAMS; } if(info->type != ZMQ_SUB && info->subscriptions != NULL) { @@ -320,39 +341,40 @@ static rsRetVal validateConfig(socket_info* info) { static rsRetVal createContext() { if (s_context == NULL) { - errmsg.LogError(0, NO_ERRCODE, "creating zctx."); + DBGPRINTF("imzmq3: creating zctx..."); + zsys_handler_set(NULL); s_context = zctx_new(); if (s_context == NULL) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "zctx_new failed: %s", - strerror(errno)); + zmq_strerror(errno)); /* DK: really should do better than invalid params...*/ return RS_RET_INVALID_PARAMS; } - - if (s_io_threads > 1) { - errmsg.LogError(0, NO_ERRCODE, "setting io worker threads to %d", s_io_threads); - zctx_set_iothreads(s_context, s_io_threads); + DBGPRINTF("success!\n"); + if (runModConf->io_threads > 1) { + DBGPRINTF("setting io worker threads to %d\n", runModConf->io_threads); + zctx_set_iothreads(s_context, runModConf->io_threads); } } return RS_RET_OK; } -static rsRetVal createSocket(socket_info* info, void** sock) { - size_t ii; +static rsRetVal createSocket(instanceConf_t* info, void** sock) { int rv; + sublist* sub; *sock = zsocket_new(s_context, info->type); if (!sock) { - errmsg.LogError(0, + errmsg.LogError(0, RS_RET_INVALID_PARAMS, "zsocket_new failed: %s, for type %d", - strerror(errno),info->type); - /* DK: invalid params seems right here */ + zmq_strerror(errno),info->type); + /* DK: invalid params seems right here */ return RS_RET_INVALID_PARAMS; } - + DBGPRINTF("imzmq3: socket of type %d created successfully\n", info->type) /* Set options *before* the connect/bind. */ if (info->identity) zsocket_set_identity(*sock, info->identity); if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf); @@ -369,36 +391,36 @@ static rsRetVal createSocket(socket_info* info, void** sock) { if (info->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(*sock, info->reconnectIVLMax); if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only); if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity); - - /* since HWM have defaults, we always set them. No return codes to check, either.*/ - zsocket_set_sndhwm(*sock, info->sndHWM); - zsocket_set_rcvhwm(*sock, info->rcvHWM); - + if (info->sndHWM > -1 ) zsocket_set_sndhwm(*sock, info->sndHWM); + if (info->rcvHWM > -1 ) zsocket_set_rcvhwm(*sock, info->rcvHWM); /* Set subscriptions.*/ - for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii) - zsocket_set_subscribe(*sock, info->subscriptions[ii]); - - + if (info->type == ZMQ_SUB) { + for(sub = info->subscriptions; sub!=NULL; sub=sub->next) { + zsocket_set_subscribe(*sock, sub->subscribe); + } + } /* Do the bind/connect... */ if (info->action==ACTION_CONNECT) { rv = zsocket_connect(*sock, info->description); - if (rv < 0) { + if (rv == -1) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "zmq_connect using %s failed: %s", - info->description, strerror(errno)); + info->description, zmq_strerror(errno)); return RS_RET_INVALID_PARAMS; } + DBGPRINTF("imzmq3: connect for %s successful\n",info->description); } else { rv = zsocket_bind(*sock, info->description); - if (rv <= 0) { + if (rv == -1) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "zmq_bind using %s failed: %s", - info->description, strerror(errno)); + info->description, zmq_strerror(errno)); return RS_RET_INVALID_PARAMS; } + DBGPRINTF("imzmq3: bind for %s successful\n",info->description); } return RS_RET_OK; } @@ -407,89 +429,138 @@ static rsRetVal createSocket(socket_info* info, void** sock) { * Module endpoints */ -/* accept a new ruleset to bind. Checks if it exists and complains, if not. Note - * that this makes the assumption that after the bind ruleset is called in the config, - * another call will be made to add an endpoint. -*/ -static rsRetVal -set_ruleset(void __attribute__((unused)) *pVal, uchar *pszName) { - ruleset_t* ruleset_ptr; - rsRetVal localRet; - DEFiRet; - - localRet = ruleset.GetRuleset(ourConf, &ruleset_ptr, pszName); - if(localRet == RS_RET_NOT_FOUND) { - errmsg.LogError(0, NO_ERRCODE, "error: " - "ruleset '%s' not found - ignored", pszName); - } - CHKiRet(localRet); - s_ruleset = ruleset_ptr; - DBGPRINTF("imzmq3 current bind ruleset '%s'\n", pszName); - -finalize_it: - free(pszName); /* no longer needed */ - RETiRet; -} /* add an actual endpoint */ -static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) { +static rsRetVal createInstance(instanceConf_t** pinst) { DEFiRet; + instanceConf_t* inst; + CHKmalloc(inst = MALLOC(sizeof(instanceConf_t))); - /* increment number of items and store old num items, as it will be handy.*/ - size_t idx = s_nitems++; - - /* allocate a new socket_info array to accomidate this new endpoint*/ - socket_info* tmpSocketInfo; - CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems)); + /* set defaults into new instance config struct */ + setDefaults(inst); - /* copy existing socket_info across into new array, if any, and free old storage*/ - if(idx) { - memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx); - free(s_socketInfo); + /* add this to the config */ + if (runModConf->root == NULL || runModConf->tail == NULL) { + runModConf->tail = runModConf->root = inst; + } else { + runModConf->tail->next = inst; + runModConf->tail = inst; } + *pinst = inst; +finalize_it: + RETiRet; +} - /* set the static to hold the new array */ - s_socketInfo = tmpSocketInfo; - - /* point to the new one */ - socket_info* sockInfo = &s_socketInfo[idx]; - - /* set defaults for the new socket info */ - setDefaults(sockInfo); - - /* Make a writeable copy of the string so we can use strtok - in the parseConfig call */ - char * copy = NULL; - CHKmalloc(copy = strdup((char *) valp)); +static rsRetVal createListener(struct cnfparamvals* pvals) { + instanceConf_t* inst; + int i; + DEFiRet; - /* parse the config string */ - CHKiRet(parseConfig(copy, sockInfo)); + CHKiRet(createInstance(&inst)); + for(i = 0 ; i < inppblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(inppblk.descr[i].name, "ruleset")) { + inst->pszBindRuleset = (uchar *)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "description")) { + inst->description = es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "sockType")){ + inst->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if(!strcmp(inppblk.descr[i].name, "action")){ + inst->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if(!strcmp(inppblk.descr[i].name, "sndHWM")) { + inst->sndHWM = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "rcvHWM")) { + inst->rcvHWM = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "subscribe")) { + CHKiRet(parseSubscriptions(es_str2cstr(pvals[i].val.d.estr, NULL), + &inst->subscriptions)); + } else if(!strcmp(inppblk.descr[i].name, "identity")){ + inst->identity = es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "sndBuf")) { + inst->sndBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "rcvBuf")) { + inst->rcvBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "linger")) { + inst->linger = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "backlog")) { + inst->backlog = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "sndTimeout")) { + inst->sndTimeout = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "rcvTimeout")) { + inst->rcvTimeout = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "maxMsgSize")) { + inst->maxMsgSize = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "rate")) { + inst->rate = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "recoveryIVL")) { + inst->recoveryIVL = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "multicastHops")) { + inst->multicastHops = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "reconnectIVL")) { + inst->reconnectIVL = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "reconnectIVLMax")) { + inst->reconnectIVLMax = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "ipv4Only")) { + inst->ipv4Only = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "affinity")) { + inst->affinity = (int) pvals[i].val.d.n; + } else { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: program error, non-handled " + "param '%s'\n", inppblk.descr[i].name); + } - /* validate it */ - CHKiRet(validateConfig(sockInfo)); + } +finalize_it: + RETiRet; +} + +static rsRetVal addListener(instanceConf_t* inst){ + /* create the socket */ + void* sock; + struct lstn_s* newcnfinfo; + DEFiRet; - /* bind to the current ruleset (if any)*/ - sockInfo->ruleset = s_ruleset; + CHKiRet(createSocket(inst, &sock)); + + /* now create new lstn_s struct */ + CHKmalloc(newcnfinfo=(struct lstn_s*)MALLOC(sizeof(struct lstn_s))); + newcnfinfo->next = NULL; + newcnfinfo->sock = sock; + newcnfinfo->pRuleset = inst->pBindRuleset; + /* add this struct to the global */ + if(lcnfRoot == NULL) { + lcnfRoot = newcnfinfo; + } + if(lcnfLast == NULL) { + lcnfLast = newcnfinfo; + } else { + lcnfLast->next = newcnfinfo; + lcnfLast = newcnfinfo; + } + finalize_it: - free(valp); /* in any case, this is no longer needed */ - RETiRet; + RETiRet; } - static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *poller, void* pd) { - msg_t* logmsg; + msg_t* pMsg; poller_data* pollerData = (poller_data*)pd; char* buf = zstr_recv(poller->socket); - if (msgConstruct(&logmsg) == RS_RET_OK) { - MsgSetRawMsg(logmsg, buf, strlen(buf)); - MsgSetInputName(logmsg, s_namep); - MsgSetFlowControlType(logmsg, eFLOWCTL_NO_DELAY); - MsgSetRuleset(logmsg, pollerData->ruleset); - logmsg->msgFlags = NEEDS_PARSING; - submitMsg(logmsg); + if (msgConstruct(&pMsg) == RS_RET_OK) { + MsgSetRawMsg(pMsg, buf, strlen(buf)); + MsgSetInputName(pMsg, s_namep); + MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())); + MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp()); + MsgSetRcvFromIP(pMsg, glbl.GetLocalHostIP()); + MsgSetMSGoffs(pMsg, 0); + MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); + MsgSetRuleset(pMsg, pollerData->ruleset); + pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; + submitMsg2(pMsg); } /* gotta free the string returned from zstr_recv() */ @@ -508,51 +579,65 @@ static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *po /* called when runInput is called by rsyslog */ static rsRetVal rcv_loop(thrdInfo_t* pThrd){ + size_t n_items = 0; size_t i; int rv; - zmq_pollitem_t* items; - poller_data* pollerData; - + zmq_pollitem_t* items = NULL; + poller_data* pollerData = NULL; + struct lstn_s* current; + instanceConf_t* inst; DEFiRet; - - /* create the context*/ - CHKiRet(createContext()); + /* now add listeners. This actually creates the sockets, etc... */ + for (inst = runModConf->root; inst != NULL; inst=inst->next) { + addListener(inst); + } + if (lcnfRoot == NULL) { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: no listeners were " + "started, input not activated.\n"); + ABORT_FINALIZE(RS_RET_NO_RUN); + } + + /* count the # of items first */ + for(current=lcnfRoot;current!=NULL;current=current->next) + n_items++; + + /* make arrays of pollitems, pollerdata so they are easy to delete later */ + /* create the poll items*/ - CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems)); + CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*n_items)); /* create poller data (stuff to pass into the zmq closure called when we get a message)*/ - CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems)); + CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*n_items)); /* loop through and initialize the poll items and poller_data arrays...*/ - for(i=0; i<s_nitems;++i) { + for(i=0, current = lcnfRoot; current != NULL; current = current->next, i++) { /* create the socket, update items.*/ - createSocket(&s_socketInfo[i], &items[i].socket); + items[i].socket=current->sock; items[i].events = ZMQ_POLLIN; /* now update the poller_data for this item */ pollerData[i].thread = pThrd; - pollerData[i].ruleset = s_socketInfo[i].ruleset; + pollerData[i].ruleset = current->pRuleset; } - + s_zloop = zloop_new(); - for(i=0; i<s_nitems; ++i) { + for(i=0; i<n_items; ++i) { rv = zloop_poller(s_zloop, &items[i], handlePoll, &pollerData[i]); if (rv) { - errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu", i); + errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu: %s", i, zmq_strerror(errno)); } } + DBGPRINTF("imzmq3: zloop_poller starting..."); zloop_start(s_zloop); zloop_destroy(&s_zloop); - finalize_it: - for(i=0; i< s_nitems; ++i) { - zsocket_destroy(s_context, items[i].socket); - } - + DBGPRINTF("imzmq3: zloop_poller stopped."); +finalize_it: zctx_destroy(&s_context); free(items); + free(pollerData); RETiRet; } @@ -562,7 +647,8 @@ static rsRetVal rcv_loop(thrdInfo_t* pThrd){ BEGINrunInput CODESTARTrunInput - iRet = rcv_loop(pThrd); + CHKiRet(rcv_loop(pThrd)); +finalize_it: RETiRet; ENDrunInput @@ -570,17 +656,13 @@ ENDrunInput /* initialize and return if will run or not */ BEGINwillRun CODESTARTwillRun - /* we need to create the inputName property (only once during our + /* we need to create the inputName property (only once during our lifetime) */ - CHKiRet(prop.Construct(&s_namep)); - CHKiRet(prop.SetString(s_namep, + CHKiRet(prop.Construct(&s_namep)); + CHKiRet(prop.SetString(s_namep, UCHAR_CONSTANT("imzmq3"), sizeof("imzmq3") - 1)); - CHKiRet(prop.ConstructFinalize(s_namep)); - -/* If there are no endpoints this is pointless ...*/ - if (s_nitems == 0) - ABORT_FINALIZE(RS_RET_NO_RUN); + CHKiRet(prop.ConstructFinalize(s_namep)); finalize_it: ENDwillRun @@ -588,70 +670,207 @@ ENDwillRun BEGINafterRun CODESTARTafterRun - /* do cleanup here */ - if(s_namep != NULL) - prop.Destruct(&s_namep); + /* do cleanup here */ + if (s_namep != NULL) + prop.Destruct(&s_namep); ENDafterRun BEGINmodExit CODESTARTmodExit - /* release what we no longer need */ - objRelease(errmsg, CORE_COMPONENT); - objRelease(glbl, CORE_COMPONENT); - objRelease(prop, CORE_COMPONENT); + /* release what we no longer need */ + objRelease(errmsg, CORE_COMPONENT); + objRelease(glbl, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); objRelease(ruleset, CORE_COMPONENT); ENDmodExit BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature - if(eFeat == sFEATURENonCancelInputTermination) - iRet = RS_RET_OK; + if (eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; ENDisCompatibleWithFeature +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + /* After endCnfLoad() (BEGINendCnfLoad...ENDendCnfLoad) is called, + * the pModConf pointer must not be used to change the in-memory + * config object. It's safe to use the same pointer for accessing + * the config object until freeCnf() (BEGINfreeCnf...ENDfreeCnf). */ + runModConf = pModConf; + runModConf->pConf = pConf; + /* init module config */ + runModConf->io_threads = 0; /* 0 means don't set it */ +ENDbeginCnfLoad + + +BEGINsetModCnf + struct cnfparamvals* pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if (NULL == pvals) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "imzmq3: error processing module " + " config parameters ['module(...)']"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + for (i=0; i < modpblk.nParams; ++i) { + if (!pvals[i].bUsed) + continue; + if (!strcmp(modpblk.descr[i].name, "ioThreads")) { + runModConf->io_threads = (int)pvals[i].val.d.n; + } else { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "imzmq3: config error, unknown " + "param %s in setModCnf\n", + modpblk.descr[i].name); + } + } + +finalize_it: + if (pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + +BEGINendCnfLoad +CODESTARTendCnfLoad + /* Last chance to make changes to the in-memory config object for this + * input module. After this call, the config object must no longer be + * changed. */ + if (pModConf != runModConf) { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has " + "changed - pModConf=%p, runModConf=%p", pModConf, runModConf); + } + assert(pModConf == runModConf); +ENDendCnfLoad + + +/* function to generate error message if framework does not find requested ruleset */ +static inline void +std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, instanceConf_t *inst) +{ + errmsg.LogError(0, NO_ERRCODE, "imzmq3: ruleset '%s' for socket %s not found - " + "using default ruleset instead", inst->pszBindRuleset, + inst->description); +} + + +BEGINcheckCnf +instanceConf_t* inst; +CODESTARTcheckCnf + for(inst = pModConf->root; inst!=NULL; inst=inst->next) { + std_checkRuleset(pModConf, inst); + /* now, validate the instanceConf */ + CHKiRet(validateConfig(inst)); + } +finalize_it: + RETiRet; +ENDcheckCnf + + +BEGINactivateCnfPrePrivDrop +CODESTARTactivateCnfPrePrivDrop + if (pModConf != runModConf) { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has " + "changed - pModConf=%p, runModConf=%p", pModConf, runModConf); + } + assert(pModConf == runModConf); + + /* first create the context */ + createContext(); + + /* could setup context here, and set the global worker threads + and so on... */ +ENDactivateCnfPrePrivDrop + + +BEGINactivateCnf +CODESTARTactivateCnf + if (pModConf != runModConf) { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has " + "changed - pModConf=%p, runModConf=%p", pModConf, runModConf); + } + assert(pModConf == runModConf); +ENDactivateCnf + + +BEGINfreeCnf + struct lstn_s *lstn, *lstn_r; + instanceConf_t *inst, *inst_r; + sublist *sub, *sub_r; +CODESTARTfreeCnf + DBGPRINTF("imzmq3: BEGINfreeCnf ...\n"); + if (pModConf != runModConf) { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: pointer of in-memory config object has " + "changed - pModConf=%p, runModConf=%p", pModConf, runModConf); + } + for (lstn = lcnfRoot; lstn != NULL; ) { + lstn_r = lstn; + lstn = lstn_r->next; + free(lstn_r); + } + for (inst = pModConf->root ; inst != NULL ; ) { + for (sub = inst->subscriptions; sub != NULL; ) { + free(sub->subscribe); + sub_r = sub; + sub = sub_r->next; + free(sub_r); + } + free(inst->pszBindRuleset); + inst_r = inst; + inst = inst->next; + free(inst_r); + } +ENDfreeCnf + + +BEGINnewInpInst + struct cnfparamvals* pvals; +CODESTARTnewInpInst + + DBGPRINTF("newInpInst (imzmq3)\n"); + pvals = nvlstGetParams(lst, &inppblk, NULL); + if(NULL==pvals) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, + "imzmq3: required parameters are missing\n"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + DBGPRINTF("imzmq3: input param blk:\n"); + cnfparamsPrint(&inppblk, pvals); + + /* now, parse the config params and so on... */ + CHKiRet(createListener(pvals)); + +finalize_it: +CODE_STD_FINALIZERnewInpInst + cnfparamvalsDestruct(pvals, &inppblk); +ENDnewInpInst + + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES +CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES +CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt -static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, - void __attribute__((unused)) *pVal) { - return RS_RET_OK; -} -static rsRetVal setGlobalWorkerThreads(uchar __attribute__((unused)) *pp, int val) { - errmsg.LogError(0, NO_ERRCODE, "setGlobalWorkerThreads called with %d",val); - s_io_threads = val; - return RS_RET_OK; -} BEGINmodInit() CODESTARTmodInit /* we only support the current interface specification */ - *ipIFVersProvided = CURR_MOD_IF_VERSION; + *ipIFVersProvided = CURR_MOD_IF_VERSION; CODEmodInit_QueryRegCFSLineHdlr - CHKiRet(objUse(errmsg, CORE_COMPONENT)); - CHKiRet(objUse(glbl, CORE_COMPONENT)); - CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); - - /* register config file handlers */ - CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverbindruleset", - 0, eCmdHdlrGetWord, - set_ruleset, NULL, - STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverrun", - 0, eCmdHdlrGetWord, - add_endpoint, NULL, - STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", - 1, eCmdHdlrCustomHandler, - resetConfigVariables, NULL, - STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3globalWorkerThreads", - 1, eCmdHdlrInt, - setGlobalWorkerThreads, NULL, - STD_LOADABLE_MODULE_ID)); ENDmodInit + + |