summaryrefslogtreecommitdiffstats
path: root/runtime/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/stream.c')
-rw-r--r--runtime/stream.c687
1 files changed, 573 insertions, 114 deletions
diff --git a/runtime/stream.c b/runtime/stream.c
index 59e8be3a..372a3a5f 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -1,4 +1,3 @@
-//TODO: O_TRUC mode!
/* The serial stream class.
*
* A serial stream provides serial data access. In theory, serial streams
@@ -7,8 +6,9 @@
* "driver").
*
* File begun on 2008-01-09 by RGerhards
+ * Large modifications in 2009-06 to support using it with omfile, including zip writer.
*
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -39,6 +39,7 @@
#include <unistd.h>
#include <sys/stat.h> /* required for HP UX */
#include <errno.h>
+#include <pthread.h>
#include "rsyslog.h"
#include "stringbuf.h"
@@ -46,18 +47,191 @@
#include "obj.h"
#include "stream.h"
#include "unicode-helper.h"
+#include "module-template.h"
+#include "apc.h"
/* static data */
DEFobjStaticHelpers
+DEFobjCurrIf(zlibw)
+DEFobjCurrIf(apc)
+
+/* forward definitions */
+static rsRetVal strmFlush(strm_t *pThis);
+static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
+static rsRetVal strmCloseFile(strm_t *pThis);
+
/* methods */
-/* first, we define type-specific handlers. The provide a generic functionality,
+/* async flush apc handler
+ */
+static void
+flushApc(void *param1, void __attribute__((unused)) *param2)
+{
+ DEFVARS_mutexProtection_uncond;
+ strm_t *pThis = (strm_t*) param1;
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
+ strmFlush(pThis);
+ END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
+}
+
+
+/* Try to resolve a size limit situation. This is used to support custom-file size handlers
+ * for omfile. It first runs the command, and then checks if we are still above the size
+ * treshold. Note that this works only with single file names, NOT with circular names.
+ * Note that pszCurrFName can NOT be taken from pThis, because the stream is closed when
+ * we are called (and that destroys pszCurrFName, as there is NO CURRENT file name!). So
+ * we need to receive the name as a parameter.
+ * initially wirtten 2005-06-21, moved to this class & updates 2009-06-01, both rgerhards
+ */
+static rsRetVal
+resolveFileSizeLimit(strm_t *pThis, uchar *pszCurrFName)
+{
+ uchar *pParams;
+ uchar *pCmd;
+ uchar *p;
+ off_t actualFileSize;
+ rsRetVal localRet;
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, strm);
+ assert(pszCurrFName != NULL);
+
+ if(pThis->pszSizeLimitCmd == NULL) {
+ ABORT_FINALIZE(RS_RET_NON_SIZELIMITCMD); /* nothing we can do in this case... */
+ }
+
+ /* we first check if we have command line parameters. We assume this,
+ * when we have a space in the program name. If we find it, everything after
+ * the space is treated as a single argument.
+ */
+ CHKmalloc(pCmd = ustrdup(pThis->pszSizeLimitCmd));
+
+ for(p = pCmd ; *p && *p != ' ' ; ++p) {
+ /* JUST SKIP */
+ }
+
+ if(*p == ' ') {
+ *p = '\0'; /* pretend string-end */
+ pParams = p+1;
+ } else
+ pParams = NULL;
+
+ /* the execProg() below is probably not great, but at least is is
+ * fairly secure now. Once we change the way file size limits are
+ * handled, we should also revisit how this command is run (and
+ * with which parameters). rgerhards, 2007-07-20
+ */
+ execProg(pCmd, 1, pParams);
+
+ free(pCmd);
+
+ localRet = getFileSize(pszCurrFName, &actualFileSize);
+
+ if(localRet == RS_RET_OK && actualFileSize >= pThis->iSizeLimit) {
+ ABORT_FINALIZE(RS_RET_SIZELIMITCMD_DIDNT_RESOLVE); /* OK, it didn't work out... */
+ } else if(localRet != RS_RET_FILE_NOT_FOUND) {
+ /* file not found is OK, the command may have moved away the file */
+ ABORT_FINALIZE(localRet);
+ }
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE)
+ dbgprintf("file size limit cmd for file '%s' did no resolve situation\n", pszCurrFName);
+ else
+ dbgprintf("file size limit cmd for file '%s' failed with code %d.\n", pszCurrFName, iRet);
+ pThis->bDisabled = 1;
+ }
+
+ RETiRet;
+}
+
+
+/* Check if the file has grown beyond the configured omfile iSizeLimit
+ * and, if so, initiate processing.
+ */
+static rsRetVal
+doSizeLimitProcessing(strm_t *pThis)
+{
+ uchar *pszCurrFName = NULL;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+ ASSERT(pThis->iSizeLimit != 0);
+ ASSERT(pThis->fd != -1);
+
+ if(pThis->iCurrOffs >= pThis->iSizeLimit) {
+ /* strmClosefile() destroys the current file name, so we
+ * need to preserve it.
+ */
+ CHKmalloc(pszCurrFName = ustrdup(pThis->pszCurrFName));
+ CHKiRet(strmCloseFile(pThis));
+ CHKiRet(resolveFileSizeLimit(pThis, pszCurrFName));
+ }
+
+finalize_it:
+ free(pszCurrFName);
+ RETiRet;
+}
+
+
+/* now, we define type-specific handlers. The provide a generic functionality,
* but for this specific type of strm. The mapping to these handlers happens during
* strm construction. Later on, handlers are called by pointers present in the
* strm instance object.
*/
+/* do the physical open() call on a file.
+ */
+static rsRetVal
+doPhysOpen(strm_t *pThis)
+{
+ int iFlags;
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ /* compute which flags we need to provide to open */
+ switch(pThis->tOperationsMode) {
+ case STREAMMODE_READ:
+ iFlags = O_CLOEXEC | O_NOCTTY | O_RDONLY;
+ break;
+ case STREAMMODE_WRITE: /* legacy mode used inside queue engine */
+ iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT;
+ break;
+ case STREAMMODE_WRITE_TRUNC:
+ iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT | O_TRUNC;
+ break;
+ case STREAMMODE_WRITE_APPEND:
+ iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT | O_APPEND;
+ break;
+ default:assert(0);
+ break;
+ }
+
+ pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode);
+ if(pThis->fd == -1) {
+ int ierrnoSave = errno;
+ dbgoprint((obj_t*) pThis, "open error %d, file '%s'\n", errno, pThis->pszCurrFName);
+ if(ierrnoSave == ENOENT)
+ ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
+ else
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ } else {
+ if(!ustrcmp(pThis->pszCurrFName, UCHAR_CONSTANT(_PATH_CONSOLE)) || isatty(pThis->fd)) {
+ DBGPRINTF("file %d is a tty-type file\n", pThis->fd);
+ pThis->bIsTTY = 1;
+ } else {
+ pThis->bIsTTY = 0;
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
/* open a strm file
* It is OK to call this function when the stream is already open. In that
* case, it returns immediately with RS_RET_OK
@@ -65,10 +239,8 @@ DEFobjStaticHelpers
static rsRetVal strmOpenFile(strm_t *pThis)
{
DEFiRet;
- int iFlags;
ASSERT(pThis != NULL);
- ASSERT(pThis->tOperationsMode == STREAMMODE_READ || pThis->tOperationsMode == STREAMMODE_WRITE);
if(pThis->fd != -1)
ABORT_FINALIZE(RS_RET_OK);
@@ -89,31 +261,18 @@ static rsRetVal strmOpenFile(strm_t *pThis)
}
}
- /* compute which flags we need to provide to open */
- if(pThis->tOperationsMode == STREAMMODE_READ)
- iFlags = O_RDONLY;
- else
- iFlags = O_WRONLY | O_CREAT;
-
- iFlags |= pThis->iAddtlOpenFlags;
-
- pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode);
- if(pThis->fd == -1) {
- int ierrnoSave = errno;
- char errStr[1024];
- dbgoprint((obj_t*) pThis, "open error[%d]: '%s'; file '%s'/%s\n", errno,
- rs_strerror_r(errno, errStr, sizeof(errStr)), pThis->pszCurrFName,
- (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE");
- if(ierrnoSave == ENOENT)
- ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
- else
- ABORT_FINALIZE(RS_RET_IO_ERROR);
- }
+ CHKiRet(doPhysOpen(pThis));
pThis->iCurrOffs = 0;
+ if(pThis->tOperationsMode == STREAMMODE_WRITE_APPEND) {
+ /* we need to obtain the current offset */
+ off_t offset;
+ CHKiRet(getFileSize(pThis->pszCurrFName, &offset));
+ pThis->iCurrOffs = offset;
+ }
- dbgoprint((obj_t*) pThis, "opened file '%s' for %s (0x%x) as %d\n", pThis->pszCurrFName,
- (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", iFlags, pThis->fd);
+ dbgoprint((obj_t*) pThis, "opened file '%s' for %s as %d\n", pThis->pszCurrFName,
+ (pThis->tOperationsMode == STREAMMODE_READ) ? "READ" : "WRITE", pThis->fd);
finalize_it:
RETiRet;
@@ -132,14 +291,26 @@ static rsRetVal strmCloseFile(strm_t *pThis)
ASSERT(pThis->fd != -1);
dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd);
- if(pThis->tOperationsMode == STREAMMODE_WRITE)
+ if(pThis->tOperationsMode != STREAMMODE_READ)
strmFlush(pThis);
- close(pThis->fd); // TODO: error check
+ close(pThis->fd);
pThis->fd = -1;
+ if(pThis->fdDir != -1) {
+ /* close associated directory handle, if it is open */
+ close(pThis->fdDir);
+ pThis->fdDir = -1;
+ }
+
if(pThis->bDeleteOnClose) {
- unlink((char*) pThis->pszCurrFName); // TODO: check returncode
+ if(unlink((char*) pThis->pszCurrFName) == -1) {
+ char errStr[1024];
+ int err = errno;
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ DBGPRINTF("error %d unlinking '%s' - ignored: %s\n",
+ errno, pThis->pszCurrFName, errStr);
+ }
}
pThis->iCurrOffs = 0; /* we are back at begin of file */
@@ -238,10 +409,6 @@ strmHandleEOF(strm_t *pThis)
case STREAMTYPE_FILE_CIRCULAR:
/* we have multiple files and need to switch to the next one */
/* TODO: think about emulating EOF in this case (not yet needed) */
-#if 0
- if(pThis->iMaxFiles == 0) /* TODO: why do we need this? ;) */
- ABORT_FINALIZE(RS_RET_EOF);
-#endif
dbgoprint((obj_t*) pThis, "file %d EOF\n", pThis->fd);
CHKiRet(strmNextFile(pThis));
break;
@@ -299,7 +466,7 @@ finalize_it:
* NOTE: needs to be enhanced to support sticking with a strm entry (if not
* deleted).
*/
-rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
+static rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
{
DEFiRet;
@@ -333,7 +500,7 @@ finalize_it:
* character buffering capability.
* rgerhards, 2008-01-07
*/
-rsRetVal strmUnreadChar(strm_t *pThis, uchar c)
+static rsRetVal strmUnreadChar(strm_t *pThis, uchar c)
{
ASSERT(pThis != NULL);
ASSERT(pThis->iUngetC == -1);
@@ -355,7 +522,7 @@ rsRetVal strmUnreadChar(strm_t *pThis, uchar c)
* are pthread_killed() upon termination. So if we use their native pointer, they
* can cleanup (but only then).
*/
-rsRetVal
+static rsRetVal
strmReadLine(strm_t *pThis, cstr_t **ppCStr)
{
DEFiRet;
@@ -372,7 +539,7 @@ strmReadLine(strm_t *pThis, cstr_t **ppCStr)
CHKiRet(rsCStrAppendChar(*ppCStr, c));
CHKiRet(strmReadChar(pThis, &c));
}
- CHKiRet(rsCStrFinish(*ppCStr));
+ CHKiRet(cstrFinalize(*ppCStr));
finalize_it:
if(iRet != RS_RET_OK && *ppCStr != NULL)
@@ -387,26 +554,55 @@ finalize_it:
BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro! */
pThis->iCurrFNum = 1;
pThis->fd = -1;
+ pThis->fdDir = -1;
pThis->iUngetC = -1;
pThis->sType = STREAMTYPE_FILE_SINGLE;
pThis->sIOBufSize = glblGetIOBufSize();
- pThis->tOpenMode = 0600; /* TODO: make configurable */
+ pThis->tOpenMode = 0600;
ENDobjConstruct(strm)
/* ConstructionFinalizer
* rgerhards, 2008-01-09
*/
-rsRetVal strmConstructFinalize(strm_t *pThis)
+static rsRetVal strmConstructFinalize(strm_t *pThis)
{
+ rsRetVal localRet;
DEFiRet;
ASSERT(pThis != NULL);
- if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */
- if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- pThis->iBufPtrMax = 0; /* results in immediate read request */
+ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
+ pThis->iBufPtrMax = 0; /* results in immediate read request */
+ if(pThis->iZipLevel) { /* do we need a zip buf? */
+ localRet = objUse(zlibw, LM_ZLIBW_FILENAME);
+ if(localRet != RS_RET_OK) {
+ pThis->iZipLevel = 0;
+ DBGPRINTF("stream was requested with zip mode, but zlibw module unavailable (%d) - using "
+ "without zip\n", localRet);
+ } else {
+ /* we use the same size as the original buf, as we would like
+ * to make sure we can write out everyting with a SINGLE api call!
+ */
+ CHKmalloc(pThis->pZipBuf = (Bytef*) malloc(sizeof(uchar) * pThis->sIOBufSize));
+ }
+ }
+
+ /* if we are aset to sync, we must obtain a file handle to the directory for fsync() purposes */
+ if(pThis->bSync && !pThis->bIsTTY) {
+ pThis->fdDir = open((char*)pThis->pszDir, O_RDONLY | O_CLOEXEC | O_NOCTTY);
+ if(pThis->fdDir == -1) {
+ char errStr[1024];
+ int err = errno;
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ DBGPRINTF("error %d opening directory file for fsync() use - fsync for directory disabled: %s\n",
+ errno, errStr);
+ }
+ }
+
+ /* if we should call flush apc's, we need a mutex */
+ if(pThis->iFlushInterval != 0) {
+ pthread_mutex_init(&pThis->mut, 0);
}
finalize_it:
@@ -417,21 +613,27 @@ finalize_it:
/* destructor for the strm object */
BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(strm)
- if(pThis->tOperationsMode == STREAMMODE_WRITE)
+ if(pThis->tOperationsMode != STREAMMODE_READ)
strmFlush(pThis);
/* ... then free resources */
if(pThis->fd != -1)
strmCloseFile(pThis);
- if(pThis->pszDir != NULL)
- free(pThis->pszDir);
- if(pThis->pIOBuf != NULL)
- free(pThis->pIOBuf);
- if(pThis->pszCurrFName != NULL)
- free(pThis->pszCurrFName);
- if(pThis->pszFName != NULL)
- free(pThis->pszFName);
+ if(pThis->iZipLevel) { /* do we need a zip buf? */
+ objRelease(zlibw, LM_ZLIBW_FILENAME);
+ }
+
+ if(pThis->iFlushInterval != 0) {
+ // TODO: check if there is an apc and remove it!
+ pthread_mutex_destroy(&pThis->mut);
+ }
+
+ free(pThis->pszDir);
+ free(pThis->pIOBuf);
+ free(pThis->pZipBuf);
+ free(pThis->pszCurrFName);
+ free(pThis->pszFName);
ENDobjDestruct(strm)
@@ -457,48 +659,240 @@ finalize_it:
RETiRet;
}
-/* write memory buffer to a stream object.
- * To support direct writes of large objects, this method may be called
- * with a buffer pointing to some region other than the stream buffer itself.
- * However, in that case the stream buffer must be empty (strmFlush() has to
- * be called before), because we would otherwise mess up with the sequence
- * inside the stream. -- rgerhards, 2008-01-10
+
+/* try to recover a tty after a write error. This may have happend
+ * due to vhangup(), and, if so, we can simply re-open it.
*/
-static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+#ifdef linux
+# define ERR_TTYHUP EIO
+#else
+# define ERR_TTYHUP EBADF
+#endif
+static rsRetVal
+tryTTYRecover(strm_t *pThis, int err)
{
DEFiRet;
- int iWritten;
+ ISOBJ_TYPE_assert(pThis, strm);
+ if(err == ERR_TTYHUP) {
+ close(pThis->fd);
+ CHKiRet(doPhysOpen(pThis));
+ }
- ASSERT(pThis != NULL);
- ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
+finalize_it:
+ RETiRet;
+}
+#undef ER_TTYHUP
+
+
+/* issue write() api calls until either the buffer is completely
+ * written or an error occured (it may happen that multiple writes
+ * are required, what is perfectly legal. On exit, *pLenBuf contains
+ * the number of bytes actually written.
+ * rgerhards, 2009-06-08
+ */
+static rsRetVal
+doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf)
+{
+ ssize_t lenBuf;
+ ssize_t iTotalWritten;
+ ssize_t iWritten;
+ char *pWriteBuf;
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ lenBuf = *pLenBuf;
+ pWriteBuf = (char*) pBuf;
+ iTotalWritten = 0;
+ do {
+ iWritten = write(pThis->fd, pWriteBuf, lenBuf);
+ if(iWritten < 0) {
+ char errStr[1024];
+ int err = errno;
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ DBGPRINTF("log file (%d) write error %d: %s\n", pThis->fd, err, errStr);
+ if(err == EINTR) {
+ /*NO ERROR, just continue */;
+ } else {
+ if(pThis->bIsTTY) {
+ CHKiRet(tryTTYRecover(pThis, err));
+ } else {
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ /* Would it make sense to cover more error cases? So far, I
+ * do not see good reason to do so.
+ */
+ }
+ }
+ }
+ /* advance buffer to next write position */
+ iTotalWritten += iWritten;
+ lenBuf -= iWritten;
+ pWriteBuf += iWritten;
+ } while(lenBuf > 0); /* Warning: do..while()! */
+
+finalize_it:
+ *pLenBuf = iTotalWritten;
+ RETiRet;
+}
+
+
+/* sync the file to disk, so that any unwritten data is persisted. This
+ * also syncs the directory and thus makes sure that the file survives
+ * fatal failure. Note that we do NOT return an error status if the
+ * sync fails. Doing so would probably cause more trouble than it
+ * is worth (read: data loss may occur where we otherwise might not
+ * have it). -- rgerhards, 2009-06-08
+ */
+static rsRetVal
+syncFile(strm_t *pThis)
+{
+ int ret;
+ DEFiRet;
+
+ if(pThis->bIsTTY)
+ FINALIZE; /* TTYs can not be synced */
+
+ DBGPRINTF("syncing file %d\n", pThis->fd);
+ ret = fdatasync(pThis->fd);
+ if(ret != 0) {
+ char errStr[1024];
+ int err = errno;
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ DBGPRINTF("sync failed for file %d with error (%d): %s - ignoring\n",
+ pThis->fd, err, errStr);
+ }
+
+ if(pThis->fdDir != -1) {
+ ret = fsync(pThis->fdDir);
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* physically write to the output file. the provided data is ready for
+ * writing (e.g. zipped if we are requested to do that).
+ * Note that if the write() API fails, we do not reset any pointers, but return
+ * an error code. That means we may redo work in the next iteration.
+ * rgerhards, 2009-06-04
+ */
+static rsRetVal
+strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ size_t iWritten;
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, strm);
if(pThis->fd == -1)
CHKiRet(strmOpenFile(pThis));
- iWritten = write(pThis->fd, pBuf, lenBuf);
- dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, iWritten);
- /* TODO: handle error case -- rgerhards, 2008-01-07 */
-
- /* Now indicate buffer empty again. We do this in any case, because there
- * is no way we could react more intelligently to an error during write.
- * This MUST be done BEFORE strCheckNextOutputFile(), otherwise we have an
- * endless loop. We reset the buffer pointer also in finalize_it - this is
- * necessary if we run into problems. Not resetting it would again cause an
- * endless loop. So it is better to loose some data (which also justifies
- * duplicating that code, too...) -- rgerhards, 2008-01-10
- */
+ iWritten = lenBuf;
+ CHKiRet(doWriteCall(pThis, pBuf, &iWritten));
+ dbgoprint((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten);
+
pThis->iBufPtr = 0;
pThis->iCurrOffs += iWritten;
/* update user counter, if provided */
if(pThis->pUsrWCntr != NULL)
*pThis->pUsrWCntr += iWritten;
- if(pThis->sType == STREAMTYPE_FILE_CIRCULAR)
+ if(pThis->bSync) {
+ CHKiRet(syncFile(pThis));
+ }
+
+ if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) {
CHKiRet(strmCheckNextOutputFile(pThis));
+ } else if(pThis->iSizeLimit != 0) {
+ CHKiRet(doSizeLimitProcessing(pThis));
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* write the output buffer in zip mode
+ * This means we compress it first and then do a physical write.
+ * Note that we always do a full deflateInit ... deflate ... deflateEnd
+ * sequence. While this is not optimal, we need to do it because we need
+ * to ensure that the file is readable even when we are aborted. Doing the
+ * full sequence brings us as far towards this goal as possible (and not
+ * doing it would be a total failure). It may be worth considering to
+ * add a config switch so that the user can decide the risk he is ready
+ * to take, but so far this is not yet implemented (not even requested ;)).
+ * rgerhards, 2009-06-04
+ */
+static rsRetVal
+doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ z_stream zstrm;
+ int zRet; /* zlib return state */
+ DEFiRet;
+ assert(pThis != NULL);
+ assert(pBuf != NULL);
+
+ /* allocate deflate state */
+ zstrm.zalloc = Z_NULL;
+ zstrm.zfree = Z_NULL;
+ zstrm.opaque = Z_NULL;
+ /* see note in file header for the params we use with deflateInit2() */
+ zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
+ if(zRet != Z_OK) {
+ dbgprintf("error %d returned from zlib/deflateInit2()\n", zRet);
+ ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ }
+
+ /* now doing the compression */
+ zstrm.avail_in = lenBuf;
+ zstrm.next_in = (Bytef*) pBuf;
+ /* run deflate() on input until output buffer not full, finish
+ compression if all of source has been read in */
+ do {
+ dbgprintf("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in);
+ zstrm.avail_out = pThis->sIOBufSize;
+ zstrm.next_out = pThis->pZipBuf;
+ zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */
+ dbgprintf("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out);
+ assert(zRet != Z_STREAM_ERROR); /* state not clobbered */
+ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out));
+ } while (zstrm.avail_out == 0);
+ assert(zstrm.avail_in == 0); /* all input will be used */
+
+
+ zRet = zlibw.DeflateEnd(&zstrm);
+ if(zRet != Z_OK) {
+ dbgprintf("error %d returned from zlib/deflateEnd()\n", zRet);
+ ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ }
finalize_it:
- pThis->iBufPtr = 0; /* see comment above */
+ RETiRet;
+}
+
+
+/* write memory buffer to a stream object.
+ * To support direct writes of large objects, this method may be called
+ * with a buffer pointing to some region other than the stream buffer itself.
+ * However, in that case the stream buffer must be empty (strmFlush() has to
+ * be called before), because we would otherwise mess up with the sequence
+ * inside the stream. -- rgerhards, 2008-01-10
+ */
+static rsRetVal
+strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+ ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
+ if(pThis->iZipLevel) {
+ CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
+ } else {
+ /* write without zipping */
+ CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
+ }
+
+finalize_it:
RETiRet;
}
@@ -507,14 +901,15 @@ finalize_it:
* and is automatically called when the output buffer is full.
* rgerhards, 2008-01-10
*/
-rsRetVal strmFlush(strm_t *pThis)
+static rsRetVal
+strmFlush(strm_t *pThis)
{
DEFiRet;
ASSERT(pThis != NULL);
dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr);
- if(pThis->tOperationsMode == STREAMMODE_WRITE && pThis->iBufPtr > 0) {
+ if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) {
iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr);
}
@@ -549,7 +944,7 @@ static rsRetVal strmSeek(strm_t *pThis, off_t offs)
/* seek to current offset. This is primarily a helper to readjust the OS file
* pointer after a strm object has been deserialized.
*/
-rsRetVal strmSeekCurrOffs(strm_t *pThis)
+static rsRetVal strmSeekCurrOffs(strm_t *pThis)
{
DEFiRet;
@@ -562,7 +957,7 @@ rsRetVal strmSeekCurrOffs(strm_t *pThis)
/* write a *single* character to a stream object -- rgerhards, 2008-01-10
*/
-rsRetVal strmWriteChar(strm_t *pThis, uchar c)
+static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
{
DEFiRet;
@@ -582,7 +977,7 @@ finalize_it:
/* write an integer value (actually a long) to a stream object */
-rsRetVal strmWriteLong(strm_t *pThis, long i)
+static rsRetVal strmWriteLong(strm_t *pThis, long i)
{
DEFiRet;
uchar szBuf[32];
@@ -597,16 +992,48 @@ finalize_it:
}
+/* schedule an Apc flush request.
+ * rgerhards, 2009-06-15
+ */
+static inline rsRetVal
+scheduleFlushRequest(strm_t *pThis)
+{
+ apc_t *pApc;
+ DEFiRet;
+
+ CHKiRet(apc.CancelApc(pThis->apcID));
+dbgprintf("XXX: requesting to add apc!\n");
+ CHKiRet(apc.Construct(&pApc));
+ CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc));
+ CHKiRet(apc.SetParam1(pApc, pThis));
+ CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID));
+
+finalize_it:
+ RETiRet;
+}
+
+
/* write memory buffer to a stream object
*/
-rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+static rsRetVal
+strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
+ DEFVARS_mutexProtection_uncond;
DEFiRet;
size_t iPartial;
ASSERT(pThis != NULL);
ASSERT(pBuf != NULL);
+dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n", pThis, pBuf,lenBuf, pThis->bDisabled, pThis->iSizeLimit, pThis->iCurrOffs);
+ if(pThis->bDisabled)
+ ABORT_FINALIZE(RS_RET_STREAM_DISABLED);
+
+RUNLOG_VAR("%d", pThis->iFlushInterval);
+ if(pThis->iFlushInterval != 0) {
+ BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
+ }
+
/* check if the to-be-written data is larger than our buffer size */
if(lenBuf >= pThis->sIOBufSize) {
/* it is - so we do a direct write, that is most efficient.
@@ -635,7 +1062,17 @@ rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
}
}
+ /* we ignore the outcome of scheduleFlushRequest(), as we will write the data always at
+ * termination. For Zip mode, it could be fatal if we write after each record.
+ */
+ if(pThis->iFlushInterval != 0)
+ scheduleFlushRequest(pThis);
+
finalize_it:
+ if(pThis->iFlushInterval != 0) {
+ END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut);
+ }
+
RETiRet;
}
@@ -648,34 +1085,27 @@ DEFpropSetMeth(strm, iFileNumDigits, int)
DEFpropSetMeth(strm, tOperationsMode, int)
DEFpropSetMeth(strm, tOpenMode, mode_t)
DEFpropSetMeth(strm, sType, strmType_t)
-
-rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal)
+DEFpropSetMeth(strm, iZipLevel, int)
+DEFpropSetMeth(strm, bSync, int)
+DEFpropSetMeth(strm, sIOBufSize, size_t)
+DEFpropSetMeth(strm, iSizeLimit, off_t)
+DEFpropSetMeth(strm, iFlushInterval, int)
+DEFpropSetMeth(strm, pszSizeLimitCmd, uchar*)
+
+static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal)
{
pThis->iMaxFiles = iNewVal;
pThis->iFileNumDigits = getNumberDigits(iNewVal);
return RS_RET_OK;
}
-rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal)
-{
- DEFiRet;
-
- if(iNewVal & O_APPEND)
- ABORT_FINALIZE(RS_RET_PARAM_ERROR);
-
- pThis->iAddtlOpenFlags = iNewVal;
-
-finalize_it:
- RETiRet;
-}
-
/* set the stream's file prefix
* The passed-in string is duplicated. So if the caller does not need
* it any longer, it must free it.
* rgerhards, 2008-01-09
*/
-rsRetVal
+static rsRetVal
strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName)
{
DEFiRet;
@@ -689,7 +1119,7 @@ strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName)
if(pThis->pszFName != NULL)
free(pThis->pszFName);
- if((pThis->pszFName = malloc(sizeof(uchar) * iLenName + 1)) == NULL)
+ if((pThis->pszFName = malloc(sizeof(uchar) * (iLenName + 1))) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
memcpy(pThis->pszFName, pszName, iLenName + 1); /* always think about the \0! */
@@ -705,7 +1135,7 @@ finalize_it:
* it any longer, it must free it.
* rgerhards, 2008-01-09
*/
-rsRetVal
+static rsRetVal
strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir)
{
DEFiRet;
@@ -749,7 +1179,7 @@ finalize_it:
*
* rgerhards, 2008-01-10
*/
-rsRetVal strmRecordBegin(strm_t *pThis)
+static rsRetVal strmRecordBegin(strm_t *pThis)
{
ASSERT(pThis != NULL);
ASSERT(pThis->bInRecord == 0);
@@ -757,7 +1187,7 @@ rsRetVal strmRecordBegin(strm_t *pThis)
return RS_RET_OK;
}
-rsRetVal strmRecordEnd(strm_t *pThis)
+static rsRetVal strmRecordEnd(strm_t *pThis)
{
DEFiRet;
ASSERT(pThis != NULL);
@@ -779,7 +1209,7 @@ rsRetVal strmRecordEnd(strm_t *pThis)
* We do not serialize the dynamic properties.
* rgerhards, 2008-01-10
*/
-rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
+static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
{
DEFiRet;
int i;
@@ -840,7 +1270,6 @@ strmDup(strm_t *pThis, strm_t **ppNew)
pNew->lenDir = pThis->lenDir;
pNew->tOperationsMode = pThis->tOperationsMode;
pNew->tOpenMode = pThis->tOpenMode;
- pNew->iAddtlOpenFlags = pThis->iAddtlOpenFlags;
pNew->iMaxFileSize = pThis->iMaxFileSize;
pNew->iMaxFiles = pThis->iMaxFiles;
pNew->iFileNumDigits = pThis->iFileNumDigits;
@@ -866,7 +1295,7 @@ finalize_it:
* any new set overwrites the previous one.
* rgerhards, 2008-02-27
*/
-rsRetVal
+static rsRetVal
strmSetWCntr(strm_t *pThis, number_t *pWCnt)
{
DEFiRet;
@@ -886,8 +1315,8 @@ strmSetWCntr(strm_t *pThis, number_t *pWCnt)
/* This function can be used as a generic way to set properties.
* rgerhards, 2008-01-11
*/
-#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1)
-rsRetVal strmSetProperty(strm_t *pThis, var_t *pProp)
+#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, UCHAR_CONSTANT(name), sizeof(name) - 1)
+static rsRetVal strmSetProperty(strm_t *pThis, var_t *pProp)
{
DEFiRet;
@@ -926,7 +1355,7 @@ finalize_it:
* reported on the second call may actually be lower than on the first call. This is due to
* file circulation. A caller must deal with that. -- rgerhards, 2008-01-30
*/
-rsRetVal
+static rsRetVal
strmGetCurrOffset(strm_t *pThis, int64 *pOffs)
{
DEFiRet;
@@ -954,8 +1383,39 @@ CODESTARTobjQueryInterface(strm)
* work here (if we can support an older interface version - that,
* of course, also affects the "if" above).
*/
- /*xxxpIf->oID = OBJvm; SAMPLE */
-
+ pIf->Construct = strmConstruct;
+ pIf->ConstructFinalize = strmConstructFinalize;
+ pIf->Destruct = strmDestruct;
+ pIf->ReadChar = strmReadChar;
+ pIf->UnreadChar = strmUnreadChar;
+ pIf->ReadLine = strmReadLine;
+ pIf->SeekCurrOffs = strmSeekCurrOffs;
+ pIf->Write = strmWrite;
+ pIf->WriteChar = strmWriteChar;
+ pIf->WriteLong = strmWriteLong;
+ pIf->SetFName = strmSetFName;
+ pIf->SetDir = strmSetDir;
+ pIf->Flush = strmFlush;
+ pIf->RecordBegin = strmRecordBegin;
+ pIf->RecordEnd = strmRecordEnd;
+ pIf->Serialize = strmSerialize;
+ pIf->GetCurrOffset = strmGetCurrOffset;
+ pIf->Dup = strmDup;
+ pIf->SetWCntr = strmSetWCntr;
+ /* set methods */
+ pIf->SetbDeleteOnClose = strmSetbDeleteOnClose;
+ pIf->SetiMaxFileSize = strmSetiMaxFileSize;
+ pIf->SetiMaxFiles = strmSetiMaxFiles;
+ pIf->SetiFileNumDigits = strmSetiFileNumDigits;
+ pIf->SettOperationsMode = strmSettOperationsMode;
+ pIf->SettOpenMode = strmSettOpenMode;
+ pIf->SetsType = strmSetsType;
+ pIf->SetiZipLevel = strmSetiZipLevel;
+ pIf->SetbSync = strmSetbSync;
+ pIf->SetsIOBufSize = strmSetsIOBufSize;
+ pIf->SetiSizeLimit = strmSetiSizeLimit;
+ pIf->SetiFlushInterval = strmSetiFlushInterval;
+ pIf->SetpszSizeLimitCmd = strmSetpszSizeLimitCmd;
finalize_it:
ENDobjQueryInterface(strm)
@@ -966,13 +1426,12 @@ ENDobjQueryInterface(strm)
*/
BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
+ CHKiRet(objUse(apc, CORE_COMPONENT));
OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize);
OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty);
OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize);
ENDObjClassInit(strm)
-
-/*
- * vi:set ai:
+/* vi:set ai:
*/