diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-10 09:08:13 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-10 09:08:13 +0000 |
commit | 79040541b70e95f0f00add4c9cafa08e9c411d79 (patch) | |
tree | 0092425a37b05c5035a3529d5441eae7b42a54ad /stream.c | |
parent | 24c125cfc3032e6269e6e5de91c72c91508adde0 (diff) | |
download | rsyslog-79040541b70e95f0f00add4c9cafa08e9c411d79.tar.gz rsyslog-79040541b70e95f0f00add4c9cafa08e9c411d79.tar.xz rsyslog-79040541b70e95f0f00add4c9cafa08e9c411d79.zip |
added buffered output to stream class
Diffstat (limited to 'stream.c')
-rw-r--r-- | stream.c | 116 |
1 files changed, 96 insertions, 20 deletions
@@ -60,11 +60,13 @@ DEFobjStaticHelpers * It is OK to call this function when the stream is already open. In that * case, it returns immediately with RS_RET_OK */ -rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode) +static rsRetVal strmOpenFile(strm_t *pThis) { DEFiRet; + int iFlags; assert(pThis != NULL); + assert(pThis->tOperationsMode == STREAMMODE_READ || pThis->tOperationsMode == STREAMMODE_WRITE); if(pThis->fd != -1) ABORT_FINALIZE(RS_RET_OK); @@ -72,19 +74,21 @@ rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode) if(pThis->pszFilePrefix == NULL) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); - if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */ - if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - pThis->iBufPtrMax = 0; /* results in immediate read request */ - } - CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, pThis->pszFilePrefix, pThis->lenFilePrefix, pThis->iCurrFNum, pThis->iFileNumDigits)); - pThis->fd = open((char*)pThis->pszCurrFName, flags, mode); // TODO: open modes! + /* compute which flags we need to provide to open */ + if(pThis->tOperationsMode == STREAMMODE_READ) + iFlags = O_RDONLY; + else + iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND; + + pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); pThis->iCurrOffs = 0; - dbgprintf("Stream 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pThis->pszCurrFName, flags, pThis->fd); + dbgprintf("Stream 0x%lx: opened file '%s' for %s (0x%x) as %d\n", (unsigned long) pThis, + pThis->pszCurrFName, (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", + iFlags, pThis->fd); finalize_it: return iRet; @@ -102,6 +106,9 @@ static rsRetVal strmCloseFile(strm_t *pThis) assert(pThis != NULL); dbgprintf("Stream 0x%lx: closing file %d\n", (unsigned long) pThis, pThis->fd); + if(pThis->tOperationsMode == STREAMMODE_WRITE) + strmFlush(pThis); + close(pThis->fd); // TODO: error check pThis->fd = -1; @@ -173,9 +180,10 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) bRun = 1; while(bRun) { /* first check if we need to (re)open the file (we may have switched to a new one!) */ - CHKiRet(strmOpenFile(pThis, O_RDONLY, 0600)); // TODO: open modes! + CHKiRet(strmOpenFile(pThis)); pThis->iBufPtrMax = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize); - dbgprintf("strmReadChar read %d bytes from file %d\n", pThis->iBufPtrMax, pThis->fd); + dbgprintf("Stream 0x%lx: read %d bytes from file %d\n", (unsigned long) pThis, + pThis->iBufPtrMax, pThis->fd); if(pThis->iBufPtrMax == 0) { if(pThis->iMaxFiles == 0) ABORT_FINALIZE(RS_RET_EOF); @@ -269,15 +277,27 @@ BEGINobjConstruct(strm) pThis->iUngetC = -1; pThis->sType = STREAMTYPE_FILE; pThis->sIOBufSize = glblGetIOBufSize(); + pThis->tOpenMode = 0600; ENDobjConstruct(strm) -/* ConstructionFinalizer - currently provided just to comply to the interface - * definiton. -- rgerhards, 2008-01-09 +/* ConstructionFinalizer + * rgerhards, 2008-01-09 */ -rsRetVal strmConstructFinalize(strm_t __attribute__((unused)) *pThis) +rsRetVal strmConstructFinalize(strm_t *pThis) { - return RS_RET_OK; + DEFiRet; + + assert(pThis != NULL); + + if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */ + if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + pThis->iBufPtrMax = 0; /* results in immediate read request */ + } + +finalize_it: + return iRet; } @@ -301,20 +321,19 @@ rsRetVal strmDestruct(strm_t *pThis) return iRet; } - -/* write memory buffer to a stream object +/* write memory buffer to a stream object. */ -rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; int iWritten; -dbgprintf("strmWrite()\n"); +dbgprintf("strmWriteInternal()\n"); assert(pThis != NULL); assert(pBuf != NULL); if(pThis->fd == -1) - CHKiRet(strmOpenFile(pThis, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes! + CHKiRet(strmOpenFile(pThis)); iWritten = write(pThis->fd, pBuf, lenBuf); dbgprintf("Stream 0x%lx: write wrote %d bytes, errno: %d, err %s\n", (unsigned long) pThis, @@ -332,12 +351,69 @@ finalize_it: return iRet; } +/* flush stream output buffer to persistent storage. This can be called at any time + * and is automatically called when the output buffer is full. + * rgerhards, 2008-01-10 + */ +rsRetVal strmFlush(strm_t *pThis) +{ + DEFiRet; + + dbgprintf("Stream 0x%lx: flush file %d, buflen %d\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr); + assert(pThis != NULL); + + if(pThis->iBufPtr > 0) { + iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); + /* Now indicate buffer empty again. We do this in any case, because there + * is no way we could react more intelligently to an error during write. + * We have not used CHKiRet(), as that would have presented some sequence + * problems, which are not necessary to look at given what we do. + */ + pThis->iBufPtr = 0; + } + + return iRet; +} + + +/* write memory buffer to a stream object + */ +rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + +dbgprintf("strmWrite()\n"); + assert(pThis != NULL); + assert(pBuf != NULL); + + /* if the string does not fit into the current buffer, we flush that buffer and + * then do the write ourselfs. So even if we have data that is of multiple + * buffer lengths, we will write it with a single write operation. + * rgerhards, 2008-01-10 + */ + if(pThis->iBufPtr + lenBuf >= pThis->sIOBufSize) { +dbgprintf("strmWrite() uses direct write\n"); + CHKiRet(strmFlush(pThis)); + CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf)); + } else { +dbgprintf("strmWrite() uses buffered write\n"); + /* we have space, so we simply copy over the string */ + memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf); + pThis->iBufPtr += lenBuf; + } + +finalize_it: + return iRet; +} + /* property set methods */ /* simple ones first */ DEFpropSetMeth(strm, bDeleteOnClose, int) DEFpropSetMeth(strm, iMaxFileSize, int) DEFpropSetMeth(strm, iFileNumDigits, int) +DEFpropSetMeth(strm, tOperationsMode, int); +DEFpropSetMeth(strm, tOpenMode, mode_t); rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) { |