summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-08-21 10:37:59 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-08-21 10:37:59 +0200
commit596383988792635f128cb645c9b7c8aae50453e6 (patch)
tree5e5ba6dc03b6e9b687b09f49828b5a1fcdda1cda
parent8624f937523c1ca78cc1d78b8eba18f628ae9ab5 (diff)
parentdaa76ad94428599336ddafdd6854dc0b71356180 (diff)
downloadrsyslog-596383988792635f128cb645c9b7c8aae50453e6.tar.gz
rsyslog-596383988792635f128cb645c9b7c8aae50453e6.tar.xz
rsyslog-596383988792635f128cb645c9b7c8aae50453e6.zip
Merge branch 'v4-devel' into v4-beta
-rw-r--r--ChangeLog34
-rw-r--r--action.c3
-rw-r--r--configure.ac2
-rw-r--r--doc/imtcp.html10
-rw-r--r--doc/manual.html6
-rw-r--r--doc/rsyslog_conf_global.html7
-rw-r--r--plugins/imtcp/imtcp.c5
-rw-r--r--runtime/apc.c2
-rw-r--r--runtime/datetime.c56
-rw-r--r--runtime/debug.c4
-rw-r--r--runtime/debug.h3
-rw-r--r--runtime/msg.c319
-rw-r--r--runtime/msg.h4
-rw-r--r--runtime/rule.c7
-rw-r--r--runtime/stream.c445
-rw-r--r--runtime/stream.h19
-rw-r--r--runtime/wti.c1
-rw-r--r--runtime/wtp.c7
-rw-r--r--tcpclt.c14
-rw-r--r--tcpclt.h6
-rw-r--r--tcpsrv.c47
-rw-r--r--tcpsrv.h7
-rwxr-xr-xtests/diag.sh2
-rwxr-xr-xtests/killrsyslog.sh2
-rw-r--r--tests/nettester.c2
-rw-r--r--tests/tcpflood.c19
-rw-r--r--tests/testsuites/upcase-date.parse14
-rw-r--r--tools/omfile.c7
-rw-r--r--tools/omfwd.c6
-rw-r--r--tools/syslogd.c8
30 files changed, 698 insertions, 360 deletions
diff --git a/ChangeLog b/ChangeLog
index eeaab08a..4fb45a12 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,31 @@
---------------------------------------------------------------------------
-Version 4.5.1 [DEVEL] (rgerhards), 2009-07-??
+Version 4.5.2 [DEVEL] (rgerhards), 2009-07-??
+- legacy syslog parser changed so that it now accepts date stamps in
+ wrong case. Some devices seem to create them and I do not see any harm
+ in supporting that.
+- added $InputTCPMaxListeners directive - permits to specify how many
+ TCP servers shall be possible (default is 20).
+- bugfix: memory leak with some input modules. Those inputs that
+ use parseAndSubmitMsg() leak two small memory blocks with every message.
+ Typically, those process only relatively few messages, so the issue
+ does most probably not have any effect in practice.
+- bugfix: if tcp listen port could not be created, no error message was
+ emitted
+- bugfix: potential segfault in output file writer (omfile)
+ In async write mode, we use modular arithmetic to index the output
+ buffer array. However, the counter variables accidently were signed,
+ thus resulting in negative indizes after integer overflow. That in turn
+ could lead to segfaults, but was depending on the memory layout of
+ the instance in question (which in turn depended on a number of
+ variables, like compile settings but also configuration). The counters
+ are now unsigned (as they always should have been) and so the dangling
+ mis-indexing does no longer happen. This bug potentially affected all
+ installations, even if only some may actually have seen a segfault.
+- bugfix: hostnames with dashes in them were incorrectly treated as
+ malformed, thus causing them to be treated as TAG (this was a regression
+ introduced from the "rfc3164 strict" change in 4.5.0).
+---------------------------------------------------------------------------
+Version 4.5.1 [DEVEL] (rgerhards), 2009-07-15
- CONFIG CHANGE: $HUPisRestart default is now "off". We are doing this
to support removal of restart-type HUP in v5.
- bugfix: fromhost-ip was sometimes truncated
@@ -16,6 +42,12 @@ Version 4.5.1 [DEVEL] (rgerhards), 2009-07-??
- bugfix: message could be truncated after TAG, often when forwarding
This was a result of an internal processing error if maximum field
sizes had been specified in the property replacer.
+- added ability for the TCP output action to "rebind" its send socket after
+ sending n messages (actually, it re-opens the connection, the name is
+ used because this is a concept very similiar to $ActionUDPRebindInterval).
+ New config directive $ActionSendTCPRebindInterval added for the purpose.
+ By default, rebinding is disabled. This is considered useful for load
+ balancers.
- testbench improvements
---------------------------------------------------------------------------
Version 4.5.0 [DEVEL] (rgerhards), 2009-07-02
diff --git a/action.c b/action.c
index f21feea6..bcb23659 100644
--- a/action.c
+++ b/action.c
@@ -43,6 +43,7 @@
#include "srUtils.h"
#include "errmsg.h"
#include "datetime.h"
+#include "unicode-helper.h"
/* forward definitions */
rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg);
@@ -780,7 +781,7 @@ doActionCallAction(action_t *pAction, msg_t *pMsg)
/* suppress duplicate messages */
if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL &&
(pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) &&
- !strcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) &&
+ !ustrcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) &&
!strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) &&
!strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(pAction->f_pMsg, LOCK_MUTEX)) &&
!strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(pAction->f_pMsg, LOCK_MUTEX))) {
diff --git a/configure.ac b/configure.ac
index 2fa496c7..8709c25a 100644
--- a/configure.ac
+++ b/configure.ac
@@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ(2.61)
-AC_INIT([rsyslog],[4.5.0],[rsyslog@lists.adiscon.com])
+AC_INIT([rsyslog],[4.5.1],[rsyslog@lists.adiscon.com])
AM_INIT_AUTOMAKE
AC_CONFIG_SRCDIR([ChangeLog])
AC_CONFIG_MACRO_DIR([m4])
diff --git a/doc/imtcp.html b/doc/imtcp.html
index 9ea7efa1..5217634e 100644
--- a/doc/imtcp.html
+++ b/doc/imtcp.html
@@ -43,15 +43,19 @@ can be found at the <a href="http://www.rsyslog.com/Article321.phtml">Cisco tcp
page.
<li>$InputTCPServerRun &lt;port&gt;<br>
Starts a TCP server on selected port</li>
-<li><ul><li>$InputTCPMaxSessions &lt;number&gt;</li></ul>
-Sets the maximum number of sessions supported</li><li>$InputTCPServerStreamDriverMode &lt;number&gt;<br>
+<li>$InputTCPMaxListeners &lt;number&gt;<br>
+Sets the maximum number of listeners (server ports) supported. Default is 20. This must be set before the first $InputTCPServerRun directive.</li>
+<li>$InputTCPMaxSessions &lt;number&gt;<br>
+Sets the maximum number of sessions supported. Default is 200. This must be set before the first $InputTCPServerRun directive</li>
+<li>$InputTCPServerStreamDriverMode &lt;number&gt;<br>
Sets the driver mode for the currently selected <a href="netstream.html">network stream driver</a>. &lt;number&gt; is driver specifc.</li>
<li>$InputTCPServerInputName &lt;name&gt;<br>
Sets a name for the inputname property. If no name is set "imtcp" is used by default. Setting a
name is not strictly necessary, but can be useful to apply filtering based on which input
the message was received from.
<li>$InputTCPServerStreamDriverAuthMode &lt;mode-string&gt;<br>
-Sets the authentication mode for the currently selected <a href="netstream.html">network stream driver</a>. &lt;mode-string&gt; is driver specifc.</li><li>$InputTCPServerStreamDriverPermittedPeer &lt;id-string&gt;<br>
+Sets the authentication mode for the currently selected <a href="netstream.html">network stream driver</a>. &lt;mode-string&gt; is driver specifc.</li>
+<li>$InputTCPServerStreamDriverPermittedPeer &lt;id-string&gt;<br>
Sets permitted peer IDs. Only these peers are able to connect to the
listener. &lt;id-string&gt; semantics depend on the currently selected
AuthMode and&nbsp; <a href="netstream.html">network stream driver</a>. PermittedPeers may not be set in anonymous modes.</li>
diff --git a/doc/manual.html b/doc/manual.html
index d473f485..0fb91e1d 100644
--- a/doc/manual.html
+++ b/doc/manual.html
@@ -19,9 +19,9 @@ rsyslog support</a> available directly from the source!</p>
<p><b>Please visit the <a href="http://www.rsyslog.com/sponsors">rsyslog sponsor's page</a>
to honor the project sponsors or become one yourself!</b> We are very grateful for any help towards the
project goals.</p>
-<p><b>This documentation is for version 4.5.0 (beta branch) of rsyslog.</b>
-Visit the <i> <a href="http://www.rsyslog.com/doc-status.html">rsyslog status page</a></i></b> to obtain current
-version information and project status.
+<p><b>This documentation is for version 4.5.1 (beta branch) of rsyslog.</b>
+Visit the <i><a href="http://www.rsyslog.com/doc-status.html">rsyslog status page</a></i></b>
+to obtain current version information and project status.
</p><p><b>If you like rsyslog, you might
want to lend us a helping hand. </b>It doesn't require a lot of
time - even a single mouse click helps. Learn <a href="how2help.html">how to help the rsyslog project</a>.
diff --git a/doc/rsyslog_conf_global.html b/doc/rsyslog_conf_global.html
index 1fe72c5f..2bbb136e 100644
--- a/doc/rsyslog_conf_global.html
+++ b/doc/rsyslog_conf_global.html
@@ -96,6 +96,13 @@ default 60000 (1 minute)]</li>
(driver-specific)</li><li>$ActionSendStreamDriverAuthMode &lt;mode&gt;,&nbsp; authentication mode to use with the stream driver
(driver-specific)</li><li>$ActionSendStreamDriverPermittedPeer &lt;ID&gt;,&nbsp; accepted fingerprint (SHA1) or name of remote peer
(driver-specific) -<span style="font-weight: bold;"> directive may go away</span>!</li>
+<li><b>$ActionSendTCPRebindInterval</b> nbr</a>- [available since 4.5.1] - instructs the TCP send
+action to close and re-open the connection to the remote host every nbr of messages sent.
+Zero, the default, means that no such processing is done. This directive is useful for
+use with load-balancers. Note that there is some performance overhead associated with it,
+so it is advisable to not too often &quot;rebind&quot; the connection (what
+&quot;too often&quot; actually means depends on your configuration, a rule of thumb is
+that it should be not be much more often than once per second).</li>
<li><b>$ActionSendUDPRebindInterval</b> nbr</a>- [available since 4.3.2] - instructs the UDP send
action to rebind the send socket every nbr of messages sent. Zero, the default, means
that no rebind is done. This directive is useful for use with load-balancers.</li>
diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c
index e1f513c8..01c4bc33 100644
--- a/plugins/imtcp/imtcp.c
+++ b/plugins/imtcp/imtcp.c
@@ -82,6 +82,7 @@ static permittedPeers_t *pPermPeersRoot = NULL;
/* config settings */
static int iTCPSessMax = 200; /* max number of sessions */
+static int iTCPLstnMax = 20; /* max number of sessions */
static int iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */
static int iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; /* addtl frame delimiter, e.g. for netscreen, default none */
static uchar *pszStrmDrvrAuthMode = NULL; /* authentication mode to use */
@@ -188,6 +189,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
if(pOurTcpsrv == NULL) {
CHKiRet(tcpsrv.Construct(&pOurTcpsrv));
CHKiRet(tcpsrv.SetSessMax(pOurTcpsrv, iTCPSessMax));
+ CHKiRet(tcpsrv.SetLstnMax(pOurTcpsrv, iTCPLstnMax));
CHKiRet(tcpsrv.SetCBIsPermittedHost(pOurTcpsrv, isPermittedHost));
CHKiRet(tcpsrv.SetCBRcvData(pOurTcpsrv, doRcvData));
CHKiRet(tcpsrv.SetCBOpenLstnSocks(pOurTcpsrv, doOpenLstnSocks));
@@ -273,6 +275,7 @@ static rsRetVal
resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
iTCPSessMax = 200;
+ iTCPLstnMax = 20;
iStrmDrvrMode = 0;
iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
free(pszInputName);
@@ -308,6 +311,8 @@ CODEmodInit_QueryRegCFSLineHdlr
addTCPListener, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxsessions"), 0, eCmdHdlrInt,
NULL, &iTCPSessMax, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxlisteners"), 0, eCmdHdlrInt,
+ NULL, &iTCPLstnMax, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverstreamdrivermode"), 0,
eCmdHdlrInt, NULL, &iStrmDrvrMode, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverstreamdriverauthmode"), 0,
diff --git a/runtime/apc.c b/runtime/apc.c
index 5919191d..bc330e39 100644
--- a/runtime/apc.c
+++ b/runtime/apc.c
@@ -335,9 +335,11 @@ CancelApc(apc_id_t id)
{
DEFVARS_mutexProtection_uncond;
+ BEGINfunc
BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex);
deleteApc(id);
END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex);
+ ENDfunc
return RS_RET_OK;
}
diff --git a/runtime/datetime.c b/runtime/datetime.c
index 2db1d3c5..dfa56b4f 100644
--- a/runtime/datetime.c
+++ b/runtime/datetime.c
@@ -335,6 +335,10 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS)
* We will use this for parsing, as it probably is the
* fastest way to parse it.
*
+ * 2009-08-17: we now do case-insensitive comparisons, as some devices obviously do not
+ * obey to the RFC-specified case. As we need to guess in any case, we can ignore case
+ * in the first place -- rgerhards
+ *
* 2005-07-18, well sometimes it pays to be a bit more verbose, even in C...
* Fixed a bug that lead to invalid detection of the data. The issue was that
* we had an if(++pszTS == 'x') inside of some of the consturcts below. However,
@@ -346,20 +350,21 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS)
*/
switch(*pszTS++)
{
+ case 'j':
case 'J':
- if(*pszTS == 'a') {
+ if(*pszTS == 'a' || *pszTS == 'A') {
++pszTS;
- if(*pszTS == 'n') {
+ if(*pszTS == 'n' || *pszTS == 'N') {
++pszTS;
month = 1;
} else
ABORT_FINALIZE(RS_RET_INVLD_TIME);
- } else if(*pszTS == 'u') {
+ } else if(*pszTS == 'u' || *pszTS == 'U') {
++pszTS;
- if(*pszTS == 'n') {
+ if(*pszTS == 'n' || *pszTS == 'N') {
++pszTS;
month = 6;
- } else if(*pszTS == 'l') {
+ } else if(*pszTS == 'l' || *pszTS == 'L') {
++pszTS;
month = 7;
} else
@@ -367,10 +372,11 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS)
} else
ABORT_FINALIZE(RS_RET_INVLD_TIME);
break;
+ case 'f':
case 'F':
- if(*pszTS == 'e') {
+ if(*pszTS == 'e' || *pszTS == 'E') {
++pszTS;
- if(*pszTS == 'b') {
+ if(*pszTS == 'b' || *pszTS == 'B') {
++pszTS;
month = 2;
} else
@@ -378,13 +384,14 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS)
} else
ABORT_FINALIZE(RS_RET_INVLD_TIME);
break;
+ case 'm':
case 'M':
- if(*pszTS == 'a') {
+ if(*pszTS == 'a' || *pszTS == 'A') {
++pszTS;
- if(*pszTS == 'r') {
+ if(*pszTS == 'r' || *pszTS == 'R') {
++pszTS;
month = 3;
- } else if(*pszTS == 'y') {
+ } else if(*pszTS == 'y' || *pszTS == 'Y') {
++pszTS;
month = 5;
} else
@@ -392,17 +399,18 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS)
} else
ABORT_FINALIZE(RS_RET_INVLD_TIME);
break;
+ case 'a':
case 'A':
- if(*pszTS == 'p') {
+ if(*pszTS == 'p' || *pszTS == 'P') {
++pszTS;
- if(*pszTS == 'r') {
+ if(*pszTS == 'r' || *pszTS == 'R') {
++pszTS;
month = 4;
} else
ABORT_FINALIZE(RS_RET_INVLD_TIME);
- } else if(*pszTS == 'u') {
+ } else if(*pszTS == 'u' || *pszTS == 'U') {
++pszTS;
- if(*pszTS == 'g') {
+ if(*pszTS == 'g' || *pszTS == 'G') {
++pszTS;
month = 8;
} else
@@ -410,10 +418,11 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS)
} else
ABORT_FINALIZE(RS_RET_INVLD_TIME);
break;
+ case 's':
case 'S':
- if(*pszTS == 'e') {
+ if(*pszTS == 'e' || *pszTS == 'E') {
++pszTS;
- if(*pszTS == 'p') {
+ if(*pszTS == 'p' || *pszTS == 'P') {
++pszTS;
month = 9;
} else
@@ -421,10 +430,11 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS)
} else
ABORT_FINALIZE(RS_RET_INVLD_TIME);
break;
+ case 'o':
case 'O':
- if(*pszTS == 'c') {
+ if(*pszTS == 'c' || *pszTS == 'C') {
++pszTS;
- if(*pszTS == 't') {
+ if(*pszTS == 't' || *pszTS == 'T') {
++pszTS;
month = 10;
} else
@@ -432,10 +442,11 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS)
} else
ABORT_FINALIZE(RS_RET_INVLD_TIME);
break;
+ case 'n':
case 'N':
- if(*pszTS == 'o') {
+ if(*pszTS == 'o' || *pszTS == 'O') {
++pszTS;
- if(*pszTS == 'v') {
+ if(*pszTS == 'v' || *pszTS == 'V') {
++pszTS;
month = 11;
} else
@@ -443,10 +454,11 @@ ParseTIMESTAMP3164(struct syslogTime *pTime, uchar** ppszTS)
} else
ABORT_FINALIZE(RS_RET_INVLD_TIME);
break;
+ case 'd':
case 'D':
- if(*pszTS == 'e') {
+ if(*pszTS == 'e' || *pszTS == 'E') {
++pszTS;
- if(*pszTS == 'c') {
+ if(*pszTS == 'c' || *pszTS == 'C') {
++pszTS;
month = 12;
} else
diff --git a/runtime/debug.c b/runtime/debug.c
index 4ee90226..807fd3f7 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -732,6 +732,8 @@ static void dbgGetThrdName(char *pszBuf, size_t lenBuf, pthread_t thrd, int bInc
*/
void dbgSetThrdName(uchar *pszName)
{
+return;
+
dbgThrdInfo_t *pThrd = dbgGetThrdInfo();
if(pThrd->pszThrdName != NULL)
free(pThrd->pszThrdName);
@@ -776,7 +778,7 @@ static void dbgCallStackPrint(dbgThrdInfo_t *pThrd)
/* print all threads call stacks
*/
-static void dbgCallStackPrintAll(void)
+void dbgCallStackPrintAll(void)
{
dbgThrdInfo_t *pThrd;
/* stack info */
diff --git a/runtime/debug.h b/runtime/debug.h
index 1375493d..dcbfb930 100644
--- a/runtime/debug.h
+++ b/runtime/debug.h
@@ -134,8 +134,7 @@ void dbgPrintAllDebugInfo(void);
/* debug aides */
-//#ifdef RTINST
-#if 0 // temporarily removed for helgrind
+#ifdef RTINST
#define d_pthread_mutex_lock(x) dbgMutexLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
#define d_pthread_mutex_trylock(x) dbgMutexTryLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
#define d_pthread_mutex_unlock(x) dbgMutexUnlock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
diff --git a/runtime/msg.c b/runtime/msg.c
index d29da560..de298871 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -1159,16 +1159,16 @@ int getMSGLen(msg_t *pM)
return((pM == NULL) ? 0 : pM->iLenMSG);
}
-char *getMSG(msg_t *pM)
+uchar *getMSG(msg_t *pM)
{
- char *ret;
+ uchar *ret;
if(pM == NULL)
- ret = "";
+ ret = UCHAR_CONSTANT("");
else {
if(pM->offMSG == -1)
- ret = "";
+ ret = UCHAR_CONSTANT("");
else
- ret = (char*)(pM->pszRawMsg + pM->offMSG);
+ ret = pM->pszRawMsg + pM->offMSG;
}
return ret;
}
@@ -1613,21 +1613,23 @@ static inline void tryEmulateTAG(msg_t *pM, bool bLockMutex)
}
-static inline char *getTAG(msg_t *pM)
+static inline void
+getTAG(msg_t *pM, uchar **ppBuf, int *piLen)
{
- char *ret;
-
- if(pM == NULL)
- ret = "";
- else {
+ if(pM == NULL) {
+ *ppBuf = UCHAR_CONSTANT("");
+ *piLen = 0;
+ } else {
if(pM->iLenTAG == 0)
tryEmulateTAG(pM, LOCK_MUTEX);
- if(pM->iLenTAG == 0)
- ret = "";
- else
- ret = (char*) ((pM->iLenTAG < CONF_TAG_BUFSIZE) ? pM->TAG.szBuf : pM->TAG.pszTAG);
+ if(pM->iLenTAG == 0) {
+ *ppBuf = UCHAR_CONSTANT("");
+ *piLen = 0;
+ } else {
+ *ppBuf = (pM->iLenTAG < CONF_TAG_BUFSIZE) ? pM->TAG.szBuf : pM->TAG.pszTAG;
+ *piLen = pM->iLenTAG;
+ }
}
- return(ret);
}
@@ -2130,14 +2132,14 @@ static uchar *getNOW(eNOWType eNow)
* be used in selector line processing.
* rgerhards 2005-09-15
*/
-char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
+uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
propid_t propID, size_t *pPropLen,
unsigned short *pbMustBeFreed)
{
- char *pRes; /* result pointer */
+ uchar *pRes; /* result pointer */
int bufLen = -1; /* length of string or -1, if not known */
- char *pBufStart;
- char *pBuf;
+ uchar *pBufStart;
+ uchar *pBuf;
int iLen;
short iOffs;
@@ -2159,16 +2161,16 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
bufLen = getMSGLen(pMsg);
break;
case PROP_TIMESTAMP:
- pRes = getTimeReported(pMsg, pTpe->data.field.eDateFormat);
+ pRes = (uchar*)getTimeReported(pMsg, pTpe->data.field.eDateFormat);
break;
case PROP_HOSTNAME:
- pRes = getHOSTNAME(pMsg);
+ pRes = (uchar*)getHOSTNAME(pMsg);
break;
case PROP_SYSLOGTAG:
- pRes = getTAG(pMsg);
+ getTAG(pMsg, &pRes, &bufLen);
break;
case PROP_RAWMSG:
- pRes = getRawMsg(pMsg);
+ pRes = (uchar*)getRawMsg(pMsg);
break;
/* enable this, if someone actually uses UxTradMsg, delete after some time has
* passed and nobody complained -- rgerhards, 2009-06-16
@@ -2177,120 +2179,120 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
break;
*/
case PROP_INPUTNAME:
- getInputName(pMsg, ((uchar**) &pRes), &bufLen);
+ getInputName(pMsg, &pRes, &bufLen);
break;
case PROP_FROMHOST:
- pRes = (char*) getRcvFrom(pMsg);
+ pRes = getRcvFrom(pMsg);
break;
case PROP_FROMHOST_IP:
- pRes = (char*) getRcvFromIP(pMsg);
+ pRes = getRcvFromIP(pMsg);
break;
case PROP_PRI:
- pRes = getPRI(pMsg);
+ pRes = (uchar*)getPRI(pMsg);
break;
case PROP_PRI_TEXT:
- pBuf = malloc(20 * sizeof(char));
+ pBuf = malloc(20 * sizeof(uchar));
if(pBuf == NULL) {
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else {
*pbMustBeFreed = 1;
- pRes = textpri(pBuf, 20, getPRIi(pMsg));
+ pRes = (uchar*)textpri((char*)pBuf, 20, getPRIi(pMsg));
}
break;
case PROP_IUT:
- pRes = "1"; /* always 1 for syslog messages (a MonitorWare thing;)) */
+ pRes = UCHAR_CONSTANT("1"); /* always 1 for syslog messages (a MonitorWare thing;)) */
break;
case PROP_SYSLOGFACILITY:
- pRes = getFacility(pMsg);
+ pRes = (uchar*)getFacility(pMsg);
break;
case PROP_SYSLOGFACILITY_TEXT:
- pRes = getFacilityStr(pMsg);
+ pRes = (uchar*)getFacilityStr(pMsg);
break;
case PROP_SYSLOGSEVERITY:
- pRes = getSeverity(pMsg);
+ pRes = (uchar*)getSeverity(pMsg);
break;
case PROP_SYSLOGSEVERITY_TEXT:
- pRes = getSeverityStr(pMsg);
+ pRes = (uchar*)getSeverityStr(pMsg);
break;
case PROP_TIMEGENERATED:
- pRes = getTimeGenerated(pMsg, pTpe->data.field.eDateFormat);
+ pRes = (uchar*)getTimeGenerated(pMsg, pTpe->data.field.eDateFormat);
break;
case PROP_PROGRAMNAME:
- pRes = getProgramName(pMsg, LOCK_MUTEX);
+ pRes = (uchar*)getProgramName(pMsg, LOCK_MUTEX);
break;
case PROP_PROTOCOL_VERSION:
- pRes = getProtocolVersionString(pMsg);
+ pRes = (uchar*)getProtocolVersionString(pMsg);
break;
case PROP_STRUCTURED_DATA:
- pRes = getStructuredData(pMsg);
+ pRes = (uchar*)getStructuredData(pMsg);
break;
case PROP_APP_NAME:
- pRes = getAPPNAME(pMsg, LOCK_MUTEX);
+ pRes = (uchar*)getAPPNAME(pMsg, LOCK_MUTEX);
break;
case PROP_PROCID:
- pRes = getPROCID(pMsg, LOCK_MUTEX);
+ pRes = (uchar*)getPROCID(pMsg, LOCK_MUTEX);
break;
case PROP_MSGID:
- pRes = getMSGID(pMsg);
+ pRes = (uchar*)getMSGID(pMsg);
break;
case PROP_SYS_NOW:
- if((pRes = (char*) getNOW(NOW_NOW)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_NOW)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_YEAR:
- if((pRes = (char*) getNOW(NOW_YEAR)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_YEAR)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_MONTH:
- if((pRes = (char*) getNOW(NOW_MONTH)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_MONTH)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_DAY:
- if((pRes = (char*) getNOW(NOW_DAY)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_DAY)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_HOUR:
- if((pRes = (char*) getNOW(NOW_HOUR)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_HOUR)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_HHOUR:
- if((pRes = (char*) getNOW(NOW_HHOUR)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_HHOUR)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_QHOUR:
- if((pRes = (char*) getNOW(NOW_QHOUR)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_QHOUR)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_MINUTE:
- if((pRes = (char*) getNOW(NOW_MINUTE)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_MINUTE)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_MYHOSTNAME:
- pRes = (char*) glbl.GetLocalHostName();
+ pRes = glbl.GetLocalHostName();
break;
default:
/* there is no point in continuing, we may even otherwise render the
* error message unreadable. rgerhards, 2007-07-10
*/
dbgprintf("invalid property id: '%d'\n", propID);
- return "**INVALID PROPERTY NAME**";
+ return UCHAR_CONSTANT("**INVALID PROPERTY NAME**");
}
@@ -2310,8 +2312,8 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*/
if(pTpe->data.field.has_fields == 1) {
size_t iCurrFld;
- char *pFld;
- char *pFldEnd;
+ uchar *pFld;
+ uchar *pFldEnd;
/* first, skip to the field in question. The field separator
* is always one character and is stored in the template entry.
*/
@@ -2349,7 +2351,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
/* now copy */
memcpy(pBuf, pFld, iLen);
@@ -2366,12 +2368,12 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**FIELD NOT FOUND**";
+ return UCHAR_CONSTANT("**FIELD NOT FOUND**");
}
} else if(pTpe->data.field.iFromPos != 0 || pTpe->data.field.iToPos != 0) {
/* we need to obtain a private copy */
int iFrom, iTo;
- char *pSb;
+ uchar *pSb;
iFrom = pTpe->data.field.iFromPos;
iTo = pTpe->data.field.iToPos;
/* need to zero-base to and from (they are 1-based!) */
@@ -2379,44 +2381,55 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
--iFrom;
if(iTo > 0)
--iTo;
- iLen = iTo - iFrom + 1; /* the +1 is for an actual char, NOT \0! */
- pBufStart = pBuf = malloc((iLen + 1) * sizeof(char));
- if(pBuf == NULL) {
- if(*pbMustBeFreed == 1)
- free(pRes);
- *pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
- }
- pSb = pRes;
- if(iFrom) {
- /* skip to the start of the substring (can't do pointer arithmetic
- * because the whole string might be smaller!!)
- */
- while(*pSb && iFrom) {
- --iFrom;
+ if(bufLen == -1)
+ bufLen = ustrlen(pRes);
+ if(iFrom == 0 && iTo >= bufLen) {
+ /* in this case, the requested string is a superset of what we already have,
+ * so there is no need to do any processing. This is a frequent case for size-limited
+ * fields like TAG in the default forwarding template (so it is a useful optimization
+ * to check for this condition ;)). -- rgerhards, 2009-07-09
+ */
+ ; /*DO NOTHING*/
+ } else {
+ iLen = iTo - iFrom + 1; /* the +1 is for an actual char, NOT \0! */
+ pBufStart = pBuf = malloc((iLen + 1) * sizeof(char));
+ if(pBuf == NULL) {
+ if(*pbMustBeFreed == 1)
+ free(pRes);
+ *pbMustBeFreed = 0;
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
+ }
+ pSb = pRes;
+ if(iFrom) {
+ /* skip to the start of the substring (can't do pointer arithmetic
+ * because the whole string might be smaller!!)
+ */
+ while(*pSb && iFrom) {
+ --iFrom;
+ ++pSb;
+ }
+ }
+ /* OK, we are at the begin - now let's copy... */
+ bufLen = iLen;
+ while(*pSb && iLen) {
+ *pBuf++ = *pSb;
++pSb;
+ --iLen;
}
+ *pBuf = '\0';
+ bufLen -= iLen; /* subtract remaining length if the string was smaller! */
+ if(*pbMustBeFreed == 1)
+ free(pRes);
+ pRes = pBufStart;
+ *pbMustBeFreed = 1;
}
- /* OK, we are at the begin - now let's copy... */
- bufLen = iLen;
- while(*pSb && iLen) {
- *pBuf++ = *pSb;
- ++pSb;
- --iLen;
- }
- *pBuf = '\0';
- bufLen -= iLen; /* subtract remaining length if the string was smaller! */
- if(*pbMustBeFreed == 1)
- free(pRes);
- pRes = pBufStart;
- *pbMustBeFreed = 1;
#ifdef FEATURE_REGEXP
} else {
/* Check for regular expressions */
if (pTpe->data.field.has_regex != 0) {
if (pTpe->data.field.has_regex == 2)
/* Could not compile regex before! */
- return "**NO MATCH** **BAD REGULAR EXPRESSION**";
+ return UCHAR_CONSTANT("**NO MATCH** **BAD REGULAR EXPRESSION**");
dbgprintf("string to match for regex is: %s\n", pRes);
@@ -2429,7 +2442,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*/
while(!bFound) {
int iREstat;
- iREstat = regexp.regexec(&pTpe->data.field.re, pRes + iOffs, nmatch, pmatch, 0);
+ iREstat = regexp.regexec(&pTpe->data.field.re, (char*)(pRes + iOffs), nmatch, pmatch, 0);
dbgprintf("regexec return is %d\n", iREstat);
if(iREstat == 0) {
if(pmatch[0].rm_so == -1) {
@@ -2457,11 +2470,11 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*pbMustBeFreed = 0;
}
if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_DFLTSTR)
- return "**NO MATCH**";
+ return UCHAR_CONSTANT("**NO MATCH**");
else if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_ZERO)
- return "0";
+ return UCHAR_CONSTANT("0");
else
- return "";
+ return UCHAR_CONSTANT("");
}
} else {
/* Match- but did it match the one we wanted? */
@@ -2473,24 +2486,24 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*pbMustBeFreed = 0;
}
if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_DFLTSTR)
- return "**NO MATCH**";
+ return UCHAR_CONSTANT("**NO MATCH**");
else
- return "";
+ return UCHAR_CONSTANT("");
}
}
/* OK, we have a usable match - we now need to malloc pB */
int iLenBuf;
- char *pB;
+ uchar *pB;
iLenBuf = pmatch[pTpe->data.field.iSubMatchToUse].rm_eo
- pmatch[pTpe->data.field.iSubMatchToUse].rm_so;
- pB = (char *) malloc((iLenBuf + 1) * sizeof(char));
+ pB = malloc((iLenBuf + 1) * sizeof(uchar));
if (pB == NULL) {
if (*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY ALLOCATING pBuf**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
/* Lets copy the matched substring to the buffer */
@@ -2513,7 +2526,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
free(pRes);
*pbMustBeFreed = 0;
}
- return "***REGEXP NOT AVAILABLE***";
+ return UCHAR_CONSTANT("***REGEXP NOT AVAILABLE***");
}
}
#endif /* #ifdef FEATURE_REGEXP */
@@ -2525,7 +2538,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
uchar cFirst = *pRes; /* save first char */
if(*pbMustBeFreed == 1)
free(pRes);
- pRes = (cFirst == ' ') ? "" : " ";
+ pRes = (cFirst == ' ') ? UCHAR_CONSTANT("") : UCHAR_CONSTANT(" ");
bufLen = (cFirst == ' ') ? 0 : 1;
*pbMustBeFreed = 0;
}
@@ -2537,21 +2550,21 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(pTpe->data.field.eCaseConv != tplCaseConvNo) {
/* we need to obtain a private copy */
if(bufLen == -1)
- bufLen = strlen(pRes);
- char *pBStart;
- char *pB;
- char *pSrc;
+ bufLen = ustrlen(pRes);
+ uchar *pBStart;
+ uchar *pB;
+ uchar *pSrc;
pBStart = pB = malloc((bufLen + 1) * sizeof(char));
if(pB == NULL) {
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
pSrc = pRes;
while(*pSrc) {
*pB++ = (pTpe->data.field.eCaseConv == tplCaseConvUpper) ?
- (char)toupper((int)*pSrc) : (char)tolower((int)*pSrc);
+ (uchar)toupper((int)*pSrc) : (uchar)tolower((int)*pSrc);
/* currently only these two exist */
++pSrc;
}
@@ -2575,10 +2588,10 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*/
if(pTpe->data.field.options.bDropCC) {
int iLenBuf = 0;
- char *pSrc = pRes;
- char *pDstStart;
- char *pDst;
- char bDropped = 0;
+ uchar *pSrc = pRes;
+ uchar *pDstStart;
+ uchar *pDst;
+ uchar bDropped = 0;
while(*pSrc) {
if(!iscntrl((int) *pSrc++))
@@ -2593,7 +2606,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
for(pSrc = pRes; *pSrc; pSrc++) {
if(!iscntrl((int) *pSrc))
@@ -2607,9 +2620,9 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*pbMustBeFreed = 1;
}
} else if(pTpe->data.field.options.bSpaceCC) {
- char *pSrc;
- char *pDstStart;
- char *pDst;
+ uchar *pSrc;
+ uchar *pDstStart;
+ uchar *pDst;
if(*pbMustBeFreed == 1) {
/* in this case, we already work on dynamic
@@ -2623,13 +2636,13 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
}
} else {
if(bufLen == -1)
- bufLen = strlen(pRes);
+ bufLen = ustrlen(pRes);
pDst = pDstStart = malloc(bufLen + 1);
if(pDst == NULL) {
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
for(pSrc = pRes; *pSrc; pSrc++) {
if(iscntrl((int) *pSrc))
@@ -2649,7 +2662,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*/
int iNumCC = 0;
int iLenBuf = 0;
- char *pB;
+ uchar *pB;
for(pB = pRes ; *pB ; ++pB) {
++iLenBuf;
@@ -2659,21 +2672,21 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(iNumCC > 0) { /* if 0, there is nothing to escape, so we are done */
/* OK, let's do the escaping... */
- char *pBStart;
- char szCCEsc[8]; /* buffer for escape sequence */
+ uchar *pBStart;
+ uchar szCCEsc[8]; /* buffer for escape sequence */
int i;
iLenBuf += iNumCC * 4;
- pBStart = pB = malloc((iLenBuf + 1) * sizeof(char));
+ pBStart = pB = malloc((iLenBuf + 1) * sizeof(uchar));
if(pB == NULL) {
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
while(*pRes) {
if(iscntrl((int) *pRes)) {
- snprintf(szCCEsc, sizeof(szCCEsc), "#%3.3d", *pRes);
+ snprintf((char*)szCCEsc, sizeof(szCCEsc), "#%3.3d", *pRes);
for(i = 0 ; i < 4 ; ++i)
*pB++ = szCCEsc[i];
} else {
@@ -2697,10 +2710,10 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(pTpe->data.field.options.bSecPathDrop || pTpe->data.field.options.bSecPathReplace) {
if(pTpe->data.field.options.bSecPathDrop) {
int iLenBuf = 0;
- char *pSrc = pRes;
- char *pDstStart;
- char *pDst;
- char bDropped = 0;
+ uchar *pSrc = pRes;
+ uchar *pDstStart;
+ uchar *pDst;
+ uchar bDropped = 0;
while(*pSrc) {
if(*pSrc++ != '/')
@@ -2715,7 +2728,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
for(pSrc = pRes; *pSrc; pSrc++) {
if(*pSrc != '/')
@@ -2729,9 +2742,9 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*pbMustBeFreed = 1;
}
} else {
- char *pSrc;
- char *pDstStart;
- char *pDst;
+ uchar *pSrc;
+ uchar *pDstStart;
+ uchar *pDst;
if(*pbMustBeFreed == 1) {
/* here, again, we can modify the string as we already obtained
@@ -2745,13 +2758,13 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
}
} else {
if(bufLen == -1)
- bufLen = strlen(pRes);
+ bufLen = ustrlen(pRes);
pDst = pDstStart = malloc(bufLen + 1);
if(pDst == NULL) {
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
for(pSrc = pRes; *pSrc; pSrc++) {
if(*pSrc == '/')
@@ -2771,19 +2784,19 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
/* check for "." and ".." (note the parenthesis in the if condition!) */
if((*pRes == '.') && (*(pRes + 1) == '\0' || (*(pRes + 1) == '.' && *(pRes + 2) == '\0'))) {
- char *pTmp = pRes;
+ uchar *pTmp = pRes;
if(*(pRes + 1) == '\0')
- pRes = "_";
+ pRes = UCHAR_CONSTANT("_");
else
- pRes = "_.";;
+ pRes = UCHAR_CONSTANT("_.");;
if(*pbMustBeFreed == 1)
free(pTmp);
*pbMustBeFreed = 0;
} else if(*pRes == '\0') {
if(*pbMustBeFreed == 1)
free(pRes);
- pRes = "_";
+ pRes = UCHAR_CONSTANT("_");
bufLen = 1;
*pbMustBeFreed = 0;
}
@@ -2794,19 +2807,19 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*/
if(pTpe->data.field.options.bDropLastLF && !pTpe->data.field.options.bEscapeCC) {
int iLn;
- char *pB;
+ uchar *pB;
if(bufLen == -1)
- bufLen = strlen(pRes);
+ bufLen = ustrlen(pRes);
iLn = bufLen;
if(iLn > 0 && *(pRes + iLn - 1) == '\n') {
/* we have a LF! */
/* check if we need to obtain a private copy */
if(*pbMustBeFreed == 0) {
/* ok, original copy, need a private one */
- pB = malloc((iLn + 1) * sizeof(char));
+ pB = malloc((iLn + 1) * sizeof(uchar));
if(pB == NULL) {
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
memcpy(pB, pRes, iLn - 1);
pRes = pB;
@@ -2825,19 +2838,19 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(pTpe->data.field.options.bCSV) {
/* we need to obtain a private copy, as we need to at least add the double quotes */
int iBufLen;
- char *pBStart;
- char *pDst;
- char *pSrc;
+ uchar *pBStart;
+ uchar *pDst;
+ uchar *pSrc;
if(bufLen == -1)
- bufLen = strlen(pRes);
+ bufLen = ustrlen(pRes);
iBufLen = bufLen;
/* the malloc may be optimized, we currently use the worst case... */
- pBStart = pDst = malloc((2 * iBufLen + 3) * sizeof(char));
+ pBStart = pDst = malloc((2 * iBufLen + 3) * sizeof(uchar));
if(pDst == NULL) {
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
pSrc = pRes;
*pDst++ = '"'; /* starting quote */
@@ -2856,7 +2869,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
}
if(bufLen == -1)
- bufLen = strlen(pRes);
+ bufLen = ustrlen(pRes);
*pPropLen = bufLen;
ENDfunc
diff --git a/runtime/msg.h b/runtime/msg.h
index c20fb005..0b346f7b 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -159,7 +159,7 @@ void MsgSetMSGoffs(msg_t *pMsg, short offs);
void MsgSetRawMsgWOSize(msg_t *pMsg, char* pszRawMsg);
void MsgSetRawMsg(msg_t *pMsg, char* pszRawMsg, size_t lenMsg);
rsRetVal MsgReplaceMSG(msg_t *pThis, uchar* pszMSG, int lenMSG);
-char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
+uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
propid_t propID, size_t *pPropLen, unsigned short *pbMustBeFreed);
char *textpri(char *pRes, size_t pResLen, int pri);
rsRetVal msgGetMsgVar(msg_t *pThis, cstr_t *pstrPropName, var_t **ppVar);
@@ -168,7 +168,7 @@ uchar *getRcvFrom(msg_t *pM);
/* TODO: remove these five (so far used in action.c) */
-char *getMSG(msg_t *pM);
+uchar *getMSG(msg_t *pM);
char *getHOSTNAME(msg_t *pM);
char *getPROCID(msg_t *pM, bool bLockMutex);
char *getAPPNAME(msg_t *pM, bool bLockMutex);
diff --git a/runtime/rule.c b/runtime/rule.c
index 3a257a90..182d616a 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -39,6 +39,7 @@
#include "vm.h"
#include "var.h"
#include "srUtils.h"
+#include "unicode-helper.h"
#include "dirty.h" /* for getFIOPName */
/* static data */
@@ -104,7 +105,7 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg)
{
DEFiRet;
unsigned short pbMustBeFreed;
- char *pszPropVal;
+ uchar *pszPropVal;
int bRet = 0;
size_t propLen;
vm_t *pVM = NULL;
@@ -189,12 +190,12 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg)
break;
case FIOP_ISEQUAL:
if(rsCStrSzStrCmp(pRule->f_filterData.prop.pCSCompValue,
- (uchar*) pszPropVal, strlen(pszPropVal)) == 0)
+ pszPropVal, ustrlen(pszPropVal)) == 0)
bRet = 1; /* process message! */
break;
case FIOP_STARTSWITH:
if(rsCStrSzStrStartsWithCStr(pRule->f_filterData.prop.pCSCompValue,
- (uchar*) pszPropVal, strlen(pszPropVal)) == 0)
+ pszPropVal, ustrlen(pszPropVal)) == 0)
bRet = 1; /* process message! */
break;
case FIOP_REGEX:
diff --git a/runtime/stream.c b/runtime/stream.c
index 00c726d9..605a9771 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -48,37 +48,27 @@
#include "stream.h"
#include "unicode-helper.h"
#include "module-template.h"
-#include "apc.h"
+#if HAVE_SYS_PRCTL_H
+# include <sys/prctl.h>
+#endif
+
+#define inline
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(zlibw)
-DEFobjCurrIf(apc)
/* forward definitions */
static rsRetVal strmFlush(strm_t *pThis);
static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
static rsRetVal strmCloseFile(strm_t *pThis);
+static void *asyncWriterThread(void *pPtr);
+static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
+static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
/* methods */
-/* async flush apc handler
- */
-static void
-flushApc(void *param1, void __attribute__((unused)) *param2)
-{
- DEFVARS_mutexProtection_uncond;
- strm_t *pThis = (strm_t*) param1;
- ISOBJ_TYPE_assert(pThis, strm);
-
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
- strmFlush(pThis);
- pThis->apcRequested = 0;
- END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
-}
-
-
/* Try to resolve a size limit situation. This is used to support custom-file size handlers
* for omfile. It first runs the command, and then checks if we are still above the size
* treshold. Note that this works only with single file names, NOT with circular names.
@@ -139,10 +129,11 @@ resolveFileSizeLimit(strm_t *pThis, uchar *pszCurrFName)
finalize_it:
if(iRet != RS_RET_OK) {
- if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE)
- dbgprintf("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName);
- else
- dbgprintf("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet);
+ if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE) {
+ DBGPRINTF("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName);
+ } else {
+ DBGPRINTF("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet);
+ }
pThis->bDisabled = 1;
}
@@ -280,6 +271,23 @@ finalize_it:
}
+/* wait for the output writer thread to be done. This must be called before actions
+ * that require data to be persisted. May be called in non-async mode and is a null
+ * operation than. Must be called with the mutex locked.
+ */
+static inline void
+strmWaitAsyncWriterDone(strm_t *pThis)
+{
+ BEGINfunc
+ if(pThis->bAsyncWrite) {
+ /* awake writer thread and make it write out everything */
+ pthread_cond_signal(&pThis->notEmpty);
+ d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut);
+ }
+ ENDfunc
+}
+
+
/* close a strm file
* Note that the bDeleteOnClose flag is honored. If it is set, the file will be
* deleted after close. This is in support for the qRead thread.
@@ -292,8 +300,15 @@ static rsRetVal strmCloseFile(strm_t *pThis)
ASSERT(pThis->fd != -1);
dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd);
- if(pThis->tOperationsMode != STREAMMODE_READ)
- strmFlush(pThis);
+ if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) {
+ pThis->bInClose = 1;
+ if(pThis->bAsyncWrite) {
+ strmFlush(pThis);
+ } else {
+ strmWaitAsyncWriterDone(pThis);
+ }
+ pThis->bInClose = 0;
+ }
close(pThis->fd);
pThis->fd = -1;
@@ -569,11 +584,11 @@ ENDobjConstruct(strm)
static rsRetVal strmConstructFinalize(strm_t *pThis)
{
rsRetVal localRet;
+ int i;
DEFiRet;
ASSERT(pThis != NULL);
- CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
pThis->iBufPtrMax = 0; /* results in immediate read request */
if(pThis->iZipLevel) { /* do we need a zip buf? */
localRet = objUse(zlibw, LM_ZLIBW_FILENAME);
@@ -601,9 +616,28 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
}
}
- /* if we should call flush apc's, we need a mutex */
+ /* if we have a flush interval, we need to do async writes in any case */
if(pThis->iFlushInterval != 0) {
+ pThis->bAsyncWrite = 1;
+ }
+
+ /* if we work asynchronously, we need a couple of synchronization objects */
+ if(pThis->bAsyncWrite) {
pthread_mutex_init(&pThis->mut, 0);
+ pthread_cond_init(&pThis->notFull, 0);
+ pthread_cond_init(&pThis->notEmpty, 0);
+ pthread_cond_init(&pThis->isEmpty, 0);
+ pThis->iCnt = pThis->iEnq = pThis->iDeq = 0;
+ for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) {
+ CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
+ }
+ pThis->pIOBuf = pThis->asyncBuf[0].pBuf;
+ pThis->bStopWriter = 0;
+ if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0)
+ DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis);
+ } else {
+ /* we work synchronously, so we need to alloc a fixed pIOBuf */
+ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
}
finalize_it:
@@ -611,12 +645,33 @@ finalize_it:
}
+/* stop the writer thread (we MUST be runnnig asynchronously when this method
+ * is called!). Note that the mutex must be locked! -- rgerhards, 2009-07-06
+ */
+static inline void
+stopWriter(strm_t *pThis)
+{
+ BEGINfunc
+ pThis->bStopWriter = 1;
+ pthread_cond_signal(&pThis->notEmpty);
+ d_pthread_mutex_unlock(&pThis->mut);
+ pthread_join(pThis->writerThreadID, NULL);
+ ENDfunc
+}
+
+
/* destructor for the strm object */
BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */
+ int i;
CODESTARTobjDestruct(strm)
+ if(pThis->bAsyncWrite)
+ /* Note: mutex will be unlocked in stopWriter! */
+ d_pthread_mutex_lock(&pThis->mut);
+
if(pThis->tOperationsMode != STREAMMODE_READ)
strmFlush(pThis);
+dbgprintf("XXX: destruct stream %p\n", pThis);
/* ... then free resources */
if(pThis->fd != -1)
strmCloseFile(pThis);
@@ -625,16 +680,23 @@ CODESTARTobjDestruct(strm)
objRelease(zlibw, LM_ZLIBW_FILENAME);
}
- if(pThis->iFlushInterval != 0) {
- // TODO: check if there is an apc and remove it!
- pthread_mutex_destroy(&pThis->mut);
- }
-
free(pThis->pszDir);
- free(pThis->pIOBuf);
free(pThis->pZipBuf);
free(pThis->pszCurrFName);
free(pThis->pszFName);
+
+ if(pThis->bAsyncWrite) {
+ stopWriter(pThis);
+ pthread_mutex_destroy(&pThis->mut);
+ pthread_cond_destroy(&pThis->notFull);
+ pthread_cond_destroy(&pThis->notEmpty);
+ pthread_cond_destroy(&pThis->isEmpty);
+ for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) {
+ free(pThis->asyncBuf[i].pBuf);
+ }
+ } else {
+ free(pThis->pIOBuf);
+ }
ENDobjDestruct(strm)
@@ -650,6 +712,9 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis)
if(pThis->fd == -1)
FINALIZE;
+ /* wait for output to be empty, so that our counts are correct */
+ strmWaitAsyncWriterDone(pThis);
+
if(pThis->iCurrOffs >= pThis->iMaxFileSize) {
dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n",
(long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs);
@@ -730,12 +795,165 @@ doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf)
pWriteBuf += iWritten;
} while(lenBuf > 0); /* Warning: do..while()! */
+ dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten);
+
finalize_it:
*pLenBuf = iTotalWritten;
RETiRet;
}
+
+/* write memory buffer to a stream object.
+ */
+static inline rsRetVal
+doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ if(pThis->iZipLevel) {
+ CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
+ } else {
+ /* write without zipping */
+ CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* This function is called to "do" an async write call, what primarily means that
+ * the data is handed over to the writer thread (which will then do the actual write
+ * in parallel). Note that the stream mutex has already been locked by the
+ * strmWrite...() calls. Also note that we always have only a single producer,
+ * so we can simply serially assign the next free buffer to it and be sure that
+ * the very some producer comes back in sequence to submit the then-filled buffers.
+ * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06
+ */
+static inline rsRetVal
+doAsyncWriteInternal(strm_t *pThis, size_t lenBuf)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, strm);
+
+dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf);
+ while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS)
+ d_pthread_cond_wait(&pThis->notFull, &pThis->mut);
+
+ pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf;
+ pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf;
+
+ pThis->bDoTimedWait = 0; /* everything written, no need to timeout partial buffer writes */
+ if(++pThis->iCnt == 1)
+ pthread_cond_signal(&pThis->notEmpty);
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* schedule writing to the stream. Depending on our concurrency settings,
+ * this either directly writes to the stream or schedules writing via
+ * the background thread. -- rgerhards, 2009-07-07
+ */
+static rsRetVal
+strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ if(pThis->bAsyncWrite) {
+ CHKiRet(doAsyncWriteInternal(pThis, lenBuf));
+ } else {
+ CHKiRet(doWriteInternal(pThis, pBuf, lenBuf));
+ }
+
+ pThis->iBufPtr = 0; /* we are at the begin of a new buffer */
+
+finalize_it:
+ RETiRet;
+}
+
+
+
+/* This is the writer thread for asynchronous mode.
+ * -- rgerhards, 2009-07-06
+ */
+static void*
+asyncWriterThread(void *pPtr)
+{
+ int iDeq;
+ struct timespec t;
+ bool bTimedOut = 0;
+ strm_t *pThis = (strm_t*) pPtr;
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ BEGINfunc
+# if HAVE_PRCTL && defined PR_SET_NAME
+ if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) {
+ DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer");
+ }
+#endif
+
+ while(1) { /* loop broken inside */
+ d_pthread_mutex_lock(&pThis->mut);
+ while(pThis->iCnt == 0) {
+ if(pThis->bStopWriter) {
+ pthread_cond_broadcast(&pThis->isEmpty);
+ d_pthread_mutex_unlock(&pThis->mut);
+ goto finalize_it; /* break main loop */
+ }
+ if(bTimedOut && pThis->iBufPtr > 0) {
+ /* if we timed out, we need to flush pending data */
+ strmFlush(pThis);
+ bTimedOut = 0;
+ continue; /* now we should have data */
+ }
+ bTimedOut = 0;
+ timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */
+ if(pThis->bDoTimedWait) {
+ if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) {
+ int err = errno;
+ if(err == ETIMEDOUT) {
+ bTimedOut = 1;
+ } else {
+ bTimedOut = 1;
+ char errStr[1024];
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n",
+ err, errStr);
+ }
+ }
+ } else {
+ d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut);
+ }
+ }
+
+ bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
+
+ iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
+ doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf);
+ // TODO: error check????? 2009-07-06
+
+ --pThis->iCnt;
+ if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) {
+ pthread_cond_signal(&pThis->notFull);
+ if(pThis->iCnt == 0)
+ pthread_cond_broadcast(&pThis->isEmpty);
+ }
+ d_pthread_mutex_unlock(&pThis->mut);
+ }
+
+finalize_it:
+ ENDfunc
+ return NULL; /* to keep pthreads happy */
+}
+
+
/* sync the file to disk, so that any unwritten data is persisted. This
* also syncs the directory and thus makes sure that the file survives
* fatal failure. Note that we do NOT return an error status if the
@@ -789,9 +1007,7 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
iWritten = lenBuf;
CHKiRet(doWriteCall(pThis, pBuf, &iWritten));
- dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten);
- pThis->iBufPtr = 0;
pThis->iCurrOffs += iWritten;
/* update user counter, if provided */
if(pThis->pUsrWCntr != NULL)
@@ -839,7 +1055,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
/* see note in file header for the params we use with deflateInit2() */
zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
if(zRet != Z_OK) {
- dbgprintf("error %d returned from zlib/deflateInit2()\n", zRet);
+ DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet);
ABORT_FINALIZE(RS_RET_ZLIB_ERR);
}
@@ -849,11 +1065,11 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
/* run deflate() on input until output buffer not full, finish
compression if all of source has been read in */
do {
- dbgprintf("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in);
+ DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in);
zstrm.avail_out = pThis->sIOBufSize;
zstrm.next_out = pThis->pZipBuf;
zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */
- dbgprintf("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out);
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out);
assert(zRet != Z_STREAM_ERROR); /* state not clobbered */
CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out));
} while (zstrm.avail_out == 0);
@@ -862,7 +1078,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
zRet = zlibw.DeflateEnd(&zstrm);
if(zRet != Z_OK) {
- dbgprintf("error %d returned from zlib/deflateEnd()\n", zRet);
+ DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
ABORT_FINALIZE(RS_RET_ZLIB_ERR);
}
@@ -871,33 +1087,6 @@ finalize_it:
}
-/* write memory buffer to a stream object.
- * To support direct writes of large objects, this method may be called
- * with a buffer pointing to some region other than the stream buffer itself.
- * However, in that case the stream buffer must be empty (strmFlush() has to
- * be called before), because we would otherwise mess up with the sequence
- * inside the stream. -- rgerhards, 2008-01-10
- */
-static rsRetVal
-strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
-{
- DEFiRet;
-
- ASSERT(pThis != NULL);
- ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
-
- if(pThis->iZipLevel) {
- CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
- } else {
- /* write without zipping */
- CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
- }
-
-finalize_it:
- RETiRet;
-}
-
-
/* flush stream output buffer to persistent storage. This can be called at any time
* and is automatically called when the output buffer is full.
* rgerhards, 2008-01-10
@@ -911,7 +1100,7 @@ strmFlush(strm_t *pThis)
dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr);
if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) {
- iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr);
+ iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr);
}
RETiRet;
@@ -964,6 +1153,12 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
ASSERT(pThis != NULL);
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_lock(&pThis->mut);
+
+ if(pThis->bDisabled)
+ ABORT_FINALIZE(RS_RET_STREAM_DISABLED);
+
/* if the buffer is full, we need to flush before we can write */
if(pThis->iBufPtr == pThis->sIOBufSize) {
CHKiRet(strmFlush(pThis));
@@ -973,11 +1168,18 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
pThis->iBufPtr++;
finalize_it:
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_unlock(&pThis->mut);
+
RETiRet;
}
-/* write an integer value (actually a long) to a stream object */
+/* write an integer value (actually a long) to a stream object
+ * Note that we do not need to lock the mutex here, because we call
+ * strmWrite(), which does the lock (aka: we must not lock it, else we
+ * would run into a recursive lock, resulting in a deadlock!)
+ */
static rsRetVal strmWriteLong(strm_t *pThis, long i)
{
DEFiRet;
@@ -993,89 +1195,65 @@ finalize_it:
}
-/* schedule an Apc flush request.
- * rgerhards, 2009-06-15
- */
-static inline rsRetVal
-scheduleFlushRequest(strm_t *pThis)
-{
- apc_t *pApc;
- DEFiRet;
-
- if(!pThis->apcRequested) {
- /* we do an request only if none is yet pending */
- pThis->apcRequested = 1;
- // TODO: find similar thing later CHKiRet(apc.CancelApc(pThis->apcID));
-dbgprintf("XXX: requesting to add apc!\n");
- CHKiRet(apc.Construct(&pApc));
- CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc));
- CHKiRet(apc.SetParam1(pApc, pThis));
- CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID));
- }
-
-finalize_it:
- RETiRet;
-}
-
-
-/* write memory buffer to a stream object
+/* write memory buffer to a stream object.
+ * process the data in chunks and copy it over to our buffer. The caller-provided data
+ * may theoritically be larger than our buffer. In that case, we do multiple copies. One
+ * may argue if it were more efficient to write out the caller-provided buffer in that case
+ * and earlier versions of rsyslog did this. However, this introduces a lot of complexity
+ * inside the buffered writer and potential performance bottlenecks when trying to solve
+ * it. Now keep in mind that we actually do (almost?) never have a case where the
+ * caller-provided buffer is larger than our one. So instead of optimizing a case
+ * which normally does not exist, we expect some degradation in its case but make us
+ * perform better in the regular cases. -- rgerhards, 2009-07-07
*/
static rsRetVal
strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
- DEFVARS_mutexProtection_uncond;
DEFiRet;
- size_t iPartial;
+ size_t iWrite;
+ size_t iOffset;
ASSERT(pThis != NULL);
ASSERT(pBuf != NULL);
-dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs);
+//DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs);
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_lock(&pThis->mut);
+
if(pThis->bDisabled)
ABORT_FINALIZE(RS_RET_STREAM_DISABLED);
-RUNLOG_VAR("%d", pThis->iFlushInterval);
- if(pThis->iFlushInterval != 0) {
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
- }
-
- /* check if the to-be-written data is larger than our buffer size */
- if(lenBuf >= pThis->sIOBufSize) {
- /* it is - so we do a direct write, that is most efficient.
- * TODO: is it really? think about disk block sizes!
- */
- CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */
- CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf));
- } else {
- /* data fits into a buffer - we just need to see if it
- * fits into the current buffer...
- */
- if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) {
- /* nope, so we must split it */
- iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
- if(iPartial > 0) { /* the buffer was exactly full, can not write anything! */
- memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial);
- pThis->iBufPtr += iPartial;
- }
+ iOffset = 0;
+ do {
+ if(pThis->iBufPtr == pThis->sIOBufSize) {
CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
- memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial);
- pThis->iBufPtr = lenBuf - iPartial;
- } else {
- /* we have space, so we simply copy over the string */
- memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf);
- pThis->iBufPtr += lenBuf;
}
- }
-
- /* we ignore the outcome of scheduleFlushRequest(), as we will write the data always at
- * termination. For Zip mode, it could be fatal if we write after each record.
+ iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
+ if(iWrite > lenBuf)
+ iWrite = lenBuf;
+ memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf + iOffset, iWrite);
+ pThis->iBufPtr += iWrite;
+ iOffset += iWrite;
+ lenBuf -= iWrite;
+ } while(lenBuf > 0);
+
+ /* now check if the buffer right at the end of the write is full and, if so,
+ * write it. This seems more natural than waiting (hours?) for the next message...
*/
- if(pThis->iFlushInterval != 0)
- scheduleFlushRequest(pThis);
+ if(pThis->iBufPtr == pThis->sIOBufSize) {
+ CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
+ }
finalize_it:
- if(pThis->iFlushInterval != 0) {
- END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
+ if(pThis->bAsyncWrite) {
+ if(pThis->bDoTimedWait == 0) {
+ /* we potentially have a partial buffer, so re-activate the
+ * writer thread that it can set and pick up timeouts.
+ */
+ pThis->bDoTimedWait = 1;
+ pthread_cond_signal(&pThis->notEmpty);
+ }
+ d_pthread_mutex_unlock(&pThis->mut);
}
RETiRet;
@@ -1390,7 +1568,6 @@ ENDobjQueryInterface(strm)
*/
BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
- CHKiRet(objUse(apc, CORE_COMPONENT));
OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize);
OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty);
diff --git a/runtime/stream.h b/runtime/stream.h
index ac003c7b..64ffb6e1 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -87,6 +87,7 @@ typedef enum { /* when extending, do NOT change existing modes! */
STREAMMODE_WRITE_APPEND = 4
} strmMode_t;
+#define STREAM_ASYNC_NUMBUFS 2 /* must be a power of 2 -- TODO: make configurable */
/* The strm_t data structure */
typedef struct strm_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
@@ -112,17 +113,32 @@ typedef struct strm_s {
int fd; /* the file descriptor, -1 if closed */
int fdDir; /* the directory's descriptor, in case bSync is requested (-1 if closed) */
uchar *pszCurrFName; /* name of current file (if open) */
- uchar *pIOBuf; /* io Buffer */
+ uchar *pIOBuf; /* the iobuffer currently in use to gather data */
size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */
size_t iBufPtr; /* pointer into current buffer */
int iUngetC; /* char set via UngetChar() call or -1 if none set */
bool bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */
+ bool bInClose; /* used to break "deadly close loops", tells us we are already inside a close */
int iZipLevel; /* zip level (0..9). If 0, zip is completely disabled */
Bytef *pZipBuf;
/* support for async flush procesing */
+ bool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */
+ bool bStopWriter; /* shall writer thread terminate? */
+ bool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */
int iFlushInterval; /* flush in which interval - 0, no flushing */
apc_id_t apcID; /* id of current Apc request (used for cancelling) */
pthread_mutex_t mut;/* mutex for flush in async mode */
+ pthread_cond_t notFull;
+ pthread_cond_t notEmpty;
+ pthread_cond_t isEmpty;
+ unsigned short iEnq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */
+ unsigned short iDeq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */
+ short iCnt; /* current nbr of elements in buffer */
+ struct {
+ uchar *pBuf;
+ size_t lenBuf;
+ } asyncBuf[STREAM_ASYNC_NUMBUFS];
+ pthread_t writerThreadID;
int apcRequested; /* is an apc Requested? */
/* support for omfile size-limiting commands, special counters, NOT persisted! */
off_t iSizeLimit; /* file size limit, 0 = no limit */
@@ -130,6 +146,7 @@ typedef struct strm_s {
bool bIsTTY; /* is this a tty file? */
} strm_t;
+
/* interfaces */
BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
rsRetVal (*Construct)(strm_t **ppThis);
diff --git a/runtime/wti.c b/runtime/wti.c
index 156d8116..abdf4add 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -41,7 +41,6 @@
#ifdef OS_SOLARIS
# include <sched.h>
-# define pthread_yield() sched_yield()
#endif
#include "rsyslog.h"
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 87c3b324..596ff866 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -54,6 +54,7 @@
#include "wtp.h"
#include "wti.h"
#include "obj.h"
+#include "unicode-helper.h"
#include "glbl.h"
/* static data */
@@ -422,6 +423,8 @@ wtpWrkrExecCancelCleanup(void *arg)
static void *
wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */
{
+ uchar *pszDbgHdr;
+ uchar thrdName[32] = "rs:";
DEFiRet;
DEFVARS_mutexProtection;
wti_t *pWti = (wti_t*) arg;
@@ -437,7 +440,9 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
# if HAVE_PRCTL && defined PR_SET_NAME
/* set thread name - we ignore if the call fails, has no harsh consequences... */
- if(prctl(PR_SET_NAME, wtpGetDbgHdr(pThis), 0, 0, 0) != 0) {
+ pszDbgHdr = wtpGetDbgHdr(pThis);
+ ustrncpy(thrdName+3, pszDbgHdr, 20);
+ if(prctl(PR_SET_NAME, thrdName, 0, 0, 0) != 0) {
DBGPRINTF("prctl failed, not setting thread name for '%s'\n", wtpGetDbgHdr(pThis));
}
# endif
diff --git a/tcpclt.c b/tcpclt.c
index c53f00f7..617aaef6 100644
--- a/tcpclt.c
+++ b/tcpclt.c
@@ -297,6 +297,12 @@ Send(tcpclt_t *pThis, void *pData, char *msg, size_t len)
CHKiRet(TCPSendBldFrame(pThis, &msg, &len, &bMsgMustBeFreed));
+ if(pThis->iRebindInterval > 0 && ++pThis->iNumMsgs == pThis->iRebindInterval) {
+ /* we need to rebind, and use the retry logic for this*/
+ CHKiRet(pThis->prepRetryFunc(pData)); /* try to recover */
+ pThis->iNumMsgs = 0;
+ }
+
while(!bDone) { /* loop is broken when send succeeds or error occurs */
CHKiRet(pThis->initFunc(pData));
iRet = pThis->sendFunc(pData, msg, len);
@@ -388,6 +394,13 @@ SetFraming(tcpclt_t *pThis, TCPFRAMINGMODE framing)
pThis->tcp_framing = framing;
RETiRet;
}
+static rsRetVal
+SetRebindInterval(tcpclt_t *pThis, int iRebindInterval)
+{
+ DEFiRet;
+ pThis->iRebindInterval = iRebindInterval;
+ RETiRet;
+}
/* Standard-Constructor
@@ -445,6 +458,7 @@ CODESTARTobjQueryInterface(tcpclt)
pIf->SetSendFrame = SetSendFrame;
pIf->SetSendPrepRetry = SetSendPrepRetry;
pIf->SetFraming = SetFraming;
+ pIf->SetRebindInterval = SetRebindInterval;
finalize_it:
ENDobjQueryInterface(tcpclt)
diff --git a/tcpclt.h b/tcpclt.h
index 1d704044..5a8eba75 100644
--- a/tcpclt.h
+++ b/tcpclt.h
@@ -36,6 +36,8 @@ typedef struct tcpclt_s {
short bResendLastOnRecon; /* should the last message be resent on a successful reconnect? */
size_t lenPrevMsg;
/* session specific callbacks */
+ int iRebindInterval; /* how often should the send socket be rebound? */
+ int iNumMsgs; /* number of messages during current "rebind session" */
rsRetVal (*initFunc)(void*);
rsRetVal (*sendFunc)(void*, char*, size_t);
rsRetVal (*prepRetryFunc)(void*);
@@ -55,8 +57,10 @@ BEGINinterface(tcpclt) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetSendFrame)(tcpclt_t*, rsRetVal (*)(void*, char*, size_t));
rsRetVal (*SetSendPrepRetry)(tcpclt_t*, rsRetVal (*)(void*));
rsRetVal (*SetFraming)(tcpclt_t*, TCPFRAMINGMODE framing);
+ /* v3, 2009-07-14*/
+ rsRetVal (*SetRebindInterval)(tcpclt_t*, int iRebindInterval);
ENDinterface(tcpclt)
-#define tcpcltCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */
+#define tcpcltCURR_IF_VERSION 3 /* increment whenever you change the interface structure! */
/* prototypes */
diff --git a/tcpsrv.c b/tcpsrv.c
index e8ea2b98..0be723d1 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -261,7 +261,7 @@ static void deinit_tcp_listener(tcpsrv_t *pThis)
}
/* finally close our listen streams */
- for(i = 0 ; i < pThis->iLstnMax ; ++i) {
+ for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
netstrm.Destruct(pThis->ppLstn + i);
}
}
@@ -280,12 +280,12 @@ addTcpLstn(void *pUsr, netstrm_t *pLstn)
ISOBJ_TYPE_assert(pThis, tcpsrv);
ISOBJ_TYPE_assert(pLstn, netstrm);
- if(pThis->iLstnMax >= TCPLSTN_MAX_DEFAULT)
+ if(pThis->iLstnCurr >= pThis->iLstnMax)
ABORT_FINALIZE(RS_RET_MAX_LSTN_REACHED);
- pThis->ppLstn[pThis->iLstnMax] = pLstn;
- pThis->ppLstnPort[pThis->iLstnMax] = pPortList;
- ++pThis->iLstnMax;
+ pThis->ppLstn[pThis->iLstnCurr] = pLstn;
+ pThis->ppLstnPort[pThis->iLstnCurr] = pPortList;
+ ++pThis->iLstnCurr;
finalize_it:
RETiRet;
@@ -327,15 +327,19 @@ finalize_it:
static rsRetVal
create_tcp_socket(tcpsrv_t *pThis)
{
- tcpLstnPortList_t *pEntry;
DEFiRet;
+ rsRetVal localRet;
+ tcpLstnPortList_t *pEntry;
ISOBJ_TYPE_assert(pThis, tcpsrv);
/* init all configured ports */
pEntry = pThis->pLstnPorts;
while(pEntry != NULL) {
- CHKiRet(initTCPListener(pThis, pEntry));
+ localRet = initTCPListener(pThis, pEntry);
+ if(localRet != RS_RET_OK) {
+ errmsg.LogError(0, localRet, "Could not create tcp listener, ignoring port %s.", pEntry->pszPort);
+ }
pEntry = pEntry->pNext;
}
@@ -481,7 +485,6 @@ Run(tcpsrv_t *pThis)
* this thread. Thus, we also need to instantiate a cancel cleanup handler
* to prevent us from leaking anything. -- rgerharsd, 20080-04-24
*/
-RUNLOG_STR("XXXX: tcp server runs\n");
pthread_cleanup_push(RunCancelCleanup, (void*) &pSel);
while(1) {
CHKiRet(nssel.Construct(&pSel));
@@ -489,7 +492,7 @@ RUNLOG_STR("XXXX: tcp server runs\n");
CHKiRet(nssel.ConstructFinalize(pSel));
/* Add the TCP listen sockets to the list of read descriptors. */
- for(i = 0 ; i < pThis->iLstnMax ; ++i) {
+ for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
CHKiRet(nssel.Add(pSel, pThis->ppLstn[i], NSDSEL_RD));
}
@@ -502,11 +505,10 @@ RUNLOG_STR("XXXX: tcp server runs\n");
iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
}
-RUNLOG_STR("XXXX: tcp server select\n");
/* wait for io to become ready */
CHKiRet(nssel.Wait(pSel, &nfds));
- for(i = 0 ; i < pThis->iLstnMax ; ++i) {
+ for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds));
if(bIsReady) {
dbgprintf("New connect on NSD %p.\n", pThis->ppLstn[i]);
@@ -514,7 +516,6 @@ RUNLOG_STR("XXXX: tcp server select\n");
--nfds; /* indicate we have processed one */
}
}
-RUNLOG_STR("XXXX: tcp server post select\n");
/* now check the sessions */
iTCPSess = TCPSessGetNxtSess(pThis, -1);
@@ -578,7 +579,8 @@ finalize_it: /* this is a very special case - this time only we do not exit the
/* Standard-Constructor */
BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */
- pThis->iSessMax = TCPSESS_MAX_DEFAULT; /* TODO: useful default ;) */
+ pThis->iSessMax = TCPSESS_MAX_DEFAULT;
+ pThis->iLstnMax = TCPLSTN_MAX_DEFAULT;
pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
pThis->OnMsgReceive = NULL;
ENDobjConstruct(tcpsrv)
@@ -602,8 +604,8 @@ tcpsrvConstructFinalize(tcpsrv_t *pThis)
CHKiRet(netstrms.ConstructFinalize(pThis->pNS));
/* set up listeners */
- CHKmalloc(pThis->ppLstn = calloc(TCPLSTN_MAX_DEFAULT, sizeof(netstrm_t*)));
- CHKmalloc(pThis->ppLstnPort = calloc(TCPLSTN_MAX_DEFAULT, sizeof(tcpLstnPortList_t*)));
+ CHKmalloc(pThis->ppLstn = calloc(pThis->iLstnMax, sizeof(netstrm_t*)));
+ CHKmalloc(pThis->ppLstnPort = calloc(pThis->iLstnMax, sizeof(tcpLstnPortList_t*)));
iRet = pThis->OpenLstnSocks(pThis);
finalize_it:
@@ -820,6 +822,20 @@ SetDrvrPermPeers(tcpsrv_t *pThis, permittedPeers_t *pPermPeers)
* -------------------------------------------------------------------------- */
+/* set max number of listeners
+ * this must be called before ConstructFinalize, or it will have no effect!
+ * rgerhards, 2009-08-17
+ */
+static rsRetVal
+SetLstnMax(tcpsrv_t *pThis, int iMax)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->iLstnMax = iMax;
+ RETiRet;
+}
+
+
/* set max number of sessions
* this must be called before ConstructFinalize, or it will have no effect!
* rgerhards, 2009-04-09
@@ -862,6 +878,7 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->SetInputName = SetInputName;
pIf->SetAddtlFrameDelim = SetAddtlFrameDelim;
pIf->SetSessMax = SetSessMax;
+ pIf->SetLstnMax = SetLstnMax;
pIf->SetDrvrMode = SetDrvrMode;
pIf->SetDrvrAuthMode = SetDrvrAuthMode;
pIf->SetDrvrPermPeers = SetDrvrPermPeers;
diff --git a/tcpsrv.h b/tcpsrv.h
index 70682398..64065aab 100644
--- a/tcpsrv.h
+++ b/tcpsrv.h
@@ -54,9 +54,10 @@ struct tcpsrv_s {
uchar *pszInputName; /**< value to be used as input name */
ruleset_t *pRuleset; /**< ruleset to bind to */
permittedPeers_t *pPermPeers;/**< driver's permitted peers */
- int iLstnMax; /**< max nbr of listeners currently supported */
+ int iLstnCurr; /**< max nbr of listeners currently supported */
netstrm_t **ppLstn; /**< our netstream listners */
tcpLstnPortList_t **ppLstnPort; /**< pointer to relevant listen port description */
+ int iLstnMax; /**< max number of listners supported */
int iSessMax; /**< max number of sessions supported */
tcpLstnPortList_t *pLstnPorts; /**< head pointer for listen ports */
int addtlFrameDelim; /**< additional frame delimiter for plain TCP syslog framing (e.g. to handle NetScreen) */
@@ -111,8 +112,10 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */
/* added v6 */
rsRetVal (*SetOnMsgReceive)(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int)); /* 2009-05-24 */
rsRetVal (*SetRuleset)(tcpsrv_t *pThis, ruleset_t*); /* 2009-06-12 */
+ /* added v7 */
+ rsRetVal (*SetLstnMax)(tcpsrv_t *pThis, int iMaxLstn); /* 2009-08-17 */
ENDinterface(tcpsrv)
-#define tcpsrvCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */
+#define tcpsrvCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */
/* change for v4:
* - SetAddtlFrameDelim() added -- rgerhards, 2008-12-10
* - SetInputName() added -- rgerhards, 2008-12-10
diff --git a/tests/diag.sh b/tests/diag.sh
index b2bd13ac..13bb877d 100755
--- a/tests/diag.sh
+++ b/tests/diag.sh
@@ -9,7 +9,7 @@
#valgrind="valgrind --tool=drd --log-fd=1"
#valgrind="valgrind --tool=helgrind --log-fd=1"
#set -o xtrace
-#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction"
+#export RSYSLOG_DEBUG="debug nostdout printmutexaction"
#export RSYSLOG_DEBUGLOG="log"
case $1 in
'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
diff --git a/tests/killrsyslog.sh b/tests/killrsyslog.sh
index b1be757b..c9b6e0ac 100755
--- a/tests/killrsyslog.sh
+++ b/tests/killrsyslog.sh
@@ -2,6 +2,6 @@
if [ -e "rsyslog.pid" ]
then
echo rsyslog.pid exists, trying to shut down rsyslogd process `cat rsyslog.pid`.
- kill `cat rsyslog.pid`
+ kill -9 `cat rsyslog.pid`
sleep 1
fi
diff --git a/tests/nettester.c b/tests/nettester.c
index b5da8ee8..209c2a6f 100644
--- a/tests/nettester.c
+++ b/tests/nettester.c
@@ -142,7 +142,7 @@ tcpSend(char *buf, int lenBuf)
if(connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == 0) {
break;
} else {
- if(retries++ == 30) {
+ if(retries++ == 50) {
++iFailed;
fprintf(stderr, "connect() failed\n");
return(1);
diff --git a/tests/tcpflood.c b/tests/tcpflood.c
index 2ca796ca..0439e33e 100644
--- a/tests/tcpflood.c
+++ b/tests/tcpflood.c
@@ -61,6 +61,7 @@ int openConn(int *fd)
{
int sock;
struct sockaddr_in addr;
+ int retries = 0;
if((sock=socket(AF_INET, SOCK_STREAM, 0))==-1) {
perror("socket()");
@@ -74,11 +75,19 @@ int openConn(int *fd)
fprintf(stderr, "inet_aton() failed\n");
return(1);
}
- if(connect(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
- perror("connect()");
- fprintf(stderr, "connect() failed\n");
- return(1);
- }
+ while(1) { /* loop broken inside */
+ if(connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == 0) {
+ break;
+ } else {
+ if(retries++ == 50) {
+ perror("connect()");
+ fprintf(stderr, "connect() failed\n");
+ return(1);
+ } else {
+ usleep(100000); /* ms = 1000 us! */
+ }
+ }
+ }
*fd = sock;
return 0;
diff --git a/tests/testsuites/upcase-date.parse1 b/tests/testsuites/upcase-date.parse1
new file mode 100644
index 00000000..2d21222a
--- /dev/null
+++ b/tests/testsuites/upcase-date.parse1
@@ -0,0 +1,4 @@
+<6>AUG 10 22:18:24 2009 netips-warden2-p [audit] user=[*SMS] src=192.168.11.11 iface=5 access=9 Update State Reset
+6,kern,info,Aug 10 22:18:24,2009,,, netips-warden2-p [audit] user=[*SMS] src=192.168.11.11 iface=5 access=9 Update State Reset
+#Example from RFC3164, section 5.4
+#Only the first two lines are important, you may place anything behind them!
diff --git a/tools/omfile.c b/tools/omfile.c
index 82944a96..bb12b4b6 100644
--- a/tools/omfile.c
+++ b/tools/omfile.c
@@ -401,12 +401,17 @@ prepareFile(instanceData *pData, uchar *newFileName)
CHKiRet(strm.SetDir(pData->pStrm, szDirName, ustrlen(szDirName)));
CHKiRet(strm.SetiZipLevel(pData->pStrm, pData->iZipLevel));
CHKiRet(strm.SetsIOBufSize(pData->pStrm, (size_t) pData->iIOBufSize));
- CHKiRet(strm.SetiFlushInterval(pData->pStrm, pData->iFlushInterval));
CHKiRet(strm.SettOperationsMode(pData->pStrm, STREAMMODE_WRITE_APPEND));
CHKiRet(strm.SettOpenMode(pData->pStrm, fCreateMode));
CHKiRet(strm.SetbSync(pData->pStrm, pData->bSyncFile));
CHKiRet(strm.SetsType(pData->pStrm, STREAMTYPE_FILE_SINGLE));
CHKiRet(strm.SetiSizeLimit(pData->pStrm, pData->iSizeLimit));
+ /* set the flush interval only if we actually use it - otherwise it will activate
+ * async processing, which is a real performance waste if we do not do buffered
+ * writes! -- rgerhards, 2009-07-06
+ */
+ if(!pData->bFlushOnTXEnd)
+ CHKiRet(strm.SetiFlushInterval(pData->pStrm, pData->iFlushInterval));
if(pData->pszSizeLimitCmd != NULL)
CHKiRet(strm.SetpszSizeLimitCmd(pData->pStrm, ustrdup(pData->pszSizeLimitCmd)));
CHKiRet(strm.ConstructFinalize(pData->pStrm));
diff --git a/tools/omfwd.c b/tools/omfwd.c
index d207cce5..fe65f515 100644
--- a/tools/omfwd.c
+++ b/tools/omfwd.c
@@ -90,6 +90,7 @@ typedef struct _instanceData {
char *port;
int protocol;
int iUDPRebindInterval; /* rebind interval */
+ int iTCPRebindInterval; /* rebind interval */
int nXmit; /* number of transmissions since last (re-)bind */
# define FORW_UDP 0
# define FORW_TCP 1
@@ -104,6 +105,7 @@ static short iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 m
static short bResendLastOnRecon = 0; /* should the last message be re-sent on a successful reconnect? */
static uchar *pszStrmDrvrAuthMode = NULL; /* authentication mode to use */
static int iUDPRebindInterval = 0; /* support for automatic re-binding (load balancers!). 0 - no rebind */
+static int iTCPRebindInterval = 0; /* support for automatic re-binding (load balancers!). 0 - no rebind */
static permittedPeers_t *pPermPeers = NULL;
@@ -643,6 +645,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* copy over config data as needed */
pData->iUDPRebindInterval = iUDPRebindInterval;
+ pData->iTCPRebindInterval = iTCPRebindInterval;
/* process template */
CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS,
@@ -657,6 +660,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
CHKiRet(tcpclt.SetSendFrame(pData->pTCPClt, TCPSendFrame));
CHKiRet(tcpclt.SetSendPrepRetry(pData->pTCPClt, TCPSendPrepRetry));
CHKiRet(tcpclt.SetFraming(pData->pTCPClt, tcp_framing));
+ CHKiRet(tcpclt.SetRebindInterval(pData->pTCPClt, pData->iTCPRebindInterval));
pData->iStrmDrvrMode = iStrmDrvrMode;
if(pszStrmDrvr != NULL)
CHKmalloc(pData->pszStrmDrvr = (uchar*)strdup((char*)pszStrmDrvr));
@@ -728,6 +732,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iStrmDrvrMode = 0;
bResendLastOnRecon = 0;
iUDPRebindInterval = 0;
+ iTCPRebindInterval = 0;
return RS_RET_OK;
}
@@ -742,6 +747,7 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(net,LM_NET_FILENAME));
CHKiRet(regCfSysLineHdlr((uchar *)"actionforwarddefaulttemplate", 0, eCmdHdlrGetWord, NULL, &pszTplName, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionsendtcprebindinterval", 0, eCmdHdlrInt, NULL, &iTCPRebindInterval, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionsendudprebindinterval", 0, eCmdHdlrInt, NULL, &iUDPRebindInterval, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionsendstreamdriver", 0, eCmdHdlrGetWord, NULL, &pszStrmDrvr, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionsendstreamdrivermode", 0, eCmdHdlrInt, NULL, &iStrmDrvrMode, NULL));
diff --git a/tools/syslogd.c b/tools/syslogd.c
index e52a524a..88588621 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -1218,7 +1218,7 @@ int parseLegacySyslogMsg(msg_t *pMsg, int flags)
if(flags & PARSE_HOSTNAME) {
i = 0;
while((isalnum(p2parse[i]) || p2parse[i] == '.' || p2parse[i] == '.'
- || p2parse[i] == '_') && i < CONF_TAG_MAXSIZE) {
+ || p2parse[i] == '_' || p2parse[i] == '-') && i < CONF_TAG_MAXSIZE) {
bufParseHOSTNAME[i] = p2parse[i];
++i;
}
@@ -2550,8 +2550,8 @@ mainloop(void)
* powertop, for example). In that case, we primarily wait for a signal,
* but a once-a-day wakeup should be quite acceptable. -- rgerhards, 2008-06-09
*/
- //tvSelectTimeout.tv_sec = (bReduceRepeatMsgs == 1) ? TIMERINTVL : 86400 /*1 day*/;
- tvSelectTimeout.tv_sec = TIMERINTVL; /* TODO: change this back to the above code when we have a better solution for apc */
+ tvSelectTimeout.tv_sec = (bReduceRepeatMsgs == 1) ? TIMERINTVL : 86400 /*1 day*/;
+ //tvSelectTimeout.tv_sec = TIMERINTVL; /* TODO: change this back to the above code when we have a better solution for apc */
tvSelectTimeout.tv_usec = 0;
select(1, NULL, NULL, NULL, &tvSelectTimeout);
if(bFinished)
@@ -2586,7 +2586,7 @@ mainloop(void)
bHadHUP = 0;
continue;
}
- execScheduled(); /* handle Apc calls (if any) */
+ // TODO: remove execScheduled(); /* handle Apc calls (if any) */
}
ENDfunc
}