summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2005-10-25 08:04:04 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2005-10-25 08:04:04 +0000
commit7ec0b41357a8c5a73600ef4b6e74dbff41a1222d (patch)
tree08567b63e3391666d9bcb57491e9803beb180248
parent0795edb76c48886a2937fb93f001a3e3b79ab962 (diff)
downloadrsyslog-7ec0b41357a8c5a73600ef4b6e74dbff41a1222d.tar.gz
rsyslog-7ec0b41357a8c5a73600ef4b6e74dbff41a1222d.tar.xz
rsyslog-7ec0b41357a8c5a73600ef4b6e74dbff41a1222d.zip
coming closer to a real dual-threading implementation. Now with queue.
-rw-r--r--syslogd.c200
1 files changed, 163 insertions, 37 deletions
diff --git a/syslogd.c b/syslogd.c
index 7ff2b09d..c7600f0b 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -579,11 +579,21 @@ static rsCStrObj *pDfltProgNameCmp;
/* supporting structures for multithreading */
#ifdef USE_PTHREADS
-static pthread_cond_t cndRunWorker = PTHREAD_COND_INITIALIZER;
-static pthread_mutex_t mtxRunWorker = PTHREAD_MUTEX_INITIALIZER;
+/* this is the first approach to a queue, this time with static
+ * memory.
+ */
+#define QUEUESIZE 100
+typedef struct {
+ void* buf[QUEUESIZE];
+ long head, tail;
+ int full, empty;
+ pthread_mutex_t *mut;
+ pthread_cond_t *notFull, *notEmpty;
+} msgQueue;
+
+msgQueue *pMsgQueue = NULL;
static pthread_t thrdWorker;
static int bGlblDone = 0;
-static struct msg *bufpMsg = NULL;
#endif
/* END supporting structures for multithreading */
@@ -782,7 +792,11 @@ static char template_StdDBFmt[] = "\"insert into SystemEvents (Message, Facility
/* end template */
+/* up to the next comment, prototypes that should be removed by reordering */
+#ifdef USE_PTHREADS
+msgQueue *queueInit (void);
static void *singleWorker(void *vParam); /* REMOVEME later 2005-10-24 */
+#endif
/* Function prototypes. */
int main(int argc, char **argv);
static char **crunch_list(char *list);
@@ -1927,7 +1941,8 @@ static struct msg* MsgConstruct()
getCurrTime(&(pM->tRcvdAt));
}
- /* DEV debugging only! dprintf("MsgConstruct\t0x%x\n", (int)pM);*/
+ /* DEV debugging only! dprintf("MsgConstruct\t0x%x, ref 1\n", (int)pM);*/
+ dprintf("MsgConstruct\t0x%x, ref 1\n", (int)pM);
return(pM);
}
@@ -1940,9 +1955,11 @@ static void MsgDestruct(struct msg * pM)
{
assert(pM != NULL);
/* DEV Debugging only ! dprintf("MsgDestruct\t0x%x, Ref now: %d\n", (int)pM, pM->iRefCount - 1); */
+ dprintf("MsgDestruct\t0x%x, Ref now: %d\n", (int)pM, pM->iRefCount - 1);
if(--pM->iRefCount == 0)
{
/* DEV Debugging Only! dprintf("MsgDestruct\t0x%x, RefCount now 0, doing DESTROY\n", (int)pM); */
+ dprintf("MsgDestruct\t0x%x, RefCount now 0, doing DESTROY\n", (int)pM);
if(pM->pszUxTradMsg != NULL)
free(pM->pszUxTradMsg);
if(pM->pszRawMsg != NULL)
@@ -1989,6 +2006,7 @@ static struct msg *MsgAddRef(struct msg *pM)
assert(pM != NULL);
pM->iRefCount++;
/* DEV debugging only! dprintf("MsgAddRef\t0x%x done, Ref now: %d\n", (int)pM, pM->iRefCount);*/
+ dprintf("MsgAddRef\t0x%x done, Ref now: %d\n", (int)pM, pM->iRefCount);
return(pM);
}
@@ -3026,14 +3044,20 @@ int main(int argc, char **argv)
* rgerhards, 2005-10-24
*/
#ifdef USE_PTHREADS
+ /* create message queue */
+ pMsgQueue = queueInit();
+ if(pMsgQueue == NULL) {
+ fprintf (stderr, "error: could not create message queue - terminating.\n");
+ exit (1);
+ }
/* start up worker thread */
{ int i;
i = pthread_create(&thrdWorker, NULL, singleWorker, NULL);
- printf("worker thread started with state %d\n", i);
+ dprintf("worker thread started with state %d\n", i);
}
#endif
- /* Main loop begins here. */
+ /* --------------------- Main loop begins here. ----------------------------------------- */
for (;;) {
int nfds;
errno = 0;
@@ -3732,11 +3756,7 @@ time_t now;
* function here probably is only an interim solution and that we need to
* think on the best way to do this.
*/
-void logmsgInternal(pri, msg, from, flags)
- int pri;
- char *msg;
- char *from;
- int flags;
+static void logmsgInternal(int pri, char * msg, char* from, int flags)
{
struct msg *pMsg;
@@ -3972,8 +3992,7 @@ static void processMsg(struct msg *pMsg)
if ((pMsg->msgFlags & MARK) && (now - f->f_time) < MarkInterval / 2)
continue;
- /*
- * suppress duplicate lines to this file
+ /* suppress duplicate lines to this file
*/
if ((pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(f->f_pMsg) &&
!strcmp(getMSG(pMsg), getMSG(f->f_pMsg)) &&
@@ -4012,6 +4031,68 @@ static void processMsg(struct msg *pMsg)
* do so many external definitons.
* rgerhards, 2005-10-24
*/
+msgQueue *queueInit (void)
+{
+ msgQueue *q;
+
+ q = (msgQueue *)malloc (sizeof (msgQueue));
+ if (q == NULL) return (NULL);
+
+ q->empty = 1;
+ q->full = 0;
+ q->head = 0;
+ q->tail = 0;
+ q->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
+ pthread_mutex_init (q->mut, NULL);
+ q->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
+ pthread_cond_init (q->notFull, NULL);
+ q->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
+ pthread_cond_init (q->notEmpty, NULL);
+
+ return (q);
+}
+
+void queueDelete (msgQueue *q)
+{
+ pthread_mutex_destroy (q->mut);
+ free (q->mut);
+ pthread_cond_destroy (q->notFull);
+ free (q->notFull);
+ pthread_cond_destroy (q->notEmpty);
+ free (q->notEmpty);
+ free (q);
+}
+
+void queueAdd (msgQueue *q, void* in)
+{
+ q->buf[q->tail] = in;
+ q->tail++;
+ if (q->tail == QUEUESIZE)
+ q->tail = 0;
+ if (q->tail == q->head)
+ q->full = 1;
+ /* TODO: THE FOLLOWING LINE IS A TEST AID! Remove it! */
+// if(q->tail > 1)
+ /* syslogd will NOT work when the line above is present! */
+ q->empty = 0;
+
+ return;
+}
+
+void queueDel (msgQueue *q, struct msg **out)
+{
+ *out = (struct msg*) q->buf[q->head];
+
+ q->head++;
+ if (q->head == QUEUESIZE)
+ q->head = 0;
+ if (q->head == q->tail)
+ q->empty = 1;
+ q->full = 0;
+
+ return;
+}
+
/* The worker thread (so far, we have dual-threading, so only one
* worker thread. Having more than one worker requires considerable
@@ -4019,18 +4100,30 @@ static void processMsg(struct msg *pMsg)
*/
static void *singleWorker(void *vParam)
{
+ msgQueue *fifo = pMsgQueue;
struct msg *pMsg;
+
+ assert(fifo != NULL);
+
while(!bGlblDone) {
- pthread_mutex_lock(&mtxRunWorker);
- printf("worker waits for next message\n");
- pthread_cond_wait(&cndRunWorker, &mtxRunWorker);
- /* dequeue element (still protected from mutex) */
- printf("Worker dequeues...\n");
- pMsg = bufpMsg;
- pthread_mutex_unlock(&mtxRunWorker);
- /* do actual processing (the lengthy part, runs in parallel) */
- dprintf("worker is running\n");
- processMsg(pMsg);
+ pthread_mutex_lock(fifo->mut);
+ while (fifo->empty && !bGlblDone) {
+ dprintf ("singleWorker: queue EMPTY, waiting for next message.\n");
+ pthread_cond_wait (fifo->notEmpty, fifo->mut);
+ }
+ if(!fifo->empty) {
+ /* dequeue element (still protected from mutex) */
+ dprintf("Worker dequeues...\n");
+ queueDel(fifo, &pMsg);
+ assert(pMsg != NULL);
+ pthread_mutex_unlock(fifo->mut);
+ pthread_cond_signal (fifo->notFull);
+ /* do actual processing (the lengthy part, runs in parallel) */
+ dprintf("worker is running\n");
+ processMsg(pMsg);
+ dprintf("calling destructMsg(), Ref %d\n", pMsg->iRefCount);
+ MsgDestruct(pMsg);
+ }
}
pthread_exit(0);
@@ -4053,15 +4146,30 @@ static void *singleWorker(void *vParam)
static void enqueueMsg(struct msg *pMsg)
{
int iRet;
+ msgQueue *fifo = pMsgQueue;
+
assert(pMsg != NULL);
- iRet = pthread_mutex_lock(&mtxRunWorker);
- printf("EnqueueMsg waiting on mutex (%d)\n", iRet);
- bufpMsg = pMsg;
- /* now activate the worker thread */
- pthread_mutex_unlock(&mtxRunWorker);
- iRet = pthread_cond_signal(&cndRunWorker);
- printf("EnqueueMsg signaled condition (%d)\n", iRet);
+ if(fifo == NULL) {
+ /* multi-threading is not yet initialized, happens e.g.
+ * during startup and restart. rgerhards, 2005-10-25
+ */
+ dprintf("enqueueMsg: not yet running on multiple threads\n");
+ processMsg(pMsg);
+ } else {
+ /* "normal" mode, threading initialized */
+ iRet = pthread_mutex_lock(fifo->mut);
+ while (fifo->full) {
+ dprintf ("enqueueMsg: queue FULL.\n");
+ pthread_cond_wait (fifo->notFull, fifo->mut);
+ }
+ dprintf("enqueue and add ref\n");
+ queueAdd(fifo, MsgAddRef(pMsg));
+ /* now activate the worker thread */
+ pthread_mutex_unlock(fifo->mut);
+ iRet = pthread_cond_signal(fifo->notEmpty);
+ dprintf("EnqueueMsg signaled condition (%d)\n", iRet);
+ }
}
#endif /* #ifndef USE_PTHREADS */
@@ -5201,8 +5309,7 @@ static void die(int sig)
int i;
int was_initialized = Initialized;
- Initialized = 0; /* Don't log SIGCHLDs in case we
- receive one during exiting */
+ Initialized = 0; /* Don't log SIGCHLDs in case we receive one during exiting */
for (f = Files; f != NULL ; f = f->f_next) {
/* flush any pending output */
@@ -5224,7 +5331,7 @@ static void die(int sig)
PATCHLEVEL "\" x-pid=\"%d\"]" " exiting on signal %d.",
myPid, sig);
errno = 0;
- logmsgInternal(LOG_SYSLOG|LOG_INFO, buf, LocalHostName, ADDDATE);
+ logmsgInternal(LOG_SYSLOG|LOG_INFO, buf, LocalHostName, ADDDATE);
}
#ifdef USE_PTHREADS
@@ -5235,9 +5342,12 @@ static void die(int sig)
* harness!! It dumps *at least* because we have no qeuue!
*/
bGlblDone = 1;
- pthread_cond_signal(&cndRunWorker);
+ /* It's actually not "not empty" below but awaking the worker. The worker
+ * then finds out that it shall terminate and does so.
+ */
+ pthread_cond_signal(pMsgQueue->notEmpty);
pthread_join(thrdWorker, NULL);
- pthread_cond_destroy(&cndRunWorker);
+ /* delete fifo here! */
#endif
@@ -5252,7 +5362,8 @@ static void die(int sig)
free(f->f_iov);
}
/* Now delete cached messages */
- MsgDestruct(f->f_pMsg);
+ if(f->f_pMsg != NULL)
+ MsgDestruct(f->f_pMsg);
#ifdef WITH_DB
if (f->f_type == F_MYSQL)
closeMySQL(f);
@@ -6797,13 +6908,28 @@ int decode(name, codetab)
}
void dprintf(char *fmt, ...)
-
{
+# ifdef USE_PTHREADS
+ static int bWasNL = FALSE;
+# endif
va_list ap;
if ( !(Debug && debugging_on) )
return;
+# ifdef USE_PTHREADS
+ /* TODO: The bWasNL handler does not really work. It works if no thread
+ * switching occurs during non-NL messages. Else, things are messed
+ * up. Anyhow, it works well enough to provide useful help during
+ * getting this up and running. It is questionable if the extra effort
+ * is worth fixing it, giving the limited appliability.
+ * rgerhards, 2005-10-25
+ */
+ if(bWasNL) {
+ fprintf(stdout, "%8.8d: ", (unsigned int) pthread_self());
+ }
+ bWasNL = (*(fmt + strlen(fmt) - 1) == '\n') ? TRUE : FALSE;
+# endif
va_start(ap, fmt);
vfprintf(stdout, fmt, ap);
va_end(ap);