diff options
Diffstat (limited to 'tcpsrv.c')
-rw-r--r-- | tcpsrv.c | 486 |
1 files changed, 419 insertions, 67 deletions
@@ -15,12 +15,9 @@ * callbacks before the code is run. The tcpsrv then calls back * into the specific input modules at the appropriate time. * - * NOTE: read comments in module-template.h to understand how this file - * works! - * * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) * - * Copyright 2007, 2008, 2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -50,6 +47,7 @@ #include <ctype.h> #include <netinet/in.h> #include <netdb.h> +#include <pthread.h> #include <sys/types.h> #include <sys/socket.h> #if HAVE_FCNTL_H @@ -68,11 +66,14 @@ #include "netstrms.h" #include "netstrm.h" #include "nssel.h" +#include "nspoll.h" #include "errmsg.h" #include "ruleset.h" #include "unicode-helper.h" + MODULE_TYPE_LIB +MODULE_TYPE_NOKEEP /* defines */ #define TCPSESS_MAX_DEFAULT 200 /* default for nbr of tcp sessions if no number is given */ @@ -89,9 +90,27 @@ DEFobjCurrIf(net) DEFobjCurrIf(netstrms) DEFobjCurrIf(netstrm) DEFobjCurrIf(nssel) +DEFobjCurrIf(nspoll) DEFobjCurrIf(prop) +/* The following structure controls the worker threads. Global data is + * needed for their access. + */ +static struct wrkrInfo_s { + pthread_t tid; /* the worker's thread ID */ + pthread_cond_t run; + int idx; + tcpsrv_t *pSrv; /* pSrv == NULL -> idle */ + nspoll_t *pPoll; + void *pUsr; + long long unsigned numCalled; /* how often was this called */ +} wrkrInfo[4]; +static pthread_mutex_t wrkrMut; +static pthread_cond_t wrkrIdle; +static int wrkrMax = 4; +static int wrkrRunning; + /* add new listener port to listener port list * rgerhards, 2009-05-21 */ @@ -104,7 +123,7 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort) ISOBJ_TYPE_assert(pThis, tcpsrv); /* create entry */ - CHKmalloc(pEntry = malloc(sizeof(tcpLstnPortList_t))); + CHKmalloc(pEntry = MALLOC(sizeof(tcpLstnPortList_t))); pEntry->pszPort = pszPort; pEntry->pSrv = pThis; pEntry->pRuleset = pThis->pRuleset; @@ -165,9 +184,9 @@ TCPSessTblInit(tcpsrv_t *pThis) ISOBJ_TYPE_assert(pThis, tcpsrv); assert(pThis->pSessions == NULL); - dbgprintf("Allocating buffer for %d TCP sessions.\n", pThis->iSessMax); + DBGPRINTF("Allocating buffer for %d TCP sessions.\n", pThis->iSessMax); if((pThis->pSessions = (tcps_sess_t **) calloc(pThis->iSessMax, sizeof(tcps_sess_t *))) == NULL) { - dbgprintf("Error: TCPSessInit() could not alloc memory for TCP session table.\n"); + DBGPRINTF("Error: TCPSessInit() could not alloc memory for TCP session table.\n"); ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } @@ -238,11 +257,13 @@ static void deinit_tcp_listener(tcpsrv_t *pThis) if(pThis->pSessions != NULL) { /* close all TCP connections! */ - i = TCPSessGetNxtSess(pThis, -1); - while(i != -1) { - tcps_sess.Destruct(&pThis->pSessions[i]); - /* now get next... */ - i = TCPSessGetNxtSess(pThis, i); + if(!pThis->bUsingEPoll) { + i = TCPSessGetNxtSess(pThis, -1); + while(i != -1) { + tcps_sess.Destruct(&pThis->pSessions[i]); + /* now get next... */ + i = TCPSessGetNxtSess(pThis, i); + } } /* we are done with the session table - so get rid of it... */ @@ -412,7 +433,7 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess, * rgerhards, 2005-09-26 */ if(!pThis->pIsPermittedHost((struct sockaddr*) addr, (char*) fromHostFQDN, pThis->pUsr, pSess->pUsr)) { - dbgprintf("%s is not an allowed sender\n", fromHostFQDN); + DBGPRINTF("%s is not an allowed sender\n", fromHostFQDN); if(glbl.GetOption_DisallowWarning()) { errno = 0; errmsg.LogError(0, RS_RET_HOST_NOT_PERMITTED, "TCP message from disallowed sender %s discarded", fromHostFQDN); @@ -438,7 +459,8 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess, } *ppSess = pSess; - pThis->pSessions[iSess] = pSess; + if(!pThis->bUsingEPoll) + pThis->pSessions[iSess] = pSess; pSess = NULL; /* this is now also handed over */ finalize_it: @@ -465,26 +487,230 @@ RunCancelCleanup(void *arg) } -/* This function is called to gather input. */ -#pragma GCC diagnostic ignored "-Wempty-body" +/* helper to close a session. Takes status of poll vs. select into consideration. + * rgerhards, 2009-11-25 + */ +static inline rsRetVal +closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) { + DEFiRet; + if(pPoll != NULL) { + CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL)); + } + pThis->pOnRegularClose(*ppSess); + tcps_sess.Destruct(ppSess); +finalize_it: + RETiRet; +} + + +/* process a receive request on one of the streams + * If pPoll is non-NULL, we have a netstream in epoll mode, which means we need + * to remove any descriptor we close from the epoll set. + * rgerhards, 2009-07-020 + */ static rsRetVal -Run(tcpsrv_t *pThis) +doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) { - DEFiRet; + char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */ + ssize_t iRcvd; rsRetVal localRet; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, tcpsrv); + DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm); + /* Receive message */ + iRet = pThis->pRcvData(*ppSess, buf, sizeof(buf), &iRcvd); + switch(iRet) { + case RS_RET_CLOSED: + if(pThis->bEmitMsgOnClose) { + uchar *pszPeer; + int lenPeer; + errno = 0; + prop.GetString((*ppSess)->fromHostIP, &pszPeer, &lenPeer); + errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n", + (*ppSess)->pStrm, pszPeer); + } + //pthread_mutex_lock(&mut); + CHKiRet(closeSess(pThis, ppSess, pPoll)); + //pthread_mutex_unlock(&mut); + break; + case RS_RET_RETRY: + /* we simply ignore retry - this is not an error, but we also have not received anything */ + break; + case RS_RET_OK: + /* valid data received, process it! */ + localRet = tcps_sess.DataRcvd(*ppSess, buf, iRcvd); + if(localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL) { + /* in this case, something went awfully wrong. + * We are instructed to terminate the session. + */ + errmsg.LogError(0, localRet, "Tearing down TCP Session - see " + "previous messages for reason(s)\n"); + CHKiRet(closeSess(pThis, ppSess, pPoll)); + } + break; + default: + errno = 0; + errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n", + (*ppSess)->pStrm); + CHKiRet(closeSess(pThis, ppSess, pPoll)); + break; + } + +finalize_it: + RETiRet; +} + +/* process a single workset item + */ +static inline rsRetVal +processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) +{ + tcps_sess_t *pNewSess; + DEFiRet; + + dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr); + if(pUsr == pThis->ppLstn) { + DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); + iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); + if(iRet == RS_RET_OK) { + if(pPoll != NULL) + CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + DBGPRINTF("New session created with NSD %p.\n", pNewSess); + } else { + DBGPRINTF("tcpsrv: error %d during accept\n", iRet); + } + } else { + pNewSess = (tcps_sess_t*) pUsr; + doReceive(pThis, &pNewSess, pPoll); + if(pPoll == NULL && pNewSess == NULL) { + pThis->pSessions[idx] = NULL; + } + } + +finalize_it: + RETiRet; +} + + +/* worker to process incoming requests + */ +static void * +wrkr(void *myself) +{ + struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself; + + pthread_mutex_lock(&wrkrMut); + while(1) { + while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) { + pthread_cond_wait(&me->run, &wrkrMut); + } + if(glbl.GetGlobalInputTermState() == 1) + break; + pthread_mutex_unlock(&wrkrMut); + + ++me->numCalled; + processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr); + + pthread_mutex_lock(&wrkrMut); + me->pSrv = NULL; /* indicate we are free again */ + --wrkrRunning; + pthread_cond_signal(&wrkrIdle); + } + pthread_mutex_unlock(&wrkrMut); + + return NULL; +} + + +/* Process a workset, that is handle io. We become activated + * from either select or epoll handler. We split the workload + * out to a pool of threads, but try to avoid context switches + * as much as possible. + */ +static rsRetVal +processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[]) +{ + int i; + int origEntries = numEntries; + DEFiRet; + + dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); + + while(numEntries > 0) { + if(glbl.GetGlobalInputTermState() == 1) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + if(numEntries == 1) { + /* process self, save context switch */ + processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr); + } else { + pthread_mutex_lock(&wrkrMut); + /* check if there is a free worker */ + for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i) + /*do search*/; + if(i < wrkrMax) { + /* worker free -> use it! */ + wrkrInfo[i].pSrv = pThis; + wrkrInfo[i].pPoll = pPoll; + wrkrInfo[i].idx = workset[numEntries -1].id; + wrkrInfo[i].pUsr = workset[numEntries -1].pUsr; + /* Note: we must increment wrkrRunning HERE and not inside the worker's + * code. This is because a worker may actually never start, and thus + * increment wrkrRunning, before we finish and check the running worker + * count. We can only avoid this by incrementing it here. + */ + ++wrkrRunning; + pthread_cond_signal(&wrkrInfo[i].run); + pthread_mutex_unlock(&wrkrMut); + } else { + pthread_mutex_unlock(&wrkrMut); + /* no free worker, so we process this one ourselfs */ + processWorksetItem(pThis, pPoll, workset[numEntries-1].id, + workset[numEntries-1].pUsr); + } + } + --numEntries; + } + + if(origEntries > 1) { + /* we now need to wait until all workers finish. This is because the + * rest of this module can not handle the concurrency introduced + * by workers running during the epoll call. + */ + pthread_mutex_lock(&wrkrMut); + while(wrkrRunning > 0) { + pthread_cond_wait(&wrkrIdle, &wrkrMut); + } + pthread_mutex_unlock(&wrkrMut); + } + +finalize_it: + RETiRet; +} + + +/* This function is called to gather input. + * This variant here is only used if we need to work with a netstream driver + * that does not support epoll(). + */ +#pragma GCC diagnostic ignored "-Wempty-body" +static inline rsRetVal +RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset) +{ + DEFiRet; int nfds; int i; + int iWorkset; int iTCPSess; int bIsReady; - tcps_sess_t *pNewSess; nssel_t *pSel = NULL; - ssize_t iRcvd; + rsRetVal localRet; ISOBJ_TYPE_assert(pThis, tcpsrv); /* this is an endless loop - it is terminated by the framework canelling * this thread. Thus, we also need to instantiate a cancel cleanup handler - * to prevent us from leaking anything. -- rgerharsd, 20080-04-24 + * to prevent us from leaking anything. -- rgerhards, 20080-04-24 */ pthread_cleanup_push(RunCancelCleanup, (void*) &pSel); while(1) { @@ -508,12 +734,24 @@ Run(tcpsrv_t *pThis) /* wait for io to become ready */ CHKiRet(nssel.Wait(pSel, &nfds)); + if(glbl.GetGlobalInputTermState() == 1) + break; /* terminate input! */ + iWorkset = 0; for(i = 0 ; i < pThis->iLstnCurr ; ++i) { + if(glbl.GetGlobalInputTermState() == 1) + ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - dbgprintf("New connect on NSD %p.\n", pThis->ppLstn[i]); - SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); + workset[iWorkset].id = i; + workset[iWorkset].pUsr = (void*) pThis->ppLstn; /* this is a flag to indicate listen sock */ + ++iWorkset; + if(iWorkset >= (int) sizeWorkset) { + processWorkset(pThis, NULL, iWorkset, workset); + iWorkset = 0; + } + //DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); + //SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); --nfds; /* indicate we have processed one */ } } @@ -521,54 +759,26 @@ Run(tcpsrv_t *pThis) /* now check the sessions */ iTCPSess = TCPSessGetNxtSess(pThis, -1); while(nfds && iTCPSess != -1) { + if(glbl.GetGlobalInputTermState() == 1) + ABORT_FINALIZE(RS_RET_FORCE_TERM); localRet = nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds); if(bIsReady || localRet != RS_RET_OK) { - char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */ - dbgprintf("netstream %p with new data\n", pThis->pSessions[iTCPSess]->pStrm); - - /* Receive message */ - iRet = pThis->pRcvData(pThis->pSessions[iTCPSess], buf, sizeof(buf), &iRcvd); - switch(iRet) { - case RS_RET_CLOSED: - if(pThis->bEmitMsgOnClose) { - uchar *pszPeer; - int lenPeer; - errno = 0; - prop.GetString(pThis->pSessions[iTCPSess]->fromHostIP, &pszPeer, &lenPeer); - errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n", - pThis->pSessions[iTCPSess]->pStrm, pszPeer); - } - pThis->pOnRegularClose(pThis->pSessions[iTCPSess]); - tcps_sess.Destruct(&pThis->pSessions[iTCPSess]); - break; - case RS_RET_RETRY: - /* we simply ignore retry - this is not an error, but we also have not received anything */ - break; - case RS_RET_OK: - /* valid data received, process it! */ - localRet = tcps_sess.DataRcvd(pThis->pSessions[iTCPSess], buf, iRcvd); - if(localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL) { - /* in this case, something went awfully wrong. - * We are instructed to terminate the session. - */ - errmsg.LogError(0, localRet, "Tearing down TCP Session %d - see " - "previous messages for reason(s)", iTCPSess); - pThis->pOnErrClose(pThis->pSessions[iTCPSess]); - tcps_sess.Destruct(&pThis->pSessions[iTCPSess]); - } - break; - default: - errno = 0; - errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n", - pThis->pSessions[iTCPSess]->pStrm); - pThis->pOnErrClose(pThis->pSessions[iTCPSess]); - tcps_sess.Destruct(&pThis->pSessions[iTCPSess]); - break; + workset[iWorkset].id = iTCPSess; + workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess]; + ++iWorkset; + if(iWorkset >= (int) sizeWorkset) { + processWorkset(pThis, NULL, iWorkset, workset); + iWorkset = 0; } --nfds; /* indicate we have processed one */ } iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); } + + if(iWorkset > 0) + processWorkset(pThis, NULL, iWorkset, workset); + + /* we need to copy back close descriptors */ CHKiRet(nssel.Destruct(&pSel)); finalize_it: /* this is a very special case - this time only we do not exit the function, * because that would not help us either. So we simply retry it. Let's see @@ -582,19 +792,93 @@ finalize_it: /* this is a very special case - this time only we do not exit the } /* note that this point is usually not reached */ - pthread_cleanup_pop(0); /* remove cleanup handler */ + pthread_cleanup_pop(1); /* remove cleanup handler */ RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" +/* This function is called to gather input. It tries doing that via the epoll() + * interface. If the driver does not support that, it falls back to calling its + * select() equivalent. + * rgerhards, 2009-11-18 + */ +static rsRetVal +Run(tcpsrv_t *pThis) +{ + DEFiRet; + int i; + nsd_epworkset_t workset[128]; /* 128 is currently fixed num of concurrent requests */ + int numEntries; + nspoll_t *pPoll = NULL; + rsRetVal localRet; + + ISOBJ_TYPE_assert(pThis, tcpsrv); + + /* this is an endless loop - it is terminated by the framework canelling + * this thread. Thus, we also need to instantiate a cancel cleanup handler + * to prevent us from leaking anything. -- rgerhards, 20080-04-24 + */ + if((localRet = nspoll.Construct(&pPoll)) == RS_RET_OK) { + // TODO: set driver + localRet = nspoll.ConstructFinalize(pPoll); + } + if(localRet != RS_RET_OK) { + /* fall back to select */ + dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet); + iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t)); + FINALIZE; + } + + dbgprintf("tcpsrv uses epoll() interface, nsdpoll driver found\n"); + + /* flag that we are in epoll mode */ + pThis->bUsingEPoll = TRUE; + + /* Add the TCP listen sockets to the list of sockets to monitor */ + for(i = 0 ; i < pThis->iLstnCurr ; ++i) { + dbgprintf("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn); + CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD)); + dbgprintf("Added listener %d\n", i); + } + + while(1) { + numEntries = sizeof(workset)/sizeof(nsd_epworkset_t); + localRet = nspoll.Wait(pPoll, -1, &numEntries, workset); + if(glbl.GetGlobalInputTermState() == 1) + break; /* terminate input! */ + + /* check if we need to ignore the i/o ready state. We do this if we got an invalid + * return state. Validly, this can happen for RS_RET_EINTR, for other cases it may + * not be the right thing, but what is the right thing is really hard at this point... + */ + if(localRet != RS_RET_OK) + continue; + + processWorkset(pThis, pPoll, numEntries, workset); + } + + /* remove the tcp listen sockets from the epoll set */ + for(i = 0 ; i < pThis->iLstnCurr ; ++i) { + CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_DEL)); + } + +finalize_it: + if(pPoll != NULL) + nspoll.Destruct(&pPoll); + RETiRet; +} + + /* Standard-Constructor */ BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */ pThis->iSessMax = TCPSESS_MAX_DEFAULT; pThis->iLstnMax = TCPLSTN_MAX_DEFAULT; pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; + pThis->bDisableLFDelim = 0; pThis->OnMsgReceive = NULL; + pThis->bUseFlowControl = 1; ENDobjConstruct(tcpsrv) @@ -750,6 +1034,18 @@ SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, } +/* set enable/disable standard LF frame delimiter (use with care!) + * -- rgerhards, 2010-01-03 + */ +static rsRetVal +SetbDisableLFDelim(tcpsrv_t *pThis, int bVal) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, tcpsrv); + pThis->bDisableLFDelim = bVal; + RETiRet; +} + /* Set additional framing to use (if any) -- rgerhards, 2008-12-10 */ static rsRetVal @@ -858,6 +1154,18 @@ SetLstnMax(tcpsrv_t *pThis, int iMax) } +/* set if flow control shall be supported + */ +static rsRetVal +SetUseFlowControl(tcpsrv_t *pThis, int bUseFlowControl) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, tcpsrv); + pThis->bUseFlowControl = bUseFlowControl; + RETiRet; +} + + /* set max number of sessions * this must be called before ConstructFinalize, or it will have no effect! * rgerhards, 2009-04-09 @@ -891,7 +1199,6 @@ CODESTARTobjQueryInterface(tcpsrv) pIf->ConstructFinalize = tcpsrvConstructFinalize; pIf->Destruct = tcpsrvDestruct; - //pIf->SessAccept = SessAccept; pIf->configureTCPListen = configureTCPListen; pIf->create_tcp_socket = create_tcp_socket; pIf->Run = Run; @@ -899,7 +1206,9 @@ CODESTARTobjQueryInterface(tcpsrv) pIf->SetUsrP = SetUsrP; pIf->SetInputName = SetInputName; pIf->SetAddtlFrameDelim = SetAddtlFrameDelim; + pIf->SetbDisableLFDelim = SetbDisableLFDelim; pIf->SetSessMax = SetSessMax; + pIf->SetUseFlowControl = SetUseFlowControl; pIf->SetLstnMax = SetLstnMax; pIf->SetDrvrMode = SetDrvrMode; pIf->SetDrvrAuthMode = SetDrvrAuthMode; @@ -952,6 +1261,7 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME)); CHKiRet(objUse(netstrm, DONT_LOAD_LIB)); CHKiRet(objUse(nssel, DONT_LOAD_LIB)); + CHKiRet(objUse(nspoll, DONT_LOAD_LIB)); CHKiRet(objUse(tcps_sess, DONT_LOAD_LIB)); CHKiRet(objUse(conf, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); @@ -964,11 +1274,50 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE ENDObjClassInit(tcpsrv) -/* --------------- here now comes the plumbing that makes as a library module --------------- */ +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +startWorkerPool(void) +{ + int i; + wrkrRunning = 0; + pthread_mutex_init(&wrkrMut, NULL); + pthread_cond_init(&wrkrIdle, NULL); + for(i = 0 ; i < wrkrMax ; ++i) { + /* init worker info structure! */ + pthread_cond_init(&wrkrInfo[i].run, NULL); + wrkrInfo[i].pSrv = NULL; + wrkrInfo[i].numCalled = 0; + pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i])); + } + +} + +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +stopWorkerPool(void) +{ + int i; + for(i = 0 ; i < wrkrMax ; ++i) { + pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ + pthread_join(wrkrInfo[i].tid, NULL); + DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); + pthread_cond_destroy(&wrkrInfo[i].run); + } + pthread_cond_destroy(&wrkrIdle); + pthread_mutex_destroy(&wrkrMut); + +} +/* --------------- here now comes the plumbing that makes as a library module --------------- */ + BEGINmodExit CODESTARTmodExit +dbgprintf("tcpsrv: modExit\n"); + stopWorkerPool(); + /* de-init in reverse order! */ tcpsrvClassExit(); tcps_sessClassExit(); @@ -988,6 +1337,9 @@ CODESTARTmodInit /* Initialize all classes that are in our module - this includes ourselfs */ CHKiRet(tcps_sessClassInit(pModInfo)); CHKiRet(tcpsrvClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */ + + startWorkerPool(); + ENDmodInit /* vim:set ai: |