summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c12
1 files changed, 7 insertions, 5 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index bf2164a6..4f0d36b9 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -215,7 +215,7 @@ static inline void queueDrain(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) {
- pThis->qDel(pThis, &pUsr);
+ pThis->qDeq(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
}
@@ -884,7 +884,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr)
CHKiRet(pThis->qAdd(pThis, pUsr));
if(pThis->qType != QUEUETYPE_DIRECT) {
- ATOMIC_INC(pThis->iQueueSize);
+ ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
DBGOPRINT((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n",
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
}
@@ -909,7 +909,7 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr)
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
iRet = pThis->qDeq(pThis, ppUsr);
- ATOMIC_INC(pThis->nLogDeq);
+ ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq);
// DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
// getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
@@ -1227,6 +1227,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
}
INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
finalize_it:
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
@@ -1290,8 +1291,8 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
}
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- ATOMIC_SUB(pThis->iQueueSize, nElem);
- ATOMIC_SUB(pThis->nLogDeq, nElem);
+ ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize);
+ ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq);
dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
++pThis->deqIDDel; /* one more batch dequeued */
@@ -2066,6 +2067,7 @@ CODESTARTobjDestruct(qqueue)
pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);