summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c19
1 files changed, 12 insertions, 7 deletions
diff --git a/queue.c b/queue.c
index 18ea416c..70ee6f1f 100644
--- a/queue.c
+++ b/queue.c
@@ -1152,10 +1152,8 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue "
"in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
- if(pThis->bIsDA) {
- /* we need to re-aquire the mutex for the next check in this case! */
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- }
+ /* we need to re-aquire the mutex for the next check in this case! */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
}
if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
/* and now the same for the DA queue */
@@ -1169,6 +1167,8 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue "
"in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
+ } else {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
} else {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -1316,6 +1316,7 @@ queueConsumerCancelCleanup(void *arg1, void *arg2)
ISOBJ_TYPE_assert(pThis, queue);
+RUNLOG_VAR("%p", pUsr);
if(pUsr != NULL) {
/* make sure the data element is not lost */
dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
@@ -1560,11 +1561,15 @@ queueRegOnWrkrShutdown(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
if(pThis->pqParent != NULL) {
- ASSERT(pThis->pqParent->pWtpDA != NULL);
- pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
- wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
+RUNLOG_VAR("%p", pThis->pqParent->pWtpDA);
+if(pThis->pqParent->pWtpDA == NULL)
+ FINALIZE;
+ ASSERT(pThis->pqParent->pWtpDA != NULL);
+ pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
+ wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
}
+finalize_it:
RETiRet;
}