diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-04-27 17:49:06 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-04-27 17:49:06 +0200 |
commit | 4a5a3196fbe4e5a4e9f8dea49f916462adbf3098 (patch) | |
tree | df7750c93603d4a979188f3385c9fe4997072c7f /runtime | |
parent | 9cf1756dc6c06682b32ed7a789ceb4254b5792df (diff) | |
parent | cbe2e3d44496ec7c6418e7e74ce917f2086a2947 (diff) | |
download | rsyslog-4a5a3196fbe4e5a4e9f8dea49f916462adbf3098.tar.gz rsyslog-4a5a3196fbe4e5a4e9f8dea49f916462adbf3098.tar.xz rsyslog-4a5a3196fbe4e5a4e9f8dea49f916462adbf3098.zip |
Merge branch 'v4-devel' into master
Conflicts:
runtime/Makefile.am
runtime/atomic.h
runtime/queue.c
runtime/queue.h
runtime/wti.c
runtime/wti.h
runtime/wtp.c
runtime/wtp.h
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/Makefile.am | 2 | ||||
-rw-r--r-- | runtime/atomic-posix-sem.c | 70 | ||||
-rw-r--r-- | runtime/atomic.h | 201 | ||||
-rw-r--r-- | runtime/debug.c | 2 | ||||
-rw-r--r-- | runtime/glbl.c | 6 | ||||
-rw-r--r-- | runtime/msg.c | 13 | ||||
-rw-r--r-- | runtime/msg.h | 1 | ||||
-rw-r--r-- | runtime/prop.c | 6 | ||||
-rw-r--r-- | runtime/prop.h | 2 | ||||
-rw-r--r-- | runtime/queue.c | 8 | ||||
-rw-r--r-- | runtime/queue.h | 1 | ||||
-rw-r--r-- | runtime/rsyslog.c | 12 | ||||
-rw-r--r-- | runtime/wti.c | 3 | ||||
-rw-r--r-- | runtime/wti.h | 1 | ||||
-rw-r--r-- | runtime/wtp.c | 2 | ||||
-rw-r--r-- | runtime/wtp.h | 3 |
16 files changed, 112 insertions, 221 deletions
diff --git a/runtime/Makefile.am b/runtime/Makefile.am index 2e85d846..9047c83d 100644 --- a/runtime/Makefile.am +++ b/runtime/Makefile.am @@ -9,9 +9,7 @@ librsyslog_la_SOURCES = \ rsyslog.h \ unicode-helper.h \ atomic.h \ - atomic-posix-sem.c \ batch.h \ - atomic-posix-sem.c \ syslogd-types.h \ module-template.h \ obj-types.h \ diff --git a/runtime/atomic-posix-sem.c b/runtime/atomic-posix-sem.c deleted file mode 100644 index 979fae02..00000000 --- a/runtime/atomic-posix-sem.c +++ /dev/null @@ -1,70 +0,0 @@ -/* atomic_posix_sem.c: This file supplies an emulation for atomic operations using - * POSIX semaphores. - * - * Copyright 2010 DResearch Digital Media Systems GmbH - * - * This file is part of the rsyslog runtime library. - * - * The rsyslog runtime library is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * The rsyslog runtime library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>. - * - * A copy of the GPL can be found in the file "COPYING" in this distribution. - * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. - */ - -#include "config.h" -#ifndef HAVE_ATOMIC_BUILTINS -#ifdef HAVE_SEMAPHORE_H -#include <semaphore.h> -#include <errno.h> - -#include "atomic.h" -#include "rsyslog.h" -#include "srUtils.h" - -sem_t atomicSem; - -rsRetVal -atomicSemInit(void) -{ - DEFiRet; - - dbgprintf("init posix semaphore for atomics emulation\n"); - if(sem_init(&atomicSem, 0, 1) == -1) - { - char errStr[1024]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - dbgprintf("init posix semaphore for atomics emulation failed: %s\n", errStr); - iRet = RS_RET_SYS_ERR; /* the right error code ??? */ - } - - RETiRet; -} - -void -atomicSemExit(void) -{ - dbgprintf("destroy posix semaphore for atomics emulation\n"); - if(sem_destroy(&atomicSem) == -1) - { - char errStr[1024]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - dbgprintf("destroy posix semaphore for atomics emulation failed: %s\n", errStr); - } -} - -#endif /* HAVE_SEMAPHORE_H */ -#endif /* !defined(HAVE_ATOMIC_BUILTINS) */ - -/* vim:set ai: - */ diff --git a/runtime/atomic.h b/runtime/atomic.h index cdcb1410..520d2acd 100644 --- a/runtime/atomic.h +++ b/runtime/atomic.h @@ -41,133 +41,29 @@ #ifdef HAVE_ATOMIC_BUILTINS # define ATOMIC_SUB(data, val) __sync_fetch_and_sub(&(data), val) # define ATOMIC_ADD(data, val) __sync_fetch_and_add(&(data), val) -# define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1)) +# define ATOMIC_INC(data, phlpmut) ((void) __sync_fetch_and_add(data, 1)) # define ATOMIC_INC_AND_FETCH(data) __sync_fetch_and_add(&(data), 1) -# define ATOMIC_DEC(data) ((void) __sync_sub_and_fetch(&(data), 1)) -# define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1) +# define ATOMIC_DEC(data, phlpmut) ((void) __sync_sub_and_fetch(data, 1)) +# define ATOMIC_DEC_AND_FETCH(data, phlpmut) __sync_sub_and_fetch(data, 1) # define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff)) # define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1) -# define ATOMIC_STORE_0_TO_INT(data) __sync_fetch_and_and(&(data), 0) -# define ATOMIC_STORE_1_TO_INT(data) __sync_fetch_and_or(&(data), 1) +# define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0) +# define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1) +# define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val)) # define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal)); -# define ATOMIC_CAS_VAL(data, oldVal, newVal) __sync_val_compare_and_swap(&(data), (oldVal), (newVal)); -#else -#ifdef HAVE_SEMAPHORE_H - /* we use POSIX semaphores instead */ - -#include "rsyslog.h" -#include <semaphore.h> - -extern sem_t atomicSem; -rsRetVal atomicSemInit(void); -void atomicSemExit(void); - -#if HAVE_TYPEOF -#define my_typeof(x) typeof(x) -#else /* sorry, can't determine types, using 'int' */ -#define my_typeof(x) int -#endif - -# define ATOMIC_SUB(data, val) \ -({ \ - my_typeof(data) tmp; \ - sem_wait(&atomicSem); \ - tmp = data; \ - data -= val; \ - sem_post(&atomicSem); \ - tmp; \ -}) - -# define ATOMIC_ADD(data, val) \ -({ \ - my_typeof(data) tmp; \ - sem_wait(&atomicSem); \ - tmp = data; \ - data += val; \ - sem_post(&atomicSem); \ - tmp; \ -}) - -# define ATOMIC_INC_AND_FETCH(data) \ -({ \ - my_typeof(data) tmp; \ - sem_wait(&atomicSem); \ - tmp = data; \ - data += 1; \ - sem_post(&atomicSem); \ - tmp; \ -}) - -# define ATOMIC_INC(data) ((void) ATOMIC_INC_AND_FETCH(data)) - -# define ATOMIC_DEC_AND_FETCH(data) \ -({ \ - sem_wait(&atomicSem); \ - data -= 1; \ - sem_post(&atomicSem); \ - data; \ -}) +# define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal)); -# define ATOMIC_DEC(data) ((void) ATOMIC_DEC_AND_FETCH(data)) + /* functions below are not needed if we have atomics */ +# define DEF_ATOMIC_HELPER_MUT(x) +# define INIT_ATOMIC_HELPER_MUT(x) +# define DESTROY_ATOMIC_HELPER_MUT(x) -# define ATOMIC_FETCH_32BIT(data) ((unsigned) ATOMIC_ADD((data), 0xffffffff)) - -# define ATOMIC_STORE_1_TO_32BIT(data) \ -({ \ - my_typeof(data) tmp; \ - sem_wait(&atomicSem); \ - tmp = data; \ - data = 1; \ - sem_post(&atomicSem); \ - tmp; \ -}) - -# define ATOMIC_STORE_0_TO_INT(data) \ -({ \ - my_typeof(data) tmp; \ - sem_wait(&atomicSem); \ - tmp = data; \ - data = 0; \ - sem_post(&atomicSem); \ - tmp; \ -}) - -# define ATOMIC_STORE_1_TO_INT(data) \ -({ \ - my_typeof(data) tmp; \ - sem_wait(&atomicSem); \ - tmp = data; \ - data = 1; \ - sem_post(&atomicSem); \ - tmp; \ -}) - -# define ATOMIC_CAS(data, oldVal, newVal) \ -({ \ - int ret; \ - sem_wait(&atomicSem); \ - if(data != oldVal) ret = 0; \ - else \ - { \ - data = newVal; \ - ret = 1; \ - } \ - sem_post(&atomicSem); \ - ret; \ -}) - -# define ATOMIC_CAS_VAL(data, oldVal, newVal) \ -({ \ - sem_wait(&atomicSem); \ - if(data == oldVal) \ - { \ - data = newVal; \ - } \ - sem_post(&atomicSem); \ - data; \ -}) - -#else /* not HAVE_SEMAPHORE_H */ + /* the following operations should preferrably be done atomic, but it is + * not fatal if not -- that means we can live with some missed updates. So be + * sure to use these macros only if that really does not matter! + */ +# define PREFER_ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1)) +#else /* note that we gained parctical proof that theoretical problems DO occur * if we do not properly address them. See this blog post for details: * http://blog.gerhards.net/2009/01/rsyslog-data-race-analysis.html @@ -175,16 +71,63 @@ void atomicSemExit(void); * simply go ahead and do without them - use mutexes or other things. The * code needs to be checked against all those cases. -- rgerhards, 2009-01-30 */ + #include <pthread.h> +# define ATOMIC_INC(data, phlpmut) { \ + pthread_mutex_lock(phlpmut); \ + ++(*(data)); \ + pthread_mutex_unlock(phlpmut); \ + } + +# define ATOMIC_STORE_0_TO_INT(data, hlpmut) { \ + pthread_mutex_lock(&hlpmut); \ + *(data) = 0; \ + pthread_mutex_unlock(&hlpmut); \ + } + +# define ATOMIC_STORE_1_TO_INT(data, hlpmut) { \ + pthread_mutex_lock(&hlpmut); \ + *(data) = 1; \ + pthread_mutex_unlock(&hlpmut); \ + } + + static inline int + ATOMIC_CAS_VAL(int *data, int oldVal, int newVal, pthread_mutex_t *phlpmut) { + int val; + pthread_mutex_lock(phlpmut); + if(*data == oldVal) { + *data = newVal; + } + val = *data; + pthread_mutex_unlock(phlpmut); + return(val); + } + +# define ATOMIC_DEC(data, phlpmut) { \ + pthread_mutex_lock(phlpmut); \ + --(*(data)); \ + pthread_mutex_unlock(phlpmut); \ + } + + static inline int + ATOMIC_DEC_AND_FETCH(int *data, pthread_mutex_t *phlpmut) { + int val; + pthread_mutex_lock(phlpmut); + val = --(*data); + pthread_mutex_unlock(phlpmut); + return(val); + } +#if 0 # warning "atomic builtins not available, using nul operations - rsyslogd will probably be racy!" -# define ATOMIC_INC(data) (++(data)) -# define ATOMIC_DEC(data) (--(data)) -# define ATOMIC_DEC_AND_FETCH(data) (--(data)) -# define ATOMIC_FETCH_32BIT(data) (data) -# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 -# define ATOMIC_STORE_1_TO_INT(data) (data) = 1 -# define ATOMIC_STORE_0_TO_INT(data) (data) = 0 -# define ATOMIC_CAS_VAL(data, oldVal, newVal) (data) = (newVal) +# define ATOMIC_INC_AND_FETCH(data) (++(data)) +# define ATOMIC_FETCH_32BIT(data) (data) // TODO: del +# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 // TODO: del #endif +# define DEF_ATOMIC_HELPER_MUT(x) pthread_mutex_t x +# define INIT_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL) +# define DESTROY_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL) + +# define PREFER_ATOMIC_INC(data) ((void) ++data) + #endif #endif /* #ifndef INCLUDED_ATOMIC_H */ diff --git a/runtime/debug.c b/runtime/debug.c index 899d05da..64e251e5 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -1093,7 +1093,7 @@ int dbgEntrFunc(dbgFuncDB_t **ppFuncDB, const char *file, const char *func, int } /* when we reach this point, we have a fully-initialized FuncDB! */ - ATOMIC_INC(pFuncDB->nTimesCalled); + PREFER_ATOMIC_INC(pFuncDB->nTimesCalled); if(bLogFuncFlow && dbgPrintNameIsInList((const uchar*)pFuncDB->file, printNameFileRoot)) if(strcmp(pFuncDB->file, "stringbuf.c")) { /* TODO: make configurable */ dbgprintf("%s:%d: %s: enter\n", pFuncDB->file, pFuncDB->line, pFuncDB->func); diff --git a/runtime/glbl.c b/runtime/glbl.c index ac08791f..5951d21a 100644 --- a/runtime/glbl.c +++ b/runtime/glbl.c @@ -74,6 +74,7 @@ static uchar *pszDfltNetstrmDrvrCAF = NULL; /* default CA file for the netstrm d static uchar *pszDfltNetstrmDrvrKeyFile = NULL; /* default key file for the netstrm driver (server) */ static uchar *pszDfltNetstrmDrvrCertFile = NULL; /* default cert file for the netstrm driver (server) */ static int bTerminateInputs = 0; /* global switch that inputs shall terminate ASAP (1=> terminate) */ +static DEF_ATOMIC_HELPER_MUT(mutTerminateInputs); #ifdef USE_UNLIMITED_SELECT static int iFdSetSize = howmany(FD_SETSIZE, __NFDBITS) * sizeof (fd_mask); /* size of select() bitmask in bytes */ #endif @@ -139,7 +140,7 @@ static int GetGlobalInputTermState(void) */ static void SetGlobalInputTermination(void) { - ATOMIC_STORE_1_TO_INT(bTerminateInputs); + ATOMIC_STORE_1_TO_INT(&bTerminateInputs, &mutTerminateInputs); } @@ -352,6 +353,8 @@ BEGINAbstractObjClassInit(glbl, 1, OBJ_IS_CORE_MODULE) /* class, version */ CHKiRet(regCfSysLineHdlr((uchar *)"optimizeforuniprocessor", 0, eCmdHdlrBinary, NULL, &bOptimizeUniProc, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"preservefqdn", 0, eCmdHdlrBinary, NULL, &bPreserveFQDN, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL)); + + INIT_ATOMIC_HELPER_MUT(mutTerminateInputs); ENDObjClassInit(glbl) @@ -374,6 +377,7 @@ BEGINObjClassExit(glbl, OBJ_IS_CORE_MODULE) /* class, version */ if(LocalFQDNName != NULL) free(LocalFQDNName); objRelease(prop, CORE_COMPONENT); + DESTROY_ATOMIC_HELPER_MUT(mutTerminateInputs); ENDObjClassExit(glbl) /* vi:set ai: diff --git a/runtime/msg.c b/runtime/msg.c index 5885cada..2c8c36a3 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -787,7 +787,7 @@ BEGINobjDestruct(msg) /* be sure to specify the object type also in END and CODE CODESTARTobjDestruct(msg) /* DEV Debugging only ! dbgprintf("msgDestruct\t0x%lx, Ref now: %d\n", (unsigned long)pThis, pThis->iRefCount - 1); */ # ifdef HAVE_ATOMIC_BUILTINS - currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount); + currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, NULL); # else MsgLock(pThis); currRefCount = --pThis->iRefCount; @@ -843,9 +843,18 @@ CODESTARTobjDestruct(msg) * that we trim too often when the counter wraps. */ static unsigned iTrimCtr = 1; +# ifdef HAVE_ATOMICS if(ATOMIC_INC_AND_FETCH(iTrimCtr) % 100000 == 0) { malloc_trim(128*1024); } +# else +static pthread_mutex_t mutTrimCtr = PTHREAD_MUTEX_INITIALIZER; + d_pthread_mutex_lock(&mutTrimCtr); + if(iTrimCtr++ % 100000 == 0) { + malloc_trim(128*1024); + } + d_pthread_mutex_unlock(&mutTrimCtr); +# endif } # endif } else { @@ -1055,7 +1064,7 @@ msg_t *MsgAddRef(msg_t *pM) { assert(pM != NULL); # ifdef HAVE_ATOMIC_BUILTINS - ATOMIC_INC(pM->iRefCount); + ATOMIC_INC(&pM->iRefCount, NULL); # else MsgLock(pM); pM->iRefCount++; diff --git a/runtime/msg.h b/runtime/msg.h index 712609f6..97cac62f 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -32,6 +32,7 @@ #include "obj.h" #include "syslogd-types.h" #include "template.h" +#include "atomic.h" /* rgerhards 2004-11-08: The following structure represents a diff --git a/runtime/prop.c b/runtime/prop.c index 94d1bd49..d925bb43 100644 --- a/runtime/prop.c +++ b/runtime/prop.c @@ -53,6 +53,7 @@ DEFobjStaticHelpers */ BEGINobjConstruct(prop) /* be sure to specify the object type also in END macro! */ pThis->iRefCount = 1; + INIT_ATOMIC_HELPER_MUT(pThis->mutRefCount); ENDobjConstruct(prop) @@ -60,11 +61,12 @@ ENDobjConstruct(prop) BEGINobjDestruct(prop) /* be sure to specify the object type also in END and CODESTART macros! */ int currRefCount; CODESTARTobjDestruct(prop) - currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount); + currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, &pThis->mutRefCount); if(currRefCount == 0) { /* (only) in this case we need to actually destruct the object */ if(pThis->len >= CONF_PROP_BUFSIZE) free(pThis->szVal.psz); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutRefCount); } else { pThis = NULL; /* tell framework NOT to destructing the object! */ } @@ -132,7 +134,7 @@ propConstructFinalize(prop_t __attribute__((unused)) *pThis) */ static rsRetVal AddRef(prop_t *pThis) { - ATOMIC_INC(pThis->iRefCount); + ATOMIC_INC(&pThis->iRefCount, &pThis->mutRefCount); return RS_RET_OK; } diff --git a/runtime/prop.h b/runtime/prop.h index e3519664..07b2ab7e 100644 --- a/runtime/prop.h +++ b/runtime/prop.h @@ -24,6 +24,7 @@ */ #ifndef INCLUDED_PROP_H #define INCLUDED_PROP_H +#include "atomic.h" /* the prop object */ struct prop_s { @@ -34,6 +35,7 @@ struct prop_s { uchar sz[CONF_PROP_BUFSIZE]; } szVal; int len; /* we use int intentionally, otherwise we may get some troubles... */ + DEF_ATOMIC_HELPER_MUT(mutRefCount); }; /* interfaces */ diff --git a/runtime/queue.c b/runtime/queue.c index b29ec7ac..bf2164a6 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -214,8 +214,8 @@ static inline void queueDrain(qqueue_t *pThis) BEGINfunc DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ - while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) { - pThis->qDeq(pThis, &pUsr); + while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) { + pThis->qDel(pThis, &pUsr); if(pUsr != NULL) { objDestruct(pUsr); } @@ -1226,6 +1226,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread break; } + INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + finalize_it: OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP RETiRet; @@ -2063,6 +2065,8 @@ CODESTARTobjDestruct(qqueue) pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + /* type-specific destructor */ iRet = pThis->qDestruct(pThis); diff --git a/runtime/queue.h b/runtime/queue.h index 38c0d491..45d3a51b 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -160,6 +160,7 @@ struct queue_s { strm_t *pReadDel; /* current file for deleting */ } disk; } tVars; + DEF_ATOMIC_HELPER_MUT(mutQueueSize); }; diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c index 921ad0bd..c321484d 100644 --- a/runtime/rsyslog.c +++ b/runtime/rsyslog.c @@ -141,12 +141,6 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF) CHKiRet(objClassInit(NULL)); /* *THIS* *MUST* always be the first class initilizer being called! */ CHKiRet(objGetObjInterface(pObjIF)); /* this provides the root pointer for all other queries */ -#ifndef HAVE_ATOMIC_BUILTINS -#ifdef HAVE_SEMAPHORE_H - CHKiRet(atomicSemInit()); -#endif /* HAVE_SEMAPHORE_H */ -#endif /* !defined(HAVE_ATOMIC_BUILTINS) */ - /* initialize core classes. We must be very careful with the order of events. Some * classes use others and if we do not initialize them in the right order, we may end * up with an invalid call. The most important thing that can happen is that an error @@ -224,12 +218,6 @@ rsrtExit(void) rulesetClassExit(); ruleClassExit(); -#ifndef HAVE_ATOMIC_BUILTINS -#ifdef HAVE_SEMAPHORE_H - atomicSemExit(); -#endif /* HAVE_SEMAPHORE_H */ -#endif /* !defined(HAVE_ATOMIC_BUILTINS) */ - objClassExit(); /* *THIS* *MUST/SHOULD?* always be the first class initilizer being called (except debug)! */ } diff --git a/runtime/wti.c b/runtime/wti.c index 14964fb0..8994d84b 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -147,6 +147,8 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE CODESTARTobjDestruct(wti) /* actual destruction */ free(pThis->batch.pElem); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurrCmd); + free(pThis->pszDbgHdr); ENDobjDestruct(wti) @@ -154,6 +156,7 @@ ENDobjDestruct(wti) /* Standard-Constructor for the wti object */ BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ + INIT_ATOMIC_HELPER_MUT(pThis->mutCurrCmd); ENDobjConstruct(wti) diff --git a/runtime/wti.h b/runtime/wti.h index e587c69e..ab23f8c1 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -39,6 +39,7 @@ struct wti_s { wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ + DEF_ATOMIC_HELPER_MUT(mutCurrCmd); }; diff --git a/runtime/wtp.c b/runtime/wtp.c index 649ffa5a..db6d4fcb 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -96,6 +96,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pThis->pfGetDeqBatchSize = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; pThis->pfObjProcessed = NotImplementedDummy; + INIT_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged); ENDobjConstruct(wtp) @@ -149,6 +150,7 @@ CODESTARTobjDestruct(wtp) pthread_cond_destroy(&pThis->condThrdTrm); pthread_mutex_destroy(&pThis->mutWtp); pthread_attr_destroy(&pThis->attrThrd); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged); free(pThis->pszDbgHdr); ENDobjDestruct(wtp) diff --git a/runtime/wtp.h b/runtime/wtp.h index 05c02a8c..baeba1a2 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -26,6 +26,7 @@ #include <pthread.h> #include "obj.h" +#include "atomic.h" /* states for worker threads. */ #define WRKTHRD_STOPPED FALSE @@ -65,6 +66,7 @@ struct wtp_s { rsRetVal (*pfDoWork)(void *pUsr, void *pWti); /* end user objects */ uchar *pszDbgHdr; /* header string for debug messages */ + DEF_ATOMIC_HELPER_MUT(mutThrdStateChanged); }; /* some symbolic constants for easier reference */ @@ -82,6 +84,7 @@ rsRetVal wtpWakeupAllWrkr(wtp_t *pThis); rsRetVal wtpCancelAll(wtp_t *pThis); rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg); rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout); +//void wtpSetThrdStateChanged(wtp_t *pThis, int val); PROTOTYPEObjClassInit(wtp); PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)); |