summaryrefslogtreecommitdiffstats
path: root/tcpsrv.c
diff options
context:
space:
mode:
Diffstat (limited to 'tcpsrv.c')
-rw-r--r--tcpsrv.c242
1 files changed, 184 insertions, 58 deletions
diff --git a/tcpsrv.c b/tcpsrv.c
index 107d03f4..e8ea2b98 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -1,8 +1,11 @@
/* tcpsrv.c
*
- * Common code for plain TCP based servers. This is currently being
+ * Common code for plain TCP syslog based servers. This is currently being
* utilized by imtcp and imgssapi.
*
+ * NOTE: this is *not* a generic TCP server, but one for syslog servers. For
+ * generic stream servers, please use ./runtime/strmsrv.c!
+ *
* There are actually two classes within the tcpserver code: one is
* the tcpsrv itself, the other one is its sessions. This is a helper
* class to tcpsrv.
@@ -17,7 +20,7 @@
*
* File begun on 2007-12-21 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007, 2008 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007, 2008, 2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -66,6 +69,8 @@
#include "netstrm.h"
#include "nssel.h"
#include "errmsg.h"
+#include "ruleset.h"
+#include "unicode-helper.h"
MODULE_TYPE_LIB
@@ -77,51 +82,75 @@ MODULE_TYPE_LIB
DEFobjStaticHelpers
DEFobjCurrIf(conf)
DEFobjCurrIf(glbl)
+DEFobjCurrIf(ruleset)
DEFobjCurrIf(tcps_sess)
DEFobjCurrIf(errmsg)
DEFobjCurrIf(net)
DEFobjCurrIf(netstrms)
DEFobjCurrIf(netstrm)
DEFobjCurrIf(nssel)
+DEFobjCurrIf(prop)
-/* configure TCP listener settings. This is called during command
- * line parsing. The argument following -t is supplied as an argument.
- * The format of this argument is
- * "<port-to-use>, <nbr-of-sessions>"
- * Typically, there is no whitespace between port and session number.
- * (but it may be...).
- * NOTE: you can not use dbgprintf() in here - the dbgprintf() system is
- * not yet initilized when this function is called.
- * rgerhards, 2007-06-21
- * The port in cOptarg is handed over to us - the caller MUST NOT free it!
+/* add new listener port to listener port list
+ * rgerhards, 2009-05-21
+ */
+static inline rsRetVal
+addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort)
+{
+ tcpLstnPortList_t *pEntry;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+
+ /* create entry */
+ CHKmalloc(pEntry = malloc(sizeof(tcpLstnPortList_t)));
+ pEntry->pszPort = pszPort;
+ pEntry->pSrv = pThis;
+ pEntry->pRuleset = pThis->pRuleset;
+
+ /* we need to create a property */
+ CHKiRet(prop.Construct(&pEntry->pInputName));
+ CHKiRet(prop.SetString(pEntry->pInputName, pThis->pszInputName, ustrlen(pThis->pszInputName)));
+ CHKiRet(prop.ConstructFinalize(pEntry->pInputName));
+
+ /* and add to list */
+ pEntry->pNext = pThis->pLstnPorts;
+ pThis->pLstnPorts = pEntry;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* configure TCP listener settings.
+ * Note: pszPort is handed over to us - the caller MUST NOT free it!
* rgerhards, 2008-03-20
*/
-static void
-configureTCPListen(tcpsrv_t *pThis, char *cOptarg)
+static rsRetVal
+configureTCPListen(tcpsrv_t *pThis, uchar *pszPort)
{
- register int i;
- register char *pArg = cOptarg;
+ int i;
+ uchar *pPort = pszPort;
+ DEFiRet;
- assert(cOptarg != NULL);
+ assert(pszPort != NULL);
ISOBJ_TYPE_assert(pThis, tcpsrv);
/* extract port */
i = 0;
- while(isdigit((int) *pArg)) {
- i = i * 10 + *pArg++ - '0';
+ while(isdigit((int) *pPort)) {
+ i = i * 10 + *pPort++ - '0';
}
- if(pThis->TCPLstnPort != NULL) {
- free(pThis->TCPLstnPort);
- pThis->TCPLstnPort = NULL;
- }
-
- if( i >= 0 && i <= 65535) {
- pThis->TCPLstnPort = cOptarg;
+ if(i >= 0 && i <= 65535) {
+ CHKiRet(addNewLstnPort(pThis, pszPort));
} else {
- errmsg.LogError(0, NO_ERRCODE, "Invalid TCP listen port %s - changed to 514.\n", cOptarg);
+ errmsg.LogError(0, NO_ERRCODE, "Invalid TCP listen port %s - ignored.\n", pszPort);
}
+
+finalize_it:
+ RETiRet;
}
@@ -202,6 +231,8 @@ TCPSessGetNxtSess(tcpsrv_t *pThis, int iCurr)
static void deinit_tcp_listener(tcpsrv_t *pThis)
{
int i;
+ tcpLstnPortList_t *pEntry;
+ tcpLstnPortList_t *pDel;
ISOBJ_TYPE_assert(pThis, tcpsrv);
@@ -219,8 +250,15 @@ static void deinit_tcp_listener(tcpsrv_t *pThis)
pThis->pSessions = NULL; /* just to make sure... */
}
- if(pThis->TCPLstnPort != NULL)
- free(pThis->TCPLstnPort);
+ /* free list of tcp listen ports */
+ pEntry = pThis->pLstnPorts;
+ while(pEntry != NULL) {
+ free(pEntry->pszPort);
+ prop.Destruct(&pEntry->pInputName);
+ pDel = pEntry;
+ pEntry = pEntry->pNext;
+ free(pDel);
+ }
/* finally close our listen streams */
for(i = 0 ; i < pThis->iLstnMax ; ++i) {
@@ -235,7 +273,8 @@ static void deinit_tcp_listener(tcpsrv_t *pThis)
static rsRetVal
addTcpLstn(void *pUsr, netstrm_t *pLstn)
{
- tcpsrv_t *pThis = (tcpsrv_t*) pUsr;
+ tcpLstnPortList_t *pPortList = (tcpLstnPortList_t *) pUsr;
+ tcpsrv_t *pThis = pPortList->pSrv;
DEFiRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
@@ -245,6 +284,7 @@ addTcpLstn(void *pUsr, netstrm_t *pLstn)
ABORT_FINALIZE(RS_RET_MAX_LSTN_REACHED);
pThis->ppLstn[pThis->iLstnMax] = pLstn;
+ pThis->ppLstnPort[pThis->iLstnMax] = pPortList;
++pThis->iLstnMax;
finalize_it:
@@ -252,19 +292,20 @@ finalize_it:
}
-/* Initialize TCP sockets (for listener) and listens on them */
-static rsRetVal
-create_tcp_socket(tcpsrv_t *pThis)
+/* Initialize TCP listener socket for a single port
+ * rgerhards, 2009-05-21
+ */
+static inline rsRetVal
+initTCPListener(tcpsrv_t *pThis, tcpLstnPortList_t *pPortEntry)
{
DEFiRet;
uchar *TCPLstnPort;
ISOBJ_TYPE_assert(pThis, tcpsrv);
+ assert(pPortEntry != NULL);
- if(!strcmp((char*)pThis->TCPLstnPort, "0"))
- TCPLstnPort = (uchar*)"514";
- // TODO: we need to enable the caller to set a port (based on who is
- // using this, 514 may be totally unsuitable... --- rgerhards, 2008-04-22
+ if(!ustrcmp(pPortEntry->pszPort, UCHAR_CONSTANT("0")))
+ TCPLstnPort = UCHAR_CONSTANT("514");
/* use default - we can not do service db update, because there is
* no IANA-assignment for syslog/tcp. In the long term, we might
* re-use RFC 3195 port of 601, but that would probably break to
@@ -272,10 +313,31 @@ create_tcp_socket(tcpsrv_t *pThis)
* rgerhards, 2007-06-28
*/
else
- TCPLstnPort = (uchar*)pThis->TCPLstnPort;
+ TCPLstnPort = pPortEntry->pszPort;
/* TODO: add capability to specify local listen address! */
- CHKiRet(netstrm.LstnInit(pThis->pNS, (void*)pThis, addTcpLstn, TCPLstnPort, NULL, pThis->iSessMax));
+ CHKiRet(netstrm.LstnInit(pThis->pNS, (void*)pPortEntry, addTcpLstn, TCPLstnPort, NULL, pThis->iSessMax));
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* Initialize TCP sockets (for listener) and listens on them */
+static rsRetVal
+create_tcp_socket(tcpsrv_t *pThis)
+{
+ tcpLstnPortList_t *pEntry;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+
+ /* init all configured ports */
+ pEntry = pThis->pLstnPorts;
+ while(pEntry != NULL) {
+ CHKiRet(initTCPListener(pThis, pEntry));
+ pEntry = pEntry->pNext;
+ }
/* OK, we had success. Now it is also time to
* initialize our connections
@@ -297,7 +359,7 @@ finalize_it:
/* Accept new TCP connection; make entry in session table. If there
* is no more space left in the connection table, the new TCP
* connection is immediately dropped.
- * ppSess has a pointer to the newly created session, if it succeds.
+ * ppSess has a pointer to the newly created session, if it succeeds.
* If it does not succeed, no session is created and ppSess is
* undefined. If the user has provided an OnSessAccept Callback,
* this one is executed immediately after creation of the
@@ -305,7 +367,7 @@ finalize_it:
* rgerhards, 2008-03-02
*/
static rsRetVal
-SessAccept(tcpsrv_t *pThis, tcps_sess_t **ppSess, netstrm_t *pStrm)
+SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess, netstrm_t *pStrm)
{
DEFiRet;
tcps_sess_t *pSess = NULL;
@@ -316,6 +378,7 @@ SessAccept(tcpsrv_t *pThis, tcps_sess_t **ppSess, netstrm_t *pStrm)
uchar *fromHostIP = NULL;
ISOBJ_TYPE_assert(pThis, tcpsrv);
+ assert(pLstnInfo != NULL);
CHKiRet(netstrm.AcceptConnReq(pStrm, &pNewStrm));
@@ -325,13 +388,15 @@ SessAccept(tcpsrv_t *pThis, tcps_sess_t **ppSess, netstrm_t *pStrm)
errno = 0;
errmsg.LogError(0, RS_RET_MAX_SESS_REACHED, "too many tcp sessions - dropping incoming request");
ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED);
- } else {
- /* we found a free spot and can construct our session object */
- CHKiRet(tcps_sess.Construct(&pSess));
- CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis));
}
- /* OK, we have a "good" index... */
+ /* we found a free spot and can construct our session object */
+ CHKiRet(tcps_sess.Construct(&pSess));
+ CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis));
+ CHKiRet(tcps_sess.SetLstnInfo(pSess, pLstnInfo));
+ if(pThis->OnMsgReceive != NULL)
+ CHKiRet(tcps_sess.SetOnMsgReceive(pSess, pThis->OnMsgReceive));
+
/* get the host name */
CHKiRet(netstrm.GetRemoteHName(pNewStrm, &fromHostFQDN));
CHKiRet(netstrm.GetRemoteIP(pNewStrm, &fromHostIP));
@@ -376,12 +441,10 @@ finalize_it:
if(iRet != RS_RET_OK) {
if(pSess != NULL)
tcps_sess.Destruct(&pSess);
- if(fromHostFQDN != NULL)
- free(fromHostFQDN);
- if(fromHostIP != NULL)
- free(fromHostIP);
if(pNewStrm != NULL)
netstrm.Destruct(&pNewStrm);
+ free(fromHostFQDN);
+ free(fromHostIP);
}
RETiRet;
@@ -418,6 +481,7 @@ Run(tcpsrv_t *pThis)
* this thread. Thus, we also need to instantiate a cancel cleanup handler
* to prevent us from leaking anything. -- rgerharsd, 20080-04-24
*/
+RUNLOG_STR("XXXX: tcp server runs\n");
pthread_cleanup_push(RunCancelCleanup, (void*) &pSel);
while(1) {
CHKiRet(nssel.Construct(&pSel));
@@ -438,6 +502,7 @@ Run(tcpsrv_t *pThis)
iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
}
+RUNLOG_STR("XXXX: tcp server select\n");
/* wait for io to become ready */
CHKiRet(nssel.Wait(pSel, &nfds));
@@ -445,17 +510,18 @@ Run(tcpsrv_t *pThis)
CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds));
if(bIsReady) {
dbgprintf("New connect on NSD %p.\n", pThis->ppLstn[i]);
- SessAccept(pThis, &pNewSess, pThis->ppLstn[i]);
+ SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
--nfds; /* indicate we have processed one */
}
}
+RUNLOG_STR("XXXX: tcp server post select\n");
/* now check the sessions */
iTCPSess = TCPSessGetNxtSess(pThis, -1);
while(nfds && iTCPSess != -1) {
CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds));
if(bIsReady) {
- char buf[8*1024]; /* reception buffer - may hold a partial or multiple messages */
+ char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */
dbgprintf("netstream %p with new data\n", pThis->pSessions[iTCPSess]->pStrm);
/* Receive message */
@@ -513,6 +579,8 @@ finalize_it: /* this is a very special case - this time only we do not exit the
/* Standard-Constructor */
BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */
pThis->iSessMax = TCPSESS_MAX_DEFAULT; /* TODO: useful default ;) */
+ pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
+ pThis->OnMsgReceive = NULL;
ENDobjConstruct(tcpsrv)
@@ -535,6 +603,7 @@ tcpsrvConstructFinalize(tcpsrv_t *pThis)
/* set up listeners */
CHKmalloc(pThis->ppLstn = calloc(TCPLSTN_MAX_DEFAULT, sizeof(netstrm_t*)));
+ CHKmalloc(pThis->ppLstnPort = calloc(TCPLSTN_MAX_DEFAULT, sizeof(tcpLstnPortList_t*)));
iRet = pThis->OpenLstnSocks(pThis);
finalize_it:
@@ -556,10 +625,10 @@ CODESTARTobjDestruct(tcpsrv)
if(pThis->pNS != NULL)
netstrms.Destruct(&pThis->pNS);
- if(pThis->pszDrvrAuthMode != NULL)
- free(pThis->pszDrvrAuthMode);
- if(pThis->ppLstn != NULL)
- free(pThis->ppLstn);
+ free(pThis->pszDrvrAuthMode);
+ free(pThis->ppLstn);
+ free(pThis->ppLstnPort);
+ free(pThis->pszInputName);
ENDobjDestruct(tcpsrv)
@@ -657,6 +726,55 @@ SetUsrP(tcpsrv_t *pThis, void *pUsr)
RETiRet;
}
+static rsRetVal
+SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int))
+{
+ DEFiRet;
+ assert(OnMsgReceive != NULL);
+ pThis->OnMsgReceive = OnMsgReceive;
+ RETiRet;
+}
+
+
+
+/* Set additional framing to use (if any) -- rgerhards, 2008-12-10 */
+static rsRetVal
+SetAddtlFrameDelim(tcpsrv_t *pThis, int iDelim)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->addtlFrameDelim = iDelim;
+ RETiRet;
+}
+
+
+/* Set the input name to use -- rgerhards, 2008-12-10 */
+static rsRetVal
+SetInputName(tcpsrv_t *pThis, uchar *name)
+{
+ uchar *pszName;
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ if(name == NULL)
+ pszName = NULL;
+ else
+ CHKmalloc(pszName = ustrdup(name));
+ free(pThis->pszInputName);
+ pThis->pszInputName = pszName;
+finalize_it:
+ RETiRet;
+}
+
+
+/* Set the ruleset (ptr) to use */
+static rsRetVal
+SetRuleset(tcpsrv_t *pThis, ruleset_t *pRuleset)
+{
+ DEFiRet;
+ pThis->pRuleset = pRuleset;
+ RETiRet;
+}
+
/* here follows a number of methods that shuffle authentication settings down
* to the drivers. Drivers not supporting these settings may return an error
@@ -680,7 +798,7 @@ SetDrvrAuthMode(tcpsrv_t *pThis, uchar *mode)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
- CHKmalloc(pThis->pszDrvrAuthMode = (uchar*)strdup((char*)mode));
+ CHKmalloc(pThis->pszDrvrAuthMode = ustrdup(mode));
finalize_it:
RETiRet;
}
@@ -735,12 +853,14 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->ConstructFinalize = tcpsrvConstructFinalize;
pIf->Destruct = tcpsrvDestruct;
- pIf->SessAccept = SessAccept;
+ //pIf->SessAccept = SessAccept;
pIf->configureTCPListen = configureTCPListen;
pIf->create_tcp_socket = create_tcp_socket;
pIf->Run = Run;
pIf->SetUsrP = SetUsrP;
+ pIf->SetInputName = SetInputName;
+ pIf->SetAddtlFrameDelim = SetAddtlFrameDelim;
pIf->SetSessMax = SetSessMax;
pIf->SetDrvrMode = SetDrvrMode;
pIf->SetDrvrAuthMode = SetDrvrAuthMode;
@@ -755,6 +875,8 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->SetCBOnDestruct = SetCBOnDestruct;
pIf->SetCBOnRegularClose = SetCBOnRegularClose;
pIf->SetCBOnErrClose = SetCBOnErrClose;
+ pIf->SetOnMsgReceive = SetOnMsgReceive;
+ pIf->SetRuleset = SetRuleset;
finalize_it:
ENDobjQueryInterface(tcpsrv)
@@ -768,6 +890,8 @@ CODESTARTObjClassExit(tcpsrv)
/* release objects we no longer need */
objRelease(tcps_sess, DONT_LOAD_LIB);
objRelease(conf, CORE_COMPONENT);
+ objRelease(prop, CORE_COMPONENT);
+ objRelease(ruleset, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
objRelease(netstrms, DONT_LOAD_LIB);
@@ -791,6 +915,8 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
CHKiRet(objUse(tcps_sess, DONT_LOAD_LIB));
CHKiRet(objUse(conf, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
+ CHKiRet(objUse(prop, CORE_COMPONENT));
/* set our own handlers */
OBJSetMethodHandler(objMethod_DEBUGPRINT, tcpsrvDebugPrint);