diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-27 14:46:23 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-27 14:46:23 +0000 |
commit | 7d8b1c293746d325db7f93d343a952e382da9ddd (patch) | |
tree | 11eb0c0bceb920fc7e89ecb1fe83bd89e46b9fd2 /queue.c | |
parent | ea7fd874d7b294dacc909a0f8e9c51dcc639d879 (diff) | |
download | rsyslog-7d8b1c293746d325db7f93d343a952e382da9ddd.tar.gz rsyslog-7d8b1c293746d325db7f93d343a952e382da9ddd.tar.xz rsyslog-7d8b1c293746d325db7f93d343a952e382da9ddd.zip |
fixed a bug when shutting down DA queue
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 33 |
1 files changed, 23 insertions, 10 deletions
@@ -854,11 +854,11 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis)); - // TODO: reminder, delte after testing: do we need to modify the high wtr mark? I dont' think so 2008-01-25 /* we reduce the low water mark in any case. This is not absolutely necessary, but * it is useful because we enable DA mode at several spots below and so we do not need * to think about the low water mark each time. */ + pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */ pThis->iLowWtrMrk = 0; /* first try to shutdown the queue within the regular shutdown period */ @@ -916,8 +916,13 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* when we reach this point, both queues are either empty or the regular queue shutdown timeout * has expired. Now we need to check if we are configured to not loose messages. If so, we need - * to persist the queue to disk (this is only possible if the queue is DA-enabled). + * to persist the queue to disk (this is only possible if the queue is DA-enabled). We must also + * set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate as soon as its consumer + * is done. This is especially important as we otherwise may interfere with queue order while the + * DA consumer is running. -- rgerhards, 2008-01-27 */ + wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */ + // TODO: what about pure disk queues and bSaveOnShutdown? BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ @@ -936,8 +941,8 @@ RUNLOG; /* make sure we do not timeout before we are done */ dbgprintf("Queue 0x%lx: bSaveOnShutdown configured, eternal timeout set\n", queueGetID(pThis)); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); - /* and run the primary's queue worker to drain the queue */ - iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout); + /* and run the primary queue's DA worker to drain the queue */ + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); if(iRetLocal != RS_RET_OK) { dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying to shut down primary queue in disk save mode, " "continuing, but results are unpredictable\n", @@ -949,13 +954,15 @@ RUNLOG; RUNLOG; /* now the primary queue is either empty, persisted to disk - or set to loose messages. So we - * can now request immediate shutdown of any remaining workers. + * can now request immediate shutdown of any remaining workers. Note that if bSaveOnShutdown was set, + * the queue is now empty. If regular workers are still running, and try to pull the next message, + * they will automatically terminate as there no longer is any message left to process. */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ RUNLOG_VAR("%d", pThis->iQueueSize); if(pThis->iQueueSize > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); - timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); + timeoutComp(&tTimeout, pThis->toActShutdown); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal != RS_RET_OK) { dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue " @@ -992,12 +999,12 @@ RUNLOG_VAR("%d", pThis->iQueueSize); /* ... and now the DA queue, if it exists (should always be after the primary one) */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ //TODO: use right mutex! - //old: if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) { + // was used: if(pThis->pqDA != NULL && pThis->pqDA->pWtpReg->iCurNumWrkThrd > 0) { if(pThis->pqDA != NULL) { RUNLOG_VAR("%p", pThis->pqDA->pWtpReg); RUNLOG_VAR("%d", pThis->pqDA->pWtpReg->iCurNumWrkThrd); } - if(pThis->pqDA != NULL && pThis->pqDA->pWtpReg->iCurNumWrkThrd > 0) { + if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n", queueGetID(pThis)); @@ -1098,6 +1105,7 @@ finalize_it: * Params: * arg1 - user pointer (in this case a queue_t) * arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!]) + * Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists! * rgerhards, 2008-01-16 */ static rsRetVal @@ -1109,7 +1117,6 @@ queueConsumerCancelCleanup(void *arg1, void *arg2) obj_t *pUsr = (obj_t*) arg2; ISOBJ_TYPE_assert(pThis, queue); - ISOBJ_assert(pUsr); dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n", queueGetID(pThis)); @@ -1278,6 +1285,12 @@ queueChkStopWrkrDA(queue_t *pThis) bStopWrkr = 1; } else { if(pThis->bRunsDA) { +#if 0 +RUNLOG_VAR("%d", pThis->iQueueSize); +RUNLOG_VAR("%d", pThis->iHighWtrMrk); +if(pThis->pqDA != NULL) + RUNLOG_VAR("%d", pThis->pqDA->bQueueStarted); +#endif if(pThis->iQueueSize < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { bStopWrkr = 1; } else { @@ -1288,7 +1301,7 @@ queueChkStopWrkrDA(queue_t *pThis) } } -RUNLOG_VAR("%d", bStopWrkr); +//RUNLOG_VAR("%d", bStopWrkr); ENDfunc return bStopWrkr; } |