summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-06-25 15:21:29 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-06-25 15:21:29 +0200
commit4818b0081d3a265a87f9f646d79f2a2ffbcda819 (patch)
treea9edb0a994fec151300e9c11faf71356ddb7e3c7
parentd116f30a877c65b4f23dbb92601251402b0f957e (diff)
downloadrsyslog-4818b0081d3a265a87f9f646d79f2a2ffbcda819.tar.gz
rsyslog-4818b0081d3a265a87f9f646d79f2a2ffbcda819.tar.xz
rsyslog-4818b0081d3a265a87f9f646d79f2a2ffbcda819.zip
bugfix: subtle synchronization issue
This may have caused a segfault under strange circumstances (but if we just run long enough with a high enough message volume, even the strangest circumstances will occur...)
-rw-r--r--runtime/atomic.h3
-rw-r--r--runtime/msg.c8
-rw-r--r--runtime/queue.c13
3 files changed, 12 insertions, 12 deletions
diff --git a/runtime/atomic.h b/runtime/atomic.h
index f5ae9357..f0733f09 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -41,7 +41,10 @@
* They simply came in too late. -- rgerhards, 2008-04-02
*/
#ifdef HAVE_ATOMIC_BUILTINS
+# define ATOMIC_SUB(data, val) __sync_fetch_and_sub(&(data), val)
+# define ATOMIC_ADD(data, val) __sync_fetch_and_add(&(data), val)
# define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1))
+# define ATOMIC_INC_AND_FETCH(data) __sync_fetch_and_add(&(data), 1)
# define ATOMIC_DEC(data) ((void) __sync_sub_and_fetch(&(data), 1))
# define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1)
# define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff))
diff --git a/runtime/msg.c b/runtime/msg.c
index 8aab5317..a1a4714f 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -607,10 +607,12 @@ CODESTARTobjDestruct(msg)
* operations on the counter. --- rgerhards, 2009-06-22.
*/
# if HAVE_MALLOC_TRIM
- { /* standard C requires a new block for a new variable definition! */
+ { /* standard C requires a new block for a new variable definition!
+ * To simplify matters, we use modulo arithmetic and live with the fact
+ * that we trim too often when the counter wraps.
+ */
static unsigned iTrimCtr = 1;
- if(iTrimCtr ++ % 100000 == 0) {
- iTrimCtr = 1;
+ if(ATOMIC_INC_AND_FETCH(iTrimCtr) % 100000 == 0) {
malloc_trim(128*1024);
}
}
diff --git a/runtime/queue.c b/runtime/queue.c
index 5102b0df..7438fbaa 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -212,7 +212,7 @@ static inline void queueDrain(qqueue_t *pThis)
BEGINfunc
dbgoprint((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- while(pThis->iQueueSize-- > 0) {
+ while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) {
pThis->qDeq(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
@@ -1547,15 +1547,14 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
ISOBJ_TYPE_assert(pThis, qqueue);
-//dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem);
/* now send delete request to storage driver */
for(i = 0 ; i < nElem ; ++i) {
pThis->qDel(pThis);
}
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- pThis->iQueueSize -= nElem;
- pThis->nLogDeq -= nElem;
+ ATOMIC_SUB(pThis->iQueueSize, nElem);
+ ATOMIC_SUB(pThis->nLogDeq, nElem);
dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
++pThis->deqIDDel; /* one more batch dequeued */
@@ -1649,13 +1648,9 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
nDeleted = pWti->batch.nElemDeq;
DeleteProcessedBatch(pThis, &pWti->batch);
-//int iii = pthread_mutex_trylock(pThis->mut);
-//char errStr[1024];
-//rs_strerror_r(iii, errStr, sizeof(errStr));
-//dbgprintf("DequeueConsumableElemnts mutex locked: %d (16 is EBUSY = OK): %s\n", iii, errStr);
nDequeued = nDiscarded = 0;
while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
-//dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
+dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
CHKiRet(qqueueDeq(pThis, &pUsr));
/* check if we should discard this element */