summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-06-08 18:39:06 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-06-08 18:39:06 +0200
commit8d1e2e496c6a4a4d40d1e8604c746e0d32013536 (patch)
treef98dfe6ae70878b49135de308703f9096c0b394b /runtime
parent84b786753744d9f7a002665571a428283feae248 (diff)
downloadrsyslog-8d1e2e496c6a4a4d40d1e8604c746e0d32013536.tar.gz
rsyslog-8d1e2e496c6a4a4d40d1e8604c746e0d32013536.tar.xz
rsyslog-8d1e2e496c6a4a4d40d1e8604c746e0d32013536.zip
improved error handling and added fsync() support
... restoring missing functionality after the restructuring of imfile. As a side-effect, this also lays the foundation for even more reliable queue engine operations (but this is not yet done).
Diffstat (limited to 'runtime')
-rw-r--r--runtime/stream.c111
-rw-r--r--runtime/stream.h5
2 files changed, 110 insertions, 6 deletions
diff --git a/runtime/stream.c b/runtime/stream.c
index a4844d2b..8e2f87dc 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -150,6 +150,12 @@ static rsRetVal strmCloseFile(strm_t *pThis)
close(pThis->fd); // TODO: error check
pThis->fd = -1;
+ if(pThis->fdDir != -1) {
+ /* close associated directory handle, if it is open */
+ close(pThis->fdDir);
+ pThis->fdDir = -1;
+ }
+
if(pThis->bDeleteOnClose) {
unlink((char*) pThis->pszCurrFName); // TODO: check returncode
}
@@ -395,10 +401,11 @@ finalize_it:
BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro! */
pThis->iCurrFNum = 1;
pThis->fd = -1;
+ pThis->fdDir = -1;
pThis->iUngetC = -1;
pThis->sType = STREAMTYPE_FILE_SINGLE;
pThis->sIOBufSize = glblGetIOBufSize();
- pThis->tOpenMode = 0600; /* TODO: make configurable */
+ pThis->tOpenMode = 0600;
ENDobjConstruct(strm)
@@ -428,6 +435,19 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
}
}
+ /* if we are aset to sync, we must obtain a file handle to the directory for fsync() purposes */
+ if(pThis->bSync) {
+ pThis->fdDir = open((char*)pThis->pszDir, O_RDONLY);
+ if(pThis->fdDir == -1) {
+ char errStr[1024];
+ int err = errno;
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ // TODO: log an error message? think so...
+ DBGPRINTF("error %d opening directory file for fsync() use - fsync for directory disabled: %s\n",
+ errno, errStr);
+ }
+ }
+
finalize_it:
RETiRet;
}
@@ -478,6 +498,81 @@ finalize_it:
}
+
+/* issue write() api calls until either the buffer is completely
+ * written or an error occured (it may happen that multiple writes
+ * are required, what is perfectly legal. On exit, *pLenBuf contains
+ * the number of bytes actually written.
+ * rgerhards, 2009-06-08
+ */
+static rsRetVal
+doWriteCall(int fd, uchar *pBuf, size_t *pLenBuf)
+{
+ ssize_t lenBuf;
+ ssize_t iTotalWritten;
+ ssize_t iWritten;
+ char *pWriteBuf;
+ DEFiRet;
+
+ lenBuf = *pLenBuf;
+ pWriteBuf = (char*) pBuf;
+ iTotalWritten = 0;
+ do {
+ iWritten = write(fd, pWriteBuf, lenBuf);
+ if(iWritten < 0) {
+ char errStr[1024];
+ int err = errno;
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ DBGPRINTF("log file (%d) write error %d: %s\n", fd, err, errStr);
+ if(err == EINTR) {
+ /*NO ERROR, just continue */;
+ } else {
+ ABORT_FINALIZE(RS_RET_ERR);
+ // TODO: cover more error cases!
+ }
+ }
+ /* advance buffer to next write position */
+ iTotalWritten += iWritten;
+ lenBuf -= iWritten;
+ pWriteBuf += iWritten;
+ } while(lenBuf > 0); /* Warning: do..while()! */
+
+finalize_it:
+ *pLenBuf -= iTotalWritten;
+ RETiRet;
+}
+
+
+/* sync the file to disk, so that any unwritten data is persisted. This
+ * also syncs the directory and thus makes sure that the file survives
+ * fatal failure. -- rgerhards, 2009-06-08
+ */
+static rsRetVal
+syncFile(strm_t *pThis)
+{
+ int ret;
+ DEFiRet;
+
+ DBGPRINTF("syncing file %d\n", pThis->fd);
+ ret = fdatasync(pThis->fd);
+ if(ret != 0) {
+ char errStr[1024];
+ int err = errno;
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ DBGPRINTF("sync failed for file %d with error (%d): %s - ignoring\n",
+ pThis->fd, err, errStr);
+ }
+ // TODO: check error!
+
+ if(pThis->fdDir != -1) {
+ ret = fsync(pThis->fdDir);
+dbgprintf("sync on dir (fd %d) requested, return code %d\n", pThis->fdDir, ret);
+ }
+
+ RETiRet;
+}
+
+
/* physically write to the output file. the provided data is ready for
* writing (e.g. zipped if we are requested to do that).
* rgerhards, 2009-06-04
@@ -485,17 +580,17 @@ finalize_it:
static rsRetVal
strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
+ size_t iWritten;
DEFiRet;
- int iWritten;
ASSERT(pThis != NULL);
if(pThis->fd == -1)
CHKiRet(strmOpenFile(pThis));
- iWritten = write(pThis->fd, pBuf, lenBuf);
- dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, iWritten);
- /* TODO: handle error case -- rgerhards, 2008-01-07 */
+ iWritten = lenBuf;
+ CHKiRet(doWriteCall(pThis->fd, pBuf, &iWritten));
+ dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten);
/* Now indicate buffer empty again. We do this in any case, because there
* is no way we could react more intelligently to an error during write.
@@ -511,6 +606,10 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
if(pThis->pUsrWCntr != NULL)
*pThis->pUsrWCntr += iWritten;
+ if(pThis->bSync) {
+ CHKiRet(syncFile(pThis));
+ }
+
if(pThis->sType == STREAMTYPE_FILE_CIRCULAR)
CHKiRet(strmCheckNextOutputFile(pThis));
@@ -759,6 +858,7 @@ DEFpropSetMeth(strm, tOperationsMode, int)
DEFpropSetMeth(strm, tOpenMode, mode_t)
DEFpropSetMeth(strm, sType, strmType_t)
DEFpropSetMeth(strm, iZipLevel, int)
+DEFpropSetMeth(strm, bSync, int)
DEFpropSetMeth(strm, sIOBufSize, size_t)
static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal)
@@ -1040,6 +1140,7 @@ CODESTARTobjQueryInterface(strm)
pIf->SettOpenMode = strmSettOpenMode;
pIf->SetsType = strmSetsType;
pIf->SetiZipLevel = strmSetiZipLevel;
+ pIf->SetbSync = strmSetbSync;
pIf->SetsIOBufSize = strmSetsIOBufSize;
finalize_it:
ENDobjQueryInterface(strm)
diff --git a/runtime/stream.h b/runtime/stream.h
index 7eb386fb..a66108b7 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -103,10 +103,12 @@ typedef struct strm_s {
int64 iCurrOffs;/* current offset */
int64 *pUsrWCntr; /* NULL or a user-provided counter that receives the nbr of bytes written since the last CntrSet() */
/* dynamic properties, valid only during file open, not to be persistet */
- size_t sIOBufSize;/* size of IO buffer */
+ int bSync; /* sync this file after every write? */
+ size_t sIOBufSize;/* size of IO buffer */
uchar *pszDir; /* Directory */
int lenDir;
int fd; /* the file descriptor, -1 if closed */
+ int fdDir; /* the directory's descriptor, in case bSync is requested (-1 if closed) */
uchar *pszCurrFName; /* name of current file (if open) */
uchar *pIOBuf; /* io Buffer */
size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */
@@ -148,6 +150,7 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
INTERFACEpropSetMeth(strm, tOpenMode, mode_t);
INTERFACEpropSetMeth(strm, sType, strmType_t);
INTERFACEpropSetMeth(strm, iZipLevel, int);
+ INTERFACEpropSetMeth(strm, bSync, int);
INTERFACEpropSetMeth(strm, sIOBufSize, size_t);
ENDinterface(strm)
#define strmCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */