summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-06 19:44:53 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-06 19:44:53 +0200
commit53b055b6aabd87fa096edf70a6e58eea6c87f38b (patch)
tree07ae1f4e4f8f7e2efbf72ab41f2cae9917135b3c
parent8e76a0521bee36e02e8bce2e97fa3d2aa67130da (diff)
downloadrsyslog-53b055b6aabd87fa096edf70a6e58eea6c87f38b.tar.gz
rsyslog-53b055b6aabd87fa096edf70a6e58eea6c87f38b.tar.xz
rsyslog-53b055b6aabd87fa096edf70a6e58eea6c87f38b.zip
moved zip part to writer thread
... this is necessary in preparation for the final solution (we need to have a "unified" writer). If it causes worse performance to have the zip writher togehter with the synchronous write, we may do an async write...
-rw-r--r--runtime/stream.c158
-rwxr-xr-xtests/threadingmq.sh4
2 files changed, 91 insertions, 71 deletions
diff --git a/runtime/stream.c b/runtime/stream.c
index 2bb19caf..dfb43891 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -59,6 +59,8 @@ static rsRetVal strmFlush(strm_t *pThis);
static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
static rsRetVal strmCloseFile(strm_t *pThis);
static void *asyncWriterThread(void *pPtr);
+static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
+static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
/* methods */
@@ -586,12 +588,10 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
}
}
-DBGPRINTF("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite);
/* if we have a flush interval, we need to do async writes in any case */
if(pThis->iFlushInterval != 0) {
pThis->bAsyncWrite = 1;
}
-DBGPRINTF("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite);
/* if we work asynchronously, we need a couple of synchronization objects */
if(pThis->bAsyncWrite) {
@@ -770,7 +770,88 @@ finalize_it:
RETiRet;
}
-#include <stdio.h>
+
+
+/* write memory buffer to a stream object.
+ * To support direct writes of large objects, this method may be called
+ * with a buffer pointing to some region other than the stream buffer itself.
+ * However, in that case the stream buffer must be empty (strmFlush() has to
+ * be called before), because we would otherwise mess up with the sequence
+ * inside the stream. -- rgerhards, 2008-01-10
+ */
+static inline rsRetVal
+doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+ ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
+
+ if(pThis->iZipLevel) {
+ CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
+ } else {
+ /* write without zipping */
+ CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* This function is called to "do" an async write call, what primarily means that
+ * the data is handed over to the writer thread (which will then do the actual write
+ * in parallel. -- rgerhards, 2009-07-06
+ */
+static inline rsRetVal
+doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ int iEnq;
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ d_pthread_mutex_lock(&pThis->mut);
+ while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS)
+ d_pthread_cond_wait(&pThis->notFull, &pThis->mut);
+
+ iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS;
+ // TODO: optimize, memcopy only for getting it initially going!
+ //pThis->asyncBuf[iEnq].pBuf = pBuf;
+ memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf);
+ pThis->asyncBuf[iEnq].lenBuf = lenBuf;
+
+ if(++pThis->iCnt == 1)
+ pthread_cond_signal(&pThis->notEmpty);
+ d_pthread_mutex_unlock(&pThis->mut);
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* schedule writing to the stream. Depending on our concurrency settings,
+ * this either directly writes to the stream or schedules writing via
+ * the background thread. -- rgerhards, 2009-07-07
+ */
+static rsRetVal
+strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+ ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
+
+ if(pThis->bAsyncWrite) {
+ CHKiRet(doAsyncWriteInternal(pThis, pBuf, lenBuf));
+ } else {
+ CHKiRet(doWriteInternal(pThis, pBuf, lenBuf));
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
/* This is the writer thread for asynchronous mode.
* -- rgerhards, 2009-07-06
@@ -788,7 +869,6 @@ asyncWriterThread(void *pPtr)
DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer");
}
-fprintf(stderr, "async stream writer thread started\n");fflush(stderr);
DBGPRINTF("TTT: writer thread startup\n");
while(1) { /* loop broken inside */
d_pthread_mutex_lock(&pThis->mut);
@@ -804,7 +884,8 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter)
iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
iWritten = pThis->asyncBuf[iDeq].lenBuf;
- doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten);
+ doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten);
+ // doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten);
// TODO: error check!!!!! 2009-07-06
--pThis->iCnt;
@@ -821,36 +902,6 @@ DBGPRINTF("TTT: writer thread shutdown\n");
}
-/* This function is called to "do" an async write call, what primarily means that
- * the data is handed over to the writer thread (which will then do the actual write
- * in parallel. -- rgerhards, 2009-07-06
- */
-static inline rsRetVal
-doAsyncWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf)
-{
- int iEnq;
- DEFiRet;
- ISOBJ_TYPE_assert(pThis, strm);
-
- d_pthread_mutex_lock(&pThis->mut);
- while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS)
- d_pthread_cond_wait(&pThis->notFull, &pThis->mut);
-
- iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS;
- // TODO: optimize, memcopy only for getting it initially going!
- //pThis->asyncBuf[iEnq].pBuf = pBuf;
- memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, *pLenBuf);
- pThis->asyncBuf[iEnq].lenBuf = *pLenBuf;
-
- if(++pThis->iCnt == 1)
- pthread_cond_signal(&pThis->notEmpty);
- d_pthread_mutex_unlock(&pThis->mut);
-
-finalize_it:
- RETiRet;
-}
-
-
/* sync the file to disk, so that any unwritten data is persisted. This
* also syncs the directory and thus makes sure that the file survives
* fatal failure. Note that we do NOT return an error status if the
@@ -903,11 +954,7 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
CHKiRet(strmOpenFile(pThis));
iWritten = lenBuf;
- if(pThis->bAsyncWrite) {
- CHKiRet(doAsyncWriteCall(pThis, pBuf, &iWritten));
- } else {
- CHKiRet(doWriteCall(pThis, pBuf, &iWritten));
- }
+ CHKiRet(doWriteCall(pThis, pBuf, &iWritten));
pThis->iBufPtr = 0;
pThis->iCurrOffs += iWritten;
@@ -989,33 +1036,6 @@ finalize_it:
}
-/* write memory buffer to a stream object.
- * To support direct writes of large objects, this method may be called
- * with a buffer pointing to some region other than the stream buffer itself.
- * However, in that case the stream buffer must be empty (strmFlush() has to
- * be called before), because we would otherwise mess up with the sequence
- * inside the stream. -- rgerhards, 2008-01-10
- */
-static rsRetVal
-strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
-{
- DEFiRet;
-
- ASSERT(pThis != NULL);
- ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
-
- if(pThis->iZipLevel) {
- CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
- } else {
- /* write without zipping */
- CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
- }
-
-finalize_it:
- RETiRet;
-}
-
-
/* flush stream output buffer to persistent storage. This can be called at any time
* and is automatically called when the output buffer is full.
* rgerhards, 2008-01-10
@@ -1029,7 +1049,7 @@ strmFlush(strm_t *pThis)
dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr);
if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) {
- iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr);
+ iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr);
}
RETiRet;
@@ -1132,7 +1152,7 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n
* TODO: is it really? think about disk block sizes!
*/
CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */
- CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf));
+ CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf));
} else {
/* data fits into a buffer - we just need to see if it
* fits into the current buffer...
diff --git a/tests/threadingmq.sh b/tests/threadingmq.sh
index 3680df5f..5c29ec60 100755
--- a/tests/threadingmq.sh
+++ b/tests/threadingmq.sh
@@ -9,7 +9,7 @@
echo TEST: threadingmq.sh - main queue concurrency
source $srcdir/diag.sh init
source $srcdir/diag.sh startup threadingmq.conf
-source $srcdir/diag.sh tcpflood 127.0.0.1 13514 2 10000000
+source $srcdir/diag.sh tcpflood 127.0.0.1 13514 2 100000
source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
-source $srcdir/diag.sh seq-check 0 9999999
+source $srcdir/diag.sh seq-check 0 99999
source $srcdir/diag.sh exit