diff options
-rw-r--r-- | action.c | 101 | ||||
-rw-r--r-- | action.h | 6 | ||||
-rw-r--r-- | runtime/atomic.h | 2 | ||||
-rw-r--r-- | runtime/rule.c | 2 |
4 files changed, 68 insertions, 43 deletions
@@ -46,11 +46,15 @@ #include "wti.h" #include "datetime.h" #include "unicode-helper.h" +#include "atomic.h" #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); +static rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg); +static rsRetVal doSubmitToActionQ(action_t *pAction, msg_t *pMsg); +static rsRetVal doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -298,9 +302,13 @@ actionConstructFinalize(action_t *pThis) pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated, pThis->iSecsExecOnceInterval ); - pThis->bSubmitFirehoseMode = 0; + pThis->submitToActQ = actionCallAction; + } else if(pThis->bWriteAllMarkMsgs == FALSE) { + /* nearly full-speed submission mode, default case */ + pThis->submitToActQ = doSubmitToActionQNotAllMark; } else { - pThis->bSubmitFirehoseMode = 1; + /* full firehose submission mode */ + pThis->submitToActQ = doSubmitToActionQ; } /* we need to make a safety check: if the queue is NOT in direct mode, a single @@ -644,6 +652,7 @@ finalize_it: rsRetVal actionDbgPrint(action_t *pThis) { DEFiRet; + char *sz; dbgprintf("%s: ", module.GetStateName(pThis->pMod)); pThis->pMod->dbgPrintInstInfo(pThis->pModData); @@ -656,7 +665,16 @@ rsRetVal actionDbgPrint(action_t *pThis) } dbgprintf("\tState: %s\n", getActStateName(pThis)); dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); - dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode); + if(pThis->submitToActQ == actionCallAction) { + sz = "slow, but feature-rich"; + } else if(pThis->submitToActQ == doSubmitToActionQNotAllMark) { + sz = "fast, but supports partial mark messages"; + } else if(pThis->submitToActQ == doSubmitToActionQ) { + sz = "firehose (fastest)"; + } else { + sz = "unknown (need to update debug display?)"; + } + dbgprintf("\tsubmission mode: %s\n", sz); dbgprintf("\n"); RETiRet; @@ -1317,33 +1335,51 @@ finalize_it: } +/* This submits the message to the action queue in case where we need to handle + * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop + * for the synchronization. + * rgerhards, 2010-06-08 + */ +static rsRetVal +doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + time_t now; + time_t lastAct; + + if(pMsg->msgFlags & MARK) { + now = datetime.GetTime(NULL); /* good time call - the only one done */ + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if((now - lastAct) < MarkInterval / 2) { + DBGPRINTF("file was recently written, ignoring mark message\n"); + ABORT_FINALIZE(RS_RET_OK); + } + } while(ATOMIC_CAS(&pAction->f_time, lastAct, pMsg->ttGenTime, ADDME) == 0); + } + + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + +finalize_it: + RETiRet; +} + /* This submits the message to the action queue in case we do NOT need to handle repeat * message processing. That case permits us to gain lots of freedom during processing * and thus speed. * rgerhards, 2010-06-08 */ -static inline rsRetVal +static rsRetVal doSubmitToActionQ(action_t *pAction, msg_t *pMsg) { DEFiRet; -#if 0 // TODO: we need to care about this -- after PoC 2010-06-08 - /* don't output marks to recently written outputs */ - if(pAction->bWriteAllMarkMsgs == FALSE - && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { - ABORT_FINALIZE(RS_RET_OK); - } -#endif - DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); - -#if 0 // we would need this for bWriteAllMarkMsgs - /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ - pAction->tLastExec = getActNow(pAction); /* re-init time flags */ - pAction->f_time = pAction->f_pMsg->ttGenTime; - -#endif iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); RETiRet; @@ -1351,15 +1387,11 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) -/* call the configured action. Does all necessary housekeeping. - * rgerhards, 2007-08-01 - * FYI: currently, this function is only called from the queue - * consumer. So we (conceptually) run detached from the input - * threads (which also means we may run much later than when the - * message was generated). +/* Call configured action, most complex case with all features supported (and thus slow). + * rgerhards, 2010-06-08 */ #pragma GCC diagnostic ignored "-Wempty-body" -rsRetVal +static rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg) { DEFiRet; @@ -1367,18 +1399,11 @@ actionCallAction(action_t *pAction, msg_t *pMsg) ISOBJ_TYPE_assert(pMsg, msg); ASSERT(pAction != NULL); - /* We need to lock the mutex only for repeated line processing. - * rgerhards, 2009-06-19 - */ - if(pAction->bSubmitFirehoseMode == 1) { - iRet = doSubmitToActionQ(pAction, pMsg); - } else { - LockObj(pAction); - pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); - iRet = doActionCallAction(pAction, pMsg); - UnlockObj(pAction); - pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - } + LockObj(pAction); + pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); + iRet = doActionCallAction(pAction, pMsg); + UnlockObj(pAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ RETiRet; } @@ -47,6 +47,7 @@ typedef enum { /* the following struct defines the action object data structure */ +typedef struct action_s action_t; struct action_s { time_t f_time; /* used for "message repeated n times" - be careful, old, old code */ time_t tActNow; /* the current time for an action execution. Initially set to -1 and @@ -69,10 +70,11 @@ struct action_s { struct modInfo_s *pMod;/* pointer to output module handling this selector */ void *pModData; /* pointer to module data - content is module-specific */ sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */ - sbool bSubmitFirehoseMode;/* fast submission to action q in phase 1 possible? */ short f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */ int f_prevcount; /* repetition cnt of prevline */ int f_repeatcount; /* number of "repeated" msgs */ + rsRetVal (*submitToActQ)(action_t *, msg_t *); /* function submit message to action queue */ + rsRetVal (*qConstruct)(struct queue_s *pThis); enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING } eParamPassing; /* mode of parameter passing to action */ int iNumTpls; /* number of array entries for template element below */ @@ -90,7 +92,6 @@ struct action_s { void *ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */ size_t *lenMsgs; /* length of message in ppMsgs */ }; -typedef struct action_s action_t; /* function prototypes @@ -101,7 +102,6 @@ rsRetVal actionDestruct(action_t *pThis); rsRetVal actionDbgPrint(action_t *pThis); rsRetVal actionSetGlobalResumeInterval(int iNewVal); rsRetVal actionDoAction(action_t *pAction); -rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg); rsRetVal actionWriteToAction(action_t *pAction); rsRetVal actionCallHUPHdlr(action_t *pAction); rsRetVal actionClassInit(void); diff --git a/runtime/atomic.h b/runtime/atomic.h index e5fafe04..da0852fa 100644 --- a/runtime/atomic.h +++ b/runtime/atomic.h @@ -50,7 +50,7 @@ # define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0) # define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1) # define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val)) -# define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal)); +# define ATOMIC_CAS(data, oldVal, newVal, phlpmut) __sync_bool_compare_and_swap(data, (oldVal), (newVal)) # define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal)); /* functions below are not needed if we have atomics */ diff --git a/runtime/rule.c b/runtime/rule.c index 65ad071e..7a26a03a 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -110,7 +110,7 @@ DEFFUNC_llExecFunc(processMsgDoActions) ABORT_FINALIZE(RS_RET_OK); } - iRetMod = actionCallAction(pAction, pDoActData->pMsg); + iRetMod = pAction->submitToActQ(pAction, pDoActData->pMsg); if(iRetMod == RS_RET_DISCARDMSG) { ABORT_FINALIZE(RS_RET_DISCARDMSG); } else if(iRetMod == RS_RET_SUSPENDED) { |