From d886dd979ebc94b45eb31255a24d49316dfcf6d0 Mon Sep 17 00:00:00 2001 From: David Kelly Date: Thu, 31 May 2012 15:25:00 -0700 Subject: Minor updates * C++ style comments converted to c-style * copy/paste error (benign I believe) in omzmq3 fixed * added readme files for both imzmq3 and omzmq3 --- plugins/imzmq3/README | 24 +++++++++++++++ plugins/imzmq3/imzmq3.c | 78 +++++++++++++++++++++++++------------------------ plugins/omzmq3/README | 25 ++++++++++++++++ plugins/omzmq3/omzmq3.c | 76 ++++++++++++++++++++++++----------------------- 4 files changed, 128 insertions(+), 75 deletions(-) create mode 100644 plugins/imzmq3/README create mode 100644 plugins/omzmq3/README diff --git a/plugins/imzmq3/README b/plugins/imzmq3/README new file mode 100644 index 00000000..88653b83 --- /dev/null +++ b/plugins/imzmq3/README @@ -0,0 +1,24 @@ +ZeroMQ 3.x Input Plugin + +Building this plugin: +Requires libzmq and libczmq. First, install libzmq from the HEAD on github: +http://github.com/zeromq/libzmq. You can clone the repository, build, then +install it. The directions for doing so are there in the readme. Then, do +the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1 +version of libzmq will be released, and a supporting version of libczmq. +At that time, you could simply download and install the tarballs instead of +using git to clone the repositories. Those tarballs (when available) can +be found at http://download.zeromq.org. As of this writing (5/31/2012), the +most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile +properly. + +Imzmq3 allows you to push data into rsyslog from a zeromq socket. The example +below binds a SUB socket to port 7172, and then any messages with the topic +"foo" will be pushed into rsyslog. + +Example Rsyslog.conf snippet: +------------------------------------------------------------------------------- + +$InputZmq3ServerRun action=BIND,type=SUB,description=tcp://*:7172,subscribe=foo + +------------------------------------------------------------------------------- diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c index 0195fd20..78eee887 100644 --- a/plugins/imzmq3/imzmq3.c +++ b/plugins/imzmq3/imzmq3.c @@ -72,7 +72,7 @@ typedef struct _socket_type { int type; } socket_type; -// more overkill, but seems nice to be consistent. +/* more overkill, but seems nice to be consistent.*/ typedef struct _socket_action { char* name; int action; @@ -87,8 +87,8 @@ typedef struct _socket_info { int type; int action; char* description; - int sndHWM; // if you want more than 2^32 messages, - int rcvHWM; // then pass in 0 (the default). + int sndHWM; /* if you want more than 2^32 messages, */ + int rcvHWM; /* then pass in 0 (the default). */ char* identity; char** subscriptions; ruleset_t* ruleset; @@ -136,13 +136,14 @@ static socket_action socketActions[] = { * Helper functions */ -// get the name of a socket type, return the ZMQ_XXX type -// or -1 if not a supported type (see above) +/* get the name of a socket type, return the ZMQ_XXX type + or -1 if not a supported type (see above) +*/ static int getSocketType(char* name) { int type = -1; uint i; - // match name with known socket type + /* match name with known socket type */ for(i=0; isndHWM = atoi(val); } else if (strcmp(binding, "subscribe") == 0) { - // Add the subscription value to the list. + /* Add the subscription value to the list.*/ char * substr = NULL; substr = strdup(val); info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1); @@ -326,7 +327,7 @@ static rsRetVal createContext() { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "zctx_new failed: %s", strerror(errno)); - // DK: really should do better than invalid params... + /* DK: really should do better than invalid params...*/ return RS_RET_INVALID_PARAMS; } @@ -348,11 +349,11 @@ static rsRetVal createSocket(socket_info* info, void** sock) { RS_RET_INVALID_PARAMS, "zsocket_new failed: %s, for type %d", strerror(errno),info->type); - // DK: invalid params seems right here. + /* DK: invalid params seems right here */ return RS_RET_INVALID_PARAMS; } - // Set options *before* the connect/bind. + /* Set options *before* the connect/bind. */ if (info->identity) zsocket_set_identity(*sock, info->identity); if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf); if (info->rcvBuf > -1) zsocket_set_rcvbuf(*sock, info->rcvBuf); @@ -369,17 +370,17 @@ static rsRetVal createSocket(socket_info* info, void** sock) { if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only); if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity); - // since HWM have defaults, we always set them. No return codes to check, either. + /* since HWM have defaults, we always set them. No return codes to check, either.*/ zsocket_set_sndhwm(*sock, info->sndHWM); zsocket_set_rcvhwm(*sock, info->rcvHWM); - // Set subscriptions. + /* Set subscriptions.*/ for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii) zsocket_set_subscribe(*sock, info->subscriptions[ii]); - // Do the bind/connect... + /* Do the bind/connect... */ if (info->action==ACTION_CONNECT) { rv = zsocket_connect(*sock, info->description); if (rv < 0) { @@ -435,40 +436,40 @@ finalize_it: static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) { DEFiRet; - // increment number of items and store old num items, as it will be handy. + /* increment number of items and store old num items, as it will be handy.*/ size_t idx = s_nitems++; - // allocate a new socket_info array to accomidate this new endpoint + /* allocate a new socket_info array to accomidate this new endpoint*/ socket_info* tmpSocketInfo; CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems)); - // copy existing socket_info across into new array, if any, and free old storage + /* copy existing socket_info across into new array, if any, and free old storage*/ if(idx) { memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx); free(s_socketInfo); } - // set the static to hold the new array + /* set the static to hold the new array */ s_socketInfo = tmpSocketInfo; - // point to the new one + /* point to the new one */ socket_info* sockInfo = &s_socketInfo[idx]; - // set defaults for the new socket info + /* set defaults for the new socket info */ setDefaults(sockInfo); - // Make a writeable copy of the string so we can use strtok - // in the parseConfig call + /* Make a writeable copy of the string so we can use strtok + in the parseConfig call */ char * copy = NULL; CHKmalloc(copy = strdup((char *) valp)); - // parse the config string + /* parse the config string */ CHKiRet(parseConfig(copy, sockInfo)); - // validate it + /* validate it */ CHKiRet(validateConfig(sockInfo)); - // bind to the current ruleset (if any) + /* bind to the current ruleset (if any)*/ sockInfo->ruleset = s_ruleset; finalize_it: @@ -492,8 +493,9 @@ static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *po } if( pollerData->thread->bShallStop == TRUE) { - // a handler that returns -1 will terminate the - // czmq reactor loop + /* a handler that returns -1 will terminate the + czmq reactor loop + */ return -1; } @@ -510,22 +512,22 @@ static rsRetVal rcv_loop(thrdInfo_t* pThrd){ DEFiRet; - // create the context + /* create the context*/ CHKiRet(createContext()); - // create the poll items + /* create the poll items*/ CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems)); - // create poller data (stuff to pass into the zmq closure called when we get a message) + /* create poller data (stuff to pass into the zmq closure called when we get a message)*/ CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems)); - // loop through and initialize the poll items and poller_data arrays... + /* loop through and initialize the poll items and poller_data arrays...*/ for(i=0; itplName = NULL; pData->type = ZMQ_PUB; pData->action = ACTION_BIND; - pData->sndHWM = 0; // unlimited - pData->rcvHWM = 0; // unlimited + pData->sndHWM = 0; /*unlimited*/ + pData->rcvHWM = 0; /*unlimited*/ pData->identity = NULL; pData->sndBuf = -1; pData->rcvBuf = -1; @@ -350,41 +352,41 @@ for(i = 0 ; i < actpblk.nParams ; ++i) { } else if(!strcmp(actpblk.descr[i].name, "action")){ pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) { - pData->sndHWM = (int) pvals[i].val.d.n, NULL; + pData->sndHWM = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) { - pData->rcvHWM = (int) pvals[i].val.d.n, NULL; + pData->rcvHWM = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "identity")){ pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) { - pData->sndBuf = (int) pvals[i].val.d.n, NULL; + pData->sndBuf = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) { - pData->rcvBuf = (int) pvals[i].val.d.n, NULL; + pData->rcvBuf = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "linger")) { - pData->linger = (int) pvals[i].val.d.n, NULL; + pData->linger = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "backlog")) { - pData->backlog = (int) pvals[i].val.d.n, NULL; + pData->backlog = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) { - pData->sndTimeout = (int) pvals[i].val.d.n, NULL; + pData->sndTimeout = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) { - pData->rcvTimeout = (int) pvals[i].val.d.n, NULL; + pData->rcvTimeout = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) { - pData->maxMsgSize = (int) pvals[i].val.d.n, NULL; + pData->maxMsgSize = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "rate")) { - pData->rate = (int) pvals[i].val.d.n, NULL; + pData->rate = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) { - pData->recoveryIVL = (int) pvals[i].val.d.n, NULL; + pData->recoveryIVL = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) { - pData->multicastHops = (int) pvals[i].val.d.n, NULL; + pData->multicastHops = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) { - pData->reconnectIVL = (int) pvals[i].val.d.n, NULL; + pData->reconnectIVL = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) { - pData->reconnectIVLMax = (int) pvals[i].val.d.n, NULL; + pData->reconnectIVLMax = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) { - pData->ipv4Only = (int) pvals[i].val.d.n, NULL; + pData->ipv4Only = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "affinity")) { - pData->affinity = (int) pvals[i].val.d.n, NULL; + pData->affinity = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) { - s_workerThreads = (int) pvals[i].val.d.n, NULL; + s_workerThreads = (int) pvals[i].val.d.n; } else { errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); -- cgit