summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-03-04 14:58:51 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2011-03-04 14:58:51 +0100
commit062c1ffbdce422c0df3b5314d25d935f1bd2a9e1 (patch)
tree4aacc6da01a86dbea578e9e54bea8db76e3bd6a4
parent75a585a7e26d4d6cf59157cf62584c139af1527c (diff)
parent9be853a2c8d0fd7fdc415200af57493ad5a00feb (diff)
downloadrsyslog-062c1ffbdce422c0df3b5314d25d935f1bd2a9e1.zip
rsyslog-062c1ffbdce422c0df3b5314d25d935f1bd2a9e1.tar.gz
rsyslog-062c1ffbdce422c0df3b5314d25d935f1bd2a9e1.tar.xz
Merge branch 'master-tcpsrv-mt'v6.1.5
Conflicts: ChangeLog
-rw-r--r--ChangeLog9
-rw-r--r--configure.ac1
-rw-r--r--runtime/debug.c40
-rw-r--r--runtime/netstrm.c2
-rw-r--r--runtime/netstrms.c1
-rw-r--r--runtime/nsd_gtls.c28
-rw-r--r--runtime/nsdpoll_ptcp.c9
-rw-r--r--runtime/nsdpoll_ptcp.h1
-rw-r--r--tcps_sess.c3
-rw-r--r--tcpsrv.c210
-rw-r--r--tests/Makefile.am13
-rwxr-xr-xtests/imtcp-tls-basic.sh11
-rw-r--r--tests/tcpflood.c202
13 files changed, 440 insertions, 90 deletions
diff --git a/ChangeLog b/ChangeLog
index 710cd52..01f53b3 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,6 +1,13 @@
---------------------------------------------------------------------------
-Version 6.1.5 [DEVEL] (rgerhards), 2011-02-??
+Version 6.1.5 [DEVEL] (rgerhards), 2011-03-04
- improved testbench
+- enhanced imtcp to use a pool of worker threads to process incoming
+ messages. This enables higher processing rates, especially in the TLS
+ case (where more CPU is needed for the crypto functions)
+- added support for TLS (in anon mode) to tcpflood
+- improved TLS error reporting
+- improved TLS startup (Diffie-Hellman bits do not need to be generated,
+ as we do not support full anon key exchange -- we always need certs)
- bugfix: fixed a memory leak and potential abort condition
this could happen if multiple rulesets were used and some output batches
contained messages belonging to more than one ruleset.
diff --git a/configure.ac b/configure.ac
index 936a61f..bd0d6e6 100644
--- a/configure.ac
+++ b/configure.ac
@@ -681,6 +681,7 @@ AC_ARG_ENABLE(gnutls,
)
if test "x$enable_gnutls" = "xyes"; then
PKG_CHECK_MODULES(GNUTLS, gnutls >= 1.4.0)
+ AC_DEFINE([ENABLE_GNUTLS], [1], [Indicator that GnuTLS is present])
fi
AM_CONDITIONAL(ENABLE_GNUTLS, test x$enable_gnutls = xyes)
AC_SUBST(GNUTLS_CFLAGS)
diff --git a/runtime/debug.c b/runtime/debug.c
index 3f1c23b..283dae3 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -843,12 +843,15 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg)
static int bWasNL = 0;
char pszThrdName[64]; /* 64 is to be on the safe side, anything over 20 is bad... */
char pszWriteBuf[32*1024];
+ size_t lenCopy;
+ size_t offsWriteBuf = 0;
size_t lenWriteBuf;
struct timespec t;
# if _POSIX_TIMERS <= 0
struct timeval tv;
# endif
+#if 1
/* The bWasNL handler does not really work. It works if no thread
* switching occurs during non-NL messages. Else, things are messed
* up. Anyhow, it works well enough to provide useful help during
@@ -859,8 +862,8 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg)
*/
if(ptLastThrdID != pthread_self()) {
if(!bWasNL) {
- if(stddbg != -1) write(stddbg, "\n", 1);
- if(altdbg != -1) write(altdbg, "\n", 1);
+ pszWriteBuf[0] = '\n';
+ offsWriteBuf = 1;
bWasNL = 1;
}
ptLastThrdID = pthread_self();
@@ -881,25 +884,28 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg)
t.tv_sec = tv.tv_sec;
t.tv_nsec = tv.tv_usec * 1000;
# endif
- lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf),
+ lenWriteBuf = snprintf(pszWriteBuf+offsWriteBuf, sizeof(pszWriteBuf) - offsWriteBuf,
"%4.4ld.%9.9ld:", (long) (t.tv_sec % 10000), t.tv_nsec);
- if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf);
- if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf);
+ offsWriteBuf += lenWriteBuf;
}
- lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "%s: ", pszThrdName);
- // use for testing: lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "{%ld}%s: ", (long) syscall(SYS_gettid), pszThrdName);
- if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf);
- if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf);
+ lenWriteBuf = snprintf(pszWriteBuf + offsWriteBuf, sizeof(pszWriteBuf) - offsWriteBuf, "%s: ", pszThrdName);
+ offsWriteBuf += lenWriteBuf;
/* print object name header if we have an object */
if(pszObjName != NULL) {
- lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "%s: ", pszObjName);
- if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf);
- if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf);
+ lenWriteBuf = snprintf(pszWriteBuf + offsWriteBuf, sizeof(pszWriteBuf) - offsWriteBuf, "%s: ", pszObjName);
+ offsWriteBuf += lenWriteBuf;
}
}
- if(stddbg != -1) write(stddbg, pszMsg, lenMsg);
- if(altdbg != -1) write(altdbg, pszMsg, lenMsg);
+#endif
+ if(lenMsg > sizeof(pszWriteBuf) - offsWriteBuf)
+ lenCopy = sizeof(pszWriteBuf) - offsWriteBuf;
+ else
+ lenCopy = lenMsg;
+ memcpy(pszWriteBuf + offsWriteBuf, pszMsg, lenCopy);
+ offsWriteBuf += lenCopy;
+ if(stddbg != -1) write(stddbg, pszWriteBuf, offsWriteBuf);
+ if(altdbg != -1) write(altdbg, pszWriteBuf, offsWriteBuf);
bWasNL = (pszMsg[lenMsg - 1] == '\n') ? 1 : 0;
}
@@ -923,12 +929,12 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
pszObjName = obj.GetName(pObj);
}
- pthread_mutex_lock(&mutdbgprint);
- pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint);
+// pthread_mutex_lock(&mutdbgprint);
+// pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint);
do_dbgprint(pszObjName, pszMsg, lenMsg);
- pthread_cleanup_pop(1);
+// pthread_cleanup_pop(1);
}
#pragma GCC diagnostic warning "-Wempty-body"
diff --git a/runtime/netstrm.c b/runtime/netstrm.c
index 3658006..a6f840a 100644
--- a/runtime/netstrm.c
+++ b/runtime/netstrm.c
@@ -64,6 +64,7 @@ ENDobjConstruct(netstrm)
/* destructor for the netstrm object */
BEGINobjDestruct(netstrm) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(netstrm)
+//printf("destruct driver data %p\n", pThis->pDrvrData);
if(pThis->pDrvrData != NULL)
iRet = pThis->Drvr.Destruct(&pThis->pDrvrData);
ENDobjDestruct(netstrm)
@@ -169,6 +170,7 @@ Rcv(netstrm_t *pThis, uchar *pBuf, ssize_t *pLenBuf)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, netstrm);
+//printf("Rcv %p\n", pThis);
iRet = pThis->Drvr.Rcv(pThis->pDrvrData, pBuf, pLenBuf);
RETiRet;
}
diff --git a/runtime/netstrms.c b/runtime/netstrms.c
index e9ff256..56e492f 100644
--- a/runtime/netstrms.c
+++ b/runtime/netstrms.c
@@ -32,7 +32,6 @@
#include "rsyslog.h"
#include "module-template.h"
#include "obj.h"
-//#include "errmsg.h"
#include "nsd.h"
#include "netstrm.h"
#include "nssel.h"
diff --git a/runtime/nsd_gtls.c b/runtime/nsd_gtls.c
index 0ee70e5..b4e747b 100644
--- a/runtime/nsd_gtls.c
+++ b/runtime/nsd_gtls.c
@@ -50,7 +50,6 @@
#include "nsd_gtls.h"
/* things to move to some better place/functionality - TODO */
-#define DH_BITS 1024
#define CRLFILE "crl.pem"
@@ -81,7 +80,6 @@ static pthread_mutex_t mutGtlsStrerror; /**< a mutex protecting the potentially
/* ------------------------------ GnuTLS specifics ------------------------------ */
static gnutls_certificate_credentials xcred;
-static gnutls_dh_params dh_params;
#ifdef DEBUG
#if 0 /* uncomment, if needed some time again -- DEV Debug only */
@@ -609,7 +607,6 @@ gtlsInitSession(nsd_gtls_t *pThis)
/* request client certificate if any. */
gnutls_certificate_server_set_request( session, GNUTLS_CERT_REQUEST);
- gnutls_dh_set_prime_bits(session, DH_BITS);
pThis->sess = session;
@@ -618,23 +615,6 @@ finalize_it:
}
-static rsRetVal
-generate_dh_params(void)
-{
- int gnuRet;
- DEFiRet;
- /* Generate Diffie Hellman parameters - for use with DHE
- * kx algorithms. These should be discarded and regenerated
- * once a day, once a week or once a month. Depending on the
- * security requirements.
- */
- CHKgnutls(gnutls_dh_params_init( &dh_params));
- CHKgnutls(gnutls_dh_params_generate2( dh_params, DH_BITS));
-finalize_it:
- RETiRet;
-}
-
-
/* set up all global things that are needed for server operations
* rgerhards, 2008-04-30
*/
@@ -648,8 +628,6 @@ gtlsGlblInitLstn(void)
* considered legacy. -- rgerhards, 2008-05-05
*/
/*CHKgnutls(gnutls_certificate_set_x509_crl_file(xcred, CRLFILE, GNUTLS_X509_FMT_PEM));*/
- CHKiRet(generate_dh_params());
- gnutls_certificate_set_dh_params(xcred, dh_params); /* this is void */
bGlblSrvrInitDone = 1; /* we are all set now */
/* now we need to add our certificate */
@@ -1173,6 +1151,8 @@ CODESTARTobjDestruct(nsd_gtls)
gnutls_x509_crt_deinit(pThis->ourCert);
if(pThis->bOurKeyIsInit)
gnutls_x509_privkey_deinit(pThis->ourKey);
+#warning need more checks if the new gnutls_deinit() breaks things during normal operations
+// gnutls_deinit(pThis->sess); /* see ln 600 pThis->bInSess as something to check? */
ENDobjDestruct(nsd_gtls)
@@ -1418,6 +1398,10 @@ AcceptConnReq(nsd_t *pNsd, nsd_t **ppNew)
/* we got a handshake, now check authorization */
CHKiRet(gtlsChkPeerAuth(pNew));
} else {
+ uchar *pGnuErr = gtlsStrerror(gnuRet);
+ errmsg.LogError(0, RS_RET_TLS_HANDSHAKE_ERR,
+ "gnutls returned error on handshake: %s\n", pGnuErr);
+ free(pGnuErr);
ABORT_FINALIZE(RS_RET_TLS_HANDSHAKE_ERR);
}
diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c
index 26810b7..6fd92df 100644
--- a/runtime/nsdpoll_ptcp.c
+++ b/runtime/nsdpoll_ptcp.c
@@ -71,13 +71,16 @@ addEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, int mode, nsd_ptcp_t *pSock,
pNew->pUsr = pUsr;
pNew->pSock = pSock;
pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */
+ //pNew->event.events = EPOLLET;
if(mode & NSDPOLL_IN)
pNew->event.events |= EPOLLIN;
if(mode & NSDPOLL_OUT)
pNew->event.events |= EPOLLOUT;
pNew->event.data.u64 = (uint64) pNew;
+ pthread_mutex_lock(&pThis->mutEvtLst);
pNew->pNext = pThis->pRoot;
pThis->pRoot = pNew;
+ pthread_mutex_unlock(&pThis->mutEvtLst);
*pEvtLst = pNew;
finalize_it:
@@ -94,6 +97,7 @@ unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t **
nsdpoll_epollevt_lst_t *pPrev = NULL;
DEFiRet;
+ pthread_mutex_lock(&pThis->mutEvtLst);
pEvtLst = pThis->pRoot;
while(pEvtLst != NULL && !(pEvtLst->id == id && pEvtLst->pUsr == pUsr)) {
pPrev = pEvtLst;
@@ -111,6 +115,7 @@ unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t **
pPrev->pNext = pEvtLst->pNext;
finalize_it:
+ pthread_mutex_unlock(&pThis->mutEvtLst);
RETiRet;
}
@@ -144,6 +149,7 @@ BEGINobjConstruct(nsdpoll_ptcp) /* be sure to specify the object type also in EN
DBGPRINTF("epoll_create1() could not create fd\n");
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
+ pthread_mutex_init(&pThis->mutEvtLst, NULL);
finalize_it:
ENDobjConstruct(nsdpoll_ptcp)
@@ -151,6 +157,9 @@ ENDobjConstruct(nsdpoll_ptcp)
/* destructor for the nsdpoll_ptcp object */
BEGINobjDestruct(nsdpoll_ptcp) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(nsdpoll_ptcp)
+ //printf("ndspoll_ptcp destruct, event list root is %p\n", pThis->pRoot);
+#warning cleanup event list is missing! (at least I think so)
+ pthread_mutex_destroy(&pThis->mutEvtLst);
ENDobjDestruct(nsdpoll_ptcp)
diff --git a/runtime/nsdpoll_ptcp.h b/runtime/nsdpoll_ptcp.h
index cea2823..dfefad1 100644
--- a/runtime/nsdpoll_ptcp.h
+++ b/runtime/nsdpoll_ptcp.h
@@ -49,6 +49,7 @@ struct nsdpoll_ptcp_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
int efd; /* file descriptor used by epoll */
nsdpoll_epollevt_lst_t *pRoot; /* Root of the epoll event list */
+ pthread_mutex_t mutEvtLst;
};
/* interface is defined in nsd.h, we just implement it! */
diff --git a/tcps_sess.c b/tcps_sess.c
index 99af0cb..8b94488 100644
--- a/tcps_sess.c
+++ b/tcps_sess.c
@@ -95,6 +95,7 @@ finalize_it:
/* destructor for the tcps_sess object */
BEGINobjDestruct(tcps_sess) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(tcps_sess)
+//printf("sess %p destruct, pStrm %p\n", pThis, pThis->pStrm);
if(pThis->pStrm != NULL)
netstrm.Destruct(&pThis->pStrm);
@@ -337,6 +338,7 @@ Close(tcps_sess_t *pThis)
{
DEFiRet;
+//printf("sess %p close\n", pThis);
ISOBJ_TYPE_assert(pThis, tcps_sess);
netstrm.Destruct(&pThis->pStrm);
if(pThis->fromHost != NULL) {
@@ -466,6 +468,7 @@ DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen)
char *pEnd;
DEFiRet;
+//printf("DataRcvd: %p\n", pThis);
ISOBJ_TYPE_assert(pThis, tcps_sess);
assert(pData != NULL);
assert(iLen > 0);
diff --git a/tcpsrv.c b/tcpsrv.c
index 77b7b9b..698b52d 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -47,6 +47,7 @@
#include <ctype.h>
#include <netinet/in.h>
#include <netdb.h>
+#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#if HAVE_FCNTL_H
@@ -70,6 +71,7 @@
#include "ruleset.h"
#include "unicode-helper.h"
+
MODULE_TYPE_LIB
/* defines */
@@ -91,6 +93,23 @@ DEFobjCurrIf(nspoll)
DEFobjCurrIf(prop)
+/* The following structure controls the worker threads. Global data is
+ * needed for their access.
+ */
+static struct wrkrInfo_s {
+ pthread_t tid; /* the worker's thread ID */
+ pthread_cond_t run;
+ int idx;
+ tcpsrv_t *pSrv; /* pSrv == NULL -> idle */
+ nspoll_t *pPoll;
+ void *pUsr;
+ long long unsigned numCalled; /* how often was this called */
+} wrkrInfo[4];
+static pthread_mutex_t wrkrMut;
+static pthread_cond_t wrkrIdle;
+static int wrkrMax = 4;
+static int wrkrRunning;
+
/* add new listener port to listener port list
* rgerhards, 2009-05-21
*/
@@ -510,7 +529,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n",
(*ppSess)->pStrm, pszPeer);
}
+ //pthread_mutex_lock(&mut);
CHKiRet(closeSess(pThis, ppSess, pPoll));
+ //pthread_mutex_unlock(&mut);
break;
case RS_RET_RETRY:
/* we simply ignore retry - this is not an error, but we also have not received anything */
@@ -538,7 +559,6 @@ finalize_it:
RETiRet;
}
-
/* process a single workset item
*/
static inline rsRetVal
@@ -550,12 +570,20 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr)
dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr);
if(pUsr == pThis->ppLstn) {
DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]);
- SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]);
- CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
- DBGPRINTF("New session created with NSD %p.\n", pNewSess);
+ iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]);
+ if(iRet == RS_RET_OK) {
+ if(pPoll != NULL)
+ CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
+ DBGPRINTF("New session created with NSD %p.\n", pNewSess);
+ } else {
+ DBGPRINTF("tcpsrv: error %d during accept\n", iRet);
+ }
} else {
pNewSess = (tcps_sess_t*) pUsr;
doReceive(pThis, &pNewSess, pPoll);
+ if(pPoll == NULL && pNewSess == NULL) {
+ pThis->pSessions[idx] = NULL;
+ }
}
finalize_it:
@@ -563,22 +591,95 @@ finalize_it:
}
+/* worker to process incoming requests
+ */
+static void *
+wrkr(void *myself)
+{
+ struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself;
+
+ pthread_mutex_lock(&wrkrMut);
+ while(1) {
+ while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) {
+ pthread_cond_wait(&me->run, &wrkrMut);
+ }
+ if(glbl.GetGlobalInputTermState() == 1)
+ break;
+ pthread_mutex_unlock(&wrkrMut);
+
+ ++me->numCalled;
+ processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr);
+
+ pthread_mutex_lock(&wrkrMut);
+ me->pSrv = NULL; /* indicate we are free again */
+ --wrkrRunning;
+ pthread_cond_signal(&wrkrIdle);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+
+ return NULL;
+}
+
+
/* Process a workset, that is handle io. We become activated
* from either select or epoll handler. We split the workload
* out to a pool of threads, but try to avoid context switches
* as much as possible.
*/
-static rsRetVal processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[])
+static rsRetVal
+processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[])
{
int i;
+ int origEntries = numEntries;
DEFiRet;
dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries);
- for(i = 0 ; i < numEntries ; i++) {
+ while(numEntries > 0) {
if(glbl.GetGlobalInputTermState() == 1)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
- CHKiRet(processWorksetItem(pThis, pPoll, workset[i].id, workset[i].pUsr));
+ if(numEntries == 1) {
+ /* process self, save context switch */
+ processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr);
+ } else {
+ pthread_mutex_lock(&wrkrMut);
+ /* check if there is a free worker */
+ for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i)
+ /*do search*/;
+ if(i < wrkrMax) {
+ /* worker free -> use it! */
+ wrkrInfo[i].pSrv = pThis;
+ wrkrInfo[i].pPoll = pPoll;
+ wrkrInfo[i].idx = workset[numEntries -1].id;
+ wrkrInfo[i].pUsr = workset[numEntries -1].pUsr;
+ /* Note: we must increment wrkrRunning HERE and not inside the worker's
+ * code. This is because a worker may actually never start, and thus
+ * increment wrkrRunning, before we finish and check the running worker
+ * count. We can only avoid this by incrementing it here.
+ */
+ ++wrkrRunning;
+ pthread_cond_signal(&wrkrInfo[i].run);
+ pthread_mutex_unlock(&wrkrMut);
+ } else {
+ pthread_mutex_unlock(&wrkrMut);
+ /* no free worker, so we process this one ourselfs */
+ processWorksetItem(pThis, pPoll, workset[numEntries-1].id,
+ workset[numEntries-1].pUsr);
+ }
+ }
+ --numEntries;
+ }
+
+ if(origEntries > 1) {
+ /* we now need to wait until all workers finish. This is because the
+ * rest of this module can not handle the concurrency introduced
+ * by workers running during the epoll call.
+ */
+ pthread_mutex_lock(&wrkrMut);
+ while(wrkrRunning > 0) {
+ pthread_cond_wait(&wrkrIdle, &wrkrMut);
+ }
+ pthread_mutex_unlock(&wrkrMut);
}
finalize_it:
@@ -592,14 +693,14 @@ finalize_it:
*/
#pragma GCC diagnostic ignored "-Wempty-body"
static inline rsRetVal
-RunSelect(tcpsrv_t *pThis)
+RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset)
{
DEFiRet;
int nfds;
int i;
+ int iWorkset;
int iTCPSess;
int bIsReady;
- tcps_sess_t *pNewSess;
nssel_t *pSel = NULL;
ISOBJ_TYPE_assert(pThis, tcpsrv);
@@ -633,13 +734,21 @@ RunSelect(tcpsrv_t *pThis)
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
+ iWorkset = 0;
for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
if(glbl.GetGlobalInputTermState() == 1)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds));
if(bIsReady) {
- DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
- SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
+ workset[iWorkset].id = i;
+ workset[iWorkset].pUsr = (void*) pThis->ppLstn; /* this is a flag to indicate listen sock */
+ ++iWorkset;
+ if(iWorkset >= (int) sizeWorkset) {
+ processWorkset(pThis, NULL, iWorkset, workset);
+ iWorkset = 0;
+ }
+ //DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
+ //SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
--nfds; /* indicate we have processed one */
}
}
@@ -651,11 +760,22 @@ RunSelect(tcpsrv_t *pThis)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds));
if(bIsReady) {
- doReceive(pThis, &pThis->pSessions[iTCPSess], NULL);
+ workset[iWorkset].id = iTCPSess;
+ workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess];
+ ++iWorkset;
+ if(iWorkset >= (int) sizeWorkset) {
+ processWorkset(pThis, NULL, iWorkset, workset);
+ iWorkset = 0;
+ }
--nfds; /* indicate we have processed one */
}
iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
}
+
+ if(iWorkset > 0)
+ processWorkset(pThis, NULL, iWorkset, workset);
+
+ /* we need to copy back close descriptors */
CHKiRet(nssel.Destruct(&pSel));
finalize_it: /* this is a very special case - this time only we do not exit the function,
* because that would not help us either. So we simply retry it. Let's see
@@ -702,7 +822,7 @@ Run(tcpsrv_t *pThis)
if(localRet != RS_RET_OK) {
/* fall back to select */
dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet);
- iRet = RunSelect(pThis);
+ iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t));
FINALIZE;
}
@@ -732,26 +852,6 @@ Run(tcpsrv_t *pThis)
continue;
processWorkset(pThis, pPoll, numEntries, workset);
-#if 0
- dbgprintf("poll returned with %d entries.\n", numEntries);
-
- for(i = 0 ; i < numEntries ; i++) {
- if(glbl.GetGlobalInputTermState() == 1)
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
- currIdx = workset[i].id;
- dbgprintf("tcpsrv processing i %d, pUsr %p\n", currIdx, workset[i].pUsr);
-dbgprintf("tcpsrv processing pUsr %p, ppLstn[0] %p, ppLstn[%d] %p\n", workset[i].pUsr, pThis->ppLstn[0], currIdx, pThis->ppLstn[currIdx]);
- if(workset[i].pUsr == pThis->ppLstn) {
- DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[currIdx]);
- SessAccept(pThis, pThis->ppLstnPort[currIdx], &pNewSess, pThis->ppLstn[currIdx]);
- CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
- DBGPRINTF("New session created with NSD %p.\n", pNewSess);
- } else {
- pNewSess = (tcps_sess_t*) workset[i].pUsr;
- doReceive(pThis, &pNewSess, pPoll);
- }
- }
-#endif
}
/* remove the tcp listen sockets from the epoll set */
@@ -1155,11 +1255,50 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
ENDObjClassInit(tcpsrv)
-/* --------------- here now comes the plumbing that makes as a library module --------------- */
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+startWorkerPool(void)
+{
+ int i;
+ wrkrRunning = 0;
+ pthread_mutex_init(&wrkrMut, NULL);
+ pthread_cond_init(&wrkrIdle, NULL);
+ for(i = 0 ; i < wrkrMax ; ++i) {
+ /* init worker info structure! */
+ pthread_cond_init(&wrkrInfo[i].run, NULL);
+ wrkrInfo[i].pSrv = NULL;
+ wrkrInfo[i].numCalled = 0;
+ pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i]));
+ }
+}
+
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+stopWorkerPool(void)
+{
+ int i;
+ for(i = 0 ; i < wrkrMax ; ++i) {
+ pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */
+ pthread_join(wrkrInfo[i].tid, NULL);
+ DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
+ pthread_cond_destroy(&wrkrInfo[i].run);
+ }
+ pthread_cond_destroy(&wrkrIdle);
+ pthread_mutex_destroy(&wrkrMut);
+
+}
+
+
+/* --------------- here now comes the plumbing that makes as a library module --------------- */
BEGINmodExit
CODESTARTmodExit
+dbgprintf("tcpsrv: modExit\n");
+ stopWorkerPool();
+
/* de-init in reverse order! */
tcpsrvClassExit();
tcps_sessClassExit();
@@ -1179,6 +1318,9 @@ CODESTARTmodInit
/* Initialize all classes that are in our module - this includes ourselfs */
CHKiRet(tcps_sessClassInit(pModInfo));
CHKiRet(tcpsrvClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
+
+ startWorkerPool();
+
ENDmodInit
/* vim:set ai:
diff --git a/tests/Makefile.am b/tests/Makefile.am
index a040a9d..059f951 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -58,6 +58,11 @@ TESTS += \
imptcp_conndrop.sh
endif
+if ENABLE_GNUTLS
+TESTS += \
+ imtcp-tls-basic.sh
+endif
+
if ENABLE_OMUXSOCK
TESTS += uxsock_simple.sh
endif
@@ -195,6 +200,8 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
testsuites/da-mainmsg-q.conf \
diskqueue-fsync.sh \
testsuites/diskqueue-fsync.conf \
+ imtcp-tls-basic.sh \
+ testsuites/imtcp-tls-basic.conf \
imtcp-multiport.sh \
testsuites/imtcp-multiport.conf \
udp-msgreduc-vg.sh \
@@ -344,7 +351,11 @@ uxsockrcvr_SOURCES = uxsockrcvr.c
uxsockrcvr_LDADD = $(SOL_LIBS)
tcpflood_SOURCES = tcpflood.c
-tcpflood_LDADD = $(SOL_LIBS) $(PTHREADS_LIBS)
+tcpflood_CPPFLAGS = $(PTHREADS_CFLAGS) $(GNUTLS_CFLAGS)
+tcpflood_LDADD = $(SOL_LIBS) $(PTHREADS_LIBS) $(GNUTLS_LIBS)
+if ENABLE_GNUTLS
+tcpflood_LDADD += -lgcrypt
+endif
syslog_caller_SOURCES = syslog_caller.c
syslog_caller_LDADD = $(SOL_LIBS)
diff --git a/tests/imtcp-tls-basic.sh b/tests/imtcp-tls-basic.sh
new file mode 100755
index 0000000..d00f95d
--- /dev/null
+++ b/tests/imtcp-tls-basic.sh
@@ -0,0 +1,11 @@
+# added 2011-02-28 by Rgerhards
+# This file is part of the rsyslog project, released under GPLv3
+echo ===============================================================================
+echo \[imtcp-tls-basic.sh\]: testing imtcp in TLS mode - basic test
+source $srcdir/diag.sh init
+source $srcdir/diag.sh startup imtcp-tls-basic.conf
+source $srcdir/diag.sh tcpflood -p13514 -m50000 -Ttls -Z./tls-certs/cert.pem -z./tls-certs/key.pem
+source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
+source $srcdir/diag.sh wait-shutdown
+source $srcdir/diag.sh seq-check 0 49999
+source $srcdir/diag.sh exit
diff --git a/tests/tcpflood.c b/tests/tcpflood.c
index 7b376bd..59c63d2 100644
--- a/tests/tcpflood.c
+++ b/tests/tcpflood.c
@@ -48,6 +48,9 @@
* -b number of messages within a batch (default: 100,000,000 millions)
* -Y use multiple threads, one per connection (which means 1 if one only connection
* is configured!)
+ * -z private key file for TLS mode
+ * -Z cert (public key) file for TLS mode
+ * -L loglevel to use for GnuTLS troubleshooting (0-off to 10-all, 0 default)
*
* Part of the testbench for rsyslog.
*
@@ -85,6 +88,12 @@
#include <pthread.h>
#include <sys/resource.h>
#include <sys/time.h>
+#include <errno.h>
+#ifdef ENABLE_GNUTLS
+# include <gnutls/gnutls.h>
+# include <gcrypt.h>
+ GCRY_THREAD_OPTION_PTHREAD_IMPL;
+#endif
#define EXIT_FAILURE 1
#define INVALID_SOCKET -1
@@ -92,6 +101,7 @@
#define NETTEST_INPUT_CONF_FILE "nettest.input.conf" /* name of input file, must match $IncludeConfig in .conf files */
#define MAX_EXTRADATA_LEN 100*1024
+#define MAX_SENDBUF 2 * MAX_EXTRADATA_LEN
static char *targetIP = "127.0.0.1";
static char *msgPRI = "167";
@@ -122,6 +132,14 @@ static long long batchsize = 100000000ll;
static int waittime = 0;
static int runMultithreaded = 0; /* run tests in multithreaded mode */
static int numThrds = 1; /* number of threads to use */
+static char *tlsCertFile = NULL;
+static char *tlsKeyFile = NULL;
+static int tlsLogLevel = 0;
+
+#ifdef ENABLE_GNUTLS
+static gnutls_session_t *sessArray; /* array of TLS sessions to use */
+static gnutls_certificate_credentials tlscred;
+#endif
/* variables for managing multi-threaded operations */
int runningThreads; /* number of threads currently running */
@@ -151,7 +169,12 @@ struct runstats {
static int udpsock; /* socket for sending in UDP mode */
static struct sockaddr_in udpRcvr; /* remote receiver in UDP mode */
-static enum { TP_UDP, TP_TCP } transport = TP_TCP;
+static enum { TP_UDP, TP_TCP, TP_TLS } transport = TP_TCP;
+
+/* forward definitions */
+static void initTLSSess(int);
+static int sendTLS(int i, char *buf, int lenBuf);
+static void closeTLSSess(int __attribute__((unused)) i);
/* prepare send subsystem for UDP send */
static inline int
@@ -234,6 +257,9 @@ int openConnections(void)
if(bShowProgress)
write(1, " open connections", sizeof(" open connections")-1);
+# ifdef ENABLE_GNUTLS
+ sessArray = calloc(numConnections, sizeof(gnutls_session_t));
+# endif
sockArray = calloc(numConnections, sizeof(int));
for(i = 0 ; i < numConnections ; ++i) {
if(i % 10 == 0) {
@@ -244,6 +270,9 @@ int openConnections(void)
printf("error in trying to open connection i=%d\n", i);
return 1;
}
+ if(transport == TP_TLS) {
+ initTLSSess(i);
+ }
}
if(bShowProgress) {
lenMsg = sprintf(msgBuf, "\r%5.5d open connections\n", i);
@@ -268,7 +297,7 @@ void closeConnections(void)
struct linger ling;
char msgBuf[128];
- if(transport != TP_TCP)
+ if(transport == TP_UDP)
return;
if(bShowProgress)
@@ -287,6 +316,8 @@ void closeConnections(void)
ling.l_onoff = 1;
ling.l_linger = 1;
setsockopt(sockArray[i], SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
+ if(transport == TP_TLS)
+ closeTLSSess(i);
close(sockArray[i]);
}
}
@@ -346,11 +377,9 @@ genMsg(char *buf, size_t maxBuf, int *pLenBuf, struct instdata *inst)
/* use fixed message format from command line */
*pLenBuf = snprintf(buf, maxBuf, "%s\n", MsgToSend);
}
+ ++inst->numSent;
- if(inst->numSent++ >= inst->numMsgs)
- *pLenBuf = 0; /* indicate end of run */
-
-finalize_it: ;
+finalize_it: /*EMPTY to keep the compiler happy */;
}
/* send messages to the tcp connections we keep open. We use
@@ -369,6 +398,8 @@ int sendMessages(struct instdata *inst)
int lenSend = 0;
char *statusText = "";
char buf[MAX_EXTRADATA_LEN + 1024];
+ char sendBuf[MAX_SENDBUF];
+ int offsSendBuf = 0;
if(!bSilent) {
if(dataFile == NULL) {
@@ -382,22 +413,20 @@ int sendMessages(struct instdata *inst)
}
if(bShowProgress)
printf("\r%8.8d %s sent", 0, statusText);
- while(1) { /* broken inside loop! */
+ while(i < inst->numMsgs) {
if(runMultithreaded) {
socknum = inst->idx;
} else {
if(i < numConnections)
socknum = i;
- else if(i >= inst->numMsgs - numConnections)
+ else if(i >= inst->numMsgs - numConnections) {
socknum = i - (inst->numMsgs - numConnections);
- else {
+ } else {
int rnd = rand();
socknum = rnd % numConnections;
}
}
genMsg(buf, sizeof(buf), &lenBuf, inst); /* generate the message to send according to params */
- if(lenBuf == 0)
- break; /* end of processing! */
if(transport == TP_TCP) {
if(sockArray[socknum] == -1) {
/* connection was dropped, need to re-establish */
@@ -409,6 +438,17 @@ int sendMessages(struct instdata *inst)
lenSend = send(sockArray[socknum], buf, lenBuf, 0);
} else if(transport == TP_UDP) {
lenSend = sendto(udpsock, buf, lenBuf, 0, &udpRcvr, sizeof(udpRcvr));
+ } else if(transport == TP_TLS) {
+ if(offsSendBuf + lenBuf < MAX_SENDBUF) {
+ memcpy(sendBuf+offsSendBuf, buf, lenBuf);
+ offsSendBuf += lenBuf;
+ lenSend = lenBuf; /* simulate "good" call */
+ } else {
+ lenSend = sendTLS(socknum, sendBuf, offsSendBuf);
+ lenSend = (lenSend == offsSendBuf) ? lenBuf : -1;
+ memcpy(sendBuf, buf, lenBuf);
+ offsSendBuf = lenBuf;
+ }
}
if(lenSend != lenBuf) {
printf("\r%5.5d\n", i);
@@ -439,6 +479,10 @@ int sendMessages(struct instdata *inst)
++msgNum;
++i;
}
+ if(transport == TP_TLS && offsSendBuf != 0) {
+ /* send remaining buffer */
+ lenSend = sendTLS(socknum, sendBuf, offsSendBuf);
+ }
if(!bSilent)
printf("\r%8.8d %s sent\n", i, statusText);
@@ -643,6 +687,119 @@ runTests(void)
return 0;
}
+# if defined(ENABLE_GNUTLS)
+/* This defines a log function to be provided to GnuTLS. It hopefully
+ * helps us track down hard to find problems.
+ * rgerhards, 2008-06-20
+ */
+static void tlsLogFunction(int level, const char *msg)
+{
+ printf("GnuTLS (level %d): %s", level, msg);
+
+}
+
+
+/* global init GnuTLS
+ */
+static void
+initTLS(void)
+{
+ int r;
+
+ /* order of gcry_control and gnutls_global_init matters! */
+ gcry_control(GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
+ gnutls_global_init();
+ /* set debug mode, if so required by the options */
+ if(tlsLogLevel > 0) {
+ gnutls_global_set_log_function(tlsLogFunction);
+ gnutls_global_set_log_level(tlsLogLevel);
+ }
+
+ r = gnutls_certificate_allocate_credentials(&tlscred);
+ if(r != GNUTLS_E_SUCCESS) {
+ printf("error allocating credentials\n");
+ gnutls_perror(r);
+ exit(1);
+ }
+ r = gnutls_certificate_set_x509_key_file(tlscred, tlsCertFile, tlsKeyFile, GNUTLS_X509_FMT_PEM);
+ if(r != GNUTLS_E_SUCCESS) {
+ printf("error setting certificate files -- have you mixed up key and certificate?\n");
+ printf("If in doubt, try swapping the files in -z/-Z\n");
+ printf("Certifcate is: '%s'\n", tlsCertFile);
+ printf("Key is: '%s'\n", tlsKeyFile);
+ gnutls_perror(r);
+ r = gnutls_certificate_set_x509_key_file(tlscred, tlsKeyFile, tlsCertFile,
+ GNUTLS_X509_FMT_PEM);
+ if(r == GNUTLS_E_SUCCESS) {
+ printf("Tried swapping files, this seems to work "
+ "(but results may be unpredictable!)\n");
+ } else {
+ exit(1);
+ }
+ }
+}
+
+
+static void
+initTLSSess(int i)
+{
+ int r;
+ gnutls_init(sessArray + i, GNUTLS_CLIENT);
+
+ /* Use default priorities */
+ gnutls_set_default_priority(sessArray[i]);
+
+ /* put our credentials to the current session */
+ r = gnutls_credentials_set(sessArray[i], GNUTLS_CRD_CERTIFICATE, tlscred);
+ if(r != GNUTLS_E_SUCCESS) {
+ fprintf (stderr, "Setting credentials failed\n");
+ gnutls_perror(r);
+ exit(1);
+ }
+
+ /* NOTE: the following statement generates a cast warning, but there seems to
+ * be no way around it with current GnuTLS. Do NOT try to "fix" the situation!
+ */
+ gnutls_transport_set_ptr(sessArray[i], (gnutls_transport_ptr_t) sockArray[i]);
+
+ /* Perform the TLS handshake */
+ r = gnutls_handshake(sessArray[i]);
+ if(r < 0) {
+ fprintf (stderr, "TLS Handshake failed\n");
+ gnutls_perror(r);
+ exit(1);
+ }
+}
+
+static int
+sendTLS(int i, char *buf, int lenBuf)
+{
+ int lenSent;
+ int r;
+
+ lenSent = 0;
+ while(lenSent != lenBuf) {
+ r = gnutls_record_send(sessArray[i], buf + lenSent, lenBuf - lenSent);
+ if(r < 0)
+ break;
+ lenSent += r;
+ }
+
+ return lenSent;
+}
+
+static void
+closeTLSSess(int i)
+{
+ gnutls_bye(sessArray[i], GNUTLS_SHUT_RDWR);
+ gnutls_deinit(sessArray[i]);
+}
+# else /* NO TLS available */
+static void initTLS(void) {}
+static void initTLSSess(int __attribute__((unused)) i) {}
+static int sendTLS(int i, char *buf, int lenBuf) { return 0; }
+static void closeTLSSess(int __attribute__((unused)) i) {}
+# endif
/* Run the test.
* rgerhards, 2009-04-03
@@ -666,7 +823,7 @@ int main(int argc, char *argv[])
setvbuf(stdout, buf, _IONBF, 48);
- while((opt = getopt(argc, argv, "b:ef:F:t:p:c:C:m:i:I:P:d:Dn:M:rsBR:S:T:XW:Y")) != -1) {
+ while((opt = getopt(argc, argv, "b:ef:F:t:p:c:C:m:i:I:P:d:Dn:L:M:rsBR:S:T:XW:Yz:Z:")) != -1) {
switch (opt) {
case 'b': batchsize = atoll(optarg);
break;
@@ -701,6 +858,8 @@ int main(int argc, char *argv[])
break;
case 'F': frameDelim = atoi(optarg);
break;
+ case 'L': tlsLogLevel = atoi(optarg);
+ break;
case 'M': MsgToSend = optarg;
break;
case 'I': dataFile = optarg;
@@ -725,8 +884,15 @@ int main(int argc, char *argv[])
transport = TP_UDP;
} else if(!strcmp(optarg, "tcp")) {
transport = TP_TCP;
+ } else if(!strcmp(optarg, "tls")) {
+# if defined(ENABLE_GNUTLS)
+ transport = TP_TLS;
+# else
+ fprintf(stderr, "compiled without TLS support!\n", optarg);
+ exit(1);
+# endif
} else {
- fprintf(stderr, "unkonwn transport '%s'\n", optarg);
+ fprintf(stderr, "unknown transport '%s'\n", optarg);
exit(1);
}
break;
@@ -734,6 +900,10 @@ int main(int argc, char *argv[])
break;
case 'Y': runMultithreaded = 1;
break;
+ case 'z': tlsKeyFile = optarg;
+ break;
+ case 'Z': tlsCertFile = optarg;
+ break;
default: printf("invalid option '%c' or value missing - terminating...\n", opt);
exit (1);
break;
@@ -769,6 +939,10 @@ int main(int argc, char *argv[])
}
}
+ if(transport == TP_TLS) {
+ initTLS();
+ }
+
if(openConnections() != 0) {
printf("error opening connections\n");
exit(1);
@@ -781,7 +955,7 @@ int main(int argc, char *argv[])
closeConnections(); /* this is important so that we do not finish too early! */
- if(nConnDrops > 0)
+ if(nConnDrops > 0 && !bSilent)
printf("-D option initiated %ld connection closures\n", nConnDrops);
if(!bSilent)