diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2011-02-28 16:38:24 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2011-02-28 16:38:24 +0100 |
commit | 34cf945d034cbd3ef2331f378842bb21478ce7be (patch) | |
tree | 14d034148f55037ce5d584f044b8c61b62abe2b4 | |
parent | c0f92325c634fd3d0482f5d8bdc6650ab725cce1 (diff) | |
download | rsyslog-34cf945d034cbd3ef2331f378842bb21478ce7be.tar.gz rsyslog-34cf945d034cbd3ef2331f378842bb21478ce7be.tar.xz rsyslog-34cf945d034cbd3ef2331f378842bb21478ce7be.zip |
some cleanup
-rw-r--r-- | ChangeLog | 7 | ||||
-rw-r--r-- | tcpsrv.c | 32 |
2 files changed, 5 insertions, 34 deletions
@@ -1,9 +1,12 @@ +--------------------------------------------------------------------------- +Version 6.1.5 [DEVEL] (rgerhards), 2011-02-?? +- enhanced imtcp to use a pool of worker threads to process incoming + messages. This enables higher processing rates, especially in the TLS + case (where more CPU is needed for the crypto functions) - added support for TLS (in anon mode) to tcpflood - improved TLS error reporting - improved TLS startup (Diffie-Hellman bits do not need to be generated, as we do not support full anon key exchange -- we always need certs) ---------------------------------------------------------------------------- -Version 6.1.5 [DEVEL] (rgerhards), 2011-02-?? - bugfix: fixed a memory leak and potential abort condition this could happen if multiple rulesets were used and some output batches contained messages belonging to more than one ruleset. @@ -496,9 +496,7 @@ closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) { CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL)); } pThis->pOnRegularClose(*ppSess); -dbgprintf("XXX: pre destruct *ppSess = %p\n", *ppSess); tcps_sess.Destruct(ppSess); -dbgprintf("XXX: post destruct *ppSess = %p\n", *ppSess); finalize_it: RETiRet; } @@ -517,7 +515,6 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) rsRetVal localRet; DEFiRet; -//printf("doReceive %p/%p\n", pThis, *ppSess); ISOBJ_TYPE_assert(pThis, tcpsrv); DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm); /* Receive message */ @@ -572,7 +569,6 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr); if(pUsr == pThis->ppLstn) { -//printf("work item %p: connect\n", pUsr); DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); if(iRet == RS_RET_OK) { @@ -583,7 +579,6 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) DBGPRINTF("tcpsrv: error %d during accept\n", iRet); } } else { -//printf("work item %p: receive\n", pUsr); pNewSess = (tcps_sess_t*) pUsr; doReceive(pThis, &pNewSess, pPoll); if(pPoll == NULL && pNewSess == NULL) { @@ -638,20 +633,6 @@ processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t int origEntries = numEntries; DEFiRet; -#if 0 -{ /* chck workset for dupes */ -int k, j; -for(k = 0 ; k < numEntries ; ++k) { - //printf("work item %d: %p\n", k, workset[k].pUsr); - for(j = k+1 ; j < numEntries ; ++j) { - if(workset[k].pUsr == workset[j].pUsr) { - printf(stderr, "workset duplicate %d:%d:%p\n", k, j, workset[k].pUsr); - flush(stderr); - } - } -} -} -#endif dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); while(numEntries > 0) { @@ -671,7 +652,6 @@ for(k = 0 ; k < numEntries ; ++k) { wrkrInfo[i].pPoll = pPoll; wrkrInfo[i].idx = workset[numEntries -1].id; wrkrInfo[i].pUsr = workset[numEntries -1].pUsr; -dbgprintf("XXX: activating worker %d\n", i); /* Note: we must increment wrkrRunning HERE and not inside the worker's * code. This is because a worker may actually never start, and thus * increment wrkrRunning, before we finish and check the running worker @@ -719,10 +699,8 @@ RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset) int nfds; int i; int iWorkset; - int numEntries; int iTCPSess; int bIsReady; - tcps_sess_t *pNewSess; nssel_t *pSel = NULL; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -745,10 +723,6 @@ RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset) /* do the sessions */ iTCPSess = TCPSessGetNxtSess(pThis, -1); while(iTCPSess != -1) { -//dbgprintf("Added sessions to select set, pSel %p\n", pSel); -//dbgprintf("Added sessions to select set, iTCPSess %d\n", iTCPSess); -//dbgprintf("Added sessions to select set, ptr to strm %p\n", pThis->pSessions[iTCPSess]); -//dbgprintf("Added sessions to select set, strm %p\n", pThis->pSessions[iTCPSess]->pStrm); /* TODO: access to pNsd is NOT really CLEAN, use method... */ CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD)); /* now get next... */ @@ -786,7 +760,6 @@ RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - //doReceive(pThis, &pThis->pSessions[iTCPSess], NULL); workset[iWorkset].id = iTCPSess; workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess]; ++iWorkset; @@ -837,10 +810,6 @@ Run(tcpsrv_t *pThis) rsRetVal localRet; ISOBJ_TYPE_assert(pThis, tcpsrv); -#if 0 -iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t)); -FINALIZE; -#endif /* this is an endless loop - it is terminated by the framework canelling * this thread. Thus, we also need to instantiate a cancel cleanup handler @@ -1315,7 +1284,6 @@ stopWorkerPool(void) pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ pthread_join(wrkrInfo[i].tid, NULL); DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); -printf("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); pthread_cond_destroy(&wrkrInfo[i].run); } pthread_cond_destroy(&wrkrIdle); |