diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-13 15:47:41 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-13 15:47:41 +0000 |
commit | abdcea7d8f0eda058a6c6719cf24955878279d7c (patch) | |
tree | bab4f9dc61457f63e9a30516cf91eb8112fac1ef | |
parent | 366060a51de60c717886636d6ef646bf1959972c (diff) | |
download | rsyslog-abdcea7d8f0eda058a6c6719cf24955878279d7c.tar.gz rsyslog-abdcea7d8f0eda058a6c6719cf24955878279d7c.tar.xz rsyslog-abdcea7d8f0eda058a6c6719cf24955878279d7c.zip |
support for reading back persistet queue information completed
-rw-r--r-- | obj-types.h | 1 | ||||
-rw-r--r-- | obj.c | 12 | ||||
-rw-r--r-- | obj.h | 2 | ||||
-rw-r--r-- | queue.c | 253 | ||||
-rw-r--r-- | queue.h | 2 | ||||
-rw-r--r-- | rsyslog.h | 6 | ||||
-rw-r--r-- | stream.c | 64 | ||||
-rw-r--r-- | stream.h | 2 | ||||
-rw-r--r-- | syslogd.c | 1 |
9 files changed, 232 insertions, 111 deletions
diff --git a/obj-types.h b/obj-types.h index e3f33a4c..cd818228 100644 --- a/obj-types.h +++ b/obj-types.h @@ -116,6 +116,7 @@ typedef struct obj { /* the dummy struct that each derived class can be casted t #define DEFpropSetMeth(obj, prop, dataType)\ rsRetVal obj##Set##prop(obj##_t *pThis, dataType pVal)\ { \ + /* DEV debug: dbgprintf("%sSet%s()\n", #obj, #prop); */\ pThis->prop = pVal; \ return RS_RET_OK; \ } @@ -213,7 +213,6 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t prop assert(pszPropName != NULL); /*dbgprintf("objSerializeProp: strm %p, propName '%s', type %d, pUsr %p\n", pStrm, pszPropName, propType, pUsr);*/ - dbgprintf("objSerializeProp: strm %p, propName '%s', type %d, pUsr %p\n", pStrm, pszPropName, propType, pUsr); /* if we have no user pointer, there is no need to write this property. * TODO: think if that's the righ point of view * rgerhards, 2008-01-06 @@ -620,7 +619,7 @@ finalize_it: * The caller must destruct the created object. * rgerhards, 2008-01-07 */ -rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pStrm) +rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pStrm, rsRetVal (*fFixup)(obj_t*,void*), void *pUsr) { DEFiRet; rsRetVal iRetLocal; @@ -654,6 +653,12 @@ rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pStrm) /* we got the object, now we need to fill the properties */ CHKiRet(objDeserializeProperties(pObj, oID, pStrm)); + /* check if we need to call a fixup function that modifies the object + * before it is finalized. -- rgerhards, 2008-01-13 + */ + if(fFixup != NULL) + CHKiRet(fFixup(pObj, pUsr)); + /* we have a valid object, let's finalize our work and return */ if(objInfoIsImplemented(arrObjInfo[oID], objMethod_CONSTRUCTION_FINALIZER)) CHKiRet(arrObjInfo[oID]->objMethods[objMethod_CONSTRUCTION_FINALIZER](pObj)); @@ -661,6 +666,9 @@ rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pStrm) *((obj_t**) ppObj) = pObj; finalize_it: + if(iRet != RS_RET_OK && pObj != NULL) + free(pObj); // TODO: check if we can call destructor 2008-01-13 rger + return iRet; } @@ -92,7 +92,7 @@ rsRetVal objBeginSerialize(strm_t *pStrm, obj_t *pObj); rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t propType, void *pUsr); rsRetVal objEndSerialize(strm_t *pStrm); rsRetVal objRegisterObj(objID_t oID, objInfo_t *pInfo); -rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pSerStore); +rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pStrm, rsRetVal (*fFixup)(obj_t*,void*), void *pUsr); rsRetVal objDeserializePropBag(obj_t *pObj, strm_t *pStrm); PROTOTYPEObjClassInit(obj); @@ -191,6 +191,104 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) /* -------------------- disk -------------------- */ + +/* This method checks if there is any persistent information on the + * queue. + */ +#if 0 +static rsRetVal +queueTryLoadPersistedInfo(queue_t *pThis) +{ + DEFiRet; + strm_t *psQIF = NULL; + uchar pszQIFNam[MAXFNAME]; + size_t lenQIFNam; + AsPropBagstruct stat stat_buf; +} +#endif + + +static rsRetVal +queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t *pThis) +{ + DEFiRet; + ISOBJ_TYPE_assert(pStrm, strm); + ISOBJ_TYPE_assert(pThis, queue); + CHKiRet(strmSetDir(pStrm, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); +finalize_it: + return iRet; +} + + +/* The method loads the persistent queue information. + * rgerhards, 2008-01-11 + */ +static rsRetVal +queueTryLoadPersistedInfo(queue_t *pThis) +{ + DEFiRet; + strm_t *psQIF = NULL; + uchar pszQIFNam[MAXFNAME]; + size_t lenQIFNam; + struct stat stat_buf; + + ISOBJ_TYPE_assert(pThis, queue); + + /* Construct file name */ + lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", + (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); + + /* check if the file exists */ +dbgprintf("stat '%s'\n", pszQIFNam); + if(stat((char*) pszQIFNam, &stat_buf) == -1) { + if(errno == ENOENT) { + dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis)); + ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); + } else { + dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + } + + /* If we reach this point, we have a .qi file */ + + CHKiRet(strmConstruct(&psQIF)); + CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); + CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_READ)); + CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE)); + CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strmConstructFinalize(psQIF)); + + /* first, we try to read the property bag for ourselfs */ + CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF)); + + /* and now the stream objects (some order as when persisted!) */ + CHKiRet(objDeserialize(&pThis->tVars.disk.pWrite, OBJstrm, psQIF, + (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis)); + CHKiRet(objDeserialize(&pThis->tVars.disk.pRead, OBJstrm, psQIF, + (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis)); + + CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite)); + CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead)); + + /* OK, we could successfully read the file, so we now can request that it be + * deleted when we are done with the persisted information. + */ + pThis->bNeedDelQIF = 1; + +finalize_it: + if(psQIF != NULL) + strmDestruct(psQIF); + + if(iRet != RS_RET_OK) { + dbgprintf("Queue 0x%lx: error %d reading .qi file - can not start queue\n", + queueGetID(pThis), iRet); + } + + return iRet; +} + + /* disk queue constructor. * Note that we use a file limit of 10,000,000 files. That number should never pose a * problem. If so, I guess the user has a design issue... But of course, the code can @@ -201,23 +299,48 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) static rsRetVal qConstructDisk(queue_t *pThis) { DEFiRet; + int bRestarted = 0; assert(pThis != NULL); - CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite)); - CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); - CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); - CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); - CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); - CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite)); - - CHKiRet(strmConstruct(&pThis->tVars.disk.pRead)); - CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); - CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); - CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000)); - CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ)); - CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR)); - CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead)); + /* and now check if there is some persistent information that needs to be read in */ + iRet = queueTryLoadPersistedInfo(pThis); + if(iRet == RS_RET_OK) + bRestarted = 1; + else if(iRet != RS_RET_FILE_NOT_FOUND) + FINALIZE; + +dbgprintf("qConstructDisk: bRestarted %d, iRet %d\n", bRestarted, iRet); + if(bRestarted == 1) { + ; + } else { + CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite)); + CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); + CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); + CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); + CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); + CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite)); + + CHKiRet(strmConstruct(&pThis->tVars.disk.pRead)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); + CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); + CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000)); + CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ)); + CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR)); + CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead)); + + + CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pThis->pszFilePrefix, pThis->lenFilePrefix)); + } + + /* now we set (and overwrite in case of a persisted restart) some parameters which + * should always reflect the current configuration variables. Be careful by doing so, + * for example file name generation must not be changed as that would break the + * ability to read existing queue files. -- rgerhards, 2008-01-12 + */ +CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize)); +CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize)); finalize_it: return iRet; @@ -254,7 +377,7 @@ finalize_it: static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) { - return objDeserialize(ppUsr, OBJMsg, pThis->tVars.disk.pRead); + return objDeserialize(ppUsr, OBJMsg, pThis->tVars.disk.pRead, NULL, NULL); } /* -------------------- direct (no queueing) -------------------- */ @@ -480,67 +603,6 @@ queueWorker(void *arg) } -/* This method checks if there is any persistent information on the - * queue and, if so, tries to load it. This method can only legally be - * called from the destructor (I moved it out from there to keep the - * Constructor code somewhat smaller). -- rgerhards, 2008-01-11 - */ -static rsRetVal -queueTryLoadPersistedInfo(queue_t *pThis) -{ - DEFiRet; - strm_t *psQIF = NULL; - uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; - struct stat stat_buf; - - ISOBJ_TYPE_assert(pThis, queue); - - /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); - - /* check if the file exists */ -dbgprintf("stat '%s'\n", pszQIFNam); - if(stat((char*) pszQIFNam, &stat_buf) == -1) { - if(errno == ENOENT) { - dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis)); - ABORT_FINALIZE(RS_RET_OK); - } else { - dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno); - ABORT_FINALIZE(RS_RET_IO_ERROR); - } - } - - /* If we reach this point, we have a .qi file */ - - CHKiRet(strmConstruct(&psQIF)); - CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); - CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_READ)); - CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam)); - CHKiRet(strmConstructFinalize(psQIF)); - - /* first, we try to read the property bag for ourselfs */ - CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF)); - - /* and now the stream objects (some order as when persisted!) */ - CHKiRet(objDeserializeObjAsPropBag(pThis->tVars.disk.pWrite, psQIF)); - CHKiRet(objDeserializeObjAsPropBag(pThis->tVars.disk.pRead, psQIF)); - -finalize_it: - if(psQIF != NULL) - strmDestruct(psQIF); - - if(iRet != RS_RET_OK) { - dbgprintf("Queue 0x%lx: error %d reading .qi file - can not start queue\n", - queueGetID(pThis), iRet); - } - - return iRet; -} - - /* Constructor for the queue object * This constructs the data structure, but does not yet start the queue. That * is done by queueStart(). The reason is that we want to give the caller a chance @@ -611,9 +673,6 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, break; } - /* call type-specific constructor */ - CHKiRet(pThis->qConstruct(pThis)); - finalize_it: OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP return iRet; @@ -631,8 +690,8 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ assert(pThis != NULL); - /* and now check if there is some persistent information that needs to be read in */ - CHKiRet(queueTryLoadPersistedInfo(pThis)); + /* call type-specific constructor */ + CHKiRet(pThis->qConstruct(pThis)); dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", (unsigned long) pThis, pThis->qType, pThis->iMaxFileSize); @@ -681,15 +740,12 @@ finalize_it: static rsRetVal queuePersist(queue_t *pThis) { DEFiRet; - strm_t *psQIF; /* Queue Info File */ + strm_t *psQIF = NULL;; /* Queue Info File */ uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; int i; assert(pThis != NULL); - if(pThis->iQueueSize == 0) - FINALIZE; /* nothing left to do, so be happy */ - if(pThis->qType != QUEUETYPE_DISK) ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */ @@ -697,6 +753,14 @@ static rsRetVal queuePersist(queue_t *pThis) /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); + if(pThis->iQueueSize == 0) { + if(pThis->bNeedDelQIF) { + unlink((char*)pszQIFNam); + pThis->bNeedDelQIF = 0; + } + FINALIZE; /* nothing left to do, so be happy */ + } + CHKiRet(strmConstruct(&psQIF)); CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE)); @@ -715,23 +779,20 @@ static rsRetVal queuePersist(queue_t *pThis) objSerializeSCALAR_VAR(psQIF, qType, INT, i); objSerializeSCALAR(psQIF, iQueueSize, INT); CHKiRet(objEndSerialize(psQIF)); -dbgprintf("queue serial 1\n"); /* this is disk specific and must be moved to a function */ CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF)); CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF)); -dbgprintf("queue serial 2\n"); /* persist queue object itself */ - /* ready with the queue, now call driver to persist queue data */ - //iRet = ; - - /* the the input file object that it must not delete the file on close */ + /* tell the input file object that it must not delete the file on close */ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0)); finalize_it: - strmDestruct(psQIF); + if(psQIF != NULL) + strmDestruct(psQIF); + return iRet; } @@ -794,11 +855,7 @@ queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); pThis->lenFilePrefix = iLenPrefix; - - if(pThis->qType == QUEUETYPE_DISK) { - CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix)); - CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix)); - } + finalize_it: return iRet; } @@ -818,8 +875,6 @@ queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize) } pThis->iMaxFileSize = iMaxFileSize; - if(pThis->qType == QUEUETYPE_DISK) - CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, iMaxFileSize)); finalize_it: return iRet; @@ -81,6 +81,8 @@ typedef struct queue_s { int iNumWorkerThreads;/* number of worker threads to use */ qWrkThrd_t *pWrkThrds;/* array with control structure for the worker thread(s) associated with this queue */ int bImmediateShutdown;/* on shutdown, drain the queue --> 0 / do NOT drain the queue --> 1 */ + //int bNeedPersist; /* does the queue need to be persisted on disk (updated since last persist?) */ + int bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */ rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); @@ -125,7 +125,11 @@ typedef enum rsRetVal_ rsRetVal; /**< friendly type for global return value */ #define CHKiRet_Hdlr(code) if((iRet = code) != RS_RET_OK) /* macro below is used in conjunction with CHKiRet_Hdlr, else use ABORT_FINALIZE */ #define FINALIZE goto finalize_it; -#define DEFiRet rsRetVal iRet = RS_RET_OK +#if 0 /* DEV debug: set to 1 to get a rough call trace -- rgerhards, 2008-01-13 */ +# define DEFiRet dbgprintf("Entering %s, line %d\n", __FILE__, __LINE__); rsRetVal iRet = RS_RET_OK +#else +# define DEFiRet rsRetVal iRet = RS_RET_OK +#endif #define ABORT_FINALIZE(errCode) \ do { \ iRet = errCode; \ @@ -70,6 +70,7 @@ static rsRetVal strmOpenFile(strm_t *pThis) if(pThis->fd != -1) ABORT_FINALIZE(RS_RET_OK); +dbgprintf("strmOpenFile actual open %p, iFileNumDigits: %d\n", pThis, pThis->iFileNumDigits); if(pThis->pszFName == NULL) ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); @@ -85,12 +86,13 @@ static rsRetVal strmOpenFile(strm_t *pThis) if(pThis->tOperationsMode == STREAMMODE_READ) iFlags = O_RDONLY; else - iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND; + //iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND; + iFlags = O_WRONLY | O_CREAT; pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); if(pThis->fd == -1) { int ierrnoSave = errno; - dbgprintf("Stream 0x%lx: open error %d\n", (unsigned long) pThis, errno); + dbgprintf("Stream 0x%lx: open error %d, file '%s'\n", (unsigned long) pThis, errno, pThis->pszCurrFName); if(ierrnoSave == ENOENT) ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); else @@ -185,6 +187,7 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) /* DEV debug only: dbgprintf("strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */ if(pThis->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */ *pC = pThis->iUngetC; + ++pThis->iCurrOffs; /* one more octet read */ pThis->iUngetC = -1; ABORT_FINALIZE(RS_RET_OK); } @@ -220,6 +223,8 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) } *pC = pThis->pIOBuf[pThis->iBufPtr++]; + ++pThis->iCurrOffs; /* one more octet read */ +//dbgprintf("ReadChar: read %c, offset %d\n", *pC, pThis->iCurrOffs); finalize_it: return iRet; @@ -235,6 +240,10 @@ rsRetVal strmUnreadChar(strm_t *pThis, uchar c) assert(pThis != NULL); assert(pThis->iUngetC == -1); pThis->iUngetC = c; + --pThis->iCurrOffs; /* one less octet read - NOTE: this can cause problems if we got a file change + and immediately do an unread and the file is on a buffer boundary and the stream is then persisted. + With the queue, this can not happen as an Unread is only done on record begin, which is never split + accross files. For other cases we accept the very remote risk. -- rgerhards, 2008-01-12 */ return RS_RET_OK; } @@ -402,6 +411,7 @@ finalize_it: return iRet; } + /* flush stream output buffer to persistent storage. This can be called at any time * and is automatically called when the output buffer is full. * rgerhards, 2008-01-10 @@ -413,7 +423,7 @@ rsRetVal strmFlush(strm_t *pThis) assert(pThis != NULL); dbgprintf("Stream 0x%lx: flush file %d, buflen %ld\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr); - if(pThis->iBufPtr > 0) { + if(pThis->tOperationsMode == STREAMMODE_WRITE && pThis->iBufPtr > 0) { iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); } @@ -421,6 +431,45 @@ rsRetVal strmFlush(strm_t *pThis) } +/* seek a stream to a specific location. Pending writes are flushed, read data + * is invalidated. + * rgerhards, 2008-01-12 + */ +static rsRetVal strmSeek(strm_t *pThis, off_t offs) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + + if(pThis->fd == -1) + strmOpenFile(pThis); + else + strmFlush(pThis); + int i; + dbgprintf("Stream 0x%lx: seek file %d, pos %ld\n", (unsigned long) pThis, pThis->fd, offs); + i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error! +dbgprintf("seek(%d, %ld): %d\n", pThis->fd, offs, i); + pThis->iCurrOffs = offs; /* we are now at *this* offset */ + pThis->iBufPtr = 0; /* buffer invalidated */ + + return iRet; +} + + +/* seek to current offset. This is primarily a helper to readjust the OS file + * pointer after a strm object has been deserialized. + */ +rsRetVal strmSeekCurrOffs(strm_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + + iRet = strmSeek(pThis, pThis->iCurrOffs); + return iRet; +} + + /* write a *single* character to a stream object -- rgerhards, 2008-01-10 */ rsRetVal strmWriteChar(strm_t *pThis, uchar c) @@ -514,6 +563,7 @@ rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) { pThis->iMaxFiles = iNewVal; pThis->iFileNumDigits = getNumberDigits(iNewVal); +dbgprintf("strmSetiMaxFiles %p val %d, digits %d\n", pThis, iNewVal, pThis->iFileNumDigits); return RS_RET_OK; } @@ -633,14 +683,12 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) ISOBJ_TYPE_assert(pThis, strm); ISOBJ_TYPE_assert(pStrm, strm); -dbgprintf("strmSerialize 1\n"); + strmFlush(pThis); CHKiRet(objBeginSerialize(pStrm, (obj_t*) pThis)); -dbgprintf("strmSerialize 2\n"); objSerializeSCALAR(pStrm, iCurrFNum, INT); objSerializePTR(pStrm, pszFName, PSZ); objSerializeSCALAR(pStrm, iMaxFiles, INT); - objSerializeSCALAR(pStrm, iFileNumDigits, INT); objSerializeSCALAR(pStrm, bDeleteOnClose, INT); i = pThis->sType; @@ -662,11 +710,11 @@ dbgprintf("strmSerialize 2\n"); CHKiRet(objEndSerialize(pStrm)); finalize_it: -dbgprintf("strmSerialize out %d\n", iRet); return iRet; } +#include "stringbuf.h" /* This function can be used as a generic way to set properties. * rgerhards, 2008-01-11 @@ -698,7 +746,7 @@ rsRetVal strmSetProperty(strm_t *pThis, property_t *pProp) } else if(isProp("iFileNumDigits")) { CHKiRet(strmSetiFileNumDigits(pThis, pProp->val.vInt)); } else if(isProp("bDeleteOnClose")) { - CHKiRet(strmSetiFileNumDigits(pThis, pProp->val.vInt)); + CHKiRet(strmSetbDeleteOnClose(pThis, pProp->val.vInt)); } finalize_it: @@ -95,6 +95,8 @@ rsRetVal strmSetMaxFileSize(strm_t *pThis, size_t iMaxFileSize); rsRetVal strmSetFileName(strm_t *pThis, uchar *pszName, size_t iLenName); rsRetVal strmReadChar(strm_t *pThis, uchar *pC); rsRetVal strmUnreadChar(strm_t *pThis, uchar c); +//rsRetVal strmSeek(strm_t *pThis, off_t offs); +rsRetVal strmSeekCurrOffs(strm_t *pThis); rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); rsRetVal strmWriteChar(strm_t *pThis, uchar c); rsRetVal strmWriteLong(strm_t *pThis, long i); @@ -3403,6 +3403,7 @@ init(void) # undef setQPROPstr /* ... and finally start the queue! */ +Initialized = 1; CHKiRet_Hdlr(queueStart(pMsgQueue)) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not start message queue - rsyslogd can not run!\n", iRet); |