summaryrefslogtreecommitdiffstats
path: root/plugins/omzmq3
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/omzmq3')
-rw-r--r--plugins/omzmq3/README14
-rw-r--r--plugins/omzmq3/omzmq3.c202
2 files changed, 112 insertions, 104 deletions
diff --git a/plugins/omzmq3/README b/plugins/omzmq3/README
index ccc96c74..c2a33555 100644
--- a/plugins/omzmq3/README
+++ b/plugins/omzmq3/README
@@ -1,16 +1,10 @@
ZeroMQ 3.x Output Plugin
Building this plugin:
-Requires libzmq and libczmq. First, install libzmq from the HEAD on github:
-http://github.com/zeromq/libzmq. You can clone the repository, build, then
-install it. The directions for doing so are there in the readme. Then, do
-the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1
-version of libzmq will be released, and a supporting version of libczmq.
-At that time, you could simply download and install the tarballs instead of
-using git to clone the repositories. Those tarballs (when available) can
-be found at http://download.zeromq.org. As of this writing (5/31/2012), the
-most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile
-properly.
+Requires libzmq and libczmq. First, download the tarballs of both libzmq
+and its supporting libczmq from http://download.zeromq.org. As of this
+writing (04/23/2013), the most recent versions of libzmq and czmq are
+3.2.2 and 1.3.2 respectively. Configure, build, and then install both libs.
Omzmq3 allows you to push data out of rsyslog from a zeromq socket. The example
below binds a PUB socket to port 7171, and any message fitting the criteria will
diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c
index ee6756b9..c8552f11 100644
--- a/plugins/omzmq3/omzmq3.c
+++ b/plugins/omzmq3/omzmq3.c
@@ -110,9 +110,10 @@ static zctx_t* s_context = NULL;
static int s_workerThreads = -1;
static struct socket_type types[] = {
- {"PUB", ZMQ_PUB },
- {"PUSH", ZMQ_PUSH },
- {"XPUB", ZMQ_XPUB }
+ {"PUB", ZMQ_PUB },
+ {"PUSH", ZMQ_PUSH },
+ {"DEALER", ZMQ_DEALER },
+ {"XPUB", ZMQ_XPUB }
};
static struct socket_action actions[] = {
@@ -201,17 +202,18 @@ static rsRetVal initZMQ(instanceData* pData) {
/* create the context if necessary. */
if (NULL == s_context) {
+ zsys_handler_set(NULL);
s_context = zctx_new();
if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads);
}
pData->socket = zsocket_new(s_context, pData->type);
-
- /* ALWAYS set the HWM as the zmq3 default is 1000 and we default
- to 0 (infinity) */
- zsocket_set_rcvhwm(pData->socket, pData->rcvHWM);
- zsocket_set_sndhwm(pData->socket, pData->sndHWM);
-
+ if (NULL == pData->socket) {
+ errmsg.LogError(0, RS_RET_NO_ERRCODE,
+ "omzmq3: zsocket_new failed for %s: %s",
+ pData->description, zmq_strerror(errno));
+ ABORT_FINALIZE(RS_RET_NO_ERRCODE);
+ }
/* use czmq defaults for these, unless set to non-default values */
if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity);
if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf);
@@ -228,17 +230,26 @@ static rsRetVal initZMQ(instanceData* pData) {
if(pData->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(pData->socket, pData->reconnectIVLMax);
if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only);
if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity);
-
+ if(pData->rcvHWM > -1) zsocket_set_rcvhwm(pData->socket, pData->rcvHWM);
+ if(pData->sndHWM > -1) zsocket_set_sndhwm(pData->socket, pData->sndHWM);
+
/* bind or connect to it */
if (pData->action == ACTION_BIND) {
/* bind asserts, so no need to test return val here
which isn't the greatest api -- oh well */
- zsocket_bind(pData->socket, (char*)pData->description);
+ if(-1 == zsocket_bind(pData->socket, (char*)pData->description)) {
+ errmsg.LogError(0, RS_RET_NO_ERRCODE, "omzmq3: bind failed for %s: %s",
+ pData->description, zmq_strerror(errno));
+ ABORT_FINALIZE(RS_RET_NO_ERRCODE);
+ }
+ DBGPRINTF("omzmq3: bind to %s successful\n",pData->description);
} else {
- if(zsocket_connect(pData->socket, (char*)pData->description) == -1) {
- errmsg.LogError(0, RS_RET_SUSPENDED, "omzmq3: connect failed!");
- ABORT_FINALIZE(RS_RET_SUSPENDED);
+ if(-1 == zsocket_connect(pData->socket, (char*)pData->description)) {
+ errmsg.LogError(0, RS_RET_NO_ERRCODE, "omzmq3: connect failed for %s: %s",
+ pData->description, zmq_strerror(errno));
+ ABORT_FINALIZE(RS_RET_NO_ERRCODE);
}
+ DBGPRINTF("omzmq3: connect to %s successful", pData->description);
}
finalize_it:
RETiRet;
@@ -256,7 +267,7 @@ rsRetVal writeZMQ(uchar* msg, instanceData* pData) {
/* whine if things went wrong */
if (result == -1) {
- errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result);
+ errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed: %s", msg, zmq_strerror(errno));
ABORT_FINALIZE(RS_RET_ERR);
}
finalize_it:
@@ -265,13 +276,13 @@ rsRetVal writeZMQ(uchar* msg, instanceData* pData) {
static inline void
setInstParamDefaults(instanceData* pData) {
- pData->description = (uchar*)"tcp://*:7171";
+ pData->description = NULL;
pData->socket = NULL;
pData->tplName = NULL;
pData->type = ZMQ_PUB;
pData->action = ACTION_BIND;
- pData->sndHWM = 0; /*unlimited*/
- pData->rcvHWM = 0; /*unlimited*/
+ pData->sndHWM = -1;
+ pData->rcvHWM = -1;
pData->identity = NULL;
pData->sndBuf = -1;
pData->rcvBuf = -1;
@@ -314,6 +325,7 @@ CODESTARTfreeInstance
closeZMQ(pData);
free(pData->description);
free(pData->tplName);
+ free(pData->identity);
ENDfreeInstance
BEGINtryResume
@@ -329,88 +341,90 @@ ENDdoAction
BEGINnewActInst
- struct cnfparamvals *pvals;
- int i;
+ struct cnfparamvals *pvals;
+ int i;
CODESTARTnewActInst
-if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
- ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
- }
+ if ((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
CHKiRet(createInstance(&pData));
setInstParamDefaults(pData);
CODE_STD_STRING_REQUESTnewActInst(1)
-for(i = 0 ; i < actpblk.nParams ; ++i) {
- if(!pvals[i].bUsed)
- continue;
- if(!strcmp(actpblk.descr[i].name, "description")) {
- pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
- } else if(!strcmp(actpblk.descr[i].name, "template")) {
- pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
- } else if(!strcmp(actpblk.descr[i].name, "sockType")){
- pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL));
- } else if(!strcmp(actpblk.descr[i].name, "action")){
- pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL));
- } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) {
- pData->sndHWM = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) {
- pData->rcvHWM = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "identity")){
- pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
- } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) {
- pData->sndBuf = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) {
- pData->rcvBuf = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "linger")) {
- pData->linger = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "backlog")) {
- pData->backlog = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) {
- pData->sndTimeout = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) {
- pData->rcvTimeout = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) {
- pData->maxMsgSize = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "rate")) {
- pData->rate = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) {
- pData->recoveryIVL = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) {
- pData->multicastHops = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) {
- pData->reconnectIVL = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) {
- pData->reconnectIVLMax = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) {
- pData->ipv4Only = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "affinity")) {
- pData->affinity = (int) pvals[i].val.d.n;
- } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) {
- s_workerThreads = (int) pvals[i].val.d.n;
- } else {
- errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled "
- "param '%s'\n", actpblk.descr[i].name);
+ for (i = 0; i < actpblk.nParams; ++i) {
+ if (!pvals[i].bUsed)
+ continue;
+ if (!strcmp(actpblk.descr[i].name, "description")) {
+ pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "template")) {
+ pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "sockType")){
+ pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL));
+ } else if (!strcmp(actpblk.descr[i].name, "action")){
+ pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL));
+ } else if (!strcmp(actpblk.descr[i].name, "sndHWM")) {
+ pData->sndHWM = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "rcvHWM")) {
+ pData->rcvHWM = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "identity")){
+ pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "sndBuf")) {
+ pData->sndBuf = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "rcvBuf")) {
+ pData->rcvBuf = (int) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "linger")) {
+ pData->linger = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "backlog")) {
+ pData->backlog = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "sndTimeout")) {
+ pData->sndTimeout = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "rcvTimeout")) {
+ pData->rcvTimeout = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "maxMsgSize")) {
+ pData->maxMsgSize = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "rate")) {
+ pData->rate = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "recoveryIVL")) {
+ pData->recoveryIVL = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "multicastHops")) {
+ pData->multicastHops = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "reconnectIVL")) {
+ pData->reconnectIVL = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) {
+ pData->reconnectIVLMax = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "ipv4Only")) {
+ pData->ipv4Only = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "affinity")) {
+ pData->affinity = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) {
+ s_workerThreads = (int) pvals[i].val.d.n;
+ } else {
+ errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
}
- }
-
-if(pData->tplName == NULL) {
- CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
- } else {
- CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS));
- }
-
-if(pData->type == -1) {
- errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type.");
- ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
- }
-if(pData->action == -1) {
- errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action");
- ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
- }
+ if (pData->tplName == NULL) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup("RSYSLOG_ForwardFormat"), OMSR_NO_RQD_TPL_OPTS));
+ } else {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS));
+ }
+ if (NULL == pData->description) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: you didn't enter a description");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+ if (pData->type == -1) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type.");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+ if (pData->action == -1) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
CODE_STD_FINALIZERnewActInst
- cnfparamvalsDestruct(pvals, &actpblk);
+ cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst
BEGINparseSelectorAct
@@ -433,10 +447,10 @@ ENDinitConfVars
BEGINmodExit
CODESTARTmodExit
-if(NULL != s_context) {
- zctx_destroy(&s_context);
- s_context=NULL;
- }
+ if (NULL != s_context) {
+ zctx_destroy(&s_context);
+ s_context=NULL;
+ }
ENDmodExit