diff options
-rw-r--r-- | runtime/nsd.h | 12 | ||||
-rw-r--r-- | runtime/nsdpoll_ptcp.c | 9 | ||||
-rw-r--r-- | runtime/nspoll.c | 6 | ||||
-rw-r--r-- | runtime/nspoll.h | 2 | ||||
-rw-r--r-- | runtime/typedefs.h | 1 | ||||
-rw-r--r-- | tcpsrv.c | 68 | ||||
-rw-r--r-- | tcpsrv.h | 12 |
7 files changed, 90 insertions, 20 deletions
diff --git a/runtime/nsd.h b/runtime/nsd.h index 1d44a14c..5a3f462e 100644 --- a/runtime/nsd.h +++ b/runtime/nsd.h @@ -29,6 +29,16 @@ #include <sys/socket.h> +/** + * The following structure is a set of descriptors that need to be processed. + * This set will be the result of the epoll call and be used + * in the actual request processing stage. -- rgerhards, 2011-01-24 + */ +struct nsd_epworkset_s { + int id; + void *pUsr; +}; + enum nsdsel_waitOp_e { NSDSEL_RD = 1, NSDSEL_WR = 2, @@ -92,7 +102,7 @@ BEGINinterface(nsdpoll) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Construct)(nsdpoll_t **ppThis); rsRetVal (*Destruct)(nsdpoll_t **ppThis); rsRetVal (*Ctl)(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op); - rsRetVal (*Wait)(nsdpoll_t *pNsdpoll, int timeout, int *numReady, int idRdy[], void *ppUsr[]); + rsRetVal (*Wait)(nsdpoll_t *pNsdpoll, int timeout, int *numReady, nsd_epworkset_t workset[]); ENDinterface(nsdpoll) #define nsdpollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c index f22c6cb6..26810b7d 100644 --- a/runtime/nsdpoll_ptcp.c +++ b/runtime/nsdpoll_ptcp.c @@ -204,7 +204,7 @@ finalize_it: * rgerhards, 2009-11-18 */ static rsRetVal -Wait(nsdpoll_t *pNsdpoll, int timeout, int *numEntries, int idRdy[], void *ppUsr[]) { +Wait(nsdpoll_t *pNsdpoll, int timeout, int *numEntries, nsd_epworkset_t workset[]) { nsdpoll_ptcp_t *pThis = (nsdpoll_ptcp_t*) pNsdpoll; nsdpoll_epollevt_lst_t *pOurEvt; struct epoll_event event[128]; @@ -212,8 +212,7 @@ Wait(nsdpoll_t *pNsdpoll, int timeout, int *numEntries, int idRdy[], void *ppUsr int i; DEFiRet; - assert(idRdy != NULL); - assert(ppUsr != NULL); + assert(workset != NULL); if(*numEntries > 128) *numEntries = 128; @@ -234,8 +233,8 @@ Wait(nsdpoll_t *pNsdpoll, int timeout, int *numEntries, int idRdy[], void *ppUsr dbgprintf("epoll returned %d entries\n", nfds); for(i = 0 ; i < nfds ; ++i) { pOurEvt = (nsdpoll_epollevt_lst_t*) event[i].data.u64; - idRdy[i] = pOurEvt->id; - ppUsr[i] = pOurEvt->pUsr; + workset[i].id = pOurEvt->id; + workset[i].pUsr = pOurEvt->pUsr; dbgprintf("epoll push ppusr[%d]: %p\n", i, pOurEvt->pUsr); } *numEntries = nfds; diff --git a/runtime/nspoll.c b/runtime/nspoll.c index c36375fd..a936b255 100644 --- a/runtime/nspoll.c +++ b/runtime/nspoll.c @@ -129,11 +129,11 @@ finalize_it: /* Carries out the actual wait (all done in lower layers) */ static rsRetVal -Wait(nspoll_t *pThis, int timeout, int *numEntries, int idRdy[], void *ppUsr[]) { +Wait(nspoll_t *pThis, int timeout, int *numEntries, nsd_epworkset_t workset[]) { DEFiRet; ISOBJ_TYPE_assert(pThis, nspoll); - assert(idRdy != NULL); - iRet = pThis->Drvr.Wait(pThis->pDrvrData, timeout, numEntries, idRdy, ppUsr); + assert(workset != NULL); + iRet = pThis->Drvr.Wait(pThis->pDrvrData, timeout, numEntries, workset); RETiRet; } diff --git a/runtime/nspoll.h b/runtime/nspoll.h index 4b066577..037f6c38 100644 --- a/runtime/nspoll.h +++ b/runtime/nspoll.h @@ -50,7 +50,7 @@ BEGINinterface(nspoll) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Construct)(nspoll_t **ppThis); rsRetVal (*ConstructFinalize)(nspoll_t *pThis); rsRetVal (*Destruct)(nspoll_t **ppThis); - rsRetVal (*Wait)(nspoll_t *pNsdpoll, int timeout, int *numEntries, int idRdy[], void *ppUsr[]); + rsRetVal (*Wait)(nspoll_t *pNsdpoll, int timeout, int *numEntries, nsd_epworkset_t workset[]); rsRetVal (*Ctl)(nspoll_t *pNsdpoll, netstrm_t *pStrm, int id, void *pUsr, int mode, int op); rsRetVal (*IsEPollSupported)(void); /* static method */ ENDinterface(nspoll) diff --git a/runtime/typedefs.h b/runtime/typedefs.h index 1f624f7a..b6cfbd57 100644 --- a/runtime/typedefs.h +++ b/runtime/typedefs.h @@ -79,6 +79,7 @@ typedef struct parserList_s parserList_t; typedef struct strgen_s strgen_t; typedef struct strgenList_s strgenList_t; typedef struct statsobj_s statsobj_t; +typedef struct nsd_epworkset_s nsd_epworkset_t; typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */ typedef uint64 qDeqID; /* queue Dequeue order ID. 32 bits is considered dangerously few */ @@ -539,6 +539,53 @@ finalize_it: } +/* process a single workset item + */ +static inline rsRetVal +processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) +{ + tcps_sess_t *pNewSess; + DEFiRet; + + dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr); + if(pUsr == pThis->ppLstn) { + DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); + SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); + CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + DBGPRINTF("New session created with NSD %p.\n", pNewSess); + } else { + pNewSess = (tcps_sess_t*) pUsr; + doReceive(pThis, &pNewSess, pPoll); + } + +finalize_it: + RETiRet; +} + + +/* Process a workset, that is handle io. We become activated + * from either select or epoll handler. We split the workload + * out to a pool of threads, but try to avoid context switches + * as much as possible. + */ +static rsRetVal processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[]) +{ + int i; + DEFiRet; + + dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); + + for(i = 0 ; i < numEntries ; i++) { + if(glbl.GetGlobalInputTermState() == 1) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + CHKiRet(processWorksetItem(pThis, pPoll, workset[i].id, workset[i].pUsr)); + } + +finalize_it: + RETiRet; +} + + /* This function is called to gather input. * This variant here is only used if we need to work with a netstream driver * that does not support epoll(). @@ -637,12 +684,9 @@ Run(tcpsrv_t *pThis) { DEFiRet; int i; - int retIDs[128]; /* 128 is currently fixed num of concurrent requests */ - void *pUsr[128]; + nsd_epworkset_t workset[128]; /* 128 is currently fixed num of concurrent requests */ int numEntries; - tcps_sess_t *pNewSess; nspoll_t *pPoll = NULL; - int currIdx; rsRetVal localRet; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -675,8 +719,8 @@ Run(tcpsrv_t *pThis) } while(1) { - numEntries = sizeof(retIDs)/sizeof(int); - localRet = nspoll.Wait(pPoll, -1, &numEntries, retIDs, pUsr); + numEntries = sizeof(workset)/sizeof(nsd_epworkset_t); + localRet = nspoll.Wait(pPoll, -1, &numEntries, workset); if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ @@ -687,23 +731,27 @@ Run(tcpsrv_t *pThis) if(localRet != RS_RET_OK) continue; + processWorkset(pThis, pPoll, numEntries, workset); +#if 0 dbgprintf("poll returned with %d entries.\n", numEntries); for(i = 0 ; i < numEntries ; i++) { if(glbl.GetGlobalInputTermState() == 1) ABORT_FINALIZE(RS_RET_FORCE_TERM); - currIdx = retIDs[i]; - dbgprintf("tcpsrv processing i %d, pUsr %p\n", currIdx, pUsr[i]); - if(pUsr[i] == pThis->ppLstn) { + currIdx = workset[i].id; + dbgprintf("tcpsrv processing i %d, pUsr %p\n", currIdx, workset[i].pUsr); +dbgprintf("tcpsrv processing pUsr %p, ppLstn[0] %p, ppLstn[%d] %p\n", workset[i].pUsr, pThis->ppLstn[0], currIdx, pThis->ppLstn[currIdx]); + if(workset[i].pUsr == pThis->ppLstn) { DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[currIdx]); SessAccept(pThis, pThis->ppLstnPort[currIdx], &pNewSess, pThis->ppLstn[currIdx]); CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); DBGPRINTF("New session created with NSD %p.\n", pNewSess); } else { - pNewSess = (tcps_sess_t*) pUsr[i]; + pNewSess = (tcps_sess_t*) workset[i].pUsr; doReceive(pThis, &pNewSess, pPoll); } } +#endif } /* remove the tcp listen sockets from the epoll set */ @@ -83,6 +83,18 @@ struct tcpsrv_s { }; +/** + * The following structure is a set of descriptors that need to be processed. + * This set will be the result of the epoll or select call and be used + * in the actual request processing stage. It serves as a basis + * to run multiple request by concurrent threads. -- rgerhards, 2011-01-24 + */ +struct tcpsrv_workset_s { + int idx; /**< index into session table (or -1 if listener) */ + void *pUsr; +}; + + /* interfaces */ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */ INTERFACEObjDebugPrint(tcpsrv); |