summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-04-23 13:39:11 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-04-23 13:39:11 +0200
commit42c3dcfc1ca71814e62763338a24eae8c8463069 (patch)
tree7df7cabd3cc4f13e1f8aac5debb4379adccdde4c
parent925a5e34718cc842cb7e7ea6a5c8e8e8fbeb25dd (diff)
downloadrsyslog-42c3dcfc1ca71814e62763338a24eae8c8463069.tar.gz
rsyslog-42c3dcfc1ca71814e62763338a24eae8c8463069.tar.xz
rsyslog-42c3dcfc1ca71814e62763338a24eae8c8463069.zip
performance enhancement: imtcp calls parser no longer on input thread
but rather inside on of the potentially many main msg queue worker threads (an enhancement scheduled for all input plugins where this is possible)
-rw-r--r--ChangeLog4
-rw-r--r--tcps_sess.c66
2 files changed, 58 insertions, 12 deletions
diff --git a/ChangeLog b/ChangeLog
index 1ad99be1..98f59e9e 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -6,6 +6,10 @@ Version 4.3.1 [DEVEL] (rgerhards), 2009-04-??
* added tests for queue disk-only mode (checks disk queue logic)
- bugfix: light and full delay watermarks had invalid values, badly
affecting performance for delayable inputs
+- performance enhancemnt: imtcp calls parser no longer on input thread
+ but rather inside on of the potentially many main msg queue worker
+ threads (an enhancement scheduled for all input plugins where this is
+ possible)
---------------------------------------------------------------------------
Version 4.3.0 [DEVEL] (rgerhards), 2009-04-17
- new feature: new output plugin omprog, which permits to start program
diff --git a/tcps_sess.c b/tcps_sess.c
index d0edc018..2a3ec0df 100644
--- a/tcps_sess.c
+++ b/tcps_sess.c
@@ -43,6 +43,7 @@
#include "errmsg.h"
#include "netstrm.h"
#include "msg.h"
+#include "datetime.h"
/* static data */
@@ -50,6 +51,7 @@ DEFobjStaticHelpers
DEFobjCurrIf(glbl)
DEFobjCurrIf(errmsg)
DEFobjCurrIf(netstrm)
+DEFobjCurrIf(datetime)
static int iMaxLine; /* maximum size of a single message */
@@ -191,6 +193,52 @@ SetUsrP(tcps_sess_t *pThis, void *pUsr)
}
+/* This is a helper for submitting the message to the rsyslog core.
+ * It does some common processing, including resetting the various
+ * state variables to a "processed" state.
+ * Note that this function is also called if we had a buffer overflow
+ * due to a too-long message. So far, there is no indication this
+ * happened and it may be worth thinking about different handling
+ * of this case (what obviously would require a change to this
+ * function or some related code).
+ * rgerhards, 2009-04-23
+ */
+static rsRetVal
+doSubmitMessage(tcps_sess_t *pThis)
+{
+ msg_t *pMsg;
+ struct syslogTime stTime;
+ time_t ttGenTime;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, tcps_sess);
+
+ //TODO: if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) {
+ datetime.getCurrTime(&stTime, &ttGenTime);
+ //}
+ /* we now create our own message object and submit it to the queue */
+ CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime));
+ /* first trim the buffer to what we have actually received */
+ CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar) * pThis->iMsg));
+ memcpy(pMsg->pszRawMsg, pThis->pMsg, pThis->iMsg);
+ pMsg->iLenRawMsg = pThis->iMsg;
+ MsgSetInputName(pMsg, (char*)pThis->pSrv->pszInputName);
+ MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY);
+ pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
+ pMsg->bParseHOSTNAME = 1;
+ MsgSetRcvFrom(pMsg, (char*)pThis->fromHost);
+ CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP));
+ CHKiRet(submitMsg(pMsg));
+
+finalize_it:
+ /* reset status variables */
+ pThis->bAtStrtOfFram = 1;
+ pThis->iMsg = 0;
+
+ RETiRet;
+}
+
+
/* This should be called before a normal (non forced) close
* of a TCP session. This function checks if there is any unprocessed
* message left in the TCP stream. Such a message is probably a
@@ -230,9 +278,7 @@ PrepareClose(tcps_sess_t *pThis)
* this case.
*/
dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n");
- parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg,
- PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0);
- pThis->bAtStrtOfFram = 1;
+ doSubmitMessage(pThis);
}
finalize_it:
@@ -313,9 +359,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
if(pThis->iMsg >= iMaxLine) {
/* emergency, we now need to flush, no matter if we are at end of message or not... */
dbgprintf("error: message received is larger than max msg size, we split it\n");
- parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg,
- PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0);
- pThis->iMsg = 0;
+ doSubmitMessage(pThis);
/* we might think if it is better to ignore the rest of the
* message than to treat it as a new one. Maybe this is a good
* candidate for a configuration parameter...
@@ -326,9 +370,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
if(( (c == '\n')
|| ((pThis->pSrv->addtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->addtlFrameDelim))
) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */
- parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg,
- PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0);
- pThis->iMsg = 0;
+ doSubmitMessage(pThis);
pThis->inputState = eAtStrtFram;
} else {
/* IMPORTANT: here we copy the actual frame content to the message - for BOTH framing modes!
@@ -345,9 +387,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
pThis->iOctetsRemain--;
if(pThis->iOctetsRemain < 1) {
/* we have end of frame! */
- parseAndSubmitMessage(pThis->fromHost, pThis->fromHostIP, pThis->pMsg, pThis->iMsg,
- PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pThis->pSrv->pszInputName, NULL, 0);
- pThis->iMsg = 0;
+ doSubmitMessage(pThis);
pThis->inputState = eAtStrtFram;
}
}
@@ -432,6 +472,7 @@ CODESTARTObjClassExit(tcps_sess)
/* release objects we no longer need */
objRelease(errmsg, CORE_COMPONENT);
objRelease(netstrm, LM_NETSTRMS_FILENAME);
+ objRelease(datetime, CORE_COMPONENT);
ENDObjClassExit(tcps_sess)
@@ -443,6 +484,7 @@ BEGINObjClassInit(tcps_sess, 1, OBJ_IS_CORE_MODULE) /* class, version - CHANGE c
/* request objects we use */
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(netstrm, LM_NETSTRMS_FILENAME));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
iMaxLine = glbl.GetMaxLine(); /* get maximum size we currently support */