summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--runtime/netstrm.c9
-rw-r--r--runtime/netstrm.h3
-rw-r--r--runtime/nsd.h3
-rw-r--r--runtime/nsd_ptcp.c60
-rw-r--r--runtime/rsyslog.h1
-rw-r--r--tcps_sess.h2
-rw-r--r--tcpsrv.c23
7 files changed, 72 insertions, 29 deletions
diff --git a/runtime/netstrm.c b/runtime/netstrm.c
index f0bdab78..c8335fa4 100644
--- a/runtime/netstrm.c
+++ b/runtime/netstrm.c
@@ -140,11 +140,16 @@ finalize_it:
* rgerhards, 2008-04-22
*/
static rsRetVal
-LstnInit(void *pUsr, rsRetVal(*fAddLstn)(void*,netstrm_t*), uchar *pLstnPort, uchar *pLstnIP, int iSessMax)
+LstnInit(netstrms_t *pNS, void *pUsr, rsRetVal(*fAddLstn)(void*,netstrm_t*),
+ uchar *pLstnPort, uchar *pLstnIP, int iSessMax)
{
DEFiRet;
+
+ ISOBJ_TYPE_assert(pNS, netstrms);
+ assert(fAddLstn != NULL);
assert(pLstnPort != NULL);
- //CHKiRet(pThis->Drvr.LstnInit(pUsr, fAddLstn, pLstnPort, pLstnIP, iSessMax));
+
+ CHKiRet(pNS->Drvr.LstnInit(pNS, pUsr, fAddLstn, pLstnPort, pLstnIP, iSessMax));
finalize_it:
RETiRet;
diff --git a/runtime/netstrm.h b/runtime/netstrm.h
index b87228d2..33d166ea 100644
--- a/runtime/netstrm.h
+++ b/runtime/netstrm.h
@@ -48,7 +48,8 @@ BEGINinterface(netstrm) /* name must also be changed in ENDinterface macro! */
rsRetVal (*ConstructFinalize)(netstrm_t *pThis);
rsRetVal (*Destruct)(netstrm_t **ppThis);
rsRetVal (*AbortDestruct)(netstrm_t **ppThis);
- rsRetVal (*LstnInit)(void *pUsr, rsRetVal(*)(void*,netstrm_t*), uchar *pLstnPort, uchar *pLstnIP, int iSessMax);
+ rsRetVal (*LstnInit)(netstrms_t *pNS, void *pUsr, rsRetVal(*)(void*,netstrm_t*),
+ uchar *pLstnPort, uchar *pLstnIP, int iSessMax);
rsRetVal (*AcceptConnReq)(netstrm_t *pThis, netstrm_t **ppNew);
rsRetVal (*Rcv)(netstrm_t *pThis, uchar *pRcvBuf, ssize_t *pLenBuf);
rsRetVal (*Send)(netstrm_t *pThis, uchar *pBuf, ssize_t *pLenBuf);
diff --git a/runtime/nsd.h b/runtime/nsd.h
index db61780f..c32e284e 100644
--- a/runtime/nsd.h
+++ b/runtime/nsd.h
@@ -46,7 +46,8 @@ BEGINinterface(nsd) /* name must also be changed in ENDinterface macro! */
rsRetVal (*Rcv)(nsd_t *pThis, uchar *pRcvBuf, ssize_t *pLenBuf);
rsRetVal (*Send)(nsd_t *pThis, uchar *pBuf, ssize_t *pLenBuf);
rsRetVal (*Connect)(nsd_t *pThis, int family, unsigned char *port, unsigned char *host);
- rsRetVal (*LstnInit)(nsd_t ***parrLstn, int *pLstnArrSize, uchar *pLstnPort, uchar *pLstnIP, int iSessMax);
+ rsRetVal (*LstnInit)(netstrms_t *pNS, void *pUsr, rsRetVal(*fAddLstn)(void*,netstrm_t*),
+ uchar *pLstnPort, uchar *pLstnIP, int iSessMax);
rsRetVal (*AcceptConnReq)(nsd_t *pThis, nsd_t **ppThis);
ENDinterface(nsd)
#define nsdCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
diff --git a/runtime/nsd_ptcp.c b/runtime/nsd_ptcp.c
index c737c168..40b11ad4 100644
--- a/runtime/nsd_ptcp.c
+++ b/runtime/nsd_ptcp.c
@@ -45,6 +45,8 @@
#include "obj.h"
#include "errmsg.h"
#include "net.h"
+#include "netstrms.h"
+#include "netstrm.h"
#include "nsd_ptcp.h"
MODULE_TYPE_LIB
@@ -54,6 +56,8 @@ DEFobjStaticHelpers
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
DEFobjCurrIf(net)
+DEFobjCurrIf(netstrms)
+DEFobjCurrIf(netstrm)
/* a few deinit helpers */
@@ -243,24 +247,29 @@ finalize_it:
}
-/* initialize tcp sockets for a listner. This function returns an array of nds_t
- * objects. The size of this array is returend in pLstnArrSize.
+/* initialize tcp sockets for a listner. The initialized sockets are passed to the
+ * app-level caller via a callback.
* pLstnPort must point to a port name or number. NULL is NOT permitted. pLstnIP
* points to the port to listen to (NULL means "all"), iMaxSess has the maximum
* number of sessions permitted.
* rgerhards, 2008-04-22
*/
static rsRetVal
-LstnInit(nsd_t ***parrLstnNsd, int *pLstnArrSize, uchar *pLstnPort, uchar *pLstnIP, int iSessMax)
+LstnInit(netstrms_t *pNS, void *pUsr, rsRetVal(*fAddLstn)(void*,netstrm_t*),
+ uchar *pLstnPort, uchar *pLstnIP, int iSessMax)
{
DEFiRet;
- struct addrinfo hints, *res = NULL, *r;
nsd_ptcp_t **arrLstn = NULL;
+ netstrm_t *pNewStrm = NULL;
+ nsd_t *pNewNsd = NULL;
int error, maxs, on = 1;
int sock;
+ int numSocks;
int sockflags;
+ struct addrinfo hints, *res = NULL, *r;
- assert(parrLstnNsd != NULL);
+ ISOBJ_TYPE_assert(pNS, netstrms);
+ assert(fAddLstn != NULL);
assert(pLstnPort != NULL);
assert(iSessMax >= 0);
@@ -282,7 +291,7 @@ LstnInit(nsd_t ***parrLstnNsd, int *pLstnArrSize, uchar *pLstnPort, uchar *pLstn
/* EMPTY */;
CHKmalloc(arrLstn = (nsd_ptcp_t**) malloc((maxs+1) * sizeof(nsd_ptcp_t*)));
- *pLstnArrSize = 0; /* num of sockets counter at start of array */
+ numSocks = 0; /* num of sockets counter at start of array */
for(r = res; r != NULL ; r = r->ai_next) {
sock = socket(r->ai_family, r->ai_socktype, r->ai_protocol);
if(sock < 0) {
@@ -366,38 +375,39 @@ LstnInit(nsd_t ***parrLstnNsd, int *pLstnArrSize, uchar *pLstnPort, uchar *pLstn
}
}
- /* if we reach this point, we were able to obtain a valid socket, which we
- * now can save to the array of listen sockets. -- rgerhards, 2008-04-22
+ /* if we reach this point, we were able to obtain a valid socket, so we can
+ * construct a new netstrm obj and hand it over to the upper layers for inclusion
+ * into their socket array. -- rgerhards, 2008-04-23
*/
- CHKiRet(nsd_ptcpConstruct(arrLstn+*pLstnArrSize));
- arrLstn[*pLstnArrSize]->sock = sock;
- ++(*pLstnArrSize);
+ CHKiRet(pNS->Drvr.Construct(&pNewNsd));
+ ((nsd_ptcp_t*)pNewNsd)->sock = sock;
+ CHKiRet(netstrms.CreateStrm(pNS, &pNewStrm));
+ pNewStrm->pDrvrData = (nsd_t*) pNewNsd;
+ CHKiRet(fAddLstn(pUsr, pNewStrm));
+ pNewNsd = NULL;
+ pNewStrm = NULL;
}
if(res != NULL)
freeaddrinfo(res);
- if(*pLstnArrSize != maxs)
+ if(numSocks != maxs)
dbgprintf("We could initialize %d TCP listen sockets out of %d we received "
- "- this may or may not be an error indication.\n", *pLstnArrSize, maxs);
+ "- this may or may not be an error indication.\n", numSocks, maxs);
- if(*pLstnArrSize == 0) {
- dbgprintf("No TCP listen sockets could successfully be initialized, "
- "message reception disabled.\n");
+ if(numSocks == 0) {
+ dbgprintf("No TCP listen sockets could successfully be initialized");
ABORT_FINALIZE(RS_RET_COULD_NOT_BIND);
}
- *parrLstnNsd = (nsd_t**) arrLstn;
- arrLstn = NULL; /* prevent from being freed in error handler */
-
finalize_it:
if(iRet != RS_RET_OK) {
if(res != NULL)
freeaddrinfo(res);
- if(arrLstn != NULL) {
- for(maxs = 0 ; maxs < *pLstnArrSize ; ++maxs)
- nsd_ptcpDestruct(arrLstn+*pLstnArrSize);
- }
+ if(pNewStrm != NULL)
+ netstrm.Destruct(&pNewStrm);
+ if(pNewNsd != NULL)
+ pNS->Drvr.Destruct(&pNewNsd);
}
RETiRet;
@@ -537,6 +547,8 @@ CODESTARTObjClassExit(nsd_ptcp)
objRelease(net, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
+ objRelease(netstrm, LM_NETSTRM_FILENAME);
+ objRelease(netstrms, LM_NETSTRMS_FILENAME);
ENDObjClassExit(nsd_ptcp)
@@ -549,6 +561,8 @@ BEGINObjClassInit(nsd_ptcp, 1, OBJ_IS_LOADABLE_MODULE) /* class, version */
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(net, CORE_COMPONENT));
+ CHKiRet(objUse(netstrm, LM_NETSTRM_FILENAME));
+ CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME));
/* set our own handlers */
ENDObjClassInit(nsd_ptcp)
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 6bffae4b..6cd9d94d 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -213,6 +213,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_INVALID_PORT = -2076, /**< invalid port value */
RS_RET_COULD_NOT_BIND = -2077, /**< could not bind socket, defunct */
RS_RET_MAX_SESS_REACHED = -2078, /**< max nbr of sessions reached, can not create more */
+ RS_RET_MAX_LSTN_REACHED = -2079, /**< max nbr of listeners reached, can not create more */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
diff --git a/tcps_sess.h b/tcps_sess.h
index 25884ea2..7baa035a 100644
--- a/tcps_sess.h
+++ b/tcps_sess.h
@@ -32,7 +32,7 @@ struct tcpsrv_s;
typedef struct tcps_sess_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
struct tcpsrv_s *pSrv; /* pointer back to my server (e.g. for callbacks) */
- int sock; // TODO: remove
+ //int sock; // TODO: remove
netstrm_t *pStrm;
int iMsg; /* index of next char to store in msg */
int bAtStrtOfFram; /* are we at the very beginning of a new frame? */
diff --git a/tcpsrv.c b/tcpsrv.c
index 973b59fa..b6fb35fc 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -236,7 +236,15 @@ addTcpLstn(void *pUsr, netstrm_t *pLstn)
DEFiRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
+ ISOBJ_TYPE_assert(pLstn, netstrm);
+ if(pThis->iLstnMax >= TCPLSTN_MAX_DEFAULT)
+ ABORT_FINALIZE(RS_RET_MAX_LSTN_REACHED);
+
+ pThis->ppLstn[pThis->iLstnMax] = pLstn;
+ ++pThis->iLstnMax;
+
+finalize_it:
RETiRet;
}
@@ -264,7 +272,20 @@ create_tcp_socket(tcpsrv_t *pThis)
TCPLstnPort = (uchar*)pThis->TCPLstnPort;
/* TODO: add capability to specify local listen address! */
- CHKiRet(netstrm.LstnInit((void*)pThis, addTcpLstn, TCPLstnPort, NULL, pThis->iSessMax));
+ CHKiRet(netstrm.LstnInit(pThis->pNS, (void*)pThis, addTcpLstn, TCPLstnPort, NULL, pThis->iSessMax));
+
+
+ /* OK, we had success. Now it is also time to
+ * initialize our connections
+ */
+ if(TCPSessTblInit(pThis) != 0) {
+ /* OK, we are in some trouble - we could not initialize the
+ * session table, so we can not continue. We need to free all
+ * we have assigned so far, because we can not really use it...
+ */
+ errmsg.LogError(NO_ERRCODE, "Could not initialize TCP session table, suspending TCP message reception.");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
finalize_it:
RETiRet;