summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-11 09:53:53 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-11 09:53:53 +0000
commitc9430404dbf38d5c7fdbbb8aebc78fce38c0906c (patch)
tree5c65306b8a4ec500e7c6ec21f110274d371c98ad /queue.c
parent68efb41220a834870681f293481655ed47e7b197 (diff)
downloadrsyslog-c9430404dbf38d5c7fdbbb8aebc78fce38c0906c.tar.gz
rsyslog-c9430404dbf38d5c7fdbbb8aebc78fce38c0906c.tar.xz
rsyslog-c9430404dbf38d5c7fdbbb8aebc78fce38c0906c.zip
- begun to permit queue to terminate without being drained
- fixed a starvation condition in queueWorker (pthread_yield() was needed) could not be seen with any previously released code, came up during new development
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c93
1 files changed, 47 insertions, 46 deletions
diff --git a/queue.c b/queue.c
index c7602b26..bd8f9223 100644
--- a/queue.c
+++ b/queue.c
@@ -336,39 +336,39 @@ queueDel(queue_t *pThis, void *pUsr)
}
-
-/* Worker thread management function carried out each time
- * the main worker is awoken.
- */
-static rsRetVal queueManageWorkers(queue_t *pThis)
-{
- DEFiRet;
-
- return iRet;
-}
-
-
/* Worker thread management function carried out when the main
* worker is about to terminate.
*/
-static rsRetVal queueManageWorkersOnShutdown(queue_t *pThis)
+static rsRetVal queueShutdownWorkers(queue_t *pThis)
{
DEFiRet;
int i;
+ qWrkCmd_t tShutdownCmd;
- /* ask all other workers to terminate */
- for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i)
- pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_SHUTDOWN;
+pThis->bImmediateShutdown = 1; /*testing */
+ assert(pThis != NULL);
- /* awake those that sleep */
+ /* select shutdown mode */
+ tShutdownCmd = (pThis->bImmediateShutdown) ? eWRKTHRDCMD_SHUTDOWN_IMMEDIATE : eWRKTHRDCMD_SHUTDOWN;
+
+ dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence, mode %d...\n",
+ (unsigned long) pThis, (int) tShutdownCmd);
+
+ /* tell all workers to terminate */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
+ pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd;
+
+ /* awake them... */
pthread_cond_broadcast(pThis->notEmpty);
/* and wait for their termination */
- for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i) {
-dbgprintf("WrkShutdown joining thread %d\n", i);
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
pthread_join(pThis->pWrkThrds[i].thrdID, NULL);
}
+ dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
+ (unsigned long) pThis, pThis->iQueueSize);
+
return iRet;
}
@@ -398,7 +398,6 @@ queueWorker(void *arg)
void *pUsr;
sigset_t sigSet;
int iMyThrdIndx; /* index for this thread in queue thread table */
- int bIsAdmin; /* does this thread have admin chores? */
assert(pThis != NULL);
@@ -411,14 +410,12 @@ queueWorker(void *arg)
break;
assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self());
- bIsAdmin = (iMyThrdIndx == 0 && pThis->iNumWorkerThreads > 1) ? 1 : 0;
- dbgprintf("Queue 0x%lx/w%d: worker thread startup (isAdmin=%d).\n", (unsigned long) pThis, iMyThrdIndx, bIsAdmin);
+ dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", (unsigned long) pThis, iMyThrdIndx);
/* now we have our identity, on to real processing */
- while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN || !pThis->iQueueSize == 0) {
- if(bIsAdmin) /* main worker must do special chores */
- queueManageWorkers(pThis);
-
+ while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN
+ || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) {
+dbgprintf("worker %d runs, cmd %d\n", iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd);
pthread_mutex_lock(pThis->mut);
while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
@@ -441,6 +438,8 @@ queueWorker(void *arg)
dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n",
(unsigned long) pThis, iMyThrdIndx);
iRetLocal = pThis->pConsumer(pUsr);
+ dbgprintf("Queue 0x%lx/w%d: worker: consumer returnd %d\n",
+ (unsigned long) pThis, iMyThrdIndx, iRetLocal);
if(iRetLocal != RS_RET_OK)
dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n",
(unsigned long) pThis, iMyThrdIndx, iRetLocal);
@@ -451,16 +450,29 @@ queueWorker(void *arg)
} else { /* the mutex must be unlocked in any case (important for termination) */
pthread_mutex_unlock(pThis->mut);
}
+
+ /* We now yield to give the other threads a chance to obtain the mutex. If we do not
+ * do that, this thread may very well aquire the mutex again before another thread
+ * has even a chance to run. The reason is that mutex operations are free to be
+ * implemented in the quickest possible way (and they typically are!). That is, the
+ * mutex lock/unlock most probably just does an atomic memory swap and does not necessarily
+ * schedule other threads waiting on the same mutex. That can lead to the same thread
+ * aquiring the mutex ever and ever again while all others are starving for it. We
+ * have exactly seen this behaviour when we deliberately introduced a long-running
+ * test action which basically did a sleep. I understand that with real actions the
+ * likelihood of this starvation condition is very low - but it could still happen
+ * and would be very hard to debug. The yield() is a sure fix, its performance overhead
+ * should be well accepted given the above facts. -- rgerhards, 2008-01-10
+ */
+ pthread_yield();
- if(Debug && !(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) && pThis->iQueueSize > 0)
+ if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN) && pThis->iQueueSize > 0)
dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has "
- "messages to process.\n", (unsigned long) pThis, iMyThrdIndx);
+ " %d messages to process.\n", (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize);
}
- if(bIsAdmin) /* main worker must do special chores */
- queueManageWorkersOnShutdown(pThis);
-
- dbgprintf("Queue 0x%lx/w%d: worker thread terminates.\n", (unsigned long) pThis, iMyThrdIndx);
+ dbgprintf("Queue 0x%lx/w%d: worker thread terminates with %d entries left in queue.\n",
+ (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize);
pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; /* indicate termination */
pthread_exit(0);
}
@@ -569,7 +581,6 @@ rsRetVal queueStart(queue_t *pThis)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
/* fire up the worker threads */
- pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_RUN;
iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
@@ -589,21 +600,11 @@ rsRetVal queueDestruct(queue_t *pThis)
assert(pThis != NULL);
+ /* first, terminate worker threads */
if(pThis->pWrkThrds != NULL) {
- /* first stop the worker thread */
- dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis);
- pThis->bDoRun = 0;
-pThis->bImmediateShutdown = 1; /*testing */
- /* request all threads to terminate */
- /* We instruct worker 0 to shutdown, which in turn will terminate all other
- * threads (if any exist) -- rgerhards, 2008-01-10
- */
- pThis->pWrkThrds[0].tCurrCmd = eWRKTHRDCMD_SHUTDOWN;
- /* we must broadcast, because we can not specifically awake worker 0 */
- pthread_cond_broadcast(pThis->notEmpty);
- pthread_join(pThis->pWrkThrds[0].thrdID, NULL);
+ queueShutdownWorkers(pThis);
free(pThis->pWrkThrds);
- dbgprintf("Worker threads for queue 0x%lx terminated.\n", (unsigned long) pThis);
+ pThis->pWrkThrds = NULL;
}
/* ... then free resources */