From eb4b1915d1655d801e0232f4196fbdc1af3c857f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 26 Feb 2008 17:49:26 +0000 Subject: worked on queue stability --- wti.c | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) (limited to 'wti.c') diff --git a/wti.c b/wti.c index db31f4dd..402b6bda 100644 --- a/wti.c +++ b/wti.c @@ -143,6 +143,33 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) } +/* Cancel the thread. If the thread is already cancelled or termination, + * we do not again cancel it. But it is save and legal to call wtiCancelThrd() in + * such situations. + * rgerhards, 2008-02-26 + */ +rsRetVal +wtiCancelThrd(wti_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, wti); + + d_pthread_mutex_lock(&pThis->mut); + + if(pThis->tCurrCmd >= eWRKTHRD_TERMINATING) { + dbgoprint((obj_t*) pThis, "canceling worker thread\n"); + pthread_cancel(pThis->thrdID); + wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); + pThis->pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ + } + + d_pthread_mutex_unlock(&pThis->mut); + + RETiRet; +} + + /* Destructor */ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(wti) @@ -233,7 +260,13 @@ wtiProcessThrdChanges(wti_t *pThis, int bLockMutex) BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); switch(pThis->tCurrCmd) { case eWRKTHRD_TERMINATING: + /* we need to at least temporarily release the mutex, because otherwise + * we may deadlock with the thread we intend to join (it aquires the mutex + * during termination processing). -- rgerhards, 2008-02-26 + */ + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); iRet = wtiJoinThrd(pThis); + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); break; /* these cases just to satisfy the compiler, we do not act an them: */ case eWRKTHRD_STOPPED: @@ -270,6 +303,7 @@ wtiWorkerCancelCleanup(void *arg) dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); /* call user supplied handler (that one e.g. requeues the element) */ +RUNLOG_VAR("%p", pThis->pUsrp); pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); @@ -371,9 +405,11 @@ wtiWorker(wti_t *pThis) /* indicate termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); +// TODO: this deadlocks with wtiProcessThrdChanges(), where it waits on the thread to join (on US TO JOIN!!!) d_pthread_mutex_lock(&pThis->mut); pthread_cleanup_pop(0); /* remove cleanup handler */ +RUNLOG_VAR("%p", pWtp->pUsr); pWtp->pfOnWorkerShutdown(pWtp->pUsr); wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); -- cgit