summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-25 10:47:22 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-25 10:47:22 +0200
commiteb1615068c6a704287eda732d287280df4cc4c44 (patch)
treebf5d8b9cfcdf4203839faf03535149e6ad66f8fc
parent7adb9877f0c08f929d89f436103dfade03e8ea07 (diff)
downloadrsyslog-eb1615068c6a704287eda732d287280df4cc4c44.tar.gz
rsyslog-eb1615068c6a704287eda732d287280df4cc4c44.tar.xz
rsyslog-eb1615068c6a704287eda732d287280df4cc4c44.zip
added new testing module imdiag
which enables to talk to the rsyslog core at runtime. The current implementation is only a beginning, but can be expanded over time
-rw-r--r--ChangeLog3
-rw-r--r--Makefile.am2
-rw-r--r--dirty.h1
-rw-r--r--plugins/imdiag/imdiag.c247
-rw-r--r--runtime/netstrm.c1
-rw-r--r--runtime/rsyslog.h1
-rw-r--r--tcps_sess.c27
-rw-r--r--tcps_sess.h8
-rw-r--r--tcpsrv.c26
-rw-r--r--tcpsrv.h5
-rw-r--r--tests/Makefile.am1
-rwxr-xr-xtests/diskqueue.sh2
-rwxr-xr-xtests/fieldtest.sh2
-rwxr-xr-xtests/inputname.sh2
-rwxr-xr-xtests/manytcp.sh2
-rwxr-xr-xtests/omod-if-array.sh2
-rwxr-xr-xtests/parsertest.sh2
-rw-r--r--tests/testsuites/diskqueue.conf3
-rw-r--r--tests/testsuites/manytcp.conf3
-rwxr-xr-xtests/waitqueueempty.sh4
-rw-r--r--tools/syslogd.c27
21 files changed, 289 insertions, 82 deletions
diff --git a/ChangeLog b/ChangeLog
index b7a0a67a..a0357889 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -17,6 +17,9 @@ Version 4.3.1 [DEVEL] (rgerhards), 2009-04-??
- bugfix: light and full delay watermarks had invalid values, badly
affecting performance for delayable inputs
- build system improvements - thanks to Michael Biebl
+- added new testing module imdiag, which enables to talk to the
+ rsyslog core at runtime. The current implementation is only a
+ beginning, but can be expanded over time
---------------------------------------------------------------------------
Version 4.3.0 [DEVEL] (rgerhards), 2009-04-17
- new feature: new output plugin omprog, which permits to start program
diff --git a/Makefile.am b/Makefile.am
index a5cf879c..8d57700f 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -131,5 +131,5 @@ SUBDIRS += tests
# temporarily be removed below. The intent behind forcing everthing to compile
# in a make distcheck is so that we detect code that accidently was not updated
# when some global update happened.
-DISTCHECK_CONFIGURE_FLAGS=--enable-gssapi_krb5 --enable-imfile --enable-snmp --enable-pgsql --enable-libdbi --enable-mysql --enable-omtemplate --enable-imtemplate --enable-relp --enable-rsyslogd --enable-mail --enable-klog --enable-diagtools --enable-gnutls --enable-omstdout --enable-omprog
+DISTCHECK_CONFIGURE_FLAGS=--enable-gssapi_krb5 --enable-imfile --enable-snmp --enable-pgsql --enable-libdbi --enable-mysql --enable-omtemplate --enable-imtemplate --enable-relp --enable-rsyslogd --enable-mail --enable-klog --enable-diagtools --enable-gnutls --enable-omstdout --enable-omprog --enable-imdiag
ACLOCAL_AMFLAGS = -I m4
diff --git a/dirty.h b/dirty.h
index db9bc31b..6d585753 100644
--- a/dirty.h
+++ b/dirty.h
@@ -32,6 +32,7 @@ rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags);
rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, uchar *pszInputName, struct syslogTime *stTime, time_t ttGenTime);
int parseRFCSyslogMsg(msg_t *pMsg, int flags);
int parseLegacySyslogMsg(msg_t *pMsg, int flags);
+rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */
/* TODO: the following 2 need to go in conf obj interface... */
rsRetVal cflineParseTemplateName(uchar** pp, omodStringRequest_t *pOMSR, int iEntry, int iTplOpts, uchar *dfltTplName);
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c
index 90c5d9ee..40f94692 100644
--- a/plugins/imdiag/imdiag.c
+++ b/plugins/imdiag/imdiag.c
@@ -26,8 +26,8 @@
*
* A copy of the GPL can be found in the file "COPYING" in this distribution.
*/
-
#include "config.h"
+#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
@@ -43,114 +43,247 @@
#include <fcntl.h>
#endif
#include "rsyslog.h"
-//#include "dirty.h"
+#include "dirty.h"
#include "cfsysline.h"
#include "module-template.h"
+#include "unicode-helper.h"
#include "net.h"
#include "netstrm.h"
-#include "netstrms.h"
#include "errmsg.h"
+#include "tcpsrv.h"
+#include "srUtils.h"
+#include "net.h" /* for permittedPeers, may be removed when this is removed */
MODULE_TYPE_INPUT
/* static data */
DEF_IMOD_STATIC_DATA
+DEFobjCurrIf(tcpsrv)
+DEFobjCurrIf(tcps_sess)
DEFobjCurrIf(net)
DEFobjCurrIf(netstrm)
-DEFobjCurrIf(netstrms)
DEFobjCurrIf(errmsg)
/* Module static data */
-netstrms_t *pNS; /**< pointer to network stream subsystem */
-netstrm_t *arrLstn[10]; /**< our netstream listners */
-int iLstnMax = 0; /**< max nbr of listeners currently supported */
+static tcpsrv_t *pOurTcpsrv = NULL; /* our TCP server(listener) TODO: change for multiple instances */
+static permittedPeers_t *pPermPeersRoot = NULL;
/* config settings */
+static int iTCPSessMax = 20; /* max number of sessions */
+static int iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */
+static uchar *pszStrmDrvrAuthMode = NULL; /* authentication mode to use */
+static uchar *pszInputName = NULL; /* value for inputname property, NULL is OK and handled by core engine */
+
+
+/* callbacks */
+/* this shall go into a specific ACL module! */
+static int
+isPermittedHost(struct sockaddr __attribute__((unused)) *addr, char __attribute__((unused)) *fromHostFQDN,
+ void __attribute__((unused)) *pUsrSrv, void __attribute__((unused)) *pUsrSess)
+{
+ return 1; /* TODO: implement ACLs ... or via some other way? */
+}
-/* add a listen socket to our listen socket array. This is a callback
- * invoked from the netstrm class. -- rgerhards, 2008-04-23
+static rsRetVal
+doOpenLstnSocks(tcpsrv_t *pSrv)
+{
+ ISOBJ_TYPE_assert(pSrv, tcpsrv);
+ return tcpsrv.create_tcp_socket(pSrv);
+}
+
+
+static rsRetVal
+doRcvData(tcps_sess_t *pSess, char *buf, size_t lenBuf, ssize_t *piLenRcvd)
+{
+ DEFiRet;
+ assert(pSess != NULL);
+ assert(piLenRcvd != NULL);
+
+ *piLenRcvd = lenBuf;
+ CHKiRet(netstrm.Rcv(pSess->pStrm, (uchar*) buf, piLenRcvd));
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal
+onRegularClose(tcps_sess_t *pSess)
+{
+ DEFiRet;
+ assert(pSess != NULL);
+
+ /* process any incomplete frames left over */
+ tcps_sess.PrepareClose(pSess);
+ /* Session closed */
+ tcps_sess.Close(pSess);
+ RETiRet;
+}
+
+
+static rsRetVal
+onErrClose(tcps_sess_t *pSess)
+{
+ DEFiRet;
+ assert(pSess != NULL);
+
+ tcps_sess.Close(pSess);
+ RETiRet;
+}
+
+/* ------------------------------ end callbacks ------------------------------ */
+
+
+/* This function waits until the main queue is drained (size = 0)
*/
static rsRetVal
-addTcpLstn(netstrm_t *pLstn)
+waitMainQEmpty(void)
{
+ int iMsgQueueSize;
DEFiRet;
- ISOBJ_TYPE_assert(pLstn, netstrm);
+ CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
+ while(iMsgQueueSize > 0) {
+ srSleep(0,2); /* wait a little bit */
+ CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
+ }
+
+finalize_it:
+ RETiRet;
+}
+
- if((unsigned)iLstnMax >= sizeof(arrLstn)/sizeof(netstrm_t*))
- ABORT_FINALIZE(RS_RET_MAX_LSTN_REACHED);
+/* Function to handle received messages. This is our core function!
+ * rgerhards, 2009-05-24
+ */
+static rsRetVal
+OnMsgReceived(tcps_sess_t *pSess, uchar *pRcv, int iLenMsg)
+{
+ ssize_t len;
+ int iMsgQueueSize;
+ uchar *pszMsg;
+ uchar buf[1024];
+ DEFiRet;
- arrLstn[iLstnMax] = pLstn;
- ++iLstnMax;
+ assert(pSess != NULL);
+ assert(pRcv != NULL);
+
+ /* NOTE: pRcv is NOT a C-String but rather an array of characters
+ * WITHOUT a termination \0 char. So we need to convert it to one
+ * before proceeding.
+ */
+ CHKmalloc(pszMsg = malloc(sizeof(uchar) * (iLenMsg + 1)));
+ memcpy(pszMsg, pRcv, iLenMsg);
+ pszMsg[iLenMsg] = '\0';
+
+ if(!ustrcmp(pszMsg, UCHAR_CONSTANT("GetMainMsgQueueSize"))) {
+ CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize));
+ len = snprintf((char*)buf, sizeof(buf)/sizeof(uchar), "%d\n", iMsgQueueSize);
+ CHKiRet(netstrm.Send(pSess->pStrm, buf, &len));
+ } else if(!ustrcmp(pszMsg, UCHAR_CONSTANT("WaitMainQueueEmpty"))) {
+ CHKiRet(waitMainQEmpty());
+ len = snprintf((char*)buf, sizeof(buf)/sizeof(uchar), "mainqueue empty\n");
+ CHKiRet(netstrm.Send(pSess->pStrm, buf, &len));
+ } else {
+ len = snprintf((char*)buf, sizeof(buf)/sizeof(uchar), "unkown command '%s'\n", pszMsg);
+ CHKiRet(netstrm.Send(pSess->pStrm, buf, &len));
+ }
finalize_it:
RETiRet;
}
-/* initialize network stream subsystem */
+/* set permitted peer -- rgerhards, 2008-05-19
+ */
static rsRetVal
-initNetstrm(void)
+setPermittedPeer(void __attribute__((unused)) *pVal, uchar *pszID)
+{
+ DEFiRet;
+ CHKiRet(net.AddPermittedPeer(&pPermPeersRoot, pszID));
+ free(pszID); /* no longer needed, but we need to free as of interface def */
+finalize_it:
+ RETiRet;
+}
+
+
+static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVal)
{
DEFiRet;
- /* prepare network stream subsystem */
- CHKiRet(netstrms.Construct(&pNS));
- CHKiRet(netstrms.SetDrvrMode(pNS, 0)); /* always plain text */
- //CHKiRet(netstrms.SetDrvrAuthMode(pThis->pNS, pThis->pszDrvrAuthMode));
- //CHKiRet(netstrms.SetDrvrPermPeers(pThis->pNS, pThis->pPermPeers));
- // TODO: set driver!
- CHKiRet(netstrms.ConstructFinalize(&pNS));
+ if(pOurTcpsrv == NULL) {
+ CHKiRet(tcpsrv.Construct(&pOurTcpsrv));
+ CHKiRet(tcpsrv.SetSessMax(pOurTcpsrv, iTCPSessMax));
+ CHKiRet(tcpsrv.SetCBIsPermittedHost(pOurTcpsrv, isPermittedHost));
+ CHKiRet(tcpsrv.SetCBRcvData(pOurTcpsrv, doRcvData));
+ CHKiRet(tcpsrv.SetCBOpenLstnSocks(pOurTcpsrv, doOpenLstnSocks));
+ CHKiRet(tcpsrv.SetCBOnRegularClose(pOurTcpsrv, onRegularClose));
+ CHKiRet(tcpsrv.SetCBOnErrClose(pOurTcpsrv, onErrClose));
+ CHKiRet(tcpsrv.SetDrvrMode(pOurTcpsrv, iStrmDrvrMode));
+ CHKiRet(tcpsrv.SetOnMsgReceive(pOurTcpsrv, OnMsgReceived));
+ /* now set optional params, but only if they were actually configured */
+ if(pszStrmDrvrAuthMode != NULL) {
+ CHKiRet(tcpsrv.SetDrvrAuthMode(pOurTcpsrv, pszStrmDrvrAuthMode));
+ }
+ if(pPermPeersRoot != NULL) {
+ CHKiRet(tcpsrv.SetDrvrPermPeers(pOurTcpsrv, pPermPeersRoot));
+ }
+ }
- /* set up listeners */
- CHKiRet(netstrm.LstnInit(pNS, NULL, addTcpLstn, "127.0.0.1", (uchar*)"44514", 1));
+ /* initialized, now add socket */
+ CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, pszInputName == NULL ?
+ UCHAR_CONSTANT("imdiag") : pszInputName));
+ tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal);
finalize_it:
if(iRet != RS_RET_OK) {
- if(pNS != NULL)
- netstrms.Destruct(&pNS);
+ errmsg.LogError(0, NO_ERRCODE, "error %d trying to add listener", iRet);
+ if(pOurTcpsrv != NULL)
+ tcpsrv.Destruct(&pOurTcpsrv);
}
RETiRet;
}
-
-/* This function is called to gather input. In our case, it is a bit abused
- * to drive the listener loop for the diagnostics code.
+/* This function is called to gather input.
*/
BEGINrunInput
CODESTARTrunInput
+ CHKiRet(tcpsrv.ConstructFinalize(pOurTcpsrv));
+ iRet = tcpsrv.Run(pOurTcpsrv);
+finalize_it:
ENDrunInput
/* initialize and return if will run or not */
BEGINwillRun
CODESTARTwillRun
- iRet = initNetstrm();
+ /* first apply some config settings */
+ if(pOurTcpsrv == NULL)
+ ABORT_FINALIZE(RS_RET_NO_RUN);
+finalize_it:
ENDwillRun
BEGINafterRun
- int i;
CODESTARTafterRun
/* do cleanup here */
- /* finally close our listen streams */
- for(i = 0 ; i < iLstnMax ; ++i) {
- netstrm.Destruct(arrLstn[i]);
- }
-
- /* destruct netstream subsystem */
- netstrms.Destruct(pNS);
ENDafterRun
BEGINmodExit
CODESTARTmodExit
+ if(pOurTcpsrv != NULL)
+ iRet = tcpsrv.Destruct(&pOurTcpsrv);
+
+ if(pPermPeersRoot != NULL) {
+ net.DestructPermittedPeers(&pPermPeersRoot);
+ }
+
/* release objects we used */
objRelease(net, LM_NET_FILENAME);
- objRelease(netstrm, DONT_LOAD_LIB);
- objRelease(netstrms, LM_NETSTRMS_FILENAME);
+ objRelease(netstrm, LM_NETSTRMS_FILENAME);
+ objRelease(tcps_sess, LM_TCPSRV_FILENAME);
+ objRelease(tcpsrv, LM_TCPSRV_FILENAME);
objRelease(errmsg, CORE_COMPONENT);
ENDmodExit
@@ -158,6 +291,14 @@ ENDmodExit
static rsRetVal
resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
+ iTCPSessMax = 200;
+ iStrmDrvrMode = 0;
+ free(pszInputName);
+ pszInputName = NULL;
+ if(pszStrmDrvrAuthMode != NULL) {
+ free(pszStrmDrvrAuthMode);
+ pszStrmDrvrAuthMode = NULL;
+ }
return RS_RET_OK;
}
@@ -173,26 +314,28 @@ BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
+ pOurTcpsrv = NULL;
/* request objects we use */
CHKiRet(objUse(net, LM_NET_FILENAME));
- CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME));
- CHKiRet(objUse(netstrm, DONT_LOAD_LIB));
+ CHKiRet(objUse(netstrm, LM_NETSTRMS_FILENAME));
+ CHKiRet(objUse(tcps_sess, LM_TCPSRV_FILENAME));
+ CHKiRet(objUse(tcpsrv, LM_TCPSRV_FILENAME));
CHKiRet(objUse(errmsg, CORE_COMPONENT));
/* register config file handlers */
-#if 0
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputtcpserverrun", 0, eCmdHdlrGetWord,
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("imdiagserverrun"), 0, eCmdHdlrGetWord,
addTCPListener, NULL, STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputtcpmaxsessions", 0, eCmdHdlrInt,
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("imdiagmaxsessions"), 0, eCmdHdlrInt,
NULL, &iTCPSessMax, STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputtcpserverstreamdrivermode", 0,
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("imdiagserverstreamdrivermode"), 0,
eCmdHdlrInt, NULL, &iStrmDrvrMode, STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputtcpserverstreamdriverauthmode", 0,
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("imdiagserverstreamdriverauthmode"), 0,
eCmdHdlrGetWord, NULL, &pszStrmDrvrAuthMode, STD_LOADABLE_MODULE_ID));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputtcpserverstreamdriverpermittedpeer", 0,
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("imdiagserverstreamdriverpermittedpeer"), 0,
eCmdHdlrGetWord, setPermittedPeer, NULL, STD_LOADABLE_MODULE_ID));
-#endif
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("imdiagserverinputname"), 0,
+ eCmdHdlrGetWord, NULL, &pszInputName, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler,
resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
ENDmodInit
diff --git a/runtime/netstrm.c b/runtime/netstrm.c
index 459561bc..f6a8de7f 100644
--- a/runtime/netstrm.c
+++ b/runtime/netstrm.c
@@ -148,7 +148,6 @@ LstnInit(netstrms_t *pNS, void *pUsr, rsRetVal(*fAddLstn)(void*,netstrm_t*),
ISOBJ_TYPE_assert(pNS, netstrms);
assert(fAddLstn != NULL);
assert(pLstnPort != NULL);
-RUNLOG_STR("XXX: Init Lstn");
CHKiRet(pNS->Drvr.LstnInit(pNS, pUsr, fAddLstn, pLstnPort, pLstnIP, iSessMax));
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 8e8a9f2a..77d845fd 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -84,6 +84,7 @@ typedef rsRetVal (*errLogFunc_t)(uchar*); /* this is a trick to store a function
typedef struct permittedPeers_s permittedPeers_t; /* this should go away in the long term -- rgerhards, 2008-05-19 */
typedef struct permittedPeerWildcard_s permittedPeerWildcard_t; /* this should go away in the long term -- rgerhards, 2008-05-19 */
typedef struct tcpsrv_s tcpsrv_t;
+typedef struct tcps_sess_s tcps_sess_t;
typedef struct vmstk_s vmstk_t;
typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */
diff --git a/tcps_sess.c b/tcps_sess.c
index c564caea..c4548804 100644
--- a/tcps_sess.c
+++ b/tcps_sess.c
@@ -58,6 +58,7 @@ static int iMaxLine; /* maximum size of a single message */
/* forward definitions */
static rsRetVal Close(tcps_sess_t *pThis);
+static rsRetVal defaultDoSubmitMessage(tcps_sess_t *pThis, uchar*, int);
/* Standard-Constructor */
@@ -65,6 +66,7 @@ BEGINobjConstruct(tcps_sess) /* be sure to specify the object type also in END m
pThis->iMsg = 0; /* just make sure... */
pThis->bAtStrtOfFram = 1; /* indicate frame header expected */
pThis->eFraming = TCP_FRAMING_OCTET_STUFFING; /* just make sure... */
+ pThis->DoSubmitMessage = defaultDoSubmitMessage;
/* now allocate the message reception buffer */
CHKmalloc(pThis->pMsg = (uchar*) malloc(sizeof(uchar) * iMaxLine + 1));
finalize_it:
@@ -206,6 +208,15 @@ SetUsrP(tcps_sess_t *pThis, void *pUsr)
}
+static rsRetVal
+SetOnMsgReceive(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int))
+{
+ DEFiRet;
+ pThis->DoSubmitMessage = OnMsgReceive;
+ RETiRet;
+}
+
+
/* This is a helper for submitting the message to the rsyslog core.
* It does some common processing, including resetting the various
* state variables to a "processed" state.
@@ -217,8 +228,11 @@ SetUsrP(tcps_sess_t *pThis, void *pUsr)
* rgerhards, 2009-04-23
*/
static rsRetVal
-doSubmitMessage(tcps_sess_t *pThis)
+defaultDoSubmitMessage(tcps_sess_t *pThis, uchar *pszMsg, int iLenMsg)
{
+// TODO: make calling this overridable so that the diag module can ask to be called
+// and so it can do its work right in this entry point (but we need to check that
+// we have the capability to send a reply at this point).
msg_t *pMsg;
struct syslogTime stTime;
time_t ttGenTime;
@@ -233,7 +247,7 @@ doSubmitMessage(tcps_sess_t *pThis)
CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime));
/* first trim the buffer to what we have actually received */
CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar) * pThis->iMsg));
- memcpy(pMsg->pszRawMsg, pThis->pMsg, pThis->iMsg);
+ memcpy(pMsg->pszRawMsg, pszMsg, iLenMsg);
pMsg->iLenRawMsg = pThis->iMsg;
MsgSetInputName(pMsg, pThis->pLstnInfo->pszInputName);
MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY);
@@ -291,7 +305,7 @@ PrepareClose(tcps_sess_t *pThis)
* this case.
*/
dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n");
- doSubmitMessage(pThis);
+ pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg);
}
finalize_it:
@@ -372,7 +386,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
if(pThis->iMsg >= iMaxLine) {
/* emergency, we now need to flush, no matter if we are at end of message or not... */
dbgprintf("error: message received is larger than max msg size, we split it\n");
- doSubmitMessage(pThis);
+ pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg);
/* we might think if it is better to ignore the rest of the
* message than to treat it as a new one. Maybe this is a good
* candidate for a configuration parameter...
@@ -383,7 +397,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
if(( (c == '\n')
|| ((pThis->pSrv->addtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->addtlFrameDelim))
) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */
- doSubmitMessage(pThis);
+ pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg);
pThis->inputState = eAtStrtFram;
} else {
/* IMPORTANT: here we copy the actual frame content to the message - for BOTH framing modes!
@@ -400,7 +414,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
pThis->iOctetsRemain--;
if(pThis->iOctetsRemain < 1) {
/* we have end of frame! */
- doSubmitMessage(pThis);
+ pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg);
pThis->inputState = eAtStrtFram;
}
}
@@ -474,6 +488,7 @@ CODESTARTobjQueryInterface(tcps_sess)
pIf->SetHostIP = SetHostIP;
pIf->SetStrm = SetStrm;
pIf->SetMsgIdx = SetMsgIdx;
+ pIf->SetOnMsgReceive = SetOnMsgReceive;
finalize_it:
ENDobjQueryInterface(tcps_sess)
diff --git a/tcps_sess.h b/tcps_sess.h
index 2ef28264..5e59aaab 100644
--- a/tcps_sess.h
+++ b/tcps_sess.h
@@ -29,7 +29,7 @@
struct tcpsrv_s;
/* the tcps_sess object */
-typedef struct tcps_sess_s {
+struct tcps_sess_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
tcpsrv_t *pSrv; /* pointer back to my server (e.g. for callbacks) */
tcpLstnPortList_t *pLstnInfo; /* pointer back to listener info */
@@ -46,8 +46,9 @@ typedef struct tcps_sess_s {
uchar *pMsg; /* message (fragment) received */
uchar *fromHost;
uchar *fromHostIP;
- void *pUsr; /* a user-pointer */
-} tcps_sess_t;
+ void *pUsr; /* a user-pointer */
+ rsRetVal (*DoSubmitMessage)(tcps_sess_t*, uchar*, int); /* submit message callback */
+};
/* interfaces */
@@ -67,6 +68,7 @@ BEGINinterface(tcps_sess) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetHostIP)(tcps_sess_t *pThis, uchar*);
rsRetVal (*SetStrm)(tcps_sess_t *pThis, netstrm_t*);
rsRetVal (*SetMsgIdx)(tcps_sess_t *pThis, int);
+ rsRetVal (*SetOnMsgReceive)(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int));
ENDinterface(tcps_sess)
#define tcps_sessCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */
/* interface changes
diff --git a/tcpsrv.c b/tcpsrv.c
index bbd95058..249eeecf 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -380,14 +380,15 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess,
errno = 0;
errmsg.LogError(0, RS_RET_MAX_SESS_REACHED, "too many tcp sessions - dropping incoming request");
ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED);
- } else {
- /* we found a free spot and can construct our session object */
- CHKiRet(tcps_sess.Construct(&pSess));
- CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis));
- CHKiRet(tcps_sess.SetLstnInfo(pSess, pLstnInfo));
}
- /* OK, we have a "good" index... */
+ /* we found a free spot and can construct our session object */
+ CHKiRet(tcps_sess.Construct(&pSess));
+ CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis));
+ CHKiRet(tcps_sess.SetLstnInfo(pSess, pLstnInfo));
+ if(pThis->OnMsgReceive != NULL)
+ CHKiRet(tcps_sess.SetOnMsgReceive(pSess, pThis->OnMsgReceive));
+
/* get the host name */
CHKiRet(netstrm.GetRemoteHName(pNewStrm, &fromHostFQDN));
CHKiRet(netstrm.GetRemoteIP(pNewStrm, &fromHostIP));
@@ -568,6 +569,7 @@ finalize_it: /* this is a very special case - this time only we do not exit the
BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */
pThis->iSessMax = TCPSESS_MAX_DEFAULT; /* TODO: useful default ;) */
pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
+ pThis->OnMsgReceive = NULL;
ENDobjConstruct(tcpsrv)
@@ -713,6 +715,16 @@ SetUsrP(tcpsrv_t *pThis, void *pUsr)
RETiRet;
}
+static rsRetVal
+SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int))
+{
+ DEFiRet;
+ assert(OnMsgReceive != NULL);
+ pThis->OnMsgReceive = OnMsgReceive;
+ RETiRet;
+}
+
+
/* Set additional framing to use (if any) -- rgerhards, 2008-12-10 */
static rsRetVal
@@ -731,7 +743,6 @@ SetInputName(tcpsrv_t *pThis, uchar *name)
{
uchar *pszName;
DEFiRet;
-dbgprintf("XXX: SetInputName: %s\n", name);
ISOBJ_TYPE_assert(pThis, tcpsrv);
if(name == NULL)
pszName = NULL;
@@ -843,6 +854,7 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->SetCBOnDestruct = SetCBOnDestruct;
pIf->SetCBOnRegularClose = SetCBOnRegularClose;
pIf->SetCBOnErrClose = SetCBOnErrClose;
+ pIf->SetOnMsgReceive = SetOnMsgReceive;
finalize_it:
ENDobjQueryInterface(tcpsrv)
diff --git a/tcpsrv.h b/tcpsrv.h
index 0f7dd6c6..2d174ce0 100644
--- a/tcpsrv.h
+++ b/tcpsrv.h
@@ -71,6 +71,7 @@ struct tcpsrv_s {
rsRetVal (*pOnSessAccept)(tcpsrv_t *, tcps_sess_t*);
rsRetVal (*OnSessConstructFinalize)(void*);
rsRetVal (*pOnSessDestruct)(void*);
+ rsRetVal (*OnMsgReceive)(tcps_sess_t *, uchar *pszMsg, int iLenMsg); /* submit message callback */
};
@@ -104,8 +105,10 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetCBOnSessConstructFinalize)(tcpsrv_t*, rsRetVal (*) (void*));
/* added v5 */
rsRetVal (*SetSessMax)(tcpsrv_t *pThis, int iMaxSess); /* 2009-04-09 */
+ /* added v6 */
+ rsRetVal (*SetOnMsgReceive)(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int)); /* 2009-05-24 */
ENDinterface(tcpsrv)
-#define tcpsrvCURR_IF_VERSION 5 /* increment whenever you change the interface structure! */
+#define tcpsrvCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */
/* change for v4:
* - SetAddtlFrameDelim() added -- rgerhards, 2008-12-10
* - SetInputName() added -- rgerhards, 2008-12-10
diff --git a/tests/Makefile.am b/tests/Makefile.am
index c0e629d3..ed48fce8 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -46,6 +46,7 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
testsuites/1.inputname_imtcp_12515 \
testsuites/1.inputname_imtcp_12516 \
omod-if-array.sh \
+ waitqueueempty.sh \
cfg.sh
ourtail_SOURCES = ourtail.c
diff --git a/tests/diskqueue.sh b/tests/diskqueue.sh
index 6384eb64..a91f3414 100755
--- a/tests/diskqueue.sh
+++ b/tests/diskqueue.sh
@@ -17,7 +17,7 @@ if [ "$?" -ne "0" ]; then
echo "error during tcpflood! see rsyslog.out.log.save for what was written"
cp rsyslog.out.log rsyslog.out.log.save
fi
-sleep 4 # we need this so that rsyslogd can receive all outstanding messages
+$srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
diff --git a/tests/fieldtest.sh b/tests/fieldtest.sh
index 7a646f00..482fa143 100755
--- a/tests/fieldtest.sh
+++ b/tests/fieldtest.sh
@@ -1,5 +1,5 @@
echo test fieldtest via udp
-./killrsyslog.sh # kill rsyslogd if it runs for some reason
+$srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
./nettester -tfield1 -iudp
if [ "$?" -ne "0" ]; then
diff --git a/tests/inputname.sh b/tests/inputname.sh
index 7d9ea111..e1a58517 100755
--- a/tests/inputname.sh
+++ b/tests/inputname.sh
@@ -1,5 +1,5 @@
echo testing $InputTCPServerInputName directive
-./killrsyslog.sh # kill rsyslogd if it runs for some reason
+$srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
echo port 12514
./nettester -tinputname_imtcp_12514 -cinputname_imtcp -itcp -p12514
diff --git a/tests/manytcp.sh b/tests/manytcp.sh
index d9b2e9a0..06bd38b6 100755
--- a/tests/manytcp.sh
+++ b/tests/manytcp.sh
@@ -8,7 +8,7 @@ if [ "$?" -ne "0" ]; then
echo "error during tcpflood! see rsyslog.out.log.save for what was written"
cp rsyslog.out.log rsyslog.out.log.save
fi
-sleep 5 # we need this so that rsyslogd can receive all outstanding messages
+$srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
diff --git a/tests/omod-if-array.sh b/tests/omod-if-array.sh
index 7b4b5611..2c2a8ef3 100755
--- a/tests/omod-if-array.sh
+++ b/tests/omod-if-array.sh
@@ -1,5 +1,5 @@
echo test omod-if-array via udp
-./killrsyslog.sh # kill rsyslogd if it runs for some reason
+$srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
./nettester -tomod-if-array -iudp -p4711
if [ "$?" -ne "0" ]; then
diff --git a/tests/parsertest.sh b/tests/parsertest.sh
index 152d8b60..afdb9469 100755
--- a/tests/parsertest.sh
+++ b/tests/parsertest.sh
@@ -1,5 +1,5 @@
echo test parsertest via udp
-./killrsyslog.sh # kill rsyslogd if it runs for some reason
+$srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
./nettester -tparse1 -iudp
if [ "$?" -ne "0" ]; then
diff --git a/tests/testsuites/diskqueue.conf b/tests/testsuites/diskqueue.conf
index 8851a459..017ee96d 100644
--- a/tests/testsuites/diskqueue.conf
+++ b/tests/testsuites/diskqueue.conf
@@ -4,6 +4,9 @@ $ModLoad ../plugins/imtcp/.libs/imtcp
$MainMsgQueueTimeoutShutdown 10000
$InputTCPServerRun 13514
+$ModLoad ../plugins/imdiag/.libs/imdiag
+$IMDiagServerRun 13500
+
$ErrorMessagesToStderr off
# set spool locations and switch queue to disk-only mode
diff --git a/tests/testsuites/manytcp.conf b/tests/testsuites/manytcp.conf
index 8175732e..3867da46 100644
--- a/tests/testsuites/manytcp.conf
+++ b/tests/testsuites/manytcp.conf
@@ -6,6 +6,9 @@ $MaxOpenFiles 2000
$InputTCPMaxSessions 1100
$InputTCPServerRun 13514
+$ModLoad ../plugins/imdiag/.libs/imdiag
+$IMDiagServerRun 13500
+
$ErrorMessagesToStderr off
$template outfmt,"%msg:F,58:2%\n"
diff --git a/tests/waitqueueempty.sh b/tests/waitqueueempty.sh
new file mode 100755
index 00000000..2c047588
--- /dev/null
+++ b/tests/waitqueueempty.sh
@@ -0,0 +1,4 @@
+# wait until main message queue is empty. This is currently done in
+# a separate shell script so that we can change the implementation
+# at some later point. -- rgerhards, 2009-05-25
+echo WaitMainQueueEmpty | nc 127.0.0.1 13500
diff --git a/tools/syslogd.c b/tools/syslogd.c
index dc5b8fee..0b860448 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -350,10 +350,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
bDebugPrintModuleList = 1;
bEscapeCCOnRcv = 1; /* default is to escape control characters */
bReduceRepeatMsgs = 0;
- if(pszMainMsgQFName != NULL) {
- free(pszMainMsgQFName);
- pszMainMsgQFName = NULL;
- }
+ free(pszMainMsgQFName);
+ pszMainMsgQFName = NULL;
iMainMsgQueueSize = 10000;
iMainMsgQHighWtrMark = 8000;
iMainMsgQLowWtrMark = 2000;
@@ -412,6 +410,26 @@ static int usage(void)
}
+/* ------------------------------ some support functions for imdiag ------------------------------ *
+ * This is a bit dirty, but the only way to do it, at least with reasonable effort.
+ * rgerhards, 2009-05-25
+ */
+
+/* return back the approximate current number of messages in the main message queue
+ */
+rsRetVal
+diagGetMainMsgQSize(int *piSize)
+{
+ DEFiRet;
+ assert(piSize != NULL);
+ *piSize = pMsgQueue->iQueueSize;
+ RETiRet;
+}
+
+
+/* ------------------------------ end support functions for imdiag ------------------------------ */
+
+
/* function to destruct a selector_t object
* rgerhards, 2007-08-01
*/
@@ -2658,7 +2676,6 @@ init(void)
ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
/* switch the message object to threaded operation, if necessary */
-/* TODO:XXX: I think we must do this also if we have action queues! -- rgerhards, 2009-01-26 */
if(MainMsgQueType == QUEUETYPE_DIRECT || iMainMsgQueueNumWorkers > 1) {
MsgEnableThreadSafety();
}