diff options
Diffstat (limited to 'tcpsrv.c')
-rw-r--r-- | tcpsrv.c | 27 |
1 files changed, 23 insertions, 4 deletions
@@ -104,8 +104,8 @@ static struct wrkrInfo_s { void *pUsr; long long unsigned numCalled; /* how often was this called */ } wrkrInfo[4]; -static pthread_cond_t wrkrFinished; static pthread_mutex_t wrkrMut; +static pthread_cond_t wrkrIdle; static int wrkrMax = 4; static int wrkrRunning; @@ -604,6 +604,8 @@ dbgprintf("XXX: worker %p activated\n", pthread_self()); pthread_mutex_lock(&wrkrMut); me->pSrv = NULL; /* indicate we are free again */ --wrkrRunning; + pthread_cond_signal(&wrkrIdle); +dbgprintf("XXX: worker %p idling\n", pthread_self()); } pthread_mutex_unlock(&wrkrMut); @@ -620,6 +622,7 @@ static rsRetVal processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[]) { int i; + int origEntries = numEntries; DEFiRet; dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); @@ -637,7 +640,7 @@ dbgprintf("XXX: processWorkset 2\n"); pthread_mutex_lock(&wrkrMut); /* check if there is a free worker */ for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i) -dbgprintf("XXX: wrkrinfo[%d].pSrv %p\n", i, wrkrInfo[i].pSrv); +dbgprintf("XXX: wrkrinfo[%d].pSrv %p, .pUsr %p\n", i, wrkrInfo[i].pSrv, wrkrInfo[i].pUsr); /*do search*/; if(i < wrkrMax) { dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[numEntries - 1].pUsr, wrkrRunning, wrkrMax); @@ -659,6 +662,21 @@ dbgprintf("XXX: processWorkset 2.2\n"); --numEntries; } + if(origEntries > 1) { + /* we now need to wait until all workers finish. This is because the + * rest of this module can not handle the concurrency introduced + * by workers running during the epoll call. + */ + pthread_mutex_lock(&wrkrMut); +dbgprintf("XXX: processWorkset: waiting for all workers to idle, curr = %d\n", wrkrRunning); + while(wrkrRunning > 0) { +dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, curr = %d\n", wrkrRunning); + pthread_cond_wait(&wrkrIdle, &wrkrMut); +dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, after wait, curr = %d\n", wrkrRunning); + } + pthread_mutex_unlock(&wrkrMut); + } + finalize_it: RETiRet; } @@ -1219,8 +1237,9 @@ static inline void startWorkerPool(void) { int i; + wrkrRunning = 0; pthread_mutex_init(&wrkrMut, NULL); - pthread_cond_init(&wrkrFinished, NULL); + pthread_cond_init(&wrkrIdle, NULL); for(i = 0 ; i < wrkrMax ; ++i) { /* init worker info structure! */ pthread_cond_init(&wrkrInfo[i].run, NULL); @@ -1243,7 +1262,7 @@ stopWorkerPool(void) DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); pthread_cond_destroy(&wrkrInfo[i].run); } - pthread_cond_destroy(&wrkrFinished); + pthread_cond_destroy(&wrkrIdle); pthread_mutex_destroy(&wrkrMut); } |