summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/imdiag/imdiag.c5
-rw-r--r--runtime/nsd_ptcp.c5
-rw-r--r--runtime/nsdpoll_ptcp.c15
-rw-r--r--runtime/rsyslog.h1
-rw-r--r--tcpsrv.c33
5 files changed, 41 insertions, 18 deletions
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c
index 2f7e5fee..81b357ef 100644
--- a/plugins/imdiag/imdiag.c
+++ b/plugins/imdiag/imdiag.c
@@ -270,6 +270,11 @@ waitMainQEmpty(tcps_sess_t *pSess)
dbgprintf("imdiag sleeping, wait mainq drain, curr size %d\n", iMsgQueueSize);
srSleep(0,2); /* wait a little bit */
CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
+ if(iMsgQueueSize == 0) {
+ /* verify that queue is still empty (else it could just be a race!) */
+ srSleep(1,5); /* wait a little bit */
+ CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
+ }
}
CHKiRet(sendResponse(pSess, "mainqueue empty\n"));
diff --git a/runtime/nsd_ptcp.c b/runtime/nsd_ptcp.c
index 6feee71d..96881097 100644
--- a/runtime/nsd_ptcp.c
+++ b/runtime/nsd_ptcp.c
@@ -562,6 +562,7 @@ finalize_it:
static rsRetVal
Rcv(nsd_t *pNsd, uchar *pRcvBuf, ssize_t *pLenBuf)
{
+ char errStr[1024];
DEFiRet;
nsd_ptcp_t *pThis = (nsd_ptcp_t*) pNsd;
ISOBJ_TYPE_assert(pThis, nsd_ptcp);
@@ -571,7 +572,9 @@ Rcv(nsd_t *pNsd, uchar *pRcvBuf, ssize_t *pLenBuf)
if(*pLenBuf == 0) {
ABORT_FINALIZE(RS_RET_CLOSED);
} else if (*pLenBuf < 0) {
- ABORT_FINALIZE(RS_RET_ERR);
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ dbgprintf("error during recv on NSD %p: %s\n", pNsd, errStr);
+ ABORT_FINALIZE(RS_RET_RCV_ERR);
}
finalize_it:
diff --git a/runtime/nsdpoll_ptcp.c b/runtime/nsdpoll_ptcp.c
index 95fd8aa3..445ecc71 100644
--- a/runtime/nsdpoll_ptcp.c
+++ b/runtime/nsdpoll_ptcp.c
@@ -52,6 +52,12 @@ DEFobjCurrIf(glbl)
/* add new entry to list. We assume that the fd is not already present and DO NOT check this!
* Returns newly created entry in pEvtLst.
+ * Note that we currently need to use level-triggered mode, because the upper layers do not work
+ * in parallel. As such, in edge-triggered mode we may not get notified, because new data comes
+ * in after we have read everything that was present. To use ET mode, we need to change the upper
+ * peers so that they immediately start a new wait before processing the data read. That obviously
+ * requires more elaborate redesign and we postpone this until the current more simplictic mode has
+ * been proven OK in practice.
* rgerhards, 2009-11-18
*/
static inline rsRetVal
@@ -63,7 +69,7 @@ addEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, int mode, nsd_ptcp_t *pSock,
pNew->id = id;
pNew->pUsr = pUsr;
pNew->pSock = pSock;
- pNew->event.events = EPOLLET; /* always use edge-triggered mode */
+ pNew->event.events = 0; /* TODO: at some time we should be able to use EPOLLET */
if(mode & NSDPOLL_IN)
pNew->event.events |= EPOLLIN;
if(mode & NSDPOLL_OUT)
@@ -88,7 +94,7 @@ unlinkEvent(nsdpoll_ptcp_t *pThis, int id, void *pUsr, nsdpoll_epollevt_lst_t **
DEFiRet;
pEvtLst = pThis->pRoot;
- while(pEvtLst != NULL && pEvtLst->id != id && pEvtLst->pUsr != pUsr) {
+ while(pEvtLst != NULL && !(pEvtLst->id == id && pEvtLst->pUsr == pUsr)) {
pPrev = pEvtLst;
pEvtLst = pEvtLst->pNext;
}
@@ -158,7 +164,7 @@ Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op) {
DEFiRet;
if(op == NSDPOLL_ADD) {
- dbgprintf("adding nsdpoll entry %d/%p\n", id, pUsr);
+ dbgprintf("adding nsdpoll entry %d/%p, sock %d\n", id, pUsr, pSock->sock);
CHKiRet(addEvent(pThis, id, pUsr, mode, pSock, &pEventLst));
if(epoll_ctl(pThis->efd, EPOLL_CTL_ADD, pSock->sock, &pEventLst->event) < 0) {
errSave = errno;
@@ -168,8 +174,7 @@ Ctl(nsdpoll_t *pNsdpoll, nsd_t *pNsd, int id, void *pUsr, int mode, int op) {
pSock->sock, id, pUsr, mode, errStr);
}
} else if(op == NSDPOLL_DEL) {
- // TODO: XXX : code missing! del Event
- dbgprintf("removing nsdpoll entry %d/%p\n", id, pUsr);
+ dbgprintf("removing nsdpoll entry %d/%p, sock %d\n", id, pUsr, pSock->sock);
CHKiRet(unlinkEvent(pThis, id, pUsr, &pEventLst));
if(epoll_ctl(pThis->efd, EPOLL_CTL_DEL, pSock->sock, &pEventLst->event) < 0) {
errSave = errno;
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index b8ebafd9..1431c684 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -418,6 +418,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_ERR_EPOLL = -2162, /**< epoll() returned with an unexpected error code */
RS_RET_ERR_EPOLL_CTL = -2163, /**< epol_ctll() returned with an unexpected error code */
RS_RET_TIMEOUT = -2164, /**< timeout occured during operation */
+ RS_RET_RCV_ERR = -2165, /**< error occured during socket rcv operation */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
diff --git a/tcpsrv.c b/tcpsrv.c
index 9d51d83b..d2ab16f2 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -467,6 +467,22 @@ RunCancelCleanup(void *arg)
}
+/* helper to close a session. Takes status of poll vs. select into consideration.
+ * rgerhards, 2009-11-25
+ */
+static inline rsRetVal
+closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) {
+ DEFiRet;
+ if(pPoll != NULL) {
+ CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL));
+ }
+ pThis->pOnRegularClose(*ppSess);
+ tcps_sess.Destruct(ppSess);
+finalize_it:
+ RETiRet;
+}
+
+
/* process a receive request on one of the streams
* If pPoll is non-NULL, we have a netstream in epoll mode, which means we need
* to remove any descriptor we close from the epoll set.
@@ -482,7 +498,6 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
ISOBJ_TYPE_assert(pThis, tcpsrv);
DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm);
-
/* Receive message */
iRet = pThis->pRcvData(*ppSess, buf, sizeof(buf), &iRcvd);
switch(iRet) {
@@ -495,11 +510,7 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n",
(*ppSess)->pStrm, pszPeer);
}
- if(pPoll != NULL) {
- CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL));
- }
- pThis->pOnRegularClose(*ppSess);
- tcps_sess.Destruct(ppSess);
+ CHKiRet(closeSess(pThis, ppSess, pPoll));
break;
case RS_RET_RETRY:
/* we simply ignore retry - this is not an error, but we also have not received anything */
@@ -512,16 +523,14 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
*/
errmsg.LogError(0, localRet, "Tearing down TCP Session - see "
"previous messages for reason(s)\n");
- pThis->pOnErrClose(*ppSess);
- tcps_sess.Destruct(ppSess);
+ CHKiRet(closeSess(pThis, ppSess, pPoll));
}
break;
default:
errno = 0;
errmsg.LogError(0, iRet, "netstream session %p will be closed due to error\n",
(*ppSess)->pStrm);
- pThis->pOnErrClose(*ppSess);
- tcps_sess.Destruct(ppSess);
+ CHKiRet(closeSess(pThis, ppSess, pPoll));
break;
}
@@ -650,14 +659,14 @@ Run(tcpsrv_t *pThis)
FINALIZE;
}
- dbgprintf("we would use the poll handler, currently not implemented!\n");
+ dbgprintf("tcpsrv uses epoll() interface, nsdpol driver found\n");
/* flag that we are in epoll mode */
pThis->bUsingEPoll = TRUE;
/* Add the TCP listen sockets to the list of sockets to monitor */
for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
- dbgprintf("Trying to add listener %d\n", i);
+ dbgprintf("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn);
CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD));
dbgprintf("Added listener %d\n", i);
}