1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
|
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License). You may not copy or use this file, in either
# source code or executable form, except in compliance with the License. You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
# Originally written by Bram Cohen, heavily modified by Uoti Urpala
# required for python 2.2
from __future__ import generators
from binascii import b2a_hex
from struct import pack, unpack
from BitTorrent.RawServer_magic import Handler
from BitTorrent.bitfield import Bitfield
from BitTorrent.obsoletepythonsupport import *
def toint(s):
    """Decode the 4-byte string s as a big-endian signed 32-bit integer."""
    (value,) = unpack("!i", s)
    return value
def tobinary(i):
    """Encode the integer i as a 4-byte big-endian signed 32-bit string."""
    encoded = pack("!i", i)
    return encoded
# Single-character message type codes.  Each one is the first byte of a
# length-prefixed message; the payload that follows it is noted per entry.

# no payload
CHOKE = '\x00'
UNCHOKE = '\x01'
INTERESTED = '\x02'
NOT_INTERESTED = '\x03'
# payload: index
HAVE = '\x04'
# payload: bitfield
BITFIELD = '\x05'
# payload: index, begin, length
REQUEST = '\x06'
# payload: index, begin, piece data
PIECE = '\x07'
# payload: index, begin, length
CANCEL = '\x08'
# payload: 2-byte port number
PORT = '\x09'

# bit tested in the last reserved handshake byte
DHT = 1
# the 8 reserved handshake bytes we send: only the DHT bit is set
FLAGS = '\x00\x00\x00\x00\x00\x00\x00\x01'

