summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-04-03 13:19:48 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-04-03 13:19:48 +0000
commit46fbfee41e88034135725beb4136d44b94388ede (patch)
treeccb7efd204f881cd47d9ce8e942977614a5d35b3
parent5e279ea0f79250a07948ed6c24731f60e8221543 (diff)
downloadrsyslog-46fbfee41e88034135725beb4136d44b94388ede.tar.gz
rsyslog-46fbfee41e88034135725beb4136d44b94388ede.tar.xz
rsyslog-46fbfee41e88034135725beb4136d44b94388ede.zip
added the capability to specify a processing (actually dequeue) timeframe
with queues - so things can be configured to be done at off-peak hours
-rw-r--r--ChangeLog3
-rw-r--r--action.c15
-rw-r--r--doc/features.html6
-rw-r--r--doc/queues.html12
-rw-r--r--doc/rsyslog_ng_comparison.html13
-rw-r--r--queue.c85
-rw-r--r--queue.h2
-rwxr-xr-xstringbuf.h1
-rw-r--r--syslogd.c23
9 files changed, 114 insertions, 46 deletions
diff --git a/ChangeLog b/ChangeLog
index 4648f791..f6291a67 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -5,6 +5,9 @@ Version 3.17.0 (rgerhards), 2008-04-??
- bugfix: memory leaks in script engine
- properties are now case-insensitive everywhere (script, filters,
templates)
+- added the capability to specify a processing (actually dequeue)
+ timeframe with queues - so things can be configured to be done
+ at off-peak hours
---------------------------------------------------------------------------
Version 3.15.1 (rgerhards), 2008-04-??
- disabled atomic operations for the time being because they introduce some
diff --git a/action.c b/action.c
index 99ae8b32..30bf3c92 100644
--- a/action.c
+++ b/action.c
@@ -74,8 +74,10 @@ static int iActionQtoEnq = 2000; /* timeout for queue enque */
static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
-static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
static int64 iActionQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */
+static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
+static int iActionQueueDeqtWinFromHr = 0; /* hour begin of time frame when queue is to be dequeued */
+static int iActionQueueDeqtWinToHr = 25; /* hour begin of time frame when queue is to be dequeued */
/* the counter below counts actions created. It is used to obtain unique IDs for the action. They
* should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice
@@ -113,8 +115,10 @@ actionResetQueueParams(void)
iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
- iActionQueueDeqSlowdown = 0;
iActionQueMaxDiskSpace = 0;
+ iActionQueueDeqSlowdown = 0;
+ iActionQueueDeqtWinFromHr = 0;
+ iActionQueueDeqtWinToHr = 25; /* 25 disables time windowed dequeuing */
glbliActionResumeRetryCount = 0; /* I guess it is smart to reset this one, too */
@@ -237,7 +241,9 @@ actionConstructFinalize(action_t *pThis)
setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
- setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
+ setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
+ setQPROP(queueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr);
+ setQPROP(queueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr);
# undef setQPROP
# undef setQPROPstr
@@ -680,6 +686,8 @@ actionAddCfSysLineHdrl(void)
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL));
finalize_it:
RETiRet;
@@ -800,6 +808,5 @@ finalize_it:
RETiRet;
}
-
/* vi:set ai:
*/
diff --git a/doc/features.html b/doc/features.html
index 9573030e..f74f2aaf 100644
--- a/doc/features.html
+++ b/doc/features.html
@@ -1,7 +1,5 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
-<html><head><title>rsyslog features</title>
-
-</head>
+<html><head><title>rsyslog features</title></head>
<body>
<h1>RSyslog - Features</h1>
<p><b>This page lists both current features as well as
@@ -31,7 +29,7 @@ reliability</li>
<li>support for sending and receiving compressed syslog messages</li>
<li>support for on-demand on-disk spooling of messages that can
not be processed fast enough (a great feature for <a href="rsyslog_high_database_rate.html">writing massive
-amounts of syslog messages to a database</a>)</li>
+amounts of syslog messages to a database</a>)</li><li>support for selectively <a href="http://wiki.rsyslog.com/index.php/OffPeakHours">processing messages only during specific timeframes</a> and spooling them to disk otherwise</li>
<li>ability to monitor text files and convert their contents
into syslog messages (one per line)</li>
<li>ability to configure backup syslog/database servers - if
diff --git a/doc/queues.html b/doc/queues.html
index 80641d8c..a2074d36 100644
--- a/doc/queues.html
+++ b/doc/queues.html
@@ -288,7 +288,17 @@ directive allows to specify how long (in microseconds) dequeueing should be
delayed. While simple, it still is powerful. For example, using a
DequeueSlowdown delay of 1,000 microseconds on a UDP send action ensures that no
more than 1,000 messages can be sent within a second (actually less, as there is
-also some time needed for the processing itself). </p>
+also some time needed for the processing itself).</p><h2>Processing Timeframes</h2><p>Queues
+can be set to dequeue (process) messages only during certain
+timeframes. This is useful if you, for example, would like to transfer
+the bulk of messages only during off-peak hours, e.g. when you have
+only limited bandwidth on the network path the the central server.</p><p>Currently,
+only a single timeframe is supported and, even worse, it can only be
+specified by the hour. It is not hard to extend rsyslog's capabilities
+in this regard - it was just not requested so far. So if you need more
+fine-grained control, let us know and we'll probably implement it.
+There are two configuration directives, both should be used together or
+results are unpredictable:" <i>$&lt;object&gt;QueueDequeueTimeBegin &lt;hour&gt;</i>" and&nbsp;"<i>$&lt;object&gt;QueueDequeueTimeEnd &lt;hour&gt;</i>". The hour parameter must be specified in 24-hour format (so 10pm is 22). A use case for this parameter can be found in the <a href="http://wiki.rsyslog.com/index.php/OffPeakHours">rsyslog wiki</a>. </p>
<h2>Terminating Queues</h2>
<p>Terminating a process sounds easy, but can be complex.
<span style="font-size: 12pt; line-height: 115%; font-family: 'Times New Roman',serif;" lang="EN-US">
diff --git a/doc/rsyslog_ng_comparison.html b/doc/rsyslog_ng_comparison.html
index 852ab4ed..60eeee74 100644
--- a/doc/rsyslog_ng_comparison.html
+++ b/doc/rsyslog_ng_comparison.html
@@ -1,6 +1,6 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html><head>
-<meta content="de" http-equiv="Content-Language"><title>rsyslog vs. syslog-ng - a comparison</title>
+<meta http-equiv="Content-Language" content="de"><title>rsyslog vs. syslog-ng - a comparison</title>
</head>
<body>
@@ -122,7 +122,9 @@ based framing on syslog/tcp connections</td>
<td valign="top">yes</td>
</tr>
<tr>
-<td valign="top">syslog over RELP<br>truly reliable message delivery (<a href="http://rgerhards.blogspot.com/2008/04/on-unreliability-of-plain-tcp-syslog.html">Why is plain tcp syslog not reliable?</a>)</td>
+<td valign="top">syslog over RELP<br>
+truly reliable message delivery (<a href="http://rgerhards.blogspot.com/2008/04/on-unreliability-of-plain-tcp-syslog.html">Why
+is plain tcp syslog not reliable?</a>)</td>
<td valign="top">yes</td>
<td valign="top">no</td>
</tr>
@@ -337,6 +339,13 @@ be placed on different disk</td>
<td valign="top">no</td>
</tr>
<tr>
+<td valign="top">ability to process spooled
+messages only during a configured timeframe (e.g. process messages only
+during off-peak hours, during peak hours they are enqueued only)</td>
+<td valign="top"><a href="http://wiki.rsyslog.com/index.php/OffPeakHours">yes</a><br>(can independently be configured for the main queue and each action queue)</td>
+<td valign="top">no</td>
+</tr>
+<tr>
<td valign="top">ability to configure backup
syslog/database servers </td>
<td valign="top">yes</td>
diff --git a/queue.c b/queue.c
index 57484e60..7456f4a6 100644
--- a/queue.c
+++ b/queue.c
@@ -1,16 +1,16 @@
/* queue.c
-*
-* This file implements the queue object and its several queueing methods.
-*
-* File begun on 2008-01-03 by RGerhards
-*
-* There is some in-depth documentation available in doc/dev_queue.html
-* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
-* if you are getting aquainted to the object.
-*
-* Copyright 2008 Rainer Gerhards and Adiscon GmbH.
-*
-* This file is part of rsyslog.
+ *
+ * This file implements the queue object and its several queueing methods.
+ *
+ * File begun on 2008-01-03 by RGerhards
+ *
+ * There is some in-depth documentation available in doc/dev_queue.html
+ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
+ * if you are getting aquainted to the object.
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
*
* Rsyslog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -38,6 +38,7 @@
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h> /* required for HP UX */
+#include <time.h>
#include <errno.h>
#include "rsyslog.h"
@@ -272,6 +273,8 @@ queueStartDA(queue_t *pThis)
CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
+ CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
+ CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
if(pThis->toQShutdown == 0) {
@@ -342,7 +345,6 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
- CHKiRet(wtpSetpfRateLimiter (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueRateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA));
@@ -1269,6 +1271,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
pThis->iMaxQueueSize = iMaxQueueSize;
pThis->pConsumer = pConsumer;
pThis->iNumWorkerThreads = iWorkerThreads;
+ pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
@@ -1486,20 +1489,67 @@ finalize_it:
* }
*
* Bottom line: we need to check which type of window we have and need to adjust our
- * logic accordingly. Of course, sleep calculations need to be done up to the minute,
+ * logic accordingly. Of course, sleep calculations need to be done up to the minute,
* but you get the idea from the code above.
*/
static rsRetVal
queueRateLimiter(queue_t *pThis)
{
DEFiRet;
+ int iDelay;
+ int iHrCurr;
+ time_t tCurr;
+ struct tm m;
ISOBJ_TYPE_assert(pThis, queue);
dbgoprint((obj_t*) pThis, "entering rate limiter\n");
- srSleep(2, 0);
-finalize_it:
+ iDelay = 0;
+dbgprintf("deq win from %d to %d\n", pThis->iDeqtWinFromHr, pThis->iDeqtWinToHr);
+ if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */
+ /* time calls are expensive, so only do them when needed */
+ time(&tCurr);
+ localtime_r(&tCurr, &m);
+ iHrCurr = m.tm_hour;
+RUNLOG_VAR("%d", iHrCurr);
+
+ if(pThis->iDeqtWinToHr < pThis->iDeqtWinFromHr) {
+ if(iHrCurr < pThis->iDeqtWinToHr || iHrCurr > pThis->iDeqtWinFromHr) {
+ ; /* do not delay */
+ } else {
+ iDelay = (pThis->iDeqtWinFromHr - iHrCurr) * 3600;
+ /* this time, we are already into the next hour, so we need
+ * to subtract our current minute and seconds.
+ */
+ iDelay -= m.tm_min * 60;
+ iDelay -= m.tm_sec;
+ }
+ } else {
+ if(iHrCurr >= pThis->iDeqtWinFromHr && iHrCurr < pThis->iDeqtWinToHr) {
+ ; /* do not delay */
+ } else {
+ if(iHrCurr < pThis->iDeqtWinFromHr) {
+ iDelay = (pThis->iDeqtWinFromHr - iHrCurr - 1) * 3600; /* -1 as we are already in the hour */
+ iDelay += (60 - m.tm_min) * 60;
+ iDelay += 60 - m.tm_sec;
+ } else {
+ iDelay = (24 - iHrCurr + pThis->iDeqtWinFromHr) * 3600;
+ /* this time, we are already into the next hour, so we need
+ * to subtract our current minute and seconds.
+ */
+ iDelay -= m.tm_min * 60;
+ iDelay -= m.tm_sec;
+ }
+ }
+ }
+ }
+
+ if(iDelay > 0) {
+ dbgoprint((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay);
+ srSleep(iDelay, 0);
+ }
+
dbgoprint((obj_t*) pThis, "rate limiter returns with iRet %d\n", iRet);
RETiRet;
}
@@ -2272,6 +2322,5 @@ BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE)
OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
ENDObjClassInit(queue)
-/*
- * vi:set ai:
+/* vi:set ai:
*/
diff --git a/queue.h b/queue.h
index ecac6ee4..7dfeb226 100644
--- a/queue.h
+++ b/queue.h
@@ -87,7 +87,7 @@ typedef struct queue_s {
/* end rate limiting */
/* dequeue time window settings (may also be expanded) */
int iDeqtWinFromHr; /* begin of dequeue time window (hour only) */
- int iDeqtWinToHr; /* end of dequeue time window (hour only) */
+ int iDeqtWinToHr; /* end of dequeue time window (hour only), set to 25 to disable deq window! */
/* note that begin and end have specific semantics. It is a big difference if we have
* begin 4, end 22 or begin 22, end 4. In the later case, dequeuing will run from 10p,
* throughout the night and stop at 4 in the morning. In the first case, it will start
diff --git a/stringbuf.h b/stringbuf.h
index aa31884e..3475b8f6 100755
--- a/stringbuf.h
+++ b/stringbuf.h
@@ -121,6 +121,7 @@ void rsCStrSetAllocIncrement(cstr_t *pThis, int iNewIncrement);
rsRetVal rsCStrAppendInt(cstr_t *pThis, long i);
+rsRetVal strExit(void); /* TODO: remove once we have a real object interface! */
uchar* rsCStrGetSzStr(cstr_t *pThis);
uchar* rsCStrGetSzStrNoNULL(cstr_t *pThis);
rsRetVal rsCStrSetSzStr(cstr_t *pThis, uchar *pszNew);
diff --git a/syslogd.c b/syslogd.c
index b5554e5f..e6dae301 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -338,8 +338,10 @@ static int iMainMsgQtoEnq = 2000; /* timeout for queue enque */
static int iMainMsgQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
static int iMainMsgQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
static int iMainMsgQDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
-static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
static int64 iMainMsgQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */
+static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+static int iMainMsgQueueDeqtWinFromHr = 0; /* hour begin of time frame when queue is to be dequeued */
+static int iMainMsgQueueDeqtWinToHr = 25; /* hour begin of time frame when queue is to be dequeued */
/* support for simple textual representation of FIOP names
@@ -2323,6 +2325,8 @@ init(void)
setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs);
setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown);
setQPROP(queueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown);
+ setQPROP(queueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", iMainMsgQueueDeqtWinFromHr);
+ setQPROP(queueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", iMainMsgQueueDeqtWinToHr);
# undef setQPROP
# undef setQPROPstr
@@ -2687,6 +2691,8 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxDiskSpace, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iMainMsgQueueDeqtWinFromHr, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iMainMsgQueueDeqtWinToHr, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeinterval", 0, eCmdHdlrInt, setActionResumeInterval, NULL, NULL));
@@ -2766,21 +2772,6 @@ static void mainThread()
BEGINfunc
uchar *pTmp;
-#if 0 // code moved back to main()
- /* doing some core initializations */
- if((iRet = modInitIminternal()) != RS_RET_OK) {
- fprintf(stderr, "fatal error: could not initialize errbuf object (error code %d).\n",
- iRet);
- exit(1); /* "good" exit, leaving at init for fatal error */
- }
-
- if((iRet = loadBuildInModules()) != RS_RET_OK) {
- fprintf(stderr, "fatal error: could not activate built-in modules. Error code %d.\n",
- iRet);
- exit(1); /* "good" exit, leaving at init for fatal error */
- }
-#endif
-
/* Note: signals MUST be processed by the thread this code is running in. The reason
* is that we need to interrupt the select() system call. -- rgerhards, 2007-10-17
*/