author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-06-25 11:06:42 +0200
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-06-25 11:06:42 +0200
commit | 46024834449840dabf399dda196c9dd11cf78ace (patch)
tree | 36dc3dc8f85d756afea2924398d5deb679852026 /runtime
parent | c60cbd99df4710545587659be6344392e99745ff (diff)
added a few atomic operations
mostly to get the thread debugger's error report clean (plus, of course,
it makes things more deterministic)
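
The ATOMIC_* macros added in runtime/atomic.h (see the diff below) are thin wrappers around GCC's __sync builtins. As a rough, stand-alone sketch of the pattern — not rsyslog code, and the file name atomic_flag_demo.c is made up — the following shows how replacing a plain flag assignment with such a builtin turns a racy store into an atomic one, which is what keeps thread checkers such as Helgrind quiet:

```c
/* atomic_flag_demo.c - minimal sketch (not rsyslog code) of the technique
 * the new ATOMIC_* macros wrap: use GCC __sync builtins instead of plain
 * assignments so concurrent writers no longer constitute a data race.
 * Build with: gcc -O2 -pthread atomic_flag_demo.c
 */
#include <pthread.h>
#include <stdio.h>

static int bStateChanged = 0;	/* shared flag, similar in spirit to bThrdStateChanged */

static void *worker(void *arg)
{
	(void) arg;
	/* equivalent of ATOMIC_STORE_1_TO_INT(bStateChanged) */
	__sync_fetch_and_or(&bStateChanged, 1);
	return NULL;
}

int main(void)
{
	pthread_t thrd;
	pthread_create(&thrd, NULL, worker, NULL);

	/* equivalent of ATOMIC_STORE_0_TO_INT(bStateChanged): reset atomically */
	__sync_fetch_and_and(&bStateChanged, 0);

	pthread_join(thrd, NULL);
	printf("flag is now %d\n", bStateChanged);	/* 0 or 1, depending on scheduling */
	return 0;
}
```

Which value is printed depends on thread scheduling, but either way the two threads no longer perform unsynchronized writes to the same int.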
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/atomic.h | 4
-rw-r--r-- | runtime/queue.c | 17
-rw-r--r-- | runtime/wti.c | 24
-rw-r--r-- | runtime/wtp.c | 3
4 files changed, 28 insertions, 20 deletions
```diff
diff --git a/runtime/atomic.h b/runtime/atomic.h
index fdf64214..4cb832f2 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -46,6 +46,10 @@
 #	define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1)
 #	define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff))
 #	define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1)
+#	define ATOMIC_STORE_0_TO_INT(data) __sync_fetch_and_and(&(data), 0)
+#	define ATOMIC_STORE_1_TO_INT(data) __sync_fetch_and_or(&(data), 1)
+#	define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val))
+#	define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal));
 #else
 /* note that we gained parctical proof that theoretical problems DO occur
  * if we do not properly address them. See this blog post for details:
diff --git a/runtime/queue.c b/runtime/queue.c
index 1ae386e7..5102b0df 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1649,6 +1649,10 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
 	nDeleted = pWti->batch.nElemDeq;
 	DeleteProcessedBatch(pThis, &pWti->batch);
 
+//int iii = pthread_mutex_trylock(pThis->mut);
+//char errStr[1024];
+//rs_strerror_r(iii, errStr, sizeof(errStr));
+//dbgprintf("DequeueConsumableElemnts mutex locked: %d (16 is EBUSY = OK): %s\n", iii, errStr);
 	nDequeued = nDiscarded = 0;
 	while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
 //dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
@@ -2473,15 +2477,6 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
 
 	ISOBJ_TYPE_assert(pThis, qqueue);
 
-	/* first check if we need to discard this message (which will cause CHKiRet() to exit)
-	 * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The queue size
-	 * and bRunsDA parameters may not reflect the correct settings here, but they are
-	 * "good enough" in the sense that they can be used to drive the decision. Valgrind's
-	 * threading tools may point this access to be an error, but this is done
-	 * intentional. I do not see this causes problems to us.
-	 */
-	CHKiRet(qqueueChkDiscardMsg(pThis, getPhysicalQueueSize(pThis), pThis->bRunsDA, pUsr));
-
 	/* Please note that this function is not cancel-safe and consequently
 	 * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
 	 * during its execution. If that is not done, race conditions occur if the
@@ -2493,6 +2488,10 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
 		d_pthread_mutex_lock(pThis->mut);
 	}
 
+	/* first check if we need to discard this message (which will cause CHKiRet() to exit)
+	 */
+	CHKiRet(qqueueChkDiscardMsg(pThis, getPhysicalQueueSize(pThis), pThis->bRunsDA, pUsr));
+
 	/* then check if we need to add an assistance disk queue */
 	if(pThis->bIsDA)
 		CHKiRet(ChkStrtDA(pThis));
diff --git a/runtime/wti.c b/runtime/wti.c
index e43c6cce..c9fc4879 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -51,6 +51,7 @@
 #include "wti.h"
 #include "obj.h"
 #include "glbl.h"
+#include "atomic.h"
 
 /* static data */
 DEFobjStaticHelpers
@@ -106,6 +107,7 @@ rsRetVal
 wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
 {
 	DEFiRet;
+	qWrkCmd_t tCurrCmd;
 	DEFVARS_mutexProtection;
 
 	ISOBJ_TYPE_assert(pThis, wti);
@@ -113,13 +115,14 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
 
 	BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
 
+	tCurrCmd = pThis->tCurrCmd;
 	/* all worker states must be followed sequentially, only termination can be set in any state */
-	if(   (bActiveOnly && (pThis->tCurrCmd < eWRKTHRD_RUN_CREATED))
-	   || (pThis->tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) {
-		dbgprintf("%s: command %d can not be accepted in current %d processing state - ignored\n",
-			  wtiGetDbgHdr(pThis), tCmd, pThis->tCurrCmd);
+	if(   (bActiveOnly && (tCurrCmd < eWRKTHRD_RUN_CREATED))
+	   || (tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) {
+		DBGPRINTF("%s: command %d can not be accepted in current %d processing state - ignored\n",
+			  wtiGetDbgHdr(pThis), tCmd, tCurrCmd);
 	} else {
-		dbgprintf("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
+		DBGPRINTF("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
 		/* we could replace this with a simple if, but we leave the switch in in case we need
 		 * to add something at a later stage. -- rgerhards, 2008-09-30
 		 */
@@ -143,7 +146,8 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
 				/* DO NOTHING */
 				break;
 		}
-		pThis->tCurrCmd = tCmd; /* apply the new state */
+		/* better do a CAS? */
+		ATOMIC_STORE_INT_TO_INT(pThis->tCurrCmd, tCmd); /* apply the new state */
 	}
 
 	END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
@@ -169,7 +173,7 @@ wtiCancelThrd(wti_t *pThis)
 		dbgoprint((obj_t*) pThis, "canceling worker thread\n");
 		pthread_cancel(pThis->thrdID);
 		wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
-		pThis->pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
+		ATOMIC_STORE_1_TO_INT(pThis->pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
 	}
 
 	d_pthread_mutex_unlock(&pThis->mut);
@@ -324,7 +328,7 @@ wtiWorkerCancelCleanup(void *arg)
 	d_pthread_mutex_lock(&pWtp->mut);
 	wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
 	/* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
-	pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
+	ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
 	d_pthread_mutex_unlock(&pWtp->mut);
 
 	pthread_setcancelstate(iCancelStateSave, NULL);
@@ -409,8 +413,8 @@ wtiWorker(wti_t *pThis)
 		}
 
 		/* try to execute and process whatever we have */
-		localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave); /* This function must and does RELEASE the MUTEX! */
+		localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
 		bMutexIsLocked = FALSE;
 
 		if(localRet == RS_RET_IDLE) {
@@ -445,7 +449,7 @@ RUNLOG_STR("XXX: Worker shutdown");
 	pWtp->pfOnWorkerShutdown(pWtp->pUsr);
 
 	wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
-	pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
+	ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */
 	d_pthread_mutex_unlock(&pThis->mut);
 
 	pthread_setcancelstate(iCancelStateSave, NULL);
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 267555cd..f5769a72 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -39,6 +39,7 @@
 #include <fcntl.h>
 #include <unistd.h>
 #include <errno.h>
+#include <atomic.h>
 
 /// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20
 //#ifdef OS_SOLARIS
@@ -217,7 +218,7 @@ wtpProcessThrdChanges(wtp_t *pThis)
 	 */
 	do {
 		/* reset the change marker */
-		pThis->bThrdStateChanged = 0;
+		ATOMIC_STORE_0_TO_INT(pThis->bThrdStateChanged);
 		/* go through all threads */
 		for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
 			wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX);
```
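
The wtiSetState() hunk above leaves a "better do a CAS?" note next to the new ATOMIC_STORE_INT_TO_INT call. A minimal sketch of that alternative, using the same __sync_bool_compare_and_swap builtin that the new ATOMIC_CAS macro wraps (the names state and try_set_state are illustrative only, not rsyslog identifiers):

```c
/* Sketch of the compare-and-swap alternative hinted at in wtiSetState():
 * apply the new state only if the value the decision was based on is
 * still the current one. Build with: gcc -O2 cas_sketch.c
 */
#include <stdio.h>

static int state = 2;	/* stands in for some eWRKTHRD_* value */

static int try_set_state(int expected, int newState)
{
	/* returns non-zero only if state was still == expected at swap time */
	return __sync_bool_compare_and_swap(&state, expected, newState);
}

int main(void)
{
	int seen = state;	/* like tCurrCmd = pThis->tCurrCmd in the patch */
	if(try_set_state(seen, 3))
		printf("state advanced to %d\n", state);
	else
		printf("state changed concurrently, transition skipped\n");
	return 0;
}
```

With this pattern the transition is applied only if no other thread changed the value between the read and the write; on failure the caller would re-read and re-evaluate the state rules.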