From e910078e41e2960f9afc55013cbd287532be478e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 19 Mar 2010 07:19:28 +0100 Subject: bugfix: improper synchronization when "$OMFileFlushOnTXEnd on" was used Internal data structures were not properly protected due to missing mutex calls. --- ChangeLog | 3 +++ runtime/stream.c | 46 +++++++++++++++++++++++++++++++++++----------- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/ChangeLog b/ChangeLog index 1f74b252..b00965c4 100644 --- a/ChangeLog +++ b/ChangeLog @@ -6,6 +6,9 @@ Version 4.6.2 [v4-stable] (rgerhards), 2010-03-?? to ease migration from syslog-ng. See property replacer doc for details. [backport from 5.5.3 because urgently needed by some] - improved testbench +- bugfix: improper synchronization when "$OMFileFlushOnTXEnd on" was used + Internal data structures were not properly protected due to missing + mutex calls. - bugfix: potential data loss during file stream shutdown - bugfix: potential problems during file stream shutdown The shutdown/close sequence was not clean, what potentially (but diff --git a/runtime/stream.c b/runtime/stream.c index 9a0a8615..93f7fd58 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -67,7 +67,7 @@ DEFobjStaticHelpers DEFobjCurrIf(zlibw) /* forward definitions */ -static rsRetVal strmFlush(strm_t *pThis); +static rsRetVal strmFlushInternal(strm_t *pThis); static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); static rsRetVal strmCloseFile(strm_t *pThis); static void *asyncWriterThread(void *pPtr); @@ -309,7 +309,7 @@ strmWaitAsyncWriterDone(strm_t *pThis) * Note: it is valid to call this function when the physical file is closed. If so, * strmCloseFile() will still check if there is any unwritten data inside buffers * (this may be the case) and, if so, will open the file, write the data, and then - * close it again (this is done via strmFlush and friends). + * close it again (this is done via strmFlushInternal and friends). */ static rsRetVal strmCloseFile(strm_t *pThis) { @@ -320,7 +320,7 @@ static rsRetVal strmCloseFile(strm_t *pThis) (pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName); if(pThis->tOperationsMode != STREAMMODE_READ) { - strmFlush(pThis); + strmFlushInternal(pThis); if(pThis->bAsyncWrite) { strmWaitAsyncWriterDone(pThis); } @@ -939,7 +939,7 @@ dbgprintf("XXX: asyncWriterThread iterating %s\n", pThis->pszFName); } if(bTimedOut && pThis->iBufPtr > 0) { /* if we timed out, we need to flush pending data */ - strmFlush(pThis); + strmFlushInternal(pThis); bTimedOut = 0; continue; /* now we should have data */ } @@ -1136,12 +1136,11 @@ finalize_it: * rgerhards, 2008-01-10 */ static rsRetVal -strmFlush(strm_t *pThis) +strmFlushInternal(strm_t *pThis) { DEFiRet; ASSERT(pThis != NULL); - DBGOPRINT((obj_t*) pThis, "file %d(%s) flush, buflen %ld\n", pThis->fd, pThis->pszFName, (long) pThis->iBufPtr); DBGOPRINT((obj_t*) pThis, "file %d(%s) flush, buflen %ld%s\n", pThis->fd, (pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName, (long) pThis->iBufPtr, (pThis->iBufPtr == 0) ? " (no need to flush)" : ""); @@ -1154,6 +1153,31 @@ strmFlush(strm_t *pThis) } +/* flush stream output buffer to persistent storage. This can be called at any time + * and is automatically called when the output buffer is full. This function is for + * use by EXTERNAL callers. Do NOT use it internally. It locks the async writer + * mutex if ther is need to do so. + * rgerhards, 2010-03-18 + */ +static rsRetVal +strmFlush(strm_t *pThis) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + CHKiRet(strmFlushInternal(pThis)); + +finalize_it: + if(pThis->bAsyncWrite) + d_pthread_mutex_unlock(&pThis->mut); + + RETiRet; +} + + /* seek a stream to a specific location. Pending writes are flushed, read data * is invalidated. * rgerhards, 2008-01-12 @@ -1167,7 +1191,7 @@ static rsRetVal strmSeek(strm_t *pThis, off_t offs) if(pThis->fd == -1) strmOpenFile(pThis); else - strmFlush(pThis); + strmFlushInternal(pThis); int i; DBGOPRINT((obj_t*) pThis, "file %d seek, pos %ld\n", pThis->fd, (long) offs); i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error! @@ -1208,7 +1232,7 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlush(pThis)); + CHKiRet(strmFlushInternal(pThis)); } /* we now always have space for one character, so we simply copy it */ *(pThis->pIOBuf + pThis->iBufPtr) = c; @@ -1278,7 +1302,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) iOffset = 0; do { if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ + CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */ } iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iWrite > lenBuf) @@ -1293,7 +1317,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) * write it. This seems more natural than waiting (hours?) for the next message... */ if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ + CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */ } finalize_it: @@ -1452,7 +1476,7 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) ISOBJ_TYPE_assert(pThis, strm); ISOBJ_TYPE_assert(pStrm, strm); - strmFlush(pThis); + strmFlushInternal(pThis); CHKiRet(obj.BeginSerialize(pStrm, (obj_t*) pThis)); objSerializeSCALAR(pStrm, iCurrFNum, INT); -- cgit