summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-04-03 13:19:48 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-04-03 13:19:48 +0000
commit46fbfee41e88034135725beb4136d44b94388ede (patch)
treeccb7efd204f881cd47d9ce8e942977614a5d35b3 /queue.c
parent5e279ea0f79250a07948ed6c24731f60e8221543 (diff)
downloadrsyslog-46fbfee41e88034135725beb4136d44b94388ede.tar.gz
rsyslog-46fbfee41e88034135725beb4136d44b94388ede.tar.xz
rsyslog-46fbfee41e88034135725beb4136d44b94388ede.zip
added the capability to specify a processing (actually dequeue) timeframe
with queues - so things can be configured to be done at off-peak hours
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c85
1 files changed, 67 insertions, 18 deletions
diff --git a/queue.c b/queue.c
index 57484e60..7456f4a6 100644
--- a/queue.c
+++ b/queue.c
@@ -1,16 +1,16 @@
/* queue.c
-*
-* This file implements the queue object and its several queueing methods.
-*
-* File begun on 2008-01-03 by RGerhards
-*
-* There is some in-depth documentation available in doc/dev_queue.html
-* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
-* if you are getting aquainted to the object.
-*
-* Copyright 2008 Rainer Gerhards and Adiscon GmbH.
-*
-* This file is part of rsyslog.
+ *
+ * This file implements the queue object and its several queueing methods.
+ *
+ * File begun on 2008-01-03 by RGerhards
+ *
+ * There is some in-depth documentation available in doc/dev_queue.html
+ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
+ * if you are getting aquainted to the object.
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
*
* Rsyslog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -38,6 +38,7 @@
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h> /* required for HP UX */
+#include <time.h>
#include <errno.h>
#include "rsyslog.h"
@@ -272,6 +273,8 @@ queueStartDA(queue_t *pThis)
CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
+ CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
+ CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
if(pThis->toQShutdown == 0) {
@@ -342,7 +345,6 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
- CHKiRet(wtpSetpfRateLimiter (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueRateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA));
@@ -1269,6 +1271,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
pThis->iMaxQueueSize = iMaxQueueSize;
pThis->pConsumer = pConsumer;
pThis->iNumWorkerThreads = iWorkerThreads;
+ pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
@@ -1486,20 +1489,67 @@ finalize_it:
* }
*
* Bottom line: we need to check which type of window we have and need to adjust our
- * logic accordingly. Of course, sleep calculations need to be done up to the minute,
+ * logic accordingly. Of course, sleep calculations need to be done up to the minute,
* but you get the idea from the code above.
*/
static rsRetVal
queueRateLimiter(queue_t *pThis)
{
DEFiRet;
+ int iDelay;
+ int iHrCurr;
+ time_t tCurr;
+ struct tm m;
ISOBJ_TYPE_assert(pThis, queue);
dbgoprint((obj_t*) pThis, "entering rate limiter\n");
- srSleep(2, 0);
-finalize_it:
+ iDelay = 0;
+dbgprintf("deq win from %d to %d\n", pThis->iDeqtWinFromHr, pThis->iDeqtWinToHr);
+ if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */
+ /* time calls are expensive, so only do them when needed */
+ time(&tCurr);
+ localtime_r(&tCurr, &m);
+ iHrCurr = m.tm_hour;
+RUNLOG_VAR("%d", iHrCurr);
+
+ if(pThis->iDeqtWinToHr < pThis->iDeqtWinFromHr) {
+ if(iHrCurr < pThis->iDeqtWinToHr || iHrCurr > pThis->iDeqtWinFromHr) {
+ ; /* do not delay */
+ } else {
+ iDelay = (pThis->iDeqtWinFromHr - iHrCurr) * 3600;
+ /* this time, we are already into the next hour, so we need
+ * to subtract our current minute and seconds.
+ */
+ iDelay -= m.tm_min * 60;
+ iDelay -= m.tm_sec;
+ }
+ } else {
+ if(iHrCurr >= pThis->iDeqtWinFromHr && iHrCurr < pThis->iDeqtWinToHr) {
+ ; /* do not delay */
+ } else {
+ if(iHrCurr < pThis->iDeqtWinFromHr) {
+ iDelay = (pThis->iDeqtWinFromHr - iHrCurr - 1) * 3600; /* -1 as we are already in the hour */
+ iDelay += (60 - m.tm_min) * 60;
+ iDelay += 60 - m.tm_sec;
+ } else {
+ iDelay = (24 - iHrCurr + pThis->iDeqtWinFromHr) * 3600;
+ /* this time, we are already into the next hour, so we need
+ * to subtract our current minute and seconds.
+ */
+ iDelay -= m.tm_min * 60;
+ iDelay -= m.tm_sec;
+ }
+ }
+ }
+ }
+
+ if(iDelay > 0) {
+ dbgoprint((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay);
+ srSleep(iDelay, 0);
+ }
+
dbgoprint((obj_t*) pThis, "rate limiter returns with iRet %d\n", iRet);
RETiRet;
}
@@ -2272,6 +2322,5 @@ BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE)
OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
ENDObjClassInit(queue)
-/*
- * vi:set ai:
+/* vi:set ai:
*/