diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-18 16:01:07 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-18 16:01:07 +0000 |
commit | 2bd1e283527bae01d61b85682a7e8ecc778997a8 (patch) | |
tree | f76419b016a63a5bd10a347f67598cebd2bb6b0f | |
parent | 1acd5c7a51432e80e0670df38667f4af445228c5 (diff) | |
download | rsyslog-2bd1e283527bae01d61b85682a7e8ecc778997a8.tar.gz rsyslog-2bd1e283527bae01d61b85682a7e8ecc778997a8.tar.xz rsyslog-2bd1e283527bae01d61b85682a7e8ecc778997a8.zip |
- created an in-depth description of DA assisted queue mode
- snapshot of new thread coding - DA mode still does not work, but need to
save
-rw-r--r-- | doc/Makefile.am | 4 | ||||
-rw-r--r-- | queue.c | 314 | ||||
-rw-r--r-- | queue.h | 1 |
3 files changed, 219 insertions, 100 deletions
diff --git a/doc/Makefile.am b/doc/Makefile.am index 74e1be69..bda32d6e 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -20,6 +20,10 @@ html_files = \ syslog-protocol.html \ version_naming.html \ contributors.html \ + dev_queue.html \ + queueWorkerLogic.dia \ + queueWorkerLogic.jpg \ + queueWorkerLogic_small.jpg \ rsconf1_actionexeconlyifpreviousissuspended.html \ rsconf1_actionresumeinterval.html \ rsconf1_allowedsender.html \ @@ -10,6 +10,10 @@ * * File begun on 2008-01-03 by RGerhards * + * There is some in-depth documentation available in doc/dev_queue.html + * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it + * if you are getting aquainted to the object. + * * Copyright 2008 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. @@ -80,6 +84,11 @@ qWrkrSetState(qWrkThrd_t *pThis, qWrkCmd_t tCmd) DEFiRet; assert(pThis != NULL); + +dbgprintf("Queue 0x%lx: trying to send command %d to thread %d\n", queueGetID(pThis->pQueue), tCmd, pThis->iThrd); + if(pThis->tCurrCmd == eWRKTHRD_SHUTDOWN_IMMEDIATE && tCmd != eWRKTHRD_TERMINATING) + FINALIZE; + dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis->pQueue), tCmd, pThis->iThrd); /* change some admin structures */ @@ -108,6 +117,7 @@ qWrkrSetState(qWrkThrd_t *pThis, qWrkCmd_t tCmd) pThis->tCurrCmd = tCmd; +finalize_it: return iRet; } @@ -155,7 +165,7 @@ qWrkrConstructFinalize(qWrkThrd_t *pThis, queue_t *pQueue, int i) } -/* Waitis until the specified worker thread +/* Waits until the specified worker thread * changed to full running state (aka have started up). This function * MUST NOT be called while the queue mutex is locked as it does * this itself. The wait is without timeout. @@ -183,6 +193,23 @@ dbgprintf("startup done!\n"); } +/* waits until all worker threads that a currently initializing are fully started up + * rgerhards, 2008-01-18 + */ +static rsRetVal +qWrkrWaitAllWrkrStartup(queue_t *pThis) +{ + DEFiRet; + int i; + + for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { + qWrkrWaitStartup(pThis->pWrkThrds + i); + } + + return iRet; +} + + /* initialize the qWrkThrd_t structure - this MUST be called right after * startup of a worker thread. -- rgerhards, 2008-01-17 */ @@ -388,8 +415,11 @@ queueChkAndStrtWrk(queue_t *pThis) /* process any pending thread requests */ queueChkWrkThrdChanges(pThis); - /* check if we need to start up another worker (only in regular mode) */ - if(pThis->qRunsDA == QRUNS_REGULAR && pThis->bEnqOnly == 0) { + if(pThis->bEnqOnly == 1) + FINALIZE; /* in enqueue-only mode we have no workers */ + + /* check if we need to start up another worker */ + if(pThis->qRunsDA == QRUNS_REGULAR) { if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) { dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qRunsDA: %d\n", pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA); @@ -401,8 +431,16 @@ dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qR queueStrtNewWrkThrd(pThis); } } + } else { + if(pThis->iCurNumWrkThrd == 0 && pThis->bEnqOnly == 0) { +dbgprintf("Queue %p: DA worker is no longer running, restarting, qsize %d, workers %d, qRunsDA: %d\n", + pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA); + /* DA worker has timed out and needs to be restarted */ + iRet = queueStrtWrkThrd(pThis, 0); + } } - + +finalize_it: return iRet; } @@ -438,7 +476,7 @@ queueTurnOffDAMode(queue_t *pThis) * messages come into the queue, we may be well off with a single worker. * rgerhards, 2008-01-16 */ - if(pThis->bEnqOnly == 0) + if(pThis->bEnqOnly == 0 && pThis->bQueueInDestruction == 0) queueStrtNewWrkThrd(pThis); pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */ @@ -520,64 +558,6 @@ queueChkIsDA(queue_t *pThis) } -/* This is a special consumer to feed the disk-queue in disk-assited mode. - * When active, our own queue more or less acts as a memory buffer to the disk. - * So this consumer just needs to drain the memory queue and submit entries - * to the disk queue. The disk queue will then call the actual consumer from - * the app point of view (we chain two queues here). - * This function must also handle the LowWaterMark situation, at which it is - * switched back to in-memory queueing. - * rgerhards, 2008-01-14 - */ -static inline rsRetVal -queueDAConsumer(queue_t *pThis, int iMyThrdIndx, int iQueueSize, void *pUsr) -{ - DEFiRet; - int iCancelStateSave; - int iSizeDAQueue; - - ISOBJ_TYPE_assert(pThis, queue); - ISOBJ_assert(pUsr); - assert(pThis->qRunsDA != QRUNS_REGULAR); - -dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, iQueueSize); - CHKiRet(queueEnqObj(pThis->pqDA, pUsr)); - - /* We check if we reached the low water mark (but only if we are not in shutdown mode) - * Note that the child queue now in almost all cases is non-empty, because we just enqueued - * a message. - */ - if(iQueueSize <= pThis->iLowWtrMrk && iQueueSize != 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) { - dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n", - queueGetID(pThis), iMyThrdIndx, iQueueSize); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); -dbgprintf("pre mutex lock (think about CLEANUP!)\n"); - pthread_mutex_lock(&pThis->mutDA); -dbgprintf("mutex locked (think about CLEANUP!)\n"); - /* wait for either passing the high water mark or the child disk queue drain */ - pthread_cond_wait(&pThis->condDA, &pThis->mutDA); -dbgprintf("condition returned\n"); - pthread_mutex_unlock(&pThis->mutDA); -dbgprintf("mutex unlocked (think about CLEANUP!)\n"); - pthread_setcancelstate(iCancelStateSave, NULL); - } - - /* now check if the DA queue is empty. If so, we can turn off DA mode. Note that we must - * use queueGetQueueSize() in order to avoid a race on child iQueueSize. -- rgerhards, 2008-01-16 - */ - CHKiRet(queueGetQueueSize(pThis->pqDA, &iSizeDAQueue)); - -dbgprintf("Queue %p/w%d: DA queue size now %d\n", pThis, iMyThrdIndx, iSizeDAQueue); - if(iSizeDAQueue == 0) { - CHKiRet(queueTurnOffDAMode(pThis)); /* this also unlocks the mutex! */ - } - -finalize_it: -dbgprintf("DAConsumer returns with iRet %d\n", iRet); - return iRet; -} - - /* Start disk-assisted queue mode. All internal settings are changed. This is supposed * to be called from the DA worker, which must have been started before. The most important * chore of this function is to create the DA queue object. If that function fails, @@ -714,11 +694,11 @@ queueChkStrtDA(queue_t *pThis) */ dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", queueGetID(pThis), pThis->iQueueSize); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - pthread_mutex_lock(&pThis->mutDA); + //pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + //pthread_mutex_lock(&pThis->mutDA); pthread_cond_signal(&pThis->condDA); - pthread_mutex_unlock(&pThis->mutDA); - pthread_setcancelstate(iCancelStateSave, NULL); + //pthread_mutex_unlock(&pThis->mutDA); + //pthread_setcancelstate(iCancelStateSave, NULL); queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */ } @@ -1323,6 +1303,63 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) } +/* cancellation cleanup handler - frees provided mutex + * rgerhards, 2008-01-14 + */ +static void queueMutexCleanup(void *arg) +{ + assert(arg != NULL); + pthread_mutex_unlock((pthread_mutex_t*) arg); +} + +/* This is a special consumer to feed the disk-queue in disk-assited mode. + * When active, our own queue more or less acts as a memory buffer to the disk. + * So this consumer just needs to drain the memory queue and submit entries + * to the disk queue. The disk queue will then call the actual consumer from + * the app point of view (we chain two queues here). + * rgerhards, 2008-01-14 + */ +static inline rsRetVal +queueDAConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst) +{ + DEFiRet; + int iCancelStateSave; + + ISOBJ_TYPE_assert(pThis, queue); + assert(pThis->qRunsDA != QRUNS_REGULAR); + ISOBJ_assert(pWrkrInst->pUsr); + +dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize);/* dirty iQueueSize! */ + CHKiRet(queueEnqObj(pThis->pqDA, pWrkrInst->pUsr)); + + /* We check if we reached the low water mark (but only if we are not in shutdown mode) + * Note that the child queue now in almost all cases is non-empty, because we just enqueued + * a message. Note that we need a quick check below to see if we are still in running state. + * If not, we do not go into the wait, because that's not a good thing to do. We do not + * do a full termination check, as this is done when we go back to the main worker loop. + * We need to re-aquire the queue mutex here, because we need to have a consistent + * access to the queue's admin data. + */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); +dbgprintf("pre mutex lock (think about CLEANUP!)\n"); + pthread_mutex_lock(pThis->mut); + pthread_cleanup_push(queueMutexCleanup, pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); +dbgprintf("mutex locked (think about CLEANUP!)\n"); + if(pThis->iQueueSize <= pThis->iLowWtrMrk && pWrkrInst->tCurrCmd == eWRKTHRD_RUNNING) { + dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n", + queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize); + /* wait for either passing the high water mark or the child disk queue drain */ + pthread_cond_wait(&pThis->condDA, pThis->mut); + } + pthread_cleanup_pop(1); /* release mutex in an atomic way via cleanup handler */ + +finalize_it: +dbgprintf("DAConsumer returns with iRet %d\n", iRet); + return iRet; +} + + /* This is a helper for queueWorker () it either calls the configured * consumer or the DA-consumer (if in disk-assisted mode). It is * protected by the queue mutex, but MUST release it as soon as possible. @@ -1345,25 +1382,17 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancel iMyThrdIndx = pWrkrInst->iThrd; - /* first check if we have still something to process */ - if(pThis->iQueueSize == 0 || - ( (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd != eWRKTHRD_RUNNING) - && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd != eWRKTHRD_SHUTDOWN) - )) { - pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - FINALIZE; - } - /* dequeue element (still protected from mutex) */ iRet = queueDel(pThis, &pUsr); queueChkPersist(pThis); // when we support peek(), we must do this down after the del! - qRunsDA = pThis->qRunsDA; /* do a local copy so that we prevent a race after mutex release */ - iQueueSize = pThis->iQueueSize; /* ... and the same for this property */ + iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */ pWrkrInst->pUsr = pUsr; /* save it for the cancel cleanup handler */ + qRunsDA = pThis->qRunsDA; pthread_mutex_unlock(pThis->mut); pthread_cond_signal(pThis->notFull); pthread_setcancelstate(iCancelStateSave, NULL); + /* WE ARE NO LONGER PROTECTED FROM THE MUTEX */ + /* do actual processing (the lengthy part, runs in parallel) * If we had a problem while dequeing, we do not call the consumer, * but we otherwise ignore it. This is in the hopes that it will be @@ -1373,11 +1402,14 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancel if(iRet != RS_RET_OK) FINALIZE; + /* call consumer depending on queue mode (in DA mode, we have just one thread, so it can not change) */ if(qRunsDA == QRUNS_DA) { - queueDAConsumer(pThis, iMyThrdIndx, iQueueSize, pUsr); + queueDAConsumer(pThis, pWrkrInst); } else { - /* we are running in normal, non-disk-assisted mode */ - /* do a quick check if we need to drain the queue */ + /* we are running in normal, non-disk-assisted mode + * do a quick check if we need to drain the queue. It is OK to use the cached + * iQueueSize here, because it does not hurt if it is slightly wrong. + */ if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { iRetLocal = objGetSeverity(pUsr, &iSeverity); if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { @@ -1435,6 +1467,37 @@ static void queueWorkerCancelCleanup(void *arg) } +/* This function is created to keep the code in queueWorker () short. Thus it + * also does not abide to the usual calling conventions used in rsyslog. It is more + * like a macro. Its sole purpose is to have a handy shortcut for the queue + * termination condition. For the same reason, the calling parameters are a bit + * more verbose than the need to be in theory. The reasoning is the Worker has + * everything handy and so we do not need to access it from memory (OK, the + * optimized would probably have created the same code, but why not do it + * optimal right away...). The function returns 0 if the worker should terminate + * and something else if it should continue to run. + * rgerhards, 2008-01-18 + */ +static inline int +queueWorkerRemainActive(queue_t *pThis, qWrkThrd_t *pWrkrInst) +{ + register int b; /* this is a boolean! */ + int iSizeDAQueue; + + /* first check the usual termination condition that applies to all workers */ + b = ( (qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING) + || ((qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && (pThis->iQueueSize > 0))); +dbgprintf("Queue %p/w%d: chk 1 pre empty queue, qsize %d, cont run: %d, cmd %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize, b, qWrkrGetState(pWrkrInst)); + if(b && pWrkrInst->iThrd == 0 && pThis->qRunsDA == QRUNS_DA) { + queueGetQueueSize(pThis->pqDA, &iSizeDAQueue); + b = pThis->iQueueSize >= pThis->iHighWtrMrk || iSizeDAQueue != 0; + } + +dbgprintf("Queue %p/w%d: pre empty queue, qsize %d, cont run: %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize, b); + return b; +} + + /* Each queue has at least one associated worker (consumer) thread. It will pull * the message from the queue and pass it to a user-defined function. * This function was provided on construction. It MUST be thread-safe. @@ -1452,6 +1515,7 @@ queueWorker(void *arg) int iMyThrdIndx; /* index for this thread in queue thread table */ int iCancelStateSave; qWrkThrd_t *pWrkrInst; /* for cleanup handler */ + int bContinueRun; ISOBJ_TYPE_assert(pThis, queue); @@ -1470,7 +1534,8 @@ queueWorker(void *arg) pThis->iCurNumWrkThrd++; /* tell the world there is one more worker */ dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx); - if(iMyThrdIndx == 0) { /* are we the DA worker? */ +dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA); + if((iMyThrdIndx == 0) && (pThis->qRunsDA != QRUNS_DA)) { /* are we the DA worker? */ if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */ /* if we could not init the DA queue, we have nothing to do, so shut down. */ queueTellActWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE); @@ -1491,30 +1556,36 @@ queueWorker(void *arg) /* end one-time stuff */ /* now we have our identity, on to real processing */ - while( (qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING) - || (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN && pThis->iQueueSize > 0)) { + bContinueRun = 1; /* we need this variable, because we need to check the actual termination condition + * while protected by mutex */ + while(bContinueRun) { +dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n", + queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); /* process any pending thread requests */ queueChkWrkThrdChanges(pThis); -dbgprintf("Queue %p/w%d: pre empty queue, qsize %d\n", pThis, iMyThrdIndx, pThis->iQueueSize); - while(pThis->iQueueSize == 0 && qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING) { + if((bContinueRun = queueWorkerRemainActive(pThis, pWrkrInst)) == 0) { + pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + continue; /* and break loop */ + } + + /* if we reach this point, we are still protected by the mutex */ + + if(pThis->iQueueSize == 0) { dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", queueGetID(pThis), iMyThrdIndx); + /* TODO: check if the parent DA worker is running and, if not, initiate it */ if(pThis->bSignalOnEmpty > 0) { /* we need to signal our parent queue that we are empty */ -dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); + dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); pthread_mutex_lock(pThis->mutSignalOnEmpty); pthread_cond_signal(pThis->condSignalOnEmpty); pthread_mutex_unlock(pThis->mutSignalOnEmpty); -dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx); - /* we now need to re-check if we still shall continue to - * run. This is important because the parent may have changed our - * state. So we simply go back to the begin of the loop. - */ - //continue; + dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx); } if(pThis->bSignalOnEmpty > 1) { /* no mutex associated with this condition, it's just a try (but needed @@ -1524,9 +1595,9 @@ dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx); /* If we arrive here, we have the regular case, where we can safely assume that * iQueueSize and tCmd have not changed since the while(). */ -dbgprintf("Queue %p/w%d: pre condwait ->notEmpty, worker shutdown %d\n", pThis, iMyThrdIndx, pThis->toWrkShutdown); + dbgprintf("Queue %p/w%d: pre condwait ->notEmpty, worker shutdown %d\n", pThis, iMyThrdIndx, pThis->toWrkShutdown); if(pThis->toWrkShutdown == -1) { -dbgprintf("worker never times out!\n"); + dbgprintf("worker never times out!\n"); /* never shut down any started worker */ pthread_cond_wait(pThis->notEmpty, pThis->mut); } else { @@ -1540,9 +1611,15 @@ dbgprintf("worker never times out!\n"); qWrkrSetState(pWrkrInst, eWRKTHRD_SHUTDOWN); } } -dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); + dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); + pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + pthread_testcancel(); /* see big comment below */ + pthread_yield(); /* see big comment below */ + continue; /* request next iteration */ } + /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */ queueWorkerChkAndCallConsumer(pThis, pWrkrInst, iCancelStateSave); /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is @@ -1564,8 +1641,6 @@ dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); * should be well accepted given the above facts. -- rgerhards, 2008-01-10 */ pthread_yield(); -dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n", - queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd); if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0) dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has " " %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); @@ -1574,6 +1649,11 @@ dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n", /* indicate termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); + /* check if we are the DA worker and, if so, switch back to regular mode */ + if(pWrkrInst->iThrd == 0) { + queueTurnOffDAMode(pThis); + } + pThis->iCurNumWrkThrd--; pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ pthread_cleanup_pop(0); /* remove cleanup handler */ @@ -1840,6 +1920,38 @@ rsRetVal queueDestruct(queue_t **ppThis) ISOBJ_TYPE_assert(pThis, queue); pThis->bSaveOnShutdown = 1; // TODO: Test remove + + pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ + + /* optimize parameters for shutdown of DA-enabled queues */ + if(pThis->bIsDA) { +dbgprintf("IsDA queue, modifying params for draining\n"); + pThis->iHighWtrMrk = 1; /* make sure we drain */ + pThis->iLowWtrMrk = 0; /* disable low water mark algo */ + if(pThis->qRunsDA == QRUNS_REGULAR) { + if(pThis->iQueueSize > 0) { + queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* initiate DA mode */ + } + } else { + queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */ + } + if(pThis->bSaveOnShutdown) { +dbgprintf("bSaveOnShutdown set, eternal timeout set\n"); + pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; + } + /* now we need to activate workers (read doc/dev_queue.html) */ + } + + // TODO: we may need to startup a regular worker if not in DA mode! + /* wait until all pending workers are started up */ + qWrkrWaitAllWrkrStartup(pThis); + + /* terminate our own worker threads */ + if(pThis->pWrkThrds != NULL) { + queueShutdownWorkers(pThis); + } + +#if 0 /* if running DA, switch the DA queue to enqueue-only mode. That saves us some CPU cycles as * its workers do no longer need to run. It also prevents longer-running actions to spring into * existence while we are draining the main (memory) queue. -- rgerhads, 2008-01-16 @@ -1865,13 +1977,15 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove dbgprintf("Queue 0x%lx: in-memory queue contains %d entries after worker shutdown - using DA to save to disk\n", queueGetID(pThis), pThis->iQueueSize); pThis->iLowWtrMrk = 0; /* disable low water mark algo */ + pThis->iHighWtrMrk = 1; /* make sure we drain */ queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* start DA queue in enqueue-only mode */ qWrkrWaitStartup(QUEUE_PTR_DA_WORKER(pThis)); /* wait until DA worker has actually started */ pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; queueShutdownWorkers(pThis); /* and tell it to shut down. The trick is it will run until q is drained */ } +#endif - /* if running DA, terminate disk queue */ + /* if still running DA, terminate disk queue */ if(pThis->qRunsDA != QRUNS_REGULAR) queueDestruct(&pThis->pqDA); @@ -88,6 +88,7 @@ typedef struct queue_s { int bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */ int bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */ int bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */ + int bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */ int iQueueSize; /* Current number of elements in the queue */ int iMaxQueueSize; /* how large can the queue grow? */ int iNumWorkerThreads;/* number of worker threads to use */ |