/* imzmq3.c
*
* This input plugin enables rsyslog to read messages from a ZeroMQ
* queue.
*
* Copyright 2012 Talksum, Inc.
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program. If not, see
* .
*
* Author: David Kelly
*
*/
#include
#include
#include
#include
#include
#include "rsyslog.h"
#include "cfsysline.h"
#include "config.h"
#include "dirty.h"
#include "errmsg.h"
#include "glbl.h"
#include "module-template.h"
#include "msg.h"
#include "net.h"
#include "parser.h"
#include "prop.h"
#include "ruleset.h"
#include "srUtils.h"
#include "unicode-helper.h"
#include
/* Module declarations. These macros are not defined in this file.
 * NOTE(review): presumably supplied by module-template.h - confirm there.
 */
MODULE_TYPE_INPUT
MODULE_TYPE_NOKEEP
/* Convenient symbols to denote a socket we want to bind
 * vs. one we want to just connect to. The value is stored in
 * socket_info.action and is looked up by keyword through socketActions.
 */
#define ACTION_CONNECT 1
#define ACTION_BIND 2
/* Module static data */
DEF_IMOD_STATIC_DATA
/* Interface objects. In this file errmsg is used as errmsg.LogError(...)
 * and ruleset as ruleset.GetRuleset(...).
 */
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
DEFobjCurrIf(prop)
DEFobjCurrIf(ruleset)
/* ----------------------------------------------------------------------------
* structs to describe sockets
*/
/* One row of the socketTypes lookup table: pairs a configuration keyword
 * (e.g. "SUB") with the ZMQ_* socket type constant it stands for.
 */
typedef struct _socket_type {
char* name; /* keyword as written in the config string */
int type; /* matching ZMQ_* socket type constant */
} socket_type;
/* One row of the socketActions lookup table: pairs a configuration keyword
 * ("BIND" / "CONNECT") with its ACTION_* constant.
 * More overkill, but seems nice to be consistent.
 */
typedef struct _socket_action {
char* name; /* keyword as written in the config string */
int action; /* ACTION_BIND or ACTION_CONNECT */
} socket_action;
/* Per-socket context handed to handlePoll() through its void* argument. */
typedef struct _poller_data {
ruleset_t* ruleset; /* ruleset attached to every message received on the socket */
thrdInfo_t* thread; /* its bShallStop flag is checked after each message to end the loop */
} poller_data;
/* Configuration of one endpoint, filled in by setDefaults() and parseConfig()
 * and consumed by createSocket().
 *
 * The integer options from sndBuf down to affinity default to -1 and are
 * applied to the socket only when their value is greater than -1.
 */
typedef struct _socket_info {
int type; /* ZMQ_* socket type; -1 is rejected by validateConfig() */
int action; /* ACTION_BIND or ACTION_CONNECT; -1 is rejected by validateConfig() */
char* description; /* endpoint string handed to zsocket_connect()/zsocket_bind() */
int sndHWM; /* if you want more than 2^32 messages, */
int rcvHWM; /* then pass in 0 (the default). */
char* identity; /* applied with zsocket_set_identity() when non-NULL */
char** subscriptions; /* heap array of strdup'ed "subscribe" values; NULL when none */
ruleset_t* ruleset; /* ruleset bound to this endpoint (may be NULL) */
int sndBuf;
int rcvBuf;
int linger;
int backlog;
int sndTimeout;
int rcvTimeout;
int maxMsgSize;
int rate;
int recoveryIVL;
int multicastHops;
int reconnectIVL;
int reconnectIVLMax;
int ipv4Only;
int affinity;
} socket_info;
/* ----------------------------------------------------------------------------
* Static definitions/initializations.
*/
/* Array of configured endpoints; replaced by a larger copy in add_endpoint(). */
static socket_info* s_socketInfo = NULL;
/* Number of entries in s_socketInfo. */
static size_t s_nitems = 0;
/* Input name attached to every message in handlePoll() via MsgSetInputName(). */
static prop_t * s_namep = NULL;
/* NOTE(review): presumably the czmq reactor driving handlePoll() - not used in
 * the part of the file visible here; confirm against rcv_loop().
 */
static zloop_t* s_zloop = NULL;
/* Passed to zctx_set_iothreads() by createContext() when greater than 1. */
static int s_io_threads = 1;
/* czmq context, created on first use by createContext(). */
static zctx_t* s_context = NULL;
/* Ruleset selected by the latest set_ruleset() call; add_endpoint() copies it
 * into each endpoint it adds.
 */
static ruleset_t* s_ruleset = NULL;
/* Socket types accepted for the "type" option. */
static socket_type socketTypes[] = {
{"SUB", ZMQ_SUB },
{"PULL", ZMQ_PULL },
{"XSUB", ZMQ_XSUB }
};
/* Actions accepted for the "action" option. */
static socket_action socketActions[] = {
{"BIND", ACTION_BIND},
{"CONNECT", ACTION_CONNECT},
};
/* ----------------------------------------------------------------------------
* Helper functions
*/
/* Given the name of a socket type, return the matching ZMQ_XXX type,
 * or -1 if it is not a supported type (see socketTypes above).
 */
static int getSocketType(char* name) {
int type = -1;
uint i;
/* match name with known socket type */
for(i=0; itype = ZMQ_SUB;
info->action = ACTION_BIND;
info->description = NULL;
info->sndHWM = 0;
info->rcvHWM = 0;
info->identity = NULL;
info->subscriptions = NULL;
info->ruleset = NULL;
info->sndBuf = -1;
info->rcvBuf = -1;
info->linger = -1;
info->backlog = -1;
info->sndTimeout = -1;
info->rcvTimeout = -1;
info->maxMsgSize = -1;
info->rate = -1;
info->recoveryIVL = -1;
info->multicastHops = -1;
info->reconnectIVL = -1;
info->reconnectIVLMax = -1;
info->ipv4Only = -1;
info->affinity = -1;
};
/* Parse one endpoint configuration string into *info.
 *
 * The config string should look like:
 * "action=AAA,type=TTT,description=DDD,sndHWM=SSS,rcvHWM=RRR,subscribe='xxx',subscribe='yyy'"
 *
 * config is split in place (strtok_r, and every '=' is overwritten with a
 * NUL byte), so it must be a writable buffer. A binding without '=' is logged
 * and skipped. Numeric values are converted with atoi().
 *
 * info->subscriptions is expected to be NULL on entry. Each "subscribe" value
 * is duplicated and appended, and the array is kept NULL-terminated so that
 * createSocket() can walk it.
 *
 * Returns RS_RET_OK on success, RS_RET_INVALID_PARAMS on an unknown key or
 * when a value cannot be stored because memory is exhausted.
 */
static rsRetVal parseConfig(char* config, socket_info* info) {
    size_t nsubs = 0;
    char* binding;
    char* ptr1;

    for (binding = strtok_r(config, ",", &ptr1);
         binding != NULL;
         binding = strtok_r(NULL, ",", &ptr1)) {
        /* Each binding looks like foo=bar */
        char* sep = strchr(binding, '=');
        if (sep == NULL) {
            errmsg.LogError(0, NO_ERRCODE,
                            "Invalid argument format %s, ignoring ...",
                            binding);
            continue;
        }
        /* Replace '=' with '\0': binding is now the key, val the value. */
        *sep = '\0';
        char* val = sep + 1;

        if (strcmp(binding, "action") == 0) {
            info->action = getSocketAction(val);
        } else if (strcmp(binding, "type") == 0) {
            info->type = getSocketType(val);
        } else if (strcmp(binding, "description") == 0) {
            char* desc = strdup(val);
            if (desc == NULL) {
                errmsg.LogError(0, RS_RET_INVALID_PARAMS,
                                "out of memory while storing %s", binding);
                return RS_RET_INVALID_PARAMS;
            }
            /* a repeated "description" replaces the earlier one */
            free(info->description);
            info->description = desc;
        } else if (strcmp(binding, "sndHWM") == 0) {
            info->sndHWM = atoi(val);
        } else if (strcmp(binding, "rcvHWM") == 0) {
            info->rcvHWM = atoi(val);
        } else if (strcmp(binding, "subscribe") == 0) {
            /* Add the subscription value to the list. Room is needed for
             * the entries so far, the new one and the NULL terminator.
             */
            char*  substr = strdup(val);
            char** grown = NULL;
            if (substr != NULL) {
                grown = realloc(info->subscriptions,
                                sizeof(char*) * (nsubs + 2));
            }
            if (grown == NULL) {
                /* the old array (if any) is still owned by info */
                free(substr);
                errmsg.LogError(0, RS_RET_INVALID_PARAMS,
                                "out of memory while storing %s", binding);
                return RS_RET_INVALID_PARAMS;
            }
            grown[nsubs++] = substr;
            grown[nsubs] = NULL;
            info->subscriptions = grown;
        } else if (strcmp(binding, "sndBuf") == 0) {
            info->sndBuf = atoi(val);
        } else if (strcmp(binding, "rcvBuf") == 0) {
            info->rcvBuf = atoi(val);
        } else if (strcmp(binding, "linger") == 0) {
            info->linger = atoi(val);
        } else if (strcmp(binding, "backlog") == 0) {
            info->backlog = atoi(val);
        } else if (strcmp(binding, "sndTimeout") == 0) {
            info->sndTimeout = atoi(val);
        } else if (strcmp(binding, "rcvTimeout") == 0) {
            info->rcvTimeout = atoi(val);
        } else if (strcmp(binding, "maxMsgSize") == 0) {
            info->maxMsgSize = atoi(val);
        } else if (strcmp(binding, "rate") == 0) {
            info->rate = atoi(val);
        } else if (strcmp(binding, "recoveryIVL") == 0) {
            info->recoveryIVL = atoi(val);
        } else if (strcmp(binding, "multicastHops") == 0) {
            info->multicastHops = atoi(val);
        } else if (strcmp(binding, "reconnectIVL") == 0) {
            info->reconnectIVL = atoi(val);
        } else if (strcmp(binding, "reconnectIVLMax") == 0) {
            info->reconnectIVLMax = atoi(val);
        } else if (strcmp(binding, "ipv4Only") == 0) {
            info->ipv4Only = atoi(val);
        } else if (strcmp(binding, "affinity") == 0) {
            info->affinity = atoi(val);
        } else {
            errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding);
            return RS_RET_INVALID_PARAMS;
        }
    }
    return RS_RET_OK;
}

