summaryrefslogtreecommitdiffstats
path: root/BitTorrent/RateLimiter.py
diff options
context:
space:
mode:
Diffstat (limited to 'BitTorrent/RateLimiter.py')
-rw-r--r--BitTorrent/RateLimiter.py190
1 files changed, 190 insertions, 0 deletions
diff --git a/BitTorrent/RateLimiter.py b/BitTorrent/RateLimiter.py
new file mode 100644
index 0000000..d58f2c4
--- /dev/null
+++ b/BitTorrent/RateLimiter.py
@@ -0,0 +1,190 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+# Written by Uoti Urpala and Andrew Loewenstern
+
+from BitTorrent.platform import bttime
+
+def minctx(a,b):
+ A = B = 0
+ if a.rate > 0:
+ A = a.offset_amount / a.rate
+ if b.rate > 0:
+ B = b.offset_amount / b.rate
+ if A <= B:
+ return a
+ return b
+
+class Dummy(object):
+ def __init__(self, next):
+ self.next_upload = next
+ def send_partial(self, size):
+ return 0
+ closed = False
+
+class RateLimitedGroup(object):
+ def __init__(self, rate, got_exception):
+ self.got_exception = got_exception
+ # limiting
+ self.check_time = 0
+ self.lasttime = bttime()
+ self.offset_amount = 0
+ self.set_rate(rate)
+ # accounting
+ self.count = 0
+ self.counts = []
+
+ def set_rate(self, new_rate):
+ self.rate = new_rate * 1024
+ self.check_time = 0
+ self.offset_amount = 0
+
+class MultiRateLimiter(object):
+ def __init__(self, sched):
+ self.sched = sched
+ self.last = None
+ self.upload_rate = 0
+ self.unitsize = 17000
+ self.offset_amount = 0
+ self.ctxs = [] # list of contexts with connections in the queue
+ self.ctx_counts = {} # dict conn -> how many connections each context has
+
+ def set_parameters(self, rate, unitsize):
+ if unitsize > 17000:
+ # Since data is sent to peers in a round-robin fashion, max one
+ # full request at a time, setting this higher would send more data
+ # to peers that use request sizes larger than standard 16 KiB.
+ # 17000 instead of 16384 to allow room for metadata messages.
+ unitsize = 17000
+ self.upload_rate = rate * 1024
+ self.unitsize = unitsize
+ self.lasttime = bttime()
+ self.offset_amount = 0
+
+ def queue(self, conn, ctx):
+ assert conn.next_upload is None
+ if ctx not in self.ctxs:
+ ctx.check_time = 1
+ self.ctxs.append(ctx)
+ self.ctx_counts[ctx] = 1
+ else:
+ self.ctx_counts[ctx] += 1
+
+ if self.last is None:
+ self.last = conn
+ conn.next_upload = conn
+ self.try_send(True)
+ else:
+ conn.next_upload = self.last.next_upload
+ self.last.next_upload = conn
+ self.last = conn
+
+ def increase_offset(self, bytes):
+ self.offset_amount += bytes
+
+ def try_send(self, check_time = False):
+ t = bttime()
+ cur = self.last.next_upload
+
+ if self.upload_rate > 0:
+ self.offset_amount -= (t - self.lasttime) * self.upload_rate
+ if check_time:
+ self.offset_amount = max(self.offset_amount, -1 * self.unitsize)
+ else:
+ self.offset_amount = 0
+
+ self.lasttime = t
+
+ for ctx in self.ctxs:
+ if ctx.rate == 0:
+ ctx.offset_amount = 0
+ ctx.lasttime = t
+ elif ctx.lasttime != t:
+ ctx.offset_amount -=(t - ctx.lasttime) * ctx.rate
+ ctx.lasttime = t
+ if ctx.check_time:
+ ctx.offset_amount = max(ctx.offset_amount, -1 * self.unitsize)
+
+ min_offset = reduce(minctx, self.ctxs)
+ ctx = cur.encoder.context.rlgroup
+ while self.offset_amount <= 0 and min_offset.offset_amount <= 0:
+ if ctx.offset_amount <= 0:
+ try:
+ bytes = cur.send_partial(self.unitsize)
+ except KeyboardInterrupt:
+ raise
+ except Exception, e:
+ cur.encoder.context.rlgroup.got_exception(e)
+ cur = self.last.next_upload
+ bytes = 0
+
+ if self.upload_rate > 0:
+ self.offset_amount += bytes
+ if ctx.rate > 0:
+ ctx.offset_amount += bytes
+
+ ctx.count += bytes
+
+ if bytes == 0 or not cur.connection.is_flushed():
+ if self.last is cur:
+ self.last = None
+ cur.next_upload = None
+ self.ctx_counts = {}
+ self.ctxs = []
+ break
+ else:
+ self.last.next_upload = cur.next_upload
+ cur.next_upload = None
+ old = ctx
+ cur = self.last.next_upload
+ ctx = cur.encoder.context.rlgroup
+ self.ctx_counts[old] -= 1
+ if self.ctx_counts[old] == 0:
+ del(self.ctx_counts[old])
+ self.ctxs.remove(old)
+ if min_offset == old:
+ min_offset = reduce(minctx, self.ctxs)
+ else:
+ self.last = cur
+ cur = cur.next_upload
+ ctx = cur.encoder.context.rlgroup
+ min_offset = reduce(minctx, self.ctxs)
+ else:
+ self.last = cur
+ cur = self.last.next_upload
+ ctx = cur.encoder.context.rlgroup
+ else:
+ myDelay = minCtxDelay = 0
+ if self.upload_rate > 0:
+ myDelay = 1.0 * self.offset_amount / self.upload_rate
+ if min_offset.rate > 0:
+ minCtxDelay = 1.0 * min_offset.offset_amount / min_offset.rate
+ delay = max(myDelay, minCtxDelay)
+ self.sched(self.try_send, delay)
+
+
+ def clean_closed(self):
+ if self.last is None:
+ return
+ orig = self.last
+ if self.last.closed:
+ self.last = Dummy(self.last.next_upload)
+ self.last.encoder = orig.encoder
+ c = self.last
+ while True:
+ if c.next_upload is orig:
+ c.next_upload = self.last
+ break
+ if c.next_upload.closed:
+ o = c.next_upload
+ c.next_upload = Dummy(c.next_upload.next_upload)
+ c.next_upload.encoder = o.encoder
+ c = c.next_upload
+