From 8f5c0764aaafc9eab72d20761ecba6a27d321f89 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 15 Jan 2008 11:04:46 +0000 Subject: disk assisted queue works quite well, except for startup from disk queue --- queue.c | 232 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 204 insertions(+), 28 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 10a560d2..9bc39464 100644 --- a/queue.c +++ b/queue.c @@ -1,3 +1,4 @@ +// TODO: do an if(debug) in dbgrintf - performanc ein release build! // TODO: remove bIsDA? // TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in // call consumer state. Facilitates retaining messages in queue until action could @@ -71,6 +72,28 @@ queueTellWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) } +/* join a specific worker thread + */ +static inline rsRetVal +queueJoinWrkThrd(queue_t *pThis, int iIdx) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); + assert(pThis->pWrkThrds[iIdx].tCurrCmd != eWRKTHRDCMD_NEVER_RAN); + + dbgprintf("Queue 0x%lx: thread %d state %d, waiting for exit\n", queueGetID(pThis), iIdx, + pThis->pWrkThrds[iIdx].tCurrCmd); + pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL); + pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRDCMD_NEVER_RAN; /* back to virgin... */ + dbgprintf("Queue 0x%lx: thread %d state %d, exited\n", queueGetID(pThis), iIdx, + pThis->pWrkThrds[iIdx].tCurrCmd); + + return iRet; +} + + /* Starts a worker thread (on a specific index [i]!) */ static inline rsRetVal @@ -113,6 +136,23 @@ queueTellWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd) return iRet; } +/* start all regular worker threads + * rgerhards, 2008-01-15 + */ +static inline rsRetVal +queueStrtAllWrkThrds(queue_t *pThis) +{ + DEFiRet; + int i; + + /* fire up the worker threads */ + for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) { + queueStrtWrkThrd(pThis, i); + } + + return iRet; +} + /* compute an absolute time timeout suitable for calls to pthread_cond_timedwait() * rgerhards, 2008-01-14 @@ -137,6 +177,80 @@ queueTimeoutComp(struct timespec *pt, int iTimeout) /* --------------- code for disk-assisted (DA) queue modes -------------------- */ +/* Destruct DA queue. This is the last part of DA-to-normal-mode + * transistion. This is called asynchronously and some time quite a + * while after the actual transistion. The key point is that we need to + * do it at some later time, because we need to destruct the DA queue. That, + * however, can not be done in a thread that has been signalled + * This is to be called when we revert back to our own queue. + * rgerhards, 2008-01-15 + */ +static inline rsRetVal +queueTurnOffDAMode(queue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + assert(pThis->bRunsDA == 1); + + /* pull any data that we still need from the (child) disk queue... */ + pThis->pConsumer = pThis->pqDA->pConsumer; /* restore regular consumer */ + + queueStrtAllWrkThrds(pThis); /* restore our regular worker threads */ + pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ + + /* note: a disk queue alsways has a single worker and it alwas has the ID 1 */ + queueTellWrkThrd(pThis->pqDA, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* tell the DA worker to terminate... */ + pthread_mutex_unlock(&pThis->mutDA); /* ... permit it to run ... */ + queueJoinWrkThrd(pThis->pqDA, 1); /* ... and wait for the shutdown to happen */ + queueDestruct(pThis->pqDA); /* and now we are ready to destruct the DA queue */ + pThis->pqDA = NULL; + queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */ + + dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n", + queueGetID(pThis), iRet); + + return iRet; +} + +/* check if we had any worker thread changes and, if so, act + * on them. At a minimum, terminated threads are harvested (joined). + */ +static rsRetVal +queueChkWrkThrdChanges(queue_t *pThis) +{ + DEFiRet; + int i; + + ISOBJ_TYPE_assert(pThis, queue); + + if(pThis->bThrdStateChanged == 0) + FINALIZE; + + /* go through all threads (including DA thread) */ + for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { + switch(pThis->pWrkThrds[i].tCurrCmd) { + case eWRKTHRDCMD_TERMINATED: + queueJoinWrkThrd(pThis, i); + break; + case eWRKTHRDCMD_TURN_OFF_DA_MODE: + queueTurnOffDAMode(pThis); + break; + /* these cases just to satisfy the compiler, we do not act an them: */ + case eWRKTHRDCMD_NEVER_RAN: + case eWRKTHRDCMD_RUN: + case eWRKTHRDCMD_SHUTDOWN: + case eWRKTHRDCMD_SHUTDOWN_IMMEDIATE: + /* DO NOTHING */ + break; + } + } + +finalize_it: + return iRet; +} + + /* check if we run in disk-assisted mode and record that * setting for easy (and quick!) access in the future. This * function must only be called from constructors and only @@ -161,7 +275,6 @@ queueChkIsDA(queue_t *pThis) } - /* This is a special consumer to feed the disk-queue in disk-assited mode. * When active, our own queue more or less acts as a memory buffer to the disk. * So this consumer just needs to drain the memory queue and submit entries @@ -175,6 +288,7 @@ static inline rsRetVal queueDAConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr) { DEFiRet; + int iCancelStateSave; ISOBJ_TYPE_assert(pThis, queue); ISOBJ_assert(pUsr); @@ -186,13 +300,27 @@ dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, if(pThis->iQueueSize == pThis->iLowWtrMrk) { dbgprintf("Queue 0x%lx: %d entries - passed low water mark in DA mode, sleeping\n", queueGetID(pThis), pThis->iQueueSize); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); +dbgprintf("pre mutex lock (think about CLEANUP!)\n"); pthread_mutex_lock(&pThis->mutDA); dbgprintf("mutex locked (think about CLEANUP!)\n"); pthread_cond_wait(&pThis->condDA, &pThis->mutDA); dbgprintf("condition returned\n"); - pthread_mutex_unlock(&pThis->mutDA); + /* we are back. either we have passed the high water mark or the child disk queue + * is empty. We check for the later. If that is the case, we switch back to + * non-DA mode + */ + if(pThis->pqDA->iQueueSize == 0) { + dbgprintf("Queue 0x%lx: %d entries - disk assisted child queue signaled it is empty\n", + queueGetID(pThis), pThis->iQueueSize); + CHKiRet(queueTurnOffDAMode(pThis)); /* this also unlocks the mutex! */ + } else { + pthread_mutex_unlock(&pThis->mutDA); + } dbgprintf("mutex unlocked (think about CLEANUP!)\n"); + pthread_setcancelstate(iCancelStateSave, NULL); } +dbgprintf("DAConsumer returns\n"); finalize_it: return iRet; @@ -206,28 +334,40 @@ static rsRetVal queueChkStrtDA(queue_t *pThis) { DEFiRet; + int iCancelStateSave; ISOBJ_TYPE_assert(pThis, queue); + if(pThis->bRunsDA) { + if(pThis->iQueueSize < pThis->iHighWtrMrk) + pThis->bWasBelowHighWtr = 1; + else if(pThis->iQueueSize == pThis->iHighWtrMrk && pThis->bWasBelowHighWtr) { + /* then we need to signal that we are at the high water mark again.*/ + dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", + queueGetID(pThis), pThis->iQueueSize); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + pthread_mutex_lock(&pThis->mutDA); + pthread_cond_signal(&pThis->condDA); + pthread_mutex_unlock(&pThis->mutDA); + pthread_setcancelstate(iCancelStateSave, NULL); + queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */ + } + /* we need to re-check if we run disk-assisted, because that status may have changed + * in our high water mark processing. + */ + if(pThis->bRunsDA) + FINALIZE; + } + + /* we run into this part if we are NOT currently running DA. + * TODO: split this function, I think that would make the code easier + * to read. -- rgerhards, 2008-10-15 + */ /* if we do not hit the high water mark, we have nothing to do */ if(pThis->iQueueSize != pThis->iHighWtrMrk) ABORT_FINALIZE(RS_RET_OK); - if(pThis->bRunsDA) { - /* then we need to signal that we are at the high water mark again. The DA - * consumer shall check if it needs to restart. Note that we may pass the - * high water mark while we drain the queue. - * TODO: problem here? (condition signalled on drain...) - */ - dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", - queueGetID(pThis), pThis->iQueueSize); - pthread_mutex_lock(&pThis->mutDA); - pthread_cond_signal(&pThis->condDA); - pthread_mutex_unlock(&pThis->mutDA); - FINALIZE; - } - dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n", queueGetID(pThis), pThis->iQueueSize); @@ -263,6 +403,7 @@ queueChkStrtDA(queue_t *pThis) * so we now need to change our consumer to utilize it. */ pThis->bRunsDA = 1; /* and that's all we need to do - the worker handles the rest ;) */ + pThis->bWasBelowHighWtr = 0;/* init to be sure */ /* now we must start our DA worker thread and shutdown all others */ CHKiRet(queueStrtWrkThrd(pThis, 0)); @@ -723,6 +864,9 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) /* awake them... */ pthread_cond_broadcast(pThis->notEmpty); +dbgprintf("queueWrkThrdTrm broadcasted notEmpty\n"); + if(pThis->bRunsDA) /* if running disk-assisted, workers may wait on that condition, too */ + pthread_cond_broadcast(&pThis->condDA); /* get timeout */ queueTimeoutComp(&t, iTimeout); @@ -761,6 +905,8 @@ queueWrkThrdCancel(queue_t *pThis) /* awake the workers one more time, just to be sure */ pthread_cond_broadcast(pThis->notEmpty); + if(pThis->bRunsDA) /* if running disk-assisted, workers may wait on that condition, too */ + pthread_cond_broadcast(&pThis->condDA); /* first tell the workers our request */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) @@ -816,8 +962,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_NEVER_RAN) { - dbgprintf("Queue 0x%lx: joining worker thread %d\n", queueGetID(pThis), i); - pthread_join(pThis->pWrkThrds[i].thrdID, NULL); + queueJoinWrkThrd(pThis, i); } } @@ -872,6 +1017,7 @@ queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr) } } +dbgprintf("CallConsumer returns\n"); return iRet; } @@ -889,6 +1035,10 @@ queueWorker(void *arg) sigset_t sigSet; int iMyThrdIndx; /* index for this thread in queue thread table */ int iCancelStateSave; + int bInitialEmpty = 1; /* if running as a DA child, we do NOT need to signal the parent + * on the first occasion we are empty (because that happens on every + * startup. This var keeps track of the state. + */ assert(pThis != NULL); @@ -915,17 +1065,33 @@ queueWorker(void *arg) || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) { pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); - while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { + while(pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", queueGetID(pThis), iMyThrdIndx); if(pThis->bSignalOnEmpty) { - /* we need to signal our parent queue that we are empty */ + if(bInitialEmpty == 1) { + /* ignore */ + bInitialEmpty = 0; + } else { + /* we need to signal our parent queue that we are empty */ dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); - pthread_mutex_lock(pThis->mutSignalOnEmpty); - pthread_cond_signal(pThis->condSignalOnEmpty); - pthread_mutex_unlock(pThis->mutSignalOnEmpty); + pthread_mutex_lock(pThis->mutSignalOnEmpty); + pthread_cond_signal(pThis->condSignalOnEmpty); + pthread_mutex_unlock(pThis->mutSignalOnEmpty); +dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx); + /* we now need to re-check if we still shall continue to + * run. This is important because the parent may have changed our + * state. So we simply go back to the begin of the loop. + */ + continue; + } } + /* If we arrive here, we have the regular case, where we can safely assume that + * iQueueSize and tCmd have not changed since the while(). + */ +dbgprintf("Queue %p/w%d: pre condwait ->notEmpty\n", pThis, iMyThrdIndx); pthread_cond_wait(pThis->notEmpty, pThis->mut); +dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); } if(pThis->iQueueSize > 0) { /* dequeue element (still protected from mutex) */ @@ -969,8 +1135,9 @@ dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); * and would be very hard to debug. The yield() is a sure fix, its performance overhead * should be well accepted given the above facts. -- rgerhards, 2008-01-10 */ +dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n", + queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd); pthread_yield(); - if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN) && pThis->iQueueSize > 0) dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has " " %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); @@ -980,7 +1147,14 @@ dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); pThis->iCurNumWrkThrd--; - pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; + if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN || + pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN_IMMEDIATE) { + /* in shutdown case, we need to flag termination. All other commands + * have a meaning to the thread harvester, so we can not overwrite them + */ + pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; + } + pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd); @@ -1334,6 +1508,9 @@ queueEnqObj(queue_t *pThis, void *pUsr) pthread_mutex_lock(pThis->mut); } + /* process any pending thread requests */ + queueChkWrkThrdChanges(pThis); + /* first check if we can discard anything */ if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) { iRetLocal = objGetSeverity(pUsr, &iSeverity); @@ -1352,13 +1529,12 @@ queueEnqObj(queue_t *pThis, void *pUsr) /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) CHKiRet(queueChkStrtDA(pThis)); - + /* and finally (try to) enqueue what is left over */ while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis)); queueTimeoutComp(&t, pThis->toEnq); - if(pthread_cond_timedwait (pThis->notFull, - pThis->mut, &t) != 0) { + if(pthread_cond_timedwait (pThis->notFull, pThis->mut, &t) != 0) { dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", queueGetID(pThis)); objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); -- cgit