diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-09 15:37:00 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-09 15:37:00 +0200 |
commit | 8fbcea483710faae468ecf0ba706adc7e60ed41d (patch) | |
tree | bf69b277de872fc88a53fd2a43b9b21f48efd90d | |
parent | 395660f462c62029f76b99f73bd9a424a8cf73a2 (diff) | |
download | rsyslog-8fbcea483710faae468ecf0ba706adc7e60ed41d.tar.gz rsyslog-8fbcea483710faae468ecf0ba706adc7e60ed41d.tar.xz rsyslog-8fbcea483710faae468ecf0ba706adc7e60ed41d.zip |
main msg q consumer now preprocesses messages before doing rule processing
things like ACL check and message parsing. This leads to a greater level
of concurrent processing. Beware, though, that this commit duplicates
some messages. May be a regression from this or an earlier commit. I will
soon sort out.
-rw-r--r-- | action.c | 16 | ||||
-rw-r--r-- | doc/msgflow.txt | 6 | ||||
-rw-r--r-- | runtime/rule.c | 1 | ||||
-rw-r--r-- | tools/syslogd.c | 72 |
4 files changed, 52 insertions, 43 deletions
@@ -1140,20 +1140,8 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT } -/* rgerhards 2004-11-09: fprintlog() is the actual driver for - * the output channel. It receives the channel description (f) as - * well as the message and outputs them according to the channel - * semantics. The message is typically already contained in the - * channel save buffer (f->f_prevline). This is not only the case - * when a message was already repeated but also when a new message - * arrived. - * rgerhards 2007-08-01: interface changed to use action_t - * rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE - * CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do - * not do this here. Failing to do so results in all kinds of - * "interesting" problems! - * RGERHARDS, 2008-01-29: - * This is now the action caller and has been renamed. +/* This function builds up a batch of messages to be (later) + * submitted to the action queue. */ rsRetVal actionWriteToAction(action_t *pAction) diff --git a/doc/msgflow.txt b/doc/msgflow.txt index c1c440ef..b53ba7e7 100644 --- a/doc/msgflow.txt +++ b/doc/msgflow.txt @@ -18,11 +18,11 @@ syslogd.c/msgConsumeOne parser.ParseMsg ruleset.ProcessMsg (loops through ruleset) ruleset.c/processMsgDoRules (for each rule in ruleset) -rule.c/ProcessMsg -rule.c/shouldProcessThisMessage +rule.c/processMsg +1:rule.c/shouldProcessThisMessage (evaluates filters, optimize via ALL-Filter) if to be processed, loop through associated actions -> -rule.c/processMsgsDoAction +2:rule.c/processMsgsDoAction action.c/actionCallAction (LOCKs action object!) action.c/doActionCallAction (does duplicate message reduction) action.c/actionWriteToAction diff --git a/runtime/rule.c b/runtime/rule.c index 7a26a03a..3b98d7d1 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -166,7 +166,6 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg) } } -RUNLOG_VAR("%p", pRule->pCSProgNameComp); if(pRule->pCSProgNameComp != NULL) { int bInv = 0, bEqv = 0, offset = 0; if(*(rsCStrGetSzStrNoNULL(pRule->pCSProgNameComp)) == '-') { diff --git a/tools/syslogd.c b/tools/syslogd.c index 94e8346d..2c36e6c2 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -627,39 +627,66 @@ chkMsgAgainstACL() { * (by definition!) considered committed. * rgerhards, 2009-11-16 */ +///static inline rsRetVal +///msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) { + ///DEFiRet; + //////RETiRet; +///} + +/* preprocess a batch of messages, that is ready them for actual processing. This is done + * as a first stage and totally in parallel to any other worker active in the system. So + * it helps us keep up the overall concurrency level. + * rgerhards, 2010-06-09 + */ static inline rsRetVal -msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) { +preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) { uchar fromHost[NI_MAXHOST]; uchar fromHostIP[NI_MAXHOST]; uchar fromHostFQDN[NI_MAXHOST]; + prop_t *propFromHost = NULL; + prop_t *propFromHostIP = NULL; int bIsPermitted; + msg_t *pMsg; + int i; + rsRetVal localRet; DEFiRet; - if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) { - dbgprintf("msgConsumer: UDP ACL must be checked for message (hostname-based)\n"); - CHKiRet(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP)); - bIsPermitted = net.isAllowedSender2((uchar*)"UDP", - (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1); - if(!bIsPermitted) { - DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n", - fromHostFQDN); - ABORT_FINALIZE(RS_RET_ERR); - /* save some of the info we obtained */ - MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), propFromHost); - CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), propFromHostIP)); - pMsg->msgFlags &= ~NEEDS_ACLCHK_U; + for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { + pMsg = (msg_t*) pBatch->pElem[i].pUsrp; + if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) { + DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n"); + if(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP) != RS_RET_OK) + continue; + bIsPermitted = net.isAllowedSender2((uchar*)"UDP", + (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1); + if(!bIsPermitted) { + DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n", + fromHostFQDN); + pBatch->pElem[i].state = BATCH_STATE_DISC; + } else { + /* save some of the info we obtained */ + MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost); + CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP)); + pMsg->msgFlags &= ~NEEDS_ACLCHK_U; + } + } + if((pMsg->msgFlags & NEEDS_PARSING) != 0) { + if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { + DBGPRINTF("Message discarded, parsing error %d\n", localRet); + pBatch->pElem[i].state = BATCH_STATE_DISC; + } } } - if((pMsg->msgFlags & NEEDS_PARSING) != 0) - CHKiRet(parser.ParseMsg(pMsg)); - ruleset.ProcessMsg(pMsg); finalize_it: + if(propFromHost != NULL) + prop.Destruct(&propFromHost); + if(propFromHostIP != NULL) + prop.Destruct(&propFromHostIP); RETiRet; } - /* The consumer of dequeued messages. This function is called by the * queue engine on dequeueing of a message. It runs on a SEPARATE * THREAD. It receives an array of pointers, which it must iterate @@ -670,22 +697,17 @@ static rsRetVal msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate) { int i; - prop_t *propFromHost = NULL; - prop_t *propFromHostIP = NULL; DEFiRet; assert(pBatch != NULL); + preprocessBatch(pBatch, pbShutdownImmediate); for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { DBGPRINTF("msgConsumer processes msg %d/%d\n", i, pBatch->nElem); - msgConsumeOne((msg_t*) pBatch->pElem[i].pUsrp, &propFromHost, &propFromHostIP); + ruleset.ProcessMsg((msg_t*) pBatch->pElem[i].pUsrp); pBatch->pElem[i].state = BATCH_STATE_COMM; } - if(propFromHost != NULL) - prop.Destruct(&propFromHost); - if(propFromHostIP != NULL) - prop.Destruct(&propFromHostIP); RETiRet; } |