diff options
Diffstat (limited to 'runtime/stream.c')
-rw-r--r-- | runtime/stream.c | 111 |
1 files changed, 106 insertions, 5 deletions
diff --git a/runtime/stream.c b/runtime/stream.c index a4844d2b..8e2f87dc 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -150,6 +150,12 @@ static rsRetVal strmCloseFile(strm_t *pThis) close(pThis->fd); // TODO: error check pThis->fd = -1; + if(pThis->fdDir != -1) { + /* close associated directory handle, if it is open */ + close(pThis->fdDir); + pThis->fdDir = -1; + } + if(pThis->bDeleteOnClose) { unlink((char*) pThis->pszCurrFName); // TODO: check returncode } @@ -395,10 +401,11 @@ finalize_it: BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro! */ pThis->iCurrFNum = 1; pThis->fd = -1; + pThis->fdDir = -1; pThis->iUngetC = -1; pThis->sType = STREAMTYPE_FILE_SINGLE; pThis->sIOBufSize = glblGetIOBufSize(); - pThis->tOpenMode = 0600; /* TODO: make configurable */ + pThis->tOpenMode = 0600; ENDobjConstruct(strm) @@ -428,6 +435,19 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } + /* if we are aset to sync, we must obtain a file handle to the directory for fsync() purposes */ + if(pThis->bSync) { + pThis->fdDir = open((char*)pThis->pszDir, O_RDONLY); + if(pThis->fdDir == -1) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + // TODO: log an error message? think so... + DBGPRINTF("error %d opening directory file for fsync() use - fsync for directory disabled: %s\n", + errno, errStr); + } + } + finalize_it: RETiRet; } @@ -478,6 +498,81 @@ finalize_it: } + +/* issue write() api calls until either the buffer is completely + * written or an error occured (it may happen that multiple writes + * are required, what is perfectly legal. On exit, *pLenBuf contains + * the number of bytes actually written. + * rgerhards, 2009-06-08 + */ +static rsRetVal +doWriteCall(int fd, uchar *pBuf, size_t *pLenBuf) +{ + ssize_t lenBuf; + ssize_t iTotalWritten; + ssize_t iWritten; + char *pWriteBuf; + DEFiRet; + + lenBuf = *pLenBuf; + pWriteBuf = (char*) pBuf; + iTotalWritten = 0; + do { + iWritten = write(fd, pWriteBuf, lenBuf); + if(iWritten < 0) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("log file (%d) write error %d: %s\n", fd, err, errStr); + if(err == EINTR) { + /*NO ERROR, just continue */; + } else { + ABORT_FINALIZE(RS_RET_ERR); + // TODO: cover more error cases! + } + } + /* advance buffer to next write position */ + iTotalWritten += iWritten; + lenBuf -= iWritten; + pWriteBuf += iWritten; + } while(lenBuf > 0); /* Warning: do..while()! */ + +finalize_it: + *pLenBuf -= iTotalWritten; + RETiRet; +} + + +/* sync the file to disk, so that any unwritten data is persisted. This + * also syncs the directory and thus makes sure that the file survives + * fatal failure. -- rgerhards, 2009-06-08 + */ +static rsRetVal +syncFile(strm_t *pThis) +{ + int ret; + DEFiRet; + + DBGPRINTF("syncing file %d\n", pThis->fd); + ret = fdatasync(pThis->fd); + if(ret != 0) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("sync failed for file %d with error (%d): %s - ignoring\n", + pThis->fd, err, errStr); + } + // TODO: check error! + + if(pThis->fdDir != -1) { + ret = fsync(pThis->fdDir); +dbgprintf("sync on dir (fd %d) requested, return code %d\n", pThis->fdDir, ret); + } + + RETiRet; +} + + /* physically write to the output file. the provided data is ready for * writing (e.g. zipped if we are requested to do that). * rgerhards, 2009-06-04 @@ -485,17 +580,17 @@ finalize_it: static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { + size_t iWritten; DEFiRet; - int iWritten; ASSERT(pThis != NULL); if(pThis->fd == -1) CHKiRet(strmOpenFile(pThis)); - iWritten = write(pThis->fd, pBuf, lenBuf); - dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, iWritten); - /* TODO: handle error case -- rgerhards, 2008-01-07 */ + iWritten = lenBuf; + CHKiRet(doWriteCall(pThis->fd, pBuf, &iWritten)); + dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); /* Now indicate buffer empty again. We do this in any case, because there * is no way we could react more intelligently to an error during write. @@ -511,6 +606,10 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) if(pThis->pUsrWCntr != NULL) *pThis->pUsrWCntr += iWritten; + if(pThis->bSync) { + CHKiRet(syncFile(pThis)); + } + if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) CHKiRet(strmCheckNextOutputFile(pThis)); @@ -759,6 +858,7 @@ DEFpropSetMeth(strm, tOperationsMode, int) DEFpropSetMeth(strm, tOpenMode, mode_t) DEFpropSetMeth(strm, sType, strmType_t) DEFpropSetMeth(strm, iZipLevel, int) +DEFpropSetMeth(strm, bSync, int) DEFpropSetMeth(strm, sIOBufSize, size_t) static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) @@ -1040,6 +1140,7 @@ CODESTARTobjQueryInterface(strm) pIf->SettOpenMode = strmSettOpenMode; pIf->SetsType = strmSetsType; pIf->SetiZipLevel = strmSetiZipLevel; + pIf->SetbSync = strmSetbSync; pIf->SetsIOBufSize = strmSetsIOBufSize; finalize_it: ENDobjQueryInterface(strm) |