summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-06-09 19:00:18 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-06-09 19:00:18 +0200
commit9704f129f72ec9ece11aeccea4bbf0cbccb116cb (patch)
tree96f7b742a3040fd73569bf561e466f12b9086449
parent8d1e2e496c6a4a4d40d1e8604c746e0d32013536 (diff)
downloadrsyslog-9704f129f72ec9ece11aeccea4bbf0cbccb116cb.tar.gz
rsyslog-9704f129f72ec9ece11aeccea4bbf0cbccb116cb.tar.xz
rsyslog-9704f129f72ec9ece11aeccea4bbf0cbccb116cb.zip
added capability to fsync() queue disk files for enhanced reliability
also adds speed, because you do no longer need to run the whole file system in sync mode. New testbench and new config directives: - $MainMsgQueueSyncQueueFiles - $ActionQueueSyncQueueFiles
-rw-r--r--ChangeLog5
-rw-r--r--action.c4
-rw-r--r--doc/queues.html6
-rw-r--r--runtime/queue.c5
-rw-r--r--runtime/queue.h2
-rw-r--r--tests/Makefile.am3
-rwxr-xr-xtests/diskqueue-fsync.sh15
-rw-r--r--tests/testsuites/diskqueue-fsync.conf17
-rw-r--r--tools/syslogd.c4
9 files changed, 60 insertions, 1 deletions
diff --git a/ChangeLog b/ChangeLog
index 8fc4194d..57bb6e7e 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -12,9 +12,14 @@ Version 4.3.2 [DEVEL] (rgerhards), 2009-??-??
- reduced max value for $DynaFileCacheSize to 1,000 (the former maximum
of 10,000 really made no sense, even 1,000 is very high, but we like
to keep the user in control ;)).
+- added capability to fsync() queue disk files for enhanced reliability
+ (also add's speed, because you do no longer need to run the whole file
+ system in sync mode)
- added configuration commands (see doc for explanations)
* $OMFileZipLevel
* $OMFileIOBufferSize
+ * $MainMsgQueueSyncQueueFiles
+ * $ActionQueueSyncQueueFiles
---------------------------------------------------------------------------
Version 4.3.1 [DEVEL] (rgerhards), 2009-05-25
- added capability to run multiple tcp listeners (on different ports)
diff --git a/action.c b/action.c
index 51620fce..8bdb6dec 100644
--- a/action.c
+++ b/action.c
@@ -72,6 +72,7 @@ static int iActionQueueNumWorkers = 1; /* number of worker threads for the mm
static uchar *pszActionQFName = NULL; /* prefix for the main message queue file */
static int64 iActionQueMaxFileSize = 1024*1024;
static int iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
+static int bActionQSyncQeueFiles = 0; /* sync queue files */
static int iActionQtoQShutdown = 0; /* queue shutdown */
static int iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
static int iActionQtoEnq = 2000; /* timeout for queue enque */
@@ -151,6 +152,7 @@ actionResetQueueParams(void)
iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */
iActionQueMaxFileSize = 1024*1024;
iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
+ bActionQSyncQeueFiles = 0;
iActionQtoQShutdown = 0; /* queue shutdown */
iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
iActionQtoEnq = 2000; /* timeout for queue enque */
@@ -273,6 +275,7 @@ actionConstructFinalize(action_t *pThis)
setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
+ setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", bActionQSyncQeueFiles);
setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown);
@@ -838,6 +841,7 @@ actionAddCfSysLineHdrl(void)
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL));
diff --git a/doc/queues.html b/doc/queues.html
index 4a9509a0..45ce1bd1 100644
--- a/doc/queues.html
+++ b/doc/queues.html
@@ -115,7 +115,11 @@ isolation. This is currently selected by specifying different <i>$WorkDirectory<
config directives before the queue creation statement.</p>
<p>To create a disk queue, use the "<i>$&lt;object&gt;QueueType Disk</i>" config
directive. Checkpoint intervals can be specified via "<i>$&lt;object&gt;QueueCheckpointInterval</i>",
-with 0 meaning no checkpoints. </p>
+with 0 meaning no checkpoints. Note that disk-based queues can be made very reliable
+by issuing a (f)sync after each write operation. Starting with version 4.3.2, this can
+be requested via "<i>&lt;object&gt;QueueSyncQueueFiles on/off</i> with the
+default being off. Activating this option has a performance penalty, so it should
+not be turned on without reason.</p>
<h2>In-Memory Queues</h2>
<p>In-memory queue mode is what most people have on their mind when they think
about computing queues. Here, the enqueued data elements are held in memory.
diff --git a/runtime/queue.c b/runtime/queue.c
index 3532a145..aa8e6c21 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -294,6 +294,7 @@ qqueueStartDA(qqueue_t *pThis)
CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
+ CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles));
CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq));
CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
@@ -817,6 +818,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
;
} else {
CHKiRet(strm.Construct(&pThis->tVars.disk.pWrite));
+ CHKiRet(strm.SetbSync(pThis->tVars.disk.pWrite, pThis->bSyncQueueFiles));
CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
@@ -824,6 +826,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pWrite));
CHKiRet(strm.Construct(&pThis->tVars.disk.pRead));
+ CHKiRet(strm.SetbSync(pThis->tVars.disk.pRead, pThis->bSyncQueueFiles));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
CHKiRet(strm.SetDir(pThis->tVars.disk.pRead, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pRead, 10000000));
@@ -1924,6 +1927,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
CHKiRet(strm.Construct(&psQIF));
CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE_TRUNC));
+ CHKiRet(strm.SetbSync(psQIF, pThis->bSyncQueueFiles));
CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE));
CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam));
CHKiRet(strm.ConstructFinalize(psQIF));
@@ -2279,6 +2283,7 @@ finalize_it:
/* some simple object access methods */
+DEFpropSetMeth(qqueue, bSyncQueueFiles, int)
DEFpropSetMeth(qqueue, iPersistUpdCnt, int)
DEFpropSetMeth(qqueue, iDeqtWinFromHr, int)
DEFpropSetMeth(qqueue, iDeqtWinToHr, int)
diff --git a/runtime/queue.h b/runtime/queue.h
index a267862d..07f134aa 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -73,6 +73,7 @@ typedef struct queue_s {
void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
+ int bSyncQueueFiles;/* if working with files, sync them after each write? */
int iHighWtrMrk; /* high water mark for disk-assisted memory queues */
int iLowWtrMrk; /* low water mark for disk-assisted memory queues */
int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */
@@ -186,6 +187,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*));
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
+PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int);
PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int);
PROTOTYPEpropSetMeth(qqueue, iDeqtWinToHr, int);
PROTOTYPEpropSetMeth(qqueue, toQShutdown, long);
diff --git a/tests/Makefile.am b/tests/Makefile.am
index a95139f2..3d555fc3 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -5,6 +5,7 @@ TESTS = $(TESTRUNS) cfg.sh \
validation-run.sh \
imtcp-multiport.sh \
diskqueue.sh \
+ diskqueue-fsync.sh \
manytcp.sh \
queue-persist.sh
@@ -59,6 +60,8 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
fieldtest.sh \
diskqueue.sh \
testsuites/diskqueue.conf \
+ diskqueue-fsync.sh \
+ testsuites/diskqueue-fsync.conf \
imtcp-multiport.sh \
testsuites/imtcp-multiport.conf \
manytcp.sh \
diff --git a/tests/diskqueue-fsync.sh b/tests/diskqueue-fsync.sh
new file mode 100755
index 00000000..f558b491
--- /dev/null
+++ b/tests/diskqueue-fsync.sh
@@ -0,0 +1,15 @@
+# Test for disk-only queue mode (with fsync for queue files)
+# This test checks if queue files can be correctly written
+# and read back, but it does not test the transition from
+# memory to disk mode for DA queues.
+# added 2009-06-09 by Rgerhards
+# This file is part of the rsyslog project, released under GPLv3
+# uncomment for debugging support:
+echo testing queue disk-only mode, fsync case
+source $srcdir/diag.sh init
+source $srcdir/diag.sh startup diskqueue-fsync.conf
+# 5000 messages should be enough - the disk fsync test is very slow!
+source $srcdir/diag.sh tcpflood 127.0.0.1 13514 1 5000
+source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
+source $srcdir/diag.sh seq-check 0 4999
+source $srcdir/diag.sh exit
diff --git a/tests/testsuites/diskqueue-fsync.conf b/tests/testsuites/diskqueue-fsync.conf
new file mode 100644
index 00000000..0a02c6ce
--- /dev/null
+++ b/tests/testsuites/diskqueue-fsync.conf
@@ -0,0 +1,17 @@
+# Test for queue disk mode (see .sh file for details)
+# rgerhards, 2009-04-17
+$IncludeConfig diag-common.conf
+
+$ModLoad ../plugins/imtcp/.libs/imtcp
+$InputTCPServerRun 13514
+
+# set spool locations and switch queue to disk-only mode
+$WorkDirectory test-spool
+$MainMsgQueueSyncQueueFiles on
+$MainMsgQueueTimeoutShutdown 10000
+$MainMsgQueueFilename mainq
+$MainMsgQueueType disk
+
+$template outfmt,"%msg:F,58:2%\n"
+$template dynfile,"rsyslog.out.log" # trick to use relative path names!
+:msg, contains, "msgnum:" ?dynfile;outfmt
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 206b79e8..19ece349 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -297,6 +297,7 @@ static queueType_t MainMsgQueType = QUEUETYPE_FIXED_ARRAY; /* type of the main m
static uchar *pszMainMsgQFName = NULL; /* prefix for the main message queue file */
static int64 iMainMsgQueMaxFileSize = 1024*1024;
static int iMainMsgQPersistUpdCnt = 0; /* persist queue info every n updates */
+static int bMainMsgQSyncQeueFiles = 0; /* sync queue files on every write? */
static int iMainMsgQtoQShutdown = 0; /* queue shutdown */
static int iMainMsgQtoActShutdown = 1000; /* action shutdown (in phase 2) */
static int iMainMsgQtoEnq = 2000; /* timeout for queue enque */
@@ -359,6 +360,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iMainMsgQueMaxFileSize = 1024 * 1024;
iMainMsgQueueNumWorkers = 1;
iMainMsgQPersistUpdCnt = 0;
+ bMainMsgQSyncQeueFiles = 0;
iMainMsgQtoQShutdown = 0;
iMainMsgQtoActShutdown = 1000;
iMainMsgQtoEnq = 2000;
@@ -2715,6 +2717,7 @@ init(void)
setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace);
setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName);
setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt);
+ setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", bMainMsgQSyncQeueFiles);
setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown );
setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown);
setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", iMainMsgQtoWrkShutdown);
@@ -3084,6 +3087,7 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuediscardmark", 0, eCmdHdlrInt, NULL, &iMainMsgQDiscardMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuediscardseverity", 0, eCmdHdlrSeverity, NULL, &iMainMsgQDiscardSeverity, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iMainMsgQPersistUpdCnt, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bMainMsgQSyncQeueFiles, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetype", 0, eCmdHdlrGetWord, setMainMsgQueType, NULL, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iMainMsgQueueNumWorkers, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoQShutdown, NULL));