diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-11 14:12:25 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-11 14:12:25 +0000 |
commit | 8dad3997505f71e6e9962892f79d7b7dad0a89ce (patch) | |
tree | ce49da850c9d4deaf6143ef2232766e8c1ccdd35 /queue.c | |
parent | e095d1ab45b205b4849151b15592c2824f04373a (diff) | |
download | rsyslog-8dad3997505f71e6e9962892f79d7b7dad0a89ce.tar.gz rsyslog-8dad3997505f71e6e9962892f79d7b7dad0a89ce.tar.xz rsyslog-8dad3997505f71e6e9962892f79d7b7dad0a89ce.zip |
file stream objects are now persistet on immediate queue shutdown (queue
itself is not yet fully persisted)
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 90 |
1 files changed, 88 insertions, 2 deletions
@@ -28,6 +28,7 @@ */ #include "config.h" +#include <stdio.h> #include <stdlib.h> #include <string.h> #include <assert.h> @@ -206,6 +207,7 @@ static rsRetVal qConstructDisk(queue_t *pThis) CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); + CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite)); CHKiRet(strmConstruct(&pThis->tVars.disk.pRead)); @@ -213,6 +215,7 @@ static rsRetVal qConstructDisk(queue_t *pThis) CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000)); CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ)); + CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR)); CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead)); finalize_it: @@ -584,6 +587,70 @@ finalize_it: return iRet; } + +#if 0 +/* persist disk status on disk. This is necessary if we run either + * a disk queue or in a disk assisted mode. + */ +static rsRetVal queuePersistDskFilInfo(queue_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + +finalize_it: + return iRet; +} +#endif + + + +/* persist the queue to disk. If we have something to persist, we first + * save the information on the queue properties itself and then we call + * the queue-type specific drivers. + * rgerhards, 2008-01-10 + */ +static rsRetVal queuePersist(queue_t *pThis) +{ + DEFiRet; + strm_t *psQIF; /* Queue Info File */ + uchar pszQIFNam[MAXFNAME]; + size_t lenQIFNam; + + assert(pThis != NULL); + if(pThis->iQueueSize == 0) + FINALIZE; /* nothing left to do, so be happy */ + + dbgprintf("Queue 0x%lx: persisting queue to disk, %d entries...\n", queueGetID(pThis), pThis->iQueueSize); + /* Construct file name */ + lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", + (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); + CHKiRet(strmConstruct(&psQIF)); + CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); + CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE)); + CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE)); + CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strmConstructFinalize(psQIF)); + + /* this is disk specific and must be moved to a function */ + CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF)); + CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF)); + + /* persist queue object itself */ + + /* ready with the queue, now call driver to persist queue data */ + //iRet = ; + + /* the the input file object that it must not delete the file on close */ + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0)); + +finalize_it: + strmDestruct(psQIF); + return iRet; +} + + /* destructor for the queue object */ rsRetVal queueDestruct(queue_t *pThis) { @@ -598,6 +665,13 @@ rsRetVal queueDestruct(queue_t *pThis) pThis->pWrkThrds = NULL; } + /* now check if we need to persist the queue */ + if(pThis->bImmediateShutdown) { + CHKiRet_Hdlr(queuePersist(pThis)) { + dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet); + } + } + /* ... then free resources */ pthread_mutex_destroy(pThis->mut); free(pThis->mut); @@ -608,6 +682,9 @@ rsRetVal queueDestruct(queue_t *pThis) /* type-specific destructor */ iRet = pThis->qDestruct(pThis); + if(pThis->pszFilePrefix != NULL) + free(pThis->pszFilePrefix); + /* and finally delete the queue objet itself */ free(pThis); @@ -624,9 +701,18 @@ rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) { DEFiRet; + + if(pThis->pszFilePrefix != NULL) + free(pThis->pszFilePrefix); + + if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); + pThis->lenFilePrefix = iLenPrefix; + if(pThis->qType == QUEUETYPE_DISK) { - CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix)); - CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix)); + CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix)); + CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix)); } finalize_it: return iRet; |