summaryrefslogtreecommitdiffstats
path: root/plugins/omzmq3/omzmq3.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/omzmq3/omzmq3.c')
-rw-r--r--plugins/omzmq3/omzmq3.c76
1 files changed, 39 insertions, 37 deletions
diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c
index 885bc365..e13011fb 100644
--- a/plugins/omzmq3/omzmq3.c
+++ b/plugins/omzmq3/omzmq3.c
@@ -52,8 +52,8 @@ DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
/* convienent symbols to denote a socket we want to bind
-p * vs one we want to just connect to
- */
+ vs one we want to just connect to
+*/
#define ACTION_CONNECT 1
#define ACTION_BIND 2
@@ -66,7 +66,7 @@ struct socket_type {
int type;
};
-// more overkill, but seems nice to be consistent.
+/* more overkill, but seems nice to be consistent. */
struct socket_action {
char* name;
int action;
@@ -102,9 +102,10 @@ typedef struct _instanceData {
* Static definitions/initializations
*/
-// only 1 zctx for all the sockets, with an adjustable number of
-// worker threads which may be useful if we use affinity in particular
-// sockets
+/* only 1 zctx for all the sockets, with an adjustable number of
+ worker threads which may be useful if we use affinity in particular
+ sockets
+*/
static zctx_t* s_context = NULL;
static int s_workerThreads = -1;
@@ -154,8 +155,9 @@ static struct cnfparamblk actpblk = {
* Helper Functions
*/
-// get the name of a socket type, return the ZMQ_XXX type
-// or -1 if not a supported type (see above)
+/* get the name of a socket type, return the ZMQ_XXX type
+ or -1 if not a supported type (see above)
+*/
int getSocketType(char* name) {
int type = -1;
uint i;
@@ -197,7 +199,7 @@ static void closeZMQ(instanceData* pData) {
static rsRetVal initZMQ(instanceData* pData) {
DEFiRet;
- // create the context if necessary.
+ /* create the context if necessary. */
if (NULL == s_context) {
s_context = zctx_new();
if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads);
@@ -205,12 +207,12 @@ static rsRetVal initZMQ(instanceData* pData) {
pData->socket = zsocket_new(s_context, pData->type);
- // ALWAYS set the HWM as the zmq3 default is 1000 and we default
- // to 0 (infinity)
+ /* ALWAYS set the HWM as the zmq3 default is 1000 and we default
+ to 0 (infinity) */
zsocket_set_rcvhwm(pData->socket, pData->rcvHWM);
zsocket_set_sndhwm(pData->socket, pData->sndHWM);
- // use czmq defaults for these, unless set to non-default values
+ /* use czmq defaults for these, unless set to non-default values */
if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity);
if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf);
if(pData->rcvBuf > -1) zsocket_set_sndbuf(pData->socket, pData->rcvBuf);
@@ -227,10 +229,10 @@ static rsRetVal initZMQ(instanceData* pData) {
if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only);
if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity);
- // bind or connect to it
+ /* bind or connect to it */
if (pData->action == ACTION_BIND) {
- // bind asserts, so no need to test return val here
- // which isn't the greatest api -- oh well
+ /* bind asserts, so no need to test return val here
+ which isn't the greatest api -- oh well */
zsocket_bind(pData->socket, (char*)pData->description);
} else {
if(zsocket_connect(pData->socket, (char*)pData->description) == -1) {
@@ -245,14 +247,14 @@ static rsRetVal initZMQ(instanceData* pData) {
rsRetVal writeZMQ(uchar* msg, instanceData* pData) {
DEFiRet;
- // initialize if necessary
+ /* initialize if necessary */
if(NULL == pData->socket)
CHKiRet(initZMQ(pData));
- // send the shit...
+ /* send it */
int result = zstr_send(pData->socket, (char*)msg);
- // whine if shit went wrong
+ /* whine if things went wrong */
if (result == -1) {
errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result);
ABORT_FINALIZE(RS_RET_ERR);
@@ -268,8 +270,8 @@ setInstParamDefaults(instanceData* pData) {
pData->tplName = NULL;
pData->type = ZMQ_PUB;
pData->action = ACTION_BIND;
- pData->sndHWM = 0; // unlimited
- pData->rcvHWM = 0; // unlimited
+ pData->sndHWM = 0; /*unlimited*/
+ pData->rcvHWM = 0; /*unlimited*/
pData->identity = NULL;
pData->sndBuf = -1;
pData->rcvBuf = -1;
@@ -350,41 +352,41 @@ for(i = 0 ; i < actpblk.nParams ; ++i) {
} else if(!strcmp(actpblk.descr[i].name, "action")){
pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL));
} else if(!strcmp(actpblk.descr[i].name, "sndHWM")) {
- pData->sndHWM = (int) pvals[i].val.d.n, NULL;
+ pData->sndHWM = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) {
- pData->rcvHWM = (int) pvals[i].val.d.n, NULL;
+ pData->rcvHWM = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "identity")){
pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "sndBuf")) {
- pData->sndBuf = (int) pvals[i].val.d.n, NULL;
+ pData->sndBuf = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) {
- pData->rcvBuf = (int) pvals[i].val.d.n, NULL;
+ pData->rcvBuf = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "linger")) {
- pData->linger = (int) pvals[i].val.d.n, NULL;
+ pData->linger = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "backlog")) {
- pData->backlog = (int) pvals[i].val.d.n, NULL;
+ pData->backlog = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) {
- pData->sndTimeout = (int) pvals[i].val.d.n, NULL;
+ pData->sndTimeout = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) {
- pData->rcvTimeout = (int) pvals[i].val.d.n, NULL;
+ pData->rcvTimeout = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) {
- pData->maxMsgSize = (int) pvals[i].val.d.n, NULL;
+ pData->maxMsgSize = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "rate")) {
- pData->rate = (int) pvals[i].val.d.n, NULL;
+ pData->rate = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) {
- pData->recoveryIVL = (int) pvals[i].val.d.n, NULL;
+ pData->recoveryIVL = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "multicastHops")) {
- pData->multicastHops = (int) pvals[i].val.d.n, NULL;
+ pData->multicastHops = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) {
- pData->reconnectIVL = (int) pvals[i].val.d.n, NULL;
+ pData->reconnectIVL = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) {
- pData->reconnectIVLMax = (int) pvals[i].val.d.n, NULL;
+ pData->reconnectIVLMax = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) {
- pData->ipv4Only = (int) pvals[i].val.d.n, NULL;
+ pData->ipv4Only = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "affinity")) {
- pData->affinity = (int) pvals[i].val.d.n, NULL;
+ pData->affinity = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) {
- s_workerThreads = (int) pvals[i].val.d.n, NULL;
+ s_workerThreads = (int) pvals[i].val.d.n;
} else {
errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);