diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-25 13:02:06 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-25 13:02:06 +0200 |
commit | 7a7ec37f99f3dd5120952e6ca6263dd72061abb1 (patch) | |
tree | f2054bf445ed844606172346210f244a67db2cc7 | |
parent | aaef9aa018dc030a7b5b2585bad19812ff214fab (diff) | |
download | rsyslog-7a7ec37f99f3dd5120952e6ca6263dd72061abb1.tar.gz rsyslog-7a7ec37f99f3dd5120952e6ca6263dd72061abb1.tar.xz rsyslog-7a7ec37f99f3dd5120952e6ca6263dd72061abb1.zip |
improved testbench / solved imdiag race condition
imdiag/imtcp had a modload race condition (as imdiag is a testing aid,
this has no implications for production deployments). Also, I replaced
netcat by a custom program to talk to imdiag. This, for the first time ever,
is now a Java program. I plan to add some GUI troubleshooting tools and
thought it is a good idea to start doing things in Java that can simply
be done in that language.
-rw-r--r-- | ChangeLog | 3 | ||||
-rw-r--r-- | Makefile.am | 2 | ||||
-rw-r--r-- | runtime/modules.c | 21 | ||||
-rw-r--r-- | tcps_sess.c | 23 | ||||
-rw-r--r-- | tests/DiagTalker.java | 43 | ||||
-rw-r--r-- | tests/Makefile.am | 6 | ||||
-rwxr-xr-x | tests/diskqueue.sh | 4 | ||||
-rwxr-xr-x | tests/waitqueueempty.sh | 3 |
8 files changed, 91 insertions, 14 deletions
@@ -1,4 +1,7 @@ --------------------------------------------------------------------------- +Version 4.3.? [DEVEL] (rgerhards), 2009-??-?? +- bugfix: imdiag/imtcp had a race condition +--------------------------------------------------------------------------- Version 4.3.1 [DEVEL] (rgerhards), 2009-05-25 - added capability to run multiple tcp listeners (on different ports) - performance enhancement: imtcp calls parser no longer on input thread diff --git a/Makefile.am b/Makefile.am index 8d57700f..e991f323 100644 --- a/Makefile.am +++ b/Makefile.am @@ -131,5 +131,5 @@ SUBDIRS += tests # temporarily be removed below. The intent behind forcing everthing to compile # in a make distcheck is so that we detect code that accidently was not updated # when some global update happened. -DISTCHECK_CONFIGURE_FLAGS=--enable-gssapi_krb5 --enable-imfile --enable-snmp --enable-pgsql --enable-libdbi --enable-mysql --enable-omtemplate --enable-imtemplate --enable-relp --enable-rsyslogd --enable-mail --enable-klog --enable-diagtools --enable-gnutls --enable-omstdout --enable-omprog --enable-imdiag +DISTCHECK_CONFIGURE_FLAGS=--enable-gssapi_krb5 --enable-imfile --enable-snmp --enable-pgsql --enable-libdbi --enable-mysql --enable-omtemplate --enable-imtemplate --enable-relp --enable-rsyslogd --enable-mail --enable-klog --enable-diagtools --enable-gnutls --enable-omstdout --enable-omprog --enable-imdiag --enable-shave ACLOCAL_AMFLAGS = -I m4 diff --git a/runtime/modules.c b/runtime/modules.c index 9fdb48e7..3e8662a3 100644 --- a/runtime/modules.c +++ b/runtime/modules.c @@ -40,6 +40,7 @@ #include <time.h> #include <assert.h> #include <errno.h> +#include <pthread.h> #ifdef OS_BSD # include "libgen.h" #endif @@ -61,6 +62,14 @@ DEFobjStaticHelpers DEFobjCurrIf(errmsg) +/* we must ensure that only one thread at one time tries to load or unload + * modules, otherwise we may see race conditions. This first came up with + * imdiag/imtcp, which both use the same stream drivers. Below is the mutex + * for that handling. + * rgerhards, 2009-05-25 + */ +static pthread_mutex_t mutLoadUnload; + static modInfo_t *pLoadedModules = NULL; /* list of currently-loaded modules */ static modInfo_t *pLoadedModulesLast = NULL; /* tail-pointer */ @@ -479,6 +488,8 @@ modUnlinkAndDestroy(modInfo_t **ppThis) pThis = *ppThis; assert(pThis != NULL); + pthread_mutex_lock(&mutLoadUnload); + /* first check if we are permitted to unload */ if(pThis->eType == eMOD_LIB) { if(pThis->uRefCnt > 0) { @@ -513,6 +524,7 @@ modUnlinkAndDestroy(modInfo_t **ppThis) moduleDestruct(pThis); finalize_it: + pthread_mutex_unlock(&mutLoadUnload); RETiRet; } @@ -587,6 +599,8 @@ Load(uchar *pModName) assert(pModName != NULL); dbgprintf("Requested to load module '%s'\n", pModName); + pthread_mutex_lock(&mutLoadUnload); + iModNameLen = strlen((char *) pModName); if(iModNameLen > 3 && !strcmp((char *) pModName + iModNameLen - 3, ".so")) { iModNameLen -= 3; @@ -696,6 +710,7 @@ Load(uchar *pModName) } finalize_it: + pthread_mutex_unlock(&mutLoadUnload); RETiRet; } @@ -791,6 +806,7 @@ BEGINObjClassExit(module, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MA CODESTARTObjClassExit(module) /* release objects we no longer need */ objRelease(errmsg, CORE_COMPONENT); + pthread_mutex_destroy(&mutLoadUnload); # ifdef DEBUG modUsrPrintAll(); /* debug aid - TODO: integrate with debug.c, at least the settings! */ @@ -833,6 +849,7 @@ ENDobjQueryInterface(module) */ BEGINAbstractObjClassInit(module, 1, OBJ_IS_CORE_MODULE) /* class, version - CHANGE class also in END MACRO! */ uchar *pModPath; + pthread_mutexattr_t mutAttr; /* use any module load path specified in the environment */ if((pModPath = (uchar*) getenv("RSYSLOG_MODDIR")) != NULL) { @@ -850,6 +867,10 @@ BEGINAbstractObjClassInit(module, 1, OBJ_IS_CORE_MODULE) /* class, version - CHA SetModDir(glblModPath); } + pthread_mutexattr_init(&mutAttr); + pthread_mutexattr_settype(&mutAttr, PTHREAD_MUTEX_RECURSIVE); + pthread_mutex_init(&mutLoadUnload, &mutAttr); + /* request objects we use */ CHKiRet(objUse(errmsg, CORE_COMPONENT)); ENDObjClassInit(module) diff --git a/tcps_sess.c b/tcps_sess.c index c4548804..62d51f66 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -58,7 +58,6 @@ static int iMaxLine; /* maximum size of a single message */ /* forward definitions */ static rsRetVal Close(tcps_sess_t *pThis); -static rsRetVal defaultDoSubmitMessage(tcps_sess_t *pThis, uchar*, int); /* Standard-Constructor */ @@ -66,7 +65,6 @@ BEGINobjConstruct(tcps_sess) /* be sure to specify the object type also in END m pThis->iMsg = 0; /* just make sure... */ pThis->bAtStrtOfFram = 1; /* indicate frame header expected */ pThis->eFraming = TCP_FRAMING_OCTET_STUFFING; /* just make sure... */ - pThis->DoSubmitMessage = defaultDoSubmitMessage; /* now allocate the message reception buffer */ CHKmalloc(pThis->pMsg = (uchar*) malloc(sizeof(uchar) * iMaxLine + 1)); finalize_it: @@ -228,11 +226,8 @@ SetOnMsgReceive(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar * rgerhards, 2009-04-23 */ static rsRetVal -defaultDoSubmitMessage(tcps_sess_t *pThis, uchar *pszMsg, int iLenMsg) +defaultDoSubmitMessage(tcps_sess_t *pThis) { -// TODO: make calling this overridable so that the diag module can ask to be called -// and so it can do its work right in this entry point (but we need to check that -// we have the capability to send a reply at this point). msg_t *pMsg; struct syslogTime stTime; time_t ttGenTime; @@ -240,6 +235,11 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, uchar *pszMsg, int iLenMsg) ISOBJ_TYPE_assert(pThis, tcps_sess); + if(pThis->DoSubmitMessage != NULL) { + pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg); + FINALIZE; + } + //TODO: if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { datetime.getCurrTime(&stTime, &ttGenTime); //} @@ -247,7 +247,7 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, uchar *pszMsg, int iLenMsg) CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); /* first trim the buffer to what we have actually received */ CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar) * pThis->iMsg)); - memcpy(pMsg->pszRawMsg, pszMsg, iLenMsg); + memcpy(pMsg->pszRawMsg, pThis->pMsg, pThis->iMsg); pMsg->iLenRawMsg = pThis->iMsg; MsgSetInputName(pMsg, pThis->pLstnInfo->pszInputName); MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); @@ -266,6 +266,7 @@ finalize_it: } + /* This should be called before a normal (non forced) close * of a TCP session. This function checks if there is any unprocessed * message left in the TCP stream. Such a message is probably a @@ -305,7 +306,7 @@ PrepareClose(tcps_sess_t *pThis) * this case. */ dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n"); - pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg); + defaultDoSubmitMessage(pThis); } finalize_it: @@ -386,7 +387,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) if(pThis->iMsg >= iMaxLine) { /* emergency, we now need to flush, no matter if we are at end of message or not... */ dbgprintf("error: message received is larger than max msg size, we split it\n"); - pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg); + defaultDoSubmitMessage(pThis); /* we might think if it is better to ignore the rest of the * message than to treat it as a new one. Maybe this is a good * candidate for a configuration parameter... @@ -397,7 +398,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) if(( (c == '\n') || ((pThis->pSrv->addtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->addtlFrameDelim)) ) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */ - pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg); + defaultDoSubmitMessage(pThis); pThis->inputState = eAtStrtFram; } else { /* IMPORTANT: here we copy the actual frame content to the message - for BOTH framing modes! @@ -414,7 +415,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) pThis->iOctetsRemain--; if(pThis->iOctetsRemain < 1) { /* we have end of frame! */ - pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg); + defaultDoSubmitMessage(pThis); pThis->inputState = eAtStrtFram; } } diff --git a/tests/DiagTalker.java b/tests/DiagTalker.java new file mode 100644 index 00000000..e33a5867 --- /dev/null +++ b/tests/DiagTalker.java @@ -0,0 +1,43 @@ +//package com.rsyslog.diag; +import java.io.*; +import java.net.*; + +public class DiagTalker { + public static void main(String[] args) throws IOException { + + Socket diagSocket = null; + PrintWriter out = null; + BufferedReader in = null; + final String host = "127.0.0.1"; + final int port = 13500; + + try { + diagSocket = new Socket(host, port); + out = new PrintWriter(diagSocket.getOutputStream(), true); + in = new BufferedReader(new InputStreamReader( + diagSocket.getInputStream())); + } catch (UnknownHostException e) { + System.err.println("can not resolve " + host + "!"); + System.exit(1); + } catch (IOException e) { + System.err.println("Couldn't get I/O for " + + "the connection to: " + host + "."); + System.exit(1); + } + + BufferedReader stdIn = new BufferedReader( + new InputStreamReader(System.in)); + String userInput; + + while ((userInput = stdIn.readLine()) != null) { + out.println(userInput); + System.out.println("imdiag returns: " + in.readLine()); + } + + out.close(); + in.close(); + stdIn.close(); + diagSocket.close(); + } +} + diff --git a/tests/Makefile.am b/tests/Makefile.am index ed48fce8..caa95c51 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -5,9 +5,12 @@ if ENABLE_OMSTDOUT TESTS += omod-if-array.sh parsertest.sh inputname.sh fieldtest.sh endif TESTS_ENVIRONMENT = RSYSLOG_MODDIR='$(abs_top_builddir)'/runtime/.libs/ -DISTCLEANFILES=rsyslog.pid +DISTCLEANFILES=rsyslog.pid '$(abs_top_builddir)'/DiagTalker.class test_files = testbench.h runtime-dummy.c +check_JAVA = DiagTalker.java +#dist_java_JAVA = DiagTalker.java + EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \ cfg1.cfgtest \ cfg1.testin \ @@ -47,6 +50,7 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \ testsuites/1.inputname_imtcp_12516 \ omod-if-array.sh \ waitqueueempty.sh \ + DiagTalker.java \ cfg.sh ourtail_SOURCES = ourtail.c diff --git a/tests/diskqueue.sh b/tests/diskqueue.sh index a91f3414..eabfcf78 100755 --- a/tests/diskqueue.sh +++ b/tests/diskqueue.sh @@ -4,6 +4,10 @@ # memory to disk mode for DA queues. # added 2009-04-17 by Rgerhards # This file is part of the rsyslog project, released under GPLv3 +# uncomment for debugging support: +#set -o xtrace +#export RSYSLOG_DEBUG="debug nostdout" +#export RSYSLOG_DEBUGLOG="tmp" echo testing queue disk-only mode rm -rf test-spool mkdir test-spool diff --git a/tests/waitqueueempty.sh b/tests/waitqueueempty.sh index 2c047588..4825853a 100755 --- a/tests/waitqueueempty.sh +++ b/tests/waitqueueempty.sh @@ -1,4 +1,5 @@ # wait until main message queue is empty. This is currently done in # a separate shell script so that we can change the implementation # at some later point. -- rgerhards, 2009-05-25 -echo WaitMainQueueEmpty | nc 127.0.0.1 13500 +#echo WaitMainQueueEmpty | nc 127.0.0.1 13500 +echo WaitMainQueueEmpty | java -classpath $abs_top_builddir DiagTalker |