summaryrefslogtreecommitdiffstats
path: root/runtime/ruleset.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-06-15 14:02:34 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-06-15 14:02:34 +0200
commit802f6d8a8f39e5ba578e0183e4500bef8e3a198c (patch)
tree6ebcee0347ee0f705edd0b1069bec3f49eb36060 /runtime/ruleset.c
parentfe8d317c1b40fe162891d5ddec1cb7df702bb7fe (diff)
downloadrsyslog-802f6d8a8f39e5ba578e0183e4500bef8e3a198c.tar.gz
rsyslog-802f6d8a8f39e5ba578e0183e4500bef8e3a198c.tar.xz
rsyslog-802f6d8a8f39e5ba578e0183e4500bef8e3a198c.zip
milestone(BUGGY): batch now pushed down to action
at least in important cases (not for non-direct action queues and some other minor things). This version is definitely buggy, but may be tried with success on a non-production system. I will continue to work on the correctness, but needed to commit now to get a baseline.
Diffstat (limited to 'runtime/ruleset.c')
-rw-r--r--runtime/ruleset.c75
1 files changed, 68 insertions, 7 deletions
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index caeb9357..31c2e1a7 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -60,6 +60,9 @@ linkedList_t llRulesets; /* this is NOT a pointer - no typo here ;) */
ruleset_t *pCurrRuleset = NULL; /* currently "active" ruleset */
ruleset_t *pDfltRuleset = NULL; /* current default ruleset, e.g. for binding to actions which have no other */
+/* forward definitions */
+static rsRetVal processBatch(batch_t *pBatch);
+
/* ---------- linked-list key handling functions ---------- */
/* destructor for linked list keys.
@@ -149,6 +152,69 @@ dbgprintf("ruleset: get iRet %d from rule.ProcessMsg()\n", iRet);
}
+
+/* This function is similar to processBatch(), but works on a batch that
+ * contains rules from multiple rulesets. In this case, we can not push
+ * the whole batch through the ruleset. Instead, we examine it and
+ * partition it into sub-rulesets which we then push through the system.
+ * Note that when we evaluate which message must be processed, we do NOT need
+ * to look at bFilterOK, because this value is only set in a later processing
+ * stage. Doing so caused a bug during development ;)
+ * rgerhards, 2010-06-15
+ */
+static inline rsRetVal
+processBatchMultiRuleset(batch_t *pBatch)
+{
+ ruleset_t *currRuleset;
+ batch_t snglRuleBatch;
+ int i;
+ int iStart; /* start index of partial batch */
+ int iNew; /* index for new (temporary) batch */
+ DEFiRet;
+
+ CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem));
+ snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate;
+
+dbgprintf("ZZZ: multi-ruleset batch of %d elements must be processed\n", pBatch->nElem);
+ while(1) { /* loop broken inside */
+ /* search for first unprocessed element */
+ for(iStart = 0 ; iStart < pBatch->nElem && pBatch->pElem[iStart].state == BATCH_STATE_DISC ; ++iStart)
+ /* just search, no action */;
+
+ if(iStart == pBatch->nElem)
+ FINALIZE; /* everything processed */
+
+ /* prepare temporary batch */
+ currRuleset = batchElemGetRuleset(pBatch, iStart);
+ iNew = 0;
+ for(i = iStart ; i < pBatch->nElem ; ++i) {
+ if(batchElemGetRuleset(pBatch, i) == currRuleset) {
+dbgprintf("ZZZ: proc elem %d:'%s'\n", i, ((msg_t*)(pBatch->pElem[i].pUsrp))->szRawMsg+15);
+ batchCopyElem(&(snglRuleBatch.pElem[iNew++]), &(pBatch->pElem[i]));
+ /* We indicate the element also as done, so it will not be processed again */
+ pBatch->pElem[i].state = BATCH_STATE_DISC;
+ }
+ }
+ snglRuleBatch.nElem = iNew; /* was left just right by the for loop */
+ batchSetSingleRuleset(&snglRuleBatch, 1);
+ /* process temp batch */
+ processBatch(&snglRuleBatch);
+
+#if 0
+for(i = iStart ; i < pBatch->nElem ; ++i) {
+ dbgprintf("ZZZ: after partial execution item %d state %d\n", i, pBatch->pElem[i].state);
+}
+//dbgprintf("ZZZ: search item %d: state %d, bFilterOK %d, IsValid %d, msg:%s\n",
+//iStart, pBatch->pElem[iStart].state, pBatch->pElem[iStart].bFilterOK, batchIsValidElem(pBatch, iStart),
+//((msg_t*)(pBatch->pElem[iStart].pUsrp))->szRawMsg+40);
+#endif
+ }
+ batchFree(&snglRuleBatch);
+
+finalize_it:
+ RETiRet;
+}
+
/* Process (consume) a batch of messages. Calls the actions configured.
* If the whole batch uses a singel ruleset, we can process the batch as
* a whole. Otherwise, we need to process it slower, on a message-by-message
@@ -162,6 +228,7 @@ processBatch(batch_t *pBatch)
DEFiRet;
assert(pBatch != NULL);
+dbgprintf("ZZZ: processBatch: batch of %d elements must be processed\n", pBatch->nElem);
if(pBatch->bSingleRuleset) {
pThis = batchGetRuleset(pBatch);
if(pThis == NULL)
@@ -169,13 +236,7 @@ processBatch(batch_t *pBatch)
ISOBJ_TYPE_assert(pThis, ruleset);
CHKiRet(llExecFunc(&pThis->llRules, processBatchDoRules, pBatch));
} else {
- #warning implementation missing!
- /* we need to split of the batch according to rulesets used */
- // TODO: do this at the deque level, much more performant!
- assert(0); // TODO mandatory to implement!
- dbgprintf("processbatch missing implementation, terminating!\n");
- printf("processBatch missing implementation, terminating!\n");
- exit(0);
+ CHKiRet(processBatchMultiRuleset(pBatch));
}
finalize_it: