diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-07 08:33:22 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-07 08:33:22 +0200 |
commit | 4e9deb5b88129a397d23b55e3954c6f1c259e466 (patch) | |
tree | 88c6ed72219889de26c70d022e1deafa4a2b46b2 /runtime | |
parent | f27b5cc2535f0b1763e1963304546b611cd3c26a (diff) | |
download | rsyslog-4e9deb5b88129a397d23b55e3954c6f1c259e466.tar.gz rsyslog-4e9deb5b88129a397d23b55e3954c6f1c259e466.tar.xz rsyslog-4e9deb5b88129a397d23b55e3954c6f1c259e466.zip |
stream now uses a singular buffer strucuture for writing
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/stream.c | 18 |
1 files changed, 12 insertions, 6 deletions
diff --git a/runtime/stream.c b/runtime/stream.c index d25f8b02..6924d527 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -603,12 +603,12 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } - //pThis->pIOBuf = pThis->ioBuf[0]; + pThis->pIOBuf = pThis->asyncBuf[0].pBuf; pThis->bStopWriter = 0; if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); // TODO: remove that below later! - CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + //CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } else { /* we work synchronously, so we need to alloc a fixed pIOBuf */ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); @@ -802,7 +802,10 @@ finalize_it: /* This function is called to "do" an async write call, what primarily means that * the data is handed over to the writer thread (which will then do the actual write * in parallel). Note that the stream mutex has already been locked by the - * strmWrite...() calls. -- rgerhards, 2009-07-06 + * strmWrite...() calls. Also note that we always have only a single producer, + * so we can simply serially assign the next free buffer to it and be sure that + * the very some producer comes back in sequence to submit the then-filled buffers. + * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 */ static inline rsRetVal doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) @@ -814,11 +817,12 @@ doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); - iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; + //iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; // TODO: optimize, memcopy only for getting it initially going! //pThis->asyncBuf[iEnq].pBuf = pBuf; - memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); - pThis->asyncBuf[iEnq].lenBuf = lenBuf; + //memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); + pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; + pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf; if(++pThis->iCnt == 1) pthread_cond_signal(&pThis->notEmpty); @@ -1164,6 +1168,8 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n /* it is - so we do a direct write, that is most efficient. * TODO: is it really? think about disk block sizes! */ +printf("error: we should not have reached this code!\n"); + abort(); CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf)); } else { |