summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-25 13:02:06 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-25 13:02:06 +0200
commit7a7ec37f99f3dd5120952e6ca6263dd72061abb1 (patch)
treef2054bf445ed844606172346210f244a67db2cc7
parentaaef9aa018dc030a7b5b2585bad19812ff214fab (diff)
downloadrsyslog-7a7ec37f99f3dd5120952e6ca6263dd72061abb1.tar.gz
rsyslog-7a7ec37f99f3dd5120952e6ca6263dd72061abb1.tar.xz
rsyslog-7a7ec37f99f3dd5120952e6ca6263dd72061abb1.zip
improved testbench / solved imdiag race condition
imdiag/imtcp had a modload race condition (as imdiag is a testing aid, this has no implications for production deployments). Also, I replaced netcat by a custom program to talk to imdiag. This, for the first time ever, is now a Java program. I plan to add some GUI troubleshooting tools and thought it is a good idea to start doing things in Java that can simply be done in that language.
-rw-r--r--ChangeLog3
-rw-r--r--Makefile.am2
-rw-r--r--runtime/modules.c21
-rw-r--r--tcps_sess.c23
-rw-r--r--tests/DiagTalker.java43
-rw-r--r--tests/Makefile.am6
-rwxr-xr-xtests/diskqueue.sh4
-rwxr-xr-xtests/waitqueueempty.sh3
8 files changed, 91 insertions, 14 deletions
diff --git a/ChangeLog b/ChangeLog
index 9d57f5e1..2fc760c3 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,7 @@
---------------------------------------------------------------------------
+Version 4.3.? [DEVEL] (rgerhards), 2009-??-??
+- bugfix: imdiag/imtcp had a race condition
+---------------------------------------------------------------------------
Version 4.3.1 [DEVEL] (rgerhards), 2009-05-25
- added capability to run multiple tcp listeners (on different ports)
- performance enhancement: imtcp calls parser no longer on input thread
diff --git a/Makefile.am b/Makefile.am
index 8d57700f..e991f323 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -131,5 +131,5 @@ SUBDIRS += tests
# temporarily be removed below. The intent behind forcing everthing to compile
# in a make distcheck is so that we detect code that accidently was not updated
# when some global update happened.
-DISTCHECK_CONFIGURE_FLAGS=--enable-gssapi_krb5 --enable-imfile --enable-snmp --enable-pgsql --enable-libdbi --enable-mysql --enable-omtemplate --enable-imtemplate --enable-relp --enable-rsyslogd --enable-mail --enable-klog --enable-diagtools --enable-gnutls --enable-omstdout --enable-omprog --enable-imdiag
+DISTCHECK_CONFIGURE_FLAGS=--enable-gssapi_krb5 --enable-imfile --enable-snmp --enable-pgsql --enable-libdbi --enable-mysql --enable-omtemplate --enable-imtemplate --enable-relp --enable-rsyslogd --enable-mail --enable-klog --enable-diagtools --enable-gnutls --enable-omstdout --enable-omprog --enable-imdiag --enable-shave
ACLOCAL_AMFLAGS = -I m4
diff --git a/runtime/modules.c b/runtime/modules.c
index 9fdb48e7..3e8662a3 100644
--- a/runtime/modules.c
+++ b/runtime/modules.c
@@ -40,6 +40,7 @@
#include <time.h>
#include <assert.h>
#include <errno.h>
+#include <pthread.h>
#ifdef OS_BSD
# include "libgen.h"
#endif
@@ -61,6 +62,14 @@
DEFobjStaticHelpers
DEFobjCurrIf(errmsg)
+/* we must ensure that only one thread at one time tries to load or unload
+ * modules, otherwise we may see race conditions. This first came up with
+ * imdiag/imtcp, which both use the same stream drivers. Below is the mutex
+ * for that handling.
+ * rgerhards, 2009-05-25
+ */
+static pthread_mutex_t mutLoadUnload;
+
static modInfo_t *pLoadedModules = NULL; /* list of currently-loaded modules */
static modInfo_t *pLoadedModulesLast = NULL; /* tail-pointer */
@@ -479,6 +488,8 @@ modUnlinkAndDestroy(modInfo_t **ppThis)
pThis = *ppThis;
assert(pThis != NULL);
+ pthread_mutex_lock(&mutLoadUnload);
+
/* first check if we are permitted to unload */
if(pThis->eType == eMOD_LIB) {
if(pThis->uRefCnt > 0) {
@@ -513,6 +524,7 @@ modUnlinkAndDestroy(modInfo_t **ppThis)
moduleDestruct(pThis);
finalize_it:
+ pthread_mutex_unlock(&mutLoadUnload);
RETiRet;
}
@@ -587,6 +599,8 @@ Load(uchar *pModName)
assert(pModName != NULL);
dbgprintf("Requested to load module '%s'\n", pModName);
+ pthread_mutex_lock(&mutLoadUnload);
+
iModNameLen = strlen((char *) pModName);
if(iModNameLen > 3 && !strcmp((char *) pModName + iModNameLen - 3, ".so")) {
iModNameLen -= 3;
@@ -696,6 +710,7 @@ Load(uchar *pModName)
}
finalize_it:
+ pthread_mutex_unlock(&mutLoadUnload);
RETiRet;
}
@@ -791,6 +806,7 @@ BEGINObjClassExit(module, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MA
CODESTARTObjClassExit(module)
/* release objects we no longer need */
objRelease(errmsg, CORE_COMPONENT);
+ pthread_mutex_destroy(&mutLoadUnload);
# ifdef DEBUG
modUsrPrintAll(); /* debug aid - TODO: integrate with debug.c, at least the settings! */
@@ -833,6 +849,7 @@ ENDobjQueryInterface(module)
*/
BEGINAbstractObjClassInit(module, 1, OBJ_IS_CORE_MODULE) /* class, version - CHANGE class also in END MACRO! */
uchar *pModPath;
+ pthread_mutexattr_t mutAttr;
/* use any module load path specified in the environment */
if((pModPath = (uchar*) getenv("RSYSLOG_MODDIR")) != NULL) {
@@ -850,6 +867,10 @@ BEGINAbstractObjClassInit(module, 1, OBJ_IS_CORE_MODULE) /* class, version - CHA
SetModDir(glblModPath);
}
+ pthread_mutexattr_init(&mutAttr);
+ pthread_mutexattr_settype(&mutAttr, PTHREAD_MUTEX_RECURSIVE);
+ pthread_mutex_init(&mutLoadUnload, &mutAttr);
+
/* request objects we use */
CHKiRet(objUse(errmsg, CORE_COMPONENT));
ENDObjClassInit(module)
diff --git a/tcps_sess.c b/tcps_sess.c
index c4548804..62d51f66 100644
--- a/tcps_sess.c
+++ b/tcps_sess.c
@@ -58,7 +58,6 @@ static int iMaxLine; /* maximum size of a single message */
/* forward definitions */
static rsRetVal Close(tcps_sess_t *pThis);
-static rsRetVal defaultDoSubmitMessage(tcps_sess_t *pThis, uchar*, int);
/* Standard-Constructor */
@@ -66,7 +65,6 @@ BEGINobjConstruct(tcps_sess) /* be sure to specify the object type also in END m
pThis->iMsg = 0; /* just make sure... */
pThis->bAtStrtOfFram = 1; /* indicate frame header expected */
pThis->eFraming = TCP_FRAMING_OCTET_STUFFING; /* just make sure... */
- pThis->DoSubmitMessage = defaultDoSubmitMessage;
/* now allocate the message reception buffer */
CHKmalloc(pThis->pMsg = (uchar*) malloc(sizeof(uchar) * iMaxLine + 1));
finalize_it:
@@ -228,11 +226,8 @@ SetOnMsgReceive(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar
* rgerhards, 2009-04-23
*/
static rsRetVal
-defaultDoSubmitMessage(tcps_sess_t *pThis, uchar *pszMsg, int iLenMsg)
+defaultDoSubmitMessage(tcps_sess_t *pThis)
{
-// TODO: make calling this overridable so that the diag module can ask to be called
-// and so it can do its work right in this entry point (but we need to check that
-// we have the capability to send a reply at this point).
msg_t *pMsg;
struct syslogTime stTime;
time_t ttGenTime;
@@ -240,6 +235,11 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, uchar *pszMsg, int iLenMsg)
ISOBJ_TYPE_assert(pThis, tcps_sess);
+ if(pThis->DoSubmitMessage != NULL) {
+ pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg);
+ FINALIZE;
+ }
+
//TODO: if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) {
datetime.getCurrTime(&stTime, &ttGenTime);
//}
@@ -247,7 +247,7 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, uchar *pszMsg, int iLenMsg)
CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime));
/* first trim the buffer to what we have actually received */
CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar) * pThis->iMsg));
- memcpy(pMsg->pszRawMsg, pszMsg, iLenMsg);
+ memcpy(pMsg->pszRawMsg, pThis->pMsg, pThis->iMsg);
pMsg->iLenRawMsg = pThis->iMsg;
MsgSetInputName(pMsg, pThis->pLstnInfo->pszInputName);
MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY);
@@ -266,6 +266,7 @@ finalize_it:
}
+
/* This should be called before a normal (non forced) close
* of a TCP session. This function checks if there is any unprocessed
* message left in the TCP stream. Such a message is probably a
@@ -305,7 +306,7 @@ PrepareClose(tcps_sess_t *pThis)
* this case.
*/
dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n");
- pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg);
+ defaultDoSubmitMessage(pThis);
}
finalize_it:
@@ -386,7 +387,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
if(pThis->iMsg >= iMaxLine) {
/* emergency, we now need to flush, no matter if we are at end of message or not... */
dbgprintf("error: message received is larger than max msg size, we split it\n");
- pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg);
+ defaultDoSubmitMessage(pThis);
/* we might think if it is better to ignore the rest of the
* message than to treat it as a new one. Maybe this is a good
* candidate for a configuration parameter...
@@ -397,7 +398,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
if(( (c == '\n')
|| ((pThis->pSrv->addtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->addtlFrameDelim))
) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */
- pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg);
+ defaultDoSubmitMessage(pThis);
pThis->inputState = eAtStrtFram;
} else {
/* IMPORTANT: here we copy the actual frame content to the message - for BOTH framing modes!
@@ -414,7 +415,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
pThis->iOctetsRemain--;
if(pThis->iOctetsRemain < 1) {
/* we have end of frame! */
- pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg);
+ defaultDoSubmitMessage(pThis);
pThis->inputState = eAtStrtFram;
}
}
diff --git a/tests/DiagTalker.java b/tests/DiagTalker.java
new file mode 100644
index 00000000..e33a5867
--- /dev/null
+++ b/tests/DiagTalker.java
@@ -0,0 +1,43 @@
+//package com.rsyslog.diag;
+import java.io.*;
+import java.net.*;
+
+public class DiagTalker {
+ public static void main(String[] args) throws IOException {
+
+ Socket diagSocket = null;
+ PrintWriter out = null;
+ BufferedReader in = null;
+ final String host = "127.0.0.1";
+ final int port = 13500;
+
+ try {
+ diagSocket = new Socket(host, port);
+ out = new PrintWriter(diagSocket.getOutputStream(), true);
+ in = new BufferedReader(new InputStreamReader(
+ diagSocket.getInputStream()));
+ } catch (UnknownHostException e) {
+ System.err.println("can not resolve " + host + "!");
+ System.exit(1);
+ } catch (IOException e) {
+ System.err.println("Couldn't get I/O for "
+ + "the connection to: " + host + ".");
+ System.exit(1);
+ }
+
+ BufferedReader stdIn = new BufferedReader(
+ new InputStreamReader(System.in));
+ String userInput;
+
+ while ((userInput = stdIn.readLine()) != null) {
+ out.println(userInput);
+ System.out.println("imdiag returns: " + in.readLine());
+ }
+
+ out.close();
+ in.close();
+ stdIn.close();
+ diagSocket.close();
+ }
+}
+
diff --git a/tests/Makefile.am b/tests/Makefile.am
index ed48fce8..caa95c51 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -5,9 +5,12 @@ if ENABLE_OMSTDOUT
TESTS += omod-if-array.sh parsertest.sh inputname.sh fieldtest.sh
endif
TESTS_ENVIRONMENT = RSYSLOG_MODDIR='$(abs_top_builddir)'/runtime/.libs/
-DISTCLEANFILES=rsyslog.pid
+DISTCLEANFILES=rsyslog.pid '$(abs_top_builddir)'/DiagTalker.class
test_files = testbench.h runtime-dummy.c
+check_JAVA = DiagTalker.java
+#dist_java_JAVA = DiagTalker.java
+
EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
cfg1.cfgtest \
cfg1.testin \
@@ -47,6 +50,7 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
testsuites/1.inputname_imtcp_12516 \
omod-if-array.sh \
waitqueueempty.sh \
+ DiagTalker.java \
cfg.sh
ourtail_SOURCES = ourtail.c
diff --git a/tests/diskqueue.sh b/tests/diskqueue.sh
index a91f3414..eabfcf78 100755
--- a/tests/diskqueue.sh
+++ b/tests/diskqueue.sh
@@ -4,6 +4,10 @@
# memory to disk mode for DA queues.
# added 2009-04-17 by Rgerhards
# This file is part of the rsyslog project, released under GPLv3
+# uncomment for debugging support:
+#set -o xtrace
+#export RSYSLOG_DEBUG="debug nostdout"
+#export RSYSLOG_DEBUGLOG="tmp"
echo testing queue disk-only mode
rm -rf test-spool
mkdir test-spool
diff --git a/tests/waitqueueempty.sh b/tests/waitqueueempty.sh
index 2c047588..4825853a 100755
--- a/tests/waitqueueempty.sh
+++ b/tests/waitqueueempty.sh
@@ -1,4 +1,5 @@
# wait until main message queue is empty. This is currently done in
# a separate shell script so that we can change the implementation
# at some later point. -- rgerhards, 2009-05-25
-echo WaitMainQueueEmpty | nc 127.0.0.1 13500
+#echo WaitMainQueueEmpty | nc 127.0.0.1 13500
+echo WaitMainQueueEmpty | java -classpath $abs_top_builddir DiagTalker