summaryrefslogtreecommitdiffstats
path: root/stream.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-10 09:08:13 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-10 09:08:13 +0000
commit79040541b70e95f0f00add4c9cafa08e9c411d79 (patch)
tree0092425a37b05c5035a3529d5441eae7b42a54ad /stream.c
parent24c125cfc3032e6269e6e5de91c72c91508adde0 (diff)
downloadrsyslog-79040541b70e95f0f00add4c9cafa08e9c411d79.tar.gz
rsyslog-79040541b70e95f0f00add4c9cafa08e9c411d79.tar.xz
rsyslog-79040541b70e95f0f00add4c9cafa08e9c411d79.zip
added buffered output to stream class
Diffstat (limited to 'stream.c')
-rw-r--r--stream.c116
1 files changed, 96 insertions, 20 deletions
diff --git a/stream.c b/stream.c
index 03431439..89e6e70d 100644
--- a/stream.c
+++ b/stream.c
@@ -60,11 +60,13 @@ DEFobjStaticHelpers
* It is OK to call this function when the stream is already open. In that
* case, it returns immediately with RS_RET_OK
*/
-rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode)
+static rsRetVal strmOpenFile(strm_t *pThis)
{
DEFiRet;
+ int iFlags;
assert(pThis != NULL);
+ assert(pThis->tOperationsMode == STREAMMODE_READ || pThis->tOperationsMode == STREAMMODE_WRITE);
if(pThis->fd != -1)
ABORT_FINALIZE(RS_RET_OK);
@@ -72,19 +74,21 @@ rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode)
if(pThis->pszFilePrefix == NULL)
ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
- if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */
- if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- pThis->iBufPtrMax = 0; /* results in immediate read request */
- }
-
CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir,
pThis->pszFilePrefix, pThis->lenFilePrefix, pThis->iCurrFNum, pThis->iFileNumDigits));
- pThis->fd = open((char*)pThis->pszCurrFName, flags, mode); // TODO: open modes!
+ /* compute which flags we need to provide to open */
+ if(pThis->tOperationsMode == STREAMMODE_READ)
+ iFlags = O_RDONLY;
+ else
+ iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND;
+
+ pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode);
pThis->iCurrOffs = 0;
- dbgprintf("Stream 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pThis->pszCurrFName, flags, pThis->fd);
+ dbgprintf("Stream 0x%lx: opened file '%s' for %s (0x%x) as %d\n", (unsigned long) pThis,
+ pThis->pszCurrFName, (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE",
+ iFlags, pThis->fd);
finalize_it:
return iRet;
@@ -102,6 +106,9 @@ static rsRetVal strmCloseFile(strm_t *pThis)
assert(pThis != NULL);
dbgprintf("Stream 0x%lx: closing file %d\n", (unsigned long) pThis, pThis->fd);
+ if(pThis->tOperationsMode == STREAMMODE_WRITE)
+ strmFlush(pThis);
+
close(pThis->fd); // TODO: error check
pThis->fd = -1;
@@ -173,9 +180,10 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
bRun = 1;
while(bRun) {
/* first check if we need to (re)open the file (we may have switched to a new one!) */
- CHKiRet(strmOpenFile(pThis, O_RDONLY, 0600)); // TODO: open modes!
+ CHKiRet(strmOpenFile(pThis));
pThis->iBufPtrMax = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize);
- dbgprintf("strmReadChar read %d bytes from file %d\n", pThis->iBufPtrMax, pThis->fd);
+ dbgprintf("Stream 0x%lx: read %d bytes from file %d\n", (unsigned long) pThis,
+ pThis->iBufPtrMax, pThis->fd);
if(pThis->iBufPtrMax == 0) {
if(pThis->iMaxFiles == 0)
ABORT_FINALIZE(RS_RET_EOF);
@@ -269,15 +277,27 @@ BEGINobjConstruct(strm)
pThis->iUngetC = -1;
pThis->sType = STREAMTYPE_FILE;
pThis->sIOBufSize = glblGetIOBufSize();
+ pThis->tOpenMode = 0600;
ENDobjConstruct(strm)
-/* ConstructionFinalizer - currently provided just to comply to the interface
- * definiton. -- rgerhards, 2008-01-09
+/* ConstructionFinalizer
+ * rgerhards, 2008-01-09
*/
-rsRetVal strmConstructFinalize(strm_t __attribute__((unused)) *pThis)
+rsRetVal strmConstructFinalize(strm_t *pThis)
{
- return RS_RET_OK;
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+ if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */
+ if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ pThis->iBufPtrMax = 0; /* results in immediate read request */
+ }
+
+finalize_it:
+ return iRet;
}
@@ -301,20 +321,19 @@ rsRetVal strmDestruct(strm_t *pThis)
return iRet;
}
-
-/* write memory buffer to a stream object
+/* write memory buffer to a stream object.
*/
-rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
DEFiRet;
int iWritten;
-dbgprintf("strmWrite()\n");
+dbgprintf("strmWriteInternal()\n");
assert(pThis != NULL);
assert(pBuf != NULL);
if(pThis->fd == -1)
- CHKiRet(strmOpenFile(pThis, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes!
+ CHKiRet(strmOpenFile(pThis));
iWritten = write(pThis->fd, pBuf, lenBuf);
dbgprintf("Stream 0x%lx: write wrote %d bytes, errno: %d, err %s\n", (unsigned long) pThis,
@@ -332,12 +351,69 @@ finalize_it:
return iRet;
}
+/* flush stream output buffer to persistent storage. This can be called at any time
+ * and is automatically called when the output buffer is full.
+ * rgerhards, 2008-01-10
+ */
+rsRetVal strmFlush(strm_t *pThis)
+{
+ DEFiRet;
+
+ dbgprintf("Stream 0x%lx: flush file %d, buflen %d\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr);
+ assert(pThis != NULL);
+
+ if(pThis->iBufPtr > 0) {
+ iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr);
+ /* Now indicate buffer empty again. We do this in any case, because there
+ * is no way we could react more intelligently to an error during write.
+ * We have not used CHKiRet(), as that would have presented some sequence
+ * problems, which are not necessary to look at given what we do.
+ */
+ pThis->iBufPtr = 0;
+ }
+
+ return iRet;
+}
+
+
+/* write memory buffer to a stream object
+ */
+rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ DEFiRet;
+
+dbgprintf("strmWrite()\n");
+ assert(pThis != NULL);
+ assert(pBuf != NULL);
+
+ /* if the string does not fit into the current buffer, we flush that buffer and
+ * then do the write ourselfs. So even if we have data that is of multiple
+ * buffer lengths, we will write it with a single write operation.
+ * rgerhards, 2008-01-10
+ */
+ if(pThis->iBufPtr + lenBuf >= pThis->sIOBufSize) {
+dbgprintf("strmWrite() uses direct write\n");
+ CHKiRet(strmFlush(pThis));
+ CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf));
+ } else {
+dbgprintf("strmWrite() uses buffered write\n");
+ /* we have space, so we simply copy over the string */
+ memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf);
+ pThis->iBufPtr += lenBuf;
+ }
+
+finalize_it:
+ return iRet;
+}
+
/* property set methods */
/* simple ones first */
DEFpropSetMeth(strm, bDeleteOnClose, int)
DEFpropSetMeth(strm, iMaxFileSize, int)
DEFpropSetMeth(strm, iFileNumDigits, int)
+DEFpropSetMeth(strm, tOperationsMode, int);
+DEFpropSetMeth(strm, tOpenMode, mode_t);
rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal)
{