summaryrefslogtreecommitdiffstats
path: root/tools
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-06-10 10:18:59 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-06-10 10:18:59 +0200
commitd630bc742f2f0b6a29c745bba743ecb8a03033c6 (patch)
tree4c8a70641fb757808c2fd0cb5a0c257cb2c0eeb6 /tools
parent559cb84a79a9848ce1415569158928478991108c (diff)
parent8fbcea483710faae468ecf0ba706adc7e60ed41d (diff)
downloadrsyslog-d630bc742f2f0b6a29c745bba743ecb8a03033c6.tar.gz
rsyslog-d630bc742f2f0b6a29c745bba743ecb8a03033c6.tar.xz
rsyslog-d630bc742f2f0b6a29c745bba743ecb8a03033c6.zip
Merge branch 'concurrent-output' into tmp
Diffstat (limited to 'tools')
-rw-r--r--tools/syslogd.c74
1 files changed, 48 insertions, 26 deletions
diff --git a/tools/syslogd.c b/tools/syslogd.c
index dfbd184b..2c36e6c2 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -627,39 +627,66 @@ chkMsgAgainstACL() {
* (by definition!) considered committed.
* rgerhards, 2009-11-16
*/
+///static inline rsRetVal
+///msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) {
+ ///DEFiRet;
+ //////RETiRet;
+///}
+
+/* preprocess a batch of messages, that is ready them for actual processing. This is done
+ * as a first stage and totally in parallel to any other worker active in the system. So
+ * it helps us keep up the overall concurrency level.
+ * rgerhards, 2010-06-09
+ */
static inline rsRetVal
-msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) {
+preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) {
uchar fromHost[NI_MAXHOST];
uchar fromHostIP[NI_MAXHOST];
uchar fromHostFQDN[NI_MAXHOST];
+ prop_t *propFromHost = NULL;
+ prop_t *propFromHostIP = NULL;
int bIsPermitted;
+ msg_t *pMsg;
+ int i;
+ rsRetVal localRet;
DEFiRet;
- if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) {
- dbgprintf("msgConsumer: UDP ACL must be checked for message (hostname-based)\n");
- CHKiRet(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP));
- bIsPermitted = net.isAllowedSender2((uchar*)"UDP",
- (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1);
- if(!bIsPermitted) {
- DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n",
- fromHostFQDN);
- ABORT_FINALIZE(RS_RET_ERR);
- /* save some of the info we obtained */
- MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), propFromHost);
- CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), propFromHostIP));
- pMsg->msgFlags &= ~NEEDS_ACLCHK_U;
+ for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
+ pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
+ if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) {
+ DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n");
+ if(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP) != RS_RET_OK)
+ continue;
+ bIsPermitted = net.isAllowedSender2((uchar*)"UDP",
+ (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1);
+ if(!bIsPermitted) {
+ DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n",
+ fromHostFQDN);
+ pBatch->pElem[i].state = BATCH_STATE_DISC;
+ } else {
+ /* save some of the info we obtained */
+ MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost);
+ CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP));
+ pMsg->msgFlags &= ~NEEDS_ACLCHK_U;
+ }
+ }
+ if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
+ if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
+ DBGPRINTF("Message discarded, parsing error %d\n", localRet);
+ pBatch->pElem[i].state = BATCH_STATE_DISC;
+ }
}
}
- if((pMsg->msgFlags & NEEDS_PARSING) != 0)
- CHKiRet(parser.ParseMsg(pMsg));
- ruleset.ProcessMsg(pMsg);
finalize_it:
+ if(propFromHost != NULL)
+ prop.Destruct(&propFromHost);
+ if(propFromHostIP != NULL)
+ prop.Destruct(&propFromHostIP);
RETiRet;
}
-
/* The consumer of dequeued messages. This function is called by the
* queue engine on dequeueing of a message. It runs on a SEPARATE
* THREAD. It receives an array of pointers, which it must iterate
@@ -670,22 +697,17 @@ static rsRetVal
msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate)
{
int i;
- prop_t *propFromHost = NULL;
- prop_t *propFromHostIP = NULL;
DEFiRet;
assert(pBatch != NULL);
+ preprocessBatch(pBatch, pbShutdownImmediate);
for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
DBGPRINTF("msgConsumer processes msg %d/%d\n", i, pBatch->nElem);
- msgConsumeOne((msg_t*) pBatch->pElem[i].pUsrp, &propFromHost, &propFromHostIP);
+ ruleset.ProcessMsg((msg_t*) pBatch->pElem[i].pUsrp);
pBatch->pElem[i].state = BATCH_STATE_COMM;
}
- if(propFromHost != NULL)
- prop.Destruct(&propFromHost);
- if(propFromHostIP != NULL)
- prop.Destruct(&propFromHostIP);
RETiRet;
}
@@ -735,7 +757,7 @@ multiSubmitMsg(multi_submit_t *pMultiSub)
pRuleset = MsgGetRuleset(pMultiSub->ppMsgs[0]);
pQueue = (pRuleset == NULL) ? pMsgQueue : ruleset.GetRulesetQueue(pRuleset);
- iRet = qqueueMultiEnqObj(pQueue, pMultiSub);
+ iRet = pQueue->MultiEnq(pQueue, pMultiSub);
pMultiSub->nElem = 0;
finalize_it: