summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-10-05 16:10:27 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-10-05 16:10:27 +0200
commitc7e9e2123ee71fb868bbcff59225e009e21af590 (patch)
tree12ab52bb20cb34c9fcd36816e836a1959440d5fc /plugins
parent304ab88dd64b2ae46554fa5394aa8822194b1e95 (diff)
parentb5afa1a5be2909ed13f77ef0e0804bab75325198 (diff)
downloadrsyslog-c7e9e2123ee71fb868bbcff59225e009e21af590.tar.gz
rsyslog-c7e9e2123ee71fb868bbcff59225e009e21af590.tar.xz
rsyslog-c7e9e2123ee71fb868bbcff59225e009e21af590.zip
Merge branch 'v5-devel'
Conflicts: ChangeLog configure.ac doc/manual.html plugins/imuxsock/imuxsock.c runtime/rsyslog.h
Diffstat (limited to 'plugins')
-rw-r--r--plugins/impstats/Makefile.am6
-rw-r--r--plugins/impstats/impstats.c228
-rw-r--r--plugins/imuxsock/Makefile.am2
-rw-r--r--plugins/imuxsock/imuxsock.c642
-rw-r--r--plugins/omhdfs/Makefile.am6
-rw-r--r--plugins/omhdfs/omhdfs.c475
6 files changed, 1247 insertions, 112 deletions
diff --git a/plugins/impstats/Makefile.am b/plugins/impstats/Makefile.am
new file mode 100644
index 00000000..187bbca4
--- /dev/null
+++ b/plugins/impstats/Makefile.am
@@ -0,0 +1,6 @@
+pkglib_LTLIBRARIES = impstats.la
+
+impstats_la_SOURCES = impstats.c
+impstats_la_CPPFLAGS = $(RSRT_CFLAGS) -I$(top_srcdir) $(PTHREADS_CFLAGS)
+impstats_la_LDFLAGS = -module -avoid-version
+impstats_la_LIBADD =
diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c
new file mode 100644
index 00000000..b701b684
--- /dev/null
+++ b/plugins/impstats/impstats.c
@@ -0,0 +1,228 @@
+/* impstats.c
+ * A module to periodically output statistics gathered by rsyslog.
+ *
+ * NOTE: read comments in module-template.h to understand how this file
+ * works!
+ *
+ * File begun on 2007-07-20 by RGerhards (extracted from syslogd.c)
+ * This file is under development and has not yet arrived at being fully
+ * self-contained and a real object. So far, it is mostly an excerpt
+ * of the "old" message code without any modifications. However, it
+ * helps to have things at the right place one we go to the meat of it.
+ *
+ * Copyright 2010 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdlib.h>
+#include <stdio.h>
+#include <assert.h>
+#include <signal.h>
+#include <string.h>
+#include <pthread.h>
+#include "dirty.h"
+#include "cfsysline.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "msg.h"
+#include "srUtils.h"
+#include "unicode-helper.h"
+#include "glbl.h"
+#include "statsobj.h"
+#include "prop.h"
+
+MODULE_TYPE_INPUT
+
+/* defines */
+#define DEFAULT_STATS_PERIOD (5 * 60)
+#define DEFAULT_FACILITY 5 /* syslog */
+#define DEFAULT_SEVERITY 6 /* info */
+
+/* Module static data */
+DEF_IMOD_STATIC_DATA
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(prop)
+DEFobjCurrIf(statsobj)
+DEFobjCurrIf(errmsg)
+
+typedef struct configSettings_s {
+ int iStatsInterval;
+ int iFacility;
+ int iSeverity;
+} configSettings_t;
+
+static configSettings_t cs;
+
+static prop_t *pInputName = NULL;
+static prop_t *pLocalHostIP = NULL;
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+static inline void
+initConfigSettings(void)
+{
+ cs.iStatsInterval = DEFAULT_STATS_PERIOD;
+ cs.iFacility = DEFAULT_FACILITY;
+ cs.iSeverity = DEFAULT_SEVERITY;
+}
+
+
+/* actually submit a message to the rsyslog core
+ */
+static inline rsRetVal
+doSubmitMsg(uchar *line)
+{
+ msg_t *pMsg;
+ DEFiRet;
+
+ CHKiRet(msgConstruct(&pMsg));
+ MsgSetInputName(pMsg, pInputName);
+ MsgSetRawMsgWOSize(pMsg, (char*)line);
+ MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName()));
+ MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp());
+ MsgSetRcvFromIP(pMsg, pLocalHostIP);
+ MsgSetMSGoffs(pMsg, 0);
+ MsgSetTAG(pMsg, UCHAR_CONSTANT("rsyslogd-pstats:"), sizeof("rsyslogd-pstats:") - 1);
+ pMsg->iFacility = cs.iFacility;
+ pMsg->iSeverity = cs.iSeverity;
+ pMsg->msgFlags = 0;
+
+ submitMsg(pMsg);
+
+finalize_it:
+ RETiRet;
+
+}
+
+
+/* callback for statsobj
+ * Note: usrptr exists only to satisfy requirements of statsobj callback interface!
+ */
+static rsRetVal
+doStatsLine(void __attribute__((unused)) *usrptr, cstr_t *cstr)
+{
+ DEFiRet;
+ doSubmitMsg(rsCStrGetSzStrNoNULL(cstr));
+ RETiRet;
+}
+
+
+/* the function to generate the actual statistics messages
+ * rgerhards, 2010-09-09
+ */
+static inline void
+generateStatsMsgs(void)
+{
+ statsobj.GetAllStatsLines(doStatsLine, NULL);
+}
+
+
+BEGINrunInput
+CODESTARTrunInput
+ /* this is an endless loop - it is terminated when the thread is
+ * signalled to do so. This, however, is handled by the framework,
+ * right into the sleep below.
+ */
+ while(1) {
+ srSleep(cs.iStatsInterval, 0); /* seconds, micro seconds */
+
+ if(glbl.GetGlobalInputTermState() == 1)
+ break; /* terminate input! */
+
+ generateStatsMsgs();
+ }
+ENDrunInput
+
+
+BEGINwillRun
+ rsRetVal localRet;
+CODESTARTwillRun
+ DBGPRINTF("impstats: stats interval %d seconds\n", cs.iStatsInterval);
+ if(cs.iStatsInterval == 0)
+ ABORT_FINALIZE(RS_RET_NO_RUN);
+ localRet = statsobj.EnableStats();
+ if(localRet != RS_RET_OK) {
+ errmsg.LogError(0, localRet, "impstat: error enabling statistics gathering");
+ ABORT_FINALIZE(RS_RET_NO_RUN);
+ }
+finalize_it:
+ENDwillRun
+
+
+BEGINafterRun
+CODESTARTafterRun
+ENDafterRun
+
+
+BEGINmodExit
+CODESTARTmodExit
+ prop.Destruct(&pInputName);
+ prop.Destruct(&pLocalHostIP);
+ /* release objects we used */
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(prop, CORE_COMPONENT);
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
+ENDmodExit
+
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
+ENDqueryEtryPt
+
+static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
+{
+ initConfigSettings();
+ return RS_RET_OK;
+}
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ DBGPRINTF("impstats version %s loading\n", VERSION);
+ initConfigSettings();
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatsinterval", 0, eCmdHdlrInt, NULL, &cs.iStatsInterval, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatfacility", 0, eCmdHdlrInt, NULL, &cs.iFacility, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatseverity", 0, eCmdHdlrInt, NULL, &cs.iSeverity, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+
+ CHKiRet(prop.Construct(&pInputName));
+ CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("impstats"), sizeof("impstats") - 1));
+ CHKiRet(prop.ConstructFinalize(pInputName));
+
+ CHKiRet(prop.Construct(&pLocalHostIP));
+ CHKiRet(prop.SetString(pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1));
+ CHKiRet(prop.ConstructFinalize(pLocalHostIP));
+ENDmodInit
+/* vi:set ai:
+ */
diff --git a/plugins/imuxsock/Makefile.am b/plugins/imuxsock/Makefile.am
index 28f9f9e3..34a0ad9a 100644
--- a/plugins/imuxsock/Makefile.am
+++ b/plugins/imuxsock/Makefile.am
@@ -1,6 +1,6 @@
pkglib_LTLIBRARIES = imuxsock.la
imuxsock_la_SOURCES = imuxsock.c
-imuxsock_la_CPPFLAGS = -DSD_EXPORT_SYMBOLS -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS)
+imuxsock_la_CPPFLAGS = -DSD_EXPORT_SYMBOLS -I../../runtime/hashtable -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS)
imuxsock_la_LDFLAGS = -module -avoid-version
imuxsock_la_LIBADD = $(RSRT_LIBS)
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index 73fe962f..be162a1b 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -6,7 +6,7 @@
*
* File begun on 2007-12-20 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -29,12 +29,14 @@
#include "rsyslog.h"
#include <stdlib.h>
#include <stdio.h>
+#include <ctype.h>
#include <assert.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/un.h>
+#include <sys/socket.h>
#include "dirty.h"
#include "cfsysline.h"
#include "unicode-helper.h"
@@ -48,11 +50,14 @@
#include "debug.h"
#include "unlimited_select.h"
#include "sd-daemon.h"
+#include "statsobj.h"
+#include "datetime.h"
+#include "hashtable.h"
MODULE_TYPE_INPUT
/* defines */
-#define MAXFUNIX 20
+#define MAXFUNIX 50
#ifndef _PATH_LOG
#ifdef BSD
#define _PATH_LOG "/var/run/log"
@@ -72,30 +77,156 @@ DEF_IMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
DEFobjCurrIf(prop)
+DEFobjCurrIf(datetime)
+DEFobjCurrIf(statsobj)
+
+statsobj_t *modStats;
+STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
+STATSCOUNTER_DEF(ctrLostRatelimit, mutCtrLostRatelimit)
+STATSCOUNTER_DEF(ctrNumRatelimiters, mutCtrNumRatelimiters)
+
+struct rs_ratelimit_state {
+ unsigned short interval;
+ unsigned short burst;
+ unsigned done;
+ unsigned missed;
+ time_t begin;
+};
+typedef struct rs_ratelimit_state rs_ratelimit_state_t;
+
+
+/* a very simple "hash function" for process IDs - we simply use the
+ * pid itself: it is quite expected that all pids may log some time, but
+ * from a collision point of view it is likely that long-running daemons
+ * start early and so will stay right in the top spots of the
+ * collision list.
+ */
+static unsigned int
+hash_from_key_fn(void *k)
+{
+ return((unsigned) *((pid_t*) k));
+}
+
+static int
+key_equals_fn(void *key1, void *key2)
+{
+ return *((pid_t*) key1) == *((pid_t*) key2);
+}
+
+
+/* structure to describe a specific listener */
+typedef struct lstn_s {
+ uchar *sockName; /* read-only after startup */
+ prop_t *hostName; /* host-name override - if set, use this instead of actual name */
+ int fd; /* read-only after startup */
+ int flags; /* should parser parse host name? read-only after startup */
+ int flowCtl; /* flow control settings for this socket */
+ int ratelimitInterval;
+ int ratelimitBurst;
+ intTiny ratelimitSev; /* severity level (and below) for which rate-limiting shall apply */
+ struct hashtable *ht; /* our hashtable for rate-limiting */
+ sbool bParseHost; /* should parser parse host name? read-only after startup */
+ sbool bCreatePath; /* auto-creation of socket directory? */
+ sbool bUseCreds; /* pull original creator credentials from socket */
+ sbool bWritePid; /* write original PID into tag */
+} lstn_t;
+static lstn_t listeners[MAXFUNIX];
static prop_t *pLocalHostIP = NULL; /* there is only one global IP for all internally-generated messages */
static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */
-static int startIndexUxLocalSockets; /* process funix from that index on (used to
+static int startIndexUxLocalSockets; /* process fd from that index on (used to
* suppress local logging. rgerhards 2005-08-01
* read-only after startup
*/
-static int funixParseHost[MAXFUNIX] = { 0, }; /* should parser parse host name? read-only after startup */
-static int funixFlags[MAXFUNIX] = { IGNDATE, }; /* should parser parse host name? read-only after startup */
-static int funixCreateSockPath[MAXFUNIX] = { 0, }; /* auto-creation of socket directory? */
-static uchar *funixn[MAXFUNIX] = { (uchar*) _PATH_LOG }; /* read-only after startup */
-static prop_t *funixHName[MAXFUNIX] = { NULL, }; /* host-name override - if set, use this instead of actual name */
-static int funixFlowCtl[MAXFUNIX] = { eFLOWCTL_NO_DELAY, }; /* flow control settings for this socket */
-static int funix[MAXFUNIX] = { -1, }; /* read-only after startup */
-static int nfunix = 1; /* number of Unix sockets open / read-only after startup */
+static int nfd = 1; /* number of Unix sockets open / read-only after startup */
+static int bSysSockFromSystemd = 0; /* Did we receive the system socket from systemd? */
/* config settings */
static int bOmitLocalLogging = 0;
static uchar *pLogSockName = NULL;
static uchar *pLogHostName = NULL; /* host name to use with this socket */
static int bUseFlowCtl = 0; /* use flow control or not (if yes, only LIGHT is used! */
-static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */
-#define DFLT_bCreateSockPath 0
-static int bCreateSockPath = DFLT_bCreateSockPath; /* auto-create socket path? */
+static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */
+static int bWritePid = 0; /* use credentials from recvmsg() and fixup PID in TAG */
+static int bWritePidSysSock = 0; /* use credentials from recvmsg() and fixup PID in TAG */
+#define DFLT_bCreatePath 0
+static int bCreatePath = DFLT_bCreatePath; /* auto-create socket path? */
+#define DFLT_ratelimitInterval 5
+static int ratelimitInterval = DFLT_ratelimitInterval; /* interval in seconds, 0 = off */
+static int ratelimitIntervalSysSock = DFLT_ratelimitInterval;
+#define DFLT_ratelimitBurst 200
+static int ratelimitBurst = DFLT_ratelimitBurst; /* max nbr of messages in interval */
+static int ratelimitBurstSysSock = DFLT_ratelimitBurst; /* max nbr of messages in interval */
+#define DFLT_ratelimitSeverity 1 /* do not rate-limit emergency messages */
+static int ratelimitSeverity = DFLT_ratelimitSeverity;
+static int ratelimitSeveritySysSock = DFLT_ratelimitSeverity;
+
+
+
+static void
+initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsigned short burst)
+{
+ rs->interval = interval;
+ rs->burst = burst;
+ rs->done = 0;
+ rs->missed = 0;
+ rs->begin = 0;
+}
+
+
+/* ratelimiting support, modelled after the linux kernel
+ * returns 1 if message is within rate limit and shall be
+ * processed, 0 otherwise.
+ * This implementation is NOT THREAD-SAFE and must not
+ * be called concurrently.
+ */
+static inline int
+withinRatelimit(struct rs_ratelimit_state *rs, time_t tt, pid_t pid)
+{
+ int ret;
+ uchar msgbuf[1024];
+
+ if(rs->interval == 0) {
+ ret = 1;
+ goto finalize_it;
+ }
+
+ assert(rs->burst != 0);
+
+ if(rs->begin == 0)
+ rs->begin = tt;
+
+ /* resume if we go out of out time window */
+ if(tt > rs->begin + rs->interval) {
+ if(rs->missed) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "imuxsock lost %u messages from pid %lu due to rate-limiting",
+ rs->missed, (unsigned long) pid);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ rs->missed = 0;
+ }
+ rs->begin = 0;
+ rs->done = 0;
+ }
+
+ /* do actual limit check */
+ if(rs->burst > rs->done) {
+ rs->done++;
+ ret = 1;
+ } else {
+ if(rs->missed == 0) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "imuxsock begins to drop messages from pid %lu due to rate-limiting",
+ (unsigned long) pid);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ }
+ rs->missed++;
+ ret = 0;
+ }
+
+finalize_it:
+ return ret;
+}
/* set the timestamp ignore / not ignore option for the system
@@ -105,7 +236,7 @@ static int bCreateSockPath = DFLT_bCreateSockPath; /* auto-create socket path? *
static rsRetVal setSystemLogTimestampIgnore(void __attribute__((unused)) *pVal, int iNewVal)
{
DEFiRet;
- funixFlags[0] = iNewVal ? IGNDATE : NOFLAG;
+ listeners[0].flags = iNewVal ? IGNDATE : NOFLAG;
RETiRet;
}
@@ -114,7 +245,7 @@ static rsRetVal setSystemLogTimestampIgnore(void __attribute__((unused)) *pVal,
static rsRetVal setSystemLogFlowControl(void __attribute__((unused)) *pVal, int iNewVal)
{
DEFiRet;
- funixFlowCtl[0] = iNewVal ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY;
+ listeners[0].flowCtl = iNewVal ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY;
RETiRet;
}
@@ -131,26 +262,40 @@ addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal)
{
DEFiRet;
- if(nfunix < MAXFUNIX) {
+ if(nfd < MAXFUNIX) {
if(*pNewVal == ':') {
- funixParseHost[nfunix] = 1;
+ listeners[nfd].bParseHost = 1;
} else {
- funixParseHost[nfunix] = 0;
+ listeners[nfd].bParseHost = 0;
}
- CHKiRet(prop.Construct(&(funixHName[nfunix])));
+ CHKiRet(prop.Construct(&(listeners[nfd].hostName)));
if(pLogHostName == NULL) {
- CHKiRet(prop.SetString(funixHName[nfunix], glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())));
+ CHKiRet(prop.SetString(listeners[nfd].hostName, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())));
} else {
- CHKiRet(prop.SetString(funixHName[nfunix], pLogHostName, ustrlen(pLogHostName)));
+ CHKiRet(prop.SetString(listeners[nfd].hostName, pLogHostName, ustrlen(pLogHostName)));
/* reset hostname for next socket */
free(pLogHostName);
pLogHostName = NULL;
}
- CHKiRet(prop.ConstructFinalize(funixHName[nfunix]));
- funixFlowCtl[nfunix] = bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY;
- funixFlags[nfunix] = bIgnoreTimestamp ? IGNDATE : NOFLAG;
- funixCreateSockPath[nfunix] = bCreateSockPath;
- funixn[nfunix++] = pNewVal;
+ CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName));
+ if(ratelimitInterval > 0) {
+ if((listeners[nfd].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn, NULL)) == NULL) {
+ /* in this case, we simply turn of rate-limiting */
+ dbgprintf("imuxsock: turning off rate limiting because we could not "
+ "create hash table\n");
+ ratelimitInterval = 0;
+ }
+ }
+ listeners[nfd].ratelimitInterval = ratelimitInterval;
+ listeners[nfd].ratelimitBurst = ratelimitBurst;
+ listeners[nfd].ratelimitSev = ratelimitSeverity;
+ listeners[nfd].flowCtl = bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY;
+ listeners[nfd].flags = bIgnoreTimestamp ? IGNDATE : NOFLAG;
+ listeners[nfd].bCreatePath = bCreatePath;
+ listeners[nfd].sockName = pNewVal;
+ listeners[nfd].bUseCreds = (bWritePid || ratelimitInterval) ? 1 : 0;
+ listeners[nfd].bWritePid = bWritePid;
+ nfd++;
} else {
errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n",
pNewVal);
@@ -161,21 +306,23 @@ finalize_it:
}
-/* free the funixn[] socket names - needed as cleanup on several places
- * note that nfunix is NOT reset! funixn[0] is never freed, as it comes from
+/* discard all log sockets except for "socket" 0. Data for it comes from
* the constant memory pool - and if not, it is freeed via some other pointer.
*/
-static rsRetVal discardFunixn(void)
+static rsRetVal discardLogSockets(void)
{
int i;
- for (i = 1; i < nfunix; i++) {
- if(funixn[i] != NULL) {
- free(funixn[i]);
- funixn[i] = NULL;
+ for (i = 1; i < nfd; i++) {
+ if(listeners[i].sockName != NULL) {
+ free(listeners[i].sockName);
+ listeners[i].sockName = NULL;
}
- if(funixHName[i] != NULL) {
- prop.Destruct(&(funixHName[i]));
+ if(listeners[i].hostName != NULL) {
+ prop.Destruct(&(listeners[i].hostName));
+ }
+ if(listeners[i].ht != NULL) {
+ hashtable_destroy(listeners[i].ht, 1); /* 1 => free all values automatically */
}
}
@@ -183,92 +330,256 @@ static rsRetVal discardFunixn(void)
}
-static int create_unix_socket(const char *path, int bCreatePath)
+/* used to create a log socket if NOT passed in via systemd.
+ */
+static inline rsRetVal
+createLogSocket(lstn_t *pLstn)
{
struct sockaddr_un sunx;
- int fd;
+ DEFiRet;
- if (path[0] == '\0')
+ unlink((char*)pLstn->sockName);
+ memset(&sunx, 0, sizeof(sunx));
+ sunx.sun_family = AF_UNIX;
+ if(pLstn->bCreatePath) {
+ makeFileParentDirs((uchar*)pLstn->sockName, ustrlen(pLstn->sockName), 0755, -1, -1, 0);
+ }
+ strncpy(sunx.sun_path, (char*)pLstn->sockName, sizeof(sunx.sun_path));
+ pLstn->fd = socket(AF_UNIX, SOCK_DGRAM, 0);
+ if(pLstn->fd < 0 || bind(pLstn->fd, (struct sockaddr *) &sunx, SUN_LEN(&sunx)) < 0 ||
+ chmod((char*)pLstn->sockName, 0666) < 0) {
+ errmsg.LogError(errno, NO_ERRCODE, "connot create '%s'", pLstn->sockName);
+ dbgprintf("cannot create %s (%d).\n", pLstn->sockName, errno);
+ close(pLstn->fd);
+ pLstn->fd = -1;
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
+ }
+finalize_it:
+ RETiRet;
+}
+
+
+static inline rsRetVal
+openLogSocket(lstn_t *pLstn)
+{
+ DEFiRet;
+ int one;
+
+ if(pLstn->sockName[0] == '\0')
return -1;
- if (strcmp(path, _PATH_LOG) == 0) {
+ if (ustrcmp(pLstn->sockName, UCHAR_CONSTANT(_PATH_LOG)) == 0) {
+ bSysSockFromSystemd = 0; /* set default */
int r;
- /* Check whether an FD was passed in from systemd. If
+ /* System log socket code. Check whether an FD was passed in from systemd. If
* so, it's the /dev/log socket, so use it. */
r = sd_listen_fds(0);
if (r < 0) {
errmsg.LogError(-r, NO_ERRCODE, "Failed to acquire systemd socket");
- return -1;
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
}
if (r > 1) {
errmsg.LogError(EINVAL, NO_ERRCODE, "Wrong number of systemd sockets passed");
- return -1;
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
}
if (r == 1) {
- fd = SD_LISTEN_FDS_START;
- r = sd_is_socket_unix(fd, SOCK_DGRAM, -1, _PATH_LOG, 0);
+ pLstn->fd = SD_LISTEN_FDS_START;
+ r = sd_is_socket_unix(pLstn->fd, SOCK_DGRAM, -1, _PATH_LOG, 0);
if (r < 0) {
errmsg.LogError(-r, NO_ERRCODE, "Failed to verify systemd socket type");
- return -1;
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
}
if (!r) {
errmsg.LogError(EINVAL, NO_ERRCODE, "Passed systemd socket of wrong type");
- return -1;
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
}
+ bSysSockFromSystemd = 1; /* indicate we got the socket from systemd */
+ } else {
+ CHKiRet(createLogSocket(pLstn));
+ }
+ } else {
+ CHKiRet(createLogSocket(pLstn));
+ }
- return fd;
- }
- }
-
- unlink(path);
+ if(pLstn->bUseCreds) {
+ one = 1;
+ if(setsockopt(pLstn->fd, SOL_SOCKET, SO_PASSCRED, &one, (socklen_t) sizeof(one)) != 0) {
+ errmsg.LogError(errno, NO_ERRCODE, "set SO_PASSCRED failed on '%s'", pLstn->sockName);
+ pLstn->bUseCreds = 0;
+ }
+ if(setsockopt(pLstn->fd, SOL_SOCKET, SCM_CREDENTIALS, &one, sizeof(one)) != 0) {
+ errmsg.LogError(errno, NO_ERRCODE, "set SCM_CREDENTIALS failed on '%s'", pLstn->sockName);
+ pLstn->bUseCreds = 0;
+ }
+ }
- memset(&sunx, 0, sizeof(sunx));
- sunx.sun_family = AF_UNIX;
- if(bCreatePath) {
- makeFileParentDirs((uchar*)path, strlen(path), 0755, -1, -1, 0);
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ close(pLstn->fd);
+ pLstn->fd = -1;
}
- (void) strncpy(sunx.sun_path, path, sizeof(sunx.sun_path));
- fd = socket(AF_UNIX, SOCK_DGRAM, 0);
- if (fd < 0 || bind(fd, (struct sockaddr *) &sunx, SUN_LEN(&sunx)) < 0 ||
- chmod(path, 0666) < 0) {
- errmsg.LogError(errno, NO_ERRCODE, "connot create '%s'", path);
- dbgprintf("cannot create %s (%d).\n", path, errno);
- close(fd);
- return -1;
+
+ RETiRet;
+}
+
+
+/* find ratelimiter to use for this message. Currently, we use the
+ * pid, but may change to cgroup later (probably via a config switch).
+ * Returns NULL if not found.
+ */
+static inline rsRetVal
+findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
+{
+ rs_ratelimit_state_t *rl;
+ int r;
+ pid_t *keybuf;
+ DEFiRet;
+
+ if(cred == NULL)
+ FINALIZE;
+
+ rl = hashtable_search(pLstn->ht, &cred->pid);
+ if(rl == NULL) {
+ /* we need to add a new ratelimiter, process not seen before! */
+ dbgprintf("imuxsock: no ratelimiter for pid %lu, creating one\n",
+ (unsigned long) cred->pid);
+ STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters);
+ CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t)));
+ CHKmalloc(keybuf = malloc(sizeof(pid_t)));
+ *keybuf = cred->pid;
+ initRatelimitState(rl, ratelimitInterval, pLstn->ratelimitBurst);
+ r = hashtable_insert(pLstn->ht, keybuf, rl);
+ if(r == 0)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
- return fd;
+
+ *prl = rl;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* patch correct pid into tag. bufTAG MUST be CONF_TAG_MAXSIZE long!
+ */
+static inline void
+fixPID(uchar *bufTAG, int *lenTag, struct ucred *cred)
+{
+ int i;
+ char bufPID[16];
+ int lenPID;
+
+ if(cred == NULL)
+ return;
+
+ lenPID = snprintf(bufPID, sizeof(bufPID), "[%lu]:", (unsigned long) cred->pid);
+
+ for(i = *lenTag ; i >= 0 && bufTAG[i] != '[' ; --i)
+ /*JUST SKIP*/;
+
+ if(i < 0)
+ i = *lenTag - 1; /* go right at end of TAG, pid was not present (-1 for ':') */
+
+ if(i + lenPID > CONF_TAG_MAXSIZE)
+ return; /* do not touch, as things would break */
+
+ memcpy(bufTAG + i, bufPID, lenPID);
+ *lenTag = i + lenPID;
}
/* submit received message to the queue engine
+ * We now parse the message according to expected format so that we
+ * can also mangle it if necessary.
*/
static inline rsRetVal
-SubmitMsg(uchar *pRcv, int lenRcv, int iSock)
+SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
{
msg_t *pMsg;
+ int lenMsg;
+ int i;
+ uchar *parse;
+ int pri;
+ int facil;
+ int sever;
+ uchar bufParseTAG[CONF_TAG_MAXSIZE];
+ struct syslogTime st;
+ time_t tt;
+ rs_ratelimit_state_t *ratelimiter = NULL;
DEFiRet;
+// TODO: handle format errors??
+ /* we need to parse the pri first, because we need the severity for
+ * rate-limiting as well.
+ */
+ parse = pRcv;
+ lenMsg = lenRcv;
+
+ parse++; lenMsg--; /* '<' */
+ pri = 0;
+ while(lenMsg && isdigit(*parse)) {
+ pri = pri * 10 + *parse - '0';
+ ++parse;
+ --lenMsg;
+ }
+ facil = LOG_FAC(pri);
+ sever = LOG_PRI(pri);
+
+ if(sever >= pLstn->ratelimitSev)
+ findRatelimiter(pLstn, cred, &ratelimiter); /* ignore error, better so than others... */
+
+ datetime.getCurrTime(&st, &tt);
+ if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) {
+ STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit);
+ FINALIZE;
+ }
+
/* we now create our own message object and submit it to the queue */
- CHKiRet(msgConstruct(&pMsg));
+ CHKiRet(msgConstructWithTime(&pMsg, &st, tt));
MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv);
MsgSetInputName(pMsg, pInputName);
- MsgSetFlowControlType(pMsg, funixFlowCtl[iSock]);
+ MsgSetFlowControlType(pMsg, pLstn->flowCtl);
+
+ pMsg->iFacility = facil;
+ pMsg->iSeverity = sever;
+ MsgSetAfterPRIOffs(pMsg, lenRcv - lenMsg);
+
+ parse++; lenMsg--; /* '>' */
+
+ if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) {
+ DBGPRINTF("we have a problem, invalid timestamp in msg!\n");
+ }
+
+ /* pull tag */
+
+ i = 0;
+ while(lenMsg > 0 && *parse != ' ' && i < CONF_TAG_MAXSIZE) {
+ bufParseTAG[i++] = *parse++;
+ --lenMsg;
+ }
+ bufParseTAG[i] = '\0'; /* terminate string */
+ if(pLstn->bWritePid)
+ fixPID(bufParseTAG, &i, cred);
+ MsgSetTAG(pMsg, bufParseTAG, i);
- if(funixParseHost[iSock]) {
- pMsg->msgFlags = funixFlags[iSock] | NEEDS_PARSING | PARSE_HOSTNAME;
+ MsgSetMSGoffs(pMsg, lenRcv - lenMsg);
+
+ if(pLstn->bParseHost) {
+ pMsg->msgFlags = pLstn->flags | PARSE_HOSTNAME;
} else {
- pMsg->msgFlags = funixFlags[iSock] | NEEDS_PARSING;
+ pMsg->msgFlags = pLstn->flags;
}
- MsgSetRcvFrom(pMsg, funixHName[iSock]);
+ MsgSetRcvFrom(pMsg, pLstn->hostName);
CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP));
CHKiRet(submitMsg(pMsg));
+ STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit);
finalize_it:
RETiRet;
}
@@ -281,15 +592,20 @@ finalize_it:
* of the socket which is to be processed. This eases access to the
* growing number of properties. -- rgerhards, 2008-08-01
*/
-static rsRetVal readSocket(int fd, int iSock)
+static rsRetVal readSocket(lstn_t *pLstn)
{
DEFiRet;
int iRcvd;
int iMaxLine;
+ struct msghdr msgh;
+ struct iovec msgiov;
+ struct cmsghdr *cm;
+ struct ucred *cred;
uchar bufRcv[4096+1];
+ char aux[128];
uchar *pRcv = NULL; /* receive buffer */
- assert(iSock >= 0);
+ assert(pLstn->fd >= 0);
iMaxLine = glbl.GetMaxLine();
@@ -305,15 +621,40 @@ static rsRetVal readSocket(int fd, int iSock)
CHKmalloc(pRcv = (uchar*) MALLOC(sizeof(uchar) * (iMaxLine + 1)));
}
- iRcvd = recv(fd, pRcv, iMaxLine, 0);
- dbgprintf("Message from UNIX socket: #%d\n", fd);
- if (iRcvd > 0) {
- CHKiRet(SubmitMsg(pRcv, iRcvd, iSock));
- } else if (iRcvd < 0 && errno != EINTR) {
+ memset(&msgh, 0, sizeof(msgh));
+ memset(&msgiov, 0, sizeof(msgiov));
+ if(pLstn->bUseCreds) {
+ memset(&aux, 0, sizeof(aux));
+ msgh.msg_control = aux;
+ msgh.msg_controllen = sizeof(aux);
+ }
+ msgiov.iov_base = pRcv;
+ msgiov.iov_len = iMaxLine;
+ msgh.msg_iov = &msgiov;
+ msgh.msg_iovlen = 1;
+ iRcvd = recvmsg(pLstn->fd, &msgh, MSG_DONTWAIT);
+
+ dbgprintf("Message from UNIX socket: #%d\n", pLstn->fd);
+ if(iRcvd > 0) {
+ cred = NULL;
+ if(pLstn->bUseCreds) {
+ dbgprintf("XXX: pre CM loop, length of control message %d\n", (int) msgh.msg_controllen);
+ for (cm = CMSG_FIRSTHDR(&msgh); cm; cm = CMSG_NXTHDR(&msgh, cm)) {
+ dbgprintf("XXX: in CM loop, %d, %d\n", cm->cmsg_level, cm->cmsg_type);
+ if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) {
+ cred = (struct ucred*) CMSG_DATA(cm);
+ dbgprintf("XXX: got credentials pid %d\n", (int) cred->pid);
+ //break;
+ }
+ }
+ dbgprintf("XXX: post CM loop\n");
+ }
+ CHKiRet(SubmitMsg(pRcv, iRcvd, pLstn, cred));
+ } else if(iRcvd < 0 && errno != EINTR) {
char errStr[1024];
rs_strerror_r(errno, errStr, sizeof(errStr));
dbgprintf("UNIX socket error: %d = %s.\n", errno, errStr);
- errmsg.LogError(errno, NO_ERRCODE, "recvfrom UNIX");
+ errmsg.LogError(errno, NO_ERRCODE, "imuxsock: recvfrom UNIX");
}
finalize_it:
@@ -352,10 +693,11 @@ CODESTARTrunInput
maxfds = 0;
FD_ZERO (pReadfds);
/* Copy master connections */
- for (i = startIndexUxLocalSockets; i < nfunix; i++) {
- if (funix[i] != -1) {
- FD_SET(funix[i], pReadfds);
- if (funix[i]>maxfds) maxfds=funix[i];
+ for (i = startIndexUxLocalSockets; i < nfd; i++) {
+ if (listeners[i].fd!= -1) {
+ FD_SET(listeners[i].fd, pReadfds);
+ if(listeners[i].fd > maxfds)
+ maxfds=listeners[i].fd;
}
}
@@ -372,11 +714,11 @@ CODESTARTrunInput
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
- for (i = 0; i < nfunix && nfds > 0; i++) {
+ for (i = 0; i < nfd && nfds > 0; i++) {
if(glbl.GetGlobalInputTermState() == 1)
ABORT_FINALIZE(RS_RET_FORCE_TERM); /* terminate input! */
- if ((fd = funix[i]) != -1 && FD_ISSET(fd, pReadfds)) {
- readSocket(fd, i);
+ if ((fd = listeners[i].fd) != -1 && FD_ISSET(fd, pReadfds)) {
+ readSocket(&(listeners[i]));
--nfds; /* indicate we have processed one */
}
}
@@ -391,6 +733,7 @@ ENDrunInput
BEGINwillRun
CODESTARTwillRun
register int i;
+ int actSocks;
/* first apply some config settings */
# ifdef OS_SOLARIS
@@ -404,12 +747,33 @@ CODESTARTwillRun
startIndexUxLocalSockets = bOmitLocalLogging ? 1 : 0;
# endif
if(pLogSockName != NULL)
- funixn[0] = pLogSockName;
+ listeners[0].sockName = pLogSockName;
+ if(ratelimitIntervalSysSock > 0) {
+ if((listeners[0].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn, NULL)) == NULL) {
+ /* in this case, we simply turn of rate-limiting */
+ dbgprintf("imuxsock: turning off rate limiting because we could not "
+ "create hash table\n");
+ ratelimitIntervalSysSock = 0;
+ }
+ }
+ listeners[0].ratelimitInterval = ratelimitIntervalSysSock;
+ listeners[0].ratelimitBurst = ratelimitBurstSysSock;
+ listeners[0].ratelimitSev = ratelimitSeveritySysSock;
+ listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock) ? 1 : 0;
+ listeners[0].bWritePid = bWritePidSysSock;
/* initialize and return if will run or not */
- for (i = startIndexUxLocalSockets ; i < nfunix ; i++) {
- if ((funix[i] = create_unix_socket((char*) funixn[i], funixCreateSockPath[i])) != -1)
- dbgprintf("Opened UNIX socket '%s' (fd %d).\n", funixn[i], funix[i]);
+ actSocks = 0;
+ for (i = startIndexUxLocalSockets ; i < nfd ; i++) {
+ if(openLogSocket(&(listeners[i])) == RS_RET_OK) {
+ ++actSocks;
+ dbgprintf("Opened UNIX socket '%s' (fd %d).\n", listeners[i].sockName, listeners[i].fd);
+ }
+ }
+
+ if(actSocks == 0) {
+ errmsg.LogError(0, NO_ERRCODE, "imuxsock does not run because we could not aquire any socket\n");
+ ABORT_FINALIZE(RS_RET_ERR);
}
/* we need to create the inputName property (only once during our lifetime) */
@@ -426,36 +790,44 @@ CODESTARTafterRun
int i;
/* do cleanup here */
/* Close the UNIX sockets. */
- for (i = 0; i < nfunix; i++)
- if (funix[i] != -1)
- close(funix[i]);
+ for (i = 0; i < nfd; i++)
+ if (listeners[i].fd != -1)
+ close(listeners[i].fd);
/* Clean-up files. If systemd passed us a socket it is
* systemd's job to clean it up.*/
- if (sd_listen_fds(0) > 0)
+ if(bSysSockFromSystemd) {
+ DBGPRINTF("imuxsock: got system socket from systemd, not unlinking it\n");
i = 1;
- else
+ } else
i = startIndexUxLocalSockets;
- for(; i < nfunix; i++)
- if (funixn[i] && funix[i] != -1)
- unlink((char*) funixn[i]);
+ for(; i < nfd; i++)
+ if (listeners[i].sockName && listeners[i].fd != -1) {
+ DBGPRINTF("imuxsock: unlinking unix socket file[%d] %s\n", i, listeners[i].sockName);
+ unlink((char*) listeners[i].sockName);
+ }
/* free no longer needed string */
free(pLogSockName);
free(pLogHostName);
- discardFunixn();
- nfunix = 1;
+ discardLogSockets();
+ nfd = 1;
if(pInputName != NULL)
prop.Destruct(&pInputName);
+
ENDafterRun
BEGINmodExit
CODESTARTmodExit
+ statsobj.Destruct(&modStats);
+
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
+ objRelease(datetime, CORE_COMPONENT);
ENDmodExit
@@ -484,11 +856,19 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
pLogHostName = NULL;
}
- discardFunixn();
- nfunix = 1;
+ discardLogSockets();
+ nfd = 1;
bIgnoreTimestamp = 1;
bUseFlowCtl = 0;
- bCreateSockPath = DFLT_bCreateSockPath;
+ bWritePid = 0;
+ bWritePidSysSock = 0;
+ bCreatePath = DFLT_bCreatePath;
+ ratelimitInterval = DFLT_ratelimitInterval;
+ ratelimitIntervalSysSock = DFLT_ratelimitInterval;
+ ratelimitBurst = DFLT_ratelimitBurst;
+ ratelimitBurstSysSock = DFLT_ratelimitBurst;
+ ratelimitSeverity = DFLT_ratelimitSeverity;
+ ratelimitSeveritySysSock = DFLT_ratelimitSeverity;
return RS_RET_OK;
}
@@ -502,13 +882,25 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
dbgprintf("imuxsock version %s initializing\n", PACKAGE_VERSION);
- /* initialize funixn[] array */
+ /* init system log socket settings */
+ listeners[0].flags = IGNDATE;
+ listeners[0].sockName = UCHAR_CONSTANT(_PATH_LOG);
+ listeners[0].hostName = NULL;
+ listeners[0].flowCtl = eFLOWCTL_NO_DELAY;
+ listeners[0].fd = -1;
+ listeners[0].bParseHost = 0;
+ listeners[0].bUseCreds = 0;
+ listeners[0].bCreatePath = 0;
+
+ /* initialize socket names */
for(i = 1 ; i < MAXFUNIX ; ++i) {
- funixn[i] = NULL;
- funix[i] = -1;
+ listeners[i].sockName = NULL;
+ listeners[i].fd = -1;
}
CHKiRet(prop.Construct(&pLocalHostIP));
@@ -516,9 +908,9 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(prop.ConstructFinalize(pLocalHostIP));
/* now init listen socket zero, the local log socket */
- CHKiRet(prop.Construct(&(funixHName[0])));
- CHKiRet(prop.SetString(funixHName[0], glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())));
- CHKiRet(prop.ConstructFinalize(funixHName[0]));
+ CHKiRet(prop.Construct(&(listeners[0].hostName)));
+ CHKiRet(prop.SetString(listeners[0].hostName, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())));
+ CHKiRet(prop.ConstructFinalize(listeners[0].hostName));
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr((uchar *)"omitlocallogging", 0, eCmdHdlrBinary,
@@ -532,9 +924,17 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketflowcontrol", 0, eCmdHdlrBinary,
NULL, &bUseFlowCtl, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketcreatepath", 0, eCmdHdlrBinary,
- NULL, &bCreateSockPath, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ NULL, &bCreatePath, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"addunixlistensocket", 0, eCmdHdlrGetWord,
addLstnSocketName, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusepidfromsystem", 0, eCmdHdlrBinary,
+ NULL, &bWritePid, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitinterval", 0, eCmdHdlrInt,
+ NULL, &ratelimitInterval, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitburst", 0, eCmdHdlrInt,
+ NULL, &ratelimitBurst, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitseverity", 0, eCmdHdlrInt,
+ NULL, &ratelimitSeverity, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
/* the following one is a (dirty) trick: the system log socket is not added via
@@ -547,6 +947,26 @@ CODEmodInit_QueryRegCFSLineHdlr
setSystemLogTimestampIgnore, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary,
setSystemLogFlowControl, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary,
+ NULL, &bWritePidSysSock, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitinterval", 0, eCmdHdlrInt,
+ NULL, &ratelimitIntervalSysSock, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitburst", 0, eCmdHdlrInt,
+ NULL, &ratelimitBurstSysSock, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitseverity", 0, eCmdHdlrInt,
+ NULL, &ratelimitSeveritySysSock, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&modStats));
+ CHKiRet(statsobj.SetName(modStats, UCHAR_CONSTANT("imuxsock")));
+ CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, &ctrSubmit));
+ CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("ratelimit.discarded"),
+ ctrType_IntCtr, &ctrLostRatelimit));
+ CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("ratelimit.numratelimiters"),
+ ctrType_IntCtr, &ctrNumRatelimiters));
+ CHKiRet(statsobj.ConstructFinalize(modStats));
+
ENDmodInit
/* vim:set ai:
*/
diff --git a/plugins/omhdfs/Makefile.am b/plugins/omhdfs/Makefile.am
new file mode 100644
index 00000000..95e6b102
--- /dev/null
+++ b/plugins/omhdfs/Makefile.am
@@ -0,0 +1,6 @@
+pkglib_LTLIBRARIES = omhdfs.la
+
+omhdfs_la_SOURCES = omhdfs.c
+omhdfs_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) $(JAVA_INCLUDES)
+omhdfs_la_LDFLAGS = -module -avoid-version -lhdfs $(JAVA_LIBS)
+omhdfs_la_LIBADD = $(RSRT_LIBS)
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
new file mode 100644
index 00000000..25c330f9
--- /dev/null
+++ b/plugins/omhdfs/omhdfs.c
@@ -0,0 +1,475 @@
+/* omhdfs.c
+ * This is an output module to support Hadoop's HDFS.
+ *
+ * NOTE: read comments in module-template.h to understand how this file
+ * works!
+ *
+ * Copyright 2010 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <assert.h>
+#include <errno.h>
+#include <ctype.h>
+#include <unistd.h>
+#include <sys/file.h>
+#include <pthread.h>
+#include <hdfs.h>
+
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "conf.h"
+#include "cfsysline.h"
+#include "module-template.h"
+#include "unicode-helper.h"
+#include "errmsg.h"
+#include "hashtable.h"
+#include "hashtable_itr.h"
+
+MODULE_TYPE_OUTPUT
+
+/* internal structures
+ */
+DEF_OMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+
+/* global data */
+static struct hashtable *files; /* holds all file objects that we know */
+
+/* globals for default values */
+static uchar *fileName = NULL;
+static uchar *hdfsHost = NULL;
+static uchar *dfltTplName = NULL; /* default template name to use */
+int hdfsPort = 0;
+/* end globals for default values */
+
+typedef struct {
+ uchar *name;
+ hdfsFS fs;
+ hdfsFile fh;
+ const char *hdfsHost;
+ tPort hdfsPort;
+ int nUsers;
+ pthread_mutex_t mut;
+} file_t;
+
+
+typedef struct _instanceData {
+ file_t *pFile;
+} instanceData;
+
+/* forward definitions (down here, need data types) */
+static inline rsRetVal fileClose(file_t *pFile);
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURERepeatedMsgReduction)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ printf("omhdfs: file:%s", pData->pFile->name);
+ENDdbgPrintInstInfo
+
+
+/* note that hdfsFileExists() does not work, so we did our
+ * own function to see if a pathname exists. Returns 0 if the
+ * file does not exists, something else otherwise. Note that
+ * we can also check a directroy (if that matters...)
+ */
+static int
+HDFSFileExists(hdfsFS fs, uchar *name)
+{
+ int r;
+ hdfsFileInfo *info;
+
+ info = hdfsGetPathInfo(fs, (char*)name);
+ /* if things go wrong, we assume it is because the file
+ * does not exist. We do not get too much information...
+ */
+ if(info == NULL) {
+ r = 0;
+ } else {
+ r = 1;
+ hdfsFreeFileInfo(info, 1);
+ }
+ return r;
+}
+
+static inline rsRetVal
+HDFSmkdir(hdfsFS fs, uchar *name)
+{
+ DEFiRet;
+ if(hdfsCreateDirectory(fs, (char*)name) == -1)
+ ABORT_FINALIZE(RS_RET_ERR);
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* ---BEGIN FILE OBJECT---------------------------------------------------- */
+/* This code handles the "file object". This is split from the actual
+ * instance data, because several instances may write into the same file.
+ * If so, we need to use a single object, and also synchronize their writes.
+ * So we keep the file object separately, and just stick a reference into
+ * the instance data.
+ */
+
+static inline rsRetVal
+fileObjConstruct(file_t **ppFile)
+{
+ file_t *pFile;
+ DEFiRet;
+
+ CHKmalloc(pFile = malloc(sizeof(file_t)));
+ pFile->name = NULL;
+ pFile->hdfsHost = NULL;
+ pFile->fh = NULL;
+ pFile->nUsers = 0;
+
+ *ppFile = pFile;
+finalize_it:
+ RETiRet;
+}
+
+static inline void
+fileObjAddUser(file_t *pFile)
+{
+ /* init mutex only when second user is added */
+ ++pFile->nUsers;
+ if(pFile->nUsers == 2)
+ pthread_mutex_init(&pFile->mut, NULL);
+ DBGPRINTF("omhdfs: file %s now being used by %d actions\n", pFile->name, pFile->nUsers);
+}
+
+static rsRetVal
+fileObjDestruct(file_t **ppFile)
+{
+ file_t *pFile = *ppFile;
+ if(pFile->nUsers > 1)
+ pthread_mutex_destroy(&pFile->mut);
+ fileClose(pFile);
+ free(pFile->name);
+ free((char*)pFile->hdfsHost);
+ free(pFile->fh);
+
+ return RS_RET_OK;
+}
+
+
+/* check, and potentially create, all names inside a path */
+static rsRetVal
+filePrepare(file_t *pFile)
+{
+ uchar *p;
+ uchar *pszWork;
+ size_t len;
+ DEFiRet;
+
+ if(HDFSFileExists(pFile->fs, pFile->name))
+ FINALIZE;
+
+ /* file does not exist, create it (and eventually parent directories */
+ if(1) { // check if bCreateDirs
+ len = ustrlen(pFile->name) + 1;
+ CHKmalloc(pszWork = MALLOC(sizeof(uchar) * len));
+ memcpy(pszWork, pFile->name, len);
+ for(p = pszWork+1 ; *p ; p++)
+ if(*p == '/') {
+ /* temporarily terminate string, create dir and go on */
+ *p = '\0';
+ if(!HDFSFileExists(pFile->fs, pszWork)) {
+ CHKiRet(HDFSmkdir(pFile->fs, pszWork));
+ }
+ *p = '/';
+ }
+ free(pszWork);
+ return 0;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* this function is to be used as destructor for the
+ * hash table code.
+ */
+static void
+fileObjDestruct4Hashtable(void *ptr)
+{
+ file_t *pFile = (file_t*) ptr;
+ fileObjDestruct(&pFile);
+}
+
+
+static inline rsRetVal
+fileOpen(file_t *pFile)
+{
+ DEFiRet;
+
+ assert(pFile->fh == NULL);
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_lock(&pFile->mut);
+
+ DBGPRINTF("omhdfs: try to connect to HDFS at host '%s', port %d\n",
+ pFile->hdfsHost, pFile->hdfsPort);
+ pFile->fs = hdfsConnect(pFile->hdfsHost, pFile->hdfsPort);
+ if(pFile->fs == NULL) {
+ DBGPRINTF("omhdfs: error can not connect to hdfs\n");
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+ CHKiRet(filePrepare(pFile));
+
+ pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_APPEND, 0, 0, 0);
+ if(pFile->fh == NULL) {
+ /* maybe the file does not exist, so we try to create it now.
+ * Note that we can not use hdfsExists() because of a deficit in
+ * it: https://issues.apache.org/jira/browse/HDFS-1154
+ * As of my testing, libhdfs at least seems to return ENOENT if
+ * the file does not exist.
+ */
+ if(errno == ENOENT) {
+ DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n",
+ pFile->name);
+ pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0);
+ }
+ }
+ if(pFile->fh == NULL) {
+ DBGPRINTF("omhdfs: failed to open %s for writing!\n", pFile->name);
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+finalize_it:
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_unlock(&pFile->mut);
+ RETiRet;
+}
+
+
+static inline rsRetVal
+fileWrite(file_t *pFile, uchar *buf)
+{
+ size_t lenWrite;
+ DEFiRet;
+
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_lock(&pFile->mut);
+
+ /* open file if not open. This must be done *here* and while mutex-protected
+ * because of HUP handling (which is async to normal processing!).
+ */
+ if(pFile->fh == NULL) {
+ fileOpen(pFile);
+ if(pFile->fh == NULL) {
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ }
+
+ lenWrite = strlen((char*) buf);
+ tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite);
+ if((unsigned) num_written_bytes != lenWrite) {
+ errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, "
+ "written %lu\n", pFile->name, (unsigned long) lenWrite,
+ (unsigned long) num_written_bytes);
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+finalize_it:
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_unlock(&pFile->mut);
+ RETiRet;
+}
+
+
+static inline rsRetVal
+fileClose(file_t *pFile)
+{
+ DEFiRet;
+
+ if(pFile->fh == NULL)
+ FINALIZE;
+
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_lock(&pFile->mut);
+
+ hdfsCloseFile(pFile->fs, pFile->fh);
+ pFile->fh = NULL;
+
+ if(pFile->nUsers > 1)
+ d_pthread_mutex_unlock(&pFile->mut);
+
+finalize_it:
+ RETiRet;
+}
+
+/* ---END FILE OBJECT---------------------------------------------------- */
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ pData->pFile = NULL;
+ENDcreateInstance
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ if(pData->pFile != NULL)
+ fileObjDestruct(&pData->pFile);
+ENDfreeInstance
+
+
+BEGINtryResume
+CODESTARTtryResume
+ fileClose(pData->pFile);
+ fileOpen(pData->pFile);
+ if(pData->pFile->fh == NULL){
+ dbgprintf("omhdfs: tried to resume file %s, but still no luck...\n",
+ pData->pFile->name);
+ iRet = RS_RET_SUSPENDED;
+ }
+ENDtryResume
+
+BEGINdoAction
+CODESTARTdoAction
+ DBGPRINTF("omuxsock: action to to write to %s\n", pData->pFile->name);
+ iRet = fileWrite(pData->pFile, ppString[0]);
+ENDdoAction
+
+
+BEGINparseSelectorAct
+ file_t *pFile;
+ int r;
+ uchar *keybuf;
+CODESTARTparseSelectorAct
+
+ /* first check if this config line is actually for us */
+ if(strncmp((char*) p, ":omhdfs:", sizeof(":omhdfs:") - 1)) {
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+ }
+
+ /* ok, if we reach this point, we have something for us */
+ p += sizeof(":omhdfs:") - 1; /* eat indicator sequence (-1 because of '\0'!) */
+ CHKiRet(createInstance(&pData));
+ CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, 0,
+ (dfltTplName == NULL) ? (uchar*)"RSYSLOG_FileFormat" : dfltTplName));
+
+ if(fileName == NULL) {
+ errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: no file name specified, can not continue");
+ ABORT_FINALIZE(RS_RET_FILE_NOT_SPECIFIED);
+ }
+
+ pFile = hashtable_search(files, fileName);
+ if(pFile == NULL) {
+ /* we need a new file object, this one not seen before */
+ CHKiRet(fileObjConstruct(&pFile));
+ CHKmalloc(pFile->name = fileName);
+ CHKmalloc(keybuf = ustrdup(fileName));
+ fileName = NULL; /* re-set, data passed to file object */
+ CHKmalloc(pFile->hdfsHost = strdup((hdfsHost == NULL) ? "default" : (char*) hdfsHost));
+ pFile->hdfsPort = hdfsPort;
+ fileOpen(pFile);
+ if(pFile->fh == NULL){
+ errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: failed to open %s - "
+ "retrying later", pFile->name);
+ iRet = RS_RET_SUSPENDED;
+ }
+ r = hashtable_insert(files, keybuf, pFile);
+ if(r == 0)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+ fileObjAddUser(pFile);
+ pData->pFile = pFile;
+
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINdoHUP
+ file_t *pFile;
+ struct hashtable_itr *itr;
+CODESTARTdoHUP
+ DBGPRINTF("omhdfs: HUP received (file count %d)\n", hashtable_count(files));
+ /* Iterator constructor only returns a valid iterator if
+ * the hashtable is not empty */
+ itr = hashtable_iterator(files);
+ if(hashtable_count(files) > 0)
+ {
+ do {
+ pFile = (file_t *) hashtable_iterator_value(itr);
+ fileClose(pFile);
+ DBGPRINTF("omhdfs: HUP, closing file %s\n", pFile->name);
+ } while (hashtable_iterator_advance(itr));
+ }
+ENDdoHUP
+
+
+/* Reset config variables for this module to default values.
+ * rgerhards, 2007-07-17
+ */
+static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
+{
+ hdfsHost = NULL;
+ hdfsPort = 0;
+ return RS_RET_OK;
+}
+
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(errmsg, CORE_COMPONENT);
+ if(files != NULL)
+ hashtable_destroy(files, 1); /* 1 => free all values automatically */
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_doHUP
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION;
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKmalloc(files = create_hashtable(20, hash_from_string, key_equals_string,
+ fileObjDestruct4Hashtable));
+
+ CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &fileName, NULL, eConfObjAction));
+ CHKiRet(regCfSysLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &hdfsHost, NULL, eConfObjAction));
+ CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL, eConfObjAction));
+ CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &dfltTplName, NULL, eConfObjAction));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID, eConfObjAction));
+CODEmodInit_QueryRegCFSLineHdlr
+ENDmodInit