diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/batch.h | 28 | ||||
-rw-r--r-- | runtime/rule.c | 48 | ||||
-rw-r--r-- | runtime/rule.h | 5 | ||||
-rw-r--r-- | runtime/ruleset.c | 39 | ||||
-rw-r--r-- | runtime/ruleset.h | 4 |
5 files changed, 88 insertions, 36 deletions
diff --git a/runtime/batch.h b/runtime/batch.h index ec257125..1245df11 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -26,6 +26,8 @@ #ifndef BATCH_H_INCLUDED #define BATCH_H_INCLUDED +#include "msg.h" + /* enum for batch states. Actually, we violate a layer here, in that we assume that a batch is used * for action processing. So far, this seems acceptable, the status is simply ignored inside the * main message queue. But over time, it could potentially be useful to split the two. @@ -45,12 +47,18 @@ typedef enum { struct batch_obj_s { obj_t *pUsrp; /* pointer to user object (most often message) */ batch_state_t state; /* associated state */ + /* work variables for action processing; these are reused for each action (or block of + * actions) + */ + sbool bFilterOK; /* work area for filter processing (per action, reused!) */ + sbool bPrevWasSuspended; void *pActParams; /* parameters to be passed to action */ size_t *pLenParams; /* length of the parameter in question */ void *staticActParams[CONF_OMOD_NUMSTRINGS_BUFSIZE]; /* a cache to save malloc(), if not absolutely necessary */ size_t staticLenParams[CONF_OMOD_NUMSTRINGS_BUFSIZE]; /* and the same for the message length (if used) */ + /* end action work variables */ }; /* the batch @@ -72,7 +80,27 @@ struct batch_s { int nElemDeq; /* actual number of elements dequeued (and thus to be deleted) - see comment above! */ int iDoneUpTo; /* all messages below this index have state other than RDY */ qDeqID deqID; /* ID of dequeue operation that generated this batch */ + int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */ + sbool bSingleRuleset; /* do all msgs of this batch use a single ruleset? */ batch_obj_t *pElem; /* batch elements */ }; + +/* some inline functions (we may move this off to an object .. or not) */ +static inline void +batchSetSingleRuleset(batch_t *pBatch, sbool val) { + pBatch->bSingleRuleset = val; +} + +/* get the batches ruleset */ +static inline ruleset_t* +batchGetRuleset(batch_t *pBatch) { + return (pBatch->nElem > 0) ? ((msg_t*) pBatch->pElem[0].pUsrp)->pRuleset : NULL; +} + +/* get number of msgs for this batch */ +static inline int +batchNumMsgs(batch_t *pBatch) { + return pBatch->nElem; +} #endif /* #ifndef BATCH_H_INCLUDED */ diff --git a/runtime/rule.c b/runtime/rule.c index 3b98d7d1..c28e15c9 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -39,6 +39,7 @@ #include "vm.h" #include "var.h" #include "srUtils.h" +#include "batch.h" #include "unicode-helper.h" /* static data */ @@ -87,30 +88,38 @@ iterateAllActions(rule_t *pThis, rsRetVal (*pFunc)(void*, void*), void* pParam) } - /* helper to processMsg(), used to call the configured actions. It is * executed from within llExecFunc() of the action list. * rgerhards, 2007-08-02 */ -typedef struct processMsgDoActions_s { - int bPrevWasSuspended; /* was the previous action suspended? */ - msg_t *pMsg; -} processMsgDoActions_t; -DEFFUNC_llExecFunc(processMsgDoActions) +DEFFUNC_llExecFunc(processBatchDoActions) { DEFiRet; rsRetVal iRetMod; /* return value of module - we do not always pass that back */ action_t *pAction = (action_t*) pData; - processMsgDoActions_t *pDoActData = (processMsgDoActions_t*) pParam; + batch_t *pBatch = (batch_t*) pParam; assert(pAction != NULL); +#if 0 // TODO: move this to the action object if((pAction->bExecWhenPrevSusp == 1) && (pDoActData->bPrevWasSuspended == 0)) { dbgprintf("not calling action because the previous one is not suspended\n"); ABORT_FINALIZE(RS_RET_OK); } +#endif - iRetMod = pAction->submitToActQ(pAction, pDoActData->pMsg); + // NEW (potentially): iRetMod = actionSubmit(pAction, (batch_t*) pParam); + // old code -- milestone check +dbgprintf("ZZZ: inside processBatchDoActions, begin processing (nElem=%d)\n", batchNumMsgs(pBatch)); + int i; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { +dbgprintf("ZZZ: inside processBatchDoActions, processind elem %d/%d\n", i, batchNumMsgs(pBatch)); + if(pBatch->pElem[i].bFilterOK) { + iRetMod = pAction->submitToActQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + } + } + //end old code +#if 0 // TODO: this must be done inside the action as well! if(iRetMod == RS_RET_DISCARDMSG) { ABORT_FINALIZE(RS_RET_DISCARDMSG); } else if(iRetMod == RS_RET_SUSPENDED) { @@ -119,8 +128,8 @@ DEFFUNC_llExecFunc(processMsgDoActions) } else { pDoActData->bPrevWasSuspended = 0; } +#endif -finalize_it: RETiRet; } @@ -129,7 +138,7 @@ finalize_it: * provided filter condition. */ static rsRetVal -shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg) +shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, sbool *bProcessMsg) { DEFiRet; unsigned short pbMustBeFreed; @@ -278,26 +287,25 @@ finalize_it: -/* Process (consume) a received message. Calls the actions configured. +/* Process (consume) a batch of messages. Calls the actions configured. * rgerhards, 2005-10-13 */ static rsRetVal -processMsg(rule_t *pThis, msg_t *pMsg) +processBatch(rule_t *pThis, batch_t *pBatch) { - int bProcessMsg; - processMsgDoActions_t DoActData; + int i; DEFiRet; ISOBJ_TYPE_assert(pThis, rule); assert(pMsg != NULL); /* first check the filters... */ - CHKiRet(shouldProcessThisMessage(pThis, pMsg, &bProcessMsg)); - if(bProcessMsg) { - DoActData.pMsg = pMsg; - DoActData.bPrevWasSuspended = 0; - CHKiRet(llExecFunc(&pThis->llActList, processMsgDoActions, (void*)&DoActData)); + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + CHKiRet(shouldProcessThisMessage(pThis, (msg_t*)(pBatch->pElem[i].pUsrp), + &(pBatch->pElem[i].bFilterOK))); + // TODO: really abort on error? 2010-06-10 } + CHKiRet(llExecFunc(&pThis->llActList, processBatchDoActions, pBatch)); finalize_it: RETiRet; @@ -440,7 +448,7 @@ CODESTARTobjQueryInterface(rule) pIf->DebugPrint = ruleDebugPrint; pIf->IterateAllActions = iterateAllActions; - pIf->ProcessMsg = processMsg; + pIf->ProcessBatch = processBatch; pIf->SetAssRuleset = setAssRuleset; pIf->GetAssRuleset = getAssRuleset; finalize_it: diff --git a/runtime/rule.h b/runtime/rule.h index 7b607637..309a2ed8 100644 --- a/runtime/rule.h +++ b/runtime/rule.h @@ -64,11 +64,12 @@ BEGINinterface(rule) /* name must also be changed in ENDinterface macro! */ rsRetVal (*ConstructFinalize)(rule_t __attribute__((unused)) *pThis); rsRetVal (*Destruct)(rule_t **ppThis); rsRetVal (*IterateAllActions)(rule_t *pThis, rsRetVal (*pFunc)(void*, void*), void *pParam); - rsRetVal (*ProcessMsg)(rule_t *pThis, msg_t *pMsg); + rsRetVal (*ProcessBatch)(rule_t *pThis, batch_t *pBatch); rsRetVal (*SetAssRuleset)(rule_t *pThis, ruleset_t*); ruleset_t* (*GetAssRuleset)(rule_t *pThis); ENDinterface(rule) -#define ruleCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ +#define ruleCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */ +/* change for v2: ProcessMsg replaced by ProcessBatch - 2010-06-10 */ /* prototypes */ diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 1a77be2b..caeb9357 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -46,6 +46,7 @@ #include "rule.h" #include "errmsg.h" #include "parser.h" +#include "batch.h" #include "unicode-helper.h" #include "dirty.h" /* for main ruleset queue creation */ @@ -134,34 +135,48 @@ finalize_it: -/* helper to processMsg(), used to call the configured actions. It is +/* helper to processBatch(), used to call the configured actions. It is * executed from within llExecFunc() of the action list. * rgerhards, 2007-08-02 */ -DEFFUNC_llExecFunc(processMsgDoRules) +DEFFUNC_llExecFunc(processBatchDoRules) { rsRetVal iRet; ISOBJ_TYPE_assert(pData, rule); - iRet = rule.ProcessMsg((rule_t*) pData, (msg_t*) pParam); + iRet = rule.ProcessBatch((rule_t*) pData, (batch_t*) pParam); dbgprintf("ruleset: get iRet %d from rule.ProcessMsg()\n", iRet); return iRet; } -/* Process (consume) a received message. Calls the actions configured. +/* Process (consume) a batch of messages. Calls the actions configured. + * If the whole batch uses a singel ruleset, we can process the batch as + * a whole. Otherwise, we need to process it slower, on a message-by-message + * basis (what can be optimized to a per-ruleset basis) * rgerhards, 2005-10-13 */ static rsRetVal -processMsg(msg_t *pMsg) +processBatch(batch_t *pBatch) { ruleset_t *pThis; DEFiRet; - assert(pMsg != NULL); - - pThis = (pMsg->pRuleset == NULL) ? pDfltRuleset : pMsg->pRuleset; - ISOBJ_TYPE_assert(pThis, ruleset); - - CHKiRet(llExecFunc(&pThis->llRules, processMsgDoRules, pMsg)); + assert(pBatch != NULL); + + if(pBatch->bSingleRuleset) { + pThis = batchGetRuleset(pBatch); + if(pThis == NULL) + pThis = pDfltRuleset; + ISOBJ_TYPE_assert(pThis, ruleset); + CHKiRet(llExecFunc(&pThis->llRules, processBatchDoRules, pBatch)); + } else { + #warning implementation missing! + /* we need to split of the batch according to rulesets used */ + // TODO: do this at the deque level, much more performant! + assert(0); // TODO mandatory to implement! + dbgprintf("processbatch missing implementation, terminating!\n"); + printf("processBatch missing implementation, terminating!\n"); + exit(0); + } finalize_it: dbgprintf("ruleset.ProcessMsg() returns %d\n", iRet); @@ -515,7 +530,7 @@ CODESTARTobjQueryInterface(ruleset) pIf->IterateAllActions = iterateAllActions; pIf->DestructAllActions = destructAllActions; pIf->AddRule = addRule; - pIf->ProcessMsg = processMsg; + pIf->ProcessBatch = processBatch; pIf->SetName = setName; pIf->DebugPrintAll = debugPrintAll; pIf->GetCurrent = GetCurrent; diff --git a/runtime/ruleset.h b/runtime/ruleset.h index 222d773e..acebd17a 100644 --- a/runtime/ruleset.h +++ b/runtime/ruleset.h @@ -48,7 +48,7 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */ rsRetVal (*DestructAllActions)(void); rsRetVal (*AddRule)(ruleset_t *pThis, rule_t **ppRule); rsRetVal (*SetName)(ruleset_t *pThis, uchar *pszName); - rsRetVal (*ProcessMsg)(msg_t *pMsg); + rsRetVal (*ProcessBatch)(batch_t*); rsRetVal (*GetRuleset)(ruleset_t **ppThis, uchar*); rsRetVal (*SetDefaultRuleset)(uchar*); rsRetVal (*SetCurrRuleset)(uchar*); @@ -57,7 +57,7 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */ /* v3, 2009-11-04 */ parserList_t* (*GetParserList)(msg_t *); ENDinterface(ruleset) -#define rulesetCURR_IF_VERSION 3 /* increment whenever you change the interface structure! */ +#define rulesetCURR_IF_VERSION 4 /* increment whenever you change the interface structure! */ /* prototypes */ |