From c3da462a9441f47ca1fd873a5943eb1d2c607f5f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 1 Feb 2011 14:00:40 +0100 Subject: fixed some regressions in imptcp from new changes, so far unreleased versions --- plugins/imptcp/imptcp.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index cb7e7ab8..63447a72 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -119,6 +119,7 @@ struct ptcpsrv_s { ruleset_t *pRuleset; ptcplstn_t *pLstn; /* root of our listeners */ ptcpsess_t *pSess; /* root of our sessions */ + pthread_mutex_t mutSessLst; }; /* the ptcp session object. Describes a single active session. @@ -164,7 +165,7 @@ static struct wrkrInfo_s { pthread_cond_t run; struct epoll_event *event; /* event == NULL -> idle */ long long unsigned numCalled; /* how often was this called */ -} wrkrInfo[4]; +} wrkrInfo[16]; static pthread_mutex_t wrkrMut; static pthread_cond_t wrkrIdle; static int wrkrRunning; @@ -215,6 +216,7 @@ static void destructSrv(ptcpsrv_t *pSrv) { prop.Destruct(&pSrv->pInputName); + pthread_mutex_destroy(&pSrv->mutSessLst); free(pSrv->port); free(pSrv); } @@ -797,10 +799,12 @@ addSess(ptcpsrv_t *pSrv, int sock, prop_t *peerName, prop_t *peerIP) /* add to start of server's listener list */ pSess->prev = NULL; + pthread_mutex_lock(&pSrv->mutSessLst); pSess->next = pSrv->pSess; if(pSrv->pSess != NULL) pSrv->pSess->prev = pSess; pSrv->pSess = pSess; + pthread_mutex_unlock(&pSrv->mutSessLst); iRet = addEPollSock(epolld_sess, pSess, sock, &pSess->epd); @@ -823,6 +827,7 @@ closeSess(ptcpsess_t *pSess) CHKiRet(removeEPollSock(sock, pSess->epd)); close(sock); + pthread_mutex_lock(&pSess->pSrv->mutSessLst); /* finally unlink session from structures */ if(pSess->next != NULL) pSess->next->prev = pSess->prev; @@ -832,6 +837,7 @@ closeSess(ptcpsess_t *pSess) } else { pSess->prev->next = pSess->next; } + pthread_mutex_unlock(&pSess->pSrv->mutSessLst); /* unlinked, now remove structure */ destructSess(pSess); @@ -869,6 +875,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa ptcpsrv_t *pSrv; CHKmalloc(pSrv = malloc(sizeof(ptcpsrv_t))); + pthread_mutex_init(&pSrv->mutSessLst, NULL); pSrv->pSess = NULL; pSrv->pLstn = NULL; pSrv->bEmitMsgOnClose = cs.bEmitMsgOnClose; @@ -907,6 +914,8 @@ startWorkerPool(void) { int i; wrkrRunning = 0; + if(cs.wrkrMax > 16) + cs.wrkrMax = 16; /* TODO: make dynamic? */ pthread_mutex_init(&wrkrMut, NULL); pthread_cond_init(&wrkrIdle, NULL); for(i = 0 ; i < cs.wrkrMax ; ++i) { @@ -1078,6 +1087,7 @@ processWorkSet(int nEvents, struct epoll_event events[]) if(i < cs.wrkrMax) { /* worker free -> use it! */ wrkrInfo[i].event = events+iEvt; + ++wrkrRunning; pthread_cond_signal(&wrkrInfo[i].run); pthread_mutex_unlock(&wrkrMut); } else { @@ -1118,7 +1128,6 @@ wrkr(void *myself) } if(glbl.GetGlobalInputTermState() == 1) break; - ++wrkrRunning; pthread_mutex_unlock(&wrkrMut); ++me->numCalled; -- cgit