summaryrefslogtreecommitdiffstats
path: root/tcpsrv.c
diff options
context:
space:
mode:
Diffstat (limited to 'tcpsrv.c')
-rw-r--r--tcpsrv.c283
1 files changed, 261 insertions, 22 deletions
diff --git a/tcpsrv.c b/tcpsrv.c
index f6daadeb..0e6e13d2 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -50,6 +50,7 @@
#include <ctype.h>
#include <netinet/in.h>
#include <netdb.h>
+#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#if HAVE_FCNTL_H
@@ -73,6 +74,7 @@
#include "ruleset.h"
#include "unicode-helper.h"
+
MODULE_TYPE_LIB
MODULE_TYPE_NOKEEP
@@ -95,6 +97,26 @@ DEFobjCurrIf(nspoll)
DEFobjCurrIf(prop)
DEFobjCurrIf(statsobj)
+static void startWorkerPool(void);
+
+/* The following structure controls the worker threads. Global data is
+ * needed for their access.
+ */
+static struct wrkrInfo_s {
+ pthread_t tid; /* the worker's thread ID */
+ pthread_cond_t run;
+ int idx;
+ tcpsrv_t *pSrv; /* pSrv == NULL -> idle */
+ nspoll_t *pPoll;
+ void *pUsr;
+ sbool enabled;
+ long long unsigned numCalled; /* how often was this called */
+} wrkrInfo[4];
+static sbool bWrkrRunning; /* are the worker threads running? */
+static pthread_mutex_t wrkrMut;
+static pthread_cond_t wrkrIdle;
+static int wrkrMax = 4;
+static int wrkrRunning;
/* add new listener port to listener port list
* rgerhards, 2009-05-21
@@ -560,6 +582,138 @@ finalize_it:
RETiRet;
}
+/* process a single workset item
+ */
+static inline rsRetVal
+processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr)
+{
+ tcps_sess_t *pNewSess = NULL;
+ DEFiRet;
+
+ DBGPRINTF("tcpsrv: processing item %d, pUsr %p, bAbortConn\n", idx, pUsr);
+ if(pUsr == pThis->ppLstn) {
+ DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]);
+ iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]);
+ if(iRet == RS_RET_OK) {
+ if(pPoll != NULL) {
+ dbgprintf("XXXXXX: processWorksetItem trying nspoll.ctl\n");
+ CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
+ }
+ DBGPRINTF("New session created with NSD %p.\n", pNewSess);
+ } else {
+ DBGPRINTF("tcpsrv: error %d during accept\n", iRet);
+ }
+ } else {
+ pNewSess = (tcps_sess_t*) pUsr;
+ doReceive(pThis, &pNewSess, pPoll);
+ if(pPoll == NULL && pNewSess == NULL) {
+ pThis->pSessions[idx] = NULL;
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* worker to process incoming requests
+ */
+static void *
+wrkr(void *myself)
+{
+ struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself;
+
+ pthread_mutex_lock(&wrkrMut);
+ while(1) {
+ while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) {
+ pthread_cond_wait(&me->run, &wrkrMut);
+ }
+ if(glbl.GetGlobalInputTermState() == 1) {
+ --wrkrRunning;
+ break;
+ }
+ pthread_mutex_unlock(&wrkrMut);
+
+ ++me->numCalled;
+ processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr);
+
+ pthread_mutex_lock(&wrkrMut);
+ me->pSrv = NULL; /* indicate we are free again */
+ --wrkrRunning;
+ pthread_cond_signal(&wrkrIdle);
+ }
+ me->enabled = 0; /* indicate we are no longer available */
+ pthread_mutex_unlock(&wrkrMut);
+
+ return NULL;
+}
+
+
+/* Process a workset, that is handle io. We become activated
+ * from either select or epoll handler. We split the workload
+ * out to a pool of threads, but try to avoid context switches
+ * as much as possible.
+ */
+static rsRetVal
+processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[])
+{
+ int i;
+ int origEntries = numEntries;
+ DEFiRet;
+
+ dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries);
+
+ while(numEntries > 0) {
+ if(glbl.GetGlobalInputTermState() == 1)
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ if(numEntries == 1) {
+ /* process self, save context switch */
+ processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr);
+ } else {
+ pthread_mutex_lock(&wrkrMut);
+ /* check if there is a free worker */
+ for(i = 0 ; (i < wrkrMax) && ((wrkrInfo[i].pSrv != NULL) || (wrkrInfo[i].enabled == 0)) ; ++i)
+ /*do search*/;
+ if(i < wrkrMax) {
+ /* worker free -> use it! */
+ wrkrInfo[i].pSrv = pThis;
+ wrkrInfo[i].pPoll = pPoll;
+ wrkrInfo[i].idx = workset[numEntries -1].id;
+ wrkrInfo[i].pUsr = workset[numEntries -1].pUsr;
+ /* Note: we must increment wrkrRunning HERE and not inside the worker's
+ * code. This is because a worker may actually never start, and thus
+ * increment wrkrRunning, before we finish and check the running worker
+ * count. We can only avoid this by incrementing it here.
+ */
+ ++wrkrRunning;
+ pthread_cond_signal(&wrkrInfo[i].run);
+ pthread_mutex_unlock(&wrkrMut);
+ } else {
+ pthread_mutex_unlock(&wrkrMut);
+ /* no free worker, so we process this one ourselfs */
+ processWorksetItem(pThis, pPoll, workset[numEntries-1].id,
+ workset[numEntries-1].pUsr);
+ }
+ }
+ --numEntries;
+ }
+
+ if(origEntries > 1) {
+ /* we now need to wait until all workers finish. This is because the
+ * rest of this module can not handle the concurrency introduced
+ * by workers running during the epoll call.
+ */
+ pthread_mutex_lock(&wrkrMut);
+ while(wrkrRunning > 0) {
+ pthread_cond_wait(&wrkrIdle, &wrkrMut);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+ }
+
+finalize_it:
+ RETiRet;
+}
+
/* This function is called to gather input.
* This variant here is only used if we need to work with a netstream driver
@@ -567,14 +721,14 @@ finalize_it:
*/
#pragma GCC diagnostic ignored "-Wempty-body"
static inline rsRetVal
-RunSelect(tcpsrv_t *pThis)
+RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset)
{
DEFiRet;
int nfds;
int i;
+ int iWorkset;
int iTCPSess;
int bIsReady;
- tcps_sess_t *pNewSess;
nssel_t *pSel = NULL;
rsRetVal localRet;
@@ -609,13 +763,21 @@ RunSelect(tcpsrv_t *pThis)
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
+ iWorkset = 0;
for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
if(glbl.GetGlobalInputTermState() == 1)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds));
if(bIsReady) {
- DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
- SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
+ workset[iWorkset].id = i;
+ workset[iWorkset].pUsr = (void*) pThis->ppLstn; /* this is a flag to indicate listen sock */
+ ++iWorkset;
+ if(iWorkset >= (int) sizeWorkset) {
+ processWorkset(pThis, NULL, iWorkset, workset);
+ iWorkset = 0;
+ }
+ //DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
+ //SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
--nfds; /* indicate we have processed one */
}
}
@@ -627,11 +789,22 @@ RunSelect(tcpsrv_t *pThis)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
localRet = nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds);
if(bIsReady || localRet != RS_RET_OK) {
- doReceive(pThis, &pThis->pSessions[iTCPSess], NULL);
+ workset[iWorkset].id = iTCPSess;
+ workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess];
+ ++iWorkset;
+ if(iWorkset >= (int) sizeWorkset) {
+ processWorkset(pThis, NULL, iWorkset, workset);
+ iWorkset = 0;
+ }
--nfds; /* indicate we have processed one */
}
iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
}
+
+ if(iWorkset > 0)
+ processWorkset(pThis, NULL, iWorkset, workset);
+
+ /* we need to copy back close descriptors */
CHKiRet(nssel.Destruct(&pSel));
finalize_it: /* this is a very special case - this time only we do not exit the function,
* because that would not help us either. So we simply retry it. Let's see
@@ -662,13 +835,23 @@ Run(tcpsrv_t *pThis)
{
DEFiRet;
int i;
- tcps_sess_t *pNewSess;
+ nsd_epworkset_t workset[128]; /* 128 is currently fixed num of concurrent requests */
+ int numEntries;
nspoll_t *pPoll = NULL;
- void *pUsr;
rsRetVal localRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
+ /* check if we need to start the worker pool. Once it is running, all is
+ * well. Shutdown is done on modExit.
+ */
+ d_pthread_mutex_lock(&wrkrMut);
+ if(!bWrkrRunning) {
+ bWrkrRunning = 1;
+ startWorkerPool();
+ }
+ d_pthread_mutex_unlock(&wrkrMut);
+
/* this is an endless loop - it is terminated by the framework canelling
* this thread. Thus, we also need to instantiate a cancel cleanup handler
* to prevent us from leaking anything. -- rgerhards, 20080-04-24
@@ -680,11 +863,11 @@ Run(tcpsrv_t *pThis)
if(localRet != RS_RET_OK) {
/* fall back to select */
dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet);
- iRet = RunSelect(pThis);
+ iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t));
FINALIZE;
}
- dbgprintf("tcpsrv uses epoll() interface, nsdpol driver found\n");
+ dbgprintf("tcpsrv uses epoll() interface, nsdpoll driver found\n");
/* flag that we are in epoll mode */
pThis->bUsingEPoll = TRUE;
@@ -697,7 +880,8 @@ Run(tcpsrv_t *pThis)
}
while(1) {
- localRet = nspoll.Wait(pPoll, -1, &i, &pUsr);
+ numEntries = sizeof(workset)/sizeof(nsd_epworkset_t);
+ localRet = nspoll.Wait(pPoll, -1, &numEntries, workset);
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
@@ -708,17 +892,7 @@ Run(tcpsrv_t *pThis)
if(localRet != RS_RET_OK)
continue;
- dbgprintf("poll returned with i %d, pUsr %p\n", i, pUsr);
-
- if(pUsr == pThis->ppLstn) {
- DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
- SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
- CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
- DBGPRINTF("New session created with NSD %p.\n", pNewSess);
- } else {
- pNewSess = (tcps_sess_t*) pUsr;
- doReceive(pThis, &pNewSess, pPoll);
- }
+ processWorkset(pThis, pPoll, numEntries, workset);
}
/* remove the tcp listen sockets from the epoll set */
@@ -1148,11 +1322,62 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
ENDObjClassInit(tcpsrv)
-/* --------------- here now comes the plumbing that makes as a library module --------------- */
+/* start worker threads
+ * Important: if we fork, this MUST be done AFTER forking
+ */
+static void
+startWorkerPool(void)
+{
+ int i;
+ int r;
+ pthread_attr_t sessThrdAttr;
+
+ wrkrRunning = 0;
+ pthread_cond_init(&wrkrIdle, NULL);
+ pthread_attr_init(&sessThrdAttr);
+ pthread_attr_setstacksize(&sessThrdAttr, 200*1024);
+ for(i = 0 ; i < wrkrMax ; ++i) {
+ /* init worker info structure! */
+ pthread_cond_init(&wrkrInfo[i].run, NULL);
+ wrkrInfo[i].pSrv = NULL;
+ wrkrInfo[i].numCalled = 0;
+ r = pthread_create(&wrkrInfo[i].tid, &sessThrdAttr, wrkr, &(wrkrInfo[i]));
+ if(r == 0) {
+ wrkrInfo[i].enabled = 1;
+ } else {
+ char errStr[1024];
+ wrkrInfo[i].enabled = 0;
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(0, NO_ERRCODE, "tcpsrv error creating thread %d: "
+ "%s", i, errStr);
+ }
+ }
+ pthread_attr_destroy(&sessThrdAttr);
+}
+
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static void
+stopWorkerPool(void)
+{
+ int i;
+ for(i = 0 ; i < wrkrMax ; ++i) {
+ pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */
+ pthread_join(wrkrInfo[i].tid, NULL);
+ DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
+ pthread_cond_destroy(&wrkrInfo[i].run);
+ }
+ pthread_cond_destroy(&wrkrIdle);
+ pthread_mutex_destroy(&wrkrMut);
+
+}
+/* --------------- here now comes the plumbing that makes as a library module --------------- */
+
BEGINmodExit
CODESTARTmodExit
+ stopWorkerPool();
/* de-init in reverse order! */
tcpsrvClassExit();
tcps_sessClassExit();
@@ -1168,6 +1393,20 @@ ENDqueryEtryPt
BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+ /* we just init the worker mutex, but do not start the workers themselves. This is deferred
+ * to the first call of Run(). Reasons for this:
+ * 1. depending on load order, tcpsrv gets loaded during rsyslog startup BEFORE
+ * it forks, in which case the workers would be running in the then-killed parent,
+ * leading to a defuncnt child (we actually had this bug).
+ * 2. depending on circumstances, Run() would possibly never be called, in which case
+ * the worker threads would be totally useless.
+ * Note that in order to guarantee a non-racy worker start, we need to guard the
+ * startup sequence by a mutex, which is why we init it here (no problem with fork()
+ * in this case as the mutex is a pure-memory structure).
+ * rgerhards, 2012-05-18
+ */
+ pthread_mutex_init(&wrkrMut, NULL);
+ bWrkrRunning = 0;
/* Initialize all classes that are in our module - this includes ourselfs */
CHKiRet(tcps_sessClassInit(pModInfo));