summaryrefslogtreecommitdiffstats
path: root/tcpsrv.c
diff options
context:
space:
mode:
Diffstat (limited to 'tcpsrv.c')
-rw-r--r--tcpsrv.c263
1 files changed, 202 insertions, 61 deletions
diff --git a/tcpsrv.c b/tcpsrv.c
index cbb3a504..95c45780 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -68,11 +68,13 @@
#include "netstrms.h"
#include "netstrm.h"
#include "nssel.h"
+#include "nspoll.h"
#include "errmsg.h"
#include "ruleset.h"
#include "unicode-helper.h"
MODULE_TYPE_LIB
+MODULE_TYPE_NOKEEP
/* defines */
#define TCPSESS_MAX_DEFAULT 200 /* default for nbr of tcp sessions if no number is given */
@@ -89,6 +91,7 @@ DEFobjCurrIf(net)
DEFobjCurrIf(netstrms)
DEFobjCurrIf(netstrm)
DEFobjCurrIf(nssel)
+DEFobjCurrIf(nspoll)
DEFobjCurrIf(prop)
@@ -104,7 +107,7 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort)
ISOBJ_TYPE_assert(pThis, tcpsrv);
/* create entry */
- CHKmalloc(pEntry = malloc(sizeof(tcpLstnPortList_t)));
+ CHKmalloc(pEntry = MALLOC(sizeof(tcpLstnPortList_t)));
pEntry->pszPort = pszPort;
pEntry->pSrv = pThis;
pEntry->pRuleset = pThis->pRuleset;
@@ -165,9 +168,9 @@ TCPSessTblInit(tcpsrv_t *pThis)
ISOBJ_TYPE_assert(pThis, tcpsrv);
assert(pThis->pSessions == NULL);
- dbgprintf("Allocating buffer for %d TCP sessions.\n", pThis->iSessMax);
+ DBGPRINTF("Allocating buffer for %d TCP sessions.\n", pThis->iSessMax);
if((pThis->pSessions = (tcps_sess_t **) calloc(pThis->iSessMax, sizeof(tcps_sess_t *))) == NULL) {
- dbgprintf("Error: TCPSessInit() could not alloc memory for TCP session table.\n");
+ DBGPRINTF("Error: TCPSessInit() could not alloc memory for TCP session table.\n");
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
@@ -238,11 +241,13 @@ static void deinit_tcp_listener(tcpsrv_t *pThis)
if(pThis->pSessions != NULL) {
/* close all TCP connections! */
- i = TCPSessGetNxtSess(pThis, -1);
- while(i != -1) {
- tcps_sess.Destruct(&pThis->pSessions[i]);
- /* now get next... */
- i = TCPSessGetNxtSess(pThis, i);
+ if(!pThis->bUsingEPoll) {
+ i = TCPSessGetNxtSess(pThis, -1);
+ while(i != -1) {
+ tcps_sess.Destruct(&pThis->pSessions[i]);
+ /* now get next... */
+ i = TCPSessGetNxtSess(pThis, i);
+ }
}
/* we are done with the session table - so get rid of it... */
@@ -412,7 +417,7 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess,
* rgerhards, 2005-09-26
*/
if(!pThis->pIsPermittedHost((struct sockaddr*) addr, (char*) fromHostFQDN, pThis->pUsr, pSess->pUsr)) {
- dbgprintf("%s is not an allowed sender\n", fromHostFQDN);
+ DBGPRINTF("%s is not an allowed sender\n", fromHostFQDN);
if(glbl.GetOption_DisallowWarning()) {
errno = 0;
errmsg.LogError(0, RS_RET_HOST_NOT_PERMITTED, "TCP message from disallowed sender %s discarded", fromHostFQDN);
@@ -438,7 +443,8 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess,
}
*ppSess = pSess;
- pThis->pSessions[iSess] = pSess;
+ if(!pThis->bUsingEPoll)
+ pThis->pSessions[iSess] = pSess;
pSess = NULL; /* this is now also handed over */
finalize_it:
@@ -465,26 +471,101 @@ RunCancelCleanup(void *arg)
}
-/* This function is called to gather input. */
-#pragma GCC diagnostic ignored "-Wempty-body"
+/* helper to close a session. Takes status of poll vs. select into consideration.
+ * rgerhards, 2009-11-25
+ */
+static inline rsRetVal
+closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) {
+ DEFiRet;
+ if(pPoll != NULL) {
+ CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL));
+ }
+ pThis->pOnRegularClose(*ppSess);
+ tcps_sess.Destruct(ppSess);
+finalize_it:
+ RETiRet;
+}
+
+
+/* process a receive request on one of the streams
+ * If pPoll is non-NULL, we have a netstream in epoll mode, which means we need
+ * to remove any descriptor we close from the epoll set.
+ * rgerhards, 2009-07-020
+ */
static rsRetVal
-Run(tcpsrv_t *pThis)
+doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
{
- DEFiRet;
+ char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */
+ ssize_t iRcvd;
rsRetVal localRet;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm);
+ /* Receive message */
+ iRet = pThis->pRcvData(*ppSess, buf, sizeof(buf), &iRcvd);
+ switch(iRet) {
+ case RS_RET_CLOSED:
+ if(pThis->bEmitMsgOnClose) {
+ uchar *pszPeer;
+ int lenPeer;
+ errno = 0;
+ prop.GetString((*ppSess)->fromHostIP, &pszPeer, &lenPeer);
+ errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n",
+ (*ppSess)->pStrm, pszPeer);
+ }
+ CHKiRet(closeSess(pThis, ppSess, pPoll));
+ break;
+ case RS_RET_RETRY:
+ /* we simply ignore retry - this is not an error, but we also have not received anything */
+ break;
+ case RS_RET_OK:
+ /* valid data received, process it! */
+ localRet = tcps_sess.DataRcvd(*ppSess, buf, iRcvd);
+ if(localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL) {
+ /* in this case, something went awfully wrong.
+ * We are instructed to terminate the session.
+ */
+ errmsg.LogError(0, localRet, "Tearing down TCP Session - see "
+ "previous messages for reason(s)\n");
+ CHKiRet(closeSess(pThis, ppSess, pPoll));
+ }
+ break;
+ default:
+ errno = 0;
+ errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n",
+ (*ppSess)->pStrm);
+ CHKiRet(closeSess(pThis, ppSess, pPoll));
+ break;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* This function is called to gather input.
+ * This variant here is only used if we need to work with a netstream driver
+ * that does not support epoll().
+ */
+#pragma GCC diagnostic ignored "-Wempty-body"
+static inline rsRetVal
+RunSelect(tcpsrv_t *pThis)
+{
+ DEFiRet;
int nfds;
int i;
int iTCPSess;
int bIsReady;
tcps_sess_t *pNewSess;
nssel_t *pSel = NULL;
- ssize_t iRcvd;
+ rsRetVal localRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
/* this is an endless loop - it is terminated by the framework canelling
* this thread. Thus, we also need to instantiate a cancel cleanup handler
- * to prevent us from leaking anything. -- rgerharsd, 20080-04-24
+ * to prevent us from leaking anything. -- rgerhards, 20080-04-24
*/
pthread_cleanup_push(RunCancelCleanup, (void*) &pSel);
while(1) {
@@ -508,11 +589,15 @@ Run(tcpsrv_t *pThis)
/* wait for io to become ready */
CHKiRet(nssel.Wait(pSel, &nfds));
+ if(glbl.GetGlobalInputTermState() == 1)
+ break; /* terminate input! */
for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
+ if(glbl.GetGlobalInputTermState() == 1)
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds));
if(bIsReady) {
- dbgprintf("New connect on NSD %p.\n", pThis->ppLstn[i]);
+ DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
--nfds; /* indicate we have processed one */
}
@@ -521,50 +606,11 @@ Run(tcpsrv_t *pThis)
/* now check the sessions */
iTCPSess = TCPSessGetNxtSess(pThis, -1);
while(nfds && iTCPSess != -1) {
+ if(glbl.GetGlobalInputTermState() == 1)
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
localRet = nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds);
if(bIsReady || localRet != RS_RET_OK) {
- char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */
- dbgprintf("netstream %p with new data\n", pThis->pSessions[iTCPSess]->pStrm);
-
- /* Receive message */
- iRet = pThis->pRcvData(pThis->pSessions[iTCPSess], buf, sizeof(buf), &iRcvd);
- switch(iRet) {
- case RS_RET_CLOSED:
- if(pThis->bEmitMsgOnClose) {
- uchar *pszPeer;
- int lenPeer;
- errno = 0;
- prop.GetString(pThis->pSessions[iTCPSess]->fromHostIP, &pszPeer, &lenPeer);
- errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n",
- pThis->pSessions[iTCPSess]->pStrm, pszPeer);
- }
- pThis->pOnRegularClose(pThis->pSessions[iTCPSess]);
- tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
- break;
- case RS_RET_RETRY:
- /* we simply ignore retry - this is not an error, but we also have not received anything */
- break;
- case RS_RET_OK:
- /* valid data received, process it! */
- localRet = tcps_sess.DataRcvd(pThis->pSessions[iTCPSess], buf, iRcvd);
- if(localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL) {
- /* in this case, something went awfully wrong.
- * We are instructed to terminate the session.
- */
- errmsg.LogError(0, localRet, "Tearing down TCP Session %d - see "
- "previous messages for reason(s)", iTCPSess);
- pThis->pOnErrClose(pThis->pSessions[iTCPSess]);
- tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
- }
- break;
- default:
- errno = 0;
- errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n",
- pThis->pSessions[iTCPSess]->pStrm);
- pThis->pOnErrClose(pThis->pSessions[iTCPSess]);
- tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
- break;
- }
+ doReceive(pThis, &pThis->pSessions[iTCPSess], NULL);
--nfds; /* indicate we have processed one */
}
iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
@@ -582,18 +628,100 @@ finalize_it: /* this is a very special case - this time only we do not exit the
}
/* note that this point is usually not reached */
- pthread_cleanup_pop(0); /* remove cleanup handler */
+ pthread_cleanup_pop(1); /* remove cleanup handler */
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
+/* This function is called to gather input. It tries doing that via the epoll()
+ * interface. If the driver does not support that, it falls back to calling its
+ * select() equivalent.
+ * rgerhards, 2009-11-18
+ */
+static rsRetVal
+Run(tcpsrv_t *pThis)
+{
+ DEFiRet;
+ int i;
+ tcps_sess_t *pNewSess;
+ nspoll_t *pPoll = NULL;
+ void *pUsr;
+ rsRetVal localRet;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+
+ /* this is an endless loop - it is terminated by the framework canelling
+ * this thread. Thus, we also need to instantiate a cancel cleanup handler
+ * to prevent us from leaking anything. -- rgerhards, 20080-04-24
+ */
+ if((localRet = nspoll.Construct(&pPoll)) == RS_RET_OK) {
+ // TODO: set driver
+ localRet = nspoll.ConstructFinalize(pPoll);
+ }
+ if(localRet != RS_RET_OK) {
+ /* fall back to select */
+ dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet);
+ iRet = RunSelect(pThis);
+ FINALIZE;
+ }
+
+ dbgprintf("tcpsrv uses epoll() interface, nsdpol driver found\n");
+
+ /* flag that we are in epoll mode */
+ pThis->bUsingEPoll = TRUE;
+
+ /* Add the TCP listen sockets to the list of sockets to monitor */
+ for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
+ dbgprintf("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn);
+ CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD));
+ dbgprintf("Added listener %d\n", i);
+ }
+
+ while(1) {
+ localRet = nspoll.Wait(pPoll, -1, &i, &pUsr);
+ if(glbl.GetGlobalInputTermState() == 1)
+ break; /* terminate input! */
+
+ /* check if we need to ignore the i/o ready state. We do this if we got an invalid
+ * return state. Validly, this can happen for RS_RET_EINTR, for other cases it may
+ * not be the right thing, but what is the right thing is really hard at this point...
+ */
+ if(localRet != RS_RET_OK)
+ continue;
+
+ dbgprintf("poll returned with i %d, pUsr %p\n", i, pUsr);
+
+ if(pUsr == pThis->ppLstn) {
+ DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
+ SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
+ CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
+ DBGPRINTF("New session created with NSD %p.\n", pNewSess);
+ } else {
+ pNewSess = (tcps_sess_t*) pUsr;
+ doReceive(pThis, &pNewSess, pPoll);
+ }
+ }
+
+ /* remove the tcp listen sockets from the epoll set */
+ for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
+ CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_DEL));
+ }
+
+finalize_it:
+ if(pPoll != NULL)
+ nspoll.Destruct(&pPoll);
+ RETiRet;
+}
+
+
/* Standard-Constructor */
BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */
pThis->iSessMax = TCPSESS_MAX_DEFAULT;
pThis->iLstnMax = TCPLSTN_MAX_DEFAULT;
pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
+ pThis->bDisableLFDelim = 0;
pThis->OnMsgReceive = NULL;
ENDobjConstruct(tcpsrv)
@@ -750,6 +878,18 @@ SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*,
}
+/* set enable/disable standard LF frame delimiter (use with care!)
+ * -- rgerhards, 2010-01-03
+ */
+static rsRetVal
+SetbDisableLFDelim(tcpsrv_t *pThis, int bVal)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->bDisableLFDelim = bVal;
+ RETiRet;
+}
+
/* Set additional framing to use (if any) -- rgerhards, 2008-12-10 */
static rsRetVal
@@ -891,7 +1031,6 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->ConstructFinalize = tcpsrvConstructFinalize;
pIf->Destruct = tcpsrvDestruct;
- //pIf->SessAccept = SessAccept;
pIf->configureTCPListen = configureTCPListen;
pIf->create_tcp_socket = create_tcp_socket;
pIf->Run = Run;
@@ -899,6 +1038,7 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->SetUsrP = SetUsrP;
pIf->SetInputName = SetInputName;
pIf->SetAddtlFrameDelim = SetAddtlFrameDelim;
+ pIf->SetbDisableLFDelim = SetbDisableLFDelim;
pIf->SetSessMax = SetSessMax;
pIf->SetLstnMax = SetLstnMax;
pIf->SetDrvrMode = SetDrvrMode;
@@ -952,6 +1092,7 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME));
CHKiRet(objUse(netstrm, DONT_LOAD_LIB));
CHKiRet(objUse(nssel, DONT_LOAD_LIB));
+ CHKiRet(objUse(nspoll, DONT_LOAD_LIB));
CHKiRet(objUse(tcps_sess, DONT_LOAD_LIB));
CHKiRet(objUse(conf, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));