From 5c686c8adcc473cbdbb14e4b2d736f9123210ee6 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 24 Jan 2008 17:55:09 +0000 Subject: redesigned queue to utilize helper classes for threading support. This is finally in a running state for regular (non disk-assisted) queues, with a minor nit at shutdown. So I can finally commit the work again to CVS... --- wtp.c | 312 ++++++++++++++++++++++++++++++++++++------------------------------ 1 file changed, 169 insertions(+), 143 deletions(-) (limited to 'wtp.c') diff --git a/wtp.c b/wtp.c index 4bc0cd4f..4c3ea921 100644 --- a/wtp.c +++ b/wtp.c @@ -71,52 +71,59 @@ wtpGetDbgHdr(wtp_t *pThis) /* Not implemented dummy function for constructor */ -static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; } +static rsRetVal NotImplementedDummy() { return RS_RET_OK; } /* Standard-Constructor for the wtp object */ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */ - int i; - uchar pszBuf[64]; - size_t lenBuf; - wti_t *pWti; - pthread_mutex_init(&pThis->mut, NULL); pthread_cond_init(&pThis->condThrdTrm, NULL); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; - pThis->pfOnShutdownAdvise = NotImplementedDummy; pThis->pfOnIdle = NotImplementedDummy; pThis->pfOnWorkerCancel = NotImplementedDummy; + pThis->pfOnWorkerStartup = NotImplementedDummy; + pThis->pfOnWorkerShutdown = NotImplementedDummy; +ENDobjConstruct(wtp) + + +/* Construction finalizer + * rgerhards, 2008-01-17 + */ +rsRetVal +wtpConstructFinalize(wtp_t *pThis) +{ + DEFiRet; + int i; + uchar pszBuf[64]; + size_t lenBuf; + wti_t *pWti; - /* alloc and construct workers */ + ISOBJ_TYPE_assert(pThis, wtp); + + dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); + /* alloc and construct workers - this can only be done in finalizer as we previously do + * not know the max number of workers + */ +RUNLOG; if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { +RUNLOG_VAR("%d", i); +RUNLOG_VAR("%p", pThis->pWrkr[i]); CHKiRet(wtiConstruct(&pThis->pWrkr[i])); pWti = pThis->pWrkr[i]; lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s/w%d", wtpGetDbgHdr(pThis), i); CHKiRet(wtiSetDbgHdr(pWti, pszBuf, lenBuf)); + CHKiRet(wtiSetpWtp(pWti, pThis)); CHKiRet(wtiConstructFinalize(pWti)); } -finalize_it: -ENDobjConstruct(wtp) - -/* Construction finalizer - * rgerhards, 2008-01-17 - */ -rsRetVal -wtpConstructFinalize(wtp_t __attribute__((unused)) *pThis) -{ - ISOBJ_TYPE_assert(pThis, wtp); - - dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); - - return RS_RET_OK; +finalize_it: + RETiRet; } @@ -129,6 +136,8 @@ wtpDestruct(wtp_t **ppThis) int iCancelStateSave; int i; +dbgPrintAllDebugInfo(); +RUNLOG; assert(ppThis != NULL); pThis = *ppThis; ISOBJ_TYPE_assert(pThis, wtp); @@ -161,17 +170,31 @@ wtpDestruct(wtp_t **ppThis) } -/* wake up all worker threads. Param bWithDAWrk tells if the DA worker - * is to be awaken, too. It needs special handling because it waits on - * two different conditions depending on processing state. +/* wake up at least one worker thread. + * rgerhards, 2008-01-20 + */ +rsRetVal +wtpWakeupWrkr(wtp_t *pThis) +{ + DEFiRet; + + // TODO; mutex? + ISOBJ_TYPE_assert(pThis, wtp); +dbgprintf("wtpWakeupWrkr 1, cond %p\n", pThis->pcondBusy); + pthread_cond_signal(pThis->pcondBusy); +dbgprintf("wtpWakeupWrkr 2\n"); + RETiRet; +} +/* wake up all worker threads. * rgerhards, 2008-01-16 */ -static inline rsRetVal -wtpWakeupWrks(wtp_t *pThis) +rsRetVal +wtpWakeupAllWrkr(wtp_t *pThis) { DEFiRet; ISOBJ_TYPE_assert(pThis, wtp); + // TODO; mutex? pthread_cond_broadcast(pThis->pcondBusy); RETiRet; } @@ -189,8 +212,10 @@ wtpProcessThrdChanges(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); + RUNLOG; if(pThis->bThrdStateChanged == 0) FINALIZE; + RUNLOG; /* go through all threads */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { @@ -198,6 +223,7 @@ wtpProcessThrdChanges(wtp_t *pThis) } finalize_it: + RUNLOG; RETiRet; } @@ -218,39 +244,68 @@ wtpSetState(wtp_t *pThis, wtpState_t iNewState) } -#if 0 +/* check if the worker shall shutdown (1 = yes, 0 = no) + * TODO: check if we can use atomic operations to enhance performance + * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user" + * (e.g. the queue clas) + * rgerhards, 2008-01-21 + */ +rsRetVal +wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) +{ + DEFiRet; + DEFVARS_mutexProtection; + + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); + if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) + || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex))) + iRet = RS_RET_TERMINATE_NOW; + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + + /* try customer handler if one was set and we do not yet have a definite result */ + if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) { + iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex); + } + + RETiRet; +} + + /* Send a shutdown command to all workers and see if they terminate. * A timeout may be specified. * rgerhards, 2008-01-14 */ -static rsRetVal -wtpWrkrShutdown(wtp_t *pThis) +rsRetVal +wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, long iTimeout) { DEFiRet; int bTimedOut; struct timespec t; int iCancelStateSave; - // TODO: implement - ISOBJ_TYPE_assert(pThis); - queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ - queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ - /* race: must make sure all are running! */ - queueTimeoutComp(&t, iTimeout);/* get timeout */ +dbgPrintAllDebugInfo(); +RUNLOG_VAR("%p", pThis); +RUNLOG_VAR("%ld", iTimeout); +RUNLOG_VAR("%d", tShutdownCmd); + ISOBJ_TYPE_assert(pThis, wtp); + + wtpSetState(pThis, tShutdownCmd); + wtpWakeupAllWrkr(pThis); + timeoutComp(&t, iTimeout);/* get timeout */ /* and wait for their termination */ -dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt); +dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt); + d_pthread_mutex_lock(&pThis->mut); + pthread_cleanup_push(mutexCancelCleanup, &pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { - dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n", - queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd); + dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", + wtpGetDbgHdr(pThis), iTimeout, pThis->iCurNumWrkThrd); - if(pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutThrdMgmt, &t) != 0) { - dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", queueGetID(pThis)); + if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, &t) != 0) { + dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis)); bTimedOut = 1; /* we exit the loop on timeout */ } } @@ -259,122 +314,61 @@ dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt); if(bTimedOut) iRet = RS_RET_TIMED_OUT; +dbgprintf("wtpShutdownAll exit"); RETiRet; } -/* Unconditionally cancel all running worker threads. - * rgerhards, 2008-01-14 +/* indicate that a thread has terminated and awake anyone waiting on it + * rgerhards, 2008-01-23 */ -static rsRetVal -wtpWrkrCancel(wtp_t *pThis) +rsRetVal wtpSignalWrkrTermination(wtp_t *pThis) { DEFiRet; - int i; - // TODO: we need to implement peek(), without it (today!) we lose one message upon - // worker cancellation! -- rgerhards, 2008-01-14 - - ISOB_TYPE_assert(pThis); - /* process any pending thread requests so that we know who actually is still running */ - queueChkWrkThrdChanges(pThis); - - /* awake the workers one more time, just to be sure */ - queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ + //TODO: mutex or not mutex, that's the question ;)DEFVARS_mutexProtection; - /* first tell the workers our request */ - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) { - dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i); - pthread_cancel(pThis->pWrkThrds[i].thrdID); - } - } + ISOBJ_TYPE_assert(pThis, wtp); + //BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX); +dbgprintf("signaling thread termination, cond %p\n", &pThis->condThrdTrm); + pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ + //END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; } -/* Worker thread management function carried out when the main - * worker is about to terminate. +/* Unconditionally cancel all running worker threads. + * rgerhards, 2008-01-14 */ -static rsRetVal -wtpShutdownWorkers(wtp_t *pThis) +rsRetVal +wtpCancelAll(wtp_t *pThis) { DEFiRet; int i; + // TODO: we need to implement peek(), without it (today!) we lose one message upon + // worker cancellation! -- rgerhards, 2008-01-14 - assert(pThis != NULL); + ISOBJ_TYPE_assert(pThis, wtp); - dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis); + /* process any pending thread requests so that we know who actually is still running */ + wtpProcessThrdChanges(pThis); - ISOB_TYPE_assert(pThis); - /* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This - * is necessary so that all threads get sent the termination command. With a timeout of 0, however, - * the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as - * good. - */ - iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN, pThis->toQShutdown); - if(iRet == RS_RET_TIMED_OUT) { - if(pThis->toQShutdown == 0) { - iRet = RS_RET_OK; - } else { - /* OK, we now need to try force the shutdown */ - dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", - queueGetID(pThis)); - iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); - } - } + /* awake the workers one more time, just to be sure */ + wtpWakeupAllWrkr(pThis); - if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */ - /* still didn't work out - so we now need to cancel the workers */ - dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis); - iRet = queueWrkThrdCancel(pThis); - } - - /* finally join the threads - * In case of a cancellation, this may actually take some time. This is also - * needed to clean up the thread descriptors, even with a regular termination. - * And, most importantly, this is needed if we have an indifitite termination - * time set (timeout == 0)! -- rgerhards, 2008-01-14 - */ + /* first tell the workers our request */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRD_STOPPED) { - queueJoinWrkThrd(pThis, i); + // TODO: mutex lock! + if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) { + dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); + pthread_cancel(pThis->pWrkr[i]->thrdID); } } - dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", - queueGetID(pThis), pThis->iQueueSize); - RETiRet; } -#endif - - - -/* check if the worker shall shutdown - * TODO: check if we can use atomic operations to enhance performance - * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user" - * (e.g. the queue clas) - * rgerhards, 2008-01-21 - */ -rsRetVal -wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) -{ - DEFiRet; - DEFVARS_mutexProtection; - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) - || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex))) - iRet = RS_RET_TERMINATE_NOW; - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - /* try customer handler if one was set and we do not yet have a definite result */ - if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) - iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex); - - RETiRet; -} /* Set the Inactivity Guard * rgerhards, 2008-01-21 @@ -385,9 +379,13 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex) DEFiRet; DEFVARS_mutexProtection; +RUNLOG; BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); +RUNLOG; pThis->bInactivityGuard = bNewState; +RUNLOG; END_MTX_PROTECTED_OPERATIONS(&pThis->mut); +RUNLOG; RETiRet; } @@ -404,6 +402,7 @@ wtpWrkrExecCancelCleanup(void *arg) ISOBJ_TYPE_assert(pThis, wtp); pThis->iCurNumWrkThrd--; + wtpSignalWrkrTermination(pThis); dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd); @@ -415,7 +414,7 @@ wtpWrkrExecCancelCleanup(void *arg) * rgerhards, 2008-01-21 */ static void * -wtpWorker(void *arg) +wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */ { DEFiRet; DEFVARS_mutexProtection; @@ -442,7 +441,7 @@ wtpWorker(void *arg) * our init. That would be a bad race... -- rgerhards, 2008-01-16 */ //if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT) - wtiSetState(pWti, eWRKTHRD_RUNNING, MUTEX_ALREADY_LOCKED); /* we are running now! */ + wtiSetState(pWti, eWRKTHRD_RUNNING, 0, MUTEX_ALREADY_LOCKED); /* we are running now! */ do { END_MTX_PROTECTED_OPERATIONS(&pThis->mut); @@ -459,12 +458,14 @@ wtpWorker(void *arg) pthread_cleanup_pop(0); pThis->iCurNumWrkThrd--; + wtpSignalWrkrTermination(pThis); dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n", wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd); END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + ENDfunc pthread_exit(0); } @@ -492,7 +493,9 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { // TODO: sync! - if(pThis->pWrkr[i]->tCurrCmd == eWRKTHRD_STOPPED) { +RUNLOG; +dbgprintf("%s: i %d, wti_T* %p\n", wtpGetDbgHdr(pThis), i, pThis->pWrkr[i]); + if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) { break; } } @@ -502,11 +505,12 @@ dbgprintf("%s: after thrd search: i %d, max %d\n", wtpGetDbgHdr(pThis), i, pThis ABORT_FINALIZE(RS_RET_NO_MORE_THREADS); pWti = pThis->pWrkr[i]; - wtiSetState(pWti, eWRKTHRD_RUN_CREATED, LOCK_MUTEX); // TODO: thuink about mutex lock + wtiSetState(pWti, eWRKTHRD_RUN_CREATED, 0, LOCK_MUTEX); iState = pthread_create(&(pWti->thrdID), NULL, wtpWorker, (void*) pWti); dbgprintf("%s: started with state %d, num workers now %d\n", wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd); +RUNLOG; /* we try to give the starting worker a little boost. It won't help much as we still * hold the queue's mutex, but at least it has a chance to start on a single-CPU system. */ @@ -517,6 +521,7 @@ dbgprintf("%s: after thrd search: i %d, max %d\n", wtpGetDbgHdr(pThis), i, pThis finalize_it: END_MTX_PROTECTED_OPERATIONS(&pThis->mut); +RUNLOG; RETiRet; } @@ -524,7 +529,9 @@ finalize_it: /* set the number of worker threads that should be running. If less than currently running, * a new worker may be started. Please note that there is no guarantee the number of workers * said will be running after we exit this function. It is just a hint. If the number is - * higher than one, the "busy" condition is also signaled to awake a worker. + * higher than one, and no worker is started, the "busy" condition is signaled to awake a worker. + * So the caller can assume that there is at least one worker re-checking if there is "work to do" + * after this function call. * rgerhards, 2008-01-21 */ rsRetVal @@ -535,8 +542,10 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) int nMissing; /* number workers missing to run */ int i; + if(pThis == NULL) dbgPrintAllDebugInfo(); ISOBJ_TYPE_assert(pThis, wtp); +dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetDbgHdr(pThis), nMaxWrkr, pThis->iCurNumWrkThrd, pThis->iNumWorkerThreads); if(nMaxWrkr == 0) FINALIZE; @@ -548,15 +557,20 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) else if(nMissing < 0) nMissing = 0; - dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); - /* start the rqtd nbr of workers */ - for(i = 0 ; i < nMissing ; ++i) { - CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED)); + if(nMissing > 0) { + dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); + /* start the rqtd nbr of workers */ + for(i = 0 ; i < nMissing ; ++i) { + CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED)); + } + } else { +dbgprintf("wtpAdviseMaxWorkers signals busy\n"); + wtpWakeupWrkr(pThis); } - - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + finalize_it: + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; } @@ -564,6 +578,17 @@ finalize_it: /* some simple object access methods */ DEFpropSetMeth(wtp, toWrkShutdown, long); DEFpropSetMeth(wtp, wtpState, wtpState_t); +DEFpropSetMeth(wtp, iNumWorkerThreads, int); +DEFpropSetMeth(wtp, pUsr, void*); +DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t); +DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t); +DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); +DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)); +DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)); +DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)); +DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*)); +DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)); +DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*)); /* set the debug header message @@ -572,11 +597,12 @@ DEFpropSetMeth(wtp, wtpState, wtpState_t); * rgerhards, 2008-01-09 */ rsRetVal -wtpSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg) +wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg) { DEFiRet; - ISOBJ_TYPE_assert(pThis, wti); +dbgprintf("objID: %d\n", pThis->pObjInfo->objID); + ISOBJ_TYPE_assert(pThis, wtp); assert(pszMsg != NULL); if(lenMsg < 1) -- cgit