summaryrefslogtreecommitdiffstats
path: root/BitTorrent/Downloader.py
diff options
context:
space:
mode:
Diffstat (limited to 'BitTorrent/Downloader.py')
-rw-r--r--BitTorrent/Downloader.py363
1 files changed, 363 insertions, 0 deletions
diff --git a/BitTorrent/Downloader.py b/BitTorrent/Downloader.py
new file mode 100644
index 0000000..accb332
--- /dev/null
+++ b/BitTorrent/Downloader.py
@@ -0,0 +1,363 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+# Written by Bram Cohen, Uoti Urpala
+
+from random import shuffle
+
+from BitTorrent.platform import bttime
+from BitTorrent.CurrentRateMeasure import Measure
+from BitTorrent.bitfield import Bitfield
+
+
+class PerIPStats(object):
+
+ def __init__(self):
+ self.numgood = 0
+ self.bad = {}
+ self.numconnections = 0
+ self.lastdownload = None
+ self.peerid = None
+
+
+class BadDataGuard(object):
+
+ def __init__(self, download):
+ self.download = download
+ self.ip = download.connection.ip
+ self.downloader = download.downloader
+ self.stats = self.downloader.perip[self.ip]
+ self.lastindex = None
+
+ def bad(self, index, bump = False):
+ self.stats.bad.setdefault(index, 0)
+ self.stats.bad[index] += 1
+ if self.ip not in self.downloader.bad_peers:
+ self.downloader.bad_peers[self.ip] = (False, self.stats)
+ if self.download is not None:
+ self.downloader.kick(self.download)
+ self.download = None
+ elif len(self.stats.bad) > 1 and self.stats.numconnections == 1 and \
+ self.stats.lastdownload is not None:
+ # kick new connection from same IP if previous one sent bad data,
+ # mainly to give the algorithm time to find other bad pieces
+ # in case the peer is sending a lot of bad data
+ self.downloader.kick(self.stats.lastdownload)
+ if len(self.stats.bad) >= 3 and len(self.stats.bad) > \
+ self.stats.numgood // 30:
+ self.downloader.ban(self.ip)
+ elif bump:
+ self.downloader.picker.bump(index)
+
+ def good(self, index):
+ # lastindex is a hack to only increase numgood for by one for each good
+ # piece, however many chunks came from the connection(s) from this IP
+ if index != self.lastindex:
+ self.stats.numgood += 1
+ self.lastindex = index
+
+
+class SingleDownload(object):
+
+ def __init__(self, downloader, connection):
+ self.downloader = downloader
+ self.connection = connection
+ self.choked = True
+ self.interested = False
+ self.active_requests = []
+ self.measure = Measure(downloader.config['max_rate_period'])
+ self.peermeasure = Measure(max(downloader.storage.piece_size / 10000,
+ 20))
+ self.have = Bitfield(downloader.numpieces)
+ self.last = 0
+ self.example_interest = None
+ self.backlog = 2
+ self.guard = BadDataGuard(self)
+
+ def _backlog(self):
+ backlog = 2 + int(4 * self.measure.get_rate() /
+ self.downloader.chunksize)
+ if backlog > 50:
+ backlog = max(50, int(.075 * backlog))
+ self.backlog = backlog
+ return backlog
+
+ def disconnected(self):
+ self.downloader.lost_peer(self)
+ for i in xrange(len(self.have)):
+ if self.have[i]:
+ self.downloader.picker.lost_have(i)
+ self._letgo()
+ self.guard.download = None
+
+ def _letgo(self):
+ if not self.active_requests:
+ return
+ if self.downloader.storage.endgame:
+ self.active_requests = []
+ return
+ lost = []
+ for index, begin, length in self.active_requests:
+ self.downloader.storage.request_lost(index, begin, length)
+ if index not in lost:
+ lost.append(index)
+ self.active_requests = []
+ ds = [d for d in self.downloader.downloads if not d.choked]
+ shuffle(ds)
+ for d in ds:
+ d._request_more(lost)
+ for d in self.downloader.downloads:
+ if d.choked and not d.interested:
+ for l in lost:
+ if d.have[l] and self.downloader.storage.do_I_have_requests(l):
+ d.interested = True
+ d.connection.send_interested()
+ break
+
+ def got_choke(self):
+ if not self.choked:
+ self.choked = True
+ self._letgo()
+
+ def got_unchoke(self):
+ if self.choked:
+ self.choked = False
+ if self.interested:
+ self._request_more()
+
+ def got_piece(self, index, begin, piece):
+ try:
+ self.active_requests.remove((index, begin, len(piece)))
+ except ValueError:
+ self.downloader.discarded_bytes += len(piece)
+ return False
+ if self.downloader.storage.endgame:
+ self.downloader.all_requests.remove((index, begin, len(piece)))
+ self.last = bttime()
+ self.measure.update_rate(len(piece))
+ self.downloader.measurefunc(len(piece))
+ self.downloader.downmeasure.update_rate(len(piece))
+ if not self.downloader.storage.piece_came_in(index, begin, piece,
+ self.guard):
+ if self.downloader.storage.endgame:
+ while self.downloader.storage.do_I_have_requests(index):
+ nb, nl = self.downloader.storage.new_request(index)
+ self.downloader.all_requests.append((index, nb, nl))
+ for d in self.downloader.downloads:
+ d.fix_download_endgame()
+ return False
+ ds = [d for d in self.downloader.downloads if not d.choked]
+ shuffle(ds)
+ for d in ds:
+ d._request_more([index])
+ return False
+ if self.downloader.storage.do_I_have(index):
+ self.downloader.picker.complete(index)
+ if self.downloader.storage.endgame:
+ for d in self.downloader.downloads:
+ if d is not self and d.interested:
+ if d.choked:
+ d.fix_download_endgame()
+ else:
+ try:
+ d.active_requests.remove((index, begin, len(piece)))
+ except ValueError:
+ continue
+ d.connection.send_cancel(index, begin, len(piece))
+ d.fix_download_endgame()
+ self._request_more()
+ if self.downloader.picker.am_I_complete():
+ for d in [i for i in self.downloader.downloads if i.have.numfalse == 0]:
+ d.connection.close()
+ return self.downloader.storage.do_I_have(index)
+
+ def _want(self, index):
+ return self.have[index] and self.downloader.storage.do_I_have_requests(index)
+
+ def _request_more(self, indices = None):
+ assert not self.choked
+ if len(self.active_requests) >= self._backlog():
+ return
+ if self.downloader.storage.endgame:
+ self.fix_download_endgame()
+ return
+ lost_interests = []
+ while len(self.active_requests) < self.backlog:
+ if indices is None:
+ interest = self.downloader.picker.next(self._want, self.have.numfalse == 0)
+ else:
+ interest = None
+ for i in indices:
+ if self.have[i] and self.downloader.storage.do_I_have_requests(i):
+ interest = i
+ break
+ if interest is None:
+ break
+ if not self.interested:
+ self.interested = True
+ self.connection.send_interested()
+ self.example_interest = interest
+ self.downloader.picker.requested(interest, self.have.numfalse == 0)
+ while len(self.active_requests) < (self.backlog-2) * 5 + 2:
+ begin, length = self.downloader.storage.new_request(interest)
+ self.active_requests.append((interest, begin, length))
+ self.connection.send_request(interest, begin, length)
+ if not self.downloader.storage.do_I_have_requests(interest):
+ lost_interests.append(interest)
+ break
+ if not self.active_requests and self.interested:
+ self.interested = False
+ self.connection.send_not_interested()
+ if lost_interests:
+ for d in self.downloader.downloads:
+ if d.active_requests or not d.interested:
+ continue
+ if d.example_interest is not None and self.downloader.storage.do_I_have_requests(d.example_interest):
+ continue
+ for lost in lost_interests:
+ if d.have[lost]:
+ break
+ else:
+ continue
+ interest = self.downloader.picker.next(d._want, d.have.numfalse == 0)
+ if interest is None:
+ d.interested = False
+ d.connection.send_not_interested()
+ else:
+ d.example_interest = interest
+ if self.downloader.storage.endgame:
+ self.downloader.all_requests = []
+ for d in self.downloader.downloads:
+ self.downloader.all_requests.extend(d.active_requests)
+ for d in self.downloader.downloads:
+ d.fix_download_endgame()
+
+ def fix_download_endgame(self):
+ want = [a for a in self.downloader.all_requests if self.have[a[0]] and a not in self.active_requests]
+ if self.interested and not self.active_requests and not want:
+ self.interested = False
+ self.connection.send_not_interested()
+ return
+ if not self.interested and want:
+ self.interested = True
+ self.connection.send_interested()
+ if self.choked or len(self.active_requests) >= self._backlog():
+ return
+ shuffle(want)
+ del want[self.backlog - len(self.active_requests):]
+ self.active_requests.extend(want)
+ for piece, begin, length in want:
+ self.connection.send_request(piece, begin, length)
+
+ def got_have(self, index):
+ if self.have[index]:
+ return
+ if index == self.downloader.numpieces-1:
+ self.peermeasure.update_rate(self.downloader.storage.total_length-
+ (self.downloader.numpieces-1)*self.downloader.storage.piece_size)
+ else:
+ self.peermeasure.update_rate(self.downloader.storage.piece_size)
+ self.have[index] = True
+ self.downloader.picker.got_have(index)
+ if self.downloader.picker.am_I_complete() and self.have.numfalse == 0:
+ self.connection.close()
+ return
+ if self.downloader.storage.endgame:
+ self.fix_download_endgame()
+ elif self.downloader.storage.do_I_have_requests(index):
+ if not self.choked:
+ self._request_more([index])
+ else:
+ if not self.interested:
+ self.interested = True
+ self.connection.send_interested()
+
+ def got_have_bitfield(self, have):
+ if self.downloader.picker.am_I_complete() and have.numfalse == 0:
+ self.connection.close()
+ return
+ self.have = have
+ for i in xrange(len(self.have)):
+ if self.have[i]:
+ self.downloader.picker.got_have(i)
+ if self.downloader.storage.endgame:
+ for piece, begin, length in self.downloader.all_requests:
+ if self.have[piece]:
+ self.interested = True
+ self.connection.send_interested()
+ return
+ for i in xrange(len(self.have)):
+ if self.have[i] and self.downloader.storage.do_I_have_requests(i):
+ self.interested = True
+ self.connection.send_interested()
+ return
+
+ def get_rate(self):
+ return self.measure.get_rate()
+
+ def is_snubbed(self):
+ return bttime() - self.last > self.downloader.snub_time
+
+
+class Downloader(object):
+
+ def __init__(self, config, storage, picker, numpieces, downmeasure,
+ measurefunc, kickfunc, banfunc):
+ self.config = config
+ self.storage = storage
+ self.picker = picker
+ self.chunksize = config['download_slice_size']
+ self.downmeasure = downmeasure
+ self.numpieces = numpieces
+ self.snub_time = config['snub_time']
+ self.measurefunc = measurefunc
+ self.kickfunc = kickfunc
+ self.banfunc = banfunc
+ self.downloads = []
+ self.perip = {}
+ self.bad_peers = {}
+ self.discarded_bytes = 0
+
+ def make_download(self, connection):
+ ip = connection.ip
+ perip = self.perip.get(ip)
+ if perip is None:
+ perip = PerIPStats()
+ self.perip[ip] = perip
+ perip.numconnections += 1
+ d = SingleDownload(self, connection)
+ perip.lastdownload = d
+ perip.peerid = connection.id
+ self.downloads.append(d)
+ return d
+
+ def lost_peer(self, download):
+ self.downloads.remove(download)
+ ip = download.connection.ip
+ self.perip[ip].numconnections -= 1
+ if self.perip[ip].lastdownload == download:
+ self.perip[ip].lastdownload = None
+
+ def kick(self, download):
+ if not self.config['retaliate_to_garbled_data']:
+ return
+ ip = download.connection.ip
+ peerid = download.connection.id
+ # kickfunc will schedule connection.close() to be executed later; we
+ # might now be inside RawServer event loop with events from that
+ # connection already queued, and trying to handle them after doing
+ # close() now could cause problems.
+ self.kickfunc(download.connection)
+
+ def ban(self, ip):
+ if not self.config['retaliate_to_garbled_data']:
+ return
+ self.banfunc(ip)
+ self.bad_peers[ip] = (True, self.perip[ip])