summaryrefslogtreecommitdiffstats
path: root/plugins/imzmq3
diff options
context:
space:
mode:
authorDavid Kelly <davidk@talksum.com>2012-05-31 15:25:00 -0700
committerRainer Gerhards <rgerhards@adiscon.com>2012-06-01 18:44:21 +0200
commitd886dd979ebc94b45eb31255a24d49316dfcf6d0 (patch)
tree01047fd4854f9a8a6c3e7abbeee5ca58680e6ee6 /plugins/imzmq3
parentcbff73d94c3a86ed74294fe1265dc5242f9317be (diff)
downloadrsyslog-d886dd979ebc94b45eb31255a24d49316dfcf6d0.tar.gz
rsyslog-d886dd979ebc94b45eb31255a24d49316dfcf6d0.tar.xz
rsyslog-d886dd979ebc94b45eb31255a24d49316dfcf6d0.zip
Minor updates
* C++ style comments converted to c-style * copy/paste error (benign I believe) in omzmq3 fixed * added readme files for both imzmq3 and omzmq3
Diffstat (limited to 'plugins/imzmq3')
-rw-r--r--plugins/imzmq3/README24
-rw-r--r--plugins/imzmq3/imzmq3.c78
2 files changed, 64 insertions, 38 deletions
diff --git a/plugins/imzmq3/README b/plugins/imzmq3/README
new file mode 100644
index 00000000..88653b83
--- /dev/null
+++ b/plugins/imzmq3/README
@@ -0,0 +1,24 @@
+ZeroMQ 3.x Input Plugin
+
+Building this plugin:
+Requires libzmq and libczmq. First, install libzmq from the HEAD on github:
+http://github.com/zeromq/libzmq. You can clone the repository, build, then
+install it. The directions for doing so are there in the readme. Then, do
+the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1
+version of libzmq will be released, and a supporting version of libczmq.
+At that time, you could simply download and install the tarballs instead of
+using git to clone the repositories. Those tarballs (when available) can
+be found at http://download.zeromq.org. As of this writing (5/31/2012), the
+most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile
+properly.
+
+Imzmq3 allows you to push data into rsyslog from a zeromq socket. The example
+below binds a SUB socket to port 7172, and then any messages with the topic
+"foo" will be pushed into rsyslog.
+
+Example Rsyslog.conf snippet:
+-------------------------------------------------------------------------------
+
+$InputZmq3ServerRun action=BIND,type=SUB,description=tcp://*:7172,subscribe=foo
+
+-------------------------------------------------------------------------------
diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c
index 0195fd20..78eee887 100644
--- a/plugins/imzmq3/imzmq3.c
+++ b/plugins/imzmq3/imzmq3.c
@@ -72,7 +72,7 @@ typedef struct _socket_type {
int type;
} socket_type;
-// more overkill, but seems nice to be consistent.
+/* more overkill, but seems nice to be consistent.*/
typedef struct _socket_action {
char* name;
int action;
@@ -87,8 +87,8 @@ typedef struct _socket_info {
int type;
int action;
char* description;
- int sndHWM; // if you want more than 2^32 messages,
- int rcvHWM; // then pass in 0 (the default).
+ int sndHWM; /* if you want more than 2^32 messages, */
+ int rcvHWM; /* then pass in 0 (the default). */
char* identity;
char** subscriptions;
ruleset_t* ruleset;
@@ -136,13 +136,14 @@ static socket_action socketActions[] = {
* Helper functions
*/
-// get the name of a socket type, return the ZMQ_XXX type
-// or -1 if not a supported type (see above)
+/* get the name of a socket type, return the ZMQ_XXX type
+ or -1 if not a supported type (see above)
+*/
static int getSocketType(char* name) {
int type = -1;
uint i;
- // match name with known socket type
+ /* match name with known socket type */
for(i=0; i<sizeof(socketTypes)/sizeof(socket_type); ++i) {
if( !strcmp(socketTypes[i].name, name) ) {
type = socketTypes[i].type;
@@ -150,7 +151,7 @@ static int getSocketType(char* name) {
}
}
- // whine if no match was found.
+ /* whine if no match was found. */
if (type == -1)
errmsg.LogError(0, NO_ERRCODE, "unknown type %s",name);
@@ -162,7 +163,7 @@ static int getSocketAction(char* name) {
int action = -1;
uint i;
- // match name with known socket action
+ /* match name with known socket action */
for(i=0; i < sizeof(socketActions)/sizeof(socket_action); ++i) {
if(!strcmp(socketActions[i].name, name)) {
action = socketActions[i].action;
@@ -170,7 +171,7 @@ static int getSocketAction(char* name) {
}
}
- // whine if no matching action was found
+ /* whine if no matching action was found */
if (action == -1)
errmsg.LogError(0, NO_ERRCODE, "unknown action %s",name);
@@ -218,7 +219,7 @@ static rsRetVal parseConfig(char* config, socket_info* info) {
binding != NULL;
binding = strtok_r(NULL, ",", &ptr1)) {
- // Each binding looks like foo=bar
+ /* Each binding looks like foo=bar */
char * sep = strchr(binding, '=');
if (sep == NULL)
{
@@ -228,7 +229,7 @@ static rsRetVal parseConfig(char* config, socket_info* info) {
continue;
}
- // Replace '=' with '\0'.
+ /* Replace '=' with '\0'. */
*sep = '\0';
char * val = sep + 1;
@@ -244,7 +245,7 @@ static rsRetVal parseConfig(char* config, socket_info* info) {
} else if (strcmp(binding, "rcvHWM") == 0) {
info->sndHWM = atoi(val);
} else if (strcmp(binding, "subscribe") == 0) {
- // Add the subscription value to the list.
+ /* Add the subscription value to the list.*/
char * substr = NULL;
substr = strdup(val);
info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1);
@@ -326,7 +327,7 @@ static rsRetVal createContext() {
errmsg.LogError(0, RS_RET_INVALID_PARAMS,
"zctx_new failed: %s",
strerror(errno));
- // DK: really should do better than invalid params...
+ /* DK: really should do better than invalid params...*/
return RS_RET_INVALID_PARAMS;
}
@@ -348,11 +349,11 @@ static rsRetVal createSocket(socket_info* info, void** sock) {
RS_RET_INVALID_PARAMS,
"zsocket_new failed: %s, for type %d",
strerror(errno),info->type);
- // DK: invalid params seems right here.
+ /* DK: invalid params seems right here */
return RS_RET_INVALID_PARAMS;
}
- // Set options *before* the connect/bind.
+ /* Set options *before* the connect/bind. */
if (info->identity) zsocket_set_identity(*sock, info->identity);
if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf);
if (info->rcvBuf > -1) zsocket_set_rcvbuf(*sock, info->rcvBuf);
@@ -369,17 +370,17 @@ static rsRetVal createSocket(socket_info* info, void** sock) {
if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only);
if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity);
- // since HWM have defaults, we always set them. No return codes to check, either.
+ /* since HWM have defaults, we always set them. No return codes to check, either.*/
zsocket_set_sndhwm(*sock, info->sndHWM);
zsocket_set_rcvhwm(*sock, info->rcvHWM);
- // Set subscriptions.
+ /* Set subscriptions.*/
for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii)
zsocket_set_subscribe(*sock, info->subscriptions[ii]);
- // Do the bind/connect...
+ /* Do the bind/connect... */
if (info->action==ACTION_CONNECT) {
rv = zsocket_connect(*sock, info->description);
if (rv < 0) {
@@ -435,40 +436,40 @@ finalize_it:
static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) {
DEFiRet;
- // increment number of items and store old num items, as it will be handy.
+ /* increment number of items and store old num items, as it will be handy.*/
size_t idx = s_nitems++;
- // allocate a new socket_info array to accomidate this new endpoint
+ /* allocate a new socket_info array to accomidate this new endpoint*/
socket_info* tmpSocketInfo;
CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems));
- // copy existing socket_info across into new array, if any, and free old storage
+ /* copy existing socket_info across into new array, if any, and free old storage*/
if(idx) {
memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx);
free(s_socketInfo);
}
- // set the static to hold the new array
+ /* set the static to hold the new array */
s_socketInfo = tmpSocketInfo;
- // point to the new one
+ /* point to the new one */
socket_info* sockInfo = &s_socketInfo[idx];
- // set defaults for the new socket info
+ /* set defaults for the new socket info */
setDefaults(sockInfo);
- // Make a writeable copy of the string so we can use strtok
- // in the parseConfig call
+ /* Make a writeable copy of the string so we can use strtok
+ in the parseConfig call */
char * copy = NULL;
CHKmalloc(copy = strdup((char *) valp));
- // parse the config string
+ /* parse the config string */
CHKiRet(parseConfig(copy, sockInfo));
- // validate it
+ /* validate it */
CHKiRet(validateConfig(sockInfo));
- // bind to the current ruleset (if any)
+ /* bind to the current ruleset (if any)*/
sockInfo->ruleset = s_ruleset;
finalize_it:
@@ -492,8 +493,9 @@ static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *po
}
if( pollerData->thread->bShallStop == TRUE) {
- // a handler that returns -1 will terminate the
- // czmq reactor loop
+ /* a handler that returns -1 will terminate the
+ czmq reactor loop
+ */
return -1;
}
@@ -510,22 +512,22 @@ static rsRetVal rcv_loop(thrdInfo_t* pThrd){
DEFiRet;
- // create the context
+ /* create the context*/
CHKiRet(createContext());
- // create the poll items
+ /* create the poll items*/
CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems));
- // create poller data (stuff to pass into the zmq closure called when we get a message)
+ /* create poller data (stuff to pass into the zmq closure called when we get a message)*/
CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems));
- // loop through and initialize the poll items and poller_data arrays...
+ /* loop through and initialize the poll items and poller_data arrays...*/
for(i=0; i<s_nitems;++i) {
- // create the socket, update items.
+ /* create the socket, update items.*/
createSocket(&s_socketInfo[i], &items[i].socket);
items[i].events = ZMQ_POLLIN;
- // now update the poller_data for this item
+ /* now update the poller_data for this item */
pollerData[i].thread = pThrd;
pollerData[i].ruleset = s_socketInfo[i].ruleset;
}
@@ -572,7 +574,7 @@ CODESTARTwillRun
sizeof("imzmq3") - 1));
CHKiRet(prop.ConstructFinalize(s_namep));
- // If there are no endpoints this is pointless ...
+/* If there are no endpoints this is pointless ...*/
if (s_nitems == 0)
ABORT_FINALIZE(RS_RET_NO_RUN);