summaryrefslogtreecommitdiffstats
path: root/tcpsrv.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-11-23 15:33:52 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2009-11-23 15:33:52 +0100
commite0d77fa90cad334f308da9cbd4369d61f1c97511 (patch)
tree2cf61bc271b7a50f3b56f3da3d3e90c9ca40cb3d /tcpsrv.c
parent02e4a98bac7329f6ab4bb3503839aba7e87881e5 (diff)
downloadrsyslog-e0d77fa90cad334f308da9cbd4369d61f1c97511.tar.gz
rsyslog-e0d77fa90cad334f308da9cbd4369d61f1c97511.tar.xz
rsyslog-e0d77fa90cad334f308da9cbd4369d61f1c97511.zip
milestone commit: first working version with epoll/tcp
... but not well-tested, so there may be many hidden bugs.
Diffstat (limited to 'tcpsrv.c')
-rw-r--r--tcpsrv.c106
1 files changed, 37 insertions, 69 deletions
diff --git a/tcpsrv.c b/tcpsrv.c
index e6fdd087..9d51d83b 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -237,11 +237,13 @@ static void deinit_tcp_listener(tcpsrv_t *pThis)
if(pThis->pSessions != NULL) {
/* close all TCP connections! */
- i = TCPSessGetNxtSess(pThis, -1);
- while(i != -1) {
- tcps_sess.Destruct(&pThis->pSessions[i]);
- /* now get next... */
- i = TCPSessGetNxtSess(pThis, i);
+ if(!pThis->bUsingEPoll) {
+ i = TCPSessGetNxtSess(pThis, -1);
+ while(i != -1) {
+ tcps_sess.Destruct(&pThis->pSessions[i]);
+ /* now get next... */
+ i = TCPSessGetNxtSess(pThis, i);
+ }
}
/* we are done with the session table - so get rid of it... */
@@ -437,7 +439,8 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess,
}
*ppSess = pSess;
- pThis->pSessions[iSess] = pSess;
+ if(!pThis->bUsingEPoll)
+ pThis->pSessions[iSess] = pSess;
pSess = NULL; /* this is now also handed over */
finalize_it:
@@ -465,10 +468,12 @@ RunCancelCleanup(void *arg)
/* process a receive request on one of the streams
+ * If pPoll is non-NULL, we have a netstream in epoll mode, which means we need
+ * to remove any descriptor we close from the epoll set.
* rgerhards, 2009-07-020
*/
static rsRetVal
-doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess)
+doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
{
char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */
ssize_t iRcvd;
@@ -490,6 +495,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess)
errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n",
(*ppSess)->pStrm, pszPeer);
}
+ if(pPoll != NULL) {
+ CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL));
+ }
pThis->pOnRegularClose(*ppSess);
tcps_sess.Destruct(ppSess);
break;
@@ -516,6 +524,8 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess)
tcps_sess.Destruct(ppSess);
break;
}
+
+finalize_it:
RETiRet;
}
@@ -585,7 +595,7 @@ RunSelect(tcpsrv_t *pThis)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds));
if(bIsReady) {
- doReceive(pThis, &pThis->pSessions[iTCPSess]);
+ doReceive(pThis, &pThis->pSessions[iTCPSess], NULL);
--nfds; /* indicate we have processed one */
}
iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
@@ -613,17 +623,14 @@ finalize_it: /* this is a very special case - this time only we do not exit the
* select() equivalent.
* rgerhards, 2009-11-18
*/
-#pragma GCC diagnostic ignored "-Wempty-body"
static rsRetVal
Run(tcpsrv_t *pThis)
{
DEFiRet;
- int nfds;
int i;
- int iTCPSess;
- int bIsReady;
tcps_sess_t *pNewSess;
nspoll_t *pPoll = NULL;
+ void *pUsr;
rsRetVal localRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
@@ -632,8 +639,6 @@ Run(tcpsrv_t *pThis)
* this thread. Thus, we also need to instantiate a cancel cleanup handler
* to prevent us from leaking anything. -- rgerhards, 20080-04-24
*/
-#warning implement cancel cleanup handler!
- //pthread_cleanup_push(RunCancelCleanup, (void*) &pSel);
if((localRet = nspoll.Construct(&pPoll)) == RS_RET_OK) {
// TODO: set driver
localRet = nspoll.ConstructFinalize(pPoll);
@@ -647,15 +652,18 @@ Run(tcpsrv_t *pThis)
dbgprintf("we would use the poll handler, currently not implemented!\n");
+ /* flag that we are in epoll mode */
+ pThis->bUsingEPoll = TRUE;
+
/* Add the TCP listen sockets to the list of sockets to monitor */
for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
dbgprintf("Trying to add listener %d\n", i);
- CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, &pThis->ppLstn, NSDPOLL_IN));
+ CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD));
dbgprintf("Added listener %d\n", i);
}
while(1) {
- localRet = nspoll.Wait(pSel, &nfds);
+ localRet = nspoll.Wait(pPoll, -1, &i, &pUsr);
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
@@ -665,69 +673,30 @@ Run(tcpsrv_t *pThis)
*/
if(localRet != RS_RET_OK)
continue;
- }
-#if 0
- while(1) {
- CHKiRet(nssel.Construct(&pSel));
- CHKiRet(nssel.ConstructFinalize(pSel));
+ dbgprintf("poll returned with i %d, pUsr %p\n", i, pUsr);
- /* do the sessions */
- iTCPSess = TCPSessGetNxtSess(pThis, -1);
- while(iTCPSess != -1) {
- /* TODO: access to pNsd is NOT really CLEAN, use method... */
- CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD));
- /* now get next... */
- iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
- }
-
- /* wait for io to become ready */
- CHKiRet(nssel.Wait(pSel, &nfds));
- if(glbl.GetGlobalInputTermState() == 1)
- break; /* terminate input! */
-
- for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
- if(glbl.GetGlobalInputTermState() == 1)
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
- CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds));
- if(bIsReady) {
- DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
- SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
- --nfds; /* indicate we have processed one */
- }
+ if(pUsr == pThis->ppLstn) {
+ DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
+ SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
+ CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
+ DBGPRINTF("New session created with NSD %p.\n", pNewSess);
+ } else {
+ pNewSess = (tcps_sess_t*) pUsr;
+ doReceive(pThis, &pNewSess, pPoll);
}
+ }
- /* now check the sessions */
- iTCPSess = TCPSessGetNxtSess(pThis, -1);
- while(nfds && iTCPSess != -1) {
- if(glbl.GetGlobalInputTermState() == 1)
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
- CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds));
- if(bIsReady) {
- doReceive(pThis, &pThis->pSessions[iTCPSess]);
- --nfds; /* indicate we have processed one */
- }
- iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
- }
- CHKiRet(nssel.Destruct(&pSel));
-finalize_it: /* this is a very special case - this time only we do not exit the function,
- * because that would not help us either. So we simply retry it. Let's see
- * if that actually is a better idea. Exiting the loop wasn't we always
- * crashed, which made sense (the rest of the engine was not prepared for
- * that) -- rgerhards, 2008-05-19
- */
- /*EMPTY*/;
+ /* remove the tcp listen sockets from the epoll set */
+ for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
+ CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_DEL));
}
-#endif
- /* note that this point is usually not reached */
-// pthread_cleanup_pop(1); /* remove cleanup handler */
finalize_it:
if(pPoll != NULL)
nspoll.Destruct(&pPoll);
RETiRet;
}
-#pragma GCC diagnostic warning "-Wempty-body"
/* Standard-Constructor */
@@ -1032,7 +1001,6 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->ConstructFinalize = tcpsrvConstructFinalize;
pIf->Destruct = tcpsrvDestruct;
- //pIf->SessAccept = SessAccept;
pIf->configureTCPListen = configureTCPListen;
pIf->create_tcp_socket = create_tcp_socket;
pIf->Run = Run;