summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c186
-rw-r--r--doc/msgflow.txt2
-rw-r--r--runtime/batch.h73
-rw-r--r--runtime/queue.c41
-rw-r--r--runtime/queue.h2
-rw-r--r--runtime/rule.c11
-rw-r--r--runtime/ruleset.c75
-rw-r--r--runtime/wti.c4
-rw-r--r--tests/Makefile.am2
-rwxr-xr-xtests/diag.sh6
-rw-r--r--tests/tcpflood.c1
-rw-r--r--tools/syslogd.c13
12 files changed, 337 insertions, 79 deletions
diff --git a/action.c b/action.c
index dc6cdaaf..c8117b78 100644
--- a/action.c
+++ b/action.c
@@ -585,7 +585,7 @@ finalize_it:
* depending on its current state.
* rgerhards, 2009-05-07
*/
-static rsRetVal actionPrepare(action_t *pThis)
+static inline rsRetVal actionPrepare(action_t *pThis)
{
DEFiRet;
@@ -665,7 +665,6 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **pp
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- //CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(((uchar**)pAction->ppMsgs)[i]), &(pAction->lenMsgs[i])));
CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i]));
break;
case ACT_ARRAY_PASSING:
@@ -726,29 +725,23 @@ static rsRetVal cleanupDoActionParams(action_t *pAction, uchar **ppMsgs)
* rgerhards, 2008-01-28
*/
rsRetVal
-actionCallDoAction(action_t *pThis, msg_t *pMsg)
+actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams)
{
- uchar *ppMsgs[10];
- size_t lenMsgs[10];
int i;
DEFiRet;
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
- for(i = 0 ; i < 10 ; ++ i) {
- ppMsgs[i] = NULL;
- lenMsgs[i] = 0;
- }
-
DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis));
- CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs));
+ //CHKiRet(prepareDoActionParams(pThis, pMsg, ppMsgs, lenMsgs));
pThis->bHadAutoCommit = 0;
#if 1
//d_pthread_mutex_lock(&pThis->mutActExec);
//pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec);
- iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData);
+ // original: iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData);
+ iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData);
//pthread_cleanup_pop(1); /* unlock mutex */
//iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData);
#else
@@ -783,6 +776,7 @@ iRet = RS_RET_OK;
finalize_it:
+#if 0 // THIS NEEDS TO BE DONE TO THE BATCH!
switch(pThis->eParamPassing) {
case ACT_STRING_PASSING:
for(i = 0 ; i < 10 ; ++i)
@@ -795,6 +789,7 @@ finalize_it:
/* nothing to do in that case */
break;
}
+#endif
RETiRet;
@@ -805,8 +800,8 @@ finalize_it:
* this readies the action and then calls doAction()
* rgerhards, 2008-01-28
*/
-rsRetVal
-actionProcessMessage(action_t *pThis, msg_t *pMsg)
+static inline rsRetVal
+actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams)
{
DEFiRet;
@@ -815,7 +810,7 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg)
CHKiRet(actionPrepare(pThis));
if(pThis->eState == ACT_STATE_ITX)
- CHKiRet(actionCallDoAction(pThis, pMsg));
+ CHKiRet(actionCallDoAction(pThis, pMsg, actParams));
iRet = getReturnCode(pThis);
finalize_it:
@@ -835,8 +830,11 @@ finishBatch(action_t *pThis, batch_t *pBatch)
ASSERT(pThis != NULL);
- if(pThis->eState == ACT_STATE_RDY)
+dbgprintf("ZZZ: finishBatch called, eState %d\n", pThis->eState);
+ if(pThis->eState == ACT_STATE_RDY) {
+ /* we just need to flag the batch as commited */
FINALIZE; /* nothing to do */
+ }
CHKiRet(actionPrepare(pThis));
if(pThis->eState == ACT_STATE_ITX) {
@@ -846,7 +844,8 @@ finishBatch(action_t *pThis, batch_t *pBatch)
actionCommitted(pThis);
/* flag messages as committed */
for(i = 0 ; i < pBatch->nElem ; ++i) {
- pBatch->pElem[i].state = BATCH_STATE_COMM;
+ batchSetElemState(pBatch, i, BATCH_STATE_COMM);
+dbgprintf("ZZZ: finishBatch commits element %d\n", i);
}
break;
case RS_RET_SUSPENDED:
@@ -881,8 +880,8 @@ finalize_it:
/* try to submit a partial batch of elements.
* rgerhards, 2009-05-12
*/
-static rsRetVal
-tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImmediate)
+static inline rsRetVal
+tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
{
int i;
int iElemProcessed;
@@ -894,17 +893,22 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme
assert(pBatch != NULL);
assert(pnElem != NULL);
+dbgprintf("ZZZ1: tryDoAction, nElem %d, iDoneUpto %d\n", *pnElem, pBatch->iDoneUpTo);
i = pBatch->iDoneUpTo; /* all messages below that index are processed */
iElemProcessed = 0;
iCommittedUpTo = i;
- pAction->pbShutdownImmediate = pbShutdownImmediate;
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
- if(*pbShutdownImmediate)
+ if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
- pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
- if(pBatch->pElem[i].state != BATCH_STATE_DISC) {
- localRet = actionProcessMessage(pAction, pMsg);
+dbgprintf("ZZZ1: tryDoAction loop %d: filter %d, state %d\n", i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state);
+ if(pBatch->pElem[i].bFilterOK && pBatch->pElem[i].state != BATCH_STATE_DISC) {
+dbgprintf("ZZZ1: trying to execute\n");
+ pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
+ localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams);
DBGPRINTF("action call returned %d\n", localRet);
+ /* Note: we directly modify the batch object state, because we know that
+ * wo do not overwrite DISC indicators!
+ */
if(localRet == RS_RET_OK) {
/* mark messages as committed */
while(iCommittedUpTo <= i) {
@@ -931,9 +935,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme
}
finalize_it:
- if(pBatch->nElem == 1 && pBatch->pElem[0].state == BATCH_STATE_DISC) {
- iRet = RS_RET_DISCARDMSG;
- } else if(pBatch->iDoneUpTo != iCommittedUpTo) {
+ if(pBatch->iDoneUpTo != iCommittedUpTo) {
*pnElem += iCommittedUpTo - pBatch->iDoneUpTo;
pBatch->iDoneUpTo = iCommittedUpTo;
}
@@ -944,21 +946,24 @@ finalize_it:
/* submit a batch for actual action processing.
* The first nElem elements are processed. This function calls itself
* recursively if it needs to handle errors.
+ * Note: we don't need the number of the first message to be processed as a parameter,
+ * because this is kept track of inside the batch itself (iDoneUpTo).
* rgerhards, 2009-05-12
*/
static rsRetVal
-submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmediate)
+submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
{
int i;
int bDone;
rsRetVal localRet;
DEFiRet;
+dbgprintf("ZZZ1: submitBatch, nElem %d\n", nElem);
assert(pBatch != NULL);
bDone = 0;
do {
- localRet = tryDoAction(pAction, pBatch, &nElem, pbShutdownImmediate);
+ localRet = tryDoAction(pAction, pBatch, &nElem);
if(localRet == RS_RET_FORCE_TERM) {
ABORT_FINALIZE(RS_RET_FORCE_TERM);
}
@@ -968,38 +973,36 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmedi
/* try commit transaction, once done, we can simply do so as if
* that return state was returned from tryDoAction().
*/
- localRet = finishBatch(pAction, pBatch); // TODO: careful, do we need the elem counter?
+ localRet = finishBatch(pAction, pBatch);
}
if( localRet == RS_RET_OK
|| localRet == RS_RET_PREVIOUS_COMMITTED
|| localRet == RS_RET_DEFER_COMMIT) {
bDone = 1;
- } else if(localRet == RS_RET_DISCARDMSG) {
- iRet = RS_RET_DISCARDMSG; /* TODO: verify this sequence -- rgerhards, 2009-07-30 */
- bDone = 1;
} else if(localRet == RS_RET_SUSPENDED) {
; /* do nothing, this will retry the full batch */
} else if(localRet == RS_RET_ACTION_FAILED) {
/* in this case, the whole batch can not be processed */
for(i = 0 ; i < nElem ; ++i) {
- pBatch->pElem[pBatch->iDoneUpTo++].state = BATCH_STATE_BAD;
+dbgprintf("ZZZ2: setting batch state for item %d\n", i);
+ batchSetElemState(pBatch, i, BATCH_STATE_BAD);
}
bDone = 1;
} else {
if(nElem == 1) {
- pBatch->pElem[pBatch->iDoneUpTo++].state = BATCH_STATE_BAD;
+ batchSetElemState(pBatch, i, BATCH_STATE_BAD);
bDone = 1;
} else {
/* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
- submitBatch(pAction, pBatch, nElem / 2, pbShutdownImmediate);
- submitBatch(pAction, pBatch, nElem - (nElem / 2), pbShutdownImmediate);
+ submitBatch(pAction, pBatch, nElem / 2);
+ submitBatch(pAction, pBatch, nElem - (nElem / 2));
bDone = 1;
}
}
- } while(!bDone && !*pbShutdownImmediate); /* do .. while()! */
+ } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */
- if(*pbShutdownImmediate)
+ if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
finalize_it:
@@ -1007,17 +1010,46 @@ finalize_it:
}
+
+/* The following function prepares a batch for processing, that it is
+ * reinitializes batch states, generates strings and does everything else
+ * that needs to be done in order to make the batch ready for submission to
+ * the actual output module. Note that we look at the precomputed
+ * filter OK condition and process only those messages, that actually matched
+ * the filter.
+ * rgerhards, 2010-06-14
+ */
+static inline rsRetVal
+prepareBatch(action_t *pAction, batch_t *pBatch)
+{
+ int i;
+ batch_obj_t *pElem;
+ DEFiRet;
+
+ pBatch->iDoneUpTo = 0;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ pElem = &(pBatch->pElem[i]);
+ if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) {
+ pElem->state = BATCH_STATE_RDY;
+ prepareDoActionParams(pAction, (msg_t*) pElem->pUsrp,
+ (uchar**) &(pElem->staticActParams), pElem->staticLenParams);
+ }
+ }
+ RETiRet;
+}
+
+
/* receive a batch and process it. This includes retry handling.
* rgerhards, 2009-05-12
*/
-static rsRetVal
-processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
+static inline rsRetVal
+processAction(action_t *pAction, batch_t *pBatch)
{
DEFiRet;
assert(pBatch != NULL);
- pBatch->iDoneUpTo = 0;
- CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pbShutdownImmediate));
+dbgprintf("ZZZ1: processAction\n");
+ CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem));
iRet = finishBatch(pAction, pBatch);
finalize_it:
@@ -1033,10 +1065,18 @@ finalize_it:
static rsRetVal
processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
{
+ int *pbShutdownImmdtSave;
DEFiRet;
assert(pBatch != NULL);
+dbgprintf("ZZZ1: processBatchMain\n");
+ pbShutdownImmdtSave = pBatch->pbShutdownImmediate;
+ pBatch->pbShutdownImmediate = pbShutdownImmediate;
+ pAction->pbShutdownImmediate = pBatch->pbShutdownImmediate;
+ CHKiRet(prepareBatch(pAction, pBatch));
+
+dbgprintf("ZZZ1: processBatchMain\n");
/* We now must guard the output module against execution by multiple threads. The
* plugin interface specifies that output modules must not be thread-safe (except
* if they notify us they are - functionality not yet implemented...).
@@ -1045,10 +1085,19 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
d_pthread_mutex_lock(&pAction->mutActExec);
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
- iRet = processAction(pAction, pBatch, pbShutdownImmediate);
+ iRet = processAction(pAction, pBatch);
+ //
+ // DEBUG
+ int i;
+ for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
+dbgprintf("ZZZ: after processBatchMain item %d: filter %d, state %d\n", i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state);
+ }
+ // END DEBUG
pthread_cleanup_pop(1); /* unlock mutex */
+finalize_it:
+ pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
@@ -1323,7 +1372,11 @@ doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg)
}
DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
- iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
+ iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
+ else
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+ //iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
finalize_it:
RETiRet;
@@ -1341,7 +1394,48 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
DEFiRet;
DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
- iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
+ iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
+ else
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+
+ RETiRet;
+}
+
+
+//*** EXPERIMENTAL ***/
+/* This submits the message to the action queue in case we do NOT need to handle repeat
+ * message processing. That case permits us to gain lots of freedom during processing
+ * and thus speed.
+ * rgerhards, 2010-06-08
+ */
+rsRetVal
+doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
+{
+ int i;
+ DEFiRet;
+
+ DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
+ else { /* in this case, we do single submits to the queue.
+ * TODO: optimize this, we may do at least a multi-submit!
+ */
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ if(pBatch->pElem[i].bFilterOK) {
+dbgprintf("ZZZ: submitToActQ item %d:%s\n", i, ((msg_t*)(pBatch->pElem[i].pUsrp))->szRawMsg+15);
+ pAction->submitToActQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
+ }
+ }
+
+ // DEBUG
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ if(pBatch->pElem[i].bFilterOK) {
+dbgprintf("ZZZ: batch state after processing item %d: %d\n", i, pBatch->pElem[i].state);
+ }
+ }
+ // END DEBUG
+ }
RETiRet;
}
diff --git a/doc/msgflow.txt b/doc/msgflow.txt
index b53ba7e7..ebee18f8 100644
--- a/doc/msgflow.txt
+++ b/doc/msgflow.txt
@@ -16,7 +16,7 @@ syslogd.c/msgConsumeOne
MsgSetRcvFromIPStr
if NEEDS_PARSING:
parser.ParseMsg
-ruleset.ProcessMsg (loops through ruleset)
+ruleset.ProcessBatch (loops through ruleset)
ruleset.c/processMsgDoRules (for each rule in ruleset)
rule.c/processMsg
1:rule.c/shouldProcessThisMessage
diff --git a/runtime/batch.h b/runtime/batch.h
index 1245df11..80621631 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -26,6 +26,7 @@
#ifndef BATCH_H_INCLUDED
#define BATCH_H_INCLUDED
+#include <string.h>
#include "msg.h"
/* enum for batch states. Actually, we violate a layer here, in that we assume that a batch is used
@@ -76,6 +77,7 @@ struct batch_obj_s {
* is completed (else, the whole process does not work correctly).
*/
struct batch_s {
+ int maxElem; /* maximum number of elements that this batch supports */
int nElem; /* actual number of element in this entry */
int nElemDeq; /* actual number of elements dequeued (and thus to be deleted) - see comment above! */
int iDoneUpTo; /* all messages below this index have state other than RDY */
@@ -92,15 +94,84 @@ batchSetSingleRuleset(batch_t *pBatch, sbool val) {
pBatch->bSingleRuleset = val;
}
-/* get the batches ruleset */
+/* get the batches ruleset (if we have a single ruleset) */
static inline ruleset_t*
batchGetRuleset(batch_t *pBatch) {
return (pBatch->nElem > 0) ? ((msg_t*) pBatch->pElem[0].pUsrp)->pRuleset : NULL;
}
+/* get the ruleset of a specifc element of the batch (index not verified!) */
+static inline ruleset_t*
+batchElemGetRuleset(batch_t *pBatch, int i) {
+ return ((msg_t*) pBatch->pElem[i].pUsrp)->pRuleset;
+}
+
/* get number of msgs for this batch */
static inline int
batchNumMsgs(batch_t *pBatch) {
return pBatch->nElem;
}
+
+
+/* set the status of the i-th batch element. Note that once the status is
+ * DISC, it will never be reset. So this function can NOT be used to initialize
+ * the state table. -- rgerhards, 2010-06-10
+ */
+static inline void
+batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) {
+ if(pBatch->pElem[i].state != BATCH_STATE_DISC)
+ pBatch->pElem[i].state = newState;
+}
+
+
+/* check if an element is a valid entry. We do NOT verify if the
+ * element index is valid. -- rgerhards, 2010-06-10
+ */
+static inline int
+batchIsValidElem(batch_t *pBatch, int i) {
+ return(pBatch->pElem[i].bFilterOK && pBatch->pElem[i].state != BATCH_STATE_DISC);
+}
+
+
+/* copy one batch element to another.
+ * This creates a complete duplicate in those cases where
+ * it is needed. Use duplication only when absolutely necessary!
+ * rgerhards, 2010-06-10
+ */
+static inline void
+batchCopyElem(batch_obj_t *pDest, batch_obj_t *pSrc) {
+ memcpy(pDest, pSrc, sizeof(batch_obj_t));
+}
+
+
+/* free members of a batch "object". Note that we can not do the usual
+ * destruction as the object typically is allocated on the stack and so the
+ * object itself cannot be freed! -- rgerhards, 2010-06-15
+ */
+static inline void
+batchFree(batch_t *pBatch) {
+ int i;
+ int j;
+ for(i = 0 ; i < pBatch->maxElem ; ++i) {
+ for(j = 0 ; j < CONF_OMOD_NUMSTRINGS_BUFSIZE ; ++j) {
+ free(pBatch->pElem[i].staticActParams[j]);
+ }
+ }
+ free(pBatch->pElem);
+}
+
+
+/* initialiaze a batch "object". The record must already exist,
+ * we "just" initialize it. The max number of elements must be
+ * provided. -- rgerhards, 2010-06-15
+ */
+static inline rsRetVal
+batchInit(batch_t *pBatch, int maxElem) {
+ DEFiRet;
+ pBatch->maxElem = maxElem;
+ CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t)));
+ // TODO: replace calloc by inidividual writes?
+finalize_it:
+ RETiRet;
+}
#endif /* #ifndef BATCH_H_INCLUDED */
diff --git a/runtime/queue.c b/runtime/queue.c
index d437d590..5e9c67ca 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -841,6 +841,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
batch_obj_t batchObj;
DEFiRet;
+ //TODO: init batchObj (states _OK and new fields -- CHECK)
ASSERT(pThis != NULL);
/* calling the consumer is quite different here than it is from a worker thread */
@@ -861,6 +862,26 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
RETiRet;
}
+/*** EXPERIMENTAL ***/
+rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ /* calling the consumer is quite different here than it is from a worker thread */
+ /* we need to provide the consumer's return value back to the caller because in direct
+ * mode the consumer probably has a lot to convey (which get's lost in the other modes
+ * because they are asynchronous. But direct mode is deliberately synchronous.
+ * rgerhards, 2008-02-12
+ * We use our knowledge about the batch_t structure below, but without that, we
+ * pay a too-large performance toll... -- rgerhards, 2009-04-22
+ */
+ iRet = pThis->pConsumer(pThis->pUsr, pBatch, &pThis->bShutdownImmediate);
+
+ RETiRet;
+}
+
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
{
@@ -1364,10 +1385,10 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
assert(pBatch != NULL);
for(i = 0 ; i < pBatch->nElem ; ++i) {
-dbgprintf("XXX: enqueueing data element %d of %d\n", i, pBatch->nElem);
pUsr = pBatch->pElem[i].pUsrp;
if( pBatch->pElem[i].state == BATCH_STATE_RDY
|| pBatch->pElem[i].state == BATCH_STATE_SUB) {
+dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch->nElem, pBatch->pElem[i].state);
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
(obj_t*)MsgAddRef((msg_t*) pUsr));
++nEnqueued;
@@ -1385,7 +1406,7 @@ dbgprintf("XXX: enqueueing data element %d of %d\n", i, pBatch->nElem);
iRet = DeleteBatchFromQStore(pThis, pBatch);
- pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
+ pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ // TODO: more fine init, new fields! 2010-06-14
RETiRet;
}
@@ -1430,6 +1451,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
/* all well, use this element */
pWti->batch.pElem[nDequeued].pUsrp = pUsr;
pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
+ pWti->batch.pElem[nDequeued].bFilterOK = 1; // TODO: think again if we can handle that with more performance
++nDequeued;
}
@@ -2273,6 +2295,21 @@ finalize_it:
/* ------------------------------ END multi-enqueue functions ------------------------------ */
+/* enqueue a new user data element in direct mode
+ * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better
+ * code later on (like multi submit!) 2010-06-10
+ * Enqueues the new element and awakes worker thread.
+ */
+rsRetVal
+qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ iRet = qAddDirect(pThis, pUsr);
+ RETiRet;
+}
+
+
/* enqueue a new user data element
* Enqueues the new element and awakes worker thread.
*/
diff --git a/runtime/queue.h b/runtime/queue.h
index 33b21c9a..1c758134 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -177,12 +177,14 @@ struct queue_s {
/* prototypes */
rsRetVal qqueueDestruct(qqueue_t **ppThis);
+rsRetVal qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr);
rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr);
rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
+rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int);
diff --git a/runtime/rule.c b/runtime/rule.c
index c28e15c9..453e631d 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -108,16 +108,19 @@ DEFFUNC_llExecFunc(processBatchDoActions)
}
#endif
- // NEW (potentially): iRetMod = actionSubmit(pAction, (batch_t*) pParam);
+#if 1
+ // NEW (potentially):
+ iRetMod = doSubmitToActionQBatch(pAction, (batch_t*) pParam);
+#else
// old code -- milestone check
-dbgprintf("ZZZ: inside processBatchDoActions, begin processing (nElem=%d)\n", batchNumMsgs(pBatch));
int i;
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
-dbgprintf("ZZZ: inside processBatchDoActions, processind elem %d/%d\n", i, batchNumMsgs(pBatch));
+dbgprintf("ZZZ: inside processBatchDoActions, processing elem %d/%d\n", i, batchNumMsgs(pBatch));
if(pBatch->pElem[i].bFilterOK) {
iRetMod = pAction->submitToActQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
}
}
+#endif
//end old code
#if 0 // TODO: this must be done inside the action as well!
if(iRetMod == RS_RET_DISCARDMSG) {
@@ -297,7 +300,7 @@ processBatch(rule_t *pThis, batch_t *pBatch)
DEFiRet;
ISOBJ_TYPE_assert(pThis, rule);
- assert(pMsg != NULL);
+ assert(pBatch != NULL);
/* first check the filters... */
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index caeb9357..31c2e1a7 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -60,6 +60,9 @@ linkedList_t llRulesets; /* this is NOT a pointer - no typo here ;) */
ruleset_t *pCurrRuleset = NULL; /* currently "active" ruleset */
ruleset_t *pDfltRuleset = NULL; /* current default ruleset, e.g. for binding to actions which have no other */
+/* forward definitions */
+static rsRetVal processBatch(batch_t *pBatch);
+
/* ---------- linked-list key handling functions ---------- */
/* destructor for linked list keys.
@@ -149,6 +152,69 @@ dbgprintf("ruleset: get iRet %d from rule.ProcessMsg()\n", iRet);
}
+
+/* This function is similar to processBatch(), but works on a batch that
+ * contains rules from multiple rulesets. In this case, we can not push
+ * the whole batch through the ruleset. Instead, we examine it and
+ * partition it into sub-rulesets which we then push through the system.
+ * Note that when we evaluate which message must be processed, we do NOT need
+ * to look at bFilterOK, because this value is only set in a later processing
+ * stage. Doing so caused a bug during development ;)
+ * rgerhards, 2010-06-15
+ */
+static inline rsRetVal
+processBatchMultiRuleset(batch_t *pBatch)
+{
+ ruleset_t *currRuleset;
+ batch_t snglRuleBatch;
+ int i;
+ int iStart; /* start index of partial batch */
+ int iNew; /* index for new (temporary) batch */
+ DEFiRet;
+
+ CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem));
+ snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate;
+
+dbgprintf("ZZZ: multi-ruleset batch of %d elements must be processed\n", pBatch->nElem);
+ while(1) { /* loop broken inside */
+ /* search for first unprocessed element */
+ for(iStart = 0 ; iStart < pBatch->nElem && pBatch->pElem[iStart].state == BATCH_STATE_DISC ; ++iStart)
+ /* just search, no action */;
+
+ if(iStart == pBatch->nElem)
+ FINALIZE; /* everything processed */
+
+ /* prepare temporary batch */
+ currRuleset = batchElemGetRuleset(pBatch, iStart);
+ iNew = 0;
+ for(i = iStart ; i < pBatch->nElem ; ++i) {
+ if(batchElemGetRuleset(pBatch, i) == currRuleset) {
+dbgprintf("ZZZ: proc elem %d:'%s'\n", i, ((msg_t*)(pBatch->pElem[i].pUsrp))->szRawMsg+15);
+ batchCopyElem(&(snglRuleBatch.pElem[iNew++]), &(pBatch->pElem[i]));
+ /* We indicate the element also as done, so it will not be processed again */
+ pBatch->pElem[i].state = BATCH_STATE_DISC;
+ }
+ }
+ snglRuleBatch.nElem = iNew; /* was left just right by the for loop */
+ batchSetSingleRuleset(&snglRuleBatch, 1);
+ /* process temp batch */
+ processBatch(&snglRuleBatch);
+
+#if 0
+for(i = iStart ; i < pBatch->nElem ; ++i) {
+ dbgprintf("ZZZ: after partial execution item %d state %d\n", i, pBatch->pElem[i].state);
+}
+//dbgprintf("ZZZ: search item %d: state %d, bFilterOK %d, IsValid %d, msg:%s\n",
+//iStart, pBatch->pElem[iStart].state, pBatch->pElem[iStart].bFilterOK, batchIsValidElem(pBatch, iStart),
+//((msg_t*)(pBatch->pElem[iStart].pUsrp))->szRawMsg+40);
+#endif
+ }
+ batchFree(&snglRuleBatch);
+
+finalize_it:
+ RETiRet;
+}
+
/* Process (consume) a batch of messages. Calls the actions configured.
* If the whole batch uses a singel ruleset, we can process the batch as
* a whole. Otherwise, we need to process it slower, on a message-by-message
@@ -162,6 +228,7 @@ processBatch(batch_t *pBatch)
DEFiRet;
assert(pBatch != NULL);
+dbgprintf("ZZZ: processBatch: batch of %d elements must be processed\n", pBatch->nElem);
if(pBatch->bSingleRuleset) {
pThis = batchGetRuleset(pBatch);
if(pThis == NULL)
@@ -169,13 +236,7 @@ processBatch(batch_t *pBatch)
ISOBJ_TYPE_assert(pThis, ruleset);
CHKiRet(llExecFunc(&pThis->llRules, processBatchDoRules, pBatch));
} else {
- #warning implementation missing!
- /* we need to split of the batch according to rulesets used */
- // TODO: do this at the deque level, much more performant!
- assert(0); // TODO mandatory to implement!
- dbgprintf("processbatch missing implementation, terminating!\n");
- printf("processBatch missing implementation, terminating!\n");
- exit(0);
+ CHKiRet(processBatchMultiRuleset(pBatch));
}
finalize_it:
diff --git a/runtime/wti.c b/runtime/wti.c
index 307f1af1..9343f5c5 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -172,7 +172,7 @@ wtiCancelThrd(wti_t *pThis)
BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(wti)
/* actual destruction */
- free(pThis->batch.pElem);
+ batchFree(&pThis->batch);
DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
free(pThis->pszDbgHdr);
@@ -204,7 +204,7 @@ wtiConstructFinalize(wti_t *pThis)
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
- CHKmalloc(pThis->batch.pElem = calloc((size_t)iDeqBatchSize, sizeof(batch_obj_t)));
+ CHKiRet(batchInit(&pThis->batch, iDeqBatchSize));
finalize_it:
RETiRet;
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 32765ae4..ba0bb7ba 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -34,8 +34,8 @@ TESTS = $(TESTRUNS) cfg.sh \
complex1.sh \
queue-persist.sh \
pipeaction.sh \
- pipe_noreader.sh \
execonlyonce.sh \
+ pipe_noreader.sh \
dircreate_dflt.sh \
dircreate_off.sh \
queue-persist.sh
diff --git a/tests/diag.sh b/tests/diag.sh
index 5b74a6dc..efbf3315 100755
--- a/tests/diag.sh
+++ b/tests/diag.sh
@@ -8,10 +8,10 @@
#valgrind="valgrind --malloc-fill=ff --free-fill=fe --log-fd=1"
#valgrind="valgrind --tool=drd --log-fd=1"
#valgrind="valgrind --tool=helgrind --log-fd=1"
-#valgrind="valgrind --tool=exp-ptrcheck --log-fd=1"
+valgrind="valgrind --tool=exp-ptrcheck --log-fd=1"
#set -o xtrace
-#export RSYSLOG_DEBUG="debug nostdout"
-#export RSYSLOG_DEBUGLOG="log"
+export RSYSLOG_DEBUG="debug nostdout"
+export RSYSLOG_DEBUGLOG="log"
case $1 in
'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
cp $srcdir/testsuites/diag-common.conf diag-common.conf
diff --git a/tests/tcpflood.c b/tests/tcpflood.c
index f93a87a2..9ed2dac9 100644
--- a/tests/tcpflood.c
+++ b/tests/tcpflood.c
@@ -5,6 +5,7 @@
* -t target address (default 127.0.0.1)
* -p target port (default 13514)
* -n number of target ports (targets are in range -p..(-p+-n-1)
+ * Note -c must also be set to at LEAST the number of -n!
* -c number of connections (default 1)
* -m number of messages to send (connection is random)
* -i initial message number (optional)
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 46587a27..9b7b77ab 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -622,17 +622,6 @@ chkMsgAgainstACL() {
#endif
-/* consumes a single messages - this function is primarily used to shuffle
- * out some code from msgConsumer(). After this function, the message is
- * (by definition!) considered committed.
- * rgerhards, 2009-11-16
- */
-///static inline rsRetVal
-///msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) {
- ///DEFiRet;
- //////RETiRet;
-///}
-
/* preprocess a batch of messages, that is ready them for actual processing. This is done
* as a first stage and totally in parallel to any other worker active in the system. So
* it helps us keep up the overall concurrency level.
@@ -708,8 +697,8 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShu
assert(pBatch != NULL);
pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */
preprocessBatch(pBatch);
+//pBatch->bSingleRuleset = 0; // TODO: testing aid, remove!!!!
ruleset.ProcessBatch(pBatch);
-dbgprintf("ZZZ: back in msgConsumer\n");
//TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we
//do not have this yet and so we emulate -- 2010-06-10
int i;