diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 96ebd6d5..101052a1 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1669,7 +1669,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) // TODO: MULTI: check physical queue size? pthread_cond_signal(&pThis->notFull); - d_pthread_mutex_unlock(pThis->mut); /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { @@ -1776,9 +1775,7 @@ RateLimiter(qqueue_t *pThis) } -/* This dequeues the next batch and checks if the queue is empty. If it is - * empty, return RS_RET_IDLE. That will trigger termination of the function - * and tell the upper layer caller to initiate idle processing. +/* This dequeues the next batch. * rgerhards, 2009-05-20 */ static inline rsRetVal @@ -1789,11 +1786,13 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); +dbgprintf("YYY: deqeueu for consumer"); CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) ABORT_FINALIZE(RS_RET_IDLE); + finalize_it: RETiRet; } @@ -1832,6 +1831,10 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueForConsumer(pThis, pWti)); + + /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ + d_pthread_mutex_unlock(pThis->mut); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); /* we now need to check if we should deliberately delay processing a bit @@ -1844,6 +1847,9 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000); } + /* now we are done, but need to re-aquire the mutex */ + d_pthread_mutex_lock(pThis->mut); + finalize_it: dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; @@ -1869,6 +1875,10 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueForConsumer(pThis, pWti)); + + /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ + d_pthread_mutex_unlock(pThis->mut); + /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs @@ -1878,6 +1888,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); } + /* now we are done, but need to re-aquire the mutex */ + d_pthread_mutex_lock(pThis->mut); + finalize_it: DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); RETiRet; @@ -2531,6 +2544,7 @@ finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { /* make sure at least one worker is running. */ qqueueAdviseMaxWorkers(pThis); +dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut); /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); |