diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-10 10:18:59 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-10 10:18:59 +0200 |
commit | d630bc742f2f0b6a29c745bba743ecb8a03033c6 (patch) | |
tree | 4c8a70641fb757808c2fd0cb5a0c257cb2c0eeb6 /tools | |
parent | 559cb84a79a9848ce1415569158928478991108c (diff) | |
parent | 8fbcea483710faae468ecf0ba706adc7e60ed41d (diff) | |
download | rsyslog-d630bc742f2f0b6a29c745bba743ecb8a03033c6.tar.gz rsyslog-d630bc742f2f0b6a29c745bba743ecb8a03033c6.tar.xz rsyslog-d630bc742f2f0b6a29c745bba743ecb8a03033c6.zip |
Merge branch 'concurrent-output' into tmp
Diffstat (limited to 'tools')
-rw-r--r-- | tools/syslogd.c | 74 |
1 files changed, 48 insertions, 26 deletions
diff --git a/tools/syslogd.c b/tools/syslogd.c index dfbd184b..2c36e6c2 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -627,39 +627,66 @@ chkMsgAgainstACL() { * (by definition!) considered committed. * rgerhards, 2009-11-16 */ +///static inline rsRetVal +///msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) { + ///DEFiRet; + //////RETiRet; +///} + +/* preprocess a batch of messages, that is ready them for actual processing. This is done + * as a first stage and totally in parallel to any other worker active in the system. So + * it helps us keep up the overall concurrency level. + * rgerhards, 2010-06-09 + */ static inline rsRetVal -msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) { +preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) { uchar fromHost[NI_MAXHOST]; uchar fromHostIP[NI_MAXHOST]; uchar fromHostFQDN[NI_MAXHOST]; + prop_t *propFromHost = NULL; + prop_t *propFromHostIP = NULL; int bIsPermitted; + msg_t *pMsg; + int i; + rsRetVal localRet; DEFiRet; - if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) { - dbgprintf("msgConsumer: UDP ACL must be checked for message (hostname-based)\n"); - CHKiRet(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP)); - bIsPermitted = net.isAllowedSender2((uchar*)"UDP", - (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1); - if(!bIsPermitted) { - DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n", - fromHostFQDN); - ABORT_FINALIZE(RS_RET_ERR); - /* save some of the info we obtained */ - MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), propFromHost); - CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), propFromHostIP)); - pMsg->msgFlags &= ~NEEDS_ACLCHK_U; + for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { + pMsg = (msg_t*) pBatch->pElem[i].pUsrp; + if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) { + DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n"); + if(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP) != RS_RET_OK) + continue; + bIsPermitted = net.isAllowedSender2((uchar*)"UDP", + (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1); + if(!bIsPermitted) { + DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n", + fromHostFQDN); + pBatch->pElem[i].state = BATCH_STATE_DISC; + } else { + /* save some of the info we obtained */ + MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost); + CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP)); + pMsg->msgFlags &= ~NEEDS_ACLCHK_U; + } + } + if((pMsg->msgFlags & NEEDS_PARSING) != 0) { + if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { + DBGPRINTF("Message discarded, parsing error %d\n", localRet); + pBatch->pElem[i].state = BATCH_STATE_DISC; + } } } - if((pMsg->msgFlags & NEEDS_PARSING) != 0) - CHKiRet(parser.ParseMsg(pMsg)); - ruleset.ProcessMsg(pMsg); finalize_it: + if(propFromHost != NULL) + prop.Destruct(&propFromHost); + if(propFromHostIP != NULL) + prop.Destruct(&propFromHostIP); RETiRet; } - /* The consumer of dequeued messages. This function is called by the * queue engine on dequeueing of a message. It runs on a SEPARATE * THREAD. It receives an array of pointers, which it must iterate @@ -670,22 +697,17 @@ static rsRetVal msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate) { int i; - prop_t *propFromHost = NULL; - prop_t *propFromHostIP = NULL; DEFiRet; assert(pBatch != NULL); + preprocessBatch(pBatch, pbShutdownImmediate); for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { DBGPRINTF("msgConsumer processes msg %d/%d\n", i, pBatch->nElem); - msgConsumeOne((msg_t*) pBatch->pElem[i].pUsrp, &propFromHost, &propFromHostIP); + ruleset.ProcessMsg((msg_t*) pBatch->pElem[i].pUsrp); pBatch->pElem[i].state = BATCH_STATE_COMM; } - if(propFromHost != NULL) - prop.Destruct(&propFromHost); - if(propFromHostIP != NULL) - prop.Destruct(&propFromHostIP); RETiRet; } @@ -735,7 +757,7 @@ multiSubmitMsg(multi_submit_t *pMultiSub) pRuleset = MsgGetRuleset(pMultiSub->ppMsgs[0]); pQueue = (pRuleset == NULL) ? pMsgQueue : ruleset.GetRulesetQueue(pRuleset); - iRet = qqueueMultiEnqObj(pQueue, pMultiSub); + iRet = pQueue->MultiEnq(pQueue, pMultiSub); pMultiSub->nElem = 0; finalize_it: |