summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-03 13:28:45 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-03 13:28:45 +0000
commit8951958ff6d8241df9ffe048e2c0d0766a9d383b (patch)
treef667f9ec334d959d8afbf068553e7a1cf1bdcdde /queue.c
parent9e67ae041d964748755e5c9c45ebe55ff612391e (diff)
downloadrsyslog-8951958ff6d8241df9ffe048e2c0d0766a9d383b.tar.gz
rsyslog-8951958ff6d8241df9ffe048e2c0d0766a9d383b.tar.xz
rsyslog-8951958ff6d8241df9ffe048e2c0d0766a9d383b.zip
queue is now a full object and handles threading by itself
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c124
1 files changed, 98 insertions, 26 deletions
diff --git a/queue.c b/queue.c
index 2600871a..c5aff7c5 100644
--- a/queue.c
+++ b/queue.c
@@ -28,9 +28,11 @@
#include <stdlib.h>
#include <string.h>
#include <assert.h>
+#include <signal.h>
#include <pthread.h>
#include "rsyslog.h"
+#include "syslogd.h"
#include "queue.h"
/* static data */
@@ -109,14 +111,62 @@ rsRetVal qDelFixedArray(queue_t *pThis, void **out)
}
/* --------------- end type-specific handlers -------------------- */
+/* Each queue has one associated worker (consumer) thread. It will pull
+ * the message from the queue and pass it to a user-defined function.
+ * This function was provided on construction. It MUST be thread-safe.
+ *
+ * Please NOTE:
+ * Having more than one worker requires considerable
+ * additional code review in regard to thread-safety.
+ */
+static void *
+queueWorker(void *arg)
+{
+ queue_t *pThis = (queue_t*) arg;
+ void *pUsr;
+ sigset_t sigSet;
+
+ assert(pThis != NULL);
+
+ sigfillset(&sigSet);
+ pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
+
+ while(pThis->bDoRun || !pThis->empty) {
+ pthread_mutex_lock(pThis->mut);
+ while (pThis->empty && pThis->bDoRun) {
+ dbgprintf("queueWorker: queue 0x%lx EMPTY, waiting for next message.\n", (unsigned long) pThis);
+ pthread_cond_wait (pThis->notEmpty, pThis->mut);
+ }
+ if(!pThis->empty) {
+ /* dequeue element (still protected from mutex) */
+ pThis->qDel(pThis, &pUsr);
+ pthread_mutex_unlock(pThis->mut);
+ pthread_cond_signal (pThis->notFull);
+ /* do actual processing (the lengthy part, runs in parallel) */
+ dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis);
+ pThis->pConsumer(pUsr);
+ } else { /* the mutex must be unlocked in any case (important for termination) */
+ pthread_mutex_unlock(pThis->mut);
+ }
+
+ if(Debug && !pThis->bDoRun && !pThis->empty)
+ dbgprintf("Worker 0x%lx does not yet terminate because it still has messages to process.\n",
+ (unsigned long) pThis);
+ }
+
+ dbgprintf("Worker thread for queue 0x%lx terminates.\n", (unsigned long) pThis);
+ pthread_exit(0);
+}
/* Constructor for the queue object */
-rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize)
+rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, rsRetVal (*pConsumer)(void*))
{
DEFiRet;
queue_t *pThis;
+ int i;
assert(ppThis != NULL);
+ assert(pConsumer != NULL);
dbgprintf("queueConstruct in \n");
if((pThis = (queue_t *)malloc(sizeof(queue_t))) == NULL) {
@@ -125,6 +175,7 @@ dbgprintf("queueConstruct in \n");
/* we have an object, so let's fill the properties */
pThis->iMaxQueueSize = iMaxQueueSize;
+ pThis->pConsumer = pConsumer;
pThis->empty = 1;
pThis->full = 0;
pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
@@ -147,6 +198,11 @@ dbgprintf("queueConstruct in \n");
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis));
+
+ /* now fire up the worker thread */
+ pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */
+ i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis);
+ dbgprintf("Worker thread for queue 0x%lx started with state %d.\n", (unsigned long) pThis, i);
finalize_it:
if(iRet == RS_RET_OK) {
@@ -166,6 +222,18 @@ rsRetVal queueDestruct(queue_t *pThis)
DEFiRet;
assert(pThis != NULL);
+
+ /* first stop the worker thread */
+ dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis);
+ pThis->bDoRun = 0;
+ /* It's actually not "not empty" below but awaking the worker. The worker
+ * then finds out that it shall terminate and does so.
+ */
+ pthread_cond_signal(pThis->notEmpty);
+ pthread_join(pThis->thrdWorker, NULL);
+ dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis);
+
+ /* ... then free resources */
pthread_mutex_destroy (pThis->mut);
free (pThis->mut);
pthread_cond_destroy (pThis->notFull);
@@ -179,41 +247,45 @@ rsRetVal queueDestruct(queue_t *pThis)
}
-/* In queueAdd() and queueDel() we have a potential race condition. If a message
- * is dequeued and at the same time a message is enqueued and the queue is either
- * full or empty, the full (or empty) indicator may be invalidly updated. HOWEVER,
- * this does not cause any real problems. No queue pointers can be wrong. And even
- * if one of the flags is set invalidly, that does not pose a real problem. If
- * "full" is invalidly set, at mose one message might be lost, if we are already in
- * a timeout situation (this is quite acceptable). And if "empty" is accidently set,
- * the receiver will not continue the inner loop, but break out of the outer. So no
- * harm is done at all. For this reason, I do not yet use a mutex to guard the two
- * flags - there would be a notable performance hit with, IMHO, no gain in stability
- * or functionality. But anyhow, now it's documented...
- * rgerhards, 2007-09-20
- * NOTE: this comment does not really apply - the callers handle the mutex, so it
- * *is* guarded.
+/* enqueue a new user data element
+ * Enqueues the new element and awakes worker thread.
+ * TODO: this code still uses the "discard if queue full" approach from
+ * the main queue. This needs to be reconsidered or, better, done via a
+ * caller-selectable parameter mode. For the time being, I leave it in.
+ * rgerhards, 2008-01-03
*/
-rsRetVal queueAdd(queue_t *pThis, void* in)
+rsRetVal
+queueEnqObj(queue_t *pThis, void *pUsr)
{
DEFiRet;
+ int i;
+ struct timespec t;
assert(pThis != NULL);
- CHKiRet(pThis->qAdd(pThis, in));
-finalize_it:
- return iRet;
-}
-rsRetVal queueDel(queue_t *pThis, void **out)
-{
- DEFiRet;
+ pthread_mutex_lock(pThis->mut);
+
+ while(pThis->full) {
+ dbgprintf("enqueueMsg: queue 0x%lx FULL.\n", (unsigned long) pThis);
+
+ clock_gettime (CLOCK_REALTIME, &t);
+ t.tv_sec += 2; /* TODO: configurable! */
+
+ if(pthread_cond_timedwait (pThis->notFull,
+ pThis->mut, &t) != 0) {
+ dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", (unsigned long) pThis);
+ ABORT_FINALIZE(RS_RET_QUEUE_FULL);
+ }
+ }
+ CHKiRet(pThis->qAdd(pThis, pUsr));
- assert(pThis != NULL);
- CHKiRet(pThis->qDel(pThis, out));
finalize_it:
+ /* now activate the worker thread */
+ pthread_mutex_unlock(pThis->mut);
+ i = pthread_cond_signal(pThis->notEmpty);
+ dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);
return iRet;
}
-
/*
* vi:set ai:
*/