diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-10-05 16:10:27 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-10-05 16:10:27 +0200 |
commit | c7e9e2123ee71fb868bbcff59225e009e21af590 (patch) | |
tree | 12ab52bb20cb34c9fcd36816e836a1959440d5fc /plugins | |
parent | 304ab88dd64b2ae46554fa5394aa8822194b1e95 (diff) | |
parent | b5afa1a5be2909ed13f77ef0e0804bab75325198 (diff) | |
download | rsyslog-c7e9e2123ee71fb868bbcff59225e009e21af590.tar.gz rsyslog-c7e9e2123ee71fb868bbcff59225e009e21af590.tar.xz rsyslog-c7e9e2123ee71fb868bbcff59225e009e21af590.zip |
Merge branch 'v5-devel'
Conflicts:
ChangeLog
configure.ac
doc/manual.html
plugins/imuxsock/imuxsock.c
runtime/rsyslog.h
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/impstats/Makefile.am | 6 | ||||
-rw-r--r-- | plugins/impstats/impstats.c | 228 | ||||
-rw-r--r-- | plugins/imuxsock/Makefile.am | 2 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 642 | ||||
-rw-r--r-- | plugins/omhdfs/Makefile.am | 6 | ||||
-rw-r--r-- | plugins/omhdfs/omhdfs.c | 475 |
6 files changed, 1247 insertions, 112 deletions
diff --git a/plugins/impstats/Makefile.am b/plugins/impstats/Makefile.am new file mode 100644 index 00000000..187bbca4 --- /dev/null +++ b/plugins/impstats/Makefile.am @@ -0,0 +1,6 @@ +pkglib_LTLIBRARIES = impstats.la + +impstats_la_SOURCES = impstats.c +impstats_la_CPPFLAGS = $(RSRT_CFLAGS) -I$(top_srcdir) $(PTHREADS_CFLAGS) +impstats_la_LDFLAGS = -module -avoid-version +impstats_la_LIBADD = diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c new file mode 100644 index 00000000..b701b684 --- /dev/null +++ b/plugins/impstats/impstats.c @@ -0,0 +1,228 @@ +/* impstats.c + * A module to periodically output statistics gathered by rsyslog. + * + * NOTE: read comments in module-template.h to understand how this file + * works! + * + * File begun on 2007-07-20 by RGerhards (extracted from syslogd.c) + * This file is under development and has not yet arrived at being fully + * self-contained and a real object. So far, it is mostly an excerpt + * of the "old" message code without any modifications. However, it + * helps to have things at the right place one we go to the meat of it. + * + * Copyright 2010 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdlib.h> +#include <stdio.h> +#include <assert.h> +#include <signal.h> +#include <string.h> +#include <pthread.h> +#include "dirty.h" +#include "cfsysline.h" +#include "module-template.h" +#include "errmsg.h" +#include "msg.h" +#include "srUtils.h" +#include "unicode-helper.h" +#include "glbl.h" +#include "statsobj.h" +#include "prop.h" + +MODULE_TYPE_INPUT + +/* defines */ +#define DEFAULT_STATS_PERIOD (5 * 60) +#define DEFAULT_FACILITY 5 /* syslog */ +#define DEFAULT_SEVERITY 6 /* info */ + +/* Module static data */ +DEF_IMOD_STATIC_DATA +DEFobjCurrIf(glbl) +DEFobjCurrIf(prop) +DEFobjCurrIf(statsobj) +DEFobjCurrIf(errmsg) + +typedef struct configSettings_s { + int iStatsInterval; + int iFacility; + int iSeverity; +} configSettings_t; + +static configSettings_t cs; + +static prop_t *pInputName = NULL; +static prop_t *pLocalHostIP = NULL; + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + +static inline void +initConfigSettings(void) +{ + cs.iStatsInterval = DEFAULT_STATS_PERIOD; + cs.iFacility = DEFAULT_FACILITY; + cs.iSeverity = DEFAULT_SEVERITY; +} + + +/* actually submit a message to the rsyslog core + */ +static inline rsRetVal +doSubmitMsg(uchar *line) +{ + msg_t *pMsg; + DEFiRet; + + CHKiRet(msgConstruct(&pMsg)); + MsgSetInputName(pMsg, pInputName); + MsgSetRawMsgWOSize(pMsg, (char*)line); + MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())); + MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp()); + MsgSetRcvFromIP(pMsg, pLocalHostIP); + MsgSetMSGoffs(pMsg, 0); + MsgSetTAG(pMsg, UCHAR_CONSTANT("rsyslogd-pstats:"), sizeof("rsyslogd-pstats:") - 1); + pMsg->iFacility = cs.iFacility; + pMsg->iSeverity = cs.iSeverity; + pMsg->msgFlags = 0; + + submitMsg(pMsg); + +finalize_it: + RETiRet; + +} + + +/* callback for statsobj + * Note: usrptr exists only to satisfy requirements of statsobj callback interface! + */ +static rsRetVal +doStatsLine(void __attribute__((unused)) *usrptr, cstr_t *cstr) +{ + DEFiRet; + doSubmitMsg(rsCStrGetSzStrNoNULL(cstr)); + RETiRet; +} + + +/* the function to generate the actual statistics messages + * rgerhards, 2010-09-09 + */ +static inline void +generateStatsMsgs(void) +{ + statsobj.GetAllStatsLines(doStatsLine, NULL); +} + + +BEGINrunInput +CODESTARTrunInput + /* this is an endless loop - it is terminated when the thread is + * signalled to do so. This, however, is handled by the framework, + * right into the sleep below. + */ + while(1) { + srSleep(cs.iStatsInterval, 0); /* seconds, micro seconds */ + + if(glbl.GetGlobalInputTermState() == 1) + break; /* terminate input! */ + + generateStatsMsgs(); + } +ENDrunInput + + +BEGINwillRun + rsRetVal localRet; +CODESTARTwillRun + DBGPRINTF("impstats: stats interval %d seconds\n", cs.iStatsInterval); + if(cs.iStatsInterval == 0) + ABORT_FINALIZE(RS_RET_NO_RUN); + localRet = statsobj.EnableStats(); + if(localRet != RS_RET_OK) { + errmsg.LogError(0, localRet, "impstat: error enabling statistics gathering"); + ABORT_FINALIZE(RS_RET_NO_RUN); + } +finalize_it: +ENDwillRun + + +BEGINafterRun +CODESTARTafterRun +ENDafterRun + + +BEGINmodExit +CODESTARTmodExit + prop.Destruct(&pInputName); + prop.Destruct(&pLocalHostIP); + /* release objects we used */ + objRelease(glbl, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); + objRelease(errmsg, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); +ENDmodExit + + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + +static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + initConfigSettings(); + return RS_RET_OK; +} + + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + DBGPRINTF("impstats version %s loading\n", VERSION); + initConfigSettings(); + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatsinterval", 0, eCmdHdlrInt, NULL, &cs.iStatsInterval, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatfacility", 0, eCmdHdlrInt, NULL, &cs.iFacility, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatseverity", 0, eCmdHdlrInt, NULL, &cs.iSeverity, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + + CHKiRet(prop.Construct(&pInputName)); + CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("impstats"), sizeof("impstats") - 1)); + CHKiRet(prop.ConstructFinalize(pInputName)); + + CHKiRet(prop.Construct(&pLocalHostIP)); + CHKiRet(prop.SetString(pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1)); + CHKiRet(prop.ConstructFinalize(pLocalHostIP)); +ENDmodInit +/* vi:set ai: + */ diff --git a/plugins/imuxsock/Makefile.am b/plugins/imuxsock/Makefile.am index 28f9f9e3..34a0ad9a 100644 --- a/plugins/imuxsock/Makefile.am +++ b/plugins/imuxsock/Makefile.am @@ -1,6 +1,6 @@ pkglib_LTLIBRARIES = imuxsock.la imuxsock_la_SOURCES = imuxsock.c -imuxsock_la_CPPFLAGS = -DSD_EXPORT_SYMBOLS -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) +imuxsock_la_CPPFLAGS = -DSD_EXPORT_SYMBOLS -I../../runtime/hashtable -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) imuxsock_la_LDFLAGS = -module -avoid-version imuxsock_la_LIBADD = $(RSRT_LIBS) diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 73fe962f..be162a1b 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -6,7 +6,7 @@ * * File begun on 2007-12-20 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -29,12 +29,14 @@ #include "rsyslog.h" #include <stdlib.h> #include <stdio.h> +#include <ctype.h> #include <assert.h> #include <string.h> #include <errno.h> #include <unistd.h> #include <sys/stat.h> #include <sys/un.h> +#include <sys/socket.h> #include "dirty.h" #include "cfsysline.h" #include "unicode-helper.h" @@ -48,11 +50,14 @@ #include "debug.h" #include "unlimited_select.h" #include "sd-daemon.h" +#include "statsobj.h" +#include "datetime.h" +#include "hashtable.h" MODULE_TYPE_INPUT /* defines */ -#define MAXFUNIX 20 +#define MAXFUNIX 50 #ifndef _PATH_LOG #ifdef BSD #define _PATH_LOG "/var/run/log" @@ -72,30 +77,156 @@ DEF_IMOD_STATIC_DATA DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) DEFobjCurrIf(prop) +DEFobjCurrIf(datetime) +DEFobjCurrIf(statsobj) + +statsobj_t *modStats; +STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) +STATSCOUNTER_DEF(ctrLostRatelimit, mutCtrLostRatelimit) +STATSCOUNTER_DEF(ctrNumRatelimiters, mutCtrNumRatelimiters) + +struct rs_ratelimit_state { + unsigned short interval; + unsigned short burst; + unsigned done; + unsigned missed; + time_t begin; +}; +typedef struct rs_ratelimit_state rs_ratelimit_state_t; + + +/* a very simple "hash function" for process IDs - we simply use the + * pid itself: it is quite expected that all pids may log some time, but + * from a collision point of view it is likely that long-running daemons + * start early and so will stay right in the top spots of the + * collision list. + */ +static unsigned int +hash_from_key_fn(void *k) +{ + return((unsigned) *((pid_t*) k)); +} + +static int +key_equals_fn(void *key1, void *key2) +{ + return *((pid_t*) key1) == *((pid_t*) key2); +} + + +/* structure to describe a specific listener */ +typedef struct lstn_s { + uchar *sockName; /* read-only after startup */ + prop_t *hostName; /* host-name override - if set, use this instead of actual name */ + int fd; /* read-only after startup */ + int flags; /* should parser parse host name? read-only after startup */ + int flowCtl; /* flow control settings for this socket */ + int ratelimitInterval; + int ratelimitBurst; + intTiny ratelimitSev; /* severity level (and below) for which rate-limiting shall apply */ + struct hashtable *ht; /* our hashtable for rate-limiting */ + sbool bParseHost; /* should parser parse host name? read-only after startup */ + sbool bCreatePath; /* auto-creation of socket directory? */ + sbool bUseCreds; /* pull original creator credentials from socket */ + sbool bWritePid; /* write original PID into tag */ +} lstn_t; +static lstn_t listeners[MAXFUNIX]; static prop_t *pLocalHostIP = NULL; /* there is only one global IP for all internally-generated messages */ static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */ -static int startIndexUxLocalSockets; /* process funix from that index on (used to +static int startIndexUxLocalSockets; /* process fd from that index on (used to * suppress local logging. rgerhards 2005-08-01 * read-only after startup */ -static int funixParseHost[MAXFUNIX] = { 0, }; /* should parser parse host name? read-only after startup */ -static int funixFlags[MAXFUNIX] = { IGNDATE, }; /* should parser parse host name? read-only after startup */ -static int funixCreateSockPath[MAXFUNIX] = { 0, }; /* auto-creation of socket directory? */ -static uchar *funixn[MAXFUNIX] = { (uchar*) _PATH_LOG }; /* read-only after startup */ -static prop_t *funixHName[MAXFUNIX] = { NULL, }; /* host-name override - if set, use this instead of actual name */ -static int funixFlowCtl[MAXFUNIX] = { eFLOWCTL_NO_DELAY, }; /* flow control settings for this socket */ -static int funix[MAXFUNIX] = { -1, }; /* read-only after startup */ -static int nfunix = 1; /* number of Unix sockets open / read-only after startup */ +static int nfd = 1; /* number of Unix sockets open / read-only after startup */ +static int bSysSockFromSystemd = 0; /* Did we receive the system socket from systemd? */ /* config settings */ static int bOmitLocalLogging = 0; static uchar *pLogSockName = NULL; static uchar *pLogHostName = NULL; /* host name to use with this socket */ static int bUseFlowCtl = 0; /* use flow control or not (if yes, only LIGHT is used! */ -static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */ -#define DFLT_bCreateSockPath 0 -static int bCreateSockPath = DFLT_bCreateSockPath; /* auto-create socket path? */ +static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */ +static int bWritePid = 0; /* use credentials from recvmsg() and fixup PID in TAG */ +static int bWritePidSysSock = 0; /* use credentials from recvmsg() and fixup PID in TAG */ +#define DFLT_bCreatePath 0 +static int bCreatePath = DFLT_bCreatePath; /* auto-create socket path? */ +#define DFLT_ratelimitInterval 5 +static int ratelimitInterval = DFLT_ratelimitInterval; /* interval in seconds, 0 = off */ +static int ratelimitIntervalSysSock = DFLT_ratelimitInterval; +#define DFLT_ratelimitBurst 200 +static int ratelimitBurst = DFLT_ratelimitBurst; /* max nbr of messages in interval */ +static int ratelimitBurstSysSock = DFLT_ratelimitBurst; /* max nbr of messages in interval */ +#define DFLT_ratelimitSeverity 1 /* do not rate-limit emergency messages */ +static int ratelimitSeverity = DFLT_ratelimitSeverity; +static int ratelimitSeveritySysSock = DFLT_ratelimitSeverity; + + + +static void +initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsigned short burst) +{ + rs->interval = interval; + rs->burst = burst; + rs->done = 0; + rs->missed = 0; + rs->begin = 0; +} + + +/* ratelimiting support, modelled after the linux kernel + * returns 1 if message is within rate limit and shall be + * processed, 0 otherwise. + * This implementation is NOT THREAD-SAFE and must not + * be called concurrently. + */ +static inline int +withinRatelimit(struct rs_ratelimit_state *rs, time_t tt, pid_t pid) +{ + int ret; + uchar msgbuf[1024]; + + if(rs->interval == 0) { + ret = 1; + goto finalize_it; + } + + assert(rs->burst != 0); + + if(rs->begin == 0) + rs->begin = tt; + + /* resume if we go out of out time window */ + if(tt > rs->begin + rs->interval) { + if(rs->missed) { + snprintf((char*)msgbuf, sizeof(msgbuf), + "imuxsock lost %u messages from pid %lu due to rate-limiting", + rs->missed, (unsigned long) pid); + logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); + rs->missed = 0; + } + rs->begin = 0; + rs->done = 0; + } + + /* do actual limit check */ + if(rs->burst > rs->done) { + rs->done++; + ret = 1; + } else { + if(rs->missed == 0) { + snprintf((char*)msgbuf, sizeof(msgbuf), + "imuxsock begins to drop messages from pid %lu due to rate-limiting", + (unsigned long) pid); + logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); + } + rs->missed++; + ret = 0; + } + +finalize_it: + return ret; +} /* set the timestamp ignore / not ignore option for the system @@ -105,7 +236,7 @@ static int bCreateSockPath = DFLT_bCreateSockPath; /* auto-create socket path? * static rsRetVal setSystemLogTimestampIgnore(void __attribute__((unused)) *pVal, int iNewVal) { DEFiRet; - funixFlags[0] = iNewVal ? IGNDATE : NOFLAG; + listeners[0].flags = iNewVal ? IGNDATE : NOFLAG; RETiRet; } @@ -114,7 +245,7 @@ static rsRetVal setSystemLogTimestampIgnore(void __attribute__((unused)) *pVal, static rsRetVal setSystemLogFlowControl(void __attribute__((unused)) *pVal, int iNewVal) { DEFiRet; - funixFlowCtl[0] = iNewVal ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY; + listeners[0].flowCtl = iNewVal ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY; RETiRet; } @@ -131,26 +262,40 @@ addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal) { DEFiRet; - if(nfunix < MAXFUNIX) { + if(nfd < MAXFUNIX) { if(*pNewVal == ':') { - funixParseHost[nfunix] = 1; + listeners[nfd].bParseHost = 1; } else { - funixParseHost[nfunix] = 0; + listeners[nfd].bParseHost = 0; } - CHKiRet(prop.Construct(&(funixHName[nfunix]))); + CHKiRet(prop.Construct(&(listeners[nfd].hostName))); if(pLogHostName == NULL) { - CHKiRet(prop.SetString(funixHName[nfunix], glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName()))); + CHKiRet(prop.SetString(listeners[nfd].hostName, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName()))); } else { - CHKiRet(prop.SetString(funixHName[nfunix], pLogHostName, ustrlen(pLogHostName))); + CHKiRet(prop.SetString(listeners[nfd].hostName, pLogHostName, ustrlen(pLogHostName))); /* reset hostname for next socket */ free(pLogHostName); pLogHostName = NULL; } - CHKiRet(prop.ConstructFinalize(funixHName[nfunix])); - funixFlowCtl[nfunix] = bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY; - funixFlags[nfunix] = bIgnoreTimestamp ? IGNDATE : NOFLAG; - funixCreateSockPath[nfunix] = bCreateSockPath; - funixn[nfunix++] = pNewVal; + CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName)); + if(ratelimitInterval > 0) { + if((listeners[nfd].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn, NULL)) == NULL) { + /* in this case, we simply turn of rate-limiting */ + dbgprintf("imuxsock: turning off rate limiting because we could not " + "create hash table\n"); + ratelimitInterval = 0; + } + } + listeners[nfd].ratelimitInterval = ratelimitInterval; + listeners[nfd].ratelimitBurst = ratelimitBurst; + listeners[nfd].ratelimitSev = ratelimitSeverity; + listeners[nfd].flowCtl = bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY; + listeners[nfd].flags = bIgnoreTimestamp ? IGNDATE : NOFLAG; + listeners[nfd].bCreatePath = bCreatePath; + listeners[nfd].sockName = pNewVal; + listeners[nfd].bUseCreds = (bWritePid || ratelimitInterval) ? 1 : 0; + listeners[nfd].bWritePid = bWritePid; + nfd++; } else { errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n", pNewVal); @@ -161,21 +306,23 @@ finalize_it: } -/* free the funixn[] socket names - needed as cleanup on several places - * note that nfunix is NOT reset! funixn[0] is never freed, as it comes from +/* discard all log sockets except for "socket" 0. Data for it comes from * the constant memory pool - and if not, it is freeed via some other pointer. */ -static rsRetVal discardFunixn(void) +static rsRetVal discardLogSockets(void) { int i; - for (i = 1; i < nfunix; i++) { - if(funixn[i] != NULL) { - free(funixn[i]); - funixn[i] = NULL; + for (i = 1; i < nfd; i++) { + if(listeners[i].sockName != NULL) { + free(listeners[i].sockName); + listeners[i].sockName = NULL; } - if(funixHName[i] != NULL) { - prop.Destruct(&(funixHName[i])); + if(listeners[i].hostName != NULL) { + prop.Destruct(&(listeners[i].hostName)); + } + if(listeners[i].ht != NULL) { + hashtable_destroy(listeners[i].ht, 1); /* 1 => free all values automatically */ } } @@ -183,92 +330,256 @@ static rsRetVal discardFunixn(void) } -static int create_unix_socket(const char *path, int bCreatePath) +/* used to create a log socket if NOT passed in via systemd. + */ +static inline rsRetVal +createLogSocket(lstn_t *pLstn) { struct sockaddr_un sunx; - int fd; + DEFiRet; - if (path[0] == '\0') + unlink((char*)pLstn->sockName); + memset(&sunx, 0, sizeof(sunx)); + sunx.sun_family = AF_UNIX; + if(pLstn->bCreatePath) { + makeFileParentDirs((uchar*)pLstn->sockName, ustrlen(pLstn->sockName), 0755, -1, -1, 0); + } + strncpy(sunx.sun_path, (char*)pLstn->sockName, sizeof(sunx.sun_path)); + pLstn->fd = socket(AF_UNIX, SOCK_DGRAM, 0); + if(pLstn->fd < 0 || bind(pLstn->fd, (struct sockaddr *) &sunx, SUN_LEN(&sunx)) < 0 || + chmod((char*)pLstn->sockName, 0666) < 0) { + errmsg.LogError(errno, NO_ERRCODE, "connot create '%s'", pLstn->sockName); + dbgprintf("cannot create %s (%d).\n", pLstn->sockName, errno); + close(pLstn->fd); + pLstn->fd = -1; + ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); + } +finalize_it: + RETiRet; +} + + +static inline rsRetVal +openLogSocket(lstn_t *pLstn) +{ + DEFiRet; + int one; + + if(pLstn->sockName[0] == '\0') return -1; - if (strcmp(path, _PATH_LOG) == 0) { + if (ustrcmp(pLstn->sockName, UCHAR_CONSTANT(_PATH_LOG)) == 0) { + bSysSockFromSystemd = 0; /* set default */ int r; - /* Check whether an FD was passed in from systemd. If + /* System log socket code. Check whether an FD was passed in from systemd. If * so, it's the /dev/log socket, so use it. */ r = sd_listen_fds(0); if (r < 0) { errmsg.LogError(-r, NO_ERRCODE, "Failed to acquire systemd socket"); - return -1; + ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); } if (r > 1) { errmsg.LogError(EINVAL, NO_ERRCODE, "Wrong number of systemd sockets passed"); - return -1; + ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); } if (r == 1) { - fd = SD_LISTEN_FDS_START; - r = sd_is_socket_unix(fd, SOCK_DGRAM, -1, _PATH_LOG, 0); + pLstn->fd = SD_LISTEN_FDS_START; + r = sd_is_socket_unix(pLstn->fd, SOCK_DGRAM, -1, _PATH_LOG, 0); if (r < 0) { errmsg.LogError(-r, NO_ERRCODE, "Failed to verify systemd socket type"); - return -1; + ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); } if (!r) { errmsg.LogError(EINVAL, NO_ERRCODE, "Passed systemd socket of wrong type"); - return -1; + ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); } + bSysSockFromSystemd = 1; /* indicate we got the socket from systemd */ + } else { + CHKiRet(createLogSocket(pLstn)); + } + } else { + CHKiRet(createLogSocket(pLstn)); + } - return fd; - } - } - - unlink(path); + if(pLstn->bUseCreds) { + one = 1; + if(setsockopt(pLstn->fd, SOL_SOCKET, SO_PASSCRED, &one, (socklen_t) sizeof(one)) != 0) { + errmsg.LogError(errno, NO_ERRCODE, "set SO_PASSCRED failed on '%s'", pLstn->sockName); + pLstn->bUseCreds = 0; + } + if(setsockopt(pLstn->fd, SOL_SOCKET, SCM_CREDENTIALS, &one, sizeof(one)) != 0) { + errmsg.LogError(errno, NO_ERRCODE, "set SCM_CREDENTIALS failed on '%s'", pLstn->sockName); + pLstn->bUseCreds = 0; + } + } - memset(&sunx, 0, sizeof(sunx)); - sunx.sun_family = AF_UNIX; - if(bCreatePath) { - makeFileParentDirs((uchar*)path, strlen(path), 0755, -1, -1, 0); +finalize_it: + if(iRet != RS_RET_OK) { + close(pLstn->fd); + pLstn->fd = -1; } - (void) strncpy(sunx.sun_path, path, sizeof(sunx.sun_path)); - fd = socket(AF_UNIX, SOCK_DGRAM, 0); - if (fd < 0 || bind(fd, (struct sockaddr *) &sunx, SUN_LEN(&sunx)) < 0 || - chmod(path, 0666) < 0) { - errmsg.LogError(errno, NO_ERRCODE, "connot create '%s'", path); - dbgprintf("cannot create %s (%d).\n", path, errno); - close(fd); - return -1; + + RETiRet; +} + + +/* find ratelimiter to use for this message. Currently, we use the + * pid, but may change to cgroup later (probably via a config switch). + * Returns NULL if not found. + */ +static inline rsRetVal +findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl) +{ + rs_ratelimit_state_t *rl; + int r; + pid_t *keybuf; + DEFiRet; + + if(cred == NULL) + FINALIZE; + + rl = hashtable_search(pLstn->ht, &cred->pid); + if(rl == NULL) { + /* we need to add a new ratelimiter, process not seen before! */ + dbgprintf("imuxsock: no ratelimiter for pid %lu, creating one\n", + (unsigned long) cred->pid); + STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters); + CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t))); + CHKmalloc(keybuf = malloc(sizeof(pid_t))); + *keybuf = cred->pid; + initRatelimitState(rl, ratelimitInterval, pLstn->ratelimitBurst); + r = hashtable_insert(pLstn->ht, keybuf, rl); + if(r == 0) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } - return fd; + + *prl = rl; + +finalize_it: + RETiRet; +} + + +/* patch correct pid into tag. bufTAG MUST be CONF_TAG_MAXSIZE long! + */ +static inline void +fixPID(uchar *bufTAG, int *lenTag, struct ucred *cred) +{ + int i; + char bufPID[16]; + int lenPID; + + if(cred == NULL) + return; + + lenPID = snprintf(bufPID, sizeof(bufPID), "[%lu]:", (unsigned long) cred->pid); + + for(i = *lenTag ; i >= 0 && bufTAG[i] != '[' ; --i) + /*JUST SKIP*/; + + if(i < 0) + i = *lenTag - 1; /* go right at end of TAG, pid was not present (-1 for ':') */ + + if(i + lenPID > CONF_TAG_MAXSIZE) + return; /* do not touch, as things would break */ + + memcpy(bufTAG + i, bufPID, lenPID); + *lenTag = i + lenPID; } /* submit received message to the queue engine + * We now parse the message according to expected format so that we + * can also mangle it if necessary. */ static inline rsRetVal -SubmitMsg(uchar *pRcv, int lenRcv, int iSock) +SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred) { msg_t *pMsg; + int lenMsg; + int i; + uchar *parse; + int pri; + int facil; + int sever; + uchar bufParseTAG[CONF_TAG_MAXSIZE]; + struct syslogTime st; + time_t tt; + rs_ratelimit_state_t *ratelimiter = NULL; DEFiRet; +// TODO: handle format errors?? + /* we need to parse the pri first, because we need the severity for + * rate-limiting as well. + */ + parse = pRcv; + lenMsg = lenRcv; + + parse++; lenMsg--; /* '<' */ + pri = 0; + while(lenMsg && isdigit(*parse)) { + pri = pri * 10 + *parse - '0'; + ++parse; + --lenMsg; + } + facil = LOG_FAC(pri); + sever = LOG_PRI(pri); + + if(sever >= pLstn->ratelimitSev) + findRatelimiter(pLstn, cred, &ratelimiter); /* ignore error, better so than others... */ + + datetime.getCurrTime(&st, &tt); + if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) { + STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit); + FINALIZE; + } + /* we now create our own message object and submit it to the queue */ - CHKiRet(msgConstruct(&pMsg)); + CHKiRet(msgConstructWithTime(&pMsg, &st, tt)); MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv); MsgSetInputName(pMsg, pInputName); - MsgSetFlowControlType(pMsg, funixFlowCtl[iSock]); + MsgSetFlowControlType(pMsg, pLstn->flowCtl); + + pMsg->iFacility = facil; + pMsg->iSeverity = sever; + MsgSetAfterPRIOffs(pMsg, lenRcv - lenMsg); + + parse++; lenMsg--; /* '>' */ + + if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) { + DBGPRINTF("we have a problem, invalid timestamp in msg!\n"); + } + + /* pull tag */ + + i = 0; + while(lenMsg > 0 && *parse != ' ' && i < CONF_TAG_MAXSIZE) { + bufParseTAG[i++] = *parse++; + --lenMsg; + } + bufParseTAG[i] = '\0'; /* terminate string */ + if(pLstn->bWritePid) + fixPID(bufParseTAG, &i, cred); + MsgSetTAG(pMsg, bufParseTAG, i); - if(funixParseHost[iSock]) { - pMsg->msgFlags = funixFlags[iSock] | NEEDS_PARSING | PARSE_HOSTNAME; + MsgSetMSGoffs(pMsg, lenRcv - lenMsg); + + if(pLstn->bParseHost) { + pMsg->msgFlags = pLstn->flags | PARSE_HOSTNAME; } else { - pMsg->msgFlags = funixFlags[iSock] | NEEDS_PARSING; + pMsg->msgFlags = pLstn->flags; } - MsgSetRcvFrom(pMsg, funixHName[iSock]); + MsgSetRcvFrom(pMsg, pLstn->hostName); CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP)); CHKiRet(submitMsg(pMsg)); + STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit); finalize_it: RETiRet; } @@ -281,15 +592,20 @@ finalize_it: * of the socket which is to be processed. This eases access to the * growing number of properties. -- rgerhards, 2008-08-01 */ -static rsRetVal readSocket(int fd, int iSock) +static rsRetVal readSocket(lstn_t *pLstn) { DEFiRet; int iRcvd; int iMaxLine; + struct msghdr msgh; + struct iovec msgiov; + struct cmsghdr *cm; + struct ucred *cred; uchar bufRcv[4096+1]; + char aux[128]; uchar *pRcv = NULL; /* receive buffer */ - assert(iSock >= 0); + assert(pLstn->fd >= 0); iMaxLine = glbl.GetMaxLine(); @@ -305,15 +621,40 @@ static rsRetVal readSocket(int fd, int iSock) CHKmalloc(pRcv = (uchar*) MALLOC(sizeof(uchar) * (iMaxLine + 1))); } - iRcvd = recv(fd, pRcv, iMaxLine, 0); - dbgprintf("Message from UNIX socket: #%d\n", fd); - if (iRcvd > 0) { - CHKiRet(SubmitMsg(pRcv, iRcvd, iSock)); - } else if (iRcvd < 0 && errno != EINTR) { + memset(&msgh, 0, sizeof(msgh)); + memset(&msgiov, 0, sizeof(msgiov)); + if(pLstn->bUseCreds) { + memset(&aux, 0, sizeof(aux)); + msgh.msg_control = aux; + msgh.msg_controllen = sizeof(aux); + } + msgiov.iov_base = pRcv; + msgiov.iov_len = iMaxLine; + msgh.msg_iov = &msgiov; + msgh.msg_iovlen = 1; + iRcvd = recvmsg(pLstn->fd, &msgh, MSG_DONTWAIT); + + dbgprintf("Message from UNIX socket: #%d\n", pLstn->fd); + if(iRcvd > 0) { + cred = NULL; + if(pLstn->bUseCreds) { + dbgprintf("XXX: pre CM loop, length of control message %d\n", (int) msgh.msg_controllen); + for (cm = CMSG_FIRSTHDR(&msgh); cm; cm = CMSG_NXTHDR(&msgh, cm)) { + dbgprintf("XXX: in CM loop, %d, %d\n", cm->cmsg_level, cm->cmsg_type); + if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) { + cred = (struct ucred*) CMSG_DATA(cm); + dbgprintf("XXX: got credentials pid %d\n", (int) cred->pid); + //break; + } + } + dbgprintf("XXX: post CM loop\n"); + } + CHKiRet(SubmitMsg(pRcv, iRcvd, pLstn, cred)); + } else if(iRcvd < 0 && errno != EINTR) { char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); dbgprintf("UNIX socket error: %d = %s.\n", errno, errStr); - errmsg.LogError(errno, NO_ERRCODE, "recvfrom UNIX"); + errmsg.LogError(errno, NO_ERRCODE, "imuxsock: recvfrom UNIX"); } finalize_it: @@ -352,10 +693,11 @@ CODESTARTrunInput maxfds = 0; FD_ZERO (pReadfds); /* Copy master connections */ - for (i = startIndexUxLocalSockets; i < nfunix; i++) { - if (funix[i] != -1) { - FD_SET(funix[i], pReadfds); - if (funix[i]>maxfds) maxfds=funix[i]; + for (i = startIndexUxLocalSockets; i < nfd; i++) { + if (listeners[i].fd!= -1) { + FD_SET(listeners[i].fd, pReadfds); + if(listeners[i].fd > maxfds) + maxfds=listeners[i].fd; } } @@ -372,11 +714,11 @@ CODESTARTrunInput if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ - for (i = 0; i < nfunix && nfds > 0; i++) { + for (i = 0; i < nfd && nfds > 0; i++) { if(glbl.GetGlobalInputTermState() == 1) ABORT_FINALIZE(RS_RET_FORCE_TERM); /* terminate input! */ - if ((fd = funix[i]) != -1 && FD_ISSET(fd, pReadfds)) { - readSocket(fd, i); + if ((fd = listeners[i].fd) != -1 && FD_ISSET(fd, pReadfds)) { + readSocket(&(listeners[i])); --nfds; /* indicate we have processed one */ } } @@ -391,6 +733,7 @@ ENDrunInput BEGINwillRun CODESTARTwillRun register int i; + int actSocks; /* first apply some config settings */ # ifdef OS_SOLARIS @@ -404,12 +747,33 @@ CODESTARTwillRun startIndexUxLocalSockets = bOmitLocalLogging ? 1 : 0; # endif if(pLogSockName != NULL) - funixn[0] = pLogSockName; + listeners[0].sockName = pLogSockName; + if(ratelimitIntervalSysSock > 0) { + if((listeners[0].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn, NULL)) == NULL) { + /* in this case, we simply turn of rate-limiting */ + dbgprintf("imuxsock: turning off rate limiting because we could not " + "create hash table\n"); + ratelimitIntervalSysSock = 0; + } + } + listeners[0].ratelimitInterval = ratelimitIntervalSysSock; + listeners[0].ratelimitBurst = ratelimitBurstSysSock; + listeners[0].ratelimitSev = ratelimitSeveritySysSock; + listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock) ? 1 : 0; + listeners[0].bWritePid = bWritePidSysSock; /* initialize and return if will run or not */ - for (i = startIndexUxLocalSockets ; i < nfunix ; i++) { - if ((funix[i] = create_unix_socket((char*) funixn[i], funixCreateSockPath[i])) != -1) - dbgprintf("Opened UNIX socket '%s' (fd %d).\n", funixn[i], funix[i]); + actSocks = 0; + for (i = startIndexUxLocalSockets ; i < nfd ; i++) { + if(openLogSocket(&(listeners[i])) == RS_RET_OK) { + ++actSocks; + dbgprintf("Opened UNIX socket '%s' (fd %d).\n", listeners[i].sockName, listeners[i].fd); + } + } + + if(actSocks == 0) { + errmsg.LogError(0, NO_ERRCODE, "imuxsock does not run because we could not aquire any socket\n"); + ABORT_FINALIZE(RS_RET_ERR); } /* we need to create the inputName property (only once during our lifetime) */ @@ -426,36 +790,44 @@ CODESTARTafterRun int i; /* do cleanup here */ /* Close the UNIX sockets. */ - for (i = 0; i < nfunix; i++) - if (funix[i] != -1) - close(funix[i]); + for (i = 0; i < nfd; i++) + if (listeners[i].fd != -1) + close(listeners[i].fd); /* Clean-up files. If systemd passed us a socket it is * systemd's job to clean it up.*/ - if (sd_listen_fds(0) > 0) + if(bSysSockFromSystemd) { + DBGPRINTF("imuxsock: got system socket from systemd, not unlinking it\n"); i = 1; - else + } else i = startIndexUxLocalSockets; - for(; i < nfunix; i++) - if (funixn[i] && funix[i] != -1) - unlink((char*) funixn[i]); + for(; i < nfd; i++) + if (listeners[i].sockName && listeners[i].fd != -1) { + DBGPRINTF("imuxsock: unlinking unix socket file[%d] %s\n", i, listeners[i].sockName); + unlink((char*) listeners[i].sockName); + } /* free no longer needed string */ free(pLogSockName); free(pLogHostName); - discardFunixn(); - nfunix = 1; + discardLogSockets(); + nfd = 1; if(pInputName != NULL) prop.Destruct(&pInputName); + ENDafterRun BEGINmodExit CODESTARTmodExit + statsobj.Destruct(&modStats); + objRelease(glbl, CORE_COMPONENT); objRelease(errmsg, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); + objRelease(datetime, CORE_COMPONENT); ENDmodExit @@ -484,11 +856,19 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a pLogHostName = NULL; } - discardFunixn(); - nfunix = 1; + discardLogSockets(); + nfd = 1; bIgnoreTimestamp = 1; bUseFlowCtl = 0; - bCreateSockPath = DFLT_bCreateSockPath; + bWritePid = 0; + bWritePidSysSock = 0; + bCreatePath = DFLT_bCreatePath; + ratelimitInterval = DFLT_ratelimitInterval; + ratelimitIntervalSysSock = DFLT_ratelimitInterval; + ratelimitBurst = DFLT_ratelimitBurst; + ratelimitBurstSysSock = DFLT_ratelimitBurst; + ratelimitSeverity = DFLT_ratelimitSeverity; + ratelimitSeveritySysSock = DFLT_ratelimitSeverity; return RS_RET_OK; } @@ -502,13 +882,25 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); dbgprintf("imuxsock version %s initializing\n", PACKAGE_VERSION); - /* initialize funixn[] array */ + /* init system log socket settings */ + listeners[0].flags = IGNDATE; + listeners[0].sockName = UCHAR_CONSTANT(_PATH_LOG); + listeners[0].hostName = NULL; + listeners[0].flowCtl = eFLOWCTL_NO_DELAY; + listeners[0].fd = -1; + listeners[0].bParseHost = 0; + listeners[0].bUseCreds = 0; + listeners[0].bCreatePath = 0; + + /* initialize socket names */ for(i = 1 ; i < MAXFUNIX ; ++i) { - funixn[i] = NULL; - funix[i] = -1; + listeners[i].sockName = NULL; + listeners[i].fd = -1; } CHKiRet(prop.Construct(&pLocalHostIP)); @@ -516,9 +908,9 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(prop.ConstructFinalize(pLocalHostIP)); /* now init listen socket zero, the local log socket */ - CHKiRet(prop.Construct(&(funixHName[0]))); - CHKiRet(prop.SetString(funixHName[0], glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName()))); - CHKiRet(prop.ConstructFinalize(funixHName[0])); + CHKiRet(prop.Construct(&(listeners[0].hostName))); + CHKiRet(prop.SetString(listeners[0].hostName, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName()))); + CHKiRet(prop.ConstructFinalize(listeners[0].hostName)); /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr((uchar *)"omitlocallogging", 0, eCmdHdlrBinary, @@ -532,9 +924,17 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketflowcontrol", 0, eCmdHdlrBinary, NULL, &bUseFlowCtl, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketcreatepath", 0, eCmdHdlrBinary, - NULL, &bCreateSockPath, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + NULL, &bCreatePath, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"addunixlistensocket", 0, eCmdHdlrGetWord, addLstnSocketName, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusepidfromsystem", 0, eCmdHdlrBinary, + NULL, &bWritePid, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitinterval", 0, eCmdHdlrInt, + NULL, &ratelimitInterval, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitburst", 0, eCmdHdlrInt, + NULL, &ratelimitBurst, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitseverity", 0, eCmdHdlrInt, + NULL, &ratelimitSeverity, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); /* the following one is a (dirty) trick: the system log socket is not added via @@ -547,6 +947,26 @@ CODEmodInit_QueryRegCFSLineHdlr setSystemLogTimestampIgnore, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary, setSystemLogFlowControl, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary, + NULL, &bWritePidSysSock, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitinterval", 0, eCmdHdlrInt, + NULL, &ratelimitIntervalSysSock, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitburst", 0, eCmdHdlrInt, + NULL, &ratelimitBurstSysSock, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitseverity", 0, eCmdHdlrInt, + NULL, &ratelimitSeveritySysSock, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&modStats)); + CHKiRet(statsobj.SetName(modStats, UCHAR_CONSTANT("imuxsock"))); + CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &ctrSubmit)); + CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("ratelimit.discarded"), + ctrType_IntCtr, &ctrLostRatelimit)); + CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("ratelimit.numratelimiters"), + ctrType_IntCtr, &ctrNumRatelimiters)); + CHKiRet(statsobj.ConstructFinalize(modStats)); + ENDmodInit /* vim:set ai: */ diff --git a/plugins/omhdfs/Makefile.am b/plugins/omhdfs/Makefile.am new file mode 100644 index 00000000..95e6b102 --- /dev/null +++ b/plugins/omhdfs/Makefile.am @@ -0,0 +1,6 @@ +pkglib_LTLIBRARIES = omhdfs.la + +omhdfs_la_SOURCES = omhdfs.c +omhdfs_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) $(JAVA_INCLUDES) +omhdfs_la_LDFLAGS = -module -avoid-version -lhdfs $(JAVA_LIBS) +omhdfs_la_LIBADD = $(RSRT_LIBS) diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c new file mode 100644 index 00000000..25c330f9 --- /dev/null +++ b/plugins/omhdfs/omhdfs.c @@ -0,0 +1,475 @@ +/* omhdfs.c + * This is an output module to support Hadoop's HDFS. + * + * NOTE: read comments in module-template.h to understand how this file + * works! + * + * Copyright 2010 Rainer Gerhards and Adiscon GmbH. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ + +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <assert.h> +#include <errno.h> +#include <ctype.h> +#include <unistd.h> +#include <sys/file.h> +#include <pthread.h> +#include <hdfs.h> + +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "conf.h" +#include "cfsysline.h" +#include "module-template.h" +#include "unicode-helper.h" +#include "errmsg.h" +#include "hashtable.h" +#include "hashtable_itr.h" + +MODULE_TYPE_OUTPUT + +/* internal structures + */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) + +/* global data */ +static struct hashtable *files; /* holds all file objects that we know */ + +/* globals for default values */ +static uchar *fileName = NULL; +static uchar *hdfsHost = NULL; +static uchar *dfltTplName = NULL; /* default template name to use */ +int hdfsPort = 0; +/* end globals for default values */ + +typedef struct { + uchar *name; + hdfsFS fs; + hdfsFile fh; + const char *hdfsHost; + tPort hdfsPort; + int nUsers; + pthread_mutex_t mut; +} file_t; + + +typedef struct _instanceData { + file_t *pFile; +} instanceData; + +/* forward definitions (down here, need data types) */ +static inline rsRetVal fileClose(file_t *pFile); + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo + printf("omhdfs: file:%s", pData->pFile->name); +ENDdbgPrintInstInfo + + +/* note that hdfsFileExists() does not work, so we did our + * own function to see if a pathname exists. Returns 0 if the + * file does not exists, something else otherwise. Note that + * we can also check a directroy (if that matters...) + */ +static int +HDFSFileExists(hdfsFS fs, uchar *name) +{ + int r; + hdfsFileInfo *info; + + info = hdfsGetPathInfo(fs, (char*)name); + /* if things go wrong, we assume it is because the file + * does not exist. We do not get too much information... + */ + if(info == NULL) { + r = 0; + } else { + r = 1; + hdfsFreeFileInfo(info, 1); + } + return r; +} + +static inline rsRetVal +HDFSmkdir(hdfsFS fs, uchar *name) +{ + DEFiRet; + if(hdfsCreateDirectory(fs, (char*)name) == -1) + ABORT_FINALIZE(RS_RET_ERR); + +finalize_it: + RETiRet; +} + + +/* ---BEGIN FILE OBJECT---------------------------------------------------- */ +/* This code handles the "file object". This is split from the actual + * instance data, because several instances may write into the same file. + * If so, we need to use a single object, and also synchronize their writes. + * So we keep the file object separately, and just stick a reference into + * the instance data. + */ + +static inline rsRetVal +fileObjConstruct(file_t **ppFile) +{ + file_t *pFile; + DEFiRet; + + CHKmalloc(pFile = malloc(sizeof(file_t))); + pFile->name = NULL; + pFile->hdfsHost = NULL; + pFile->fh = NULL; + pFile->nUsers = 0; + + *ppFile = pFile; +finalize_it: + RETiRet; +} + +static inline void +fileObjAddUser(file_t *pFile) +{ + /* init mutex only when second user is added */ + ++pFile->nUsers; + if(pFile->nUsers == 2) + pthread_mutex_init(&pFile->mut, NULL); + DBGPRINTF("omhdfs: file %s now being used by %d actions\n", pFile->name, pFile->nUsers); +} + +static rsRetVal +fileObjDestruct(file_t **ppFile) +{ + file_t *pFile = *ppFile; + if(pFile->nUsers > 1) + pthread_mutex_destroy(&pFile->mut); + fileClose(pFile); + free(pFile->name); + free((char*)pFile->hdfsHost); + free(pFile->fh); + + return RS_RET_OK; +} + + +/* check, and potentially create, all names inside a path */ +static rsRetVal +filePrepare(file_t *pFile) +{ + uchar *p; + uchar *pszWork; + size_t len; + DEFiRet; + + if(HDFSFileExists(pFile->fs, pFile->name)) + FINALIZE; + + /* file does not exist, create it (and eventually parent directories */ + if(1) { // check if bCreateDirs + len = ustrlen(pFile->name) + 1; + CHKmalloc(pszWork = MALLOC(sizeof(uchar) * len)); + memcpy(pszWork, pFile->name, len); + for(p = pszWork+1 ; *p ; p++) + if(*p == '/') { + /* temporarily terminate string, create dir and go on */ + *p = '\0'; + if(!HDFSFileExists(pFile->fs, pszWork)) { + CHKiRet(HDFSmkdir(pFile->fs, pszWork)); + } + *p = '/'; + } + free(pszWork); + return 0; + } + +finalize_it: + RETiRet; +} + + +/* this function is to be used as destructor for the + * hash table code. + */ +static void +fileObjDestruct4Hashtable(void *ptr) +{ + file_t *pFile = (file_t*) ptr; + fileObjDestruct(&pFile); +} + + +static inline rsRetVal +fileOpen(file_t *pFile) +{ + DEFiRet; + + assert(pFile->fh == NULL); + if(pFile->nUsers > 1) + d_pthread_mutex_lock(&pFile->mut); + + DBGPRINTF("omhdfs: try to connect to HDFS at host '%s', port %d\n", + pFile->hdfsHost, pFile->hdfsPort); + pFile->fs = hdfsConnect(pFile->hdfsHost, pFile->hdfsPort); + if(pFile->fs == NULL) { + DBGPRINTF("omhdfs: error can not connect to hdfs\n"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + + CHKiRet(filePrepare(pFile)); + + pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_APPEND, 0, 0, 0); + if(pFile->fh == NULL) { + /* maybe the file does not exist, so we try to create it now. + * Note that we can not use hdfsExists() because of a deficit in + * it: https://issues.apache.org/jira/browse/HDFS-1154 + * As of my testing, libhdfs at least seems to return ENOENT if + * the file does not exist. + */ + if(errno == ENOENT) { + DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n", + pFile->name); + pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0); + } + } + if(pFile->fh == NULL) { + DBGPRINTF("omhdfs: failed to open %s for writing!\n", pFile->name); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: + if(pFile->nUsers > 1) + d_pthread_mutex_unlock(&pFile->mut); + RETiRet; +} + + +static inline rsRetVal +fileWrite(file_t *pFile, uchar *buf) +{ + size_t lenWrite; + DEFiRet; + + if(pFile->nUsers > 1) + d_pthread_mutex_lock(&pFile->mut); + + /* open file if not open. This must be done *here* and while mutex-protected + * because of HUP handling (which is async to normal processing!). + */ + if(pFile->fh == NULL) { + fileOpen(pFile); + if(pFile->fh == NULL) { + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + } + + lenWrite = strlen((char*) buf); + tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite); + if((unsigned) num_written_bytes != lenWrite) { + errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, " + "written %lu\n", pFile->name, (unsigned long) lenWrite, + (unsigned long) num_written_bytes); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: + if(pFile->nUsers > 1) + d_pthread_mutex_unlock(&pFile->mut); + RETiRet; +} + + +static inline rsRetVal +fileClose(file_t *pFile) +{ + DEFiRet; + + if(pFile->fh == NULL) + FINALIZE; + + if(pFile->nUsers > 1) + d_pthread_mutex_lock(&pFile->mut); + + hdfsCloseFile(pFile->fs, pFile->fh); + pFile->fh = NULL; + + if(pFile->nUsers > 1) + d_pthread_mutex_unlock(&pFile->mut); + +finalize_it: + RETiRet; +} + +/* ---END FILE OBJECT---------------------------------------------------- */ + + +BEGINcreateInstance +CODESTARTcreateInstance + pData->pFile = NULL; +ENDcreateInstance + + +BEGINfreeInstance +CODESTARTfreeInstance + if(pData->pFile != NULL) + fileObjDestruct(&pData->pFile); +ENDfreeInstance + + +BEGINtryResume +CODESTARTtryResume + fileClose(pData->pFile); + fileOpen(pData->pFile); + if(pData->pFile->fh == NULL){ + dbgprintf("omhdfs: tried to resume file %s, but still no luck...\n", + pData->pFile->name); + iRet = RS_RET_SUSPENDED; + } +ENDtryResume + +BEGINdoAction +CODESTARTdoAction + DBGPRINTF("omuxsock: action to to write to %s\n", pData->pFile->name); + iRet = fileWrite(pData->pFile, ppString[0]); +ENDdoAction + + +BEGINparseSelectorAct + file_t *pFile; + int r; + uchar *keybuf; +CODESTARTparseSelectorAct + + /* first check if this config line is actually for us */ + if(strncmp((char*) p, ":omhdfs:", sizeof(":omhdfs:") - 1)) { + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); + } + + /* ok, if we reach this point, we have something for us */ + p += sizeof(":omhdfs:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + CODE_STD_STRING_REQUESTparseSelectorAct(1) + CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, 0, + (dfltTplName == NULL) ? (uchar*)"RSYSLOG_FileFormat" : dfltTplName)); + + if(fileName == NULL) { + errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: no file name specified, can not continue"); + ABORT_FINALIZE(RS_RET_FILE_NOT_SPECIFIED); + } + + pFile = hashtable_search(files, fileName); + if(pFile == NULL) { + /* we need a new file object, this one not seen before */ + CHKiRet(fileObjConstruct(&pFile)); + CHKmalloc(pFile->name = fileName); + CHKmalloc(keybuf = ustrdup(fileName)); + fileName = NULL; /* re-set, data passed to file object */ + CHKmalloc(pFile->hdfsHost = strdup((hdfsHost == NULL) ? "default" : (char*) hdfsHost)); + pFile->hdfsPort = hdfsPort; + fileOpen(pFile); + if(pFile->fh == NULL){ + errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: failed to open %s - " + "retrying later", pFile->name); + iRet = RS_RET_SUSPENDED; + } + r = hashtable_insert(files, keybuf, pFile); + if(r == 0) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + fileObjAddUser(pFile); + pData->pFile = pFile; + +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + + +BEGINdoHUP + file_t *pFile; + struct hashtable_itr *itr; +CODESTARTdoHUP + DBGPRINTF("omhdfs: HUP received (file count %d)\n", hashtable_count(files)); + /* Iterator constructor only returns a valid iterator if + * the hashtable is not empty */ + itr = hashtable_iterator(files); + if(hashtable_count(files) > 0) + { + do { + pFile = (file_t *) hashtable_iterator_value(itr); + fileClose(pFile); + DBGPRINTF("omhdfs: HUP, closing file %s\n", pFile->name); + } while (hashtable_iterator_advance(itr)); + } +ENDdoHUP + + +/* Reset config variables for this module to default values. + * rgerhards, 2007-07-17 + */ +static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + hdfsHost = NULL; + hdfsPort = 0; + return RS_RET_OK; +} + + +BEGINmodExit +CODESTARTmodExit + objRelease(errmsg, CORE_COMPONENT); + if(files != NULL) + hashtable_destroy(files, 1); /* 1 => free all values automatically */ +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_doHUP +ENDqueryEtryPt + + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKmalloc(files = create_hashtable(20, hash_from_string, key_equals_string, + fileObjDestruct4Hashtable)); + + CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &fileName, NULL, eConfObjAction)); + CHKiRet(regCfSysLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &hdfsHost, NULL, eConfObjAction)); + CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL, eConfObjAction)); + CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &dfltTplName, NULL, eConfObjAction)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID, eConfObjAction)); +CODEmodInit_QueryRegCFSLineHdlr +ENDmodInit |