summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-07 08:33:22 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-07 08:33:22 +0200
commit4e9deb5b88129a397d23b55e3954c6f1c259e466 (patch)
tree88c6ed72219889de26c70d022e1deafa4a2b46b2
parentf27b5cc2535f0b1763e1963304546b611cd3c26a (diff)
downloadrsyslog-4e9deb5b88129a397d23b55e3954c6f1c259e466.tar.gz
rsyslog-4e9deb5b88129a397d23b55e3954c6f1c259e466.tar.xz
rsyslog-4e9deb5b88129a397d23b55e3954c6f1c259e466.zip
stream now uses a singular buffer strucuture for writing
-rw-r--r--runtime/stream.c18
1 files changed, 12 insertions, 6 deletions
diff --git a/runtime/stream.c b/runtime/stream.c
index d25f8b02..6924d527 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -603,12 +603,12 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) {
CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
}
- //pThis->pIOBuf = pThis->ioBuf[0];
+ pThis->pIOBuf = pThis->asyncBuf[0].pBuf;
pThis->bStopWriter = 0;
if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0)
DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis);
// TODO: remove that below later!
- CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
+ //CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
} else {
/* we work synchronously, so we need to alloc a fixed pIOBuf */
CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
@@ -802,7 +802,10 @@ finalize_it:
/* This function is called to "do" an async write call, what primarily means that
* the data is handed over to the writer thread (which will then do the actual write
* in parallel). Note that the stream mutex has already been locked by the
- * strmWrite...() calls. -- rgerhards, 2009-07-06
+ * strmWrite...() calls. Also note that we always have only a single producer,
+ * so we can simply serially assign the next free buffer to it and be sure that
+ * the very some producer comes back in sequence to submit the then-filled buffers.
+ * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06
*/
static inline rsRetVal
doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
@@ -814,11 +817,12 @@ doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS)
d_pthread_cond_wait(&pThis->notFull, &pThis->mut);
- iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS;
+ //iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS;
// TODO: optimize, memcopy only for getting it initially going!
//pThis->asyncBuf[iEnq].pBuf = pBuf;
- memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf);
- pThis->asyncBuf[iEnq].lenBuf = lenBuf;
+ //memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf);
+ pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf;
+ pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf;
if(++pThis->iCnt == 1)
pthread_cond_signal(&pThis->notEmpty);
@@ -1164,6 +1168,8 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n
/* it is - so we do a direct write, that is most efficient.
* TODO: is it really? think about disk block sizes!
*/
+printf("error: we should not have reached this code!\n");
+ abort();
CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */
CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf));
} else {