summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--runtime/netstrm.c2
-rw-r--r--runtime/netstrms.c1
-rw-r--r--runtime/nsdpoll_ptcp.c8
-rw-r--r--runtime/nsdpoll_ptcp.h1
-rw-r--r--tcps_sess.c3
-rw-r--r--tcpsrv.c30
6 files changed, 33 insertions, 12 deletions
diff --git a/runtime/netstrm.c b/runtime/netstrm.c
index 3658006f..a6f840a5 100644
--- a/runtime/netstrm.c
+++ b/runtime/netstrm.c
@@ -64,6 +64,7 @@ ENDobjConstruct(netstrm)
/* destructor for the netstrm object */
BEGINobjDestruct(netstrm) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(netstrm)
+//printf("destruct driver data %p\n", pThis->pDrvrData);
if(pThis->pDrvrData != NULL)
iRet = pThis->Drvr.Destruct(&pThis->pDrvrData);
ENDobjDestruct(netstrm)
@@ -169,6 +170,7 @@ Rcv(netstrm_t *pThis, uchar *pBuf, ssize_t *pLenBuf)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, netstrm);
+//printf("Rcv %p\n", pThis);
iRet = pThis->Drvr.Rcv(pThis->pDrvrData, pBuf, pLenBuf);
RETiRet;
}
diff --git a/runtime/netstrms.c b/runtime/netstrms.c
index e9ff2568..56e492fe 100644
--- a/runtime/netstrms.c
+++ b/runtime/netstrms.c
@@ -32,7 +32,6 @@
#include "rsyslog.h"
#include "module-template.h"
#include "obj.h"
-//#include "errmsg.h"
#include "nsd.h"
#include "netstrm.h"
#include "nssel.h"
diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c
index b6002b09..6fd92df1 100644
--- a/runtime/nsdpoll_ptcp.c
+++ b/runtime/nsdpoll_ptcp.c
@@ -77,8 +77,10 @@ addEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, int mode, nsd_ptcp_t *pSock,
if(mode & NSDPOLL_OUT)
pNew->event.events |= EPOLLOUT;
pNew->event.data.u64 = (uint64) pNew;
+ pthread_mutex_lock(&pThis->mutEvtLst);
pNew->pNext = pThis->pRoot;
pThis->pRoot = pNew;
+ pthread_mutex_unlock(&pThis->mutEvtLst);
*pEvtLst = pNew;
finalize_it:
@@ -95,6 +97,7 @@ unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t **
nsdpoll_epollevt_lst_t *pPrev = NULL;
DEFiRet;
+ pthread_mutex_lock(&pThis->mutEvtLst);
pEvtLst = pThis->pRoot;
while(pEvtLst != NULL && !(pEvtLst->id == id && pEvtLst->pUsr == pUsr)) {
pPrev = pEvtLst;
@@ -112,6 +115,7 @@ unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t **
pPrev->pNext = pEvtLst->pNext;
finalize_it:
+ pthread_mutex_unlock(&pThis->mutEvtLst);
RETiRet;
}
@@ -145,6 +149,7 @@ BEGINobjConstruct(nsdpoll_ptcp) /* be sure to specify the object type also in EN
DBGPRINTF("epoll_create1() could not create fd\n");
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
+ pthread_mutex_init(&pThis->mutEvtLst, NULL);
finalize_it:
ENDobjConstruct(nsdpoll_ptcp)
@@ -152,6 +157,9 @@ ENDobjConstruct(nsdpoll_ptcp)
/* destructor for the nsdpoll_ptcp object */
BEGINobjDestruct(nsdpoll_ptcp) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(nsdpoll_ptcp)
+ //printf("ndspoll_ptcp destruct, event list root is %p\n", pThis->pRoot);
+#warning cleanup event list is missing! (at least I think so)
+ pthread_mutex_destroy(&pThis->mutEvtLst);
ENDobjDestruct(nsdpoll_ptcp)
diff --git a/runtime/nsdpoll_ptcp.h b/runtime/nsdpoll_ptcp.h
index cea2823d..dfefad1b 100644
--- a/runtime/nsdpoll_ptcp.h
+++ b/runtime/nsdpoll_ptcp.h
@@ -49,6 +49,7 @@ struct nsdpoll_ptcp_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
int efd; /* file descriptor used by epoll */
nsdpoll_epollevt_lst_t *pRoot; /* Root of the epoll event list */
+ pthread_mutex_t mutEvtLst;
};
/* interface is defined in nsd.h, we just implement it! */
diff --git a/tcps_sess.c b/tcps_sess.c
index 99af0cb8..8b944885 100644
--- a/tcps_sess.c
+++ b/tcps_sess.c
@@ -95,6 +95,7 @@ finalize_it:
/* destructor for the tcps_sess object */
BEGINobjDestruct(tcps_sess) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(tcps_sess)
+//printf("sess %p destruct, pStrm %p\n", pThis, pThis->pStrm);
if(pThis->pStrm != NULL)
netstrm.Destruct(&pThis->pStrm);
@@ -337,6 +338,7 @@ Close(tcps_sess_t *pThis)
{
DEFiRet;
+//printf("sess %p close\n", pThis);
ISOBJ_TYPE_assert(pThis, tcps_sess);
netstrm.Destruct(&pThis->pStrm);
if(pThis->fromHost != NULL) {
@@ -466,6 +468,7 @@ DataRcvd(tcps_sess_t *pThis, char *pData, size_t iLen)
char *pEnd;
DEFiRet;
+//printf("DataRcvd: %p\n", pThis);
ISOBJ_TYPE_assert(pThis, tcps_sess);
assert(pData != NULL);
assert(iLen > 0);
diff --git a/tcpsrv.c b/tcpsrv.c
index da5182e1..23b2f630 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -71,6 +71,7 @@
#include "ruleset.h"
#include "unicode-helper.h"
+
MODULE_TYPE_LIB
/* defines */
@@ -514,6 +515,7 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
rsRetVal localRet;
DEFiRet;
+//printf("doReceive %p/%p\n", pThis, *ppSess);
ISOBJ_TYPE_assert(pThis, tcpsrv);
DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm);
/* Receive message */
@@ -528,7 +530,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n",
(*ppSess)->pStrm, pszPeer);
}
+ //pthread_mutex_lock(&mut);
CHKiRet(closeSess(pThis, ppSess, pPoll));
+ //pthread_mutex_unlock(&mut);
break;
case RS_RET_RETRY:
/* we simply ignore retry - this is not an error, but we also have not received anything */
@@ -567,11 +571,13 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr)
dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr);
if(pUsr == pThis->ppLstn) {
+//printf("work item %p: connect\n", pUsr);
DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]);
SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]);
CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
DBGPRINTF("New session created with NSD %p.\n", pNewSess);
} else {
+//printf("work item %p: receive\n", pUsr);
pNewSess = (tcps_sess_t*) pUsr;
doReceive(pThis, &pNewSess, pPoll);
}
@@ -595,10 +601,8 @@ wrkr(void *myself)
}
if(glbl.GetGlobalInputTermState() == 1)
break;
- ++wrkrRunning;
pthread_mutex_unlock(&wrkrMut);
-dbgprintf("XXX: worker %p activated\n", pthread_self());
++me->numCalled;
processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr);
@@ -606,15 +610,12 @@ dbgprintf("XXX: worker %p activated\n", pthread_self());
me->pSrv = NULL; /* indicate we are free again */
--wrkrRunning;
pthread_cond_signal(&wrkrIdle);
-dbgprintf("XXX: worker %p idling\n", pthread_self());
}
pthread_mutex_unlock(&wrkrMut);
return NULL;
}
-#warning remove include
-#include <stdio.h>
/* Process a workset, that is handle io. We become activated
* from either select or epoll handler. We split the workload
@@ -628,16 +629,20 @@ processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t
int origEntries = numEntries;
DEFiRet;
+#if 0
{ /* chck workset for dupes */
int k, j;
-for(k = 0 ; k < numEntries ; ++k)
+for(k = 0 ; k < numEntries ; ++k) {
+ //printf("work item %d: %p\n", k, workset[k].pUsr);
for(j = k+1 ; j < numEntries ; ++j) {
if(workset[k].pUsr == workset[j].pUsr) {
- fprintf(stderr, "workset duplicate %d:%d:%p\n", k, j, workset[k].pUsr);
- fflush(stderr);
+ printf(stderr, "workset duplicate %d:%d:%p\n", k, j, workset[k].pUsr);
+ flush(stderr);
}
}
}
+}
+#endif
dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries);
while(numEntries > 0) {
@@ -663,6 +668,12 @@ dbgprintf("XXX: processWorkset 2.1, pUsr=%p, wrkrRunnig %d, max %d\n", workset[n
wrkrInfo[i].pPoll = pPoll;
wrkrInfo[i].idx = workset[numEntries -1].id;
wrkrInfo[i].pUsr = workset[numEntries -1].pUsr;
+ /* Note: we must increment wrkrRunning HERE and not inside the worker's
+ * code. This is because a worker may actually never start, and thus
+ * increment wrkrRunning, before we finish and check the running worker
+ * count. We can only avoid this by incrementing it here.
+ */
+ ++wrkrRunning;
pthread_cond_signal(&wrkrInfo[i].run);
pthread_mutex_unlock(&wrkrMut);
} else {
@@ -682,11 +693,8 @@ dbgprintf("XXX: processWorkset 2.2\n");
* by workers running during the epoll call.
*/
pthread_mutex_lock(&wrkrMut);
-dbgprintf("XXX: processWorkset: waiting for all workers to idle, curr = %d\n", wrkrRunning);
while(wrkrRunning > 0) {
-dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, curr = %d\n", wrkrRunning);
pthread_cond_wait(&wrkrIdle, &wrkrMut);
-dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, after wait, curr = %d\n", wrkrRunning);
}
pthread_mutex_unlock(&wrkrMut);
}