diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-09-27 15:06:30 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-09-27 15:06:30 +0200 |
commit | b5da6352830e9841dd367b8490d79461adb5cb22 (patch) | |
tree | 91c0ea46bc6bcee8cffb3424b7d0d9317dd30d82 /plugins/imuxsock/imuxsock.c | |
parent | 3773aadcdd8008b97bb532f6d29fe29d17b06159 (diff) | |
download | rsyslog-b5da6352830e9841dd367b8490d79461adb5cb22.tar.gz rsyslog-b5da6352830e9841dd367b8490d79461adb5cb22.tar.xz rsyslog-b5da6352830e9841dd367b8490d79461adb5cb22.zip |
first shot at imuxsock ratelimiting
works, but at a global level, need to go down to pid or cgroup
Diffstat (limited to 'plugins/imuxsock/imuxsock.c')
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 98 |
1 files changed, 95 insertions, 3 deletions
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index df2b8efc..ccd31b44 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -81,6 +81,15 @@ DEFobjCurrIf(statsobj) statsobj_t *modStats; STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) +STATSCOUNTER_DEF(ctrLostRatelimit, mutCtrLostRatelimit) + +struct rs_ratelimit_state { + unsigned short interval; + unsigned short burst; + unsigned done; + unsigned missed; + time_t begin; +} ratelimiter; /* structure to describe a specific listener */ typedef struct lstn_s { @@ -113,6 +122,70 @@ static int bUseCreds = 0; /* use credentials from recvmsg() and fixup PID in TA static int bUseCredsSysSock = 0; /* use credentials from recvmsg() and fixup PID in TAG */ #define DFLT_bCreatePath 0 static int bCreatePath = DFLT_bCreatePath; /* auto-create socket path? */ +#define DFLT_ratelimitInterval 2 +static int ratelimitInterval = DFLT_ratelimitInterval; /* interval in seconds, 0 = off */ +#define DFLT_ratelimitBurst 200 +static int ratelimitBurst = DFLT_ratelimitBurst; /* max nbr of messages in interval */ + + + +static void +initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsigned short burst) +{ + rs->interval = interval; + rs->burst = burst; + rs->done = 0; + rs->missed = 0; + rs->begin = 0; +} + + +/* ratelimiting support, modelled after the linux kernel + * returns 1 if message is within rate limit and shall be + * processed, 0 otherwise. + * This implementation is NOT THREAD-SAFE and must not + * be called concurrently. + */ +static inline int +withinRatelimit(struct rs_ratelimit_state *rs, time_t tt) +{ + int ret; + uchar msgbuf[1024]; + + if(rs->interval == 0) { + ret = 1; + goto finalize_it; + } + + assert(rs->burst != 0); + + if(rs->begin == 0) + rs->begin = tt; + + /* resume if we go out of out time window */ + if(tt > rs->begin + rs->interval) { + if(rs->missed) { + snprintf((char*)msgbuf, sizeof(msgbuf), + "imuxsock lost %u messages due to rate-limiting", rs->missed); + logmsgInternal(0, LOG_SYSLOG|LOG_INFO, msgbuf, 0); + rs->missed = 0; + } + rs->begin = 0; + rs->done = 0; + } + + /* do actual limit check */ + if(rs->burst > rs->done) { + rs->done++; + ret = 1; + } else { + rs->missed++; + ret = 0; + } + +finalize_it: + return ret; +} /* set the timestamp ignore / not ignore option for the system @@ -348,14 +421,23 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred) uchar *parse; int pri; uchar bufParseTAG[CONF_TAG_MAXSIZE]; + struct syslogTime st; + time_t tt; DEFiRet; + datetime.getCurrTime(&st, &tt); + if(!withinRatelimit(&ratelimiter, tt)) { + STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit); + FINALIZE; + } + /* we now create our own message object and submit it to the queue */ - CHKiRet(msgConstruct(&pMsg)); + CHKiRet(msgConstructWithTime(&pMsg, &st, tt)); MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv); MsgSetInputName(pMsg, pInputName); MsgSetFlowControlType(pMsg, pLstn->flowCtl); + // TODO: handle format errors parse = pRcv; lenMsg = lenRcv; @@ -504,6 +586,8 @@ CODESTARTrunInput * signalled to do so. This, however, is handled by the framework, * right into the sleep below. */ + initRatelimitState(&ratelimiter, ratelimitInterval, ratelimitBurst); + while(1) { /* Add the Unix Domain Sockets to the list of read * descriptors. @@ -668,6 +752,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a bUseCreds = 0; bUseCredsSysSock = 0; bCreatePath = DFLT_bCreatePath; + ratelimitInterval = DFLT_ratelimitInterval; + ratelimitBurst = DFLT_ratelimitBurst; return RS_RET_OK; } @@ -726,10 +812,12 @@ CODEmodInit_QueryRegCFSLineHdlr NULL, &bCreatePath, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusepidfromsystem", 0, eCmdHdlrBinary, NULL, &bUseCreds, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary, - NULL, &bUseCredsSysSock, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"addunixlistensocket", 0, eCmdHdlrGetWord, addLstnSocketName, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitinterval", 0, eCmdHdlrInt, + NULL, &ratelimitInterval, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitburst", 0, eCmdHdlrInt, + NULL, &ratelimitBurst, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); /* the following one is a (dirty) trick: the system log socket is not added via @@ -742,12 +830,16 @@ CODEmodInit_QueryRegCFSLineHdlr setSystemLogTimestampIgnore, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary, setSystemLogFlowControl, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary, + NULL, &bUseCredsSysSock, STD_LOADABLE_MODULE_ID)); /* support statistics gathering */ CHKiRet(statsobj.Construct(&modStats)); CHKiRet(statsobj.SetName(modStats, UCHAR_CONSTANT("imuxsock"))); CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("submitted"), ctrType_IntCtr, &ctrSubmit)); + CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("lost.ratelimit"), + ctrType_IntCtr, &ctrLostRatelimit)); CHKiRet(statsobj.ConstructFinalize(modStats)); ENDmodInit |