summaryrefslogtreecommitdiffstats
path: root/plugins/imuxsock/imuxsock.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-09-27 15:06:30 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-09-27 15:06:30 +0200
commitb5da6352830e9841dd367b8490d79461adb5cb22 (patch)
tree91c0ea46bc6bcee8cffb3424b7d0d9317dd30d82 /plugins/imuxsock/imuxsock.c
parent3773aadcdd8008b97bb532f6d29fe29d17b06159 (diff)
downloadrsyslog-b5da6352830e9841dd367b8490d79461adb5cb22.tar.gz
rsyslog-b5da6352830e9841dd367b8490d79461adb5cb22.tar.xz
rsyslog-b5da6352830e9841dd367b8490d79461adb5cb22.zip
first shot at imuxsock ratelimiting
works, but at a global level, need to go down to pid or cgroup
Diffstat (limited to 'plugins/imuxsock/imuxsock.c')
-rw-r--r--plugins/imuxsock/imuxsock.c98
1 files changed, 95 insertions, 3 deletions
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index df2b8efc..ccd31b44 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -81,6 +81,15 @@ DEFobjCurrIf(statsobj)
statsobj_t *modStats;
STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
+STATSCOUNTER_DEF(ctrLostRatelimit, mutCtrLostRatelimit)
+
+struct rs_ratelimit_state {
+ unsigned short interval;
+ unsigned short burst;
+ unsigned done;
+ unsigned missed;
+ time_t begin;
+} ratelimiter;
/* structure to describe a specific listener */
typedef struct lstn_s {
@@ -113,6 +122,70 @@ static int bUseCreds = 0; /* use credentials from recvmsg() and fixup PID in TA
static int bUseCredsSysSock = 0; /* use credentials from recvmsg() and fixup PID in TAG */
#define DFLT_bCreatePath 0
static int bCreatePath = DFLT_bCreatePath; /* auto-create socket path? */
+#define DFLT_ratelimitInterval 2
+static int ratelimitInterval = DFLT_ratelimitInterval; /* interval in seconds, 0 = off */
+#define DFLT_ratelimitBurst 200
+static int ratelimitBurst = DFLT_ratelimitBurst; /* max nbr of messages in interval */
+
+
+
+static void
+initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsigned short burst)
+{
+ rs->interval = interval;
+ rs->burst = burst;
+ rs->done = 0;
+ rs->missed = 0;
+ rs->begin = 0;
+}
+
+
+/* ratelimiting support, modelled after the linux kernel
+ * returns 1 if message is within rate limit and shall be
+ * processed, 0 otherwise.
+ * This implementation is NOT THREAD-SAFE and must not
+ * be called concurrently.
+ */
+static inline int
+withinRatelimit(struct rs_ratelimit_state *rs, time_t tt)
+{
+ int ret;
+ uchar msgbuf[1024];
+
+ if(rs->interval == 0) {
+ ret = 1;
+ goto finalize_it;
+ }
+
+ assert(rs->burst != 0);
+
+ if(rs->begin == 0)
+ rs->begin = tt;
+
+ /* resume if we go out of out time window */
+ if(tt > rs->begin + rs->interval) {
+ if(rs->missed) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "imuxsock lost %u messages due to rate-limiting", rs->missed);
+ logmsgInternal(0, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ rs->missed = 0;
+ }
+ rs->begin = 0;
+ rs->done = 0;
+ }
+
+ /* do actual limit check */
+ if(rs->burst > rs->done) {
+ rs->done++;
+ ret = 1;
+ } else {
+ rs->missed++;
+ ret = 0;
+ }
+
+finalize_it:
+ return ret;
+}
/* set the timestamp ignore / not ignore option for the system
@@ -348,14 +421,23 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
uchar *parse;
int pri;
uchar bufParseTAG[CONF_TAG_MAXSIZE];
+ struct syslogTime st;
+ time_t tt;
DEFiRet;
+ datetime.getCurrTime(&st, &tt);
+ if(!withinRatelimit(&ratelimiter, tt)) {
+ STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit);
+ FINALIZE;
+ }
+
/* we now create our own message object and submit it to the queue */
- CHKiRet(msgConstruct(&pMsg));
+ CHKiRet(msgConstructWithTime(&pMsg, &st, tt));
MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv);
MsgSetInputName(pMsg, pInputName);
MsgSetFlowControlType(pMsg, pLstn->flowCtl);
+
// TODO: handle format errors
parse = pRcv;
lenMsg = lenRcv;
@@ -504,6 +586,8 @@ CODESTARTrunInput
* signalled to do so. This, however, is handled by the framework,
* right into the sleep below.
*/
+ initRatelimitState(&ratelimiter, ratelimitInterval, ratelimitBurst);
+
while(1) {
/* Add the Unix Domain Sockets to the list of read
* descriptors.
@@ -668,6 +752,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
bUseCreds = 0;
bUseCredsSysSock = 0;
bCreatePath = DFLT_bCreatePath;
+ ratelimitInterval = DFLT_ratelimitInterval;
+ ratelimitBurst = DFLT_ratelimitBurst;
return RS_RET_OK;
}
@@ -726,10 +812,12 @@ CODEmodInit_QueryRegCFSLineHdlr
NULL, &bCreatePath, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusepidfromsystem", 0, eCmdHdlrBinary,
NULL, &bUseCreds, STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary,
- NULL, &bUseCredsSysSock, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"addunixlistensocket", 0, eCmdHdlrGetWord,
addLstnSocketName, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitinterval", 0, eCmdHdlrInt,
+ NULL, &ratelimitInterval, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitburst", 0, eCmdHdlrInt,
+ NULL, &ratelimitBurst, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
/* the following one is a (dirty) trick: the system log socket is not added via
@@ -742,12 +830,16 @@ CODEmodInit_QueryRegCFSLineHdlr
setSystemLogTimestampIgnore, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary,
setSystemLogFlowControl, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary,
+ NULL, &bUseCredsSysSock, STD_LOADABLE_MODULE_ID));
/* support statistics gathering */
CHKiRet(statsobj.Construct(&modStats));
CHKiRet(statsobj.SetName(modStats, UCHAR_CONSTANT("imuxsock")));
CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("submitted"),
ctrType_IntCtr, &ctrSubmit));
+ CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("lost.ratelimit"),
+ ctrType_IntCtr, &ctrLostRatelimit));
CHKiRet(statsobj.ConstructFinalize(modStats));
ENDmodInit