summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-30 15:37:23 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-30 15:37:23 +0000
commit6cc46b15d953e1cd766f3f9f29011e740e51ca6c (patch)
treed97914e6db3c1decf888cd4a00fcb370f34e4801
parentf6f4bcb0fdb30646927724b2a86149e5b7d36e8b (diff)
downloadrsyslog-6cc46b15d953e1cd766f3f9f29011e740e51ca6c.tar.gz
rsyslog-6cc46b15d953e1cd766f3f9f29011e740e51ca6c.tar.xz
rsyslog-6cc46b15d953e1cd766f3f9f29011e740e51ca6c.zip
- implemented simple output rate limiting
- addded $ActionQueueDequeueSlowdown config directive - addded $MainMsgQueueDequeueSlowdown config directive - bugfix: MsgDup() did not work with new base object data structure
-rw-r--r--action.c12
-rw-r--r--cfsysline.c7
-rw-r--r--msg.c5
-rw-r--r--queue.c11
-rw-r--r--queue.h4
-rw-r--r--syslogd.c4
6 files changed, 37 insertions, 6 deletions
diff --git a/action.c b/action.c
index 77a22259..bdaae9c7 100644
--- a/action.c
+++ b/action.c
@@ -66,6 +66,7 @@ static int iActionQtoEnq = 2000; /* timeout for queue enque */
static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
/* the counter below counts actions created. It is used to obtain unique IDs for the action. They
* should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice
@@ -103,6 +104,7 @@ actionResetQueueParams(void)
iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+ iActionQueueDeqSlowdown = 0;
if(pszActionQFName != NULL)
free(pszActionQFName);
@@ -179,8 +181,12 @@ actionConstructFinalize(action_t *pThis)
snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr);
/* create queue */
-RUNLOG_VAR("%d", ActionQueType);
- CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, 10, (rsRetVal (*)(void*,void*))actionCallDoAction));
+ /* action queues always (for now) have just one worker. This may change when
+ * we begin to implement an interface the enable output modules to request
+ * to be run on multiple threads. So far, this is forbidden by the interface
+ * spec. -- rgerhards, 2008-01-30
+ */
+ CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
objSetName((obj_t*) pThis->pQueue, pszQName);
/* ... set some properties ... */
@@ -207,6 +213,7 @@ RUNLOG_VAR("%d", ActionQueType);
setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
+ setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
# undef setQPROP
# undef setQPROPstr
@@ -642,6 +649,7 @@ actionAddCfSysLineHdrl(void)
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL));
finalize_it:
RETiRet;
diff --git a/cfsysline.c b/cfsysline.c
index c87c2a54..d307569b 100644
--- a/cfsysline.c
+++ b/cfsysline.c
@@ -129,8 +129,11 @@ static rsRetVal parseIntVal(uchar **pp, size_t *pVal)
}
/* pull value */
- for(i = 0 ; *p && isdigit((int) *p) ; ++p)
- i = i * 10 + *p - '0';
+ for(i = 0 ; *p && (isdigit((int) *p) || *p == '.' || *p == ',') ; ++p) {
+ if(isdigit((int) *p)) {
+ i = i * 10 + *p - '0';
+ }
+ }
if(bWasNegative)
i *= -1;
diff --git a/msg.c b/msg.c
index 11b87b92..a5fc2bc7 100644
--- a/msg.c
+++ b/msg.c
@@ -328,8 +328,8 @@ msg_t* MsgDup(msg_t* pOld)
assert(pOld != NULL);
- if((pNew = (msg_t*) calloc(1, sizeof(msg_t))) == NULL) {
- glblHadMemShortage = 1;
+ BEGINfunc
+ if(msgConstruct(&pNew) != RS_RET_OK) {
return NULL;
}
@@ -364,6 +364,7 @@ msg_t* MsgDup(msg_t* pOld)
* if they are needed once again. So we let them re-create if needed.
*/
+ ENDfunc
return pNew;
}
#undef tmpCOPYSZ
diff --git a/queue.c b/queue.c
index 0fef44e6..d829b9bd 100644
--- a/queue.c
+++ b/queue.c
@@ -235,6 +235,7 @@ queueStartDA(queue_t *pThis)
pThis->pqDA->pqParent = pThis;
CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr));
+ CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
@@ -1379,6 +1380,15 @@ queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
+ /* we now need to check if we should deliberately delay processing a bit
+ * and, if so, do that. -- rgerhards, 2008-01-30
+ */
+ if(pThis->iDeqSlowdown) {
+ dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n",
+ pThis->iDeqSlowdown);
+ srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000);
+ }
+
finalize_it:
dbgoprint((obj_t*) pThis, "regular consumer returns %d\n", iRet);
RETiRet;
@@ -1985,6 +1995,7 @@ DEFpropSetMeth(queue, bIsDA, int);
DEFpropSetMeth(queue, iMinMsgsPerWrkr, int);
DEFpropSetMeth(queue, bSaveOnShutdown, int);
DEFpropSetMeth(queue, pUsr, void*);
+DEFpropSetMeth(queue, iDeqSlowdown, int);
/* This function can be used as a generic way to set properties. Only the subset
diff --git a/queue.h b/queue.h
index 12e4fcb6..090ff4c5 100644
--- a/queue.h
+++ b/queue.h
@@ -80,6 +80,9 @@ typedef struct queue_s {
int toActShutdown; /* timeout for long-running action shutdown in ms */
int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
int toEnq; /* enqueue timeout */
+ /* rate limiting settings (will be expanded */
+ int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */
+ /* end rate limiting */
rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
* user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer
@@ -174,6 +177,7 @@ PROTOTYPEpropSetMeth(queue, iDiscardSeverity, int);
PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int);
PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int);
PROTOTYPEpropSetMeth(queue, pUsr, void*);
+PROTOTYPEpropSetMeth(queue, iDeqSlowdown, int);
#define queueGetID(pThis) ((unsigned long) pThis)
#endif /* #ifndef QUEUE_H_INCLUDED */
diff --git a/syslogd.c b/syslogd.c
index 0f0076ed..4aabba5d 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -414,6 +414,7 @@ static int iMainMsgQtoActShutdown = 1000; /* action shutdown (in phase 2) */
static int iMainMsgQtoEnq = 2000; /* timeout for queue enque */
static int iMainMsgQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
static int iMainMsgQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
+static int iMainMsgQDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
@@ -528,6 +529,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iMainMsgQtoEnq = 2000;
iMainMsgQtoWrkShutdown = 60000;
iMainMsgQWrkMinMsgs = 100;
+ iMainMsgQDeqSlowdown = 0;
bMainMsgQSaveOnShutdown = 1;
MainMsgQueType = QUEUETYPE_FIXED_ARRAY;
glbliActionResumeRetryCount = 0;
@@ -3227,6 +3229,7 @@ init(void)
setQPROP(queueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity);
setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs);
setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown);
+ setQPROP(queueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown);
# undef setQPROP
# undef setQPROPstr
@@ -4368,6 +4371,7 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iMainMsgQtoActShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iMainMsgQtoEnq, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutworkerthreadshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoWrkShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iMainMsgQDeqSlowdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL));