From 4ec7b9d9ec12d91dde3d030bdaf87cfdd6b5d81d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 20 Jul 2009 11:52:05 +0200 Subject: enhanced worker thread pool by atomic ops ... greater performance and was able to remove a potential troublespot in a cancel cleanup handler. --- runtime/wtp.c | 41 +++++++++++++++-------------------------- 1 file changed, 15 insertions(+), 26 deletions(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index e8bc5120..23726802 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -116,7 +116,7 @@ wtpConstructFinalize(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); /* alloc and construct workers - this can only be done in finalizer as we previously do * not know the max number of workers */ @@ -246,11 +246,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { - dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", - wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd); + DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n", + wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd)); if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutWtp, ptTimeout) != 0) { - dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis)); + DBGPRINTF("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis)); bTimedOut = 1; /* we exit the loop on timeout */ } } @@ -275,12 +275,10 @@ wtpCancelAll(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - d_pthread_mutex_lock(&pThis->mutWtp); /* go through all workers and cancel those that are active */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { wtiCancelThrd(pThis->pWrkr[i]); } - d_pthread_mutex_unlock(&pThis->mutWtp); RETiRet; } @@ -301,16 +299,12 @@ wtpWrkrExecCancelCleanup(void *arg) pThis = pWti->pWtp; ISOBJ_TYPE_assert(pThis, wtp); - // TODO: the mutex_lock is dangerous, if we are cancelled within some function - // that already has the mutex locked... - d_pthread_mutex_lock(&pThis->mutWtp); wtiSetState(pWti, WRKTHRD_STOPPED); - pThis->iCurNumWrkThrd--; - pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - d_pthread_mutex_unlock(&pThis->mutWtp); + ATOMIC_DEC(pThis->iCurNumWrkThrd); + pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n", - wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd); + wtpGetDbgHdr(pThis), (unsigned long) pWti, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd)); ENDfunc } @@ -391,10 +385,10 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti); pthread_attr_destroy(&attr); /* TODO: we could globally reuse such an attribute 2009-07-08 */ - pThis->iCurNumWrkThrd++; /* we got one more! */ + ATOMIC_INC(pThis->iCurNumWrkThrd); /* we got one more! */ - dbgprintf("%s: started with state %d, num workers now %d\n", - wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd); + DBGPRINTF("%s: started with state %d, num workers now %d\n", + wtpGetDbgHdr(pThis), iState, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd)); finalize_it: END_MTX_PROTECTED_OPERATIONS(&pThis->mutWtp); @@ -422,28 +416,23 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) if(nMaxWrkr == 0) FINALIZE; - d_pthread_mutex_lock(&pThis->mutWtp); - if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */ nMaxWrkr = pThis->iNumWorkerThreads; - nMissing = nMaxWrkr - pThis->iCurNumWrkThrd; + nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd); if(nMissing > 0) { - dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); + DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); /* start the rqtd nbr of workers */ for(i = 0 ; i < nMissing ; ++i) { - CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED)); - } - } else { - if(nMaxWrkr > 0) { - pthread_cond_signal(pThis->pcondBusy); + CHKiRet(wtpStartWrkr(pThis, LOCK_MUTEX)); } + } else { + pthread_cond_signal(pThis->pcondBusy); } finalize_it: - d_pthread_mutex_unlock(&pThis->mutWtp); RETiRet; } -- cgit