summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-13 15:47:41 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-13 15:47:41 +0000
commitabdcea7d8f0eda058a6c6719cf24955878279d7c (patch)
treebab4f9dc61457f63e9a30516cf91eb8112fac1ef
parent366060a51de60c717886636d6ef646bf1959972c (diff)
downloadrsyslog-abdcea7d8f0eda058a6c6719cf24955878279d7c.tar.gz
rsyslog-abdcea7d8f0eda058a6c6719cf24955878279d7c.tar.xz
rsyslog-abdcea7d8f0eda058a6c6719cf24955878279d7c.zip
support for reading back persistet queue information completed
-rw-r--r--obj-types.h1
-rw-r--r--obj.c12
-rw-r--r--obj.h2
-rw-r--r--queue.c253
-rw-r--r--queue.h2
-rw-r--r--rsyslog.h6
-rw-r--r--stream.c64
-rw-r--r--stream.h2
-rw-r--r--syslogd.c1
9 files changed, 232 insertions, 111 deletions
diff --git a/obj-types.h b/obj-types.h
index e3f33a4c..cd818228 100644
--- a/obj-types.h
+++ b/obj-types.h
@@ -116,6 +116,7 @@ typedef struct obj { /* the dummy struct that each derived class can be casted t
#define DEFpropSetMeth(obj, prop, dataType)\
rsRetVal obj##Set##prop(obj##_t *pThis, dataType pVal)\
{ \
+ /* DEV debug: dbgprintf("%sSet%s()\n", #obj, #prop); */\
pThis->prop = pVal; \
return RS_RET_OK; \
}
diff --git a/obj.c b/obj.c
index 3d4acb06..3a9098a0 100644
--- a/obj.c
+++ b/obj.c
@@ -213,7 +213,6 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t prop
assert(pszPropName != NULL);
/*dbgprintf("objSerializeProp: strm %p, propName '%s', type %d, pUsr %p\n", pStrm, pszPropName, propType, pUsr);*/
- dbgprintf("objSerializeProp: strm %p, propName '%s', type %d, pUsr %p\n", pStrm, pszPropName, propType, pUsr);
/* if we have no user pointer, there is no need to write this property.
* TODO: think if that's the righ point of view
* rgerhards, 2008-01-06
@@ -620,7 +619,7 @@ finalize_it:
* The caller must destruct the created object.
* rgerhards, 2008-01-07
*/
-rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pStrm)
+rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pStrm, rsRetVal (*fFixup)(obj_t*,void*), void *pUsr)
{
DEFiRet;
rsRetVal iRetLocal;
@@ -654,6 +653,12 @@ rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pStrm)
/* we got the object, now we need to fill the properties */
CHKiRet(objDeserializeProperties(pObj, oID, pStrm));
+ /* check if we need to call a fixup function that modifies the object
+ * before it is finalized. -- rgerhards, 2008-01-13
+ */
+ if(fFixup != NULL)
+ CHKiRet(fFixup(pObj, pUsr));
+
/* we have a valid object, let's finalize our work and return */
if(objInfoIsImplemented(arrObjInfo[oID], objMethod_CONSTRUCTION_FINALIZER))
CHKiRet(arrObjInfo[oID]->objMethods[objMethod_CONSTRUCTION_FINALIZER](pObj));
@@ -661,6 +666,9 @@ rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pStrm)
*((obj_t**) ppObj) = pObj;
finalize_it:
+ if(iRet != RS_RET_OK && pObj != NULL)
+ free(pObj); // TODO: check if we can call destructor 2008-01-13 rger
+
return iRet;
}
diff --git a/obj.h b/obj.h
index 776d2030..acdbd24e 100644
--- a/obj.h
+++ b/obj.h
@@ -92,7 +92,7 @@ rsRetVal objBeginSerialize(strm_t *pStrm, obj_t *pObj);
rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t propType, void *pUsr);
rsRetVal objEndSerialize(strm_t *pStrm);
rsRetVal objRegisterObj(objID_t oID, objInfo_t *pInfo);
-rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pSerStore);
+rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pStrm, rsRetVal (*fFixup)(obj_t*,void*), void *pUsr);
rsRetVal objDeserializePropBag(obj_t *pObj, strm_t *pStrm);
PROTOTYPEObjClassInit(obj);
diff --git a/queue.c b/queue.c
index b31173d9..d9a3f130 100644
--- a/queue.c
+++ b/queue.c
@@ -191,6 +191,104 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
/* -------------------- disk -------------------- */
+
+/* This method checks if there is any persistent information on the
+ * queue.
+ */
+#if 0
+static rsRetVal
+queueTryLoadPersistedInfo(queue_t *pThis)
+{
+ DEFiRet;
+ strm_t *psQIF = NULL;
+ uchar pszQIFNam[MAXFNAME];
+ size_t lenQIFNam;
+ AsPropBagstruct stat stat_buf;
+}
+#endif
+
+
+static rsRetVal
+queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t *pThis)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pStrm, strm);
+ ISOBJ_TYPE_assert(pThis, queue);
+ CHKiRet(strmSetDir(pStrm, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
+finalize_it:
+ return iRet;
+}
+
+
+/* The method loads the persistent queue information.
+ * rgerhards, 2008-01-11
+ */
+static rsRetVal
+queueTryLoadPersistedInfo(queue_t *pThis)
+{
+ DEFiRet;
+ strm_t *psQIF = NULL;
+ uchar pszQIFNam[MAXFNAME];
+ size_t lenQIFNam;
+ struct stat stat_buf;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+ /* Construct file name */
+ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
+ (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
+
+ /* check if the file exists */
+dbgprintf("stat '%s'\n", pszQIFNam);
+ if(stat((char*) pszQIFNam, &stat_buf) == -1) {
+ if(errno == ENOENT) {
+ dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis));
+ ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
+ } else {
+ dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno);
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ }
+ }
+
+ /* If we reach this point, we have a .qi file */
+
+ CHKiRet(strmConstruct(&psQIF));
+ CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
+ CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_READ));
+ CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE));
+ CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam));
+ CHKiRet(strmConstructFinalize(psQIF));
+
+ /* first, we try to read the property bag for ourselfs */
+ CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF));
+
+ /* and now the stream objects (some order as when persisted!) */
+ CHKiRet(objDeserialize(&pThis->tVars.disk.pWrite, OBJstrm, psQIF,
+ (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
+ CHKiRet(objDeserialize(&pThis->tVars.disk.pRead, OBJstrm, psQIF,
+ (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
+
+ CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
+ CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead));
+
+ /* OK, we could successfully read the file, so we now can request that it be
+ * deleted when we are done with the persisted information.
+ */
+ pThis->bNeedDelQIF = 1;
+
+finalize_it:
+ if(psQIF != NULL)
+ strmDestruct(psQIF);
+
+ if(iRet != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: error %d reading .qi file - can not start queue\n",
+ queueGetID(pThis), iRet);
+ }
+
+ return iRet;
+}
+
+
/* disk queue constructor.
* Note that we use a file limit of 10,000,000 files. That number should never pose a
* problem. If so, I guess the user has a design issue... But of course, the code can
@@ -201,23 +299,48 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
static rsRetVal qConstructDisk(queue_t *pThis)
{
DEFiRet;
+ int bRestarted = 0;
assert(pThis != NULL);
- CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite));
- CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
- CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
- CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
- CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
- CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite));
-
- CHKiRet(strmConstruct(&pThis->tVars.disk.pRead));
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
- CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
- CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000));
- CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ));
- CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR));
- CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead));
+ /* and now check if there is some persistent information that needs to be read in */
+ iRet = queueTryLoadPersistedInfo(pThis);
+ if(iRet == RS_RET_OK)
+ bRestarted = 1;
+ else if(iRet != RS_RET_FILE_NOT_FOUND)
+ FINALIZE;
+
+dbgprintf("qConstructDisk: bRestarted %d, iRet %d\n", bRestarted, iRet);
+ if(bRestarted == 1) {
+ ;
+ } else {
+ CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite));
+ CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
+ CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
+ CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
+ CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite));
+
+ CHKiRet(strmConstruct(&pThis->tVars.disk.pRead));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
+ CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
+ CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000));
+ CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ));
+ CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead));
+
+
+ CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ }
+
+ /* now we set (and overwrite in case of a persisted restart) some parameters which
+ * should always reflect the current configuration variables. Be careful by doing so,
+ * for example file name generation must not be changed as that would break the
+ * ability to read existing queue files. -- rgerhards, 2008-01-12
+ */
+CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize));
+CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize));
finalize_it:
return iRet;
@@ -254,7 +377,7 @@ finalize_it:
static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
{
- return objDeserialize(ppUsr, OBJMsg, pThis->tVars.disk.pRead);
+ return objDeserialize(ppUsr, OBJMsg, pThis->tVars.disk.pRead, NULL, NULL);
}
/* -------------------- direct (no queueing) -------------------- */
@@ -480,67 +603,6 @@ queueWorker(void *arg)
}
-/* This method checks if there is any persistent information on the
- * queue and, if so, tries to load it. This method can only legally be
- * called from the destructor (I moved it out from there to keep the
- * Constructor code somewhat smaller). -- rgerhards, 2008-01-11
- */
-static rsRetVal
-queueTryLoadPersistedInfo(queue_t *pThis)
-{
- DEFiRet;
- strm_t *psQIF = NULL;
- uchar pszQIFNam[MAXFNAME];
- size_t lenQIFNam;
- struct stat stat_buf;
-
- ISOBJ_TYPE_assert(pThis, queue);
-
- /* Construct file name */
- lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
- (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
-
- /* check if the file exists */
-dbgprintf("stat '%s'\n", pszQIFNam);
- if(stat((char*) pszQIFNam, &stat_buf) == -1) {
- if(errno == ENOENT) {
- dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis));
- ABORT_FINALIZE(RS_RET_OK);
- } else {
- dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno);
- ABORT_FINALIZE(RS_RET_IO_ERROR);
- }
- }
-
- /* If we reach this point, we have a .qi file */
-
- CHKiRet(strmConstruct(&psQIF));
- CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
- CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_READ));
- CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE));
- CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam));
- CHKiRet(strmConstructFinalize(psQIF));
-
- /* first, we try to read the property bag for ourselfs */
- CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF));
-
- /* and now the stream objects (some order as when persisted!) */
- CHKiRet(objDeserializeObjAsPropBag(pThis->tVars.disk.pWrite, psQIF));
- CHKiRet(objDeserializeObjAsPropBag(pThis->tVars.disk.pRead, psQIF));
-
-finalize_it:
- if(psQIF != NULL)
- strmDestruct(psQIF);
-
- if(iRet != RS_RET_OK) {
- dbgprintf("Queue 0x%lx: error %d reading .qi file - can not start queue\n",
- queueGetID(pThis), iRet);
- }
-
- return iRet;
-}
-
-
/* Constructor for the queue object
* This constructs the data structure, but does not yet start the queue. That
* is done by queueStart(). The reason is that we want to give the caller a chance
@@ -611,9 +673,6 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
break;
}
- /* call type-specific constructor */
- CHKiRet(pThis->qConstruct(pThis));
-
finalize_it:
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
return iRet;
@@ -631,8 +690,8 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
assert(pThis != NULL);
- /* and now check if there is some persistent information that needs to be read in */
- CHKiRet(queueTryLoadPersistedInfo(pThis));
+ /* call type-specific constructor */
+ CHKiRet(pThis->qConstruct(pThis));
dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", (unsigned long) pThis, pThis->qType,
pThis->iMaxFileSize);
@@ -681,15 +740,12 @@ finalize_it:
static rsRetVal queuePersist(queue_t *pThis)
{
DEFiRet;
- strm_t *psQIF; /* Queue Info File */
+ strm_t *psQIF = NULL;; /* Queue Info File */
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
int i;
assert(pThis != NULL);
- if(pThis->iQueueSize == 0)
- FINALIZE; /* nothing left to do, so be happy */
-
if(pThis->qType != QUEUETYPE_DISK)
ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */
@@ -697,6 +753,14 @@ static rsRetVal queuePersist(queue_t *pThis)
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
(char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
+ if(pThis->iQueueSize == 0) {
+ if(pThis->bNeedDelQIF) {
+ unlink((char*)pszQIFNam);
+ pThis->bNeedDelQIF = 0;
+ }
+ FINALIZE; /* nothing left to do, so be happy */
+ }
+
CHKiRet(strmConstruct(&psQIF));
CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE));
@@ -715,23 +779,20 @@ static rsRetVal queuePersist(queue_t *pThis)
objSerializeSCALAR_VAR(psQIF, qType, INT, i);
objSerializeSCALAR(psQIF, iQueueSize, INT);
CHKiRet(objEndSerialize(psQIF));
-dbgprintf("queue serial 1\n");
/* this is disk specific and must be moved to a function */
CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF));
-dbgprintf("queue serial 2\n");
/* persist queue object itself */
- /* ready with the queue, now call driver to persist queue data */
- //iRet = ;
-
- /* the the input file object that it must not delete the file on close */
+ /* tell the input file object that it must not delete the file on close */
CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
finalize_it:
- strmDestruct(psQIF);
+ if(psQIF != NULL)
+ strmDestruct(psQIF);
+
return iRet;
}
@@ -794,11 +855,7 @@ queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1);
pThis->lenFilePrefix = iLenPrefix;
-
- if(pThis->qType == QUEUETYPE_DISK) {
- CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix));
- CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix));
- }
+
finalize_it:
return iRet;
}
@@ -818,8 +875,6 @@ queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize)
}
pThis->iMaxFileSize = iMaxFileSize;
- if(pThis->qType == QUEUETYPE_DISK)
- CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, iMaxFileSize));
finalize_it:
return iRet;
diff --git a/queue.h b/queue.h
index 0fda1e50..6593ce84 100644
--- a/queue.h
+++ b/queue.h
@@ -81,6 +81,8 @@ typedef struct queue_s {
int iNumWorkerThreads;/* number of worker threads to use */
qWrkThrd_t *pWrkThrds;/* array with control structure for the worker thread(s) associated with this queue */
int bImmediateShutdown;/* on shutdown, drain the queue --> 0 / do NOT drain the queue --> 1 */
+ //int bNeedPersist; /* does the queue need to be persisted on disk (updated since last persist?) */
+ int bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */
rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
diff --git a/rsyslog.h b/rsyslog.h
index 2937db88..1514cc3e 100644
--- a/rsyslog.h
+++ b/rsyslog.h
@@ -125,7 +125,11 @@ typedef enum rsRetVal_ rsRetVal; /**< friendly type for global return value */
#define CHKiRet_Hdlr(code) if((iRet = code) != RS_RET_OK)
/* macro below is used in conjunction with CHKiRet_Hdlr, else use ABORT_FINALIZE */
#define FINALIZE goto finalize_it;
-#define DEFiRet rsRetVal iRet = RS_RET_OK
+#if 0 /* DEV debug: set to 1 to get a rough call trace -- rgerhards, 2008-01-13 */
+# define DEFiRet dbgprintf("Entering %s, line %d\n", __FILE__, __LINE__); rsRetVal iRet = RS_RET_OK
+#else
+# define DEFiRet rsRetVal iRet = RS_RET_OK
+#endif
#define ABORT_FINALIZE(errCode) \
do { \
iRet = errCode; \
diff --git a/stream.c b/stream.c
index 59067041..eb9fb960 100644
--- a/stream.c
+++ b/stream.c
@@ -70,6 +70,7 @@ static rsRetVal strmOpenFile(strm_t *pThis)
if(pThis->fd != -1)
ABORT_FINALIZE(RS_RET_OK);
+dbgprintf("strmOpenFile actual open %p, iFileNumDigits: %d\n", pThis, pThis->iFileNumDigits);
if(pThis->pszFName == NULL)
ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
@@ -85,12 +86,13 @@ static rsRetVal strmOpenFile(strm_t *pThis)
if(pThis->tOperationsMode == STREAMMODE_READ)
iFlags = O_RDONLY;
else
- iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND;
+ //iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND;
+ iFlags = O_WRONLY | O_CREAT;
pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode);
if(pThis->fd == -1) {
int ierrnoSave = errno;
- dbgprintf("Stream 0x%lx: open error %d\n", (unsigned long) pThis, errno);
+ dbgprintf("Stream 0x%lx: open error %d, file '%s'\n", (unsigned long) pThis, errno, pThis->pszCurrFName);
if(ierrnoSave == ENOENT)
ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
else
@@ -185,6 +187,7 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
/* DEV debug only: dbgprintf("strmRead index %d, max %d\n", pThis->iBufPtr, pThis->iBufPtrMax); */
if(pThis->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */
*pC = pThis->iUngetC;
+ ++pThis->iCurrOffs; /* one more octet read */
pThis->iUngetC = -1;
ABORT_FINALIZE(RS_RET_OK);
}
@@ -220,6 +223,8 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
}
*pC = pThis->pIOBuf[pThis->iBufPtr++];
+ ++pThis->iCurrOffs; /* one more octet read */
+//dbgprintf("ReadChar: read %c, offset %d\n", *pC, pThis->iCurrOffs);
finalize_it:
return iRet;
@@ -235,6 +240,10 @@ rsRetVal strmUnreadChar(strm_t *pThis, uchar c)
assert(pThis != NULL);
assert(pThis->iUngetC == -1);
pThis->iUngetC = c;
+ --pThis->iCurrOffs; /* one less octet read - NOTE: this can cause problems if we got a file change
+ and immediately do an unread and the file is on a buffer boundary and the stream is then persisted.
+ With the queue, this can not happen as an Unread is only done on record begin, which is never split
+ accross files. For other cases we accept the very remote risk. -- rgerhards, 2008-01-12 */
return RS_RET_OK;
}
@@ -402,6 +411,7 @@ finalize_it:
return iRet;
}
+
/* flush stream output buffer to persistent storage. This can be called at any time
* and is automatically called when the output buffer is full.
* rgerhards, 2008-01-10
@@ -413,7 +423,7 @@ rsRetVal strmFlush(strm_t *pThis)
assert(pThis != NULL);
dbgprintf("Stream 0x%lx: flush file %d, buflen %ld\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr);
- if(pThis->iBufPtr > 0) {
+ if(pThis->tOperationsMode == STREAMMODE_WRITE && pThis->iBufPtr > 0) {
iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr);
}
@@ -421,6 +431,45 @@ rsRetVal strmFlush(strm_t *pThis)
}
+/* seek a stream to a specific location. Pending writes are flushed, read data
+ * is invalidated.
+ * rgerhards, 2008-01-12
+ */
+static rsRetVal strmSeek(strm_t *pThis, off_t offs)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ if(pThis->fd == -1)
+ strmOpenFile(pThis);
+ else
+ strmFlush(pThis);
+ int i;
+ dbgprintf("Stream 0x%lx: seek file %d, pos %ld\n", (unsigned long) pThis, pThis->fd, offs);
+ i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error!
+dbgprintf("seek(%d, %ld): %d\n", pThis->fd, offs, i);
+ pThis->iCurrOffs = offs; /* we are now at *this* offset */
+ pThis->iBufPtr = 0; /* buffer invalidated */
+
+ return iRet;
+}
+
+
+/* seek to current offset. This is primarily a helper to readjust the OS file
+ * pointer after a strm object has been deserialized.
+ */
+rsRetVal strmSeekCurrOffs(strm_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ iRet = strmSeek(pThis, pThis->iCurrOffs);
+ return iRet;
+}
+
+
/* write a *single* character to a stream object -- rgerhards, 2008-01-10
*/
rsRetVal strmWriteChar(strm_t *pThis, uchar c)
@@ -514,6 +563,7 @@ rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal)
{
pThis->iMaxFiles = iNewVal;
pThis->iFileNumDigits = getNumberDigits(iNewVal);
+dbgprintf("strmSetiMaxFiles %p val %d, digits %d\n", pThis, iNewVal, pThis->iFileNumDigits);
return RS_RET_OK;
}
@@ -633,14 +683,12 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
ISOBJ_TYPE_assert(pThis, strm);
ISOBJ_TYPE_assert(pStrm, strm);
-dbgprintf("strmSerialize 1\n");
+ strmFlush(pThis);
CHKiRet(objBeginSerialize(pStrm, (obj_t*) pThis));
-dbgprintf("strmSerialize 2\n");
objSerializeSCALAR(pStrm, iCurrFNum, INT);
objSerializePTR(pStrm, pszFName, PSZ);
objSerializeSCALAR(pStrm, iMaxFiles, INT);
- objSerializeSCALAR(pStrm, iFileNumDigits, INT);
objSerializeSCALAR(pStrm, bDeleteOnClose, INT);
i = pThis->sType;
@@ -662,11 +710,11 @@ dbgprintf("strmSerialize 2\n");
CHKiRet(objEndSerialize(pStrm));
finalize_it:
-dbgprintf("strmSerialize out %d\n", iRet);
return iRet;
}
+#include "stringbuf.h"
/* This function can be used as a generic way to set properties.
* rgerhards, 2008-01-11
@@ -698,7 +746,7 @@ rsRetVal strmSetProperty(strm_t *pThis, property_t *pProp)
} else if(isProp("iFileNumDigits")) {
CHKiRet(strmSetiFileNumDigits(pThis, pProp->val.vInt));
} else if(isProp("bDeleteOnClose")) {
- CHKiRet(strmSetiFileNumDigits(pThis, pProp->val.vInt));
+ CHKiRet(strmSetbDeleteOnClose(pThis, pProp->val.vInt));
}
finalize_it:
diff --git a/stream.h b/stream.h
index d5b207ce..403eefaa 100644
--- a/stream.h
+++ b/stream.h
@@ -95,6 +95,8 @@ rsRetVal strmSetMaxFileSize(strm_t *pThis, size_t iMaxFileSize);
rsRetVal strmSetFileName(strm_t *pThis, uchar *pszName, size_t iLenName);
rsRetVal strmReadChar(strm_t *pThis, uchar *pC);
rsRetVal strmUnreadChar(strm_t *pThis, uchar c);
+//rsRetVal strmSeek(strm_t *pThis, off_t offs);
+rsRetVal strmSeekCurrOffs(strm_t *pThis);
rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
rsRetVal strmWriteChar(strm_t *pThis, uchar c);
rsRetVal strmWriteLong(strm_t *pThis, long i);
diff --git a/syslogd.c b/syslogd.c
index 25185efc..4950a8f9 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -3403,6 +3403,7 @@ init(void)
# undef setQPROPstr
/* ... and finally start the queue! */
+Initialized = 1;
CHKiRet_Hdlr(queueStart(pMsgQueue)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not start message queue - rsyslogd can not run!\n", iRet);