From abdcea7d8f0eda058a6c6719cf24955878279d7c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sun, 13 Jan 2008 15:47:41 +0000 Subject: support for reading back persistet queue information completed --- stream.c | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 56 insertions(+), 8 deletions(-) (limited to 'stream.c') diff --git a/stream.c b/stream.c index 59067041..eb9fb960 100644 --- a/stream.c +++ b/stream.c @@ -70,6 +70,7 @@ static rsRetVal strmOpenFile(strm_t *pThis) if(pThis->fd != -1) ABORT_FINALIZE(RS_RET_OK); +dbgprintf("strmOpenFile actual open %p, iFileNumDigits: %d\n", pThis, pThis->iFileNumDigits); if(pThis->pszFName == NULL) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); @@ -85,12 +86,13 @@ static rsRetVal strmOpenFile(strm_t *pThis) if(pThis->tOperationsMode == STREAMMODE_READ) iFlags = O_RDONLY; else - iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND; + //iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND; + iFlags = O_WRONLY | O_CREAT; pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); if(pThis->fd == -1) { int ierrnoSave = errno; - dbgprintf("Stream 0x%lx: open error %d\n", (unsigned long) pThis, errno); + dbgprintf("Stream 0x%lx: open error %d, file '%s'\n", (unsigned long) pThis, errno, pThis->pszCurrFName); if(ierrnoSave == ENOENT) ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); else @@ -185,6 +187,7 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) /* DEV debug only: dbgprintf("strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */ if(pThis->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */ *pC = pThis->iUngetC; + ++pThis->iCurrOffs; /* one more octet read */ pThis->iUngetC = -1; ABORT_FINALIZE(RS_RET_OK); } @@ -220,6 +223,8 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) } *pC = pThis->pIOBuf[pThis->iBufPtr++]; + ++pThis->iCurrOffs; /* one more octet read */ +//dbgprintf("ReadChar: read %c, offset %d\n", *pC, pThis->iCurrOffs); finalize_it: return iRet; @@ -235,6 +240,10 @@ rsRetVal strmUnreadChar(strm_t *pThis, uchar c) assert(pThis != NULL); assert(pThis->iUngetC == -1); pThis->iUngetC = c; + --pThis->iCurrOffs; /* one less octet read - NOTE: this can cause problems if we got a file change + and immediately do an unread and the file is on a buffer boundary and the stream is then persisted. + With the queue, this can not happen as an Unread is only done on record begin, which is never split + accross files. For other cases we accept the very remote risk. -- rgerhards, 2008-01-12 */ return RS_RET_OK; } @@ -402,6 +411,7 @@ finalize_it: return iRet; } + /* flush stream output buffer to persistent storage. This can be called at any time * and is automatically called when the output buffer is full. * rgerhards, 2008-01-10 @@ -413,7 +423,7 @@ rsRetVal strmFlush(strm_t *pThis) assert(pThis != NULL); dbgprintf("Stream 0x%lx: flush file %d, buflen %ld\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr); - if(pThis->iBufPtr > 0) { + if(pThis->tOperationsMode == STREAMMODE_WRITE && pThis->iBufPtr > 0) { iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); } @@ -421,6 +431,45 @@ rsRetVal strmFlush(strm_t *pThis) } +/* seek a stream to a specific location. Pending writes are flushed, read data + * is invalidated. + * rgerhards, 2008-01-12 + */ +static rsRetVal strmSeek(strm_t *pThis, off_t offs) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + + if(pThis->fd == -1) + strmOpenFile(pThis); + else + strmFlush(pThis); + int i; + dbgprintf("Stream 0x%lx: seek file %d, pos %ld\n", (unsigned long) pThis, pThis->fd, offs); + i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error! +dbgprintf("seek(%d, %ld): %d\n", pThis->fd, offs, i); + pThis->iCurrOffs = offs; /* we are now at *this* offset */ + pThis->iBufPtr = 0; /* buffer invalidated */ + + return iRet; +} + + +/* seek to current offset. This is primarily a helper to readjust the OS file + * pointer after a strm object has been deserialized. + */ +rsRetVal strmSeekCurrOffs(strm_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + + iRet = strmSeek(pThis, pThis->iCurrOffs); + return iRet; +} + + /* write a *single* character to a stream object -- rgerhards, 2008-01-10 */ rsRetVal strmWriteChar(strm_t *pThis, uchar c) @@ -514,6 +563,7 @@ rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) { pThis->iMaxFiles = iNewVal; pThis->iFileNumDigits = getNumberDigits(iNewVal); +dbgprintf("strmSetiMaxFiles %p val %d, digits %d\n", pThis, iNewVal, pThis->iFileNumDigits); return RS_RET_OK; } @@ -633,14 +683,12 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) ISOBJ_TYPE_assert(pThis, strm); ISOBJ_TYPE_assert(pStrm, strm); -dbgprintf("strmSerialize 1\n"); + strmFlush(pThis); CHKiRet(objBeginSerialize(pStrm, (obj_t*) pThis)); -dbgprintf("strmSerialize 2\n"); objSerializeSCALAR(pStrm, iCurrFNum, INT); objSerializePTR(pStrm, pszFName, PSZ); objSerializeSCALAR(pStrm, iMaxFiles, INT); - objSerializeSCALAR(pStrm, iFileNumDigits, INT); objSerializeSCALAR(pStrm, bDeleteOnClose, INT); i = pThis->sType; @@ -662,11 +710,11 @@ dbgprintf("strmSerialize 2\n"); CHKiRet(objEndSerialize(pStrm)); finalize_it: -dbgprintf("strmSerialize out %d\n", iRet); return iRet; } +#include "stringbuf.h" /* This function can be used as a generic way to set properties. * rgerhards, 2008-01-11 @@ -698,7 +746,7 @@ rsRetVal strmSetProperty(strm_t *pThis, property_t *pProp) } else if(isProp("iFileNumDigits")) { CHKiRet(strmSetiFileNumDigits(pThis, pProp->val.vInt)); } else if(isProp("bDeleteOnClose")) { - CHKiRet(strmSetiFileNumDigits(pThis, pProp->val.vInt)); + CHKiRet(strmSetbDeleteOnClose(pThis, pProp->val.vInt)); } finalize_it: -- cgit