summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c117
-rw-r--r--action.h6
-rw-r--r--doc/msgflow.txt6
-rw-r--r--runtime/atomic.h2
-rw-r--r--runtime/queue.c55
-rw-r--r--runtime/queue.h4
-rw-r--r--runtime/rule.c3
-rw-r--r--tools/syslogd.c74
8 files changed, 162 insertions, 105 deletions
diff --git a/action.c b/action.c
index 0e42b880..89cd00b2 100644
--- a/action.c
+++ b/action.c
@@ -46,11 +46,15 @@
#include "wti.h"
#include "datetime.h"
#include "unicode-helper.h"
+#include "atomic.h"
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*);
+static rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg);
+static rsRetVal doSubmitToActionQ(action_t *pAction, msg_t *pMsg);
+static rsRetVal doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -298,9 +302,13 @@ actionConstructFinalize(action_t *pThis)
pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated,
pThis->iSecsExecOnceInterval
);
- pThis->bSubmitFirehoseMode = 0;
+ pThis->submitToActQ = actionCallAction;
+ } else if(pThis->bWriteAllMarkMsgs == FALSE) {
+ /* nearly full-speed submission mode, default case */
+ pThis->submitToActQ = doSubmitToActionQNotAllMark;
} else {
- pThis->bSubmitFirehoseMode = 1;
+ /* full firehose submission mode */
+ pThis->submitToActQ = doSubmitToActionQ;
}
/* we need to make a safety check: if the queue is NOT in direct mode, a single
@@ -644,6 +652,7 @@ finalize_it:
rsRetVal actionDbgPrint(action_t *pThis)
{
DEFiRet;
+ char *sz;
dbgprintf("%s: ", module.GetStateName(pThis->pMod));
pThis->pMod->dbgPrintInstInfo(pThis->pModData);
@@ -656,7 +665,16 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
dbgprintf("\tState: %s\n", getActStateName(pThis));
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
- dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode);
+ if(pThis->submitToActQ == actionCallAction) {
+ sz = "slow, but feature-rich";
+ } else if(pThis->submitToActQ == doSubmitToActionQNotAllMark) {
+ sz = "fast, but supports partial mark messages";
+ } else if(pThis->submitToActQ == doSubmitToActionQ) {
+ sz = "firehose (fastest)";
+ } else {
+ sz = "unknown (need to update debug display?)";
+ }
+ dbgprintf("\tsubmission mode: %s\n", sz);
dbgprintf("\n");
RETiRet;
@@ -1122,20 +1140,8 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
}
-/* rgerhards 2004-11-09: fprintlog() is the actual driver for
- * the output channel. It receives the channel description (f) as
- * well as the message and outputs them according to the channel
- * semantics. The message is typically already contained in the
- * channel save buffer (f->f_prevline). This is not only the case
- * when a message was already repeated but also when a new message
- * arrived.
- * rgerhards 2007-08-01: interface changed to use action_t
- * rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE
- * CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do
- * not do this here. Failing to do so results in all kinds of
- * "interesting" problems!
- * RGERHARDS, 2008-01-29:
- * This is now the action caller and has been renamed.
+/* This function builds up a batch of messages to be (later)
+ * submitted to the action queue.
*/
rsRetVal
actionWriteToAction(action_t *pAction)
@@ -1317,33 +1323,51 @@ finalize_it:
}
+/* This submits the message to the action queue in case where we need to handle
+ * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop
+ * for the synchronization.
+ * rgerhards, 2010-06-08
+ */
+static rsRetVal
+doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg)
+{
+ DEFiRet;
+ time_t now;
+ time_t lastAct;
+
+ if(pMsg->msgFlags & MARK) {
+ now = datetime.GetTime(NULL); /* good time call - the only one done */
+ /* CAS loop, we write back a bit early, but that's OK... */
+ /* we use reception time, not dequeue time - this is considered more appropriate and
+ * also faster ;) -- rgerhards, 2008-09-17 */
+ do {
+ lastAct = pAction->f_time;
+ if((now - lastAct) < MarkInterval / 2) {
+ DBGPRINTF("file was recently written, ignoring mark message\n");
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+ } while(ATOMIC_CAS(&pAction->f_time, lastAct, pMsg->ttGenTime, ADDME) == 0);
+ }
+
+ DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+
+finalize_it:
+ RETiRet;
+}
+
/* This submits the message to the action queue in case we do NOT need to handle repeat
* message processing. That case permits us to gain lots of freedom during processing
* and thus speed.
* rgerhards, 2010-06-08
*/
-static inline rsRetVal
+static rsRetVal
doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
-#if 0 // TODO: we need to care about this -- after PoC 2010-06-08
- /* don't output marks to recently written outputs */
- if(pAction->bWriteAllMarkMsgs == FALSE
- && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
- ABORT_FINALIZE(RS_RET_OK);
- }
-#endif
-
DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
-
-#if 0 // we would need this for bWriteAllMarkMsgs
- /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
- pAction->tLastExec = getActNow(pAction); /* re-init time flags */
- pAction->f_time = pAction->f_pMsg->ttGenTime;
-
-#endif
iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
RETiRet;
@@ -1351,15 +1375,11 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
-/* call the configured action. Does all necessary housekeeping.
- * rgerhards, 2007-08-01
- * FYI: currently, this function is only called from the queue
- * consumer. So we (conceptually) run detached from the input
- * threads (which also means we may run much later than when the
- * message was generated).
+/* Call configured action, most complex case with all features supported (and thus slow).
+ * rgerhards, 2010-06-08
*/
#pragma GCC diagnostic ignored "-Wempty-body"
-rsRetVal
+static rsRetVal
actionCallAction(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
@@ -1367,18 +1387,11 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
ISOBJ_TYPE_assert(pMsg, msg);
ASSERT(pAction != NULL);
- /* We need to lock the mutex only for repeated line processing.
- * rgerhards, 2009-06-19
- */
- if(pAction->bSubmitFirehoseMode == 1) {
- iRet = doSubmitToActionQ(pAction, pMsg);
- } else {
- LockObj(pAction);
- pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
- iRet = doActionCallAction(pAction, pMsg);
- UnlockObj(pAction);
- pthread_cleanup_pop(0); /* remove mutex cleanup handler */
- }
+ LockObj(pAction);
+ pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
+ iRet = doActionCallAction(pAction, pMsg);
+ UnlockObj(pAction);
+ pthread_cleanup_pop(0); /* remove mutex cleanup handler */
RETiRet;
}
diff --git a/action.h b/action.h
index 43e6ae7d..bf9ceafa 100644
--- a/action.h
+++ b/action.h
@@ -47,6 +47,7 @@ typedef enum {
/* the following struct defines the action object data structure
*/
+typedef struct action_s action_t;
struct action_s {
time_t f_time; /* used for "message repeated n times" - be careful, old, old code */
time_t tActNow; /* the current time for an action execution. Initially set to -1 and
@@ -69,10 +70,11 @@ struct action_s {
struct modInfo_s *pMod;/* pointer to output module handling this selector */
void *pModData; /* pointer to module data - content is module-specific */
sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */
- sbool bSubmitFirehoseMode;/* fast submission to action q in phase 1 possible? */
short f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */
int f_prevcount; /* repetition cnt of prevline */
int f_repeatcount; /* number of "repeated" msgs */
+ rsRetVal (*submitToActQ)(action_t *, msg_t *); /* function submit message to action queue */
+ rsRetVal (*qConstruct)(struct queue_s *pThis);
enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING }
eParamPassing; /* mode of parameter passing to action */
int iNumTpls; /* number of array entries for template element below */
@@ -90,7 +92,6 @@ struct action_s {
void *ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */
size_t *lenMsgs; /* length of message in ppMsgs */
};
-typedef struct action_s action_t;
/* function prototypes
@@ -101,7 +102,6 @@ rsRetVal actionDestruct(action_t *pThis);
rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
rsRetVal actionDoAction(action_t *pAction);
-rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg);
rsRetVal actionWriteToAction(action_t *pAction);
rsRetVal actionCallHUPHdlr(action_t *pAction);
rsRetVal actionClassInit(void);
diff --git a/doc/msgflow.txt b/doc/msgflow.txt
index c1c440ef..b53ba7e7 100644
--- a/doc/msgflow.txt
+++ b/doc/msgflow.txt
@@ -18,11 +18,11 @@ syslogd.c/msgConsumeOne
parser.ParseMsg
ruleset.ProcessMsg (loops through ruleset)
ruleset.c/processMsgDoRules (for each rule in ruleset)
-rule.c/ProcessMsg
-rule.c/shouldProcessThisMessage
+rule.c/processMsg
+1:rule.c/shouldProcessThisMessage
(evaluates filters, optimize via ALL-Filter)
if to be processed, loop through associated actions ->
-rule.c/processMsgsDoAction
+2:rule.c/processMsgsDoAction
action.c/actionCallAction (LOCKs action object!)
action.c/doActionCallAction (does duplicate message reduction)
action.c/actionWriteToAction
diff --git a/runtime/atomic.h b/runtime/atomic.h
index e5fafe04..da0852fa 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -50,7 +50,7 @@
# define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0)
# define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1)
# define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val))
-# define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal));
+# define ATOMIC_CAS(data, oldVal, newVal, phlpmut) __sync_bool_compare_and_swap(data, (oldVal), (newVal))
# define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal));
/* functions below are not needed if we have atomics */
diff --git a/runtime/queue.c b/runtime/queue.c
index b6c30278..d437d590 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -79,6 +79,8 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis);
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
+static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
+static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -1203,6 +1205,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddFixedArray;
pThis->qDeq = qDeqFixedArray;
pThis->qDel = qDelFixedArray;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_LINKEDLIST:
pThis->qConstruct = qConstructLinkedList;
@@ -1210,6 +1213,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddLinkedList;
pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
@@ -1217,6 +1221,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddDisk;
pThis->qDeq = qDeqDisk;
pThis->qDel = qDelDisk;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
break;
@@ -1225,6 +1230,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qDestruct = qDestructDirect;
pThis->qAdd = qAddDirect;
pThis->qDel = qDelDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
break;
}
@@ -1709,7 +1715,6 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
* rgerhards, 2009-05-28
*/
-dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg);
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY,
(obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
@@ -2149,7 +2154,6 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
/* first check if we need to discard this message (which will cause CHKiRet() to exit)
*/
CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr));
-//dbgCallStackPrintAll();
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
@@ -2209,6 +2213,7 @@ finalize_it:
RETiRet;
}
+/* ------------------------------ multi-enqueue functions ------------------------------ */
/* enqueue multiple user data elements at once. The aim is to provide a faster interface
* for object submission. Uses the multi_submit_t helper object.
* Please note that this function is not cancel-safe and consequently
@@ -2216,9 +2221,12 @@ finalize_it:
* during its execution. If that is not done, race conditions occur if the
* thread is canceled (most important use case is input module termination).
* rgerhards, 2009-06-16
+ * Note: there now exists multiple different functions implementing specially
+ * optimized algorithms for different config cases. -- rgerhards, 2010-06-09
*/
-rsRetVal
-qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
+/* now the function for all modes but direct */
+static rsRetVal
+qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
{
int iCancelStateSave;
int i;
@@ -2227,29 +2235,42 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pMultiSub != NULL);
- if(pThis->qType != QUEUETYPE_DIRECT) {
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(pThis->mut);
- }
-
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(pThis->mut);
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]));
}
-
qqueueChkPersist(pThis, pMultiSub->nElem);
finalize_it:
- if(pThis->qType != QUEUETYPE_DIRECT) {
- /* make sure at least one worker is running. */
- qqueueAdviseMaxWorkers(pThis);
- /* and release the mutex */
- d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
- DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n");
+ /* make sure at least one worker is running. */
+ qqueueAdviseMaxWorkers(pThis);
+ /* and release the mutex */
+ d_pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n");
+
+ RETiRet;
+}
+
+/* now, the same function, but for direct mode */
+static rsRetVal
+qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
+{
+ int i;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(pMultiSub != NULL);
+
+ for(i = 0 ; i < pMultiSub->nElem ; ++i) {
+ CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i]));
}
+finalize_it:
RETiRet;
}
+/* ------------------------------ END multi-enqueue functions ------------------------------ */
/* enqueue a new user data element
diff --git a/runtime/queue.h b/runtime/queue.h
index 8ede6922..33b21c9a 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -114,6 +114,9 @@ struct queue_s {
rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
rsRetVal (*qDel)(struct queue_s *pThis);
/* end type-specific handler */
+ /* public entry points (set during construction, permit to set best algorithm for params selected) */
+ rsRetVal (*MultiEnq)(qqueue_t *pThis, multi_submit_t *pMultiSub);
+ /* end public entry points */
/* synchronization variables */
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */
@@ -174,7 +177,6 @@ struct queue_s {
/* prototypes */
rsRetVal qqueueDestruct(qqueue_t **ppThis);
-rsRetVal qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub);
rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr);
rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
diff --git a/runtime/rule.c b/runtime/rule.c
index 65ad071e..3b98d7d1 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -110,7 +110,7 @@ DEFFUNC_llExecFunc(processMsgDoActions)
ABORT_FINALIZE(RS_RET_OK);
}
- iRetMod = actionCallAction(pAction, pDoActData->pMsg);
+ iRetMod = pAction->submitToActQ(pAction, pDoActData->pMsg);
if(iRetMod == RS_RET_DISCARDMSG) {
ABORT_FINALIZE(RS_RET_DISCARDMSG);
} else if(iRetMod == RS_RET_SUSPENDED) {
@@ -166,7 +166,6 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg)
}
}
-RUNLOG_VAR("%p", pRule->pCSProgNameComp);
if(pRule->pCSProgNameComp != NULL) {
int bInv = 0, bEqv = 0, offset = 0;
if(*(rsCStrGetSzStrNoNULL(pRule->pCSProgNameComp)) == '-') {
diff --git a/tools/syslogd.c b/tools/syslogd.c
index dfbd184b..2c36e6c2 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -627,39 +627,66 @@ chkMsgAgainstACL() {
* (by definition!) considered committed.
* rgerhards, 2009-11-16
*/
+///static inline rsRetVal
+///msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) {
+ ///DEFiRet;
+ //////RETiRet;
+///}
+
+/* preprocess a batch of messages, that is ready them for actual processing. This is done
+ * as a first stage and totally in parallel to any other worker active in the system. So
+ * it helps us keep up the overall concurrency level.
+ * rgerhards, 2010-06-09
+ */
static inline rsRetVal
-msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) {
+preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) {
uchar fromHost[NI_MAXHOST];
uchar fromHostIP[NI_MAXHOST];
uchar fromHostFQDN[NI_MAXHOST];
+ prop_t *propFromHost = NULL;
+ prop_t *propFromHostIP = NULL;
int bIsPermitted;
+ msg_t *pMsg;
+ int i;
+ rsRetVal localRet;
DEFiRet;
- if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) {
- dbgprintf("msgConsumer: UDP ACL must be checked for message (hostname-based)\n");
- CHKiRet(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP));
- bIsPermitted = net.isAllowedSender2((uchar*)"UDP",
- (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1);
- if(!bIsPermitted) {
- DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n",
- fromHostFQDN);
- ABORT_FINALIZE(RS_RET_ERR);
- /* save some of the info we obtained */
- MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), propFromHost);
- CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), propFromHostIP));
- pMsg->msgFlags &= ~NEEDS_ACLCHK_U;
+ for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
+ pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
+ if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) {
+ DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n");
+ if(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP) != RS_RET_OK)
+ continue;
+ bIsPermitted = net.isAllowedSender2((uchar*)"UDP",
+ (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1);
+ if(!bIsPermitted) {
+ DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n",
+ fromHostFQDN);
+ pBatch->pElem[i].state = BATCH_STATE_DISC;
+ } else {
+ /* save some of the info we obtained */
+ MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost);
+ CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP));
+ pMsg->msgFlags &= ~NEEDS_ACLCHK_U;
+ }
+ }
+ if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
+ if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
+ DBGPRINTF("Message discarded, parsing error %d\n", localRet);
+ pBatch->pElem[i].state = BATCH_STATE_DISC;
+ }
}
}
- if((pMsg->msgFlags & NEEDS_PARSING) != 0)
- CHKiRet(parser.ParseMsg(pMsg));
- ruleset.ProcessMsg(pMsg);
finalize_it:
+ if(propFromHost != NULL)
+ prop.Destruct(&propFromHost);
+ if(propFromHostIP != NULL)
+ prop.Destruct(&propFromHostIP);
RETiRet;
}
-
/* The consumer of dequeued messages. This function is called by the
* queue engine on dequeueing of a message. It runs on a SEPARATE
* THREAD. It receives an array of pointers, which it must iterate
@@ -670,22 +697,17 @@ static rsRetVal
msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate)
{
int i;
- prop_t *propFromHost = NULL;
- prop_t *propFromHostIP = NULL;
DEFiRet;
assert(pBatch != NULL);
+ preprocessBatch(pBatch, pbShutdownImmediate);
for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
DBGPRINTF("msgConsumer processes msg %d/%d\n", i, pBatch->nElem);
- msgConsumeOne((msg_t*) pBatch->pElem[i].pUsrp, &propFromHost, &propFromHostIP);
+ ruleset.ProcessMsg((msg_t*) pBatch->pElem[i].pUsrp);
pBatch->pElem[i].state = BATCH_STATE_COMM;
}
- if(propFromHost != NULL)
- prop.Destruct(&propFromHost);
- if(propFromHostIP != NULL)
- prop.Destruct(&propFromHostIP);
RETiRet;
}
@@ -735,7 +757,7 @@ multiSubmitMsg(multi_submit_t *pMultiSub)
pRuleset = MsgGetRuleset(pMultiSub->ppMsgs[0]);
pQueue = (pRuleset == NULL) ? pMsgQueue : ruleset.GetRulesetQueue(pRuleset);
- iRet = qqueueMultiEnqObj(pQueue, pMultiSub);
+ iRet = pQueue->MultiEnq(pQueue, pMultiSub);
pMultiSub->nElem = 0;
finalize_it: