diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-10-26 20:24:28 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-10-26 20:24:28 +0100 |
commit | a5cddbdbce76f14b4216aa74698bbc168ca5409f (patch) | |
tree | f30b2ef19bacd41feb116591f67eeb2f7a67a8af /runtime/queue.c | |
parent | 620cacf47b11c0ce4898be6d0def5996a7ba3e9a (diff) | |
download | rsyslog-a5cddbdbce76f14b4216aa74698bbc168ca5409f.tar.gz rsyslog-a5cddbdbce76f14b4216aa74698bbc168ca5409f.tar.xz rsyslog-a5cddbdbce76f14b4216aa74698bbc168ca5409f.zip |
shuffled cancelability state to different spot
... but in anticipation of changing cancel processing altogether...
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 11 |
1 files changed, 8 insertions, 3 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index d9942365..be169be2 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1650,14 +1650,12 @@ RateLimiter(qqueue_t *pThis) static inline rsRetVal DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) { - int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); dbgprintf("YYY: dequeue for consumer\n"); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) @@ -1665,7 +1663,6 @@ dbgprintf("YYY: dequeue for consumer\n"); finalize_it: - pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } @@ -1696,16 +1693,20 @@ batchProcessed(qqueue_t *pThis, wti_t *pWti) static rsRetVal ConsumerReg(qqueue_t *pThis, wti_t *pWti) { + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueForConsumer(pThis, pWti)); /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit @@ -1740,16 +1741,20 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti) { int i; + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueForConsumer(pThis, pWti)); /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { //for(i = 0 ; i < pWti->batch.nElem ; i++) { |