protocol_name = 'BitTorrent protocol'
class Connection(Handler):
    """Peer-wire protocol state for a single connection.

    Incoming bytes arrive through data_came_in(), which hands the
    _read_messages() generator exactly as many bytes as it last asked for
    (via self._message).  Outgoing control messages go through
    _send_message(); piece data is written by send_partial() under a byte
    budget.  All values decoded from the wire are validated here before
    being passed on to the upload/download objects.
    """

    def __init__(self, encoder, connection, id, is_local):
        """Initialise protocol state; start the handshake if we dialled out.

        encoder    -- owning object; this class uses its download_id, my_id,
                      config, connections, complete_connections, numpieces,
                      choker, ratelimiter and context attributes
        connection -- transport object providing write(), close() and ip;
                      its handler attribute is set to this instance
        id         -- peer id we expect, or None if it is not known yet
        is_local   -- true when this side initiated the connection
        """
        self.encoder = encoder
        self.connection = connection
        self.connection.handler = self
        self.id = id
        self.ip = connection.ip
        self.locally_initiated = is_local
        self.complete = False        # set once the handshake has succeeded
        self.closed = False
        self.got_anything = False    # true after the first non-handshake message
        self.next_upload = None
        self.upload = None
        self.download = None
        self._buffer = []            # fragments of a not-yet-complete read
        self._buffer_len = 0
        self._reader = self._read_messages()
        # Prime the generator: the first value is the size of the first read.
        self._next_len = self._reader.next()
        self._partial_message = None  # unsent remainder of a PIECE message
        self._outqueue = []           # messages held back behind a partial PIECE
        self.choke_sent = True
        self.uses_dht = False
        self.dht_port = None
        if self.locally_initiated:
            connection.write(chr(len(protocol_name)) + protocol_name +
                             FLAGS + self.encoder.download_id)
            if self.id is not None:
                connection.write(self.encoder.my_id)

    def close(self):
        """Close the transport and drop all bookkeeping; no-op if closed."""
        if not self.closed:
            self.connection.close()
            self._sever()

    def send_interested(self):
        """Send an INTERESTED message."""
        self._send_message(INTERESTED)

    def send_not_interested(self):
        """Send a NOT_INTERESTED message."""
        self._send_message(NOT_INTERESTED)

    def send_choke(self):
        """Send CHOKE unless a PIECE message is partially written.

        In that case nothing is sent here; send_partial() compares
        choke_sent with upload.choked when the piece completes and emits
        the pending CHOKE/UNCHOKE itself.
        """
        if self._partial_message is None:
            self._send_message(CHOKE)
            self.choke_sent = True
            self.upload.sent_choke()

    def send_unchoke(self):
        """Send UNCHOKE unless a PIECE message is partially written.

        See send_choke() for how the deferred case is handled.
        """
        if self._partial_message is None:
            self._send_message(UNCHOKE)
            self.choke_sent = False

    def send_port(self, port):
        """Send a PORT message carrying port as an unsigned 16-bit value."""
        self._send_message(PORT + pack('!H', port))

    def send_request(self, index, begin, length):
        """Send a REQUEST for length bytes at offset begin of piece index."""
        self._send_message(pack("!ciii", REQUEST, index, begin, length))

    def send_cancel(self, index, begin, length):
        """Send a CANCEL with the same fields as the matching REQUEST."""
        self._send_message(pack("!ciii", CANCEL, index, begin, length))

    def send_bitfield(self, bitfield):
        """Send a BITFIELD message; bitfield is the already-encoded string."""
        self._send_message(BITFIELD + bitfield)

    def send_have(self, index):
        """Send a HAVE message for piece index."""
        self._send_message(pack("!ci", HAVE, index))

    def send_keepalive(self):
        """Send a zero-length message."""
        self._send_message('')

    def send_partial(self, bytes):
        """Write piece data within a budget of bytes; return bytes written.

        If no PIECE message is in progress, the next chunk is fetched from
        self.upload; 0 is returned when there is none or the connection is
        closed.  When the budget is smaller than what remains of the PIECE
        message, exactly bytes bytes are written and the rest is kept for
        the next call.  Otherwise the remainder is written together with
        any queued control messages in a single write, and the returned
        count may exceed the budget.
        """
        if self.closed:
            return 0
        if self._partial_message is None:
            s = self.upload.get_upload_chunk()
            if s is None:
                return 0
            index, begin, piece = s
            # Length prefix covers type byte + index + begin + data.
            self._partial_message = pack("!icii%ss" % len(piece),
                                         len(piece) + 9, PIECE,
                                         index, begin, piece)
        if bytes < len(self._partial_message):
            self.upload.update_rate(bytes)
            self.connection.write(buffer(self._partial_message, 0, bytes))
            self._partial_message = buffer(self._partial_message, bytes)
            return bytes

        queue = [str(self._partial_message)]
        self._partial_message = None
        # send_choke()/send_unchoke() do nothing while a piece is partially
        # written, so bring the peer up to date with the real choke state.
        if self.choke_sent != self.upload.choked:
            if self.upload.choked:
                self._outqueue.append(pack("!ic", 1, CHOKE))
                self.upload.sent_choke()
            else:
                self._outqueue.append(pack("!ic", 1, UNCHOKE))
            self.choke_sent = self.upload.choked
        queue.extend(self._outqueue)
        self._outqueue = []
        queue = ''.join(queue)
        self.upload.update_rate(len(queue))
        self.connection.write(queue)
        return len(queue)

    # yields the number of bytes it wants next, gets those in self._message
    def _read_messages(self):
        """Generator driving the handshake and then message framing.

        Returning (StopIteration in data_came_in) means the peer violated
        the protocol or is unwanted; data_came_in() then closes the
        connection.
        """
        yield 1   # header length
        if ord(self._message) != len(protocol_name):
            return

        yield len(protocol_name)
        if self._message != protocol_name:
            return

        yield 8   # reserved
        # dht is on last reserved byte
        if ord(self._message[7]) & DHT:
            self.uses_dht = True

        yield 20  # download id
        if self.encoder.download_id is None:  # incoming connection
            # modifies self.encoder if successful
            self.encoder.select_torrent(self, self._message)
            if self.encoder.download_id is None:
                return
        elif self._message != self.encoder.download_id:
            return
        if not self.locally_initiated:
            self.connection.write(chr(len(protocol_name)) + protocol_name +
                                  FLAGS + self.encoder.download_id +
                                  self.encoder.my_id)

        yield 20  # peer id
        if not self.id:
            self.id = self._message
            if self.id == self.encoder.my_id:
                # connected to ourselves
                return
            for v in self.encoder.connections.itervalues():
                if v is not self:
                    if v.id == self.id:
                        return
                    if self.encoder.config['one_connection_per_ip'] and \
                            v.ip == self.ip:
                        return
            if self.locally_initiated:
                self.connection.write(self.encoder.my_id)
            else:
                self.encoder.everinc = True
        else:
            if self._message != self.id:
                return
        self.complete = True
        self.encoder.connection_completed(self)

        while True:
            yield 4   # message length
            l = toint(self._message)
            # toint() decodes a signed value, so a wire length >= 2**31
            # shows up negative; reject it instead of treating it as a
            # keep-alive.
            if l < 0 or l > self.encoder.config['max_message_length']:
                return
            if l > 0:
                yield l
                self._got_message(self._message)

    def _valid_index(self, index):
        """Return true if 0 <= index < encoder.numpieces.

        Indices are unpacked from the wire as signed integers, so the lower
        bound has to be checked as well as the upper one.
        """
        return 0 <= index < self.encoder.numpieces

    def _got_message(self, message):
        """Validate one non-empty message and dispatch it by its type byte.

        Any malformed or unknown message closes the connection.
        """
        t = message[0]
        # BITFIELD is only accepted as the very first message.
        if t == BITFIELD and self.got_anything:
            self.close()
            return
        self.got_anything = True
        if (t in [CHOKE, UNCHOKE, INTERESTED, NOT_INTERESTED] and
                len(message) != 1):
            self.close()
            return
        if t == CHOKE:
            self.download.got_choke()
        elif t == UNCHOKE:
            self.download.got_unchoke()
        elif t == INTERESTED:
            self.upload.got_interested()
        elif t == NOT_INTERESTED:
            self.upload.got_not_interested()
        elif t == HAVE:
            if len(message) != 5:
                self.close()
                return
            # "x" skips the type byte
            i = unpack("!xi", message)[0]
            if not self._valid_index(i):
                self.close()
                return
            self.download.got_have(i)
        elif t == BITFIELD:
            try:
                b = Bitfield(self.encoder.numpieces, message[1:])
            except ValueError:
                self.close()
                return
            self.download.got_have_bitfield(b)
        elif t == REQUEST:
            if len(message) != 13:
                self.close()
                return
            i, a, b = unpack("!xiii", message)
            if not self._valid_index(i):
                self.close()
                return
            self.upload.got_request(i, a, b)
        elif t == CANCEL:
            if len(message) != 13:
                self.close()
                return
            i, a, b = unpack("!xiii", message)
            if not self._valid_index(i):
                self.close()
                return
            self.upload.got_cancel(i, a, b)
        elif t == PIECE:
            # type byte + index + begin is 9 bytes; data must be non-empty
            if len(message) <= 9:
                self.close()
                return
            n = len(message) - 9
            i, a, b = unpack("!xii%ss" % n, message)
            if not self._valid_index(i):
                self.close()
                return
            # A true result from got_piece() triggers a HAVE broadcast to
            # every completed connection.
            if self.download.got_piece(i, a, b):
                for co in self.encoder.complete_connections:
                    co.send_have(i)
        elif t == PORT:
            if len(message) != 3:
                self.close()
                return
            self.dht_port = unpack('!H', message[1:3])[0]
            self.encoder.got_port(self)
        else:
            self.close()

    def _sever(self):
        """Mark the connection closed and unregister it from the encoder."""
        self.closed = True
        self._reader = None
        del self.encoder.connections[self.connection]
        self.connection = None
        self.encoder.replace_connection()
        if self.complete:
            del self.encoder.complete_connections[self]
            self.download.disconnected()
            self.encoder.choker.connection_lost(self)
            self.upload = self.download = None

    def _send_message(self, message):
        """Length-prefix message and write it, or queue it behind a piece.

        While a PIECE message is partially written the framed message is
        appended to _outqueue so it cannot be interleaved with piece data;
        send_partial() flushes the queue.
        """
        s = tobinary(len(message)) + message
        if self._partial_message is not None:
            self._outqueue.append(s)
        else:
            self.connection.write(s)

    def data_came_in(self, conn, s):
        """Feed received data s to the reader, one requested chunk at a time.

        Data is accumulated in _buffer until _next_len bytes are available;
        the chunk is then stored in self._message and the generator is
        resumed to process it and report the size of the next chunk.
        """
        while True:
            # Processing a chunk may have closed the connection.
            if self.closed:
                return
            i = self._next_len - self._buffer_len
            if i > len(s):
                self._buffer.append(s)
                self._buffer_len += len(s)
                return
            m = s[:i]
            if self._buffer_len > 0:
                self._buffer.append(m)
                m = ''.join(self._buffer)
                self._buffer = []
                self._buffer_len = 0
            s = s[i:]
            self._message = m
            try:
                self._next_len = self._reader.next()
            except StopIteration:
                # the reader gave up on this peer
                self.close()
                return

    def connection_lost(self, conn):
        """Handle loss of the transport; tolerate a connection already severed."""
        if self.connection is None:
            assert self.closed
        else:
            assert conn is self.connection
            self._sever()

    def connection_flushed(self, connection):
        """Queue this connection with the rate limiter if it has data to send.

        Only done once the handshake is complete, while next_upload is None,
        and when either a PIECE message is in progress or self.upload has a
        non-empty buffer.
        """
        if self.complete and self.next_upload is None and \
                (self._partial_message is not None
                 or (self.upload and self.upload.buffer)):
            self.encoder.ratelimiter.queue(self, self.encoder.context.rlgroup)
|