summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-06-08 15:20:33 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-06-08 15:20:33 +0200
commit11bd517465360278b270ee7c18607b4d1d97e44e (patch)
tree429d9fda28dd132c8b592738d955cd34a56e823e
parent3e49a1075ab6750135e1a38cf0c213579fa30b4a (diff)
downloadrsyslog-11bd517465360278b270ee7c18607b4d1d97e44e.tar.gz
rsyslog-11bd517465360278b270ee7c18607b4d1d97e44e.tar.xz
rsyslog-11bd517465360278b270ee7c18607b4d1d97e44e.zip
added support for high-performance action queue submission if not all mark messages should be logged
this was previously not properly handeld. This is also the first occurence of a (real) CAS loop inside rsyslog. Note that the performance is now very well in the default configuration, and mark message directives are still correctly being handled. So this code looks close to final, but needs to have some bug cleanup as the testsuite shows.
-rw-r--r--action.c101
-rw-r--r--action.h6
-rw-r--r--runtime/atomic.h2
-rw-r--r--runtime/rule.c2
4 files changed, 68 insertions, 43 deletions
diff --git a/action.c b/action.c
index b055ebf4..b8751c63 100644
--- a/action.c
+++ b/action.c
@@ -46,11 +46,15 @@
#include "wti.h"
#include "datetime.h"
#include "unicode-helper.h"
+#include "atomic.h"
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*);
+static rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg);
+static rsRetVal doSubmitToActionQ(action_t *pAction, msg_t *pMsg);
+static rsRetVal doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -298,9 +302,13 @@ actionConstructFinalize(action_t *pThis)
pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated,
pThis->iSecsExecOnceInterval
);
- pThis->bSubmitFirehoseMode = 0;
+ pThis->submitToActQ = actionCallAction;
+ } else if(pThis->bWriteAllMarkMsgs == FALSE) {
+ /* nearly full-speed submission mode, default case */
+ pThis->submitToActQ = doSubmitToActionQNotAllMark;
} else {
- pThis->bSubmitFirehoseMode = 1;
+ /* full firehose submission mode */
+ pThis->submitToActQ = doSubmitToActionQ;
}
/* we need to make a safety check: if the queue is NOT in direct mode, a single
@@ -644,6 +652,7 @@ finalize_it:
rsRetVal actionDbgPrint(action_t *pThis)
{
DEFiRet;
+ char *sz;
dbgprintf("%s: ", module.GetStateName(pThis->pMod));
pThis->pMod->dbgPrintInstInfo(pThis->pModData);
@@ -656,7 +665,16 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
dbgprintf("\tState: %s\n", getActStateName(pThis));
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
- dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode);
+ if(pThis->submitToActQ == actionCallAction) {
+ sz = "slow, but feature-rich";
+ } else if(pThis->submitToActQ == doSubmitToActionQNotAllMark) {
+ sz = "fast, but supports partial mark messages";
+ } else if(pThis->submitToActQ == doSubmitToActionQ) {
+ sz = "firehose (fastest)";
+ } else {
+ sz = "unknown (need to update debug display?)";
+ }
+ dbgprintf("\tsubmission mode: %s\n", sz);
dbgprintf("\n");
RETiRet;
@@ -1317,33 +1335,51 @@ finalize_it:
}
+/* This submits the message to the action queue in case where we need to handle
+ * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop
+ * for the synchronization.
+ * rgerhards, 2010-06-08
+ */
+static rsRetVal
+doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg)
+{
+ DEFiRet;
+ time_t now;
+ time_t lastAct;
+
+ if(pMsg->msgFlags & MARK) {
+ now = datetime.GetTime(NULL); /* good time call - the only one done */
+ /* CAS loop, we write back a bit early, but that's OK... */
+ /* we use reception time, not dequeue time - this is considered more appropriate and
+ * also faster ;) -- rgerhards, 2008-09-17 */
+ do {
+ lastAct = pAction->f_time;
+ if((now - lastAct) < MarkInterval / 2) {
+ DBGPRINTF("file was recently written, ignoring mark message\n");
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+ } while(ATOMIC_CAS(&pAction->f_time, lastAct, pMsg->ttGenTime, ADDME) == 0);
+ }
+
+ DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+
+finalize_it:
+ RETiRet;
+}
+
/* This submits the message to the action queue in case we do NOT need to handle repeat
* message processing. That case permits us to gain lots of freedom during processing
* and thus speed.
* rgerhards, 2010-06-08
*/
-static inline rsRetVal
+static rsRetVal
doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
-#if 0 // TODO: we need to care about this -- after PoC 2010-06-08
- /* don't output marks to recently written outputs */
- if(pAction->bWriteAllMarkMsgs == FALSE
- && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
- ABORT_FINALIZE(RS_RET_OK);
- }
-#endif
-
DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
-
-#if 0 // we would need this for bWriteAllMarkMsgs
- /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
- pAction->tLastExec = getActNow(pAction); /* re-init time flags */
- pAction->f_time = pAction->f_pMsg->ttGenTime;
-
-#endif
iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
RETiRet;
@@ -1351,15 +1387,11 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
-/* call the configured action. Does all necessary housekeeping.
- * rgerhards, 2007-08-01
- * FYI: currently, this function is only called from the queue
- * consumer. So we (conceptually) run detached from the input
- * threads (which also means we may run much later than when the
- * message was generated).
+/* Call configured action, most complex case with all features supported (and thus slow).
+ * rgerhards, 2010-06-08
*/
#pragma GCC diagnostic ignored "-Wempty-body"
-rsRetVal
+static rsRetVal
actionCallAction(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
@@ -1367,18 +1399,11 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
ISOBJ_TYPE_assert(pMsg, msg);
ASSERT(pAction != NULL);
- /* We need to lock the mutex only for repeated line processing.
- * rgerhards, 2009-06-19
- */
- if(pAction->bSubmitFirehoseMode == 1) {
- iRet = doSubmitToActionQ(pAction, pMsg);
- } else {
- LockObj(pAction);
- pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
- iRet = doActionCallAction(pAction, pMsg);
- UnlockObj(pAction);
- pthread_cleanup_pop(0); /* remove mutex cleanup handler */
- }
+ LockObj(pAction);
+ pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
+ iRet = doActionCallAction(pAction, pMsg);
+ UnlockObj(pAction);
+ pthread_cleanup_pop(0); /* remove mutex cleanup handler */
RETiRet;
}
diff --git a/action.h b/action.h
index 43e6ae7d..bf9ceafa 100644
--- a/action.h
+++ b/action.h
@@ -47,6 +47,7 @@ typedef enum {
/* the following struct defines the action object data structure
*/
+typedef struct action_s action_t;
struct action_s {
time_t f_time; /* used for "message repeated n times" - be careful, old, old code */
time_t tActNow; /* the current time for an action execution. Initially set to -1 and
@@ -69,10 +70,11 @@ struct action_s {
struct modInfo_s *pMod;/* pointer to output module handling this selector */
void *pModData; /* pointer to module data - content is module-specific */
sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */
- sbool bSubmitFirehoseMode;/* fast submission to action q in phase 1 possible? */
short f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */
int f_prevcount; /* repetition cnt of prevline */
int f_repeatcount; /* number of "repeated" msgs */
+ rsRetVal (*submitToActQ)(action_t *, msg_t *); /* function submit message to action queue */
+ rsRetVal (*qConstruct)(struct queue_s *pThis);
enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING }
eParamPassing; /* mode of parameter passing to action */
int iNumTpls; /* number of array entries for template element below */
@@ -90,7 +92,6 @@ struct action_s {
void *ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */
size_t *lenMsgs; /* length of message in ppMsgs */
};
-typedef struct action_s action_t;
/* function prototypes
@@ -101,7 +102,6 @@ rsRetVal actionDestruct(action_t *pThis);
rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
rsRetVal actionDoAction(action_t *pAction);
-rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg);
rsRetVal actionWriteToAction(action_t *pAction);
rsRetVal actionCallHUPHdlr(action_t *pAction);
rsRetVal actionClassInit(void);
diff --git a/runtime/atomic.h b/runtime/atomic.h
index e5fafe04..da0852fa 100644
--- a/runtime/atomic.h
+++ b/runtime/atomic.h
@@ -50,7 +50,7 @@
# define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0)
# define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1)
# define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val))
-# define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal));
+# define ATOMIC_CAS(data, oldVal, newVal, phlpmut) __sync_bool_compare_and_swap(data, (oldVal), (newVal))
# define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal));
/* functions below are not needed if we have atomics */
diff --git a/runtime/rule.c b/runtime/rule.c
index 65ad071e..7a26a03a 100644
--- a/runtime/rule.c
+++ b/runtime/rule.c
@@ -110,7 +110,7 @@ DEFFUNC_llExecFunc(processMsgDoActions)
ABORT_FINALIZE(RS_RET_OK);
}
- iRetMod = actionCallAction(pAction, pDoActData->pMsg);
+ iRetMod = pAction->submitToActQ(pAction, pDoActData->pMsg);
if(iRetMod == RS_RET_DISCARDMSG) {
ABORT_FINALIZE(RS_RET_DISCARDMSG);
} else if(iRetMod == RS_RET_SUSPENDED) {