summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c22
1 files changed, 18 insertions, 4 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 96ebd6d5..101052a1 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1669,7 +1669,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
// TODO: MULTI: check physical queue size?
pthread_cond_signal(&pThis->notFull);
- d_pthread_mutex_unlock(pThis->mut);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
@@ -1776,9 +1775,7 @@ RateLimiter(qqueue_t *pThis)
}
-/* This dequeues the next batch and checks if the queue is empty. If it is
- * empty, return RS_RET_IDLE. That will trigger termination of the function
- * and tell the upper layer caller to initiate idle processing.
+/* This dequeues the next batch.
* rgerhards, 2009-05-20
*/
static inline rsRetVal
@@ -1789,11 +1786,13 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
+dbgprintf("YYY: deqeueu for consumer");
CHKiRet(DequeueConsumable(pThis, pWti));
if(pWti->batch.nElem == 0)
ABORT_FINALIZE(RS_RET_IDLE);
+
finalize_it:
RETiRet;
}
@@ -1832,6 +1831,10 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueForConsumer(pThis, pWti));
+
+ /* we now have a non-idle batch of work, so we can release the queue mutex and process it */
+ d_pthread_mutex_unlock(pThis->mut);
+
CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
/* we now need to check if we should deliberately delay processing a bit
@@ -1844,6 +1847,9 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000);
}
+ /* now we are done, but need to re-aquire the mutex */
+ d_pthread_mutex_lock(pThis->mut);
+
finalize_it:
dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
@@ -1869,6 +1875,10 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueForConsumer(pThis, pWti));
+
+ /* we now have a non-idle batch of work, so we can release the queue mutex and process it */
+ d_pthread_mutex_unlock(pThis->mut);
+
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
@@ -1878,6 +1888,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
}
+ /* now we are done, but need to re-aquire the mutex */
+ d_pthread_mutex_lock(pThis->mut);
+
finalize_it:
DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
RETiRet;
@@ -2531,6 +2544,7 @@ finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
/* make sure at least one worker is running. */
qqueueAdviseMaxWorkers(pThis);
+dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut);
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);