diff options
Diffstat (limited to 'runtime/wti.c')
-rw-r--r-- | runtime/wti.c | 125 |
1 files changed, 24 insertions, 101 deletions
diff --git a/runtime/wti.c b/runtime/wti.c index c536e545..b6a09c65 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -76,74 +76,32 @@ wtiGetDbgHdr(wti_t *pThis) } -/* get the current worker state. For simplicity and speed, we have - * NOT used our regular calling interface this time. I hope that won't - * bite in the long term... -- rgerhards, 2008-01-17 - * TODO: may be performance optimized by atomic operations +/* return the current worker processing state. For the sake of + * simplicity, we do not use the iRet interface. -- rgerhards, 2009-07-17 */ -qWrkCmd_t -wtiGetState(wti_t *pThis, int bLockMutex) +bool +wtiGetState(wti_t *pThis) { - DEFVARS_mutexProtection; - qWrkCmd_t tCmd; - - BEGINfunc - ISOBJ_TYPE_assert(pThis, wti); - - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - tCmd = pThis->tCurrCmd; - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - - ENDfunc - return tCmd; + return pThis->bIsRunning; } -/* send a command to a specific thread - * rgerhards, 2008-01-20 +/* Set status (thread is running or not), actually an property of + * use for wtp, but we need to have it per thread instance (thus it + * is inside wti). -- rgerhards, 2009-07-17 */ rsRetVal -wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bLockMutex) +wtiSetState(wti_t *pThis, bool bNewVal) { - DEFiRet; - qWrkCmd_t tCurrCmd; - DEFVARS_mutexProtection; - ISOBJ_TYPE_assert(pThis, wti); - assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE); - - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - - tCurrCmd = pThis->tCurrCmd; - /* all worker states must be followed sequentially, only termination can be set in any state */ - if(tCurrCmd > tCmd && !(tCmd == eWRKTHRD_STOPPED)) { - DBGPRINTF("%s: command %d can not be accepted in current %d processing state - ignored\n", - wtiGetDbgHdr(pThis), tCmd, tCurrCmd); - } else { - DBGPRINTF("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd); - /* we could replace this with a simple if, but we leave the switch in in case we need - * to add something at a later stage. -- rgerhards, 2008-09-30 - */ - if(tCmd == eWRKTHRD_STOPPED) { - dbgprintf("%s: worker almost stopped, assuming it has\n", wtiGetDbgHdr(pThis)); - pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ - } - /* apply the new state */ -dbgprintf("worker terminator will write stateval %d\n", tCmd); - unsigned val = ATOMIC_CAS_VAL(pThis->tCurrCmd, tCurrCmd, tCmd); - if(val != tCurrCmd) { - DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd); - } - } - - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - RETiRet; + pThis->bIsRunning = bNewVal; + return RS_RET_OK; } -/* Cancel the thread. If the thread is already cancelled or terminated, - * we do not again cancel it. But it is save and legal to call wtiCancelThrd() in - * such situations. +/* Cancel the thread. If the thread is not running. But it is save and legal to + * call wtiCancelThrd() in such situations. + * IMPORTANT: WTP mutex must be locked while this function is called! * rgerhards, 2008-02-26 */ rsRetVal @@ -153,17 +111,11 @@ wtiCancelThrd(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); - d_pthread_mutex_lock(&pThis->mut); - - if(pThis->tCurrCmd != eWRKTHRD_STOPPED) { - dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd); + if(pThis->bIsRunning) { + dbgoprint((obj_t*) pThis, "canceling worker thread\n"); pthread_cancel(pThis->thrdID); - /* TODO: check: the following check should automatically be done by cancel cleanup handler! 2009-07-08 rgerhards */ - wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED); } - d_pthread_mutex_unlock(&pThis->mut); - RETiRet; } @@ -171,14 +123,7 @@ wtiCancelThrd(wti_t *pThis) /* Destructor */ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(wti) - if(Debug && wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) { - dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - ignoring\n", - wtiGetDbgHdr(pThis), pThis); - } - /* actual destruction */ - pthread_mutex_destroy(&pThis->mut); - free(pThis->batch.pElem); free(pThis->pszDbgHdr); ENDobjDestruct(wti) @@ -187,7 +132,6 @@ ENDobjDestruct(wti) /* Standard-Constructor for the wti object */ BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ - pthread_mutex_init(&pThis->mut, NULL); ENDobjConstruct(wti) @@ -205,7 +149,7 @@ wtiConstructFinalize(wti_t *pThis) dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); /* initialize our thread instance descriptor */ - pThis->tCurrCmd = eWRKTHRD_STOPPED; + pThis->bIsRunning = FALSE; /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); @@ -238,17 +182,13 @@ wtiWorkerCancelCleanup(void *arg) /* call user supplied handler (that one e.g. requeues the element) */ pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp); - d_pthread_mutex_lock(&pWtp->mut); - wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED); - /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */ - d_pthread_mutex_unlock(&pWtp->mut); ENDfunc } /* wait for queue to become non-empty or timeout - * helper to wtiWorker - * IMPORTANT: mutex must be locked when this code is called! + * helper to wtiWorker. Note the the predicate is + * re-tested by the caller, so it is OK to NOT do it here. * rgerhards, 2009-05-20 */ static inline void @@ -258,8 +198,10 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) BEGINfunc DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis)); + pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); + d_pthread_mutex_lock(pWtp->pmutUsr); if(pWtp->toWrkShutdown == -1) { /* never shut down any started worker */ d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); @@ -270,6 +212,7 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } } + d_pthread_mutex_unlock(pWtp->pmutUsr); ENDfunc } @@ -284,7 +227,6 @@ wtiWorker(wti_t *pThis) int bInactivityTOOccured = 0; rsRetVal localRet; rsRetVal terminateRet; - bool bMutexIsLocked; int iCancelStateSave; DEFiRet; @@ -305,9 +247,7 @@ wtiWorker(wti_t *pThis) pWtp->pfRateLimiter(pWtp->pUsr); } - wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */ d_pthread_mutex_lock(pWtp->pmutUsr); - bMutexIsLocked = TRUE; /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED); @@ -316,46 +256,29 @@ wtiWorker(wti_t *pThis) localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", localRet); + d_pthread_mutex_unlock(pWtp->pmutUsr); break; } /* try to execute and process whatever we have */ /* This function must and does RELEASE the MUTEX! */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); - bMutexIsLocked = FALSE; if(localRet == RS_RET_IDLE) { - if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE) { + if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { break; /* end of loop */ } - - if(bInactivityTOOccured) { - /* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */ - break; /* end worker thread run */ - } - d_pthread_mutex_lock(pWtp->pmutUsr); doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); - d_pthread_mutex_unlock(pWtp->pmutUsr); continue; /* request next iteration */ } bInactivityTOOccured = 0; /* reset for next run */ } - /* if we exit the loop, the mutex may be locked and, if so, must be unlocked */ - if(bMutexIsLocked) { - d_pthread_mutex_unlock(pWtp->pmutUsr); - } - /* indicate termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mut); pthread_cleanup_pop(0); /* remove cleanup handler */ - pWtp->pfOnWorkerShutdown(pWtp->pUsr); - - wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED); - d_pthread_mutex_unlock(&pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; |