summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--queue.c5
-rw-r--r--stream.c116
-rw-r--r--stream.h15
3 files changed, 113 insertions, 23 deletions
diff --git a/queue.c b/queue.c
index 272161a9..6ff4358f 100644
--- a/queue.c
+++ b/queue.c
@@ -205,12 +205,14 @@ static rsRetVal qConstructDisk(queue_t *pThis)
CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite));
CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, pszSpoolDirectory, strlen((char*)pszSpoolDirectory)));
CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
+ CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite));
CHKiRet(strmConstruct(&pThis->tVars.disk.pRead));
CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
CHKiRet(strmSetDir(pThis->tVars.disk.pRead, pszSpoolDirectory, strlen((char*)pszSpoolDirectory)));
CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000));
+ CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ));
CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead));
finalize_it:
@@ -240,10 +242,11 @@ static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
assert(pThis != NULL);
- CHKiRet(strmOpenFile(pThis->tVars.disk.pWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes!
+ //CHKiRet(strmOpenFile(pThis->tVars.disk.pWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes!
CHKiRet((objSerialize(pUsr))(pUsr, &pCStr));
CHKiRet(strmWrite(pThis->tVars.disk.pWrite, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr)));
+ CHKiRet(strmFlush(pThis->tVars.disk.pWrite));
finalize_it:
return iRet;
diff --git a/stream.c b/stream.c
index 03431439..89e6e70d 100644
--- a/stream.c
+++ b/stream.c
@@ -60,11 +60,13 @@ DEFobjStaticHelpers
* It is OK to call this function when the stream is already open. In that
* case, it returns immediately with RS_RET_OK
*/
-rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode)
+static rsRetVal strmOpenFile(strm_t *pThis)
{
DEFiRet;
+ int iFlags;
assert(pThis != NULL);
+ assert(pThis->tOperationsMode == STREAMMODE_READ || pThis->tOperationsMode == STREAMMODE_WRITE);
if(pThis->fd != -1)
ABORT_FINALIZE(RS_RET_OK);
@@ -72,19 +74,21 @@ rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode)
if(pThis->pszFilePrefix == NULL)
ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
- if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */
- if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- pThis->iBufPtrMax = 0; /* results in immediate read request */
- }
-
CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir,
pThis->pszFilePrefix, pThis->lenFilePrefix, pThis->iCurrFNum, pThis->iFileNumDigits));
- pThis->fd = open((char*)pThis->pszCurrFName, flags, mode); // TODO: open modes!
+ /* compute which flags we need to provide to open */
+ if(pThis->tOperationsMode == STREAMMODE_READ)
+ iFlags = O_RDONLY;
+ else
+ iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND;
+
+ pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode);
pThis->iCurrOffs = 0;
- dbgprintf("Stream 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pThis->pszCurrFName, flags, pThis->fd);
+ dbgprintf("Stream 0x%lx: opened file '%s' for %s (0x%x) as %d\n", (unsigned long) pThis,
+ pThis->pszCurrFName, (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE",
+ iFlags, pThis->fd);
finalize_it:
return iRet;
@@ -102,6 +106,9 @@ static rsRetVal strmCloseFile(strm_t *pThis)
assert(pThis != NULL);
dbgprintf("Stream 0x%lx: closing file %d\n", (unsigned long) pThis, pThis->fd);
+ if(pThis->tOperationsMode == STREAMMODE_WRITE)
+ strmFlush(pThis);
+
close(pThis->fd); // TODO: error check
pThis->fd = -1;
@@ -173,9 +180,10 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
bRun = 1;
while(bRun) {
/* first check if we need to (re)open the file (we may have switched to a new one!) */
- CHKiRet(strmOpenFile(pThis, O_RDONLY, 0600)); // TODO: open modes!
+ CHKiRet(strmOpenFile(pThis));
pThis->iBufPtrMax = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize);
- dbgprintf("strmReadChar read %d bytes from file %d\n", pThis->iBufPtrMax, pThis->fd);
+ dbgprintf("Stream 0x%lx: read %d bytes from file %d\n", (unsigned long) pThis,
+ pThis->iBufPtrMax, pThis->fd);
if(pThis->iBufPtrMax == 0) {
if(pThis->iMaxFiles == 0)
ABORT_FINALIZE(RS_RET_EOF);
@@ -269,15 +277,27 @@ BEGINobjConstruct(strm)
pThis->iUngetC = -1;
pThis->sType = STREAMTYPE_FILE;
pThis->sIOBufSize = glblGetIOBufSize();
+ pThis->tOpenMode = 0600;
ENDobjConstruct(strm)
-/* ConstructionFinalizer - currently provided just to comply to the interface
- * definiton. -- rgerhards, 2008-01-09
+/* ConstructionFinalizer
+ * rgerhards, 2008-01-09
*/
-rsRetVal strmConstructFinalize(strm_t __attribute__((unused)) *pThis)
+rsRetVal strmConstructFinalize(strm_t *pThis)
{
- return RS_RET_OK;
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+ if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */
+ if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ pThis->iBufPtrMax = 0; /* results in immediate read request */
+ }
+
+finalize_it:
+ return iRet;
}
@@ -301,20 +321,19 @@ rsRetVal strmDestruct(strm_t *pThis)
return iRet;
}
-
-/* write memory buffer to a stream object
+/* write memory buffer to a stream object.
*/
-rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
DEFiRet;
int iWritten;
-dbgprintf("strmWrite()\n");
+dbgprintf("strmWriteInternal()\n");
assert(pThis != NULL);
assert(pBuf != NULL);
if(pThis->fd == -1)
- CHKiRet(strmOpenFile(pThis, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes!
+ CHKiRet(strmOpenFile(pThis));
iWritten = write(pThis->fd, pBuf, lenBuf);
dbgprintf("Stream 0x%lx: write wrote %d bytes, errno: %d, err %s\n", (unsigned long) pThis,
@@ -332,12 +351,69 @@ finalize_it:
return iRet;
}
+/* flush stream output buffer to persistent storage. This can be called at any time
+ * and is automatically called when the output buffer is full.
+ * rgerhards, 2008-01-10
+ */
+rsRetVal strmFlush(strm_t *pThis)
+{
+ DEFiRet;
+
+ dbgprintf("Stream 0x%lx: flush file %d, buflen %d\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr);
+ assert(pThis != NULL);
+
+ if(pThis->iBufPtr > 0) {
+ iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr);
+ /* Now indicate buffer empty again. We do this in any case, because there
+ * is no way we could react more intelligently to an error during write.
+ * We have not used CHKiRet(), as that would have presented some sequence
+ * problems, which are not necessary to look at given what we do.
+ */
+ pThis->iBufPtr = 0;
+ }
+
+ return iRet;
+}
+
+
+/* write memory buffer to a stream object
+ */
+rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ DEFiRet;
+
+dbgprintf("strmWrite()\n");
+ assert(pThis != NULL);
+ assert(pBuf != NULL);
+
+ /* if the string does not fit into the current buffer, we flush that buffer and
+ * then do the write ourselfs. So even if we have data that is of multiple
+ * buffer lengths, we will write it with a single write operation.
+ * rgerhards, 2008-01-10
+ */
+ if(pThis->iBufPtr + lenBuf >= pThis->sIOBufSize) {
+dbgprintf("strmWrite() uses direct write\n");
+ CHKiRet(strmFlush(pThis));
+ CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf));
+ } else {
+dbgprintf("strmWrite() uses buffered write\n");
+ /* we have space, so we simply copy over the string */
+ memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf);
+ pThis->iBufPtr += lenBuf;
+ }
+
+finalize_it:
+ return iRet;
+}
+
/* property set methods */
/* simple ones first */
DEFpropSetMeth(strm, bDeleteOnClose, int)
DEFpropSetMeth(strm, iMaxFileSize, int)
DEFpropSetMeth(strm, iFileNumDigits, int)
+DEFpropSetMeth(strm, tOperationsMode, int);
+DEFpropSetMeth(strm, tOpenMode, mode_t);
rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal)
{
diff --git a/stream.h b/stream.h
index 1c6ad44a..2bb38e99 100644
--- a/stream.h
+++ b/stream.h
@@ -52,6 +52,12 @@ typedef enum {
STREAMTYPE_FILE = 0
} strmType_t;
+typedef enum {
+ STREAMMMODE_INVALID = 0,
+ STREAMMODE_READ = 1,
+ STREAMMODE_WRITE = 2
+} strmMode_t;
+
/* The strm_t data structure */
typedef struct strm_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
@@ -63,8 +69,11 @@ typedef struct strm_s {
int lenDir;
uchar *pszFilePrefix; /* prefix for generated filenames */
int lenFilePrefix;
+ strmMode_t tOperationsMode;
+ mode_t tOpenMode;
size_t iCurrOffs;/* current offset */
uchar *pIOBuf; /* io Buffer */
+ size_t sIOBufSize;/* size of IO buffer */
int iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */
int iBufPtr; /* pointer into current buffer */
int iUngetC; /* char set via UngetChar() call or -1 if none set */
@@ -74,7 +83,6 @@ typedef struct strm_s {
int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */
int iMaxFiles; /* maximum number of files if a circular mode is in use */
int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */
- size_t sIOBufSize;/* size of IO buffer */
} strm_t;
/* prototypes */
@@ -87,13 +95,16 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC);
rsRetVal strmUnreadChar(strm_t *pThis, uchar c);
rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
rsRetVal strmNextFile(strm_t *pThis);
-rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode);
+//rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode);
rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir);
+rsRetVal strmFlush(strm_t *pThis);
PROTOTYPEObjClassInit(strm);
PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int);
PROTOTYPEpropSetMeth(strm, iMaxFileSize, int);
PROTOTYPEpropSetMeth(strm, iMaxFiles, int);
PROTOTYPEpropSetMeth(strm, iFileNumDigits, int);
+PROTOTYPEpropSetMeth(strm, tOperationsMode, int);
+PROTOTYPEpropSetMeth(strm, tOpenMode, mode_t);
#endif /* #ifndef STREAM_H_INCLUDED */