From 4ec7b9d9ec12d91dde3d030bdaf87cfdd6b5d81d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 20 Jul 2009 11:52:05 +0200 Subject: enhanced worker thread pool by atomic ops ... greater performance and was able to remove a potential troublespot in a cancel cleanup handler. --- runtime/wti.c | 12 +++++++----- runtime/wti.h | 2 +- runtime/wtp.c | 41 +++++++++++++++-------------------------- 3 files changed, 23 insertions(+), 32 deletions(-) diff --git a/runtime/wti.c b/runtime/wti.c index 1d8f075f..91c63ffe 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -82,7 +82,7 @@ wtiGetDbgHdr(wti_t *pThis) bool wtiGetState(wti_t *pThis) { - return pThis->bIsRunning; + return ATOMIC_FETCH_32BIT(pThis->bIsRunning); } @@ -105,14 +105,16 @@ rsRetVal wtiSetState(wti_t *pThis, bool bNewVal) { ISOBJ_TYPE_assert(pThis, wti); - pThis->bIsRunning = bNewVal; + if(bNewVal) + ATOMIC_STORE_1_TO_INT(pThis->bIsRunning); + else + ATOMIC_STORE_0_TO_INT(pThis->bIsRunning); return RS_RET_OK; } /* Cancel the thread. If the thread is not running. But it is save and legal to * call wtiCancelThrd() in such situations. - * IMPORTANT: WTP mutex must be locked while this function is called! * rgerhards, 2008-02-26 */ rsRetVal @@ -122,7 +124,7 @@ wtiCancelThrd(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); - if(pThis->bIsRunning) { + if(wtiGetState(pThis)) { dbgoprint((obj_t*) pThis, "canceling worker thread\n"); pthread_cancel(pThis->thrdID); } @@ -159,7 +161,7 @@ wtiConstructFinalize(wti_t *pThis) dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); - /* initialize our thread instance descriptor */ + /* initialize our thread instance descriptor (no concurrency here) */ pThis->bIsRunning = FALSE; /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ diff --git a/runtime/wti.h b/runtime/wti.h index cd408bde..f466a053 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -34,8 +34,8 @@ struct wti_s { BEGINobjInstance; pthread_t thrdID; /* thread ID */ + int bIsRunning; /* is this thread currently running? (must be int for atomic op!) */ bool bAlwaysRunning; /* should this thread always run? */ - bool bIsRunning; /* is this thread currently running? */ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ diff --git a/runtime/wtp.c b/runtime/wtp.c index e8bc5120..23726802 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -116,7 +116,7 @@ wtpConstructFinalize(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); /* alloc and construct workers - this can only be done in finalizer as we previously do * not know the max number of workers */ @@ -246,11 +246,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { - dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", - wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd); + DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n", + wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd)); if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutWtp, ptTimeout) != 0) { - dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis)); + DBGPRINTF("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis)); bTimedOut = 1; /* we exit the loop on timeout */ } } @@ -275,12 +275,10 @@ wtpCancelAll(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - d_pthread_mutex_lock(&pThis->mutWtp); /* go through all workers and cancel those that are active */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { wtiCancelThrd(pThis->pWrkr[i]); } - d_pthread_mutex_unlock(&pThis->mutWtp); RETiRet; } @@ -301,16 +299,12 @@ wtpWrkrExecCancelCleanup(void *arg) pThis = pWti->pWtp; ISOBJ_TYPE_assert(pThis, wtp); - // TODO: the mutex_lock is dangerous, if we are cancelled within some function - // that already has the mutex locked... - d_pthread_mutex_lock(&pThis->mutWtp); wtiSetState(pWti, WRKTHRD_STOPPED); - pThis->iCurNumWrkThrd--; - pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - d_pthread_mutex_unlock(&pThis->mutWtp); + ATOMIC_DEC(pThis->iCurNumWrkThrd); + pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n", - wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd); + wtpGetDbgHdr(pThis), (unsigned long) pWti, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd)); ENDfunc } @@ -391,10 +385,10 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti); pthread_attr_destroy(&attr); /* TODO: we could globally reuse such an attribute 2009-07-08 */ - pThis->iCurNumWrkThrd++; /* we got one more! */ + ATOMIC_INC(pThis->iCurNumWrkThrd); /* we got one more! */ - dbgprintf("%s: started with state %d, num workers now %d\n", - wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd); + DBGPRINTF("%s: started with state %d, num workers now %d\n", + wtpGetDbgHdr(pThis), iState, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd)); finalize_it: END_MTX_PROTECTED_OPERATIONS(&pThis->mutWtp); @@ -422,28 +416,23 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) if(nMaxWrkr == 0) FINALIZE; - d_pthread_mutex_lock(&pThis->mutWtp); - if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */ nMaxWrkr = pThis->iNumWorkerThreads; - nMissing = nMaxWrkr - pThis->iCurNumWrkThrd; + nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd); if(nMissing > 0) { - dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); + DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); /* start the rqtd nbr of workers */ for(i = 0 ; i < nMissing ; ++i) { - CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED)); - } - } else { - if(nMaxWrkr > 0) { - pthread_cond_signal(pThis->pcondBusy); + CHKiRet(wtpStartWrkr(pThis, LOCK_MUTEX)); } + } else { + pthread_cond_signal(pThis->pcondBusy); } finalize_it: - d_pthread_mutex_unlock(&pThis->mutWtp); RETiRet; } -- cgit