From c6447bb3452e9ac3c4daed5827f711c2c6956ca4 Mon Sep 17 00:00:00 2001
From: Rainer Gerhards
Date: Thu, 27 Jan 2011 16:33:13 +0100
Subject: experimental: added thread pool to tcpsrv epoll handler

this seems to work in the lab, but it is brand-new code and still needs
thorough testing before it can be trusted.
---
 runtime/nsdpoll_ptcp.c |   3 +-
 tcpsrv.c               | 149 ++++++++++++++++++++++++++++++++++++++++---------
 2 files changed, 126 insertions(+), 26 deletions(-)

diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c
index 26810b7d..76c4e887 100644
--- a/runtime/nsdpoll_ptcp.c
+++ b/runtime/nsdpoll_ptcp.c
@@ -70,7 +70,8 @@ addEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, int mode, nsd_ptcp_t *pSock,
 	pNew->id = id;
 	pNew->pUsr = pUsr;
 	pNew->pSock = pSock;
-	pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */
+	//pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */
+	pNew->event.events = EPOLLET; /* request edge-triggered notification */
 	if(mode & NSDPOLL_IN)
 		pNew->event.events |= EPOLLIN;
 	if(mode & NSDPOLL_OUT)
diff --git a/tcpsrv.c b/tcpsrv.c
index 77b7b9be..baaa27b1 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -47,6 +47,7 @@
 #include
 #include
 #include
+#include <pthread.h>
 #include
 #include
 #if HAVE_FCNTL_H
@@ -91,6 +92,23 @@
 DEFobjCurrIf(nspoll)
 DEFobjCurrIf(prop)
 
+/* The following structure controls the worker threads. The data must be
+ * global so that both the epoll dispatcher and the workers can access it.
+ */
+static struct wrkrInfo_s {
+	pthread_t tid;		/* the worker's thread ID */
+	pthread_cond_t run;	/* signalled when work has been assigned */
+	int idx;
+	tcpsrv_t *pSrv;		/* pSrv == NULL -> idle */
+	nspoll_t *pPoll;
+	void *pUsr;
+	long long unsigned numCalled;	/* how often this worker was called */
+} wrkrInfo[4];
+static pthread_cond_t wrkrFinished;	/* initialized here, but not otherwise used yet */
+static pthread_mutex_t wrkrMut;
+static int wrkrMax = 4;			/* must match the size of wrkrInfo[] */
+static int wrkrRunning;
+
 /* add new listener port to listener port list
  * rgerhards, 2009-05-21
  */
@@ -538,7 +556,6 @@ finalize_it:
 	RETiRet;
 }
 
-
 /* process a single workset item
  */
 static inline rsRetVal
@@ -563,22 +580,83 @@ finalize_it:
 }
 
 
+/* worker to process incoming requests
+ */
+static void *
+wrkr(void *myself)
+{
+	struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself;
+
+	pthread_mutex_lock(&wrkrMut);
+	while(1) {
+		while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) {
+			pthread_cond_wait(&me->run, &wrkrMut);
+		}
+		if(glbl.GetGlobalInputTermState() == 1)
+			break;
+		++wrkrRunning;
+		pthread_mutex_unlock(&wrkrMut);
+
+dbgprintf("XXX: worker %p activated\n", pthread_self());
+		++me->numCalled;
+		processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr);
+
+		pthread_mutex_lock(&wrkrMut);
+		me->pSrv = NULL;	/* indicate we are free again */
+		--wrkrRunning;
+	}
+	pthread_mutex_unlock(&wrkrMut);
+
+	return NULL;
+}
+
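+/* Handoff protocol used above: wrkrMut protects all wrkrInfo fields.
+ * A worker is idle iff its pSrv is NULL. The dispatcher publishes work
+ * by filling pSrv/pPoll/idx/pUsr and signalling the worker's "run"
+ * condition variable; the worker resets pSrv to NULL once
+ * processWorksetItem() has completed.
+ */
+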
 /* Process a workset, that is handle io. We become activated
  * from either select or epoll handler. We split the workload
  * out to a pool of threads, but try to avoid context switches
  * as much as possible.
  */
-static rsRetVal processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[])
+static rsRetVal
+processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[])
 {
 	int i;
 	DEFiRet;
 
 	dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries);
 
-	for(i = 0 ; i < numEntries ; i++) {
+	while(numEntries > 0) {
 		if(glbl.GetGlobalInputTermState() == 1)
 			ABORT_FINALIZE(RS_RET_FORCE_TERM);
-		CHKiRet(processWorksetItem(pThis, pPoll, workset[i].id, workset[i].pUsr));
+dbgprintf("XXX: num entries during processing %d\n", numEntries);
+		if(numEntries == 1) {
+dbgprintf("XXX: processWorkset 1\n");
+			/* process the final entry ourselves, saves a context switch */
+			processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr);
+		} else {
+dbgprintf("XXX: processWorkset 2\n");
+			pthread_mutex_lock(&wrkrMut);
+			/* check if there is a free worker */
+			for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i) {
+dbgprintf("XXX: wrkrInfo[%d].pSrv %p\n", i, wrkrInfo[i].pSrv);
+			}
+			if(i < wrkrMax) {
+dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunning %d, max %d\n", workset[numEntries-1].pUsr, wrkrRunning, wrkrMax);
+				/* worker free -> use it! */
+				wrkrInfo[i].pSrv = pThis;
+				wrkrInfo[i].pPoll = pPoll;
+				wrkrInfo[i].idx = workset[numEntries-1].id;
+				wrkrInfo[i].pUsr = workset[numEntries-1].pUsr;
+				pthread_cond_signal(&wrkrInfo[i].run);
+				pthread_mutex_unlock(&wrkrMut);
+			} else {
+dbgprintf("XXX: processWorkset 2.2\n");
+				pthread_mutex_unlock(&wrkrMut);
+				/* no free worker, so we process this one ourselves */
+				processWorksetItem(pThis, pPoll, workset[numEntries-1].id,
+						   workset[numEntries-1].pUsr);
+			}
+		}
+		--numEntries;
 	}
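+/* Dispatch policy implemented above: entries are consumed from the back
+ * of the workset array. Each entry is handed to a free worker if one
+ * exists; otherwise, and always for the final remaining entry, it is
+ * processed on the epoll thread itself to avoid a context switch.
+ */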
 
 finalize_it:
@@ -732,26 +810,6 @@ Run(tcpsrv_t *pThis)
 			continue;
 		processWorkset(pThis, pPoll, numEntries, workset);
-#if 0
-		dbgprintf("poll returned with %d entries.\n", numEntries);
-
-		for(i = 0 ; i < numEntries ; i++) {
-			if(glbl.GetGlobalInputTermState() == 1)
-				ABORT_FINALIZE(RS_RET_FORCE_TERM);
-			currIdx = workset[i].id;
-			dbgprintf("tcpsrv processing i %d, pUsr %p\n", currIdx, workset[i].pUsr);
-dbgprintf("tcpsrv processing pUsr %p, ppLstn[0] %p, ppLstn[%d] %p\n", workset[i].pUsr, pThis->ppLstn[0], currIdx, pThis->ppLstn[currIdx]);
-			if(workset[i].pUsr == pThis->ppLstn) {
-				DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[currIdx]);
-				SessAccept(pThis, pThis->ppLstnPort[currIdx], &pNewSess, pThis->ppLstn[currIdx]);
-				CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
-				DBGPRINTF("New session created with NSD %p.\n", pNewSess);
-			} else {
-				pNewSess = (tcps_sess_t*) workset[i].pUsr;
-				doReceive(pThis, &pNewSess, pPoll);
-			}
-		}
-#endif
 	}
 
 	/* remove the tcp listen sockets from the epoll set */
@@ -1155,11 +1213,49 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
 ENDObjClassInit(tcpsrv)
 
 
-/* --------------- here now comes the plumbing that makes as a library module --------------- */
+/* start up the worker pool: initialize the synchronization primitives
+ * and spawn the worker threads
+ */
+static inline void
+startWorkerPool(void)
+{
+	int i;
+	pthread_mutex_init(&wrkrMut, NULL);
+	pthread_cond_init(&wrkrFinished, NULL);
+	for(i = 0 ; i < wrkrMax ; ++i) {
+		/* init worker info structure!
+		 */
+		pthread_cond_init(&wrkrInfo[i].run, NULL);
+		wrkrInfo[i].pSrv = NULL;
+		wrkrInfo[i].numCalled = 0;
+		pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i]));
+	}
+}
+
+
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+stopWorkerPool(void)
+{
+	int i;
+	for(i = 0 ; i < wrkrMax ; ++i) {
+		pthread_cond_signal(&wrkrInfo[i].run); /* awaken wrkr if it is idle */
+		pthread_join(wrkrInfo[i].tid, NULL);
+		DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
+		pthread_cond_destroy(&wrkrInfo[i].run);
+	}
+	pthread_cond_destroy(&wrkrFinished);
+	pthread_mutex_destroy(&wrkrMut);
+}
+
+
+/* --------------- here now comes the plumbing that makes this a library module --------------- */
 
 BEGINmodExit
 CODESTARTmodExit
+dbgprintf("tcpsrv: modExit\n");
+	stopWorkerPool();
+
 	/* de-init in reverse order! */
 	tcpsrvClassExit();
 	tcps_sessClassExit();
@@ -1179,6 +1275,9 @@
 CODESTARTmodInit
 	/* Initialize all classes that are in our module - this includes ourselves */
 	CHKiRet(tcps_sessClassInit(pModInfo));
 	CHKiRet(tcpsrvClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
+
+	startWorkerPool();
+
 ENDmodInit
 
 /* vim:set ai: */
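
The pool lifecycle above (startWorkerPool() at module init, stopWorkerPool()
at module exit) can be exercised outside of rsyslog. What follows is a
minimal, self-contained sketch of the same pattern under simplified
assumptions: a fixed-size pool with one condition variable per worker, where
a NULL job pointer marks a worker as idle. All names in it (poolWrkr_t,
wrkrLoop, demoJob, N_WRKRS) are hypothetical stand-ins, not rsyslog APIs.

    /* build: cc -pthread pool_sketch.c */
    #include <pthread.h>
    #include <stdio.h>

    #define N_WRKRS 4

    typedef struct poolWrkr_s {
    	pthread_t tid;
    	pthread_cond_t run;	/* signalled when a job is assigned */
    	void (*job)(int);	/* NULL -> worker is idle */
    	int arg;
    } poolWrkr_t;

    static poolWrkr_t wrkrs[N_WRKRS];
    static pthread_mutex_t poolMut = PTHREAD_MUTEX_INITIALIZER;
    static int poolShutdown = 0;

    static void demoJob(int i) { printf("job %d done\n", i); }

    static void *wrkrLoop(void *arg)
    {
    	poolWrkr_t *me = (poolWrkr_t*) arg;

    	pthread_mutex_lock(&poolMut);
    	while(1) {
    		while(me->job == NULL && !poolShutdown)
    			pthread_cond_wait(&me->run, &poolMut);
    		if(me->job == NULL)	/* woken for shutdown, nothing pending */
    			break;
    		/* run the job outside the mutex, like processWorksetItem() */
    		void (*job)(int) = me->job;
    		int a = me->arg;
    		pthread_mutex_unlock(&poolMut);
    		job(a);
    		pthread_mutex_lock(&poolMut);
    		me->job = NULL;		/* mark ourselves idle again */
    	}
    	pthread_mutex_unlock(&poolMut);
    	return NULL;
    }

    int main(void)
    {
    	int i, j;

    	for(i = 0 ; i < N_WRKRS ; ++i) {	/* startWorkerPool() equivalent */
    		pthread_cond_init(&wrkrs[i].run, NULL);
    		wrkrs[i].job = NULL;
    		pthread_create(&wrkrs[i].tid, NULL, wrkrLoop, &wrkrs[i]);
    	}

    	for(j = 0 ; j < 8 ; ++j) {		/* dispatch, like processWorkset() */
    		pthread_mutex_lock(&poolMut);
    		for(i = 0 ; i < N_WRKRS && wrkrs[i].job != NULL ; ++i)
    			; /* search for a free worker */
    		if(i < N_WRKRS) {
    			wrkrs[i].job = demoJob;	/* worker free -> hand the job over */
    			wrkrs[i].arg = j;
    			pthread_cond_signal(&wrkrs[i].run);
    			pthread_mutex_unlock(&poolMut);
    		} else {
    			pthread_mutex_unlock(&poolMut);
    			demoJob(j);		/* none free: process it ourselves */
    		}
    	}

    	pthread_mutex_lock(&poolMut);		/* stopWorkerPool() equivalent */
    	poolShutdown = 1;
    	for(i = 0 ; i < N_WRKRS ; ++i)
    		pthread_cond_signal(&wrkrs[i].run);
    	pthread_mutex_unlock(&poolMut);
    	for(i = 0 ; i < N_WRKRS ; ++i) {
    		pthread_join(wrkrs[i].tid, NULL);
    		pthread_cond_destroy(&wrkrs[i].run);
    	}
    	return 0;
    }

One deliberate difference: the sketch lets a worker drain a job that was
already assigned before it honors shutdown, while the patch's wrkr() checks
the global termination state first and may skip a pending pSrv.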