summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c90
1 files changed, 88 insertions, 2 deletions
diff --git a/queue.c b/queue.c
index b582fb13..0d59a068 100644
--- a/queue.c
+++ b/queue.c
@@ -28,6 +28,7 @@
*/
#include "config.h"
+#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
@@ -206,6 +207,7 @@ static rsRetVal qConstructDisk(queue_t *pThis)
CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
+ CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite));
CHKiRet(strmConstruct(&pThis->tVars.disk.pRead));
@@ -213,6 +215,7 @@ static rsRetVal qConstructDisk(queue_t *pThis)
CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000));
CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ));
+ CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR));
CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead));
finalize_it:
@@ -584,6 +587,70 @@ finalize_it:
return iRet;
}
+
+#if 0
+/* persist disk status on disk. This is necessary if we run either
+ * a disk queue or in a disk assisted mode.
+ */
+static rsRetVal queuePersistDskFilInfo(queue_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+
+finalize_it:
+ return iRet;
+}
+#endif
+
+
+
+/* persist the queue to disk. If we have something to persist, we first
+ * save the information on the queue properties itself and then we call
+ * the queue-type specific drivers.
+ * rgerhards, 2008-01-10
+ */
+static rsRetVal queuePersist(queue_t *pThis)
+{
+ DEFiRet;
+ strm_t *psQIF; /* Queue Info File */
+ uchar pszQIFNam[MAXFNAME];
+ size_t lenQIFNam;
+
+ assert(pThis != NULL);
+ if(pThis->iQueueSize == 0)
+ FINALIZE; /* nothing left to do, so be happy */
+
+ dbgprintf("Queue 0x%lx: persisting queue to disk, %d entries...\n", queueGetID(pThis), pThis->iQueueSize);
+ /* Construct file name */
+ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
+ (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
+ CHKiRet(strmConstruct(&psQIF));
+ CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
+ CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE));
+ CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE));
+ CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam));
+ CHKiRet(strmConstructFinalize(psQIF));
+
+ /* this is disk specific and must be moved to a function */
+ CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
+ CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF));
+
+ /* persist queue object itself */
+
+ /* ready with the queue, now call driver to persist queue data */
+ //iRet = ;
+
+ /* the the input file object that it must not delete the file on close */
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
+
+finalize_it:
+ strmDestruct(psQIF);
+ return iRet;
+}
+
+
/* destructor for the queue object */
rsRetVal queueDestruct(queue_t *pThis)
{
@@ -598,6 +665,13 @@ rsRetVal queueDestruct(queue_t *pThis)
pThis->pWrkThrds = NULL;
}
+ /* now check if we need to persist the queue */
+ if(pThis->bImmediateShutdown) {
+ CHKiRet_Hdlr(queuePersist(pThis)) {
+ dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet);
+ }
+ }
+
/* ... then free resources */
pthread_mutex_destroy(pThis->mut);
free(pThis->mut);
@@ -608,6 +682,9 @@ rsRetVal queueDestruct(queue_t *pThis)
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
+ if(pThis->pszFilePrefix != NULL)
+ free(pThis->pszFilePrefix);
+
/* and finally delete the queue objet itself */
free(pThis);
@@ -624,9 +701,18 @@ rsRetVal
queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
{
DEFiRet;
+
+ if(pThis->pszFilePrefix != NULL)
+ free(pThis->pszFilePrefix);
+
+ if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1);
+ pThis->lenFilePrefix = iLenPrefix;
+
if(pThis->qType == QUEUETYPE_DISK) {
- CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix));
- CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix));
+ CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix));
+ CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix));
}
finalize_it:
return iRet;