summaryrefslogtreecommitdiffstats
path: root/stream.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-10 13:09:43 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-10 13:09:43 +0000
commitfa859275c66afc639cd3d2ea8a74cfdc63be8b99 (patch)
tree81d3e957f53a7a8d5b4d973d693a026ae5d087ad /stream.c
parent8a861afefac51ab495e5ba7946fe4fd986f0dd3c (diff)
downloadrsyslog-fa859275c66afc639cd3d2ea8a74cfdc63be8b99.tar.gz
rsyslog-fa859275c66afc639cd3d2ea8a74cfdc63be8b99.tar.xz
rsyslog-fa859275c66afc639cd3d2ea8a74cfdc63be8b99.zip
- added write functions for several types to stream class
- changed objSerialize methods to work directly on the stream class
Diffstat (limited to 'stream.c')
-rw-r--r--stream.c132
1 files changed, 103 insertions, 29 deletions
diff --git a/stream.c b/stream.c
index 1ed46185..6f5e058a 100644
--- a/stream.c
+++ b/stream.c
@@ -104,6 +104,7 @@ static rsRetVal strmCloseFile(strm_t *pThis)
DEFiRet;
assert(pThis != NULL);
+ assert(pThis->fd != -1);
dbgprintf("Stream 0x%lx: closing file %d\n", (unsigned long) pThis, pThis->fd);
if(pThis->tOperationsMode == STREAMMODE_WRITE)
@@ -121,6 +122,7 @@ static rsRetVal strmCloseFile(strm_t *pThis)
pThis->pszCurrFName = NULL;
}
+dbgprintf("exit strmCloseFile, fd: %d\n", pThis->fd);
return iRet;
}
@@ -128,12 +130,15 @@ static rsRetVal strmCloseFile(strm_t *pThis)
/* switch to next strm file
* This method must only be called if we are in a multi-file mode!
*/
-rsRetVal strmNextFile(strm_t *pThis)
+static rsRetVal
+strmNextFile(strm_t *pThis)
{
DEFiRet;
+dbgprintf("strmNextFile, old num %d\n", pThis->iCurrFNum);
assert(pThis != NULL);
assert(pThis->iMaxFiles != 0);
+ assert(pThis->fd != -1);
CHKiRet(strmCloseFile(pThis));
@@ -161,6 +166,7 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
{
DEFiRet;
int bRun;
+ long iLenRead;
assert(pThis != NULL);
assert(pC != NULL);
@@ -179,10 +185,10 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
while(bRun) {
/* first check if we need to (re)open the file (we may have switched to a new one!) */
CHKiRet(strmOpenFile(pThis));
- pThis->iBufPtrMax = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize);
- dbgprintf("Stream 0x%lx: read %d bytes from file %d\n", (unsigned long) pThis,
- pThis->iBufPtrMax, pThis->fd);
- if(pThis->iBufPtrMax == 0) {
+ iLenRead = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize);
+ dbgprintf("Stream 0x%lx: read %ld bytes from file %d\n", (unsigned long) pThis,
+ iLenRead, pThis->fd);
+ if(iLenRead == 0) {
if(pThis->iMaxFiles == 0)
ABORT_FINALIZE(RS_RET_EOF);
else {
@@ -191,10 +197,12 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
dbgprintf("Stream 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->fd);
CHKiRet(strmNextFile(pThis));
}
- } else if(pThis->iBufPtrMax < 0)
+ } else if(iLenRead < 0)
ABORT_FINALIZE(RS_RET_IO_ERROR);
- else
+ else { /* good read */
+ pThis->iBufPtrMax = iLenRead;
bRun = 0; /* exit loop */
+ }
}
/* if we reach this point, we had a good read */
pThis->iBufPtr = 0;
@@ -317,11 +325,16 @@ rsRetVal strmDestruct(strm_t *pThis)
/* check if we need to open a new file (in output mode only).
* The decision is based on file size AND record delimition state.
+ * This method may also be called on a closed file, in which case
+ * it immediately returns.
*/
static rsRetVal strmCheckNextOutputFile(strm_t *pThis)
{
DEFiRet;
+ if(pThis->fd == -1)
+ FINALIZE;
+
if(pThis->iCurrOffs >= pThis->iMaxFileSize) {
dbgprintf("Stream 0x%lx: max file size %ld reached for %d, now %ld - starting new file\n",
(unsigned long) pThis, (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs);
@@ -333,15 +346,19 @@ finalize_it:
}
/* write memory buffer to a stream object.
+ * To support direct writes of large objects, this method may be called
+ * with a buffer pointing to some region other than the stream buffer itself.
+ * However, in that case the stream buffer must be empty (strmFlush() has to
+ * be called before), because we would otherwise mess up with the sequence
+ * inside the stream. -- rgerhards, 2008-01-10
*/
static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
DEFiRet;
int iWritten;
-dbgprintf("strmWriteInternal()\n");
assert(pThis != NULL);
- assert(pBuf != NULL);
+ assert(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
if(pThis->fd == -1)
CHKiRet(strmOpenFile(pThis));
@@ -351,10 +368,21 @@ dbgprintf("strmWriteInternal()\n");
iWritten, errno, strerror(errno));
/* TODO: handle error case -- rgerhards, 2008-01-07 */
+ /* Now indicate buffer empty again. We do this in any case, because there
+ * is no way we could react more intelligently to an error during write.
+ * This MUST be done BEFORE strCheckNextOutputFile(), otherwise we have an
+ * endless loop. We reset the buffer pointer also in finalize_it - this is
+ * necessary if we run into problems. Not resetting it would again cause an
+ * endless loop. So it is better to loose some data (which also justifies
+ * duplicating that code, too...) -- rgerhards, 2008-01-10
+ */
+ pThis->iBufPtr = 0;
pThis->iCurrOffs += iWritten;
CHKiRet(strmCheckNextOutputFile(pThis));
finalize_it:
+ pThis->iBufPtr = 0; /* see comment above */
+
return iRet;
}
@@ -366,47 +394,90 @@ rsRetVal strmFlush(strm_t *pThis)
{
DEFiRet;
- dbgprintf("Stream 0x%lx: flush file %d, buflen %d\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr);
assert(pThis != NULL);
+ dbgprintf("Stream 0x%lx: flush file %d, buflen %ld\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr);
if(pThis->iBufPtr > 0) {
iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr);
- /* Now indicate buffer empty again. We do this in any case, because there
- * is no way we could react more intelligently to an error during write.
- * We have not used CHKiRet(), as that would have presented some sequence
- * problems, which are not necessary to look at given what we do.
- */
- pThis->iBufPtr = 0;
}
return iRet;
}
+/* write a *single* character to a stream object -- rgerhards, 2008-01-10
+ */
+rsRetVal strmWriteChar(strm_t *pThis, uchar c)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+ /* if the buffer is full, we need to flush before we can write */
+ if(pThis->iBufPtr == pThis->sIOBufSize) {
+ CHKiRet(strmFlush(pThis));
+ }
+ /* we now always have space for one character, so we simply copy it */
+ *(pThis->pIOBuf + pThis->iBufPtr) = c;
+ pThis->iBufPtr++;
+
+finalize_it:
+ return iRet;
+}
+
+
+/* write an integer value (actually a long) to a stream object */
+rsRetVal strmWriteLong(strm_t *pThis, long i)
+{
+ DEFiRet;
+ uchar szBuf[32];
+
+ assert(pThis != NULL);
+
+ CHKiRet(srUtilItoA((char*)szBuf, sizeof(szBuf), i));
+ CHKiRet(strmWrite(pThis, szBuf, strlen((char*)szBuf)));
+
+finalize_it:
+ return iRet;
+}
+
+
/* write memory buffer to a stream object
*/
rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
DEFiRet;
+ size_t iPartial;
-dbgprintf("strmWrite()\n");
assert(pThis != NULL);
assert(pBuf != NULL);
- /* if the string does not fit into the current buffer, we flush that buffer and
- * then do the write ourselfs. So even if we have data that is of multiple
- * buffer lengths, we will write it with a single write operation.
- * rgerhards, 2008-01-10
- */
- if(pThis->iBufPtr + lenBuf >= pThis->sIOBufSize) {
-dbgprintf("strmWrite() uses direct write\n");
- CHKiRet(strmFlush(pThis));
+ /* check if the to-be-written data is larger than our buffer size */
+ if(lenBuf >= pThis->sIOBufSize) {
+ /* it is - so we do a direct write, that is most efficient.
+ * TODO: is it really? think about disk block sizes!
+ */
+ CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */
CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf));
} else {
-dbgprintf("strmWrite() uses buffered write\n");
- /* we have space, so we simply copy over the string */
- memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf);
- pThis->iBufPtr += lenBuf;
+ /* data fits into a buffer - we just need to see if it
+ * fits into the current buffer...
+ */
+ if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) {
+ /* nope, so we must split it */
+ iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
+ if(iPartial > 0) { /* the buffer was exactly full, can not write anything! */
+ memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial);
+ pThis->iBufPtr += iPartial;
+ }
+ CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
+ memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial);
+ pThis->iBufPtr = lenBuf - iPartial;
+ } else {
+ /* we have space, so we simply copy over the string */
+ memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf);
+ pThis->iBufPtr += lenBuf;
+ }
}
finalize_it:
@@ -511,6 +582,7 @@ rsRetVal strmRecordBegin(strm_t *pThis)
assert(pThis != NULL);
assert(pThis->bInRecord == 0);
pThis->bInRecord = 1;
+dbgprintf("strmRecordBegin set \n");
return RS_RET_OK;
}
@@ -520,8 +592,10 @@ rsRetVal strmRecordEnd(strm_t *pThis)
assert(pThis != NULL);
assert(pThis->bInRecord == 1);
+dbgprintf("strmRecordEnd in %d\n", iRet);
pThis->bInRecord = 0;
iRet = strmCheckNextOutputFile(pThis); /* check if we need to switch files */
+dbgprintf("strmRecordEnd out %d\n", iRet);
return iRet;
}