summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c13
1 files changed, 4 insertions, 9 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 5102b0df..7438fbaa 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -212,7 +212,7 @@ static inline void queueDrain(qqueue_t *pThis)
BEGINfunc
dbgoprint((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- while(pThis->iQueueSize-- > 0) {
+ while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) {
pThis->qDeq(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
@@ -1547,15 +1547,14 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
ISOBJ_TYPE_assert(pThis, qqueue);
-//dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem);
/* now send delete request to storage driver */
for(i = 0 ; i < nElem ; ++i) {
pThis->qDel(pThis);
}
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- pThis->iQueueSize -= nElem;
- pThis->nLogDeq -= nElem;
+ ATOMIC_SUB(pThis->iQueueSize, nElem);
+ ATOMIC_SUB(pThis->nLogDeq, nElem);
dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
++pThis->deqIDDel; /* one more batch dequeued */
@@ -1649,13 +1648,9 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
nDeleted = pWti->batch.nElemDeq;
DeleteProcessedBatch(pThis, &pWti->batch);
-//int iii = pthread_mutex_trylock(pThis->mut);
-//char errStr[1024];
-//rs_strerror_r(iii, errStr, sizeof(errStr));
-//dbgprintf("DequeueConsumableElemnts mutex locked: %d (16 is EBUSY = OK): %s\n", iii, errStr);
nDequeued = nDiscarded = 0;
while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
-//dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
+dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
CHKiRet(qqueueDeq(pThis, &pUsr));
/* check if we should discard this element */