diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-07 12:09:41 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-07 12:09:41 +0200 |
commit | f53aa966e1fad03c478de342f5a878e57405de13 (patch) | |
tree | e9da164a635aed6f0ee72d107505e0fe3ed44108 | |
parent | 53aa68fc7d78279ab1af88de253a6134fc7ef00d (diff) | |
download | rsyslog-f53aa966e1fad03c478de342f5a878e57405de13.tar.gz rsyslog-f53aa966e1fad03c478de342f5a878e57405de13.tar.xz rsyslog-f53aa966e1fad03c478de342f5a878e57405de13.zip |
solved a race condition
-rw-r--r-- | runtime/debug.c | 2 | ||||
-rw-r--r-- | runtime/debug.h | 1 | ||||
-rw-r--r-- | runtime/stream.c | 63 | ||||
-rwxr-xr-x | tests/diag.sh | 2 |
4 files changed, 52 insertions, 16 deletions
diff --git a/runtime/debug.c b/runtime/debug.c index 4ee90226..ded1c218 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -732,6 +732,8 @@ static void dbgGetThrdName(char *pszBuf, size_t lenBuf, pthread_t thrd, int bInc */ void dbgSetThrdName(uchar *pszName) { +return; + dbgThrdInfo_t *pThrd = dbgGetThrdInfo(); if(pThrd->pszThrdName != NULL) free(pThrd->pszThrdName); diff --git a/runtime/debug.h b/runtime/debug.h index 8b66d784..dcbfb930 100644 --- a/runtime/debug.h +++ b/runtime/debug.h @@ -135,7 +135,6 @@ void dbgPrintAllDebugInfo(void); /* debug aides */ #ifdef RTINST -//#if 0 // temporarily removed for helgrind #define d_pthread_mutex_lock(x) dbgMutexLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) #define d_pthread_mutex_trylock(x) dbgMutexTryLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) #define d_pthread_mutex_unlock(x) dbgMutexUnlock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) diff --git a/runtime/stream.c b/runtime/stream.c index 92122215..eae36796 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -50,6 +50,8 @@ #include "module-template.h" #include <sys/prctl.h> +#define inline + /* static data */ DEFobjStaticHelpers DEFobjCurrIf(zlibw) @@ -267,6 +269,21 @@ finalize_it: } +/* wait for the output writer thread to be done. This must be called before actions + * that require data to be persisted. May be called in non-async mode and is a null + * operation than. Must be called with the mutex locked. + */ +static inline void +strmWaitAsyncWriterDone(strm_t *pThis) +{ + if(pThis->bAsyncWrite) { + /* awake writer thread and make it write out everything */ + pthread_cond_signal(&pThis->notEmpty); + d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); + } +} + + /* close a strm file * Note that the bDeleteOnClose flag is honored. If it is set, the file will be * deleted after close. This is in support for the qRead thread. @@ -282,6 +299,8 @@ static rsRetVal strmCloseFile(strm_t *pThis) if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); + strmWaitAsyncWriterDone(pThis); + close(pThis->fd); pThis->fd = -1; @@ -618,16 +637,14 @@ finalize_it: /* stop the writer thread (we MUST be runnnig asynchronously when this method - * is called!) -- rgerhards, 2009-07-06 + * is called!). Note that the mutex must be locked! -- rgerhards, 2009-07-06 */ static inline void stopWriter(strm_t *pThis) { BEGINfunc - d_pthread_mutex_lock(&pThis->mut); pThis->bStopWriter = 1; pthread_cond_signal(&pThis->notEmpty); - d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); d_pthread_mutex_unlock(&pThis->mut); pthread_join(pThis->writerThreadID, NULL); ENDfunc @@ -638,9 +655,14 @@ stopWriter(strm_t *pThis) BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */ int i; CODESTARTobjDestruct(strm) + if(pThis->bAsyncWrite) + /* Note: mutex will be unlocked in stopWriter! */ + d_pthread_mutex_lock(&pThis->mut); + if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); +dbgprintf("XXX: destruct stream %p\n", pThis); /* ... then free resources */ if(pThis->fd != -1) strmCloseFile(pThis); @@ -681,6 +703,9 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis) if(pThis->fd == -1) FINALIZE; + /* wait for output to be empty, so that our counts are correct */ + strmWaitAsyncWriterDone(pThis); + if(pThis->iCurrOffs >= pThis->iMaxFileSize) { dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n", (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs); @@ -783,7 +808,6 @@ doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) DEFiRet; ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->iZipLevel) { CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); @@ -811,15 +835,15 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) DEFiRet; ISOBJ_TYPE_assert(pThis, strm); +dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf); while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; - pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf; + pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf; if(++pThis->iCnt == 1) pthread_cond_signal(&pThis->notEmpty); - d_pthread_mutex_unlock(&pThis->mut); finalize_it: RETiRet; @@ -836,7 +860,6 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) DEFiRet; ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->bAsyncWrite) { CHKiRet(doAsyncWriteInternal(pThis, lenBuf)); @@ -844,6 +867,8 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); } + pThis->iBufPtr = 0; /* we are at the begin of a new buffer */ + finalize_it: RETiRet; } @@ -857,7 +882,6 @@ static void* asyncWriterThread(void *pPtr) { int iDeq; - size_t iWritten; strm_t *pThis = (strm_t*) pPtr; ISOBJ_TYPE_assert(pThis, strm); @@ -870,7 +894,7 @@ asyncWriterThread(void *pPtr) d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt == 0) { if(pThis->bStopWriter) { - pthread_cond_signal(&pThis->isEmpty); + pthread_cond_broadcast(&pThis->isEmpty); d_pthread_mutex_unlock(&pThis->mut); goto finalize_it; /* break main loop */ } @@ -878,13 +902,14 @@ asyncWriterThread(void *pPtr) } iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; - iWritten = pThis->asyncBuf[iDeq].lenBuf; - doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten); - // TODO: error check!!!!! 2009-07-06 + doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); + // TODO: error check????? 2009-07-06 --pThis->iCnt; if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) { pthread_cond_signal(&pThis->notFull); + if(pThis->iCnt == 0) + pthread_cond_broadcast(&pThis->isEmpty); } d_pthread_mutex_unlock(&pThis->mut); } @@ -949,7 +974,6 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) iWritten = lenBuf; CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); - pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; /* update user counter, if provided */ if(pThis->pUsrWCntr != NULL) @@ -1158,7 +1182,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pThis != NULL); ASSERT(pBuf != NULL); -DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); +//DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); if(pThis->bAsyncWrite) d_pthread_mutex_lock(&pThis->mut); @@ -1167,9 +1191,11 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n iOffset = 0; do { +//dbgprintf("XXX: enter write loop\n"); if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } +//dbgprintf("XXX: post strmFlush\n"); iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iWrite > lenBuf) iWrite = lenBuf; @@ -1177,8 +1203,17 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n pThis->iBufPtr += iWrite; iOffset += iWrite; lenBuf -= iWrite; +//dbgprintf("XXX: after write , iBufPtr %d, iOffset %d, len %d, unwritten: '%20.20s'\n", pThis->iBufPtr, iOffset, lenBuf, pBuf + iOffset); } while(lenBuf > 0); + /* now check if the buffer right at the end of the write is full and, if so, + * write it. This seems more natural than waiting (hours?) for the next message... + */ + if(pThis->iBufPtr == pThis->sIOBufSize) { + CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ + } +//dbgprintf("XXX: done write loop, iBufPtr %d, iOffset %d, len %d\n", pThis->iBufPtr, iOffset, lenBuf); + finalize_it: if(pThis->bAsyncWrite) d_pthread_mutex_unlock(&pThis->mut); diff --git a/tests/diag.sh b/tests/diag.sh index 299c5d71..1514474c 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -5,7 +5,7 @@ # not always able to convey back states to the upper-level test driver # begun 2009-05-27 by rgerhards # This file is part of the rsyslog project, released under GPLv3 -#valgrind="valgrind --log-fd=1" +valgrind="valgrind --log-fd=1" #valgrind="valgrind --tool=drd --log-fd=1" #valgrind="valgrind --tool=helgrind --log-fd=1" #set -o xtrace |