summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-04-02 16:53:29 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-04-02 16:53:29 +0000
commit9b48c4a481c64503605f25e1d0648d24f43437f1 (patch)
tree017325a20b756a2047cb3eb7d9aacdf19a52d964
parent38f0cd67626ce56b0014b05b513e2e573da25e6f (diff)
downloadrsyslog-9b48c4a481c64503605f25e1d0648d24f43437f1.tar.gz
rsyslog-9b48c4a481c64503605f25e1d0648d24f43437f1.tar.xz
rsyslog-9b48c4a481c64503605f25e1d0648d24f43437f1.zip
begun working on time-window based dequeueing (and rate limiting in
general)
-rw-r--r--doc/rsyslog_ng_comparison.html2
-rw-r--r--queue.c57
-rw-r--r--queue.h13
-rw-r--r--wti.c8
-rw-r--r--wtp.c1
-rw-r--r--wtp.h1
6 files changed, 80 insertions, 2 deletions
diff --git a/doc/rsyslog_ng_comparison.html b/doc/rsyslog_ng_comparison.html
index 2a1d15bd..852ab4ed 100644
--- a/doc/rsyslog_ng_comparison.html
+++ b/doc/rsyslog_ng_comparison.html
@@ -122,7 +122,7 @@ based framing on syslog/tcp connections</td>
<td valign="top">yes</td>
</tr>
<tr>
-<td valign="top">syslog over RELP<br>this is a truely reliable solution (plain tcp syslog can lose messages!)</td>
+<td valign="top">syslog over RELP<br>truly reliable message delivery (<a href="http://rgerhards.blogspot.com/2008/04/on-unreliability-of-plain-tcp-syslog.html">Why is plain tcp syslog not reliable?</a>)</td>
<td valign="top">yes</td>
<td valign="top">no</td>
</tr>
diff --git a/queue.c b/queue.c
index ed720c55..bfdac204 100644
--- a/queue.c
+++ b/queue.c
@@ -55,6 +55,7 @@ DEFobjStaticHelpers
/* forward-definitions */
rsRetVal queueChkPersist(queue_t *pThis);
static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex);
+static rsRetVal queueRateLimiter(queue_t *pThis);
static int queueChkStopWrkrDA(queue_t *pThis);
static int queueIsIdleDA(queue_t *pThis);
static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
@@ -341,6 +342,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
+ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueRateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA));
@@ -1450,6 +1452,60 @@ finalize_it:
}
+/* The rate limiter - we only need one - do we?
+ *
+* Here we may wait if a dequeue time window is defined or if we are
+ * rate-limited. TODO: If we do so, we should also look into the
+ * way new worker threads are spawned. Obviously, it doesn't make much
+ * sense to spawn additional worker threads when none of them can do any
+ * processing. However, it is deemed acceptable to allow this for an initial
+ * implementation of the timeframe/rate limiting feature.
+ * Please also note that these feature could also be implemented at the action
+ * level. However, that would limit them to be used together with actions. We have
+ * taken the broader approach, moving it right into the queue. This is even
+ * necessary if we want to prevent spawning of multiple unnecessary worker
+ * threads as described above. -- rgerhards, 2008-04-02
+ *
+ *
+ * time window: tCurr is current time; tFrom is start time, tTo is end time (in mil 24h format).
+ * We may have tFrom = 4, tTo = 10 --> run from 4 to 10 hrs. nice and happy
+ * we may also have tFrom= 22, tTo = 4 -> run from 10pm to 4am, which is actually two
+ * windows: 0-4; 22-23:59
+ * so when to run? Let's assume we have 3am
+ *
+ * if(tTo < tFrom) {
+ * if(tCurr < tTo [3 < 4] || tCurr > tFrom [3 > 22])
+ * do work
+ * else
+ * sleep for tFrom - tCurr "hours" [22 - 5 --> 17]
+ * } else {
+ * if(tCurr >= tFrom [3 >= 4] && tCurr < tTo [3 < 10])
+ * do work
+ * else
+ * sleep for tTo - tCurr "hours" [4 - 3 --> 1]
+ * }
+ *
+ * Bottom line: we need to check which type of window we have and need to adjust our
+ * logic accordingly. Of course, sleep calculations need to be done up to the minute,
+ * but you get the idea from the code above.
+ */
+static rsRetVal
+queueRateLimiter(queue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+ dbgoprint((obj_t*) pThis, "entering rate limiter\n");
+ srSleep(2, 0);
+
+finalize_it:
+ dbgoprint((obj_t*) pThis, "rate limiter returns with iRet %d\n", iRet);
+ RETiRet;
+}
+
+
+
/* This is the queue consumer in the regular (non-DA) case. It is
* protected by the queue mutex, but MUST release it as soon as possible.
* rgerhards, 2008-01-21
@@ -1690,6 +1746,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
+ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg));
diff --git a/queue.h b/queue.h
index bc09fbd8..6aa09a1b 100644
--- a/queue.h
+++ b/queue.h
@@ -82,9 +82,20 @@ typedef struct queue_s {
int toActShutdown; /* timeout for long-running action shutdown in ms */
int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
int toEnq; /* enqueue timeout */
- /* rate limiting settings (will be expanded */
+ /* rate limiting settings (will be expanded) */
int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */
/* end rate limiting */
+ /* dequeue time window settings (may also be expanded) */
+ int iDeqtWinFromHr; /* begin of dequeue time window (hour only) */
+ int iDeqtWinToHr; /* end of dequeue time window (hour only) */
+ /* note that begin and end have specific semantics. It is a big difference if we have
+ * begin 4, end 22 or begin 22, end 4. In the later case, dequeuing will run from 10p,
+ * throughout the night and stop at 4 in the morning. In the first case, it will start
+ * at 4am, run throughout the day, and stop at 10 in the evening! So far, not logic is
+ * applied to detect user configuration errors (and tell me how should we detect what
+ * the user really wanted...). -- rgerhards, 2008-04-02
+ */
+ /* ane dequeue time window */
rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
* user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer
diff --git a/wti.c b/wti.c
index 2386dee9..c9184b36 100644
--- a/wti.c
+++ b/wti.c
@@ -370,6 +370,14 @@ wtiWorker(wti_t *pThis)
pthread_yield(); /* see big comment in function header */
# endif
+ /* if we have a rate-limiter set for this worker pool, let's call it. Please
+ * keep in mind that the rate-limiter may hold us for an extended period
+ * of time. -- rgerhards, 2008-04-02
+ */
+ if(pWtp->pfRateLimiter != NULL) {
+ pWtp->pfRateLimiter(pWtp->pUsr);
+ }
+
wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
diff --git a/wtp.c b/wtp.c
index 1c2ea30f..d6192bc0 100644
--- a/wtp.c
+++ b/wtp.c
@@ -545,6 +545,7 @@ DEFpropSetMeth(wtp, pUsr, void*);
DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t);
DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t);
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
+DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));
DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int));
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int));
DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));
diff --git a/wtp.h b/wtp.h
index 6100fe52..b928b149 100644
--- a/wtp.h
+++ b/wtp.h
@@ -68,6 +68,7 @@ typedef struct wtp_s {
pthread_mutex_t *pmutUsr;
pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */
rsRetVal (*pfChkStopWrkr)(void *pUsr, int);
+ rsRetVal (*pfRateLimiter)(void *pUsr);
rsRetVal (*pfIsIdle)(void *pUsr, int);
rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int);
rsRetVal (*pfOnIdle)(void *pUsr, int);