summaryrefslogtreecommitdiffstats
path: root/tcpsrv.c
diff options
context:
space:
mode:
Diffstat (limited to 'tcpsrv.c')
-rw-r--r--tcpsrv.c244
1 files changed, 222 insertions, 22 deletions
diff --git a/tcpsrv.c b/tcpsrv.c
index f6daadeb..b5b64f07 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -50,6 +50,7 @@
#include <ctype.h>
#include <netinet/in.h>
#include <netdb.h>
+#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#if HAVE_FCNTL_H
@@ -73,6 +74,7 @@
#include "ruleset.h"
#include "unicode-helper.h"
+
MODULE_TYPE_LIB
MODULE_TYPE_NOKEEP
@@ -96,6 +98,23 @@ DEFobjCurrIf(prop)
DEFobjCurrIf(statsobj)
+/* The following structure controls the worker threads. Global data is
+ * needed for their access.
+ */
+static struct wrkrInfo_s {
+ pthread_t tid; /* the worker's thread ID */
+ pthread_cond_t run;
+ int idx;
+ tcpsrv_t *pSrv; /* pSrv == NULL -> idle */
+ nspoll_t *pPoll;
+ void *pUsr;
+ long long unsigned numCalled; /* how often was this called */
+} wrkrInfo[4];
+static pthread_mutex_t wrkrMut;
+static pthread_cond_t wrkrIdle;
+static int wrkrMax = 4;
+static int wrkrRunning;
+
/* add new listener port to listener port list
* rgerhards, 2009-05-21
*/
@@ -560,6 +579,135 @@ finalize_it:
RETiRet;
}
+/* process a single workset item
+ */
+static inline rsRetVal
+processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr)
+{
+ tcps_sess_t *pNewSess = NULL;
+ DEFiRet;
+
+ DBGPRINTF("tcpsrv: processing item %d, pUsr %p, bAbortConn\n", idx, pUsr);
+ if(pUsr == pThis->ppLstn) {
+ DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]);
+ iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]);
+ if(iRet == RS_RET_OK) {
+ if(pPoll != NULL) {
+ dbgprintf("XXXXXX: processWorksetItem trying nspoll.ctl\n");
+ CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
+ }
+ DBGPRINTF("New session created with NSD %p.\n", pNewSess);
+ } else {
+ DBGPRINTF("tcpsrv: error %d during accept\n", iRet);
+ }
+ } else {
+ pNewSess = (tcps_sess_t*) pUsr;
+ doReceive(pThis, &pNewSess, pPoll);
+ if(pPoll == NULL && pNewSess == NULL) {
+ pThis->pSessions[idx] = NULL;
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* worker to process incoming requests
+ */
+static void *
+wrkr(void *myself)
+{
+ struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself;
+
+ pthread_mutex_lock(&wrkrMut);
+ while(1) {
+ while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) {
+ pthread_cond_wait(&me->run, &wrkrMut);
+ }
+ if(glbl.GetGlobalInputTermState() == 1)
+ break;
+ pthread_mutex_unlock(&wrkrMut);
+
+ ++me->numCalled;
+ processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr);
+
+ pthread_mutex_lock(&wrkrMut);
+ me->pSrv = NULL; /* indicate we are free again */
+ --wrkrRunning;
+ pthread_cond_signal(&wrkrIdle);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+
+ return NULL;
+}
+
+
+/* Process a workset, that is handle io. We become activated
+ * from either select or epoll handler. We split the workload
+ * out to a pool of threads, but try to avoid context switches
+ * as much as possible.
+ */
+static rsRetVal
+processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[])
+{
+ int i;
+ int origEntries = numEntries;
+ DEFiRet;
+
+ dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries);
+
+ while(numEntries > 0) {
+ if(glbl.GetGlobalInputTermState() == 1)
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ if(numEntries == 1) {
+ /* process self, save context switch */
+ processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr);
+ } else {
+ pthread_mutex_lock(&wrkrMut);
+ /* check if there is a free worker */
+ for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i)
+ /*do search*/;
+ if(i < wrkrMax) {
+ /* worker free -> use it! */
+ wrkrInfo[i].pSrv = pThis;
+ wrkrInfo[i].pPoll = pPoll;
+ wrkrInfo[i].idx = workset[numEntries -1].id;
+ wrkrInfo[i].pUsr = workset[numEntries -1].pUsr;
+ /* Note: we must increment wrkrRunning HERE and not inside the worker's
+ * code. This is because a worker may actually never start, and thus
+ * increment wrkrRunning, before we finish and check the running worker
+ * count. We can only avoid this by incrementing it here.
+ */
+ ++wrkrRunning;
+ pthread_cond_signal(&wrkrInfo[i].run);
+ pthread_mutex_unlock(&wrkrMut);
+ } else {
+ pthread_mutex_unlock(&wrkrMut);
+ /* no free worker, so we process this one ourselfs */
+ processWorksetItem(pThis, pPoll, workset[numEntries-1].id,
+ workset[numEntries-1].pUsr);
+ }
+ }
+ --numEntries;
+ }
+
+ if(origEntries > 1) {
+ /* we now need to wait until all workers finish. This is because the
+ * rest of this module can not handle the concurrency introduced
+ * by workers running during the epoll call.
+ */
+ pthread_mutex_lock(&wrkrMut);
+ while(wrkrRunning > 0) {
+ pthread_cond_wait(&wrkrIdle, &wrkrMut);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+ }
+
+finalize_it:
+ RETiRet;
+}
+
/* This function is called to gather input.
* This variant here is only used if we need to work with a netstream driver
@@ -567,14 +715,14 @@ finalize_it:
*/
#pragma GCC diagnostic ignored "-Wempty-body"
static inline rsRetVal
-RunSelect(tcpsrv_t *pThis)
+RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset)
{
DEFiRet;
int nfds;
int i;
+ int iWorkset;
int iTCPSess;
int bIsReady;
- tcps_sess_t *pNewSess;
nssel_t *pSel = NULL;
rsRetVal localRet;
@@ -609,13 +757,21 @@ RunSelect(tcpsrv_t *pThis)
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
+ iWorkset = 0;
for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
if(glbl.GetGlobalInputTermState() == 1)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds));
if(bIsReady) {
- DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
- SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
+ workset[iWorkset].id = i;
+ workset[iWorkset].pUsr = (void*) pThis->ppLstn; /* this is a flag to indicate listen sock */
+ ++iWorkset;
+ if(iWorkset >= (int) sizeWorkset) {
+ processWorkset(pThis, NULL, iWorkset, workset);
+ iWorkset = 0;
+ }
+ //DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
+ //SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
--nfds; /* indicate we have processed one */
}
}
@@ -627,11 +783,22 @@ RunSelect(tcpsrv_t *pThis)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
localRet = nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds);
if(bIsReady || localRet != RS_RET_OK) {
- doReceive(pThis, &pThis->pSessions[iTCPSess], NULL);
+ workset[iWorkset].id = iTCPSess;
+ workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess];
+ ++iWorkset;
+ if(iWorkset >= (int) sizeWorkset) {
+ processWorkset(pThis, NULL, iWorkset, workset);
+ iWorkset = 0;
+ }
--nfds; /* indicate we have processed one */
}
iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
}
+
+ if(iWorkset > 0)
+ processWorkset(pThis, NULL, iWorkset, workset);
+
+ /* we need to copy back close descriptors */
CHKiRet(nssel.Destruct(&pSel));
finalize_it: /* this is a very special case - this time only we do not exit the function,
* because that would not help us either. So we simply retry it. Let's see
@@ -662,9 +829,9 @@ Run(tcpsrv_t *pThis)
{
DEFiRet;
int i;
- tcps_sess_t *pNewSess;
+ nsd_epworkset_t workset[128]; /* 128 is currently fixed num of concurrent requests */
+ int numEntries;
nspoll_t *pPoll = NULL;
- void *pUsr;
rsRetVal localRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
@@ -680,11 +847,11 @@ Run(tcpsrv_t *pThis)
if(localRet != RS_RET_OK) {
/* fall back to select */
dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet);
- iRet = RunSelect(pThis);
+ iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t));
FINALIZE;
}
- dbgprintf("tcpsrv uses epoll() interface, nsdpol driver found\n");
+ dbgprintf("tcpsrv uses epoll() interface, nsdpoll driver found\n");
/* flag that we are in epoll mode */
pThis->bUsingEPoll = TRUE;
@@ -697,7 +864,8 @@ Run(tcpsrv_t *pThis)
}
while(1) {
- localRet = nspoll.Wait(pPoll, -1, &i, &pUsr);
+ numEntries = sizeof(workset)/sizeof(nsd_epworkset_t);
+ localRet = nspoll.Wait(pPoll, -1, &numEntries, workset);
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
@@ -708,17 +876,7 @@ Run(tcpsrv_t *pThis)
if(localRet != RS_RET_OK)
continue;
- dbgprintf("poll returned with i %d, pUsr %p\n", i, pUsr);
-
- if(pUsr == pThis->ppLstn) {
- DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
- SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
- CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
- DBGPRINTF("New session created with NSD %p.\n", pNewSess);
- } else {
- pNewSess = (tcps_sess_t*) pUsr;
- doReceive(pThis, &pNewSess, pPoll);
- }
+ processWorkset(pThis, pPoll, numEntries, workset);
}
/* remove the tcp listen sockets from the epoll set */
@@ -1148,11 +1306,50 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
ENDObjClassInit(tcpsrv)
-/* --------------- here now comes the plumbing that makes as a library module --------------- */
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+startWorkerPool(void)
+{
+ int i;
+ wrkrRunning = 0;
+ pthread_mutex_init(&wrkrMut, NULL);
+ pthread_cond_init(&wrkrIdle, NULL);
+ for(i = 0 ; i < wrkrMax ; ++i) {
+ /* init worker info structure! */
+ pthread_cond_init(&wrkrInfo[i].run, NULL);
+ wrkrInfo[i].pSrv = NULL;
+ wrkrInfo[i].numCalled = 0;
+ pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i]));
+ }
+
+}
+
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+stopWorkerPool(void)
+{
+ int i;
+ for(i = 0 ; i < wrkrMax ; ++i) {
+ pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */
+ pthread_join(wrkrInfo[i].tid, NULL);
+ DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
+ pthread_cond_destroy(&wrkrInfo[i].run);
+ }
+ pthread_cond_destroy(&wrkrIdle);
+ pthread_mutex_destroy(&wrkrMut);
+
+}
+/* --------------- here now comes the plumbing that makes as a library module --------------- */
+
BEGINmodExit
CODESTARTmodExit
+dbgprintf("tcpsrv: modExit\n");
+ stopWorkerPool();
+
/* de-init in reverse order! */
tcpsrvClassExit();
tcps_sessClassExit();
@@ -1172,6 +1369,9 @@ CODESTARTmodInit
/* Initialize all classes that are in our module - this includes ourselfs */
CHKiRet(tcps_sessClassInit(pModInfo));
CHKiRet(tcpsrvClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
+
+ startWorkerPool();
+
ENDmodInit
/* vim:set ai: