summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c101
-rw-r--r--action.h6
-rw-r--r--runtime/atomic.h2
-rw-r--r--runtime/rule.c2
4 files changed, 68 insertions, 43 deletions
diff --git a/action.c b/action.c
index b055ebf4..b8751c63 100644
--- a/action.c
+++ b/action.c
@@ -46,11 +46,15 @@
#include "wti.h"
#include "datetime.h"
#include "unicode-helper.h"
+#include "atomic.h"
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*);
+static rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg);
+static rsRetVal doSubmitToActionQ(action_t *pAction, msg_t *pMsg);
+static rsRetVal doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -298,9 +302,13 @@ actionConstructFinalize(action_t *pThis)
pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated,
pThis->iSecsExecOnceInterval
);
- pThis->bSubmitFirehoseMode = 0;
+ pThis->submitToActQ = actionCallAction;
+ } else if(pThis->bWriteAllMarkMsgs == FALSE) {
+ /* nearly full-speed submission mode, default case */
+ pThis->submitToActQ = doSubmitToActionQNotAllMark;
} else {
- pThis->bSubmitFirehoseMode = 1;
+ /* full firehose submission mode */
+ pThis->submitToActQ = doSubmitToActionQ;
}
/* we need to make a safety check: if the queue is NOT in direct mode, a single
@@ -644,6 +652,7 @@ finalize_it:
rsRetVal actionDbgPrint(action_t *pThis)
{
DEFiRet;
+ char *sz;
dbgprintf("%s: ", module.GetStateName(pThis->pMod));
pThis->pMod->dbgPrintInstInfo(pThis->pModData);
@@ -656,7 +665,16 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
dbgprintf("\tState: %s\n", getActStateName(pThis));
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
- dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode);
+ if(pThis->submitToActQ == actionCallAction) {
+ sz = "slow, but feature-rich";
+ } else if(pThis->submitToActQ == doSubmitToActionQNotAllMark) {
+ sz = "fast, but supports partial mark messages";
+ } else if(pThis->submitToActQ == doSubmitToActionQ) {
+ sz = "firehose (fastest)";
+ } else {
+ sz = "unknown (need to update debug display?)";
+ }
+ dbgprintf("\tsubmission mode: %s\n", sz);
dbgprintf("\n");
RETiRet;
@@ -1317,33 +1335,51 @@ finalize_it:
}
+/* This submits the message to the action queue in case where we need to handle
+ * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop
+ * for the synchronization.
+ * rgerhards, 2010-06-08
+ */
+static rsRetVal
+doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg)
+{
+ DEFiRet;
+ time_t now;
+ time_t lastAct;
+
+ if(pMsg->msgFlags & MARK) {
+ now = datetime.GetTime(NULL); /* good time call - the only one done */
+ /* CAS loop, we write back a bit early, but that's OK... */
+ /* we use reception time, not dequeue time - this is considered more appropriate and
+ * also faster ;) -- rgerhards, 2008-09-17 */
+ do {
+ lastAct = pAction->f_time;
+ if((now - lastAct) < MarkInterval / 2) {
+ DBGPRINTF("file was recently written, ignoring mark message\n");
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+ } while(ATOMIC_CAS(&pAction->f_time, lastAct, pMsg->ttGenTime, ADDME) == 0);
+ }
+
+ DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+
+finalize_it:
+ RETiRet;
+}
+
/* This submits the message to the action queue in case we do NOT need to handle repeat
* message processing. That case permits us to gain lots of freedom during processing
* and thus speed.
* rgerhards, 2010-06-08
*/
-static inline rsRetVal
+static rsRetVal
doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
-#if 0 // TODO: we need to care about this -- after PoC 2010-06-08
- /* don't output marks to recently written outputs */
- if(pAction->bWriteAllMarkMsgs == FALSE
- && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
- ABORT_FINALIZE(RS_RET_OK);
- }
-#endif
-
DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
-
-#if 0 // we would need this for bWriteAllMarkMsgs
- /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
- pAction->tLastExec = getActNow(pAction); /* re-init time flags */
- pAction->f_time = pAction->f_pMsg->ttGenTime;
-
-#endif
iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
RETiRet;
@@ -1351,15 +1387,11 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
-/* call the configured action. Does all necessary housekeeping.
- * rgerhards, 2007-08-01
- * FYI: currently, this function is only called from the queue
- * consumer. So we (conceptually) run detached from the input
- * threads (which also means we may run much later than when the
- * message was generated).
+/* Call configured action, most complex case with all features supported (and thus slow).
+ * rgerhards, 2010-06-08
*/
#pragma GCC diagnostic ignored "-Wempty-body"
-rsRetVal
+static rsRetVal
actionCallAction(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
@@ -1367,18 +1399,11 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
ISOBJ_TYPE_assert(pMsg, msg);
ASSERT(pAction != NULL);
- /* We need to lock the mutex only for repeated line processing.
- * rgerhards, 2009-06-19
- */
- if(pAction->bSubmitFirehoseMode == 1) {
- iRet = doSubmitToActionQ(pAction, pMsg);
- } else {
- LockObj(pAction);
- pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
- iRet = doActionCallAction(pAction, pMsg);
- UnlockObj(pAction);
- pthread_cleanup_pop(0); /* remove mutex cleanup handler */
- }
+ LockObj(pAction);
+ pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
+ iRet = doActionCallAction(pAction, pMsg);
+ UnlockObj(pAction);
+ pthread_cleanup_pop(0); /* remove mutex cleanup handler */
RETiRet;
}
diff --git a/action.h b/action.h
index 43e6ae7d..bf9ceafa 100644
--- a/action.h
+++ b/action.h
@@ -47,6 +47,7 @@ typedef enum {
/* the following struct defines the action object data structure
*/
+typedef struct action_s action_t;
struct action_s {
time_t f_time; /* used for "message repeated n times" - be careful, old, old code */
time_t tActNow; /* the current time for an action execution. Initially set to -1 and
@@ -69,10 +70,11 @@ struct action_s {
struct modInfo_s *pMod;/* pointer to output module handling this selector */
void *pModData; /* pointer to module data - content is module-specific */
sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */
- sbool bSubmitFirehoseMode;/* fast submission to action q in phase 1 possible? */
short f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */
int f_prevcount; /* repetition cnt of prevline */
int f_repeatcount; /* number of "repeated" msgs */
+ rsRetVal (*submitToActQ)(action_t *, msg_t *); /* function submit message to action queue */
+ rsRetVal (*qConstruct)(struct queue_s *pThis);
enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING }
eParamPassing; /* mode of parameter passing to action */
int iNumTpls; /* number of array entries for template element below */
@@ -90,7 +92,6 @@ struct action_s {
void *ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */
size_t *lenMsgs; /* length of message in ppMsgs */
};
-typedef struct action_s action_t;
/* function prototypes
@@ -101,7 +102,6 @@ rsRetVal actionDestruct(action_t *pThis);
rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
rsRetVal actionDoAction(action_t *pAction);
-rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg);
rsRetVal actionWriteToAction(action_t *pAction);
rsRetVal actionCallHUPHdlr(action_t *pAction);
rsRetVal actionClassInit(void);
diff --git a/runtime/atomic.h b/runtime/atomic.h
index e5fafe04..da0852fa 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -50,7 +50,7 @@
# define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0)
# define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1)
# define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val))
-# define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal));
+# define ATOMIC_CAS(data, oldVal, newVal, phlpmut) __sync_bool_compare_and_swap(data, (oldVal), (newVal))
# define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal));
/* functions below are not needed if we have atomics */
diff --git a/runtime/rule.c b/runtime/rule.c
index 65ad071e..7a26a03a 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -110,7 +110,7 @@ DEFFUNC_llExecFunc(processMsgDoActions)
ABORT_FINALIZE(RS_RET_OK);
}
- iRetMod = actionCallAction(pAction, pDoActData->pMsg);
+ iRetMod = pAction->submitToActQ(pAction, pDoActData->pMsg);
if(iRetMod == RS_RET_DISCARDMSG) {
ABORT_FINALIZE(RS_RET_DISCARDMSG);
} else if(iRetMod == RS_RET_SUSPENDED) {