summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-06-08 15:20:33 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-06-08 15:20:33 +0200
commit11bd517465360278b270ee7c18607b4d1d97e44e (patch)
tree429d9fda28dd132c8b592738d955cd34a56e823e /action.c
parent3e49a1075ab6750135e1a38cf0c213579fa30b4a (diff)
downloadrsyslog-11bd517465360278b270ee7c18607b4d1d97e44e.tar.gz
rsyslog-11bd517465360278b270ee7c18607b4d1d97e44e.tar.xz
rsyslog-11bd517465360278b270ee7c18607b4d1d97e44e.zip
added support for high-performance action queue submission if not all mark messages should be logged
this was previously not properly handeld. This is also the first occurence of a (real) CAS loop inside rsyslog. Note that the performance is now very well in the default configuration, and mark message directives are still correctly being handled. So this code looks close to final, but needs to have some bug cleanup as the testsuite shows.
Diffstat (limited to 'action.c')
-rw-r--r--action.c101
1 files changed, 63 insertions, 38 deletions
diff --git a/action.c b/action.c
index b055ebf4..b8751c63 100644
--- a/action.c
+++ b/action.c
@@ -46,11 +46,15 @@
#include "wti.h"
#include "datetime.h"
#include "unicode-helper.h"
+#include "atomic.h"
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*);
+static rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg);
+static rsRetVal doSubmitToActionQ(action_t *pAction, msg_t *pMsg);
+static rsRetVal doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -298,9 +302,13 @@ actionConstructFinalize(action_t *pThis)
pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated,
pThis->iSecsExecOnceInterval
);
- pThis->bSubmitFirehoseMode = 0;
+ pThis->submitToActQ = actionCallAction;
+ } else if(pThis->bWriteAllMarkMsgs == FALSE) {
+ /* nearly full-speed submission mode, default case */
+ pThis->submitToActQ = doSubmitToActionQNotAllMark;
} else {
- pThis->bSubmitFirehoseMode = 1;
+ /* full firehose submission mode */
+ pThis->submitToActQ = doSubmitToActionQ;
}
/* we need to make a safety check: if the queue is NOT in direct mode, a single
@@ -644,6 +652,7 @@ finalize_it:
rsRetVal actionDbgPrint(action_t *pThis)
{
DEFiRet;
+ char *sz;
dbgprintf("%s: ", module.GetStateName(pThis->pMod));
pThis->pMod->dbgPrintInstInfo(pThis->pModData);
@@ -656,7 +665,16 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
dbgprintf("\tState: %s\n", getActStateName(pThis));
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
- dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode);
+ if(pThis->submitToActQ == actionCallAction) {
+ sz = "slow, but feature-rich";
+ } else if(pThis->submitToActQ == doSubmitToActionQNotAllMark) {
+ sz = "fast, but supports partial mark messages";
+ } else if(pThis->submitToActQ == doSubmitToActionQ) {
+ sz = "firehose (fastest)";
+ } else {
+ sz = "unknown (need to update debug display?)";
+ }
+ dbgprintf("\tsubmission mode: %s\n", sz);
dbgprintf("\n");
RETiRet;
@@ -1317,33 +1335,51 @@ finalize_it:
}
+/* This submits the message to the action queue in case where we need to handle
+ * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop
+ * for the synchronization.
+ * rgerhards, 2010-06-08
+ */
+static rsRetVal
+doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg)
+{
+ DEFiRet;
+ time_t now;
+ time_t lastAct;
+
+ if(pMsg->msgFlags & MARK) {
+ now = datetime.GetTime(NULL); /* good time call - the only one done */
+ /* CAS loop, we write back a bit early, but that's OK... */
+ /* we use reception time, not dequeue time - this is considered more appropriate and
+ * also faster ;) -- rgerhards, 2008-09-17 */
+ do {
+ lastAct = pAction->f_time;
+ if((now - lastAct) < MarkInterval / 2) {
+ DBGPRINTF("file was recently written, ignoring mark message\n");
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+ } while(ATOMIC_CAS(&pAction->f_time, lastAct, pMsg->ttGenTime, ADDME) == 0);
+ }
+
+ DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+
+finalize_it:
+ RETiRet;
+}
+
/* This submits the message to the action queue in case we do NOT need to handle repeat
* message processing. That case permits us to gain lots of freedom during processing
* and thus speed.
* rgerhards, 2010-06-08
*/
-static inline rsRetVal
+static rsRetVal
doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
-#if 0 // TODO: we need to care about this -- after PoC 2010-06-08
- /* don't output marks to recently written outputs */
- if(pAction->bWriteAllMarkMsgs == FALSE
- && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
- ABORT_FINALIZE(RS_RET_OK);
- }
-#endif
-
DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
-
-#if 0 // we would need this for bWriteAllMarkMsgs
- /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
- pAction->tLastExec = getActNow(pAction); /* re-init time flags */
- pAction->f_time = pAction->f_pMsg->ttGenTime;
-
-#endif
iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
RETiRet;
@@ -1351,15 +1387,11 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
-/* call the configured action. Does all necessary housekeeping.
- * rgerhards, 2007-08-01
- * FYI: currently, this function is only called from the queue
- * consumer. So we (conceptually) run detached from the input
- * threads (which also means we may run much later than when the
- * message was generated).
+/* Call configured action, most complex case with all features supported (and thus slow).
+ * rgerhards, 2010-06-08
*/
#pragma GCC diagnostic ignored "-Wempty-body"
-rsRetVal
+static rsRetVal
actionCallAction(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
@@ -1367,18 +1399,11 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
ISOBJ_TYPE_assert(pMsg, msg);
ASSERT(pAction != NULL);
- /* We need to lock the mutex only for repeated line processing.
- * rgerhards, 2009-06-19
- */
- if(pAction->bSubmitFirehoseMode == 1) {
- iRet = doSubmitToActionQ(pAction, pMsg);
- } else {
- LockObj(pAction);
- pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
- iRet = doActionCallAction(pAction, pMsg);
- UnlockObj(pAction);
- pthread_cleanup_pop(0); /* remove mutex cleanup handler */
- }
+ LockObj(pAction);
+ pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
+ iRet = doActionCallAction(pAction, pMsg);
+ UnlockObj(pAction);
+ pthread_cleanup_pop(0); /* remove mutex cleanup handler */
RETiRet;
}