summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-02-01 14:00:40 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2011-02-01 14:00:40 +0100
commitc3da462a9441f47ca1fd873a5943eb1d2c607f5f (patch)
tree7195d760e3345ae9daa527af55650f5e1e1c4a69
parent7974621502ae249a5e393dafb4f69895851b4014 (diff)
downloadrsyslog-c3da462a9441f47ca1fd873a5943eb1d2c607f5f.tar.gz
rsyslog-c3da462a9441f47ca1fd873a5943eb1d2c607f5f.tar.xz
rsyslog-c3da462a9441f47ca1fd873a5943eb1d2c607f5f.zip
fixed some regressions in imptcp
from new changes, so far unreleased versions
-rw-r--r--plugins/imptcp/imptcp.c13
1 files changed, 11 insertions, 2 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index cb7e7ab8..63447a72 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -119,6 +119,7 @@ struct ptcpsrv_s {
ruleset_t *pRuleset;
ptcplstn_t *pLstn; /* root of our listeners */
ptcpsess_t *pSess; /* root of our sessions */
+ pthread_mutex_t mutSessLst;
};
/* the ptcp session object. Describes a single active session.
@@ -164,7 +165,7 @@ static struct wrkrInfo_s {
pthread_cond_t run;
struct epoll_event *event; /* event == NULL -> idle */
long long unsigned numCalled; /* how often was this called */
-} wrkrInfo[4];
+} wrkrInfo[16];
static pthread_mutex_t wrkrMut;
static pthread_cond_t wrkrIdle;
static int wrkrRunning;
@@ -215,6 +216,7 @@ static void
destructSrv(ptcpsrv_t *pSrv)
{
prop.Destruct(&pSrv->pInputName);
+ pthread_mutex_destroy(&pSrv->mutSessLst);
free(pSrv->port);
free(pSrv);
}
@@ -797,10 +799,12 @@ addSess(ptcpsrv_t *pSrv, int sock, prop_t *peerName, prop_t *peerIP)
/* add to start of server's listener list */
pSess->prev = NULL;
+ pthread_mutex_lock(&pSrv->mutSessLst);
pSess->next = pSrv->pSess;
if(pSrv->pSess != NULL)
pSrv->pSess->prev = pSess;
pSrv->pSess = pSess;
+ pthread_mutex_unlock(&pSrv->mutSessLst);
iRet = addEPollSock(epolld_sess, pSess, sock, &pSess->epd);
@@ -823,6 +827,7 @@ closeSess(ptcpsess_t *pSess)
CHKiRet(removeEPollSock(sock, pSess->epd));
close(sock);
+ pthread_mutex_lock(&pSess->pSrv->mutSessLst);
/* finally unlink session from structures */
if(pSess->next != NULL)
pSess->next->prev = pSess->prev;
@@ -832,6 +837,7 @@ closeSess(ptcpsess_t *pSess)
} else {
pSess->prev->next = pSess->next;
}
+ pthread_mutex_unlock(&pSess->pSrv->mutSessLst);
/* unlinked, now remove structure */
destructSess(pSess);
@@ -869,6 +875,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
ptcpsrv_t *pSrv;
CHKmalloc(pSrv = malloc(sizeof(ptcpsrv_t)));
+ pthread_mutex_init(&pSrv->mutSessLst, NULL);
pSrv->pSess = NULL;
pSrv->pLstn = NULL;
pSrv->bEmitMsgOnClose = cs.bEmitMsgOnClose;
@@ -907,6 +914,8 @@ startWorkerPool(void)
{
int i;
wrkrRunning = 0;
+ if(cs.wrkrMax > 16)
+ cs.wrkrMax = 16; /* TODO: make dynamic? */
pthread_mutex_init(&wrkrMut, NULL);
pthread_cond_init(&wrkrIdle, NULL);
for(i = 0 ; i < cs.wrkrMax ; ++i) {
@@ -1078,6 +1087,7 @@ processWorkSet(int nEvents, struct epoll_event events[])
if(i < cs.wrkrMax) {
/* worker free -> use it! */
wrkrInfo[i].event = events+iEvt;
+ ++wrkrRunning;
pthread_cond_signal(&wrkrInfo[i].run);
pthread_mutex_unlock(&wrkrMut);
} else {
@@ -1118,7 +1128,6 @@ wrkr(void *myself)
}
if(glbl.GetGlobalInputTermState() == 1)
break;
- ++wrkrRunning;
pthread_mutex_unlock(&wrkrMut);
++me->numCalled;