summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-01-26 15:37:07 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2011-01-26 15:37:07 +0100
commit3049f535fff9d351480bceb7ea82667176a7c8a2 (patch)
treed633ee9f9801ad882e66504a18e07fac7c3d38f8
parent0e4373a6329a1f74dda8eceed5fd18ce92fe0d10 (diff)
downloadrsyslog-3049f535fff9d351480bceb7ea82667176a7c8a2.tar.gz
rsyslog-3049f535fff9d351480bceb7ea82667176a7c8a2.tar.xz
rsyslog-3049f535fff9d351480bceb7ea82667176a7c8a2.zip
interim commit: refactored epoll processing
this is a perquisite for multi-threading the input handler
-rw-r--r--runtime/nsd.h12
-rw-r--r--runtime/nsdpoll_ptcp.c9
-rw-r--r--runtime/nspoll.c6
-rw-r--r--runtime/nspoll.h2
-rw-r--r--runtime/typedefs.h1
-rw-r--r--tcpsrv.c68
-rw-r--r--tcpsrv.h12
7 files changed, 90 insertions, 20 deletions
diff --git a/runtime/nsd.h b/runtime/nsd.h
index 1d44a14c..5a3f462e 100644
--- a/runtime/nsd.h
+++ b/runtime/nsd.h
@@ -29,6 +29,16 @@
#include <sys/socket.h>
+/**
+ * The following structure is a set of descriptors that need to be processed.
+ * This set will be the result of the epoll call and be used
+ * in the actual request processing stage. -- rgerhards, 2011-01-24
+ */
+struct nsd_epworkset_s {
+ int id;
+ void *pUsr;
+};
+
enum nsdsel_waitOp_e {
NSDSEL_RD = 1,
NSDSEL_WR = 2,
@@ -92,7 +102,7 @@ BEGINinterface(nsdpoll) /* name must also be changed in ENDinterface macro! */
rsRetVal (*Construct)(nsdpoll_t **ppThis);
rsRetVal (*Destruct)(nsdpoll_t **ppThis);
rsRetVal (*Ctl)(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op);
- rsRetVal (*Wait)(nsdpoll_t *pNsdpoll, int timeout, int *numReady, int idRdy[], void *ppUsr[]);
+ rsRetVal (*Wait)(nsdpoll_t *pNsdpoll, int timeout, int *numReady, nsd_epworkset_t workset[]);
ENDinterface(nsdpoll)
#define nsdpollCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c
index f22c6cb6..26810b7d 100644
--- a/runtime/nsdpoll_ptcp.c
+++ b/runtime/nsdpoll_ptcp.c
@@ -204,7 +204,7 @@ finalize_it:
* rgerhards, 2009-11-18
*/
static rsRetVal
-Wait(nsdpoll_t *pNsdpoll, int timeout, int *numEntries, int idRdy[], void *ppUsr[]) {
+Wait(nsdpoll_t *pNsdpoll, int timeout, int *numEntries, nsd_epworkset_t workset[]) {
nsdpoll_ptcp_t *pThis = (nsdpoll_ptcp_t*) pNsdpoll;
nsdpoll_epollevt_lst_t *pOurEvt;
struct epoll_event event[128];
@@ -212,8 +212,7 @@ Wait(nsdpoll_t *pNsdpoll, int timeout, int *numEntries, int idRdy[], void *ppUsr
int i;
DEFiRet;
- assert(idRdy != NULL);
- assert(ppUsr != NULL);
+ assert(workset != NULL);
if(*numEntries > 128)
*numEntries = 128;
@@ -234,8 +233,8 @@ Wait(nsdpoll_t *pNsdpoll, int timeout, int *numEntries, int idRdy[], void *ppUsr
dbgprintf("epoll returned %d entries\n", nfds);
for(i = 0 ; i < nfds ; ++i) {
pOurEvt = (nsdpoll_epollevt_lst_t*) event[i].data.u64;
- idRdy[i] = pOurEvt->id;
- ppUsr[i] = pOurEvt->pUsr;
+ workset[i].id = pOurEvt->id;
+ workset[i].pUsr = pOurEvt->pUsr;
dbgprintf("epoll push ppusr[%d]: %p\n", i, pOurEvt->pUsr);
}
*numEntries = nfds;
diff --git a/runtime/nspoll.c b/runtime/nspoll.c
index c36375fd..a936b255 100644
--- a/runtime/nspoll.c
+++ b/runtime/nspoll.c
@@ -129,11 +129,11 @@ finalize_it:
/* Carries out the actual wait (all done in lower layers)
*/
static rsRetVal
-Wait(nspoll_t *pThis, int timeout, int *numEntries, int idRdy[], void *ppUsr[]) {
+Wait(nspoll_t *pThis, int timeout, int *numEntries, nsd_epworkset_t workset[]) {
DEFiRet;
ISOBJ_TYPE_assert(pThis, nspoll);
- assert(idRdy != NULL);
- iRet = pThis->Drvr.Wait(pThis->pDrvrData, timeout, numEntries, idRdy, ppUsr);
+ assert(workset != NULL);
+ iRet = pThis->Drvr.Wait(pThis->pDrvrData, timeout, numEntries, workset);
RETiRet;
}
diff --git a/runtime/nspoll.h b/runtime/nspoll.h
index 4b066577..037f6c38 100644
--- a/runtime/nspoll.h
+++ b/runtime/nspoll.h
@@ -50,7 +50,7 @@ BEGINinterface(nspoll) /* name must also be changed in ENDinterface macro! */
rsRetVal (*Construct)(nspoll_t **ppThis);
rsRetVal (*ConstructFinalize)(nspoll_t *pThis);
rsRetVal (*Destruct)(nspoll_t **ppThis);
- rsRetVal (*Wait)(nspoll_t *pNsdpoll, int timeout, int *numEntries, int idRdy[], void *ppUsr[]);
+ rsRetVal (*Wait)(nspoll_t *pNsdpoll, int timeout, int *numEntries, nsd_epworkset_t workset[]);
rsRetVal (*Ctl)(nspoll_t *pNsdpoll, netstrm_t *pStrm, int id, void *pUsr, int mode, int op);
rsRetVal (*IsEPollSupported)(void); /* static method */
ENDinterface(nspoll)
diff --git a/runtime/typedefs.h b/runtime/typedefs.h
index 1f624f7a..b6cfbd57 100644
--- a/runtime/typedefs.h
+++ b/runtime/typedefs.h
@@ -79,6 +79,7 @@ typedef struct parserList_s parserList_t;
typedef struct strgen_s strgen_t;
typedef struct strgenList_s strgenList_t;
typedef struct statsobj_s statsobj_t;
+typedef struct nsd_epworkset_s nsd_epworkset_t;
typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */
typedef uint64 qDeqID; /* queue Dequeue order ID. 32 bits is considered dangerously few */
diff --git a/tcpsrv.c b/tcpsrv.c
index 1ebfd91e..77b7b9be 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -539,6 +539,53 @@ finalize_it:
}
+/* process a single workset item
+ */
+static inline rsRetVal
+processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr)
+{
+ tcps_sess_t *pNewSess;
+ DEFiRet;
+
+ dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr);
+ if(pUsr == pThis->ppLstn) {
+ DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]);
+ SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]);
+ CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
+ DBGPRINTF("New session created with NSD %p.\n", pNewSess);
+ } else {
+ pNewSess = (tcps_sess_t*) pUsr;
+ doReceive(pThis, &pNewSess, pPoll);
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* Process a workset, that is handle io. We become activated
+ * from either select or epoll handler. We split the workload
+ * out to a pool of threads, but try to avoid context switches
+ * as much as possible.
+ */
+static rsRetVal processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[])
+{
+ int i;
+ DEFiRet;
+
+ dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries);
+
+ for(i = 0 ; i < numEntries ; i++) {
+ if(glbl.GetGlobalInputTermState() == 1)
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ CHKiRet(processWorksetItem(pThis, pPoll, workset[i].id, workset[i].pUsr));
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
/* This function is called to gather input.
* This variant here is only used if we need to work with a netstream driver
* that does not support epoll().
@@ -637,12 +684,9 @@ Run(tcpsrv_t *pThis)
{
DEFiRet;
int i;
- int retIDs[128]; /* 128 is currently fixed num of concurrent requests */
- void *pUsr[128];
+ nsd_epworkset_t workset[128]; /* 128 is currently fixed num of concurrent requests */
int numEntries;
- tcps_sess_t *pNewSess;
nspoll_t *pPoll = NULL;
- int currIdx;
rsRetVal localRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
@@ -675,8 +719,8 @@ Run(tcpsrv_t *pThis)
}
while(1) {
- numEntries = sizeof(retIDs)/sizeof(int);
- localRet = nspoll.Wait(pPoll, -1, &numEntries, retIDs, pUsr);
+ numEntries = sizeof(workset)/sizeof(nsd_epworkset_t);
+ localRet = nspoll.Wait(pPoll, -1, &numEntries, workset);
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
@@ -687,23 +731,27 @@ Run(tcpsrv_t *pThis)
if(localRet != RS_RET_OK)
continue;
+ processWorkset(pThis, pPoll, numEntries, workset);
+#if 0
dbgprintf("poll returned with %d entries.\n", numEntries);
for(i = 0 ; i < numEntries ; i++) {
if(glbl.GetGlobalInputTermState() == 1)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
- currIdx = retIDs[i];
- dbgprintf("tcpsrv processing i %d, pUsr %p\n", currIdx, pUsr[i]);
- if(pUsr[i] == pThis->ppLstn) {
+ currIdx = workset[i].id;
+ dbgprintf("tcpsrv processing i %d, pUsr %p\n", currIdx, workset[i].pUsr);
+dbgprintf("tcpsrv processing pUsr %p, ppLstn[0] %p, ppLstn[%d] %p\n", workset[i].pUsr, pThis->ppLstn[0], currIdx, pThis->ppLstn[currIdx]);
+ if(workset[i].pUsr == pThis->ppLstn) {
DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[currIdx]);
SessAccept(pThis, pThis->ppLstnPort[currIdx], &pNewSess, pThis->ppLstn[currIdx]);
CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
DBGPRINTF("New session created with NSD %p.\n", pNewSess);
} else {
- pNewSess = (tcps_sess_t*) pUsr[i];
+ pNewSess = (tcps_sess_t*) workset[i].pUsr;
doReceive(pThis, &pNewSess, pPoll);
}
}
+#endif
}
/* remove the tcp listen sockets from the epoll set */
diff --git a/tcpsrv.h b/tcpsrv.h
index 57bdf4b1..d8669540 100644
--- a/tcpsrv.h
+++ b/tcpsrv.h
@@ -83,6 +83,18 @@ struct tcpsrv_s {
};
+/**
+ * The following structure is a set of descriptors that need to be processed.
+ * This set will be the result of the epoll or select call and be used
+ * in the actual request processing stage. It serves as a basis
+ * to run multiple request by concurrent threads. -- rgerhards, 2011-01-24
+ */
+struct tcpsrv_workset_s {
+ int idx; /**< index into session table (or -1 if listener) */
+ void *pUsr;
+};
+
+
/* interfaces */
BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */
INTERFACEObjDebugPrint(tcpsrv);