From eb4b1915d1655d801e0232f4196fbdc1af3c857f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 26 Feb 2008 17:49:26 +0000 Subject: worked on queue stability --- queue.c | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 18ea416c..70ee6f1f 100644 --- a/queue.c +++ b/queue.c @@ -1152,10 +1152,8 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue " "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } - if(pThis->bIsDA) { - /* we need to re-aquire the mutex for the next check in this case! */ - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - } + /* we need to re-aquire the mutex for the next check in this case! */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ } if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) { /* and now the same for the DA queue */ @@ -1169,6 +1167,8 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue " "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } + } else { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } } else { END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -1316,6 +1316,7 @@ queueConsumerCancelCleanup(void *arg1, void *arg2) ISOBJ_TYPE_assert(pThis, queue); +RUNLOG_VAR("%p", pUsr); if(pUsr != NULL) { /* make sure the data element is not lost */ dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n"); @@ -1560,11 +1561,15 @@ queueRegOnWrkrShutdown(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); if(pThis->pqParent != NULL) { - ASSERT(pThis->pqParent->pWtpDA != NULL); - pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ - wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ +RUNLOG_VAR("%p", pThis->pqParent->pWtpDA); +if(pThis->pqParent->pWtpDA == NULL) + FINALIZE; + ASSERT(pThis->pqParent->pWtpDA != NULL); + pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ + wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ } +finalize_it: RETiRet; } -- cgit