diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-06-25 15:21:29 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-06-25 15:21:29 +0200 |
commit | 4818b0081d3a265a87f9f646d79f2a2ffbcda819 (patch) | |
tree | a9edb0a994fec151300e9c11faf71356ddb7e3c7 | |
parent | d116f30a877c65b4f23dbb92601251402b0f957e (diff) | |
download | rsyslog-4818b0081d3a265a87f9f646d79f2a2ffbcda819.tar.gz rsyslog-4818b0081d3a265a87f9f646d79f2a2ffbcda819.tar.xz rsyslog-4818b0081d3a265a87f9f646d79f2a2ffbcda819.zip |
bugfix: subtle synchronization issue
This may have caused a segfault under strange circumstances (but if
we just run long enough with a high enough message volume, even the
strangest circumstances will occur...)
-rw-r--r-- | runtime/atomic.h | 3 | ||||
-rw-r--r-- | runtime/msg.c | 8 | ||||
-rw-r--r-- | runtime/queue.c | 13 |
3 files changed, 12 insertions, 12 deletions
diff --git a/runtime/atomic.h b/runtime/atomic.h index f5ae9357..f0733f09 100644 --- a/runtime/atomic.h +++ b/runtime/atomic.h @@ -41,7 +41,10 @@ * They simply came in too late. -- rgerhards, 2008-04-02 */ #ifdef HAVE_ATOMIC_BUILTINS +# define ATOMIC_SUB(data, val) __sync_fetch_and_sub(&(data), val) +# define ATOMIC_ADD(data, val) __sync_fetch_and_add(&(data), val) # define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1)) +# define ATOMIC_INC_AND_FETCH(data) __sync_fetch_and_add(&(data), 1) # define ATOMIC_DEC(data) ((void) __sync_sub_and_fetch(&(data), 1)) # define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1) # define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff)) diff --git a/runtime/msg.c b/runtime/msg.c index 8aab5317..a1a4714f 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -607,10 +607,12 @@ CODESTARTobjDestruct(msg) * operations on the counter. --- rgerhards, 2009-06-22. */ # if HAVE_MALLOC_TRIM - { /* standard C requires a new block for a new variable definition! */ + { /* standard C requires a new block for a new variable definition! + * To simplify matters, we use modulo arithmetic and live with the fact + * that we trim too often when the counter wraps. + */ static unsigned iTrimCtr = 1; - if(iTrimCtr ++ % 100000 == 0) { - iTrimCtr = 1; + if(ATOMIC_INC_AND_FETCH(iTrimCtr) % 100000 == 0) { malloc_trim(128*1024); } } diff --git a/runtime/queue.c b/runtime/queue.c index 5102b0df..7438fbaa 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -212,7 +212,7 @@ static inline void queueDrain(qqueue_t *pThis) BEGINfunc dbgoprint((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ - while(pThis->iQueueSize-- > 0) { + while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) { pThis->qDeq(pThis, &pUsr); if(pUsr != NULL) { objDestruct(pUsr); @@ -1547,15 +1547,14 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) ISOBJ_TYPE_assert(pThis, qqueue); -//dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem); /* now send delete request to storage driver */ for(i = 0 ; i < nElem ; ++i) { pThis->qDel(pThis); } /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ - pThis->iQueueSize -= nElem; - pThis->nLogDeq -= nElem; + ATOMIC_SUB(pThis->iQueueSize, nElem); + ATOMIC_SUB(pThis->nLogDeq, nElem); dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ @@ -1649,13 +1648,9 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDeleted = pWti->batch.nElemDeq; DeleteProcessedBatch(pThis, &pWti->batch); -//int iii = pthread_mutex_trylock(pThis->mut); -//char errStr[1024]; -//rs_strerror_r(iii, errStr, sizeof(errStr)); -//dbgprintf("DequeueConsumableElemnts mutex locked: %d (16 is EBUSY = OK): %s\n", iii, errStr); nDequeued = nDiscarded = 0; while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { -//dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); +dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDeq(pThis, &pUsr)); /* check if we should discard this element */ |