summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-27 14:46:23 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-27 14:46:23 +0000
commit7d8b1c293746d325db7f93d343a952e382da9ddd (patch)
tree11eb0c0bceb920fc7e89ecb1fe83bd89e46b9fd2 /queue.c
parentea7fd874d7b294dacc909a0f8e9c51dcc639d879 (diff)
downloadrsyslog-7d8b1c293746d325db7f93d343a952e382da9ddd.tar.gz
rsyslog-7d8b1c293746d325db7f93d343a952e382da9ddd.tar.xz
rsyslog-7d8b1c293746d325db7f93d343a952e382da9ddd.zip
fixed a bug when shutting down DA queue
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c33
1 files changed, 23 insertions, 10 deletions
diff --git a/queue.c b/queue.c
index ac891584..d26a6e24 100644
--- a/queue.c
+++ b/queue.c
@@ -854,11 +854,11 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis));
- // TODO: reminder, delte after testing: do we need to modify the high wtr mark? I dont' think so 2008-01-25
/* we reduce the low water mark in any case. This is not absolutely necessary, but
* it is useful because we enable DA mode at several spots below and so we do not need
* to think about the low water mark each time.
*/
+ pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */
pThis->iLowWtrMrk = 0;
/* first try to shutdown the queue within the regular shutdown period */
@@ -916,8 +916,13 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* when we reach this point, both queues are either empty or the regular queue shutdown timeout
* has expired. Now we need to check if we are configured to not loose messages. If so, we need
- * to persist the queue to disk (this is only possible if the queue is DA-enabled).
+ * to persist the queue to disk (this is only possible if the queue is DA-enabled). We must also
+ * set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate as soon as its consumer
+ * is done. This is especially important as we otherwise may interfere with queue order while the
+ * DA consumer is running. -- rgerhards, 2008-01-27
*/
+ wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */
+
// TODO: what about pure disk queues and bSaveOnShutdown?
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
@@ -936,8 +941,8 @@ RUNLOG;
/* make sure we do not timeout before we are done */
dbgprintf("Queue 0x%lx: bSaveOnShutdown configured, eternal timeout set\n", queueGetID(pThis));
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
- /* and run the primary's queue worker to drain the queue */
- iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
+ /* and run the primary queue's DA worker to drain the queue */
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal != RS_RET_OK) {
dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
"continuing, but results are unpredictable\n",
@@ -949,13 +954,15 @@ RUNLOG;
RUNLOG;
/* now the primary queue is either empty, persisted to disk - or set to loose messages. So we
- * can now request immediate shutdown of any remaining workers.
+ * can now request immediate shutdown of any remaining workers. Note that if bSaveOnShutdown was set,
+ * the queue is now empty. If regular workers are still running, and try to pull the next message,
+ * they will automatically terminate as there no longer is any message left to process.
*/
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
RUNLOG_VAR("%d", pThis->iQueueSize);
if(pThis->iQueueSize > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
+ timeoutComp(&tTimeout, pThis->toActShutdown);
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal != RS_RET_OK) {
dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue "
@@ -992,12 +999,12 @@ RUNLOG_VAR("%d", pThis->iQueueSize);
/* ... and now the DA queue, if it exists (should always be after the primary one) */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
//TODO: use right mutex!
- //old: if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) {
+ // was used: if(pThis->pqDA != NULL && pThis->pqDA->pWtpReg->iCurNumWrkThrd > 0) {
if(pThis->pqDA != NULL) {
RUNLOG_VAR("%p", pThis->pqDA->pWtpReg);
RUNLOG_VAR("%d", pThis->pqDA->pWtpReg->iCurNumWrkThrd);
}
- if(pThis->pqDA != NULL && pThis->pqDA->pWtpReg->iCurNumWrkThrd > 0) {
+ if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n",
queueGetID(pThis));
@@ -1098,6 +1105,7 @@ finalize_it:
* Params:
* arg1 - user pointer (in this case a queue_t)
* arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!])
+ * Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists!
* rgerhards, 2008-01-16
*/
static rsRetVal
@@ -1109,7 +1117,6 @@ queueConsumerCancelCleanup(void *arg1, void *arg2)
obj_t *pUsr = (obj_t*) arg2;
ISOBJ_TYPE_assert(pThis, queue);
- ISOBJ_assert(pUsr);
dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n",
queueGetID(pThis));
@@ -1278,6 +1285,12 @@ queueChkStopWrkrDA(queue_t *pThis)
bStopWrkr = 1;
} else {
if(pThis->bRunsDA) {
+#if 0
+RUNLOG_VAR("%d", pThis->iQueueSize);
+RUNLOG_VAR("%d", pThis->iHighWtrMrk);
+if(pThis->pqDA != NULL)
+ RUNLOG_VAR("%d", pThis->pqDA->bQueueStarted);
+#endif
if(pThis->iQueueSize < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
bStopWrkr = 1;
} else {
@@ -1288,7 +1301,7 @@ queueChkStopWrkrDA(queue_t *pThis)
}
}
-RUNLOG_VAR("%d", bStopWrkr);
+//RUNLOG_VAR("%d", bStopWrkr);
ENDfunc
return bStopWrkr;
}