summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-06-10 14:36:49 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-06-10 14:36:49 +0200
commitfe8d317c1b40fe162891d5ddec1cb7df702bb7fe (patch)
treef001cb3ebd5cc81c14cbec090ca204df42763c7e
parent74135da95971c8d03e4e45f7d3f703e1d40f76f4 (diff)
downloadrsyslog-fe8d317c1b40fe162891d5ddec1cb7df702bb7fe.tar.gz
rsyslog-fe8d317c1b40fe162891d5ddec1cb7df702bb7fe.tar.xz
rsyslog-fe8d317c1b40fe162891d5ddec1cb7df702bb7fe.zip
milestone commit(BUGGY): batch is now handed down to rule processing
Now, the full batch is passed down to the rule, which then enqueues the elements as single messages. Note that this code has some known defects and needs more changes until it is correct again. This is primarily a commit to be able to return to a known-(somewhat)-good state.
-rw-r--r--runtime/batch.h28
-rw-r--r--runtime/rule.c48
-rw-r--r--runtime/rule.h5
-rw-r--r--runtime/ruleset.c39
-rw-r--r--runtime/ruleset.h4
-rw-r--r--tools/syslogd.c26
6 files changed, 105 insertions, 45 deletions
diff --git a/runtime/batch.h b/runtime/batch.h
index ec257125..1245df11 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -26,6 +26,8 @@
#ifndef BATCH_H_INCLUDED
#define BATCH_H_INCLUDED
+#include "msg.h"
+
/* enum for batch states. Actually, we violate a layer here, in that we assume that a batch is used
* for action processing. So far, this seems acceptable, the status is simply ignored inside the
* main message queue. But over time, it could potentially be useful to split the two.
@@ -45,12 +47,18 @@ typedef enum {
struct batch_obj_s {
obj_t *pUsrp; /* pointer to user object (most often message) */
batch_state_t state; /* associated state */
+ /* work variables for action processing; these are reused for each action (or block of
+ * actions)
+ */
+ sbool bFilterOK; /* work area for filter processing (per action, reused!) */
+ sbool bPrevWasSuspended;
void *pActParams; /* parameters to be passed to action */
size_t *pLenParams; /* length of the parameter in question */
void *staticActParams[CONF_OMOD_NUMSTRINGS_BUFSIZE];
/* a cache to save malloc(), if not absolutely necessary */
size_t staticLenParams[CONF_OMOD_NUMSTRINGS_BUFSIZE];
/* and the same for the message length (if used) */
+ /* end action work variables */
};
/* the batch
@@ -72,7 +80,27 @@ struct batch_s {
int nElemDeq; /* actual number of elements dequeued (and thus to be deleted) - see comment above! */
int iDoneUpTo; /* all messages below this index have state other than RDY */
qDeqID deqID; /* ID of dequeue operation that generated this batch */
+ int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
+ sbool bSingleRuleset; /* do all msgs of this batch use a single ruleset? */
batch_obj_t *pElem; /* batch elements */
};
+
+/* some inline functions (we may move this off to an object .. or not) */
+static inline void
+batchSetSingleRuleset(batch_t *pBatch, sbool val) {
+ pBatch->bSingleRuleset = val;
+}
+
+/* get the batches ruleset */
+static inline ruleset_t*
+batchGetRuleset(batch_t *pBatch) {
+ return (pBatch->nElem > 0) ? ((msg_t*) pBatch->pElem[0].pUsrp)->pRuleset : NULL;
+}
+
+/* get number of msgs for this batch */
+static inline int
+batchNumMsgs(batch_t *pBatch) {
+ return pBatch->nElem;
+}
#endif /* #ifndef BATCH_H_INCLUDED */
diff --git a/runtime/rule.c b/runtime/rule.c
index 3b98d7d1..c28e15c9 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -39,6 +39,7 @@
#include "vm.h"
#include "var.h"
#include "srUtils.h"
+#include "batch.h"
#include "unicode-helper.h"
/* static data */
@@ -87,30 +88,38 @@ iterateAllActions(rule_t *pThis, rsRetVal (*pFunc)(void*, void*), void* pParam)
}
-
/* helper to processMsg(), used to call the configured actions. It is
* executed from within llExecFunc() of the action list.
* rgerhards, 2007-08-02
*/
-typedef struct processMsgDoActions_s {
- int bPrevWasSuspended; /* was the previous action suspended? */
- msg_t *pMsg;
-} processMsgDoActions_t;
-DEFFUNC_llExecFunc(processMsgDoActions)
+DEFFUNC_llExecFunc(processBatchDoActions)
{
DEFiRet;
rsRetVal iRetMod; /* return value of module - we do not always pass that back */
action_t *pAction = (action_t*) pData;
- processMsgDoActions_t *pDoActData = (processMsgDoActions_t*) pParam;
+ batch_t *pBatch = (batch_t*) pParam;
assert(pAction != NULL);
+#if 0 // TODO: move this to the action object
if((pAction->bExecWhenPrevSusp == 1) && (pDoActData->bPrevWasSuspended == 0)) {
dbgprintf("not calling action because the previous one is not suspended\n");
ABORT_FINALIZE(RS_RET_OK);
}
+#endif
- iRetMod = pAction->submitToActQ(pAction, pDoActData->pMsg);
+ // NEW (potentially): iRetMod = actionSubmit(pAction, (batch_t*) pParam);
+ // old code -- milestone check
+dbgprintf("ZZZ: inside processBatchDoActions, begin processing (nElem=%d)\n", batchNumMsgs(pBatch));
+ int i;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+dbgprintf("ZZZ: inside processBatchDoActions, processind elem %d/%d\n", i, batchNumMsgs(pBatch));
+ if(pBatch->pElem[i].bFilterOK) {
+ iRetMod = pAction->submitToActQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
+ }
+ }
+ //end old code
+#if 0 // TODO: this must be done inside the action as well!
if(iRetMod == RS_RET_DISCARDMSG) {
ABORT_FINALIZE(RS_RET_DISCARDMSG);
} else if(iRetMod == RS_RET_SUSPENDED) {
@@ -119,8 +128,8 @@ DEFFUNC_llExecFunc(processMsgDoActions)
} else {
pDoActData->bPrevWasSuspended = 0;
}
+#endif
-finalize_it:
RETiRet;
}
@@ -129,7 +138,7 @@ finalize_it:
* provided filter condition.
*/
static rsRetVal
-shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg)
+shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, sbool *bProcessMsg)
{
DEFiRet;
unsigned short pbMustBeFreed;
@@ -278,26 +287,25 @@ finalize_it:
-/* Process (consume) a received message. Calls the actions configured.
+/* Process (consume) a batch of messages. Calls the actions configured.
* rgerhards, 2005-10-13
*/
static rsRetVal
-processMsg(rule_t *pThis, msg_t *pMsg)
+processBatch(rule_t *pThis, batch_t *pBatch)
{
- int bProcessMsg;
- processMsgDoActions_t DoActData;
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, rule);
assert(pMsg != NULL);
/* first check the filters... */
- CHKiRet(shouldProcessThisMessage(pThis, pMsg, &bProcessMsg));
- if(bProcessMsg) {
- DoActData.pMsg = pMsg;
- DoActData.bPrevWasSuspended = 0;
- CHKiRet(llExecFunc(&pThis->llActList, processMsgDoActions, (void*)&DoActData));
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ CHKiRet(shouldProcessThisMessage(pThis, (msg_t*)(pBatch->pElem[i].pUsrp),
+ &(pBatch->pElem[i].bFilterOK)));
+ // TODO: really abort on error? 2010-06-10
}
+ CHKiRet(llExecFunc(&pThis->llActList, processBatchDoActions, pBatch));
finalize_it:
RETiRet;
@@ -440,7 +448,7 @@ CODESTARTobjQueryInterface(rule)
pIf->DebugPrint = ruleDebugPrint;
pIf->IterateAllActions = iterateAllActions;
- pIf->ProcessMsg = processMsg;
+ pIf->ProcessBatch = processBatch;
pIf->SetAssRuleset = setAssRuleset;
pIf->GetAssRuleset = getAssRuleset;
finalize_it:
diff --git a/runtime/rule.h b/runtime/rule.h
index 7b607637..309a2ed8 100644
--- a/runtime/rule.h
+++ b/runtime/rule.h
@@ -64,11 +64,12 @@ BEGINinterface(rule) /* name must also be changed in ENDinterface macro! */
rsRetVal (*ConstructFinalize)(rule_t __attribute__((unused)) *pThis);
rsRetVal (*Destruct)(rule_t **ppThis);
rsRetVal (*IterateAllActions)(rule_t *pThis, rsRetVal (*pFunc)(void*, void*), void *pParam);
- rsRetVal (*ProcessMsg)(rule_t *pThis, msg_t *pMsg);
+ rsRetVal (*ProcessBatch)(rule_t *pThis, batch_t *pBatch);
rsRetVal (*SetAssRuleset)(rule_t *pThis, ruleset_t*);
ruleset_t* (*GetAssRuleset)(rule_t *pThis);
ENDinterface(rule)
-#define ruleCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */
+#define ruleCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */
+/* change for v2: ProcessMsg replaced by ProcessBatch - 2010-06-10 */
/* prototypes */
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 1a77be2b..caeb9357 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -46,6 +46,7 @@
#include "rule.h"
#include "errmsg.h"
#include "parser.h"
+#include "batch.h"
#include "unicode-helper.h"
#include "dirty.h" /* for main ruleset queue creation */
@@ -134,34 +135,48 @@ finalize_it:
-/* helper to processMsg(), used to call the configured actions. It is
+/* helper to processBatch(), used to call the configured actions. It is
* executed from within llExecFunc() of the action list.
* rgerhards, 2007-08-02
*/
-DEFFUNC_llExecFunc(processMsgDoRules)
+DEFFUNC_llExecFunc(processBatchDoRules)
{
rsRetVal iRet;
ISOBJ_TYPE_assert(pData, rule);
- iRet = rule.ProcessMsg((rule_t*) pData, (msg_t*) pParam);
+ iRet = rule.ProcessBatch((rule_t*) pData, (batch_t*) pParam);
dbgprintf("ruleset: get iRet %d from rule.ProcessMsg()\n", iRet);
return iRet;
}
-/* Process (consume) a received message. Calls the actions configured.
+/* Process (consume) a batch of messages. Calls the actions configured.
+ * If the whole batch uses a singel ruleset, we can process the batch as
+ * a whole. Otherwise, we need to process it slower, on a message-by-message
+ * basis (what can be optimized to a per-ruleset basis)
* rgerhards, 2005-10-13
*/
static rsRetVal
-processMsg(msg_t *pMsg)
+processBatch(batch_t *pBatch)
{
ruleset_t *pThis;
DEFiRet;
- assert(pMsg != NULL);
-
- pThis = (pMsg->pRuleset == NULL) ? pDfltRuleset : pMsg->pRuleset;
- ISOBJ_TYPE_assert(pThis, ruleset);
-
- CHKiRet(llExecFunc(&pThis->llRules, processMsgDoRules, pMsg));
+ assert(pBatch != NULL);
+
+ if(pBatch->bSingleRuleset) {
+ pThis = batchGetRuleset(pBatch);
+ if(pThis == NULL)
+ pThis = pDfltRuleset;
+ ISOBJ_TYPE_assert(pThis, ruleset);
+ CHKiRet(llExecFunc(&pThis->llRules, processBatchDoRules, pBatch));
+ } else {
+ #warning implementation missing!
+ /* we need to split of the batch according to rulesets used */
+ // TODO: do this at the deque level, much more performant!
+ assert(0); // TODO mandatory to implement!
+ dbgprintf("processbatch missing implementation, terminating!\n");
+ printf("processBatch missing implementation, terminating!\n");
+ exit(0);
+ }
finalize_it:
dbgprintf("ruleset.ProcessMsg() returns %d\n", iRet);
@@ -515,7 +530,7 @@ CODESTARTobjQueryInterface(ruleset)
pIf->IterateAllActions = iterateAllActions;
pIf->DestructAllActions = destructAllActions;
pIf->AddRule = addRule;
- pIf->ProcessMsg = processMsg;
+ pIf->ProcessBatch = processBatch;
pIf->SetName = setName;
pIf->DebugPrintAll = debugPrintAll;
pIf->GetCurrent = GetCurrent;
diff --git a/runtime/ruleset.h b/runtime/ruleset.h
index 222d773e..acebd17a 100644
--- a/runtime/ruleset.h
+++ b/runtime/ruleset.h
@@ -48,7 +48,7 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */
rsRetVal (*DestructAllActions)(void);
rsRetVal (*AddRule)(ruleset_t *pThis, rule_t **ppRule);
rsRetVal (*SetName)(ruleset_t *pThis, uchar *pszName);
- rsRetVal (*ProcessMsg)(msg_t *pMsg);
+ rsRetVal (*ProcessBatch)(batch_t*);
rsRetVal (*GetRuleset)(ruleset_t **ppThis, uchar*);
rsRetVal (*SetDefaultRuleset)(uchar*);
rsRetVal (*SetCurrRuleset)(uchar*);
@@ -57,7 +57,7 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */
/* v3, 2009-11-04 */
parserList_t* (*GetParserList)(msg_t *);
ENDinterface(ruleset)
-#define rulesetCURR_IF_VERSION 3 /* increment whenever you change the interface structure! */
+#define rulesetCURR_IF_VERSION 4 /* increment whenever you change the interface structure! */
/* prototypes */
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 2c36e6c2..46587a27 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -639,19 +639,24 @@ chkMsgAgainstACL() {
* rgerhards, 2010-06-09
*/
static inline rsRetVal
-preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) {
+preprocessBatch(batch_t *pBatch) {
uchar fromHost[NI_MAXHOST];
uchar fromHostIP[NI_MAXHOST];
uchar fromHostFQDN[NI_MAXHOST];
prop_t *propFromHost = NULL;
prop_t *propFromHostIP = NULL;
+ int bSingleRuleset;
+ ruleset_t *batchRuleset; /* the ruleset used for all message inside the batch, if there is a single one */
int bIsPermitted;
msg_t *pMsg;
int i;
rsRetVal localRet;
DEFiRet;
- for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
+ bSingleRuleset = 1;
+ batchRuleset = (pBatch->nElem > 0) ? ((msg_t*) pBatch->pElem[0].pUsrp)->pRuleset : NULL;
+
+ for(i = 0 ; i < pBatch->nElem && !*(pBatch->pbShutdownImmediate) ; i++) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) {
DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n");
@@ -676,8 +681,11 @@ preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) {
pBatch->pElem[i].state = BATCH_STATE_DISC;
}
}
+ if(pMsg->pRuleset != batchRuleset)
+ bSingleRuleset = 0;
}
+ batchSetSingleRuleset(pBatch, bSingleRuleset);
finalize_it:
if(propFromHost != NULL)
@@ -696,18 +704,18 @@ finalize_it:
static rsRetVal
msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate)
{
- int i;
DEFiRet;
-
assert(pBatch != NULL);
-
- preprocessBatch(pBatch, pbShutdownImmediate);
+ pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */
+ preprocessBatch(pBatch);
+ ruleset.ProcessBatch(pBatch);
+dbgprintf("ZZZ: back in msgConsumer\n");
+//TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we
+//do not have this yet and so we emulate -- 2010-06-10
+int i;
for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
- DBGPRINTF("msgConsumer processes msg %d/%d\n", i, pBatch->nElem);
- ruleset.ProcessMsg((msg_t*) pBatch->pElem[i].pUsrp);
pBatch->pElem[i].state = BATCH_STATE_COMM;
}
-
RETiRet;
}