From fa859275c66afc639cd3d2ea8a74cfdc63be8b99 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 10 Jan 2008 13:09:43 +0000 Subject: - added write functions for several types to stream class - changed objSerialize methods to work directly on the stream class --- msg.c | 54 ++++++++++++------------- obj.c | 135 +++++++++++++++++++++++++++------------------------------------ obj.h | 15 ++++--- queue.c | 9 +++-- stream.c | 132 +++++++++++++++++++++++++++++++++++++++++++++++-------------- stream.h | 22 ++++++----- 6 files changed, 210 insertions(+), 157 deletions(-) diff --git a/msg.c b/msg.c index 779224ee..f5ebf36a 100644 --- a/msg.c +++ b/msg.c @@ -387,40 +387,36 @@ msg_t* MsgDup(msg_t* pOld) * during msg construction - and never again used later. * rgerhards, 2008-01-03 */ -static rsRetVal MsgSerialize(msg_t *pThis, rsCStrObj **ppCStr) +static rsRetVal MsgSerialize(msg_t *pThis, strm_t *pStrm) { DEFiRet; - rsCStrObj *pCStr; - assert(ppCStr != NULL); - - CHKiRet(objBeginSerialize(&pCStr, (obj_t*) pThis, 1024)); - objSerializeSCALAR(iProtocolVersion, SHORT); - objSerializeSCALAR(iSeverity, SHORT); - objSerializeSCALAR(iFacility, SHORT); - objSerializeSCALAR(msgFlags, INT); - objSerializeSCALAR(tRcvdAt, SYSLOGTIME); - objSerializeSCALAR(tTIMESTAMP, SYSLOGTIME); - - objSerializePTR(pszRawMsg, PSZ); - objSerializePTR(pszMSG, PSZ); - objSerializePTR(pszUxTradMsg, PSZ); - objSerializePTR(pszTAG, PSZ); - objSerializePTR(pszHOSTNAME, PSZ); - objSerializePTR(pszRcvFrom, PSZ); - - objSerializePTR(pCSStrucData, CSTR); - objSerializePTR(pCSAPPNAME, CSTR); - objSerializePTR(pCSPROCID, CSTR); - objSerializePTR(pCSMSGID, CSTR); - - CHKiRet(objEndSerialize((&pCStr), (obj_t*) pThis)); - *ppCStr = pCStr; + assert(pThis != NULL); + assert(pStrm != NULL); + + CHKiRet(objBeginSerialize(pStrm, (obj_t*) pThis)); + objSerializeSCALAR(pStrm, iProtocolVersion, SHORT); + objSerializeSCALAR(pStrm, iSeverity, SHORT); + objSerializeSCALAR(pStrm, iFacility, SHORT); + objSerializeSCALAR(pStrm, msgFlags, INT); + objSerializeSCALAR(pStrm, tRcvdAt, SYSLOGTIME); + objSerializeSCALAR(pStrm, tTIMESTAMP, SYSLOGTIME); + + objSerializePTR(pStrm, pszRawMsg, PSZ); + objSerializePTR(pStrm, pszMSG, PSZ); + objSerializePTR(pStrm, pszUxTradMsg, PSZ); + objSerializePTR(pStrm, pszTAG, PSZ); + objSerializePTR(pStrm, pszHOSTNAME, PSZ); + objSerializePTR(pStrm, pszRcvFrom, PSZ); + + objSerializePTR(pStrm, pCSStrucData, CSTR); + objSerializePTR(pStrm, pCSAPPNAME, CSTR); + objSerializePTR(pStrm, pCSPROCID, CSTR); + objSerializePTR(pStrm, pCSMSGID, CSTR); + + CHKiRet(objEndSerialize(pStrm, (obj_t*) pThis)); finalize_it: - if(iRet != RS_RET_OK && pCStr != NULL) - rsCStrDestruct(pCStr); - return iRet; } diff --git a/obj.c b/obj.c index eb9fd2c9..c68f2c24 100644 --- a/obj.c +++ b/obj.c @@ -118,6 +118,38 @@ rsRetVal objInfoSetMethod(objInfo_t *pThis, objMethod_t objMethod, rsRetVal (*pH /* --------------- object serializiation / deserialization support --------------- */ + +/* serialize the header of an object */ +static rsRetVal objSerializeHeader(strm_t *pStrm, obj_t *pObj) +{ + DEFiRet; + + assert(pStrm != NULL); + assert(pObj != NULL); + + /* object cookie and serializer version (so far always 1) */ + CHKiRet(strmWriteChar(pStrm, COOKIE_OBJLINE)); + CHKiRet(strmWrite(pStrm, (uchar*) "Obj1", sizeof("Obj1") - 1)); + + /* object type, version and string length */ + CHKiRet(strmWriteChar(pStrm, ':')); + CHKiRet(strmWriteLong(pStrm, objGetObjID(pObj))); + CHKiRet(strmWriteChar(pStrm, ':')); + CHKiRet(strmWriteLong(pStrm, objGetVersion(pObj))); + + /* and finally we write the object name - this is primarily meant for + * human readers. The idea is that it can be easily skipped when reading + * the object back in + */ + CHKiRet(strmWriteChar(pStrm, ':')); + CHKiRet(strmWrite(pStrm, objGetName(pObj), strlen((char*)objGetName(pObj)))); + /* record trailer */ + CHKiRet(strmWriteChar(pStrm, ':')); + CHKiRet(strmWriteChar(pStrm, '\n')); + +finalize_it: + return iRet; +} /* begin serialization of an object - this is a very simple hook. It once wrote the wrapper, * now it only constructs the string object. We still leave it in here so that we may utilize * it in the future (it is a nice abstraction). iExpcectedObjSize is an optimization setting. @@ -127,17 +159,15 @@ rsRetVal objInfoSetMethod(objInfo_t *pThis, objMethod_t objMethod, rsRetVal (*pH * the caller is advised to be conservative in guessing. Binary multiples are recommended. * rgerhards, 2008-01-06 */ -rsRetVal objBeginSerialize(rsCStrObj **ppCStr, obj_t *pObj, size_t iExpectedObjSize) +rsRetVal objBeginSerialize(strm_t *pStrm, obj_t *pObj) { DEFiRet; - assert(ppCStr != NULL); + assert(pStrm != NULL); assert(pObj != NULL); - - if((*ppCStr = rsCStrConstruct()) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - - rsCStrSetAllocIncrement(*ppCStr, iExpectedObjSize); + + CHKiRet(strmRecordBegin(pStrm)); + CHKiRet(objSerializeHeader(pStrm, pObj)); finalize_it: return iRet; @@ -146,14 +176,14 @@ finalize_it: /* append a property */ -rsRetVal objSerializeProp(rsCStrObj *pCStr, uchar *pszPropName, propertyType_t propType, void *pUsr) +rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t propType, void *pUsr) { DEFiRet; uchar *pszBuf = NULL; size_t lenBuf = 0; uchar szBuf[64]; - assert(pCStr != NULL); + assert(pStrm != NULL); assert(pszPropName != NULL); /* if we have no user pointer, there is no need to write this property. @@ -209,64 +239,23 @@ rsRetVal objSerializeProp(rsCStrObj *pCStr, uchar *pszPropName, propertyType_t p } /* cookie */ - CHKiRet(rsCStrAppendChar(pCStr, COOKIE_PROPLINE)); + CHKiRet(strmWriteChar(pStrm, COOKIE_PROPLINE)); /* name */ - CHKiRet(rsCStrAppendStr(pCStr, pszPropName)); - CHKiRet(rsCStrAppendChar(pCStr, ':')); + CHKiRet(strmWrite(pStrm, pszPropName, strlen((char*)pszPropName))); + CHKiRet(strmWriteChar(pStrm, ':')); /* type */ - CHKiRet(rsCStrAppendInt(pCStr, (int) propType)); - CHKiRet(rsCStrAppendChar(pCStr, ':')); + CHKiRet(strmWriteLong(pStrm, (int) propType)); + CHKiRet(strmWriteChar(pStrm, ':')); /* length */ - CHKiRet(rsCStrAppendInt(pCStr, lenBuf)); - CHKiRet(rsCStrAppendChar(pCStr, ':')); + CHKiRet(strmWriteLong(pStrm, lenBuf)); + CHKiRet(strmWriteChar(pStrm, ':')); /* data */ - CHKiRet(rsCStrAppendStrWithLen(pCStr, (uchar*) pszBuf, lenBuf)); + CHKiRet(strmWrite(pStrm, (uchar*) pszBuf, lenBuf)); /* trailer */ - CHKiRet(rsCStrAppendChar(pCStr, ':')); - CHKiRet(rsCStrAppendChar(pCStr, '\n')); - -finalize_it: - return iRet; -} - - -static rsRetVal objSerializeHeader(rsCStrObj **ppCStr, obj_t *pObj, rsCStrObj *pCSObjString, size_t iAllocIncrement) -{ - DEFiRet; - rsCStrObj *pCStr; - - assert(ppCStr != NULL); - assert(pObj != NULL); - - if((pCStr = rsCStrConstruct()) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - rsCStrSetAllocIncrement(pCStr, iAllocIncrement); - - /* object cookie and serializer version (so far always 1) */ - CHKiRet(rsCStrAppendChar(pCStr, COOKIE_OBJLINE)); - CHKiRet(rsCStrAppendStr(pCStr, (uchar*) "Obj1")); - - /* object type, version and string length */ - CHKiRet(rsCStrAppendChar(pCStr, ':')); - CHKiRet(rsCStrAppendInt(pCStr, objGetObjID(pObj))); - CHKiRet(rsCStrAppendChar(pCStr, ':')); - CHKiRet(rsCStrAppendInt(pCStr, objGetVersion(pObj))); - CHKiRet(rsCStrAppendChar(pCStr, ':')); - CHKiRet(rsCStrAppendInt(pCStr, rsCStrLen(pCSObjString))); - - /* and finally we write the object name - this is primarily meant for - * human readers. The idea is that it can be easily skipped when reading - * the object back in - */ - CHKiRet(rsCStrAppendChar(pCStr, ':')); - CHKiRet(rsCStrAppendStr(pCStr, objGetName(pObj))); - /* record trailer */ - CHKiRet(rsCStrAppendChar(pCStr, ':')); - CHKiRet(rsCStrAppendChar(pCStr, '\n')); - - *ppCStr = pCStr; + CHKiRet(strmWriteChar(pStrm, ':')); + CHKiRet(strmWriteChar(pStrm, '\n')); finalize_it: return iRet; @@ -276,27 +265,20 @@ finalize_it: /* end serialization of an object. The caller receives a * standard C string, which he must free when no longer needed. */ -rsRetVal objEndSerialize(rsCStrObj **ppCStr, obj_t *pObj) +rsRetVal objEndSerialize(strm_t *pStrm, obj_t *pObj) { DEFiRet; - rsCStrObj *pCStr = NULL; - assert(ppCStr != NULL); - CHKiRet(objSerializeHeader(&pCStr, pObj, *ppCStr, rsCStrGetAllocIncrement(*ppCStr))); + assert(pStrm != NULL); - CHKiRet(rsCStrAppendStrWithLen(pCStr, rsCStrGetBufBeg(*ppCStr), rsCStrLen(*ppCStr))); - CHKiRet(rsCStrAppendChar(pCStr, COOKIE_ENDLINE)); - CHKiRet(rsCStrAppendStr(pCStr, (uchar*) "EndObj\n")); - CHKiRet(rsCStrAppendChar(pCStr, COOKIE_BLANKLINE)); - CHKiRet(rsCStrAppendChar(pCStr, '\n')); - CHKiRet(rsCStrFinish(pCStr)); + CHKiRet(strmWriteChar(pStrm, COOKIE_ENDLINE)); + CHKiRet(strmWrite(pStrm, (uchar*) "End\n", sizeof("END\n") - 1)); + CHKiRet(strmWriteChar(pStrm, COOKIE_BLANKLINE)); + CHKiRet(strmWriteChar(pStrm, '\n')); - rsCStrDestruct(*ppCStr); - *ppCStr = pCStr; + CHKiRet(strmRecordEnd(pStrm)); finalize_it: - if(iRet != RS_RET_OK && pCStr != NULL) - rsCStrDestruct(pCStr); return iRet; } @@ -415,7 +397,7 @@ static rsRetVal objDeserializeHeader(objID_t *poID, int* poVers, strm_t *pStrm) NEXTC; if(c != '1') ABORT_FINALIZE(RS_RET_INVALID_HEADER_VERS); NEXTC; if(c != ':') ABORT_FINALIZE(RS_RET_INVALID_HEADER_VERS); - /* object type and version and string length */ + /* object type and version */ CHKiRet(objDeserializeLong(&ioID, pStrm)); CHKiRet(objDeserializeLong(&oVers, pStrm)); @@ -522,9 +504,6 @@ static rsRetVal objDeserializeTrailer(strm_t *pStrm) NEXTC; if(c != 'E') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); NEXTC; if(c != 'n') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); NEXTC; if(c != 'd') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); - NEXTC; if(c != 'O') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); - NEXTC; if(c != 'b') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); - NEXTC; if(c != 'j') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); NEXTC; if(c != '\n') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); NEXTC; if(c != COOKIE_BLANKLINE) ABORT_FINALIZE(RS_RET_INVALID_TRAILER); NEXTC; if(c != '\n') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); diff --git a/obj.h b/obj.h index ab9470ac..c27e228c 100644 --- a/obj.h +++ b/obj.h @@ -59,10 +59,10 @@ free(pThis); \ } -#define objSerializeSCALAR(propName, propType) \ - CHKiRet(objSerializeProp(pCStr, (uchar*) #propName, PROPTYPE_##propType, (void*) &pThis->propName)); -#define objSerializePTR(propName, propType) \ - CHKiRet(objSerializeProp(pCStr, (uchar*) #propName, PROPTYPE_##propType, (void*) pThis->propName)); +#define objSerializeSCALAR(strm, propName, propType) \ + CHKiRet(objSerializeProp(strm, (uchar*) #propName, PROPTYPE_##propType, (void*) &pThis->propName)); +#define objSerializePTR(strm, propName, propType) \ + CHKiRet(objSerializeProp(strm, (uchar*) #propName, PROPTYPE_##propType, (void*) pThis->propName)); #define DEFobjStaticHelpers static objInfo_t *pObjInfoOBJ = NULL; #define objGetName(pThis) (((obj_t*) (pThis))->pObjInfo->pszName) #define objGetObjID(pThis) (((obj_t*) (pThis))->pObjInfo->objID) @@ -78,10 +78,9 @@ /* prototypes */ rsRetVal objInfoConstruct(objInfo_t **ppThis, objID_t objID, uchar *pszName, int iObjVers, rsRetVal (*pConstruct)(void *), rsRetVal (*pDestruct)(void *)); rsRetVal objInfoSetMethod(objInfo_t *pThis, objMethod_t objMethod, rsRetVal (*pHandler)(void*)); -rsRetVal objBeginSerialize(rsCStrObj **ppCStr, obj_t *pObj, size_t iExpectedObjSize); -rsRetVal objSerializePsz(rsCStrObj *pCStr, uchar *psz, size_t len); -rsRetVal objEndSerialize(rsCStrObj **ppCStr, obj_t *pObj); -rsRetVal objSerializeProp(rsCStrObj *pCStr, uchar *pszPropName, propertyType_t propType, void *pUsr); +rsRetVal objBeginSerialize(strm_t *pStrm, obj_t *pObj); +rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t propType, void *pUsr); +rsRetVal objEndSerialize(strm_t *pStrm, obj_t *pObj); rsRetVal objRegisterObj(objID_t oID, objInfo_t *pInfo); rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pSerStore); PROTOTYPEObjClassInit(obj); diff --git a/queue.c b/queue.c index 6ff4358f..a716c725 100644 --- a/queue.c +++ b/queue.c @@ -238,15 +238,16 @@ static rsRetVal qDestructDisk(queue_t *pThis) static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) { DEFiRet; - rsCStrObj *pCStr; assert(pThis != NULL); - //CHKiRet(strmOpenFile(pThis->tVars.disk.pWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes! - + CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite)); + CHKiRet(strmFlush(pThis->tVars.disk.pWrite)); +#if 0 + //rsCStrObj *pCStr; CHKiRet((objSerialize(pUsr))(pUsr, &pCStr)); CHKiRet(strmWrite(pThis->tVars.disk.pWrite, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr))); - CHKiRet(strmFlush(pThis->tVars.disk.pWrite)); +#endif finalize_it: return iRet; diff --git a/stream.c b/stream.c index 1ed46185..6f5e058a 100644 --- a/stream.c +++ b/stream.c @@ -104,6 +104,7 @@ static rsRetVal strmCloseFile(strm_t *pThis) DEFiRet; assert(pThis != NULL); + assert(pThis->fd != -1); dbgprintf("Stream 0x%lx: closing file %d\n", (unsigned long) pThis, pThis->fd); if(pThis->tOperationsMode == STREAMMODE_WRITE) @@ -121,6 +122,7 @@ static rsRetVal strmCloseFile(strm_t *pThis) pThis->pszCurrFName = NULL; } +dbgprintf("exit strmCloseFile, fd: %d\n", pThis->fd); return iRet; } @@ -128,12 +130,15 @@ static rsRetVal strmCloseFile(strm_t *pThis) /* switch to next strm file * This method must only be called if we are in a multi-file mode! */ -rsRetVal strmNextFile(strm_t *pThis) +static rsRetVal +strmNextFile(strm_t *pThis) { DEFiRet; +dbgprintf("strmNextFile, old num %d\n", pThis->iCurrFNum); assert(pThis != NULL); assert(pThis->iMaxFiles != 0); + assert(pThis->fd != -1); CHKiRet(strmCloseFile(pThis)); @@ -161,6 +166,7 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) { DEFiRet; int bRun; + long iLenRead; assert(pThis != NULL); assert(pC != NULL); @@ -179,10 +185,10 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) while(bRun) { /* first check if we need to (re)open the file (we may have switched to a new one!) */ CHKiRet(strmOpenFile(pThis)); - pThis->iBufPtrMax = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize); - dbgprintf("Stream 0x%lx: read %d bytes from file %d\n", (unsigned long) pThis, - pThis->iBufPtrMax, pThis->fd); - if(pThis->iBufPtrMax == 0) { + iLenRead = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize); + dbgprintf("Stream 0x%lx: read %ld bytes from file %d\n", (unsigned long) pThis, + iLenRead, pThis->fd); + if(iLenRead == 0) { if(pThis->iMaxFiles == 0) ABORT_FINALIZE(RS_RET_EOF); else { @@ -191,10 +197,12 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) dbgprintf("Stream 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->fd); CHKiRet(strmNextFile(pThis)); } - } else if(pThis->iBufPtrMax < 0) + } else if(iLenRead < 0) ABORT_FINALIZE(RS_RET_IO_ERROR); - else + else { /* good read */ + pThis->iBufPtrMax = iLenRead; bRun = 0; /* exit loop */ + } } /* if we reach this point, we had a good read */ pThis->iBufPtr = 0; @@ -317,11 +325,16 @@ rsRetVal strmDestruct(strm_t *pThis) /* check if we need to open a new file (in output mode only). * The decision is based on file size AND record delimition state. + * This method may also be called on a closed file, in which case + * it immediately returns. */ static rsRetVal strmCheckNextOutputFile(strm_t *pThis) { DEFiRet; + if(pThis->fd == -1) + FINALIZE; + if(pThis->iCurrOffs >= pThis->iMaxFileSize) { dbgprintf("Stream 0x%lx: max file size %ld reached for %d, now %ld - starting new file\n", (unsigned long) pThis, (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs); @@ -333,15 +346,19 @@ finalize_it: } /* write memory buffer to a stream object. + * To support direct writes of large objects, this method may be called + * with a buffer pointing to some region other than the stream buffer itself. + * However, in that case the stream buffer must be empty (strmFlush() has to + * be called before), because we would otherwise mess up with the sequence + * inside the stream. -- rgerhards, 2008-01-10 */ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; int iWritten; -dbgprintf("strmWriteInternal()\n"); assert(pThis != NULL); - assert(pBuf != NULL); + assert(pBuf == pThis->pIOBuf || pThis->iBufPtr == 0); if(pThis->fd == -1) CHKiRet(strmOpenFile(pThis)); @@ -351,10 +368,21 @@ dbgprintf("strmWriteInternal()\n"); iWritten, errno, strerror(errno)); /* TODO: handle error case -- rgerhards, 2008-01-07 */ + /* Now indicate buffer empty again. We do this in any case, because there + * is no way we could react more intelligently to an error during write. + * This MUST be done BEFORE strCheckNextOutputFile(), otherwise we have an + * endless loop. We reset the buffer pointer also in finalize_it - this is + * necessary if we run into problems. Not resetting it would again cause an + * endless loop. So it is better to loose some data (which also justifies + * duplicating that code, too...) -- rgerhards, 2008-01-10 + */ + pThis->iBufPtr = 0; pThis->iCurrOffs += iWritten; CHKiRet(strmCheckNextOutputFile(pThis)); finalize_it: + pThis->iBufPtr = 0; /* see comment above */ + return iRet; } @@ -366,47 +394,90 @@ rsRetVal strmFlush(strm_t *pThis) { DEFiRet; - dbgprintf("Stream 0x%lx: flush file %d, buflen %d\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr); assert(pThis != NULL); + dbgprintf("Stream 0x%lx: flush file %d, buflen %ld\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr); if(pThis->iBufPtr > 0) { iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); - /* Now indicate buffer empty again. We do this in any case, because there - * is no way we could react more intelligently to an error during write. - * We have not used CHKiRet(), as that would have presented some sequence - * problems, which are not necessary to look at given what we do. - */ - pThis->iBufPtr = 0; } return iRet; } +/* write a *single* character to a stream object -- rgerhards, 2008-01-10 + */ +rsRetVal strmWriteChar(strm_t *pThis, uchar c) +{ + DEFiRet; + + assert(pThis != NULL); + + /* if the buffer is full, we need to flush before we can write */ + if(pThis->iBufPtr == pThis->sIOBufSize) { + CHKiRet(strmFlush(pThis)); + } + /* we now always have space for one character, so we simply copy it */ + *(pThis->pIOBuf + pThis->iBufPtr) = c; + pThis->iBufPtr++; + +finalize_it: + return iRet; +} + + +/* write an integer value (actually a long) to a stream object */ +rsRetVal strmWriteLong(strm_t *pThis, long i) +{ + DEFiRet; + uchar szBuf[32]; + + assert(pThis != NULL); + + CHKiRet(srUtilItoA((char*)szBuf, sizeof(szBuf), i)); + CHKiRet(strmWrite(pThis, szBuf, strlen((char*)szBuf))); + +finalize_it: + return iRet; +} + + /* write memory buffer to a stream object */ rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { DEFiRet; + size_t iPartial; -dbgprintf("strmWrite()\n"); assert(pThis != NULL); assert(pBuf != NULL); - /* if the string does not fit into the current buffer, we flush that buffer and - * then do the write ourselfs. So even if we have data that is of multiple - * buffer lengths, we will write it with a single write operation. - * rgerhards, 2008-01-10 - */ - if(pThis->iBufPtr + lenBuf >= pThis->sIOBufSize) { -dbgprintf("strmWrite() uses direct write\n"); - CHKiRet(strmFlush(pThis)); + /* check if the to-be-written data is larger than our buffer size */ + if(lenBuf >= pThis->sIOBufSize) { + /* it is - so we do a direct write, that is most efficient. + * TODO: is it really? think about disk block sizes! + */ + CHKiRet(strmFlush(pThis)); /* we need to flush first!!! */ CHKiRet(strmWriteInternal(pThis, pBuf, lenBuf)); } else { -dbgprintf("strmWrite() uses buffered write\n"); - /* we have space, so we simply copy over the string */ - memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf); - pThis->iBufPtr += lenBuf; + /* data fits into a buffer - we just need to see if it + * fits into the current buffer... + */ + if(pThis->iBufPtr + lenBuf > pThis->sIOBufSize) { + /* nope, so we must split it */ + iPartial = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ + if(iPartial > 0) { /* the buffer was exactly full, can not write anything! */ + memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, iPartial); + pThis->iBufPtr += iPartial; + } + CHKiRet(strmFlush(pThis)); /* get a new buffer for rest of data */ + memcpy(pThis->pIOBuf, pBuf + iPartial, lenBuf - iPartial); + pThis->iBufPtr = lenBuf - iPartial; + } else { + /* we have space, so we simply copy over the string */ + memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf, lenBuf); + pThis->iBufPtr += lenBuf; + } } finalize_it: @@ -511,6 +582,7 @@ rsRetVal strmRecordBegin(strm_t *pThis) assert(pThis != NULL); assert(pThis->bInRecord == 0); pThis->bInRecord = 1; +dbgprintf("strmRecordBegin set \n"); return RS_RET_OK; } @@ -520,8 +592,10 @@ rsRetVal strmRecordEnd(strm_t *pThis) assert(pThis != NULL); assert(pThis->bInRecord == 1); +dbgprintf("strmRecordEnd in %d\n", iRet); pThis->bInRecord = 0; iRet = strmCheckNextOutputFile(pThis); /* check if we need to switch files */ +dbgprintf("strmRecordEnd out %d\n", iRet); return iRet; } diff --git a/stream.h b/stream.h index 89d27b54..6e901449 100644 --- a/stream.h +++ b/stream.h @@ -62,8 +62,7 @@ typedef enum { typedef struct strm_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ strmType_t sType; - int fd; /* the file descriptor, -1 if closed */ - uchar *pszCurrFName; /* name of current file (if open) */ + /* descriptive properties */ int iCurrFNum;/* current file number (NOT descriptor, but the number in the file name!) */ uchar *pszDir; /* Directory */ int lenDir; @@ -71,18 +70,21 @@ typedef struct strm_s { int lenFilePrefix; strmMode_t tOperationsMode; mode_t tOpenMode; - size_t iCurrOffs;/* current offset */ - uchar *pIOBuf; /* io Buffer */ size_t sIOBufSize;/* size of IO buffer */ - int iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */ - int iBufPtr; /* pointer into current buffer */ - int iUngetC; /* char set via UngetChar() call or -1 if none set */ int iFlagsOpenOS; int iModeOpenOS; size_t iMaxFileSize;/* maximum size a file may grow to */ int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */ int iMaxFiles; /* maximum number of files if a circular mode is in use */ int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */ + /* dynamic properties, valid only during file open, not to be persistet */ + int fd; /* the file descriptor, -1 if closed */ + size_t iCurrOffs;/* current offset */ + uchar *pszCurrFName; /* name of current file (if open) */ + uchar *pIOBuf; /* io Buffer */ + size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */ + size_t iBufPtr; /* pointer into current buffer */ + int iUngetC; /* char set via UngetChar() call or -1 if none set */ int bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */ } strm_t; @@ -95,11 +97,13 @@ rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal strmReadChar(strm_t *pThis, uchar *pC); rsRetVal strmUnreadChar(strm_t *pThis, uchar c); rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); -rsRetVal strmNextFile(strm_t *pThis); -//rsRetVal strmOpenFile(strm_t *pThis, int flags, mode_t mode); +rsRetVal strmWriteChar(strm_t *pThis, uchar c); +rsRetVal strmWriteLong(strm_t *pThis, long i); rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir); rsRetVal strmFlush(strm_t *pThis); +rsRetVal strmRecordBegin(strm_t *pThis); +rsRetVal strmRecordEnd(strm_t *pThis); PROTOTYPEObjClassInit(strm); PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int); PROTOTYPEpropSetMeth(strm, iMaxFileSize, int); -- cgit