diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index bf2164a6..4f0d36b9 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -215,7 +215,7 @@ static inline void queueDrain(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) { - pThis->qDel(pThis, &pUsr); + pThis->qDeq(pThis, &pUsr); if(pUsr != NULL) { objDestruct(pUsr); } @@ -884,7 +884,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr) CHKiRet(pThis->qAdd(pThis, pUsr)); if(pThis->qType != QUEUETYPE_DIRECT) { - ATOMIC_INC(pThis->iQueueSize); + ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); DBGOPRINT((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); } @@ -909,7 +909,7 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr) * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ iRet = pThis->qDeq(pThis, ppUsr); - ATOMIC_INC(pThis->nLogDeq); + ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq); // DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", // getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); @@ -1227,6 +1227,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread } INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq); finalize_it: OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP @@ -1290,8 +1291,8 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) } /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ - ATOMIC_SUB(pThis->iQueueSize, nElem); - ATOMIC_SUB(pThis->nLogDeq, nElem); + ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize); + ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq); dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ @@ -2066,6 +2067,7 @@ CODESTARTobjDestruct(qqueue) pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutLogDeq); /* type-specific destructor */ iRet = pThis->qDestruct(pThis); |