From 87f0e9b5f91407418a43a06f39831febfbd4e3ad Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 25 Jan 2008 19:25:46 +0000 Subject: disk-assisted queue mode finally begins to look good ;) --- wti.c | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 10 deletions(-) (limited to 'wti.c') diff --git a/wti.c b/wti.c index 2e1dd548..3f60afb2 100644 --- a/wti.c +++ b/wti.c @@ -91,7 +91,6 @@ wtiGetState(wti_t *pThis, int bLockMutex) } - /* send a command to a specific thread * bActiveOnly specifies if the command should be sent only when the worker is * in an active state. -- rgerhards, 2008-01-20 @@ -107,7 +106,6 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); -RUNLOG_VAR("%d", bActiveOnly); /* all worker states must be followed sequentially, only termination can be set in any state */ if( (bActiveOnly && (pThis->tCurrCmd < eWRKTHRD_RUN_CREATED)) || (pThis->tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) { @@ -116,14 +114,13 @@ RUNLOG_VAR("%d", bActiveOnly); } else { dbgprintf("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd); switch(tCmd) { - case eWRKTHRD_RUN_CREATED: - break; case eWRKTHRD_TERMINATING: /* TODO: re-enable meaningful debug msg! (via function callback?) dbgprintf("%s: thread terminating with %d entries left in queue, %d workers running.\n", wtiGetDbgHdr(pThis->pQueue), pThis->pQueue->iQueueSize, pThis->pQueue->iCurNumWrkThrd); */ + pthread_cond_signal(&pThis->condExitDone); dbgprintf("%s: worker terminating\n", wtiGetDbgHdr(pThis)); break; case eWRKTHRD_RUNNING: @@ -131,6 +128,7 @@ RUNLOG_VAR("%d", bActiveOnly); break; /* these cases just to satisfy the compiler, we do (yet) not act an them: */ case eWRKTHRD_STOPPED: + case eWRKTHRD_RUN_CREATED: case eWRKTHRD_RUN_INIT: case eWRKTHRD_SHUTDOWN: case eWRKTHRD_SHUTDOWN_IMMEDIATE: @@ -145,6 +143,39 @@ RUNLOG_VAR("%d", bActiveOnly); } +#if 0 +/* check if the worker shall shutdown (1 = yes, 0 = no) + * TODO: check if we can use atomic operations to enhance performance + * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user" + * (e.g. the queue clas) + * rgerhards, 2008-01-24 + * TODO: we can optimize this via function pointers, as the code is only called during + * termination. So we can call the function via ptr in wtiWorker () and change that pointer + * to this function here upon shutdown. + */ +static inline rsRetVal +wtiChkStopWrkr(wti_t *pThis, wtp_t *pWtp, int bLockMutex, int bLockUsrMutex) +{ + DEFiRet; + DEFVARS_mutexProtection; + + ISOBJ_TYPE_assert(pThis, wti); + + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut); + if(pThis->bShutdownRqtd) { + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + iRet = RS_RET_TERMINATE_NOW; + } else { + /* regular case */ + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + iRet = wtpChkStopWrkr(pWtp, bLockMutex, bLockUsrMutex); + } + + RETiRet; +} +#endif + + /* Destructor */ rsRetVal wtiDestruct(wti_t **ppThis) { @@ -160,17 +191,28 @@ rsRetVal wtiDestruct(wti_t **ppThis) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); /* if we reach this point, we must make sure the associated worker has terminated. It is - * the callers duty to make sure the worker has already terminated. + * the callers duty to make sure the worker already knows it shall terminate. * TODO: is it *really* the caller's duty? ...mmmhhhh.... smells bad... rgerhards, 2008-01-25 */ wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */ d_pthread_mutex_lock(&pThis->mut); - assert(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) <= eWRKTHRD_TERMINATING); // I knew it smelled bad... +RUNLOG_VAR("%d", pThis->tCurrCmd); + if(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) { + dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - joining it\n", + wtiGetDbgHdr(pThis), pThis); + /* let's hope the caller actually instructed it to shutdown... */ + pthread_cond_wait(&pThis->condExitDone, &pThis->mut); +RUNLOG; + wtiJoinThrd(pThis); +RUNLOG; + } +RUNLOG; d_pthread_mutex_unlock(&pThis->mut); /* actual destruction */ pthread_cond_destroy(&pThis->condInitDone); + pthread_cond_destroy(&pThis->condExitDone); pthread_mutex_destroy(&pThis->mut); if(pThis->pszDbgHdr != NULL) @@ -191,6 +233,7 @@ rsRetVal wtiDestruct(wti_t **ppThis) */ BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ pthread_cond_init(&pThis->condInitDone, NULL); + pthread_cond_init(&pThis->condExitDone, NULL); pthread_mutex_init(&pThis->mut, NULL); ENDobjConstruct(wti) @@ -228,7 +271,7 @@ wtiJoinThrd(wti_t *pThis) pthread_join(pThis->thrdID, NULL); RUNLOG; wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */ -RUNLOG; +RUNLOG_VAR("%p", pThis->thrdID); pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ dbgprintf("wti: worker %s has stopped\n", wtiGetDbgHdr(pThis)); @@ -339,7 +382,9 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); + BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); pWtp->pfOnWorkerStartup(pWtp->pUsr); + END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ @@ -354,6 +399,7 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) || wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) { + //|| wtiChkStopWrkr(pThis, pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) { END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); break; /* end worker thread run */ } @@ -393,14 +439,13 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), // " %d messages to process.\n", wtiGetDbgHdr(pThis), pThis->iQueueSize); } - pWtp->pfOnWorkerShutdown(pWtp->pUsr); - /* indicate termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); -dbgprintf("%s: worker waiting for mutex\n", wtiGetDbgHdr(pThis)); d_pthread_mutex_lock(&pThis->mut); pthread_cleanup_pop(0); /* remove cleanup handler */ + pWtp->pfOnWorkerShutdown(pWtp->pUsr); + // TODO: I think we no longer need that - but check! #if 0 /* if we ever need finalize_it, here would be the place for it! */ -- cgit