1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
|
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License). You may not copy or use this file, in either
# source code or executable form, except in compliance with the License. You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
# Written by Uoti Urpala and Andrew Loewenstern
from BitTorrent.platform import bttime
def minctx(a, b):
    """Return whichever of two contexts has the smaller offset/rate ratio.

    Each argument must expose ``rate`` and ``offset_amount`` attributes.
    The ratio compared is ``offset_amount / rate``; a context whose rate is
    not positive is treated as having a ratio of 0.  On a tie the first
    argument is returned.  The signature is suitable for ``reduce``.
    """
    def ratio(ctx):
        if ctx.rate > 0:
            # Force float arithmetic: under Python 2, int / int truncates,
            # which can make two different ratios compare equal and select
            # the wrong context.  try_send() uses the same ``1.0 *`` idiom.
            return 1.0 * ctx.offset_amount / ctx.rate
        return 0

    if ratio(a) <= ratio(b):
        return a
    return b
class Dummy(object):
    """Inert placeholder for a slot in the upload ring.

    MultiRateLimiter.clean_closed() substitutes instances of this class for
    connections whose ``closed`` flag is set, so the ``next_upload`` chain
    stays intact.  A placeholder never sends any data.
    """

    # A placeholder never reports itself as closed.
    closed = False

    def __init__(self, next):
        # Link to the following entry in the ring.
        self.next_upload = next

    def send_partial(self, size):
        """Ignore ``size`` and report that 0 bytes were sent."""
        return 0
class RateLimitedGroup(object):
    """Rate-limit state and byte accounting shared by a group of connections.

    MultiRateLimiter reads and updates ``rate``, ``offset_amount``,
    ``lasttime``, ``check_time`` and ``count`` on instances of this class,
    and reports send failures through ``got_exception``.
    """

    def __init__(self, rate, got_exception):
        """Create a group limited to ``rate`` (scaled by 1024, see set_rate).

        ``got_exception`` is a callable invoked with the exception object
        when sending on one of the group's connections fails.
        """
        # accounting
        self.count = 0
        self.counts = []

        # error reporting
        self.got_exception = got_exception

        # limiting
        self.lasttime = bttime()
        self.check_time = 0
        self.offset_amount = 0
        self.set_rate(rate)

    def set_rate(self, new_rate):
        """Set the limit to ``new_rate * 1024`` and reset the limiter state.

        Presumably ``new_rate`` is in KiB/s and ``rate`` in bytes/s -- confirm
        against callers.  MultiRateLimiter treats a rate of 0 as unlimited.
        """
        scaled = new_rate * 1024
        self.rate = scaled
        # Changing the rate discards any accumulated credit or debt.
        self.check_time = self.offset_amount = 0
