diff options
-rw-r--r-- | runtime/batch.h | 6 | ||||
-rw-r--r-- | runtime/queue.c | 22 | ||||
-rw-r--r-- | runtime/queue.h | 2 |
3 files changed, 19 insertions, 11 deletions
diff --git a/runtime/batch.h b/runtime/batch.h index eb266b3f..031718a7 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -55,9 +55,15 @@ struct batch_obj_s { * object. We stick to the more generic term because queues may potentially hold * other types of objects, too. * rgerhards, 2009-05-12 + * Note that nElem is not necessarily equal to nElemDeq. This is the case when we + * discard some elements (because of configuration) during dequeue processing. As + * all Elements are only deleted when the batch is processed, we can not immediately + * delete them. So we need to keep their number that we can delete them when the batch + * is completed (else, the whole process does not work correctly). */ struct batch_s { int nElem; /* actual number of element in this entry */ + int nElemDeq; /* actual number of elements dequeued (and thus to be deleted) - see comment above! */ int iDoneUpTo; /* all messages below this index have state other than RDY */ qDeqID deqID; /* ID of dequeue operation that generated this batch */ batch_obj_t *pElem; /* batch elements */ diff --git a/runtime/queue.c b/runtime/queue.c index 0f87b235..672ba9f5 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -131,7 +131,7 @@ static inline rsRetVal tdlPop(qqueue_t *pQueue) * structure, populates it with the values provided and links the new * element into the correct place inside the list. */ -static inline rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElem) +static inline rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElemDeq) { toDeleteLst_t *pNew; toDeleteLst_t *pPrev; @@ -142,7 +142,7 @@ static inline rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElem) CHKmalloc(pNew = malloc(sizeof(toDeleteLst_t))); pNew->deqID = deqID; - pNew->nElem = nElem; + pNew->nElemDeq = nElemDeq; /* now find right spot */ for( pPrev = pQueue->toDeleteLst @@ -1483,19 +1483,19 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) pTdl = tdlPeek(pThis); if(pTdl == NULL) { - DoDeleteBatchFromQStore(pThis, pBatch->nElem); + DoDeleteBatchFromQStore(pThis, pBatch->nElemDeq); } else if(pBatch->deqID == pThis->deqIDDel) { deqIDDel = pThis->deqIDDel; pTdl = tdlPeek(pThis); while(pTdl != NULL && deqIDDel == pTdl->deqID) { - DoDeleteBatchFromQStore(pThis, pTdl->nElem); + DoDeleteBatchFromQStore(pThis, pTdl->nElemDeq); tdlPop(pThis); ++deqIDDel; pTdl = tdlPeek(pThis); } } else { /* can not delete, insert into to-delete list */ - CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem)); + CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElemDeq)); } finalize_it: @@ -1547,6 +1547,7 @@ static inline rsRetVal DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSize) { int nDequeued; + int nDiscarded; int iQueueSize; void *pUsr; rsRetVal localRet; @@ -1554,7 +1555,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz DeleteProcessedBatch(pThis, &pWti->batch); - nDequeued = 0; + nDequeued = nDiscarded = 0; do { dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDeq(pThis, &pUsr)); @@ -1562,12 +1563,12 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); /* check if we should discard this element */ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr); - //MULTI-DEQUEUE / ULTRA-RELIABLE: we need to handle this case, we need to advance the - // DEQ pointer (or so...) TODO!!! Idea: get a second nElem int in pBatch, nDequeued. Use that when deleting! - if(localRet == RS_RET_QUEUE_FULL) + if(localRet == RS_RET_QUEUE_FULL) { + ++nDiscarded; continue; - else if(localRet != RS_RET_OK) + } else if(localRet != RS_RET_OK) { ABORT_FINALIZE(localRet); + } /* all well, use this element */ pWti->batch.pElem[nDequeued].pUsrp = pUsr; @@ -1578,6 +1579,7 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */ pWti->batch.nElem = nDequeued; + pWti->batch.nElemDeq = nDequeued + nDiscarded; pWti->batch.deqID = getNextDeqID(pThis); *piRemainingQueueSize = iQueueSize; diff --git a/runtime/queue.h b/runtime/queue.h index 92bf8ae5..954a7fd4 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -34,7 +34,7 @@ typedef struct toDeleteLst_s toDeleteLst_t; struct toDeleteLst_s { qDeqID deqID; - int nElem; + int nElemDeq; /* numbe of elements that were dequeued and as such must now be discarded */ struct toDeleteLst_s *pNext; }; |