diff options
-rw-r--r-- | ChangeLog | 9 | ||||
-rw-r--r-- | configure.ac | 1 | ||||
-rw-r--r-- | runtime/debug.c | 40 | ||||
-rw-r--r-- | runtime/netstrm.c | 2 | ||||
-rw-r--r-- | runtime/netstrms.c | 1 | ||||
-rw-r--r-- | runtime/nsd_gtls.c | 28 | ||||
-rw-r--r-- | runtime/nsdpoll_ptcp.c | 9 | ||||
-rw-r--r-- | runtime/nsdpoll_ptcp.h | 1 | ||||
-rw-r--r-- | tcps_sess.c | 3 | ||||
-rw-r--r-- | tcpsrv.c | 210 | ||||
-rw-r--r-- | tests/Makefile.am | 13 | ||||
-rwxr-xr-x | tests/imtcp-tls-basic.sh | 11 | ||||
-rw-r--r-- | tests/tcpflood.c | 202 |
13 files changed, 440 insertions, 90 deletions
@@ -1,6 +1,13 @@ --------------------------------------------------------------------------- -Version 6.1.5 [DEVEL] (rgerhards), 2011-02-?? +Version 6.1.5 [DEVEL] (rgerhards), 2011-03-04 - improved testbench +- enhanced imtcp to use a pool of worker threads to process incoming + messages. This enables higher processing rates, especially in the TLS + case (where more CPU is needed for the crypto functions) +- added support for TLS (in anon mode) to tcpflood +- improved TLS error reporting +- improved TLS startup (Diffie-Hellman bits do not need to be generated, + as we do not support full anon key exchange -- we always need certs) - bugfix: fixed a memory leak and potential abort condition this could happen if multiple rulesets were used and some output batches contained messages belonging to more than one ruleset. diff --git a/configure.ac b/configure.ac index 936a61fc..bd0d6e62 100644 --- a/configure.ac +++ b/configure.ac @@ -681,6 +681,7 @@ AC_ARG_ENABLE(gnutls, ) if test "x$enable_gnutls" = "xyes"; then PKG_CHECK_MODULES(GNUTLS, gnutls >= 1.4.0) + AC_DEFINE([ENABLE_GNUTLS], [1], [Indicator that GnuTLS is present]) fi AM_CONDITIONAL(ENABLE_GNUTLS, test x$enable_gnutls = xyes) AC_SUBST(GNUTLS_CFLAGS) diff --git a/runtime/debug.c b/runtime/debug.c index 3f1c23bd..283dae3a 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -843,12 +843,15 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg) static int bWasNL = 0; char pszThrdName[64]; /* 64 is to be on the safe side, anything over 20 is bad... */ char pszWriteBuf[32*1024]; + size_t lenCopy; + size_t offsWriteBuf = 0; size_t lenWriteBuf; struct timespec t; # if _POSIX_TIMERS <= 0 struct timeval tv; # endif +#if 1 /* The bWasNL handler does not really work. It works if no thread * switching occurs during non-NL messages. Else, things are messed * up. Anyhow, it works well enough to provide useful help during @@ -859,8 +862,8 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg) */ if(ptLastThrdID != pthread_self()) { if(!bWasNL) { - if(stddbg != -1) write(stddbg, "\n", 1); - if(altdbg != -1) write(altdbg, "\n", 1); + pszWriteBuf[0] = '\n'; + offsWriteBuf = 1; bWasNL = 1; } ptLastThrdID = pthread_self(); @@ -881,25 +884,28 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg) t.tv_sec = tv.tv_sec; t.tv_nsec = tv.tv_usec * 1000; # endif - lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), + lenWriteBuf = snprintf(pszWriteBuf+offsWriteBuf, sizeof(pszWriteBuf) - offsWriteBuf, "%4.4ld.%9.9ld:", (long) (t.tv_sec % 10000), t.tv_nsec); - if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf); - if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf); + offsWriteBuf += lenWriteBuf; } - lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "%s: ", pszThrdName); - // use for testing: lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "{%ld}%s: ", (long) syscall(SYS_gettid), pszThrdName); - if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf); - if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf); + lenWriteBuf = snprintf(pszWriteBuf + offsWriteBuf, sizeof(pszWriteBuf) - offsWriteBuf, "%s: ", pszThrdName); + offsWriteBuf += lenWriteBuf; /* print object name header if we have an object */ if(pszObjName != NULL) { - lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "%s: ", pszObjName); - if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf); - if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf); + lenWriteBuf = snprintf(pszWriteBuf + offsWriteBuf, sizeof(pszWriteBuf) - offsWriteBuf, "%s: ", pszObjName); + offsWriteBuf += lenWriteBuf; } } - if(stddbg != -1) write(stddbg, pszMsg, lenMsg); - if(altdbg != -1) write(altdbg, pszMsg, lenMsg); +#endif + if(lenMsg > sizeof(pszWriteBuf) - offsWriteBuf) + lenCopy = sizeof(pszWriteBuf) - offsWriteBuf; + else + lenCopy = lenMsg; + memcpy(pszWriteBuf + offsWriteBuf, pszMsg, lenCopy); + offsWriteBuf += lenCopy; + if(stddbg != -1) write(stddbg, pszWriteBuf, offsWriteBuf); + if(altdbg != -1) write(altdbg, pszWriteBuf, offsWriteBuf); bWasNL = (pszMsg[lenMsg - 1] == '\n') ? 1 : 0; } @@ -923,12 +929,12 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg) pszObjName = obj.GetName(pObj); } - pthread_mutex_lock(&mutdbgprint); - pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint); +// pthread_mutex_lock(&mutdbgprint); +// pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint); do_dbgprint(pszObjName, pszMsg, lenMsg); - pthread_cleanup_pop(1); +// pthread_cleanup_pop(1); } #pragma GCC diagnostic warning "-Wempty-body" diff --git a/runtime/netstrm.c b/runtime/netstrm.c index 3658006f..a6f840a5 100644 --- a/runtime/netstrm.c +++ b/runtime/netstrm.c @@ -64,6 +64,7 @@ ENDobjConstruct(netstrm) /* destructor for the netstrm object */ BEGINobjDestruct(netstrm) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(netstrm) +//printf("destruct driver data %p\n", pThis->pDrvrData); if(pThis->pDrvrData != NULL) iRet = pThis->Drvr.Destruct(&pThis->pDrvrData); ENDobjDestruct(netstrm) @@ -169,6 +170,7 @@ Rcv(netstrm_t *pThis, uchar *pBuf, ssize_t *pLenBuf) { DEFiRet; ISOBJ_TYPE_assert(pThis, netstrm); +//printf("Rcv %p\n", pThis); iRet = pThis->Drvr.Rcv(pThis->pDrvrData, pBuf, pLenBuf); RETiRet; } diff --git a/runtime/netstrms.c b/runtime/netstrms.c index e9ff2568..56e492fe 100644 --- a/runtime/netstrms.c +++ b/runtime/netstrms.c @@ -32,7 +32,6 @@ #include "rsyslog.h" #include "module-template.h" #include "obj.h" -//#include "errmsg.h" #include "nsd.h" #include "netstrm.h" #include "nssel.h" diff --git a/runtime/nsd_gtls.c b/runtime/nsd_gtls.c index 0ee70e56..b4e747bf 100644 --- a/runtime/nsd_gtls.c +++ b/runtime/nsd_gtls.c @@ -50,7 +50,6 @@ #include "nsd_gtls.h" /* things to move to some better place/functionality - TODO */ -#define DH_BITS 1024 #define CRLFILE "crl.pem" @@ -81,7 +80,6 @@ static pthread_mutex_t mutGtlsStrerror; /**< a mutex protecting the potentially /* ------------------------------ GnuTLS specifics ------------------------------ */ static gnutls_certificate_credentials xcred; -static gnutls_dh_params dh_params; #ifdef DEBUG #if 0 /* uncomment, if needed some time again -- DEV Debug only */ @@ -609,7 +607,6 @@ gtlsInitSession(nsd_gtls_t *pThis) /* request client certificate if any. */ gnutls_certificate_server_set_request( session, GNUTLS_CERT_REQUEST); - gnutls_dh_set_prime_bits(session, DH_BITS); pThis->sess = session; @@ -618,23 +615,6 @@ finalize_it: } -static rsRetVal -generate_dh_params(void) -{ - int gnuRet; - DEFiRet; - /* Generate Diffie Hellman parameters - for use with DHE - * kx algorithms. These should be discarded and regenerated - * once a day, once a week or once a month. Depending on the - * security requirements. - */ - CHKgnutls(gnutls_dh_params_init( &dh_params)); - CHKgnutls(gnutls_dh_params_generate2( dh_params, DH_BITS)); -finalize_it: - RETiRet; -} - - /* set up all global things that are needed for server operations * rgerhards, 2008-04-30 */ @@ -648,8 +628,6 @@ gtlsGlblInitLstn(void) * considered legacy. -- rgerhards, 2008-05-05 */ /*CHKgnutls(gnutls_certificate_set_x509_crl_file(xcred, CRLFILE, GNUTLS_X509_FMT_PEM));*/ - CHKiRet(generate_dh_params()); - gnutls_certificate_set_dh_params(xcred, dh_params); /* this is void */ bGlblSrvrInitDone = 1; /* we are all set now */ /* now we need to add our certificate */ @@ -1173,6 +1151,8 @@ CODESTARTobjDestruct(nsd_gtls) gnutls_x509_crt_deinit(pThis->ourCert); if(pThis->bOurKeyIsInit) gnutls_x509_privkey_deinit(pThis->ourKey); +#warning need more checks if the new gnutls_deinit() breaks things during normal operations +// gnutls_deinit(pThis->sess); /* see ln 600 pThis->bInSess as something to check? */ ENDobjDestruct(nsd_gtls) @@ -1418,6 +1398,10 @@ AcceptConnReq(nsd_t *pNsd, nsd_t **ppNew) /* we got a handshake, now check authorization */ CHKiRet(gtlsChkPeerAuth(pNew)); } else { + uchar *pGnuErr = gtlsStrerror(gnuRet); + errmsg.LogError(0, RS_RET_TLS_HANDSHAKE_ERR, + "gnutls returned error on handshake: %s\n", pGnuErr); + free(pGnuErr); ABORT_FINALIZE(RS_RET_TLS_HANDSHAKE_ERR); } diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c index 26810b7d..6fd92df1 100644 --- a/runtime/nsdpoll_ptcp.c +++ b/runtime/nsdpoll_ptcp.c @@ -71,13 +71,16 @@ addEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, int mode, nsd_ptcp_t *pSock, pNew->pUsr = pUsr; pNew->pSock = pSock; pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */ + //pNew->event.events = EPOLLET; if(mode & NSDPOLL_IN) pNew->event.events |= EPOLLIN; if(mode & NSDPOLL_OUT) pNew->event.events |= EPOLLOUT; pNew->event.data.u64 = (uint64) pNew; + pthread_mutex_lock(&pThis->mutEvtLst); pNew->pNext = pThis->pRoot; pThis->pRoot = pNew; + pthread_mutex_unlock(&pThis->mutEvtLst); *pEvtLst = pNew; finalize_it: @@ -94,6 +97,7 @@ unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t ** nsdpoll_epollevt_lst_t *pPrev = NULL; DEFiRet; + pthread_mutex_lock(&pThis->mutEvtLst); pEvtLst = pThis->pRoot; while(pEvtLst != NULL && !(pEvtLst->id == id && pEvtLst->pUsr == pUsr)) { pPrev = pEvtLst; @@ -111,6 +115,7 @@ unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t ** pPrev->pNext = pEvtLst->pNext; finalize_it: + pthread_mutex_unlock(&pThis->mutEvtLst); RETiRet; } @@ -144,6 +149,7 @@ BEGINobjConstruct(nsdpoll_ptcp) /* be sure to specify the object type also in EN DBGPRINTF("epoll_create1() could not create fd\n"); ABORT_FINALIZE(RS_RET_IO_ERROR); } + pthread_mutex_init(&pThis->mutEvtLst, NULL); finalize_it: ENDobjConstruct(nsdpoll_ptcp) @@ -151,6 +157,9 @@ ENDobjConstruct(nsdpoll_ptcp) /* destructor for the nsdpoll_ptcp object */ BEGINobjDestruct(nsdpoll_ptcp) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(nsdpoll_ptcp) + //printf("ndspoll_ptcp destruct, event list root is %p\n", pThis->pRoot); +#warning cleanup event list is missing! (at least I think so) + pthread_mutex_destroy(&pThis->mutEvtLst); ENDobjDestruct(nsdpoll_ptcp) diff --git a/runtime/nsdpoll_ptcp.h b/runtime/nsdpoll_ptcp.h index cea2823d..dfefad1b 100644 --- a/runtime/nsdpoll_ptcp.h +++ b/runtime/nsdpoll_ptcp.h @@ -49,6 +49,7 @@ struct nsdpoll_ptcp_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ int efd; /* file descriptor used by epoll */ nsdpoll_epollevt_lst_t *pRoot; /* Root of the epoll event list */ + pthread_mutex_t mutEvtLst; }; /* interface is defined in nsd.h, we just implement it! */ diff --git a/tcps_sess.c b/tcps_sess.c index 99af0cb8..8b944885 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -95,6 +95,7 @@ finalize_it: /* destructor for the tcps_sess object */ BEGINobjDestruct(tcps_sess) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(tcps_sess) +//printf("sess %p destruct, pStrm %p\n", pThis, pThis->pStrm); if(pThis->pStrm != NULL) netstrm.Destruct(&pThis->pStrm); @@ -337,6 +338,7 @@ Close(tcps_sess_t *pThis) { DEFiRet; +//printf("sess %p close\n", pThis); ISOBJ_TYPE_assert(pThis, tcps_sess); netstrm.Destruct(&pThis->pStrm); if(pThis->fromHost != NULL) { @@ -466,6 +468,7 @@ DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen) char *pEnd; DEFiRet; +//printf("DataRcvd: %p\n", pThis); ISOBJ_TYPE_assert(pThis, tcps_sess); assert(pData != NULL); assert(iLen > 0); @@ -47,6 +47,7 @@ #include <ctype.h> #include <netinet/in.h> #include <netdb.h> +#include <pthread.h> #include <sys/types.h> #include <sys/socket.h> #if HAVE_FCNTL_H @@ -70,6 +71,7 @@ #include "ruleset.h" #include "unicode-helper.h" + MODULE_TYPE_LIB /* defines */ @@ -91,6 +93,23 @@ DEFobjCurrIf(nspoll) DEFobjCurrIf(prop) +/* The following structure controls the worker threads. Global data is + * needed for their access. + */ +static struct wrkrInfo_s { + pthread_t tid; /* the worker's thread ID */ + pthread_cond_t run; + int idx; + tcpsrv_t *pSrv; /* pSrv == NULL -> idle */ + nspoll_t *pPoll; + void *pUsr; + long long unsigned numCalled; /* how often was this called */ +} wrkrInfo[4]; +static pthread_mutex_t wrkrMut; +static pthread_cond_t wrkrIdle; +static int wrkrMax = 4; +static int wrkrRunning; + /* add new listener port to listener port list * rgerhards, 2009-05-21 */ @@ -510,7 +529,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n", (*ppSess)->pStrm, pszPeer); } + //pthread_mutex_lock(&mut); CHKiRet(closeSess(pThis, ppSess, pPoll)); + //pthread_mutex_unlock(&mut); break; case RS_RET_RETRY: /* we simply ignore retry - this is not an error, but we also have not received anything */ @@ -538,7 +559,6 @@ finalize_it: RETiRet; } - /* process a single workset item */ static inline rsRetVal @@ -550,12 +570,20 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr); if(pUsr == pThis->ppLstn) { DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); - SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); - CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); - DBGPRINTF("New session created with NSD %p.\n", pNewSess); + iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); + if(iRet == RS_RET_OK) { + if(pPoll != NULL) + CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + DBGPRINTF("New session created with NSD %p.\n", pNewSess); + } else { + DBGPRINTF("tcpsrv: error %d during accept\n", iRet); + } } else { pNewSess = (tcps_sess_t*) pUsr; doReceive(pThis, &pNewSess, pPoll); + if(pPoll == NULL && pNewSess == NULL) { + pThis->pSessions[idx] = NULL; + } } finalize_it: @@ -563,22 +591,95 @@ finalize_it: } +/* worker to process incoming requests + */ +static void * +wrkr(void *myself) +{ + struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself; + + pthread_mutex_lock(&wrkrMut); + while(1) { + while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) { + pthread_cond_wait(&me->run, &wrkrMut); + } + if(glbl.GetGlobalInputTermState() == 1) + break; + pthread_mutex_unlock(&wrkrMut); + + ++me->numCalled; + processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr); + + pthread_mutex_lock(&wrkrMut); + me->pSrv = NULL; /* indicate we are free again */ + --wrkrRunning; + pthread_cond_signal(&wrkrIdle); + } + pthread_mutex_unlock(&wrkrMut); + + return NULL; +} + + /* Process a workset, that is handle io. We become activated * from either select or epoll handler. We split the workload * out to a pool of threads, but try to avoid context switches * as much as possible. */ -static rsRetVal processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[]) +static rsRetVal +processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[]) { int i; + int origEntries = numEntries; DEFiRet; dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); - for(i = 0 ; i < numEntries ; i++) { + while(numEntries > 0) { if(glbl.GetGlobalInputTermState() == 1) ABORT_FINALIZE(RS_RET_FORCE_TERM); - CHKiRet(processWorksetItem(pThis, pPoll, workset[i].id, workset[i].pUsr)); + if(numEntries == 1) { + /* process self, save context switch */ + processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr); + } else { + pthread_mutex_lock(&wrkrMut); + /* check if there is a free worker */ + for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i) + /*do search*/; + if(i < wrkrMax) { + /* worker free -> use it! */ + wrkrInfo[i].pSrv = pThis; + wrkrInfo[i].pPoll = pPoll; + wrkrInfo[i].idx = workset[numEntries -1].id; + wrkrInfo[i].pUsr = workset[numEntries -1].pUsr; + /* Note: we must increment wrkrRunning HERE and not inside the worker's + * code. This is because a worker may actually never start, and thus + * increment wrkrRunning, before we finish and check the running worker + * count. We can only avoid this by incrementing it here. + */ + ++wrkrRunning; + pthread_cond_signal(&wrkrInfo[i].run); + pthread_mutex_unlock(&wrkrMut); + } else { + pthread_mutex_unlock(&wrkrMut); + /* no free worker, so we process this one ourselfs */ + processWorksetItem(pThis, pPoll, workset[numEntries-1].id, + workset[numEntries-1].pUsr); + } + } + --numEntries; + } + + if(origEntries > 1) { + /* we now need to wait until all workers finish. This is because the + * rest of this module can not handle the concurrency introduced + * by workers running during the epoll call. + */ + pthread_mutex_lock(&wrkrMut); + while(wrkrRunning > 0) { + pthread_cond_wait(&wrkrIdle, &wrkrMut); + } + pthread_mutex_unlock(&wrkrMut); } finalize_it: @@ -592,14 +693,14 @@ finalize_it: */ #pragma GCC diagnostic ignored "-Wempty-body" static inline rsRetVal -RunSelect(tcpsrv_t *pThis) +RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset) { DEFiRet; int nfds; int i; + int iWorkset; int iTCPSess; int bIsReady; - tcps_sess_t *pNewSess; nssel_t *pSel = NULL; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -633,13 +734,21 @@ RunSelect(tcpsrv_t *pThis) if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ + iWorkset = 0; for(i = 0 ; i < pThis->iLstnCurr ; ++i) { if(glbl.GetGlobalInputTermState() == 1) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); - SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); + workset[iWorkset].id = i; + workset[iWorkset].pUsr = (void*) pThis->ppLstn; /* this is a flag to indicate listen sock */ + ++iWorkset; + if(iWorkset >= (int) sizeWorkset) { + processWorkset(pThis, NULL, iWorkset, workset); + iWorkset = 0; + } + //DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); + //SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); --nfds; /* indicate we have processed one */ } } @@ -651,11 +760,22 @@ RunSelect(tcpsrv_t *pThis) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - doReceive(pThis, &pThis->pSessions[iTCPSess], NULL); + workset[iWorkset].id = iTCPSess; + workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess]; + ++iWorkset; + if(iWorkset >= (int) sizeWorkset) { + processWorkset(pThis, NULL, iWorkset, workset); + iWorkset = 0; + } --nfds; /* indicate we have processed one */ } iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); } + + if(iWorkset > 0) + processWorkset(pThis, NULL, iWorkset, workset); + + /* we need to copy back close descriptors */ CHKiRet(nssel.Destruct(&pSel)); finalize_it: /* this is a very special case - this time only we do not exit the function, * because that would not help us either. So we simply retry it. Let's see @@ -702,7 +822,7 @@ Run(tcpsrv_t *pThis) if(localRet != RS_RET_OK) { /* fall back to select */ dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet); - iRet = RunSelect(pThis); + iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t)); FINALIZE; } @@ -732,26 +852,6 @@ Run(tcpsrv_t *pThis) continue; processWorkset(pThis, pPoll, numEntries, workset); -#if 0 - dbgprintf("poll returned with %d entries.\n", numEntries); - - for(i = 0 ; i < numEntries ; i++) { - if(glbl.GetGlobalInputTermState() == 1) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - currIdx = workset[i].id; - dbgprintf("tcpsrv processing i %d, pUsr %p\n", currIdx, workset[i].pUsr); -dbgprintf("tcpsrv processing pUsr %p, ppLstn[0] %p, ppLstn[%d] %p\n", workset[i].pUsr, pThis->ppLstn[0], currIdx, pThis->ppLstn[currIdx]); - if(workset[i].pUsr == pThis->ppLstn) { - DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[currIdx]); - SessAccept(pThis, pThis->ppLstnPort[currIdx], &pNewSess, pThis->ppLstn[currIdx]); - CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); - DBGPRINTF("New session created with NSD %p.\n", pNewSess); - } else { - pNewSess = (tcps_sess_t*) workset[i].pUsr; - doReceive(pThis, &pNewSess, pPoll); - } - } -#endif } /* remove the tcp listen sockets from the epoll set */ @@ -1155,11 +1255,50 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE ENDObjClassInit(tcpsrv) -/* --------------- here now comes the plumbing that makes as a library module --------------- */ +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +startWorkerPool(void) +{ + int i; + wrkrRunning = 0; + pthread_mutex_init(&wrkrMut, NULL); + pthread_cond_init(&wrkrIdle, NULL); + for(i = 0 ; i < wrkrMax ; ++i) { + /* init worker info structure! */ + pthread_cond_init(&wrkrInfo[i].run, NULL); + wrkrInfo[i].pSrv = NULL; + wrkrInfo[i].numCalled = 0; + pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i])); + } +} + +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +stopWorkerPool(void) +{ + int i; + for(i = 0 ; i < wrkrMax ; ++i) { + pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ + pthread_join(wrkrInfo[i].tid, NULL); + DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); + pthread_cond_destroy(&wrkrInfo[i].run); + } + pthread_cond_destroy(&wrkrIdle); + pthread_mutex_destroy(&wrkrMut); + +} + + +/* --------------- here now comes the plumbing that makes as a library module --------------- */ BEGINmodExit CODESTARTmodExit +dbgprintf("tcpsrv: modExit\n"); + stopWorkerPool(); + /* de-init in reverse order! */ tcpsrvClassExit(); tcps_sessClassExit(); @@ -1179,6 +1318,9 @@ CODESTARTmodInit /* Initialize all classes that are in our module - this includes ourselfs */ CHKiRet(tcps_sessClassInit(pModInfo)); CHKiRet(tcpsrvClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */ + + startWorkerPool(); + ENDmodInit /* vim:set ai: diff --git a/tests/Makefile.am b/tests/Makefile.am index a040a9d9..059f951f 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -58,6 +58,11 @@ TESTS += \ imptcp_conndrop.sh endif +if ENABLE_GNUTLS +TESTS += \ + imtcp-tls-basic.sh +endif + if ENABLE_OMUXSOCK TESTS += uxsock_simple.sh endif @@ -195,6 +200,8 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \ testsuites/da-mainmsg-q.conf \ diskqueue-fsync.sh \ testsuites/diskqueue-fsync.conf \ + imtcp-tls-basic.sh \ + testsuites/imtcp-tls-basic.conf \ imtcp-multiport.sh \ testsuites/imtcp-multiport.conf \ udp-msgreduc-vg.sh \ @@ -344,7 +351,11 @@ uxsockrcvr_SOURCES = uxsockrcvr.c uxsockrcvr_LDADD = $(SOL_LIBS) tcpflood_SOURCES = tcpflood.c -tcpflood_LDADD = $(SOL_LIBS) $(PTHREADS_LIBS) +tcpflood_CPPFLAGS = $(PTHREADS_CFLAGS) $(GNUTLS_CFLAGS) +tcpflood_LDADD = $(SOL_LIBS) $(PTHREADS_LIBS) $(GNUTLS_LIBS) +if ENABLE_GNUTLS +tcpflood_LDADD += -lgcrypt +endif syslog_caller_SOURCES = syslog_caller.c syslog_caller_LDADD = $(SOL_LIBS) diff --git a/tests/imtcp-tls-basic.sh b/tests/imtcp-tls-basic.sh new file mode 100755 index 00000000..d00f95d6 --- /dev/null +++ b/tests/imtcp-tls-basic.sh @@ -0,0 +1,11 @@ +# added 2011-02-28 by Rgerhards +# This file is part of the rsyslog project, released under GPLv3 +echo =============================================================================== +echo \[imtcp-tls-basic.sh\]: testing imtcp in TLS mode - basic test +source $srcdir/diag.sh init +source $srcdir/diag.sh startup imtcp-tls-basic.conf +source $srcdir/diag.sh tcpflood -p13514 -m50000 -Ttls -Z./tls-certs/cert.pem -z./tls-certs/key.pem +source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages +source $srcdir/diag.sh wait-shutdown +source $srcdir/diag.sh seq-check 0 49999 +source $srcdir/diag.sh exit diff --git a/tests/tcpflood.c b/tests/tcpflood.c index 7b376bdd..59c63d23 100644 --- a/tests/tcpflood.c +++ b/tests/tcpflood.c @@ -48,6 +48,9 @@ * -b number of messages within a batch (default: 100,000,000 millions) * -Y use multiple threads, one per connection (which means 1 if one only connection * is configured!) + * -z private key file for TLS mode + * -Z cert (public key) file for TLS mode + * -L loglevel to use for GnuTLS troubleshooting (0-off to 10-all, 0 default) * * Part of the testbench for rsyslog. * @@ -85,6 +88,12 @@ #include <pthread.h> #include <sys/resource.h> #include <sys/time.h> +#include <errno.h> +#ifdef ENABLE_GNUTLS +# include <gnutls/gnutls.h> +# include <gcrypt.h> + GCRY_THREAD_OPTION_PTHREAD_IMPL; +#endif #define EXIT_FAILURE 1 #define INVALID_SOCKET -1 @@ -92,6 +101,7 @@ #define NETTEST_INPUT_CONF_FILE "nettest.input.conf" /* name of input file, must match $IncludeConfig in .conf files */ #define MAX_EXTRADATA_LEN 100*1024 +#define MAX_SENDBUF 2 * MAX_EXTRADATA_LEN static char *targetIP = "127.0.0.1"; static char *msgPRI = "167"; @@ -122,6 +132,14 @@ static long long batchsize = 100000000ll; static int waittime = 0; static int runMultithreaded = 0; /* run tests in multithreaded mode */ static int numThrds = 1; /* number of threads to use */ +static char *tlsCertFile = NULL; +static char *tlsKeyFile = NULL; +static int tlsLogLevel = 0; + +#ifdef ENABLE_GNUTLS +static gnutls_session_t *sessArray; /* array of TLS sessions to use */ +static gnutls_certificate_credentials tlscred; +#endif /* variables for managing multi-threaded operations */ int runningThreads; /* number of threads currently running */ @@ -151,7 +169,12 @@ struct runstats { static int udpsock; /* socket for sending in UDP mode */ static struct sockaddr_in udpRcvr; /* remote receiver in UDP mode */ -static enum { TP_UDP, TP_TCP } transport = TP_TCP; +static enum { TP_UDP, TP_TCP, TP_TLS } transport = TP_TCP; + +/* forward definitions */ +static void initTLSSess(int); +static int sendTLS(int i, char *buf, int lenBuf); +static void closeTLSSess(int __attribute__((unused)) i); /* prepare send subsystem for UDP send */ static inline int @@ -234,6 +257,9 @@ int openConnections(void) if(bShowProgress) write(1, " open connections", sizeof(" open connections")-1); +# ifdef ENABLE_GNUTLS + sessArray = calloc(numConnections, sizeof(gnutls_session_t)); +# endif sockArray = calloc(numConnections, sizeof(int)); for(i = 0 ; i < numConnections ; ++i) { if(i % 10 == 0) { @@ -244,6 +270,9 @@ int openConnections(void) printf("error in trying to open connection i=%d\n", i); return 1; } + if(transport == TP_TLS) { + initTLSSess(i); + } } if(bShowProgress) { lenMsg = sprintf(msgBuf, "\r%5.5d open connections\n", i); @@ -268,7 +297,7 @@ void closeConnections(void) struct linger ling; char msgBuf[128]; - if(transport != TP_TCP) + if(transport == TP_UDP) return; if(bShowProgress) @@ -287,6 +316,8 @@ void closeConnections(void) ling.l_onoff = 1; ling.l_linger = 1; setsockopt(sockArray[i], SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)); + if(transport == TP_TLS) + closeTLSSess(i); close(sockArray[i]); } } @@ -346,11 +377,9 @@ genMsg(char *buf, size_t maxBuf, int *pLenBuf, struct instdata *inst) /* use fixed message format from command line */ *pLenBuf = snprintf(buf, maxBuf, "%s\n", MsgToSend); } + ++inst->numSent; - if(inst->numSent++ >= inst->numMsgs) - *pLenBuf = 0; /* indicate end of run */ - -finalize_it: ; +finalize_it: /*EMPTY to keep the compiler happy */; } /* send messages to the tcp connections we keep open. We use @@ -369,6 +398,8 @@ int sendMessages(struct instdata *inst) int lenSend = 0; char *statusText = ""; char buf[MAX_EXTRADATA_LEN + 1024]; + char sendBuf[MAX_SENDBUF]; + int offsSendBuf = 0; if(!bSilent) { if(dataFile == NULL) { @@ -382,22 +413,20 @@ int sendMessages(struct instdata *inst) } if(bShowProgress) printf("\r%8.8d %s sent", 0, statusText); - while(1) { /* broken inside loop! */ + while(i < inst->numMsgs) { if(runMultithreaded) { socknum = inst->idx; } else { if(i < numConnections) socknum = i; - else if(i >= inst->numMsgs - numConnections) + else if(i >= inst->numMsgs - numConnections) { socknum = i - (inst->numMsgs - numConnections); - else { + } else { int rnd = rand(); socknum = rnd % numConnections; } } genMsg(buf, sizeof(buf), &lenBuf, inst); /* generate the message to send according to params */ - if(lenBuf == 0) - break; /* end of processing! */ if(transport == TP_TCP) { if(sockArray[socknum] == -1) { /* connection was dropped, need to re-establish */ @@ -409,6 +438,17 @@ int sendMessages(struct instdata *inst) lenSend = send(sockArray[socknum], buf, lenBuf, 0); } else if(transport == TP_UDP) { lenSend = sendto(udpsock, buf, lenBuf, 0, &udpRcvr, sizeof(udpRcvr)); + } else if(transport == TP_TLS) { + if(offsSendBuf + lenBuf < MAX_SENDBUF) { + memcpy(sendBuf+offsSendBuf, buf, lenBuf); + offsSendBuf += lenBuf; + lenSend = lenBuf; /* simulate "good" call */ + } else { + lenSend = sendTLS(socknum, sendBuf, offsSendBuf); + lenSend = (lenSend == offsSendBuf) ? lenBuf : -1; + memcpy(sendBuf, buf, lenBuf); + offsSendBuf = lenBuf; + } } if(lenSend != lenBuf) { printf("\r%5.5d\n", i); @@ -439,6 +479,10 @@ int sendMessages(struct instdata *inst) ++msgNum; ++i; } + if(transport == TP_TLS && offsSendBuf != 0) { + /* send remaining buffer */ + lenSend = sendTLS(socknum, sendBuf, offsSendBuf); + } if(!bSilent) printf("\r%8.8d %s sent\n", i, statusText); @@ -643,6 +687,119 @@ runTests(void) return 0; } +# if defined(ENABLE_GNUTLS) +/* This defines a log function to be provided to GnuTLS. It hopefully + * helps us track down hard to find problems. + * rgerhards, 2008-06-20 + */ +static void tlsLogFunction(int level, const char *msg) +{ + printf("GnuTLS (level %d): %s", level, msg); + +} + + +/* global init GnuTLS + */ +static void +initTLS(void) +{ + int r; + + /* order of gcry_control and gnutls_global_init matters! */ + gcry_control(GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread); + gnutls_global_init(); + /* set debug mode, if so required by the options */ + if(tlsLogLevel > 0) { + gnutls_global_set_log_function(tlsLogFunction); + gnutls_global_set_log_level(tlsLogLevel); + } + + r = gnutls_certificate_allocate_credentials(&tlscred); + if(r != GNUTLS_E_SUCCESS) { + printf("error allocating credentials\n"); + gnutls_perror(r); + exit(1); + } + r = gnutls_certificate_set_x509_key_file(tlscred, tlsCertFile, tlsKeyFile, GNUTLS_X509_FMT_PEM); + if(r != GNUTLS_E_SUCCESS) { + printf("error setting certificate files -- have you mixed up key and certificate?\n"); + printf("If in doubt, try swapping the files in -z/-Z\n"); + printf("Certifcate is: '%s'\n", tlsCertFile); + printf("Key is: '%s'\n", tlsKeyFile); + gnutls_perror(r); + r = gnutls_certificate_set_x509_key_file(tlscred, tlsKeyFile, tlsCertFile, + GNUTLS_X509_FMT_PEM); + if(r == GNUTLS_E_SUCCESS) { + printf("Tried swapping files, this seems to work " + "(but results may be unpredictable!)\n"); + } else { + exit(1); + } + } +} + + +static void +initTLSSess(int i) +{ + int r; + gnutls_init(sessArray + i, GNUTLS_CLIENT); + + /* Use default priorities */ + gnutls_set_default_priority(sessArray[i]); + + /* put our credentials to the current session */ + r = gnutls_credentials_set(sessArray[i], GNUTLS_CRD_CERTIFICATE, tlscred); + if(r != GNUTLS_E_SUCCESS) { + fprintf (stderr, "Setting credentials failed\n"); + gnutls_perror(r); + exit(1); + } + + /* NOTE: the following statement generates a cast warning, but there seems to + * be no way around it with current GnuTLS. Do NOT try to "fix" the situation! + */ + gnutls_transport_set_ptr(sessArray[i], (gnutls_transport_ptr_t) sockArray[i]); + + /* Perform the TLS handshake */ + r = gnutls_handshake(sessArray[i]); + if(r < 0) { + fprintf (stderr, "TLS Handshake failed\n"); + gnutls_perror(r); + exit(1); + } +} + +static int +sendTLS(int i, char *buf, int lenBuf) +{ + int lenSent; + int r; + + lenSent = 0; + while(lenSent != lenBuf) { + r = gnutls_record_send(sessArray[i], buf + lenSent, lenBuf - lenSent); + if(r < 0) + break; + lenSent += r; + } + + return lenSent; +} + +static void +closeTLSSess(int i) +{ + gnutls_bye(sessArray[i], GNUTLS_SHUT_RDWR); + gnutls_deinit(sessArray[i]); +} +# else /* NO TLS available */ +static void initTLS(void) {} +static void initTLSSess(int __attribute__((unused)) i) {} +static int sendTLS(int i, char *buf, int lenBuf) { return 0; } +static void closeTLSSess(int __attribute__((unused)) i) {} +# endif /* Run the test. * rgerhards, 2009-04-03 @@ -666,7 +823,7 @@ int main(int argc, char *argv[]) setvbuf(stdout, buf, _IONBF, 48); - while((opt = getopt(argc, argv, "b:ef:F:t:p:c:C:m:i:I:P:d:Dn:M:rsBR:S:T:XW:Y")) != -1) { + while((opt = getopt(argc, argv, "b:ef:F:t:p:c:C:m:i:I:P:d:Dn:L:M:rsBR:S:T:XW:Yz:Z:")) != -1) { switch (opt) { case 'b': batchsize = atoll(optarg); break; @@ -701,6 +858,8 @@ int main(int argc, char *argv[]) break; case 'F': frameDelim = atoi(optarg); break; + case 'L': tlsLogLevel = atoi(optarg); + break; case 'M': MsgToSend = optarg; break; case 'I': dataFile = optarg; @@ -725,8 +884,15 @@ int main(int argc, char *argv[]) transport = TP_UDP; } else if(!strcmp(optarg, "tcp")) { transport = TP_TCP; + } else if(!strcmp(optarg, "tls")) { +# if defined(ENABLE_GNUTLS) + transport = TP_TLS; +# else + fprintf(stderr, "compiled without TLS support!\n", optarg); + exit(1); +# endif } else { - fprintf(stderr, "unkonwn transport '%s'\n", optarg); + fprintf(stderr, "unknown transport '%s'\n", optarg); exit(1); } break; @@ -734,6 +900,10 @@ int main(int argc, char *argv[]) break; case 'Y': runMultithreaded = 1; break; + case 'z': tlsKeyFile = optarg; + break; + case 'Z': tlsCertFile = optarg; + break; default: printf("invalid option '%c' or value missing - terminating...\n", opt); exit (1); break; @@ -769,6 +939,10 @@ int main(int argc, char *argv[]) } } + if(transport == TP_TLS) { + initTLS(); + } + if(openConnections() != 0) { printf("error opening connections\n"); exit(1); @@ -781,7 +955,7 @@ int main(int argc, char *argv[]) closeConnections(); /* this is important so that we do not finish too early! */ - if(nConnDrops > 0) + if(nConnDrops > 0 && !bSilent) printf("-D option initiated %ld connection closures\n", nConnDrops); if(!bSilent) |