diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 49 |
1 files changed, 30 insertions, 19 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index c2df928b..c3a8e9d4 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -65,7 +65,7 @@ DEFobjStaticHelpers DEFobjCurrIf(glbl) /* forward-definitions */ -rsRetVal qqueueChkPersist(qqueue_t *pThis); +static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); @@ -896,8 +896,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) { - aUsrp_t aUsrp; - obj_t *pMsgp; + batch_t singleBatch; + batch_obj_t batchObj; DEFiRet; ASSERT(pThis != NULL); @@ -907,17 +907,19 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) * mode the consumer probably has a lot to convey (which get's lost in the other modes * because they are asynchronous. But direct mode is deliberately synchronous. * rgerhards, 2008-02-12 - * We use our knowledge about the aUsrp_t structure below, but without that, we + * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - pMsgp = (obj_t*) pUsr; - aUsrp.nElem = 1; /* there always is only one in direct mode */ - aUsrp.pUsrp = &pMsgp; - iRet = pThis->pConsumer(pThis->pUsr, &aUsrp); + batchObj.state = BATCH_STATE_RDY; + batchObj.pUsrp = (obj_t*) pUsr; + singleBatch.nElem = 1; /* there always is only one in direct mode */ + singleBatch.pElem = &batchObj; + iRet = pThis->pConsumer(pThis->pUsr, &singleBatch); RETiRet; } + static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) { return RS_RET_OK; @@ -1247,7 +1249,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*)) { DEFiRet; qqueue_t *pThis; @@ -1402,11 +1404,12 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize rsRetVal localRet; DEFiRet; + /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */ + nDequeued = 0; do { dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDel(pThis, &pUsr)); - qqueueChkPersist(pThis); /* is is questionable if we should really need to call this every time... */ iQueueSize = qqueueGetOverallQueueSize(pThis); /* check if we should discard this element */ @@ -1417,11 +1420,15 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); ABORT_FINALIZE(localRet); /* all well, use this element */ - pWti->paUsrp->pUsrp[nDequeued++] = pUsr; + pWti->batch.pElem[nDequeued].pUsrp = pUsr; + pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; + ++nDequeued; } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize); + qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */ + //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ - pWti->paUsrp->nElem = nDequeued; + pWti->batch.nElem = nDequeued; *iRemainingQueueSize = iQueueSize; finalize_it: @@ -1582,7 +1589,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp)); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -1619,8 +1626,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); /* iterate over returned results and enqueue them in DA queue */ - for(i = 0 ; i < pWti->paUsrp->nElem ; i++) - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i])); + for(i = 0 ; i < pWti->batch.nElem ; i++) + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp)); finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); @@ -1974,17 +1981,21 @@ finalize_it: /* check if we need to persist the current queue info. If an - * error occurs, thus should be ignored by caller (but we still + * error occurs, this should be ignored by caller (but we still * abide to our regular call interface)... * rgerhards, 2008-01-13 + * nUpdates is the number of updates since the last call to this function. + * It may be > 1 due to batches. -- rgerhards, 2009-05-12 */ -rsRetVal qqueueChkPersist(qqueue_t *pThis) +static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); + assert(nUpdates > 0); - if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) { + pThis->iUpdsSincePersist += nUpdates; + if(pThis->iPersistUpdCnt && pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) { qqueuePersist(pThis, QUEUE_CHECKPOINT); pThis->iUpdsSincePersist = 0; } @@ -2199,7 +2210,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* and finally enqueue the message */ CHKiRet(qqueueAdd(pThis, pUsr)); - qqueueChkPersist(pThis); + qqueueChkPersist(pThis, 1); finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { |