summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog14
-rw-r--r--action.c3
-rw-r--r--configure.ac2
-rw-r--r--doc/manual.html6
-rw-r--r--doc/rsyslog_conf_global.html7
-rw-r--r--runtime/apc.c2
-rw-r--r--runtime/debug.c4
-rw-r--r--runtime/debug.h3
-rw-r--r--runtime/msg.c319
-rw-r--r--runtime/msg.h4
-rw-r--r--runtime/rule.c7
-rw-r--r--runtime/stream.c442
-rw-r--r--runtime/stream.h19
-rw-r--r--runtime/wtp.c7
-rw-r--r--tcpclt.c14
-rw-r--r--tcpclt.h6
-rwxr-xr-xtests/diag.sh2
-rwxr-xr-xtests/killrsyslog.sh2
-rw-r--r--tests/nettester.c2
-rw-r--r--tests/tcpflood.c19
-rw-r--r--tools/omfile.c7
-rw-r--r--tools/omfwd.c6
-rw-r--r--tools/syslogd.c6
23 files changed, 587 insertions, 316 deletions
diff --git a/ChangeLog b/ChangeLog
index e44b9f83..f5f3940d 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,11 @@
---------------------------------------------------------------------------
-Version 4.5.1 [DEVEL] (rgerhards), 2009-07-??
+Version 4.5.2 [DEVEL] (rgerhards), 2009-07-??
+- bugfix: memory leak with some input modules. Those inputs that
+ use parseAndSubmitMsg() leak two small memory blocks with every message.
+ Typically, those process only relatively few messages, so the issue
+ does most probably not have any effect in practice.
+---------------------------------------------------------------------------
+Version 4.5.1 [DEVEL] (rgerhards), 2009-07-15
- CONFIG CHANGE: $HUPisRestart default is now "off". We are doing this
to support removal of restart-type HUP in v5.
- bugfix: fromhost-ip was sometimes truncated
@@ -16,6 +22,12 @@ Version 4.5.1 [DEVEL] (rgerhards), 2009-07-??
- bugfix: message could be truncated after TAG, often when forwarding
This was a result of an internal processing error if maximum field
sizes had been specified in the property replacer.
+- added ability for the TCP output action to "rebind" its send socket after
+ sending n messages (actually, it re-opens the connection, the name is
+ used because this is a concept very similiar to $ActionUDPRebindInterval).
+ New config directive $ActionSendTCPRebindInterval added for the purpose.
+ By default, rebinding is disabled. This is considered useful for load
+ balancers.
- testbench improvements
---------------------------------------------------------------------------
Version 4.5.0 [DEVEL] (rgerhards), 2009-07-02
diff --git a/action.c b/action.c
index f21feea6..bcb23659 100644
--- a/action.c
+++ b/action.c
@@ -43,6 +43,7 @@
#include "srUtils.h"
#include "errmsg.h"
#include "datetime.h"
+#include "unicode-helper.h"
/* forward definitions */
rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg);
@@ -780,7 +781,7 @@ doActionCallAction(action_t *pAction, msg_t *pMsg)
/* suppress duplicate messages */
if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL &&
(pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) &&
- !strcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) &&
+ !ustrcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) &&
!strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) &&
!strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(pAction->f_pMsg, LOCK_MUTEX)) &&
!strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(pAction->f_pMsg, LOCK_MUTEX))) {
diff --git a/configure.ac b/configure.ac
index 2fa496c7..8709c25a 100644
--- a/configure.ac
+++ b/configure.ac
@@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ(2.61)
-AC_INIT([rsyslog],[4.5.0],[rsyslog@lists.adiscon.com])
+AC_INIT([rsyslog],[4.5.1],[rsyslog@lists.adiscon.com])
AM_INIT_AUTOMAKE
AC_CONFIG_SRCDIR([ChangeLog])
AC_CONFIG_MACRO_DIR([m4])
diff --git a/doc/manual.html b/doc/manual.html
index d473f485..0fb91e1d 100644
--- a/doc/manual.html
+++ b/doc/manual.html
@@ -19,9 +19,9 @@ rsyslog support</a> available directly from the source!</p>
<p><b>Please visit the <a href="http://www.rsyslog.com/sponsors">rsyslog sponsor's page</a>
to honor the project sponsors or become one yourself!</b> We are very grateful for any help towards the
project goals.</p>
-<p><b>This documentation is for version 4.5.0 (beta branch) of rsyslog.</b>
-Visit the <i> <a href="http://www.rsyslog.com/doc-status.html">rsyslog status page</a></i></b> to obtain current
-version information and project status.
+<p><b>This documentation is for version 4.5.1 (beta branch) of rsyslog.</b>
+Visit the <i><a href="http://www.rsyslog.com/doc-status.html">rsyslog status page</a></i></b>
+to obtain current version information and project status.
</p><p><b>If you like rsyslog, you might
want to lend us a helping hand. </b>It doesn't require a lot of
time - even a single mouse click helps. Learn <a href="how2help.html">how to help the rsyslog project</a>.
diff --git a/doc/rsyslog_conf_global.html b/doc/rsyslog_conf_global.html
index 1fe72c5f..2bbb136e 100644
--- a/doc/rsyslog_conf_global.html
+++ b/doc/rsyslog_conf_global.html
@@ -96,6 +96,13 @@ default 60000 (1 minute)]</li>
(driver-specific)</li><li>$ActionSendStreamDriverAuthMode &lt;mode&gt;,&nbsp; authentication mode to use with the stream driver
(driver-specific)</li><li>$ActionSendStreamDriverPermittedPeer &lt;ID&gt;,&nbsp; accepted fingerprint (SHA1) or name of remote peer
(driver-specific) -<span style="font-weight: bold;"> directive may go away</span>!</li>
+<li><b>$ActionSendTCPRebindInterval</b> nbr</a>- [available since 4.5.1] - instructs the TCP send
+action to close and re-open the connection to the remote host every nbr of messages sent.
+Zero, the default, means that no such processing is done. This directive is useful for
+use with load-balancers. Note that there is some performance overhead associated with it,
+so it is advisable to not too often &quot;rebind&quot; the connection (what
+&quot;too often&quot; actually means depends on your configuration, a rule of thumb is
+that it should be not be much more often than once per second).</li>
<li><b>$ActionSendUDPRebindInterval</b> nbr</a>- [available since 4.3.2] - instructs the UDP send
action to rebind the send socket every nbr of messages sent. Zero, the default, means
that no rebind is done. This directive is useful for use with load-balancers.</li>
diff --git a/runtime/apc.c b/runtime/apc.c
index 5919191d..bc330e39 100644
--- a/runtime/apc.c
+++ b/runtime/apc.c
@@ -335,9 +335,11 @@ CancelApc(apc_id_t id)
{
DEFVARS_mutexProtection_uncond;
+ BEGINfunc
BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex);
deleteApc(id);
END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex);
+ ENDfunc
return RS_RET_OK;
}
diff --git a/runtime/debug.c b/runtime/debug.c
index 4ee90226..807fd3f7 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -732,6 +732,8 @@ static void dbgGetThrdName(char *pszBuf, size_t lenBuf, pthread_t thrd, int bInc
*/
void dbgSetThrdName(uchar *pszName)
{
+return;
+
dbgThrdInfo_t *pThrd = dbgGetThrdInfo();
if(pThrd->pszThrdName != NULL)
free(pThrd->pszThrdName);
@@ -776,7 +778,7 @@ static void dbgCallStackPrint(dbgThrdInfo_t *pThrd)
/* print all threads call stacks
*/
-static void dbgCallStackPrintAll(void)
+void dbgCallStackPrintAll(void)
{
dbgThrdInfo_t *pThrd;
/* stack info */
diff --git a/runtime/debug.h b/runtime/debug.h
index 1375493d..dcbfb930 100644
--- a/runtime/debug.h
+++ b/runtime/debug.h
@@ -134,8 +134,7 @@ void dbgPrintAllDebugInfo(void);
/* debug aides */
-//#ifdef RTINST
-#if 0 // temporarily removed for helgrind
+#ifdef RTINST
#define d_pthread_mutex_lock(x) dbgMutexLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
#define d_pthread_mutex_trylock(x) dbgMutexTryLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
#define d_pthread_mutex_unlock(x) dbgMutexUnlock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
diff --git a/runtime/msg.c b/runtime/msg.c
index d29da560..de298871 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -1159,16 +1159,16 @@ int getMSGLen(msg_t *pM)
return((pM == NULL) ? 0 : pM->iLenMSG);
}
-char *getMSG(msg_t *pM)
+uchar *getMSG(msg_t *pM)
{
- char *ret;
+ uchar *ret;
if(pM == NULL)
- ret = "";
+ ret = UCHAR_CONSTANT("");
else {
if(pM->offMSG == -1)
- ret = "";
+ ret = UCHAR_CONSTANT("");
else
- ret = (char*)(pM->pszRawMsg + pM->offMSG);
+ ret = pM->pszRawMsg + pM->offMSG;
}
return ret;
}
@@ -1613,21 +1613,23 @@ static inline void tryEmulateTAG(msg_t *pM, bool bLockMutex)
}
-static inline char *getTAG(msg_t *pM)
+static inline void
+getTAG(msg_t *pM, uchar **ppBuf, int *piLen)
{
- char *ret;
-
- if(pM == NULL)
- ret = "";
- else {
+ if(pM == NULL) {
+ *ppBuf = UCHAR_CONSTANT("");
+ *piLen = 0;
+ } else {
if(pM->iLenTAG == 0)
tryEmulateTAG(pM, LOCK_MUTEX);
- if(pM->iLenTAG == 0)
- ret = "";
- else
- ret = (char*) ((pM->iLenTAG < CONF_TAG_BUFSIZE) ? pM->TAG.szBuf : pM->TAG.pszTAG);
+ if(pM->iLenTAG == 0) {
+ *ppBuf = UCHAR_CONSTANT("");
+ *piLen = 0;
+ } else {
+ *ppBuf = (pM->iLenTAG < CONF_TAG_BUFSIZE) ? pM->TAG.szBuf : pM->TAG.pszTAG;
+ *piLen = pM->iLenTAG;
+ }
}
- return(ret);
}
@@ -2130,14 +2132,14 @@ static uchar *getNOW(eNOWType eNow)
* be used in selector line processing.
* rgerhards 2005-09-15
*/
-char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
+uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
propid_t propID, size_t *pPropLen,
unsigned short *pbMustBeFreed)
{
- char *pRes; /* result pointer */
+ uchar *pRes; /* result pointer */
int bufLen = -1; /* length of string or -1, if not known */
- char *pBufStart;
- char *pBuf;
+ uchar *pBufStart;
+ uchar *pBuf;
int iLen;
short iOffs;
@@ -2159,16 +2161,16 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
bufLen = getMSGLen(pMsg);
break;
case PROP_TIMESTAMP:
- pRes = getTimeReported(pMsg, pTpe->data.field.eDateFormat);
+ pRes = (uchar*)getTimeReported(pMsg, pTpe->data.field.eDateFormat);
break;
case PROP_HOSTNAME:
- pRes = getHOSTNAME(pMsg);
+ pRes = (uchar*)getHOSTNAME(pMsg);
break;
case PROP_SYSLOGTAG:
- pRes = getTAG(pMsg);
+ getTAG(pMsg, &pRes, &bufLen);
break;
case PROP_RAWMSG:
- pRes = getRawMsg(pMsg);
+ pRes = (uchar*)getRawMsg(pMsg);
break;
/* enable this, if someone actually uses UxTradMsg, delete after some time has
* passed and nobody complained -- rgerhards, 2009-06-16
@@ -2177,120 +2179,120 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
break;
*/
case PROP_INPUTNAME:
- getInputName(pMsg, ((uchar**) &pRes), &bufLen);
+ getInputName(pMsg, &pRes, &bufLen);
break;
case PROP_FROMHOST:
- pRes = (char*) getRcvFrom(pMsg);
+ pRes = getRcvFrom(pMsg);
break;
case PROP_FROMHOST_IP:
- pRes = (char*) getRcvFromIP(pMsg);
+ pRes = getRcvFromIP(pMsg);
break;
case PROP_PRI:
- pRes = getPRI(pMsg);
+ pRes = (uchar*)getPRI(pMsg);
break;
case PROP_PRI_TEXT:
- pBuf = malloc(20 * sizeof(char));
+ pBuf = malloc(20 * sizeof(uchar));
if(pBuf == NULL) {
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else {
*pbMustBeFreed = 1;
- pRes = textpri(pBuf, 20, getPRIi(pMsg));
+ pRes = (uchar*)textpri((char*)pBuf, 20, getPRIi(pMsg));
}
break;
case PROP_IUT:
- pRes = "1"; /* always 1 for syslog messages (a MonitorWare thing;)) */
+ pRes = UCHAR_CONSTANT("1"); /* always 1 for syslog messages (a MonitorWare thing;)) */
break;
case PROP_SYSLOGFACILITY:
- pRes = getFacility(pMsg);
+ pRes = (uchar*)getFacility(pMsg);
break;
case PROP_SYSLOGFACILITY_TEXT:
- pRes = getFacilityStr(pMsg);
+ pRes = (uchar*)getFacilityStr(pMsg);
break;
case PROP_SYSLOGSEVERITY:
- pRes = getSeverity(pMsg);
+ pRes = (uchar*)getSeverity(pMsg);
break;
case PROP_SYSLOGSEVERITY_TEXT:
- pRes = getSeverityStr(pMsg);
+ pRes = (uchar*)getSeverityStr(pMsg);
break;
case PROP_TIMEGENERATED:
- pRes = getTimeGenerated(pMsg, pTpe->data.field.eDateFormat);
+ pRes = (uchar*)getTimeGenerated(pMsg, pTpe->data.field.eDateFormat);
break;
case PROP_PROGRAMNAME:
- pRes = getProgramName(pMsg, LOCK_MUTEX);
+ pRes = (uchar*)getProgramName(pMsg, LOCK_MUTEX);
break;
case PROP_PROTOCOL_VERSION:
- pRes = getProtocolVersionString(pMsg);
+ pRes = (uchar*)getProtocolVersionString(pMsg);
break;
case PROP_STRUCTURED_DATA:
- pRes = getStructuredData(pMsg);
+ pRes = (uchar*)getStructuredData(pMsg);
break;
case PROP_APP_NAME:
- pRes = getAPPNAME(pMsg, LOCK_MUTEX);
+ pRes = (uchar*)getAPPNAME(pMsg, LOCK_MUTEX);
break;
case PROP_PROCID:
- pRes = getPROCID(pMsg, LOCK_MUTEX);
+ pRes = (uchar*)getPROCID(pMsg, LOCK_MUTEX);
break;
case PROP_MSGID:
- pRes = getMSGID(pMsg);
+ pRes = (uchar*)getMSGID(pMsg);
break;
case PROP_SYS_NOW:
- if((pRes = (char*) getNOW(NOW_NOW)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_NOW)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_YEAR:
- if((pRes = (char*) getNOW(NOW_YEAR)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_YEAR)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_MONTH:
- if((pRes = (char*) getNOW(NOW_MONTH)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_MONTH)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_DAY:
- if((pRes = (char*) getNOW(NOW_DAY)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_DAY)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_HOUR:
- if((pRes = (char*) getNOW(NOW_HOUR)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_HOUR)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_HHOUR:
- if((pRes = (char*) getNOW(NOW_HHOUR)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_HHOUR)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_QHOUR:
- if((pRes = (char*) getNOW(NOW_QHOUR)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_QHOUR)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_MINUTE:
- if((pRes = (char*) getNOW(NOW_MINUTE)) == NULL) {
- return "***OUT OF MEMORY***";
+ if((pRes = getNOW(NOW_MINUTE)) == NULL) {
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
} else
*pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
break;
case PROP_SYS_MYHOSTNAME:
- pRes = (char*) glbl.GetLocalHostName();
+ pRes = glbl.GetLocalHostName();
break;
default:
/* there is no point in continuing, we may even otherwise render the
* error message unreadable. rgerhards, 2007-07-10
*/
dbgprintf("invalid property id: '%d'\n", propID);
- return "**INVALID PROPERTY NAME**";
+ return UCHAR_CONSTANT("**INVALID PROPERTY NAME**");
}
@@ -2310,8 +2312,8 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*/
if(pTpe->data.field.has_fields == 1) {
size_t iCurrFld;
- char *pFld;
- char *pFldEnd;
+ uchar *pFld;
+ uchar *pFldEnd;
/* first, skip to the field in question. The field separator
* is always one character and is stored in the template entry.
*/
@@ -2349,7 +2351,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
/* now copy */
memcpy(pBuf, pFld, iLen);
@@ -2366,12 +2368,12 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**FIELD NOT FOUND**";
+ return UCHAR_CONSTANT("**FIELD NOT FOUND**");
}
} else if(pTpe->data.field.iFromPos != 0 || pTpe->data.field.iToPos != 0) {
/* we need to obtain a private copy */
int iFrom, iTo;
- char *pSb;
+ uchar *pSb;
iFrom = pTpe->data.field.iFromPos;
iTo = pTpe->data.field.iToPos;
/* need to zero-base to and from (they are 1-based!) */
@@ -2379,44 +2381,55 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
--iFrom;
if(iTo > 0)
--iTo;
- iLen = iTo - iFrom + 1; /* the +1 is for an actual char, NOT \0! */
- pBufStart = pBuf = malloc((iLen + 1) * sizeof(char));
- if(pBuf == NULL) {
- if(*pbMustBeFreed == 1)
- free(pRes);
- *pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
- }
- pSb = pRes;
- if(iFrom) {
- /* skip to the start of the substring (can't do pointer arithmetic
- * because the whole string might be smaller!!)
- */
- while(*pSb && iFrom) {
- --iFrom;
+ if(bufLen == -1)
+ bufLen = ustrlen(pRes);
+ if(iFrom == 0 && iTo >= bufLen) {
+ /* in this case, the requested string is a superset of what we already have,
+ * so there is no need to do any processing. This is a frequent case for size-limited
+ * fields like TAG in the default forwarding template (so it is a useful optimization
+ * to check for this condition ;)). -- rgerhards, 2009-07-09
+ */
+ ; /*DO NOTHING*/
+ } else {
+ iLen = iTo - iFrom + 1; /* the +1 is for an actual char, NOT \0! */
+ pBufStart = pBuf = malloc((iLen + 1) * sizeof(char));
+ if(pBuf == NULL) {
+ if(*pbMustBeFreed == 1)
+ free(pRes);
+ *pbMustBeFreed = 0;
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
+ }
+ pSb = pRes;
+ if(iFrom) {
+ /* skip to the start of the substring (can't do pointer arithmetic
+ * because the whole string might be smaller!!)
+ */
+ while(*pSb && iFrom) {
+ --iFrom;
+ ++pSb;
+ }
+ }
+ /* OK, we are at the begin - now let's copy... */
+ bufLen = iLen;
+ while(*pSb && iLen) {
+ *pBuf++ = *pSb;
++pSb;
+ --iLen;
}
+ *pBuf = '\0';
+ bufLen -= iLen; /* subtract remaining length if the string was smaller! */
+ if(*pbMustBeFreed == 1)
+ free(pRes);
+ pRes = pBufStart;
+ *pbMustBeFreed = 1;
}
- /* OK, we are at the begin - now let's copy... */
- bufLen = iLen;
- while(*pSb && iLen) {
- *pBuf++ = *pSb;
- ++pSb;
- --iLen;
- }
- *pBuf = '\0';
- bufLen -= iLen; /* subtract remaining length if the string was smaller! */
- if(*pbMustBeFreed == 1)
- free(pRes);
- pRes = pBufStart;
- *pbMustBeFreed = 1;
#ifdef FEATURE_REGEXP
} else {
/* Check for regular expressions */
if (pTpe->data.field.has_regex != 0) {
if (pTpe->data.field.has_regex == 2)
/* Could not compile regex before! */
- return "**NO MATCH** **BAD REGULAR EXPRESSION**";
+ return UCHAR_CONSTANT("**NO MATCH** **BAD REGULAR EXPRESSION**");
dbgprintf("string to match for regex is: %s\n", pRes);
@@ -2429,7 +2442,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*/
while(!bFound) {
int iREstat;
- iREstat = regexp.regexec(&pTpe->data.field.re, pRes + iOffs, nmatch, pmatch, 0);
+ iREstat = regexp.regexec(&pTpe->data.field.re, (char*)(pRes + iOffs), nmatch, pmatch, 0);
dbgprintf("regexec return is %d\n", iREstat);
if(iREstat == 0) {
if(pmatch[0].rm_so == -1) {
@@ -2457,11 +2470,11 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*pbMustBeFreed = 0;
}
if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_DFLTSTR)
- return "**NO MATCH**";
+ return UCHAR_CONSTANT("**NO MATCH**");
else if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_ZERO)
- return "0";
+ return UCHAR_CONSTANT("0");
else
- return "";
+ return UCHAR_CONSTANT("");
}
} else {
/* Match- but did it match the one we wanted? */
@@ -2473,24 +2486,24 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*pbMustBeFreed = 0;
}
if(pTpe->data.field.nomatchAction == TPL_REGEX_NOMATCH_USE_DFLTSTR)
- return "**NO MATCH**";
+ return UCHAR_CONSTANT("**NO MATCH**");
else
- return "";
+ return UCHAR_CONSTANT("");
}
}
/* OK, we have a usable match - we now need to malloc pB */
int iLenBuf;
- char *pB;
+ uchar *pB;
iLenBuf = pmatch[pTpe->data.field.iSubMatchToUse].rm_eo
- pmatch[pTpe->data.field.iSubMatchToUse].rm_so;
- pB = (char *) malloc((iLenBuf + 1) * sizeof(char));
+ pB = malloc((iLenBuf + 1) * sizeof(uchar));
if (pB == NULL) {
if (*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY ALLOCATING pBuf**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
/* Lets copy the matched substring to the buffer */
@@ -2513,7 +2526,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
free(pRes);
*pbMustBeFreed = 0;
}
- return "***REGEXP NOT AVAILABLE***";
+ return UCHAR_CONSTANT("***REGEXP NOT AVAILABLE***");
}
}
#endif /* #ifdef FEATURE_REGEXP */
@@ -2525,7 +2538,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
uchar cFirst = *pRes; /* save first char */
if(*pbMustBeFreed == 1)
free(pRes);
- pRes = (cFirst == ' ') ? "" : " ";
+ pRes = (cFirst == ' ') ? UCHAR_CONSTANT("") : UCHAR_CONSTANT(" ");
bufLen = (cFirst == ' ') ? 0 : 1;
*pbMustBeFreed = 0;
}
@@ -2537,21 +2550,21 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(pTpe->data.field.eCaseConv != tplCaseConvNo) {
/* we need to obtain a private copy */
if(bufLen == -1)
- bufLen = strlen(pRes);
- char *pBStart;
- char *pB;
- char *pSrc;
+ bufLen = ustrlen(pRes);
+ uchar *pBStart;
+ uchar *pB;
+ uchar *pSrc;
pBStart = pB = malloc((bufLen + 1) * sizeof(char));
if(pB == NULL) {
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
pSrc = pRes;
while(*pSrc) {
*pB++ = (pTpe->data.field.eCaseConv == tplCaseConvUpper) ?
- (char)toupper((int)*pSrc) : (char)tolower((int)*pSrc);
+ (uchar)toupper((int)*pSrc) : (uchar)tolower((int)*pSrc);
/* currently only these two exist */
++pSrc;
}
@@ -2575,10 +2588,10 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*/
if(pTpe->data.field.options.bDropCC) {
int iLenBuf = 0;
- char *pSrc = pRes;
- char *pDstStart;
- char *pDst;
- char bDropped = 0;
+ uchar *pSrc = pRes;
+ uchar *pDstStart;
+ uchar *pDst;
+ uchar bDropped = 0;
while(*pSrc) {
if(!iscntrl((int) *pSrc++))
@@ -2593,7 +2606,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
for(pSrc = pRes; *pSrc; pSrc++) {
if(!iscntrl((int) *pSrc))
@@ -2607,9 +2620,9 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*pbMustBeFreed = 1;
}
} else if(pTpe->data.field.options.bSpaceCC) {
- char *pSrc;
- char *pDstStart;
- char *pDst;
+ uchar *pSrc;
+ uchar *pDstStart;
+ uchar *pDst;
if(*pbMustBeFreed == 1) {
/* in this case, we already work on dynamic
@@ -2623,13 +2636,13 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
}
} else {
if(bufLen == -1)
- bufLen = strlen(pRes);
+ bufLen = ustrlen(pRes);
pDst = pDstStart = malloc(bufLen + 1);
if(pDst == NULL) {
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
for(pSrc = pRes; *pSrc; pSrc++) {
if(iscntrl((int) *pSrc))
@@ -2649,7 +2662,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*/
int iNumCC = 0;
int iLenBuf = 0;
- char *pB;
+ uchar *pB;
for(pB = pRes ; *pB ; ++pB) {
++iLenBuf;
@@ -2659,21 +2672,21 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(iNumCC > 0) { /* if 0, there is nothing to escape, so we are done */
/* OK, let's do the escaping... */
- char *pBStart;
- char szCCEsc[8]; /* buffer for escape sequence */
+ uchar *pBStart;
+ uchar szCCEsc[8]; /* buffer for escape sequence */
int i;
iLenBuf += iNumCC * 4;
- pBStart = pB = malloc((iLenBuf + 1) * sizeof(char));
+ pBStart = pB = malloc((iLenBuf + 1) * sizeof(uchar));
if(pB == NULL) {
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
while(*pRes) {
if(iscntrl((int) *pRes)) {
- snprintf(szCCEsc, sizeof(szCCEsc), "#%3.3d", *pRes);
+ snprintf((char*)szCCEsc, sizeof(szCCEsc), "#%3.3d", *pRes);
for(i = 0 ; i < 4 ; ++i)
*pB++ = szCCEsc[i];
} else {
@@ -2697,10 +2710,10 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(pTpe->data.field.options.bSecPathDrop || pTpe->data.field.options.bSecPathReplace) {
if(pTpe->data.field.options.bSecPathDrop) {
int iLenBuf = 0;
- char *pSrc = pRes;
- char *pDstStart;
- char *pDst;
- char bDropped = 0;
+ uchar *pSrc = pRes;
+ uchar *pDstStart;
+ uchar *pDst;
+ uchar bDropped = 0;
while(*pSrc) {
if(*pSrc++ != '/')
@@ -2715,7 +2728,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
for(pSrc = pRes; *pSrc; pSrc++) {
if(*pSrc != '/')
@@ -2729,9 +2742,9 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*pbMustBeFreed = 1;
}
} else {
- char *pSrc;
- char *pDstStart;
- char *pDst;
+ uchar *pSrc;
+ uchar *pDstStart;
+ uchar *pDst;
if(*pbMustBeFreed == 1) {
/* here, again, we can modify the string as we already obtained
@@ -2745,13 +2758,13 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
}
} else {
if(bufLen == -1)
- bufLen = strlen(pRes);
+ bufLen = ustrlen(pRes);
pDst = pDstStart = malloc(bufLen + 1);
if(pDst == NULL) {
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
for(pSrc = pRes; *pSrc; pSrc++) {
if(*pSrc == '/')
@@ -2771,19 +2784,19 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
/* check for "." and ".." (note the parenthesis in the if condition!) */
if((*pRes == '.') && (*(pRes + 1) == '\0' || (*(pRes + 1) == '.' && *(pRes + 2) == '\0'))) {
- char *pTmp = pRes;
+ uchar *pTmp = pRes;
if(*(pRes + 1) == '\0')
- pRes = "_";
+ pRes = UCHAR_CONSTANT("_");
else
- pRes = "_.";;
+ pRes = UCHAR_CONSTANT("_.");;
if(*pbMustBeFreed == 1)
free(pTmp);
*pbMustBeFreed = 0;
} else if(*pRes == '\0') {
if(*pbMustBeFreed == 1)
free(pRes);
- pRes = "_";
+ pRes = UCHAR_CONSTANT("_");
bufLen = 1;
*pbMustBeFreed = 0;
}
@@ -2794,19 +2807,19 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
*/
if(pTpe->data.field.options.bDropLastLF && !pTpe->data.field.options.bEscapeCC) {
int iLn;
- char *pB;
+ uchar *pB;
if(bufLen == -1)
- bufLen = strlen(pRes);
+ bufLen = ustrlen(pRes);
iLn = bufLen;
if(iLn > 0 && *(pRes + iLn - 1) == '\n') {
/* we have a LF! */
/* check if we need to obtain a private copy */
if(*pbMustBeFreed == 0) {
/* ok, original copy, need a private one */
- pB = malloc((iLn + 1) * sizeof(char));
+ pB = malloc((iLn + 1) * sizeof(uchar));
if(pB == NULL) {
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
memcpy(pB, pRes, iLn - 1);
pRes = pB;
@@ -2825,19 +2838,19 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
if(pTpe->data.field.options.bCSV) {
/* we need to obtain a private copy, as we need to at least add the double quotes */
int iBufLen;
- char *pBStart;
- char *pDst;
- char *pSrc;
+ uchar *pBStart;
+ uchar *pDst;
+ uchar *pSrc;
if(bufLen == -1)
- bufLen = strlen(pRes);
+ bufLen = ustrlen(pRes);
iBufLen = bufLen;
/* the malloc may be optimized, we currently use the worst case... */
- pBStart = pDst = malloc((2 * iBufLen + 3) * sizeof(char));
+ pBStart = pDst = malloc((2 * iBufLen + 3) * sizeof(uchar));
if(pDst == NULL) {
if(*pbMustBeFreed == 1)
free(pRes);
*pbMustBeFreed = 0;
- return "**OUT OF MEMORY**";
+ return UCHAR_CONSTANT("**OUT OF MEMORY**");
}
pSrc = pRes;
*pDst++ = '"'; /* starting quote */
@@ -2856,7 +2869,7 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
}
if(bufLen == -1)
- bufLen = strlen(pRes);
+ bufLen = ustrlen(pRes);
*pPropLen = bufLen;
ENDfunc
diff --git a/runtime/msg.h b/runtime/msg.h
index c20fb005..0b346f7b 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -159,7 +159,7 @@ void MsgSetMSGoffs(msg_t *pMsg, short offs);
void MsgSetRawMsgWOSize(msg_t *pMsg, char* pszRawMsg);
void MsgSetRawMsg(msg_t *pMsg, char* pszRawMsg, size_t lenMsg);
rsRetVal MsgReplaceMSG(msg_t *pThis, uchar* pszMSG, int lenMSG);
-char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
+uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
propid_t propID, size_t *pPropLen, unsigned short *pbMustBeFreed);
char *textpri(char *pRes, size_t pResLen, int pri);
rsRetVal msgGetMsgVar(msg_t *pThis, cstr_t *pstrPropName, var_t **ppVar);
@@ -168,7 +168,7 @@ uchar *getRcvFrom(msg_t *pM);
/* TODO: remove these five (so far used in action.c) */
-char *getMSG(msg_t *pM);
+uchar *getMSG(msg_t *pM);
char *getHOSTNAME(msg_t *pM);
char *getPROCID(msg_t *pM, bool bLockMutex);
char *getAPPNAME(msg_t *pM, bool bLockMutex);
diff --git a/runtime/rule.c b/runtime/rule.c
index 3a257a90..182d616a 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -39,6 +39,7 @@
#include "vm.h"
#include "var.h"
#include "srUtils.h"
+#include "unicode-helper.h"
#include "dirty.h" /* for getFIOPName */
/* static data */
@@ -104,7 +105,7 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg)
{
DEFiRet;
unsigned short pbMustBeFreed;
- char *pszPropVal;
+ uchar *pszPropVal;
int bRet = 0;
size_t propLen;
vm_t *pVM = NULL;
@@ -189,12 +190,12 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg)
break;
case FIOP_ISEQUAL:
if(rsCStrSzStrCmp(pRule->f_filterData.prop.pCSCompValue,
- (uchar*) pszPropVal, strlen(pszPropVal)) == 0)
+ pszPropVal, ustrlen(pszPropVal)) == 0)
bRet = 1; /* process message! */
break;
case FIOP_STARTSWITH:
if(rsCStrSzStrStartsWithCStr(pRule->f_filterData.prop.pCSCompValue,
- (uchar*) pszPropVal, strlen(pszPropVal)) == 0)
+ pszPropVal, ustrlen(pszPropVal)) == 0)
bRet = 1; /* process message! */
break;
case FIOP_REGEX:
diff --git a/runtime/stream.c b/runtime/stream.c
index 00c726d9..a0571a61 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -48,37 +48,25 @@
#include "stream.h"
#include "unicode-helper.h"
#include "module-template.h"
-#include "apc.h"
+#include <sys/prctl.h>
+
+#define inline
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(zlibw)
-DEFobjCurrIf(apc)
/* forward definitions */
static rsRetVal strmFlush(strm_t *pThis);
static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
static rsRetVal strmCloseFile(strm_t *pThis);
+static void *asyncWriterThread(void *pPtr);
+static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
+static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
/* methods */
-/* async flush apc handler
- */
-static void
-flushApc(void *param1, void __attribute__((unused)) *param2)
-{
- DEFVARS_mutexProtection_uncond;
- strm_t *pThis = (strm_t*) param1;
- ISOBJ_TYPE_assert(pThis, strm);
-
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
- strmFlush(pThis);
- pThis->apcRequested = 0;
- END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
-}
-
-
/* Try to resolve a size limit situation. This is used to support custom-file size handlers
* for omfile. It first runs the command, and then checks if we are still above the size
* treshold. Note that this works only with single file names, NOT with circular names.
@@ -139,10 +127,11 @@ resolveFileSizeLimit(strm_t *pThis, uchar *pszCurrFName)
finalize_it:
if(iRet != RS_RET_OK) {
- if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE)
- dbgprintf("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName);
- else
- dbgprintf("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet);
+ if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE) {
+ DBGPRINTF("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName);
+ } else {
+ DBGPRINTF("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet);
+ }
pThis->bDisabled = 1;
}
@@ -280,6 +269,23 @@ finalize_it:
}
+/* wait for the output writer thread to be done. This must be called before actions
+ * that require data to be persisted. May be called in non-async mode and is a null
+ * operation than. Must be called with the mutex locked.
+ */
+static inline void
+strmWaitAsyncWriterDone(strm_t *pThis)
+{
+ BEGINfunc
+ if(pThis->bAsyncWrite) {
+ /* awake writer thread and make it write out everything */
+ pthread_cond_signal(&pThis->notEmpty);
+ d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut);
+ }
+ ENDfunc
+}
+
+
/* close a strm file
* Note that the bDeleteOnClose flag is honored. If it is set, the file will be
* deleted after close. This is in support for the qRead thread.
@@ -292,8 +298,15 @@ static rsRetVal strmCloseFile(strm_t *pThis)
ASSERT(pThis->fd != -1);
dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd);
- if(pThis->tOperationsMode != STREAMMODE_READ)
- strmFlush(pThis);
+ if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) {
+ pThis->bInClose = 1;
+ if(pThis->bAsyncWrite) {
+ strmFlush(pThis);
+ } else {
+ strmWaitAsyncWriterDone(pThis);
+ }
+ pThis->bInClose = 0;
+ }
close(pThis->fd);
pThis->fd = -1;
@@ -569,11 +582,11 @@ ENDobjConstruct(strm)
static rsRetVal strmConstructFinalize(strm_t *pThis)
{
rsRetVal localRet;
+ int i;
DEFiRet;
ASSERT(pThis != NULL);
- CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
pThis->iBufPtrMax = 0; /* results in immediate read request */
if(pThis->iZipLevel) { /* do we need a zip buf? */
localRet = objUse(zlibw, LM_ZLIBW_FILENAME);
@@ -601,9 +614,28 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
}
}
- /* if we should call flush apc's, we need a mutex */
+ /* if we have a flush interval, we need to do async writes in any case */
if(pThis->iFlushInterval != 0) {
+ pThis->bAsyncWrite = 1;
+ }
+
+ /* if we work asynchronously, we need a couple of synchronization objects */
+ if(pThis->bAsyncWrite) {
pthread_mutex_init(&pThis->mut, 0);
+ pthread_cond_init(&pThis->notFull, 0);
+ pthread_cond_init(&pThis->notEmpty, 0);
+ pthread_cond_init(&pThis->isEmpty, 0);
+ pThis->iCnt = pThis->iEnq = pThis->iDeq = 0;
+ for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) {
+ CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
+ }
+ pThis->pIOBuf = pThis->asyncBuf[0].pBuf;
+ pThis->bStopWriter = 0;
+ if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0)
+ DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis);
+ } else {
+ /* we work synchronously, so we need to alloc a fixed pIOBuf */
+ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
}
finalize_it:
@@ -611,12 +643,33 @@ finalize_it:
}
+/* stop the writer thread (we MUST be runnnig asynchronously when this method
+ * is called!). Note that the mutex must be locked! -- rgerhards, 2009-07-06
+ */
+static inline void
+stopWriter(strm_t *pThis)
+{
+ BEGINfunc
+ pThis->bStopWriter = 1;
+ pthread_cond_signal(&pThis->notEmpty);
+ d_pthread_mutex_unlock(&pThis->mut);
+ pthread_join(pThis->writerThreadID, NULL);
+ ENDfunc
+}
+
+
/* destructor for the strm object */
BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */
+ int i;
CODESTARTobjDestruct(strm)
+ if(pThis->bAsyncWrite)
+ /* Note: mutex will be unlocked in stopWriter! */
+ d_pthread_mutex_lock(&pThis->mut);
+
if(pThis->tOperationsMode != STREAMMODE_READ)
strmFlush(pThis);
+dbgprintf("XXX: destruct stream %p\n", pThis);
/* ... then free resources */
if(pThis->fd != -1)
strmCloseFile(pThis);
@@ -625,16 +678,23 @@ CODESTARTobjDestruct(strm)
objRelease(zlibw, LM_ZLIBW_FILENAME);
}
- if(pThis->iFlushInterval != 0) {
- // TODO: check if there is an apc and remove it!
- pthread_mutex_destroy(&pThis->mut);
- }
-
free(pThis->pszDir);
- free(pThis->pIOBuf);
free(pThis->pZipBuf);
free(pThis->pszCurrFName);
free(pThis->pszFName);
+
+ if(pThis->bAsyncWrite) {
+ stopWriter(pThis);
+ pthread_mutex_destroy(&pThis->mut);
+ pthread_cond_destroy(&pThis->notFull);
+ pthread_cond_destroy(&pThis->notEmpty);
+ pthread_cond_destroy(&pThis->isEmpty);
+ for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) {
+ free(pThis->asyncBuf[i].pBuf);
+ }
+ } else {
+ free(pThis->pIOBuf);
+ }
ENDobjDestruct(strm)
@@ -650,6 +710,9 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis)
if(pThis->fd == -1)
FINALIZE;
+ /* wait for output to be empty, so that our counts are correct */
+ strmWaitAsyncWriterDone(pThis);
+
if(pThis->iCurrOffs >= pThis->iMaxFileSize) {
dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n",
(long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs);
@@ -730,12 +793,164 @@ doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf)
pWriteBuf += iWritten;
} while(lenBuf > 0); /* Warning: do..while()! */
+ dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten);
+
finalize_it:
*pLenBuf = iTotalWritten;
RETiRet;
}
+
+/* write memory buffer to a stream object.
+ */
+static inline rsRetVal
+doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ if(pThis->iZipLevel) {
+ CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
+ } else {
+ /* write without zipping */
+ CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* This function is called to "do" an async write call, what primarily means that
+ * the data is handed over to the writer thread (which will then do the actual write
+ * in parallel). Note that the stream mutex has already been locked by the
+ * strmWrite...() calls. Also note that we always have only a single producer,
+ * so we can simply serially assign the next free buffer to it and be sure that
+ * the very some producer comes back in sequence to submit the then-filled buffers.
+ * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06
+ */
+static inline rsRetVal
+doAsyncWriteInternal(strm_t *pThis, size_t lenBuf)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, strm);
+
+dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf);
+ while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS)
+ d_pthread_cond_wait(&pThis->notFull, &pThis->mut);
+
+ pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf;
+ pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf;
+
+ pThis->bDoTimedWait = 0; /* everything written, no need to timeout partial buffer writes */
+ if(++pThis->iCnt == 1)
+ pthread_cond_signal(&pThis->notEmpty);
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* schedule writing to the stream. Depending on our concurrency settings,
+ * this either directly writes to the stream or schedules writing via
+ * the background thread. -- rgerhards, 2009-07-07
+ */
+static rsRetVal
+strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ if(pThis->bAsyncWrite) {
+ CHKiRet(doAsyncWriteInternal(pThis, lenBuf));
+ } else {
+ CHKiRet(doWriteInternal(pThis, pBuf, lenBuf));
+ }
+
+ pThis->iBufPtr = 0; /* we are at the begin of a new buffer */
+
+finalize_it:
+ RETiRet;
+}
+
+
+
+/* This is the writer thread for asynchronous mode.
+ * -- rgerhards, 2009-07-06
+ */
+static void*
+asyncWriterThread(void *pPtr)
+{
+ int iDeq;
+ struct timespec t;
+ bool bTimedOut = 0;
+ strm_t *pThis = (strm_t*) pPtr;
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ BEGINfunc
+ if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) {
+ DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer");
+ }
+
+ while(1) { /* loop broken inside */
+ d_pthread_mutex_lock(&pThis->mut);
+ while(pThis->iCnt == 0) {
+ if(pThis->bStopWriter) {
+ pthread_cond_broadcast(&pThis->isEmpty);
+ d_pthread_mutex_unlock(&pThis->mut);
+ goto finalize_it; /* break main loop */
+ }
+ if(bTimedOut && pThis->iBufPtr > 0) {
+RUNLOG_STR("XXX: we had a timeout in stream writer");
+ /* if we timed out, we need to flush pending data */
+ strmFlush(pThis);
+ bTimedOut = 0;
+ continue; /* now we should have data */
+ }
+ bTimedOut = 0;
+ timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */
+ if(pThis->bDoTimedWait) {
+ if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) {
+ int err = errno;
+ if(err == ETIMEDOUT) {
+ bTimedOut = 1;
+ } else {
+ bTimedOut = 1;
+ char errStr[1024];
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n",
+ err, errStr);
+ }
+ }
+ } else {
+ d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut);
+ }
+ }
+
+ bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
+
+ iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
+ doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf);
+ // TODO: error check????? 2009-07-06
+
+ --pThis->iCnt;
+ if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) {
+ pthread_cond_signal(&pThis->notFull);
+ if(pThis->iCnt == 0)
+ pthread_cond_broadcast(&pThis->isEmpty);
+ }
+ d_pthread_mutex_unlock(&pThis->mut);
+ }
+
+finalize_it:
+ ENDfunc
+ return NULL; /* to keep pthreads happy */
+}
+
+
/* sync the file to disk, so that any unwritten data is persisted. This
* also syncs the directory and thus makes sure that the file survives
* fatal failure. Note that we do NOT return an error status if the
@@ -789,9 +1004,7 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
iWritten = lenBuf;
CHKiRet(doWriteCall(pThis, pBuf, &iWritten));
- dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten);
- pThis->iBufPtr = 0;
pThis->iCurrOffs += iWritten;
/* update user counter, if provided */
if(pThis->pUsrWCntr != NULL)
@@ -839,7 +1052,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
/* see note in file header for the params we use with deflateInit2() */
zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
if(zRet != Z_OK) {
- dbgprintf("error %d returned from zlib/deflateInit2()\n", zRet);
+ DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet);
ABORT_FINALIZE(RS_RET_ZLIB_ERR);
}
@@ -849,11 +1062,11 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
/* run deflate() on input until output buffer not full, finish
compression if all of source has been read in */
do {
- dbgprintf("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in);
+ DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in);
zstrm.avail_out = pThis->sIOBufSize;
zstrm.next_out = pThis->pZipBuf;
zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */
- dbgprintf("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out);
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out);
assert(zRet != Z_STREAM_ERROR); /* state not clobbered */
CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out));
} while (zstrm.avail_out == 0);
@@ -862,7 +1075,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
zRet = zlibw.DeflateEnd(&zstrm);
if(zRet != Z_OK) {
- dbgprintf("error %d returned from zlib/deflateEnd()\n", zRet);
+ DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
ABORT_FINALIZE(RS_RET_ZLIB_ERR);
}
@@ -871,33 +1084,6 @@ finalize_it:
}
-/* write memory buffer to a stream object.
- * To support direct writes of large objects, this method may be called
- * with a buffer pointing to some region other than the stream buffer itself.
- * However, in that case the stream buffer must be empty (strmFlush() has to
- * be called before), because we would otherwise mess up with the sequence
- * inside the stream. -- rgerhards, 2008-01-10
- */
-static rsRetVal
-strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
-{
- DEFiRet;
-
- ASSERT(pThis != NULL);
- ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
-
- if(pThis->iZipLevel) {
- CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
- } else {
- /* write without zipping */
- CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
- }
-
-finalize_it:
- RETiRet;
-}
-
-
/* flush stream output buffer to persistent storage. This can be called at any time
* and is automatically called when the output buffer is full.
* rgerhards, 2008-01-10
@@ -911,7 +1097,7 @@ strmFlush(strm_t *pThis)
dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr);
if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) {
- iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr);
+ iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr);
}
RETiRet;
@@ -964,6 +1150,12 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
ASSERT(pThis != NULL);
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_lock(&pThis->mut);
+
+ if(pThis->bDisabled)
+ ABORT_FINALIZE(RS_RET_STREAM_DISABLED);
+
/* if the buffer is full, we need to flush before we can write */
if(pThis->iBufPtr == pThis->sIOBufSize) {
CHKiRet(strmFlush(pThis));
@@ -973,11 +1165,18 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
pThis->iBufPtr++;
finalize_it:
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_unlock(&pThis->mut);
+
RETiRet;
}
-/* write an integer value (actually a long) to a stream object */
+/* write an integer value (actually a long) to a stream object
+ * Note that we do not need to lock the mutex here, because we call
+ * strmWrite(), which does the lock (aka: we must not lock it, else we
+ * would run into a recursive lock, resulting in a deadlock!)
+ */
static rsRetVal strmWriteLong(strm_t *pThis, long i)
{
DEFiRet;
@@ -993,89 +1192,65 @@ finalize_it:
}
-/* schedule an Apc flush request.
- * rgerhards, 2009-06-15
- */
-static inline rsRetVal
-scheduleFlushRequest(strm_t *pThis)
-{
- apc_t *pApc;
- DEFiRet;
-
- if(!pThis->apcRequested) {
- /* we do an request only if none is yet pending */
- pThis->apcRequested = 1;
- // TODO: find similar thing later CHKiRet(apc.CancelApc(pThis->apcID));
-dbgprintf("XXX: requesting to add apc!\n");
- CHKiRet(apc.Construct(&pApc));
- CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc));
- CHKiRet(apc.SetParam1(pApc, pThis));
- CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID));
- }
-
-finalize_it:
- RETiRet;
-}
-
-
-/* write memory buffer to a stream object
+/* write memory buffer to a stream object.
+ * process the data in chunks and copy it over to our buffer. The caller-provided data
+ * may theoritically be larger than our buffer. In that case, we do multiple copies. One
+ * may argue if it were more efficient to write out the caller-provided buffer in that case
+ * and earlier versions of rsyslog did this. However, this introduces a lot of complexity
+ * inside the buffered writer and potential performance bottlenecks when trying to solve
+ * it. Now keep in mind that we actually do (almost?) never have a case where the
+ * caller-provided buffer is larger than our one. So instead of optimizing a case
+ * which normally does not exist, we expect some degradation in its case but make us
+ * perform better in the regular cases. -- rgerhards, 2009-07-07
*/
static rsRetVal
strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
- DEFVARS_mutexProtection_uncond;
DEFiRet;
- size_t iPartial;
+ size_t iWrite;
+ size_t iOffset;
ASSERT(pThis != NULL);
ASSERT(pBuf != NULL);
-dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs);
+//DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs);
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_lock(&pThis->mut);
+
if(pThis->bDisabled)
ABORT_FINALIZE(RS_RET_STREAM_DISABLED);
-RUNLOG_VAR("%d", pThis->iFlushInterval);
- if(pThis->iFlushInterval != 0) {
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
- }
-
- /* check if the to-be-written data is larger than our buffer size */
- if(lenBuf >= pThis->sIOBufSize) {
- /* it is - so we do a direct write, that is most efficient.
- * TODO: is it really? think about disk block sizes!
- */
- CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */
- CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf));
- } else {
- /* data fits into a buffer - we just need to see if it
- * fits into the current buffer...
- */
- if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) {
- /* nope, so we must split it */
- iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
- if(iPartial > 0) { /* the buffer was exactly full, can not write anything! */
- memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial);
- pThis->iBufPtr += iPartial;
- }
+ iOffset = 0;
+ do {
+ if(pThis->iBufPtr == pThis->sIOBufSize) {
CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
- memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial);
- pThis->iBufPtr = lenBuf - iPartial;
- } else {
- /* we have space, so we simply copy over the string */
- memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf);
- pThis->iBufPtr += lenBuf;
}
- }
-
- /* we ignore the outcome of scheduleFlushRequest(), as we will write the data always at
- * termination. For Zip mode, it could be fatal if we write after each record.
+ iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
+ if(iWrite > lenBuf)
+ iWrite = lenBuf;
+ memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf + iOffset, iWrite);
+ pThis->iBufPtr += iWrite;
+ iOffset += iWrite;
+ lenBuf -= iWrite;
+ } while(lenBuf > 0);
+
+ /* now check if the buffer right at the end of the write is full and, if so,
+ * write it. This seems more natural than waiting (hours?) for the next message...
*/
- if(pThis->iFlushInterval != 0)
- scheduleFlushRequest(pThis);
+ if(pThis->iBufPtr == pThis->sIOBufSize) {
+ CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
+ }
finalize_it:
- if(pThis->iFlushInterval != 0) {
- END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
+ if(pThis->bAsyncWrite) {
+ if(pThis->bDoTimedWait == 0) {
+ /* we potentially have a partial buffer, so re-activate the
+ * writer thread that it can set and pick up timeouts.
+ */
+ pThis->bDoTimedWait = 1;
+ pthread_cond_signal(&pThis->notEmpty);
+ }
+ d_pthread_mutex_unlock(&pThis->mut);
}
RETiRet;
@@ -1390,7 +1565,6 @@ ENDobjQueryInterface(strm)
*/
BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
- CHKiRet(objUse(apc, CORE_COMPONENT));
OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize);
OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty);
diff --git a/runtime/stream.h b/runtime/stream.h
index ac003c7b..cb368835 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -87,6 +87,7 @@ typedef enum { /* when extending, do NOT change existing modes! */
STREAMMODE_WRITE_APPEND = 4
} strmMode_t;
+#define STREAM_ASYNC_NUMBUFS 2 /* must be a power of 2 -- TODO: make configurable */
/* The strm_t data structure */
typedef struct strm_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
@@ -112,17 +113,32 @@ typedef struct strm_s {
int fd; /* the file descriptor, -1 if closed */
int fdDir; /* the directory's descriptor, in case bSync is requested (-1 if closed) */
uchar *pszCurrFName; /* name of current file (if open) */
- uchar *pIOBuf; /* io Buffer */
+ uchar *pIOBuf; /* the iobuffer currently in use to gather data */
size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */
size_t iBufPtr; /* pointer into current buffer */
int iUngetC; /* char set via UngetChar() call or -1 if none set */
bool bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */
+ bool bInClose; /* used to break "deadly close loops", tells us we are already inside a close */
int iZipLevel; /* zip level (0..9). If 0, zip is completely disabled */
Bytef *pZipBuf;
/* support for async flush procesing */
+ bool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */
+ bool bStopWriter; /* shall writer thread terminate? */
+ bool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */
int iFlushInterval; /* flush in which interval - 0, no flushing */
apc_id_t apcID; /* id of current Apc request (used for cancelling) */
pthread_mutex_t mut;/* mutex for flush in async mode */
+ pthread_cond_t notFull;
+ pthread_cond_t notEmpty;
+ pthread_cond_t isEmpty;
+ short iEnq;
+ short iDeq;
+ short iCnt; /* current nbr of elements in buffer */
+ struct {
+ uchar *pBuf;
+ size_t lenBuf;
+ } asyncBuf[STREAM_ASYNC_NUMBUFS];
+ pthread_t writerThreadID;
int apcRequested; /* is an apc Requested? */
/* support for omfile size-limiting commands, special counters, NOT persisted! */
off_t iSizeLimit; /* file size limit, 0 = no limit */
@@ -130,6 +146,7 @@ typedef struct strm_s {
bool bIsTTY; /* is this a tty file? */
} strm_t;
+
/* interfaces */
BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
rsRetVal (*Construct)(strm_t **ppThis);
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 87c3b324..596ff866 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -54,6 +54,7 @@
#include "wtp.h"
#include "wti.h"
#include "obj.h"
+#include "unicode-helper.h"
#include "glbl.h"
/* static data */
@@ -422,6 +423,8 @@ wtpWrkrExecCancelCleanup(void *arg)
static void *
wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */
{
+ uchar *pszDbgHdr;
+ uchar thrdName[32] = "rs:";
DEFiRet;
DEFVARS_mutexProtection;
wti_t *pWti = (wti_t*) arg;
@@ -437,7 +440,9 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
# if HAVE_PRCTL && defined PR_SET_NAME
/* set thread name - we ignore if the call fails, has no harsh consequences... */
- if(prctl(PR_SET_NAME, wtpGetDbgHdr(pThis), 0, 0, 0) != 0) {
+ pszDbgHdr = wtpGetDbgHdr(pThis);
+ ustrncpy(thrdName+3, pszDbgHdr, 20);
+ if(prctl(PR_SET_NAME, thrdName, 0, 0, 0) != 0) {
DBGPRINTF("prctl failed, not setting thread name for '%s'\n", wtpGetDbgHdr(pThis));
}
# endif
diff --git a/tcpclt.c b/tcpclt.c
index c53f00f7..617aaef6 100644
--- a/tcpclt.c
+++ b/tcpclt.c
@@ -297,6 +297,12 @@ Send(tcpclt_t *pThis, void *pData, char *msg, size_t len)
CHKiRet(TCPSendBldFrame(pThis, &msg, &len, &bMsgMustBeFreed));
+ if(pThis->iRebindInterval > 0 && ++pThis->iNumMsgs == pThis->iRebindInterval) {
+ /* we need to rebind, and use the retry logic for this*/
+ CHKiRet(pThis->prepRetryFunc(pData)); /* try to recover */
+ pThis->iNumMsgs = 0;
+ }
+
while(!bDone) { /* loop is broken when send succeeds or error occurs */
CHKiRet(pThis->initFunc(pData));
iRet = pThis->sendFunc(pData, msg, len);
@@ -388,6 +394,13 @@ SetFraming(tcpclt_t *pThis, TCPFRAMINGMODE framing)
pThis->tcp_framing = framing;
RETiRet;
}
+static rsRetVal
+SetRebindInterval(tcpclt_t *pThis, int iRebindInterval)
+{
+ DEFiRet;
+ pThis->iRebindInterval = iRebindInterval;
+ RETiRet;
+}
/* Standard-Constructor
@@ -445,6 +458,7 @@ CODESTARTobjQueryInterface(tcpclt)
pIf->SetSendFrame = SetSendFrame;
pIf->SetSendPrepRetry = SetSendPrepRetry;
pIf->SetFraming = SetFraming;
+ pIf->SetRebindInterval = SetRebindInterval;
finalize_it:
ENDobjQueryInterface(tcpclt)
diff --git a/tcpclt.h b/tcpclt.h
index 1d704044..5a8eba75 100644
--- a/tcpclt.h
+++ b/tcpclt.h
@@ -36,6 +36,8 @@ typedef struct tcpclt_s {
short bResendLastOnRecon; /* should the last message be resent on a successful reconnect? */
size_t lenPrevMsg;
/* session specific callbacks */
+ int iRebindInterval; /* how often should the send socket be rebound? */
+ int iNumMsgs; /* number of messages during current "rebind session" */
rsRetVal (*initFunc)(void*);
rsRetVal (*sendFunc)(void*, char*, size_t);
rsRetVal (*prepRetryFunc)(void*);
@@ -55,8 +57,10 @@ BEGINinterface(tcpclt) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetSendFrame)(tcpclt_t*, rsRetVal (*)(void*, char*, size_t));
rsRetVal (*SetSendPrepRetry)(tcpclt_t*, rsRetVal (*)(void*));
rsRetVal (*SetFraming)(tcpclt_t*, TCPFRAMINGMODE framing);
+ /* v3, 2009-07-14*/
+ rsRetVal (*SetRebindInterval)(tcpclt_t*, int iRebindInterval);
ENDinterface(tcpclt)
-#define tcpcltCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */
+#define tcpcltCURR_IF_VERSION 3 /* increment whenever you change the interface structure! */
/* prototypes */
diff --git a/tests/diag.sh b/tests/diag.sh
index b2bd13ac..13bb877d 100755
--- a/tests/diag.sh
+++ b/tests/diag.sh
@@ -9,7 +9,7 @@
#valgrind="valgrind --tool=drd --log-fd=1"
#valgrind="valgrind --tool=helgrind --log-fd=1"
#set -o xtrace
-#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction"
+#export RSYSLOG_DEBUG="debug nostdout printmutexaction"
#export RSYSLOG_DEBUGLOG="log"
case $1 in
'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
diff --git a/tests/killrsyslog.sh b/tests/killrsyslog.sh
index b1be757b..c9b6e0ac 100755
--- a/tests/killrsyslog.sh
+++ b/tests/killrsyslog.sh
@@ -2,6 +2,6 @@
if [ -e "rsyslog.pid" ]
then
echo rsyslog.pid exists, trying to shut down rsyslogd process `cat rsyslog.pid`.
- kill `cat rsyslog.pid`
+ kill -9 `cat rsyslog.pid`
sleep 1
fi
diff --git a/tests/nettester.c b/tests/nettester.c
index b5da8ee8..209c2a6f 100644
--- a/tests/nettester.c
+++ b/tests/nettester.c
@@ -142,7 +142,7 @@ tcpSend(char *buf, int lenBuf)
if(connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == 0) {
break;
} else {
- if(retries++ == 30) {
+ if(retries++ == 50) {
++iFailed;
fprintf(stderr, "connect() failed\n");
return(1);
diff --git a/tests/tcpflood.c b/tests/tcpflood.c
index 2ca796ca..0439e33e 100644
--- a/tests/tcpflood.c
+++ b/tests/tcpflood.c
@@ -61,6 +61,7 @@ int openConn(int *fd)
{
int sock;
struct sockaddr_in addr;
+ int retries = 0;
if((sock=socket(AF_INET, SOCK_STREAM, 0))==-1) {
perror("socket()");
@@ -74,11 +75,19 @@ int openConn(int *fd)
fprintf(stderr, "inet_aton() failed\n");
return(1);
}
- if(connect(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
- perror("connect()");
- fprintf(stderr, "connect() failed\n");
- return(1);
- }
+ while(1) { /* loop broken inside */
+ if(connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == 0) {
+ break;
+ } else {
+ if(retries++ == 50) {
+ perror("connect()");
+ fprintf(stderr, "connect() failed\n");
+ return(1);
+ } else {
+ usleep(100000); /* ms = 1000 us! */
+ }
+ }
+ }
*fd = sock;
return 0;
diff --git a/tools/omfile.c b/tools/omfile.c
index 82944a96..bb12b4b6 100644
--- a/tools/omfile.c
+++ b/tools/omfile.c
@@ -401,12 +401,17 @@ prepareFile(instanceData *pData, uchar *newFileName)
CHKiRet(strm.SetDir(pData->pStrm, szDirName, ustrlen(szDirName)));
CHKiRet(strm.SetiZipLevel(pData->pStrm, pData->iZipLevel));
CHKiRet(strm.SetsIOBufSize(pData->pStrm, (size_t) pData->iIOBufSize));
- CHKiRet(strm.SetiFlushInterval(pData->pStrm, pData->iFlushInterval));
CHKiRet(strm.SettOperationsMode(pData->pStrm, STREAMMODE_WRITE_APPEND));
CHKiRet(strm.SettOpenMode(pData->pStrm, fCreateMode));
CHKiRet(strm.SetbSync(pData->pStrm, pData->bSyncFile));
CHKiRet(strm.SetsType(pData->pStrm, STREAMTYPE_FILE_SINGLE));
CHKiRet(strm.SetiSizeLimit(pData->pStrm, pData->iSizeLimit));
+ /* set the flush interval only if we actually use it - otherwise it will activate
+ * async processing, which is a real performance waste if we do not do buffered
+ * writes! -- rgerhards, 2009-07-06
+ */
+ if(!pData->bFlushOnTXEnd)
+ CHKiRet(strm.SetiFlushInterval(pData->pStrm, pData->iFlushInterval));
if(pData->pszSizeLimitCmd != NULL)
CHKiRet(strm.SetpszSizeLimitCmd(pData->pStrm, ustrdup(pData->pszSizeLimitCmd)));
CHKiRet(strm.ConstructFinalize(pData->pStrm));
diff --git a/tools/omfwd.c b/tools/omfwd.c
index d207cce5..fe65f515 100644
--- a/tools/omfwd.c
+++ b/tools/omfwd.c
@@ -90,6 +90,7 @@ typedef struct _instanceData {
char *port;
int protocol;
int iUDPRebindInterval; /* rebind interval */
+ int iTCPRebindInterval; /* rebind interval */
int nXmit; /* number of transmissions since last (re-)bind */
# define FORW_UDP 0
# define FORW_TCP 1
@@ -104,6 +105,7 @@ static short iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 m
static short bResendLastOnRecon = 0; /* should the last message be re-sent on a successful reconnect? */
static uchar *pszStrmDrvrAuthMode = NULL; /* authentication mode to use */
static int iUDPRebindInterval = 0; /* support for automatic re-binding (load balancers!). 0 - no rebind */
+static int iTCPRebindInterval = 0; /* support for automatic re-binding (load balancers!). 0 - no rebind */
static permittedPeers_t *pPermPeers = NULL;
@@ -643,6 +645,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* copy over config data as needed */
pData->iUDPRebindInterval = iUDPRebindInterval;
+ pData->iTCPRebindInterval = iTCPRebindInterval;
/* process template */
CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS,
@@ -657,6 +660,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
CHKiRet(tcpclt.SetSendFrame(pData->pTCPClt, TCPSendFrame));
CHKiRet(tcpclt.SetSendPrepRetry(pData->pTCPClt, TCPSendPrepRetry));
CHKiRet(tcpclt.SetFraming(pData->pTCPClt, tcp_framing));
+ CHKiRet(tcpclt.SetRebindInterval(pData->pTCPClt, pData->iTCPRebindInterval));
pData->iStrmDrvrMode = iStrmDrvrMode;
if(pszStrmDrvr != NULL)
CHKmalloc(pData->pszStrmDrvr = (uchar*)strdup((char*)pszStrmDrvr));
@@ -728,6 +732,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iStrmDrvrMode = 0;
bResendLastOnRecon = 0;
iUDPRebindInterval = 0;
+ iTCPRebindInterval = 0;
return RS_RET_OK;
}
@@ -742,6 +747,7 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(net,LM_NET_FILENAME));
CHKiRet(regCfSysLineHdlr((uchar *)"actionforwarddefaulttemplate", 0, eCmdHdlrGetWord, NULL, &pszTplName, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionsendtcprebindinterval", 0, eCmdHdlrInt, NULL, &iTCPRebindInterval, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionsendudprebindinterval", 0, eCmdHdlrInt, NULL, &iUDPRebindInterval, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionsendstreamdriver", 0, eCmdHdlrGetWord, NULL, &pszStrmDrvr, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionsendstreamdrivermode", 0, eCmdHdlrInt, NULL, &iStrmDrvrMode, NULL));
diff --git a/tools/syslogd.c b/tools/syslogd.c
index e52a524a..3846e6a9 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -2550,8 +2550,8 @@ mainloop(void)
* powertop, for example). In that case, we primarily wait for a signal,
* but a once-a-day wakeup should be quite acceptable. -- rgerhards, 2008-06-09
*/
- //tvSelectTimeout.tv_sec = (bReduceRepeatMsgs == 1) ? TIMERINTVL : 86400 /*1 day*/;
- tvSelectTimeout.tv_sec = TIMERINTVL; /* TODO: change this back to the above code when we have a better solution for apc */
+ tvSelectTimeout.tv_sec = (bReduceRepeatMsgs == 1) ? TIMERINTVL : 86400 /*1 day*/;
+ //tvSelectTimeout.tv_sec = TIMERINTVL; /* TODO: change this back to the above code when we have a better solution for apc */
tvSelectTimeout.tv_usec = 0;
select(1, NULL, NULL, NULL, &tvSelectTimeout);
if(bFinished)
@@ -2586,7 +2586,7 @@ mainloop(void)
bHadHUP = 0;
continue;
}
- execScheduled(); /* handle Apc calls (if any) */
+ // TODO: remove execScheduled(); /* handle Apc calls (if any) */
}
ENDfunc
}