summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-07 12:09:41 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-07 12:09:41 +0200
commitf53aa966e1fad03c478de342f5a878e57405de13 (patch)
treee9da164a635aed6f0ee72d107505e0fe3ed44108
parent53aa68fc7d78279ab1af88de253a6134fc7ef00d (diff)
downloadrsyslog-f53aa966e1fad03c478de342f5a878e57405de13.tar.gz
rsyslog-f53aa966e1fad03c478de342f5a878e57405de13.tar.xz
rsyslog-f53aa966e1fad03c478de342f5a878e57405de13.zip
solved a race condition
-rw-r--r--runtime/debug.c2
-rw-r--r--runtime/debug.h1
-rw-r--r--runtime/stream.c63
-rwxr-xr-xtests/diag.sh2
4 files changed, 52 insertions, 16 deletions
diff --git a/runtime/debug.c b/runtime/debug.c
index 4ee90226..ded1c218 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -732,6 +732,8 @@ static void dbgGetThrdName(char *pszBuf, size_t lenBuf, pthread_t thrd, int bInc
*/
void dbgSetThrdName(uchar *pszName)
{
+return;
+
dbgThrdInfo_t *pThrd = dbgGetThrdInfo();
if(pThrd->pszThrdName != NULL)
free(pThrd->pszThrdName);
diff --git a/runtime/debug.h b/runtime/debug.h
index 8b66d784..dcbfb930 100644
--- a/runtime/debug.h
+++ b/runtime/debug.h
@@ -135,7 +135,6 @@ void dbgPrintAllDebugInfo(void);
/* debug aides */
#ifdef RTINST
-//#if 0 // temporarily removed for helgrind
#define d_pthread_mutex_lock(x) dbgMutexLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
#define d_pthread_mutex_trylock(x) dbgMutexTryLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
#define d_pthread_mutex_unlock(x) dbgMutexUnlock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT )
diff --git a/runtime/stream.c b/runtime/stream.c
index 92122215..eae36796 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -50,6 +50,8 @@
#include "module-template.h"
#include <sys/prctl.h>
+#define inline
+
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(zlibw)
@@ -267,6 +269,21 @@ finalize_it:
}
+/* wait for the output writer thread to be done. This must be called before actions
+ * that require data to be persisted. May be called in non-async mode and is a null
+ * operation than. Must be called with the mutex locked.
+ */
+static inline void
+strmWaitAsyncWriterDone(strm_t *pThis)
+{
+ if(pThis->bAsyncWrite) {
+ /* awake writer thread and make it write out everything */
+ pthread_cond_signal(&pThis->notEmpty);
+ d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut);
+ }
+}
+
+
/* close a strm file
* Note that the bDeleteOnClose flag is honored. If it is set, the file will be
* deleted after close. This is in support for the qRead thread.
@@ -282,6 +299,8 @@ static rsRetVal strmCloseFile(strm_t *pThis)
if(pThis->tOperationsMode != STREAMMODE_READ)
strmFlush(pThis);
+ strmWaitAsyncWriterDone(pThis);
+
close(pThis->fd);
pThis->fd = -1;
@@ -618,16 +637,14 @@ finalize_it:
/* stop the writer thread (we MUST be runnnig asynchronously when this method
- * is called!) -- rgerhards, 2009-07-06
+ * is called!). Note that the mutex must be locked! -- rgerhards, 2009-07-06
*/
static inline void
stopWriter(strm_t *pThis)
{
BEGINfunc
- d_pthread_mutex_lock(&pThis->mut);
pThis->bStopWriter = 1;
pthread_cond_signal(&pThis->notEmpty);
- d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut);
d_pthread_mutex_unlock(&pThis->mut);
pthread_join(pThis->writerThreadID, NULL);
ENDfunc
@@ -638,9 +655,14 @@ stopWriter(strm_t *pThis)
BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */
int i;
CODESTARTobjDestruct(strm)
+ if(pThis->bAsyncWrite)
+ /* Note: mutex will be unlocked in stopWriter! */
+ d_pthread_mutex_lock(&pThis->mut);
+
if(pThis->tOperationsMode != STREAMMODE_READ)
strmFlush(pThis);
+dbgprintf("XXX: destruct stream %p\n", pThis);
/* ... then free resources */
if(pThis->fd != -1)
strmCloseFile(pThis);
@@ -681,6 +703,9 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis)
if(pThis->fd == -1)
FINALIZE;
+ /* wait for output to be empty, so that our counts are correct */
+ strmWaitAsyncWriterDone(pThis);
+
if(pThis->iCurrOffs >= pThis->iMaxFileSize) {
dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n",
(long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs);
@@ -783,7 +808,6 @@ doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
DEFiRet;
ASSERT(pThis != NULL);
- ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
if(pThis->iZipLevel) {
CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
@@ -811,15 +835,15 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf)
DEFiRet;
ISOBJ_TYPE_assert(pThis, strm);
+dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf);
while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS)
d_pthread_cond_wait(&pThis->notFull, &pThis->mut);
pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf;
- pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf;
+ pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf;
if(++pThis->iCnt == 1)
pthread_cond_signal(&pThis->notEmpty);
- d_pthread_mutex_unlock(&pThis->mut);
finalize_it:
RETiRet;
@@ -836,7 +860,6 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
DEFiRet;
ASSERT(pThis != NULL);
- ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
if(pThis->bAsyncWrite) {
CHKiRet(doAsyncWriteInternal(pThis, lenBuf));
@@ -844,6 +867,8 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
CHKiRet(doWriteInternal(pThis, pBuf, lenBuf));
}
+ pThis->iBufPtr = 0; /* we are at the begin of a new buffer */
+
finalize_it:
RETiRet;
}
@@ -857,7 +882,6 @@ static void*
asyncWriterThread(void *pPtr)
{
int iDeq;
- size_t iWritten;
strm_t *pThis = (strm_t*) pPtr;
ISOBJ_TYPE_assert(pThis, strm);
@@ -870,7 +894,7 @@ asyncWriterThread(void *pPtr)
d_pthread_mutex_lock(&pThis->mut);
while(pThis->iCnt == 0) {
if(pThis->bStopWriter) {
- pthread_cond_signal(&pThis->isEmpty);
+ pthread_cond_broadcast(&pThis->isEmpty);
d_pthread_mutex_unlock(&pThis->mut);
goto finalize_it; /* break main loop */
}
@@ -878,13 +902,14 @@ asyncWriterThread(void *pPtr)
}
iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
- iWritten = pThis->asyncBuf[iDeq].lenBuf;
- doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten);
- // TODO: error check!!!!! 2009-07-06
+ doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf);
+ // TODO: error check????? 2009-07-06
--pThis->iCnt;
if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) {
pthread_cond_signal(&pThis->notFull);
+ if(pThis->iCnt == 0)
+ pthread_cond_broadcast(&pThis->isEmpty);
}
d_pthread_mutex_unlock(&pThis->mut);
}
@@ -949,7 +974,6 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
iWritten = lenBuf;
CHKiRet(doWriteCall(pThis, pBuf, &iWritten));
- pThis->iBufPtr = 0;
pThis->iCurrOffs += iWritten;
/* update user counter, if provided */
if(pThis->pUsrWCntr != NULL)
@@ -1158,7 +1182,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
ASSERT(pThis != NULL);
ASSERT(pBuf != NULL);
-DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs);
+//DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs);
if(pThis->bAsyncWrite)
d_pthread_mutex_lock(&pThis->mut);
@@ -1167,9 +1191,11 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n
iOffset = 0;
do {
+//dbgprintf("XXX: enter write loop\n");
if(pThis->iBufPtr == pThis->sIOBufSize) {
CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
}
+//dbgprintf("XXX: post strmFlush\n");
iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
if(iWrite > lenBuf)
iWrite = lenBuf;
@@ -1177,8 +1203,17 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n
pThis->iBufPtr += iWrite;
iOffset += iWrite;
lenBuf -= iWrite;
+//dbgprintf("XXX: after write , iBufPtr %d, iOffset %d, len %d, unwritten: '%20.20s'\n", pThis->iBufPtr, iOffset, lenBuf, pBuf + iOffset);
} while(lenBuf > 0);
+ /* now check if the buffer right at the end of the write is full and, if so,
+ * write it. This seems more natural than waiting (hours?) for the next message...
+ */
+ if(pThis->iBufPtr == pThis->sIOBufSize) {
+ CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
+ }
+//dbgprintf("XXX: done write loop, iBufPtr %d, iOffset %d, len %d\n", pThis->iBufPtr, iOffset, lenBuf);
+
finalize_it:
if(pThis->bAsyncWrite)
d_pthread_mutex_unlock(&pThis->mut);
diff --git a/tests/diag.sh b/tests/diag.sh
index 299c5d71..1514474c 100755
--- a/tests/diag.sh
+++ b/tests/diag.sh
@@ -5,7 +5,7 @@
# not always able to convey back states to the upper-level test driver
# begun 2009-05-27 by rgerhards
# This file is part of the rsyslog project, released under GPLv3
-#valgrind="valgrind --log-fd=1"
+valgrind="valgrind --log-fd=1"
#valgrind="valgrind --tool=drd --log-fd=1"
#valgrind="valgrind --tool=helgrind --log-fd=1"
#set -o xtrace