diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-10 13:09:43 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-10 13:09:43 +0000 |
commit | fa859275c66afc639cd3d2ea8a74cfdc63be8b99 (patch) | |
tree | 81d3e957f53a7a8d5b4d973d693a026ae5d087ad /stream.c | |
parent | 8a861afefac51ab495e5ba7946fe4fd986f0dd3c (diff) | |
download | rsyslog-fa859275c66afc639cd3d2ea8a74cfdc63be8b99.tar.gz rsyslog-fa859275c66afc639cd3d2ea8a74cfdc63be8b99.tar.xz rsyslog-fa859275c66afc639cd3d2ea8a74cfdc63be8b99.zip |
- added write functions for several types to stream class
- changed objSerialize methods to work directly on the stream class
Diffstat (limited to 'stream.c')
-rw-r--r-- | stream.c | 132 |
1 files changed, 103 insertions, 29 deletions
@@ -104,6 +104,7 @@ static rsRetVal strmCloseFile(strm_t *pThis) DEFiRet; assert(pThis != NULL); + assert(pThis->fd != -1); dbgprintf("Stream 0x%lx: closing file %d\n", (unsigned long) pThis, pThis->fd); if(pThis->tOperationsMode == STREAMMODE_WRITE) @@ -121,6 +122,7 @@ static rsRetVal strmCloseFile(strm_t *pThis) pThis->pszCurrFName = NULL; } +dbgprintf("exit strmCloseFile, fd: %d\n", pThis->fd); return iRet; } @@ -128,12 +130,15 @@ static rsRetVal strmCloseFile(strm_t *pThis) /* switch to next strm file * This method must only be called if we are in a multi-file mode! */ -rsRetVal strmNextFile(strm_t *pThis) +static rsRetVal +strmNextFile(strm_t *pThis) { DEFiRet; +dbgprintf("strmNextFile, old num %d\n", pThis->iCurrFNum); assert(pThis != NULL); assert(pThis->iMaxFiles != 0); + assert(pThis->fd != -1); CHKiRet(strmCloseFile(pThis)); @@ -161,6 +166,7 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) { DEFiRet; int bRun; + long iLenRead; assert(pThis != NULL); assert(pC != NULL); @@ -179,10 +185,10 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) while(bRun) { /* first check if we need to (re)open the file (we may have switched to a new one!) */ CHKiRet(strmOpenFile(pThis)); - pThis->iBufPtrMax = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize); - dbgprintf("Stream 0x%lx: read %d bytes from file %d\n", (unsigned long) pThis, - pThis->iBufPtrMax, pThis->fd); - if(pThis->iBufPtrMax == 0) { + iLenRead = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize); + dbgprintf("Stream 0x%lx: read %ld bytes from file %d\n", (unsigned long) pThis, + iLenRead, pThis->fd); + if(iLenRead == 0) { if(pThis->iMaxFiles == 0) ABORT_FINALIZE(RS_RET_EOF); else { @@ -191,10 +197,12 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) dbgprintf("Stream 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->fd); CHKiRet(strmNextFile(pThis)); } - } else if(pThis->iBufPtrMax < 0) + } else if(iLenRead < 0) ABORT_FINALIZE(RS_RET_IO_ERROR); - else + else { /* good read */ + pThis->iBufPtrMax = iLenRead; bRun = 0; /* exit loop */ + } } /* if we reach this point, we had a good read */ pThis->iBufPtr = 0; @@ -317,11 +325,16 @@ rsRetVal strmDestruct(strm_t *pThis) /* check if we need to open a new file (in output mode only). * The decision is based on file size AND record delimition state. + * This method may also be called on a closed file, in which case + * it immediately returns. */ static rsRetVal strmCheckNextOutputFile(strm_t *pThis) { DEFiRet; + if(pThis->fd == -1) + FINALIZE; + if(pThis->iCurrOffs >= pThis->iMaxFileSize) { dbgprintf("Stream 0x%lx: max file size %ld reached for %d, now %ld - starting new file\n", (unsigned long) pThis, (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs); @@ -333,15 +346,19 @@ finalize_it: } /* write memory buffer to a stream object. + * To support direct writes of large objects, this method may be called + * with a buffer pointing to some region other than the stream buffer itself. + * However, in that case the stream buffer must be empty (strmFlush() has to + * be called before), because we would otherwise mess up with the sequence + * inside the stream. -- rgerhards, 2008-01-10 */ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; int iWritten; -dbgprintf("strmWriteInternal()\n"); assert(pThis != NULL); - assert(pBuf != NULL); + assert(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->fd == -1) CHKiRet(strmOpenFile(pThis)); @@ -351,10 +368,21 @@ dbgprintf("strmWriteInternal()\n"); iWritten, errno, strerror(errno)); /* TODO: handle error case -- rgerhards, 2008-01-07 */ + /* Now indicate buffer empty again. We do this in any case, because there + * is no way we could react more intelligently to an error during write. + * This MUST be done BEFORE strCheckNextOutputFile(), otherwise we have an + * endless loop. We reset the buffer pointer also in finalize_it - this is + * necessary if we run into problems. Not resetting it would again cause an + * endless loop. So it is better to loose some data (which also justifies + * duplicating that code, too...) -- rgerhards, 2008-01-10 + */ + pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; CHKiRet(strmCheckNextOutputFile(pThis)); finalize_it: + pThis->iBufPtr = 0; /* see comment above */ + return iRet; } @@ -366,47 +394,90 @@ rsRetVal strmFlush(strm_t *pThis) { DEFiRet; - dbgprintf("Stream 0x%lx: flush file %d, buflen %d\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr); assert(pThis != NULL); + dbgprintf("Stream 0x%lx: flush file %d, buflen %ld\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr); if(pThis->iBufPtr > 0) { iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); - /* Now indicate buffer empty again. We do this in any case, because there - * is no way we could react more intelligently to an error during write. - * We have not used CHKiRet(), as that would have presented some sequence - * problems, which are not necessary to look at given what we do. - */ - pThis->iBufPtr = 0; } return iRet; } +/* write a *single* character to a stream object -- rgerhards, 2008-01-10 + */ +rsRetVal strmWriteChar(strm_t *pThis, uchar c) +{ + DEFiRet; + + assert(pThis != NULL); + + /* if the buffer is full, we need to flush before we can write */ + if(pThis->iBufPtr == pThis->sIOBufSize) { + CHKiRet(strmFlush(pThis)); + } + /* we now always have space for one character, so we simply copy it */ + *(pThis->pIOBuf + pThis->iBufPtr) = c; + pThis->iBufPtr++; + +finalize_it: + return iRet; +} + + +/* write an integer value (actually a long) to a stream object */ +rsRetVal strmWriteLong(strm_t *pThis, long i) +{ + DEFiRet; + uchar szBuf[32]; + + assert(pThis != NULL); + + CHKiRet(srUtilItoA((char*)szBuf, sizeof(szBuf), i)); + CHKiRet(strmWrite(pThis, szBuf, strlen((char*)szBuf))); + +finalize_it: + return iRet; +} + + /* write memory buffer to a stream object */ rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; + size_t iPartial; -dbgprintf("strmWrite()\n"); assert(pThis != NULL); assert(pBuf != NULL); - /* if the string does not fit into the current buffer, we flush that buffer and - * then do the write ourselfs. So even if we have data that is of multiple - * buffer lengths, we will write it with a single write operation. - * rgerhards, 2008-01-10 - */ - if(pThis->iBufPtr + lenBuf >= pThis->sIOBufSize) { -dbgprintf("strmWrite() uses direct write\n"); - CHKiRet(strmFlush(pThis)); + /* check if the to-be-written data is larger than our buffer size */ + if(lenBuf >= pThis->sIOBufSize) { + /* it is - so we do a direct write, that is most efficient. + * TODO: is it really? think about disk block sizes! + */ + CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf)); } else { -dbgprintf("strmWrite() uses buffered write\n"); - /* we have space, so we simply copy over the string */ - memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf); - pThis->iBufPtr += lenBuf; + /* data fits into a buffer - we just need to see if it + * fits into the current buffer... + */ + if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) { + /* nope, so we must split it */ + iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ + if(iPartial > 0) { /* the buffer was exactly full, can not write anything! */ + memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial); + pThis->iBufPtr += iPartial; + } + CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ + memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial); + pThis->iBufPtr = lenBuf - iPartial; + } else { + /* we have space, so we simply copy over the string */ + memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf); + pThis->iBufPtr += lenBuf; + } } finalize_it: @@ -511,6 +582,7 @@ rsRetVal strmRecordBegin(strm_t *pThis) assert(pThis != NULL); assert(pThis->bInRecord == 0); pThis->bInRecord = 1; +dbgprintf("strmRecordBegin set \n"); return RS_RET_OK; } @@ -520,8 +592,10 @@ rsRetVal strmRecordEnd(strm_t *pThis) assert(pThis != NULL); assert(pThis->bInRecord == 1); +dbgprintf("strmRecordEnd in %d\n", iRet); pThis->bInRecord = 0; iRet = strmCheckNextOutputFile(pThis); /* check if we need to switch files */ +dbgprintf("strmRecordEnd out %d\n", iRet); return iRet; } |