summaryrefslogtreecommitdiffstats
path: root/tcpsrv.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-01-31 15:59:43 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2011-01-31 15:59:43 +0100
commitfd256a09ffa109120304d293cf6faf808c5a1a21 (patch)
tree4a5f4db37b469a34531efa6cc0f5382c01630ab9 /tcpsrv.c
parent48ab717fedba586be5054320e32afc84afee9f52 (diff)
downloadrsyslog-fd256a09ffa109120304d293cf6faf808c5a1a21.tar.gz
rsyslog-fd256a09ffa109120304d293cf6faf808c5a1a21.tar.xz
rsyslog-fd256a09ffa109120304d293cf6faf808c5a1a21.zip
tcpsrv select-handler experimentally moved to multi-threading as well
first tests done with plain tcp, TLS subsystems tests need to be carried out. No serious lab testing done so far.
Diffstat (limited to 'tcpsrv.c')
-rw-r--r--tcpsrv.c57
1 files changed, 44 insertions, 13 deletions
diff --git a/tcpsrv.c b/tcpsrv.c
index 23b2f630..d86bff6b 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -496,7 +496,9 @@ closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) {
CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL));
}
pThis->pOnRegularClose(*ppSess);
+dbgprintf("XXX: pre destruct *ppSess = %p\n", *ppSess);
tcps_sess.Destruct(ppSess);
+dbgprintf("XXX: post destruct *ppSess = %p\n", *ppSess);
finalize_it:
RETiRet;
}
@@ -531,7 +533,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
(*ppSess)->pStrm, pszPeer);
}
//pthread_mutex_lock(&mut);
+dbgprintf("XXX: calling closeSess()\n");
CHKiRet(closeSess(pThis, ppSess, pPoll));
+dbgprintf("XXX: done closeSess(), *ppSess %p\n", *ppSess);
//pthread_mutex_unlock(&mut);
break;
case RS_RET_RETRY:
@@ -574,12 +578,16 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr)
//printf("work item %p: connect\n", pUsr);
DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]);
SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]);
- CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
+ if(pPoll != NULL)
+ CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
DBGPRINTF("New session created with NSD %p.\n", pNewSess);
} else {
//printf("work item %p: receive\n", pUsr);
pNewSess = (tcps_sess_t*) pUsr;
doReceive(pThis, &pNewSess, pPoll);
+ if(pPoll == NULL && pNewSess == NULL) {
+ pThis->pSessions[idx] = NULL;
+ }
}
finalize_it:
@@ -648,21 +656,15 @@ for(k = 0 ; k < numEntries ; ++k) {
while(numEntries > 0) {
if(glbl.GetGlobalInputTermState() == 1)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
-dbgprintf("XXX: num entries during processing %d\n", numEntries);
if(numEntries == 1) {
- //|| workset[numEntries-1].pUsr == pThis->ppLstn) {
-dbgprintf("XXX: processWorkset 1\n");
/* process self, save context switch */
processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr);
} else {
-dbgprintf("XXX: processWorkset 2\n");
pthread_mutex_lock(&wrkrMut);
/* check if there is a free worker */
for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i)
-dbgprintf("XXX: wrkrinfo[%d].pSrv %p, .pUsr %p\n", i, wrkrInfo[i].pSrv, wrkrInfo[i].pUsr);
/*do search*/;
if(i < wrkrMax) {
-dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[numEntries - 1].pUsr, wrkrRunning, wrkrMax);
/* worker free -> use it! */
wrkrInfo[i].pSrv = pThis;
wrkrInfo[i].pPoll = pPoll;
@@ -677,7 +679,6 @@ dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[n
pthread_cond_signal(&wrkrInfo[i].run);
pthread_mutex_unlock(&wrkrMut);
} else {
-dbgprintf("XXX: processWorkset 2.2\n");
pthread_mutex_unlock(&wrkrMut);
/* no free worker, so we process this one ourselfs */
processWorksetItem(pThis, pPoll, workset[numEntries-1].id,
@@ -710,11 +711,13 @@ finalize_it:
*/
#pragma GCC diagnostic ignored "-Wempty-body"
static inline rsRetVal
-RunSelect(tcpsrv_t *pThis)
+RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset)
{
DEFiRet;
int nfds;
int i;
+ int iWorkset;
+ int numEntries;
int iTCPSess;
int bIsReady;
tcps_sess_t *pNewSess;
@@ -740,6 +743,10 @@ RunSelect(tcpsrv_t *pThis)
/* do the sessions */
iTCPSess = TCPSessGetNxtSess(pThis, -1);
while(iTCPSess != -1) {
+dbgprintf("Added sessions to select set, pSel %p\n", pSel);
+dbgprintf("Added sessions to select set, iTCPSess %d\n", iTCPSess);
+dbgprintf("Added sessions to select set, ptr to strm %p\n", pThis->pSessions[iTCPSess]);
+dbgprintf("Added sessions to select set, strm %p\n", pThis->pSessions[iTCPSess]->pStrm);
/* TODO: access to pNsd is NOT really CLEAN, use method... */
CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD));
/* now get next... */
@@ -751,13 +758,21 @@ RunSelect(tcpsrv_t *pThis)
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
+ iWorkset = 0;
for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
if(glbl.GetGlobalInputTermState() == 1)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds));
if(bIsReady) {
- DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
- SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
+ workset[iWorkset].id = i;
+ workset[iWorkset].pUsr = (void*) pThis->ppLstn; /* this is a flag to indicate listen sock */
+ ++iWorkset;
+ if(iWorkset >= (int) sizeWorkset) {
+ processWorkset(pThis, NULL, iWorkset, workset);
+ iWorkset = 0;
+ }
+ //DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]);
+ //SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]);
--nfds; /* indicate we have processed one */
}
}
@@ -769,11 +784,23 @@ RunSelect(tcpsrv_t *pThis)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds));
if(bIsReady) {
- doReceive(pThis, &pThis->pSessions[iTCPSess], NULL);
+ //doReceive(pThis, &pThis->pSessions[iTCPSess], NULL);
+ workset[iWorkset].id = iTCPSess;
+ workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess];
+ ++iWorkset;
+ if(iWorkset >= (int) sizeWorkset) {
+ processWorkset(pThis, NULL, iWorkset, workset);
+ iWorkset = 0;
+ }
--nfds; /* indicate we have processed one */
}
iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
}
+
+ if(iWorkset > 0)
+ processWorkset(pThis, NULL, iWorkset, workset);
+
+ /* we need to copy back close descriptors */
CHKiRet(nssel.Destruct(&pSel));
finalize_it: /* this is a very special case - this time only we do not exit the function,
* because that would not help us either. So we simply retry it. Let's see
@@ -808,6 +835,10 @@ Run(tcpsrv_t *pThis)
rsRetVal localRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
+#if 0
+iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t));
+FINALIZE;
+#endif
/* this is an endless loop - it is terminated by the framework canelling
* this thread. Thus, we also need to instantiate a cancel cleanup handler
@@ -820,7 +851,7 @@ Run(tcpsrv_t *pThis)
if(localRet != RS_RET_OK) {
/* fall back to select */
dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet);
- iRet = RunSelect(pThis);
+ iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t));
FINALIZE;
}