diff options
Diffstat (limited to 'plugins/omzmq3/omzmq3.c')
-rw-r--r-- | plugins/omzmq3/omzmq3.c | 76 |
1 files changed, 39 insertions, 37 deletions
diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c index 885bc365..e13011fb 100644 --- a/plugins/omzmq3/omzmq3.c +++ b/plugins/omzmq3/omzmq3.c @@ -52,8 +52,8 @@ DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) /* convienent symbols to denote a socket we want to bind -p * vs one we want to just connect to - */ + vs one we want to just connect to +*/ #define ACTION_CONNECT 1 #define ACTION_BIND 2 @@ -66,7 +66,7 @@ struct socket_type { int type; }; -// more overkill, but seems nice to be consistent. +/* more overkill, but seems nice to be consistent. */ struct socket_action { char* name; int action; @@ -102,9 +102,10 @@ typedef struct _instanceData { * Static definitions/initializations */ -// only 1 zctx for all the sockets, with an adjustable number of -// worker threads which may be useful if we use affinity in particular -// sockets +/* only 1 zctx for all the sockets, with an adjustable number of + worker threads which may be useful if we use affinity in particular + sockets +*/ static zctx_t* s_context = NULL; static int s_workerThreads = -1; @@ -154,8 +155,9 @@ static struct cnfparamblk actpblk = { * Helper Functions */ -// get the name of a socket type, return the ZMQ_XXX type -// or -1 if not a supported type (see above) +/* get the name of a socket type, return the ZMQ_XXX type + or -1 if not a supported type (see above) +*/ int getSocketType(char* name) { int type = -1; uint i; @@ -197,7 +199,7 @@ static void closeZMQ(instanceData* pData) { static rsRetVal initZMQ(instanceData* pData) { DEFiRet; - // create the context if necessary. + /* create the context if necessary. */ if (NULL == s_context) { s_context = zctx_new(); if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads); @@ -205,12 +207,12 @@ static rsRetVal initZMQ(instanceData* pData) { pData->socket = zsocket_new(s_context, pData->type); - // ALWAYS set the HWM as the zmq3 default is 1000 and we default - // to 0 (infinity) + /* ALWAYS set the HWM as the zmq3 default is 1000 and we default + to 0 (infinity) */ zsocket_set_rcvhwm(pData->socket, pData->rcvHWM); zsocket_set_sndhwm(pData->socket, pData->sndHWM); - // use czmq defaults for these, unless set to non-default values + /* use czmq defaults for these, unless set to non-default values */ if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity); if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf); if(pData->rcvBuf > -1) zsocket_set_sndbuf(pData->socket, pData->rcvBuf); @@ -227,10 +229,10 @@ static rsRetVal initZMQ(instanceData* pData) { if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only); if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity); - // bind or connect to it + /* bind or connect to it */ if (pData->action == ACTION_BIND) { - // bind asserts, so no need to test return val here - // which isn't the greatest api -- oh well + /* bind asserts, so no need to test return val here + which isn't the greatest api -- oh well */ zsocket_bind(pData->socket, (char*)pData->description); } else { if(zsocket_connect(pData->socket, (char*)pData->description) == -1) { @@ -245,14 +247,14 @@ static rsRetVal initZMQ(instanceData* pData) { rsRetVal writeZMQ(uchar* msg, instanceData* pData) { DEFiRet; - // initialize if necessary + /* initialize if necessary */ if(NULL == pData->socket) CHKiRet(initZMQ(pData)); - // send the shit... + /* send it */ int result = zstr_send(pData->socket, (char*)msg); - // whine if shit went wrong + /* whine if things went wrong */ if (result == -1) { errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result); ABORT_FINALIZE(RS_RET_ERR); @@ -268,8 +270,8 @@ setInstParamDefaults(instanceData* pData) { pData->tplName = NULL; pData->type = ZMQ_PUB; pData->action = ACTION_BIND; - pData->sndHWM = 0; // unlimited - pData->rcvHWM = 0; // unlimited + pData->sndHWM = 0; /*unlimited*/ + pData->rcvHWM = 0; /*unlimited*/ pData->identity = NULL; pData->sndBuf = -1; pData->rcvBuf = -1; @@ -350,41 +352,41 @@ for(i = 0 ; i < actpblk.nParams ; ++i) { } else if(!strcmp(actpblk.descr[i].name, "action")){ pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) { - pData->sndHWM = (int) pvals[i].val.d.n, NULL; + pData->sndHWM = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) { - pData->rcvHWM = (int) pvals[i].val.d.n, NULL; + pData->rcvHWM = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "identity")){ pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) { - pData->sndBuf = (int) pvals[i].val.d.n, NULL; + pData->sndBuf = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) { - pData->rcvBuf = (int) pvals[i].val.d.n, NULL; + pData->rcvBuf = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "linger")) { - pData->linger = (int) pvals[i].val.d.n, NULL; + pData->linger = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "backlog")) { - pData->backlog = (int) pvals[i].val.d.n, NULL; + pData->backlog = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) { - pData->sndTimeout = (int) pvals[i].val.d.n, NULL; + pData->sndTimeout = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) { - pData->rcvTimeout = (int) pvals[i].val.d.n, NULL; + pData->rcvTimeout = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) { - pData->maxMsgSize = (int) pvals[i].val.d.n, NULL; + pData->maxMsgSize = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "rate")) { - pData->rate = (int) pvals[i].val.d.n, NULL; + pData->rate = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) { - pData->recoveryIVL = (int) pvals[i].val.d.n, NULL; + pData->recoveryIVL = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) { - pData->multicastHops = (int) pvals[i].val.d.n, NULL; + pData->multicastHops = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) { - pData->reconnectIVL = (int) pvals[i].val.d.n, NULL; + pData->reconnectIVL = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) { - pData->reconnectIVLMax = (int) pvals[i].val.d.n, NULL; + pData->reconnectIVLMax = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) { - pData->ipv4Only = (int) pvals[i].val.d.n, NULL; + pData->ipv4Only = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "affinity")) { - pData->affinity = (int) pvals[i].val.d.n, NULL; + pData->affinity = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) { - s_workerThreads = (int) pvals[i].val.d.n, NULL; + s_workerThreads = (int) pvals[i].val.d.n; } else { errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); |