diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-08 18:12:05 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-08 18:12:05 +0200 |
commit | 4cfbc7254a49503a3c980a1b20264915d6b3b807 (patch) | |
tree | 29c6fb1789588760b8c68b48ba2617b90262ac52 /runtime | |
parent | 9f286c0c4c21128c66305166ae379d3f7b07f673 (diff) | |
download | rsyslog-4cfbc7254a49503a3c980a1b20264915d6b3b807.tar.gz rsyslog-4cfbc7254a49503a3c980a1b20264915d6b3b807.tar.xz rsyslog-4cfbc7254a49503a3c980a1b20264915d6b3b807.zip |
simplified worker thread handling
based on now working with detached threads. This is probably the biggest
patch in this series and with large bug potential.
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/wti.c | 114 | ||||
-rw-r--r-- | runtime/wti.h | 2 | ||||
-rw-r--r-- | runtime/wtp.c | 68 | ||||
-rw-r--r-- | runtime/wtp.h | 2 |
4 files changed, 15 insertions, 171 deletions
diff --git a/runtime/wti.c b/runtime/wti.c index 93c66028..f6a32c34 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -118,7 +118,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) tCurrCmd = pThis->tCurrCmd; /* all worker states must be followed sequentially, only termination can be set in any state */ if( (bActiveOnly && (tCurrCmd < eWRKTHRD_RUN_CREATED)) - || (tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) { + || (tCurrCmd > tCmd && !(tCmd == eWRKTHRD_STOPPED))) { DBGPRINTF("%s: command %d can not be accepted in current %d processing state - ignored\n", wtiGetDbgHdr(pThis), tCmd, tCurrCmd); } else { @@ -126,25 +126,9 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) /* we could replace this with a simple if, but we leave the switch in in case we need * to add something at a later stage. -- rgerhards, 2008-09-30 */ - switch(tCmd) { - case eWRKTHRD_TERMINATING: - /* TODO: re-enable meaningful debug msg! (via function callback?) - dbgprintf("%s: thread terminating with %d entries left in queue, %d workers running.\n", - wtiGetDbgHdr(pThis->pQueue), pThis->pQueue->iQueueSize, - pThis->pQueue->iCurNumWrkThrd); - */ - pthread_cond_signal(&pThis->condExitDone); - dbgprintf("%s: worker terminating\n", wtiGetDbgHdr(pThis)); - break; - /* these cases just to satisfy the compiler, we do (yet) not act an them: */ - case eWRKTHRD_RUNNING: - case eWRKTHRD_STOPPED: - case eWRKTHRD_RUN_CREATED: - case eWRKTHRD_RUN_INIT: - case eWRKTHRD_SHUTDOWN: - case eWRKTHRD_SHUTDOWN_IMMEDIATE: - /* DO NOTHING */ - break; + if(tCmd == eWRKTHRD_STOPPED) { + dbgprintf("%s: worker almost stopped, assuming it has\n", wtiGetDbgHdr(pThis)); + pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ } /* apply the new state */ dbgprintf("worker terminator will write stateval %d\n", tCmd); @@ -152,7 +136,6 @@ dbgprintf("worker terminator will write stateval %d\n", tCmd); if(val != tCurrCmd) { DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd); } -//dbgprintf("worker terminator has written stateval %d\n", tCmd); } END_MTX_PROTECTED_OPERATIONS(&pThis->mut); @@ -174,12 +157,11 @@ wtiCancelThrd(wti_t *pThis) d_pthread_mutex_lock(&pThis->mut); - wtiProcessThrdChanges(pThis, MUTEX_ALREADY_LOCKED); /* process state change, so that we have current state vars */ - - if(pThis->tCurrCmd >= eWRKTHRD_TERMINATING) { + if(pThis->tCurrCmd != eWRKTHRD_STOPPED) { dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd); pthread_cancel(pThis->thrdID); - wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); + /* TODO: check: the following check should automatically be done by cancel cleanup handler! 2009-07-08 rgerhards */ + wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); ATOMIC_STORE_1_TO_INT(pThis->pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */ } @@ -192,24 +174,12 @@ wtiCancelThrd(wti_t *pThis) /* Destructor */ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(wti) - /* if we reach this point, we must make sure the associated worker has terminated. It is - * the callers duty to make sure the worker already knows it shall terminate. - * TODO: is it *really* the caller's duty? ...mmmhhhh.... smells bad... rgerhards, 2008-01-25 - */ - wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */ - - d_pthread_mutex_lock(&pThis->mut); - if(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) { - dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - joining it\n", + if(Debug && wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) { + dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - ignoring\n", wtiGetDbgHdr(pThis), pThis); - /* let's hope the caller actually instructed it to shutdown... */ - pthread_cond_wait(&pThis->condExitDone, &pThis->mut); - wtiJoinThrd(pThis); } - d_pthread_mutex_unlock(&pThis->mut); /* actual destruction */ - pthread_cond_destroy(&pThis->condExitDone); pthread_mutex_destroy(&pThis->mut); free(pThis->batch.pElem); @@ -220,7 +190,6 @@ ENDobjDestruct(wti) /* Standard-Constructor for the wti object */ BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ - pthread_cond_init(&pThis->condExitDone, NULL); pthread_mutex_init(&pThis->mut, NULL); ENDobjConstruct(wti) @@ -250,66 +219,6 @@ finalize_it: } -/* join a specific worker thread - * we do not lock the mutex, because join will sync anyways... - */ -rsRetVal -wtiJoinThrd(wti_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, wti); - dbgprintf("waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd); - if (pThis->thrdID == 0) { - dbgprintf("worker %s was already stopped\n", wtiGetDbgHdr(pThis)); - } else { - //pthread_join(pThis->thrdID, NULL); - wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */ - pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ - dbgprintf("worker %s has stopped\n", wtiGetDbgHdr(pThis)); - } - - RETiRet; -} - -/* check if we had a worker thread changes and, if so, act - * on it. At a minimum, terminated threads are harvested (joined). - */ -rsRetVal -wtiProcessThrdChanges(wti_t *pThis, int bLockMutex) -{ - DEFiRet; - DEFVARS_mutexProtection; - - ISOBJ_TYPE_assert(pThis, wti); - - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - switch(pThis->tCurrCmd) { - case eWRKTHRD_TERMINATING: - /* we need to at least temporarily release the mutex, because otherwise - * we may deadlock with the thread we intend to join (it aquires the mutex - * during termination processing). -- rgerhards, 2008-02-26 - */ - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - iRet = wtiJoinThrd(pThis); - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - break; - /* these cases just to satisfy the compiler, we do not act an them: */ - case eWRKTHRD_STOPPED: - case eWRKTHRD_RUN_CREATED: - case eWRKTHRD_RUN_INIT: - case eWRKTHRD_RUNNING: - case eWRKTHRD_SHUTDOWN: - case eWRKTHRD_SHUTDOWN_IMMEDIATE: - /* DO NOTHING */ - break; - } - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - - RETiRet; -} - - /* cancellation cleanup handler for queueWorker () * Updates admin structure and frees ressources. * rgerhards, 2008-01-16 @@ -333,7 +242,7 @@ wtiWorkerCancelCleanup(void *arg) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pWtp->mut); - wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); + wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */ ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */ @@ -399,7 +308,6 @@ wtiWorker(wti_t *pThis) /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ /* process any pending thread requests */ - // wtpProcessThrdChanges(pWtp); if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */ pWtp->pfRateLimiter(pWtp->pUsr); @@ -455,7 +363,7 @@ wtiWorker(wti_t *pThis) RUNLOG_STR("XXX: Worker shutdown"); pWtp->pfOnWorkerShutdown(pWtp->pUsr); - wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); + wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */ d_pthread_mutex_unlock(&pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); diff --git a/runtime/wti.h b/runtime/wti.h index 2acd3cf6..5b7e5baf 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -36,7 +36,7 @@ struct wti_s { pthread_t thrdID; /* thread ID */ qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ - pthread_cond_t condExitDone; /* signaled when the thread exit is done (once per thread existance) */ + //pthread_cond_t condExitDone; /* signaled when the thread exit is done (once per thread existance) */ pthread_mutex_t mut; batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ bool bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */ diff --git a/runtime/wtp.c b/runtime/wtp.c index 59553984..b27ce9f0 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -142,8 +142,6 @@ finalize_it: BEGINobjDestruct(wtp) /* be sure to specify the object type also in END and CODESTART macros! */ int i; CODESTARTobjDestruct(wtp) - wtpProcessThrdChanges(pThis); /* process thread changes one last time */ - /* destruct workers */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) wtiDestruct(&pThis->pWrkr[i]); @@ -190,51 +188,6 @@ wtpWakeupAllWrkr(wtp_t *pThis) } -/* check if we had any worker thread changes and, if so, act - * on them. At a minimum, terminated threads are harvested (joined). - * This function MUST NEVER block on the queue mutex! - */ -rsRetVal -wtpProcessThrdChanges(wtp_t *pThis) -{ - DEFiRet; - int i; - - ISOBJ_TYPE_assert(pThis, wtp); - - if(pThis->bThrdStateChanged == 0) - FINALIZE; - - if(d_pthread_mutex_trylock(&(pThis->mutThrdShutdwn)) != 0) { - /* another thread is already in the loop */ - FINALIZE; - } - - /* Note: there is a left-over potential race condition below: - * pThis->bThrdStateChanged may be re-set by another thread while - * we work on it and thus the loop may terminate too early. However, - * there are no really bad effects from that so I perfer - for this - * version - to live with the problem as is. Not a good idea to - * introduce that large change into the stable branch without very - * good reason. -- rgerhards, 2009-04-02 - */ - do { - /* reset the change marker */ - ATOMIC_STORE_0_TO_INT(pThis->bThrdStateChanged); - /* go through all threads */ - for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX); - } - /* restart if another change occured while we were processing the changes */ - } while(pThis->bThrdStateChanged != 0); - - d_pthread_mutex_unlock(&(pThis->mutThrdShutdwn)); - -finalize_it: - RETiRet; -} - - /* Sent a specific state for the worker thread pool. * rgerhards, 2008-01-21 */ @@ -299,12 +252,7 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout wtpSetState(pThis, tShutdownCmd); wtpWakeupAllWrkr(pThis); - /* see if we need to harvest (join) any terminated threads (even in timeout case, - * some may have terminated... - */ - wtpProcessThrdChanges(pThis); - - /* and wait for their termination */ + /* wait for worker thread termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pThis->mut); pthread_cleanup_push(mutexCancelCleanup, &pThis->mut); @@ -324,11 +272,6 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout if(bTimedOut) iRet = RS_RET_TIMED_OUT; - /* see if we need to harvest (join) any terminated threads (even in timeout case, - * some may have terminated... - */ - wtpProcessThrdChanges(pThis); - RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" @@ -369,9 +312,6 @@ wtpCancelAll(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - /* process any pending thread requests so that we know who actually is still running */ - wtpProcessThrdChanges(pThis); - /* go through all workers and cancel those that are active */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { dbgprintf("%s: try canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); @@ -502,15 +442,11 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) ISOBJ_TYPE_assert(pThis, wtp); - wtpProcessThrdChanges(pThis); // TODO: Performance: this causes a lot of FUTEX calls - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); pThis->iCurNumWrkThrd++; - /* find free spot in thread table. If we find at least one worker that is in initialization, - * we do NOT start a new one. Let's give the other one a chance, first. - */ + /* find free spot in thread table. */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) { break; diff --git a/runtime/wtp.h b/runtime/wtp.h index ef61d7e6..03bc538f 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -30,7 +30,7 @@ /* commands and states for worker threads. */ typedef enum { eWRKTHRD_STOPPED = 0, /* worker thread is not running (either actually never ran or was shut down) */ - eWRKTHRD_TERMINATING = 1,/* worker thread has shut down, but some finalzing is still needed */ + //eWRKTHRD_TERMINATING = 1,/* worker thread has shut down, but some finalzing is still needed */ /* ALL active states MUST be numerically higher than eWRKTHRD_TERMINATED and NONE must be lower! */ eWRKTHRD_RUN_CREATED = 2,/* worker thread has been created, but not yet begun initialization (prob. not yet scheduled) */ eWRKTHRD_RUN_INIT = 3, /* worker thread is initializing, but not yet fully running */ |