From e3040285dbf0854443bc2443e0de5ac59f6f839e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 6 Jul 2009 16:38:09 +0200 Subject: first shot at asynchronous stream writer with timeout capability ... seems to work on quick testing, but needs a far more testing and improvement. Good milestone commit. --- runtime/stream.c | 216 +++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 146 insertions(+), 70 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 00c726d9..a9f4803f 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -48,37 +48,21 @@ #include "stream.h" #include "unicode-helper.h" #include "module-template.h" -#include "apc.h" +#include /* static data */ DEFobjStaticHelpers DEFobjCurrIf(zlibw) -DEFobjCurrIf(apc) /* forward definitions */ static rsRetVal strmFlush(strm_t *pThis); static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); static rsRetVal strmCloseFile(strm_t *pThis); +static void *asyncWriterThread(void *pPtr); /* methods */ -/* async flush apc handler - */ -static void -flushApc(void *param1, void __attribute__((unused)) *param2) -{ - DEFVARS_mutexProtection_uncond; - strm_t *pThis = (strm_t*) param1; - ISOBJ_TYPE_assert(pThis, strm); - - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); - strmFlush(pThis); - pThis->apcRequested = 0; - END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); -} - - /* Try to resolve a size limit situation. This is used to support custom-file size handlers * for omfile. It first runs the command, and then checks if we are still above the size * treshold. Note that this works only with single file names, NOT with circular names. @@ -569,11 +553,11 @@ ENDobjConstruct(strm) static rsRetVal strmConstructFinalize(strm_t *pThis) { rsRetVal localRet; + int i; DEFiRet; ASSERT(pThis != NULL); - CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); pThis->iBufPtrMax = 0; /* results in immediate read request */ if(pThis->iZipLevel) { /* do we need a zip buf? */ localRet = objUse(zlibw, LM_ZLIBW_FILENAME); @@ -601,9 +585,33 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } - /* if we should call flush apc's, we need a mutex */ +dbgprintf("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); + /* if we have a flush interval, we need to do async writes in any case */ if(pThis->iFlushInterval != 0) { + pThis->bAsyncWrite = 1; + } +dbgprintf("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); + + /* if we work asynchronously, we need a couple of synchronization objects */ + if(pThis->bAsyncWrite) { pthread_mutex_init(&pThis->mut, 0); + pthread_cond_init(&pThis->notFull, 0); + pthread_cond_init(&pThis->notEmpty, 0); + pthread_cond_init(&pThis->isEmpty, 0); + pThis->iCnt = pThis->iEnq = pThis->iDeq = 0; + for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { + CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + } + //pThis->pIOBuf = pThis->ioBuf[0]; + pThis->bStopWriter = 0; + // TODO: detached thread? + if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) + dbgprintf("ERROR: stream %p cold not create writer thread\n", pThis); + // TODO: remove that below later! + CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + } else { + /* we work synchronously, so we need to alloc a fixed pIOBuf */ + CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } finalize_it: @@ -611,8 +619,26 @@ finalize_it: } +/* stop the writer thread (we MUST be runnnig asynchronously when this method + * is called!) -- rgerhards, 2009-07-06 + */ +static inline void +stopWriter(strm_t *pThis) +{ + BEGINfunc + d_pthread_mutex_lock(&pThis->mut); + pThis->bStopWriter = 1; + pthread_cond_signal(&pThis->notEmpty); + d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); + d_pthread_mutex_unlock(&pThis->mut); + pthread_join(pThis->writerThreadID, NULL); + ENDfunc +} + + /* destructor for the strm object */ BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */ + int i; CODESTARTobjDestruct(strm) if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); @@ -625,16 +651,23 @@ CODESTARTobjDestruct(strm) objRelease(zlibw, LM_ZLIBW_FILENAME); } - if(pThis->iFlushInterval != 0) { - // TODO: check if there is an apc and remove it! - pthread_mutex_destroy(&pThis->mut); - } - free(pThis->pszDir); - free(pThis->pIOBuf); free(pThis->pZipBuf); free(pThis->pszCurrFName); free(pThis->pszFName); + + if(pThis->bAsyncWrite) { + stopWriter(pThis); + pthread_mutex_destroy(&pThis->mut); + pthread_cond_destroy(&pThis->notFull); + pthread_cond_destroy(&pThis->notEmpty); + pthread_cond_destroy(&pThis->isEmpty); + for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { + free(pThis->asyncBuf[i].pBuf); + } + } else { + free(pThis->pIOBuf); + } ENDobjDestruct(strm) @@ -730,11 +763,93 @@ doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) pWriteBuf += iWritten; } while(lenBuf > 0); /* Warning: do..while()! */ + dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); + finalize_it: *pLenBuf = iTotalWritten; RETiRet; } +#include + +/* This is the writer thread for asynchronous mode. + * -- rgerhards, 2009-07-06 + */ +static void* +asyncWriterThread(void *pPtr) +{ + int iDeq; + size_t iWritten; + strm_t *pThis = (strm_t*) pPtr; + ISOBJ_TYPE_assert(pThis, strm); + + BEGINfunc + if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) { + DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); + } + +fprintf(stderr, "async stream writer thread started\n");fflush(stderr); +dbgprintf("TTT: writer thread startup\n"); + while(1) { /* loop broken inside */ + d_pthread_mutex_lock(&pThis->mut); + while(pThis->iCnt == 0) { +dbgprintf("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter); + if(pThis->bStopWriter) { + pthread_cond_signal(&pThis->isEmpty); + d_pthread_mutex_unlock(&pThis->mut); + goto finalize_it; /* break main loop */ + } + d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + } + + iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; + iWritten = pThis->asyncBuf[iDeq].lenBuf; + doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); + // TODO: error check!!!!! 2009-07-06 + + --pThis->iCnt; + if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) { + pthread_cond_signal(&pThis->notFull); + } + d_pthread_mutex_unlock(&pThis->mut); + } + +finalize_it: +dbgprintf("TTT: writer thread shutdown\n"); + ENDfunc + return NULL; /* to keep pthreads happy */ +} + + +/* This function is called to "do" an async write call, what primarily means that + * the data is handed over to the writer thread (which will then do the actual write + * in parallel. -- rgerhards, 2009-07-06 + */ +static inline rsRetVal +doAsyncWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) +{ + int iEnq; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + + d_pthread_mutex_lock(&pThis->mut); + while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) + d_pthread_cond_wait(&pThis->notFull, &pThis->mut); + + iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; + // TODO: optimize, memcopy only for getting it initially going! + //pThis->asyncBuf[iEnq].pBuf = pBuf; + memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, *pLenBuf); + pThis->asyncBuf[iEnq].lenBuf = *pLenBuf; + + if(++pThis->iCnt == 1) + pthread_cond_signal(&pThis->notEmpty); + d_pthread_mutex_unlock(&pThis->mut); + +finalize_it: + RETiRet; +} + /* sync the file to disk, so that any unwritten data is persisted. This * also syncs the directory and thus makes sure that the file survives @@ -788,8 +903,11 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(strmOpenFile(pThis)); iWritten = lenBuf; - CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); - dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); + if(pThis->bAsyncWrite) { + CHKiRet(doAsyncWriteCall(pThis, pBuf, &iWritten)); + } else { + CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); + } pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; @@ -993,37 +1111,11 @@ finalize_it: } -/* schedule an Apc flush request. - * rgerhards, 2009-06-15 - */ -static inline rsRetVal -scheduleFlushRequest(strm_t *pThis) -{ - apc_t *pApc; - DEFiRet; - - if(!pThis->apcRequested) { - /* we do an request only if none is yet pending */ - pThis->apcRequested = 1; - // TODO: find similar thing later CHKiRet(apc.CancelApc(pThis->apcID)); -dbgprintf("XXX: requesting to add apc!\n"); - CHKiRet(apc.Construct(&pApc)); - CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc)); - CHKiRet(apc.SetParam1(pApc, pThis)); - CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID)); - } - -finalize_it: - RETiRet; -} - - /* write memory buffer to a stream object */ static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { - DEFVARS_mutexProtection_uncond; DEFiRet; size_t iPartial; @@ -1034,11 +1126,6 @@ dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); -RUNLOG_VAR("%d", pThis->iFlushInterval); - if(pThis->iFlushInterval != 0) { - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); - } - /* check if the to-be-written data is larger than our buffer size */ if(lenBuf >= pThis->sIOBufSize) { /* it is - so we do a direct write, that is most efficient. @@ -1067,17 +1154,7 @@ RUNLOG_VAR("%d", pThis->iFlushInterval); } } - /* we ignore the outcome of scheduleFlushRequest(), as we will write the data always at - * termination. For Zip mode, it could be fatal if we write after each record. - */ - if(pThis->iFlushInterval != 0) - scheduleFlushRequest(pThis); - finalize_it: - if(pThis->iFlushInterval != 0) { - END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); - } - RETiRet; } @@ -1390,7 +1467,6 @@ ENDobjQueryInterface(strm) */ BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ - CHKiRet(objUse(apc, CORE_COMPONENT)); OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty); -- cgit From 8e76a0521bee36e02e8bce2e97fa3d2aa67130da Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 6 Jul 2009 19:28:22 +0200 Subject: some minor cleanup --- runtime/stream.c | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index a9f4803f..2bb19caf 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -123,10 +123,11 @@ resolveFileSizeLimit(strm_t *pThis, uchar *pszCurrFName) finalize_it: if(iRet != RS_RET_OK) { - if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE) - dbgprintf("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName); - else - dbgprintf("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet); + if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE) { + DBGPRINTF("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName); + } else { + DBGPRINTF("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet); + } pThis->bDisabled = 1; } @@ -585,12 +586,12 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } -dbgprintf("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); +DBGPRINTF("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); /* if we have a flush interval, we need to do async writes in any case */ if(pThis->iFlushInterval != 0) { pThis->bAsyncWrite = 1; } -dbgprintf("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); +DBGPRINTF("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); /* if we work asynchronously, we need a couple of synchronization objects */ if(pThis->bAsyncWrite) { @@ -604,9 +605,8 @@ dbgprintf("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlus } //pThis->pIOBuf = pThis->ioBuf[0]; pThis->bStopWriter = 0; - // TODO: detached thread? if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) - dbgprintf("ERROR: stream %p cold not create writer thread\n", pThis); + DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); // TODO: remove that below later! CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } else { @@ -789,11 +789,11 @@ asyncWriterThread(void *pPtr) } fprintf(stderr, "async stream writer thread started\n");fflush(stderr); -dbgprintf("TTT: writer thread startup\n"); +DBGPRINTF("TTT: writer thread startup\n"); while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt == 0) { -dbgprintf("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter); +DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter); if(pThis->bStopWriter) { pthread_cond_signal(&pThis->isEmpty); d_pthread_mutex_unlock(&pThis->mut); @@ -815,7 +815,7 @@ dbgprintf("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) } finalize_it: -dbgprintf("TTT: writer thread shutdown\n"); +DBGPRINTF("TTT: writer thread shutdown\n"); ENDfunc return NULL; /* to keep pthreads happy */ } @@ -957,7 +957,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) /* see note in file header for the params we use with deflateInit2() */ zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY); if(zRet != Z_OK) { - dbgprintf("error %d returned from zlib/deflateInit2()\n", zRet); + DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet); ABORT_FINALIZE(RS_RET_ZLIB_ERR); } @@ -967,11 +967,11 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) /* run deflate() on input until output buffer not full, finish compression if all of source has been read in */ do { - dbgprintf("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in); + DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in); zstrm.avail_out = pThis->sIOBufSize; zstrm.next_out = pThis->pZipBuf; zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */ - dbgprintf("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out); + DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out); assert(zRet != Z_STREAM_ERROR); /* state not clobbered */ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out)); } while (zstrm.avail_out == 0); @@ -980,7 +980,7 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) zRet = zlibw.DeflateEnd(&zstrm); if(zRet != Z_OK) { - dbgprintf("error %d returned from zlib/deflateEnd()\n", zRet); + DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet); ABORT_FINALIZE(RS_RET_ZLIB_ERR); } @@ -1122,7 +1122,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pThis != NULL); ASSERT(pBuf != NULL); -dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); +DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); -- cgit From 53b055b6aabd87fa096edf70a6e58eea6c87f38b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 6 Jul 2009 19:44:53 +0200 Subject: moved zip part to writer thread ... this is necessary in preparation for the final solution (we need to have a "unified" writer). If it causes worse performance to have the zip writher togehter with the synchronous write, we may do an async write... --- runtime/stream.c | 158 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 89 insertions(+), 69 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 2bb19caf..dfb43891 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -59,6 +59,8 @@ static rsRetVal strmFlush(strm_t *pThis); static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); static rsRetVal strmCloseFile(strm_t *pThis); static void *asyncWriterThread(void *pPtr); +static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); +static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); /* methods */ @@ -586,12 +588,10 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } -DBGPRINTF("TTT: before checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); /* if we have a flush interval, we need to do async writes in any case */ if(pThis->iFlushInterval != 0) { pThis->bAsyncWrite = 1; } -DBGPRINTF("TTT: after checks: iFlushInterval %d, bAsyncWrite %d\n", pThis->iFlushInterval, pThis->bAsyncWrite); /* if we work asynchronously, we need a couple of synchronization objects */ if(pThis->bAsyncWrite) { @@ -770,7 +770,88 @@ finalize_it: RETiRet; } -#include + + +/* write memory buffer to a stream object. + * To support direct writes of large objects, this method may be called + * with a buffer pointing to some region other than the stream buffer itself. + * However, in that case the stream buffer must be empty (strmFlush() has to + * be called before), because we would otherwise mess up with the sequence + * inside the stream. -- rgerhards, 2008-01-10 + */ +static inline rsRetVal +doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); + + if(pThis->iZipLevel) { + CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); + } else { + /* write without zipping */ + CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); + } + +finalize_it: + RETiRet; +} + + +/* This function is called to "do" an async write call, what primarily means that + * the data is handed over to the writer thread (which will then do the actual write + * in parallel. -- rgerhards, 2009-07-06 + */ +static inline rsRetVal +doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + int iEnq; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + + d_pthread_mutex_lock(&pThis->mut); + while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) + d_pthread_cond_wait(&pThis->notFull, &pThis->mut); + + iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; + // TODO: optimize, memcopy only for getting it initially going! + //pThis->asyncBuf[iEnq].pBuf = pBuf; + memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); + pThis->asyncBuf[iEnq].lenBuf = lenBuf; + + if(++pThis->iCnt == 1) + pthread_cond_signal(&pThis->notEmpty); + d_pthread_mutex_unlock(&pThis->mut); + +finalize_it: + RETiRet; +} + + +/* schedule writing to the stream. Depending on our concurrency settings, + * this either directly writes to the stream or schedules writing via + * the background thread. -- rgerhards, 2009-07-07 + */ +static rsRetVal +strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); + + if(pThis->bAsyncWrite) { + CHKiRet(doAsyncWriteInternal(pThis, pBuf, lenBuf)); + } else { + CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); + } + +finalize_it: + RETiRet; +} + + /* This is the writer thread for asynchronous mode. * -- rgerhards, 2009-07-06 @@ -788,7 +869,6 @@ asyncWriterThread(void *pPtr) DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); } -fprintf(stderr, "async stream writer thread started\n");fflush(stderr); DBGPRINTF("TTT: writer thread startup\n"); while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); @@ -804,7 +884,8 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; iWritten = pThis->asyncBuf[iDeq].lenBuf; - doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); + doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten); + // doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); // TODO: error check!!!!! 2009-07-06 --pThis->iCnt; @@ -821,36 +902,6 @@ DBGPRINTF("TTT: writer thread shutdown\n"); } -/* This function is called to "do" an async write call, what primarily means that - * the data is handed over to the writer thread (which will then do the actual write - * in parallel. -- rgerhards, 2009-07-06 - */ -static inline rsRetVal -doAsyncWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) -{ - int iEnq; - DEFiRet; - ISOBJ_TYPE_assert(pThis, strm); - - d_pthread_mutex_lock(&pThis->mut); - while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) - d_pthread_cond_wait(&pThis->notFull, &pThis->mut); - - iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; - // TODO: optimize, memcopy only for getting it initially going! - //pThis->asyncBuf[iEnq].pBuf = pBuf; - memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, *pLenBuf); - pThis->asyncBuf[iEnq].lenBuf = *pLenBuf; - - if(++pThis->iCnt == 1) - pthread_cond_signal(&pThis->notEmpty); - d_pthread_mutex_unlock(&pThis->mut); - -finalize_it: - RETiRet; -} - - /* sync the file to disk, so that any unwritten data is persisted. This * also syncs the directory and thus makes sure that the file survives * fatal failure. Note that we do NOT return an error status if the @@ -903,11 +954,7 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(strmOpenFile(pThis)); iWritten = lenBuf; - if(pThis->bAsyncWrite) { - CHKiRet(doAsyncWriteCall(pThis, pBuf, &iWritten)); - } else { - CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); - } + CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; @@ -989,33 +1036,6 @@ finalize_it: } -/* write memory buffer to a stream object. - * To support direct writes of large objects, this method may be called - * with a buffer pointing to some region other than the stream buffer itself. - * However, in that case the stream buffer must be empty (strmFlush() has to - * be called before), because we would otherwise mess up with the sequence - * inside the stream. -- rgerhards, 2008-01-10 - */ -static rsRetVal -strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) -{ - DEFiRet; - - ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); - - if(pThis->iZipLevel) { - CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); - } else { - /* write without zipping */ - CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); - } - -finalize_it: - RETiRet; -} - - /* flush stream output buffer to persistent storage. This can be called at any time * and is automatically called when the output buffer is full. * rgerhards, 2008-01-10 @@ -1029,7 +1049,7 @@ strmFlush(strm_t *pThis) dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr); if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) { - iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); + iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr); } RETiRet; @@ -1132,7 +1152,7 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n * TODO: is it really? think about disk block sizes! */ CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ - CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf)); + CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf)); } else { /* data fits into a buffer - we just need to see if it * fits into the current buffer... -- cgit From f27b5cc2535f0b1763e1963304546b611cd3c26a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 08:08:22 +0200 Subject: moved locking primitives --- runtime/stream.c | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index dfb43891..d25f8b02 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -801,7 +801,8 @@ finalize_it: /* This function is called to "do" an async write call, what primarily means that * the data is handed over to the writer thread (which will then do the actual write - * in parallel. -- rgerhards, 2009-07-06 + * in parallel). Note that the stream mutex has already been locked by the + * strmWrite...() calls. -- rgerhards, 2009-07-06 */ static inline rsRetVal doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) @@ -810,7 +811,6 @@ doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) DEFiRet; ISOBJ_TYPE_assert(pThis, strm); - d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); @@ -1102,6 +1102,9 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) ASSERT(pThis != NULL); + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); @@ -1111,11 +1114,18 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) pThis->iBufPtr++; finalize_it: + if(pThis->bAsyncWrite) + d_pthread_mutex_unlock(&pThis->mut); + RETiRet; } -/* write an integer value (actually a long) to a stream object */ +/* write an integer value (actually a long) to a stream object + * Note that we do not need to lock the mutex here, because we call + * strmWrite(), which does the lock (aka: we must not lock it, else we + * would run into a recursive lock, resulting in a deadlock!) + */ static rsRetVal strmWriteLong(strm_t *pThis, long i) { DEFiRet; @@ -1143,6 +1153,9 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pBuf != NULL); DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); @@ -1175,6 +1188,9 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n } finalize_it: + if(pThis->bAsyncWrite) + d_pthread_mutex_unlock(&pThis->mut); + RETiRet; } -- cgit From 4e9deb5b88129a397d23b55e3954c6f1c259e466 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 08:33:22 +0200 Subject: stream now uses a singular buffer strucuture for writing --- runtime/stream.c | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index d25f8b02..6924d527 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -603,12 +603,12 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } - //pThis->pIOBuf = pThis->ioBuf[0]; + pThis->pIOBuf = pThis->asyncBuf[0].pBuf; pThis->bStopWriter = 0; if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); // TODO: remove that below later! - CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); + //CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } else { /* we work synchronously, so we need to alloc a fixed pIOBuf */ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); @@ -802,7 +802,10 @@ finalize_it: /* This function is called to "do" an async write call, what primarily means that * the data is handed over to the writer thread (which will then do the actual write * in parallel). Note that the stream mutex has already been locked by the - * strmWrite...() calls. -- rgerhards, 2009-07-06 + * strmWrite...() calls. Also note that we always have only a single producer, + * so we can simply serially assign the next free buffer to it and be sure that + * the very some producer comes back in sequence to submit the then-filled buffers. + * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 */ static inline rsRetVal doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) @@ -814,11 +817,12 @@ doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); - iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; + //iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; // TODO: optimize, memcopy only for getting it initially going! //pThis->asyncBuf[iEnq].pBuf = pBuf; - memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); - pThis->asyncBuf[iEnq].lenBuf = lenBuf; + //memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); + pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; + pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf; if(++pThis->iCnt == 1) pthread_cond_signal(&pThis->notEmpty); @@ -1164,6 +1168,8 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n /* it is - so we do a direct write, that is most efficient. * TODO: is it really? think about disk block sizes! */ +printf("error: we should not have reached this code!\n"); + abort(); CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf)); } else { -- cgit From 53aa68fc7d78279ab1af88de253a6134fc7ef00d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 09:16:14 +0200 Subject: clean solution for "writing" arbrietary-size user buffers to a stream --- runtime/stream.c | 70 +++++++++++++++++++++++--------------------------------- 1 file changed, 28 insertions(+), 42 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 6924d527..92122215 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -607,8 +607,6 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) pThis->bStopWriter = 0; if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); - // TODO: remove that below later! - //CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); } else { /* we work synchronously, so we need to alloc a fixed pIOBuf */ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)); @@ -808,19 +806,14 @@ finalize_it: * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 */ static inline rsRetVal -doAsyncWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) { - int iEnq; DEFiRet; ISOBJ_TYPE_assert(pThis, strm); while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); - //iEnq = pThis->iEnq++ % STREAM_ASYNC_NUMBUFS; - // TODO: optimize, memcopy only for getting it initially going! - //pThis->asyncBuf[iEnq].pBuf = pBuf; - //memcpy(pThis->asyncBuf[iEnq].pBuf, pBuf, lenBuf); pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf; @@ -846,7 +839,7 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->bAsyncWrite) { - CHKiRet(doAsyncWriteInternal(pThis, pBuf, lenBuf)); + CHKiRet(doAsyncWriteInternal(pThis, lenBuf)); } else { CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); } @@ -873,11 +866,9 @@ asyncWriterThread(void *pPtr) DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); } -DBGPRINTF("TTT: writer thread startup\n"); while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt == 0) { -DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter); if(pThis->bStopWriter) { pthread_cond_signal(&pThis->isEmpty); d_pthread_mutex_unlock(&pThis->mut); @@ -889,7 +880,6 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; iWritten = pThis->asyncBuf[iDeq].lenBuf; doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten); - // doWriteCall(pThis, pThis->asyncBuf[iDeq].pBuf, &iWritten); // TODO: error check!!!!! 2009-07-06 --pThis->iCnt; @@ -900,7 +890,6 @@ DBGPRINTF("TTT: writer thread empty queue, stopWriter=%d\n", pThis->bStopWriter) } finalize_it: -DBGPRINTF("TTT: writer thread shutdown\n"); ENDfunc return NULL; /* to keep pthreads happy */ } @@ -1109,6 +1098,9 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) if(pThis->bAsyncWrite) d_pthread_mutex_lock(&pThis->mut); + if(pThis->bDisabled) + ABORT_FINALIZE(RS_RET_STREAM_DISABLED); + /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); @@ -1145,13 +1137,23 @@ finalize_it: } -/* write memory buffer to a stream object +/* write memory buffer to a stream object. + * process the data in chunks and copy it over to our buffer. The caller-provided data + * may theoritically be larger than our buffer. In that case, we do multiple copies. One + * may argue if it were more efficient to write out the caller-provided buffer in that case + * and earlier versions of rsyslog did this. However, this introduces a lot of complexity + * inside the buffered writer and potential performance bottlenecks when trying to solve + * it. Now keep in mind that we actually do (almost?) never have a case where the + * caller-provided buffer is larger than our one. So instead of optimizing a case + * which normally does not exist, we expect some degradation in its case but make us + * perform better in the regular cases. -- rgerhards, 2009-07-07 */ static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; - size_t iPartial; + size_t iWrite; + size_t iOffset; ASSERT(pThis != NULL); ASSERT(pBuf != NULL); @@ -1163,35 +1165,19 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); - /* check if the to-be-written data is larger than our buffer size */ - if(lenBuf >= pThis->sIOBufSize) { - /* it is - so we do a direct write, that is most efficient. - * TODO: is it really? think about disk block sizes! - */ -printf("error: we should not have reached this code!\n"); - abort(); - CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ - CHKiRet(strmSchedWrite(pThis, pBuf, lenBuf)); - } else { - /* data fits into a buffer - we just need to see if it - * fits into the current buffer... - */ - if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) { - /* nope, so we must split it */ - iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ - if(iPartial > 0) { /* the buffer was exactly full, can not write anything! */ - memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial); - pThis->iBufPtr += iPartial; - } + iOffset = 0; + do { + if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ - memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial); - pThis->iBufPtr = lenBuf - iPartial; - } else { - /* we have space, so we simply copy over the string */ - memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf); - pThis->iBufPtr += lenBuf; } - } + iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ + if(iWrite > lenBuf) + iWrite = lenBuf; + memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf + iOffset, iWrite); + pThis->iBufPtr += iWrite; + iOffset += iWrite; + lenBuf -= iWrite; + } while(lenBuf > 0); finalize_it: if(pThis->bAsyncWrite) -- cgit From f53aa966e1fad03c478de342f5a878e57405de13 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 12:09:41 +0200 Subject: solved a race condition --- runtime/stream.c | 63 +++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 49 insertions(+), 14 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 92122215..eae36796 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -50,6 +50,8 @@ #include "module-template.h" #include +#define inline + /* static data */ DEFobjStaticHelpers DEFobjCurrIf(zlibw) @@ -267,6 +269,21 @@ finalize_it: } +/* wait for the output writer thread to be done. This must be called before actions + * that require data to be persisted. May be called in non-async mode and is a null + * operation than. Must be called with the mutex locked. + */ +static inline void +strmWaitAsyncWriterDone(strm_t *pThis) +{ + if(pThis->bAsyncWrite) { + /* awake writer thread and make it write out everything */ + pthread_cond_signal(&pThis->notEmpty); + d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); + } +} + + /* close a strm file * Note that the bDeleteOnClose flag is honored. If it is set, the file will be * deleted after close. This is in support for the qRead thread. @@ -282,6 +299,8 @@ static rsRetVal strmCloseFile(strm_t *pThis) if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); + strmWaitAsyncWriterDone(pThis); + close(pThis->fd); pThis->fd = -1; @@ -618,16 +637,14 @@ finalize_it: /* stop the writer thread (we MUST be runnnig asynchronously when this method - * is called!) -- rgerhards, 2009-07-06 + * is called!). Note that the mutex must be locked! -- rgerhards, 2009-07-06 */ static inline void stopWriter(strm_t *pThis) { BEGINfunc - d_pthread_mutex_lock(&pThis->mut); pThis->bStopWriter = 1; pthread_cond_signal(&pThis->notEmpty); - d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); d_pthread_mutex_unlock(&pThis->mut); pthread_join(pThis->writerThreadID, NULL); ENDfunc @@ -638,9 +655,14 @@ stopWriter(strm_t *pThis) BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */ int i; CODESTARTobjDestruct(strm) + if(pThis->bAsyncWrite) + /* Note: mutex will be unlocked in stopWriter! */ + d_pthread_mutex_lock(&pThis->mut); + if(pThis->tOperationsMode != STREAMMODE_READ) strmFlush(pThis); +dbgprintf("XXX: destruct stream %p\n", pThis); /* ... then free resources */ if(pThis->fd != -1) strmCloseFile(pThis); @@ -681,6 +703,9 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis) if(pThis->fd == -1) FINALIZE; + /* wait for output to be empty, so that our counts are correct */ + strmWaitAsyncWriterDone(pThis); + if(pThis->iCurrOffs >= pThis->iMaxFileSize) { dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n", (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs); @@ -783,7 +808,6 @@ doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) DEFiRet; ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->iZipLevel) { CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); @@ -811,15 +835,15 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) DEFiRet; ISOBJ_TYPE_assert(pThis, strm); +dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf); while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; - pThis->pIOBuf = pThis->asyncBuf[pThis->iEnq++ % STREAM_ASYNC_NUMBUFS].pBuf; + pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf; if(++pThis->iCnt == 1) pthread_cond_signal(&pThis->notEmpty); - d_pthread_mutex_unlock(&pThis->mut); finalize_it: RETiRet; @@ -836,7 +860,6 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) DEFiRet; ASSERT(pThis != NULL); - ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->bAsyncWrite) { CHKiRet(doAsyncWriteInternal(pThis, lenBuf)); @@ -844,6 +867,8 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); } + pThis->iBufPtr = 0; /* we are at the begin of a new buffer */ + finalize_it: RETiRet; } @@ -857,7 +882,6 @@ static void* asyncWriterThread(void *pPtr) { int iDeq; - size_t iWritten; strm_t *pThis = (strm_t*) pPtr; ISOBJ_TYPE_assert(pThis, strm); @@ -870,7 +894,7 @@ asyncWriterThread(void *pPtr) d_pthread_mutex_lock(&pThis->mut); while(pThis->iCnt == 0) { if(pThis->bStopWriter) { - pthread_cond_signal(&pThis->isEmpty); + pthread_cond_broadcast(&pThis->isEmpty); d_pthread_mutex_unlock(&pThis->mut); goto finalize_it; /* break main loop */ } @@ -878,13 +902,14 @@ asyncWriterThread(void *pPtr) } iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; - iWritten = pThis->asyncBuf[iDeq].lenBuf; - doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, iWritten); - // TODO: error check!!!!! 2009-07-06 + doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); + // TODO: error check????? 2009-07-06 --pThis->iCnt; if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) { pthread_cond_signal(&pThis->notFull); + if(pThis->iCnt == 0) + pthread_cond_broadcast(&pThis->isEmpty); } d_pthread_mutex_unlock(&pThis->mut); } @@ -949,7 +974,6 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) iWritten = lenBuf; CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); - pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; /* update user counter, if provided */ if(pThis->pUsrWCntr != NULL) @@ -1158,7 +1182,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pThis != NULL); ASSERT(pBuf != NULL); -DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); +//DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs); if(pThis->bAsyncWrite) d_pthread_mutex_lock(&pThis->mut); @@ -1167,9 +1191,11 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n iOffset = 0; do { +//dbgprintf("XXX: enter write loop\n"); if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } +//dbgprintf("XXX: post strmFlush\n"); iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iWrite > lenBuf) iWrite = lenBuf; @@ -1177,8 +1203,17 @@ DBGPRINTF("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n pThis->iBufPtr += iWrite; iOffset += iWrite; lenBuf -= iWrite; +//dbgprintf("XXX: after write , iBufPtr %d, iOffset %d, len %d, unwritten: '%20.20s'\n", pThis->iBufPtr, iOffset, lenBuf, pBuf + iOffset); } while(lenBuf > 0); + /* now check if the buffer right at the end of the write is full and, if so, + * write it. This seems more natural than waiting (hours?) for the next message... + */ + if(pThis->iBufPtr == pThis->sIOBufSize) { + CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ + } +//dbgprintf("XXX: done write loop, iBufPtr %d, iOffset %d, len %d\n", pThis->iBufPtr, iOffset, lenBuf); + finalize_it: if(pThis->bAsyncWrite) d_pthread_mutex_unlock(&pThis->mut); -- cgit From 26227091faac8c3cc9bc282eb4e4fc408635f8d2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 17:18:51 +0200 Subject: fixed a bug introduced today that lead to an abort in queue disk mode --- runtime/stream.c | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index eae36796..9f363257 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -276,11 +276,13 @@ finalize_it: static inline void strmWaitAsyncWriterDone(strm_t *pThis) { + BEGINfunc if(pThis->bAsyncWrite) { /* awake writer thread and make it write out everything */ pthread_cond_signal(&pThis->notEmpty); d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); } + ENDfunc } @@ -296,10 +298,16 @@ static rsRetVal strmCloseFile(strm_t *pThis) ASSERT(pThis->fd != -1); dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); - if(pThis->tOperationsMode != STREAMMODE_READ) - strmFlush(pThis); - - strmWaitAsyncWriterDone(pThis); + dbgCallStackPrintAll(); + if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) { + pThis->bInClose = 1; + if(pThis->bAsyncWrite) { + strmFlush(pThis); + } else { + strmWaitAsyncWriterDone(pThis); + } + pThis->bInClose = 0; + } close(pThis->fd); pThis->fd = -1; @@ -796,11 +804,6 @@ finalize_it: /* write memory buffer to a stream object. - * To support direct writes of large objects, this method may be called - * with a buffer pointing to some region other than the stream buffer itself. - * However, in that case the stream buffer must be empty (strmFlush() has to - * be called before), because we would otherwise mess up with the sequence - * inside the stream. -- rgerhards, 2008-01-10 */ static inline rsRetVal doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) -- cgit From 5221a1e42e16c8c39b48a4a1a18ee6322c38cd17 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 18:33:00 +0200 Subject: added capability to write incomplete buffers after an inactivity timeout for the stream class and thus finally activating omfile's timeout capability in a useful way without polling and too-high performance overhead. --- runtime/stream.c | 46 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 7 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 9f363257..a0571a61 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -298,7 +298,6 @@ static rsRetVal strmCloseFile(strm_t *pThis) ASSERT(pThis->fd != -1); dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); - dbgCallStackPrintAll(); if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) { pThis->bInClose = 1; if(pThis->bAsyncWrite) { @@ -845,6 +844,7 @@ dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf) pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf; + pThis->bDoTimedWait = 0; /* everything written, no need to timeout partial buffer writes */ if(++pThis->iCnt == 1) pthread_cond_signal(&pThis->notEmpty); @@ -885,6 +885,8 @@ static void* asyncWriterThread(void *pPtr) { int iDeq; + struct timespec t; + bool bTimedOut = 0; strm_t *pThis = (strm_t*) pPtr; ISOBJ_TYPE_assert(pThis, strm); @@ -901,9 +903,35 @@ asyncWriterThread(void *pPtr) d_pthread_mutex_unlock(&pThis->mut); goto finalize_it; /* break main loop */ } - d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + if(bTimedOut && pThis->iBufPtr > 0) { +RUNLOG_STR("XXX: we had a timeout in stream writer"); + /* if we timed out, we need to flush pending data */ + strmFlush(pThis); + bTimedOut = 0; + continue; /* now we should have data */ + } + bTimedOut = 0; + timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ + if(pThis->bDoTimedWait) { + if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) { + int err = errno; + if(err == ETIMEDOUT) { + bTimedOut = 1; + } else { + bTimedOut = 1; + char errStr[1024]; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n", + err, errStr); + } + } + } else { + d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + } } + bTimedOut = 0; /* we may have timed out, but there *is* work to do... */ + iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); // TODO: error check????? 2009-07-06 @@ -1194,11 +1222,9 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) iOffset = 0; do { -//dbgprintf("XXX: enter write loop\n"); if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } -//dbgprintf("XXX: post strmFlush\n"); iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iWrite > lenBuf) iWrite = lenBuf; @@ -1206,7 +1232,6 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) pThis->iBufPtr += iWrite; iOffset += iWrite; lenBuf -= iWrite; -//dbgprintf("XXX: after write , iBufPtr %d, iOffset %d, len %d, unwritten: '%20.20s'\n", pThis->iBufPtr, iOffset, lenBuf, pBuf + iOffset); } while(lenBuf > 0); /* now check if the buffer right at the end of the write is full and, if so, @@ -1215,11 +1240,18 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } -//dbgprintf("XXX: done write loop, iBufPtr %d, iOffset %d, len %d\n", pThis->iBufPtr, iOffset, lenBuf); finalize_it: - if(pThis->bAsyncWrite) + if(pThis->bAsyncWrite) { + if(pThis->bDoTimedWait == 0) { + /* we potentially have a partial buffer, so re-activate the + * writer thread that it can set and pick up timeouts. + */ + pThis->bDoTimedWait = 1; + pthread_cond_signal(&pThis->notEmpty); + } d_pthread_mutex_unlock(&pThis->mut); + } RETiRet; } -- cgit From 1d0806b9e3c6e83443c8daa9da8f25bd4df75f9b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 16 Jul 2009 13:51:52 +0200 Subject: calls to prctl() need to be based on configure results (cross-platform issue) This is for another prctl() call, not present in the beta version (looks like it would make sense to stick these into a utility function) --- runtime/stream.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index a0571a61..605a9771 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -48,7 +48,9 @@ #include "stream.h" #include "unicode-helper.h" #include "module-template.h" -#include +#if HAVE_SYS_PRCTL_H +# include +#endif #define inline @@ -891,9 +893,11 @@ asyncWriterThread(void *pPtr) ISOBJ_TYPE_assert(pThis, strm); BEGINfunc +# if HAVE_PRCTL && defined PR_SET_NAME if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) { DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); } +#endif while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); @@ -904,7 +908,6 @@ asyncWriterThread(void *pPtr) goto finalize_it; /* break main loop */ } if(bTimedOut && pThis->iBufPtr > 0) { -RUNLOG_STR("XXX: we had a timeout in stream writer"); /* if we timed out, we need to flush pending data */ strmFlush(pThis); bTimedOut = 0; -- cgit From bfc3eaf23cae0ef8685fc25b71e701e2c4690509 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 18 Aug 2009 18:48:18 +0200 Subject: bugfix: potential segfault in output file writer (omfile) In async write mode, we use modular arithmetic to index the output buffer array. However, the counter variables accidently were signed, thus resulting in negative indizes after integer overflow. That in turn could lead to segfaults, but was depending on the memory layout of the instance in question (which in turn depended on a number of variables, like compile settings but also configuration). The counters are now unsigned (as they always should have been) and so the dangling mis-indexing does no longer happen. This bug potentially affected all installations, even if only some may actually have seen a segfault. --- runtime/stream.c | 1 + 1 file changed, 1 insertion(+) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 605a9771..a6ed70fe 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -833,6 +833,7 @@ finalize_it: * the very some producer comes back in sequence to submit the then-filled buffers. * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 */ +//#include static inline rsRetVal doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) { -- cgit From 9bb9181572d445dd300546113fc617eb549866ba Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 18 Aug 2009 19:08:44 +0200 Subject: very minor cleanup --- runtime/stream.c | 1 - 1 file changed, 1 deletion(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index a6ed70fe..605a9771 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -833,7 +833,6 @@ finalize_it: * the very some producer comes back in sequence to submit the then-filled buffers. * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 */ -//#include static inline rsRetVal doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) { -- cgit