summaryrefslogtreecommitdiffstats
path: root/runtime/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/stream.c')
-rw-r--r--runtime/stream.c168
1 files changed, 144 insertions, 24 deletions
diff --git a/runtime/stream.c b/runtime/stream.c
index 261eb9ca..abcfce0c 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -1,4 +1,3 @@
-//TODO: O_TRUC mode!
/* The serial stream class.
*
* A serial stream provides serial data access. In theory, serial streams
@@ -50,6 +49,7 @@
/* static data */
DEFobjStaticHelpers
+DEFobjCurrIf(zlibw)
/* forward definitions */
static rsRetVal strmFlush(strm_t *pThis);
@@ -74,7 +74,6 @@ static rsRetVal strmOpenFile(strm_t *pThis)
int iFlags;
ASSERT(pThis != NULL);
- ASSERT(pThis->tOperationsMode == STREAMMODE_READ || pThis->tOperationsMode == STREAMMODE_WRITE);
if(pThis->fd != -1)
ABORT_FINALIZE(RS_RET_OK);
@@ -95,13 +94,27 @@ static rsRetVal strmOpenFile(strm_t *pThis)
}
}
+RUNLOG_VAR("%d", pThis->tOperationsMode);
/* compute which flags we need to provide to open */
- if(pThis->tOperationsMode == STREAMMODE_READ)
- iFlags = O_RDONLY;
- else
- iFlags = O_WRONLY | O_CREAT;
+ switch(pThis->tOperationsMode) {
+ case STREAMMODE_READ:
+ iFlags = O_CLOEXEC | O_NOCTTY | O_RDONLY;
+ break;
+ case STREAMMODE_WRITE: /* legacy mode used inside queue engine */
+ iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT;
+ break;
+ case STREAMMODE_WRITE_TRUNC:
+ iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT | O_TRUNC;
+ break;
+ case STREAMMODE_WRITE_APPEND:
+ iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT | O_APPEND;
+ break;
+ default:assert(0);
+ break;
+ }
- iFlags |= pThis->iAddtlOpenFlags;
+ iFlags |= pThis->iAddtlOpenFlags; // TODO: remove this!
+dbgprintf("XXX: open with flags %d\n", iFlags);
pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode);
if(pThis->fd == -1) {
@@ -135,7 +148,7 @@ static rsRetVal strmCloseFile(strm_t *pThis)
ASSERT(pThis->fd != -1);
dbgoprint((obj_t*) pThis, "file %d closing\n", pThis->fd);
- if(pThis->tOperationsMode == STREAMMODE_WRITE)
+ if(pThis->tOperationsMode != STREAMMODE_READ)
strmFlush(pThis);
close(pThis->fd); // TODO: error check
@@ -398,14 +411,25 @@ ENDobjConstruct(strm)
*/
static rsRetVal strmConstructFinalize(strm_t *pThis)
{
+ rsRetVal localRet;
DEFiRet;
ASSERT(pThis != NULL);
- if(pThis->pIOBuf == NULL) { /* allocate our io buffer in case we have not yet */
- if((pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize)) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- pThis->iBufPtrMax = 0; /* results in immediate read request */
+ CHKmalloc(pThis->pIOBuf = (uchar*) malloc(sizeof(uchar) * pThis->sIOBufSize));
+ pThis->iBufPtrMax = 0; /* results in immediate read request */
+ if(pThis->iZipLevel) { /* do we need a zip buf? */
+ localRet = objUse(zlibw, LM_ZLIBW_FILENAME);
+ if(localRet != RS_RET_OK) {
+ pThis->iZipLevel = 0;
+ DBGPRINTF("stream was requested with zip mode, but zlibw module unavailable (%d) - using "
+ "without zip\n", localRet);
+ } else {
+ /* we use the same size as the original buf, as we would like
+ * to make sure we can write out everyting with a SINGLE api call!
+ */
+ CHKmalloc(pThis->pZipBuf = (Bytef*) malloc(sizeof(uchar) * pThis->sIOBufSize));
+ }
}
finalize_it:
@@ -416,15 +440,20 @@ finalize_it:
/* destructor for the strm object */
BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(strm)
- if(pThis->tOperationsMode == STREAMMODE_WRITE)
+ if(pThis->tOperationsMode != STREAMMODE_READ)
strmFlush(pThis);
/* ... then free resources */
if(pThis->fd != -1)
strmCloseFile(pThis);
+ if(pThis->iZipLevel) { /* do we need a zip buf? */
+ objRelease(zlibw, LM_ZLIBW_FILENAME);
+ }
+
free(pThis->pszDir);
free(pThis->pIOBuf);
+ free(pThis->pZipBuf);
free(pThis->pszCurrFName);
free(pThis->pszFName);
ENDobjDestruct(strm)
@@ -453,20 +482,17 @@ finalize_it:
}
-/* write memory buffer to a stream object.
- * To support direct writes of large objects, this method may be called
- * with a buffer pointing to some region other than the stream buffer itself.
- * However, in that case the stream buffer must be empty (strmFlush() has to
- * be called before), because we would otherwise mess up with the sequence
- * inside the stream. -- rgerhards, 2008-01-10
+/* physically write to the output file. the provided data is ready for
+ * writing (e.g. zipped if we are requested to do that).
+ * rgerhards, 2009-06-04
*/
-static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+static rsRetVal
+strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
DEFiRet;
int iWritten;
ASSERT(pThis != NULL);
- ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
if(pThis->fd == -1)
CHKiRet(strmOpenFile(pThis));
@@ -499,6 +525,95 @@ finalize_it:
}
+/* write the output buffer in zip mode
+ * This means we compress it first and then do a physical write.
+ * Note that we always do a full deflateInit ... deflate ... deflateEnd
+ * sequence. While this is not optimal, we need to do it because we need
+ * to ensure that the file is readable even when we are aborted. Doing the
+ * full sequence brings us as far towards this goal as possible (and not
+ * doing it would be a total failure). It may be worth considering to
+ * add a config switch so that the user can decide the risk he is ready
+ * to take, but so far this is not yet implemented (not even requested ;)).
+ * rgerhards, 2009-06-04
+ */
+static rsRetVal
+doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ z_stream zstrm;
+ int zRet; /* zlib return state */
+ DEFiRet;
+ assert(pThis != NULL);
+ assert(pBuf != NULL);
+
+ /* allocate deflate state */
+ zstrm.zalloc = Z_NULL;
+ zstrm.zfree = Z_NULL;
+ zstrm.opaque = Z_NULL;
+ /* see note in file header for the params we use with deflateInit2() */
+ zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
+ if(zRet != Z_OK) {
+ dbgprintf("error %d returned from zlib/deflateInit2()\n", zRet);
+ ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ }
+RUNLOG_STR("deflateInit2() done successfully\n");
+
+ /* now doing the compression */
+ zstrm.avail_in = lenBuf;
+ zstrm.next_in = (Bytef*) pBuf;
+ /* run deflate() on input until output buffer not full, finish
+ compression if all of source has been read in */
+ do {
+ dbgprintf("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in);
+ zstrm.avail_out = pThis->sIOBufSize;
+ zstrm.next_out = pThis->pZipBuf;
+ zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */
+ dbgprintf("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out);
+ assert(zRet != Z_STREAM_ERROR); /* state not clobbered */
+ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out));
+ } while (zstrm.avail_out == 0);
+ assert(zstrm.avail_in == 0); /* all input will be used */
+
+RUNLOG_STR("deflate() should be done successfully\n");
+
+ zRet = zlibw.DeflateEnd(&zstrm);
+ if(zRet != Z_OK) {
+ dbgprintf("error %d returned from zlib/deflateEnd()\n", zRet);
+ ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ }
+RUNLOG_STR("deflateEnd() done successfully\n");
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* write memory buffer to a stream object.
+ * To support direct writes of large objects, this method may be called
+ * with a buffer pointing to some region other than the stream buffer itself.
+ * However, in that case the stream buffer must be empty (strmFlush() has to
+ * be called before), because we would otherwise mess up with the sequence
+ * inside the stream. -- rgerhards, 2008-01-10
+ */
+static rsRetVal
+strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+ ASSERT(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
+
+ if(pThis->iZipLevel) {
+ CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
+ } else {
+ /* write without zipping */
+ CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
/* flush stream output buffer to persistent storage. This can be called at any time
* and is automatically called when the output buffer is full.
* rgerhards, 2008-01-10
@@ -511,7 +626,7 @@ strmFlush(strm_t *pThis)
ASSERT(pThis != NULL);
dbgoprint((obj_t*) pThis, "file %d flush, buflen %ld\n", pThis->fd, (long) pThis->iBufPtr);
- if(pThis->tOperationsMode == STREAMMODE_WRITE && pThis->iBufPtr > 0) {
+ if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) {
iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr);
}
@@ -605,6 +720,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
ASSERT(pThis != NULL);
ASSERT(pBuf != NULL);
+dbgprintf("strmWrite(%p, '%s', %ld);\n", pThis, pBuf,lenBuf);
/* check if the to-be-written data is larger than our buffer size */
if(lenBuf >= pThis->sIOBufSize) {
/* it is - so we do a direct write, that is most efficient.
@@ -646,6 +762,7 @@ DEFpropSetMeth(strm, iFileNumDigits, int)
DEFpropSetMeth(strm, tOperationsMode, int)
DEFpropSetMeth(strm, tOpenMode, mode_t)
DEFpropSetMeth(strm, sType, strmType_t)
+DEFpropSetMeth(strm, iZipLevel, int)
static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal)
{
@@ -681,13 +798,14 @@ strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName)
ASSERT(pThis != NULL);
ASSERT(pszName != NULL);
+dbgprintf("XXX: strm setFname: '%s'\n", pszName);
if(iLenName < 1)
ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
if(pThis->pszFName != NULL)
free(pThis->pszFName);
- if((pThis->pszFName = malloc(sizeof(uchar) * iLenName + 1)) == NULL)
+ if((pThis->pszFName = malloc(sizeof(uchar) * (iLenName + 1))) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
memcpy(pThis->pszFName, pszName, iLenName + 1); /* always think about the \0! */
@@ -711,6 +829,7 @@ strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir)
ASSERT(pThis != NULL);
ASSERT(pszDir != NULL);
+dbgprintf("XXX: strm setDir: '%s'\n", pszDir);
if(iLenDir < 1)
ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
@@ -927,7 +1046,7 @@ CODESTARTobjQueryInterface(strm)
pIf->RecordBegin = strmRecordBegin;
pIf->RecordEnd = strmRecordEnd;
pIf->Serialize = strmSerialize;
- pIf->SetiAddtlOpenFlags = strmSetiAddtlOpenFlags;
+ /* TODO: remove this! */ pIf->SetiAddtlOpenFlags = strmSetiAddtlOpenFlags;
pIf->GetCurrOffset = strmGetCurrOffset;
pIf->SetWCntr = strmSetWCntr;
/* set methods */
@@ -938,6 +1057,7 @@ CODESTARTobjQueryInterface(strm)
pIf->SettOperationsMode = strmSettOperationsMode;
pIf->SettOpenMode = strmSettOpenMode;
pIf->SetsType = strmSetsType;
+ pIf->SetiZipLevel = strmSetiZipLevel;
finalize_it:
ENDobjQueryInterface(strm)