diff options
-rw-r--r-- | plugins/immark/immark.c | 1 | ||||
-rw-r--r-- | queue.c | 128 | ||||
-rw-r--r-- | queue.h | 15 | ||||
-rw-r--r-- | stream.c | 5 | ||||
-rw-r--r-- | threads.c | 8 | ||||
-rw-r--r-- | threads.h | 6 |
6 files changed, 111 insertions, 52 deletions
diff --git a/plugins/immark/immark.c b/plugins/immark/immark.c index f21bfd7f..4735b730 100644 --- a/plugins/immark/immark.c +++ b/plugins/immark/immark.c @@ -67,7 +67,6 @@ typedef struct _instanceData { */ BEGINrunInput CODESTARTrunInput - thrdBlockTermination(pThrd); /* this is an endless loop - it is terminated when the thread is * signalled to do so. This, however, is handled by the framework, * right into the sleep below. @@ -337,15 +337,59 @@ queueDel(queue_t *pThis, void *pUsr) +/* Worker thread management function carried out each time + * the main worker is awoken. + */ +static rsRetVal queueManageWorkers(queue_t *pThis) +{ + DEFiRet; + + return iRet; +} + + +/* Worker thread management function carried out when the main + * worker is about to terminate. + */ +static rsRetVal queueManageWorkersOnShutdown(queue_t *pThis) +{ + DEFiRet; + int i; + + /* ask all other workers to terminate */ + for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i) + pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_SHUTDOWN; + + /* awake those that sleep */ + pthread_cond_broadcast(pThis->notEmpty); + + /* and wait for their termination */ + for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i) { +dbgprintf("WrkShutdown joining thread %d\n", i); + pthread_join(pThis->pWrkThrds[i].thrdID, NULL); + } + + return iRet; +} + /* Each queue has one associated worker (consumer) thread. It will pull * the message from the queue and pass it to a user-defined function. * This function was provided on construction. It MUST be thread-safe. * - * Please NOTE: - * Having more than one worker requires considerable - * additional code review in regard to thread-safety. -*/ + * There are two classes of worker threads, all implemented via the function + * below. The queue may start multiple workers. The first one carries out normal + * processing BUT also manages the other workers (the first one and all other + * ones are the two different classes). This is so that the queue can dynamically + * start and stop worker threads. So far, this dynamic mode is not yet supported, + * but we will need it at least for disk-assisted queue types. There, multiple + * workers are supported as long as the queue is running in memory, but only + * a single worker is supported if running in disk mode. To start and stop + * workers, we need to have one thread that is capable to wait. We could start + * up a specific management thread. However, this means additional overhead. So + * we have decided to use worker #0, which is always present, to carry out this + * management as an additional chore. -- rgerhards, 2008-01-10 + */ static void * queueWorker(void *arg) { @@ -353,16 +397,32 @@ queueWorker(void *arg) queue_t *pThis = (queue_t*) arg; void *pUsr; sigset_t sigSet; + int iMyThrdIndx; /* index for this thread in queue thread table */ + int bIsAdmin; /* does this thread have admin chores? */ assert(pThis != NULL); sigfillset(&sigSet); pthread_sigmask(SIG_BLOCK, &sigSet, NULL); - while(pThis->bDoRun || !pThis->iQueueSize == 0) { + /* first find myself in the queue's thread table */ + for(iMyThrdIndx = 0 ; iMyThrdIndx < pThis->iNumWorkerThreads ; ++iMyThrdIndx) + if(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()) + break; + assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()); + + bIsAdmin = (iMyThrdIndx == 0 && pThis->iNumWorkerThreads > 1) ? 1 : 0; + dbgprintf("Queue 0x%lx/w%d: worker thread startup (isAdmin=%d).\n", (unsigned long) pThis, iMyThrdIndx, bIsAdmin); + + /* now we have our identity, on to real processing */ + while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN || !pThis->iQueueSize == 0) { + if(bIsAdmin) /* main worker must do special chores */ + queueManageWorkers(pThis); + pthread_mutex_lock(pThis->mut); - while (pThis->iQueueSize == 0 && pThis->bDoRun) { - dbgprintf("queueWorker: queue 0x%lx EMPTY, waiting for next message.\n", (unsigned long) pThis); + while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { + dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", + (unsigned long) pThis, iMyThrdIndx); pthread_cond_wait (pThis->notEmpty, pThis->mut); } if(pThis->iQueueSize > 0) { @@ -378,25 +438,30 @@ queueWorker(void *arg) */ if(iRet == RS_RET_OK) { rsRetVal iRetLocal; - dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis); + dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n", + (unsigned long) pThis, iMyThrdIndx); iRetLocal = pThis->pConsumer(pUsr); if(iRetLocal != RS_RET_OK) - dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n", - (unsigned long) pThis, iRetLocal); + dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n", + (unsigned long) pThis, iMyThrdIndx, iRetLocal); } else { - dbgprintf("Queue 0x%lx: error %d dequeueing element - ignoring, but strange things " - "may happen\n", (unsigned long) pThis, iRet); + dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things " + "may happen\n", (unsigned long) pThis, iMyThrdIndx, iRet); } } else { /* the mutex must be unlocked in any case (important for termination) */ pthread_mutex_unlock(pThis->mut); } - if(Debug && !pThis->bDoRun && pThis->iQueueSize > 0) - dbgprintf("Worker 0x%lx does not yet terminate because it still has messages to process.\n", - (unsigned long) pThis); + if(Debug && !(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) && pThis->iQueueSize > 0) + dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has " + "messages to process.\n", (unsigned long) pThis, iMyThrdIndx); } - dbgprintf("Worker thread for queue 0x%lx terminates.\n", (unsigned long) pThis); + if(bIsAdmin) /* main worker must do special chores */ + queueManageWorkersOnShutdown(pThis); + + dbgprintf("Queue 0x%lx/w%d: worker thread terminates.\n", (unsigned long) pThis, iMyThrdIndx); + pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; /* indicate termination */ pthread_exit(0); } @@ -500,15 +565,16 @@ rsRetVal queueStart(queue_t *pThis) pThis->iMaxFileSize); if(pThis->qType != QUEUETYPE_DIRECT) { - if((pThis->pWorkerThreads = calloc(pThis->iNumWorkerThreads, sizeof(pthread_t))) == NULL) + if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads, sizeof(qWrkThrd_t))) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - /* fire up the worker thread */ + /* fire up the worker threads */ pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - iState = pthread_create(&(pThis->pWorkerThreads[i]), NULL, queueWorker, (void*) pThis); + pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_RUN; + iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n", - (unsigned long) pThis, (unsigned) pThis->pWorkerThreads[i], i, iState); + (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); } } @@ -520,23 +586,23 @@ finalize_it: rsRetVal queueDestruct(queue_t *pThis) { DEFiRet; - int i; assert(pThis != NULL); - if(pThis->pWorkerThreads != NULL) { + if(pThis->pWrkThrds != NULL) { /* first stop the worker thread */ dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis); pThis->bDoRun = 0; - /* It's actually not "not empty" below but awaking the workers. They - * then find out that they shall terminate and do so. +pThis->bImmediateShutdown = 1; /*testing */ + /* request all threads to terminate */ + /* We instruct worker 0 to shutdown, which in turn will terminate all other + * threads (if any exist) -- rgerhards, 2008-01-10 */ + pThis->pWrkThrds[0].tCurrCmd = eWRKTHRDCMD_SHUTDOWN; + /* we must broadcast, because we can not specifically awake worker 0 */ pthread_cond_broadcast(pThis->notEmpty); - /* end then wait for all worker threads to terminate */ - for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - pthread_join(pThis->pWorkerThreads[i], NULL); - } - free(pThis->pWorkerThreads); + pthread_join(pThis->pWrkThrds[0].thrdID, NULL); + free(pThis->pWrkThrds); dbgprintf("Worker threads for queue 0x%lx terminated.\n", (unsigned long) pThis); } @@ -621,7 +687,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) * rgerhards, 2008-01-08 */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - if(pThis->pWorkerThreads != NULL) + if(pThis->pWrkThrds != NULL) pthread_mutex_lock(pThis->mut); while(pThis->iQueueSize >= pThis->iMaxQueueSize) { @@ -641,7 +707,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) finalize_it: /* now activate the worker thread */ - if(pThis->pWorkerThreads != NULL) { + if(pThis->pWrkThrds != NULL) { pthread_mutex_unlock(pThis->mut); i = pthread_cond_signal(pThis->notEmpty); dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); @@ -60,14 +60,27 @@ typedef struct qLinkedList_S { void *pUsr; } qLinkedList_t; +typedef enum { + eWRKTHRDCMD_RUN, + eWRKTHRDCMD_SHUTDOWN, + eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, + eWRKTHRDCMD_TERMINATED /* granted, that's more a state than a cmd - thread is dead... */ +} qWrkCmd_t; /* commands for queue worker threads */ + +typedef struct qWrkThrd_s { + pthread_t thrdID; /* thread ID */ + qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ +} qWrkThrd_t; /* type for queue worker threads */ + /* the queue object */ typedef struct queue_s { queueType_t qType; int iQueueSize; /* Current number of elements in the queue */ int iMaxQueueSize; /* how large can the queue grow? */ int iNumWorkerThreads;/* number of worker threads to use */ - pthread_t *pWorkerThreads;/* array with ID of the worker thread(s) associated with this queue */ + qWrkThrd_t *pWrkThrds;/* array with control structure for the worker thread(s) associated with this queue */ int bDoRun; /* 1 - run queue, 0 - shutdown of queue requested */ + int bImmediateShutdown;/* on shutdown, drain the queue --> 0 / do NOT drain the queue --> 1 */ rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); @@ -122,7 +122,6 @@ static rsRetVal strmCloseFile(strm_t *pThis) pThis->pszCurrFName = NULL; } -dbgprintf("exit strmCloseFile, fd: %d\n", pThis->fd); return iRet; } @@ -135,7 +134,6 @@ strmNextFile(strm_t *pThis) { DEFiRet; -dbgprintf("strmNextFile, old num %d\n", pThis->iCurrFNum); assert(pThis != NULL); assert(pThis->iMaxFiles != 0); assert(pThis->fd != -1); @@ -582,7 +580,6 @@ rsRetVal strmRecordBegin(strm_t *pThis) assert(pThis != NULL); assert(pThis->bInRecord == 0); pThis->bInRecord = 1; -dbgprintf("strmRecordBegin set \n"); return RS_RET_OK; } @@ -592,10 +589,8 @@ rsRetVal strmRecordEnd(strm_t *pThis) assert(pThis != NULL); assert(pThis->bInRecord == 1); -dbgprintf("strmRecordEnd in %d\n", iRet); pThis->bInRecord = 0; iRet = strmCheckNextOutputFile(pThis); /* check if we need to switch files */ -dbgprintf("strmRecordEnd out %d\n", iRet); return iRet; } @@ -229,17 +229,9 @@ thrdSleep(thrdInfo_t *pThis, int iSeconds, int iuSeconds) assert(pThis != NULL); tvSelectTimeout.tv_sec = iSeconds; tvSelectTimeout.tv_usec = iuSeconds; /* micro seconds */ - thrdUnblockTermination(pThis); - /* there may be a race condition if pthread_kill() is called after unblock but - * before the select() is setup. TODO: check and re-eval -- rgerhards, 2007-12-20 - */ select(1, NULL, NULL, NULL, &tvSelectTimeout); if(pThis->bShallStop) iRet = RS_RET_TERMINATE_NOW; -#if 0 /* TODO: remove once we know we do not need the thrdBlockTermination() call -- rgerhards, 2007.12.25 */ - else - thrdBlockTermination(pThis); -#endif return iRet; } @@ -43,11 +43,5 @@ rsRetVal thrdCreate(rsRetVal (*thrdMain)(thrdInfo_t*), rsRetVal(*afterRun)(thrdI rsRetVal thrdSleep(thrdInfo_t *pThis, int iSeconds, int iuSeconds); /* macros (replace inline functions) */ -/*TODO: remove these macros once we now we can live without -- rgerhards, 2007-12-20 - * #define thrdBlockTermination(pThis) {dbgprintf("lock mutex\n"); pthread_mutex_lock((pThis)->mutTermOK) ;} - * #define thrdUnblockTermination(pThis) {dbgprintf("unlock mutex\n"); pthread_mutex_unlock((pThis)->mutTermOK) ;} - */ -#define thrdBlockTermination(pThis) -#define thrdUnblockTermination(pThis) #endif /* #ifndef THREADS_H_INCLUDED */ |