diff options
Diffstat (limited to 'BitTorrent/Encoder.py')
-rw-r--r-- | BitTorrent/Encoder.py | 286 |
1 files changed, 286 insertions, 0 deletions
diff --git a/BitTorrent/Encoder.py b/BitTorrent/Encoder.py new file mode 100644 index 0000000..a5bd6dc --- /dev/null +++ b/BitTorrent/Encoder.py @@ -0,0 +1,286 @@ +# The contents of this file are subject to the BitTorrent Open Source License +# Version 1.1 (the License). You may not copy or use this file, in either +# source code or executable form, except in compliance with the License. You +# may obtain a copy of the License at http://www.bittorrent.com/license/. +# +# Software distributed under the License is distributed on an AS IS basis, +# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +# for the specific language governing rights and limitations under the +# License. + +# Written by Bram Cohen + +from socket import error as socketerror + +from BitTorrent import BTFailure +from BitTorrent.RawServer_magic import Handler +from BitTorrent.NatTraversal import UPNPError +from BitTorrent.Connecter import Connection +from BitTorrent.platform import is_frozen_exe +from BitTorrent.ClientIdentifier import identify_client + +# header, reserved, download id, my id, [length, message] + +class InitialConnectionHandler(Handler): + def __init__(self, parent, id): + self.parent = parent + self.id = id + self.accept = True + + def connection_started(self, s): + + del self.parent.pending_connections[(s.ip, s.port)] + + # prevents conenctions we no longer care about from being accepted + if not self.accept: + return + + con = Connection(self.parent, s, self.id, True) + self.parent.connections[s] = con + + # it might not be obvious why this is here. + # if the pending queue filled and put the remaining connections + # into the spare list, this will push more connections in to pending + self.parent.replace_connection() + + def connection_failed(self, addr, exception): + del self.parent.pending_connections[addr] +
+ if not self.accept: + # we don't need to rotate the spares with replace_connection()
+ # if the Encoder object has stopped all connections + return +
+ self.parent.replace_connection() + + +class Encoder(object): + + def __init__(self, make_upload, downloader, choker, numpieces, ratelimiter, + rawserver, config, my_id, schedulefunc, download_id, context, addcontactfunc, reported_port): + self.make_upload = make_upload + self.downloader = downloader + self.choker = choker + self.numpieces = numpieces + self.ratelimiter = ratelimiter + self.rawserver = rawserver + self.my_id = my_id + self.config = config + self.schedulefunc = schedulefunc + self.download_id = download_id + self.context = context + self.addcontact = addcontactfunc + self.reported_port = reported_port + self.everinc = False + + # submitted + self.pending_connections = {} + # transport connected + self.connections = {} + # protocol active + self.complete_connections = {} + + self.spares = {} + + self.banned = {} + schedulefunc(self.send_keepalives, config['keepalive_interval']) + + def send_keepalives(self): + self.schedulefunc(self.send_keepalives, + self.config['keepalive_interval']) + for c in self.complete_connections: + c.send_keepalive() + + # returns False if the connection has been pushed on to self.spares + # other filters and a successful connection return True + def start_connection(self, dns, id): + if dns[0] in self.banned: + return True + if id == self.my_id: + return True + for v in self.connections.values(): + if id and v.id == id: + return True + if self.config['one_connection_per_ip'] and v.ip == dns[0]: + return True + + #print "start", len(self.pending_connections), len(self.spares), len(self.connections) + + total_outstanding = len(self.connections) + # it's possible the pending connections could eventually complete, + # so we have to account for those when enforcing max_initiate + total_outstanding += len(self.pending_connections) + + if total_outstanding >= self.config['max_initiate']: + self.spares[dns] = 1 + return False + + # if these fail, I'm getting a very weird dns object + assert isinstance(dns, tuple) + assert isinstance(dns[0], str) + assert isinstance(dns[1], int) + + # looks like we connect to the same peer several times in a row. + # we should probably stop doing that, but this prevents it from crashing + if dns in self.pending_connections: + # uncomment this if you want to debug the multi-connect problem + #print "Double Add on", dns + #traceback.print_stack() + return True + + handler = InitialConnectionHandler(self, id) + self.pending_connections[dns] = handler + started = self.rawserver.async_start_connection(dns, handler, self.context) + + if not started: + del self.pending_connections[dns] + self.spares[dns] = 1 + return False + + return True + + def connection_completed(self, c): + self.complete_connections[c] = 1 + c.upload = self.make_upload(c) + c.download = self.downloader.make_download(c) + self.choker.connection_made(c) + if c.uses_dht: + c.send_port(self.reported_port) + + def got_port(self, c): + if self.addcontact and c.uses_dht and c.dht_port != None: + self.addcontact(c.connection.ip, c.dht_port) + + def ever_got_incoming(self): + return self.everinc + + def how_many_connections(self): + return len(self.complete_connections) + + def replace_connection(self): + while self.spares: + started = self.start_connection(self.spares.popitem()[0], None) + if not started: + # start_connection decided to push this connection back on to + # self.spares because a limit was hit. break now or loop forever + break + + def close_connections(self): + # drop connections which could be made after we're not interested + for c in self.pending_connections.itervalues(): + c.accept = False + + for c in self.connections.itervalues(): + if not c.closed: + c.connection.close() + c.closed = True + + def singleport_connection(self, listener, con): + if con.ip in self.banned: + return + m = self.config['max_allow_in'] + if m and len(self.connections) >= m: + return + self.connections[con.connection] = con + del listener.connections[con.connection] + con.encoder = self + con.connection.context = self.context + + def ban(self, ip): + self.banned[ip] = None + + +class SingleportListener(Handler): + + def __init__(self, rawserver, nattraverser): + self.rawserver = rawserver + self.nattraverser = nattraverser + self.port = 0 + self.ports = {} + self.port_change_notification = None + self.torrents = {} + self.connections = {} + self.download_id = None + + def _close(self, port): + serversocket = self.ports[port][0] + self.nattraverser.unregister_port(port, "TCP") + self.rawserver.stop_listening(serversocket) + serversocket.close() + + def _check_close(self, port): + if not port or self.port == port or len(self.ports[port][1]) > 0: + return + self._close(port) + del self.ports[port] + + def open_port(self, port, config): + if port in self.ports: + self.port = port + return + serversocket = self.rawserver.create_serversocket( + port, config['bind'], reuse=True, tos=config['peer_socket_tos']) + try: + d = self.nattraverser.register_port(port, port, "TCP", config['bind']) + d.addCallback(self._change_port) + except Exception, e: + # blanket, just incase - we don't want to interrupt things + # maybe we should log it, maybe not + #print "UPnP registration error", e + pass + self.rawserver.start_listening(serversocket, self) + oldport = self.port + self.port = port + self.ports[port] = [serversocket, {}] + self._check_close(oldport) + + def _change_port(self, port): + if self.port == port: + return + [serversocket, callbacks] = self.ports[self.port] + self.ports[port] = [serversocket, callbacks] + del self.ports[self.port] + self.port = port + for callback in callbacks: + if callback: + callback(port) + + def get_port(self, callback = None): + if self.port: + callbacks = self.ports[self.port][1] + if not callbacks.has_key(callback): + callbacks[callback] = 1 + else: + callbacks[callback] += 1 + return self.port + + def release_port(self, port, callback = None): + callbacks = self.ports[port][1] + callbacks[callback] -= 1 + if callbacks[callback] == 0: + del callbacks[callback] + self._check_close(port) + + def close_sockets(self): + for port in self.ports.iterkeys(): + self._close(port) + + def add_torrent(self, infohash, encoder): + if infohash in self.torrents: + raise BTFailure(_("Can't start two separate instances of the same " + "torrent")) + self.torrents[infohash] = encoder + + def remove_torrent(self, infohash): + del self.torrents[infohash] + + def select_torrent(self, conn, infohash): + if infohash in self.torrents: + self.torrents[infohash].singleport_connection(self, conn) + + def connection_made(self, connection): + con = Connection(self, connection, None, False) + self.connections[connection] = con + + def replace_connection(self): + pass |