diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2011-01-26 15:37:07 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2011-01-26 15:37:07 +0100 |
commit | 3049f535fff9d351480bceb7ea82667176a7c8a2 (patch) | |
tree | d633ee9f9801ad882e66504a18e07fac7c3d38f8 /tcpsrv.c | |
parent | 0e4373a6329a1f74dda8eceed5fd18ce92fe0d10 (diff) | |
download | rsyslog-3049f535fff9d351480bceb7ea82667176a7c8a2.tar.gz rsyslog-3049f535fff9d351480bceb7ea82667176a7c8a2.tar.xz rsyslog-3049f535fff9d351480bceb7ea82667176a7c8a2.zip |
interim commit: refactored epoll processing
this is a perquisite for multi-threading the input handler
Diffstat (limited to 'tcpsrv.c')
-rw-r--r-- | tcpsrv.c | 68 |
1 files changed, 58 insertions, 10 deletions
@@ -539,6 +539,53 @@ finalize_it: } +/* process a single workset item + */ +static inline rsRetVal +processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) +{ + tcps_sess_t *pNewSess; + DEFiRet; + + dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr); + if(pUsr == pThis->ppLstn) { + DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); + SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); + CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + DBGPRINTF("New session created with NSD %p.\n", pNewSess); + } else { + pNewSess = (tcps_sess_t*) pUsr; + doReceive(pThis, &pNewSess, pPoll); + } + +finalize_it: + RETiRet; +} + + +/* Process a workset, that is handle io. We become activated + * from either select or epoll handler. We split the workload + * out to a pool of threads, but try to avoid context switches + * as much as possible. + */ +static rsRetVal processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[]) +{ + int i; + DEFiRet; + + dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); + + for(i = 0 ; i < numEntries ; i++) { + if(glbl.GetGlobalInputTermState() == 1) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + CHKiRet(processWorksetItem(pThis, pPoll, workset[i].id, workset[i].pUsr)); + } + +finalize_it: + RETiRet; +} + + /* This function is called to gather input. * This variant here is only used if we need to work with a netstream driver * that does not support epoll(). @@ -637,12 +684,9 @@ Run(tcpsrv_t *pThis) { DEFiRet; int i; - int retIDs[128]; /* 128 is currently fixed num of concurrent requests */ - void *pUsr[128]; + nsd_epworkset_t workset[128]; /* 128 is currently fixed num of concurrent requests */ int numEntries; - tcps_sess_t *pNewSess; nspoll_t *pPoll = NULL; - int currIdx; rsRetVal localRet; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -675,8 +719,8 @@ Run(tcpsrv_t *pThis) } while(1) { - numEntries = sizeof(retIDs)/sizeof(int); - localRet = nspoll.Wait(pPoll, -1, &numEntries, retIDs, pUsr); + numEntries = sizeof(workset)/sizeof(nsd_epworkset_t); + localRet = nspoll.Wait(pPoll, -1, &numEntries, workset); if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ @@ -687,23 +731,27 @@ Run(tcpsrv_t *pThis) if(localRet != RS_RET_OK) continue; + processWorkset(pThis, pPoll, numEntries, workset); +#if 0 dbgprintf("poll returned with %d entries.\n", numEntries); for(i = 0 ; i < numEntries ; i++) { if(glbl.GetGlobalInputTermState() == 1) ABORT_FINALIZE(RS_RET_FORCE_TERM); - currIdx = retIDs[i]; - dbgprintf("tcpsrv processing i %d, pUsr %p\n", currIdx, pUsr[i]); - if(pUsr[i] == pThis->ppLstn) { + currIdx = workset[i].id; + dbgprintf("tcpsrv processing i %d, pUsr %p\n", currIdx, workset[i].pUsr); +dbgprintf("tcpsrv processing pUsr %p, ppLstn[0] %p, ppLstn[%d] %p\n", workset[i].pUsr, pThis->ppLstn[0], currIdx, pThis->ppLstn[currIdx]); + if(workset[i].pUsr == pThis->ppLstn) { DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[currIdx]); SessAccept(pThis, pThis->ppLstnPort[currIdx], &pNewSess, pThis->ppLstn[currIdx]); CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); DBGPRINTF("New session created with NSD %p.\n", pNewSess); } else { - pNewSess = (tcps_sess_t*) pUsr[i]; + pNewSess = (tcps_sess_t*) workset[i].pUsr; doReceive(pThis, &pNewSess, pPoll); } } +#endif } /* remove the tcp listen sockets from the epoll set */ |