From c701cbaaeb9202887a97a6b2de60852713d1785e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 16 Jan 2008 17:27:16 +0000 Subject: some cleanup and fixes --- queue.c | 45 +++++++++++++++++++-------------------------- 1 file changed, 19 insertions(+), 26 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index acfdefb3..4291c509 100644 --- a/queue.c +++ b/queue.c @@ -112,6 +112,7 @@ queueJoinWrkThrd(queue_t *pThis, int iIdx) pThis->pWrkThrds[iIdx].tCurrCmd); pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL); pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRD_STOPPED; /* back to virgin... */ + pThis->pWrkThrds[iIdx].thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ dbgprintf("Queue 0x%lx: thread %d state %d, has exited\n", queueGetID(pThis), iIdx, pThis->pWrkThrds[iIdx].tCurrCmd); @@ -133,7 +134,7 @@ queueStrtWrkThrd(queue_t *pThis, int i) queueTellWrkThrd(pThis, i, eWRKTHRD_RUN_INIT); iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); - dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n", + dbgprintf("Queue 0x%lx: starting Worker thread %x, index %d with state %d.\n", (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); return iRet; @@ -158,15 +159,17 @@ queueStrtNewWrkThrd(queue_t *pThis) * we do NOT start a new one. Let's give the other one a chance, first. */ iStartingUp = -1; - for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) + for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) { +dbgprintf("Queue %p: search thrd tbl slot: i %d, CuccCmd %d\n", pThis, i, pThis->pWrkThrds[i].tCurrCmd); if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_STOPPED) { break; } else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_INIT) { iStartingUp = i; break; } + } -dbgprintf("after thrd search: i %d, iStartingUp %d\n", i, iStartingUp); +dbgprintf("Queue %p: after thrd search: i %d, iStartingUp %d\n", pThis, i, iStartingUp); if(iStartingUp > -1) ABORT_FINALIZE(RS_RET_ALREADY_STARTING); @@ -209,25 +212,6 @@ queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd) } -/* This once was used to start all regular worker threads. Now, we have - * dynamic grow of the worker thread pool, based on needs. This function is - * still preserved, but it now does not start all but only worker 1, which - * is always present. - * rgerhards, 2008-01-16 - */ -static inline rsRetVal -queueStrtAllWrkThrds(queue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, queue); - assert(pThis->pWrkThrds[1].tCurrCmd < eWRKTHRD_RUN_INIT); - //iRet = queueStrtWrkThrd(pThis, 1); - - return iRet; -} - - /* compute an absolute time timeout suitable for calls to pthread_cond_timedwait() * rgerhards, 2008-01-14 */ @@ -288,8 +272,8 @@ queueChkAndStrtWrk(queue_t *pThis) /* check if we need to start up another worker (only in regular mode) */ if(pThis->qRunsDA == QRUNS_REGULAR) { if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) { -dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d\n", - pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd); +dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qRunsDA: %d\n", + pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA); /* check if we satisfy the min nbr of messages per worker to start a new one */ if(pThis->iCurNumWrkThrd == 0 || pThis->iQueueSize / pThis->iCurNumWrkThrd > pThis->iMinMsgsPerWrkr) { @@ -328,7 +312,14 @@ queueTurnOffDAMode(queue_t *pThis) * keep that comment if future need arises. */ - queueStrtAllWrkThrds(pThis); /* restore our regular worker threads */ + /* we start at least one worker thread. If no new messages come in, this will + * be the only one for the time being. I am not yet sure if that is acceptable. + * To solve that issue, queueWorker () would need to check if it needs to fire + * up addtl ones. I am not yet sure if that is justified. After all, if no new + * messages come into the queue, we may be well off with a single worker. + * rgerhards, 2008-01-16 + */ + queueStrtNewWrkThrd(pThis); pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, @@ -1332,6 +1323,8 @@ queueWorker(void *arg) for(iMyThrdIndx = 0 ; iMyThrdIndx <= pThis->iNumWorkerThreads ; ++iMyThrdIndx) if(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()) break; +dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis, + (unsigned) pThis->pWrkThrds[iMyThrdIndx].thrdID, iMyThrdIndx, (unsigned) pthread_self()); assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()); dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx); @@ -1586,7 +1579,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ queueTellWrkThrd(pThis, 0, eWRKTHRD_STOPPED); /* fire up the worker threads */ - queueStrtAllWrkThrds(pThis); + // TODO: preforked workers! queueStrtAllWrkThrds(pThis); } finalize_it: -- cgit