diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2005-10-25 15:50:08 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2005-10-25 15:50:08 +0000 |
commit | 755ec717d9cd6e25767584a528034ce0ab0ee5aa (patch) | |
tree | 20fd91683a496a2f8868650f76c7f408c1e271cc | |
parent | 090627b79c1e801bc11c706f62628c7517dd3596 (diff) | |
download | rsyslog-755ec717d9cd6e25767584a528034ce0ab0ee5aa.tar.gz rsyslog-755ec717d9cd6e25767584a528034ce0ab0ee5aa.tar.xz rsyslog-755ec717d9cd6e25767584a528034ce0ab0ee5aa.zip |
dual-threading code looks now fairly complete and stable
-rw-r--r-- | NEWS | 2 | ||||
-rw-r--r-- | syslogd.c | 1373 | ||||
-rw-r--r-- | template.c | 4 |
3 files changed, 714 insertions, 665 deletions
@@ -21,6 +21,8 @@ Version 1.12.0 (RGer), 2005-10-20 - fixed an one-too-low memory allocation in the TCP sender. Could result in rsyslogd dumping core. - fixed a bug with regular expression support (thanks to Andres Riancho) +- a little bit of code restructuring (especially main(), which was + horribly large) --------------------------------------------------------------------------- Version 1.11.1 (RGer), 2005-10-19 - support for BSD-style program name and host blocks @@ -343,8 +343,7 @@ int funix[MAXFUNIX] = { -1, }; /* read-only after startup */ #define TABLE_ALLPRI 0xFF /* Value to indicate all priorities in f_pmask */ #define LOG_MARK LOG_MAKEPRI(LOG_NFACILITIES, 0) /* mark "facility" */ -/* - * Flags to logmsg(). +/* Flags to logmsg(). */ /* NO LONGER NEEDED: #define IGN_CONS 0x001 * don't print on console */ #define SYNC_FILE 0x002 /* do fsync on file after printing */ @@ -591,6 +590,7 @@ typedef struct { pthread_cond_t *notFull, *notEmpty; } msgQueue; +int bRunningMultithreaded = 0; /* Is this program running in multithreaded mode? */ msgQueue *pMsgQueue = NULL; static pthread_t thrdWorker; static int bGlblDone = 0; @@ -794,33 +794,22 @@ static char template_StdDBFmt[] = "\"insert into SystemEvents (Message, Facility /* up to the next comment, prototypes that should be removed by reordering */ #ifdef USE_PTHREADS -msgQueue *queueInit (void); +static msgQueue *queueInit (void); static void *singleWorker(void *vParam); /* REMOVEME later 2005-10-24 */ #endif /* Function prototypes. */ -int main(int argc, char **argv); static char **crunch_list(char *list); -static int usage(void); -static void untty(void); static void printchopped(char *hname, char *msg, int len, int fd, int iSourceType); static void printline(char *hname, char *msg, int iSource); static void logmsg(int pri, struct msg*, int flags); static void fprintlog(register struct filed *f); -static void endtty(); static void wallmsg(register struct filed *f); static void reapchild(); static const char *cvthname(struct sockaddr_in *f); -static void domark(void); -static void domarkAlarmHdlr(); static void debug_switch(); static void logerror(char *type); static void logerrorInt(char *type, int errCode); static void logerrorSz(char *type, char *errMsg); -static void die(int sig); -#ifndef TESTING -static void doexit(int sig); -#endif -static void init(); static rsRetVal cfline(char *line, register struct filed *f); static int decode(char *name, struct code *codetab); static void sighup_handler(); @@ -2784,617 +2773,6 @@ static char *MsgGetProp(struct msg *pMsg, struct templateEntry *pTpe, */ -int main(int argc, char **argv) -{ register int i; - register char *p; -#if !defined(__GLIBC__) - int len, num_fds; -#else /* __GLIBC__ */ -#ifndef TESTING - size_t len; -#endif - int num_fds; -#endif /* __GLIBC__ */ - fd_set readfds; -#ifdef SYSLOG_INET - fd_set writefds; - struct filed *f; -#endif - -#ifdef MTRACE - mtrace(); /* this is a debug aid for leak detection - either remove - * or put in conditional compilation. 2005-01-18 RGerhards */ -#endif - -#ifndef TESTING - int fd; -#ifdef SYSLOG_INET - struct sockaddr_in frominet; - char *from; - int iTCPSess; -#endif - pid_t ppid = getpid(); -#endif - int ch; - struct hostent *hent; - - char line[MAXLINE +1]; - extern int optind; - extern char *optarg; - int maxfds; - char *pTmp; - -#ifndef TESTING - chdir ("/"); -#endif - for (i = 1; i < MAXFUNIX; i++) { - funixn[i] = ""; - funix[i] = -1; - } - - while ((ch = getopt(argc, argv, "a:dhi:f:l:m:nop:r:s:t:vw")) != EOF) - switch((char)ch) { - case 'a': - if (nfunix < MAXFUNIX) - if(*optarg == ':') { - funixParseHost[nfunix] = 1; - funixn[nfunix++] = optarg+1; - } - else { - funixParseHost[nfunix] = 0; - funixn[nfunix++] = optarg; - } - else - fprintf(stderr, "Out of descriptors, ignoring %s\n", optarg); - break; - case 'd': /* debug */ - Debug = 1; - break; - case 'f': /* configuration file */ - ConfFile = optarg; - break; - case 'h': - NoHops = 0; - break; - case 'i': /* pid file name */ - PidFile = optarg; - break; - case 'l': - if (LocalHosts) { - fprintf (stderr, "Only one -l argument allowed," \ - "the first one is taken.\n"); - break; - } - LocalHosts = crunch_list(optarg); - break; - case 'm': /* mark interval */ - MarkInterval = atoi(optarg) * 60; - break; - case 'n': /* don't fork */ - NoFork = 1; - break; - case 'o': /* omit local logging (/dev/log) */ - startIndexUxLocalSockets = 1; - break; - case 'p': /* path to regular log socket */ - funixn[0] = optarg; - break; - case 'r': /* accept remote messages */ - AcceptRemote = 1; - LogPort = atoi(optarg); - break; - case 's': - if (StripDomains) { - fprintf (stderr, "Only one -s argument allowed," \ - "the first one is taken.\n"); - break; - } - StripDomains = crunch_list(optarg); - break; - case 't': /* enable tcp logging */ - bEnableTCP = -1; - TCPLstnPort = atoi(optarg); - break; - case 'v': - printf("rsyslogd %s.%s, ", VERSION, PATCHLEVEL); - printf("compiled with:\n"); -#ifdef USE_PTHREADS - printf("\tFEATURE_PTHREADS (dual-threading)\n"); -#endif -#ifdef FEATURE_REGEXP - printf("\tFEATURE_REGEXP\n"); -#endif -#ifdef WITH_DB - printf("\tFEATURE_DB\n"); -#endif -#ifndef NOLARGEFILE - printf("\tFEATURE_LARGEFILE\n"); -#endif -#ifdef SYSLOG_INET - printf("\tSYSLOG_INET (Internet/remote support)\n"); -#endif -#ifndef NDEBUG - printf("\tFEATURE_DEBUG (debug build, slow code)\n"); -#endif - printf("\nSee http://www.rsyslog.com for more information.\n"); - exit(0); /* exit for -v option - so this is a "good one" */ - case 'w': /* disable disallowed host warnigs */ - option_DisallowWarning = 0; - break; - case '?': - default: - usage(); - } - if ((argc -= optind)) - usage(); - -#ifndef TESTING - if ( !(Debug || NoFork) ) - { - dprintf("Checking pidfile.\n"); - if (!check_pid(PidFile)) - { - signal (SIGTERM, doexit); - if (fork()) { - /* - * Parent process - */ - sleep(300); - /* - * Not reached unless something major went wrong. 5 - * minutes should be a fair amount of time to wait. - * Please note that this procedure is important since - * the father must not exit before syslogd isn't - * initialized or the klogd won't be able to flush its - * logs. -Joey - */ - exit(1); /* "good" exit - after forking, not diasabling anything */ - } - num_fds = getdtablesize(); - for (i= 0; i < num_fds; i++) - (void) close(i); - untty(); - } - else - { - fputs("rsyslogd: Already running.\n", stderr); - exit(1); /* "good" exit, done if syslogd is already running */ - } - } - else -#endif - debugging_on = 1; -#ifndef SYSV - else - setlinebuf(stdout); -#endif - -#ifndef TESTING - /* tuck my process id away */ - if ( !Debug ) - { - dprintf("Writing pidfile.\n"); - if (!check_pid(PidFile)) - { - if (!write_pid(PidFile)) - { - dprintf("Can't write pid.\n"); - exit(1); /* exit during startup - questionable */ - } - } - else - { - dprintf("Pidfile (and pid) already exist.\n"); - exit(1); /* exit during startup - questionable */ - } - } /* if ( !Debug ) */ -#endif - myPid = getpid(); /* save our pid for further testing (also used for messages) */ - - /* initialize the default templates - * we use template names with a SP in front - these - * can NOT be generated via the configuration file - */ - pTmp = template_TraditionalFormat; - tplAddLine(" TradFmt", &pTmp); - pTmp = template_WallFmt; - tplAddLine(" WallFmt", &pTmp); - pTmp = template_StdFwdFmt; - tplAddLine(" StdFwdFmt", &pTmp); - pTmp = template_StdUsrMsgFmt; - tplAddLine(" StdUsrMsgFmt", &pTmp); - pTmp = template_StdDBFmt; - tplAddLine(" StdDBFmt", &pTmp); - - /* prepare emergency logging system */ - - consfile.f_type = F_CONSOLE; - (void) strcpy(consfile.f_un.f_fname, ctty); - cflineSetTemplateAndIOV(&consfile, " TradFmt"); - (void) gethostname(LocalHostName, sizeof(LocalHostName)); - if ( (p = strchr(LocalHostName, '.')) ) { - *p++ = '\0'; - LocalDomain = p; - } - else - { - LocalDomain = ""; - - /* - * It's not clearly defined whether gethostname() - * should return the simple hostname or the fqdn. A - * good piece of software should be aware of both and - * we want to distribute good software. Joey - * - * Good software also always checks its return values... - * If syslogd starts up before DNS is up & /etc/hosts - * doesn't have LocalHostName listed, gethostbyname will - * return NULL. - */ - hent = gethostbyname(LocalHostName); - if ( hent ) - snprintf(LocalHostName, sizeof(LocalHostName), "%s", hent->h_name); - - if ( (p = strchr(LocalHostName, '.')) ) - { - *p++ = '\0'; - LocalDomain = p; - } - } - - /* - * Convert to lower case to recognize the correct domain laterly - */ - for (p = (char *)LocalDomain; *p ; p++) - if (isupper(*p)) - *p = tolower(*p); - - (void) signal(SIGTERM, die); - (void) signal(SIGINT, Debug ? die : SIG_IGN); - (void) signal(SIGQUIT, Debug ? die : SIG_IGN); - (void) signal(SIGCHLD, reapchild); - (void) signal(SIGALRM, domarkAlarmHdlr); - (void) signal(SIGUSR1, Debug ? debug_switch : SIG_IGN); - (void) signal(SIGPIPE, SIG_IGN); - (void) signal(SIGXFSZ, SIG_IGN); /* do not abort if 2gig file limit is hit */ - (void) alarm(TIMERINTVL); - - /* Create a partial message table for all file descriptors. */ - num_fds = getdtablesize(); - dprintf("Allocated parts table for %d file descriptors.\n", num_fds); - if ((parts = (char **) malloc(num_fds * sizeof(char *))) == NULL) - { - logerror("Cannot allocate memory for message parts table."); - die(0); - } - for(i= 0; i < num_fds; ++i) - parts[i] = NULL; - - dprintf("Starting.\n"); - init(); -#ifndef TESTING - if(Debug) { - dprintf("Debugging enabled, SIGUSR1 to turn off debugging.\n"); - debugging_on = 1; - } - /* - * Send a signal to the parent to it can terminate. - */ - if (myPid != ppid) - kill (ppid, SIGTERM); -#endif - /* END OF INTIALIZATION - * ... but keep in mind that we might do a restart and thus init() might - * be called again. If that happens, we must shut down all active threads, - * do the init() and then restart things. - * rgerhards, 2005-10-24 - */ -#ifdef USE_PTHREADS - /* create message queue */ - pMsgQueue = queueInit(); - if(pMsgQueue == NULL) { - fprintf (stderr, "error: could not create message queue - terminating.\n"); - exit (1); - } - /* start up worker thread */ - { int i; - i = pthread_create(&thrdWorker, NULL, singleWorker, NULL); - dprintf("worker thread started with state %d\n", i); - } -#endif - - /* --------------------- Main loop begins here. ----------------------------------------- */ - for (;;) { - int nfds; - errno = 0; - FD_ZERO(&readfds); - maxfds = 0; -#ifdef SYSLOG_UNIXAF -#ifndef TESTING - /* - * Add the Unix Domain Sockets to the list of read - * descriptors. - * rgerhards 2005-08-01: we must now check if there are - * any local sockets to listen to at all. If the -o option - * is given without -a, we do not need to listen at all.. - */ - /* Copy master connections */ - for (i = startIndexUxLocalSockets; i < nfunix; i++) { - if (funix[i] != -1) { - FD_SET(funix[i], &readfds); - if (funix[i]>maxfds) maxfds=funix[i]; - } - } -#endif -#endif -#ifdef SYSLOG_INET -#ifndef TESTING - /* - * Add the Internet Domain Socket to the list of read - * descriptors. - */ - if ( InetInuse && AcceptRemote ) { - FD_SET(inetm, &readfds); - if (inetm>maxfds) maxfds=inetm; - dprintf("Listening on syslog UDP port.\n"); - } - - /* Add the TCP socket to the list of read descriptors. - */ - if(bEnableTCP && sockTCPLstn != -1) { - FD_SET(sockTCPLstn, &readfds); - if (sockTCPLstn>maxfds) maxfds=sockTCPLstn; - dprintf("Listening on syslog TCP port.\n"); - /* do the sessions */ - iTCPSess = TCPSessGetNxtSess(-1); - while(iTCPSess != -1) { - int fd; - fd = TCPSessions[iTCPSess].sock; - dprintf("Adding TCP Session %d\n", fd); - FD_SET(fd, &readfds); - if (fd>maxfds) maxfds=fd; - /* now get next... */ - iTCPSess = TCPSessGetNxtSess(iTCPSess); - } - } - - /* TODO: activate the code below only if we actually need to check - * for outstanding writefds. - */ - if(1) { - /* Now add the TCP output sockets to the writefds set. This implementation - * is not optimal (performance-wise) and it should be replaced with something - * better in the longer term. I've not yet done this, as this code is - * scheduled to be replaced after the liblogging integration. - * rgerhards 2005-07-20 - */ - FD_ZERO(&writefds); - for (f = Files; f != NULL ; f = f->f_next) { - if( (f->f_type == F_FORW) - && (f->f_un.f_forw.protocol == FORW_TCP) - && (TCPSendGetStatus(f) == TCP_SEND_CONNECTING)) { - FD_SET(f->f_file, &writefds); - if(f->f_file > maxfds) - maxfds = f->f_file; - } - } - } -#endif -#endif -#ifdef TESTING - FD_SET(fileno(stdin), &readfds); - if (fileno(stdin) > maxfds) maxfds = fileno(stdin); - - dprintf("Listening on stdin. Press Ctrl-C to interrupt.\n"); -#endif - - if ( debugging_on ) { - dprintf("----------------------------------------\nCalling select, active file descriptors (max %d): ", maxfds); - for (nfds= 0; nfds <= maxfds; ++nfds) - if ( FD_ISSET(nfds, &readfds) ) - dprintf("%d ", nfds); - dprintf("\n"); - } -#ifdef SYSLOG_INET - nfds = select(maxfds+1, (fd_set *) &readfds, (fd_set *) &writefds, - (fd_set *) NULL, (struct timeval *) NULL); -#else - nfds = select(maxfds+1, (fd_set *) &readfds, (fd_set *) NULL, - (fd_set *) NULL, (struct timeval *) NULL); -#endif - if(bRequestDoMark) { - domark(); - bRequestDoMark = 0; - /* We do not use continue, because domark() is carried out - * only when something else happened. - */ - } - if (restart) { - dprintf("\nReceived SIGHUP, reloading rsyslogd.\n"); - init(); - restart = 0; - continue; - } - if (nfds == 0) { - dprintf("No select activity.\n"); - continue; - } - if (nfds < 0) { - if (errno != EINTR) - logerror("select"); - dprintf("Select interrupted.\n"); - continue; - } - - if ( debugging_on ) - { - dprintf("\nSuccessful select, descriptor count = %d, " \ - "Activity on: ", nfds); - for (nfds= 0; nfds <= maxfds; ++nfds) - if ( FD_ISSET(nfds, &readfds) ) - dprintf("%d ", nfds); - dprintf(("\n")); - } - -#ifndef TESTING -#ifdef SYSLOG_INET - /* TODO: activate the code below only if we actually need to check - * for outstanding writefds. - */ - if(1) { - /* Now check the TCP send sockets. So far, we only see if they become - * writable and then change their internal status. No real async - * writing is currently done. This code will be replaced once liblogging - * is used, thus we try not to focus too much on it. - * - * IMPORTANT: With the current code, the writefds must be checked first, - * because the readfds might have messages to be forwarded, which - * rely on the status setting that is done here! - * - * rgerhards 2005-07-20 - */ - for (f = Files; f != NULL ; f = f->f_next) { - if( (f->f_type == F_FORW) - && (f->f_un.f_forw.protocol == FORW_TCP) - && (TCPSendGetStatus(f) == TCP_SEND_CONNECTING) - && (FD_ISSET(f->f_file, &writefds))) { - dprintf("tcp send socket %d ready for writing.\n", f->f_file); - TCPSendSetStatus(f, TCP_SEND_READY); - /* Send stored message (if any) */ - if(f->f_un.f_forw.savedMsg != NULL) { - if(TCPSend(f, f->f_un.f_forw.savedMsg) != 0) { - /* error! */ - f->f_type = F_FORW_SUSP; - errno = 0; - logerror("error forwarding via tcp, suspending..."); - } - free(f->f_un.f_forw.savedMsg); - f->f_un.f_forw.savedMsg = NULL; - } - } - } - } -#endif /* #ifdef SYSLOG_INET */ -#ifdef SYSLOG_UNIXAF - for (i = 0; i < nfunix; i++) { - if ((fd = funix[i]) != -1 && FD_ISSET(fd, &readfds)) { - int iRcvd; - memset(line, '\0', sizeof(line)); - iRcvd = recv(fd, line, MAXLINE - 2, 0); - dprintf("Message from UNIX socket: #%d\n", fd); - if (iRcvd > 0) { - line[iRcvd] = line[iRcvd+1] = '\0'; - printchopped(LocalHostName, line, iRcvd + 2, fd, funixParseHost[i]); - } else if (iRcvd < 0 && errno != EINTR) { - dprintf("UNIX socket error: %d = %s.\n", \ - errno, strerror(errno)); - logerror("recvfrom UNIX"); - } - } - } -#endif - -#ifdef SYSLOG_INET - if (InetInuse && AcceptRemote && FD_ISSET(inetm, &readfds)) { - len = sizeof(frominet); - memset(line, '\0', sizeof(line)); - i = recvfrom(finet, line, MAXLINE - 2, 0, \ - (struct sockaddr *) &frominet, &len); - dprintf("Message from UDP inetd socket: #%d, host: %s\n", - inetm, inet_ntoa(frominet.sin_addr)); - if (i > 0) { - from = (char *)cvthname(&frominet); - /* Here we check if a host is permitted to send us - * syslog messages. If it isn't, we do not further - * process the message but log a warning (if we are - * configured to do this). - * rgerhards, 2005-09-26 - */ - if(isAllowedSender(pAllowedSenders_UDP, &frominet)) { - line[i] = line[i+1] = '\0'; - printchopped(from, line, i + 2, finet, 1); - } else { - if(option_DisallowWarning) { - logerrorSz("UDP message from disallowed sender %s discarded", - from); - } - } - } else if (i < 0 && errno != EINTR && errno != EAGAIN) { - /* see link below why we check EAGAIN: - * http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=188194 - */ - dprintf("INET socket error: %d = %s.\n", \ - errno, strerror(errno)); - logerror("recvfrom inet"); - /* should be harmless now that we set - * BSDCOMPAT on the socket */ - sleep(1); - } - } - - if(bEnableTCP && sockTCPLstn != -1) { - /* Now check for TCP input */ - if(FD_ISSET(sockTCPLstn, &readfds)) { - dprintf("New connect on TCP inetd socket: #%d\n", sockTCPLstn); - TCPSessAccept(); - } - - /* now check the sessions */ - /* TODO: optimize the whole thing. We could stop enumerating as - * soon as we have found all sockets flagged as active. */ - iTCPSess = TCPSessGetNxtSess(-1); - while(iTCPSess != -1) { - int fd; - int state; - fd = TCPSessions[iTCPSess].sock; - if(FD_ISSET(fd, &readfds)) { - char buf[MAXLINE]; - dprintf("tcp session socket with new data: #%d\n", fd); - - /* Receive message */ - state = recv(fd, buf, sizeof(buf), 0); - if(state == 0) { - /* Session closed */ - TCPSessClose(iTCPSess); - } else if(state == -1) { - char errmsg[128]; - snprintf(errmsg, sizeof(errmsg)/sizeof(char), - "TCP session %d will be closed, error ignored", - fd); - logerror(errmsg); - TCPSessClose(iTCPSess); - } else { - /* valid data received, process it! */ - TCPSessDataRcvd(iTCPSess, buf, state); - } - } - iTCPSess = TCPSessGetNxtSess(iTCPSess); - } - } - -#endif -#else - if ( FD_ISSET(fileno(stdin), &readfds) ) { - dprintf("Message from stdin.\n"); - memset(line, '\0', sizeof(line)); - line[0] = '.'; - parts[fileno(stdin)] = NULL; - i = read(fileno(stdin), line, MAXLINE); - if (i > 0) { - printchopped(LocalHostName, line, i+1, - fileno(stdin), 0); - } else if (i < 0) { - if (errno != EINTR) { - logerror("stdin"); - } - } - FD_CLR(fileno(stdin), &readfds); - } - -#endif - } -} - static int usage(void) { fprintf(stderr, "usage: rsyslogd [-dhvw] [-l hostlist] [-m markinterval] [-n] [-p path]\n" \ @@ -4076,7 +3454,55 @@ static void processMsg(struct msg *pMsg) * do so many external definitons. * rgerhards, 2005-10-24 */ -msgQueue *queueInit (void) + +/* shuts down the worker process. The worker will first finish + * with the message queue. Control returns, when done. + * This function is intended to be called during syslogd shutdown + * AND restat (init()!). + * rgerhards, 2005-10-25 + */ +static void stopWorker(void) +{ + if(bRunningMultithreaded) { + /* we could run single-threaded if there was an error + * during startup. Then, we obviously do not need to + * do anything to stop the worker ;) + */ + dprintf("Initiating worker thread shutdown sequence...\n"); + /* We are now done with all messages, so we need to wake up the + * worker thread and then wait for it to finish. + */ + bGlblDone = 1; + /* It's actually not "not empty" below but awaking the worker. The worker + * then finds out that it shall terminate and does so. + */ + pthread_cond_signal(pMsgQueue->notEmpty); + pthread_join(thrdWorker, NULL); + bRunningMultithreaded = 0; + dprintf("Worker thread terminated.\n"); + } +} + + +/* starts the worker thread. It must be made sure that the queue is + * already existing and the worker is NOT already running. + * rgerhards 2005-10-25 + */ +static void startWorker(void) +{ + int i; + if(pMsgQueue == NULL) { + bGlblDone = 0; /* we are NOT done (else worker would immediately terminate) */ + i = pthread_create(&thrdWorker, NULL, singleWorker, NULL); + dprintf("Worker thread started with state %d.\n", i); + bRunningMultithreaded = 1; + } else { + dprintf("message queue not existing, remaining single-threaded.\n"); + } +} + + +static msgQueue *queueInit (void) { msgQueue *q; @@ -4097,7 +3523,7 @@ msgQueue *queueInit (void) return (q); } -void queueDelete (msgQueue *q) +static void queueDelete (msgQueue *q) { pthread_mutex_destroy (q->mut); free (q->mut); @@ -4108,7 +3534,7 @@ void queueDelete (msgQueue *q) free (q); } -void queueAdd (msgQueue *q, void* in) +static void queueAdd (msgQueue *q, void* in) { q->buf[q->tail] = in; q->tail++; @@ -4121,7 +3547,7 @@ void queueAdd (msgQueue *q, void* in) return; } -void queueDel (msgQueue *q, struct msg **out) +static void queueDel (msgQueue *q, struct msg **out) { *out = (struct msg*) q->buf[q->head]; @@ -4165,6 +3591,8 @@ static void *singleWorker(void *vParam) MsgDestruct(pMsg); /* If you need a delay for testing, here do a */ /* sleep(1); */ + } else { /* the mutex must be unlocked in any case (important for termination) */ + pthread_mutex_unlock(fifo->mut); } if(debugging_on && bGlblDone && !fifo->empty) dprintf("Worker does not yet terminate because it still has messages to process.\n"); @@ -4195,7 +3623,7 @@ static void enqueueMsg(struct msg *pMsg) assert(pMsg != NULL); - if(fifo == NULL) { + if(bRunningMultithreaded == 0) { /* multi-threading is not yet initialized, happens e.g. * during startup and restart. rgerhards, 2005-10-25 */ @@ -4376,6 +3804,10 @@ void logmsg(int pri, struct msg *pMsg, int flags) * * To aid this functionality, I am moving the rest of the code (the actual * consumer) to its own method, now called "processMsg()". + * + * rgerhards, 2005-10-25: as of now, the dual-threading code is now in place. + * It is an optional feature and even when enabled, rsyslogd will run single-threaded + * if it gets any errors during thread creation. */ pMsg->msgFlags = flags; @@ -5053,7 +4485,6 @@ void endutent(void) * Adjust the size of a variable to prevent a buffer overflow * should _PATH_DEV ever contain something different than "/dev/". */ - static void wallmsg(register struct filed *f) { @@ -5167,8 +4598,7 @@ static void reapchild() errno = saved_errno; } -/* - * Return a printable representation of a host address. +/* Return a printable representation of a host address. */ static const char *cvthname(struct sockaddr_in *f) { @@ -5187,15 +4617,13 @@ static const char *cvthname(struct sockaddr_in *f) inet_ntoa(f->sin_addr)); return (inet_ntoa(f->sin_addr)); } - /* - * Convert to lower case, just like LocalDomain above + /* Convert to lower case, just like LocalDomain above */ for (p = (char *)hp->h_name; *p ; p++) if (isupper(*p)) *p = tolower(*p); - /* - * Notice that the string still contains the fqdn, but your + /* Notice that the string still contains the fqdn, but your * hostname and domain are separated by a '\0'. */ if ((p = strchr(hp->h_name, '.'))) { @@ -5318,8 +4746,7 @@ static void logerrorInt(char *type, int errCode) return; } -/* - * Print syslogd errors some place. +/* Print syslogd errors some place. */ static void logerror(char *type) { @@ -5379,19 +4806,9 @@ static void die(int sig) } #ifdef USE_PTHREADS - /* We are now done with all messages, so we need to wake up the - * worker thread and then wait for it to finish. - */ - /* TODO: this code is not really working. It's just good as a test - * harness!! It dumps *at least* because we have no qeuue! - */ - bGlblDone = 1; - /* It's actually not "not empty" below but awaking the worker. The worker - * then finds out that it shall terminate and does so. - */ - pthread_cond_signal(pMsgQueue->notEmpty); - pthread_join(thrdWorker, NULL); - /* delete fifo here! */ + stopWorker(); + queueDelete(pMsgQueue); /* delete fifo here! */ + pMsgQueue = 0; #endif @@ -5634,8 +5051,7 @@ void cfsysline(char *p) } -/* - * INIT -- Initialize syslogd from configuration table +/* INIT -- Initialize syslogd from configuration table */ static void init() { @@ -5681,17 +5097,19 @@ static void init() dprintf("Called init.\n"); Initialized = 0; if(Files != NULL) { + struct filed *fPrev; dprintf("Initializing log structures.\n"); f = Files; while (f != NULL) { /* flush any pending output */ - if (f->f_prevcount) + if (f->f_prevcount) { /* rgerhards: 2004-11-09: I am now changing it, but * I am not sure it is correct as done. * TODO: verify later! */ fprintlog(f); + } /* free iovec if it was allocated */ if(f->f_iov != NULL) { @@ -5725,8 +5143,9 @@ static void init() } # endif /* done with this entry, we now need to delete itself */ + fPrev = f; f = f->f_next; - free(f); + free(fPrev); } /* Reflect the deletion of the Files linked list. */ @@ -5879,8 +5298,9 @@ static void init() if(Debug) { printf("Active selectors:\n"); - for (f = Files; f; f = f->f_next) { - if (f->f_type != F_UNUSED) { + for (f = Files; f != NULL ; f = f->f_next) { + if (1) { + //if (f->f_type != F_UNUSED) { if(f->pCSProgNameComp != NULL) printf("tag: '%s'\n", rsCStrGetSzStr(f->pCSProgNameComp)); if(f->eHostnameCmpMode != HN_NO_COMP) @@ -6990,7 +6410,6 @@ void dprintf(char *fmt, ...) * a flag variable which will tell the main loop to go through a restart. */ void sighup_handler() - { restart = 1; signal(SIGHUP, sighup_handler); @@ -7019,9 +6438,7 @@ static void initMySQL(register struct filed *f) we call the error handler */ if(iCounter) DBErrorHandler(f); - } - else - { + } else { f->f_timeResumeOnError = 0; /* We have a working db connection */ dprintf("connected successfully to db\n"); } @@ -7238,6 +6655,638 @@ int getSubString(char **ppSrc, char *pDst, size_t DstSize, char cSep) return iErr; } + + +static void mainloop(void) +{ + int i; +#if !defined(__GLIBC__) + int len, num_fds; +#else /* __GLIBC__ */ +#ifndef TESTING + size_t len; +#endif +#endif /* __GLIBC__ */ + fd_set readfds; +#ifdef SYSLOG_INET + fd_set writefds; + struct filed *f; +#endif + +#ifndef TESTING + int fd; +#ifdef SYSLOG_INET + struct sockaddr_in frominet; + char *from; + int iTCPSess; +#endif +#endif + + char line[MAXLINE +1]; + int maxfds; + + /* --------------------- Main loop begins here. ----------------------------------------- */ + for (;;) { + int nfds; + errno = 0; + FD_ZERO(&readfds); + maxfds = 0; +#ifdef SYSLOG_UNIXAF +#ifndef TESTING + /* + * Add the Unix Domain Sockets to the list of read + * descriptors. + * rgerhards 2005-08-01: we must now check if there are + * any local sockets to listen to at all. If the -o option + * is given without -a, we do not need to listen at all.. + */ + /* Copy master connections */ + for (i = startIndexUxLocalSockets; i < nfunix; i++) { + if (funix[i] != -1) { + FD_SET(funix[i], &readfds); + if (funix[i]>maxfds) maxfds=funix[i]; + } + } +#endif +#endif +#ifdef SYSLOG_INET +#ifndef TESTING + /* + * Add the Internet Domain Socket to the list of read + * descriptors. + */ + if ( InetInuse && AcceptRemote ) { + FD_SET(inetm, &readfds); + if (inetm>maxfds) maxfds=inetm; + dprintf("Listening on syslog UDP port.\n"); + } + + /* Add the TCP socket to the list of read descriptors. + */ + if(bEnableTCP && sockTCPLstn != -1) { + FD_SET(sockTCPLstn, &readfds); + if (sockTCPLstn>maxfds) maxfds=sockTCPLstn; + dprintf("Listening on syslog TCP port.\n"); + /* do the sessions */ + iTCPSess = TCPSessGetNxtSess(-1); + while(iTCPSess != -1) { + int fd; + fd = TCPSessions[iTCPSess].sock; + dprintf("Adding TCP Session %d\n", fd); + FD_SET(fd, &readfds); + if (fd>maxfds) maxfds=fd; + /* now get next... */ + iTCPSess = TCPSessGetNxtSess(iTCPSess); + } + } + + /* TODO: activate the code below only if we actually need to check + * for outstanding writefds. + */ + if(1) { + /* Now add the TCP output sockets to the writefds set. This implementation + * is not optimal (performance-wise) and it should be replaced with something + * better in the longer term. I've not yet done this, as this code is + * scheduled to be replaced after the liblogging integration. + * rgerhards 2005-07-20 + */ + FD_ZERO(&writefds); + for (f = Files; f != NULL ; f = f->f_next) { + if( (f->f_type == F_FORW) + && (f->f_un.f_forw.protocol == FORW_TCP) + && (TCPSendGetStatus(f) == TCP_SEND_CONNECTING)) { + FD_SET(f->f_file, &writefds); + if(f->f_file > maxfds) + maxfds = f->f_file; + } + } + } +#endif +#endif +#ifdef TESTING + FD_SET(fileno(stdin), &readfds); + if (fileno(stdin) > maxfds) maxfds = fileno(stdin); + + dprintf("Listening on stdin. Press Ctrl-C to interrupt.\n"); +#endif + + if ( debugging_on ) { + dprintf("----------------------------------------\nCalling select, active file descriptors (max %d): ", maxfds); + for (nfds= 0; nfds <= maxfds; ++nfds) + if ( FD_ISSET(nfds, &readfds) ) + dprintf("%d ", nfds); + dprintf("\n"); + } +#ifdef SYSLOG_INET + nfds = select(maxfds+1, (fd_set *) &readfds, (fd_set *) &writefds, + (fd_set *) NULL, (struct timeval *) NULL); +#else + nfds = select(maxfds+1, (fd_set *) &readfds, (fd_set *) NULL, + (fd_set *) NULL, (struct timeval *) NULL); +#endif + if(bRequestDoMark) { + domark(); + bRequestDoMark = 0; + /* We do not use continue, because domark() is carried out + * only when something else happened. + */ + } + if (restart) { + dprintf("\nReceived SIGHUP, reloading rsyslogd.\n"); +# ifdef USE_PTHREADS + stopWorker(); +# endif + init(); +# ifdef USE_PTHREADS + startWorker(); +# endif + restart = 0; + continue; + } + if (nfds == 0) { + dprintf("No select activity.\n"); + continue; + } + if (nfds < 0) { + if (errno != EINTR) + logerror("select"); + dprintf("Select interrupted.\n"); + continue; + } + + if ( debugging_on ) + { + dprintf("\nSuccessful select, descriptor count = %d, " \ + "Activity on: ", nfds); + for (nfds= 0; nfds <= maxfds; ++nfds) + if ( FD_ISSET(nfds, &readfds) ) + dprintf("%d ", nfds); + dprintf(("\n")); + } + +#ifndef TESTING +#ifdef SYSLOG_INET + /* TODO: activate the code below only if we actually need to check + * for outstanding writefds. + */ + if(1) { + /* Now check the TCP send sockets. So far, we only see if they become + * writable and then change their internal status. No real async + * writing is currently done. This code will be replaced once liblogging + * is used, thus we try not to focus too much on it. + * + * IMPORTANT: With the current code, the writefds must be checked first, + * because the readfds might have messages to be forwarded, which + * rely on the status setting that is done here! + * + * rgerhards 2005-07-20 + */ + for (f = Files; f != NULL ; f = f->f_next) { + if( (f->f_type == F_FORW) + && (f->f_un.f_forw.protocol == FORW_TCP) + && (TCPSendGetStatus(f) == TCP_SEND_CONNECTING) + && (FD_ISSET(f->f_file, &writefds))) { + dprintf("tcp send socket %d ready for writing.\n", f->f_file); + TCPSendSetStatus(f, TCP_SEND_READY); + /* Send stored message (if any) */ + if(f->f_un.f_forw.savedMsg != NULL) { + if(TCPSend(f, f->f_un.f_forw.savedMsg) != 0) { + /* error! */ + f->f_type = F_FORW_SUSP; + errno = 0; + logerror("error forwarding via tcp, suspending..."); + } + free(f->f_un.f_forw.savedMsg); + f->f_un.f_forw.savedMsg = NULL; + } + } + } + } +#endif /* #ifdef SYSLOG_INET */ +#ifdef SYSLOG_UNIXAF + for (i = 0; i < nfunix; i++) { + if ((fd = funix[i]) != -1 && FD_ISSET(fd, &readfds)) { + int iRcvd; + memset(line, '\0', sizeof(line)); + iRcvd = recv(fd, line, MAXLINE - 2, 0); + dprintf("Message from UNIX socket: #%d\n", fd); + if (iRcvd > 0) { + line[iRcvd] = line[iRcvd+1] = '\0'; + printchopped(LocalHostName, line, iRcvd + 2, fd, funixParseHost[i]); + } else if (iRcvd < 0 && errno != EINTR) { + dprintf("UNIX socket error: %d = %s.\n", \ + errno, strerror(errno)); + logerror("recvfrom UNIX"); + } + } + } +#endif + +#ifdef SYSLOG_INET + if (InetInuse && AcceptRemote && FD_ISSET(inetm, &readfds)) { + len = sizeof(frominet); + memset(line, '\0', sizeof(line)); + i = recvfrom(finet, line, MAXLINE - 2, 0, \ + (struct sockaddr *) &frominet, &len); + dprintf("Message from UDP inetd socket: #%d, host: %s\n", + inetm, inet_ntoa(frominet.sin_addr)); + if (i > 0) { + from = (char *)cvthname(&frominet); + /* Here we check if a host is permitted to send us + * syslog messages. If it isn't, we do not further + * process the message but log a warning (if we are + * configured to do this). + * rgerhards, 2005-09-26 + */ + if(isAllowedSender(pAllowedSenders_UDP, &frominet)) { + line[i] = line[i+1] = '\0'; + printchopped(from, line, i + 2, finet, 1); + } else { + if(option_DisallowWarning) { + logerrorSz("UDP message from disallowed sender %s discarded", + from); + } + } + } else if (i < 0 && errno != EINTR && errno != EAGAIN) { + /* see link below why we check EAGAIN: + * http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=188194 + */ + dprintf("INET socket error: %d = %s.\n", \ + errno, strerror(errno)); + logerror("recvfrom inet"); + /* should be harmless now that we set + * BSDCOMPAT on the socket */ + sleep(1); + } + } + + if(bEnableTCP && sockTCPLstn != -1) { + /* Now check for TCP input */ + if(FD_ISSET(sockTCPLstn, &readfds)) { + dprintf("New connect on TCP inetd socket: #%d\n", sockTCPLstn); + TCPSessAccept(); + } + + /* now check the sessions */ + /* TODO: optimize the whole thing. We could stop enumerating as + * soon as we have found all sockets flagged as active. */ + iTCPSess = TCPSessGetNxtSess(-1); + while(iTCPSess != -1) { + int fd; + int state; + fd = TCPSessions[iTCPSess].sock; + if(FD_ISSET(fd, &readfds)) { + char buf[MAXLINE]; + dprintf("tcp session socket with new data: #%d\n", fd); + + /* Receive message */ + state = recv(fd, buf, sizeof(buf), 0); + if(state == 0) { + /* Session closed */ + TCPSessClose(iTCPSess); + } else if(state == -1) { + char errmsg[128]; + snprintf(errmsg, sizeof(errmsg)/sizeof(char), + "TCP session %d will be closed, error ignored", + fd); + logerror(errmsg); + TCPSessClose(iTCPSess); + } else { + /* valid data received, process it! */ + TCPSessDataRcvd(iTCPSess, buf, state); + } + } + iTCPSess = TCPSessGetNxtSess(iTCPSess); + } + } + +#endif +#else + if ( FD_ISSET(fileno(stdin), &readfds) ) { + dprintf("Message from stdin.\n"); + memset(line, '\0', sizeof(line)); + line[0] = '.'; + parts[fileno(stdin)] = NULL; + i = read(fileno(stdin), line, MAXLINE); + if (i > 0) { + printchopped(LocalHostName, line, i+1, + fileno(stdin), 0); + } else if (i < 0) { + if (errno != EINTR) { + logerror("stdin"); + } + } + FD_CLR(fileno(stdin), &readfds); + } + +#endif + } +} + +int main(int argc, char **argv) +{ register int i; + register char *p; +#if !defined(__GLIBC__) + int len, num_fds; +#else /* __GLIBC__ */ + int num_fds; +#endif /* __GLIBC__ */ + +#ifdef MTRACE + mtrace(); /* this is a debug aid for leak detection - either remove + * or put in conditional compilation. 2005-01-18 RGerhards */ +#endif + +#ifndef TESTING + pid_t ppid = getpid(); +#endif + int ch; + struct hostent *hent; + + extern int optind; + extern char *optarg; + char *pTmp; + +#ifndef TESTING + chdir ("/"); +#endif + for (i = 1; i < MAXFUNIX; i++) { + funixn[i] = ""; + funix[i] = -1; + } + + while ((ch = getopt(argc, argv, "a:dhi:f:l:m:nop:r:s:t:vw")) != EOF) + switch((char)ch) { + case 'a': + if (nfunix < MAXFUNIX) + if(*optarg == ':') { + funixParseHost[nfunix] = 1; + funixn[nfunix++] = optarg+1; + } + else { + funixParseHost[nfunix] = 0; + funixn[nfunix++] = optarg; + } + else + fprintf(stderr, "Out of descriptors, ignoring %s\n", optarg); + break; + case 'd': /* debug */ + Debug = 1; + break; + case 'f': /* configuration file */ + ConfFile = optarg; + break; + case 'h': + NoHops = 0; + break; + case 'i': /* pid file name */ + PidFile = optarg; + break; + case 'l': + if (LocalHosts) { + fprintf (stderr, "Only one -l argument allowed," \ + "the first one is taken.\n"); + break; + } + LocalHosts = crunch_list(optarg); + break; + case 'm': /* mark interval */ + MarkInterval = atoi(optarg) * 60; + break; + case 'n': /* don't fork */ + NoFork = 1; + break; + case 'o': /* omit local logging (/dev/log) */ + startIndexUxLocalSockets = 1; + break; + case 'p': /* path to regular log socket */ + funixn[0] = optarg; + break; + case 'r': /* accept remote messages */ + AcceptRemote = 1; + LogPort = atoi(optarg); + break; + case 's': + if (StripDomains) { + fprintf (stderr, "Only one -s argument allowed," \ + "the first one is taken.\n"); + break; + } + StripDomains = crunch_list(optarg); + break; + case 't': /* enable tcp logging */ + bEnableTCP = -1; + TCPLstnPort = atoi(optarg); + break; + case 'v': + printf("rsyslogd %s.%s, ", VERSION, PATCHLEVEL); + printf("compiled with:\n"); +#ifdef USE_PTHREADS + printf("\tFEATURE_PTHREADS (dual-threading)\n"); +#endif +#ifdef FEATURE_REGEXP + printf("\tFEATURE_REGEXP\n"); +#endif +#ifdef WITH_DB + printf("\tFEATURE_DB\n"); +#endif +#ifndef NOLARGEFILE + printf("\tFEATURE_LARGEFILE\n"); +#endif +#ifdef SYSLOG_INET + printf("\tSYSLOG_INET (Internet/remote support)\n"); +#endif +#ifndef NDEBUG + printf("\tFEATURE_DEBUG (debug build, slow code)\n"); +#endif + printf("\nSee http://www.rsyslog.com for more information.\n"); + exit(0); /* exit for -v option - so this is a "good one" */ + case 'w': /* disable disallowed host warnigs */ + option_DisallowWarning = 0; + break; + case '?': + default: + usage(); + } + if ((argc -= optind)) + usage(); + +#ifndef TESTING + if ( !(Debug || NoFork) ) + { + dprintf("Checking pidfile.\n"); + if (!check_pid(PidFile)) + { + signal (SIGTERM, doexit); + if (fork()) { + /* + * Parent process + */ + sleep(300); + /* + * Not reached unless something major went wrong. 5 + * minutes should be a fair amount of time to wait. + * Please note that this procedure is important since + * the father must not exit before syslogd isn't + * initialized or the klogd won't be able to flush its + * logs. -Joey + */ + exit(1); /* "good" exit - after forking, not diasabling anything */ + } + num_fds = getdtablesize(); + for (i= 0; i < num_fds; i++) + (void) close(i); + untty(); + } + else + { + fputs("rsyslogd: Already running.\n", stderr); + exit(1); /* "good" exit, done if syslogd is already running */ + } + } + else +#endif + debugging_on = 1; +#ifndef SYSV + else + setlinebuf(stdout); +#endif + +#ifndef TESTING + /* tuck my process id away */ + if ( !Debug ) + { + dprintf("Writing pidfile.\n"); + if (!check_pid(PidFile)) + { + if (!write_pid(PidFile)) + { + dprintf("Can't write pid.\n"); + exit(1); /* exit during startup - questionable */ + } + } + else + { + dprintf("Pidfile (and pid) already exist.\n"); + exit(1); /* exit during startup - questionable */ + } + } /* if ( !Debug ) */ +#endif + myPid = getpid(); /* save our pid for further testing (also used for messages) */ + + /* initialize the default templates + * we use template names with a SP in front - these + * can NOT be generated via the configuration file + */ + pTmp = template_TraditionalFormat; + tplAddLine(" TradFmt", &pTmp); + pTmp = template_WallFmt; + tplAddLine(" WallFmt", &pTmp); + pTmp = template_StdFwdFmt; + tplAddLine(" StdFwdFmt", &pTmp); + pTmp = template_StdUsrMsgFmt; + tplAddLine(" StdUsrMsgFmt", &pTmp); + pTmp = template_StdDBFmt; + tplAddLine(" StdDBFmt", &pTmp); + + /* prepare emergency logging system */ + + consfile.f_type = F_CONSOLE; + (void) strcpy(consfile.f_un.f_fname, ctty); + cflineSetTemplateAndIOV(&consfile, " TradFmt"); + (void) gethostname(LocalHostName, sizeof(LocalHostName)); + if ( (p = strchr(LocalHostName, '.')) ) { + *p++ = '\0'; + LocalDomain = p; + } + else + { + LocalDomain = ""; + + /* + * It's not clearly defined whether gethostname() + * should return the simple hostname or the fqdn. A + * good piece of software should be aware of both and + * we want to distribute good software. Joey + * + * Good software also always checks its return values... + * If syslogd starts up before DNS is up & /etc/hosts + * doesn't have LocalHostName listed, gethostbyname will + * return NULL. + */ + hent = gethostbyname(LocalHostName); + if ( hent ) + snprintf(LocalHostName, sizeof(LocalHostName), "%s", hent->h_name); + + if ( (p = strchr(LocalHostName, '.')) ) + { + *p++ = '\0'; + LocalDomain = p; + } + } + + /* Convert to lower case to recognize the correct domain laterly + */ + for (p = (char *)LocalDomain; *p ; p++) + if (isupper(*p)) + *p = tolower(*p); + + (void) signal(SIGTERM, die); + (void) signal(SIGINT, Debug ? die : SIG_IGN); + (void) signal(SIGQUIT, Debug ? die : SIG_IGN); + (void) signal(SIGCHLD, reapchild); + (void) signal(SIGALRM, domarkAlarmHdlr); + (void) signal(SIGUSR1, Debug ? debug_switch : SIG_IGN); + (void) signal(SIGPIPE, SIG_IGN); + (void) signal(SIGXFSZ, SIG_IGN); /* do not abort if 2gig file limit is hit */ + (void) alarm(TIMERINTVL); + + /* Create a partial message table for all file descriptors. */ + num_fds = getdtablesize(); + dprintf("Allocated parts table for %d file descriptors.\n", num_fds); + if ((parts = (char **) malloc(num_fds * sizeof(char *))) == NULL) + { + logerror("Cannot allocate memory for message parts table."); + die(0); + } + for(i= 0; i < num_fds; ++i) + parts[i] = NULL; + + dprintf("Starting.\n"); + init(); +#ifndef TESTING + if(Debug) { + dprintf("Debugging enabled, SIGUSR1 to turn off debugging.\n"); + debugging_on = 1; + } + /* + * Send a signal to the parent to it can terminate. + */ + if (myPid != ppid) + kill (ppid, SIGTERM); +#endif + /* END OF INTIALIZATION + * ... but keep in mind that we might do a restart and thus init() might + * be called again. If that happens, we must shut down all active threads, + * do the init() and then restart things. + * rgerhards, 2005-10-24 + */ +#ifdef USE_PTHREADS + /* create message queue */ + pMsgQueue = queueInit(); + if(pMsgQueue != NULL) { + errno = 0; + logerror("error: could not create message queue - running single-threaded!\n"); + } else { /* start up worker thread */ + startWorker(); + } +#endif + + /* --------------------- Main loop begins here. ----------------------------------------- */ + mainloop(); + return 0; +} + /* * Local variables: * c-indent-level: 8 @@ -329,10 +329,8 @@ static int do_Parameter(char **pp, struct template *pTpl) /* TODO: RGer: check if we can recover better... (probably not) */ } - regex_char[0] = '\0'; - /* Get the regex string for compiling later */ - strncpy(regex_char, p, longitud); + memcpy(regex_char, p, longitud); regex_char[longitud] = '\0'; dprintf("debug: regex detected: '%s'\n", |