From 75a8f92d5001f555606b2ddb5de30acf689e2422 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 16 Jan 2008 16:40:11 +0000 Subject: implemented dynamic startup and shutdown of worker threads based on current activity --- queue.c | 324 +++++++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 207 insertions(+), 117 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 00f6e63f..acfdefb3 100644 --- a/queue.c +++ b/queue.c @@ -1,3 +1,4 @@ +// TODO: "preforked" worker threads // TODO: do an if(debug) in dbgrintf - performanc ein release build! // TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in // call consumer state. Facilitates retaining messages in queue until action could @@ -53,6 +54,7 @@ DEFobjStaticHelpers rsRetVal queueChkPersist(queue_t *pThis); static void *queueWorker(void *arg); static rsRetVal queueGetQueueSize(queue_t *pThis, int *piQueueSize); +static rsRetVal queueChkWrkThrdChanges(queue_t *pThis); /* methods */ @@ -67,7 +69,7 @@ queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) ISOBJ_TYPE_assert(pThis, queue); assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); - if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRDCMD_RUN) { + if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_INIT) { dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx); pThis->pWrkThrds[iIdx].tCurrCmd = tCmd; } else { @@ -104,12 +106,12 @@ queueJoinWrkThrd(queue_t *pThis, int iIdx) ISOBJ_TYPE_assert(pThis, queue); assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); - assert(pThis->pWrkThrds[iIdx].tCurrCmd != eWRKTHRDCMD_NEVER_RAN); + assert(pThis->pWrkThrds[iIdx].tCurrCmd != eWRKTHRD_STOPPED); dbgprintf("Queue 0x%lx: thread %d state %d, waiting for exit\n", queueGetID(pThis), iIdx, pThis->pWrkThrds[iIdx].tCurrCmd); pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL); - pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRDCMD_NEVER_RAN; /* back to virgin... */ + pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRD_STOPPED; /* back to virgin... */ dbgprintf("Queue 0x%lx: thread %d state %d, has exited\n", queueGetID(pThis), iIdx, pThis->pWrkThrds[iIdx].tCurrCmd); @@ -127,9 +129,9 @@ queueStrtWrkThrd(queue_t *pThis, int i) ISOBJ_TYPE_assert(pThis, queue); assert(i >= 0 && i <= pThis->iNumWorkerThreads); - assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRDCMD_RUN); + assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRD_RUN_INIT); - queueTellWrkThrd(pThis, i, eWRKTHRDCMD_RUN); + queueTellWrkThrd(pThis, i, eWRKTHRD_RUN_INIT); iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n", (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); @@ -138,37 +140,59 @@ queueStrtWrkThrd(queue_t *pThis, int i) } -/* send a command to all active worker threads. A start index can be - * given. Usually, this is 0 or 1. Thread 0 is reserved to disk-assisted - * mode and this start index take care of the special handling it needs to - * receive. -- rgerhards, 2008-01-16 +/* Starts a *new* worker thread. Function searches itself for a free index spot. It must only + * be called when we have less than max workers active. Pending wrkr thread requests MUST have + * been processed before calling this function. -- rgerhards, 2008-01-16 */ static inline rsRetVal -queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd) +queueStrtNewWrkThrd(queue_t *pThis) { DEFiRet; int i; + int iStartingUp; + int iState; ISOBJ_TYPE_assert(pThis, queue); - assert(iStartIdx == 0 || iStartIdx == 1); - /* tell the workers our request */ - for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i) - if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRDCMD_TERMINATED) - queueTellActWrkThrd(pThis, i, tCmd); + /* find free spot in thread table. If we find at least one worker that is in initializiation, + * we do NOT start a new one. Let's give the other one a chance, first. + */ + iStartingUp = -1; + for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) + if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_STOPPED) { + break; + } else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_INIT) { + iStartingUp = i; + break; + } +dbgprintf("after thrd search: i %d, iStartingUp %d\n", i, iStartingUp); + if(iStartingUp > -1) + ABORT_FINALIZE(RS_RET_ALREADY_STARTING); + + assert(i <= pThis->iNumWorkerThreads); /* now there must be a free spot, else something is really wrong! */ + + queueTellWrkThrd(pThis, i, eWRKTHRD_RUN_INIT); + iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); + dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n", + (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); + /* we try to give the starting worker a little boost. It won't help much as we still + * hold the queue's mutex, but at least it has a chance to start on a single-CPU system. + */ + pthread_yield(); + +finalize_it: return iRet; } -/* send a command to all worker threads. A start index can be +/* send a command to all active worker threads. A start index can be * given. Usually, this is 0 or 1. Thread 0 is reserved to disk-assisted * mode and this start index take care of the special handling it needs to - * receive. - * TODO: check if we run into trouble with inactive worker threads + * receive. -- rgerhards, 2008-01-16 */ static inline rsRetVal -queueTellWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd) +queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd) { DEFiRet; int i; @@ -178,25 +202,27 @@ queueTellWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd) /* tell the workers our request */ for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i) - if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRDCMD_TERMINATED) - queueTellWrkThrd(pThis, i, tCmd); + if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATED) + queueTellActWrkThrd(pThis, i, tCmd); return iRet; } -/* start all regular worker threads - * rgerhards, 2008-01-15 + +/* This once was used to start all regular worker threads. Now, we have + * dynamic grow of the worker thread pool, based on needs. This function is + * still preserved, but it now does not start all but only worker 1, which + * is always present. + * rgerhards, 2008-01-16 */ static inline rsRetVal queueStrtAllWrkThrds(queue_t *pThis) { DEFiRet; - int i; - /* fire up the worker threads */ - for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) { - queueStrtWrkThrd(pThis, i); - } + ISOBJ_TYPE_assert(pThis, queue); + assert(pThis->pWrkThrds[1].tCurrCmd < eWRKTHRD_RUN_INIT); + //iRet = queueStrtWrkThrd(pThis, 1); return iRet; } @@ -243,6 +269,41 @@ queueWakeupWrkThrds(queue_t *pThis, int bWithDAWrk) } +/* This function Checks if (another) worker threads needs to be started. It + * must be called while the caller holds a lock on the queue mutex. So it must not + * do anything that either reaquires the mutex or forces somebody else to aquire + * it (that would lead to a deadlock). + * rgerhards, 2008-01-16 + */ +static inline rsRetVal +queueChkAndStrtWrk(queue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + + /* process any pending thread requests */ + queueChkWrkThrdChanges(pThis); + + /* check if we need to start up another worker (only in regular mode) */ + if(pThis->qRunsDA == QRUNS_REGULAR) { + if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) { +dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d\n", + pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd); + /* check if we satisfy the min nbr of messages per worker to start a new one */ + if(pThis->iCurNumWrkThrd == 0 || + pThis->iQueueSize / pThis->iCurNumWrkThrd > pThis->iMinMsgsPerWrkr) { + dbgprintf("Queue 0x%lx: high activity - starting additional worker thread.\n", + queueGetID(pThis)); + queueStrtNewWrkThrd(pThis); + } + } + } + + return iRet; +} + + /* --------------- code for disk-assisted (DA) queue modes -------------------- */ @@ -280,7 +341,7 @@ queueTurnOffDAMode(queue_t *pThis) pthread_mutex_destroy(&pThis->mutDA); pthread_cond_destroy(&pThis->condDA); - queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */ + queueTellWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */ dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n", queueGetID(pThis), iRet); @@ -289,6 +350,7 @@ queueTurnOffDAMode(queue_t *pThis) /* check if we had any worker thread changes and, if so, act * on them. At a minimum, terminated threads are harvested (joined). + * This function MUST NEVER block on the queue mutex! */ static rsRetVal queueChkWrkThrdChanges(queue_t *pThis) @@ -304,14 +366,15 @@ queueChkWrkThrdChanges(queue_t *pThis) /* go through all threads (including DA thread) */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { switch(pThis->pWrkThrds[i].tCurrCmd) { - case eWRKTHRDCMD_TERMINATED: + case eWRKTHRD_TERMINATED: queueJoinWrkThrd(pThis, i); break; /* these cases just to satisfy the compiler, we do not act an them: */ - case eWRKTHRDCMD_NEVER_RAN: - case eWRKTHRDCMD_RUN: - case eWRKTHRDCMD_SHUTDOWN: - case eWRKTHRDCMD_SHUTDOWN_IMMEDIATE: + case eWRKTHRD_STOPPED: + case eWRKTHRD_RUN_INIT: + case eWRKTHRD_RUNNING: + case eWRKTHRD_SHUTDOWN: + case eWRKTHRD_SHUTDOWN_IMMEDIATE: /* DO NOTHING */ break; } @@ -373,7 +436,7 @@ dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, * Note that the child queue now in almost all cases is non-empty, because we just enqueued * a message. */ - if(iQueueSize <= pThis->iLowWtrMrk && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { + if(iQueueSize <= pThis->iLowWtrMrk && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) { dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n", queueGetID(pThis), iMyThrdIndx, iQueueSize); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); @@ -463,7 +526,7 @@ queueStrtDA(queue_t *pThis) * reserving worker thread 0 for DA queues. So if we would join the other * workers, we would screw up and do against our design goal. */ - CHKiRet(queueTellWrkThrds(pThis, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE)); + CHKiRet(queueTellActWrkThrds(pThis, 1, eWRKTHRD_SHUTDOWN_IMMEDIATE)); /* as we are right now starting DA mode because we are so busy, it is * extremely unlikely that any worker is sleeping on empty queue. HOWEVER, @@ -493,6 +556,24 @@ finalize_it: } +/* initiate DA mode + * rgerhards, 2008-01-16 + */ +static inline rsRetVal +queueInitDA(queue_t *pThis) +{ + DEFiRet; + + /* indicate we now run in DA mode - this is reset by the DA worker if it fails */ + pThis->qRunsDA = QRUNS_DA_INIT; + + /* now we must start our DA worker thread - it does the rest of the initialization */ + iRet = queueStrtWrkThrd(pThis, 0); + + return iRet; +} + + /* check if we need to start disk assisted mode and send some signals to * keep it running if we are already in it. * rgerhards, 2008-01-14 @@ -522,25 +603,18 @@ queueChkStrtDA(queue_t *pThis) pthread_mutex_unlock(&pThis->mutDA); pthread_setcancelstate(iCancelStateSave, NULL); queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */ - - /* we need to re-check if we run disk-assisted, because that status may have changed - * in our high water mark processing. - */ - if(pThis->qRunsDA != QRUNS_REGULAR) - FINALIZE; } - /* if we reach this point, we are NOT currently running in DA mode. - * TODO: split this function, I think that would make the code easier - * to read. -- rgerhards, 2008-10-15 + /* we need to re-check if we run disk-assisted, because that status may have changed + * in our high water mark processing. */ - dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n", - queueGetID(pThis), pThis->iQueueSize); - - pThis->qRunsDA = QRUNS_DA_INIT; /* indicate we now run in DA mode - this is reset by the DA worker if it fails */ + if(pThis->qRunsDA == QRUNS_REGULAR) { + /* if we reach this point, we are NOT currently running in DA mode. */ + dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n", + queueGetID(pThis), pThis->iQueueSize); - /* now we must start our DA worker thread - it does the rest of the initialization */ - CHKiRet(queueStrtWrkThrd(pThis, 0)); + queueInitDA(pThis); /* initiate DA mode */ + } finalize_it: return iRet; @@ -1082,7 +1156,7 @@ queueWrkThrdCancel(queue_t *pThis) /* first tell the workers our request */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) - if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRDCMD_TERMINATED) { + if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATED) { dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i); pthread_cancel(pThis->pWrkThrds[i].thrdID); } @@ -1108,7 +1182,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) * the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as * good. */ - iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, pThis->toQShutdown); + iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN, pThis->toQShutdown); if(iRet == RS_RET_TIMED_OUT) { if(pThis->toQShutdown == 0) { iRet = RS_RET_OK; @@ -1116,7 +1190,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* OK, we now need to try force the shutdown */ dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", queueGetID(pThis)); - iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); + iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); } } @@ -1133,7 +1207,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) * time set (timeout == 0)! -- rgerhards, 2008-01-14 */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_NEVER_RAN) { + if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRD_STOPPED) { queueJoinWrkThrd(pThis, i); } } @@ -1142,8 +1216,8 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) * terminated now. For simplicity, we simply overwrite the states. */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_NEVER_RAN) { - pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_TERMINATED; + if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRD_STOPPED) { + pThis->pWrkThrds[i].tCurrCmd = eWRKTHRD_TERMINATED; } } @@ -1161,7 +1235,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) * rgerhards, 2008-01-14 */ static inline rsRetVal -queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave) +queueWorkerChkAndCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave) { DEFiRet; rsRetVal iRetLocal; @@ -1172,7 +1246,10 @@ queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave) /* first check if we have still something to process */ - if(pThis->iQueueSize == 0) { + if(pThis->iQueueSize == 0 || + ( (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd != eWRKTHRD_RUNNING) + && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd != eWRKTHRD_SHUTDOWN) + )) { pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); FINALIZE; @@ -1242,6 +1319,7 @@ queueWorker(void *arg) { queue_t *pThis = (queue_t*) arg; sigset_t sigSet; + struct timespec t; int iMyThrdIndx; /* index for this thread in queue thread table */ int iCancelStateSave; @@ -1267,20 +1345,32 @@ queueWorker(void *arg) if(iMyThrdIndx == 0) { /* are we the DA worker? */ if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */ /* if we could not init the DA queue, we have nothing to do, so shut down. */ - queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE); + queueTellWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE); } } + /* finally change to RUNNING state. We need to check if we actually should still run, + * because someone may have requested us to shut down even before we got a chance to do + * our init. That would be a bad race... -- rgerhards, 2008-01-16 + */ + if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUN_INIT) + pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_RUNNING; /* we are running now! */ + pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); /* end one-time stuff */ /* now we have our identity, on to real processing */ - while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN - || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) { + while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING + || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN && pThis->iQueueSize > 0)) { pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); - while(pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { + + /* process any pending thread requests */ + queueChkWrkThrdChanges(pThis); + +dbgprintf("Queue %p/w%d: pre empty queue, qsize %d\n", pThis, iMyThrdIndx, pThis->iQueueSize); + while(pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) { dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", queueGetID(pThis), iMyThrdIndx); if(pThis->bSignalOnEmpty > 0) { @@ -1304,11 +1394,26 @@ dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx); /* If we arrive here, we have the regular case, where we can safely assume that * iQueueSize and tCmd have not changed since the while(). */ -dbgprintf("Queue %p/w%d: pre condwait ->notEmpty\n", pThis, iMyThrdIndx); - pthread_cond_wait(pThis->notEmpty, pThis->mut); +dbgprintf("Queue %p/w%d: pre condwait ->notEmpty, worker shutdown %d\n", pThis, iMyThrdIndx, pThis->toWrkShutdown); + if(pThis->toWrkShutdown == -1) { +dbgprintf("worker never times out!\n"); + /* never shut down any started worker */ + pthread_cond_wait(pThis->notEmpty, pThis->mut); + } else { + queueTimeoutComp(&t, pThis->toWrkShutdown);/* get absolute timeout */ + if(pthread_cond_timedwait (pThis->notEmpty, pThis->mut, &t) != 0) { + dbgprintf("Queue 0x%lx/w%d: inactivity timeout, worker terminating...\n", + queueGetID(pThis), iMyThrdIndx); + /* we use SHUTDOWN (and not SHUTDOWN_IMMEDIATE) so that the worker + * does not terminate if in the mean time a new message arrived. + */ + pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_SHUTDOWN; + } + } dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); } - queueWorkerCallConsumer(pThis, iMyThrdIndx, iCancelStateSave); + + queueWorkerChkAndCallConsumer(pThis, iMyThrdIndx, iCancelStateSave); /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is * a cancellation point in itself. As we run most of the time without cancel enabled, I fear @@ -1328,10 +1433,10 @@ dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); * and would be very hard to debug. The yield() is a sure fix, its performance overhead * should be well accepted given the above facts. -- rgerhards, 2008-01-10 */ + pthread_yield(); dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n", queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd); - pthread_yield(); - if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN) && pThis->iQueueSize > 0) + if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0) dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has " " %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); } @@ -1340,12 +1445,12 @@ dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n", pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); pThis->iCurNumWrkThrd--; - if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN || - pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN_IMMEDIATE) { + if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN || + pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN_IMMEDIATE) { /* in shutdown case, we need to flag termination. All other commands * have a meaning to the thread harvester, so we can not overwrite them */ - pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; + pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_TERMINATED; } pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ @@ -1440,7 +1545,6 @@ finalize_it: rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; - int i; rsRetVal iRetLocal; int bInitialized = 0; /* is queue already initialized? */ @@ -1452,61 +1556,42 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ dbgprintf("Queue 0x%lx: type %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis), pThis->qType, pThis->bIsDA, pThis->iMaxFileSize); - if(pThis->qType != QUEUETYPE_DIRECT) { - if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - - if(pThis->bIsDA) { - /* If we are disk-assisted, we need to check if there is a QIF file - * which we need to load. -- rgerhards, 2008-01-15 - */ - iRetLocal = queueHaveQIF(pThis); - if(iRetLocal == RS_RET_OK) { - dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n", - queueGetID(pThis)); + if(pThis->qType == QUEUETYPE_DIRECT) + FINALIZE; /* with direct queues, we are already finished... */ - /* indicate we now run in DA mode - this is reset by the DA worker if it fails */ - pThis->qRunsDA = QRUNS_DA_INIT; - - /* now we must start our DA worker thread - it does the rest of the initialization */ - CHKiRet(queueStrtWrkThrd(pThis, 0)); - bInitialized = 1; - } - } + if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - if(!bInitialized) { - dbgprintf("Queue 0x%lx: queue starts up without loading any disk state\n", queueGetID(pThis)); - /* worker 0 is reserved for disk-assisted mode, so do not start */ - queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_NEVER_RAN); + if(pThis->bIsDA) { + /* If we are disk-assisted, we need to check if there is a QIF file + * which we need to load. -- rgerhards, 2008-01-15 + */ + iRetLocal = queueHaveQIF(pThis); + if(iRetLocal == RS_RET_OK) { + dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n", + queueGetID(pThis)); - /* fire up the worker threads */ - for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) { - queueStrtWrkThrd(pThis, i); - } + queueInitDA(pThis); /* initiate DA mode */ + bInitialized = 1; /* we are done */ + } else { + // TODO: use logerror? -- rgerhards, 2008-01-16 + dbgprintf("Queue 0x%lx: error %d trying to access on-disk queue files, starting without them. " + "Some data may be lost\n", queueGetID(pThis), iRetLocal); } } -finalize_it: - return iRet; -} - - -#if 0 -/* persist disk status on disk. This is necessary if we run either - * a disk queue or in a disk assisted mode. - */ -static rsRetVal queuePersistDskFilInfo(queue_t *pThis) -{ - DEFiRet; - - assert(pThis != NULL); + if(!bInitialized) { + dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis)); + /* worker 0 is reserved for disk-assisted mode, so do not start */ + queueTellWrkThrd(pThis, 0, eWRKTHRD_STOPPED); + /* fire up the worker threads */ + queueStrtAllWrkThrds(pThis); + } finalize_it: return iRet; } -#endif - /* persist the queue to disk. If we have something to persist, we first @@ -1619,7 +1704,7 @@ rsRetVal queueDestruct(queue_t *pThis) * leave it for the time being. -- rgerhards, 2008-01-16 */ if(pThis->qRunsDA != QRUNS_REGULAR) - queueWrkThrdReqTrm(pThis->pqDA, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 0); + queueWrkThrdReqTrm(pThis->pqDA, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0); /* then, terminate our own worker threads */ if(pThis->pWrkThrds != NULL) { @@ -1760,6 +1845,9 @@ queueEnqObj(queue_t *pThis, void *pUsr) if(pThis->bIsDA) CHKiRet(queueChkStrtDA(pThis)); + /* re-process any new pending thread requests and see if we need to start workers */ + queueChkAndStrtWrk(pThis); + /* and finally (try to) enqueue what is left over */ while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis)); @@ -1790,12 +1878,14 @@ finalize_it: DEFpropSetMeth(queue, iPersistUpdCnt, int); DEFpropSetMeth(queue, toQShutdown, long); DEFpropSetMeth(queue, toActShutdown, long); +DEFpropSetMeth(queue, toWrkShutdown, long); DEFpropSetMeth(queue, toEnq, long); DEFpropSetMeth(queue, iHighWtrMrk, int); DEFpropSetMeth(queue, iLowWtrMrk, int); DEFpropSetMeth(queue, iDiscardMrk, int); DEFpropSetMeth(queue, iDiscardSeverity, int); DEFpropSetMeth(queue, bIsDA, int); +DEFpropSetMeth(queue, iMinMsgsPerWrkr, int); /* get the size of this queue. The important thing about this get method is that it -- cgit