diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-07 09:16:14 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-07 09:16:14 +0200 |
commit | 53aa68fc7d78279ab1af88de253a6134fc7ef00d (patch) | |
tree | 7986d3f5d3d8f77f2d4c489ef3086c6840871f99 | |
parent | 4e9deb5b88129a397d23b55e3954c6f1c259e466 (diff) | |
download | rsyslog-53aa68fc7d78279ab1af88de253a6134fc7ef00d.tar.gz rsyslog-53aa68fc7d78279ab1af88de253a6134fc7ef00d.tar.xz rsyslog-53aa68fc7d78279ab1af88de253a6134fc7ef00d.zip |
clean solution for "writing" arbrietary-size user buffers to a stream
-rw-r--r-- | runtime/stream.c | 70 |
1 files changed, 28 insertions, 42 deletions
diff --git a/runtime/stream.c b/runtime/stream.c index 6924d527..92122215 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -607,8 +607,6 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) pThis->bStopWriter = 0; if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); - // TODO: remove that below later! - //CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } else { /* we work synchronously, so we need to alloc a fixed pIOBuf */ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); @@ -808,19 +806,14 @@ finalize_it: * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 */ static inline rsRetVal -doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) { - int iEnq; DEFiRet; ISOBJ_TYPE_assert(pThis, strm); while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); - //iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; - // TODO: optimize, memcopy only for getting it initially going! - //pThis->asyncBuf[iEnq].pBuf = pBuf; - //memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf; @@ -846,7 +839,7 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->bAsyncWrite) { - CHKiRet(doAsyncWriteInternal(pThis, pBuf, lenBuf)); + CHKiRet(doAsyncWriteInternal(pThis, lenBuf)); } else { CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); } @@ -873,11 +866,9 @@ asyncWriterThread(void *pPtr) DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); } -DBGPRINTF("TTT: writer thread startup\n"); while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt == 0) { -DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter); if(pThis->bStopWriter) { pthread_cond_signal(&pThis->isEmpty); d_pthread_mutex_unlock(&pThis->mut); @@ -889,7 +880,6 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; iWritten = pThis->asyncBuf[iDeq].lenBuf; doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten); - // doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); // TODO: error check!!!!! 2009-07-06 --pThis->iCnt; @@ -900,7 +890,6 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) } finalize_it: -DBGPRINTF("TTT: writer thread shutdown\n"); ENDfunc return NULL; /* to keep pthreads happy */ } @@ -1109,6 +1098,9 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) if(pThis->bAsyncWrite) d_pthread_mutex_lock(&pThis->mut); + if(pThis->bDisabled) + ABORT_FINALIZE(RS_RET_STREAM_DISABLED); + /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); @@ -1145,13 +1137,23 @@ finalize_it: } -/* write memory buffer to a stream object +/* write memory buffer to a stream object. + * process the data in chunks and copy it over to our buffer. The caller-provided data + * may theoritically be larger than our buffer. In that case, we do multiple copies. One + * may argue if it were more efficient to write out the caller-provided buffer in that case + * and earlier versions of rsyslog did this. However, this introduces a lot of complexity + * inside the buffered writer and potential performance bottlenecks when trying to solve + * it. Now keep in mind that we actually do (almost?) never have a case where the + * caller-provided buffer is larger than our one. So instead of optimizing a case + * which normally does not exist, we expect some degradation in its case but make us + * perform better in the regular cases. -- rgerhards, 2009-07-07 */ static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; - size_t iPartial; + size_t iWrite; + size_t iOffset; ASSERT(pThis != NULL); ASSERT(pBuf != NULL); @@ -1163,35 +1165,19 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); - /* check if the to-be-written data is larger than our buffer size */ - if(lenBuf >= pThis->sIOBufSize) { - /* it is - so we do a direct write, that is most efficient. - * TODO: is it really? think about disk block sizes! - */ -printf("error: we should not have reached this code!\n"); - abort(); - CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ - CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf)); - } else { - /* data fits into a buffer - we just need to see if it - * fits into the current buffer... - */ - if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) { - /* nope, so we must split it */ - iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ - if(iPartial > 0) { /* the buffer was exactly full, can not write anything! */ - memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial); - pThis->iBufPtr += iPartial; - } + iOffset = 0; + do { + if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ - memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial); - pThis->iBufPtr = lenBuf - iPartial; - } else { - /* we have space, so we simply copy over the string */ - memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf); - pThis->iBufPtr += lenBuf; } - } + iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ + if(iWrite > lenBuf) + iWrite = lenBuf; + memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf + iOffset, iWrite); + pThis->iBufPtr += iWrite; + iOffset += iWrite; + lenBuf -= iWrite; + } while(lenBuf > 0); finalize_it: if(pThis->bAsyncWrite) |