summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog115
-rw-r--r--action.c66
-rw-r--r--action.h4
-rw-r--r--configure.ac4
-rw-r--r--doc/imfile.html11
-rw-r--r--doc/impstats.html4
-rw-r--r--doc/imptcp.html20
-rw-r--r--doc/imtcp.html12
-rw-r--r--doc/imuxsock.html45
-rw-r--r--doc/manual.html2
-rw-r--r--doc/mmsnmptrapd.html9
-rw-r--r--doc/rsconf1_omfileforcechown.html5
-rw-r--r--doc/rsyslog_conf_global.html1
-rw-r--r--doc/rsyslog_conf_templates.html4
-rw-r--r--plugins/imfile/imfile.c23
-rw-r--r--plugins/imklog/bsd.c6
-rw-r--r--plugins/imklog/imklog.c52
-rw-r--r--plugins/imklog/imklog.h4
-rw-r--r--plugins/imklog/linux.c113
-rw-r--r--plugins/imklog/solaris.c68
-rw-r--r--plugins/imptcp/imptcp.c105
-rw-r--r--plugins/imtcp/imtcp.c10
-rw-r--r--plugins/imudp/imudp.c173
-rw-r--r--plugins/imuxsock/imuxsock.c246
-rw-r--r--plugins/mmsnmptrapd/mmsnmptrapd.c2
-rw-r--r--plugins/omhdfs/javaenv.sh14
-rw-r--r--plugins/omhdfs/omhdfs.c85
-rw-r--r--runtime/conf.c48
-rw-r--r--runtime/datetime.c77
-rw-r--r--runtime/datetime.h5
-rw-r--r--runtime/module-template.h2
-rw-r--r--runtime/modules.c80
-rw-r--r--runtime/msg.c4
-rw-r--r--runtime/nsd_gtls.c8
-rw-r--r--runtime/parser.c9
-rw-r--r--runtime/queue.c76
-rw-r--r--runtime/queue.h2
-rw-r--r--runtime/rsyslog.h2
-rw-r--r--runtime/strmsrv.c2
-rw-r--r--runtime/wti.c5
-rw-r--r--tcps_sess.c4
-rw-r--r--tcpsrv.c43
-rw-r--r--tcpsrv.h11
-rw-r--r--tests/inputfilegen.c1
-rw-r--r--tests/tcpflood.c6
-rw-r--r--tests/testsuites/imfile-basic.conf1
-rw-r--r--tools/omfile.c50
-rw-r--r--tools/omfwd.c80
-rw-r--r--tools/ompipe.c18
-rw-r--r--tools/omusrmsg.c16
-rw-r--r--tools/syslogd.c10
51 files changed, 1315 insertions, 448 deletions
diff --git a/ChangeLog b/ChangeLog
index 45709779..4b49ecbf 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,118 @@
---------------------------------------------------------------------------
+Version 5.9.6 [V5-DEVEL], 2012-??-??
+- $IMUXSockRateLimitInterval DEFAULT CHANGED, was 5, now 0
+ The new default turns off rate limiting. This was chosen as people
+ experienced problems with rate-limiting activated by default. Now it
+ needs an explicit opt-in by setting this parameter.
+ Thanks to Chris Gaffney for suggesting to make it opt-in; thanks to
+ many unnamed others who already had complained at the time Chris made
+ the suggestion ;-)
+---------------------------------------------------------------------------
+Version 5.9.5 [V5-DEVEL], 2011-11-29
+- new stats counters for imudp and imtcp
+- new stats counters "discarded.nf" and "discarded.full" for queue object.
+ Tells how many messages have been discarded due to queue full condition.
+- enhanced module loader to not rely on PATH_MAX
+---------------------------------------------------------------------------
+Version 5.9.4 [V5-DEVEL], 2011-11-29
+- imuxsock: added capability to "annotate" messages with "trusted
+ information", which contains some properties obtained from the system
+ and as such sure to not be faked. This is inspired by the similiar idea
+ introduced in systemd.
+- removed dependency on gcrypt for recently-enough GnuTLS
+ see: http://bugzilla.adiscon.com/show_bug.cgi?id=289
+- bugfix: imuxsock did no longer ignore message-provided timestamp, if
+ so configured (the *default*). Lead to no longer sub-second timestamps.
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=281
+- bugfix: omfile returns fatal error code for things that go really wrong
+ previously, RS_RET_RESUME was returned, which lead to a loop inside the
+ rule engine as omfile could not really recover.
+- bugfix: rsyslogd -v always said 64 atomics were not present
+ thanks to mono_matsuko for the patch
+---------------------------------------------------------------------------
+Version 5.9.3 [V5-DEVEL], 2011-09-01
+- bugfix/security: off-by-two bug in legacy syslog parser, CVE-2011-3200
+- bugfix: mark message processing did not work correctly
+- added capability to emit config error location info for warnings
+ otherwise, omusrmsg's warning about new config format was not
+ accompanied by problem location.
+- bugfix: potential misadressing in property replacer
+- bugfix: MSGID corruption in RFC5424 parser under some circumstances
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=275
+- bugfix: The NUL-Byte for the syslogtag was not copied in MsgDup (msg.c)
+---------------------------------------------------------------------------
+Version 5.9.2 [V5-DEVEL] (rgerhards), 2011-07-11
+- systemd support: set stdout/stderr to null - thx to Lennart for the patch
+- added support for the ":omusrmsg:" syntax in configuring user messages
+- added support for the ":omfile:" syntax in configuring user messages
+---------------------------------------------------------------------------
+Version 5.9.1 [V5-DEVEL] (rgerhards), 2011-06-30
+- added support for obtaining timestamp for kernel message from message
+ If the kernel time-stamps messages, time is now take from that
+ timestamp instead of the system time when the message was read. This
+ provides much better accuracy. Thanks to Lennart Poettering for
+ suggesting this feature and his help during implementation.
+- added support for obtaining timestamp from system for imuxsock
+ This permits to read the time a message was submitted to the system
+ log socket. Most importantly, this is provided in microsecond resolution.
+ So we are able to obtain high precision timestampis even for messages
+ that were - as is usual - not formatted with them. This also simplifies
+ things in regard to local time calculation in chroot environments.
+ Many thanks to Lennart Poettering for suggesting this feature,
+ providing some guidance on implementing it and coordinating getting the
+ necessary support into the Linux kernel.
+- bugfix: timestamp was incorrectly calculated for timezones with minute
+ offset
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=271
+- bugfix: problems in failover action handling
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=270
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=254
+- bugfix: mutex was invalidly left unlocked during action processing
+ At least one case where this can occur is during thread shutdown, which
+ may be initiated by lower activity. In most cases, this is quite
+ unlikely to happen. However, if it does, data structures may be
+ corrupted which could lead to fatal failure and segfault. I detected
+ this via a testbench test, not a user report. But I assume that some
+ users may have had unreproducable aborts that were cause by this bug.
+- bugfix: memory leak in imtcp & subsystems under some circumstances
+ This leak is tied to error conditions which lead to incorrect cleanup
+ of some data structures. [backport from v6]
+- bugfix/improvement:$WorkDirectory now gracefully handles trailing slashes
+---------------------------------------------------------------------------
+Version 5.9.0 [V5-DEVEL] (rgerhards), 2011-06-08
+- imfile: added $InputFileMaxLinesAtOnce directive
+- enhanced imfile to support input batching
+- added capability for imtcp and imptcp to activate keep-alive packets
+ at the socket layer. This has not been added to imttcp, as the latter is
+ only an experimental module, and one which did not prove to be useful.
+ reference: http://kb.monitorware.com/post20791.html
+- added support to control KEEPALIVE settings in imptcp
+ this has not yet been added to imtcp, but could be done on request.
+- $ActionName is now also used for naming of queues in impstats
+ as well as in the debug output
+- bugfix: do not open files with full privileges, if privs will be dropped
+ This make the privilege drop code more bulletproof, but breaks Ubuntu's
+ work-around for log files created by external programs with the wrong
+ user and/or group. Note that it was long said that this "functionality"
+ would break once we go for serious privilege drop code, so hopefully
+ nobody still depends on it (and, if so, they lost...).
+- bugfix: pipes not opened in full priv mode when privs are to be dropped
+- this begins a new devel branch for v5
+- better handling of queue i/o errors in disk queues. This is kind of a
+ bugfix, but a very intrusive one, this it goes into the devel version
+ first. Right now, "file not found" is handled and leads to the new
+ emergency mode, in which disk action is stopped and the queue run
+ in direct mode. An error message is emited if this happens.
+- added support for user-level PRI provided via systemd
+- added new config directive $InputTCPFlowControl to select if tcp
+ received messages shall be flagged as light delayable or not.
+- enhanced omhdfs to support batching mode. This permits to increase
+ performance, as we now call the HDFS API with much larger message
+ sizes and far more infrequently
+- bugfix: failover did not work correctly if repeated msg reduction was on
+ affected directive was: $ActionExecOnlyWhenPreviousIsSuspended on
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=236
+---------------------------------------------------------------------------
Version 5.8.7 [V5-stable] 2012-01-17
- bugfix: instabilities when using RFC5424 header fields
Thanks to Kaiwang Chen for the patch
@@ -293,6 +407,7 @@ Version 5.7.0 [V5-DEVEL] (rgerhards), 2010-09-16
Version 5.6.5 [V5-STABLE] (rgerhards), 2011-03-22
- bugfix: failover did not work correctly if repeated msg reduction was on
affected directive was: $ActionExecOnlyWhenPreviousIsSuspended on
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=236
- bugfix: omlibdbi did not use password from rsyslog.con
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=203
- bugfix(kind of): tell users that config graph can currently not be
diff --git a/action.c b/action.c
index 6796cf14..34ca4fc0 100644
--- a/action.c
+++ b/action.c
@@ -112,6 +112,7 @@
#include "datetime.h"
#include "unicode-helper.h"
#include "atomic.h"
+#include "statsobj.h"
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
@@ -127,6 +128,7 @@ DEFobjCurrIf(obj)
DEFobjCurrIf(datetime)
DEFobjCurrIf(module)
DEFobjCurrIf(errmsg)
+DEFobjCurrIf(statsobj)
static int iActExecOnceInterval = 0; /* execute action once every nn seconds */
static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */
@@ -263,6 +265,12 @@ rsRetVal actionDestruct(action_t *pThis)
qqueueDestruct(&pThis->pQueue);
}
+ /* destroy stats object, if we have one (may not always be
+ * be the case, e.g. if turned off)
+ */
+ if(pThis->statsobj != NULL)
+ statsobj.Destruct(&pThis->statsobj);
+
if(pThis->pMod != NULL)
pThis->pMod->freeInstance(pThis->pModData);
@@ -313,13 +321,42 @@ rsRetVal
actionConstructFinalize(action_t *pThis)
{
DEFiRet;
- uchar pszQName[64]; /* friendly name of our queue */
+ uchar pszAName[64]; /* friendly name of our action */
ASSERT(pThis != NULL);
- /* find a name for our queue */
- snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr);
+ /* generate a friendly name for us action stats */
+ if(pThis->pszName == NULL) {
+ snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr);
+ } else {
+ ustrncpy(pszAName, pThis->pszName, sizeof(pszAName));
+ pszAName[63] = '\0'; /* to be on the save side */
+ }
+
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&pThis->statsobj));
+ CHKiRet(statsobj.SetName(pThis->statsobj, pszAName));
+ STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"),
+ ctrType_IntCtr, &pThis->ctrProcessed));
+
+ STATSCOUNTER_INIT(pThis->ctrFail, pThis->mutCtrFail);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("failed"),
+ ctrType_IntCtr, &pThis->ctrFail));
+
+ CHKiRet(statsobj.ConstructFinalize(pThis->statsobj));
+
+ /* create our queue */
+
+ /* generate a friendly name for the queue */
+ if(pThis->pszName == NULL) {
+ snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d queue",
+ iActionNbr);
+ } else {
+ ustrncpy(pszAName, pThis->pszName, sizeof(pszAName));
+ pszAName[63] = '\0'; /* to be on the save side */
+ }
/* now check if we can run the action in "firehose mode" during stage one of
* its processing (that is before messages are enqueued into the action q).
* This is only possible if some features, which require strict sequence, are
@@ -362,7 +399,7 @@ actionConstructFinalize(action_t *pThis)
*/
CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize,
(rsRetVal (*)(void*, batch_t*, int*))processBatchMain));
- obj.SetName((obj_t*) pThis->pQueue, pszQName);
+ obj.SetName((obj_t*) pThis->pQueue, pszAName);
/* ... set some properties ... */
# define setQPROP(func, directive, data) \
@@ -1072,6 +1109,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
&& pBatch->pElem[i].state != BATCH_STATE_COMM ) {
pBatch->pElem[i].state = BATCH_STATE_BAD;
pBatch->pElem[i].bPrevWasSuspended = 1;
+ STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail);
}
}
bDone = 1;
@@ -1261,6 +1299,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
+ STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
else
@@ -1537,6 +1576,17 @@ finalize_it:
RETiRet;
}
+static inline void
+countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
+{
+ int i;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ if(pBatch->pElem[i].bFilterOK) {
+ STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
+ }
+ }
+}
+
/* enqueue a batch in direct mode. We have put this into its own function just to avoid
* cluttering the actual submit function.
@@ -1573,13 +1623,16 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
pBatch->pElem[i].bFilterOK = 0;
bModifiedFilter = 1;
}
- if(pBatch->pElem[i].bFilterOK)
+ if(pBatch->pElem[i].bFilterOK) {
+ STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
bNeedSubmit = 1;
+ }
DBGPRINTF("action %p[%d]: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
pAction, i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
}
if(bNeedSubmit) {
+ /* note: stats were already computed above */
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
} else {
DBGPRINTF("no need to submit batch, all bFilterOK==0\n");
@@ -1594,6 +1647,8 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
}
}
} else {
+ if(GatherStats)
+ countStatsBatchEnq(pAction, pBatch);
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
}
@@ -1817,6 +1872,7 @@ rsRetVal actionClassInit(void)
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(module, CORE_COMPONENT));
CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL));
diff --git a/action.h b/action.h
index 0c08d3d6..add55a5e 100644
--- a/action.h
+++ b/action.h
@@ -89,6 +89,10 @@ struct action_s {
pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */
uchar *pszName; /* action name (for documentation) */
DEF_ATOMIC_HELPER_MUT(mutCAS);
+ /* for statistics subsystem */
+ statsobj_t *statsobj;
+ STATSCOUNTER_DEF(ctrProcessed, mutCtrProcessed);
+ STATSCOUNTER_DEF(ctrFail, mutCtrFail);
};
diff --git a/configure.ac b/configure.ac
index 099b78b9..011905a6 100644
--- a/configure.ac
+++ b/configure.ac
@@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ(2.61)
-AC_INIT([rsyslog],[5.8.7],[rsyslog@lists.adiscon.com])
+AC_INIT([rsyslog],[5.9.4],[rsyslog@lists.adiscon.com])
AM_INIT_AUTOMAKE
m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])])
@@ -118,6 +118,8 @@ AC_CHECK_FUNCS([flock basename alarm clock_gettime gethostbyname gethostname get
# let me know! -- rgerhards, 2010-10-06
AC_CHECK_DECL([SCM_CREDENTIALS], [AC_DEFINE(HAVE_SCM_CREDENTIALS, [1], [set define])], [], [#include <sys/types.h>
#include <sys/socket.h>])
+AC_CHECK_DECL([SO_TIMESTAMP], [AC_DEFINE(HAVE_SO_TIMESTAMP, [1], [set define])], [], [#include <sys/types.h>
+#include <sys/socket.h>])
# Check for MAXHOSTNAMELEN
AC_MSG_CHECKING(for MAXHOSTNAMELEN)
diff --git a/doc/imfile.html b/doc/imfile.html
index c44171df..7961729b 100644
--- a/doc/imfile.html
+++ b/doc/imfile.html
@@ -100,9 +100,16 @@ performance, especially when set to a low value. Frequently writing the state
file is very time consuming.
<li><b>$InputFileReadMode</b> [mode]</b><br>
Available in 5.7.5+
+<li><b>$InputFileMaxLinesAtOnce</b> [number]</b><br>
+Available in 5.9.0+
<br>
-Mode to be used when reading lines. 0 (the default) means that each line is forwarded
-as its own log message.
+This is useful if multiple files need to be monitored. If set to 0, each file
+will be fully processed and then processing switches to the next file
+(this was the default in previous versions). If it is set, a maximum of
+[number] lines is processed in sequence for each file, and then the file is
+switched. This provides a kind of mutiplexing the load of multiple files and
+probably leads to a more natural distribution of events when multiple busy files
+are monitored. The default is 10240.
<li>$InputFileBindRuleset &lt;ruleset&gt;<br>
Available in 5.7.5+, 6.1.5+
Binds the listener to a specific <a href="multi_ruleset.html">ruleset</a>.</li>
diff --git a/doc/impstats.html b/doc/impstats.html
index cede4874..260c1aa4 100644
--- a/doc/impstats.html
+++ b/doc/impstats.html
@@ -18,7 +18,9 @@ prepared to change your trending scripts when you upgrade to a newer rsyslog ver
output is periodic, with the interval being configurable (default is 5 minutes).
Be sure that your configuration records the counter messages (default is syslog.info).
<p>Note that loading this module has impact on rsyslog performance. Depending on
-settings, this impact may be severe (for high-load environments).
+settings, this impact may be noticable (for high-load environments).
+<p>The rsyslog website has an updated overview of available
+<a href="http://rsyslog.com/rsyslog-statistic-counter/">rsyslog statistic counters</a>.
</p>
<p><b>Configuration Directives</b>:</p>
<ul>
diff --git a/doc/imptcp.html b/doc/imptcp.html
index d4228185..386e691a 100644
--- a/doc/imptcp.html
+++ b/doc/imptcp.html
@@ -45,7 +45,25 @@ can be found at the <a href="http://www.rsyslog.com/Article321.phtml">Cisco tcp
page.
<li>$InputPTCPServerNotifyOnConnectionClose [on/<b>off</b>]<br>
instructs imptcp to emit a message if the remote peer closes a connection.<br>
-<li>$InputPTCPServerRun &lt;port&gt;<br>
+<li><b>$InputPTCPServerKeepAlive</b> &lt;on/<b>off</b>&gt;<br>
+enable of disable keep-alive packets at the tcp socket layer. The default is
+to disable them.</li>
+<li><b>$InputPTCPServerKeepAlive_probes</b> &lt;number&gt;<br>
+The number of unacknowledged probes to send before considering the connection dead and notifying the application layer.
+The default, 0, means that the operating system defaults are used. This has only
+effect if keep-alive is enabled. The functionality may not be available on
+all platforms.
+<li><b>$InputPTCPServerKeepAlive_intvl</b> &lt;number&gt;<br>
+The interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime.
+The default, 0, means that the operating system defaults are used. This has only
+effect if keep-alive is enabled. The functionality may not be available on
+all platforms.
+<li><b>$InputPTCPServerKeepAlive_time</b> &lt;number&gt;<br>
+The interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further.
+The default, 0, means that the operating system defaults are used. This has only
+effect if keep-alive is enabled. The functionality may not be available on
+all platforms.
+<li><b>$InputPTCPServerRun</b> &lt;port&gt;<br>
Starts a TCP server on selected port</li>
<li>$InputPTCPServerInputName &lt;name&gt;<br>
Sets a name for the inputname property. If no name is set "imptcp" is used by default. Setting a
diff --git a/doc/imtcp.html b/doc/imtcp.html
index 422bbd55..7653f601 100644
--- a/doc/imtcp.html
+++ b/doc/imtcp.html
@@ -55,8 +55,20 @@ so be prepared to wrangle with that!
instructs imtcp to emit a message if the remote peer closes a connection.<br>
<b>Important:</b> This directive is global to all listeners and must be given right
after loading imtcp, otherwise it may have no effect.</li>
+<li><b>$InputTCPServerKeepAlive</b> &lt;on/<b>off</b>&gt;<br>
+enable of disable keep-alive packets at the tcp socket layer. The default is
+to disable them.</li>
<li><b>$InputTCPServerRun</b> &lt;port&gt;<br>
Starts a TCP server on selected port</li>
+<li><b>$InputTCPFlowControl</b> &lt;<b>on</b>/off&gt;<br>
+This setting specifies whether some message flow control shall be exercised on the
+related TCP input. If set to on, messages are handled as "light delayable", which means
+the sender is throttled a bit when the queue becomes near-full. This is done in order
+to preserve some queue space for inputs that can not throttle (like UDP), but it
+may have some undesired effect in some configurations. Still, we consider this as
+a useful setting and thus it is the default. To turn the handling off, simply
+configure that explicitely.
+</li>
<li><b>$InputTCPMaxListeners</b> &lt;number&gt;<br>
Sets the maximum number of listeners (server ports) supported. Default is 20. This must be set before the first $InputTCPServerRun directive.</li>
<li><b>$InputTCPMaxSessions</b> &lt;number&gt;<br> Sets the maximum number of sessions supported. Default is 200. This must be set before the first $InputTCPServerRun directive</li>
diff --git a/doc/imuxsock.html b/doc/imuxsock.html
index ee5db22d..734ae889 100644
--- a/doc/imuxsock.html
+++ b/doc/imuxsock.html
@@ -49,6 +49,15 @@ are places as quickly as possible into the processing queues. If you would like
flow control, you need to enable it via the $SystemLogSocketFlowControl and
$InputUnixListenSocketFlowControl config directives. Just make sure you thought about
the implications. Note that for many systems, turning on flow control does not hurt.
+<p>Starting with rsyslog 5.9.4,
+<b><a href="http://www.rsyslog.com/what-are-trusted-properties/">trusted syslog properties</a>
+are available</b>. These require a recent enough Linux Kernel and access to the /proc file
+system. In other words, this may not work on all platforms and may not work fully when
+privileges are dropped (depending on how they are dropped). Note that trusted properties
+can be very useful, but also typically cause the message to grow rather large. Also, the
+format of log messages is obviously changed by adding the trusted properties at the end.
+For these reasons, the feature is <b>not enabled by default</b>. If you want to use it,
+you must turn it on (via $SystemLogSocketAnnotate and $InputUnixListenSocketAnnotate).
<p><b>Configuration Directives</b>:</p>
<ul>
<li><b>$InputUnixListenSocketIgnoreMsgTimestamp</b> [<b>on</b>/off]
@@ -56,7 +65,10 @@ the implications. Note that for many systems, turning on flow control does not h
<li><b>$InputUnixListenSocketFlowControl</b> [on/<b>off</b>] - specifies if flow control should be applied
to the next socket.</li>
<li><b>$IMUXSockRateLimitInterval</b> [number] - specifies the rate-limiting
-interval in seconds. Default value is 5 seconds. Set it to 0 to turn rate limiting off.
+interval in seconds. Default value is 0, which turns off rate limiting. Set it to a number
+of seconds (5 recommended) to activate rate-limiting. The default of 0 has been choosen in 5.9.6+,
+as people experienced problems with this feature activated by default. Now it needs an
+explicit opt-in by setting this parameter.
</li>
<li><b>$IMUXSockRateLimitBurst</b> [number] - specifies the rate-limiting
burst in number of messages. Default is 200.
@@ -68,9 +80,18 @@ messages that shall be rate-limited.
be obtained from the log socket itself. If so, the TAG part of the message is rewritten.
It is recommended to turn this option on, but the default is "off" to keep compatible
with earlier versions of rsyslog. This option was introduced in 5.7.0.</li>
+<li><b>$InputUnixListenSocketUseSysTimeStamp</b> [<b>on</b>/off] instructs imuxsock
+to obtain message time from the system (via control messages) insted of using time
+recorded inside the message. This may be most useful in combination with systemd. Note:
+this option was introduced with version 5.9.1. Due to the usefulness of it, we
+decided to enable it by default. As such, 5.9.1 and above behave slightly different
+than previous versions. However, we do not see how this could negatively affect
+existing environments.<br>
<li><b>$SystemLogSocketIgnoreMsgTimestamp</b> [<b>on</b>/off]<br>
Ignore timestamps included in the messages, applies to messages received via the system log socket.</li>
-<li><b>$OmitLocalLogging</b> (imuxsock) [on/<b>off</b>] -- former -o option</li>
+<li><b>$OmitLocalLogging</b> (imuxsock) [on/<b>off</b>] -- former -o option;
+do NOT listen for the local log socket. This is most useful if you run multiple
+instances of rsyslogd where only one shall handle the system log socket.</li>
<li><b>$SystemLogSocketName</b> &lt;name-of-socket&gt; -- former -p option</li>
<li><b>$SystemLogFlowControl</b> [on/<b>off</b>] - specifies if flow control should be applied
to the system log socket.</li>
@@ -87,6 +108,7 @@ burst in number of messages. Default is 200.
<li><b>$SystemLogRateLimitSeverity</b> [numerical severity] - specifies the severity of
messages that shall be rate-limited.
</li>
+<li><b>$SystemLogUseSysTimeStamp</b> [<b>on</b>/off] the same as $InputUnixListenSocketUseSysTimeStamp, but for the system log socket.
<li><b>$InputUnixListenSocketCreatePath</b> [on/<b>off</b>] - create directories in the socket path
if they do not already exist. They are created with 0755 permissions with the owner being the process under
which rsyslogd runs. The default is not to create directories. Keep in mind, though, that rsyslogd always
@@ -105,7 +127,12 @@ shall be used inside messages taken from the <b>next</b> $AddUnixListenSocket so
the hostname must be specified before the $AddUnixListenSocket configuration directive, and it
will only affect the next one and then automatically be reset. This functionality is provided so
that the local hostname can be overridden in cases where that is desired.</li>
+<li><b>$InputUnixListenSocketAnnotate</b> &lt;on/<b>off</b>&gt; turn on annotation/trusted
+properties for the non-system log socket in question.</li>
+<li><b>$SystemLogSocketAnnotate</b> &lt;on/<b>off</b>&gt; turn on annotation/trusted
+properties for the system log socket.</li>
</ul>
+
<b>Caveats/Known Bugs:</b><br>
<ul>
<li>There is a compile-time limit of 50 concurrent sockets. If you need more, you need to
@@ -141,17 +168,21 @@ $InputUnixListenSocketHostName /var/run/sshd/dev/log
</textarea>
<p>The following sample is used to turn off input rate limiting on the system log
socket.
-<textarea rows="6" cols="70">$ModLoad imuxsock # needs to be done just once
+<textarea rows="4" cols="70">$ModLoad imuxsock # needs to be done just once
$SystemLogRateLimitInterval 0 # turn off rate limiting
</textarea>
+<p>The following sample is used activate message annotation and thus trusted properties
+on the system log socket.
+<textarea rows="4" cols="70">$ModLoad imuxsock # needs to be done just once
+
+$SystemLogSocketAnnotate on
+</textarea>
<p>[<a href="rsyslog_conf.html">rsyslog.conf overview</a>]
[<a href="manual.html">manual index</a>] [<a href="http://www.rsyslog.com/">rsyslog site</a>]</p>
<p><font size="2">This documentation is part of the
-<a href="http://www.rsyslog.com/">rsyslog</a>
-project.<br>
-Copyright &copy; 2008-2010 by <a href="http://www.gerhards.net/rainer">Rainer
-Gerhards</a> and
+<a href="http://www.rsyslog.com/">rsyslog</a> project.<br>
+Copyright &copy; 2008-2011 by <a href="http://www.gerhards.net/rainer">Rainer Gerhards</a> and
<a href="http://www.adiscon.com/">Adiscon</a>.
Released under the GNU GPL version 3 or higher.</font></p>
</body></html>
diff --git a/doc/manual.html b/doc/manual.html
index 010ccab6..fdeb2980 100644
--- a/doc/manual.html
+++ b/doc/manual.html
@@ -19,7 +19,7 @@ rsyslog support</a> available directly from the source!</p>
<p><b>Please visit the <a href="http://www.rsyslog.com/sponsors">rsyslog sponsor's page</a>
to honor the project sponsors or become one yourself!</b> We are very grateful for any help towards the
project goals.</p>
-<p><b>This documentation is for version 5.8.7 (v5-stable branch) of rsyslog.</b>
+<p><b>This documentation is for version 5.9.4 (stable branch) of rsyslog.</b>
Visit the <i><a href="http://www.rsyslog.com/status">rsyslog status page</a></i></b>
to obtain current version information and project status.
</p><p><b>If you like rsyslog, you might
diff --git a/doc/mmsnmptrapd.html b/doc/mmsnmptrapd.html
index e69bc241..699049d3 100644
--- a/doc/mmsnmptrapd.html
+++ b/doc/mmsnmptrapd.html
@@ -51,8 +51,11 @@ to control output modules are also available to mmsnmptrapd.
<ul>
<li><b>$mmsnmptrapdTag</b> [tagname]<br>
tells the module which start string inside the tag to look for. The default is
-"snmptrap/"
-<li><b>$mmsnmptrapdSevertiyMapping</b> [severtiymap]<br>
+"snmptrapd". Note that a slash is automatically added to this tag when it comes to
+matching incoming messages. It MUST not be given, except if two slashes are required
+for whatever reasons (so "tag/" results in a check for "tag//" at the start of
+the tag field).
+<li><b>$mmsnmptrapdSeverityMapping</b> [severtiymap]<br>
This specifies the severity mapping table. It needs to be specified as a list. Note that
due to the current config system <b>no whitespace</b> is supported inside the list, so be
sure not to use any whitespace inside it.<br>
@@ -76,7 +79,7 @@ severities. The default tag is used.<br>
# ... other module loads and listener setup ...
*.* /path/to/file/with/orignalMessage # this file receives *un*modified messages
$mmsnmptrapdSeverityMapping warning/4,error/3
-*.* ::mmsnmptrapd: # *now* message is modified
+*.* :mmsnmptrapd: # *now* message is modified
*.* /path/to/file/with/modifiedMessage # this file receives modified messages
# ... rest of config ...
</textarea>
diff --git a/doc/rsconf1_omfileforcechown.html b/doc/rsconf1_omfileforcechown.html
index 7415a6f6..a680810b 100644
--- a/doc/rsconf1_omfileforcechown.html
+++ b/doc/rsconf1_omfileforcechown.html
@@ -8,7 +8,10 @@
<h2>$omfileForceChown</h2>
<p><b>Type:</b> global configuration directive</p>
<p><b>Parameter Values:</b> boolean (on/off, yes/no)</p>
-<p><b>Available since:</b> 4.7.0+, 5.3.0+</p>
+<p><b>Available:</b> 4.7.0+, 5.3.0-5.8.x, <b>NOT</b> available in 5.9.x or higher</p>
+<p><b>Note: this directive has been removed and is no longer available. The
+documentation is currently being retained for historical reaons.</b> Expect
+it to go away at some later stage as well.
<p><b>Default:</b> off</p>
<p><b>Description:</b></p>
<p>Forces rsyslogd to change the ownership for output files that already exist. Please note
diff --git a/doc/rsyslog_conf_global.html b/doc/rsyslog_conf_global.html
index 21786a7f..b254f366 100644
--- a/doc/rsyslog_conf_global.html
+++ b/doc/rsyslog_conf_global.html
@@ -143,6 +143,7 @@ our paper on <a href="multi_ruleset.html">using multiple rule sets in rsyslog</a
<li><a href="rsconf1_escape8bitcharsonreceive.html">$Escape8BitCharactersOnReceive</a></li>
<li><a href="rsconf1_escapecontrolcharactersonreceive.html">$EscapeControlCharactersOnReceive</a></li>
<li><b>$EscapeControlCharactersOnReceive</b> [<b>on</b>|off] - escape USASCII HT character</li>
+<li>$SpaceLFOnReceive [on/<b>off</b>] - instructs rsyslogd to replace LF with spaces during message reception (sysklogd compatibility aid)</li>
<li>$ErrorMessagesToStderr [<b>on</b>|off] - direct rsyslogd error message to stderr (in addition to other targets)</li>
<li><a href="rsconf1_failonchownfailure.html">$FailOnChownFailure</a></li>
<li><a href="rsconf1_filecreatemode.html">$FileCreateMode</a></li>
diff --git a/doc/rsyslog_conf_templates.html b/doc/rsyslog_conf_templates.html
index 23a02049..bd0b3253 100644
--- a/doc/rsyslog_conf_templates.html
+++ b/doc/rsyslog_conf_templates.html
@@ -146,6 +146,10 @@ with high-precision timestamps and timezone information</li>
useful if you send&nbsp;messages to other syslogd's or rsyslogd
below
version 3.12.5.</li>
+<li><span style="font-weight: bold;">RSYSLOG_SysklogdFileFormat</span>
+- sysklogd compatible log file format. If used with options: $SpaceLFOnReceive on;
+$EscapeControlCharactersOnReceive off; $DropTrailingLFOnReception off,
+the log format will conform to sysklogd log format.</li>
<li><span style="font-weight: bold;">RSYSLOG_ForwardFormat</span>
- a new high-precision forwarding format very similar to the
traditional one, but with high-precision timestamps and timezone
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index ba8318df..632bf390 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -63,6 +63,7 @@ DEFobjCurrIf(strm)
DEFobjCurrIf(prop)
DEFobjCurrIf(ruleset)
+#define NUM_MULTISUB 1024 /* max number of submits -- TODO: make configurable */
typedef struct fileInfo_s {
uchar *pszFileName;
uchar *pszTag;
@@ -70,11 +71,13 @@ typedef struct fileInfo_s {
uchar *pszStateFile; /* file in which state between runs is to be stored */
int iFacility;
int iSeverity;
+ int maxLinesAtOnce;
int nRecords; /**< How many records did we process before persisting the stream? */
int iPersistStateInterval; /**< how often should state be persisted? (0=on close only) */
strm_t *pStrm; /* its stream (NULL if not assigned) */
int readMode; /* which mode to use in ReadMulteLine call? */
ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ multi_submit_t multiSub;
} fileInfo_t;
@@ -90,6 +93,7 @@ static int iPersistStateInterval = 0; /* how often if state file to be persisted
static int iFacility = 128; /* local0 */
static int iSeverity = 5; /* notice, as of rfc 3164 */
static int readMode = 0; /* mode to use for ReadMultiLine call */
+static int maxLinesAtOnce = 10240; /* how many lines to process in a row? */
static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
static int iFilPtr = 0; /* number of files to be monitored; pointer to next free spot during config */
@@ -121,7 +125,9 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine)
pMsg->iFacility = LOG_FAC(pInfo->iFacility);
pMsg->iSeverity = LOG_PRI(pInfo->iSeverity);
MsgSetRuleset(pMsg, pInfo->pRuleset);
- CHKiRet(submitMsg(pMsg));
+ pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg;
+ if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem)
+ CHKiRet(multiSubmitMsg(&pInfo->multiSub));
finalize_it:
RETiRet;
}
@@ -205,6 +211,7 @@ static void pollFileCancelCleanup(void *pArg)
static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
{
cstr_t *pCStr = NULL;
+ int nProcessed = 0;
DEFiRet;
ASSERT(pbHadFileData != NULL);
@@ -219,7 +226,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
/* loop below will be exited when strmReadLine() returns EOF */
while(glbl.GetGlobalInputTermState() == 0) {
+ if(pThis->maxLinesAtOnce != 0 && nProcessed >= pThis->maxLinesAtOnce)
+ break;
CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr, pThis->readMode));
+ ++nProcessed;
*pbHadFileData = 1; /* this is just a flag, so set it and forget it */
CHKiRet(enqLine(pThis, pCStr)); /* process line */
rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */
@@ -230,6 +240,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
}
finalize_it:
+ if(pThis->multiSub.nElem > 0) {
+ /* submit everything that was not yet submitted */
+ CHKiRet(multiSubmitMsg(&pThis->multiSub));
+ }
; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */
/* Note: the problem above is that pthread:cleanup_pop() is a macro which
* evaluates to something like "} while(0);". So the code would become
@@ -474,6 +488,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iSeverity = 5; /* notice, as of rfc 3164 */
readMode = 0;
pBindRuleset = NULL;
+ maxLinesAtOnce = 10240;
RETiRet;
}
@@ -512,8 +527,12 @@ static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal)
pThis->pszStateFile = (uchar*) strdup((char*) pszStateFile);
}
+ CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(NUM_MULTISUB * sizeof(msg_t*)));
+ pThis->multiSub.maxElem = NUM_MULTISUB;
+ pThis->multiSub.nElem = 0;
pThis->iSeverity = iSeverity;
pThis->iFacility = iFacility;
+ pThis->maxLinesAtOnce = maxLinesAtOnce;
pThis->iPersistStateInterval = iPersistStateInterval;
pThis->nRecords = 0;
pThis->readMode = readMode;
@@ -591,6 +610,8 @@ CODEmodInit_QueryRegCFSLineHdlr
NULL, &iPollInterval, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilereadmode", 0, eCmdHdlrInt,
NULL, &readMode, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilemaxlinesatonce", 0, eCmdHdlrSize,
+ NULL, &maxLinesAtOnce, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepersiststateinterval", 0, eCmdHdlrInt,
NULL, &iPersistStateInterval, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilebindruleset", 0, eCmdHdlrGetWord,
diff --git a/plugins/imklog/bsd.c b/plugins/imklog/bsd.c
index 0a4c7cd4..930bbd11 100644
--- a/plugins/imklog/bsd.c
+++ b/plugins/imklog/bsd.c
@@ -155,18 +155,18 @@ readklog(void)
for (p = (char*)pRcv; (q = strchr(p, '\n')) != NULL; p = q + 1) {
*q = '\0';
- Syslog(LOG_INFO, (uchar*) p);
+ Syslog(LOG_INFO, (uchar*) p, NULL);
}
len = strlen(p);
if (len >= iMaxLine - 1) {
- Syslog(LOG_INFO, (uchar*)p);
+ Syslog(LOG_INFO, (uchar*)p, NULL);
len = 0;
}
if (len > 0)
memmove(pRcv, p, len + 1);
}
if (len > 0)
- Syslog(LOG_INFO, pRcv);
+ Syslog(LOG_INFO, pRcv, NULL);
if(pRcv != NULL && (size_t) iMaxLine >= sizeof(bufRcv) - 1)
free(pRcv);
diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c
index 69c8cd1a..cb28e68e 100644
--- a/plugins/imklog/imklog.c
+++ b/plugins/imklog/imklog.c
@@ -18,7 +18,10 @@
* Please note that this file replaces the klogd daemon that was
* also present in pre-v3 versions of rsyslog.
*
- * Copyright (C) 2008, 2009 by Rainer Gerhards and Adiscon GmbH
+ * To test under Linux:
+ * echo test1 > /dev/kmsg
+ *
+ * Copyright (C) 2008-2011 by Rainer Gerhards and Adiscon GmbH
*
* This file is part of rsyslog.
*
@@ -93,15 +96,21 @@ static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1
* rgerhards, 2008-04-12
*/
static rsRetVal
-enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity)
+enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval *tp)
{
- DEFiRet;
+ struct syslogTime st;
msg_t *pMsg;
+ DEFiRet;
assert(msg != NULL);
assert(pszTag != NULL);
- CHKiRet(msgConstruct(&pMsg));
+ if(tp == NULL) {
+ CHKiRet(msgConstruct(&pMsg));
+ } else {
+ datetime.timeval2syslogTime(tp, &st);
+ CHKiRet(msgConstructWithTime(&pMsg, &st, tp->tv_sec));
+ }
MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY);
MsgSetInputName(pMsg, pInputName);
MsgSetRawMsgWOSize(pMsg, (char*)msg);
@@ -174,31 +183,48 @@ rsRetVal imklogLogIntMsg(int priority, char *fmt, ...)
va_end(ap);
iRet = enqMsg((uchar*)pLogMsg, (uchar*) ((iFacilIntMsg == LOG_KERN) ? "kernel:" : "imklog:"),
- iFacilIntMsg, LOG_PRI(priority));
+ iFacilIntMsg, LOG_PRI(priority), NULL);
RETiRet;
}
-/* log a kernel message
+/* log a kernel message. If tp is non-NULL, it contains the message creation
+ * time to use.
* rgerhards, 2008-04-14
*/
-rsRetVal Syslog(int priority, uchar *pMsg)
+rsRetVal Syslog(int priority, uchar *pMsg, struct timeval *tp)
{
- DEFiRet;
+ int pri = -1;
rsRetVal localRet;
+ DEFiRet;
- /* Output using syslog */
- localRet = parsePRI(&pMsg, &priority);
- if(localRet != RS_RET_INVALID_PRI && localRet != RS_RET_OK)
- FINALIZE;
+ /* then check if we have two PRIs. This can happen in case of systemd,
+ * in which case the second PRI is the rigth one.
+ * TODO: added kernel timestamp support to this PoC. -- rgerhards, 2011-03-18
+ */
+ if(pMsg[3] == '<') { /* could be a pri... */
+ uchar *pMsgTmp = pMsg + 3;
+ localRet = parsePRI(&pMsgTmp, &pri);
+ if(localRet == RS_RET_OK && pri >= 8 && pri <= 192) {
+ /* *this* is our PRI */
+ DBGPRINTF("imklog detected secondary PRI in klog msg\n");
+ pMsg = pMsgTmp;
+ priority = pri;
+ }
+ }
+ if(pri == -1) {
+ localRet = parsePRI(&pMsg, &priority);
+ if(localRet != RS_RET_INVALID_PRI && localRet != RS_RET_OK)
+ FINALIZE;
+ }
/* if we don't get the pri, we use whatever we were supplied */
/* ignore non-kernel messages if not permitted */
if(bPermitNonKernel == 0 && LOG_FAC(priority) != LOG_KERN)
FINALIZE; /* silently ignore */
- iRet = enqMsg((uchar*)pMsg, (uchar*) "kernel:", LOG_FAC(priority), LOG_PRI(priority));
+ iRet = enqMsg((uchar*)pMsg, (uchar*) "kernel:", LOG_FAC(priority), LOG_PRI(priority), tp);
finalize_it:
RETiRet;
diff --git a/plugins/imklog/imklog.h b/plugins/imklog/imklog.h
index c183026d..39bdeb5c 100644
--- a/plugins/imklog/imklog.h
+++ b/plugins/imklog/imklog.h
@@ -5,7 +5,7 @@
* Major change: 2008-04-09: switched to a driver interface for
* several platforms
*
- * Copyright 2007-2008 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -56,7 +56,7 @@ extern uchar *pszPath;
/* the functions below may be called by the drivers */
rsRetVal imklogLogIntMsg(int priority, char *fmt, ...) __attribute__((format(printf,2, 3)));
-rsRetVal Syslog(int priority, uchar *msg);
+rsRetVal Syslog(int priority, uchar *msg, struct timeval *tp);
/* prototypes */
extern int klog_getMaxLine(void); /* work-around for klog drivers to get configured max line size */
diff --git a/plugins/imklog/linux.c b/plugins/imklog/linux.c
index 727708a5..b44619f5 100644
--- a/plugins/imklog/linux.c
+++ b/plugins/imklog/linux.c
@@ -28,6 +28,8 @@
#include "rsyslog.h"
#include <stdlib.h>
#include <stdio.h>
+#include <ctype.h>
+#include <time.h>
#include <assert.h>
#include <signal.h>
#include <string.h>
@@ -181,6 +183,93 @@ static int copyin( uchar *line, int space,
return(i);
}
+
+/* submit a message to imklog Syslog() API. In this function, we check if
+ * a kernel timestamp is present and, if so, extract and strip it.
+ * Note: this is an extra processing step. We should revisit the whole
+ * idea in v6 and remove all that old stuff that we do not longer need
+ * (like symbol resolution). <-- TODO
+ * Special thanks to Lennart Poettering for suggesting on how to convert
+ * the kernel timestamp to a realtime timestamp. This method depends on
+ * the fact the the kernel timestamp is written using the monotonic clock.
+ * Shall that change (very unlikely), this code must be changed as well. Note
+ * that due to the way we generate the delta, we are unable to write the
+ * absolutely correc timestamp (system call overhead of the clock calls
+ * prevents us from doing so). However, the difference is very minor.
+ * rgerhards, 201106-24
+ */
+static void
+submitSyslog(int pri, uchar *buf)
+{
+ long secs;
+ long nsecs;
+ long secOffs;
+ long nsecOffs;
+ unsigned i;
+ unsigned bufsize;
+ struct timespec monotonic, realtime;
+ struct timeval tv;
+ struct timeval *tp = NULL;
+
+ if(buf[3] != '[')
+ goto done;
+ DBGPRINTF("imklog: kernel timestamp detected, extracting it\n");
+
+ /* we now try to parse the timestamp. iff it parses, we assume
+ * it is a timestamp. Otherwise we know for sure it is no ts ;)
+ */
+ i = 4; /* first digit after '[' */
+ secs = 0;
+ while(buf[i] && isdigit(buf[i])) {
+ secs = secs * 10 + buf[i] - '0';
+ ++i;
+ }
+ if(buf[i] != '.') {
+ DBGPRINTF("no dot --> no kernel timestamp\n");
+ goto done; /* no TS! */
+ }
+
+ ++i; /* skip dot */
+ nsecs = 0;
+ while(buf[i] && isdigit(buf[i])) {
+ nsecs = nsecs * 10 + buf[i] - '0';
+ ++i;
+ }
+ if(buf[i] != ']') {
+ DBGPRINTF("no trailing ']' --> no kernel timestamp\n");
+ goto done; /* no TS! */
+ }
+ ++i; /* skip ']' */
+
+ /* we have a timestamp */
+ DBGPRINTF("kernel timestamp is %ld %ld\n", secs, nsecs);
+ bufsize= strlen((char*)buf);
+ memcpy(buf+3, buf+i, bufsize - i + 1);
+
+ clock_gettime(CLOCK_MONOTONIC, &monotonic);
+ clock_gettime(CLOCK_REALTIME, &realtime);
+ secOffs = realtime.tv_sec - monotonic.tv_sec;
+ nsecOffs = realtime.tv_nsec - monotonic.tv_nsec;
+ if(nsecOffs < 0) {
+ secOffs--;
+ nsecOffs += 1000000000l;
+ }
+
+ nsecs +=nsecOffs;
+ if(nsecs > 999999999l) {
+ secs++;
+ nsecs -= 1000000000l;
+ }
+ secs += secOffs;
+ tv.tv_sec = secs;
+ tv.tv_usec = nsecs / 1000;
+ tp = &tv;
+
+done:
+ Syslog(pri, buf, tp);
+}
+
+
/*
* Messages are separated by "\n". Messages longer than
* LOG_LINE_LENGTH are broken up.
@@ -235,7 +324,7 @@ static void LogLine(char *ptr, int len)
//dbgprintf("Line buffer full:\n");
//dbgprintf("\tLine: %s\n", line);
- Syslog(LOG_INFO, line_buff);
+ submitSyslog(LOG_INFO, line_buff);
line = line_buff;
space = sizeof(line_buff)-1;
parse_state = PARSING_TEXT;
@@ -254,28 +343,24 @@ static void LogLine(char *ptr, int len)
space -= delta;
len -= delta;
- if( space == 0 || len == 0 )
- {
+ if( space == 0 || len == 0 ) {
break; /* full line_buff or end of input buffer */
}
- if( *ptr == '\0' ) /* zero byte */
- {
+ if( *ptr == '\0' ) /* zero byte */ {
ptr++; /* skip zero byte */
space -= 1;
len -= 1;
-
break;
}
- if( *ptr == '\n' ) /* newline */
- {
+ if( *ptr == '\n' ) /* newline */ {
ptr++; /* skip newline */
space -= 1;
len -= 1;
*line = 0; /* force null terminator */
- Syslog(LOG_INFO, line_buff);
+ submitSyslog(LOG_INFO, line_buff);
line = line_buff;
space = sizeof(line_buff)-1;
if (symbols_twice) {
@@ -285,9 +370,7 @@ static void LogLine(char *ptr, int len)
skip_symbol_lookup = 1;
ptr = save_ptr;
len = save_len;
- }
- else
- {
+ } else {
skip_symbol_lookup = 0;
save_ptr = ptr;
save_len = len;
@@ -295,8 +378,7 @@ static void LogLine(char *ptr, int len)
}
break;
}
- if( *ptr == '[' ) /* possible kernel symbol */
- {
+ if( *ptr == '[' ) /* possible kernel symbol */ {
*line++ = *ptr++;
space -= 1;
len -= 1;
@@ -310,8 +392,7 @@ static void LogLine(char *ptr, int len)
break;
case PARSING_SYMSTART:
- if( *ptr != '<' )
- {
+ if( *ptr != '<' ) {
parse_state = PARSING_TEXT; /* not a symbol */
break;
}
diff --git a/plugins/imklog/solaris.c b/plugins/imklog/solaris.c
index 8a6d5af1..0a169cdd 100644
--- a/plugins/imklog/solaris.c
+++ b/plugins/imklog/solaris.c
@@ -80,74 +80,6 @@ klogWillRun(void)
}
-#if 0
-/* Read /dev/klog while data are available, split into lines.
- * Contrary to standard BSD syslogd, we do a blocking read. We can
- * afford this as imklog is running on its own threads. So if we have
- * a single file, it really doesn't matter if we wait inside a 1-file
- * select or the read() directly.
- */
-static void
-readklog(void)
-{
- char *p, *q;
- int len, i;
- int iMaxLine;
- uchar bufRcv[4096+1];
- uchar *pRcv = NULL; /* receive buffer */
-
- iMaxLine = klog_getMaxLine();
-
- /* we optimize performance: if iMaxLine is below 4K (which it is in almost all
- * cases, we use a fixed buffer on the stack. Only if it is higher, heap memory
- * is used. We could use alloca() to achive a similar aspect, but there are so
- * many issues with alloca() that I do not want to take that route.
- * rgerhards, 2008-09-02
- */
- if((size_t) iMaxLine < sizeof(bufRcv) - 1) {
- pRcv = bufRcv;
- } else {
- if((pRcv = (uchar*) malloc(sizeof(uchar) * (iMaxLine + 1))) == NULL)
- iMaxLine = sizeof(bufRcv) - 1; /* better this than noting */
- }
-
- len = 0;
- for (;;) {
- dbgprintf("----------imklog(BSD) waiting for kernel log line\n");
- i = read(fklog, pRcv + len, iMaxLine - len);
- if (i > 0) {
- pRcv[i + len] = '\0';
- } else {
- if (i < 0 && errno != EINTR && errno != EAGAIN) {
- imklogLogIntMsg(LOG_ERR,
- "imklog error %d reading kernel log - shutting down imklog",
- errno);
- fklog = -1;
- }
- break;
- }
-
- for(p = pRcv; (q = strchr(p, '\n')) != NULL; p = q + 1) {
- *q = '\0';
- Syslog(LOG_INFO, (uchar*) p);
- }
- len = strlen(p);
- if (len >= iMaxLine - 1) {
- Syslog(LOG_INFO, (uchar*)p);
- len = 0;
- }
- if (len > 0)
- memmove(pRcv, p, len + 1);
- }
- if (len > 0)
- Syslog(LOG_INFO, pRcv);
-
- if(pRcv != NULL && (size_t) iMaxLine >= sizeof(bufRcv) - 1)
- free(pRcv);
-}
-#endif
-
-
/* to be called in the module's AfterRun entry point
* rgerhards, 2008-04-09
*/
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 65fe703c..d5855879 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -49,6 +49,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
+#include <netinet/tcp.h>
#if HAVE_FCNTL_H
#include <fcntl.h>
#endif
@@ -87,6 +88,10 @@ DEFobjCurrIf(ruleset)
/* config settings */
typedef struct configSettings_s {
+ int bKeepAlive; /* support keep-alive packets */
+ int iKeepAliveIntvl;
+ int iKeepAliveProbes;
+ int iKeepAliveTime;
int bEmitMsgOnClose; /* emit an informational message on close by remote peer */
int iAddtlFrameDelim; /* addtl frame delimiter, e.g. for netscreen, default none */
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
@@ -110,13 +115,17 @@ struct ptcpsrv_s {
ptcpsrv_t *pNext; /* linked list maintenance */
uchar *port; /* Port to listen to */
uchar *lstnIP; /* which IP we should listen on? */
- int bEmitMsgOnClose;
int iAddtlFrameDelim;
+ int iKeepAliveIntvl;
+ int iKeepAliveProbes;
+ int iKeepAliveTime;
uchar *pszInputName;
prop_t *pInputName; /* InputName in (fast to process) property format */
ruleset_t *pRuleset;
ptcplstn_t *pLstn; /* root of our listeners */
ptcpsess_t *pSess; /* root of our sessions */
+ sbool bKeepAlive; /* support keep-alive packets */
+ sbool bEmitMsgOnClose;
};
/* the ptcp session object. Describes a single active session.
@@ -428,12 +437,80 @@ finalize_it:
}
+/* Enable KEEPALIVE handling on the socket. */
+static inline rsRetVal
+EnableKeepAlive(ptcplstn_t *pLstn, int sock)
+{
+ int ret;
+ int optval;
+ socklen_t optlen;
+ DEFiRet;
+
+ optval = 1;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen);
+ if(ret < 0) {
+ dbgprintf("EnableKeepAlive socket call returns error %d\n", ret);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+# if defined(TCP_KEEPCNT)
+ if(pLstn->pSrv->iKeepAliveProbes > 0) {
+ optval = pLstn->pSrv->iKeepAliveProbes;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen);
+ } else {
+ ret = 0;
+ }
+# else
+ ret = -1;
+# endif
+ if(ret < 0) {
+ errmsg.LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive probes - ignored");
+ }
+
+# if defined(TCP_KEEPCNT)
+ if(pLstn->pSrv->iKeepAliveTime > 0) {
+ optval = pLstn->pSrv->iKeepAliveTime;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen);
+ } else {
+ ret = 0;
+ }
+# else
+ ret = -1;
+# endif
+ if(ret < 0) {
+ errmsg.LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive time - ignored");
+ }
+
+# if defined(TCP_KEEPCNT)
+ if(pLstn->pSrv->iKeepAliveIntvl > 0) {
+ optval = pLstn->pSrv->iKeepAliveIntvl;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen);
+ } else {
+ ret = 0;
+ }
+# else
+ ret = -1;
+# endif
+ if(ret < 0) {
+ errmsg.LogError(errno, NO_ERRCODE, "imptcp cannot set keepalive intvl - ignored");
+ }
+
+ dbgprintf("KEEPALIVE enabled for socket %d\n", sock);
+
+finalize_it:
+ RETiRet;
+}
+
/* accept an incoming connection request
* rgerhards, 2008-04-22
*/
static rsRetVal
-AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP)
+AcceptConnReq(ptcplstn_t *pLstn, int *newSock, prop_t **peerName, prop_t **peerIP)
{
int sockflags;
struct sockaddr_storage addr;
@@ -442,13 +519,17 @@ AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP)
DEFiRet;
- iNewSock = accept(sock, (struct sockaddr*) &addr, &addrlen);
+ iNewSock = accept(pLstn->sock, (struct sockaddr*) &addr, &addrlen);
if(iNewSock < 0) {
if(errno == EAGAIN || errno == EWOULDBLOCK)
ABORT_FINALIZE(RS_RET_NO_MORE_DATA);
ABORT_FINALIZE(RS_RET_ACCEPT_ERR);
}
+ if(pLstn->pSrv->bKeepAlive)
+ EnableKeepAlive(pLstn, iNewSock);/* we ignore errors, best to do! */
+
+
CHKiRet(getPeerNames(peerName, peerIP, (struct sockaddr*) &addr));
/* set the new socket to non-blocking IO */
@@ -882,6 +963,10 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
CHKmalloc(pSrv = malloc(sizeof(ptcpsrv_t)));
pSrv->pSess = NULL;
pSrv->pLstn = NULL;
+ pSrv->bKeepAlive = cs.bKeepAlive;
+ pSrv->iKeepAliveIntvl = cs.iKeepAliveTime;
+ pSrv->iKeepAliveProbes = cs.iKeepAliveProbes;
+ pSrv->iKeepAliveTime = cs.iKeepAliveTime;
pSrv->bEmitMsgOnClose = cs.bEmitMsgOnClose;
pSrv->port = pNewVal;
pSrv->iAddtlFrameDelim = cs.iAddtlFrameDelim;
@@ -945,7 +1030,7 @@ lstnActivity(ptcplstn_t *pLstn)
DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock);
while(1) {
- localRet = AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP);
+ localRet = AcceptConnReq(pLstn, &newSock, &peerName, &peerIP);
if(localRet == RS_RET_NO_MORE_DATA)
break;
CHKiRet(localRet);
@@ -1144,6 +1229,10 @@ static rsRetVal
resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
cs.bEmitMsgOnClose = 0;
+ cs.bKeepAlive = 0;
+ cs.iKeepAliveProbes = 0;
+ cs.iKeepAliveTime = 0;
+ cs.iKeepAliveIntvl = 0;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
free(cs.pszInputName);
cs.pszInputName = NULL;
@@ -1176,6 +1265,14 @@ CODEmodInit_QueryRegCFSLineHdlr
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord,
addTCPListener, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive"), 0, eCmdHdlrBinary,
+ NULL, &cs.bKeepAlive, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_probes"), 0, eCmdHdlrInt,
+ NULL, &cs.iKeepAliveProbes, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_time"), 0, eCmdHdlrInt,
+ NULL, &cs.iKeepAliveTime, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_intvl"), 0, eCmdHdlrInt,
+ NULL, &cs.iKeepAliveIntvl, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpservernotifyonconnectionclose"), 0,
eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt,
diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c
index 6ab39477..e9961af7 100644
--- a/plugins/imtcp/imtcp.c
+++ b/plugins/imtcp/imtcp.c
@@ -82,12 +82,14 @@ static permittedPeers_t *pPermPeersRoot = NULL;
/* config settings */
+static int bKeepAlive = 0; /* support keep-alive packets */
static int iTCPSessMax = 200; /* max number of sessions */
static int iTCPLstnMax = 20; /* max number of sessions */
static int iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */
static int bEmitMsgOnClose = 0; /* emit an informational message on close by remote peer */
static int iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; /* addtl frame delimiter, e.g. for netscreen, default none */
static int bDisableLFDelim = 0; /* disbale standard LF delimiter */
+static int bUseFlowControl = 1; /* use flow control, what means indicate ourselfs a "light delayable" */
static uchar *pszStrmDrvrAuthMode = NULL; /* authentication mode to use */
static uchar *pszInputName = NULL; /* value for inputname property, NULL is OK and handled by core engine */
static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
@@ -191,6 +193,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
if(pOurTcpsrv == NULL) {
CHKiRet(tcpsrv.Construct(&pOurTcpsrv));
+ CHKiRet(tcpsrv.SetKeepAlive(pOurTcpsrv, bKeepAlive));
CHKiRet(tcpsrv.SetSessMax(pOurTcpsrv, iTCPSessMax));
CHKiRet(tcpsrv.SetLstnMax(pOurTcpsrv, iTCPLstnMax));
CHKiRet(tcpsrv.SetCBIsPermittedHost(pOurTcpsrv, isPermittedHost));
@@ -199,6 +202,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
CHKiRet(tcpsrv.SetCBOnRegularClose(pOurTcpsrv, onRegularClose));
CHKiRet(tcpsrv.SetCBOnErrClose(pOurTcpsrv, onErrClose));
CHKiRet(tcpsrv.SetDrvrMode(pOurTcpsrv, iStrmDrvrMode));
+ CHKiRet(tcpsrv.SetUseFlowControl(pOurTcpsrv, bUseFlowControl));
CHKiRet(tcpsrv.SetAddtlFrameDelim(pOurTcpsrv, iAddtlFrameDelim));
CHKiRet(tcpsrv.SetbDisableLFDelim(pOurTcpsrv, bDisableLFDelim));
CHKiRet(tcpsrv.SetNotificationOnRemoteClose(pOurTcpsrv, bEmitMsgOnClose));
@@ -287,8 +291,10 @@ static rsRetVal
resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
iTCPSessMax = 200;
+ bKeepAlive = 0;
iTCPLstnMax = 20;
iStrmDrvrMode = 0;
+ bUseFlowControl = 0;
bEmitMsgOnClose = 0;
iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
bDisableLFDelim = 0;
@@ -324,6 +330,8 @@ CODEmodInit_QueryRegCFSLineHdlr
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverrun"), 0, eCmdHdlrGetWord,
addTCPListener, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverkeepalive"), 0, eCmdHdlrBinary,
+ NULL, &bKeepAlive, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxsessions"), 0, eCmdHdlrInt,
NULL, &iTCPSessMax, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxlisteners"), 0, eCmdHdlrInt,
@@ -344,6 +352,8 @@ CODEmodInit_QueryRegCFSLineHdlr
eCmdHdlrGetWord, NULL, &pszInputName, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverbindruleset"), 0,
eCmdHdlrGetWord, setRuleset, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpflowcontrol"), 0,
+ eCmdHdlrBinary, NULL, &bUseFlowControl, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler,
resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
ENDmodInit
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index a5002591..0db6bf9a 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -6,7 +6,7 @@
*
* File begun on 2007-12-21 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -26,6 +26,7 @@
* A copy of the GPL can be found in the file "COPYING" in this distribution.
*/
#include "config.h"
+#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
@@ -51,6 +52,7 @@
#include "datetime.h"
#include "prop.h"
#include "ruleset.h"
+#include "statsobj.h"
#include "unicode-helper.h"
MODULE_TYPE_INPUT
@@ -66,6 +68,16 @@ DEFobjCurrIf(net)
DEFobjCurrIf(datetime)
DEFobjCurrIf(prop)
DEFobjCurrIf(ruleset)
+DEFobjCurrIf(statsobj)
+
+
+static struct lstn_s {
+ struct lstn_s *next;
+ int sock; /* socket */
+ ruleset_t *pRuleset; /* bound ruleset */
+ statsobj_t *stats; /* listener stats */
+ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
+} *lcnfRoot = NULL, *lcnfLast = NULL;
static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */
static int iMaxLine; /* maximum UDP message size supported */
@@ -73,9 +85,7 @@ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitte
* This shall prevent remote DoS when the "discard on disallowed sender"
* message is configured to be logged on occurance of such a case.
*/
-static int *udpLstnSocks = NULL; /* Internet datagram sockets, first element is nbr of elements
- * read-only after init(), but beware of restart! */
-static ruleset_t **udpRulesets = NULL; /* ruleset to be used with sockets in question (entry 0 is empty) */
+
static uchar *pszBindAddr = NULL; /* IP to bind socket to */
static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc
* it so that we can check available memory in willRun() and request
@@ -190,9 +200,11 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
DEFiRet;
uchar *bindAddr;
int *newSocks;
- int *tmpSocks;
- int iSrc, iDst;
- ruleset_t **tmpRulesets;
+ int iSrc;
+ struct lstn_s *newlcnfinfo;
+ uchar *bindName;
+ uchar *port;
+ uchar statname[64];
/* check which address to bind to. We could do this more compact, but have not
* done so in order to make the code more readable. -- rgerhards, 2007-12-27
@@ -203,55 +215,43 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
bindAddr = NULL;
else
bindAddr = pszBindAddr;
+ bindName = (bindAddr == NULL) ? (uchar*)"*" : bindAddr;
- DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n",
- (bindAddr == NULL) ? (uchar*)"*" : bindAddr, pNewVal);
+ DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", bindName, pNewVal);
- newSocks = net.create_udp_socket(bindAddr, (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal, 1);
+ port = (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal;
+ newSocks = net.create_udp_socket(bindAddr, port, 1);
if(newSocks != NULL) {
/* we now need to add the new sockets to the existing set */
- if(udpLstnSocks == NULL) {
- /* esay, we can just replace it */
- udpLstnSocks = newSocks;
- CHKmalloc(udpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (newSocks[0] + 1)));
- for(iDst = 1 ; iDst <= newSocks[0] ; ++iDst)
- udpRulesets[iDst] = pBindRuleset;
- } else {
- /* we need to add them */
- tmpSocks = (int*) MALLOC(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0]));
- tmpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (1 + newSocks[0] + udpLstnSocks[0]));
- if(tmpSocks == NULL || tmpRulesets == NULL) {
- DBGPRINTF("out of memory trying to allocate udp listen socket array\n");
- /* in this case, we discard the new sockets but continue with what we
- * already have
- */
- free(newSocks);
- free(tmpSocks);
- free(tmpRulesets);
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- } else {
- /* ready to copy */
- iDst = 1;
- for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc, ++iDst) {
- tmpSocks[iDst] = udpLstnSocks[iSrc];
- tmpRulesets[iDst] = udpRulesets[iSrc];
- }
- for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc, ++iDst) {
- tmpSocks[iDst] = newSocks[iSrc];
- tmpRulesets[iDst] = pBindRuleset;
- }
- tmpSocks[0] = udpLstnSocks[0] + newSocks[0];
- free(newSocks);
- free(udpLstnSocks);
- udpLstnSocks = tmpSocks;
- free(udpRulesets);
- udpRulesets = tmpRulesets;
- }
+ /* ready to copy */
+ for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) {
+ CHKmalloc(newlcnfinfo = (struct lstn_s*) MALLOC(sizeof(struct lstn_s)));
+ newlcnfinfo->next = NULL;
+ newlcnfinfo->sock = newSocks[iSrc];
+ newlcnfinfo->pRuleset = pBindRuleset;
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&(newlcnfinfo->stats)));
+ snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port);
+ statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname));
+ CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, &(newlcnfinfo->ctrSubmit)));
+ CHKiRet(statsobj.ConstructFinalize(newlcnfinfo->stats));
+ /* link to list. Order must be preserved to take care for
+ * conflicting matches.
+ */
+ if(lcnfRoot == NULL)
+ lcnfRoot = newlcnfinfo;
+ if(lcnfLast == NULL)
+ lcnfLast = newlcnfinfo;
+ else
+ lcnfLast->next = newlcnfinfo;
}
}
finalize_it:
free(pNewVal); /* in any case, this is no longer needed */
+ free(newSocks);
RETiRet;
}
@@ -294,8 +294,7 @@ finalize_it:
* on scheduling order. -- rgerhards, 2008-10-02
*/
static inline rsRetVal
-processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
- ruleset_t *pRuleset)
+processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted)
{
DEFiRet;
int iNbrTimeUsed;
@@ -315,7 +314,7 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev,
if(pThrd->bShallStop == TRUE)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
socklen = sizeof(struct sockaddr_storage);
- lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen);
+ lenRcvBuf = recvfrom(lstn->sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen);
if(lenRcvBuf < 0) {
if(errno != EINTR && errno != EAGAIN) {
rs_strerror_r(errno, errStr, sizeof(errStr));
@@ -360,7 +359,7 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev,
*pbIsPermitted = 1; /* no check -> everything permitted */
}
- DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", fd, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf);
+ DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf);
if(*pbIsPermitted != 0) {
if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) {
@@ -370,13 +369,14 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev,
CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime));
MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf);
MsgSetInputName(pMsg, pInputName);
- MsgSetRuleset(pMsg, pRuleset);
+ MsgSetRuleset(pMsg, lstn->pRuleset);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL;
if(*pbIsPermitted == 2)
pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */
CHKiRet(msgSetFromSockinfo(pMsg, &frominet));
CHKiRet(submitMsg(pMsg));
+ STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit);
}
}
@@ -443,6 +443,8 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
struct epoll_event *udpEPollEvt = NULL;
struct epoll_event currEvt[NUM_EPOLL_EVENTS];
char errStr[1024];
+ struct lstn_s *lstn;
+ int nLstn;
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
@@ -451,7 +453,11 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
bIsPermitted = 0;
memset(&frominetPrev, 0, sizeof(frominetPrev));
- CHKmalloc(udpEPollEvt = calloc(udpLstnSocks[0], sizeof(struct epoll_event)));
+ /* count num listeners -- do it here in order to avoid inconsistency */
+ nLstn = 0;
+ for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next)
+ ++nLstn;
+ CHKmalloc(udpEPollEvt = calloc(nLstn, sizeof(struct epoll_event)));
#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
DBGPRINTF("imudp uses epoll_create1()\n");
@@ -471,16 +477,18 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
/* fill the epoll set - we need to do this only once, as the set
* can not change dyamically.
*/
- for (i = 0; i < *udpLstnSocks; i++) {
- if (udpLstnSocks[i+1] != -1) {
+ i = 0;
+ for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) {
+ if(lstn->sock != -1) {
udpEPollEvt[i].events = EPOLLIN | EPOLLET;
- udpEPollEvt[i].data.u64 = i+1;
- if(epoll_ctl(efd, EPOLL_CTL_ADD, udpLstnSocks[i+1], &(udpEPollEvt[i])) < 0) {
+ udpEPollEvt[i].data.u64 = (long long unsigned) lstn;
+ if(epoll_ctl(efd, EPOLL_CTL_ADD, lstn->sock, &(udpEPollEvt[i])) < 0) {
rs_strerror_r(errno, errStr, sizeof(errStr));
errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n",
- udpLstnSocks[i+1], errStr);
+ lstn->sock, errStr);
}
}
+ i++;
}
while(1) {
@@ -492,8 +500,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
break; /* terminate input! */
for(i = 0 ; i < nfds ; ++i) {
- processSocket(pThrd, udpLstnSocks[currEvt[i].data.u64], &frominetPrev, &bIsPermitted,
- udpRulesets[currEvt[i].data.u64]);
+ processSocket(pThrd, (struct lstn_s*)currEvt[i].data.u64, &frominetPrev, &bIsPermitted);
}
}
@@ -510,10 +517,10 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
DEFiRet;
int maxfds;
int nfds;
- int i;
fd_set readfds;
struct sockaddr_storage frominetPrev;
int bIsPermitted;
+ struct lstn_s *lstn;
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
@@ -524,22 +531,18 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
DBGPRINTF("imudp uses select()\n");
while(1) {
- /* Add the Unix Domain Sockets to the list of read
- * descriptors.
- * rgerhards 2005-08-01: we must now check if there are
- * any local sockets to listen to at all. If the -o option
- * is given without -a, we do not need to listen at all..
+ /* Add the Unix Domain Sockets to the list of read descriptors.
*/
maxfds = 0;
FD_ZERO(&readfds);
/* Add the UDP listen sockets to the list of read descriptors. */
- for (i = 0; i < *udpLstnSocks; i++) {
- if (udpLstnSocks[i+1] != -1) {
+ for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) {
+ if (lstn->sock != -1) {
if(Debug)
- net.debugListenInfo(udpLstnSocks[i+1], "UDP");
- FD_SET(udpLstnSocks[i+1], &readfds);
- if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1];
+ net.debugListenInfo(lstn->sock, "UDP");
+ FD_SET(lstn->sock, &readfds);
+ if(lstn->sock>maxfds) maxfds=lstn->sock;
}
}
if(Debug) {
@@ -555,10 +558,9 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
- for(i = 0; nfds && i < *udpLstnSocks; i++) {
- if(FD_ISSET(udpLstnSocks[i+1], &readfds)) {
- processSocket(pThrd, udpLstnSocks[i+1], &frominetPrev, &bIsPermitted,
- udpRulesets[i+1]);
+ for(lstn = lcnfRoot ; nfds && lstn != NULL ; lstn = lstn->next) {
+ if(FD_ISSET(lstn->sock, &readfds)) {
+ processSocket(pThrd, lstn, &frominetPrev, &bIsPermitted);
--nfds; /* indicate we have processed one descriptor */
}
}
@@ -570,7 +572,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
#endif /* #if HAVE_EPOLL_CREATE1 */
/* This function is called to gather input.
- * Note that udpLstnSocks must be non-NULL because otherwise we would not have
+ * Note that sock must be non-NULL because otherwise we would not have
* indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02
*/
BEGINrunInput
@@ -591,8 +593,10 @@ CODESTARTwillRun
net.HasRestrictions(UCHAR_CONSTANT("UDP"), &bDoACLCheck); /* UDP */
/* if we could not set up any listners, there is no point in running... */
- if(udpLstnSocks == NULL)
+ if(lcnfRoot == NULL) {
+ DBGPRINTF("imudp: no listeners configured, will not run\n");
ABORT_FINALIZE(RS_RET_NO_RUN);
+ }
iMaxLine = glbl.GetMaxLine();
@@ -602,15 +606,18 @@ ENDwillRun
BEGINafterRun
+ struct lstn_s *lstn, *lstnDel;
CODESTARTafterRun
/* do cleanup here */
net.clearAllowedSenders((uchar*)"UDP");
- if(udpLstnSocks != NULL) {
- net.closeUDPListenSockets(udpLstnSocks);
- udpLstnSocks = NULL;
- free(udpRulesets);
- udpRulesets = NULL;
+ for(lstn = lcnfRoot ; lstn != NULL ; ) {
+ statsobj.Destruct(&(lstn->stats));
+ close(lstn->sock);
+ lstnDel = lstn;
+ lstn = lstn->next;
+ free(lstnDel);
}
+ lcnfRoot = lcnfLast = NULL;
if(pRcvBuf != NULL) {
free(pRcvBuf);
pRcvBuf = NULL;
@@ -625,6 +632,7 @@ CODESTARTmodExit
/* release what we no longer need */
objRelease(errmsg, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
objRelease(datetime, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
objRelease(ruleset, CORE_COMPONENT);
@@ -662,6 +670,7 @@ CODESTARTmodInit
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index feddb20c..403173e1 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -34,6 +34,7 @@
#include <string.h>
#include <errno.h>
#include <unistd.h>
+#include <fcntl.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <sys/socket.h>
@@ -135,7 +136,9 @@ typedef struct lstn_s {
sbool bParseHost; /* should parser parse host name? read-only after startup */
sbool bCreatePath; /* auto-creation of socket directory? */
sbool bUseCreds; /* pull original creator credentials from socket */
+ sbool bAnnotate; /* annotate events with trusted properties */
sbool bWritePid; /* write original PID into tag */
+ sbool bUseSysTimeStamp; /* use timestamp from system (instead of from message) */
} lstn_t;
static lstn_t listeners[MAXFUNIX];
@@ -156,9 +159,13 @@ static int bUseFlowCtl = 0; /* use flow control or not (if yes, only LIGHT is u
static int bIgnoreTimestamp = 1; /* ignore timestamps present in the incoming message? */
static int bWritePid = 0; /* use credentials from recvmsg() and fixup PID in TAG */
static int bWritePidSysSock = 0; /* use credentials from recvmsg() and fixup PID in TAG */
+static int bUseSysTimeStamp = 1; /* use timestamp from system (rather than from message) */
+static int bUseSysTimeStampSysSock = 1; /* same, for system log socket */
+static int bAnnotate = 0; /* annotate trusted properties */
+static int bAnnotateSysSock = 0; /* same, for system log socket */
#define DFLT_bCreatePath 0
static int bCreatePath = DFLT_bCreatePath; /* auto-create socket path? */
-#define DFLT_ratelimitInterval 5
+#define DFLT_ratelimitInterval 0
static int ratelimitInterval = DFLT_ratelimitInterval; /* interval in seconds, 0 = off */
static int ratelimitIntervalSysSock = DFLT_ratelimitInterval;
#define DFLT_ratelimitBurst 200
@@ -300,8 +307,10 @@ addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal)
listeners[nfd].flags = bIgnoreTimestamp ? IGNDATE : NOFLAG;
listeners[nfd].bCreatePath = bCreatePath;
listeners[nfd].sockName = pNewVal;
- listeners[nfd].bUseCreds = (bWritePid || ratelimitInterval) ? 1 : 0;
+ listeners[nfd].bUseCreds = (bWritePid || ratelimitInterval || bAnnotate) ? 1 : 0;
+ listeners[nfd].bAnnotate = bAnnotate;
listeners[nfd].bWritePid = bWritePid;
+ listeners[nfd].bUseSysTimeStamp = bUseSysTimeStamp;
nfd++;
} else {
errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n",
@@ -415,9 +424,14 @@ openLogSocket(lstn_t *pLstn)
errmsg.LogError(errno, NO_ERRCODE, "set SCM_CREDENTIALS failed on '%s'", pLstn->sockName);
pLstn->bUseCreds = 0;
}
+// TODO: move to its own #if
+ if(setsockopt(pLstn->fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) != 0) {
+ errmsg.LogError(errno, NO_ERRCODE, "set SO_TIMESTAMP failed on '%s'", pLstn->sockName);
+ }
}
# else /* HAVE_SCM_CREDENTIALS */
pLstn->bUseCreds = 0;
+ pLstn->bAnnotate = 0;
# endif /* HAVE_SCM_CREDENTIALS */
finalize_it:
@@ -500,12 +514,109 @@ fixPID(uchar *bufTAG, int *lenTag, struct ucred *cred)
}
+/* Get an "trusted property" from the system. Returns an empty string if the
+ * property can not be obtained. Inspired by similiar functionality inside
+ * journald. Currently works with Linux /proc filesystem, only.
+ */
+static rsRetVal
+getTrustedProp(struct ucred *cred, char *propName, uchar *buf, size_t lenBuf, int *lenProp)
+{
+ int fd;
+ int i;
+ int lenRead;
+ char namebuf[1024];
+ DEFiRet;
+
+ if(snprintf(namebuf, sizeof(namebuf), "/proc/%lu/%s", (long unsigned) cred->pid,
+ propName) >= (int) sizeof(namebuf)) {
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ if((fd = open(namebuf, O_RDONLY)) == -1) {
+ DBGPRINTF("error reading '%s'\n", namebuf);
+ *lenProp = 0;
+ FINALIZE;
+ }
+ if((lenRead = read(fd, buf, lenBuf - 1)) == -1) {
+ DBGPRINTF("error reading file data for '%s'\n", namebuf);
+ *lenProp = 0;
+ close(fd);
+ FINALIZE;
+ }
+
+ /* we strip after the first \n */
+ for(i = 0 ; i < lenRead ; ++i) {
+ if(buf[i] == '\n')
+ break;
+ else if(iscntrl(buf[i]))
+ buf[i] = ' ';
+ }
+ buf[i] = '\0';
+ *lenProp = i;
+
+ close(fd);
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* read the exe trusted property path (so far, /proc fs only)
+ */
+static rsRetVal
+getTrustedExe(struct ucred *cred, uchar *buf, size_t lenBuf, int* lenProp)
+{
+ int lenRead;
+ char namebuf[1024];
+ DEFiRet;
+
+ if(snprintf(namebuf, sizeof(namebuf), "/proc/%lu/exe", (long unsigned) cred->pid)
+ >= (int) sizeof(namebuf)) {
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ if((lenRead = readlink(namebuf, (char*)buf, lenBuf - 1)) == -1) {
+ DBGPRINTF("error reading link '%s'\n", namebuf);
+ *lenProp = 0;
+ FINALIZE;
+ }
+
+ buf[lenRead] = '\0';
+ *lenProp = lenRead;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* copy a trusted property in escaped mode. That is, the property can contain
+ * any character and so it must be properly quoted AND escaped.
+ * It is assumed the output buffer is large enough. Returns the number of
+ * characters added.
+ */
+static inline int
+copyescaped(uchar *dstbuf, uchar *inbuf, int inlen)
+{
+ int iDst, iSrc;
+
+ *dstbuf = '"';
+ for(iDst=1, iSrc=0 ; iSrc < inlen ; ++iDst, ++iSrc) {
+ if(inbuf[iSrc] == '"' || inbuf[iSrc] == '\\') {
+ dstbuf[iDst++] = '\\';
+ }
+ dstbuf[iDst] = inbuf[iSrc];
+ }
+ dstbuf[iDst++] = '"';
+ return iDst;
+}
+
+
/* submit received message to the queue engine
* We now parse the message according to expected format so that we
* can also mangle it if necessary.
*/
static inline rsRetVal
-SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
+SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct timeval *ts)
{
msg_t *pMsg;
int lenMsg;
@@ -519,6 +630,12 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
struct syslogTime st;
time_t tt;
rs_ratelimit_state_t *ratelimiter = NULL;
+ int lenProp;
+ uchar propBuf[1024];
+ uchar msgbuf[8192];
+ uchar *pmsgbuf;
+ int toffs; /* offset for trusted properties */
+ struct syslogTime dummyTS;
DEFiRet;
/* TODO: handle format errors?? */
@@ -544,12 +661,58 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
findRatelimiter(pLstn, cred, &ratelimiter); /* ignore error, better so than others... */
}
- datetime.getCurrTime(&st, &tt);
+ if(ts == NULL) {
+ datetime.getCurrTime(&st, &tt);
+ } else {
+ datetime.timeval2syslogTime(ts, &st);
+ tt = ts->tv_sec;
+ }
+
if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) {
STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit);
FINALIZE;
}
+ /* created trusted properties */
+ if(cred != NULL && pLstn->bAnnotate) {
+ if((unsigned) (lenRcv + 4096) < sizeof(msgbuf)) {
+ pmsgbuf = msgbuf;
+ } else {
+ CHKmalloc(pmsgbuf = malloc(lenRcv+4096));
+ }
+ memcpy(pmsgbuf, pRcv, lenRcv);
+ memcpy(pmsgbuf+lenRcv, " @[", 3);
+ toffs = lenRcv + 3; /* next free location */
+ lenProp = snprintf((char*)propBuf, sizeof(propBuf), "_PID=%lu _UID=%lu _GID=%lu",
+ (long unsigned) cred->pid, (long unsigned) cred->uid,
+ (long unsigned) cred->gid);
+ memcpy(pmsgbuf+toffs, propBuf, lenProp);
+ toffs = toffs + lenProp;
+ getTrustedProp(cred, "comm", propBuf, sizeof(propBuf), &lenProp);
+ if(lenProp) {
+ memcpy(pmsgbuf+toffs, " _COMM=", 7);
+ memcpy(pmsgbuf+toffs+7, propBuf, lenProp);
+ toffs = toffs + 7 + lenProp;
+ }
+ getTrustedExe(cred, propBuf, sizeof(propBuf), &lenProp);
+ if(lenProp) {
+ memcpy(pmsgbuf+toffs, " _EXE=", 6);
+ memcpy(pmsgbuf+toffs+6, propBuf, lenProp);
+ toffs = toffs + 6 + lenProp;
+ }
+ getTrustedProp(cred, "cmdline", propBuf, sizeof(propBuf), &lenProp);
+ if(lenProp) {
+ memcpy(pmsgbuf+toffs, " _CMDLINE=", 10);
+ toffs = toffs + 10 +
+ copyescaped(pmsgbuf+toffs+10, propBuf, lenProp);
+ }
+ /* finalize string */
+ pmsgbuf[toffs] = ']';
+ pmsgbuf[toffs+1] = '\0';
+ pRcv = pmsgbuf;
+ lenRcv = toffs + 1;
+ }
+
/* we now create our own message object and submit it to the queue */
CHKiRet(msgConstructWithTime(&pMsg, &st, tt));
MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv);
@@ -564,15 +727,27 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
parse++; lenMsg--; /* '>' */
- if((pLstn->flags & IGNDATE)) {
- /* in this case, we still need to find out if we have a valid
- * datestamp or not .. and advance the parse pointer accordingly.
- */
- struct syslogTime dummy;
- datetime.ParseTIMESTAMP3164(&dummy, &parse, &lenMsg);
- } else {
- if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) {
- DBGPRINTF("we have a problem, invalid timestamp in msg!\n");
+ if(ts == NULL) {
+ if((pLstn->flags & IGNDATE)) {
+ /* in this case, we still need to find out if we have a valid
+ * datestamp or not .. and advance the parse pointer accordingly.
+ */
+ datetime.ParseTIMESTAMP3164(&dummyTS, &parse, &lenMsg);
+ } else {
+ if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) {
+ DBGPRINTF("we have a problem, invalid timestamp in msg!\n");
+ }
+ }
+ } else { /* if we pulled the time from the system, we need to update the message text */
+ uchar *tmpParse = parse; /* just to check correctness of TS */
+ if(datetime.ParseTIMESTAMP3164(&dummyTS, &tmpParse, &lenMsg) == RS_RET_OK) {
+ /* We modify the message only if it contained a valid timestamp,
+ * otherwise we do not touch it at all. */
+ datetime.formatTimestamp3164(&st, (char*)parse, 0);
+ parse[15] = ' '; /* re-write \0 from fromatTimestamp3164 by SP */
+ /* update "counters" to reflect processed timestamp */
+ parse += 16;
+ lenMsg -= 16;
}
}
@@ -624,6 +799,7 @@ static rsRetVal readSocket(lstn_t *pLstn)
struct cmsghdr *cm;
# endif
struct ucred *cred;
+ struct timeval *ts;
uchar bufRcv[4096+1];
char aux[128];
uchar *pRcv = NULL; /* receive buffer */
@@ -662,21 +838,28 @@ static rsRetVal readSocket(lstn_t *pLstn)
dbgprintf("Message from UNIX socket: #%d\n", pLstn->fd);
if(iRcvd > 0) {
cred = NULL;
-# if HAVE_SCM_CREDENTIALS
- if(pLstn->bUseCreds) {
- dbgprintf("XXX: pre CM loop, length of control message %d\n", (int) msgh.msg_controllen);
- for (cm = CMSG_FIRSTHDR(&msgh); cm; cm = CMSG_NXTHDR(&msgh, cm)) {
- dbgprintf("XXX: in CM loop, %d, %d\n", cm->cmsg_level, cm->cmsg_type);
- if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) {
+ ts = NULL;
+ if(pLstn->bUseCreds || pLstn->bUseSysTimeStamp) {
+ for(cm = CMSG_FIRSTHDR(&msgh); cm; cm = CMSG_NXTHDR(&msgh, cm)) {
+# if HAVE_SCM_CREDENTIALS
+ if( pLstn->bUseCreds
+ && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDENTIALS) {
cred = (struct ucred*) CMSG_DATA(cm);
- dbgprintf("XXX: got credentials pid %d\n", (int) cred->pid);
break;
}
+# endif /* HAVE_SCM_CREDENTIALS */
+# if HAVE_SO_TIMESTAMP
+ if( pLstn->bUseSysTimeStamp
+ && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SO_TIMESTAMP) {
+ ts = (struct timeval *)CMSG_DATA(cm);
+ dbgprintf("XXX: got timestamp %ld.%ld\n",
+ (long) ts->tv_sec, (long) ts->tv_usec);
+ break;
+ }
+# endif /* HAVE_SO_TIMESTAMP */
}
- dbgprintf("XXX: post CM loop\n");
}
-# endif /* HAVE_SCM_CREDENTIALS */
- CHKiRet(SubmitMsg(pRcv, iRcvd, pLstn, cred));
+ CHKiRet(SubmitMsg(pRcv, iRcvd, pLstn, cred, ts));
} else if(iRcvd < 0 && errno != EINTR) {
char errStr[1024];
rs_strerror_r(errno, errStr, sizeof(errStr));
@@ -786,8 +969,10 @@ CODESTARTwillRun
listeners[0].ratelimitInterval = ratelimitIntervalSysSock;
listeners[0].ratelimitBurst = ratelimitBurstSysSock;
listeners[0].ratelimitSev = ratelimitSeveritySysSock;
- listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock) ? 1 : 0;
+ listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock || bAnnotateSysSock) ? 1 : 0;
listeners[0].bWritePid = bWritePidSysSock;
+ listeners[0].bAnnotate = bAnnotateSysSock;
+ listeners[0].bUseSysTimeStamp = bUseSysTimeStampSysSock;
sd_fds = sd_listen_fds(0);
if (sd_fds < 0) {
@@ -830,7 +1015,6 @@ CODESTARTafterRun
/* Clean-up files. */
for(i = startIndexUxLocalSockets; i < nfd; i++)
if (listeners[i].sockName && listeners[i].fd != -1) {
-
/* If systemd passed us a socket it is systemd's job to clean it up.
* Do not unlink it -- we will get same socket (node) from systemd
* e.g. on restart again.
@@ -900,6 +1084,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
bUseFlowCtl = 0;
bWritePid = 0;
bWritePidSysSock = 0;
+ bUseSysTimeStamp = 1;
+ bUseSysTimeStampSysSock = 1;
bCreatePath = DFLT_bCreatePath;
ratelimitInterval = DFLT_ratelimitInterval;
ratelimitIntervalSysSock = DFLT_ratelimitInterval;
@@ -934,7 +1120,9 @@ CODEmodInit_QueryRegCFSLineHdlr
listeners[0].fd = -1;
listeners[0].bParseHost = 0;
listeners[0].bUseCreds = 0;
+ listeners[0].bAnnotate = 0;
listeners[0].bCreatePath = 0;
+ listeners[0].bUseSysTimeStamp = 1;
/* initialize socket names */
for(i = 1 ; i < MAXFUNIX ; ++i) {
@@ -962,10 +1150,14 @@ CODEmodInit_QueryRegCFSLineHdlr
NULL, &pLogHostName, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketflowcontrol", 0, eCmdHdlrBinary,
NULL, &bUseFlowCtl, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketannotate", 0, eCmdHdlrBinary,
+ NULL, &bAnnotate, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketcreatepath", 0, eCmdHdlrBinary,
NULL, &bCreatePath, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusepidfromsystem", 0, eCmdHdlrBinary,
NULL, &bWritePid, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputunixlistensocketusesystimestamp", 0, eCmdHdlrBinary,
+ NULL, &bUseSysTimeStamp, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"addunixlistensocket", 0, eCmdHdlrGetWord,
addLstnSocketName, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"imuxsockratelimitinterval", 0, eCmdHdlrInt,
@@ -986,6 +1178,10 @@ CODEmodInit_QueryRegCFSLineHdlr
setSystemLogTimestampIgnore, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketflowcontrol", 0, eCmdHdlrBinary,
setSystemLogFlowControl, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusesystimestamp", 0, eCmdHdlrBinary,
+ NULL, &bUseSysTimeStampSysSock, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogsocketannotate", 0, eCmdHdlrBinary,
+ NULL, &bAnnotateSysSock, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogusepidfromsystem", 0, eCmdHdlrBinary,
NULL, &bWritePidSysSock, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"systemlogratelimitinterval", 0, eCmdHdlrInt,
diff --git a/plugins/mmsnmptrapd/mmsnmptrapd.c b/plugins/mmsnmptrapd/mmsnmptrapd.c
index 767829d6..b78046ee 100644
--- a/plugins/mmsnmptrapd/mmsnmptrapd.c
+++ b/plugins/mmsnmptrapd/mmsnmptrapd.c
@@ -418,7 +418,7 @@ CODEmodInit_QueryRegCFSLineHdlr
cs.pszTagName = NULL;
cs.pszSeverityMapping = NULL;
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdtag", 0, eCmdHdlrInt,
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdtag", 0, eCmdHdlrGetWord,
NULL, &cs.pszTagName, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"mmsnmptrapdseveritymapping", 0, eCmdHdlrGetWord,
NULL, &cs.pszSeverityMapping, STD_LOADABLE_MODULE_ID));
diff --git a/plugins/omhdfs/javaenv.sh b/plugins/omhdfs/javaenv.sh
new file mode 100644
index 00000000..d07a8473
--- /dev/null
+++ b/plugins/omhdfs/javaenv.sh
@@ -0,0 +1,14 @@
+# This is a sample file for environment settings on Fedora 13
+# that made me compile & run omhdfs. I really *hate* the way
+# java uses environment variables... Hopefully this file will
+# help building and testing omhdfs in the future (there is also
+# some more information in the rsyslog wiki).
+# rgerhards, 2011-03-11
+# this now works, but don't ask my why ;)
+#export JAVA_HOME=/usr/java/jdk1.6.0_21/bin/java
+export PATH=/usr/java/jdk1.6.0_21/bin:$PATH
+export JAVA_INCLUDES="-I/usr/java/jdk1.6.0_21/include -I/usr/java/jdk1.6.0_21/include/linux"
+export JAVA_LIBS="-L/usr/java/jdk1.6.0_21/jre/lib/i386 -ljava -ljvm -lverify"
+export HADOOP_HOME=/usr/lib/hadoop
+export CLASSPATH=/usr/lib/jvm/java-6-sun/lib:/usr/lib/hadoop/lib:/usr/lib/hadoop/hadoop-ant-0.20.2+320.jar:/usr/lib/hadoop/hadoop-core-0.20.2+320.jar:/usr/lib/hadoop/hadoop-examples-0.20.2+320.jar:/usr/lib/hadoop/hadoop-test-0.20.2+320.jar:/usr/lib/hadoop/hadoop-tools-0.20.2+320.jar/usr/lib/hadoop/lib/commons-cli-1.2.jar:/usr/lib/hadoop/lib/commons-codec-1.3.jar:/usr/lib/hadoop/lib/commons-el-1.0.jar:/usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar:/usr/lib/hadoop/lib/commons-logging-1.0.4.jar:/usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar:/usr/lib/hadoop/lib/commons-net-1.4.1.jar:/usr/lib/hadoop/lib/core-3.1.1.jar:/usr/lib/hadoop/lib/hadoop-fairscheduler-0.20.2+320.jar:/usr/lib/hadoop/lib/hadoop-scribe-log4j-0.20.2+320.jar:/usr/lib/hadoop/lib/hsqldb-1.8.0.10.jar:/usr/lib/hadoop/lib/hsqldb.jar:/usr/lib/hadoop/lib/jackson-core-asl-1.0.1.jar:/usr/lib/hadoop/lib/jackson-mapper-asl-1.0.1.jar:/usr/lib/hadoop/lib/jasper-compiler-5.5.12.jar:/usr/lib/hadoop/lib/jasper-runtime-5.5.12.jar:/usr/lib/hadoop/lib/jets3t-0.6.1.jar:/usr/lib/hadoop/lib/jetty-6.1.14.jar:/usr/lib/hadoop/lib/jetty-util-6.1.14.jar:/usr/lib/hadoop/lib/junit-4.5.jar:/usr/lib/hadoop/lib/kfs-0.2.2.jar:/usr/lib/hadoop/lib/libfb303.jar:/usr/lib/hadoop/lib/libthrift.jar:/usr/lib/hadoop/lib/log4j-1.2.15.jar:/usr/lib/hadoop/lib/mockito-all-1.8.2.jar:/usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar:/usr/lib/hadoop/lib/oro-2.0.8.jar:/usr/lib/hadoop/lib/servlet-api-2.5-6.1.14.jar:/usr/lib/hadoop/lib/slf4j-api-1.4.3.jar:/usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar:/usr/lib/hadoop/lib/xmlenc-0.52.jar:/etc/hadoop/conf
+###export CLASSPATH="/usr/lib/hadoop/hadoop-0.20.2+320-ant.jar: /usr/lib/hadoop/hadoop-0.20.2+320-core.jar: /usr/lib/hadoop/hadoop-0.20.2+320-examples.jar: /usr/lib/hadoop/hadoop-0.20.2+320-test.jar: /usr/lib/hadoop/hadoop-0.20.2+320-tools.jar: /usr/lib/hadoop/hadoop-ant-0.20.2+320.jar: /usr/lib/hadoop/hadoop-core-0.20.2+320.jar: /usr/lib/hadoop/hadoop-examples-0.20.2+320.jar: /usr/lib/hadoop/hadoop-test-0.20.2+320.jar: /usr/lib/hadoop/hadoop-tools-0.20.2+320.jar:/usr/lib/hadoop/lib: /usr/lib/hadoop/lib/commons-cli-1.2.jar: /usr/lib/hadoop/lib/commons-codec-1.3.jar: /usr/lib/hadoop/lib/commons-el-1.0.jar: /usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar: /usr/lib/hadoop/lib/commons-logging-1.0.4.jar: /usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar: /usr/lib/hadoop/lib/commons-net-1.4.1.jar: /usr/lib/hadoop/lib/core-3.1.1.jar: /usr/lib/hadoop/lib/hadoop-fairscheduler-0.20.2+320.jar: /usr/lib/hadoop/lib/hadoop-scribe-log4j-0.20.2+320.jar: /usr/lib/hadoop/lib/hsqldb-1.8.0.10.jar: /usr/lib/hadoop/lib/hsqldb.jar: /usr/lib/hadoop/lib/jackson-core-asl-1.0.1.jar: /usr/lib/hadoop/lib/jackson-mapper-asl-1.0.1.jar: /usr/lib/hadoop/lib/jasper-compiler-5.5.12.jar: /usr/lib/hadoop/lib/jasper-runtime-5.5.12.jar: /usr/lib/hadoop/lib/jets3t-0.6.1.jar: /usr/lib/hadoop/lib/jetty-6.1.14.jar: /usr/lib/hadoop/lib/jetty-util-6.1.14.jar: /usr/lib/hadoop/lib/junit-4.5.jar: /usr/lib/hadoop/lib/kfs-0.2.2.jar: /usr/lib/hadoop/lib/libfb303.jar: /usr/lib/hadoop/lib/libthrift.jar: /usr/lib/hadoop/lib/log4j-1.2.15.jar: /usr/lib/hadoop/lib/mockito-all-1.8.2.jar: /usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar: /usr/lib/hadoop/lib/oro-2.0.8.jar: /usr/lib/hadoop/lib/servlet-api-2.5-6.1.14.jar: /usr/lib/hadoop/lib/slf4j-api-1.4.3.jar: /usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar: /usr/lib/hadoop/lib/xmlenc-0.52.jar:/etc/hadoop/conf:/usr/lib/hadoop/lib"
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
index 8b72747f..76128a4e 100644
--- a/plugins/omhdfs/omhdfs.c
+++ b/plugins/omhdfs/omhdfs.c
@@ -80,6 +80,8 @@ typedef struct {
typedef struct _instanceData {
file_t *pFile;
+ uchar ioBuf[64*1024];
+ unsigned offsBuf;
} instanceData;
/* forward definitions (down here, need data types) */
@@ -260,7 +262,8 @@ fileOpen(file_t *pFile)
if(errno == ENOENT) {
DBGPRINTF("omhdfs: ENOENT trying to append to '%s', now trying create\n",
pFile->name);
- pFile->fh = hdfsOpenFile(pFile->fs, (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0);
+ pFile->fh = hdfsOpenFile(pFile->fs,
+ (char*)pFile->name, O_WRONLY|O_CREAT, 0, 0, 0);
}
}
if(pFile->fh == NULL) {
@@ -275,12 +278,15 @@ finalize_it:
}
+/* Note: lenWrite is reset to zero on successful write! */
static inline rsRetVal
-fileWrite(file_t *pFile, uchar *buf)
+fileWrite(file_t *pFile, uchar *buf, size_t *lenWrite)
{
- size_t lenWrite;
DEFiRet;
+ if(*lenWrite == 0)
+ FINALIZE;
+
if(pFile->nUsers > 1)
d_pthread_mutex_lock(&pFile->mut);
@@ -294,18 +300,18 @@ fileWrite(file_t *pFile, uchar *buf)
}
}
- lenWrite = strlen((char*) buf);
- tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite);
- if((unsigned) num_written_bytes != lenWrite) {
- errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE, "omhdfs: failed to write %s, expected %lu bytes, "
- "written %lu\n", pFile->name, (unsigned long) lenWrite,
+dbgprintf("XXXXX: omhdfs writing %u bytes\n", *lenWrite);
+ tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, *lenWrite);
+ if((unsigned) num_written_bytes != *lenWrite) {
+ errmsg.LogError(errno, RS_RET_ERR_HDFS_WRITE,
+ "omhdfs: failed to write %s, expected %lu bytes, "
+ "written %lu\n", pFile->name, (unsigned long) *lenWrite,
(unsigned long) num_written_bytes);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
+ *lenWrite = 0;
finalize_it:
- if(pFile->nUsers > 1)
- d_pthread_mutex_unlock(&pFile->mut);
RETiRet;
}
@@ -333,6 +339,40 @@ finalize_it:
/* ---END FILE OBJECT---------------------------------------------------- */
+/* This adds data to the output buffer and performs an actual write
+ * if the new data does not fit into the buffer. Note that we never write
+ * partial data records. Other actions may write into the same file, and if
+ * we would write partial records, data could become severely mixed up.
+ * Note that we must check of some new data arrived is large than our
+ * buffer. In that case, the new data will written with its own
+ * write operation.
+ */
+static inline rsRetVal
+addData(instanceData *pData, uchar *buf)
+{
+ unsigned len;
+ DEFiRet;
+
+ len = strlen((char*)buf);
+ if(pData->offsBuf + len < sizeof(pData->ioBuf)) {
+ /* new data fits into remaining buffer */
+ memcpy((char*) pData->ioBuf + pData->offsBuf, buf, len);
+ pData->offsBuf += len;
+ } else {
+dbgprintf("XXXXX: not enough room, need to flush\n");
+ CHKiRet(fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf));
+ if(len >= sizeof(pData->ioBuf)) {
+ CHKiRet(fileWrite(pData->pFile, buf, &len));
+ } else {
+ memcpy((char*) pData->ioBuf + pData->offsBuf, buf, len);
+ pData->offsBuf += len;
+ }
+ }
+
+ iRet = RS_RET_DEFER_COMMIT;
+finalize_it:
+ RETiRet;
+}
BEGINcreateInstance
CODESTARTcreateInstance
@@ -358,13 +398,31 @@ CODESTARTtryResume
}
ENDtryResume
+
+BEGINbeginTransaction
+CODESTARTbeginTransaction
+dbgprintf("omhdfs: beginTransaction\n");
+ENDbeginTransaction
+
+
BEGINdoAction
CODESTARTdoAction
- DBGPRINTF("omuxsock: action to to write to %s\n", pData->pFile->name);
- iRet = fileWrite(pData->pFile, ppString[0]);
+ DBGPRINTF("omhdfs: action to to write to %s\n", pData->pFile->name);
+ iRet = addData(pData, ppString[0]);
+dbgprintf("omhdfs: done doAction\n");
ENDdoAction
+BEGINendTransaction
+CODESTARTendTransaction
+dbgprintf("omhdfs: endTransaction\n");
+ if(pData->offsBuf != 0) {
+ DBGPRINTF("omhdfs: data unwritten at end of transaction, persisting...\n");
+ iRet = fileWrite(pData->pFile, pData->ioBuf, &pData->offsBuf);
+ }
+ENDendTransaction
+
+
BEGINparseSelectorAct
file_t *pFile;
int r;
@@ -409,6 +467,7 @@ CODESTARTparseSelectorAct
}
fileObjAddUser(pFile);
pData->pFile = pFile;
+ pData->offsBuf = 0;
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -455,6 +514,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
CODEqueryEtryPt_doHUP
ENDqueryEtryPt
@@ -472,5 +532,6 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &hdfsPort, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &dfltTplName, NULL));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
+ DBGPRINTF("omhdfs: module compiled with rsyslog version %s.\n", VERSION);
CODEmodInit_QueryRegCFSLineHdlr
ENDmodInit
diff --git a/runtime/conf.c b/runtime/conf.c
index d731c786..cbba1790 100644
--- a/runtime/conf.c
+++ b/runtime/conf.c
@@ -399,6 +399,7 @@ processConfFile(uchar *pConfFile)
uchar cbuf[CFGLNSIZ];
uchar *cline;
int i;
+ rsRetVal localRet;
int bHadAnError = 0;
uchar *pszOrgLine = NULL;
size_t lenLine;
@@ -457,16 +458,20 @@ processConfFile(uchar *pConfFile)
/* we now have the complete line, and are positioned at the first non-whitespace
* character. So let's process it
*/
- if(cfline(cbuf, &pCurrRule) != RS_RET_OK) {
+ if((localRet = cfline(cbuf, &pCurrRule)) != RS_RET_OK) {
/* we log a message, but otherwise ignore the error. After all, the next
* line can be correct. -- rgerhards, 2007-08-02
*/
uchar szErrLoc[MAXFNAME + 64];
- dbgprintf("config line NOT successfully processed\n");
+ if(localRet != RS_RET_OK_WARN) {
+ dbgprintf("config line NOT successfully processed\n");
+ bHadAnError = 1;
+ }
snprintf((char*)szErrLoc, sizeof(szErrLoc) / sizeof(uchar),
"%s, line %d", pConfFile, iLnNbr);
- errmsg.LogError(0, NO_ERRCODE, "the last error occured in %s:\"%s\"", (char*)szErrLoc, (char*)pszOrgLine);
- bHadAnError = 1;
+ errmsg.LogError(0, NO_ERRCODE, "the last %s occured in %s:\"%s\"",
+ (localRet == RS_RET_OK_WARN) ? "warning" : "error",
+ (char*)szErrLoc, (char*)pszOrgLine);
}
}
@@ -648,17 +653,14 @@ static rsRetVal cflineProcessTradPRIFilter(uchar **pline, register rule_t *pRule
for (bp=buf; *(bp+1); bp++)
*bp=*(bp+1);
*bp='\0';
- }
- else {
+ } else {
ignorepri = 0;
}
- if ( *buf == '=' )
- {
+ if ( *buf == '=' ) {
singlpri = 1;
pri = decodeSyslogName(&buf[1], syslogPriNames);
}
- else {
- singlpri = 0;
+ else { singlpri = 0;
pri = decodeSyslogName(buf, syslogPriNames);
}
@@ -686,17 +688,13 @@ static rsRetVal cflineProcessTradPRIFilter(uchar **pline, register rule_t *pRule
pRule->f_filterData.f_pmask[i] &= ~(1<<pri);
else
pRule->f_filterData.f_pmask[i] |= (1<<pri);
- }
- else
- {
+ } else {
if ( pri == TABLE_ALLPRI ) {
if ( ignorepri )
pRule->f_filterData.f_pmask[i] = TABLE_NOPRI;
else
pRule->f_filterData.f_pmask[i] = TABLE_ALLPRI;
- }
- else
- {
+ } else {
if ( ignorepri )
for (i2= 0; i2 <= pri; ++i2)
pRule->f_filterData.f_pmask[i] &= ~(1<<i2);
@@ -1082,6 +1080,7 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction)
omodStringRequest_t *pOMSR;
action_t *pAction = NULL;
void *pModData;
+ int bHadWarning = 0;
ASSERT(p != NULL);
ASSERT(ppAction != NULL);
@@ -1097,6 +1096,10 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction)
pOMSR = NULL;
iRet = pMod->mod.om.parseSelectorAct(p, &pModData, &pOMSR);
dbgprintf("tried selector action for %s: %d\n", module.GetName(pMod), iRet);
+ if(iRet == RS_RET_OK_WARN) {
+ bHadWarning = 1;
+ iRet = RS_RET_OK;
+ }
if(iRet == RS_RET_OK || iRet == RS_RET_SUSPENDED) {
if((iRet = addAction(&pAction, pMod, pModData, pOMSR, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
/* now check if the module is compatible with select features */
@@ -1125,6 +1128,8 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction)
}
*ppAction = pAction;
+ if(iRet == RS_RET_OK && bHadWarning)
+ iRet = RS_RET_OK_WARN;
RETiRet;
}
@@ -1139,6 +1144,8 @@ cflineClassic(uchar *p, rule_t **ppRule)
{
DEFiRet;
action_t *pAction;
+ rsRetVal localRet;
+ int bHadWarning = 0;
/* lines starting with '&' have no new filters and just add
* new actions to the currently processed selector.
@@ -1165,10 +1172,17 @@ cflineClassic(uchar *p, rule_t **ppRule)
CHKiRet(cflineDoFilter(&p, *ppRule)); /* pull filters */
}
- CHKiRet(cflineDoAction(&p, &pAction));
+ localRet = cflineDoAction(&p, &pAction);
+ if(localRet == RS_RET_OK_WARN) {
+ bHadWarning = 1;
+ } else {
+ CHKiRet(localRet);
+ }
CHKiRet(llAppend(&(*ppRule)->llActList, NULL, (void*) pAction));
finalize_it:
+ if(iRet == RS_RET_OK && bHadWarning)
+ iRet = RS_RET_OK_WARN;
RETiRet;
}
diff --git a/runtime/datetime.c b/runtime/datetime.c
index 679ce0b4..85cbab84 100644
--- a/runtime/datetime.c
+++ b/runtime/datetime.c
@@ -53,6 +53,47 @@ static const int tenPowers[6] = { 1, 10, 100, 1000, 10000, 100000 };
/* ------------------------------ methods ------------------------------ */
+/**
+ * Convert struct timeval to syslog_time
+ */
+void
+timeval2syslogTime(struct timeval *tp, struct syslogTime *t)
+{
+ struct tm *tm;
+ struct tm tmBuf;
+ long lBias;
+
+ tm = localtime_r((time_t*) &(tp->tv_sec), &tmBuf);
+
+ t->year = tm->tm_year + 1900;
+ t->month = tm->tm_mon + 1;
+ t->day = tm->tm_mday;
+ t->hour = tm->tm_hour;
+ t->minute = tm->tm_min;
+ t->second = tm->tm_sec;
+ t->secfrac = tp->tv_usec;
+ t->secfracPrecision = 6;
+
+# if __sun
+ /* Solaris uses a different method of exporting the time zone.
+ * It is UTC - localtime, which is the opposite sign of mins east of GMT.
+ */
+ lBias = -(daylight ? altzone : timezone);
+# elif defined(__hpux)
+ lBias = tz.tz_dsttime ? - tz.tz_minuteswest : 0;
+# else
+ lBias = tm->tm_gmtoff;
+# endif
+ if(lBias < 0) {
+ t->OffsetMode = '-';
+ lBias *= -1;
+ } else
+ t->OffsetMode = '+';
+ t->OffsetHour = lBias / 3600;
+ t->OffsetMinute = (lBias % 3600) / 60;
+ t->timeType = TIME_TYPE_RFC5424; /* we have a high precision timestamp */
+}
+
/**
* Get the current date/time in the best resolution the operating
* system has to offer (well, actually at most down to the milli-
@@ -72,9 +113,6 @@ static const int tenPowers[6] = { 1, 10, 100, 1000, 10000, 100000 };
static void getCurrTime(struct syslogTime *t, time_t *ttSeconds)
{
struct timeval tp;
- struct tm *tm;
- struct tm tmBuf;
- long lBias;
# if defined(__hpux)
struct timezone tz;
# endif
@@ -91,37 +129,7 @@ static void getCurrTime(struct syslogTime *t, time_t *ttSeconds)
if(ttSeconds != NULL)
*ttSeconds = tp.tv_sec;
- tm = localtime_r((time_t*) &(tp.tv_sec), &tmBuf);
-
- t->year = tm->tm_year + 1900;
- t->month = tm->tm_mon + 1;
- t->day = tm->tm_mday;
- t->hour = tm->tm_hour;
- t->minute = tm->tm_min;
- t->second = tm->tm_sec;
- t->secfrac = tp.tv_usec;
- t->secfracPrecision = 6;
-
-# if __sun
- /* Solaris uses a different method of exporting the time zone.
- * It is UTC - localtime, which is the opposite sign of mins east of GMT.
- */
- lBias = -(daylight ? altzone : timezone);
-# elif defined(__hpux)
- lBias = tz.tz_dsttime ? - tz.tz_minuteswest : 0;
-# else
- lBias = tm->tm_gmtoff;
-# endif
- if(lBias < 0)
- {
- t->OffsetMode = '-';
- lBias *= -1;
- }
- else
- t->OffsetMode = '+';
- t->OffsetHour = lBias / 3600;
- t->OffsetMinute = (lBias % 3600) / 60;
- t->timeType = TIME_TYPE_RFC5424; /* we have a high precision timestamp */
+ timeval2syslogTime(&tp, t);
}
@@ -859,6 +867,7 @@ CODESTARTobjQueryInterface(datetime)
*/
pIf->getCurrTime = getCurrTime;
pIf->GetTime = getTime;
+ pIf->timeval2syslogTime = timeval2syslogTime;
pIf->ParseTIMESTAMP3339 = ParseTIMESTAMP3339;
pIf->ParseTIMESTAMP3164 = ParseTIMESTAMP3164;
pIf->formatTimestampToMySQL = formatTimestampToMySQL;
diff --git a/runtime/datetime.h b/runtime/datetime.h
index 7fcd273b..acf54df5 100644
--- a/runtime/datetime.h
+++ b/runtime/datetime.h
@@ -42,8 +42,10 @@ BEGINinterface(datetime) /* name must also be changed in ENDinterface macro! */
int (*formatTimestampSecFrac)(struct syslogTime *ts, char* pBuf);
/* v3, 2009-11-12 */
time_t (*GetTime)(time_t *ttSeconds);
+ /* v6, 2011-06-20 */
+ void (*timeval2syslogTime)(struct timeval *tp, struct syslogTime *t);
ENDinterface(datetime)
-#define datetimeCURR_IF_VERSION 5 /* increment whenever you change the interface structure! */
+#define datetimeCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */
/* interface changes:
* 1 - initial version
* 2 - not compatible to 1 - bugfix required ParseTIMESTAMP3164 to accept char ** as
@@ -52,6 +54,7 @@ ENDinterface(datetime)
* 3 - taken by v5 branch!
* 4 - formatTimestamp3164 takes a third int parameter
* 5 - merge of versions 3 + 4 (2010-03-09)
+ * 6 - see above
*/
/* prototypes */
diff --git a/runtime/module-template.h b/runtime/module-template.h
index e21d6157..63bf9a10 100644
--- a/runtime/module-template.h
+++ b/runtime/module-template.h
@@ -275,7 +275,7 @@ static rsRetVal parseSelectorAct(uchar **pp, void **ppModData, omodStringRequest
#define CODE_STD_FINALIZERparseSelectorAct \
finalize_it:\
- if(iRet == RS_RET_OK || iRet == RS_RET_SUSPENDED) {\
+ if(iRet == RS_RET_OK || iRet == RS_RET_OK_WARN || iRet == RS_RET_SUSPENDED) {\
*ppModData = pData;\
*pp = p;\
} else {\
diff --git a/runtime/modules.c b/runtime/modules.c
index 4541bddf..6a32b2e8 100644
--- a/runtime/modules.c
+++ b/runtime/modules.c
@@ -767,7 +767,6 @@ Load(uchar *pModName)
DEFiRet;
size_t iPathLen, iModNameLen;
- uchar szPath[PATH_MAX];
uchar *pModNameCmp;
int bHasExtension;
void *pModHdlr, *pModInit;
@@ -775,13 +774,25 @@ Load(uchar *pModName)
uchar *pModDirCurr, *pModDirNext;
int iLoadCnt;
struct dlhandle_s *pHandle = NULL;
+# ifdef PATH_MAX
+ uchar pathBuf[PATH_MAX+1];
+# else
+ uchar pathBuf[4096];
+# endif
+ uchar *pPathBuf = pathBuf;
+ size_t lenPathBuf = sizeof(pathBuf);
assert(pModName != NULL);
dbgprintf("Requested to load module '%s'\n", pModName);
+ iModNameLen = strlen((char*)pModName);
+ /* overhead for a full path is potentially 1 byte for a slash,
+ * three bytes for ".so" and one byte for '\0'.
+ */
+# define PATHBUF_OVERHEAD 1 + iModNameLen + 3 + 1
+
pthread_mutex_lock(&mutLoadUnload);
- iModNameLen = strlen((char *) pModName);
if(iModNameLen > 3 && !strcmp((char *) pModName + iModNameLen - 3, ".so")) {
iModNameLen -= 3;
bHasExtension = TRUE;
@@ -802,13 +813,19 @@ Load(uchar *pModName)
pModDirNext = NULL;
pModHdlr = NULL;
iLoadCnt = 0;
- do {
- /* now build our load module name */
+ do { /* now build our load module name */
if(*pModName == '/' || *pModName == '.') {
- *szPath = '\0'; /* we do not need to append the path - its already in the module name */
+ if(lenPathBuf < PATHBUF_OVERHEAD) {
+ if(pPathBuf != pathBuf) /* already malloc()ed memory? */
+ free(pPathBuf);
+ /* we always alloc enough memory for everything we potentiall need to add */
+ lenPathBuf = PATHBUF_OVERHEAD;
+ CHKmalloc(pPathBuf = malloc(sizeof(char)*lenPathBuf));
+ }
+ *pPathBuf = '\0'; /* we do not need to append the path - its already in the module name */
iPathLen = 0;
} else {
- *szPath = '\0';
+ *pPathBuf = '\0';
iPathLen = strlen((char *)pModDirCurr);
pModDirNext = (uchar *)strchr((char *)pModDirCurr, ':');
@@ -821,30 +838,27 @@ Load(uchar *pModName)
continue;
}
break;
- } else if(iPathLen > sizeof(szPath) - 1) {
- errmsg.LogError(0, NO_ERRCODE, "could not load module '%s', module path too long\n", pModName);
- ABORT_FINALIZE(RS_RET_MODULE_LOAD_ERR_PATHLEN);
+ } else if(iPathLen > lenPathBuf - PATHBUF_OVERHEAD) {
+ if(pPathBuf != pathBuf) /* already malloc()ed memory? */
+ free(pPathBuf);
+ /* we always alloc enough memory for everything we potentiall need to add */
+ lenPathBuf = iPathLen + PATHBUF_OVERHEAD;
+ CHKmalloc(pPathBuf = malloc(sizeof(char)*lenPathBuf));
}
- strncat((char *) szPath, (char *)pModDirCurr, iPathLen);
- iPathLen = strlen((char*) szPath);
+ memcpy((char *) pPathBuf, (char *)pModDirCurr, iPathLen);
+ if((pPathBuf[iPathLen - 1] != '/')) {
+ /* we have space, made sure in previous check */
+ pPathBuf[iPathLen++] = '/';
+ }
+ pPathBuf[iPathLen] = '\0';
if(pModDirNext)
pModDirCurr = pModDirNext + 1;
-
- if((szPath[iPathLen - 1] != '/')) {
- if((iPathLen <= sizeof(szPath) - 2)) {
- szPath[iPathLen++] = '/';
- szPath[iPathLen] = '\0';
- } else {
- errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_PATHLEN, "could not load module '%s', path too long\n", pModName);
- ABORT_FINALIZE(RS_RET_MODULE_LOAD_ERR_PATHLEN);
- }
- }
}
/* ... add actual name ... */
- strncat((char *) szPath, (char *) pModName, sizeof(szPath) - iPathLen - 1);
+ strncat((char *) pPathBuf, (char *) pModName, lenPathBuf - iPathLen - 1);
/* now see if we have an extension and, if not, append ".so" */
if(!bHasExtension) {
@@ -853,17 +867,12 @@ Load(uchar *pModName)
* algo over time... -- rgerhards, 2008-03-05
*/
/* ... so now add the extension */
- strncat((char *) szPath, ".so", sizeof(szPath) - strlen((char*) szPath) - 1);
+ strncat((char *) pPathBuf, ".so", lenPathBuf - strlen((char*) pPathBuf) - 1);
iPathLen += 3;
}
- if(iPathLen + strlen((char*) pModName) >= sizeof(szPath)) {
- errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_PATHLEN, "could not load module '%s', path too long\n", pModName);
- ABORT_FINALIZE(RS_RET_MODULE_LOAD_ERR_PATHLEN);
- }
-
/* complete load path constructed, so ... GO! */
- dbgprintf("loading module '%s'\n", szPath);
+ dbgprintf("loading module '%s'\n", pPathBuf);
/* see if we have this one already */
for (pHandle = pHandles; pHandle; pHandle = pHandle->next) {
@@ -875,7 +884,7 @@ Load(uchar *pModName)
/* not found, try to dynamically link it */
if (!pModHdlr) {
- pModHdlr = dlopen((char *) szPath, RTLD_NOW);
+ pModHdlr = dlopen((char *) pPathBuf, RTLD_NOW);
}
iLoadCnt++;
@@ -884,25 +893,28 @@ Load(uchar *pModName)
if(!pModHdlr) {
if(iLoadCnt) {
- errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_DLOPEN, "could not load module '%s', dlopen: %s\n", szPath, dlerror());
+ errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_DLOPEN, "could not load module '%s', dlopen: %s\n",
+ pPathBuf, dlerror());
} else {
- errmsg.LogError(0, NO_ERRCODE, "could not load module '%s', ModDir was '%s'\n", szPath,
+ errmsg.LogError(0, NO_ERRCODE, "could not load module '%s', ModDir was '%s'\n", pPathBuf,
((pModDir == NULL) ? _PATH_MODDIR : (char *)pModDir));
}
ABORT_FINALIZE(RS_RET_MODULE_LOAD_ERR_DLOPEN);
}
if(!(pModInit = dlsym(pModHdlr, "modInit"))) {
- errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_NO_INIT, "could not load module '%s', dlsym: %s\n", szPath, dlerror());
+ errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_NO_INIT, "could not load module '%s', dlsym: %s\n", pPathBuf, dlerror());
dlclose(pModHdlr);
ABORT_FINALIZE(RS_RET_MODULE_LOAD_ERR_NO_INIT);
}
if((iRet = doModInit(pModInit, (uchar*) pModName, pModHdlr)) != RS_RET_OK) {
- errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_INIT_FAILED, "could not load module '%s', rsyslog error %d\n", szPath, iRet);
+ errmsg.LogError(0, RS_RET_MODULE_LOAD_ERR_INIT_FAILED, "could not load module '%s', rsyslog error %d\n", pPathBuf, iRet);
dlclose(pModHdlr);
ABORT_FINALIZE(RS_RET_MODULE_LOAD_ERR_INIT_FAILED);
}
finalize_it:
+ if(pPathBuf != pathBuf) /* used malloc()ed memory? */
+ free(pPathBuf);
pthread_mutex_unlock(&mutLoadUnload);
RETiRet;
}
diff --git a/runtime/msg.c b/runtime/msg.c
index 31863b2d..810a396e 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -1671,8 +1671,6 @@ void MsgSetTAG(msg_t *pMsg, uchar* pszBuf, size_t lenBuf)
uchar *pBuf;
assert(pMsg != NULL);
-dbgprintf("MsgSetTAG in: len %d, pszBuf: %s\n", lenBuf, pszBuf);
-
freeTAG(pMsg);
pMsg->iLenTAG = lenBuf;
@@ -1691,8 +1689,6 @@ dbgprintf("MsgSetTAG in: len %d, pszBuf: %s\n", lenBuf, pszBuf);
memcpy(pBuf, pszBuf, pMsg->iLenTAG);
pBuf[pMsg->iLenTAG] = '\0'; /* this also works with truncation! */
-
-dbgprintf("MsgSetTAG exit: pMsg->iLenTAG %d, pMsg->TAG.szBuf: %s\n", pMsg->iLenTAG, pMsg->TAG.szBuf);
}
diff --git a/runtime/nsd_gtls.c b/runtime/nsd_gtls.c
index 036e8290..e1192aaf 100644
--- a/runtime/nsd_gtls.c
+++ b/runtime/nsd_gtls.c
@@ -29,7 +29,9 @@
#include <string.h>
#include <gnutls/gnutls.h>
#include <gnutls/x509.h>
-#include <gcrypt.h>
+#if GNUTLS_VERSION_NUMBER <= 0x020b00
+# include <gcrypt.h>
+#endif
#include <errno.h>
#include <sys/stat.h>
#include <unistd.h>
@@ -54,7 +56,9 @@
#define CRLFILE "crl.pem"
+#if GNUTLS_VERSION_NUMBER <= 0x020b00
GCRY_THREAD_OPTION_PTHREAD_IMPL;
+#endif
MODULE_TYPE_LIB
MODULE_TYPE_KEEP
@@ -567,7 +571,9 @@ gtlsGlblInit(void)
DEFiRet;
/* gcry_control must be called first, so that the thread system is correctly set up */
+ #if GNUTLS_VERSION_NUMBER <= 0x020b00
gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
+ #endif
CHKgnutls(gnutls_global_init());
/* X509 stuff */
diff --git a/runtime/parser.c b/runtime/parser.c
index b385c54b..300db1e0 100644
--- a/runtime/parser.c
+++ b/runtime/parser.c
@@ -60,6 +60,7 @@ DEFobjCurrIf(ruleset)
/* config data */
static uchar cCCEscapeChar = '#';/* character to be used to start an escape sequence for control chars */
static int bEscapeCCOnRcv = 1; /* escape control characters on reception: 0 - no, 1 - yes */
+static int bSpaceLFOnRcv = 0; /* replace newlines with spaces on reception: 0 - no, 1 - yes */
static int bEscape8BitChars = 0; /* escape characters > 127 on reception: 0 - no, 1 - yes */
static int bEscapeTab = 1; /* escape tab control character when doing CC escapes: 0 - no, 1 - yes */
static int bDropTrailingLF = 1; /* drop trailing LF's on reception? */
@@ -354,9 +355,13 @@ SanitizeMsg(msg_t *pMsg)
int bNeedSanitize = 0;
for(iSrc = 0 ; iSrc < lenMsg ; iSrc++) {
if(iscntrl(pszMsg[iSrc])) {
+ if(bSpaceLFOnRcv && pszMsg[iSrc] == '\n')
+ pszMsg[iSrc] = ' ';
+ else
if(pszMsg[iSrc] == '\0' || bEscapeCCOnRcv) {
bNeedSanitize = 1;
- break;
+ if (!bSpaceLFOnRcv)
+ break;
}
} else if(pszMsg[iSrc] > 127 && bEscape8BitChars) {
bNeedSanitize = 1;
@@ -645,6 +650,7 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus
{
cCCEscapeChar = '#';
bEscapeCCOnRcv = 1; /* default is to escape control characters */
+ bSpaceLFOnRcv = 0;
bEscape8BitChars = 0; /* default is to escape control characters */
bEscapeTab = 1; /* default is to escape control characters */
bDropTrailingLF = 1; /* default is to drop trailing LF's on reception */
@@ -698,6 +704,7 @@ BEGINObjClassInit(parser, 1, OBJ_IS_CORE_MODULE) /* class, version */
CHKiRet(regCfSysLineHdlr((uchar *)"controlcharacterescapeprefix", 0, eCmdHdlrGetChar, NULL, &cCCEscapeChar, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"droptrailinglfonreception", 0, eCmdHdlrBinary, NULL, &bDropTrailingLF, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscapeCCOnRcv, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"spacelfonreceive", 0, eCmdHdlrBinary, NULL, &bSpaceLFOnRcv, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"escape8bitcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscape8BitChars, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactertab", 0, eCmdHdlrBinary, NULL, &bEscapeTab, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL));
diff --git a/runtime/queue.c b/runtime/queue.c
index 9012abeb..56d05571 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -12,7 +12,7 @@
* function names - this makes it really hard to read and does not provide much
* benefit, at least I (now) think so...
*
- * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -83,6 +83,11 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
+static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr);
+static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
+static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
+static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
+static rsRetVal qDestructDisk(qqueue_t *pThis);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -592,6 +597,47 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis)
/* -------------------- disk -------------------- */
+/* The following function is used to "save" ourself from being killed by
+ * a fatally failed disk queue. A fatal failure is, for example, if no
+ * data can be read or written. In that case, the disk support is disabled,
+ * with all on-disk structures kept as-is as much as possible. Instead, the
+ * queue is switched to direct mode, so that at least
+ * some processing can happen. Of course, this may still have lots of
+ * undesired side-effects, but is probably better than aborting the
+ * syslogd. Note that this function *must* succeed in one way or another, as
+ * we can not recover from failure here. But it may emit different return
+ * states, which can trigger different processing in the higher layers.
+ * rgerhards, 2011-05-03
+ */
+static inline rsRetVal
+queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError)
+{
+ pThis->iQueueSize = 0;
+ pThis->nLogDeq = 0;
+ qDestructDisk(pThis); /* free disk structures */
+
+ pThis->qType = QUEUETYPE_DIRECT;
+ pThis->qConstruct = qConstructDirect;
+ pThis->qDestruct = qDestructDirect;
+ pThis->qAdd = qAddDirect;
+ pThis->qDel = qDelDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ if(pThis->pqParent != NULL) {
+ DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n");
+ pThis->pqParent->bIsDA = 0;
+ pThis->pqParent->pqDA = NULL;
+ /* This may have undesired side effects, not sure if I really evaluated
+ * all. So you know where to look at if you come to this point during
+ * troubleshooting ;) -- rgerhards, 2011-05-03
+ */
+ }
+
+ errmsg.LogError(0, initiatingError, "fatal error on disk queue '%s', emergency switch to direct mode",
+ obj.GetName((obj_t*) pThis));
+ return RS_RET_ERR_QUEUE_EMERGENCY;
+}
+
+
static rsRetVal
qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pThis)
{
@@ -794,10 +840,7 @@ finalize_it:
static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr)
{
DEFiRet;
-
- CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL));
-
-finalize_it:
+ iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL);
RETiRet;
}
@@ -1312,6 +1355,7 @@ static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr)
if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n",
iQueueSize, iSeverity);
+ STATSCOUNTER_INC(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd);
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
@@ -1693,7 +1737,18 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueForConsumer(pThis, pWti));
+ iRet = DequeueForConsumer(pThis, pWti);
+ if(iRet == RS_RET_FILE_NOT_FOUND) {
+ /* This is a fatal condition and means the queue is almost unusable */
+ d_pthread_mutex_unlock(pThis->mut);
+ DBGOPRINT((obj_t*) pThis, "got 'file not found' error %d, queue defunct\n", iRet);
+ iRet = queueSwitchToEmergencyMode(pThis, iRet);
+ // TODO: think about what to return as iRet -- keep RS_RET_FILE_NOT_FOUND?
+ d_pthread_mutex_lock(pThis->mut);
+ }
+ if (iRet != RS_RET_OK) {
+ FINALIZE;
+ }
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
d_pthread_mutex_unlock(pThis->mut);
@@ -1787,7 +1842,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
{
DEFiRet;
-//DBGPRINTF("XXXX: chkStopWrkrDA called, low watermark %d, phys Size %d\n", pThis->iLowWtrMrk, getPhysicalQueueSize(pThis));
if(pThis->bEnqOnly) {
iRet = RS_RET_TERMINATE_WHEN_IDLE;
}
@@ -1936,6 +1990,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("full"),
ctrType_IntCtr, &pThis->ctrFull));
+ STATSCOUNTER_INIT(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.full"),
+ ctrType_IntCtr, &pThis->ctrFDscrd));
+ STATSCOUNTER_INIT(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.nf"),
+ ctrType_IntCtr, &pThis->ctrNFDscrd));
+
pThis->ctrMaxqsize = 0;
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("maxqsize"),
ctrType_Int, &pThis->ctrMaxqsize));
@@ -2289,6 +2350,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
// TODO : handle enqOnly => discard!
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
+ STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
diff --git a/runtime/queue.h b/runtime/queue.h
index 97057180..06a58229 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -169,6 +169,8 @@ struct queue_s {
statsobj_t *statsobj;
STATSCOUNTER_DEF(ctrEnqueued, mutCtrEnqueued);
STATSCOUNTER_DEF(ctrFull, mutCtrFull);
+ STATSCOUNTER_DEF(ctrFDscrd, mutCtrFDscrd);
+ STATSCOUNTER_DEF(ctrNFDscrd, mutCtrNFDscrd);
int ctrMaxqsize;
};
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 69b3c8d1..06438081 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -343,8 +343,10 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_FILE_NOT_SPECIFIED = -2180, /**< file name not configured where this was required */
RS_RET_ERR_WRKDIR = -2181, /**< problems with the rsyslog working directory */
RS_RET_WRN_WRKDIR = -2182, /**< correctable problems with the rsyslog working directory */
+ RS_RET_ERR_QUEUE_EMERGENCY = -2183, /**< some fatal error caused queue to switch to emergency mode */
RS_RET_OUTDATED_STMT = -2184, /**< some outdated statement/functionality is being used in conf file */
RS_RET_MISSING_WHITESPACE = -2185, /**< whitespace is missing in some config construct */
+ RS_RET_OK_WARN = -2186, /**< config part: everything was OK, but a warning message was emitted */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
diff --git a/runtime/strmsrv.c b/runtime/strmsrv.c
index 0de18e7f..8310e832 100644
--- a/runtime/strmsrv.c
+++ b/runtime/strmsrv.c
@@ -765,7 +765,7 @@ static rsRetVal
SetKeepAlive(strmsrv_t *pThis, int iVal)
{
DEFiRet;
- dbgprintf("keep-alive set to %d\n", iVal);
+ dbgprintf("strmsrv: keep-alive set to %d\n", iVal);
pThis->bUseKeepAlive = iVal;
RETiRet;
}
diff --git a/runtime/wti.c b/runtime/wti.c
index 0b85c366..9e781341 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -312,7 +312,10 @@ wtiWorker(wti_t *pThis)
*/
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
- if(localRet == RS_RET_IDLE) {
+ if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) {
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
+ break; /* end of loop */
+ } else if(localRet == RS_RET_IDLE) {
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
d_pthread_mutex_unlock(pWtp->pmutUsr);
break; /* end of loop */
diff --git a/tcps_sess.c b/tcps_sess.c
index 15423cc8..921e05f2 100644
--- a/tcps_sess.c
+++ b/tcps_sess.c
@@ -253,12 +253,14 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, struct syslogTime *stTime, time_t ttG
CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime));
MsgSetRawMsg(pMsg, (char*)pThis->pMsg, pThis->iMsg);
MsgSetInputName(pMsg, pThis->pLstnInfo->pInputName);
- MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY);
+ MsgSetFlowControlType(pMsg, pThis->pSrv->bUseFlowControl
+ ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY);
pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
MsgSetRcvFrom(pMsg, pThis->fromHost);
CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP));
MsgSetRuleset(pMsg, pThis->pLstnInfo->pRuleset);
+ STATSCOUNTER_INC(pThis->pLstnInfo->ctrSubmit, pThis->pLstnInfo->mutCtrSubmit);
if(pMultiSub == NULL) {
CHKiRet(submitMsg(pMsg));
} else {
diff --git a/tcpsrv.c b/tcpsrv.c
index 95c45780..bdc66685 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -39,8 +39,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#include "config.h"
+#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
@@ -93,6 +93,7 @@ DEFobjCurrIf(netstrm)
DEFobjCurrIf(nssel)
DEFobjCurrIf(nspoll)
DEFobjCurrIf(prop)
+DEFobjCurrIf(statsobj)
/* add new listener port to listener port list
@@ -102,6 +103,7 @@ static inline rsRetVal
addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort)
{
tcpLstnPortList_t *pEntry;
+ uchar statname[64];
DEFiRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
@@ -121,6 +123,15 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort)
pEntry->pNext = pThis->pLstnPorts;
pThis->pLstnPorts = pEntry;
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&(pEntry->stats)));
+ snprintf((char*)statname, sizeof(statname), "%s(%s)", pThis->pszInputName, pszPort);
+ statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(statsobj.SetName(pEntry->stats, statname));
+ CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, &(pEntry->ctrSubmit)));
+ CHKiRet(statsobj.ConstructFinalize(pEntry->stats));
+
finalize_it:
RETiRet;
}
@@ -399,6 +410,10 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess,
ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED);
}
+ if(pThis->bUseKeepAlive) {
+ CHKiRet(netstrm.EnableKeepAlive(pNewStrm));
+ }
+
/* we found a free spot and can construct our session object */
CHKiRet(tcps_sess.Construct(&pSess));
CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis));
@@ -723,6 +738,7 @@ BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macr
pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
pThis->bDisableLFDelim = 0;
pThis->OnMsgReceive = NULL;
+ pThis->bUseFlowControl = 1;
ENDobjConstruct(tcpsrv)
@@ -869,6 +885,15 @@ SetUsrP(tcpsrv_t *pThis, void *pUsr)
}
static rsRetVal
+SetKeepAlive(tcpsrv_t *pThis, int iVal)
+{
+ DEFiRet;
+ dbgprintf("tcpsrv: keep-alive set to %d\n", iVal);
+ pThis->bUseKeepAlive = iVal;
+ RETiRet;
+}
+
+static rsRetVal
SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int))
{
DEFiRet;
@@ -998,6 +1023,18 @@ SetLstnMax(tcpsrv_t *pThis, int iMax)
}
+/* set if flow control shall be supported
+ */
+static rsRetVal
+SetUseFlowControl(tcpsrv_t *pThis, int bUseFlowControl)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->bUseFlowControl = bUseFlowControl;
+ RETiRet;
+}
+
+
/* set max number of sessions
* this must be called before ConstructFinalize, or it will have no effect!
* rgerhards, 2009-04-09
@@ -1035,11 +1072,13 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->create_tcp_socket = create_tcp_socket;
pIf->Run = Run;
+ pIf->SetKeepAlive = SetKeepAlive;
pIf->SetUsrP = SetUsrP;
pIf->SetInputName = SetInputName;
pIf->SetAddtlFrameDelim = SetAddtlFrameDelim;
pIf->SetbDisableLFDelim = SetbDisableLFDelim;
pIf->SetSessMax = SetSessMax;
+ pIf->SetUseFlowControl = SetUseFlowControl;
pIf->SetLstnMax = SetLstnMax;
pIf->SetDrvrMode = SetDrvrMode;
pIf->SetDrvrAuthMode = SetDrvrAuthMode;
@@ -1071,6 +1110,7 @@ CODESTARTObjClassExit(tcpsrv)
objRelease(tcps_sess, DONT_LOAD_LIB);
objRelease(conf, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
objRelease(ruleset, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
@@ -1097,6 +1137,7 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
CHKiRet(objUse(conf, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
/* set our own handlers */
diff --git a/tcpsrv.h b/tcpsrv.h
index 269863c2..2d60b473 100644
--- a/tcpsrv.h
+++ b/tcpsrv.h
@@ -24,6 +24,7 @@
#include "obj.h"
#include "prop.h"
#include "tcps_sess.h"
+#include "statsobj.h"
/* support for framing anomalies */
typedef enum ETCPsyslogFramingAnomaly {
@@ -39,6 +40,8 @@ struct tcpLstnPortList_s {
prop_t *pInputName;
tcpsrv_t *pSrv; /**< pointer to higher-level server instance */
ruleset_t *pRuleset; /**< associated ruleset */
+ statsobj_t *stats; /**< associated stats object */
+ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
tcpLstnPortList_t *pNext; /**< next port or NULL */
};
@@ -47,6 +50,7 @@ struct tcpLstnPortList_s {
/* the tcpsrv object */
struct tcpsrv_s {
BEGINobjInstance; /**< Data to implement generic object - MUST be the first data element! */
+ int bUseKeepAlive; /**< use socket layer KEEPALIVE handling? */
netstrms_t *pNS; /**< pointer to network stream subsystem */
int iDrvrMode; /**< mode of the stream driver to use */
uchar *pszDrvrAuthMode; /**< auth mode of the stream driver to use */
@@ -55,6 +59,7 @@ struct tcpsrv_s {
permittedPeers_t *pPermPeers;/**< driver's permitted peers */
sbool bEmitMsgOnClose; /**< emit an informational message when the remote peer closes connection */
sbool bUsingEPoll; /**< are we in epoll mode (means we do not need to keep track of sessions!) */
+ sbool bUseFlowControl; /**< use flow control (make light delayable) */
int iLstnCurr; /**< max nbr of listeners currently supported */
netstrm_t **ppLstn; /**< our netstream listners */
tcpLstnPortList_t **ppLstnPort; /**< pointer to relevant listen port description */
@@ -120,8 +125,12 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetNotificationOnRemoteClose)(tcpsrv_t *pThis, int bNewVal); /* 2009-10-01 */
/* added v9 -- rgerhards, 2010-03-01 */
rsRetVal (*SetbDisableLFDelim)(tcpsrv_t*, int);
+ /* added v10 -- rgerhards, 2011-04-01 */
+ rsRetVal (*SetUseFlowControl)(tcpsrv_t*, int);
+ /* added v11 -- rgerhards, 2011-05-09 */
+ rsRetVal (*SetKeepAlive)(tcpsrv_t*, int);
ENDinterface(tcpsrv)
-#define tcpsrvCURR_IF_VERSION 9 /* increment whenever you change the interface structure! */
+#define tcpsrvCURR_IF_VERSION 11 /* increment whenever you change the interface structure! */
/* change for v4:
* - SetAddtlFrameDelim() added -- rgerhards, 2008-12-10
* - SetInputName() added -- rgerhards, 2008-12-10
diff --git a/tests/inputfilegen.c b/tests/inputfilegen.c
index 26fb79af..0ff8d049 100644
--- a/tests/inputfilegen.c
+++ b/tests/inputfilegen.c
@@ -1,5 +1,6 @@
/* generate an input file suitable for use by the testbench
* Copyright (C) 2011 by Rainer Gerhards and Adiscon GmbH.
+ * usage: ./inputfilegen num-lines > file
* Part of rsyslog, licensed under GPLv3
*/
#include <stdio.h>
diff --git a/tests/tcpflood.c b/tests/tcpflood.c
index 8485acbb..8a34f06f 100644
--- a/tests/tcpflood.c
+++ b/tests/tcpflood.c
@@ -91,8 +91,10 @@
#include <errno.h>
#ifdef ENABLE_GNUTLS
# include <gnutls/gnutls.h>
-# include <gcrypt.h>
+# if GNUTLS_VERSION_NUMBER <= 0x020b00
+# include <gcrypt.h>
GCRY_THREAD_OPTION_PTHREAD_IMPL;
+# endif
#endif
#define EXIT_FAILURE 1
@@ -707,7 +709,9 @@ initTLS(void)
int r;
/* order of gcry_control and gnutls_global_init matters! */
+ #if GNUTLS_VERSION_NUMBER <= 0x020b00
gcry_control(GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
+ #endif
gnutls_global_init();
/* set debug mode, if so required by the options */
if(tlsLogLevel > 0) {
diff --git a/tests/testsuites/imfile-basic.conf b/tests/testsuites/imfile-basic.conf
index 9fb9b5ca..59b109a6 100644
--- a/tests/testsuites/imfile-basic.conf
+++ b/tests/testsuites/imfile-basic.conf
@@ -6,6 +6,7 @@ $InputFileTag file:
$InputFileStateFile stat-file1
$InputFileSeverity error
$InputFileFacility local7
+$InputFileMaxLinesAtOnce 100000
$InputRunFileMonitor
$template outfmt,"%msg:F,58:2%\n"
diff --git a/tools/omfile.c b/tools/omfile.c
index fbd263c7..b94acc6d 100644
--- a/tools/omfile.c
+++ b/tools/omfile.c
@@ -122,13 +122,11 @@ typedef struct s_dynaFileCacheEntry dynaFileCacheEntry;
#define USE_ASYNCWRITER_DFLT 0 /* default buffer use async writer */
#define FLUSHONTX_DFLT 1 /* default for flush on TX end */
-#define DFLT_bForceChown 0
/* globals for default values */
static int iDynaFileCacheSize = 10; /* max cache for dynamic files */
static int fCreateMode = 0644; /* mode to use when creating files */
static int fDirCreateMode = 0700; /* mode to use when creating files */
static int bFailOnChown; /* fail if chown fails? */
-static int bForceChown = DFLT_bForceChown; /* Force chown() on existing files? */
static uid_t fileUID; /* UID to be used for newly created files */
static uid_t fileGID; /* GID to be used for newly created files */
static uid_t dirUID; /* UID to be used for newly created directories */
@@ -153,7 +151,6 @@ typedef struct _instanceData {
int fDirCreateMode; /* creation mode for mkdir() */
int bCreateDirs; /* auto-create directories? */
int bSyncFile; /* should the file by sync()'ed? 1- yes, 0- no */
- sbool bForceChown; /* force chown() on existing files? */
uid_t fileUID; /* IDs for creation */
uid_t dirUID;
gid_t fileGID;
@@ -200,7 +197,6 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tfile cache size=%d\n", pData->iDynaFileCacheSize);
dbgprintf("\tcreate directories: %s\n", pData->bCreateDirs ? "yes" : "no");
dbgprintf("\tfile owner %d, group %d\n", (int) pData->fileUID, (int) pData->fileGID);
- dbgprintf("\tforce chown() for all files: %s\n", pData->bForceChown ? "yes" : "no");
dbgprintf("\tdirectory owner %d, group %d\n", (int) pData->dirUID, (int) pData->dirGID);
dbgprintf("\tdir create mode 0%3.3o, file create mode 0%3.3o\n",
pData->fDirCreateMode, pData->fCreateMode);
@@ -239,6 +235,14 @@ rsRetVal setDynaFileCacheSize(void __attribute__((unused)) *pVal, int iNewVal)
}
+rsRetVal goneAway(void __attribute__((unused)) *pVal,
+ int __attribute__((unused)) iNewVal)
+{
+ errmsg.LogError(0, RS_RET_ERR, "directive $omfileForceChown is no longer supported");
+ return RS_RET_ERR;
+}
+
+
/* Helper to cfline(). Parses a output channel name up until the first
* comma and then looks for the template specifier. Tries
* to find that template. Maps the output channel to the
@@ -389,22 +393,7 @@ prepareFile(instanceData *pData, uchar *newFileName)
DEFiRet;
pData->pStrm = NULL;
- if(access((char*)newFileName, F_OK) == 0) {
- if(pData->bForceChown) {
- /* Try to fix wrong ownership set by someone else. Note that this code
- * will no longer work once we have made the $PrivDrop code fully secure.
- * This change is based on an idea of Michael Terry, provided as part of
- * the effort to make rsyslogd the Ubuntu default syslogd.
- * rgerhards, 2009-09-11
- */
- if(chown((char*)newFileName, pData->fileUID, pData->fileGID) != 0) {
- if(pData->bFailOnChown) {
- int eSave = errno;
- errno = eSave;
- }
- }
- }
- } else {
+ if(access((char*)newFileName, F_OK) != 0) {
/* file does not exist, create it (and eventually parent directories */
if(pData->bCreateDirs) {
/* We first need to create parent dirs if they are missing.
@@ -424,7 +413,7 @@ prepareFile(instanceData *pData, uchar *newFileName)
pData->fCreateMode);
if(fd != -1) {
/* check and set uid/gid */
- if(pData->bForceChown || pData->fileUID != (uid_t)-1 || pData->fileGID != (gid_t) -1) {
+ if(pData->fileUID != (uid_t)-1 || pData->fileGID != (gid_t) -1) {
/* we need to set owner/group */
if(fchown(fd, pData->fileUID, pData->fileGID) != 0) {
if(pData->bFailOnChown) {
@@ -654,6 +643,9 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData)
} else { /* "regular", non-dynafile */
if(pData->pStrm == NULL) {
CHKiRet(prepareFile(pData, pData->f_fname));
+ if(pData->pStrm == NULL) {
+ errmsg.LogError(0, RS_RET_NO_FILE_ACCESS, "Could no open output file '%s'", pData->f_fname);
+ }
}
}
@@ -811,7 +803,6 @@ CODESTARTparseSelectorAct
pData->fDirCreateMode = fDirCreateMode;
pData->bCreateDirs = bCreateDirs;
pData->bFailOnChown = bFailOnChown;
- pData->bForceChown = bForceChown;
pData->fileUID = fileUID;
pData->fileGID = fileGID;
pData->dirUID = dirUID;
@@ -821,18 +812,6 @@ CODESTARTparseSelectorAct
pData->iIOBufSize = (int) iIOBufSize;
pData->iFlushInterval = iFlushInterval;
pData->bUseAsyncWriter = bUseAsyncWriter;
-
- if(pData->bDynamicName == 0) {
- /* try open and emit error message if not possible. At this stage, we ignore the
- * return value of prepareFile, this is taken care of in later steps.
- */
- prepareFile(pData, pData->f_fname);
-
- if(pData->pStrm == NULL) {
- DBGPRINTF("Error opening log file: %s\n", pData->f_fname);
- errmsg.LogError(0, RS_RET_NO_FILE_ACCESS, "Could not open output file '%s'", pData->f_fname);
- }
- }
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -847,7 +826,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
dirUID = -1;
dirGID = -1;
bFailOnChown = 1;
- bForceChown = DFLT_bForceChown;
iDynaFileCacheSize = 10;
fCreateMode = 0644;
fDirCreateMode = 0700;
@@ -922,7 +900,7 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(omsdRegCFSLineHdlr((uchar *)"filecreatemode", 0, eCmdHdlrFileCreateMode, NULL, &fCreateMode, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"createdirs", 0, eCmdHdlrBinary, NULL, &bCreateDirs, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"failonchownfailure", 0, eCmdHdlrBinary, NULL, &bFailOnChown, STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"omfileForceChown", 0, eCmdHdlrBinary, NULL, &bForceChown, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"omfileforcechown", 0, eCmdHdlrBinary, goneAway, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionfileenablesync", 0, eCmdHdlrBinary, NULL, &bEnableSync, STD_LOADABLE_MODULE_ID));
CHKiRet(regCfSysLineHdlr((uchar *)"actionfiledefaulttemplate", 0, eCmdHdlrGetWord, NULL, &pszFileDfltTplName, NULL));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
diff --git a/tools/omfwd.c b/tools/omfwd.c
index 10cce0e2..b456db17 100644
--- a/tools/omfwd.c
+++ b/tools/omfwd.c
@@ -4,30 +4,26 @@
* NOTE: read comments in module-template.h to understand how this file
* works!
*
- * File begun on 2007-07-20 by RGerhards (extracted from syslogd.c)
- * This file is under development and has not yet arrived at being fully
- * self-contained and a real object. So far, it is mostly an excerpt
- * of the "old" message code without any modifications. However, it
- * helps to have things at the right place one we go to the meat of it.
+ * File begun on 2007-07-20 by RGerhards (extracted from syslogd.c, which
+ * at the time of rsyslog fork from sysklogd was under BSD license)
*
- * Copyright 2007, 2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2012 Adiscon GmbH.
*
* This file is part of rsyslog.
*
- * Rsyslog is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * Rsyslog is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
- *
- * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
#include "config.h"
#include "rsyslog.h"
@@ -131,22 +127,6 @@ pData->bIsConnected = 0; // TODO: remove this variable altogether
}
-/* get the syslog forward port from selector_t. The passed in
- * struct must be one that is setup for forwarding.
- * rgerhards, 2007-06-28
- * We may change the implementation to try to lookup the port
- * if it is unspecified. So far, we use the IANA default auf 514.
- */
-static char *getFwdPt(instanceData *pData)
-{
- assert(pData != NULL);
- if(pData->port == NULL)
- return("514");
- else
- return(pData->port);
-}
-
-
/* destruct the TCP helper objects
* This, for example, is needed after something went wrong.
* This function is void because it "can not" fail.
@@ -345,7 +325,7 @@ static rsRetVal TCPSendInit(void *pvData)
}
/* params set, now connect */
CHKiRet(netstrm.Connect(pData->pNetstrm, glbl.GetDefPFFamily(),
- (uchar*)getFwdPt(pData), (uchar*)pData->f_hname));
+ (uchar*)pData->port, (uchar*)pData->f_hname));
}
finalize_it:
@@ -378,9 +358,9 @@ static rsRetVal doTryResume(instanceData *pData)
hints.ai_flags = AI_NUMERICSERV;
hints.ai_family = glbl.GetDefPFFamily();
hints.ai_socktype = SOCK_DGRAM;
- if((iErr = (getaddrinfo(pData->f_hname, getFwdPt(pData), &hints, &res))) != 0) {
+ if((iErr = (getaddrinfo(pData->f_hname, pData->port, &hints, &res))) != 0) {
dbgprintf("could not get addrinfo for hostname '%s':'%s': %d%s\n",
- pData->f_hname, getFwdPt(pData), iErr, gai_strerror(iErr));
+ pData->f_hname, pData->port, iErr, gai_strerror(iErr));
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
dbgprintf("%s found, resuming.\n", pData->f_hname);
@@ -412,15 +392,18 @@ CODESTARTtryResume
ENDtryResume
BEGINdoAction
- char *psz = NULL; /* temporary buffering */
+ char *psz; /* temporary buffering */
register unsigned l;
int iMaxLine;
+# ifdef USE_NETZIP
+ Bytef *out = NULL; /* for compression */
+# endif
CODESTARTdoAction
CHKiRet(doTryResume(pData));
iMaxLine = glbl.GetMaxLine();
- dbgprintf(" %s:%s/%s\n", pData->f_hname, getFwdPt(pData),
+ dbgprintf(" %s:%s/%s\n", pData->f_hname, pData->port,
pData->protocol == FORW_UDP ? "udp" : "tcp");
psz = (char*) ppString[0];
@@ -438,7 +421,6 @@ CODESTARTdoAction
* rgerhards, 2006-11-30
*/
if(pData->compressionLevel && (l > CONF_MIN_SIZE_FOR_COMPRESS)) {
- Bytef *out;
uLongf destLen = iMaxLine + iMaxLine/100 +12; /* recommended value from zlib doc */
uLong srcLen = l;
int ret;
@@ -459,14 +441,11 @@ CODESTARTdoAction
* rgerhards, 2006-11-30
*/
dbgprintf("Compression failed, sending uncompressed message\n");
- free(out);
} else if(destLen+1 < l) {
/* only use compression if there is a gain in using it! */
dbgprintf("there is gain in compression, so we do it\n");
psz = (char*) out;
l = destLen + 1; /* take care for the "z" at message start! */
- } else {
- free(out);
}
++destLen;
}
@@ -488,10 +467,7 @@ CODESTARTdoAction
}
finalize_it:
# ifdef USE_NETZIP
- if((psz != NULL) && (psz != (char*) ppString[0])) {
- /* we need to free temporary buffer, alloced above - Naoya Nakazawa, 2010-01-11 */
- free(psz);
- }
+ free(out); /* is NULL if it was never used... */
# endif
ENDdoAction
@@ -630,12 +606,16 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
if(pData->port == NULL) {
errmsg.LogError(0, NO_ERRCODE, "Could not get memory to store syslog forwarding port, "
"using default port, results may not be what you intend\n");
- /* we leave f_forw.port set to NULL, this is then handled by getFwdPt(). */
+ /* we leave f_forw.port set to NULL, this is then handled below */
} else {
memcpy(pData->port, tmp, i);
*(pData->port + i) = '\0';
}
}
+ /* check if no port is set. If so, we use the IANA-assigned port of 514 */
+ if(pData->port == NULL) {
+ CHKmalloc(pData->port = strdup("514"));
+ }
/* now skip to template */
while(*p && *p != ';' && *p != '#' && !isspace((int) *p))
diff --git a/tools/ompipe.c b/tools/ompipe.c
index 52f1c60e..1a9d610f 100644
--- a/tools/ompipe.c
+++ b/tools/ompipe.c
@@ -71,6 +71,7 @@ DEFobjCurrIf(errmsg)
typedef struct _instanceData {
uchar f_fname[MAXFNAME];/* pipe or template name (display only) */
short fd; /* pipe descriptor for (current) pipe */
+ sbool bHadError; /* did we already have/report an error on this pipe? */
} instanceData;
@@ -100,6 +101,17 @@ preparePipe(instanceData *pData)
{
DEFiRet;
pData->fd = open((char*) pData->f_fname, O_RDWR|O_NONBLOCK|O_CLOEXEC);
+ if(pData->fd < 0 ) {
+ pData->fd = -1;
+ if(!pData->bHadError) {
+ char errStr[1024];
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(0, RS_RET_NO_FILE_ACCESS, "Could no open output pipe '%s': %s",
+ pData->f_fname, errStr);
+ pData->bHadError = 1;
+ }
+ DBGPRINTF("Error opening log pipe: %s\n", pData->f_fname);
+ }
RETiRet;
}
@@ -149,6 +161,7 @@ finalize_it:
BEGINcreateInstance
CODESTARTcreateInstance
pData->fd = -1;
+ pData->bHadError = 0;
ENDcreateInstance
@@ -203,11 +216,6 @@ CODESTARTparseSelectorAct
*/
preparePipe(pData);
- if(pData->fd < 0 ) {
- pData->fd = -1;
- DBGPRINTF("Error opening log pipe: %s\n", pData->f_fname);
- errmsg.LogError(0, RS_RET_NO_FILE_ACCESS, "Could not open output pipe '%s'", pData->f_fname);
- }
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
diff --git a/tools/omusrmsg.c b/tools/omusrmsg.c
index a8ad568b..9130e403 100644
--- a/tools/omusrmsg.c
+++ b/tools/omusrmsg.c
@@ -269,6 +269,7 @@ ENDdoAction
BEGINparseSelectorAct
uchar *q;
int i;
+ int bHadWarning = 0;
CODESTARTparseSelectorAct
CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* User names must begin with a gnu e-regex:
@@ -277,9 +278,18 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
*/
if(!strncmp((char*) p, ":omusrmsg:", sizeof(":omusrmsg:") - 1)) {
p += sizeof(":omusrmsg:") - 1; /* eat indicator sequence (-1 because of '\0'!) */
- } else if(!*p || !((*p >= 'a' && *p <= 'z') || (*p >= 'A' && *p <= 'Z')
- || (*p >= '0' && *p <= '9') || *p == '_' || *p == '.' || *p == '*'))
+ } else {
+ if(!*p || !((*p >= 'a' && *p <= 'z') || (*p >= 'A' && *p <= 'Z')
+ || (*p >= '0' && *p <= '9') || *p == '_' || *p == '.' || *p == '*')) {
ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+ } else {
+ errmsg.LogError(0, RS_RET_OUTDATED_STMT,
+ "action '%s' treated as ':omusrmsg:%s' - please "
+ "change syntax, '%s' will not be supported in the future",
+ p, p, p);
+ bHadWarning = 1;
+ }
+ }
CHKiRet(createInstance(&pData));
@@ -313,6 +323,8 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
!= RS_RET_OK)
goto finalize_it;
}
+ if(iRet == RS_RET_OK && bHadWarning)
+ iRet = RS_RET_OK_WARN;
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
diff --git a/tools/syslogd.c b/tools/syslogd.c
index bec2f9a7..9553ad08 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -334,6 +334,7 @@ static uchar template_WallFmt[] = "\"\r\n\7Message from syslogd@%HOSTNAME% at %t
static uchar template_StdUsrMsgFmt[] = "\" %syslogtag%%msg%\n\r\"";
static uchar template_StdDBFmt[] = "\"insert into SystemEvents (Message, Facility, FromHost, Priority, DeviceReportedTime, ReceivedAt, InfoUnitID, SysLogTag) values ('%msg%', %syslogfacility%, '%HOSTNAME%', %syslogpriority%, '%timereported:::date-mysql%', '%timegenerated:::date-mysql%', %iut%, '%syslogtag%')\",SQL";
static uchar template_StdPgSQLFmt[] = "\"insert into SystemEvents (Message, Facility, FromHost, Priority, DeviceReportedTime, ReceivedAt, InfoUnitID, SysLogTag) values ('%msg%', %syslogfacility%, '%HOSTNAME%', %syslogpriority%, '%timereported:::date-pgsql%', '%timegenerated:::date-pgsql%', %iut%, '%syslogtag%')\",STDSQL";
+static uchar template_SysklogdFileFormat[] = "\"%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg%\n\"";
static uchar template_StdJSONFmt[] = "\"{\\\"message\\\":\\\"%msg%\\\",\\\"fromhost\\\":\\\"%HOSTNAME%\\\",\\\"facility\\\":\\\"%syslogfacility-text%\\\",\\\"priority\\\":\\\"%syslogpriority-text%\\\",\\\"timereported\\\":\\\"%timereported:::date-rfc3339%\\\",\\\"timegenerated\\\":\\\"%timegenerated:::date-rfc3339%\\\"}\",JSON";
static uchar template_spoofadr[] = "\"%fromhost-ip%\"";
/* end templates */
@@ -460,8 +461,15 @@ void untty(void)
#else
{
int i;
+ pid_t pid;
if(!Debug) {
+ pid = getpid();
+ if (setpgid(pid, pid) < 0) {
+ perror("setpgid");
+ exit(1);
+ }
+
i = open(_PATH_TTY, O_RDWR|O_CLOEXEC);
if (i >= 0) {
# if !defined(__hpux)
@@ -2172,6 +2180,8 @@ static rsRetVal mainThread()
tplAddLine(" StdUsrMsgFmt", &pTmp);
pTmp = template_StdDBFmt;
tplAddLine(" StdDBFmt", &pTmp);
+ pTmp = template_SysklogdFileFormat;
+ tplAddLine("RSYSLOG_SysklogdFileFormat", &pTmp);
pTmp = template_StdPgSQLFmt;
tplAddLine(" StdPgSQLFmt", &pTmp);
pTmp = template_StdJSONFmt;