summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-30 09:16:27 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-30 09:16:27 +0000
commit09a99b4d971607269525e56f1680d0ae6a0ea137 (patch)
tree5b1f96ec8829849a11b43726f5e49821bd2770e3 /queue.c
parentf547af1f86b489faaf16b4a040df5ebd3c974af5 (diff)
downloadrsyslog-09a99b4d971607269525e56f1680d0ae6a0ea137.tar.gz
rsyslog-09a99b4d971607269525e56f1680d0ae6a0ea137.tar.xz
rsyslog-09a99b4d971607269525e56f1680d0ae6a0ea137.zip
fixed a bug that caused $MainMsgQueueCheckpointInterval to work incorrectly
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c85
1 files changed, 46 insertions, 39 deletions
diff --git a/queue.c b/queue.c
index b1309ba2..0fef44e6 100644
--- a/queue.c
+++ b/queue.c
@@ -60,9 +60,28 @@ static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSav
static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2);
static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex);
+/* some constants for queuePersist () */
+#define QUEUE_CHECKPOINT 1
+#define QUEUE_NO_CHECKPOINT 0
+
/* methods */
+/* get the overall queue size, which includes ungotten objects. Must only be called
+ * while mutex is locked!
+ * rgerhards, 2008-01-29
+ */
+static inline int
+queueGetOverallQueueSize(queue_t *pThis)
+{
+#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */
+BEGINfunc
+dbgoprint((obj_t*) pThis, "queue size: %d (regular %d, ungotten %d)\n",
+ pThis->iQueueSize + pThis->iUngottenObjs, pThis->iQueueSize, pThis->iUngottenObjs);
+ENDfunc
+#endif
+ return pThis->iQueueSize + pThis->iUngottenObjs;
+}
/* --------------- code for disk-assisted (DA) queue modes -------------------- */
@@ -83,14 +102,14 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
/* if we have not yet reached the high water mark, there is no need to start a
* worker. -- rgerhards, 2008-01-26
*/
- if(pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
+ if(queueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
}
} else {
if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
iMaxWorkers = 1;
} else {
- iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1;
+ iMaxWorkers = queueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
}
wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
}
@@ -148,7 +167,7 @@ queueTurnOffDAMode(queue_t *pThis)
* when it is waiting that the high water mark is reached again. If so, we need to start up
* a regular worker. -- rgerhards, 2008-01-26
*/
- if(pThis->iQueueSize > 0) {
+ if(queueGetOverallQueueSize(pThis) > 0) {
queueAdviseMaxWorkers(pThis);
}
}
@@ -338,7 +357,7 @@ queueChkStrtDA(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
/* if we do not hit the high water mark, we have nothing to do */
- if(pThis->iQueueSize != pThis->iHighWtrMrk)
+ if(queueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk)
ABORT_FINALIZE(RS_RET_OK);
if(pThis->bRunsDA) {
@@ -352,14 +371,14 @@ queueChkStrtDA(queue_t *pThis)
* we need at least one).
*/
dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n",
- pThis->iQueueSize);
+ queueGetOverallQueueSize(pThis));
queueAdviseMaxWorkers(pThis);
} else {
/* this is the case when we are currently not running in DA mode. So it is time
* to turn it back on.
*/
dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
- pThis->iQueueSize);
+ queueGetOverallQueueSize(pThis));
queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
}
@@ -847,21 +866,6 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__
/* --------------- end type-specific handlers -------------------- */
-/* get the overall queue size, which includes ungotten objects. Must only be called
- * while mutex is locked!
- * rgerhards, 2008-01-29
- */
-static inline int
-queueGetOverallQueueSize(queue_t *pThis)
-{
-BEGINfunc
-RUNLOG_VAR("%d", pThis->iQueueSize);
-RUNLOG_VAR("%d", pThis->iUngottenObjs);
-ENDfunc
- return pThis->iQueueSize + pThis->iUngottenObjs;
-}
-
-
/* unget a user pointer that has been dequeued. This functionality is especially important
* for consumer cancel cleanup handlers. To support it, a short list of ungotten user pointers
* is maintened in memory.
@@ -989,7 +993,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* first try to shutdown the queue within the regular shutdown period */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(pThis->iQueueSize > 0) {
+ if(queueGetOverallQueueSize(pThis) > 0) {
if(pThis->bRunsDA) {
/* We may have waited on the low water mark. As it may have changed, we
* see if we reactivate the worker.
@@ -1057,7 +1061,7 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
- if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) {
+ if(pThis->bIsDA && queueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
/* switch to enqueue-only mode so that no more actions happen */
if(pThis->bRunsDA == 0) {
queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
@@ -1084,7 +1088,7 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
* they will automatically terminate as there no longer is any message left to process.
*/
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(pThis->iQueueSize > 0) {
+ if(queueGetOverallQueueSize(pThis) > 0) {
timeoutComp(&tTimeout, pThis->toActShutdown);
if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -1166,7 +1170,7 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running.
*/
- dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", pThis->iQueueSize);
+ dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", queueGetOverallQueueSize(pThis));
RETiRet;
}
@@ -1293,7 +1297,6 @@ static int queueChkDiscardMsg(queue_t *pThis, int iQueueSize, int bRunsDA, void
int iSeverity;
ISOBJ_TYPE_assert(pThis, queue);
-RUNLOG_VAR("%p", pUsr);
ISOBJ_assert(pUsr);
if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) {
@@ -1429,7 +1432,7 @@ queueChkStopWrkrDA(queue_t *pThis)
bStopWrkr = 1;
} else {
if(pThis->bRunsDA) {
- if(pThis->iQueueSize < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
+ if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
bStopWrkr = 1;
} else {
bStopWrkr = 0;
@@ -1454,7 +1457,7 @@ queueChkStopWrkrDA(queue_t *pThis)
static int
queueChkStopWrkrReg(queue_t *pThis)
{
- return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && pThis->iQueueSize == 0);
+ return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && queueGetOverallQueueSize(pThis) == 0);
}
@@ -1556,7 +1559,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %ld, qsize %d, child %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
- pThis->iQueueSize, pThis->pqParent == NULL ? 0 : 1);
+ queueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
@@ -1620,9 +1623,11 @@ finalize_it:
/* persist the queue to disk. If we have something to persist, we first
* save the information on the queue properties itself and then we call
* the queue-type specific drivers.
+ * Variable bIsCheckpoint is set to 1 if the persist is for a checkpoint,
+ * and 0 otherwise.
* rgerhards, 2008-01-10
*/
-static rsRetVal queuePersist(queue_t *pThis)
+static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
{
DEFiRet;
strm_t *psQIF = NULL; /* Queue Info File */
@@ -1633,7 +1638,7 @@ static rsRetVal queuePersist(queue_t *pThis)
ASSERT(pThis != NULL);
if(pThis->qType != QUEUETYPE_DISK) {
- if(pThis->iQueueSize > 0) {
+ if(queueGetOverallQueueSize(pThis) > 0) {
/* This error code is OK, but we will probably not implement this any time
* The reason is that persistence happens via DA queues. But I would like to
* leave the code as is, as we so have a hook in case we need one.
@@ -1644,12 +1649,12 @@ static rsRetVal queuePersist(queue_t *pThis)
FINALIZE; /* if the queue is empty, we are happy and done... */
}
- dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", pThis->iQueueSize);
+ dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", queueGetOverallQueueSize(pThis));
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
(char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
- if(pThis->iQueueSize == 0) {
+ if((bIsCheckpoint != QUEUE_CHECKPOINT) && (queueGetOverallQueueSize(pThis) == 0)) {
if(pThis->bNeedDelQIF) {
unlink((char*)pszQIFNam);
pThis->bNeedDelQIF = 0;
@@ -1691,8 +1696,12 @@ static rsRetVal queuePersist(queue_t *pThis)
CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF));
- /* tell the input file object that it must not delete the file on close if the queue is non-empty */
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
+ /* tell the input file object that it must not delete the file on close if the queue
+ * is non-empty - but only if we are not during a simple checkpoint
+ */
+ if(bIsCheckpoint != QUEUE_CHECKPOINT) {
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
+ }
/* we have persisted the queue object. So whenever it comes to an empty queue,
* we need to delete the QIF. Thus, we indicte that need.
@@ -1719,7 +1728,7 @@ rsRetVal queueChkPersist(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
- queuePersist(pThis);
+ queuePersist(pThis, QUEUE_CHECKPOINT);
pThis->iUpdsSincePersist = 0;
}
@@ -1772,7 +1781,7 @@ CODESTARTobjDestruct(queue)
* disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here
* if need arises (what I doubt...) -- rgerhards, 2008-01-25
*/
- CHKiRet_Hdlr(queuePersist(pThis)) {
+ CHKiRet_Hdlr(queuePersist(pThis, QUEUE_NO_CHECKPOINT)) {
dbgoprint((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet);
}
@@ -1890,8 +1899,6 @@ RUNLOG_VAR("%d", pThis->bRunsDA);
}
/* and finally enqueue the message */
-RUNLOG_VAR("%p", pThis);
-RUNLOG_VAR("%d", pThis->bRunsDA);
CHKiRet(queueAdd(pThis, pUsr));
queueChkPersist(pThis);