From 8951958ff6d8241df9ffe048e2c0d0766a9d383b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 3 Jan 2008 13:28:45 +0000 Subject: queue is now a full object and handles threading by itself --- queue.c | 124 ++++++++++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 98 insertions(+), 26 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 2600871a..c5aff7c5 100644 --- a/queue.c +++ b/queue.c @@ -28,9 +28,11 @@ #include #include #include +#include #include #include "rsyslog.h" +#include "syslogd.h" #include "queue.h" /* static data */ @@ -109,14 +111,62 @@ rsRetVal qDelFixedArray(queue_t *pThis, void **out) } /* --------------- end type-specific handlers -------------------- */ +/* Each queue has one associated worker (consumer) thread. It will pull + * the message from the queue and pass it to a user-defined function. + * This function was provided on construction. It MUST be thread-safe. + * + * Please NOTE: + * Having more than one worker requires considerable + * additional code review in regard to thread-safety. + */ +static void * +queueWorker(void *arg) +{ + queue_t *pThis = (queue_t*) arg; + void *pUsr; + sigset_t sigSet; + + assert(pThis != NULL); + + sigfillset(&sigSet); + pthread_sigmask(SIG_BLOCK, &sigSet, NULL); + + while(pThis->bDoRun || !pThis->empty) { + pthread_mutex_lock(pThis->mut); + while (pThis->empty && pThis->bDoRun) { + dbgprintf("queueWorker: queue 0x%lx EMPTY, waiting for next message.\n", (unsigned long) pThis); + pthread_cond_wait (pThis->notEmpty, pThis->mut); + } + if(!pThis->empty) { + /* dequeue element (still protected from mutex) */ + pThis->qDel(pThis, &pUsr); + pthread_mutex_unlock(pThis->mut); + pthread_cond_signal (pThis->notFull); + /* do actual processing (the lengthy part, runs in parallel) */ + dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis); + pThis->pConsumer(pUsr); + } else { /* the mutex must be unlocked in any case (important for termination) */ + pthread_mutex_unlock(pThis->mut); + } + + if(Debug && !pThis->bDoRun && !pThis->empty) + dbgprintf("Worker 0x%lx does not yet terminate because it still has messages to process.\n", + (unsigned long) pThis); + } + + dbgprintf("Worker thread for queue 0x%lx terminates.\n", (unsigned long) pThis); + pthread_exit(0); +} /* Constructor for the queue object */ -rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize) +rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, rsRetVal (*pConsumer)(void*)) { DEFiRet; queue_t *pThis; + int i; assert(ppThis != NULL); + assert(pConsumer != NULL); dbgprintf("queueConstruct in \n"); if((pThis = (queue_t *)malloc(sizeof(queue_t))) == NULL) { @@ -125,6 +175,7 @@ dbgprintf("queueConstruct in \n"); /* we have an object, so let's fill the properties */ pThis->iMaxQueueSize = iMaxQueueSize; + pThis->pConsumer = pConsumer; pThis->empty = 1; pThis->full = 0; pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t)); @@ -147,6 +198,11 @@ dbgprintf("queueConstruct in \n"); /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); + + /* now fire up the worker thread */ + pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */ + i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis); + dbgprintf("Worker thread for queue 0x%lx started with state %d.\n", (unsigned long) pThis, i); finalize_it: if(iRet == RS_RET_OK) { @@ -166,6 +222,18 @@ rsRetVal queueDestruct(queue_t *pThis) DEFiRet; assert(pThis != NULL); + + /* first stop the worker thread */ + dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis); + pThis->bDoRun = 0; + /* It's actually not "not empty" below but awaking the worker. The worker + * then finds out that it shall terminate and does so. + */ + pthread_cond_signal(pThis->notEmpty); + pthread_join(pThis->thrdWorker, NULL); + dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis); + + /* ... then free resources */ pthread_mutex_destroy (pThis->mut); free (pThis->mut); pthread_cond_destroy (pThis->notFull); @@ -179,41 +247,45 @@ rsRetVal queueDestruct(queue_t *pThis) } -/* In queueAdd() and queueDel() we have a potential race condition. If a message - * is dequeued and at the same time a message is enqueued and the queue is either - * full or empty, the full (or empty) indicator may be invalidly updated. HOWEVER, - * this does not cause any real problems. No queue pointers can be wrong. And even - * if one of the flags is set invalidly, that does not pose a real problem. If - * "full" is invalidly set, at mose one message might be lost, if we are already in - * a timeout situation (this is quite acceptable). And if "empty" is accidently set, - * the receiver will not continue the inner loop, but break out of the outer. So no - * harm is done at all. For this reason, I do not yet use a mutex to guard the two - * flags - there would be a notable performance hit with, IMHO, no gain in stability - * or functionality. But anyhow, now it's documented... - * rgerhards, 2007-09-20 - * NOTE: this comment does not really apply - the callers handle the mutex, so it - * *is* guarded. +/* enqueue a new user data element + * Enqueues the new element and awakes worker thread. + * TODO: this code still uses the "discard if queue full" approach from + * the main queue. This needs to be reconsidered or, better, done via a + * caller-selectable parameter mode. For the time being, I leave it in. + * rgerhards, 2008-01-03 */ -rsRetVal queueAdd(queue_t *pThis, void* in) +rsRetVal +queueEnqObj(queue_t *pThis, void *pUsr) { DEFiRet; + int i; + struct timespec t; assert(pThis != NULL); - CHKiRet(pThis->qAdd(pThis, in)); -finalize_it: - return iRet; -} -rsRetVal queueDel(queue_t *pThis, void **out) -{ - DEFiRet; + pthread_mutex_lock(pThis->mut); + + while(pThis->full) { + dbgprintf("enqueueMsg: queue 0x%lx FULL.\n", (unsigned long) pThis); + + clock_gettime (CLOCK_REALTIME, &t); + t.tv_sec += 2; /* TODO: configurable! */ + + if(pthread_cond_timedwait (pThis->notFull, + pThis->mut, &t) != 0) { + dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", (unsigned long) pThis); + ABORT_FINALIZE(RS_RET_QUEUE_FULL); + } + } + CHKiRet(pThis->qAdd(pThis, pUsr)); - assert(pThis != NULL); - CHKiRet(pThis->qDel(pThis, out)); finalize_it: + /* now activate the worker thread */ + pthread_mutex_unlock(pThis->mut); + i = pthread_cond_signal(pThis->notEmpty); + dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); return iRet; } - /* * vi:set ai: */ -- cgit