summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-20 12:53:00 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-20 12:53:00 +0200
commit1c580743390c704bcfb44148f4a70254c2c247da (patch)
tree885bfb41691bf7ee57d05d4ee0c0fa87742b156d
parentbf8125f4e96a011ec28cc58b225bb815f72fc53c (diff)
downloadrsyslog-1c580743390c704bcfb44148f4a70254c2c247da.tar.gz
rsyslog-1c580743390c704bcfb44148f4a70254c2c247da.tar.xz
rsyslog-1c580743390c704bcfb44148f4a70254c2c247da.zip
solved potential race condition and some cleanup
Code review brought up a few places where we may have run into a race. They have most probably been introduced during the recent set of changes. But I do not look at older versions: because of the changed architecture, one cannot simply backport this patch.
-rw-r--r--runtime/queue.c17
-rw-r--r--runtime/wti.c2
2 files changed, 13 insertions, 6 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 7590af18..50bfaca9 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -264,6 +264,7 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
* call this function. As such, it may validly be that DA is already shut down.
* So we just check if we are in init phase and then wait for full startup.
* If in non-DA mode, we silently return.
+ * IMPORTANT: the QUEUE MUTEX MUST BE LOCKED WHEN this function is called!
* rgerhards, 2008-02-27
*/
static rsRetVal
@@ -1167,12 +1168,12 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
}
}
- d_pthread_mutex_unlock(pThis->mut);
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
if(pThis->bRunsDA) {
qqueueWaitDAModeInitialized(pThis);
}
+ d_pthread_mutex_unlock(pThis->mut);
/* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found
* out there are no active workers - that doesn't matter: the wtp knows about that and so will
@@ -1200,7 +1201,9 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
}
/* OK, the worker for the regular queue is processed, on to the DA queue regular worker. */
+ d_pthread_mutex_lock(pThis->mut);
if(pThis->bRunsDA) {
+ d_pthread_mutex_unlock(pThis->mut);
dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
qqueueGetID(pThis->pqDA));
/* we use the same absolute timeout as above, so we do not use more than the configured
@@ -1216,13 +1219,15 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
/* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting
* the queue, it is restarted at a later stage. We don't care here if a timeout happens.
*/
- dbgoprint((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n");
+ dbgoprint((obj_t*) pThis, "trying shutdown of main queue DA worker pool\n");
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
dbgoprint((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n");
} else {
dbgoprint((obj_t*) pThis, "main queue DA worker pool shut down.\n");
}
+ } else {
+ d_pthread_mutex_unlock(pThis->mut);
}
RETiRet;
@@ -1247,7 +1252,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
/* instruct workers to finish ASAP, even if still work exists */
- /* note that we modify bEnqOnly direclty, because going through the method would
+ /* note that we modify bEnqOnly directly, because going through the method would
* startup some workers again. So this is OK here. -- rgerhards, 2009-05-28
*/
pThis->bEnqOnly = 1;
@@ -2281,12 +2286,14 @@ DoSaveOnShutdown(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
+ d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */
dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
if(pThis->bRunsDA != 2) {
- InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
+ InitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */
}
+ d_pthread_mutex_unlock(pThis->mut);
/* make sure we do not timeout before we are done */
dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n");
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
@@ -2321,9 +2328,7 @@ CODESTARTobjDestruct(qqueue)
* we need to reset the logical dequeue pointer, persist the queue if configured to do
* so and then destruct everything. -- rgerhards, 2009-05-26
*/
-dbgprintf("XXX: pre unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
CHKiRet(pThis->qUnDeqAll(pThis));
-dbgprintf("XXX: post unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
CHKiRet(DoSaveOnShutdown(pThis));
diff --git a/runtime/wti.c b/runtime/wti.c
index 900e1cba..c295ccc9 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -289,10 +289,12 @@ wtiWorker(wti_t *pThis)
}
/* indicate termination */
+ d_pthread_mutex_lock(pWtp->pmutUsr);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_cleanup_pop(0); /* remove cleanup handler */
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
pthread_setcancelstate(iCancelStateSave, NULL);
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
RETiRet;
}