summaryrefslogtreecommitdiffstats
path: root/BitTorrent/RateLimiter.py
blob: d58f2c40296ae46ad0232ee7f334aa7def046a71 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

# Written by Uoti Urpala and Andrew Loewenstern

from BitTorrent.platform import bttime

def minctx(a,b):
    A = B = 0
    if a.rate > 0:
        A = a.offset_amount / a.rate
    if b.rate > 0:
        B = b.offset_amount / b.rate
    if A <= B:
            return a
    return b

class Dummy(object):
    def __init__(self, next):
        self.next_upload = next
    def send_partial(self, size):
        return 0
    closed = False

class RateLimitedGroup(object):
    def __init__(self, rate, got_exception):
        self.got_exception = got_exception
        # limiting
        self.check_time = 0
        self.lasttime = bttime()
        self.offset_amount = 0
        self.set_rate(rate)
        # accounting
        self.count = 0
        self.counts = []
        
    def set_rate(self, new_rate):
        self.rate = new_rate * 1024
        self.check_time = 0
        self.offset_amount = 0
        
class MultiRateLimiter(object):
    def __init__(self, sched):
        self.sched = sched
        self.last = None
        self.upload_rate = 0
        self.unitsize = 17000
        self.offset_amount = 0
        self.ctxs = [] # list of contexts with connections in the queue
        self.ctx_counts = {} # dict conn -> how many connections each context has
        
    def set_parameters(self, rate, unitsize):
        if unitsize > 17000:
            # Since data is sent to peers in a round-robin fashion, max one
            # full request at a time, setting this higher would send more data
            # to peers that use request sizes larger than standard 16 KiB.
            # 17000 instead of 16384 to allow room for metadata messages.
            unitsize = 17000
        self.upload_rate = rate * 1024
        self.unitsize = unitsize
        self.lasttime = bttime()
        self.offset_amount = 0

    def queue(self, conn, ctx):
        assert conn.next_upload is None
        if ctx not in self.ctxs:
            ctx.check_time = 1
            self.ctxs.append(ctx)
            self.ctx_counts[ctx] = 1
        else:
            self.ctx_counts[ctx] += 1

        if self.last is None:
            self.last = conn
            conn.next_upload = conn
            self.try_send(True)
        else:
            conn.next_upload = self.last.next_upload
            self.last.next_upload = conn
            self.last = conn

    def increase_offset(self, bytes):
        self.offset_amount += bytes

    def try_send(self, check_time = False):
        t = bttime()
        cur = self.last.next_upload

        if self.upload_rate > 0:
            self.offset_amount -= (t - self.lasttime) * self.upload_rate
            if check_time:
                self.offset_amount = max(self.offset_amount, -1 * self.unitsize)
        else:
            self.offset_amount = 0

        self.lasttime = t

        for ctx in self.ctxs:
            if ctx.rate == 0:
                ctx.offset_amount = 0
                ctx.lasttime = t                
            elif ctx.lasttime != t:
                ctx.offset_amount -=(t - ctx.lasttime) * ctx.rate
                ctx.lasttime = t
                if ctx.check_time:
                    ctx.offset_amount = max(ctx.offset_amount, -1 * self.unitsize)

        min_offset = reduce(minctx, self.ctxs)
        ctx = cur.encoder.context.rlgroup
        while self.offset_amount <= 0 and min_offset.offset_amount <= 0:
            if ctx.offset_amount <= 0:
                try:
                    bytes = cur.send_partial(self.unitsize)
                except KeyboardInterrupt:
                    raise
                except Exception, e:
                    cur.encoder.context.rlgroup.got_exception(e)
                    cur = self.last.next_upload
                    bytes = 0

                if self.upload_rate > 0:
                    self.offset_amount += bytes
                if ctx.rate > 0:
                    ctx.offset_amount += bytes

                ctx.count += bytes
                
                if bytes == 0 or not cur.connection.is_flushed():
                    if self.last is cur:
                        self.last = None
                        cur.next_upload = None
                        self.ctx_counts = {}
                        self.ctxs = []
                        break
                    else:
                        self.last.next_upload = cur.next_upload
                        cur.next_upload = None
                        old = ctx
                        cur = self.last.next_upload
                        ctx = cur.encoder.context.rlgroup
                        self.ctx_counts[old] -= 1
                        if self.ctx_counts[old] == 0:
                            del(self.ctx_counts[old])
                            self.ctxs.remove(old)
                        if min_offset == old:
                            min_offset = reduce(minctx, self.ctxs)
                else:
                    self.last = cur
                    cur = cur.next_upload
                    ctx = cur.encoder.context.rlgroup
                    min_offset = reduce(minctx, self.ctxs)
            else:
                self.last = cur
                cur = self.last.next_upload
                ctx = cur.encoder.context.rlgroup
        else:
            myDelay = minCtxDelay = 0
            if self.upload_rate > 0:
                myDelay = 1.0 * self.offset_amount / self.upload_rate
            if min_offset.rate > 0:
                minCtxDelay = 1.0 * min_offset.offset_amount / min_offset.rate
            delay = max(myDelay, minCtxDelay)
            self.sched(self.try_send, delay)


    def clean_closed(self):
        if self.last is None:
            return
        orig = self.last
        if self.last.closed:
            self.last = Dummy(self.last.next_upload)
            self.last.encoder = orig.encoder
        c = self.last
        while True:
            if c.next_upload is orig:
                c.next_upload = self.last
                break
            if c.next_upload.closed:
                o = c.next_upload
                c.next_upload = Dummy(c.next_upload.next_upload)
                c.next_upload.encoder = o.encoder
            c = c.next_upload