summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-08-10 16:18:21 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-08-10 16:18:21 +0200
commitee4aed1713bb968afa6db992f9e2e6c00d6c9350 (patch)
treea29745c01c25dd05ac40d0b188d5cd12b5ade069
parent55256ac96815d6e13fc9df7206d50ef7dcaca4fe (diff)
downloadrsyslog-ee4aed1713bb968afa6db992f9e2e6c00d6c9350.tar.gz
rsyslog-ee4aed1713bb968afa6db992f9e2e6c00d6c9350.tar.xz
rsyslog-ee4aed1713bb968afa6db992f9e2e6c00d6c9350.zip
added tests for imptcp and fixed some problems with it
it now also works reliably in edge-triggered mode
-rw-r--r--plugins/imptcp/imptcp.c59
-rw-r--r--tests/Makefile.am6
-rwxr-xr-xtests/diag.sh2
-rwxr-xr-xtests/imptcp_large.sh14
-rw-r--r--tests/testsuites/imptcp_large.conf16
5 files changed, 78 insertions, 19 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 5aeb0192..80df959c 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -434,6 +434,8 @@ AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP)
iNewSock = accept(sock, (struct sockaddr*) &addr, &addrlen);
if(iNewSock < 0) {
+ if(errno == EAGAIN || errno == EWOULDBLOCK)
+ ABORT_FINALIZE(RS_RET_NO_MORE_DATA);
ABORT_FINALIZE(RS_RET_ACCEPT_ERR);
}
@@ -747,6 +749,8 @@ addLstn(ptcpsrv_t *pSrv, int sock)
/* add to start of server's listener list */
pLstn->prev = NULL;
pLstn->next = pSrv->pLstn;
+ if(pSrv->pLstn != NULL)
+ pSrv->pLstn->prev = pLstn;
pSrv->pLstn = pLstn;
iRet = addEPollSock(epolld_lstn, pLstn, sock, &pLstn->epd);
@@ -777,6 +781,8 @@ addSess(ptcpsrv_t *pSrv, int sock, prop_t *peerName, prop_t *peerIP)
/* add to start of server's listener list */
pSess->prev = NULL;
pSess->next = pSrv->pSess;
+ if(pSrv->pSess != NULL)
+ pSrv->pSess->prev = pSess;
pSrv->pSess = pSess;
iRet = addEPollSock(epolld_sess, pSess, sock, &pSess->epd);
@@ -801,6 +807,9 @@ closeSess(ptcpsess_t *pSess)
close(sock);
/* finally unlink session from structures */
+//fprintf(stderr, "closing session %d next %p, prev %p\n", pSess->sock, pSess->next, pSess->prev);
+//dbgprintf("imptcp: pSess->next %p\n", pSess->next);
+//dbgprintf("imptcp: pSess->prev %p\n", pSess->prev);
if(pSess->next != NULL)
pSess->next->prev = pSess->prev;
if(pSess->prev == NULL) {
@@ -921,11 +930,19 @@ lstnActivity(ptcplstn_t *pLstn)
int newSock;
prop_t *peerName;
prop_t *peerIP;
+ rsRetVal localRet;
+int iac = 0;
DEFiRet;
DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock);
- CHKiRet(AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP));
- CHKiRet(addSess(pLstn->pSrv, newSock, peerName, peerIP));
+ while(1) {
+ localRet = AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP);
+//if(iac++ > 0) fprintf(stderr, "%d accepts in a row!\n", iac);
+ if(localRet == RS_RET_NO_MORE_DATA)
+ break;
+ CHKiRet(localRet);
+ CHKiRet(addSess(pLstn->pSrv, newSock, peerName, peerIP));
+ }
finalize_it:
RETiRet;
@@ -940,24 +957,32 @@ sessActivity(ptcpsess_t *pSess)
{
int lenRcv;
int lenBuf;
- char rcvBuf[4096];
+ char rcvBuf[128*1024];
DEFiRet;
+int iac = 0;
DBGPRINTF("imptcp: new activity on session socket %d\n", pSess->sock);
- lenBuf = sizeof(rcvBuf);
- lenRcv = recv(pSess->sock, rcvBuf, lenBuf, 0);
-
- if(lenRcv > 0) {
- /* have data, process it */
- DBGPRINTF("imtcp: data(%d) on socket %d: %s\n", lenBuf, pSess->sock, rcvBuf);
- CHKiRet(DataRcvd(pSess, rcvBuf, lenRcv));
- } else if (lenRcv == 0) {
- /* session was closed, do clean-up */
- CHKiRet(closeSess(pSess));
- } else {
- DBGPRINTF("imtcp: error on session socket %d - closed.\n", pSess->sock);
- closeSess(pSess); /* try clean-up by dropping session */
+ while(1) {
+ lenBuf = sizeof(rcvBuf);
+ lenRcv = recv(pSess->sock, rcvBuf, lenBuf, 0);
+//if(iac++ > 1) fprintf(stderr, "\n%d recv in a row!\n", iac-1);
+
+ if(lenRcv > 0) {
+ /* have data, process it */
+ DBGPRINTF("imtcp: data(%d) on socket %d: %s\n", lenBuf, pSess->sock, rcvBuf);
+ CHKiRet(DataRcvd(pSess, rcvBuf, lenRcv));
+ } else if (lenRcv == 0) {
+ /* session was closed, do clean-up */
+ CHKiRet(closeSess(pSess));
+ break;
+ } else {
+ if(errno == EAGAIN || errno == EWOULDBLOCK)
+ break;
+ DBGPRINTF("imtcp: error on session socket %d - closed.\n", pSess->sock);
+ closeSess(pSess); /* try clean-up by dropping session */
+ break;
+ }
}
finalize_it:
@@ -1127,8 +1152,6 @@ CODEmodInit_QueryRegCFSLineHdlr
addTCPListener, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpservernotifyonconnectionclose"), 0,
eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID));
- //CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverstreamdriverpermittedpeer"), 0,
- //eCmdHdlrGetWord, setPermittedPeer, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt,
NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0,
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 0045f00a..b9b1ede7 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -7,6 +7,8 @@ TESTS = $(TESTRUNS) cfg.sh \
diskqueue.sh \
diskqueue-fsync.sh \
manytcp.sh \
+ manyptcp.sh \
+ imptcp_large.sh \
sndrcv.sh \
sndrcv_gzip.sh \
asynwr_simple.sh \
@@ -136,6 +138,10 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
testsuites/imtcp-multiport.conf \
manytcp.sh \
testsuites/manytcp.conf \
+ manyptcp.sh \
+ testsuites/manyptcp.conf \
+ imptcp_large.sh \
+ testsuites/imptcp_large.conf \
inputname.sh \
testsuites/inputname_imtcp.conf \
testsuites/1.inputname_imtcp_12514 \
diff --git a/tests/diag.sh b/tests/diag.sh
index 51ad5f6a..8659aa17 100755
--- a/tests/diag.sh
+++ b/tests/diag.sh
@@ -10,7 +10,7 @@
#valgrind="valgrind --tool=helgrind --log-fd=1"
#valgrind="valgrind --tool=exp-ptrcheck --log-fd=1"
#set -o xtrace
-#export RSYSLOG_DEBUG="debug nostdout printmutexaction"
+#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction"
#export RSYSLOG_DEBUGLOG="log"
case $1 in
'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
diff --git a/tests/imptcp_large.sh b/tests/imptcp_large.sh
new file mode 100755
index 00000000..f6eee895
--- /dev/null
+++ b/tests/imptcp_large.sh
@@ -0,0 +1,14 @@
+# Test imptcp with large messages
+# added 2010-08-10 by Rgerhards
+#
+# This file is part of the rsyslog project, released under GPLv3
+cat rsyslog.action.1.include
+source $srcdir/diag.sh init
+source $srcdir/diag.sh startup imptcp_large.conf
+# send 4000 messages of 10.000bytes plus header max, randomized
+source $srcdir/diag.sh tcpflood -c5 -m20000 -r -d10000 -P129
+sleep 2 # due to large messages, we need this time for the tcp receiver to settle...
+source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
+source $srcdir/diag.sh wait-shutdown # and wait for it to terminate
+source $srcdir/diag.sh seq-check 0 19999 -E
+source $srcdir/diag.sh exit
diff --git a/tests/testsuites/imptcp_large.conf b/tests/testsuites/imptcp_large.conf
new file mode 100644
index 00000000..677e33f6
--- /dev/null
+++ b/tests/testsuites/imptcp_large.conf
@@ -0,0 +1,16 @@
+# simple async writing test
+# rgerhards, 2010-03-09
+$MaxMessageSize 10k
+$IncludeConfig diag-common.conf
+
+$ModLoad ../plugins/imptcp/.libs/imptcp
+$MainMsgQueueTimeoutShutdown 10000
+$InputPTCPServerRun 13514
+
+$template outfmt,"%msg:F,58:2%,%msg:F,58:3%,%msg:F,58:4%\n"
+$template dynfile,"rsyslog.out.log" # trick to use relative path names!
+$OMFileFlushOnTXEnd off
+$OMFileFlushInterval 2
+$OMFileIOBufferSize 256k
+$IncludeConfig rsyslog.action.1.include
+local0.* ?dynfile;outfmt