summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-01-27 16:33:13 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2011-01-27 16:33:13 +0100
commitc6447bb3452e9ac3c4daed5827f711c2c6956ca4 (patch)
tree0ec04ea9646cc7faa0c97a81c6c595e14d0beb4d
parent3049f535fff9d351480bceb7ea82667176a7c8a2 (diff)
downloadrsyslog-c6447bb3452e9ac3c4daed5827f711c2c6956ca4.tar.gz
rsyslog-c6447bb3452e9ac3c4daed5827f711c2c6956ca4.tar.xz
rsyslog-c6447bb3452e9ac3c4daed5827f711c2c6956ca4.zip
experimental: added thread pool to tcpsrv epoll handler
this seems to work in lab, but is brand-new code. needs practice drill.
-rw-r--r--runtime/nsdpoll_ptcp.c3
-rw-r--r--tcpsrv.c149
2 files changed, 126 insertions, 26 deletions
diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c
index 26810b7d..76c4e887 100644
--- a/runtime/nsdpoll_ptcp.c
+++ b/runtime/nsdpoll_ptcp.c
@@ -70,7 +70,8 @@ addEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, int mode, nsd_ptcp_t *pSock,
pNew->id = id;
pNew->pUsr = pUsr;
pNew->pSock = pSock;
- pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */
+ //pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */
+ pNew->event.events = EPOLLET;
if(mode & NSDPOLL_IN)
pNew->event.events |= EPOLLIN;
if(mode & NSDPOLL_OUT)
diff --git a/tcpsrv.c b/tcpsrv.c
index 77b7b9be..baaa27b1 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -47,6 +47,7 @@
#include <ctype.h>
#include <netinet/in.h>
#include <netdb.h>
+#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#if HAVE_FCNTL_H
@@ -91,6 +92,23 @@ DEFobjCurrIf(nspoll)
DEFobjCurrIf(prop)
+/* The following structure controls the worker threads. Global data is
+ * needed for their access.
+ */
+static struct wrkrInfo_s {
+ pthread_t tid; /* the worker's thread ID */
+ pthread_cond_t run;
+ int idx;
+ tcpsrv_t *pSrv; /* pSrv == NULL -> idle */
+ nspoll_t *pPoll;
+ void *pUsr;
+ long long unsigned numCalled; /* how often was this called */
+} wrkrInfo[4];
+static pthread_cond_t wrkrFinished;
+static pthread_mutex_t wrkrMut;
+static int wrkrMax = 4;
+static int wrkrRunning;
+
/* add new listener port to listener port list
* rgerhards, 2009-05-21
*/
@@ -538,7 +556,6 @@ finalize_it:
RETiRet;
}
-
/* process a single workset item
*/
static inline rsRetVal
@@ -563,22 +580,83 @@ finalize_it:
}
+/* worker to process incoming requests
+ */
+static void *
+wrkr(void *myself)
+{
+ struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself;
+
+ pthread_mutex_lock(&wrkrMut);
+ while(1) {
+ while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) {
+ pthread_cond_wait(&me->run, &wrkrMut);
+ }
+ if(glbl.GetGlobalInputTermState() == 1)
+ break;
+ ++wrkrRunning;
+ pthread_mutex_unlock(&wrkrMut);
+
+dbgprintf("XXX: worker %p activated\n", pthread_self());
+ ++me->numCalled;
+ processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr);
+
+ pthread_mutex_lock(&wrkrMut);
+ me->pSrv = NULL; /* indicate we are free again */
+ --wrkrRunning;
+ }
+ pthread_mutex_unlock(&wrkrMut);
+
+ return NULL;
+}
+
+
/* Process a workset, that is handle io. We become activated
* from either select or epoll handler. We split the workload
* out to a pool of threads, but try to avoid context switches
* as much as possible.
*/
-static rsRetVal processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[])
+static rsRetVal
+processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[])
{
int i;
DEFiRet;
dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries);
- for(i = 0 ; i < numEntries ; i++) {
+ while(numEntries > 0) {
if(glbl.GetGlobalInputTermState() == 1)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
- CHKiRet(processWorksetItem(pThis, pPoll, workset[i].id, workset[i].pUsr));
+dbgprintf("XXX: num entries during processing %d\n", numEntries);
+ if(numEntries == 1) {
+dbgprintf("XXX: processWorkset 1\n");
+ /* process self, save context switch */
+ processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr);
+ } else {
+dbgprintf("XXX: processWorkset 2\n");
+ pthread_mutex_lock(&wrkrMut);
+ /* check if there is a free worker */
+ for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i)
+dbgprintf("XXX: wrkrinfo[%d].pSrv %p\n", i, wrkrInfo[i].pSrv);
+ /*do search*/;
+ if(i < wrkrMax) {
+dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[numEntries - 1].pUsr, wrkrRunning, wrkrMax);
+ /* worker free -> use it! */
+ wrkrInfo[i].pSrv = pThis;
+ wrkrInfo[i].pPoll = pPoll;
+ wrkrInfo[i].idx = workset[numEntries -1].id;
+ wrkrInfo[i].pUsr = workset[numEntries -1].pUsr;
+ pthread_cond_signal(&wrkrInfo[i].run);
+ pthread_mutex_unlock(&wrkrMut);
+ } else {
+dbgprintf("XXX: processWorkset 2.2\n");
+ pthread_mutex_unlock(&wrkrMut);
+ /* no free worker, so we process this one ourselfs */
+ processWorksetItem(pThis, pPoll, workset[numEntries-1].id,
+ workset[numEntries-1].pUsr);
+ }
+ }
+ --numEntries;
}
finalize_it:
@@ -732,26 +810,6 @@ Run(tcpsrv_t *pThis)
continue;
processWorkset(pThis, pPoll, numEntries, workset);
-#if 0
- dbgprintf("poll returned with %d entries.\n", numEntries);
-
- for(i = 0 ; i < numEntries ; i++) {
- if(glbl.GetGlobalInputTermState() == 1)
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
- currIdx = workset[i].id;
- dbgprintf("tcpsrv processing i %d, pUsr %p\n", currIdx, workset[i].pUsr);
-dbgprintf("tcpsrv processing pUsr %p, ppLstn[0] %p, ppLstn[%d] %p\n", workset[i].pUsr, pThis->ppLstn[0], currIdx, pThis->ppLstn[currIdx]);
- if(workset[i].pUsr == pThis->ppLstn) {
- DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[currIdx]);
- SessAccept(pThis, pThis->ppLstnPort[currIdx], &pNewSess, pThis->ppLstn[currIdx]);
- CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
- DBGPRINTF("New session created with NSD %p.\n", pNewSess);
- } else {
- pNewSess = (tcps_sess_t*) workset[i].pUsr;
- doReceive(pThis, &pNewSess, pPoll);
- }
- }
-#endif
}
/* remove the tcp listen sockets from the epoll set */
@@ -1155,11 +1213,49 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
ENDObjClassInit(tcpsrv)
-/* --------------- here now comes the plumbing that makes as a library module --------------- */
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+startWorkerPool(void)
+{
+ int i;
+ pthread_mutex_init(&wrkrMut, NULL);
+ pthread_cond_init(&wrkrFinished, NULL);
+ for(i = 0 ; i < wrkrMax ; ++i) {
+ /* init worker info structure! */
+ pthread_cond_init(&wrkrInfo[i].run, NULL);
+ wrkrInfo[i].pSrv = NULL;
+ wrkrInfo[i].numCalled = 0;
+ pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i]));
+ }
+}
+
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+stopWorkerPool(void)
+{
+ int i;
+ for(i = 0 ; i < wrkrMax ; ++i) {
+ pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */
+ pthread_join(wrkrInfo[i].tid, NULL);
+ DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
+ pthread_cond_destroy(&wrkrInfo[i].run);
+ }
+ pthread_cond_destroy(&wrkrFinished);
+ pthread_mutex_destroy(&wrkrMut);
+
+}
+
+
+/* --------------- here now comes the plumbing that makes as a library module --------------- */
BEGINmodExit
CODESTARTmodExit
+dbgprintf("tcpsrv: modExit\n");
+ stopWorkerPool();
+
/* de-init in reverse order! */
tcpsrvClassExit();
tcps_sessClassExit();
@@ -1179,6 +1275,9 @@ CODESTARTmodInit
/* Initialize all classes that are in our module - this includes ourselfs */
CHKiRet(tcps_sessClassInit(pModInfo));
CHKiRet(tcpsrvClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
+
+ startWorkerPool();
+
ENDmodInit
/* vim:set ai: