summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--queue.c124
-rw-r--r--queue.h9
-rw-r--r--rsyslog.h1
-rw-r--r--syslogd.c230
4 files changed, 146 insertions, 218 deletions
diff --git a/queue.c b/queue.c
index 2600871a..c5aff7c5 100644
--- a/queue.c
+++ b/queue.c
@@ -28,9 +28,11 @@
#include <stdlib.h>
#include <string.h>
#include <assert.h>
+#include <signal.h>
#include <pthread.h>
#include "rsyslog.h"
+#include "syslogd.h"
#include "queue.h"
/* static data */
@@ -109,14 +111,62 @@ rsRetVal qDelFixedArray(queue_t *pThis, void **out)
}
/* --------------- end type-specific handlers -------------------- */
+/* Each queue has one associated worker (consumer) thread. It will pull
+ * the message from the queue and pass it to a user-defined function.
+ * This function was provided on construction. It MUST be thread-safe.
+ *
+ * Please NOTE:
+ * Having more than one worker requires considerable
+ * additional code review in regard to thread-safety.
+ */
+static void *
+queueWorker(void *arg)
+{
+ queue_t *pThis = (queue_t*) arg;
+ void *pUsr;
+ sigset_t sigSet;
+
+ assert(pThis != NULL);
+
+ sigfillset(&sigSet);
+ pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
+
+ while(pThis->bDoRun || !pThis->empty) {
+ pthread_mutex_lock(pThis->mut);
+ while (pThis->empty && pThis->bDoRun) {
+ dbgprintf("queueWorker: queue 0x%lx EMPTY, waiting for next message.\n", (unsigned long) pThis);
+ pthread_cond_wait (pThis->notEmpty, pThis->mut);
+ }
+ if(!pThis->empty) {
+ /* dequeue element (still protected from mutex) */
+ pThis->qDel(pThis, &pUsr);
+ pthread_mutex_unlock(pThis->mut);
+ pthread_cond_signal (pThis->notFull);
+ /* do actual processing (the lengthy part, runs in parallel) */
+ dbgprintf("Worker for queue 0x%lx is running...\n", (unsigned long) pThis);
+ pThis->pConsumer(pUsr);
+ } else { /* the mutex must be unlocked in any case (important for termination) */
+ pthread_mutex_unlock(pThis->mut);
+ }
+
+ if(Debug && !pThis->bDoRun && !pThis->empty)
+ dbgprintf("Worker 0x%lx does not yet terminate because it still has messages to process.\n",
+ (unsigned long) pThis);
+ }
+
+ dbgprintf("Worker thread for queue 0x%lx terminates.\n", (unsigned long) pThis);
+ pthread_exit(0);
+}
/* Constructor for the queue object */
-rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize)
+rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, rsRetVal (*pConsumer)(void*))
{
DEFiRet;
queue_t *pThis;
+ int i;
assert(ppThis != NULL);
+ assert(pConsumer != NULL);
dbgprintf("queueConstruct in \n");
if((pThis = (queue_t *)malloc(sizeof(queue_t))) == NULL) {
@@ -125,6 +175,7 @@ dbgprintf("queueConstruct in \n");
/* we have an object, so let's fill the properties */
pThis->iMaxQueueSize = iMaxQueueSize;
+ pThis->pConsumer = pConsumer;
pThis->empty = 1;
pThis->full = 0;
pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
@@ -147,6 +198,11 @@ dbgprintf("queueConstruct in \n");
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis));
+
+ /* now fire up the worker thread */
+ pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */
+ i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis);
+ dbgprintf("Worker thread for queue 0x%lx started with state %d.\n", (unsigned long) pThis, i);
finalize_it:
if(iRet == RS_RET_OK) {
@@ -166,6 +222,18 @@ rsRetVal queueDestruct(queue_t *pThis)
DEFiRet;
assert(pThis != NULL);
+
+ /* first stop the worker thread */
+ dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis);
+ pThis->bDoRun = 0;
+ /* It's actually not "not empty" below but awaking the worker. The worker
+ * then finds out that it shall terminate and does so.
+ */
+ pthread_cond_signal(pThis->notEmpty);
+ pthread_join(pThis->thrdWorker, NULL);
+ dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis);
+
+ /* ... then free resources */
pthread_mutex_destroy (pThis->mut);
free (pThis->mut);
pthread_cond_destroy (pThis->notFull);
@@ -179,41 +247,45 @@ rsRetVal queueDestruct(queue_t *pThis)
}
-/* In queueAdd() and queueDel() we have a potential race condition. If a message
- * is dequeued and at the same time a message is enqueued and the queue is either
- * full or empty, the full (or empty) indicator may be invalidly updated. HOWEVER,
- * this does not cause any real problems. No queue pointers can be wrong. And even
- * if one of the flags is set invalidly, that does not pose a real problem. If
- * "full" is invalidly set, at mose one message might be lost, if we are already in
- * a timeout situation (this is quite acceptable). And if "empty" is accidently set,
- * the receiver will not continue the inner loop, but break out of the outer. So no
- * harm is done at all. For this reason, I do not yet use a mutex to guard the two
- * flags - there would be a notable performance hit with, IMHO, no gain in stability
- * or functionality. But anyhow, now it's documented...
- * rgerhards, 2007-09-20
- * NOTE: this comment does not really apply - the callers handle the mutex, so it
- * *is* guarded.
+/* enqueue a new user data element
+ * Enqueues the new element and awakes worker thread.
+ * TODO: this code still uses the "discard if queue full" approach from
+ * the main queue. This needs to be reconsidered or, better, done via a
+ * caller-selectable parameter mode. For the time being, I leave it in.
+ * rgerhards, 2008-01-03
*/
-rsRetVal queueAdd(queue_t *pThis, void* in)
+rsRetVal
+queueEnqObj(queue_t *pThis, void *pUsr)
{
DEFiRet;
+ int i;
+ struct timespec t;
assert(pThis != NULL);
- CHKiRet(pThis->qAdd(pThis, in));
-finalize_it:
- return iRet;
-}
-rsRetVal queueDel(queue_t *pThis, void **out)
-{
- DEFiRet;
+ pthread_mutex_lock(pThis->mut);
+
+ while(pThis->full) {
+ dbgprintf("enqueueMsg: queue 0x%lx FULL.\n", (unsigned long) pThis);
+
+ clock_gettime (CLOCK_REALTIME, &t);
+ t.tv_sec += 2; /* TODO: configurable! */
+
+ if(pthread_cond_timedwait (pThis->notFull,
+ pThis->mut, &t) != 0) {
+ dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", (unsigned long) pThis);
+ ABORT_FINALIZE(RS_RET_QUEUE_FULL);
+ }
+ }
+ CHKiRet(pThis->qAdd(pThis, pUsr));
- assert(pThis != NULL);
- CHKiRet(pThis->qDel(pThis, out));
finalize_it:
+ /* now activate the worker thread */
+ pthread_mutex_unlock(pThis->mut);
+ i = pthread_cond_signal(pThis->notEmpty);
+ dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);
return iRet;
}
-
/*
* vi:set ai:
*/
diff --git a/queue.h b/queue.h
index 86499306..0021dd08 100644
--- a/queue.h
+++ b/queue.h
@@ -23,6 +23,7 @@
#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED
+#include <pthread.h>
/* queue types */
typedef enum {
@@ -34,6 +35,9 @@ typedef enum {
typedef struct queue_s {
queueType_t qType;
int iMaxQueueSize; /* how large can the queue grow? */
+ pthread_t thrdWorker; /* ID of the worker thread associated with this queue */
+ int bDoRun; /* 1 - run queue, 0 - shutdown of queue requested */
+ rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
rsRetVal (*qDestruct)(struct queue_s *pThis);
@@ -55,9 +59,8 @@ typedef struct queue_s {
/* prototypes */
-rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize);
+rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, rsRetVal (*pConsumer)(void*));
rsRetVal queueDestruct(queue_t *pThis);
-rsRetVal queueAdd(queue_t *pThis, void* in);
-rsRetVal queueDel(queue_t *pThis, void **out);
+rsRetVal queueEnqObj(queue_t *pThis, void *pUsr);
#endif /* #ifndef QUEUE_H_INCLUDED */
diff --git a/rsyslog.h b/rsyslog.h
index 20bbf1ef..4404a30b 100644
--- a/rsyslog.h
+++ b/rsyslog.h
@@ -93,6 +93,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_GSS_SEND_ERROR = -2024, /**< error during GSS (via TCP) send process */
RS_RET_TCP_SOCKCREATE_ERR = -2025, /**< error during creation of TCP socket */
RS_RET_GSS_SENDINIT_ERROR = -2024, /**< error during GSS (via TCP) send initialization process */
+ RS_RET_QUEUE_FULL = -2025, /**< queue is full, operation could not be completed */
RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */
RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */
RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */
diff --git a/syslogd.c b/syslogd.c
index e92ef1e0..04b30e0c 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -7,33 +7,13 @@
*
* to learn more about it and discuss any questions you may have.
*
- * Please note that as of now, a lot of the code in this file stems
- * from the sysklogd project. To learn more over this project, please
- * visit
- *
- * http://www.infodrom.org/projects/sysklogd/
- *
+ * rsyslog had initially been forked from the sysklogd project.
* I would like to express my thanks to the developers of the sysklogd
* package - without it, I would have had a much harder start...
*
- * Please note that I made quite some changes to the orignal package.
- * I expect to do even more changes - up
- * to a full rewrite - to meet my design goals, which among others
- * contain a (at least) dual-thread design with a memory buffer for
- * storing received bursts of data. This is also the reason why I
- * kind of "forked" a completely new branch of the package. My intension
- * is to do many changes and only this initial release will look
- * similar to sysklogd (well, one never knows...).
- *
- * As I have made a lot of modifications, please assume that all bugs
- * in this package are mine and not those of the sysklogd team.
- *
- * As of this writing, there already exist heavy
- * modifications to the orginal sysklogd package. I suggest to no
- * longer rely too much on code knowledge you eventually have with
- * sysklogd - rgerhards 2005-07-05
- * The code is now almost completely different. Be careful!
- * rgerhards, 2006-11-30
+ * As of this writing (2008-01-03), there have been numerous changes to
+ * the original package. Be very careful when you apply some of your
+ * sysklogd knowledge to rsyslog.
*
* This Project was intiated and is maintained by
* Rainer Gerhards <rgerhards@hq.adiscon.com>. See
@@ -56,7 +36,7 @@
* to the database).
*
* rsyslog - An Enhanced syslogd Replacement.
- * Copyright 2003-2007 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2003-2008 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -145,6 +125,7 @@
#include <stdarg.h>
#include <time.h>
#include <dlfcn.h>
+#include <assert.h>
#include <sys/syslog.h>
#include <sys/param.h>
@@ -155,7 +136,6 @@
#endif
#include <sys/ioctl.h>
#include <sys/wait.h>
-#include <sys/socket.h>
#include <sys/file.h>
#include <sys/time.h>
@@ -166,19 +146,10 @@
#include <sys/resource.h>
#include <signal.h>
-#include <netinet/in.h>
-#include <netdb.h>
#include <dirent.h>
#include <glob.h>
#include <sys/types.h>
-#include <arpa/nameser.h>
-#include <arpa/inet.h>
-#include <resolv.h>
-#include "pidfile.h"
-#include <assert.h>
-#include <pthread.h>
-
#if HAVE_PATHS_H
#include <paths.h>
#endif
@@ -187,13 +158,13 @@
#include <zlib.h>
#endif
+#include "pidfile.h"
#include "srUtils.h"
#include "stringbuf.h"
#include "syslogd-types.h"
#include "template.h"
#include "outchannel.h"
#include "syslogd.h"
-#include "sync.h" /* struct NetAddr */
#include "parse.h"
#include "msg.h"
@@ -319,12 +290,6 @@ static EHostnameCmpMode eDfltHostnameCmpMode;
static rsCStrObj *pDfltHostnameCmp;
static rsCStrObj *pDfltProgNameCmp;
-/* supporting structures for multithreading */
-int bRunningMultithreaded = 0; /* Is this program running in multithreaded mode? */
-static pthread_t thrdWorker;
-static int bGlblDone = 0;
-/* END supporting structures for multithreading */
-
static int bParseHOSTNAMEandTAG = 1; /* global config var: should the hostname and tag be
* parsed inside message - rgerhards, 2006-03-13 */
static int bFinished = 0; /* used by termination signal handler, read-only except there
@@ -423,7 +388,6 @@ static int logEveryMsg = 0;/* no repeat message processing - read-only after st
static unsigned int Forwarding = 0;
char LocalHostName[MAXHOSTNAMELEN+1];/* our hostname - read-only after startup */
char *LocalDomain; /* our local domain name - read-only after startup */
-//char *LogPort = "514"; /* port number for INET connections */
static int MarkInterval = 20 * 60; /* interval between marks in seconds - read-only after startup */
int family = PF_UNSPEC; /* protocol family (IPv4, IPv6 or both), set via cmdline */
int send_to_all = 0; /* send message to all IPv4/IPv6 addresses */
@@ -562,7 +526,6 @@ static uchar template_StdPgSQLFmt[] = "\"insert into SystemEvents (Message, Faci
/* up to the next comment, prototypes that should be removed by reordering */
-static void *singleWorker(); /* REMOVEME later 2005-10-24 */
/* Function prototypes. */
static char **crunch_list(char *list);
static void printline(char *hname, char *msg, int iSource);
@@ -1555,7 +1518,7 @@ logmsgInternal(int pri, char *msg, int flags)
getCurrTime(&(pMsg->tTIMESTAMP)); /* use the current time! */
flags |= INTERNAL_MSG;
- if(bRunningMultithreaded == 0) { /* not yet in queued mode */
+ if(Initialized == 0) { /* not yet in queued mode */
iminternalAddMsg(pri, pMsg, flags);
} else {
/* we have the queue, so we can simply provide the
@@ -1877,103 +1840,26 @@ static void processMsg(msg_t *pMsg)
}
-/* Start Threading-Related code */
-
-/* shuts down the worker process. The worker will first finish
- * with the message queue. Control returns, when done.
- * This function is intended to be called during syslogd shutdown
- * AND restart (init()!).
- * rgerhards, 2005-10-25
- */
-static void stopWorker(void)
-{
- if(bRunningMultithreaded) {
- /* we could run single-threaded if there was an error
- * during startup. Then, we obviously do not need to
- * do anything to stop the worker ;)
- */
- dbgprintf("Initiating worker thread shutdown sequence...\n");
- /* We are now done with all messages, so we need to wake up the
- * worker thread and then wait for it to finish.
- */
- bGlblDone = 1;
- /* It's actually not "not empty" below but awaking the worker. The worker
- * then finds out that it shall terminate and does so.
- */
- pthread_cond_signal(pMsgQueue->notEmpty);
- pthread_join(thrdWorker, NULL);
- bRunningMultithreaded = 0;
- dbgprintf("Worker thread terminated.\n");
- }
-}
-
-
-/* starts the worker thread. It must be made sure that the queue is
- * already existing and the worker is NOT already running.
- * rgerhards 2005-10-25
- */
-static void startWorker(void)
-{
- int i;
- if(pMsgQueue != NULL) {
- bGlblDone = 0; /* we are NOT done (else worker would immediately terminate) */
- i = pthread_create(&thrdWorker, NULL, singleWorker, NULL);
- dbgprintf("Worker thread started with state %d.\n", i);
- bRunningMultithreaded = 1;
- } else {
- dbgprintf("message queue not existing, remaining single-threaded.\n");
- }
-}
-
-
-/* The worker thread (so far, we have dual-threading, so only one
- * worker thread. Having more than one worker requires considerable
- * additional code review in regard to thread-safety.
+/* The consumer of dequeued messages. This function is called by the
+ * queue engine on dequeueing of a message. It runs on a SEPARATE
+ * THREAD.
+ * NOTE: Having more than one worker requires guarding of some
+ * message object structures and potentially others - need to be checked
+ * before we support multiple worker threads on the message queue.
*/
-static void *
-singleWorker()
+static rsRetVal
+msgConsumer(void *pUsr)
{
- queue_t *fifo = pMsgQueue;
- msg_t *pMsg;
- sigset_t sigSet;
+ msg_t *pMsg = (msg_t*) pUsr;
- assert(fifo != NULL);
-
- sigfillset(&sigSet);
- pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
+ assert(pMsg != NULL);
- while(!bGlblDone || !fifo->empty) {
- pthread_mutex_lock(fifo->mut);
- while (fifo->empty && !bGlblDone) {
- dbgprintf("singleWorker: queue EMPTY, waiting for next message.\n");
- pthread_cond_wait (fifo->notEmpty, fifo->mut);
- }
- if(!fifo->empty) {
- /* dequeue element (still protected from mutex) */
- queueDel(fifo, (void*) &pMsg);
- assert(pMsg != NULL);
- pthread_mutex_unlock(fifo->mut);
- pthread_cond_signal (fifo->notFull);
- /* do actual processing (the lengthy part, runs in parallel) */
- dbgprintf("Lone worker is running...\n");
- processMsg(pMsg);
- MsgDestruct(pMsg);
- /* If you need a delay for testing, here do a */
- /* sleep(1); */
- } else { /* the mutex must be unlocked in any case (important for termination) */
- pthread_mutex_unlock(fifo->mut);
- }
-
- if(debugging_on && bGlblDone && !fifo->empty)
- dbgprintf("Worker does not yet terminate because it still has messages to process.\n");
- }
+ processMsg(pMsg);
+ MsgDestruct(pMsg);
- dbgprintf("Worker thread terminates\n");
- pthread_exit(0);
+ return RS_RET_OK;
}
-/* END threads-related code */
-
/* This method enqueues a message into the the message buffer. It also
* the worker thread, so that the message will be processed.
@@ -1982,41 +1868,23 @@ singleWorker()
*/
static void enqueueMsg(msg_t *pMsg)
{
- int iRet;
- queue_t *fifo = pMsgQueue;
- struct timespec t;
+ DEFiRet;
assert(pMsg != NULL);
- if(bRunningMultithreaded == 0) {
- /* multi-threading is not yet initialized, happens e.g.
+ if(Initialized == 0) {
+ /* queue is not yet initialized, happens e.g.
* during startup and restart. rgerhards, 2005-10-25
+ * TODO: check if that really still can happen! rgerhards, 2008-01-03
*/
dbgprintf("enqueueMsg: not yet running on multiple threads\n");
processMsg(pMsg);
} else {
- /* "normal" mode, threading initialized */
- pthread_mutex_lock(fifo->mut);
-
- while (fifo->full) {
- dbgprintf("enqueueMsg: queue FULL.\n");
-
- clock_gettime (CLOCK_REALTIME, &t);
- t.tv_sec += 2;
-
- if(pthread_cond_timedwait (fifo->notFull,
- fifo->mut, &t) != 0) {
- dbgprintf("enqueueMsg: cond timeout, dropping message!\n");
- MsgDestruct(pMsg);
- goto unlock;
- }
+ /* "normal" mode, queue initialized */
+ CHKiRet_Hdlr(queueEnqObj(pMsgQueue, (void*) pMsg)) {
+ /* if we have an error return, the pMsg was not destructed */
+ MsgDestruct(pMsg);
}
- queueAdd(fifo, pMsg);
- unlock:
- /* now activate the worker thread */
- pthread_mutex_unlock(fifo->mut);
- iRet = pthread_cond_signal(fifo->notEmpty);
- dbgprintf("EnqueueMsg signaled condition (%d)\n", iRet);
}
}
@@ -2766,13 +2634,13 @@ die(int sig)
/* close the inputs */
thrdTerminateAll(); /* TODO: inputs only, please */
+ /* drain queue and stop worker thread */
+ queueDestruct(pMsgQueue);
+ pMsgQueue = NULL;
+
/* Free ressources and close connections */
freeSelectors();
- /* Worker threads are stopped by freeSelectors() */
- queueDestruct(pMsgQueue); /* delete fifo here! */
- pMsgQueue = NULL;
-
/* rger 2005-02-22
* now clean up the in-memory structures. OK, the OS
* would also take care of that, but if we do it
@@ -3177,21 +3045,6 @@ static void freeSelectors(void)
if(Files != NULL) {
dbgprintf("Freeing log structures.\n");
- /* just in case, we flush the emergency log. If error messages occur after
- * this stage, we loose them, but that's ok. With multi-threading, this can
- * never happen. -- rgerhards, 2007-08-03
- */
- processImInternal();
-
- /* we first wait until all messages are processed (stopWorker() does
- * that. Then, we go one last time over all actions and flush any
- * pending "message repeated n times" messages. We must use this sequence
- * because otherwise we would flush at whatever message is currently being
- * processed without draining the queue. That would lead to invalid
- * results. -- rgerhards, 2007-12-12
- */
- stopWorker();
-
for(f = Files ; f != NULL ; f = f->f_next) {
llExecFunc(&f->llActList, freeSelectorsActions, NULL);
}
@@ -3448,6 +3301,13 @@ init(void)
dbgprintf("rsyslog %s.\n", VERSION);
dbgprintf("Called init.\n");
+ /* delete the message queue, which also flushes all messages left over */
+ if(pMsgQueue != NULL) {
+ dbgprintf("deleting main message queue\n");
+ queueDestruct(pMsgQueue); /* delete pThis here! */
+ pMsgQueue = NULL;
+ }
+
/* Close all open log files and free log descriptor array. This also frees
* all output-modules instance data.
*/
@@ -3460,12 +3320,6 @@ init(void)
dbgprintf("Clearing templates.\n");
tplDeleteNew();
- if(pMsgQueue != NULL) {
- dbgprintf("deleting message queue\n");
- queueDestruct(pMsgQueue); /* delete fifo here! */
- pMsgQueue = NULL;
- }
-
/* re-setting values to defaults (where applicable) */
/* TODO: once we have loadable modules, we must re-visit this code. The reason is
* that config variables are not re-set, because the module is not yet loaded. On
@@ -3510,14 +3364,12 @@ init(void)
}
/* create message queue */
- CHKiRet_Hdlr(queueConstruct(&pMsgQueue, QUEUETYPE_FIXED_ARRAY, iMainMsgQueueSize)) {
+ CHKiRet_Hdlr(queueConstruct(&pMsgQueue, QUEUETYPE_FIXED_ARRAY, iMainMsgQueueSize, msgConsumer)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
exit(1);
}
- startWorker();
-
Initialized = 1;
/* the output part and the queue is now ready to run. So it is a good time