diff options
-rw-r--r-- | runtime/nsd.h | 2 | ||||
-rw-r--r-- | runtime/nsdpoll_ptcp.c | 60 | ||||
-rw-r--r-- | runtime/nspoll.c | 4 | ||||
-rw-r--r-- | runtime/nspoll.h | 2 | ||||
-rw-r--r-- | tcpsrv.c | 106 | ||||
-rw-r--r-- | tcpsrv.h | 1 |
6 files changed, 92 insertions, 83 deletions
diff --git a/runtime/nsd.h b/runtime/nsd.h index fc34ad6e..e5b9320b 100644 --- a/runtime/nsd.h +++ b/runtime/nsd.h @@ -91,7 +91,7 @@ ENDinterface(nsdsel) BEGINinterface(nsdpoll) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Construct)(nsdpoll_t **ppThis); rsRetVal (*Destruct)(nsdpoll_t **ppThis); - rsRetVal (*Ctl)(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int op); + rsRetVal (*Ctl)(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op); rsRetVal (*Wait)(nsdpoll_t *pNsdpoll, int timeout, int *idRdy, void **ppUsr); ENDinterface(nsdpoll) #define nsdpollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c index ed683acc..95fd8aa3 100644 --- a/runtime/nsdpoll_ptcp.c +++ b/runtime/nsdpoll_ptcp.c @@ -78,14 +78,44 @@ finalize_it: } -/* remove the entry identified by id/pUsr from the list. - * rgerhards, 2009-11-18 +/* find and unlink the entry identified by id/pUsr from the list. + * rgerhards, 2009-11-23 + */ +static inline rsRetVal +unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t **ppEvtLst) { + nsdpoll_epollevt_lst_t *pEvtLst; + nsdpoll_epollevt_lst_t *pPrev = NULL; + DEFiRet; + + pEvtLst = pThis->pRoot; + while(pEvtLst != NULL && pEvtLst->id != id && pEvtLst->pUsr != pUsr) { + pPrev = pEvtLst; + pEvtLst = pEvtLst->pNext; + } + if(pEvtLst == NULL) + ABORT_FINALIZE(RS_RET_NOT_FOUND); + + *ppEvtLst = pEvtLst; + + /* unlink */ + if(pPrev == NULL) + pThis->pRoot = pEvtLst->pNext; + else + pPrev->pNext = pEvtLst->pNext; + +finalize_it: + RETiRet; +} + + +/* destruct the provided element. It must already be unlinked from the list. + * rgerhards, 2009-11-23 */ static inline rsRetVal -delEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr) { +delEvent(nsdpoll_epollevt_lst_t **ppEvtLst) { DEFiRet; - // TODO: XXX add code! -#warning delEvent implementation is missing! + free(*ppEvtLst); + *ppEvtLst = NULL; RETiRet; } @@ -119,7 +149,7 @@ ENDobjDestruct(nsdpoll_ptcp) /* Modify socket set */ static rsRetVal -Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode) { +Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op) { nsdpoll_ptcp_t *pThis = (nsdpoll_ptcp_t*) pNsdpoll; nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd; nsdpoll_epollevt_lst_t *pEventLst; @@ -127,7 +157,7 @@ Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode) { char errStr[512]; DEFiRet; - if(mode == NSDPOLL_ADD) { + if(op == NSDPOLL_ADD) { dbgprintf("adding nsdpoll entry %d/%p\n", id, pUsr); CHKiRet(addEvent(pThis, id, pUsr, mode, pSock, &pEventLst)); if(epoll_ctl(pThis->efd, EPOLL_CTL_ADD, pSock->sock, &pEventLst->event) < 0) { @@ -137,11 +167,21 @@ Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode) { "epoll_ctl failed on fd %d, id %d/%p, op %d with %s\n", pSock->sock, id, pUsr, mode, errStr); } - } else if(mode == NSDPOLL_DEL) { - // TODO: XXX : code missing! + } else if(op == NSDPOLL_DEL) { + // TODO: XXX : code missing! del Event dbgprintf("removing nsdpoll entry %d/%p\n", id, pUsr); + CHKiRet(unlinkEvent(pThis, id, pUsr, &pEventLst)); + if(epoll_ctl(pThis->efd, EPOLL_CTL_DEL, pSock->sock, &pEventLst->event) < 0) { + errSave = errno; + rs_strerror_r(errSave, errStr, sizeof(errStr)); + errmsg.LogError(errSave, RS_RET_ERR_EPOLL_CTL, + "epoll_ctl failed on fd %d, id %d/%p, op %d with %s\n", + pSock->sock, id, pUsr, mode, errStr); + ABORT_FINALIZE(RS_RET_ERR_EPOLL_CTL); + } + CHKiRet(delEvent(&pEventLst)); } else { - dbgprintf("program error: invalid NSDPOLL_mode %d - ignoring request\n", mode); + dbgprintf("program error: invalid NSDPOLL_mode %d - ignoring request\n", op); ABORT_FINALIZE(RS_RET_ERR); } diff --git a/runtime/nspoll.c b/runtime/nspoll.c index b9a189cb..f287cd4e 100644 --- a/runtime/nspoll.c +++ b/runtime/nspoll.c @@ -145,10 +145,10 @@ Wait(nspoll_t *pThis, int timeout, int *idRdy, void **ppUsr) { * rgerhards, 2009-11-18 */ static rsRetVal -Ctl(nspoll_t *pThis, netstrm_t *pStrm, int id, void *pUsr, int op) { +Ctl(nspoll_t *pThis, netstrm_t *pStrm, int id, void *pUsr, int mode, int op) { DEFiRet; ISOBJ_TYPE_assert(pThis, nspoll); - iRet = pThis->Drvr.Ctl(pThis->pDrvrData, pStrm->pDrvrData, id, pUsr, op); + iRet = pThis->Drvr.Ctl(pThis->pDrvrData, pStrm->pDrvrData, id, pUsr, mode, op); RETiRet; } diff --git a/runtime/nspoll.h b/runtime/nspoll.h index 281b8103..a77759c0 100644 --- a/runtime/nspoll.h +++ b/runtime/nspoll.h @@ -51,7 +51,7 @@ BEGINinterface(nspoll) /* name must also be changed in ENDinterface macro! */ rsRetVal (*ConstructFinalize)(nspoll_t *pThis); rsRetVal (*Destruct)(nspoll_t **ppThis); rsRetVal (*Wait)(nspoll_t *pNsdpoll, int timeout, int *idRdy, void **ppUsr); - rsRetVal (*Ctl)(nspoll_t *pNsdpoll, netstrm_t *pStrm, int id, void *pUsr, int op); + rsRetVal (*Ctl)(nspoll_t *pNsdpoll, netstrm_t *pStrm, int id, void *pUsr, int mode, int op); rsRetVal (*IsEPollSupported)(void); /* static method */ ENDinterface(nspoll) #define nspollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ @@ -237,11 +237,13 @@ static void deinit_tcp_listener(tcpsrv_t *pThis) if(pThis->pSessions != NULL) { /* close all TCP connections! */ - i = TCPSessGetNxtSess(pThis, -1); - while(i != -1) { - tcps_sess.Destruct(&pThis->pSessions[i]); - /* now get next... */ - i = TCPSessGetNxtSess(pThis, i); + if(!pThis->bUsingEPoll) { + i = TCPSessGetNxtSess(pThis, -1); + while(i != -1) { + tcps_sess.Destruct(&pThis->pSessions[i]); + /* now get next... */ + i = TCPSessGetNxtSess(pThis, i); + } } /* we are done with the session table - so get rid of it... */ @@ -437,7 +439,8 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess, } *ppSess = pSess; - pThis->pSessions[iSess] = pSess; + if(!pThis->bUsingEPoll) + pThis->pSessions[iSess] = pSess; pSess = NULL; /* this is now also handed over */ finalize_it: @@ -465,10 +468,12 @@ RunCancelCleanup(void *arg) /* process a receive request on one of the streams + * If pPoll is non-NULL, we have a netstream in epoll mode, which means we need + * to remove any descriptor we close from the epoll set. * rgerhards, 2009-07-020 */ static rsRetVal -doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) +doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) { char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */ ssize_t iRcvd; @@ -490,6 +495,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n", (*ppSess)->pStrm, pszPeer); } + if(pPoll != NULL) { + CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL)); + } pThis->pOnRegularClose(*ppSess); tcps_sess.Destruct(ppSess); break; @@ -516,6 +524,8 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) tcps_sess.Destruct(ppSess); break; } + +finalize_it: RETiRet; } @@ -585,7 +595,7 @@ RunSelect(tcpsrv_t *pThis) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - doReceive(pThis, &pThis->pSessions[iTCPSess]); + doReceive(pThis, &pThis->pSessions[iTCPSess], NULL); --nfds; /* indicate we have processed one */ } iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); @@ -613,17 +623,14 @@ finalize_it: /* this is a very special case - this time only we do not exit the * select() equivalent. * rgerhards, 2009-11-18 */ -#pragma GCC diagnostic ignored "-Wempty-body" static rsRetVal Run(tcpsrv_t *pThis) { DEFiRet; - int nfds; int i; - int iTCPSess; - int bIsReady; tcps_sess_t *pNewSess; nspoll_t *pPoll = NULL; + void *pUsr; rsRetVal localRet; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -632,8 +639,6 @@ Run(tcpsrv_t *pThis) * this thread. Thus, we also need to instantiate a cancel cleanup handler * to prevent us from leaking anything. -- rgerhards, 20080-04-24 */ -#warning implement cancel cleanup handler! - //pthread_cleanup_push(RunCancelCleanup, (void*) &pSel); if((localRet = nspoll.Construct(&pPoll)) == RS_RET_OK) { // TODO: set driver localRet = nspoll.ConstructFinalize(pPoll); @@ -647,15 +652,18 @@ Run(tcpsrv_t *pThis) dbgprintf("we would use the poll handler, currently not implemented!\n"); + /* flag that we are in epoll mode */ + pThis->bUsingEPoll = TRUE; + /* Add the TCP listen sockets to the list of sockets to monitor */ for(i = 0 ; i < pThis->iLstnCurr ; ++i) { dbgprintf("Trying to add listener %d\n", i); - CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, &pThis->ppLstn, NSDPOLL_IN)); + CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD)); dbgprintf("Added listener %d\n", i); } while(1) { - localRet = nspoll.Wait(pSel, &nfds); + localRet = nspoll.Wait(pPoll, -1, &i, &pUsr); if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ @@ -665,69 +673,30 @@ Run(tcpsrv_t *pThis) */ if(localRet != RS_RET_OK) continue; - } -#if 0 - while(1) { - CHKiRet(nssel.Construct(&pSel)); - CHKiRet(nssel.ConstructFinalize(pSel)); + dbgprintf("poll returned with i %d, pUsr %p\n", i, pUsr); - /* do the sessions */ - iTCPSess = TCPSessGetNxtSess(pThis, -1); - while(iTCPSess != -1) { - /* TODO: access to pNsd is NOT really CLEAN, use method... */ - CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD)); - /* now get next... */ - iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); - } - - /* wait for io to become ready */ - CHKiRet(nssel.Wait(pSel, &nfds)); - if(glbl.GetGlobalInputTermState() == 1) - break; /* terminate input! */ - - for(i = 0 ; i < pThis->iLstnCurr ; ++i) { - if(glbl.GetGlobalInputTermState() == 1) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds)); - if(bIsReady) { - DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); - SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); - --nfds; /* indicate we have processed one */ - } + if(pUsr == pThis->ppLstn) { + DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); + SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); + CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + DBGPRINTF("New session created with NSD %p.\n", pNewSess); + } else { + pNewSess = (tcps_sess_t*) pUsr; + doReceive(pThis, &pNewSess, pPoll); } + } - /* now check the sessions */ - iTCPSess = TCPSessGetNxtSess(pThis, -1); - while(nfds && iTCPSess != -1) { - if(glbl.GetGlobalInputTermState() == 1) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); - if(bIsReady) { - doReceive(pThis, &pThis->pSessions[iTCPSess]); - --nfds; /* indicate we have processed one */ - } - iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); - } - CHKiRet(nssel.Destruct(&pSel)); -finalize_it: /* this is a very special case - this time only we do not exit the function, - * because that would not help us either. So we simply retry it. Let's see - * if that actually is a better idea. Exiting the loop wasn't we always - * crashed, which made sense (the rest of the engine was not prepared for - * that) -- rgerhards, 2008-05-19 - */ - /*EMPTY*/; + /* remove the tcp listen sockets from the epoll set */ + for(i = 0 ; i < pThis->iLstnCurr ; ++i) { + CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_DEL)); } -#endif - /* note that this point is usually not reached */ -// pthread_cleanup_pop(1); /* remove cleanup handler */ finalize_it: if(pPoll != NULL) nspoll.Destruct(&pPoll); RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" /* Standard-Constructor */ @@ -1032,7 +1001,6 @@ CODESTARTobjQueryInterface(tcpsrv) pIf->ConstructFinalize = tcpsrvConstructFinalize; pIf->Destruct = tcpsrvDestruct; - //pIf->SessAccept = SessAccept; pIf->configureTCPListen = configureTCPListen; pIf->create_tcp_socket = create_tcp_socket; pIf->Run = Run; @@ -55,6 +55,7 @@ struct tcpsrv_s { ruleset_t *pRuleset; /**< ruleset to bind to */ permittedPeers_t *pPermPeers;/**< driver's permitted peers */ bool bEmitMsgOnClose; /**< emit an informational message when the remote peer closes connection */ + bool bUsingEPoll; /**< are we in epoll mode (means we do not need to keep track of sessions!) */ int iLstnCurr; /**< max nbr of listeners currently supported */ netstrm_t **ppLstn; /**< our netstream listners */ tcpLstnPortList_t **ppLstnPort; /**< pointer to relevant listen port description */ |