diff options
-rw-r--r-- | ChangeLog | 33 | ||||
-rw-r--r-- | doc/imptcp.html | 20 | ||||
-rw-r--r-- | doc/imtcp.html | 12 | ||||
-rw-r--r-- | doc/mmsnmptrapd.html | 9 | ||||
-rw-r--r-- | doc/rsconf1_omfileforcechown.html | 5 | ||||
-rw-r--r-- | plugins/imklog/imklog.c | 24 | ||||
-rw-r--r-- | plugins/imptcp/imptcp.c | 105 | ||||
-rw-r--r-- | plugins/imtcp/imtcp.c | 12 | ||||
-rw-r--r-- | plugins/mmsnmptrapd/mmsnmptrapd.c | 2 | ||||
-rw-r--r-- | plugins/omhdfs/javaenv.sh | 14 | ||||
-rw-r--r-- | plugins/omhdfs/omhdfs.c | 85 | ||||
-rw-r--r-- | runtime/queue.c | 65 | ||||
-rw-r--r-- | runtime/rsyslog.h | 1 | ||||
-rw-r--r-- | runtime/strmsrv.c | 2 | ||||
-rw-r--r-- | runtime/wti.c | 5 | ||||
-rw-r--r-- | tcps_sess.c | 3 | ||||
-rw-r--r-- | tcpsrv.c | 28 | ||||
-rw-r--r-- | tcpsrv.h | 8 | ||||
-rw-r--r-- | tools/omfile.c | 51 | ||||
-rw-r--r-- | tools/ompipe.c | 18 |
20 files changed, 424 insertions, 78 deletions
@@ -1,5 +1,35 @@ +--------------------------------------------------------------------------- +Version 5.9.0 [V5-DEVEL] (rgerhards), 2011-03-?? - imfile: added $InputFileMaxLinesAtOnce directive - enhanced imfile to support input batching +- added capability for imtcp and imptcp to activate keep-alive packets + at the socket layer. This has not been added to imttcp, as the latter is + only an experimental module, and one which did not prove to be useful. + reference: http://kb.monitorware.com/post20791.html +- added support to control KEEPALIVE settings in imptcp + this has not yet been added to imtcp, but could be done on request. +- bugfix: do not open files with full privileges, if privs will be dropped + This make the privilege drop code more bulletproof, but breaks Ubuntu's + work-around for log files created by external programs with the wrong + user and/or group. Note that it was long said that this "functionality" + would break once we go for serious privilege drop code, so hopefully + nobody still depends on it (and, if so, they lost...). +- bugfix: pipes not opened in full priv mode when privs are to be dropped +- this begins a new devel branch for v5 +- better handling of queue i/o errors in disk queues. This is kind of a + bugfix, but a very intrusive one, this it goes into the devel version + first. Right now, "file not found" is handled and leads to the new + emergency mode, in which disk action is stopped and the queue run + in direct mode. An error message is emited if this happens. +- added support for user-level PRI provided via systemd +- added new config directive $InputTCPFlowControl to select if tcp + received messages shall be flagged as light delayable or not. +- enhanced omhdfs to support batching mode. This permits to increase + performance, as we now call the HDFS API with much larger message + sizes and far more infrequently +- bugfix: failover did not work correctly if repeated msg reduction was on + affected directive was: $ActionExecOnlyWhenPreviousIsSuspended on + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=236 --------------------------------------------------------------------------- Version 5.8.1 [V5-stable] (rgerhards), 2011-05-19 - bugfix: invalid processing in QUEUE_FULL condition @@ -214,6 +244,7 @@ Version 5.7.0 [V5-DEVEL] (rgerhards), 2010-09-16 Version 5.6.5 [V5-STABLE] (rgerhards), 2011-03-22 - bugfix: failover did not work correctly if repeated msg reduction was on affected directive was: $ActionExecOnlyWhenPreviousIsSuspended on + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=236 - bugfix: omlibdbi did not use password from rsyslog.con closes: http://bugzilla.adiscon.com/show_bug.cgi?id=203 - bugfix(kind of): tell users that config graph can currently not be @@ -267,6 +298,8 @@ Version 5.6.2 [V5-STABLE] (rgerhards), 2010-11-30 - added the $InputFilePersistStateInterval config directive to imfile - changed imfile so that the state file is never deleted (makes imfile more robust in regard to fatal failures) +- bugfix: a slightly more informative error message when a TCP + connections is aborted --------------------------------------------------------------------------- Version 5.6.1 [V5-STABLE] (rgerhards), 2010-11-24 - bugfix(important): problem in TLS handling could cause rsyslog to loop diff --git a/doc/imptcp.html b/doc/imptcp.html index d4228185..386e691a 100644 --- a/doc/imptcp.html +++ b/doc/imptcp.html @@ -45,7 +45,25 @@ can be found at the <a href="http://www.rsyslog.com/Article321.phtml">Cisco tcp page. <li>$InputPTCPServerNotifyOnConnectionClose [on/<b>off</b>]<br> instructs imptcp to emit a message if the remote peer closes a connection.<br> -<li>$InputPTCPServerRun <port><br> +<li><b>$InputPTCPServerKeepAlive</b> <on/<b>off</b>><br> +enable of disable keep-alive packets at the tcp socket layer. The default is +to disable them.</li> +<li><b>$InputPTCPServerKeepAlive_probes</b> <number><br> +The number of unacknowledged probes to send before considering the connection dead and notifying the application layer. +The default, 0, means that the operating system defaults are used. This has only +effect if keep-alive is enabled. The functionality may not be available on +all platforms. +<li><b>$InputPTCPServerKeepAlive_intvl</b> <number><br> +The interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime. +The default, 0, means that the operating system defaults are used. This has only +effect if keep-alive is enabled. The functionality may not be available on +all platforms. +<li><b>$InputPTCPServerKeepAlive_time</b> <number><br> +The interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further. +The default, 0, means that the operating system defaults are used. This has only +effect if keep-alive is enabled. The functionality may not be available on +all platforms. +<li><b>$InputPTCPServerRun</b> <port><br> Starts a TCP server on selected port</li> <li>$InputPTCPServerInputName <name><br> Sets a name for the inputname property. If no name is set "imptcp" is used by default. Setting a diff --git a/doc/imtcp.html b/doc/imtcp.html index 422bbd55..7653f601 100644 --- a/doc/imtcp.html +++ b/doc/imtcp.html @@ -55,8 +55,20 @@ so be prepared to wrangle with that! instructs imtcp to emit a message if the remote peer closes a connection.<br> <b>Important:</b> This directive is global to all listeners and must be given right after loading imtcp, otherwise it may have no effect.</li> +<li><b>$InputTCPServerKeepAlive</b> <on/<b>off</b>><br> +enable of disable keep-alive packets at the tcp socket layer. The default is +to disable them.</li> <li><b>$InputTCPServerRun</b> <port><br> Starts a TCP server on selected port</li> +<li><b>$InputTCPFlowControl</b> <<b>on</b>/off><br> +This setting specifies whether some message flow control shall be exercised on the +related TCP input. If set to on, messages are handled as "light delayable", which means +the sender is throttled a bit when the queue becomes near-full. This is done in order +to preserve some queue space for inputs that can not throttle (like UDP), but it +may have some undesired effect in some configurations. Still, we consider this as +a useful setting and thus it is the default. To turn the handling off, simply +configure that explicitely. +</li> <li><b>$InputTCPMaxListeners</b> <number><br> Sets the maximum number of listeners (server ports) supported. Default is 20. This must be set before the first $InputTCPServerRun directive.</li> <li><b>$InputTCPMaxSessions</b> <number><br> Sets the maximum number of sessions supported. Default is 200. This must be set before the first $InputTCPServerRun directive</li> diff --git a/doc/mmsnmptrapd.html b/doc/mmsnmptrapd.html index e69bc241..699049d3 100644 --- a/doc/mmsnmptrapd.html +++ b/doc/mmsnmptrapd.html @@ -51,8 +51,11 @@ to control output modules are also available to mmsnmptrapd. <ul> <li><b>$mmsnmptrapdTag</b> [tagname]<br> tells the module which start string inside the tag to look for. The default is -"snmptrap/" -<li><b>$mmsnmptrapdSevertiyMapping</b> [severtiymap]<br> +"snmptrapd". Note that a slash is automatically added to this tag when it comes to +matching incoming messages. It MUST not be given, except if two slashes are required +for whatever reasons (so "tag/" results in a check for "tag//" at the start of +the tag field). +<li><b>$mmsnmptrapdSeverityMapping</b> [severtiymap]<br> This specifies the severity mapping table. It needs to be specified as a list. Note that due to the current config system <b>no whitespace</b> is supported inside the list, so be sure not to use any whitespace inside it.<br> @@ -76,7 +79,7 @@ severities. The default tag is used.<br> # ... other module loads and listener setup ... *.* /path/to/file/with/orignalMessage # this file receives *un*modified messages $mmsnmptrapdSeverityMapping warning/4,error/3 -*.* ::mmsnmptrapd: # *now* message is modified +*.* :mmsnmptrapd: # *now* message is modified *.* /path/to/file/with/modifiedMessage # this file receives modified messages # ... rest of config ... </textarea> diff --git a/doc/rsconf1_omfileforcechown.html b/doc/rsconf1_omfileforcechown.html index 7415a6f6..a680810b 100644 --- a/doc/rsconf1_omfileforcechown.html +++ b/doc/rsconf1_omfileforcechown.html @@ -8,7 +8,10 @@ <h2>$omfileForceChown</h2> <p><b>Type:</b> global configuration directive</p> <p><b>Parameter Values:</b> boolean (on/off, yes/no)</p> -<p><b>Available since:</b> 4.7.0+, 5.3.0+</p> +<p><b>Available:</b> 4.7.0+, 5.3.0-5.8.x, <b>NOT</b> available in 5.9.x or higher</p> +<p><b>Note: this directive has been removed and is no longer available. The +documentation is currently being retained for historical reaons.</b> Expect +it to go away at some later stage as well. <p><b>Default:</b> off</p> <p><b>Description:</b></p> <p>Forces rsyslogd to change the ownership for output files that already exist. Please note diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c index 69c8cd1a..c09fa4d8 100644 --- a/plugins/imklog/imklog.c +++ b/plugins/imklog/imklog.c @@ -186,12 +186,28 @@ rsRetVal imklogLogIntMsg(int priority, char *fmt, ...) rsRetVal Syslog(int priority, uchar *pMsg) { DEFiRet; + int pri = -1; rsRetVal localRet; - /* Output using syslog */ - localRet = parsePRI(&pMsg, &priority); - if(localRet != RS_RET_INVALID_PRI && localRet != RS_RET_OK) - FINALIZE; + /* first check if we have two PRIs. This can happen in case of systemd, + * in which case the second PRI is the rigth one. + * TODO: added kernel timestamp support to this PoC. -- rgerhards, 2011-03-18 + */ + if(pMsg[3] == '<') { /* could be a pri... */ + uchar *pMsgTmp = pMsg + 3; + localRet = parsePRI(&pMsgTmp, &pri); + if(localRet == RS_RET_OK && pri >= 8 && pri <= 192) { + /* *this* is our PRI */ + DBGPRINTF("imklog detected secondary PRI in klog msg\n"); + pMsg = pMsgTmp; + priority = pri; + } + } + if(pri == -1) { + localRet = parsePRI(&pMsg, &priority); + if(localRet != RS_RET_INVALID_PRI && localRet != RS_RET_OK) + FINALIZE; + } /* if we don't get the pri, we use whatever we were supplied */ /* ignore non-kernel messages if not permitted */ diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index e8cffbb0..8751637d 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -50,6 +50,7 @@ #include <sys/types.h> #include <sys/socket.h> #include <sys/epoll.h> +#include <netinet/tcp.h> #if HAVE_FCNTL_H #include <fcntl.h> #endif @@ -88,6 +89,10 @@ DEFobjCurrIf(ruleset) /* config settings */ typedef struct configSettings_s { + int bKeepAlive; /* support keep-alive packets */ + int iKeepAliveIntvl; + int iKeepAliveProbes; + int iKeepAliveTime; int bEmitMsgOnClose; /* emit an informational message on close by remote peer */ int iAddtlFrameDelim; /* addtl frame delimiter, e.g. for netscreen, default none */ uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ @@ -111,13 +116,17 @@ struct ptcpsrv_s { ptcpsrv_t *pNext; /* linked list maintenance */ uchar *port; /* Port to listen to */ uchar *lstnIP; /* which IP we should listen on? */ - int bEmitMsgOnClose; int iAddtlFrameDelim; + int iKeepAliveIntvl; + int iKeepAliveProbes; + int iKeepAliveTime; uchar *pszInputName; prop_t *pInputName; /* InputName in (fast to process) property format */ ruleset_t *pRuleset; ptcplstn_t *pLstn; /* root of our listeners */ ptcpsess_t *pSess; /* root of our sessions */ + sbool bKeepAlive; /* support keep-alive packets */ + sbool bEmitMsgOnClose; }; /* the ptcp session object. Describes a single active session. @@ -429,12 +438,80 @@ finalize_it: } +/* Enable KEEPALIVE handling on the socket. */ +static inline rsRetVal +EnableKeepAlive(ptcplstn_t *pLstn, int sock) +{ + int ret; + int optval; + socklen_t optlen; + DEFiRet; + + optval = 1; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); + if(ret < 0) { + dbgprintf("EnableKeepAlive socket call returns error %d\n", ret); + ABORT_FINALIZE(RS_RET_ERR); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveProbes > 0) { + optval = pLstn->pSrv->iKeepAliveProbes; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + errmsg.LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive probes - ignored"); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveTime > 0) { + optval = pLstn->pSrv->iKeepAliveTime; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + errmsg.LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive time - ignored"); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveIntvl > 0) { + optval = pLstn->pSrv->iKeepAliveIntvl; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + errmsg.LogError(errno, NO_ERRCODE, "imptcp cannot set keepalive intvl - ignored"); + } + + dbgprintf("KEEPALIVE enabled for socket %d\n", sock); + +finalize_it: + RETiRet; +} + /* accept an incoming connection request * rgerhards, 2008-04-22 */ static rsRetVal -AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP) +AcceptConnReq(ptcplstn_t *pLstn, int *newSock, prop_t **peerName, prop_t **peerIP) { int sockflags; struct sockaddr_storage addr; @@ -443,13 +520,17 @@ AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP) DEFiRet; - iNewSock = accept(sock, (struct sockaddr*) &addr, &addrlen); + iNewSock = accept(pLstn->sock, (struct sockaddr*) &addr, &addrlen); if(iNewSock < 0) { if(errno == EAGAIN || errno == EWOULDBLOCK) ABORT_FINALIZE(RS_RET_NO_MORE_DATA); ABORT_FINALIZE(RS_RET_ACCEPT_ERR); } + if(pLstn->pSrv->bKeepAlive) + EnableKeepAlive(pLstn, iNewSock);/* we ignore errors, best to do! */ + + CHKiRet(getPeerNames(peerName, peerIP, (struct sockaddr*) &addr)); /* set the new socket to non-blocking IO */ @@ -883,6 +964,10 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa CHKmalloc(pSrv = malloc(sizeof(ptcpsrv_t))); pSrv->pSess = NULL; pSrv->pLstn = NULL; + pSrv->bKeepAlive = cs.bKeepAlive; + pSrv->iKeepAliveIntvl = cs.iKeepAliveTime; + pSrv->iKeepAliveProbes = cs.iKeepAliveProbes; + pSrv->iKeepAliveTime = cs.iKeepAliveTime; pSrv->bEmitMsgOnClose = cs.bEmitMsgOnClose; pSrv->port = pNewVal; pSrv->iAddtlFrameDelim = cs.iAddtlFrameDelim; @@ -946,7 +1031,7 @@ lstnActivity(ptcplstn_t *pLstn) DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock); while(1) { - localRet = AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP); + localRet = AcceptConnReq(pLstn, &newSock, &peerName, &peerIP); if(localRet == RS_RET_NO_MORE_DATA) break; CHKiRet(localRet); @@ -1145,6 +1230,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { cs.bEmitMsgOnClose = 0; + cs.bKeepAlive = 0; + cs.iKeepAliveProbes = 0; + cs.iKeepAliveTime = 0; + cs.iKeepAliveIntvl = 0; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; free(cs.pszInputName); cs.pszInputName = NULL; @@ -1177,6 +1266,14 @@ CODEmodInit_QueryRegCFSLineHdlr /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord, addTCPListener, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive"), 0, eCmdHdlrBinary, + NULL, &cs.bKeepAlive, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_probes"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveProbes, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_time"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveTime, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_intvl"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveIntvl, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpservernotifyonconnectionclose"), 0, eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt, diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c index d3e9cabe..c939e1d6 100644 --- a/plugins/imtcp/imtcp.c +++ b/plugins/imtcp/imtcp.c @@ -3,7 +3,7 @@ * * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) * - * Copyright 2007, 2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -82,12 +82,14 @@ static permittedPeers_t *pPermPeersRoot = NULL; /* config settings */ +static int bKeepAlive = 0; /* support keep-alive packets */ static int iTCPSessMax = 200; /* max number of sessions */ static int iTCPLstnMax = 20; /* max number of sessions */ static int iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */ static int bEmitMsgOnClose = 0; /* emit an informational message on close by remote peer */ static int iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; /* addtl frame delimiter, e.g. for netscreen, default none */ static int bDisableLFDelim = 0; /* disbale standard LF delimiter */ +static int bUseFlowControl = 1; /* use flow control, what means indicate ourselfs a "light delayable" */ static uchar *pszStrmDrvrAuthMode = NULL; /* authentication mode to use */ static uchar *pszInputName = NULL; /* value for inputname property, NULL is OK and handled by core engine */ static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ @@ -191,6 +193,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa if(pOurTcpsrv == NULL) { CHKiRet(tcpsrv.Construct(&pOurTcpsrv)); + CHKiRet(tcpsrv.SetKeepAlive(pOurTcpsrv, bKeepAlive)); CHKiRet(tcpsrv.SetSessMax(pOurTcpsrv, iTCPSessMax)); CHKiRet(tcpsrv.SetLstnMax(pOurTcpsrv, iTCPLstnMax)); CHKiRet(tcpsrv.SetCBIsPermittedHost(pOurTcpsrv, isPermittedHost)); @@ -199,6 +202,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa CHKiRet(tcpsrv.SetCBOnRegularClose(pOurTcpsrv, onRegularClose)); CHKiRet(tcpsrv.SetCBOnErrClose(pOurTcpsrv, onErrClose)); CHKiRet(tcpsrv.SetDrvrMode(pOurTcpsrv, iStrmDrvrMode)); + CHKiRet(tcpsrv.SetUseFlowControl(pOurTcpsrv, bUseFlowControl)); CHKiRet(tcpsrv.SetAddtlFrameDelim(pOurTcpsrv, iAddtlFrameDelim)); CHKiRet(tcpsrv.SetbDisableLFDelim(pOurTcpsrv, bDisableLFDelim)); CHKiRet(tcpsrv.SetNotificationOnRemoteClose(pOurTcpsrv, bEmitMsgOnClose)); @@ -287,8 +291,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { iTCPSessMax = 200; + bKeepAlive = 0; iTCPLstnMax = 20; iStrmDrvrMode = 0; + bUseFlowControl = 0; bEmitMsgOnClose = 0; iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; bDisableLFDelim = 0; @@ -324,6 +330,8 @@ CODEmodInit_QueryRegCFSLineHdlr /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverrun"), 0, eCmdHdlrGetWord, addTCPListener, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverkeepalive"), 0, eCmdHdlrBinary, + NULL, &bKeepAlive, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxsessions"), 0, eCmdHdlrInt, NULL, &iTCPSessMax, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxlisteners"), 0, eCmdHdlrInt, @@ -344,6 +352,8 @@ CODEmodInit_QueryRegCFSLineHdlr eCmdHdlrGetWord, NULL, &pszInputName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverbindruleset"), 0, eCmdHdlrGetWord, setRuleset, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpflowcontrol"), 0, + eCmdHdlrBinary, NULL, &bUseFlowControl, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/plugins/mmsnmptrapd/mmsnmptrapd.c b/plugins/mmsnmptrapd/mmsnmptrapd.c index 767829d6..b78046ee 100644 --- a/plugins/mmsnmptrapd/mmsnmptrapd.c +++ b/plugins/mmsnmptrapd/mmsnmptrapd.c @@ -418,7 +418,7 @@ CODEmodInit_QueryRegCFSLineHdlr cs.pszTagName = NULL; cs.pszSeverityMapping = NULL; - CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdtag", 0, eCmdHdlrInt, + CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdtag", 0, eCmdHdlrGetWord, NULL, &cs.pszTagName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdseveritymapping", 0, eCmdHdlrGetWord, NULL, &cs.pszSeverityMapping, STD_LOADABLE_MODULE_ID)); diff --git a/plugins/omhdfs/javaenv.sh b/plugins/omhdfs/javaenv.sh new file mode 100644 index 00000000..d07a8473 --- /dev/null +++ b/plugins/omhdfs/javaenv.sh @@ -0,0 +1,14 @@ +# This is a sample file for environment settings on Fedora 13 +# that made me compile & run omhdfs. I really *hate* the way +# java uses environment variables... Hopefully this file will +# help building and testing omhdfs in the future (there is also +# some more information in the rsyslog wiki). +# rgerhards, 2011-03-11 +# this now works, but don't ask my why ;) +#export JAVA_HOME=/usr/java/jdk1.6.0_21/bin/java +export PATH=/usr/java/jdk1.6.0_21/bin:$PATH +export JAVA_INCLUDES="-I/usr/java/jdk1.6.0_21/include -I/usr/java/jdk1.6.0_21/include/linux" +export JAVA_LIBS="-L/usr/java/jdk1.6.0_21/jre/lib/i386 -ljava -ljvm -lverify" +export HADOOP_HOME=/usr/lib/hadoop +export CLASSPATH=/usr/lib/jvm/java-6-sun/lib:/usr/lib/hadoop/lib:/usr/lib/hadoop/hadoop-ant-0.20.2+320.jar:/usr/lib/hadoop/hadoop-core-0.20.2+320.jar:/usr/lib/hadoop/hadoop-examples-0.20.2+320.jar:/usr/lib/hadoop/hadoop-test-0.20.2+320.jar:/usr/lib/hadoop/hadoop-tools-0.20.2+320.jar/usr/lib/hadoop/lib/commons-cli-1.2.jar:/usr/lib/hadoop/lib/commons-codec-1.3.jar:/usr/lib/hadoop/lib/commons-el-1.0.jar:/usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar:/usr/lib/hadoop/lib/commons-logging-1.0.4.jar:/usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar:/usr/lib/hadoop/lib/commons-net-1.4.1.jar:/usr/lib/hadoop/lib/core-3.1.1.jar:/usr/lib/hadoop/lib/hadoop-fairscheduler-0.20.2+320.jar:/usr/lib/hadoop/lib/hadoop-scribe-log4j-0.20.2+320.jar:/usr/lib/hadoop/lib/hsqldb-1.8.0.10.jar:/usr/lib/hadoop/lib/hsqldb.jar:/usr/lib/hadoop/lib/jackson-core-asl-1.0.1.jar:/usr/lib/hadoop/lib/jackson-mapper-asl-1.0.1.jar:/usr/lib/hadoop/lib/jasper-compiler-5.5.12.jar:/usr/lib/hadoop/lib/jasper-runtime-5.5.12.jar:/usr/lib/hadoop/lib/jets3t-0.6.1.jar:/usr/lib/hadoop/lib/jetty-6.1.14.jar:/usr/lib/hadoop/lib/jetty-util-6.1.14.jar:/usr/lib/hadoop/lib/junit-4.5.jar:/usr/lib/hadoop/lib/kfs-0.2.2.jar:/usr/lib/hadoop/lib/libfb303.jar:/usr/lib/hadoop/lib/libthrift.jar:/usr/lib/hadoop/lib/log4j-1.2.15.jar:/usr/lib/hadoop/lib/mockito-all-1.8.2.jar:/usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar:/usr/lib/hadoop/lib/oro-2.0.8.jar:/usr/lib/hadoop/lib/servlet-api-2.5-6.1.14.jar:/usr/lib/hadoop/lib/slf4j-api-1.4.3.jar:/usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar:/usr/lib/hadoop/lib/xmlenc-0.52.jar:/etc/hadoop/conf +###export CLASSPATH="/usr/lib/hadoop/hadoop-0.20.2+320-ant.jar: /usr/lib/hadoop/hadoop-0.20.2+320-core.jar: /usr/lib/hadoop/hadoop-0.20.2+320-examples.jar: /usr/lib/hadoop/hadoop-0.20.2+320-test.jar: /usr/lib/hadoop/hadoop-0.20.2+320-tools.jar: /usr/lib/hadoop/hadoop-ant-0.20.2+320.jar: /usr/lib/hadoop/hadoop-core-0.20.2+320.jar: /usr/lib/hadoop/hadoop-examples-0.20.2+320.jar: /usr/lib/hadoop/hadoop-test-0.20.2+320.jar: /usr/lib/hadoop/hadoop-tools-0.20.2+320.jar:/usr/lib/hadoop/lib: /usr/lib/hadoop/lib/commons-cli-1.2.jar: /usr/lib/hadoop/lib/commons-codec-1.3.jar: /usr/lib/hadoop/lib/commons-el-1.0.jar: /usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar: /usr/lib/hadoop/lib/commons-logging-1.0.4.jar: /usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar: /usr/lib/hadoop/lib/commons-net-1.4.1.jar: /usr/lib/hadoop/lib/core-3.1.1.jar: /usr/lib/hadoop/lib/hadoop-fairscheduler-0.20.2+320.jar: /usr/lib/hadoop/lib/hadoop-scribe-log4j-0.20.2+320.jar: /usr/lib/hadoop/lib/hsqldb-1.8.0.10.jar: /usr/lib/hadoop/lib/hsqldb.jar: /usr/lib/hadoop/lib/jackson-core-asl-1.0.1.jar: /usr/lib/hadoop/lib/jackson-mapper-asl-1.0.1.jar: /usr/lib/hadoop/lib/jasper-compiler-5.5.12.jar: /usr/lib/hadoop/lib/jasper-runtime-5.5.12.jar: /usr/lib/hadoop/lib/jets3t-0.6.1.jar: /usr/lib/hadoop/lib/jetty-6.1.14.jar: /usr/lib/hadoop/lib/jetty-util-6.1.14.jar: /usr/lib/hadoop/lib/junit-4.5.jar: /usr/lib/hadoop/lib/kfs-0.2.2.jar: /usr/lib/hadoop/lib/libfb303.jar: /usr/lib/hadoop/lib/libthrift.jar: /usr/lib/hadoop/lib/log4j-1.2.15.jar: /usr/lib/hadoop/lib/mockito-all-1.8.2.jar: /usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar: /usr/lib/hadoop/lib/oro-2.0.8.jar: /usr/lib/hadoop/lib/servlet-api-2.5-6.1.14.jar: /usr/lib/hadoop/lib/slf4j-api-1.4.3.jar: /usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar: /usr/lib/hadoop/lib/xmlenc-0.52.jar:/etc/hadoop/conf:/usr/lib/hadoop/lib" diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c index 8b72747f..76128a4e 100644 --- a/plugins/omhdfs/omhdfs.c +++ b/plugins/omhdfs/omhdfs.c @@ -80,6 +80,8 @@ typedef struct { typedef struct _instanceData { file_t *pFile; + uchar ioBuf[64*1024]; + unsigned offsBuf; } instanceData; /* forward definitions (down here, need data types) */ @@ -260,7 +262,8 @@ fileOpen(file_t *pFile) if(errno == ENOENT) { DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n", pFile->name); - pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0); + pFile->fh = hdfsOpenFile(pFile->fs, + (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0); } } if(pFile->fh == NULL) { @@ -275,12 +278,15 @@ finalize_it: } +/* Note: lenWrite is reset to zero on successful write! */ static inline rsRetVal -fileWrite(file_t *pFile, uchar *buf) +fileWrite(file_t *pFile, uchar *buf, size_t *lenWrite) { - size_t lenWrite; DEFiRet; + if(*lenWrite == 0) + FINALIZE; + if(pFile->nUsers > 1) d_pthread_mutex_lock(&pFile->mut); @@ -294,18 +300,18 @@ fileWrite(file_t *pFile, uchar *buf) } } - lenWrite = strlen((char*) buf); - tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite); - if((unsigned) num_written_bytes != lenWrite) { - errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, " - "written %lu\n", pFile->name, (unsigned long) lenWrite, +dbgprintf("XXXXX: omhdfs writing %u bytes\n", *lenWrite); + tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, *lenWrite); + if((unsigned) num_written_bytes != *lenWrite) { + errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, + "omhdfs: failed to write %s, expected %lu bytes, " + "written %lu\n", pFile->name, (unsigned long) *lenWrite, (unsigned long) num_written_bytes); ABORT_FINALIZE(RS_RET_SUSPENDED); } + *lenWrite = 0; finalize_it: - if(pFile->nUsers > 1) - d_pthread_mutex_unlock(&pFile->mut); RETiRet; } @@ -333,6 +339,40 @@ finalize_it: /* ---END FILE OBJECT---------------------------------------------------- */ +/* This adds data to the output buffer and performs an actual write + * if the new data does not fit into the buffer. Note that we never write + * partial data records. Other actions may write into the same file, and if + * we would write partial records, data could become severely mixed up. + * Note that we must check of some new data arrived is large than our + * buffer. In that case, the new data will written with its own + * write operation. + */ +static inline rsRetVal +addData(instanceData *pData, uchar *buf) +{ + unsigned len; + DEFiRet; + + len = strlen((char*)buf); + if(pData->offsBuf + len < sizeof(pData->ioBuf)) { + /* new data fits into remaining buffer */ + memcpy((char*) pData->ioBuf + pData->offsBuf, buf, len); + pData->offsBuf += len; + } else { +dbgprintf("XXXXX: not enough room, need to flush\n"); + CHKiRet(fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf)); + if(len >= sizeof(pData->ioBuf)) { + CHKiRet(fileWrite(pData->pFile, buf, &len)); + } else { + memcpy((char*) pData->ioBuf + pData->offsBuf, buf, len); + pData->offsBuf += len; + } + } + + iRet = RS_RET_DEFER_COMMIT; +finalize_it: + RETiRet; +} BEGINcreateInstance CODESTARTcreateInstance @@ -358,13 +398,31 @@ CODESTARTtryResume } ENDtryResume + +BEGINbeginTransaction +CODESTARTbeginTransaction +dbgprintf("omhdfs: beginTransaction\n"); +ENDbeginTransaction + + BEGINdoAction CODESTARTdoAction - DBGPRINTF("omuxsock: action to to write to %s\n", pData->pFile->name); - iRet = fileWrite(pData->pFile, ppString[0]); + DBGPRINTF("omhdfs: action to to write to %s\n", pData->pFile->name); + iRet = addData(pData, ppString[0]); +dbgprintf("omhdfs: done doAction\n"); ENDdoAction +BEGINendTransaction +CODESTARTendTransaction +dbgprintf("omhdfs: endTransaction\n"); + if(pData->offsBuf != 0) { + DBGPRINTF("omhdfs: data unwritten at end of transaction, persisting...\n"); + iRet = fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf); + } +ENDendTransaction + + BEGINparseSelectorAct file_t *pFile; int r; @@ -409,6 +467,7 @@ CODESTARTparseSelectorAct } fileObjAddUser(pFile); pData->pFile = pFile; + pData->offsBuf = 0; CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -455,6 +514,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ CODEqueryEtryPt_doHUP ENDqueryEtryPt @@ -472,5 +532,6 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &dfltTplName, NULL)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); + DBGPRINTF("omhdfs: module compiled with rsyslog version %s.\n", VERSION); CODEmodInit_QueryRegCFSLineHdlr ENDmodInit diff --git a/runtime/queue.c b/runtime/queue.c index 88e01a7a..d31a4551 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -83,6 +83,11 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); +static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr); +static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); +static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); +static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); +static rsRetVal qDestructDisk(qqueue_t *pThis); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -583,6 +588,47 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis) /* -------------------- disk -------------------- */ +/* The following function is used to "save" ourself from being killed by + * a fatally failed disk queue. A fatal failure is, for example, if no + * data can be read or written. In that case, the disk support is disabled, + * with all on-disk structures kept as-is as much as possible. Instead, the + * queue is switched to direct mode, so that at least + * some processing can happen. Of course, this may still have lots of + * undesired side-effects, but is probably better than aborting the + * syslogd. Note that this function *must* succeed in one way or another, as + * we can not recover from failure here. But it may emit different return + * states, which can trigger different processing in the higher layers. + * rgerhards, 2011-05-03 + */ +static inline rsRetVal +queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError) +{ + pThis->iQueueSize = 0; + pThis->nLogDeq = 0; + qDestructDisk(pThis); /* free disk structures */ + + pThis->qType = QUEUETYPE_DIRECT; + pThis->qConstruct = qConstructDirect; + pThis->qDestruct = qDestructDirect; + pThis->qAdd = qAddDirect; + pThis->qDel = qDelDirect; + pThis->MultiEnq = qqueueMultiEnqObjDirect; + if(pThis->pqParent != NULL) { + DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n"); + pThis->pqParent->bIsDA = 0; + pThis->pqParent->pqDA = NULL; + /* This may have undesired side effects, not sure if I really evaluated + * all. So you know where to look at if you come to this point during + * troubleshooting ;) -- rgerhards, 2011-05-03 + */ + } + + errmsg.LogError(0, initiatingError, "fatal error on disk queue '%s', emergency switch to direct mode", + obj.GetName((obj_t*) pThis)); + return RS_RET_ERR_QUEUE_EMERGENCY; +} + + static rsRetVal qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pThis) { @@ -785,10 +831,7 @@ finalize_it: static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr) { DEFiRet; - - CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL)); - -finalize_it: + iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); RETiRet; } @@ -1683,7 +1726,18 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(DequeueForConsumer(pThis, pWti)); + iRet = DequeueForConsumer(pThis, pWti); + if(iRet == RS_RET_FILE_NOT_FOUND) { + /* This is a fatal condition and means the queue is almost unusable */ + d_pthread_mutex_unlock(pThis->mut); + DBGOPRINT((obj_t*) pThis, "got 'file not found' error %d, queue defunct\n", iRet); + iRet = queueSwitchToEmergencyMode(pThis, iRet); + // TODO: think about what to return as iRet -- keep RS_RET_FILE_NOT_FOUND? + d_pthread_mutex_lock(pThis->mut); + } + if (iRet != RS_RET_OK) { + FINALIZE; + } /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); @@ -1774,7 +1828,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) { DEFiRet; -//DBGPRINTF("XXXX: chkStopWrkrDA called, low watermark %d, phys Size %d\n", pThis->iLowWtrMrk, getPhysicalQueueSize(pThis)); if(pThis->bEnqOnly) { iRet = RS_RET_TERMINATE_WHEN_IDLE; } diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index d63dbe4f..d7f785f2 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -342,6 +342,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_ERR_HDFS_OPEN = -2179, /**< error during hdfsOpen (e.g. file does not exist) */ RS_RET_FILE_NOT_SPECIFIED = -2180, /**< file name not configured where this was required */ RS_RET_ERR_WRKDIR = -2181, /**< problems with the rsyslog working directory */ + RS_RET_ERR_QUEUE_EMERGENCY = -2182, /**< some fatal error caused queue to switch to emergency mode */ /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ diff --git a/runtime/strmsrv.c b/runtime/strmsrv.c index f448cd4f..03691de6 100644 --- a/runtime/strmsrv.c +++ b/runtime/strmsrv.c @@ -767,7 +767,7 @@ static rsRetVal SetKeepAlive(strmsrv_t *pThis, int iVal) { DEFiRet; - dbgprintf("keep-alive set to %d\n", iVal); + dbgprintf("strmsrv: keep-alive set to %d\n", iVal); pThis->bUseKeepAlive = iVal; RETiRet; } diff --git a/runtime/wti.c b/runtime/wti.c index 9343f5c5..ec56c2d5 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -314,7 +314,10 @@ wtiWorker(wti_t *pThis) */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); - if(localRet == RS_RET_IDLE) { + if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) { + d_pthread_mutex_unlock(pWtp->pmutUsr); + break; /* end of loop */ + } else if(localRet == RS_RET_IDLE) { if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { d_pthread_mutex_unlock(pWtp->pmutUsr); break; /* end of loop */ diff --git a/tcps_sess.c b/tcps_sess.c index 99af0cb8..bed598dd 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -253,7 +253,8 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pThis->pMsg, pThis->iMsg); MsgSetInputName(pMsg, pThis->pLstnInfo->pInputName); - MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); + MsgSetFlowControlType(pMsg, pThis->pSrv->bUseFlowControl + ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; MsgSetRcvFrom(pMsg, pThis->fromHost); CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP)); @@ -396,6 +396,10 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess, ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED); } + if(pThis->bUseKeepAlive) { + CHKiRet(netstrm.EnableKeepAlive(pNewStrm)); + } + /* we found a free spot and can construct our session object */ CHKiRet(tcps_sess.Construct(&pSess)); CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis)); @@ -717,6 +721,7 @@ BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macr pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; pThis->bDisableLFDelim = 0; pThis->OnMsgReceive = NULL; + pThis->bUseFlowControl = 1; ENDobjConstruct(tcpsrv) @@ -863,6 +868,15 @@ SetUsrP(tcpsrv_t *pThis, void *pUsr) } static rsRetVal +SetKeepAlive(tcpsrv_t *pThis, int iVal) +{ + DEFiRet; + dbgprintf("tcpsrv: keep-alive set to %d\n", iVal); + pThis->bUseKeepAlive = iVal; + RETiRet; +} + +static rsRetVal SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int)) { DEFiRet; @@ -992,6 +1006,18 @@ SetLstnMax(tcpsrv_t *pThis, int iMax) } +/* set if flow control shall be supported + */ +static rsRetVal +SetUseFlowControl(tcpsrv_t *pThis, int bUseFlowControl) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, tcpsrv); + pThis->bUseFlowControl = bUseFlowControl; + RETiRet; +} + + /* set max number of sessions * this must be called before ConstructFinalize, or it will have no effect! * rgerhards, 2009-04-09 @@ -1029,11 +1055,13 @@ CODESTARTobjQueryInterface(tcpsrv) pIf->create_tcp_socket = create_tcp_socket; pIf->Run = Run; + pIf->SetKeepAlive = SetKeepAlive; pIf->SetUsrP = SetUsrP; pIf->SetInputName = SetInputName; pIf->SetAddtlFrameDelim = SetAddtlFrameDelim; pIf->SetbDisableLFDelim = SetbDisableLFDelim; pIf->SetSessMax = SetSessMax; + pIf->SetUseFlowControl = SetUseFlowControl; pIf->SetLstnMax = SetLstnMax; pIf->SetDrvrMode = SetDrvrMode; pIf->SetDrvrAuthMode = SetDrvrAuthMode; @@ -48,6 +48,7 @@ struct tcpLstnPortList_s { /* the tcpsrv object */ struct tcpsrv_s { BEGINobjInstance; /**< Data to implement generic object - MUST be the first data element! */ + int bUseKeepAlive; /**< use socket layer KEEPALIVE handling? */ netstrms_t *pNS; /**< pointer to network stream subsystem */ int iDrvrMode; /**< mode of the stream driver to use */ uchar *pszDrvrAuthMode; /**< auth mode of the stream driver to use */ @@ -56,6 +57,7 @@ struct tcpsrv_s { permittedPeers_t *pPermPeers;/**< driver's permitted peers */ sbool bEmitMsgOnClose; /**< emit an informational message when the remote peer closes connection */ sbool bUsingEPoll; /**< are we in epoll mode (means we do not need to keep track of sessions!) */ + sbool bUseFlowControl; /**< use flow control (make light delayable) */ int iLstnCurr; /**< max nbr of listeners currently supported */ netstrm_t **ppLstn; /**< our netstream listners */ tcpLstnPortList_t **ppLstnPort; /**< pointer to relevant listen port description */ @@ -121,8 +123,12 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */ rsRetVal (*SetNotificationOnRemoteClose)(tcpsrv_t *pThis, int bNewVal); /* 2009-10-01 */ /* added v9 -- rgerhards, 2010-03-01 */ rsRetVal (*SetbDisableLFDelim)(tcpsrv_t*, int); + /* added v10 -- rgerhards, 2011-04-01 */ + rsRetVal (*SetUseFlowControl)(tcpsrv_t*, int); + /* added v11 -- rgerhards, 2011-05-09 */ + rsRetVal (*SetKeepAlive)(tcpsrv_t*, int); ENDinterface(tcpsrv) -#define tcpsrvCURR_IF_VERSION 9 /* increment whenever you change the interface structure! */ +#define tcpsrvCURR_IF_VERSION 11 /* increment whenever you change the interface structure! */ /* change for v4: * - SetAddtlFrameDelim() added -- rgerhards, 2008-12-10 * - SetInputName() added -- rgerhards, 2008-12-10 diff --git a/tools/omfile.c b/tools/omfile.c index 08f965b3..7585ea8c 100644 --- a/tools/omfile.c +++ b/tools/omfile.c @@ -122,13 +122,11 @@ typedef struct s_dynaFileCacheEntry dynaFileCacheEntry; #define USE_ASYNCWRITER_DFLT 0 /* default buffer use async writer */ #define FLUSHONTX_DFLT 1 /* default for flush on TX end */ -#define DFLT_bForceChown 0 /* globals for default values */ static int iDynaFileCacheSize = 10; /* max cache for dynamic files */ static int fCreateMode = 0644; /* mode to use when creating files */ static int fDirCreateMode = 0700; /* mode to use when creating files */ static int bFailOnChown; /* fail if chown fails? */ -static int bForceChown = DFLT_bForceChown; /* Force chown() on existing files? */ static uid_t fileUID; /* UID to be used for newly created files */ static uid_t fileGID; /* GID to be used for newly created files */ static uid_t dirUID; /* UID to be used for newly created directories */ @@ -153,7 +151,6 @@ typedef struct _instanceData { int fDirCreateMode; /* creation mode for mkdir() */ int bCreateDirs; /* auto-create directories? */ int bSyncFile; /* should the file by sync()'ed? 1- yes, 0- no */ - sbool bForceChown; /* force chown() on existing files? */ uid_t fileUID; /* IDs for creation */ uid_t dirUID; gid_t fileGID; @@ -200,7 +197,6 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tfile cache size=%d\n", pData->iDynaFileCacheSize); dbgprintf("\tcreate directories: %s\n", pData->bCreateDirs ? "yes" : "no"); dbgprintf("\tfile owner %d, group %d\n", (int) pData->fileUID, (int) pData->fileGID); - dbgprintf("\tforce chown() for all files: %s\n", pData->bForceChown ? "yes" : "no"); dbgprintf("\tdirectory owner %d, group %d\n", (int) pData->dirUID, (int) pData->dirGID); dbgprintf("\tdir create mode 0%3.3o, file create mode 0%3.3o\n", pData->fDirCreateMode, pData->fCreateMode); @@ -239,6 +235,12 @@ rsRetVal setDynaFileCacheSize(void __attribute__((unused)) *pVal, int iNewVal) } +rsRetVal goneAway(void __attribute__((unused)) *pVal, int iNewVal) +{ + errmsg.LogError(0, RS_RET_ERR, "directive $omfileForceChown is no longer supported"); +} + + /* Helper to cfline(). Parses a output channel name up until the first * comma and then looks for the template specifier. Tries * to find that template. Maps the output channel to the @@ -388,22 +390,7 @@ prepareFile(instanceData *pData, uchar *newFileName) int fd; DEFiRet; - if(access((char*)newFileName, F_OK) == 0) { - if(pData->bForceChown) { - /* Try to fix wrong ownership set by someone else. Note that this code - * will no longer work once we have made the $PrivDrop code fully secure. - * This change is based on an idea of Michael Terry, provided as part of - * the effort to make rsyslogd the Ubuntu default syslogd. - * rgerhards, 2009-09-11 - */ - if(chown((char*)newFileName, pData->fileUID, pData->fileGID) != 0) { - if(pData->bFailOnChown) { - int eSave = errno; - errno = eSave; - } - } - } - } else { + if(access((char*)newFileName, F_OK) != 0) { /* file does not exist, create it (and eventually parent directories */ if(pData->bCreateDirs) { /* We first need to create parent dirs if they are missing. @@ -423,7 +410,7 @@ prepareFile(instanceData *pData, uchar *newFileName) pData->fCreateMode); if(fd != -1) { /* check and set uid/gid */ - if(pData->bForceChown || pData->fileUID != (uid_t)-1 || pData->fileGID != (gid_t) -1) { + if(pData->fileUID != (uid_t)-1 || pData->fileGID != (gid_t) -1) { /* we need to set owner/group */ if(fchown(fd, pData->fileUID, pData->fileGID) != 0) { if(pData->bFailOnChown) { @@ -473,6 +460,9 @@ prepareFile(instanceData *pData, uchar *newFileName) CHKiRet(strm.ConstructFinalize(pData->pStrm)); finalize_it: + if(pData->pStrm == NULL) { + DBGPRINTF("Error opening log file: %s\n", pData->f_fname); + } RETiRet; } @@ -647,6 +637,9 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) } else { /* "regular", non-dynafile */ if(pData->pStrm == NULL) { CHKiRet(prepareFile(pData, pData->f_fname)); + if(pData->pStrm == NULL) { + errmsg.LogError(0, RS_RET_NO_FILE_ACCESS, "Could no open output file '%s'", pData->f_fname); + } } } @@ -790,7 +783,6 @@ CODESTARTparseSelectorAct pData->fDirCreateMode = fDirCreateMode; pData->bCreateDirs = bCreateDirs; pData->bFailOnChown = bFailOnChown; - pData->bForceChown = bForceChown; pData->fileUID = fileUID; pData->fileGID = fileGID; pData->dirUID = dirUID; @@ -800,18 +792,6 @@ CODESTARTparseSelectorAct pData->iIOBufSize = (int) iIOBufSize; pData->iFlushInterval = iFlushInterval; pData->bUseAsyncWriter = bUseAsyncWriter; - - if(pData->bDynamicName == 0) { - /* try open and emit error message if not possible. At this stage, we ignore the - * return value of prepareFile, this is taken care of in later steps. - */ - prepareFile(pData, pData->f_fname); - - if(pData->pStrm == NULL) { - DBGPRINTF("Error opening log file: %s\n", pData->f_fname); - errmsg.LogError(0, RS_RET_NO_FILE_ACCESS, "Could no open output file '%s'", pData->f_fname); - } - } CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -826,7 +806,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a dirUID = -1; dirGID = -1; bFailOnChown = 1; - bForceChown = DFLT_bForceChown; iDynaFileCacheSize = 10; fCreateMode = 0644; fDirCreateMode = 0700; @@ -901,7 +880,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(omsdRegCFSLineHdlr((uchar *)"filecreatemode", 0, eCmdHdlrFileCreateMode, NULL, &fCreateMode, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"createdirs", 0, eCmdHdlrBinary, NULL, &bCreateDirs, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"failonchownfailure", 0, eCmdHdlrBinary, NULL, &bFailOnChown, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"omfileForceChown", 0, eCmdHdlrBinary, NULL, &bForceChown, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"omfileforcechown", 0, eCmdHdlrBinary, goneAway, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionfileenablesync", 0, eCmdHdlrBinary, NULL, &bEnableSync, STD_LOADABLE_MODULE_ID)); CHKiRet(regCfSysLineHdlr((uchar *)"actionfiledefaulttemplate", 0, eCmdHdlrGetWord, NULL, &pszFileDfltTplName, NULL)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); diff --git a/tools/ompipe.c b/tools/ompipe.c index 58725fb9..cbedb559 100644 --- a/tools/ompipe.c +++ b/tools/ompipe.c @@ -72,6 +72,7 @@ DEFobjCurrIf(errmsg) typedef struct _instanceData { uchar f_fname[MAXFNAME];/* pipe or template name (display only) */ short fd; /* pipe descriptor for (current) pipe */ + sbool bHadError; /* did we already have/report an error on this pipe? */ } instanceData; @@ -101,6 +102,17 @@ preparePipe(instanceData *pData) { DEFiRet; pData->fd = open((char*) pData->f_fname, O_RDWR|O_NONBLOCK|O_CLOEXEC); + if(pData->fd < 0 ) { + pData->fd = -1; + if(!pData->bHadError) { + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_NO_FILE_ACCESS, "Could no open output pipe '%s': %s", + pData->f_fname, errStr); + pData->bHadError = 1; + } + DBGPRINTF("Error opening log pipe: %s\n", pData->f_fname); + } RETiRet; } @@ -150,6 +162,7 @@ finalize_it: BEGINcreateInstance CODESTARTcreateInstance pData->fd = -1; + pData->bHadError = 0; ENDcreateInstance @@ -204,11 +217,6 @@ CODESTARTparseSelectorAct */ preparePipe(pData); - if(pData->fd < 0 ) { - pData->fd = -1; - DBGPRINTF("Error opening log pipe: %s\n", pData->f_fname); - errmsg.LogError(0, RS_RET_NO_FILE_ACCESS, "Could no open output pipe '%s'", pData->f_fname); - } CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct |