diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-10 14:36:49 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-10 14:36:49 +0200 |
commit | fe8d317c1b40fe162891d5ddec1cb7df702bb7fe (patch) | |
tree | f001cb3ebd5cc81c14cbec090ca204df42763c7e /runtime/rule.c | |
parent | 74135da95971c8d03e4e45f7d3f703e1d40f76f4 (diff) | |
download | rsyslog-fe8d317c1b40fe162891d5ddec1cb7df702bb7fe.tar.gz rsyslog-fe8d317c1b40fe162891d5ddec1cb7df702bb7fe.tar.xz rsyslog-fe8d317c1b40fe162891d5ddec1cb7df702bb7fe.zip |
milestone commit(BUGGY): batch is now handed down to rule processing
Now, the full batch is passed down to the rule, which then enqueues
the elements as single messages. Note that this code has some known
defects and needs more changes until it is correct again. This is
primarily a commit to be able to return to a known-(somewhat)-good
state.
Diffstat (limited to 'runtime/rule.c')
-rw-r--r-- | runtime/rule.c | 48 |
1 files changed, 28 insertions, 20 deletions
diff --git a/runtime/rule.c b/runtime/rule.c index 3b98d7d1..c28e15c9 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -39,6 +39,7 @@ #include "vm.h" #include "var.h" #include "srUtils.h" +#include "batch.h" #include "unicode-helper.h" /* static data */ @@ -87,30 +88,38 @@ iterateAllActions(rule_t *pThis, rsRetVal (*pFunc)(void*, void*), void* pParam) } - /* helper to processMsg(), used to call the configured actions. It is * executed from within llExecFunc() of the action list. * rgerhards, 2007-08-02 */ -typedef struct processMsgDoActions_s { - int bPrevWasSuspended; /* was the previous action suspended? */ - msg_t *pMsg; -} processMsgDoActions_t; -DEFFUNC_llExecFunc(processMsgDoActions) +DEFFUNC_llExecFunc(processBatchDoActions) { DEFiRet; rsRetVal iRetMod; /* return value of module - we do not always pass that back */ action_t *pAction = (action_t*) pData; - processMsgDoActions_t *pDoActData = (processMsgDoActions_t*) pParam; + batch_t *pBatch = (batch_t*) pParam; assert(pAction != NULL); +#if 0 // TODO: move this to the action object if((pAction->bExecWhenPrevSusp == 1) && (pDoActData->bPrevWasSuspended == 0)) { dbgprintf("not calling action because the previous one is not suspended\n"); ABORT_FINALIZE(RS_RET_OK); } +#endif - iRetMod = pAction->submitToActQ(pAction, pDoActData->pMsg); + // NEW (potentially): iRetMod = actionSubmit(pAction, (batch_t*) pParam); + // old code -- milestone check +dbgprintf("ZZZ: inside processBatchDoActions, begin processing (nElem=%d)\n", batchNumMsgs(pBatch)); + int i; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { +dbgprintf("ZZZ: inside processBatchDoActions, processind elem %d/%d\n", i, batchNumMsgs(pBatch)); + if(pBatch->pElem[i].bFilterOK) { + iRetMod = pAction->submitToActQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + } + } + //end old code +#if 0 // TODO: this must be done inside the action as well! if(iRetMod == RS_RET_DISCARDMSG) { ABORT_FINALIZE(RS_RET_DISCARDMSG); } else if(iRetMod == RS_RET_SUSPENDED) { @@ -119,8 +128,8 @@ DEFFUNC_llExecFunc(processMsgDoActions) } else { pDoActData->bPrevWasSuspended = 0; } +#endif -finalize_it: RETiRet; } @@ -129,7 +138,7 @@ finalize_it: * provided filter condition. */ static rsRetVal -shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg) +shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, sbool *bProcessMsg) { DEFiRet; unsigned short pbMustBeFreed; @@ -278,26 +287,25 @@ finalize_it: -/* Process (consume) a received message. Calls the actions configured. +/* Process (consume) a batch of messages. Calls the actions configured. * rgerhards, 2005-10-13 */ static rsRetVal -processMsg(rule_t *pThis, msg_t *pMsg) +processBatch(rule_t *pThis, batch_t *pBatch) { - int bProcessMsg; - processMsgDoActions_t DoActData; + int i; DEFiRet; ISOBJ_TYPE_assert(pThis, rule); assert(pMsg != NULL); /* first check the filters... */ - CHKiRet(shouldProcessThisMessage(pThis, pMsg, &bProcessMsg)); - if(bProcessMsg) { - DoActData.pMsg = pMsg; - DoActData.bPrevWasSuspended = 0; - CHKiRet(llExecFunc(&pThis->llActList, processMsgDoActions, (void*)&DoActData)); + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + CHKiRet(shouldProcessThisMessage(pThis, (msg_t*)(pBatch->pElem[i].pUsrp), + &(pBatch->pElem[i].bFilterOK))); + // TODO: really abort on error? 2010-06-10 } + CHKiRet(llExecFunc(&pThis->llActList, processBatchDoActions, pBatch)); finalize_it: RETiRet; @@ -440,7 +448,7 @@ CODESTARTobjQueryInterface(rule) pIf->DebugPrint = ruleDebugPrint; pIf->IterateAllActions = iterateAllActions; - pIf->ProcessMsg = processMsg; + pIf->ProcessBatch = processBatch; pIf->SetAssRuleset = setAssRuleset; pIf->GetAssRuleset = getAssRuleset; finalize_it: |