summaryrefslogtreecommitdiffstats
path: root/syslogd.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2005-10-24 16:44:14 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2005-10-24 16:44:14 +0000
commit0795edb76c48886a2937fb93f001a3e3b79ab962 (patch)
treeed5ab4ae22d05919b62f83a7ee67a2c4f43a3436 /syslogd.c
parentbce77e093ba6c3fc6e4a5d18db046d6c161b06ac (diff)
downloadrsyslog-0795edb76c48886a2937fb93f001a3e3b79ab962.tar.gz
rsyslog-0795edb76c48886a2937fb93f001a3e3b79ab962.tar.xz
rsyslog-0795edb76c48886a2937fb93f001a3e3b79ab962.zip
first steps with pthread, crashes at program end
Diffstat (limited to 'syslogd.c')
-rw-r--r--syslogd.c154
1 files changed, 120 insertions, 34 deletions
diff --git a/syslogd.c b/syslogd.c
index 02504a7f..7ff2b09d 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -346,8 +346,7 @@ int funix[MAXFUNIX] = { -1, }; /* read-only after startup */
/*
* Flags to logmsg().
*/
-
-#define IGN_CONS 0x001 /* don't print on console */
+/* NO LONGER NEEDED: #define IGN_CONS 0x001 * don't print on console */
#define SYNC_FILE 0x002 /* do fsync on file after printing */
#define ADDDATE 0x004 /* add a date to the message */
#define MARK 0x008 /* this message is a mark */
@@ -456,6 +455,7 @@ struct msg {
char *pszTIMESTAMP3164; /* TIMESTAMP as RFC3164 formatted string (always 15 charcters) */
char *pszTIMESTAMP3339; /* TIMESTAMP as RFC3339 formatted string (32 charcters at most) */
char *pszTIMESTAMP_MySQL;/* TIMESTAMP as MySQL formatted string (always 14 charcters) */
+ int msgFlags; /* flags associated with this message */
};
@@ -577,6 +577,17 @@ static EHostnameCmpMode eDfltHostnameCmpMode;
static rsCStrObj *pDfltHostnameCmp;
static rsCStrObj *pDfltProgNameCmp;
+/* supporting structures for multithreading */
+#ifdef USE_PTHREADS
+static pthread_cond_t cndRunWorker = PTHREAD_COND_INITIALIZER;
+static pthread_mutex_t mtxRunWorker = PTHREAD_MUTEX_INITIALIZER;
+static pthread_t thrdWorker;
+static int bGlblDone = 0;
+static struct msg *bufpMsg = NULL;
+#endif
+/* END supporting structures for multithreading */
+
+
/*
* Intervals at which we flush out "message repeated" messages,
* in seconds after previous message is logged. After each flush,
@@ -770,6 +781,8 @@ static char template_StdUsrMsgFmt[] = "\" %syslogtag%%msg%\n\r\"";
static char template_StdDBFmt[] = "\"insert into SystemEvents (Message, Facility, FromHost, Priority, DeviceReportedTime, ReceivedAt, InfoUnitID, SysLogTag) values ('%msg%', %syslogfacility%, '%HOSTNAME%', %syslogpriority%, '%timereported:::date-mysql%', '%timegenerated:::date-mysql%', %iut%, '%syslogtag%')\",SQL";
/* end template */
+
+static void *singleWorker(void *vParam); /* REMOVEME later 2005-10-24 */
/* Function prototypes. */
int main(int argc, char **argv);
static char **crunch_list(char *list);
@@ -778,7 +791,7 @@ static void untty(void);
static void printchopped(char *hname, char *msg, int len, int fd, int iSourceType);
static void printline(char *hname, char *msg, int iSource);
static void logmsg(int pri, struct msg*, int flags);
-static void fprintlog(register struct filed *f, int flags);
+static void fprintlog(register struct filed *f);
static void endtty();
static void wallmsg(register struct filed *f);
static void reapchild();
@@ -821,7 +834,7 @@ static int create_udp_socket();
* rgerhards 2005-10-24
*/
-/* END Acess functions for the struct filed */
+/* END Access functions for the struct filed */
/* Code for handling allowed/disallowed senders
*/
@@ -1266,12 +1279,10 @@ static void TCPSendSetStatus(struct filed *f, enum TCPSendStatus iNewState)
/* there can potentially be a race condition, so guard by mutex */
# ifdef USE_PTHREADS
pthread_mutex_lock(&f->f_un.f_forw.mtxTCPSend);
-dprintf("SetStats aquired mutex\n");
# endif
f->f_un.f_forw.status = iNewState;
# ifdef USE_PTHREADS
pthread_mutex_unlock(&f->f_un.f_forw.mtxTCPSend);
-dprintf("SetStats freed mutex\n");
# endif
}
@@ -3008,6 +3019,19 @@ int main(int argc, char **argv)
if (myPid != ppid)
kill (ppid, SIGTERM);
#endif
+ /* END OF INTIALIZATION
+ * ... but keep in mind that we might do a restart and thus init() might
+ * be called again. If that happens, we must shut down all active threads,
+ * do the init() and then restart things.
+ * rgerhards, 2005-10-24
+ */
+#ifdef USE_PTHREADS
+ /* start up worker thread */
+ { int i;
+ i = pthread_create(&thrdWorker, NULL, singleWorker, NULL);
+ printf("worker thread started with state %d\n", i);
+ }
+#endif
/* Main loop begins here. */
for (;;) {
@@ -3862,9 +3886,9 @@ int shouldProcessThisMessage(struct filed *f, struct msg *pMsg)
* See comment dated 2005-10-13 in logmsg() on multithreading.
* rgerhards, 2005-10-13
*/
-static void processMsg(struct msg *pMsg, int flags)
+static void processMsg(struct msg *pMsg)
{
- register struct filed *f;
+ struct filed *f;
assert(pMsg != NULL);
@@ -3883,7 +3907,7 @@ static void processMsg(struct msg *pMsg, int flags)
if (f->f_file >= 0) {
untty();
f->f_pMsg = MsgAddRef(pMsg); /* is expected here... */
- fprintlog(f, flags);
+ fprintlog(f);
MsgDestruct(pMsg);
(void) close(f->f_file);
f->f_file = -1;
@@ -3902,7 +3926,7 @@ static void processMsg(struct msg *pMsg, int flags)
if (f->f_file >= 0) {
untty();
f->f_pMsg = MsgAddRef(pMsg); /* is expected here... */
- fprintlog(f, flags);
+ fprintlog(f);
MsgDestruct(pMsg);
(void) close(f->f_file);
f->f_file = -1;
@@ -3944,31 +3968,26 @@ static void processMsg(struct msg *pMsg, int flags)
break; /* that's it for this message ;) */
}
- if (f->f_type == F_CONSOLE && (flags & IGN_CONS))
- continue;
-
/* don't output marks to recently written files */
- if ((flags & MARK) && (now - f->f_time) < MarkInterval / 2)
+ if ((pMsg->msgFlags & MARK) && (now - f->f_time) < MarkInterval / 2)
continue;
/*
* suppress duplicate lines to this file
*/
- if ((flags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(f->f_pMsg) &&
+ if ((pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(f->f_pMsg) &&
!strcmp(getMSG(pMsg), getMSG(f->f_pMsg)) &&
!strcmp(getHOSTNAME(pMsg), getHOSTNAME(f->f_pMsg))) {
f->f_prevcount++;
dprintf("msg repeated %d times, %ld sec of %d.\n",
f->f_prevcount, now - f->f_time,
repeatinterval[f->f_repeatcount]);
- /*
- * If domark would have logged this by now,
+ /* If domark would have logged this by now,
* flush it now (so we don't hold isolated messages),
- * but back off so we'll flush less often
- * in the future.
+ * but back off so we'll flush less often in the future.
*/
if (now > REPEATTIME(f)) {
- fprintlog(f, flags);
+ fprintlog(f);
BACKOFF(f);
}
} else {
@@ -3980,12 +3999,73 @@ static void processMsg(struct msg *pMsg, int flags)
MsgDestruct(f->f_pMsg);
f->f_pMsg = MsgAddRef(pMsg);
/* call the output driver */
- fprintlog(f, flags);
+ fprintlog(f);
}
}
}
+#ifdef USE_PTHREADS
+/* This block contains code that is only present when USE_PTHREADS is
+ * enabled. I plan to move it to some other file, but for the time
+ * being, I include it here because that saves me from the need to
+ * do so many external definitons.
+ * rgerhards, 2005-10-24
+ */
+
+/* The worker thread (so far, we have dual-threading, so only one
+ * worker thread. Having more than one worker requires considerable
+ * additional code review in regard to thread-safety.
+ */
+static void *singleWorker(void *vParam)
+{
+ struct msg *pMsg;
+ while(!bGlblDone) {
+ pthread_mutex_lock(&mtxRunWorker);
+ printf("worker waits for next message\n");
+ pthread_cond_wait(&cndRunWorker, &mtxRunWorker);
+ /* dequeue element (still protected from mutex) */
+ printf("Worker dequeues...\n");
+ pMsg = bufpMsg;
+ pthread_mutex_unlock(&mtxRunWorker);
+ /* do actual processing (the lengthy part, runs in parallel) */
+ dprintf("worker is running\n");
+ processMsg(pMsg);
+ }
+
+ pthread_exit(0);
+}
+
+/* END threads-related code */
+#endif /* #ifdef USE_PTHREADS */
+
+
+/* This method enqueues a message into the the message buffer. It also
+ * the worker thread, so that the message will be processed. If we are
+ * compiled without PTHREADS support, we simply use this method as
+ * an alias for processMsg().
+ * See comment dated 2005-10-13 in logmsg() on multithreading.
+ * rgerhards, 2005-10-24
+ */
+#ifndef USE_PTHREADS
+#define enqueueMsg(x) processMsg((x))
+#else
+static void enqueueMsg(struct msg *pMsg)
+{
+ int iRet;
+ assert(pMsg != NULL);
+
+ iRet = pthread_mutex_lock(&mtxRunWorker);
+ printf("EnqueueMsg waiting on mutex (%d)\n", iRet);
+ bufpMsg = pMsg;
+ /* now activate the worker thread */
+ pthread_mutex_unlock(&mtxRunWorker);
+ iRet = pthread_cond_signal(&cndRunWorker);
+ printf("EnqueueMsg signaled condition (%d)\n", iRet);
+}
+#endif /* #ifndef USE_PTHREADS */
+
+
/*
* Log a message to the appropriate log files, users, etc. based on
* the priority.
@@ -4146,7 +4226,8 @@ void logmsg(int pri, struct msg *pMsg, int flags)
* consumer) to its own method, now called "processMsg()".
*/
- processMsg(pMsg, flags);
+ pMsg->msgFlags = flags;
+ enqueueMsg(pMsg);
#ifndef SYSV
(void) sigsetmask(omask);
@@ -4545,7 +4626,7 @@ again:
* This whole function is probably about to change once we have the
* message abstraction.
*/
-void fprintlog(register struct filed *f, int flags)
+void fprintlog(register struct filed *f)
{
char *msg;
char *psz; /* for shell support */
@@ -4683,14 +4764,6 @@ void fprintlog(register struct filed *f, int flags)
case F_CONSOLE:
f->f_time = now;
-#ifdef UNIXPC
- if (1) {
-#else
- if (flags & IGN_CONS) {
-#endif
- dprintf(" (ignored).\n");
- break;
- }
/* FALLTHROUGH */
case F_TTY:
@@ -5138,7 +5211,7 @@ static void die(int sig)
* I am not sure it is correct as done.
* TODO: verify later!
*/
- fprintlog(f, 0);
+ fprintlog(f);
}
Initialized = was_initialized; /* we restore this so that the logmsgInternal()
@@ -5154,6 +5227,20 @@ static void die(int sig)
logmsgInternal(LOG_SYSLOG|LOG_INFO, buf, LocalHostName, ADDDATE);
}
+#ifdef USE_PTHREADS
+ /* We are now done with all messages, so we need to wake up the
+ * worker thread and then wait for it to finish.
+ */
+ /* TODO: this code is not really working. It's just good as a test
+ * harness!! It dumps *at least* because we have no qeuue!
+ */
+ bGlblDone = 1;
+ pthread_cond_signal(&cndRunWorker);
+ pthread_join(thrdWorker, NULL);
+ pthread_cond_destroy(&cndRunWorker);
+#endif
+
+
/* Free ressources and close MySQL connections */
for (f = Files; f != NULL ; f = f->f_next) {
/* free iovec if it was allocated */
@@ -5449,7 +5536,7 @@ static void init()
* I am not sure it is correct as done.
* TODO: verify later!
*/
- fprintlog(f, 0);
+ fprintlog(f);
/* free iovec if it was allocated */
if(f->f_iov != NULL) {
@@ -6379,7 +6466,6 @@ static rsRetVal cfline(char *line, register struct filed *f)
/* in this case, we also need a mutex... */
# ifdef USE_PTHREADS
pthread_mutex_init(&f->f_un.f_forw.mtxTCPSend, 0);
-dprintf("initializing mutex!\n");
# endif
} else {
f->f_un.f_forw.protocol = FORW_UDP;