summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-04-27 18:26:09 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-04-27 18:26:09 +0200
commitdd76d96d676f305aa2d29131321fe5cac5a676c4 (patch)
treef71b499444c137ea1c9dfccc0dda9c1461e9040d /runtime
parent4a5a3196fbe4e5a4e9f8dea49f916462adbf3098 (diff)
downloadrsyslog-dd76d96d676f305aa2d29131321fe5cac5a676c4.tar.gz
rsyslog-dd76d96d676f305aa2d29131321fe5cac5a676c4.tar.xz
rsyslog-dd76d96d676f305aa2d29131321fe5cac5a676c4.zip
adapted new atomic instruction emulation to v5 engine
code did not compile after merge from v4
Diffstat (limited to 'runtime')
-rw-r--r--runtime/atomic.h29
-rw-r--r--runtime/glbl.c4
-rw-r--r--runtime/queue.c12
-rw-r--r--runtime/queue.h1
-rw-r--r--runtime/wti.c15
-rw-r--r--runtime/wti.h2
-rw-r--r--runtime/wtp.c27
-rw-r--r--runtime/wtp.h4
8 files changed, 60 insertions, 34 deletions
diff --git a/runtime/atomic.h b/runtime/atomic.h
index 520d2acd..30478440 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -39,13 +39,13 @@
* They simply came in too late. -- rgerhards, 2008-04-02
*/
#ifdef HAVE_ATOMIC_BUILTINS
-# define ATOMIC_SUB(data, val) __sync_fetch_and_sub(&(data), val)
+# define ATOMIC_SUB(data, val) __sync_fetch_and_sub(data, val)
# define ATOMIC_ADD(data, val) __sync_fetch_and_add(&(data), val)
# define ATOMIC_INC(data, phlpmut) ((void) __sync_fetch_and_add(data, 1))
# define ATOMIC_INC_AND_FETCH(data) __sync_fetch_and_add(&(data), 1)
# define ATOMIC_DEC(data, phlpmut) ((void) __sync_sub_and_fetch(data, 1))
# define ATOMIC_DEC_AND_FETCH(data, phlpmut) __sync_sub_and_fetch(data, 1)
-# define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff))
+# define ATOMIC_FETCH_32BIT(data, phlpmut) ((unsigned) __sync_fetch_and_and(data, 0xffffffff))
# define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1)
# define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0)
# define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1)
@@ -79,15 +79,15 @@
}
# define ATOMIC_STORE_0_TO_INT(data, hlpmut) { \
- pthread_mutex_lock(&hlpmut); \
+ pthread_mutex_lock(hlpmut); \
*(data) = 0; \
- pthread_mutex_unlock(&hlpmut); \
+ pthread_mutex_unlock(hlpmut); \
}
# define ATOMIC_STORE_1_TO_INT(data, hlpmut) { \
- pthread_mutex_lock(&hlpmut); \
+ pthread_mutex_lock(hlpmut); \
*(data) = 1; \
- pthread_mutex_unlock(&hlpmut); \
+ pthread_mutex_unlock(hlpmut); \
}
static inline int
@@ -116,10 +116,25 @@
pthread_mutex_unlock(phlpmut);
return(val);
}
+
+ static inline int
+ ATOMIC_FETCH_32BIT(int *data, pthread_mutex_t *phlpmut) {
+ int val;
+ pthread_mutex_lock(phlpmut);
+ val = (*data);
+ pthread_mutex_unlock(phlpmut);
+ return(val);
+ }
+
+ static inline void
+ ATOMIC_SUB(int *data, int val, pthread_mutex_t *phlpmut) {
+ pthread_mutex_lock(phlpmut);
+ (*data) -= val;
+ pthread_mutex_unlock(phlpmut);
+ }
#if 0
# warning "atomic builtins not available, using nul operations - rsyslogd will probably be racy!"
# define ATOMIC_INC_AND_FETCH(data) (++(data))
-# define ATOMIC_FETCH_32BIT(data) (data) // TODO: del
# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 // TODO: del
#endif
# define DEF_ATOMIC_HELPER_MUT(x) pthread_mutex_t x
diff --git a/runtime/glbl.c b/runtime/glbl.c
index 5951d21a..278bc4e1 100644
--- a/runtime/glbl.c
+++ b/runtime/glbl.c
@@ -74,7 +74,9 @@ static uchar *pszDfltNetstrmDrvrCAF = NULL; /* default CA file for the netstrm d
static uchar *pszDfltNetstrmDrvrKeyFile = NULL; /* default key file for the netstrm driver (server) */
static uchar *pszDfltNetstrmDrvrCertFile = NULL; /* default cert file for the netstrm driver (server) */
static int bTerminateInputs = 0; /* global switch that inputs shall terminate ASAP (1=> terminate) */
+#ifndef HAVE_ATOMIC_BUILTINS
static DEF_ATOMIC_HELPER_MUT(mutTerminateInputs);
+#endif
#ifdef USE_UNLIMITED_SELECT
static int iFdSetSize = howmany(FD_SETSIZE, __NFDBITS) * sizeof (fd_mask); /* size of select() bitmask in bytes */
#endif
@@ -131,7 +133,7 @@ SIMP_PROP_SET(DfltNetstrmDrvrCertFile, pszDfltNetstrmDrvrCertFile, uchar*) /* TO
*/
static int GetGlobalInputTermState(void)
{
- return ATOMIC_FETCH_32BIT(bTerminateInputs);
+ return ATOMIC_FETCH_32BIT(&bTerminateInputs, &mutTerminateInputs);
}
diff --git a/runtime/queue.c b/runtime/queue.c
index bf2164a6..4f0d36b9 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -215,7 +215,7 @@ static inline void queueDrain(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) {
- pThis->qDel(pThis, &pUsr);
+ pThis->qDeq(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
}
@@ -884,7 +884,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr)
CHKiRet(pThis->qAdd(pThis, pUsr));
if(pThis->qType != QUEUETYPE_DIRECT) {
- ATOMIC_INC(pThis->iQueueSize);
+ ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
DBGOPRINT((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n",
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
}
@@ -909,7 +909,7 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr)
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
iRet = pThis->qDeq(pThis, ppUsr);
- ATOMIC_INC(pThis->nLogDeq);
+ ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq);
// DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
// getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
@@ -1227,6 +1227,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
}
INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
finalize_it:
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
@@ -1290,8 +1291,8 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
}
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- ATOMIC_SUB(pThis->iQueueSize, nElem);
- ATOMIC_SUB(pThis->nLogDeq, nElem);
+ ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize);
+ ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq);
dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
++pThis->deqIDDel; /* one more batch dequeued */
@@ -2066,6 +2067,7 @@ CODESTARTobjDestruct(qqueue)
pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
diff --git a/runtime/queue.h b/runtime/queue.h
index 45d3a51b..8ede6922 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -161,6 +161,7 @@ struct queue_s {
} disk;
} tVars;
DEF_ATOMIC_HELPER_MUT(mutQueueSize);
+ DEF_ATOMIC_HELPER_MUT(mutLogDeq);
};
diff --git a/runtime/wti.c b/runtime/wti.c
index 8994d84b..77927453 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -82,7 +82,7 @@ wtiGetDbgHdr(wti_t *pThis)
sbool
wtiGetState(wti_t *pThis)
{
- return ATOMIC_FETCH_32BIT(pThis->bIsRunning);
+ return ATOMIC_FETCH_32BIT(&pThis->bIsRunning, &pThis->mutIsRunning);
}
@@ -105,10 +105,11 @@ rsRetVal
wtiSetState(wti_t *pThis, sbool bNewVal)
{
ISOBJ_TYPE_assert(pThis, wti);
- if(bNewVal)
- ATOMIC_STORE_1_TO_INT(pThis->bIsRunning);
- else
- ATOMIC_STORE_0_TO_INT(pThis->bIsRunning);
+ if(bNewVal) {
+ ATOMIC_STORE_1_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning);
+ } else {
+ ATOMIC_STORE_0_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning);
+ }
return RS_RET_OK;
}
@@ -147,7 +148,7 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE
CODESTARTobjDestruct(wti)
/* actual destruction */
free(pThis->batch.pElem);
- DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurrCmd);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -156,7 +157,7 @@ ENDobjDestruct(wti)
/* Standard-Constructor for the wti object
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
- INIT_ATOMIC_HELPER_MUT(pThis->mutCurrCmd);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
ENDobjConstruct(wti)
diff --git a/runtime/wti.h b/runtime/wti.h
index ab23f8c1..ab575427 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -39,7 +39,7 @@ struct wti_s {
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
uchar *pszDbgHdr; /* header string for debug messages */
- DEF_ATOMIC_HELPER_MUT(mutCurrCmd);
+ DEF_ATOMIC_HELPER_MUT(mutIsRunning);
};
diff --git a/runtime/wtp.c b/runtime/wtp.c
index db6d4fcb..51fab191 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -96,7 +96,8 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pThis->pfGetDeqBatchSize = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
pThis->pfObjProcessed = NotImplementedDummy;
- INIT_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutCurNumWrkThrd);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutWtpState);
ENDobjConstruct(wtp)
@@ -150,14 +151,15 @@ CODESTARTobjDestruct(wtp)
pthread_cond_destroy(&pThis->condThrdTrm);
pthread_mutex_destroy(&pThis->mutWtp);
pthread_attr_destroy(&pThis->attrThrd);
- DESTROY_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurNumWrkThrd);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutWtpState);
free(pThis->pszDbgHdr);
ENDobjDestruct(wtp)
/* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21
- * We do not need to do atomic instructions as set operations are only
+ * We do not need to do atomic instructions as set operations are only
* called when terminating the pool, and then in strict sequence. So we
* can never overwrite each other. On the other hand, it also doesn't
* matter if the read operation obtains an older value, as we then simply
@@ -168,7 +170,7 @@ rsRetVal
wtpSetState(wtp_t *pThis, wtpState_t iNewState)
{
ISOBJ_TYPE_assert(pThis, wtp);
- pThis->wtpState = iNewState;
+ pThis->wtpState = iNewState; // TODO: do we need a mutex here? 2010-04-26
return RS_RET_OK;
}
@@ -188,7 +190,7 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex)
/* we need a consistent value, but it doesn't really matter if it is changed
* right after the fetch - then we simply do one more iteration in the worker
*/
- wtpState = ATOMIC_FETCH_32BIT(pThis->wtpState);
+ wtpState = (wtpState_t) ATOMIC_FETCH_32BIT((int*)&pThis->wtpState, &pThis->mutWtpState);
if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
@@ -233,7 +235,8 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n",
- wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd));
+ wtpGetDbgHdr(pThis), timeoutVal(ptTimeout),
+ ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutWtp, ptTimeout) != 0) {
DBGPRINTF("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
@@ -287,10 +290,11 @@ wtpWrkrExecCleanup(wti_t *pWti)
/* the order of the next two statements is important! */
wtiSetState(pWti, WRKTHRD_STOPPED);
- ATOMIC_DEC(pThis->iCurNumWrkThrd);
+ ATOMIC_DEC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd);
DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n",
- wtpGetDbgHdr(pThis), (unsigned long) pWti, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd));
+ wtpGetDbgHdr(pThis), (unsigned long) pWti,
+ ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
ENDfunc
}
@@ -402,10 +406,11 @@ wtpStartWrkr(wtp_t *pThis)
pWti = pThis->pWrkr[i];
wtiSetState(pWti, WRKTHRD_RUNNING);
iState = pthread_create(&(pWti->thrdID), &pThis->attrThrd, wtpWorker, (void*) pWti);
- ATOMIC_INC(pThis->iCurNumWrkThrd); /* we got one more! */
+ ATOMIC_INC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd); /* we got one more! */
DBGPRINTF("%s: started with state %d, num workers now %d\n",
- wtpGetDbgHdr(pThis), iState, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd));
+ wtpGetDbgHdr(pThis), iState,
+ ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
finalize_it:
d_pthread_mutex_unlock(&pThis->mutWtp);
@@ -436,7 +441,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */
nMaxWrkr = pThis->iNumWorkerThreads;
- nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd);
+ nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd);
if(nMissing > 0) {
DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n",
diff --git a/runtime/wtp.h b/runtime/wtp.h
index baeba1a2..7e6b4394 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -66,7 +66,8 @@ struct wtp_s {
rsRetVal (*pfDoWork)(void *pUsr, void *pWti);
/* end user objects */
uchar *pszDbgHdr; /* header string for debug messages */
- DEF_ATOMIC_HELPER_MUT(mutThrdStateChanged);
+ DEF_ATOMIC_HELPER_MUT(mutCurNumWrkThrd);
+ DEF_ATOMIC_HELPER_MUT(mutWtpState);
};
/* some symbolic constants for easier reference */
@@ -84,7 +85,6 @@ rsRetVal wtpWakeupAllWrkr(wtp_t *pThis);
rsRetVal wtpCancelAll(wtp_t *pThis);
rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg);
rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout);
-//void wtpSetThrdStateChanged(wtp_t *pThis, int val);
PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));