diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-08 18:12:05 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-08 18:12:05 +0200 |
commit | 4cfbc7254a49503a3c980a1b20264915d6b3b807 (patch) | |
tree | 29c6fb1789588760b8c68b48ba2617b90262ac52 /runtime/wti.c | |
parent | 9f286c0c4c21128c66305166ae379d3f7b07f673 (diff) | |
download | rsyslog-4cfbc7254a49503a3c980a1b20264915d6b3b807.tar.gz rsyslog-4cfbc7254a49503a3c980a1b20264915d6b3b807.tar.xz rsyslog-4cfbc7254a49503a3c980a1b20264915d6b3b807.zip |
simplified worker thread handling
based on now working with detached threads. This is probably the biggest
patch in this series and with large bug potential.
Diffstat (limited to 'runtime/wti.c')
-rw-r--r-- | runtime/wti.c | 114 |
1 files changed, 11 insertions, 103 deletions
diff --git a/runtime/wti.c b/runtime/wti.c index 93c66028..f6a32c34 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -118,7 +118,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) tCurrCmd = pThis->tCurrCmd; /* all worker states must be followed sequentially, only termination can be set in any state */ if( (bActiveOnly && (tCurrCmd < eWRKTHRD_RUN_CREATED)) - || (tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) { + || (tCurrCmd > tCmd && !(tCmd == eWRKTHRD_STOPPED))) { DBGPRINTF("%s: command %d can not be accepted in current %d processing state - ignored\n", wtiGetDbgHdr(pThis), tCmd, tCurrCmd); } else { @@ -126,25 +126,9 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) /* we could replace this with a simple if, but we leave the switch in in case we need * to add something at a later stage. -- rgerhards, 2008-09-30 */ - switch(tCmd) { - case eWRKTHRD_TERMINATING: - /* TODO: re-enable meaningful debug msg! (via function callback?) - dbgprintf("%s: thread terminating with %d entries left in queue, %d workers running.\n", - wtiGetDbgHdr(pThis->pQueue), pThis->pQueue->iQueueSize, - pThis->pQueue->iCurNumWrkThrd); - */ - pthread_cond_signal(&pThis->condExitDone); - dbgprintf("%s: worker terminating\n", wtiGetDbgHdr(pThis)); - break; - /* these cases just to satisfy the compiler, we do (yet) not act an them: */ - case eWRKTHRD_RUNNING: - case eWRKTHRD_STOPPED: - case eWRKTHRD_RUN_CREATED: - case eWRKTHRD_RUN_INIT: - case eWRKTHRD_SHUTDOWN: - case eWRKTHRD_SHUTDOWN_IMMEDIATE: - /* DO NOTHING */ - break; + if(tCmd == eWRKTHRD_STOPPED) { + dbgprintf("%s: worker almost stopped, assuming it has\n", wtiGetDbgHdr(pThis)); + pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ } /* apply the new state */ dbgprintf("worker terminator will write stateval %d\n", tCmd); @@ -152,7 +136,6 @@ dbgprintf("worker terminator will write stateval %d\n", tCmd); if(val != tCurrCmd) { DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd); } -//dbgprintf("worker terminator has written stateval %d\n", tCmd); } END_MTX_PROTECTED_OPERATIONS(&pThis->mut); @@ -174,12 +157,11 @@ wtiCancelThrd(wti_t *pThis) d_pthread_mutex_lock(&pThis->mut); - wtiProcessThrdChanges(pThis, MUTEX_ALREADY_LOCKED); /* process state change, so that we have current state vars */ - - if(pThis->tCurrCmd >= eWRKTHRD_TERMINATING) { + if(pThis->tCurrCmd != eWRKTHRD_STOPPED) { dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd); pthread_cancel(pThis->thrdID); - wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); + /* TODO: check: the following check should automatically be done by cancel cleanup handler! 2009-07-08 rgerhards */ + wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); ATOMIC_STORE_1_TO_INT(pThis->pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */ } @@ -192,24 +174,12 @@ wtiCancelThrd(wti_t *pThis) /* Destructor */ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(wti) - /* if we reach this point, we must make sure the associated worker has terminated. It is - * the callers duty to make sure the worker already knows it shall terminate. - * TODO: is it *really* the caller's duty? ...mmmhhhh.... smells bad... rgerhards, 2008-01-25 - */ - wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */ - - d_pthread_mutex_lock(&pThis->mut); - if(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) { - dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - joining it\n", + if(Debug && wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) { + dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - ignoring\n", wtiGetDbgHdr(pThis), pThis); - /* let's hope the caller actually instructed it to shutdown... */ - pthread_cond_wait(&pThis->condExitDone, &pThis->mut); - wtiJoinThrd(pThis); } - d_pthread_mutex_unlock(&pThis->mut); /* actual destruction */ - pthread_cond_destroy(&pThis->condExitDone); pthread_mutex_destroy(&pThis->mut); free(pThis->batch.pElem); @@ -220,7 +190,6 @@ ENDobjDestruct(wti) /* Standard-Constructor for the wti object */ BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ - pthread_cond_init(&pThis->condExitDone, NULL); pthread_mutex_init(&pThis->mut, NULL); ENDobjConstruct(wti) @@ -250,66 +219,6 @@ finalize_it: } -/* join a specific worker thread - * we do not lock the mutex, because join will sync anyways... - */ -rsRetVal -wtiJoinThrd(wti_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, wti); - dbgprintf("waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd); - if (pThis->thrdID == 0) { - dbgprintf("worker %s was already stopped\n", wtiGetDbgHdr(pThis)); - } else { - //pthread_join(pThis->thrdID, NULL); - wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */ - pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ - dbgprintf("worker %s has stopped\n", wtiGetDbgHdr(pThis)); - } - - RETiRet; -} - -/* check if we had a worker thread changes and, if so, act - * on it. At a minimum, terminated threads are harvested (joined). - */ -rsRetVal -wtiProcessThrdChanges(wti_t *pThis, int bLockMutex) -{ - DEFiRet; - DEFVARS_mutexProtection; - - ISOBJ_TYPE_assert(pThis, wti); - - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - switch(pThis->tCurrCmd) { - case eWRKTHRD_TERMINATING: - /* we need to at least temporarily release the mutex, because otherwise - * we may deadlock with the thread we intend to join (it aquires the mutex - * during termination processing). -- rgerhards, 2008-02-26 - */ - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - iRet = wtiJoinThrd(pThis); - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - break; - /* these cases just to satisfy the compiler, we do not act an them: */ - case eWRKTHRD_STOPPED: - case eWRKTHRD_RUN_CREATED: - case eWRKTHRD_RUN_INIT: - case eWRKTHRD_RUNNING: - case eWRKTHRD_SHUTDOWN: - case eWRKTHRD_SHUTDOWN_IMMEDIATE: - /* DO NOTHING */ - break; - } - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - - RETiRet; -} - - /* cancellation cleanup handler for queueWorker () * Updates admin structure and frees ressources. * rgerhards, 2008-01-16 @@ -333,7 +242,7 @@ wtiWorkerCancelCleanup(void *arg) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pWtp->mut); - wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); + wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */ ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */ @@ -399,7 +308,6 @@ wtiWorker(wti_t *pThis) /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ /* process any pending thread requests */ - // wtpProcessThrdChanges(pWtp); if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */ pWtp->pfRateLimiter(pWtp->pUsr); @@ -455,7 +363,7 @@ wtiWorker(wti_t *pThis) RUNLOG_STR("XXX: Worker shutdown"); pWtp->pfOnWorkerShutdown(pWtp->pUsr); - wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); + wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); ATOMIC_STORE_1_TO_INT(pWtp->bThrdStateChanged); /* indicate change, so harverster will be called */ d_pthread_mutex_unlock(&pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); |