class MultiRateLimiter(object):
    """Round-robin upload scheduler with a global and per-context rate limit.

    Connections waiting to upload are kept in a circular singly linked list
    threaded through their ``next_upload`` attribute.  ``self.last`` is the
    tail of that ring, so ``self.last.next_upload`` is the connection served
    next.  Each connection belongs to a context (a RateLimitedGroup-like
    object reached via ``conn.encoder.context.rlgroup``) with its own rate.

    Budgets are tracked as ``offset_amount`` values: they shrink as time
    passes (elapsed time * rate) and grow by the number of bytes sent.
    Sending is allowed while the relevant offsets are <= 0.
    """

    def __init__(self, sched):
        """Create an empty limiter.

        ``sched`` is a callable invoked as ``sched(self.try_send, delay)``
        to run try_send() again later (delay presumably in seconds, the
        unit of bttime() -- confirm).
        """
        self.sched = sched
        # Tail of the circular upload ring, or None when the ring is empty.
        self.last = None
        # Global limit; 0 means no global limit is enforced (see try_send).
        self.upload_rate = 0
        # Maximum number of bytes handed to send_partial() per turn.
        self.unitsize = 17000
        # Global budget: > 0 means the global limit is currently exceeded.
        self.offset_amount = 0
        self.ctxs = [] # list of contexts with connections in the queue
        self.ctx_counts = {} # dict ctx -> number of queued connections in that context
        # NOTE(review): self.lasttime is only created by set_parameters() and
        # try_send(); it is not initialised here.

    def set_parameters(self, rate, unitsize):
        """Set the global rate (stored as ``rate * 1024``) and the unit size.

        ``unitsize`` is capped at 17000.  The global budget and its
        timestamp are reset.
        """
        if unitsize > 17000:
            # Since data is sent to peers in a round-robin fashion, max one
            # full request at a time, setting this higher would send more data
            # to peers that use request sizes larger than standard 16 KiB.
            # 17000 instead of 16384 to allow room for metadata messages.
            unitsize = 17000
        self.upload_rate = rate * 1024
        self.unitsize = unitsize
        self.lasttime = bttime()
        self.offset_amount = 0

    def queue(self, conn, ctx):
        """Append ``conn`` (belonging to context ``ctx``) to the upload ring.

        ``conn`` must not already be queued (its ``next_upload`` must be
        None).  If the ring was empty, sending starts immediately.
        """
        assert conn.next_upload is None
        if ctx not in self.ctxs:
            # First queued connection of this context: flag it so try_send()
            # caps the credit the context may have accumulated.
            ctx.check_time = 1
            self.ctxs.append(ctx)
            self.ctx_counts[ctx] = 1
        else:
            self.ctx_counts[ctx] += 1

        if self.last is None:
            # Ring was empty: conn becomes its only element and points at
            # itself; kick off sending with the idle-credit cap enabled.
            self.last = conn
            conn.next_upload = conn
            self.try_send(True)
        else:
            # Insert after the current tail and make conn the new tail.
            conn.next_upload = self.last.next_upload
            self.last.next_upload = conn
            self.last = conn

    def increase_offset(self, bytes):
        """Charge ``bytes`` against the global budget."""
        self.offset_amount += bytes

    def try_send(self, check_time = False):
        """Serve queued connections round-robin until a budget is exhausted.

        Must only be called while the ring is non-empty: ``self.last`` is
        dereferenced unconditionally.  When ``check_time`` is true the
        global credit is capped at one ``unitsize``.

        The loop ends either because the ring became empty (nothing is
        rescheduled) or because the global budget or the smallest context
        budget went positive, in which case try_send is rescheduled through
        ``self.sched`` after the computed delay.
        """
        t = bttime()
        cur = self.last.next_upload

        # Refill the global budget for the time elapsed since the last call.
        if self.upload_rate > 0:
            self.offset_amount -= (t - self.lasttime) * self.upload_rate
            if check_time:
                # Never allow more than one unit of accumulated credit.
                self.offset_amount = max(self.offset_amount, -1 * self.unitsize)
        else:
            # No global limit: keep the global budget permanently open.
            self.offset_amount = 0

        self.lasttime = t

        # Refill every context's budget the same way.
        for ctx in self.ctxs:
            if ctx.rate == 0:
                # Unlimited context.
                ctx.offset_amount = 0
                ctx.lasttime = t
            elif ctx.lasttime != t:
                ctx.offset_amount -=(t - ctx.lasttime) * ctx.rate
                ctx.lasttime = t
                # NOTE(review): nesting of this check under the elif is
                # reconstructed from flattened source -- confirm.
                if ctx.check_time:
                    ctx.offset_amount = max(ctx.offset_amount, -1 * self.unitsize)

        # Context with the smallest offset/rate ratio (see minctx).
        min_offset = reduce(minctx, self.ctxs)
        ctx = cur.encoder.context.rlgroup
        while self.offset_amount <= 0 and min_offset.offset_amount <= 0:
            if ctx.offset_amount <= 0:
                # The current connection's context has budget: let it send.
                try:
                    bytes = cur.send_partial(self.unitsize)
                except KeyboardInterrupt:
                    raise
                except Exception, e:
                    # Report the failure to the context, re-read the head of
                    # the ring, and treat this turn as having sent nothing.
                    cur.encoder.context.rlgroup.got_exception(e)
                    cur = self.last.next_upload
                    bytes = 0

                # Charge the bytes to whichever limits are active.
                if self.upload_rate > 0:
                    self.offset_amount += bytes
                if ctx.rate > 0:
                    ctx.offset_amount += bytes
                    ctx.count += bytes

                if bytes == 0 or not cur.connection.is_flushed():
                    # Nothing sent, or data still buffered: drop cur from
                    # the ring.
                    if self.last is cur:
                        # cur was the only element: the ring is now empty.
                        self.last = None
                        cur.next_upload = None
                        self.ctx_counts = {}
                        self.ctxs = []
                        # break skips the while/else, so nothing is
                        # rescheduled.
                        break
                    else:
                        # Unlink cur (the head) and move on to its successor.
                        self.last.next_upload = cur.next_upload
                        cur.next_upload = None
                        old = ctx
                        cur = self.last.next_upload
                        ctx = cur.encoder.context.rlgroup
                        # One fewer queued connection for the old context;
                        # forget the context once none remain.
                        self.ctx_counts[old] -= 1
                        if self.ctx_counts[old] == 0:
                            del(self.ctx_counts[old])
                            self.ctxs.remove(old)
                        # NOTE(review): whether this recomputation sits
                        # inside the preceding ``if`` is ambiguous in the
                        # flattened source -- confirm.
                        if min_offset == old:
                            min_offset = reduce(minctx, self.ctxs)
                else:
                    # Sent and flushed: cur stays queued and becomes the
                    # tail, so the next connection gets its turn.
                    self.last = cur
                    cur = cur.next_upload
                    ctx = cur.encoder.context.rlgroup
                    min_offset = reduce(minctx, self.ctxs)
            else:
                # This connection's context is over budget: skip it.
                self.last = cur
                cur = self.last.next_upload
                ctx = cur.encoder.context.rlgroup
        else:
            # Loop condition failed (no break): a budget is exhausted.
            # Reschedule after the longer of the global and context delays.
            myDelay = minCtxDelay = 0
            if self.upload_rate > 0:
                myDelay = 1.0 * self.offset_amount / self.upload_rate
            if min_offset.rate > 0:
                minCtxDelay = 1.0 * min_offset.offset_amount / min_offset.rate
            delay = max(myDelay, minCtxDelay)
            self.sched(self.try_send, delay)

    def clean_closed(self):
        """Replace every closed connection in the ring with a Dummy.

        Each placeholder keeps the replaced connection's ``next_upload``
        link and ``encoder`` so the ring stays intact and the context can
        still be looked up.  Does nothing when the ring is empty.
        """
        if self.last is None:
            return
        orig = self.last
        if self.last.closed:
            self.last = Dummy(self.last.next_upload)
            self.last.encoder = orig.encoder
        c = self.last
        while True:
            if c.next_upload is orig:
                # Back at the original tail: close the ring onto the
                # (possibly replaced) tail and stop.
                c.next_upload = self.last
                break
            if c.next_upload.closed:
                o = c.next_upload
                c.next_upload = Dummy(c.next_upload.next_upload)
                c.next_upload.encoder = o.encoder
            c = c.next_upload
|