summaryrefslogtreecommitdiffstats
path: root/BitTorrent/Encoder.py
diff options
context:
space:
mode:
Diffstat (limited to 'BitTorrent/Encoder.py')
-rw-r--r--BitTorrent/Encoder.py286
1 files changed, 286 insertions, 0 deletions
diff --git a/BitTorrent/Encoder.py b/BitTorrent/Encoder.py
new file mode 100644
index 0000000..a5bd6dc
--- /dev/null
+++ b/BitTorrent/Encoder.py
@@ -0,0 +1,286 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+# Written by Bram Cohen
+
+from socket import error as socketerror
+
+from BitTorrent import BTFailure
+from BitTorrent.RawServer_magic import Handler
+from BitTorrent.NatTraversal import UPNPError
+from BitTorrent.Connecter import Connection
+from BitTorrent.platform import is_frozen_exe
+from BitTorrent.ClientIdentifier import identify_client
+
+# header, reserved, download id, my id, [length, message]
+
+class InitialConnectionHandler(Handler):
+ def __init__(self, parent, id):
+ self.parent = parent
+ self.id = id
+ self.accept = True
+
+ def connection_started(self, s):
+
+ del self.parent.pending_connections[(s.ip, s.port)]
+
+ # prevents conenctions we no longer care about from being accepted
+ if not self.accept:
+ return
+
+ con = Connection(self.parent, s, self.id, True)
+ self.parent.connections[s] = con
+
+ # it might not be obvious why this is here.
+ # if the pending queue filled and put the remaining connections
+ # into the spare list, this will push more connections in to pending
+ self.parent.replace_connection()
+
+ def connection_failed(self, addr, exception):
+ del self.parent.pending_connections[addr]
+
+ if not self.accept:
+ # we don't need to rotate the spares with replace_connection()
+ # if the Encoder object has stopped all connections
+ return
+
+ self.parent.replace_connection()
+
+
+class Encoder(object):
+
+ def __init__(self, make_upload, downloader, choker, numpieces, ratelimiter,
+ rawserver, config, my_id, schedulefunc, download_id, context, addcontactfunc, reported_port):
+ self.make_upload = make_upload
+ self.downloader = downloader
+ self.choker = choker
+ self.numpieces = numpieces
+ self.ratelimiter = ratelimiter
+ self.rawserver = rawserver
+ self.my_id = my_id
+ self.config = config
+ self.schedulefunc = schedulefunc
+ self.download_id = download_id
+ self.context = context
+ self.addcontact = addcontactfunc
+ self.reported_port = reported_port
+ self.everinc = False
+
+ # submitted
+ self.pending_connections = {}
+ # transport connected
+ self.connections = {}
+ # protocol active
+ self.complete_connections = {}
+
+ self.spares = {}
+
+ self.banned = {}
+ schedulefunc(self.send_keepalives, config['keepalive_interval'])
+
+ def send_keepalives(self):
+ self.schedulefunc(self.send_keepalives,
+ self.config['keepalive_interval'])
+ for c in self.complete_connections:
+ c.send_keepalive()
+
+ # returns False if the connection has been pushed on to self.spares
+ # other filters and a successful connection return True
+ def start_connection(self, dns, id):
+ if dns[0] in self.banned:
+ return True
+ if id == self.my_id:
+ return True
+ for v in self.connections.values():
+ if id and v.id == id:
+ return True
+ if self.config['one_connection_per_ip'] and v.ip == dns[0]:
+ return True
+
+ #print "start", len(self.pending_connections), len(self.spares), len(self.connections)
+
+ total_outstanding = len(self.connections)
+ # it's possible the pending connections could eventually complete,
+ # so we have to account for those when enforcing max_initiate
+ total_outstanding += len(self.pending_connections)
+
+ if total_outstanding >= self.config['max_initiate']:
+ self.spares[dns] = 1
+ return False
+
+ # if these fail, I'm getting a very weird dns object
+ assert isinstance(dns, tuple)
+ assert isinstance(dns[0], str)
+ assert isinstance(dns[1], int)
+
+ # looks like we connect to the same peer several times in a row.
+ # we should probably stop doing that, but this prevents it from crashing
+ if dns in self.pending_connections:
+ # uncomment this if you want to debug the multi-connect problem
+ #print "Double Add on", dns
+ #traceback.print_stack()
+ return True
+
+ handler = InitialConnectionHandler(self, id)
+ self.pending_connections[dns] = handler
+ started = self.rawserver.async_start_connection(dns, handler, self.context)
+
+ if not started:
+ del self.pending_connections[dns]
+ self.spares[dns] = 1
+ return False
+
+ return True
+
+ def connection_completed(self, c):
+ self.complete_connections[c] = 1
+ c.upload = self.make_upload(c)
+ c.download = self.downloader.make_download(c)
+ self.choker.connection_made(c)
+ if c.uses_dht:
+ c.send_port(self.reported_port)
+
+ def got_port(self, c):
+ if self.addcontact and c.uses_dht and c.dht_port != None:
+ self.addcontact(c.connection.ip, c.dht_port)
+
+ def ever_got_incoming(self):
+ return self.everinc
+
+ def how_many_connections(self):
+ return len(self.complete_connections)
+
+ def replace_connection(self):
+ while self.spares:
+ started = self.start_connection(self.spares.popitem()[0], None)
+ if not started:
+ # start_connection decided to push this connection back on to
+ # self.spares because a limit was hit. break now or loop forever
+ break
+
+ def close_connections(self):
+ # drop connections which could be made after we're not interested
+ for c in self.pending_connections.itervalues():
+ c.accept = False
+
+ for c in self.connections.itervalues():
+ if not c.closed:
+ c.connection.close()
+ c.closed = True
+
+ def singleport_connection(self, listener, con):
+ if con.ip in self.banned:
+ return
+ m = self.config['max_allow_in']
+ if m and len(self.connections) >= m:
+ return
+ self.connections[con.connection] = con
+ del listener.connections[con.connection]
+ con.encoder = self
+ con.connection.context = self.context
+
+ def ban(self, ip):
+ self.banned[ip] = None
+
+
+class SingleportListener(Handler):
+
+ def __init__(self, rawserver, nattraverser):
+ self.rawserver = rawserver
+ self.nattraverser = nattraverser
+ self.port = 0
+ self.ports = {}
+ self.port_change_notification = None
+ self.torrents = {}
+ self.connections = {}
+ self.download_id = None
+
+ def _close(self, port):
+ serversocket = self.ports[port][0]
+ self.nattraverser.unregister_port(port, "TCP")
+ self.rawserver.stop_listening(serversocket)
+ serversocket.close()
+
+ def _check_close(self, port):
+ if not port or self.port == port or len(self.ports[port][1]) > 0:
+ return
+ self._close(port)
+ del self.ports[port]
+
+ def open_port(self, port, config):
+ if port in self.ports:
+ self.port = port
+ return
+ serversocket = self.rawserver.create_serversocket(
+ port, config['bind'], reuse=True, tos=config['peer_socket_tos'])
+ try:
+ d = self.nattraverser.register_port(port, port, "TCP", config['bind'])
+ d.addCallback(self._change_port)
+ except Exception, e:
+ # blanket, just incase - we don't want to interrupt things
+ # maybe we should log it, maybe not
+ #print "UPnP registration error", e
+ pass
+ self.rawserver.start_listening(serversocket, self)
+ oldport = self.port
+ self.port = port
+ self.ports[port] = [serversocket, {}]
+ self._check_close(oldport)
+
+ def _change_port(self, port):
+ if self.port == port:
+ return
+ [serversocket, callbacks] = self.ports[self.port]
+ self.ports[port] = [serversocket, callbacks]
+ del self.ports[self.port]
+ self.port = port
+ for callback in callbacks:
+ if callback:
+ callback(port)
+
+ def get_port(self, callback = None):
+ if self.port:
+ callbacks = self.ports[self.port][1]
+ if not callbacks.has_key(callback):
+ callbacks[callback] = 1
+ else:
+ callbacks[callback] += 1
+ return self.port
+
+ def release_port(self, port, callback = None):
+ callbacks = self.ports[port][1]
+ callbacks[callback] -= 1
+ if callbacks[callback] == 0:
+ del callbacks[callback]
+ self._check_close(port)
+
+ def close_sockets(self):
+ for port in self.ports.iterkeys():
+ self._close(port)
+
+ def add_torrent(self, infohash, encoder):
+ if infohash in self.torrents:
+ raise BTFailure(_("Can't start two separate instances of the same "
+ "torrent"))
+ self.torrents[infohash] = encoder
+
+ def remove_torrent(self, infohash):
+ del self.torrents[infohash]
+
+ def select_torrent(self, conn, infohash):
+ if infohash in self.torrents:
+ self.torrents[infohash].singleport_connection(self, conn)
+
+ def connection_made(self, connection):
+ con = Connection(self, connection, None, False)
+ self.connections[connection] = con
+
+ def replace_connection(self):
+ pass