diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-08 15:23:25 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-08 15:23:25 +0200 |
commit | cf906392c3039ffa2526316b44ff28bd1d899061 (patch) | |
tree | 09c83a8866c917207b7d7130de1c88590c5f2b64 /runtime/stream.c | |
parent | 8e4ad77e54c9d9272cef41f71d94ae277965711e (diff) | |
parent | 5221a1e42e16c8c39b48a4a1a18ee6322c38cd17 (diff) | |
download | rsyslog-cf906392c3039ffa2526316b44ff28bd1d899061.tar.gz rsyslog-cf906392c3039ffa2526316b44ff28bd1d899061.tar.xz rsyslog-cf906392c3039ffa2526316b44ff28bd1d899061.zip |
Merge branch 'v4-devel'
Conflicts:
runtime/debug.h
runtime/stream.c
Diffstat (limited to 'runtime/stream.c')
-rw-r--r-- | runtime/stream.c | 444 |
1 files changed, 308 insertions, 136 deletions
diff --git a/runtime/stream.c b/runtime/stream.c index 67941062..4f611a62 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -48,39 +48,25 @@ #include "stream.h" #include "unicode-helper.h" #include "module-template.h" -#include "apc.h" +#include <sys/prctl.h> + +#define inline /* static data */ DEFobjStaticHelpers DEFobjCurrIf(zlibw) -DEFobjCurrIf(apc) /* forward definitions */ static rsRetVal strmFlush(strm_t *pThis); static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); static rsRetVal strmCloseFile(strm_t *pThis); +static void *asyncWriterThread(void *pPtr); +static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); +static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); /* methods */ -/* async flush apc handler - */ -static void -flushApc(void *param1, void __attribute__((unused)) *param2) -{ - DEFVARS_mutexProtection_uncond; - strm_t *pThis = (strm_t*) param1; - ISOBJ_TYPE_assert(pThis, strm); - - BEGINfunc - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); - strmFlush(pThis); - pThis->apcRequested = 0; - END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); - ENDfunc -} - - /* Try to resolve a size limit situation. This is used to support custom-file size handlers * for omfile. It first runs the command, and then checks if we are still above the size * treshold. Note that this works only with single file names, NOT with circular names. @@ -141,10 +127,11 @@ resolveFileSizeLimit(strm_t *pThis, uchar *pszCurrFName) finalize_it: if(iRet != RS_RET_OK) { - if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE) - dbgprintf("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName); - else - dbgprintf("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet); + if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE) { + DBGPRINTF("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName); + } else { + DBGPRINTF("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet); + } pThis->bDisabled = 1; } @@ -282,6 +269,23 @@ finalize_it: } +/* wait for the output writer thread to be done. This must be called before actions + * that require data to be persisted. May be called in non-async mode and is a null + * operation than. Must be called with the mutex locked. + */ +static inline void +strmWaitAsyncWriterDone(strm_t *pThis) +{ + BEGINfunc + if(pThis->bAsyncWrite) { + /* awake writer thread and make it write out everything */ + pthread_cond_signal(&pThis->notEmpty); + d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); + } + ENDfunc +} + + /* close a strm file * Note that the bDeleteOnClose flag is honored. If it is set, the file will be * deleted after close. This is in support for the qRead thread. @@ -294,8 +298,15 @@ static rsRetVal strmCloseFile(strm_t *pThis) ASSERT(pThis->fd != -1); dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); - if(pThis->tOperationsMode != STREAMMODE_READ) - strmFlush(pThis); + if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) { + pThis->bInClose = 1; + if(pThis->bAsyncWrite) { + strmFlush(pThis); + } else { + strmWaitAsyncWriterDone(pThis); + } + pThis->bInClose = 0; + } close(pThis->fd); pThis->fd = -1; @@ -571,11 +582,11 @@ ENDobjConstruct(strm) static rsRetVal strmConstructFinalize(strm_t *pThis) { rsRetVal localRet; + int i; DEFiRet; ASSERT(pThis != NULL); - CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); pThis->iBufPtrMax = 0; /* results in immediate read request */ if(pThis->iZipLevel) { /* do we need a zip buf? */ localRet = objUse(zlibw, LM_ZLIBW_FILENAME); @@ -603,9 +614,28 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } - /* if we should call flush apc's, we need a mutex */ + /* if we have a flush interval, we need to do async writes in any case */ if(pThis->iFlushInterval != 0) { + pThis->bAsyncWrite = 1; + } + + /* if we work asynchronously, we need a couple of synchronization objects */ + if(pThis->bAsyncWrite) { pthread_mutex_init(&pThis->mut, 0); + pthread_cond_init(&pThis->notFull, 0); + pthread_cond_init(&pThis->notEmpty, 0); + pthread_cond_init(&pThis->isEmpty, 0); + pThis->iCnt = pThis->iEnq = pThis->iDeq = 0; + for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { + CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + } + pThis->pIOBuf = pThis->asyncBuf[0].pBuf; + pThis->bStopWriter = 0; + if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) + DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); + } else { + /* we work synchronously, so we need to alloc a fixed pIOBuf */ + CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } finalize_it: @@ -613,12 +643,33 @@ finalize_it: } +/* stop the writer thread (we MUST be runnnig asynchronously when this method + * is called!). Note that the mutex must be locked! -- rgerhards, 2009-07-06 + */ +static inline void +stopWriter(strm_t *pThis) +{ + BEGINfunc + pThis->bStopWriter = 1; + pthread_cond_signal(&pThis->notEmpty); + d_pthread_mutex_unlock(&pThis->mut); + pthread_join(pThis->writerThreadID, NULL); + ENDfunc +} + + /* destructor for the strm object */ BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */ + int i; CODESTARTobjDestruct(strm) + if(pThis->bAsyncWrite) + /* Note: mutex will be unlocked in stopWriter! */ + d_pthread_mutex_lock(&pThis->mut); + if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); +dbgprintf("XXX: destruct stream %p\n", pThis); /* ... then free resources */ if(pThis->fd != -1) strmCloseFile(pThis); @@ -627,16 +678,23 @@ CODESTARTobjDestruct(strm) objRelease(zlibw, LM_ZLIBW_FILENAME); } - if(pThis->iFlushInterval != 0) { - // TODO: check if there is an apc and remove it! - pthread_mutex_destroy(&pThis->mut); - } - free(pThis->pszDir); - free(pThis->pIOBuf); free(pThis->pZipBuf); free(pThis->pszCurrFName); free(pThis->pszFName); + + if(pThis->bAsyncWrite) { + stopWriter(pThis); + pthread_mutex_destroy(&pThis->mut); + pthread_cond_destroy(&pThis->notFull); + pthread_cond_destroy(&pThis->notEmpty); + pthread_cond_destroy(&pThis->isEmpty); + for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { + free(pThis->asyncBuf[i].pBuf); + } + } else { + free(pThis->pIOBuf); + } ENDobjDestruct(strm) @@ -652,6 +710,9 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis) if(pThis->fd == -1) FINALIZE; + /* wait for output to be empty, so that our counts are correct */ + strmWaitAsyncWriterDone(pThis); + if(pThis->iCurrOffs >= pThis->iMaxFileSize) { dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n", (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs); @@ -732,12 +793,164 @@ doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) pWriteBuf += iWritten; } while(lenBuf > 0); /* Warning: do..while()! */ + dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); + finalize_it: *pLenBuf = iTotalWritten; RETiRet; } + +/* write memory buffer to a stream object. + */ +static inline rsRetVal +doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + if(pThis->iZipLevel) { + CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); + } else { + /* write without zipping */ + CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); + } + +finalize_it: + RETiRet; +} + + +/* This function is called to "do" an async write call, what primarily means that + * the data is handed over to the writer thread (which will then do the actual write + * in parallel). Note that the stream mutex has already been locked by the + * strmWrite...() calls. Also note that we always have only a single producer, + * so we can simply serially assign the next free buffer to it and be sure that + * the very some producer comes back in sequence to submit the then-filled buffers. + * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 + */ +static inline rsRetVal +doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + +dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf); + while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) + d_pthread_cond_wait(&pThis->notFull, &pThis->mut); + + pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; + pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf; + + pThis->bDoTimedWait = 0; /* everything written, no need to timeout partial buffer writes */ + if(++pThis->iCnt == 1) + pthread_cond_signal(&pThis->notEmpty); + +finalize_it: + RETiRet; +} + + +/* schedule writing to the stream. Depending on our concurrency settings, + * this either directly writes to the stream or schedules writing via + * the background thread. -- rgerhards, 2009-07-07 + */ +static rsRetVal +strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + if(pThis->bAsyncWrite) { + CHKiRet(doAsyncWriteInternal(pThis, lenBuf)); + } else { + CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); + } + + pThis->iBufPtr = 0; /* we are at the begin of a new buffer */ + +finalize_it: + RETiRet; +} + + + +/* This is the writer thread for asynchronous mode. + * -- rgerhards, 2009-07-06 + */ +static void* +asyncWriterThread(void *pPtr) +{ + int iDeq; + struct timespec t; + bool bTimedOut = 0; + strm_t *pThis = (strm_t*) pPtr; + ISOBJ_TYPE_assert(pThis, strm); + + BEGINfunc + if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) { + DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); + } + + while(1) { /* loop broken inside */ + d_pthread_mutex_lock(&pThis->mut); + while(pThis->iCnt == 0) { + if(pThis->bStopWriter) { + pthread_cond_broadcast(&pThis->isEmpty); + d_pthread_mutex_unlock(&pThis->mut); + goto finalize_it; /* break main loop */ + } + if(bTimedOut && pThis->iBufPtr > 0) { +RUNLOG_STR("XXX: we had a timeout in stream writer"); + /* if we timed out, we need to flush pending data */ + strmFlush(pThis); + bTimedOut = 0; + continue; /* now we should have data */ + } + bTimedOut = 0; + timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ + if(pThis->bDoTimedWait) { + if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) { + int err = errno; + if(err == ETIMEDOUT) { + bTimedOut = 1; + } else { + bTimedOut = 1; + char errStr[1024]; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n", + err, errStr); + } + } + } else { + d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + } + } + + bTimedOut = 0; /* we may have timed out, but there *is* work to do... */ + + iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; + doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); + // TODO: error check????? 2009-07-06 + + --pThis->iCnt; + if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) { + pthread_cond_signal(&pThis->notFull); + if(pThis->iCnt == 0) + pthread_cond_broadcast(&pThis->isEmpty); + } + d_pthread_mutex_unlock(&pThis->mut); + } + +finalize_it: + ENDfunc + return NULL; /* to keep pthreads happy */ +} + + /* sync the file to disk, so that any unwritten data is persisted. This * also syncs the directory and thus makes sure that the file survives * fatal failure. Note that we do NOT return an error status if the @@ -791,9 +1004,7 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) iWritten = lenBuf; CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); - dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); - pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; /* update user counter, if provided */ if(pThis->pUsrWCntr != NULL) @@ -841,7 +1052,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) /* see note in file header for the params we use with deflateInit2() */ zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY); if(zRet != Z_OK) { - dbgprintf("error %d returned from zlib/deflateInit2()\n", zRet); + DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet); ABORT_FINALIZE(RS_RET_ZLIB_ERR); } @@ -851,11 +1062,11 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) /* run deflate() on input until output buffer not full, finish compression if all of source has been read in */ do { - dbgprintf("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in); + DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in); zstrm.avail_out = pThis->sIOBufSize; zstrm.next_out = pThis->pZipBuf; zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */ - dbgprintf("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out); + DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out); assert(zRet != Z_STREAM_ERROR); /* state not clobbered */ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out)); } while (zstrm.avail_out == 0); @@ -864,7 +1075,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) zRet = zlibw.DeflateEnd(&zstrm); if(zRet != Z_OK) { - dbgprintf("error %d returned from zlib/deflateEnd()\n", zRet); + DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet); ABORT_FINALIZE(RS_RET_ZLIB_ERR); } @@ -873,33 +1084,6 @@ finalize_it: } -/* write memory buffer to a stream object. - * To support direct writes of large objects, this method may be called - * with a buffer pointing to some region other than the stream buffer itself. - * However, in that case the stream buffer must be empty (strmFlush() has to - * be called before), because we would otherwise mess up with the sequence - * inside the stream. -- rgerhards, 2008-01-10 - */ -static rsRetVal -strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) -{ - DEFiRet; - - ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); - - if(pThis->iZipLevel) { - CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); - } else { - /* write without zipping */ - CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); - } - -finalize_it: - RETiRet; -} - - /* flush stream output buffer to persistent storage. This can be called at any time * and is automatically called when the output buffer is full. * rgerhards, 2008-01-10 @@ -913,7 +1097,7 @@ strmFlush(strm_t *pThis) dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr); if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) { - iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); + iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr); } RETiRet; @@ -966,6 +1150,12 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) ASSERT(pThis != NULL); + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + + if(pThis->bDisabled) + ABORT_FINALIZE(RS_RET_STREAM_DISABLED); + /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); @@ -975,11 +1165,18 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) pThis->iBufPtr++; finalize_it: + if(pThis->bAsyncWrite) + d_pthread_mutex_unlock(&pThis->mut); + RETiRet; } -/* write an integer value (actually a long) to a stream object */ +/* write an integer value (actually a long) to a stream object + * Note that we do not need to lock the mutex here, because we call + * strmWrite(), which does the lock (aka: we must not lock it, else we + * would run into a recursive lock, resulting in a deadlock!) + */ static rsRetVal strmWriteLong(strm_t *pThis, long i) { DEFiRet; @@ -995,89 +1192,65 @@ finalize_it: } -/* schedule an Apc flush request. - * rgerhards, 2009-06-15 - */ -static inline rsRetVal -scheduleFlushRequest(strm_t *pThis) -{ - apc_t *pApc; - DEFiRet; - - if(!pThis->apcRequested) { - /* we do an request only if none is yet pending */ - pThis->apcRequested = 1; - // TODO: find similar thing later CHKiRet(apc.CancelApc(pThis->apcID)); -dbgprintf("XXX: requesting to add apc!\n"); - CHKiRet(apc.Construct(&pApc)); - CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc)); - CHKiRet(apc.SetParam1(pApc, pThis)); - CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID)); - } - -finalize_it: - RETiRet; -} - - -/* write memory buffer to a stream object +/* write memory buffer to a stream object. + * process the data in chunks and copy it over to our buffer. The caller-provided data + * may theoritically be larger than our buffer. In that case, we do multiple copies. One + * may argue if it were more efficient to write out the caller-provided buffer in that case + * and earlier versions of rsyslog did this. However, this introduces a lot of complexity + * inside the buffered writer and potential performance bottlenecks when trying to solve + * it. Now keep in mind that we actually do (almost?) never have a case where the + * caller-provided buffer is larger than our one. So instead of optimizing a case + * which normally does not exist, we expect some degradation in its case but make us + * perform better in the regular cases. -- rgerhards, 2009-07-07 */ static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { - DEFVARS_mutexProtection_uncond; DEFiRet; - size_t iPartial; + size_t iWrite; + size_t iOffset; ASSERT(pThis != NULL); ASSERT(pBuf != NULL); -dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); +//DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); -RUNLOG_VAR("%d", pThis->iFlushInterval); - if(pThis->iFlushInterval != 0) { - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); - } - - /* check if the to-be-written data is larger than our buffer size */ - if(lenBuf >= pThis->sIOBufSize) { - /* it is - so we do a direct write, that is most efficient. - * TODO: is it really? think about disk block sizes! - */ - CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ - CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf)); - } else { - /* data fits into a buffer - we just need to see if it - * fits into the current buffer... - */ - if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) { - /* nope, so we must split it */ - iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ - if(iPartial > 0) { /* the buffer was exactly full, can not write anything! */ - memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial); - pThis->iBufPtr += iPartial; - } + iOffset = 0; + do { + if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ - memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial); - pThis->iBufPtr = lenBuf - iPartial; - } else { - /* we have space, so we simply copy over the string */ - memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf); - pThis->iBufPtr += lenBuf; } - } - - /* we ignore the outcome of scheduleFlushRequest(), as we will write the data always at - * termination. For Zip mode, it could be fatal if we write after each record. + iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ + if(iWrite > lenBuf) + iWrite = lenBuf; + memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf + iOffset, iWrite); + pThis->iBufPtr += iWrite; + iOffset += iWrite; + lenBuf -= iWrite; + } while(lenBuf > 0); + + /* now check if the buffer right at the end of the write is full and, if so, + * write it. This seems more natural than waiting (hours?) for the next message... */ - if(pThis->iFlushInterval != 0) - scheduleFlushRequest(pThis); + if(pThis->iBufPtr == pThis->sIOBufSize) { + CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ + } finalize_it: - if(pThis->iFlushInterval != 0) { - END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); + if(pThis->bAsyncWrite) { + if(pThis->bDoTimedWait == 0) { + /* we potentially have a partial buffer, so re-activate the + * writer thread that it can set and pick up timeouts. + */ + pThis->bDoTimedWait = 1; + pthread_cond_signal(&pThis->notEmpty); + } + d_pthread_mutex_unlock(&pThis->mut); } RETiRet; @@ -1433,7 +1606,6 @@ ENDobjQueryInterface(strm) */ BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ - CHKiRet(objUse(apc, CORE_COMPONENT)); OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty); |