From a5cddbdbce76f14b4216aa74698bbc168ca5409f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 26 Oct 2009 20:24:28 +0100 Subject: shuffled cancelability state to different spot ... but in anticipation of changing cancel processing altogether... --- runtime/queue.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/runtime/queue.c b/runtime/queue.c index d9942365..be169be2 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1650,14 +1650,12 @@ RateLimiter(qqueue_t *pThis) static inline rsRetVal DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) { - int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); dbgprintf("YYY: dequeue for consumer\n"); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) @@ -1665,7 +1663,6 @@ dbgprintf("YYY: dequeue for consumer\n"); finalize_it: - pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } @@ -1696,16 +1693,20 @@ batchProcessed(qqueue_t *pThis, wti_t *pWti) static rsRetVal ConsumerReg(qqueue_t *pThis, wti_t *pWti) { + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueForConsumer(pThis, pWti)); /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit @@ -1740,16 +1741,20 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti) { int i; + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueForConsumer(pThis, pWti)); /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { //for(i = 0 ; i < pWti->batch.nElem ; i++) { -- cgit