diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-30 09:16:27 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-30 09:16:27 +0000 |
commit | 09a99b4d971607269525e56f1680d0ae6a0ea137 (patch) | |
tree | 5b1f96ec8829849a11b43726f5e49821bd2770e3 /queue.c | |
parent | f547af1f86b489faaf16b4a040df5ebd3c974af5 (diff) | |
download | rsyslog-09a99b4d971607269525e56f1680d0ae6a0ea137.tar.gz rsyslog-09a99b4d971607269525e56f1680d0ae6a0ea137.tar.xz rsyslog-09a99b4d971607269525e56f1680d0ae6a0ea137.zip |
fixed a bug that caused $MainMsgQueueCheckpointInterval to work incorrectly
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 85 |
1 files changed, 46 insertions, 39 deletions
@@ -60,9 +60,28 @@ static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSav static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2); static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex); +/* some constants for queuePersist () */ +#define QUEUE_CHECKPOINT 1 +#define QUEUE_NO_CHECKPOINT 0 + /* methods */ +/* get the overall queue size, which includes ungotten objects. Must only be called + * while mutex is locked! + * rgerhards, 2008-01-29 + */ +static inline int +queueGetOverallQueueSize(queue_t *pThis) +{ +#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */ +BEGINfunc +dbgoprint((obj_t*) pThis, "queue size: %d (regular %d, ungotten %d)\n", + pThis->iQueueSize + pThis->iUngottenObjs, pThis->iQueueSize, pThis->iUngottenObjs); +ENDfunc +#endif + return pThis->iQueueSize + pThis->iUngottenObjs; +} /* --------------- code for disk-assisted (DA) queue modes -------------------- */ @@ -83,14 +102,14 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) /* if we have not yet reached the high water mark, there is no need to start a * worker. -- rgerhards, 2008-01-26 */ - if(pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { + if(queueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } } else { if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { iMaxWorkers = 1; } else { - iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1; + iMaxWorkers = queueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ } @@ -148,7 +167,7 @@ queueTurnOffDAMode(queue_t *pThis) * when it is waiting that the high water mark is reached again. If so, we need to start up * a regular worker. -- rgerhards, 2008-01-26 */ - if(pThis->iQueueSize > 0) { + if(queueGetOverallQueueSize(pThis) > 0) { queueAdviseMaxWorkers(pThis); } } @@ -338,7 +357,7 @@ queueChkStrtDA(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); /* if we do not hit the high water mark, we have nothing to do */ - if(pThis->iQueueSize != pThis->iHighWtrMrk) + if(queueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk) ABORT_FINALIZE(RS_RET_OK); if(pThis->bRunsDA) { @@ -352,14 +371,14 @@ queueChkStrtDA(queue_t *pThis) * we need at least one). */ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n", - pThis->iQueueSize); + queueGetOverallQueueSize(pThis)); queueAdviseMaxWorkers(pThis); } else { /* this is the case when we are currently not running in DA mode. So it is time * to turn it back on. */ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n", - pThis->iQueueSize); + queueGetOverallQueueSize(pThis)); queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ } @@ -847,21 +866,6 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__ /* --------------- end type-specific handlers -------------------- */ -/* get the overall queue size, which includes ungotten objects. Must only be called - * while mutex is locked! - * rgerhards, 2008-01-29 - */ -static inline int -queueGetOverallQueueSize(queue_t *pThis) -{ -BEGINfunc -RUNLOG_VAR("%d", pThis->iQueueSize); -RUNLOG_VAR("%d", pThis->iUngottenObjs); -ENDfunc - return pThis->iQueueSize + pThis->iUngottenObjs; -} - - /* unget a user pointer that has been dequeued. This functionality is especially important * for consumer cancel cleanup handlers. To support it, a short list of ungotten user pointers * is maintened in memory. @@ -989,7 +993,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* first try to shutdown the queue within the regular shutdown period */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(pThis->iQueueSize > 0) { + if(queueGetOverallQueueSize(pThis) > 0) { if(pThis->bRunsDA) { /* We may have waited on the low water mark. As it may have changed, we * see if we reactivate the worker. @@ -1057,7 +1061,7 @@ RUNLOG_VAR("%d", pThis->toQShutdown); BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ - if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) { + if(pThis->bIsDA && queueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { /* switch to enqueue-only mode so that no more actions happen */ if(pThis->bRunsDA == 0) { queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ @@ -1084,7 +1088,7 @@ RUNLOG_VAR("%d", pThis->toQShutdown); * they will automatically terminate as there no longer is any message left to process. */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(pThis->iQueueSize > 0) { + if(queueGetOverallQueueSize(pThis) > 0) { timeoutComp(&tTimeout, pThis->toActShutdown); if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -1166,7 +1170,7 @@ RUNLOG_VAR("%d", pThis->toQShutdown); * Well, more precisely, they *are in termination*. Some cancel cleanup handlers * may still be running. */ - dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", pThis->iQueueSize); + dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", queueGetOverallQueueSize(pThis)); RETiRet; } @@ -1293,7 +1297,6 @@ static int queueChkDiscardMsg(queue_t *pThis, int iQueueSize, int bRunsDA, void int iSeverity; ISOBJ_TYPE_assert(pThis, queue); -RUNLOG_VAR("%p", pUsr); ISOBJ_assert(pUsr); if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) { @@ -1429,7 +1432,7 @@ queueChkStopWrkrDA(queue_t *pThis) bStopWrkr = 1; } else { if(pThis->bRunsDA) { - if(pThis->iQueueSize < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { + if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { bStopWrkr = 1; } else { bStopWrkr = 0; @@ -1454,7 +1457,7 @@ queueChkStopWrkrDA(queue_t *pThis) static int queueChkStopWrkrReg(queue_t *pThis) { - return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && pThis->iQueueSize == 0); + return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && queueGetOverallQueueSize(pThis) == 0); } @@ -1556,7 +1559,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %ld, qsize %d, child %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, - pThis->iQueueSize, pThis->pqParent == NULL ? 0 : 1); + queueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ @@ -1620,9 +1623,11 @@ finalize_it: /* persist the queue to disk. If we have something to persist, we first * save the information on the queue properties itself and then we call * the queue-type specific drivers. + * Variable bIsCheckpoint is set to 1 if the persist is for a checkpoint, + * and 0 otherwise. * rgerhards, 2008-01-10 */ -static rsRetVal queuePersist(queue_t *pThis) +static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) { DEFiRet; strm_t *psQIF = NULL; /* Queue Info File */ @@ -1633,7 +1638,7 @@ static rsRetVal queuePersist(queue_t *pThis) ASSERT(pThis != NULL); if(pThis->qType != QUEUETYPE_DISK) { - if(pThis->iQueueSize > 0) { + if(queueGetOverallQueueSize(pThis) > 0) { /* This error code is OK, but we will probably not implement this any time * The reason is that persistence happens via DA queues. But I would like to * leave the code as is, as we so have a hook in case we need one. @@ -1644,12 +1649,12 @@ static rsRetVal queuePersist(queue_t *pThis) FINALIZE; /* if the queue is empty, we are happy and done... */ } - dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", pThis->iQueueSize); + dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", queueGetOverallQueueSize(pThis)); /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); - if(pThis->iQueueSize == 0) { + if((bIsCheckpoint != QUEUE_CHECKPOINT) && (queueGetOverallQueueSize(pThis) == 0)) { if(pThis->bNeedDelQIF) { unlink((char*)pszQIFNam); pThis->bNeedDelQIF = 0; @@ -1691,8 +1696,12 @@ static rsRetVal queuePersist(queue_t *pThis) CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF)); CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF)); - /* tell the input file object that it must not delete the file on close if the queue is non-empty */ - CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0)); + /* tell the input file object that it must not delete the file on close if the queue + * is non-empty - but only if we are not during a simple checkpoint + */ + if(bIsCheckpoint != QUEUE_CHECKPOINT) { + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0)); + } /* we have persisted the queue object. So whenever it comes to an empty queue, * we need to delete the QIF. Thus, we indicte that need. @@ -1719,7 +1728,7 @@ rsRetVal queueChkPersist(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) { - queuePersist(pThis); + queuePersist(pThis, QUEUE_CHECKPOINT); pThis->iUpdsSincePersist = 0; } @@ -1772,7 +1781,7 @@ CODESTARTobjDestruct(queue) * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here * if need arises (what I doubt...) -- rgerhards, 2008-01-25 */ - CHKiRet_Hdlr(queuePersist(pThis)) { + CHKiRet_Hdlr(queuePersist(pThis, QUEUE_NO_CHECKPOINT)) { dbgoprint((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet); } @@ -1890,8 +1899,6 @@ RUNLOG_VAR("%d", pThis->bRunsDA); } /* and finally enqueue the message */ -RUNLOG_VAR("%p", pThis); -RUNLOG_VAR("%d", pThis->bRunsDA); CHKiRet(queueAdd(pThis, pUsr)); queueChkPersist(pThis); |