diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-06-04 15:10:24 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-06-04 15:10:24 +0200 |
commit | f2800ba261d2fb7466cbdebbf80afe92f0bffd3d (patch) | |
tree | b9d36b01c5c24e392c005070d8c5f3f2456bc274 /runtime/stream.c | |
parent | 9e434f19a9baa4a6f411808b5cb6bc22d6a32781 (diff) | |
download | rsyslog-f2800ba261d2fb7466cbdebbf80afe92f0bffd3d.tar.gz rsyslog-f2800ba261d2fb7466cbdebbf80afe92f0bffd3d.tar.xz rsyslog-f2800ba261d2fb7466cbdebbf80afe92f0bffd3d.zip |
modified stream class and omfile to work with it
now some basic operations are carried out via the stream class.
Diffstat (limited to 'runtime/stream.c')
-rw-r--r-- | runtime/stream.c | 168 |
1 files changed, 144 insertions, 24 deletions
diff --git a/runtime/stream.c b/runtime/stream.c index 261eb9ca..abcfce0c 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -1,4 +1,3 @@ -//TODO: O_TRUC mode! /* The serial stream class. * * A serial stream provides serial data access. In theory, serial streams @@ -50,6 +49,7 @@ /* static data */ DEFobjStaticHelpers +DEFobjCurrIf(zlibw) /* forward definitions */ static rsRetVal strmFlush(strm_t *pThis); @@ -74,7 +74,6 @@ static rsRetVal strmOpenFile(strm_t *pThis) int iFlags; ASSERT(pThis != NULL); - ASSERT(pThis->tOperationsMode == STREAMMODE_READ || pThis->tOperationsMode == STREAMMODE_WRITE); if(pThis->fd != -1) ABORT_FINALIZE(RS_RET_OK); @@ -95,13 +94,27 @@ static rsRetVal strmOpenFile(strm_t *pThis) } } +RUNLOG_VAR("%d", pThis->tOperationsMode); /* compute which flags we need to provide to open */ - if(pThis->tOperationsMode == STREAMMODE_READ) - iFlags = O_RDONLY; - else - iFlags = O_WRONLY | O_CREAT; + switch(pThis->tOperationsMode) { + case STREAMMODE_READ: + iFlags = O_CLOEXEC | O_NOCTTY | O_RDONLY; + break; + case STREAMMODE_WRITE: /* legacy mode used inside queue engine */ + iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT; + break; + case STREAMMODE_WRITE_TRUNC: + iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT | O_TRUNC; + break; + case STREAMMODE_WRITE_APPEND: + iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT | O_APPEND; + break; + default:assert(0); + break; + } - iFlags |= pThis->iAddtlOpenFlags; + iFlags |= pThis->iAddtlOpenFlags; // TODO: remove this! +dbgprintf("XXX: open with flags %d\n", iFlags); pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); if(pThis->fd == -1) { @@ -135,7 +148,7 @@ static rsRetVal strmCloseFile(strm_t *pThis) ASSERT(pThis->fd != -1); dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); - if(pThis->tOperationsMode == STREAMMODE_WRITE) + if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); close(pThis->fd); // TODO: error check @@ -398,14 +411,25 @@ ENDobjConstruct(strm) */ static rsRetVal strmConstructFinalize(strm_t *pThis) { + rsRetVal localRet; DEFiRet; ASSERT(pThis != NULL); - if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */ - if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - pThis->iBufPtrMax = 0; /* results in immediate read request */ + CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + pThis->iBufPtrMax = 0; /* results in immediate read request */ + if(pThis->iZipLevel) { /* do we need a zip buf? */ + localRet = objUse(zlibw, LM_ZLIBW_FILENAME); + if(localRet != RS_RET_OK) { + pThis->iZipLevel = 0; + DBGPRINTF("stream was requested with zip mode, but zlibw module unavailable (%d) - using " + "without zip\n", localRet); + } else { + /* we use the same size as the original buf, as we would like + * to make sure we can write out everyting with a SINGLE api call! + */ + CHKmalloc(pThis->pZipBuf = (Bytef*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + } } finalize_it: @@ -416,15 +440,20 @@ finalize_it: /* destructor for the strm object */ BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(strm) - if(pThis->tOperationsMode == STREAMMODE_WRITE) + if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); /* ... then free resources */ if(pThis->fd != -1) strmCloseFile(pThis); + if(pThis->iZipLevel) { /* do we need a zip buf? */ + objRelease(zlibw, LM_ZLIBW_FILENAME); + } + free(pThis->pszDir); free(pThis->pIOBuf); + free(pThis->pZipBuf); free(pThis->pszCurrFName); free(pThis->pszFName); ENDobjDestruct(strm) @@ -453,20 +482,17 @@ finalize_it: } -/* write memory buffer to a stream object. - * To support direct writes of large objects, this method may be called - * with a buffer pointing to some region other than the stream buffer itself. - * However, in that case the stream buffer must be empty (strmFlush() has to - * be called before), because we would otherwise mess up with the sequence - * inside the stream. -- rgerhards, 2008-01-10 +/* physically write to the output file. the provided data is ready for + * writing (e.g. zipped if we are requested to do that). + * rgerhards, 2009-06-04 */ -static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +static rsRetVal +strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; int iWritten; ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->fd == -1) CHKiRet(strmOpenFile(pThis)); @@ -499,6 +525,95 @@ finalize_it: } +/* write the output buffer in zip mode + * This means we compress it first and then do a physical write. + * Note that we always do a full deflateInit ... deflate ... deflateEnd + * sequence. While this is not optimal, we need to do it because we need + * to ensure that the file is readable even when we are aborted. Doing the + * full sequence brings us as far towards this goal as possible (and not + * doing it would be a total failure). It may be worth considering to + * add a config switch so that the user can decide the risk he is ready + * to take, but so far this is not yet implemented (not even requested ;)). + * rgerhards, 2009-06-04 + */ +static rsRetVal +doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + z_stream zstrm; + int zRet; /* zlib return state */ + DEFiRet; + assert(pThis != NULL); + assert(pBuf != NULL); + + /* allocate deflate state */ + zstrm.zalloc = Z_NULL; + zstrm.zfree = Z_NULL; + zstrm.opaque = Z_NULL; + /* see note in file header for the params we use with deflateInit2() */ + zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY); + if(zRet != Z_OK) { + dbgprintf("error %d returned from zlib/deflateInit2()\n", zRet); + ABORT_FINALIZE(RS_RET_ZLIB_ERR); + } +RUNLOG_STR("deflateInit2() done successfully\n"); + + /* now doing the compression */ + zstrm.avail_in = lenBuf; + zstrm.next_in = (Bytef*) pBuf; + /* run deflate() on input until output buffer not full, finish + compression if all of source has been read in */ + do { + dbgprintf("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in); + zstrm.avail_out = pThis->sIOBufSize; + zstrm.next_out = pThis->pZipBuf; + zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */ + dbgprintf("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out); + assert(zRet != Z_STREAM_ERROR); /* state not clobbered */ + CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out)); + } while (zstrm.avail_out == 0); + assert(zstrm.avail_in == 0); /* all input will be used */ + +RUNLOG_STR("deflate() should be done successfully\n"); + + zRet = zlibw.DeflateEnd(&zstrm); + if(zRet != Z_OK) { + dbgprintf("error %d returned from zlib/deflateEnd()\n", zRet); + ABORT_FINALIZE(RS_RET_ZLIB_ERR); + } +RUNLOG_STR("deflateEnd() done successfully\n"); + +finalize_it: + RETiRet; +} + + +/* write memory buffer to a stream object. + * To support direct writes of large objects, this method may be called + * with a buffer pointing to some region other than the stream buffer itself. + * However, in that case the stream buffer must be empty (strmFlush() has to + * be called before), because we would otherwise mess up with the sequence + * inside the stream. -- rgerhards, 2008-01-10 + */ +static rsRetVal +strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); + + if(pThis->iZipLevel) { + CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); + } else { + /* write without zipping */ + CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); + } + +finalize_it: + RETiRet; +} + + /* flush stream output buffer to persistent storage. This can be called at any time * and is automatically called when the output buffer is full. * rgerhards, 2008-01-10 @@ -511,7 +626,7 @@ strmFlush(strm_t *pThis) ASSERT(pThis != NULL); dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr); - if(pThis->tOperationsMode == STREAMMODE_WRITE && pThis->iBufPtr > 0) { + if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) { iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); } @@ -605,6 +720,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pThis != NULL); ASSERT(pBuf != NULL); +dbgprintf("strmWrite(%p, '%s', %ld);\n", pThis, pBuf,lenBuf); /* check if the to-be-written data is larger than our buffer size */ if(lenBuf >= pThis->sIOBufSize) { /* it is - so we do a direct write, that is most efficient. @@ -646,6 +762,7 @@ DEFpropSetMeth(strm, iFileNumDigits, int) DEFpropSetMeth(strm, tOperationsMode, int) DEFpropSetMeth(strm, tOpenMode, mode_t) DEFpropSetMeth(strm, sType, strmType_t) +DEFpropSetMeth(strm, iZipLevel, int) static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) { @@ -681,13 +798,14 @@ strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName) ASSERT(pThis != NULL); ASSERT(pszName != NULL); +dbgprintf("XXX: strm setFname: '%s'\n", pszName); if(iLenName < 1) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); if(pThis->pszFName != NULL) free(pThis->pszFName); - if((pThis->pszFName = malloc(sizeof(uchar) * iLenName + 1)) == NULL) + if((pThis->pszFName = malloc(sizeof(uchar) * (iLenName + 1))) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszFName, pszName, iLenName + 1); /* always think about the \0! */ @@ -711,6 +829,7 @@ strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir) ASSERT(pThis != NULL); ASSERT(pszDir != NULL); +dbgprintf("XXX: strm setDir: '%s'\n", pszDir); if(iLenDir < 1) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); @@ -927,7 +1046,7 @@ CODESTARTobjQueryInterface(strm) pIf->RecordBegin = strmRecordBegin; pIf->RecordEnd = strmRecordEnd; pIf->Serialize = strmSerialize; - pIf->SetiAddtlOpenFlags = strmSetiAddtlOpenFlags; + /* TODO: remove this! */ pIf->SetiAddtlOpenFlags = strmSetiAddtlOpenFlags; pIf->GetCurrOffset = strmGetCurrOffset; pIf->SetWCntr = strmSetWCntr; /* set methods */ @@ -938,6 +1057,7 @@ CODESTARTobjQueryInterface(strm) pIf->SettOperationsMode = strmSettOperationsMode; pIf->SettOpenMode = strmSettOpenMode; pIf->SetsType = strmSetsType; + pIf->SetiZipLevel = strmSetiZipLevel; finalize_it: ENDobjQueryInterface(strm) |