diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-15 14:02:34 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-15 14:02:34 +0200 |
commit | 802f6d8a8f39e5ba578e0183e4500bef8e3a198c (patch) | |
tree | 6ebcee0347ee0f705edd0b1069bec3f49eb36060 /runtime/ruleset.c | |
parent | fe8d317c1b40fe162891d5ddec1cb7df702bb7fe (diff) | |
download | rsyslog-802f6d8a8f39e5ba578e0183e4500bef8e3a198c.tar.gz rsyslog-802f6d8a8f39e5ba578e0183e4500bef8e3a198c.tar.xz rsyslog-802f6d8a8f39e5ba578e0183e4500bef8e3a198c.zip |
milestone(BUGGY): batch now pushed down to action
at least in important cases (not for non-direct action queues and some
other minor things). This version is definitely buggy, but may be tried
with success on a non-production system. I will continue to work on the
correctness, but needed to commit now to get a baseline.
Diffstat (limited to 'runtime/ruleset.c')
-rw-r--r-- | runtime/ruleset.c | 75 |
1 files changed, 68 insertions, 7 deletions
diff --git a/runtime/ruleset.c b/runtime/ruleset.c index caeb9357..31c2e1a7 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -60,6 +60,9 @@ linkedList_t llRulesets; /* this is NOT a pointer - no typo here ;) */ ruleset_t *pCurrRuleset = NULL; /* currently "active" ruleset */ ruleset_t *pDfltRuleset = NULL; /* current default ruleset, e.g. for binding to actions which have no other */ +/* forward definitions */ +static rsRetVal processBatch(batch_t *pBatch); + /* ---------- linked-list key handling functions ---------- */ /* destructor for linked list keys. @@ -149,6 +152,69 @@ dbgprintf("ruleset: get iRet %d from rule.ProcessMsg()\n", iRet); } + +/* This function is similar to processBatch(), but works on a batch that + * contains rules from multiple rulesets. In this case, we can not push + * the whole batch through the ruleset. Instead, we examine it and + * partition it into sub-rulesets which we then push through the system. + * Note that when we evaluate which message must be processed, we do NOT need + * to look at bFilterOK, because this value is only set in a later processing + * stage. Doing so caused a bug during development ;) + * rgerhards, 2010-06-15 + */ +static inline rsRetVal +processBatchMultiRuleset(batch_t *pBatch) +{ + ruleset_t *currRuleset; + batch_t snglRuleBatch; + int i; + int iStart; /* start index of partial batch */ + int iNew; /* index for new (temporary) batch */ + DEFiRet; + + CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem)); + snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate; + +dbgprintf("ZZZ: multi-ruleset batch of %d elements must be processed\n", pBatch->nElem); + while(1) { /* loop broken inside */ + /* search for first unprocessed element */ + for(iStart = 0 ; iStart < pBatch->nElem && pBatch->pElem[iStart].state == BATCH_STATE_DISC ; ++iStart) + /* just search, no action */; + + if(iStart == pBatch->nElem) + FINALIZE; /* everything processed */ + + /* prepare temporary batch */ + currRuleset = batchElemGetRuleset(pBatch, iStart); + iNew = 0; + for(i = iStart ; i < pBatch->nElem ; ++i) { + if(batchElemGetRuleset(pBatch, i) == currRuleset) { +dbgprintf("ZZZ: proc elem %d:'%s'\n", i, ((msg_t*)(pBatch->pElem[i].pUsrp))->szRawMsg+15); + batchCopyElem(&(snglRuleBatch.pElem[iNew++]), &(pBatch->pElem[i])); + /* We indicate the element also as done, so it will not be processed again */ + pBatch->pElem[i].state = BATCH_STATE_DISC; + } + } + snglRuleBatch.nElem = iNew; /* was left just right by the for loop */ + batchSetSingleRuleset(&snglRuleBatch, 1); + /* process temp batch */ + processBatch(&snglRuleBatch); + +#if 0 +for(i = iStart ; i < pBatch->nElem ; ++i) { + dbgprintf("ZZZ: after partial execution item %d state %d\n", i, pBatch->pElem[i].state); +} +//dbgprintf("ZZZ: search item %d: state %d, bFilterOK %d, IsValid %d, msg:%s\n", +//iStart, pBatch->pElem[iStart].state, pBatch->pElem[iStart].bFilterOK, batchIsValidElem(pBatch, iStart), +//((msg_t*)(pBatch->pElem[iStart].pUsrp))->szRawMsg+40); +#endif + } + batchFree(&snglRuleBatch); + +finalize_it: + RETiRet; +} + /* Process (consume) a batch of messages. Calls the actions configured. * If the whole batch uses a singel ruleset, we can process the batch as * a whole. Otherwise, we need to process it slower, on a message-by-message @@ -162,6 +228,7 @@ processBatch(batch_t *pBatch) DEFiRet; assert(pBatch != NULL); +dbgprintf("ZZZ: processBatch: batch of %d elements must be processed\n", pBatch->nElem); if(pBatch->bSingleRuleset) { pThis = batchGetRuleset(pBatch); if(pThis == NULL) @@ -169,13 +236,7 @@ processBatch(batch_t *pBatch) ISOBJ_TYPE_assert(pThis, ruleset); CHKiRet(llExecFunc(&pThis->llRules, processBatchDoRules, pBatch)); } else { - #warning implementation missing! - /* we need to split of the batch according to rulesets used */ - // TODO: do this at the deque level, much more performant! - assert(0); // TODO mandatory to implement! - dbgprintf("processbatch missing implementation, terminating!\n"); - printf("processBatch missing implementation, terminating!\n"); - exit(0); + CHKiRet(processBatchMultiRuleset(pBatch)); } finalize_it: |