summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-04-27 18:26:09 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-04-27 18:26:09 +0200
commitdd76d96d676f305aa2d29131321fe5cac5a676c4 (patch)
treef71b499444c137ea1c9dfccc0dda9c1461e9040d /runtime/queue.c
parent4a5a3196fbe4e5a4e9f8dea49f916462adbf3098 (diff)
downloadrsyslog-dd76d96d676f305aa2d29131321fe5cac5a676c4.tar.gz
rsyslog-dd76d96d676f305aa2d29131321fe5cac5a676c4.tar.xz
rsyslog-dd76d96d676f305aa2d29131321fe5cac5a676c4.zip
adapted new atomic instruction emulation to v5 engine
code did not compile after merge from v4
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c12
1 files changed, 7 insertions, 5 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index bf2164a6..4f0d36b9 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -215,7 +215,7 @@ static inline void queueDrain(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) {
- pThis->qDel(pThis, &pUsr);
+ pThis->qDeq(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
}
@@ -884,7 +884,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr)
CHKiRet(pThis->qAdd(pThis, pUsr));
if(pThis->qType != QUEUETYPE_DIRECT) {
- ATOMIC_INC(pThis->iQueueSize);
+ ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
DBGOPRINT((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n",
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
}
@@ -909,7 +909,7 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr)
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
iRet = pThis->qDeq(pThis, ppUsr);
- ATOMIC_INC(pThis->nLogDeq);
+ ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq);
// DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
// getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
@@ -1227,6 +1227,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
}
INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
finalize_it:
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
@@ -1290,8 +1291,8 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
}
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- ATOMIC_SUB(pThis->iQueueSize, nElem);
- ATOMIC_SUB(pThis->nLogDeq, nElem);
+ ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize);
+ ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq);
dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
++pThis->deqIDDel; /* one more batch dequeued */
@@ -2066,6 +2067,7 @@ CODESTARTobjDestruct(qqueue)
pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);