diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-04-03 13:19:48 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-04-03 13:19:48 +0000 |
commit | 46fbfee41e88034135725beb4136d44b94388ede (patch) | |
tree | ccb7efd204f881cd47d9ce8e942977614a5d35b3 | |
parent | 5e279ea0f79250a07948ed6c24731f60e8221543 (diff) | |
download | rsyslog-46fbfee41e88034135725beb4136d44b94388ede.tar.gz rsyslog-46fbfee41e88034135725beb4136d44b94388ede.tar.xz rsyslog-46fbfee41e88034135725beb4136d44b94388ede.zip |
added the capability to specify a processing (actually dequeue) timeframe
with queues - so things can be configured to be done at off-peak hours
-rw-r--r-- | ChangeLog | 3 | ||||
-rw-r--r-- | action.c | 15 | ||||
-rw-r--r-- | doc/features.html | 6 | ||||
-rw-r--r-- | doc/queues.html | 12 | ||||
-rw-r--r-- | doc/rsyslog_ng_comparison.html | 13 | ||||
-rw-r--r-- | queue.c | 85 | ||||
-rw-r--r-- | queue.h | 2 | ||||
-rwxr-xr-x | stringbuf.h | 1 | ||||
-rw-r--r-- | syslogd.c | 23 |
9 files changed, 114 insertions, 46 deletions
@@ -5,6 +5,9 @@ Version 3.17.0 (rgerhards), 2008-04-?? - bugfix: memory leaks in script engine - properties are now case-insensitive everywhere (script, filters, templates) +- added the capability to specify a processing (actually dequeue) + timeframe with queues - so things can be configured to be done + at off-peak hours --------------------------------------------------------------------------- Version 3.15.1 (rgerhards), 2008-04-?? - disabled atomic operations for the time being because they introduce some @@ -74,8 +74,10 @@ static int iActionQtoEnq = 2000; /* timeout for queue enque */ static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ -static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */ static int64 iActionQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */ +static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */ +static int iActionQueueDeqtWinFromHr = 0; /* hour begin of time frame when queue is to be dequeued */ +static int iActionQueueDeqtWinToHr = 25; /* hour begin of time frame when queue is to be dequeued */ /* the counter below counts actions created. It is used to obtain unique IDs for the action. They * should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice @@ -113,8 +115,10 @@ actionResetQueueParams(void) iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ - iActionQueueDeqSlowdown = 0; iActionQueMaxDiskSpace = 0; + iActionQueueDeqSlowdown = 0; + iActionQueueDeqtWinFromHr = 0; + iActionQueueDeqtWinToHr = 25; /* 25 disables time windowed dequeuing */ glbliActionResumeRetryCount = 0; /* I guess it is smart to reset this one, too */ @@ -237,7 +241,9 @@ actionConstructFinalize(action_t *pThis) setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity); setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs); setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown); - setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown); + setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown); + setQPROP(queueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr); + setQPROP(queueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr); # undef setQPROP # undef setQPROPstr @@ -680,6 +686,8 @@ actionAddCfSysLineHdrl(void) CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL)); finalize_it: RETiRet; @@ -800,6 +808,5 @@ finalize_it: RETiRet; } - /* vi:set ai: */ diff --git a/doc/features.html b/doc/features.html index 9573030e..f74f2aaf 100644 --- a/doc/features.html +++ b/doc/features.html @@ -1,7 +1,5 @@ <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> -<html><head><title>rsyslog features</title> - -</head> +<html><head><title>rsyslog features</title></head> <body> <h1>RSyslog - Features</h1> <p><b>This page lists both current features as well as @@ -31,7 +29,7 @@ reliability</li> <li>support for sending and receiving compressed syslog messages</li> <li>support for on-demand on-disk spooling of messages that can not be processed fast enough (a great feature for <a href="rsyslog_high_database_rate.html">writing massive -amounts of syslog messages to a database</a>)</li> +amounts of syslog messages to a database</a>)</li><li>support for selectively <a href="http://wiki.rsyslog.com/index.php/OffPeakHours">processing messages only during specific timeframes</a> and spooling them to disk otherwise</li> <li>ability to monitor text files and convert their contents into syslog messages (one per line)</li> <li>ability to configure backup syslog/database servers - if diff --git a/doc/queues.html b/doc/queues.html index 80641d8c..a2074d36 100644 --- a/doc/queues.html +++ b/doc/queues.html @@ -288,7 +288,17 @@ directive allows to specify how long (in microseconds) dequeueing should be delayed. While simple, it still is powerful. For example, using a DequeueSlowdown delay of 1,000 microseconds on a UDP send action ensures that no more than 1,000 messages can be sent within a second (actually less, as there is -also some time needed for the processing itself). </p> +also some time needed for the processing itself).</p><h2>Processing Timeframes</h2><p>Queues +can be set to dequeue (process) messages only during certain +timeframes. This is useful if you, for example, would like to transfer +the bulk of messages only during off-peak hours, e.g. when you have +only limited bandwidth on the network path the the central server.</p><p>Currently, +only a single timeframe is supported and, even worse, it can only be +specified by the hour. It is not hard to extend rsyslog's capabilities +in this regard - it was just not requested so far. So if you need more +fine-grained control, let us know and we'll probably implement it. +There are two configuration directives, both should be used together or +results are unpredictable:" <i>$<object>QueueDequeueTimeBegin <hour></i>" and "<i>$<object>QueueDequeueTimeEnd <hour></i>". The hour parameter must be specified in 24-hour format (so 10pm is 22). A use case for this parameter can be found in the <a href="http://wiki.rsyslog.com/index.php/OffPeakHours">rsyslog wiki</a>. </p> <h2>Terminating Queues</h2> <p>Terminating a process sounds easy, but can be complex. <span style="font-size: 12pt; line-height: 115%; font-family: 'Times New Roman',serif;" lang="EN-US"> diff --git a/doc/rsyslog_ng_comparison.html b/doc/rsyslog_ng_comparison.html index 852ab4ed..60eeee74 100644 --- a/doc/rsyslog_ng_comparison.html +++ b/doc/rsyslog_ng_comparison.html @@ -1,6 +1,6 @@ <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> <html><head> -<meta content="de" http-equiv="Content-Language"><title>rsyslog vs. syslog-ng - a comparison</title> +<meta http-equiv="Content-Language" content="de"><title>rsyslog vs. syslog-ng - a comparison</title> </head> <body> @@ -122,7 +122,9 @@ based framing on syslog/tcp connections</td> <td valign="top">yes</td> </tr> <tr> -<td valign="top">syslog over RELP<br>truly reliable message delivery (<a href="http://rgerhards.blogspot.com/2008/04/on-unreliability-of-plain-tcp-syslog.html">Why is plain tcp syslog not reliable?</a>)</td> +<td valign="top">syslog over RELP<br> +truly reliable message delivery (<a href="http://rgerhards.blogspot.com/2008/04/on-unreliability-of-plain-tcp-syslog.html">Why +is plain tcp syslog not reliable?</a>)</td> <td valign="top">yes</td> <td valign="top">no</td> </tr> @@ -337,6 +339,13 @@ be placed on different disk</td> <td valign="top">no</td> </tr> <tr> +<td valign="top">ability to process spooled +messages only during a configured timeframe (e.g. process messages only +during off-peak hours, during peak hours they are enqueued only)</td> +<td valign="top"><a href="http://wiki.rsyslog.com/index.php/OffPeakHours">yes</a><br>(can independently be configured for the main queue and each action queue)</td> +<td valign="top">no</td> +</tr> +<tr> <td valign="top">ability to configure backup syslog/database servers </td> <td valign="top">yes</td> @@ -1,16 +1,16 @@ /* queue.c -* -* This file implements the queue object and its several queueing methods. -* -* File begun on 2008-01-03 by RGerhards -* -* There is some in-depth documentation available in doc/dev_queue.html -* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it -* if you are getting aquainted to the object. -* -* Copyright 2008 Rainer Gerhards and Adiscon GmbH. -* -* This file is part of rsyslog. + * + * This file implements the queue object and its several queueing methods. + * + * File begun on 2008-01-03 by RGerhards + * + * There is some in-depth documentation available in doc/dev_queue.html + * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it + * if you are getting aquainted to the object. + * + * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. * * Rsyslog is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -38,6 +38,7 @@ #include <fcntl.h> #include <unistd.h> #include <sys/stat.h> /* required for HP UX */ +#include <time.h> #include <errno.h> #include "rsyslog.h" @@ -272,6 +273,8 @@ queueStartDA(queue_t *pThis) CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); + CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); + CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); if(pThis->toQShutdown == 0) { @@ -342,7 +345,6 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); - CHKiRet(wtpSetpfRateLimiter (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueRateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA)); @@ -1269,6 +1271,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, pThis->iMaxQueueSize = iMaxQueueSize; pThis->pConsumer = pConsumer; pThis->iNumWorkerThreads = iWorkerThreads; + pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ pThis->pszFilePrefix = NULL; pThis->qType = qType; @@ -1486,20 +1489,67 @@ finalize_it: * } * * Bottom line: we need to check which type of window we have and need to adjust our - * logic accordingly. Of course, sleep calculations need to be done up to the minute, + * logic accordingly. Of course, sleep calculations need to be done up to the minute, * but you get the idea from the code above. */ static rsRetVal queueRateLimiter(queue_t *pThis) { DEFiRet; + int iDelay; + int iHrCurr; + time_t tCurr; + struct tm m; ISOBJ_TYPE_assert(pThis, queue); dbgoprint((obj_t*) pThis, "entering rate limiter\n"); - srSleep(2, 0); -finalize_it: + iDelay = 0; +dbgprintf("deq win from %d to %d\n", pThis->iDeqtWinFromHr, pThis->iDeqtWinToHr); + if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */ + /* time calls are expensive, so only do them when needed */ + time(&tCurr); + localtime_r(&tCurr, &m); + iHrCurr = m.tm_hour; +RUNLOG_VAR("%d", iHrCurr); + + if(pThis->iDeqtWinToHr < pThis->iDeqtWinFromHr) { + if(iHrCurr < pThis->iDeqtWinToHr || iHrCurr > pThis->iDeqtWinFromHr) { + ; /* do not delay */ + } else { + iDelay = (pThis->iDeqtWinFromHr - iHrCurr) * 3600; + /* this time, we are already into the next hour, so we need + * to subtract our current minute and seconds. + */ + iDelay -= m.tm_min * 60; + iDelay -= m.tm_sec; + } + } else { + if(iHrCurr >= pThis->iDeqtWinFromHr && iHrCurr < pThis->iDeqtWinToHr) { + ; /* do not delay */ + } else { + if(iHrCurr < pThis->iDeqtWinFromHr) { + iDelay = (pThis->iDeqtWinFromHr - iHrCurr - 1) * 3600; /* -1 as we are already in the hour */ + iDelay += (60 - m.tm_min) * 60; + iDelay += 60 - m.tm_sec; + } else { + iDelay = (24 - iHrCurr + pThis->iDeqtWinFromHr) * 3600; + /* this time, we are already into the next hour, so we need + * to subtract our current minute and seconds. + */ + iDelay -= m.tm_min * 60; + iDelay -= m.tm_sec; + } + } + } + } + + if(iDelay > 0) { + dbgoprint((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay); + srSleep(iDelay, 0); + } + dbgoprint((obj_t*) pThis, "rate limiter returns with iRet %d\n", iRet); RETiRet; } @@ -2272,6 +2322,5 @@ BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE) OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty); ENDObjClassInit(queue) -/* - * vi:set ai: +/* vi:set ai: */ @@ -87,7 +87,7 @@ typedef struct queue_s { /* end rate limiting */ /* dequeue time window settings (may also be expanded) */ int iDeqtWinFromHr; /* begin of dequeue time window (hour only) */ - int iDeqtWinToHr; /* end of dequeue time window (hour only) */ + int iDeqtWinToHr; /* end of dequeue time window (hour only), set to 25 to disable deq window! */ /* note that begin and end have specific semantics. It is a big difference if we have * begin 4, end 22 or begin 22, end 4. In the later case, dequeuing will run from 10p, * throughout the night and stop at 4 in the morning. In the first case, it will start diff --git a/stringbuf.h b/stringbuf.h index aa31884e..3475b8f6 100755 --- a/stringbuf.h +++ b/stringbuf.h @@ -121,6 +121,7 @@ void rsCStrSetAllocIncrement(cstr_t *pThis, int iNewIncrement); rsRetVal rsCStrAppendInt(cstr_t *pThis, long i); +rsRetVal strExit(void); /* TODO: remove once we have a real object interface! */ uchar* rsCStrGetSzStr(cstr_t *pThis); uchar* rsCStrGetSzStrNoNULL(cstr_t *pThis); rsRetVal rsCStrSetSzStr(cstr_t *pThis, uchar *pszNew); @@ -338,8 +338,10 @@ static int iMainMsgQtoEnq = 2000; /* timeout for queue enque */ static int iMainMsgQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ static int iMainMsgQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ static int iMainMsgQDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */ -static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ static int64 iMainMsgQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */ +static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ +static int iMainMsgQueueDeqtWinFromHr = 0; /* hour begin of time frame when queue is to be dequeued */ +static int iMainMsgQueueDeqtWinToHr = 25; /* hour begin of time frame when queue is to be dequeued */ /* support for simple textual representation of FIOP names @@ -2323,6 +2325,8 @@ init(void) setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs); setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown); setQPROP(queueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown); + setQPROP(queueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", iMainMsgQueueDeqtWinFromHr); + setQPROP(queueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", iMainMsgQueueDeqtWinToHr); # undef setQPROP # undef setQPROPstr @@ -2687,6 +2691,8 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxDiskSpace, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iMainMsgQueueDeqtWinFromHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iMainMsgQueueDeqtWinToHr, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeinterval", 0, eCmdHdlrInt, setActionResumeInterval, NULL, NULL)); @@ -2766,21 +2772,6 @@ static void mainThread() BEGINfunc uchar *pTmp; -#if 0 // code moved back to main() - /* doing some core initializations */ - if((iRet = modInitIminternal()) != RS_RET_OK) { - fprintf(stderr, "fatal error: could not initialize errbuf object (error code %d).\n", - iRet); - exit(1); /* "good" exit, leaving at init for fatal error */ - } - - if((iRet = loadBuildInModules()) != RS_RET_OK) { - fprintf(stderr, "fatal error: could not activate built-in modules. Error code %d.\n", - iRet); - exit(1); /* "good" exit, leaving at init for fatal error */ - } -#endif - /* Note: signals MUST be processed by the thread this code is running in. The reason * is that we need to interrupt the select() system call. -- rgerhards, 2007-10-17 */ |