diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-10 17:33:21 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-10 17:33:21 +0000 |
commit | 68efb41220a834870681f293481655ed47e7b197 (patch) | |
tree | 7d86bb0fa5c08a02079f406267dff0bf49795a80 /queue.c | |
parent | 41f386f6abeff1577812a58ccd1d416a5389a85b (diff) | |
download | rsyslog-68efb41220a834870681f293481655ed47e7b197.tar.gz rsyslog-68efb41220a834870681f293481655ed47e7b197.tar.xz rsyslog-68efb41220a834870681f293481655ed47e7b197.zip |
- some cleanup
- implemented management function for worker thread 0 in order to change
queue workers dynamically -- stage work
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 128 |
1 files changed, 97 insertions, 31 deletions
@@ -337,15 +337,59 @@ queueDel(queue_t *pThis, void *pUsr) +/* Worker thread management function carried out each time + * the main worker is awoken. + */ +static rsRetVal queueManageWorkers(queue_t *pThis) +{ + DEFiRet; + + return iRet; +} + + +/* Worker thread management function carried out when the main + * worker is about to terminate. + */ +static rsRetVal queueManageWorkersOnShutdown(queue_t *pThis) +{ + DEFiRet; + int i; + + /* ask all other workers to terminate */ + for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i) + pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_SHUTDOWN; + + /* awake those that sleep */ + pthread_cond_broadcast(pThis->notEmpty); + + /* and wait for their termination */ + for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i) { +dbgprintf("WrkShutdown joining thread %d\n", i); + pthread_join(pThis->pWrkThrds[i].thrdID, NULL); + } + + return iRet; +} + /* Each queue has one associated worker (consumer) thread. It will pull * the message from the queue and pass it to a user-defined function. * This function was provided on construction. It MUST be thread-safe. * - * Please NOTE: - * Having more than one worker requires considerable - * additional code review in regard to thread-safety. -*/ + * There are two classes of worker threads, all implemented via the function + * below. The queue may start multiple workers. The first one carries out normal + * processing BUT also manages the other workers (the first one and all other + * ones are the two different classes). This is so that the queue can dynamically + * start and stop worker threads. So far, this dynamic mode is not yet supported, + * but we will need it at least for disk-assisted queue types. There, multiple + * workers are supported as long as the queue is running in memory, but only + * a single worker is supported if running in disk mode. To start and stop + * workers, we need to have one thread that is capable to wait. We could start + * up a specific management thread. However, this means additional overhead. So + * we have decided to use worker #0, which is always present, to carry out this + * management as an additional chore. -- rgerhards, 2008-01-10 + */ static void * queueWorker(void *arg) { @@ -353,16 +397,32 @@ queueWorker(void *arg) queue_t *pThis = (queue_t*) arg; void *pUsr; sigset_t sigSet; + int iMyThrdIndx; /* index for this thread in queue thread table */ + int bIsAdmin; /* does this thread have admin chores? */ assert(pThis != NULL); sigfillset(&sigSet); pthread_sigmask(SIG_BLOCK, &sigSet, NULL); - while(pThis->bDoRun || !pThis->iQueueSize == 0) { + /* first find myself in the queue's thread table */ + for(iMyThrdIndx = 0 ; iMyThrdIndx < pThis->iNumWorkerThreads ; ++iMyThrdIndx) + if(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()) + break; + assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()); + + bIsAdmin = (iMyThrdIndx == 0 && pThis->iNumWorkerThreads > 1) ? 1 : 0; + dbgprintf("Queue 0x%lx/w%d: worker thread startup (isAdmin=%d).\n", (unsigned long) pThis, iMyThrdIndx, bIsAdmin); + + /* now we have our identity, on to real processing */ + while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN || !pThis->iQueueSize == 0) { + if(bIsAdmin) /* main worker must do special chores */ + queueManageWorkers(pThis); + pthread_mutex_lock(pThis->mut); - while (pThis->iQueueSize == 0 && pThis->bDoRun) { - dbgprintf("queueWorker: queue 0x%lx EMPTY, waiting for next message.\n", (unsigned long) pThis); + while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { + dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", + (unsigned long) pThis, iMyThrdIndx); pthread_cond_wait (pThis->notEmpty, pThis->mut); } if(pThis->iQueueSize > 0) { @@ -378,25 +438,30 @@ queueWorker(void *arg) */ if(iRet == RS_RET_OK) { rsRetVal iRetLocal; - dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis); + dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n", + (unsigned long) pThis, iMyThrdIndx); iRetLocal = pThis->pConsumer(pUsr); if(iRetLocal != RS_RET_OK) - dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n", - (unsigned long) pThis, iRetLocal); + dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n", + (unsigned long) pThis, iMyThrdIndx, iRetLocal); } else { - dbgprintf("Queue 0x%lx: error %d dequeueing element - ignoring, but strange things " - "may happen\n", (unsigned long) pThis, iRet); + dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things " + "may happen\n", (unsigned long) pThis, iMyThrdIndx, iRet); } } else { /* the mutex must be unlocked in any case (important for termination) */ pthread_mutex_unlock(pThis->mut); } - if(Debug && !pThis->bDoRun && pThis->iQueueSize > 0) - dbgprintf("Worker 0x%lx does not yet terminate because it still has messages to process.\n", - (unsigned long) pThis); + if(Debug && !(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) && pThis->iQueueSize > 0) + dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has " + "messages to process.\n", (unsigned long) pThis, iMyThrdIndx); } - dbgprintf("Worker thread for queue 0x%lx terminates.\n", (unsigned long) pThis); + if(bIsAdmin) /* main worker must do special chores */ + queueManageWorkersOnShutdown(pThis); + + dbgprintf("Queue 0x%lx/w%d: worker thread terminates.\n", (unsigned long) pThis, iMyThrdIndx); + pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; /* indicate termination */ pthread_exit(0); } @@ -500,15 +565,16 @@ rsRetVal queueStart(queue_t *pThis) pThis->iMaxFileSize); if(pThis->qType != QUEUETYPE_DIRECT) { - if((pThis->pWorkerThreads = calloc(pThis->iNumWorkerThreads, sizeof(pthread_t))) == NULL) + if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads, sizeof(qWrkThrd_t))) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - /* fire up the worker thread */ + /* fire up the worker threads */ pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - iState = pthread_create(&(pThis->pWorkerThreads[i]), NULL, queueWorker, (void*) pThis); + pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_RUN; + iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n", - (unsigned long) pThis, (unsigned) pThis->pWorkerThreads[i], i, iState); + (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); } } @@ -520,23 +586,23 @@ finalize_it: rsRetVal queueDestruct(queue_t *pThis) { DEFiRet; - int i; assert(pThis != NULL); - if(pThis->pWorkerThreads != NULL) { + if(pThis->pWrkThrds != NULL) { /* first stop the worker thread */ dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis); pThis->bDoRun = 0; - /* It's actually not "not empty" below but awaking the workers. They - * then find out that they shall terminate and do so. +pThis->bImmediateShutdown = 1; /*testing */ + /* request all threads to terminate */ + /* We instruct worker 0 to shutdown, which in turn will terminate all other + * threads (if any exist) -- rgerhards, 2008-01-10 */ + pThis->pWrkThrds[0].tCurrCmd = eWRKTHRDCMD_SHUTDOWN; + /* we must broadcast, because we can not specifically awake worker 0 */ pthread_cond_broadcast(pThis->notEmpty); - /* end then wait for all worker threads to terminate */ - for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - pthread_join(pThis->pWorkerThreads[i], NULL); - } - free(pThis->pWorkerThreads); + pthread_join(pThis->pWrkThrds[0].thrdID, NULL); + free(pThis->pWrkThrds); dbgprintf("Worker threads for queue 0x%lx terminated.\n", (unsigned long) pThis); } @@ -621,7 +687,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) * rgerhards, 2008-01-08 */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - if(pThis->pWorkerThreads != NULL) + if(pThis->pWrkThrds != NULL) pthread_mutex_lock(pThis->mut); while(pThis->iQueueSize >= pThis->iMaxQueueSize) { @@ -641,7 +707,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) finalize_it: /* now activate the worker thread */ - if(pThis->pWorkerThreads != NULL) { + if(pThis->pWrkThrds != NULL) { pthread_mutex_unlock(pThis->mut); i = pthread_cond_signal(pThis->notEmpty); dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); |