From 1892fc75f9fad0b0741b4a3eb1fc382f900b2301 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 23 Apr 2008 15:07:19 +0200 Subject: added new netstrms class netstrms is at the top layer of the socket abstraction --- plugins/imtcp/imtcp.c | 2 +- runtime/Makefile.am | 10 ++- runtime/netstrm.c | 88 ++++----------------- runtime/netstrm.h | 28 ++----- runtime/netstrms.c | 206 ++++++++++++++++++++++++++++++++++++++++++++++++++ runtime/netstrms.h | 52 +++++++++++++ runtime/nsd.h | 2 +- runtime/nsdsel_ptcp.c | 17 ++++- runtime/nssel.c | 15 ++-- runtime/nssel.h | 4 +- runtime/rsyslog.h | 2 + tcps_sess.h | 5 +- tcpsrv.c | 154 +++++++++++++++++++++---------------- tcpsrv.h | 13 ++-- tools/omfwd.c | 23 +++++- 15 files changed, 433 insertions(+), 188 deletions(-) create mode 100644 runtime/netstrms.c create mode 100644 runtime/netstrms.h diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c index b7f8f0b5..17a48d17 100644 --- a/plugins/imtcp/imtcp.c +++ b/plugins/imtcp/imtcp.c @@ -70,7 +70,7 @@ isPermittedHost(struct sockaddr *addr, char *fromHostFQDN, void __attribute__((u } -static int* +static rsRetVal doOpenLstnSocks(tcpsrv_t *pSrv) { ISOBJ_TYPE_assert(pSrv, tcpsrv); diff --git a/runtime/Makefile.am b/runtime/Makefile.am index 61ede1d7..7cb1b9bb 100644 --- a/runtime/Makefile.am +++ b/runtime/Makefile.am @@ -81,7 +81,7 @@ lmregexp_la_LIBADD = endif if ENABLE_INET -pkglib_LTLIBRARIES += lmnet.la lmnetstrm.la lmnssel.la +pkglib_LTLIBRARIES += lmnet.la lmnetstrms.la lmnetstrm.la lmnssel.la # # network support # @@ -90,7 +90,13 @@ lmnet_la_CPPFLAGS = $(pthreads_cflags) $(rsrt_cflags) lmnet_la_LDFLAGS = -module -avoid-version lmnet_la_LIBADD = -# network streams +# network stream master class and stream factory +lmnetstrms_la_SOURCES = netstrms.c netstrms.h +lmnetstrms_la_CPPFLAGS = $(pthreads_cflags) $(rsrt_cflags) +lmnetstrms_la_LDFLAGS = -module -avoid-version +lmnetstrms_la_LIBADD = + +# individual network streams lmnetstrm_la_SOURCES = netstrm.c netstrm.h lmnetstrm_la_CPPFLAGS = $(pthreads_cflags) $(rsrt_cflags) lmnetstrm_la_LDFLAGS = -module -avoid-version diff --git a/runtime/netstrm.c b/runtime/netstrm.c index bdd2636c..f0bdab78 100644 --- a/runtime/netstrm.c +++ b/runtime/netstrm.c @@ -38,29 +38,16 @@ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. */ #include "config.h" - -#include "rsyslog.h" -#include -#include #include #include -#include #include -#include -#include -#include -#include -#include -#include - -#include "syslogd-types.h" + +#include "rsyslog.h" #include "module-template.h" -#include "parse.h" -#include "srUtils.h" #include "obj.h" #include "errmsg.h" -#include "net.h" -#include "nsd.h" +//#include "nsd.h" +#include "netstrms.h" #include "netstrm.h" MODULE_TYPE_LIB @@ -68,37 +55,7 @@ MODULE_TYPE_LIB /* static data */ DEFobjStaticHelpers DEFobjCurrIf(errmsg) -DEFobjCurrIf(glbl) -DEFobjCurrIf(net) - - -/* load our low-level driver. This must be done before any - * driver-specific functions (allmost all...) can be carried - * out. Note that the driver's .ifIsLoaded is correctly - * initialized by calloc() and we depend on that. - * rgerhards, 2008-04-18 - */ -static rsRetVal -loadDrvr(netstrm_t *pThis) -{ - uchar *pDrvrName; - DEFiRet; - - pDrvrName = pThis->pDrvrName; - if(pDrvrName == NULL) /* if no drvr name is set, use system default */ - pDrvrName = glbl.GetDfltNetstrmDrvr(); - - pThis->Drvr.ifVersion = nsdCURR_IF_VERSION; - /* The pDrvrName+2 below is a hack to obtain the object name. It - * safes us to have yet another variable with the name without "lm" in - * front of it. If we change the module load interface, we may re-think - * about this hack, but for the time being it is efficient and clean - * enough. -- rgerhards, 2008-04-18 - */ - CHKiRet(obj.UseObj(__FILE__, pDrvrName+2, pDrvrName, (void*) &pThis->Drvr)); -finalize_it: - RETiRet; -} +DEFobjCurrIf(netstrms) /* Standard-Constructor */ @@ -120,7 +77,6 @@ netstrmConstructFinalize(netstrm_t *pThis) { DEFiRet; ISOBJ_TYPE_assert(pThis, netstrm); - CHKiRet(loadDrvr(pThis)); CHKiRet(pThis->Drvr.Construct(&pThis->pDrvrData)); finalize_it: RETiRet; @@ -152,40 +108,27 @@ AbortDestruct(netstrm_t **ppThis) * rgerhards, 2008-04-21 */ static rsRetVal -AcceptConnReq(netstrm_t *pThis, nsd_t *pReqNsd, netstrm_t **ppNew) +AcceptConnReq(netstrm_t *pThis, netstrm_t **ppNew) { - netstrm_t *pNew = NULL; nsd_t *pNewNsd = NULL; DEFiRet; ISOBJ_TYPE_assert(pThis, netstrm); - assert(pReqNsd != NULL); assert(ppNew != NULL); /* accept the new connection */ - CHKiRet(pThis->Drvr.AcceptConnReq(pReqNsd, &pNewNsd)); - + CHKiRet(pThis->Drvr.AcceptConnReq(pThis->pDrvrData, &pNewNsd)); /* construct our object so that we can use it... */ - CHKiRet(netstrmConstruct(&pNew)); - - pNew->pDrvrData = pNewNsd; - if(pThis->pDrvrName == NULL) { - pNew->pDrvrName = NULL; - } else { - CHKmalloc(pNew->pDrvrName = (uchar*) strdup((char*)pThis->pDrvrName)); - } - CHKiRet(loadDrvr(pNew)); - - *ppNew = pNew; + CHKiRet(netstrms.CreateStrm(pThis->pNS, ppNew)); + (*ppNew)->pDrvrData = pNewNsd; finalize_it: if(iRet != RS_RET_OK) { - if(pNew != NULL) - netstrmDestruct(&pNew); /* the close may be redundant, but that doesn't hurt... */ if(pNewNsd != NULL) pThis->Drvr.Destruct(&pNewNsd); } + RETiRet; } @@ -197,12 +140,11 @@ finalize_it: * rgerhards, 2008-04-22 */ static rsRetVal -LstnInit(netstrm_t *pThis, uchar *pLstnPort, uchar *pLstnIP, int iSessMax) +LstnInit(void *pUsr, rsRetVal(*fAddLstn)(void*,netstrm_t*), uchar *pLstnPort, uchar *pLstnIP, int iSessMax) { DEFiRet; - ISOBJ_TYPE_assert(pThis, netstrm); assert(pLstnPort != NULL); - CHKiRet(pThis->Drvr.LstnInit(&pThis->parrLstn, &pThis->isizeLstnArr, pLstnPort, pLstnIP, iSessMax)); + //CHKiRet(pThis->Drvr.LstnInit(pUsr, fAddLstn, pLstnPort, pLstnIP, iSessMax)); finalize_it: RETiRet; @@ -290,9 +232,8 @@ ENDobjQueryInterface(netstrm) BEGINObjClassExit(netstrm, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MACRO! */ CODESTARTObjClassExit(netstrm) /* release objects we no longer need */ - objRelease(net, CORE_COMPONENT); - objRelease(glbl, CORE_COMPONENT); objRelease(errmsg, CORE_COMPONENT); + objRelease(netstrms, LM_NETSTRMS_FILENAME); ENDObjClassExit(netstrm) @@ -303,8 +244,7 @@ ENDObjClassExit(netstrm) BEGINAbstractObjClassInit(netstrm, 1, OBJ_IS_CORE_MODULE) /* class, version */ /* request objects we use */ CHKiRet(objUse(errmsg, CORE_COMPONENT)); - CHKiRet(objUse(glbl, CORE_COMPONENT)); - CHKiRet(objUse(net, CORE_COMPONENT)); + CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME)); /* set our own handlers */ ENDObjClassInit(netstrm) diff --git a/runtime/netstrm.h b/runtime/netstrm.h index a3719f93..b87228d2 100644 --- a/runtime/netstrm.h +++ b/runtime/netstrm.h @@ -29,34 +29,18 @@ /* the netstrm object */ struct netstrm_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ - nsd_t *pDrvrData; /**< the driver's data elements */ + nsd_t *pDrvrData; /**< the driver's data elements (at most other places, this is called pNsd) */ uchar *pDrvrName; /**< nsd driver name to use, or NULL if system default */ nsd_if_t Drvr; /**< our stream driver */ + netstrms_t *pNS; /**< pointer to our netstream subsystem object */ /* for listeners, we need to have the capablity to listen on multiple "sockets". This * is needed to support IPv6. We do this by specifying an array of nsd_t objects to * handle this case. */ - int isizeLstnArr; - nsd_t **parrLstn; + //int isizeLstnArr; + //nsd_t **parrLstn; }; -/* a helper object enabling us to wait on a set of streams to become - * ready for IO - this is modelled after select(). We need this, because - * stream drivers may have different concepts. Consequently, - * the structure must contain nsd_t's from the same stream driver type - * only. This is implemented as a singly-linked list where every - * new element is added at the top of the list. -- rgerhards, 2008-04-22 - */ -typedef struct netstrm_iowaiter_s netstrm_iowaiter_t; -struct netstrm_iowaiter_s { - netstrm_iowaiter_t *pNext; - nsd_t *pNsd; - enum { - NETSTRM_IOWAIT_RD = 1, - NETSTRM_IOWAIT_WR = 2, - NETSTRM_IOWAIT_RDWR = 3 - } waitOp; /**< the operation we wait for */ -}; /* interface */ BEGINinterface(netstrm) /* name must also be changed in ENDinterface macro! */ @@ -64,8 +48,8 @@ BEGINinterface(netstrm) /* name must also be changed in ENDinterface macro! */ rsRetVal (*ConstructFinalize)(netstrm_t *pThis); rsRetVal (*Destruct)(netstrm_t **ppThis); rsRetVal (*AbortDestruct)(netstrm_t **ppThis); - rsRetVal (*LstnInit)(netstrm_t *pThis, uchar *pLstnPort, uchar *pLstnIP, int iSessMax); - rsRetVal (*AcceptConnReq)(netstrm_t *pThis, nsd_t *pReqNsd, netstrm_t **ppNew); + rsRetVal (*LstnInit)(void *pUsr, rsRetVal(*)(void*,netstrm_t*), uchar *pLstnPort, uchar *pLstnIP, int iSessMax); + rsRetVal (*AcceptConnReq)(netstrm_t *pThis, netstrm_t **ppNew); rsRetVal (*Rcv)(netstrm_t *pThis, uchar *pRcvBuf, ssize_t *pLenBuf); rsRetVal (*Send)(netstrm_t *pThis, uchar *pBuf, ssize_t *pLenBuf); rsRetVal (*Connect)(netstrm_t *pThis, int family, unsigned char *port, unsigned char *host); diff --git a/runtime/netstrms.c b/runtime/netstrms.c new file mode 100644 index 00000000..d0e11441 --- /dev/null +++ b/runtime/netstrms.c @@ -0,0 +1,206 @@ +/* netstrms.c + * + * Work on this module begung 2008-04-23 by Rainer Gerhards. + * + * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ +#include "config.h" + +//#include +#include +#include +#include +//#include + +#include "rsyslog.h" +//#include "syslogd-types.h" +#include "module-template.h" +#include "obj.h" +//#include "errmsg.h" +//#include "net.h" +#include "nsd.h" +#include "netstrm.h" +#include "netstrms.h" + +MODULE_TYPE_LIB + +/* static data */ +DEFobjStaticHelpers +//DEFobjCurrIf(errmsg) +DEFobjCurrIf(glbl) +DEFobjCurrIf(netstrm) +//DEFobjCurrIf(net) + + +/* load our low-level driver. This must be done before any + * driver-specific functions (allmost all...) can be carried + * out. Note that the driver's .ifIsLoaded is correctly + * initialized by calloc() and we depend on that. + * rgerhards, 2008-04-18 + */ +static rsRetVal +loadDrvr(netstrms_t *pThis) +{ + uchar *pDrvrName; + DEFiRet; + + pDrvrName = pThis->pDrvrName; + if(pDrvrName == NULL) /* if no drvr name is set, use system default */ + pDrvrName = glbl.GetDfltNetstrmDrvr(); + + pThis->Drvr.ifVersion = nsdCURR_IF_VERSION; + /* The pDrvrName+2 below is a hack to obtain the object name. It + * safes us to have yet another variable with the name without "lm" in + * front of it. If we change the module load interface, we may re-think + * about this hack, but for the time being it is efficient and clean + * enough. -- rgerhards, 2008-04-18 + */ + CHKiRet(obj.UseObj(__FILE__, pDrvrName+2, pDrvrName, (void*) &pThis->Drvr)); +finalize_it: + RETiRet; +} + + +/* Standard-Constructor */ +BEGINobjConstruct(netstrms) /* be sure to specify the object type also in END macro! */ +ENDobjConstruct(netstrms) + + +/* destructor for the netstrms object */ +BEGINobjDestruct(netstrms) /* be sure to specify the object type also in END and CODESTART macros! */ +CODESTARTobjDestruct(netstrms) + if(pThis->pDrvrName != NULL) + free(pThis->pDrvrName); +ENDobjDestruct(netstrms) + + +/* ConstructionFinalizer */ +static rsRetVal +netstrmsConstructFinalize(netstrms_t *pThis) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, netstrms); + CHKiRet(loadDrvr(pThis)); +finalize_it: + RETiRet; +} + + +/* create an instance of a netstrm object. It is initialized with default + * values. The current driver is used. The caller may set netstrm properties + * and must call ConstructFinalize(). + */ +static rsRetVal +CreateStrm(netstrms_t *pThis, netstrm_t **ppStrm) +{ + netstrm_t *pStrm = NULL; + DEFiRet; + + CHKiRet(netstrm.Construct(&pStrm)); + /* we copy over our driver structure. We could provide a pointer to + * ourselves, but that costs some performance on each driver invocation. + * As we already have hefty indirection (and thus performance toll), I + * prefer to copy over the function pointers here. -- rgerhards, 2008-04-23 + */ + memcpy(&pStrm->Drvr, &pThis->Drvr, sizeof(pThis->Drvr)); + pStrm->pNS = pThis; + + *ppStrm = pStrm; + +finalize_it: + if(iRet != RS_RET_OK) { + if(pStrm != NULL) + netstrm.Destruct(&pStrm); + } + RETiRet; +} + + +/* queryInterface function */ +BEGINobjQueryInterface(netstrms) +CODESTARTobjQueryInterface(netstrms) + if(pIf->ifVersion != netstrmsCURR_IF_VERSION) {/* check for current version, increment on each change */ + ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED); + } + + /* ok, we have the right interface, so let's fill it + * Please note that we may also do some backwards-compatibility + * work here (if we can support an older interface version - that, + * of course, also affects the "if" above). + */ + pIf->Construct = netstrmsConstruct; + pIf->ConstructFinalize = netstrmsConstructFinalize; + pIf->Destruct = netstrmsDestruct; + pIf->CreateStrm = CreateStrm; +finalize_it: +ENDobjQueryInterface(netstrms) + + +/* exit our class */ +BEGINObjClassExit(netstrms, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MACRO! */ +CODESTARTObjClassExit(netstrms) + /* release objects we no longer need */ + //objRelease(net, CORE_COMPONENT); + objRelease(glbl, CORE_COMPONENT); + objRelease(netstrm, LM_NETSTRM_FILENAME); + //objRelease(errmsg, CORE_COMPONENT); +ENDObjClassExit(netstrms) + + +/* Initialize the netstrms class. Must be called as the very first method + * before anything else is called inside this class. + * rgerhards, 2008-02-19 + */ +BEGINAbstractObjClassInit(netstrms, 1, OBJ_IS_CORE_MODULE) /* class, version */ + /* request objects we use */ + //CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(netstrm, LM_NETSTRM_FILENAME)); + //CHKiRet(objUse(net, CORE_COMPONENT)); + + /* set our own handlers */ +ENDObjClassInit(netstrms) + + +/* --------------- here now comes the plumbing that makes as a library module --------------- */ + + +BEGINmodExit +CODESTARTmodExit + netstrmsClassExit(); +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_LIB_QUERIES +ENDqueryEtryPt + + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ + + /* Initialize all classes that are in our module - this includes ourselfs */ + CHKiRet(netstrmsClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */ +ENDmodInit +/* vi:set ai: + */ diff --git a/runtime/netstrms.h b/runtime/netstrms.h new file mode 100644 index 00000000..1e920304 --- /dev/null +++ b/runtime/netstrms.h @@ -0,0 +1,52 @@ +/* Definitions for the stream-based netstrmsworking class. + * + * Copyright 2007, 2008 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ + +#ifndef INCLUDED_NETSTRMS_H +#define INCLUDED_NETSTRMS_H + +#include "nsd.h" /* we need our driver interface to be defined */ + +/* the netstrms object */ +struct netstrms_s { + BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ + uchar *pDrvrName; /**< nsd driver name to use, or NULL if system default */ + nsd_if_t Drvr; /**< our stream driver */ +}; + + +/* interface */ +BEGINinterface(netstrms) /* name must also be changed in ENDinterface macro! */ + rsRetVal (*Construct)(netstrms_t **ppThis); + rsRetVal (*ConstructFinalize)(netstrms_t *pThis); + rsRetVal (*Destruct)(netstrms_t **ppThis); + rsRetVal (*CreateStrm)(netstrms_t *pThis, netstrm_t **ppStrm); +ENDinterface(netstrms) +#define netstrmsCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ + +/* prototypes */ +PROTOTYPEObj(netstrms); + +/* the name of our library binary */ +#define LM_NETSTRMS_FILENAME "lmnetstrms" + +#endif /* #ifndef INCLUDED_NETSTRMS_H */ diff --git a/runtime/nsd.h b/runtime/nsd.h index 2fb883ac..db61780f 100644 --- a/runtime/nsd.h +++ b/runtime/nsd.h @@ -57,7 +57,7 @@ BEGINinterface(nsdsel) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Destruct)(nsdsel_t **ppThis); rsRetVal (*Add)(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp); rsRetVal (*Select)(nsdsel_t *pNsdsel, int *piNumReady); - rsRetVal (*IsReady)(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp); + rsRetVal (*IsReady)(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady); ENDinterface(nsdsel) #define nsdselCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ diff --git a/runtime/nsdsel_ptcp.c b/runtime/nsdsel_ptcp.c index 67f3c62a..c5864809 100644 --- a/runtime/nsdsel_ptcp.c +++ b/runtime/nsdsel_ptcp.c @@ -131,7 +131,7 @@ Select(nsdsel_t *pNsdsel, int *piNumReady) /* check if a socket is ready for IO */ static rsRetVal -IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp) +IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp, int *pbIsReady) { DEFiRet; nsdsel_ptcp_t *pThis = (nsdsel_ptcp_t*) pNsdsel; @@ -139,6 +139,21 @@ IsReady(nsdsel_t *pNsdsel, nsd_t *pNsd, nsdsel_waitOp_t waitOp) ISOBJ_TYPE_assert(pThis, nsdsel_ptcp); ISOBJ_TYPE_assert(pSock, nsd_ptcp); + assert(pbIsReady != NULL); + + switch(waitOp) { + case NSDSEL_RD: + *pbIsReady = FD_ISSET(pSock->sock, &pThis->readfds); + break; + case NSDSEL_WR: + *pbIsReady = FD_ISSET(pSock->sock, &pThis->writefds); + break; + case NSDSEL_RDWR: + *pbIsReady = FD_ISSET(pSock->sock, &pThis->readfds) + | FD_ISSET(pSock->sock, &pThis->writefds); + break; + } + RETiRet; } diff --git a/runtime/nssel.c b/runtime/nssel.c index f2844872..0cbda9b9 100644 --- a/runtime/nssel.c +++ b/runtime/nssel.c @@ -41,6 +41,7 @@ #include "rsyslog.h" #include "obj.h" #include "module-template.h" +#include "netstrm.h" #include "nssel.h" MODULE_TYPE_LIB @@ -103,18 +104,19 @@ finalize_it: } -/* Add a stream object to the current IOW. Note that a single stream may - * have multiple "sockets" if it is a listener. If so, all of them are - * begin added. +/* Add a stream object to the current select() set. + * Note that a single stream may have multiple "sockets" if + * it is a listener. If so, all of them are begin added. */ static rsRetVal -Add(nssel_t *pThis, netstrm_t *pStrm) +Add(nssel_t *pThis, netstrm_t *pStrm, nsdsel_waitOp_t waitOp) { DEFiRet; ISOBJ_TYPE_assert(pThis, nssel); ISOBJ_TYPE_assert(pStrm, netstrm); - + + CHKiRet(pThis->Drvr.Add(pThis->pDrvrData, pStrm->pDrvrData, waitOp)); finalize_it: RETiRet; @@ -131,6 +133,7 @@ Wait(nssel_t *pThis, int *piNumReady) DEFiRet; ISOBJ_TYPE_assert(pThis, nssel); assert(piNumReady != NULL); + iRet = pThis->Drvr.Select(pThis->pDrvrData, piNumReady); RETiRet; } @@ -142,7 +145,7 @@ Wait(nssel_t *pThis, int *piNumReady) * rgerhards, 2008-04-23 */ static rsRetVal -IsReady(nssel_t *pThis, netstrm_t *pStrm, int *pbIsReady, int *piNumReady) +IsReady(nssel_t *pThis, netstrm_t *pStrm, nsdsel_waitOp_t waitOp, int *pbIsReady, int *piNumReady) { DEFiRet; ISOBJ_TYPE_assert(pThis, nssel); diff --git a/runtime/nssel.h b/runtime/nssel.h index 16919b1f..2f907caa 100644 --- a/runtime/nssel.h +++ b/runtime/nssel.h @@ -40,9 +40,9 @@ BEGINinterface(nssel) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Construct)(nssel_t **ppThis); rsRetVal (*ConstructFinalize)(nssel_t *pThis); rsRetVal (*Destruct)(nssel_t **ppThis); - rsRetVal (*Add)(nssel_t *pThis, netstrm_t *pStrm); + rsRetVal (*Add)(nssel_t *pThis, netstrm_t *pStrm, nsdsel_waitOp_t waitOp); rsRetVal (*Wait)(nssel_t *pThis, int *pNumReady); - rsRetVal (*IsReady)(nssel_t *pThis, netstrm_t *pStrm, int *pbIsReady, int *pNumRead); + rsRetVal (*IsReady)(nssel_t *pThis, netstrm_t *pStrm, nsdsel_waitOp_t waitOp, int *pbIsReady, int *piNumReady); ENDinterface(nssel) #define nsselCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 4f62ca3c..6bffae4b 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -64,6 +64,7 @@ typedef struct thrdInfo thrdInfo_t; typedef struct obj_s obj_t; typedef struct filed selector_t;/* TODO: this so far resides in syslogd.c, think about modularization */ typedef struct NetAddr netAddr_t; +typedef struct netstrms_s netstrms_t; typedef struct netstrm_s netstrm_t; typedef struct nssel_s nssel_t; typedef enum nsdsel_waitOp_e nsdsel_waitOp_t; @@ -76,6 +77,7 @@ typedef struct interface_s interface_t; typedef struct objInfo_s objInfo_t; typedef enum rsRetVal_ rsRetVal; /**< friendly type for global return value */ typedef rsRetVal (*errLogFunc_t)(uchar*); /* this is a trick to store a function ptr to a function returning a function ptr... */ +typedef struct tcpsrv_s tcpsrv_t; /* some universal 64 bit define... */ typedef long long int64; diff --git a/tcps_sess.h b/tcps_sess.h index 102d91b5..25884ea2 100644 --- a/tcps_sess.h +++ b/tcps_sess.h @@ -32,8 +32,9 @@ struct tcpsrv_s; typedef struct tcps_sess_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ struct tcpsrv_s *pSrv; /* pointer back to my server (e.g. for callbacks) */ - int sock; - int iMsg; /* index of next char to store in msg */ + int sock; // TODO: remove + netstrm_t *pStrm; + int iMsg; /* index of next char to store in msg */ int bAtStrtOfFram; /* are we at the very beginning of a new frame? */ enum { eAtStrtFram, diff --git a/tcpsrv.c b/tcpsrv.c index a276bf19..973b59fa 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -63,13 +63,16 @@ #include "tcpsrv.h" #include "obj.h" #include "glbl.h" +#include "netstrms.h" #include "netstrm.h" +#include "nssel.h" #include "errmsg.h" MODULE_TYPE_LIB /* defines */ #define TCPSESS_MAX_DEFAULT 200 /* default for nbr of tcp sessions if no number is given */ +#define TCPLSTN_MAX_DEFAULT 20 /* default for nbr of listeners */ /* static data */ DEFobjStaticHelpers @@ -78,7 +81,9 @@ DEFobjCurrIf(glbl) DEFobjCurrIf(tcps_sess) DEFobjCurrIf(errmsg) DEFobjCurrIf(net) +DEFobjCurrIf(netstrms) DEFobjCurrIf(netstrm) +DEFobjCurrIf(nssel) /* configure TCP listener settings. This is called during command @@ -193,17 +198,17 @@ TCPSessGetNxtSess(tcpsrv_t *pThis, int iCurr) */ static void deinit_tcp_listener(tcpsrv_t *pThis) { - int iTCPSess; + int i; ISOBJ_TYPE_assert(pThis, tcpsrv); assert(pThis->pSessions != NULL); /* close all TCP connections! */ - iTCPSess = TCPSessGetNxtSess(pThis, -1); - while(iTCPSess != -1) { - tcps_sess.Destruct(&pThis->pSessions[iTCPSess]); + i = TCPSessGetNxtSess(pThis, -1); + while(i != -1) { + tcps_sess.Destruct(&pThis->pSessions[i]); /* now get next... */ - iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); + i = TCPSessGetNxtSess(pThis, i); } /* we are done with the session table - so get rid of it... @@ -214,8 +219,25 @@ static void deinit_tcp_listener(tcpsrv_t *pThis) if(pThis->TCPLstnPort != NULL) free(pThis->TCPLstnPort); - /* finally close our listen stream */ - netstrm.Destruct(&pThis->pLstn); + /* finally close our listen streams */ + for(i = 0 ; i < pThis->iLstnMax ; ++i) { + netstrm.Destruct(pThis->ppLstn + i); + } +} + + +/* add a listen socket to our listen socket array. This is a callback + * invoked from the netstrm class. -- rgerhards, 2008-04-23 + */ +static rsRetVal +addTcpLstn(void *pUsr, netstrm_t *pLstn) +{ + tcpsrv_t *pThis = (tcpsrv_t*) pUsr; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, tcpsrv); + + RETiRet; } @@ -242,7 +264,7 @@ create_tcp_socket(tcpsrv_t *pThis) TCPLstnPort = (uchar*)pThis->TCPLstnPort; /* TODO: add capability to specify local listen address! */ - CHKiRet(netstrm.LstnInit(pThis->pLstn, TCPLstnPort, NULL, pThis->iSessMax)); + CHKiRet(netstrm.LstnInit((void*)pThis, addTcpLstn, TCPLstnPort, NULL, pThis->iSessMax)); finalize_it: RETiRet; @@ -260,7 +282,7 @@ finalize_it: * rgerhards, 2008-03-02 */ static rsRetVal -SessAccept(tcpsrv_t *pThis, tcps_sess_t **ppSess, nsd_t *pNsd) +SessAccept(tcpsrv_t *pThis, tcps_sess_t **ppSess, netstrm_t *pStrm) { DEFiRet; tcps_sess_t *pSess; @@ -272,7 +294,7 @@ SessAccept(tcpsrv_t *pThis, tcps_sess_t **ppSess, nsd_t *pNsd) ISOBJ_TYPE_assert(pThis, tcpsrv); - CHKiRet(netstrm.AcceptConnReq(pThis->pLstn, pNsd, &pNewStrm)); + CHKiRet(netstrm.AcceptConnReq(pStrm, &pNewStrm)); /* Add to session list */ iSess = TCPSessTblFindFreeSpot(pThis); @@ -343,19 +365,18 @@ finalize_it: } -/* This function is called to gather input. - */ +/* This function is called to gather input. */ static rsRetVal Run(tcpsrv_t *pThis) { DEFiRet; - int maxfds; int nfds; int i; int iTCPSess; - fd_set readfds; + int bIsReady; tcps_sess_t *pNewSess; - nsdsel_t *pSel; + nssel_t *pSel; + int state; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -364,52 +385,32 @@ Run(tcpsrv_t *pThis) * right into the sleep below. */ while(1) { - CHKiRet(nsdsel_ptcp.Construct(&pSel)); + CHKiRet(nssel.Construct(&pSel)); + // TODO: set driver + CHKiRet(nssel.ConstructFinalize(pSel)); /* Add the TCP listen sockets to the list of read descriptors. */ - if(pThis->pSocksLstn != NULL && *pThis->pSocksLstn) { - for (i = 0; i < *pThis->pSocksLstn; i++) { - /* The if() below is theoretically not needed, but I leave it in - * so that a socket may become unsuable during execution. That - * feature is not yet supported by the current code base. - */ - if (pThis->pSocksLstn[i+1] != -1) { - if(Debug) net.debugListenInfo(pThis->pSocksLstn[i+1], "TCP"); - CHKiRet(nsdsel_ptcp.Add(pSel, )); - FD_SET(pThis->pSocksLstn[i+1], &readfds); - if(pThis->pSocksLstn[i+1]>maxfds) maxfds=pThis->pSocksLstn[i+1]; - } - } - /* do the sessions */ - iTCPSess = TCPSessGetNxtSess(pThis, -1); - while(iTCPSess != -1) { - int fdSess; - fdSess = pThis->pSessions[iTCPSess]->sock; // TODO: NOT CLEAN!, use method - dbgprintf("Adding TCP Session %d\n", fdSess); - FD_SET(fdSess, &readfds); - if (fdSess>maxfds) maxfds=fdSess; - /* now get next... */ - iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); - } + for(i = 0 ; i < pThis->iLstnMax ; ++i) { + CHKiRet(nssel.Add(pSel, pThis->ppLstn[i], NSDSEL_RD)); } - if(Debug) { - // TODO: name in dbgprintf! - dbgprintf("-------- calling select, active file descriptors (max %d): ", maxfds); - for (nfds = 0; nfds <= maxfds; ++nfds) - if ( FD_ISSET(nfds, &readfds) ) - dbgprintf("%d ", nfds); - dbgprintf("\n"); + /* do the sessions */ + iTCPSess = TCPSessGetNxtSess(pThis, -1); + while(iTCPSess != -1) { + /* TODO: access to pNsd is NOT really CLEAN, use method... */ + CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD)); + /* now get next... */ + iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); } /* wait for io to become ready */ - /* this is the somewhat weak spot in our socket layer abstraction... */ - nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL); + CHKiRet(nssel.Wait(pSel, &nfds)); - for (i = 0; i < *pThis->pSocksLstn; i++) { - if (FD_ISSET(pThis->pSocksLstn[i+1], &readfds)) { - dbgprintf("New connect on TCP inetd socket: #%d\n", pThis->pSocksLstn[i+1]); - SessAccept(pThis, &pNewSess, pThis->pSocksLstn[i+1]); + for(i = 0 ; i < pThis->iLstnMax ; ++i) { + CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds)); + if(bIsReady) { + dbgprintf("New connect on NSD %p.\n", pThis->ppLstn[i]); + SessAccept(pThis, &pNewSess, pThis->ppLstn[i]); --nfds; /* indicate we have processed one */ } } @@ -417,12 +418,10 @@ Run(tcpsrv_t *pThis) /* now check the sessions */ iTCPSess = TCPSessGetNxtSess(pThis, -1); while(nfds && iTCPSess != -1) { - int fdSess; - int state; - fdSess = pThis->pSessions[iTCPSess]->sock; // TODO: not clean, use method - if(FD_ISSET(fdSess, &readfds)) { + CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); + if(bIsReady) { char buf[MAXLINE]; - dbgprintf("tcp session socket with new data: #%d\n", fdSess); + dbgprintf("netstream %p with new data\n", pThis->pSessions[iTCPSess]->pStrm); /* Receive message */ state = pThis->pRcvData(pThis->pSessions[iTCPSess], buf, sizeof(buf)); @@ -430,7 +429,8 @@ Run(tcpsrv_t *pThis) pThis->pOnRegularClose(pThis->pSessions[iTCPSess]); tcps_sess.Destruct(&pThis->pSessions[iTCPSess]); } else if(state == -1) { - errmsg.LogError(NO_ERRCODE, "TCP session %d will be closed, error ignored\n", fdSess); + errmsg.LogError(NO_ERRCODE, "netstream session %p will be closed, error ignored\n", + pThis->pSessions[iTCPSess]->pStrm); pThis->pOnErrClose(pThis->pSessions[iTCPSess]); tcps_sess.Destruct(&pThis->pSessions[iTCPSess]); } else { @@ -451,26 +451,39 @@ Run(tcpsrv_t *pThis) } } +finalize_it: // TODO: think: is it really good to exit the loop? RETiRet; } -/* Standard-Constructor - */ +/* Standard-Constructor */ BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */ - pThis->iSessMax = 200; /* TODO: useful default ;) */ + pThis->iSessMax = TCPSESS_MAX_DEFAULT; /* TODO: useful default ;) */ + pThis->iLstnMax = TCPLSTN_MAX_DEFAULT; /* TODO: useful default ;) */ ENDobjConstruct(tcpsrv) -/* ConstructionFinalizer - */ +/* ConstructionFinalizer */ static rsRetVal -tcpsrvConstructFinalize(tcpsrv_t __attribute__((unused)) *pThis) +tcpsrvConstructFinalize(tcpsrv_t *pThis) { DEFiRet; ISOBJ_TYPE_assert(pThis, tcpsrv); + + /* prepare network stream subsystem */ + CHKiRet(netstrms.Construct(&pThis->pNS)); + // TODO: set driver! + CHKiRet(netstrms.ConstructFinalize(pThis->pNS)); + + /* set up listeners */ + CHKmalloc(pThis->ppLstn = calloc(pThis->iLstnMax, sizeof(netstrm_t*))); iRet = pThis->OpenLstnSocks(pThis); +finalize_it: + if(iRet != RS_RET_OK) { + if(pThis->pNS != NULL) + netstrms.Destruct(&pThis->pNS); + } RETiRet; } @@ -482,6 +495,9 @@ CODESTARTobjDestruct(tcpsrv) pThis->OnDestruct(pThis->pUsr); deinit_tcp_listener(pThis); + + if(pThis->pNS != NULL) + netstrms.Destruct(&pThis->pNS); ENDobjDestruct(tcpsrv) @@ -630,7 +646,9 @@ CODESTARTObjClassExit(tcpsrv) objRelease(conf, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); objRelease(errmsg, CORE_COMPONENT); - objRelease(netstrm, LM_NET_FILENAME); + objRelease(nssel, LM_NSSEL_FILENAME); + objRelease(netstrm, LM_NETSTRM_FILENAME); + objRelease(netstrms, LM_NETSTRMS_FILENAME); objRelease(net, LM_NET_FILENAME); ENDObjClassExit(tcpsrv) @@ -643,7 +661,9 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE /* request objects we use */ CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); - CHKiRet(objUse(netstrm, LM_NET_FILENAME)); + CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME)); + CHKiRet(objUse(netstrm, LM_NETSTRM_FILENAME)); + CHKiRet(objUse(nssel, LM_NSSEL_FILENAME)); CHKiRet(objUse(tcps_sess, DONT_LOAD_LIB)); CHKiRet(objUse(conf, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); diff --git a/tcpsrv.h b/tcpsrv.h index 4cec906f..ada77aec 100644 --- a/tcpsrv.h +++ b/tcpsrv.h @@ -26,10 +26,11 @@ #include "tcps_sess.h" /* the tcpsrv object */ -typedef struct tcpsrv_s { +struct tcpsrv_s { BEGINobjInstance; /**< Data to implement generic object - MUST be the first data element! */ - //int *pSocksLstn; /**< listen socket array for server [0] holds count */ - netstrm_t *pLstn; /**< our netstream listner (which may contain multiple "sockets" */ + netstrms_t *pNS; /**< pointer to network stream subsystem */ + int iLstnMax; /**< max nbr of listeners currently supported */ + netstrm_t **ppLstn; /**< our netstream listners */ int iSessMax; /**< max number of sessions supported */ char *TCPLstnPort; /**< the port the listener shall listen on */ tcps_sess_t **pSessions;/**< array of all of our sessions */ @@ -43,10 +44,10 @@ typedef struct tcpsrv_s { rsRetVal (*pOnRegularClose)(tcps_sess_t *pSess); rsRetVal (*pOnErrClose)(tcps_sess_t *pSess); /* session specific callbacks */ - rsRetVal (*pOnSessAccept)(struct tcpsrv_s *, tcps_sess_t*); + rsRetVal (*pOnSessAccept)(tcpsrv_t *, tcps_sess_t*); rsRetVal (*OnSessConstructFinalize)(void*); rsRetVal (*pOnSessDestruct)(void*); -} tcpsrv_t; +}; /* interfaces */ @@ -56,7 +57,7 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */ rsRetVal (*ConstructFinalize)(tcpsrv_t __attribute__((unused)) *pThis); rsRetVal (*Destruct)(tcpsrv_t **ppThis); void (*configureTCPListen)(tcpsrv_t*, char *cOptarg); - rsRetVal (*SessAccept)(tcpsrv_t *pThis, tcps_sess_t **ppSess, nsd_t *pNsd); + rsRetVal (*SessAccept)(tcpsrv_t *pThis, tcps_sess_t **ppSess, netstrm_t *pStrm); rsRetVal (*create_tcp_socket)(tcpsrv_t *pThis); rsRetVal (*Run)(tcpsrv_t *pThis); /* set methods */ diff --git a/tools/omfwd.c b/tools/omfwd.c index 3a2fe37f..719075c7 100644 --- a/tools/omfwd.c +++ b/tools/omfwd.c @@ -51,6 +51,7 @@ #include "syslogd-types.h" #include "srUtils.h" #include "net.h" +#include "netstrms.h" #include "netstrm.h" #include "omfwd.h" #include "template.h" @@ -69,12 +70,14 @@ DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) DEFobjCurrIf(net) +DEFobjCurrIf(netstrms) DEFobjCurrIf(netstrm) DEFobjCurrIf(tcpclt) typedef struct _instanceData { - char *f_hname; + netstrms_t *pNS; /* netstream subsystem */ netstrm_t *pNetstrm; /* our output netstream */ + char *f_hname; int *pSockArray; /* sockets to use for UDP */ int bIsConnected; /* are we connected to remote host? 0 - no, 1 - yes, UDP means addr resolved */ struct addrinfo *f_addr; @@ -252,16 +255,24 @@ static rsRetVal TCPSendInit(void *pvData) assert(pData != NULL); if(pData->pNetstrm == NULL) { - CHKiRet(netstrm.Construct(&pData->pNetstrm)); + CHKiRet(netstrms.Construct(&pData->pNS)); /* here we may set another netstream driver (e.g. to do TLS) */ + CHKiRet(netstrms.ConstructFinalize(pData->pNS)); + + /* now create the actual stream and connect to the server */ + CHKiRet(netstrms.CreateStrm(pData->pNS, &pData->pNetstrm)); CHKiRet(netstrm.ConstructFinalize(pData->pNetstrm)); CHKiRet(netstrm.Connect(pData->pNetstrm, glbl.GetDefPFFamily(), (uchar*)pData->port, (uchar*)pData->f_hname)); } finalize_it: - if(iRet != RS_RET_OK) - netstrm.Destruct(&pData->pNetstrm); + if(iRet != RS_RET_OK) { + if(pData->pNetstrm != NULL) + netstrm.Destruct(&pData->pNetstrm); + if(pData->pNS != NULL) + netstrms.Destruct(&pData->pNS); + } RETiRet; } @@ -394,6 +405,8 @@ static rsRetVal loadTCPSupport(void) { DEFiRet; + if(!netstrms.ifIsLoaded) + CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME)); if(!netstrm.ifIsLoaded) CHKiRet(objUse(netstrm, LM_NETSTRM_FILENAME)); if(!tcpclt.ifIsLoaded) @@ -564,6 +577,8 @@ CODESTARTmodExit objRelease(net, LM_NET_FILENAME); if(netstrm.ifIsLoaded) objRelease(netstrm, LM_NETSTRM_FILENAME); + if(netstrms.ifIsLoaded) + objRelease(netstrms, LM_NETSTRMS_FILENAME); if(!tcpclt.ifIsLoaded) objRelease(tcpclt, LM_TCPCLT_FILENAME); -- cgit