/* Check that a parsed endpoint is usable: it needs a valid type and action
 * and a description; SUB sockets need at least one subscription and no other
 * socket type may carry any. Each violation is logged.
 *
 * Returns RS_RET_OK, or RS_RET_INVALID_PARAMS on the first violation found.
 */
static rsRetVal validateConfig(socket_info* info) {
    if (info->type == -1) {
        errmsg.LogError(0, RS_RET_INVALID_PARAMS,
                        "you entered an invalid type");
        return RS_RET_INVALID_PARAMS;
    }
    if (info->action == -1) {
        errmsg.LogError(0, RS_RET_INVALID_PARAMS,
                        "you entered an invalid action");
        return RS_RET_INVALID_PARAMS;
    }
    if (info->description == NULL) {
        errmsg.LogError(0, RS_RET_INVALID_PARAMS,
                        "you didn't enter a description");
        return RS_RET_INVALID_PARAMS;
    }
    if (info->type == ZMQ_SUB && info->subscriptions == NULL) {
        errmsg.LogError(0, RS_RET_INVALID_PARAMS,
                        "SUB sockets need at least one subscription");
        return RS_RET_INVALID_PARAMS;
    }
    if (info->type != ZMQ_SUB && info->subscriptions != NULL) {
        errmsg.LogError(0, RS_RET_INVALID_PARAMS,
                        "only SUB sockets can have subscriptions");
        return RS_RET_INVALID_PARAMS;
    }
    return RS_RET_OK;
}

/* Create the shared czmq context s_context if it does not exist yet and,
 * when s_io_threads is greater than 1, set its number of I/O threads.
 * Calling it again once the context exists does nothing.
 *
 * Returns RS_RET_OK, or RS_RET_INVALID_PARAMS when zctx_new() fails.
 */
static rsRetVal createContext(void) {
    if (s_context != NULL) {
        return RS_RET_OK;
    }

    errmsg.LogError(0, NO_ERRCODE, "creating zctx.");
    s_context = zctx_new();
    if (s_context == NULL) {
        errmsg.LogError(0, RS_RET_INVALID_PARAMS,
                        "zctx_new failed: %s",
                        strerror(errno));
        /* DK: really should do better than invalid params...*/
        return RS_RET_INVALID_PARAMS;
    }
    if (s_io_threads > 1) {
        errmsg.LogError(0, NO_ERRCODE, "setting io worker threads to %d", s_io_threads);
        zctx_set_iothreads(s_context, s_io_threads);
    }
    return RS_RET_OK;
}

/* Create a socket of info->type in s_context, apply the configured options
 * and subscriptions, then connect or bind it to info->description.
 *
 * info  endpoint configuration (see socket_info)
 * sock  out: receives the new socket, or NULL when zsocket_new() fails
 *
 * Returns RS_RET_OK, or RS_RET_INVALID_PARAMS when the socket cannot be
 * created, connected or bound. This function does not destroy the socket
 * when connect/bind fails; *sock is left as returned by zsocket_new().
 */
