summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-13 15:47:41 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-13 15:47:41 +0000
commitabdcea7d8f0eda058a6c6719cf24955878279d7c (patch)
treebab4f9dc61457f63e9a30516cf91eb8112fac1ef /queue.c
parent366060a51de60c717886636d6ef646bf1959972c (diff)
downloadrsyslog-abdcea7d8f0eda058a6c6719cf24955878279d7c.tar.gz
rsyslog-abdcea7d8f0eda058a6c6719cf24955878279d7c.tar.xz
rsyslog-abdcea7d8f0eda058a6c6719cf24955878279d7c.zip
support for reading back persistet queue information completed
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c253
1 files changed, 154 insertions, 99 deletions
diff --git a/queue.c b/queue.c
index b31173d9..d9a3f130 100644
--- a/queue.c
+++ b/queue.c
@@ -191,6 +191,104 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
/* -------------------- disk -------------------- */
+
+/* This method checks if there is any persistent information on the
+ * queue.
+ */
+#if 0
+static rsRetVal
+queueTryLoadPersistedInfo(queue_t *pThis)
+{
+ DEFiRet;
+ strm_t *psQIF = NULL;
+ uchar pszQIFNam[MAXFNAME];
+ size_t lenQIFNam;
+ AsPropBagstruct stat stat_buf;
+}
+#endif
+
+
+static rsRetVal
+queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t *pThis)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pStrm, strm);
+ ISOBJ_TYPE_assert(pThis, queue);
+ CHKiRet(strmSetDir(pStrm, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
+finalize_it:
+ return iRet;
+}
+
+
+/* The method loads the persistent queue information.
+ * rgerhards, 2008-01-11
+ */
+static rsRetVal
+queueTryLoadPersistedInfo(queue_t *pThis)
+{
+ DEFiRet;
+ strm_t *psQIF = NULL;
+ uchar pszQIFNam[MAXFNAME];
+ size_t lenQIFNam;
+ struct stat stat_buf;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+ /* Construct file name */
+ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
+ (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
+
+ /* check if the file exists */
+dbgprintf("stat '%s'\n", pszQIFNam);
+ if(stat((char*) pszQIFNam, &stat_buf) == -1) {
+ if(errno == ENOENT) {
+ dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis));
+ ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
+ } else {
+ dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno);
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ }
+ }
+
+ /* If we reach this point, we have a .qi file */
+
+ CHKiRet(strmConstruct(&psQIF));
+ CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
+ CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_READ));
+ CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE));
+ CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam));
+ CHKiRet(strmConstructFinalize(psQIF));
+
+ /* first, we try to read the property bag for ourselfs */
+ CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF));
+
+ /* and now the stream objects (some order as when persisted!) */
+ CHKiRet(objDeserialize(&pThis->tVars.disk.pWrite, OBJstrm, psQIF,
+ (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
+ CHKiRet(objDeserialize(&pThis->tVars.disk.pRead, OBJstrm, psQIF,
+ (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
+
+ CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
+ CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead));
+
+ /* OK, we could successfully read the file, so we now can request that it be
+ * deleted when we are done with the persisted information.
+ */
+ pThis->bNeedDelQIF = 1;
+
+finalize_it:
+ if(psQIF != NULL)
+ strmDestruct(psQIF);
+
+ if(iRet != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: error %d reading .qi file - can not start queue\n",
+ queueGetID(pThis), iRet);
+ }
+
+ return iRet;
+}
+
+
/* disk queue constructor.
* Note that we use a file limit of 10,000,000 files. That number should never pose a
* problem. If so, I guess the user has a design issue... But of course, the code can
@@ -201,23 +299,48 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
static rsRetVal qConstructDisk(queue_t *pThis)
{
DEFiRet;
+ int bRestarted = 0;
assert(pThis != NULL);
- CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite));
- CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
- CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
- CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
- CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
- CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite));
-
- CHKiRet(strmConstruct(&pThis->tVars.disk.pRead));
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
- CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
- CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000));
- CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ));
- CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR));
- CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead));
+ /* and now check if there is some persistent information that needs to be read in */
+ iRet = queueTryLoadPersistedInfo(pThis);
+ if(iRet == RS_RET_OK)
+ bRestarted = 1;
+ else if(iRet != RS_RET_FILE_NOT_FOUND)
+ FINALIZE;
+
+dbgprintf("qConstructDisk: bRestarted %d, iRet %d\n", bRestarted, iRet);
+ if(bRestarted == 1) {
+ ;
+ } else {
+ CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite));
+ CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
+ CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
+ CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
+ CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite));
+
+ CHKiRet(strmConstruct(&pThis->tVars.disk.pRead));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
+ CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
+ CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000));
+ CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ));
+ CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead));
+
+
+ CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ }
+
+ /* now we set (and overwrite in case of a persisted restart) some parameters which
+ * should always reflect the current configuration variables. Be careful by doing so,
+ * for example file name generation must not be changed as that would break the
+ * ability to read existing queue files. -- rgerhards, 2008-01-12
+ */
+CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize));
+CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize));
finalize_it:
return iRet;
@@ -254,7 +377,7 @@ finalize_it:
static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
{
- return objDeserialize(ppUsr, OBJMsg, pThis->tVars.disk.pRead);
+ return objDeserialize(ppUsr, OBJMsg, pThis->tVars.disk.pRead, NULL, NULL);
}
/* -------------------- direct (no queueing) -------------------- */
@@ -480,67 +603,6 @@ queueWorker(void *arg)
}
-/* This method checks if there is any persistent information on the
- * queue and, if so, tries to load it. This method can only legally be
- * called from the destructor (I moved it out from there to keep the
- * Constructor code somewhat smaller). -- rgerhards, 2008-01-11
- */
-static rsRetVal
-queueTryLoadPersistedInfo(queue_t *pThis)
-{
- DEFiRet;
- strm_t *psQIF = NULL;
- uchar pszQIFNam[MAXFNAME];
- size_t lenQIFNam;
- struct stat stat_buf;
-
- ISOBJ_TYPE_assert(pThis, queue);
-
- /* Construct file name */
- lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
- (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
-
- /* check if the file exists */
-dbgprintf("stat '%s'\n", pszQIFNam);
- if(stat((char*) pszQIFNam, &stat_buf) == -1) {
- if(errno == ENOENT) {
- dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis));
- ABORT_FINALIZE(RS_RET_OK);
- } else {
- dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno);
- ABORT_FINALIZE(RS_RET_IO_ERROR);
- }
- }
-
- /* If we reach this point, we have a .qi file */
-
- CHKiRet(strmConstruct(&psQIF));
- CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
- CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_READ));
- CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE));
- CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam));
- CHKiRet(strmConstructFinalize(psQIF));
-
- /* first, we try to read the property bag for ourselfs */
- CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF));
-
- /* and now the stream objects (some order as when persisted!) */
- CHKiRet(objDeserializeObjAsPropBag(pThis->tVars.disk.pWrite, psQIF));
- CHKiRet(objDeserializeObjAsPropBag(pThis->tVars.disk.pRead, psQIF));
-
-finalize_it:
- if(psQIF != NULL)
- strmDestruct(psQIF);
-
- if(iRet != RS_RET_OK) {
- dbgprintf("Queue 0x%lx: error %d reading .qi file - can not start queue\n",
- queueGetID(pThis), iRet);
- }
-
- return iRet;
-}
-
-
/* Constructor for the queue object
* This constructs the data structure, but does not yet start the queue. That
* is done by queueStart(). The reason is that we want to give the caller a chance
@@ -611,9 +673,6 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
break;
}
- /* call type-specific constructor */
- CHKiRet(pThis->qConstruct(pThis));
-
finalize_it:
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
return iRet;
@@ -631,8 +690,8 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
assert(pThis != NULL);
- /* and now check if there is some persistent information that needs to be read in */
- CHKiRet(queueTryLoadPersistedInfo(pThis));
+ /* call type-specific constructor */
+ CHKiRet(pThis->qConstruct(pThis));
dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", (unsigned long) pThis, pThis->qType,
pThis->iMaxFileSize);
@@ -681,15 +740,12 @@ finalize_it:
static rsRetVal queuePersist(queue_t *pThis)
{
DEFiRet;
- strm_t *psQIF; /* Queue Info File */
+ strm_t *psQIF = NULL;; /* Queue Info File */
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
int i;
assert(pThis != NULL);
- if(pThis->iQueueSize == 0)
- FINALIZE; /* nothing left to do, so be happy */
-
if(pThis->qType != QUEUETYPE_DISK)
ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */
@@ -697,6 +753,14 @@ static rsRetVal queuePersist(queue_t *pThis)
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
(char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
+ if(pThis->iQueueSize == 0) {
+ if(pThis->bNeedDelQIF) {
+ unlink((char*)pszQIFNam);
+ pThis->bNeedDelQIF = 0;
+ }
+ FINALIZE; /* nothing left to do, so be happy */
+ }
+
CHKiRet(strmConstruct(&psQIF));
CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE));
@@ -715,23 +779,20 @@ static rsRetVal queuePersist(queue_t *pThis)
objSerializeSCALAR_VAR(psQIF, qType, INT, i);
objSerializeSCALAR(psQIF, iQueueSize, INT);
CHKiRet(objEndSerialize(psQIF));
-dbgprintf("queue serial 1\n");
/* this is disk specific and must be moved to a function */
CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF));
-dbgprintf("queue serial 2\n");
/* persist queue object itself */
- /* ready with the queue, now call driver to persist queue data */
- //iRet = ;
-
- /* the the input file object that it must not delete the file on close */
+ /* tell the input file object that it must not delete the file on close */
CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
finalize_it:
- strmDestruct(psQIF);
+ if(psQIF != NULL)
+ strmDestruct(psQIF);
+
return iRet;
}
@@ -794,11 +855,7 @@ queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1);
pThis->lenFilePrefix = iLenPrefix;
-
- if(pThis->qType == QUEUETYPE_DISK) {
- CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix));
- CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix));
- }
+
finalize_it:
return iRet;
}
@@ -818,8 +875,6 @@ queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize)
}
pThis->iMaxFileSize = iMaxFileSize;
- if(pThis->qType == QUEUETYPE_DISK)
- CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, iMaxFileSize));
finalize_it:
return iRet;