summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog1
-rw-r--r--runtime/atomic.h5
-rw-r--r--runtime/wti.c26
-rw-r--r--runtime/wtp.c3
4 files changed, 25 insertions, 10 deletions
diff --git a/ChangeLog b/ChangeLog
index 6cbfcf70..e7de1c4d 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -23,6 +23,7 @@ Version 4.5.0 [DEVEL] (rgerhards), 2009-??-??
* $OMFileFlushOnTXEnd
* $MainMsgQueueSyncQueueFiles
* $ActionQueueSyncQueueFiles
+- done some memory accesses explicitely atomic
---------------------------------------------------------------------------
Version 4.3.2 [beta] (rgerhards), 2009-06-24
- removed long-obsoleted property UxTradMsg
diff --git a/runtime/atomic.h b/runtime/atomic.h
index fdf64214..f5ae9357 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -46,6 +46,11 @@
# define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1)
# define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff))
# define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1)
+# define ATOMIC_STORE_0_TO_INT(data) __sync_fetch_and_and(&(data), 0)
+# define ATOMIC_STORE_1_TO_INT(data) __sync_fetch_and_or(&(data), 1)
+# define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val))
+# define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal));
+# define ATOMIC_CAS_VAL(data, oldVal, newVal) __sync_val_compare_and_swap(&(data), (oldVal), (newVal));
#else
/* note that we gained parctical proof that theoretical problems DO occur
* if we do not properly address them. See this blog post for details:
diff --git a/runtime/wti.c b/runtime/wti.c
index f20812f8..9de7c365 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -51,6 +51,7 @@
#include "wti.h"
#include "obj.h"
#include "glbl.h"
+#include "atomic.h"
/* static data */
DEFobjStaticHelpers
@@ -106,6 +107,7 @@ rsRetVal
wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
{
DEFiRet;
+ qWrkCmd_t tCurrCmd;
DEFVARS_mutexProtection;
ISOBJ_TYPE_assert(pThis, wti);
@@ -113,13 +115,14 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
+ tCurrCmd = pThis->tCurrCmd;
/* all worker states must be followed sequentially, only termination can be set in any state */
- if( (bActiveOnly && (pThis->tCurrCmd < eWRKTHRD_RUN_CREATED))
- || (pThis->tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) {
- dbgprintf("%s: command %d can not be accepted in current %d processing state - ignored\n",
- wtiGetDbgHdr(pThis), tCmd, pThis->tCurrCmd);
+ if( (bActiveOnly && (tCurrCmd < eWRKTHRD_RUN_CREATED))
+ || (tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) {
+ DBGPRINTF("%s: command %d can not be accepted in current %d processing state - ignored\n",
+ wtiGetDbgHdr(pThis), tCmd, tCurrCmd);
} else {
- dbgprintf("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
+ DBGPRINTF("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
/* we could replace this with a simple if, but we leave the switch in in case we need
* to add something at a later stage. -- rgerhards, 2008-09-30
*/
@@ -143,7 +146,12 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
/* DO NOTHING */
break;
}
- pThis->tCurrCmd = tCmd; /* apply the new state */
+ /* apply the new state */
+ unsigned val = ATOMIC_CAS_VAL(pThis->tCurrCmd, tCurrCmd, tCmd);
+ if(val != tCurrCmd) {
+ DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd);
+ }
+
}
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
@@ -169,7 +177,7 @@ wtiCancelThrd(wti_t *pThis)
dbgoprint((obj_t*) pThis, "canceling worker thread\n");
pthread_cancel(pThis->thrdID);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- pThis->pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
+ ATOMIC_STORE_1_TO_INT(pThis->pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
}
d_pthread_mutex_unlock(&pThis->mut);
@@ -318,7 +326,7 @@ wtiWorkerCancelCleanup(void *arg)
d_pthread_mutex_lock(&pWtp->mut);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
/* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
- pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
+ ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
d_pthread_mutex_unlock(&pWtp->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -405,7 +413,7 @@ wtiWorker(wti_t *pThis)
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
+ ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
d_pthread_mutex_unlock(&pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 611b3f25..218a5db6 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -39,6 +39,7 @@
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
+#include <atomic.h>
#ifdef OS_SOLARIS
# include <sched.h>
@@ -214,7 +215,7 @@ wtpProcessThrdChanges(wtp_t *pThis)
*/
do {
/* reset the change marker */
- pThis->bThrdStateChanged = 0;
+ ATOMIC_STORE_0_TO_INT(pThis->bThrdStateChanged);
/* go through all threads */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX);