diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2005-10-25 08:04:04 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2005-10-25 08:04:04 +0000 |
commit | 7ec0b41357a8c5a73600ef4b6e74dbff41a1222d (patch) | |
tree | 08567b63e3391666d9bcb57491e9803beb180248 | |
parent | 0795edb76c48886a2937fb93f001a3e3b79ab962 (diff) | |
download | rsyslog-7ec0b41357a8c5a73600ef4b6e74dbff41a1222d.tar.gz rsyslog-7ec0b41357a8c5a73600ef4b6e74dbff41a1222d.tar.xz rsyslog-7ec0b41357a8c5a73600ef4b6e74dbff41a1222d.zip |
coming closer to a real dual-threading implementation. Now with queue.
-rw-r--r-- | syslogd.c | 200 |
1 files changed, 163 insertions, 37 deletions
@@ -579,11 +579,21 @@ static rsCStrObj *pDfltProgNameCmp; /* supporting structures for multithreading */ #ifdef USE_PTHREADS -static pthread_cond_t cndRunWorker = PTHREAD_COND_INITIALIZER; -static pthread_mutex_t mtxRunWorker = PTHREAD_MUTEX_INITIALIZER; +/* this is the first approach to a queue, this time with static + * memory. + */ +#define QUEUESIZE 100 +typedef struct { + void* buf[QUEUESIZE]; + long head, tail; + int full, empty; + pthread_mutex_t *mut; + pthread_cond_t *notFull, *notEmpty; +} msgQueue; + +msgQueue *pMsgQueue = NULL; static pthread_t thrdWorker; static int bGlblDone = 0; -static struct msg *bufpMsg = NULL; #endif /* END supporting structures for multithreading */ @@ -782,7 +792,11 @@ static char template_StdDBFmt[] = "\"insert into SystemEvents (Message, Facility /* end template */ +/* up to the next comment, prototypes that should be removed by reordering */ +#ifdef USE_PTHREADS +msgQueue *queueInit (void); static void *singleWorker(void *vParam); /* REMOVEME later 2005-10-24 */ +#endif /* Function prototypes. */ int main(int argc, char **argv); static char **crunch_list(char *list); @@ -1927,7 +1941,8 @@ static struct msg* MsgConstruct() getCurrTime(&(pM->tRcvdAt)); } - /* DEV debugging only! dprintf("MsgConstruct\t0x%x\n", (int)pM);*/ + /* DEV debugging only! dprintf("MsgConstruct\t0x%x, ref 1\n", (int)pM);*/ + dprintf("MsgConstruct\t0x%x, ref 1\n", (int)pM); return(pM); } @@ -1940,9 +1955,11 @@ static void MsgDestruct(struct msg * pM) { assert(pM != NULL); /* DEV Debugging only ! dprintf("MsgDestruct\t0x%x, Ref now: %d\n", (int)pM, pM->iRefCount - 1); */ + dprintf("MsgDestruct\t0x%x, Ref now: %d\n", (int)pM, pM->iRefCount - 1); if(--pM->iRefCount == 0) { /* DEV Debugging Only! dprintf("MsgDestruct\t0x%x, RefCount now 0, doing DESTROY\n", (int)pM); */ + dprintf("MsgDestruct\t0x%x, RefCount now 0, doing DESTROY\n", (int)pM); if(pM->pszUxTradMsg != NULL) free(pM->pszUxTradMsg); if(pM->pszRawMsg != NULL) @@ -1989,6 +2006,7 @@ static struct msg *MsgAddRef(struct msg *pM) assert(pM != NULL); pM->iRefCount++; /* DEV debugging only! dprintf("MsgAddRef\t0x%x done, Ref now: %d\n", (int)pM, pM->iRefCount);*/ + dprintf("MsgAddRef\t0x%x done, Ref now: %d\n", (int)pM, pM->iRefCount); return(pM); } @@ -3026,14 +3044,20 @@ int main(int argc, char **argv) * rgerhards, 2005-10-24 */ #ifdef USE_PTHREADS + /* create message queue */ + pMsgQueue = queueInit(); + if(pMsgQueue == NULL) { + fprintf (stderr, "error: could not create message queue - terminating.\n"); + exit (1); + } /* start up worker thread */ { int i; i = pthread_create(&thrdWorker, NULL, singleWorker, NULL); - printf("worker thread started with state %d\n", i); + dprintf("worker thread started with state %d\n", i); } #endif - /* Main loop begins here. */ + /* --------------------- Main loop begins here. ----------------------------------------- */ for (;;) { int nfds; errno = 0; @@ -3732,11 +3756,7 @@ time_t now; * function here probably is only an interim solution and that we need to * think on the best way to do this. */ -void logmsgInternal(pri, msg, from, flags) - int pri; - char *msg; - char *from; - int flags; +static void logmsgInternal(int pri, char * msg, char* from, int flags) { struct msg *pMsg; @@ -3972,8 +3992,7 @@ static void processMsg(struct msg *pMsg) if ((pMsg->msgFlags & MARK) && (now - f->f_time) < MarkInterval / 2) continue; - /* - * suppress duplicate lines to this file + /* suppress duplicate lines to this file */ if ((pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(f->f_pMsg) && !strcmp(getMSG(pMsg), getMSG(f->f_pMsg)) && @@ -4012,6 +4031,68 @@ static void processMsg(struct msg *pMsg) * do so many external definitons. * rgerhards, 2005-10-24 */ +msgQueue *queueInit (void) +{ + msgQueue *q; + + q = (msgQueue *)malloc (sizeof (msgQueue)); + if (q == NULL) return (NULL); + + q->empty = 1; + q->full = 0; + q->head = 0; + q->tail = 0; + q->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t)); + pthread_mutex_init (q->mut, NULL); + q->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); + pthread_cond_init (q->notFull, NULL); + q->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); + pthread_cond_init (q->notEmpty, NULL); + + return (q); +} + +void queueDelete (msgQueue *q) +{ + pthread_mutex_destroy (q->mut); + free (q->mut); + pthread_cond_destroy (q->notFull); + free (q->notFull); + pthread_cond_destroy (q->notEmpty); + free (q->notEmpty); + free (q); +} + +void queueAdd (msgQueue *q, void* in) +{ + q->buf[q->tail] = in; + q->tail++; + if (q->tail == QUEUESIZE) + q->tail = 0; + if (q->tail == q->head) + q->full = 1; + /* TODO: THE FOLLOWING LINE IS A TEST AID! Remove it! */ +// if(q->tail > 1) + /* syslogd will NOT work when the line above is present! */ + q->empty = 0; + + return; +} + +void queueDel (msgQueue *q, struct msg **out) +{ + *out = (struct msg*) q->buf[q->head]; + + q->head++; + if (q->head == QUEUESIZE) + q->head = 0; + if (q->head == q->tail) + q->empty = 1; + q->full = 0; + + return; +} + /* The worker thread (so far, we have dual-threading, so only one * worker thread. Having more than one worker requires considerable @@ -4019,18 +4100,30 @@ static void processMsg(struct msg *pMsg) */ static void *singleWorker(void *vParam) { + msgQueue *fifo = pMsgQueue; struct msg *pMsg; + + assert(fifo != NULL); + while(!bGlblDone) { - pthread_mutex_lock(&mtxRunWorker); - printf("worker waits for next message\n"); - pthread_cond_wait(&cndRunWorker, &mtxRunWorker); - /* dequeue element (still protected from mutex) */ - printf("Worker dequeues...\n"); - pMsg = bufpMsg; - pthread_mutex_unlock(&mtxRunWorker); - /* do actual processing (the lengthy part, runs in parallel) */ - dprintf("worker is running\n"); - processMsg(pMsg); + pthread_mutex_lock(fifo->mut); + while (fifo->empty && !bGlblDone) { + dprintf ("singleWorker: queue EMPTY, waiting for next message.\n"); + pthread_cond_wait (fifo->notEmpty, fifo->mut); + } + if(!fifo->empty) { + /* dequeue element (still protected from mutex) */ + dprintf("Worker dequeues...\n"); + queueDel(fifo, &pMsg); + assert(pMsg != NULL); + pthread_mutex_unlock(fifo->mut); + pthread_cond_signal (fifo->notFull); + /* do actual processing (the lengthy part, runs in parallel) */ + dprintf("worker is running\n"); + processMsg(pMsg); + dprintf("calling destructMsg(), Ref %d\n", pMsg->iRefCount); + MsgDestruct(pMsg); + } } pthread_exit(0); @@ -4053,15 +4146,30 @@ static void *singleWorker(void *vParam) static void enqueueMsg(struct msg *pMsg) { int iRet; + msgQueue *fifo = pMsgQueue; + assert(pMsg != NULL); - iRet = pthread_mutex_lock(&mtxRunWorker); - printf("EnqueueMsg waiting on mutex (%d)\n", iRet); - bufpMsg = pMsg; - /* now activate the worker thread */ - pthread_mutex_unlock(&mtxRunWorker); - iRet = pthread_cond_signal(&cndRunWorker); - printf("EnqueueMsg signaled condition (%d)\n", iRet); + if(fifo == NULL) { + /* multi-threading is not yet initialized, happens e.g. + * during startup and restart. rgerhards, 2005-10-25 + */ + dprintf("enqueueMsg: not yet running on multiple threads\n"); + processMsg(pMsg); + } else { + /* "normal" mode, threading initialized */ + iRet = pthread_mutex_lock(fifo->mut); + while (fifo->full) { + dprintf ("enqueueMsg: queue FULL.\n"); + pthread_cond_wait (fifo->notFull, fifo->mut); + } + dprintf("enqueue and add ref\n"); + queueAdd(fifo, MsgAddRef(pMsg)); + /* now activate the worker thread */ + pthread_mutex_unlock(fifo->mut); + iRet = pthread_cond_signal(fifo->notEmpty); + dprintf("EnqueueMsg signaled condition (%d)\n", iRet); + } } #endif /* #ifndef USE_PTHREADS */ @@ -5201,8 +5309,7 @@ static void die(int sig) int i; int was_initialized = Initialized; - Initialized = 0; /* Don't log SIGCHLDs in case we - receive one during exiting */ + Initialized = 0; /* Don't log SIGCHLDs in case we receive one during exiting */ for (f = Files; f != NULL ; f = f->f_next) { /* flush any pending output */ @@ -5224,7 +5331,7 @@ static void die(int sig) PATCHLEVEL "\" x-pid=\"%d\"]" " exiting on signal %d.", myPid, sig); errno = 0; - logmsgInternal(LOG_SYSLOG|LOG_INFO, buf, LocalHostName, ADDDATE); + logmsgInternal(LOG_SYSLOG|LOG_INFO, buf, LocalHostName, ADDDATE); } #ifdef USE_PTHREADS @@ -5235,9 +5342,12 @@ static void die(int sig) * harness!! It dumps *at least* because we have no qeuue! */ bGlblDone = 1; - pthread_cond_signal(&cndRunWorker); + /* It's actually not "not empty" below but awaking the worker. The worker + * then finds out that it shall terminate and does so. + */ + pthread_cond_signal(pMsgQueue->notEmpty); pthread_join(thrdWorker, NULL); - pthread_cond_destroy(&cndRunWorker); + /* delete fifo here! */ #endif @@ -5252,7 +5362,8 @@ static void die(int sig) free(f->f_iov); } /* Now delete cached messages */ - MsgDestruct(f->f_pMsg); + if(f->f_pMsg != NULL) + MsgDestruct(f->f_pMsg); #ifdef WITH_DB if (f->f_type == F_MYSQL) closeMySQL(f); @@ -6797,13 +6908,28 @@ int decode(name, codetab) } void dprintf(char *fmt, ...) - { +# ifdef USE_PTHREADS + static int bWasNL = FALSE; +# endif va_list ap; if ( !(Debug && debugging_on) ) return; +# ifdef USE_PTHREADS + /* TODO: The bWasNL handler does not really work. It works if no thread + * switching occurs during non-NL messages. Else, things are messed + * up. Anyhow, it works well enough to provide useful help during + * getting this up and running. It is questionable if the extra effort + * is worth fixing it, giving the limited appliability. + * rgerhards, 2005-10-25 + */ + if(bWasNL) { + fprintf(stdout, "%8.8d: ", (unsigned int) pthread_self()); + } + bWasNL = (*(fmt + strlen(fmt) - 1) == '\n') ? TRUE : FALSE; +# endif va_start(ap, fmt); vfprintf(stdout, fmt, ap); va_end(ap); |