summaryrefslogtreecommitdiffstats
path: root/tools
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-06-09 15:37:00 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-06-09 15:37:00 +0200
commit8fbcea483710faae468ecf0ba706adc7e60ed41d (patch)
treebf69b277de872fc88a53fd2a43b9b21f48efd90d /tools
parent395660f462c62029f76b99f73bd9a424a8cf73a2 (diff)
downloadrsyslog-8fbcea483710faae468ecf0ba706adc7e60ed41d.tar.gz
rsyslog-8fbcea483710faae468ecf0ba706adc7e60ed41d.tar.xz
rsyslog-8fbcea483710faae468ecf0ba706adc7e60ed41d.zip
main msg q consumer now preprocesses messages before doing rule processing
things like ACL check and message parsing. This leads to a greater level of concurrent processing. Beware, though, that this commit duplicates some messages. May be a regression from this or an earlier commit. I will soon sort out.
Diffstat (limited to 'tools')
-rw-r--r--tools/syslogd.c72
1 files changed, 47 insertions, 25 deletions
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 94e8346d..2c36e6c2 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -627,39 +627,66 @@ chkMsgAgainstACL() {
* (by definition!) considered committed.
* rgerhards, 2009-11-16
*/
+///static inline rsRetVal
+///msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) {
+ ///DEFiRet;
+ //////RETiRet;
+///}
+
+/* preprocess a batch of messages, that is ready them for actual processing. This is done
+ * as a first stage and totally in parallel to any other worker active in the system. So
+ * it helps us keep up the overall concurrency level.
+ * rgerhards, 2010-06-09
+ */
static inline rsRetVal
-msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) {
+preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) {
uchar fromHost[NI_MAXHOST];
uchar fromHostIP[NI_MAXHOST];
uchar fromHostFQDN[NI_MAXHOST];
+ prop_t *propFromHost = NULL;
+ prop_t *propFromHostIP = NULL;
int bIsPermitted;
+ msg_t *pMsg;
+ int i;
+ rsRetVal localRet;
DEFiRet;
- if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) {
- dbgprintf("msgConsumer: UDP ACL must be checked for message (hostname-based)\n");
- CHKiRet(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP));
- bIsPermitted = net.isAllowedSender2((uchar*)"UDP",
- (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1);
- if(!bIsPermitted) {
- DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n",
- fromHostFQDN);
- ABORT_FINALIZE(RS_RET_ERR);
- /* save some of the info we obtained */
- MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), propFromHost);
- CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), propFromHostIP));
- pMsg->msgFlags &= ~NEEDS_ACLCHK_U;
+ for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
+ pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
+ if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) {
+ DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n");
+ if(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP) != RS_RET_OK)
+ continue;
+ bIsPermitted = net.isAllowedSender2((uchar*)"UDP",
+ (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1);
+ if(!bIsPermitted) {
+ DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n",
+ fromHostFQDN);
+ pBatch->pElem[i].state = BATCH_STATE_DISC;
+ } else {
+ /* save some of the info we obtained */
+ MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost);
+ CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP));
+ pMsg->msgFlags &= ~NEEDS_ACLCHK_U;
+ }
+ }
+ if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
+ if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
+ DBGPRINTF("Message discarded, parsing error %d\n", localRet);
+ pBatch->pElem[i].state = BATCH_STATE_DISC;
+ }
}
}
- if((pMsg->msgFlags & NEEDS_PARSING) != 0)
- CHKiRet(parser.ParseMsg(pMsg));
- ruleset.ProcessMsg(pMsg);
finalize_it:
+ if(propFromHost != NULL)
+ prop.Destruct(&propFromHost);
+ if(propFromHostIP != NULL)
+ prop.Destruct(&propFromHostIP);
RETiRet;
}
-
/* The consumer of dequeued messages. This function is called by the
* queue engine on dequeueing of a message. It runs on a SEPARATE
* THREAD. It receives an array of pointers, which it must iterate
@@ -670,22 +697,17 @@ static rsRetVal
msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate)
{
int i;
- prop_t *propFromHost = NULL;
- prop_t *propFromHostIP = NULL;
DEFiRet;
assert(pBatch != NULL);
+ preprocessBatch(pBatch, pbShutdownImmediate);
for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
DBGPRINTF("msgConsumer processes msg %d/%d\n", i, pBatch->nElem);
- msgConsumeOne((msg_t*) pBatch->pElem[i].pUsrp, &propFromHost, &propFromHostIP);
+ ruleset.ProcessMsg((msg_t*) pBatch->pElem[i].pUsrp);
pBatch->pElem[i].state = BATCH_STATE_COMM;
}
- if(propFromHost != NULL)
- prop.Destruct(&propFromHost);
- if(propFromHostIP != NULL)
- prop.Destruct(&propFromHostIP);
RETiRet;
}