From abdcea7d8f0eda058a6c6719cf24955878279d7c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sun, 13 Jan 2008 15:47:41 +0000 Subject: support for reading back persistet queue information completed --- queue.c | 253 +++++++++++++++++++++++++++++++++++++++------------------------- 1 file changed, 154 insertions(+), 99 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index b31173d9..d9a3f130 100644 --- a/queue.c +++ b/queue.c @@ -191,6 +191,104 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) /* -------------------- disk -------------------- */ + +/* This method checks if there is any persistent information on the + * queue. + */ +#if 0 +static rsRetVal +queueTryLoadPersistedInfo(queue_t *pThis) +{ + DEFiRet; + strm_t *psQIF = NULL; + uchar pszQIFNam[MAXFNAME]; + size_t lenQIFNam; + AsPropBagstruct stat stat_buf; +} +#endif + + +static rsRetVal +queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t *pThis) +{ + DEFiRet; + ISOBJ_TYPE_assert(pStrm, strm); + ISOBJ_TYPE_assert(pThis, queue); + CHKiRet(strmSetDir(pStrm, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); +finalize_it: + return iRet; +} + + +/* The method loads the persistent queue information. + * rgerhards, 2008-01-11 + */ +static rsRetVal +queueTryLoadPersistedInfo(queue_t *pThis) +{ + DEFiRet; + strm_t *psQIF = NULL; + uchar pszQIFNam[MAXFNAME]; + size_t lenQIFNam; + struct stat stat_buf; + + ISOBJ_TYPE_assert(pThis, queue); + + /* Construct file name */ + lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", + (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); + + /* check if the file exists */ +dbgprintf("stat '%s'\n", pszQIFNam); + if(stat((char*) pszQIFNam, &stat_buf) == -1) { + if(errno == ENOENT) { + dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis)); + ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); + } else { + dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + } + + /* If we reach this point, we have a .qi file */ + + CHKiRet(strmConstruct(&psQIF)); + CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); + CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_READ)); + CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE)); + CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strmConstructFinalize(psQIF)); + + /* first, we try to read the property bag for ourselfs */ + CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF)); + + /* and now the stream objects (some order as when persisted!) */ + CHKiRet(objDeserialize(&pThis->tVars.disk.pWrite, OBJstrm, psQIF, + (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis)); + CHKiRet(objDeserialize(&pThis->tVars.disk.pRead, OBJstrm, psQIF, + (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis)); + + CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite)); + CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead)); + + /* OK, we could successfully read the file, so we now can request that it be + * deleted when we are done with the persisted information. + */ + pThis->bNeedDelQIF = 1; + +finalize_it: + if(psQIF != NULL) + strmDestruct(psQIF); + + if(iRet != RS_RET_OK) { + dbgprintf("Queue 0x%lx: error %d reading .qi file - can not start queue\n", + queueGetID(pThis), iRet); + } + + return iRet; +} + + /* disk queue constructor. * Note that we use a file limit of 10,000,000 files. That number should never pose a * problem. If so, I guess the user has a design issue... But of course, the code can @@ -201,23 +299,48 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) static rsRetVal qConstructDisk(queue_t *pThis) { DEFiRet; + int bRestarted = 0; assert(pThis != NULL); - CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite)); - CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); - CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); - CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); - CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); - CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite)); - - CHKiRet(strmConstruct(&pThis->tVars.disk.pRead)); - CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); - CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); - CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000)); - CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ)); - CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR)); - CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead)); + /* and now check if there is some persistent information that needs to be read in */ + iRet = queueTryLoadPersistedInfo(pThis); + if(iRet == RS_RET_OK) + bRestarted = 1; + else if(iRet != RS_RET_FILE_NOT_FOUND) + FINALIZE; + +dbgprintf("qConstructDisk: bRestarted %d, iRet %d\n", bRestarted, iRet); + if(bRestarted == 1) { + ; + } else { + CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite)); + CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); + CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); + CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); + CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); + CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite)); + + CHKiRet(strmConstruct(&pThis->tVars.disk.pRead)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); + CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); + CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000)); + CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ)); + CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR)); + CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead)); + + + CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pThis->pszFilePrefix, pThis->lenFilePrefix)); + } + + /* now we set (and overwrite in case of a persisted restart) some parameters which + * should always reflect the current configuration variables. Be careful by doing so, + * for example file name generation must not be changed as that would break the + * ability to read existing queue files. -- rgerhards, 2008-01-12 + */ +CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize)); +CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize)); finalize_it: return iRet; @@ -254,7 +377,7 @@ finalize_it: static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) { - return objDeserialize(ppUsr, OBJMsg, pThis->tVars.disk.pRead); + return objDeserialize(ppUsr, OBJMsg, pThis->tVars.disk.pRead, NULL, NULL); } /* -------------------- direct (no queueing) -------------------- */ @@ -480,67 +603,6 @@ queueWorker(void *arg) } -/* This method checks if there is any persistent information on the - * queue and, if so, tries to load it. This method can only legally be - * called from the destructor (I moved it out from there to keep the - * Constructor code somewhat smaller). -- rgerhards, 2008-01-11 - */ -static rsRetVal -queueTryLoadPersistedInfo(queue_t *pThis) -{ - DEFiRet; - strm_t *psQIF = NULL; - uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; - struct stat stat_buf; - - ISOBJ_TYPE_assert(pThis, queue); - - /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); - - /* check if the file exists */ -dbgprintf("stat '%s'\n", pszQIFNam); - if(stat((char*) pszQIFNam, &stat_buf) == -1) { - if(errno == ENOENT) { - dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis)); - ABORT_FINALIZE(RS_RET_OK); - } else { - dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno); - ABORT_FINALIZE(RS_RET_IO_ERROR); - } - } - - /* If we reach this point, we have a .qi file */ - - CHKiRet(strmConstruct(&psQIF)); - CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); - CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_READ)); - CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam)); - CHKiRet(strmConstructFinalize(psQIF)); - - /* first, we try to read the property bag for ourselfs */ - CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF)); - - /* and now the stream objects (some order as when persisted!) */ - CHKiRet(objDeserializeObjAsPropBag(pThis->tVars.disk.pWrite, psQIF)); - CHKiRet(objDeserializeObjAsPropBag(pThis->tVars.disk.pRead, psQIF)); - -finalize_it: - if(psQIF != NULL) - strmDestruct(psQIF); - - if(iRet != RS_RET_OK) { - dbgprintf("Queue 0x%lx: error %d reading .qi file - can not start queue\n", - queueGetID(pThis), iRet); - } - - return iRet; -} - - /* Constructor for the queue object * This constructs the data structure, but does not yet start the queue. That * is done by queueStart(). The reason is that we want to give the caller a chance @@ -611,9 +673,6 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, break; } - /* call type-specific constructor */ - CHKiRet(pThis->qConstruct(pThis)); - finalize_it: OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP return iRet; @@ -631,8 +690,8 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ assert(pThis != NULL); - /* and now check if there is some persistent information that needs to be read in */ - CHKiRet(queueTryLoadPersistedInfo(pThis)); + /* call type-specific constructor */ + CHKiRet(pThis->qConstruct(pThis)); dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", (unsigned long) pThis, pThis->qType, pThis->iMaxFileSize); @@ -681,15 +740,12 @@ finalize_it: static rsRetVal queuePersist(queue_t *pThis) { DEFiRet; - strm_t *psQIF; /* Queue Info File */ + strm_t *psQIF = NULL;; /* Queue Info File */ uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; int i; assert(pThis != NULL); - if(pThis->iQueueSize == 0) - FINALIZE; /* nothing left to do, so be happy */ - if(pThis->qType != QUEUETYPE_DISK) ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */ @@ -697,6 +753,14 @@ static rsRetVal queuePersist(queue_t *pThis) /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); + if(pThis->iQueueSize == 0) { + if(pThis->bNeedDelQIF) { + unlink((char*)pszQIFNam); + pThis->bNeedDelQIF = 0; + } + FINALIZE; /* nothing left to do, so be happy */ + } + CHKiRet(strmConstruct(&psQIF)); CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE)); @@ -715,23 +779,20 @@ static rsRetVal queuePersist(queue_t *pThis) objSerializeSCALAR_VAR(psQIF, qType, INT, i); objSerializeSCALAR(psQIF, iQueueSize, INT); CHKiRet(objEndSerialize(psQIF)); -dbgprintf("queue serial 1\n"); /* this is disk specific and must be moved to a function */ CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF)); CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF)); -dbgprintf("queue serial 2\n"); /* persist queue object itself */ - /* ready with the queue, now call driver to persist queue data */ - //iRet = ; - - /* the the input file object that it must not delete the file on close */ + /* tell the input file object that it must not delete the file on close */ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0)); finalize_it: - strmDestruct(psQIF); + if(psQIF != NULL) + strmDestruct(psQIF); + return iRet; } @@ -794,11 +855,7 @@ queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); pThis->lenFilePrefix = iLenPrefix; - - if(pThis->qType == QUEUETYPE_DISK) { - CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix)); - CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix)); - } + finalize_it: return iRet; } @@ -818,8 +875,6 @@ queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize) } pThis->iMaxFileSize = iMaxFileSize; - if(pThis->qType == QUEUETYPE_DISK) - CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, iMaxFileSize)); finalize_it: return iRet; -- cgit