summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-04-27 17:49:06 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-04-27 17:49:06 +0200
commit4a5a3196fbe4e5a4e9f8dea49f916462adbf3098 (patch)
treedf7750c93603d4a979188f3385c9fe4997072c7f /runtime
parent9cf1756dc6c06682b32ed7a789ceb4254b5792df (diff)
parentcbe2e3d44496ec7c6418e7e74ce917f2086a2947 (diff)
downloadrsyslog-4a5a3196fbe4e5a4e9f8dea49f916462adbf3098.tar.gz
rsyslog-4a5a3196fbe4e5a4e9f8dea49f916462adbf3098.tar.xz
rsyslog-4a5a3196fbe4e5a4e9f8dea49f916462adbf3098.zip
Merge branch 'v4-devel' into master
Conflicts: runtime/Makefile.am runtime/atomic.h runtime/queue.c runtime/queue.h runtime/wti.c runtime/wti.h runtime/wtp.c runtime/wtp.h
Diffstat (limited to 'runtime')
-rw-r--r--runtime/Makefile.am2
-rw-r--r--runtime/atomic-posix-sem.c70
-rw-r--r--runtime/atomic.h201
-rw-r--r--runtime/debug.c2
-rw-r--r--runtime/glbl.c6
-rw-r--r--runtime/msg.c13
-rw-r--r--runtime/msg.h1
-rw-r--r--runtime/prop.c6
-rw-r--r--runtime/prop.h2
-rw-r--r--runtime/queue.c8
-rw-r--r--runtime/queue.h1
-rw-r--r--runtime/rsyslog.c12
-rw-r--r--runtime/wti.c3
-rw-r--r--runtime/wti.h1
-rw-r--r--runtime/wtp.c2
-rw-r--r--runtime/wtp.h3
16 files changed, 112 insertions, 221 deletions
diff --git a/runtime/Makefile.am b/runtime/Makefile.am
index 2e85d846..9047c83d 100644
--- a/runtime/Makefile.am
+++ b/runtime/Makefile.am
@@ -9,9 +9,7 @@ librsyslog_la_SOURCES = \
rsyslog.h \
unicode-helper.h \
atomic.h \
- atomic-posix-sem.c \
batch.h \
- atomic-posix-sem.c \
syslogd-types.h \
module-template.h \
obj-types.h \
diff --git a/runtime/atomic-posix-sem.c b/runtime/atomic-posix-sem.c
deleted file mode 100644
index 979fae02..00000000
--- a/runtime/atomic-posix-sem.c
+++ /dev/null
@@ -1,70 +0,0 @@
-/* atomic_posix_sem.c: This file supplies an emulation for atomic operations using
- * POSIX semaphores.
- *
- * Copyright 2010 DResearch Digital Media Systems GmbH
- *
- * This file is part of the rsyslog runtime library.
- *
- * The rsyslog runtime library is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * The rsyslog runtime library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
- *
- * A copy of the GPL can be found in the file "COPYING" in this distribution.
- * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
- */
-
-#include "config.h"
-#ifndef HAVE_ATOMIC_BUILTINS
-#ifdef HAVE_SEMAPHORE_H
-#include <semaphore.h>
-#include <errno.h>
-
-#include "atomic.h"
-#include "rsyslog.h"
-#include "srUtils.h"
-
-sem_t atomicSem;
-
-rsRetVal
-atomicSemInit(void)
-{
- DEFiRet;
-
- dbgprintf("init posix semaphore for atomics emulation\n");
- if(sem_init(&atomicSem, 0, 1) == -1)
- {
- char errStr[1024];
- rs_strerror_r(errno, errStr, sizeof(errStr));
- dbgprintf("init posix semaphore for atomics emulation failed: %s\n", errStr);
- iRet = RS_RET_SYS_ERR; /* the right error code ??? */
- }
-
- RETiRet;
-}
-
-void
-atomicSemExit(void)
-{
- dbgprintf("destroy posix semaphore for atomics emulation\n");
- if(sem_destroy(&atomicSem) == -1)
- {
- char errStr[1024];
- rs_strerror_r(errno, errStr, sizeof(errStr));
- dbgprintf("destroy posix semaphore for atomics emulation failed: %s\n", errStr);
- }
-}
-
-#endif /* HAVE_SEMAPHORE_H */
-#endif /* !defined(HAVE_ATOMIC_BUILTINS) */
-
-/* vim:set ai:
- */
diff --git a/runtime/atomic.h b/runtime/atomic.h
index cdcb1410..520d2acd 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -41,133 +41,29 @@
#ifdef HAVE_ATOMIC_BUILTINS
# define ATOMIC_SUB(data, val) __sync_fetch_and_sub(&(data), val)
# define ATOMIC_ADD(data, val) __sync_fetch_and_add(&(data), val)
-# define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1))
+# define ATOMIC_INC(data, phlpmut) ((void) __sync_fetch_and_add(data, 1))
# define ATOMIC_INC_AND_FETCH(data) __sync_fetch_and_add(&(data), 1)
-# define ATOMIC_DEC(data) ((void) __sync_sub_and_fetch(&(data), 1))
-# define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1)
+# define ATOMIC_DEC(data, phlpmut) ((void) __sync_sub_and_fetch(data, 1))
+# define ATOMIC_DEC_AND_FETCH(data, phlpmut) __sync_sub_and_fetch(data, 1)
# define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff))
# define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1)
-# define ATOMIC_STORE_0_TO_INT(data) __sync_fetch_and_and(&(data), 0)
-# define ATOMIC_STORE_1_TO_INT(data) __sync_fetch_and_or(&(data), 1)
+# define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0)
+# define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1)
+# define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val))
# define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal));
-# define ATOMIC_CAS_VAL(data, oldVal, newVal) __sync_val_compare_and_swap(&(data), (oldVal), (newVal));
-#else
-#ifdef HAVE_SEMAPHORE_H
- /* we use POSIX semaphores instead */
-
-#include "rsyslog.h"
-#include <semaphore.h>
-
-extern sem_t atomicSem;
-rsRetVal atomicSemInit(void);
-void atomicSemExit(void);
-
-#if HAVE_TYPEOF
-#define my_typeof(x) typeof(x)
-#else /* sorry, can't determine types, using 'int' */
-#define my_typeof(x) int
-#endif
-
-# define ATOMIC_SUB(data, val) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data -= val; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_ADD(data, val) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data += val; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_INC_AND_FETCH(data) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data += 1; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_INC(data) ((void) ATOMIC_INC_AND_FETCH(data))
-
-# define ATOMIC_DEC_AND_FETCH(data) \
-({ \
- sem_wait(&atomicSem); \
- data -= 1; \
- sem_post(&atomicSem); \
- data; \
-})
+# define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal));
-# define ATOMIC_DEC(data) ((void) ATOMIC_DEC_AND_FETCH(data))
+ /* functions below are not needed if we have atomics */
+# define DEF_ATOMIC_HELPER_MUT(x)
+# define INIT_ATOMIC_HELPER_MUT(x)
+# define DESTROY_ATOMIC_HELPER_MUT(x)
-# define ATOMIC_FETCH_32BIT(data) ((unsigned) ATOMIC_ADD((data), 0xffffffff))
-
-# define ATOMIC_STORE_1_TO_32BIT(data) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data = 1; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_STORE_0_TO_INT(data) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data = 0; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_STORE_1_TO_INT(data) \
-({ \
- my_typeof(data) tmp; \
- sem_wait(&atomicSem); \
- tmp = data; \
- data = 1; \
- sem_post(&atomicSem); \
- tmp; \
-})
-
-# define ATOMIC_CAS(data, oldVal, newVal) \
-({ \
- int ret; \
- sem_wait(&atomicSem); \
- if(data != oldVal) ret = 0; \
- else \
- { \
- data = newVal; \
- ret = 1; \
- } \
- sem_post(&atomicSem); \
- ret; \
-})
-
-# define ATOMIC_CAS_VAL(data, oldVal, newVal) \
-({ \
- sem_wait(&atomicSem); \
- if(data == oldVal) \
- { \
- data = newVal; \
- } \
- sem_post(&atomicSem); \
- data; \
-})
-
-#else /* not HAVE_SEMAPHORE_H */
+ /* the following operations should preferrably be done atomic, but it is
+ * not fatal if not -- that means we can live with some missed updates. So be
+ * sure to use these macros only if that really does not matter!
+ */
+# define PREFER_ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1))
+#else
/* note that we gained parctical proof that theoretical problems DO occur
* if we do not properly address them. See this blog post for details:
* http://blog.gerhards.net/2009/01/rsyslog-data-race-analysis.html
@@ -175,16 +71,63 @@ void atomicSemExit(void);
* simply go ahead and do without them - use mutexes or other things. The
* code needs to be checked against all those cases. -- rgerhards, 2009-01-30
*/
+ #include <pthread.h>
+# define ATOMIC_INC(data, phlpmut) { \
+ pthread_mutex_lock(phlpmut); \
+ ++(*(data)); \
+ pthread_mutex_unlock(phlpmut); \
+ }
+
+# define ATOMIC_STORE_0_TO_INT(data, hlpmut) { \
+ pthread_mutex_lock(&hlpmut); \
+ *(data) = 0; \
+ pthread_mutex_unlock(&hlpmut); \
+ }
+
+# define ATOMIC_STORE_1_TO_INT(data, hlpmut) { \
+ pthread_mutex_lock(&hlpmut); \
+ *(data) = 1; \
+ pthread_mutex_unlock(&hlpmut); \
+ }
+
+ static inline int
+ ATOMIC_CAS_VAL(int *data, int oldVal, int newVal, pthread_mutex_t *phlpmut) {
+ int val;
+ pthread_mutex_lock(phlpmut);
+ if(*data == oldVal) {
+ *data = newVal;
+ }
+ val = *data;
+ pthread_mutex_unlock(phlpmut);
+ return(val);
+ }
+
+# define ATOMIC_DEC(data, phlpmut) { \
+ pthread_mutex_lock(phlpmut); \
+ --(*(data)); \
+ pthread_mutex_unlock(phlpmut); \
+ }
+
+ static inline int
+ ATOMIC_DEC_AND_FETCH(int *data, pthread_mutex_t *phlpmut) {
+ int val;
+ pthread_mutex_lock(phlpmut);
+ val = --(*data);
+ pthread_mutex_unlock(phlpmut);
+ return(val);
+ }
+#if 0
# warning "atomic builtins not available, using nul operations - rsyslogd will probably be racy!"
-# define ATOMIC_INC(data) (++(data))
-# define ATOMIC_DEC(data) (--(data))
-# define ATOMIC_DEC_AND_FETCH(data) (--(data))
-# define ATOMIC_FETCH_32BIT(data) (data)
-# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1
-# define ATOMIC_STORE_1_TO_INT(data) (data) = 1
-# define ATOMIC_STORE_0_TO_INT(data) (data) = 0
-# define ATOMIC_CAS_VAL(data, oldVal, newVal) (data) = (newVal)
+# define ATOMIC_INC_AND_FETCH(data) (++(data))
+# define ATOMIC_FETCH_32BIT(data) (data) // TODO: del
+# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 // TODO: del
#endif
+# define DEF_ATOMIC_HELPER_MUT(x) pthread_mutex_t x
+# define INIT_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL)
+# define DESTROY_ATOMIC_HELPER_MUT(x) pthread_mutex_init(&(x), NULL)
+
+# define PREFER_ATOMIC_INC(data) ((void) ++data)
+
#endif
#endif /* #ifndef INCLUDED_ATOMIC_H */
diff --git a/runtime/debug.c b/runtime/debug.c
index 899d05da..64e251e5 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -1093,7 +1093,7 @@ int dbgEntrFunc(dbgFuncDB_t **ppFuncDB, const char *file, const char *func, int
}
/* when we reach this point, we have a fully-initialized FuncDB! */
- ATOMIC_INC(pFuncDB->nTimesCalled);
+ PREFER_ATOMIC_INC(pFuncDB->nTimesCalled);
if(bLogFuncFlow && dbgPrintNameIsInList((const uchar*)pFuncDB->file, printNameFileRoot))
if(strcmp(pFuncDB->file, "stringbuf.c")) { /* TODO: make configurable */
dbgprintf("%s:%d: %s: enter\n", pFuncDB->file, pFuncDB->line, pFuncDB->func);
diff --git a/runtime/glbl.c b/runtime/glbl.c
index ac08791f..5951d21a 100644
--- a/runtime/glbl.c
+++ b/runtime/glbl.c
@@ -74,6 +74,7 @@ static uchar *pszDfltNetstrmDrvrCAF = NULL; /* default CA file for the netstrm d
static uchar *pszDfltNetstrmDrvrKeyFile = NULL; /* default key file for the netstrm driver (server) */
static uchar *pszDfltNetstrmDrvrCertFile = NULL; /* default cert file for the netstrm driver (server) */
static int bTerminateInputs = 0; /* global switch that inputs shall terminate ASAP (1=> terminate) */
+static DEF_ATOMIC_HELPER_MUT(mutTerminateInputs);
#ifdef USE_UNLIMITED_SELECT
static int iFdSetSize = howmany(FD_SETSIZE, __NFDBITS) * sizeof (fd_mask); /* size of select() bitmask in bytes */
#endif
@@ -139,7 +140,7 @@ static int GetGlobalInputTermState(void)
*/
static void SetGlobalInputTermination(void)
{
- ATOMIC_STORE_1_TO_INT(bTerminateInputs);
+ ATOMIC_STORE_1_TO_INT(&bTerminateInputs, &mutTerminateInputs);
}
@@ -352,6 +353,8 @@ BEGINAbstractObjClassInit(glbl, 1, OBJ_IS_CORE_MODULE) /* class, version */
CHKiRet(regCfSysLineHdlr((uchar *)"optimizeforuniprocessor", 0, eCmdHdlrBinary, NULL, &bOptimizeUniProc, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"preservefqdn", 0, eCmdHdlrBinary, NULL, &bPreserveFQDN, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL));
+
+ INIT_ATOMIC_HELPER_MUT(mutTerminateInputs);
ENDObjClassInit(glbl)
@@ -374,6 +377,7 @@ BEGINObjClassExit(glbl, OBJ_IS_CORE_MODULE) /* class, version */
if(LocalFQDNName != NULL)
free(LocalFQDNName);
objRelease(prop, CORE_COMPONENT);
+ DESTROY_ATOMIC_HELPER_MUT(mutTerminateInputs);
ENDObjClassExit(glbl)
/* vi:set ai:
diff --git a/runtime/msg.c b/runtime/msg.c
index 5885cada..2c8c36a3 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -787,7 +787,7 @@ BEGINobjDestruct(msg) /* be sure to specify the object type also in END and CODE
CODESTARTobjDestruct(msg)
/* DEV Debugging only ! dbgprintf("msgDestruct\t0x%lx, Ref now: %d\n", (unsigned long)pThis, pThis->iRefCount - 1); */
# ifdef HAVE_ATOMIC_BUILTINS
- currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount);
+ currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, NULL);
# else
MsgLock(pThis);
currRefCount = --pThis->iRefCount;
@@ -843,9 +843,18 @@ CODESTARTobjDestruct(msg)
* that we trim too often when the counter wraps.
*/
static unsigned iTrimCtr = 1;
+# ifdef HAVE_ATOMICS
if(ATOMIC_INC_AND_FETCH(iTrimCtr) % 100000 == 0) {
malloc_trim(128*1024);
}
+# else
+static pthread_mutex_t mutTrimCtr = PTHREAD_MUTEX_INITIALIZER;
+ d_pthread_mutex_lock(&mutTrimCtr);
+ if(iTrimCtr++ % 100000 == 0) {
+ malloc_trim(128*1024);
+ }
+ d_pthread_mutex_unlock(&mutTrimCtr);
+# endif
}
# endif
} else {
@@ -1055,7 +1064,7 @@ msg_t *MsgAddRef(msg_t *pM)
{
assert(pM != NULL);
# ifdef HAVE_ATOMIC_BUILTINS
- ATOMIC_INC(pM->iRefCount);
+ ATOMIC_INC(&pM->iRefCount, NULL);
# else
MsgLock(pM);
pM->iRefCount++;
diff --git a/runtime/msg.h b/runtime/msg.h
index 712609f6..97cac62f 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -32,6 +32,7 @@
#include "obj.h"
#include "syslogd-types.h"
#include "template.h"
+#include "atomic.h"
/* rgerhards 2004-11-08: The following structure represents a
diff --git a/runtime/prop.c b/runtime/prop.c
index 94d1bd49..d925bb43 100644
--- a/runtime/prop.c
+++ b/runtime/prop.c
@@ -53,6 +53,7 @@ DEFobjStaticHelpers
*/
BEGINobjConstruct(prop) /* be sure to specify the object type also in END macro! */
pThis->iRefCount = 1;
+ INIT_ATOMIC_HELPER_MUT(pThis->mutRefCount);
ENDobjConstruct(prop)
@@ -60,11 +61,12 @@ ENDobjConstruct(prop)
BEGINobjDestruct(prop) /* be sure to specify the object type also in END and CODESTART macros! */
int currRefCount;
CODESTARTobjDestruct(prop)
- currRefCount = ATOMIC_DEC_AND_FETCH(pThis->iRefCount);
+ currRefCount = ATOMIC_DEC_AND_FETCH(&pThis->iRefCount, &pThis->mutRefCount);
if(currRefCount == 0) {
/* (only) in this case we need to actually destruct the object */
if(pThis->len >= CONF_PROP_BUFSIZE)
free(pThis->szVal.psz);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutRefCount);
} else {
pThis = NULL; /* tell framework NOT to destructing the object! */
}
@@ -132,7 +134,7 @@ propConstructFinalize(prop_t __attribute__((unused)) *pThis)
*/
static rsRetVal AddRef(prop_t *pThis)
{
- ATOMIC_INC(pThis->iRefCount);
+ ATOMIC_INC(&pThis->iRefCount, &pThis->mutRefCount);
return RS_RET_OK;
}
diff --git a/runtime/prop.h b/runtime/prop.h
index e3519664..07b2ab7e 100644
--- a/runtime/prop.h
+++ b/runtime/prop.h
@@ -24,6 +24,7 @@
*/
#ifndef INCLUDED_PROP_H
#define INCLUDED_PROP_H
+#include "atomic.h"
/* the prop object */
struct prop_s {
@@ -34,6 +35,7 @@ struct prop_s {
uchar sz[CONF_PROP_BUFSIZE];
} szVal;
int len; /* we use int intentionally, otherwise we may get some troubles... */
+ DEF_ATOMIC_HELPER_MUT(mutRefCount);
};
/* interfaces */
diff --git a/runtime/queue.c b/runtime/queue.c
index b29ec7ac..bf2164a6 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -214,8 +214,8 @@ static inline void queueDrain(qqueue_t *pThis)
BEGINfunc
DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) {
- pThis->qDeq(pThis, &pUsr);
+ while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) {
+ pThis->qDel(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
}
@@ -1226,6 +1226,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
break;
}
+ INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+
finalize_it:
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
RETiRet;
@@ -2063,6 +2065,8 @@ CODESTARTobjDestruct(qqueue)
pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
diff --git a/runtime/queue.h b/runtime/queue.h
index 38c0d491..45d3a51b 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -160,6 +160,7 @@ struct queue_s {
strm_t *pReadDel; /* current file for deleting */
} disk;
} tVars;
+ DEF_ATOMIC_HELPER_MUT(mutQueueSize);
};
diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c
index 921ad0bd..c321484d 100644
--- a/runtime/rsyslog.c
+++ b/runtime/rsyslog.c
@@ -141,12 +141,6 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF)
CHKiRet(objClassInit(NULL)); /* *THIS* *MUST* always be the first class initilizer being called! */
CHKiRet(objGetObjInterface(pObjIF)); /* this provides the root pointer for all other queries */
-#ifndef HAVE_ATOMIC_BUILTINS
-#ifdef HAVE_SEMAPHORE_H
- CHKiRet(atomicSemInit());
-#endif /* HAVE_SEMAPHORE_H */
-#endif /* !defined(HAVE_ATOMIC_BUILTINS) */
-
/* initialize core classes. We must be very careful with the order of events. Some
* classes use others and if we do not initialize them in the right order, we may end
* up with an invalid call. The most important thing that can happen is that an error
@@ -224,12 +218,6 @@ rsrtExit(void)
rulesetClassExit();
ruleClassExit();
-#ifndef HAVE_ATOMIC_BUILTINS
-#ifdef HAVE_SEMAPHORE_H
- atomicSemExit();
-#endif /* HAVE_SEMAPHORE_H */
-#endif /* !defined(HAVE_ATOMIC_BUILTINS) */
-
objClassExit(); /* *THIS* *MUST/SHOULD?* always be the first class initilizer being called (except debug)! */
}
diff --git a/runtime/wti.c b/runtime/wti.c
index 14964fb0..8994d84b 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -147,6 +147,8 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE
CODESTARTobjDestruct(wti)
/* actual destruction */
free(pThis->batch.pElem);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurrCmd);
+
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -154,6 +156,7 @@ ENDobjDestruct(wti)
/* Standard-Constructor for the wti object
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
+ INIT_ATOMIC_HELPER_MUT(pThis->mutCurrCmd);
ENDobjConstruct(wti)
diff --git a/runtime/wti.h b/runtime/wti.h
index e587c69e..ab23f8c1 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -39,6 +39,7 @@ struct wti_s {
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
uchar *pszDbgHdr; /* header string for debug messages */
+ DEF_ATOMIC_HELPER_MUT(mutCurrCmd);
};
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 649ffa5a..db6d4fcb 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -96,6 +96,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pThis->pfGetDeqBatchSize = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
pThis->pfObjProcessed = NotImplementedDummy;
+ INIT_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged);
ENDobjConstruct(wtp)
@@ -149,6 +150,7 @@ CODESTARTobjDestruct(wtp)
pthread_cond_destroy(&pThis->condThrdTrm);
pthread_mutex_destroy(&pThis->mutWtp);
pthread_attr_destroy(&pThis->attrThrd);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged);
free(pThis->pszDbgHdr);
ENDobjDestruct(wtp)
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 05c02a8c..baeba1a2 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -26,6 +26,7 @@
#include <pthread.h>
#include "obj.h"
+#include "atomic.h"
/* states for worker threads. */
#define WRKTHRD_STOPPED FALSE
@@ -65,6 +66,7 @@ struct wtp_s {
rsRetVal (*pfDoWork)(void *pUsr, void *pWti);
/* end user objects */
uchar *pszDbgHdr; /* header string for debug messages */
+ DEF_ATOMIC_HELPER_MUT(mutThrdStateChanged);
};
/* some symbolic constants for easier reference */
@@ -82,6 +84,7 @@ rsRetVal wtpWakeupAllWrkr(wtp_t *pThis);
rsRetVal wtpCancelAll(wtp_t *pThis);
rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg);
rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout);
+//void wtpSetThrdStateChanged(wtp_t *pThis, int val);
PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));