From cbff73d94c3a86ed74294fe1265dc5242f9317be Mon Sep 17 00:00:00 2001 From: David Kelly Date: Tue, 29 May 2012 13:05:59 +0200 Subject: added new-style zeromq plugins, based on czmq api and rsyslog v6 conf --- Makefile.am | 8 + configure.ac | 42 +++ plugins/impstats/impstats.c | 11 +- plugins/imzmq3/Makefile.am | 8 + plugins/imzmq3/imzmq3.c | 651 ++++++++++++++++++++++++++++++++++++++++++++ plugins/omzmq3/Makefile.am | 8 + plugins/omzmq3/omzmq3.c | 460 +++++++++++++++++++++++++++++++ runtime/statsobj.c | 12 +- runtime/statsobj.h | 3 +- 9 files changed, 1198 insertions(+), 5 deletions(-) create mode 100644 plugins/imzmq3/Makefile.am create mode 100644 plugins/imzmq3/imzmq3.c create mode 100644 plugins/omzmq3/Makefile.am create mode 100644 plugins/omzmq3/omzmq3.c diff --git a/Makefile.am b/Makefile.am index 999404e7..4cefb756 100644 --- a/Makefile.am +++ b/Makefile.am @@ -158,6 +158,14 @@ if ENABLE_OMHIREDIS SUBDIRS += plugins/omhiredis endif +if ENABLE_OMZMQ3 +SUBDIRS += plugins/omzmq3 +endif + +if ENABLE_IMZMQ3 +SUBDIRS += plugins/imzmq3 +endif + if ENABLE_OMUXSOCK SUBDIRS += plugins/omuxsock endif diff --git a/configure.ac b/configure.ac index 0d41572c..cee453db 100644 --- a/configure.ac +++ b/configure.ac @@ -1269,6 +1269,44 @@ fi AM_CONDITIONAL(ENABLE_OMMONGODB, test x$enable_ommongodb = xyes) # end of mongodb code +# BEGIN ZMQ3 INPUT SUPPORT +AC_ARG_ENABLE(imzmq3, + [AS_HELP_STRING([--enable-imzmq3],[Compiles imzmq3 output module @<:@default=no@:>@])], + [case "${enableval}" in + yes) enable_imzmq3="yes" ;; + no) enable_imzmq3="no" ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-imzmq3) ;; + esac], + [enable_imzmq3=no] +) +if test "x$enable_imzmq3" = "xyes"; then + PKG_CHECK_MODULES(CZMQ, libczmq >= 1.1.0) + AC_SUBST(CZMQ_CFLAGS) + AC_SUBST(CZMQ_LIBS) +fi +AM_CONDITIONAL(ENABLE_IMZMQ3, test x$enable_imzmq3 = xyes) + +# END ZMQ3 INPUT SUPPORT + +# BEGIN ZMQ3 OUTPUT SUPPORT +AC_ARG_ENABLE(omzmq3, + [AS_HELP_STRING([--enable-omzmq3],[Compiles omzmq3 output module @<:@default=no@:>@])], + [case "${enableval}" in + yes) enable_omzmq3="yes" ;; + no) enable_omzmq3="no" ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-omzmq3) ;; + esac], + [enable_omzmq3=no] +) +if test "x$enable_omzmq3" = "xyes"; then + PKG_CHECK_MODULES(CZMQ, libczmq >= 1.1.0) + AC_SUBST(CZMQ_CFLAGS) + AC_SUBST(CZMQ_LIBS) +fi +AM_CONDITIONAL(ENABLE_OMZMQ3, test x$enable_omzmq3 = xyes) + +# END ZMQ3 SUPPORT + # HIREDIS SUPPORT AC_ARG_ENABLE(omhiredis, @@ -1319,6 +1357,7 @@ AC_CONFIG_FILES([Makefile \ plugins/impstats/Makefile \ plugins/imrelp/Makefile \ plugins/imdiag/Makefile \ + plugins/imzmq3/Makefile \ plugins/omtesting/Makefile \ plugins/omgssapi/Makefile \ plugins/ommysql/Makefile \ @@ -1331,6 +1370,7 @@ AC_CONFIG_FILES([Makefile \ plugins/omudpspoof/Makefile \ plugins/ommongodb/Makefile \ plugins/omhiredis/Makefile \ + plugins/omzmq3/Makefile \ plugins/mmnormalize/Makefile \ plugins/mmjsonparse/Makefile \ plugins/mmaudit/Makefile \ @@ -1363,6 +1403,7 @@ echo " imdiag enabled: $enable_imdiag" echo " file input module enabled: $enable_imfile" echo " Solaris input module enabled: $enable_imsolaris" echo " periodic statistics module enabled: $enable_impstats" +echo " imzmq3 input module enabled: $enable_imzmq3" echo echo "---{ output plugins }---" echo " Mail support enabled: $enable_mail" @@ -1373,6 +1414,7 @@ echo " omelasticsearch module will be compiled: $enable_elasticsearch" echo " omruleset module will be compiled: $enable_omruleset" echo " omudpspoof module will be compiled: $enable_omudpspoof" echo " omuxsock module will be compiled: $enable_omuxsock" +echo " omzmq3 module will be compiled: $enable_omzmq3" echo echo "---{ parser modules }---" echo " pmrfc3164sd module will be compiled: $enable_pmrfc3164sd" diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c index 4fec8e70..0abde84a 100644 --- a/plugins/impstats/impstats.c +++ b/plugins/impstats/impstats.c @@ -59,6 +59,7 @@ typedef struct configSettings_s { int iFacility; int iSeverity; int bJSON; + int bCEE; } configSettings_t; struct modConfData_s { @@ -89,6 +90,7 @@ initConfigSettings(void) cs.iFacility = DEFAULT_FACILITY; cs.iSeverity = DEFAULT_SEVERITY; cs.bJSON = 0; + cs.bCEE = 0; } @@ -157,7 +159,13 @@ CODESTARTendCnfLoad loadModConf->iStatsInterval = cs.iStatsInterval; loadModConf->iFacility = cs.iFacility; loadModConf->iSeverity = cs.iSeverity; - loadModConf->statsFmt = cs.bJSON ? statsFmt_JSON : statsFmt_Legacy; + if (cs.bCEE == 1) { + loadModConf->statsFmt = statsFmt_CEE; + } else if (cs.bJSON == 1) { + loadModConf->statsFmt = statsFmt_JSON; + } else { + loadModConf->statsFmt = statsFmt_Legacy; + } ENDendCnfLoad @@ -259,6 +267,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatfacility", 0, eCmdHdlrInt, NULL, &cs.iFacility, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatseverity", 0, eCmdHdlrInt, NULL, &cs.iSeverity, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatjson", 0, eCmdHdlrBinary, NULL, &cs.bJSON, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatcee", 0, eCmdHdlrBinary, NULL, &cs.bCEE, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(prop.Construct(&pInputName)); diff --git a/plugins/imzmq3/Makefile.am b/plugins/imzmq3/Makefile.am new file mode 100644 index 00000000..f9c84e5d --- /dev/null +++ b/plugins/imzmq3/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = imzmq3.la + +imzmq3_la_SOURCES = imzmq3.c +imzmq3_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CZMQ_CFLAGS) +imzmq3_la_LDFLAGS = -module -avoid-version +imzmq3_la_LIBADD = $(CZMQ_LIBS) + +EXTRA_DIST = diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c new file mode 100644 index 00000000..0195fd20 --- /dev/null +++ b/plugins/imzmq3/imzmq3.c @@ -0,0 +1,651 @@ +/* imzmq3.c + * + * This input plugin enables rsyslog to read messages from a ZeroMQ + * queue. + * + * Copyright 2012 Talksum, Inc. + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program. If not, see + * . + * + * Author: David Kelly + * + */ + +#include +#include +#include +#include +#include + +#include "rsyslog.h" + +#include "cfsysline.h" +#include "config.h" +#include "dirty.h" +#include "errmsg.h" +#include "glbl.h" +#include "module-template.h" +#include "msg.h" +#include "net.h" +#include "parser.h" +#include "prop.h" +#include "ruleset.h" +#include "srUtils.h" +#include "unicode-helper.h" + +#include + +MODULE_TYPE_INPUT +MODULE_TYPE_NOKEEP + +/* convienent symbols to denote a socket we want to bind + * vs one we want to just connect to + */ +#define ACTION_CONNECT 1 +#define ACTION_BIND 2 + +/* Module static data */ +DEF_IMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(glbl) +DEFobjCurrIf(prop) +DEFobjCurrIf(ruleset) + + +/* ---------------------------------------------------------------------------- + * structs to describe sockets + */ +typedef struct _socket_type { + char* name; + int type; +} socket_type; + +// more overkill, but seems nice to be consistent. +typedef struct _socket_action { + char* name; + int action; +} socket_action; + +typedef struct _poller_data { + ruleset_t* ruleset; + thrdInfo_t* thread; +} poller_data; + +typedef struct _socket_info { + int type; + int action; + char* description; + int sndHWM; // if you want more than 2^32 messages, + int rcvHWM; // then pass in 0 (the default). + char* identity; + char** subscriptions; + ruleset_t* ruleset; + int sndBuf; + int rcvBuf; + int linger; + int backlog; + int sndTimeout; + int rcvTimeout; + int maxMsgSize; + int rate; + int recoveryIVL; + int multicastHops; + int reconnectIVL; + int reconnectIVLMax; + int ipv4Only; + int affinity; + +} socket_info; + + +/* ---------------------------------------------------------------------------- + * Static definitions/initializations. + */ +static socket_info* s_socketInfo = NULL; +static size_t s_nitems = 0; +static prop_t * s_namep = NULL; +static zloop_t* s_zloop = NULL; +static int s_io_threads = 1; +static zctx_t* s_context = NULL; +static ruleset_t* s_ruleset = NULL; +static socket_type socketTypes[] = { + {"SUB", ZMQ_SUB }, + {"PULL", ZMQ_PULL }, + {"XSUB", ZMQ_XSUB } +}; + +static socket_action socketActions[] = { + {"BIND", ACTION_BIND}, + {"CONNECT", ACTION_CONNECT}, +}; + + +/* ---------------------------------------------------------------------------- + * Helper functions + */ + +// get the name of a socket type, return the ZMQ_XXX type +// or -1 if not a supported type (see above) +static int getSocketType(char* name) { + int type = -1; + uint i; + + // match name with known socket type + for(i=0; itype = ZMQ_SUB; + info->action = ACTION_BIND; + info->description = NULL; + info->sndHWM = 0; + info->rcvHWM = 0; + info->identity = NULL; + info->subscriptions = NULL; + info->ruleset = NULL; + info->sndBuf = -1; + info->rcvBuf = -1; + info->linger = -1; + info->backlog = -1; + info->sndTimeout = -1; + info->rcvTimeout = -1; + info->maxMsgSize = -1; + info->rate = -1; + info->recoveryIVL = -1; + info->multicastHops = -1; + info->reconnectIVL = -1; + info->reconnectIVLMax = -1; + info->ipv4Only = -1; + info->affinity = -1; + +}; + + +/* The config string should look like: + * "action=AAA,type=TTT,description=DDD,sndHWM=SSS,rcvHWM=RRR,subscribe='xxx',subscribe='yyy'" + * + */ +static rsRetVal parseConfig(char* config, socket_info* info) { + int nsubs = 0; + + char* binding; + char* ptr1; + for (binding = strtok_r(config, ",", &ptr1); + binding != NULL; + binding = strtok_r(NULL, ",", &ptr1)) { + + // Each binding looks like foo=bar + char * sep = strchr(binding, '='); + if (sep == NULL) + { + errmsg.LogError(0, NO_ERRCODE, + "Invalid argument format %s, ignoring ...", + binding); + continue; + } + + // Replace '=' with '\0'. + *sep = '\0'; + + char * val = sep + 1; + + if (strcmp(binding, "action") == 0) { + info->action = getSocketAction(val); + } else if (strcmp(binding, "type") == 0) { + info->type = getSocketType(val); + } else if (strcmp(binding, "description") == 0) { + info->description = strdup(val); + } else if (strcmp(binding, "sndHWM") == 0) { + info->sndHWM = atoi(val); + } else if (strcmp(binding, "rcvHWM") == 0) { + info->sndHWM = atoi(val); + } else if (strcmp(binding, "subscribe") == 0) { + // Add the subscription value to the list. + char * substr = NULL; + substr = strdup(val); + info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1); + info->subscriptions[nsubs] = substr; + ++nsubs; + } else if (strcmp(binding, "sndBuf") == 0) { + info->sndBuf = atoi(val); + } else if (strcmp(binding, "rcvBuf") == 0) { + info->rcvBuf = atoi(val); + } else if (strcmp(binding, "linger") == 0) { + info->linger = atoi(val); + } else if (strcmp(binding, "backlog") == 0) { + info->backlog = atoi(val); + } else if (strcmp(binding, "sndTimeout") == 0) { + info->sndTimeout = atoi(val); + } else if (strcmp(binding, "rcvTimeout") == 0) { + info->rcvTimeout = atoi(val); + } else if (strcmp(binding, "maxMsgSize") == 0) { + info->maxMsgSize = atoi(val); + } else if (strcmp(binding, "rate") == 0) { + info->rate = atoi(val); + } else if (strcmp(binding, "recoveryIVL") == 0) { + info->recoveryIVL = atoi(val); + } else if (strcmp(binding, "multicastHops") == 0) { + info->multicastHops = atoi(val); + } else if (strcmp(binding, "reconnectIVL") == 0) { + info->reconnectIVL = atoi(val); + } else if (strcmp(binding, "reconnectIVLMax") == 0) { + info->reconnectIVLMax = atoi(val); + } else if (strcmp(binding, "ipv4Only") == 0) { + info->ipv4Only = atoi(val); + } else if (strcmp(binding, "affinity") == 0) { + info->affinity = atoi(val); + } else { + errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding); + return RS_RET_INVALID_PARAMS; + } + } + + return RS_RET_OK; +} + +static rsRetVal validateConfig(socket_info* info) { + + if (info->type == -1) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you entered an invalid type"); + return RS_RET_INVALID_PARAMS; + } + if (info->action == -1) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you entered an invalid action"); + return RS_RET_INVALID_PARAMS; + } + if (info->description == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you didn't enter a description"); + return RS_RET_INVALID_PARAMS; + } + if(info->type == ZMQ_SUB && info->subscriptions == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "SUB sockets need at least one subscription"); + return RS_RET_INVALID_PARAMS; + } + if(info->type != ZMQ_SUB && info->subscriptions != NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "only SUB sockets can have subscriptions"); + return RS_RET_INVALID_PARAMS; + } + return RS_RET_OK; +} + +static rsRetVal createContext() { + if (s_context == NULL) { + errmsg.LogError(0, NO_ERRCODE, "creating zctx."); + s_context = zctx_new(); + + if (s_context == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "zctx_new failed: %s", + strerror(errno)); + // DK: really should do better than invalid params... + return RS_RET_INVALID_PARAMS; + } + + if (s_io_threads > 1) { + errmsg.LogError(0, NO_ERRCODE, "setting io worker threads to %d", s_io_threads); + zctx_set_iothreads(s_context, s_io_threads); + } + } + return RS_RET_OK; +} + +static rsRetVal createSocket(socket_info* info, void** sock) { + size_t ii; + int rv; + + *sock = zsocket_new(s_context, info->type); + if (!sock) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zsocket_new failed: %s, for type %d", + strerror(errno),info->type); + // DK: invalid params seems right here. + return RS_RET_INVALID_PARAMS; + } + + // Set options *before* the connect/bind. + if (info->identity) zsocket_set_identity(*sock, info->identity); + if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf); + if (info->rcvBuf > -1) zsocket_set_rcvbuf(*sock, info->rcvBuf); + if (info->linger > -1) zsocket_set_linger(*sock, info->linger); + if (info->backlog > -1) zsocket_set_backlog(*sock, info->backlog); + if (info->sndTimeout > -1) zsocket_set_sndtimeo(*sock, info->sndTimeout); + if (info->rcvTimeout > -1) zsocket_set_rcvtimeo(*sock, info->rcvTimeout); + if (info->maxMsgSize > -1) zsocket_set_maxmsgsize(*sock, info->maxMsgSize); + if (info->rate > -1) zsocket_set_rate(*sock, info->rate); + if (info->recoveryIVL > -1) zsocket_set_recovery_ivl(*sock, info->recoveryIVL); + if (info->multicastHops > -1) zsocket_set_multicast_hops(*sock, info->multicastHops); + if (info->reconnectIVL > -1) zsocket_set_reconnect_ivl(*sock, info->reconnectIVL); + if (info->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(*sock, info->reconnectIVLMax); + if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only); + if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity); + + // since HWM have defaults, we always set them. No return codes to check, either. + zsocket_set_sndhwm(*sock, info->sndHWM); + zsocket_set_rcvhwm(*sock, info->rcvHWM); + + // Set subscriptions. + for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii) + zsocket_set_subscribe(*sock, info->subscriptions[ii]); + + + + // Do the bind/connect... + if (info->action==ACTION_CONNECT) { + rv = zsocket_connect(*sock, info->description); + if (rv < 0) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zmq_connect using %s failed: %s", + info->description, strerror(errno)); + return RS_RET_INVALID_PARAMS; + } + } else { + rv = zsocket_bind(*sock, info->description); + if (rv <= 0) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zmq_bind using %s failed: %s", + info->description, strerror(errno)); + return RS_RET_INVALID_PARAMS; + } + } + return RS_RET_OK; +} + +/* ---------------------------------------------------------------------------- + * Module endpoints + */ + +/* accept a new ruleset to bind. Checks if it exists and complains, if not. Note + * that this makes the assumption that after the bind ruleset is called in the config, + * another call will be made to add an endpoint. +*/ +static rsRetVal +set_ruleset(void __attribute__((unused)) *pVal, uchar *pszName) { + ruleset_t* ruleset_ptr; + rsRetVal localRet; + DEFiRet; + + localRet = ruleset.GetRuleset(ourConf, &ruleset_ptr, pszName); + if(localRet == RS_RET_NOT_FOUND) { + errmsg.LogError(0, NO_ERRCODE, "error: " + "ruleset '%s' not found - ignored", pszName); + } + CHKiRet(localRet); + s_ruleset = ruleset_ptr; + DBGPRINTF("imzmq3 current bind ruleset '%s'\n", pszName); + +finalize_it: + free(pszName); /* no longer needed */ + RETiRet; +} + +/* add an actual endpoint + */ +static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) { + DEFiRet; + + // increment number of items and store old num items, as it will be handy. + size_t idx = s_nitems++; + + // allocate a new socket_info array to accomidate this new endpoint + socket_info* tmpSocketInfo; + CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems)); + + // copy existing socket_info across into new array, if any, and free old storage + if(idx) { + memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx); + free(s_socketInfo); + } + + // set the static to hold the new array + s_socketInfo = tmpSocketInfo; + + // point to the new one + socket_info* sockInfo = &s_socketInfo[idx]; + + // set defaults for the new socket info + setDefaults(sockInfo); + + // Make a writeable copy of the string so we can use strtok + // in the parseConfig call + char * copy = NULL; + CHKmalloc(copy = strdup((char *) valp)); + + // parse the config string + CHKiRet(parseConfig(copy, sockInfo)); + + // validate it + CHKiRet(validateConfig(sockInfo)); + + // bind to the current ruleset (if any) + sockInfo->ruleset = s_ruleset; + +finalize_it: + free(valp); /* in any case, this is no longer needed */ + RETiRet; +} + + +static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *poller, void* pd) { + msg_t* logmsg; + poller_data* pollerData = (poller_data*)pd; + + char* buf = zstr_recv(poller->socket); + if (msgConstruct(&logmsg) == RS_RET_OK) { + MsgSetRawMsg(logmsg, buf, strlen(buf)); + MsgSetInputName(logmsg, s_namep); + MsgSetFlowControlType(logmsg, eFLOWCTL_NO_DELAY); + MsgSetRuleset(logmsg, pollerData->ruleset); + logmsg->msgFlags = NEEDS_PARSING; + submitMsg(logmsg); + } + + if( pollerData->thread->bShallStop == TRUE) { + // a handler that returns -1 will terminate the + // czmq reactor loop + return -1; + } + + return 0; +} + +/* called when runInput is called by rsyslog + */ +static rsRetVal rcv_loop(thrdInfo_t* pThrd){ + size_t i; + int rv; + zmq_pollitem_t* items; + poller_data* pollerData; + + DEFiRet; + + // create the context + CHKiRet(createContext()); + + // create the poll items + CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems)); + + // create poller data (stuff to pass into the zmq closure called when we get a message) + CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems)); + + // loop through and initialize the poll items and poller_data arrays... + for(i=0; i. +* +* Author: David Kelly +* +*/ + + +#include "config.h" +#include "rsyslog.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "cfsysline.h" + +#include + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omzmq3") + +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) + +/* convienent symbols to denote a socket we want to bind +p * vs one we want to just connect to + */ +#define ACTION_CONNECT 1 +#define ACTION_BIND 2 + + +/* ---------------------------------------------------------------------------- + * structs to describe sockets + */ +struct socket_type { + char* name; + int type; +}; + +// more overkill, but seems nice to be consistent. +struct socket_action { + char* name; + int action; +}; + +typedef struct _instanceData { + void* socket; + uchar* description; + int type; + int action; + int sndHWM; + int rcvHWM; + uchar* identity; + int sndBuf; + int rcvBuf; + int linger; + int backlog; + int sndTimeout; + int rcvTimeout; + int maxMsgSize; + int rate; + int recoveryIVL; + int multicastHops; + int reconnectIVL; + int reconnectIVLMax; + int ipv4Only; + int affinity; + uchar* tplName; +} instanceData; + + +/* ---------------------------------------------------------------------------- + * Static definitions/initializations + */ + +// only 1 zctx for all the sockets, with an adjustable number of +// worker threads which may be useful if we use affinity in particular +// sockets +static zctx_t* s_context = NULL; +static int s_workerThreads = -1; + +static struct socket_type types[] = { + {"PUB", ZMQ_PUB }, + {"PUSH", ZMQ_PUSH }, + {"XPUB", ZMQ_XPUB } +}; + +static struct socket_action actions[] = { + {"BIND", ACTION_BIND}, + {"CONNECT", ACTION_CONNECT}, +}; + +static struct cnfparamdescr actpdescr[] = { + { "description", eCmdHdlrGetWord, 0 }, + { "sockType", eCmdHdlrGetWord, 0 }, + { "action", eCmdHdlrGetWord, 0 }, + { "sndHWM", eCmdHdlrInt, 0 }, + { "rcvHWM", eCmdHdlrInt, 0 }, + { "identity", eCmdHdlrGetWord, 0 }, + { "sndBuf", eCmdHdlrInt, 0 }, + { "rcvBuf", eCmdHdlrInt, 0 }, + { "linger", eCmdHdlrInt, 0 }, + { "backlog", eCmdHdlrInt, 0 }, + { "sndTimeout", eCmdHdlrInt, 0 }, + { "rcvTimeout", eCmdHdlrInt, 0 }, + { "maxMsgSize", eCmdHdlrInt, 0 }, + { "rate", eCmdHdlrInt, 0 }, + { "recoveryIVL", eCmdHdlrInt, 0 }, + { "multicastHops", eCmdHdlrInt, 0 }, + { "reconnectIVL", eCmdHdlrInt, 0 }, + { "reconnectIVLMax", eCmdHdlrInt, 0 }, + { "ipv4Only", eCmdHdlrInt, 0 }, + { "affinity", eCmdHdlrInt, 0 }, + { "globalWorkerThreads", eCmdHdlrInt, 0 }, + { "template", eCmdHdlrGetWord, 1 } +}; + +static struct cnfparamblk actpblk = { + CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr +}; + +/* ---------------------------------------------------------------------------- + * Helper Functions + */ + +// get the name of a socket type, return the ZMQ_XXX type +// or -1 if not a supported type (see above) +int getSocketType(char* name) { + int type = -1; + uint i; + for(i=0; isocket) { + if(pData->socket != NULL) { + zsocket_destroy(s_context, pData->socket); + } + } +} + + +static rsRetVal initZMQ(instanceData* pData) { + DEFiRet; + + // create the context if necessary. + if (NULL == s_context) { + s_context = zctx_new(); + if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads); + } + + pData->socket = zsocket_new(s_context, pData->type); + + // ALWAYS set the HWM as the zmq3 default is 1000 and we default + // to 0 (infinity) + zsocket_set_rcvhwm(pData->socket, pData->rcvHWM); + zsocket_set_sndhwm(pData->socket, pData->sndHWM); + + // use czmq defaults for these, unless set to non-default values + if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity); + if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf); + if(pData->rcvBuf > -1) zsocket_set_sndbuf(pData->socket, pData->rcvBuf); + if(pData->linger > -1) zsocket_set_linger(pData->socket, pData->linger); + if(pData->backlog > -1) zsocket_set_backlog(pData->socket, pData->backlog); + if(pData->sndTimeout > -1) zsocket_set_sndtimeo(pData->socket, pData->sndTimeout); + if(pData->rcvTimeout > -1) zsocket_set_rcvtimeo(pData->socket, pData->rcvTimeout); + if(pData->maxMsgSize > -1) zsocket_set_maxmsgsize(pData->socket, pData->maxMsgSize); + if(pData->rate > -1) zsocket_set_rate(pData->socket, pData->rate); + if(pData->recoveryIVL > -1) zsocket_set_recovery_ivl(pData->socket, pData->recoveryIVL); + if(pData->multicastHops > -1) zsocket_set_multicast_hops(pData->socket, pData->multicastHops); + if(pData->reconnectIVL > -1) zsocket_set_reconnect_ivl(pData->socket, pData->reconnectIVL); + if(pData->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(pData->socket, pData->reconnectIVLMax); + if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only); + if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity); + + // bind or connect to it + if (pData->action == ACTION_BIND) { + // bind asserts, so no need to test return val here + // which isn't the greatest api -- oh well + zsocket_bind(pData->socket, (char*)pData->description); + } else { + if(zsocket_connect(pData->socket, (char*)pData->description) == -1) { + errmsg.LogError(0, RS_RET_SUSPENDED, "omzmq3: connect failed!"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + } + finalize_it: + RETiRet; +} + +rsRetVal writeZMQ(uchar* msg, instanceData* pData) { + DEFiRet; + + // initialize if necessary + if(NULL == pData->socket) + CHKiRet(initZMQ(pData)); + + // send the shit... + int result = zstr_send(pData->socket, (char*)msg); + + // whine if shit went wrong + if (result == -1) { + errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result); + ABORT_FINALIZE(RS_RET_ERR); + } + finalize_it: + RETiRet; +} + +static inline void +setInstParamDefaults(instanceData* pData) { + pData->description = (uchar*)"tcp://*:7171"; + pData->socket = NULL; + pData->tplName = NULL; + pData->type = ZMQ_PUB; + pData->action = ACTION_BIND; + pData->sndHWM = 0; // unlimited + pData->rcvHWM = 0; // unlimited + pData->identity = NULL; + pData->sndBuf = -1; + pData->rcvBuf = -1; + pData->linger = -1; + pData->backlog = -1; + pData->sndTimeout = -1; + pData->rcvTimeout = -1; + pData->maxMsgSize = -1; + pData->rate = -1; + pData->recoveryIVL = -1; + pData->multicastHops = -1; + pData->reconnectIVL = -1; + pData->reconnectIVLMax = -1; + pData->ipv4Only = -1; + pData->affinity = 1; +} + + +/* ---------------------------------------------------------------------------- + * Output Module Functions + */ + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + +BEGINfreeInstance +CODESTARTfreeInstance + closeZMQ(pData); + free(pData->description); + free(pData->tplName); +ENDfreeInstance + +BEGINtryResume +CODESTARTtryResume + if(NULL == pData->socket) + iRet = initZMQ(pData); +ENDtryResume + +BEGINdoAction +CODESTARTdoAction +iRet = writeZMQ(ppString[0], pData); +ENDdoAction + + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; +CODESTARTnewActInst +if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + +CHKiRet(createInstance(&pData)); +setInstParamDefaults(pData); + +CODE_STD_STRING_REQUESTparseSelectorAct(1) +for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "description")) { + pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "sockType")){ + pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if(!strcmp(actpblk.descr[i].name, "action")){ + pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) { + pData->sndHWM = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) { + pData->rcvHWM = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "identity")){ + pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) { + pData->sndBuf = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) { + pData->rcvBuf = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "linger")) { + pData->linger = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "backlog")) { + pData->backlog = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) { + pData->sndTimeout = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) { + pData->rcvTimeout = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) { + pData->maxMsgSize = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "rate")) { + pData->rate = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) { + pData->recoveryIVL = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) { + pData->multicastHops = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) { + pData->reconnectIVL = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) { + pData->reconnectIVLMax = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) { + pData->ipv4Only = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "affinity")) { + pData->affinity = (int) pvals[i].val.d.n, NULL; + } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) { + s_workerThreads = (int) pvals[i].val.d.n, NULL; + } else { + errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + } + +if(pData->tplName == NULL) { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); + } else { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS)); + } + +if(pData->type == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type."); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } +if(pData->action == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + + +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); +ENDnewActInst + +BEGINparseSelectorAct +CODESTARTparseSelectorAct + +/* tell the engine we only want one template string */ +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(!strncmp((char*) p, ":omzmq3:", sizeof(":omzmq3:") - 1)) + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omzmq3 supports only v6 config format, use: " + "action(type=\"omzmq3\" serverport=...)"); + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + +BEGINinitConfVars /* (re)set config variables to defaults */ +CODESTARTinitConfVars +s_workerThreads = -1; +ENDinitConfVars + +BEGINmodExit +CODESTARTmodExit +if(NULL != s_context) { + zctx_destroy(&s_context); + s_context=NULL; + } +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +ENDqueryEtryPt + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* only supports rsyslog 6 configs */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); + DBGPRINTF("omzmq3: module compiled with rsyslog version %s.\n", VERSION); + +INITLegCnfVars +CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID)); +ENDmodInit + + + diff --git a/runtime/statsobj.c b/runtime/statsobj.c index a21614f6..25275616 100644 --- a/runtime/statsobj.c +++ b/runtime/statsobj.c @@ -168,15 +168,18 @@ finalize_it: /* get all the object's countes together as CEE. */ static rsRetVal -getStatsLineCEE(statsobj_t *pThis, cstr_t **ppcstr) +getStatsLineCEE(statsobj_t *pThis, cstr_t **ppcstr, int cee_cookie) { cstr_t *pcstr; ctr_t *pCtr; DEFiRet; CHKiRet(cstrConstruct(&pcstr)); - rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("@cee: {"), 7); + if (cee_cookie == 1) + rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("@cee: "), 6); + + rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("{"), 1); rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("\""), 1); rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("name"), 4); rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("\""), 1); @@ -273,8 +276,11 @@ getAllStatsLines(rsRetVal(*cb)(void*, cstr_t*), void *usrptr, statsFmtType_t fmt case statsFmt_Legacy: CHKiRet(getStatsLine(o, &cstr)); break; + case statsFmt_CEE: + CHKiRet(getStatsLineCEE(o, &cstr, 1)); + break; case statsFmt_JSON: - CHKiRet(getStatsLineCEE(o, &cstr)); + CHKiRet(getStatsLineCEE(o, &cstr, 0)); break; } CHKiRet(cb(usrptr, cstr)); diff --git a/runtime/statsobj.h b/runtime/statsobj.h index f7de68ee..14b33215 100644 --- a/runtime/statsobj.h +++ b/runtime/statsobj.h @@ -46,7 +46,8 @@ typedef enum statsCtrType_e { /* stats line format types */ typedef enum statsFmtType_e { statsFmt_Legacy, - statsFmt_JSON + statsFmt_JSON, + statsFmt_CEE } statsFmtType_t; -- cgit From d886dd979ebc94b45eb31255a24d49316dfcf6d0 Mon Sep 17 00:00:00 2001 From: David Kelly Date: Thu, 31 May 2012 15:25:00 -0700 Subject: Minor updates * C++ style comments converted to c-style * copy/paste error (benign I believe) in omzmq3 fixed * added readme files for both imzmq3 and omzmq3 --- plugins/imzmq3/README | 24 +++++++++++++++ plugins/imzmq3/imzmq3.c | 78 +++++++++++++++++++++++++------------------------ plugins/omzmq3/README | 25 ++++++++++++++++ plugins/omzmq3/omzmq3.c | 76 ++++++++++++++++++++++++----------------------- 4 files changed, 128 insertions(+), 75 deletions(-) create mode 100644 plugins/imzmq3/README create mode 100644 plugins/omzmq3/README diff --git a/plugins/imzmq3/README b/plugins/imzmq3/README new file mode 100644 index 00000000..88653b83 --- /dev/null +++ b/plugins/imzmq3/README @@ -0,0 +1,24 @@ +ZeroMQ 3.x Input Plugin + +Building this plugin: +Requires libzmq and libczmq. First, install libzmq from the HEAD on github: +http://github.com/zeromq/libzmq. You can clone the repository, build, then +install it. The directions for doing so are there in the readme. Then, do +the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1 +version of libzmq will be released, and a supporting version of libczmq. +At that time, you could simply download and install the tarballs instead of +using git to clone the repositories. Those tarballs (when available) can +be found at http://download.zeromq.org. As of this writing (5/31/2012), the +most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile +properly. + +Imzmq3 allows you to push data into rsyslog from a zeromq socket. The example +below binds a SUB socket to port 7172, and then any messages with the topic +"foo" will be pushed into rsyslog. + +Example Rsyslog.conf snippet: +------------------------------------------------------------------------------- + +$InputZmq3ServerRun action=BIND,type=SUB,description=tcp://*:7172,subscribe=foo + +------------------------------------------------------------------------------- diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c index 0195fd20..78eee887 100644 --- a/plugins/imzmq3/imzmq3.c +++ b/plugins/imzmq3/imzmq3.c @@ -72,7 +72,7 @@ typedef struct _socket_type { int type; } socket_type; -// more overkill, but seems nice to be consistent. +/* more overkill, but seems nice to be consistent.*/ typedef struct _socket_action { char* name; int action; @@ -87,8 +87,8 @@ typedef struct _socket_info { int type; int action; char* description; - int sndHWM; // if you want more than 2^32 messages, - int rcvHWM; // then pass in 0 (the default). + int sndHWM; /* if you want more than 2^32 messages, */ + int rcvHWM; /* then pass in 0 (the default). */ char* identity; char** subscriptions; ruleset_t* ruleset; @@ -136,13 +136,14 @@ static socket_action socketActions[] = { * Helper functions */ -// get the name of a socket type, return the ZMQ_XXX type -// or -1 if not a supported type (see above) +/* get the name of a socket type, return the ZMQ_XXX type + or -1 if not a supported type (see above) +*/ static int getSocketType(char* name) { int type = -1; uint i; - // match name with known socket type + /* match name with known socket type */ for(i=0; isndHWM = atoi(val); } else if (strcmp(binding, "subscribe") == 0) { - // Add the subscription value to the list. + /* Add the subscription value to the list.*/ char * substr = NULL; substr = strdup(val); info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1); @@ -326,7 +327,7 @@ static rsRetVal createContext() { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "zctx_new failed: %s", strerror(errno)); - // DK: really should do better than invalid params... + /* DK: really should do better than invalid params...*/ return RS_RET_INVALID_PARAMS; } @@ -348,11 +349,11 @@ static rsRetVal createSocket(socket_info* info, void** sock) { RS_RET_INVALID_PARAMS, "zsocket_new failed: %s, for type %d", strerror(errno),info->type); - // DK: invalid params seems right here. + /* DK: invalid params seems right here */ return RS_RET_INVALID_PARAMS; } - // Set options *before* the connect/bind. + /* Set options *before* the connect/bind. */ if (info->identity) zsocket_set_identity(*sock, info->identity); if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf); if (info->rcvBuf > -1) zsocket_set_rcvbuf(*sock, info->rcvBuf); @@ -369,17 +370,17 @@ static rsRetVal createSocket(socket_info* info, void** sock) { if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only); if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity); - // since HWM have defaults, we always set them. No return codes to check, either. + /* since HWM have defaults, we always set them. No return codes to check, either.*/ zsocket_set_sndhwm(*sock, info->sndHWM); zsocket_set_rcvhwm(*sock, info->rcvHWM); - // Set subscriptions. + /* Set subscriptions.*/ for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii) zsocket_set_subscribe(*sock, info->subscriptions[ii]); - // Do the bind/connect... + /* Do the bind/connect... */ if (info->action==ACTION_CONNECT) { rv = zsocket_connect(*sock, info->description); if (rv < 0) { @@ -435,40 +436,40 @@ finalize_it: static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) { DEFiRet; - // increment number of items and store old num items, as it will be handy. + /* increment number of items and store old num items, as it will be handy.*/ size_t idx = s_nitems++; - // allocate a new socket_info array to accomidate this new endpoint + /* allocate a new socket_info array to accomidate this new endpoint*/ socket_info* tmpSocketInfo; CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems)); - // copy existing socket_info across into new array, if any, and free old storage + /* copy existing socket_info across into new array, if any, and free old storage*/ if(idx) { memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx); free(s_socketInfo); } - // set the static to hold the new array + /* set the static to hold the new array */ s_socketInfo = tmpSocketInfo; - // point to the new one + /* point to the new one */ socket_info* sockInfo = &s_socketInfo[idx]; - // set defaults for the new socket info + /* set defaults for the new socket info */ setDefaults(sockInfo); - // Make a writeable copy of the string so we can use strtok - // in the parseConfig call + /* Make a writeable copy of the string so we can use strtok + in the parseConfig call */ char * copy = NULL; CHKmalloc(copy = strdup((char *) valp)); - // parse the config string + /* parse the config string */ CHKiRet(parseConfig(copy, sockInfo)); - // validate it + /* validate it */ CHKiRet(validateConfig(sockInfo)); - // bind to the current ruleset (if any) + /* bind to the current ruleset (if any)*/ sockInfo->ruleset = s_ruleset; finalize_it: @@ -492,8 +493,9 @@ static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *po } if( pollerData->thread->bShallStop == TRUE) { - // a handler that returns -1 will terminate the - // czmq reactor loop + /* a handler that returns -1 will terminate the + czmq reactor loop + */ return -1; } @@ -510,22 +512,22 @@ static rsRetVal rcv_loop(thrdInfo_t* pThrd){ DEFiRet; - // create the context + /* create the context*/ CHKiRet(createContext()); - // create the poll items + /* create the poll items*/ CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems)); - // create poller data (stuff to pass into the zmq closure called when we get a message) + /* create poller data (stuff to pass into the zmq closure called when we get a message)*/ CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems)); - // loop through and initialize the poll items and poller_data arrays... + /* loop through and initialize the poll items and poller_data arrays...*/ for(i=0; itplName = NULL; pData->type = ZMQ_PUB; pData->action = ACTION_BIND; - pData->sndHWM = 0; // unlimited - pData->rcvHWM = 0; // unlimited + pData->sndHWM = 0; /*unlimited*/ + pData->rcvHWM = 0; /*unlimited*/ pData->identity = NULL; pData->sndBuf = -1; pData->rcvBuf = -1; @@ -350,41 +352,41 @@ for(i = 0 ; i < actpblk.nParams ; ++i) { } else if(!strcmp(actpblk.descr[i].name, "action")){ pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) { - pData->sndHWM = (int) pvals[i].val.d.n, NULL; + pData->sndHWM = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) { - pData->rcvHWM = (int) pvals[i].val.d.n, NULL; + pData->rcvHWM = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "identity")){ pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) { - pData->sndBuf = (int) pvals[i].val.d.n, NULL; + pData->sndBuf = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) { - pData->rcvBuf = (int) pvals[i].val.d.n, NULL; + pData->rcvBuf = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "linger")) { - pData->linger = (int) pvals[i].val.d.n, NULL; + pData->linger = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "backlog")) { - pData->backlog = (int) pvals[i].val.d.n, NULL; + pData->backlog = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) { - pData->sndTimeout = (int) pvals[i].val.d.n, NULL; + pData->sndTimeout = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) { - pData->rcvTimeout = (int) pvals[i].val.d.n, NULL; + pData->rcvTimeout = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) { - pData->maxMsgSize = (int) pvals[i].val.d.n, NULL; + pData->maxMsgSize = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "rate")) { - pData->rate = (int) pvals[i].val.d.n, NULL; + pData->rate = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) { - pData->recoveryIVL = (int) pvals[i].val.d.n, NULL; + pData->recoveryIVL = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) { - pData->multicastHops = (int) pvals[i].val.d.n, NULL; + pData->multicastHops = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) { - pData->reconnectIVL = (int) pvals[i].val.d.n, NULL; + pData->reconnectIVL = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) { - pData->reconnectIVLMax = (int) pvals[i].val.d.n, NULL; + pData->reconnectIVLMax = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) { - pData->ipv4Only = (int) pvals[i].val.d.n, NULL; + pData->ipv4Only = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "affinity")) { - pData->affinity = (int) pvals[i].val.d.n, NULL; + pData->affinity = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) { - s_workerThreads = (int) pvals[i].val.d.n, NULL; + s_workerThreads = (int) pvals[i].val.d.n; } else { errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); -- cgit