summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/stream.c46
-rw-r--r--runtime/stream.h1
2 files changed, 40 insertions, 7 deletions
diff --git a/runtime/stream.c b/runtime/stream.c
index 9f363257..a0571a61 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -298,7 +298,6 @@ static rsRetVal strmCloseFile(strm_t *pThis)
ASSERT(pThis->fd != -1);
dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd);
- dbgCallStackPrintAll();
if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) {
pThis->bInClose = 1;
if(pThis->bAsyncWrite) {
@@ -845,6 +844,7 @@ dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf)
pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf;
pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf;
+ pThis->bDoTimedWait = 0; /* everything written, no need to timeout partial buffer writes */
if(++pThis->iCnt == 1)
pthread_cond_signal(&pThis->notEmpty);
@@ -885,6 +885,8 @@ static void*
asyncWriterThread(void *pPtr)
{
int iDeq;
+ struct timespec t;
+ bool bTimedOut = 0;
strm_t *pThis = (strm_t*) pPtr;
ISOBJ_TYPE_assert(pThis, strm);
@@ -901,9 +903,35 @@ asyncWriterThread(void *pPtr)
d_pthread_mutex_unlock(&pThis->mut);
goto finalize_it; /* break main loop */
}
- d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut);
+ if(bTimedOut && pThis->iBufPtr > 0) {
+RUNLOG_STR("XXX: we had a timeout in stream writer");
+ /* if we timed out, we need to flush pending data */
+ strmFlush(pThis);
+ bTimedOut = 0;
+ continue; /* now we should have data */
+ }
+ bTimedOut = 0;
+ timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */
+ if(pThis->bDoTimedWait) {
+ if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) {
+ int err = errno;
+ if(err == ETIMEDOUT) {
+ bTimedOut = 1;
+ } else {
+ bTimedOut = 1;
+ char errStr[1024];
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ DBGPRINTF("stream async writer timeout with error (%d): %s - ignoring\n",
+ err, errStr);
+ }
+ }
+ } else {
+ d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut);
+ }
}
+ bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
+
iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf);
// TODO: error check????? 2009-07-06
@@ -1194,11 +1222,9 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
iOffset = 0;
do {
-//dbgprintf("XXX: enter write loop\n");
if(pThis->iBufPtr == pThis->sIOBufSize) {
CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
}
-//dbgprintf("XXX: post strmFlush\n");
iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
if(iWrite > lenBuf)
iWrite = lenBuf;
@@ -1206,7 +1232,6 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
pThis->iBufPtr += iWrite;
iOffset += iWrite;
lenBuf -= iWrite;
-//dbgprintf("XXX: after write , iBufPtr %d, iOffset %d, len %d, unwritten: '%20.20s'\n", pThis->iBufPtr, iOffset, lenBuf, pBuf + iOffset);
} while(lenBuf > 0);
/* now check if the buffer right at the end of the write is full and, if so,
@@ -1215,11 +1240,18 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
if(pThis->iBufPtr == pThis->sIOBufSize) {
CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
}
-//dbgprintf("XXX: done write loop, iBufPtr %d, iOffset %d, len %d\n", pThis->iBufPtr, iOffset, lenBuf);
finalize_it:
- if(pThis->bAsyncWrite)
+ if(pThis->bAsyncWrite) {
+ if(pThis->bDoTimedWait == 0) {
+ /* we potentially have a partial buffer, so re-activate the
+ * writer thread that it can set and pick up timeouts.
+ */
+ pThis->bDoTimedWait = 1;
+ pthread_cond_signal(&pThis->notEmpty);
+ }
d_pthread_mutex_unlock(&pThis->mut);
+ }
RETiRet;
}
diff --git a/runtime/stream.h b/runtime/stream.h
index 1efd29b5..cb368835 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -124,6 +124,7 @@ typedef struct strm_s {
/* support for async flush procesing */
bool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */
bool bStopWriter; /* shall writer thread terminate? */
+ bool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */
int iFlushInterval; /* flush in which interval - 0, no flushing */
apc_id_t apcID; /* id of current Apc request (used for cancelling) */
pthread_mutex_t mut;/* mutex for flush in async mode */