summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2007-12-14 11:55:35 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2007-12-14 11:55:35 +0000
commit0c5e8a2f96dd22534c0b7189ff5e75519be03b82 (patch)
treea96f0e832270068dc3f81d657e4fda76014f4d76
parent004229dda605fa93fa04f7c94bff69517241a885 (diff)
downloadrsyslog-0c5e8a2f96dd22534c0b7189ff5e75519be03b82.tar.gz
rsyslog-0c5e8a2f96dd22534c0b7189ff5e75519be03b82.tar.xz
rsyslog-0c5e8a2f96dd22534c0b7189ff5e75519be03b82.zip
moved core threading helpers out of syslogd.c
-rw-r--r--Makefile.am2
-rw-r--r--configure.ac4
-rw-r--r--sync.c6
-rw-r--r--sync.h9
-rw-r--r--syslogd.c154
-rw-r--r--threads.c123
-rw-r--r--threads.h47
7 files changed, 181 insertions, 164 deletions
diff --git a/Makefile.am b/Makefile.am
index dc67a195..7e738f18 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -30,6 +30,8 @@ rsyslogd_SOURCES = \
template.h \
outchannel.h \
liblogging-stub.h \
+ threads.c \
+ threads.h \
sync.c \
sync.h \
net.c \
diff --git a/configure.ac b/configure.ac
index 64329cfb..ef5353d9 100644
--- a/configure.ac
+++ b/configure.ac
@@ -165,6 +165,10 @@ AC_ARG_ENABLE(pthreads,
[enable_pthreads=yes]
)
+if test "x$enable_pthreads" = "xno"; then
+ AC_MSG_ERROR(rsyslog v3 does no longer support single threading mode -- use a previous version for that);
+fi
+
if test "x$enable_pthreads" != "xno"; then
AC_CHECK_HEADERS(
[pthread.h],
diff --git a/sync.c b/sync.c
index a3254cae..af2119a3 100644
--- a/sync.c
+++ b/sync.c
@@ -23,10 +23,6 @@
#include "config.h"
-#ifdef USE_PTHREADS
-/* all of this code is compiled only if PTHREADS is supported - otherwise
- * we do not need syncrhonization objects (and do not have them!).
- */
#include <stdlib.h>
#include "rsyslog.h"
@@ -75,5 +71,3 @@ unlockObj(pthread_mutex_t *mut)
pthread_mutex_unlock(mut);
}
#endif /* #ifndef NDEBUG */
-
-#endif /* #ifdef USE_PTHREADS */
diff --git a/sync.h b/sync.h
index c0c6948d..fa79b66b 100644
--- a/sync.h
+++ b/sync.h
@@ -24,7 +24,6 @@
#ifndef INCLUDED_SYNC_H
#define INCLUDED_SYNC_H
-#ifdef USE_PTHREADS /* Code to compile for threading support */
#include <pthread.h>
/* SYNC_OBJ_TOOL definition must be placed in object to be synced!
@@ -52,12 +51,4 @@ void SyncObjExit(pthread_mutex_t **mut);
extern void lockObj(pthread_mutex_t *mut);
extern void unlockObj(pthread_mutex_t *mut);
-#else /* Code not to compile for threading support */
-#define SYNC_OBJ_TOOL
-#define SYNC_OBJ_TOOL_INIT(x)
-#define SYNC_OBJ_TOOL_EXIT(X)
-#define LockObj(x)
-#define UnlockObj(x)
-#endif
-
#endif /* #ifndef INCLUDED_SYNC_H */
diff --git a/syslogd.c b/syslogd.c
index dd230358..f8db3e34 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -203,9 +203,7 @@
#include <assert.h>
-#ifdef USE_PTHREADS
#include <pthread.h>
-#endif
#if HAVE_PATHS_H
#include <paths.h>
@@ -239,6 +237,7 @@
#include "omfwd.h"
#include "omfile.h"
#include "omdiscard.h"
+#include "threads.h"
#if IMMARK
#include "plugins/immark/immark.h"
#endif
@@ -385,24 +384,9 @@ static rsCStrObj *pDfltHostnameCmp;
static rsCStrObj *pDfltProgNameCmp;
/* supporting structures for multithreading */
-#ifdef USE_PTHREADS
-/* this is the first approach to a queue, this time with static
- * memory.
- */
-typedef struct {
- void** pbuf;
- long head, tail;
- int full, empty;
- pthread_mutex_t *mut;
- pthread_cond_t *notFull, *notEmpty;
-} msgQueue;
-
-int iMainMsgQueueSize;
int bRunningMultithreaded = 0; /* Is this program running in multithreaded mode? */
-msgQueue *pMsgQueue = NULL;
static pthread_t thrdWorker;
static int bGlblDone = 0;
-#endif
/* END supporting structures for multithreading */
static int bParseHOSTNAMEandTAG = 1; /* global config var: should the hostname and tag be
@@ -632,9 +616,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
free(pModDir);
pModDir = NULL;
}
-#ifdef USE_PTHREADS
iMainMsgQueueSize = 10000;
-#endif
#if defined(SYSLOG_INET) && defined(USE_GSSAPI)
if (gss_listen_service_name != NULL) {
free(gss_listen_service_name);
@@ -673,10 +655,7 @@ static uchar template_StdPgSQLFmt[] = "\"insert into SystemEvents (Message, Faci
/* up to the next comment, prototypes that should be removed by reordering */
-#ifdef USE_PTHREADS
-static msgQueue *queueInit (void);
static void *singleWorker(); /* REMOVEME later 2005-10-24 */
-#endif
/* Function prototypes. */
static char **crunch_list(char *list);
static void printline(char *hname, char *msg, int iSource);
@@ -2284,7 +2263,6 @@ dbgprintf("logmsgInternal: msg passed: '%s'\n", msg);
getCurrTime(&(pMsg->tTIMESTAMP)); /* use the current time! */
flags |= INTERNAL_MSG;
-#ifdef USE_PTHREADS
if(bRunningMultithreaded == 0) { /* not yet in queued mode */
iminternalAddMsg(pri, pMsg, flags);
} else {
@@ -2294,9 +2272,6 @@ dbgprintf("logmsgInternal: msg passed: '%s'\n", msg);
logmsg(pri, pMsg, flags);
MsgDestruct(pMsg);
}
-#else
- iminternalAddMsg(pri, pMsg, flags);
-#endif
}
/* This functions looks at the given message and checks if it matches the
@@ -2611,13 +2586,7 @@ static void processMsg(msg_t *pMsg)
}
-#ifdef USE_PTHREADS
-/* This block contains code that is only present when USE_PTHREADS is
- * enabled. I plan to move it to some other file, but for the time
- * being, I include it here because that saves me from the need to
- * do so many external definitons.
- * rgerhards, 2005-10-24
- */
+/* Start Threading-Related code */
/* shuts down the worker process. The worker will first finish
* with the message queue. Control returns, when done.
@@ -2666,87 +2635,6 @@ static void startWorker(void)
}
-static msgQueue *queueInit (void)
-{
- msgQueue *q;
-
- q = (msgQueue *)malloc(sizeof(msgQueue));
- if (q == NULL) return (NULL);
- if((q->pbuf = malloc(sizeof(void *) * iMainMsgQueueSize)) == NULL) {
- free(q);
- return NULL;
- }
-
- q->empty = 1;
- q->full = 0;
- q->head = 0;
- q->tail = 0;
- q->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
- pthread_mutex_init (q->mut, NULL);
- q->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
- pthread_cond_init (q->notFull, NULL);
- q->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
- pthread_cond_init (q->notEmpty, NULL);
-
- return (q);
-}
-
-static void queueDelete (msgQueue *q)
-{
- pthread_mutex_destroy (q->mut);
- free (q->mut);
- pthread_cond_destroy (q->notFull);
- free (q->notFull);
- pthread_cond_destroy (q->notEmpty);
- free (q->notEmpty);
- free(q->pbuf);
- free (q);
-}
-
-
-/* In queueAdd() and queueDel() we have a potential race condition. If a message
- * is dequeued and at the same time a message is enqueued and the queue is either
- * full or empty, the full (or empty) indicator may be invalidly updated. HOWEVER,
- * this does not cause any real problems. No queue pointers can be wrong. And even
- * if one of the flags is set invalidly, that does not pose a real problem. If
- * "full" is invalidly set, at mose one message might be lost, if we are already in
- * a timeout situation (this is quite acceptable). And if "empty" is accidently set,
- * the receiver will not continue the inner loop, but break out of the outer. So no
- * harm is done at all. For this reason, I do not yet use a mutex to guard the two
- * flags - there would be a notable performance hit with, IMHO, no gain in stability
- * or functionality. But anyhow, now it's documented...
- * rgerhards, 2007-09-20
- * NOTE: this comment does not really apply - the callers handle the mutex, so it
- * *is* guarded.
- */
-static void queueAdd (msgQueue *q, void* in)
-{
- q->pbuf[q->tail] = in;
- q->tail++;
- if (q->tail == iMainMsgQueueSize)
- q->tail = 0;
- if (q->tail == q->head)
- q->full = 1;
- q->empty = 0;
-
- return;
-}
-
-static void queueDel (msgQueue *q, msg_t **out)
-{
- *out = (msg_t*) q->pbuf[q->head];
-
- q->head++;
- if (q->head == iMainMsgQueueSize)
- q->head = 0;
- if (q->head == q->tail)
- q->empty = 1;
- q->full = 0;
-
- return;
-}
-
-
/* The worker thread (so far, we have dual-threading, so only one
* worker thread. Having more than one worker requires considerable
* additional code review in regard to thread-safety.
@@ -2770,7 +2658,7 @@ static void *singleWorker()
}
if(!fifo->empty) {
/* dequeue element (still protected from mutex) */
- queueDel(fifo, &pMsg);
+ queueDel(fifo, (void*) &pMsg);
assert(pMsg != NULL);
pthread_mutex_unlock(fifo->mut);
pthread_cond_signal (fifo->notFull);
@@ -2793,19 +2681,13 @@ static void *singleWorker()
}
/* END threads-related code */
-#endif /* #ifdef USE_PTHREADS */
/* This method enqueues a message into the the message buffer. It also
- * the worker thread, so that the message will be processed. If we are
- * compiled without PTHREADS support, we simply use this method as
- * an alias for processMsg().
+ * the worker thread, so that the message will be processed.
* See comment dated 2005-10-13 in logmsg() on multithreading.
* rgerhards, 2005-10-24
*/
-#ifndef USE_PTHREADS
-#define enqueueMsg(x) processMsg((x))
-#else
static void enqueueMsg(msg_t *pMsg)
{
int iRet;
@@ -2844,7 +2726,6 @@ static void enqueueMsg(msg_t *pMsg)
dbgprintf("EnqueueMsg signaled condition (%d)\n", iRet);
}
}
-#endif /* #ifndef USE_PTHREADS */
/* Helper to parseRFCSyslogMsg. This function parses a field up to
@@ -3636,11 +3517,9 @@ static void die(int sig)
/* Free ressources and close connections */
freeSelectors();
-#ifdef USE_PTHREADS
/* Worker threads are stopped by freeSelectors() */
queueDelete(pMsgQueue); /* delete fifo here! */
pMsgQueue = NULL;
-#endif
/* now clean up the listener part */
#ifdef SYSLOG_INET
@@ -4132,9 +4011,7 @@ static void freeSelectors(void)
* processed without draining the queue. That would lead to invalid
* results. -- rgerhards, 2007-12-12
*/
-# ifdef USE_PTHREADS
stopWorker();
-# endif
for(f = Files ; f != NULL ; f = f->f_next) {
llExecFunc(&f->llActList, freeSelectorsActions, NULL);
@@ -4240,9 +4117,7 @@ static void dbgPrintInitInfo(void)
printf("Control character escape sequence prefix is '%c'.\n",
cCCEscapeChar);
-#ifdef USE_PTHREADS
printf("Main queue size %d messages.\n", iMainMsgQueueSize);
-#endif
}
@@ -4430,13 +4305,11 @@ static void init(void)
dbgprintf("Clearing templates.\n");
tplDeleteNew();
-#ifdef USE_PTHREADS
if(pMsgQueue != NULL) {
dbgprintf("deleting message queue\n");
queueDelete(pMsgQueue); /* delete fifo here! */
pMsgQueue = NULL;
}
-#endif
/* re-setting values to defaults (where applicable) */
/* TODO: once we have loadable modules, we must re-visit this code. The reason is
@@ -4533,16 +4406,14 @@ static void init(void)
}
#endif
-# ifdef USE_PTHREADS
/* create message queue */
pMsgQueue = queueInit();
if(pMsgQueue == NULL) {
- errno = 0;
+ errno = 0; /* TODO: check if that is possible without threads - I think we must give up... */
logerror("error: could not create message queue - running single-threaded!\n");
}
startWorker();
-# endif
Initialized = 1;
@@ -5412,15 +5283,12 @@ int decode(uchar *name, struct code *codetab)
extern void dbgprintf(char *fmt, ...) __attribute__((format(printf,1, 2)));
void dbgprintf(char *fmt, ...)
{
-# ifdef USE_PTHREADS
static int bWasNL = FALSE;
-# endif
va_list ap;
if ( !(Debug && debugging_on) )
return;
-# ifdef USE_PTHREADS
/* The bWasNL handler does not really work. It works if no thread
* switching occurs during non-NL messages. Else, things are messed
* up. Anyhow, it works well enough to provide useful help during
@@ -5435,7 +5303,6 @@ void dbgprintf(char *fmt, ...)
fprintf(stdout, "%8.8d: ", (unsigned int) pthread_self());
}
bWasNL = (*(fmt + strlen(fmt) - 1) == '\n') ? TRUE : FALSE;
-# endif
va_start(ap, fmt);
vfprintf(stdout, fmt, ap);
va_end(ap);
@@ -5856,10 +5723,8 @@ static void mainloop(void)
int iTCPSess;
#endif /* #ifdef SYSLOG_INET */
#ifdef BSD
-#ifdef USE_PTHREADS
struct timeval tvSelectTimeout;
#endif
-#endif
while(!bFinished){
errno = 0;
@@ -5956,7 +5821,6 @@ static void mainloop(void)
#define MAIN_SELECT_TIMEVAL NULL
#ifdef BSD
-#ifdef USE_PTHREADS
/* There seems to be a problem with BSD and threads. When running on
* multiple threads, a signal will not cause the select call to be
* interrrupted. I am not sure if this is by design or an bug (some
@@ -5979,7 +5843,6 @@ static void mainloop(void)
# undef MAIN_SELECT_TIMEVAL
# define MAIN_SELECT_TIMEVAL &tvSelectTimeout
#endif
-#endif
#ifdef SYSLOG_INET
#define MAIN_SELECT_WRITEFDS (fd_set *) &writefds
#else
@@ -6085,9 +5948,7 @@ static rsRetVal loadBuildInModules(void)
* is that rsyslog will terminate if we can not register our built-in config commands.
* This, I think, is the right thing to do. -- rgerhards, 2007-07-31
*/
-#ifdef USE_PTHREADS
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesize", 0, eCmdHdlrInt, NULL, &iMainMsgQueueSize, NULL));
-#endif
CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeinterval", 0, eCmdHdlrInt, setActionResumeInterval, NULL, NULL));
@@ -6122,11 +5983,6 @@ static void printVersion(void)
{
printf("rsyslogd %s, ", VERSION);
printf("compiled with:\n");
-#ifdef USE_PTHREADS
- printf("\tFEATURE_PTHREADS (dual-threading):\tYes\n");
-#else
- printf("\tFEATURE_PTHREADS (dual-threading):\tNo\n");
-#endif
#ifdef FEATURE_REGEXP
printf("\tFEATURE_REGEXP:\t\t\t\tYes\n");
#else
diff --git a/threads.c b/threads.c
new file mode 100644
index 00000000..c3aee4b8
--- /dev/null
+++ b/threads.c
@@ -0,0 +1,123 @@
+/* threads.c
+ *
+ * This file implements threading support helpers (and maybe the thread object)
+ * for rsyslog.
+ *
+ * File begun on 2007-12-14 by RGerhards
+ *
+ * Copyright 2007 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+#include "config.h"
+#include "rsyslog.h"
+
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+#include "syslogd.h"
+#include "threads.h"
+
+int iMainMsgQueueSize;
+msgQueue *pMsgQueue = NULL;
+
+msgQueue *queueInit (void)
+{
+ msgQueue *q;
+
+ q = (msgQueue *)malloc(sizeof(msgQueue));
+ if (q == NULL) return (NULL);
+ if((q->pbuf = malloc(sizeof(void *) * iMainMsgQueueSize)) == NULL) {
+ free(q);
+ return NULL;
+ }
+
+ q->empty = 1;
+ q->full = 0;
+ q->head = 0;
+ q->tail = 0;
+ q->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
+ pthread_mutex_init (q->mut, NULL);
+ q->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
+ pthread_cond_init (q->notFull, NULL);
+ q->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
+ pthread_cond_init (q->notEmpty, NULL);
+
+ return (q);
+}
+
+void queueDelete (msgQueue *q)
+{
+ pthread_mutex_destroy (q->mut);
+ free (q->mut);
+ pthread_cond_destroy (q->notFull);
+ free (q->notFull);
+ pthread_cond_destroy (q->notEmpty);
+ free (q->notEmpty);
+ free(q->pbuf);
+ free (q);
+}
+
+
+/* In queueAdd() and queueDel() we have a potential race condition. If a message
+ * is dequeued and at the same time a message is enqueued and the queue is either
+ * full or empty, the full (or empty) indicator may be invalidly updated. HOWEVER,
+ * this does not cause any real problems. No queue pointers can be wrong. And even
+ * if one of the flags is set invalidly, that does not pose a real problem. If
+ * "full" is invalidly set, at mose one message might be lost, if we are already in
+ * a timeout situation (this is quite acceptable). And if "empty" is accidently set,
+ * the receiver will not continue the inner loop, but break out of the outer. So no
+ * harm is done at all. For this reason, I do not yet use a mutex to guard the two
+ * flags - there would be a notable performance hit with, IMHO, no gain in stability
+ * or functionality. But anyhow, now it's documented...
+ * rgerhards, 2007-09-20
+ * NOTE: this comment does not really apply - the callers handle the mutex, so it
+ * *is* guarded.
+ */
+void queueAdd (msgQueue *q, void* in)
+{
+ q->pbuf[q->tail] = in;
+ q->tail++;
+ if (q->tail == iMainMsgQueueSize)
+ q->tail = 0;
+ if (q->tail == q->head)
+ q->full = 1;
+ q->empty = 0;
+
+ return;
+}
+
+void queueDel(msgQueue *q, void **out)
+{
+ *out = (void*) q->pbuf[q->head];
+
+ q->head++;
+ if (q->head == iMainMsgQueueSize)
+ q->head = 0;
+ if (q->head == q->tail)
+ q->empty = 1;
+ q->full = 0;
+
+ return;
+}
+
+/*
+ * vi:set ai:
+ */
diff --git a/threads.h b/threads.h
new file mode 100644
index 00000000..5f3faab7
--- /dev/null
+++ b/threads.h
@@ -0,0 +1,47 @@
+/* Definition of the threading support module.
+ *
+ * Copyright 2007 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+
+#ifndef THREADS_H_INCLUDED
+#define THREADS_H_INCLUDED
+
+/* this is the first approach to a queue, this time with static
+ * memory.
+ */
+typedef struct {
+ void** pbuf;
+ long head, tail;
+ int full, empty;
+ pthread_mutex_t *mut;
+ pthread_cond_t *notFull, *notEmpty;
+} msgQueue;
+
+/* prototypes */
+msgQueue *queueInit (void);
+void queueDelete (msgQueue *q);
+void queueAdd (msgQueue *q, void* in);
+void queueDel (msgQueue *q, void **out);
+
+/* go-away's */
+extern int iMainMsgQueueSize;
+extern msgQueue *pMsgQueue;
+
+#endif /* #ifndef THREADS_H_INCLUDED */