From 6cc46b15d953e1cd766f3f9f29011e740e51ca6c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 30 Jan 2008 15:37:23 +0000 Subject: - implemented simple output rate limiting - addded $ActionQueueDequeueSlowdown config directive - addded $MainMsgQueueDequeueSlowdown config directive - bugfix: MsgDup() did not work with new base object data structure --- action.c | 12 ++++++++++-- cfsysline.c | 7 +++++-- msg.c | 5 +++-- queue.c | 11 +++++++++++ queue.h | 4 ++++ syslogd.c | 4 ++++ 6 files changed, 37 insertions(+), 6 deletions(-) diff --git a/action.c b/action.c index 77a22259..bdaae9c7 100644 --- a/action.c +++ b/action.c @@ -66,6 +66,7 @@ static int iActionQtoEnq = 2000; /* timeout for queue enque */ static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ +static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */ /* the counter below counts actions created. It is used to obtain unique IDs for the action. They * should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice @@ -103,6 +104,7 @@ actionResetQueueParams(void) iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ + iActionQueueDeqSlowdown = 0; if(pszActionQFName != NULL) free(pszActionQFName); @@ -179,8 +181,12 @@ actionConstructFinalize(action_t *pThis) snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr); /* create queue */ -RUNLOG_VAR("%d", ActionQueType); - CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, 10, (rsRetVal (*)(void*,void*))actionCallDoAction)); + /* action queues always (for now) have just one worker. This may change when + * we begin to implement an interface the enable output modules to request + * to be run on multiple threads. So far, this is forbidden by the interface + * spec. -- rgerhards, 2008-01-30 + */ + CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); objSetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -207,6 +213,7 @@ RUNLOG_VAR("%d", ActionQueType); setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity); setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs); setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown); + setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown); # undef setQPROP # undef setQPROPstr @@ -642,6 +649,7 @@ actionAddCfSysLineHdrl(void) CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL)); finalize_it: RETiRet; diff --git a/cfsysline.c b/cfsysline.c index c87c2a54..d307569b 100644 --- a/cfsysline.c +++ b/cfsysline.c @@ -129,8 +129,11 @@ static rsRetVal parseIntVal(uchar **pp, size_t *pVal) } /* pull value */ - for(i = 0 ; *p && isdigit((int) *p) ; ++p) - i = i * 10 + *p - '0'; + for(i = 0 ; *p && (isdigit((int) *p) || *p == '.' || *p == ',') ; ++p) { + if(isdigit((int) *p)) { + i = i * 10 + *p - '0'; + } + } if(bWasNegative) i *= -1; diff --git a/msg.c b/msg.c index 11b87b92..a5fc2bc7 100644 --- a/msg.c +++ b/msg.c @@ -328,8 +328,8 @@ msg_t* MsgDup(msg_t* pOld) assert(pOld != NULL); - if((pNew = (msg_t*) calloc(1, sizeof(msg_t))) == NULL) { - glblHadMemShortage = 1; + BEGINfunc + if(msgConstruct(&pNew) != RS_RET_OK) { return NULL; } @@ -364,6 +364,7 @@ msg_t* MsgDup(msg_t* pOld) * if they are needed once again. So we let them re-create if needed. */ + ENDfunc return pNew; } #undef tmpCOPYSZ diff --git a/queue.c b/queue.c index 0fef44e6..d829b9bd 100644 --- a/queue.c +++ b/queue.c @@ -235,6 +235,7 @@ queueStartDA(queue_t *pThis) pThis->pqDA->pqParent = pThis; CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr)); + CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); @@ -1379,6 +1380,15 @@ queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave) CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp)); + /* we now need to check if we should deliberately delay processing a bit + * and, if so, do that. -- rgerhards, 2008-01-30 + */ + if(pThis->iDeqSlowdown) { + dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n", + pThis->iDeqSlowdown); + srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000); + } + finalize_it: dbgoprint((obj_t*) pThis, "regular consumer returns %d\n", iRet); RETiRet; @@ -1985,6 +1995,7 @@ DEFpropSetMeth(queue, bIsDA, int); DEFpropSetMeth(queue, iMinMsgsPerWrkr, int); DEFpropSetMeth(queue, bSaveOnShutdown, int); DEFpropSetMeth(queue, pUsr, void*); +DEFpropSetMeth(queue, iDeqSlowdown, int); /* This function can be used as a generic way to set properties. Only the subset diff --git a/queue.h b/queue.h index 12e4fcb6..090ff4c5 100644 --- a/queue.h +++ b/queue.h @@ -80,6 +80,9 @@ typedef struct queue_s { int toActShutdown; /* timeout for long-running action shutdown in ms */ int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */ int toEnq; /* enqueue timeout */ + /* rate limiting settings (will be expanded */ + int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */ + /* end rate limiting */ rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer @@ -174,6 +177,7 @@ PROTOTYPEpropSetMeth(queue, iDiscardSeverity, int); PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int); PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int); PROTOTYPEpropSetMeth(queue, pUsr, void*); +PROTOTYPEpropSetMeth(queue, iDeqSlowdown, int); #define queueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ diff --git a/syslogd.c b/syslogd.c index 0f0076ed..4aabba5d 100644 --- a/syslogd.c +++ b/syslogd.c @@ -414,6 +414,7 @@ static int iMainMsgQtoActShutdown = 1000; /* action shutdown (in phase 2) */ static int iMainMsgQtoEnq = 2000; /* timeout for queue enque */ static int iMainMsgQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ static int iMainMsgQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ +static int iMainMsgQDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */ static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ @@ -528,6 +529,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a iMainMsgQtoEnq = 2000; iMainMsgQtoWrkShutdown = 60000; iMainMsgQWrkMinMsgs = 100; + iMainMsgQDeqSlowdown = 0; bMainMsgQSaveOnShutdown = 1; MainMsgQueType = QUEUETYPE_FIXED_ARRAY; glbliActionResumeRetryCount = 0; @@ -3227,6 +3229,7 @@ init(void) setQPROP(queueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity); setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs); setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown); + setQPROP(queueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown); # undef setQPROP # undef setQPROPstr @@ -4368,6 +4371,7 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iMainMsgQtoActShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iMainMsgQtoEnq, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutworkerthreadshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoWrkShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iMainMsgQDeqSlowdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL)); -- cgit