summaryrefslogtreecommitdiffstats
path: root/runtime/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/stream.c')
-rw-r--r--runtime/stream.c111
1 files changed, 106 insertions, 5 deletions
diff --git a/runtime/stream.c b/runtime/stream.c
index a4844d2b..8e2f87dc 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -150,6 +150,12 @@ static rsRetVal strmCloseFile(strm_t *pThis)
close(pThis->fd); // TODO: error check
pThis->fd = -1;
+ if(pThis->fdDir != -1) {
+ /* close associated directory handle, if it is open */
+ close(pThis->fdDir);
+ pThis->fdDir = -1;
+ }
+
if(pThis->bDeleteOnClose) {
unlink((char*) pThis->pszCurrFName); // TODO: check returncode
}
@@ -395,10 +401,11 @@ finalize_it:
BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro! */
pThis->iCurrFNum = 1;
pThis->fd = -1;
+ pThis->fdDir = -1;
pThis->iUngetC = -1;
pThis->sType = STREAMTYPE_FILE_SINGLE;
pThis->sIOBufSize = glblGetIOBufSize();
- pThis->tOpenMode = 0600; /* TODO: make configurable */
+ pThis->tOpenMode = 0600;
ENDobjConstruct(strm)
@@ -428,6 +435,19 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
}
}
+ /* if we are aset to sync, we must obtain a file handle to the directory for fsync() purposes */
+ if(pThis->bSync) {
+ pThis->fdDir = open((char*)pThis->pszDir, O_RDONLY);
+ if(pThis->fdDir == -1) {
+ char errStr[1024];
+ int err = errno;
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ // TODO: log an error message? think so...
+ DBGPRINTF("error %d opening directory file for fsync() use - fsync for directory disabled: %s\n",
+ errno, errStr);
+ }
+ }
+
finalize_it:
RETiRet;
}
@@ -478,6 +498,81 @@ finalize_it:
}
+
+/* issue write() api calls until either the buffer is completely
+ * written or an error occured (it may happen that multiple writes
+ * are required, what is perfectly legal. On exit, *pLenBuf contains
+ * the number of bytes actually written.
+ * rgerhards, 2009-06-08
+ */
+static rsRetVal
+doWriteCall(int fd, uchar *pBuf, size_t *pLenBuf)
+{
+ ssize_t lenBuf;
+ ssize_t iTotalWritten;
+ ssize_t iWritten;
+ char *pWriteBuf;
+ DEFiRet;
+
+ lenBuf = *pLenBuf;
+ pWriteBuf = (char*) pBuf;
+ iTotalWritten = 0;
+ do {
+ iWritten = write(fd, pWriteBuf, lenBuf);
+ if(iWritten < 0) {
+ char errStr[1024];
+ int err = errno;
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ DBGPRINTF("log file (%d) write error %d: %s\n", fd, err, errStr);
+ if(err == EINTR) {
+ /*NO ERROR, just continue */;
+ } else {
+ ABORT_FINALIZE(RS_RET_ERR);
+ // TODO: cover more error cases!
+ }
+ }
+ /* advance buffer to next write position */
+ iTotalWritten += iWritten;
+ lenBuf -= iWritten;
+ pWriteBuf += iWritten;
+ } while(lenBuf > 0); /* Warning: do..while()! */
+
+finalize_it:
+ *pLenBuf -= iTotalWritten;
+ RETiRet;
+}
+
+
+/* sync the file to disk, so that any unwritten data is persisted. This
+ * also syncs the directory and thus makes sure that the file survives
+ * fatal failure. -- rgerhards, 2009-06-08
+ */
+static rsRetVal
+syncFile(strm_t *pThis)
+{
+ int ret;
+ DEFiRet;
+
+ DBGPRINTF("syncing file %d\n", pThis->fd);
+ ret = fdatasync(pThis->fd);
+ if(ret != 0) {
+ char errStr[1024];
+ int err = errno;
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ DBGPRINTF("sync failed for file %d with error (%d): %s - ignoring\n",
+ pThis->fd, err, errStr);
+ }
+ // TODO: check error!
+
+ if(pThis->fdDir != -1) {
+ ret = fsync(pThis->fdDir);
+dbgprintf("sync on dir (fd %d) requested, return code %d\n", pThis->fdDir, ret);
+ }
+
+ RETiRet;
+}
+
+
/* physically write to the output file. the provided data is ready for
* writing (e.g. zipped if we are requested to do that).
* rgerhards, 2009-06-04
@@ -485,17 +580,17 @@ finalize_it:
static rsRetVal
strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
+ size_t iWritten;
DEFiRet;
- int iWritten;
ASSERT(pThis != NULL);
if(pThis->fd == -1)
CHKiRet(strmOpenFile(pThis));
- iWritten = write(pThis->fd, pBuf, lenBuf);
- dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, iWritten);
- /* TODO: handle error case -- rgerhards, 2008-01-07 */
+ iWritten = lenBuf;
+ CHKiRet(doWriteCall(pThis->fd, pBuf, &iWritten));
+ dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten);
/* Now indicate buffer empty again. We do this in any case, because there
* is no way we could react more intelligently to an error during write.
@@ -511,6 +606,10 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
if(pThis->pUsrWCntr != NULL)
*pThis->pUsrWCntr += iWritten;
+ if(pThis->bSync) {
+ CHKiRet(syncFile(pThis));
+ }
+
if(pThis->sType == STREAMTYPE_FILE_CIRCULAR)
CHKiRet(strmCheckNextOutputFile(pThis));
@@ -759,6 +858,7 @@ DEFpropSetMeth(strm, tOperationsMode, int)
DEFpropSetMeth(strm, tOpenMode, mode_t)
DEFpropSetMeth(strm, sType, strmType_t)
DEFpropSetMeth(strm, iZipLevel, int)
+DEFpropSetMeth(strm, bSync, int)
DEFpropSetMeth(strm, sIOBufSize, size_t)
static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal)
@@ -1040,6 +1140,7 @@ CODESTARTobjQueryInterface(strm)
pIf->SettOpenMode = strmSettOpenMode;
pIf->SetsType = strmSetsType;
pIf->SetiZipLevel = strmSetiZipLevel;
+ pIf->SetbSync = strmSetbSync;
pIf->SetsIOBufSize = strmSetsIOBufSize;
finalize_it:
ENDobjQueryInterface(strm)