summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-04-27 17:31:28 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-04-27 17:31:28 +0200
commitcbe2e3d44496ec7c6418e7e74ce917f2086a2947 (patch)
treec8a38b598ad1ea3bdb375674f27386d5fbc361e8
parentd19806431653e6575a002ab48206c16d3041e465 (diff)
downloadrsyslog-cbe2e3d44496ec7c6418e7e74ce917f2086a2947.tar.gz
rsyslog-cbe2e3d44496ec7c6418e7e74ce917f2086a2947.tar.xz
rsyslog-cbe2e3d44496ec7c6418e7e74ce917f2086a2947.zip
bugfix: problems with atomic operations emulation
replaced atomic operation emulation with new code. The previous code seemed to have some issue and also limited concurrency severely. The whole atomic operation emulation has been rewritten.
-rw-r--r--ChangeLog4
-rw-r--r--runtime/Makefile.am1
-rw-r--r--runtime/atomic-posix-sem.c70
-rw-r--r--runtime/atomic.h200
-rw-r--r--runtime/debug.c2
-rw-r--r--runtime/msg.c13
-rw-r--r--runtime/msg.h1
-rw-r--r--runtime/prop.c6
-rw-r--r--runtime/prop.h2
-rw-r--r--runtime/queue.c10
-rw-r--r--runtime/queue.h1
-rw-r--r--runtime/rsyslog.c12
-rw-r--r--runtime/wti.c11
-rw-r--r--runtime/wti.h1
-rw-r--r--runtime/wtp.c18
-rw-r--r--runtime/wtp.h3
16 files changed, 129 insertions, 226 deletions
diff --git a/ChangeLog b/ChangeLog
index 80779457..f7b838d8 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,9 @@
---------------------------------------------------------------------------
Version 4.7.2 [v4-devel] (rgerhards), 2010-04-??
+- bugfix: problems with atomic operations emulaton
+ replaced atomic operation emulation with new code. The previous code
+ seemed to have some issue and also limited concurrency severely. The
+ whole atomic operation emulation has been rewritten.
- added new $Sleep directive to hold processing for a couple of seconds
during startup
---------------------------------------------------------------------------
diff --git a/runtime/Makefile.am b/runtime/Makefile.am
index ac006bca..c1a15198 100644
--- a/runtime/Makefile.am
+++ b/runtime/Makefile.am
@@ -9,7 +9,6 @@ librsyslog_la_SOURCES = \
rsyslog.h \
unicode-helper.h \
atomic.h \
- atomic-posix-sem.c \
syslogd-types.h \
module-template.h \
obj-types.h \
diff --git a/runtime/atomic-posix-sem.c b/runtime/atomic-posix-sem.c
deleted file mode 100644
index 979fae02..00000000
--- a/runtime/atomic-posix-sem.c
+++ /dev/null
@@ -1,70 +0,0 @@
-/* atomic_posix_sem.c: This file supplies an emulation for atomic operations using
- * POSIX semaphores.
- *
- * Copyright 2010 DResearch Digital Media Systems GmbH
- *
- * This file is part of the rsyslog runtime library.
- *
- * The rsyslog runtime library is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * The rsyslog runtime library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
- *
- * A copy of the GPL can be found in the file "COPYING" in this distribution.
- * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
- */
-
-#include "config.h"
-#ifndef HAVE_ATOMIC_BUILTINS
-#ifdef HAVE_SEMAPHORE_H
-#include <semaphore.h>
-#include <errno.h>
-
-#include "atomic.h"
-#include "rsyslog.h"
-#include "srUtils.h"
-
-sem_t atomicSem;
-
-rsRetVal
-atomicSemInit(void)
-{
- DEFiRet;
-
- dbgprintf("init posix semaphore for atomics emulation\n");
- if(sem_init(&atomicSem, 0, 1) == -1)
- {
- char errStr[1024];
- rs_strerror_r(errno, errStr, sizeof(errStr));
- dbgprintf("init posix semaphore for atomics emulation failed: %s\n", errStr);
- iRet = RS_RET_SYS_ERR; /* the right error code ??? */
- }
-
- RETiRet;
-}
-
-void
-atomicSemExit(void)
-{
- dbgprintf("destroy posix semaphore for atomics emulation\n");
- if(sem_destroy(&atomicSem) == -1)
- {
- char errStr[1024];
- rs_strerror_r(errno, errStr, sizeof(errStr));
- dbgprintf("destroy posix semaphore for atomics emulation failed: %s\n", errStr);
- }
-}
-
-#endif /* HAVE_SEMAPHORE_H */
-#endif /* !defined(HAVE_ATOMIC_BUILTINS) */
-
-/* vim:set ai:
- */
diff --git a/runtime/atomic.h b/runtime/atomic.h
index 271e825e..fc3e0b2d 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -39,134 +39,29 @@
* They simply came in too late. -- rgerhards, 2008-04-02
*/
#ifdef HAVE_ATOMIC_BUILTINS
-# define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1))
+# define ATOMIC_INC(data, phlpmut) ((void) __sync_fetch_and_add(data, 1))
# define ATOMIC_INC_AND_FETCH(data) __sync_fetch_and_add(&(data), 1)
-# define ATOMIC_DEC(data) ((void) __sync_sub_and_fetch(&(data), 1))
-# define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1)
+# define ATOMIC_DEC(data, phlpmut) ((void) __sync_sub_and_fetch(data, 1))
+# define ATOMIC_DEC_AND_FETCH(data, phlpmut) __sync_sub_and_fetch(data, 1)
# define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff))
# define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1)
-# define ATOMIC_STORE_0_TO_INT(data) __sync_fetch_and_and(&(data), 0)
-# define ATOMIC_STORE_1_TO_INT(data) __sync_fetch_and_or(&(data), 1)
+# define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0)
+# define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1)
# define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val))
# define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal));
-# define ATOMIC_CAS_VAL(data, oldVal, newVal) __sync_val_compare_and_swap(&(data), (oldVal), (newVal));
-#else
-#ifdef HAVE_SEMAPHORE_H
- /* we use POSIX semaphores instead */
-
-#include "rsyslog.h"
-#include <semaphore.h>
-
-extern sem_t atomicSem;
-rsRetVal atomicSemInit(void);
-void atomicSemExit(void);
-
-#if HAVE_TYPEOF
-#define my_typeof(x) typeof(x)
-#else /* sorry, can't determine types, using 'int' */
-#define my_typeof(x) int
-#endif
-
-# define ATOMIC_SUB(data, val) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data -= val; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_ADD(data, val) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data += val; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_INC_AND_FETCH(data) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data += 1; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_INC(data) ((void) ATOMIC_INC_AND_FETCH(data))
-
-# define ATOMIC_DEC_AND_FETCH(data) \
-({ \
- sem_wait(&atomicSem); \
- data -= 1; \
- sem_post(&atomicSem); \
- data; \
-})
+# define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal));
-# define ATOMIC_DEC(data) ((void) ATOMIC_DEC_AND_FETCH(data))
+ /* functions below are not needed if we have atomics */
+# define DEF_ATOMIC_HELPER_MUT(x)
+# define INIT_ATOMIC_HELPER_MUT(x)
+# define DESTROY_ATOMIC_HELPER_MUT(x)
-# define ATOMIC_FETCH_32BIT(data) ((unsigned) ATOMIC_ADD((data), 0xffffffff))
-
-# define ATOMIC_STORE_1_TO_32BIT(data) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data = 1; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_STORE_0_TO_INT(data) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data = 0; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_STORE_1_TO_INT(data) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data = 1; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_CAS(data, oldVal, newVal) \
-({ \
- int ret; \
- sem_wait(&atomicSem); \
- if(data != oldVal) ret = 0; \
- else \
- { \
- data = newVal; \
- ret = 1; \
- } \
- sem_post(&atomicSem); \
- ret; \
-})
-
-# define ATOMIC_CAS_VAL(data, oldVal, newVal) \
-({ \
- sem_wait(&atomicSem); \
- if(data == oldVal) \
- { \
- data = newVal; \
- } \
- sem_post(&atomicSem); \
- data; \
-})
-
-#else /* not HAVE_SEMAPHORE_H */
+ /* the following operations should preferrably be done atomic, but it is
+ * not fatal if not -- that means we can live with some missed updates. So be
+ * sure to use these macros only if that really does not matter!
+ */
+# define PREFER_ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1))
+#else
/* note that we gained parctical proof that theoretical problems DO occur
* if we do not properly address them. See this blog post for details:
* http://blog.gerhards.net/2009/01/rsyslog-data-race-analysis.html
@@ -174,16 +69,63 @@ void atomicSemExit(void);
* simply go ahead and do without them - use mutexes or other things. The
* code needs to be checked against all those cases. -- rgerhards, 2009-01-30
*/
+ #include <pthread.h>
+# define ATOMIC_INC(data, phlpmut) { \
+ pthread_mutex_lock(phlpmut); \
+ ++(*(data)); \
+ pthread_mutex_unlock(phlpmut); \
+ }
+
+# define ATOMIC_STORE_0_TO_INT(data, hlpmut) { \
+ pthread_mutex_lock(&hlpmut); \
+ *(data) = 0; \
+ pthread_mutex_unlock(&hlpmut); \
+ }
+
+# define ATOMIC_STORE_1_TO_INT(data, hlpmut) { \
+ pthread_mutex_lock(&hlpmut); \
+ *(data) = 1; \
+ pthread_mutex_unlock(&hlpmut); \
+ }
+
+ static inline int
+ ATOMIC_CAS_VAL(int *data, int oldVal, int newVal, pthread_mutex_t *phlpmut) {
+ int val;
+ pthread_mutex_lock(phlpmut);
+ if(*data == oldVal) {
+ *data = newVal;
+ }
+ val = *data;
+ pthread_mutex_unlock(phlpmut);
+ return(val);
+ }
+
+# define ATOMIC_DEC(data, phlpmut) { \
+ pthread_mutex_lock(phlpmut); \
+ --(*(data)); \
+ pthread_mutex_unlock(phlpmut); \
+ }
+
+ static inline int
+ ATOMIC_DEC_AND_FETCH(int *data, pthread_mutex_t *phlpmut) {
+ int val;
+ pthread_mutex_lock(phlpmut);
+ val = --(*data);
+ pthread_mutex_unlock(phlpmut);
+ return(val);
+ }
+#if 0
# warning "atomic builtins not available, using nul operations - rsyslogd will probably be racy!"
-# define ATOMIC_INC(data) (++(data))
-# define ATOMIC_DEC(data) (--(data))
-# define ATOMIC_DEC_AND_FETCH(data) (--(data))
-# define ATOMIC_FETCH_32BIT(data) (data)
-# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1
-# define ATOMIC_STORE_1_TO_INT(data) (data) = 1
-# define ATOMIC_STORE_0_TO_INT(data) (data) = 0
-# define ATOMIC_CAS_VAL(data, oldVal, newVal) (data) = (newVal)
+# define ATOMIC_INC_AND_FETCH(data) (++(data))
+# define ATOMIC_FETCH_32BIT(data) (data) // TODO: del
+# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 // TODO: del
#endif
+# define DEF_ATOMIC_HELPER_MUT(x) pthread_mutex_t x
+# define INIT_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL)
+# define DESTROY_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL)
+
+# define PREFER_ATOMIC_INC(data) ((void) ++data)
+
#endif
#endif /* #ifndef INCLUDED_ATOMIC_H */
diff --git a/runtime/debug.c b/runtime/debug.c
index da471609..0ada909b 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -1073,7 +1073,7 @@ int dbgEntrFunc(dbgFuncDB_t **ppFuncDB, const char *file, const char *func, int
}
/* when we reach this point, we have a fully-initialized FuncDB! */
- ATOMIC_INC(pFuncDB->nTimesCalled);
+ PREFER_ATOMIC_INC(pFuncDB->nTimesCalled);
if(bLogFuncFlow && dbgPrintNameIsInList((const uchar*)pFuncDB->file, printNameFileRoot))
dbgprintf("%s:%d: %s: enter\n", pFuncDB->file, pFuncDB->line, pFuncDB->func);
if(pThrd->stackPtr >= (int) (sizeof(pThrd->callStack) / sizeof(dbgFuncDB_t*))) {
diff --git a/runtime/msg.c b/runtime/msg.c
index 6d7e6a89..57291def 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -748,7 +748,7 @@ BEGINobjDestruct(msg) /* be sure to specify the object type also in END and CODE
CODESTARTobjDestruct(msg)
/* DEV Debugging only ! dbgprintf("msgDestruct\t0x%lx, Ref now: %d\n", (unsigned long)pThis, pThis->iRefCount - 1); */
# ifdef HAVE_ATOMIC_BUILTINS
- currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount);
+ currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, NULL);
# else
MsgLock(pThis);
currRefCount = --pThis->iRefCount;
@@ -800,9 +800,18 @@ CODESTARTobjDestruct(msg)
* that we trim too often when the counter wraps.
*/
static unsigned iTrimCtr = 1;
+# ifdef HAVE_ATOMICS
if(ATOMIC_INC_AND_FETCH(iTrimCtr) % 100000 == 0) {
malloc_trim(128*1024);
}
+# else
+static pthread_mutex_t mutTrimCtr = PTHREAD_MUTEX_INITIALIZER;
+ d_pthread_mutex_lock(&mutTrimCtr);
+ if(iTrimCtr++ % 100000 == 0) {
+ malloc_trim(128*1024);
+ }
+ d_pthread_mutex_unlock(&mutTrimCtr);
+# endif
}
# endif
} else {
@@ -1002,7 +1011,7 @@ msg_t *MsgAddRef(msg_t *pM)
{
assert(pM != NULL);
# ifdef HAVE_ATOMIC_BUILTINS
- ATOMIC_INC(pM->iRefCount);
+ ATOMIC_INC(&pM->iRefCount, NULL);
# else
MsgLock(pM);
pM->iRefCount++;
diff --git a/runtime/msg.h b/runtime/msg.h
index 0d3314b7..cda206fc 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -32,6 +32,7 @@
#include "obj.h"
#include "syslogd-types.h"
#include "template.h"
+#include "atomic.h"
/* rgerhards 2004-11-08: The following structure represents a
diff --git a/runtime/prop.c b/runtime/prop.c
index d188b2ed..7f2a56ff 100644
--- a/runtime/prop.c
+++ b/runtime/prop.c
@@ -53,6 +53,7 @@ DEFobjStaticHelpers
*/
BEGINobjConstruct(prop) /* be sure to specify the object type also in END macro! */
pThis->iRefCount = 1;
+ INIT_ATOMIC_HELPER_MUT(pThis->mutRefCount);
ENDobjConstruct(prop)
@@ -60,11 +61,12 @@ ENDobjConstruct(prop)
BEGINobjDestruct(prop) /* be sure to specify the object type also in END and CODESTART macros! */
int currRefCount;
CODESTARTobjDestruct(prop)
- currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount);
+ currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, &pThis->mutRefCount);
if(currRefCount == 0) {
/* (only) in this case we need to actually destruct the object */
if(pThis->len >= CONF_PROP_BUFSIZE)
free(pThis->szVal.psz);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutRefCount);
} else {
pThis = NULL; /* tell framework NOT to destructing the object! */
}
@@ -132,7 +134,7 @@ propConstructFinalize(prop_t __attribute__((unused)) *pThis)
*/
static rsRetVal AddRef(prop_t *pThis)
{
- ATOMIC_INC(pThis->iRefCount);
+ ATOMIC_INC(&pThis->iRefCount, &pThis->mutRefCount);
return RS_RET_OK;
}
diff --git a/runtime/prop.h b/runtime/prop.h
index e3519664..07b2ab7e 100644
--- a/runtime/prop.h
+++ b/runtime/prop.h
@@ -24,6 +24,7 @@
*/
#ifndef INCLUDED_PROP_H
#define INCLUDED_PROP_H
+#include "atomic.h"
/* the prop object */
struct prop_s {
@@ -34,6 +35,7 @@ struct prop_s {
uchar sz[CONF_PROP_BUFSIZE];
} szVal;
int len; /* we use int intentionally, otherwise we may get some troubles... */
+ DEF_ATOMIC_HELPER_MUT(mutRefCount);
};
/* interfaces */
diff --git a/runtime/queue.c b/runtime/queue.c
index 9d7a9058..bedefb77 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -110,7 +110,7 @@ static inline void queueDrain(qqueue_t *pThis)
ASSERT(pThis != NULL);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) {
+ while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) {
pThis->qDel(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
@@ -1028,7 +1028,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr)
CHKiRet(pThis->qAdd(pThis, pUsr));
if(pThis->qType != QUEUETYPE_DIRECT) {
- ATOMIC_INC(pThis->iQueueSize);
+ ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize);
}
@@ -1057,7 +1057,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
- ATOMIC_DEC(pThis->iQueueSize);
+ ATOMIC_DEC(&pThis->iQueueSize, &pThis->mutQueueSize);
}
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
@@ -1345,6 +1345,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
break;
}
+ INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+
finalize_it:
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
RETiRet;
@@ -2065,6 +2067,8 @@ CODESTARTobjDestruct(qqueue)
pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
diff --git a/runtime/queue.h b/runtime/queue.h
index 1d82d8d9..aafdaa45 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -160,6 +160,7 @@ typedef struct queue_s {
strm_t *pRead; /* current file to be read */
} disk;
} tVars;
+ DEF_ATOMIC_HELPER_MUT(mutQueueSize);
} qqueue_t;
/* some symbolic constants for easier reference */
diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c
index 5750ca76..c209ae30 100644
--- a/runtime/rsyslog.c
+++ b/runtime/rsyslog.c
@@ -140,12 +140,6 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF)
CHKiRet(objClassInit(NULL)); /* *THIS* *MUST* always be the first class initilizer being called! */
CHKiRet(objGetObjInterface(pObjIF)); /* this provides the root pointer for all other queries */
-#ifndef HAVE_ATOMIC_BUILTINS
-#ifdef HAVE_SEMAPHORE_H
- CHKiRet(atomicSemInit());
-#endif /* HAVE_SEMAPHORE_H */
-#endif /* !defined(HAVE_ATOMIC_BUILTINS) */
-
/* initialize core classes. We must be very careful with the order of events. Some
* classes use others and if we do not initialize them in the right order, we may end
* up with an invalid call. The most important thing that can happen is that an error
@@ -223,12 +217,6 @@ rsrtExit(void)
rulesetClassExit();
ruleClassExit();
-#ifndef HAVE_ATOMIC_BUILTINS
-#ifdef HAVE_SEMAPHORE_H
- atomicSemExit();
-#endif /* HAVE_SEMAPHORE_H */
-#endif /* !defined(HAVE_ATOMIC_BUILTINS) */
-
objClassExit(); /* *THIS* *MUST/SHOULD?* always be the first class initilizer being called (except debug)! */
}
diff --git a/runtime/wti.c b/runtime/wti.c
index abdf4add..90bb14ed 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -146,7 +146,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
break;
}
/* apply the new state */
- unsigned val = ATOMIC_CAS_VAL(pThis->tCurrCmd, tCurrCmd, tCmd);
+ unsigned val = ATOMIC_CAS_VAL((int*)&pThis->tCurrCmd, tCurrCmd, tCmd, &pThis->mutCurrCmd);
if(val != tCurrCmd) {
DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd);
}
@@ -178,7 +178,7 @@ wtiCancelThrd(wti_t *pThis)
dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd);
pthread_cancel(pThis->thrdID);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- ATOMIC_STORE_1_TO_INT(pThis->pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
+ wtpSetThrdStateChanged(pThis->pWtp, 1); /* indicate change, so harverster will be called */
}
d_pthread_mutex_unlock(&pThis->mut);
@@ -209,6 +209,7 @@ CODESTARTobjDestruct(wti)
/* actual destruction */
pthread_cond_destroy(&pThis->condExitDone);
pthread_mutex_destroy(&pThis->mut);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurrCmd);
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -219,6 +220,7 @@ ENDobjDestruct(wti)
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
pthread_cond_init(&pThis->condExitDone, NULL);
pthread_mutex_init(&pThis->mut, NULL);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutCurrCmd);
ENDobjConstruct(wti)
@@ -326,8 +328,7 @@ wtiWorkerCancelCleanup(void *arg)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pWtp->mut);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
- ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
+ wtpSetThrdStateChanged(pWtp, 1); /* indicate change, so harverster will be called */
d_pthread_mutex_unlock(&pWtp->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -414,7 +415,7 @@ wtiWorker(wti_t *pThis)
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
+ wtpSetThrdStateChanged(pWtp, 1); /* indicate change, so harverster will be called */
d_pthread_mutex_unlock(&pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
diff --git a/runtime/wti.h b/runtime/wti.h
index 72653b15..d81672f3 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -39,6 +39,7 @@ typedef struct wti_s {
pthread_mutex_t mut;
bool bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */
uchar *pszDbgHdr; /* header string for debug messages */
+ DEF_ATOMIC_HELPER_MUT(mutCurrCmd);
} wti_t;
/* some symbolic constants for easier reference */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 271c6f0d..fff37c2f 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -97,6 +97,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pThis->pfOnWorkerCancel = NotImplementedDummy;
pThis->pfOnWorkerStartup = NotImplementedDummy;
pThis->pfOnWorkerShutdown = NotImplementedDummy;
+ INIT_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged);
ENDobjConstruct(wtp)
@@ -153,6 +154,7 @@ CODESTARTobjDestruct(wtp)
pthread_cond_destroy(&pThis->condThrdTrm);
pthread_mutex_destroy(&pThis->mut);
pthread_mutex_destroy(&pThis->mutThrdShutdwn);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged);
free(pThis->pszDbgHdr);
ENDobjDestruct(wtp)
@@ -186,6 +188,20 @@ wtpWakeupAllWrkr(wtp_t *pThis)
}
+/* set the bThrdStateChanged in an atomic way. Note that
+ * val may only be 0 or 1.
+ */
+void
+wtpSetThrdStateChanged(wtp_t *pThis, int val)
+{
+ if(val == 0) {
+ ATOMIC_STORE_0_TO_INT(&pThis->bThrdStateChanged, pThis->mutThrdStateChanged);
+ } else {
+ ATOMIC_STORE_1_TO_INT(&pThis->bThrdStateChanged, pThis->mutThrdStateChanged);
+ }
+}
+
+
/* check if we had any worker thread changes and, if so, act
* on them. At a minimum, terminated threads are harvested (joined).
* This function MUST NEVER block on the queue mutex!
@@ -216,7 +232,7 @@ wtpProcessThrdChanges(wtp_t *pThis)
*/
do {
/* reset the change marker */
- ATOMIC_STORE_0_TO_INT(pThis->bThrdStateChanged);
+ wtpSetThrdStateChanged(pThis, 0);
/* go through all threads */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX);
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 1ce171cc..640c3320 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -26,6 +26,7 @@
#include <pthread.h>
#include "obj.h"
+#include "atomic.h"
/* commands and states for worker threads. */
typedef enum {
@@ -79,6 +80,7 @@ typedef struct wtp_s {
rsRetVal (*pfOnWorkerShutdown)(void *pUsr);
/* end user objects */
uchar *pszDbgHdr; /* header string for debug messages */
+ DEF_ATOMIC_HELPER_MUT(mutThrdStateChanged);
} wtp_t;
/* some symbolic constants for easier reference */
@@ -100,6 +102,7 @@ rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg);
rsRetVal wtpSignalWrkrTermination(wtp_t *pWtp);
rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout);
int wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex);
+void wtpSetThrdStateChanged(wtp_t *pThis, int val);
PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));