summaryrefslogtreecommitdiffstats
path: root/BitTorrent/Connecter.py
blob: 2d69a2d9ab83bba85a0a6ebbd61aa70d57878997 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

# Originally written by Bram Cohen, heavily modified by Uoti Urpala

# required for python 2.2
from __future__ import generators

from binascii import b2a_hex
from struct import pack, unpack

from BitTorrent.RawServer_magic import Handler
from BitTorrent.bitfield import Bitfield
from BitTorrent.obsoletepythonsupport import *

def toint(s):
    return unpack("!i", s)[0]

def tobinary(i):
    return pack("!i", i)

CHOKE = chr(0)
UNCHOKE = chr(1)
INTERESTED = chr(2)
NOT_INTERESTED = chr(3)
# index
HAVE = chr(4)
# index, bitfield
BITFIELD = chr(5)
# index, begin, length
REQUEST = chr(6)
# index, begin, piece
PIECE = chr(7)
# index, begin, piece
CANCEL = chr(8)

# 2-byte port message
PORT = chr(9)

# reserved flags
DHT = 1
FLAGS = '\0' * 7 + '\1'
protocol_name = 'BitTorrent protocol'


class Connection(Handler):

    def __init__(self, encoder, connection, id, is_local):
        self.encoder = encoder
        self.connection = connection
        self.connection.handler = self
        self.id = id
        self.ip = connection.ip
        self.locally_initiated = is_local
        self.complete = False
        self.closed = False
        self.got_anything = False
        self.next_upload = None
        self.upload = None
        self.download = None
        self._buffer = []
        self._buffer_len = 0
        self._reader = self._read_messages()
        self._next_len = self._reader.next()
        self._partial_message = None
        self._outqueue = []
        self.choke_sent = True
        self.uses_dht = False
        self.dht_port = None
        if self.locally_initiated:
            connection.write(chr(len(protocol_name)) + protocol_name +
                FLAGS + self.encoder.download_id)
            if self.id is not None:
                connection.write(self.encoder.my_id)

    def close(self):
        if not self.closed:
            self.connection.close()
            self._sever()

    def send_interested(self):
        self._send_message(INTERESTED)

    def send_not_interested(self):
        self._send_message(NOT_INTERESTED)

    def send_choke(self):
        if self._partial_message is None:
            self._send_message(CHOKE)
            self.choke_sent = True
            self.upload.sent_choke()

    def send_unchoke(self):
        if self._partial_message is None:
            self._send_message(UNCHOKE)
            self.choke_sent = False

    def send_port(self, port):
        self._send_message(PORT+pack('!H', port))
        
    def send_request(self, index, begin, length):
        self._send_message(pack("!ciii", REQUEST, index, begin, length))

    def send_cancel(self, index, begin, length):
        self._send_message(pack("!ciii", CANCEL,index, begin, length))

    def send_bitfield(self, bitfield):
        self._send_message(BITFIELD + bitfield)

    def send_have(self, index):
        self._send_message(pack("!ci", HAVE, index))

    def send_keepalive(self):
        self._send_message('')

    def send_partial(self, bytes):
        if self.closed:
            return 0
        if self._partial_message is None:
            s = self.upload.get_upload_chunk()
            if s is None:
                return 0
            index, begin, piece = s
            self._partial_message = pack("!icii%ss" % len(piece), len(piece) + 9, PIECE,
                                    index, begin, piece)
        if bytes < len(self._partial_message):
            self.upload.update_rate(bytes)
            self.connection.write(buffer(self._partial_message, 0, bytes))
            self._partial_message = buffer(self._partial_message, bytes)
            return bytes

        queue = [str(self._partial_message)]
        self._partial_message = None
        if self.choke_sent != self.upload.choked:
            if self.upload.choked:
                self._outqueue.append(pack("!ic", 1, CHOKE))
                self.upload.sent_choke()
            else:
                self._outqueue.append(pack("!ic", 1, UNCHOKE))
            self.choke_sent = self.upload.choked
        queue.extend(self._outqueue)
        self._outqueue = []
        queue = ''.join(queue)
        self.upload.update_rate(len(queue))
        self.connection.write(queue)
        return len(queue)

    # yields the number of bytes it wants next, gets those in self._message
    def _read_messages(self):
        yield 1   # header length
        if ord(self._message) != len(protocol_name):
            return

        yield len(protocol_name)
        if self._message != protocol_name:
            return

        yield 8  # reserved
        # dht is on last reserved byte
        if ord(self._message[7]) & DHT:
            self.uses_dht = True
        
        yield 20 # download id
        if self.encoder.download_id is None:  # incoming connection
            # modifies self.encoder if successful
            self.encoder.select_torrent(self, self._message)
            if self.encoder.download_id is None:
                return
        elif self._message != self.encoder.download_id:
            return
        if not self.locally_initiated:
            self.connection.write(chr(len(protocol_name)) + protocol_name +
                FLAGS + self.encoder.download_id + self.encoder.my_id)

        yield 20  # peer id
        if not self.id:
            self.id = self._message
            if self.id == self.encoder.my_id:
                return
            for v in self.encoder.connections.itervalues():
                if v is not self:
                    if v.id == self.id:
                        return
                    if self.encoder.config['one_connection_per_ip'] and \
                           v.ip == self.ip:
                        return
            if self.locally_initiated:
                self.connection.write(self.encoder.my_id)
            else:
                self.encoder.everinc = True
        else:
            if self._message != self.id:
                return
        self.complete = True
        self.encoder.connection_completed(self)

        while True:
            yield 4   # message length
            l = toint(self._message)
            if l > self.encoder.config['max_message_length']:
                return
            if l > 0:
                yield l
                self._got_message(self._message)

    def _got_message(self, message):
        t = message[0]
        if t == BITFIELD and self.got_anything:
            self.close()
            return
        self.got_anything = True
        if (t in [CHOKE, UNCHOKE, INTERESTED, NOT_INTERESTED] and
                len(message) != 1):
            self.close()
            return
        if t == CHOKE:
            self.download.got_choke()
        elif t == UNCHOKE:
            self.download.got_unchoke()
        elif t == INTERESTED:
            self.upload.got_interested()
        elif t == NOT_INTERESTED:
            self.upload.got_not_interested()
        elif t == HAVE:
            if len(message) != 5:
                self.close()
                return
            i = unpack("!xi", message)[0]
            if i >= self.encoder.numpieces:
                self.close()
                return
            self.download.got_have(i)
        elif t == BITFIELD:
            try:
                b = Bitfield(self.encoder.numpieces, message[1:])
            except ValueError:
                self.close()
                return
            self.download.got_have_bitfield(b)
        elif t == REQUEST:
            if len(message) != 13:
                self.close()
                return
            i, a, b = unpack("!xiii", message)
            if i >= self.encoder.numpieces:
                self.close()
                return
            self.upload.got_request(i, a, b)
        elif t == CANCEL:
            if len(message) != 13:
                self.close()
                return
            i, a, b = unpack("!xiii", message)
            if i >= self.encoder.numpieces:
                self.close()
                return
            self.upload.got_cancel(i, a, b)
        elif t == PIECE:
            if len(message) <= 9:
                self.close()
                return
            n = len(message) - 9
            i, a, b = unpack("!xii%ss" % n, message)
            if i >= self.encoder.numpieces:
                self.close()
                return
            if self.download.got_piece(i, a, b):
                for co in self.encoder.complete_connections:
                    co.send_have(i)
        elif t == PORT:
            if len(message) != 3:
                self.close()
                return
            self.dht_port = unpack('!H', message[1:3])[0]
            self.encoder.got_port(self)
        else:
            self.close()

    def _sever(self):
        self.closed = True
        self._reader = None
        del self.encoder.connections[self.connection]
        self.connection = None
        self.encoder.replace_connection()
        if self.complete:
            del self.encoder.complete_connections[self]
            self.download.disconnected()
            self.encoder.choker.connection_lost(self)
            self.upload = self.download = None

    def _send_message(self, message):
        s = tobinary(len(message)) + message
        if self._partial_message is not None:
            self._outqueue.append(s)
        else:
            self.connection.write(s)

    def data_came_in(self, conn, s):
        while True:
            if self.closed:
                return
            i = self._next_len - self._buffer_len
            if i > len(s):
                self._buffer.append(s)
                self._buffer_len += len(s)
                return
            m = s[:i]
            if self._buffer_len > 0:
                self._buffer.append(m)
                m = ''.join(self._buffer)
                self._buffer = []
                self._buffer_len = 0
            s = s[i:]
            self._message = m
            try:
                self._next_len = self._reader.next()
            except StopIteration:
                self.close()
                return

    def connection_lost(self, conn):
        if self.connection is None:
            assert self.closed
        else:
            assert conn is self.connection
            self._sever()

    def connection_flushed(self, connection):
        if self.complete and self.next_upload is None and (self._partial_message is not None
                                             or (self.upload and self.upload.buffer)):
                self.encoder.ratelimiter.queue(self, self.encoder.context.rlgroup)