Diffstat (limited to 'runtime/queue.c')
 runtime/queue.c | 44 ++++++++++++++++++++++++++++++++++++++------
 1 file changed, 38 insertions(+), 6 deletions(-)
diff --git a/runtime/queue.c b/runtime/queue.c
index cb14b58d..101052a1 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1188,7 +1188,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
if(iRetLocal == RS_RET_TIMED_OUT) {
DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n");
} else {
- DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down.\n");
+ DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down on first try.\n");
}
}
@@ -1247,13 +1247,31 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA "
"queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
+ /* and now we need to check the DA worker itself (the one that shuffles data to the disk). This
+ * is necessary because we may be in a situation where the DA queue regular worker and the
+ * main queue worker stopped rather quickly. In this case, there is almost no time (and
+ * probably no thread switch!) between the point where we instructed the main queue DA
+ * worker to shut down and this code location. In consequence, it may not even have
+ * noticed that it should shut down, much less actually done so. So we provide it with a
+ * fixed 100ms timeout to try to complete its work, which usually should be sufficient.
+ * rgerhards, 2009-10-06
+ */
+ timeoutComp(&tTimeout, 100);
+ DBGOPRINT((obj_t*) pThis, "last try for regular shutdown of main queue DA worker pool\n");
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool "
+ "(this is not good, but probably OK)\n");
+ } else {
+ DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down.\n");
+ }
}
RETiRet;
}
-/* This function cancels all remenaing regular workers for both the main and the DA
+/* This function cancels all remaining regular workers for both the main and the DA
* queue. The main queue's DA worker pool continues to run (if it exists and is active).
* rgerhards, 2009-05-29
*/
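
For context on the 100ms grace timeout added above: timeoutComp() is an rsyslog-internal helper. A minimal sketch of the deadline computation a helper like it presumably performs, using only POSIX calls (illustrative, not the actual implementation):

    #include <time.h>

    /* Hypothetical helper: turn a relative timeout in milliseconds into an
     * absolute timespec, as needed by pthread_cond_timedwait(). A sketch of
     * what timeoutComp() presumably does, not rsyslog's code. */
    static void
    computeDeadline(struct timespec *pDeadline, long iTimeoutMs)
    {
        clock_gettime(CLOCK_REALTIME, pDeadline);
        pDeadline->tv_sec  += iTimeoutMs / 1000;
        pDeadline->tv_nsec += (iTimeoutMs % 1000) * 1000000L;
        if(pDeadline->tv_nsec >= 1000000000L) { /* normalize nanosecond overflow */
            pDeadline->tv_nsec -= 1000000000L;
            pDeadline->tv_sec++;
        }
    }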
@@ -1651,7 +1669,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
// TODO: MULTI: check physical queue size?
pthread_cond_signal(&pThis->notFull);
- d_pthread_mutex_unlock(pThis->mut);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
@@ -1758,9 +1775,7 @@ RateLimiter(qqueue_t *pThis)
}
-/* This dequeues the next batch and checks if the queue is empty. If it is
- * empty, return RS_RET_IDLE. That will trigger termination of the function
- * and tell the upper layer caller to initiate idle processing.
+/* This dequeues the next batch.
* rgerhards, 2009-05-20
*/
static inline rsRetVal
@@ -1771,11 +1786,13 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
+dbgprintf("YYY: deqeueu for consumer");
CHKiRet(DequeueConsumable(pThis, pWti));
if(pWti->batch.nElem == 0)
ABORT_FINALIZE(RS_RET_IDLE);
+
finalize_it:
RETiRet;
}
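
The RS_RET_IDLE contract documented above is interpreted by the caller. A sketch of a worker loop honoring it (loop structure and doIdleProcessing() are illustrative, not rsyslog's actual wti code):

    /* Hypothetical worker loop: an empty dequeue ends the consumer call
     * and lets the caller run its idle processing. */
    static void
    workerLoop(qqueue_t *pThis, wti_t *pWti)
    {
        rsRetVal iRet;
        for(;;) {
            iRet = DequeueForConsumer(pThis, pWti);
            if(iRet == RS_RET_IDLE) {
                doIdleProcessing(pThis); /* hypothetical idle hook */
                break;
            }
            /* ... hand pWti->batch to the consumer ... */
        }
    }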
@@ -1814,6 +1831,10 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueForConsumer(pThis, pWti));
+
+ /* we now have a non-idle batch of work, so we can release the queue mutex and process it */
+ d_pthread_mutex_unlock(pThis->mut);
+
CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
/* we now need to check if we should deliberately delay processing a bit
@@ -1826,6 +1847,9 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000);
}
+ /* now we are done, but need to re-acquire the mutex */
+ d_pthread_mutex_lock(pThis->mut);
+
finalize_it:
dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
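
With the unlock removed from DequeueConsumable() (earlier hunk), the consumers now hold the queue mutex across the dequeue and release it only for the callback. A minimal sketch of that locking discipline in plain pthreads (names illustrative, not rsyslog's):

    #include <pthread.h>

    /* Dequeue happens under the queue mutex; the mutex is dropped for the
     * potentially slow consumer callback and re-acquired before returning,
     * as the worker loop expects the lock held. */
    static int
    consumeOneBatch(pthread_mutex_t *pMut, int (*consumer)(void *), void *pBatch)
    {
        int ret;
        /* caller holds *pMut; pBatch was dequeued under that lock */
        pthread_mutex_unlock(pMut);  /* never consume while holding the lock */
        ret = consumer(pBatch);      /* may block for a long time */
        pthread_mutex_lock(pMut);    /* caller expects the lock held on return */
        return ret;
    }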
@@ -1851,6 +1875,10 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueForConsumer(pThis, pWti));
+
+ /* we now have a non-idle batch of work, so we can release the queue mutex and process it */
+ d_pthread_mutex_unlock(pThis->mut);
+
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
@@ -1860,6 +1888,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
}
+ /* now we are done, but need to re-acquire the mutex */
+ d_pthread_mutex_lock(pThis->mut);
+
finalize_it:
DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
RETiRet;
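
The enqueue into the DA queue consumes (and may destruct) the object it is given, which is why the loop above takes an extra reference via MsgAddRef() first. A simplified sketch of that reference-counting idea (not rsyslog's actual msg code):

    /* The destructive enqueue consumes one reference, so the producer takes
     * an extra one before handing the message over, keeping its own copy
     * valid. A real implementation would increment atomically or under the
     * object's lock. */
    struct msg { int iRefCount; /* ... payload ... */ };

    static struct msg *
    msgAddRef(struct msg *pMsg)
    {
        pMsg->iRefCount++;
        return pMsg;
    }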
@@ -2513,6 +2544,7 @@ finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
/* make sure at least one worker is running. */
qqueueAdviseMaxWorkers(pThis);
+dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut);
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
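
As the context lines show, the enqueue path brackets its mutex-protected section with pthread_setcancelstate() so a worker can never be cancelled while holding the queue lock. A minimal sketch of that standard pattern (illustrative names):

    #include <pthread.h>

    /* Disable cancellation before taking the mutex; restore the saved
     * cancel state only after the mutex is released. */
    static void
    enqueueProtected(pthread_mutex_t *pMut)
    {
        int iCancelStateSave;
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
        pthread_mutex_lock(pMut);
        /* ... enqueue the object, advise the worker pool ... */
        pthread_mutex_unlock(pMut);
        pthread_setcancelstate(iCancelStateSave, NULL);
    }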