diff options
Diffstat (limited to 'BitTorrent/Downloader.py')
-rw-r--r-- | BitTorrent/Downloader.py | 363 |
1 files changed, 363 insertions, 0 deletions
diff --git a/BitTorrent/Downloader.py b/BitTorrent/Downloader.py new file mode 100644 index 0000000..accb332 --- /dev/null +++ b/BitTorrent/Downloader.py @@ -0,0 +1,363 @@ +# The contents of this file are subject to the BitTorrent Open Source License +# Version 1.1 (the License). You may not copy or use this file, in either +# source code or executable form, except in compliance with the License. You +# may obtain a copy of the License at http://www.bittorrent.com/license/. +# +# Software distributed under the License is distributed on an AS IS basis, +# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +# for the specific language governing rights and limitations under the +# License. + +# Written by Bram Cohen, Uoti Urpala + +from random import shuffle + +from BitTorrent.platform import bttime +from BitTorrent.CurrentRateMeasure import Measure +from BitTorrent.bitfield import Bitfield + + +class PerIPStats(object): + + def __init__(self): + self.numgood = 0 + self.bad = {} + self.numconnections = 0 + self.lastdownload = None + self.peerid = None + + +class BadDataGuard(object): + + def __init__(self, download): + self.download = download + self.ip = download.connection.ip + self.downloader = download.downloader + self.stats = self.downloader.perip[self.ip] + self.lastindex = None + + def bad(self, index, bump = False): + self.stats.bad.setdefault(index, 0) + self.stats.bad[index] += 1 + if self.ip not in self.downloader.bad_peers: + self.downloader.bad_peers[self.ip] = (False, self.stats) + if self.download is not None: + self.downloader.kick(self.download) + self.download = None + elif len(self.stats.bad) > 1 and self.stats.numconnections == 1 and \ + self.stats.lastdownload is not None: + # kick new connection from same IP if previous one sent bad data, + # mainly to give the algorithm time to find other bad pieces + # in case the peer is sending a lot of bad data + self.downloader.kick(self.stats.lastdownload) + if len(self.stats.bad) >= 3 and len(self.stats.bad) > \ + self.stats.numgood // 30: + self.downloader.ban(self.ip) + elif bump: + self.downloader.picker.bump(index) + + def good(self, index): + # lastindex is a hack to only increase numgood for by one for each good + # piece, however many chunks came from the connection(s) from this IP + if index != self.lastindex: + self.stats.numgood += 1 + self.lastindex = index + + +class SingleDownload(object): + + def __init__(self, downloader, connection): + self.downloader = downloader + self.connection = connection + self.choked = True + self.interested = False + self.active_requests = [] + self.measure = Measure(downloader.config['max_rate_period']) + self.peermeasure = Measure(max(downloader.storage.piece_size / 10000, + 20)) + self.have = Bitfield(downloader.numpieces) + self.last = 0 + self.example_interest = None + self.backlog = 2 + self.guard = BadDataGuard(self) + + def _backlog(self): + backlog = 2 + int(4 * self.measure.get_rate() / + self.downloader.chunksize) + if backlog > 50: + backlog = max(50, int(.075 * backlog)) + self.backlog = backlog + return backlog + + def disconnected(self): + self.downloader.lost_peer(self) + for i in xrange(len(self.have)): + if self.have[i]: + self.downloader.picker.lost_have(i) + self._letgo() + self.guard.download = None + + def _letgo(self): + if not self.active_requests: + return + if self.downloader.storage.endgame: + self.active_requests = [] + return + lost = [] + for index, begin, length in self.active_requests: + self.downloader.storage.request_lost(index, begin, length) + if index not in lost: + lost.append(index) + self.active_requests = [] + ds = [d for d in self.downloader.downloads if not d.choked] + shuffle(ds) + for d in ds: + d._request_more(lost) + for d in self.downloader.downloads: + if d.choked and not d.interested: + for l in lost: + if d.have[l] and self.downloader.storage.do_I_have_requests(l): + d.interested = True + d.connection.send_interested() + break + + def got_choke(self): + if not self.choked: + self.choked = True + self._letgo() + + def got_unchoke(self): + if self.choked: + self.choked = False + if self.interested: + self._request_more() + + def got_piece(self, index, begin, piece): + try: + self.active_requests.remove((index, begin, len(piece))) + except ValueError: + self.downloader.discarded_bytes += len(piece) + return False + if self.downloader.storage.endgame: + self.downloader.all_requests.remove((index, begin, len(piece))) + self.last = bttime() + self.measure.update_rate(len(piece)) + self.downloader.measurefunc(len(piece)) + self.downloader.downmeasure.update_rate(len(piece)) + if not self.downloader.storage.piece_came_in(index, begin, piece, + self.guard): + if self.downloader.storage.endgame: + while self.downloader.storage.do_I_have_requests(index): + nb, nl = self.downloader.storage.new_request(index) + self.downloader.all_requests.append((index, nb, nl)) + for d in self.downloader.downloads: + d.fix_download_endgame() + return False + ds = [d for d in self.downloader.downloads if not d.choked] + shuffle(ds) + for d in ds: + d._request_more([index]) + return False + if self.downloader.storage.do_I_have(index): + self.downloader.picker.complete(index) + if self.downloader.storage.endgame: + for d in self.downloader.downloads: + if d is not self and d.interested: + if d.choked: + d.fix_download_endgame() + else: + try: + d.active_requests.remove((index, begin, len(piece))) + except ValueError: + continue + d.connection.send_cancel(index, begin, len(piece)) + d.fix_download_endgame() + self._request_more() + if self.downloader.picker.am_I_complete(): + for d in [i for i in self.downloader.downloads if i.have.numfalse == 0]: + d.connection.close() + return self.downloader.storage.do_I_have(index) + + def _want(self, index): + return self.have[index] and self.downloader.storage.do_I_have_requests(index) + + def _request_more(self, indices = None): + assert not self.choked + if len(self.active_requests) >= self._backlog(): + return + if self.downloader.storage.endgame: + self.fix_download_endgame() + return + lost_interests = [] + while len(self.active_requests) < self.backlog: + if indices is None: + interest = self.downloader.picker.next(self._want, self.have.numfalse == 0) + else: + interest = None + for i in indices: + if self.have[i] and self.downloader.storage.do_I_have_requests(i): + interest = i + break + if interest is None: + break + if not self.interested: + self.interested = True + self.connection.send_interested() + self.example_interest = interest + self.downloader.picker.requested(interest, self.have.numfalse == 0) + while len(self.active_requests) < (self.backlog-2) * 5 + 2: + begin, length = self.downloader.storage.new_request(interest) + self.active_requests.append((interest, begin, length)) + self.connection.send_request(interest, begin, length) + if not self.downloader.storage.do_I_have_requests(interest): + lost_interests.append(interest) + break + if not self.active_requests and self.interested: + self.interested = False + self.connection.send_not_interested() + if lost_interests: + for d in self.downloader.downloads: + if d.active_requests or not d.interested: + continue + if d.example_interest is not None and self.downloader.storage.do_I_have_requests(d.example_interest): + continue + for lost in lost_interests: + if d.have[lost]: + break + else: + continue + interest = self.downloader.picker.next(d._want, d.have.numfalse == 0) + if interest is None: + d.interested = False + d.connection.send_not_interested() + else: + d.example_interest = interest + if self.downloader.storage.endgame: + self.downloader.all_requests = [] + for d in self.downloader.downloads: + self.downloader.all_requests.extend(d.active_requests) + for d in self.downloader.downloads: + d.fix_download_endgame() + + def fix_download_endgame(self): + want = [a for a in self.downloader.all_requests if self.have[a[0]] and a not in self.active_requests] + if self.interested and not self.active_requests and not want: + self.interested = False + self.connection.send_not_interested() + return + if not self.interested and want: + self.interested = True + self.connection.send_interested() + if self.choked or len(self.active_requests) >= self._backlog(): + return + shuffle(want) + del want[self.backlog - len(self.active_requests):] + self.active_requests.extend(want) + for piece, begin, length in want: + self.connection.send_request(piece, begin, length) + + def got_have(self, index): + if self.have[index]: + return + if index == self.downloader.numpieces-1: + self.peermeasure.update_rate(self.downloader.storage.total_length- + (self.downloader.numpieces-1)*self.downloader.storage.piece_size) + else: + self.peermeasure.update_rate(self.downloader.storage.piece_size) + self.have[index] = True + self.downloader.picker.got_have(index) + if self.downloader.picker.am_I_complete() and self.have.numfalse == 0: + self.connection.close() + return + if self.downloader.storage.endgame: + self.fix_download_endgame() + elif self.downloader.storage.do_I_have_requests(index): + if not self.choked: + self._request_more([index]) + else: + if not self.interested: + self.interested = True + self.connection.send_interested() + + def got_have_bitfield(self, have): + if self.downloader.picker.am_I_complete() and have.numfalse == 0: + self.connection.close() + return + self.have = have + for i in xrange(len(self.have)): + if self.have[i]: + self.downloader.picker.got_have(i) + if self.downloader.storage.endgame: + for piece, begin, length in self.downloader.all_requests: + if self.have[piece]: + self.interested = True + self.connection.send_interested() + return + for i in xrange(len(self.have)): + if self.have[i] and self.downloader.storage.do_I_have_requests(i): + self.interested = True + self.connection.send_interested() + return + + def get_rate(self): + return self.measure.get_rate() + + def is_snubbed(self): + return bttime() - self.last > self.downloader.snub_time + + +class Downloader(object): + + def __init__(self, config, storage, picker, numpieces, downmeasure, + measurefunc, kickfunc, banfunc): + self.config = config + self.storage = storage + self.picker = picker + self.chunksize = config['download_slice_size'] + self.downmeasure = downmeasure + self.numpieces = numpieces + self.snub_time = config['snub_time'] + self.measurefunc = measurefunc + self.kickfunc = kickfunc + self.banfunc = banfunc + self.downloads = [] + self.perip = {} + self.bad_peers = {} + self.discarded_bytes = 0 + + def make_download(self, connection): + ip = connection.ip + perip = self.perip.get(ip) + if perip is None: + perip = PerIPStats() + self.perip[ip] = perip + perip.numconnections += 1 + d = SingleDownload(self, connection) + perip.lastdownload = d + perip.peerid = connection.id + self.downloads.append(d) + return d + + def lost_peer(self, download): + self.downloads.remove(download) + ip = download.connection.ip + self.perip[ip].numconnections -= 1 + if self.perip[ip].lastdownload == download: + self.perip[ip].lastdownload = None + + def kick(self, download): + if not self.config['retaliate_to_garbled_data']: + return + ip = download.connection.ip + peerid = download.connection.id + # kickfunc will schedule connection.close() to be executed later; we + # might now be inside RawServer event loop with events from that + # connection already queued, and trying to handle them after doing + # close() now could cause problems. + self.kickfunc(download.connection) + + def ban(self, ip): + if not self.config['retaliate_to_garbled_data']: + return + self.banfunc(ip) + self.bad_peers[ip] = (True, self.perip[ip]) |