# The contents of this file are subject to the BitTorrent Open Source License # Version 1.1 (the License). You may not copy or use this file, in either # source code or executable form, except in compliance with the License. You # may obtain a copy of the License at http://www.bittorrent.com/license/. # # Software distributed under the License is distributed on an AS IS basis, # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License # for the specific language governing rights and limitations under the # License. # Written by Bram Cohen, Uoti Urpala from random import shuffle from NohGooee.platform import bttime from NohGooee.CurrentRateMeasure import Measure from NohGooee.bitfield import Bitfield class PerIPStats(object): def __init__(self): self.numgood = 0 self.bad = {} self.numconnections = 0 self.lastdownload = None self.peerid = None class BadDataGuard(object): def __init__(self, download): self.download = download self.ip = download.connection.ip self.downloader = download.downloader self.stats = self.downloader.perip[self.ip] self.lastindex = None def bad(self, index, bump = False): self.stats.bad.setdefault(index, 0) self.stats.bad[index] += 1 if self.ip not in self.downloader.bad_peers: self.downloader.bad_peers[self.ip] = (False, self.stats) if self.download is not None: self.downloader.kick(self.download) self.download = None elif len(self.stats.bad) > 1 and self.stats.numconnections == 1 and \ self.stats.lastdownload is not None: # kick new connection from same IP if previous one sent bad data, # mainly to give the algorithm time to find other bad pieces # in case the peer is sending a lot of bad data self.downloader.kick(self.stats.lastdownload) if len(self.stats.bad) >= 3 and len(self.stats.bad) > \ self.stats.numgood // 30: self.downloader.ban(self.ip) elif bump: self.downloader.picker.bump(index) def good(self, index): # lastindex is a hack to only increase numgood for by one for each good # piece, however many chunks came from the connection(s) from this IP if index != self.lastindex: self.stats.numgood += 1 self.lastindex = index class SingleDownload(object): def __init__(self, downloader, connection): self.downloader = downloader self.connection = connection self.choked = True self.interested = False self.active_requests = [] self.measure = Measure(downloader.config['max_rate_period']) self.peermeasure = Measure(max(downloader.storage.piece_size / 10000, 20)) self.have = Bitfield(downloader.numpieces) self.last = 0 self.example_interest = None self.backlog = 2 self.guard = BadDataGuard(self) def _backlog(self): backlog = 2 + int(4 * self.measure.get_rate() / self.downloader.chunksize) if backlog > 50: backlog = max(50, int(.075 * backlog)) self.backlog = backlog return backlog def disconnected(self): self.downloader.lost_peer(self) for i in xrange(len(self.have)): if self.have[i]: self.downloader.picker.lost_have(i) self._letgo() self.guard.download = None def _letgo(self): if not self.active_requests: return if self.downloader.storage.endgame: self.active_requests = [] return lost = [] for index, begin, length in self.active_requests: self.downloader.storage.request_lost(index, begin, length) if index not in lost: lost.append(index) self.active_requests = [] ds = [d for d in self.downloader.downloads if not d.choked] shuffle(ds) for d in ds: d._request_more(lost) for d in self.downloader.downloads: if d.choked and not d.interested: for l in lost: if d.have[l] and self.downloader.storage.do_I_have_requests(l): d.interested = True d.connection.send_interested() break def got_choke(self): if not self.choked: self.choked = True self._letgo() def got_unchoke(self): if self.choked: self.choked = False if self.interested: self._request_more() def got_piece(self, index, begin, piece): try: self.active_requests.remove((index, begin, len(piece))) except ValueError: self.downloader.discarded_bytes += len(piece) return False if self.downloader.storage.endgame: self.downloader.all_requests.remove((index, begin, len(piece))) self.last = bttime() self.measure.update_rate(len(piece)) self.downloader.measurefunc(len(piece)) self.downloader.downmeasure.update_rate(len(piece)) if not self.downloader.storage.piece_came_in(index, begin, piece, self.guard): if self.downloader.storage.endgame: while self.downloader.storage.do_I_have_requests(index): nb, nl = self.downloader.storage.new_request(index) self.downloader.all_requests.append((index, nb, nl)) for d in self.downloader.downloads: d.fix_download_endgame() return False ds = [d for d in self.downloader.downloads if not d.choked] shuffle(ds) for d in ds: d._request_more([index]) return False if self.downloader.storage.do_I_have(index): self.downloader.picker.complete(index) if self.downloader.storage.endgame: for d in self.downloader.downloads: if d is not self and d.interested: if d.choked: d.fix_download_endgame() else: try: d.active_requests.remove((index, begin, len(piece))) except ValueError: continue d.connection.send_cancel(index, begin, len(piece)) d.fix_download_endgame() self._request_more() if self.downloader.picker.am_I_complete(): for d in [i for i in self.downloader.downloads if i.have.numfalse == 0]: d.connection.close() return self.downloader.storage.do_I_have(index) def _want(self, index): return self.have[index] and self.downloader.storage.do_I_have_requests(index) def _request_more(self, indices = None): assert not self.choked if len(self.active_requests) >= self._backlog(): return if self.downloader.storage.endgame: self.fix_download_endgame() return lost_interests = [] while len(self.active_requests) < self.backlog: if indices is None: interest = self.downloader.picker.next(self._want, self.have.numfalse == 0) else: interest = None for i in indices: if self.have[i] and self.downloader.storage.do_I_have_requests(i): interest = i break if interest is None: break if not self.interested: self.interested = True self.connection.send_interested() self.example_interest = interest self.downloader.picker.requested(interest, self.have.numfalse == 0) while len(self.active_requests) < (self.backlog-2) * 5 + 2: begin, length = self.downloader.storage.new_request(interest) self.active_requests.append((interest, begin, length)) self.connection.send_request(interest, begin, length) if not self.downloader.storage.do_I_have_requests(interest): lost_interests.append(interest) break if not self.active_requests and self.interested: self.interested = False self.connection.send_not_interested() if lost_interests: for d in self.downloader.downloads: if d.active_requests or not d.interested: continue if d.example_interest is not None and self.downloader.storage.do_I_have_requests(d.example_interest): continue for lost in lost_interests: if d.have[lost]: break else: continue interest = self.downloader.picker.next(d._want, d.have.numfalse == 0) if interest is None: d.interested = False d.connection.send_not_interested() else: d.example_interest = interest if self.downloader.storage.endgame: self.downloader.all_requests = [] for d in self.downloader.downloads: self.downloader.all_requests.extend(d.active_requests) for d in self.downloader.downloads: d.fix_download_endgame() def fix_download_endgame(self): want = [a for a in self.downloader.all_requests if self.have[a[0]] and a not in self.active_requests] if self.interested and not self.active_requests and not want: self.interested = False self.connection.send_not_interested() return if not self.interested and want: self.interested = True self.connection.send_interested() if self.choked or len(self.active_requests) >= self._backlog(): return shuffle(want) del want[self.backlog - len(self.active_requests):] self.active_requests.extend(want) for piece, begin, length in want: self.connection.send_request(piece, begin, length) def got_have(self, index): if self.have[index]: return if index == self.downloader.numpieces-1: self.peermeasure.update_rate(self.downloader.storage.total_length- (self.downloader.numpieces-1)*self.downloader.storage.piece_size) else: self.peermeasure.update_rate(self.downloader.storage.piece_size) self.have[index] = True self.downloader.picker.got_have(index) if self.downloader.picker.am_I_complete() and self.have.numfalse == 0: self.connection.close() return if self.downloader.storage.endgame: self.fix_download_endgame() elif self.downloader.storage.do_I_have_requests(index): if not self.choked: self._request_more([index]) else: if not self.interested: self.interested = True self.connection.send_interested() def got_have_bitfield(self, have): if self.downloader.picker.am_I_complete() and have.numfalse == 0: self.connection.close() return self.have = have for i in xrange(len(self.have)): if self.have[i]: self.downloader.picker.got_have(i) if self.downloader.storage.endgame: for piece, begin, length in self.downloader.all_requests: if self.have[piece]: self.interested = True self.connection.send_interested() return for i in xrange(len(self.have)): if self.have[i] and self.downloader.storage.do_I_have_requests(i): self.interested = True self.connection.send_interested() return def get_rate(self): return self.measure.get_rate() def is_snubbed(self): return bttime() - self.last > self.downloader.snub_time class Downloader(object): def __init__(self, config, storage, picker, numpieces, downmeasure, measurefunc, kickfunc, banfunc): self.config = config self.storage = storage self.picker = picker self.chunksize = config['download_slice_size'] self.downmeasure = downmeasure self.numpieces = numpieces self.snub_time = config['snub_time'] self.measurefunc = measurefunc self.kickfunc = kickfunc self.banfunc = banfunc self.downloads = [] self.perip = {} self.bad_peers = {} self.discarded_bytes = 0 def make_download(self, connection): ip = connection.ip perip = self.perip.get(ip) if perip is None: perip = PerIPStats() self.perip[ip] = perip perip.numconnections += 1 d = SingleDownload(self, connection) perip.lastdownload = d perip.peerid = connection.id self.downloads.append(d) return d def lost_peer(self, download): self.downloads.remove(download) ip = download.connection.ip self.perip[ip].numconnections -= 1 if self.perip[ip].lastdownload == download: self.perip[ip].lastdownload = None def kick(self, download): if not self.config['retaliate_to_garbled_data']: return ip = download.connection.ip peerid = download.connection.id # kickfunc will schedule connection.close() to be executed later; we # might now be inside RawServer event loop with events from that # connection already queued, and trying to handle them after doing # close() now could cause problems. self.kickfunc(download.connection) def ban(self, ip): if not self.config['retaliate_to_garbled_data']: return self.banfunc(ip) self.bad_peers[ip] = (True, self.perip[ip])