diff options
Diffstat (limited to 'plugins/omzmq3')
-rw-r--r-- | plugins/omzmq3/README | 14 | ||||
-rw-r--r-- | plugins/omzmq3/omzmq3.c | 202 |
2 files changed, 112 insertions, 104 deletions
diff --git a/plugins/omzmq3/README b/plugins/omzmq3/README index ccc96c74..c2a33555 100644 --- a/plugins/omzmq3/README +++ b/plugins/omzmq3/README @@ -1,16 +1,10 @@ ZeroMQ 3.x Output Plugin Building this plugin: -Requires libzmq and libczmq. First, install libzmq from the HEAD on github: -http://github.com/zeromq/libzmq. You can clone the repository, build, then -install it. The directions for doing so are there in the readme. Then, do -the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1 -version of libzmq will be released, and a supporting version of libczmq. -At that time, you could simply download and install the tarballs instead of -using git to clone the repositories. Those tarballs (when available) can -be found at http://download.zeromq.org. As of this writing (5/31/2012), the -most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile -properly. +Requires libzmq and libczmq. First, download the tarballs of both libzmq +and its supporting libczmq from http://download.zeromq.org. As of this +writing (04/23/2013), the most recent versions of libzmq and czmq are +3.2.2 and 1.3.2 respectively. Configure, build, and then install both libs. Omzmq3 allows you to push data out of rsyslog from a zeromq socket. The example below binds a PUB socket to port 7171, and any message fitting the criteria will diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c index ee6756b9..c8552f11 100644 --- a/plugins/omzmq3/omzmq3.c +++ b/plugins/omzmq3/omzmq3.c @@ -110,9 +110,10 @@ static zctx_t* s_context = NULL; static int s_workerThreads = -1; static struct socket_type types[] = { - {"PUB", ZMQ_PUB }, - {"PUSH", ZMQ_PUSH }, - {"XPUB", ZMQ_XPUB } + {"PUB", ZMQ_PUB }, + {"PUSH", ZMQ_PUSH }, + {"DEALER", ZMQ_DEALER }, + {"XPUB", ZMQ_XPUB } }; static struct socket_action actions[] = { @@ -201,17 +202,18 @@ static rsRetVal initZMQ(instanceData* pData) { /* create the context if necessary. */ if (NULL == s_context) { + zsys_handler_set(NULL); s_context = zctx_new(); if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads); } pData->socket = zsocket_new(s_context, pData->type); - - /* ALWAYS set the HWM as the zmq3 default is 1000 and we default - to 0 (infinity) */ - zsocket_set_rcvhwm(pData->socket, pData->rcvHWM); - zsocket_set_sndhwm(pData->socket, pData->sndHWM); - + if (NULL == pData->socket) { + errmsg.LogError(0, RS_RET_NO_ERRCODE, + "omzmq3: zsocket_new failed for %s: %s", + pData->description, zmq_strerror(errno)); + ABORT_FINALIZE(RS_RET_NO_ERRCODE); + } /* use czmq defaults for these, unless set to non-default values */ if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity); if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf); @@ -228,17 +230,26 @@ static rsRetVal initZMQ(instanceData* pData) { if(pData->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(pData->socket, pData->reconnectIVLMax); if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only); if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity); - + if(pData->rcvHWM > -1) zsocket_set_rcvhwm(pData->socket, pData->rcvHWM); + if(pData->sndHWM > -1) zsocket_set_sndhwm(pData->socket, pData->sndHWM); + /* bind or connect to it */ if (pData->action == ACTION_BIND) { /* bind asserts, so no need to test return val here which isn't the greatest api -- oh well */ - zsocket_bind(pData->socket, (char*)pData->description); + if(-1 == zsocket_bind(pData->socket, (char*)pData->description)) { + errmsg.LogError(0, RS_RET_NO_ERRCODE, "omzmq3: bind failed for %s: %s", + pData->description, zmq_strerror(errno)); + ABORT_FINALIZE(RS_RET_NO_ERRCODE); + } + DBGPRINTF("omzmq3: bind to %s successful\n",pData->description); } else { - if(zsocket_connect(pData->socket, (char*)pData->description) == -1) { - errmsg.LogError(0, RS_RET_SUSPENDED, "omzmq3: connect failed!"); - ABORT_FINALIZE(RS_RET_SUSPENDED); + if(-1 == zsocket_connect(pData->socket, (char*)pData->description)) { + errmsg.LogError(0, RS_RET_NO_ERRCODE, "omzmq3: connect failed for %s: %s", + pData->description, zmq_strerror(errno)); + ABORT_FINALIZE(RS_RET_NO_ERRCODE); } + DBGPRINTF("omzmq3: connect to %s successful", pData->description); } finalize_it: RETiRet; @@ -256,7 +267,7 @@ rsRetVal writeZMQ(uchar* msg, instanceData* pData) { /* whine if things went wrong */ if (result == -1) { - errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result); + errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed: %s", msg, zmq_strerror(errno)); ABORT_FINALIZE(RS_RET_ERR); } finalize_it: @@ -265,13 +276,13 @@ rsRetVal writeZMQ(uchar* msg, instanceData* pData) { static inline void setInstParamDefaults(instanceData* pData) { - pData->description = (uchar*)"tcp://*:7171"; + pData->description = NULL; pData->socket = NULL; pData->tplName = NULL; pData->type = ZMQ_PUB; pData->action = ACTION_BIND; - pData->sndHWM = 0; /*unlimited*/ - pData->rcvHWM = 0; /*unlimited*/ + pData->sndHWM = -1; + pData->rcvHWM = -1; pData->identity = NULL; pData->sndBuf = -1; pData->rcvBuf = -1; @@ -314,6 +325,7 @@ CODESTARTfreeInstance closeZMQ(pData); free(pData->description); free(pData->tplName); + free(pData->identity); ENDfreeInstance BEGINtryResume @@ -329,88 +341,90 @@ ENDdoAction BEGINnewActInst - struct cnfparamvals *pvals; - int i; + struct cnfparamvals *pvals; + int i; CODESTARTnewActInst -if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { - ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); - } + if ((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } CHKiRet(createInstance(&pData)); setInstParamDefaults(pData); CODE_STD_STRING_REQUESTnewActInst(1) -for(i = 0 ; i < actpblk.nParams ; ++i) { - if(!pvals[i].bUsed) - continue; - if(!strcmp(actpblk.descr[i].name, "description")) { - pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); - } else if(!strcmp(actpblk.descr[i].name, "template")) { - pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); - } else if(!strcmp(actpblk.descr[i].name, "sockType")){ - pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL)); - } else if(!strcmp(actpblk.descr[i].name, "action")){ - pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); - } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) { - pData->sndHWM = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) { - pData->rcvHWM = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "identity")){ - pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); - } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) { - pData->sndBuf = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) { - pData->rcvBuf = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "linger")) { - pData->linger = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "backlog")) { - pData->backlog = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) { - pData->sndTimeout = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) { - pData->rcvTimeout = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) { - pData->maxMsgSize = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "rate")) { - pData->rate = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) { - pData->recoveryIVL = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) { - pData->multicastHops = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) { - pData->reconnectIVL = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) { - pData->reconnectIVLMax = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) { - pData->ipv4Only = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "affinity")) { - pData->affinity = (int) pvals[i].val.d.n; - } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) { - s_workerThreads = (int) pvals[i].val.d.n; - } else { - errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled " - "param '%s'\n", actpblk.descr[i].name); + for (i = 0; i < actpblk.nParams; ++i) { + if (!pvals[i].bUsed) + continue; + if (!strcmp(actpblk.descr[i].name, "description")) { + pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "sockType")){ + pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if (!strcmp(actpblk.descr[i].name, "action")){ + pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if (!strcmp(actpblk.descr[i].name, "sndHWM")) { + pData->sndHWM = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "rcvHWM")) { + pData->rcvHWM = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "identity")){ + pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "sndBuf")) { + pData->sndBuf = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "rcvBuf")) { + pData->rcvBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "linger")) { + pData->linger = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "backlog")) { + pData->backlog = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "sndTimeout")) { + pData->sndTimeout = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "rcvTimeout")) { + pData->rcvTimeout = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "maxMsgSize")) { + pData->maxMsgSize = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "rate")) { + pData->rate = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "recoveryIVL")) { + pData->recoveryIVL = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "multicastHops")) { + pData->multicastHops = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "reconnectIVL")) { + pData->reconnectIVL = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) { + pData->reconnectIVLMax = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "ipv4Only")) { + pData->ipv4Only = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "affinity")) { + pData->affinity = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) { + s_workerThreads = (int) pvals[i].val.d.n; + } else { + errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } } - } - -if(pData->tplName == NULL) { - CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); - } else { - CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS)); - } - -if(pData->type == -1) { - errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type."); - ABORT_FINALIZE(RS_RET_CONFIG_ERROR); - } -if(pData->action == -1) { - errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action"); - ABORT_FINALIZE(RS_RET_CONFIG_ERROR); - } + if (pData->tplName == NULL) { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup("RSYSLOG_ForwardFormat"), OMSR_NO_RQD_TPL_OPTS)); + } else { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS)); + } + if (NULL == pData->description) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: you didn't enter a description"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + if (pData->type == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type."); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + if (pData->action == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } CODE_STD_FINALIZERnewActInst - cnfparamvalsDestruct(pvals, &actpblk); + cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst BEGINparseSelectorAct @@ -433,10 +447,10 @@ ENDinitConfVars BEGINmodExit CODESTARTmodExit -if(NULL != s_context) { - zctx_destroy(&s_context); - s_context=NULL; - } + if (NULL != s_context) { + zctx_destroy(&s_context); + s_context=NULL; + } ENDmodExit |