diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-25 10:47:22 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-25 10:47:22 +0200 |
commit | eb1615068c6a704287eda732d287280df4cc4c44 (patch) | |
tree | bf5d8b9cfcdf4203839faf03535149e6ad66f8fc | |
parent | 7adb9877f0c08f929d89f436103dfade03e8ea07 (diff) | |
download | rsyslog-eb1615068c6a704287eda732d287280df4cc4c44.tar.gz rsyslog-eb1615068c6a704287eda732d287280df4cc4c44.tar.xz rsyslog-eb1615068c6a704287eda732d287280df4cc4c44.zip |
added new testing module imdiag
which enables to talk to the rsyslog core at runtime. The current
implementation is only a beginning, but can be expanded over time
-rw-r--r-- | ChangeLog | 3 | ||||
-rw-r--r-- | Makefile.am | 2 | ||||
-rw-r--r-- | dirty.h | 1 | ||||
-rw-r--r-- | plugins/imdiag/imdiag.c | 247 | ||||
-rw-r--r-- | runtime/netstrm.c | 1 | ||||
-rw-r--r-- | runtime/rsyslog.h | 1 | ||||
-rw-r--r-- | tcps_sess.c | 27 | ||||
-rw-r--r-- | tcps_sess.h | 8 | ||||
-rw-r--r-- | tcpsrv.c | 26 | ||||
-rw-r--r-- | tcpsrv.h | 5 | ||||
-rw-r--r-- | tests/Makefile.am | 1 | ||||
-rwxr-xr-x | tests/diskqueue.sh | 2 | ||||
-rwxr-xr-x | tests/fieldtest.sh | 2 | ||||
-rwxr-xr-x | tests/inputname.sh | 2 | ||||
-rwxr-xr-x | tests/manytcp.sh | 2 | ||||
-rwxr-xr-x | tests/omod-if-array.sh | 2 | ||||
-rwxr-xr-x | tests/parsertest.sh | 2 | ||||
-rw-r--r-- | tests/testsuites/diskqueue.conf | 3 | ||||
-rw-r--r-- | tests/testsuites/manytcp.conf | 3 | ||||
-rwxr-xr-x | tests/waitqueueempty.sh | 4 | ||||
-rw-r--r-- | tools/syslogd.c | 27 |
21 files changed, 289 insertions, 82 deletions
@@ -17,6 +17,9 @@ Version 4.3.1 [DEVEL] (rgerhards), 2009-04-?? - bugfix: light and full delay watermarks had invalid values, badly affecting performance for delayable inputs - build system improvements - thanks to Michael Biebl +- added new testing module imdiag, which enables to talk to the + rsyslog core at runtime. The current implementation is only a + beginning, but can be expanded over time --------------------------------------------------------------------------- Version 4.3.0 [DEVEL] (rgerhards), 2009-04-17 - new feature: new output plugin omprog, which permits to start program diff --git a/Makefile.am b/Makefile.am index a5cf879c..8d57700f 100644 --- a/Makefile.am +++ b/Makefile.am @@ -131,5 +131,5 @@ SUBDIRS += tests # temporarily be removed below. The intent behind forcing everthing to compile # in a make distcheck is so that we detect code that accidently was not updated # when some global update happened. -DISTCHECK_CONFIGURE_FLAGS=--enable-gssapi_krb5 --enable-imfile --enable-snmp --enable-pgsql --enable-libdbi --enable-mysql --enable-omtemplate --enable-imtemplate --enable-relp --enable-rsyslogd --enable-mail --enable-klog --enable-diagtools --enable-gnutls --enable-omstdout --enable-omprog +DISTCHECK_CONFIGURE_FLAGS=--enable-gssapi_krb5 --enable-imfile --enable-snmp --enable-pgsql --enable-libdbi --enable-mysql --enable-omtemplate --enable-imtemplate --enable-relp --enable-rsyslogd --enable-mail --enable-klog --enable-diagtools --enable-gnutls --enable-omstdout --enable-omprog --enable-imdiag ACLOCAL_AMFLAGS = -I m4 @@ -32,6 +32,7 @@ rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags); rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, uchar *pszInputName, struct syslogTime *stTime, time_t ttGenTime); int parseRFCSyslogMsg(msg_t *pMsg, int flags); int parseLegacySyslogMsg(msg_t *pMsg, int flags); +rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */ /* TODO: the following 2 need to go in conf obj interface... */ rsRetVal cflineParseTemplateName(uchar** pp, omodStringRequest_t *pOMSR, int iEntry, int iTplOpts, uchar *dfltTplName); diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c index 90c5d9ee..40f94692 100644 --- a/plugins/imdiag/imdiag.c +++ b/plugins/imdiag/imdiag.c @@ -26,8 +26,8 @@ * * A copy of the GPL can be found in the file "COPYING" in this distribution. */ - #include "config.h" +#include <stdio.h> #include <stdlib.h> #include <assert.h> #include <string.h> @@ -43,114 +43,247 @@ #include <fcntl.h> #endif #include "rsyslog.h" -//#include "dirty.h" +#include "dirty.h" #include "cfsysline.h" #include "module-template.h" +#include "unicode-helper.h" #include "net.h" #include "netstrm.h" -#include "netstrms.h" #include "errmsg.h" +#include "tcpsrv.h" +#include "srUtils.h" +#include "net.h" /* for permittedPeers, may be removed when this is removed */ MODULE_TYPE_INPUT /* static data */ DEF_IMOD_STATIC_DATA +DEFobjCurrIf(tcpsrv) +DEFobjCurrIf(tcps_sess) DEFobjCurrIf(net) DEFobjCurrIf(netstrm) -DEFobjCurrIf(netstrms) DEFobjCurrIf(errmsg) /* Module static data */ -netstrms_t *pNS; /**< pointer to network stream subsystem */ -netstrm_t *arrLstn[10]; /**< our netstream listners */ -int iLstnMax = 0; /**< max nbr of listeners currently supported */ +static tcpsrv_t *pOurTcpsrv = NULL; /* our TCP server(listener) TODO: change for multiple instances */ +static permittedPeers_t *pPermPeersRoot = NULL; /* config settings */ +static int iTCPSessMax = 20; /* max number of sessions */ +static int iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */ +static uchar *pszStrmDrvrAuthMode = NULL; /* authentication mode to use */ +static uchar *pszInputName = NULL; /* value for inputname property, NULL is OK and handled by core engine */ + + +/* callbacks */ +/* this shall go into a specific ACL module! */ +static int +isPermittedHost(struct sockaddr __attribute__((unused)) *addr, char __attribute__((unused)) *fromHostFQDN, + void __attribute__((unused)) *pUsrSrv, void __attribute__((unused)) *pUsrSess) +{ + return 1; /* TODO: implement ACLs ... or via some other way? */ +} -/* add a listen socket to our listen socket array. This is a callback - * invoked from the netstrm class. -- rgerhards, 2008-04-23 +static rsRetVal +doOpenLstnSocks(tcpsrv_t *pSrv) +{ + ISOBJ_TYPE_assert(pSrv, tcpsrv); + return tcpsrv.create_tcp_socket(pSrv); +} + + +static rsRetVal +doRcvData(tcps_sess_t *pSess, char *buf, size_t lenBuf, ssize_t *piLenRcvd) +{ + DEFiRet; + assert(pSess != NULL); + assert(piLenRcvd != NULL); + + *piLenRcvd = lenBuf; + CHKiRet(netstrm.Rcv(pSess->pStrm, (uchar*) buf, piLenRcvd)); +finalize_it: + RETiRet; +} + +static rsRetVal +onRegularClose(tcps_sess_t *pSess) +{ + DEFiRet; + assert(pSess != NULL); + + /* process any incomplete frames left over */ + tcps_sess.PrepareClose(pSess); + /* Session closed */ + tcps_sess.Close(pSess); + RETiRet; +} + + +static rsRetVal +onErrClose(tcps_sess_t *pSess) +{ + DEFiRet; + assert(pSess != NULL); + + tcps_sess.Close(pSess); + RETiRet; +} + +/* ------------------------------ end callbacks ------------------------------ */ + + +/* This function waits until the main queue is drained (size = 0) */ static rsRetVal -addTcpLstn(netstrm_t *pLstn) +waitMainQEmpty(void) { + int iMsgQueueSize; DEFiRet; - ISOBJ_TYPE_assert(pLstn, netstrm); + CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); + while(iMsgQueueSize > 0) { + srSleep(0,2); /* wait a little bit */ + CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); + } + +finalize_it: + RETiRet; +} + - if((unsigned)iLstnMax >= sizeof(arrLstn)/sizeof(netstrm_t*)) - ABORT_FINALIZE(RS_RET_MAX_LSTN_REACHED); +/* Function to handle received messages. This is our core function! + * rgerhards, 2009-05-24 + */ +static rsRetVal +OnMsgReceived(tcps_sess_t *pSess, uchar *pRcv, int iLenMsg) +{ + ssize_t len; + int iMsgQueueSize; + uchar *pszMsg; + uchar buf[1024]; + DEFiRet; - arrLstn[iLstnMax] = pLstn; - ++iLstnMax; + assert(pSess != NULL); + assert(pRcv != NULL); + + /* NOTE: pRcv is NOT a C-String but rather an array of characters + * WITHOUT a termination \0 char. So we need to convert it to one + * before proceeding. + */ + CHKmalloc(pszMsg = malloc(sizeof(uchar) * (iLenMsg + 1))); + memcpy(pszMsg, pRcv, iLenMsg); + pszMsg[iLenMsg] = '\0'; + + if(!ustrcmp(pszMsg, UCHAR_CONSTANT("GetMainMsgQueueSize"))) { + CHKiRet(diagGetMainMsgQSize(&iMsgQueueSize)); + len = snprintf((char*)buf, sizeof(buf)/sizeof(uchar), "%d\n", iMsgQueueSize); + CHKiRet(netstrm.Send(pSess->pStrm, buf, &len)); + } else if(!ustrcmp(pszMsg, UCHAR_CONSTANT("WaitMainQueueEmpty"))) { + CHKiRet(waitMainQEmpty()); + len = snprintf((char*)buf, sizeof(buf)/sizeof(uchar), "mainqueue empty\n"); + CHKiRet(netstrm.Send(pSess->pStrm, buf, &len)); + } else { + len = snprintf((char*)buf, sizeof(buf)/sizeof(uchar), "unkown command '%s'\n", pszMsg); + CHKiRet(netstrm.Send(pSess->pStrm, buf, &len)); + } finalize_it: RETiRet; } -/* initialize network stream subsystem */ +/* set permitted peer -- rgerhards, 2008-05-19 + */ static rsRetVal -initNetstrm(void) +setPermittedPeer(void __attribute__((unused)) *pVal, uchar *pszID) +{ + DEFiRet; + CHKiRet(net.AddPermittedPeer(&pPermPeersRoot, pszID)); + free(pszID); /* no longer needed, but we need to free as of interface def */ +finalize_it: + RETiRet; +} + + +static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVal) { DEFiRet; - /* prepare network stream subsystem */ - CHKiRet(netstrms.Construct(&pNS)); - CHKiRet(netstrms.SetDrvrMode(pNS, 0)); /* always plain text */ - //CHKiRet(netstrms.SetDrvrAuthMode(pThis->pNS, pThis->pszDrvrAuthMode)); - //CHKiRet(netstrms.SetDrvrPermPeers(pThis->pNS, pThis->pPermPeers)); - // TODO: set driver! - CHKiRet(netstrms.ConstructFinalize(&pNS)); + if(pOurTcpsrv == NULL) { + CHKiRet(tcpsrv.Construct(&pOurTcpsrv)); + CHKiRet(tcpsrv.SetSessMax(pOurTcpsrv, iTCPSessMax)); + CHKiRet(tcpsrv.SetCBIsPermittedHost(pOurTcpsrv, isPermittedHost)); + CHKiRet(tcpsrv.SetCBRcvData(pOurTcpsrv, doRcvData)); + CHKiRet(tcpsrv.SetCBOpenLstnSocks(pOurTcpsrv, doOpenLstnSocks)); + CHKiRet(tcpsrv.SetCBOnRegularClose(pOurTcpsrv, onRegularClose)); + CHKiRet(tcpsrv.SetCBOnErrClose(pOurTcpsrv, onErrClose)); + CHKiRet(tcpsrv.SetDrvrMode(pOurTcpsrv, iStrmDrvrMode)); + CHKiRet(tcpsrv.SetOnMsgReceive(pOurTcpsrv, OnMsgReceived)); + /* now set optional params, but only if they were actually configured */ + if(pszStrmDrvrAuthMode != NULL) { + CHKiRet(tcpsrv.SetDrvrAuthMode(pOurTcpsrv, pszStrmDrvrAuthMode)); + } + if(pPermPeersRoot != NULL) { + CHKiRet(tcpsrv.SetDrvrPermPeers(pOurTcpsrv, pPermPeersRoot)); + } + } - /* set up listeners */ - CHKiRet(netstrm.LstnInit(pNS, NULL, addTcpLstn, "127.0.0.1", (uchar*)"44514", 1)); + /* initialized, now add socket */ + CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, pszInputName == NULL ? + UCHAR_CONSTANT("imdiag") : pszInputName)); + tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal); finalize_it: if(iRet != RS_RET_OK) { - if(pNS != NULL) - netstrms.Destruct(&pNS); + errmsg.LogError(0, NO_ERRCODE, "error %d trying to add listener", iRet); + if(pOurTcpsrv != NULL) + tcpsrv.Destruct(&pOurTcpsrv); } RETiRet; } - -/* This function is called to gather input. In our case, it is a bit abused - * to drive the listener loop for the diagnostics code. +/* This function is called to gather input. */ BEGINrunInput CODESTARTrunInput + CHKiRet(tcpsrv.ConstructFinalize(pOurTcpsrv)); + iRet = tcpsrv.Run(pOurTcpsrv); +finalize_it: ENDrunInput /* initialize and return if will run or not */ BEGINwillRun CODESTARTwillRun - iRet = initNetstrm(); + /* first apply some config settings */ + if(pOurTcpsrv == NULL) + ABORT_FINALIZE(RS_RET_NO_RUN); +finalize_it: ENDwillRun BEGINafterRun - int i; CODESTARTafterRun /* do cleanup here */ - /* finally close our listen streams */ - for(i = 0 ; i < iLstnMax ; ++i) { - netstrm.Destruct(arrLstn[i]); - } - - /* destruct netstream subsystem */ - netstrms.Destruct(pNS); ENDafterRun BEGINmodExit CODESTARTmodExit + if(pOurTcpsrv != NULL) + iRet = tcpsrv.Destruct(&pOurTcpsrv); + + if(pPermPeersRoot != NULL) { + net.DestructPermittedPeers(&pPermPeersRoot); + } + /* release objects we used */ objRelease(net, LM_NET_FILENAME); - objRelease(netstrm, DONT_LOAD_LIB); - objRelease(netstrms, LM_NETSTRMS_FILENAME); + objRelease(netstrm, LM_NETSTRMS_FILENAME); + objRelease(tcps_sess, LM_TCPSRV_FILENAME); + objRelease(tcpsrv, LM_TCPSRV_FILENAME); objRelease(errmsg, CORE_COMPONENT); ENDmodExit @@ -158,6 +291,14 @@ ENDmodExit static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { + iTCPSessMax = 200; + iStrmDrvrMode = 0; + free(pszInputName); + pszInputName = NULL; + if(pszStrmDrvrAuthMode != NULL) { + free(pszStrmDrvrAuthMode); + pszStrmDrvrAuthMode = NULL; + } return RS_RET_OK; } @@ -173,26 +314,28 @@ BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ CODEmodInit_QueryRegCFSLineHdlr + pOurTcpsrv = NULL; /* request objects we use */ CHKiRet(objUse(net, LM_NET_FILENAME)); - CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME)); - CHKiRet(objUse(netstrm, DONT_LOAD_LIB)); + CHKiRet(objUse(netstrm, LM_NETSTRMS_FILENAME)); + CHKiRet(objUse(tcps_sess, LM_TCPSRV_FILENAME)); + CHKiRet(objUse(tcpsrv, LM_TCPSRV_FILENAME)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); /* register config file handlers */ -#if 0 - CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputtcpserverrun", 0, eCmdHdlrGetWord, + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("imdiagserverrun"), 0, eCmdHdlrGetWord, addTCPListener, NULL, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputtcpmaxsessions", 0, eCmdHdlrInt, + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("imdiagmaxsessions"), 0, eCmdHdlrInt, NULL, &iTCPSessMax, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputtcpserverstreamdrivermode", 0, + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("imdiagserverstreamdrivermode"), 0, eCmdHdlrInt, NULL, &iStrmDrvrMode, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputtcpserverstreamdriverauthmode", 0, + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("imdiagserverstreamdriverauthmode"), 0, eCmdHdlrGetWord, NULL, &pszStrmDrvrAuthMode, STD_LOADABLE_MODULE_ID)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputtcpserverstreamdriverpermittedpeer", 0, + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("imdiagserverstreamdriverpermittedpeer"), 0, eCmdHdlrGetWord, setPermittedPeer, NULL, STD_LOADABLE_MODULE_ID)); -#endif - CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("imdiagserverinputname"), 0, + eCmdHdlrGetWord, NULL, &pszInputName, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit diff --git a/runtime/netstrm.c b/runtime/netstrm.c index 459561bc..f6a8de7f 100644 --- a/runtime/netstrm.c +++ b/runtime/netstrm.c @@ -148,7 +148,6 @@ LstnInit(netstrms_t *pNS, void *pUsr, rsRetVal(*fAddLstn)(void*,netstrm_t*), ISOBJ_TYPE_assert(pNS, netstrms); assert(fAddLstn != NULL); assert(pLstnPort != NULL); -RUNLOG_STR("XXX: Init Lstn"); CHKiRet(pNS->Drvr.LstnInit(pNS, pUsr, fAddLstn, pLstnPort, pLstnIP, iSessMax)); diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 8e8a9f2a..77d845fd 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -84,6 +84,7 @@ typedef rsRetVal (*errLogFunc_t)(uchar*); /* this is a trick to store a function typedef struct permittedPeers_s permittedPeers_t; /* this should go away in the long term -- rgerhards, 2008-05-19 */ typedef struct permittedPeerWildcard_s permittedPeerWildcard_t; /* this should go away in the long term -- rgerhards, 2008-05-19 */ typedef struct tcpsrv_s tcpsrv_t; +typedef struct tcps_sess_s tcps_sess_t; typedef struct vmstk_s vmstk_t; typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */ diff --git a/tcps_sess.c b/tcps_sess.c index c564caea..c4548804 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -58,6 +58,7 @@ static int iMaxLine; /* maximum size of a single message */ /* forward definitions */ static rsRetVal Close(tcps_sess_t *pThis); +static rsRetVal defaultDoSubmitMessage(tcps_sess_t *pThis, uchar*, int); /* Standard-Constructor */ @@ -65,6 +66,7 @@ BEGINobjConstruct(tcps_sess) /* be sure to specify the object type also in END m pThis->iMsg = 0; /* just make sure... */ pThis->bAtStrtOfFram = 1; /* indicate frame header expected */ pThis->eFraming = TCP_FRAMING_OCTET_STUFFING; /* just make sure... */ + pThis->DoSubmitMessage = defaultDoSubmitMessage; /* now allocate the message reception buffer */ CHKmalloc(pThis->pMsg = (uchar*) malloc(sizeof(uchar) * iMaxLine + 1)); finalize_it: @@ -206,6 +208,15 @@ SetUsrP(tcps_sess_t *pThis, void *pUsr) } +static rsRetVal +SetOnMsgReceive(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int)) +{ + DEFiRet; + pThis->DoSubmitMessage = OnMsgReceive; + RETiRet; +} + + /* This is a helper for submitting the message to the rsyslog core. * It does some common processing, including resetting the various * state variables to a "processed" state. @@ -217,8 +228,11 @@ SetUsrP(tcps_sess_t *pThis, void *pUsr) * rgerhards, 2009-04-23 */ static rsRetVal -doSubmitMessage(tcps_sess_t *pThis) +defaultDoSubmitMessage(tcps_sess_t *pThis, uchar *pszMsg, int iLenMsg) { +// TODO: make calling this overridable so that the diag module can ask to be called +// and so it can do its work right in this entry point (but we need to check that +// we have the capability to send a reply at this point). msg_t *pMsg; struct syslogTime stTime; time_t ttGenTime; @@ -233,7 +247,7 @@ doSubmitMessage(tcps_sess_t *pThis) CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); /* first trim the buffer to what we have actually received */ CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar) * pThis->iMsg)); - memcpy(pMsg->pszRawMsg, pThis->pMsg, pThis->iMsg); + memcpy(pMsg->pszRawMsg, pszMsg, iLenMsg); pMsg->iLenRawMsg = pThis->iMsg; MsgSetInputName(pMsg, pThis->pLstnInfo->pszInputName); MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); @@ -291,7 +305,7 @@ PrepareClose(tcps_sess_t *pThis) * this case. */ dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n"); - doSubmitMessage(pThis); + pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg); } finalize_it: @@ -372,7 +386,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) if(pThis->iMsg >= iMaxLine) { /* emergency, we now need to flush, no matter if we are at end of message or not... */ dbgprintf("error: message received is larger than max msg size, we split it\n"); - doSubmitMessage(pThis); + pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg); /* we might think if it is better to ignore the rest of the * message than to treat it as a new one. Maybe this is a good * candidate for a configuration parameter... @@ -383,7 +397,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) if(( (c == '\n') || ((pThis->pSrv->addtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->addtlFrameDelim)) ) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */ - doSubmitMessage(pThis); + pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg); pThis->inputState = eAtStrtFram; } else { /* IMPORTANT: here we copy the actual frame content to the message - for BOTH framing modes! @@ -400,7 +414,7 @@ processDataRcvd(tcps_sess_t *pThis, char c) pThis->iOctetsRemain--; if(pThis->iOctetsRemain < 1) { /* we have end of frame! */ - doSubmitMessage(pThis); + pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg); pThis->inputState = eAtStrtFram; } } @@ -474,6 +488,7 @@ CODESTARTobjQueryInterface(tcps_sess) pIf->SetHostIP = SetHostIP; pIf->SetStrm = SetStrm; pIf->SetMsgIdx = SetMsgIdx; + pIf->SetOnMsgReceive = SetOnMsgReceive; finalize_it: ENDobjQueryInterface(tcps_sess) diff --git a/tcps_sess.h b/tcps_sess.h index 2ef28264..5e59aaab 100644 --- a/tcps_sess.h +++ b/tcps_sess.h @@ -29,7 +29,7 @@ struct tcpsrv_s; /* the tcps_sess object */ -typedef struct tcps_sess_s { +struct tcps_sess_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ tcpsrv_t *pSrv; /* pointer back to my server (e.g. for callbacks) */ tcpLstnPortList_t *pLstnInfo; /* pointer back to listener info */ @@ -46,8 +46,9 @@ typedef struct tcps_sess_s { uchar *pMsg; /* message (fragment) received */ uchar *fromHost; uchar *fromHostIP; - void *pUsr; /* a user-pointer */ -} tcps_sess_t; + void *pUsr; /* a user-pointer */ + rsRetVal (*DoSubmitMessage)(tcps_sess_t*, uchar*, int); /* submit message callback */ +}; /* interfaces */ @@ -67,6 +68,7 @@ BEGINinterface(tcps_sess) /* name must also be changed in ENDinterface macro! */ rsRetVal (*SetHostIP)(tcps_sess_t *pThis, uchar*); rsRetVal (*SetStrm)(tcps_sess_t *pThis, netstrm_t*); rsRetVal (*SetMsgIdx)(tcps_sess_t *pThis, int); + rsRetVal (*SetOnMsgReceive)(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int)); ENDinterface(tcps_sess) #define tcps_sessCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */ /* interface changes @@ -380,14 +380,15 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess, errno = 0; errmsg.LogError(0, RS_RET_MAX_SESS_REACHED, "too many tcp sessions - dropping incoming request"); ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED); - } else { - /* we found a free spot and can construct our session object */ - CHKiRet(tcps_sess.Construct(&pSess)); - CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis)); - CHKiRet(tcps_sess.SetLstnInfo(pSess, pLstnInfo)); } - /* OK, we have a "good" index... */ + /* we found a free spot and can construct our session object */ + CHKiRet(tcps_sess.Construct(&pSess)); + CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis)); + CHKiRet(tcps_sess.SetLstnInfo(pSess, pLstnInfo)); + if(pThis->OnMsgReceive != NULL) + CHKiRet(tcps_sess.SetOnMsgReceive(pSess, pThis->OnMsgReceive)); + /* get the host name */ CHKiRet(netstrm.GetRemoteHName(pNewStrm, &fromHostFQDN)); CHKiRet(netstrm.GetRemoteIP(pNewStrm, &fromHostIP)); @@ -568,6 +569,7 @@ finalize_it: /* this is a very special case - this time only we do not exit the BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */ pThis->iSessMax = TCPSESS_MAX_DEFAULT; /* TODO: useful default ;) */ pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; + pThis->OnMsgReceive = NULL; ENDobjConstruct(tcpsrv) @@ -713,6 +715,16 @@ SetUsrP(tcpsrv_t *pThis, void *pUsr) RETiRet; } +static rsRetVal +SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int)) +{ + DEFiRet; + assert(OnMsgReceive != NULL); + pThis->OnMsgReceive = OnMsgReceive; + RETiRet; +} + + /* Set additional framing to use (if any) -- rgerhards, 2008-12-10 */ static rsRetVal @@ -731,7 +743,6 @@ SetInputName(tcpsrv_t *pThis, uchar *name) { uchar *pszName; DEFiRet; -dbgprintf("XXX: SetInputName: %s\n", name); ISOBJ_TYPE_assert(pThis, tcpsrv); if(name == NULL) pszName = NULL; @@ -843,6 +854,7 @@ CODESTARTobjQueryInterface(tcpsrv) pIf->SetCBOnDestruct = SetCBOnDestruct; pIf->SetCBOnRegularClose = SetCBOnRegularClose; pIf->SetCBOnErrClose = SetCBOnErrClose; + pIf->SetOnMsgReceive = SetOnMsgReceive; finalize_it: ENDobjQueryInterface(tcpsrv) @@ -71,6 +71,7 @@ struct tcpsrv_s { rsRetVal (*pOnSessAccept)(tcpsrv_t *, tcps_sess_t*); rsRetVal (*OnSessConstructFinalize)(void*); rsRetVal (*pOnSessDestruct)(void*); + rsRetVal (*OnMsgReceive)(tcps_sess_t *, uchar *pszMsg, int iLenMsg); /* submit message callback */ }; @@ -104,8 +105,10 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */ rsRetVal (*SetCBOnSessConstructFinalize)(tcpsrv_t*, rsRetVal (*) (void*)); /* added v5 */ rsRetVal (*SetSessMax)(tcpsrv_t *pThis, int iMaxSess); /* 2009-04-09 */ + /* added v6 */ + rsRetVal (*SetOnMsgReceive)(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int)); /* 2009-05-24 */ ENDinterface(tcpsrv) -#define tcpsrvCURR_IF_VERSION 5 /* increment whenever you change the interface structure! */ +#define tcpsrvCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */ /* change for v4: * - SetAddtlFrameDelim() added -- rgerhards, 2008-12-10 * - SetInputName() added -- rgerhards, 2008-12-10 diff --git a/tests/Makefile.am b/tests/Makefile.am index c0e629d3..ed48fce8 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -46,6 +46,7 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \ testsuites/1.inputname_imtcp_12515 \ testsuites/1.inputname_imtcp_12516 \ omod-if-array.sh \ + waitqueueempty.sh \ cfg.sh ourtail_SOURCES = ourtail.c diff --git a/tests/diskqueue.sh b/tests/diskqueue.sh index 6384eb64..a91f3414 100755 --- a/tests/diskqueue.sh +++ b/tests/diskqueue.sh @@ -17,7 +17,7 @@ if [ "$?" -ne "0" ]; then echo "error during tcpflood! see rsyslog.out.log.save for what was written" cp rsyslog.out.log rsyslog.out.log.save fi -sleep 4 # we need this so that rsyslogd can receive all outstanding messages +$srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages kill `cat rsyslog.pid` rm -f work sort < rsyslog.out.log > work diff --git a/tests/fieldtest.sh b/tests/fieldtest.sh index 7a646f00..482fa143 100755 --- a/tests/fieldtest.sh +++ b/tests/fieldtest.sh @@ -1,5 +1,5 @@ echo test fieldtest via udp -./killrsyslog.sh # kill rsyslogd if it runs for some reason +$srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason ./nettester -tfield1 -iudp if [ "$?" -ne "0" ]; then diff --git a/tests/inputname.sh b/tests/inputname.sh index 7d9ea111..e1a58517 100755 --- a/tests/inputname.sh +++ b/tests/inputname.sh @@ -1,5 +1,5 @@ echo testing $InputTCPServerInputName directive -./killrsyslog.sh # kill rsyslogd if it runs for some reason +$srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason echo port 12514 ./nettester -tinputname_imtcp_12514 -cinputname_imtcp -itcp -p12514 diff --git a/tests/manytcp.sh b/tests/manytcp.sh index d9b2e9a0..06bd38b6 100755 --- a/tests/manytcp.sh +++ b/tests/manytcp.sh @@ -8,7 +8,7 @@ if [ "$?" -ne "0" ]; then echo "error during tcpflood! see rsyslog.out.log.save for what was written" cp rsyslog.out.log rsyslog.out.log.save fi -sleep 5 # we need this so that rsyslogd can receive all outstanding messages +$srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages kill `cat rsyslog.pid` rm -f work sort < rsyslog.out.log > work diff --git a/tests/omod-if-array.sh b/tests/omod-if-array.sh index 7b4b5611..2c2a8ef3 100755 --- a/tests/omod-if-array.sh +++ b/tests/omod-if-array.sh @@ -1,5 +1,5 @@ echo test omod-if-array via udp -./killrsyslog.sh # kill rsyslogd if it runs for some reason +$srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason ./nettester -tomod-if-array -iudp -p4711 if [ "$?" -ne "0" ]; then diff --git a/tests/parsertest.sh b/tests/parsertest.sh index 152d8b60..afdb9469 100755 --- a/tests/parsertest.sh +++ b/tests/parsertest.sh @@ -1,5 +1,5 @@ echo test parsertest via udp -./killrsyslog.sh # kill rsyslogd if it runs for some reason +$srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason ./nettester -tparse1 -iudp if [ "$?" -ne "0" ]; then diff --git a/tests/testsuites/diskqueue.conf b/tests/testsuites/diskqueue.conf index 8851a459..017ee96d 100644 --- a/tests/testsuites/diskqueue.conf +++ b/tests/testsuites/diskqueue.conf @@ -4,6 +4,9 @@ $ModLoad ../plugins/imtcp/.libs/imtcp $MainMsgQueueTimeoutShutdown 10000 $InputTCPServerRun 13514 +$ModLoad ../plugins/imdiag/.libs/imdiag +$IMDiagServerRun 13500 + $ErrorMessagesToStderr off # set spool locations and switch queue to disk-only mode diff --git a/tests/testsuites/manytcp.conf b/tests/testsuites/manytcp.conf index 8175732e..3867da46 100644 --- a/tests/testsuites/manytcp.conf +++ b/tests/testsuites/manytcp.conf @@ -6,6 +6,9 @@ $MaxOpenFiles 2000 $InputTCPMaxSessions 1100 $InputTCPServerRun 13514 +$ModLoad ../plugins/imdiag/.libs/imdiag +$IMDiagServerRun 13500 + $ErrorMessagesToStderr off $template outfmt,"%msg:F,58:2%\n" diff --git a/tests/waitqueueempty.sh b/tests/waitqueueempty.sh new file mode 100755 index 00000000..2c047588 --- /dev/null +++ b/tests/waitqueueempty.sh @@ -0,0 +1,4 @@ +# wait until main message queue is empty. This is currently done in +# a separate shell script so that we can change the implementation +# at some later point. -- rgerhards, 2009-05-25 +echo WaitMainQueueEmpty | nc 127.0.0.1 13500 diff --git a/tools/syslogd.c b/tools/syslogd.c index dc5b8fee..0b860448 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -350,10 +350,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a bDebugPrintModuleList = 1; bEscapeCCOnRcv = 1; /* default is to escape control characters */ bReduceRepeatMsgs = 0; - if(pszMainMsgQFName != NULL) { - free(pszMainMsgQFName); - pszMainMsgQFName = NULL; - } + free(pszMainMsgQFName); + pszMainMsgQFName = NULL; iMainMsgQueueSize = 10000; iMainMsgQHighWtrMark = 8000; iMainMsgQLowWtrMark = 2000; @@ -412,6 +410,26 @@ static int usage(void) } +/* ------------------------------ some support functions for imdiag ------------------------------ * + * This is a bit dirty, but the only way to do it, at least with reasonable effort. + * rgerhards, 2009-05-25 + */ + +/* return back the approximate current number of messages in the main message queue + */ +rsRetVal +diagGetMainMsgQSize(int *piSize) +{ + DEFiRet; + assert(piSize != NULL); + *piSize = pMsgQueue->iQueueSize; + RETiRet; +} + + +/* ------------------------------ end support functions for imdiag ------------------------------ */ + + /* function to destruct a selector_t object * rgerhards, 2007-08-01 */ @@ -2658,7 +2676,6 @@ init(void) ABORT_FINALIZE(RS_RET_VALIDATION_RUN); /* switch the message object to threaded operation, if necessary */ -/* TODO:XXX: I think we must do this also if we have action queues! -- rgerhards, 2009-01-26 */ if(MainMsgQueType == QUEUETYPE_DIRECT || iMainMsgQueueNumWorkers > 1) { MsgEnableThreadSafety(); } |