diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-11 09:53:53 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-11 09:53:53 +0000 |
commit | c9430404dbf38d5c7fdbbb8aebc78fce38c0906c (patch) | |
tree | 5c65306b8a4ec500e7c6ec21f110274d371c98ad /queue.c | |
parent | 68efb41220a834870681f293481655ed47e7b197 (diff) | |
download | rsyslog-c9430404dbf38d5c7fdbbb8aebc78fce38c0906c.tar.gz rsyslog-c9430404dbf38d5c7fdbbb8aebc78fce38c0906c.tar.xz rsyslog-c9430404dbf38d5c7fdbbb8aebc78fce38c0906c.zip |
- began permitting the queue to terminate without being drained
- fixed a starvation condition in queueWorker (a pthread_yield() was needed);
the problem could not be seen with any previously released code, it only came
up during new development
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 93 |
1 files changed, 47 insertions, 46 deletions
@@ -336,39 +336,39 @@ queueDel(queue_t *pThis, void *pUsr) } - -/* Worker thread management function carried out each time - * the main worker is awoken. - */ -static rsRetVal queueManageWorkers(queue_t *pThis) -{ - DEFiRet; - - return iRet; -} - - /* Worker thread management function carried out when the main * worker is about to terminate. */ -static rsRetVal queueManageWorkersOnShutdown(queue_t *pThis) +static rsRetVal queueShutdownWorkers(queue_t *pThis) { DEFiRet; int i; + qWrkCmd_t tShutdownCmd; - /* ask all other workers to terminate */ - for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i) - pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_SHUTDOWN; +pThis->bImmediateShutdown = 1; /*testing */ + assert(pThis != NULL); - /* awake those that sleep */ + /* select shutdown mode */ + tShutdownCmd = (pThis->bImmediateShutdown) ? eWRKTHRDCMD_SHUTDOWN_IMMEDIATE : eWRKTHRDCMD_SHUTDOWN; + + dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence, mode %d...\n", + (unsigned long) pThis, (int) tShutdownCmd); + + /* tell all workers to terminate */ + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) + pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd; + + /* awake them... */ pthread_cond_broadcast(pThis->notEmpty); /* and wait for their termination */ - for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i) { -dbgprintf("WrkShutdown joining thread %d\n", i); + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { pthread_join(pThis->pWrkThrds[i].thrdID, NULL); } + dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", + (unsigned long) pThis, pThis->iQueueSize); + return iRet; } @@ -398,7 +398,6 @@ queueWorker(void *arg) void *pUsr; sigset_t sigSet; int iMyThrdIndx; /* index for this thread in queue thread table */ - int bIsAdmin; /* does this thread have admin chores? 
*/ assert(pThis != NULL); @@ -411,14 +410,12 @@ queueWorker(void *arg) break; assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()); - bIsAdmin = (iMyThrdIndx == 0 && pThis->iNumWorkerThreads > 1) ? 1 : 0; - dbgprintf("Queue 0x%lx/w%d: worker thread startup (isAdmin=%d).\n", (unsigned long) pThis, iMyThrdIndx, bIsAdmin); + dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", (unsigned long) pThis, iMyThrdIndx); /* now we have our identity, on to real processing */ - while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN || !pThis->iQueueSize == 0) { - if(bIsAdmin) /* main worker must do special chores */ - queueManageWorkers(pThis); - + while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN + || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) { +dbgprintf("worker %d runs, cmd %d\n", iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd); pthread_mutex_lock(pThis->mut); while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", @@ -441,6 +438,8 @@ queueWorker(void *arg) dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n", (unsigned long) pThis, iMyThrdIndx); iRetLocal = pThis->pConsumer(pUsr); + dbgprintf("Queue 0x%lx/w%d: worker: consumer returnd %d\n", + (unsigned long) pThis, iMyThrdIndx, iRetLocal); if(iRetLocal != RS_RET_OK) dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n", (unsigned long) pThis, iMyThrdIndx, iRetLocal); @@ -451,16 +450,29 @@ queueWorker(void *arg) } else { /* the mutex must be unlocked in any case (important for termination) */ pthread_mutex_unlock(pThis->mut); } + + /* We now yield to give the other threads a chance to obtain the mutex. If we do not + * do that, this thread may very well aquire the mutex again before another thread + * has even a chance to run. 
The reason is that mutex operations are free to be + * implemented in the quickest possible way (and they typically are!). That is, the + * mutex lock/unlock most probably just does an atomic memory swap and does not necessarily + * schedule other threads waiting on the same mutex. That can lead to the same thread + * aquiring the mutex ever and ever again while all others are starving for it. We + * have exactly seen this behaviour when we deliberately introduced a long-running + * test action which basically did a sleep. I understand that with real actions the + * likelihood of this starvation condition is very low - but it could still happen + * and would be very hard to debug. The yield() is a sure fix, its performance overhead + * should be well accepted given the above facts. -- rgerhards, 2008-01-10 + */ + pthread_yield(); - if(Debug && !(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) && pThis->iQueueSize > 0) + if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN) && pThis->iQueueSize > 0) dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has " - "messages to process.\n", (unsigned long) pThis, iMyThrdIndx); + " %d messages to process.\n", (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize); } - if(bIsAdmin) /* main worker must do special chores */ - queueManageWorkersOnShutdown(pThis); - - dbgprintf("Queue 0x%lx/w%d: worker thread terminates.\n", (unsigned long) pThis, iMyThrdIndx); + dbgprintf("Queue 0x%lx/w%d: worker thread terminates with %d entries left in queue.\n", + (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize); pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; /* indicate termination */ pthread_exit(0); } @@ -569,7 +581,6 @@ rsRetVal queueStart(queue_t *pThis) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); /* fire up the worker threads */ - pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { 
pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_RUN; iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); @@ -589,21 +600,11 @@ rsRetVal queueDestruct(queue_t *pThis) assert(pThis != NULL); + /* first, terminate worker threads */ if(pThis->pWrkThrds != NULL) { - /* first stop the worker thread */ - dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis); - pThis->bDoRun = 0; -pThis->bImmediateShutdown = 1; /*testing */ - /* request all threads to terminate */ - /* We instruct worker 0 to shutdown, which in turn will terminate all other - * threads (if any exist) -- rgerhards, 2008-01-10 - */ - pThis->pWrkThrds[0].tCurrCmd = eWRKTHRDCMD_SHUTDOWN; - /* we must broadcast, because we can not specifically awake worker 0 */ - pthread_cond_broadcast(pThis->notEmpty); - pthread_join(pThis->pWrkThrds[0].thrdID, NULL); + queueShutdownWorkers(pThis); free(pThis->pWrkThrds); - dbgprintf("Worker threads for queue 0x%lx terminated.\n", (unsigned long) pThis); + pThis->pWrkThrds = NULL; } /* ... then free resources */ |