From 9704f129f72ec9ece11aeccea4bbf0cbccb116cb Mon Sep 17 00:00:00 2001
From: Rainer Gerhards
To create a disk queue, use the "$<object>QueueType Disk" config directive. Checkpoint intervals can be specified via "$<object>QueueCheckpointInterval", -with 0 meaning no checkpoints.
+with 0 meaning no checkpoints. Note that disk-based queues can be made very reliable +by issuing a (f)sync after each write operation. Starting with version 4.3.2, this can +be requested via "<object>QueueSyncQueueFiles on/off with the +default being off. Activating this option has a performance penalty, so it should +not be turned on without reason.In-memory queue mode is what most people have on their mind when they think about computing queues. Here, the enqueued data elements are held in memory. diff --git a/runtime/queue.c b/runtime/queue.c index 3532a145..aa8e6c21 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -294,6 +294,7 @@ qqueueStartDA(qqueue_t *pThis) CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); + CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles)); CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq)); CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); @@ -817,6 +818,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) ; } else { CHKiRet(strm.Construct(&pThis->tVars.disk.pWrite)); + CHKiRet(strm.SetbSync(pThis->tVars.disk.pWrite, pThis->bSyncQueueFiles)); CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); @@ -824,6 +826,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pWrite)); CHKiRet(strm.Construct(&pThis->tVars.disk.pRead)); + CHKiRet(strm.SetbSync(pThis->tVars.disk.pRead, pThis->bSyncQueueFiles)); CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); CHKiRet(strm.SetDir(pThis->tVars.disk.pRead, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pRead, 10000000)); @@ -1924,6 +1927,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) CHKiRet(strm.Construct(&psQIF)); CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE_TRUNC)); + CHKiRet(strm.SetbSync(psQIF, pThis->bSyncQueueFiles)); CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam)); CHKiRet(strm.ConstructFinalize(psQIF)); @@ -2279,6 +2283,7 @@ finalize_it: /* some simple object access methods */ +DEFpropSetMeth(qqueue, bSyncQueueFiles, int) DEFpropSetMeth(qqueue, iPersistUpdCnt, int) DEFpropSetMeth(qqueue, iDeqtWinFromHr, int) DEFpropSetMeth(qqueue, iDeqtWinToHr, int) diff --git a/runtime/queue.h b/runtime/queue.h index a267862d..07f134aa 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -73,6 +73,7 @@ typedef struct queue_s { void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */ int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ + int bSyncQueueFiles;/* if working with files, sync them after each write? */ int iHighWtrMrk; /* high water mark for disk-assisted memory queues */ int iLowWtrMrk; /* low water mark for disk-assisted memory queues */ int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */ @@ -186,6 +187,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); +PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinToHr, int); PROTOTYPEpropSetMeth(qqueue, toQShutdown, long); diff --git a/tests/Makefile.am b/tests/Makefile.am index a95139f2..3d555fc3 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -5,6 +5,7 @@ TESTS = $(TESTRUNS) cfg.sh \ validation-run.sh \ imtcp-multiport.sh \ diskqueue.sh \ + diskqueue-fsync.sh \ manytcp.sh \ queue-persist.sh @@ -59,6 +60,8 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \ fieldtest.sh \ diskqueue.sh \ testsuites/diskqueue.conf \ + diskqueue-fsync.sh \ + testsuites/diskqueue-fsync.conf \ imtcp-multiport.sh \ testsuites/imtcp-multiport.conf \ manytcp.sh \ diff --git a/tests/diskqueue-fsync.sh b/tests/diskqueue-fsync.sh new file mode 100755 index 00000000..f558b491 --- /dev/null +++ b/tests/diskqueue-fsync.sh @@ -0,0 +1,15 @@ +# Test for disk-only queue mode (with fsync for queue files) +# This test checks if queue files can be correctly written +# and read back, but it does not test the transition from +# memory to disk mode for DA queues. +# added 2009-06-09 by Rgerhards +# This file is part of the rsyslog project, released under GPLv3 +# uncomment for debugging support: +echo testing queue disk-only mode, fsync case +source $srcdir/diag.sh init +source $srcdir/diag.sh startup diskqueue-fsync.conf +# 5000 messages should be enough - the disk fsync test is very slow! +source $srcdir/diag.sh tcpflood 127.0.0.1 13514 1 5000 +source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages +source $srcdir/diag.sh seq-check 0 4999 +source $srcdir/diag.sh exit diff --git a/tests/testsuites/diskqueue-fsync.conf b/tests/testsuites/diskqueue-fsync.conf new file mode 100644 index 00000000..0a02c6ce --- /dev/null +++ b/tests/testsuites/diskqueue-fsync.conf @@ -0,0 +1,17 @@ +# Test for queue disk mode (see .sh file for details) +# rgerhards, 2009-04-17 +$IncludeConfig diag-common.conf + +$ModLoad ../plugins/imtcp/.libs/imtcp +$InputTCPServerRun 13514 + +# set spool locations and switch queue to disk-only mode +$WorkDirectory test-spool +$MainMsgQueueSyncQueueFiles on +$MainMsgQueueTimeoutShutdown 10000 +$MainMsgQueueFilename mainq +$MainMsgQueueType disk + +$template outfmt,"%msg:F,58:2%\n" +$template dynfile,"rsyslog.out.log" # trick to use relative path names! +:msg, contains, "msgnum:" ?dynfile;outfmt diff --git a/tools/syslogd.c b/tools/syslogd.c index 206b79e8..19ece349 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -297,6 +297,7 @@ static queueType_t MainMsgQueType = QUEUETYPE_FIXED_ARRAY; /* type of the main m static uchar *pszMainMsgQFName = NULL; /* prefix for the main message queue file */ static int64 iMainMsgQueMaxFileSize = 1024*1024; static int iMainMsgQPersistUpdCnt = 0; /* persist queue info every n updates */ +static int bMainMsgQSyncQeueFiles = 0; /* sync queue files on every write? */ static int iMainMsgQtoQShutdown = 0; /* queue shutdown */ static int iMainMsgQtoActShutdown = 1000; /* action shutdown (in phase 2) */ static int iMainMsgQtoEnq = 2000; /* timeout for queue enque */ @@ -359,6 +360,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a iMainMsgQueMaxFileSize = 1024 * 1024; iMainMsgQueueNumWorkers = 1; iMainMsgQPersistUpdCnt = 0; + bMainMsgQSyncQeueFiles = 0; iMainMsgQtoQShutdown = 0; iMainMsgQtoActShutdown = 1000; iMainMsgQtoEnq = 2000; @@ -2715,6 +2717,7 @@ init(void) setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace); setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName); setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt); + setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", bMainMsgQSyncQeueFiles); setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown ); setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown); setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", iMainMsgQtoWrkShutdown); @@ -3084,6 +3087,7 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuediscardmark", 0, eCmdHdlrInt, NULL, &iMainMsgQDiscardMark, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuediscardseverity", 0, eCmdHdlrSeverity, NULL, &iMainMsgQDiscardSeverity, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iMainMsgQPersistUpdCnt, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bMainMsgQSyncQeueFiles, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetype", 0, eCmdHdlrGetWord, setMainMsgQueType, NULL, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iMainMsgQueueNumWorkers, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoQShutdown, NULL)); -- cgit