diff options
Diffstat (limited to 'plugins/imptcp/imptcp.c')
-rw-r--r-- | plugins/imptcp/imptcp.c | 229 |
1 files changed, 182 insertions, 47 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 33bfa81c..6e3a67ab 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -10,7 +10,7 @@ * * File begun on 2010-08-10 by RGerhards * - * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -83,7 +83,8 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(errmsg) DEFobjCurrIf(ruleset) - +/* forward references */ +static void * wrkr(void *myself); /* config settings */ typedef struct configSettings_s { @@ -154,6 +155,21 @@ struct ptcplstn_s { }; +/* The following structure controls the worker threads. Global data is + * needed for their access. + */ +static struct wrkrInfo_s { + pthread_t tid; /* the worker's thread ID */ + pthread_cond_t run; + struct epoll_event *event; /* event == NULL -> idle */ + long long unsigned numCalled; /* how often was this called */ +} wrkrInfo[4]; +static pthread_mutex_t wrkrMut; +static pthread_cond_t wrkrIdle; +static int wrkrMax = 4; +static int wrkrRunning; + + /* type of object stored in epoll descriptor */ typedef enum { epolld_lstn, @@ -175,16 +191,6 @@ struct epolld_s { static ptcpsrv_t *pSrvRoot = NULL; static int epollfd = -1; /* (sole) descriptor for epoll */ static int iMaxLine; /* maximum size of a single message */ -/* we use a single static receive buffer, as this module is not multi-threaded. Keeping - * the buffer in the data segment is probably a little bit more efficient than on the stack - * (but at least I can't believe it will ever be less efficient ;) -- rgerhards, 2010-08-10 - * Note that we do NOT (yet?) provide a config setting to set the buffer size. For usual - * syslog traffic, it should be large enough. Also keep in mind that we run under a virtual - * memory system, so if we do not use large parts of the buffer, that's no issue at - * all -- it'll just use up address space. On the other hand, it would be silly to page in - * or page out some data just to get space for the IO buffer. - */ -static char rcvBuf[128*1024]; /* forward definitions */ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal); @@ -838,21 +844,6 @@ finalize_it: } -#if 0 -/* set permitted peer -- rgerhards, 2008-05-19 - */ -static rsRetVal -setPermittedPeer(void __attribute__((unused)) *pVal, uchar *pszID) -{ - DEFiRet; - CHKiRet(net.AddPermittedPeer(&pPermPeersRoot, pszID)); - free(pszID); /* no longer needed, but we need to free as of interface def */ -finalize_it: - RETiRet; -} -#endif - - /* accept a new ruleset to bind. Checks if it exists and complains, if not */ static rsRetVal setRuleset(void __attribute__((unused)) *pVal, uchar *pszName) { @@ -911,6 +902,45 @@ finalize_it: } +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +startWorkerPool(void) +{ + int i; + wrkrRunning = 0; + pthread_mutex_init(&wrkrMut, NULL); + pthread_cond_init(&wrkrIdle, NULL); + for(i = 0 ; i < wrkrMax ; ++i) { + /* init worker info structure! */ + pthread_cond_init(&wrkrInfo[i].run, NULL); + wrkrInfo[i].event = NULL; + wrkrInfo[i].numCalled = 0; + pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i])); + } + +} + +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +stopWorkerPool(void) +{ + int i; + for(i = 0 ; i < wrkrMax ; ++i) { + pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ + pthread_join(wrkrInfo[i].tid, NULL); + DBGPRINTF("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); + printf("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); + pthread_cond_destroy(&wrkrInfo[i].run); + } + pthread_cond_destroy(&wrkrIdle); + pthread_mutex_destroy(&wrkrMut); + +} + + + /* start up all listeners * This is a one-time stop once the module is set to start. */ @@ -965,6 +995,7 @@ sessActivity(ptcpsess_t *pSess) { int lenRcv; int lenBuf; + char rcvBuf[128*1024]; DEFiRet; DBGPRINTF("imptcp: new activity on session socket %d\n", pSess->sock); @@ -1002,36 +1033,138 @@ finalize_it: } +/* This function is called to process a single request. This may + * be carried out by the main worker or a helper. It can be run + * concurrently. + */ +static inline void +processWorkItem(struct epoll_event *event) +{ + epolld_t *epd; + + epd = (epolld_t*) event->data.ptr; + switch(epd->typ) { + case epolld_lstn: + lstnActivity((ptcplstn_t *) epd->ptr); + break; + case epolld_sess: + sessActivity((ptcpsess_t *) epd->ptr); + break; + default: + errmsg.LogError(0, RS_RET_INTERNAL_ERROR, + "error: invalid epolld_type_t %d after epoll", epd->typ); + break; + } +} + + +/* This function is called to process a complete workset, that + * is a set of events returned from epoll. + */ +static inline void +processWorkSet(int nEvents, struct epoll_event events[]) +{ + int iEvt; + int i; + int remainEvents; + + remainEvents = nEvents; + for(iEvt = 0 ; (iEvt < nEvents) && (glbl.GetGlobalInputTermState() == 0) ; ++iEvt) { +dbgprintf("XXX: remain entries during processing %d\n", remainEvents); + if(remainEvents == 1) { +dbgprintf("XXX: processWorkset 1\n"); + /* process self, save context switch */ + processWorkItem(events+iEvt); + } else { +dbgprintf("XXX: processWorkset 2\n"); + pthread_mutex_lock(&wrkrMut); + /* check if there is a free worker */ + for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].event != NULL) ; ++i) +dbgprintf("XXX: wrkrinfo[%d].event %p\n", i, wrkrInfo[i].event); + /*do search*/; + if(i < wrkrMax) { +dbgprintf("XXX: processWorkset 2.1, event=%p, wrkrRunnig %d, max %d\n", events+iEvt, wrkrRunning, wrkrMax); + /* worker free -> use it! */ + wrkrInfo[i].event = events+iEvt; + pthread_cond_signal(&wrkrInfo[i].run); + pthread_mutex_unlock(&wrkrMut); + } else { +dbgprintf("XXX: processWorkset 2.2\n"); + pthread_mutex_unlock(&wrkrMut); + /* no free worker, so we process this one ourselfs */ + processWorkItem(events+iEvt); + } + } + --remainEvents; + } + + if(nEvents > 1) { + /* we now need to wait until all workers finish. This is because the + * rest of this module can not handle the concurrency introduced + * by workers running during the epoll call. + */ + pthread_mutex_lock(&wrkrMut); +dbgprintf("XXX: processWorkset: waiting for all workers to idle, curr = %d\n", wrkrRunning); + while(wrkrRunning > 0) { +dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, curr = %d\n", wrkrRunning); + pthread_cond_wait(&wrkrIdle, &wrkrMut); +dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, after wait, curr = %d\n", wrkrRunning); + } + pthread_mutex_unlock(&wrkrMut); + } + +} + + +/* worker to process incoming requests + */ +static void * +wrkr(void *myself) +{ + struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself; + + pthread_mutex_lock(&wrkrMut); + while(1) { + while(me->event == NULL && glbl.GetGlobalInputTermState() == 0) { + pthread_cond_wait(&me->run, &wrkrMut); + } + if(glbl.GetGlobalInputTermState() == 1) + break; + ++wrkrRunning; + pthread_mutex_unlock(&wrkrMut); + +dbgprintf("XXX: worker %p activated\n", pthread_self()); + ++me->numCalled; + processWorkItem(me->event); + + pthread_mutex_lock(&wrkrMut); + me->event = NULL; /* indicate we are free again */ + --wrkrRunning; + pthread_cond_signal(&wrkrIdle); +dbgprintf("XXX: worker %p idling, currently running %d\n", pthread_self(), wrkrRunning); + } + pthread_mutex_unlock(&wrkrMut); + + return NULL; +} + + /* This function is called to gather input. */ BEGINrunInput - int i; - int nfds; + int nEvents; struct epoll_event events[128]; - epolld_t *epd; CODESTARTrunInput + startWorkerPool(); DBGPRINTF("imptcp: now beginning to process input data\n"); while(glbl.GetGlobalInputTermState() == 0) { DBGPRINTF("imptcp going on epoll_wait\n"); - nfds = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1); - DBGPRINTF("imptcp: epoll returned %d events\n", nfds); - for(i = 0 ; (i < nfds) && (glbl.GetGlobalInputTermState() == 0) ; ++i) { /* support for larger batches (later, TODO) */ - epd = (epolld_t*) events[i].data.ptr; - switch(epd->typ) { - case epolld_lstn: - lstnActivity((ptcplstn_t *) epd->ptr); - break; - case epolld_sess: - sessActivity((ptcpsess_t *) epd->ptr); - break; - default: - errmsg.LogError(0, RS_RET_INTERNAL_ERROR, - "error: invalid epolld_type_t %d after epoll", epd->typ); - break; - } - } + nEvents = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1); + DBGPRINTF("imptcp: epoll returned %d events\n", nEvents); + processWorkSet(nEvents, events); } DBGPRINTF("imptcp: successfully terminated\n"); + /* we stop the worker pool in AfterRun, in case we get cancelled for some reason (old Interface) */ ENDrunInput @@ -1104,6 +1237,8 @@ shutdownSrv(ptcpsrv_t *pSrv) BEGINafterRun ptcpsrv_t *pSrv, *srvDel; CODESTARTafterRun + stopWorkerPool(); + /* we need to close everything that is still open */ pSrv = pSrvRoot; while(pSrv != NULL) { |