summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/immark/immark.c1
-rw-r--r--queue.c128
-rw-r--r--queue.h15
-rw-r--r--stream.c5
-rw-r--r--threads.c8
-rw-r--r--threads.h6
6 files changed, 111 insertions, 52 deletions
diff --git a/plugins/immark/immark.c b/plugins/immark/immark.c
index f21bfd7f..4735b730 100644
--- a/plugins/immark/immark.c
+++ b/plugins/immark/immark.c
@@ -67,7 +67,6 @@ typedef struct _instanceData {
*/
BEGINrunInput
CODESTARTrunInput
- thrdBlockTermination(pThrd);
/* this is an endless loop - it is terminated when the thread is
* signalled to do so. This, however, is handled by the framework,
* right into the sleep below.
diff --git a/queue.c b/queue.c
index eb0e5a8a..c7602b26 100644
--- a/queue.c
+++ b/queue.c
@@ -337,15 +337,59 @@ queueDel(queue_t *pThis, void *pUsr)
+/* Worker thread management function carried out each time
+ * the main worker is awoken.
+ */
+static rsRetVal queueManageWorkers(queue_t *pThis)
+{
+ DEFiRet;
+
+ return iRet;
+}
+
+
+/* Worker thread management function carried out when the main
+ * worker is about to terminate.
+ */
+static rsRetVal queueManageWorkersOnShutdown(queue_t *pThis)
+{
+ DEFiRet;
+ int i;
+
+ /* ask all other workers to terminate */
+ for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i)
+ pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_SHUTDOWN;
+
+ /* awake those that sleep */
+ pthread_cond_broadcast(pThis->notEmpty);
+
+ /* and wait for their termination */
+ for(i = 1 ; i < pThis->iNumWorkerThreads ; ++i) {
+dbgprintf("WrkShutdown joining thread %d\n", i);
+ pthread_join(pThis->pWrkThrds[i].thrdID, NULL);
+ }
+
+ return iRet;
+}
+
/* Each queue has one associated worker (consumer) thread. It will pull
* the message from the queue and pass it to a user-defined function.
* This function was provided on construction. It MUST be thread-safe.
*
- * Please NOTE:
- * Having more than one worker requires considerable
- * additional code review in regard to thread-safety.
-*/
+ * There are two classes of worker threads, all implemented via the function
+ * below. The queue may start multiple workers. The first one carries out normal
+ * processing BUT also manages the other workers (the first one and all other
+ * ones are the two different classes). This is so that the queue can dynamically
+ * start and stop worker threads. So far, this dynamic mode is not yet supported,
+ * but we will need it at least for disk-assisted queue types. There, multiple
+ * workers are supported as long as the queue is running in memory, but only
+ * a single worker is supported if running in disk mode. To start and stop
+ * workers, we need to have one thread that is capable to wait. We could start
+ * up a specific management thread. However, this means additional overhead. So
+ * we have decided to use worker #0, which is always present, to carry out this
+ * management as an additional chore. -- rgerhards, 2008-01-10
+ */
static void *
queueWorker(void *arg)
{
@@ -353,16 +397,32 @@ queueWorker(void *arg)
queue_t *pThis = (queue_t*) arg;
void *pUsr;
sigset_t sigSet;
+ int iMyThrdIndx; /* index for this thread in queue thread table */
+ int bIsAdmin; /* does this thread have admin chores? */
assert(pThis != NULL);
sigfillset(&sigSet);
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
- while(pThis->bDoRun || !pThis->iQueueSize == 0) {
+ /* first find myself in the queue's thread table */
+ for(iMyThrdIndx = 0 ; iMyThrdIndx < pThis->iNumWorkerThreads ; ++iMyThrdIndx)
+ if(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self())
+ break;
+ assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self());
+
+ bIsAdmin = (iMyThrdIndx == 0 && pThis->iNumWorkerThreads > 1) ? 1 : 0;
+ dbgprintf("Queue 0x%lx/w%d: worker thread startup (isAdmin=%d).\n", (unsigned long) pThis, iMyThrdIndx, bIsAdmin);
+
+ /* now we have our identity, on to real processing */
+ while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN || !pThis->iQueueSize == 0) {
+ if(bIsAdmin) /* main worker must do special chores */
+ queueManageWorkers(pThis);
+
pthread_mutex_lock(pThis->mut);
- while (pThis->iQueueSize == 0 && pThis->bDoRun) {
- dbgprintf("queueWorker: queue 0x%lx EMPTY, waiting for next message.\n", (unsigned long) pThis);
+ while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
+ dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
+ (unsigned long) pThis, iMyThrdIndx);
pthread_cond_wait (pThis->notEmpty, pThis->mut);
}
if(pThis->iQueueSize > 0) {
@@ -378,25 +438,30 @@ queueWorker(void *arg)
*/
if(iRet == RS_RET_OK) {
rsRetVal iRetLocal;
- dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis);
+ dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n",
+ (unsigned long) pThis, iMyThrdIndx);
iRetLocal = pThis->pConsumer(pUsr);
if(iRetLocal != RS_RET_OK)
- dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n",
- (unsigned long) pThis, iRetLocal);
+ dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n",
+ (unsigned long) pThis, iMyThrdIndx, iRetLocal);
} else {
- dbgprintf("Queue 0x%lx: error %d dequeueing element - ignoring, but strange things "
- "may happen\n", (unsigned long) pThis, iRet);
+ dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things "
+ "may happen\n", (unsigned long) pThis, iMyThrdIndx, iRet);
}
} else { /* the mutex must be unlocked in any case (important for termination) */
pthread_mutex_unlock(pThis->mut);
}
- if(Debug && !pThis->bDoRun && pThis->iQueueSize > 0)
- dbgprintf("Worker 0x%lx does not yet terminate because it still has messages to process.\n",
- (unsigned long) pThis);
+ if(Debug && !(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) && pThis->iQueueSize > 0)
+ dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has "
+ "messages to process.\n", (unsigned long) pThis, iMyThrdIndx);
}
- dbgprintf("Worker thread for queue 0x%lx terminates.\n", (unsigned long) pThis);
+ if(bIsAdmin) /* main worker must do special chores */
+ queueManageWorkersOnShutdown(pThis);
+
+ dbgprintf("Queue 0x%lx/w%d: worker thread terminates.\n", (unsigned long) pThis, iMyThrdIndx);
+ pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; /* indicate termination */
pthread_exit(0);
}
@@ -500,15 +565,16 @@ rsRetVal queueStart(queue_t *pThis)
pThis->iMaxFileSize);
if(pThis->qType != QUEUETYPE_DIRECT) {
- if((pThis->pWorkerThreads = calloc(pThis->iNumWorkerThreads, sizeof(pthread_t))) == NULL)
+ if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads, sizeof(qWrkThrd_t))) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- /* fire up the worker thread */
+ /* fire up the worker threads */
pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- iState = pthread_create(&(pThis->pWorkerThreads[i]), NULL, queueWorker, (void*) pThis);
+ pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_RUN;
+ iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n",
- (unsigned long) pThis, (unsigned) pThis->pWorkerThreads[i], i, iState);
+ (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState);
}
}
@@ -520,23 +586,23 @@ finalize_it:
rsRetVal queueDestruct(queue_t *pThis)
{
DEFiRet;
- int i;
assert(pThis != NULL);
- if(pThis->pWorkerThreads != NULL) {
+ if(pThis->pWrkThrds != NULL) {
/* first stop the worker thread */
dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis);
pThis->bDoRun = 0;
- /* It's actually not "not empty" below but awaking the workers. They
- * then find out that they shall terminate and do so.
+pThis->bImmediateShutdown = 1; /*testing */
+ /* request all threads to terminate */
+ /* We instruct worker 0 to shutdown, which in turn will terminate all other
+ * threads (if any exist) -- rgerhards, 2008-01-10
*/
+ pThis->pWrkThrds[0].tCurrCmd = eWRKTHRDCMD_SHUTDOWN;
+ /* we must broadcast, because we can not specifically awake worker 0 */
pthread_cond_broadcast(pThis->notEmpty);
- /* end then wait for all worker threads to terminate */
- for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- pthread_join(pThis->pWorkerThreads[i], NULL);
- }
- free(pThis->pWorkerThreads);
+ pthread_join(pThis->pWrkThrds[0].thrdID, NULL);
+ free(pThis->pWrkThrds);
dbgprintf("Worker threads for queue 0x%lx terminated.\n", (unsigned long) pThis);
}
@@ -621,7 +687,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
* rgerhards, 2008-01-08
*/
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- if(pThis->pWorkerThreads != NULL)
+ if(pThis->pWrkThrds != NULL)
pthread_mutex_lock(pThis->mut);
while(pThis->iQueueSize >= pThis->iMaxQueueSize) {
@@ -641,7 +707,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
finalize_it:
/* now activate the worker thread */
- if(pThis->pWorkerThreads != NULL) {
+ if(pThis->pWrkThrds != NULL) {
pthread_mutex_unlock(pThis->mut);
i = pthread_cond_signal(pThis->notEmpty);
dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);
diff --git a/queue.h b/queue.h
index df4146f6..b4dd1000 100644
--- a/queue.h
+++ b/queue.h
@@ -60,14 +60,27 @@ typedef struct qLinkedList_S {
void *pUsr;
} qLinkedList_t;
+typedef enum {
+ eWRKTHRDCMD_RUN,
+ eWRKTHRDCMD_SHUTDOWN,
+ eWRKTHRDCMD_SHUTDOWN_IMMEDIATE,
+ eWRKTHRDCMD_TERMINATED /* granted, that's more a state than a cmd - thread is dead... */
+} qWrkCmd_t; /* commands for queue worker threads */
+
+typedef struct qWrkThrd_s {
+ pthread_t thrdID; /* thread ID */
+ qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
+} qWrkThrd_t; /* type for queue worker threads */
+
/* the queue object */
typedef struct queue_s {
queueType_t qType;
int iQueueSize; /* Current number of elements in the queue */
int iMaxQueueSize; /* how large can the queue grow? */
int iNumWorkerThreads;/* number of worker threads to use */
- pthread_t *pWorkerThreads;/* array with ID of the worker thread(s) associated with this queue */
+ qWrkThrd_t *pWrkThrds;/* array with control structure for the worker thread(s) associated with this queue */
int bDoRun; /* 1 - run queue, 0 - shutdown of queue requested */
+ int bImmediateShutdown;/* on shutdown, drain the queue --> 0 / do NOT drain the queue --> 1 */
rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
diff --git a/stream.c b/stream.c
index 24c53313..740ca9fb 100644
--- a/stream.c
+++ b/stream.c
@@ -122,7 +122,6 @@ static rsRetVal strmCloseFile(strm_t *pThis)
pThis->pszCurrFName = NULL;
}
-dbgprintf("exit strmCloseFile, fd: %d\n", pThis->fd);
return iRet;
}
@@ -135,7 +134,6 @@ strmNextFile(strm_t *pThis)
{
DEFiRet;
-dbgprintf("strmNextFile, old num %d\n", pThis->iCurrFNum);
assert(pThis != NULL);
assert(pThis->iMaxFiles != 0);
assert(pThis->fd != -1);
@@ -582,7 +580,6 @@ rsRetVal strmRecordBegin(strm_t *pThis)
assert(pThis != NULL);
assert(pThis->bInRecord == 0);
pThis->bInRecord = 1;
-dbgprintf("strmRecordBegin set \n");
return RS_RET_OK;
}
@@ -592,10 +589,8 @@ rsRetVal strmRecordEnd(strm_t *pThis)
assert(pThis != NULL);
assert(pThis->bInRecord == 1);
-dbgprintf("strmRecordEnd in %d\n", iRet);
pThis->bInRecord = 0;
iRet = strmCheckNextOutputFile(pThis); /* check if we need to switch files */
-dbgprintf("strmRecordEnd out %d\n", iRet);
return iRet;
}
diff --git a/threads.c b/threads.c
index 2166f4bb..8ce5515d 100644
--- a/threads.c
+++ b/threads.c
@@ -229,17 +229,9 @@ thrdSleep(thrdInfo_t *pThis, int iSeconds, int iuSeconds)
assert(pThis != NULL);
tvSelectTimeout.tv_sec = iSeconds;
tvSelectTimeout.tv_usec = iuSeconds; /* micro seconds */
- thrdUnblockTermination(pThis);
- /* there may be a race condition if pthread_kill() is called after unblock but
- * before the select() is setup. TODO: check and re-eval -- rgerhards, 2007-12-20
- */
select(1, NULL, NULL, NULL, &tvSelectTimeout);
if(pThis->bShallStop)
iRet = RS_RET_TERMINATE_NOW;
-#if 0 /* TODO: remove once we know we do not need the thrdBlockTermination() call -- rgerhards, 2007.12.25 */
- else
- thrdBlockTermination(pThis);
-#endif
return iRet;
}
diff --git a/threads.h b/threads.h
index dc937def..aa6a5c28 100644
--- a/threads.h
+++ b/threads.h
@@ -43,11 +43,5 @@ rsRetVal thrdCreate(rsRetVal (*thrdMain)(thrdInfo_t*), rsRetVal(*afterRun)(thrdI
rsRetVal thrdSleep(thrdInfo_t *pThis, int iSeconds, int iuSeconds);
/* macros (replace inline functions) */
-/*TODO: remove these macros once we now we can live without -- rgerhards, 2007-12-20
- * #define thrdBlockTermination(pThis) {dbgprintf("lock mutex\n"); pthread_mutex_lock((pThis)->mutTermOK) ;}
- * #define thrdUnblockTermination(pThis) {dbgprintf("unlock mutex\n"); pthread_mutex_unlock((pThis)->mutTermOK) ;}
- */
-#define thrdBlockTermination(pThis)
-#define thrdUnblockTermination(pThis)
#endif /* #ifndef THREADS_H_INCLUDED */