diff options
-rw-r--r-- | ChangeLog | 4 | ||||
-rw-r--r-- | runtime/Makefile.am | 1 | ||||
-rw-r--r-- | runtime/atomic-posix-sem.c | 70 | ||||
-rw-r--r-- | runtime/atomic.h | 200 | ||||
-rw-r--r-- | runtime/debug.c | 2 | ||||
-rw-r--r-- | runtime/msg.c | 13 | ||||
-rw-r--r-- | runtime/msg.h | 1 | ||||
-rw-r--r-- | runtime/prop.c | 6 | ||||
-rw-r--r-- | runtime/prop.h | 2 | ||||
-rw-r--r-- | runtime/queue.c | 10 | ||||
-rw-r--r-- | runtime/queue.h | 1 | ||||
-rw-r--r-- | runtime/rsyslog.c | 12 | ||||
-rw-r--r-- | runtime/wti.c | 11 | ||||
-rw-r--r-- | runtime/wti.h | 1 | ||||
-rw-r--r-- | runtime/wtp.c | 18 | ||||
-rw-r--r-- | runtime/wtp.h | 3 |
16 files changed, 129 insertions, 226 deletions
@@ -1,5 +1,9 @@ --------------------------------------------------------------------------- Version 4.7.2 [v4-devel] (rgerhards), 2010-04-?? +- bugfix: problems with atomic operations emulaton + replaced atomic operation emulation with new code. The previous code + seemed to have some issue and also limited concurrency severely. The + whole atomic operation emulation has been rewritten. - added new $Sleep directive to hold processing for a couple of seconds during startup --------------------------------------------------------------------------- diff --git a/runtime/Makefile.am b/runtime/Makefile.am index ac006bca..c1a15198 100644 --- a/runtime/Makefile.am +++ b/runtime/Makefile.am @@ -9,7 +9,6 @@ librsyslog_la_SOURCES = \ rsyslog.h \ unicode-helper.h \ atomic.h \ - atomic-posix-sem.c \ syslogd-types.h \ module-template.h \ obj-types.h \ diff --git a/runtime/atomic-posix-sem.c b/runtime/atomic-posix-sem.c deleted file mode 100644 index 979fae02..00000000 --- a/runtime/atomic-posix-sem.c +++ /dev/null @@ -1,70 +0,0 @@ -/* atomic_posix_sem.c: This file supplies an emulation for atomic operations using - * POSIX semaphores. - * - * Copyright 2010 DResearch Digital Media Systems GmbH - * - * This file is part of the rsyslog runtime library. - * - * The rsyslog runtime library is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * The rsyslog runtime library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>. - * - * A copy of the GPL can be found in the file "COPYING" in this distribution. - * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. - */ - -#include "config.h" -#ifndef HAVE_ATOMIC_BUILTINS -#ifdef HAVE_SEMAPHORE_H -#include <semaphore.h> -#include <errno.h> - -#include "atomic.h" -#include "rsyslog.h" -#include "srUtils.h" - -sem_t atomicSem; - -rsRetVal -atomicSemInit(void) -{ - DEFiRet; - - dbgprintf("init posix semaphore for atomics emulation\n"); - if(sem_init(&atomicSem, 0, 1) == -1) - { - char errStr[1024]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - dbgprintf("init posix semaphore for atomics emulation failed: %s\n", errStr); - iRet = RS_RET_SYS_ERR; /* the right error code ??? */ - } - - RETiRet; -} - -void -atomicSemExit(void) -{ - dbgprintf("destroy posix semaphore for atomics emulation\n"); - if(sem_destroy(&atomicSem) == -1) - { - char errStr[1024]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - dbgprintf("destroy posix semaphore for atomics emulation failed: %s\n", errStr); - } -} - -#endif /* HAVE_SEMAPHORE_H */ -#endif /* !defined(HAVE_ATOMIC_BUILTINS) */ - -/* vim:set ai: - */ diff --git a/runtime/atomic.h b/runtime/atomic.h index 271e825e..fc3e0b2d 100644 --- a/runtime/atomic.h +++ b/runtime/atomic.h @@ -39,134 +39,29 @@ * They simply came in too late. -- rgerhards, 2008-04-02 */ #ifdef HAVE_ATOMIC_BUILTINS -# define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1)) +# define ATOMIC_INC(data, phlpmut) ((void) __sync_fetch_and_add(data, 1)) # define ATOMIC_INC_AND_FETCH(data) __sync_fetch_and_add(&(data), 1) -# define ATOMIC_DEC(data) ((void) __sync_sub_and_fetch(&(data), 1)) -# define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1) +# define ATOMIC_DEC(data, phlpmut) ((void) __sync_sub_and_fetch(data, 1)) +# define ATOMIC_DEC_AND_FETCH(data, phlpmut) __sync_sub_and_fetch(data, 1) # define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff)) # define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1) -# define ATOMIC_STORE_0_TO_INT(data) __sync_fetch_and_and(&(data), 0) -# define ATOMIC_STORE_1_TO_INT(data) __sync_fetch_and_or(&(data), 1) +# define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0) +# define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1) # define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val)) # define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal)); -# define ATOMIC_CAS_VAL(data, oldVal, newVal) __sync_val_compare_and_swap(&(data), (oldVal), (newVal)); -#else -#ifdef HAVE_SEMAPHORE_H - /* we use POSIX semaphores instead */ - -#include "rsyslog.h" -#include <semaphore.h> - -extern sem_t atomicSem; -rsRetVal atomicSemInit(void); -void atomicSemExit(void); - -#if HAVE_TYPEOF -#define my_typeof(x) typeof(x) -#else /* sorry, can't determine types, using 'int' */ -#define my_typeof(x) int -#endif - -# define ATOMIC_SUB(data, val) \ -({ \ - my_typeof(data) tmp; \ - sem_wait(&atomicSem); \ - tmp = data; \ - data -= val; \ - sem_post(&atomicSem); \ - tmp; \ -}) - -# define ATOMIC_ADD(data, val) \ -({ \ - my_typeof(data) tmp; \ - sem_wait(&atomicSem); \ - tmp = data; \ - data += val; \ - sem_post(&atomicSem); \ - tmp; \ -}) - -# define ATOMIC_INC_AND_FETCH(data) \ -({ \ - my_typeof(data) tmp; \ - sem_wait(&atomicSem); \ - tmp = data; \ - data += 1; \ - sem_post(&atomicSem); \ - tmp; \ -}) - -# define ATOMIC_INC(data) ((void) ATOMIC_INC_AND_FETCH(data)) - -# define ATOMIC_DEC_AND_FETCH(data) \ -({ \ - sem_wait(&atomicSem); \ - data -= 1; \ - sem_post(&atomicSem); \ - data; \ -}) +# define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal)); -# define ATOMIC_DEC(data) ((void) ATOMIC_DEC_AND_FETCH(data)) + /* functions below are not needed if we have atomics */ +# define DEF_ATOMIC_HELPER_MUT(x) +# define INIT_ATOMIC_HELPER_MUT(x) +# define DESTROY_ATOMIC_HELPER_MUT(x) -# define ATOMIC_FETCH_32BIT(data) ((unsigned) ATOMIC_ADD((data), 0xffffffff)) - -# define ATOMIC_STORE_1_TO_32BIT(data) \ -({ \ - my_typeof(data) tmp; \ - sem_wait(&atomicSem); \ - tmp = data; \ - data = 1; \ - sem_post(&atomicSem); \ - tmp; \ -}) - -# define ATOMIC_STORE_0_TO_INT(data) \ -({ \ - my_typeof(data) tmp; \ - sem_wait(&atomicSem); \ - tmp = data; \ - data = 0; \ - sem_post(&atomicSem); \ - tmp; \ -}) - -# define ATOMIC_STORE_1_TO_INT(data) \ -({ \ - my_typeof(data) tmp; \ - sem_wait(&atomicSem); \ - tmp = data; \ - data = 1; \ - sem_post(&atomicSem); \ - tmp; \ -}) - -# define ATOMIC_CAS(data, oldVal, newVal) \ -({ \ - int ret; \ - sem_wait(&atomicSem); \ - if(data != oldVal) ret = 0; \ - else \ - { \ - data = newVal; \ - ret = 1; \ - } \ - sem_post(&atomicSem); \ - ret; \ -}) - -# define ATOMIC_CAS_VAL(data, oldVal, newVal) \ -({ \ - sem_wait(&atomicSem); \ - if(data == oldVal) \ - { \ - data = newVal; \ - } \ - sem_post(&atomicSem); \ - data; \ -}) - -#else /* not HAVE_SEMAPHORE_H */ + /* the following operations should preferrably be done atomic, but it is + * not fatal if not -- that means we can live with some missed updates. So be + * sure to use these macros only if that really does not matter! + */ +# define PREFER_ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1)) +#else /* note that we gained parctical proof that theoretical problems DO occur * if we do not properly address them. See this blog post for details: * http://blog.gerhards.net/2009/01/rsyslog-data-race-analysis.html @@ -174,16 +69,63 @@ void atomicSemExit(void); * simply go ahead and do without them - use mutexes or other things. The * code needs to be checked against all those cases. -- rgerhards, 2009-01-30 */ + #include <pthread.h> +# define ATOMIC_INC(data, phlpmut) { \ + pthread_mutex_lock(phlpmut); \ + ++(*(data)); \ + pthread_mutex_unlock(phlpmut); \ + } + +# define ATOMIC_STORE_0_TO_INT(data, hlpmut) { \ + pthread_mutex_lock(&hlpmut); \ + *(data) = 0; \ + pthread_mutex_unlock(&hlpmut); \ + } + +# define ATOMIC_STORE_1_TO_INT(data, hlpmut) { \ + pthread_mutex_lock(&hlpmut); \ + *(data) = 1; \ + pthread_mutex_unlock(&hlpmut); \ + } + + static inline int + ATOMIC_CAS_VAL(int *data, int oldVal, int newVal, pthread_mutex_t *phlpmut) { + int val; + pthread_mutex_lock(phlpmut); + if(*data == oldVal) { + *data = newVal; + } + val = *data; + pthread_mutex_unlock(phlpmut); + return(val); + } + +# define ATOMIC_DEC(data, phlpmut) { \ + pthread_mutex_lock(phlpmut); \ + --(*(data)); \ + pthread_mutex_unlock(phlpmut); \ + } + + static inline int + ATOMIC_DEC_AND_FETCH(int *data, pthread_mutex_t *phlpmut) { + int val; + pthread_mutex_lock(phlpmut); + val = --(*data); + pthread_mutex_unlock(phlpmut); + return(val); + } +#if 0 # warning "atomic builtins not available, using nul operations - rsyslogd will probably be racy!" -# define ATOMIC_INC(data) (++(data)) -# define ATOMIC_DEC(data) (--(data)) -# define ATOMIC_DEC_AND_FETCH(data) (--(data)) -# define ATOMIC_FETCH_32BIT(data) (data) -# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 -# define ATOMIC_STORE_1_TO_INT(data) (data) = 1 -# define ATOMIC_STORE_0_TO_INT(data) (data) = 0 -# define ATOMIC_CAS_VAL(data, oldVal, newVal) (data) = (newVal) +# define ATOMIC_INC_AND_FETCH(data) (++(data)) +# define ATOMIC_FETCH_32BIT(data) (data) // TODO: del +# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 // TODO: del #endif +# define DEF_ATOMIC_HELPER_MUT(x) pthread_mutex_t x +# define INIT_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL) +# define DESTROY_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL) + +# define PREFER_ATOMIC_INC(data) ((void) ++data) + #endif #endif /* #ifndef INCLUDED_ATOMIC_H */ diff --git a/runtime/debug.c b/runtime/debug.c index da471609..0ada909b 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -1073,7 +1073,7 @@ int dbgEntrFunc(dbgFuncDB_t **ppFuncDB, const char *file, const char *func, int } /* when we reach this point, we have a fully-initialized FuncDB! */ - ATOMIC_INC(pFuncDB->nTimesCalled); + PREFER_ATOMIC_INC(pFuncDB->nTimesCalled); if(bLogFuncFlow && dbgPrintNameIsInList((const uchar*)pFuncDB->file, printNameFileRoot)) dbgprintf("%s:%d: %s: enter\n", pFuncDB->file, pFuncDB->line, pFuncDB->func); if(pThrd->stackPtr >= (int) (sizeof(pThrd->callStack) / sizeof(dbgFuncDB_t*))) { diff --git a/runtime/msg.c b/runtime/msg.c index 6d7e6a89..57291def 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -748,7 +748,7 @@ BEGINobjDestruct(msg) /* be sure to specify the object type also in END and CODE CODESTARTobjDestruct(msg) /* DEV Debugging only ! dbgprintf("msgDestruct\t0x%lx, Ref now: %d\n", (unsigned long)pThis, pThis->iRefCount - 1); */ # ifdef HAVE_ATOMIC_BUILTINS - currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount); + currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, NULL); # else MsgLock(pThis); currRefCount = --pThis->iRefCount; @@ -800,9 +800,18 @@ CODESTARTobjDestruct(msg) * that we trim too often when the counter wraps. */ static unsigned iTrimCtr = 1; +# ifdef HAVE_ATOMICS if(ATOMIC_INC_AND_FETCH(iTrimCtr) % 100000 == 0) { malloc_trim(128*1024); } +# else +static pthread_mutex_t mutTrimCtr = PTHREAD_MUTEX_INITIALIZER; + d_pthread_mutex_lock(&mutTrimCtr); + if(iTrimCtr++ % 100000 == 0) { + malloc_trim(128*1024); + } + d_pthread_mutex_unlock(&mutTrimCtr); +# endif } # endif } else { @@ -1002,7 +1011,7 @@ msg_t *MsgAddRef(msg_t *pM) { assert(pM != NULL); # ifdef HAVE_ATOMIC_BUILTINS - ATOMIC_INC(pM->iRefCount); + ATOMIC_INC(&pM->iRefCount, NULL); # else MsgLock(pM); pM->iRefCount++; diff --git a/runtime/msg.h b/runtime/msg.h index 0d3314b7..cda206fc 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -32,6 +32,7 @@ #include "obj.h" #include "syslogd-types.h" #include "template.h" +#include "atomic.h" /* rgerhards 2004-11-08: The following structure represents a diff --git a/runtime/prop.c b/runtime/prop.c index d188b2ed..7f2a56ff 100644 --- a/runtime/prop.c +++ b/runtime/prop.c @@ -53,6 +53,7 @@ DEFobjStaticHelpers */ BEGINobjConstruct(prop) /* be sure to specify the object type also in END macro! */ pThis->iRefCount = 1; + INIT_ATOMIC_HELPER_MUT(pThis->mutRefCount); ENDobjConstruct(prop) @@ -60,11 +61,12 @@ ENDobjConstruct(prop) BEGINobjDestruct(prop) /* be sure to specify the object type also in END and CODESTART macros! */ int currRefCount; CODESTARTobjDestruct(prop) - currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount); + currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, &pThis->mutRefCount); if(currRefCount == 0) { /* (only) in this case we need to actually destruct the object */ if(pThis->len >= CONF_PROP_BUFSIZE) free(pThis->szVal.psz); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutRefCount); } else { pThis = NULL; /* tell framework NOT to destructing the object! */ } @@ -132,7 +134,7 @@ propConstructFinalize(prop_t __attribute__((unused)) *pThis) */ static rsRetVal AddRef(prop_t *pThis) { - ATOMIC_INC(pThis->iRefCount); + ATOMIC_INC(&pThis->iRefCount, &pThis->mutRefCount); return RS_RET_OK; } diff --git a/runtime/prop.h b/runtime/prop.h index e3519664..07b2ab7e 100644 --- a/runtime/prop.h +++ b/runtime/prop.h @@ -24,6 +24,7 @@ */ #ifndef INCLUDED_PROP_H #define INCLUDED_PROP_H +#include "atomic.h" /* the prop object */ struct prop_s { @@ -34,6 +35,7 @@ struct prop_s { uchar sz[CONF_PROP_BUFSIZE]; } szVal; int len; /* we use int intentionally, otherwise we may get some troubles... */ + DEF_ATOMIC_HELPER_MUT(mutRefCount); }; /* interfaces */ diff --git a/runtime/queue.c b/runtime/queue.c index 9d7a9058..bedefb77 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -110,7 +110,7 @@ static inline void queueDrain(qqueue_t *pThis) ASSERT(pThis != NULL); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ - while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) { + while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) { pThis->qDel(pThis, &pUsr); if(pUsr != NULL) { objDestruct(pUsr); @@ -1028,7 +1028,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr) CHKiRet(pThis->qAdd(pThis, pUsr)); if(pThis->qType != QUEUETYPE_DIRECT) { - ATOMIC_INC(pThis->iQueueSize); + ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize); } @@ -1057,7 +1057,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr) iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr); } else { iRet = pThis->qDel(pThis, pUsr); - ATOMIC_DEC(pThis->iQueueSize); + ATOMIC_DEC(&pThis->iQueueSize, &pThis->mutQueueSize); } dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n", @@ -1345,6 +1345,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread break; } + INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + finalize_it: OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP RETiRet; @@ -2065,6 +2067,8 @@ CODESTARTobjDestruct(qqueue) pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + /* type-specific destructor */ iRet = pThis->qDestruct(pThis); diff --git a/runtime/queue.h b/runtime/queue.h index 1d82d8d9..aafdaa45 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -160,6 +160,7 @@ typedef struct queue_s { strm_t *pRead; /* current file to be read */ } disk; } tVars; + DEF_ATOMIC_HELPER_MUT(mutQueueSize); } qqueue_t; /* some symbolic constants for easier reference */ diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c index 5750ca76..c209ae30 100644 --- a/runtime/rsyslog.c +++ b/runtime/rsyslog.c @@ -140,12 +140,6 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF) CHKiRet(objClassInit(NULL)); /* *THIS* *MUST* always be the first class initilizer being called! */ CHKiRet(objGetObjInterface(pObjIF)); /* this provides the root pointer for all other queries */ -#ifndef HAVE_ATOMIC_BUILTINS -#ifdef HAVE_SEMAPHORE_H - CHKiRet(atomicSemInit()); -#endif /* HAVE_SEMAPHORE_H */ -#endif /* !defined(HAVE_ATOMIC_BUILTINS) */ - /* initialize core classes. We must be very careful with the order of events. Some * classes use others and if we do not initialize them in the right order, we may end * up with an invalid call. The most important thing that can happen is that an error @@ -223,12 +217,6 @@ rsrtExit(void) rulesetClassExit(); ruleClassExit(); -#ifndef HAVE_ATOMIC_BUILTINS -#ifdef HAVE_SEMAPHORE_H - atomicSemExit(); -#endif /* HAVE_SEMAPHORE_H */ -#endif /* !defined(HAVE_ATOMIC_BUILTINS) */ - objClassExit(); /* *THIS* *MUST/SHOULD?* always be the first class initilizer being called (except debug)! */ } diff --git a/runtime/wti.c b/runtime/wti.c index abdf4add..90bb14ed 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -146,7 +146,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) break; } /* apply the new state */ - unsigned val = ATOMIC_CAS_VAL(pThis->tCurrCmd, tCurrCmd, tCmd); + unsigned val = ATOMIC_CAS_VAL((int*)&pThis->tCurrCmd, tCurrCmd, tCmd, &pThis->mutCurrCmd); if(val != tCurrCmd) { DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd); } @@ -178,7 +178,7 @@ wtiCancelThrd(wti_t *pThis) dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd); pthread_cancel(pThis->thrdID); wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); - ATOMIC_STORE_1_TO_INT(pThis->pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */ + wtpSetThrdStateChanged(pThis->pWtp, 1); /* indicate change, so harverster will be called */ } d_pthread_mutex_unlock(&pThis->mut); @@ -209,6 +209,7 @@ CODESTARTobjDestruct(wti) /* actual destruction */ pthread_cond_destroy(&pThis->condExitDone); pthread_mutex_destroy(&pThis->mut); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurrCmd); free(pThis->pszDbgHdr); ENDobjDestruct(wti) @@ -219,6 +220,7 @@ ENDobjDestruct(wti) BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ pthread_cond_init(&pThis->condExitDone, NULL); pthread_mutex_init(&pThis->mut, NULL); + INIT_ATOMIC_HELPER_MUT(pThis->mutCurrCmd); ENDobjConstruct(wti) @@ -326,8 +328,7 @@ wtiWorkerCancelCleanup(void *arg) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pWtp->mut); wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); - /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */ - ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */ + wtpSetThrdStateChanged(pWtp, 1); /* indicate change, so harverster will be called */ d_pthread_mutex_unlock(&pWtp->mut); pthread_setcancelstate(iCancelStateSave, NULL); @@ -414,7 +415,7 @@ wtiWorker(wti_t *pThis) pWtp->pfOnWorkerShutdown(pWtp->pUsr); wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); - ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */ + wtpSetThrdStateChanged(pWtp, 1); /* indicate change, so harverster will be called */ d_pthread_mutex_unlock(&pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); diff --git a/runtime/wti.h b/runtime/wti.h index 72653b15..d81672f3 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -39,6 +39,7 @@ typedef struct wti_s { pthread_mutex_t mut; bool bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */ uchar *pszDbgHdr; /* header string for debug messages */ + DEF_ATOMIC_HELPER_MUT(mutCurrCmd); } wti_t; /* some symbolic constants for easier reference */ diff --git a/runtime/wtp.c b/runtime/wtp.c index 271c6f0d..fff37c2f 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -97,6 +97,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pThis->pfOnWorkerCancel = NotImplementedDummy; pThis->pfOnWorkerStartup = NotImplementedDummy; pThis->pfOnWorkerShutdown = NotImplementedDummy; + INIT_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged); ENDobjConstruct(wtp) @@ -153,6 +154,7 @@ CODESTARTobjDestruct(wtp) pthread_cond_destroy(&pThis->condThrdTrm); pthread_mutex_destroy(&pThis->mut); pthread_mutex_destroy(&pThis->mutThrdShutdwn); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged); free(pThis->pszDbgHdr); ENDobjDestruct(wtp) @@ -186,6 +188,20 @@ wtpWakeupAllWrkr(wtp_t *pThis) } +/* set the bThrdStateChanged in an atomic way. Note that + * val may only be 0 or 1. + */ +void +wtpSetThrdStateChanged(wtp_t *pThis, int val) +{ + if(val == 0) { + ATOMIC_STORE_0_TO_INT(&pThis->bThrdStateChanged, pThis->mutThrdStateChanged); + } else { + ATOMIC_STORE_1_TO_INT(&pThis->bThrdStateChanged, pThis->mutThrdStateChanged); + } +} + + /* check if we had any worker thread changes and, if so, act * on them. At a minimum, terminated threads are harvested (joined). * This function MUST NEVER block on the queue mutex! @@ -216,7 +232,7 @@ wtpProcessThrdChanges(wtp_t *pThis) */ do { /* reset the change marker */ - ATOMIC_STORE_0_TO_INT(pThis->bThrdStateChanged); + wtpSetThrdStateChanged(pThis, 0); /* go through all threads */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX); diff --git a/runtime/wtp.h b/runtime/wtp.h index 1ce171cc..640c3320 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -26,6 +26,7 @@ #include <pthread.h> #include "obj.h" +#include "atomic.h" /* commands and states for worker threads. */ typedef enum { @@ -79,6 +80,7 @@ typedef struct wtp_s { rsRetVal (*pfOnWorkerShutdown)(void *pUsr); /* end user objects */ uchar *pszDbgHdr; /* header string for debug messages */ + DEF_ATOMIC_HELPER_MUT(mutThrdStateChanged); } wtp_t; /* some symbolic constants for easier reference */ @@ -100,6 +102,7 @@ rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg); rsRetVal wtpSignalWrkrTermination(wtp_t *pWtp); rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout); int wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex); +void wtpSetThrdStateChanged(wtp_t *pThis, int val); PROTOTYPEObjClassInit(wtp); PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)); |