From 0edf73f88096f3656da466027417d04ce7a2511c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 1 Apr 2011 16:26:21 +0200 Subject: added new config directive $InputTCPFlowControl... ... to select if tcp received messages shall be flagged as light delayable or not. --- ChangeLog | 2 ++ doc/imtcp.html | 9 +++++++++ plugins/imtcp/imtcp.c | 7 ++++++- tcps_sess.c | 3 ++- tcpsrv.c | 14 ++++++++++++++ tcpsrv.h | 5 ++++- 6 files changed, 37 insertions(+), 3 deletions(-) diff --git a/ChangeLog b/ChangeLog index 7e8b77a1..33628371 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,6 +1,8 @@ --------------------------------------------------------------------------- Version 5.9.0 [V5-DEVEL] (rgerhards), 2011-03-?? - this begins a new devel branch for v5 +- added new config directive $InputTCPFlowControl to select if tcp + received messages shall be flagged as light delayable or not. - enhanced omhdfs to support batching mode. This permits to increase performance, as we now call the HDFS API with much larger message sizes and far more infrequently diff --git a/doc/imtcp.html b/doc/imtcp.html index 422bbd55..b0aaa3c1 100644 --- a/doc/imtcp.html +++ b/doc/imtcp.html @@ -57,6 +57,15 @@ instructs imtcp to emit a message if the remote peer closes a connection.
after loading imtcp, otherwise it may have no effect.
  • $InputTCPServerRun <port>
    Starts a TCP server on selected port
  • +
  • $InputTCPFlowControl <on/off>
    +This setting specifies whether some message flow control shall be exercised on the +related TCP input. If set to on, messages are handled as "light delayable", which means +the sender is throttled a bit when the queue becomes near-full. This is done in order +to preserve some queue space for inputs that can not throttle (like UDP), but it +may have some undesired effect in some configurations. Still, we consider this as +a useful setting and thus it is the default. To turn the handling off, simply +configure that explicitely. +
  • $InputTCPMaxListeners <number>
    Sets the maximum number of listeners (server ports) supported. Default is 20. This must be set before the first $InputTCPServerRun directive.
  • $InputTCPMaxSessions <number>
    Sets the maximum number of sessions supported. Default is 200. This must be set before the first $InputTCPServerRun directive
  • diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c index d3e9cabe..1a62d82e 100644 --- a/plugins/imtcp/imtcp.c +++ b/plugins/imtcp/imtcp.c @@ -3,7 +3,7 @@ * * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) * - * Copyright 2007, 2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -88,6 +88,7 @@ static int iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 mos static int bEmitMsgOnClose = 0; /* emit an informational message on close by remote peer */ static int iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; /* addtl frame delimiter, e.g. for netscreen, default none */ static int bDisableLFDelim = 0; /* disbale standard LF delimiter */ +static int bUseFlowControl = 1; /* use flow control, what means indicate ourselfs a "light delayable" */ static uchar *pszStrmDrvrAuthMode = NULL; /* authentication mode to use */ static uchar *pszInputName = NULL; /* value for inputname property, NULL is OK and handled by core engine */ static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ @@ -199,6 +200,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa CHKiRet(tcpsrv.SetCBOnRegularClose(pOurTcpsrv, onRegularClose)); CHKiRet(tcpsrv.SetCBOnErrClose(pOurTcpsrv, onErrClose)); CHKiRet(tcpsrv.SetDrvrMode(pOurTcpsrv, iStrmDrvrMode)); + CHKiRet(tcpsrv.SetUseFlowControl(pOurTcpsrv, bUseFlowControl)); CHKiRet(tcpsrv.SetAddtlFrameDelim(pOurTcpsrv, iAddtlFrameDelim)); CHKiRet(tcpsrv.SetbDisableLFDelim(pOurTcpsrv, bDisableLFDelim)); CHKiRet(tcpsrv.SetNotificationOnRemoteClose(pOurTcpsrv, bEmitMsgOnClose)); @@ -289,6 +291,7 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus iTCPSessMax = 200; iTCPLstnMax = 20; iStrmDrvrMode = 0; + bUseFlowControl = 0; bEmitMsgOnClose = 0; iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; bDisableLFDelim = 0; @@ -344,6 +347,8 @@ CODEmodInit_QueryRegCFSLineHdlr eCmdHdlrGetWord, NULL, &pszInputName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverbindruleset"), 0, eCmdHdlrGetWord, setRuleset, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpflowcontrol"), 0, + eCmdHdlrBinary, NULL, &bUseFlowControl, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/tcps_sess.c b/tcps_sess.c index 99af0cb8..bed598dd 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -253,7 +253,8 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pThis->pMsg, pThis->iMsg); MsgSetInputName(pMsg, pThis->pLstnInfo->pInputName); - MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); + MsgSetFlowControlType(pMsg, pThis->pSrv->bUseFlowControl + ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; MsgSetRcvFrom(pMsg, pThis->fromHost); CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP)); diff --git a/tcpsrv.c b/tcpsrv.c index e8d79142..3060fe05 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -716,6 +716,7 @@ BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macr pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; pThis->bDisableLFDelim = 0; pThis->OnMsgReceive = NULL; + pThis->bUseFlowControl = 1; ENDobjConstruct(tcpsrv) @@ -991,6 +992,18 @@ SetLstnMax(tcpsrv_t *pThis, int iMax) } +/* set if flow control shall be supported + */ +static rsRetVal +SetUseFlowControl(tcpsrv_t *pThis, int bUseFlowControl) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, tcpsrv); + pThis->bUseFlowControl = bUseFlowControl; + RETiRet; +} + + /* set max number of sessions * this must be called before ConstructFinalize, or it will have no effect! * rgerhards, 2009-04-09 @@ -1033,6 +1046,7 @@ CODESTARTobjQueryInterface(tcpsrv) pIf->SetAddtlFrameDelim = SetAddtlFrameDelim; pIf->SetbDisableLFDelim = SetbDisableLFDelim; pIf->SetSessMax = SetSessMax; + pIf->SetUseFlowControl = SetUseFlowControl; pIf->SetLstnMax = SetLstnMax; pIf->SetDrvrMode = SetDrvrMode; pIf->SetDrvrAuthMode = SetDrvrAuthMode; diff --git a/tcpsrv.h b/tcpsrv.h index 57bdf4b1..6c2bad45 100644 --- a/tcpsrv.h +++ b/tcpsrv.h @@ -56,6 +56,7 @@ struct tcpsrv_s { permittedPeers_t *pPermPeers;/**< driver's permitted peers */ sbool bEmitMsgOnClose; /**< emit an informational message when the remote peer closes connection */ sbool bUsingEPoll; /**< are we in epoll mode (means we do not need to keep track of sessions!) */ + sbool bUseFlowControl; /**< use flow control (make light delayable) */ int iLstnCurr; /**< max nbr of listeners currently supported */ netstrm_t **ppLstn; /**< our netstream listners */ tcpLstnPortList_t **ppLstnPort; /**< pointer to relevant listen port description */ @@ -121,8 +122,10 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */ rsRetVal (*SetNotificationOnRemoteClose)(tcpsrv_t *pThis, int bNewVal); /* 2009-10-01 */ /* added v9 -- rgerhards, 2010-03-01 */ rsRetVal (*SetbDisableLFDelim)(tcpsrv_t*, int); + /* added v10 -- rgerhards, 2011-04-01 */ + rsRetVal (*SetUseFlowControl)(tcpsrv_t*, int); ENDinterface(tcpsrv) -#define tcpsrvCURR_IF_VERSION 9 /* increment whenever you change the interface structure! */ +#define tcpsrvCURR_IF_VERSION 10 /* increment whenever you change the interface structure! */ /* change for v4: * - SetAddtlFrameDelim() added -- rgerhards, 2008-12-10 * - SetInputName() added -- rgerhards, 2008-12-10 -- cgit