summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c128
1 files changed, 97 insertions, 31 deletions
diff --git a/queue.c b/queue.c
index eb0e5a8a..c7602b26 100644
--- a/queue.c
+++ b/queue.c
@@ -337,15 +337,59 @@ queueDel(queue_t *pThis, void *pUsr)
+/* Worker thread management function carried out each time
+ * the main worker is awoken.
+ */
+static rsRetVal queueManageWorkers(queue_t *pThis)
+{
+ DEFiRet;
+
+ return iRet;
+}
+
+
+/* Worker thread management function carried out when the main
+ * worker is about to terminate.
+ */
+static rsRetVal queueManageWorkersOnShutdown(queue_t *pThis)
+{
+ DEFiRet;
+ int i;
+
+ /* ask all other workers to terminate */
+ for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i)
+ pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_SHUTDOWN;
+
+ /* awake those that sleep */
+ pthread_cond_broadcast(pThis->notEmpty);
+
+ /* and wait for their termination */
+ for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i) {
+dbgprintf("WrkShutdown joining thread %d\n", i);
+ pthread_join(pThis->pWrkThrds[i].thrdID, NULL);
+ }
+
+ return iRet;
+}
+
/* Each queue has one associated worker (consumer) thread. It will pull
* the message from the queue and pass it to a user-defined function.
* This function was provided on construction. It MUST be thread-safe.
*
- * Please NOTE:
- * Having more than one worker requires considerable
- * additional code review in regard to thread-safety.
-*/
+ * There are two classes of worker threads, all implemented via the function
+ * below. The queue may start multiple workers. The first one carries out normal
+ * processing BUT also manages the other workers (the first one and all other
+ * ones are the two different classes). This is so that the queue can dynamically
+ * start and stop worker threads. So far, this dynamic mode is not yet supported,
+ * but we will need it at least for disk-assisted queue types. There, multiple
+ * workers are supported as long as the queue is running in memory, but only
+ * a single worker is supported if running in disk mode. To start and stop
+ * workers, we need to have one thread that is capable to wait. We could start
+ * up a specific management thread. However, this means additional overhead. So
+ * we have decided to use worker #0, which is always present, to carry out this
+ * management as an additional chore. -- rgerhards, 2008-01-10
+ */
static void *
queueWorker(void *arg)
{
@@ -353,16 +397,32 @@ queueWorker(void *arg)
queue_t *pThis = (queue_t*) arg;
void *pUsr;
sigset_t sigSet;
+ int iMyThrdIndx; /* index for this thread in queue thread table */
+ int bIsAdmin; /* does this thread have admin chores? */
assert(pThis != NULL);
sigfillset(&sigSet);
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
- while(pThis->bDoRun || !pThis->iQueueSize == 0) {
+ /* first find myself in the queue's thread table */
+ for(iMyThrdIndx = 0 ; iMyThrdIndx < pThis->iNumWorkerThreads ; ++iMyThrdIndx)
+ if(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self())
+ break;
+ assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self());
+
+ bIsAdmin = (iMyThrdIndx == 0 && pThis->iNumWorkerThreads > 1) ? 1 : 0;
+ dbgprintf("Queue 0x%lx/w%d: worker thread startup (isAdmin=%d).\n", (unsigned long) pThis, iMyThrdIndx, bIsAdmin);
+
+ /* now we have our identity, on to real processing */
+ while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN || !pThis->iQueueSize == 0) {
+ if(bIsAdmin) /* main worker must do special chores */
+ queueManageWorkers(pThis);
+
pthread_mutex_lock(pThis->mut);
- while (pThis->iQueueSize == 0 && pThis->bDoRun) {
- dbgprintf("queueWorker: queue 0x%lx EMPTY, waiting for next message.\n", (unsigned long) pThis);
+ while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
+ dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
+ (unsigned long) pThis, iMyThrdIndx);
pthread_cond_wait (pThis->notEmpty, pThis->mut);
}
if(pThis->iQueueSize > 0) {
@@ -378,25 +438,30 @@ queueWorker(void *arg)
*/
if(iRet == RS_RET_OK) {
rsRetVal iRetLocal;
- dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis);
+ dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n",
+ (unsigned long) pThis, iMyThrdIndx);
iRetLocal = pThis->pConsumer(pUsr);
if(iRetLocal != RS_RET_OK)
- dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n",
- (unsigned long) pThis, iRetLocal);
+ dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n",
+ (unsigned long) pThis, iMyThrdIndx, iRetLocal);
} else {
- dbgprintf("Queue 0x%lx: error %d dequeueing element - ignoring, but strange things "
- "may happen\n", (unsigned long) pThis, iRet);
+ dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things "
+ "may happen\n", (unsigned long) pThis, iMyThrdIndx, iRet);
}
} else { /* the mutex must be unlocked in any case (important for termination) */
pthread_mutex_unlock(pThis->mut);
}
- if(Debug && !pThis->bDoRun && pThis->iQueueSize > 0)
- dbgprintf("Worker 0x%lx does not yet terminate because it still has messages to process.\n",
- (unsigned long) pThis);
+ if(Debug && !(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) && pThis->iQueueSize > 0)
+ dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has "
+ "messages to process.\n", (unsigned long) pThis, iMyThrdIndx);
}
- dbgprintf("Worker thread for queue 0x%lx terminates.\n", (unsigned long) pThis);
+ if(bIsAdmin) /* main worker must do special chores */
+ queueManageWorkersOnShutdown(pThis);
+
+ dbgprintf("Queue 0x%lx/w%d: worker thread terminates.\n", (unsigned long) pThis, iMyThrdIndx);
+ pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; /* indicate termination */
pthread_exit(0);
}
@@ -500,15 +565,16 @@ rsRetVal queueStart(queue_t *pThis)
pThis->iMaxFileSize);
if(pThis->qType != QUEUETYPE_DIRECT) {
- if((pThis->pWorkerThreads = calloc(pThis->iNumWorkerThreads, sizeof(pthread_t))) == NULL)
+ if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads, sizeof(qWrkThrd_t))) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- /* fire up the worker thread */
+ /* fire up the worker threads */
pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- iState = pthread_create(&(pThis->pWorkerThreads[i]), NULL, queueWorker, (void*) pThis);
+ pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_RUN;
+ iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n",
- (unsigned long) pThis, (unsigned) pThis->pWorkerThreads[i], i, iState);
+ (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState);
}
}
@@ -520,23 +586,23 @@ finalize_it:
rsRetVal queueDestruct(queue_t *pThis)
{
DEFiRet;
- int i;
assert(pThis != NULL);
- if(pThis->pWorkerThreads != NULL) {
+ if(pThis->pWrkThrds != NULL) {
/* first stop the worker thread */
dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis);
pThis->bDoRun = 0;
- /* It's actually not "not empty" below but awaking the workers. They
- * then find out that they shall terminate and do so.
+pThis->bImmediateShutdown = 1; /*testing */
+ /* request all threads to terminate */
+ /* We instruct worker 0 to shutdown, which in turn will terminate all other
+ * threads (if any exist) -- rgerhards, 2008-01-10
*/
+ pThis->pWrkThrds[0].tCurrCmd = eWRKTHRDCMD_SHUTDOWN;
+ /* we must broadcast, because we can not specifically awake worker 0 */
pthread_cond_broadcast(pThis->notEmpty);
- /* end then wait for all worker threads to terminate */
- for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- pthread_join(pThis->pWorkerThreads[i], NULL);
- }
- free(pThis->pWorkerThreads);
+ pthread_join(pThis->pWrkThrds[0].thrdID, NULL);
+ free(pThis->pWrkThrds);
dbgprintf("Worker threads for queue 0x%lx terminated.\n", (unsigned long) pThis);
}
@@ -621,7 +687,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
* rgerhards, 2008-01-08
*/
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- if(pThis->pWorkerThreads != NULL)
+ if(pThis->pWrkThrds != NULL)
pthread_mutex_lock(pThis->mut);
while(pThis->iQueueSize >= pThis->iMaxQueueSize) {
@@ -641,7 +707,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
finalize_it:
/* now activate the worker thread */
- if(pThis->pWorkerThreads != NULL) {
+ if(pThis->pWrkThrds != NULL) {
pthread_mutex_unlock(pThis->mut);
i = pthread_cond_signal(pThis->notEmpty);
dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);