summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Makefile.am8
-rw-r--r--configure.ac42
-rw-r--r--plugins/impstats/impstats.c11
-rw-r--r--plugins/imzmq3/Makefile.am8
-rw-r--r--plugins/imzmq3/imzmq3.c651
-rw-r--r--plugins/omzmq3/Makefile.am8
-rw-r--r--plugins/omzmq3/omzmq3.c460
-rw-r--r--runtime/statsobj.c12
-rw-r--r--runtime/statsobj.h3
9 files changed, 1198 insertions, 5 deletions
diff --git a/Makefile.am b/Makefile.am
index 999404e7..4cefb756 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -158,6 +158,14 @@ if ENABLE_OMHIREDIS
SUBDIRS += plugins/omhiredis
endif
+if ENABLE_OMZMQ3
+SUBDIRS += plugins/omzmq3
+endif
+
+if ENABLE_IMZMQ3
+SUBDIRS += plugins/imzmq3
+endif
+
if ENABLE_OMUXSOCK
SUBDIRS += plugins/omuxsock
endif
diff --git a/configure.ac b/configure.ac
index 0d41572c..cee453db 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1269,6 +1269,44 @@ fi
AM_CONDITIONAL(ENABLE_OMMONGODB, test x$enable_ommongodb = xyes)
# end of mongodb code
+# BEGIN ZMQ3 INPUT SUPPORT
+AC_ARG_ENABLE(imzmq3,
+ [AS_HELP_STRING([--enable-imzmq3],[Compiles imzmq3 output module @<:@default=no@:>@])],
+ [case "${enableval}" in
+ yes) enable_imzmq3="yes" ;;
+ no) enable_imzmq3="no" ;;
+ *) AC_MSG_ERROR(bad value ${enableval} for --enable-imzmq3) ;;
+ esac],
+ [enable_imzmq3=no]
+)
+if test "x$enable_imzmq3" = "xyes"; then
+ PKG_CHECK_MODULES(CZMQ, libczmq >= 1.1.0)
+ AC_SUBST(CZMQ_CFLAGS)
+ AC_SUBST(CZMQ_LIBS)
+fi
+AM_CONDITIONAL(ENABLE_IMZMQ3, test x$enable_imzmq3 = xyes)
+
+# END ZMQ3 INPUT SUPPORT
+
+# BEGIN ZMQ3 OUTPUT SUPPORT
+AC_ARG_ENABLE(omzmq3,
+ [AS_HELP_STRING([--enable-omzmq3],[Compiles omzmq3 output module @<:@default=no@:>@])],
+ [case "${enableval}" in
+ yes) enable_omzmq3="yes" ;;
+ no) enable_omzmq3="no" ;;
+ *) AC_MSG_ERROR(bad value ${enableval} for --enable-omzmq3) ;;
+ esac],
+ [enable_omzmq3=no]
+)
+if test "x$enable_omzmq3" = "xyes"; then
+ PKG_CHECK_MODULES(CZMQ, libczmq >= 1.1.0)
+ AC_SUBST(CZMQ_CFLAGS)
+ AC_SUBST(CZMQ_LIBS)
+fi
+AM_CONDITIONAL(ENABLE_OMZMQ3, test x$enable_omzmq3 = xyes)
+
+# END ZMQ3 SUPPORT
+
# HIREDIS SUPPORT
AC_ARG_ENABLE(omhiredis,
@@ -1319,6 +1357,7 @@ AC_CONFIG_FILES([Makefile \
plugins/impstats/Makefile \
plugins/imrelp/Makefile \
plugins/imdiag/Makefile \
+ plugins/imzmq3/Makefile \
plugins/omtesting/Makefile \
plugins/omgssapi/Makefile \
plugins/ommysql/Makefile \
@@ -1331,6 +1370,7 @@ AC_CONFIG_FILES([Makefile \
plugins/omudpspoof/Makefile \
plugins/ommongodb/Makefile \
plugins/omhiredis/Makefile \
+ plugins/omzmq3/Makefile \
plugins/mmnormalize/Makefile \
plugins/mmjsonparse/Makefile \
plugins/mmaudit/Makefile \
@@ -1363,6 +1403,7 @@ echo " imdiag enabled: $enable_imdiag"
echo " file input module enabled: $enable_imfile"
echo " Solaris input module enabled: $enable_imsolaris"
echo " periodic statistics module enabled: $enable_impstats"
+echo " imzmq3 input module enabled: $enable_imzmq3"
echo
echo "---{ output plugins }---"
echo " Mail support enabled: $enable_mail"
@@ -1373,6 +1414,7 @@ echo " omelasticsearch module will be compiled: $enable_elasticsearch"
echo " omruleset module will be compiled: $enable_omruleset"
echo " omudpspoof module will be compiled: $enable_omudpspoof"
echo " omuxsock module will be compiled: $enable_omuxsock"
+echo " omzmq3 module will be compiled: $enable_omzmq3"
echo
echo "---{ parser modules }---"
echo " pmrfc3164sd module will be compiled: $enable_pmrfc3164sd"
diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c
index 4fec8e70..0abde84a 100644
--- a/plugins/impstats/impstats.c
+++ b/plugins/impstats/impstats.c
@@ -59,6 +59,7 @@ typedef struct configSettings_s {
int iFacility;
int iSeverity;
int bJSON;
+ int bCEE;
} configSettings_t;
struct modConfData_s {
@@ -89,6 +90,7 @@ initConfigSettings(void)
cs.iFacility = DEFAULT_FACILITY;
cs.iSeverity = DEFAULT_SEVERITY;
cs.bJSON = 0;
+ cs.bCEE = 0;
}
@@ -157,7 +159,13 @@ CODESTARTendCnfLoad
loadModConf->iStatsInterval = cs.iStatsInterval;
loadModConf->iFacility = cs.iFacility;
loadModConf->iSeverity = cs.iSeverity;
- loadModConf->statsFmt = cs.bJSON ? statsFmt_JSON : statsFmt_Legacy;
+ if (cs.bCEE == 1) {
+ loadModConf->statsFmt = statsFmt_CEE;
+ } else if (cs.bJSON == 1) {
+ loadModConf->statsFmt = statsFmt_JSON;
+ } else {
+ loadModConf->statsFmt = statsFmt_Legacy;
+ }
ENDendCnfLoad
@@ -259,6 +267,7 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatfacility", 0, eCmdHdlrInt, NULL, &cs.iFacility, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatseverity", 0, eCmdHdlrInt, NULL, &cs.iSeverity, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatjson", 0, eCmdHdlrBinary, NULL, &cs.bJSON, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatcee", 0, eCmdHdlrBinary, NULL, &cs.bCEE, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(prop.Construct(&pInputName));
diff --git a/plugins/imzmq3/Makefile.am b/plugins/imzmq3/Makefile.am
new file mode 100644
index 00000000..f9c84e5d
--- /dev/null
+++ b/plugins/imzmq3/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = imzmq3.la
+
+imzmq3_la_SOURCES = imzmq3.c
+imzmq3_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CZMQ_CFLAGS)
+imzmq3_la_LDFLAGS = -module -avoid-version
+imzmq3_la_LIBADD = $(CZMQ_LIBS)
+
+EXTRA_DIST =
diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c
new file mode 100644
index 00000000..0195fd20
--- /dev/null
+++ b/plugins/imzmq3/imzmq3.c
@@ -0,0 +1,651 @@
+/* imzmq3.c
+ *
+ * This input plugin enables rsyslog to read messages from a ZeroMQ
+ * queue.
+ *
+ * Copyright 2012 Talksum, Inc.
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Author: David Kelly
+ * <davidk@talksum.com>
+ */
+
+#include <assert.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "rsyslog.h"
+
+#include "cfsysline.h"
+#include "config.h"
+#include "dirty.h"
+#include "errmsg.h"
+#include "glbl.h"
+#include "module-template.h"
+#include "msg.h"
+#include "net.h"
+#include "parser.h"
+#include "prop.h"
+#include "ruleset.h"
+#include "srUtils.h"
+#include "unicode-helper.h"
+
+#include <czmq.h>
+
+MODULE_TYPE_INPUT
+MODULE_TYPE_NOKEEP
+
+/* convienent symbols to denote a socket we want to bind
+ * vs one we want to just connect to
+ */
+#define ACTION_CONNECT 1
+#define ACTION_BIND 2
+
+/* Module static data */
+DEF_IMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(prop)
+DEFobjCurrIf(ruleset)
+
+
+/* ----------------------------------------------------------------------------
+ * structs to describe sockets
+ */
+typedef struct _socket_type {
+ char* name;
+ int type;
+} socket_type;
+
+// more overkill, but seems nice to be consistent.
+typedef struct _socket_action {
+ char* name;
+ int action;
+} socket_action;
+
+typedef struct _poller_data {
+ ruleset_t* ruleset;
+ thrdInfo_t* thread;
+} poller_data;
+
+typedef struct _socket_info {
+ int type;
+ int action;
+ char* description;
+ int sndHWM; // if you want more than 2^32 messages,
+ int rcvHWM; // then pass in 0 (the default).
+ char* identity;
+ char** subscriptions;
+ ruleset_t* ruleset;
+ int sndBuf;
+ int rcvBuf;
+ int linger;
+ int backlog;
+ int sndTimeout;
+ int rcvTimeout;
+ int maxMsgSize;
+ int rate;
+ int recoveryIVL;
+ int multicastHops;
+ int reconnectIVL;
+ int reconnectIVLMax;
+ int ipv4Only;
+ int affinity;
+
+} socket_info;
+
+
+/* ----------------------------------------------------------------------------
+ * Static definitions/initializations.
+ */
+static socket_info* s_socketInfo = NULL;
+static size_t s_nitems = 0;
+static prop_t * s_namep = NULL;
+static zloop_t* s_zloop = NULL;
+static int s_io_threads = 1;
+static zctx_t* s_context = NULL;
+static ruleset_t* s_ruleset = NULL;
+static socket_type socketTypes[] = {
+ {"SUB", ZMQ_SUB },
+ {"PULL", ZMQ_PULL },
+ {"XSUB", ZMQ_XSUB }
+};
+
+static socket_action socketActions[] = {
+ {"BIND", ACTION_BIND},
+ {"CONNECT", ACTION_CONNECT},
+};
+
+
+/* ----------------------------------------------------------------------------
+ * Helper functions
+ */
+
+// get the name of a socket type, return the ZMQ_XXX type
+// or -1 if not a supported type (see above)
+static int getSocketType(char* name) {
+ int type = -1;
+ uint i;
+
+ // match name with known socket type
+ for(i=0; i<sizeof(socketTypes)/sizeof(socket_type); ++i) {
+ if( !strcmp(socketTypes[i].name, name) ) {
+ type = socketTypes[i].type;
+ break;
+ }
+ }
+
+ // whine if no match was found.
+ if (type == -1)
+ errmsg.LogError(0, NO_ERRCODE, "unknown type %s",name);
+
+ return type;
+}
+
+
+static int getSocketAction(char* name) {
+ int action = -1;
+ uint i;
+
+ // match name with known socket action
+ for(i=0; i < sizeof(socketActions)/sizeof(socket_action); ++i) {
+ if(!strcmp(socketActions[i].name, name)) {
+ action = socketActions[i].action;
+ break;
+ }
+ }
+
+ // whine if no matching action was found
+ if (action == -1)
+ errmsg.LogError(0, NO_ERRCODE, "unknown action %s",name);
+
+ return action;
+}
+
+
+static void setDefaults(socket_info* info) {
+ info->type = ZMQ_SUB;
+ info->action = ACTION_BIND;
+ info->description = NULL;
+ info->sndHWM = 0;
+ info->rcvHWM = 0;
+ info->identity = NULL;
+ info->subscriptions = NULL;
+ info->ruleset = NULL;
+ info->sndBuf = -1;
+ info->rcvBuf = -1;
+ info->linger = -1;
+ info->backlog = -1;
+ info->sndTimeout = -1;
+ info->rcvTimeout = -1;
+ info->maxMsgSize = -1;
+ info->rate = -1;
+ info->recoveryIVL = -1;
+ info->multicastHops = -1;
+ info->reconnectIVL = -1;
+ info->reconnectIVLMax = -1;
+ info->ipv4Only = -1;
+ info->affinity = -1;
+
+};
+
+
+/* The config string should look like:
+ * "action=AAA,type=TTT,description=DDD,sndHWM=SSS,rcvHWM=RRR,subscribe='xxx',subscribe='yyy'"
+ *
+ */
+static rsRetVal parseConfig(char* config, socket_info* info) {
+ int nsubs = 0;
+
+ char* binding;
+ char* ptr1;
+ for (binding = strtok_r(config, ",", &ptr1);
+ binding != NULL;
+ binding = strtok_r(NULL, ",", &ptr1)) {
+
+ // Each binding looks like foo=bar
+ char * sep = strchr(binding, '=');
+ if (sep == NULL)
+ {
+ errmsg.LogError(0, NO_ERRCODE,
+ "Invalid argument format %s, ignoring ...",
+ binding);
+ continue;
+ }
+
+ // Replace '=' with '\0'.
+ *sep = '\0';
+
+ char * val = sep + 1;
+
+ if (strcmp(binding, "action") == 0) {
+ info->action = getSocketAction(val);
+ } else if (strcmp(binding, "type") == 0) {
+ info->type = getSocketType(val);
+ } else if (strcmp(binding, "description") == 0) {
+ info->description = strdup(val);
+ } else if (strcmp(binding, "sndHWM") == 0) {
+ info->sndHWM = atoi(val);
+ } else if (strcmp(binding, "rcvHWM") == 0) {
+ info->sndHWM = atoi(val);
+ } else if (strcmp(binding, "subscribe") == 0) {
+ // Add the subscription value to the list.
+ char * substr = NULL;
+ substr = strdup(val);
+ info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1);
+ info->subscriptions[nsubs] = substr;
+ ++nsubs;
+ } else if (strcmp(binding, "sndBuf") == 0) {
+ info->sndBuf = atoi(val);
+ } else if (strcmp(binding, "rcvBuf") == 0) {
+ info->rcvBuf = atoi(val);
+ } else if (strcmp(binding, "linger") == 0) {
+ info->linger = atoi(val);
+ } else if (strcmp(binding, "backlog") == 0) {
+ info->backlog = atoi(val);
+ } else if (strcmp(binding, "sndTimeout") == 0) {
+ info->sndTimeout = atoi(val);
+ } else if (strcmp(binding, "rcvTimeout") == 0) {
+ info->rcvTimeout = atoi(val);
+ } else if (strcmp(binding, "maxMsgSize") == 0) {
+ info->maxMsgSize = atoi(val);
+ } else if (strcmp(binding, "rate") == 0) {
+ info->rate = atoi(val);
+ } else if (strcmp(binding, "recoveryIVL") == 0) {
+ info->recoveryIVL = atoi(val);
+ } else if (strcmp(binding, "multicastHops") == 0) {
+ info->multicastHops = atoi(val);
+ } else if (strcmp(binding, "reconnectIVL") == 0) {
+ info->reconnectIVL = atoi(val);
+ } else if (strcmp(binding, "reconnectIVLMax") == 0) {
+ info->reconnectIVLMax = atoi(val);
+ } else if (strcmp(binding, "ipv4Only") == 0) {
+ info->ipv4Only = atoi(val);
+ } else if (strcmp(binding, "affinity") == 0) {
+ info->affinity = atoi(val);
+ } else {
+ errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding);
+ return RS_RET_INVALID_PARAMS;
+ }
+ }
+
+ return RS_RET_OK;
+}
+
+static rsRetVal validateConfig(socket_info* info) {
+
+ if (info->type == -1) {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS,
+ "you entered an invalid type");
+ return RS_RET_INVALID_PARAMS;
+ }
+ if (info->action == -1) {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS,
+ "you entered an invalid action");
+ return RS_RET_INVALID_PARAMS;
+ }
+ if (info->description == NULL) {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS,
+ "you didn't enter a description");
+ return RS_RET_INVALID_PARAMS;
+ }
+ if(info->type == ZMQ_SUB && info->subscriptions == NULL) {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS,
+ "SUB sockets need at least one subscription");
+ return RS_RET_INVALID_PARAMS;
+ }
+ if(info->type != ZMQ_SUB && info->subscriptions != NULL) {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS,
+ "only SUB sockets can have subscriptions");
+ return RS_RET_INVALID_PARAMS;
+ }
+ return RS_RET_OK;
+}
+
+static rsRetVal createContext() {
+ if (s_context == NULL) {
+ errmsg.LogError(0, NO_ERRCODE, "creating zctx.");
+ s_context = zctx_new();
+
+ if (s_context == NULL) {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS,
+ "zctx_new failed: %s",
+ strerror(errno));
+ // DK: really should do better than invalid params...
+ return RS_RET_INVALID_PARAMS;
+ }
+
+ if (s_io_threads > 1) {
+ errmsg.LogError(0, NO_ERRCODE, "setting io worker threads to %d", s_io_threads);
+ zctx_set_iothreads(s_context, s_io_threads);
+ }
+ }
+ return RS_RET_OK;
+}
+
+static rsRetVal createSocket(socket_info* info, void** sock) {
+ size_t ii;
+ int rv;
+
+ *sock = zsocket_new(s_context, info->type);
+ if (!sock) {
+ errmsg.LogError(0,
+ RS_RET_INVALID_PARAMS,
+ "zsocket_new failed: %s, for type %d",
+ strerror(errno),info->type);
+ // DK: invalid params seems right here.
+ return RS_RET_INVALID_PARAMS;
+ }
+
+ // Set options *before* the connect/bind.
+ if (info->identity) zsocket_set_identity(*sock, info->identity);
+ if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf);
+ if (info->rcvBuf > -1) zsocket_set_rcvbuf(*sock, info->rcvBuf);
+ if (info->linger > -1) zsocket_set_linger(*sock, info->linger);
+ if (info->backlog > -1) zsocket_set_backlog(*sock, info->backlog);
+ if (info->sndTimeout > -1) zsocket_set_sndtimeo(*sock, info->sndTimeout);
+ if (info->rcvTimeout > -1) zsocket_set_rcvtimeo(*sock, info->rcvTimeout);
+ if (info->maxMsgSize > -1) zsocket_set_maxmsgsize(*sock, info->maxMsgSize);
+ if (info->rate > -1) zsocket_set_rate(*sock, info->rate);
+ if (info->recoveryIVL > -1) zsocket_set_recovery_ivl(*sock, info->recoveryIVL);
+ if (info->multicastHops > -1) zsocket_set_multicast_hops(*sock, info->multicastHops);
+ if (info->reconnectIVL > -1) zsocket_set_reconnect_ivl(*sock, info->reconnectIVL);
+ if (info->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(*sock, info->reconnectIVLMax);
+ if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only);
+ if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity);
+
+ // since HWM have defaults, we always set them. No return codes to check, either.
+ zsocket_set_sndhwm(*sock, info->sndHWM);
+ zsocket_set_rcvhwm(*sock, info->rcvHWM);
+
+ // Set subscriptions.
+ for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii)
+ zsocket_set_subscribe(*sock, info->subscriptions[ii]);
+
+
+
+ // Do the bind/connect...
+ if (info->action==ACTION_CONNECT) {
+ rv = zsocket_connect(*sock, info->description);
+ if (rv < 0) {
+ errmsg.LogError(0,
+ RS_RET_INVALID_PARAMS,
+ "zmq_connect using %s failed: %s",
+ info->description, strerror(errno));
+ return RS_RET_INVALID_PARAMS;
+ }
+ } else {
+ rv = zsocket_bind(*sock, info->description);
+ if (rv <= 0) {
+ errmsg.LogError(0,
+ RS_RET_INVALID_PARAMS,
+ "zmq_bind using %s failed: %s",
+ info->description, strerror(errno));
+ return RS_RET_INVALID_PARAMS;
+ }
+ }
+ return RS_RET_OK;
+}
+
+/* ----------------------------------------------------------------------------
+ * Module endpoints
+ */
+
+/* accept a new ruleset to bind. Checks if it exists and complains, if not. Note
+ * that this makes the assumption that after the bind ruleset is called in the config,
+ * another call will be made to add an endpoint.
+*/
+static rsRetVal
+set_ruleset(void __attribute__((unused)) *pVal, uchar *pszName) {
+ ruleset_t* ruleset_ptr;
+ rsRetVal localRet;
+ DEFiRet;
+
+ localRet = ruleset.GetRuleset(ourConf, &ruleset_ptr, pszName);
+ if(localRet == RS_RET_NOT_FOUND) {
+ errmsg.LogError(0, NO_ERRCODE, "error: "
+ "ruleset '%s' not found - ignored", pszName);
+ }
+ CHKiRet(localRet);
+ s_ruleset = ruleset_ptr;
+ DBGPRINTF("imzmq3 current bind ruleset '%s'\n", pszName);
+
+finalize_it:
+ free(pszName); /* no longer needed */
+ RETiRet;
+}
+
+/* add an actual endpoint
+ */
+static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) {
+ DEFiRet;
+
+ // increment number of items and store old num items, as it will be handy.
+ size_t idx = s_nitems++;
+
+ // allocate a new socket_info array to accomidate this new endpoint
+ socket_info* tmpSocketInfo;
+ CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems));
+
+ // copy existing socket_info across into new array, if any, and free old storage
+ if(idx) {
+ memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx);
+ free(s_socketInfo);
+ }
+
+ // set the static to hold the new array
+ s_socketInfo = tmpSocketInfo;
+
+ // point to the new one
+ socket_info* sockInfo = &s_socketInfo[idx];
+
+ // set defaults for the new socket info
+ setDefaults(sockInfo);
+
+ // Make a writeable copy of the string so we can use strtok
+ // in the parseConfig call
+ char * copy = NULL;
+ CHKmalloc(copy = strdup((char *) valp));
+
+ // parse the config string
+ CHKiRet(parseConfig(copy, sockInfo));
+
+ // validate it
+ CHKiRet(validateConfig(sockInfo));
+
+ // bind to the current ruleset (if any)
+ sockInfo->ruleset = s_ruleset;
+
+finalize_it:
+ free(valp); /* in any case, this is no longer needed */
+ RETiRet;
+}
+
+
+static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *poller, void* pd) {
+ msg_t* logmsg;
+ poller_data* pollerData = (poller_data*)pd;
+
+ char* buf = zstr_recv(poller->socket);
+ if (msgConstruct(&logmsg) == RS_RET_OK) {
+ MsgSetRawMsg(logmsg, buf, strlen(buf));
+ MsgSetInputName(logmsg, s_namep);
+ MsgSetFlowControlType(logmsg, eFLOWCTL_NO_DELAY);
+ MsgSetRuleset(logmsg, pollerData->ruleset);
+ logmsg->msgFlags = NEEDS_PARSING;
+ submitMsg(logmsg);
+ }
+
+ if( pollerData->thread->bShallStop == TRUE) {
+ // a handler that returns -1 will terminate the
+ // czmq reactor loop
+ return -1;
+ }
+
+ return 0;
+}
+
+/* called when runInput is called by rsyslog
+ */
+static rsRetVal rcv_loop(thrdInfo_t* pThrd){
+ size_t i;
+ int rv;
+ zmq_pollitem_t* items;
+ poller_data* pollerData;
+
+ DEFiRet;
+
+ // create the context
+ CHKiRet(createContext());
+
+ // create the poll items
+ CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems));
+
+ // create poller data (stuff to pass into the zmq closure called when we get a message)
+ CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems));
+
+ // loop through and initialize the poll items and poller_data arrays...
+ for(i=0; i<s_nitems;++i) {
+ // create the socket, update items.
+ createSocket(&s_socketInfo[i], &items[i].socket);
+ items[i].events = ZMQ_POLLIN;
+
+ // now update the poller_data for this item
+ pollerData[i].thread = pThrd;
+ pollerData[i].ruleset = s_socketInfo[i].ruleset;
+ }
+
+ s_zloop = zloop_new();
+ for(i=0; i<s_nitems; ++i) {
+
+ rv = zloop_poller(s_zloop, &items[i], handlePoll, &pollerData[i]);
+ if (rv) {
+ errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu", i);
+ }
+ }
+ zloop_start(s_zloop);
+ zloop_destroy(&s_zloop);
+ finalize_it:
+ for(i=0; i< s_nitems; ++i) {
+ zsocket_destroy(s_context, items[i].socket);
+ }
+
+ zctx_destroy(&s_context);
+
+ free(items);
+ RETiRet;
+}
+
+/* ----------------------------------------------------------------------------
+ * input module functions
+ */
+
+BEGINrunInput
+CODESTARTrunInput
+ iRet = rcv_loop(pThrd);
+ENDrunInput
+
+
+/* initialize and return if will run or not */
+BEGINwillRun
+CODESTARTwillRun
+ /* we need to create the inputName property (only once during our
+ lifetime) */
+ CHKiRet(prop.Construct(&s_namep));
+ CHKiRet(prop.SetString(s_namep,
+ UCHAR_CONSTANT("imzmq3"),
+ sizeof("imzmq3") - 1));
+ CHKiRet(prop.ConstructFinalize(s_namep));
+
+ // If there are no endpoints this is pointless ...
+ if (s_nitems == 0)
+ ABORT_FINALIZE(RS_RET_NO_RUN);
+
+finalize_it:
+ENDwillRun
+
+
+BEGINafterRun
+CODESTARTafterRun
+ /* do cleanup here */
+ if(s_namep != NULL)
+ prop.Destruct(&s_namep);
+ENDafterRun
+
+
+BEGINmodExit
+CODESTARTmodExit
+ /* release what we no longer need */
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(prop, CORE_COMPONENT);
+ objRelease(ruleset, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
+ENDqueryEtryPt
+
+static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp,
+ void __attribute__((unused)) *pVal) {
+ return RS_RET_OK;
+}
+static rsRetVal setGlobalWorkerThreads(uchar __attribute__((unused)) *pp, int val) {
+ errmsg.LogError(0, NO_ERRCODE, "setGlobalWorkerThreads called with %d",val);
+ s_io_threads = val;
+ return RS_RET_OK;
+}
+
+BEGINmodInit()
+CODESTARTmodInit
+ /* we only support the current interface specification */
+ *ipIFVersProvided = CURR_MOD_IF_VERSION;
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
+
+ /* register config file handlers */
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverbindruleset",
+ 0, eCmdHdlrGetWord,
+ set_ruleset, NULL,
+ STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverrun",
+ 0, eCmdHdlrGetWord,
+ add_endpoint, NULL,
+ STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables",
+ 1, eCmdHdlrCustomHandler,
+ resetConfigVariables, NULL,
+ STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3globalWorkerThreads",
+ 1, eCmdHdlrInt,
+ setGlobalWorkerThreads, NULL,
+ STD_LOADABLE_MODULE_ID));
+ENDmodInit
diff --git a/plugins/omzmq3/Makefile.am b/plugins/omzmq3/Makefile.am
new file mode 100644
index 00000000..92cd7586
--- /dev/null
+++ b/plugins/omzmq3/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = omzmq3.la
+
+omzmq3_la_SOURCES = omzmq3.c
+omzmq3_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CZMQ_CFLAGS)
+omzmq3_la_LDFLAGS = -module -avoid-version
+omzmq3_la_LIBADD = $(CZMQ_LIBS)
+
+EXTRA_DIST =
diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c
new file mode 100644
index 00000000..885bc365
--- /dev/null
+++ b/plugins/omzmq3/omzmq3.c
@@ -0,0 +1,460 @@
+/* omzmq3.c
+ * Copyright 2012 Talksum, Inc
+ * Using the czmq interface to zeromq, we output
+ * to a zmq socket.
+
+
+*
+* This program is free software: you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public License
+* as published by the Free Software Foundation, either version 3 of
+* the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful, but
+* WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this program. If not, see
+* <http://www.gnu.org/licenses/>.
+*
+* Author: David Kelly
+* <davidk@talksum.com>
+*/
+
+
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <unistd.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "cfsysline.h"
+
+#include <czmq.h>
+
+MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("omzmq3")
+
+DEF_OMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+
+/* convienent symbols to denote a socket we want to bind
+p * vs one we want to just connect to
+ */
+#define ACTION_CONNECT 1
+#define ACTION_BIND 2
+
+
+/* ----------------------------------------------------------------------------
+ * structs to describe sockets
+ */
+struct socket_type {
+ char* name;
+ int type;
+};
+
+// more overkill, but seems nice to be consistent.
+struct socket_action {
+ char* name;
+ int action;
+};
+
+typedef struct _instanceData {
+ void* socket;
+ uchar* description;
+ int type;
+ int action;
+ int sndHWM;
+ int rcvHWM;
+ uchar* identity;
+ int sndBuf;
+ int rcvBuf;
+ int linger;
+ int backlog;
+ int sndTimeout;
+ int rcvTimeout;
+ int maxMsgSize;
+ int rate;
+ int recoveryIVL;
+ int multicastHops;
+ int reconnectIVL;
+ int reconnectIVLMax;
+ int ipv4Only;
+ int affinity;
+ uchar* tplName;
+} instanceData;
+
+
+/* ----------------------------------------------------------------------------
+ * Static definitions/initializations
+ */
+
+// only 1 zctx for all the sockets, with an adjustable number of
+// worker threads which may be useful if we use affinity in particular
+// sockets
+static zctx_t* s_context = NULL;
+static int s_workerThreads = -1;
+
+static struct socket_type types[] = {
+ {"PUB", ZMQ_PUB },
+ {"PUSH", ZMQ_PUSH },
+ {"XPUB", ZMQ_XPUB }
+};
+
+static struct socket_action actions[] = {
+ {"BIND", ACTION_BIND},
+ {"CONNECT", ACTION_CONNECT},
+};
+
+static struct cnfparamdescr actpdescr[] = {
+ { "description", eCmdHdlrGetWord, 0 },
+ { "sockType", eCmdHdlrGetWord, 0 },
+ { "action", eCmdHdlrGetWord, 0 },
+ { "sndHWM", eCmdHdlrInt, 0 },
+ { "rcvHWM", eCmdHdlrInt, 0 },
+ { "identity", eCmdHdlrGetWord, 0 },
+ { "sndBuf", eCmdHdlrInt, 0 },
+ { "rcvBuf", eCmdHdlrInt, 0 },
+ { "linger", eCmdHdlrInt, 0 },
+ { "backlog", eCmdHdlrInt, 0 },
+ { "sndTimeout", eCmdHdlrInt, 0 },
+ { "rcvTimeout", eCmdHdlrInt, 0 },
+ { "maxMsgSize", eCmdHdlrInt, 0 },
+ { "rate", eCmdHdlrInt, 0 },
+ { "recoveryIVL", eCmdHdlrInt, 0 },
+ { "multicastHops", eCmdHdlrInt, 0 },
+ { "reconnectIVL", eCmdHdlrInt, 0 },
+ { "reconnectIVLMax", eCmdHdlrInt, 0 },
+ { "ipv4Only", eCmdHdlrInt, 0 },
+ { "affinity", eCmdHdlrInt, 0 },
+ { "globalWorkerThreads", eCmdHdlrInt, 0 },
+ { "template", eCmdHdlrGetWord, 1 }
+};
+
+static struct cnfparamblk actpblk = {
+ CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+};
+
+/* ----------------------------------------------------------------------------
+ * Helper Functions
+ */
+
+// get the name of a socket type, return the ZMQ_XXX type
+// or -1 if not a supported type (see above)
+int getSocketType(char* name) {
+ int type = -1;
+ uint i;
+ for(i=0; i<sizeof(types)/sizeof(struct socket_type); ++i) {
+ if( !strcmp(types[i].name, name) ) {
+ type = types[i].type;
+ break;
+ }
+ }
+ return type;
+}
+
+
+static int getSocketAction(char* name) {
+ int action = -1;
+ uint i;
+ for(i=0; i < sizeof(actions)/sizeof(struct socket_action); ++i) {
+ if(!strcmp(actions[i].name, name)) {
+ action = actions[i].action;
+ break;
+ }
+ }
+ return action;
+}
+
+/* closeZMQ will destroy the context and
+ * associated socket
+ */
+static void closeZMQ(instanceData* pData) {
+ errmsg.LogError(0, NO_ERRCODE, "closeZMQ called");
+ if(s_context && pData->socket) {
+ if(pData->socket != NULL) {
+ zsocket_destroy(s_context, pData->socket);
+ }
+ }
+}
+
+
+static rsRetVal initZMQ(instanceData* pData) {
+ DEFiRet;
+
+ // create the context if necessary.
+ if (NULL == s_context) {
+ s_context = zctx_new();
+ if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads);
+ }
+
+ pData->socket = zsocket_new(s_context, pData->type);
+
+ // ALWAYS set the HWM as the zmq3 default is 1000 and we default
+ // to 0 (infinity)
+ zsocket_set_rcvhwm(pData->socket, pData->rcvHWM);
+ zsocket_set_sndhwm(pData->socket, pData->sndHWM);
+
+ // use czmq defaults for these, unless set to non-default values
+ if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity);
+ if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf);
+ if(pData->rcvBuf > -1) zsocket_set_sndbuf(pData->socket, pData->rcvBuf);
+ if(pData->linger > -1) zsocket_set_linger(pData->socket, pData->linger);
+ if(pData->backlog > -1) zsocket_set_backlog(pData->socket, pData->backlog);
+ if(pData->sndTimeout > -1) zsocket_set_sndtimeo(pData->socket, pData->sndTimeout);
+ if(pData->rcvTimeout > -1) zsocket_set_rcvtimeo(pData->socket, pData->rcvTimeout);
+ if(pData->maxMsgSize > -1) zsocket_set_maxmsgsize(pData->socket, pData->maxMsgSize);
+ if(pData->rate > -1) zsocket_set_rate(pData->socket, pData->rate);
+ if(pData->recoveryIVL > -1) zsocket_set_recovery_ivl(pData->socket, pData->recoveryIVL);
+ if(pData->multicastHops > -1) zsocket_set_multicast_hops(pData->socket, pData->multicastHops);
+ if(pData->reconnectIVL > -1) zsocket_set_reconnect_ivl(pData->socket, pData->reconnectIVL);
+ if(pData->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(pData->socket, pData->reconnectIVLMax);
+ if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only);
+ if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity);
+
+ // bind or connect to it
+ if (pData->action == ACTION_BIND) {
+ // bind asserts, so no need to test return val here
+ // which isn't the greatest api -- oh well
+ zsocket_bind(pData->socket, (char*)pData->description);
+ } else {
+ if(zsocket_connect(pData->socket, (char*)pData->description) == -1) {
+ errmsg.LogError(0, RS_RET_SUSPENDED, "omzmq3: connect failed!");
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ }
+ finalize_it:
+ RETiRet;
+}
+
+rsRetVal writeZMQ(uchar* msg, instanceData* pData) {
+ DEFiRet;
+
+ // initialize if necessary
+ if(NULL == pData->socket)
+ CHKiRet(initZMQ(pData));
+
+ // send the shit...
+ int result = zstr_send(pData->socket, (char*)msg);
+
+ // whine if shit went wrong
+ if (result == -1) {
+ errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ finalize_it:
+ RETiRet;
+}
+
+static inline void
+setInstParamDefaults(instanceData* pData) {
+ pData->description = (uchar*)"tcp://*:7171";
+ pData->socket = NULL;
+ pData->tplName = NULL;
+ pData->type = ZMQ_PUB;
+ pData->action = ACTION_BIND;
+ pData->sndHWM = 0; // unlimited
+ pData->rcvHWM = 0; // unlimited
+ pData->identity = NULL;
+ pData->sndBuf = -1;
+ pData->rcvBuf = -1;
+ pData->linger = -1;
+ pData->backlog = -1;
+ pData->sndTimeout = -1;
+ pData->rcvTimeout = -1;
+ pData->maxMsgSize = -1;
+ pData->rate = -1;
+ pData->recoveryIVL = -1;
+ pData->multicastHops = -1;
+ pData->reconnectIVL = -1;
+ pData->reconnectIVLMax = -1;
+ pData->ipv4Only = -1;
+ pData->affinity = 1;
+}
+
+
+/* ----------------------------------------------------------------------------
+ * Output Module Functions
+ */
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURERepeatedMsgReduction)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ENDdbgPrintInstInfo
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ closeZMQ(pData);
+ free(pData->description);
+ free(pData->tplName);
+ENDfreeInstance
+
+BEGINtryResume
+CODESTARTtryResume
+ if(NULL == pData->socket)
+ iRet = initZMQ(pData);
+ENDtryResume
+
+BEGINdoAction
+CODESTARTdoAction
+iRet = writeZMQ(ppString[0], pData);
+ENDdoAction
+
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+CODESTARTnewActInst
+if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+CHKiRet(createInstance(&pData));
+setInstParamDefaults(pData);
+
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(actpblk.descr[i].name, "description")) {
+ pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "template")) {
+ pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "sockType")){
+ pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL));
+ } else if(!strcmp(actpblk.descr[i].name, "action")){
+ pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL));
+ } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) {
+ pData->sndHWM = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) {
+ pData->rcvHWM = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "identity")){
+ pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) {
+ pData->sndBuf = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) {
+ pData->rcvBuf = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "linger")) {
+ pData->linger = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "backlog")) {
+ pData->backlog = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) {
+ pData->sndTimeout = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) {
+ pData->rcvTimeout = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) {
+ pData->maxMsgSize = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "rate")) {
+ pData->rate = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) {
+ pData->recoveryIVL = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) {
+ pData->multicastHops = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) {
+ pData->reconnectIVL = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) {
+ pData->reconnectIVLMax = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) {
+ pData->ipv4Only = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "affinity")) {
+ pData->affinity = (int) pvals[i].val.d.n, NULL;
+ } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) {
+ s_workerThreads = (int) pvals[i].val.d.n, NULL;
+ } else {
+ errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+ }
+
+if(pData->tplName == NULL) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+ } else {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS));
+ }
+
+if(pData->type == -1) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type.");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+if(pData->action == -1) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+
+
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+
+/* tell the engine we only want one template string */
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ if(!strncmp((char*) p, ":omzmq3:", sizeof(":omzmq3:") - 1))
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "omzmq3 supports only v6 config format, use: "
+ "action(type=\"omzmq3\" serverport=...)");
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+BEGINinitConfVars /* (re)set config variables to defaults */
+CODESTARTinitConfVars
+s_workerThreads = -1;
+ENDinitConfVars
+
+BEGINmodExit
+CODESTARTmodExit
+if(NULL != s_context) {
+ zctx_destroy(&s_context);
+ s_context=NULL;
+ }
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+ENDqueryEtryPt
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* only supports rsyslog 6 configs */
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
+ DBGPRINTF("omzmq3: module compiled with rsyslog version %s.\n", VERSION);
+
+INITLegCnfVars
+CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID));
+ENDmodInit
+
+
+
diff --git a/runtime/statsobj.c b/runtime/statsobj.c
index a21614f6..25275616 100644
--- a/runtime/statsobj.c
+++ b/runtime/statsobj.c
@@ -168,15 +168,18 @@ finalize_it:
/* get all the object's countes together as CEE. */
static rsRetVal
-getStatsLineCEE(statsobj_t *pThis, cstr_t **ppcstr)
+getStatsLineCEE(statsobj_t *pThis, cstr_t **ppcstr, int cee_cookie)
{
cstr_t *pcstr;
ctr_t *pCtr;
DEFiRet;
CHKiRet(cstrConstruct(&pcstr));
- rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("@cee: {"), 7);
+ if (cee_cookie == 1)
+ rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("@cee: "), 6);
+
+ rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("{"), 1);
rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("\""), 1);
rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("name"), 4);
rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("\""), 1);
@@ -273,8 +276,11 @@ getAllStatsLines(rsRetVal(*cb)(void*, cstr_t*), void *usrptr, statsFmtType_t fmt
case statsFmt_Legacy:
CHKiRet(getStatsLine(o, &cstr));
break;
+ case statsFmt_CEE:
+ CHKiRet(getStatsLineCEE(o, &cstr, 1));
+ break;
case statsFmt_JSON:
- CHKiRet(getStatsLineCEE(o, &cstr));
+ CHKiRet(getStatsLineCEE(o, &cstr, 0));
break;
}
CHKiRet(cb(usrptr, cstr));
diff --git a/runtime/statsobj.h b/runtime/statsobj.h
index f7de68ee..14b33215 100644
--- a/runtime/statsobj.h
+++ b/runtime/statsobj.h
@@ -46,7 +46,8 @@ typedef enum statsCtrType_e {
/* stats line format types */
typedef enum statsFmtType_e {
statsFmt_Legacy,
- statsFmt_JSON
+ statsFmt_JSON,
+ statsFmt_CEE
} statsFmtType_t;