summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Kelly <davidk@talksum.com>2012-05-31 15:25:00 -0700
committerRainer Gerhards <rgerhards@adiscon.com>2012-06-01 18:44:21 +0200
commitd886dd979ebc94b45eb31255a24d49316dfcf6d0 (patch)
tree01047fd4854f9a8a6c3e7abbeee5ca58680e6ee6
parentcbff73d94c3a86ed74294fe1265dc5242f9317be (diff)
downloadrsyslog-d886dd979ebc94b45eb31255a24d49316dfcf6d0.tar.gz
rsyslog-d886dd979ebc94b45eb31255a24d49316dfcf6d0.tar.xz
rsyslog-d886dd979ebc94b45eb31255a24d49316dfcf6d0.zip
Minor updates
* C++ style comments converted to c-style * copy/paste error (benign I believe) in omzmq3 fixed * added readme files for both imzmq3 and omzmq3
-rw-r--r--plugins/imzmq3/README24
-rw-r--r--plugins/imzmq3/imzmq3.c78
-rw-r--r--plugins/omzmq3/README25
-rw-r--r--plugins/omzmq3/omzmq3.c76
4 files changed, 128 insertions, 75 deletions
diff --git a/plugins/imzmq3/README b/plugins/imzmq3/README
new file mode 100644
index 00000000..88653b83
--- /dev/null
+++ b/plugins/imzmq3/README
@@ -0,0 +1,24 @@
+ZeroMQ 3.x Input Plugin
+
+Building this plugin:
+Requires libzmq and libczmq. First, install libzmq from the HEAD on github:
+http://github.com/zeromq/libzmq. You can clone the repository, build, then
+install it. The directions for doing so are there in the readme. Then, do
+the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1
+version of libzmq will be released, and a supporting version of libczmq.
+At that time, you could simply download and install the tarballs instead of
+using git to clone the repositories. Those tarballs (when available) can
+be found at http://download.zeromq.org. As of this writing (5/31/2012), the
+most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile
+properly.
+
+Imzmq3 allows you to push data into rsyslog from a zeromq socket. The example
+below binds a SUB socket to port 7172, and then any messages with the topic
+"foo" will be pushed into rsyslog.
+
+Example Rsyslog.conf snippet:
+-------------------------------------------------------------------------------
+
+$InputZmq3ServerRun action=BIND,type=SUB,description=tcp://*:7172,subscribe=foo
+
+-------------------------------------------------------------------------------
diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c
index 0195fd20..78eee887 100644
--- a/plugins/imzmq3/imzmq3.c
+++ b/plugins/imzmq3/imzmq3.c
@@ -72,7 +72,7 @@ typedef struct _socket_type {
int type;
} socket_type;
-// more overkill, but seems nice to be consistent.
+/* more overkill, but seems nice to be consistent.*/
typedef struct _socket_action {
char* name;
int action;
@@ -87,8 +87,8 @@ typedef struct _socket_info {
int type;
int action;
char* description;
- int sndHWM; // if you want more than 2^32 messages,
- int rcvHWM; // then pass in 0 (the default).
+ int sndHWM; /* if you want more than 2^32 messages, */
+ int rcvHWM; /* then pass in 0 (the default). */
char* identity;
char** subscriptions;
ruleset_t* ruleset;
@@ -136,13 +136,14 @@ static socket_action socketActions[] = {
* Helper functions
*/
-// get the name of a socket type, return the ZMQ_XXX type
-// or -1 if not a supported type (see above)
+/* get the name of a socket type, return the ZMQ_XXX type
+ or -1 if not a supported type (see above)
+*/
static int getSocketType(char* name) {
int type = -1;
uint i;
- // match name with known socket type
+ /* match name with known socket type */
for(i=0; i<sizeof(socketTypes)/sizeof(socket_type); ++i) {
if( !strcmp(socketTypes[i].name, name) ) {
type = socketTypes[i].type;
@@ -150,7 +151,7 @@ static int getSocketType(char* name) {
}
}
- // whine if no match was found.
+ /* whine if no match was found. */
if (type == -1)
errmsg.LogError(0, NO_ERRCODE, "unknown type %s",name);
@@ -162,7 +163,7 @@ static int getSocketAction(char* name) {
int action = -1;
uint i;
- // match name with known socket action
+ /* match name with known socket action */
for(i=0; i < sizeof(socketActions)/sizeof(socket_action); ++i) {
if(!strcmp(socketActions[i].name, name)) {
action = socketActions[i].action;
@@ -170,7 +171,7 @@ static int getSocketAction(char* name) {
}
}
- // whine if no matching action was found
+ /* whine if no matching action was found */
if (action == -1)
errmsg.LogError(0, NO_ERRCODE, "unknown action %s",name);
@@ -218,7 +219,7 @@ static rsRetVal parseConfig(char* config, socket_info* info) {
binding != NULL;
binding = strtok_r(NULL, ",", &ptr1)) {
- // Each binding looks like foo=bar
+ /* Each binding looks like foo=bar */
char * sep = strchr(binding, '=');
if (sep == NULL)
{
@@ -228,7 +229,7 @@ static rsRetVal parseConfig(char* config, socket_info* info) {
continue;
}
- // Replace '=' with '\0'.
+ /* Replace '=' with '\0'. */
*sep = '\0';
char * val = sep + 1;
@@ -244,7 +245,7 @@ static rsRetVal parseConfig(char* config, socket_info* info) {
} else if (strcmp(binding, "rcvHWM") == 0) {
info->sndHWM = atoi(val);
} else if (strcmp(binding, "subscribe") == 0) {
- // Add the subscription value to the list.
+ /* Add the subscription value to the list.*/
char * substr = NULL;
substr = strdup(val);
info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1);
@@ -326,7 +327,7 @@ static rsRetVal createContext() {
errmsg.LogError(0, RS_RET_INVALID_PARAMS,
"zctx_new failed: %s",
strerror(errno));
- // DK: really should do better than invalid params...
+ /* DK: really should do better than invalid params...*/
return RS_RET_INVALID_PARAMS;
}
@@ -348,11 +349,11 @@ static rsRetVal createSocket(socket_info* info, void** sock) {
RS_RET_INVALID_PARAMS,
"zsocket_new failed: %s, for type %d",
strerror(errno),info->type);
- // DK: invalid params seems right here.
+ /* DK: invalid params seems right here */
return RS_RET_INVALID_PARAMS;
}
- // Set options *before* the connect/bind.
+ /* Set options *before* the connect/bind. */
if (info->identity) zsocket_set_identity(*sock, info->identity);
if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf);
if (info->rcvBuf > -1) zsocket_set_rcvbuf(*sock, info->rcvBuf);
@@ -369,17 +370,17 @@ static rsRetVal createSocket(socket_info* info, void** sock) {
if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only);
if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity);
- // since HWM have defaults, we always set them. No return codes to check, either.
+ /* since HWM have defaults, we always set them. No return codes to check, either.*/
zsocket_set_sndhwm(*sock, info->sndHWM);
zsocket_set_rcvhwm(*sock, info->rcvHWM);
- // Set subscriptions.
+ /* Set subscriptions.*/
for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii)
zsocket_set_subscribe(*sock, info->subscriptions[ii]);
- // Do the bind/connect...
+ /* Do the bind/connect... */
if (info->action==ACTION_CONNECT) {
rv = zsocket_connect(*sock, info->description);
if (rv < 0) {
@@ -435,40 +436,40 @@ finalize_it:
static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) {
DEFiRet;
- // increment number of items and store old num items, as it will be handy.
+ /* increment number of items and store old num items, as it will be handy.*/
size_t idx = s_nitems++;
- // allocate a new socket_info array to accomidate this new endpoint
+ /* allocate a new socket_info array to accomidate this new endpoint*/
socket_info* tmpSocketInfo;
CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems));
- // copy existing socket_info across into new array, if any, and free old storage
+ /* copy existing socket_info across into new array, if any, and free old storage*/
if(idx) {
memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx);
free(s_socketInfo);
}
- // set the static to hold the new array
+ /* set the static to hold the new array */
s_socketInfo = tmpSocketInfo;
- // point to the new one
+ /* point to the new one */
socket_info* sockInfo = &s_socketInfo[idx];
- // set defaults for the new socket info
+ /* set defaults for the new socket info */
setDefaults(sockInfo);
- // Make a writeable copy of the string so we can use strtok
- // in the parseConfig call
+ /* Make a writeable copy of the string so we can use strtok
+ in the parseConfig call */
char * copy = NULL;
CHKmalloc(copy = strdup((char *) valp));
- // parse the config string
+ /* parse the config string */
CHKiRet(parseConfig(copy, sockInfo));
- // validate it
+ /* validate it */
CHKiRet(validateConfig(sockInfo));
- // bind to the current ruleset (if any)
+ /* bind to the current ruleset (if any)*/
sockInfo->ruleset = s_ruleset;
finalize_it:
@@ -492,8 +493,9 @@ static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *po
}
if( pollerData->thread->bShallStop == TRUE) {
- // a handler that returns -1 will terminate the
- // czmq reactor loop
+ /* a handler that returns -1 will terminate the
+ czmq reactor loop
+ */
return -1;
}
@@ -510,22 +512,22 @@ static rsRetVal rcv_loop(thrdInfo_t* pThrd){
DEFiRet;
- // create the context
+ /* create the context*/
CHKiRet(createContext());
- // create the poll items
+ /* create the poll items*/
CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems));
- // create poller data (stuff to pass into the zmq closure called when we get a message)
+ /* create poller data (stuff to pass into the zmq closure called when we get a message)*/
CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems));
- // loop through and initialize the poll items and poller_data arrays...
+ /* loop through and initialize the poll items and poller_data arrays...*/
for(i=0; i<s_nitems;++i) {
- // create the socket, update items.
+ /* create the socket, update items.*/
createSocket(&s_socketInfo[i], &items[i].socket);
items[i].events = ZMQ_POLLIN;
- // now update the poller_data for this item
+ /* now update the poller_data for this item */
pollerData[i].thread = pThrd;
pollerData[i].ruleset = s_socketInfo[i].ruleset;
}
@@ -572,7 +574,7 @@ CODESTARTwillRun
sizeof("imzmq3") - 1));
CHKiRet(prop.ConstructFinalize(s_namep));
- // If there are no endpoints this is pointless ...
+/* If there are no endpoints this is pointless ...*/
if (s_nitems == 0)
ABORT_FINALIZE(RS_RET_NO_RUN);
diff --git a/plugins/omzmq3/README b/plugins/omzmq3/README
new file mode 100644
index 00000000..ccc96c74
--- /dev/null
+++ b/plugins/omzmq3/README
@@ -0,0 +1,25 @@
+ZeroMQ 3.x Output Plugin
+
+Building this plugin:
+Requires libzmq and libczmq. First, install libzmq from the HEAD on github:
+http://github.com/zeromq/libzmq. You can clone the repository, build, then
+install it. The directions for doing so are there in the readme. Then, do
+the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1
+version of libzmq will be released, and a supporting version of libczmq.
+At that time, you could simply download and install the tarballs instead of
+using git to clone the repositories. Those tarballs (when available) can
+be found at http://download.zeromq.org. As of this writing (5/31/2012), the
+most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile
+properly.
+
+Omzmq3 allows you to push data out of rsyslog from a zeromq socket. The example
+below binds a PUB socket to port 7171, and any message fitting the criteria will
+be output to the zmq socket.
+
+Example Rsyslog.conf snippet (NOTE: v6 format):
+-------------------------------------------------------------------------------
+if $msg then {
+ action(type="omzmq3", sockType="PUB", action="BIND",
+ description="tcp://*:7172)
+}
+-------------------------------------------------------------------------------
diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c
index 885bc365..e13011fb 100644
--- a/plugins/omzmq3/omzmq3.c
+++ b/plugins/omzmq3/omzmq3.c
@@ -52,8 +52,8 @@ DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
/* convienent symbols to denote a socket we want to bind
-p * vs one we want to just connect to
- */
+ vs one we want to just connect to
+*/
#define ACTION_CONNECT 1
#define ACTION_BIND 2
@@ -66,7 +66,7 @@ struct socket_type {
int type;
};
-// more overkill, but seems nice to be consistent.
+/* more overkill, but seems nice to be consistent. */
struct socket_action {
char* name;
int action;
@@ -102,9 +102,10 @@ typedef struct _instanceData {
* Static definitions/initializations
*/
-// only 1 zctx for all the sockets, with an adjustable number of
-// worker threads which may be useful if we use affinity in particular
-// sockets
+/* only 1 zctx for all the sockets, with an adjustable number of
+ worker threads which may be useful if we use affinity in particular
+ sockets
+*/
static zctx_t* s_context = NULL;
static int s_workerThreads = -1;
@@ -154,8 +155,9 @@ static struct cnfparamblk actpblk = {
* Helper Functions
*/
-// get the name of a socket type, return the ZMQ_XXX type
-// or -1 if not a supported type (see above)
+/* get the name of a socket type, return the ZMQ_XXX type
+ or -1 if not a supported type (see above)
+*/
int getSocketType(char* name) {
int type = -1;
uint i;
@@ -197,7 +199,7 @@ static void closeZMQ(instanceData* pData) {
static rsRetVal initZMQ(instanceData* pData) {
DEFiRet;
- // create the context if necessary.
+ /* create the context if necessary. */
if (NULL == s_context) {
s_context = zctx_new();
if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads);
@@ -205,12 +207,12 @@ static rsRetVal initZMQ(instanceData* pData) {
pData->socket = zsocket_new(s_context, pData->type);
- // ALWAYS set the HWM as the zmq3 default is 1000 and we default
- // to 0 (infinity)
+ /* ALWAYS set the HWM as the zmq3 default is 1000 and we default
+ to 0 (infinity) */
zsocket_set_rcvhwm(pData->socket, pData->rcvHWM);
zsocket_set_sndhwm(pData->socket, pData->sndHWM);
- // use czmq defaults for these, unless set to non-default values
+ /* use czmq defaults for these, unless set to non-default values */
if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity);
if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf);
if(pData->rcvBuf > -1) zsocket_set_sndbuf(pData->socket, pData->rcvBuf);
@@ -227,10 +229,10 @@ static rsRetVal initZMQ(instanceData* pData) {
if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only);
if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity);
- // bind or connect to it
+ /* bind or connect to it */
if (pData->action == ACTION_BIND) {
- // bind asserts, so no need to test return val here
- // which isn't the greatest api -- oh well
+ /* bind asserts, so no need to test return val here
+ which isn't the greatest api -- oh well */
zsocket_bind(pData->socket, (char*)pData->description);
} else {
if(zsocket_connect(pData->socket, (char*)pData->description) == -1) {
@@ -245,14 +247,14 @@ static rsRetVal initZMQ(instanceData* pData) {
rsRetVal writeZMQ(uchar* msg, instanceData* pData) {
DEFiRet;
- // initialize if necessary
+ /* initialize if necessary */
if(NULL == pData->socket)
CHKiRet(initZMQ(pData));
- // send the shit...
+ /* send it */
int result = zstr_send(pData->socket, (char*)msg);
- // whine if shit went wrong
+ /* whine if things went wrong */
if (result == -1) {
errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result);
ABORT_FINALIZE(RS_RET_ERR);
@@ -268,8 +270,8 @@ setInstParamDefaults(instanceData* pData) {
pData->tplName = NULL;
pData->type = ZMQ_PUB;
pData->action = ACTION_BIND;
- pData->sndHWM = 0; // unlimited
- pData->rcvHWM = 0; // unlimited
+ pData->sndHWM = 0; /*unlimited*/
+ pData->rcvHWM = 0; /*unlimited*/
pData->identity = NULL;
pData->sndBuf = -1;
pData->rcvBuf = -1;
@@ -350,41 +352,41 @@ for(i = 0 ; i < actpblk.nParams ; ++i) {
} else if(!strcmp(actpblk.descr[i].name, "action")){
pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL));
} else if(!strcmp(actpblk.descr[i].name, "sndHWM")) {
- pData->sndHWM = (int) pvals[i].val.d.n, NULL;
+ pData->sndHWM = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) {
- pData->rcvHWM = (int) pvals[i].val.d.n, NULL;
+ pData->rcvHWM = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "identity")){
pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "sndBuf")) {
- pData->sndBuf = (int) pvals[i].val.d.n, NULL;
+ pData->sndBuf = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) {
- pData->rcvBuf = (int) pvals[i].val.d.n, NULL;
+ pData->rcvBuf = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "linger")) {
- pData->linger = (int) pvals[i].val.d.n, NULL;
+ pData->linger = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "backlog")) {
- pData->backlog = (int) pvals[i].val.d.n, NULL;
+ pData->backlog = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) {
- pData->sndTimeout = (int) pvals[i].val.d.n, NULL;
+ pData->sndTimeout = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) {
- pData->rcvTimeout = (int) pvals[i].val.d.n, NULL;
+ pData->rcvTimeout = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) {
- pData->maxMsgSize = (int) pvals[i].val.d.n, NULL;
+ pData->maxMsgSize = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "rate")) {
- pData->rate = (int) pvals[i].val.d.n, NULL;
+ pData->rate = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) {
- pData->recoveryIVL = (int) pvals[i].val.d.n, NULL;
+ pData->recoveryIVL = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "multicastHops")) {
- pData->multicastHops = (int) pvals[i].val.d.n, NULL;
+ pData->multicastHops = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) {
- pData->reconnectIVL = (int) pvals[i].val.d.n, NULL;
+ pData->reconnectIVL = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) {
- pData->reconnectIVLMax = (int) pvals[i].val.d.n, NULL;
+ pData->reconnectIVLMax = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) {
- pData->ipv4Only = (int) pvals[i].val.d.n, NULL;
+ pData->ipv4Only = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "affinity")) {
- pData->affinity = (int) pvals[i].val.d.n, NULL;
+ pData->affinity = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) {
- s_workerThreads = (int) pvals[i].val.d.n, NULL;
+ s_workerThreads = (int) pvals[i].val.d.n;
} else {
errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);