diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-20 12:53:00 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-20 12:53:00 +0200 |
commit | 1c580743390c704bcfb44148f4a70254c2c247da (patch) | |
tree | 885bfb41691bf7ee57d05d4ee0c0fa87742b156d /runtime/queue.c | |
parent | bf8125f4e96a011ec28cc58b225bb815f72fc53c (diff) | |
download | rsyslog-1c580743390c704bcfb44148f4a70254c2c247da.tar.gz rsyslog-1c580743390c704bcfb44148f4a70254c2c247da.tar.xz rsyslog-1c580743390c704bcfb44148f4a70254c2c247da.zip |
solved potential race condition and some cleanup
code review brought up some few places where we may have run into a race.
They have most probably been introduced during the recent set of changes. But
I do not look at older versions because of the changed architecture, one can
not simply backport this patch.
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 7590af18..50bfaca9 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -264,6 +264,7 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) * call this function. As such,it may validly be that DA is already shut down. * So we just check if we are in init phase and then wait for full startup. * If in non-DA mode, we silently return. + * IMPORTANT: the QUEUE MUTEX MUST BE LOOKED WHEN this funnction is called! * rgerhards, 2008-02-27 */ static rsRetVal @@ -1167,12 +1168,12 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) wtpAdviseMaxWorkers(pThis->pWtpDA, 1); } } - d_pthread_mutex_unlock(pThis->mut); /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ if(pThis->bRunsDA) { qqueueWaitDAModeInitialized(pThis); } + d_pthread_mutex_unlock(pThis->mut); /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found * out there are no active workers - that doesn't matter: the wtp knows about that and so will @@ -1200,7 +1201,9 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) } /* OK, the worker for the regular queue is processed, on the the DA queue regular worker. */ + d_pthread_mutex_lock(pThis->mut); if(pThis->bRunsDA) { + d_pthread_mutex_unlock(pThis->mut); dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", qqueueGetID(pThis->pqDA)); /* we use the same absolute timeout as above, so we do not use more than the configured @@ -1216,13 +1219,15 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting * the queue, it is restarted at a later stage. We don't care here if a timeout happens. */ - dbgoprint((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n"); + dbgoprint((obj_t*) pThis, "trying shutdown of main queue DA worker pool\n"); iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { dbgoprint((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n"); } else { dbgoprint((obj_t*) pThis, "main queue DA worker pool shut down.\n"); } + } else { + d_pthread_mutex_unlock(pThis->mut); } RETiRet; @@ -1247,7 +1252,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ /* instruct workers to finish ASAP, even if still work exists */ - /* note that we modify bEnqOnly direclty, because going through the method would + /* note that we modify bEnqOnly directly, because going through the method would * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28 */ pThis->bEnqOnly = 1; @@ -2281,12 +2286,14 @@ DoSaveOnShutdown(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); + d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); if(pThis->bRunsDA != 2) { - InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ + InitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */ } + d_pthread_mutex_unlock(pThis->mut); /* make sure we do not timeout before we are done */ dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n"); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); @@ -2321,9 +2328,7 @@ CODESTARTobjDestruct(qqueue) * we need to reset the logical dequeue pointer, persist the queue if configured to do * so and then destruct everything. -- rgerhards, 2009-05-26 */ -dbgprintf("XXX: pre unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); CHKiRet(pThis->qUnDeqAll(pThis)); -dbgprintf("XXX: post unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { CHKiRet(DoSaveOnShutdown(pThis)); |