summaryrefslogtreecommitdiffstats
path: root/tcpsrv.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-01-31 13:13:00 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2011-01-31 13:13:00 +0100
commit48ab717fedba586be5054320e32afc84afee9f52 (patch)
treeffd44239cfd9c51ef24a1fc4b3deab271ae73e4c /tcpsrv.c
parenta4a94ddfc0dc2256d7a3bc79ed8f9489de9f0f9b (diff)
downloadrsyslog-48ab717fedba586be5054320e32afc84afee9f52.tar.gz
rsyslog-48ab717fedba586be5054320e32afc84afee9f52.tar.xz
rsyslog-48ab717fedba586be5054320e32afc84afee9f52.zip
fixing regression: multi-threading had races
Diffstat (limited to 'tcpsrv.c')
-rw-r--r--tcpsrv.c30
1 files changed, 19 insertions, 11 deletions
diff --git a/tcpsrv.c b/tcpsrv.c
index da5182e1..23b2f630 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -71,6 +71,7 @@
#include "ruleset.h"
#include "unicode-helper.h"
+
MODULE_TYPE_LIB
/* defines */
@@ -514,6 +515,7 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
rsRetVal localRet;
DEFiRet;
+//printf("doReceive %p/%p\n", pThis, *ppSess);
ISOBJ_TYPE_assert(pThis, tcpsrv);
DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm);
/* Receive message */
@@ -528,7 +530,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n",
(*ppSess)->pStrm, pszPeer);
}
+ //pthread_mutex_lock(&mut);
CHKiRet(closeSess(pThis, ppSess, pPoll));
+ //pthread_mutex_unlock(&mut);
break;
case RS_RET_RETRY:
/* we simply ignore retry - this is not an error, but we also have not received anything */
@@ -567,11 +571,13 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr)
dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr);
if(pUsr == pThis->ppLstn) {
+//printf("work item %p: connect\n", pUsr);
DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]);
SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]);
CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
DBGPRINTF("New session created with NSD %p.\n", pNewSess);
} else {
+//printf("work item %p: receive\n", pUsr);
pNewSess = (tcps_sess_t*) pUsr;
doReceive(pThis, &pNewSess, pPoll);
}
@@ -595,10 +601,8 @@ wrkr(void *myself)
}
if(glbl.GetGlobalInputTermState() == 1)
break;
- ++wrkrRunning;
pthread_mutex_unlock(&wrkrMut);
-dbgprintf("XXX: worker %p activated\n", pthread_self());
++me->numCalled;
processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr);
@@ -606,15 +610,12 @@ dbgprintf("XXX: worker %p activated\n", pthread_self());
me->pSrv = NULL; /* indicate we are free again */
--wrkrRunning;
pthread_cond_signal(&wrkrIdle);
-dbgprintf("XXX: worker %p idling\n", pthread_self());
}
pthread_mutex_unlock(&wrkrMut);
return NULL;
}
-#warning remove include
-#include <stdio.h>
/* Process a workset, that is handle io. We become activated
* from either select or epoll handler. We split the workload
@@ -628,16 +629,20 @@ processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t
int origEntries = numEntries;
DEFiRet;
+#if 0
{ /* chck workset for dupes */
int k, j;
-for(k = 0 ; k < numEntries ; ++k)
+for(k = 0 ; k < numEntries ; ++k) {
+ //printf("work item %d: %p\n", k, workset[k].pUsr);
for(j = k+1 ; j < numEntries ; ++j) {
if(workset[k].pUsr == workset[j].pUsr) {
- fprintf(stderr, "workset duplicate %d:%d:%p\n", k, j, workset[k].pUsr);
- fflush(stderr);
+ fprintf(stderr, "workset duplicate %d:%d:%p\n", k, j, workset[k].pUsr);
+ fflush(stderr);
}
}
}
+}
+#endif
dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries);
while(numEntries > 0) {
@@ -663,6 +668,12 @@ dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[n
wrkrInfo[i].pPoll = pPoll;
wrkrInfo[i].idx = workset[numEntries -1].id;
wrkrInfo[i].pUsr = workset[numEntries -1].pUsr;
+ /* Note: we must increment wrkrRunning HERE and not inside the worker's
+ * code. Otherwise a signaled worker might not have started, and thus not
+ * yet incremented wrkrRunning, by the time we finish and check the running
+ * worker count. We can only avoid this race by incrementing it here.
+ */
+ ++wrkrRunning;
pthread_cond_signal(&wrkrInfo[i].run);
pthread_mutex_unlock(&wrkrMut);
} else {
@@ -682,11 +693,8 @@ dbgprintf("XXX: processWorkset 2.2\n");
* by workers running during the epoll call.
*/
pthread_mutex_lock(&wrkrMut);
-dbgprintf("XXX: processWorkset: waiting for all workers to idle, curr = %d\n", wrkrRunning);
while(wrkrRunning > 0) {
-dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, curr = %d\n", wrkrRunning);
pthread_cond_wait(&wrkrIdle, &wrkrMut);
-dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, after wait, curr = %d\n", wrkrRunning);
}
pthread_mutex_unlock(&wrkrMut);
}