diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/queue.c | 5 | ||||
-rw-r--r-- | runtime/queue.h | 2 |
2 files changed, 7 insertions, 0 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 3532a145..aa8e6c21 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -294,6 +294,7 @@ qqueueStartDA(qqueue_t *pThis) CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); + CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles)); CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq)); CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); @@ -817,6 +818,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) ; } else { CHKiRet(strm.Construct(&pThis->tVars.disk.pWrite)); + CHKiRet(strm.SetbSync(pThis->tVars.disk.pWrite, pThis->bSyncQueueFiles)); CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); @@ -824,6 +826,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pWrite)); CHKiRet(strm.Construct(&pThis->tVars.disk.pRead)); + CHKiRet(strm.SetbSync(pThis->tVars.disk.pRead, pThis->bSyncQueueFiles)); CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); CHKiRet(strm.SetDir(pThis->tVars.disk.pRead, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pRead, 10000000)); @@ -1924,6 +1927,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) CHKiRet(strm.Construct(&psQIF)); CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE_TRUNC)); + CHKiRet(strm.SetbSync(psQIF, pThis->bSyncQueueFiles)); CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam)); CHKiRet(strm.ConstructFinalize(psQIF)); @@ -2279,6 +2283,7 @@ finalize_it: /* some simple object access methods */ +DEFpropSetMeth(qqueue, bSyncQueueFiles, int) DEFpropSetMeth(qqueue, iPersistUpdCnt, int) DEFpropSetMeth(qqueue, iDeqtWinFromHr, int) DEFpropSetMeth(qqueue, iDeqtWinToHr, int) diff --git a/runtime/queue.h b/runtime/queue.h index a267862d..07f134aa 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -73,6 +73,7 @@ typedef struct queue_s { void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */ int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ + int bSyncQueueFiles;/* if working with files, sync them after each write? */ int iHighWtrMrk; /* high water mark for disk-assisted memory queues */ int iLowWtrMrk; /* low water mark for disk-assisted memory queues */ int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */ @@ -186,6 +187,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); +PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinToHr, int); PROTOTYPEpropSetMeth(qqueue, toQShutdown, long); |