diff options
-rw-r--r-- | doc/rsyslog_conf.html | 1 | ||||
-rw-r--r-- | queue.c | 49 | ||||
-rw-r--r-- | queue.h | 4 | ||||
-rw-r--r-- | syslogd.c | 8 |
4 files changed, 55 insertions, 7 deletions
diff --git a/doc/rsyslog_conf.html b/doc/rsyslog_conf.html index 073d9697..a8eaf062 100644 --- a/doc/rsyslog_conf.html +++ b/doc/rsyslog_conf.html @@ -63,6 +63,7 @@ development and quite unstable...). So you have been warned ;)</p> <li>$MainMsgQueueImmediateShutdown [on/<b>off</b>]</li> <li><a href="rsconf1_mainmsgqueuesize.html">$MainMsgQueueSize</a></li> <li>$MainMsgQueueMaxFileSize <size_nbr>, default 1m</li> + <li>$MainMsgQueuePersistUpdateCount <number></li> <li>$MainMsgQueueType [<b>FixedArray</b>/LinkedList/Direct/Disk]</li> <li>$MainMsgQueueWorkerThreads <number>, num worker threads, default 1, recommended 1</li> @@ -48,6 +48,9 @@ /* static data */ DEFobjStaticHelpers +/* forward-definitions */ +rsRetVal queueChkPersist(queue_t *pThis); + /* methods */ /* first, we define type-specific handlers. The provide a generic functionality, @@ -550,6 +553,7 @@ queueWorker(void *arg) if(pThis->iQueueSize > 0) { /* dequeue element (still protected from mutex) */ iRet = queueDel(pThis, &pUsr); + queueChkPersist(pThis); // when we support peek(), we must do this down after the del! pthread_mutex_unlock(pThis->mut); pthread_cond_signal (pThis->notFull); /* do actual processing (the lengthy part, runs in parallel) @@ -743,7 +747,6 @@ static rsRetVal queuePersist(queue_t *pThis) strm_t *psQIF = NULL;; /* Queue Info File */ uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; - int i; assert(pThis != NULL); if(pThis->qType != QUEUETYPE_DISK) @@ -758,6 +761,8 @@ static rsRetVal queuePersist(queue_t *pThis) unlink((char*)pszQIFNam); pThis->bNeedDelQIF = 0; } + /* indicate spool file needs to be deleted */ + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); FINALIZE; /* nothing left to do, so be happy */ } @@ -775,8 +780,6 @@ static rsRetVal queuePersist(queue_t *pThis) * we know when somebody has changed the queue type... -- rgerhards, 2008-01-11 */ CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis)); - i = pThis->qType; - objSerializeSCALAR_VAR(psQIF, qType, INT, i); objSerializeSCALAR(psQIF, iQueueSize, INT); CHKiRet(objEndSerialize(psQIF)); @@ -786,9 +789,14 @@ static rsRetVal queuePersist(queue_t *pThis) /* persist queue object itself */ - /* tell the input file object that it must not delete the file on close */ + /* tell the input file object that it must not delete the file on close if the queue is non-empty */ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0)); + /* we have persisted the queue object. So whenever it comes to an empty queue, + * we need to delete the QIF. Thus, we indicte that need. + */ + pThis->bNeedDelQIF = 1; + finalize_it: if(psQIF != NULL) strmDestruct(psQIF); @@ -797,6 +805,28 @@ finalize_it: } +/* check if we need to persist the current queue info. If an + * error occurs, thus should be ignored by caller (but we still + * abide to our regular call interface)... + * rgerhards, 2008-01-13 + */ +rsRetVal queueChkPersist(queue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + +dbgprintf("chkPersist: PersUpdCnt %d, UpdsSincePers %d\n", pThis->iPersistUpdCnt, pThis->iUpdsSincePersist); + if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) { +dbgprintf("persistintg queue info!\n"); + queuePersist(pThis); + pThis->iUpdsSincePersist = 0; + } + + return iRet; +} + + /* destructor for the queue object */ rsRetVal queueDestruct(queue_t *pThis) { @@ -922,6 +952,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) } } CHKiRet(queueAdd(pThis, pUsr)); + queueChkPersist(pThis); finalize_it: /* now activate the worker thread */ @@ -938,6 +969,16 @@ finalize_it: /* some simple object access methods */ DEFpropSetMeth(queue, bImmediateShutdown, int); +DEFpropSetMeth(queue, iPersistUpdCnt, int); +#if 0 +rsRetVal queueSetiPersistUpdCnt(queue_t *pThis, int pVal) +{ + dbgprintf("queueSetiPersistUpdCnt(), val %d\n", pVal); + pThis->iPersistUpdCnt = pVal; +dbgprintf("queSetiPersist..(): PersUpdCnt %d, UpdsSincePers %d\n", pThis->iPersistUpdCnt, pThis->iUpdsSincePersist); + return RS_RET_OK; +} +#endif /* This function can be used as a generic way to set properties. Only the subset @@ -81,7 +81,8 @@ typedef struct queue_s { int iNumWorkerThreads;/* number of worker threads to use */ qWrkThrd_t *pWrkThrds;/* array with control structure for the worker thread(s) associated with this queue */ int bImmediateShutdown;/* on shutdown, drain the queue --> 0 / do NOT drain the queue --> 1 */ - //int bNeedPersist; /* does the queue need to be persisted on disk (updated since last persist?) */ + int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ + int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ int bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */ rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */ /* type-specific handlers (set during construction) */ @@ -132,6 +133,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*)); PROTOTYPEObjClassInit(queue); PROTOTYPEpropSetMeth(queue, bImmediateShutdown, int); +PROTOTYPEpropSetMeth(queue, iPersistUpdCnt, int); #define queueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ @@ -421,6 +421,7 @@ static queueType_t MainMsgQueType = QUEUETYPE_FIXED_ARRAY; /* type of the main m static uchar *pszMainMsgQFName = NULL; /* prefix for the main message queue file */ static size_t iMainMsgQueMaxFileSize = 1024*1024; static int bMainMsgQImmediateShutdown = 0; /* shut down the queue immediately? */ +static int iMainMsgQPersistUpdCnt = 0; /* persist queue info every n updates */ /* This structure represents the files that will have log @@ -525,6 +526,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a iMainMsgQueMaxFileSize = 1024 * 1024; iMainMsgQueueNumWorkers = 1; bMainMsgQImmediateShutdown = 0; + iMainMsgQPersistUpdCnt = 0; MainMsgQueType = QUEUETYPE_FIXED_ARRAY; return RS_RET_OK; @@ -3138,8 +3140,8 @@ static void dbgPrintInitInfo(void) cCCEscapeChar); dbgprintf("Main queue size %d messages.\n", iMainMsgQueueSize); - dbgprintf("Main queue worker threads: %d, ImmediateShutdown: %d\n", - iMainMsgQueueNumWorkers, bMainMsgQImmediateShutdown); + dbgprintf("Main queue worker threads: %d, ImmediateShutdown: %d, Perists every %d updates.\n", + iMainMsgQueueNumWorkers, bMainMsgQImmediateShutdown, iMainMsgQPersistUpdCnt); dbgprintf("Work Directory: '%s'.\n", pszWorkDir); } @@ -3398,6 +3400,7 @@ init(void) setQPROP(queueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize); setQPROPstr(queueSetFilePrefix, "$MainMsgQueueFileName", (pszMainMsgQFName == NULL ? (uchar*) "mainq" : pszMainMsgQFName)); + setQPROP(queueSetiPersistUpdCnt, "$MainMsgQueuePersistUpdateCount", iMainMsgQPersistUpdCnt); # undef setQPROP # undef setQPROPstr @@ -4559,6 +4562,7 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszMainMsgQFName, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesize", 0, eCmdHdlrInt, NULL, &iMainMsgQueueSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueimmediateshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQImmediateShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuepersistupdatecount", 0, eCmdHdlrInt, NULL, &iMainMsgQPersistUpdCnt, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetype", 0, eCmdHdlrGetWord, setMainMsgQueType, NULL, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iMainMsgQueueNumWorkers, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL)); |