summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-06-27 12:33:26 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2011-06-27 12:33:26 +0200
commit8488d8c3c1e65cb4dacb1dddc71c9186ec9f8f37 (patch)
tree9f612b2808a1590e48cd0f43cb85efca3bb6f83f
parent2bd4e10a4dc909346d5a010edefb12c65ed77aec (diff)
parent47729f3b9362f7956c936088ac4bb703633cb33b (diff)
downloadrsyslog-8488d8c3c1e65cb4dacb1dddc71c9186ec9f8f37.tar.gz
rsyslog-8488d8c3c1e65cb4dacb1dddc71c9186ec9f8f37.tar.xz
rsyslog-8488d8c3c1e65cb4dacb1dddc71c9186ec9f8f37.zip
Merge branch 'v5-devel'
Conflicts: ChangeLog configure.ac doc/manual.html plugins/imfile/imfile.c plugins/imklog/imklog.c plugins/imptcp/imptcp.c plugins/imtcp/imtcp.c plugins/imuxsock/imuxsock.c plugins/mmsnmptrapd/mmsnmptrapd.c tools/omfile.c
-rw-r--r--ChangeLog82
-rw-r--r--action.c7
-rw-r--r--configure.ac2
-rw-r--r--doc/imfile.html11
-rw-r--r--doc/imptcp.html20
-rw-r--r--doc/imtcp.html3
-rw-r--r--doc/imuxsock.html12
-rw-r--r--doc/mmsnmptrapd.html9
-rw-r--r--plugins/imfile/imfile.c23
-rw-r--r--plugins/imklog/bsd.c6
-rw-r--r--plugins/imklog/imklog.c23
-rw-r--r--plugins/imklog/imklog.h4
-rw-r--r--plugins/imklog/linux.c113
-rw-r--r--plugins/imklog/solaris.c68
-rw-r--r--plugins/imptcp/imptcp.c113
-rw-r--r--plugins/imtcp/imtcp.c13
-rw-r--r--plugins/imuxsock/imuxsock.c85
-rw-r--r--plugins/mmsnmptrapd/mmsnmptrapd.c2
-rw-r--r--runtime/datetime.c77
-rw-r--r--runtime/datetime.h5
-rw-r--r--runtime/queue.c65
-rw-r--r--runtime/rsyslog.h1
-rw-r--r--runtime/strmsrv.c2
-rw-r--r--runtime/wti.c5
-rw-r--r--tcpsrv.c14
-rw-r--r--tcpsrv.h5
-rw-r--r--tests/inputfilegen.c1
-rw-r--r--tests/testsuites/imfile-basic.conf1
28 files changed, 585 insertions, 187 deletions
diff --git a/ChangeLog b/ChangeLog
index a7a5997b..52d5e4dd 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,22 @@
---------------------------------------------------------------------------
Version 6.3.2 [DEVEL] (rgerhards), 2011-06-??
+- added support for obtaining timestamp for kernel message from message
+ If the kernel time-stamps messages, time is now take from that
+ timestamp instead of the system time when the message was read. This
+ provides much better accuracy. Thanks to Lennart Poettering for
+ suggesting this feature and his help during implementation.
+- added support for obtaining timestamp from system for imuxsock
+ This permits to read the time a message was submitted to the system
+ log socket. Most importantly, this is provided in microsecond resolution.
+ So we are able to obtain high precision timestampis even for messages
+ that were - as is usual - not formatted with them. This also simplifies
+ things in regard to local time calculation in chroot environments.
+ Many thanks to Lennart Poettering for suggesting this feature,
+ providing some guidance on implementing it and coordinating getting the
+ necessary support into the Linux kernel.
+- bugfix: timestamp was incorrectly calculated for timezones with minute
+ offset
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=271
- bugfix: memory leak in imtcp & subsystems under some circumstances
This leak is tied to error conditions which lead to incorrect cleanup
of some data structures.
@@ -27,19 +44,6 @@ Version 6.3.0 [DEVEL] (rgerhards), 2011-06-01
http://blog.gerhards.net/2011/06/new-rsyslog-config-system-materializes.html
---------------------------------------------------------------------------
Version 6.1.9 [BETA] (rgerhards), 2011-06-14
-- bugfix: problems in failover action handling
- closes: http://bugzilla.adiscon.com/show_bug.cgi?id=270
- closes: http://bugzilla.adiscon.com/show_bug.cgi?id=254
-- bugfix: mutex was invalidly left unlocked during action processing
- At least one case where this can occur is during thread shutdown, which
- may be initiated by lower activity. In most cases, this is quite
- unlikely to happen. However, if it does, data structures may be
- corrupted which could lead to fatal failure and segfault. I detected
- this via a testbench test, not a user report. But I assume that some
- users may have had unreproducable aborts that were cause by this bug.
-- bugfix/improvement:$WorkDirectory now gracefully handles trailing slashes
----------------------------------------------------------------------------
-Version 6.1.9 [BETA] (rgerhards), 2011-06-14
- bugfix: memory leak in imtcp & subsystems under some circumstances
This leak is tied to error conditions which lead to incorrect cleanup
of some data structures. [backport from v6.3]
@@ -217,7 +221,50 @@ expected that interfaces, even new ones, break during the initial
syslog plain tcp input plugin (NOT supporting TLS!)
[ported from v4]
---------------------------------------------------------------------------
-Version 5.9.0 [V5-DEVEL] (rgerhards), 2011-03-??
+Version 5.9.1 [V5-DEVEL] (rgerhards), 2011-06-??
+- added support for obtaining timestamp for kernel message from message
+ If the kernel time-stamps messages, time is now take from that
+ timestamp instead of the system time when the message was read. This
+ provides much better accuracy. Thanks to Lennart Poettering for
+ suggesting this feature and his help during implementation.
+- added support for obtaining timestamp from system for imuxsock
+ This permits to read the time a message was submitted to the system
+ log socket. Most importantly, this is provided in microsecond resolution.
+ So we are able to obtain high precision timestampis even for messages
+ that were - as is usual - not formatted with them. This also simplifies
+ things in regard to local time calculation in chroot environments.
+ Many thanks to Lennart Poettering for suggesting this feature,
+ providing some guidance on implementing it and coordinating getting the
+ necessary support into the Linux kernel.
+- bugfix: timestamp was incorrectly calculated for timezones with minute
+ offset
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=271
+- bugfix: problems in failover action handling
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=270
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=254
+- bugfix: mutex was invalidly left unlocked during action processing
+ At least one case where this can occur is during thread shutdown, which
+ may be initiated by lower activity. In most cases, this is quite
+ unlikely to happen. However, if it does, data structures may be
+ corrupted which could lead to fatal failure and segfault. I detected
+ this via a testbench test, not a user report. But I assume that some
+ users may have had unreproducable aborts that were cause by this bug.
+- bugfix: memory leak in imtcp & subsystems under some circumstances
+ This leak is tied to error conditions which lead to incorrect cleanup
+ of some data structures. [backport from v6]
+- bugfix/improvement:$WorkDirectory now gracefully handles trailing slashes
+---------------------------------------------------------------------------
+Version 5.9.0 [V5-DEVEL] (rgerhards), 2011-06-08
+- imfile: added $InputFileMaxLinesAtOnce directive
+- enhanced imfile to support input batching
+- added capability for imtcp and imptcp to activate keep-alive packets
+ at the socket layer. This has not been added to imttcp, as the latter is
+ only an experimental module, and one which did not prove to be useful.
+ reference: http://kb.monitorware.com/post20791.html
+- added support to control KEEPALIVE settings in imptcp
+ this has not yet been added to imtcp, but could be done on request.
+- $ActionName is now also used for naming of queues in impstats
+ as well as in the debug output
- bugfix: do not open files with full privileges, if privs will be dropped
This make the privilege drop code more bulletproof, but breaks Ubuntu's
work-around for log files created by external programs with the wrong
@@ -226,6 +273,11 @@ Version 5.9.0 [V5-DEVEL] (rgerhards), 2011-03-??
nobody still depends on it (and, if so, they lost...).
- bugfix: pipes not opened in full priv mode when privs are to be dropped
- this begins a new devel branch for v5
+- better handling of queue i/o errors in disk queues. This is kind of a
+ bugfix, but a very intrusive one, this it goes into the devel version
+ first. Right now, "file not found" is handled and leads to the new
+ emergency mode, in which disk action is stopped and the queue run
+ in direct mode. An error message is emited if this happens.
- added support for user-level PRI provided via systemd
- added new config directive $InputTCPFlowControl to select if tcp
received messages shall be flagged as light delayable or not.
@@ -519,6 +571,8 @@ Version 5.6.2 [V5-STABLE] (rgerhards), 2010-11-30
- added the $InputFilePersistStateInterval config directive to imfile
- changed imfile so that the state file is never deleted (makes imfile
more robust in regard to fatal failures)
+- bugfix: a slightly more informative error message when a TCP
+ connections is aborted
---------------------------------------------------------------------------
Version 5.6.1 [V5-STABLE] (rgerhards), 2010-11-24
- bugfix(important): problem in TLS handling could cause rsyslog to loop
diff --git a/action.c b/action.c
index 7a372c0a..0fb660bb 100644
--- a/action.c
+++ b/action.c
@@ -327,7 +327,12 @@ actionConstructFinalize(action_t *pThis)
ASSERT(pThis != NULL);
/* find a name for our queue */
- snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr);
+ if(pThis->pszName == NULL) {
+ snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr);
+ } else {
+ ustrncpy(pszQName, pThis->pszName, sizeof(pszQName));
+ pszQName[63] = '\0'; /* to be on the save side */
+ }
/* now check if we can run the action in "firehose mode" during stage one of
* its processing (that is before messages are enqueued into the action q).
diff --git a/configure.ac b/configure.ac
index b659d291..b0e71c20 100644
--- a/configure.ac
+++ b/configure.ac
@@ -122,6 +122,8 @@ AC_CHECK_FUNCS([flock basename alarm clock_gettime gethostbyname gethostname get
# let me know! -- rgerhards, 2010-10-06
AC_CHECK_DECL([SCM_CREDENTIALS], [AC_DEFINE(HAVE_SCM_CREDENTIALS, [1], [set define])], [], [#include <sys/types.h>
#include <sys/socket.h>])
+AC_CHECK_DECL([SO_TIMESTAMP], [AC_DEFINE(HAVE_SO_TIMESTAMP, [1], [set define])], [], [#include <sys/types.h>
+#include <sys/socket.h>])
# Check for MAXHOSTNAMELEN
AC_MSG_CHECKING(for MAXHOSTNAMELEN)
diff --git a/doc/imfile.html b/doc/imfile.html
index 60726ceb..60bbbeea 100644
--- a/doc/imfile.html
+++ b/doc/imfile.html
@@ -98,9 +98,16 @@ performance, especially when set to a low value. Frequently writing the state
file is very time consuming.
<li><b>$InputFileReadMode</b> [mode]</b><br>
Available in 5.7.5+
+<li><b>$InputFileMaxLinesAtOnce</b> [number]</b><br>
+Available in 5.9.0+
<br>
-Mode to be used when reading lines. 0 (the default) means that each line is forwarded
-as its own log message.
+This is useful if multiple files need to be monitored. If set to 0, each file
+will be fully processed and then processing switches to the next file
+(this was the default in previous versions). If it is set, a maximum of
+[number] lines is processed in sequence for each file, and then the file is
+switched. This provides a kind of mutiplexing the load of multiple files and
+probably leads to a more natural distribution of events when multiple busy files
+are monitored. The default is 10240.
<li>$InputFileBindRuleset &lt;ruleset&gt;<br>
Available in 5.7.5+, 6.1.5+
Binds the listener to a specific <a href="multi_ruleset.html">ruleset</a>.</li>
diff --git a/doc/imptcp.html b/doc/imptcp.html
index c7a0e599..a4f43249 100644
--- a/doc/imptcp.html
+++ b/doc/imptcp.html
@@ -45,7 +45,25 @@ can be found at the <a href="http://www.rsyslog.com/Article321.phtml">Cisco tcp
page.
<li>$InputPTCPServerNotifyOnConnectionClose [on/<b>off</b>]<br>
instructs imptcp to emit a message if the remote peer closes a connection.<br>
-<li>$InputPTCPServerRun &lt;port&gt;<br>
+<li><b>$InputPTCPServerKeepAlive</b> &lt;on/<b>off</b>&gt;<br>
+enable of disable keep-alive packets at the tcp socket layer. The default is
+to disable them.</li>
+<li><b>$InputPTCPServerKeepAlive_probes</b> &lt;number&gt;<br>
+The number of unacknowledged probes to send before considering the connection dead and notifying the application layer.
+The default, 0, means that the operating system defaults are used. This has only
+effect if keep-alive is enabled. The functionality may not be available on
+all platforms.
+<li><b>$InputPTCPServerKeepAlive_intvl</b> &lt;number&gt;<br>
+The interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime.
+The default, 0, means that the operating system defaults are used. This has only
+effect if keep-alive is enabled. The functionality may not be available on
+all platforms.
+<li><b>$InputPTCPServerKeepAlive_time</b> &lt;number&gt;<br>
+The interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further.
+The default, 0, means that the operating system defaults are used. This has only
+effect if keep-alive is enabled. The functionality may not be available on
+all platforms.
+<li><b>$InputPTCPServerRun</b> &lt;port&gt;<br>
Starts a TCP server on selected port</li>
<li>$InputPTCPServerInputName &lt;name&gt;<br>
Sets a name for the inputname property. If no name is set "imptcp" is used by default. Setting a
diff --git a/doc/imtcp.html b/doc/imtcp.html
index b0aaa3c1..7653f601 100644
--- a/doc/imtcp.html
+++ b/doc/imtcp.html
@@ -55,6 +55,9 @@ so be prepared to wrangle with that!
instructs imtcp to emit a message if the remote peer closes a connection.<br>
<b>Important:</b> This directive is global to all listeners and must be given right
after loading imtcp, otherwise it may have no effect.</li>
+<li><b>$InputTCPServerKeepAlive</b> &lt;on/<b>off</b>&gt;<br>
+enable of disable keep-alive packets at the tcp socket layer. The default is
+to disable them.</li>
<li><b>$InputTCPServerRun</b> &lt;port&gt;<br>
Starts a TCP server on selected port</li>
<li><b>$InputTCPFlowControl</b> &lt;<b>on</b>/off&gt;<br>
diff --git a/doc/imuxsock.html b/doc/imuxsock.html
index ee5db22d..58b3ae54 100644
--- a/doc/imuxsock.html
+++ b/doc/imuxsock.html
@@ -68,9 +68,18 @@ messages that shall be rate-limited.
be obtained from the log socket itself. If so, the TAG part of the message is rewritten.
It is recommended to turn this option on, but the default is "off" to keep compatible
with earlier versions of rsyslog. This option was introduced in 5.7.0.</li>
+<li><b>$InputUnixListenSocketUseSysTimeStamp</b> [<b>on</b>/off] instructs imuxsock
+to obtain message time from the system (via control messages) insted of using time
+recorded inside the message. This may be most useful in combination with systemd. Note:
+this option was introduced with version 5.9.1. Due to the usefulness of it, we
+decided to enable it by default. As such, 5.9.1 and above behave slightly different
+than previous versions. However, we do not see how this could negatively affect
+existing environments.<br>
<li><b>$SystemLogSocketIgnoreMsgTimestamp</b> [<b>on</b>/off]<br>
Ignore timestamps included in the messages, applies to messages received via the system log socket.</li>
-<li><b>$OmitLocalLogging</b> (imuxsock) [on/<b>off</b>] -- former -o option</li>
+<li><b>$OmitLocalLogging</b> (imuxsock) [on/<b>off</b>] -- former -o option;
+do NOT listen for the local log socket. This is most useful if you run multiple
+instances of rsyslogd where only one shall handle the system log socket.</li>
<li><b>$SystemLogSocketName</b> &lt;name-of-socket&gt; -- former -p option</li>
<li><b>$SystemLogFlowControl</b> [on/<b>off</b>] - specifies if flow control should be applied
to the system log socket.</li>
@@ -87,6 +96,7 @@ burst in number of messages. Default is 200.
<li><b>$SystemLogRateLimitSeverity</b> [numerical severity] - specifies the severity of
messages that shall be rate-limited.
</li>
+<li><b>$SystemLogUseSysTimeStamp</b> [<b>on</b>/off] the same as $InputUnixListenSocketUseSysTimeStamp, but for the system log socket.
<li><b>$InputUnixListenSocketCreatePath</b> [on/<b>off</b>] - create directories in the socket path
if they do not already exist. They are created with 0755 permissions with the owner being the process under
which rsyslogd runs. The default is not to create directories. Keep in mind, though, that rsyslogd always
diff --git a/doc/mmsnmptrapd.html b/doc/mmsnmptrapd.html
index e69bc241..699049d3 100644
--- a/doc/mmsnmptrapd.html
+++ b/doc/mmsnmptrapd.html
@@ -51,8 +51,11 @@ to control output modules are also available to mmsnmptrapd.
<ul>
<li><b>$mmsnmptrapdTag</b> [tagname]<br>
tells the module which start string inside the tag to look for. The default is
-"snmptrap/"
-<li><b>$mmsnmptrapdSevertiyMapping</b> [severtiymap]<br>
+"snmptrapd". Note that a slash is automatically added to this tag when it comes to
+matching incoming messages. It MUST not be given, except if two slashes are required
+for whatever reasons (so "tag/" results in a check for "tag//" at the start of
+the tag field).
+<li><b>$mmsnmptrapdSeverityMapping</b> [severtiymap]<br>
This specifies the severity mapping table. It needs to be specified as a list. Note that
due to the current config system <b>no whitespace</b> is supported inside the list, so be
sure not to use any whitespace inside it.<br>
@@ -76,7 +79,7 @@ severities. The default tag is used.<br>
# ... other module loads and listener setup ...
*.* /path/to/file/with/orignalMessage # this file receives *un*modified messages
$mmsnmptrapdSeverityMapping warning/4,error/3
-*.* ::mmsnmptrapd: # *now* message is modified
+*.* :mmsnmptrapd: # *now* message is modified
*.* /path/to/file/with/modifiedMessage # this file receives modified messages
# ... rest of config ...
</textarea>
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index 4425c949..3537d809 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -64,6 +64,7 @@ DEFobjCurrIf(strm)
DEFobjCurrIf(prop)
DEFobjCurrIf(ruleset)
+#define NUM_MULTISUB 1024 /* max number of submits -- TODO: make configurable */
typedef struct fileInfo_s {
uchar *pszFileName;
uchar *pszTag;
@@ -71,11 +72,13 @@ typedef struct fileInfo_s {
uchar *pszStateFile; /* file in which state between runs is to be stored */
int iFacility;
int iSeverity;
+ int maxLinesAtOnce;
int nRecords; /**< How many records did we process before persisting the stream? */
int iPersistStateInterval; /**< how often should state be persisted? (0=on close only) */
strm_t *pStrm; /* its stream (NULL if not assigned) */
int readMode; /* which mode to use in ReadMulteLine call? */
ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ multi_submit_t multiSub;
} fileInfo_t;
@@ -95,6 +98,7 @@ static int iPersistStateInterval = 0; /* how often if state file to be persisted
static int iFacility = 128; /* local0 */
static int iSeverity = 5; /* notice, as of rfc 3164 */
static int readMode = 0; /* mode to use for ReadMultiLine call */
+static int maxLinesAtOnce = 10240; /* how many lines to process in a row? */
static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
static int iFilPtr = 0; /* number of files to be monitored; pointer to next free spot during config */
@@ -126,7 +130,9 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine)
pMsg->iFacility = LOG_FAC(pInfo->iFacility);
pMsg->iSeverity = LOG_PRI(pInfo->iSeverity);
MsgSetRuleset(pMsg, pInfo->pRuleset);
- CHKiRet(submitMsg(pMsg));
+ pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg;
+ if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem)
+ CHKiRet(multiSubmitMsg(&pInfo->multiSub));
finalize_it:
RETiRet;
}
@@ -210,6 +216,7 @@ static void pollFileCancelCleanup(void *pArg)
static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
{
cstr_t *pCStr = NULL;
+ int nProcessed = 0;
DEFiRet;
ASSERT(pbHadFileData != NULL);
@@ -224,7 +231,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
/* loop below will be exited when strmReadLine() returns EOF */
while(glbl.GetGlobalInputTermState() == 0) {
+ if(pThis->maxLinesAtOnce != 0 && nProcessed >= pThis->maxLinesAtOnce)
+ break;
CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr, pThis->readMode));
+ ++nProcessed;
*pbHadFileData = 1; /* this is just a flag, so set it and forget it */
CHKiRet(enqLine(pThis, pCStr)); /* process line */
rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */
@@ -235,6 +245,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
}
finalize_it:
+ if(pThis->multiSub.nElem > 0) {
+ /* submit everything that was not yet submitted */
+ CHKiRet(multiSubmitMsg(&pThis->multiSub));
+ }
; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */
/* Note: the problem above is that pthread:cleanup_pop() is a macro which
* evaluates to something like "} while(0);". So the code would become
@@ -480,6 +494,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iSeverity = 5; /* notice, as of rfc 3164 */
readMode = 0;
pBindRuleset = NULL;
+ maxLinesAtOnce = 10240;
RETiRet;
}
@@ -518,8 +533,12 @@ static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal)
pThis->pszStateFile = (uchar*) strdup((char*) pszStateFile);
}
+ CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(NUM_MULTISUB * sizeof(msg_t*)));
+ pThis->multiSub.maxElem = NUM_MULTISUB;
+ pThis->multiSub.nElem = 0;
pThis->iSeverity = iSeverity;
pThis->iFacility = iFacility;
+ pThis->maxLinesAtOnce = maxLinesAtOnce;
pThis->iPersistStateInterval = iPersistStateInterval;
pThis->nRecords = 0;
pThis->readMode = readMode;
@@ -597,6 +616,8 @@ CODEmodInit_QueryRegCFSLineHdlr
NULL, &iPollInterval, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilereadmode", 0, eCmdHdlrInt,
NULL, &readMode, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilemaxlinesatonce", 0, eCmdHdlrSize,
+ NULL, &maxLinesAtOnce, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepersiststateinterval", 0, eCmdHdlrInt,
NULL, &iPersistStateInterval, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilebindruleset", 0, eCmdHdlrGetWord,
diff --git a/plugins/imklog/bsd.c b/plugins/imklog/bsd.c
index 0a4c7cd4..930bbd11 100644
--- a/plugins/imklog/bsd.c
+++ b/plugins/imklog/bsd.c
@@ -155,18 +155,18 @@ readklog(void)
for (p = (char*)pRcv; (q = strchr(p, '\n')) != NULL; p = q + 1) {
*q = '\0';
- Syslog(LOG_INFO, (uchar*) p);
+ Syslog(LOG_INFO, (uchar*) p, NULL);
}
len = strlen(p);
if (len >= iMaxLine - 1) {
- Syslog(LOG_INFO, (uchar*)p);
+ Syslog(LOG_INFO, (uchar*)p, NULL);
len = 0;
}
if (len > 0)
memmove(pRcv, p, len + 1);
}
if (len > 0)
- Syslog(LOG_INFO, pRcv);
+ Syslog(LOG_INFO, pRcv, NULL);
if(pRcv != NULL && (size_t) iMaxLine >= sizeof(bufRcv) - 1)
free(pRcv);
diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c
index 65a4cd57..b64a8f1f 100644
--- a/plugins/imklog/imklog.c
+++ b/plugins/imklog/imklog.c
@@ -112,15 +112,21 @@ initConfigSettings(void)
* rgerhards, 2008-04-12
*/
static rsRetVal
-enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity)
+enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval *tp)
{
- DEFiRet;
+ struct syslogTime st;
msg_t *pMsg;
+ DEFiRet;
assert(msg != NULL);
assert(pszTag != NULL);
- CHKiRet(msgConstruct(&pMsg));
+ if(tp == NULL) {
+ CHKiRet(msgConstruct(&pMsg));
+ } else {
+ datetime.timeval2syslogTime(tp, &st);
+ CHKiRet(msgConstructWithTime(&pMsg, &st, tp->tv_sec));
+ }
MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY);
MsgSetInputName(pMsg, pInputName);
MsgSetRawMsgWOSize(pMsg, (char*)msg);
@@ -198,16 +204,17 @@ rsRetVal imklogLogIntMsg(int priority, char *fmt, ...)
}
-/* log a kernel message
+/* log a kernel message. If tp is non-NULL, it contains the message creation
+ * time to use.
* rgerhards, 2008-04-14
*/
-rsRetVal Syslog(int priority, uchar *pMsg)
+rsRetVal Syslog(int priority, uchar *pMsg, struct timeval *tp)
{
- DEFiRet;
int pri = -1;
rsRetVal localRet;
+ DEFiRet;
- /* first check if we have two PRIs. This can happen in case of systemd,
+ /* then check if we have two PRIs. This can happen in case of systemd,
* in which case the second PRI is the rigth one.
* TODO: added kernel timestamp support to this PoC. -- rgerhards, 2011-03-18
*/
@@ -232,7 +239,7 @@ rsRetVal Syslog(int priority, uchar *pMsg)
if(cs.bPermitNonKernel == 0 && LOG_FAC(priority) != LOG_KERN)
FINALIZE; /* silently ignore */
- iRet = enqMsg((uchar*)pMsg, (uchar*) "kernel:", LOG_FAC(priority), LOG_PRI(priority));
+ iRet = enqMsg((uchar*)pMsg, (uchar*) "kernel:", LOG_FAC(priority), LOG_PRI(priority), tp);
finalize_it:
RETiRet;
diff --git a/plugins/imklog/imklog.h b/plugins/imklog/imklog.h
index 447211dc..b0772711 100644
--- a/plugins/imklog/imklog.h
+++ b/plugins/imklog/imklog.h
@@ -5,7 +5,7 @@
* Major change: 2008-04-09: switched to a driver interface for
* several platforms
*
- * Copyright 2007-2008 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -72,7 +72,7 @@ extern uchar *pszPath;
/* the functions below may be called by the drivers */
rsRetVal imklogLogIntMsg(int priority, char *fmt, ...) __attribute__((format(printf,2, 3)));
-rsRetVal Syslog(int priority, uchar *msg);
+rsRetVal Syslog(int priority, uchar *msg, struct timeval *tp);
/* prototypes */
extern int klog_getMaxLine(void); /* work-around for klog drivers to get configured max line size */
diff --git a/plugins/imklog/linux.c b/plugins/imklog/linux.c
index 97171e4f..38250efa 100644
--- a/plugins/imklog/linux.c
+++ b/plugins/imklog/linux.c
@@ -28,6 +28,8 @@
#include "rsyslog.h"
#include <stdlib.h>
#include <stdio.h>
+#include <ctype.h>
+#include <time.h>
#include <assert.h>
#include <signal.h>
#include <string.h>
@@ -181,6 +183,93 @@ static int copyin( uchar *line, int space,
return(i);
}
+
+/* submit a message to imklog Syslog() API. In this function, we check if
+ * a kernel timestamp is present and, if so, extract and strip it.
+ * Note: this is an extra processing step. We should revisit the whole
+ * idea in v6 and remove all that old stuff that we do not longer need
+ * (like symbol resolution). <-- TODO
+ * Special thanks to Lennart Poettering for suggesting on how to convert
+ * the kernel timestamp to a realtime timestamp. This method depends on
+ * the fact the the kernel timestamp is written using the monotonic clock.
+ * Shall that change (very unlikely), this code must be changed as well. Note
+ * that due to the way we generate the delta, we are unable to write the
+ * absolutely correc timestamp (system call overhead of the clock calls
+ * prevents us from doing so). However, the difference is very minor.
+ * rgerhards, 201106-24
+ */
+static void
+submitSyslog(int pri, uchar *buf)
+{
+ long secs;
+ long nsecs;
+ long secOffs;
+ long nsecOffs;
+ unsigned i;
+ unsigned bufsize;
+ struct timespec monotonic, realtime;
+ struct timeval tv;
+ struct timeval *tp = NULL;
+
+ if(buf[3] != '[')
+ goto done;
+ DBGPRINTF("imklog: kernel timestamp detected, extracting it\n");
+
+ /* we now try to parse the timestamp. iff it parses, we assume
+ * it is a timestamp. Otherwise we know for sure it is no ts ;)
+ */
+ i = 4; /* first digit after '[' */
+ secs = 0;
+ while(buf[i] && isdigit(buf[i])) {
+ secs = secs * 10 + buf[i] - '0';
+ ++i;
+ }
+ if(buf[i] != '.') {
+ DBGPRINTF("no dot --> no kernel timestamp\n");
+ goto done; /* no TS! */
+ }
+
+ ++i; /* skip dot */
+ nsecs = 0;
+ while(buf[i] && isdigit(buf[i])) {
+ nsecs = nsecs * 10 + buf[i] - '0';
+ ++i;
+ }
+ if(buf[i] != ']') {
+ DBGPRINTF("no trailing ']' --> no kernel timestamp\n");
+ goto done; /* no TS! */
+ }
+ ++i; /* skip ']' */
+
+ /* we have a timestamp */
+ DBGPRINTF("kernel timestamp is %ld %ld\n", secs, nsecs);
+ bufsize= strlen((char*)buf);
+ memcpy(buf+3, buf+i, bufsize - i + 1);
+
+ clock_gettime(CLOCK_MONOTONIC, &monotonic);
+ clock_gettime(CLOCK_REALTIME, &realtime);
+ secOffs = realtime.tv_sec - monotonic.tv_sec;
+ nsecOffs = realtime.tv_nsec - monotonic.tv_nsec;
+ if(nsecOffs < 0) {
+ secOffs--;
+ nsecOffs += 1000000000l;
+ }
+
+ nsecs +=nsecOffs;
+ if(nsecs > 999999999l) {
+ secs++;
+ nsecs -= 1000000000l;
+ }
+ secs += secOffs;
+ tv.tv_sec = secs;
+ tv.tv_usec = nsecs / 1000;
+ tp = &tv;
+
+done:
+ Syslog(pri, buf, tp);
+}
+
+
/*
* Messages are separated by "\n". Messages longer than
* LOG_LINE_LENGTH are broken up.
@@ -235,7 +324,7 @@ static void LogLine(modConfData_t *pModConf, char *ptr, int len)
//dbgprintf("Line buffer full:\n");
//dbgprintf("\tLine: %s\n", line);
- Syslog(LOG_INFO, line_buff);
+ submitSyslog(LOG_INFO, line_buff);
line = line_buff;
space = sizeof(line_buff)-1;
parse_state = PARSING_TEXT;
@@ -254,28 +343,24 @@ static void LogLine(modConfData_t *pModConf, char *ptr, int len)
space -= delta;
len -= delta;
- if( space == 0 || len == 0 )
- {
+ if( space == 0 || len == 0 ) {
break; /* full line_buff or end of input buffer */
}
- if( *ptr == '\0' ) /* zero byte */
- {
+ if( *ptr == '\0' ) /* zero byte */ {
ptr++; /* skip zero byte */
space -= 1;
len -= 1;
-
break;
}
- if( *ptr == '\n' ) /* newline */
- {
+ if( *ptr == '\n' ) /* newline */ {
ptr++; /* skip newline */
space -= 1;
len -= 1;
*line = 0; /* force null terminator */
- Syslog(LOG_INFO, line_buff);
+ submitSyslog(LOG_INFO, line_buff);
line = line_buff;
space = sizeof(line_buff)-1;
if(pModConf->symbols_twice) {
@@ -285,9 +370,7 @@ static void LogLine(modConfData_t *pModConf, char *ptr, int len)
skip_symbol_lookup = 1;
ptr = save_ptr;
len = save_len;
- }
- else
- {
+ } else {
skip_symbol_lookup = 0;
save_ptr = ptr;
save_len = len;
@@ -295,8 +378,7 @@ static void LogLine(modConfData_t *pModConf, char *ptr, int len)
}
break;
}
- if( *ptr == '[' ) /* possible kernel symbol */
- {
+ if( *ptr == '[' ) /* possible kernel symbol */ {
*line++ = *ptr++;
space -= 1;
len -= 1;
@@ -310,8 +392,7 @@ static void LogLine(modConfData_t *pModConf, char *ptr, int len)
break;
case PARSING_SYMSTART:
- if( *ptr != '<' )
- {
+ if( *ptr != '<' ) {
parse_state = PARSING_TEXT; /* not a symbol */
break;
}
diff --git a/plugins/imklog/solaris.c b/plugins/imklog/solaris.c
index 8a6d5af1..0a169cdd 100644
--- a/plugins/imklog/solaris.c
+++ b/plugins/imklog/solaris.c
@@ -80,74 +80,6 @@ klogWillRun(void)
}
-#if 0
-/* Read /dev/klog while data are available, split into lines.
- * Contrary to standard BSD syslogd, we do a blocking read. We can
- * afford this as imklog is running on its own threads. So if we have
- * a single file, it really doesn't matter if we wait inside a 1-file
- * select or the read() directly.
- */
-static void
-readklog(void)
-{
- char *p, *q;
- int len, i;
- int iMaxLine;
- uchar bufRcv[4096+1];
- uchar *pRcv = NULL; /* receive buffer */
-
- iMaxLine = klog_getMaxLine();
-
- /* we optimize performance: if iMaxLine is below 4K (which it is in almost all
- * cases, we use a fixed buffer on the stack. Only if it is higher, heap memory
- * is used. We could use alloca() to achive a similar aspect, but there are so
- * many issues with alloca() that I do not want to take that route.
- * rgerhards, 2008-09-02
- */
- if((size_t) iMaxLine < sizeof(bufRcv) - 1) {
- pRcv = bufRcv;
- } else {
- if((pRcv = (uchar*) malloc(sizeof(uchar) * (iMaxLine + 1))) == NULL)
- iMaxLine = sizeof(bufRcv) - 1; /* better this than noting */
- }
-
- len = 0;
- for (;;) {
- dbgprintf("----------imklog(BSD) waiting for kernel log line\n");
- i = read(fklog, pRcv + len, iMaxLine - len);
- if (i > 0) {
- pRcv[i + len] = '\0';
- } else {
- if (i < 0 && errno != EINTR && errno != EAGAIN) {
- imklogLogIntMsg(LOG_ERR,
- "imklog error %d reading kernel log - shutting down imklog",
- errno);
- fklog = -1;
- }
- break;
- }
-
- for(p = pRcv; (q = strchr(p, '\n')) != NULL; p = q + 1) {
- *q = '\0';
- Syslog(LOG_INFO, (uchar*) p);
- }
- len = strlen(p);
- if (len >= iMaxLine - 1) {
- Syslog(LOG_INFO, (uchar*)p);
- len = 0;
- }
- if (len > 0)
- memmove(pRcv, p, len + 1);
- }
- if (len > 0)
- Syslog(LOG_INFO, pRcv);
-
- if(pRcv != NULL && (size_t) iMaxLine >= sizeof(bufRcv) - 1)
- free(pRcv);
-}
-#endif
-
-
/* to be called in the module's AfterRun entry point
* rgerhards, 2008-04-09
*/
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 713ee83c..1ee91bb7 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -50,6 +50,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
+#include <netinet/tcp.h>
#if HAVE_FCNTL_H
#include <fcntl.h>
#endif
@@ -89,6 +90,10 @@ static void * wrkr(void *myself);
/* config settings */
typedef struct configSettings_s {
+ int bKeepAlive; /* support keep-alive packets */
+ int iKeepAliveIntvl;
+ int iKeepAliveProbes;
+ int iKeepAliveTime;
int bEmitMsgOnClose; /* emit an informational message on close by remote peer */
int iAddtlFrameDelim; /* addtl frame delimiter, e.g. for netscreen, default none */
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
@@ -99,6 +104,10 @@ typedef struct configSettings_s {
static configSettings_t cs;
struct instanceConf_s {
+ int bKeepAlive; /* support keep-alive packets */
+ int iKeepAliveIntvl;
+ int iKeepAliveProbes;
+ int iKeepAliveTime;
int bEmitMsgOnClose;
int iAddtlFrameDelim;
uchar *pszBindPort; /* port to bind to */
@@ -134,14 +143,18 @@ struct ptcpsrv_s {
ptcpsrv_t *pNext; /* linked list maintenance */
uchar *port; /* Port to listen to */
uchar *lstnIP; /* which IP we should listen on? */
- int bEmitMsgOnClose;
int iAddtlFrameDelim;
+ int iKeepAliveIntvl;
+ int iKeepAliveProbes;
+ int iKeepAliveTime;
uchar *pszInputName;
prop_t *pInputName; /* InputName in (fast to process) property format */
ruleset_t *pRuleset;
ptcplstn_t *pLstn; /* root of our listeners */
ptcpsess_t *pSess; /* root of our sessions */
pthread_mutex_t mutSessLst;
+ sbool bKeepAlive; /* support keep-alive packets */
+ sbool bEmitMsgOnClose;
};
/* the ptcp session object. Describes a single active session.
@@ -458,12 +471,80 @@ finalize_it:
}
+/* Enable KEEPALIVE handling on the socket. */
+static inline rsRetVal
+EnableKeepAlive(ptcplstn_t *pLstn, int sock)
+{
+ int ret;
+ int optval;
+ socklen_t optlen;
+ DEFiRet;
+
+ optval = 1;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen);
+ if(ret < 0) {
+ dbgprintf("EnableKeepAlive socket call returns error %d\n", ret);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+# if defined(TCP_KEEPCNT)
+ if(pLstn->pSrv->iKeepAliveProbes > 0) {
+ optval = pLstn->pSrv->iKeepAliveProbes;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen);
+ } else {
+ ret = 0;
+ }
+# else
+ ret = -1;
+# endif
+ if(ret < 0) {
+ errmsg.LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive probes - ignored");
+ }
+
+# if defined(TCP_KEEPCNT)
+ if(pLstn->pSrv->iKeepAliveTime > 0) {
+ optval = pLstn->pSrv->iKeepAliveTime;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen);
+ } else {
+ ret = 0;
+ }
+# else
+ ret = -1;
+# endif
+ if(ret < 0) {
+ errmsg.LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive time - ignored");
+ }
+
+# if defined(TCP_KEEPCNT)
+ if(pLstn->pSrv->iKeepAliveIntvl > 0) {
+ optval = pLstn->pSrv->iKeepAliveIntvl;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen);
+ } else {
+ ret = 0;
+ }
+# else
+ ret = -1;
+# endif
+ if(ret < 0) {
+ errmsg.LogError(errno, NO_ERRCODE, "imptcp cannot set keepalive intvl - ignored");
+ }
+
+ dbgprintf("KEEPALIVE enabled for socket %d\n", sock);
+
+finalize_it:
+ RETiRet;
+}
+
/* accept an incoming connection request
* rgerhards, 2008-04-22
*/
static rsRetVal
-AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP)
+AcceptConnReq(ptcplstn_t *pLstn, int *newSock, prop_t **peerName, prop_t **peerIP)
{
int sockflags;
struct sockaddr_storage addr;
@@ -472,13 +553,17 @@ AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP)
DEFiRet;
- iNewSock = accept(sock, (struct sockaddr*) &addr, &addrlen);
+ iNewSock = accept(pLstn->sock, (struct sockaddr*) &addr, &addrlen);
if(iNewSock < 0) {
if(errno == EAGAIN || errno == EWOULDBLOCK)
ABORT_FINALIZE(RS_RET_NO_MORE_DATA);
ABORT_FINALIZE(RS_RET_ACCEPT_ERR);
}
+ if(pLstn->pSrv->bKeepAlive)
+ EnableKeepAlive(pLstn, iNewSock);/* we ignore errors, best to do! */
+
+
CHKiRet(getPeerNames(peerName, peerIP, (struct sockaddr*) &addr));
/* set the new socket to non-blocking IO */
@@ -905,6 +990,10 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal)
CHKmalloc(inst->pszInputName = ustrdup(cs.pszInputName));
}
inst->pBindRuleset = NULL;
+ inst->bKeepAlive = cs.bKeepAlive;
+ inst->iKeepAliveIntvl = cs.iKeepAliveTime;
+ inst->iKeepAliveProbes = cs.iKeepAliveProbes;
+ inst->iKeepAliveTime = cs.iKeepAliveTime;
inst->bEmitMsgOnClose = cs.bEmitMsgOnClose;
inst->iAddtlFrameDelim = cs.iAddtlFrameDelim;
inst->next = NULL;
@@ -933,6 +1022,10 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
pthread_mutex_init(&pSrv->mutSessLst, NULL);
pSrv->pSess = NULL;
pSrv->pLstn = NULL;
+ pSrv->bKeepAlive = inst->bKeepAlive;
+ pSrv->iKeepAliveIntvl = inst->iKeepAliveTime;
+ pSrv->iKeepAliveProbes = inst->iKeepAliveProbes;
+ pSrv->iKeepAliveTime = inst->iKeepAliveTime;
pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose;
CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort));
pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim;
@@ -1052,7 +1145,7 @@ lstnActivity(ptcplstn_t *pLstn)
DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock);
while(glbl.GetGlobalInputTermState() == 0) {
- localRet = AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP);
+ localRet = AcceptConnReq(pLstn, &newSock, &peerName, &peerIP);
if(localRet == RS_RET_NO_MORE_DATA || glbl.GetGlobalInputTermState() == 1)
break;
CHKiRet(localRet);
@@ -1406,6 +1499,10 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus
{
cs.bEmitMsgOnClose = 0;
cs.wrkrMax = 2;
+ cs.bKeepAlive = 0;
+ cs.iKeepAliveProbes = 0;
+ cs.iKeepAliveTime = 0;
+ cs.iKeepAliveIntvl = 0;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
free(cs.pszInputName);
cs.pszInputName = NULL;
@@ -1453,6 +1550,14 @@ CODEmodInit_QueryRegCFSLineHdlr
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord,
addInstance, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive"), 0, eCmdHdlrBinary,
+ NULL, &cs.bKeepAlive, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_probes"), 0, eCmdHdlrInt,
+ NULL, &cs.iKeepAliveProbes, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_time"), 0, eCmdHdlrInt,
+ NULL, &cs.iKeepAliveTime, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_intvl"), 0, eCmdHdlrInt,
+ NULL, &cs.iKeepAliveIntvl, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpservernotifyonconnectionclose"), 0,
eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt,
diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c
index 976fbf1f..142f0791 100644
--- a/plugins/imtcp/imtcp.c
+++ b/plugins/imtcp/imtcp.c
@@ -88,6 +88,7 @@ static struct configSettings_s {
int iTCPSessMax;
int iTCPLstnMax;
int iStrmDrvrMode;
+ int bKeepAlive;
int bEmitMsgOnClose;
int iAddtlFrameDelim;
int bDisableLFDelim;
@@ -112,10 +113,11 @@ struct modConfData_s {
int iTCPSessMax; /* max number of sessions */
int iTCPLstnMax; /* max number of sessions */
int iStrmDrvrMode; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */
- int bEmitMsgOnClose; /* emit an informational message on close by remote peer */
int iAddtlFrameDelim; /* addtl frame delimiter, e.g. for netscreen, default none */
- int bDisableLFDelim; /* disable standard LF delimiter */
- int bUseFlowControl; /* use flow control, what means indicate ourselfs a "light delayable" */
+ sbool bDisableLFDelim; /* disable standard LF delimiter */
+ sbool bUseFlowControl; /* use flow control, what means indicate ourselfs a "light delayable" */
+ sbool bKeepAlive;
+ sbool bEmitMsgOnClose; /* emit an informational message on close by remote peer */
uchar *pszStrmDrvrAuthMode; /* authentication mode to use */
};
@@ -249,6 +251,7 @@ addListner(modConfData_t *modConf, instanceConf_t *inst)
CHKiRet(tcpsrv.SetCBOnRegularClose(pOurTcpsrv, onRegularClose));
CHKiRet(tcpsrv.SetCBOnErrClose(pOurTcpsrv, onErrClose));
/* params */
+ CHKiRet(tcpsrv.SetKeepAlive(pOurTcpsrv, modConf->bKeepAlive));
CHKiRet(tcpsrv.SetSessMax(pOurTcpsrv, modConf->iTCPSessMax));
CHKiRet(tcpsrv.SetLstnMax(pOurTcpsrv, modConf->iTCPLstnMax));
CHKiRet(tcpsrv.SetDrvrMode(pOurTcpsrv, modConf->iStrmDrvrMode));
@@ -300,6 +303,7 @@ CODESTARTendCnfLoad
pModConf->iAddtlFrameDelim = cs.iAddtlFrameDelim;
pModConf->bDisableLFDelim = cs.bDisableLFDelim;
pModConf->bUseFlowControl = cs.bUseFlowControl;
+ pModConf->bKeepAlive = cs.bKeepAlive;
if((cs.pszStrmDrvrAuthMode == NULL) || (cs.pszStrmDrvrAuthMode[0] == '\0')) {
loadModConf->pszStrmDrvrAuthMode = NULL;
free(cs.pszStrmDrvrAuthMode);
@@ -409,6 +413,7 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus
cs.iTCPLstnMax = 20;
cs.iStrmDrvrMode = 0;
cs.bUseFlowControl = 0;
+ cs.bKeepAlive = 0;
cs.bEmitMsgOnClose = 0;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
cs.bDisableLFDelim = 0;
@@ -446,6 +451,8 @@ CODEmodInit_QueryRegCFSLineHdlr
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverrun"), 0, eCmdHdlrGetWord,
addInstance, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverkeepalive"), 0, eCmdHdlrBinary,
+ NULL, &cs.bKeepAlive, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxsessions"), 0, eCmdHdlrInt,
NULL, &cs.iTCPSessMax, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxlisteners"), 0, eCmdHdlrInt,
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index 64477620..4eafe92a 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -140,6 +140,7 @@ typedef struct lstn_s {
sbool bCreatePath; /* auto-creation of socket directory? */
sbool bUseCreds; /* pull original creator credentials from socket */
sbool bWritePid; /* write original PID into tag */
+ sbool bUseSysTimeStamp; /* use timestamp from system (instead of from message) */
} lstn_t;
static lstn_t listeners[MAXFUNIX];
@@ -163,6 +164,8 @@ static struct configSettings_s {
uchar *pLogHostName; /* host name to use with this socket */
int bUseFlowCtl; /* use flow control or not (if yes, only LIGHT is used! */
int bIgnoreTimestamp; /* ignore timestamps present in the incoming message? */
+ int bUseSysTimeStamp; /* use timestamp from system (rather than from message) */
+ int bUseSysTimeStampSysSock; /* same, for system log socket */
int bWritePid; /* use credentials from recvmsg() and fixup PID in TAG */
int bWritePidSysSock; /* use credentials from recvmsg() and fixup PID in TAG */
int bCreatePath; /* auto-create socket path? */
@@ -177,9 +180,10 @@ static struct configSettings_s {
struct instanceConf_s {
uchar *sockName;
uchar *pLogHostName; /* host name to use with this socket */
- int bUseFlowCtl; /* use flow control or not (if yes, only LIGHT is used! */
- int bIgnoreTimestamp; /* ignore timestamps present in the incoming message? */
- int bWritePid; /* use credentials from recvmsg() and fixup PID in TAG */
+ sbool bUseFlowCtl; /* use flow control or not (if yes, only LIGHT is used! */
+ sbool bIgnoreTimestamp; /* ignore timestamps present in the incoming message? */
+ sbool bWritePid; /* use credentials from recvmsg() and fixup PID in TAG */
+ sbool bUseSysTimeStamp; /* use timestamp from system (instead of from message) */
int bCreatePath; /* auto-create socket path? */
int ratelimitInterval; /* interval in seconds, 0 = off */
int ratelimitBurst; /* max nbr of messages in interval */
@@ -191,11 +195,12 @@ struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
instanceConf_t *root, *tail;
uchar *pLogSockName;
- int bOmitLocalLogging;
- int bWritePidSysSock;
int ratelimitIntervalSysSock;
int ratelimitBurstSysSock;
int ratelimitSeveritySysSock;
+ sbool bOmitLocalLogging;
+ sbool bWritePidSysSock;
+ sbool bUseSysTimeStamp;
};
static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */
@@ -317,6 +322,7 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal)
inst->bUseFlowCtl = cs.bUseFlowCtl;
inst->bIgnoreTimestamp = cs.bIgnoreTimestamp;
inst->bCreatePath = cs.bCreatePath;
+ inst->bUseSysTimeStamp = cs.bUseSysTimeStamp;
inst->bWritePid = cs.bWritePid;
inst->next = NULL;
@@ -381,6 +387,7 @@ addListner(instanceConf_t *inst)
listeners[nfd].sockName = ustrdup(inst->sockName);
listeners[nfd].bUseCreds = (inst->bWritePid || inst->ratelimitInterval) ? 1 : 0;
listeners[nfd].bWritePid = inst->bWritePid;
+ listeners[nfd].bUseSysTimeStamp = inst->bUseSysTimeStamp;
nfd++;
} else {
errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n",
@@ -494,6 +501,10 @@ openLogSocket(lstn_t *pLstn)
errmsg.LogError(errno, NO_ERRCODE, "set SCM_CREDENTIALS failed on '%s'", pLstn->sockName);
pLstn->bUseCreds = 0;
}
+// TODO: move to its own #if
+ if(setsockopt(pLstn->fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) != 0) {
+ errmsg.LogError(errno, NO_ERRCODE, "set SO_TIMESTAMP failed on '%s'", pLstn->sockName);
+ }
}
# else /* HAVE_SCM_CREDENTIALS */
pLstn->bUseCreds = 0;
@@ -584,7 +595,7 @@ fixPID(uchar *bufTAG, int *lenTag, struct ucred *cred)
* can also mangle it if necessary.
*/
static inline rsRetVal
-SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
+SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct timeval *ts)
{
msg_t *pMsg;
int lenMsg;
@@ -623,7 +634,13 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
findRatelimiter(pLstn, cred, &ratelimiter); /* ignore error, better so than others... */
}
- datetime.getCurrTime(&st, &tt);
+ if(ts == NULL) {
+ datetime.getCurrTime(&st, &tt);
+ } else {
+ datetime.timeval2syslogTime(ts, &st);
+ tt = ts->tv_sec;
+ }
+
if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) {
STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit);
FINALIZE;
@@ -643,8 +660,26 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
parse++; lenMsg--; /* '>' */
- if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) {
- DBGPRINTF("we have a problem, invalid timestamp in msg!\n");
+ if(ts == NULL) {
+ if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) {
+ DBGPRINTF("we have a problem, invalid timestamp in msg!\n");
+ }
+ } else { /* if we pulled the time from the system, we need to update the message text */
+ if(lenMsg >= 16) {
+ /* RFC3164 timestamp is 16 bytes long, so assuming a valid stamp,
+ * we can fixup the message. If the part is smaller, the stamp can
+ * not be valid and we do not touch the message. Note that there may
+ * be some scenarios where the message is larg enough but the stamp is
+ * still invalid. In those cases we will destruct part of the message,
+ * but this case is considered extremely unlikely and thus not handled
+ * specifically. -- rgerhards, 2011-06-20
+ */
+ datetime.formatTimestamp3164(&st, (char*)parse, 0);
+ parse[15] = ' '; /* re-write \0 from fromatTimestamp3164 by SP */
+ /* update "counters" to reflect processed timestamp */
+ parse += 16;
+ lenMsg -= 16;
+ }
}
/* pull tag */
@@ -695,6 +730,7 @@ static rsRetVal readSocket(lstn_t *pLstn)
struct cmsghdr *cm;
# endif
struct ucred *cred;
+ struct timeval *ts;
uchar bufRcv[4096+1];
char aux[128];
uchar *pRcv = NULL; /* receive buffer */
@@ -733,21 +769,32 @@ static rsRetVal readSocket(lstn_t *pLstn)
dbgprintf("Message from UNIX socket: #%d\n", pLstn->fd);
if(iRcvd > 0) {
cred = NULL;
-# if HAVE_SCM_CREDENTIALS
- if(pLstn->bUseCreds) {
+ ts = NULL;
+ if(pLstn->bUseCreds || pLstn->bUseSysTimeStamp) {
dbgprintf("XXX: pre CM loop, length of control message %d\n", (int) msgh.msg_controllen);
- for (cm = CMSG_FIRSTHDR(&msgh); cm; cm = CMSG_NXTHDR(&msgh, cm)) {
+ for(cm = CMSG_FIRSTHDR(&msgh); cm; cm = CMSG_NXTHDR(&msgh, cm)) {
dbgprintf("XXX: in CM loop, %d, %d\n", cm->cmsg_level, cm->cmsg_type);
- if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) {
+# if HAVE_SCM_CREDENTIALS
+ if( pLstn->bUseCreds
+ && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) {
cred = (struct ucred*) CMSG_DATA(cm);
dbgprintf("XXX: got credentials pid %d\n", (int) cred->pid);
break;
}
+# endif /* HAVE_SCM_CREDENTIALS */
+# if HAVE_SO_TIMESTAMP
+ if( pLstn->bUseSysTimeStamp
+ && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SO_TIMESTAMP) {
+ ts = (struct timeval *)CMSG_DATA(cm);
+ dbgprintf("XXX: got timestamp %ld.%ld\n",
+ (long) ts->tv_sec, (long) ts->tv_usec);
+ break;
+ }
+# endif /* HAVE_SO_TIMESTAMP */
}
dbgprintf("XXX: post CM loop\n");
}
-# endif /* HAVE_SCM_CREDENTIALS */
- CHKiRet(SubmitMsg(pRcv, iRcvd, pLstn, cred));
+ CHKiRet(SubmitMsg(pRcv, iRcvd, pLstn, cred, ts));
} else if(iRcvd < 0 && errno != EINTR) {
char errStr[1024];
rs_strerror_r(errno, errStr, sizeof(errStr));
@@ -797,6 +844,7 @@ activateListeners()
listeners[0].ratelimitSev = runModConf->ratelimitSeveritySysSock;
listeners[0].bUseCreds = (runModConf->bWritePidSysSock || runModConf->ratelimitIntervalSysSock) ? 1 : 0;
listeners[0].bWritePid = runModConf->bWritePidSysSock;
+ listeners[0].bUseSysTimeStamp = runModConf->bUseSysTimeStamp;
sd_fds = sd_listen_fds(0);
if(sd_fds < 0) {
@@ -1017,6 +1065,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
cs.pLogHostName = NULL;
cs.bIgnoreTimestamp = 1;
cs.bUseFlowCtl = 0;
+ cs.bUseSysTimeStamp = 1;
+ cs.bUseSysTimeStampSysSock = 1;
cs.bWritePid = 0;
cs.bWritePidSysSock = 0;
cs.bCreatePath = DFLT_bCreatePath;
@@ -1063,6 +1113,7 @@ CODEmodInit_QueryRegCFSLineHdlr
listeners[0].bParseHost = 0;
listeners[0].bUseCreds = 0;
listeners[0].bCreatePath = 0;
+ listeners[0].bUseSysTimeStamp = 1;
/* initialize socket names */
for(i = 1 ; i < MAXFUNIX ; ++i) {
@@ -1092,6 +1143,8 @@ CODEmodInit_QueryRegCFSLineHdlr
NULL, &cs.bUseFlowCtl, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketcreatepath", 0, eCmdHdlrBinary,
NULL, &cs.bCreatePath, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusesystimestamp", 0, eCmdHdlrBinary,
+ NULL, &cs.bUseSysTimeStamp, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"addunixlistensocket", 0, eCmdHdlrGetWord,
addInstance, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusepidfromsystem", 0, eCmdHdlrBinary,
@@ -1114,6 +1167,8 @@ CODEmodInit_QueryRegCFSLineHdlr
setSystemLogTimestampIgnore, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary,
setSystemLogFlowControl, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusesystimestamp", 0, eCmdHdlrBinary,
+ NULL, &cs.bUseSysTimeStampSysSock, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary,
NULL, &cs.bWritePidSysSock, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitinterval", 0, eCmdHdlrInt,
diff --git a/plugins/mmsnmptrapd/mmsnmptrapd.c b/plugins/mmsnmptrapd/mmsnmptrapd.c
index 54354830..aebd9af1 100644
--- a/plugins/mmsnmptrapd/mmsnmptrapd.c
+++ b/plugins/mmsnmptrapd/mmsnmptrapd.c
@@ -414,7 +414,7 @@ CODEmodInit_QueryRegCFSLineHdlr
cs.pszTagName = NULL;
cs.pszSeverityMapping = NULL;
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdtag", 0, eCmdHdlrInt,
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdtag", 0, eCmdHdlrGetWord,
NULL, &cs.pszTagName, STD_LOADABLE_MODULE_ID, eConfObjAction));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdseveritymapping", 0, eCmdHdlrGetWord,
NULL, &cs.pszSeverityMapping, STD_LOADABLE_MODULE_ID, eConfObjAction));
diff --git a/runtime/datetime.c b/runtime/datetime.c
index f81180ea..89452f1c 100644
--- a/runtime/datetime.c
+++ b/runtime/datetime.c
@@ -55,6 +55,47 @@ static const int tenPowers[6] = { 1, 10, 100, 1000, 10000, 100000 };
/* ------------------------------ methods ------------------------------ */
+/**
+ * Convert struct timeval to syslog_time
+ */
+void
+timeval2syslogTime(struct timeval *tp, struct syslogTime *t)
+{
+ struct tm *tm;
+ struct tm tmBuf;
+ long lBias;
+
+ tm = localtime_r((time_t*) &(tp->tv_sec), &tmBuf);
+
+ t->year = tm->tm_year + 1900;
+ t->month = tm->tm_mon + 1;
+ t->day = tm->tm_mday;
+ t->hour = tm->tm_hour;
+ t->minute = tm->tm_min;
+ t->second = tm->tm_sec;
+ t->secfrac = tp->tv_usec;
+ t->secfracPrecision = 6;
+
+# if __sun
+ /* Solaris uses a different method of exporting the time zone.
+ * It is UTC - localtime, which is the opposite sign of mins east of GMT.
+ */
+ lBias = -(daylight ? altzone : timezone);
+# elif defined(__hpux)
+ lBias = tz.tz_dsttime ? - tz.tz_minuteswest : 0;
+# else
+ lBias = tm->tm_gmtoff;
+# endif
+ if(lBias < 0) {
+ t->OffsetMode = '-';
+ lBias *= -1;
+ } else
+ t->OffsetMode = '+';
+ t->OffsetHour = lBias / 3600;
+ t->OffsetMinute = (lBias % 3600) / 60;
+ t->timeType = TIME_TYPE_RFC5424; /* we have a high precision timestamp */
+}
+
/**
* Get the current date/time in the best resolution the operating
* system has to offer (well, actually at most down to the milli-
@@ -74,9 +115,6 @@ static const int tenPowers[6] = { 1, 10, 100, 1000, 10000, 100000 };
static void getCurrTime(struct syslogTime *t, time_t *ttSeconds)
{
struct timeval tp;
- struct tm *tm;
- struct tm tmBuf;
- long lBias;
# if defined(__hpux)
struct timezone tz;
# endif
@@ -93,37 +131,7 @@ static void getCurrTime(struct syslogTime *t, time_t *ttSeconds)
if(ttSeconds != NULL)
*ttSeconds = tp.tv_sec;
- tm = localtime_r((time_t*) &(tp.tv_sec), &tmBuf);
-
- t->year = tm->tm_year + 1900;
- t->month = tm->tm_mon + 1;
- t->day = tm->tm_mday;
- t->hour = tm->tm_hour;
- t->minute = tm->tm_min;
- t->second = tm->tm_sec;
- t->secfrac = tp.tv_usec;
- t->secfracPrecision = 6;
-
-# if __sun
- /* Solaris uses a different method of exporting the time zone.
- * It is UTC - localtime, which is the opposite sign of mins east of GMT.
- */
- lBias = -(daylight ? altzone : timezone);
-# elif defined(__hpux)
- lBias = tz.tz_dsttime ? - tz.tz_minuteswest : 0;
-# else
- lBias = tm->tm_gmtoff;
-# endif
- if(lBias < 0)
- {
- t->OffsetMode = '-';
- lBias *= -1;
- }
- else
- t->OffsetMode = '+';
- t->OffsetHour = lBias / 3600;
- t->OffsetMinute = (lBias % 3600) / 60;
- t->timeType = TIME_TYPE_RFC5424; /* we have a high precision timestamp */
+ timeval2syslogTime(&tp, t);
}
@@ -861,6 +869,7 @@ CODESTARTobjQueryInterface(datetime)
*/
pIf->getCurrTime = getCurrTime;
pIf->GetTime = getTime;
+ pIf->timeval2syslogTime = timeval2syslogTime;
pIf->ParseTIMESTAMP3339 = ParseTIMESTAMP3339;
pIf->ParseTIMESTAMP3164 = ParseTIMESTAMP3164;
pIf->formatTimestampToMySQL = formatTimestampToMySQL;
diff --git a/runtime/datetime.h b/runtime/datetime.h
index 70bbf416..2175d6a1 100644
--- a/runtime/datetime.h
+++ b/runtime/datetime.h
@@ -44,8 +44,10 @@ BEGINinterface(datetime) /* name must also be changed in ENDinterface macro! */
int (*formatTimestampSecFrac)(struct syslogTime *ts, char* pBuf);
/* v3, 2009-11-12 */
time_t (*GetTime)(time_t *ttSeconds);
+ /* v6, 2011-06-20 */
+ void (*timeval2syslogTime)(struct timeval *tp, struct syslogTime *t);
ENDinterface(datetime)
-#define datetimeCURR_IF_VERSION 5 /* increment whenever you change the interface structure! */
+#define datetimeCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */
/* interface changes:
* 1 - initial version
* 2 - not compatible to 1 - bugfix required ParseTIMESTAMP3164 to accept char ** as
@@ -54,6 +56,7 @@ ENDinterface(datetime)
* 3 - taken by v5 branch!
* 4 - formatTimestamp3164 takes a third int parameter
* 5 - merge of versions 3 + 4 (2010-03-09)
+ * 6 - see above
*/
/* prototypes */
diff --git a/runtime/queue.c b/runtime/queue.c
index 00eb76c7..c831836d 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -83,6 +83,11 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
+static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr);
+static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
+static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
+static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
+static rsRetVal qDestructDisk(qqueue_t *pThis);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -583,6 +588,47 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis)
/* -------------------- disk -------------------- */
+/* The following function is used to "save" ourself from being killed by
+ * a fatally failed disk queue. A fatal failure is, for example, if no
+ * data can be read or written. In that case, the disk support is disabled,
+ * with all on-disk structures kept as-is as much as possible. Instead, the
+ * queue is switched to direct mode, so that at least
+ * some processing can happen. Of course, this may still have lots of
+ * undesired side-effects, but is probably better than aborting the
+ * syslogd. Note that this function *must* succeed in one way or another, as
+ * we can not recover from failure here. But it may emit different return
+ * states, which can trigger different processing in the higher layers.
+ * rgerhards, 2011-05-03
+ */
+static inline rsRetVal
+queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError)
+{
+ pThis->iQueueSize = 0;
+ pThis->nLogDeq = 0;
+ qDestructDisk(pThis); /* free disk structures */
+
+ pThis->qType = QUEUETYPE_DIRECT;
+ pThis->qConstruct = qConstructDirect;
+ pThis->qDestruct = qDestructDirect;
+ pThis->qAdd = qAddDirect;
+ pThis->qDel = qDelDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ if(pThis->pqParent != NULL) {
+ DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n");
+ pThis->pqParent->bIsDA = 0;
+ pThis->pqParent->pqDA = NULL;
+ /* This may have undesired side effects, not sure if I really evaluated
+ * all. So you know where to look at if you come to this point during
+ * troubleshooting ;) -- rgerhards, 2011-05-03
+ */
+ }
+
+ errmsg.LogError(0, initiatingError, "fatal error on disk queue '%s', emergency switch to direct mode",
+ obj.GetName((obj_t*) pThis));
+ return RS_RET_ERR_QUEUE_EMERGENCY;
+}
+
+
static rsRetVal
qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pThis)
{
@@ -785,10 +831,7 @@ finalize_it:
static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr)
{
DEFiRet;
-
- CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL));
-
-finalize_it:
+ iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL);
RETiRet;
}
@@ -1684,7 +1727,18 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueForConsumer(pThis, pWti));
+ iRet = DequeueForConsumer(pThis, pWti);
+ if(iRet == RS_RET_FILE_NOT_FOUND) {
+ /* This is a fatal condition and means the queue is almost unusable */
+ d_pthread_mutex_unlock(pThis->mut);
+ DBGOPRINT((obj_t*) pThis, "got 'file not found' error %d, queue defunct\n", iRet);
+ iRet = queueSwitchToEmergencyMode(pThis, iRet);
+ // TODO: think about what to return as iRet -- keep RS_RET_FILE_NOT_FOUND?
+ d_pthread_mutex_lock(pThis->mut);
+ }
+ if (iRet != RS_RET_OK) {
+ FINALIZE;
+ }
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
d_pthread_mutex_unlock(pThis->mut);
@@ -1778,7 +1832,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
{
DEFiRet;
-//DBGPRINTF("XXXX: chkStopWrkrDA called, low watermark %d, phys Size %d\n", pThis->iLowWtrMrk, getPhysicalQueueSize(pThis));
if(pThis->bEnqOnly) {
iRet = RS_RET_TERMINATE_WHEN_IDLE;
}
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index b1a93ab4..d1290aeb 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -350,6 +350,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_FILE_NOT_SPECIFIED = -2180, /**< file name not configured where this was required */
RS_RET_ERR_WRKDIR = -2181, /**< problems with the rsyslog working directory */
RS_RET_WRN_WRKDIR = -2182, /**< correctable problems with the rsyslog working directory */
+ RS_RET_ERR_QUEUE_EMERGENCY = -2183, /**< some fatal error caused queue to switch to emergency mode */
RS_RET_INVLD_CONF_OBJ= -2200, /**< invalid config object (e.g. $Begin conf statement) */
RS_RET_ERR_LIBEE_INIT = -2201, /**< cannot obtain libee ctx */
diff --git a/runtime/strmsrv.c b/runtime/strmsrv.c
index f448cd4f..03691de6 100644
--- a/runtime/strmsrv.c
+++ b/runtime/strmsrv.c
@@ -767,7 +767,7 @@ static rsRetVal
SetKeepAlive(strmsrv_t *pThis, int iVal)
{
DEFiRet;
- dbgprintf("keep-alive set to %d\n", iVal);
+ dbgprintf("strmsrv: keep-alive set to %d\n", iVal);
pThis->bUseKeepAlive = iVal;
RETiRet;
}
diff --git a/runtime/wti.c b/runtime/wti.c
index 9343f5c5..ec56c2d5 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -314,7 +314,10 @@ wtiWorker(wti_t *pThis)
*/
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
- if(localRet == RS_RET_IDLE) {
+ if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) {
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
+ break; /* end of loop */
+ } else if(localRet == RS_RET_IDLE) {
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
d_pthread_mutex_unlock(pWtp->pmutUsr);
break; /* end of loop */
diff --git a/tcpsrv.c b/tcpsrv.c
index adb23e60..fd591e3c 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -415,6 +415,10 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess,
ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED);
}
+ if(pThis->bUseKeepAlive) {
+ CHKiRet(netstrm.EnableKeepAlive(pNewStrm));
+ }
+
/* we found a free spot and can construct our session object */
CHKiRet(tcps_sess.Construct(&pSess));
CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis));
@@ -1025,6 +1029,15 @@ SetUsrP(tcpsrv_t *pThis, void *pUsr)
}
static rsRetVal
+SetKeepAlive(tcpsrv_t *pThis, int iVal)
+{
+ DEFiRet;
+ dbgprintf("tcpsrv: keep-alive set to %d\n", iVal);
+ pThis->bUseKeepAlive = iVal;
+ RETiRet;
+}
+
+static rsRetVal
SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int))
{
DEFiRet;
@@ -1203,6 +1216,7 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->create_tcp_socket = create_tcp_socket;
pIf->Run = Run;
+ pIf->SetKeepAlive = SetKeepAlive;
pIf->SetUsrP = SetUsrP;
pIf->SetInputName = SetInputName;
pIf->SetAddtlFrameDelim = SetAddtlFrameDelim;
diff --git a/tcpsrv.h b/tcpsrv.h
index b5557d4e..e9eba458 100644
--- a/tcpsrv.h
+++ b/tcpsrv.h
@@ -48,6 +48,7 @@ struct tcpLstnPortList_s {
/* the tcpsrv object */
struct tcpsrv_s {
BEGINobjInstance; /**< Data to implement generic object - MUST be the first data element! */
+ int bUseKeepAlive; /**< use socket layer KEEPALIVE handling? */
netstrms_t *pNS; /**< pointer to network stream subsystem */
int iDrvrMode; /**< mode of the stream driver to use */
uchar *pszDrvrAuthMode; /**< auth mode of the stream driver to use */
@@ -136,8 +137,10 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetbDisableLFDelim)(tcpsrv_t*, int);
/* added v10 -- rgerhards, 2011-04-01 */
rsRetVal (*SetUseFlowControl)(tcpsrv_t*, int);
+ /* added v11 -- rgerhards, 2011-05-09 */
+ rsRetVal (*SetKeepAlive)(tcpsrv_t*, int);
ENDinterface(tcpsrv)
-#define tcpsrvCURR_IF_VERSION 10 /* increment whenever you change the interface structure! */
+#define tcpsrvCURR_IF_VERSION 11 /* increment whenever you change the interface structure! */
/* change for v4:
* - SetAddtlFrameDelim() added -- rgerhards, 2008-12-10
* - SetInputName() added -- rgerhards, 2008-12-10
diff --git a/tests/inputfilegen.c b/tests/inputfilegen.c
index 26fb79af..0ff8d049 100644
--- a/tests/inputfilegen.c
+++ b/tests/inputfilegen.c
@@ -1,5 +1,6 @@
/* generate an input file suitable for use by the testbench
* Copyright (C) 2011 by Rainer Gerhards and Adiscon GmbH.
+ * usage: ./inputfilegen num-lines > file
* Part of rsyslog, licensed under GPLv3
*/
#include <stdio.h>
diff --git a/tests/testsuites/imfile-basic.conf b/tests/testsuites/imfile-basic.conf
index 9fb9b5ca..59b109a6 100644
--- a/tests/testsuites/imfile-basic.conf
+++ b/tests/testsuites/imfile-basic.conf
@@ -6,6 +6,7 @@ $InputFileTag file:
$InputFileStateFile stat-file1
$InputFileSeverity error
$InputFileFacility local7
+$InputFileMaxLinesAtOnce 100000
$InputRunFileMonitor
$template outfmt,"%msg:F,58:2%\n"