summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-06-09 15:37:00 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-06-09 15:37:00 +0200
commit8fbcea483710faae468ecf0ba706adc7e60ed41d (patch)
treebf69b277de872fc88a53fd2a43b9b21f48efd90d
parent395660f462c62029f76b99f73bd9a424a8cf73a2 (diff)
downloadrsyslog-8fbcea483710faae468ecf0ba706adc7e60ed41d.tar.gz
rsyslog-8fbcea483710faae468ecf0ba706adc7e60ed41d.tar.xz
rsyslog-8fbcea483710faae468ecf0ba706adc7e60ed41d.zip
main msg q consumer now preprocesses messages before doing rule processing
things like ACL check and message parsing. This leads to a greater level of concurrent processing. Beware, though, that this commit duplicates some messages. May be a regression from this or an earlier commit. I will soon sort out.
-rw-r--r--action.c16
-rw-r--r--doc/msgflow.txt6
-rw-r--r--runtime/rule.c1
-rw-r--r--tools/syslogd.c72
4 files changed, 52 insertions, 43 deletions
diff --git a/action.c b/action.c
index b8751c63..cae86c29 100644
--- a/action.c
+++ b/action.c
@@ -1140,20 +1140,8 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
}
-/* rgerhards 2004-11-09: fprintlog() is the actual driver for
- * the output channel. It receives the channel description (f) as
- * well as the message and outputs them according to the channel
- * semantics. The message is typically already contained in the
- * channel save buffer (f->f_prevline). This is not only the case
- * when a message was already repeated but also when a new message
- * arrived.
- * rgerhards 2007-08-01: interface changed to use action_t
- * rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE
- * CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do
- * not do this here. Failing to do so results in all kinds of
- * "interesting" problems!
- * RGERHARDS, 2008-01-29:
- * This is now the action caller and has been renamed.
+/* This function builds up a batch of messages to be (later)
+ * submitted to the action queue.
*/
rsRetVal
actionWriteToAction(action_t *pAction)
diff --git a/doc/msgflow.txt b/doc/msgflow.txt
index c1c440ef..b53ba7e7 100644
--- a/doc/msgflow.txt
+++ b/doc/msgflow.txt
@@ -18,11 +18,11 @@ syslogd.c/msgConsumeOne
parser.ParseMsg
ruleset.ProcessMsg (loops through ruleset)
ruleset.c/processMsgDoRules (for each rule in ruleset)
-rule.c/ProcessMsg
-rule.c/shouldProcessThisMessage
+rule.c/processMsg
+1:rule.c/shouldProcessThisMessage
(evaluates filters, optimize via ALL-Filter)
if to be processed, loop through associated actions ->
-rule.c/processMsgsDoAction
+2:rule.c/processMsgsDoAction
action.c/actionCallAction (LOCKs action object!)
action.c/doActionCallAction (does duplicate message reduction)
action.c/actionWriteToAction
diff --git a/runtime/rule.c b/runtime/rule.c
index 7a26a03a..3b98d7d1 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -166,7 +166,6 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg)
}
}
-RUNLOG_VAR("%p", pRule->pCSProgNameComp);
if(pRule->pCSProgNameComp != NULL) {
int bInv = 0, bEqv = 0, offset = 0;
if(*(rsCStrGetSzStrNoNULL(pRule->pCSProgNameComp)) == '-') {
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 94e8346d..2c36e6c2 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -627,39 +627,66 @@ chkMsgAgainstACL() {
* (by definition!) considered committed.
* rgerhards, 2009-11-16
*/
+///static inline rsRetVal
+///msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) {
+ ///DEFiRet;
+ //////RETiRet;
+///}
+
+/* preprocess a batch of messages, that is ready them for actual processing. This is done
+ * as a first stage and totally in parallel to any other worker active in the system. So
+ * it helps us keep up the overall concurrency level.
+ * rgerhards, 2010-06-09
+ */
static inline rsRetVal
-msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) {
+preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) {
uchar fromHost[NI_MAXHOST];
uchar fromHostIP[NI_MAXHOST];
uchar fromHostFQDN[NI_MAXHOST];
+ prop_t *propFromHost = NULL;
+ prop_t *propFromHostIP = NULL;
int bIsPermitted;
+ msg_t *pMsg;
+ int i;
+ rsRetVal localRet;
DEFiRet;
- if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) {
- dbgprintf("msgConsumer: UDP ACL must be checked for message (hostname-based)\n");
- CHKiRet(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP));
- bIsPermitted = net.isAllowedSender2((uchar*)"UDP",
- (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1);
- if(!bIsPermitted) {
- DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n",
- fromHostFQDN);
- ABORT_FINALIZE(RS_RET_ERR);
- /* save some of the info we obtained */
- MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), propFromHost);
- CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), propFromHostIP));
- pMsg->msgFlags &= ~NEEDS_ACLCHK_U;
+ for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
+ pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
+ if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) {
+ DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n");
+ if(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP) != RS_RET_OK)
+ continue;
+ bIsPermitted = net.isAllowedSender2((uchar*)"UDP",
+ (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1);
+ if(!bIsPermitted) {
+ DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n",
+ fromHostFQDN);
+ pBatch->pElem[i].state = BATCH_STATE_DISC;
+ } else {
+ /* save some of the info we obtained */
+ MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost);
+ CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP));
+ pMsg->msgFlags &= ~NEEDS_ACLCHK_U;
+ }
+ }
+ if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
+ if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
+ DBGPRINTF("Message discarded, parsing error %d\n", localRet);
+ pBatch->pElem[i].state = BATCH_STATE_DISC;
+ }
}
}
- if((pMsg->msgFlags & NEEDS_PARSING) != 0)
- CHKiRet(parser.ParseMsg(pMsg));
- ruleset.ProcessMsg(pMsg);
finalize_it:
+ if(propFromHost != NULL)
+ prop.Destruct(&propFromHost);
+ if(propFromHostIP != NULL)
+ prop.Destruct(&propFromHostIP);
RETiRet;
}
-
/* The consumer of dequeued messages. This function is called by the
* queue engine on dequeueing of a message. It runs on a SEPARATE
* THREAD. It receives an array of pointers, which it must iterate
@@ -670,22 +697,17 @@ static rsRetVal
msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate)
{
int i;
- prop_t *propFromHost = NULL;
- prop_t *propFromHostIP = NULL;
DEFiRet;
assert(pBatch != NULL);
+ preprocessBatch(pBatch, pbShutdownImmediate);
for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
DBGPRINTF("msgConsumer processes msg %d/%d\n", i, pBatch->nElem);
- msgConsumeOne((msg_t*) pBatch->pElem[i].pUsrp, &propFromHost, &propFromHostIP);
+ ruleset.ProcessMsg((msg_t*) pBatch->pElem[i].pUsrp);
pBatch->pElem[i].state = BATCH_STATE_COMM;
}
- if(propFromHost != NULL)
- prop.Destruct(&propFromHost);
- if(propFromHostIP != NULL)
- prop.Destruct(&propFromHostIP);
RETiRet;
}