summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/batch.h28
-rw-r--r--runtime/rule.c48
-rw-r--r--runtime/rule.h5
-rw-r--r--runtime/ruleset.c39
-rw-r--r--runtime/ruleset.h4
5 files changed, 88 insertions, 36 deletions
diff --git a/runtime/batch.h b/runtime/batch.h
index ec257125..1245df11 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -26,6 +26,8 @@
#ifndef BATCH_H_INCLUDED
#define BATCH_H_INCLUDED
+#include "msg.h"
+
/* enum for batch states. Actually, we violate a layer here, in that we assume that a batch is used
* for action processing. So far, this seems acceptable, the status is simply ignored inside the
* main message queue. But over time, it could potentially be useful to split the two.
@@ -45,12 +47,18 @@ typedef enum {
struct batch_obj_s {
obj_t *pUsrp; /* pointer to user object (most often message) */
batch_state_t state; /* associated state */
+ /* work variables for action processing; these are reused for each action (or block of
+ * actions)
+ */
+ sbool bFilterOK; /* work area for filter processing (per action, reused!) */
+ sbool bPrevWasSuspended;
void *pActParams; /* parameters to be passed to action */
size_t *pLenParams; /* length of the parameter in question */
void *staticActParams[CONF_OMOD_NUMSTRINGS_BUFSIZE];
/* a cache to save malloc(), if not absolutely necessary */
size_t staticLenParams[CONF_OMOD_NUMSTRINGS_BUFSIZE];
/* and the same for the message length (if used) */
+ /* end action work variables */
};
/* the batch
@@ -72,7 +80,27 @@ struct batch_s {
int nElemDeq; /* actual number of elements dequeued (and thus to be deleted) - see comment above! */
int iDoneUpTo; /* all messages below this index have state other than RDY */
qDeqID deqID; /* ID of dequeue operation that generated this batch */
+ int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
+ sbool bSingleRuleset; /* do all msgs of this batch use a single ruleset? */
batch_obj_t *pElem; /* batch elements */
};
+
+/* some inline functions (we may move this off to an object .. or not) */
+static inline void
+batchSetSingleRuleset(batch_t *pBatch, sbool val) {
+ pBatch->bSingleRuleset = val;
+}
+
+/* get the batches ruleset */
+static inline ruleset_t*
+batchGetRuleset(batch_t *pBatch) {
+ return (pBatch->nElem > 0) ? ((msg_t*) pBatch->pElem[0].pUsrp)->pRuleset : NULL;
+}
+
+/* get number of msgs for this batch */
+static inline int
+batchNumMsgs(batch_t *pBatch) {
+ return pBatch->nElem;
+}
#endif /* #ifndef BATCH_H_INCLUDED */
diff --git a/runtime/rule.c b/runtime/rule.c
index 3b98d7d1..c28e15c9 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -39,6 +39,7 @@
#include "vm.h"
#include "var.h"
#include "srUtils.h"
+#include "batch.h"
#include "unicode-helper.h"
/* static data */
@@ -87,30 +88,38 @@ iterateAllActions(rule_t *pThis, rsRetVal (*pFunc)(void*, void*), void* pParam)
}
-
/* helper to processMsg(), used to call the configured actions. It is
* executed from within llExecFunc() of the action list.
* rgerhards, 2007-08-02
*/
-typedef struct processMsgDoActions_s {
- int bPrevWasSuspended; /* was the previous action suspended? */
- msg_t *pMsg;
-} processMsgDoActions_t;
-DEFFUNC_llExecFunc(processMsgDoActions)
+DEFFUNC_llExecFunc(processBatchDoActions)
{
DEFiRet;
rsRetVal iRetMod; /* return value of module - we do not always pass that back */
action_t *pAction = (action_t*) pData;
- processMsgDoActions_t *pDoActData = (processMsgDoActions_t*) pParam;
+ batch_t *pBatch = (batch_t*) pParam;
assert(pAction != NULL);
+#if 0 // TODO: move this to the action object
if((pAction->bExecWhenPrevSusp == 1) && (pDoActData->bPrevWasSuspended == 0)) {
dbgprintf("not calling action because the previous one is not suspended\n");
ABORT_FINALIZE(RS_RET_OK);
}
+#endif
- iRetMod = pAction->submitToActQ(pAction, pDoActData->pMsg);
+ // NEW (potentially): iRetMod = actionSubmit(pAction, (batch_t*) pParam);
+ // old code -- milestone check
+dbgprintf("ZZZ: inside processBatchDoActions, begin processing (nElem=%d)\n", batchNumMsgs(pBatch));
+ int i;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+dbgprintf("ZZZ: inside processBatchDoActions, processind elem %d/%d\n", i, batchNumMsgs(pBatch));
+ if(pBatch->pElem[i].bFilterOK) {
+ iRetMod = pAction->submitToActQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
+ }
+ }
+ //end old code
+#if 0 // TODO: this must be done inside the action as well!
if(iRetMod == RS_RET_DISCARDMSG) {
ABORT_FINALIZE(RS_RET_DISCARDMSG);
} else if(iRetMod == RS_RET_SUSPENDED) {
@@ -119,8 +128,8 @@ DEFFUNC_llExecFunc(processMsgDoActions)
} else {
pDoActData->bPrevWasSuspended = 0;
}
+#endif
-finalize_it:
RETiRet;
}
@@ -129,7 +138,7 @@ finalize_it:
* provided filter condition.
*/
static rsRetVal
-shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg)
+shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, sbool *bProcessMsg)
{
DEFiRet;
unsigned short pbMustBeFreed;
@@ -278,26 +287,25 @@ finalize_it:
-/* Process (consume) a received message. Calls the actions configured.
+/* Process (consume) a batch of messages. Calls the actions configured.
* rgerhards, 2005-10-13
*/
static rsRetVal
-processMsg(rule_t *pThis, msg_t *pMsg)
+processBatch(rule_t *pThis, batch_t *pBatch)
{
- int bProcessMsg;
- processMsgDoActions_t DoActData;
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, rule);
assert(pMsg != NULL);
/* first check the filters... */
- CHKiRet(shouldProcessThisMessage(pThis, pMsg, &bProcessMsg));
- if(bProcessMsg) {
- DoActData.pMsg = pMsg;
- DoActData.bPrevWasSuspended = 0;
- CHKiRet(llExecFunc(&pThis->llActList, processMsgDoActions, (void*)&DoActData));
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ CHKiRet(shouldProcessThisMessage(pThis, (msg_t*)(pBatch->pElem[i].pUsrp),
+ &(pBatch->pElem[i].bFilterOK)));
+ // TODO: really abort on error? 2010-06-10
}
+ CHKiRet(llExecFunc(&pThis->llActList, processBatchDoActions, pBatch));
finalize_it:
RETiRet;
@@ -440,7 +448,7 @@ CODESTARTobjQueryInterface(rule)
pIf->DebugPrint = ruleDebugPrint;
pIf->IterateAllActions = iterateAllActions;
- pIf->ProcessMsg = processMsg;
+ pIf->ProcessBatch = processBatch;
pIf->SetAssRuleset = setAssRuleset;
pIf->GetAssRuleset = getAssRuleset;
finalize_it:
diff --git a/runtime/rule.h b/runtime/rule.h
index 7b607637..309a2ed8 100644
--- a/runtime/rule.h
+++ b/runtime/rule.h
@@ -64,11 +64,12 @@ BEGINinterface(rule) /* name must also be changed in ENDinterface macro! */
rsRetVal (*ConstructFinalize)(rule_t __attribute__((unused)) *pThis);
rsRetVal (*Destruct)(rule_t **ppThis);
rsRetVal (*IterateAllActions)(rule_t *pThis, rsRetVal (*pFunc)(void*, void*), void *pParam);
- rsRetVal (*ProcessMsg)(rule_t *pThis, msg_t *pMsg);
+ rsRetVal (*ProcessBatch)(rule_t *pThis, batch_t *pBatch);
rsRetVal (*SetAssRuleset)(rule_t *pThis, ruleset_t*);
ruleset_t* (*GetAssRuleset)(rule_t *pThis);
ENDinterface(rule)
-#define ruleCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
+#define ruleCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */
+/* change for v2: ProcessMsg replaced by ProcessBatch - 2010-06-10 */
/* prototypes */
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 1a77be2b..caeb9357 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -46,6 +46,7 @@
#include "rule.h"
#include "errmsg.h"
#include "parser.h"
+#include "batch.h"
#include "unicode-helper.h"
#include "dirty.h" /* for main ruleset queue creation */
@@ -134,34 +135,48 @@ finalize_it:
-/* helper to processMsg(), used to call the configured actions. It is
+/* helper to processBatch(), used to call the configured actions. It is
* executed from within llExecFunc() of the action list.
* rgerhards, 2007-08-02
*/
-DEFFUNC_llExecFunc(processMsgDoRules)
+DEFFUNC_llExecFunc(processBatchDoRules)
{
rsRetVal iRet;
ISOBJ_TYPE_assert(pData, rule);
- iRet = rule.ProcessMsg((rule_t*) pData, (msg_t*) pParam);
+ iRet = rule.ProcessBatch((rule_t*) pData, (batch_t*) pParam);
dbgprintf("ruleset: get iRet %d from rule.ProcessMsg()\n", iRet);
return iRet;
}
-/* Process (consume) a received message. Calls the actions configured.
+/* Process (consume) a batch of messages. Calls the actions configured.
+ * If the whole batch uses a singel ruleset, we can process the batch as
+ * a whole. Otherwise, we need to process it slower, on a message-by-message
+ * basis (what can be optimized to a per-ruleset basis)
* rgerhards, 2005-10-13
*/
static rsRetVal
-processMsg(msg_t *pMsg)
+processBatch(batch_t *pBatch)
{
ruleset_t *pThis;
DEFiRet;
- assert(pMsg != NULL);
-
- pThis = (pMsg->pRuleset == NULL) ? pDfltRuleset : pMsg->pRuleset;
- ISOBJ_TYPE_assert(pThis, ruleset);
-
- CHKiRet(llExecFunc(&pThis->llRules, processMsgDoRules, pMsg));
+ assert(pBatch != NULL);
+
+ if(pBatch->bSingleRuleset) {
+ pThis = batchGetRuleset(pBatch);
+ if(pThis == NULL)
+ pThis = pDfltRuleset;
+ ISOBJ_TYPE_assert(pThis, ruleset);
+ CHKiRet(llExecFunc(&pThis->llRules, processBatchDoRules, pBatch));
+ } else {
+ #warning implementation missing!
+ /* we need to split of the batch according to rulesets used */
+ // TODO: do this at the deque level, much more performant!
+ assert(0); // TODO mandatory to implement!
+ dbgprintf("processbatch missing implementation, terminating!\n");
+ printf("processBatch missing implementation, terminating!\n");
+ exit(0);
+ }
finalize_it:
dbgprintf("ruleset.ProcessMsg() returns %d\n", iRet);
@@ -515,7 +530,7 @@ CODESTARTobjQueryInterface(ruleset)
pIf->IterateAllActions = iterateAllActions;
pIf->DestructAllActions = destructAllActions;
pIf->AddRule = addRule;
- pIf->ProcessMsg = processMsg;
+ pIf->ProcessBatch = processBatch;
pIf->SetName = setName;
pIf->DebugPrintAll = debugPrintAll;
pIf->GetCurrent = GetCurrent;
diff --git a/runtime/ruleset.h b/runtime/ruleset.h
index 222d773e..acebd17a 100644
--- a/runtime/ruleset.h
+++ b/runtime/ruleset.h
@@ -48,7 +48,7 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */
rsRetVal (*DestructAllActions)(void);
rsRetVal (*AddRule)(ruleset_t *pThis, rule_t **ppRule);
rsRetVal (*SetName)(ruleset_t *pThis, uchar *pszName);
- rsRetVal (*ProcessMsg)(msg_t *pMsg);
+ rsRetVal (*ProcessBatch)(batch_t*);
rsRetVal (*GetRuleset)(ruleset_t **ppThis, uchar*);
rsRetVal (*SetDefaultRuleset)(uchar*);
rsRetVal (*SetCurrRuleset)(uchar*);
@@ -57,7 +57,7 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */
/* v3, 2009-11-04 */
parserList_t* (*GetParserList)(msg_t *);
ENDinterface(ruleset)
-#define rulesetCURR_IF_VERSION 3 /* increment whenever you change the interface structure! */
+#define rulesetCURR_IF_VERSION 4 /* increment whenever you change the interface structure! */
/* prototypes */