From 6cc46b15d953e1cd766f3f9f29011e740e51ca6c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 30 Jan 2008 15:37:23 +0000 Subject: - implemented simple output rate limiting - addded $ActionQueueDequeueSlowdown config directive - addded $MainMsgQueueDequeueSlowdown config directive - bugfix: MsgDup() did not work with new base object data structure --- action.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 77a22259..bdaae9c7 100644 --- a/action.c +++ b/action.c @@ -66,6 +66,7 @@ static int iActionQtoEnq = 2000; /* timeout for queue enque */ static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ +static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */ /* the counter below counts actions created. It is used to obtain unique IDs for the action. They * should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice @@ -103,6 +104,7 @@ actionResetQueueParams(void) iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ + iActionQueueDeqSlowdown = 0; if(pszActionQFName != NULL) free(pszActionQFName); @@ -179,8 +181,12 @@ actionConstructFinalize(action_t *pThis) snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr); /* create queue */ -RUNLOG_VAR("%d", ActionQueType); - CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, 10, (rsRetVal (*)(void*,void*))actionCallDoAction)); + /* action queues always (for now) have just one worker. This may change when + * we begin to implement an interface the enable output modules to request + * to be run on multiple threads. So far, this is forbidden by the interface + * spec. -- rgerhards, 2008-01-30 + */ + CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); objSetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -207,6 +213,7 @@ RUNLOG_VAR("%d", ActionQueType); setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity); setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs); setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown); + setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown); # undef setQPROP # undef setQPROPstr @@ -642,6 +649,7 @@ actionAddCfSysLineHdrl(void) CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL)); finalize_it: RETiRet; -- cgit