diff options
-rw-r--r-- | runtime/atomic.h | 29 | ||||
-rw-r--r-- | runtime/glbl.c | 4 | ||||
-rw-r--r-- | runtime/queue.c | 12 | ||||
-rw-r--r-- | runtime/queue.h | 1 | ||||
-rw-r--r-- | runtime/wti.c | 15 | ||||
-rw-r--r-- | runtime/wti.h | 2 | ||||
-rw-r--r-- | runtime/wtp.c | 27 | ||||
-rw-r--r-- | runtime/wtp.h | 4 |
8 files changed, 60 insertions, 34 deletions
diff --git a/runtime/atomic.h b/runtime/atomic.h index 520d2acd..30478440 100644 --- a/runtime/atomic.h +++ b/runtime/atomic.h @@ -39,13 +39,13 @@ * They simply came in too late. -- rgerhards, 2008-04-02 */ #ifdef HAVE_ATOMIC_BUILTINS -# define ATOMIC_SUB(data, val) __sync_fetch_and_sub(&(data), val) +# define ATOMIC_SUB(data, val) __sync_fetch_and_sub(data, val) # define ATOMIC_ADD(data, val) __sync_fetch_and_add(&(data), val) # define ATOMIC_INC(data, phlpmut) ((void) __sync_fetch_and_add(data, 1)) # define ATOMIC_INC_AND_FETCH(data) __sync_fetch_and_add(&(data), 1) # define ATOMIC_DEC(data, phlpmut) ((void) __sync_sub_and_fetch(data, 1)) # define ATOMIC_DEC_AND_FETCH(data, phlpmut) __sync_sub_and_fetch(data, 1) -# define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff)) +# define ATOMIC_FETCH_32BIT(data, phlpmut) ((unsigned) __sync_fetch_and_and(data, 0xffffffff)) # define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1) # define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0) # define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1) @@ -79,15 +79,15 @@ } # define ATOMIC_STORE_0_TO_INT(data, hlpmut) { \ - pthread_mutex_lock(&hlpmut); \ + pthread_mutex_lock(hlpmut); \ *(data) = 0; \ - pthread_mutex_unlock(&hlpmut); \ + pthread_mutex_unlock(hlpmut); \ } # define ATOMIC_STORE_1_TO_INT(data, hlpmut) { \ - pthread_mutex_lock(&hlpmut); \ + pthread_mutex_lock(hlpmut); \ *(data) = 1; \ - pthread_mutex_unlock(&hlpmut); \ + pthread_mutex_unlock(hlpmut); \ } static inline int @@ -116,10 +116,25 @@ pthread_mutex_unlock(phlpmut); return(val); } + + static inline int + ATOMIC_FETCH_32BIT(int *data, pthread_mutex_t *phlpmut) { + int val; + pthread_mutex_lock(phlpmut); + val = (*data); + pthread_mutex_unlock(phlpmut); + return(val); + } + + static inline void + ATOMIC_SUB(int *data, int val, pthread_mutex_t *phlpmut) { + pthread_mutex_lock(phlpmut); + (*data) -= val; + pthread_mutex_unlock(phlpmut); + } #if 0 # warning "atomic builtins not available, using nul operations - rsyslogd will probably be racy!" # define ATOMIC_INC_AND_FETCH(data) (++(data)) -# define ATOMIC_FETCH_32BIT(data) (data) // TODO: del # define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 // TODO: del #endif # define DEF_ATOMIC_HELPER_MUT(x) pthread_mutex_t x diff --git a/runtime/glbl.c b/runtime/glbl.c index 5951d21a..278bc4e1 100644 --- a/runtime/glbl.c +++ b/runtime/glbl.c @@ -74,7 +74,9 @@ static uchar *pszDfltNetstrmDrvrCAF = NULL; /* default CA file for the netstrm d static uchar *pszDfltNetstrmDrvrKeyFile = NULL; /* default key file for the netstrm driver (server) */ static uchar *pszDfltNetstrmDrvrCertFile = NULL; /* default cert file for the netstrm driver (server) */ static int bTerminateInputs = 0; /* global switch that inputs shall terminate ASAP (1=> terminate) */ +#ifndef HAVE_ATOMIC_BUILTINS static DEF_ATOMIC_HELPER_MUT(mutTerminateInputs); +#endif #ifdef USE_UNLIMITED_SELECT static int iFdSetSize = howmany(FD_SETSIZE, __NFDBITS) * sizeof (fd_mask); /* size of select() bitmask in bytes */ #endif @@ -131,7 +133,7 @@ SIMP_PROP_SET(DfltNetstrmDrvrCertFile, pszDfltNetstrmDrvrCertFile, uchar*) /* TO */ static int GetGlobalInputTermState(void) { - return ATOMIC_FETCH_32BIT(bTerminateInputs); + return ATOMIC_FETCH_32BIT(&bTerminateInputs, &mutTerminateInputs); } diff --git a/runtime/queue.c b/runtime/queue.c index bf2164a6..4f0d36b9 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -215,7 +215,7 @@ static inline void queueDrain(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) { - pThis->qDel(pThis, &pUsr); + pThis->qDeq(pThis, &pUsr); if(pUsr != NULL) { objDestruct(pUsr); } @@ -884,7 +884,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr) CHKiRet(pThis->qAdd(pThis, pUsr)); if(pThis->qType != QUEUETYPE_DIRECT) { - ATOMIC_INC(pThis->iQueueSize); + ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); DBGOPRINT((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); } @@ -909,7 +909,7 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr) * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ iRet = pThis->qDeq(pThis, ppUsr); - ATOMIC_INC(pThis->nLogDeq); + ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq); // DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", // getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); @@ -1227,6 +1227,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread } INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq); finalize_it: OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP @@ -1290,8 +1291,8 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) } /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ - ATOMIC_SUB(pThis->iQueueSize, nElem); - ATOMIC_SUB(pThis->nLogDeq, nElem); + ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize); + ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq); dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ @@ -2066,6 +2067,7 @@ CODESTARTobjDestruct(qqueue) pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutLogDeq); /* type-specific destructor */ iRet = pThis->qDestruct(pThis); diff --git a/runtime/queue.h b/runtime/queue.h index 45d3a51b..8ede6922 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -161,6 +161,7 @@ struct queue_s { } disk; } tVars; DEF_ATOMIC_HELPER_MUT(mutQueueSize); + DEF_ATOMIC_HELPER_MUT(mutLogDeq); }; diff --git a/runtime/wti.c b/runtime/wti.c index 8994d84b..77927453 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -82,7 +82,7 @@ wtiGetDbgHdr(wti_t *pThis) sbool wtiGetState(wti_t *pThis) { - return ATOMIC_FETCH_32BIT(pThis->bIsRunning); + return ATOMIC_FETCH_32BIT(&pThis->bIsRunning, &pThis->mutIsRunning); } @@ -105,10 +105,11 @@ rsRetVal wtiSetState(wti_t *pThis, sbool bNewVal) { ISOBJ_TYPE_assert(pThis, wti); - if(bNewVal) - ATOMIC_STORE_1_TO_INT(pThis->bIsRunning); - else - ATOMIC_STORE_0_TO_INT(pThis->bIsRunning); + if(bNewVal) { + ATOMIC_STORE_1_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning); + } else { + ATOMIC_STORE_0_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning); + } return RS_RET_OK; } @@ -147,7 +148,7 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE CODESTARTobjDestruct(wti) /* actual destruction */ free(pThis->batch.pElem); - DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurrCmd); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning); free(pThis->pszDbgHdr); ENDobjDestruct(wti) @@ -156,7 +157,7 @@ ENDobjDestruct(wti) /* Standard-Constructor for the wti object */ BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ - INIT_ATOMIC_HELPER_MUT(pThis->mutCurrCmd); + INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning); ENDobjConstruct(wti) diff --git a/runtime/wti.h b/runtime/wti.h index ab23f8c1..ab575427 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -39,7 +39,7 @@ struct wti_s { wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ - DEF_ATOMIC_HELPER_MUT(mutCurrCmd); + DEF_ATOMIC_HELPER_MUT(mutIsRunning); }; diff --git a/runtime/wtp.c b/runtime/wtp.c index db6d4fcb..51fab191 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -96,7 +96,8 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pThis->pfGetDeqBatchSize = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; pThis->pfObjProcessed = NotImplementedDummy; - INIT_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged); + INIT_ATOMIC_HELPER_MUT(pThis->mutCurNumWrkThrd); + INIT_ATOMIC_HELPER_MUT(pThis->mutWtpState); ENDobjConstruct(wtp) @@ -150,14 +151,15 @@ CODESTARTobjDestruct(wtp) pthread_cond_destroy(&pThis->condThrdTrm); pthread_mutex_destroy(&pThis->mutWtp); pthread_attr_destroy(&pThis->attrThrd); - DESTROY_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurNumWrkThrd); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutWtpState); free(pThis->pszDbgHdr); ENDobjDestruct(wtp) /* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21 - * We do not need to do atomic instructions as set operations are only + * We do not need to do atomic instructions as set operations are only * called when terminating the pool, and then in strict sequence. So we * can never overwrite each other. On the other hand, it also doesn't * matter if the read operation obtains an older value, as we then simply @@ -168,7 +170,7 @@ rsRetVal wtpSetState(wtp_t *pThis, wtpState_t iNewState) { ISOBJ_TYPE_assert(pThis, wtp); - pThis->wtpState = iNewState; + pThis->wtpState = iNewState; // TODO: do we need a mutex here? 2010-04-26 return RS_RET_OK; } @@ -188,7 +190,7 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex) /* we need a consistent value, but it doesn't really matter if it is changed * right after the fetch - then we simply do one more iteration in the worker */ - wtpState = ATOMIC_FETCH_32BIT(pThis->wtpState); + wtpState = (wtpState_t) ATOMIC_FETCH_32BIT((int*)&pThis->wtpState, &pThis->mutWtpState); if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) { ABORT_FINALIZE(RS_RET_TERMINATE_NOW); @@ -233,7 +235,8 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n", - wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd)); + wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), + ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd)); if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutWtp, ptTimeout) != 0) { DBGPRINTF("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis)); @@ -287,10 +290,11 @@ wtpWrkrExecCleanup(wti_t *pWti) /* the order of the next two statements is important! */ wtiSetState(pWti, WRKTHRD_STOPPED); - ATOMIC_DEC(pThis->iCurNumWrkThrd); + ATOMIC_DEC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd); DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n", - wtpGetDbgHdr(pThis), (unsigned long) pWti, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd)); + wtpGetDbgHdr(pThis), (unsigned long) pWti, + ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd)); ENDfunc } @@ -402,10 +406,11 @@ wtpStartWrkr(wtp_t *pThis) pWti = pThis->pWrkr[i]; wtiSetState(pWti, WRKTHRD_RUNNING); iState = pthread_create(&(pWti->thrdID), &pThis->attrThrd, wtpWorker, (void*) pWti); - ATOMIC_INC(pThis->iCurNumWrkThrd); /* we got one more! */ + ATOMIC_INC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd); /* we got one more! */ DBGPRINTF("%s: started with state %d, num workers now %d\n", - wtpGetDbgHdr(pThis), iState, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd)); + wtpGetDbgHdr(pThis), iState, + ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd)); finalize_it: d_pthread_mutex_unlock(&pThis->mutWtp); @@ -436,7 +441,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */ nMaxWrkr = pThis->iNumWorkerThreads; - nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd); + nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd); if(nMissing > 0) { DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", diff --git a/runtime/wtp.h b/runtime/wtp.h index baeba1a2..7e6b4394 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -66,7 +66,8 @@ struct wtp_s { rsRetVal (*pfDoWork)(void *pUsr, void *pWti); /* end user objects */ uchar *pszDbgHdr; /* header string for debug messages */ - DEF_ATOMIC_HELPER_MUT(mutThrdStateChanged); + DEF_ATOMIC_HELPER_MUT(mutCurNumWrkThrd); + DEF_ATOMIC_HELPER_MUT(mutWtpState); }; /* some symbolic constants for easier reference */ @@ -84,7 +85,6 @@ rsRetVal wtpWakeupAllWrkr(wtp_t *pThis); rsRetVal wtpCancelAll(wtp_t *pThis); rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg); rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout); -//void wtpSetThrdStateChanged(wtp_t *pThis, int val); PROTOTYPEObjClassInit(wtp); PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)); |