diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2011-01-31 15:59:43 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2011-01-31 15:59:43 +0100 |
commit | fd256a09ffa109120304d293cf6faf808c5a1a21 (patch) | |
tree | 4a5f4db37b469a34531efa6cc0f5382c01630ab9 | |
parent | 48ab717fedba586be5054320e32afc84afee9f52 (diff) | |
download | rsyslog-fd256a09ffa109120304d293cf6faf808c5a1a21.tar.gz rsyslog-fd256a09ffa109120304d293cf6faf808c5a1a21.tar.xz rsyslog-fd256a09ffa109120304d293cf6faf808c5a1a21.zip |
tcpsrv select-handler experimentally moved to multi-threading as well
first tests done with plain tcp, TLS subsystems tests need to be
carried out. No serious lab testing done so far.
-rw-r--r-- | runtime/debug.c | 2 | ||||
-rw-r--r-- | tcpsrv.c | 57 |
2 files changed, 45 insertions, 14 deletions
diff --git a/runtime/debug.c b/runtime/debug.c index a017fc30..283dae3a 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -851,7 +851,7 @@ do_dbgprint(uchar *pszObjName, char *pszMsg, size_t lenMsg) struct timeval tv; # endif -#if 0 +#if 1 /* The bWasNL handler does not really work. It works if no thread * switching occurs during non-NL messages. Else, things are messed * up. Anyhow, it works well enough to provide useful help during @@ -496,7 +496,9 @@ closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) { CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL)); } pThis->pOnRegularClose(*ppSess); +dbgprintf("XXX: pre destruct *ppSess = %p\n", *ppSess); tcps_sess.Destruct(ppSess); +dbgprintf("XXX: post destruct *ppSess = %p\n", *ppSess); finalize_it: RETiRet; } @@ -531,7 +533,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) (*ppSess)->pStrm, pszPeer); } //pthread_mutex_lock(&mut); +dbgprintf("XXX: calling closeSess()\n"); CHKiRet(closeSess(pThis, ppSess, pPoll)); +dbgprintf("XXX: done closeSess(), *ppSess %p\n", *ppSess); //pthread_mutex_unlock(&mut); break; case RS_RET_RETRY: @@ -574,12 +578,16 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) //printf("work item %p: connect\n", pUsr); DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); - CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + if(pPoll != NULL) + CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); DBGPRINTF("New session created with NSD %p.\n", pNewSess); } else { //printf("work item %p: receive\n", pUsr); pNewSess = (tcps_sess_t*) pUsr; doReceive(pThis, &pNewSess, pPoll); + if(pPoll == NULL && pNewSess == NULL) { + pThis->pSessions[idx] = NULL; + } } finalize_it: @@ -648,21 +656,15 @@ for(k = 0 ; k < numEntries ; ++k) { while(numEntries > 0) { if(glbl.GetGlobalInputTermState() == 1) ABORT_FINALIZE(RS_RET_FORCE_TERM); -dbgprintf("XXX: num entries during processing %d\n", numEntries); if(numEntries == 1) { - //|| workset[numEntries-1].pUsr == pThis->ppLstn) { -dbgprintf("XXX: processWorkset 1\n"); /* process self, save context switch */ processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr); } else { -dbgprintf("XXX: processWorkset 2\n"); pthread_mutex_lock(&wrkrMut); /* check if there is a free worker */ for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i) -dbgprintf("XXX: wrkrinfo[%d].pSrv %p, .pUsr %p\n", i, wrkrInfo[i].pSrv, wrkrInfo[i].pUsr); /*do search*/; if(i < wrkrMax) { -dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[numEntries - 1].pUsr, wrkrRunning, wrkrMax); /* worker free -> use it! */ wrkrInfo[i].pSrv = pThis; wrkrInfo[i].pPoll = pPoll; @@ -677,7 +679,6 @@ dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[n pthread_cond_signal(&wrkrInfo[i].run); pthread_mutex_unlock(&wrkrMut); } else { -dbgprintf("XXX: processWorkset 2.2\n"); pthread_mutex_unlock(&wrkrMut); /* no free worker, so we process this one ourselfs */ processWorksetItem(pThis, pPoll, workset[numEntries-1].id, @@ -710,11 +711,13 @@ finalize_it: */ #pragma GCC diagnostic ignored "-Wempty-body" static inline rsRetVal -RunSelect(tcpsrv_t *pThis) +RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset) { DEFiRet; int nfds; int i; + int iWorkset; + int numEntries; int iTCPSess; int bIsReady; tcps_sess_t *pNewSess; @@ -740,6 +743,10 @@ RunSelect(tcpsrv_t *pThis) /* do the sessions */ iTCPSess = TCPSessGetNxtSess(pThis, -1); while(iTCPSess != -1) { +dbgprintf("Added sessions to select set, pSel %p\n", pSel); +dbgprintf("Added sessions to select set, iTCPSess %d\n", iTCPSess); +dbgprintf("Added sessions to select set, ptr to strm %p\n", pThis->pSessions[iTCPSess]); +dbgprintf("Added sessions to select set, strm %p\n", pThis->pSessions[iTCPSess]->pStrm); /* TODO: access to pNsd is NOT really CLEAN, use method... */ CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD)); /* now get next... */ @@ -751,13 +758,21 @@ RunSelect(tcpsrv_t *pThis) if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ + iWorkset = 0; for(i = 0 ; i < pThis->iLstnCurr ; ++i) { if(glbl.GetGlobalInputTermState() == 1) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); - SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); + workset[iWorkset].id = i; + workset[iWorkset].pUsr = (void*) pThis->ppLstn; /* this is a flag to indicate listen sock */ + ++iWorkset; + if(iWorkset >= (int) sizeWorkset) { + processWorkset(pThis, NULL, iWorkset, workset); + iWorkset = 0; + } + //DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); + //SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); --nfds; /* indicate we have processed one */ } } @@ -769,11 +784,23 @@ RunSelect(tcpsrv_t *pThis) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - doReceive(pThis, &pThis->pSessions[iTCPSess], NULL); + //doReceive(pThis, &pThis->pSessions[iTCPSess], NULL); + workset[iWorkset].id = iTCPSess; + workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess]; + ++iWorkset; + if(iWorkset >= (int) sizeWorkset) { + processWorkset(pThis, NULL, iWorkset, workset); + iWorkset = 0; + } --nfds; /* indicate we have processed one */ } iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); } + + if(iWorkset > 0) + processWorkset(pThis, NULL, iWorkset, workset); + + /* we need to copy back close descriptors */ CHKiRet(nssel.Destruct(&pSel)); finalize_it: /* this is a very special case - this time only we do not exit the function, * because that would not help us either. So we simply retry it. Let's see @@ -808,6 +835,10 @@ Run(tcpsrv_t *pThis) rsRetVal localRet; ISOBJ_TYPE_assert(pThis, tcpsrv); +#if 0 +iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t)); +FINALIZE; +#endif /* this is an endless loop - it is terminated by the framework canelling * this thread. Thus, we also need to instantiate a cancel cleanup handler @@ -820,7 +851,7 @@ Run(tcpsrv_t *pThis) if(localRet != RS_RET_OK) { /* fall back to select */ dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet); - iRet = RunSelect(pThis); + iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t)); FINALIZE; } |