summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c49
1 files changed, 30 insertions, 19 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index c2df928b..c3a8e9d4 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -65,7 +65,7 @@ DEFobjStaticHelpers
DEFobjCurrIf(glbl)
/* forward-definitions */
-rsRetVal qqueueChkPersist(qqueue_t *pThis);
+static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
@@ -896,8 +896,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
- aUsrp_t aUsrp;
- obj_t *pMsgp;
+ batch_t singleBatch;
+ batch_obj_t batchObj;
DEFiRet;
ASSERT(pThis != NULL);
@@ -907,17 +907,19 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
* mode the consumer probably has a lot to convey (which get's lost in the other modes
* because they are asynchronous. But direct mode is deliberately synchronous.
* rgerhards, 2008-02-12
- * We use our knowledge about the aUsrp_t structure below, but without that, we
+ * We use our knowledge about the batch_t structure below, but without that, we
* pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
- pMsgp = (obj_t*) pUsr;
- aUsrp.nElem = 1; /* there always is only one in direct mode */
- aUsrp.pUsrp = &pMsgp;
- iRet = pThis->pConsumer(pThis->pUsr, &aUsrp);
+ batchObj.state = BATCH_STATE_RDY;
+ batchObj.pUsrp = (obj_t*) pUsr;
+ singleBatch.nElem = 1; /* there always is only one in direct mode */
+ singleBatch.pElem = &batchObj;
+ iRet = pThis->pConsumer(pThis->pUsr, &singleBatch);
RETiRet;
}
+
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
{
return RS_RET_OK;
@@ -1247,7 +1249,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1402,11 +1404,12 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize
rsRetVal localRet;
DEFiRet;
+ /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */
+
nDequeued = 0;
do {
dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
CHKiRet(qqueueDel(pThis, &pUsr));
- qqueueChkPersist(pThis); /* is is questionable if we should really need to call this every time... */
iQueueSize = qqueueGetOverallQueueSize(pThis);
/* check if we should discard this element */
@@ -1417,11 +1420,15 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
ABORT_FINALIZE(localRet);
/* all well, use this element */
- pWti->paUsrp->pUsrp[nDequeued++] = pUsr;
+ pWti->batch.pElem[nDequeued].pUsrp = pUsr;
+ pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
+ ++nDequeued;
} while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize);
+ qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */
+
//bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
- pWti->paUsrp->nElem = nDequeued;
+ pWti->batch.nElem = nDequeued;
*iRemainingQueueSize = iQueueSize;
finalize_it:
@@ -1582,7 +1589,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp));
+ CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -1619,8 +1626,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
/* iterate over returned results and enqueue them in DA queue */
- for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i]));
+ for(i = 0 ; i < pWti->batch.nElem ; i++)
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp));
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
@@ -1974,17 +1981,21 @@ finalize_it:
/* check if we need to persist the current queue info. If an
- * error occurs, thus should be ignored by caller (but we still
+ * error occurs, this should be ignored by caller (but we still
* abide to our regular call interface)...
* rgerhards, 2008-01-13
+ * nUpdates is the number of updates since the last call to this function.
+ * It may be > 1 due to batches. -- rgerhards, 2009-05-12
*/
-rsRetVal qqueueChkPersist(qqueue_t *pThis)
+static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(nUpdates > 0);
- if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
+ pThis->iUpdsSincePersist += nUpdates;
+ if(pThis->iPersistUpdCnt && pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
qqueuePersist(pThis, QUEUE_CHECKPOINT);
pThis->iUpdsSincePersist = 0;
}
@@ -2199,7 +2210,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
/* and finally enqueue the message */
CHKiRet(qqueueAdd(pThis, pUsr));
- qqueueChkPersist(pThis);
+ qqueueChkPersist(pThis, 1);
finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {