diff options
-rw-r--r-- | queue.c | 5 | ||||
-rw-r--r-- | stream.c | 116 | ||||
-rw-r--r-- | stream.h | 15 |
3 files changed, 113 insertions, 23 deletions
@@ -205,12 +205,14 @@ static rsRetVal qConstructDisk(queue_t *pThis) CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite)); CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, pszSpoolDirectory, strlen((char*)pszSpoolDirectory))); CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); + CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite)); CHKiRet(strmConstruct(&pThis->tVars.disk.pRead)); CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); CHKiRet(strmSetDir(pThis->tVars.disk.pRead, pszSpoolDirectory, strlen((char*)pszSpoolDirectory))); CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000)); + CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ)); CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead)); finalize_it: @@ -240,10 +242,11 @@ static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) assert(pThis != NULL); - CHKiRet(strmOpenFile(pThis->tVars.disk.pWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes! + //CHKiRet(strmOpenFile(pThis->tVars.disk.pWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes! CHKiRet((objSerialize(pUsr))(pUsr, &pCStr)); CHKiRet(strmWrite(pThis->tVars.disk.pWrite, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr))); + CHKiRet(strmFlush(pThis->tVars.disk.pWrite)); finalize_it: return iRet; @@ -60,11 +60,13 @@ DEFobjStaticHelpers * It is OK to call this function when the stream is already open. In that * case, it returns immediately with RS_RET_OK */ -rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode) +static rsRetVal strmOpenFile(strm_t *pThis) { DEFiRet; + int iFlags; assert(pThis != NULL); + assert(pThis->tOperationsMode == STREAMMODE_READ || pThis->tOperationsMode == STREAMMODE_WRITE); if(pThis->fd != -1) ABORT_FINALIZE(RS_RET_OK); @@ -72,19 +74,21 @@ rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode) if(pThis->pszFilePrefix == NULL) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); - if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */ - if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - pThis->iBufPtrMax = 0; /* results in immediate read request */ - } - CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, pThis->pszFilePrefix, pThis->lenFilePrefix, pThis->iCurrFNum, pThis->iFileNumDigits)); - pThis->fd = open((char*)pThis->pszCurrFName, flags, mode); // TODO: open modes! + /* compute which flags we need to provide to open */ + if(pThis->tOperationsMode == STREAMMODE_READ) + iFlags = O_RDONLY; + else + iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND; + + pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); pThis->iCurrOffs = 0; - dbgprintf("Stream 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pThis->pszCurrFName, flags, pThis->fd); + dbgprintf("Stream 0x%lx: opened file '%s' for %s (0x%x) as %d\n", (unsigned long) pThis, + pThis->pszCurrFName, (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", + iFlags, pThis->fd); finalize_it: return iRet; @@ -102,6 +106,9 @@ static rsRetVal strmCloseFile(strm_t *pThis) assert(pThis != NULL); dbgprintf("Stream 0x%lx: closing file %d\n", (unsigned long) pThis, pThis->fd); + if(pThis->tOperationsMode == STREAMMODE_WRITE) + strmFlush(pThis); + close(pThis->fd); // TODO: error check pThis->fd = -1; @@ -173,9 +180,10 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) bRun = 1; while(bRun) { /* first check if we need to (re)open the file (we may have switched to a new one!) */ - CHKiRet(strmOpenFile(pThis, O_RDONLY, 0600)); // TODO: open modes! + CHKiRet(strmOpenFile(pThis)); pThis->iBufPtrMax = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize); - dbgprintf("strmReadChar read %d bytes from file %d\n", pThis->iBufPtrMax, pThis->fd); + dbgprintf("Stream 0x%lx: read %d bytes from file %d\n", (unsigned long) pThis, + pThis->iBufPtrMax, pThis->fd); if(pThis->iBufPtrMax == 0) { if(pThis->iMaxFiles == 0) ABORT_FINALIZE(RS_RET_EOF); @@ -269,15 +277,27 @@ BEGINobjConstruct(strm) pThis->iUngetC = -1; pThis->sType = STREAMTYPE_FILE; pThis->sIOBufSize = glblGetIOBufSize(); + pThis->tOpenMode = 0600; ENDobjConstruct(strm) -/* ConstructionFinalizer - currently provided just to comply to the interface - * definiton. -- rgerhards, 2008-01-09 +/* ConstructionFinalizer + * rgerhards, 2008-01-09 */ -rsRetVal strmConstructFinalize(strm_t __attribute__((unused)) *pThis) +rsRetVal strmConstructFinalize(strm_t *pThis) { - return RS_RET_OK; + DEFiRet; + + assert(pThis != NULL); + + if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */ + if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + pThis->iBufPtrMax = 0; /* results in immediate read request */ + } + +finalize_it: + return iRet; } @@ -301,20 +321,19 @@ rsRetVal strmDestruct(strm_t *pThis) return iRet; } - -/* write memory buffer to a stream object +/* write memory buffer to a stream object. */ -rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; int iWritten; -dbgprintf("strmWrite()\n"); +dbgprintf("strmWriteInternal()\n"); assert(pThis != NULL); assert(pBuf != NULL); if(pThis->fd == -1) - CHKiRet(strmOpenFile(pThis, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes! + CHKiRet(strmOpenFile(pThis)); iWritten = write(pThis->fd, pBuf, lenBuf); dbgprintf("Stream 0x%lx: write wrote %d bytes, errno: %d, err %s\n", (unsigned long) pThis, @@ -332,12 +351,69 @@ finalize_it: return iRet; } +/* flush stream output buffer to persistent storage. This can be called at any time + * and is automatically called when the output buffer is full. + * rgerhards, 2008-01-10 + */ +rsRetVal strmFlush(strm_t *pThis) +{ + DEFiRet; + + dbgprintf("Stream 0x%lx: flush file %d, buflen %d\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr); + assert(pThis != NULL); + + if(pThis->iBufPtr > 0) { + iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); + /* Now indicate buffer empty again. We do this in any case, because there + * is no way we could react more intelligently to an error during write. + * We have not used CHKiRet(), as that would have presented some sequence + * problems, which are not necessary to look at given what we do. + */ + pThis->iBufPtr = 0; + } + + return iRet; +} + + +/* write memory buffer to a stream object + */ +rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + +dbgprintf("strmWrite()\n"); + assert(pThis != NULL); + assert(pBuf != NULL); + + /* if the string does not fit into the current buffer, we flush that buffer and + * then do the write ourselfs. So even if we have data that is of multiple + * buffer lengths, we will write it with a single write operation. + * rgerhards, 2008-01-10 + */ + if(pThis->iBufPtr + lenBuf >= pThis->sIOBufSize) { +dbgprintf("strmWrite() uses direct write\n"); + CHKiRet(strmFlush(pThis)); + CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf)); + } else { +dbgprintf("strmWrite() uses buffered write\n"); + /* we have space, so we simply copy over the string */ + memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf); + pThis->iBufPtr += lenBuf; + } + +finalize_it: + return iRet; +} + /* property set methods */ /* simple ones first */ DEFpropSetMeth(strm, bDeleteOnClose, int) DEFpropSetMeth(strm, iMaxFileSize, int) DEFpropSetMeth(strm, iFileNumDigits, int) +DEFpropSetMeth(strm, tOperationsMode, int); +DEFpropSetMeth(strm, tOpenMode, mode_t); rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) { @@ -52,6 +52,12 @@ typedef enum { STREAMTYPE_FILE = 0 } strmType_t; +typedef enum { + STREAMMMODE_INVALID = 0, + STREAMMODE_READ = 1, + STREAMMODE_WRITE = 2 +} strmMode_t; + /* The strm_t data structure */ typedef struct strm_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ @@ -63,8 +69,11 @@ typedef struct strm_s { int lenDir; uchar *pszFilePrefix; /* prefix for generated filenames */ int lenFilePrefix; + strmMode_t tOperationsMode; + mode_t tOpenMode; size_t iCurrOffs;/* current offset */ uchar *pIOBuf; /* io Buffer */ + size_t sIOBufSize;/* size of IO buffer */ int iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */ int iBufPtr; /* pointer into current buffer */ int iUngetC; /* char set via UngetChar() call or -1 if none set */ @@ -74,7 +83,6 @@ typedef struct strm_s { int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */ int iMaxFiles; /* maximum number of files if a circular mode is in use */ int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */ - size_t sIOBufSize;/* size of IO buffer */ } strm_t; /* prototypes */ @@ -87,13 +95,16 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC); rsRetVal strmUnreadChar(strm_t *pThis, uchar c); rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); rsRetVal strmNextFile(strm_t *pThis); -rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode); +//rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode); rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir); +rsRetVal strmFlush(strm_t *pThis); PROTOTYPEObjClassInit(strm); PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int); PROTOTYPEpropSetMeth(strm, iMaxFileSize, int); PROTOTYPEpropSetMeth(strm, iMaxFiles, int); PROTOTYPEpropSetMeth(strm, iFileNumDigits, int); +PROTOTYPEpropSetMeth(strm, tOperationsMode, int); +PROTOTYPEpropSetMeth(strm, tOpenMode, mode_t); #endif /* #ifndef STREAM_H_INCLUDED */ |