From 53b055b6aabd87fa096edf70a6e58eea6c87f38b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 6 Jul 2009 19:44:53 +0200 Subject: moved zip part to writer thread ... this is necessary in preparation for the final solution (we need to have a "unified" writer). If it causes worse performance to have the zip writher togehter with the synchronous write, we may do an async write... --- runtime/stream.c | 158 +++++++++++++++++++++++++++++---------------------- tests/threadingmq.sh | 4 +- 2 files changed, 91 insertions(+), 71 deletions(-) diff --git a/runtime/stream.c b/runtime/stream.c index 2bb19caf..dfb43891 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -59,6 +59,8 @@ static rsRetVal strmFlush(strm_t *pThis); static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); static rsRetVal strmCloseFile(strm_t *pThis); static void *asyncWriterThread(void *pPtr); +static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); +static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); /* methods */ @@ -586,12 +588,10 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } -DBGPRINTF("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); /* if we have a flush interval, we need to do async writes in any case */ if(pThis->iFlushInterval != 0) { pThis->bAsyncWrite = 1; } -DBGPRINTF("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); /* if we work asynchronously, we need a couple of synchronization objects */ if(pThis->bAsyncWrite) { @@ -770,7 +770,88 @@ finalize_it: RETiRet; } -#include + + +/* write memory buffer to a stream object. + * To support direct writes of large objects, this method may be called + * with a buffer pointing to some region other than the stream buffer itself. + * However, in that case the stream buffer must be empty (strmFlush() has to + * be called before), because we would otherwise mess up with the sequence + * inside the stream. -- rgerhards, 2008-01-10 + */ +static inline rsRetVal +doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); + + if(pThis->iZipLevel) { + CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); + } else { + /* write without zipping */ + CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); + } + +finalize_it: + RETiRet; +} + + +/* This function is called to "do" an async write call, what primarily means that + * the data is handed over to the writer thread (which will then do the actual write + * in parallel. -- rgerhards, 2009-07-06 + */ +static inline rsRetVal +doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + int iEnq; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + + d_pthread_mutex_lock(&pThis->mut); + while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) + d_pthread_cond_wait(&pThis->notFull, &pThis->mut); + + iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; + // TODO: optimize, memcopy only for getting it initially going! + //pThis->asyncBuf[iEnq].pBuf = pBuf; + memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); + pThis->asyncBuf[iEnq].lenBuf = lenBuf; + + if(++pThis->iCnt == 1) + pthread_cond_signal(&pThis->notEmpty); + d_pthread_mutex_unlock(&pThis->mut); + +finalize_it: + RETiRet; +} + + +/* schedule writing to the stream. Depending on our concurrency settings, + * this either directly writes to the stream or schedules writing via + * the background thread. -- rgerhards, 2009-07-07 + */ +static rsRetVal +strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); + + if(pThis->bAsyncWrite) { + CHKiRet(doAsyncWriteInternal(pThis, pBuf, lenBuf)); + } else { + CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); + } + +finalize_it: + RETiRet; +} + + /* This is the writer thread for asynchronous mode. * -- rgerhards, 2009-07-06 @@ -788,7 +869,6 @@ asyncWriterThread(void *pPtr) DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); } -fprintf(stderr, "async stream writer thread started\n");fflush(stderr); DBGPRINTF("TTT: writer thread startup\n"); while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); @@ -804,7 +884,8 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; iWritten = pThis->asyncBuf[iDeq].lenBuf; - doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); + doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten); + // doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); // TODO: error check!!!!! 2009-07-06 --pThis->iCnt; @@ -821,36 +902,6 @@ DBGPRINTF("TTT: writer thread shutdown\n"); } -/* This function is called to "do" an async write call, what primarily means that - * the data is handed over to the writer thread (which will then do the actual write - * in parallel. -- rgerhards, 2009-07-06 - */ -static inline rsRetVal -doAsyncWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) -{ - int iEnq; - DEFiRet; - ISOBJ_TYPE_assert(pThis, strm); - - d_pthread_mutex_lock(&pThis->mut); - while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) - d_pthread_cond_wait(&pThis->notFull, &pThis->mut); - - iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; - // TODO: optimize, memcopy only for getting it initially going! - //pThis->asyncBuf[iEnq].pBuf = pBuf; - memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, *pLenBuf); - pThis->asyncBuf[iEnq].lenBuf = *pLenBuf; - - if(++pThis->iCnt == 1) - pthread_cond_signal(&pThis->notEmpty); - d_pthread_mutex_unlock(&pThis->mut); - -finalize_it: - RETiRet; -} - - /* sync the file to disk, so that any unwritten data is persisted. This * also syncs the directory and thus makes sure that the file survives * fatal failure. Note that we do NOT return an error status if the @@ -903,11 +954,7 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(strmOpenFile(pThis)); iWritten = lenBuf; - if(pThis->bAsyncWrite) { - CHKiRet(doAsyncWriteCall(pThis, pBuf, &iWritten)); - } else { - CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); - } + CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; @@ -989,33 +1036,6 @@ finalize_it: } -/* write memory buffer to a stream object. - * To support direct writes of large objects, this method may be called - * with a buffer pointing to some region other than the stream buffer itself. - * However, in that case the stream buffer must be empty (strmFlush() has to - * be called before), because we would otherwise mess up with the sequence - * inside the stream. -- rgerhards, 2008-01-10 - */ -static rsRetVal -strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) -{ - DEFiRet; - - ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); - - if(pThis->iZipLevel) { - CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); - } else { - /* write without zipping */ - CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); - } - -finalize_it: - RETiRet; -} - - /* flush stream output buffer to persistent storage. This can be called at any time * and is automatically called when the output buffer is full. * rgerhards, 2008-01-10 @@ -1029,7 +1049,7 @@ strmFlush(strm_t *pThis) dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr); if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) { - iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); + iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr); } RETiRet; @@ -1132,7 +1152,7 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n * TODO: is it really? think about disk block sizes! */ CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ - CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf)); + CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf)); } else { /* data fits into a buffer - we just need to see if it * fits into the current buffer... diff --git a/tests/threadingmq.sh b/tests/threadingmq.sh index 3680df5f..5c29ec60 100755 --- a/tests/threadingmq.sh +++ b/tests/threadingmq.sh @@ -9,7 +9,7 @@ echo TEST: threadingmq.sh - main queue concurrency source $srcdir/diag.sh init source $srcdir/diag.sh startup threadingmq.conf -source $srcdir/diag.sh tcpflood 127.0.0.1 13514 2 10000000 +source $srcdir/diag.sh tcpflood 127.0.0.1 13514 2 100000 source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages -source $srcdir/diag.sh seq-check 0 9999999 +source $srcdir/diag.sh seq-check 0 99999 source $srcdir/diag.sh exit -- cgit