diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2007-12-14 11:55:35 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2007-12-14 11:55:35 +0000 |
commit | 0c5e8a2f96dd22534c0b7189ff5e75519be03b82 (patch) | |
tree | a96f0e832270068dc3f81d657e4fda76014f4d76 | |
parent | 004229dda605fa93fa04f7c94bff69517241a885 (diff) | |
download | rsyslog-0c5e8a2f96dd22534c0b7189ff5e75519be03b82.tar.gz rsyslog-0c5e8a2f96dd22534c0b7189ff5e75519be03b82.tar.xz rsyslog-0c5e8a2f96dd22534c0b7189ff5e75519be03b82.zip |
moved core threading helpers out of syslogd.c
-rw-r--r-- | Makefile.am | 2 | ||||
-rw-r--r-- | configure.ac | 4 | ||||
-rw-r--r-- | sync.c | 6 | ||||
-rw-r--r-- | sync.h | 9 | ||||
-rw-r--r-- | syslogd.c | 154 | ||||
-rw-r--r-- | threads.c | 123 | ||||
-rw-r--r-- | threads.h | 47 |
7 files changed, 181 insertions, 164 deletions
diff --git a/Makefile.am b/Makefile.am index dc67a195..7e738f18 100644 --- a/Makefile.am +++ b/Makefile.am @@ -30,6 +30,8 @@ rsyslogd_SOURCES = \ template.h \ outchannel.h \ liblogging-stub.h \ + threads.c \ + threads.h \ sync.c \ sync.h \ net.c \ diff --git a/configure.ac b/configure.ac index 64329cfb..ef5353d9 100644 --- a/configure.ac +++ b/configure.ac @@ -165,6 +165,10 @@ AC_ARG_ENABLE(pthreads, [enable_pthreads=yes] ) +if test "x$enable_pthreads" = "xno"; then + AC_MSG_ERROR(rsyslog v3 does no longer support single threading mode -- use a previous version for that); +fi + if test "x$enable_pthreads" != "xno"; then AC_CHECK_HEADERS( [pthread.h], @@ -23,10 +23,6 @@ #include "config.h" -#ifdef USE_PTHREADS -/* all of this code is compiled only if PTHREADS is supported - otherwise - * we do not need syncrhonization objects (and do not have them!). - */ #include <stdlib.h> #include "rsyslog.h" @@ -75,5 +71,3 @@ unlockObj(pthread_mutex_t *mut) pthread_mutex_unlock(mut); } #endif /* #ifndef NDEBUG */ - -#endif /* #ifdef USE_PTHREADS */ @@ -24,7 +24,6 @@ #ifndef INCLUDED_SYNC_H #define INCLUDED_SYNC_H -#ifdef USE_PTHREADS /* Code to compile for threading support */ #include <pthread.h> /* SYNC_OBJ_TOOL definition must be placed in object to be synced! @@ -52,12 +51,4 @@ void SyncObjExit(pthread_mutex_t **mut); extern void lockObj(pthread_mutex_t *mut); extern void unlockObj(pthread_mutex_t *mut); -#else /* Code not to compile for threading support */ -#define SYNC_OBJ_TOOL -#define SYNC_OBJ_TOOL_INIT(x) -#define SYNC_OBJ_TOOL_EXIT(X) -#define LockObj(x) -#define UnlockObj(x) -#endif - #endif /* #ifndef INCLUDED_SYNC_H */ @@ -203,9 +203,7 @@ #include <assert.h> -#ifdef USE_PTHREADS #include <pthread.h> -#endif #if HAVE_PATHS_H #include <paths.h> @@ -239,6 +237,7 @@ #include "omfwd.h" #include "omfile.h" #include "omdiscard.h" +#include "threads.h" #if IMMARK #include "plugins/immark/immark.h" #endif @@ -385,24 +384,9 @@ static rsCStrObj *pDfltHostnameCmp; static rsCStrObj *pDfltProgNameCmp; /* supporting structures for multithreading */ -#ifdef USE_PTHREADS -/* this is the first approach to a queue, this time with static - * memory. - */ -typedef struct { - void** pbuf; - long head, tail; - int full, empty; - pthread_mutex_t *mut; - pthread_cond_t *notFull, *notEmpty; -} msgQueue; - -int iMainMsgQueueSize; int bRunningMultithreaded = 0; /* Is this program running in multithreaded mode? */ -msgQueue *pMsgQueue = NULL; static pthread_t thrdWorker; static int bGlblDone = 0; -#endif /* END supporting structures for multithreading */ static int bParseHOSTNAMEandTAG = 1; /* global config var: should the hostname and tag be @@ -632,9 +616,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a free(pModDir); pModDir = NULL; } -#ifdef USE_PTHREADS iMainMsgQueueSize = 10000; -#endif #if defined(SYSLOG_INET) && defined(USE_GSSAPI) if (gss_listen_service_name != NULL) { free(gss_listen_service_name); @@ -673,10 +655,7 @@ static uchar template_StdPgSQLFmt[] = "\"insert into SystemEvents (Message, Faci /* up to the next comment, prototypes that should be removed by reordering */ -#ifdef USE_PTHREADS -static msgQueue *queueInit (void); static void *singleWorker(); /* REMOVEME later 2005-10-24 */ -#endif /* Function prototypes. */ static char **crunch_list(char *list); static void printline(char *hname, char *msg, int iSource); @@ -2284,7 +2263,6 @@ dbgprintf("logmsgInternal: msg passed: '%s'\n", msg); getCurrTime(&(pMsg->tTIMESTAMP)); /* use the current time! */ flags |= INTERNAL_MSG; -#ifdef USE_PTHREADS if(bRunningMultithreaded == 0) { /* not yet in queued mode */ iminternalAddMsg(pri, pMsg, flags); } else { @@ -2294,9 +2272,6 @@ dbgprintf("logmsgInternal: msg passed: '%s'\n", msg); logmsg(pri, pMsg, flags); MsgDestruct(pMsg); } -#else - iminternalAddMsg(pri, pMsg, flags); -#endif } /* This functions looks at the given message and checks if it matches the @@ -2611,13 +2586,7 @@ static void processMsg(msg_t *pMsg) } -#ifdef USE_PTHREADS -/* This block contains code that is only present when USE_PTHREADS is - * enabled. I plan to move it to some other file, but for the time - * being, I include it here because that saves me from the need to - * do so many external definitons. - * rgerhards, 2005-10-24 - */ +/* Start Threading-Related code */ /* shuts down the worker process. The worker will first finish * with the message queue. Control returns, when done. @@ -2666,87 +2635,6 @@ static void startWorker(void) } -static msgQueue *queueInit (void) -{ - msgQueue *q; - - q = (msgQueue *)malloc(sizeof(msgQueue)); - if (q == NULL) return (NULL); - if((q->pbuf = malloc(sizeof(void *) * iMainMsgQueueSize)) == NULL) { - free(q); - return NULL; - } - - q->empty = 1; - q->full = 0; - q->head = 0; - q->tail = 0; - q->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t)); - pthread_mutex_init (q->mut, NULL); - q->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); - pthread_cond_init (q->notFull, NULL); - q->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); - pthread_cond_init (q->notEmpty, NULL); - - return (q); -} - -static void queueDelete (msgQueue *q) -{ - pthread_mutex_destroy (q->mut); - free (q->mut); - pthread_cond_destroy (q->notFull); - free (q->notFull); - pthread_cond_destroy (q->notEmpty); - free (q->notEmpty); - free(q->pbuf); - free (q); -} - - -/* In queueAdd() and queueDel() we have a potential race condition. If a message - * is dequeued and at the same time a message is enqueued and the queue is either - * full or empty, the full (or empty) indicator may be invalidly updated. HOWEVER, - * this does not cause any real problems. No queue pointers can be wrong. And even - * if one of the flags is set invalidly, that does not pose a real problem. If - * "full" is invalidly set, at mose one message might be lost, if we are already in - * a timeout situation (this is quite acceptable). And if "empty" is accidently set, - * the receiver will not continue the inner loop, but break out of the outer. So no - * harm is done at all. For this reason, I do not yet use a mutex to guard the two - * flags - there would be a notable performance hit with, IMHO, no gain in stability - * or functionality. But anyhow, now it's documented... - * rgerhards, 2007-09-20 - * NOTE: this comment does not really apply - the callers handle the mutex, so it - * *is* guarded. - */ -static void queueAdd (msgQueue *q, void* in) -{ - q->pbuf[q->tail] = in; - q->tail++; - if (q->tail == iMainMsgQueueSize) - q->tail = 0; - if (q->tail == q->head) - q->full = 1; - q->empty = 0; - - return; -} - -static void queueDel (msgQueue *q, msg_t **out) -{ - *out = (msg_t*) q->pbuf[q->head]; - - q->head++; - if (q->head == iMainMsgQueueSize) - q->head = 0; - if (q->head == q->tail) - q->empty = 1; - q->full = 0; - - return; -} - - /* The worker thread (so far, we have dual-threading, so only one * worker thread. Having more than one worker requires considerable * additional code review in regard to thread-safety. @@ -2770,7 +2658,7 @@ static void *singleWorker() } if(!fifo->empty) { /* dequeue element (still protected from mutex) */ - queueDel(fifo, &pMsg); + queueDel(fifo, (void*) &pMsg); assert(pMsg != NULL); pthread_mutex_unlock(fifo->mut); pthread_cond_signal (fifo->notFull); @@ -2793,19 +2681,13 @@ static void *singleWorker() } /* END threads-related code */ -#endif /* #ifdef USE_PTHREADS */ /* This method enqueues a message into the the message buffer. It also - * the worker thread, so that the message will be processed. If we are - * compiled without PTHREADS support, we simply use this method as - * an alias for processMsg(). + * the worker thread, so that the message will be processed. * See comment dated 2005-10-13 in logmsg() on multithreading. * rgerhards, 2005-10-24 */ -#ifndef USE_PTHREADS -#define enqueueMsg(x) processMsg((x)) -#else static void enqueueMsg(msg_t *pMsg) { int iRet; @@ -2844,7 +2726,6 @@ static void enqueueMsg(msg_t *pMsg) dbgprintf("EnqueueMsg signaled condition (%d)\n", iRet); } } -#endif /* #ifndef USE_PTHREADS */ /* Helper to parseRFCSyslogMsg. This function parses a field up to @@ -3636,11 +3517,9 @@ static void die(int sig) /* Free ressources and close connections */ freeSelectors(); -#ifdef USE_PTHREADS /* Worker threads are stopped by freeSelectors() */ queueDelete(pMsgQueue); /* delete fifo here! */ pMsgQueue = NULL; -#endif /* now clean up the listener part */ #ifdef SYSLOG_INET @@ -4132,9 +4011,7 @@ static void freeSelectors(void) * processed without draining the queue. That would lead to invalid * results. -- rgerhards, 2007-12-12 */ -# ifdef USE_PTHREADS stopWorker(); -# endif for(f = Files ; f != NULL ; f = f->f_next) { llExecFunc(&f->llActList, freeSelectorsActions, NULL); @@ -4240,9 +4117,7 @@ static void dbgPrintInitInfo(void) printf("Control character escape sequence prefix is '%c'.\n", cCCEscapeChar); -#ifdef USE_PTHREADS printf("Main queue size %d messages.\n", iMainMsgQueueSize); -#endif } @@ -4430,13 +4305,11 @@ static void init(void) dbgprintf("Clearing templates.\n"); tplDeleteNew(); -#ifdef USE_PTHREADS if(pMsgQueue != NULL) { dbgprintf("deleting message queue\n"); queueDelete(pMsgQueue); /* delete fifo here! */ pMsgQueue = NULL; } -#endif /* re-setting values to defaults (where applicable) */ /* TODO: once we have loadable modules, we must re-visit this code. The reason is @@ -4533,16 +4406,14 @@ static void init(void) } #endif -# ifdef USE_PTHREADS /* create message queue */ pMsgQueue = queueInit(); if(pMsgQueue == NULL) { - errno = 0; + errno = 0; /* TODO: check if that is possible without threads - I think we must give up... */ logerror("error: could not create message queue - running single-threaded!\n"); } startWorker(); -# endif Initialized = 1; @@ -5412,15 +5283,12 @@ int decode(uchar *name, struct code *codetab) extern void dbgprintf(char *fmt, ...) __attribute__((format(printf,1, 2))); void dbgprintf(char *fmt, ...) { -# ifdef USE_PTHREADS static int bWasNL = FALSE; -# endif va_list ap; if ( !(Debug && debugging_on) ) return; -# ifdef USE_PTHREADS /* The bWasNL handler does not really work. It works if no thread * switching occurs during non-NL messages. Else, things are messed * up. Anyhow, it works well enough to provide useful help during @@ -5435,7 +5303,6 @@ void dbgprintf(char *fmt, ...) fprintf(stdout, "%8.8d: ", (unsigned int) pthread_self()); } bWasNL = (*(fmt + strlen(fmt) - 1) == '\n') ? TRUE : FALSE; -# endif va_start(ap, fmt); vfprintf(stdout, fmt, ap); va_end(ap); @@ -5856,10 +5723,8 @@ static void mainloop(void) int iTCPSess; #endif /* #ifdef SYSLOG_INET */ #ifdef BSD -#ifdef USE_PTHREADS struct timeval tvSelectTimeout; #endif -#endif while(!bFinished){ errno = 0; @@ -5956,7 +5821,6 @@ static void mainloop(void) #define MAIN_SELECT_TIMEVAL NULL #ifdef BSD -#ifdef USE_PTHREADS /* There seems to be a problem with BSD and threads. When running on * multiple threads, a signal will not cause the select call to be * interrrupted. I am not sure if this is by design or an bug (some @@ -5979,7 +5843,6 @@ static void mainloop(void) # undef MAIN_SELECT_TIMEVAL # define MAIN_SELECT_TIMEVAL &tvSelectTimeout #endif -#endif #ifdef SYSLOG_INET #define MAIN_SELECT_WRITEFDS (fd_set *) &writefds #else @@ -6085,9 +5948,7 @@ static rsRetVal loadBuildInModules(void) * is that rsyslog will terminate if we can not register our built-in config commands. * This, I think, is the right thing to do. -- rgerhards, 2007-07-31 */ -#ifdef USE_PTHREADS CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesize", 0, eCmdHdlrInt, NULL, &iMainMsgQueueSize, NULL)); -#endif CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeinterval", 0, eCmdHdlrInt, setActionResumeInterval, NULL, NULL)); @@ -6122,11 +5983,6 @@ static void printVersion(void) { printf("rsyslogd %s, ", VERSION); printf("compiled with:\n"); -#ifdef USE_PTHREADS - printf("\tFEATURE_PTHREADS (dual-threading):\tYes\n"); -#else - printf("\tFEATURE_PTHREADS (dual-threading):\tNo\n"); -#endif #ifdef FEATURE_REGEXP printf("\tFEATURE_REGEXP:\t\t\t\tYes\n"); #else diff --git a/threads.c b/threads.c new file mode 100644 index 00000000..c3aee4b8 --- /dev/null +++ b/threads.c @@ -0,0 +1,123 @@ +/* threads.c + * + * This file implements threading support helpers (and maybe the thread object) + * for rsyslog. + * + * File begun on 2007-12-14 by RGerhards + * + * Copyright 2007 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ +#include "config.h" +#include "rsyslog.h" + +#include <pthread.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> + +#include "syslogd.h" +#include "threads.h" + +int iMainMsgQueueSize; +msgQueue *pMsgQueue = NULL; + +msgQueue *queueInit (void) +{ + msgQueue *q; + + q = (msgQueue *)malloc(sizeof(msgQueue)); + if (q == NULL) return (NULL); + if((q->pbuf = malloc(sizeof(void *) * iMainMsgQueueSize)) == NULL) { + free(q); + return NULL; + } + + q->empty = 1; + q->full = 0; + q->head = 0; + q->tail = 0; + q->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t)); + pthread_mutex_init (q->mut, NULL); + q->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); + pthread_cond_init (q->notFull, NULL); + q->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); + pthread_cond_init (q->notEmpty, NULL); + + return (q); +} + +void queueDelete (msgQueue *q) +{ + pthread_mutex_destroy (q->mut); + free (q->mut); + pthread_cond_destroy (q->notFull); + free (q->notFull); + pthread_cond_destroy (q->notEmpty); + free (q->notEmpty); + free(q->pbuf); + free (q); +} + + +/* In queueAdd() and queueDel() we have a potential race condition. If a message + * is dequeued and at the same time a message is enqueued and the queue is either + * full or empty, the full (or empty) indicator may be invalidly updated. HOWEVER, + * this does not cause any real problems. No queue pointers can be wrong. And even + * if one of the flags is set invalidly, that does not pose a real problem. If + * "full" is invalidly set, at mose one message might be lost, if we are already in + * a timeout situation (this is quite acceptable). And if "empty" is accidently set, + * the receiver will not continue the inner loop, but break out of the outer. So no + * harm is done at all. For this reason, I do not yet use a mutex to guard the two + * flags - there would be a notable performance hit with, IMHO, no gain in stability + * or functionality. But anyhow, now it's documented... + * rgerhards, 2007-09-20 + * NOTE: this comment does not really apply - the callers handle the mutex, so it + * *is* guarded. + */ +void queueAdd (msgQueue *q, void* in) +{ + q->pbuf[q->tail] = in; + q->tail++; + if (q->tail == iMainMsgQueueSize) + q->tail = 0; + if (q->tail == q->head) + q->full = 1; + q->empty = 0; + + return; +} + +void queueDel(msgQueue *q, void **out) +{ + *out = (void*) q->pbuf[q->head]; + + q->head++; + if (q->head == iMainMsgQueueSize) + q->head = 0; + if (q->head == q->tail) + q->empty = 1; + q->full = 0; + + return; +} + +/* + * vi:set ai: + */ diff --git a/threads.h b/threads.h new file mode 100644 index 00000000..5f3faab7 --- /dev/null +++ b/threads.h @@ -0,0 +1,47 @@ +/* Definition of the threading support module. + * + * Copyright 2007 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ + +#ifndef THREADS_H_INCLUDED +#define THREADS_H_INCLUDED + +/* this is the first approach to a queue, this time with static + * memory. + */ +typedef struct { + void** pbuf; + long head, tail; + int full, empty; + pthread_mutex_t *mut; + pthread_cond_t *notFull, *notEmpty; +} msgQueue; + +/* prototypes */ +msgQueue *queueInit (void); +void queueDelete (msgQueue *q); +void queueAdd (msgQueue *q, void* in); +void queueDel (msgQueue *q, void **out); + +/* go-away's */ +extern int iMainMsgQueueSize; +extern msgQueue *pMsgQueue; + +#endif /* #ifndef THREADS_H_INCLUDED */ |