From 48ab717fedba586be5054320e32afc84afee9f52 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 31 Jan 2011 13:13:00 +0100 Subject: fixing regression: multi-threading had races --- tcpsrv.c | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) (limited to 'tcpsrv.c') diff --git a/tcpsrv.c b/tcpsrv.c index da5182e1..23b2f630 100644 --- a/tcpsrv.c +++ b/tcpsrv.c @@ -71,6 +71,7 @@ #include "ruleset.h" #include "unicode-helper.h" + MODULE_TYPE_LIB /* defines */ @@ -514,6 +515,7 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) rsRetVal localRet; DEFiRet; +//printf("doReceive %p/%p\n", pThis, *ppSess); ISOBJ_TYPE_assert(pThis, tcpsrv); DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm); /* Receive message */ @@ -528,7 +530,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n", (*ppSess)->pStrm, pszPeer); } + //pthread_mutex_lock(&mut); CHKiRet(closeSess(pThis, ppSess, pPoll)); + //pthread_mutex_unlock(&mut); break; case RS_RET_RETRY: /* we simply ignore retry - this is not an error, but we also have not received anything */ @@ -567,11 +571,13 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr); if(pUsr == pThis->ppLstn) { +//printf("work item %p: connect\n", pUsr); DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); DBGPRINTF("New session created with NSD %p.\n", pNewSess); } else { +//printf("work item %p: receive\n", pUsr); pNewSess = (tcps_sess_t*) pUsr; doReceive(pThis, &pNewSess, pPoll); } @@ -595,10 +601,8 @@ wrkr(void *myself) } if(glbl.GetGlobalInputTermState() == 1) break; - ++wrkrRunning; pthread_mutex_unlock(&wrkrMut); -dbgprintf("XXX: worker %p activated\n", pthread_self()); ++me->numCalled; processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr); @@ -606,15 +610,12 @@ dbgprintf("XXX: worker %p activated\n", pthread_self()); me->pSrv = NULL; /* indicate we are free again */ --wrkrRunning; pthread_cond_signal(&wrkrIdle); -dbgprintf("XXX: worker %p idling\n", pthread_self()); } pthread_mutex_unlock(&wrkrMut); return NULL; } -#warning remove include -#include /* Process a workset, that is handle io. We become activated * from either select or epoll handler. We split the workload @@ -628,16 +629,20 @@ processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t int origEntries = numEntries; DEFiRet; +#if 0 { /* chck workset for dupes */ int k, j; -for(k = 0 ; k < numEntries ; ++k) +for(k = 0 ; k < numEntries ; ++k) { + //printf("work item %d: %p\n", k, workset[k].pUsr); for(j = k+1 ; j < numEntries ; ++j) { if(workset[k].pUsr == workset[j].pUsr) { - fprintf(stderr, "workset duplicate %d:%d:%p\n", k, j, workset[k].pUsr); - fflush(stderr); + printf(stderr, "workset duplicate %d:%d:%p\n", k, j, workset[k].pUsr); + flush(stderr); } } } +} +#endif dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); while(numEntries > 0) { @@ -663,6 +668,12 @@ dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[n wrkrInfo[i].pPoll = pPoll; wrkrInfo[i].idx = workset[numEntries -1].id; wrkrInfo[i].pUsr = workset[numEntries -1].pUsr; + /* Note: we must increment wrkrRunning HERE and not inside the worker's + * code. This is because a worker may actually never start, and thus + * increment wrkrRunning, before we finish and check the running worker + * count. We can only avoid this by incrementing it here. + */ + ++wrkrRunning; pthread_cond_signal(&wrkrInfo[i].run); pthread_mutex_unlock(&wrkrMut); } else { @@ -682,11 +693,8 @@ dbgprintf("XXX: processWorkset 2.2\n"); * by workers running during the epoll call. */ pthread_mutex_lock(&wrkrMut); -dbgprintf("XXX: processWorkset: waiting for all workers to idle, curr = %d\n", wrkrRunning); while(wrkrRunning > 0) { -dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, curr = %d\n", wrkrRunning); pthread_cond_wait(&wrkrIdle, &wrkrMut); -dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, after wait, curr = %d\n", wrkrRunning); } pthread_mutex_unlock(&wrkrMut); } -- cgit