summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-01-27 17:25:50 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2011-01-27 17:25:50 +0100
commitb621be0936468eacd19f55c9ff43c7598a4a6701 (patch)
tree15502ac4ef7ab3f21a46fdaf31c20ed4aedf51d2
parentc6447bb3452e9ac3c4daed5827f711c2c6956ca4 (diff)
downloadrsyslog-b621be0936468eacd19f55c9ff43c7598a4a6701.tar.gz
rsyslog-b621be0936468eacd19f55c9ff43c7598a4a6701.tar.xz
rsyslog-b621be0936468eacd19f55c9ff43c7598a4a6701.zip
resolved problem with intermixing requests
-rw-r--r--runtime/nsdpoll_ptcp.c4
-rw-r--r--tcpsrv.c27
2 files changed, 25 insertions, 6 deletions
diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c
index 76c4e887..b6002b09 100644
--- a/runtime/nsdpoll_ptcp.c
+++ b/runtime/nsdpoll_ptcp.c
@@ -70,8 +70,8 @@ addEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, int mode, nsd_ptcp_t *pSock,
pNew->id = id;
pNew->pUsr = pUsr;
pNew->pSock = pSock;
- //pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */
- pNew->event.events = EPOLLET;
+ pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */
+ //pNew->event.events = EPOLLET;
if(mode & NSDPOLL_IN)
pNew->event.events |= EPOLLIN;
if(mode & NSDPOLL_OUT)
diff --git a/tcpsrv.c b/tcpsrv.c
index baaa27b1..3f078b6e 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -104,8 +104,8 @@ static struct wrkrInfo_s {
void *pUsr;
long long unsigned numCalled; /* how often was this called */
} wrkrInfo[4];
-static pthread_cond_t wrkrFinished;
static pthread_mutex_t wrkrMut;
+static pthread_cond_t wrkrIdle;
static int wrkrMax = 4;
static int wrkrRunning;
@@ -604,6 +604,8 @@ dbgprintf("XXX: worker %p activated\n", pthread_self());
pthread_mutex_lock(&wrkrMut);
me->pSrv = NULL; /* indicate we are free again */
--wrkrRunning;
+ pthread_cond_signal(&wrkrIdle);
+dbgprintf("XXX: worker %p idling\n", pthread_self());
}
pthread_mutex_unlock(&wrkrMut);
@@ -620,6 +622,7 @@ static rsRetVal
processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[])
{
int i;
+ int origEntries = numEntries;
DEFiRet;
dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries);
@@ -637,7 +640,7 @@ dbgprintf("XXX: processWorkset 2\n");
pthread_mutex_lock(&wrkrMut);
/* check if there is a free worker */
for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i)
-dbgprintf("XXX: wrkrinfo[%d].pSrv %p\n", i, wrkrInfo[i].pSrv);
+dbgprintf("XXX: wrkrinfo[%d].pSrv %p, .pUsr %p\n", i, wrkrInfo[i].pSrv, wrkrInfo[i].pUsr);
/*do search*/;
if(i < wrkrMax) {
dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[numEntries - 1].pUsr, wrkrRunning, wrkrMax);
@@ -659,6 +662,21 @@ dbgprintf("XXX: processWorkset 2.2\n");
--numEntries;
}
+ if(origEntries > 1) {
+ /* we now need to wait until all workers finish. This is because the
+ * rest of this module can not handle the concurrency introduced
+ * by workers running during the epoll call.
+ */
+ pthread_mutex_lock(&wrkrMut);
+dbgprintf("XXX: processWorkset: waiting for all workers to idle, curr = %d\n", wrkrRunning);
+ while(wrkrRunning > 0) {
+dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, curr = %d\n", wrkrRunning);
+ pthread_cond_wait(&wrkrIdle, &wrkrMut);
+dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, after wait, curr = %d\n", wrkrRunning);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+ }
+
finalize_it:
RETiRet;
}
@@ -1219,8 +1237,9 @@ static inline void
startWorkerPool(void)
{
int i;
+ wrkrRunning = 0;
pthread_mutex_init(&wrkrMut, NULL);
- pthread_cond_init(&wrkrFinished, NULL);
+ pthread_cond_init(&wrkrIdle, NULL);
for(i = 0 ; i < wrkrMax ; ++i) {
/* init worker info structure! */
pthread_cond_init(&wrkrInfo[i].run, NULL);
@@ -1243,7 +1262,7 @@ stopWorkerPool(void)
DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
pthread_cond_destroy(&wrkrInfo[i].run);
}
- pthread_cond_destroy(&wrkrFinished);
+ pthread_cond_destroy(&wrkrIdle);
pthread_mutex_destroy(&wrkrMut);
}