From c6447bb3452e9ac3c4daed5827f711c2c6956ca4 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 27 Jan 2011 16:33:13 +0100 Subject: experimental: added thread pool to tcpsrv epoll handler this seems to work in lab, but is brand-new code. needs practice drill. --- runtime/nsdpoll_ptcp.c | 3 +- tcpsrv.c | 149 ++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 126 insertions(+), 26 deletions(-) diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c index 26810b7d..76c4e887 100644 --- a/runtime/nsdpoll_ptcp.c +++ b/runtime/nsdpoll_ptcp.c @@ -70,7 +70,8 @@ addEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, int mode, nsd_ptcp_t *pSock, pNew->id = id; pNew->pUsr = pUsr; pNew->pSock = pSock; - pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */ + //pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */ + pNew->event.events = EPOLLET; if(mode & NSDPOLL_IN) pNew->event.events |= EPOLLIN; if(mode & NSDPOLL_OUT) diff --git a/tcpsrv.c b/tcpsrv.c index 77b7b9be..baaa27b1 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -47,6 +47,7 @@ #include #include #include +#include #include #include #if HAVE_FCNTL_H @@ -91,6 +92,23 @@ DEFobjCurrIf(nspoll) DEFobjCurrIf(prop) +/* The following structure controls the worker threads. Global data is + * needed for their access. + */ +static struct wrkrInfo_s { + pthread_t tid; /* the worker's thread ID */ + pthread_cond_t run; + int idx; + tcpsrv_t *pSrv; /* pSrv == NULL -> idle */ + nspoll_t *pPoll; + void *pUsr; + long long unsigned numCalled; /* how often was this called */ +} wrkrInfo[4]; +static pthread_cond_t wrkrFinished; +static pthread_mutex_t wrkrMut; +static int wrkrMax = 4; +static int wrkrRunning; + /* add new listener port to listener port list * rgerhards, 2009-05-21 */ @@ -538,7 +556,6 @@ finalize_it: RETiRet; } - /* process a single workset item */ static inline rsRetVal @@ -563,22 +580,83 @@ finalize_it: } +/* worker to process incoming requests + */ +static void * +wrkr(void *myself) +{ + struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself; + + pthread_mutex_lock(&wrkrMut); + while(1) { + while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) { + pthread_cond_wait(&me->run, &wrkrMut); + } + if(glbl.GetGlobalInputTermState() == 1) + break; + ++wrkrRunning; + pthread_mutex_unlock(&wrkrMut); + +dbgprintf("XXX: worker %p activated\n", pthread_self()); + ++me->numCalled; + processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr); + + pthread_mutex_lock(&wrkrMut); + me->pSrv = NULL; /* indicate we are free again */ + --wrkrRunning; + } + pthread_mutex_unlock(&wrkrMut); + + return NULL; +} + + /* Process a workset, that is handle io. We become activated * from either select or epoll handler. We split the workload * out to a pool of threads, but try to avoid context switches * as much as possible. */ -static rsRetVal processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[]) +static rsRetVal +processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[]) { int i; DEFiRet; dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); - for(i = 0 ; i < numEntries ; i++) { + while(numEntries > 0) { if(glbl.GetGlobalInputTermState() == 1) ABORT_FINALIZE(RS_RET_FORCE_TERM); - CHKiRet(processWorksetItem(pThis, pPoll, workset[i].id, workset[i].pUsr)); +dbgprintf("XXX: num entries during processing %d\n", numEntries); + if(numEntries == 1) { +dbgprintf("XXX: processWorkset 1\n"); + /* process self, save context switch */ + processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr); + } else { +dbgprintf("XXX: processWorkset 2\n"); + pthread_mutex_lock(&wrkrMut); + /* check if there is a free worker */ + for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i) +dbgprintf("XXX: wrkrinfo[%d].pSrv %p\n", i, wrkrInfo[i].pSrv); + /*do search*/; + if(i < wrkrMax) { +dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[numEntries - 1].pUsr, wrkrRunning, wrkrMax); + /* worker free -> use it! */ + wrkrInfo[i].pSrv = pThis; + wrkrInfo[i].pPoll = pPoll; + wrkrInfo[i].idx = workset[numEntries -1].id; + wrkrInfo[i].pUsr = workset[numEntries -1].pUsr; + pthread_cond_signal(&wrkrInfo[i].run); + pthread_mutex_unlock(&wrkrMut); + } else { +dbgprintf("XXX: processWorkset 2.2\n"); + pthread_mutex_unlock(&wrkrMut); + /* no free worker, so we process this one ourselfs */ + processWorksetItem(pThis, pPoll, workset[numEntries-1].id, + workset[numEntries-1].pUsr); + } + } + --numEntries; } finalize_it: @@ -732,26 +810,6 @@ Run(tcpsrv_t *pThis) continue; processWorkset(pThis, pPoll, numEntries, workset); -#if 0 - dbgprintf("poll returned with %d entries.\n", numEntries); - - for(i = 0 ; i < numEntries ; i++) { - if(glbl.GetGlobalInputTermState() == 1) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - currIdx = workset[i].id; - dbgprintf("tcpsrv processing i %d, pUsr %p\n", currIdx, workset[i].pUsr); -dbgprintf("tcpsrv processing pUsr %p, ppLstn[0] %p, ppLstn[%d] %p\n", workset[i].pUsr, pThis->ppLstn[0], currIdx, pThis->ppLstn[currIdx]); - if(workset[i].pUsr == pThis->ppLstn) { - DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[currIdx]); - SessAccept(pThis, pThis->ppLstnPort[currIdx], &pNewSess, pThis->ppLstn[currIdx]); - CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); - DBGPRINTF("New session created with NSD %p.\n", pNewSess); - } else { - pNewSess = (tcps_sess_t*) workset[i].pUsr; - doReceive(pThis, &pNewSess, pPoll); - } - } -#endif } /* remove the tcp listen sockets from the epoll set */ @@ -1155,11 +1213,49 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE ENDObjClassInit(tcpsrv) -/* --------------- here now comes the plumbing that makes as a library module --------------- */ +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +startWorkerPool(void) +{ + int i; + pthread_mutex_init(&wrkrMut, NULL); + pthread_cond_init(&wrkrFinished, NULL); + for(i = 0 ; i < wrkrMax ; ++i) { + /* init worker info structure! */ + pthread_cond_init(&wrkrInfo[i].run, NULL); + wrkrInfo[i].pSrv = NULL; + wrkrInfo[i].numCalled = 0; + pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i])); + } +} + +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +stopWorkerPool(void) +{ + int i; + for(i = 0 ; i < wrkrMax ; ++i) { + pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ + pthread_join(wrkrInfo[i].tid, NULL); + DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); + pthread_cond_destroy(&wrkrInfo[i].run); + } + pthread_cond_destroy(&wrkrFinished); + pthread_mutex_destroy(&wrkrMut); + +} + + +/* --------------- here now comes the plumbing that makes as a library module --------------- */ BEGINmodExit CODESTARTmodExit +dbgprintf("tcpsrv: modExit\n"); + stopWorkerPool(); + /* de-init in reverse order! */ tcpsrvClassExit(); tcps_sessClassExit(); @@ -1179,6 +1275,9 @@ CODESTARTmodInit /* Initialize all classes that are in our module - this includes ourselfs */ CHKiRet(tcps_sessClassInit(pModInfo)); CHKiRet(tcpsrvClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */ + + startWorkerPool(); + ENDmodInit /* vim:set ai: -- cgit From b621be0936468eacd19f55c9ff43c7598a4a6701 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 27 Jan 2011 17:25:50 +0100 Subject: resolved problem with intermixing requests --- runtime/nsdpoll_ptcp.c | 4 ++-- tcpsrv.c | 27 +++++++++++++++++++++++---- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c index 76c4e887..b6002b09 100644 --- a/runtime/nsdpoll_ptcp.c +++ b/runtime/nsdpoll_ptcp.c @@ -70,8 +70,8 @@ addEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, int mode, nsd_ptcp_t *pSock, pNew->id = id; pNew->pUsr = pUsr; pNew->pSock = pSock; - //pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */ - pNew->event.events = EPOLLET; + pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */ + //pNew->event.events = EPOLLET; if(mode & NSDPOLL_IN) pNew->event.events |= EPOLLIN; if(mode & NSDPOLL_OUT) diff --git a/tcpsrv.c b/tcpsrv.c index baaa27b1..3f078b6e 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -104,8 +104,8 @@ static struct wrkrInfo_s { void *pUsr; long long unsigned numCalled; /* how often was this called */ } wrkrInfo[4]; -static pthread_cond_t wrkrFinished; static pthread_mutex_t wrkrMut; +static pthread_cond_t wrkrIdle; static int wrkrMax = 4; static int wrkrRunning; @@ -604,6 +604,8 @@ dbgprintf("XXX: worker %p activated\n", pthread_self()); pthread_mutex_lock(&wrkrMut); me->pSrv = NULL; /* indicate we are free again */ --wrkrRunning; + pthread_cond_signal(&wrkrIdle); +dbgprintf("XXX: worker %p idling\n", pthread_self()); } pthread_mutex_unlock(&wrkrMut); @@ -620,6 +622,7 @@ static rsRetVal processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[]) { int i; + int origEntries = numEntries; DEFiRet; dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); @@ -637,7 +640,7 @@ dbgprintf("XXX: processWorkset 2\n"); pthread_mutex_lock(&wrkrMut); /* check if there is a free worker */ for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i) -dbgprintf("XXX: wrkrinfo[%d].pSrv %p\n", i, wrkrInfo[i].pSrv); +dbgprintf("XXX: wrkrinfo[%d].pSrv %p, .pUsr %p\n", i, wrkrInfo[i].pSrv, wrkrInfo[i].pUsr); /*do search*/; if(i < wrkrMax) { dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[numEntries - 1].pUsr, wrkrRunning, wrkrMax); @@ -659,6 +662,21 @@ dbgprintf("XXX: processWorkset 2.2\n"); --numEntries; } + if(origEntries > 1) { + /* we now need to wait until all workers finish. This is because the + * rest of this module can not handle the concurrency introduced + * by workers running during the epoll call. + */ + pthread_mutex_lock(&wrkrMut); +dbgprintf("XXX: processWorkset: waiting for all workers to idle, curr = %d\n", wrkrRunning); + while(wrkrRunning > 0) { +dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, curr = %d\n", wrkrRunning); + pthread_cond_wait(&wrkrIdle, &wrkrMut); +dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, after wait, curr = %d\n", wrkrRunning); + } + pthread_mutex_unlock(&wrkrMut); + } + finalize_it: RETiRet; } @@ -1219,8 +1237,9 @@ static inline void startWorkerPool(void) { int i; + wrkrRunning = 0; pthread_mutex_init(&wrkrMut, NULL); - pthread_cond_init(&wrkrFinished, NULL); + pthread_cond_init(&wrkrIdle, NULL); for(i = 0 ; i < wrkrMax ; ++i) { /* init worker info structure! */ pthread_cond_init(&wrkrInfo[i].run, NULL); @@ -1243,7 +1262,7 @@ stopWorkerPool(void) DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); pthread_cond_destroy(&wrkrInfo[i].run); } - pthread_cond_destroy(&wrkrFinished); + pthread_cond_destroy(&wrkrIdle); pthread_mutex_destroy(&wrkrMut); } -- cgit From a4a94ddfc0dc2256d7a3bc79ed8f9489de9f0f9b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 28 Jan 2011 11:00:21 +0100 Subject: interim commit: current debug state of new imptcp I need to verify the concept used in a simpler environment, and this means I probably need to freeze the (non-working) state here for a couple of days. --- runtime/debug.c | 40 +++++++++++++++++++++++----------------- tcpsrv.c | 14 ++++++++++++++ 2 files changed, 37 insertions(+), 17 deletions(-) diff --git a/runtime/debug.c b/runtime/debug.c index 3f1c23bd..a017fc30 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -843,12 +843,15 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg) static int bWasNL = 0; char pszThrdName[64]; /* 64 is to be on the safe side, anything over 20 is bad... */ char pszWriteBuf[32*1024]; + size_t lenCopy; + size_t offsWriteBuf = 0; size_t lenWriteBuf; struct timespec t; # if _POSIX_TIMERS <= 0 struct timeval tv; # endif +#if 0 /* The bWasNL handler does not really work. It works if no thread * switching occurs during non-NL messages. Else, things are messed * up. Anyhow, it works well enough to provide useful help during @@ -859,8 +862,8 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg) */ if(ptLastThrdID != pthread_self()) { if(!bWasNL) { - if(stddbg != -1) write(stddbg, "\n", 1); - if(altdbg != -1) write(altdbg, "\n", 1); + pszWriteBuf[0] = '\n'; + offsWriteBuf = 1; bWasNL = 1; } ptLastThrdID = pthread_self(); @@ -881,25 +884,28 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg) t.tv_sec = tv.tv_sec; t.tv_nsec = tv.tv_usec * 1000; # endif - lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), + lenWriteBuf = snprintf(pszWriteBuf+offsWriteBuf, sizeof(pszWriteBuf) - offsWriteBuf, "%4.4ld.%9.9ld:", (long) (t.tv_sec % 10000), t.tv_nsec); - if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf); - if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf); + offsWriteBuf += lenWriteBuf; } - lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "%s: ", pszThrdName); - // use for testing: lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "{%ld}%s: ", (long) syscall(SYS_gettid), pszThrdName); - if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf); - if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf); + lenWriteBuf = snprintf(pszWriteBuf + offsWriteBuf, sizeof(pszWriteBuf) - offsWriteBuf, "%s: ", pszThrdName); + offsWriteBuf += lenWriteBuf; /* print object name header if we have an object */ if(pszObjName != NULL) { - lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "%s: ", pszObjName); - if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf); - if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf); + lenWriteBuf = snprintf(pszWriteBuf + offsWriteBuf, sizeof(pszWriteBuf) - offsWriteBuf, "%s: ", pszObjName); + offsWriteBuf += lenWriteBuf; } } - if(stddbg != -1) write(stddbg, pszMsg, lenMsg); - if(altdbg != -1) write(altdbg, pszMsg, lenMsg); +#endif + if(lenMsg > sizeof(pszWriteBuf) - offsWriteBuf) + lenCopy = sizeof(pszWriteBuf) - offsWriteBuf; + else + lenCopy = lenMsg; + memcpy(pszWriteBuf + offsWriteBuf, pszMsg, lenCopy); + offsWriteBuf += lenCopy; + if(stddbg != -1) write(stddbg, pszWriteBuf, offsWriteBuf); + if(altdbg != -1) write(altdbg, pszWriteBuf, offsWriteBuf); bWasNL = (pszMsg[lenMsg - 1] == '\n') ? 1 : 0; } @@ -923,12 +929,12 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg) pszObjName = obj.GetName(pObj); } - pthread_mutex_lock(&mutdbgprint); - pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint); +// pthread_mutex_lock(&mutdbgprint); +// pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint); do_dbgprint(pszObjName, pszMsg, lenMsg); - pthread_cleanup_pop(1); +// pthread_cleanup_pop(1); } #pragma GCC diagnostic warning "-Wempty-body" diff --git a/tcpsrv.c b/tcpsrv.c index 3f078b6e..da5182e1 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -548,6 +548,7 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) errno = 0; errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n", (*ppSess)->pStrm); +abort(); CHKiRet(closeSess(pThis, ppSess, pPoll)); break; } @@ -612,6 +613,8 @@ dbgprintf("XXX: worker %p idling\n", pthread_self()); return NULL; } +#warning remove include +#include /* Process a workset, that is handle io. We become activated * from either select or epoll handler. We split the workload @@ -625,6 +628,16 @@ processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t int origEntries = numEntries; DEFiRet; +{ /* chck workset for dupes */ +int k, j; +for(k = 0 ; k < numEntries ; ++k) + for(j = k+1 ; j < numEntries ; ++j) { + if(workset[k].pUsr == workset[j].pUsr) { + fprintf(stderr, "workset duplicate %d:%d:%p\n", k, j, workset[k].pUsr); + fflush(stderr); + } + } +} dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); while(numEntries > 0) { @@ -632,6 +645,7 @@ processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t ABORT_FINALIZE(RS_RET_FORCE_TERM); dbgprintf("XXX: num entries during processing %d\n", numEntries); if(numEntries == 1) { + //|| workset[numEntries-1].pUsr == pThis->ppLstn) { dbgprintf("XXX: processWorkset 1\n"); /* process self, save context switch */ processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr); -- cgit From 48ab717fedba586be5054320e32afc84afee9f52 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 31 Jan 2011 13:13:00 +0100 Subject: fixing regression: multi-threading had races --- runtime/netstrm.c | 2 ++ runtime/netstrms.c | 1 - runtime/nsdpoll_ptcp.c | 8 ++++++++ runtime/nsdpoll_ptcp.h | 1 + tcps_sess.c | 3 +++ tcpsrv.c | 30 +++++++++++++++++++----------- 6 files changed, 33 insertions(+), 12 deletions(-) diff --git a/runtime/netstrm.c b/runtime/netstrm.c index 3658006f..a6f840a5 100644 --- a/runtime/netstrm.c +++ b/runtime/netstrm.c @@ -64,6 +64,7 @@ ENDobjConstruct(netstrm) /* destructor for the netstrm object */ BEGINobjDestruct(netstrm) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(netstrm) +//printf("destruct driver data %p\n", pThis->pDrvrData); if(pThis->pDrvrData != NULL) iRet = pThis->Drvr.Destruct(&pThis->pDrvrData); ENDobjDestruct(netstrm) @@ -169,6 +170,7 @@ Rcv(netstrm_t *pThis, uchar *pBuf, ssize_t *pLenBuf) { DEFiRet; ISOBJ_TYPE_assert(pThis, netstrm); +//printf("Rcv %p\n", pThis); iRet = pThis->Drvr.Rcv(pThis->pDrvrData, pBuf, pLenBuf); RETiRet; } diff --git a/runtime/netstrms.c b/runtime/netstrms.c index e9ff2568..56e492fe 100644 --- a/runtime/netstrms.c +++ b/runtime/netstrms.c @@ -32,7 +32,6 @@ #include "rsyslog.h" #include "module-template.h" #include "obj.h" -//#include "errmsg.h" #include "nsd.h" #include "netstrm.h" #include "nssel.h" diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c index b6002b09..6fd92df1 100644 --- a/runtime/nsdpoll_ptcp.c +++ b/runtime/nsdpoll_ptcp.c @@ -77,8 +77,10 @@ addEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, int mode, nsd_ptcp_t *pSock, if(mode & NSDPOLL_OUT) pNew->event.events |= EPOLLOUT; pNew->event.data.u64 = (uint64) pNew; + pthread_mutex_lock(&pThis->mutEvtLst); pNew->pNext = pThis->pRoot; pThis->pRoot = pNew; + pthread_mutex_unlock(&pThis->mutEvtLst); *pEvtLst = pNew; finalize_it: @@ -95,6 +97,7 @@ unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t ** nsdpoll_epollevt_lst_t *pPrev = NULL; DEFiRet; + pthread_mutex_lock(&pThis->mutEvtLst); pEvtLst = pThis->pRoot; while(pEvtLst != NULL && !(pEvtLst->id == id && pEvtLst->pUsr == pUsr)) { pPrev = pEvtLst; @@ -112,6 +115,7 @@ unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t ** pPrev->pNext = pEvtLst->pNext; finalize_it: + pthread_mutex_unlock(&pThis->mutEvtLst); RETiRet; } @@ -145,6 +149,7 @@ BEGINobjConstruct(nsdpoll_ptcp) /* be sure to specify the object type also in EN DBGPRINTF("epoll_create1() could not create fd\n"); ABORT_FINALIZE(RS_RET_IO_ERROR); } + pthread_mutex_init(&pThis->mutEvtLst, NULL); finalize_it: ENDobjConstruct(nsdpoll_ptcp) @@ -152,6 +157,9 @@ ENDobjConstruct(nsdpoll_ptcp) /* destructor for the nsdpoll_ptcp object */ BEGINobjDestruct(nsdpoll_ptcp) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(nsdpoll_ptcp) + //printf("ndspoll_ptcp destruct, event list root is %p\n", pThis->pRoot); +#warning cleanup event list is missing! (at least I think so) + pthread_mutex_destroy(&pThis->mutEvtLst); ENDobjDestruct(nsdpoll_ptcp) diff --git a/runtime/nsdpoll_ptcp.h b/runtime/nsdpoll_ptcp.h index cea2823d..dfefad1b 100644 --- a/runtime/nsdpoll_ptcp.h +++ b/runtime/nsdpoll_ptcp.h @@ -49,6 +49,7 @@ struct nsdpoll_ptcp_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ int efd; /* file descriptor used by epoll */ nsdpoll_epollevt_lst_t *pRoot; /* Root of the epoll event list */ + pthread_mutex_t mutEvtLst; }; /* interface is defined in nsd.h, we just implement it! */ diff --git a/tcps_sess.c b/tcps_sess.c index 99af0cb8..8b944885 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -95,6 +95,7 @@ finalize_it: /* destructor for the tcps_sess object */ BEGINobjDestruct(tcps_sess) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(tcps_sess) +//printf("sess %p destruct, pStrm %p\n", pThis, pThis->pStrm); if(pThis->pStrm != NULL) netstrm.Destruct(&pThis->pStrm); @@ -337,6 +338,7 @@ Close(tcps_sess_t *pThis) { DEFiRet; +//printf("sess %p close\n", pThis); ISOBJ_TYPE_assert(pThis, tcps_sess); netstrm.Destruct(&pThis->pStrm); if(pThis->fromHost != NULL) { @@ -466,6 +468,7 @@ DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen) char *pEnd; DEFiRet; +//printf("DataRcvd: %p\n", pThis); ISOBJ_TYPE_assert(pThis, tcps_sess); assert(pData != NULL); assert(iLen > 0); diff --git a/tcpsrv.c b/tcpsrv.c index da5182e1..23b2f630 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -71,6 +71,7 @@ #include "ruleset.h" #include "unicode-helper.h" + MODULE_TYPE_LIB /* defines */ @@ -514,6 +515,7 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) rsRetVal localRet; DEFiRet; +//printf("doReceive %p/%p\n", pThis, *ppSess); ISOBJ_TYPE_assert(pThis, tcpsrv); DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm); /* Receive message */ @@ -528,7 +530,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n", (*ppSess)->pStrm, pszPeer); } + //pthread_mutex_lock(&mut); CHKiRet(closeSess(pThis, ppSess, pPoll)); + //pthread_mutex_unlock(&mut); break; case RS_RET_RETRY: /* we simply ignore retry - this is not an error, but we also have not received anything */ @@ -567,11 +571,13 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr); if(pUsr == pThis->ppLstn) { +//printf("work item %p: connect\n", pUsr); DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); DBGPRINTF("New session created with NSD %p.\n", pNewSess); } else { +//printf("work item %p: receive\n", pUsr); pNewSess = (tcps_sess_t*) pUsr; doReceive(pThis, &pNewSess, pPoll); } @@ -595,10 +601,8 @@ wrkr(void *myself) } if(glbl.GetGlobalInputTermState() == 1) break; - ++wrkrRunning; pthread_mutex_unlock(&wrkrMut); -dbgprintf("XXX: worker %p activated\n", pthread_self()); ++me->numCalled; processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr); @@ -606,15 +610,12 @@ dbgprintf("XXX: worker %p activated\n", pthread_self()); me->pSrv = NULL; /* indicate we are free again */ --wrkrRunning; pthread_cond_signal(&wrkrIdle); -dbgprintf("XXX: worker %p idling\n", pthread_self()); } pthread_mutex_unlock(&wrkrMut); return NULL; } -#warning remove include -#include /* Process a workset, that is handle io. We become activated * from either select or epoll handler. We split the workload @@ -628,16 +629,20 @@ processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t int origEntries = numEntries; DEFiRet; +#if 0 { /* chck workset for dupes */ int k, j; -for(k = 0 ; k < numEntries ; ++k) +for(k = 0 ; k < numEntries ; ++k) { + //printf("work item %d: %p\n", k, workset[k].pUsr); for(j = k+1 ; j < numEntries ; ++j) { if(workset[k].pUsr == workset[j].pUsr) { - fprintf(stderr, "workset duplicate %d:%d:%p\n", k, j, workset[k].pUsr); - fflush(stderr); + printf(stderr, "workset duplicate %d:%d:%p\n", k, j, workset[k].pUsr); + flush(stderr); } } } +} +#endif dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); while(numEntries > 0) { @@ -663,6 +668,12 @@ dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[n wrkrInfo[i].pPoll = pPoll; wrkrInfo[i].idx = workset[numEntries -1].id; wrkrInfo[i].pUsr = workset[numEntries -1].pUsr; + /* Note: we must increment wrkrRunning HERE and not inside the worker's + * code. This is because a worker may actually never start, and thus + * increment wrkrRunning, before we finish and check the running worker + * count. We can only avoid this by incrementing it here. + */ + ++wrkrRunning; pthread_cond_signal(&wrkrInfo[i].run); pthread_mutex_unlock(&wrkrMut); } else { @@ -682,11 +693,8 @@ dbgprintf("XXX: processWorkset 2.2\n"); * by workers running during the epoll call. */ pthread_mutex_lock(&wrkrMut); -dbgprintf("XXX: processWorkset: waiting for all workers to idle, curr = %d\n", wrkrRunning); while(wrkrRunning > 0) { -dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, curr = %d\n", wrkrRunning); pthread_cond_wait(&wrkrIdle, &wrkrMut); -dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, after wait, curr = %d\n", wrkrRunning); } pthread_mutex_unlock(&wrkrMut); } -- cgit From fd256a09ffa109120304d293cf6faf808c5a1a21 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 31 Jan 2011 15:59:43 +0100 Subject: tcpsrv select-handler experimentally moved to multi-threading as well first tests done with plain tcp, TLS subsystems tests need to be carried out. No serious lab testing done so far. --- runtime/debug.c | 2 +- tcpsrv.c | 57 ++++++++++++++++++++++++++++++++++++++++++++------------- 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/runtime/debug.c b/runtime/debug.c index a017fc30..283dae3a 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -851,7 +851,7 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg) struct timeval tv; # endif -#if 0 +#if 1 /* The bWasNL handler does not really work. It works if no thread * switching occurs during non-NL messages. Else, things are messed * up. Anyhow, it works well enough to provide useful help during diff --git a/tcpsrv.c b/tcpsrv.c index 23b2f630..d86bff6b 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -496,7 +496,9 @@ closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) { CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL)); } pThis->pOnRegularClose(*ppSess); +dbgprintf("XXX: pre destruct *ppSess = %p\n", *ppSess); tcps_sess.Destruct(ppSess); +dbgprintf("XXX: post destruct *ppSess = %p\n", *ppSess); finalize_it: RETiRet; } @@ -531,7 +533,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) (*ppSess)->pStrm, pszPeer); } //pthread_mutex_lock(&mut); +dbgprintf("XXX: calling closeSess()\n"); CHKiRet(closeSess(pThis, ppSess, pPoll)); +dbgprintf("XXX: done closeSess(), *ppSess %p\n", *ppSess); //pthread_mutex_unlock(&mut); break; case RS_RET_RETRY: @@ -574,12 +578,16 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) //printf("work item %p: connect\n", pUsr); DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); - CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + if(pPoll != NULL) + CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); DBGPRINTF("New session created with NSD %p.\n", pNewSess); } else { //printf("work item %p: receive\n", pUsr); pNewSess = (tcps_sess_t*) pUsr; doReceive(pThis, &pNewSess, pPoll); + if(pPoll == NULL && pNewSess == NULL) { + pThis->pSessions[idx] = NULL; + } } finalize_it: @@ -648,21 +656,15 @@ for(k = 0 ; k < numEntries ; ++k) { while(numEntries > 0) { if(glbl.GetGlobalInputTermState() == 1) ABORT_FINALIZE(RS_RET_FORCE_TERM); -dbgprintf("XXX: num entries during processing %d\n", numEntries); if(numEntries == 1) { - //|| workset[numEntries-1].pUsr == pThis->ppLstn) { -dbgprintf("XXX: processWorkset 1\n"); /* process self, save context switch */ processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr); } else { -dbgprintf("XXX: processWorkset 2\n"); pthread_mutex_lock(&wrkrMut); /* check if there is a free worker */ for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i) -dbgprintf("XXX: wrkrinfo[%d].pSrv %p, .pUsr %p\n", i, wrkrInfo[i].pSrv, wrkrInfo[i].pUsr); /*do search*/; if(i < wrkrMax) { -dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[numEntries - 1].pUsr, wrkrRunning, wrkrMax); /* worker free -> use it! */ wrkrInfo[i].pSrv = pThis; wrkrInfo[i].pPoll = pPoll; @@ -677,7 +679,6 @@ dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[n pthread_cond_signal(&wrkrInfo[i].run); pthread_mutex_unlock(&wrkrMut); } else { -dbgprintf("XXX: processWorkset 2.2\n"); pthread_mutex_unlock(&wrkrMut); /* no free worker, so we process this one ourselfs */ processWorksetItem(pThis, pPoll, workset[numEntries-1].id, @@ -710,11 +711,13 @@ finalize_it: */ #pragma GCC diagnostic ignored "-Wempty-body" static inline rsRetVal -RunSelect(tcpsrv_t *pThis) +RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset) { DEFiRet; int nfds; int i; + int iWorkset; + int numEntries; int iTCPSess; int bIsReady; tcps_sess_t *pNewSess; @@ -740,6 +743,10 @@ RunSelect(tcpsrv_t *pThis) /* do the sessions */ iTCPSess = TCPSessGetNxtSess(pThis, -1); while(iTCPSess != -1) { +dbgprintf("Added sessions to select set, pSel %p\n", pSel); +dbgprintf("Added sessions to select set, iTCPSess %d\n", iTCPSess); +dbgprintf("Added sessions to select set, ptr to strm %p\n", pThis->pSessions[iTCPSess]); +dbgprintf("Added sessions to select set, strm %p\n", pThis->pSessions[iTCPSess]->pStrm); /* TODO: access to pNsd is NOT really CLEAN, use method... */ CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD)); /* now get next... */ @@ -751,13 +758,21 @@ RunSelect(tcpsrv_t *pThis) if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ + iWorkset = 0; for(i = 0 ; i < pThis->iLstnCurr ; ++i) { if(glbl.GetGlobalInputTermState() == 1) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); - SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); + workset[iWorkset].id = i; + workset[iWorkset].pUsr = (void*) pThis->ppLstn; /* this is a flag to indicate listen sock */ + ++iWorkset; + if(iWorkset >= (int) sizeWorkset) { + processWorkset(pThis, NULL, iWorkset, workset); + iWorkset = 0; + } + //DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); + //SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); --nfds; /* indicate we have processed one */ } } @@ -769,11 +784,23 @@ RunSelect(tcpsrv_t *pThis) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - doReceive(pThis, &pThis->pSessions[iTCPSess], NULL); + //doReceive(pThis, &pThis->pSessions[iTCPSess], NULL); + workset[iWorkset].id = iTCPSess; + workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess]; + ++iWorkset; + if(iWorkset >= (int) sizeWorkset) { + processWorkset(pThis, NULL, iWorkset, workset); + iWorkset = 0; + } --nfds; /* indicate we have processed one */ } iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); } + + if(iWorkset > 0) + processWorkset(pThis, NULL, iWorkset, workset); + + /* we need to copy back close descriptors */ CHKiRet(nssel.Destruct(&pSel)); finalize_it: /* this is a very special case - this time only we do not exit the function, * because that would not help us either. So we simply retry it. Let's see @@ -808,6 +835,10 @@ Run(tcpsrv_t *pThis) rsRetVal localRet; ISOBJ_TYPE_assert(pThis, tcpsrv); +#if 0 +iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t)); +FINALIZE; +#endif /* this is an endless loop - it is terminated by the framework canelling * this thread. Thus, we also need to instantiate a cancel cleanup handler @@ -820,7 +851,7 @@ Run(tcpsrv_t *pThis) if(localRet != RS_RET_OK) { /* fall back to select */ dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet); - iRet = RunSelect(pThis); + iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t)); FINALIZE; } -- cgit From bea499dcb2747d1f5b42eae4978cfe86a37dc957 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 1 Feb 2011 22:58:30 +0100 Subject: (somewhat) improved TLS subsystem - improved TLS error reporting - improved TLS startup (Diffie-Hellman bits do not need to be generated, as we do not support full anon key exchange -- we always need certs) --- ChangeLog | 3 +++ runtime/nsd_gtls.c | 26 ++++---------------------- tcpsrv.c | 12 ++++++++---- 3 files changed, 15 insertions(+), 26 deletions(-) diff --git a/ChangeLog b/ChangeLog index 8dae0f8f..e6667c52 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,6 @@ +- improved TLS error reporting +- improved TLS startup (Diffie-Hellman bits do not need to be generated, + as we do not support full anon key exchange -- we always need certs) --------------------------------------------------------------------------- Version 6.1.3 [DEVEL] (rgerhards), 2010-12-?? - added $IMUDPSchedulingPolicy and $IMUDPSchedulingPriority config settings diff --git a/runtime/nsd_gtls.c b/runtime/nsd_gtls.c index 0ee70e56..d6874183 100644 --- a/runtime/nsd_gtls.c +++ b/runtime/nsd_gtls.c @@ -50,7 +50,6 @@ #include "nsd_gtls.h" /* things to move to some better place/functionality - TODO */ -#define DH_BITS 1024 #define CRLFILE "crl.pem" @@ -81,7 +80,6 @@ static pthread_mutex_t mutGtlsStrerror; /**< a mutex protecting the potentially /* ------------------------------ GnuTLS specifics ------------------------------ */ static gnutls_certificate_credentials xcred; -static gnutls_dh_params dh_params; #ifdef DEBUG #if 0 /* uncomment, if needed some time again -- DEV Debug only */ @@ -609,7 +607,6 @@ gtlsInitSession(nsd_gtls_t *pThis) /* request client certificate if any. */ gnutls_certificate_server_set_request( session, GNUTLS_CERT_REQUEST); - gnutls_dh_set_prime_bits(session, DH_BITS); pThis->sess = session; @@ -618,23 +615,6 @@ finalize_it: } -static rsRetVal -generate_dh_params(void) -{ - int gnuRet; - DEFiRet; - /* Generate Diffie Hellman parameters - for use with DHE - * kx algorithms. These should be discarded and regenerated - * once a day, once a week or once a month. Depending on the - * security requirements. - */ - CHKgnutls(gnutls_dh_params_init( &dh_params)); - CHKgnutls(gnutls_dh_params_generate2( dh_params, DH_BITS)); -finalize_it: - RETiRet; -} - - /* set up all global things that are needed for server operations * rgerhards, 2008-04-30 */ @@ -648,8 +628,6 @@ gtlsGlblInitLstn(void) * considered legacy. -- rgerhards, 2008-05-05 */ /*CHKgnutls(gnutls_certificate_set_x509_crl_file(xcred, CRLFILE, GNUTLS_X509_FMT_PEM));*/ - CHKiRet(generate_dh_params()); - gnutls_certificate_set_dh_params(xcred, dh_params); /* this is void */ bGlblSrvrInitDone = 1; /* we are all set now */ /* now we need to add our certificate */ @@ -1418,6 +1396,10 @@ AcceptConnReq(nsd_t *pNsd, nsd_t **ppNew) /* we got a handshake, now check authorization */ CHKiRet(gtlsChkPeerAuth(pNew)); } else { + uchar *pGnuErr = gtlsStrerror(gnuRet); + errmsg.LogError(0, RS_RET_TLS_HANDSHAKE_ERR, + "gnutls returned error on handshake: %s\n", pGnuErr); + free(pGnuErr); ABORT_FINALIZE(RS_RET_TLS_HANDSHAKE_ERR); } diff --git a/tcpsrv.c b/tcpsrv.c index d86bff6b..c031a591 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -577,10 +577,14 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) if(pUsr == pThis->ppLstn) { //printf("work item %p: connect\n", pUsr); DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); - SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); - if(pPoll != NULL) - CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); - DBGPRINTF("New session created with NSD %p.\n", pNewSess); + iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); + if(iRet == RS_RET_OK) { + if(pPoll != NULL) + CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + DBGPRINTF("New session created with NSD %p.\n", pNewSess); + } else { + DBGPRINTF("tcpsrv: error %d during accept\n", iRet); + } } else { //printf("work item %p: receive\n", pUsr); pNewSess = (tcps_sess_t*) pUsr; -- cgit From 910f635cececf329dd50aedb9f6e1a8fae4efb0c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 2 Feb 2011 00:45:39 +0100 Subject: added support for TLS (in anon mode) to tcpflood --- ChangeLog | 1 + configure.ac | 1 + tests/Makefile.am | 6 ++- tests/tcpflood.c | 128 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 130 insertions(+), 6 deletions(-) diff --git a/ChangeLog b/ChangeLog index e6667c52..66743ad0 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,4 @@ +- added support for TLS (in anon mode) to tcpflood - improved TLS error reporting - improved TLS startup (Diffie-Hellman bits do not need to be generated, as we do not support full anon key exchange -- we always need certs) diff --git a/configure.ac b/configure.ac index b1af0662..1cde5935 100644 --- a/configure.ac +++ b/configure.ac @@ -651,6 +651,7 @@ AC_ARG_ENABLE(gnutls, ) if test "x$enable_gnutls" = "xyes"; then PKG_CHECK_MODULES(GNUTLS, gnutls >= 1.4.0) + AC_DEFINE([ENABLE_GNUTLS], [1], [Indicator that GnuTLS is present]) fi AM_CONDITIONAL(ENABLE_GNUTLS, test x$enable_gnutls = xyes) AC_SUBST(GNUTLS_CFLAGS) diff --git a/tests/Makefile.am b/tests/Makefile.am index 44c1daa7..68e84d53 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -326,7 +326,11 @@ uxsockrcvr_SOURCES = uxsockrcvr.c uxsockrcvr_LDADD = $(SOL_LIBS) tcpflood_SOURCES = tcpflood.c -tcpflood_LDADD = $(SOL_LIBS) $(PTHREADS_LIBS) +tcpflood_CPPFLAGS = $(PTHREADS_CFLAGS) $(GNUTLS_CFLAGS) +tcpflood_LDADD = $(SOL_LIBS) $(PTHREADS_LIBS) $(GNUTLS_LIBS) +if ENABLE_GNUTLS +tcpflood_LDADD += -lgcrypt +endif syslog_caller_SOURCES = syslog_caller.c syslog_caller_LDADD = $(SOL_LIBS) diff --git a/tests/tcpflood.c b/tests/tcpflood.c index 7b376bdd..eb2259ec 100644 --- a/tests/tcpflood.c +++ b/tests/tcpflood.c @@ -48,6 +48,8 @@ * -b number of messages within a batch (default: 100,000,000 millions) * -Y use multiple threads, one per connection (which means 1 if one only connection * is configured!) + * -z private key file for TLS mode + * -Z cert (public key) file for TLS mode * * Part of the testbench for rsyslog. * @@ -85,6 +87,12 @@ #include #include #include +#include +#ifdef ENABLE_GNUTLS +# include +# include + GCRY_THREAD_OPTION_PTHREAD_IMPL; +#endif #define EXIT_FAILURE 1 #define INVALID_SOCKET -1 @@ -122,6 +130,13 @@ static long long batchsize = 100000000ll; static int waittime = 0; static int runMultithreaded = 0; /* run tests in multithreaded mode */ static int numThrds = 1; /* number of threads to use */ +static char *tlsCertFile = NULL; +static char *tlsKeyFile = NULL; + +#ifdef ENABLE_GNUTLS +static gnutls_session_t *sessArray; /* array of TLS sessions to use */ +static gnutls_certificate_credentials tlscred; +#endif /* variables for managing multi-threaded operations */ int runningThreads; /* number of threads currently running */ @@ -151,7 +166,12 @@ struct runstats { static int udpsock; /* socket for sending in UDP mode */ static struct sockaddr_in udpRcvr; /* remote receiver in UDP mode */ -static enum { TP_UDP, TP_TCP } transport = TP_TCP; +static enum { TP_UDP, TP_TCP, TP_TLS } transport = TP_TCP; + +/* forward definitions */ +static void initTLSSess(int); +static int sendTLS(int i, char *buf, int lenBuf); +static void closeTLSSess(int __attribute__((unused)) i); /* prepare send subsystem for UDP send */ static inline int @@ -234,6 +254,9 @@ int openConnections(void) if(bShowProgress) write(1, " open connections", sizeof(" open connections")-1); +# ifdef ENABLE_GNUTLS + sessArray = calloc(numConnections, sizeof(gnutls_session_t)); +# endif sockArray = calloc(numConnections, sizeof(int)); for(i = 0 ; i < numConnections ; ++i) { if(i % 10 == 0) { @@ -244,6 +267,9 @@ int openConnections(void) printf("error in trying to open connection i=%d\n", i); return 1; } + if(transport == TP_TLS) { + initTLSSess(i); + } } if(bShowProgress) { lenMsg = sprintf(msgBuf, "\r%5.5d open connections\n", i); @@ -268,7 +294,7 @@ void closeConnections(void) struct linger ling; char msgBuf[128]; - if(transport != TP_TCP) + if(transport == TP_UDP) return; if(bShowProgress) @@ -287,6 +313,8 @@ void closeConnections(void) ling.l_onoff = 1; ling.l_linger = 1; setsockopt(sockArray[i], SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)); + if(transport == TP_TLS) + closeTLSSess(i); close(sockArray[i]); } } @@ -409,6 +437,8 @@ int sendMessages(struct instdata *inst) lenSend = send(sockArray[socknum], buf, lenBuf, 0); } else if(transport == TP_UDP) { lenSend = sendto(udpsock, buf, lenBuf, 0, &udpRcvr, sizeof(udpRcvr)); + } else if(transport == TP_TLS) { + lenSend = sendTLS(socknum, buf, lenBuf); } if(lenSend != lenBuf) { printf("\r%5.5d\n", i); @@ -643,6 +673,79 @@ runTests(void) return 0; } +# if defined(ENABLE_GNUTLS) +#if 0 +static void logFunction(int __attribute__((unused)) level, const char *msg) { + printf("%s\n", msg); +} +#endif +/* global init GnuTLS + */ +static void +initTLS(void) +{ + int r; + + /* order of gcry_control and gnutls_global_init matters! */ + gcry_control(GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread); + gnutls_global_init (); + /* DEV debugging: gnutls_global_set_log_function(logFunction); */ + /* DEV debugging: gnutls_global_set_log_level(9); */ + + r = gnutls_certificate_allocate_credentials(&tlscred); + if(r != GNUTLS_E_SUCCESS) { + /* I don't know why this works even in case of error... */ + gnutls_perror(r); + } + r = gnutls_certificate_set_x509_key_file(tlscred, tlsCertFile, tlsKeyFile, GNUTLS_X509_FMT_PEM); + if(r != GNUTLS_E_SUCCESS) { + /* I don't know why this works even in case of error... */ + gnutls_perror(r); + } +} + +static void +initTLSSess(int i) +{ + int r; + gnutls_init (sessArray + i, GNUTLS_CLIENT); + + /* Use default priorities */ + gnutls_set_default_priority(sessArray[i]); + + /* put our credentials to the current session */ + r = gnutls_credentials_set(sessArray[i], GNUTLS_CRD_CERTIFICATE, tlscred); + + gnutls_transport_set_ptr(sessArray[i], (gnutls_transport_ptr_t) sockArray[i]); + + /* Perform the TLS handshake */ + r = gnutls_handshake(sessArray[i]); + + if(r < 0) { + fprintf (stderr, "TLS Handshake failed\n"); + gnutls_perror(r); + exit(1); + } +} + +static int +sendTLS(int i, char *buf, int lenBuf) +{ + return gnutls_record_send(sessArray[i], buf, lenBuf); +} + +static void +closeTLSSess(int i) +{ + gnutls_bye(sessArray[i], GNUTLS_SHUT_RDWR); + gnutls_deinit(sessArray[i]); +} +# else /* NO TLS available */ +static void initTLS(void) {} +static void initTLSSess(int __attribute__((unused)) i) {} +static int sendTLS(int i, char *buf, int lenBuf) { return 0; } +static void closeTLSSess(int __attribute__((unused)) i) {} +# endif /* Run the test. * rgerhards, 2009-04-03 @@ -666,7 +769,7 @@ int main(int argc, char *argv[]) setvbuf(stdout, buf, _IONBF, 48); - while((opt = getopt(argc, argv, "b:ef:F:t:p:c:C:m:i:I:P:d:Dn:M:rsBR:S:T:XW:Y")) != -1) { + while((opt = getopt(argc, argv, "b:ef:F:t:p:c:C:m:i:I:P:d:Dn:M:rsBR:S:T:XW:YzZ")) != -1) { switch (opt) { case 'b': batchsize = atoll(optarg); break; @@ -725,8 +828,15 @@ int main(int argc, char *argv[]) transport = TP_UDP; } else if(!strcmp(optarg, "tcp")) { transport = TP_TCP; + } else if(!strcmp(optarg, "tls")) { +# if defined(ENABLE_GNUTLS) + transport = TP_TLS; +# else + fprintf(stderr, "compiled without TLS support!\n", optarg); + exit(1); +# endif } else { - fprintf(stderr, "unkonwn transport '%s'\n", optarg); + fprintf(stderr, "unknown transport '%s'\n", optarg); exit(1); } break; @@ -734,6 +844,10 @@ int main(int argc, char *argv[]) break; case 'Y': runMultithreaded = 1; break; + case 'z': tlsKeyFile = optarg; + break; + case 'Z': tlsCertFile = optarg; + break; default: printf("invalid option '%c' or value missing - terminating...\n", opt); exit (1); break; @@ -769,6 +883,10 @@ int main(int argc, char *argv[]) } } + if(transport == TP_TLS) { + initTLS(); + } + if(openConnections() != 0) { printf("error opening connections\n"); exit(1); @@ -781,7 +899,7 @@ int main(int argc, char *argv[]) closeConnections(); /* this is important so that we do not finish too early! */ - if(nConnDrops > 0) + if(nConnDrops > 0 && !bSilent) printf("-D option initiated %ld connection closures\n", nConnDrops); if(!bSilent) -- cgit From 0c880a4a30b7d390f8d45491fa6c5186dbe6c117 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 2 Feb 2011 01:51:18 +0100 Subject: fixed very recent regression in tcpsrv.c & some cleanup forgot a debugging abort() ;) --- tcpsrv.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tcpsrv.c b/tcpsrv.c index c031a591..db21c638 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -533,9 +533,7 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) (*ppSess)->pStrm, pszPeer); } //pthread_mutex_lock(&mut); -dbgprintf("XXX: calling closeSess()\n"); CHKiRet(closeSess(pThis, ppSess, pPoll)); -dbgprintf("XXX: done closeSess(), *ppSess %p\n", *ppSess); //pthread_mutex_unlock(&mut); break; case RS_RET_RETRY: @@ -556,7 +554,6 @@ dbgprintf("XXX: done closeSess(), *ppSess %p\n", *ppSess); errno = 0; errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n", (*ppSess)->pStrm); -abort(); CHKiRet(closeSess(pThis, ppSess, pPoll)); break; } @@ -674,6 +671,7 @@ for(k = 0 ; k < numEntries ; ++k) { wrkrInfo[i].pPoll = pPoll; wrkrInfo[i].idx = workset[numEntries -1].id; wrkrInfo[i].pUsr = workset[numEntries -1].pUsr; +dbgprintf("XXX: activating worker %d\n", i); /* Note: we must increment wrkrRunning HERE and not inside the worker's * code. This is because a worker may actually never start, and thus * increment wrkrRunning, before we finish and check the running worker @@ -747,10 +745,10 @@ RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset) /* do the sessions */ iTCPSess = TCPSessGetNxtSess(pThis, -1); while(iTCPSess != -1) { -dbgprintf("Added sessions to select set, pSel %p\n", pSel); -dbgprintf("Added sessions to select set, iTCPSess %d\n", iTCPSess); -dbgprintf("Added sessions to select set, ptr to strm %p\n", pThis->pSessions[iTCPSess]); -dbgprintf("Added sessions to select set, strm %p\n", pThis->pSessions[iTCPSess]->pStrm); +//dbgprintf("Added sessions to select set, pSel %p\n", pSel); +//dbgprintf("Added sessions to select set, iTCPSess %d\n", iTCPSess); +//dbgprintf("Added sessions to select set, ptr to strm %p\n", pThis->pSessions[iTCPSess]); +//dbgprintf("Added sessions to select set, strm %p\n", pThis->pSessions[iTCPSess]->pStrm); /* TODO: access to pNsd is NOT really CLEAN, use method... */ CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD)); /* now get next... */ @@ -1317,6 +1315,7 @@ stopWorkerPool(void) pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ pthread_join(wrkrInfo[i].tid, NULL); DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); +printf("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); pthread_cond_destroy(&wrkrInfo[i].run); } pthread_cond_destroy(&wrkrIdle); -- cgit From 233157e520045ef4e05687b0db3b41692ffce5fd Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 4 Feb 2011 09:22:58 +0100 Subject: experiemental milestone: tcpflood support buffering in TLS mode This needs to be checked, it is not yet 100% correct --- tests/tcpflood.c | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/tests/tcpflood.c b/tests/tcpflood.c index eb2259ec..b0a40924 100644 --- a/tests/tcpflood.c +++ b/tests/tcpflood.c @@ -100,6 +100,7 @@ #define NETTEST_INPUT_CONF_FILE "nettest.input.conf" /* name of input file, must match $IncludeConfig in .conf files */ #define MAX_EXTRADATA_LEN 100*1024 +#define MAX_SENDBUF 2 * MAX_EXTRADATA_LEN static char *targetIP = "127.0.0.1"; static char *msgPRI = "167"; @@ -397,6 +398,8 @@ int sendMessages(struct instdata *inst) int lenSend = 0; char *statusText = ""; char buf[MAX_EXTRADATA_LEN + 1024]; + char sendBuf[MAX_SENDBUF]; + int offsSendBuf = 0; if(!bSilent) { if(dataFile == NULL) { @@ -438,7 +441,16 @@ int sendMessages(struct instdata *inst) } else if(transport == TP_UDP) { lenSend = sendto(udpsock, buf, lenBuf, 0, &udpRcvr, sizeof(udpRcvr)); } else if(transport == TP_TLS) { - lenSend = sendTLS(socknum, buf, lenBuf); + if(offsSendBuf + lenBuf < MAX_SENDBUF) { + memcpy(sendBuf+offsSendBuf, buf, lenBuf); + offsSendBuf += lenBuf; + lenSend = lenBuf; /* simulate "good" call */ + } else { + lenSend = sendTLS(socknum, sendBuf, offsSendBuf); + lenSend = (lenSend == offsSendBuf) ? lenBuf : -1; + memcpy(sendBuf, buf, lenBuf); + offsSendBuf = lenBuf; + } } if(lenSend != lenBuf) { printf("\r%5.5d\n", i); @@ -469,6 +481,11 @@ int sendMessages(struct instdata *inst) ++msgNum; ++i; } + if(transport == TP_TLS && offsSendBuf != 0) { + /* send remaining buffer */ + lenSend = sendTLS(socknum, sendBuf, offsSendBuf); +printf("TLS send buffer of %d messages remaining, sent %d\n", offsSendBuf, lenSend); + } if(!bSilent) printf("\r%8.8d %s sent\n", i, statusText); @@ -731,7 +748,18 @@ initTLSSess(int i) static int sendTLS(int i, char *buf, int lenBuf) { - return gnutls_record_send(sessArray[i], buf, lenBuf); + int lenSent; + int r; + + lenSent = 0; + while(lenSent != lenBuf) { + r = gnutls_record_send(sessArray[i], buf + lenSent, lenBuf - lenSent); + if(r < 0) + break; + lenSent += r; + } + + return lenSent; } static void -- cgit From bf088c5c2ed06db1c765a96df3762a00ae44db9c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 28 Feb 2011 15:52:28 +0100 Subject: completed new TLS code in tcpflood --- runtime/nsd_gtls.c | 2 ++ tests/tcpflood.c | 75 +++++++++++++++++++++++++++++++++++++----------------- 2 files changed, 53 insertions(+), 24 deletions(-) diff --git a/runtime/nsd_gtls.c b/runtime/nsd_gtls.c index d6874183..b4e747bf 100644 --- a/runtime/nsd_gtls.c +++ b/runtime/nsd_gtls.c @@ -1151,6 +1151,8 @@ CODESTARTobjDestruct(nsd_gtls) gnutls_x509_crt_deinit(pThis->ourCert); if(pThis->bOurKeyIsInit) gnutls_x509_privkey_deinit(pThis->ourKey); +#warning need more checks if the new gnutls_deinit() breaks things during normal operations +// gnutls_deinit(pThis->sess); /* see ln 600 pThis->bInSess as something to check? */ ENDobjDestruct(nsd_gtls) diff --git a/tests/tcpflood.c b/tests/tcpflood.c index b0a40924..b4f097f9 100644 --- a/tests/tcpflood.c +++ b/tests/tcpflood.c @@ -50,6 +50,7 @@ * is configured!) * -z private key file for TLS mode * -Z cert (public key) file for TLS mode + * -L loglevel to use for GnuTLS troubleshooting (0-off to 10-all, 0 default) * * Part of the testbench for rsyslog. * @@ -133,6 +134,7 @@ static int runMultithreaded = 0; /* run tests in multithreaded mode */ static int numThrds = 1; /* number of threads to use */ static char *tlsCertFile = NULL; static char *tlsKeyFile = NULL; +static int tlsLogLevel = 0; #ifdef ENABLE_GNUTLS static gnutls_session_t *sessArray; /* array of TLS sessions to use */ @@ -332,7 +334,7 @@ void closeConnections(void) * of constructing test messages. -- rgerhards, 2010-03-31 */ static inline void -genMsg(char *buf, size_t maxBuf, int *pLenBuf, struct instdata *inst) +genMsg(char *buf, size_t maxBuf, int *pLenBuf) { int edLen; /* actual extra data length to use */ char extraData[MAX_EXTRADATA_LEN + 1]; @@ -376,10 +378,7 @@ genMsg(char *buf, size_t maxBuf, int *pLenBuf, struct instdata *inst) *pLenBuf = snprintf(buf, maxBuf, "%s\n", MsgToSend); } - if(inst->numSent++ >= inst->numMsgs) - *pLenBuf = 0; /* indicate end of run */ - -finalize_it: ; +finalize_it: /*EMPTY to keep the compiler happy */; } /* send messages to the tcp connections we keep open. We use @@ -413,22 +412,20 @@ int sendMessages(struct instdata *inst) } if(bShowProgress) printf("\r%8.8d %s sent", 0, statusText); - while(1) { /* broken inside loop! */ + while(i < inst->numMsgs) { if(runMultithreaded) { socknum = inst->idx; } else { if(i < numConnections) socknum = i; - else if(i >= inst->numMsgs - numConnections) + else if(i >= inst->numMsgs - numConnections) { socknum = i - (inst->numMsgs - numConnections); - else { + } else { int rnd = rand(); socknum = rnd % numConnections; } } - genMsg(buf, sizeof(buf), &lenBuf, inst); /* generate the message to send according to params */ - if(lenBuf == 0) - break; /* end of processing! */ + genMsg(buf, sizeof(buf), &lenBuf); /* generate the message to send according to params */ if(transport == TP_TCP) { if(sockArray[socknum] == -1) { /* connection was dropped, need to re-establish */ @@ -484,7 +481,6 @@ int sendMessages(struct instdata *inst) if(transport == TP_TLS && offsSendBuf != 0) { /* send remaining buffer */ lenSend = sendTLS(socknum, sendBuf, offsSendBuf); -printf("TLS send buffer of %d messages remaining, sent %d\n", offsSendBuf, lenSend); } if(!bSilent) printf("\r%8.8d %s sent\n", i, statusText); @@ -691,11 +687,17 @@ runTests(void) } # if defined(ENABLE_GNUTLS) -#if 0 -static void logFunction(int __attribute__((unused)) level, const char *msg) { - printf("%s\n", msg); +/* This defines a log function to be provided to GnuTLS. It hopefully + * helps us track down hard to find problems. + * rgerhards, 2008-06-20 + */ +static void tlsLogFunction(int level, const char *msg) +{ + printf("GnuTLS (level %d): %s", level, msg); + } -#endif + + /* global init GnuTLS */ static void @@ -705,39 +707,62 @@ initTLS(void) /* order of gcry_control and gnutls_global_init matters! */ gcry_control(GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread); - gnutls_global_init (); - /* DEV debugging: gnutls_global_set_log_function(logFunction); */ - /* DEV debugging: gnutls_global_set_log_level(9); */ + gnutls_global_init(); + /* set debug mode, if so required by the options */ + if(tlsLogLevel > 0) { + gnutls_global_set_log_function(tlsLogFunction); + gnutls_global_set_log_level(tlsLogLevel); + } r = gnutls_certificate_allocate_credentials(&tlscred); if(r != GNUTLS_E_SUCCESS) { - /* I don't know why this works even in case of error... */ + printf("error allocating credentials\n"); gnutls_perror(r); + exit(1); } r = gnutls_certificate_set_x509_key_file(tlscred, tlsCertFile, tlsKeyFile, GNUTLS_X509_FMT_PEM); if(r != GNUTLS_E_SUCCESS) { - /* I don't know why this works even in case of error... */ + printf("error setting certificate files -- have you mixed up key and certificate?\n"); + printf("If in doubt, try swapping the files in -z/-Z\n"); + printf("Certifcate is: '%s'\n", tlsCertFile); + printf("Key is: '%s'\n", tlsKeyFile); gnutls_perror(r); + r = gnutls_certificate_set_x509_key_file(tlscred, tlsKeyFile, tlsCertFile, + GNUTLS_X509_FMT_PEM); + if(r == GNUTLS_E_SUCCESS) { + printf("Tried swapping files, this seems to work " + "(but results may be unpredictable!)\n"); + } else { + exit(1); + } } } + static void initTLSSess(int i) { int r; - gnutls_init (sessArray + i, GNUTLS_CLIENT); + gnutls_init(sessArray + i, GNUTLS_CLIENT); /* Use default priorities */ gnutls_set_default_priority(sessArray[i]); /* put our credentials to the current session */ r = gnutls_credentials_set(sessArray[i], GNUTLS_CRD_CERTIFICATE, tlscred); + if(r != GNUTLS_E_SUCCESS) { + fprintf (stderr, "Setting credentials failed\n"); + gnutls_perror(r); + exit(1); + } + /* NOTE: the following statement generates a cast warning, but there seems to + * be no way around it with current GnuTLS. Do NOT try to "fix" the situation! + */ gnutls_transport_set_ptr(sessArray[i], (gnutls_transport_ptr_t) sockArray[i]); /* Perform the TLS handshake */ r = gnutls_handshake(sessArray[i]); - if(r < 0) { fprintf (stderr, "TLS Handshake failed\n"); gnutls_perror(r); @@ -797,7 +822,7 @@ int main(int argc, char *argv[]) setvbuf(stdout, buf, _IONBF, 48); - while((opt = getopt(argc, argv, "b:ef:F:t:p:c:C:m:i:I:P:d:Dn:M:rsBR:S:T:XW:YzZ")) != -1) { + while((opt = getopt(argc, argv, "b:ef:F:t:p:c:C:m:i:I:P:d:Dn:L:M:rsBR:S:T:XW:Yz:Z:")) != -1) { switch (opt) { case 'b': batchsize = atoll(optarg); break; @@ -832,6 +857,8 @@ int main(int argc, char *argv[]) break; case 'F': frameDelim = atoi(optarg); break; + case 'L': tlsLogLevel = atoi(optarg); + break; case 'M': MsgToSend = optarg; break; case 'I': dataFile = optarg; -- cgit From 34cf945d034cbd3ef2331f378842bb21478ce7be Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 28 Feb 2011 16:38:24 +0100 Subject: some cleanup --- ChangeLog | 7 +++++-- tcpsrv.c | 32 -------------------------------- 2 files changed, 5 insertions(+), 34 deletions(-) diff --git a/ChangeLog b/ChangeLog index 1d2e3ce1..d49b476e 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,9 +1,12 @@ +--------------------------------------------------------------------------- +Version 6.1.5 [DEVEL] (rgerhards), 2011-02-?? +- enhanced imtcp to use a pool of worker threads to process incoming + messages. This enables higher processing rates, especially in the TLS + case (where more CPU is needed for the crypto functions) - added support for TLS (in anon mode) to tcpflood - improved TLS error reporting - improved TLS startup (Diffie-Hellman bits do not need to be generated, as we do not support full anon key exchange -- we always need certs) ---------------------------------------------------------------------------- -Version 6.1.5 [DEVEL] (rgerhards), 2011-02-?? - bugfix: fixed a memory leak and potential abort condition this could happen if multiple rulesets were used and some output batches contained messages belonging to more than one ruleset. diff --git a/tcpsrv.c b/tcpsrv.c index db21c638..698b52dc 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -496,9 +496,7 @@ closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) { CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL)); } pThis->pOnRegularClose(*ppSess); -dbgprintf("XXX: pre destruct *ppSess = %p\n", *ppSess); tcps_sess.Destruct(ppSess); -dbgprintf("XXX: post destruct *ppSess = %p\n", *ppSess); finalize_it: RETiRet; } @@ -517,7 +515,6 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) rsRetVal localRet; DEFiRet; -//printf("doReceive %p/%p\n", pThis, *ppSess); ISOBJ_TYPE_assert(pThis, tcpsrv); DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm); /* Receive message */ @@ -572,7 +569,6 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr); if(pUsr == pThis->ppLstn) { -//printf("work item %p: connect\n", pUsr); DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); if(iRet == RS_RET_OK) { @@ -583,7 +579,6 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) DBGPRINTF("tcpsrv: error %d during accept\n", iRet); } } else { -//printf("work item %p: receive\n", pUsr); pNewSess = (tcps_sess_t*) pUsr; doReceive(pThis, &pNewSess, pPoll); if(pPoll == NULL && pNewSess == NULL) { @@ -638,20 +633,6 @@ processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t int origEntries = numEntries; DEFiRet; -#if 0 -{ /* chck workset for dupes */ -int k, j; -for(k = 0 ; k < numEntries ; ++k) { - //printf("work item %d: %p\n", k, workset[k].pUsr); - for(j = k+1 ; j < numEntries ; ++j) { - if(workset[k].pUsr == workset[j].pUsr) { - printf(stderr, "workset duplicate %d:%d:%p\n", k, j, workset[k].pUsr); - flush(stderr); - } - } -} -} -#endif dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); while(numEntries > 0) { @@ -671,7 +652,6 @@ for(k = 0 ; k < numEntries ; ++k) { wrkrInfo[i].pPoll = pPoll; wrkrInfo[i].idx = workset[numEntries -1].id; wrkrInfo[i].pUsr = workset[numEntries -1].pUsr; -dbgprintf("XXX: activating worker %d\n", i); /* Note: we must increment wrkrRunning HERE and not inside the worker's * code. This is because a worker may actually never start, and thus * increment wrkrRunning, before we finish and check the running worker @@ -719,10 +699,8 @@ RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset) int nfds; int i; int iWorkset; - int numEntries; int iTCPSess; int bIsReady; - tcps_sess_t *pNewSess; nssel_t *pSel = NULL; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -745,10 +723,6 @@ RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset) /* do the sessions */ iTCPSess = TCPSessGetNxtSess(pThis, -1); while(iTCPSess != -1) { -//dbgprintf("Added sessions to select set, pSel %p\n", pSel); -//dbgprintf("Added sessions to select set, iTCPSess %d\n", iTCPSess); -//dbgprintf("Added sessions to select set, ptr to strm %p\n", pThis->pSessions[iTCPSess]); -//dbgprintf("Added sessions to select set, strm %p\n", pThis->pSessions[iTCPSess]->pStrm); /* TODO: access to pNsd is NOT really CLEAN, use method... */ CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD)); /* now get next... */ @@ -786,7 +760,6 @@ RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - //doReceive(pThis, &pThis->pSessions[iTCPSess], NULL); workset[iWorkset].id = iTCPSess; workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess]; ++iWorkset; @@ -837,10 +810,6 @@ Run(tcpsrv_t *pThis) rsRetVal localRet; ISOBJ_TYPE_assert(pThis, tcpsrv); -#if 0 -iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t)); -FINALIZE; -#endif /* this is an endless loop - it is terminated by the framework canelling * this thread. Thus, we also need to instantiate a cancel cleanup handler @@ -1315,7 +1284,6 @@ stopWorkerPool(void) pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ pthread_join(wrkrInfo[i].tid, NULL); DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); -printf("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); pthread_cond_destroy(&wrkrInfo[i].run); } pthread_cond_destroy(&wrkrIdle); -- cgit From 9be853a2c8d0fd7fdc415200af57493ad5a00feb Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 28 Feb 2011 17:28:07 +0100 Subject: added new tls-based test to testbench --- tests/Makefile.am | 7 +++++++ tests/imtcp-tls-basic.sh | 11 +++++++++++ tests/tcpflood.c | 5 +++-- 3 files changed, 21 insertions(+), 2 deletions(-) create mode 100755 tests/imtcp-tls-basic.sh diff --git a/tests/Makefile.am b/tests/Makefile.am index ec1b0813..9bf33938 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -57,6 +57,11 @@ TESTS += \ imptcp_conndrop.sh endif +if ENABLE_GNUTLS +TESTS += \ + imtcp-tls-basic.sh +endif + if ENABLE_OMUXSOCK TESTS += uxsock_simple.sh endif @@ -194,6 +199,8 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \ testsuites/da-mainmsg-q.conf \ diskqueue-fsync.sh \ testsuites/diskqueue-fsync.conf \ + imtcp-tls-basic.sh \ + testsuites/imtcp-tls-basic.conf \ imtcp-multiport.sh \ testsuites/imtcp-multiport.conf \ manytcp.sh \ diff --git a/tests/imtcp-tls-basic.sh b/tests/imtcp-tls-basic.sh new file mode 100755 index 00000000..d00f95d6 --- /dev/null +++ b/tests/imtcp-tls-basic.sh @@ -0,0 +1,11 @@ +# added 2011-02-28 by Rgerhards +# This file is part of the rsyslog project, released under GPLv3 +echo =============================================================================== +echo \[imtcp-tls-basic.sh\]: testing imtcp in TLS mode - basic test +source $srcdir/diag.sh init +source $srcdir/diag.sh startup imtcp-tls-basic.conf +source $srcdir/diag.sh tcpflood -p13514 -m50000 -Ttls -Z./tls-certs/cert.pem -z./tls-certs/key.pem +source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages +source $srcdir/diag.sh wait-shutdown +source $srcdir/diag.sh seq-check 0 49999 +source $srcdir/diag.sh exit diff --git a/tests/tcpflood.c b/tests/tcpflood.c index b4f097f9..59c63d23 100644 --- a/tests/tcpflood.c +++ b/tests/tcpflood.c @@ -334,7 +334,7 @@ void closeConnections(void) * of constructing test messages. -- rgerhards, 2010-03-31 */ static inline void -genMsg(char *buf, size_t maxBuf, int *pLenBuf) +genMsg(char *buf, size_t maxBuf, int *pLenBuf, struct instdata *inst) { int edLen; /* actual extra data length to use */ char extraData[MAX_EXTRADATA_LEN + 1]; @@ -377,6 +377,7 @@ genMsg(char *buf, size_t maxBuf, int *pLenBuf) /* use fixed message format from command line */ *pLenBuf = snprintf(buf, maxBuf, "%s\n", MsgToSend); } + ++inst->numSent; finalize_it: /*EMPTY to keep the compiler happy */; } @@ -425,7 +426,7 @@ int sendMessages(struct instdata *inst) socknum = rnd % numConnections; } } - genMsg(buf, sizeof(buf), &lenBuf); /* generate the message to send according to params */ + genMsg(buf, sizeof(buf), &lenBuf, inst); /* generate the message to send according to params */ if(transport == TP_TCP) { if(sockArray[socknum] == -1) { /* connection was dropped, need to re-establish */ -- cgit