diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2005-10-24 16:44:14 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2005-10-24 16:44:14 +0000 |
commit | 0795edb76c48886a2937fb93f001a3e3b79ab962 (patch) | |
tree | ed5ab4ae22d05919b62f83a7ee67a2c4f43a3436 /syslogd.c | |
parent | bce77e093ba6c3fc6e4a5d18db046d6c161b06ac (diff) | |
download | rsyslog-0795edb76c48886a2937fb93f001a3e3b79ab962.tar.gz rsyslog-0795edb76c48886a2937fb93f001a3e3b79ab962.tar.xz rsyslog-0795edb76c48886a2937fb93f001a3e3b79ab962.zip |
first steps with pthread, crashes at program end
Diffstat (limited to 'syslogd.c')
-rw-r--r-- | syslogd.c | 154 |
1 files changed, 120 insertions, 34 deletions
@@ -346,8 +346,7 @@ int funix[MAXFUNIX] = { -1, }; /* read-only after startup */ /* * Flags to logmsg(). */ - -#define IGN_CONS 0x001 /* don't print on console */ +/* NO LONGER NEEDED: #define IGN_CONS 0x001 * don't print on console */ #define SYNC_FILE 0x002 /* do fsync on file after printing */ #define ADDDATE 0x004 /* add a date to the message */ #define MARK 0x008 /* this message is a mark */ @@ -456,6 +455,7 @@ struct msg { char *pszTIMESTAMP3164; /* TIMESTAMP as RFC3164 formatted string (always 15 charcters) */ char *pszTIMESTAMP3339; /* TIMESTAMP as RFC3339 formatted string (32 charcters at most) */ char *pszTIMESTAMP_MySQL;/* TIMESTAMP as MySQL formatted string (always 14 charcters) */ + int msgFlags; /* flags associated with this message */ }; @@ -577,6 +577,17 @@ static EHostnameCmpMode eDfltHostnameCmpMode; static rsCStrObj *pDfltHostnameCmp; static rsCStrObj *pDfltProgNameCmp; +/* supporting structures for multithreading */ +#ifdef USE_PTHREADS +static pthread_cond_t cndRunWorker = PTHREAD_COND_INITIALIZER; +static pthread_mutex_t mtxRunWorker = PTHREAD_MUTEX_INITIALIZER; +static pthread_t thrdWorker; +static int bGlblDone = 0; +static struct msg *bufpMsg = NULL; +#endif +/* END supporting structures for multithreading */ + + /* * Intervals at which we flush out "message repeated" messages, * in seconds after previous message is logged. After each flush, @@ -770,6 +781,8 @@ static char template_StdUsrMsgFmt[] = "\" %syslogtag%%msg%\n\r\""; static char template_StdDBFmt[] = "\"insert into SystemEvents (Message, Facility, FromHost, Priority, DeviceReportedTime, ReceivedAt, InfoUnitID, SysLogTag) values ('%msg%', %syslogfacility%, '%HOSTNAME%', %syslogpriority%, '%timereported:::date-mysql%', '%timegenerated:::date-mysql%', %iut%, '%syslogtag%')\",SQL"; /* end template */ + +static void *singleWorker(void *vParam); /* REMOVEME later 2005-10-24 */ /* Function prototypes. */ int main(int argc, char **argv); static char **crunch_list(char *list); @@ -778,7 +791,7 @@ static void untty(void); static void printchopped(char *hname, char *msg, int len, int fd, int iSourceType); static void printline(char *hname, char *msg, int iSource); static void logmsg(int pri, struct msg*, int flags); -static void fprintlog(register struct filed *f, int flags); +static void fprintlog(register struct filed *f); static void endtty(); static void wallmsg(register struct filed *f); static void reapchild(); @@ -821,7 +834,7 @@ static int create_udp_socket(); * rgerhards 2005-10-24 */ -/* END Acess functions for the struct filed */ +/* END Access functions for the struct filed */ /* Code for handling allowed/disallowed senders */ @@ -1266,12 +1279,10 @@ static void TCPSendSetStatus(struct filed *f, enum TCPSendStatus iNewState) /* there can potentially be a race condition, so guard by mutex */ # ifdef USE_PTHREADS pthread_mutex_lock(&f->f_un.f_forw.mtxTCPSend); -dprintf("SetStats aquired mutex\n"); # endif f->f_un.f_forw.status = iNewState; # ifdef USE_PTHREADS pthread_mutex_unlock(&f->f_un.f_forw.mtxTCPSend); -dprintf("SetStats freed mutex\n"); # endif } @@ -3008,6 +3019,19 @@ int main(int argc, char **argv) if (myPid != ppid) kill (ppid, SIGTERM); #endif + /* END OF INTIALIZATION + * ... but keep in mind that we might do a restart and thus init() might + * be called again. If that happens, we must shut down all active threads, + * do the init() and then restart things. + * rgerhards, 2005-10-24 + */ +#ifdef USE_PTHREADS + /* start up worker thread */ + { int i; + i = pthread_create(&thrdWorker, NULL, singleWorker, NULL); + printf("worker thread started with state %d\n", i); + } +#endif /* Main loop begins here. */ for (;;) { @@ -3862,9 +3886,9 @@ int shouldProcessThisMessage(struct filed *f, struct msg *pMsg) * See comment dated 2005-10-13 in logmsg() on multithreading. * rgerhards, 2005-10-13 */ -static void processMsg(struct msg *pMsg, int flags) +static void processMsg(struct msg *pMsg) { - register struct filed *f; + struct filed *f; assert(pMsg != NULL); @@ -3883,7 +3907,7 @@ static void processMsg(struct msg *pMsg, int flags) if (f->f_file >= 0) { untty(); f->f_pMsg = MsgAddRef(pMsg); /* is expected here... */ - fprintlog(f, flags); + fprintlog(f); MsgDestruct(pMsg); (void) close(f->f_file); f->f_file = -1; @@ -3902,7 +3926,7 @@ static void processMsg(struct msg *pMsg, int flags) if (f->f_file >= 0) { untty(); f->f_pMsg = MsgAddRef(pMsg); /* is expected here... */ - fprintlog(f, flags); + fprintlog(f); MsgDestruct(pMsg); (void) close(f->f_file); f->f_file = -1; @@ -3944,31 +3968,26 @@ static void processMsg(struct msg *pMsg, int flags) break; /* that's it for this message ;) */ } - if (f->f_type == F_CONSOLE && (flags & IGN_CONS)) - continue; - /* don't output marks to recently written files */ - if ((flags & MARK) && (now - f->f_time) < MarkInterval / 2) + if ((pMsg->msgFlags & MARK) && (now - f->f_time) < MarkInterval / 2) continue; /* * suppress duplicate lines to this file */ - if ((flags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(f->f_pMsg) && + if ((pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(f->f_pMsg) && !strcmp(getMSG(pMsg), getMSG(f->f_pMsg)) && !strcmp(getHOSTNAME(pMsg), getHOSTNAME(f->f_pMsg))) { f->f_prevcount++; dprintf("msg repeated %d times, %ld sec of %d.\n", f->f_prevcount, now - f->f_time, repeatinterval[f->f_repeatcount]); - /* - * If domark would have logged this by now, + /* If domark would have logged this by now, * flush it now (so we don't hold isolated messages), - * but back off so we'll flush less often - * in the future. + * but back off so we'll flush less often in the future. */ if (now > REPEATTIME(f)) { - fprintlog(f, flags); + fprintlog(f); BACKOFF(f); } } else { @@ -3980,12 +3999,73 @@ static void processMsg(struct msg *pMsg, int flags) MsgDestruct(f->f_pMsg); f->f_pMsg = MsgAddRef(pMsg); /* call the output driver */ - fprintlog(f, flags); + fprintlog(f); } } } +#ifdef USE_PTHREADS +/* This block contains code that is only present when USE_PTHREADS is + * enabled. I plan to move it to some other file, but for the time + * being, I include it here because that saves me from the need to + * do so many external definitons. + * rgerhards, 2005-10-24 + */ + +/* The worker thread (so far, we have dual-threading, so only one + * worker thread. Having more than one worker requires considerable + * additional code review in regard to thread-safety. + */ +static void *singleWorker(void *vParam) +{ + struct msg *pMsg; + while(!bGlblDone) { + pthread_mutex_lock(&mtxRunWorker); + printf("worker waits for next message\n"); + pthread_cond_wait(&cndRunWorker, &mtxRunWorker); + /* dequeue element (still protected from mutex) */ + printf("Worker dequeues...\n"); + pMsg = bufpMsg; + pthread_mutex_unlock(&mtxRunWorker); + /* do actual processing (the lengthy part, runs in parallel) */ + dprintf("worker is running\n"); + processMsg(pMsg); + } + + pthread_exit(0); +} + +/* END threads-related code */ +#endif /* #ifdef USE_PTHREADS */ + + +/* This method enqueues a message into the the message buffer. It also + * the worker thread, so that the message will be processed. If we are + * compiled without PTHREADS support, we simply use this method as + * an alias for processMsg(). + * See comment dated 2005-10-13 in logmsg() on multithreading. + * rgerhards, 2005-10-24 + */ +#ifndef USE_PTHREADS +#define enqueueMsg(x) processMsg((x)) +#else +static void enqueueMsg(struct msg *pMsg) +{ + int iRet; + assert(pMsg != NULL); + + iRet = pthread_mutex_lock(&mtxRunWorker); + printf("EnqueueMsg waiting on mutex (%d)\n", iRet); + bufpMsg = pMsg; + /* now activate the worker thread */ + pthread_mutex_unlock(&mtxRunWorker); + iRet = pthread_cond_signal(&cndRunWorker); + printf("EnqueueMsg signaled condition (%d)\n", iRet); +} +#endif /* #ifndef USE_PTHREADS */ + + /* * Log a message to the appropriate log files, users, etc. based on * the priority. @@ -4146,7 +4226,8 @@ void logmsg(int pri, struct msg *pMsg, int flags) * consumer) to its own method, now called "processMsg()". */ - processMsg(pMsg, flags); + pMsg->msgFlags = flags; + enqueueMsg(pMsg); #ifndef SYSV (void) sigsetmask(omask); @@ -4545,7 +4626,7 @@ again: * This whole function is probably about to change once we have the * message abstraction. */ -void fprintlog(register struct filed *f, int flags) +void fprintlog(register struct filed *f) { char *msg; char *psz; /* for shell support */ @@ -4683,14 +4764,6 @@ void fprintlog(register struct filed *f, int flags) case F_CONSOLE: f->f_time = now; -#ifdef UNIXPC - if (1) { -#else - if (flags & IGN_CONS) { -#endif - dprintf(" (ignored).\n"); - break; - } /* FALLTHROUGH */ case F_TTY: @@ -5138,7 +5211,7 @@ static void die(int sig) * I am not sure it is correct as done. * TODO: verify later! */ - fprintlog(f, 0); + fprintlog(f); } Initialized = was_initialized; /* we restore this so that the logmsgInternal() @@ -5154,6 +5227,20 @@ static void die(int sig) logmsgInternal(LOG_SYSLOG|LOG_INFO, buf, LocalHostName, ADDDATE); } +#ifdef USE_PTHREADS + /* We are now done with all messages, so we need to wake up the + * worker thread and then wait for it to finish. + */ + /* TODO: this code is not really working. It's just good as a test + * harness!! It dumps *at least* because we have no qeuue! + */ + bGlblDone = 1; + pthread_cond_signal(&cndRunWorker); + pthread_join(thrdWorker, NULL); + pthread_cond_destroy(&cndRunWorker); +#endif + + /* Free ressources and close MySQL connections */ for (f = Files; f != NULL ; f = f->f_next) { /* free iovec if it was allocated */ @@ -5449,7 +5536,7 @@ static void init() * I am not sure it is correct as done. * TODO: verify later! */ - fprintlog(f, 0); + fprintlog(f); /* free iovec if it was allocated */ if(f->f_iov != NULL) { @@ -6379,7 +6466,6 @@ static rsRetVal cfline(char *line, register struct filed *f) /* in this case, we also need a mutex... */ # ifdef USE_PTHREADS pthread_mutex_init(&f->f_un.f_forw.mtxTCPSend, 0); -dprintf("initializing mutex!\n"); # endif } else { f->f_un.f_forw.protocol = FORW_UDP; |