From 5221a1e42e16c8c39b48a4a1a18ee6322c38cd17 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 18:33:00 +0200 Subject: added capability to write incomplete buffers after an inactivity timeout for the stream class and thus finally activating omfile's timeout capability in a useful way without polling and too-high performance overhead. --- runtime/stream.c | 46 +++++++++++++++++++++++++++++++++++++++------- runtime/stream.h | 1 + 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/runtime/stream.c b/runtime/stream.c index 9f363257..a0571a61 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -298,7 +298,6 @@ static rsRetVal strmCloseFile(strm_t *pThis) ASSERT(pThis->fd != -1); dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); - dbgCallStackPrintAll(); if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) { pThis->bInClose = 1; if(pThis->bAsyncWrite) { @@ -845,6 +844,7 @@ dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf) pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf; + pThis->bDoTimedWait = 0; /* everything written, no need to timeout partial buffer writes */ if(++pThis->iCnt == 1) pthread_cond_signal(&pThis->notEmpty); @@ -885,6 +885,8 @@ static void* asyncWriterThread(void *pPtr) { int iDeq; + struct timespec t; + bool bTimedOut = 0; strm_t *pThis = (strm_t*) pPtr; ISOBJ_TYPE_assert(pThis, strm); @@ -901,9 +903,35 @@ asyncWriterThread(void *pPtr) d_pthread_mutex_unlock(&pThis->mut); goto finalize_it; /* break main loop */ } - d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + if(bTimedOut && pThis->iBufPtr > 0) { +RUNLOG_STR("XXX: we had a timeout in stream writer"); + /* if we timed out, we need to flush pending data */ + strmFlush(pThis); + bTimedOut = 0; + continue; /* now we should have data */ + } + bTimedOut = 0; + timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ + if(pThis->bDoTimedWait) { + if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) { + int err = errno; + if(err == ETIMEDOUT) { + bTimedOut = 1; + } else { + bTimedOut = 1; + char errStr[1024]; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n", + err, errStr); + } + } + } else { + d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + } } + bTimedOut = 0; /* we may have timed out, but there *is* work to do... */ + iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); // TODO: error check????? 2009-07-06 @@ -1194,11 +1222,9 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) iOffset = 0; do { -//dbgprintf("XXX: enter write loop\n"); if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } -//dbgprintf("XXX: post strmFlush\n"); iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iWrite > lenBuf) iWrite = lenBuf; @@ -1206,7 +1232,6 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) pThis->iBufPtr += iWrite; iOffset += iWrite; lenBuf -= iWrite; -//dbgprintf("XXX: after write , iBufPtr %d, iOffset %d, len %d, unwritten: '%20.20s'\n", pThis->iBufPtr, iOffset, lenBuf, pBuf + iOffset); } while(lenBuf > 0); /* now check if the buffer right at the end of the write is full and, if so, @@ -1215,11 +1240,18 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } -//dbgprintf("XXX: done write loop, iBufPtr %d, iOffset %d, len %d\n", pThis->iBufPtr, iOffset, lenBuf); finalize_it: - if(pThis->bAsyncWrite) + if(pThis->bAsyncWrite) { + if(pThis->bDoTimedWait == 0) { + /* we potentially have a partial buffer, so re-activate the + * writer thread that it can set and pick up timeouts. + */ + pThis->bDoTimedWait = 1; + pthread_cond_signal(&pThis->notEmpty); + } d_pthread_mutex_unlock(&pThis->mut); + } RETiRet; } diff --git a/runtime/stream.h b/runtime/stream.h index 1efd29b5..cb368835 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -124,6 +124,7 @@ typedef struct strm_s { /* support for async flush procesing */ bool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */ bool bStopWriter; /* shall writer thread terminate? */ + bool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */ int iFlushInterval; /* flush in which interval - 0, no flushing */ apc_id_t apcID; /* id of current Apc request (used for cancelling) */ pthread_mutex_t mut;/* mutex for flush in async mode */ -- cgit