diff options
Diffstat (limited to 'tcpsrv.c')
-rw-r--r-- | tcpsrv.c | 148 |
1 files changed, 125 insertions, 23 deletions
@@ -15,9 +15,6 @@ * callbacks before the code is run. The tcpsrv then calls back * into the specific input modules at the appropriate time. * - * NOTE: read comments in module-template.h to understand how this file - * works! - * * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) * * Copyright 2007, 2008, 2009 Rainer Gerhards and Adiscon GmbH. @@ -68,6 +65,7 @@ #include "netstrms.h" #include "netstrm.h" #include "nssel.h" +#include "nspoll.h" #include "errmsg.h" #include "ruleset.h" #include "unicode-helper.h" @@ -89,6 +87,7 @@ DEFobjCurrIf(net) DEFobjCurrIf(netstrms) DEFobjCurrIf(netstrm) DEFobjCurrIf(nssel) +DEFobjCurrIf(nspoll) DEFobjCurrIf(prop) @@ -238,11 +237,13 @@ static void deinit_tcp_listener(tcpsrv_t *pThis) if(pThis->pSessions != NULL) { /* close all TCP connections! */ - i = TCPSessGetNxtSess(pThis, -1); - while(i != -1) { - tcps_sess.Destruct(&pThis->pSessions[i]); - /* now get next... */ - i = TCPSessGetNxtSess(pThis, i); + if(!pThis->bUsingEPoll) { + i = TCPSessGetNxtSess(pThis, -1); + while(i != -1) { + tcps_sess.Destruct(&pThis->pSessions[i]); + /* now get next... */ + i = TCPSessGetNxtSess(pThis, i); + } } /* we are done with the session table - so get rid of it... */ @@ -438,7 +439,8 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess, } *ppSess = pSess; - pThis->pSessions[iSess] = pSess; + if(!pThis->bUsingEPoll) + pThis->pSessions[iSess] = pSess; pSess = NULL; /* this is now also handed over */ finalize_it: @@ -465,11 +467,29 @@ RunCancelCleanup(void *arg) } +/* helper to close a session. Takes status of poll vs. select into consideration. + * rgerhards, 2009-11-25 + */ +static inline rsRetVal +closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) { + DEFiRet; + if(pPoll != NULL) { + CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL)); + } + pThis->pOnRegularClose(*ppSess); + tcps_sess.Destruct(ppSess); +finalize_it: + RETiRet; +} + + /* process a receive request on one of the streams + * If pPoll is non-NULL, we have a netstream in epoll mode, which means we need + * to remove any descriptor we close from the epoll set. * rgerhards, 2009-07-020 */ static rsRetVal -doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) +doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) { char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */ ssize_t iRcvd; @@ -478,7 +498,6 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) ISOBJ_TYPE_assert(pThis, tcpsrv); DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm); - /* Receive message */ iRet = pThis->pRcvData(*ppSess, buf, sizeof(buf), &iRcvd); switch(iRet) { @@ -491,8 +510,7 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n", (*ppSess)->pStrm, pszPeer); } - pThis->pOnRegularClose(*ppSess); - tcps_sess.Destruct(ppSess); + CHKiRet(closeSess(pThis, ppSess, pPoll)); break; case RS_RET_RETRY: /* we simply ignore retry - this is not an error, but we also have not received anything */ @@ -505,26 +523,29 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) */ errmsg.LogError(0, localRet, "Tearing down TCP Session - see " "previous messages for reason(s)\n"); - pThis->pOnErrClose(*ppSess); - tcps_sess.Destruct(ppSess); + CHKiRet(closeSess(pThis, ppSess, pPoll)); } break; default: errno = 0; errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n", (*ppSess)->pStrm); - pThis->pOnErrClose(*ppSess); - tcps_sess.Destruct(ppSess); + CHKiRet(closeSess(pThis, ppSess, pPoll)); break; } + +finalize_it: RETiRet; } -/* This function is called to gather input. */ +/* This function is called to gather input. + * This variant here is only used if we need to work with a netstream driver + * that does not support epoll(). + */ #pragma GCC diagnostic ignored "-Wempty-body" -static rsRetVal -Run(tcpsrv_t *pThis) +static inline rsRetVal +RunSelect(tcpsrv_t *pThis) { DEFiRet; int nfds; @@ -532,7 +553,7 @@ Run(tcpsrv_t *pThis) int iTCPSess; int bIsReady; tcps_sess_t *pNewSess; - nssel_t *pSel; + nssel_t *pSel = NULL; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -583,7 +604,7 @@ Run(tcpsrv_t *pThis) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - doReceive(pThis, &pThis->pSessions[iTCPSess]); + doReceive(pThis, &pThis->pSessions[iTCPSess], NULL); --nfds; /* indicate we have processed one */ } iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); @@ -606,6 +627,87 @@ finalize_it: /* this is a very special case - this time only we do not exit the #pragma GCC diagnostic warning "-Wempty-body" +/* This function is called to gather input. It tries doing that via the epoll() + * interface. If the driver does not support that, it falls back to calling its + * select() equivalent. + * rgerhards, 2009-11-18 + */ +static rsRetVal +Run(tcpsrv_t *pThis) +{ + DEFiRet; + int i; + tcps_sess_t *pNewSess; + nspoll_t *pPoll = NULL; + void *pUsr; + rsRetVal localRet; + + ISOBJ_TYPE_assert(pThis, tcpsrv); + + /* this is an endless loop - it is terminated by the framework canelling + * this thread. Thus, we also need to instantiate a cancel cleanup handler + * to prevent us from leaking anything. -- rgerhards, 20080-04-24 + */ + if((localRet = nspoll.Construct(&pPoll)) == RS_RET_OK) { + // TODO: set driver + localRet = nspoll.ConstructFinalize(pPoll); + } + if(localRet != RS_RET_OK) { + /* fall back to select */ + dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet); + iRet = RunSelect(pThis); + FINALIZE; + } + + dbgprintf("tcpsrv uses epoll() interface, nsdpol driver found\n"); + + /* flag that we are in epoll mode */ + pThis->bUsingEPoll = TRUE; + + /* Add the TCP listen sockets to the list of sockets to monitor */ + for(i = 0 ; i < pThis->iLstnCurr ; ++i) { + dbgprintf("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn); + CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD)); + dbgprintf("Added listener %d\n", i); + } + + while(1) { + localRet = nspoll.Wait(pPoll, -1, &i, &pUsr); + if(glbl.GetGlobalInputTermState() == 1) + break; /* terminate input! */ + + /* check if we need to ignore the i/o ready state. We do this if we got an invalid + * return state. Validly, this can happen for RS_RET_EINTR, for other cases it may + * not be the right thing, but what is the right thing is really hard at this point... + */ + if(localRet != RS_RET_OK) + continue; + + dbgprintf("poll returned with i %d, pUsr %p\n", i, pUsr); + + if(pUsr == pThis->ppLstn) { + DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); + SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); + CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + DBGPRINTF("New session created with NSD %p.\n", pNewSess); + } else { + pNewSess = (tcps_sess_t*) pUsr; + doReceive(pThis, &pNewSess, pPoll); + } + } + + /* remove the tcp listen sockets from the epoll set */ + for(i = 0 ; i < pThis->iLstnCurr ; ++i) { + CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_DEL)); + } + +finalize_it: + if(pPoll != NULL) + nspoll.Destruct(&pPoll); + RETiRet; +} + + /* Standard-Constructor */ BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */ pThis->iSessMax = TCPSESS_MAX_DEFAULT; @@ -908,7 +1010,6 @@ CODESTARTobjQueryInterface(tcpsrv) pIf->ConstructFinalize = tcpsrvConstructFinalize; pIf->Destruct = tcpsrvDestruct; - //pIf->SessAccept = SessAccept; pIf->configureTCPListen = configureTCPListen; pIf->create_tcp_socket = create_tcp_socket; pIf->Run = Run; @@ -969,6 +1070,7 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME)); CHKiRet(objUse(netstrm, DONT_LOAD_LIB)); CHKiRet(objUse(nssel, DONT_LOAD_LIB)); + CHKiRet(objUse(nspoll, DONT_LOAD_LIB)); CHKiRet(objUse(tcps_sess, DONT_LOAD_LIB)); CHKiRet(objUse(conf, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); |