diff options
Diffstat (limited to 'runtime/stream.c')
-rw-r--r-- | runtime/stream.c | 147 |
1 files changed, 100 insertions, 47 deletions
diff --git a/runtime/stream.c b/runtime/stream.c index 36f44003..e8805a40 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -67,7 +67,7 @@ DEFobjStaticHelpers DEFobjCurrIf(zlibw) /* forward definitions */ -static rsRetVal strmFlush(strm_t *pThis); +static rsRetVal strmFlushInternal(strm_t *pThis); static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); static rsRetVal strmCloseFile(strm_t *pThis); static void *asyncWriterThread(void *pPtr); @@ -163,7 +163,7 @@ doSizeLimitProcessing(strm_t *pThis) ASSERT(pThis->fd != -1); if(pThis->iCurrOffs >= pThis->iSizeLimit) { - /* strmClosefile() destroys the current file name, so we + /* strmCloseFile() destroys the current file name, so we * need to preserve it. */ CHKmalloc(pszCurrFName = ustrdup(pThis->pszCurrFName)); @@ -220,7 +220,7 @@ doPhysOpen(strm_t *pThis) char errStr[1024]; int err = errno; rs_strerror_r(err, errStr, sizeof(errStr)); - dbgoprint((obj_t*) pThis, "open error %d, file '%s': %s\n", errno, pThis->pszCurrFName, errStr); + DBGOPRINT((obj_t*) pThis, "open error %d, file '%s': %s\n", errno, pThis->pszCurrFName, errStr); if(err == ENOENT) ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); else @@ -278,7 +278,7 @@ static rsRetVal strmOpenFile(strm_t *pThis) pThis->iCurrOffs = offset; } - dbgoprint((obj_t*) pThis, "opened file '%s' for %s as %d\n", pThis->pszCurrFName, + DBGOPRINT((obj_t*) pThis, "opened file '%s' for %s as %d\n", pThis->pszCurrFName, (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", pThis->fd); finalize_it: @@ -296,8 +296,10 @@ strmWaitAsyncWriterDone(strm_t *pThis) BEGINfunc if(pThis->bAsyncWrite) { /* awake writer thread and make it write out everything */ - pthread_cond_signal(&pThis->notEmpty); - d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); + while(pThis->iCnt > 0) { + pthread_cond_signal(&pThis->notEmpty); + d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); + } } ENDfunc } @@ -306,27 +308,33 @@ strmWaitAsyncWriterDone(strm_t *pThis) /* close a strm file * Note that the bDeleteOnClose flag is honored. If it is set, the file will be * deleted after close. This is in support for the qRead thread. + * Note: it is valid to call this function when the physical file is closed. If so, + * strmCloseFile() will still check if there is any unwritten data inside buffers + * (this may be the case) and, if so, will open the file, write the data, and then + * close it again (this is done via strmFlushInternal and friends). */ static rsRetVal strmCloseFile(strm_t *pThis) { DEFiRet; ASSERT(pThis != NULL); - ASSERT(pThis->fd != -1); - dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd); + DBGOPRINT((obj_t*) pThis, "file %d(%s) closing\n", pThis->fd, + (pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName); - if(!pThis->bInClose && pThis->tOperationsMode != STREAMMODE_READ) { - pThis->bInClose = 1; + if(pThis->tOperationsMode != STREAMMODE_READ) { + strmFlushInternal(pThis); if(pThis->bAsyncWrite) { - strmFlush(pThis); - } else { strmWaitAsyncWriterDone(pThis); } - pThis->bInClose = 0; } - close(pThis->fd); - pThis->fd = -1; + /* the file may already be closed (or never have opened), so guard + * against this. -- rgerhards, 2010-03-19 + */ + if(pThis->fd != -1) { + close(pThis->fd); + pThis->fd = -1; + } if(pThis->fdDir != -1) { /* close associated directory handle, if it is open */ @@ -441,7 +449,7 @@ strmHandleEOF(strm_t *pThis) case STREAMTYPE_FILE_CIRCULAR: /* we have multiple files and need to switch to the next one */ /* TODO: think about emulating EOF in this case (not yet needed) */ - dbgoprint((obj_t*) pThis, "file %d EOF\n", pThis->fd); + DBGOPRINT((obj_t*) pThis, "file %d EOF\n", pThis->fd); CHKiRet(strmNextFile(pThis)); break; case STREAMTYPE_FILE_MONITOR: @@ -473,7 +481,7 @@ strmReadBuf(strm_t *pThis) */ CHKiRet(strmOpenFile(pThis)); iLenRead = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize); - dbgoprint((obj_t*) pThis, "file %d read %ld bytes\n", pThis->fd, iLenRead); + DBGOPRINT((obj_t*) pThis, "file %d read %ld bytes\n", pThis->fd, iLenRead); if(iLenRead == 0) { CHKiRet(strmHandleEOF(pThis)); } else if(iLenRead < 0) @@ -505,7 +513,7 @@ static rsRetVal strmReadChar(strm_t *pThis, uchar *pC) ASSERT(pThis != NULL); ASSERT(pC != NULL); - /* DEV debug only: dbgoprint((obj_t*) pThis, "strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */ + /* DEV debug only: DBGOPRINT((obj_t*) pThis, "strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */ if(pThis->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */ *pC = pThis->iUngetC; ++pThis->iCurrOffs; /* one more octet read */ @@ -617,11 +625,11 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) * to make sure we can write out everything with a SINGLE api call! * We add another 128 bytes to take care of the gzip header and "all eventualities". */ - CHKmalloc(pThis->pZipBuf = (Bytef*) malloc(sizeof(uchar) * pThis->sIOBufSize + 128)); + CHKmalloc(pThis->pZipBuf = (Bytef*) malloc(sizeof(uchar) * (pThis->sIOBufSize + 128))); } } - /* if we are aset to sync, we must obtain a file handle to the directory for fsync() purposes */ + /* if we are set to sync, we must obtain a file handle to the directory for fsync() purposes */ if(pThis->bSync && !pThis->bIsTTY) { pThis->fdDir = open((char*)pThis->pszDir, O_RDONLY | O_CLOEXEC | O_NOCTTY); if(pThis->fdDir == -1) { @@ -633,6 +641,9 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } + DBGPRINTF("file stream %s params: flush interval %d, async write %d\n", + (pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName, + pThis->iFlushInterval, pThis->bAsyncWrite); /* if we have a flush interval, we need to do async writes in any case */ if(pThis->iFlushInterval != 0) { pThis->bAsyncWrite = 1; @@ -685,8 +696,10 @@ CODESTARTobjDestruct(strm) /* Note: mutex will be unlocked in stopWriter! */ d_pthread_mutex_lock(&pThis->mut); - if(pThis->tOperationsMode != STREAMMODE_READ) - strmFlush(pThis); + /* strmClose() will handle read-only files as well as need to open + * files that have unwritten buffers. -- rgerhards, 2010-03-09 + */ + strmCloseFile(pThis); if(pThis->bAsyncWrite) { stopWriter(pThis); @@ -705,14 +718,11 @@ CODESTARTobjDestruct(strm) * IMPORTANT: we MUST free this only AFTER the ansyncWriter has been stopped, else * we get random errors... */ - if(pThis->fd != -1) - strmCloseFile(pThis); - free(pThis->pszDir); free(pThis->pZipBuf); free(pThis->pszCurrFName); free(pThis->pszFName); - + pThis->bStopWriter = 2; /* RG: use as flag for destruction */ ENDobjDestruct(strm) @@ -732,7 +742,7 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis) strmWaitAsyncWriterDone(pThis); if(pThis->iCurrOffs >= pThis->iMaxFileSize) { - dbgoprint((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n", + DBGOPRINT((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n", (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs); CHKiRet(strmNextFile(pThis)); } @@ -790,6 +800,7 @@ doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) if(iWritten < 0) { char errStr[1024]; int err = errno; + iWritten = 0; /* we have written NO bytes! */ rs_strerror_r(err, errStr, sizeof(errStr)); DBGPRINTF("log file (%d) write error %d: %s\n", pThis->fd, err, errStr); if(err == EINTR) { @@ -811,7 +822,7 @@ doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) pWriteBuf += iWritten; } while(lenBuf > 0); /* Warning: do..while()! */ - dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); + DBGOPRINT((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); finalize_it: *pLenBuf = iTotalWritten; @@ -855,7 +866,8 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) DEFiRet; ISOBJ_TYPE_assert(pThis, strm); - while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) + /* the -1 below is important, because we need one buffer for the main thread! */ + while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS - 1) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; @@ -880,13 +892,22 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) ASSERT(pThis != NULL); + /* we need to reset the buffer pointer BEFORE calling the actual write + * function. Otherwise, in circular mode, the write function will + * potentially close the file, then close will flush and as the + * buffer pointer is nonzero, will re-call into this code here. In + * the end result, we than have a problem (and things are screwed + * up). So we reset the buffer pointer first, and all this can + * not happen. It is safe to do so, because that pointer is NOT + * used inside the write functions. -- rgerhads, 2010-03-10 + */ + pThis->iBufPtr = 0; /* we are at the begin of a new buffer */ if(pThis->bAsyncWrite) { CHKiRet(doAsyncWriteInternal(pThis, lenBuf)); } else { CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); } - pThis->iBufPtr = 0; /* we are at the begin of a new buffer */ finalize_it: RETiRet; @@ -911,10 +932,11 @@ asyncWriterThread(void *pPtr) if(prctl(PR_SET_NAME, "rs:asyn strmwr", 0, 0, 0) != 0) { DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); } -#endif +# endif while(1) { /* loop broken inside */ d_pthread_mutex_lock(&pThis->mut); +dbgprintf("XXX: asyncWriterThread iterating %s\n", pThis->pszFName); while(pThis->iCnt == 0) { if(pThis->bStopWriter) { pthread_cond_broadcast(&pThis->isEmpty); @@ -923,13 +945,14 @@ asyncWriterThread(void *pPtr) } if(bTimedOut && pThis->iBufPtr > 0) { /* if we timed out, we need to flush pending data */ - strmFlush(pThis); + strmFlushInternal(pThis); bTimedOut = 0; continue; /* now we should have data */ } bTimedOut = 0; - timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ + timeoutComp(&t, pThis->iFlushInterval * 2000); /* *1000 millisconds */ // TODO: check the 2000?!? if(pThis->bDoTimedWait) { +dbgprintf("asyncWriter thread going to timeout sleep\n"); if(pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t) != 0) { int err = errno; if(err == ETIMEDOUT) { @@ -943,13 +966,16 @@ asyncWriterThread(void *pPtr) } } } else { +dbgprintf("asyncWriter thread going to eternal sleep\n"); d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); } +dbgprintf("asyncWriter woke up\n"); } bTimedOut = 0; /* we may have timed out, but there *is* work to do... */ iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; +dbgprintf("asyncWriter writes data\n"); doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); // TODO: error check????? 2009-07-06 @@ -1058,10 +1084,6 @@ finalize_it: * add a config switch so that the user can decide the risk he is ready * to take, but so far this is not yet implemented (not even requested ;)). * rgerhards, 2009-06-04 - * For the time being, we take a very conservative approach and do not run this - * method multithreaded. This is done in an effort to solve a segfault condition - * that seems to be related to the zip code. -- rgerhards, 2009-09-22 - * TODO: make multithreaded again! */ static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) @@ -1120,12 +1142,14 @@ finalize_it: * rgerhards, 2008-01-10 */ static rsRetVal -strmFlush(strm_t *pThis) +strmFlushInternal(strm_t *pThis) { DEFiRet; ASSERT(pThis != NULL); - dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr); + DBGOPRINT((obj_t*) pThis, "file %d(%s) flush, buflen %ld%s\n", pThis->fd, + (pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName, + (long) pThis->iBufPtr, (pThis->iBufPtr == 0) ? " (no need to flush)" : ""); if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) { iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr); @@ -1135,6 +1159,31 @@ strmFlush(strm_t *pThis) } +/* flush stream output buffer to persistent storage. This can be called at any time + * and is automatically called when the output buffer is full. This function is for + * use by EXTERNAL callers. Do NOT use it internally. It locks the async writer + * mutex if ther is need to do so. + * rgerhards, 2010-03-18 + */ +static rsRetVal +strmFlush(strm_t *pThis) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + CHKiRet(strmFlushInternal(pThis)); + +finalize_it: + if(pThis->bAsyncWrite) + d_pthread_mutex_unlock(&pThis->mut); + + RETiRet; +} + + /* seek a stream to a specific location. Pending writes are flushed, read data * is invalidated. * rgerhards, 2008-01-12 @@ -1148,9 +1197,9 @@ static rsRetVal strmSeek(strm_t *pThis, off_t offs) if(pThis->fd == -1) strmOpenFile(pThis); else - strmFlush(pThis); + strmFlushInternal(pThis); int i; - dbgoprint((obj_t*) pThis, "file %d seek, pos %ld\n", pThis->fd, (long) offs); + DBGOPRINT((obj_t*) pThis, "file %d seek, pos %ld\n", pThis->fd, (long) offs); i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error! pThis->iCurrOffs = offs; /* we are now at *this* offset */ pThis->iBufPtr = 0; /* buffer invalidated */ @@ -1189,7 +1238,7 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlush(pThis)); + CHKiRet(strmFlushInternal(pThis)); } /* we now always have space for one character, so we simply copy it */ *(pThis->pIOBuf + pThis->iBufPtr) = c; @@ -1233,6 +1282,11 @@ finalize_it: * caller-provided buffer is larger than our one. So instead of optimizing a case * which normally does not exist, we expect some degradation in its case but make us * perform better in the regular cases. -- rgerhards, 2009-07-07 + * Note: the pThis->iBufPtr == pThis->sIOBufSize logic below looks a bit like an + * on-off error. In fact, it is not, because iBufPtr always points to the next + * *free* byte in the buffer. So if it is sIOBufSize - 1, there actually is one + * free byte left. This came up during a code walkthrough and was considered + * worth nothing. -- rgerhards, 2010-03-10 */ static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) @@ -1254,7 +1308,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) iOffset = 0; do { if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ + CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */ } iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iWrite > lenBuf) @@ -1269,7 +1323,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) * write it. This seems more natural than waiting (hours?) for the next message... */ if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ + CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */ } finalize_it: @@ -1357,8 +1411,7 @@ strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir) if(iLenDir < 1) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); - if((pThis->pszDir = malloc(sizeof(uchar) * iLenDir + 1)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + CHKmalloc(pThis->pszDir = malloc(sizeof(uchar) * iLenDir + 1)); memcpy(pThis->pszDir, pszDir, iLenDir + 1); /* always think about the \0! */ pThis->lenDir = iLenDir; @@ -1429,7 +1482,7 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) ISOBJ_TYPE_assert(pThis, strm); ISOBJ_TYPE_assert(pStrm, strm); - strmFlush(pThis); + strmFlushInternal(pThis); CHKiRet(obj.BeginSerialize(pStrm, (obj_t*) pThis)); objSerializeSCALAR(pStrm, iCurrFNum, INT); |