summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-03-19 07:19:28 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2010-03-19 07:19:28 +0100
commite910078e41e2960f9afc55013cbd287532be478e (patch)
treee4e6ad3fcd71ffbfb498306fdd206c375230806a /runtime
parentadf3e203bd180c43e68bb0d87656e7ef470e269d (diff)
downloadrsyslog-e910078e41e2960f9afc55013cbd287532be478e.tar.gz
rsyslog-e910078e41e2960f9afc55013cbd287532be478e.tar.xz
rsyslog-e910078e41e2960f9afc55013cbd287532be478e.zip
bugfix: improper synchronization when "$OMFileFlushOnTXEnd on" was used
Internal data structures were not properly protected due to missing mutex calls.
Diffstat (limited to 'runtime')
-rw-r--r--runtime/stream.c46
1 files changed, 35 insertions, 11 deletions
diff --git a/runtime/stream.c b/runtime/stream.c
index 9a0a8615..93f7fd58 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -67,7 +67,7 @@ DEFobjStaticHelpers
DEFobjCurrIf(zlibw)
/* forward definitions */
-static rsRetVal strmFlush(strm_t *pThis);
+static rsRetVal strmFlushInternal(strm_t *pThis);
static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
static rsRetVal strmCloseFile(strm_t *pThis);
static void *asyncWriterThread(void *pPtr);
@@ -309,7 +309,7 @@ strmWaitAsyncWriterDone(strm_t *pThis)
* Note: it is valid to call this function when the physical file is closed. If so,
* strmCloseFile() will still check if there is any unwritten data inside buffers
* (this may be the case) and, if so, will open the file, write the data, and then
- * close it again (this is done via strmFlush and friends).
+ * close it again (this is done via strmFlushInternal and friends).
*/
static rsRetVal strmCloseFile(strm_t *pThis)
{
@@ -320,7 +320,7 @@ static rsRetVal strmCloseFile(strm_t *pThis)
(pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName);
if(pThis->tOperationsMode != STREAMMODE_READ) {
- strmFlush(pThis);
+ strmFlushInternal(pThis);
if(pThis->bAsyncWrite) {
strmWaitAsyncWriterDone(pThis);
}
@@ -939,7 +939,7 @@ dbgprintf("XXX: asyncWriterThread iterating %s\n", pThis->pszFName);
}
if(bTimedOut && pThis->iBufPtr > 0) {
/* if we timed out, we need to flush pending data */
- strmFlush(pThis);
+ strmFlushInternal(pThis);
bTimedOut = 0;
continue; /* now we should have data */
}
@@ -1136,12 +1136,11 @@ finalize_it:
* rgerhards, 2008-01-10
*/
static rsRetVal
-strmFlush(strm_t *pThis)
+strmFlushInternal(strm_t *pThis)
{
DEFiRet;
ASSERT(pThis != NULL);
- DBGOPRINT((obj_t*) pThis, "file %d(%s) flush, buflen %ld\n", pThis->fd, pThis->pszFName, (long) pThis->iBufPtr);
DBGOPRINT((obj_t*) pThis, "file %d(%s) flush, buflen %ld%s\n", pThis->fd,
(pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName,
(long) pThis->iBufPtr, (pThis->iBufPtr == 0) ? " (no need to flush)" : "");
@@ -1154,6 +1153,31 @@ strmFlush(strm_t *pThis)
}
+/* flush stream output buffer to persistent storage. This can be called at any time
+ * and is automatically called when the output buffer is full. This function is for
+ * use by EXTERNAL callers. Do NOT use it internally. It locks the async writer
+ * mutex if ther is need to do so.
+ * rgerhards, 2010-03-18
+ */
+static rsRetVal
+strmFlush(strm_t *pThis)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_lock(&pThis->mut);
+ CHKiRet(strmFlushInternal(pThis));
+
+finalize_it:
+ if(pThis->bAsyncWrite)
+ d_pthread_mutex_unlock(&pThis->mut);
+
+ RETiRet;
+}
+
+
/* seek a stream to a specific location. Pending writes are flushed, read data
* is invalidated.
* rgerhards, 2008-01-12
@@ -1167,7 +1191,7 @@ static rsRetVal strmSeek(strm_t *pThis, off_t offs)
if(pThis->fd == -1)
strmOpenFile(pThis);
else
- strmFlush(pThis);
+ strmFlushInternal(pThis);
int i;
DBGOPRINT((obj_t*) pThis, "file %d seek, pos %ld\n", pThis->fd, (long) offs);
i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error!
@@ -1208,7 +1232,7 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
/* if the buffer is full, we need to flush before we can write */
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlush(pThis));
+ CHKiRet(strmFlushInternal(pThis));
}
/* we now always have space for one character, so we simply copy it */
*(pThis->pIOBuf + pThis->iBufPtr) = c;
@@ -1278,7 +1302,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
iOffset = 0;
do {
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
+ CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */
}
iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
if(iWrite > lenBuf)
@@ -1293,7 +1317,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
* write it. This seems more natural than waiting (hours?) for the next message...
*/
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
+ CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */
}
finalize_it:
@@ -1452,7 +1476,7 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
ISOBJ_TYPE_assert(pThis, strm);
ISOBJ_TYPE_assert(pStrm, strm);
- strmFlush(pThis);
+ strmFlushInternal(pThis);
CHKiRet(obj.BeginSerialize(pStrm, (obj_t*) pThis));
objSerializeSCALAR(pStrm, iCurrFNum, INT);