diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-11-23 15:33:52 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-11-23 15:33:52 +0100 |
commit | e0d77fa90cad334f308da9cbd4369d61f1c97511 (patch) | |
tree | 2cf61bc271b7a50f3b56f3da3d3e90c9ca40cb3d /tcpsrv.c | |
parent | 02e4a98bac7329f6ab4bb3503839aba7e87881e5 (diff) | |
download | rsyslog-e0d77fa90cad334f308da9cbd4369d61f1c97511.tar.gz rsyslog-e0d77fa90cad334f308da9cbd4369d61f1c97511.tar.xz rsyslog-e0d77fa90cad334f308da9cbd4369d61f1c97511.zip |
milestone commit: first working version with epoll/tcp
... but not well-tested, so there may be many hidden bugs.
Diffstat (limited to 'tcpsrv.c')
-rw-r--r-- | tcpsrv.c | 106 |
1 files changed, 37 insertions, 69 deletions
@@ -237,11 +237,13 @@ static void deinit_tcp_listener(tcpsrv_t *pThis) if(pThis->pSessions != NULL) { /* close all TCP connections! */ - i = TCPSessGetNxtSess(pThis, -1); - while(i != -1) { - tcps_sess.Destruct(&pThis->pSessions[i]); - /* now get next... */ - i = TCPSessGetNxtSess(pThis, i); + if(!pThis->bUsingEPoll) { + i = TCPSessGetNxtSess(pThis, -1); + while(i != -1) { + tcps_sess.Destruct(&pThis->pSessions[i]); + /* now get next... */ + i = TCPSessGetNxtSess(pThis, i); + } } /* we are done with the session table - so get rid of it... */ @@ -437,7 +439,8 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess, } *ppSess = pSess; - pThis->pSessions[iSess] = pSess; + if(!pThis->bUsingEPoll) + pThis->pSessions[iSess] = pSess; pSess = NULL; /* this is now also handed over */ finalize_it: @@ -465,10 +468,12 @@ RunCancelCleanup(void *arg) /* process a receive request on one of the streams + * If pPoll is non-NULL, we have a netstream in epoll mode, which means we need + * to remove any descriptor we close from the epoll set. * rgerhards, 2009-07-020 */ static rsRetVal -doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) +doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) { char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */ ssize_t iRcvd; @@ -490,6 +495,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n", (*ppSess)->pStrm, pszPeer); } + if(pPoll != NULL) { + CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL)); + } pThis->pOnRegularClose(*ppSess); tcps_sess.Destruct(ppSess); break; @@ -516,6 +524,8 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess) tcps_sess.Destruct(ppSess); break; } + +finalize_it: RETiRet; } @@ -585,7 +595,7 @@ RunSelect(tcpsrv_t *pThis) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - doReceive(pThis, &pThis->pSessions[iTCPSess]); + doReceive(pThis, &pThis->pSessions[iTCPSess], NULL); --nfds; /* indicate we have processed one */ } iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); @@ -613,17 +623,14 @@ finalize_it: /* this is a very special case - this time only we do not exit the * select() equivalent. * rgerhards, 2009-11-18 */ -#pragma GCC diagnostic ignored "-Wempty-body" static rsRetVal Run(tcpsrv_t *pThis) { DEFiRet; - int nfds; int i; - int iTCPSess; - int bIsReady; tcps_sess_t *pNewSess; nspoll_t *pPoll = NULL; + void *pUsr; rsRetVal localRet; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -632,8 +639,6 @@ Run(tcpsrv_t *pThis) * this thread. Thus, we also need to instantiate a cancel cleanup handler * to prevent us from leaking anything. -- rgerhards, 20080-04-24 */ -#warning implement cancel cleanup handler! - //pthread_cleanup_push(RunCancelCleanup, (void*) &pSel); if((localRet = nspoll.Construct(&pPoll)) == RS_RET_OK) { // TODO: set driver localRet = nspoll.ConstructFinalize(pPoll); @@ -647,15 +652,18 @@ Run(tcpsrv_t *pThis) dbgprintf("we would use the poll handler, currently not implemented!\n"); + /* flag that we are in epoll mode */ + pThis->bUsingEPoll = TRUE; + /* Add the TCP listen sockets to the list of sockets to monitor */ for(i = 0 ; i < pThis->iLstnCurr ; ++i) { dbgprintf("Trying to add listener %d\n", i); - CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, &pThis->ppLstn, NSDPOLL_IN)); + CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD)); dbgprintf("Added listener %d\n", i); } while(1) { - localRet = nspoll.Wait(pSel, &nfds); + localRet = nspoll.Wait(pPoll, -1, &i, &pUsr); if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ @@ -665,69 +673,30 @@ Run(tcpsrv_t *pThis) */ if(localRet != RS_RET_OK) continue; - } -#if 0 - while(1) { - CHKiRet(nssel.Construct(&pSel)); - CHKiRet(nssel.ConstructFinalize(pSel)); + dbgprintf("poll returned with i %d, pUsr %p\n", i, pUsr); - /* do the sessions */ - iTCPSess = TCPSessGetNxtSess(pThis, -1); - while(iTCPSess != -1) { - /* TODO: access to pNsd is NOT really CLEAN, use method... */ - CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD)); - /* now get next... */ - iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); - } - - /* wait for io to become ready */ - CHKiRet(nssel.Wait(pSel, &nfds)); - if(glbl.GetGlobalInputTermState() == 1) - break; /* terminate input! */ - - for(i = 0 ; i < pThis->iLstnCurr ; ++i) { - if(glbl.GetGlobalInputTermState() == 1) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds)); - if(bIsReady) { - DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); - SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); - --nfds; /* indicate we have processed one */ - } + if(pUsr == pThis->ppLstn) { + DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); + SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); + CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + DBGPRINTF("New session created with NSD %p.\n", pNewSess); + } else { + pNewSess = (tcps_sess_t*) pUsr; + doReceive(pThis, &pNewSess, pPoll); } + } - /* now check the sessions */ - iTCPSess = TCPSessGetNxtSess(pThis, -1); - while(nfds && iTCPSess != -1) { - if(glbl.GetGlobalInputTermState() == 1) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); - if(bIsReady) { - doReceive(pThis, &pThis->pSessions[iTCPSess]); - --nfds; /* indicate we have processed one */ - } - iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); - } - CHKiRet(nssel.Destruct(&pSel)); -finalize_it: /* this is a very special case - this time only we do not exit the function, - * because that would not help us either. So we simply retry it. Let's see - * if that actually is a better idea. Exiting the loop wasn't we always - * crashed, which made sense (the rest of the engine was not prepared for - * that) -- rgerhards, 2008-05-19 - */ - /*EMPTY*/; + /* remove the tcp listen sockets from the epoll set */ + for(i = 0 ; i < pThis->iLstnCurr ; ++i) { + CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_DEL)); } -#endif - /* note that this point is usually not reached */ -// pthread_cleanup_pop(1); /* remove cleanup handler */ finalize_it: if(pPoll != NULL) nspoll.Destruct(&pPoll); RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" /* Standard-Constructor */ @@ -1032,7 +1001,6 @@ CODESTARTobjQueryInterface(tcpsrv) pIf->ConstructFinalize = tcpsrvConstructFinalize; pIf->Destruct = tcpsrvDestruct; - //pIf->SessAccept = SessAccept; pIf->configureTCPListen = configureTCPListen; pIf->create_tcp_socket = create_tcp_socket; pIf->Run = Run; |