static rsRetVal createSocket(socket_info* info, void** sock) {
    size_t ii;
    int rv;

    *sock = zsocket_new(s_context, info->type);
    /* test the socket itself, not the (never NULL) out-parameter */
    if (*sock == NULL) {
        errmsg.LogError(0,
                        RS_RET_INVALID_PARAMS,
                        "zsocket_new failed: %s, for type %d",
                        strerror(errno), info->type);
        /* DK: invalid params seems right here */
        return RS_RET_INVALID_PARAMS;
    }

    /* Set options *before* the connect/bind. */
    if (info->identity)             zsocket_set_identity(*sock, info->identity);
    if (info->sndBuf > -1)          zsocket_set_sndbuf(*sock, info->sndBuf);
    if (info->rcvBuf > -1)          zsocket_set_rcvbuf(*sock, info->rcvBuf);
    if (info->linger > -1)          zsocket_set_linger(*sock, info->linger);
    if (info->backlog > -1)         zsocket_set_backlog(*sock, info->backlog);
    if (info->sndTimeout > -1)      zsocket_set_sndtimeo(*sock, info->sndTimeout);
    if (info->rcvTimeout > -1)      zsocket_set_rcvtimeo(*sock, info->rcvTimeout);
    if (info->maxMsgSize > -1)      zsocket_set_maxmsgsize(*sock, info->maxMsgSize);
    if (info->rate > -1)            zsocket_set_rate(*sock, info->rate);
    if (info->recoveryIVL > -1)     zsocket_set_recovery_ivl(*sock, info->recoveryIVL);
    if (info->multicastHops > -1)   zsocket_set_multicast_hops(*sock, info->multicastHops);
    if (info->reconnectIVL > -1)    zsocket_set_reconnect_ivl(*sock, info->reconnectIVL);
    if (info->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(*sock, info->reconnectIVLMax);
    if (info->ipv4Only > -1)        zsocket_set_ipv4only(*sock, info->ipv4Only);
    if (info->affinity > -1)        zsocket_set_affinity(*sock, info->affinity);

    /* since HWM have defaults, we always set them. No return codes to check, either.*/
    zsocket_set_sndhwm(*sock, info->sndHWM);
    zsocket_set_rcvhwm(*sock, info->rcvHWM);

    /* Set subscriptions. The array is NULL for sockets without any and is
     * NULL-terminated by parseConfig() otherwise; sizeof on the pointer
     * cannot give its length.
     */
    if (info->subscriptions != NULL) {
        for (ii = 0; info->subscriptions[ii] != NULL; ++ii) {
            zsocket_set_subscribe(*sock, info->subscriptions[ii]);
        }
    }

    /* Do the bind/connect. The endpoint goes through "%s" because both
     * calls take a printf-style format and the description comes from the
     * configuration.
     */
    if (info->action == ACTION_CONNECT) {
        rv = zsocket_connect(*sock, "%s", info->description);
        if (rv < 0) {
            errmsg.LogError(0,
                            RS_RET_INVALID_PARAMS,
                            "zmq_connect using %s failed: %s",
                            info->description, strerror(errno));
            return RS_RET_INVALID_PARAMS;
        }
    } else {
        rv = zsocket_bind(*sock, "%s", info->description);
        /* only a negative result is a failure; 0 can be a successful bind
         * (e.g. an endpoint without a port number)
         */
        if (rv < 0) {
            errmsg.LogError(0,
                            RS_RET_INVALID_PARAMS,
                            "zmq_bind using %s failed: %s",
                            info->description, strerror(errno));
            return RS_RET_INVALID_PARAMS;
        }
    }
    return RS_RET_OK;
}
/* ----------------------------------------------------------------------------
* Module endpoints
*/
/* Config handler: select the ruleset that the next added endpoint is bound to.
 *
 * Looks pszName up in the current configuration. A missing ruleset is
 * reported through errmsg, and any lookup failure is returned to the caller
 * with s_ruleset left untouched. This assumes that, after the ruleset has
 * been chosen in the config, another call follows to add an endpoint.
 *
 * pszName is freed here in every case.
 */
static rsRetVal
set_ruleset(void __attribute__((unused)) *pVal, uchar *pszName) {
    DEFiRet;
    ruleset_t* found;
    rsRetVal lookupRet = ruleset.GetRuleset(ourConf, &found, pszName);

    if (RS_RET_NOT_FOUND == lookupRet) {
        errmsg.LogError(0, NO_ERRCODE,
                        "error: ruleset '%s' not found - ignored",
                        pszName);
    }
    /* leave through finalize_it on any lookup error */
    CHKiRet(lookupRet);

    DBGPRINTF("imzmq3 current bind ruleset '%s'\n", pszName);
    s_ruleset = found;

finalize_it:
    /* the name string is ours to release */
    free(pszName);
    RETiRet;
}
/* Config handler: add an actual endpoint described by the config string valp.
 *
 * Grows s_socketInfo by one slot, fills the new slot with defaults, parses
 * and validates valp into it, and binds it to the current s_ruleset (if any).
 * s_nitems is incremented only after all of that has succeeded, so a rejected
 * endpoint is never counted.
 *
 * valp is freed here in every case. Returns RS_RET_OK, or the error produced
 * by the allocation, parseConfig() or validateConfig().
 *
 * NOTE(review): strings that parseConfig() already duplicated into a rejected
 * entry are not reclaimed here.
 */
static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) {
    DEFiRet;
    /* index of the new entry == number of endpoints accepted so far */
    const size_t idx = s_nitems;
    socket_info* tmpSocketInfo = NULL;
    socket_info* sockInfo = NULL;
    /* declared up front so the cleanup below never sees it uninitialised */
    char* copy = NULL;

    /* allocate a new socket_info array to accommodate this new endpoint */
    CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * (idx + 1)));

    /* copy existing socket_info across into the new array, if any, and free
     * the old storage (which may also be left over from a rejected endpoint)
     */
    if (idx) {
        memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx);
    }
    free(s_socketInfo);

    /* set the static to hold the new array */
    s_socketInfo = tmpSocketInfo;

    /* point to the new one and give it defaults */
    sockInfo = &s_socketInfo[idx];
    setDefaults(sockInfo);

    /* Make a writeable copy of the string so parseConfig can split it
     * in place with strtok_r.
     */
    CHKmalloc(copy = strdup((char *) valp));

    /* parse the config string */
    CHKiRet(parseConfig(copy, sockInfo));

    /* validate it */
    CHKiRet(validateConfig(sockInfo));

    /* bind to the current ruleset (if any) */
    sockInfo->ruleset = s_ruleset;

    /* everything succeeded: the new entry now counts */
    s_nitems = idx + 1;

