summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-06-21 15:42:11 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2011-06-21 15:42:11 +0200
commitb71ef4abad365f95cf7adf3df14083940d531f1f (patch)
treec5744b7c3adb4e7ede7a282949cbfe41e414c813
parentb1a905c5eb7833d661be4a910697f49deb34c640 (diff)
parent86225089f2d0e82deb368e1688464e8ba84d24cf (diff)
downloadrsyslog-b71ef4abad365f95cf7adf3df14083940d531f1f.tar.gz
rsyslog-b71ef4abad365f95cf7adf3df14083940d531f1f.tar.xz
rsyslog-b71ef4abad365f95cf7adf3df14083940d531f1f.zip
Merge branch 'v5-stable' into beta
Conflicts: ChangeLog configure.ac doc/manual.html tests/diag.sh tests/sndrcv_drvr.sh
-rw-r--r--ChangeLog32
-rw-r--r--action.c176
-rw-r--r--action.h2
-rw-r--r--doc/rsyslog_conf_global.html16
-rw-r--r--runtime/datetime.c2
-rw-r--r--runtime/glbl.c23
-rw-r--r--runtime/queue.c12
-rw-r--r--runtime/rsyslog.h1
-rw-r--r--runtime/rule.c11
-rw-r--r--tests/Makefile.am14
-rwxr-xr-xtests/diag.sh6
-rwxr-xr-xtests/failover-async.sh2
-rwxr-xr-xtests/failover-double.sh12
-rwxr-xr-xtests/manytcp-too-few-tls.sh6
-rwxr-xr-xtests/sndrcv_drvr.sh42
-rwxr-xr-xtests/sndrcv_drvr_noexit.sh49
-rwxr-xr-xtests/sndrcv_failover.sh21
-rw-r--r--tests/testsuites/failover-double.conf9
-rw-r--r--tests/testsuites/sndrcv_failover_rcvr.conf11
-rw-r--r--tests/testsuites/sndrcv_failover_sender.conf13
-rw-r--r--tools/syslogd.c2
21 files changed, 355 insertions, 107 deletions
diff --git a/ChangeLog b/ChangeLog
index cd7890d8..452ecba7 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,18 @@
---------------------------------------------------------------------------
Version 6.1.9 [BETA] (rgerhards), 2011-06-14
+- bugfix: problems in failover action handling
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=270
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=254
+- bugfix: mutex was invalidly left unlocked during action processing
+ At least one case where this can occur is during thread shutdown, which
+ may be initiated by lower activity. In most cases, this is quite
+ unlikely to happen. However, if it does, data structures may be
+ corrupted which could lead to fatal failure and segfault. I detected
+ this via a testbench test, not a user report. But I assume that some
+ users may have had unreproducable aborts that were cause by this bug.
+- bugfix/improvement:$WorkDirectory now gracefully handles trailing slashes
+---------------------------------------------------------------------------
+Version 6.1.9 [BETA] (rgerhards), 2011-06-14
- bugfix: memory leak in imtcp & subsystems under some circumstances
This leak is tied to error conditions which lead to incorrect cleanup
of some data structures. [backport from v6.3]
@@ -181,10 +194,21 @@ Version 5.9.0 [V5-DEVEL] (rgerhards), 2011-03-??
affected directive was: $ActionExecOnlyWhenPreviousIsSuspended on
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=236
---------------------------------------------------------------------------
-Version 5.8.2 [V5-stable] (rgerhards), 2011-06-??
+Version 5.8.2 [V5-stable] (rgerhards), 2011-06-21
+- bugfix: problems in failover action handling
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=270
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=254
+- bugfix: mutex was invalidly left unlocked during action processing
+ At least one case where this can occur is during thread shutdown, which
+ may be initiated by lower activity. In most cases, this is quite
+ unlikely to happen. However, if it does, data structures may be
+ corrupted which could lead to fatal failure and segfault. I detected
+ this via a testbench test, not a user report. But I assume that some
+ users may have had unreproducable aborts that were cause by this bug.
- bugfix: memory leak in imtcp & subsystems under some circumstances
This leak is tied to error conditions which lead to incorrect cleanup
of some data structures. [backport from v6]
+- bugfix/improvement:$WorkDirectory now gracefully handles trailing slashes
---------------------------------------------------------------------------
Version 5.8.1 [V5-stable] (rgerhards), 2011-05-19
- bugfix: invalid processing in QUEUE_FULL condition
@@ -1072,6 +1096,9 @@ Version 4.6.6 [v4-stable] (rgerhards), 2010-11-??
discarded (due to QUEUE_FULL or similar problem)
- bugfix: a slightly more informative error message when a TCP
connections is aborted
+- bugfix: timestamp was incorrectly calculated for timezones with minute
+ offset
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=271
- some improvements thanks to clang's static code analyzer
o overall cleanup (mostly unnecessary writes and otherwise unused stuff)
o bugfix: fixed a very remote problem in msg.c which could occur when
@@ -1710,6 +1737,9 @@ version before switching to this one.
Thanks to Ken for providing the patch
---------------------------------------------------------------------------
Version 3.22.4 [v3-stable] (rgerhards), 2010-??-??
+- bugfix: timestamp was incorrectly calculated for timezones with minute
+ offset
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=271
- improved some code based on clang static analyzer results
---------------------------------------------------------------------------
Version 3.22.3 [v3-stable] (rgerhards), 2010-11-24
diff --git a/action.c b/action.c
index 4e60ba58..ae833ab0 100644
--- a/action.c
+++ b/action.c
@@ -39,7 +39,35 @@
* - processAction
* - submitBatch
* - tryDoAction
- * -
+ * - ...
+ *
+ * MORE ON PROCESSING, QUEUES and FILTERING
+ * All filtering needs to be done BEFORE messages are enqueued to an
+ * action. In previous code, part of the filtering was done at the
+ * "remote end" of the action queue, which lead to problems in
+ * non-direct mode (because then things run asynchronously). In order
+ * to solve this problem once and for all, I have changed the code so
+ * that all filtering is done before enq, and processing on the
+ * dequeue side of action processing now always executes whatever is
+ * enqueued. This is the only way to handle things consistently and
+ * (as much as possible) in a queue-type agnostic way. However, it is
+ * a rather radical change, which I unfortunately needed to make from
+ * stable version 5.8.1 to 5.8.2. If new problems pop up, you now know
+ * what may be their cause. In any case, the way it is done now is the
+ * only correct one.
+ * A problem is that, under fortunate conditions, we use the current
+ * batch for the output system as well. This is very good from a performance
+ * point of view, but makes the distinction between enq and deq side of
+ * the queue a bit hard. The current idea is that the filter condition
+ * alone is checked at the deq side of the queue (seems to be unavoidable
+ * to do it that way), but all other complex conditons (like failover
+ * handling) go into the computation of the filter condition. For
+ * non-direct queues, we still enqueue only what is acutally necessary.
+ * Note that in this case the rest of the code must ensure that the filter
+ * is set to "true". While this is not perfect and not as simple as
+ * we would like to see it, it looks like the best way to tackle that
+ * beast.
+ * rgerhards, 2011-06-15
*
* Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
@@ -617,8 +645,8 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate)
}
if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) {
- DBGPRINTF("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n",
- getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+ DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n",
+ pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
}
finalize_it:
@@ -938,27 +966,32 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
i = pBatch->iDoneUpTo; /* all messages below that index are processed */
iElemProcessed = 0;
iCommittedUpTo = i;
+dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem);
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ /* NOTE: do NOT extend the filter below! Anything else must be done on the
+ * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15
+ */
if( pBatch->pElem[i].bFilterOK
- && pBatch->pElem[i].state != BATCH_STATE_DISC//) {
- && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
+ && pBatch->pElem[i].state != BATCH_STATE_DISC) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
- pBatch->pbShutdownImmediate);
- DBGPRINTF("action call returned %d\n", localRet);
+ pBatch->pbShutdownImmediate);
+ DBGPRINTF("action %p call returned %d\n", pAction, localRet);
/* Note: we directly modify the batch object state, because we know that
* wo do not overwrite BATCH_STATE_DISC indicators!
*/
if(localRet == RS_RET_OK) {
/* mark messages as committed */
while(iCommittedUpTo <= i) {
+ pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
}
} else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
/* mark messages as committed */
while(iCommittedUpTo < i) {
+ pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
}
pBatch->pElem[i].state = BATCH_STATE_SUB;
@@ -1041,6 +1074,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
bDone = 1;
} else {
/* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
+ DBGPRINTF("submitBatch recursing trying to find and exclude the culprit "
+ "for iRet %d\n", localRet);
submitBatch(pAction, pBatch, nElem / 2);
submitBatch(pAction, pBatch, nElem - (nElem / 2));
bDone = 1;
@@ -1230,11 +1265,13 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
/* This function builds up a batch of messages to be (later)
* submitted to the action queue.
- * Note: this function is also called from syslogd itself as part of its
- * flush processing. If so, pBatch will be NULL and idxBtch undefined.
+ * Important: this function MUST not be called with messages that are to
+ * be discarded due to their "prevWasSuspended" state. It will not check for
+ * this and submit all messages to the queue for execution. So these must
+ * be filtered out before calling us (what is done currently!).
*/
rsRetVal
-actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch)
+actionWriteToAction(action_t *pAction)
{
msg_t *pMsgSave; /* to save current message pointer, necessary to restore
it in case it needs to be updated (e.g. repeated msgs) */
@@ -1331,35 +1368,7 @@ actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch)
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- if( pBatch != NULL
- && (pAction->bExecWhenPrevSusp == 1 && pBatch->pElem[idxBtch].bPrevWasSuspended)) {
- /* in that case, we need to create a special batch which reflects the
- * suspended state. Otherwise, that information would be dropped inside
- * the queue engine. TODO: in later releases (v6?) create a better
- * solution than what we do here. However, for v5 this sounds much too
- * intrusive. -- rgerhardsm, 2011-03-16
- * (Code is copied over from queue.c and slightly modified)
- */
- batch_t singleBatch;
- batch_obj_t batchObj;
- int i;
- memset(&batchObj, 0, sizeof(batch_obj_t));
- memset(&singleBatch, 0, sizeof(batch_t));
- batchObj.state = BATCH_STATE_RDY;
- batchObj.pUsrp = (obj_t*) pAction->f_pMsg;
- batchObj.bPrevWasSuspended = 1;
- batchObj.bFilterOK = 1;
- singleBatch.nElem = 1; /* there always is only one in direct mode */
- singleBatch.pElem = &batchObj;
-
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, &singleBatch);
-
- for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
- free(batchObj.staticActStrings[i]);
- }
- } else { /* standard case, just submit */
- iRet = doSubmitToActionQ(pAction, pAction->f_pMsg);
- }
+ iRet = doSubmitToActionQ(pAction, pAction->f_pMsg);
if(iRet == RS_RET_OK)
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
@@ -1419,7 +1428,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
* isolated messages), but back off so we'll flush less often in the future.
*/
if(getActNow(pAction) > REPEATTIME(pAction)) {
- iRet = actionWriteToAction(pAction, pBatch, idxBtch);
+ iRet = actionWriteToAction(pAction);
BACKOFF(pAction);
}
} else {/* new message, save it */
@@ -1428,7 +1437,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
*/
if(pAction->f_pMsg != NULL) {
if(pAction->f_prevcount > 0)
- actionWriteToAction(pAction, pBatch, idxBtch);
+ actionWriteToAction(pAction);
/* we do not care about iRet above - I think it's right but if we have
* some troubles, you know where to look at ;) -- rgerhards, 2007-08-01
*/
@@ -1436,7 +1445,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
}
pAction->f_pMsg = MsgAddRef(pMsg);
/* call the output driver */
- iRet = actionWriteToAction(pAction, pBatch, idxBtch);
+ iRet = actionWriteToAction(pAction);
}
finalize_it:
@@ -1525,6 +1534,69 @@ finalize_it:
}
+/* enqueue a batch in direct mode. We have put this into its own function just to avoid
+ * cluttering the actual submit function.
+ * rgerhards, 2011-06-16
+ */
+static inline rsRetVal
+doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
+{
+ sbool FilterSave[1024];
+ sbool *pFilterSave;
+ sbool bNeedSubmit;
+ sbool bModifiedFilter;
+ int i;
+ DEFiRet;
+
+ if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) {
+ pFilterSave = FilterSave;
+ } else {
+ CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool)));
+ }
+
+ /* note: for direct mode, we need to adjust the filter property. For non-direct
+ * this is not necessary, because in that case we enqueue only what actually needs
+ * to be processed.
+ */
+ if(pAction->bExecWhenPrevSusp) {
+ bNeedSubmit = 0;
+ bModifiedFilter = 0;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ pFilterSave[i] = pBatch->pElem[i].bFilterOK;
+ if(!pBatch->pElem[i].bPrevWasSuspended) {
+ DBGPRINTF("action enq stage: change bFilterOK to 0 due to "
+ "failover case in elem %d\n", i);
+ pBatch->pElem[i].bFilterOK = 0;
+ bModifiedFilter = 1;
+ }
+ if(pBatch->pElem[i].bFilterOK)
+ bNeedSubmit = 1;
+ DBGPRINTF("action %p[%d]: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
+ pAction, i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
+ pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
+ }
+ if(bNeedSubmit) {
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
+ } else {
+ DBGPRINTF("no need to submit batch, all bFilterOK==0\n");
+ }
+ if(bModifiedFilter) {
+ for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
+ DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
+ pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
+ pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
+ /* note: clang static code analyzer reports a false positive below */
+ pBatch->pElem[i].bFilterOK = pFilterSave[i];
+ }
+ }
+ } else {
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
+ }
+
+finalize_it:
+ RETiRet;
+}
+
/* This submits the message to the action queue in case we do NOT need to handle repeat
* message processing. That case permits us to gain lots of freedom during processing
* and thus speed.
@@ -1537,13 +1609,19 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
DEFiRet;
DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
- if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
- else { /* in this case, we do single submits to the queue.
+
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
+ iRet = doQueueEnqObjDirectBatch(pAction, pBatch);
+ } else {/* in this case, we do single submits to the queue.
* TODO: optimize this, we may do at least a multi-submit!
*/
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if(pBatch->pElem[i].bFilterOK) {
+ DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
+ pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
+ pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
+ if( pBatch->pElem[i].bFilterOK
+ && pBatch->pElem[i].state != BATCH_STATE_DISC
+ && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) {
doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
}
}
@@ -1564,8 +1642,12 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
int i;
DEFiRet;
- DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));
+ DBGPRINTF("Called action %p (complex case), logging to %s\n",
+ pAction, module.GetStateName(pAction->pMod));
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
+ pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
+ pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
if( pBatch->pElem[i].bFilterOK
&& pBatch->pElem[i].state != BATCH_STATE_DISC
&& ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
diff --git a/action.h b/action.h
index 749e573e..3bda991d 100644
--- a/action.h
+++ b/action.h
@@ -100,7 +100,7 @@ rsRetVal actionDestruct(action_t *pThis);
rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
rsRetVal actionDoAction(action_t *pAction);
-rsRetVal actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch);
+rsRetVal actionWriteToAction(action_t *pAction);
rsRetVal actionCallHUPHdlr(action_t *pAction);
rsRetVal actionClassInit(void);
rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, int bSuspended);
diff --git a/doc/rsyslog_conf_global.html b/doc/rsyslog_conf_global.html
index a5d69f1d..21786a7f 100644
--- a/doc/rsyslog_conf_global.html
+++ b/doc/rsyslog_conf_global.html
@@ -279,7 +279,21 @@ default may change as uniprocessor systems become less common. [available since
<li>$PreserveFQDN [on/<b>off</b>) - if set to off (legacy default to remain compatible
to sysklogd), the domain part from a name that is within the same domain as the receiving
system is stripped. If set to on, full names are always used.</li>
-<li>$WorkDirectory &lt;name&gt; (directory for spool and other work files)</li>
+<li>$WorkDirectory &lt;name&gt; (directory for spool and other work files.
+Do <b>not</b> use trailing slashes)</li>
+<li>$UDPServerAddress &lt;IP&gt; (imudp) -- local IP
+address (or name) the UDP listens should bind to</li>
+<li>$UDPServerRun &lt;port&gt; (imudp) -- former
+-r&lt;port&gt; option, default 514, start UDP server on this
+port, "*" means all addresses</li>
+<li>$UDPServerTimeRequery &lt;nbr-of-times&gt; (imudp) -- this is a performance
+optimization. Getting the system time is very costly. With this setting, imudp can
+be instructed to obtain the precise time only once every n-times. This logic is
+only activated if messages come in at a very fast rate, so doing less frequent
+time calls should usually be acceptable. The default value is two, because we have
+seen that even without optimization the kernel often returns twice the identical time.
+You can set this value as high as you like, but do so at your own risk. The higher
+the value, the less precise the timestamp.
<li><a href="droppriv.html">$PrivDropToGroup</a></li>
<li><a href="droppriv.html">$PrivDropToGroupID</a></li>
<li><a href="droppriv.html">$PrivDropToUser</a></li>
diff --git a/runtime/datetime.c b/runtime/datetime.c
index de26762d..f81180ea 100644
--- a/runtime/datetime.c
+++ b/runtime/datetime.c
@@ -122,7 +122,7 @@ static void getCurrTime(struct syslogTime *t, time_t *ttSeconds)
else
t->OffsetMode = '+';
t->OffsetHour = lBias / 3600;
- t->OffsetMinute = lBias % 3600;
+ t->OffsetMinute = (lBias % 3600) / 60;
t->timeType = TIME_TYPE_RFC5424; /* we have a high precision timestamp */
}
diff --git a/runtime/glbl.c b/runtime/glbl.c
index 71901dee..2fd52a5a 100644
--- a/runtime/glbl.c
+++ b/runtime/glbl.c
@@ -159,8 +159,29 @@ static void SetGlobalInputTermination(void)
*/
static rsRetVal setWorkDir(void __attribute__((unused)) *pVal, uchar *pNewVal)
{
- DEFiRet;
+ size_t lenDir;
+ int i;
struct stat sb;
+ DEFiRet;
+
+ /* remove trailing slashes */
+ lenDir = ustrlen(pNewVal);
+ i = lenDir - 1;
+ while(i > 0 && pNewVal[i] == '/') {
+ --i;
+ }
+
+ if(i < 0) {
+ errmsg.LogError(0, RS_RET_ERR_WRKDIR, "$WorkDirectory: empty value "
+ "- directive ignored");
+ ABORT_FINALIZE(RS_RET_ERR_WRKDIR);
+ }
+
+ if(i != (int) lenDir - 1) {
+ pNewVal[i+1] = '\0';
+ errmsg.LogError(0, RS_RET_WRN_WRKDIR, "$WorkDirectory: trailing slashes "
+ "removed, new value is '%s'", pNewVal);
+ }
if(stat((char*) pNewVal, &sb) != 0) {
errmsg.LogError(0, RS_RET_ERR_WRKDIR, "$WorkDirectory: %s can not be "
diff --git a/runtime/queue.c b/runtime/queue.c
index 88e01a7a..00eb76c7 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1678,6 +1678,7 @@ static rsRetVal
ConsumerReg(qqueue_t *pThis, wti_t *pWti)
{
int iCancelStateSave;
+ int bNeedReLock = 0; /**< do we need to lock the mutex again? */
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
@@ -1687,6 +1688,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
d_pthread_mutex_unlock(pThis->mut);
+ bNeedReLock = 1;
/* at this spot, we may be cancelled */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
@@ -1706,12 +1708,14 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
/* but now cancellation is no longer permitted */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- /* now we are done, but need to re-aquire the mutex */
- d_pthread_mutex_lock(pThis->mut);
-
finalize_it:
- dbgprintf("regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet,
+ DBGPRINTF("regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet,
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+
+ /* now we are done, but potentially need to re-aquire the mutex */
+ if(bNeedReLock)
+ d_pthread_mutex_lock(pThis->mut);
+
RETiRet;
}
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 23547535..9b14cc92 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -349,6 +349,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_ERR_HDFS_OPEN = -2179, /**< error during hdfsOpen (e.g. file does not exist) */
RS_RET_FILE_NOT_SPECIFIED = -2180, /**< file name not configured where this was required */
RS_RET_ERR_WRKDIR = -2181, /**< problems with the rsyslog working directory */
+ RS_RET_WRN_WRKDIR = -2182, /**< correctable problems with the rsyslog working directory */
RS_RET_INVLD_CONF_OBJ= -2200, /**< invalid config object (e.g. $Begin conf statement) */
RS_RET_ERR_LIBEE_INIT = -2201, /**< cannot obtain libee ctx */
diff --git a/runtime/rule.c b/runtime/rule.c
index 7c3e5131..16d6fff4 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -291,6 +291,7 @@ static rsRetVal
processBatch(rule_t *pThis, batch_t *pBatch)
{
int i;
+ rsRetVal localRet;
DEFiRet;
ISOBJ_TYPE_assert(pThis, rule);
@@ -298,9 +299,13 @@ processBatch(rule_t *pThis, batch_t *pBatch)
/* first check the filters and reset status variables */
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- CHKiRet(shouldProcessThisMessage(pThis, (msg_t*)(pBatch->pElem[i].pUsrp),
- &(pBatch->pElem[i].bFilterOK)));
- // TODO: really abort on error? 2010-06-10
+ localRet = shouldProcessThisMessage(pThis, (msg_t*)(pBatch->pElem[i].pUsrp),
+ &(pBatch->pElem[i].bFilterOK));
+ if(localRet != RS_RET_OK) {
+ DBGPRINTF("processBatch: iRet %d returned from shouldProcessThisMessage, "
+ "ignoring message\n", localRet);
+ pBatch->pElem[i].bFilterOK = 0;
+ }
if(pBatch->pElem[i].bFilterOK) {
/* re-init only when actually needed (cache write cost!) */
pBatch->pElem[i].bPrevWasSuspended = 0;
diff --git a/tests/Makefile.am b/tests/Makefile.am
index b3422297..3f6110db 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -18,6 +18,7 @@ TESTS += \
imtcp_conndrop.sh \
imtcp_addtlframedelim.sh \
sndrcv.sh \
+ sndrcv_failover.sh \
sndrcv_gzip.sh \
sndrcv_udp.sh \
sndrcv_udp_nonstdpt.sh \
@@ -55,6 +56,8 @@ TESTS += \
discard-rptdmsg.sh \
discard-allmark.sh \
discard.sh \
+ failover-async.sh \
+ failover-double.sh \
failover-basic.sh \
failover-rptd.sh \
failover-no-rptd.sh \
@@ -105,7 +108,8 @@ TESTS += \
sndrcv_tls_anon_rebind.sh \
imtcp-tls-basic.sh
if HAVE_VALGRIND
-TESTS += manytcp-too-few-tls.sh
+# This test does not work on v5 as we keep DH params
+#TESTS += manytcp-too-few-tls.sh
endif
endif
@@ -308,6 +312,10 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
failover-basic.sh \
failover-basic-vg.sh \
testsuites/failover-basic.conf \
+ failover-async.sh \
+ testsuites/failover-async.conf \
+ failover-double.sh \
+ testsuites/failover-double.conf \
discard-rptdmsg.sh \
discard-rptdmsg-vg.sh \
testsuites/discard-rptdmsg.conf \
@@ -330,6 +338,10 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
threadingmqaq.sh \
testsuites/threadingmqaq.conf \
sndrcv_drvr.sh \
+ sndrcv_drvr_noexit.sh \
+ sndrcv_failover.sh \
+ testsuites/sndrcv_failover_sender.conf \
+ testsuites/sndrcv_failover_rcvr.conf \
sndrcv.sh \
testsuites/sndrcv_sender.conf \
testsuites/sndrcv_rcvr.conf \
diff --git a/tests/diag.sh b/tests/diag.sh
index 8b0ad573..1f7de2cf 100755
--- a/tests/diag.sh
+++ b/tests/diag.sh
@@ -10,7 +10,7 @@
#valgrind="valgrind --tool=helgrind --log-fd=1"
#valgrind="valgrind --tool=exp-ptrcheck --log-fd=1"
#set -o xtrace
-#export RSYSLOG_DEBUG="debug nologfuncflow nostdout"
+#export RSYSLOG_DEBUG="debug nologfuncflow noprintmutexaction stdout"
#export RSYSLOG_DEBUGLOG="log"
case $1 in
'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
@@ -22,7 +22,7 @@ case $1 in
rm -f work rsyslog.out.log rsyslog2.out.log rsyslog.out.log.save # common work files
rm -rf test-spool test-logdir
rm -f rsyslog.out.*.log work-presort rsyslog.pipe
- rm -f rsyslog.input
+ rm -f rsyslog.input rsyslog.empty
rm -f core.* vgcore.*
mkdir test-spool
;;
@@ -31,7 +31,7 @@ case $1 in
rm -f work rsyslog.out.log rsyslog2.out.log rsyslog.out.log.save # common work files
rm -rf test-spool test-logdir
rm -f rsyslog.out.*.log rsyslog.random.data work-presort rsyslog.pipe
- rm -f rsyslog.input rsyslog.conf.tlscert stat-file1
+ rm -f rsyslog.input rsyslog.conf.tlscert stat-file1 rsyslog.empty
echo -------------------------------------------------------------------------------
;;
'startup') # start rsyslogd with default params. $2 is the config file name to use
diff --git a/tests/failover-async.sh b/tests/failover-async.sh
index f17bc0ff..5fc393de 100755
--- a/tests/failover-async.sh
+++ b/tests/failover-async.sh
@@ -3,7 +3,7 @@ echo ===========================================================================
echo \[failover-async.sh\]: async test for failover functionality
source $srcdir/diag.sh init
source $srcdir/diag.sh startup failover-async.conf
-source $srcdir/diag.sh injectmsg 0 5
+source $srcdir/diag.sh injectmsg 0 5000
echo doing shutdown
source $srcdir/diag.sh shutdown-when-empty
echo wait on shutdown
diff --git a/tests/failover-double.sh b/tests/failover-double.sh
new file mode 100755
index 00000000..172b91de
--- /dev/null
+++ b/tests/failover-double.sh
@@ -0,0 +1,12 @@
+# This file is part of the rsyslog project, released under GPLv3
+echo ===============================================================================
+echo \[failover-double.sh\]: test for double failover functionality
+source $srcdir/diag.sh init
+source $srcdir/diag.sh startup failover-double.conf
+source $srcdir/diag.sh injectmsg 0 5000
+echo doing shutdown
+source $srcdir/diag.sh shutdown-when-empty
+echo wait on shutdown
+source $srcdir/diag.sh wait-shutdown
+source $srcdir/diag.sh seq-check 0 4999
+source $srcdir/diag.sh exit
diff --git a/tests/manytcp-too-few-tls.sh b/tests/manytcp-too-few-tls.sh
index 899a87dc..970a572d 100755
--- a/tests/manytcp-too-few-tls.sh
+++ b/tests/manytcp-too-few-tls.sh
@@ -11,5 +11,9 @@ sleep 1
source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
source $srcdir/diag.sh wait-shutdown-vg # we need to wait until rsyslogd is finished!
source $srcdir/diag.sh check-exit-vg
-source $srcdir/diag.sh seq-check 0 39999
+# we do not do a seq check, as of the design of this test some messages
+# will be lost. So there is no point in checking if all were received. The
+# point is that we look at the valgrind result, to make sure we do not
+# have a mem leak in those error cases (we had in the past, thus the test
+# to prevent that in the future).
source $srcdir/diag.sh exit
diff --git a/tests/sndrcv_drvr.sh b/tests/sndrcv_drvr.sh
index 9f036a31..1f3b9113 100755
--- a/tests/sndrcv_drvr.sh
+++ b/tests/sndrcv_drvr.sh
@@ -1,41 +1 @@
-# This is test driver for testing two rsyslog instances. It can be
-# utilized by any test that just needs two instances with different
-# config files, where messages are injected in instance TWO and
-# (with whatever rsyslog mechanism) being relayed over to instance ONE,
-# where they are written to the log file. After the run, the completeness
-# of that log file is checked.
-# The code is almost the same, but the config files differ (probably greatly)
-# for different test cases. As such, this driver needs to be called with the
-# config file name ($2). From that name, the sender and receiver config file
-# names are automatically generated.
-# So: $1 config file name, $2 number of messages
-#
-# added 2009-11-11 by Rgerhards
-# This file is part of the rsyslog project, released under GPLv3
-# uncomment for debugging support:
-source $srcdir/diag.sh init
-# start up the instances
-#export RSYSLOG_DEBUG="debug nostdout"
-#export RSYSLOG_DEBUGLOG="log"
-source $srcdir/diag.sh startup $1_rcvr.conf
-source $srcdir/diag.sh wait-startup
-#export RSYSLOG_DEBUGLOG="log2"
-#valgrind="valgrind"
-source $srcdir/diag.sh startup $1_sender.conf 2
-source $srcdir/diag.sh wait-startup 2
-
-# now inject the messages into instance 2. It will connect to instance 1,
-# and that instance will record the data.
-source $srcdir/diag.sh tcpflood -m$2 -i1
-sleep 2 # make sure all data is received in input buffers
-# shut down sender when everything is sent, receiver continues to run concurrently
-source $srcdir/diag.sh shutdown-when-empty 2
-source $srcdir/diag.sh wait-shutdown 2
-# now it is time to stop the receiver as well
-echo "Shutting down instance 1 (receiver)"
-source $srcdir/diag.sh shutdown-when-empty
-source $srcdir/diag.sh wait-shutdown
-
-# do the final check
-source $srcdir/diag.sh seq-check 1 $2
-source $srcdir/diag.sh exit
+source $srcdir/sndrcv_drvr_noexit.sh $1 $2
diff --git a/tests/sndrcv_drvr_noexit.sh b/tests/sndrcv_drvr_noexit.sh
new file mode 100755
index 00000000..899eace3
--- /dev/null
+++ b/tests/sndrcv_drvr_noexit.sh
@@ -0,0 +1,49 @@
+# This is test driver for testing two rsyslog instances. It can be
+# utilized by any test that just needs two instances with different
+# config files, where messages are injected in instance TWO and
+# (with whatever rsyslog mechanism) being relayed over to instance ONE,
+# where they are written to the log file. After the run, the completeness
+# of that log file is checked.
+# The code is almost the same, but the config files differ (probably greatly)
+# for different test cases. As such, this driver needs to be called with the
+# config file name ($2). From that name, the sender and receiver config file
+# names are automatically generated.
+# So: $1 config file name, $2 number of messages
+#
+# A note on TLS testing: the current testsuite (in git!) already contains
+# TLS test cases. However, getting these test cases correct is not simple.
+# That's not a problem with the code itself, but rater a problem with
+# synchronization in the test environment. So I have deciced to keep the
+# TLS tests in, but not yet actually utilize them. This is most probably
+# left as an excercise for future (devel) releases. -- rgerhards, 2009-11-11
+#
+# added 2009-11-11 by Rgerhards
+# This file is part of the rsyslog project, released under GPLv3
+# uncomment for debugging support:
+source $srcdir/diag.sh init
+# start up the instances
+#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction"
+#export RSYSLOG_DEBUGLOG="log"
+source $srcdir/diag.sh startup $1_rcvr.conf
+source $srcdir/diag.sh wait-startup
+#export RSYSLOG_DEBUGLOG="log2"
+#valgrind="valgrind"
+source $srcdir/diag.sh startup $1_sender.conf 2
+source $srcdir/diag.sh wait-startup 2
+# may be needed by TLS (once we do it): sleep 30
+
+# now inject the messages into instance 2. It will connect to instance 1,
+# and that instance will record the data.
+source $srcdir/diag.sh tcpflood -m$2 -i1
+sleep 2 # make sure all data is received in input buffers
+# shut down sender when everything is sent, receiver continues to run concurrently
+# may be needed by TLS (once we do it): sleep 60
+source $srcdir/diag.sh shutdown-when-empty 2
+source $srcdir/diag.sh wait-shutdown 2
+# now it is time to stop the receiver as well
+source $srcdir/diag.sh shutdown-when-empty
+source $srcdir/diag.sh wait-shutdown
+
+# may be needed by TLS (once we do it): sleep 60
+# do the final check
+source $srcdir/diag.sh seq-check 1 $2
diff --git a/tests/sndrcv_failover.sh b/tests/sndrcv_failover.sh
new file mode 100755
index 00000000..4c5e1831
--- /dev/null
+++ b/tests/sndrcv_failover.sh
@@ -0,0 +1,21 @@
+# This tests failover capabilities. Data is sent to local port 13516, where
+# no process shall listen. Then it fails over to a second instance, then to
+# a file. The second instance is started. So all data should be received
+# there and none be logged to the file.
+# This builds on the basic sndrcv.sh test, but adds a first, failing,
+# location to the conf file.
+# added 2011-06-20 by Rgerhards
+# This file is part of the rsyslog project, released under GPLv3
+echo ===============================================================================
+echo \[sndrcv_failover.sh\]: testing failover capabilities for tcp sending
+source $srcdir/sndrcv_drvr_noexit.sh sndrcv_failover 50000
+ls -l rsyslog.empty
+if [[ -s rsyslog.empty ]] ; then
+ echo "FAIL: rsyslog.empty has data. Failover handling failed. Data is written"
+ echo " even though the previous action (in a failover chain!) properly"
+ echo " worked."
+ exit 1
+else
+ echo "rsyslog.empty is empty - OK"
+fi ;
+source $srcdir/diag.sh exit
diff --git a/tests/testsuites/failover-double.conf b/tests/testsuites/failover-double.conf
new file mode 100644
index 00000000..a9991321
--- /dev/null
+++ b/tests/testsuites/failover-double.conf
@@ -0,0 +1,9 @@
+$IncludeConfig diag-common.conf
+
+$template outfmt,"%msg:F,58:2%\n"
+
+:msg, contains, "msgnum:" @@127.0.0.1:13516
+$ActionExecOnlyWhenPreviousIsSuspended on
+& @@127.0.0.1:1234
+& ./rsyslog.out.log;outfmt
+$ActionExecOnlyWhenPreviousIsSuspended off
diff --git a/tests/testsuites/sndrcv_failover_rcvr.conf b/tests/testsuites/sndrcv_failover_rcvr.conf
new file mode 100644
index 00000000..6f7ce34b
--- /dev/null
+++ b/tests/testsuites/sndrcv_failover_rcvr.conf
@@ -0,0 +1,11 @@
+# see equally-named shell file for details
+# rgerhards, 2009-11-11
+$IncludeConfig diag-common.conf
+
+$ModLoad ../plugins/imtcp/.libs/imtcp
+# then SENDER sends to this port (not tcpflood!)
+$InputTCPServerRun 13515
+
+$template outfmt,"%msg:F,58:2%\n"
+$template dynfile,"rsyslog.out.log" # trick to use relative path names!
+:msg, contains, "msgnum:" ?dynfile;outfmt
diff --git a/tests/testsuites/sndrcv_failover_sender.conf b/tests/testsuites/sndrcv_failover_sender.conf
new file mode 100644
index 00000000..b8e7c186
--- /dev/null
+++ b/tests/testsuites/sndrcv_failover_sender.conf
@@ -0,0 +1,13 @@
+# see tcpsndrcv.sh for details
+# rgerhards, 2009-11-11
+$IncludeConfig diag-common2.conf
+
+$ModLoad ../plugins/imtcp/.libs/imtcp
+# this listener is for message generation by the test framework!
+$InputTCPServerRun 13514
+
+*.* @@127.0.0.1:13516 # this must be DEAD
+$ActionExecOnlyWhenPreviousIsSuspended on
+& @@127.0.0.1:13515
+& ./rsyslog.empty
+$ActionExecOnlyWhenPreviousIsSuspended off
diff --git a/tools/syslogd.c b/tools/syslogd.c
index bab37966..25ab05cd 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -796,7 +796,7 @@ DEFFUNC_llExecFunc(flushRptdMsgsActions)
DBGPRINTF("flush %s: repeated %d times, %d sec.\n",
module.GetStateName(pAction->pMod), pAction->f_prevcount,
repeatinterval[pAction->f_repeatcount]);
- actionWriteToAction(pAction, NULL, 0);
+ actionWriteToAction(pAction);
BACKOFF(pAction);
}
UnlockObj(pAction);