summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-18 16:01:07 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-18 16:01:07 +0000
commit2bd1e283527bae01d61b85682a7e8ecc778997a8 (patch)
treef76419b016a63a5bd10a347f67598cebd2bb6b0f
parent1acd5c7a51432e80e0670df38667f4af445228c5 (diff)
downloadrsyslog-2bd1e283527bae01d61b85682a7e8ecc778997a8.tar.gz
rsyslog-2bd1e283527bae01d61b85682a7e8ecc778997a8.tar.xz
rsyslog-2bd1e283527bae01d61b85682a7e8ecc778997a8.zip
- created an in-depth description of DA assisted queue mode
- snapshot of new thread coding - DA mode still does not work, but need to save
-rw-r--r--doc/Makefile.am4
-rw-r--r--queue.c314
-rw-r--r--queue.h1
3 files changed, 219 insertions, 100 deletions
diff --git a/doc/Makefile.am b/doc/Makefile.am
index 74e1be69..bda32d6e 100644
--- a/doc/Makefile.am
+++ b/doc/Makefile.am
@@ -20,6 +20,10 @@ html_files = \
syslog-protocol.html \
version_naming.html \
contributors.html \
+ dev_queue.html \
+ queueWorkerLogic.dia \
+ queueWorkerLogic.jpg \
+ queueWorkerLogic_small.jpg \
rsconf1_actionexeconlyifpreviousissuspended.html \
rsconf1_actionresumeinterval.html \
rsconf1_allowedsender.html \
diff --git a/queue.c b/queue.c
index 83d65a4f..6ac69c11 100644
--- a/queue.c
+++ b/queue.c
@@ -10,6 +10,10 @@
*
* File begun on 2008-01-03 by RGerhards
*
+ * There is some in-depth documentation available in doc/dev_queue.html
+ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
+ * if you are getting aquainted to the object.
+ *
* Copyright 2008 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
@@ -80,6 +84,11 @@ qWrkrSetState(qWrkThrd_t *pThis, qWrkCmd_t tCmd)
DEFiRet;
assert(pThis != NULL);
+
+dbgprintf("Queue 0x%lx: trying to send command %d to thread %d\n", queueGetID(pThis->pQueue), tCmd, pThis->iThrd);
+ if(pThis->tCurrCmd == eWRKTHRD_SHUTDOWN_IMMEDIATE && tCmd != eWRKTHRD_TERMINATING)
+ FINALIZE;
+
dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis->pQueue), tCmd, pThis->iThrd);
/* change some admin structures */
@@ -108,6 +117,7 @@ qWrkrSetState(qWrkThrd_t *pThis, qWrkCmd_t tCmd)
pThis->tCurrCmd = tCmd;
+finalize_it:
return iRet;
}
@@ -155,7 +165,7 @@ qWrkrConstructFinalize(qWrkThrd_t *pThis, queue_t *pQueue, int i)
}
-/* Waitis until the specified worker thread
+/* Waits until the specified worker thread
* changed to full running state (aka have started up). This function
* MUST NOT be called while the queue mutex is locked as it does
* this itself. The wait is without timeout.
@@ -183,6 +193,23 @@ dbgprintf("startup done!\n");
}
+/* waits until all worker threads that a currently initializing are fully started up
+ * rgerhards, 2008-01-18
+ */
+static rsRetVal
+qWrkrWaitAllWrkrStartup(queue_t *pThis)
+{
+ DEFiRet;
+ int i;
+
+ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
+ qWrkrWaitStartup(pThis->pWrkThrds + i);
+ }
+
+ return iRet;
+}
+
+
/* initialize the qWrkThrd_t structure - this MUST be called right after
* startup of a worker thread. -- rgerhards, 2008-01-17
*/
@@ -388,8 +415,11 @@ queueChkAndStrtWrk(queue_t *pThis)
/* process any pending thread requests */
queueChkWrkThrdChanges(pThis);
- /* check if we need to start up another worker (only in regular mode) */
- if(pThis->qRunsDA == QRUNS_REGULAR && pThis->bEnqOnly == 0) {
+ if(pThis->bEnqOnly == 1)
+ FINALIZE; /* in enqueue-only mode we have no workers */
+
+ /* check if we need to start up another worker */
+ if(pThis->qRunsDA == QRUNS_REGULAR) {
if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) {
dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qRunsDA: %d\n",
pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA);
@@ -401,8 +431,16 @@ dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qR
queueStrtNewWrkThrd(pThis);
}
}
+ } else {
+ if(pThis->iCurNumWrkThrd == 0 && pThis->bEnqOnly == 0) {
+dbgprintf("Queue %p: DA worker is no longer running, restarting, qsize %d, workers %d, qRunsDA: %d\n",
+ pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA);
+ /* DA worker has timed out and needs to be restarted */
+ iRet = queueStrtWrkThrd(pThis, 0);
+ }
}
-
+
+finalize_it:
return iRet;
}
@@ -438,7 +476,7 @@ queueTurnOffDAMode(queue_t *pThis)
* messages come into the queue, we may be well off with a single worker.
* rgerhards, 2008-01-16
*/
- if(pThis->bEnqOnly == 0)
+ if(pThis->bEnqOnly == 0 && pThis->bQueueInDestruction == 0)
queueStrtNewWrkThrd(pThis);
pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */
@@ -520,64 +558,6 @@ queueChkIsDA(queue_t *pThis)
}
-/* This is a special consumer to feed the disk-queue in disk-assited mode.
- * When active, our own queue more or less acts as a memory buffer to the disk.
- * So this consumer just needs to drain the memory queue and submit entries
- * to the disk queue. The disk queue will then call the actual consumer from
- * the app point of view (we chain two queues here).
- * This function must also handle the LowWaterMark situation, at which it is
- * switched back to in-memory queueing.
- * rgerhards, 2008-01-14
- */
-static inline rsRetVal
-queueDAConsumer(queue_t *pThis, int iMyThrdIndx, int iQueueSize, void *pUsr)
-{
- DEFiRet;
- int iCancelStateSave;
- int iSizeDAQueue;
-
- ISOBJ_TYPE_assert(pThis, queue);
- ISOBJ_assert(pUsr);
- assert(pThis->qRunsDA != QRUNS_REGULAR);
-
-dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, iQueueSize);
- CHKiRet(queueEnqObj(pThis->pqDA, pUsr));
-
- /* We check if we reached the low water mark (but only if we are not in shutdown mode)
- * Note that the child queue now in almost all cases is non-empty, because we just enqueued
- * a message.
- */
- if(iQueueSize <= pThis->iLowWtrMrk && iQueueSize != 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) {
- dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n",
- queueGetID(pThis), iMyThrdIndx, iQueueSize);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
-dbgprintf("pre mutex lock (think about CLEANUP!)\n");
- pthread_mutex_lock(&pThis->mutDA);
-dbgprintf("mutex locked (think about CLEANUP!)\n");
- /* wait for either passing the high water mark or the child disk queue drain */
- pthread_cond_wait(&pThis->condDA, &pThis->mutDA);
-dbgprintf("condition returned\n");
- pthread_mutex_unlock(&pThis->mutDA);
-dbgprintf("mutex unlocked (think about CLEANUP!)\n");
- pthread_setcancelstate(iCancelStateSave, NULL);
- }
-
- /* now check if the DA queue is empty. If so, we can turn off DA mode. Note that we must
- * use queueGetQueueSize() in order to avoid a race on child iQueueSize. -- rgerhards, 2008-01-16
- */
- CHKiRet(queueGetQueueSize(pThis->pqDA, &iSizeDAQueue));
-
-dbgprintf("Queue %p/w%d: DA queue size now %d\n", pThis, iMyThrdIndx, iSizeDAQueue);
- if(iSizeDAQueue == 0) {
- CHKiRet(queueTurnOffDAMode(pThis)); /* this also unlocks the mutex! */
- }
-
-finalize_it:
-dbgprintf("DAConsumer returns with iRet %d\n", iRet);
- return iRet;
-}
-
-
/* Start disk-assisted queue mode. All internal settings are changed. This is supposed
* to be called from the DA worker, which must have been started before. The most important
* chore of this function is to create the DA queue object. If that function fails,
@@ -714,11 +694,11 @@ queueChkStrtDA(queue_t *pThis)
*/
dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n",
queueGetID(pThis), pThis->iQueueSize);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- pthread_mutex_lock(&pThis->mutDA);
+ //pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ //pthread_mutex_lock(&pThis->mutDA);
pthread_cond_signal(&pThis->condDA);
- pthread_mutex_unlock(&pThis->mutDA);
- pthread_setcancelstate(iCancelStateSave, NULL);
+ //pthread_mutex_unlock(&pThis->mutDA);
+ //pthread_setcancelstate(iCancelStateSave, NULL);
queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */
}
@@ -1323,6 +1303,63 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
}
+/* cancellation cleanup handler - frees provided mutex
+ * rgerhards, 2008-01-14
+ */
+static void queueMutexCleanup(void *arg)
+{
+ assert(arg != NULL);
+ pthread_mutex_unlock((pthread_mutex_t*) arg);
+}
+
+/* This is a special consumer to feed the disk-queue in disk-assited mode.
+ * When active, our own queue more or less acts as a memory buffer to the disk.
+ * So this consumer just needs to drain the memory queue and submit entries
+ * to the disk queue. The disk queue will then call the actual consumer from
+ * the app point of view (we chain two queues here).
+ * rgerhards, 2008-01-14
+ */
+static inline rsRetVal
+queueDAConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ assert(pThis->qRunsDA != QRUNS_REGULAR);
+ ISOBJ_assert(pWrkrInst->pUsr);
+
+dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize);/* dirty iQueueSize! */
+ CHKiRet(queueEnqObj(pThis->pqDA, pWrkrInst->pUsr));
+
+ /* We check if we reached the low water mark (but only if we are not in shutdown mode)
+ * Note that the child queue now in almost all cases is non-empty, because we just enqueued
+ * a message. Note that we need a quick check below to see if we are still in running state.
+ * If not, we do not go into the wait, because that's not a good thing to do. We do not
+ * do a full termination check, as this is done when we go back to the main worker loop.
+ * We need to re-aquire the queue mutex here, because we need to have a consistent
+ * access to the queue's admin data.
+ */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+dbgprintf("pre mutex lock (think about CLEANUP!)\n");
+ pthread_mutex_lock(pThis->mut);
+ pthread_cleanup_push(queueMutexCleanup, pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+dbgprintf("mutex locked (think about CLEANUP!)\n");
+ if(pThis->iQueueSize <= pThis->iLowWtrMrk && pWrkrInst->tCurrCmd == eWRKTHRD_RUNNING) {
+ dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n",
+ queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize);
+ /* wait for either passing the high water mark or the child disk queue drain */
+ pthread_cond_wait(&pThis->condDA, pThis->mut);
+ }
+ pthread_cleanup_pop(1); /* release mutex in an atomic way via cleanup handler */
+
+finalize_it:
+dbgprintf("DAConsumer returns with iRet %d\n", iRet);
+ return iRet;
+}
+
+
/* This is a helper for queueWorker () it either calls the configured
* consumer or the DA-consumer (if in disk-assisted mode). It is
* protected by the queue mutex, but MUST release it as soon as possible.
@@ -1345,25 +1382,17 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancel
iMyThrdIndx = pWrkrInst->iThrd;
- /* first check if we have still something to process */
- if(pThis->iQueueSize == 0 ||
- ( (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd != eWRKTHRD_RUNNING)
- && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd != eWRKTHRD_SHUTDOWN)
- )) {
- pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
- FINALIZE;
- }
-
/* dequeue element (still protected from mutex) */
iRet = queueDel(pThis, &pUsr);
queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
- qRunsDA = pThis->qRunsDA; /* do a local copy so that we prevent a race after mutex release */
- iQueueSize = pThis->iQueueSize; /* ... and the same for this property */
+ iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */
pWrkrInst->pUsr = pUsr; /* save it for the cancel cleanup handler */
+ qRunsDA = pThis->qRunsDA;
pthread_mutex_unlock(pThis->mut);
pthread_cond_signal(pThis->notFull);
pthread_setcancelstate(iCancelStateSave, NULL);
+ /* WE ARE NO LONGER PROTECTED FROM THE MUTEX */
+
/* do actual processing (the lengthy part, runs in parallel)
* If we had a problem while dequeing, we do not call the consumer,
* but we otherwise ignore it. This is in the hopes that it will be
@@ -1373,11 +1402,14 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancel
if(iRet != RS_RET_OK)
FINALIZE;
+ /* call consumer depending on queue mode (in DA mode, we have just one thread, so it can not change) */
if(qRunsDA == QRUNS_DA) {
- queueDAConsumer(pThis, iMyThrdIndx, iQueueSize, pUsr);
+ queueDAConsumer(pThis, pWrkrInst);
} else {
- /* we are running in normal, non-disk-assisted mode */
- /* do a quick check if we need to drain the queue */
+ /* we are running in normal, non-disk-assisted mode
+ * do a quick check if we need to drain the queue. It is OK to use the cached
+ * iQueueSize here, because it does not hurt if it is slightly wrong.
+ */
if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) {
iRetLocal = objGetSeverity(pUsr, &iSeverity);
if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
@@ -1435,6 +1467,37 @@ static void queueWorkerCancelCleanup(void *arg)
}
+/* This function is created to keep the code in queueWorker () short. Thus it
+ * also does not abide to the usual calling conventions used in rsyslog. It is more
+ * like a macro. Its sole purpose is to have a handy shortcut for the queue
+ * termination condition. For the same reason, the calling parameters are a bit
+ * more verbose than the need to be in theory. The reasoning is the Worker has
+ * everything handy and so we do not need to access it from memory (OK, the
+ * optimized would probably have created the same code, but why not do it
+ * optimal right away...). The function returns 0 if the worker should terminate
+ * and something else if it should continue to run.
+ * rgerhards, 2008-01-18
+ */
+static inline int
+queueWorkerRemainActive(queue_t *pThis, qWrkThrd_t *pWrkrInst)
+{
+ register int b; /* this is a boolean! */
+ int iSizeDAQueue;
+
+ /* first check the usual termination condition that applies to all workers */
+ b = ( (qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING)
+ || ((qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && (pThis->iQueueSize > 0)));
+dbgprintf("Queue %p/w%d: chk 1 pre empty queue, qsize %d, cont run: %d, cmd %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize, b, qWrkrGetState(pWrkrInst));
+ if(b && pWrkrInst->iThrd == 0 && pThis->qRunsDA == QRUNS_DA) {
+ queueGetQueueSize(pThis->pqDA, &iSizeDAQueue);
+ b = pThis->iQueueSize >= pThis->iHighWtrMrk || iSizeDAQueue != 0;
+ }
+
+dbgprintf("Queue %p/w%d: pre empty queue, qsize %d, cont run: %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize, b);
+ return b;
+}
+
+
/* Each queue has at least one associated worker (consumer) thread. It will pull
* the message from the queue and pass it to a user-defined function.
* This function was provided on construction. It MUST be thread-safe.
@@ -1452,6 +1515,7 @@ queueWorker(void *arg)
int iMyThrdIndx; /* index for this thread in queue thread table */
int iCancelStateSave;
qWrkThrd_t *pWrkrInst; /* for cleanup handler */
+ int bContinueRun;
ISOBJ_TYPE_assert(pThis, queue);
@@ -1470,7 +1534,8 @@ queueWorker(void *arg)
pThis->iCurNumWrkThrd++; /* tell the world there is one more worker */
dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx);
- if(iMyThrdIndx == 0) { /* are we the DA worker? */
+dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA);
+ if((iMyThrdIndx == 0) && (pThis->qRunsDA != QRUNS_DA)) { /* are we the DA worker? */
if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */
/* if we could not init the DA queue, we have nothing to do, so shut down. */
queueTellActWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE);
@@ -1491,30 +1556,36 @@ queueWorker(void *arg)
/* end one-time stuff */
/* now we have our identity, on to real processing */
- while( (qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING)
- || (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN && pThis->iQueueSize > 0)) {
+ bContinueRun = 1; /* we need this variable, because we need to check the actual termination condition
+ * while protected by mutex */
+ while(bContinueRun) {
+dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n",
+ queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
/* process any pending thread requests */
queueChkWrkThrdChanges(pThis);
-dbgprintf("Queue %p/w%d: pre empty queue, qsize %d\n", pThis, iMyThrdIndx, pThis->iQueueSize);
- while(pThis->iQueueSize == 0 && qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING) {
+ if((bContinueRun = queueWorkerRemainActive(pThis, pWrkrInst)) == 0) {
+ pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ continue; /* and break loop */
+ }
+
+ /* if we reach this point, we are still protected by the mutex */
+
+ if(pThis->iQueueSize == 0) {
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
queueGetID(pThis), iMyThrdIndx);
+ /* TODO: check if the parent DA worker is running and, if not, initiate it */
if(pThis->bSignalOnEmpty > 0) {
/* we need to signal our parent queue that we are empty */
-dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx);
+ dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx);
pthread_mutex_lock(pThis->mutSignalOnEmpty);
pthread_cond_signal(pThis->condSignalOnEmpty);
pthread_mutex_unlock(pThis->mutSignalOnEmpty);
-dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx);
- /* we now need to re-check if we still shall continue to
- * run. This is important because the parent may have changed our
- * state. So we simply go back to the begin of the loop.
- */
- //continue;
+ dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx);
}
if(pThis->bSignalOnEmpty > 1) {
/* no mutex associated with this condition, it's just a try (but needed
@@ -1524,9 +1595,9 @@ dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx);
/* If we arrive here, we have the regular case, where we can safely assume that
* iQueueSize and tCmd have not changed since the while().
*/
-dbgprintf("Queue %p/w%d: pre condwait ->notEmpty, worker shutdown %d\n", pThis, iMyThrdIndx, pThis->toWrkShutdown);
+ dbgprintf("Queue %p/w%d: pre condwait ->notEmpty, worker shutdown %d\n", pThis, iMyThrdIndx, pThis->toWrkShutdown);
if(pThis->toWrkShutdown == -1) {
-dbgprintf("worker never times out!\n");
+ dbgprintf("worker never times out!\n");
/* never shut down any started worker */
pthread_cond_wait(pThis->notEmpty, pThis->mut);
} else {
@@ -1540,9 +1611,15 @@ dbgprintf("worker never times out!\n");
qWrkrSetState(pWrkrInst, eWRKTHRD_SHUTDOWN);
}
}
-dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx);
+ dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx);
+ pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ pthread_testcancel(); /* see big comment below */
+ pthread_yield(); /* see big comment below */
+ continue; /* request next iteration */
}
+ /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */
queueWorkerChkAndCallConsumer(pThis, pWrkrInst, iCancelStateSave);
/* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
@@ -1564,8 +1641,6 @@ dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx);
* should be well accepted given the above facts. -- rgerhards, 2008-01-10
*/
pthread_yield();
-dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n",
- queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd);
if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0)
dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has "
" %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
@@ -1574,6 +1649,11 @@ dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n",
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
+ /* check if we are the DA worker and, if so, switch back to regular mode */
+ if(pWrkrInst->iThrd == 0) {
+ queueTurnOffDAMode(pThis);
+ }
+
pThis->iCurNumWrkThrd--;
pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
pthread_cleanup_pop(0); /* remove cleanup handler */
@@ -1840,6 +1920,38 @@ rsRetVal queueDestruct(queue_t **ppThis)
ISOBJ_TYPE_assert(pThis, queue);
pThis->bSaveOnShutdown = 1; // TODO: Test remove
+
+ pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
+
+ /* optimize parameters for shutdown of DA-enabled queues */
+ if(pThis->bIsDA) {
+dbgprintf("IsDA queue, modifying params for draining\n");
+ pThis->iHighWtrMrk = 1; /* make sure we drain */
+ pThis->iLowWtrMrk = 0; /* disable low water mark algo */
+ if(pThis->qRunsDA == QRUNS_REGULAR) {
+ if(pThis->iQueueSize > 0) {
+ queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* initiate DA mode */
+ }
+ } else {
+ queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */
+ }
+ if(pThis->bSaveOnShutdown) {
+dbgprintf("bSaveOnShutdown set, eternal timeout set\n");
+ pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL;
+ }
+ /* now we need to activate workers (read doc/dev_queue.html) */
+ }
+
+ // TODO: we may need to startup a regular worker if not in DA mode!
+ /* wait until all pending workers are started up */
+ qWrkrWaitAllWrkrStartup(pThis);
+
+ /* terminate our own worker threads */
+ if(pThis->pWrkThrds != NULL) {
+ queueShutdownWorkers(pThis);
+ }
+
+#if 0
/* if running DA, switch the DA queue to enqueue-only mode. That saves us some CPU cycles as
* its workers do no longer need to run. It also prevents longer-running actions to spring into
* existence while we are draining the main (memory) queue. -- rgerhads, 2008-01-16
@@ -1865,13 +1977,15 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
dbgprintf("Queue 0x%lx: in-memory queue contains %d entries after worker shutdown - using DA to save to disk\n",
queueGetID(pThis), pThis->iQueueSize);
pThis->iLowWtrMrk = 0; /* disable low water mark algo */
+ pThis->iHighWtrMrk = 1; /* make sure we drain */
queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* start DA queue in enqueue-only mode */
qWrkrWaitStartup(QUEUE_PTR_DA_WORKER(pThis)); /* wait until DA worker has actually started */
pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL;
queueShutdownWorkers(pThis); /* and tell it to shut down. The trick is it will run until q is drained */
}
+#endif
- /* if running DA, terminate disk queue */
+ /* if still running DA, terminate disk queue */
if(pThis->qRunsDA != QRUNS_REGULAR)
queueDestruct(&pThis->pqDA);
diff --git a/queue.h b/queue.h
index c7e2db21..20e08147 100644
--- a/queue.h
+++ b/queue.h
@@ -88,6 +88,7 @@ typedef struct queue_s {
int bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
int bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
int bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
+ int bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */
int iQueueSize; /* Current number of elements in the queue */
int iMaxQueueSize; /* how large can the queue grow? */
int iNumWorkerThreads;/* number of worker threads to use */