From 5221a1e42e16c8c39b48a4a1a18ee6322c38cd17 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Jul 2009 18:33:00 +0200 Subject: added capability to write incomplete buffers after an inactivity timeout for the stream class and thus finally activating omfile's timeout capability in a useful way without polling and too-high performance overhead. --- runtime/stream.c | 46 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 7 deletions(-) (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c index 9f363257..a0571a61 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -298,7 +298,6 @@ static rsRetVal strmCloseFile(strm_t *pThis) ASSERT(pThis->fd != -1); dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); - dbgCallStackPrintAll(); if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) { pThis->bInClose = 1; if(pThis->bAsyncWrite) { @@ -845,6 +844,7 @@ dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf) pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf; + pThis->bDoTimedWait = 0; /* everything written, no need to timeout partial buffer writes */ if(++pThis->iCnt == 1) pthread_cond_signal(&pThis->notEmpty); @@ -885,6 +885,8 @@ static void* asyncWriterThread(void *pPtr) { int iDeq; + struct timespec t; + bool bTimedOut = 0; strm_t *pThis = (strm_t*) pPtr; ISOBJ_TYPE_assert(pThis, strm); @@ -901,9 +903,35 @@ asyncWriterThread(void *pPtr) d_pthread_mutex_unlock(&pThis->mut); goto finalize_it; /* break main loop */ } - d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + if(bTimedOut && pThis->iBufPtr > 0) { +RUNLOG_STR("XXX: we had a timeout in stream writer"); + /* if we timed out, we need to flush pending data */ + strmFlush(pThis); + bTimedOut = 0; + continue; /* now we should have data */ + } + bTimedOut = 0; + timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ + if(pThis->bDoTimedWait) { + if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) { + int err = errno; + if(err == ETIMEDOUT) { + bTimedOut = 1; + } else { + bTimedOut = 1; + char errStr[1024]; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n", + err, errStr); + } + } + } else { + d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + } } + bTimedOut = 0; /* we may have timed out, but there *is* work to do... */ + iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); // TODO: error check????? 2009-07-06 @@ -1194,11 +1222,9 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) iOffset = 0; do { -//dbgprintf("XXX: enter write loop\n"); if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } -//dbgprintf("XXX: post strmFlush\n"); iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iWrite > lenBuf) iWrite = lenBuf; @@ -1206,7 +1232,6 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) pThis->iBufPtr += iWrite; iOffset += iWrite; lenBuf -= iWrite; -//dbgprintf("XXX: after write , iBufPtr %d, iOffset %d, len %d, unwritten: '%20.20s'\n", pThis->iBufPtr, iOffset, lenBuf, pBuf + iOffset); } while(lenBuf > 0); /* now check if the buffer right at the end of the write is full and, if so, @@ -1215,11 +1240,18 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) if(pThis->iBufPtr == pThis->sIOBufSize) { CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ } -//dbgprintf("XXX: done write loop, iBufPtr %d, iOffset %d, len %d\n", pThis->iBufPtr, iOffset, lenBuf); finalize_it: - if(pThis->bAsyncWrite) + if(pThis->bAsyncWrite) { + if(pThis->bDoTimedWait == 0) { + /* we potentially have a partial buffer, so re-activate the + * writer thread that it can set and pick up timeouts. + */ + pThis->bDoTimedWait = 1; + pthread_cond_signal(&pThis->notEmpty); + } d_pthread_mutex_unlock(&pThis->mut); + } RETiRet; } -- cgit