diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-10 10:18:59 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2010-06-10 10:18:59 +0200 |
commit | d630bc742f2f0b6a29c745bba743ecb8a03033c6 (patch) | |
tree | 4c8a70641fb757808c2fd0cb5a0c257cb2c0eeb6 /action.c | |
parent | 559cb84a79a9848ce1415569158928478991108c (diff) | |
parent | 8fbcea483710faae468ecf0ba706adc7e60ed41d (diff) | |
download | rsyslog-d630bc742f2f0b6a29c745bba743ecb8a03033c6.tar.gz rsyslog-d630bc742f2f0b6a29c745bba743ecb8a03033c6.tar.xz rsyslog-d630bc742f2f0b6a29c745bba743ecb8a03033c6.zip |
Merge branch 'concurrent-output' into tmp
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 117 |
1 files changed, 65 insertions, 52 deletions
@@ -46,11 +46,15 @@ #include "wti.h" #include "datetime.h" #include "unicode-helper.h" +#include "atomic.h" #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); +static rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg); +static rsRetVal doSubmitToActionQ(action_t *pAction, msg_t *pMsg); +static rsRetVal doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -298,9 +302,13 @@ actionConstructFinalize(action_t *pThis) pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated, pThis->iSecsExecOnceInterval ); - pThis->bSubmitFirehoseMode = 0; + pThis->submitToActQ = actionCallAction; + } else if(pThis->bWriteAllMarkMsgs == FALSE) { + /* nearly full-speed submission mode, default case */ + pThis->submitToActQ = doSubmitToActionQNotAllMark; } else { - pThis->bSubmitFirehoseMode = 1; + /* full firehose submission mode */ + pThis->submitToActQ = doSubmitToActionQ; } /* we need to make a safety check: if the queue is NOT in direct mode, a single @@ -644,6 +652,7 @@ finalize_it: rsRetVal actionDbgPrint(action_t *pThis) { DEFiRet; + char *sz; dbgprintf("%s: ", module.GetStateName(pThis->pMod)); pThis->pMod->dbgPrintInstInfo(pThis->pModData); @@ -656,7 +665,16 @@ rsRetVal actionDbgPrint(action_t *pThis) } dbgprintf("\tState: %s\n", getActStateName(pThis)); dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); - dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode); + if(pThis->submitToActQ == actionCallAction) { + sz = "slow, but feature-rich"; + } else if(pThis->submitToActQ == doSubmitToActionQNotAllMark) { + sz = "fast, but supports partial mark messages"; + } else if(pThis->submitToActQ == doSubmitToActionQ) { + sz = "firehose (fastest)"; + } else { + sz = "unknown (need to update debug display?)"; + } + dbgprintf("\tsubmission mode: %s\n", sz); dbgprintf("\n"); RETiRet; @@ -1122,20 +1140,8 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT } -/* rgerhards 2004-11-09: fprintlog() is the actual driver for - * the output channel. It receives the channel description (f) as - * well as the message and outputs them according to the channel - * semantics. The message is typically already contained in the - * channel save buffer (f->f_prevline). This is not only the case - * when a message was already repeated but also when a new message - * arrived. - * rgerhards 2007-08-01: interface changed to use action_t - * rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE - * CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do - * not do this here. Failing to do so results in all kinds of - * "interesting" problems! - * RGERHARDS, 2008-01-29: - * This is now the action caller and has been renamed. +/* This function builds up a batch of messages to be (later) + * submitted to the action queue. */ rsRetVal actionWriteToAction(action_t *pAction) @@ -1317,33 +1323,51 @@ finalize_it: } +/* This submits the message to the action queue in case where we need to handle + * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop + * for the synchronization. + * rgerhards, 2010-06-08 + */ +static rsRetVal +doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + time_t now; + time_t lastAct; + + if(pMsg->msgFlags & MARK) { + now = datetime.GetTime(NULL); /* good time call - the only one done */ + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if((now - lastAct) < MarkInterval / 2) { + DBGPRINTF("file was recently written, ignoring mark message\n"); + ABORT_FINALIZE(RS_RET_OK); + } + } while(ATOMIC_CAS(&pAction->f_time, lastAct, pMsg->ttGenTime, ADDME) == 0); + } + + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + +finalize_it: + RETiRet; +} + /* This submits the message to the action queue in case we do NOT need to handle repeat * message processing. That case permits us to gain lots of freedom during processing * and thus speed. * rgerhards, 2010-06-08 */ -static inline rsRetVal +static rsRetVal doSubmitToActionQ(action_t *pAction, msg_t *pMsg) { DEFiRet; -#if 0 // TODO: we need to care about this -- after PoC 2010-06-08 - /* don't output marks to recently written outputs */ - if(pAction->bWriteAllMarkMsgs == FALSE - && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { - ABORT_FINALIZE(RS_RET_OK); - } -#endif - DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); - -#if 0 // we would need this for bWriteAllMarkMsgs - /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ - pAction->tLastExec = getActNow(pAction); /* re-init time flags */ - pAction->f_time = pAction->f_pMsg->ttGenTime; - -#endif iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); RETiRet; @@ -1351,15 +1375,11 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) -/* call the configured action. Does all necessary housekeeping. - * rgerhards, 2007-08-01 - * FYI: currently, this function is only called from the queue - * consumer. So we (conceptually) run detached from the input - * threads (which also means we may run much later than when the - * message was generated). +/* Call configured action, most complex case with all features supported (and thus slow). + * rgerhards, 2010-06-08 */ #pragma GCC diagnostic ignored "-Wempty-body" -rsRetVal +static rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg) { DEFiRet; @@ -1367,18 +1387,11 @@ actionCallAction(action_t *pAction, msg_t *pMsg) ISOBJ_TYPE_assert(pMsg, msg); ASSERT(pAction != NULL); - /* We need to lock the mutex only for repeated line processing. - * rgerhards, 2009-06-19 - */ - if(pAction->bSubmitFirehoseMode == 1) { - iRet = doSubmitToActionQ(pAction, pMsg); - } else { - LockObj(pAction); - pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); - iRet = doActionCallAction(pAction, pMsg); - UnlockObj(pAction); - pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - } + LockObj(pAction); + pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); + iRet = doActionCallAction(pAction, pMsg); + UnlockObj(pAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ RETiRet; } |