summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog4
-rw-r--r--runtime/Makefile.am1
-rw-r--r--runtime/atomic-posix-sem.c70
-rw-r--r--runtime/atomic.h200
-rw-r--r--runtime/debug.c2
-rw-r--r--runtime/msg.c13
-rw-r--r--runtime/msg.h1
-rw-r--r--runtime/prop.c6
-rw-r--r--runtime/prop.h2
-rw-r--r--runtime/queue.c10
-rw-r--r--runtime/queue.h1
-rw-r--r--runtime/rsyslog.c12
-rw-r--r--runtime/wti.c11
-rw-r--r--runtime/wti.h1
-rw-r--r--runtime/wtp.c18
-rw-r--r--runtime/wtp.h3
16 files changed, 129 insertions, 226 deletions
diff --git a/ChangeLog b/ChangeLog
index 80779457..f7b838d8 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,9 @@
---------------------------------------------------------------------------
Version 4.7.2 [v4-devel] (rgerhards), 2010-04-??
+- bugfix: problems with atomic operations emulaton
+ replaced atomic operation emulation with new code. The previous code
+ seemed to have some issue and also limited concurrency severely. The
+ whole atomic operation emulation has been rewritten.
- added new $Sleep directive to hold processing for a couple of seconds
during startup
---------------------------------------------------------------------------
diff --git a/runtime/Makefile.am b/runtime/Makefile.am
index ac006bca..c1a15198 100644
--- a/runtime/Makefile.am
+++ b/runtime/Makefile.am
@@ -9,7 +9,6 @@ librsyslog_la_SOURCES = \
rsyslog.h \
unicode-helper.h \
atomic.h \
- atomic-posix-sem.c \
syslogd-types.h \
module-template.h \
obj-types.h \
diff --git a/runtime/atomic-posix-sem.c b/runtime/atomic-posix-sem.c
deleted file mode 100644
index 979fae02..00000000
--- a/runtime/atomic-posix-sem.c
+++ /dev/null
@@ -1,70 +0,0 @@
-/* atomic_posix_sem.c: This file supplies an emulation for atomic operations using
- * POSIX semaphores.
- *
- * Copyright 2010 DResearch Digital Media Systems GmbH
- *
- * This file is part of the rsyslog runtime library.
- *
- * The rsyslog runtime library is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * The rsyslog runtime library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
- *
- * A copy of the GPL can be found in the file "COPYING" in this distribution.
- * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
- */
-
-#include "config.h"
-#ifndef HAVE_ATOMIC_BUILTINS
-#ifdef HAVE_SEMAPHORE_H
-#include <semaphore.h>
-#include <errno.h>
-
-#include "atomic.h"
-#include "rsyslog.h"
-#include "srUtils.h"
-
-sem_t atomicSem;
-
-rsRetVal
-atomicSemInit(void)
-{
- DEFiRet;
-
- dbgprintf("init posix semaphore for atomics emulation\n");
- if(sem_init(&atomicSem, 0, 1) == -1)
- {
- char errStr[1024];
- rs_strerror_r(errno, errStr, sizeof(errStr));
- dbgprintf("init posix semaphore for atomics emulation failed: %s\n", errStr);
- iRet = RS_RET_SYS_ERR; /* the right error code ??? */
- }
-
- RETiRet;
-}
-
-void
-atomicSemExit(void)
-{
- dbgprintf("destroy posix semaphore for atomics emulation\n");
- if(sem_destroy(&atomicSem) == -1)
- {
- char errStr[1024];
- rs_strerror_r(errno, errStr, sizeof(errStr));
- dbgprintf("destroy posix semaphore for atomics emulation failed: %s\n", errStr);
- }
-}
-
-#endif /* HAVE_SEMAPHORE_H */
-#endif /* !defined(HAVE_ATOMIC_BUILTINS) */
-
-/* vim:set ai:
- */
diff --git a/runtime/atomic.h b/runtime/atomic.h
index 271e825e..fc3e0b2d 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -39,134 +39,29 @@
* They simply came in too late. -- rgerhards, 2008-04-02
*/
#ifdef HAVE_ATOMIC_BUILTINS
-# define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1))
+# define ATOMIC_INC(data, phlpmut) ((void) __sync_fetch_and_add(data, 1))
# define ATOMIC_INC_AND_FETCH(data) __sync_fetch_and_add(&(data), 1)
-# define ATOMIC_DEC(data) ((void) __sync_sub_and_fetch(&(data), 1))
-# define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1)
+# define ATOMIC_DEC(data, phlpmut) ((void) __sync_sub_and_fetch(data, 1))
+# define ATOMIC_DEC_AND_FETCH(data, phlpmut) __sync_sub_and_fetch(data, 1)
# define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff))
# define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1)
-# define ATOMIC_STORE_0_TO_INT(data) __sync_fetch_and_and(&(data), 0)
-# define ATOMIC_STORE_1_TO_INT(data) __sync_fetch_and_or(&(data), 1)
+# define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0)
+# define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1)
# define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val))
# define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal));
-# define ATOMIC_CAS_VAL(data, oldVal, newVal) __sync_val_compare_and_swap(&(data), (oldVal), (newVal));
-#else
-#ifdef HAVE_SEMAPHORE_H
- /* we use POSIX semaphores instead */
-
-#include "rsyslog.h"
-#include <semaphore.h>
-
-extern sem_t atomicSem;
-rsRetVal atomicSemInit(void);
-void atomicSemExit(void);
-
-#if HAVE_TYPEOF
-#define my_typeof(x) typeof(x)
-#else /* sorry, can't determine types, using 'int' */
-#define my_typeof(x) int
-#endif
-
-# define ATOMIC_SUB(data, val) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data -= val; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_ADD(data, val) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data += val; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_INC_AND_FETCH(data) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data += 1; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_INC(data) ((void) ATOMIC_INC_AND_FETCH(data))
-
-# define ATOMIC_DEC_AND_FETCH(data) \
-({ \
- sem_wait(&atomicSem); \
- data -= 1; \
- sem_post(&atomicSem); \
- data; \
-})
+# define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal));
-# define ATOMIC_DEC(data) ((void) ATOMIC_DEC_AND_FETCH(data))
+ /* functions below are not needed if we have atomics */
+# define DEF_ATOMIC_HELPER_MUT(x)
+# define INIT_ATOMIC_HELPER_MUT(x)
+# define DESTROY_ATOMIC_HELPER_MUT(x)
-# define ATOMIC_FETCH_32BIT(data) ((unsigned) ATOMIC_ADD((data), 0xffffffff))
-
-# define ATOMIC_STORE_1_TO_32BIT(data) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data = 1; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_STORE_0_TO_INT(data) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data = 0; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_STORE_1_TO_INT(data) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data = 1; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_CAS(data, oldVal, newVal) \
-({ \
- int ret; \
- sem_wait(&atomicSem); \
- if(data != oldVal) ret = 0; \
- else \
- { \
- data = newVal; \
- ret = 1; \
- } \
- sem_post(&atomicSem); \
- ret; \
-})
-
-# define ATOMIC_CAS_VAL(data, oldVal, newVal) \
-({ \
- sem_wait(&atomicSem); \
- if(data == oldVal) \
- { \
- data = newVal; \
- } \
- sem_post(&atomicSem); \
- data; \
-})
-
-#else /* not HAVE_SEMAPHORE_H */
+ /* the following operations should preferrably be done atomic, but it is
+ * not fatal if not -- that means we can live with some missed updates. So be
+ * sure to use these macros only if that really does not matter!
+ */
+# define PREFER_ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1))
+#else
/* note that we gained parctical proof that theoretical problems DO occur
* if we do not properly address them. See this blog post for details:
* http://blog.gerhards.net/2009/01/rsyslog-data-race-analysis.html
@@ -174,16 +69,63 @@ void atomicSemExit(void);
* simply go ahead and do without them - use mutexes or other things. The
* code needs to be checked against all those cases. -- rgerhards, 2009-01-30
*/
+ #include <pthread.h>
+# define ATOMIC_INC(data, phlpmut) { \
+ pthread_mutex_lock(phlpmut); \
+ ++(*(data)); \
+ pthread_mutex_unlock(phlpmut); \
+ }
+
+# define ATOMIC_STORE_0_TO_INT(data, hlpmut) { \
+ pthread_mutex_lock(&hlpmut); \
+ *(data) = 0; \
+ pthread_mutex_unlock(&hlpmut); \
+ }
+
+# define ATOMIC_STORE_1_TO_INT(data, hlpmut) { \
+ pthread_mutex_lock(&hlpmut); \
+ *(data) = 1; \
+ pthread_mutex_unlock(&hlpmut); \
+ }
+
+ static inline int
+ ATOMIC_CAS_VAL(int *data, int oldVal, int newVal, pthread_mutex_t *phlpmut) {
+ int val;
+ pthread_mutex_lock(phlpmut);
+ if(*data == oldVal) {
+ *data = newVal;
+ }
+ val = *data;
+ pthread_mutex_unlock(phlpmut);
+ return(val);
+ }
+
+# define ATOMIC_DEC(data, phlpmut) { \
+ pthread_mutex_lock(phlpmut); \
+ --(*(data)); \
+ pthread_mutex_unlock(phlpmut); \
+ }
+
+ static inline int
+ ATOMIC_DEC_AND_FETCH(int *data, pthread_mutex_t *phlpmut) {
+ int val;
+ pthread_mutex_lock(phlpmut);
+ val = --(*data);
+ pthread_mutex_unlock(phlpmut);
+ return(val);
+ }
+#if 0
# warning "atomic builtins not available, using nul operations - rsyslogd will probably be racy!"
-# define ATOMIC_INC(data) (++(data))
-# define ATOMIC_DEC(data) (--(data))
-# define ATOMIC_DEC_AND_FETCH(data) (--(data))
-# define ATOMIC_FETCH_32BIT(data) (data)
-# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1
-# define ATOMIC_STORE_1_TO_INT(data) (data) = 1
-# define ATOMIC_STORE_0_TO_INT(data) (data) = 0
-# define ATOMIC_CAS_VAL(data, oldVal, newVal) (data) = (newVal)
+# define ATOMIC_INC_AND_FETCH(data) (++(data))
+# define ATOMIC_FETCH_32BIT(data) (data) // TODO: del
+# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 // TODO: del
#endif
+# define DEF_ATOMIC_HELPER_MUT(x) pthread_mutex_t x
+# define INIT_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL)
+# define DESTROY_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL)
+
+# define PREFER_ATOMIC_INC(data) ((void) ++data)
+
#endif
#endif /* #ifndef INCLUDED_ATOMIC_H */
diff --git a/runtime/debug.c b/runtime/debug.c
index da471609..0ada909b 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -1073,7 +1073,7 @@ int dbgEntrFunc(dbgFuncDB_t **ppFuncDB, const char *file, const char *func, int
}
/* when we reach this point, we have a fully-initialized FuncDB! */
- ATOMIC_INC(pFuncDB->nTimesCalled);
+ PREFER_ATOMIC_INC(pFuncDB->nTimesCalled);
if(bLogFuncFlow && dbgPrintNameIsInList((const uchar*)pFuncDB->file, printNameFileRoot))
dbgprintf("%s:%d: %s: enter\n", pFuncDB->file, pFuncDB->line, pFuncDB->func);
if(pThrd->stackPtr >= (int) (sizeof(pThrd->callStack) / sizeof(dbgFuncDB_t*))) {
diff --git a/runtime/msg.c b/runtime/msg.c
index 6d7e6a89..57291def 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -748,7 +748,7 @@ BEGINobjDestruct(msg) /* be sure to specify the object type also in END and CODE
CODESTARTobjDestruct(msg)
/* DEV Debugging only ! dbgprintf("msgDestruct\t0x%lx, Ref now: %d\n", (unsigned long)pThis, pThis->iRefCount - 1); */
# ifdef HAVE_ATOMIC_BUILTINS
- currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount);
+ currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, NULL);
# else
MsgLock(pThis);
currRefCount = --pThis->iRefCount;
@@ -800,9 +800,18 @@ CODESTARTobjDestruct(msg)
* that we trim too often when the counter wraps.
*/
static unsigned iTrimCtr = 1;
+# ifdef HAVE_ATOMICS
if(ATOMIC_INC_AND_FETCH(iTrimCtr) % 100000 == 0) {
malloc_trim(128*1024);
}
+# else
+static pthread_mutex_t mutTrimCtr = PTHREAD_MUTEX_INITIALIZER;
+ d_pthread_mutex_lock(&mutTrimCtr);
+ if(iTrimCtr++ % 100000 == 0) {
+ malloc_trim(128*1024);
+ }
+ d_pthread_mutex_unlock(&mutTrimCtr);
+# endif
}
# endif
} else {
@@ -1002,7 +1011,7 @@ msg_t *MsgAddRef(msg_t *pM)
{
assert(pM != NULL);
# ifdef HAVE_ATOMIC_BUILTINS
- ATOMIC_INC(pM->iRefCount);
+ ATOMIC_INC(&pM->iRefCount, NULL);
# else
MsgLock(pM);
pM->iRefCount++;
diff --git a/runtime/msg.h b/runtime/msg.h
index 0d3314b7..cda206fc 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -32,6 +32,7 @@
#include "obj.h"
#include "syslogd-types.h"
#include "template.h"
+#include "atomic.h"
/* rgerhards 2004-11-08: The following structure represents a
diff --git a/runtime/prop.c b/runtime/prop.c
index d188b2ed..7f2a56ff 100644
--- a/runtime/prop.c
+++ b/runtime/prop.c
@@ -53,6 +53,7 @@ DEFobjStaticHelpers
*/
BEGINobjConstruct(prop) /* be sure to specify the object type also in END macro! */
pThis->iRefCount = 1;
+ INIT_ATOMIC_HELPER_MUT(pThis->mutRefCount);
ENDobjConstruct(prop)
@@ -60,11 +61,12 @@ ENDobjConstruct(prop)
BEGINobjDestruct(prop) /* be sure to specify the object type also in END and CODESTART macros! */
int currRefCount;
CODESTARTobjDestruct(prop)
- currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount);
+ currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, &pThis->mutRefCount);
if(currRefCount == 0) {
/* (only) in this case we need to actually destruct the object */
if(pThis->len >= CONF_PROP_BUFSIZE)
free(pThis->szVal.psz);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutRefCount);
} else {
pThis = NULL; /* tell framework NOT to destructing the object! */
}
@@ -132,7 +134,7 @@ propConstructFinalize(prop_t __attribute__((unused)) *pThis)
*/
static rsRetVal AddRef(prop_t *pThis)
{
- ATOMIC_INC(pThis->iRefCount);
+ ATOMIC_INC(&pThis->iRefCount, &pThis->mutRefCount);
return RS_RET_OK;
}
diff --git a/runtime/prop.h b/runtime/prop.h
index e3519664..07b2ab7e 100644
--- a/runtime/prop.h
+++ b/runtime/prop.h
@@ -24,6 +24,7 @@
*/
#ifndef INCLUDED_PROP_H
#define INCLUDED_PROP_H
+#include "atomic.h"
/* the prop object */
struct prop_s {
@@ -34,6 +35,7 @@ struct prop_s {
uchar sz[CONF_PROP_BUFSIZE];
} szVal;
int len; /* we use int intentionally, otherwise we may get some troubles... */
+ DEF_ATOMIC_HELPER_MUT(mutRefCount);
};
/* interfaces */
diff --git a/runtime/queue.c b/runtime/queue.c
index 9d7a9058..bedefb77 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -110,7 +110,7 @@ static inline void queueDrain(qqueue_t *pThis)
ASSERT(pThis != NULL);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) {
+ while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) {
pThis->qDel(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
@@ -1028,7 +1028,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr)
CHKiRet(pThis->qAdd(pThis, pUsr));
if(pThis->qType != QUEUETYPE_DIRECT) {
- ATOMIC_INC(pThis->iQueueSize);
+ ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize);
}
@@ -1057,7 +1057,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
- ATOMIC_DEC(pThis->iQueueSize);
+ ATOMIC_DEC(&pThis->iQueueSize, &pThis->mutQueueSize);
}
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
@@ -1345,6 +1345,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
break;
}
+ INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+
finalize_it:
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
RETiRet;
@@ -2065,6 +2067,8 @@ CODESTARTobjDestruct(qqueue)
pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
diff --git a/runtime/queue.h b/runtime/queue.h
index 1d82d8d9..aafdaa45 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -160,6 +160,7 @@ typedef struct queue_s {
strm_t *pRead; /* current file to be read */
} disk;
} tVars;
+ DEF_ATOMIC_HELPER_MUT(mutQueueSize);
} qqueue_t;
/* some symbolic constants for easier reference */
diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c
index 5750ca76..c209ae30 100644
--- a/runtime/rsyslog.c
+++ b/runtime/rsyslog.c
@@ -140,12 +140,6 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF)
CHKiRet(objClassInit(NULL)); /* *THIS* *MUST* always be the first class initilizer being called! */
CHKiRet(objGetObjInterface(pObjIF)); /* this provides the root pointer for all other queries */
-#ifndef HAVE_ATOMIC_BUILTINS
-#ifdef HAVE_SEMAPHORE_H
- CHKiRet(atomicSemInit());
-#endif /* HAVE_SEMAPHORE_H */
-#endif /* !defined(HAVE_ATOMIC_BUILTINS) */
-
/* initialize core classes. We must be very careful with the order of events. Some
* classes use others and if we do not initialize them in the right order, we may end
* up with an invalid call. The most important thing that can happen is that an error
@@ -223,12 +217,6 @@ rsrtExit(void)
rulesetClassExit();
ruleClassExit();
-#ifndef HAVE_ATOMIC_BUILTINS
-#ifdef HAVE_SEMAPHORE_H
- atomicSemExit();
-#endif /* HAVE_SEMAPHORE_H */
-#endif /* !defined(HAVE_ATOMIC_BUILTINS) */
-
objClassExit(); /* *THIS* *MUST/SHOULD?* always be the first class initilizer being called (except debug)! */
}
diff --git a/runtime/wti.c b/runtime/wti.c
index abdf4add..90bb14ed 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -146,7 +146,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
break;
}
/* apply the new state */
- unsigned val = ATOMIC_CAS_VAL(pThis->tCurrCmd, tCurrCmd, tCmd);
+ unsigned val = ATOMIC_CAS_VAL((int*)&pThis->tCurrCmd, tCurrCmd, tCmd, &pThis->mutCurrCmd);
if(val != tCurrCmd) {
DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd);
}
@@ -178,7 +178,7 @@ wtiCancelThrd(wti_t *pThis)
dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd);
pthread_cancel(pThis->thrdID);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- ATOMIC_STORE_1_TO_INT(pThis->pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
+ wtpSetThrdStateChanged(pThis->pWtp, 1); /* indicate change, so harverster will be called */
}
d_pthread_mutex_unlock(&pThis->mut);
@@ -209,6 +209,7 @@ CODESTARTobjDestruct(wti)
/* actual destruction */
pthread_cond_destroy(&pThis->condExitDone);
pthread_mutex_destroy(&pThis->mut);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurrCmd);
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -219,6 +220,7 @@ ENDobjDestruct(wti)
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
pthread_cond_init(&pThis->condExitDone, NULL);
pthread_mutex_init(&pThis->mut, NULL);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutCurrCmd);
ENDobjConstruct(wti)
@@ -326,8 +328,7 @@ wtiWorkerCancelCleanup(void *arg)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pWtp->mut);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
- ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
+ wtpSetThrdStateChanged(pWtp, 1); /* indicate change, so harverster will be called */
d_pthread_mutex_unlock(&pWtp->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -414,7 +415,7 @@ wtiWorker(wti_t *pThis)
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
+ wtpSetThrdStateChanged(pWtp, 1); /* indicate change, so harverster will be called */
d_pthread_mutex_unlock(&pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
diff --git a/runtime/wti.h b/runtime/wti.h
index 72653b15..d81672f3 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -39,6 +39,7 @@ typedef struct wti_s {
pthread_mutex_t mut;
bool bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */
uchar *pszDbgHdr; /* header string for debug messages */
+ DEF_ATOMIC_HELPER_MUT(mutCurrCmd);
} wti_t;
/* some symbolic constants for easier reference */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 271c6f0d..fff37c2f 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -97,6 +97,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pThis->pfOnWorkerCancel = NotImplementedDummy;
pThis->pfOnWorkerStartup = NotImplementedDummy;
pThis->pfOnWorkerShutdown = NotImplementedDummy;
+ INIT_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged);
ENDobjConstruct(wtp)
@@ -153,6 +154,7 @@ CODESTARTobjDestruct(wtp)
pthread_cond_destroy(&pThis->condThrdTrm);
pthread_mutex_destroy(&pThis->mut);
pthread_mutex_destroy(&pThis->mutThrdShutdwn);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged);
free(pThis->pszDbgHdr);
ENDobjDestruct(wtp)
@@ -186,6 +188,20 @@ wtpWakeupAllWrkr(wtp_t *pThis)
}
+/* set the bThrdStateChanged in an atomic way. Note that
+ * val may only be 0 or 1.
+ */
+void
+wtpSetThrdStateChanged(wtp_t *pThis, int val)
+{
+ if(val == 0) {
+ ATOMIC_STORE_0_TO_INT(&pThis->bThrdStateChanged, pThis->mutThrdStateChanged);
+ } else {
+ ATOMIC_STORE_1_TO_INT(&pThis->bThrdStateChanged, pThis->mutThrdStateChanged);
+ }
+}
+
+
/* check if we had any worker thread changes and, if so, act
* on them. At a minimum, terminated threads are harvested (joined).
* This function MUST NEVER block on the queue mutex!
@@ -216,7 +232,7 @@ wtpProcessThrdChanges(wtp_t *pThis)
*/
do {
/* reset the change marker */
- ATOMIC_STORE_0_TO_INT(pThis->bThrdStateChanged);
+ wtpSetThrdStateChanged(pThis, 0);
/* go through all threads */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX);
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 1ce171cc..640c3320 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -26,6 +26,7 @@
#include <pthread.h>
#include "obj.h"
+#include "atomic.h"
/* commands and states for worker threads. */
typedef enum {
@@ -79,6 +80,7 @@ typedef struct wtp_s {
rsRetVal (*pfOnWorkerShutdown)(void *pUsr);
/* end user objects */
uchar *pszDbgHdr; /* header string for debug messages */
+ DEF_ATOMIC_HELPER_MUT(mutThrdStateChanged);
} wtp_t;
/* some symbolic constants for easier reference */
@@ -100,6 +102,7 @@ rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg);
rsRetVal wtpSignalWrkrTermination(wtp_t *pWtp);
rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout);
int wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex);
+void wtpSetThrdStateChanged(wtp_t *pThis, int val);
PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));