summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--msg.c54
-rw-r--r--obj.c135
-rw-r--r--obj.h15
-rw-r--r--queue.c9
-rw-r--r--stream.c132
-rw-r--r--stream.h22
6 files changed, 210 insertions, 157 deletions
diff --git a/msg.c b/msg.c
index 779224ee..f5ebf36a 100644
--- a/msg.c
+++ b/msg.c
@@ -387,40 +387,36 @@ msg_t* MsgDup(msg_t* pOld)
* during msg construction - and never again used later.
* rgerhards, 2008-01-03
*/
-static rsRetVal MsgSerialize(msg_t *pThis, rsCStrObj **ppCStr)
+static rsRetVal MsgSerialize(msg_t *pThis, strm_t *pStrm)
{
DEFiRet;
- rsCStrObj *pCStr;
- assert(ppCStr != NULL);
-
- CHKiRet(objBeginSerialize(&pCStr, (obj_t*) pThis, 1024));
- objSerializeSCALAR(iProtocolVersion, SHORT);
- objSerializeSCALAR(iSeverity, SHORT);
- objSerializeSCALAR(iFacility, SHORT);
- objSerializeSCALAR(msgFlags, INT);
- objSerializeSCALAR(tRcvdAt, SYSLOGTIME);
- objSerializeSCALAR(tTIMESTAMP, SYSLOGTIME);
-
- objSerializePTR(pszRawMsg, PSZ);
- objSerializePTR(pszMSG, PSZ);
- objSerializePTR(pszUxTradMsg, PSZ);
- objSerializePTR(pszTAG, PSZ);
- objSerializePTR(pszHOSTNAME, PSZ);
- objSerializePTR(pszRcvFrom, PSZ);
-
- objSerializePTR(pCSStrucData, CSTR);
- objSerializePTR(pCSAPPNAME, CSTR);
- objSerializePTR(pCSPROCID, CSTR);
- objSerializePTR(pCSMSGID, CSTR);
-
- CHKiRet(objEndSerialize((&pCStr), (obj_t*) pThis));
- *ppCStr = pCStr;
+ assert(pThis != NULL);
+ assert(pStrm != NULL);
+
+ CHKiRet(objBeginSerialize(pStrm, (obj_t*) pThis));
+ objSerializeSCALAR(pStrm, iProtocolVersion, SHORT);
+ objSerializeSCALAR(pStrm, iSeverity, SHORT);
+ objSerializeSCALAR(pStrm, iFacility, SHORT);
+ objSerializeSCALAR(pStrm, msgFlags, INT);
+ objSerializeSCALAR(pStrm, tRcvdAt, SYSLOGTIME);
+ objSerializeSCALAR(pStrm, tTIMESTAMP, SYSLOGTIME);
+
+ objSerializePTR(pStrm, pszRawMsg, PSZ);
+ objSerializePTR(pStrm, pszMSG, PSZ);
+ objSerializePTR(pStrm, pszUxTradMsg, PSZ);
+ objSerializePTR(pStrm, pszTAG, PSZ);
+ objSerializePTR(pStrm, pszHOSTNAME, PSZ);
+ objSerializePTR(pStrm, pszRcvFrom, PSZ);
+
+ objSerializePTR(pStrm, pCSStrucData, CSTR);
+ objSerializePTR(pStrm, pCSAPPNAME, CSTR);
+ objSerializePTR(pStrm, pCSPROCID, CSTR);
+ objSerializePTR(pStrm, pCSMSGID, CSTR);
+
+ CHKiRet(objEndSerialize(pStrm, (obj_t*) pThis));
finalize_it:
- if(iRet != RS_RET_OK && pCStr != NULL)
- rsCStrDestruct(pCStr);
-
return iRet;
}
diff --git a/obj.c b/obj.c
index eb9fd2c9..c68f2c24 100644
--- a/obj.c
+++ b/obj.c
@@ -118,6 +118,38 @@ rsRetVal objInfoSetMethod(objInfo_t *pThis, objMethod_t objMethod, rsRetVal (*pH
/* --------------- object serializiation / deserialization support --------------- */
+
+/* serialize the header of an object */
+static rsRetVal objSerializeHeader(strm_t *pStrm, obj_t *pObj)
+{
+ DEFiRet;
+
+ assert(pStrm != NULL);
+ assert(pObj != NULL);
+
+ /* object cookie and serializer version (so far always 1) */
+ CHKiRet(strmWriteChar(pStrm, COOKIE_OBJLINE));
+ CHKiRet(strmWrite(pStrm, (uchar*) "Obj1", sizeof("Obj1") - 1));
+
+ /* object type, version and string length */
+ CHKiRet(strmWriteChar(pStrm, ':'));
+ CHKiRet(strmWriteLong(pStrm, objGetObjID(pObj)));
+ CHKiRet(strmWriteChar(pStrm, ':'));
+ CHKiRet(strmWriteLong(pStrm, objGetVersion(pObj)));
+
+ /* and finally we write the object name - this is primarily meant for
+ * human readers. The idea is that it can be easily skipped when reading
+ * the object back in
+ */
+ CHKiRet(strmWriteChar(pStrm, ':'));
+ CHKiRet(strmWrite(pStrm, objGetName(pObj), strlen((char*)objGetName(pObj))));
+ /* record trailer */
+ CHKiRet(strmWriteChar(pStrm, ':'));
+ CHKiRet(strmWriteChar(pStrm, '\n'));
+
+finalize_it:
+ return iRet;
+}
/* begin serialization of an object - this is a very simple hook. It once wrote the wrapper,
* now it only constructs the string object. We still leave it in here so that we may utilize
* it in the future (it is a nice abstraction). iExpcectedObjSize is an optimization setting.
@@ -127,17 +159,15 @@ rsRetVal objInfoSetMethod(objInfo_t *pThis, objMethod_t objMethod, rsRetVal (*pH
* the caller is advised to be conservative in guessing. Binary multiples are recommended.
* rgerhards, 2008-01-06
*/
-rsRetVal objBeginSerialize(rsCStrObj **ppCStr, obj_t *pObj, size_t iExpectedObjSize)
+rsRetVal objBeginSerialize(strm_t *pStrm, obj_t *pObj)
{
DEFiRet;
- assert(ppCStr != NULL);
+ assert(pStrm != NULL);
assert(pObj != NULL);
-
- if((*ppCStr = rsCStrConstruct()) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
-
- rsCStrSetAllocIncrement(*ppCStr, iExpectedObjSize);
+
+ CHKiRet(strmRecordBegin(pStrm));
+ CHKiRet(objSerializeHeader(pStrm, pObj));
finalize_it:
return iRet;
@@ -146,14 +176,14 @@ finalize_it:
/* append a property
*/
-rsRetVal objSerializeProp(rsCStrObj *pCStr, uchar *pszPropName, propertyType_t propType, void *pUsr)
+rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t propType, void *pUsr)
{
DEFiRet;
uchar *pszBuf = NULL;
size_t lenBuf = 0;
uchar szBuf[64];
- assert(pCStr != NULL);
+ assert(pStrm != NULL);
assert(pszPropName != NULL);
/* if we have no user pointer, there is no need to write this property.
@@ -209,64 +239,23 @@ rsRetVal objSerializeProp(rsCStrObj *pCStr, uchar *pszPropName, propertyType_t p
}
/* cookie */
- CHKiRet(rsCStrAppendChar(pCStr, COOKIE_PROPLINE));
+ CHKiRet(strmWriteChar(pStrm, COOKIE_PROPLINE));
/* name */
- CHKiRet(rsCStrAppendStr(pCStr, pszPropName));
- CHKiRet(rsCStrAppendChar(pCStr, ':'));
+ CHKiRet(strmWrite(pStrm, pszPropName, strlen((char*)pszPropName)));
+ CHKiRet(strmWriteChar(pStrm, ':'));
/* type */
- CHKiRet(rsCStrAppendInt(pCStr, (int) propType));
- CHKiRet(rsCStrAppendChar(pCStr, ':'));
+ CHKiRet(strmWriteLong(pStrm, (int) propType));
+ CHKiRet(strmWriteChar(pStrm, ':'));
/* length */
- CHKiRet(rsCStrAppendInt(pCStr, lenBuf));
- CHKiRet(rsCStrAppendChar(pCStr, ':'));
+ CHKiRet(strmWriteLong(pStrm, lenBuf));
+ CHKiRet(strmWriteChar(pStrm, ':'));
/* data */
- CHKiRet(rsCStrAppendStrWithLen(pCStr, (uchar*) pszBuf, lenBuf));
+ CHKiRet(strmWrite(pStrm, (uchar*) pszBuf, lenBuf));
/* trailer */
- CHKiRet(rsCStrAppendChar(pCStr, ':'));
- CHKiRet(rsCStrAppendChar(pCStr, '\n'));
-
-finalize_it:
- return iRet;
-}
-
-
-static rsRetVal objSerializeHeader(rsCStrObj **ppCStr, obj_t *pObj, rsCStrObj *pCSObjString, size_t iAllocIncrement)
-{
- DEFiRet;
- rsCStrObj *pCStr;
-
- assert(ppCStr != NULL);
- assert(pObj != NULL);
-
- if((pCStr = rsCStrConstruct()) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- rsCStrSetAllocIncrement(pCStr, iAllocIncrement);
-
- /* object cookie and serializer version (so far always 1) */
- CHKiRet(rsCStrAppendChar(pCStr, COOKIE_OBJLINE));
- CHKiRet(rsCStrAppendStr(pCStr, (uchar*) "Obj1"));
-
- /* object type, version and string length */
- CHKiRet(rsCStrAppendChar(pCStr, ':'));
- CHKiRet(rsCStrAppendInt(pCStr, objGetObjID(pObj)));
- CHKiRet(rsCStrAppendChar(pCStr, ':'));
- CHKiRet(rsCStrAppendInt(pCStr, objGetVersion(pObj)));
- CHKiRet(rsCStrAppendChar(pCStr, ':'));
- CHKiRet(rsCStrAppendInt(pCStr, rsCStrLen(pCSObjString)));
-
- /* and finally we write the object name - this is primarily meant for
- * human readers. The idea is that it can be easily skipped when reading
- * the object back in
- */
- CHKiRet(rsCStrAppendChar(pCStr, ':'));
- CHKiRet(rsCStrAppendStr(pCStr, objGetName(pObj)));
- /* record trailer */
- CHKiRet(rsCStrAppendChar(pCStr, ':'));
- CHKiRet(rsCStrAppendChar(pCStr, '\n'));
-
- *ppCStr = pCStr;
+ CHKiRet(strmWriteChar(pStrm, ':'));
+ CHKiRet(strmWriteChar(pStrm, '\n'));
finalize_it:
return iRet;
@@ -276,27 +265,20 @@ finalize_it:
/* end serialization of an object. The caller receives a
* standard C string, which he must free when no longer needed.
*/
-rsRetVal objEndSerialize(rsCStrObj **ppCStr, obj_t *pObj)
+rsRetVal objEndSerialize(strm_t *pStrm, obj_t *pObj)
{
DEFiRet;
- rsCStrObj *pCStr = NULL;
- assert(ppCStr != NULL);
- CHKiRet(objSerializeHeader(&pCStr, pObj, *ppCStr, rsCStrGetAllocIncrement(*ppCStr)));
+ assert(pStrm != NULL);
- CHKiRet(rsCStrAppendStrWithLen(pCStr, rsCStrGetBufBeg(*ppCStr), rsCStrLen(*ppCStr)));
- CHKiRet(rsCStrAppendChar(pCStr, COOKIE_ENDLINE));
- CHKiRet(rsCStrAppendStr(pCStr, (uchar*) "EndObj\n"));
- CHKiRet(rsCStrAppendChar(pCStr, COOKIE_BLANKLINE));
- CHKiRet(rsCStrAppendChar(pCStr, '\n'));
- CHKiRet(rsCStrFinish(pCStr));
+ CHKiRet(strmWriteChar(pStrm, COOKIE_ENDLINE));
+ CHKiRet(strmWrite(pStrm, (uchar*) "End\n", sizeof("END\n") - 1));
+ CHKiRet(strmWriteChar(pStrm, COOKIE_BLANKLINE));
+ CHKiRet(strmWriteChar(pStrm, '\n'));
- rsCStrDestruct(*ppCStr);
- *ppCStr = pCStr;
+ CHKiRet(strmRecordEnd(pStrm));
finalize_it:
- if(iRet != RS_RET_OK && pCStr != NULL)
- rsCStrDestruct(pCStr);
return iRet;
}
@@ -415,7 +397,7 @@ static rsRetVal objDeserializeHeader(objID_t *poID, int* poVers, strm_t *pStrm)
NEXTC; if(c != '1') ABORT_FINALIZE(RS_RET_INVALID_HEADER_VERS);
NEXTC; if(c != ':') ABORT_FINALIZE(RS_RET_INVALID_HEADER_VERS);
- /* object type and version and string length */
+ /* object type and version */
CHKiRet(objDeserializeLong(&ioID, pStrm));
CHKiRet(objDeserializeLong(&oVers, pStrm));
@@ -522,9 +504,6 @@ static rsRetVal objDeserializeTrailer(strm_t *pStrm)
NEXTC; if(c != 'E') ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
NEXTC; if(c != 'n') ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
NEXTC; if(c != 'd') ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
- NEXTC; if(c != 'O') ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
- NEXTC; if(c != 'b') ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
- NEXTC; if(c != 'j') ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
NEXTC; if(c != '\n') ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
NEXTC; if(c != COOKIE_BLANKLINE) ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
NEXTC; if(c != '\n') ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
diff --git a/obj.h b/obj.h
index ab9470ac..c27e228c 100644
--- a/obj.h
+++ b/obj.h
@@ -59,10 +59,10 @@
free(pThis); \
}
-#define objSerializeSCALAR(propName, propType) \
- CHKiRet(objSerializeProp(pCStr, (uchar*) #propName, PROPTYPE_##propType, (void*) &pThis->propName));
-#define objSerializePTR(propName, propType) \
- CHKiRet(objSerializeProp(pCStr, (uchar*) #propName, PROPTYPE_##propType, (void*) pThis->propName));
+#define objSerializeSCALAR(strm, propName, propType) \
+ CHKiRet(objSerializeProp(strm, (uchar*) #propName, PROPTYPE_##propType, (void*) &pThis->propName));
+#define objSerializePTR(strm, propName, propType) \
+ CHKiRet(objSerializeProp(strm, (uchar*) #propName, PROPTYPE_##propType, (void*) pThis->propName));
#define DEFobjStaticHelpers static objInfo_t *pObjInfoOBJ = NULL;
#define objGetName(pThis) (((obj_t*) (pThis))->pObjInfo->pszName)
#define objGetObjID(pThis) (((obj_t*) (pThis))->pObjInfo->objID)
@@ -78,10 +78,9 @@
/* prototypes */
rsRetVal objInfoConstruct(objInfo_t **ppThis, objID_t objID, uchar *pszName, int iObjVers, rsRetVal (*pConstruct)(void *), rsRetVal (*pDestruct)(void *));
rsRetVal objInfoSetMethod(objInfo_t *pThis, objMethod_t objMethod, rsRetVal (*pHandler)(void*));
-rsRetVal objBeginSerialize(rsCStrObj **ppCStr, obj_t *pObj, size_t iExpectedObjSize);
-rsRetVal objSerializePsz(rsCStrObj *pCStr, uchar *psz, size_t len);
-rsRetVal objEndSerialize(rsCStrObj **ppCStr, obj_t *pObj);
-rsRetVal objSerializeProp(rsCStrObj *pCStr, uchar *pszPropName, propertyType_t propType, void *pUsr);
+rsRetVal objBeginSerialize(strm_t *pStrm, obj_t *pObj);
+rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t propType, void *pUsr);
+rsRetVal objEndSerialize(strm_t *pStrm, obj_t *pObj);
rsRetVal objRegisterObj(objID_t oID, objInfo_t *pInfo);
rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pSerStore);
PROTOTYPEObjClassInit(obj);
diff --git a/queue.c b/queue.c
index 6ff4358f..a716c725 100644
--- a/queue.c
+++ b/queue.c
@@ -238,15 +238,16 @@ static rsRetVal qDestructDisk(queue_t *pThis)
static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
{
DEFiRet;
- rsCStrObj *pCStr;
assert(pThis != NULL);
- //CHKiRet(strmOpenFile(pThis->tVars.disk.pWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes!
-
+ CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite));
+ CHKiRet(strmFlush(pThis->tVars.disk.pWrite));
+#if 0
+ //rsCStrObj *pCStr;
CHKiRet((objSerialize(pUsr))(pUsr, &pCStr));
CHKiRet(strmWrite(pThis->tVars.disk.pWrite, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr)));
- CHKiRet(strmFlush(pThis->tVars.disk.pWrite));
+#endif
finalize_it:
return iRet;
diff --git a/stream.c b/stream.c
index 1ed46185..6f5e058a 100644
--- a/stream.c
+++ b/stream.c
@@ -104,6 +104,7 @@ static rsRetVal strmCloseFile(strm_t *pThis)
DEFiRet;
assert(pThis != NULL);
+ assert(pThis->fd != -1);
dbgprintf("Stream 0x%lx: closing file %d\n", (unsigned long) pThis, pThis->fd);
if(pThis->tOperationsMode == STREAMMODE_WRITE)
@@ -121,6 +122,7 @@ static rsRetVal strmCloseFile(strm_t *pThis)
pThis->pszCurrFName = NULL;
}
+dbgprintf("exit strmCloseFile, fd: %d\n", pThis->fd);
return iRet;
}
@@ -128,12 +130,15 @@ static rsRetVal strmCloseFile(strm_t *pThis)
/* switch to next strm file
* This method must only be called if we are in a multi-file mode!
*/
-rsRetVal strmNextFile(strm_t *pThis)
+static rsRetVal
+strmNextFile(strm_t *pThis)
{
DEFiRet;
+dbgprintf("strmNextFile, old num %d\n", pThis->iCurrFNum);
assert(pThis != NULL);
assert(pThis->iMaxFiles != 0);
+ assert(pThis->fd != -1);
CHKiRet(strmCloseFile(pThis));
@@ -161,6 +166,7 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
{
DEFiRet;
int bRun;
+ long iLenRead;
assert(pThis != NULL);
assert(pC != NULL);
@@ -179,10 +185,10 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
while(bRun) {
/* first check if we need to (re)open the file (we may have switched to a new one!) */
CHKiRet(strmOpenFile(pThis));
- pThis->iBufPtrMax = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize);
- dbgprintf("Stream 0x%lx: read %d bytes from file %d\n", (unsigned long) pThis,
- pThis->iBufPtrMax, pThis->fd);
- if(pThis->iBufPtrMax == 0) {
+ iLenRead = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize);
+ dbgprintf("Stream 0x%lx: read %ld bytes from file %d\n", (unsigned long) pThis,
+ iLenRead, pThis->fd);
+ if(iLenRead == 0) {
if(pThis->iMaxFiles == 0)
ABORT_FINALIZE(RS_RET_EOF);
else {
@@ -191,10 +197,12 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
dbgprintf("Stream 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->fd);
CHKiRet(strmNextFile(pThis));
}
- } else if(pThis->iBufPtrMax < 0)
+ } else if(iLenRead < 0)
ABORT_FINALIZE(RS_RET_IO_ERROR);
- else
+ else { /* good read */
+ pThis->iBufPtrMax = iLenRead;
bRun = 0; /* exit loop */
+ }
}
/* if we reach this point, we had a good read */
pThis->iBufPtr = 0;
@@ -317,11 +325,16 @@ rsRetVal strmDestruct(strm_t *pThis)
/* check if we need to open a new file (in output mode only).
* The decision is based on file size AND record delimition state.
+ * This method may also be called on a closed file, in which case
+ * it immediately returns.
*/
static rsRetVal strmCheckNextOutputFile(strm_t *pThis)
{
DEFiRet;
+ if(pThis->fd == -1)
+ FINALIZE;
+
if(pThis->iCurrOffs >= pThis->iMaxFileSize) {
dbgprintf("Stream 0x%lx: max file size %ld reached for %d, now %ld - starting new file\n",
(unsigned long) pThis, (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs);
@@ -333,15 +346,19 @@ finalize_it:
}
/* write memory buffer to a stream object.
+ * To support direct writes of large objects, this method may be called
+ * with a buffer pointing to some region other than the stream buffer itself.
+ * However, in that case the stream buffer must be empty (strmFlush() has to
+ * be called before), because we would otherwise mess up with the sequence
+ * inside the stream. -- rgerhards, 2008-01-10
*/
static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
DEFiRet;
int iWritten;
-dbgprintf("strmWriteInternal()\n");
assert(pThis != NULL);
- assert(pBuf != NULL);
+ assert(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0);
if(pThis->fd == -1)
CHKiRet(strmOpenFile(pThis));
@@ -351,10 +368,21 @@ dbgprintf("strmWriteInternal()\n");
iWritten, errno, strerror(errno));
/* TODO: handle error case -- rgerhards, 2008-01-07 */
+ /* Now indicate buffer empty again. We do this in any case, because there
+ * is no way we could react more intelligently to an error during write.
+ * This MUST be done BEFORE strCheckNextOutputFile(), otherwise we have an
+ * endless loop. We reset the buffer pointer also in finalize_it - this is
+ * necessary if we run into problems. Not resetting it would again cause an
+ * endless loop. So it is better to loose some data (which also justifies
+ * duplicating that code, too...) -- rgerhards, 2008-01-10
+ */
+ pThis->iBufPtr = 0;
pThis->iCurrOffs += iWritten;
CHKiRet(strmCheckNextOutputFile(pThis));
finalize_it:
+ pThis->iBufPtr = 0; /* see comment above */
+
return iRet;
}
@@ -366,47 +394,90 @@ rsRetVal strmFlush(strm_t *pThis)
{
DEFiRet;
- dbgprintf("Stream 0x%lx: flush file %d, buflen %d\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr);
assert(pThis != NULL);
+ dbgprintf("Stream 0x%lx: flush file %d, buflen %ld\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr);
if(pThis->iBufPtr > 0) {
iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr);
- /* Now indicate buffer empty again. We do this in any case, because there
- * is no way we could react more intelligently to an error during write.
- * We have not used CHKiRet(), as that would have presented some sequence
- * problems, which are not necessary to look at given what we do.
- */
- pThis->iBufPtr = 0;
}
return iRet;
}
+/* write a *single* character to a stream object -- rgerhards, 2008-01-10
+ */
+rsRetVal strmWriteChar(strm_t *pThis, uchar c)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+ /* if the buffer is full, we need to flush before we can write */
+ if(pThis->iBufPtr == pThis->sIOBufSize) {
+ CHKiRet(strmFlush(pThis));
+ }
+ /* we now always have space for one character, so we simply copy it */
+ *(pThis->pIOBuf + pThis->iBufPtr) = c;
+ pThis->iBufPtr++;
+
+finalize_it:
+ return iRet;
+}
+
+
+/* write an integer value (actually a long) to a stream object */
+rsRetVal strmWriteLong(strm_t *pThis, long i)
+{
+ DEFiRet;
+ uchar szBuf[32];
+
+ assert(pThis != NULL);
+
+ CHKiRet(srUtilItoA((char*)szBuf, sizeof(szBuf), i));
+ CHKiRet(strmWrite(pThis, szBuf, strlen((char*)szBuf)));
+
+finalize_it:
+ return iRet;
+}
+
+
/* write memory buffer to a stream object
*/
rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
DEFiRet;
+ size_t iPartial;
-dbgprintf("strmWrite()\n");
assert(pThis != NULL);
assert(pBuf != NULL);
- /* if the string does not fit into the current buffer, we flush that buffer and
- * then do the write ourselfs. So even if we have data that is of multiple
- * buffer lengths, we will write it with a single write operation.
- * rgerhards, 2008-01-10
- */
- if(pThis->iBufPtr + lenBuf >= pThis->sIOBufSize) {
-dbgprintf("strmWrite() uses direct write\n");
- CHKiRet(strmFlush(pThis));
+ /* check if the to-be-written data is larger than our buffer size */
+ if(lenBuf >= pThis->sIOBufSize) {
+ /* it is - so we do a direct write, that is most efficient.
+ * TODO: is it really? think about disk block sizes!
+ */
+ CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */
CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf));
} else {
-dbgprintf("strmWrite() uses buffered write\n");
- /* we have space, so we simply copy over the string */
- memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf);
- pThis->iBufPtr += lenBuf;
+ /* data fits into a buffer - we just need to see if it
+ * fits into the current buffer...
+ */
+ if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) {
+ /* nope, so we must split it */
+ iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
+ if(iPartial > 0) { /* the buffer was exactly full, can not write anything! */
+ memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial);
+ pThis->iBufPtr += iPartial;
+ }
+ CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */
+ memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial);
+ pThis->iBufPtr = lenBuf - iPartial;
+ } else {
+ /* we have space, so we simply copy over the string */
+ memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf);
+ pThis->iBufPtr += lenBuf;
+ }
}
finalize_it:
@@ -511,6 +582,7 @@ rsRetVal strmRecordBegin(strm_t *pThis)
assert(pThis != NULL);
assert(pThis->bInRecord == 0);
pThis->bInRecord = 1;
+dbgprintf("strmRecordBegin set \n");
return RS_RET_OK;
}
@@ -520,8 +592,10 @@ rsRetVal strmRecordEnd(strm_t *pThis)
assert(pThis != NULL);
assert(pThis->bInRecord == 1);
+dbgprintf("strmRecordEnd in %d\n", iRet);
pThis->bInRecord = 0;
iRet = strmCheckNextOutputFile(pThis); /* check if we need to switch files */
+dbgprintf("strmRecordEnd out %d\n", iRet);
return iRet;
}
diff --git a/stream.h b/stream.h
index 89d27b54..6e901449 100644
--- a/stream.h
+++ b/stream.h
@@ -62,8 +62,7 @@ typedef enum {
typedef struct strm_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
strmType_t sType;
- int fd; /* the file descriptor, -1 if closed */
- uchar *pszCurrFName; /* name of current file (if open) */
+ /* descriptive properties */
int iCurrFNum;/* current file number (NOT descriptor, but the number in the file name!) */
uchar *pszDir; /* Directory */
int lenDir;
@@ -71,18 +70,21 @@ typedef struct strm_s {
int lenFilePrefix;
strmMode_t tOperationsMode;
mode_t tOpenMode;
- size_t iCurrOffs;/* current offset */
- uchar *pIOBuf; /* io Buffer */
size_t sIOBufSize;/* size of IO buffer */
- int iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */
- int iBufPtr; /* pointer into current buffer */
- int iUngetC; /* char set via UngetChar() call or -1 if none set */
int iFlagsOpenOS;
int iModeOpenOS;
size_t iMaxFileSize;/* maximum size a file may grow to */
int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */
int iMaxFiles; /* maximum number of files if a circular mode is in use */
int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */
+ /* dynamic properties, valid only during file open, not to be persistet */
+ int fd; /* the file descriptor, -1 if closed */
+ size_t iCurrOffs;/* current offset */
+ uchar *pszCurrFName; /* name of current file (if open) */
+ uchar *pIOBuf; /* io Buffer */
+ size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */
+ size_t iBufPtr; /* pointer into current buffer */
+ int iUngetC; /* char set via UngetChar() call or -1 if none set */
int bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */
} strm_t;
@@ -95,11 +97,13 @@ rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal strmReadChar(strm_t *pThis, uchar *pC);
rsRetVal strmUnreadChar(strm_t *pThis, uchar c);
rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
-rsRetVal strmNextFile(strm_t *pThis);
-//rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode);
+rsRetVal strmWriteChar(strm_t *pThis, uchar c);
+rsRetVal strmWriteLong(strm_t *pThis, long i);
rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir);
rsRetVal strmFlush(strm_t *pThis);
+rsRetVal strmRecordBegin(strm_t *pThis);
+rsRetVal strmRecordEnd(strm_t *pThis);
PROTOTYPEObjClassInit(strm);
PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int);
PROTOTYPEpropSetMeth(strm, iMaxFileSize, int);