summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-07 09:16:14 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-07 09:16:14 +0200
commit53aa68fc7d78279ab1af88de253a6134fc7ef00d (patch)
tree7986d3f5d3d8f77f2d4c489ef3086c6840871f99
parent4e9deb5b88129a397d23b55e3954c6f1c259e466 (diff)
downloadrsyslog-53aa68fc7d78279ab1af88de253a6134fc7ef00d.tar.gz
rsyslog-53aa68fc7d78279ab1af88de253a6134fc7ef00d.tar.xz
rsyslog-53aa68fc7d78279ab1af88de253a6134fc7ef00d.zip
clean solution for "writing" arbrietary-size user buffers to a stream
-rw-r--r--runtime/stream.c70
1 files changed, 28 insertions, 42 deletions
diff --git a/runtime/stream.c b/runtime/stream.c
index 6924d527..92122215 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -607,8 +607,6 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
pThis->bStopWriter = 0;
if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0)
DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis);
- // TODO: remove that below later!
- //CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
} else {
/* we work synchronously, so we need to alloc a fixed pIOBuf */
CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
@@ -808,19 +806,14 @@ finalize_it:
* This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06
*/
static inline rsRetVal
-doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+doAsyncWriteInternal(strm_t *pThis, size_t lenBuf)
{
- int iEnq;
DEFiRet;
ISOBJ_TYPE_assert(pThis, strm);
while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS)
d_pthread_cond_wait(&pThis->notFull, &pThis->mut);
- //iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS;
- // TODO: optimize, memcopy only for getting it initially going!
- //pThis->asyncBuf[iEnq].pBuf = pBuf;
- //memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf);
pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf;
pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf;
@@ -846,7 +839,7 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
if(pThis->bAsyncWrite) {
- CHKiRet(doAsyncWriteInternal(pThis, pBuf, lenBuf));
+ CHKiRet(doAsyncWriteInternal(pThis, lenBuf));
} else {
CHKiRet(doWriteInternal(pThis, pBuf, lenBuf));
}
@@ -873,11 +866,9 @@ asyncWriterThread(void *pPtr)
DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer");
}
-DBGPRINTF("TTT: writer thread startup\n");
while(1) { /* loop broken inside */
d_pthread_mutex_lock(&pThis->mut);
while(pThis->iCnt == 0) {
-DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter);
if(pThis->bStopWriter) {
pthread_cond_signal(&pThis->isEmpty);
d_pthread_mutex_unlock(&pThis->mut);
@@ -889,7 +880,6 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter)
iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
iWritten = pThis->asyncBuf[iDeq].lenBuf;
doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten);
- // doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten);
// TODO: error check!!!!! 2009-07-06
--pThis->iCnt;
@@ -900,7 +890,6 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter)
}
finalize_it:
-DBGPRINTF("TTT: writer thread shutdown\n");
ENDfunc
return NULL; /* to keep pthreads happy */
}
@@ -1109,6 +1098,9 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
if(pThis->bAsyncWrite)
d_pthread_mutex_lock(&pThis->mut);
+ if(pThis->bDisabled)
+ ABORT_FINALIZE(RS_RET_STREAM_DISABLED);
+
/* if the buffer is full, we need to flush before we can write */
if(pThis->iBufPtr == pThis->sIOBufSize) {
CHKiRet(strmFlush(pThis));
@@ -1145,13 +1137,23 @@ finalize_it:
}
-/* write memory buffer to a stream object
+/* write memory buffer to a stream object.
+ * process the data in chunks and copy it over to our buffer. The caller-provided data
+ * may theoritically be larger than our buffer. In that case, we do multiple copies. One
+ * may argue if it were more efficient to write out the caller-provided buffer in that case
+ * and earlier versions of rsyslog did this. However, this introduces a lot of complexity
+ * inside the buffered writer and potential performance bottlenecks when trying to solve
+ * it. Now keep in mind that we actually do (almost?) never have a case where the
+ * caller-provided buffer is larger than our one. So instead of optimizing a case
+ * which normally does not exist, we expect some degradation in its case but make us
+ * perform better in the regular cases. -- rgerhards, 2009-07-07
*/
static rsRetVal
strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
DEFiRet;
- size_t iPartial;
+ size_t iWrite;
+ size_t iOffset;
ASSERT(pThis != NULL);
ASSERT(pBuf != NULL);
@@ -1163,35 +1165,19 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n
if(pThis->bDisabled)
ABORT_FINALIZE(RS_RET_STREAM_DISABLED);
- /* check if the to-be-written data is larger than our buffer size */
- if(lenBuf >= pThis->sIOBufSize) {
- /* it is - so we do a direct write, that is most efficient.
- * TODO: is it really? think about disk block sizes!
- */
-printf("error: we should not have reached this code!\n");
- abort();
- CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */
- CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf));
- } else {
- /* data fits into a buffer - we just need to see if it
- * fits into the current buffer...
- */
- if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) {
- /* nope, so we must split it */
- iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
- if(iPartial > 0) { /* the buffer was exactly full, can not write anything! */
- memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial);
- pThis->iBufPtr += iPartial;
- }
+ iOffset = 0;
+ do {
+ if(pThis->iBufPtr == pThis->sIOBufSize) {
CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
- memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial);
- pThis->iBufPtr = lenBuf - iPartial;
- } else {
- /* we have space, so we simply copy over the string */
- memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf);
- pThis->iBufPtr += lenBuf;
}
- }
+ iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
+ if(iWrite > lenBuf)
+ iWrite = lenBuf;
+ memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf + iOffset, iWrite);
+ pThis->iBufPtr += iWrite;
+ iOffset += iWrite;
+ lenBuf -= iWrite;
+ } while(lenBuf > 0);
finalize_it:
if(pThis->bAsyncWrite)