From 8dad3997505f71e6e9962892f79d7b7dad0a89ce Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 11 Jan 2008 14:12:25 +0000 Subject: file stream objects are now persistet on immediate queue shutdown (queue itself is not yet fully persisted) --- obj-types.h | 25 ++++++++++++++++- obj.c | 8 +++--- obj.h | 10 +++++-- queue.c | 90 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- queue.h | 3 ++- stream.c | 43 ++++++++++++++++++----------- stream.h | 10 ++++--- 7 files changed, 161 insertions(+), 28 deletions(-) diff --git a/obj-types.h b/obj-types.h index 6da56db0..ca7e1c67 100644 --- a/obj-types.h +++ b/obj-types.h @@ -83,12 +83,34 @@ typedef struct objInfo_s { typedef struct obj { /* the dummy struct that each derived class can be casted to */ objInfo_t *pObjInfo; +#ifndef NDEBUG /* this means if debug... */ + unsigned int iObjCooCKiE; /* must always be 0xBADEFEE for a valid object */ +#endif } obj_t; /* macros which must be gloablly-visible (because they are used during definition of * other objects. */ -#define BEGINobjInstance objInfo_t *pObjInfo +#ifndef NDEBUG /* this means if debug... */ +# define BEGINobjInstance \ + objInfo_t *pObjInfo; \ + unsigned int iObjCooCKiE; /* prevent name conflict, thus the strange name */ +# define ISOBJ_assert(pObj) \ + { \ + assert(pObj != NULL); \ + assert((unsigned) pObj->iObjCooCKiE == (unsigned) 0xBADEFEE); \ + } +# define ISOBJ_TYPE_assert(pObj, objType) \ + { \ + assert(pObj != NULL); \ + assert((unsigned) pObj->iObjCooCKiE == (unsigned) 0xBADEFEE); \ + assert(objGetObjID(pObj) == OBJ##objType); \ + } +#else /* non-debug mode, no checks but much faster */ +# define BEGINobjInstance objInfo_t *pObjInfo; +# define ISOBJ_TYPE_assert(pObj, objType) +# define ISOBJ_assert(pObj, objType) +#endif #define DEFpropSetMeth(obj, prop, dataType)\ rsRetVal obj##Set##prop(obj##_t *pThis, dataType pVal)\ @@ -135,6 +157,7 @@ finalize_it: \ if((pThis = (obj##_t *)calloc(1, sizeof(obj##_t))) == NULL) { \ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); \ } \ + objConstructSetObjInfo(pThis); \ \ obj##Initialize(pThis); \ \ diff --git a/obj.c b/obj.c index 938e1ead..97fb3373 100644 --- a/obj.c +++ b/obj.c @@ -125,7 +125,7 @@ static rsRetVal objSerializeHeader(strm_t *pStrm, obj_t *pObj) DEFiRet; assert(pStrm != NULL); - assert(pObj != NULL); + ISOBJ_assert(pObj); /* object cookie and serializer version (so far always 1) */ CHKiRet(strmWriteChar(pStrm, COOKIE_OBJLINE)); @@ -186,6 +186,7 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t prop assert(pStrm != NULL); assert(pszPropName != NULL); + dbgprintf("objSerializeProp: strm %p, propName '%s', type %d, pUsr %p\n", pStrm, pszPropName, propType, pUsr); /* if we have no user pointer, there is no need to write this property. * TODO: think if that's the righ point of view * rgerhards, 2008-01-06 @@ -194,6 +195,8 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t prop ABORT_FINALIZE(RS_RET_OK); } + /* TODO: use the stream functions for data conversion here - should be quicker */ + switch(propType) { case PROPTYPE_PSZ: pszBuf = (uchar*) pUsr; @@ -413,7 +416,6 @@ static rsRetVal objDeserializeHeader(objID_t *poID, int* poVers, strm_t *pStrm) *poVers = oVers; finalize_it: -dbgprintf("DeserializeHeader oid: %ld, vers: %ld, iRet: %d\n", ioID, oVers, iRet); return iRet; } @@ -632,7 +634,7 @@ rsRetVal objRegisterObj(objID_t oID, objInfo_t *pInfo) assert(pInfo != NULL); if(oID < 1 || oID > OBJ_NUM_IDS) ABORT_FINALIZE(RS_RET_INVALID_OID); - + arrObjInfo[oID] = pInfo; finalize_it: diff --git a/obj.h b/obj.h index 31a08004..ae395a53 100644 --- a/obj.h +++ b/obj.h @@ -69,8 +69,14 @@ #define objGetName(pThis) (((obj_t*) (pThis))->pObjInfo->pszName) #define objGetObjID(pThis) (((obj_t*) (pThis))->pObjInfo->objID) #define objGetVersion(pThis) (((obj_t*) (pThis))->pObjInfo->iObjVers) -/* must be called in Constructor: */ -#define objConstructSetObjInfo(pThis) ((obj_t*) (pThis))->pObjInfo = pObjInfoOBJ; +/* the next macro MUST be called in Constructors: */ +#ifndef NDEBUG /* this means if debug... */ +# define objConstructSetObjInfo(pThis) \ + ((obj_t*) (pThis))->pObjInfo = pObjInfoOBJ; \ + ((obj_t*) (pThis))->iObjCooCKiE = 0xBADEFEE +#else +# define objConstructSetObjInfo(pThis) ((obj_t*) (pThis))->pObjInfo = pObjInfoOBJ +#endif #define objDestruct(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_DESTRUCT])(pThis) #define objSerialize(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_SERIALIZE]) diff --git a/queue.c b/queue.c index b582fb13..0d59a068 100644 --- a/queue.c +++ b/queue.c @@ -28,6 +28,7 @@ */ #include "config.h" +#include #include #include #include @@ -206,6 +207,7 @@ static rsRetVal qConstructDisk(queue_t *pThis) CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); + CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite)); CHKiRet(strmConstruct(&pThis->tVars.disk.pRead)); @@ -213,6 +215,7 @@ static rsRetVal qConstructDisk(queue_t *pThis) CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000)); CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ)); + CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR)); CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead)); finalize_it: @@ -584,6 +587,70 @@ finalize_it: return iRet; } + +#if 0 +/* persist disk status on disk. This is necessary if we run either + * a disk queue or in a disk assisted mode. + */ +static rsRetVal queuePersistDskFilInfo(queue_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + +finalize_it: + return iRet; +} +#endif + + + +/* persist the queue to disk. If we have something to persist, we first + * save the information on the queue properties itself and then we call + * the queue-type specific drivers. + * rgerhards, 2008-01-10 + */ +static rsRetVal queuePersist(queue_t *pThis) +{ + DEFiRet; + strm_t *psQIF; /* Queue Info File */ + uchar pszQIFNam[MAXFNAME]; + size_t lenQIFNam; + + assert(pThis != NULL); + if(pThis->iQueueSize == 0) + FINALIZE; /* nothing left to do, so be happy */ + + dbgprintf("Queue 0x%lx: persisting queue to disk, %d entries...\n", queueGetID(pThis), pThis->iQueueSize); + /* Construct file name */ + lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", + (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); + CHKiRet(strmConstruct(&psQIF)); + CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); + CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE)); + CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE)); + CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strmConstructFinalize(psQIF)); + + /* this is disk specific and must be moved to a function */ + CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF)); + CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF)); + + /* persist queue object itself */ + + /* ready with the queue, now call driver to persist queue data */ + //iRet = ; + + /* the the input file object that it must not delete the file on close */ + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0)); + +finalize_it: + strmDestruct(psQIF); + return iRet; +} + + /* destructor for the queue object */ rsRetVal queueDestruct(queue_t *pThis) { @@ -598,6 +665,13 @@ rsRetVal queueDestruct(queue_t *pThis) pThis->pWrkThrds = NULL; } + /* now check if we need to persist the queue */ + if(pThis->bImmediateShutdown) { + CHKiRet_Hdlr(queuePersist(pThis)) { + dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet); + } + } + /* ... then free resources */ pthread_mutex_destroy(pThis->mut); free(pThis->mut); @@ -608,6 +682,9 @@ rsRetVal queueDestruct(queue_t *pThis) /* type-specific destructor */ iRet = pThis->qDestruct(pThis); + if(pThis->pszFilePrefix != NULL) + free(pThis->pszFilePrefix); + /* and finally delete the queue objet itself */ free(pThis); @@ -624,9 +701,18 @@ rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) { DEFiRet; + + if(pThis->pszFilePrefix != NULL) + free(pThis->pszFilePrefix); + + if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); + pThis->lenFilePrefix = iLenPrefix; + if(pThis->qType == QUEUETYPE_DISK) { - CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix)); - CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix)); + CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix)); + CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix)); } finalize_it: return iRet; diff --git a/queue.h b/queue.h index 56a9daaa..56841faf 100644 --- a/queue.h +++ b/queue.h @@ -69,7 +69,7 @@ typedef enum { typedef struct qWrkThrd_s { pthread_t thrdID; /* thread ID */ - volatile qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ + qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ } qWrkThrd_t; /* type for queue worker threads */ /* the queue object */ @@ -128,5 +128,6 @@ rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*)); PROTOTYPEpropSetMeth(queue, bImmediateShutdown, int); +#define queueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ diff --git a/stream.c b/stream.c index 740ca9fb..7fb6aa74 100644 --- a/stream.c +++ b/stream.c @@ -47,7 +47,6 @@ /* static data */ DEFobjStaticHelpers - /* methods */ /* first, we define type-specific handlers. The provide a generic functionality, @@ -74,8 +73,13 @@ static rsRetVal strmOpenFile(strm_t *pThis) if(pThis->pszFName == NULL) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); - CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, - pThis->pszFName, pThis->lenFilePrefix, pThis->iCurrFNum, pThis->iFileNumDigits)); + if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) { + CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, + pThis->pszFName, pThis->lenFName, pThis->iCurrFNum, pThis->iFileNumDigits)); + } else { + if((pThis->pszCurrFName = (uchar*) strdup((char*) pThis->pszFName)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } /* compute which flags we need to provide to open */ if(pThis->tOperationsMode == STREAMMODE_READ) @@ -274,7 +278,7 @@ BEGINobjConstruct(strm) pThis->iCurrFNum = 1; pThis->fd = -1; pThis->iUngetC = -1; - pThis->sType = STREAMTYPE_FILE; + pThis->sType = STREAMTYPE_FILE_SINGLE; pThis->sIOBufSize = glblGetIOBufSize(); pThis->tOpenMode = 0600; ENDobjConstruct(strm) @@ -305,7 +309,10 @@ rsRetVal strmDestruct(strm_t *pThis) { DEFiRet; - assert(pThis != NULL); + ISOBJ_TYPE_assert(pThis, strm); + + if(pThis->tOperationsMode == STREAMMODE_WRITE && pThis->iBufPtr > 0) + strmFlush(pThis); /* ... then free resources */ if(pThis->fd != -1) @@ -362,8 +369,8 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(strmOpenFile(pThis)); iWritten = write(pThis->fd, pBuf, lenBuf); - dbgprintf("Stream 0x%lx: write wrote %d bytes, errno: %d, err %s\n", (unsigned long) pThis, - iWritten, errno, strerror(errno)); + dbgprintf("Stream 0x%lx: write wrote %d bytes to file %d, errno: %d, err %s\n", (unsigned long) pThis, + iWritten, pThis->fd, errno, strerror(errno)); /* TODO: handle error case -- rgerhards, 2008-01-07 */ /* Now indicate buffer empty again. We do this in any case, because there @@ -376,7 +383,9 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) */ pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; - CHKiRet(strmCheckNextOutputFile(pThis)); + + if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) + CHKiRet(strmCheckNextOutputFile(pThis)); finalize_it: pThis->iBufPtr = 0; /* see comment above */ @@ -490,6 +499,7 @@ DEFpropSetMeth(strm, iMaxFileSize, int) DEFpropSetMeth(strm, iFileNumDigits, int) DEFpropSetMeth(strm, tOperationsMode, int) DEFpropSetMeth(strm, tOpenMode, mode_t) +DEFpropSetMeth(strm, sType, strmType_t); rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) { @@ -505,21 +515,21 @@ rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) * rgerhards, 2008-01-09 */ rsRetVal -strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix) +strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName) { DEFiRet; assert(pThis != NULL); - assert(pszPrefix != NULL); + assert(pszName != NULL); - if(iLenPrefix < 1) + if(iLenName < 1) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); - if((pThis->pszFName = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL) + if((pThis->pszFName = malloc(sizeof(uchar) * iLenName + 1)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - memcpy(pThis->pszFName, pszPrefix, iLenPrefix + 1); /* always think about the \0! */ - pThis->lenFilePrefix = iLenPrefix; + memcpy(pThis->pszFName, pszName, iLenName + 1); /* always think about the \0! */ + pThis->lenFName = iLenName; finalize_it: return iRet; @@ -616,6 +626,7 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) CHKiRet(objBeginSerialize(pStrm, (obj_t*) pThis)); +dbgprintf("strmSerialize in %p\n", pThis); i = pThis->sType; objSerializeSCALAR_VAR(pStrm, sType, INT, i); objSerializeSCALAR(pStrm, iCurrFNum, INT); @@ -624,7 +635,8 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) objSerializeSCALAR_VAR(pStrm, tOperationsMode, INT, i); i = pThis->tOpenMode; objSerializeSCALAR_VAR(pStrm, tOpenMode, INT, i); - i = (long) pThis->iMaxFileSize; + l = (long) pThis->iMaxFileSize; +dbgprintf("max file size %ld, l: %ld\n", pThis->iMaxFileSize, l); objSerializeSCALAR_VAR(pStrm, iMaxFileSize, LONG, l); objSerializeSCALAR(pStrm, iMaxFiles, INT); objSerializeSCALAR(pStrm, iFileNumDigits, INT); @@ -632,6 +644,7 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) CHKiRet(objEndSerialize(pStrm)); finalize_it: +dbgprintf("strmSerialize out, iret %d\n", iRet); return iRet; } diff --git a/stream.h b/stream.h index ac02e66c..b1b93355 100644 --- a/stream.h +++ b/stream.h @@ -49,7 +49,8 @@ /* stream types */ typedef enum { - STREAMTYPE_FILE = 0 + STREAMTYPE_FILE_SINGLE = 0, + STREAMTYPE_FILE_CIRCULAR = 1 } strmType_t; typedef enum { @@ -65,7 +66,7 @@ typedef struct strm_s { /* descriptive properties */ int iCurrFNum;/* current file number (NOT descriptor, but the number in the file name!) */ uchar *pszFName; /* prefix for generated filenames */ - int lenFilePrefix; + int lenFName; strmMode_t tOperationsMode; mode_t tOpenMode; size_t iMaxFileSize;/* maximum size a file may grow to */ @@ -91,13 +92,13 @@ rsRetVal strmConstruct(strm_t **ppThis); rsRetVal strmConstructFinalize(strm_t __attribute__((unused)) *pThis); rsRetVal strmDestruct(strm_t *pThis); rsRetVal strmSetMaxFileSize(strm_t *pThis, size_t iMaxFileSize); -rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix); +rsRetVal strmSetFileName(strm_t *pThis, uchar *pszName, size_t iLenName); rsRetVal strmReadChar(strm_t *pThis, uchar *pC); rsRetVal strmUnreadChar(strm_t *pThis, uchar c); rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); rsRetVal strmWriteChar(strm_t *pThis, uchar c); rsRetVal strmWriteLong(strm_t *pThis, long i); -rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix); +rsRetVal strmSetFName(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir); rsRetVal strmFlush(strm_t *pThis); rsRetVal strmRecordBegin(strm_t *pThis); @@ -110,5 +111,6 @@ PROTOTYPEpropSetMeth(strm, iMaxFiles, int); PROTOTYPEpropSetMeth(strm, iFileNumDigits, int); PROTOTYPEpropSetMeth(strm, tOperationsMode, int); PROTOTYPEpropSetMeth(strm, tOpenMode, mode_t); +PROTOTYPEpropSetMeth(strm, sType, strmType_t); #endif /* #ifndef STREAM_H_INCLUDED */ -- cgit