finalize_it:
    free(copy);
    free(valp); /* in any case, this is no longer needed */
    RETiRet;
}
/* Reactor callback: receive one string from the readable socket and submit
 * it to rsyslog as a message that still needs parsing.
 *
 * loop    unused
 * poller  poll item whose socket is read
 * pd      the poller_data for this socket (ruleset and owning thread)
 *
 * Returns -1 when the owning thread has been asked to stop, which terminates
 * the czmq reactor loop, and 0 otherwise.
 */
static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *poller, void* pd) {
    msg_t* logmsg;
    poller_data* pollerData = (poller_data*)pd;
    char* buf = zstr_recv(poller->socket);

    /* zstr_recv() can hand back NULL; there is then nothing to submit and
     * strlen() must not be called on it.
     */
    if (buf != NULL) {
        if (msgConstruct(&logmsg) == RS_RET_OK) {
            MsgSetRawMsg(logmsg, buf, strlen(buf));
            MsgSetInputName(logmsg, s_namep);
            MsgSetFlowControlType(logmsg, eFLOWCTL_NO_DELAY);
            MsgSetRuleset(logmsg, pollerData->ruleset);
            logmsg->msgFlags = NEEDS_PARSING;
            submitMsg(logmsg);
        }
        /* gotta free the string returned from zstr_recv() */
        free(buf);
    }

    if (pollerData->thread->bShallStop == TRUE) {
        /* a handler that returns -1 will terminate the
           czmq reactor loop
        */
        return -1;
    }
    return 0;
}
/* called when runInput is called by rsyslog
*/
static rsRetVal rcv_loop(thrdInfo_t* pThrd){
size_t i;
int rv;
zmq_pollitem_t* items;
poller_data* pollerData;
DEFiRet;
/* create the context*/
CHKiRet(createContext());
/* create the poll items*/
CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems));
/* create poller data (stuff to pass into the zmq closure called when we get a message)*/
CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems));
/* loop through and initialize the poll items and poller_data arrays...*/
for(i=0; i