diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 40 |
1 files changed, 37 insertions, 3 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index c3a8e9d4..6bea338a 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1390,13 +1390,47 @@ finalize_it: } +/* Delete a batch of processed user objects from the queue, which includes + * destructing the objects themself. The pointer piRemainingQueu + * rgerhards, 2009-05-13 + */ +static inline rsRetVal +DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) +{ + int i; + void *pUsr; + DEFiRet; + + /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */ + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pBatch != NULL); + + /* if the queue runs in DA mode, the DA worker already deleted the message. But + * in regular mode, we need to do it ourselfs. We differentiate between the two cases, + * because it is actually the easiest way to handle the destruct-Problem in a simple + * and pUsrp-Type agnostic way (else we would need an objAddRef() generic function). + */ + if(!pThis->bRunsDA) { + for(i = 0 ; i < pBatch->nElem ; ++i) { + /* TODO: pull msgs off the queue (not yet necessary) */ + pUsr = pBatch->pElem[i].pUsrp; + objDestruct(pUsr); + } + } + pBatch->nElem = 0; + + RETiRet; +} + + /* dequeue as many user points as are available, until we hit the configured * upper limit of pointers. * This must only be called when the queue mutex is LOOKED, otherwise serious * malfunction will happen. */ static inline rsRetVal -DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize) +DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSize) { int nDequeued; int iQueueSize; @@ -1405,6 +1439,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize DEFiRet; /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */ + DeleteProcessedBatch(pThis, &pWti->batch); nDequeued = 0; do { @@ -1427,9 +1462,8 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */ - //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ pWti->batch.nElem = nDequeued; - *iRemainingQueueSize = iQueueSize; + *piRemainingQueueSize = iQueueSize; finalize_it: RETiRet; |