summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c51
1 files changed, 41 insertions, 10 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 3118bb19..0019297b 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -469,7 +469,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
@@ -1257,7 +1257,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
/* we need to re-aquire the mutex for the next check in this case! */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
}
- if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
+ if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
/* and now the same for the DA queue */
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA workers\n");
@@ -1548,18 +1548,19 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
{
int nDequeued;
int nDiscarded;
+ int nDeleted;
int iQueueSize;
void *pUsr;
rsRetVal localRet;
DEFiRet;
+ nDeleted = pWti->batch.nElemDeq;
DeleteProcessedBatch(pThis, &pWti->batch);
nDequeued = nDiscarded = 0;
- do {
+ while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
CHKiRet(qqueueDeq(pThis, &pUsr));
- iQueueSize = getLogicalQueueSize(pThis);
/* check if we should discard this element */
localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr);
@@ -1574,9 +1575,10 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
pWti->batch.pElem[nDequeued].pUsrp = pUsr;
pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
++nDequeued;
- } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize);
+ }
- qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */
+ /* it is sufficient to persist only when the bulk of work is done */
+ qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted);
pWti->batch.nElem = nDequeued;
pWti->batch.nElemDeq = nDequeued + nDiscarded;
@@ -1618,6 +1620,7 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
+ // TODO: MULTI: check physical queue size!
pthread_cond_signal(&pThis->notFull);
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -1727,6 +1730,27 @@ RateLimiter(qqueue_t *pThis)
}
+/* This dequeues the next batch and checks if the queue is empty. If it is
+ * empty, return RS_RET_IDLE. That will trigger termination of the function
+ * and tell the upper layer caller to initiate idle processing.
+ * rgerhards, 2009-05-20
+ */
+static inline rsRetVal
+DequeueForConsumer(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ ISOBJ_TYPE_assert(pWti, wti);
+
+ CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+
+ if(pWti->batch.nElem == 0)
+ ABORT_FINALIZE(RS_RET_IDLE);
+
+finalize_it:
+ RETiRet;
+}
/* This is the queue consumer in the regular (non-DA) case. It is
* protected by the queue mutex, but MUST release it as soon as possible.
@@ -1740,7 +1764,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
/* we now need to check if we should deliberately delay processing a bit
@@ -1754,6 +1778,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
}
finalize_it:
+dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
}
@@ -1776,7 +1801,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem ; i++)
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp));
@@ -1855,6 +1880,8 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal)
RETiRet;
}
+
+/* common function for the idle functions that deletes the last batch if TODO MULTI */
/* must only be called when the queue mutex is locked, else results
* are not stable! DA queue version
*/
@@ -1990,7 +2017,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
@@ -2133,7 +2160,10 @@ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates)
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
- assert(nUpdates > 0);
+ assert(nUpdates >= 0);
+
+ if(nUpdates == 0)
+ FINALIZE;
pThis->iUpdsSincePersist += nUpdates;
if(pThis->iPersistUpdCnt && pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
@@ -2141,6 +2171,7 @@ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates)
pThis->iUpdsSincePersist = 0;
}
+finalize_it:
RETiRet;
}