summaryrefslogtreecommitdiffstats
path: root/runtime/rule.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-06-10 14:36:49 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-06-10 14:36:49 +0200
commitfe8d317c1b40fe162891d5ddec1cb7df702bb7fe (patch)
treef001cb3ebd5cc81c14cbec090ca204df42763c7e /runtime/rule.c
parent74135da95971c8d03e4e45f7d3f703e1d40f76f4 (diff)
downloadrsyslog-fe8d317c1b40fe162891d5ddec1cb7df702bb7fe.tar.gz
rsyslog-fe8d317c1b40fe162891d5ddec1cb7df702bb7fe.tar.xz
rsyslog-fe8d317c1b40fe162891d5ddec1cb7df702bb7fe.zip
milestone commit(BUGGY): batch is now handed down to rule processing
Now, the full batch is passed down to the rule, which then enqueues the elements as single messages. Note that this code has some known defects and needs more changes until it is correct again. This is primarily a commit to be able to return to a known-(somewhat)-good state.
Diffstat (limited to 'runtime/rule.c')
-rw-r--r--runtime/rule.c48
1 files changed, 28 insertions, 20 deletions
diff --git a/runtime/rule.c b/runtime/rule.c
index 3b98d7d1..c28e15c9 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -39,6 +39,7 @@
#include "vm.h"
#include "var.h"
#include "srUtils.h"
+#include "batch.h"
#include "unicode-helper.h"
/* static data */
@@ -87,30 +88,38 @@ iterateAllActions(rule_t *pThis, rsRetVal (*pFunc)(void*, void*), void* pParam)
}
-
/* helper to processMsg(), used to call the configured actions. It is
* executed from within llExecFunc() of the action list.
* rgerhards, 2007-08-02
*/
-typedef struct processMsgDoActions_s {
- int bPrevWasSuspended; /* was the previous action suspended? */
- msg_t *pMsg;
-} processMsgDoActions_t;
-DEFFUNC_llExecFunc(processMsgDoActions)
+DEFFUNC_llExecFunc(processBatchDoActions)
{
DEFiRet;
rsRetVal iRetMod; /* return value of module - we do not always pass that back */
action_t *pAction = (action_t*) pData;
- processMsgDoActions_t *pDoActData = (processMsgDoActions_t*) pParam;
+ batch_t *pBatch = (batch_t*) pParam;
assert(pAction != NULL);
+#if 0 // TODO: move this to the action object
if((pAction->bExecWhenPrevSusp == 1) && (pDoActData->bPrevWasSuspended == 0)) {
dbgprintf("not calling action because the previous one is not suspended\n");
ABORT_FINALIZE(RS_RET_OK);
}
+#endif
- iRetMod = pAction->submitToActQ(pAction, pDoActData->pMsg);
+ // NEW (potentially): iRetMod = actionSubmit(pAction, (batch_t*) pParam);
+ // old code -- milestone check
+dbgprintf("ZZZ: inside processBatchDoActions, begin processing (nElem=%d)\n", batchNumMsgs(pBatch));
+ int i;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+dbgprintf("ZZZ: inside processBatchDoActions, processind elem %d/%d\n", i, batchNumMsgs(pBatch));
+ if(pBatch->pElem[i].bFilterOK) {
+ iRetMod = pAction->submitToActQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
+ }
+ }
+ //end old code
+#if 0 // TODO: this must be done inside the action as well!
if(iRetMod == RS_RET_DISCARDMSG) {
ABORT_FINALIZE(RS_RET_DISCARDMSG);
} else if(iRetMod == RS_RET_SUSPENDED) {
@@ -119,8 +128,8 @@ DEFFUNC_llExecFunc(processMsgDoActions)
} else {
pDoActData->bPrevWasSuspended = 0;
}
+#endif
-finalize_it:
RETiRet;
}
@@ -129,7 +138,7 @@ finalize_it:
* provided filter condition.
*/
static rsRetVal
-shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg)
+shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, sbool *bProcessMsg)
{
DEFiRet;
unsigned short pbMustBeFreed;
@@ -278,26 +287,25 @@ finalize_it:
-/* Process (consume) a received message. Calls the actions configured.
+/* Process (consume) a batch of messages. Calls the actions configured.
* rgerhards, 2005-10-13
*/
static rsRetVal
-processMsg(rule_t *pThis, msg_t *pMsg)
+processBatch(rule_t *pThis, batch_t *pBatch)
{
- int bProcessMsg;
- processMsgDoActions_t DoActData;
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, rule);
assert(pMsg != NULL);
/* first check the filters... */
- CHKiRet(shouldProcessThisMessage(pThis, pMsg, &bProcessMsg));
- if(bProcessMsg) {
- DoActData.pMsg = pMsg;
- DoActData.bPrevWasSuspended = 0;
- CHKiRet(llExecFunc(&pThis->llActList, processMsgDoActions, (void*)&DoActData));
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ CHKiRet(shouldProcessThisMessage(pThis, (msg_t*)(pBatch->pElem[i].pUsrp),
+ &(pBatch->pElem[i].bFilterOK)));
+ // TODO: really abort on error? 2010-06-10
}
+ CHKiRet(llExecFunc(&pThis->llActList, processBatchDoActions, pBatch));
finalize_it:
RETiRet;
@@ -440,7 +448,7 @@ CODESTARTobjQueryInterface(rule)
pIf->DebugPrint = ruleDebugPrint;
pIf->IterateAllActions = iterateAllActions;
- pIf->ProcessMsg = processMsg;
+ pIf->ProcessBatch = processBatch;
pIf->SetAssRuleset = setAssRuleset;
pIf->GetAssRuleset = getAssRuleset;
finalize_it: