diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-17 19:59:22 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-17 19:59:22 +0200 |
commit | ac186f1c3d6d4a8fa4230a205c7fce91b3fef535 (patch) | |
tree | d8e52e481c1c7903d21bbc5e4383299062e85061 /runtime | |
parent | 4c9eded44dbae1701bb3b8f255865892b19e7f72 (diff) | |
download | rsyslog-ac186f1c3d6d4a8fa4230a205c7fce91b3fef535.tar.gz rsyslog-ac186f1c3d6d4a8fa4230a205c7fce91b3fef535.tar.xz rsyslog-ac186f1c3d6d4a8fa4230a205c7fce91b3fef535.zip |
removed mutex locks
... by utilizing that we need to modify a state variable only in
a sequential way during shutdown.
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/atomic.h | 1 | ||||
-rw-r--r-- | runtime/wti.c | 2 | ||||
-rw-r--r-- | runtime/wtp.c | 63 | ||||
-rw-r--r-- | runtime/wtp.h | 4 |
4 files changed, 36 insertions, 34 deletions
diff --git a/runtime/atomic.h b/runtime/atomic.h index f0733f09..b507b769 100644 --- a/runtime/atomic.h +++ b/runtime/atomic.h @@ -51,7 +51,6 @@ # define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1) # define ATOMIC_STORE_0_TO_INT(data) __sync_fetch_and_and(&(data), 0) # define ATOMIC_STORE_1_TO_INT(data) __sync_fetch_and_or(&(data), 1) -# define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val)) # define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal)); # define ATOMIC_CAS_VAL(data, oldVal, newVal) __sync_val_compare_and_swap(&(data), (oldVal), (newVal)); #else diff --git a/runtime/wti.c b/runtime/wti.c index b6a09c65..b55ff69c 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -250,7 +250,7 @@ wtiWorker(wti_t *pThis) d_pthread_mutex_lock(pWtp->pmutUsr); /* first check if we are in shutdown process (but evaluate a bit later) */ - terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED); + terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); if(terminateRet == RS_RET_TERMINATE_NOW) { /* we now need to free the old batch */ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); diff --git a/runtime/wtp.c b/runtime/wtp.c index 93e87987..74304d0f 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -87,7 +87,7 @@ static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; } /* Standard-Constructor for the wtp object */ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */ - pthread_mutex_init(&pThis->mut, NULL); + pthread_mutex_init(&pThis->mutWtp, NULL); pthread_cond_init(&pThis->condThrdTrm, NULL); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; @@ -151,7 +151,7 @@ CODESTARTobjDestruct(wtp) /* actual destruction */ pthread_cond_destroy(&pThis->condThrdTrm); - pthread_mutex_destroy(&pThis->mut); + pthread_mutex_destroy(&pThis->mutWtp); free(pThis->pszDbgHdr); ENDobjDestruct(wtp) @@ -187,19 +187,20 @@ wtpWakeupAllWrkr(wtp_t *pThis) } -/* Sent a specific state for the worker thread pool. - * rgerhards, 2008-01-21 +/* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21 + * We do not need to do atomic instructions as set operations are only + * called when terminating the pool, and then in strict sequence. So we + * can never overwrite each other. On the other hand, it also doesn't + * matter if the read operation obtains an older value, as we then simply + * do one more iteration, what is perfectly legal (during shutdown + * they are awoken in any case). -- rgerhards, 2009-07-20 */ rsRetVal wtpSetState(wtp_t *pThis, wtpState_t iNewState) { - DEFiRet; - ISOBJ_TYPE_assert(pThis, wtp); pThis->wtpState = iNewState; - /* TODO: must wakeup workers? seen to be not needed -- rgerhards, 2008-01-28 */ - - RETiRet; + return RS_RET_OK; } @@ -209,17 +210,20 @@ wtpSetState(wtp_t *pThis, wtpState_t iNewState) * rgerhards, 2008-01-21 */ rsRetVal -wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) +wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex) { DEFiRet; - DEFVARS_mutexProtection; + wtpState_t wtpState; ISOBJ_TYPE_assert(pThis, wtp); + /* we need a consistent value, but it doesn't really matter if it is changed + * right after the fetch - then we simply do one more iteration in the worker + */ + wtpState = ATOMIC_FETCH_32BIT(pThis->wtpState); - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) { + if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) { ABORT_FINALIZE(RS_RET_TERMINATE_NOW); - } else if(pThis->wtpState == wtpState_SHUTDOWN) { + } else if(wtpState == wtpState_SHUTDOWN) { ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); } @@ -229,7 +233,6 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) } finalize_it: - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; } @@ -251,14 +254,14 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout wtpWakeupAllWrkr(pThis); /* wait for worker thread termination */ - d_pthread_mutex_lock(&pThis->mut); - pthread_cleanup_push(mutexCancelCleanup, &pThis->mut); + d_pthread_mutex_lock(&pThis->mutWtp); + pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd); - if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, ptTimeout) != 0) { + if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutWtp, ptTimeout) != 0) { dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis)); bTimedOut = 1; /* we exit the loop on timeout */ } @@ -284,12 +287,12 @@ wtpCancelAll(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - d_pthread_mutex_lock(&pThis->mut); + d_pthread_mutex_lock(&pThis->mutWtp); /* go through all workers and cancel those that are active */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { wtiCancelThrd(pThis->pWrkr[i]); } - d_pthread_mutex_unlock(&pThis->mut); + d_pthread_mutex_unlock(&pThis->mutWtp); RETiRet; } @@ -305,9 +308,9 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex) DEFiRet; DEFVARS_mutexProtection; - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mutWtp, bLockMutex); pThis->bInactivityGuard = bNewState; - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + END_MTX_PROTECTED_OPERATIONS(&pThis->mutWtp); RETiRet; } @@ -330,11 +333,11 @@ wtpWrkrExecCancelCleanup(void *arg) // TODO: the mutex_lock is dangerous, if we are cancelled within some function // that already has the mutex locked... - d_pthread_mutex_lock(&pThis->mut); + d_pthread_mutex_lock(&pThis->mutWtp); pThis->iCurNumWrkThrd--; wtiSetState(pWti, WRKTHRD_STOPPED); pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - d_pthread_mutex_unlock(&pThis->mut); + d_pthread_mutex_unlock(&pThis->mutWtp); DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n", wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd); @@ -396,7 +399,7 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) ISOBJ_TYPE_assert(pThis, wtp); - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mutWtp, bLockMutex); pThis->iCurNumWrkThrd++; @@ -420,7 +423,7 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd); finalize_it: - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + END_MTX_PROTECTED_OPERATIONS(&pThis->mutWtp); RETiRet; } @@ -445,7 +448,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) if(nMaxWrkr == 0) FINALIZE; - d_pthread_mutex_lock(&pThis->mut); + d_pthread_mutex_lock(&pThis->mutWtp); if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */ nMaxWrkr = pThis->iNumWorkerThreads; @@ -466,7 +469,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) finalize_it: - d_pthread_mutex_unlock(&pThis->mut); + d_pthread_mutex_unlock(&pThis->mutWtp); RETiRet; } @@ -504,9 +507,9 @@ wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex) BEGINfunc ISOBJ_TYPE_assert(pThis, wtp); - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mutWtp, bLockMutex); iNumWrkr = pThis->iCurNumWrkThrd; - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + END_MTX_PROTECTED_OPERATIONS(&pThis->mutWtp); ENDfunc return iNumWrkr; diff --git a/runtime/wtp.h b/runtime/wtp.h index cff6d3f9..69a05a14 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -51,7 +51,7 @@ struct wtp_s { bool bInactivityGuard;/* prevents inactivity due to race condition */ rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dewtpd messages */ /* synchronization variables */ - pthread_mutex_t mut; /* mutex for the wtp's thread management */ + pthread_mutex_t mutWtp; /* mutex for the wtp's thread management */ pthread_cond_t condThrdTrm;/* signalled when threads terminate */ /* end sync variables */ /* user objects */ @@ -82,7 +82,7 @@ rsRetVal wtpDestruct(wtp_t **ppThis); rsRetVal wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr); rsRetVal wtpProcessThrdChanges(wtp_t *pThis); rsRetVal wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex); -rsRetVal wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex); +rsRetVal wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex); rsRetVal wtpSetState(wtp_t *pThis, wtpState_t iNewState); rsRetVal wtpWakeupWrkr(wtp_t *pThis); rsRetVal wtpWakeupAllWrkr(wtp_t *pThis); |