diff options
-rw-r--r-- | ChangeLog | 5 | ||||
-rw-r--r-- | action.c | 4 | ||||
-rw-r--r-- | doc/queues.html | 6 | ||||
-rw-r--r-- | runtime/queue.c | 5 | ||||
-rw-r--r-- | runtime/queue.h | 2 | ||||
-rw-r--r-- | tests/Makefile.am | 3 | ||||
-rwxr-xr-x | tests/diskqueue-fsync.sh | 15 | ||||
-rw-r--r-- | tests/testsuites/diskqueue-fsync.conf | 17 | ||||
-rw-r--r-- | tools/syslogd.c | 4 |
9 files changed, 60 insertions, 1 deletions
@@ -12,9 +12,14 @@ Version 4.3.2 [DEVEL] (rgerhards), 2009-??-?? - reduced max value for $DynaFileCacheSize to 1,000 (the former maximum of 10,000 really made no sense, even 1,000 is very high, but we like to keep the user in control ;)). +- added capability to fsync() queue disk files for enhanced reliability + (also add's speed, because you do no longer need to run the whole file + system in sync mode) - added configuration commands (see doc for explanations) * $OMFileZipLevel * $OMFileIOBufferSize + * $MainMsgQueueSyncQueueFiles + * $ActionQueueSyncQueueFiles --------------------------------------------------------------------------- Version 4.3.1 [DEVEL] (rgerhards), 2009-05-25 - added capability to run multiple tcp listeners (on different ports) @@ -72,6 +72,7 @@ static int iActionQueueNumWorkers = 1; /* number of worker threads for the mm static uchar *pszActionQFName = NULL; /* prefix for the main message queue file */ static int64 iActionQueMaxFileSize = 1024*1024; static int iActionQPersistUpdCnt = 0; /* persist queue info every n updates */ +static int bActionQSyncQeueFiles = 0; /* sync queue files */ static int iActionQtoQShutdown = 0; /* queue shutdown */ static int iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ static int iActionQtoEnq = 2000; /* timeout for queue enque */ @@ -151,6 +152,7 @@ actionResetQueueParams(void) iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ iActionQueMaxFileSize = 1024*1024; iActionQPersistUpdCnt = 0; /* persist queue info every n updates */ + bActionQSyncQeueFiles = 0; iActionQtoQShutdown = 0; /* queue shutdown */ iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ iActionQtoEnq = 2000; /* timeout for queue enque */ @@ -273,6 +275,7 @@ actionConstructFinalize(action_t *pThis) setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); + setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", bActionQSyncQeueFiles); setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown ); setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown); setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown); @@ -838,6 +841,7 @@ actionAddCfSysLineHdrl(void) CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL)); diff --git a/doc/queues.html b/doc/queues.html index 4a9509a0..45ce1bd1 100644 --- a/doc/queues.html +++ b/doc/queues.html @@ -115,7 +115,11 @@ isolation. This is currently selected by specifying different <i>$WorkDirectory< config directives before the queue creation statement.</p> <p>To create a disk queue, use the "<i>$<object>QueueType Disk</i>" config directive. Checkpoint intervals can be specified via "<i>$<object>QueueCheckpointInterval</i>", -with 0 meaning no checkpoints. </p> +with 0 meaning no checkpoints. Note that disk-based queues can be made very reliable +by issuing a (f)sync after each write operation. Starting with version 4.3.2, this can +be requested via "<i><object>QueueSyncQueueFiles on/off</i> with the +default being off. Activating this option has a performance penalty, so it should +not be turned on without reason.</p> <h2>In-Memory Queues</h2> <p>In-memory queue mode is what most people have on their mind when they think about computing queues. Here, the enqueued data elements are held in memory. diff --git a/runtime/queue.c b/runtime/queue.c index 3532a145..aa8e6c21 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -294,6 +294,7 @@ qqueueStartDA(qqueue_t *pThis) CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); + CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles)); CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq)); CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); @@ -817,6 +818,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) ; } else { CHKiRet(strm.Construct(&pThis->tVars.disk.pWrite)); + CHKiRet(strm.SetbSync(pThis->tVars.disk.pWrite, pThis->bSyncQueueFiles)); CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); @@ -824,6 +826,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pWrite)); CHKiRet(strm.Construct(&pThis->tVars.disk.pRead)); + CHKiRet(strm.SetbSync(pThis->tVars.disk.pRead, pThis->bSyncQueueFiles)); CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); CHKiRet(strm.SetDir(pThis->tVars.disk.pRead, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pRead, 10000000)); @@ -1924,6 +1927,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) CHKiRet(strm.Construct(&psQIF)); CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE_TRUNC)); + CHKiRet(strm.SetbSync(psQIF, pThis->bSyncQueueFiles)); CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam)); CHKiRet(strm.ConstructFinalize(psQIF)); @@ -2279,6 +2283,7 @@ finalize_it: /* some simple object access methods */ +DEFpropSetMeth(qqueue, bSyncQueueFiles, int) DEFpropSetMeth(qqueue, iPersistUpdCnt, int) DEFpropSetMeth(qqueue, iDeqtWinFromHr, int) DEFpropSetMeth(qqueue, iDeqtWinToHr, int) diff --git a/runtime/queue.h b/runtime/queue.h index a267862d..07f134aa 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -73,6 +73,7 @@ typedef struct queue_s { void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */ int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ + int bSyncQueueFiles;/* if working with files, sync them after each write? */ int iHighWtrMrk; /* high water mark for disk-assisted memory queues */ int iLowWtrMrk; /* low water mark for disk-assisted memory queues */ int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */ @@ -186,6 +187,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); +PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinToHr, int); PROTOTYPEpropSetMeth(qqueue, toQShutdown, long); diff --git a/tests/Makefile.am b/tests/Makefile.am index a95139f2..3d555fc3 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -5,6 +5,7 @@ TESTS = $(TESTRUNS) cfg.sh \ validation-run.sh \ imtcp-multiport.sh \ diskqueue.sh \ + diskqueue-fsync.sh \ manytcp.sh \ queue-persist.sh @@ -59,6 +60,8 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \ fieldtest.sh \ diskqueue.sh \ testsuites/diskqueue.conf \ + diskqueue-fsync.sh \ + testsuites/diskqueue-fsync.conf \ imtcp-multiport.sh \ testsuites/imtcp-multiport.conf \ manytcp.sh \ diff --git a/tests/diskqueue-fsync.sh b/tests/diskqueue-fsync.sh new file mode 100755 index 00000000..f558b491 --- /dev/null +++ b/tests/diskqueue-fsync.sh @@ -0,0 +1,15 @@ +# Test for disk-only queue mode (with fsync for queue files) +# This test checks if queue files can be correctly written +# and read back, but it does not test the transition from +# memory to disk mode for DA queues. +# added 2009-06-09 by Rgerhards +# This file is part of the rsyslog project, released under GPLv3 +# uncomment for debugging support: +echo testing queue disk-only mode, fsync case +source $srcdir/diag.sh init +source $srcdir/diag.sh startup diskqueue-fsync.conf +# 5000 messages should be enough - the disk fsync test is very slow! +source $srcdir/diag.sh tcpflood 127.0.0.1 13514 1 5000 +source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages +source $srcdir/diag.sh seq-check 0 4999 +source $srcdir/diag.sh exit diff --git a/tests/testsuites/diskqueue-fsync.conf b/tests/testsuites/diskqueue-fsync.conf new file mode 100644 index 00000000..0a02c6ce --- /dev/null +++ b/tests/testsuites/diskqueue-fsync.conf @@ -0,0 +1,17 @@ +# Test for queue disk mode (see .sh file for details) +# rgerhards, 2009-04-17 +$IncludeConfig diag-common.conf + +$ModLoad ../plugins/imtcp/.libs/imtcp +$InputTCPServerRun 13514 + +# set spool locations and switch queue to disk-only mode +$WorkDirectory test-spool +$MainMsgQueueSyncQueueFiles on +$MainMsgQueueTimeoutShutdown 10000 +$MainMsgQueueFilename mainq +$MainMsgQueueType disk + +$template outfmt,"%msg:F,58:2%\n" +$template dynfile,"rsyslog.out.log" # trick to use relative path names! +:msg, contains, "msgnum:" ?dynfile;outfmt diff --git a/tools/syslogd.c b/tools/syslogd.c index 206b79e8..19ece349 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -297,6 +297,7 @@ static queueType_t MainMsgQueType = QUEUETYPE_FIXED_ARRAY; /* type of the main m static uchar *pszMainMsgQFName = NULL; /* prefix for the main message queue file */ static int64 iMainMsgQueMaxFileSize = 1024*1024; static int iMainMsgQPersistUpdCnt = 0; /* persist queue info every n updates */ +static int bMainMsgQSyncQeueFiles = 0; /* sync queue files on every write? */ static int iMainMsgQtoQShutdown = 0; /* queue shutdown */ static int iMainMsgQtoActShutdown = 1000; /* action shutdown (in phase 2) */ static int iMainMsgQtoEnq = 2000; /* timeout for queue enque */ @@ -359,6 +360,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a iMainMsgQueMaxFileSize = 1024 * 1024; iMainMsgQueueNumWorkers = 1; iMainMsgQPersistUpdCnt = 0; + bMainMsgQSyncQeueFiles = 0; iMainMsgQtoQShutdown = 0; iMainMsgQtoActShutdown = 1000; iMainMsgQtoEnq = 2000; @@ -2715,6 +2717,7 @@ init(void) setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace); setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName); setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt); + setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", bMainMsgQSyncQeueFiles); setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown ); setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown); setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", iMainMsgQtoWrkShutdown); @@ -3084,6 +3087,7 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuediscardmark", 0, eCmdHdlrInt, NULL, &iMainMsgQDiscardMark, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuediscardseverity", 0, eCmdHdlrSeverity, NULL, &iMainMsgQDiscardSeverity, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iMainMsgQPersistUpdCnt, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bMainMsgQSyncQeueFiles, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetype", 0, eCmdHdlrGetWord, setMainMsgQueType, NULL, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iMainMsgQueueNumWorkers, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoQShutdown, NULL)); |