diff options
-rw-r--r-- | action.c | 117 | ||||
-rw-r--r-- | action.h | 6 | ||||
-rw-r--r-- | doc/msgflow.txt | 6 | ||||
-rw-r--r-- | runtime/atomic.h | 2 | ||||
-rw-r--r-- | runtime/queue.c | 55 | ||||
-rw-r--r-- | runtime/queue.h | 4 | ||||
-rw-r--r-- | runtime/rule.c | 3 | ||||
-rw-r--r-- | tools/syslogd.c | 74 |
8 files changed, 162 insertions, 105 deletions
@@ -46,11 +46,15 @@ #include "wti.h" #include "datetime.h" #include "unicode-helper.h" +#include "atomic.h" #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); +static rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg); +static rsRetVal doSubmitToActionQ(action_t *pAction, msg_t *pMsg); +static rsRetVal doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -298,9 +302,13 @@ actionConstructFinalize(action_t *pThis) pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated, pThis->iSecsExecOnceInterval ); - pThis->bSubmitFirehoseMode = 0; + pThis->submitToActQ = actionCallAction; + } else if(pThis->bWriteAllMarkMsgs == FALSE) { + /* nearly full-speed submission mode, default case */ + pThis->submitToActQ = doSubmitToActionQNotAllMark; } else { - pThis->bSubmitFirehoseMode = 1; + /* full firehose submission mode */ + pThis->submitToActQ = doSubmitToActionQ; } /* we need to make a safety check: if the queue is NOT in direct mode, a single @@ -644,6 +652,7 @@ finalize_it: rsRetVal actionDbgPrint(action_t *pThis) { DEFiRet; + char *sz; dbgprintf("%s: ", module.GetStateName(pThis->pMod)); pThis->pMod->dbgPrintInstInfo(pThis->pModData); @@ -656,7 +665,16 @@ rsRetVal actionDbgPrint(action_t *pThis) } dbgprintf("\tState: %s\n", getActStateName(pThis)); dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); - dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode); + if(pThis->submitToActQ == actionCallAction) { + sz = "slow, but feature-rich"; + } else if(pThis->submitToActQ == doSubmitToActionQNotAllMark) { + sz = "fast, but supports partial mark messages"; + } else if(pThis->submitToActQ == doSubmitToActionQ) { + sz = "firehose (fastest)"; + } else { + sz = "unknown (need to update debug display?)"; + } + dbgprintf("\tsubmission mode: %s\n", sz); dbgprintf("\n"); RETiRet; @@ -1122,20 +1140,8 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT } -/* rgerhards 2004-11-09: fprintlog() is the actual driver for - * the output channel. It receives the channel description (f) as - * well as the message and outputs them according to the channel - * semantics. The message is typically already contained in the - * channel save buffer (f->f_prevline). This is not only the case - * when a message was already repeated but also when a new message - * arrived. - * rgerhards 2007-08-01: interface changed to use action_t - * rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE - * CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do - * not do this here. Failing to do so results in all kinds of - * "interesting" problems! - * RGERHARDS, 2008-01-29: - * This is now the action caller and has been renamed. +/* This function builds up a batch of messages to be (later) + * submitted to the action queue. */ rsRetVal actionWriteToAction(action_t *pAction) @@ -1317,33 +1323,51 @@ finalize_it: } +/* This submits the message to the action queue in case where we need to handle + * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop + * for the synchronization. + * rgerhards, 2010-06-08 + */ +static rsRetVal +doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + time_t now; + time_t lastAct; + + if(pMsg->msgFlags & MARK) { + now = datetime.GetTime(NULL); /* good time call - the only one done */ + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if((now - lastAct) < MarkInterval / 2) { + DBGPRINTF("file was recently written, ignoring mark message\n"); + ABORT_FINALIZE(RS_RET_OK); + } + } while(ATOMIC_CAS(&pAction->f_time, lastAct, pMsg->ttGenTime, ADDME) == 0); + } + + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + +finalize_it: + RETiRet; +} + /* This submits the message to the action queue in case we do NOT need to handle repeat * message processing. That case permits us to gain lots of freedom during processing * and thus speed. * rgerhards, 2010-06-08 */ -static inline rsRetVal +static rsRetVal doSubmitToActionQ(action_t *pAction, msg_t *pMsg) { DEFiRet; -#if 0 // TODO: we need to care about this -- after PoC 2010-06-08 - /* don't output marks to recently written outputs */ - if(pAction->bWriteAllMarkMsgs == FALSE - && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { - ABORT_FINALIZE(RS_RET_OK); - } -#endif - DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); - -#if 0 // we would need this for bWriteAllMarkMsgs - /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ - pAction->tLastExec = getActNow(pAction); /* re-init time flags */ - pAction->f_time = pAction->f_pMsg->ttGenTime; - -#endif iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); RETiRet; @@ -1351,15 +1375,11 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) -/* call the configured action. Does all necessary housekeeping. - * rgerhards, 2007-08-01 - * FYI: currently, this function is only called from the queue - * consumer. So we (conceptually) run detached from the input - * threads (which also means we may run much later than when the - * message was generated). +/* Call configured action, most complex case with all features supported (and thus slow). + * rgerhards, 2010-06-08 */ #pragma GCC diagnostic ignored "-Wempty-body" -rsRetVal +static rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg) { DEFiRet; @@ -1367,18 +1387,11 @@ actionCallAction(action_t *pAction, msg_t *pMsg) ISOBJ_TYPE_assert(pMsg, msg); ASSERT(pAction != NULL); - /* We need to lock the mutex only for repeated line processing. - * rgerhards, 2009-06-19 - */ - if(pAction->bSubmitFirehoseMode == 1) { - iRet = doSubmitToActionQ(pAction, pMsg); - } else { - LockObj(pAction); - pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); - iRet = doActionCallAction(pAction, pMsg); - UnlockObj(pAction); - pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - } + LockObj(pAction); + pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); + iRet = doActionCallAction(pAction, pMsg); + UnlockObj(pAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ RETiRet; } @@ -47,6 +47,7 @@ typedef enum { /* the following struct defines the action object data structure */ +typedef struct action_s action_t; struct action_s { time_t f_time; /* used for "message repeated n times" - be careful, old, old code */ time_t tActNow; /* the current time for an action execution. Initially set to -1 and @@ -69,10 +70,11 @@ struct action_s { struct modInfo_s *pMod;/* pointer to output module handling this selector */ void *pModData; /* pointer to module data - content is module-specific */ sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */ - sbool bSubmitFirehoseMode;/* fast submission to action q in phase 1 possible? */ short f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */ int f_prevcount; /* repetition cnt of prevline */ int f_repeatcount; /* number of "repeated" msgs */ + rsRetVal (*submitToActQ)(action_t *, msg_t *); /* function submit message to action queue */ + rsRetVal (*qConstruct)(struct queue_s *pThis); enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING } eParamPassing; /* mode of parameter passing to action */ int iNumTpls; /* number of array entries for template element below */ @@ -90,7 +92,6 @@ struct action_s { void *ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */ size_t *lenMsgs; /* length of message in ppMsgs */ }; -typedef struct action_s action_t; /* function prototypes @@ -101,7 +102,6 @@ rsRetVal actionDestruct(action_t *pThis); rsRetVal actionDbgPrint(action_t *pThis); rsRetVal actionSetGlobalResumeInterval(int iNewVal); rsRetVal actionDoAction(action_t *pAction); -rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg); rsRetVal actionWriteToAction(action_t *pAction); rsRetVal actionCallHUPHdlr(action_t *pAction); rsRetVal actionClassInit(void); diff --git a/doc/msgflow.txt b/doc/msgflow.txt index c1c440ef..b53ba7e7 100644 --- a/doc/msgflow.txt +++ b/doc/msgflow.txt @@ -18,11 +18,11 @@ syslogd.c/msgConsumeOne parser.ParseMsg ruleset.ProcessMsg (loops through ruleset) ruleset.c/processMsgDoRules (for each rule in ruleset) -rule.c/ProcessMsg -rule.c/shouldProcessThisMessage +rule.c/processMsg +1:rule.c/shouldProcessThisMessage (evaluates filters, optimize via ALL-Filter) if to be processed, loop through associated actions -> -rule.c/processMsgsDoAction +2:rule.c/processMsgsDoAction action.c/actionCallAction (LOCKs action object!) action.c/doActionCallAction (does duplicate message reduction) action.c/actionWriteToAction diff --git a/runtime/atomic.h b/runtime/atomic.h index e5fafe04..da0852fa 100644 --- a/runtime/atomic.h +++ b/runtime/atomic.h @@ -50,7 +50,7 @@ # define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0) # define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1) # define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val)) -# define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal)); +# define ATOMIC_CAS(data, oldVal, newVal, phlpmut) __sync_bool_compare_and_swap(data, (oldVal), (newVal)) # define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal)); /* functions below are not needed if we have atomics */ diff --git a/runtime/queue.c b/runtime/queue.c index b6c30278..d437d590 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -79,6 +79,8 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); +static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); +static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -1203,6 +1205,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddFixedArray; pThis->qDeq = qDeqFixedArray; pThis->qDel = qDelFixedArray; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_LINKEDLIST: pThis->qConstruct = qConstructLinkedList; @@ -1210,6 +1213,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddLinkedList; pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; @@ -1217,6 +1221,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddDisk; pThis->qDeq = qDeqDisk; pThis->qDel = qDelDisk; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ break; @@ -1225,6 +1230,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qDestruct = qDestructDirect; pThis->qAdd = qAddDirect; pThis->qDel = qDelDirect; + pThis->MultiEnq = qqueueMultiEnqObjDirect; break; } @@ -1709,7 +1715,6 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 */ -dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg); CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ @@ -2149,7 +2154,6 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); -//dbgCallStackPrintAll(); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. @@ -2209,6 +2213,7 @@ finalize_it: RETiRet; } +/* ------------------------------ multi-enqueue functions ------------------------------ */ /* enqueue multiple user data elements at once. The aim is to provide a faster interface * for object submission. Uses the multi_submit_t helper object. * Please note that this function is not cancel-safe and consequently @@ -2216,9 +2221,12 @@ finalize_it: * during its execution. If that is not done, race conditions occur if the * thread is canceled (most important use case is input module termination). * rgerhards, 2009-06-16 + * Note: there now exists multiple different functions implementing specially + * optimized algorithms for different config cases. -- rgerhards, 2010-06-09 */ -rsRetVal -qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) +/* now the function for all modes but direct */ +static rsRetVal +qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) { int iCancelStateSave; int i; @@ -2227,29 +2235,42 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) ISOBJ_TYPE_assert(pThis, qqueue); assert(pMultiSub != NULL); - if(pThis->qType != QUEUETYPE_DIRECT) { - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->mut); - } - + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(pThis->mut); for(i = 0 ; i < pMultiSub->nElem ; ++i) { CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i])); } - qqueueChkPersist(pThis, pMultiSub->nElem); finalize_it: - if(pThis->qType != QUEUETYPE_DIRECT) { - /* make sure at least one worker is running. */ - qqueueAdviseMaxWorkers(pThis); - /* and release the mutex */ - d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n"); + /* make sure at least one worker is running. */ + qqueueAdviseMaxWorkers(pThis); + /* and release the mutex */ + d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n"); + + RETiRet; +} + +/* now, the same function, but for direct mode */ +static rsRetVal +qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) +{ + int i; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pMultiSub != NULL); + + for(i = 0 ; i < pMultiSub->nElem ; ++i) { + CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i])); } +finalize_it: RETiRet; } +/* ------------------------------ END multi-enqueue functions ------------------------------ */ /* enqueue a new user data element diff --git a/runtime/queue.h b/runtime/queue.h index 8ede6922..33b21c9a 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -114,6 +114,9 @@ struct queue_s { rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr); rsRetVal (*qDel)(struct queue_s *pThis); /* end type-specific handler */ + /* public entry points (set during construction, permit to set best algorithm for params selected) */ + rsRetVal (*MultiEnq)(qqueue_t *pThis, multi_submit_t *pMultiSub); + /* end public entry points */ /* synchronization variables */ pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */ @@ -174,7 +177,6 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); -rsRetVal qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub); rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); diff --git a/runtime/rule.c b/runtime/rule.c index 65ad071e..3b98d7d1 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -110,7 +110,7 @@ DEFFUNC_llExecFunc(processMsgDoActions) ABORT_FINALIZE(RS_RET_OK); } - iRetMod = actionCallAction(pAction, pDoActData->pMsg); + iRetMod = pAction->submitToActQ(pAction, pDoActData->pMsg); if(iRetMod == RS_RET_DISCARDMSG) { ABORT_FINALIZE(RS_RET_DISCARDMSG); } else if(iRetMod == RS_RET_SUSPENDED) { @@ -166,7 +166,6 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg) } } -RUNLOG_VAR("%p", pRule->pCSProgNameComp); if(pRule->pCSProgNameComp != NULL) { int bInv = 0, bEqv = 0, offset = 0; if(*(rsCStrGetSzStrNoNULL(pRule->pCSProgNameComp)) == '-') { diff --git a/tools/syslogd.c b/tools/syslogd.c index dfbd184b..2c36e6c2 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -627,39 +627,66 @@ chkMsgAgainstACL() { * (by definition!) considered committed. * rgerhards, 2009-11-16 */ +///static inline rsRetVal +///msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) { + ///DEFiRet; + //////RETiRet; +///} + +/* preprocess a batch of messages, that is ready them for actual processing. This is done + * as a first stage and totally in parallel to any other worker active in the system. So + * it helps us keep up the overall concurrency level. + * rgerhards, 2010-06-09 + */ static inline rsRetVal -msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) { +preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) { uchar fromHost[NI_MAXHOST]; uchar fromHostIP[NI_MAXHOST]; uchar fromHostFQDN[NI_MAXHOST]; + prop_t *propFromHost = NULL; + prop_t *propFromHostIP = NULL; int bIsPermitted; + msg_t *pMsg; + int i; + rsRetVal localRet; DEFiRet; - if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) { - dbgprintf("msgConsumer: UDP ACL must be checked for message (hostname-based)\n"); - CHKiRet(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP)); - bIsPermitted = net.isAllowedSender2((uchar*)"UDP", - (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1); - if(!bIsPermitted) { - DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n", - fromHostFQDN); - ABORT_FINALIZE(RS_RET_ERR); - /* save some of the info we obtained */ - MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), propFromHost); - CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), propFromHostIP)); - pMsg->msgFlags &= ~NEEDS_ACLCHK_U; + for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { + pMsg = (msg_t*) pBatch->pElem[i].pUsrp; + if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) { + DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n"); + if(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP) != RS_RET_OK) + continue; + bIsPermitted = net.isAllowedSender2((uchar*)"UDP", + (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1); + if(!bIsPermitted) { + DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n", + fromHostFQDN); + pBatch->pElem[i].state = BATCH_STATE_DISC; + } else { + /* save some of the info we obtained */ + MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost); + CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP)); + pMsg->msgFlags &= ~NEEDS_ACLCHK_U; + } + } + if((pMsg->msgFlags & NEEDS_PARSING) != 0) { + if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { + DBGPRINTF("Message discarded, parsing error %d\n", localRet); + pBatch->pElem[i].state = BATCH_STATE_DISC; + } } } - if((pMsg->msgFlags & NEEDS_PARSING) != 0) - CHKiRet(parser.ParseMsg(pMsg)); - ruleset.ProcessMsg(pMsg); finalize_it: + if(propFromHost != NULL) + prop.Destruct(&propFromHost); + if(propFromHostIP != NULL) + prop.Destruct(&propFromHostIP); RETiRet; } - /* The consumer of dequeued messages. This function is called by the * queue engine on dequeueing of a message. It runs on a SEPARATE * THREAD. It receives an array of pointers, which it must iterate @@ -670,22 +697,17 @@ static rsRetVal msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate) { int i; - prop_t *propFromHost = NULL; - prop_t *propFromHostIP = NULL; DEFiRet; assert(pBatch != NULL); + preprocessBatch(pBatch, pbShutdownImmediate); for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { DBGPRINTF("msgConsumer processes msg %d/%d\n", i, pBatch->nElem); - msgConsumeOne((msg_t*) pBatch->pElem[i].pUsrp, &propFromHost, &propFromHostIP); + ruleset.ProcessMsg((msg_t*) pBatch->pElem[i].pUsrp); pBatch->pElem[i].state = BATCH_STATE_COMM; } - if(propFromHost != NULL) - prop.Destruct(&propFromHost); - if(propFromHostIP != NULL) - prop.Destruct(&propFromHostIP); RETiRet; } @@ -735,7 +757,7 @@ multiSubmitMsg(multi_submit_t *pMultiSub) pRuleset = MsgGetRuleset(pMultiSub->ppMsgs[0]); pQueue = (pRuleset == NULL) ? pMsgQueue : ruleset.GetRulesetQueue(pRuleset); - iRet = qqueueMultiEnqObj(pQueue, pMultiSub); + iRet = pQueue->MultiEnq(pQueue, pMultiSub); pMultiSub->nElem = 0; finalize_it: |