diff options
Diffstat (limited to 'BitTorrent/RateLimiter.py')
-rw-r--r-- | BitTorrent/RateLimiter.py | 190 |
1 files changed, 190 insertions, 0 deletions
diff --git a/BitTorrent/RateLimiter.py b/BitTorrent/RateLimiter.py new file mode 100644 index 0000000..d58f2c4 --- /dev/null +++ b/BitTorrent/RateLimiter.py @@ -0,0 +1,190 @@ +# The contents of this file are subject to the BitTorrent Open Source License +# Version 1.1 (the License). You may not copy or use this file, in either +# source code or executable form, except in compliance with the License. You +# may obtain a copy of the License at http://www.bittorrent.com/license/. +# +# Software distributed under the License is distributed on an AS IS basis, +# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +# for the specific language governing rights and limitations under the +# License. + +# Written by Uoti Urpala and Andrew Loewenstern + +from BitTorrent.platform import bttime + +def minctx(a,b): + A = B = 0 + if a.rate > 0: + A = a.offset_amount / a.rate + if b.rate > 0: + B = b.offset_amount / b.rate + if A <= B: + return a + return b + +class Dummy(object): + def __init__(self, next): + self.next_upload = next + def send_partial(self, size): + return 0 + closed = False + +class RateLimitedGroup(object): + def __init__(self, rate, got_exception): + self.got_exception = got_exception + # limiting + self.check_time = 0 + self.lasttime = bttime() + self.offset_amount = 0 + self.set_rate(rate) + # accounting + self.count = 0 + self.counts = [] + + def set_rate(self, new_rate): + self.rate = new_rate * 1024 + self.check_time = 0 + self.offset_amount = 0 + +class MultiRateLimiter(object): + def __init__(self, sched): + self.sched = sched + self.last = None + self.upload_rate = 0 + self.unitsize = 17000 + self.offset_amount = 0 + self.ctxs = [] # list of contexts with connections in the queue + self.ctx_counts = {} # dict conn -> how many connections each context has + + def set_parameters(self, rate, unitsize): + if unitsize > 17000: + # Since data is sent to peers in a round-robin fashion, max one + # full request at a time, setting this higher would send more data + # to peers that use request sizes larger than standard 16 KiB. + # 17000 instead of 16384 to allow room for metadata messages. + unitsize = 17000 + self.upload_rate = rate * 1024 + self.unitsize = unitsize + self.lasttime = bttime() + self.offset_amount = 0 + + def queue(self, conn, ctx): + assert conn.next_upload is None + if ctx not in self.ctxs: + ctx.check_time = 1 + self.ctxs.append(ctx) + self.ctx_counts[ctx] = 1 + else: + self.ctx_counts[ctx] += 1 + + if self.last is None: + self.last = conn + conn.next_upload = conn + self.try_send(True) + else: + conn.next_upload = self.last.next_upload + self.last.next_upload = conn + self.last = conn + + def increase_offset(self, bytes): + self.offset_amount += bytes + + def try_send(self, check_time = False): + t = bttime() + cur = self.last.next_upload + + if self.upload_rate > 0: + self.offset_amount -= (t - self.lasttime) * self.upload_rate + if check_time: + self.offset_amount = max(self.offset_amount, -1 * self.unitsize) + else: + self.offset_amount = 0 + + self.lasttime = t + + for ctx in self.ctxs: + if ctx.rate == 0: + ctx.offset_amount = 0 + ctx.lasttime = t + elif ctx.lasttime != t: + ctx.offset_amount -=(t - ctx.lasttime) * ctx.rate + ctx.lasttime = t + if ctx.check_time: + ctx.offset_amount = max(ctx.offset_amount, -1 * self.unitsize) + + min_offset = reduce(minctx, self.ctxs) + ctx = cur.encoder.context.rlgroup + while self.offset_amount <= 0 and min_offset.offset_amount <= 0: + if ctx.offset_amount <= 0: + try: + bytes = cur.send_partial(self.unitsize) + except KeyboardInterrupt: + raise + except Exception, e: + cur.encoder.context.rlgroup.got_exception(e) + cur = self.last.next_upload + bytes = 0 + + if self.upload_rate > 0: + self.offset_amount += bytes + if ctx.rate > 0: + ctx.offset_amount += bytes + + ctx.count += bytes + + if bytes == 0 or not cur.connection.is_flushed(): + if self.last is cur: + self.last = None + cur.next_upload = None + self.ctx_counts = {} + self.ctxs = [] + break + else: + self.last.next_upload = cur.next_upload + cur.next_upload = None + old = ctx + cur = self.last.next_upload + ctx = cur.encoder.context.rlgroup + self.ctx_counts[old] -= 1 + if self.ctx_counts[old] == 0: + del(self.ctx_counts[old]) + self.ctxs.remove(old) + if min_offset == old: + min_offset = reduce(minctx, self.ctxs) + else: + self.last = cur + cur = cur.next_upload + ctx = cur.encoder.context.rlgroup + min_offset = reduce(minctx, self.ctxs) + else: + self.last = cur + cur = self.last.next_upload + ctx = cur.encoder.context.rlgroup + else: + myDelay = minCtxDelay = 0 + if self.upload_rate > 0: + myDelay = 1.0 * self.offset_amount / self.upload_rate + if min_offset.rate > 0: + minCtxDelay = 1.0 * min_offset.offset_amount / min_offset.rate + delay = max(myDelay, minCtxDelay) + self.sched(self.try_send, delay) + + + def clean_closed(self): + if self.last is None: + return + orig = self.last + if self.last.closed: + self.last = Dummy(self.last.next_upload) + self.last.encoder = orig.encoder + c = self.last + while True: + if c.next_upload is orig: + c.next_upload = self.last + break + if c.next_upload.closed: + o = c.next_upload + c.next_upload = Dummy(c.next_upload.next_upload) + c.next_upload.encoder = o.encoder + c = c.next_upload + |