summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-08-10 16:18:21 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-08-10 16:18:21 +0200
commitee4aed1713bb968afa6db992f9e2e6c00d6c9350 (patch)
treea29745c01c25dd05ac40d0b188d5cd12b5ade069 /plugins
parent55256ac96815d6e13fc9df7206d50ef7dcaca4fe (diff)
downloadrsyslog-ee4aed1713bb968afa6db992f9e2e6c00d6c9350.tar.gz
rsyslog-ee4aed1713bb968afa6db992f9e2e6c00d6c9350.tar.xz
rsyslog-ee4aed1713bb968afa6db992f9e2e6c00d6c9350.zip
added tests for imptcp and fixed some problems with it
it now also works reliably in edge-triggered mode
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imptcp/imptcp.c59
1 files changed, 41 insertions, 18 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 5aeb0192..80df959c 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -434,6 +434,8 @@ AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP)
iNewSock = accept(sock, (struct sockaddr*) &addr, &addrlen);
if(iNewSock < 0) {
+ if(errno == EAGAIN || errno == EWOULDBLOCK)
+ ABORT_FINALIZE(RS_RET_NO_MORE_DATA);
ABORT_FINALIZE(RS_RET_ACCEPT_ERR);
}
@@ -747,6 +749,8 @@ addLstn(ptcpsrv_t *pSrv, int sock)
/* add to start of server's listener list */
pLstn->prev = NULL;
pLstn->next = pSrv->pLstn;
+ if(pSrv->pLstn != NULL)
+ pSrv->pLstn->prev = pLstn;
pSrv->pLstn = pLstn;
iRet = addEPollSock(epolld_lstn, pLstn, sock, &pLstn->epd);
@@ -777,6 +781,8 @@ addSess(ptcpsrv_t *pSrv, int sock, prop_t *peerName, prop_t *peerIP)
/* add to start of server's listener list */
pSess->prev = NULL;
pSess->next = pSrv->pSess;
+ if(pSrv->pSess != NULL)
+ pSrv->pSess->prev = pSess;
pSrv->pSess = pSess;
iRet = addEPollSock(epolld_sess, pSess, sock, &pSess->epd);
@@ -801,6 +807,9 @@ closeSess(ptcpsess_t *pSess)
close(sock);
/* finally unlink session from structures */
+//fprintf(stderr, "closing session %d next %p, prev %p\n", pSess->sock, pSess->next, pSess->prev);
+//dbgprintf("imptcp: pSess->next %p\n", pSess->next);
+//dbgprintf("imptcp: pSess->prev %p\n", pSess->prev);
if(pSess->next != NULL)
pSess->next->prev = pSess->prev;
if(pSess->prev == NULL) {
@@ -921,11 +930,19 @@ lstnActivity(ptcplstn_t *pLstn)
int newSock;
prop_t *peerName;
prop_t *peerIP;
+ rsRetVal localRet;
+int iac = 0;
DEFiRet;
DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock);
- CHKiRet(AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP));
- CHKiRet(addSess(pLstn->pSrv, newSock, peerName, peerIP));
+ while(1) {
+ localRet = AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP);
+//if(iac++ > 0) fprintf(stderr, "%d accepts in a row!\n", iac);
+ if(localRet == RS_RET_NO_MORE_DATA)
+ break;
+ CHKiRet(localRet);
+ CHKiRet(addSess(pLstn->pSrv, newSock, peerName, peerIP));
+ }
finalize_it:
RETiRet;
@@ -940,24 +957,32 @@ sessActivity(ptcpsess_t *pSess)
{
int lenRcv;
int lenBuf;
- char rcvBuf[4096];
+ char rcvBuf[128*1024];
DEFiRet;
+int iac = 0;
DBGPRINTF("imptcp: new activity on session socket %d\n", pSess->sock);
- lenBuf = sizeof(rcvBuf);
- lenRcv = recv(pSess->sock, rcvBuf, lenBuf, 0);
-
- if(lenRcv > 0) {
- /* have data, process it */
- DBGPRINTF("imtcp: data(%d) on socket %d: %s\n", lenBuf, pSess->sock, rcvBuf);
- CHKiRet(DataRcvd(pSess, rcvBuf, lenRcv));
- } else if (lenRcv == 0) {
- /* session was closed, do clean-up */
- CHKiRet(closeSess(pSess));
- } else {
- DBGPRINTF("imtcp: error on session socket %d - closed.\n", pSess->sock);
- closeSess(pSess); /* try clean-up by dropping session */
+ while(1) {
+ lenBuf = sizeof(rcvBuf);
+ lenRcv = recv(pSess->sock, rcvBuf, lenBuf, 0);
+//if(iac++ > 1) fprintf(stderr, "\n%d recv in a row!\n", iac-1);
+
+ if(lenRcv > 0) {
+ /* have data, process it */
+ DBGPRINTF("imtcp: data(%d) on socket %d: %s\n", lenBuf, pSess->sock, rcvBuf);
+ CHKiRet(DataRcvd(pSess, rcvBuf, lenRcv));
+ } else if (lenRcv == 0) {
+ /* session was closed, do clean-up */
+ CHKiRet(closeSess(pSess));
+ break;
+ } else {
+ if(errno == EAGAIN || errno == EWOULDBLOCK)
+ break;
+ DBGPRINTF("imtcp: error on session socket %d - closed.\n", pSess->sock);
+ closeSess(pSess); /* try clean-up by dropping session */
+ break;
+ }
}
finalize_it:
@@ -1127,8 +1152,6 @@ CODEmodInit_QueryRegCFSLineHdlr
addTCPListener, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpservernotifyonconnectionclose"), 0,
eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID));
- //CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverstreamdriverpermittedpeer"), 0,
- //eCmdHdlrGetWord, setPermittedPeer, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt,
NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0,