summaryrefslogtreecommitdiffstats
path: root/syslogd.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-03 13:28:45 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-03 13:28:45 +0000
commit8951958ff6d8241df9ffe048e2c0d0766a9d383b (patch)
treef667f9ec334d959d8afbf068553e7a1cf1bdcdde /syslogd.c
parent9e67ae041d964748755e5c9c45ebe55ff612391e (diff)
downloadrsyslog-8951958ff6d8241df9ffe048e2c0d0766a9d383b.tar.gz
rsyslog-8951958ff6d8241df9ffe048e2c0d0766a9d383b.tar.xz
rsyslog-8951958ff6d8241df9ffe048e2c0d0766a9d383b.zip
queue is now a full object and handles threading by itself
Diffstat (limited to 'syslogd.c')
-rw-r--r--syslogd.c230
1 files changed, 41 insertions, 189 deletions
diff --git a/syslogd.c b/syslogd.c
index e92ef1e0..04b30e0c 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -7,33 +7,13 @@
*
* to learn more about it and discuss any questions you may have.
*
- * Please note that as of now, a lot of the code in this file stems
- * from the sysklogd project. To learn more over this project, please
- * visit
- *
- * http://www.infodrom.org/projects/sysklogd/
- *
+ * rsyslog had initially been forked from the sysklogd project.
* I would like to express my thanks to the developers of the sysklogd
* package - without it, I would have had a much harder start...
*
- * Please note that I made quite some changes to the orignal package.
- * I expect to do even more changes - up
- * to a full rewrite - to meet my design goals, which among others
- * contain a (at least) dual-thread design with a memory buffer for
- * storing received bursts of data. This is also the reason why I
- * kind of "forked" a completely new branch of the package. My intension
- * is to do many changes and only this initial release will look
- * similar to sysklogd (well, one never knows...).
- *
- * As I have made a lot of modifications, please assume that all bugs
- * in this package are mine and not those of the sysklogd team.
- *
- * As of this writing, there already exist heavy
- * modifications to the orginal sysklogd package. I suggest to no
- * longer rely too much on code knowledge you eventually have with
- * sysklogd - rgerhards 2005-07-05
- * The code is now almost completely different. Be careful!
- * rgerhards, 2006-11-30
+ * As of this writing (2008-01-03), there have been numerous changes to
+ * the original package. Be very careful when you apply some of your
+ * sysklogd knowledge to rsyslog.
*
* This Project was intiated and is maintained by
* Rainer Gerhards <rgerhards@hq.adiscon.com>. See
@@ -56,7 +36,7 @@
* to the database).
*
* rsyslog - An Enhanced syslogd Replacement.
- * Copyright 2003-2007 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2003-2008 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -145,6 +125,7 @@
#include <stdarg.h>
#include <time.h>
#include <dlfcn.h>
+#include <assert.h>
#include <sys/syslog.h>
#include <sys/param.h>
@@ -155,7 +136,6 @@
#endif
#include <sys/ioctl.h>
#include <sys/wait.h>
-#include <sys/socket.h>
#include <sys/file.h>
#include <sys/time.h>
@@ -166,19 +146,10 @@
#include <sys/resource.h>
#include <signal.h>
-#include <netinet/in.h>
-#include <netdb.h>
#include <dirent.h>
#include <glob.h>
#include <sys/types.h>
-#include <arpa/nameser.h>
-#include <arpa/inet.h>
-#include <resolv.h>
-#include "pidfile.h"
-#include <assert.h>
-#include <pthread.h>
-
#if HAVE_PATHS_H
#include <paths.h>
#endif
@@ -187,13 +158,13 @@
#include <zlib.h>
#endif
+#include "pidfile.h"
#include "srUtils.h"
#include "stringbuf.h"
#include "syslogd-types.h"
#include "template.h"
#include "outchannel.h"
#include "syslogd.h"
-#include "sync.h" /* struct NetAddr */
#include "parse.h"
#include "msg.h"
@@ -319,12 +290,6 @@ static EHostnameCmpMode eDfltHostnameCmpMode;
static rsCStrObj *pDfltHostnameCmp;
static rsCStrObj *pDfltProgNameCmp;
-/* supporting structures for multithreading */
-int bRunningMultithreaded = 0; /* Is this program running in multithreaded mode? */
-static pthread_t thrdWorker;
-static int bGlblDone = 0;
-/* END supporting structures for multithreading */
-
static int bParseHOSTNAMEandTAG = 1; /* global config var: should the hostname and tag be
* parsed inside message - rgerhards, 2006-03-13 */
static int bFinished = 0; /* used by termination signal handler, read-only except there
@@ -423,7 +388,6 @@ static int logEveryMsg = 0;/* no repeat message processing - read-only after st
static unsigned int Forwarding = 0;
char LocalHostName[MAXHOSTNAMELEN+1];/* our hostname - read-only after startup */
char *LocalDomain; /* our local domain name - read-only after startup */
-//char *LogPort = "514"; /* port number for INET connections */
static int MarkInterval = 20 * 60; /* interval between marks in seconds - read-only after startup */
int family = PF_UNSPEC; /* protocol family (IPv4, IPv6 or both), set via cmdline */
int send_to_all = 0; /* send message to all IPv4/IPv6 addresses */
@@ -562,7 +526,6 @@ static uchar template_StdPgSQLFmt[] = "\"insert into SystemEvents (Message, Faci
/* up to the next comment, prototypes that should be removed by reordering */
-static void *singleWorker(); /* REMOVEME later 2005-10-24 */
/* Function prototypes. */
static char **crunch_list(char *list);
static void printline(char *hname, char *msg, int iSource);
@@ -1555,7 +1518,7 @@ logmsgInternal(int pri, char *msg, int flags)
getCurrTime(&(pMsg->tTIMESTAMP)); /* use the current time! */
flags |= INTERNAL_MSG;
- if(bRunningMultithreaded == 0) { /* not yet in queued mode */
+ if(Initialized == 0) { /* not yet in queued mode */
iminternalAddMsg(pri, pMsg, flags);
} else {
/* we have the queue, so we can simply provide the
@@ -1877,103 +1840,26 @@ static void processMsg(msg_t *pMsg)
}
-/* Start Threading-Related code */
-
-/* shuts down the worker process. The worker will first finish
- * with the message queue. Control returns, when done.
- * This function is intended to be called during syslogd shutdown
- * AND restart (init()!).
- * rgerhards, 2005-10-25
- */
-static void stopWorker(void)
-{
- if(bRunningMultithreaded) {
- /* we could run single-threaded if there was an error
- * during startup. Then, we obviously do not need to
- * do anything to stop the worker ;)
- */
- dbgprintf("Initiating worker thread shutdown sequence...\n");
- /* We are now done with all messages, so we need to wake up the
- * worker thread and then wait for it to finish.
- */
- bGlblDone = 1;
- /* It's actually not "not empty" below but awaking the worker. The worker
- * then finds out that it shall terminate and does so.
- */
- pthread_cond_signal(pMsgQueue->notEmpty);
- pthread_join(thrdWorker, NULL);
- bRunningMultithreaded = 0;
- dbgprintf("Worker thread terminated.\n");
- }
-}
-
-
-/* starts the worker thread. It must be made sure that the queue is
- * already existing and the worker is NOT already running.
- * rgerhards 2005-10-25
- */
-static void startWorker(void)
-{
- int i;
- if(pMsgQueue != NULL) {
- bGlblDone = 0; /* we are NOT done (else worker would immediately terminate) */
- i = pthread_create(&thrdWorker, NULL, singleWorker, NULL);
- dbgprintf("Worker thread started with state %d.\n", i);
- bRunningMultithreaded = 1;
- } else {
- dbgprintf("message queue not existing, remaining single-threaded.\n");
- }
-}
-
-
-/* The worker thread (so far, we have dual-threading, so only one
- * worker thread. Having more than one worker requires considerable
- * additional code review in regard to thread-safety.
+/* The consumer of dequeued messages. This function is called by the
+ * queue engine on dequeueing of a message. It runs on a SEPARATE
+ * THREAD.
+ * NOTE: Having more than one worker requires guarding of some
+ * message object structures and potentially others - need to be checked
+ * before we support multiple worker threads on the message queue.
*/
-static void *
-singleWorker()
+static rsRetVal
+msgConsumer(void *pUsr)
{
- queue_t *fifo = pMsgQueue;
- msg_t *pMsg;
- sigset_t sigSet;
+ msg_t *pMsg = (msg_t*) pUsr;
- assert(fifo != NULL);
-
- sigfillset(&sigSet);
- pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
+ assert(pMsg != NULL);
- while(!bGlblDone || !fifo->empty) {
- pthread_mutex_lock(fifo->mut);
- while (fifo->empty && !bGlblDone) {
- dbgprintf("singleWorker: queue EMPTY, waiting for next message.\n");
- pthread_cond_wait (fifo->notEmpty, fifo->mut);
- }
- if(!fifo->empty) {
- /* dequeue element (still protected from mutex) */
- queueDel(fifo, (void*) &pMsg);
- assert(pMsg != NULL);
- pthread_mutex_unlock(fifo->mut);
- pthread_cond_signal (fifo->notFull);
- /* do actual processing (the lengthy part, runs in parallel) */
- dbgprintf("Lone worker is running...\n");
- processMsg(pMsg);
- MsgDestruct(pMsg);
- /* If you need a delay for testing, here do a */
- /* sleep(1); */
- } else { /* the mutex must be unlocked in any case (important for termination) */
- pthread_mutex_unlock(fifo->mut);
- }
-
- if(debugging_on && bGlblDone && !fifo->empty)
- dbgprintf("Worker does not yet terminate because it still has messages to process.\n");
- }
+ processMsg(pMsg);
+ MsgDestruct(pMsg);
- dbgprintf("Worker thread terminates\n");
- pthread_exit(0);
+ return RS_RET_OK;
}
-/* END threads-related code */
-
/* This method enqueues a message into the the message buffer. It also
* the worker thread, so that the message will be processed.
@@ -1982,41 +1868,23 @@ singleWorker()
*/
static void enqueueMsg(msg_t *pMsg)
{
- int iRet;
- queue_t *fifo = pMsgQueue;
- struct timespec t;
+ DEFiRet;
assert(pMsg != NULL);
- if(bRunningMultithreaded == 0) {
- /* multi-threading is not yet initialized, happens e.g.
+ if(Initialized == 0) {
+ /* queue is not yet initialized, happens e.g.
* during startup and restart. rgerhards, 2005-10-25
+ * TODO: check if that really still can happen! rgerhards, 2008-01-03
*/
dbgprintf("enqueueMsg: not yet running on multiple threads\n");
processMsg(pMsg);
} else {
- /* "normal" mode, threading initialized */
- pthread_mutex_lock(fifo->mut);
-
- while (fifo->full) {
- dbgprintf("enqueueMsg: queue FULL.\n");
-
- clock_gettime (CLOCK_REALTIME, &t);
- t.tv_sec += 2;
-
- if(pthread_cond_timedwait (fifo->notFull,
- fifo->mut, &t) != 0) {
- dbgprintf("enqueueMsg: cond timeout, dropping message!\n");
- MsgDestruct(pMsg);
- goto unlock;
- }
+ /* "normal" mode, queue initialized */
+ CHKiRet_Hdlr(queueEnqObj(pMsgQueue, (void*) pMsg)) {
+ /* if we have an error return, the pMsg was not destructed */
+ MsgDestruct(pMsg);
}
- queueAdd(fifo, pMsg);
- unlock:
- /* now activate the worker thread */
- pthread_mutex_unlock(fifo->mut);
- iRet = pthread_cond_signal(fifo->notEmpty);
- dbgprintf("EnqueueMsg signaled condition (%d)\n", iRet);
}
}
@@ -2766,13 +2634,13 @@ die(int sig)
/* close the inputs */
thrdTerminateAll(); /* TODO: inputs only, please */
+ /* drain queue and stop worker thread */
+ queueDestruct(pMsgQueue);
+ pMsgQueue = NULL;
+
/* Free ressources and close connections */
freeSelectors();
- /* Worker threads are stopped by freeSelectors() */
- queueDestruct(pMsgQueue); /* delete fifo here! */
- pMsgQueue = NULL;
-
/* rger 2005-02-22
* now clean up the in-memory structures. OK, the OS
* would also take care of that, but if we do it
@@ -3177,21 +3045,6 @@ static void freeSelectors(void)
if(Files != NULL) {
dbgprintf("Freeing log structures.\n");
- /* just in case, we flush the emergency log. If error messages occur after
- * this stage, we loose them, but that's ok. With multi-threading, this can
- * never happen. -- rgerhards, 2007-08-03
- */
- processImInternal();
-
- /* we first wait until all messages are processed (stopWorker() does
- * that. Then, we go one last time over all actions and flush any
- * pending "message repeated n times" messages. We must use this sequence
- * because otherwise we would flush at whatever message is currently being
- * processed without draining the queue. That would lead to invalid
- * results. -- rgerhards, 2007-12-12
- */
- stopWorker();
-
for(f = Files ; f != NULL ; f = f->f_next) {
llExecFunc(&f->llActList, freeSelectorsActions, NULL);
}
@@ -3448,6 +3301,13 @@ init(void)
dbgprintf("rsyslog %s.\n", VERSION);
dbgprintf("Called init.\n");
+ /* delete the message queue, which also flushes all messages left over */
+ if(pMsgQueue != NULL) {
+ dbgprintf("deleting main message queue\n");
+ queueDestruct(pMsgQueue); /* delete pThis here! */
+ pMsgQueue = NULL;
+ }
+
/* Close all open log files and free log descriptor array. This also frees
* all output-modules instance data.
*/
@@ -3460,12 +3320,6 @@ init(void)
dbgprintf("Clearing templates.\n");
tplDeleteNew();
- if(pMsgQueue != NULL) {
- dbgprintf("deleting message queue\n");
- queueDestruct(pMsgQueue); /* delete fifo here! */
- pMsgQueue = NULL;
- }
-
/* re-setting values to defaults (where applicable) */
/* TODO: once we have loadable modules, we must re-visit this code. The reason is
* that config variables are not re-set, because the module is not yet loaded. On
@@ -3510,14 +3364,12 @@ init(void)
}
/* create message queue */
- CHKiRet_Hdlr(queueConstruct(&pMsgQueue, QUEUETYPE_FIXED_ARRAY, iMainMsgQueueSize)) {
+ CHKiRet_Hdlr(queueConstruct(&pMsgQueue, QUEUETYPE_FIXED_ARRAY, iMainMsgQueueSize, msgConsumer)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
exit(1);
}
- startWorker();
-
Initialized = 1;
/* the output part and the queue is now ready to run. So it is a good time