diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-06-25 15:21:29 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-06-25 15:21:29 +0200 |
commit | 4818b0081d3a265a87f9f646d79f2a2ffbcda819 (patch) | |
tree | a9edb0a994fec151300e9c11faf71356ddb7e3c7 /runtime/queue.c | |
parent | d116f30a877c65b4f23dbb92601251402b0f957e (diff) | |
download | rsyslog-4818b0081d3a265a87f9f646d79f2a2ffbcda819.tar.gz rsyslog-4818b0081d3a265a87f9f646d79f2a2ffbcda819.tar.xz rsyslog-4818b0081d3a265a87f9f646d79f2a2ffbcda819.zip |
bugfix: subtle synchronization issue
This may have caused a segfault under strange circumstances (but if
we just run long enough with a high enough message volume, even the
strangest circumstances will occur...)
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 13 |
1 files changed, 4 insertions, 9 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 5102b0df..7438fbaa 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -212,7 +212,7 @@ static inline void queueDrain(qqueue_t *pThis) BEGINfunc dbgoprint((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ - while(pThis->iQueueSize-- > 0) { + while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) { pThis->qDeq(pThis, &pUsr); if(pUsr != NULL) { objDestruct(pUsr); @@ -1547,15 +1547,14 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) ISOBJ_TYPE_assert(pThis, qqueue); -//dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem); /* now send delete request to storage driver */ for(i = 0 ; i < nElem ; ++i) { pThis->qDel(pThis); } /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ - pThis->iQueueSize -= nElem; - pThis->nLogDeq -= nElem; + ATOMIC_SUB(pThis->iQueueSize, nElem); + ATOMIC_SUB(pThis->nLogDeq, nElem); dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ @@ -1649,13 +1648,9 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDeleted = pWti->batch.nElemDeq; DeleteProcessedBatch(pThis, &pWti->batch); -//int iii = pthread_mutex_trylock(pThis->mut); -//char errStr[1024]; -//rs_strerror_r(iii, errStr, sizeof(errStr)); -//dbgprintf("DequeueConsumableElemnts mutex locked: %d (16 is EBUSY = OK): %s\n", iii, errStr); nDequeued = nDiscarded = 0; while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { -//dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); +dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDeq(pThis, &pUsr)); /* check if we should discard this element */ |