diff options
Diffstat (limited to 'runtime/wtp.c')
-rw-r--r-- | runtime/wtp.c | 438 |
1 files changed, 146 insertions, 292 deletions
diff --git a/runtime/wtp.c b/runtime/wtp.c index fff37c2f..ece80911 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -8,7 +8,7 @@ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it * if you are getting aquainted to the object. * - * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008,2009 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -44,9 +44,10 @@ # include <sys/prctl.h> #endif -#ifdef OS_SOLARIS -# include <sched.h> -#endif +/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20 +//#ifdef OS_SOLARIS +//# include <sched.h> +//#endif #include "rsyslog.h" #include "stringbuf.h" @@ -82,22 +83,21 @@ wtpGetDbgHdr(wtp_t *pThis) /* Not implemented dummy function for constructor */ -static rsRetVal NotImplementedDummy() { return RS_RET_OK; } +static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; } /* Standard-Constructor for the wtp object */ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */ - pthread_mutex_init(&pThis->mut, NULL); - pthread_mutex_init(&pThis->mutThrdShutdwn, NULL); + pthread_mutex_init(&pThis->mutWtp, NULL); pthread_cond_init(&pThis->condThrdTrm, NULL); + pthread_attr_init(&pThis->attrThrd); + pthread_attr_setdetachstate(&pThis->attrThrd, PTHREAD_CREATE_DETACHED); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; - pThis->pfIsIdle = NotImplementedDummy; + pThis->pfGetDeqBatchSize = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; - pThis->pfOnIdle = NotImplementedDummy; - pThis->pfOnWorkerCancel = NotImplementedDummy; - pThis->pfOnWorkerStartup = NotImplementedDummy; - pThis->pfOnWorkerShutdown = NotImplementedDummy; - INIT_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged); + pThis->pfObjProcessed = NotImplementedDummy; + INIT_ATOMIC_HELPER_MUT(pThis->mutCurNumWrkThrd); + INIT_ATOMIC_HELPER_MUT(pThis->mutWtpState); ENDobjConstruct(wtp) @@ -115,13 +115,12 @@ wtpConstructFinalize(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); /* alloc and construct workers - this can only be done in finalizer as we previously do * not know the max number of workers */ - if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - + CHKmalloc(pThis->pWrkr = MALLOC(sizeof(wti_t*) * pThis->iNumWorkerThreads)); + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { CHKiRet(wtiConstruct(&pThis->pWrkr[i])); pWti = pThis->pWrkr[i]; @@ -141,8 +140,6 @@ finalize_it: BEGINobjDestruct(wtp) /* be sure to specify the object type also in END and CODESTART macros! */ int i; CODESTARTobjDestruct(wtp) - wtpProcessThrdChanges(pThis); /* process thread changes one last time */ - /* destruct workers */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) wtiDestruct(&pThis->pWrkr[i]); @@ -152,149 +149,70 @@ CODESTARTobjDestruct(wtp) /* actual destruction */ pthread_cond_destroy(&pThis->condThrdTrm); - pthread_mutex_destroy(&pThis->mut); - pthread_mutex_destroy(&pThis->mutThrdShutdwn); - DESTROY_ATOMIC_HELPER_MUT(pThis->mutThrdStateChanged); + pthread_mutex_destroy(&pThis->mutWtp); + pthread_attr_destroy(&pThis->attrThrd); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutCurNumWrkThrd); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutWtpState); free(pThis->pszDbgHdr); ENDobjDestruct(wtp) -/* wake up at least one worker thread. - * rgerhards, 2008-01-20 - */ -rsRetVal -wtpWakeupWrkr(wtp_t *pThis) -{ - DEFiRet; - - /* TODO; mutex? I think not needed, as we do not need predictable exec order -- rgerhards, 2008-01-28 */ - ISOBJ_TYPE_assert(pThis, wtp); - pthread_cond_signal(pThis->pcondBusy); - RETiRet; -} - -/* wake up all worker threads. - * rgerhards, 2008-01-16 - */ -rsRetVal -wtpWakeupAllWrkr(wtp_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, wtp); - pthread_cond_broadcast(pThis->pcondBusy); - RETiRet; -} - - -/* set the bThrdStateChanged in an atomic way. Note that - * val may only be 0 or 1. - */ -void -wtpSetThrdStateChanged(wtp_t *pThis, int val) -{ - if(val == 0) { - ATOMIC_STORE_0_TO_INT(&pThis->bThrdStateChanged, pThis->mutThrdStateChanged); - } else { - ATOMIC_STORE_1_TO_INT(&pThis->bThrdStateChanged, pThis->mutThrdStateChanged); - } -} - - -/* check if we had any worker thread changes and, if so, act - * on them. At a minimum, terminated threads are harvested (joined). - * This function MUST NEVER block on the queue mutex! - */ -rsRetVal -wtpProcessThrdChanges(wtp_t *pThis) -{ - DEFiRet; - int i; - - ISOBJ_TYPE_assert(pThis, wtp); - - if(pThis->bThrdStateChanged == 0) - FINALIZE; - - if(d_pthread_mutex_trylock(&(pThis->mutThrdShutdwn)) != 0) { - /* another thread is already in the loop */ - FINALIZE; - } - - /* Note: there is a left-over potential race condition below: - * pThis->bThrdStateChanged may be re-set by another thread while - * we work on it and thus the loop may terminate too early. However, - * there are no really bad effects from that so I perfer - for this - * version - to live with the problem as is. Not a good idea to - * introduce that large change into the stable branch without very - * good reason. -- rgerhards, 2009-04-02 - */ - do { - /* reset the change marker */ - wtpSetThrdStateChanged(pThis, 0); - /* go through all threads */ - for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX); - } - /* restart if another change occured while we were processing the changes */ - } while(pThis->bThrdStateChanged != 0); - - d_pthread_mutex_unlock(&(pThis->mutThrdShutdwn)); - -finalize_it: - RETiRet; -} - - -/* Sent a specific state for the worker thread pool. - * rgerhards, 2008-01-21 +/* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21 + * We do not need to do atomic instructions as set operations are only + * called when terminating the pool, and then in strict sequence. So we + * can never overwrite each other. On the other hand, it also doesn't + * matter if the read operation obtains an older value, as we then simply + * do one more iteration, what is perfectly legal (during shutdown + * they are awoken in any case). -- rgerhards, 2009-07-20 */ rsRetVal wtpSetState(wtp_t *pThis, wtpState_t iNewState) { - DEFiRet; - ISOBJ_TYPE_assert(pThis, wtp); - pThis->wtpState = iNewState; - /* TODO: must wakeup workers? seen to be not needed -- rgerhards, 2008-01-28 */ - - RETiRet; + pThis->wtpState = iNewState; // TODO: do we need a mutex here? 2010-04-26 + return RS_RET_OK; } /* check if the worker shall shutdown (1 = yes, 0 = no) - * TODO: check if we can use atomic operations to enhance performance * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user" * (e.g. the queue clas) * rgerhards, 2008-01-21 */ rsRetVal -wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) +wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex) { DEFiRet; - DEFVARS_mutexProtection; + wtpState_t wtpState; ISOBJ_TYPE_assert(pThis, wtp); + /* we need a consistent value, but it doesn't really matter if it is changed + * right after the fetch - then we simply do one more iteration in the worker + */ + wtpState = (wtpState_t) ATOMIC_FETCH_32BIT((int*)&pThis->wtpState, &pThis->mutWtpState); - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) - || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex))) - iRet = RS_RET_TERMINATE_NOW; - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) { + ABORT_FINALIZE(RS_RET_TERMINATE_NOW); + } else if(wtpState == wtpState_SHUTDOWN) { + ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); + } /* try customer handler if one was set and we do not yet have a definite result */ - if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) { + if(pThis->pfChkStopWrkr != NULL) { iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex); } +finalize_it: RETiRet; } #pragma GCC diagnostic ignored "-Wempty-body" /* Send a shutdown command to all workers and see if they terminate. - * A timeout may be specified. + * A timeout may be specified. This function may also be called with + * the current number of workers being 0, in which case it does not + * shut down any worker. * rgerhards, 2008-01-14 */ rsRetVal @@ -302,72 +220,50 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout { DEFiRet; int bTimedOut; - int iCancelStateSave; + int i; ISOBJ_TYPE_assert(pThis, wtp); + /* lock mutex to prevent races (may otherwise happen during idle processing and such...) */ + d_pthread_mutex_lock(pThis->pmutUsr); wtpSetState(pThis, tShutdownCmd); - wtpWakeupAllWrkr(pThis); + pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */ + /* awake workers in retry loop */ + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { + wtiWakeupThrd(pThis->pWrkr[i]); + } + d_pthread_mutex_unlock(pThis->pmutUsr); - /* see if we need to harvest (join) any terminated threads (even in timeout case, - * some may have terminated... - */ - wtpProcessThrdChanges(pThis); - - /* and wait for their termination */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mut); - pthread_cleanup_push(mutexCancelCleanup, &pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); + /* wait for worker thread termination */ + d_pthread_mutex_lock(&pThis->mutWtp); + pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { - dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", - wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd); + DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n", + wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), + ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd)); - if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, ptTimeout) != 0) { - dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis)); + if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutWtp, ptTimeout) != 0) { + DBGPRINTF("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis)); bTimedOut = 1; /* we exit the loop on timeout */ } + + /* awake workers in retry loop */ + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { + wtiWakeupThrd(pThis->pWrkr[i]); + } + } pthread_cleanup_pop(1); if(bTimedOut) iRet = RS_RET_TIMED_OUT; - /* see if we need to harvest (join) any terminated threads (even in timeout case, - * some may have terminated... - */ - wtpProcessThrdChanges(pThis); - RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" -/* indicate that a thread has terminated and awake anyone waiting on it - * rgerhards, 2008-01-23 - */ -rsRetVal wtpSignalWrkrTermination(wtp_t *pThis) -{ - DEFiRet; - /* I leave the mutex code here out as it gives us deadlocks. I think it is not really - * needed and we are on the safe side. I leave this comment in if practice proves us - * wrong. The whole thing should be removed after half a year or year if we see there - * actually is no issue (or revisit it from a theoretical POV). - * rgerhards, 2008-01-28 - * revisited 2008-09-30, still a bit unclear, leave in - */ - /*TODO: mutex or not mutex, that's the question ;)DEFVARS_mutexProtection;*/ - - ISOBJ_TYPE_assert(pThis, wtp); - - /*BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);*/ - pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - /*END_MTX_PROTECTED_OPERATIONS(&pThis->mut);*/ - RETiRet; -} - - /* Unconditionally cancel all running worker threads. * rgerhards, 2008-01-14 */ @@ -379,12 +275,8 @@ wtpCancelAll(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - /* process any pending thread requests so that we know who actually is still running */ - wtpProcessThrdChanges(pThis); - /* go through all workers and cancel those that are active */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - dbgprintf("%s: try canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); wtiCancelThrd(pThis->pWrkr[i]); } @@ -392,40 +284,57 @@ wtpCancelAll(wtp_t *pThis) } - -/* Set the Inactivity Guard - * rgerhards, 2008-01-21 +/* this function contains shared code for both regular worker shutdown as + * well as shutdown via cancellation. We can not simply use pthread_cleanup_pop(1) + * as this introduces a race in the debug system (RETiRet system). + * rgerhards, 2009-10-26 */ -rsRetVal -wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex) +static inline void +wtpWrkrExecCleanup(wti_t *pWti) { - DEFiRet; - DEFVARS_mutexProtection; + wtp_t *pThis; - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - pThis->bInactivityGuard = bNewState; - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + BEGINfunc + ISOBJ_TYPE_assert(pWti, wti); + pThis = pWti->pWtp; + ISOBJ_TYPE_assert(pThis, wtp); - RETiRet; + /* the order of the next two statements is important! */ + wtiSetState(pWti, WRKTHRD_STOPPED); + ATOMIC_DEC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd); + + DBGPRINTF("%s: Worker thread %lx, terminated, um workers now %d\n", + wtpGetDbgHdr(pThis), (unsigned long) pWti, + ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd)); + + ENDfunc } -/* cancellation cleanup handler for executing worker - * decrements the worker counter - * rgerhards, 2008-01-20 +/* cancellation cleanup handler for executing worker decrements the worker counter. + * rgerhards, 2009-07-20 */ -void +static void wtpWrkrExecCancelCleanup(void *arg) { - wtp_t *pThis = (wtp_t*) arg; + wti_t *pWti = (wti_t*) arg; + wtp_t *pThis; BEGINfunc + ISOBJ_TYPE_assert(pWti, wti); + pThis = pWti->pWtp; ISOBJ_TYPE_assert(pThis, wtp); - pThis->iCurNumWrkThrd--; - wtpSignalWrkrTermination(pThis); + DBGPRINTF("%s: Worker thread %lx requested to be cancelled.\n", + wtpGetDbgHdr(pThis), (unsigned long) pWti); + + wtpWrkrExecCleanup(pWti); - dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd); ENDfunc + /* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main + * thread after the broadcast, which could destroy the debug class, resulting in a potential + * segfault. So we need to do the broadcast as actually the last action in our processing + */ + pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ } @@ -437,8 +346,6 @@ wtpWrkrExecCancelCleanup(void *arg) static void * wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */ { - DEFiRet; - DEFVARS_mutexProtection; wti_t *pWti = (wti_t*) arg; wtp_t *pThis; sigset_t sigSet; @@ -447,13 +354,20 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in uchar thrdName[32] = "rs:"; # endif + BEGINfunc ISOBJ_TYPE_assert(pWti, wti); pThis = pWti->pWtp; ISOBJ_TYPE_assert(pThis, wtp); + /* block all signals */ sigfillset(&sigSet); pthread_sigmask(SIG_BLOCK, &sigSet, NULL); + /* but ignore SIGTTN, which we (ab)use to signal the thread to shutdown -- rgerhards, 2009-07-20 */ + sigemptyset(&sigSet); + sigaddset(&sigSet, SIGTTIN); + pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL); + # if HAVE_PRCTL && defined PR_SET_NAME /* set thread name - we ignore if the call fails, has no harsh consequences... */ pszDbgHdr = wtpGetDbgHdr(pThis); @@ -463,41 +377,17 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in } # endif - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX); - - /* do some late initialization */ - - pthread_cleanup_push(wtpWrkrExecCancelCleanup, pThis); - - /* finally change to RUNNING state. We need to check if we actually should still run, - * because someone may have requested us to shut down even before we got a chance to do - * our init. That would be a bad race... -- rgerhards, 2008-01-16 - */ - wtiSetState(pWti, eWRKTHRD_RUNNING, 0, MUTEX_ALREADY_LOCKED); /* we are running now! */ - - do { - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - - iRet = wtiWorker(pWti); /* just to make sure: this is NOT protected by the mutex! */ - - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX); - } while(pThis->iCurNumWrkThrd == 1 && pThis->bInactivityGuard == 1); - /* inactivity guard prevents shutdown of all workers while one should be running due to race - * condition. It can lead to one more worker running than desired, but that is acceptable. After - * all, that worker will shutdown itself due to inactivity timeout. If, however, none were running - * when one was required, processing could come to a halt. -- rgerhards, 2008-01-21 - */ - + pthread_cleanup_push(wtpWrkrExecCancelCleanup, pWti); + wtiWorker(pWti); pthread_cleanup_pop(0); - pThis->iCurNumWrkThrd--; - wtpSignalWrkrTermination(pThis); - - dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n", - wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd); - - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + wtpWrkrExecCleanup(pWti); ENDfunc + /* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main + * thread after the broadcast, which could destroy the debug class, resulting in a potential + * segfault. So we need to do the broadcast as actually the last action in our processing + */ + pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ pthread_exit(0); } #pragma GCC diagnostic warning "-Wempty-body" @@ -505,27 +395,20 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in /* start a new worker */ static rsRetVal -wtpStartWrkr(wtp_t *pThis, int bLockMutex) +wtpStartWrkr(wtp_t *pThis) { - DEFiRet; - DEFVARS_mutexProtection; wti_t *pWti; int i; int iState; + DEFiRet; ISOBJ_TYPE_assert(pThis, wtp); - wtpProcessThrdChanges(pThis); // TODO: Performance: this causes a lot of FUTEX calls + d_pthread_mutex_lock(&pThis->mutWtp); - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - - pThis->iCurNumWrkThrd++; - - /* find free spot in thread table. If we find at least one worker that is in initialization, - * we do NOT start a new one. Let's give the other one a chance, first. - */ + /* find free spot in thread table. */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) { + if(wtiGetState(pThis->pWrkr[i]) == WRKTHRD_STOPPED) { break; } } @@ -533,17 +416,21 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) if(i == pThis->iNumWorkerThreads) ABORT_FINALIZE(RS_RET_NO_MORE_THREADS); + if(i == 0 || pThis->toWrkShutdown == -1) { + wtiSetAlwaysRunning(pThis->pWrkr[i]); + } + pWti = pThis->pWrkr[i]; - wtiSetState(pWti, eWRKTHRD_RUN_CREATED, 0, LOCK_MUTEX); - iState = pthread_create(&(pWti->thrdID), NULL, wtpWorker, (void*) pWti); - dbgprintf("%s: started with state %d, num workers now %d\n", - wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd); + wtiSetState(pWti, WRKTHRD_RUNNING); + iState = pthread_create(&(pWti->thrdID), &pThis->attrThrd, wtpWorker, (void*) pWti); + ATOMIC_INC(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd); /* we got one more! */ - /* indicate we just started a worker and would like to see it running */ - wtpSetInactivityGuard(pThis, 1, MUTEX_ALREADY_LOCKED); + DBGPRINTF("%s: started with state %d, num workers now %d\n", + wtpGetDbgHdr(pThis), iState, + ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd)); finalize_it: - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + d_pthread_mutex_unlock(&pThis->mutWtp); RETiRet; } @@ -560,7 +447,6 @@ rsRetVal wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) { DEFiRet; - DEFVARS_mutexProtection; int nMissing; /* number workers missing to run */ int i; @@ -569,29 +455,24 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) if(nMaxWrkr == 0) FINALIZE; - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX); - if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */ nMaxWrkr = pThis->iNumWorkerThreads; - nMissing = nMaxWrkr - pThis->iCurNumWrkThrd; + nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd); if(nMissing > 0) { - dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); + DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", + wtpGetDbgHdr(pThis), nMissing); /* start the rqtd nbr of workers */ for(i = 0 ; i < nMissing ; ++i) { - CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED)); - } - } else { - if(nMaxWrkr > 0) { - dbgprintf("wtpAdviseMaxWorkers signals busy\n"); - wtpWakeupWrkr(pThis); + CHKiRet(wtpStartWrkr(pThis)); } + } else { + pthread_cond_signal(pThis->pcondBusy); } finalize_it: - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; } @@ -605,35 +486,9 @@ DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t) DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) -DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)) -DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)) -DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) -DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*)) -DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)) -DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*)) - - -/* return the current number of worker threads. - * TODO: atomic operation would bring a nice performance - * enhancemcent - * rgerhards, 2008-01-27 - */ -int -wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex) -{ - DEFVARS_mutexProtection; - int iNumWrkr; - - BEGINfunc - ISOBJ_TYPE_assert(pThis, wtp); - - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - iNumWrkr = pThis->iCurNumWrkThrd; - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - - ENDfunc - return iNumWrkr; -} +DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) +DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*)) +DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*)) /* set the debug header message @@ -657,7 +512,7 @@ wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg) pThis->pszDbgHdr = NULL; } - if((pThis->pszDbgHdr = malloc(sizeof(uchar) * lenMsg + 1)) == NULL) + if((pThis->pszDbgHdr = MALLOC(sizeof(uchar) * lenMsg + 1)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */ @@ -687,6 +542,5 @@ BEGINObjClassInit(wtp, 1, OBJ_IS_CORE_MODULE) CHKiRet(objUse(glbl, CORE_COMPONENT)); ENDObjClassInit(wtp) -/* - * vi:set ai: +/* vi:set ai: */ |