summaryrefslogtreecommitdiffstats
path: root/tcpsrv.c
diff options
context:
space:
mode:
Diffstat (limited to 'tcpsrv.c')
-rw-r--r--tcpsrv.c486
1 files changed, 419 insertions, 67 deletions
diff --git a/tcpsrv.c b/tcpsrv.c
index 8c992f79..c091df0b 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -15,12 +15,9 @@
* callbacks before the code is run. The tcpsrv then calls back
* into the specific input modules at the appropriate time.
*
- * NOTE: read comments in module-template.h to understand how this file
- * works!
- *
* File begun on 2007-12-21 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007, 2008, 2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -50,6 +47,7 @@
#include <ctype.h>
#include <netinet/in.h>
#include <netdb.h>
+#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#if HAVE_FCNTL_H
@@ -68,11 +66,14 @@
#include "netstrms.h"
#include "netstrm.h"
#include "nssel.h"
+#include "nspoll.h"
#include "errmsg.h"
#include "ruleset.h"
#include "unicode-helper.h"
+
MODULE_TYPE_LIB
+MODULE_TYPE_NOKEEP
/* defines */
#define TCPSESS_MAX_DEFAULT 200 /* default for nbr of tcp sessions if no number is given */
@@ -89,9 +90,27 @@ DEFobjCurrIf(net)
DEFobjCurrIf(netstrms)
DEFobjCurrIf(netstrm)
DEFobjCurrIf(nssel)
+DEFobjCurrIf(nspoll)
DEFobjCurrIf(prop)
+/* The following structure controls the worker threads. Global data is
+ * needed for their access.
+ */
+static struct wrkrInfo_s {
+ pthread_t tid; /* the worker's thread ID */
+ pthread_cond_t run;
+ int idx;
+ tcpsrv_t *pSrv; /* pSrv == NULL -> idle */
+ nspoll_t *pPoll;
+ void *pUsr;
+ long long unsigned numCalled; /* how often was this called */
+} wrkrInfo[4];
+static pthread_mutex_t wrkrMut;
+static pthread_cond_t wrkrIdle;
+static int wrkrMax = 4;
+static int wrkrRunning;
+
/* add new listener port to listener port list
* rgerhards, 2009-05-21
*/
@@ -104,7 +123,7 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort)
ISOBJ_TYPE_assert(pThis, tcpsrv);
/* create entry */
- CHKmalloc(pEntry = malloc(sizeof(tcpLstnPortList_t)));
+ CHKmalloc(pEntry = MALLOC(sizeof(tcpLstnPortList_t)));
pEntry->pszPort = pszPort;
pEntry->pSrv = pThis;
pEntry->pRuleset = pThis->pRuleset;
@@ -165,9 +184,9 @@ TCPSessTblInit(tcpsrv_t *pThis)
ISOBJ_TYPE_assert(pThis, tcpsrv);
assert(pThis->pSessions == NULL);
- dbgprintf("Allocating buffer for %d TCP sessions.\n", pThis->iSessMax);
+ DBGPRINTF("Allocating buffer for %d TCP sessions.\n", pThis->iSessMax);
if((pThis->pSessions = (tcps_sess_t **) calloc(pThis->iSessMax, sizeof(tcps_sess_t *))) == NULL) {
- dbgprintf("Error: TCPSessInit() could not alloc memory for TCP session table.\n");
+ DBGPRINTF("Error: TCPSessInit() could not alloc memory for TCP session table.\n");
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
@@ -238,11 +257,13 @@ static void deinit_tcp_listener(tcpsrv_t *pThis)
if(pThis->pSessions != NULL) {
/* close all TCP connections! */
- i = TCPSessGetNxtSess(pThis, -1);
- while(i != -1) {
- tcps_sess.Destruct(&pThis->pSessions[i]);
- /* now get next... */
- i = TCPSessGetNxtSess(pThis, i);
+ if(!pThis->bUsingEPoll) {
+ i = TCPSessGetNxtSess(pThis, -1);
+ while(i != -1) {
+ tcps_sess.Destruct(&pThis->pSessions[i]);
+ /* now get next... */
+ i = TCPSessGetNxtSess(pThis, i);
+ }
}
/* we are done with the session table - so get rid of it... */
@@ -412,7 +433,7 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess,
* rgerhards, 2005-09-26
*/
if(!pThis->pIsPermittedHost((struct sockaddr*) addr, (char*) fromHostFQDN, pThis->pUsr, pSess->pUsr)) {
- dbgprintf("%s is not an allowed sender\n", fromHostFQDN);
+ DBGPRINTF("%s is not an allowed sender\n", fromHostFQDN);
if(glbl.GetOption_DisallowWarning()) {
errno = 0;
errmsg.LogError(0, RS_RET_HOST_NOT_PERMITTED, "TCP message from disallowed sender %s discarded", fromHostFQDN);
@@ -438,7 +459,8 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess,
}
*ppSess = pSess;
- pThis->pSessions[iSess] = pSess;
+ if(!pThis->bUsingEPoll)
+ pThis->pSessions[iSess] = pSess;
pSess = NULL; /* this is now also handed over */
finalize_it:
@@ -465,26 +487,230 @@ RunCancelCleanup(void *arg)
}
-/* This function is called to gather input. */
-#pragma GCC diagnostic ignored "-Wempty-body"
+/* helper to close a session. Takes status of poll vs. select into consideration.
+ * rgerhards, 2009-11-25
+ */
+static inline rsRetVal
+closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) {
+ DEFiRet;
+ if(pPoll != NULL) {
+ CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL));
+ }
+ pThis->pOnRegularClose(*ppSess);
+ tcps_sess.Destruct(ppSess);
+finalize_it:
+ RETiRet;
+}
+
+
+/* process a receive request on one of the streams
+ * If pPoll is non-NULL, we have a netstream in epoll mode, which means we need
+ * to remove any descriptor we close from the epoll set.
+ * rgerhards, 2009-07-020
+ */
static rsRetVal
-Run(tcpsrv_t *pThis)
+doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
{
- DEFiRet;
+ char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */
+ ssize_t iRcvd;
rsRetVal localRet;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm);
+ /* Receive message */
+ iRet = pThis->pRcvData(*ppSess, buf, sizeof(buf), &iRcvd);
+ switch(iRet) {
+ case RS_RET_CLOSED:
+ if(pThis->bEmitMsgOnClose) {
+ uchar *pszPeer;
+ int lenPeer;
+ errno = 0;
+ prop.GetString((*ppSess)->fromHostIP, &pszPeer, &lenPeer);
+ errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n",
+ (*ppSess)->pStrm, pszPeer);
+ }
+ //pthread_mutex_lock(&mut);
+ CHKiRet(closeSess(pThis, ppSess, pPoll));
+ //pthread_mutex_unlock(&mut);
+ break;
+ case RS_RET_RETRY:
+ /* we simply ignore retry - this is not an error, but we also have not received anything */
+ break;
+ case RS_RET_OK:
+ /* valid data received, process it! */
+ localRet = tcps_sess.DataRcvd(*ppSess, buf, iRcvd);
+ if(localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL) {
+ /* in this case, something went awfully wrong.
+ * We are instructed to terminate the session.
+ */
+ errmsg.LogError(0, localRet, "Tearing down TCP Session - see "
+ "previous messages for reason(s)\n");
+ CHKiRet(closeSess(pThis, ppSess, pPoll));
+ }
+ break;
+ default:
+ errno = 0;
+ errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n",
+ (*ppSess)->pStrm);
+ CHKiRet(closeSess(pThis, ppSess, pPoll));
+ break;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+/* process a single workset item
+ */
+static inline rsRetVal
+processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr)
+{
+ tcps_sess_t *pNewSess;
+ DEFiRet;
+
+ dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr);
+ if(pUsr == pThis->ppLstn) {
+ DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]);
+ iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]);
+ if(iRet == RS_RET_OK) {
+ if(pPoll != NULL)
+ CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
+ DBGPRINTF("New session created with NSD %p.\n", pNewSess);
+ } else {
+ DBGPRINTF("tcpsrv: error %d during accept\n", iRet);
+ }
+ } else {
+ pNewSess = (tcps_sess_t*) pUsr;
+ doReceive(pThis, &pNewSess, pPoll);
+ if(pPoll == NULL && pNewSess == NULL) {
+ pThis->pSessions[idx] = NULL;
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* worker to process incoming requests
+ */
+static void *
+wrkr(void *myself)
+{
+ struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself;
+
+ pthread_mutex_lock(&wrkrMut);
+ while(1) {
+ while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) {
+ pthread_cond_wait(&me->run, &wrkrMut);
+ }
+ if(glbl.GetGlobalInputTermState() == 1)
+ break;
+ pthread_mutex_unlock(&wrkrMut);
+
+ ++me->numCalled;
+ processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr);
+
+ pthread_mutex_lock(&wrkrMut);
+ me->pSrv = NULL; /* indicate we are free again */
+ --wrkrRunning;
+ pthread_cond_signal(&wrkrIdle);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+
+ return NULL;
+}
+
+
+/* Process a workset, that is handle io. We become activated
+ * from either select or epoll handler. We split the workload
+ * out to a pool of threads, but try to avoid context switches
+ * as much as possible.
+ */
+static rsRetVal
+processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[])
+{
+ int i;
+ int origEntries = numEntries;
+ DEFiRet;
+
+ dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries);
+
+ while(numEntries > 0) {
+ if(glbl.GetGlobalInputTermState() == 1)
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ if(numEntries == 1) {
+ /* process self, save context switch */
+ processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr);
+ } else {
+ pthread_mutex_lock(&wrkrMut);
+ /* check if there is a free worker */
+ for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i)
+ /*do search*/;
+ if(i < wrkrMax) {
+ /* worker free -> use it! */
+ wrkrInfo[i].pSrv = pThis;
+ wrkrInfo[i].pPoll = pPoll;
+ wrkrInfo[i].idx = workset[numEntries -1].id;
+ wrkrInfo[i].pUsr = workset[numEntries -1].pUsr;
+ /* Note: we must increment wrkrRunning HERE and not inside the worker's
+ * code. This is because a worker may actually never start, and thus
+ * increment wrkrRunning, before we finish and check the running worker
+ * count. We can only avoid this by incrementing it here.
+ */
+ ++wrkrRunning;
+ pthread_cond_signal(&wrkrInfo[i].run);
+ pthread_mutex_unlock(&wrkrMut);
+ } else {
+ pthread_mutex_unlock(&wrkrMut);
+ /* no free worker, so we process this one ourselfs */
+ processWorksetItem(pThis, pPoll, workset[numEntries-1].id,
+ workset[numEntries-1].pUsr);
+ }
+ }
+ --numEntries;
+ }
+
+ if(origEntries > 1) {
+ /* we now need to wait until all workers finish. This is because the
+ * rest of this module can not handle the concurrency introduced
+ * by workers running during the epoll call.
+ */
+ pthread_mutex_lock(&wrkrMut);
+ while(wrkrRunning > 0) {
+ pthread_cond_wait(&wrkrIdle, &wrkrMut);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* This function is called to gather input.
+ * This variant here is only used if we need to work with a netstream driver
+ * that does not support epoll().
+ */
+#pragma GCC diagnostic ignored "-Wempty-body"
+static inline rsRetVal
+RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset)
+{
+ DEFiRet;
int nfds;
int i;
+ int iWorkset;
int iTCPSess;
int bIsReady;
- tcps_sess_t *pNewSess;
nssel_t *pSel = NULL;
- ssize_t iRcvd;
+ rsRetVal localRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
/* this is an endless loop - it is terminated by the framework canelling
* this thread. Thus, we also need to instantiate a cancel cleanup handler
- * to prevent us from leaking anything. -- rgerharsd, 20080-04-24
+ * to prevent us from leaking anything. -- rgerhards, 20080-04-24
*/
pthread_cleanup_push(RunCancelCleanup, (void*) &pSel);
while(1) {
@@ -508,12 +734,24 @@ Run(tcpsrv_t *pThis)
/* wait for io to become ready */
CHKiRet(nssel.Wait(pSel, &nfds));
+ if(glbl.GetGlobalInputTermState() == 1)
+ break; /* terminate input! */
+ iWorkset = 0;
for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
+ if(glbl.GetGlobalInputTermState() == 1)
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds));
if(bIsReady) {
- dbgprintf("New connect on NSD %p.\n", pThis->ppLstn[i]);
- SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
+ workset[iWorkset].id = i;
+ workset[iWorkset].pUsr = (void*) pThis->ppLstn; /* this is a flag to indicate listen sock */
+ ++iWorkset;
+ if(iWorkset >= (int) sizeWorkset) {
+ processWorkset(pThis, NULL, iWorkset, workset);
+ iWorkset = 0;
+ }
+ //DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
+ //SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
--nfds; /* indicate we have processed one */
}
}
@@ -521,54 +759,26 @@ Run(tcpsrv_t *pThis)
/* now check the sessions */
iTCPSess = TCPSessGetNxtSess(pThis, -1);
while(nfds && iTCPSess != -1) {
+ if(glbl.GetGlobalInputTermState() == 1)
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
localRet = nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds);
if(bIsReady || localRet != RS_RET_OK) {
- char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */
- dbgprintf("netstream %p with new data\n", pThis->pSessions[iTCPSess]->pStrm);
-
- /* Receive message */
- iRet = pThis->pRcvData(pThis->pSessions[iTCPSess], buf, sizeof(buf), &iRcvd);
- switch(iRet) {
- case RS_RET_CLOSED:
- if(pThis->bEmitMsgOnClose) {
- uchar *pszPeer;
- int lenPeer;
- errno = 0;
- prop.GetString(pThis->pSessions[iTCPSess]->fromHostIP, &pszPeer, &lenPeer);
- errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n",
- pThis->pSessions[iTCPSess]->pStrm, pszPeer);
- }
- pThis->pOnRegularClose(pThis->pSessions[iTCPSess]);
- tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
- break;
- case RS_RET_RETRY:
- /* we simply ignore retry - this is not an error, but we also have not received anything */
- break;
- case RS_RET_OK:
- /* valid data received, process it! */
- localRet = tcps_sess.DataRcvd(pThis->pSessions[iTCPSess], buf, iRcvd);
- if(localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL) {
- /* in this case, something went awfully wrong.
- * We are instructed to terminate the session.
- */
- errmsg.LogError(0, localRet, "Tearing down TCP Session %d - see "
- "previous messages for reason(s)", iTCPSess);
- pThis->pOnErrClose(pThis->pSessions[iTCPSess]);
- tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
- }
- break;
- default:
- errno = 0;
- errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n",
- pThis->pSessions[iTCPSess]->pStrm);
- pThis->pOnErrClose(pThis->pSessions[iTCPSess]);
- tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
- break;
+ workset[iWorkset].id = iTCPSess;
+ workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess];
+ ++iWorkset;
+ if(iWorkset >= (int) sizeWorkset) {
+ processWorkset(pThis, NULL, iWorkset, workset);
+ iWorkset = 0;
}
--nfds; /* indicate we have processed one */
}
iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
}
+
+ if(iWorkset > 0)
+ processWorkset(pThis, NULL, iWorkset, workset);
+
+ /* we need to copy back close descriptors */
CHKiRet(nssel.Destruct(&pSel));
finalize_it: /* this is a very special case - this time only we do not exit the function,
* because that would not help us either. So we simply retry it. Let's see
@@ -582,19 +792,93 @@ finalize_it: /* this is a very special case - this time only we do not exit the
}
/* note that this point is usually not reached */
- pthread_cleanup_pop(0); /* remove cleanup handler */
+ pthread_cleanup_pop(1); /* remove cleanup handler */
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
+/* This function is called to gather input. It tries doing that via the epoll()
+ * interface. If the driver does not support that, it falls back to calling its
+ * select() equivalent.
+ * rgerhards, 2009-11-18
+ */
+static rsRetVal
+Run(tcpsrv_t *pThis)
+{
+ DEFiRet;
+ int i;
+ nsd_epworkset_t workset[128]; /* 128 is currently fixed num of concurrent requests */
+ int numEntries;
+ nspoll_t *pPoll = NULL;
+ rsRetVal localRet;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+
+ /* this is an endless loop - it is terminated by the framework canelling
+ * this thread. Thus, we also need to instantiate a cancel cleanup handler
+ * to prevent us from leaking anything. -- rgerhards, 20080-04-24
+ */
+ if((localRet = nspoll.Construct(&pPoll)) == RS_RET_OK) {
+ // TODO: set driver
+ localRet = nspoll.ConstructFinalize(pPoll);
+ }
+ if(localRet != RS_RET_OK) {
+ /* fall back to select */
+ dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet);
+ iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t));
+ FINALIZE;
+ }
+
+ dbgprintf("tcpsrv uses epoll() interface, nsdpoll driver found\n");
+
+ /* flag that we are in epoll mode */
+ pThis->bUsingEPoll = TRUE;
+
+ /* Add the TCP listen sockets to the list of sockets to monitor */
+ for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
+ dbgprintf("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn);
+ CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD));
+ dbgprintf("Added listener %d\n", i);
+ }
+
+ while(1) {
+ numEntries = sizeof(workset)/sizeof(nsd_epworkset_t);
+ localRet = nspoll.Wait(pPoll, -1, &numEntries, workset);
+ if(glbl.GetGlobalInputTermState() == 1)
+ break; /* terminate input! */
+
+ /* check if we need to ignore the i/o ready state. We do this if we got an invalid
+ * return state. Validly, this can happen for RS_RET_EINTR, for other cases it may
+ * not be the right thing, but what is the right thing is really hard at this point...
+ */
+ if(localRet != RS_RET_OK)
+ continue;
+
+ processWorkset(pThis, pPoll, numEntries, workset);
+ }
+
+ /* remove the tcp listen sockets from the epoll set */
+ for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
+ CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_DEL));
+ }
+
+finalize_it:
+ if(pPoll != NULL)
+ nspoll.Destruct(&pPoll);
+ RETiRet;
+}
+
+
/* Standard-Constructor */
BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */
pThis->iSessMax = TCPSESS_MAX_DEFAULT;
pThis->iLstnMax = TCPLSTN_MAX_DEFAULT;
pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
+ pThis->bDisableLFDelim = 0;
pThis->OnMsgReceive = NULL;
+ pThis->bUseFlowControl = 1;
ENDobjConstruct(tcpsrv)
@@ -750,6 +1034,18 @@ SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*,
}
+/* set enable/disable standard LF frame delimiter (use with care!)
+ * -- rgerhards, 2010-01-03
+ */
+static rsRetVal
+SetbDisableLFDelim(tcpsrv_t *pThis, int bVal)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->bDisableLFDelim = bVal;
+ RETiRet;
+}
+
/* Set additional framing to use (if any) -- rgerhards, 2008-12-10 */
static rsRetVal
@@ -858,6 +1154,18 @@ SetLstnMax(tcpsrv_t *pThis, int iMax)
}
+/* set if flow control shall be supported
+ */
+static rsRetVal
+SetUseFlowControl(tcpsrv_t *pThis, int bUseFlowControl)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->bUseFlowControl = bUseFlowControl;
+ RETiRet;
+}
+
+
/* set max number of sessions
* this must be called before ConstructFinalize, or it will have no effect!
* rgerhards, 2009-04-09
@@ -891,7 +1199,6 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->ConstructFinalize = tcpsrvConstructFinalize;
pIf->Destruct = tcpsrvDestruct;
- //pIf->SessAccept = SessAccept;
pIf->configureTCPListen = configureTCPListen;
pIf->create_tcp_socket = create_tcp_socket;
pIf->Run = Run;
@@ -899,7 +1206,9 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->SetUsrP = SetUsrP;
pIf->SetInputName = SetInputName;
pIf->SetAddtlFrameDelim = SetAddtlFrameDelim;
+ pIf->SetbDisableLFDelim = SetbDisableLFDelim;
pIf->SetSessMax = SetSessMax;
+ pIf->SetUseFlowControl = SetUseFlowControl;
pIf->SetLstnMax = SetLstnMax;
pIf->SetDrvrMode = SetDrvrMode;
pIf->SetDrvrAuthMode = SetDrvrAuthMode;
@@ -952,6 +1261,7 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME));
CHKiRet(objUse(netstrm, DONT_LOAD_LIB));
CHKiRet(objUse(nssel, DONT_LOAD_LIB));
+ CHKiRet(objUse(nspoll, DONT_LOAD_LIB));
CHKiRet(objUse(tcps_sess, DONT_LOAD_LIB));
CHKiRet(objUse(conf, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
@@ -964,11 +1274,50 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
ENDObjClassInit(tcpsrv)
-/* --------------- here now comes the plumbing that makes as a library module --------------- */
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+startWorkerPool(void)
+{
+ int i;
+ wrkrRunning = 0;
+ pthread_mutex_init(&wrkrMut, NULL);
+ pthread_cond_init(&wrkrIdle, NULL);
+ for(i = 0 ; i < wrkrMax ; ++i) {
+ /* init worker info structure! */
+ pthread_cond_init(&wrkrInfo[i].run, NULL);
+ wrkrInfo[i].pSrv = NULL;
+ wrkrInfo[i].numCalled = 0;
+ pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i]));
+ }
+
+}
+
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+stopWorkerPool(void)
+{
+ int i;
+ for(i = 0 ; i < wrkrMax ; ++i) {
+ pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */
+ pthread_join(wrkrInfo[i].tid, NULL);
+ DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
+ pthread_cond_destroy(&wrkrInfo[i].run);
+ }
+ pthread_cond_destroy(&wrkrIdle);
+ pthread_mutex_destroy(&wrkrMut);
+
+}
+/* --------------- here now comes the plumbing that makes as a library module --------------- */
+
BEGINmodExit
CODESTARTmodExit
+dbgprintf("tcpsrv: modExit\n");
+ stopWorkerPool();
+
/* de-init in reverse order! */
tcpsrvClassExit();
tcps_sessClassExit();
@@ -988,6 +1337,9 @@ CODESTARTmodInit
/* Initialize all classes that are in our module - this includes ourselfs */
CHKiRet(tcps_sessClassInit(pModInfo));
CHKiRet(tcpsrvClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
+
+ startWorkerPool();
+
ENDmodInit
/* vim:set ai: