summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-09-28 17:26:28 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-09-28 17:26:28 +0200
commit054d2ccdd6044f94823f8facbda935cb70646333 (patch)
treef2051f857a4413062475a7ae42c23299aeeced6d
parent01a8807174d91a7936345d1172a87f98bbba61c4 (diff)
downloadrsyslog-054d2ccdd6044f94823f8facbda935cb70646333.tar.gz
rsyslog-054d2ccdd6044f94823f8facbda935cb70646333.tar.xz
rsyslog-054d2ccdd6044f94823f8facbda935cb70646333.zip
imuxsock: added per-socket hash tables/rate limiters & severity filter
rate limiting now applies only to messages with a given severity or above. By default, emergency messages are NOT rate-limited.
-rw-r--r--ChangeLog1
-rw-r--r--plugins/imuxsock/imuxsock.c159
-rw-r--r--runtime/rsyslog.h1
-rw-r--r--tests/syslog_caller.c20
4 files changed, 126 insertions, 55 deletions
diff --git a/ChangeLog b/ChangeLog
index 6622b67c..ca1428b4 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -5,6 +5,7 @@ Version 5.7.1 [V5-DEVEL] (rgerhards), 2010-09-??
- added new config statements
* $InputUnixListenSocketUsePIDFromSystem
* $SystemLogUsePIDFromSystem
+- imuxsock now supports up to 50 different sockets for input
---------------------------------------------------------------------------
Version 5.7.0 [V5-DEVEL] (rgerhards), 2010-09-16
- added module impstat to emit periodic statistics on rsyslog counters
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index 728456b6..d500fc54 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -57,7 +57,7 @@
MODULE_TYPE_INPUT
/* defines */
-#define MAXFUNIX 20
+#define MAXFUNIX 50
#ifndef _PATH_LOG
#ifdef BSD
#define _PATH_LOG "/var/run/log"
@@ -94,8 +94,6 @@ struct rs_ratelimit_state {
};
typedef struct rs_ratelimit_state rs_ratelimit_state_t;
-/* support for per-process ratelimiting */
-static struct hashtable *hashtab;
/* a very simple "hash function" for process IDs - we simply use the
* pid itself: it is quite expected that all pids may log some time, but
@@ -118,14 +116,19 @@ key_equals_fn(void *key1, void *key2)
/* structure to describe a specific listener */
typedef struct lstn_s {
- uchar *sockName; /* read-only after startup */
- prop_t *hostName; /* host-name override - if set, use this instead of actual name */
- int fd; /* read-only after startup */
- int flags; /* should parser parse host name? read-only after startup */
- int flowCtl; /* flow control settings for this socket */
- sbool bParseHost; /* should parser parse host name? read-only after startup */
- sbool bUseCreds;
- sbool bCreatePath; /* auto-creation of socket directory? */
+ uchar *sockName; /* read-only after startup */
+ prop_t *hostName; /* host-name override - if set, use this instead of actual name */
+ int fd; /* read-only after startup */
+ int flags; /* should parser parse host name? read-only after startup */
+ int flowCtl; /* flow control settings for this socket */
+ int ratelimitInterval;
+ int ratelimitBurst;
+ intTiny ratelimitSev; /* severity level (and below) for which rate-limiting shall apply */
+ struct hashtable *ht; /* our hashtable for rate-limiting */
+ sbool bParseHost; /* should parser parse host name? read-only after startup */
+ sbool bCreatePath; /* auto-creation of socket directory? */
+ sbool bUseCreds; /* pull original creator credentials from socket */
+ sbool bWritePid; /* write original PID into tag */
} lstn_t;
static lstn_t listeners[MAXFUNIX];
@@ -142,15 +145,20 @@ static int bOmitLocalLogging = 0;
static uchar *pLogSockName = NULL;
static uchar *pLogHostName = NULL; /* host name to use with this socket */
static int bUseFlowCtl = 0; /* use flow control or not (if yes, only LIGHT is used! */
-static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */
-static int bUseCreds = 0; /* use credentials from recvmsg() and fixup PID in TAG */
-static int bUseCredsSysSock = 0; /* use credentials from recvmsg() and fixup PID in TAG */
+static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */
+static int bWritePid = 0; /* use credentials from recvmsg() and fixup PID in TAG */
+static int bWritePidSysSock = 0; /* use credentials from recvmsg() and fixup PID in TAG */
#define DFLT_bCreatePath 0
static int bCreatePath = DFLT_bCreatePath; /* auto-create socket path? */
#define DFLT_ratelimitInterval 2
static int ratelimitInterval = DFLT_ratelimitInterval; /* interval in seconds, 0 = off */
+static int ratelimitIntervalSysSock = DFLT_ratelimitInterval;
#define DFLT_ratelimitBurst 200
static int ratelimitBurst = DFLT_ratelimitBurst; /* max nbr of messages in interval */
+static int ratelimitBurstSysSock = DFLT_ratelimitBurst; /* max nbr of messages in interval */
+#define DFLT_ratelimitSeverity 1 /* do not rate-limit emergency messages */
+static int ratelimitSeverity = DFLT_ratelimitSeverity;
+static int ratelimitSeveritySysSock = DFLT_ratelimitSeverity;
@@ -192,7 +200,7 @@ withinRatelimit(struct rs_ratelimit_state *rs, time_t tt)
if(rs->missed) {
snprintf((char*)msgbuf, sizeof(msgbuf),
"imuxsock lost %u messages due to rate-limiting", rs->missed);
- logmsgInternal(0, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
rs->missed = 0;
}
rs->begin = 0;
@@ -262,11 +270,23 @@ addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal)
pLogHostName = NULL;
}
CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName));
+ if(ratelimitInterval > 0) {
+ if((listeners[nfd].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn)) == NULL) {
+ /* in this case, we simply turn of rate-limiting */
+ dbgprintf("imuxsock: turning off rate limiting because we could not "
+ "create hash table\n");
+ ratelimitInterval = 0;
+ }
+ }
+ listeners[nfd].ratelimitInterval = ratelimitInterval;
+ listeners[nfd].ratelimitBurst = ratelimitBurst;
+ listeners[nfd].ratelimitSev = ratelimitSeverity;
listeners[nfd].flowCtl = bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY;
listeners[nfd].flags = bIgnoreTimestamp ? IGNDATE : NOFLAG;
listeners[nfd].bCreatePath = bCreatePath;
listeners[nfd].sockName = pNewVal;
- listeners[nfd].bUseCreds = bUseCreds;
+ listeners[nfd].bUseCreds = (bWritePid || ratelimitInterval) ? 1 : 0;
+ listeners[nfd].bWritePid = bWritePid;
nfd++;
} else {
errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n",
@@ -278,11 +298,10 @@ finalize_it:
}
-/* free the sockName[] socket names - needed as cleanup on several places
- * note that nfd is NOT reset! sockName[0] is never freed, as it comes from
+/* discard all log sockets except for "socket" 0. Data for it comes from
* the constant memory pool - and if not, it is freeed via some other pointer.
*/
-static rsRetVal discardFunixn(void)
+static rsRetVal discardLogSockets(void)
{
int i;
@@ -294,6 +313,9 @@ static rsRetVal discardFunixn(void)
if(listeners[i].hostName != NULL) {
prop.Destruct(&(listeners[i].hostName));
}
+ if(listeners[i].ht != NULL) {
+ hashtable_destroy(listeners[i].ht, 1); /* 1 => free all values automatically */
+ }
}
return RS_RET_OK;
@@ -410,7 +432,7 @@ finalize_it:
* Returns NULL if not found.
*/
static inline rsRetVal
-findRatelimiter(struct ucred *cred, rs_ratelimit_state_t **prl)
+findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
{
rs_ratelimit_state_t *rl;
int r;
@@ -420,7 +442,7 @@ findRatelimiter(struct ucred *cred, rs_ratelimit_state_t **prl)
if(cred == NULL)
FINALIZE;
- rl = hashtable_search(hashtab, &cred->pid);
+ rl = hashtable_search(pLstn->ht, &cred->pid);
if(rl == NULL) {
/* we need to add a new ratelimiter, process not seen before! */
dbgprintf("imuxsock: no ratelimiter for pid %lu, creating one\n",
@@ -429,8 +451,8 @@ findRatelimiter(struct ucred *cred, rs_ratelimit_state_t **prl)
CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t)));
CHKmalloc(keybuf = malloc(sizeof(pid_t)));
*keybuf = cred->pid;
- initRatelimitState(rl, ratelimitInterval, ratelimitBurst);
- r = hashtable_insert(hashtab, keybuf, rl);
+ initRatelimitState(rl, ratelimitInterval, pLstn->ratelimitBurst);
+ r = hashtable_insert(pLstn->ht, keybuf, rl);
if(r == 0)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
@@ -482,13 +504,33 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
int i;
uchar *parse;
int pri;
+ int facil;
+ int sever;
uchar bufParseTAG[CONF_TAG_MAXSIZE];
struct syslogTime st;
time_t tt;
- rs_ratelimit_state_t *ratelimiter;
+ rs_ratelimit_state_t *ratelimiter = NULL;
DEFiRet;
- findRatelimiter(cred, &ratelimiter); /* ignore error, better so than others... */
+// TODO: handle format errors??
+ /* we need to parse the pri first, because we need the severity for
+ * rate-limiting as well.
+ */
+ parse = pRcv;
+ lenMsg = lenRcv;
+
+ parse++; lenMsg--; /* '<' */
+ pri = 0;
+ while(lenMsg && isdigit(*parse)) {
+ pri = pri * 10 + *parse - '0';
+ ++parse;
+ --lenMsg;
+ }
+ facil = LOG_FAC(pri);
+ sever = LOG_PRI(pri);
+
+ if(sever >= pLstn->ratelimitSev)
+ findRatelimiter(pLstn, cred, &ratelimiter); /* ignore error, better so than others... */
datetime.getCurrTime(&st, &tt);
if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt)) {
@@ -502,26 +544,14 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
MsgSetInputName(pMsg, pInputName);
MsgSetFlowControlType(pMsg, pLstn->flowCtl);
-
-// TODO: handle format errors
- parse = pRcv;
- lenMsg = lenRcv;
-
- parse++; lenMsg--; /* '<' */
- pri = 0;
- while(lenMsg && isdigit(*parse)) {
- pri = pri * 10 + *parse - '0';
- ++parse;
- --lenMsg;
- }
- pMsg->iFacility = LOG_FAC(pri);
- pMsg->iSeverity = LOG_PRI(pri);
+ pMsg->iFacility = facil;
+ pMsg->iSeverity = sever;
MsgSetAfterPRIOffs(pMsg, lenRcv - lenMsg);
parse++; lenMsg--; /* '>' */
if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) {
- dbgprintf("we have a problem, invalid timestamp in msg!\n");
+ DBGPRINTF("we have a problem, invalid timestamp in msg!\n");
}
/* pull tag */
@@ -532,7 +562,8 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
--lenMsg;
}
bufParseTAG[i] = '\0'; /* terminate string */
- fixPID(bufParseTAG, &i, cred);
+ if(pLstn->bWritePid)
+ fixPID(bufParseTAG, &i, cred);
MsgSetTAG(pMsg, bufParseTAG, i);
MsgSetMSGoffs(pMsg, lenRcv - lenMsg);
@@ -716,7 +747,19 @@ CODESTARTwillRun
# endif
if(pLogSockName != NULL)
listeners[0].sockName = pLogSockName;
- listeners[0].bUseCreds = bUseCredsSysSock;
+ if(ratelimitIntervalSysSock > 0) {
+ if((listeners[0].ht = create_hashtable(1000, hash_from_key_fn, key_equals_fn)) == NULL) {
+ /* in this case, we simply turn of rate-limiting */
+ dbgprintf("imuxsock: turning off rate limiting because we could not "
+ "create hash table\n");
+ ratelimitIntervalSysSock = 0;
+ }
+ }
+ listeners[0].ratelimitInterval = ratelimitIntervalSysSock;
+ listeners[0].ratelimitBurst = ratelimitBurstSysSock;
+ listeners[0].ratelimitSev = ratelimitSeveritySysSock;
+ listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock) ? 1 : 0;
+ listeners[0].bWritePid = bWritePidSysSock;
/* initialize and return if will run or not */
actSocks = 0;
@@ -724,7 +767,7 @@ CODESTARTwillRun
if(openLogSocket(&(listeners[i])) == RS_RET_OK) {
++actSocks;
dbgprintf("Opened UNIX socket '%s' (fd %d).\n", listeners[i].sockName, listeners[i].fd);
- }
+ }
}
if(actSocks == 0) {
@@ -737,8 +780,6 @@ CODESTARTwillRun
CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("imuxsock"), sizeof("imuxsock") - 1));
CHKiRet(prop.ConstructFinalize(pInputName));
- CHKmalloc(hashtab = create_hashtable(1000, hash_from_key_fn, key_equals_fn));
-
finalize_it:
ENDwillRun
@@ -765,16 +806,12 @@ CODESTARTafterRun
free(pLogSockName);
free(pLogHostName);
- discardFunixn();
+ discardLogSockets();
nfd = 1;
if(pInputName != NULL)
prop.Destruct(&pInputName);
- if(hashtab != NULL) {
- hashtable_destroy(hashtab, 1); /* 1 => free all values automatically */
- hashtab = NULL;
- }
ENDafterRun
@@ -815,15 +852,19 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
pLogHostName = NULL;
}
- discardFunixn();
+ discardLogSockets();
nfd = 1;
bIgnoreTimestamp = 1;
bUseFlowCtl = 0;
- bUseCreds = 0;
- bUseCredsSysSock = 0;
+ bWritePid = 0;
+ bWritePidSysSock = 0;
bCreatePath = DFLT_bCreatePath;
ratelimitInterval = DFLT_ratelimitInterval;
+ ratelimitIntervalSysSock = DFLT_ratelimitInterval;
ratelimitBurst = DFLT_ratelimitBurst;
+ ratelimitBurstSysSock = DFLT_ratelimitBurst;
+ ratelimitSeverity = DFLT_ratelimitSeverity;
+ ratelimitSeveritySysSock = DFLT_ratelimitSeverity;
return RS_RET_OK;
}
@@ -881,13 +922,15 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketcreatepath", 0, eCmdHdlrBinary,
NULL, &bCreatePath, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusepidfromsystem", 0, eCmdHdlrBinary,
- NULL, &bUseCreds, STD_LOADABLE_MODULE_ID));
+ NULL, &bWritePid, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"addunixlistensocket", 0, eCmdHdlrGetWord,
addLstnSocketName, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitinterval", 0, eCmdHdlrInt,
NULL, &ratelimitInterval, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitburst", 0, eCmdHdlrInt,
NULL, &ratelimitBurst, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitseverity", 0, eCmdHdlrInt,
+ NULL, &ratelimitSeverity, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
/* the following one is a (dirty) trick: the system log socket is not added via
@@ -901,7 +944,13 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary,
setSystemLogFlowControl, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary,
- NULL, &bUseCredsSysSock, STD_LOADABLE_MODULE_ID));
+ NULL, &bWritePidSysSock, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitinterval", 0, eCmdHdlrInt,
+ NULL, &ratelimitIntervalSysSock, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitburst", 0, eCmdHdlrInt,
+ NULL, &ratelimitBurstSysSock, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitseverity", 0, eCmdHdlrInt,
+ NULL, &ratelimitSeveritySysSock, STD_LOADABLE_MODULE_ID));
/* support statistics gathering */
CHKiRet(statsobj.Construct(&modStats));
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index c58c259b..43203378 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -457,6 +457,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_EPOLL_CTL_FAILED = -2174, /**< epoll_ctl() failed */
RS_RET_INTERNAL_ERROR = -2175, /**< rsyslogd internal error, unexpected code path reached */
RS_RET_ERR_CRE_AFUX = -2176, /**< error creating AF_UNIX socket (and binding it) */
+ RS_RET_RATE_LIMITED = -2177, /**< some messages discarded due to exceeding a rate limit */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
diff --git a/tests/syslog_caller.c b/tests/syslog_caller.c
new file mode 100644
index 00000000..91a1f08b
--- /dev/null
+++ b/tests/syslog_caller.c
@@ -0,0 +1,20 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <syslog.h>
+
+int main(int argc, char *argv[])
+{
+ int i;
+ int sev = 0;
+ if(argc != 2) {
+ fprintf(stderr, "usage: syslog_caller num-messages\n");
+ exit(1);
+ }
+
+ int msgs = atoi(argv[1]);
+
+ for(i = 0 ; i < msgs ; ++i) {
+ syslog(sev % 8, "test message nbr %d, severity=%d", i, sev % 8);
+ sev++;
+ }
+}