summaryrefslogtreecommitdiffstats
path: root/plugins/imuxsock/imuxsock.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imuxsock/imuxsock.c')
-rw-r--r--plugins/imuxsock/imuxsock.c685
1 files changed, 585 insertions, 100 deletions
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index 046f12f0..7ee413e7 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -6,7 +6,7 @@
*
* File begun on 2007-12-20 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -29,12 +29,14 @@
#include "rsyslog.h"
#include <stdlib.h>
#include <stdio.h>
+#include <ctype.h>
#include <assert.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/un.h>
+#include <sys/socket.h>
#include "dirty.h"
#include "cfsysline.h"
#include "unicode-helper.h"
@@ -44,14 +46,19 @@
#include "net.h"
#include "glbl.h"
#include "msg.h"
+#include "parser.h"
#include "prop.h"
#include "debug.h"
#include "unlimited_select.h"
+#include "sd-daemon.h"
+#include "statsobj.h"
+#include "datetime.h"
+#include "hashtable.h"
MODULE_TYPE_INPUT
/* defines */
-#define MAXFUNIX 20
+#define MAXFUNIX 50
#ifndef _PATH_LOG
#ifdef BSD
#define _PATH_LOG "/var/run/log"
@@ -60,6 +67,10 @@ MODULE_TYPE_INPUT
#endif
#endif
+/* emulate struct ucred for platforms that do not have it */
+#ifndef HAVE_SCM_CREDENTIALS
+struct ucred { int pid; };
+#endif
/* handle some defines missing on more than one platform */
#ifndef SUN_LEN
@@ -71,30 +82,157 @@ DEF_IMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
DEFobjCurrIf(prop)
+DEFobjCurrIf(parser)
+DEFobjCurrIf(datetime)
+DEFobjCurrIf(statsobj)
+
+statsobj_t *modStats;
+STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
+STATSCOUNTER_DEF(ctrLostRatelimit, mutCtrLostRatelimit)
+STATSCOUNTER_DEF(ctrNumRatelimiters, mutCtrNumRatelimiters)
+
+struct rs_ratelimit_state {
+ unsigned short interval;
+ unsigned short burst;
+ unsigned done;
+ unsigned missed;
+ time_t begin;
+};
+typedef struct rs_ratelimit_state rs_ratelimit_state_t;
+
+
+/* a very simple "hash function" for process IDs - we simply use the
+ * pid itself: it is quite expected that all pids may log some time, but
+ * from a collision point of view it is likely that long-running daemons
+ * start early and so will stay right in the top spots of the
+ * collision list.
+ */
+static unsigned int
+hash_from_key_fn(void *k)
+{
+ return((unsigned) *((pid_t*) k));
+}
+
+static int
+key_equals_fn(void *key1, void *key2)
+{
+ return *((pid_t*) key1) == *((pid_t*) key2);
+}
+
+
+/* structure to describe a specific listener */
+typedef struct lstn_s {
+ uchar *sockName; /* read-only after startup */
+ prop_t *hostName; /* host-name override - if set, use this instead of actual name */
+ int fd; /* read-only after startup */
+ int flags; /* should parser parse host name? read-only after startup */
+ int flowCtl; /* flow control settings for this socket */
+ int ratelimitInterval;
+ int ratelimitBurst;
+ intTiny ratelimitSev; /* severity level (and below) for which rate-limiting shall apply */
+ struct hashtable *ht; /* our hashtable for rate-limiting */
+ sbool bParseHost; /* should parser parse host name? read-only after startup */
+ sbool bCreatePath; /* auto-creation of socket directory? */
+ sbool bUseCreds; /* pull original creator credentials from socket */
+ sbool bWritePid; /* write original PID into tag */
+} lstn_t;
+static lstn_t listeners[MAXFUNIX];
static prop_t *pLocalHostIP = NULL; /* there is only one global IP for all internally-generated messages */
static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */
-static int startIndexUxLocalSockets; /* process funix from that index on (used to
+static int startIndexUxLocalSockets; /* process fd from that index on (used to
* suppress local logging. rgerhards 2005-08-01
* read-only after startup
*/
-static int funixParseHost[MAXFUNIX] = { 0, }; /* should parser parse host name? read-only after startup */
-static int funixFlags[MAXFUNIX] = { IGNDATE, }; /* should parser parse host name? read-only after startup */
-static int funixCreateSockPath[MAXFUNIX] = { 0, }; /* auto-creation of socket directory? */
-static uchar *funixn[MAXFUNIX] = { (uchar*) _PATH_LOG }; /* read-only after startup */
-static prop_t *funixHName[MAXFUNIX] = { NULL, }; /* host-name override - if set, use this instead of actual name */
-static int funixFlowCtl[MAXFUNIX] = { eFLOWCTL_NO_DELAY, }; /* flow control settings for this socket */
-static int funix[MAXFUNIX] = { -1, }; /* read-only after startup */
-static int nfunix = 1; /* number of Unix sockets open / read-only after startup */
+static int nfd = 1; /* number of Unix sockets open / read-only after startup */
+static int sd_fds = 0; /* number of systemd activated sockets */
/* config settings */
static int bOmitLocalLogging = 0;
static uchar *pLogSockName = NULL;
static uchar *pLogHostName = NULL; /* host name to use with this socket */
static int bUseFlowCtl = 0; /* use flow control or not (if yes, only LIGHT is used! */
-static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */
-#define DFLT_bCreateSockPath 0
-static int bCreateSockPath = DFLT_bCreateSockPath; /* auto-create socket path? */
+static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */
+static int bWritePid = 0; /* use credentials from recvmsg() and fixup PID in TAG */
+static int bWritePidSysSock = 0; /* use credentials from recvmsg() and fixup PID in TAG */
+#define DFLT_bCreatePath 0
+static int bCreatePath = DFLT_bCreatePath; /* auto-create socket path? */
+#define DFLT_ratelimitInterval 5
+static int ratelimitInterval = DFLT_ratelimitInterval; /* interval in seconds, 0 = off */
+static int ratelimitIntervalSysSock = DFLT_ratelimitInterval;
+#define DFLT_ratelimitBurst 200
+static int ratelimitBurst = DFLT_ratelimitBurst; /* max nbr of messages in interval */
+static int ratelimitBurstSysSock = DFLT_ratelimitBurst; /* max nbr of messages in interval */
+#define DFLT_ratelimitSeverity 1 /* do not rate-limit emergency messages */
+static int ratelimitSeverity = DFLT_ratelimitSeverity;
+static int ratelimitSeveritySysSock = DFLT_ratelimitSeverity;
+
+
+
+static void
+initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsigned short burst)
+{
+ rs->interval = interval;
+ rs->burst = burst;
+ rs->done = 0;
+ rs->missed = 0;
+ rs->begin = 0;
+}
+
+
+/* ratelimiting support, modelled after the linux kernel
+ * returns 1 if message is within rate limit and shall be
+ * processed, 0 otherwise.
+ * This implementation is NOT THREAD-SAFE and must not
+ * be called concurrently.
+ */
+static inline int
+withinRatelimit(struct rs_ratelimit_state *rs, time_t tt, pid_t pid)
+{
+ int ret;
+ uchar msgbuf[1024];
+
+ if(rs->interval == 0) {
+ ret = 1;
+ goto finalize_it;
+ }
+
+ assert(rs->burst != 0);
+
+ if(rs->begin == 0)
+ rs->begin = tt;
+
+ /* resume if we go out of out time window */
+ if(tt > rs->begin + rs->interval) {
+ if(rs->missed) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "imuxsock lost %u messages from pid %lu due to rate-limiting",
+ rs->missed, (unsigned long) pid);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ rs->missed = 0;
+ }
+ rs->begin = 0;
+ rs->done = 0;
+ }
+
+ /* do actual limit check */
+ if(rs->burst > rs->done) {
+ rs->done++;
+ ret = 1;
+ } else {
+ if(rs->missed == 0) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "imuxsock begins to drop messages from pid %lu due to rate-limiting",
+ (unsigned long) pid);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ }
+ rs->missed++;
+ ret = 0;
+ }
+
+finalize_it:
+ return ret;
+}
/* set the timestamp ignore / not ignore option for the system
@@ -104,7 +242,7 @@ static int bCreateSockPath = DFLT_bCreateSockPath; /* auto-create socket path? *
static rsRetVal setSystemLogTimestampIgnore(void __attribute__((unused)) *pVal, int iNewVal)
{
DEFiRet;
- funixFlags[0] = iNewVal ? IGNDATE : NOFLAG;
+ listeners[0].flags = iNewVal ? IGNDATE : NOFLAG;
RETiRet;
}
@@ -113,7 +251,7 @@ static rsRetVal setSystemLogTimestampIgnore(void __attribute__((unused)) *pVal,
static rsRetVal setSystemLogFlowControl(void __attribute__((unused)) *pVal, int iNewVal)
{
DEFiRet;
- funixFlowCtl[0] = iNewVal ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY;
+ listeners[0].flowCtl = iNewVal ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY;
RETiRet;
}
@@ -130,26 +268,40 @@ addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal)
{
DEFiRet;
- if(nfunix < MAXFUNIX) {
+ if(nfd < MAXFUNIX) {
if(*pNewVal == ':') {
- funixParseHost[nfunix] = 1;
+ listeners[nfd].bParseHost = 1;
} else {
- funixParseHost[nfunix] = 0;
+ listeners[nfd].bParseHost = 0;
}
- CHKiRet(prop.Construct(&(funixHName[nfunix])));
+ CHKiRet(prop.Construct(&(listeners[nfd].hostName)));
if(pLogHostName == NULL) {
- CHKiRet(prop.SetString(funixHName[nfunix], glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())));
+ CHKiRet(prop.SetString(listeners[nfd].hostName, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())));
} else {
- CHKiRet(prop.SetString(funixHName[nfunix], pLogHostName, ustrlen(pLogHostName)));
+ CHKiRet(prop.SetString(listeners[nfd].hostName, pLogHostName, ustrlen(pLogHostName)));
/* reset hostname for next socket */
free(pLogHostName);
pLogHostName = NULL;
}
- CHKiRet(prop.ConstructFinalize(funixHName[nfunix]));
- funixFlowCtl[nfunix] = bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY;
- funixFlags[nfunix] = bIgnoreTimestamp ? IGNDATE : NOFLAG;
- funixCreateSockPath[nfunix] = bCreateSockPath;
- funixn[nfunix++] = pNewVal;
+ CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName));
+ if(ratelimitInterval > 0) {
+ if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL)) == NULL) {
+ /* in this case, we simply turn of rate-limiting */
+ dbgprintf("imuxsock: turning off rate limiting because we could not "
+ "create hash table\n");
+ ratelimitInterval = 0;
+ }
+ }
+ listeners[nfd].ratelimitInterval = ratelimitInterval;
+ listeners[nfd].ratelimitBurst = ratelimitBurst;
+ listeners[nfd].ratelimitSev = ratelimitSeverity;
+ listeners[nfd].flowCtl = bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY;
+ listeners[nfd].flags = bIgnoreTimestamp ? IGNDATE : NOFLAG;
+ listeners[nfd].bCreatePath = bCreatePath;
+ listeners[nfd].sockName = pNewVal;
+ listeners[nfd].bUseCreds = (bWritePid || ratelimitInterval) ? 1 : 0;
+ listeners[nfd].bWritePid = bWritePid;
+ nfd++;
} else {
errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n",
pNewVal);
@@ -160,80 +312,281 @@ finalize_it:
}
-/* free the funixn[] socket names - needed as cleanup on several places
- * note that nfunix is NOT reset! funixn[0] is never freed, as it comes from
+/* discard all log sockets except for "socket" 0. Data for it comes from
* the constant memory pool - and if not, it is freeed via some other pointer.
*/
-static rsRetVal discardFunixn(void)
+static rsRetVal discardLogSockets(void)
{
int i;
- for (i = 1; i < nfunix; i++) {
- if(funixn[i] != NULL) {
- free(funixn[i]);
- funixn[i] = NULL;
+ for (i = 1; i < nfd; i++) {
+ if(listeners[i].sockName != NULL) {
+ free(listeners[i].sockName);
+ listeners[i].sockName = NULL;
}
- if(funixHName[i] != NULL) {
- prop.Destruct(&(funixHName[i]));
+ if(listeners[i].hostName != NULL) {
+ prop.Destruct(&(listeners[i].hostName));
+ }
+ if(listeners[i].ht != NULL) {
+ hashtable_destroy(listeners[i].ht, 1); /* 1 => free all values automatically */
}
}
-
+
return RS_RET_OK;
}
-static int create_unix_socket(const char *path, int bCreatePath)
+/* used to create a log socket if NOT passed in via systemd.
+ */
+static inline rsRetVal
+createLogSocket(lstn_t *pLstn)
{
struct sockaddr_un sunx;
- int fd;
-
- if (path[0] == '\0')
- return -1;
-
- unlink(path);
+ DEFiRet;
+ unlink((char*)pLstn->sockName);
memset(&sunx, 0, sizeof(sunx));
sunx.sun_family = AF_UNIX;
- if(bCreatePath) {
- makeFileParentDirs((uchar*)path, strlen(path), 0755, -1, -1, 0);
+ if(pLstn->bCreatePath) {
+ makeFileParentDirs((uchar*)pLstn->sockName, ustrlen(pLstn->sockName), 0755, -1, -1, 0);
}
- (void) strncpy(sunx.sun_path, path, sizeof(sunx.sun_path));
- fd = socket(AF_UNIX, SOCK_DGRAM, 0);
- if (fd < 0 || bind(fd, (struct sockaddr *) &sunx, SUN_LEN(&sunx)) < 0 ||
- chmod(path, 0666) < 0) {
- errmsg.LogError(errno, NO_ERRCODE, "connot create '%s'", path);
- dbgprintf("cannot create %s (%d).\n", path, errno);
- close(fd);
+ strncpy(sunx.sun_path, (char*)pLstn->sockName, sizeof(sunx.sun_path));
+ pLstn->fd = socket(AF_UNIX, SOCK_DGRAM, 0);
+ if(pLstn->fd < 0 || bind(pLstn->fd, (struct sockaddr *) &sunx, SUN_LEN(&sunx)) < 0 ||
+ chmod((char*)pLstn->sockName, 0666) < 0) {
+ errmsg.LogError(errno, NO_ERRCODE, "connot create '%s'", pLstn->sockName);
+ dbgprintf("cannot create %s (%d).\n", pLstn->sockName, errno);
+ close(pLstn->fd);
+ pLstn->fd = -1;
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
+ }
+finalize_it:
+ RETiRet;
+}
+
+
+static inline rsRetVal
+openLogSocket(lstn_t *pLstn)
+{
+ DEFiRet;
+ int one;
+
+ if(pLstn->sockName[0] == '\0')
return -1;
+
+ pLstn->fd = -1;
+
+ if (sd_fds > 0) {
+ /* Check if the current socket is a systemd activated one.
+ * If so, just use it.
+ */
+ int fd;
+
+ for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + sd_fds; fd++) {
+ if( sd_is_socket_unix(fd, SOCK_DGRAM, -1, (const char*) pLstn->sockName, 0) == 1) {
+ /* ok, it matches -- just use as is */
+ pLstn->fd = fd;
+
+ dbgprintf("imuxsock: Acquired UNIX socket '%s' (fd %d) from systemd.\n",
+ pLstn->sockName, pLstn->fd);
+ break;
+ }
+ /*
+ * otherwise it either didn't matched *this* socket and
+ * we just continue to check the next one or there were
+ * an error and we will create a new socket bellow.
+ */
+ }
+ }
+
+ if (pLstn->fd == -1) {
+ CHKiRet(createLogSocket(pLstn));
+ }
+
+# if HAVE_SCM_CREDENTIALS
+ if(pLstn->bUseCreds) {
+ one = 1;
+ if(setsockopt(pLstn->fd, SOL_SOCKET, SO_PASSCRED, &one, (socklen_t) sizeof(one)) != 0) {
+ errmsg.LogError(errno, NO_ERRCODE, "set SO_PASSCRED failed on '%s'", pLstn->sockName);
+ pLstn->bUseCreds = 0;
+ }
+ if(setsockopt(pLstn->fd, SOL_SOCKET, SCM_CREDENTIALS, &one, sizeof(one)) != 0) {
+ errmsg.LogError(errno, NO_ERRCODE, "set SCM_CREDENTIALS failed on '%s'", pLstn->sockName);
+ pLstn->bUseCreds = 0;
+ }
+ }
+# else /* HAVE_SCM_CREDENTIALS */
+ pLstn->bUseCreds = 0;
+# endif /* HAVE_SCM_CREDENTIALS */
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ close(pLstn->fd);
+ pLstn->fd = -1;
+ }
+
+ RETiRet;
+}
+
+
+/* find ratelimiter to use for this message. Currently, we use the
+ * pid, but may change to cgroup later (probably via a config switch).
+ * Returns NULL if not found.
+ */
+static inline rsRetVal
+findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
+{
+ rs_ratelimit_state_t *rl;
+ int r;
+ pid_t *keybuf;
+ DEFiRet;
+
+ if(cred == NULL)
+ FINALIZE;
+
+ rl = hashtable_search(pLstn->ht, &cred->pid);
+ if(rl == NULL) {
+ /* we need to add a new ratelimiter, process not seen before! */
+ dbgprintf("imuxsock: no ratelimiter for pid %lu, creating one\n",
+ (unsigned long) cred->pid);
+ STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters);
+ CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t)));
+ CHKmalloc(keybuf = malloc(sizeof(pid_t)));
+ *keybuf = cred->pid;
+ initRatelimitState(rl, ratelimitInterval, pLstn->ratelimitBurst);
+ r = hashtable_insert(pLstn->ht, keybuf, rl);
+ if(r == 0)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
- return fd;
+
+ *prl = rl;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* patch correct pid into tag. bufTAG MUST be CONF_TAG_MAXSIZE long!
+ */
+static inline void
+fixPID(uchar *bufTAG, int *lenTag, struct ucred *cred)
+{
+ int i;
+ char bufPID[16];
+ int lenPID;
+
+ if(cred == NULL)
+ return;
+
+ lenPID = snprintf(bufPID, sizeof(bufPID), "[%lu]:", (unsigned long) cred->pid);
+
+ for(i = *lenTag ; i >= 0 && bufTAG[i] != '[' ; --i)
+ /*JUST SKIP*/;
+
+ if(i < 0)
+ i = *lenTag - 1; /* go right at end of TAG, pid was not present (-1 for ':') */
+
+ if(i + lenPID > CONF_TAG_MAXSIZE)
+ return; /* do not touch, as things would break */
+
+ memcpy(bufTAG + i, bufPID, lenPID);
+ *lenTag = i + lenPID;
}
/* submit received message to the queue engine
+ * We now parse the message according to expected format so that we
+ * can also mangle it if necessary.
*/
static inline rsRetVal
-SubmitMsg(uchar *pRcv, int lenRcv, int iSock)
+SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
{
msg_t *pMsg;
+ int lenMsg;
+ int offs;
+ int i;
+ uchar *parse;
+ int pri;
+ int facil;
+ int sever;
+ uchar bufParseTAG[CONF_TAG_MAXSIZE];
+ struct syslogTime st;
+ time_t tt;
+ rs_ratelimit_state_t *ratelimiter = NULL;
DEFiRet;
+ /* TODO: handle format errors?? */
+ /* we need to parse the pri first, because we need the severity for
+ * rate-limiting as well.
+ */
+ parse = pRcv;
+ lenMsg = lenRcv;
+ offs = 1; /* '<' */
+
+ parse++;
+ pri = 0;
+ while(offs < lenMsg && isdigit(*parse)) {
+ pri = pri * 10 + *parse - '0';
+ ++parse;
+ ++offs;
+ }
+ facil = LOG_FAC(pri);
+ sever = LOG_PRI(pri);
+
+ if(sever >= pLstn->ratelimitSev) {
+ /* note: if cred == NULL, then ratelimiter == NULL as well! */
+ findRatelimiter(pLstn, cred, &ratelimiter); /* ignore error, better so than others... */
+ }
+
+ datetime.getCurrTime(&st, &tt);
+ if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) {
+ STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit);
+ FINALIZE;
+ }
+
/* we now create our own message object and submit it to the queue */
- CHKiRet(msgConstruct(&pMsg));
+ CHKiRet(msgConstructWithTime(&pMsg, &st, tt));
MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv);
+ parser.SanitizeMsg(pMsg);
+ lenMsg = pMsg->iLenRawMsg - offs;
MsgSetInputName(pMsg, pInputName);
- MsgSetFlowControlType(pMsg, funixFlowCtl[iSock]);
+ MsgSetFlowControlType(pMsg, pLstn->flowCtl);
+
+ pMsg->iFacility = facil;
+ pMsg->iSeverity = sever;
+ MsgSetAfterPRIOffs(pMsg, offs);
- if(funixParseHost[iSock]) {
- pMsg->msgFlags = funixFlags[iSock] | NEEDS_PARSING | PARSE_HOSTNAME;
+ parse++; lenMsg--; /* '>' */
+
+ if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) {
+ DBGPRINTF("we have a problem, invalid timestamp in msg!\n");
+ }
+
+ /* pull tag */
+
+ i = 0;
+ while(lenMsg > 0 && *parse != ' ' && i < CONF_TAG_MAXSIZE) {
+ bufParseTAG[i++] = *parse++;
+ --lenMsg;
+ }
+ bufParseTAG[i] = '\0'; /* terminate string */
+ if(pLstn->bWritePid)
+ fixPID(bufParseTAG, &i, cred);
+ MsgSetTAG(pMsg, bufParseTAG, i);
+
+ MsgSetMSGoffs(pMsg, pMsg->iLenRawMsg - lenMsg);
+
+ if(pLstn->bParseHost) {
+ pMsg->msgFlags = pLstn->flags | PARSE_HOSTNAME;
} else {
- pMsg->msgFlags = funixFlags[iSock] | NEEDS_PARSING;
+ pMsg->msgFlags = pLstn->flags;
}
- MsgSetRcvFrom(pMsg, funixHName[iSock]);
+ MsgSetRcvFrom(pMsg, pLstn->hostName);
CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP));
CHKiRet(submitMsg(pMsg));
+ STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit);
finalize_it:
RETiRet;
}
@@ -246,15 +599,22 @@ finalize_it:
* of the socket which is to be processed. This eases access to the
* growing number of properties. -- rgerhards, 2008-08-01
*/
-static rsRetVal readSocket(int fd, int iSock)
+static rsRetVal readSocket(lstn_t *pLstn)
{
DEFiRet;
int iRcvd;
int iMaxLine;
+ struct msghdr msgh;
+ struct iovec msgiov;
+# if HAVE_SCM_CREDENTIALS
+ struct cmsghdr *cm;
+# endif
+ struct ucred *cred;
uchar bufRcv[4096+1];
+ char aux[128];
uchar *pRcv = NULL; /* receive buffer */
- assert(iSock >= 0);
+ assert(pLstn->fd >= 0);
iMaxLine = glbl.GetMaxLine();
@@ -270,15 +630,44 @@ static rsRetVal readSocket(int fd, int iSock)
CHKmalloc(pRcv = (uchar*) MALLOC(sizeof(uchar) * (iMaxLine + 1)));
}
- iRcvd = recv(fd, pRcv, iMaxLine, 0);
- dbgprintf("Message from UNIX socket: #%d\n", fd);
- if (iRcvd > 0) {
- CHKiRet(SubmitMsg(pRcv, iRcvd, iSock));
- } else if (iRcvd < 0 && errno != EINTR) {
+ memset(&msgh, 0, sizeof(msgh));
+ memset(&msgiov, 0, sizeof(msgiov));
+# if HAVE_SCM_CREDENTIALS
+ if(pLstn->bUseCreds) {
+ memset(&aux, 0, sizeof(aux));
+ msgh.msg_control = aux;
+ msgh.msg_controllen = sizeof(aux);
+ }
+# endif
+ msgiov.iov_base = pRcv;
+ msgiov.iov_len = iMaxLine;
+ msgh.msg_iov = &msgiov;
+ msgh.msg_iovlen = 1;
+ iRcvd = recvmsg(pLstn->fd, &msgh, MSG_DONTWAIT);
+
+ dbgprintf("Message from UNIX socket: #%d\n", pLstn->fd);
+ if(iRcvd > 0) {
+ cred = NULL;
+# if HAVE_SCM_CREDENTIALS
+ if(pLstn->bUseCreds) {
+ dbgprintf("XXX: pre CM loop, length of control message %d\n", (int) msgh.msg_controllen);
+ for (cm = CMSG_FIRSTHDR(&msgh); cm; cm = CMSG_NXTHDR(&msgh, cm)) {
+ dbgprintf("XXX: in CM loop, %d, %d\n", cm->cmsg_level, cm->cmsg_type);
+ if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) {
+ cred = (struct ucred*) CMSG_DATA(cm);
+ dbgprintf("XXX: got credentials pid %d\n", (int) cred->pid);
+ break;
+ }
+ }
+ dbgprintf("XXX: post CM loop\n");
+ }
+# endif /* HAVE_SCM_CREDENTIALS */
+ CHKiRet(SubmitMsg(pRcv, iRcvd, pLstn, cred));
+ } else if(iRcvd < 0 && errno != EINTR) {
char errStr[1024];
rs_strerror_r(errno, errStr, sizeof(errStr));
dbgprintf("UNIX socket error: %d = %s.\n", errno, errStr);
- errmsg.LogError(errno, NO_ERRCODE, "recvfrom UNIX");
+ errmsg.LogError(errno, NO_ERRCODE, "imuxsock: recvfrom UNIX");
}
finalize_it:
@@ -317,10 +706,11 @@ CODESTARTrunInput
maxfds = 0;
FD_ZERO (pReadfds);
/* Copy master connections */
- for (i = startIndexUxLocalSockets; i < nfunix; i++) {
- if (funix[i] != -1) {
- FD_SET(funix[i], pReadfds);
- if (funix[i]>maxfds) maxfds=funix[i];
+ for (i = startIndexUxLocalSockets; i < nfd; i++) {
+ if (listeners[i].fd!= -1) {
+ FD_SET(listeners[i].fd, pReadfds);
+ if(listeners[i].fd > maxfds)
+ maxfds=listeners[i].fd;
}
}
@@ -337,11 +727,11 @@ CODESTARTrunInput
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
- for (i = 0; i < nfunix && nfds > 0; i++) {
+ for (i = 0; i < nfd && nfds > 0; i++) {
if(glbl.GetGlobalInputTermState() == 1)
ABORT_FINALIZE(RS_RET_FORCE_TERM); /* terminate input! */
- if ((fd = funix[i]) != -1 && FD_ISSET(fd, pReadfds)) {
- readSocket(fd, i);
+ if ((fd = listeners[i].fd) != -1 && FD_ISSET(fd, pReadfds)) {
+ readSocket(&(listeners[i]));
--nfds; /* indicate we have processed one */
}
}
@@ -356,6 +746,7 @@ ENDrunInput
BEGINwillRun
CODESTARTwillRun
register int i;
+ int actSocks;
/* first apply some config settings */
# ifdef OS_SOLARIS
@@ -369,12 +760,39 @@ CODESTARTwillRun
startIndexUxLocalSockets = bOmitLocalLogging ? 1 : 0;
# endif
if(pLogSockName != NULL)
- funixn[0] = pLogSockName;
+ listeners[0].sockName = pLogSockName;
+ if(ratelimitIntervalSysSock > 0) {
+ if((listeners[0].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL)) == NULL) {
+ /* in this case, we simply turn of rate-limiting */
+ dbgprintf("imuxsock: turning off rate limiting because we could not "
+ "create hash table\n");
+ ratelimitIntervalSysSock = 0;
+ }
+ }
+ listeners[0].ratelimitInterval = ratelimitIntervalSysSock;
+ listeners[0].ratelimitBurst = ratelimitBurstSysSock;
+ listeners[0].ratelimitSev = ratelimitSeveritySysSock;
+ listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock) ? 1 : 0;
+ listeners[0].bWritePid = bWritePidSysSock;
+
+ sd_fds = sd_listen_fds(0);
+ if (sd_fds < 0) {
+ errmsg.LogError(-sd_fds, NO_ERRCODE, "imuxsock: Failed to acquire systemd socket");
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
+ }
/* initialize and return if will run or not */
- for (i = startIndexUxLocalSockets ; i < nfunix ; i++) {
- if ((funix[i] = create_unix_socket((char*) funixn[i], funixCreateSockPath[i])) != -1)
- dbgprintf("Opened UNIX socket '%s' (fd %d).\n", funixn[i], funix[i]);
+ actSocks = 0;
+ for (i = startIndexUxLocalSockets ; i < nfd ; i++) {
+ if(openLogSocket(&(listeners[i])) == RS_RET_OK) {
+ ++actSocks;
+ dbgprintf("imuxsock: Opened UNIX socket '%s' (fd %d).\n", listeners[i].sockName, listeners[i].fd);
+ }
+ }
+
+ if(actSocks == 0) {
+ errmsg.LogError(0, NO_ERRCODE, "imuxsock does not run because we could not aquire any socket\n");
+ ABORT_FINALIZE(RS_RET_ERR);
}
/* we need to create the inputName property (only once during our lifetime) */
@@ -391,31 +809,49 @@ CODESTARTafterRun
int i;
/* do cleanup here */
/* Close the UNIX sockets. */
- for (i = 0; i < nfunix; i++)
- if (funix[i] != -1)
- close(funix[i]);
-
- /* Clean-up files. */
- for(i = startIndexUxLocalSockets; i < nfunix; i++)
- if (funixn[i] && funix[i] != -1)
- unlink((char*) funixn[i]);
+ for (i = 0; i < nfd; i++)
+ if (listeners[i].fd != -1)
+ close(listeners[i].fd);
+
+ /* Clean-up files. */
+ for(i = startIndexUxLocalSockets; i < nfd; i++)
+ if (listeners[i].sockName && listeners[i].fd != -1) {
+
+ /* If systemd passed us a socket it is systemd's job to clean it up.
+ * Do not unlink it -- we will get same socket (node) from systemd
+ * e.g. on restart again.
+ */
+ if (sd_fds > 0 &&
+ listeners[i].fd >= SD_LISTEN_FDS_START &&
+ listeners[i].fd < SD_LISTEN_FDS_START + sd_fds)
+ continue;
+
+ DBGPRINTF("imuxsock: unlinking unix socket file[%d] %s\n", i, listeners[i].sockName);
+ unlink((char*) listeners[i].sockName);
+ }
/* free no longer needed string */
free(pLogSockName);
free(pLogHostName);
- discardFunixn();
- nfunix = 1;
+ discardLogSockets();
+ nfd = 1;
if(pInputName != NULL)
prop.Destruct(&pInputName);
+
ENDafterRun
BEGINmodExit
CODESTARTmodExit
+ statsobj.Destruct(&modStats);
+
+ objRelease(parser, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
+ objRelease(datetime, CORE_COMPONENT);
ENDmodExit
@@ -444,11 +880,19 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
pLogHostName = NULL;
}
- discardFunixn();
- nfunix = 1;
+ discardLogSockets();
+ nfd = 1;
bIgnoreTimestamp = 1;
bUseFlowCtl = 0;
- bCreateSockPath = DFLT_bCreateSockPath;
+ bWritePid = 0;
+ bWritePidSysSock = 0;
+ bCreatePath = DFLT_bCreatePath;
+ ratelimitInterval = DFLT_ratelimitInterval;
+ ratelimitIntervalSysSock = DFLT_ratelimitInterval;
+ ratelimitBurst = DFLT_ratelimitBurst;
+ ratelimitBurstSysSock = DFLT_ratelimitBurst;
+ ratelimitSeverity = DFLT_ratelimitSeverity;
+ ratelimitSeveritySysSock = DFLT_ratelimitSeverity;
return RS_RET_OK;
}
@@ -462,13 +906,26 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+ CHKiRet(objUse(parser, CORE_COMPONENT));
dbgprintf("imuxsock version %s initializing\n", PACKAGE_VERSION);
- /* initialize funixn[] array */
+ /* init system log socket settings */
+ listeners[0].flags = IGNDATE;
+ listeners[0].sockName = UCHAR_CONSTANT(_PATH_LOG);
+ listeners[0].hostName = NULL;
+ listeners[0].flowCtl = eFLOWCTL_NO_DELAY;
+ listeners[0].fd = -1;
+ listeners[0].bParseHost = 0;
+ listeners[0].bUseCreds = 0;
+ listeners[0].bCreatePath = 0;
+
+ /* initialize socket names */
for(i = 1 ; i < MAXFUNIX ; ++i) {
- funixn[i] = NULL;
- funix[i] = -1;
+ listeners[i].sockName = NULL;
+ listeners[i].fd = -1;
}
CHKiRet(prop.Construct(&pLocalHostIP));
@@ -476,9 +933,9 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(prop.ConstructFinalize(pLocalHostIP));
/* now init listen socket zero, the local log socket */
- CHKiRet(prop.Construct(&(funixHName[0])));
- CHKiRet(prop.SetString(funixHName[0], glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())));
- CHKiRet(prop.ConstructFinalize(funixHName[0]));
+ CHKiRet(prop.Construct(&(listeners[0].hostName)));
+ CHKiRet(prop.SetString(listeners[0].hostName, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())));
+ CHKiRet(prop.ConstructFinalize(listeners[0].hostName));
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr((uchar *)"omitlocallogging", 0, eCmdHdlrBinary,
@@ -492,9 +949,17 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketflowcontrol", 0, eCmdHdlrBinary,
NULL, &bUseFlowCtl, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketcreatepath", 0, eCmdHdlrBinary,
- NULL, &bCreateSockPath, STD_LOADABLE_MODULE_ID));
+ NULL, &bCreatePath, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusepidfromsystem", 0, eCmdHdlrBinary,
+ NULL, &bWritePid, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"addunixlistensocket", 0, eCmdHdlrGetWord,
addLstnSocketName, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitinterval", 0, eCmdHdlrInt,
+ NULL, &ratelimitInterval, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitburst", 0, eCmdHdlrInt,
+ NULL, &ratelimitBurst, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitseverity", 0, eCmdHdlrInt,
+ NULL, &ratelimitSeverity, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
/* the following one is a (dirty) trick: the system log socket is not added via
@@ -507,6 +972,26 @@ CODEmodInit_QueryRegCFSLineHdlr
setSystemLogTimestampIgnore, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary,
setSystemLogFlowControl, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary,
+ NULL, &bWritePidSysSock, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitinterval", 0, eCmdHdlrInt,
+ NULL, &ratelimitIntervalSysSock, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitburst", 0, eCmdHdlrInt,
+ NULL, &ratelimitBurstSysSock, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitseverity", 0, eCmdHdlrInt,
+ NULL, &ratelimitSeveritySysSock, STD_LOADABLE_MODULE_ID));
+
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&modStats));
+ CHKiRet(statsobj.SetName(modStats, UCHAR_CONSTANT("imuxsock")));
+ CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, &ctrSubmit));
+ CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("ratelimit.discarded"),
+ ctrType_IntCtr, &ctrLostRatelimit));
+ CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("ratelimit.numratelimiters"),
+ ctrType_IntCtr, &ctrNumRatelimiters));
+ CHKiRet(statsobj.ConstructFinalize(modStats));
+
ENDmodInit
/* vim:set ai:
*/