summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog123
-rw-r--r--Makefile.am12
-rw-r--r--action.c25
-rw-r--r--action.h1
-rw-r--r--configure.ac110
-rw-r--r--doc/imfile.html8
-rw-r--r--doc/imptcp.html5
-rw-r--r--doc/manual.html3
-rw-r--r--doc/ommysql.html12
-rw-r--r--plugins/imfile/imfile.c66
-rw-r--r--plugins/imptcp/imptcp.c269
-rw-r--r--plugins/imudp/Makefile.am2
-rw-r--r--plugins/imudp/imudp.c168
-rw-r--r--plugins/imuxsock/imuxsock.c111
-rw-r--r--plugins/omhdfs/omhdfs.c2
-rw-r--r--plugins/ommongodb/Makefile.am11
-rw-r--r--plugins/ommongodb/README23
-rw-r--r--plugins/ommongodb/ommongodb.c280
-rw-r--r--plugins/ommysql/ommysql.c39
-rw-r--r--plugins/pmaixforwardedfrom/Makefile.am8
-rw-r--r--plugins/pmaixforwardedfrom/pmaixforwardedfrom.c167
-rw-r--r--plugins/pmcisconames/Makefile.am8
-rw-r--r--plugins/pmcisconames/pmcisconames.c177
-rw-r--r--plugins/pmsnare/Makefile.am8
-rw-r--r--plugins/pmsnare/pmsnare.c238
-rw-r--r--runtime/batch.h8
-rw-r--r--runtime/datetime.h1
-rw-r--r--runtime/errmsg.h1
-rw-r--r--runtime/expr.h1
-rw-r--r--runtime/glbl.c37
-rw-r--r--runtime/modules.h1
-rw-r--r--runtime/msg.c10
-rw-r--r--runtime/msg.h3
-rw-r--r--runtime/queue.c22
-rw-r--r--runtime/queue.h1
-rw-r--r--runtime/rsyslog.c18
-rw-r--r--runtime/rsyslog.h8
-rw-r--r--runtime/ruleset.c23
-rw-r--r--runtime/stream.c142
-rw-r--r--runtime/stream.h5
-rw-r--r--runtime/stringbuf.c2
-rw-r--r--runtime/wtp.c6
-rw-r--r--tests/Makefile.am24
-rwxr-xr-xtests/imuxsock_ccmiddle_root.sh18
-rwxr-xr-xtests/imuxsock_logger_root.sh18
-rwxr-xr-xtests/imuxsock_traillf_root.sh18
-rw-r--r--tests/resultdata/imuxsock_ccmiddle.log1
-rw-r--r--tests/resultdata/imuxsock_logger.log1
-rw-r--r--tests/resultdata/imuxsock_traillf.log1
-rw-r--r--tests/syslog_inject.c28
-rw-r--r--tests/testsuites/imuxsock_ccmiddle_root.conf7
-rw-r--r--tests/testsuites/imuxsock_logger_root.conf7
-rw-r--r--tests/testsuites/imuxsock_traillf_root.conf7
-rw-r--r--threads.c16
-rw-r--r--tools/iminternal.c7
-rw-r--r--tools/iminternal.h5
-rw-r--r--tools/syslogd.c47
57 files changed, 2099 insertions, 271 deletions
diff --git a/ChangeLog b/ChangeLog
index 66743ad0..1d2e3ce1 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -3,13 +3,54 @@
- improved TLS startup (Diffie-Hellman bits do not need to be generated,
as we do not support full anon key exchange -- we always need certs)
---------------------------------------------------------------------------
-Version 6.1.3 [DEVEL] (rgerhards), 2010-12-??
+Version 6.1.5 [DEVEL] (rgerhards), 2011-02-??
+- bugfix: fixed a memory leak and potential abort condition
+ this could happen if multiple rulesets were used and some output batches
+ contained messages belonging to more than one ruleset.
+ fixes: http://bugzilla.adiscon.com/show_bug.cgi?id=226
+ fixes: http://bugzilla.adiscon.com/show_bug.cgi?id=218
+- bugfix: memory leak when $RepeatedMsgReduction on was used
+ bug tracker: http://bugzilla.adiscon.com/show_bug.cgi?id=225
+---------------------------------------------------------------------------
+Version 6.1.4 [DEVEL] (rgerhards), 2011-02-18
+- bugfix/omhdfs: directive $OMHDFSFileName rendered unusable
+ due to a search and replace-induced bug ;)
+- bugfix: minor race condition in action.c - considered cosmetic
+ This is considered cosmetic as multiple threads tried to write exactly
+ the same value into the same memory location without sync. The method
+ has been changed so this can no longer happen.
+- added pmsnare parser module (written by David Lang)
+- enhanced imfile to support non-cancel input termination
+- improved systemd socket activation thanks to Marius Tomaschweski
+- improved error reporting for $WorkDirectory
+ non-existance and other detectable problems are now reported,
+ and the work directory is NOT set in this case
+- bugfix: pmsnare causded abort under some conditions
+- bugfix: abort if imfile reads file line of more than 64KiB
+ Thanks to Peter Eisentraut for reporting and analysing this problem.
+ bug tracker: http://bugzilla.adiscon.com/show_bug.cgi?id=221
+- bugfix: queue engine did not properly slow down inputs in FULL_DELAY mode
+ when in disk-assisted mode. This especially affected imfile, which
+ created unnecessarily queue files if a large set of input file data was
+ to process.
+- bugfix: very long running actions could prevent shutdown under some
+ circumstances. This has now been solved, at least for common
+ situations.
+- bugfix: fixed compile problem due to empty structs
+ this occured only on some platforms/compilers. thanks to Dražen Kačar
+ for the fix
+---------------------------------------------------------------------------
+Version 6.1.3 [DEVEL] (rgerhards), 2011-02-01
+- experimental support for monogodb added
- added $IMUDPSchedulingPolicy and $IMUDPSchedulingPriority config settings
- added $LocalHostName config directive
- improved tcpsrv performance by enabling multiple-entry epoll
so far, we always pulled a single event from the epoll interface.
Now 128, what should result in performance improvement (less API
calls) on busy systems. Most importantly affects imtcp.
+- imptcp now supports non-cancel termination mode, a plus in stability
+- imptcp speedup: multiple worker threads can now be used to read data
+- new directive $InputIMPTcpHelperThreads added
- bugfix: fixed build problems on some platforms
namely those that have 32bit atomic operations but not 64 bit ones
- bugfix: local hostname was pulled too-early, so that some config
@@ -17,6 +58,8 @@ Version 6.1.3 [DEVEL] (rgerhards), 2010-12-??
- enhanced tcpflood to support multiple sender threads
this is required for some high-throughput scenarios (and necessary to
run some performance tests, because otherwise the sender is too slow).
+- added some new custom parsers (snare, aix, some Cisco "specialities")
+ thanks to David Lang
---------------------------------------------------------------------------
Version 6.1.2 [DEVEL] (rgerhards), 2010-12-16
- added experimental support for log normalizaton (via liblognorm)
@@ -70,13 +113,63 @@ expected that interfaces, even new ones, break during the initial
syslog plain tcp input plugin (NOT supporting TLS!)
[ported from v4]
---------------------------------------------------------------------------
-Version 5.7.3 [V5-DEVEL] (rgerhards), 2010-12-??
+Version 5.7.6 [V5-BETA] (rgerhards), 2011-02-25
+- bugfix: fixed a memory leak and potential abort condition
+ this could happen if multiple rulesets were used and some output batches
+ contained messages belonging to more than one ruleset.
+ fixes: http://bugzilla.adiscon.com/show_bug.cgi?id=226
+ fixes: http://bugzilla.adiscon.com/show_bug.cgi?id=218
+- bugfix: memory leak when $RepeatedMsgReduction on was used
+ bug tracker: http://bugzilla.adiscon.com/show_bug.cgi?id=225
+---------------------------------------------------------------------------
+Version 5.7.5 [V5-BETA] (rgerhards), 2011-02-23
+- enhance: imfile did not yet support multiple rulesets, now added
+ we do this directly in the beta because a) it does not affect existing
+ functionality and b) one may argue that this missing functionality is
+ close to a bug.
+- improved testbench, added tests for imuxsock
+- bugfix: imuxsock did no longer sanitize received messages
+ This was a regression from the imuxsock partial rewrite. Happened
+ because the message is no longer run through the standard parsers.
+ bug tracker: http://bugzilla.adiscon.com/show_bug.cgi?id=224
+- bugfix: minor race condition in action.c - considered cosmetic
+ This is considered cosmetic as multiple threads tried to write exactly
+ the same value into the same memory location without sync. The method
+ has been changed so this can no longer happen.
+---------------------------------------------------------------------------
+Version 5.7.4 [V5-BETA] (rgerhards), 2011-02-17
+- added pmsnare parser module (written by David Lang)
+- enhanced imfile to support non-cancel input termination
+- improved systemd socket activation thanks to Marius Tomaschweski
+- improved error reporting for $WorkDirectory
+ non-existance and other detectable problems are now reported,
+ and the work directory is NOT set in this case
+- bugfix: pmsnare causded abort under some conditions
+- bugfix: abort if imfile reads file line of more than 64KiB
+ Thanks to Peter Eisentraut for reporting and analysing this problem.
+ bug tracker: http://bugzilla.adiscon.com/show_bug.cgi?id=221
+- bugfix: queue engine did not properly slow down inputs in FULL_DELAY mode
+ when in disk-assisted mode. This especially affected imfile, which
+ created unnecessarily queue files if a large set of input file data was
+ to process.
+- bugfix: very long running actions could prevent shutdown under some
+ circumstances. This has now been solved, at least for common
+ situations.
+- bugfix: fixed compile problem due to empty structs
+ this occured only on some platforms/compilers. thanks to Dražen Kačar
+ for the fix
+---------------------------------------------------------------------------
+Version 5.7.3 [V5-BETA] (rgerhards), 2011-02-07
+- added support for processing multi-line messages in imfile
- added $IMUDPSchedulingPolicy and $IMUDPSchedulingPriority config settings
- added $LocalHostName config directive
- bugfix: fixed build problems on some platforms
namely those that have 32bit atomic operations but not 64 bit ones
- bugfix: local hostname was pulled too-early, so that some config
directives (namely FQDN settings) did not have any effect
+- bugfix: imfile did duplicate messages under some circumstances
+- added $OMMySQLConfigFile config directive
+- added $OMMySQLConfigSection config directive
---------------------------------------------------------------------------
Version 5.7.2 [V5-DEVEL] (rgerhards), 2010-11-26
- bugfix(important): problem in TLS handling could cause rsyslog to loop
@@ -123,8 +216,12 @@ Version 5.7.0 [V5-DEVEL] (rgerhards), 2010-09-16
thanks to Lennart Poettering for this patch
* sd-systemd API added as part of rsyslog runtime library
---------------------------------------------------------------------------
-Version 5.6.3 [V5-STABLE] (rgerhards), 2010-12-??
-- bugfix: action processor released mememory too early, resulting in
+Version 5.6.4 [V5-STABLE] (rgerhards), 2011-02-25
+- bugfix: memory leak when $RepeatedMsgReduction on was used
+ bug tracker: http://bugzilla.adiscon.com/show_bug.cgi?id=225
+---------------------------------------------------------------------------
+Version 5.6.3 [V5-STABLE] (rgerhards), 2011-01-26
+- bugfix: action processor released memory too early, resulting in
potential issue in retry cases (but very unlikely due to another
bug, which I also fixed -- only after the fix this problem here
became actually visible).
@@ -136,6 +233,9 @@ Version 5.6.3 [V5-STABLE] (rgerhards), 2010-12-??
it!)
- bugfix: batches which had actions in error were not properly retried in
all cases
+- bugfix: imfile did duplicate messages under some circumstances
+- bugfix: testbench was not activated if no Java was present on system
+ ... what actually was a left-over. Java is no longer required.
---------------------------------------------------------------------------
Version 5.6.2 [V5-STABLE] (rgerhards), 2010-11-30
- bugfix: compile failed on systems without epoll_create1()
@@ -712,6 +812,21 @@ Version 4.7.0 [v4-devel] (rgerhards), 2010-04-14
- imported changes from 4.5.6 and below
---------------------------------------------------------------------------
Version 4.6.6 [v4-stable] (rgerhards), 2010-11-??
+- bugfix: imfile potentially duplicates lines
+ This can happen when 0 bytes are read from the input file, and some
+ writer appends data to the file BEFORE we check if a rollover happens.
+ The check for rollover uses the inode and size as a criterion. So far,
+ we checked for equality of sizes, which is not given in this scenario,
+ but that does not indicate a rollover. From the source code comments:
+ Note that when we check the size, we MUST NOT check for equality.
+ The reason is that the file may have been written right after we
+ did try to read (so the file size has increased). That is NOT in
+ indicator of a rollover (this is an actual bug scenario we
+ experienced). So we need to check if the new size is smaller than
+ what we already have seen!
+ Also, under some circumstances an invalid truncation was detected. This
+ code has now been removed, a file change (and thus resent) is only
+ detected if the inode number changes.
- bugfix: a couple of problems that imfile had on some platforms, namely
Ubuntu (not their fault, but occured there)
- bugfix: imfile utilizes 32 bit to track offset. Most importantly,
diff --git a/Makefile.am b/Makefile.am
index a8a53dfb..b63e9ecc 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -123,6 +123,18 @@ if ENABLE_OMSTDOUT
SUBDIRS += plugins/omstdout
endif
+if ENABLE_PMCISCONAMES
+SUBDIRS += plugins/pmcisconames
+endif
+
+if ENABLE_PMAIXFORWARDEDFROM
+SUBDIRS += plugins/pmaixforwardedfrom
+endif
+
+if ENABLE_PMSNARE
+SUBDIRS += plugins/pmsnare
+endif
+
if ENABLE_PMLASTMSG
SUBDIRS += plugins/pmlastmsg
endif
diff --git a/action.c b/action.c
index 81195e8b..804c4645 100644
--- a/action.c
+++ b/action.c
@@ -500,7 +500,8 @@ static inline void actionSuspend(action_t *pThis, time_t ttNow)
* kind of facility: in the first place, the module should return a proper indication
* of its inability to recover. -- rgerhards, 2010-04-26.
*/
-static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
+static inline rsRetVal
+actionDoRetry(action_t *pThis, time_t ttNow, int *pbShutdownImmediate)
{
int iRetries;
int iSleepPeriod;
@@ -510,7 +511,7 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
ASSERT(pThis != NULL);
iRetries = 0;
- while((*pThis->pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) {
+ while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) {
iRet = pThis->pMod->tryResume(pThis->pModData);
if((pThis->iResumeOKinRow > 999) && (pThis->iResumeOKinRow % 1000 == 0)) {
bTreatOKasSusp = 1;
@@ -530,7 +531,7 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
iSleepPeriod = pThis->iResumeInterval;
ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */
srSleep(iSleepPeriod, 0);
- if(*pThis->pbShutdownImmediate) {
+ if(*pbShutdownImmediate) {
ABORT_FINALIZE(RS_RET_FORCE_TERM);
}
}
@@ -551,7 +552,7 @@ finalize_it:
/* try to resume an action -- rgerhards, 2007-08-02
* changed to new action state engine -- rgerhards, 2009-05-07
*/
-static rsRetVal actionTryResume(action_t *pThis)
+static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate)
{
DEFiRet;
time_t ttNow = NO_TIME_PROVIDED;
@@ -575,7 +576,7 @@ static rsRetVal actionTryResume(action_t *pThis)
if(pThis->eState == ACT_STATE_RTRY) {
if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */
datetime.GetTime(&ttNow);
- CHKiRet(actionDoRetry(pThis, ttNow));
+ CHKiRet(actionDoRetry(pThis, ttNow, pbShutdownImmediate));
}
if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) {
@@ -592,12 +593,12 @@ finalize_it:
* depending on its current state.
* rgerhards, 2009-05-07
*/
-static inline rsRetVal actionPrepare(action_t *pThis)
+static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate)
{
DEFiRet;
assert(pThis != NULL);
- CHKiRet(actionTryResume(pThis));
+ CHKiRet(actionTryResume(pThis, pbShutdownImmediate));
/* if we are now ready, we initialize the transaction and advance
* action state accordingly
@@ -806,14 +807,14 @@ finalize_it:
* rgerhards, 2008-01-28
*/
static inline rsRetVal
-actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams)
+actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate)
{
DEFiRet;
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
- CHKiRet(actionPrepare(pThis));
+ CHKiRet(actionPrepare(pThis, pbShutdownImmediate));
if(pThis->eState == ACT_STATE_ITX)
CHKiRet(actionCallDoAction(pThis, pMsg, actParams));
@@ -840,7 +841,7 @@ finishBatch(action_t *pThis, batch_t *pBatch)
FINALIZE; /* nothing to do */
}
- CHKiRet(actionPrepare(pThis));
+ CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate));
if(pThis->eState == ACT_STATE_ITX) {
iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData);
switch(iRet) {
@@ -907,7 +908,8 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
&& pBatch->pElem[i].state != BATCH_STATE_DISC
&& ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
- localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams);
+ localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
+ pBatch->pbShutdownImmediate);
DBGPRINTF("action call returned %d\n", localRet);
/* Note: we directly modify the batch object state, because we know that
* wo do not overwrite BATCH_STATE_DISC indicators!
@@ -1078,7 +1080,6 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
pbShutdownImmdtSave = pBatch->pbShutdownImmediate;
pBatch->pbShutdownImmediate = pbShutdownImmediate;
- pAction->pbShutdownImmediate = pBatch->pbShutdownImmediate;
CHKiRet(prepareBatch(pAction, pBatch));
/* We now must guard the output module against execution by multiple threads. The
diff --git a/action.h b/action.h
index 56a9d3e0..ae2b5184 100644
--- a/action.h
+++ b/action.h
@@ -88,7 +88,6 @@ struct action_s {
SYNC_OBJ_TOOL; /* required for mutex support */
pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */
uchar *pszName; /* action name (for documentation) */
- int *pbShutdownImmediate;/* to facilitate shutdown, if var is 1, shut down immediately */
DEF_ATOMIC_HELPER_MUT(mutCAS);
};
diff --git a/configure.ac b/configure.ac
index 1cde5935..bd0d6e62 100644
--- a/configure.ac
+++ b/configure.ac
@@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ(2.61)
-AC_INIT([rsyslog],[6.1.2],[rsyslog@lists.adiscon.com])
+AC_INIT([rsyslog],[6.1.5],[rsyslog@lists.adiscon.com])
AM_INIT_AUTOMAKE
m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])])
@@ -278,6 +278,36 @@ if test "x$enable_pthreads" != "xno"; then
)
fi
+AC_CHECK_FUNCS(
+ [pthread_setschedparam],
+ [
+ rsyslog_have_pthread_setschedparam=yes
+ ],
+ [
+ rsyslog_have_pthread_setschedparam=no
+ ]
+)
+AC_CHECK_HEADERS(
+ [sched.h],
+ [
+ rsyslog_have_sched_h=yes
+ ],
+ [
+ rsyslog_have_sched_h=no
+ ]
+)
+if test "$rsyslog_have_pthread_setschedparam" = "yes" -a "$rsyslog_have_sched_h" = "yes"; then
+ save_LIBS=$LIBS
+ LIBS=
+ AC_SEARCH_LIBS(sched_get_priority_max, rt)
+ if test "x$ac_cv_search" != "xno"; then
+ AC_CHECK_FUNCS(sched_get_priority_max)
+ fi
+ IMUDP_LIBS=$LIBS
+ AC_SUBST(IMUDP_LIBS)
+ LIBS=$save_LIBS
+fi
+
# klog
AC_ARG_ENABLE(klog,
@@ -794,7 +824,7 @@ AC_SUBST(LIBLOGGING_LIBS)
# enable/disable the testbench (e.g. because some important parts
# are missing)
AC_ARG_ENABLE(testbench,
- [AS_HELP_STRING([--enable-testbench],[file input module enabled @<:@default=yes@:>@])],
+ [AS_HELP_STRING([--enable-testbench],[testbench enabled @<:@default=yes@:>@])],
[case "${enableval}" in
yes) enable_testbench="yes" ;;
no) enable_testbench="no" ;;
@@ -802,11 +832,6 @@ AC_ARG_ENABLE(testbench,
esac],
[enable_testbench=yes]
)
-if test "$enable_testbench" = "yes"; then
- if test x$HAVE_JAVAC = x; then
- enable_testbench='no'
- fi
-fi
AM_CONDITIONAL(ENABLE_TESTBENCH, test x$enable_testbench = xyes)
@@ -825,7 +850,7 @@ AM_CONDITIONAL(ENABLE_IMFILE, test x$enable_imfile = xyes)
# settings for the door input module (under solaris, thus default off)
AC_ARG_ENABLE(imsolaris,
- [AS_HELP_STRING([--enable-imsolaris],[door input module enabled @<:@default=no@:>@])],
+ [AS_HELP_STRING([--enable-imsolaris],[solaris input module enabled @<:@default=no@:>@])],
[case "${enableval}" in
yes) enable_imsolaris="yes" ;;
no) enable_imsolaris="no" ;;
@@ -943,6 +968,45 @@ AC_ARG_ENABLE(pmlastmsg,
AM_CONDITIONAL(ENABLE_PMLASTMSG, test x$enable_pmlastmsg = xyes)
+# settings for pmcisconames
+AC_ARG_ENABLE(pmcisconames,
+ [AS_HELP_STRING([--enable-pmcisconames],[Compiles cisconames parser module @<:@default=no@:>@])],
+ [case "${enableval}" in
+ yes) enable_pmcisconames="yes" ;;
+ no) enable_pmcisconames="no" ;;
+ *) AC_MSG_ERROR(bad value ${enableval} for --enable-pmcisconames) ;;
+ esac],
+ [enable_pmcisconames=no]
+)
+AM_CONDITIONAL(ENABLE_PMCISCONAMES, test x$enable_pmcisconames = xyes)
+
+
+# settings for pmaixforwardedfrom
+AC_ARG_ENABLE(pmaixforwardedfrom,
+ [AS_HELP_STRING([--enable-pmaixforwardedfrom],[Compiles aixforwardedfrom parser module @<:@default=no@:>@])],
+ [case "${enableval}" in
+ yes) enable_pmaixforwardedfrom="yes" ;;
+ no) enable_pmaixforwardedfrom="no" ;;
+ *) AC_MSG_ERROR(bad value ${enableval} for --enable-pmaixforwardedfrom) ;;
+ esac],
+ [enable_pmaixforwardedfrom=no]
+)
+AM_CONDITIONAL(ENABLE_PMAIXFORWARDEDFROM, test x$enable_pmaixforwardedfrom = xyes)
+
+
+# settings for pmsnare
+AC_ARG_ENABLE(pmsnare,
+ [AS_HELP_STRING([--enable-pmsnare],[Compiles snare parser module @<:@default=no@:>@])],
+ [case "${enableval}" in
+ yes) enable_pmsnare="yes" ;;
+ no) enable_pmsnare="no" ;;
+ *) AC_MSG_ERROR(bad value ${enableval} for --enable-pmsnare) ;;
+ esac],
+ [enable_pmsnare=no]
+)
+AM_CONDITIONAL(ENABLE_PMSNARE, test x$enable_pmsnare = xyes)
+
+
# settings for pmrfc3164sd
AC_ARG_ENABLE(pmrfc3164sd,
[AS_HELP_STRING([--enable-pmrfc3164sd],[Compiles rfc3164sd parser module @<:@default=no@:>@])],
@@ -1006,7 +1070,7 @@ AC_SUBST(RELP_LIBS)
# settings for omuxsock
AC_ARG_ENABLE(omuxsock,
- [AS_HELP_STRING([--enable-omuxsock],[Compiles stdout module @<:@default=no@:>@])],
+ [AS_HELP_STRING([--enable-omuxsock],[Compiles omuxsock module @<:@default=no@:>@])],
[case "${enableval}" in
yes) enable_omuxsock="yes" ;;
no) enable_omuxsock="no" ;;
@@ -1021,7 +1085,7 @@ AM_CONDITIONAL(ENABLE_OMUXSOCK, test x$enable_omuxsock = xyes)
# part of rsyslog, into the build process. It is named cust1, so that
# additional such modules can easily be added.
AC_ARG_ENABLE(cust1,
- [AS_HELP_STRING([--enable-cust1],[Compiles stdout module @<:@default=no@:>@])],
+ [AS_HELP_STRING([--enable-cust1],[Compiles cust1 module @<:@default=no@:>@])],
[case "${enableval}" in
yes) enable_cust1="yes" ;;
no) enable_cust1="no" ;;
@@ -1089,6 +1153,25 @@ AC_ARG_ENABLE(omhdfs,
AM_CONDITIONAL(ENABLE_OMHDFS, test x$enable_omhdfs = xyes)
+#MONGODB SUPPORT
+
+AC_ARG_ENABLE(ommongodb,
+ [AS_HELP_STRING([--enable-ommongodb],[Compiles ommongodb template module @<:@default=no@:>@])],
+ [case "${enableval}" in
+ yes) enable_ommongodb="yes" ;;
+ no) enable_ommongodb="no" ;;
+ *) AC_MSG_ERROR(bad value ${enableval} for --enable-ommongodb) ;;
+ esac],
+ [enable_ommongodb=no]
+)
+#
+# you may want to do some library checks here - see snmp, mysql, pgsql modules
+# for samples
+#
+AM_CONDITIONAL(ENABLE_OMMONGODB, test x$enable_ommongodb = xyes)
+# end of copy template - be sure to search for omtemplate to find everything!
+
+
AC_CONFIG_FILES([Makefile \
runtime/Makefile \
tools/Makefile \
@@ -1107,6 +1190,9 @@ AC_CONFIG_FILES([Makefile \
plugins/omstdout/Makefile \
plugins/pmrfc3164sd/Makefile \
plugins/pmlastmsg/Makefile \
+ plugins/pmcisconames/Makefile \
+ plugins/pmsnare/Makefile \
+ plugins/pmaixforwardedfrom/Makefile \
plugins/omruleset/Makefile \
plugins/omdbalerting/Makefile \
plugins/omuxsock/Makefile \
@@ -1159,6 +1245,7 @@ echo
echo "---{ output plugins }---"
echo " Mail support enabled: $enable_mail"
echo " omprog module will be compiled: $enable_omprog"
+echo " output mongodb module will be compiled: $enable_ommongodb"
echo " omstdout module will be compiled: $enable_omstdout"
echo " omhdfs module will be compiled: $enable_omhdfs"
echo " omruleset module will be compiled: $enable_omruleset"
@@ -1170,6 +1257,9 @@ echo
echo "---{ parser modules }---"
echo " pmrfc3164sd module will be compiled: $enable_pmrfc3164sd"
echo " pmlastmsg module will be compiled: $enable_pmlastmsg"
+echo " pmcisconames module will be compiled: $enable_pmcisconames"
+echo " pmaixforwardedfrom module w.be compiled: $enable_pmaixforwardedfrom"
+echo " pmsnare module will be compiled: $enable_pmsnare"
echo
echo "---{ message modification modules }---"
echo " mmnormalize module will be compiled: $enable_mmnormalize"
diff --git a/doc/imfile.html b/doc/imfile.html
index f6b140a7..60726ceb 100644
--- a/doc/imfile.html
+++ b/doc/imfile.html
@@ -96,6 +96,14 @@ been processed. This setting can be used to guard against message duplication du
to fatal errors (like power fail). Note that this setting affects imfile
performance, especially when set to a low value. Frequently writing the state
file is very time consuming.
+<li><b>$InputFileReadMode</b> [mode]</b><br>
+Available in 5.7.5+
+<br>
+Mode to be used when reading lines. 0 (the default) means that each line is forwarded
+as its own log message.
+<li>$InputFileBindRuleset &lt;ruleset&gt;<br>
+Available in 5.7.5+, 6.1.5+
+Binds the listener to a specific <a href="multi_ruleset.html">ruleset</a>.</li>
</ul>
<b>Caveats/Known Bugs:</b>
<p>So far, only 100 files can be monitored. If more are needed,
diff --git a/doc/imptcp.html b/doc/imptcp.html
index d4228185..c7a0e599 100644
--- a/doc/imptcp.html
+++ b/doc/imptcp.html
@@ -53,6 +53,11 @@ name is not strictly necessary, but can be useful to apply filtering based on wh
the message was received from.
<li>$InputPTCPServerBindRuleset &lt;name&gt;<br>
Binds specified ruleset to next server defined.
+<li>$InputPTCPHelperThreads &lt;number&gt;<br>
+Number of helper worker threads to process incoming messages. These
+threads are utilized to pull data off the network. On a busy system, additional
+helper threads (but not more than there are CPUs/Cores) can help improving
+performance. The default value is two.
<li>$InputPTCPServerListenIP &lt;name&gt;<br>
On multi-homed machines, specifies to which local address the next listerner should
be bound.
diff --git a/doc/manual.html b/doc/manual.html
index 49a4300b..e82d4cba 100644
--- a/doc/manual.html
+++ b/doc/manual.html
@@ -19,7 +19,7 @@ rsyslog support</a> available directly from the source!</p>
<p><b>Please visit the <a href="http://www.rsyslog.com/sponsors">rsyslog sponsor's page</a>
to honor the project sponsors or become one yourself!</b> We are very grateful for any help towards the
project goals.</p>
-<p><b>This documentation is for version 6.1.2 (devel branch) of rsyslog.</b>
+<p><b>This documentation is for version 6.1.5 (devel branch) of rsyslog.</b>
Visit the <i><a href="http://www.rsyslog.com/status">rsyslog status page</a></i></b>
to obtain current version information and project status.
</p><p><b>If you like rsyslog, you might
@@ -41,7 +41,6 @@ if you do not read the doc, but doing so will definitely improve your experience
<li><a href="rsyslog_conf.html">configuration file syntax (rsyslog.conf)</a></li>
<li><a href="http://www.rsyslog.com/tool-regex">a regular expression checker/generator tool for rsyslog</a></li>
<li> <a href="property_replacer.html">property replacer, an important core component</a></li>
-<li>a commented <a href="sample.conf.html">sample rsyslog.conf</a> </li>
<li><a href="bugs.html">rsyslog bug list</a></li>
<li><a href="messageparser.html">understanding rsyslog message parsers</a></li>
<li><a href="generic_design.html">backgrounder on generic syslog application design</a></li>
diff --git a/doc/ommysql.html b/doc/ommysql.html
index 9b35b402..daef9cab 100644
--- a/doc/ommysql.html
+++ b/doc/ommysql.html
@@ -24,6 +24,18 @@ directive configuration system.
a non-standard port for the MySQL server. The default is 0, which means the
system default port is used. There is no need to specify this directive unless
you know the server is running on a non-standard listen port.
+<li><b>$OmMySQLConfigFile &lt;file name&gt;</b><br>Permits the selection
+of an optional MySQL Client Library configuration file (my.cnf) for extended
+configuration functionality. The use of this configuration directive is necessary
+only if you have a non-standard environment or if fine-grained control over the
+database connection is desired.</li>
+<li><b>$OmMySQLConfigSection &lt;string&gt;</b><br>Permits the selection of the
+section within the configuration file specified by the <b>$OmMySQLConfigFile</b> directive.
+<br>This will likely only be used where the database administrator provides a single
+configuration file with multiple profiles.
+<br>This configuration directive is ignored unless <b>$OmMySQLConfigFile</b> is also used
+in the rsyslog configration file.
+<br>If omitted, the MySQL Client Library default of &quot;client&quot; will be used.</li>
<li>Action parameters:
<br><b>:ommysql:database-server,database-name,database-userid,database-password</b>
<br>All parameters should be filled in for a successful connect.
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index 36a2c015..bd44fd55 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -48,6 +48,7 @@
#include "unicode-helper.h"
#include "prop.h"
#include "stringbuf.h"
+#include "ruleset.h"
MODULE_TYPE_INPUT /* must be present for input modules, do not remove */
@@ -60,6 +61,7 @@ DEFobjCurrIf(glbl)
DEFobjCurrIf(datetime)
DEFobjCurrIf(strm)
DEFobjCurrIf(prop)
+DEFobjCurrIf(ruleset)
typedef struct fileInfo_s {
uchar *pszFileName;
@@ -71,6 +73,8 @@ typedef struct fileInfo_s {
int nRecords; /**< How many records did we process before persisting the stream? */
int iPersistStateInterval; /**< how often should state be persisted? (0=on close only) */
strm_t *pStrm; /* its stream (NULL if not assigned) */
+ int readMode; /* which mode to use in ReadMulteLine call? */
+ ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */
} fileInfo_t;
@@ -85,6 +89,8 @@ static int iPollInterval = 10; /* number of seconds to sleep when there was no f
static int iPersistStateInterval = 0; /* how often if state file to be persisted? (default 0->never) */
static int iFacility = 128; /* local0 */
static int iSeverity = 5; /* notice, as of rfc 3164 */
+static int readMode = 0; /* mode to use for ReadMultiLine call */
+static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
static int iFilPtr = 0; /* number of files to be monitored; pointer to next free spot during config */
#define MAX_INPUT_FILES 100
@@ -114,6 +120,7 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine)
MsgSetTAG(pMsg, pInfo->pszTag, pInfo->lenTag);
pMsg->iFacility = LOG_FAC(pInfo->iFacility);
pMsg->iSeverity = LOG_PRI(pInfo->iSeverity);
+ MsgSetRuleset(pMsg, pInfo->pRuleset);
CHKiRet(submitMsg(pMsg));
finalize_it:
RETiRet;
@@ -211,8 +218,8 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
}
/* loop below will be exited when strmReadLine() returns EOF */
- while(1) {
- CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr));
+ while(glbl.GetGlobalInputTermState() == 0) {
+ CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr, pThis->readMode));
*pbHadFileData = 1; /* this is just a flag, so set it and forget it */
CHKiRet(enqLine(pThis, pCStr)); /* process line */
rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */
@@ -287,23 +294,24 @@ BEGINrunInput
int bHadFileData; /* were there at least one file with data during this run? */
CODESTARTrunInput
pthread_cleanup_push(inputModuleCleanup, NULL);
- while(1) {
-
+ while(glbl.GetGlobalInputTermState() == 0) {
do {
bHadFileData = 0;
for(i = 0 ; i < iFilPtr ; ++i) {
+ if(glbl.GetGlobalInputTermState() == 1)
+ break; /* terminate input! */
pollFile(&files[i], &bHadFileData);
}
- } while(iFilPtr > 1 && bHadFileData == 1); /* warning: do...while()! */
+ } while(iFilPtr > 1 && bHadFileData == 1 && glbl.GetGlobalInputTermState() == 0); /* warning: do...while()! */
/* Note: the additional 10ns wait is vitally important. It guards rsyslog against totally
* hogging the CPU if the users selects a polling interval of 0 seconds. It doesn't hurt any
* other valid scenario. So do not remove. -- rgerhards, 2008-02-14
*/
- srSleep(iPollInterval, 10);
-
+ if(glbl.GetGlobalInputTermState() == 0)
+ srSleep(iPollInterval, 10);
}
- /*NOTREACHED*/
+ DBGPRINTF("imfile: terminating upon request of rsyslog core\n");
pthread_cleanup_pop(0); /* just for completeness, but never called... */
RETiRet; /* use it to make sure the housekeeping is done! */
@@ -396,6 +404,13 @@ CODESTARTafterRun
ENDafterRun
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
/* The following entry points are defined in module-template.h.
* In general, they need to be present, but you do NOT need to provide
* any code here.
@@ -408,12 +423,14 @@ CODESTARTmodExit
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
+ objRelease(ruleset, CORE_COMPONENT);
ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
@@ -447,6 +464,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iPollInterval = 10;
iFacility = 128; /* local0 */
iSeverity = 5; /* notice, as of rfc 3164 */
+ readMode = 0;
+ pBindRuleset = NULL;
RETiRet;
}
@@ -489,6 +508,8 @@ static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal)
pThis->iFacility = iFacility;
pThis->iPersistStateInterval = iPersistStateInterval;
pThis->nRecords = 0;
+ pThis->readMode = readMode;
+ pThis->pRuleset = pBindRuleset;
iPersistStateInterval = 0;
} else {
errmsg.LogError(0, RS_RET_OUT_OF_DESRIPTORS, "Too many file monitors configured - ignoring this one");
@@ -504,6 +525,29 @@ finalize_it:
RETiRet;
}
+
+/* accept a new ruleset to bind. Checks if it exists and complains, if not */
+static rsRetVal
+setRuleset(void __attribute__((unused)) *pVal, uchar *pszName)
+{
+ ruleset_t *pRuleset;
+ rsRetVal localRet;
+ DEFiRet;
+
+ localRet = ruleset.GetRuleset(&pRuleset, pszName);
+ if(localRet == RS_RET_NOT_FOUND) {
+ errmsg.LogError(0, NO_ERRCODE, "error: ruleset '%s' not found - ignored", pszName);
+ }
+ CHKiRet(localRet);
+ pBindRuleset = pRuleset;
+ DBGPRINTF("imfile current bind ruleset %p: '%s'\n", pRuleset, pszName);
+
+finalize_it:
+ free(pszName); /* no longer needed */
+ RETiRet;
+}
+
+
/* modInit() is called once the module is loaded. It must perform all module-wide
* initialization tasks. There are also a number of housekeeping tasks that the
* framework requires. These are handled by the macros. Please note that the
@@ -521,8 +565,10 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(strm, CORE_COMPONENT));
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
+ DBGPRINTF("imfile: version %s initializing\n", VERSION);
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilename", 0, eCmdHdlrGetWord,
NULL, &pszFileName, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfiletag", 0, eCmdHdlrGetWord,
@@ -535,8 +581,12 @@ CODEmodInit_QueryRegCFSLineHdlr
NULL, &iFacility, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepollinterval", 0, eCmdHdlrInt,
NULL, &iPollInterval, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilereadmode", 0, eCmdHdlrInt,
+ NULL, &readMode, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepersiststateinterval", 0, eCmdHdlrInt,
NULL, &iPersistStateInterval, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilebindruleset", 0, eCmdHdlrGetWord,
+ setRuleset, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
/* that command ads a new file! */
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputrunfilemonitor", 0, eCmdHdlrGetWord,
addMonitor, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 2ff292c2..63447a72 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -10,7 +10,7 @@
*
* File begun on 2010-08-10 by RGerhards
*
- * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -83,7 +83,8 @@ DEFobjCurrIf(datetime)
DEFobjCurrIf(errmsg)
DEFobjCurrIf(ruleset)
-
+/* forward references */
+static void * wrkr(void *myself);
/* config settings */
typedef struct configSettings_s {
@@ -92,6 +93,7 @@ typedef struct configSettings_s {
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
uchar *lstnIP; /* which IP we should listen on? */
ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ int wrkrMax; /* max number of workers (actually "helper workers") */
} configSettings_t;
static configSettings_t cs;
@@ -117,6 +119,7 @@ struct ptcpsrv_s {
ruleset_t *pRuleset;
ptcplstn_t *pLstn; /* root of our listeners */
ptcpsess_t *pSess; /* root of our sessions */
+ pthread_mutex_t mutSessLst;
};
/* the ptcp session object. Describes a single active session.
@@ -154,6 +157,20 @@ struct ptcplstn_s {
};
+/* The following structure controls the worker threads. Global data is
+ * needed for their access.
+ */
+static struct wrkrInfo_s {
+ pthread_t tid; /* the worker's thread ID */
+ pthread_cond_t run;
+ struct epoll_event *event; /* event == NULL -> idle */
+ long long unsigned numCalled; /* how often was this called */
+} wrkrInfo[16];
+static pthread_mutex_t wrkrMut;
+static pthread_cond_t wrkrIdle;
+static int wrkrRunning;
+
+
/* type of object stored in epoll descriptor */
typedef enum {
epolld_lstn,
@@ -171,20 +188,10 @@ struct epolld_s {
/* global data */
-//static permittedPeers_t *pPermPeersRoot = NULL;
+pthread_attr_t wrkrThrdAttr; /* Attribute for session threads; read only after startup */
static ptcpsrv_t *pSrvRoot = NULL;
static int epollfd = -1; /* (sole) descriptor for epoll */
static int iMaxLine; /* maximum size of a single message */
-/* we use a single static receive buffer, as this module is not multi-threaded. Keeping
- * the buffer in the data segment is probably a little bit more efficient than on the stack
- * (but at least I can't believe it will ever be less efficient ;) -- rgerhards, 2010-08-10
- * Note that we do NOT (yet?) provide a config setting to set the buffer size. For usual
- * syslog traffic, it should be large enough. Also keep in mind that we run under a virtual
- * memory system, so if we do not use large parts of the buffer, that's no issue at
- * all -- it'll just use up address space. On the other hand, it would be silly to page in
- * or page out some data just to get space for the IO buffer.
- */
-static char rcvBuf[128*1024];
/* forward definitions */
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal);
@@ -209,6 +216,7 @@ static void
destructSrv(ptcpsrv_t *pSrv)
{
prop.Destruct(&pSrv->pInputName);
+ pthread_mutex_destroy(&pSrv->mutSessLst);
free(pSrv->port);
free(pSrv);
}
@@ -678,6 +686,7 @@ static inline void
initConfigSettings(void)
{
cs.bEmitMsgOnClose = 0;
+ cs.wrkrMax = 2;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
cs.pszInputName = NULL;
cs.pRuleset = NULL;
@@ -790,10 +799,12 @@ addSess(ptcpsrv_t *pSrv, int sock, prop_t *peerName, prop_t *peerIP)
/* add to start of server's listener list */
pSess->prev = NULL;
+ pthread_mutex_lock(&pSrv->mutSessLst);
pSess->next = pSrv->pSess;
if(pSrv->pSess != NULL)
pSrv->pSess->prev = pSess;
pSrv->pSess = pSess;
+ pthread_mutex_unlock(&pSrv->mutSessLst);
iRet = addEPollSock(epolld_sess, pSess, sock, &pSess->epd);
@@ -816,10 +827,8 @@ closeSess(ptcpsess_t *pSess)
CHKiRet(removeEPollSock(sock, pSess->epd));
close(sock);
+ pthread_mutex_lock(&pSess->pSrv->mutSessLst);
/* finally unlink session from structures */
-//fprintf(stderr, "closing session %d next %p, prev %p\n", pSess->sock, pSess->next, pSess->prev);
-//DBGPRINTF("imptcp: pSess->next %p\n", pSess->next);
-//DBGPRINTF("imptcp: pSess->prev %p\n", pSess->prev);
if(pSess->next != NULL)
pSess->next->prev = pSess->prev;
if(pSess->prev == NULL) {
@@ -828,6 +837,7 @@ closeSess(ptcpsess_t *pSess)
} else {
pSess->prev->next = pSess->next;
}
+ pthread_mutex_unlock(&pSess->pSrv->mutSessLst);
/* unlinked, now remove structure */
destructSess(pSess);
@@ -838,21 +848,6 @@ finalize_it:
}
-#if 0
-/* set permitted peer -- rgerhards, 2008-05-19
- */
-static rsRetVal
-setPermittedPeer(void __attribute__((unused)) *pVal, uchar *pszID)
-{
- DEFiRet;
- CHKiRet(net.AddPermittedPeer(&pPermPeersRoot, pszID));
- free(pszID); /* no longer needed, but we need to free as of interface def */
-finalize_it:
- RETiRet;
-}
-#endif
-
-
/* accept a new ruleset to bind. Checks if it exists and complains, if not */
static rsRetVal setRuleset(void __attribute__((unused)) *pVal, uchar *pszName)
{
@@ -880,6 +875,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
ptcpsrv_t *pSrv;
CHKmalloc(pSrv = malloc(sizeof(ptcpsrv_t)));
+ pthread_mutex_init(&pSrv->mutSessLst, NULL);
pSrv->pSess = NULL;
pSrv->pLstn = NULL;
pSrv->bEmitMsgOnClose = cs.bEmitMsgOnClose;
@@ -911,6 +907,46 @@ finalize_it:
}
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+startWorkerPool(void)
+{
+ int i;
+ wrkrRunning = 0;
+ if(cs.wrkrMax > 16)
+ cs.wrkrMax = 16; /* TODO: make dynamic? */
+ pthread_mutex_init(&wrkrMut, NULL);
+ pthread_cond_init(&wrkrIdle, NULL);
+ for(i = 0 ; i < cs.wrkrMax ; ++i) {
+ /* init worker info structure! */
+ pthread_cond_init(&wrkrInfo[i].run, NULL);
+ wrkrInfo[i].event = NULL;
+ wrkrInfo[i].numCalled = 0;
+ pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i]));
+ }
+
+}
+
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+stopWorkerPool(void)
+{
+ int i;
+ for(i = 0 ; i < cs.wrkrMax ; ++i) {
+ pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */
+ pthread_join(wrkrInfo[i].tid, NULL);
+ DBGPRINTF("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
+ pthread_cond_destroy(&wrkrInfo[i].run);
+ }
+ pthread_cond_destroy(&wrkrIdle);
+ pthread_mutex_destroy(&wrkrMut);
+
+}
+
+
+
/* start up all listeners
* This is a one-time stop once the module is set to start.
*/
@@ -922,7 +958,7 @@ startupServers()
pSrv = pSrvRoot;
while(pSrv != NULL) {
- DBGPRINTF("Starting up ptcp server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName);
+ DBGPRINTF("imptcp: starting up server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName);
startupSrv(pSrv);
pSrv = pSrv->pNext;
}
@@ -944,9 +980,9 @@ lstnActivity(ptcplstn_t *pLstn)
DEFiRet;
DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock);
- while(1) {
+ while(glbl.GetGlobalInputTermState() == 0) {
localRet = AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP);
- if(localRet == RS_RET_NO_MORE_DATA)
+ if(localRet == RS_RET_NO_MORE_DATA || glbl.GetGlobalInputTermState() == 1)
break;
CHKiRet(localRet);
CHKiRet(addSess(pLstn->pSrv, newSock, peerName, peerIP));
@@ -965,6 +1001,7 @@ sessActivity(ptcpsess_t *pSess)
{
int lenRcv;
int lenBuf;
+ char rcvBuf[128*1024];
DEFiRet;
DBGPRINTF("imptcp: new activity on session socket %d\n", pSess->sock);
@@ -1002,35 +1039,127 @@ finalize_it:
}
+/* This function is called to process a single request. This may
+ * be carried out by the main worker or a helper. It can be run
+ * concurrently.
+ */
+static inline void
+processWorkItem(struct epoll_event *event)
+{
+ epolld_t *epd;
+
+ epd = (epolld_t*) event->data.ptr;
+ switch(epd->typ) {
+ case epolld_lstn:
+ lstnActivity((ptcplstn_t *) epd->ptr);
+ break;
+ case epolld_sess:
+ sessActivity((ptcpsess_t *) epd->ptr);
+ break;
+ default:
+ errmsg.LogError(0, RS_RET_INTERNAL_ERROR,
+ "error: invalid epolld_type_t %d after epoll", epd->typ);
+ break;
+ }
+}
+
+
+/* This function is called to process a complete workset, that
+ * is a set of events returned from epoll.
+ */
+static inline void
+processWorkSet(int nEvents, struct epoll_event events[])
+{
+ int iEvt;
+ int i;
+ int remainEvents;
+
+ remainEvents = nEvents;
+ for(iEvt = 0 ; (iEvt < nEvents) && (glbl.GetGlobalInputTermState() == 0) ; ++iEvt) {
+ if(remainEvents == 1) {
+ /* process self, save context switch */
+ processWorkItem(events+iEvt);
+ } else {
+ pthread_mutex_lock(&wrkrMut);
+ /* check if there is a free worker */
+ for(i = 0 ; (i < cs.wrkrMax) && (wrkrInfo[i].event != NULL) ; ++i)
+ /*do search*/;
+ if(i < cs.wrkrMax) {
+ /* worker free -> use it! */
+ wrkrInfo[i].event = events+iEvt;
+ ++wrkrRunning;
+ pthread_cond_signal(&wrkrInfo[i].run);
+ pthread_mutex_unlock(&wrkrMut);
+ } else {
+ pthread_mutex_unlock(&wrkrMut);
+ /* no free worker, so we process this one ourselfs */
+ processWorkItem(events+iEvt);
+ }
+ }
+ --remainEvents;
+ }
+
+ if(nEvents > 1) {
+ /* we now need to wait until all workers finish. This is because the
+ * rest of this module can not handle the concurrency introduced
+ * by workers running during the epoll call.
+ */
+ pthread_mutex_lock(&wrkrMut);
+ while(wrkrRunning > 0) {
+ pthread_cond_wait(&wrkrIdle, &wrkrMut);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+ }
+
+}
+
+
+/* worker to process incoming requests
+ */
+static void *
+wrkr(void *myself)
+{
+ struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself;
+
+ pthread_mutex_lock(&wrkrMut);
+ while(1) {
+ while(me->event == NULL && glbl.GetGlobalInputTermState() == 0) {
+ pthread_cond_wait(&me->run, &wrkrMut);
+ }
+ if(glbl.GetGlobalInputTermState() == 1)
+ break;
+ pthread_mutex_unlock(&wrkrMut);
+
+ ++me->numCalled;
+ processWorkItem(me->event);
+
+ pthread_mutex_lock(&wrkrMut);
+ me->event = NULL; /* indicate we are free again */
+ --wrkrRunning;
+ pthread_cond_signal(&wrkrIdle);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+
+ return NULL;
+}
+
+
/* This function is called to gather input.
*/
BEGINrunInput
- int i;
- int nfds;
- struct epoll_event events[1];
- epolld_t *epd;
+ int nEvents;
+ struct epoll_event events[128];
CODESTARTrunInput
- DBGPRINTF("imptcp now beginning to process input data\n");
- /* v5 TODO: consentual termination mode */
- while(1) {
+ startWorkerPool();
+ DBGPRINTF("imptcp: now beginning to process input data\n");
+ while(glbl.GetGlobalInputTermState() == 0) {
DBGPRINTF("imptcp going on epoll_wait\n");
- nfds = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1);
- for(i = 0 ; i < nfds ; ++i) { /* support for larger batches (later, TODO) */
- epd = (epolld_t*) events[i].data.ptr;
- switch(epd->typ) {
- case epolld_lstn:
- lstnActivity((ptcplstn_t *) epd->ptr);
- break;
- case epolld_sess:
- sessActivity((ptcpsess_t *) epd->ptr);
- break;
- default:
- errmsg.LogError(0, RS_RET_INTERNAL_ERROR,
- "error: invalid epolld_type_t %d after epoll", epd->typ);
- break;
- }
- }
+ nEvents = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1);
+ DBGPRINTF("imptcp: epoll returned %d events\n", nEvents);
+ processWorkSet(nEvents, events);
}
+ DBGPRINTF("imptcp: successfully terminated\n");
+ /* we stop the worker pool in AfterRun, in case we get cancelled for some reason (old Interface) */
ENDrunInput
@@ -1038,7 +1167,6 @@ ENDrunInput
BEGINwillRun
CODESTARTwillRun
/* first apply some config settings */
- //net.PrintAllowedSenders(2); /* TCP */
iMaxLine = glbl.GetMaxLine(); /* get maximum size we currently support */
if(pSrvRoot == NULL) {
@@ -1104,8 +1232,8 @@ shutdownSrv(ptcpsrv_t *pSrv)
BEGINafterRun
ptcpsrv_t *pSrv, *srvDel;
CODESTARTafterRun
- /* do cleanup here */
- //net.clearAllowedSenders(UCHAR_CONSTANT("TCP"));
+ stopWorkerPool();
+
/* we need to close everything that is still open */
pSrv = pSrvRoot;
while(pSrv != NULL) {
@@ -1121,12 +1249,7 @@ ENDafterRun
BEGINmodExit
CODESTARTmodExit
-#if 0
- if(pPermPeersRoot != NULL) {
- net.DestructPermittedPeers(&pPermPeersRoot);
- }
-#endif
-
+ pthread_attr_destroy(&wrkrThrdAttr);
/* release objects we used */
objRelease(glbl, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
@@ -1141,6 +1264,7 @@ static rsRetVal
resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
cs.bEmitMsgOnClose = 0;
+ cs.wrkrMax = 2;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
free(cs.pszInputName);
cs.pszInputName = NULL;
@@ -1150,10 +1274,17 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus
}
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
@@ -1170,6 +1301,10 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
+ /* initialize "read-only" thread attributes */
+ pthread_attr_init(&wrkrThrdAttr);
+ pthread_attr_setstacksize(&wrkrThrdAttr, 2048*1024);
+
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord,
addTCPListener, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
@@ -1177,6 +1312,8 @@ CODEmodInit_QueryRegCFSLineHdlr
eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt,
NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt,
+ NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0,
eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0,
diff --git a/plugins/imudp/Makefile.am b/plugins/imudp/Makefile.am
index 517b1287..bc64b8c8 100644
--- a/plugins/imudp/Makefile.am
+++ b/plugins/imudp/Makefile.am
@@ -3,4 +3,4 @@ pkglib_LTLIBRARIES = imudp.la
imudp_la_SOURCES = imudp.c
imudp_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS)
imudp_la_LDFLAGS = -module -avoid-version
-imudp_la_LIBADD =
+imudp_la_LIBADD = $(IMUDP_LIBS)
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index b960322e..d347b0ac 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -35,6 +35,9 @@
#if HAVE_SYS_EPOLL_H
# include <sys/epoll.h>
#endif
+#ifdef HAVE_SCHED_H
+# include <sched.h>
+#endif
#include "rsyslog.h"
#include "dirty.h"
#include "net.h"
@@ -78,14 +81,103 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a
* termination if we can not get it. -- rgerhards, 2007-12-27
*/
static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */
+static uchar *pszSchedPolicy = NULL; /* scheduling policy string */
+static int iSchedPolicy; /* scheduling policy as SCHED_xxx */
+static int iSchedPrio; /* scheduling priority */
+static int seen_iSchedPrio = 0; /* have we seen scheduling priority in the config file? */
static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
-static uchar *pszSchedPolicy = NULL; /**< scheduling policy (string) */
-static int iSchedPrio = -1; /**< scheduling priority (must not be negative) */
#define TIME_REQUERY_DFLT 2
static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */
/* config settings */
+static rsRetVal check_scheduling_priority(int report_error)
+{
+ DEFiRet;
+
+#ifdef HAVE_SCHED_GET_PRIORITY_MAX
+ if (iSchedPrio < sched_get_priority_min(iSchedPolicy) ||
+ iSchedPrio > sched_get_priority_max(iSchedPolicy)) {
+ if (report_error)
+ errmsg.LogError(errno, NO_ERRCODE,
+ "imudp: scheduling priority %d out of range (%d - %d)"
+ " for scheduling policy '%s' - ignoring settings",
+ iSchedPrio,
+ sched_get_priority_min(iSchedPolicy),
+ sched_get_priority_max(iSchedPolicy),
+ pszSchedPolicy);
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+#endif
+
+finalize_it:
+ RETiRet;
+}
+
+/* Set scheduling priority in the supplied variable (will be iSchedPrio)
+ * and record that we have seen the directive (in seen_iSchedPrio).
+ */
+static rsRetVal set_scheduling_priority(void *pVal, int value)
+{
+ DEFiRet;
+
+ if (seen_iSchedPrio) {
+ errmsg.LogError(0, NO_ERRCODE, "directive already seen");
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+ *(int *)pVal = value;
+ seen_iSchedPrio = 1;
+ if (pszSchedPolicy != NULL)
+ CHKiRet(check_scheduling_priority(1));
+
+finalize_it:
+ RETiRet;
+}
+
+/* Set scheduling policy in iSchedPolicy */
+static rsRetVal set_scheduling_policy(void *pVal, uchar *pNewVal)
+{
+ int have_sched_policy = 0;
+ DEFiRet;
+
+ if (pszSchedPolicy != NULL) {
+ errmsg.LogError(0, NO_ERRCODE, "directive already seen");
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+ *((uchar**)pVal) = pNewVal; /* pVal is pszSchedPolicy */
+ if (0) { /* trick to use conditional compilation */
+#ifdef SCHED_FIFO
+ } else if (!strcasecmp((char*)pszSchedPolicy, "fifo")) {
+ iSchedPolicy = SCHED_FIFO;
+ have_sched_policy = 1;
+#endif
+#ifdef SCHED_RR
+ } else if (!strcasecmp((char*)pszSchedPolicy, "rr")) {
+ iSchedPolicy = SCHED_RR;
+ have_sched_policy = 1;
+#endif
+#ifdef SCHED_OTHER
+ } else if (!strcasecmp((char*)pszSchedPolicy, "other")) {
+ iSchedPolicy = SCHED_OTHER;
+ have_sched_policy = 1;
+#endif
+ } else {
+ errmsg.LogError(errno, NO_ERRCODE,
+ "imudp: invalid scheduling policy '%s' "
+ "- ignoring setting", pszSchedPolicy);
+ }
+ if (have_sched_policy == 0) {
+ free(pszSchedPolicy);
+ pszSchedPolicy = NULL;
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+ if (seen_iSchedPrio)
+ CHKiRet(check_scheduling_priority(1));
+
+finalize_it:
+ RETiRet;
+}
+
/* This function is called when a new listener shall be added. It takes
* the configured parameters, tries to bind the socket and, if that
@@ -296,6 +388,41 @@ finalize_it:
RETiRet;
}
+static void set_thread_schedparam(void)
+{
+ struct sched_param sparam;
+
+ if (pszSchedPolicy != NULL && seen_iSchedPrio == 0) {
+ errmsg.LogError(0, NO_ERRCODE,
+ "imudp: scheduling policy set, but without priority - ignoring settings");
+ } else if (pszSchedPolicy == NULL && seen_iSchedPrio != 0) {
+ errmsg.LogError(0, NO_ERRCODE,
+ "imudp: scheduling priority set, but without policy - ignoring settings");
+ } else if (pszSchedPolicy != NULL && seen_iSchedPrio != 0 &&
+ check_scheduling_priority(0) == 0) {
+#ifndef HAVE_PTHREAD_SETSCHEDPARAM
+ errmsg.LogError(0, NO_ERRCODE,
+ "imudp: cannot set thread scheduling policy, "
+ "pthread_setschedparam() not available");
+#else
+ int err;
+
+ memset(&sparam, 0, sizeof sparam);
+ sparam.sched_priority = iSchedPrio;
+ dbgprintf("imudp trying to set sched policy to '%s', prio %d\n",
+ pszSchedPolicy, iSchedPrio);
+ err = pthread_setschedparam(pthread_self(), iSchedPolicy, &sparam);
+ if (err != 0) {
+ errmsg.LogError(err, NO_ERRCODE, "imudp: pthread_setschedparam() failed");
+ }
+#endif
+ }
+
+ if (pszSchedPolicy != NULL) {
+ free(pszSchedPolicy);
+ pszSchedPolicy = NULL;
+ }
+}
/* This function implements the main reception loop. Depending on the environment,
* we either use the traditional (but slower) select() or the Linux-specific epoll()
@@ -319,6 +446,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
*/
+ set_thread_schedparam();
bIsPermitted = 0;
memset(&frominetPrev, 0, sizeof(frominetPrev));
@@ -386,6 +514,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
*/
+ set_thread_schedparam();
bIsPermitted = 0;
memset(&frominetPrev, 0, sizeof(frominetPrev));
DBGPRINTF("imudp uses select()\n");
@@ -448,7 +577,6 @@ ENDrunInput
/* initialize and return if will run or not */
BEGINwillRun
- struct sched_param sparam;
CODESTARTwillRun
/* we need to create the inputName property (only once during our lifetime) */
CHKiRet(prop.Construct(&pInputName));
@@ -457,40 +585,6 @@ CODESTARTwillRun
net.PrintAllowedSenders(1); /* UDP */
net.HasRestrictions(UCHAR_CONSTANT("UDP"), &bDoACLCheck); /* UDP */
-
- if(pszSchedPolicy == NULL) {
- if(iSchedPrio != -1) {
- errmsg.LogError(errno, NO_ERRCODE, "imudp: scheduling policy not set, but "
- "priority - ignoring settings");
- }
- } else {
- if(iSchedPrio == -1) {
- errmsg.LogError(errno, NO_ERRCODE, "imudp: scheduling policy set, but no "
- "priority - ignoring settings");
- }
- sparam.sched_priority = iSchedPrio;
- dbgprintf("imudp trying to set sched policy to '%s', prio %d\n",
- pszSchedPolicy, iSchedPrio);
- if(0) { /* trick to use conditional compilation */
-# ifdef SCHED_FIFO
- } else if(!strcasecmp((char*)pszSchedPolicy, "fifo")) {
- pthread_setschedparam(pthread_self(), SCHED_FIFO, &sparam);
-# endif
-# ifdef SCHED_RR
- } else if(!strcasecmp((char*)pszSchedPolicy, "rr")) {
- pthread_setschedparam(pthread_self(), SCHED_RR, &sparam);
-# endif
-# ifdef SCHED_OTHER
- } else if(!strcasecmp((char*)pszSchedPolicy, "other")) {
- pthread_setschedparam(pthread_self(), SCHED_OTHER, &sparam);
-# endif
- } else {
- errmsg.LogError(errno, NO_ERRCODE, "imudp: invliad scheduling policy '%s' "
- "ignoring settings", pszSchedPolicy);
- }
- free(pszSchedPolicy);
- pszSchedPolicy = NULL;
- }
/* if we could not set up any listners, there is no point in running... */
if(udpLstnSocks == NULL)
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index c1168c87..b90a3363 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -46,6 +46,7 @@
#include "net.h"
#include "glbl.h"
#include "msg.h"
+#include "parser.h"
#include "prop.h"
#include "debug.h"
#include "unlimited_select.h"
@@ -81,6 +82,7 @@ DEF_IMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
DEFobjCurrIf(prop)
+DEFobjCurrIf(parser)
DEFobjCurrIf(datetime)
DEFobjCurrIf(statsobj)
@@ -143,7 +145,7 @@ static int startIndexUxLocalSockets; /* process fd from that index on (used to
* read-only after startup
*/
static int nfd = 1; /* number of Unix sockets open / read-only after startup */
-static int bSysSockFromSystemd = 0; /* Did we receive the system socket from systemd? */
+static int sd_fds = 0; /* number of systemd activated sockets */
/* config settings */
static int bOmitLocalLogging = 0;
@@ -372,41 +374,32 @@ openLogSocket(lstn_t *pLstn)
if(pLstn->sockName[0] == '\0')
return -1;
- if (ustrcmp(pLstn->sockName, UCHAR_CONSTANT(_PATH_LOG)) == 0) {
- bSysSockFromSystemd = 0; /* set default */
- int r;
-
- /* System log socket code. Check whether an FD was passed in from systemd. If
- * so, it's the /dev/log socket, so use it. */
-
- r = sd_listen_fds(0);
- if (r < 0) {
- errmsg.LogError(-r, NO_ERRCODE, "Failed to acquire systemd socket");
- ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
- }
-
- if (r > 1) {
- errmsg.LogError(EINVAL, NO_ERRCODE, "Wrong number of systemd sockets passed");
- ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
- }
-
- if (r == 1) {
- pLstn->fd = SD_LISTEN_FDS_START;
- r = sd_is_socket_unix(pLstn->fd, SOCK_DGRAM, -1, _PATH_LOG, 0);
- if (r < 0) {
- errmsg.LogError(-r, NO_ERRCODE, "Failed to verify systemd socket type");
- ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
- }
-
- if (!r) {
- errmsg.LogError(EINVAL, NO_ERRCODE, "Passed systemd socket of wrong type");
- ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
- }
- bSysSockFromSystemd = 1; /* indicate we got the socket from systemd */
- } else {
- CHKiRet(createLogSocket(pLstn));
+ pLstn->fd = -1;
+
+ if (sd_fds > 0) {
+ /* Check if the current socket is a systemd activated one.
+ * If so, just use it.
+ */
+ int fd;
+
+ for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + sd_fds; fd++) {
+ if( sd_is_socket_unix(fd, SOCK_DGRAM, -1, (const char*) pLstn->sockName, 0) == 1) {
+ /* ok, it matches -- just use as is */
+ pLstn->fd = fd;
+
+ dbgprintf("imuxsock: Acquired UNIX socket '%s' (fd %d) from systemd.\n",
+ pLstn->sockName, pLstn->fd);
+ break;
+ }
+ /*
+ * otherwise it either didn't matched *this* socket and
+ * we just continue to check the next one or there were
+ * an error and we will create a new socket bellow.
+ */
}
- } else {
+ }
+
+ if (pLstn->fd == -1) {
CHKiRet(createLogSocket(pLstn));
}
@@ -510,6 +503,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
{
msg_t *pMsg;
int lenMsg;
+ int offs;
int i;
uchar *parse;
int pri;
@@ -527,13 +521,14 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
*/
parse = pRcv;
lenMsg = lenRcv;
+ offs = 1; /* '<' */
- parse++; lenMsg--; /* '<' */
+ parse++;
pri = 0;
- while(lenMsg && isdigit(*parse)) {
+ while(offs < lenMsg && isdigit(*parse)) {
pri = pri * 10 + *parse - '0';
++parse;
- --lenMsg;
+ ++offs;
}
facil = LOG_FAC(pri);
sever = LOG_PRI(pri);
@@ -552,12 +547,14 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
/* we now create our own message object and submit it to the queue */
CHKiRet(msgConstructWithTime(&pMsg, &st, tt));
MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv);
+ parser.SanitizeMsg(pMsg);
+ lenMsg = pMsg->iLenRawMsg - offs;
MsgSetInputName(pMsg, pInputName);
MsgSetFlowControlType(pMsg, pLstn->flowCtl);
pMsg->iFacility = facil;
pMsg->iSeverity = sever;
- MsgSetAfterPRIOffs(pMsg, lenRcv - lenMsg);
+ MsgSetAfterPRIOffs(pMsg, offs);
parse++; lenMsg--; /* '>' */
@@ -577,7 +574,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
fixPID(bufParseTAG, &i, cred);
MsgSetTAG(pMsg, bufParseTAG, i);
- MsgSetMSGoffs(pMsg, lenRcv - lenMsg);
+ MsgSetMSGoffs(pMsg, pMsg->iLenRawMsg - lenMsg);
if(pLstn->bParseHost) {
pMsg->msgFlags = pLstn->flags | PARSE_HOSTNAME;
@@ -609,7 +606,9 @@ static rsRetVal readSocket(lstn_t *pLstn)
int iMaxLine;
struct msghdr msgh;
struct iovec msgiov;
+# if HAVE_SCM_CREDENTIALS
struct cmsghdr *cm;
+# endif
struct ucred *cred;
uchar bufRcv[4096+1];
char aux[128];
@@ -633,11 +632,13 @@ static rsRetVal readSocket(lstn_t *pLstn)
memset(&msgh, 0, sizeof(msgh));
memset(&msgiov, 0, sizeof(msgiov));
+# if HAVE_SCM_CREDENTIALS
if(pLstn->bUseCreds) {
memset(&aux, 0, sizeof(aux));
msgh.msg_control = aux;
msgh.msg_controllen = sizeof(aux);
}
+# endif
msgiov.iov_base = pRcv;
msgiov.iov_len = iMaxLine;
msgh.msg_iov = &msgiov;
@@ -774,12 +775,18 @@ CODESTARTwillRun
listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock) ? 1 : 0;
listeners[0].bWritePid = bWritePidSysSock;
+ sd_fds = sd_listen_fds(0);
+ if (sd_fds < 0) {
+ errmsg.LogError(-sd_fds, NO_ERRCODE, "imuxsock: Failed to acquire systemd socket");
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
+ }
+
/* initialize and return if will run or not */
actSocks = 0;
for (i = startIndexUxLocalSockets ; i < nfd ; i++) {
if(openLogSocket(&(listeners[i])) == RS_RET_OK) {
++actSocks;
- dbgprintf("Opened UNIX socket '%s' (fd %d).\n", listeners[i].sockName, listeners[i].fd);
+ dbgprintf("imuxsock: Opened UNIX socket '%s' (fd %d).\n", listeners[i].sockName, listeners[i].fd);
}
}
@@ -806,15 +813,19 @@ CODESTARTafterRun
if (listeners[i].fd != -1)
close(listeners[i].fd);
- /* Clean-up files. If systemd passed us a socket it is
- * systemd's job to clean it up.*/
- if(bSysSockFromSystemd) {
- DBGPRINTF("imuxsock: got system socket from systemd, not unlinking it\n");
- i = 1;
- } else
- i = startIndexUxLocalSockets;
- for(; i < nfd; i++)
+ /* Clean-up files. */
+ for(i = startIndexUxLocalSockets; i < nfd; i++)
if (listeners[i].sockName && listeners[i].fd != -1) {
+
+ /* If systemd passed us a socket it is systemd's job to clean it up.
+ * Do not unlink it -- we will get same socket (node) from systemd
+ * e.g. on restart again.
+ */
+ if (sd_fds > 0 &&
+ listeners[i].fd >= SD_LISTEN_FDS_START &&
+ listeners[i].fd < SD_LISTEN_FDS_START + sd_fds)
+ continue;
+
DBGPRINTF("imuxsock: unlinking unix socket file[%d] %s\n", i, listeners[i].sockName);
unlink((char*) listeners[i].sockName);
}
@@ -835,6 +846,7 @@ BEGINmodExit
CODESTARTmodExit
statsobj.Destruct(&modStats);
+ objRelease(parser, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
@@ -896,6 +908,7 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(prop, CORE_COMPONENT));
CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(objUse(datetime, CORE_COMPONENT));
+ CHKiRet(objUse(parser, CORE_COMPONENT));
dbgprintf("imuxsock version %s initializing\n", PACKAGE_VERSION);
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
index 1fe9b46b..1bf10bd7 100644
--- a/plugins/omhdfs/omhdfs.c
+++ b/plugins/omhdfs/omhdfs.c
@@ -477,7 +477,7 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKmalloc(files = create_hashtable(20, hash_from_string, key_equals_string,
fileObjDestruct4Hashtable));
- CHKiRet(regCfSysLineHdlr((uchar *)"omhdfscs.fileName", 0, eCmdHdlrGetWord, NULL, &cs.fileName, NULL, eConfObjAction));
+ CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &cs.fileName, NULL, eConfObjAction));
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &cs.hdfsHost, NULL, eConfObjAction));
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &cs.hdfsPort, NULL, eConfObjAction));
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &cs.dfltTplName, NULL, eConfObjAction));
diff --git a/plugins/ommongodb/Makefile.am b/plugins/ommongodb/Makefile.am
new file mode 100644
index 00000000..1b0e23a1
--- /dev/null
+++ b/plugins/ommongodb/Makefile.am
@@ -0,0 +1,11 @@
+mongodir = ./mongo-c-driver/src
+pkglib_LTLIBRARIES = ommongodb.la
+
+ommongodb_la_SOURCES = ommongodb.c
+ommongodb_la_SOURCES += $(mongodir)/bson.c $(mongodir)/mongo.c $(mongodir)/md5.c $(mongodir)/numbers.c
+
+ommongodb_la_CPPFLAGS = -DMONGO_HAVE_STDINT -Imongo-c-driver/src $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
+ommongodb_la_LDFLAGS = -module -avoid-version
+ommongodb_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/ommongodb/README b/plugins/ommongodb/README
new file mode 100644
index 00000000..cea3f3bc
--- /dev/null
+++ b/plugins/ommongodb/README
@@ -0,0 +1,23 @@
+plugin to use MongoDB as backend.
+
+tested in ubuntu 10.04 and ubuntu 10.10
+
+configuration:
+
+in your /etc/rsyslog.conf, together with other modules:
+$ModLoad ommongodb # provides mongodb support
+
+then in your /etc/rsyslog.d (check your distribution way to organize the configuration..) you create a file 10-mongodb.conf with the following content:
+
+#the format for the driver is :ommongodb:ip:db:collection;StdMongoDBFmt
+#if you want to change what is logged in the db, the template, you must change the source code since the keys are hardcoded
+$template StdMongoDBFmt,"%msg%%syslogfacility%%HOSTNAME%%syslogpriority%"
+*.* :ommongodb:127.0.0.1,syslog,logs;StdMongoDBFmt
+
+
+TODO
+we must ensure that the collection is a capped collection
+refactor my code :-)
+
+email Victor Pereira <victor.pereira@bigrails.com>
+twitter twitter.com/vpereira
diff --git a/plugins/ommongodb/ommongodb.c b/plugins/ommongodb/ommongodb.c
new file mode 100644
index 00000000..8e19105f
--- /dev/null
+++ b/plugins/ommongodb/ommongodb.c
@@ -0,0 +1,280 @@
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <ctype.h>
+#include <errno.h>
+#include <assert.h>
+#include <signal.h>
+#include <time.h>
+#include "bson.h"
+#include "mongo.h"
+#include "config.h"
+#include "rsyslog.h"
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "cfsysline.h"
+#include "mongo-c-driver/src/mongo.h"
+
+#define countof(X) ( (size_t) ( sizeof(X)/sizeof*(X) ) )
+
+#define DEFAULT_SERVER "127.0.0.1"
+#define DEFAULT_DATABASE "syslog"
+#define DEFAULT_COLLECTION "log"
+#define DEFAULT_DB_COLLECTION "syslog.log"
+
+//i just defined some constants, i couldt not find the limit
+#define MONGO_DB_NAME_SIZE 128
+#define MONGO_COLLECTION_NAME_SIZE 128
+
+MODULE_TYPE_OUTPUT
+/* internal structures
+ */
+DEF_OMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+
+typedef struct _instanceData {
+ mongo_connection conn[1]; /* ptr */
+ mongo_connection_options opts[1];
+ mongo_conn_return status;
+ char db[MONGO_DB_NAME_SIZE];
+ char collection[MONGO_COLLECTION_NAME_SIZE];
+ char dbcollection[MONGO_DB_NAME_SIZE + MONGO_COLLECTION_NAME_SIZE + 1];
+ unsigned uLastMongoDBErrno;
+ //unsigned iSrvPort; /* sample: server port */
+} instanceData;
+
+char db[_DB_MAXDBLEN+2];
+static int iSrvPort = 27017;
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ /* use this to specify if select features are supported by this
+ * plugin. If not, the framework will handle that. Currently, only
+ * RepeatedMsgReduction ("last message repeated n times") is optional.
+ */
+ if(eFeat == sFEATURERepeatedMsgReduction)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+static void closeMongoDB(instanceData *pData)
+{
+ ASSERT(pData != NULL);
+
+ if(pData->conn != NULL) {
+ mongo_destroy( pData->conn );
+ memset(pData->conn,0x00,sizeof(mongo_connection));
+ }
+}
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ closeMongoDB(pData);
+ENDfreeInstance
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ /* nothing special here */
+ENDdbgPrintInstInfo
+
+/* log a database error with descriptive message.
+ * We check if we have a valid MongoDB handle. If not, we simply
+ * report an error
+ */
+static void reportDBError(instanceData *pData, int bSilent)
+{
+ char errMsg[512];
+ bson ErrObj;
+
+ ASSERT(pData != NULL);
+
+ /* output log message */
+ errno = 0;
+ if(pData->conn == NULL) {
+ errmsg.LogError(0, NO_ERRCODE, "unknown DB error occured - could not obtain MongoDB handle");
+ } else { /* we can ask mysql for the error description... */
+ //we should handle the error. if bSilent is set then we should print as debug
+ mongo_cmd_get_last_error(pData->conn, pData->db, &ErrObj);
+ bson_destroy(&ErrObj);
+ }
+
+ return;
+}
+
+/* The following function is responsible for initializing a
+ * MySQL connection.
+ * Initially added 2004-10-28 mmeckelein
+ */
+static rsRetVal initMongoDB(instanceData *pData, int bSilent)
+{
+ DEFiRet;
+
+ ASSERT(pData != NULL);
+ ASSERT(pData->conn == NULL);
+
+ //I'm trying to fallback to a default here
+ if(pData->opts->port == 0)
+ pData->opts->port = 27017;
+
+ if(pData->opts->host == 0x00)
+ strcpy(pData->opts->host,DEFAULT_SERVER);
+
+ if(pData->dbcollection == 0x00)
+ strcpy(pData->dbcollection,DEFAULT_DB_COLLECTION);
+
+ pData->status = mongo_connect(pData->conn, pData->opts );
+
+ switch (pData->status) {
+ case mongo_conn_success:
+ fprintf(stderr, "connection succeeded\n" );
+ iRet = RS_RET_OK;
+ break;
+ case mongo_conn_bad_arg:
+ errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle");
+ fprintf(stderr, "bad arguments\n" );
+ iRet = RS_RET_SUSPENDED;
+ break;
+ case mongo_conn_no_socket:
+ errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle");
+ fprintf(stderr, "no socket\n" );
+ iRet = RS_RET_SUSPENDED;
+ break;
+ case mongo_conn_fail:
+ errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle");
+ fprintf(stderr, "connection failed\n" );
+ iRet = RS_RET_SUSPENDED;
+ break;
+ case mongo_conn_not_master:
+ errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle");
+ fprintf(stderr, "not master\n" );
+ iRet = RS_RET_SUSPENDED;
+ break;
+ }
+ RETiRet;
+}
+
+//we must implement it
+rsRetVal writeMongoDB(uchar *psz, instanceData *pData)
+{
+ char mydate[32];
+ char **szParams;
+ bson b[1];
+ bson_buffer buf[1];
+ bson_buffer_init( buf );
+ bson_append_new_oid(buf, "_id" );
+ memset(mydate,0x00,32);
+
+
+ DEFiRet;
+
+ ASSERT(psz != NULL);
+ ASSERT(pData != NULL);
+
+
+ /* see if we are ready to proceed */
+ if(pData->conn == NULL) {
+ CHKiRet(initMongoDB(pData, 0));
+ }
+
+szParams = (char**)(void*) psz;
+//We can make it beter
+//if you change the fields in your template, we must update it here
+//there is any C_metaprogramming_ninja there? :-)
+if(countof(szParams) > 0)
+{
+ bson_append_string( buf, "msg", szParams[0]);
+ bson_append_string( buf, "facility",szParams[1]);
+ bson_append_string( buf, "hostname", szParams[2] );
+ bson_append_string(buf, "priority",szParams[3]);
+ bson_append_int(buf,"count",countof(szParams));
+ bson_from_buffer( b, buf );
+ mongo_insert(pData->conn, pData->dbcollection, b );
+}
+
+if(b)
+ bson_destroy(b);
+
+
+ finalize_it:
+ if(iRet == RS_RET_OK) {
+ pData->uLastMongoDBErrno = 0; /* reset error for error supression */
+ }
+
+
+ RETiRet;
+}
+
+BEGINtryResume
+CODESTARTtryResume
+ if(pData->conn == NULL) {
+ iRet = initMongoDB(pData, 1);
+ }
+ENDtryResume
+
+BEGINdoAction
+CODESTARTdoAction
+ iRet = writeMongoDB(ppString[0], pData);
+ENDdoAction
+
+BEGINparseSelectorAct
+ //int iMongoDBPropErr = 0;
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+
+ if(!strncmp((char*) p, ":ommongodb:", sizeof(":ommongodb:") - 1)) {
+ p += sizeof(":ommongodb:") - 1; /* eat indicator sequence (-1 because of '\0'!) */
+ } else {
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+ }
+
+ CHKiRet(createInstance(&pData));
+
+ if(getSubString(&p, pData->opts->host, MAXHOSTNAMELEN+1, ','))
+ strcpy(pData->opts->host,DEFAULT_SERVER);
+
+ //we must define the max db name
+ if(getSubString(&p,pData->db,255,','))
+ strcpy(pData->db,DEFAULT_DATABASE);
+ if(getSubString(&p,pData->collection,255,';'))
+ strcpy(pData->collection,DEFAULT_COLLECTION);
+ if(*(p-1) == ';')
+ --p;
+
+
+ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_TPL_AS_ARRAY, (uchar*) " StdMongoDBFmt"));
+
+
+ pData->opts->port = (unsigned) iSrvPort; /* set configured port */
+ sprintf(pData->dbcollection,"%s.%s",pData->db,pData->collection);
+ CHKiRet(initMongoDB(pData, 0));
+
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINmodExit
+CODESTARTmodExit
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+ENDqueryEtryPt
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
+ DBGPRINTF("ompgsql: module compiled with rsyslog version %s.\n", VERSION);
+ DBGPRINTF("ompgsql: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not ");
+ENDmodInit \ No newline at end of file
diff --git a/plugins/ommysql/ommysql.c b/plugins/ommysql/ommysql.c
index 5b44d687..4b9d2f7e 100644
--- a/plugins/ommysql/ommysql.c
+++ b/plugins/ommysql/ommysql.c
@@ -62,10 +62,14 @@ typedef struct _instanceData {
char f_dbuid[_DB_MAXUNAMELEN+1]; /* DB user */
char f_dbpwd[_DB_MAXPWDLEN+1]; /* DB user's password */
unsigned uLastMySQLErrno; /* last errno returned by MySQL or 0 if all is well */
+ uchar * f_configfile; /* MySQL Client Configuration File */
+ uchar * f_configsection; /* MySQL Client Configuration Section */
} instanceData;
typedef struct configSettings_s {
int iSrvPort; /* database server port */
+ uchar *pszMySQLConfigFile; /* MySQL Client Configuration File */
+ uchar *pszMySQLConfigSection; /* MySQL Client Configuration Section */
} configSettings_t;
SCOPING_SUPPORT; /* must be set AFTER configSettings_t is defined */
@@ -101,6 +105,14 @@ static void closeMySQL(instanceData *pData)
mysql_close(pData->f_hmysql);
pData->f_hmysql = NULL;
}
+ if(pData->f_configfile!=NULL){
+ free(pData->f_configfile);
+ pData->f_configfile=NULL;
+ }
+ if(pData->f_configsection!=NULL){
+ free(pData->f_configsection);
+ pData->f_configsection=NULL;
+ }
}
BEGINfreeInstance
@@ -162,6 +174,25 @@ static rsRetVal initMySQL(instanceData *pData, int bSilent)
errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MySQL handle");
iRet = RS_RET_SUSPENDED;
} else { /* we could get the handle, now on with work... */
+ mysql_options(pData->f_hmysql,MYSQL_READ_DEFAULT_GROUP,((pData->f_configsection!=NULL)?(char*)pData->f_configsection:"client"));
+ if(pData->f_configfile!=NULL){
+ FILE * fp;
+ fp=fopen((char*)pData->f_configfile,"r");
+ int err=errno;
+ if(fp==NULL){
+ char msg[512];
+ snprintf(msg,sizeof(msg)/sizeof(char),"Could not open '%s' for reading",pData->f_configfile);
+ if(bSilent) {
+ char errStr[512];
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ dbgprintf("mysql configuration error(%d): %s - %s\n",err,msg,errStr);
+ } else
+ errmsg.LogError(err,NO_ERRCODE,"mysql configuration error: %s\n",msg);
+ } else {
+ fclose(fp);
+ mysql_options(pData->f_hmysql,MYSQL_READ_DEFAULT_FILE,pData->f_configfile);
+ }
+ }
/* Connect to database */
if(mysql_real_connect(pData->f_hmysql, pData->f_dbsrv, pData->f_dbuid,
pData->f_dbpwd, pData->f_dbname, pData->f_dbsrvPort, NULL, 0) == NULL) {
@@ -288,6 +319,8 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
} else {
pData->f_dbsrvPort = (unsigned) cs.iSrvPort; /* set configured port */
+ pData->f_configfile = cs.pszMySQLConfigFile;
+ pData->f_configsection = cs.pszMySQLConfigSection;
pData->f_hmysql = NULL; /* initialize, but connect only on first message (important for queued mode!) */
}
@@ -312,6 +345,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
{
DEFiRet;
cs.iSrvPort = 0; /* zero is the default port */
+ free(cs.pszMySQLConfigFile);
+ cs.pszMySQLConfigFile = NULL;
+ free(cs.pszMySQLConfigSection);
+ cs.pszMySQLConfigSection = NULL;
RETiRet;
}
@@ -323,6 +360,8 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
/* register our config handlers */
CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionommysqlserverport", 0, eCmdHdlrInt, NULL, &cs.iSrvPort, STD_LOADABLE_MODULE_ID, eConfObjAction));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"ommysqlconfigfile",0,eCmdHdlrGetWord,NULL,&cs.pszMySQLConfigFile,STD_LOADABLE_MODULE_ID, eConfObjAction));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"ommysqlconfigsection",0,eCmdHdlrGetWord,NULL,&cs.pszMySQLConfigSection,STD_LOADABLE_MODULE_ID, eConfObjAction));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID, eConfObjAction));
ENDmodInit
diff --git a/plugins/pmaixforwardedfrom/Makefile.am b/plugins/pmaixforwardedfrom/Makefile.am
new file mode 100644
index 00000000..af359d31
--- /dev/null
+++ b/plugins/pmaixforwardedfrom/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = pmaixforwardedfrom.la
+
+pmaixforwardedfrom_la_SOURCES = pmaixforwardedfrom.c
+pmaixforwardedfrom_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) -I ../../tools
+pmaixforwardedfrom_la_LDFLAGS = -module -avoid-version
+pmaixforwardedfrom_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/pmaixforwardedfrom/pmaixforwardedfrom.c b/plugins/pmaixforwardedfrom/pmaixforwardedfrom.c
new file mode 100644
index 00000000..11634199
--- /dev/null
+++ b/plugins/pmaixforwardedfrom/pmaixforwardedfrom.c
@@ -0,0 +1,167 @@
+/* pmaixforwardedfrom.c
+ *
+ * this detects logs sent by Cisco devices that mangle their syslog output when you tell them to log by name by adding ' :' between the name and the %XXX-X-XXXXXXX: tag
+ *
+ * instead of actually parsing the message, this modifies the message and then falls through to allow a later parser to handle the now modified message
+ *
+ * created 2010-12-13 by David Lang based on pmlastmsg
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <ctype.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "template.h"
+#include "msg.h"
+#include "module-template.h"
+#include "glbl.h"
+#include "errmsg.h"
+#include "parser.h"
+#include "datetime.h"
+#include "unicode-helper.h"
+
+MODULE_TYPE_PARSER
+PARSER_NAME("rsyslog.aixforwardedfrom")
+
+/* internal structures
+ */
+DEF_PMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(parser)
+DEFobjCurrIf(datetime)
+
+
+/* static data */
+static int bParseHOSTNAMEandTAG; /* cache for the equally-named global param - performance enhancement */
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATUREAutomaticSanitazion)
+ iRet = RS_RET_OK;
+ if(eFeat == sFEATUREAutomaticPRIParsing)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
+BEGINparse
+ uchar *p2parse;
+ uchar *opening;
+ int lenMsg;
+#define OpeningText "Message forwarded from "
+CODESTARTparse
+ dbgprintf("Message will now be parsed by fix AIX Forwarded From parser.\n");
+ assert(pMsg != NULL);
+ assert(pMsg->pszRawMsg != NULL);
+ lenMsg = pMsg->iLenRawMsg - pMsg->offAfterPRI; /* note: offAfterPRI is already the number of PRI chars (do not add one!) */
+ p2parse = pMsg->pszRawMsg + pMsg->offAfterPRI; /* point to start of text, after PRI */
+
+ /* check if this message is of the type we handle in this (very limited) parser */
+ /* first, we permit SP */
+ while(lenMsg && *p2parse == ' ') {
+ --lenMsg;
+ ++p2parse;
+ }
+dbgprintf("pmaixforwardedfrom: msg to look at: [%d]'%s'\n", lenMsg, p2parse);
+ if((unsigned) lenMsg < 42) {
+ /* too short, can not be "our" message */
+ /* minimum message, 16 character timestamp, 'Message forwarded from ", 1 character name, ': '*/
+dbgprintf("msg too short!\n");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+ }
+
+ /* skip over timestamp */
+ lenMsg -=16;
+ p2parse +=16;
+ /* if there is the string "Message forwarded from " were the hostname should be */
+ if(strncasecmp((char*) p2parse, OpeningText, sizeof(OpeningText)-1) != 0) {
+ /* wrong opening text */
+dbgprintf("not a AIX message forwarded from mangled log!\n");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+ }
+ /* bump the message portion up by 23 characters to overwrite the "Message forwarded from " with the hostname */
+ lenMsg -=23;
+ memmove(p2parse, p2parse + 23, lenMsg);
+ *(p2parse + lenMsg) = '\n';
+ *(p2parse + lenMsg + 1) = '\0';
+ pMsg->iLenRawMsg -=23;
+ pMsg->iLenMSG -=23;
+ /* now look for the : after the hostname to walk past the hostname, also watch for a space in case this isn't really an AIX log, but has a similar preamble */
+ while(lenMsg && *p2parse != ' ' && *p2parse != ':') {
+ --lenMsg;
+ ++p2parse;
+ }
+ if (lenMsg && *p2parse != ':') {
+dbgprintf("not a AIX message forwarded from mangled log but similar enough that the preamble has been removed\n");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+ }
+ /* bump the message portion up by one character to overwrite the extra : */
+ lenMsg -=1;
+ memmove(p2parse, p2parse + 1, lenMsg);
+ *(p2parse + lenMsg) = '\n';
+ *(p2parse + lenMsg + 1) = '\0';
+ pMsg->iLenRawMsg -=1;
+ pMsg->iLenMSG -=1;
+ /* now, claim to abort so that something else can parse the now modified message */
+ DBGPRINTF("pmaixforwardedfrom: new mesage: [%d]'%s'\n", lenMsg, pMsg->pszRawMsg + pMsg->offAfterPRI);
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+
+finalize_it:
+ENDparse
+
+
+BEGINmodExit
+CODESTARTmodExit
+ /* release what we no longer need */
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(parser, CORE_COMPONENT);
+ objRelease(datetime, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_PMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(parser, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+
+ DBGPRINTF("aixforwardedfrom parser init called, compiled with version %s\n", VERSION);
+ bParseHOSTNAMEandTAG = glbl.GetParseHOSTNAMEandTAG(); /* cache value, is set only during rsyslogd option processing */
+
+
+ENDmodInit
+
+/* vim:set ai:
+ */
diff --git a/plugins/pmcisconames/Makefile.am b/plugins/pmcisconames/Makefile.am
new file mode 100644
index 00000000..16ed347d
--- /dev/null
+++ b/plugins/pmcisconames/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = pmcisconames.la
+
+pmcisconames_la_SOURCES = pmcisconames.c
+pmcisconames_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) -I ../../tools
+pmcisconames_la_LDFLAGS = -module -avoid-version
+pmcisconames_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/pmcisconames/pmcisconames.c b/plugins/pmcisconames/pmcisconames.c
new file mode 100644
index 00000000..4171e688
--- /dev/null
+++ b/plugins/pmcisconames/pmcisconames.c
@@ -0,0 +1,177 @@
+/* pmcisconames.c
+ *
+ * this detects logs sent by Cisco devices that mangle their syslog output when you tell them to log by name by adding ' :' between the name and the %XXX-X-XXXXXXX: tag
+ *
+ * instead of actually parsing the message, this modifies the message and then falls through to allow a later parser to handle the now modified message
+ *
+ * created 2010-12-13 by David Lang based on pmlastmsg
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <ctype.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "template.h"
+#include "msg.h"
+#include "module-template.h"
+#include "glbl.h"
+#include "errmsg.h"
+#include "parser.h"
+#include "datetime.h"
+#include "unicode-helper.h"
+
+MODULE_TYPE_PARSER
+PARSER_NAME("rsyslog.cisconames")
+
+/* internal structures
+ */
+DEF_PMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(parser)
+DEFobjCurrIf(datetime)
+
+
+/* static data */
+static int bParseHOSTNAMEandTAG; /* cache for the equally-named global param - performance enhancement */
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATUREAutomaticSanitazion)
+ iRet = RS_RET_OK;
+ if(eFeat == sFEATUREAutomaticPRIParsing)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
+BEGINparse
+ uchar *p2parse;
+ int lenMsg;
+#define OpeningText ": %"
+CODESTARTparse
+ dbgprintf("Message will now be parsed by fix Cisco Names parser.\n");
+ assert(pMsg != NULL);
+ assert(pMsg->pszRawMsg != NULL);
+ lenMsg = pMsg->iLenRawMsg - pMsg->offAfterPRI; /* note: offAfterPRI is already the number of PRI chars (do not add one!) */
+ p2parse = pMsg->pszRawMsg + pMsg->offAfterPRI; /* point to start of text, after PRI */
+
+ /* check if this message is of the type we handle in this (very limited) parser */
+ /* first, we permit SP */
+ while(lenMsg && *p2parse == ' ') {
+ --lenMsg;
+ ++p2parse;
+ }
+dbgprintf("pmcisconames: msg to look at: [%d]'%s'\n", lenMsg, p2parse);
+ if((unsigned) lenMsg < 34) {
+ /* too short, can not be "our" message */
+ /* minimum message, 16 character timestamp, 1 character name, ' : %ASA-1-000000: '*/
+dbgprintf("msg too short!\n");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+ }
+ /* check if the timestamp is a 16 character or 21 character timestamp
+ 'Mmm DD HH:MM:SS ' spaces at 3,6,15 : at 9,12
+ 'Mmm DD YYYY HH:MM:SS ' spaces at 3,6,11,20 : at 14,17
+ check for the : first as that will differentiate the two conditions the fastest
+ this allows the compiler to short circuit the rst of the tests if it is the wrong timestamp
+ but still check the rest to see if it looks correct
+ */
+ if ( *(p2parse + 9) == ':' && *(p2parse + 12) == ':' && *(p2parse + 3) == ' ' && *(p2parse + 6) == ' ' && *(p2parse + 15) == ' ') {
+ /* skip over timestamp */
+ dbgprintf("short timestamp found\n");
+ lenMsg -=16;
+ p2parse +=16;
+ } else {
+ if ( *(p2parse + 14) == ':' && *(p2parse + 17) == ':' && *(p2parse + 3) == ' ' && *(p2parse + 6) == ' ' && *(p2parse + 11) == ' ' && *(p2parse + 20) == ' ') {
+ /* skip over timestamp */
+ dbgprintf("long timestamp found\n");
+ lenMsg -=21;
+ p2parse +=21;
+ } else {
+ dbgprintf("timestamp is not one of the valid formats\n");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+ }
+ }
+ /* now look for the next space to walk past the hostname */
+ while(lenMsg && *p2parse != ' ') {
+ --lenMsg;
+ ++p2parse;
+ }
+ /* skip the space after the hostname */
+ lenMsg -=1;
+ p2parse +=1;
+ /* if the syslog tag is : and the next thing starts with a % assume that this is a mangled cisco log and fix it */
+ if(strncasecmp((char*) p2parse, OpeningText, sizeof(OpeningText)-1) != 0) {
+ /* wrong opening text */
+dbgprintf("not a cisco name mangled log!\n");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+ }
+ /* bump the message portion up by two characters to overwrite the extra : */
+ lenMsg -=2;
+ memmove(p2parse, p2parse + 2, lenMsg);
+ *(p2parse + lenMsg) = '\n';
+ *(p2parse + lenMsg + 1) = '\0';
+ pMsg->iLenRawMsg -=2;
+ pMsg->iLenMSG -=2;
+ /* now, claim to abort so that something else can parse the now modified message */
+ DBGPRINTF("pmcisconames: new mesage: [%d]'%s'\n", lenMsg, p2parse);
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+
+finalize_it:
+ENDparse
+
+
+BEGINmodExit
+CODESTARTmodExit
+ /* release what we no longer need */
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(parser, CORE_COMPONENT);
+ objRelease(datetime, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_PMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(parser, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+
+ DBGPRINTF("cisconames parser init called, compiled with version %s\n", VERSION);
+ bParseHOSTNAMEandTAG = glbl.GetParseHOSTNAMEandTAG(); /* cache value, is set only during rsyslogd option processing */
+
+
+ENDmodInit
+
+/* vim:set ai:
+ */
diff --git a/plugins/pmsnare/Makefile.am b/plugins/pmsnare/Makefile.am
new file mode 100644
index 00000000..5b2696ac
--- /dev/null
+++ b/plugins/pmsnare/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = pmsnare.la
+
+pmsnare_la_SOURCES = pmsnare.c
+pmsnare_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) -I ../../tools
+pmsnare_la_LDFLAGS = -module -avoid-version
+pmsnare_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/pmsnare/pmsnare.c b/plugins/pmsnare/pmsnare.c
new file mode 100644
index 00000000..4a9880d4
--- /dev/null
+++ b/plugins/pmsnare/pmsnare.c
@@ -0,0 +1,238 @@
+/* pmsnare.c
+ *
+ * this detects logs sent by Snare and cleans them up so that they can be processed by the normal parser
+ *
+ * there are two variations of this, if the client is set to 'syslog' mode it sends
+ *
+ * <pri>timestamp<sp>hostname<sp>tag<tab>otherstuff
+ *
+ * if the client is not set to syslog it sends
+ *
+ * hostname<tab>tag<tab>otherstuff
+ *
+ * ToDo, take advantage of items in the message itself to set more friendly information
+ * where the normal parser will find it by re-writing more of the message
+ *
+ * Intereting information includes:
+ *
+ * in the case of windows snare messages:
+ * the system hostname is field 12
+ * the severity is field 3 (criticality ranging form 0 to 4)
+ * the source of the log is field 4 and may be able to be mapped to facility
+ *
+ *
+ * created 2010-12-13 by David Lang based on pmlastmsg
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <ctype.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "template.h"
+#include "msg.h"
+#include "module-template.h"
+#include "glbl.h"
+#include "errmsg.h"
+#include "parser.h"
+#include "datetime.h"
+#include "unicode-helper.h"
+
+MODULE_TYPE_PARSER
+PARSER_NAME("rsyslog.snare")
+
+/* internal structures
+ */
+DEF_PMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(parser)
+DEFobjCurrIf(datetime)
+
+
+/* static data */
+static int bParseHOSTNAMEandTAG; /* cache for the equally-named global param - performance enhancement */
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATUREAutomaticSanitazion)
+ iRet = RS_RET_OK;
+ if(eFeat == sFEATUREAutomaticPRIParsing)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
+BEGINparse
+ uchar *p2parse;
+ int lenMsg;
+ int snaremessage;
+ int tablength;
+
+CODESTARTparse
+ #define TabRepresentation "#011"
+ tablength=sizeof(TabRepresentation);
+ dbgprintf("Message will now be parsed by fix Snare parser.\n");
+ assert(pMsg != NULL);
+ assert(pMsg->pszRawMsg != NULL);
+
+ /* check if this message is of the type we handle in this (very limited) parser
+
+ find out if we have a space separated or tab separated for the first item
+ if tab separated see if the second word is one of our expected tags
+ if so replace the tabs with spaces so that hostname and syslog tag are going to be parsed properly
+ optionally replace the hostname at the beginning of the message with one from later in the message
+ else, wrong message, abort
+ else, assume that we have a valid timestamp, move over to the syslog tag
+ if that is tab separated from the rest of the message and one of our expected tags
+ if so, replace the tab with a space so that it will be parsed properly
+ optionally replace the hostname at the beginning of the message withone from later in the message
+
+ */
+ snaremessage=0;
+ lenMsg = pMsg->iLenRawMsg - pMsg->offAfterPRI; /* note: offAfterPRI is already the number of PRI chars (do not add one!) */
+ p2parse = pMsg->pszRawMsg + pMsg->offAfterPRI; /* point to start of text, after PRI */
+ dbgprintf("pmsnare: msg to look at: [%d]'%s'\n", lenMsg, p2parse);
+ if((unsigned) lenMsg < 30) {
+ /* too short, can not be "our" message */
+ dbgprintf("msg too short!\n");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+ }
+
+ while(lenMsg && *p2parse != ' ' && *p2parse != '\t' && *p2parse != '#') {
+ --lenMsg;
+ ++p2parse;
+ }
+ dbgprintf("pmsnare: separator [%d]'%s' msg after the first separator: [%d]'%s'\n", tablength,TabRepresentation,lenMsg, p2parse);
+ if ((lenMsg > tablength) && (*p2parse == '\t' || strncasecmp((char*) p2parse, TabRepresentation , tablength-1) == 0)) {
+ //if ((lenMsg > tablength) && (*p2parse == '\t' || *p2parse == '#')) {
+ dbgprintf("pmsnare: tab separated message\n");
+ if(strncasecmp((char*) (p2parse + tablength - 1), "MSWinEventLog", 13) == 0) {
+ snaremessage=13; /* 0 means not a snare message, a number is how long the tag is */
+ }
+ if(strncasecmp((char*) (p2parse + tablength - 1), "LinuxKAudit", 11) == 0) {
+ snaremessage=11; /* 0 means not a snare message, a number is how long the tag is */
+ }
+ if(snaremessage) {
+ /* replace the tab with a space and if needed move the message portion up by the length of TabRepresentation -2 characters to overwrite the extra : */
+ *p2parse = ' ';
+ lenMsg -=(tablength-2);
+ p2parse++;
+ lenMsg--;
+ memmove(p2parse, p2parse + (tablength-2), lenMsg);
+ *(p2parse + lenMsg) = '\n';
+ *(p2parse + lenMsg + 1) = '\0';
+ pMsg->iLenRawMsg -=(tablength-2);
+ pMsg->iLenMSG -=(tablength-2);
+ p2parse += snaremessage;
+ lenMsg -= snaremessage;
+ *p2parse = ' ';
+ p2parse++;
+ lenMsg--;
+ lenMsg -=(tablength-2);
+ memmove(p2parse, p2parse + (tablength-2), lenMsg);
+ *(p2parse + lenMsg) = '\n';
+ *(p2parse + lenMsg + 1) = '\0';
+ pMsg->iLenRawMsg -=(tablength-2);
+ pMsg->iLenMSG -=(tablength-2);
+ dbgprintf("found a Snare message with snare not set to send syslog messages\n");
+ }
+ } else {
+ /* go back to the beginning of the message */
+ lenMsg = pMsg->iLenRawMsg - pMsg->offAfterPRI; /* note: offAfterPRI is already the number of PRI chars (do not add one!) */
+ p2parse = pMsg->pszRawMsg + pMsg->offAfterPRI; /* point to start of text, after PRI */
+ /* skip over timestamp and space*/
+ lenMsg -=17;
+ p2parse +=17;
+ /* skip over what should be the hostname */
+ while(lenMsg && *p2parse != ' ') {
+ --lenMsg;
+ ++p2parse;
+ }
+ if (lenMsg){
+ --lenMsg;
+ ++p2parse;
+ }
+ dbgprintf("pmsnare: separator [%d]'%s' msg after the timestamp and hostname: [%d]'%s'\n", tablength,TabRepresentation,lenMsg, p2parse);
+ if(lenMsg > 13 && strncasecmp((char*) p2parse, "MSWinEventLog", 13) == 0) {
+ snaremessage=13; /* 0 means not a snare message, a number is how long the tag is */
+ }
+ if(lenMsg > 11 && strncasecmp((char*) p2parse, "LinuxKAudit", 11) == 0) {
+ snaremessage=11; /* 0 means not a snare message, a number is how long the tag is */
+ }
+ if(snaremessage) {
+ p2parse += snaremessage;
+ lenMsg -= snaremessage;
+ *p2parse = ' ';
+ p2parse++;
+ lenMsg--;
+ lenMsg -=(tablength-2);
+ memmove(p2parse, p2parse + (tablength-2), lenMsg);
+ *(p2parse + lenMsg) = '\n';
+ *(p2parse + lenMsg + 1) = '\0';
+ pMsg->iLenRawMsg -=(tablength-2);
+ pMsg->iLenMSG -=(tablength-2);
+ dbgprintf("found a Snare message with snare set to send syslog messages\n");
+ }
+
+ }
+ DBGPRINTF("pmsnare: new message: [%d]'%s'\n", lenMsg, pMsg->pszRawMsg + pMsg->offAfterPRI);
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+
+finalize_it:
+ENDparse
+
+
+BEGINmodExit
+CODESTARTmodExit
+ /* release what we no longer need */
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(parser, CORE_COMPONENT);
+ objRelease(datetime, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_PMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(parser, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+
+ DBGPRINTF("snare parser init called, compiled with version %s\n", VERSION);
+ bParseHOSTNAMEandTAG = glbl.GetParseHOSTNAMEandTAG(); /* cache value, is set only during rsyslogd option processing */
+
+
+ENDmodInit
+
+/* vim:set ai:
+ */
diff --git a/runtime/batch.h b/runtime/batch.h
index d0504f2b..944889bd 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -136,11 +136,16 @@ batchIsValidElem(batch_t *pBatch, int i) {
/* copy one batch element to another.
* This creates a complete duplicate in those cases where
* it is needed. Use duplication only when absolutely necessary!
+ * Note that all working fields are reset to zeros. If that were
+ * not done, we would have potential problems with invalid
+ * or double pointer frees.
* rgerhards, 2010-06-10
*/
static inline void
batchCopyElem(batch_obj_t *pDest, batch_obj_t *pSrc) {
- memcpy(pDest, pSrc, sizeof(batch_obj_t));
+ memset(pDest, 0, sizeof(batch_obj_t));
+ pDest->pUsrp = pSrc->pUsrp;
+ pDest->state = pSrc->state;
}
@@ -171,6 +176,7 @@ batchFree(batch_t *pBatch) {
static inline rsRetVal
batchInit(batch_t *pBatch, int maxElem) {
DEFiRet;
+ pBatch->iDoneUpTo = 0;
pBatch->maxElem = maxElem;
CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t)));
// TODO: replace calloc by inidividual writes?
diff --git a/runtime/datetime.h b/runtime/datetime.h
index 82bd415b..70bbf416 100644
--- a/runtime/datetime.h
+++ b/runtime/datetime.h
@@ -28,6 +28,7 @@
/* the datetime object */
typedef struct datetime_s {
+ char dummy;
} datetime_t;
diff --git a/runtime/errmsg.h b/runtime/errmsg.h
index 799954fb..ac7018b3 100644
--- a/runtime/errmsg.h
+++ b/runtime/errmsg.h
@@ -30,6 +30,7 @@
/* the errmsg object */
typedef struct errmsg_s {
+ char dummy;
} errmsg_t;
diff --git a/runtime/expr.h b/runtime/expr.h
index 974b71ec..1afe1a1f 100644
--- a/runtime/expr.h
+++ b/runtime/expr.h
@@ -30,6 +30,7 @@
/* a node inside an expression tree */
typedef struct exprNode_s {
+ char dummy;
} exprNode_t;
diff --git a/runtime/glbl.c b/runtime/glbl.c
index 7dc17df4..0114b1ac 100644
--- a/runtime/glbl.c
+++ b/runtime/glbl.c
@@ -31,6 +31,9 @@
#include "config.h"
#include <stdlib.h>
#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
#include <assert.h>
#include "rsyslog.h"
@@ -40,6 +43,7 @@
#include "glbl.h"
#include "prop.h"
#include "atomic.h"
+#include "errmsg.h"
/* some defaults */
#ifndef DFLT_NETSTRM_DRVR
@@ -49,6 +53,7 @@
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(prop)
+DEFobjCurrIf(errmsg)
/* static data
* For this object, these variables are obviously what makes the "meat" of the
@@ -147,6 +152,35 @@ static void SetGlobalInputTermination(void)
}
+/* This function is used to set the global work directory name.
+ * It verifies that the provided directory actually exists and
+ * emits an error message if not.
+ * rgerhards, 2011-02-16
+ */
+static rsRetVal setWorkDir(void __attribute__((unused)) *pVal, uchar *pNewVal)
+{
+ DEFiRet;
+ struct stat sb;
+
+ if(stat((char*) pNewVal, &sb) != 0) {
+ errmsg.LogError(0, RS_RET_ERR_WRKDIR, "$WorkDirectory: %s can not be "
+ "accessed, probably does not exist - directive ignored", pNewVal);
+ ABORT_FINALIZE(RS_RET_ERR_WRKDIR);
+ }
+
+ if(!S_ISDIR(sb.st_mode)) {
+ errmsg.LogError(0, RS_RET_ERR_WRKDIR, "$WorkDirectory: %s not a directory - directive ignored",
+ pNewVal);
+ ABORT_FINALIZE(RS_RET_ERR_WRKDIR);
+ }
+
+ free(pszWorkDir);
+ pszWorkDir = pNewVal;
+
+finalize_it:
+ RETiRet;
+}
+
/* return our local hostname. if it is not set, "[localhost]" is returned
*/
static uchar*
@@ -354,9 +388,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
BEGINAbstractObjClassInit(glbl, 1, OBJ_IS_CORE_MODULE) /* class, version */
/* request objects we use */
CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
/* register config handlers (TODO: we need to implement a way to unregister them) */
- CHKiRet(regCfSysLineHdlr((uchar *)"workdirectory", 0, eCmdHdlrGetWord, NULL, &pszWorkDir, NULL, eConfObjGlobal));
+ CHKiRet(regCfSysLineHdlr((uchar *)"workdirectory", 0, eCmdHdlrGetWord, setWorkDir, NULL, NULL, eConfObjGlobal));
CHKiRet(regCfSysLineHdlr((uchar *)"dropmsgswithmaliciousdnsptrrecords", 0, eCmdHdlrBinary, NULL, &bDropMalPTRMsgs, NULL, eConfObjGlobal));
CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdriver", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvr, NULL, eConfObjGlobal));
CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdrivercafile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrCAF, NULL, eConfObjGlobal));
diff --git a/runtime/modules.h b/runtime/modules.h
index 9569512b..a78cd6f1 100644
--- a/runtime/modules.h
+++ b/runtime/modules.h
@@ -123,6 +123,7 @@ struct modInfo_s {
rsRetVal (*restoreScope)(void);
} om;
struct { /* data for library modules */
+ char dummy;
} lm;
struct { /* data for parser modules */
rsRetVal (*parse)(msg_t*);
diff --git a/runtime/msg.c b/runtime/msg.c
index ad045dec..70b20749 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -686,6 +686,7 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis)
/* initialize members in ORDER they appear in structure (think "cache line"!) */
pM->flowCtlType = 0;
pM->bDoLock = 0;
+ pM->bAlreadyFreed = 0;
pM->iRefCount = 1;
pM->iSeverity = -1;
pM->iFacility = -1;
@@ -813,6 +814,15 @@ CODESTARTobjDestruct(msg)
if(currRefCount == 0)
{
/* DEV Debugging Only! dbgprintf("msgDestruct\t0x%lx, RefCount now 0, doing DESTROY\n", (unsigned long)pThis); */
+ /* The if below is included to try to nail down a well-hidden bug causing
+ * segfaults. I hope that do to the test code the problem is sooner detected and
+ * thus we get better data for debugging and resolving it. -- rgerhards, 2011-02-23.
+ * TODO: remove when no longer needed.
+ */
+ if(pThis->bAlreadyFreed)
+ abort();
+ pThis->bAlreadyFreed = 1;
+ /* end debug code */
if(pThis->pszRawMsg != pThis->szRawMsg)
free(pThis->pszRawMsg);
freeTAG(pThis);
diff --git a/runtime/msg.h b/runtime/msg.h
index 1fd95994..01a1e059 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -63,7 +63,8 @@ struct msg {
once data has entered the queue, this property is no longer needed. */
pthread_mutex_t mut;
int iRefCount; /* reference counter (0 = unused) */
- sbool bDoLock; /* use the mutex? */
+ sbool bDoLock; /* use the mutex? */
+ sbool bAlreadyFreed; /* aid to help detect a well-hidden bad bug -- TODO: remove when no longer needed */
short iSeverity; /* the severity 0..7 */
short iFacility; /* Facility code 0 .. 23*/
short offAfterPRI; /* offset, at which raw message WITHOUT PRI part starts in pszRawMsg */
diff --git a/runtime/queue.c b/runtime/queue.c
index e4922f37..ef6e843b 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -246,6 +246,7 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis)
if(!pThis->bEnqOnly) {
if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
+ DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n");
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
} else {
if(getLogicalQueueSize(pThis) == 0) {
@@ -841,6 +842,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
batch_t singleBatch;
batch_obj_t batchObj;
+ int i;
DEFiRet;
//TODO: init batchObj (states _OK and new fields -- CHECK)
@@ -862,6 +864,10 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate);
+ /* delete the batch string params: TODO: create its own "class" for this */
+ for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
+ free(batchObj.staticActStrings[i]);
+ }
objDestruct(pUsr);
RETiRet;
@@ -1211,7 +1217,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
/* set some water marks so that we have useful defaults if none are set specifically */
pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */
pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */
-
pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir);
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
@@ -1497,7 +1502,7 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
* now that we dequeue batches of pointers, this is much less an issue...
* rgerhards, 2009-04-22
*/
- if(iQueueSize < pThis->iFullDlyMrk / 2) {
+ if(iQueueSize < pThis->iFullDlyMrk / 2 || glbl.GetGlobalInputTermState() == 1) {
pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk);
}
@@ -1819,6 +1824,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
uchar pszBuf[64];
+ int wrk;
uchar *qName;
size_t lenBuf;
@@ -1841,7 +1847,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
}
pthread_mutex_init(&pThis->mutThrdMgmt, NULL);
- pthread_cond_init (&pThis->condDAReady, NULL);
pthread_cond_init (&pThis->notFull, NULL);
pthread_cond_init (&pThis->notEmpty, NULL);
pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL);
@@ -1850,6 +1855,16 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
+ /* re-adjust some params if required */
+ if(pThis->bIsDA) {
+ /* if we are in DA mode, we must make sure full delayable messages do not
+ * initiate going to disk!
+ */
+ wrk = pThis->iHighWtrMrk - (pThis->iHighWtrMrk / 100) * 50; /* 50% of high water mark */
+ if(wrk < pThis->iFullDlyMrk)
+ pThis->iFullDlyMrk = wrk;
+ }
+
DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, "
"full delay %d, light delay %d, deq batch size %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
@@ -2129,7 +2144,6 @@ CODESTARTobjDestruct(qqueue)
free(pThis->mut);
}
pthread_mutex_destroy(&pThis->mutThrdMgmt);
- pthread_cond_destroy(&pThis->condDAReady);
pthread_cond_destroy(&pThis->notFull);
pthread_cond_destroy(&pThis->notEmpty);
pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
diff --git a/runtime/queue.h b/runtime/queue.h
index 38e248cd..97057180 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -124,7 +124,6 @@ struct queue_s {
pthread_cond_t notFull, notEmpty;
pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
- pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */
int bThrdStateChanged; /* at least one thread state has changed if 1 */
/* end sync variables */
/* the following variables are always present, because they
diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c
index 8baa2b59..bdb1c9ff 100644
--- a/runtime/rsyslog.c
+++ b/runtime/rsyslog.c
@@ -85,6 +85,12 @@
#include "statsobj.h"
#include "atomic.h"
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+struct sched_param default_sched_param;
+pthread_attr_t default_thread_attr;
+int default_thr_sched_policy;
+#endif
+
/* forward definitions */
static rsRetVal dfltErrLogger(int, uchar *errMsg);
@@ -139,6 +145,18 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF)
if(iRefCount == 0) {
/* init runtime only if not yet done */
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+ CHKiRet(pthread_getschedparam(pthread_self(),
+ &default_thr_sched_policy,
+ &default_sched_param));
+ CHKiRet(pthread_attr_init(&default_thread_attr));
+ CHKiRet(pthread_attr_setschedpolicy(&default_thread_attr,
+ default_thr_sched_policy));
+ CHKiRet(pthread_attr_setschedparam(&default_thread_attr,
+ &default_sched_param));
+ CHKiRet(pthread_attr_setinheritsched(&default_thread_attr,
+ PTHREAD_EXPLICIT_SCHED));
+#endif
if(ppErrObj != NULL) *ppErrObj = "obj";
CHKiRet(objClassInit(NULL)); /* *THIS* *MUST* always be the first class initilizer being called! */
CHKiRet(objGetObjInterface(pObjIF)); /* this provides the root pointer for all other queries */
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 17b20de2..9d3d0289 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -25,6 +25,7 @@
*/
#ifndef INCLUDED_RSYSLOG_H
#define INCLUDED_RSYSLOG_H
+#include <pthread.h>
#include "typedefs.h"
/* ############################################################# *
@@ -346,6 +347,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_ERR_HDFS_WRITE = -2178, /**< error writing to HDFS */
RS_RET_ERR_HDFS_OPEN = -2179, /**< error during hdfsOpen (e.g. file does not exist) */
RS_RET_FILE_NOT_SPECIFIED = -2180, /**< file name not configured where this was required */
+ RS_RET_ERR_WRKDIR = -2181, /**< problems with the rsyslog working directory */
RS_RET_INVLD_CONF_OBJ= -2200, /**< invalid config object (e.g. $Begin conf statement) */
RS_RET_ERR_LIBEE_INIT = -2201, /**< cannot obtain libee ctx */
@@ -427,6 +429,12 @@ typedef enum rsObjectID rsObjID;
#define RSFREEOBJ(x) {(x)->OID = OIDrsFreed; free(x);}
#endif
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+extern struct sched_param default_sched_param;
+extern pthread_attr_t default_thread_attr;
+extern int default_thr_sched_policy;
+#endif
+
/* for the time being, we do our own portability handling here. It
* looks like autotools either does not yet support checks for it, or
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 8162c752..f48c2c2d 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -171,35 +171,40 @@ processBatchMultiRuleset(batch_t *pBatch)
int i;
int iStart; /* start index of partial batch */
int iNew; /* index for new (temporary) batch */
+ int bHaveUnprocessed; /* do we (still) have unprocessed entries? (loop term predicate) */
DEFiRet;
- CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem));
- snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate;
-
- while(1) { /* loop broken inside */
+ do {
+ bHaveUnprocessed = 0;
/* search for first unprocessed element */
for(iStart = 0 ; iStart < pBatch->nElem && pBatch->pElem[iStart].state == BATCH_STATE_DISC ; ++iStart)
/* just search, no action */;
-
if(iStart == pBatch->nElem)
- FINALIZE; /* everything processed */
+ break; /* everything processed */
/* prepare temporary batch */
+ CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem));
+ snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate;
currRuleset = batchElemGetRuleset(pBatch, iStart);
iNew = 0;
for(i = iStart ; i < pBatch->nElem ; ++i) {
if(batchElemGetRuleset(pBatch, i) == currRuleset) {
- batchCopyElem(&(snglRuleBatch.pElem[iNew++]), &(pBatch->pElem[i]));
+ /* for performance reasons, we copy only those members that we actually need */
+ snglRuleBatch.pElem[iNew].pUsrp = pBatch->pElem[i].pUsrp;
+ snglRuleBatch.pElem[iNew].state = pBatch->pElem[i].state;
+ ++iNew;
/* We indicate the element also as done, so it will not be processed again */
pBatch->pElem[i].state = BATCH_STATE_DISC;
+ } else {
+ bHaveUnprocessed = 1;
}
}
snglRuleBatch.nElem = iNew; /* was left just right by the for loop */
batchSetSingleRuleset(&snglRuleBatch, 1);
/* process temp batch */
processBatch(&snglRuleBatch);
- }
- batchFree(&snglRuleBatch);
+ batchFree(&snglRuleBatch);
+ } while(bHaveUnprocessed == 1);
finalize_it:
RETiRet;
diff --git a/runtime/stream.c b/runtime/stream.c
index 260b59ef..24dbcc09 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -401,6 +401,12 @@ finalize_it:
* If we are monitoring a file, someone may have rotated it. In this case, we
* also need to close it and reopen it under the same name.
* rgerhards, 2008-02-13
+ * The previous code also did a check for file truncation, in which case the
+ * file was considered rewritten. However, this potential border case turned
+ * out to be a big trouble spot on busy systems. It caused massive message
+ * duplication (I guess stat() can return a too-low number under some
+ * circumstances). So starting as of now, we only check the inode number and
+ * a file change is detected only if the inode changes. -- rgerhards, 2011-01-10
*/
static rsRetVal
strmHandleEOFMonitor(strm_t *pThis)
@@ -410,23 +416,18 @@ strmHandleEOFMonitor(strm_t *pThis)
struct stat statName;
ISOBJ_TYPE_assert(pThis, strm);
- /* find inodes of both current descriptor as well as file now in file
- * system. If they are different, the file has been rotated (or
- * otherwise rewritten). We also check the size, because the inode
- * does not change if the file is truncated (this, BTW, is also a case
- * where we actually loose log lines, because we can not do anything
- * against truncation...). We do NOT rely on the time of last
- * modificaton because that may not be available under all
- * circumstances. -- rgerhards, 2008-02-13
- */
if(fstat(pThis->fd, &statOpen) == -1)
ABORT_FINALIZE(RS_RET_IO_ERROR);
if(stat((char*) pThis->pszCurrFName, &statName) == -1)
ABORT_FINALIZE(RS_RET_IO_ERROR);
- if(statOpen.st_ino == statName.st_ino && pThis->iCurrOffs == statName.st_size) {
+ DBGPRINTF("stream checking for file change on '%s', inode %u/%u",
+ pThis->pszCurrFName, (unsigned) statOpen.st_ino,
+ (unsigned) statName.st_ino);
+ if(statOpen.st_ino == statName.st_ino) {
ABORT_FINALIZE(RS_RET_EOF);
} else {
/* we had a file change! */
+ DBGPRINTF("we had a file change on '%s'\n", pThis->pszCurrFName);
CHKiRet(strmCloseFile(pThis));
CHKiRet(strmOpenFile(pThis));
}
@@ -561,39 +562,100 @@ static rsRetVal strmUnreadChar(strm_t *pThis, uchar c)
return RS_RET_OK;
}
-
-/* read a line from a strm file. A line is terminated by LF. The LF is read, but it
- * is not returned in the buffer (it is discared). The caller is responsible for
- * destruction of the returned CStr object! -- rgerhards, 2008-01-07
- * rgerhards, 2008-03-27: I now use the ppCStr directly, without any interim
- * string pointer. The reason is that this function my be called by inputs, which
- * are pthread_killed() upon termination. So if we use their native pointer, they
- * can cleanup (but only then).
+/* read a 'paragraph' from a strm file.
+ * A paragraph may be terminated by a LF, by a LFLF, or by LF<not whitespace> depending on the option set.
+ * The termination LF characters are read, but are
+ * not returned in the buffer (it is discared). The caller is responsible for
+ * destruction of the returned CStr object! -- dlang 2010-12-13
*/
static rsRetVal
-strmReadLine(strm_t *pThis, cstr_t **ppCStr)
+strmReadLine(strm_t *pThis, cstr_t **ppCStr, int mode)
{
- DEFiRet;
- uchar c;
-
- ASSERT(pThis != NULL);
- ASSERT(ppCStr != NULL);
-
- CHKiRet(cstrConstruct(ppCStr));
-
- /* now read the line */
- CHKiRet(strmReadChar(pThis, &c));
- while(c != '\n') {
- CHKiRet(cstrAppendChar(*ppCStr, c));
- CHKiRet(strmReadChar(pThis, &c));
+ /* mode = 0 single line mode (equivalent to ReadLine)
+ * mode = 1 LFLF mode (paragraph, blank line between entries)
+ * mode = 2 LF <not whitespace> mode, a log line starts at the beginning of a line, but following lines that are indented are part of the same log entry
+ * This modal interface is not nearly as flexible as being able to define a regex for when a new record starts, but it's also not nearly as hard (or as slow) to implement
+ */
+ DEFiRet;
+ uchar c;
+ uchar finished;
+
+ ASSERT(pThis != NULL);
+ ASSERT(ppCStr != NULL);
+
+ CHKiRet(cstrConstruct(ppCStr));
+
+ /* now read the line */
+ CHKiRet(strmReadChar(pThis, &c));
+ if (mode == 0){
+ while(c != '\n') {
+ CHKiRet(cstrAppendChar(*ppCStr, c));
+ CHKiRet(strmReadChar(pThis, &c));
+ }
+ CHKiRet(cstrFinalize(*ppCStr));
+ }
+ if (mode == 1){
+ finished=0;
+ while(finished == 0){
+ if(c != '\n') {
+ CHKiRet(cstrAppendChar(*ppCStr, c));
+ CHKiRet(strmReadChar(pThis, &c));
+ } else {
+ if ((((*ppCStr)->iStrLen) > 0) ){
+ if ((*ppCStr)->pBuf[(*ppCStr)->iStrLen -1 ] == '\n'){
+ rsCStrTruncate(*ppCStr,1); /* remove the prior newline */
+ finished=1;
+ } else {
+ CHKiRet(cstrAppendChar(*ppCStr, c));
+ CHKiRet(strmReadChar(pThis, &c));
+ }
+ } else {
+ finished=1; /* this is a blank line, a \n with nothing since the last complete record */
+ }
+ }
+ }
+ CHKiRet(cstrFinalize(*ppCStr));
+ }
+ if (mode == 2){
+ /* indented follow-up lines */
+ finished=0;
+ while(finished == 0){
+ if ((*ppCStr)->iStrLen == 0){
+ if(c != '\n') {
+ /* nothing in the buffer, and it's not a newline, add it to the buffer */
+ CHKiRet(cstrAppendChar(*ppCStr, c));
+ CHKiRet(strmReadChar(pThis, &c));
+ } else {
+ finished=1; /* this is a blank line, a \n with nothing since the last complete record */
+ }
+ } else {
+ if ((*ppCStr)->pBuf[(*ppCStr)->iStrLen -1 ] != '\n'){
+ /* not the first character after a newline, add it to the buffer */
+ CHKiRet(cstrAppendChar(*ppCStr, c));
+ CHKiRet(strmReadChar(pThis, &c));
+ } else {
+ if ((c == ' ') || (c == '\t')){
+ CHKiRet(cstrAppendChar(*ppCStr, c));
+ CHKiRet(strmReadChar(pThis, &c));
+ } else {
+ /* clean things up by putting the character we just read back into
+ * the input buffer and removing the LF character that is currently at the
+ * end of the output string */
+ CHKiRet(strmUnreadChar(pThis, c));
+ rsCStrTruncate(*ppCStr,1);
+ finished=1;
+ }
+ }
+ }
+ }
+ CHKiRet(cstrFinalize(*ppCStr));
}
- CHKiRet(cstrFinalize(*ppCStr));
finalize_it:
- if(iRet != RS_RET_OK && *ppCStr != NULL)
- cstrDestruct(ppCStr);
+ if(iRet != RS_RET_OK && *ppCStr != NULL)
+ cstrDestruct(ppCStr);
- RETiRet;
+ RETiRet;
}
@@ -669,7 +731,13 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
}
pThis->pIOBuf = pThis->asyncBuf[0].pBuf;
pThis->bStopWriter = 0;
- if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0)
+ if(pthread_create(&pThis->writerThreadID,
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+ &default_thread_attr,
+#else
+ NULL,
+#endif
+ asyncWriterThread, pThis) != 0)
DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis);
} else {
/* we work synchronously, so we need to alloc a fixed pIOBuf */
diff --git a/runtime/stream.h b/runtime/stream.h
index 37e9d570..60c68cb2 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -156,7 +156,6 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetFileName)(strm_t *pThis, uchar *pszName, size_t iLenName);
rsRetVal (*ReadChar)(strm_t *pThis, uchar *pC);
rsRetVal (*UnreadChar)(strm_t *pThis, uchar c);
- rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr);
rsRetVal (*SeekCurrOffs)(strm_t *pThis);
rsRetVal (*Write)(strm_t *pThis, uchar *pBuf, size_t lenBuf);
rsRetVal (*WriteChar)(strm_t *pThis, uchar c);
@@ -183,8 +182,10 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
INTERFACEpropSetMeth(strm, iSizeLimit, off_t);
INTERFACEpropSetMeth(strm, iFlushInterval, int);
INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*);
+ /* v6 added */
+ rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr, int mode);
ENDinterface(strm)
-#define strmCURR_IF_VERSION 5 /* increment whenever you change the interface structure! */
+#define strmCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */
/* prototypes */
diff --git a/runtime/stringbuf.c b/runtime/stringbuf.c
index f4a9caae..d8c5923b 100644
--- a/runtime/stringbuf.c
+++ b/runtime/stringbuf.c
@@ -185,7 +185,7 @@ rsRetVal
rsCStrExtendBuf(cstr_t *pThis, size_t iMinNeeded)
{
uchar *pNewBuf;
- unsigned short iNewSize;
+ size_t iNewSize;
DEFiRet;
/* first compute the new size needed */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index ece80911..e615fb19 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -90,6 +90,12 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pthread_mutex_init(&pThis->mutWtp, NULL);
pthread_cond_init(&pThis->condThrdTrm, NULL);
pthread_attr_init(&pThis->attrThrd);
+ /* Set thread scheduling policy to default */
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+ pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy);
+ pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param);
+ pthread_attr_setinheritsched(&pThis->attrThrd, PTHREAD_EXPLICIT_SCHED);
+#endif
pthread_attr_setdetachstate(&pThis->attrThrd, PTHREAD_CREATE_DETACHED);
/* set all function pointers to "not implemented" dummy so that we can safely call them */
pThis->pfChkStopWrkr = NotImplementedDummy;
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 68e84d53..ec1b0813 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -1,6 +1,6 @@
if ENABLE_TESTBENCH
TESTRUNS = rt_init rscript
-check_PROGRAMS = $(TESTRUNS) ourtail nettester tcpflood chkseq msleep randomgen diagtalker uxsockrcvr syslog_caller
+check_PROGRAMS = $(TESTRUNS) ourtail nettester tcpflood chkseq msleep randomgen diagtalker uxsockrcvr syslog_caller syslog_inject
TESTS = $(TESTRUNS) cfg.sh \
arrayqueue.sh \
linkedlistqueue.sh \
@@ -13,7 +13,6 @@ TESTS = $(TESTRUNS) cfg.sh \
rulesetmultiqueue.sh \
manytcp.sh \
rsf_getenv.sh \
- manyptcp.sh \
imtcp_conndrop.sh \
imtcp_addtlframedelim.sh \
sndrcv.sh \
@@ -37,7 +36,6 @@ TESTS = $(TESTRUNS) cfg.sh \
complex1.sh \
queue-persist.sh \
pipeaction.sh \
- uxsock_simple.sh \
execonlyonce.sh \
execonlywhenprevsuspended.sh \
execonlywhenprevsuspended2.sh \
@@ -46,15 +44,23 @@ TESTS = $(TESTRUNS) cfg.sh \
pipe_noreader.sh \
dircreate_dflt.sh \
dircreate_off.sh \
+ imuxsock_logger_root.sh \
+ imuxsock_traillf_root.sh \
+ imuxsock_ccmiddle_root.sh \
queue-persist.sh
if ENABLE_IMPTCP
TESTS += \
+ manyptcp.sh \
imptcp_large.sh \
imptcp_addtlframedelim.sh \
imptcp_conndrop.sh
endif
+if ENABLE_OMUXSOCK
+TESTS += uxsock_simple.sh
+endif
+
if ENABLE_OMUDPSPOOF
TESTS += sndrcv_omudpspoof.sh \
sndrcv_omudpspoof_nonstdpt.sh
@@ -316,6 +322,15 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
testsuites/dircreate_dflt.conf \
dircreate_off.sh \
testsuites/dircreate_off.conf \
+ imuxsock_logger_root.sh \
+ testsuites/imuxsock_logger_root.conf \
+ resultdata/imuxsock_logger.log \
+ imuxsock_traillf_root.sh \
+ testsuites/imuxsock_traillf_root.conf \
+ resultdata/imuxsock_traillf.log \
+ imuxsock_ccmiddle_root.sh \
+ testsuites/imuxsock_ccmiddle_root.conf \
+ resultdata/imuxsock_ccmiddle.log \
cfg.sh
ourtail_SOURCES = ourtail.c
@@ -335,6 +350,9 @@ endif
syslog_caller_SOURCES = syslog_caller.c
syslog_caller_LDADD = $(SOL_LIBS)
+syslog_inject_SOURCES = syslog_inject.c
+syslog_inject_LDADD = $(SOL_LIBS)
+
diagtalker_SOURCES = diagtalker.c
diagtalker_LDADD = $(SOL_LIBS)
diff --git a/tests/imuxsock_ccmiddle_root.sh b/tests/imuxsock_ccmiddle_root.sh
new file mode 100755
index 00000000..b487611a
--- /dev/null
+++ b/tests/imuxsock_ccmiddle_root.sh
@@ -0,0 +1,18 @@
+# note: we must be root and no other syslogd running in order to
+# carry out this test
+echo \[imuxsock_ccmiddle_root.sh\]: test trailing LF handling in imuxsock
+echo This test must be run as root with no other active syslogd
+source $srcdir/diag.sh init
+source $srcdir/diag.sh startup imuxsock_ccmiddle_root.conf
+# send a message with trailing LF
+./syslog_inject -c
+# the sleep below is needed to prevent too-early termination of rsyslogd
+./msleep 100
+source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
+source $srcdir/diag.sh wait-shutdown # we need to wait until rsyslogd is finished!
+cmp rsyslog.out.log $srcdir/resultdata/imuxsock_ccmiddle.log
+if [ ! $? -eq 0 ]; then
+echo "imuxsock_ccmiddle_root.sh failed"
+exit 1
+fi;
+source $srcdir/diag.sh exit
diff --git a/tests/imuxsock_logger_root.sh b/tests/imuxsock_logger_root.sh
new file mode 100755
index 00000000..377999f7
--- /dev/null
+++ b/tests/imuxsock_logger_root.sh
@@ -0,0 +1,18 @@
+# note: we must be root and no other syslogd running in order to
+# carry out this test.
+echo \[imuxsock_logger_root.sh\]: test trailing LF handling in imuxsock
+echo This test must be run as root with no other active syslogd
+source $srcdir/diag.sh init
+source $srcdir/diag.sh startup imuxsock_logger_root.conf
+# send a message with trailing LF
+logger test
+# the sleep below is needed to prevent too-early termination of rsyslogd
+./msleep 100
+source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
+source $srcdir/diag.sh wait-shutdown # we need to wait until rsyslogd is finished!
+cmp rsyslog.out.log $srcdir/resultdata/imuxsock_logger.log
+if [ ! $? -eq 0 ]; then
+echo "imuxsock_logger.sh failed"
+exit 1
+fi;
+source $srcdir/diag.sh exit
diff --git a/tests/imuxsock_traillf_root.sh b/tests/imuxsock_traillf_root.sh
new file mode 100755
index 00000000..1b821ee7
--- /dev/null
+++ b/tests/imuxsock_traillf_root.sh
@@ -0,0 +1,18 @@
+# note: we must be root and no other syslogd running in order to
+# carry out this test
+echo \[imuxsock_traillf_root.sh\]: test trailing LF handling in imuxsock
+echo This test must be run as root with no other active syslogd
+source $srcdir/diag.sh init
+source $srcdir/diag.sh startup imuxsock_traillf_root.conf
+# send a message with trailing LF
+./syslog_inject -l
+# the sleep below is needed to prevent too-early termination of rsyslogd
+./msleep 100
+source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
+source $srcdir/diag.sh wait-shutdown # we need to wait until rsyslogd is finished!
+cmp rsyslog.out.log $srcdir/resultdata/imuxsock_traillf.log
+if [ ! $? -eq 0 ]; then
+echo "imuxsock_traillf_root.sh failed"
+exit 1
+fi;
+source $srcdir/diag.sh exit
diff --git a/tests/resultdata/imuxsock_ccmiddle.log b/tests/resultdata/imuxsock_ccmiddle.log
new file mode 100644
index 00000000..d2531f9d
--- /dev/null
+++ b/tests/resultdata/imuxsock_ccmiddle.log
@@ -0,0 +1 @@
+ test 1#0112
diff --git a/tests/resultdata/imuxsock_logger.log b/tests/resultdata/imuxsock_logger.log
new file mode 100644
index 00000000..883dabdf
--- /dev/null
+++ b/tests/resultdata/imuxsock_logger.log
@@ -0,0 +1 @@
+ test
diff --git a/tests/resultdata/imuxsock_traillf.log b/tests/resultdata/imuxsock_traillf.log
new file mode 100644
index 00000000..883dabdf
--- /dev/null
+++ b/tests/resultdata/imuxsock_traillf.log
@@ -0,0 +1 @@
+ test
diff --git a/tests/syslog_inject.c b/tests/syslog_inject.c
new file mode 100644
index 00000000..a5d77b1a
--- /dev/null
+++ b/tests/syslog_inject.c
@@ -0,0 +1,28 @@
+/* This tool deliberately logs a message with the a trailing LF */
+/* Options:
+ * -l: inject linefeed at end of message
+ * -c: inject control character in middle of message
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <syslog.h>
+#include <string.h>
+
+static inline void usage(void) {
+ fprintf(stderr, "Usage: syslog_inject [-l] [-c]\n");
+ exit(1);
+}
+
+int main(int argc, char *argv[])
+{
+ if(argc != 2)
+ usage();
+ if(!strcmp(argv[1], "-l"))
+ syslog(LOG_NOTICE, "test\n");
+ else if(!strcmp(argv[1], "-c"))
+ syslog(LOG_NOTICE, "test 1\t2");
+ else
+ usage();
+
+ return 0;
+}
diff --git a/tests/testsuites/imuxsock_ccmiddle_root.conf b/tests/testsuites/imuxsock_ccmiddle_root.conf
new file mode 100644
index 00000000..8336ecfa
--- /dev/null
+++ b/tests/testsuites/imuxsock_ccmiddle_root.conf
@@ -0,0 +1,7 @@
+# rgerhards, 2011-02-21
+$IncludeConfig diag-common.conf
+
+$ModLoad ../plugins/imuxsock/.libs/imuxsock
+
+$template outfmt,"%msg:%\n"
+*.notice ./rsyslog.out.log;outfmt
diff --git a/tests/testsuites/imuxsock_logger_root.conf b/tests/testsuites/imuxsock_logger_root.conf
new file mode 100644
index 00000000..8336ecfa
--- /dev/null
+++ b/tests/testsuites/imuxsock_logger_root.conf
@@ -0,0 +1,7 @@
+# rgerhards, 2011-02-21
+$IncludeConfig diag-common.conf
+
+$ModLoad ../plugins/imuxsock/.libs/imuxsock
+
+$template outfmt,"%msg:%\n"
+*.notice ./rsyslog.out.log;outfmt
diff --git a/tests/testsuites/imuxsock_traillf_root.conf b/tests/testsuites/imuxsock_traillf_root.conf
new file mode 100644
index 00000000..8336ecfa
--- /dev/null
+++ b/tests/testsuites/imuxsock_traillf_root.conf
@@ -0,0 +1,7 @@
+# rgerhards, 2011-02-21
+$IncludeConfig diag-common.conf
+
+$ModLoad ../plugins/imuxsock/.libs/imuxsock
+
+$template outfmt,"%msg:%\n"
+*.notice ./rsyslog.out.log;outfmt
diff --git a/threads.c b/threads.c
index 04ccb8d3..11c9a1d8 100644
--- a/threads.c
+++ b/threads.c
@@ -101,12 +101,14 @@ thrdTerminateNonCancel(thrdInfo_t *pThis)
do {
d_pthread_mutex_lock(&pThis->mutThrd);
pthread_kill(pThis->thrdID, SIGTTIN);
- timeoutComp(&tTimeout, 10); /* a fixed 10ms timeout, do after lock (may take long!) */
+ timeoutComp(&tTimeout, 1000); /* a fixed 1sec timeout */
ret = d_pthread_cond_timedwait(&pThis->condThrdTerm, &pThis->mutThrd, &tTimeout);
d_pthread_mutex_unlock(&pThis->mutThrd);
if(Debug) {
if(ret == ETIMEDOUT) {
- dbgprintf("input thread term: had a timeout waiting on thread termination\n");
+ dbgprintf("input thread term: timeout expired waiting on thread termination - canceling\n");
+ pthread_cancel(pThis->thrdID);
+ pThis->bIsActive = 0;
} else if(ret == 0) {
dbgprintf("input thread term: thread returned normally and is terminated\n");
} else {
@@ -194,8 +196,8 @@ static void* thrdStarter(void *arg)
* keep the thread debugger happer, it would not really be necessary with
* the logic we employ...)
*/
- pThis->bIsActive = 0;
d_pthread_mutex_lock(&pThis->mutThrd);
+ pThis->bIsActive = 0;
pthread_cond_signal(&pThis->condThrdTerm);
d_pthread_mutex_unlock(&pThis->mutThrd);
@@ -219,7 +221,13 @@ rsRetVal thrdCreate(rsRetVal (*thrdMain)(thrdInfo_t*), rsRetVal(*afterRun)(thrdI
pThis->pUsrThrdMain = thrdMain;
pThis->pAfterRun = afterRun;
pThis->bNeedsCancel = bNeedsCancel;
- pthread_create(&pThis->thrdID, NULL, thrdStarter, pThis);
+ pthread_create(&pThis->thrdID,
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+ &default_thread_attr,
+#else
+ NULL,
+#endif
+ thrdStarter, pThis);
CHKiRet(llAppend(&llThrds, NULL, pThis));
finalize_it:
diff --git a/tools/iminternal.c b/tools/iminternal.c
index bd1fa128..167e2b29 100644
--- a/tools/iminternal.c
+++ b/tools/iminternal.c
@@ -89,7 +89,7 @@ finalize_it:
* The interface of this function is modelled after syslogd/logmsg(),
* for which it is an "replacement".
*/
-rsRetVal iminternalAddMsg(int pri, msg_t *pMsg)
+rsRetVal iminternalAddMsg(msg_t *pMsg)
{
DEFiRet;
iminternal_t *pThis;
@@ -98,7 +98,6 @@ rsRetVal iminternalAddMsg(int pri, msg_t *pMsg)
CHKiRet(iminternalConstruct(&pThis));
- pThis->pri = pri;
pThis->pMsg = pMsg;
CHKiRet(llAppend(&llMsgs, NULL, (void*) pThis));
@@ -118,17 +117,15 @@ finalize_it:
* from the list and return it to the caller. The caller is
* responsible for freeing the message!
*/
-rsRetVal iminternalRemoveMsg(int *pPri, msg_t **ppMsg)
+rsRetVal iminternalRemoveMsg(msg_t **ppMsg)
{
DEFiRet;
iminternal_t *pThis;
linkedListCookie_t llCookie = NULL;
- assert(pPri != NULL);
assert(ppMsg != NULL);
CHKiRet(llGetNextElt(&llMsgs, &llCookie, (void*)&pThis));
- *pPri = pThis->pri;
*ppMsg = pThis->pMsg;
pThis->pMsg = NULL; /* we do no longer own it - important for destructor */
diff --git a/tools/iminternal.h b/tools/iminternal.h
index f1062a15..8a9e2506 100644
--- a/tools/iminternal.h
+++ b/tools/iminternal.h
@@ -33,7 +33,6 @@
* The short name is cslch (Configfile SysLine CommandHandler)
*/
struct iminternal_s { /* config file sysline parse entry */
- int pri;
msg_t *pMsg; /* the message (in all its glory) */
};
typedef struct iminternal_s iminternal_t;
@@ -41,8 +40,8 @@ typedef struct iminternal_s iminternal_t;
/* prototypes */
rsRetVal modInitIminternal(void);
rsRetVal modExitIminternal(void);
-rsRetVal iminternalAddMsg(int pri, msg_t *pMsg);
+rsRetVal iminternalAddMsg(msg_t *pMsg);
rsRetVal iminternalHaveMsgReady(int* pbHaveOne);
-rsRetVal iminternalRemoveMsg(int *pPri, msg_t **ppMsg);
+rsRetVal iminternalRemoveMsg(msg_t **ppMsg);
#endif /* #ifndef IMINTERNAL_H_INCLUDED */
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 36b48bde..73995e68 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -571,7 +571,7 @@ logmsgInternal(int iErr, int pri, uchar *msg, int flags)
}
if(bHaveMainQueue == 0) { /* not yet in queued mode */
- iminternalAddMsg(pri, pMsg);
+ iminternalAddMsg(pMsg);
} else {
/* we have the queue, so we can simply provide the
* message to the queue engine.
@@ -696,7 +696,6 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShu
assert(pBatch != NULL);
pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */
preprocessBatch(pBatch);
-//pBatch->bSingleRuleset = 0; // TODO: testing aid, remove!!!!
ruleset.ProcessBatch(pBatch);
//TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we
//do not have this yet and so we emulate -- 2010-06-10
@@ -1865,10 +1864,9 @@ void sigttin_handler()
*/
static inline void processImInternal(void)
{
- int iPri;
msg_t *pMsg;
- while(iminternalRemoveMsg(&iPri, &pMsg) == RS_RET_OK) {
+ while(iminternalRemoveMsg(&pMsg) == RS_RET_OK) {
submitMsg(pMsg);
}
}
@@ -2424,13 +2422,46 @@ doGlblProcessInit(void)
*/
exit(1); /* "good" exit - after forking, not diasabling anything */
}
+
num_fds = getdtablesize();
close(0);
/* we keep stdout and stderr open in case we have to emit something */
+ i = 3;
+
+ /* if (sd_booted()) */ {
+ const char *e;
+ char buf[24] = { '\0' };
+ char *p = NULL;
+ unsigned long l;
+ int sd_fds;
+
+ /* fork & systemd socket activation:
+ * fetch listen pid and update to ours,
+ * when it is set to pid of our parent.
+ */
+ if ( (e = getenv("LISTEN_PID"))) {
+ errno = 0;
+ l = strtoul(e, &p, 10);
+ if (errno == 0 && l > 0 && (!p || !*p)) {
+ if (getppid() == (pid_t)l) {
+ snprintf(buf, sizeof(buf), "%d",
+ getpid());
+ setenv("LISTEN_PID", buf, 1);
+ }
+ }
+ }
+
+ /*
+ * close only all further fds, except
+ * of the fds provided by systemd.
+ */
+ sd_fds = sd_listen_fds(0);
+ if (sd_fds > 0)
+ i = SD_LISTEN_FDS_START + sd_fds;
+ }
+ for ( ; i < num_fds; i++)
+ (void) close(i);
- if (sd_listen_fds(0) <= 0)
- for (i = 3; i < num_fds; i++)
- (void) close(i);
untty();
} else {
fputs(" Already running. If you want to run multiple instances, you need "
@@ -2838,7 +2869,7 @@ int realMain(int argc, char **argv)
if(iCompatibilityMode < 4) {
errmsg.LogError(0, NO_ERRCODE, "WARNING: rsyslogd is running in compatibility mode. Automatically "
"generated config directives may interfer with your rsyslog.conf settings. "
- "We suggest upgrading your config and adding -c4 as the first "
+ "We suggest upgrading your config and adding -c5 as the first "
"rsyslogd option.");
}