summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-04-24 17:43:45 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2008-04-24 17:43:45 +0200
commit4b05bef636c11cbaf4d32097ed9656a1447ed3d0 (patch)
tree15c698fa86a85e5e3a1db1f14e2eaf6f11c6948f
parent82095efa24ea0692a6747d4296f398ebd37e5339 (diff)
parenta7040a9623e228043209da897dbf30b9ab02d771 (diff)
downloadrsyslog-4b05bef636c11cbaf4d32097ed9656a1447ed3d0.tar.gz
rsyslog-4b05bef636c11cbaf4d32097ed9656a1447ed3d0.tar.xz
rsyslog-4b05bef636c11cbaf4d32097ed9656a1447ed3d0.zip
Merge branch 'sock-abstract' into tls
Conflicts: runtime/Makefile.am runtime/netstrm.c runtime/nsd.h runtime/nsd_ptcp.c runtime/rsyslog.h
-rw-r--r--plugins/imtcp/imtcp.c12
-rw-r--r--runtime/Makefile.am27
-rw-r--r--runtime/netstrm.c167
-rw-r--r--runtime/netstrm.h17
-rw-r--r--runtime/netstrms.c200
-rw-r--r--runtime/netstrms.h52
-rw-r--r--runtime/nsd.h28
-rw-r--r--runtime/nsd_ptcp.c246
-rw-r--r--runtime/nsd_ptcp.h2
-rw-r--r--runtime/nsdsel_ptcp.c222
-rw-r--r--runtime/nsdsel_ptcp.h46
-rw-r--r--runtime/nssel.c229
-rw-r--r--runtime/nssel.h55
-rw-r--r--runtime/obj-types.h8
-rw-r--r--runtime/obj.c2
-rw-r--r--runtime/rsyslog.h11
-rw-r--r--tcps_sess.c45
-rw-r--r--tcps_sess.h8
-rw-r--r--tcpsrv.c403
-rw-r--r--tcpsrv.h20
-rw-r--r--tools/omfwd.c23
21 files changed, 1299 insertions, 524 deletions
diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c
index b7f8f0b5..1e599d14 100644
--- a/plugins/imtcp/imtcp.c
+++ b/plugins/imtcp/imtcp.c
@@ -43,6 +43,7 @@
#include "cfsysline.h"
#include "module-template.h"
#include "net.h"
+#include "netstrm.h"
#include "tcpsrv.h"
MODULE_TYPE_INPUT
@@ -52,6 +53,7 @@ DEF_IMOD_STATIC_DATA
DEFobjCurrIf(tcpsrv)
DEFobjCurrIf(tcps_sess)
DEFobjCurrIf(net)
+DEFobjCurrIf(netstrm)
/* Module static data */
static tcpsrv_t *pOurTcpsrv = NULL; /* our TCP server(listener) TODO: change for multiple instances */
@@ -70,7 +72,7 @@ isPermittedHost(struct sockaddr *addr, char *fromHostFQDN, void __attribute__((u
}
-static int*
+static rsRetVal
doOpenLstnSocks(tcpsrv_t *pSrv)
{
ISOBJ_TYPE_assert(pSrv, tcpsrv);
@@ -81,10 +83,12 @@ doOpenLstnSocks(tcpsrv_t *pSrv)
static int
doRcvData(tcps_sess_t *pSess, char *buf, size_t lenBuf)
{
- int state;
+ ssize_t state;
assert(pSess != NULL);
- state = recv(pSess->sock, buf, lenBuf, 0);
+ state = lenBuf;
+ if(netstrm.Rcv(pSess->pStrm, (uchar*) buf, &state) != RS_RET_OK)
+ state = -1; // TODO: move this function to an iRet interface! 2008-04-23
return state;
}
@@ -172,6 +176,7 @@ CODESTARTmodExit
/* release objects we used */
objRelease(net, LM_NET_FILENAME);
+ objRelease(netstrm, LM_NETSTRM_FILENAME);
objRelease(tcps_sess, LM_TCPSRV_FILENAME);
objRelease(tcpsrv, LM_TCPSRV_FILENAME);
ENDmodExit
@@ -199,6 +204,7 @@ CODEmodInit_QueryRegCFSLineHdlr
pOurTcpsrv = NULL;
/* request objects we use */
CHKiRet(objUse(net, LM_NET_FILENAME));
+ CHKiRet(objUse(netstrm, LM_NETSTRM_FILENAME));
CHKiRet(objUse(tcps_sess, LM_TCPSRV_FILENAME));
CHKiRet(objUse(tcpsrv, LM_TCPSRV_FILENAME));
diff --git a/runtime/Makefile.am b/runtime/Makefile.am
index 1d07e79c..15efcbb8 100644
--- a/runtime/Makefile.am
+++ b/runtime/Makefile.am
@@ -81,7 +81,7 @@ lmregexp_la_LIBADD =
endif
if ENABLE_INET
-pkglib_LTLIBRARIES += lmnet.la lmnetstrm.la
+pkglib_LTLIBRARIES += lmnet.la lmnetstrms.la lmnetstrm.la lmnssel.la
#
# network support
#
@@ -90,18 +90,39 @@ lmnet_la_CPPFLAGS = $(pthreads_cflags) $(rsrt_cflags)
lmnet_la_LDFLAGS = -module -avoid-version
lmnet_la_LIBADD =
-# network streams
+# network stream master class and stream factory
+lmnetstrms_la_SOURCES = netstrms.c netstrms.h
+lmnetstrms_la_CPPFLAGS = $(pthreads_cflags) $(rsrt_cflags)
+lmnetstrms_la_LDFLAGS = -module -avoid-version
+lmnetstrms_la_LIBADD =
+
+# individual network streams
lmnetstrm_la_SOURCES = netstrm.c netstrm.h
lmnetstrm_la_CPPFLAGS = $(pthreads_cflags) $(rsrt_cflags)
lmnetstrm_la_LDFLAGS = -module -avoid-version
lmnetstrm_la_LIBADD =
-# plain tcp netstream driver
+# network stream select support (a helper class)
+lmnssel_la_SOURCES = nssel.c nssel.h
+lmnssel_la_CPPFLAGS = $(pthreads_cflags) $(rsrt_cflags)
+lmnssel_la_LDFLAGS = -module -avoid-version
+lmnssel_la_LIBADD =
+
+# netstream drivers
+
+# plain tcp driver - main driver
pkglib_LTLIBRARIES += lmnsd_ptcp.la
lmnsd_ptcp_la_SOURCES = nsd_ptcp.c nsd_ptcp.h
lmnsd_ptcp_la_CPPFLAGS = $(pthreads_cflags) $(rsrt_cflags)
lmnsd_ptcp_la_LDFLAGS = -module -avoid-version
lmnsd_ptcp_la_LIBADD =
+
+# select interface for ptcp driver
+pkglib_LTLIBRARIES += lmnsdsel_ptcp.la
+lmnsdsel_ptcp_la_SOURCES = nsdsel_ptcp.c nsdsel_ptcp.h
+lmnsdsel_ptcp_la_CPPFLAGS = $(pthreads_cflags) $(rsrt_cflags)
+lmnsdsel_ptcp_la_LDFLAGS = -module -avoid-version
+lmnsdsel_ptcp_la_LIBADD =
endif # if ENABLE_INET
#
diff --git a/runtime/netstrm.c b/runtime/netstrm.c
index a304ada4..5e073899 100644
--- a/runtime/netstrm.c
+++ b/runtime/netstrm.c
@@ -1,4 +1,4 @@
-/* netstrmstrm.c
+/* netstrm.c
*
* This class implements a generic netstrmwork stream class. It supports
* sending and receiving data streams over a netstrmwork. The class abstracts
@@ -38,29 +38,16 @@
* A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
*/
#include "config.h"
-
-#include "rsyslog.h"
-#include <stdio.h>
-#include <stdarg.h>
#include <stdlib.h>
#include <assert.h>
-#include <errno.h>
#include <string.h>
-#include <signal.h>
-#include <ctype.h>
-#include <netdb.h>
-#include <fnmatch.h>
-#include <fcntl.h>
-#include <unistd.h>
-
-#include "syslogd-types.h"
+
+#include "rsyslog.h"
#include "module-template.h"
-#include "parse.h"
-#include "srUtils.h"
#include "obj.h"
#include "errmsg.h"
-#include "net.h"
-#include "nsd.h"
+//#include "nsd.h"
+#include "netstrms.h"
#include "netstrm.h"
MODULE_TYPE_LIB
@@ -68,41 +55,7 @@ MODULE_TYPE_LIB
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(errmsg)
-DEFobjCurrIf(glbl)
-DEFobjCurrIf(net)
-
-
-/* load our low-level driver. This must be done before any
- * driver-specific functions (allmost all...) can be carried
- * out. Note that the driver's .ifIsLoaded is correctly
- * initialized by calloc() and we depend on that.
- * rgerhards, 2008-04-18
- */
-static rsRetVal
-loadDrvr(netstrm_t *pThis)
-{
- uchar *pDrvrName;
- DEFiRet;
-
- if(pThis->Drvr.ifIsLoaded == 0) {
- pDrvrName = pThis->pDrvrName;
- if(pDrvrName == NULL) { /* if no drvr name is set, use system default */
- pDrvrName = glbl.GetDfltNetstrmDrvr();
- pThis->pDrvrName = (uchar*)strdup((char*)pDrvrName); // TODO: use set method once it exists
- }
-
- pThis->Drvr.ifVersion = nsdCURR_IF_VERSION;
- /* The pDrvrName+2 below is a hack to obtain the object name. It
- * safes us to have yet another variable with the name without "lm" in
- * front of it. If we change the module load interface, we may re-think
- * about this hack, but for the time being it is efficient and clean
- * enough. -- rgerhards, 2008-04-18
- */
- CHKiRet(obj.UseObj(__FILE__, pDrvrName+2, pDrvrName, (void*) &pThis->Drvr));
- }
-finalize_it:
- RETiRet;
-}
+DEFobjCurrIf(netstrms)
/* Standard-Constructor */
@@ -131,7 +84,6 @@ netstrmConstructFinalize(netstrm_t *pThis)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, netstrm);
- CHKiRet(loadDrvr(pThis));
CHKiRet(pThis->Drvr.Construct(&pThis->pDrvrData));
finalize_it:
RETiRet;
@@ -155,73 +107,56 @@ AbortDestruct(netstrm_t **ppThis)
}
-#if 0
-This is not yet working - wait until we arrive at the receiver side (distracts too much at the moment)
-
-/* accept an incoming connection request, pNsdLstn provides the "listen socket" on which we can
- * accept the new session.
- * rgerhards, 2008-03-17
+/* accept an incoming connection request
+ * The netstrm instance that had the incoming request must be provided. If
+ * the connection request succeeds, a new netstrm object is created and
+ * passed back to the caller. The caller is responsible for destructing it.
+ * pReq is the nsd_t obj that has the accept request.
+ * rgerhards, 2008-04-21
*/
static rsRetVal
-AcceptConnReq(netstrm_t **ppThis, nsd_t *pNsdLstn)
+AcceptConnReq(netstrm_t *pThis, netstrm_t **ppNew)
{
- netstrm_t *pThis = NULL;
- nsd_t *pNsd;
+ nsd_t *pNewNsd = NULL;
DEFiRet;
- assert(ppThis != NULL);
+ ISOBJ_TYPE_assert(pThis, netstrm);
+ assert(ppNew != NULL);
+ /* accept the new connection */
+ CHKiRet(pThis->Drvr.AcceptConnReq(pThis->pDrvrData, &pNewNsd));
/* construct our object so that we can use it... */
- CHKiRet(netstrmConstruct(&pThis));
-
- /* TODO: obtain hostname, normalize (callback?), save it */
- CHKiRet(FillRemHost(pThis, (struct sockaddr*) &addr));
-
- /* set the new socket to non-blocking IO */
- if((sockflags = fcntl(iNewSock, F_GETFL)) != -1) {
- sockflags |= O_NONBLOCK;
- /* SETFL could fail too, so get it caught by the subsequent
- * error check.
- */
- sockflags = fcntl(iNewSock, F_SETFL, sockflags);
- }
- if(sockflags == -1) {
- dbgprintf("error %d setting fcntl(O_NONBLOCK) on tcp socket %d", errno, iNewSock);
- ABORT_FINALIZE(RS_RET_IO_ERROR);
- }
-
- pThis->sock = iNewSock;
-
- *ppThis = pThis;
+ CHKiRet(netstrms.CreateStrm(pThis->pNS, ppNew));
+ (*ppNew)->pDrvrData = pNewNsd;
finalize_it:
if(iRet != RS_RET_OK) {
- if(pThis != NULL)
- netstrmDestruct(&pThis);
/* the close may be redundant, but that doesn't hurt... */
- if(iNewSock >= 0)
- close(iNewSock);
+ if(pNewNsd != NULL)
+ pThis->Drvr.Destruct(&pNewNsd);
}
RETiRet;
}
-#endif
-/* initialize the tcp socket for a listner
- * pLstnPort must point to a port name or number. NULL is NOT permitted
- * (hint: we need to be careful when we use this module together with librelp,
- * there NULL indicates the default port
- * default is used.
- * gerhards, 2008-03-17
+/* make the netstrm listen to specified port and IP.
+ * pLstnIP points to the port to listen to (NULL means "all"),
+ * iMaxSess has the maximum number of sessions permitted (this ist just a hint).
+ * pLstnPort must point to a port name or number. NULL is NOT permitted.
+ * rgerhards, 2008-04-22
*/
static rsRetVal
-LstnInit(netstrm_t *pThis, uchar *pLstnPort)
+LstnInit(netstrms_t *pNS, void *pUsr, rsRetVal(*fAddLstn)(void*,netstrm_t*),
+ uchar *pLstnPort, uchar *pLstnIP, int iSessMax)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, netstrm);
+
+ ISOBJ_TYPE_assert(pNS, netstrms);
+ assert(fAddLstn != NULL);
assert(pLstnPort != NULL);
- CHKiRet(pThis->Drvr.LstnInit(pThis->pDrvrData, pLstnPort));
+
+ CHKiRet(pNS->Drvr.LstnInit(pNS, pUsr, fAddLstn, pLstnPort, pLstnIP, iSessMax));
finalize_it:
RETiRet;
@@ -263,6 +198,28 @@ Send(netstrm_t *pThis, uchar *pBuf, ssize_t *pLenBuf)
}
+/* get remote hname - slim wrapper for NSD driver function */
+static rsRetVal
+GetRemoteHName(netstrm_t *pThis, uchar **ppsz)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, netstrm);
+ iRet = pThis->Drvr.GetRemoteHName(pThis->pDrvrData, ppsz);
+ RETiRet;
+}
+
+
+/* get remote IP - slim wrapper for NSD driver function */
+static rsRetVal
+GetRemoteIP(netstrm_t *pThis, uchar **ppsz)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, netstrm);
+ iRet = pThis->Drvr.GetRemoteIP(pThis->pDrvrData, ppsz);
+ RETiRet;
+}
+
+
/* open a connection to a remote host (server).
* rgerhards, 2008-03-19
*/
@@ -295,11 +252,13 @@ CODESTARTobjQueryInterface(netstrm)
pIf->ConstructFinalize = netstrmConstructFinalize;
pIf->Destruct = netstrmDestruct;
pIf->AbortDestruct = AbortDestruct;
- pIf->LstnInit = LstnInit;
- // TODO: add later: pIf->AcceptConnReq = AcceptConnReq;
pIf->Rcv = Rcv;
pIf->Send = Send;
pIf->Connect = Connect;
+ pIf->LstnInit = LstnInit;
+ pIf->AcceptConnReq = AcceptConnReq;
+ pIf->GetRemoteHName = GetRemoteHName;
+ pIf->GetRemoteIP = GetRemoteIP;
finalize_it:
ENDobjQueryInterface(netstrm)
@@ -309,9 +268,8 @@ ENDobjQueryInterface(netstrm)
BEGINObjClassExit(netstrm, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MACRO! */
CODESTARTObjClassExit(netstrm)
/* release objects we no longer need */
- objRelease(net, CORE_COMPONENT);
- objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
+ objRelease(netstrms, LM_NETSTRMS_FILENAME);
ENDObjClassExit(netstrm)
@@ -322,8 +280,7 @@ ENDObjClassExit(netstrm)
BEGINAbstractObjClassInit(netstrm, 1, OBJ_IS_CORE_MODULE) /* class, version */
/* request objects we use */
CHKiRet(objUse(errmsg, CORE_COMPONENT));
- CHKiRet(objUse(glbl, CORE_COMPONENT));
- CHKiRet(objUse(net, CORE_COMPONENT));
+ CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME));
/* set our own handlers */
ENDObjClassInit(netstrm)
diff --git a/runtime/netstrm.h b/runtime/netstrm.h
index 75b7c457..f4205f80 100644
--- a/runtime/netstrm.h
+++ b/runtime/netstrm.h
@@ -29,9 +29,10 @@
/* the netstrm object */
struct netstrm_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
- nsd_if_t Drvr; /**< our stream driver */
- nsd_t *pDrvrData; /**< the driver's data elements */
+ nsd_t *pDrvrData; /**< the driver's data elements (at most other places, this is called pNsd) */
uchar *pDrvrName; /**< nsd driver name to use, or NULL if system default */
+ nsd_if_t Drvr; /**< our stream driver */
+ netstrms_t *pNS; /**< pointer to our netstream subsystem object */
};
@@ -41,11 +42,19 @@ BEGINinterface(netstrm) /* name must also be changed in ENDinterface macro! */
rsRetVal (*ConstructFinalize)(netstrm_t *pThis);
rsRetVal (*Destruct)(netstrm_t **ppThis);
rsRetVal (*AbortDestruct)(netstrm_t **ppThis);
- rsRetVal (*LstnInit)(netstrm_t *pThis, unsigned char *pLstnPort);
- rsRetVal (*AcceptConnReq)(netstrm_t **ppThis, int sock);
+ rsRetVal (*LstnInit)(netstrms_t *pNS, void *pUsr, rsRetVal(*)(void*,netstrm_t*),
+ uchar *pLstnPort, uchar *pLstnIP, int iSessMax);
+ rsRetVal (*AcceptConnReq)(netstrm_t *pThis, netstrm_t **ppNew);
rsRetVal (*Rcv)(netstrm_t *pThis, uchar *pRcvBuf, ssize_t *pLenBuf);
rsRetVal (*Send)(netstrm_t *pThis, uchar *pBuf, ssize_t *pLenBuf);
rsRetVal (*Connect)(netstrm_t *pThis, int family, unsigned char *port, unsigned char *host);
+ //rsRetVal (*SelectInit)(nsdsel_t **ppSel, netstrm_t *pThis);
+ //rsRetVal (*SelectAdd)(nsdsel_t *pSel, netstrm_t *pThis);
+ //rsRetVal (*SelectWait)(nsdsel_t *pSel, int *piNumReady);
+ //rsRetVal (*SelectIsReady)(nsdsel_t *pSel, int *piNumReady);
+ //rsRetVal (*SelectExit)(nsdsel_t **ppSel);
+ rsRetVal (*GetRemoteHName)(netstrm_t *pThis, uchar **pszName);
+ rsRetVal (*GetRemoteIP)(netstrm_t *pThis, uchar **pszIP);
ENDinterface(netstrm)
#define netstrmCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
diff --git a/runtime/netstrms.c b/runtime/netstrms.c
new file mode 100644
index 00000000..661234e4
--- /dev/null
+++ b/runtime/netstrms.c
@@ -0,0 +1,200 @@
+/* netstrms.c
+ *
+ * Work on this module begung 2008-04-23 by Rainer Gerhards.
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+#include "config.h"
+#include <stdlib.h>
+#include <assert.h>
+#include <errno.h>
+
+#include "rsyslog.h"
+#include "module-template.h"
+#include "obj.h"
+//#include "errmsg.h"
+//#include "net.h"
+#include "nsd.h"
+#include "netstrm.h"
+#include "netstrms.h"
+
+MODULE_TYPE_LIB
+
+/* static data */
+DEFobjStaticHelpers
+//DEFobjCurrIf(errmsg)
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(netstrm)
+//DEFobjCurrIf(net)
+
+
+/* load our low-level driver. This must be done before any
+ * driver-specific functions (allmost all...) can be carried
+ * out. Note that the driver's .ifIsLoaded is correctly
+ * initialized by calloc() and we depend on that.
+ * rgerhards, 2008-04-18
+ */
+static rsRetVal
+loadDrvr(netstrms_t *pThis)
+{
+ uchar *pDrvrName;
+ DEFiRet;
+
+ pDrvrName = pThis->pDrvrName;
+ if(pDrvrName == NULL) /* if no drvr name is set, use system default */
+ pDrvrName = glbl.GetDfltNetstrmDrvr();
+
+ pThis->Drvr.ifVersion = nsdCURR_IF_VERSION;
+ /* The pDrvrName+2 below is a hack to obtain the object name. It
+ * safes us to have yet another variable with the name without "lm" in
+ * front of it. If we change the module load interface, we may re-think
+ * about this hack, but for the time being it is efficient and clean
+ * enough. -- rgerhards, 2008-04-18
+ */
+ CHKiRet(obj.UseObj(__FILE__, pDrvrName+2, pDrvrName, (void*) &pThis->Drvr));
+finalize_it:
+ RETiRet;
+}
+
+
+/* Standard-Constructor */
+BEGINobjConstruct(netstrms) /* be sure to specify the object type also in END macro! */
+ENDobjConstruct(netstrms)
+
+
+/* destructor for the netstrms object */
+BEGINobjDestruct(netstrms) /* be sure to specify the object type also in END and CODESTART macros! */
+CODESTARTobjDestruct(netstrms)
+ if(pThis->pDrvrName != NULL)
+ free(pThis->pDrvrName);
+ENDobjDestruct(netstrms)
+
+
+/* ConstructionFinalizer */
+static rsRetVal
+netstrmsConstructFinalize(netstrms_t *pThis)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, netstrms);
+ CHKiRet(loadDrvr(pThis));
+finalize_it:
+ RETiRet;
+}
+
+
+/* create an instance of a netstrm object. It is initialized with default
+ * values. The current driver is used. The caller may set netstrm properties
+ * and must call ConstructFinalize().
+ */
+static rsRetVal
+CreateStrm(netstrms_t *pThis, netstrm_t **ppStrm)
+{
+ netstrm_t *pStrm = NULL;
+ DEFiRet;
+
+ CHKiRet(objUse(netstrm, LM_NETSTRM_FILENAME));
+ CHKiRet(netstrm.Construct(&pStrm));
+ /* we copy over our driver structure. We could provide a pointer to
+ * ourselves, but that costs some performance on each driver invocation.
+ * As we already have hefty indirection (and thus performance toll), I
+ * prefer to copy over the function pointers here. -- rgerhards, 2008-04-23
+ */
+ memcpy(&pStrm->Drvr, &pThis->Drvr, sizeof(pThis->Drvr));
+ pStrm->pNS = pThis;
+
+ *ppStrm = pStrm;
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ if(pStrm != NULL)
+ netstrm.Destruct(&pStrm);
+ }
+ RETiRet;
+}
+
+
+/* queryInterface function */
+BEGINobjQueryInterface(netstrms)
+CODESTARTobjQueryInterface(netstrms)
+ if(pIf->ifVersion != netstrmsCURR_IF_VERSION) {/* check for current version, increment on each change */
+ ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED);
+ }
+
+ /* ok, we have the right interface, so let's fill it
+ * Please note that we may also do some backwards-compatibility
+ * work here (if we can support an older interface version - that,
+ * of course, also affects the "if" above).
+ */
+ pIf->Construct = netstrmsConstruct;
+ pIf->ConstructFinalize = netstrmsConstructFinalize;
+ pIf->Destruct = netstrmsDestruct;
+ pIf->CreateStrm = CreateStrm;
+finalize_it:
+ENDobjQueryInterface(netstrms)
+
+
+/* exit our class */
+BEGINObjClassExit(netstrms, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MACRO! */
+CODESTARTObjClassExit(netstrms)
+ /* release objects we no longer need */
+ //objRelease(net, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(netstrm, LM_NETSTRM_FILENAME);
+ENDObjClassExit(netstrms)
+
+
+/* Initialize the netstrms class. Must be called as the very first method
+ * before anything else is called inside this class.
+ * rgerhards, 2008-02-19
+ */
+BEGINAbstractObjClassInit(netstrms, 1, OBJ_IS_CORE_MODULE) /* class, version */
+ /* request objects we use */
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ //CHKiRet(objUse(net, CORE_COMPONENT));
+
+ /* set our own handlers */
+ENDObjClassInit(netstrms)
+
+
+/* --------------- here now comes the plumbing that makes as a library module --------------- */
+
+
+BEGINmodExit
+CODESTARTmodExit
+ netstrmsClassExit();
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_LIB_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+
+ /* Initialize all classes that are in our module - this includes ourselfs */
+ CHKiRet(netstrmsClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
+ENDmodInit
+/* vi:set ai:
+ */
diff --git a/runtime/netstrms.h b/runtime/netstrms.h
new file mode 100644
index 00000000..1e920304
--- /dev/null
+++ b/runtime/netstrms.h
@@ -0,0 +1,52 @@
+/* Definitions for the stream-based netstrmsworking class.
+ *
+ * Copyright 2007, 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+
+#ifndef INCLUDED_NETSTRMS_H
+#define INCLUDED_NETSTRMS_H
+
+#include "nsd.h" /* we need our driver interface to be defined */
+
+/* the netstrms object */
+struct netstrms_s {
+ BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
+ uchar *pDrvrName; /**< nsd driver name to use, or NULL if system default */
+ nsd_if_t Drvr; /**< our stream driver */
+};
+
+
+/* interface */
+BEGINinterface(netstrms) /* name must also be changed in ENDinterface macro! */
+ rsRetVal (*Construct)(netstrms_t **ppThis);
+ rsRetVal (*ConstructFinalize)(netstrms_t *pThis);
+ rsRetVal (*Destruct)(netstrms_t **ppThis);
+ rsRetVal (*CreateStrm)(netstrms_t *pThis, netstrm_t **ppStrm);
+ENDinterface(netstrms)
+#define netstrmsCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
+
+/* prototypes */
+PROTOTYPEObj(netstrms);
+
+/* the name of our library binary */
+#define LM_NETSTRMS_FILENAME "lmnetstrms"
+
+#endif /* #ifndef INCLUDED_NETSTRMS_H */
diff --git a/runtime/nsd.h b/runtime/nsd.h
index 203a65d6..d6fa9e0d 100644
--- a/runtime/nsd.h
+++ b/runtime/nsd.h
@@ -27,6 +27,12 @@
#ifndef INCLUDED_NSD_H
#define INCLUDED_NSD_H
+enum nsdsel_waitOp_e {
+ NSDSEL_RD = 1,
+ NSDSEL_WR = 2,
+ NSDSEL_RDWR = 3
+}; /**< the operation we wait for */
+
/* nsd_t is actually obj_t (which is somewhat better than void* but in essence
* much the same).
*/
@@ -36,17 +42,25 @@ BEGINinterface(nsd) /* name must also be changed in ENDinterface macro! */
rsRetVal (*Construct)(nsd_t **ppThis);
rsRetVal (*Destruct)(nsd_t **ppThis);
rsRetVal (*Abort)(nsd_t *pThis);
- rsRetVal (*LstnInit)(nsd_t *pThis, unsigned char *pLstnPort);
- rsRetVal (*AcceptConnReq)(nsd_t **ppThis, int sock);
rsRetVal (*Rcv)(nsd_t *pThis, uchar *pRcvBuf, ssize_t *pLenBuf);
rsRetVal (*Send)(nsd_t *pThis, uchar *pBuf, ssize_t *pLenBuf);
rsRetVal (*Connect)(nsd_t *pThis, int family, unsigned char *port, unsigned char *host);
- rsRetVal (*GetSock)(nsd_t *pThis, int *pSock);
- /* GetSock() returns an error if the driver does not use plain
- * OS sockets. This interface is primarily meant as an internal aid for
- * those drivers that utilize the nsd_ptcp to do some of their work.
- */
+ rsRetVal (*LstnInit)(netstrms_t *pNS, void *pUsr, rsRetVal(*fAddLstn)(void*,netstrm_t*),
+ uchar *pLstnPort, uchar *pLstnIP, int iSessMax);
+ rsRetVal (*AcceptConnReq)(nsd_t *pThis, nsd_t **ppThis);
+ rsRetVal (*GetRemoteHName)(nsd_t *pThis, uchar **pszName);
+ rsRetVal (*GetRemoteIP)(nsd_t *pThis, uchar **pszIP);
ENDinterface(nsd)
#define nsdCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
+/* interface for the select call */
+BEGINinterface(nsdsel) /* name must also be changed in ENDinterface macro! */
+ rsRetVal (*Construct)(nsdsel_t **ppThis);
+ rsRetVal (*Destruct)(nsdsel_t **ppThis);
+ rsRetVal (*Add)(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp);
+ rsRetVal (*Select)(nsdsel_t *pNsdsel, int *piNumReady);
+ rsRetVal (*IsReady)(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady);
+ENDinterface(nsdsel)
+#define nsdselCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
+
#endif /* #ifndef INCLUDED_NSD_H */
diff --git a/runtime/nsd_ptcp.c b/runtime/nsd_ptcp.c
index c9df8f8c..2b77787e 100644
--- a/runtime/nsd_ptcp.c
+++ b/runtime/nsd_ptcp.c
@@ -45,6 +45,8 @@
#include "obj.h"
#include "errmsg.h"
#include "net.h"
+#include "netstrms.h"
+#include "netstrm.h"
#include "nsd_ptcp.h"
MODULE_TYPE_LIB
@@ -54,32 +56,33 @@ DEFobjStaticHelpers
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
DEFobjCurrIf(net)
+DEFobjCurrIf(netstrms)
+DEFobjCurrIf(netstrm)
+/* a few deinit helpers */
+
+/* close socket if open (may always be called) */
+static void
+sockClose(int *pSock)
+{
+ if(*pSock >= 0) {
+ close(*pSock);
+ *pSock = -1;
+ }
+}
+
/* Standard-Constructor
*/
BEGINobjConstruct(nsd_ptcp) /* be sure to specify the object type also in END macro! */
pThis->sock = -1;
- pThis->iSessMax = 500; /* default max nbr of sessions -TODO:make configurable--rgerhards, 2008-04-17*/
ENDobjConstruct(nsd_ptcp)
/* destructor for the nsd_ptcp object */
BEGINobjDestruct(nsd_ptcp) /* be sure to specify the object type also in END and CODESTART macros! */
- int i;
CODESTARTobjDestruct(nsd_ptcp)
- if(pThis->sock != -1) {
- close(pThis->sock);
- pThis->sock = -1;
- }
-
- if(pThis->socks != NULL) {
- /* if we have some sockets at this stage, we need to close them */
- for(i = 1 ; i <= pThis->socks[0] ; ++i)
- close(pThis->socks[i]);
- free(pThis->socks);
- }
-
+ sockClose(&pThis->sock);
if(pThis->pRemHostIP != NULL)
free(pThis->pRemHostIP);
if(pThis->pRemHostName != NULL)
@@ -209,33 +212,34 @@ finalize_it:
-/* accept an incoming connection request, sock provides the socket on which we can
- * accept the new session.
- * rgerhards, 2008-03-17
+/* accept an incoming connection request
+ * rgerhards, 2008-04-22
*/
static rsRetVal
-AcceptConnReq(nsd_t **ppThis, int sock)
+AcceptConnReq(nsd_t *pNsd, nsd_t **ppNew)
{
int sockflags;
+ nsd_ptcp_t *pThis = (nsd_ptcp_t*) pNsd;
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
- nsd_ptcp_t *pThis = NULL;
+ nsd_ptcp_t *pNew = NULL;
int iNewSock = -1;
DEFiRet;
- assert(ppThis != NULL);
+ assert(ppNew != NULL);
+ ISOBJ_TYPE_assert(pThis, nsd_ptcp);
- iNewSock = accept(sock, (struct sockaddr*) &addr, &addrlen);
+ iNewSock = accept(pThis->sock, (struct sockaddr*) &addr, &addrlen);
if(iNewSock < 0) {
ABORT_FINALIZE(RS_RET_ACCEPT_ERR);
}
/* construct our object so that we can use it... */
- CHKiRet(nsd_ptcpConstruct(&pThis));
+ CHKiRet(nsd_ptcpConstruct(&pNew));
- CHKiRet(FillRemHost(pThis, (struct sockaddr*) &addr));
+ CHKiRet(FillRemHost(pNew, (struct sockaddr*) &addr));
- /* set the new socket to non-blocking IO */
+ /* set the new socket to non-blocking IO -TODO:do we really need to do this here? Do we always want it? */
if((sockflags = fcntl(iNewSock, F_GETFL)) != -1) {
sockflags |= O_NONBLOCK;
/* SETFL could fail too, so get it caught by the subsequent
@@ -248,41 +252,45 @@ AcceptConnReq(nsd_t **ppThis, int sock)
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
- pThis->sock = iNewSock;
-
- *ppThis = (nsd_t*) pThis;
+ pNew->sock = iNewSock;
+ *ppNew = (nsd_t*) pNew;
finalize_it:
if(iRet != RS_RET_OK) {
- if(pThis != NULL)
- nsd_ptcpDestruct(&pThis);
+ if(pNew != NULL)
+ nsd_ptcpDestruct(&pNew);
/* the close may be redundant, but that doesn't hurt... */
- if(iNewSock >= 0)
- close(iNewSock);
+ sockClose(&iNewSock);
}
RETiRet;
}
-/* initialize the tcp socket for a listner
- * pLstnPort must point to a port name or number. NULL is NOT permitted
- * (hint: we need to be careful when we use this module together with librelp,
- * there NULL indicates the default port
- * default is used.
- * gerhards, 2008-03-17
+/* initialize tcp sockets for a listner. The initialized sockets are passed to the
+ * app-level caller via a callback.
+ * pLstnPort must point to a port name or number. NULL is NOT permitted. pLstnIP
+ * points to the port to listen to (NULL means "all"), iMaxSess has the maximum
+ * number of sessions permitted.
+ * rgerhards, 2008-04-22
*/
static rsRetVal
-LstnInit(nsd_t *pNsd, uchar *pLstnPort)
+LstnInit(netstrms_t *pNS, void *pUsr, rsRetVal(*fAddLstn)(void*,netstrm_t*),
+ uchar *pLstnPort, uchar *pLstnIP, int iSessMax)
{
- nsd_ptcp_t *pThis = (nsd_ptcp_t*) pNsd;
- struct addrinfo hints, *res, *r;
- int error, maxs, *s, on = 1;
+ DEFiRet;
+ netstrm_t *pNewStrm = NULL;
+ nsd_t *pNewNsd = NULL;
+ int error, maxs, on = 1;
+ int sock;
+ int numSocks;
int sockflags;
+ struct addrinfo hints, *res = NULL, *r;
- DEFiRet;
- ISOBJ_TYPE_assert(pThis, nsd_ptcp);
+ ISOBJ_TYPE_assert(pNS, netstrms);
+ assert(fAddLstn != NULL);
assert(pLstnPort != NULL);
+ assert(iSessMax >= 0);
dbgprintf("creating tcp listen socket on port %s\n", pLstnPort);
@@ -291,7 +299,7 @@ LstnInit(nsd_t *pNsd, uchar *pLstnPort)
hints.ai_family = glbl.GetDefPFFamily();
hints.ai_socktype = SOCK_STREAM;
- error = getaddrinfo(NULL, (char*) pLstnPort, &hints, &res);
+ error = getaddrinfo((char*)pLstnIP, (char*) pLstnPort, &hints, &res);
if(error) {
dbgprintf("error %d querying port '%s'\n", error, pLstnPort);
ABORT_FINALIZE(RS_RET_INVALID_PORT);
@@ -300,20 +308,13 @@ LstnInit(nsd_t *pNsd, uchar *pLstnPort)
/* Count max number of sockets we may open */
for(maxs = 0, r = res; r != NULL ; r = r->ai_next, maxs++)
/* EMPTY */;
- pThis->socks = malloc((maxs+1) * sizeof(int));
- if (pThis->socks == NULL) {
- dbgprintf("couldn't allocate memory for TCP listen sockets, suspending RELP message reception.");
- freeaddrinfo(res);
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
- *pThis->socks = 0; /* num of sockets counter at start of array */
- s = pThis->socks + 1;
+ numSocks = 0; /* num of sockets counter at start of array */
for(r = res; r != NULL ; r = r->ai_next) {
- *s = socket(r->ai_family, r->ai_socktype, r->ai_protocol);
- if (*s < 0) {
+ sock = socket(r->ai_family, r->ai_socktype, r->ai_protocol);
+ if(sock < 0) {
if(!(r->ai_family == PF_INET6 && errno == EAFNOSUPPORT))
- dbgprintf("creating tcp listen socket");
+ dbgprintf("error %d creating tcp listen socket", errno);
/* it is debatable if PF_INET with EAFNOSUPPORT should
* also be ignored...
*/
@@ -321,35 +322,32 @@ LstnInit(nsd_t *pNsd, uchar *pLstnPort)
}
#ifdef IPV6_V6ONLY
- if (r->ai_family == AF_INET6) {
+ if(r->ai_family == AF_INET6) {
int iOn = 1;
- if (setsockopt(*s, IPPROTO_IPV6, IPV6_V6ONLY,
+ if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY,
(char *)&iOn, sizeof (iOn)) < 0) {
- close(*s);
- *s = -1;
- continue;
+ close(sock);
+ continue;
}
}
#endif
- if(setsockopt(*s, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)) < 0 ) {
+ if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)) < 0 ) {
dbgprintf("error %d setting tcp socket option\n", errno);
- close(*s);
- *s = -1;
+ close(sock);
continue;
}
/* We use non-blocking IO! */
- if((sockflags = fcntl(*s, F_GETFL)) != -1) {
+ if((sockflags = fcntl(sock, F_GETFL)) != -1) {
sockflags |= O_NONBLOCK;
/* SETFL could fail too, so get it caught by the subsequent
* error check.
*/
- sockflags = fcntl(*s, F_SETFL, sockflags);
+ sockflags = fcntl(sock, F_SETFL, sockflags);
}
if(sockflags == -1) {
dbgprintf("error %d setting fcntl(O_NONBLOCK) on tcp socket", errno);
- close(*s);
- *s = -1;
+ close(sock);
continue;
}
@@ -360,62 +358,75 @@ LstnInit(nsd_t *pNsd, uchar *pLstnPort)
*/
#ifndef BSD
if(net.should_use_so_bsdcompat()) {
- if (setsockopt(*s, SOL_SOCKET, SO_BSDCOMPAT,
+ if (setsockopt(sock, SOL_SOCKET, SO_BSDCOMPAT,
(char *) &on, sizeof(on)) < 0) {
errmsg.LogError(NO_ERRCODE, "TCP setsockopt(BSDCOMPAT)");
- close(*s);
- *s = -1;
+ close(sock);
continue;
}
}
#endif
- if( (bind(*s, r->ai_addr, r->ai_addrlen) < 0)
+ if( (bind(sock, r->ai_addr, r->ai_addrlen) < 0)
#ifndef IPV6_V6ONLY
&& (errno != EADDRINUSE)
#endif
) {
+ /* TODO: check if *we* bound the socket - else we *have* an error! */
dbgprintf("error %d while binding tcp socket", errno);
- close(*s);
- *s = -1;
+ close(sock);
continue;
}
- if(listen(*s,pThis->iSessMax / 10 + 5) < 0) {
+ if(listen(sock, iSessMax / 10 + 5) < 0) {
/* If the listen fails, it most probably fails because we ask
* for a too-large backlog. So in this case we first set back
* to a fixed, reasonable, limit that should work. Only if
* that fails, too, we give up.
*/
dbgprintf("listen with a backlog of %d failed - retrying with default of 32.",
- pThis->iSessMax / 10 + 5);
- if(listen(*s, 32) < 0) {
+ iSessMax / 10 + 5);
+ if(listen(sock, 32) < 0) {
dbgprintf("tcp listen error %d, suspending\n", errno);
- close(*s);
- *s = -1;
+ close(sock);
continue;
}
}
- (*pThis->socks)++;
- s++;
+ /* if we reach this point, we were able to obtain a valid socket, so we can
+ * construct a new netstrm obj and hand it over to the upper layers for inclusion
+ * into their socket array. -- rgerhards, 2008-04-23
+ */
+ CHKiRet(pNS->Drvr.Construct(&pNewNsd));
+ ((nsd_ptcp_t*)pNewNsd)->sock = sock;
+ CHKiRet(netstrms.CreateStrm(pNS, &pNewStrm));
+ pNewStrm->pDrvrData = (nsd_t*) pNewNsd;
+ CHKiRet(fAddLstn(pUsr, pNewStrm));
+ pNewNsd = NULL;
+ pNewStrm = NULL;
+ ++numSocks;
}
- if(res != NULL)
- freeaddrinfo(res);
-
- if(*pThis->socks != maxs)
- dbgprintf("We could initialize %d RELP TCP listen sockets out of %d we received "
- "- this may or may not be an error indication.\n", *pThis->socks, maxs);
+ if(numSocks != maxs)
+ dbgprintf("We could initialize %d TCP listen sockets out of %d we received "
+ "- this may or may not be an error indication.\n", numSocks, maxs);
- if(*pThis->socks == 0) {
- dbgprintf("No RELP TCP listen socket could successfully be initialized, "
- "message reception via RELP disabled.\n");
- free(pThis->socks);
+ if(numSocks == 0) {
+ dbgprintf("No TCP listen sockets could successfully be initialized");
ABORT_FINALIZE(RS_RET_COULD_NOT_BIND);
}
finalize_it:
+ if(res != NULL)
+ freeaddrinfo(res);
+
+ if(iRet != RS_RET_OK) {
+ if(pNewStrm != NULL)
+ netstrm.Destruct(&pNewStrm);
+ if(pNewNsd != NULL)
+ pNS->Drvr.Destruct(&pNewNsd);
+ }
+
RETiRet;
}
@@ -514,16 +525,52 @@ finalize_it:
freeaddrinfo(res);
if(iRet != RS_RET_OK) {
- if(pThis->sock != -1) {
- close(pThis->sock);
- pThis->sock = -1;
- }
+ sockClose(&pThis->sock);
}
RETiRet;
}
+/* get the remote hostname. The returned hostname must be freed by the
+ * caller.
+ * rgerhards, 2008-04-24
+ */
+static rsRetVal
+GetRemoteHName(nsd_t *pNsd, uchar **ppszHName)
+{
+ DEFiRet;
+ nsd_ptcp_t *pThis = (nsd_ptcp_t*) pNsd;
+ ISOBJ_TYPE_assert(pThis, nsd_ptcp);
+ assert(ppszHName != NULL);
+
+ // TODO: how can the RemHost be empty?
+ CHKmalloc(*ppszHName = (uchar*)strdup(pThis->pRemHostName == NULL ? "" : (char*) pThis->pRemHostName));
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* get the remote host's IP address. The returned string must be freed by the
+ * caller.
+ * rgerhards, 2008-04-24
+ */
+static rsRetVal
+GetRemoteIP(nsd_t *pNsd, uchar **ppszIP)
+{
+ DEFiRet;
+ nsd_ptcp_t *pThis = (nsd_ptcp_t*) pNsd;
+ ISOBJ_TYPE_assert(pThis, nsd_ptcp);
+ assert(ppszIP != NULL);
+
+ CHKmalloc(*ppszIP = (uchar*)strdup(pThis->pRemHostIP == NULL ? "" : (char*) pThis->pRemHostIP));
+
+finalize_it:
+ RETiRet;
+}
+
+
/* queryInterface function */
BEGINobjQueryInterface(nsd_ptcp)
CODESTARTobjQueryInterface(nsd_ptcp)
@@ -539,12 +586,13 @@ CODESTARTobjQueryInterface(nsd_ptcp)
pIf->Construct = (rsRetVal(*)(nsd_t**)) nsd_ptcpConstruct;
pIf->Destruct = (rsRetVal(*)(nsd_t**)) nsd_ptcpDestruct;
pIf->Abort = Abort;
- pIf->GetSock = GetSock;
- pIf->LstnInit = LstnInit;
- pIf->AcceptConnReq = AcceptConnReq;
pIf->Rcv = Rcv;
pIf->Send = Send;
+ pIf->LstnInit = LstnInit;
+ pIf->AcceptConnReq = AcceptConnReq;
pIf->Connect = Connect;
+ pIf->GetRemoteHName = GetRemoteHName;
+ pIf->GetRemoteIP = GetRemoteIP;
finalize_it:
ENDobjQueryInterface(nsd_ptcp)
@@ -557,6 +605,8 @@ CODESTARTObjClassExit(nsd_ptcp)
objRelease(net, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
+ objRelease(netstrm, LM_NETSTRM_FILENAME);
+ objRelease(netstrms, LM_NETSTRMS_FILENAME);
ENDObjClassExit(nsd_ptcp)
@@ -569,6 +619,8 @@ BEGINObjClassInit(nsd_ptcp, 1, OBJ_IS_LOADABLE_MODULE) /* class, version */
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(net, CORE_COMPONENT));
+ CHKiRet(objUse(netstrm, LM_NETSTRM_FILENAME));
+ CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME));
/* set our own handlers */
ENDObjClassInit(nsd_ptcp)
diff --git a/runtime/nsd_ptcp.h b/runtime/nsd_ptcp.h
index 36725799..efd3ed05 100644
--- a/runtime/nsd_ptcp.h
+++ b/runtime/nsd_ptcp.h
@@ -33,8 +33,6 @@ struct nsd_ptcp_s {
uchar *pRemHostIP; /**< IP address of remote peer (currently used in server mode, only) */
uchar *pRemHostName; /**< host name of remote peer (currently used in server mode, only) */
int sock; /**< the socket we use for regular, single-socket, operations */
- int *socks; /**< the socket(s) we use for listeners, element 0 has nbr of socks */
- int iSessMax; /**< maximum number of sessions permitted */
};
/* interface is defined in nsd.h, we just implement it! */
diff --git a/runtime/nsdsel_ptcp.c b/runtime/nsdsel_ptcp.c
new file mode 100644
index 00000000..b439063a
--- /dev/null
+++ b/runtime/nsdsel_ptcp.c
@@ -0,0 +1,222 @@
+/* nsdsel_ptcp.c
+ *
+ * An implementation of the nsd select() interface for plain tcp sockets.
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+#include "config.h"
+
+#include <stdlib.h>
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/select.h>
+
+#include "rsyslog.h"
+#include "module-template.h"
+#include "obj.h"
+#include "errmsg.h"
+#include "nsd_ptcp.h"
+#include "nsdsel_ptcp.h"
+
+MODULE_TYPE_LIB
+
+/* static data */
+DEFobjStaticHelpers
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(glbl)
+
+
+/* Standard-Constructor
+ */
+BEGINobjConstruct(nsdsel_ptcp) /* be sure to specify the object type also in END macro! */
+ pThis->maxfds = 0;
+ FD_ZERO(&pThis->readfds);
+ FD_ZERO(&pThis->writefds);
+ENDobjConstruct(nsdsel_ptcp)
+
+
+/* destructor for the nsdsel_ptcp object */
+BEGINobjDestruct(nsdsel_ptcp) /* be sure to specify the object type also in END and CODESTART macros! */
+CODESTARTobjDestruct(nsdsel_ptcp)
+ENDobjDestruct(nsdsel_ptcp)
+
+
+/* Add a socket to the select set */
+static rsRetVal
+Add(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp)
+{
+ DEFiRet;
+ nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel;
+ nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd;
+
+ ISOBJ_TYPE_assert(pSock, nsd_ptcp);
+ ISOBJ_TYPE_assert(pThis, nsdsel_ptcp);
+
+ switch(waitOp) {
+ case NSDSEL_RD:
+ FD_SET(pSock->sock, &pThis->readfds);
+ break;
+ case NSDSEL_WR:
+ FD_SET(pSock->sock, &pThis->writefds);
+ break;
+ case NSDSEL_RDWR:
+ FD_SET(pSock->sock, &pThis->readfds);
+ FD_SET(pSock->sock, &pThis->writefds);
+ break;
+ }
+
+ if(pSock->sock > pThis->maxfds)
+ pThis->maxfds = pSock->sock;
+
+ RETiRet;
+}
+
+
+/* perform the select() piNumReady returns how many descriptors are ready for IO
+ * TODO: add timeout!
+ */
+static rsRetVal
+Select(nsdsel_t *pNsdsel, int *piNumReady)
+{
+ DEFiRet;
+ int i;
+ nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel;
+
+ ISOBJ_TYPE_assert(pThis, nsdsel_ptcp);
+ assert(piNumReady != NULL);
+
+ if(Debug) { // TODO: debug setting!
+ // TODO: name in dbgprintf!
+ dbgprintf("--------<NSDSEL_PTCP> calling select, active fds (max %d): ", pThis->maxfds);
+ for(i = 0; i <= pThis->maxfds; ++i)
+ if(FD_ISSET(i, &pThis->readfds) || FD_ISSET(i, &pThis->writefds))
+ dbgprintf("%d ", i);
+ dbgprintf("\n");
+ }
+
+ /* now do the select */
+ *piNumReady = select(pThis->maxfds+1, &pThis->readfds, &pThis->writefds, NULL, NULL);
+
+ RETiRet;
+}
+
+
+/* check if a socket is ready for IO */
+static rsRetVal
+IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady)
+{
+ DEFiRet;
+ nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel;
+ nsd_ptcp_t *pSock = (nsd_ptcp_t*) pNsd;
+
+ ISOBJ_TYPE_assert(pThis, nsdsel_ptcp);
+ ISOBJ_TYPE_assert(pSock, nsd_ptcp);
+ assert(pbIsReady != NULL);
+
+ switch(waitOp) {
+ case NSDSEL_RD:
+ *pbIsReady = FD_ISSET(pSock->sock, &pThis->readfds);
+ break;
+ case NSDSEL_WR:
+ *pbIsReady = FD_ISSET(pSock->sock, &pThis->writefds);
+ break;
+ case NSDSEL_RDWR:
+ *pbIsReady = FD_ISSET(pSock->sock, &pThis->readfds)
+ | FD_ISSET(pSock->sock, &pThis->writefds);
+ break;
+ }
+
+ RETiRet;
+}
+
+
+/* ------------------------------ end support for the select() interface ------------------------------ */
+
+
+/* queryInterface function */
+BEGINobjQueryInterface(nsdsel_ptcp)
+CODESTARTobjQueryInterface(nsdsel_ptcp)
+ if(pIf->ifVersion != nsdCURR_IF_VERSION) {/* check for current version, increment on each change */
+ ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED);
+ }
+
+ /* ok, we have the right interface, so let's fill it
+ * Please note that we may also do some backwards-compatibility
+ * work here (if we can support an older interface version - that,
+ * of course, also affects the "if" above).
+ */
+ pIf->Construct = (rsRetVal(*)(nsdsel_t**)) nsdsel_ptcpConstruct;
+ pIf->Destruct = (rsRetVal(*)(nsdsel_t**)) nsdsel_ptcpDestruct;
+ pIf->Add = Add;
+ pIf->Select = Select;
+ pIf->IsReady = IsReady;
+finalize_it:
+ENDobjQueryInterface(nsdsel_ptcp)
+
+
+/* exit our class
+ */
+BEGINObjClassExit(nsdsel_ptcp, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MACRO! */
+CODESTARTObjClassExit(nsdsel_ptcp)
+ /* release objects we no longer need */
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(errmsg, CORE_COMPONENT);
+ENDObjClassExit(nsdsel_ptcp)
+
+
+/* Initialize the nsdsel_ptcp class. Must be called as the very first method
+ * before anything else is called inside this class.
+ * rgerhards, 2008-02-19
+ */
+BEGINObjClassInit(nsdsel_ptcp, 1, OBJ_IS_LOADABLE_MODULE) /* class, version */
+ /* request objects we use */
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+
+ /* set our own handlers */
+ENDObjClassInit(nsdsel_ptcp)
+
+
+/* --------------- here now comes the plumbing that makes as a library module --------------- */
+
+
+BEGINmodExit
+CODESTARTmodExit
+ nsdsel_ptcpClassExit();
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_LIB_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+
+ /* Initialize all classes that are in our module - this includes ourselfs */
+ CHKiRet(nsdsel_ptcpClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
+ENDmodInit
+/* vi:set ai:
+ */
diff --git a/runtime/nsdsel_ptcp.h b/runtime/nsdsel_ptcp.h
new file mode 100644
index 00000000..39294d7d
--- /dev/null
+++ b/runtime/nsdsel_ptcp.h
@@ -0,0 +1,46 @@
+/* An implementation of the nsd select interface for plain tcp sockets.
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+
+#ifndef INCLUDED_NSDSEL_PTCP_H
+#define INCLUDED_NSDSEL_PTCP_H
+
+#include "nsd.h"
+typedef nsdsel_if_t nsdsel_ptcp_if_t; /* we just *implement* this interface */
+
+/* the nsdsel_ptcp object */
+struct nsdsel_ptcp_s {
+ BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
+ int maxfds;
+ fd_set readfds;
+ fd_set writefds;
+};
+
+/* interface is defined in nsd.h, we just implement it! */
+
+/* prototypes */
+PROTOTYPEObj(nsdsel_ptcp);
+
+/* the name of our library binary */
+#define LM_NSDSEL_PTCP_FILENAME "lmnsdsel_ptcp"
+
+#endif /* #ifndef INCLUDED_NSDSEL_PTCP_H */
diff --git a/runtime/nssel.c b/runtime/nssel.c
new file mode 100644
index 00000000..7cb63e98
--- /dev/null
+++ b/runtime/nssel.c
@@ -0,0 +1,229 @@
+/* nssel.c
+ *
+ * The io waiter is a helper object enabling us to wait on a set of streams to become
+ * ready for IO - this is modelled after select(). We need this, because
+ * stream drivers may have different concepts. Consequently,
+ * the structure must contain nsd_t's from the same stream driver type
+ * only. This is implemented as a singly-linked list where every
+ * new element is added at the top of the list.
+ *
+ * Work on this module begun 2008-04-22 by Rainer Gerhards.
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+#include "config.h"
+
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+
+#include "rsyslog.h"
+#include "obj.h"
+#include "module-template.h"
+#include "netstrm.h"
+#include "nssel.h"
+
+MODULE_TYPE_LIB
+
+/* static data */
+DEFobjStaticHelpers
+DEFobjCurrIf(glbl)
+
+
+/* load our low-level driver. This must be done before any
+ * driver-specific functions (allmost all...) can be carried
+ * out. Note that the driver's .ifIsLoaded is correctly
+ * initialized by calloc() and we depend on that.
+ * rgerhards, 2008-04-18
+ */
+static rsRetVal
+loadDrvr(nssel_t *pThis)
+{
+ uchar *pDrvrName;
+ DEFiRet;
+
+ pDrvrName = pThis->pDrvrName;
+ if(pDrvrName == NULL) /* if no drvr name is set, use system default */
+ pDrvrName = glbl.GetDfltNetstrmDrvr();
+
+ pThis->Drvr.ifVersion = nsdCURR_IF_VERSION;
+ /* The pDrvrName+2 below is a hack to obtain the object name. It
+ * safes us to have yet another variable with the name without "lm" in
+ * front of it. If we change the module load interface, we may re-think
+ * about this hack, but for the time being it is efficient and clean
+ * enough. -- rgerhards, 2008-04-18
+ */
+ //CHKiRet(obj.UseObj(__FILE__, pDrvrName+2, pDrvrName, (void*) &pThis->Drvr));
+ CHKiRet(obj.UseObj(__FILE__, "nsdsel_ptcp", "lmnsdsel_ptcp", (void*) &pThis->Drvr));
+finalize_it:
+ RETiRet;
+}
+
+
+/* Standard-Constructor */
+BEGINobjConstruct(nssel) /* be sure to specify the object type also in END macro! */
+ENDobjConstruct(nssel)
+
+
+/* destructor for the nssel object */
+BEGINobjDestruct(nssel) /* be sure to specify the object type also in END and CODESTART macros! */
+CODESTARTobjDestruct(nssel)
+ if(pThis->pDrvrData != NULL)
+ pThis->Drvr.Destruct(&pThis->pDrvrData);
+ENDobjDestruct(nssel)
+
+
+/* ConstructionFinalizer */
+static rsRetVal
+ConstructFinalize(nssel_t *pThis)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, nssel);
+ CHKiRet(loadDrvr(pThis));
+ CHKiRet(pThis->Drvr.Construct(&pThis->pDrvrData));
+finalize_it:
+ RETiRet;
+}
+
+
+/* Add a stream object to the current select() set.
+ * Note that a single stream may have multiple "sockets" if
+ * it is a listener. If so, all of them are begin added.
+ */
+static rsRetVal
+Add(nssel_t *pThis, netstrm_t *pStrm, nsdsel_waitOp_t waitOp)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, nssel);
+ ISOBJ_TYPE_assert(pStrm, netstrm);
+
+ CHKiRet(pThis->Drvr.Add(pThis->pDrvrData, pStrm->pDrvrData, waitOp));
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* wait for IO to happen on one of our netstreams. iNumReady has
+ * the number of ready "sockets" after the call. This function blocks
+ * until some are ready. EAGAIN is retried.
+ */
+static rsRetVal
+Wait(nssel_t *pThis, int *piNumReady)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, nssel);
+ assert(piNumReady != NULL);
+ iRet = pThis->Drvr.Select(pThis->pDrvrData, piNumReady);
+ RETiRet;
+}
+
+
+/* Check if a stream is ready for IO. *piNumReady contains the remaining number
+ * of ready streams. Note that this function may say the stream is not ready
+ * but still decrement *piNumReady. This can happen when (e.g. with TLS) the low
+ * level driver requires some IO which is hidden from the upper layer point of view.
+ * rgerhards, 2008-04-23
+ */
+static rsRetVal
+IsReady(nssel_t *pThis, netstrm_t *pStrm, nsdsel_waitOp_t waitOp, int *pbIsReady, int *piNumReady)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, nssel);
+ ISOBJ_TYPE_assert(pStrm, netstrm);
+ assert(pbIsReady != NULL);
+ assert(piNumReady != NULL);
+ iRet = pThis->Drvr.IsReady(pThis->pDrvrData, pStrm->pDrvrData, waitOp, pbIsReady);
+ RETiRet;
+}
+
+
+/* queryInterface function */
+BEGINobjQueryInterface(nssel)
+CODESTARTobjQueryInterface(nssel)
+ if(pIf->ifVersion != nsselCURR_IF_VERSION) {/* check for current version, increment on each change */
+ ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED);
+ }
+
+ /* ok, we have the right interface, so let's fill it
+ * Please note that we may also do some backwards-compatibility
+ * work here (if we can support an older interface version - that,
+ * of course, also affects the "if" above).
+ */
+ pIf->Construct = nsselConstruct;
+ pIf->ConstructFinalize = ConstructFinalize;
+ pIf->Destruct = nsselDestruct;
+ pIf->Add = Add;
+ pIf->Wait = Wait;
+ pIf->IsReady = IsReady;
+finalize_it:
+ENDobjQueryInterface(nssel)
+
+
+/* exit our class
+ */
+BEGINObjClassExit(nssel, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MACRO! */
+CODESTARTObjClassExit(nssel)
+ /* release objects we no longer need */
+ objRelease(glbl, CORE_COMPONENT);
+ENDObjClassExit(nssel)
+
+
+/* Initialize the nssel class. Must be called as the very first method
+ * before anything else is called inside this class.
+ * rgerhards, 2008-02-19
+ */
+BEGINObjClassInit(nssel, 1, OBJ_IS_CORE_MODULE) /* class, version */
+ /* request objects we use */
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+
+ /* set our own handlers */
+ENDObjClassInit(nssel)
+
+
+/* --------------- here now comes the plumbing that makes us a library module --------------- */
+
+
+BEGINmodExit
+CODESTARTmodExit
+ nsselClassExit();
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_LIB_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+
+ /* Initialize all classes that are in our module - this includes ourselfs */
+ CHKiRet(nsselClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
+ENDmodInit
+/* vi:set ai:
+ */
diff --git a/runtime/nssel.h b/runtime/nssel.h
new file mode 100644
index 00000000..2f907caa
--- /dev/null
+++ b/runtime/nssel.h
@@ -0,0 +1,55 @@
+/* Definitions for the nssel IO waiter.
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+
+#ifndef INCLUDED_NSSEL_H
+#define INCLUDED_NSSEL_H
+
+#include "nsd.h"
+
+/* the nssel object */
+struct nssel_s {
+ BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
+ nsd_t *pDrvrData; /**< the driver's data elements */
+ uchar *pDrvrName; /**< nsd driver name to use, or NULL if system default */
+ nsdsel_if_t Drvr; /**< our stream driver */
+};
+
+
+/* interface */
+BEGINinterface(nssel) /* name must also be changed in ENDinterface macro! */
+ rsRetVal (*Construct)(nssel_t **ppThis);
+ rsRetVal (*ConstructFinalize)(nssel_t *pThis);
+ rsRetVal (*Destruct)(nssel_t **ppThis);
+ rsRetVal (*Add)(nssel_t *pThis, netstrm_t *pStrm, nsdsel_waitOp_t waitOp);
+ rsRetVal (*Wait)(nssel_t *pThis, int *pNumReady);
+ rsRetVal (*IsReady)(nssel_t *pThis, netstrm_t *pStrm, nsdsel_waitOp_t waitOp, int *pbIsReady, int *piNumReady);
+ENDinterface(nssel)
+#define nsselCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
+
+/* prototypes */
+PROTOTYPEObj(nssel);
+
+/* the name of our library binary */
+#define LM_NSSEL_FILENAME "lmnssel"
+
+#endif /* #ifndef INCLUDED_NSSEL_H */
diff --git a/runtime/obj-types.h b/runtime/obj-types.h
index acdc757d..2d0e0f14 100644
--- a/runtime/obj-types.h
+++ b/runtime/obj-types.h
@@ -106,8 +106,12 @@ struct obj_s { /* the dummy struct that each derived class can be casted to */
do { \
ASSERT(pObj != NULL); \
ASSERT((unsigned) ((obj_t*) (pObj))->iObjCooCKiE == (unsigned) 0xBADEFEE); \
- ASSERT(!strcmp((char*)(((obj_t*)pObj)->pObjInfo->pszID), #objType)); \
- } while(0);
+ if(strcmp((char*)(((obj_t*)pObj)->pObjInfo->pszID), #objType)) { \
+ dbgprintf("ISOBJ assert failure: invalid object type, expected '%s' " \
+ "actual '%s'\n", #objType, (((obj_t*)pObj)->pObjInfo->pszID)); \
+ assert(0); /* trigger assertion, messge we already have */ \
+ } \
+ } while(0)
#else /* non-debug mode, no checks but much faster */
# define BEGINobjInstance obj_t objData
# define ISOBJ_TYPE_assert(pObj, objType)
diff --git a/runtime/obj.c b/runtime/obj.c
index 8ab606f9..18a4a726 100644
--- a/runtime/obj.c
+++ b/runtime/obj.c
@@ -1198,7 +1198,7 @@ ReleaseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf)
FINALIZE; /* if it is not a lodable module, we do not need to do anything... */
if(pIf->ifIsLoaded == 0) {
- ABORT_FINALIZE(RS_RET_OK); /* we are already set */ /* TODO: flag an error? */
+ ABORT_FINALIZE(RS_RET_OK); /* we are not loaded - this is perfectly OK... */
}
if(pIf->ifIsLoaded == 2) {
pIf->ifIsLoaded = 0; /* clean up */
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 8da59089..f59c38bf 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -64,17 +64,26 @@ typedef struct thrdInfo thrdInfo_t;
typedef struct obj_s obj_t;
typedef struct filed selector_t;/* TODO: this so far resides in syslogd.c, think about modularization */
typedef struct NetAddr netAddr_t;
+typedef struct netstrms_s netstrms_t;
typedef struct netstrm_s netstrm_t;
+typedef struct nssel_s nssel_t;
+typedef enum nsdsel_waitOp_e nsdsel_waitOp_t;
typedef struct nsd_ptcp_s nsd_ptcp_t;
+<<<<<<< HEAD:runtime/rsyslog.h
typedef struct nsd_gtls_s nsd_gtls_t;
typedef struct nsd_gsspi_s nsd_gsspi_t;
typedef struct nsd_nss_s nsd_nss_t;
+=======
+typedef struct nsdsel_ptcp_s nsdsel_ptcp_t;
+>>>>>>> a7040a9623e228043209da897dbf30b9ab02d771:runtime/rsyslog.h
typedef obj_t nsd_t;
+typedef obj_t nsdsel_t;
typedef struct msg msg_t;
typedef struct interface_s interface_t;
typedef struct objInfo_s objInfo_t;
typedef enum rsRetVal_ rsRetVal; /**< friendly type for global return value */
typedef rsRetVal (*errLogFunc_t)(uchar*); /* this is a trick to store a function ptr to a function returning a function ptr... */
+typedef struct tcpsrv_s tcpsrv_t;
/* some universal 64 bit define... */
typedef long long int64;
@@ -210,6 +219,8 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_INVALID_PORT = -2076, /**< invalid port value */
RS_RET_COULD_NOT_BIND = -2077, /**< could not bind socket, defunct */
RS_RET_GNUTLS_ERR = -2078, /**< (unexpected) error in GnuTLS call */
+ RS_RET_MAX_SESS_REACHED = -2079, /**< max nbr of sessions reached, can not create more */
+ RS_RET_MAX_LSTN_REACHED = -2080, /**< max nbr of listeners reached, can not create more */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
diff --git a/tcps_sess.c b/tcps_sess.c
index b5c9c31f..33c13aa0 100644
--- a/tcps_sess.c
+++ b/tcps_sess.c
@@ -27,22 +27,12 @@
*
* A copy of the GPL can be found in the file "COPYING" in this distribution.
*/
-
#include "config.h"
#include <stdlib.h>
#include <assert.h>
-#include <string.h>
#include <errno.h>
-#include <unistd.h>
-#include <stdarg.h>
#include <ctype.h>
-#include <netinet/in.h>
-#include <netdb.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#if HAVE_FCNTL_H
-#include <fcntl.h>
-#endif
+
#include "rsyslog.h"
#include "dirty.h"
#include "module-template.h"
@@ -51,11 +41,13 @@
#include "tcps_sess.h"
#include "obj.h"
#include "errmsg.h"
+#include "netstrm.h"
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(errmsg)
+DEFobjCurrIf(netstrm)
/* forward definitions */
static rsRetVal Close(tcps_sess_t *pThis);
@@ -64,7 +56,6 @@ static rsRetVal Close(tcps_sess_t *pThis);
/* Standard-Constructor
*/
BEGINobjConstruct(tcps_sess) /* be sure to specify the object type also in END macro! */
- pThis->sock = -1; /* no sock */
pThis->iMsg = 0; /* just make sure... */
pThis->bAtStrtOfFram = 1; /* indicate frame header expected */
pThis->eFraming = TCP_FRAMING_OCTET_STUFFING; /* just make sure... */
@@ -90,8 +81,8 @@ finalize_it:
/* destructor for the tcps_sess object */
BEGINobjDestruct(tcps_sess) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(tcps_sess)
- if(pThis->sock != -1)
- Close(pThis);
+ if(pThis->pStrm != NULL)
+ netstrm.Destruct(&pThis->pStrm);
if(pThis->pSrv->pOnSessDestruct != NULL) {
pThis->pSrv->pOnSessDestruct(&pThis->pUsr);
@@ -99,7 +90,6 @@ CODESTARTobjDestruct(tcps_sess)
/* now destruct our own properties */
if(pThis->fromHost != NULL)
free(pThis->fromHost);
- close(pThis->sock);
ENDobjDestruct(tcps_sess)
@@ -110,6 +100,10 @@ ENDobjDebugPrint(tcps_sess)
/* set property functions */
+/* set the hostname. Note that the caller *hands over* the string. That is,
+ * the caller no longer controls it once SetHost() has received it. Most importantly,
+ * the caller must not free it. -- gerhards, 2008-04-24
+ */
static rsRetVal
SetHost(tcps_sess_t *pThis, uchar *pszHost)
{
@@ -119,25 +113,24 @@ SetHost(tcps_sess_t *pThis, uchar *pszHost)
if(pThis->fromHost != NULL) {
free(pThis->fromHost);
- pThis->fromHost = NULL;
}
- if((pThis->fromHost = strdup((char*)pszHost)) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ pThis->fromHost = pszHost;
finalize_it:
RETiRet;
}
static rsRetVal
-SetSock(tcps_sess_t *pThis, int sock)
+SetStrm(tcps_sess_t *pThis, netstrm_t *pStrm)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, tcps_sess);
- pThis->sock = sock;
+ pThis->pStrm = pStrm;
RETiRet;
}
+
static rsRetVal
SetMsgIdx(tcps_sess_t *pThis, int idx)
{
@@ -200,9 +193,8 @@ PrepareClose(tcps_sess_t *pThis)
/* In this case, we have an invalid frame count and thus
* generate an error message and discard the frame.
*/
- errmsg.LogError(NO_ERRCODE, "Incomplete frame at end of stream in session %d - "
- "ignoring extra data (a message may be lost).\n",
- pThis->sock);
+ errmsg.LogError(NO_ERRCODE, "Incomplete frame at end of stream in session %p - "
+ "ignoring extra data (a message may be lost).\n", pThis->pStrm);
/* nothing more to do */
} else { /* here, we have traditional framing. Missing LF at the end
* of message may occur. As such, we process the message in
@@ -228,8 +220,7 @@ Close(tcps_sess_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, tcps_sess);
- close(pThis->sock);
- pThis->sock = -1;
+ netstrm.Destruct(&pThis->pStrm);
free(pThis->fromHost);
pThis->fromHost = NULL; /* not really needed, but... */
@@ -403,7 +394,7 @@ CODESTARTobjQueryInterface(tcps_sess)
pIf->SetUsrP = SetUsrP;
pIf->SetTcpsrv = SetTcpsrv;
pIf->SetHost = SetHost;
- pIf->SetSock = SetSock;
+ pIf->SetStrm = SetStrm;
pIf->SetMsgIdx = SetMsgIdx;
finalize_it:
ENDobjQueryInterface(tcps_sess)
@@ -416,6 +407,7 @@ BEGINObjClassExit(tcps_sess, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END
CODESTARTObjClassExit(tcps_sess)
/* release objects we no longer need */
objRelease(errmsg, CORE_COMPONENT);
+ objRelease(netstrm, LM_NETSTRM_FILENAME);
ENDObjClassExit(tcps_sess)
@@ -426,6 +418,7 @@ ENDObjClassExit(tcps_sess)
BEGINObjClassInit(tcps_sess, 1, OBJ_IS_CORE_MODULE) /* class, version - CHANGE class also in END MACRO! */
/* request objects we use */
CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(netstrm, LM_NETSTRM_FILENAME));
/* set our own handlers */
OBJSetMethodHandler(objMethod_DEBUGPRINT, tcps_sessDebugPrint);
diff --git a/tcps_sess.h b/tcps_sess.h
index 1d45c482..9f3d10d6 100644
--- a/tcps_sess.h
+++ b/tcps_sess.h
@@ -32,8 +32,8 @@ struct tcpsrv_s;
typedef struct tcps_sess_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
struct tcpsrv_s *pSrv; /* pointer back to my server (e.g. for callbacks) */
- int sock;
- int iMsg; /* index of next char to store in msg */
+ netstrm_t *pStrm;
+ int iMsg; /* index of next char to store in msg */
int bAtStrtOfFram; /* are we at the very beginning of a new frame? */
enum {
eAtStrtFram,
@@ -43,7 +43,7 @@ typedef struct tcps_sess_s {
int iOctetsRemain; /* Number of Octets remaining in message */
TCPFRAMINGMODE eFraming;
char msg[MAXLINE+1];
- char *fromHost;
+ uchar *fromHost;
void *pUsr; /* a user-pointer */
} tcps_sess_t;
@@ -61,7 +61,7 @@ BEGINinterface(tcps_sess) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetTcpsrv)(tcps_sess_t *pThis, struct tcpsrv_s *pSrv);
rsRetVal (*SetUsrP)(tcps_sess_t*, void*);
rsRetVal (*SetHost)(tcps_sess_t *pThis, uchar*);
- rsRetVal (*SetSock)(tcps_sess_t *pThis, int);
+ rsRetVal (*SetStrm)(tcps_sess_t *pThis, netstrm_t*);
rsRetVal (*SetMsgIdx)(tcps_sess_t *pThis, int);
ENDinterface(tcps_sess)
#define tcps_sessCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
diff --git a/tcpsrv.c b/tcpsrv.c
index 96048e31..086d17b8 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -63,12 +63,16 @@
#include "tcpsrv.h"
#include "obj.h"
#include "glbl.h"
+#include "netstrms.h"
+#include "netstrm.h"
+#include "nssel.h"
#include "errmsg.h"
MODULE_TYPE_LIB
/* defines */
#define TCPSESS_MAX_DEFAULT 200 /* default for nbr of tcp sessions if no number is given */
+#define TCPLSTN_MAX_DEFAULT 20 /* default for nbr of listeners */
/* static data */
DEFobjStaticHelpers
@@ -77,30 +81,9 @@ DEFobjCurrIf(glbl)
DEFobjCurrIf(tcps_sess)
DEFobjCurrIf(errmsg)
DEFobjCurrIf(net)
-
-
-
-/* code to free all sockets within a socket table.
- * A socket table is a descriptor table where the zero
- * element has the count of elements. This is used for
- * listening sockets. The socket table itself is also
- * freed.
- * A POINTER to this structure must be provided, thus
- * double indirection!
- * rgerhards, 2007-06-28
- */
-static void freeAllSockets(int **socks)
-{
- assert(socks != NULL);
- assert(*socks != NULL);
- while(**socks) {
- dbgprintf("Closing socket %d.\n", (*socks)[**socks]);
- close((*socks)[**socks]);
- (**socks)--;
- }
- free(*socks);
- *socks = NULL;
-}
+DEFobjCurrIf(netstrms)
+DEFobjCurrIf(netstrm)
+DEFobjCurrIf(nssel)
/* configure TCP listener settings. This is called during command
@@ -199,9 +182,10 @@ TCPSessGetNxtSess(tcpsrv_t *pThis, int iCurr)
register int i;
ISOBJ_TYPE_assert(pThis, tcpsrv);
- for(i = iCurr + 1 ; i < pThis->iSessMax ; ++i)
+ for(i = iCurr + 1 ; i < pThis->iSessMax ; ++i) {
if(pThis->pSessions[i] != NULL)
break;
+ }
return((i < pThis->iSessMax) ? i : -1);
}
@@ -215,17 +199,17 @@ TCPSessGetNxtSess(tcpsrv_t *pThis, int iCurr)
*/
static void deinit_tcp_listener(tcpsrv_t *pThis)
{
- int iTCPSess;
+ int i;
ISOBJ_TYPE_assert(pThis, tcpsrv);
assert(pThis->pSessions != NULL);
/* close all TCP connections! */
- iTCPSess = TCPSessGetNxtSess(pThis, -1);
- while(iTCPSess != -1) {
- tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
+ i = TCPSessGetNxtSess(pThis, -1);
+ while(i != -1) {
+ tcps_sess.Destruct(&pThis->pSessions[i]);
/* now get next... */
- iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
+ i = TCPSessGetNxtSess(pThis, i);
}
/* we are done with the session table - so get rid of it...
@@ -236,33 +220,50 @@ static void deinit_tcp_listener(tcpsrv_t *pThis)
if(pThis->TCPLstnPort != NULL)
free(pThis->TCPLstnPort);
- /* finally close the listen sockets themselfs */
- freeAllSockets(&pThis->pSocksLstn);
+ /* finally close our listen streams */
+ for(i = 0 ; i < pThis->iLstnMax ; ++i) {
+ netstrm.Destruct(pThis->ppLstn + i);
+ }
}
-/* Initialize TCP sockets (for listener)
- * This function returns either NULL (which means it failed) or
- * a pointer to an array of file descriptiors. If the pointer is
- * returned, the zeroest element [0] contains the count of valid
- * descriptors. The descriptors themself follow in range
- * [1] ... [num-descriptors]. It is guaranteed that each of these
- * descriptors is valid, at least when this function returns.
- * Please note that technically the array may be larger than the number
- * of valid pointers stored in it. The memory overhead is minimal, so
- * we do not bother to re-allocate an array of the exact size. Logically,
- * the array still contains the exactly correct number of descriptors.
+/* add a listen socket to our listen socket array. This is a callback
+ * invoked from the netstrm class. -- rgerhards, 2008-04-23
*/
-static int *create_tcp_socket(tcpsrv_t *pThis)
+static rsRetVal
+addTcpLstn(void *pUsr, netstrm_t *pLstn)
{
- struct addrinfo hints, *res, *r;
- int error, maxs, *s, *socks, on = 1;
- char *TCPLstnPort;
+ tcpsrv_t *pThis = (tcpsrv_t*) pUsr;
+ DEFiRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
+ ISOBJ_TYPE_assert(pLstn, netstrm);
+
+ if(pThis->iLstnMax >= TCPLSTN_MAX_DEFAULT)
+ ABORT_FINALIZE(RS_RET_MAX_LSTN_REACHED);
- if(!strcmp(pThis->TCPLstnPort, "0"))
- TCPLstnPort = "514";
+RUNLOG_VAR("%d", pThis->iLstnMax);
+ pThis->ppLstn[pThis->iLstnMax] = pLstn;
+ ++pThis->iLstnMax;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* Initialize TCP sockets (for listener) and listens on them */
+static rsRetVal
+create_tcp_socket(tcpsrv_t *pThis)
+{
+ DEFiRet;
+ uchar *TCPLstnPort;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+
+ if(!strcmp((char*)pThis->TCPLstnPort, "0"))
+ TCPLstnPort = (uchar*)"514";
+ // TODO: we need to enable the caller to set a port (based on who is
+ // using this, 514 may be totally unsuitable... --- rgerhards, 2008-04-22
/* use default - we can not do service db update, because there is
* no IANA-assignment for syslog/tcp. In the long term, we might
* re-use RFC 3195 port of 601, but that would probably break to
@@ -270,121 +271,10 @@ static int *create_tcp_socket(tcpsrv_t *pThis)
* rgerhards, 2007-06-28
*/
else
- TCPLstnPort = pThis->TCPLstnPort;
- dbgprintf("creating tcp socket on port %s\n", TCPLstnPort);
- memset(&hints, 0, sizeof(hints));
- hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
- hints.ai_family = glbl.GetDefPFFamily();
- hints.ai_socktype = SOCK_STREAM;
-
- error = getaddrinfo(NULL, TCPLstnPort, &hints, &res);
- if(error) {
- errmsg.LogError(NO_ERRCODE, "%s", gai_strerror(error));
- return NULL;
- }
+ TCPLstnPort = (uchar*)pThis->TCPLstnPort;
- /* Count max number of sockets we may open */
- for (maxs = 0, r = res; r != NULL ; r = r->ai_next, maxs++)
- /* EMPTY */;
- socks = malloc((maxs+1) * sizeof(int));
- if (socks == NULL) {
- errmsg.LogError(NO_ERRCODE, "couldn't allocate memory for TCP listen sockets, suspending TCP message reception.");
- freeaddrinfo(res);
- return NULL;
- }
-
- *socks = 0; /* num of sockets counter at start of array */
- s = socks + 1;
- for (r = res; r != NULL ; r = r->ai_next) {
- *s = socket(r->ai_family, r->ai_socktype, r->ai_protocol);
- if (*s < 0) {
- if(!(r->ai_family == PF_INET6 && errno == EAFNOSUPPORT))
- errmsg.LogError(NO_ERRCODE, "create_tcp_socket(), socket");
- /* it is debatable if PF_INET with EAFNOSUPPORT should
- * also be ignored...
- */
- continue;
- }
-
-#ifdef IPV6_V6ONLY
- if (r->ai_family == AF_INET6) {
- int iOn = 1;
- if (setsockopt(*s, IPPROTO_IPV6, IPV6_V6ONLY,
- (char *)&iOn, sizeof (iOn)) < 0) {
- errmsg.LogError(NO_ERRCODE, "TCP setsockopt");
- close(*s);
- *s = -1;
- continue;
- }
- }
-#endif
- if (setsockopt(*s, SOL_SOCKET, SO_REUSEADDR,
- (char *) &on, sizeof(on)) < 0 ) {
- errmsg.LogError(NO_ERRCODE, "TCP setsockopt(REUSEADDR)");
- close(*s);
- *s = -1;
- continue;
- }
-
- /* We need to enable BSD compatibility. Otherwise an attacker
- * could flood our log files by sending us tons of ICMP errors.
- */
-#ifndef OS_BSD
- if(net.should_use_so_bsdcompat()) {
- if (setsockopt(*s, SOL_SOCKET, SO_BSDCOMPAT,
- (char *) &on, sizeof(on)) < 0) {
- errmsg.LogError(NO_ERRCODE, "TCP setsockopt(BSDCOMPAT)");
- close(*s);
- *s = -1;
- continue;
- }
- }
-#endif
-
- if( (bind(*s, r->ai_addr, r->ai_addrlen) < 0)
-#ifndef IPV6_V6ONLY
- && (errno != EADDRINUSE)
-#endif
- ) {
- errmsg.LogError(NO_ERRCODE, "TCP bind");
- close(*s);
- *s = -1;
- continue;
- }
-
- if( listen(*s,pThis->iSessMax / 10 + 5) < 0) {
- /* If the listen fails, it most probably fails because we ask
- * for a too-large backlog. So in this case we first set back
- * to a fixed, reasonable, limit that should work. Only if
- * that fails, too, we give up.
- */
- errmsg.LogError(NO_ERRCODE, "listen with a backlog of %d failed - retrying with default of 32.",
- pThis->iSessMax / 10 + 5);
- if(listen(*s, 32) < 0) {
- errmsg.LogError(NO_ERRCODE, "TCP listen, suspending tcp inet");
- close(*s);
- *s = -1;
- continue;
- }
- }
-
- (*socks)++;
- s++;
- }
-
- if(res != NULL)
- freeaddrinfo(res);
-
- if(Debug && *socks != maxs)
- dbgprintf("We could initialize %d TCP listen sockets out of %d we received "
- "- this may or may not be an error indication.\n", *socks, maxs);
-
- if(*socks == 0) {
- errmsg.LogError(NO_ERRCODE, "No TCP listen socket could successfully be initialized, "
- "message reception via TCP disabled.\n");
- free(socks);
- return(NULL);
- }
+ /* TODO: add capability to specify local listen address! */
+ CHKiRet(netstrm.LstnInit(pThis->pNS, (void*)pThis, addTcpLstn, TCPLstnPort, NULL, pThis->iSessMax));
/* OK, we had success. Now it is also time to
* initialize our connections
@@ -395,11 +285,11 @@ static int *create_tcp_socket(tcpsrv_t *pThis)
* we have assigned so far, because we can not really use it...
*/
errmsg.LogError(NO_ERRCODE, "Could not initialize TCP session table, suspending TCP message reception.");
- freeAllSockets(&socks); /* prevent a socket leak */
- return(NULL);
+ ABORT_FINALIZE(RS_RET_ERR);
}
- return(socks);
+finalize_it:
+ RETiRet;
}
@@ -414,34 +304,25 @@ static int *create_tcp_socket(tcpsrv_t *pThis)
* rgerhards, 2008-03-02
*/
static rsRetVal
-SessAccept(tcpsrv_t *pThis, tcps_sess_t **ppSess, int fd)
+SessAccept(tcpsrv_t *pThis, tcps_sess_t **ppSess, netstrm_t *pStrm)
{
DEFiRet;
tcps_sess_t *pSess;
- int newConn;
+ netstrm_t *pNewStrm = NULL;
int iSess = -1;
struct sockaddr_storage addr;
- socklen_t addrlen = sizeof(struct sockaddr_storage);
- uchar fromHost[NI_MAXHOST];
- uchar fromHostFQDN[NI_MAXHOST];
+ uchar *fromHostFQDN = NULL;
ISOBJ_TYPE_assert(pThis, tcpsrv);
- newConn = accept(fd, (struct sockaddr*) &addr, &addrlen);
- if (newConn < 0) {
- errmsg.LogError(NO_ERRCODE, "tcp accept, ignoring error and connection request");
- ABORT_FINALIZE(RS_RET_ERR); // TODO: better error code
- //was: return -1;
- }
+ CHKiRet(netstrm.AcceptConnReq(pStrm, &pNewStrm));
/* Add to session list */
iSess = TCPSessTblFindFreeSpot(pThis);
if(iSess == -1) {
errno = 0;
errmsg.LogError(NO_ERRCODE, "too many tcp sessions - dropping incoming request");
- close(newConn);
- ABORT_FINALIZE(RS_RET_ERR); // TODO: better error code
- //was: return -1;
+ ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED);
} else {
/* we found a free spot and can construct our session object */
CHKiRet(tcps_sess.Construct(&pSess));
@@ -450,14 +331,8 @@ SessAccept(tcpsrv_t *pThis, tcps_sess_t **ppSess, int fd)
/* OK, we have a "good" index... */
/* get the host name */
- if(net.cvthname(&addr, fromHost, fromHostFQDN) != RS_RET_OK) {
- /* we seem to have something malicous - at least we
- * are now told to discard the connection request.
- * Error message has been generated by cvthname.
- */
- close (newConn);
- ABORT_FINALIZE(RS_RET_ERR); // TODO: better error code
- }
+ CHKiRet(netstrm.GetRemoteHName(pStrm, &fromHostFQDN));
+ /* TODO: check if we need to strip the domain name here -- rgerhards, 2008-04-24 */
/* Here we check if a host is permitted to send us
* syslog messages. If it isn't, we do not further
@@ -466,21 +341,20 @@ SessAccept(tcpsrv_t *pThis, tcps_sess_t **ppSess, int fd)
* rgerhards, 2005-09-26
*/
if(!pThis->pIsPermittedHost((struct sockaddr*) &addr, (char*) fromHostFQDN, pThis->pUsr, pSess->pUsr)) {
- dbgprintf("%s is not an allowed sender\n", (char *) fromHostFQDN);
+ dbgprintf("%s is not an allowed sender\n", fromHostFQDN);
if(glbl.GetOption_DisallowWarning()) {
errno = 0;
- errmsg.LogError(NO_ERRCODE, "TCP message from disallowed sender %s discarded",
- (char*)fromHost);
+ errmsg.LogError(NO_ERRCODE, "TCP message from disallowed sender %s discarded", fromHostFQDN);
}
- close(newConn);
ABORT_FINALIZE(RS_RET_HOST_NOT_PERMITTED);
}
/* OK, we have an allowed sender, so let's continue, what
* means we can finally fill in the session object.
*/
- CHKiRet(tcps_sess.SetHost(pSess, fromHost));
- CHKiRet(tcps_sess.SetSock(pSess, newConn));
+ CHKiRet(tcps_sess.SetHost(pSess, fromHostFQDN));
+ CHKiRet(tcps_sess.SetStrm(pSess, pNewStrm));
+ pNewStrm = NULL; /* prevent it from being freed in error handler, now done in tcps_sess! */
CHKiRet(tcps_sess.SetMsgIdx(pSess, 0));
CHKiRet(tcps_sess.ConstructFinalize(pSess));
@@ -499,79 +373,69 @@ finalize_it:
tcps_sess.Destruct(&pThis->pSessions[iSess]);
}
iSess = -1; // TODO: change this to be fully iRet compliant ;)
+ if(pNewStrm != NULL)
+ netstrm.Destruct(&pNewStrm);
}
RETiRet;
}
-/* This function is called to gather input.
- */
+static void
+RunCancelCleanup(void *arg)
+{
+ nssel_t **ppSel = (nssel_t**) arg;
+
+ if(*ppSel != NULL)
+ nssel.Destruct(ppSel);
+}
+/* This function is called to gather input. */
static rsRetVal
Run(tcpsrv_t *pThis)
{
DEFiRet;
- int maxfds;
int nfds;
int i;
int iTCPSess;
- fd_set readfds;
+ int bIsReady;
tcps_sess_t *pNewSess;
+ nssel_t *pSel;
+ int state;
ISOBJ_TYPE_assert(pThis, tcpsrv);
- /* this is an endless loop - it is terminated when the thread is
- * signalled to do so. This, however, is handled by the framework,
- * right into the sleep below.
+ /* this is an endless loop - it is terminated by the framework canelling
+ * this thread. Thus, we also need to instantiate a cancel cleanup handler
+ * to prevent us from leaking anything. -- rgerharsd, 20080-04-24
*/
+ pthread_cleanup_push(RunCancelCleanup, (void*) &pSel);
while(1) {
- maxfds = 0;
- FD_ZERO (&readfds);
-
- /* Add the TCP listen sockets to the list of read descriptors.
- */
- if(pThis->pSocksLstn != NULL && *pThis->pSocksLstn) {
- for (i = 0; i < *pThis->pSocksLstn; i++) {
- /* The if() below is theoretically not needed, but I leave it in
- * so that a socket may become unsuable during execution. That
- * feature is not yet supported by the current code base.
- */
- if (pThis->pSocksLstn[i+1] != -1) {
- if(Debug)
- net.debugListenInfo(pThis->pSocksLstn[i+1], "TCP");
- FD_SET(pThis->pSocksLstn[i+1], &readfds);
- if(pThis->pSocksLstn[i+1]>maxfds) maxfds=pThis->pSocksLstn[i+1];
- }
- }
- /* do the sessions */
- iTCPSess = TCPSessGetNxtSess(pThis, -1);
- while(iTCPSess != -1) {
- int fdSess;
- fdSess = pThis->pSessions[iTCPSess]->sock; // TODO: NOT CLEAN!, use method
- dbgprintf("Adding TCP Session %d\n", fdSess);
- FD_SET(fdSess, &readfds);
- if (fdSess>maxfds) maxfds=fdSess;
- /* now get next... */
- iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
- }
+ CHKiRet(nssel.Construct(&pSel));
+ // TODO: set driver
+ CHKiRet(nssel.ConstructFinalize(pSel));
+
+ /* Add the TCP listen sockets to the list of read descriptors. */
+ for(i = 0 ; i < pThis->iLstnMax ; ++i) {
+ CHKiRet(nssel.Add(pSel, pThis->ppLstn[i], NSDSEL_RD));
}
- if(Debug) {
- // TODO: name in dbgprintf!
- dbgprintf("--------<TCPSRV> calling select, active file descriptors (max %d): ", maxfds);
- for (nfds = 0; nfds <= maxfds; ++nfds)
- if ( FD_ISSET(nfds, &readfds) )
- dbgprintf("%d ", nfds);
- dbgprintf("\n");
+ /* do the sessions */
+ iTCPSess = TCPSessGetNxtSess(pThis, -1);
+ while(iTCPSess != -1) {
+ /* TODO: access to pNsd is NOT really CLEAN, use method... */
+ CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD));
+ /* now get next... */
+ iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
}
/* wait for io to become ready */
- nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL);
+ CHKiRet(nssel.Wait(pSel, &nfds));
- for (i = 0; i < *pThis->pSocksLstn; i++) {
- if (FD_ISSET(pThis->pSocksLstn[i+1], &readfds)) {
- dbgprintf("New connect on TCP inetd socket: #%d\n", pThis->pSocksLstn[i+1]);
- SessAccept(pThis, &pNewSess, pThis->pSocksLstn[i+1]);
+ for(i = 0 ; i < pThis->iLstnMax ; ++i) {
+ CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds));
+ if(bIsReady) {
+ dbgprintf("New connect on NSD %p.\n", pThis->ppLstn[i]);
+ SessAccept(pThis, &pNewSess, pThis->ppLstn[i]);
--nfds; /* indicate we have processed one */
}
}
@@ -579,12 +443,10 @@ Run(tcpsrv_t *pThis)
/* now check the sessions */
iTCPSess = TCPSessGetNxtSess(pThis, -1);
while(nfds && iTCPSess != -1) {
- int fdSess;
- int state;
- fdSess = pThis->pSessions[iTCPSess]->sock; // TODO: not clean, use method
- if(FD_ISSET(fdSess, &readfds)) {
+ CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds));
+ if(bIsReady) {
char buf[MAXLINE];
- dbgprintf("tcp session socket with new data: #%d\n", fdSess);
+ dbgprintf("netstream %p with new data\n", pThis->pSessions[iTCPSess]->pStrm);
/* Receive message */
state = pThis->pRcvData(pThis->pSessions[iTCPSess], buf, sizeof(buf));
@@ -592,7 +454,8 @@ Run(tcpsrv_t *pThis)
pThis->pOnRegularClose(pThis->pSessions[iTCPSess]);
tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
} else if(state == -1) {
- errmsg.LogError(NO_ERRCODE, "TCP session %d will be closed, error ignored\n", fdSess);
+ errmsg.LogError(NO_ERRCODE, "netstream session %p will be closed, error ignored\n",
+ pThis->pSessions[iTCPSess]->pStrm);
pThis->pOnErrClose(pThis->pSessions[iTCPSess]);
tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
} else {
@@ -611,29 +474,44 @@ Run(tcpsrv_t *pThis)
}
iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
}
+ CHKiRet(nssel.Destruct(&pSel));
}
+ /* note that this point is usually not reached */
+ pthread_cleanup_pop(0); /* remove cleanup handler */
+
+finalize_it: // TODO: think: is it really good to exit the loop?
RETiRet;
}
-/* Standard-Constructor
- */
+/* Standard-Constructor */
BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */
- pThis->pSocksLstn = NULL;
- pThis->iSessMax = 200; /* TODO: useful default ;) */
+ pThis->iSessMax = TCPSESS_MAX_DEFAULT; /* TODO: useful default ;) */
ENDobjConstruct(tcpsrv)
-/* ConstructionFinalizer
- */
+/* ConstructionFinalizer */
static rsRetVal
-tcpsrvConstructFinalize(tcpsrv_t __attribute__((unused)) *pThis)
+tcpsrvConstructFinalize(tcpsrv_t *pThis)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
- pThis->pSocksLstn = pThis->OpenLstnSocks(pThis);
+ /* prepare network stream subsystem */
+ CHKiRet(netstrms.Construct(&pThis->pNS));
+ // TODO: set driver!
+ CHKiRet(netstrms.ConstructFinalize(pThis->pNS));
+
+ /* set up listeners */
+ CHKmalloc(pThis->ppLstn = calloc(TCPLSTN_MAX_DEFAULT, sizeof(netstrm_t*)));
+ iRet = pThis->OpenLstnSocks(pThis);
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ if(pThis->pNS != NULL)
+ netstrms.Destruct(&pThis->pNS);
+ }
RETiRet;
}
@@ -645,6 +523,11 @@ CODESTARTobjDestruct(tcpsrv)
pThis->OnDestruct(pThis->pUsr);
deinit_tcp_listener(pThis);
+
+ if(pThis->pNS != NULL)
+ netstrms.Destruct(&pThis->pNS);
+ if(pThis->ppLstn != NULL)
+ free(pThis->ppLstn);
ENDobjDestruct(tcpsrv)
@@ -727,7 +610,7 @@ SetCBOnErrClose(tcpsrv_t *pThis, rsRetVal (*pCB)(tcps_sess_t*))
}
static rsRetVal
-SetCBOpenLstnSocks(tcpsrv_t *pThis, int* (*pCB)(tcpsrv_t*))
+SetCBOpenLstnSocks(tcpsrv_t *pThis, rsRetVal (*pCB)(tcpsrv_t*))
{
DEFiRet;
pThis->OpenLstnSocks = pCB;
@@ -793,6 +676,9 @@ CODESTARTObjClassExit(tcpsrv)
objRelease(conf, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
+ objRelease(nssel, LM_NSSEL_FILENAME);
+ objRelease(netstrm, LM_NETSTRM_FILENAME);
+ objRelease(netstrms, LM_NETSTRMS_FILENAME);
objRelease(net, LM_NET_FILENAME);
ENDObjClassExit(tcpsrv)
@@ -805,6 +691,9 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
/* request objects we use */
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(net, LM_NET_FILENAME));
+ CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME));
+ CHKiRet(objUse(netstrm, LM_NETSTRM_FILENAME));
+ CHKiRet(objUse(nssel, LM_NSSEL_FILENAME));
CHKiRet(objUse(tcps_sess, DONT_LOAD_LIB));
CHKiRet(objUse(conf, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
diff --git a/tcpsrv.h b/tcpsrv.h
index 20a7ea25..ada77aec 100644
--- a/tcpsrv.h
+++ b/tcpsrv.h
@@ -26,9 +26,11 @@
#include "tcps_sess.h"
/* the tcpsrv object */
-typedef struct tcpsrv_s {
+struct tcpsrv_s {
BEGINobjInstance; /**< Data to implement generic object - MUST be the first data element! */
- int *pSocksLstn; /**< listen socket array for server [0] holds count */
+ netstrms_t *pNS; /**< pointer to network stream subsystem */
+ int iLstnMax; /**< max nbr of listeners currently supported */
+ netstrm_t **ppLstn; /**< our netstream listners */
int iSessMax; /**< max number of sessions supported */
char *TCPLstnPort; /**< the port the listener shall listen on */
tcps_sess_t **pSessions;/**< array of all of our sessions */
@@ -36,16 +38,16 @@ typedef struct tcpsrv_s {
/* callbacks */
int (*pIsPermittedHost)(struct sockaddr *addr, char *fromHostFQDN, void*pUsrSrv, void*pUsrSess);
int (*pRcvData)(tcps_sess_t*, char*, size_t);
- int* (*OpenLstnSocks)(struct tcpsrv_s*);
+ rsRetVal (*OpenLstnSocks)(struct tcpsrv_s*);
rsRetVal (*pOnListenDeinit)(void*);
rsRetVal (*OnDestruct)(void*);
rsRetVal (*pOnRegularClose)(tcps_sess_t *pSess);
rsRetVal (*pOnErrClose)(tcps_sess_t *pSess);
/* session specific callbacks */
- rsRetVal (*pOnSessAccept)(struct tcpsrv_s *, tcps_sess_t*);
+ rsRetVal (*pOnSessAccept)(tcpsrv_t *, tcps_sess_t*);
rsRetVal (*OnSessConstructFinalize)(void*);
rsRetVal (*pOnSessDestruct)(void*);
-} tcpsrv_t;
+};
/* interfaces */
@@ -55,13 +57,13 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */
rsRetVal (*ConstructFinalize)(tcpsrv_t __attribute__((unused)) *pThis);
rsRetVal (*Destruct)(tcpsrv_t **ppThis);
void (*configureTCPListen)(tcpsrv_t*, char *cOptarg);
- int (*SessAccept)(tcpsrv_t *pThis, tcps_sess_t**ppSess, int fd);
- int* (*create_tcp_socket)(tcpsrv_t *pThis);
+ rsRetVal (*SessAccept)(tcpsrv_t *pThis, tcps_sess_t **ppSess, netstrm_t *pStrm);
+ rsRetVal (*create_tcp_socket)(tcpsrv_t *pThis);
rsRetVal (*Run)(tcpsrv_t *pThis);
/* set methods */
rsRetVal (*SetUsrP)(tcpsrv_t*, void*);
rsRetVal (*SetCBIsPermittedHost)(tcpsrv_t*, int (*) (struct sockaddr *addr, char*, void*, void*));
- rsRetVal (*SetCBOpenLstnSocks)(tcpsrv_t *, int* (*)(tcpsrv_t*));
+ rsRetVal (*SetCBOpenLstnSocks)(tcpsrv_t *, rsRetVal (*)(tcpsrv_t*));
rsRetVal (*SetCBRcvData)(tcpsrv_t *, int (*)(tcps_sess_t*, char*, size_t));
rsRetVal (*SetCBOnListenDeinit)(tcpsrv_t*, rsRetVal (*)(void*));
rsRetVal (*SetCBOnDestruct)(tcpsrv_t*, rsRetVal (*) (void*));
@@ -72,7 +74,7 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetCBOnSessDestruct)(tcpsrv_t*, rsRetVal (*) (void*));
rsRetVal (*SetCBOnSessConstructFinalize)(tcpsrv_t*, rsRetVal (*) (void*));
ENDinterface(tcpsrv)
-#define tcpsrvCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
+#define tcpsrvCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */
/* prototypes */
diff --git a/tools/omfwd.c b/tools/omfwd.c
index 3a2fe37f..719075c7 100644
--- a/tools/omfwd.c
+++ b/tools/omfwd.c
@@ -51,6 +51,7 @@
#include "syslogd-types.h"
#include "srUtils.h"
#include "net.h"
+#include "netstrms.h"
#include "netstrm.h"
#include "omfwd.h"
#include "template.h"
@@ -69,12 +70,14 @@ DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
DEFobjCurrIf(net)
+DEFobjCurrIf(netstrms)
DEFobjCurrIf(netstrm)
DEFobjCurrIf(tcpclt)
typedef struct _instanceData {
- char *f_hname;
+ netstrms_t *pNS; /* netstream subsystem */
netstrm_t *pNetstrm; /* our output netstream */
+ char *f_hname;
int *pSockArray; /* sockets to use for UDP */
int bIsConnected; /* are we connected to remote host? 0 - no, 1 - yes, UDP means addr resolved */
struct addrinfo *f_addr;
@@ -252,16 +255,24 @@ static rsRetVal TCPSendInit(void *pvData)
assert(pData != NULL);
if(pData->pNetstrm == NULL) {
- CHKiRet(netstrm.Construct(&pData->pNetstrm));
+ CHKiRet(netstrms.Construct(&pData->pNS));
/* here we may set another netstream driver (e.g. to do TLS) */
+ CHKiRet(netstrms.ConstructFinalize(pData->pNS));
+
+ /* now create the actual stream and connect to the server */
+ CHKiRet(netstrms.CreateStrm(pData->pNS, &pData->pNetstrm));
CHKiRet(netstrm.ConstructFinalize(pData->pNetstrm));
CHKiRet(netstrm.Connect(pData->pNetstrm, glbl.GetDefPFFamily(),
(uchar*)pData->port, (uchar*)pData->f_hname));
}
finalize_it:
- if(iRet != RS_RET_OK)
- netstrm.Destruct(&pData->pNetstrm);
+ if(iRet != RS_RET_OK) {
+ if(pData->pNetstrm != NULL)
+ netstrm.Destruct(&pData->pNetstrm);
+ if(pData->pNS != NULL)
+ netstrms.Destruct(&pData->pNS);
+ }
RETiRet;
}
@@ -394,6 +405,8 @@ static rsRetVal
loadTCPSupport(void)
{
DEFiRet;
+ if(!netstrms.ifIsLoaded)
+ CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME));
if(!netstrm.ifIsLoaded)
CHKiRet(objUse(netstrm, LM_NETSTRM_FILENAME));
if(!tcpclt.ifIsLoaded)
@@ -564,6 +577,8 @@ CODESTARTmodExit
objRelease(net, LM_NET_FILENAME);
if(netstrm.ifIsLoaded)
objRelease(netstrm, LM_NETSTRM_FILENAME);
+ if(netstrms.ifIsLoaded)
+ objRelease(netstrms, LM_NETSTRMS_FILENAME);
if(!tcpclt.ifIsLoaded)
objRelease(tcpclt, LM_TCPCLT_FILENAME);