summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-11-23 15:33:52 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2009-11-23 15:33:52 +0100
commite0d77fa90cad334f308da9cbd4369d61f1c97511 (patch)
tree2cf61bc271b7a50f3b56f3da3d3e90c9ca40cb3d
parent02e4a98bac7329f6ab4bb3503839aba7e87881e5 (diff)
downloadrsyslog-e0d77fa90cad334f308da9cbd4369d61f1c97511.tar.gz
rsyslog-e0d77fa90cad334f308da9cbd4369d61f1c97511.tar.xz
rsyslog-e0d77fa90cad334f308da9cbd4369d61f1c97511.zip
milestone commit: first working version with epoll/tcp
... but not well-tested, so there may be many hidden bugs.
-rw-r--r--runtime/nsd.h2
-rw-r--r--runtime/nsdpoll_ptcp.c60
-rw-r--r--runtime/nspoll.c4
-rw-r--r--runtime/nspoll.h2
-rw-r--r--tcpsrv.c106
-rw-r--r--tcpsrv.h1
6 files changed, 92 insertions, 83 deletions
diff --git a/runtime/nsd.h b/runtime/nsd.h
index fc34ad6e..e5b9320b 100644
--- a/runtime/nsd.h
+++ b/runtime/nsd.h
@@ -91,7 +91,7 @@ ENDinterface(nsdsel)
BEGINinterface(nsdpoll) /* name must also be changed in ENDinterface macro! */
rsRetVal (*Construct)(nsdpoll_t **ppThis);
rsRetVal (*Destruct)(nsdpoll_t **ppThis);
- rsRetVal (*Ctl)(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int op);
+ rsRetVal (*Ctl)(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op);
rsRetVal (*Wait)(nsdpoll_t *pNsdpoll, int timeout, int *idRdy, void **ppUsr);
ENDinterface(nsdpoll)
#define nsdpollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c
index ed683acc..95fd8aa3 100644
--- a/runtime/nsdpoll_ptcp.c
+++ b/runtime/nsdpoll_ptcp.c
@@ -78,14 +78,44 @@ finalize_it:
}
-/* remove the entry identified by id/pUsr from the list.
- * rgerhards, 2009-11-18
+/* find and unlink the entry identified by id/pUsr from the list.
+ * rgerhards, 2009-11-23
+ */
+static inline rsRetVal
+unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t **ppEvtLst) {
+ nsdpoll_epollevt_lst_t *pEvtLst;
+ nsdpoll_epollevt_lst_t *pPrev = NULL;
+ DEFiRet;
+
+ pEvtLst = pThis->pRoot;
+ while(pEvtLst != NULL && pEvtLst->id != id && pEvtLst->pUsr != pUsr) {
+ pPrev = pEvtLst;
+ pEvtLst = pEvtLst->pNext;
+ }
+ if(pEvtLst == NULL)
+ ABORT_FINALIZE(RS_RET_NOT_FOUND);
+
+ *ppEvtLst = pEvtLst;
+
+ /* unlink */
+ if(pPrev == NULL)
+ pThis->pRoot = pEvtLst->pNext;
+ else
+ pPrev->pNext = pEvtLst->pNext;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* destruct the provided element. It must already be unlinked from the list.
+ * rgerhards, 2009-11-23
*/
static inline rsRetVal
-delEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr) {
+delEvent(nsdpoll_epollevt_lst_t **ppEvtLst) {
DEFiRet;
- // TODO: XXX add code!
-#warning delEvent implementation is missing!
+ free(*ppEvtLst);
+ *ppEvtLst = NULL;
RETiRet;
}
@@ -119,7 +149,7 @@ ENDobjDestruct(nsdpoll_ptcp)
/* Modify socket set */
static rsRetVal
-Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode) {
+Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op) {
nsdpoll_ptcp_t *pThis = (nsdpoll_ptcp_t*) pNsdpoll;
nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd;
nsdpoll_epollevt_lst_t *pEventLst;
@@ -127,7 +157,7 @@ Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode) {
char errStr[512];
DEFiRet;
- if(mode == NSDPOLL_ADD) {
+ if(op == NSDPOLL_ADD) {
dbgprintf("adding nsdpoll entry %d/%p\n", id, pUsr);
CHKiRet(addEvent(pThis, id, pUsr, mode, pSock, &pEventLst));
if(epoll_ctl(pThis->efd, EPOLL_CTL_ADD, pSock->sock, &pEventLst->event) < 0) {
@@ -137,11 +167,21 @@ Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode) {
"epoll_ctl failed on fd %d, id %d/%p, op %d with %s\n",
pSock->sock, id, pUsr, mode, errStr);
}
- } else if(mode == NSDPOLL_DEL) {
- // TODO: XXX : code missing!
+ } else if(op == NSDPOLL_DEL) {
+ // TODO: XXX : code missing! del Event
dbgprintf("removing nsdpoll entry %d/%p\n", id, pUsr);
+ CHKiRet(unlinkEvent(pThis, id, pUsr, &pEventLst));
+ if(epoll_ctl(pThis->efd, EPOLL_CTL_DEL, pSock->sock, &pEventLst->event) < 0) {
+ errSave = errno;
+ rs_strerror_r(errSave, errStr, sizeof(errStr));
+ errmsg.LogError(errSave, RS_RET_ERR_EPOLL_CTL,
+ "epoll_ctl failed on fd %d, id %d/%p, op %d with %s\n",
+ pSock->sock, id, pUsr, mode, errStr);
+ ABORT_FINALIZE(RS_RET_ERR_EPOLL_CTL);
+ }
+ CHKiRet(delEvent(&pEventLst));
} else {
- dbgprintf("program error: invalid NSDPOLL_mode %d - ignoring request\n", mode);
+ dbgprintf("program error: invalid NSDPOLL_mode %d - ignoring request\n", op);
ABORT_FINALIZE(RS_RET_ERR);
}
diff --git a/runtime/nspoll.c b/runtime/nspoll.c
index b9a189cb..f287cd4e 100644
--- a/runtime/nspoll.c
+++ b/runtime/nspoll.c
@@ -145,10 +145,10 @@ Wait(nspoll_t *pThis, int timeout, int *idRdy, void **ppUsr) {
* rgerhards, 2009-11-18
*/
static rsRetVal
-Ctl(nspoll_t *pThis, netstrm_t *pStrm, int id, void *pUsr, int op) {
+Ctl(nspoll_t *pThis, netstrm_t *pStrm, int id, void *pUsr, int mode, int op) {
DEFiRet;
ISOBJ_TYPE_assert(pThis, nspoll);
- iRet = pThis->Drvr.Ctl(pThis->pDrvrData, pStrm->pDrvrData, id, pUsr, op);
+ iRet = pThis->Drvr.Ctl(pThis->pDrvrData, pStrm->pDrvrData, id, pUsr, mode, op);
RETiRet;
}
diff --git a/runtime/nspoll.h b/runtime/nspoll.h
index 281b8103..a77759c0 100644
--- a/runtime/nspoll.h
+++ b/runtime/nspoll.h
@@ -51,7 +51,7 @@ BEGINinterface(nspoll) /* name must also be changed in ENDinterface macro! */
rsRetVal (*ConstructFinalize)(nspoll_t *pThis);
rsRetVal (*Destruct)(nspoll_t **ppThis);
rsRetVal (*Wait)(nspoll_t *pNsdpoll, int timeout, int *idRdy, void **ppUsr);
- rsRetVal (*Ctl)(nspoll_t *pNsdpoll, netstrm_t *pStrm, int id, void *pUsr, int op);
+ rsRetVal (*Ctl)(nspoll_t *pNsdpoll, netstrm_t *pStrm, int id, void *pUsr, int mode, int op);
rsRetVal (*IsEPollSupported)(void); /* static method */
ENDinterface(nspoll)
#define nspollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
diff --git a/tcpsrv.c b/tcpsrv.c
index e6fdd087..9d51d83b 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -237,11 +237,13 @@ static void deinit_tcp_listener(tcpsrv_t *pThis)
if(pThis->pSessions != NULL) {
/* close all TCP connections! */
- i = TCPSessGetNxtSess(pThis, -1);
- while(i != -1) {
- tcps_sess.Destruct(&pThis->pSessions[i]);
- /* now get next... */
- i = TCPSessGetNxtSess(pThis, i);
+ if(!pThis->bUsingEPoll) {
+ i = TCPSessGetNxtSess(pThis, -1);
+ while(i != -1) {
+ tcps_sess.Destruct(&pThis->pSessions[i]);
+ /* now get next... */
+ i = TCPSessGetNxtSess(pThis, i);
+ }
}
/* we are done with the session table - so get rid of it... */
@@ -437,7 +439,8 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess,
}
*ppSess = pSess;
- pThis->pSessions[iSess] = pSess;
+ if(!pThis->bUsingEPoll)
+ pThis->pSessions[iSess] = pSess;
pSess = NULL; /* this is now also handed over */
finalize_it:
@@ -465,10 +468,12 @@ RunCancelCleanup(void *arg)
/* process a receive request on one of the streams
+ * If pPoll is non-NULL, we have a netstream in epoll mode, which means we need
+ * to remove any descriptor we close from the epoll set.
* rgerhards, 2009-07-020
*/
static rsRetVal
-doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess)
+doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
{
char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */
ssize_t iRcvd;
@@ -490,6 +495,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess)
errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n",
(*ppSess)->pStrm, pszPeer);
}
+ if(pPoll != NULL) {
+ CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL));
+ }
pThis->pOnRegularClose(*ppSess);
tcps_sess.Destruct(ppSess);
break;
@@ -516,6 +524,8 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess)
tcps_sess.Destruct(ppSess);
break;
}
+
+finalize_it:
RETiRet;
}
@@ -585,7 +595,7 @@ RunSelect(tcpsrv_t *pThis)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds));
if(bIsReady) {
- doReceive(pThis, &pThis->pSessions[iTCPSess]);
+ doReceive(pThis, &pThis->pSessions[iTCPSess], NULL);
--nfds; /* indicate we have processed one */
}
iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
@@ -613,17 +623,14 @@ finalize_it: /* this is a very special case - this time only we do not exit the
* select() equivalent.
* rgerhards, 2009-11-18
*/
-#pragma GCC diagnostic ignored "-Wempty-body"
static rsRetVal
Run(tcpsrv_t *pThis)
{
DEFiRet;
- int nfds;
int i;
- int iTCPSess;
- int bIsReady;
tcps_sess_t *pNewSess;
nspoll_t *pPoll = NULL;
+ void *pUsr;
rsRetVal localRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
@@ -632,8 +639,6 @@ Run(tcpsrv_t *pThis)
* this thread. Thus, we also need to instantiate a cancel cleanup handler
* to prevent us from leaking anything. -- rgerhards, 20080-04-24
*/
-#warning implement cancel cleanup handler!
- //pthread_cleanup_push(RunCancelCleanup, (void*) &pSel);
if((localRet = nspoll.Construct(&pPoll)) == RS_RET_OK) {
// TODO: set driver
localRet = nspoll.ConstructFinalize(pPoll);
@@ -647,15 +652,18 @@ Run(tcpsrv_t *pThis)
dbgprintf("we would use the poll handler, currently not implemented!\n");
+ /* flag that we are in epoll mode */
+ pThis->bUsingEPoll = TRUE;
+
/* Add the TCP listen sockets to the list of sockets to monitor */
for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
dbgprintf("Trying to add listener %d\n", i);
- CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, &pThis->ppLstn, NSDPOLL_IN));
+ CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD));
dbgprintf("Added listener %d\n", i);
}
while(1) {
- localRet = nspoll.Wait(pSel, &nfds);
+ localRet = nspoll.Wait(pPoll, -1, &i, &pUsr);
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
@@ -665,69 +673,30 @@ Run(tcpsrv_t *pThis)
*/
if(localRet != RS_RET_OK)
continue;
- }
-#if 0
- while(1) {
- CHKiRet(nssel.Construct(&pSel));
- CHKiRet(nssel.ConstructFinalize(pSel));
+ dbgprintf("poll returned with i %d, pUsr %p\n", i, pUsr);
- /* do the sessions */
- iTCPSess = TCPSessGetNxtSess(pThis, -1);
- while(iTCPSess != -1) {
- /* TODO: access to pNsd is NOT really CLEAN, use method... */
- CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD));
- /* now get next... */
- iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
- }
-
- /* wait for io to become ready */
- CHKiRet(nssel.Wait(pSel, &nfds));
- if(glbl.GetGlobalInputTermState() == 1)
- break; /* terminate input! */
-
- for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
- if(glbl.GetGlobalInputTermState() == 1)
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
- CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds));
- if(bIsReady) {
- DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
- SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
- --nfds; /* indicate we have processed one */
- }
+ if(pUsr == pThis->ppLstn) {
+ DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
+ SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
+ CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
+ DBGPRINTF("New session created with NSD %p.\n", pNewSess);
+ } else {
+ pNewSess = (tcps_sess_t*) pUsr;
+ doReceive(pThis, &pNewSess, pPoll);
}
+ }
- /* now check the sessions */
- iTCPSess = TCPSessGetNxtSess(pThis, -1);
- while(nfds && iTCPSess != -1) {
- if(glbl.GetGlobalInputTermState() == 1)
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
- CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds));
- if(bIsReady) {
- doReceive(pThis, &pThis->pSessions[iTCPSess]);
- --nfds; /* indicate we have processed one */
- }
- iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
- }
- CHKiRet(nssel.Destruct(&pSel));
-finalize_it: /* this is a very special case - this time only we do not exit the function,
- * because that would not help us either. So we simply retry it. Let's see
- * if that actually is a better idea. Exiting the loop wasn't we always
- * crashed, which made sense (the rest of the engine was not prepared for
- * that) -- rgerhards, 2008-05-19
- */
- /*EMPTY*/;
+ /* remove the tcp listen sockets from the epoll set */
+ for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
+ CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_DEL));
}
-#endif
- /* note that this point is usually not reached */
-// pthread_cleanup_pop(1); /* remove cleanup handler */
finalize_it:
if(pPoll != NULL)
nspoll.Destruct(&pPoll);
RETiRet;
}
-#pragma GCC diagnostic warning "-Wempty-body"
/* Standard-Constructor */
@@ -1032,7 +1001,6 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->ConstructFinalize = tcpsrvConstructFinalize;
pIf->Destruct = tcpsrvDestruct;
- //pIf->SessAccept = SessAccept;
pIf->configureTCPListen = configureTCPListen;
pIf->create_tcp_socket = create_tcp_socket;
pIf->Run = Run;
diff --git a/tcpsrv.h b/tcpsrv.h
index b8d82163..e7a95a46 100644
--- a/tcpsrv.h
+++ b/tcpsrv.h
@@ -55,6 +55,7 @@ struct tcpsrv_s {
ruleset_t *pRuleset; /**< ruleset to bind to */
permittedPeers_t *pPermPeers;/**< driver's permitted peers */
bool bEmitMsgOnClose; /**< emit an informational message when the remote peer closes connection */
+ bool bUsingEPoll; /**< are we in epoll mode (means we do not need to keep track of sessions!) */
int iLstnCurr; /**< max nbr of listeners currently supported */
netstrm_t **ppLstn; /**< our netstream listners */
tcpLstnPortList_t **ppLstnPort; /**< pointer to relevant listen port description */