diff options
Diffstat (limited to 'runtime/wtp.c')
-rw-r--r-- | runtime/wtp.c | 64 |
1 files changed, 22 insertions, 42 deletions
diff --git a/runtime/wtp.c b/runtime/wtp.c index beeaf01c..93e87987 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -8,7 +8,7 @@ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it * if you are getting aquainted to the object. * - * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008,2009 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -99,7 +99,6 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pThis->pfOnWorkerCancel = NotImplementedDummy; pThis->pfOnWorkerStartup = NotImplementedDummy; pThis->pfOnWorkerShutdown = NotImplementedDummy; -dbgprintf("XXX: wtpConstruct: %d\n", pThis->wtpState); ENDobjConstruct(wtp) @@ -285,11 +284,12 @@ wtpCancelAll(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); + d_pthread_mutex_lock(&pThis->mut); /* go through all workers and cancel those that are active */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - dbgprintf("%s: try canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); wtiCancelThrd(pThis->pWrkr[i]); } + d_pthread_mutex_unlock(&pThis->mut); RETiRet; } @@ -320,13 +320,24 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex) static void wtpWrkrExecCancelCleanup(void *arg) { - wtp_t *pThis = (wtp_t*) arg; + wti_t *pWti = (wti_t*) arg; + wtp_t *pThis; BEGINfunc + ISOBJ_TYPE_assert(pWti, wti); + pThis = pWti->pWtp; ISOBJ_TYPE_assert(pThis, wtp); + + // TODO: the mutex_lock is dangerous, if we are cancelled within some function + // that already has the mutex locked... + d_pthread_mutex_lock(&pThis->mut); pThis->iCurNumWrkThrd--; + wtiSetState(pWti, WRKTHRD_STOPPED); pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd); + d_pthread_mutex_unlock(&pThis->mut); + + DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n", + wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd); ENDfunc } @@ -341,11 +352,11 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in { uchar *pszDbgHdr; uchar thrdName[32] = "rs:"; - DEFiRet; wti_t *pWti = (wti_t*) arg; wtp_t *pThis; sigset_t sigSet; + BEGINfunc ISOBJ_TYPE_assert(pWti, wti); pThis = pWti->pWtp; ISOBJ_TYPE_assert(pThis, wtp); @@ -362,36 +373,9 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in } # endif - d_pthread_mutex_lock(&pThis->mut); - pthread_cleanup_push(wtpWrkrExecCancelCleanup, pThis); - - /* change to RUNNING state. We need to check if we actually should still run, - * because someone may have requested us to shut down even before we got a chance to do - * our init. That would be a bad race... -- rgerhards, 2008-01-16 - */ - wtiSetState(pWti, eWRKTHRD_RUNNING, MUTEX_ALREADY_LOCKED); /* we are running now! */ - - do { - d_pthread_mutex_unlock(&pThis->mut); - - iRet = wtiWorker(pWti); /* just to make sure: this is NOT protected by the mutex! */ - - d_pthread_mutex_lock(&pThis->mut); - } while(pThis->iCurNumWrkThrd == 1 && pThis->bInactivityGuard == 1); - /* inactivity guard prevents shutdown of all workers while one should be running due to race - * condition. It can lead to one more worker running than desired, but that is acceptable. After - * all, that worker will shutdown itself due to inactivity timeout. If, however, none were running - * when one was required, processing could come to a halt. -- rgerhards, 2008-01-21 - */ - - pthread_cleanup_pop(0); - pThis->iCurNumWrkThrd--; - pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - - dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n", - wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd); - - d_pthread_mutex_unlock(&pThis->mut); + pthread_cleanup_push(wtpWrkrExecCancelCleanup, pWti); + wtiWorker(pWti); + pthread_cleanup_pop(1); ENDfunc pthread_exit(0); @@ -418,7 +402,7 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) /* find free spot in thread table. */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) { + if(wtiGetState(pThis->pWrkr[i]) == WRKTHRD_STOPPED) { break; } } @@ -427,7 +411,7 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) ABORT_FINALIZE(RS_RET_NO_MORE_THREADS); pWti = pThis->pWrkr[i]; - wtiSetState(pWti, eWRKTHRD_RUN_CREATED, LOCK_MUTEX); + wtiSetState(pWti, WRKTHRD_RUNNING); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti); @@ -435,9 +419,6 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) dbgprintf("%s: started with state %d, num workers now %d\n", wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd); - /* indicate we just started a worker and would like to see it running */ - wtpSetInactivityGuard(pThis, 1, MUTEX_ALREADY_LOCKED); - finalize_it: END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; @@ -479,7 +460,6 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) } } else { if(nMaxWrkr > 0) { - dbgprintf("wtpAdviseMaxWorkers signals busy\n"); wtpWakeupWrkr(pThis); } } |