summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog5
-rw-r--r--action.c4
-rw-r--r--doc/queues.html6
-rw-r--r--runtime/queue.c5
-rw-r--r--runtime/queue.h2
-rw-r--r--tests/Makefile.am3
-rwxr-xr-xtests/diskqueue-fsync.sh15
-rw-r--r--tests/testsuites/diskqueue-fsync.conf17
-rw-r--r--tools/syslogd.c4
9 files changed, 60 insertions, 1 deletions
diff --git a/ChangeLog b/ChangeLog
index 8fc4194d..57bb6e7e 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -12,9 +12,14 @@ Version 4.3.2 [DEVEL] (rgerhards), 2009-??-??
- reduced max value for $DynaFileCacheSize to 1,000 (the former maximum
of 10,000 really made no sense, even 1,000 is very high, but we like
to keep the user in control ;)).
+- added capability to fsync() queue disk files for enhanced reliability
+ (also add's speed, because you do no longer need to run the whole file
+ system in sync mode)
- added configuration commands (see doc for explanations)
* $OMFileZipLevel
* $OMFileIOBufferSize
+ * $MainMsgQueueSyncQueueFiles
+ * $ActionQueueSyncQueueFiles
---------------------------------------------------------------------------
Version 4.3.1 [DEVEL] (rgerhards), 2009-05-25
- added capability to run multiple tcp listeners (on different ports)
diff --git a/action.c b/action.c
index 51620fce..8bdb6dec 100644
--- a/action.c
+++ b/action.c
@@ -72,6 +72,7 @@ static int iActionQueueNumWorkers = 1; /* number of worker threads for the mm
static uchar *pszActionQFName = NULL; /* prefix for the main message queue file */
static int64 iActionQueMaxFileSize = 1024*1024;
static int iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
+static int bActionQSyncQeueFiles = 0; /* sync queue files */
static int iActionQtoQShutdown = 0; /* queue shutdown */
static int iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
static int iActionQtoEnq = 2000; /* timeout for queue enque */
@@ -151,6 +152,7 @@ actionResetQueueParams(void)
iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */
iActionQueMaxFileSize = 1024*1024;
iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
+ bActionQSyncQeueFiles = 0;
iActionQtoQShutdown = 0; /* queue shutdown */
iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
iActionQtoEnq = 2000; /* timeout for queue enque */
@@ -273,6 +275,7 @@ actionConstructFinalize(action_t *pThis)
setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
+ setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", bActionQSyncQeueFiles);
setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown);
@@ -838,6 +841,7 @@ actionAddCfSysLineHdrl(void)
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL));
diff --git a/doc/queues.html b/doc/queues.html
index 4a9509a0..45ce1bd1 100644
--- a/doc/queues.html
+++ b/doc/queues.html
@@ -115,7 +115,11 @@ isolation. This is currently selected by specifying different <i>$WorkDirectory<
config directives before the queue creation statement.</p>
<p>To create a disk queue, use the "<i>$&lt;object&gt;QueueType Disk</i>" config
directive. Checkpoint intervals can be specified via "<i>$&lt;object&gt;QueueCheckpointInterval</i>",
-with 0 meaning no checkpoints. </p>
+with 0 meaning no checkpoints. Note that disk-based queues can be made very reliable
+by issuing a (f)sync after each write operation. Starting with version 4.3.2, this can
+be requested via "<i>&lt;object&gt;QueueSyncQueueFiles on/off</i> with the
+default being off. Activating this option has a performance penalty, so it should
+not be turned on without reason.</p>
<h2>In-Memory Queues</h2>
<p>In-memory queue mode is what most people have on their mind when they think
about computing queues. Here, the enqueued data elements are held in memory.
diff --git a/runtime/queue.c b/runtime/queue.c
index 3532a145..aa8e6c21 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -294,6 +294,7 @@ qqueueStartDA(qqueue_t *pThis)
CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
+ CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles));
CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq));
CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
@@ -817,6 +818,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
;
} else {
CHKiRet(strm.Construct(&pThis->tVars.disk.pWrite));
+ CHKiRet(strm.SetbSync(pThis->tVars.disk.pWrite, pThis->bSyncQueueFiles));
CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
@@ -824,6 +826,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pWrite));
CHKiRet(strm.Construct(&pThis->tVars.disk.pRead));
+ CHKiRet(strm.SetbSync(pThis->tVars.disk.pRead, pThis->bSyncQueueFiles));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
CHKiRet(strm.SetDir(pThis->tVars.disk.pRead, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pRead, 10000000));
@@ -1924,6 +1927,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
CHKiRet(strm.Construct(&psQIF));
CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE_TRUNC));
+ CHKiRet(strm.SetbSync(psQIF, pThis->bSyncQueueFiles));
CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE));
CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam));
CHKiRet(strm.ConstructFinalize(psQIF));
@@ -2279,6 +2283,7 @@ finalize_it:
/* some simple object access methods */
+DEFpropSetMeth(qqueue, bSyncQueueFiles, int)
DEFpropSetMeth(qqueue, iPersistUpdCnt, int)
DEFpropSetMeth(qqueue, iDeqtWinFromHr, int)
DEFpropSetMeth(qqueue, iDeqtWinToHr, int)
diff --git a/runtime/queue.h b/runtime/queue.h
index a267862d..07f134aa 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -73,6 +73,7 @@ typedef struct queue_s {
void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
+ int bSyncQueueFiles;/* if working with files, sync them after each write? */
int iHighWtrMrk; /* high water mark for disk-assisted memory queues */
int iLowWtrMrk; /* low water mark for disk-assisted memory queues */
int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */
@@ -186,6 +187,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*));
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
+PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int);
PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int);
PROTOTYPEpropSetMeth(qqueue, iDeqtWinToHr, int);
PROTOTYPEpropSetMeth(qqueue, toQShutdown, long);
diff --git a/tests/Makefile.am b/tests/Makefile.am
index a95139f2..3d555fc3 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -5,6 +5,7 @@ TESTS = $(TESTRUNS) cfg.sh \
validation-run.sh \
imtcp-multiport.sh \
diskqueue.sh \
+ diskqueue-fsync.sh \
manytcp.sh \
queue-persist.sh
@@ -59,6 +60,8 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
fieldtest.sh \
diskqueue.sh \
testsuites/diskqueue.conf \
+ diskqueue-fsync.sh \
+ testsuites/diskqueue-fsync.conf \
imtcp-multiport.sh \
testsuites/imtcp-multiport.conf \
manytcp.sh \
diff --git a/tests/diskqueue-fsync.sh b/tests/diskqueue-fsync.sh
new file mode 100755
index 00000000..f558b491
--- /dev/null
+++ b/tests/diskqueue-fsync.sh
@@ -0,0 +1,15 @@
+# Test for disk-only queue mode (with fsync for queue files)
+# This test checks if queue files can be correctly written
+# and read back, but it does not test the transition from
+# memory to disk mode for DA queues.
+# added 2009-06-09 by Rgerhards
+# This file is part of the rsyslog project, released under GPLv3
+# uncomment for debugging support:
+echo testing queue disk-only mode, fsync case
+source $srcdir/diag.sh init
+source $srcdir/diag.sh startup diskqueue-fsync.conf
+# 5000 messages should be enough - the disk fsync test is very slow!
+source $srcdir/diag.sh tcpflood 127.0.0.1 13514 1 5000
+source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
+source $srcdir/diag.sh seq-check 0 4999
+source $srcdir/diag.sh exit
diff --git a/tests/testsuites/diskqueue-fsync.conf b/tests/testsuites/diskqueue-fsync.conf
new file mode 100644
index 00000000..0a02c6ce
--- /dev/null
+++ b/tests/testsuites/diskqueue-fsync.conf
@@ -0,0 +1,17 @@
+# Test for queue disk mode (see .sh file for details)
+# rgerhards, 2009-04-17
+$IncludeConfig diag-common.conf
+
+$ModLoad ../plugins/imtcp/.libs/imtcp
+$InputTCPServerRun 13514
+
+# set spool locations and switch queue to disk-only mode
+$WorkDirectory test-spool
+$MainMsgQueueSyncQueueFiles on
+$MainMsgQueueTimeoutShutdown 10000
+$MainMsgQueueFilename mainq
+$MainMsgQueueType disk
+
+$template outfmt,"%msg:F,58:2%\n"
+$template dynfile,"rsyslog.out.log" # trick to use relative path names!
+:msg, contains, "msgnum:" ?dynfile;outfmt
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 206b79e8..19ece349 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -297,6 +297,7 @@ static queueType_t MainMsgQueType = QUEUETYPE_FIXED_ARRAY; /* type of the main m
static uchar *pszMainMsgQFName = NULL; /* prefix for the main message queue file */
static int64 iMainMsgQueMaxFileSize = 1024*1024;
static int iMainMsgQPersistUpdCnt = 0; /* persist queue info every n updates */
+static int bMainMsgQSyncQeueFiles = 0; /* sync queue files on every write? */
static int iMainMsgQtoQShutdown = 0; /* queue shutdown */
static int iMainMsgQtoActShutdown = 1000; /* action shutdown (in phase 2) */
static int iMainMsgQtoEnq = 2000; /* timeout for queue enque */
@@ -359,6 +360,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iMainMsgQueMaxFileSize = 1024 * 1024;
iMainMsgQueueNumWorkers = 1;
iMainMsgQPersistUpdCnt = 0;
+ bMainMsgQSyncQeueFiles = 0;
iMainMsgQtoQShutdown = 0;
iMainMsgQtoActShutdown = 1000;
iMainMsgQtoEnq = 2000;
@@ -2715,6 +2717,7 @@ init(void)
setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace);
setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName);
setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt);
+ setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", bMainMsgQSyncQeueFiles);
setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown );
setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown);
setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", iMainMsgQtoWrkShutdown);
@@ -3084,6 +3087,7 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuediscardmark", 0, eCmdHdlrInt, NULL, &iMainMsgQDiscardMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuediscardseverity", 0, eCmdHdlrSeverity, NULL, &iMainMsgQDiscardSeverity, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iMainMsgQPersistUpdCnt, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bMainMsgQSyncQeueFiles, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetype", 0, eCmdHdlrGetWord, setMainMsgQueType, NULL, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iMainMsgQueueNumWorkers, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoQShutdown, NULL));