author    Jeffrey C. Ollie <jeff@ocjtech.us>  2008-08-19 18:26:28 -0500
committer Jeffrey C. Ollie <jeff@ocjtech.us>  2008-08-19 18:26:28 -0500
commit    ad0fe53806ab5da2ead4a790af22f47e4ea2e713 (patch)
tree      df3fc3051f7c8d4255215bdc4a37e0c430221cad
parent    217899d53991ce8ba4e8c534a71742d2f3117df6 (diff)
Get rid of HTTPHandler and RawServer and use the Twisted framework directly.
-rw-r--r--  BitTorrent/HTTPHandler.py        188
-rw-r--r--  BitTorrent/NatCheck.py           175
-rw-r--r--  BitTorrent/RawServer.py          501
-rw-r--r--  BitTorrent/RawServer_magic.py     63
-rw-r--r--  BitTorrent/RawServer_twisted.py  771
-rw-r--r--  BitTorrent/__init__.py            45
-rw-r--r--  BitTorrent/parseargs.py            7
-rw-r--r--  BitTorrent/track.py              607
-rwxr-xr-x  bittorrent-tracker.py             26
9 files changed, 435 insertions, 1948 deletions
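
The shape of the change is visible in the track.py hunks below: instead of constructing a RawServer, attaching an HTTPHandler, and looping in listen_forever(), the tracker now schedules its periodic jobs with reactor.callLater and lets the Twisted reactor drive the event loop. A minimal sketch of that pattern, with placeholder names (save_state, SAVE_INTERVAL) that are not from this commit:

# Sketch only: periodic work that RawServer.add_task used to schedule is
# re-registered with reactor.callLater, and reactor.run() takes the place
# of RawServer.listen_forever().
from twisted.internet import reactor

SAVE_INTERVAL = 300  # seconds between state saves (placeholder value)

def save_state():
    # ... persist tracker state here ...
    reactor.callLater(SAVE_INTERVAL, save_state)  # re-schedule itself, as add_task callers did

reactor.callLater(SAVE_INTERVAL, save_state)
reactor.run()  # hands the event loop to Twisted
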
diff --git a/BitTorrent/HTTPHandler.py b/BitTorrent/HTTPHandler.py
deleted file mode 100644
index d147790..0000000
--- a/BitTorrent/HTTPHandler.py
+++ /dev/null
@@ -1,188 +0,0 @@
-# The contents of this file are subject to the BitTorrent Open Source License
-# Version 1.1 (the License). You may not copy or use this file, in either
-# source code or executable form, except in compliance with the License. You
-# may obtain a copy of the License at http://www.bittorrent.com/license/.
-#
-# Software distributed under the License is distributed on an AS IS basis,
-# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
-# for the specific language governing rights and limitations under the
-# License.
-
-# Written by Bram Cohen
-
-from RawServer_magic import Handler
-from cStringIO import StringIO
-from sys import stdout
-import time
-from gzip import GzipFile
-
-DEBUG = False
-
-weekdays = [_("Mon"), _("Tue"), _("Wed"), _("Thu"), _("Fri"), _("Sat"), _("Sun")]
-
-months = [None, _("Jan"), _("Feb"), _("Mar"), _("Apr"), _("May"), _("Jun"),
- _("Jul"), _("Aug"), _("Sep"), _("Oct"), _("Nov"), _("Dec")]
-
-
-class HTTPConnection(object):
-
- def __init__(self, handler, connection):
- self.handler = handler
- self.connection = connection
- self.buf = ''
- self.closed = False
- self.done = False
- self.donereading = False
- self.next_func = self.read_type
-
- def get_ip(self):
- return self.connection.ip
-
- def data_came_in(self, data):
- if self.donereading or self.next_func is None:
- return True
- self.buf += data
- while True:
- try:
- i = self.buf.index('\n')
- except ValueError:
- return True
- val = self.buf[:i]
- self.buf = self.buf[i+1:]
- self.next_func = self.next_func(val)
- if self.donereading:
- return True
- if self.next_func is None or self.closed:
- return False
-
- def read_type(self, data):
- self.header = data.strip()
- words = data.split()
- if len(words) == 3:
- self.command, self.path, garbage = words
- self.pre1 = False
- elif len(words) == 2:
- self.command, self.path = words
- self.pre1 = True
- if self.command != 'GET':
- return None
- else:
- return None
- if self.command not in ('HEAD', 'GET'):
- return None
- self.headers = {}
- return self.read_header
-
- def read_header(self, data):
- data = data.strip()
- if data == '':
- self.donereading = True
- # check for Accept-Encoding: header, pick a
- if self.headers.has_key('accept-encoding'):
- ae = self.headers['accept-encoding']
- if DEBUG:
- print "Got Accept-Encoding: " + ae + "\n"
- else:
- #identity assumed if no header
- ae = 'identity'
- # this eventually needs to support multple acceptable types
- # q-values and all that fancy HTTP crap
- # for now assume we're only communicating with our own client
- if ae.find('gzip') != -1:
- self.encoding = 'gzip'
- else:
- #default to identity.
- self.encoding = 'identity'
- r = self.handler.getfunc(self, self.path, self.headers)
- if r is not None:
- self.answer(r)
- return None
- try:
- i = data.index(':')
- except ValueError:
- return None
- self.headers[data[:i].strip().lower()] = data[i+1:].strip()
- if DEBUG:
- print data[:i].strip() + ": " + data[i+1:].strip()
- return self.read_header
-
- def answer(self, (responsecode, responsestring, headers, data)):
- if self.closed:
- return
- if self.encoding == 'gzip':
- #transform data using gzip compression
- #this is nasty but i'm unsure of a better way at the moment
- compressed = StringIO()
- gz = GzipFile(fileobj = compressed, mode = 'wb', compresslevel = 9)
- gz.write(data)
- gz.close()
- compressed.seek(0,0)
- cdata = compressed.read()
- compressed.close()
- if len(cdata) >= len(data):
- self.encoding = 'identity'
- else:
- if DEBUG:
- print _("Compressed: %i Uncompressed: %i\n") % (len(cdata),len(data))
- data = cdata
- headers['Content-Encoding'] = 'gzip'
-
- # i'm abusing the identd field here, but this should be ok
- if self.encoding == 'identity':
- ident = '-'
- else:
- ident = self.encoding
- username = '-'
- referer = self.headers.get('referer','-')
- useragent = self.headers.get('user-agent','-')
- year, month, day, hour, minute, second, a, b, c = time.localtime(time.time())
- print '%s %s %s [%02d/%3s/%04d:%02d:%02d:%02d] "%s" %i %i "%s" "%s"' % (
- self.connection.ip, ident, username, day, months[month], year, hour,
- minute, second, self.header, responsecode, len(data), referer, useragent)
- t = time.time()
- if t - self.handler.lastflush > self.handler.minflush:
- self.handler.lastflush = t
- stdout.flush()
-
- self.done = True
- r = StringIO()
- r.write('HTTP/1.0 ' + str(responsecode) + ' ' +
- responsestring + '\r\n')
- if not self.pre1:
- headers['Content-Length'] = len(data)
- for key, value in headers.items():
- r.write(key + ': ' + str(value) + '\r\n')
- r.write('\r\n')
- if self.command != 'HEAD':
- r.write(data)
- self.connection.write(r.getvalue())
- if self.connection.is_flushed():
- self.connection.shutdown(1)
-
-
-class HTTPHandler(Handler):
-
- def __init__(self, getfunc, minflush):
- self.connections = {}
- self.getfunc = getfunc
- self.minflush = minflush
- self.lastflush = time.time()
-
- def connection_made(self, connection):
- self.connections[connection] = HTTPConnection(self, connection)
-
- def connection_flushed(self, connection):
- if self.connections[connection].done:
- connection.shutdown(1)
-
- def connection_lost(self, connection):
- ec = self.connections[connection]
- ec.closed = True
- del ec.connection
- del ec.next_func
- del self.connections[connection]
-
- def data_came_in(self, connection, data):
- c = self.connections[connection]
- if not c.data_came_in(data) and not c.closed:
- c.connection.shutdown(1)
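
The deleted HTTPHandler parsed GET/HEAD requests, negotiated gzip encoding, and wrote an access log by hand on top of RawServer. Under Twisted that role falls to twisted.web, which the track.py hunks below import (server, Resource). A hedged sketch of the replacement pattern; TrackerResource and its response body are illustrative, not code from this commit:

from twisted.web import server
from twisted.web.resource import Resource
from twisted.internet import reactor

class TrackerResource(Resource):
    isLeaf = True  # answer every path here rather than walking child resources

    def render_GET(self, request):
        request.setHeader('Content-Type', 'text/plain')
        # a real tracker would bencode its announce response here
        return 'd8:intervali1800ee'

reactor.listenTCP(8080, server.Site(TrackerResource()))
reactor.run()
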
diff --git a/BitTorrent/NatCheck.py b/BitTorrent/NatCheck.py
index 6363ded..fe59a15 100644
--- a/BitTorrent/NatCheck.py
+++ b/BitTorrent/NatCheck.py
@@ -17,85 +17,130 @@ protocol_name = 'BitTorrent protocol'
# header, reserved, download id, my id, [length, message]
+from twisted.internet.protocol import Protocol, ClientFactory
+from twisted.internet import reactor
+from twisted.python import log
class NatCheck(object):
- def __init__(self, resultfunc, downloadid, peerid, ip, port, rawserver):
+ def __init__(self, resultfunc, downloadid, peerid, ip, port):
self.resultfunc = resultfunc
self.downloadid = downloadid
self.peerid = peerid
self.ip = ip
self.port = port
- self.closed = False
- self.buffer = StringIO()
- self.next_len = 1
- self.next_func = self.read_header_len
- rawserver.async_start_connection((ip, port), self)
+ self.answered = False
- def connection_started(self, s):
- self.connection = s
- self.connection.write(chr(len(protocol_name)) + protocol_name +
- (chr(0) * 8) + self.downloadid)
+ factory = NatCheckProtocolFactory(self, downloadid, peerid)
- def connection_failed(self, addr, exception):
- self.answer(False)
+ reactor.connectTCP(ip, port, factory)
def answer(self, result):
- self.closed = True
- try:
- self.connection.close()
- except AttributeError:
- pass
- self.resultfunc(result, self.downloadid, self.peerid, self.ip, self.port)
-
- def read_header_len(self, s):
- if ord(s) != len(protocol_name):
- return None
- return len(protocol_name), self.read_header
-
- def read_header(self, s):
- if s != protocol_name:
- return None
- return 8, self.read_reserved
-
- def read_reserved(self, s):
- return 20, self.read_download_id
-
- def read_download_id(self, s):
- if s != self.downloadid:
- return None
- return 20, self.read_peer_id
-
- def read_peer_id(self, s):
- if s != self.peerid:
- return None
- self.answer(True)
- return None
-
- def data_came_in(self, connection, s):
- while True:
- if self.closed:
+ if not self.answered:
+ self.answered = True
+ log.msg('NAT check for %s:%i is %s' % (self.ip, self.port, result))
+ self.resultfunc(result, self.downloadid, self.peerid, self.ip, self.port)
+
+class NatCheckProtocolFactory(ClientFactory):
+ def __init__(self, natcheck, downloadid, peerid):
+ self.natcheck = natcheck
+ self.downloadid = downloadid
+ self.peerid = peerid
+
+ def startedConnecting(self, connector):
+ log.msg('Started to connect.')
+
+ def buildProtocol(self, addr):
+ return NatCheckProtocol(self, self.downloadid, self.peerid)
+
+ def clientConnectionLost(self, connector, reason):
+ self.natcheck.answer(False)
+ log.msg('Lost connection. Reason: %s' % reason)
+
+ def clientConnectionFailed(self, connector, reason):
+ self.natcheck.answer(False)
+ log.msg('Connection failed. Reason: %s' % reason)
+
+class NatCheckProtocol(Protocol):
+ def __init__(self, factory, downloadid, peerid):
+ self.factory = factory
+ self.downloadid = downloadid
+ self.peerid = peerid
+ self.data = ''
+ self.received_protocol_name_len = None
+ self.received_protocol_name = None
+ self.received_reserved = None
+ self.received_downloadid = None
+ self.received_peerid = None
+
+ def connectionMade(self):
+ self.transport.write(chr(len(protocol_name)))
+ self.transport.write(protocol_name)
+ self.transport.write(chr(0) * 8)
+ self.transport.write(self.downloadid)
+
+ def dataReceived(self, data):
+ self.data += data
+
+ if self.received_protocol_name_len is None:
+ if len(self.data) >= 1:
+ self.received_protocol_name_len = ord(self.data[0])
+ self.data = self.data[1:]
+ if self.received_protocol_name_len != len(protocol_name):
+ self.factory.natcheck.answer(False)
+ self.transport.loseConnection()
+ return
+ else:
return
- i = self.next_len - self.buffer.tell()
- if i > len(s):
- self.buffer.write(s)
+
+ if self.received_protocol_name is None:
+ if len(self.data) >= self.received_protocol_name_len:
+ self.received_protocol_name = self.data[:self.received_protocol_name_len]
+ self.data = self.data[self.received_protocol_name_len:]
+ if self.received_protocol_name != protocol_name:
+ log.err('Received protocol name did not match!')
+ self.factory.natcheck.answer(False)
+ self.transport.loseConnection()
+ return
+ else:
return
- self.buffer.write(s[:i])
- s = s[i:]
- m = self.buffer.getvalue()
- self.buffer.reset()
- self.buffer.truncate()
- x = self.next_func(m)
- if x is None:
- if not self.closed:
- self.answer(False)
+
+ if self.received_reserved is None:
+ if len(self.data) >= 8:
+ self.received_reserved = self.data[:8]
+ self.data = self.data[8:]
+ else:
return
- self.next_len, self.next_func = x
- def connection_lost(self, connection):
- if not self.closed:
- self.closed = True
- self.resultfunc(False, self.downloadid, self.peerid, self.ip, self.port)
+ if self.received_downloadid is None:
+ if len(self.data) >= 20:
+ self.received_downloadid = self.data[:20]
+ self.data = self.data[20:]
+ if self.received_downloadid != self.downloadid:
+ log.err('Received download id did not match!')
+ self.factory.natcheck.answer(False)
+ self.transport.loseConnection()
+ return
+ else:
+ return
+
+ if self.received_peerid is None:
+ if len(self.data) >= 20:
+ log.msg('Peerid length: %i' % len(self.peerid))
+ self.received_peerid = self.data[:20]
+ self.data = self.data[20:]
+ log.msg('Received: %s' % self.received_peerid.encode('hex'))
+ log.msg('Received: %s' % self.received_peerid.encode('quoted-printable'))
+ log.msg('Expected: %s' % self.peerid.encode('hex'))
+ log.msg('Expected: %s' % self.peerid.encode('quoted-printable'))
+ if self.received_peerid != self.peerid:
+ log.err('Received peer id did not match!')
+ self.factory.natcheck.answer(False)
+ self.transport.loseConnection()
+ return
+ else:
+ return
- def connection_flushed(self, connection):
- pass
+ if self.received_protocol_name == protocol_name and self.received_downloadid == self.downloadid and self.received_peerid == self.peerid:
+ self.factory.natcheck.answer(True)
+ self.transport.loseConnection()
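
With this rewrite NatCheck no longer takes a rawserver argument: it builds its own ClientFactory, calls reactor.connectTCP itself, and reports back through resultfunc exactly once. A hedged usage sketch; the address and the 20-byte ids below are placeholders, not values from this commit:

from twisted.internet import reactor
from BitTorrent.NatCheck import NatCheck

def on_result(result, downloadid, peerid, ip, port):
    print 'NAT check for %s:%d -> %s' % (ip, port, result)

downloadid = '\x00' * 20  # placeholder 20-byte info hash
peerid = '\x01' * 20      # placeholder 20-byte peer id
NatCheck(on_result, downloadid, peerid, '203.0.113.5', 6881)  # no rawserver argument any more
reactor.run()
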
diff --git a/BitTorrent/RawServer.py b/BitTorrent/RawServer.py
deleted file mode 100644
index efa067d..0000000
--- a/BitTorrent/RawServer.py
+++ /dev/null
@@ -1,501 +0,0 @@
-# The contents of this file are subject to the BitTorrent Open Source License
-# Version 1.1 (the License). You may not copy or use this file, in either
-# source code or executable form, except in compliance with the License. You
-# may obtain a copy of the License at http://www.bittorrent.com/license/.
-#
-# Software distributed under the License is distributed on an AS IS basis,
-# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
-# for the specific language governing rights and limitations under the
-# License.
-
-# Written by Bram Cohen, Uoti Urpala
-
-import os
-import sys
-import socket
-import signal
-import struct
-import thread
-from bisect import insort
-from cStringIO import StringIO
-from traceback import print_exc
-from errno import EWOULDBLOCK, ENOBUFS, EINTR
-
-from BitTorrent.platform import bttime
-from BitTorrent import WARNING, CRITICAL, FAQ_URL
-from BitTorrent.defer import Deferred
-
-try:
- from select import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP
- timemult = 1000
-except ImportError:
- from BitTorrent.selectpoll import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP
- timemult = 1
-
-NOLINGER = struct.pack('ii', 1, 0)
-
-
-class Handler(object):
-
- # there is only a semantic difference between "made" and "started".
- # I prefer "started"
- def connection_started(self, s):
- self.connection_made(s)
- def connection_made(self, s):
- pass
-
- def connection_lost(self, s):
- pass
-
- # Maybe connection_lost should just have a default 'None' exception parameter
- def connection_failed(self, addr, exception):
- pass
-
- def connection_flushed(self, s):
- pass
- def data_came_in(self, addr, datagram):
- pass
-
-
-class SingleSocket(object):
-
- def __init__(self, rawserver, sock, handler, context, addr=None):
- self.rawserver = rawserver
- self.socket = sock
- self.handler = handler
- self.buffer = []
- self.last_hit = bttime()
- self.fileno = sock.fileno()
- self.connected = False
- self.context = context
- self.ip = None
- self.port = None
-
-
- if isinstance(addr, basestring):
- # UNIX socket, not really ip
- self.ip = addr
- else:
- peername = (None, None)
- try:
- peername = self.socket.getpeername()
- except socket.error, e:
- # UDP raises (107, 'Transport endpoint is not connected')
- # but so can a TCP socket we just got from start_connection,
- # in which case addr is set and we use it later.
- if (e[0] == 107) and (addr == None):
- # lies.
- # the peer endpoint should be gathered from the
- # tuple passed to data_came_in
- try:
- peername = self.socket.getsockname()
- except socket.error, e:
- pass
-
- # this is awesome!
- # max prefers a value over None, so this is a common case:
- # max(('ip', None), ('ip', 1234)) => ('ip', 1234)
- # or the default case:
- # max(('ip', None), None) => ('ip', None)
- self.ip, self.port = max(peername, addr)
-
-
- def close(self):
- sock = self.socket
- self.socket = None
- self.buffer = []
- del self.rawserver.single_sockets[self.fileno]
- self.rawserver.poll.unregister(sock)
- self.handler = None
- if self.rawserver.config['close_with_rst']:
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, NOLINGER)
- sock.close()
-
- def shutdown(self, val):
- self.socket.shutdown(val)
-
- def is_flushed(self):
- return len(self.buffer) == 0
-
- def write(self, s):
- assert self.socket is not None
- self.buffer.append(s)
- if len(self.buffer) == 1:
- self.try_write()
-
- def try_write(self):
- if self.connected:
- try:
- while self.buffer != []:
- amount = self.socket.send(self.buffer[0])
- if amount != len(self.buffer[0]):
- if amount != 0:
- self.buffer[0] = self.buffer[0][amount:]
- break
- del self.buffer[0]
- except socket.error, e:
- code, msg = e
- if code != EWOULDBLOCK:
- self.rawserver.dead_from_write.append(self)
- return
- if self.buffer == []:
- self.rawserver.poll.register(self.socket, POLLIN)
- else:
- self.rawserver.poll.register(self.socket, POLLIN | POLLOUT)
-
-
-def default_error_handler(level, message):
- print message
-
-class RawServer(object):
-
- def __init__(self, doneflag, config, noisy=True,
- errorfunc=default_error_handler, tos=0):
- self.config = config
- self.tos = tos
- self.poll = poll()
- # {socket: SingleSocket}
- self.single_sockets = {}
- self.udp_sockets = {}
- self.dead_from_write = []
- self.doneflag = doneflag
- self.noisy = noisy
- self.errorfunc = errorfunc
- self.funcs = []
- self.externally_added_tasks = []
- self.listening_handlers = {}
- self.serversockets = {}
- self.live_contexts = {None : True}
- self.ident = thread.get_ident()
- self.to_start = []
- self.add_task(self.scan_for_timeouts, config['timeout_check_interval'])
- if sys.platform.startswith('win'):
- # Windows doesn't support pipes with select(). Just prevent sleeps
- # longer than a second instead of proper wakeup for now.
- self.wakeupfds = (None, None)
- self._wakeup()
- else:
- self.wakeupfds = os.pipe()
- self.poll.register(self.wakeupfds[0], POLLIN)
-
- def _wakeup(self):
- self.add_task(self._wakeup, 1)
-
- def add_context(self, context):
- self.live_contexts[context] = True
-
- def remove_context(self, context):
- del self.live_contexts[context]
- self.funcs = [x for x in self.funcs if x[3] != context]
-
- def add_task(self, func, delay, args=(), context=None):
- assert thread.get_ident() == self.ident
- assert type(args) == list or type(args) == tuple
- if context in self.live_contexts:
- insort(self.funcs, (bttime() + delay, func, args, context))
-
- def external_add_task(self, func, delay, args=(), context=None):
- assert type(args) == list or type(args) == tuple
- self.externally_added_tasks.append((func, delay, args, context))
- # Wake up the RawServer thread in case it's sleeping in poll()
- if self.wakeupfds[1] is not None:
- os.write(self.wakeupfds[1], 'X')
-
- def scan_for_timeouts(self):
- self.add_task(self.scan_for_timeouts,
- self.config['timeout_check_interval'])
- t = bttime() - self.config['socket_timeout']
- tokill = []
- for s in [s for s in self.single_sockets.values() if s not in self.udp_sockets.keys()]:
- if s.last_hit < t:
- tokill.append(s)
- for k in tokill:
- if k.socket is not None:
- self._close_socket(k)
-
- def create_unixserversocket(filename):
- server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- server.setblocking(0)
- server.bind(filename)
- server.listen(5)
- return server
- create_unixserversocket = staticmethod(create_unixserversocket)
-
- def create_serversocket(port, bind='', reuse=False, tos=0):
- server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- if reuse and os.name != 'nt':
- server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server.setblocking(0)
- if tos != 0:
- try:
- server.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, tos)
- except:
- pass
- server.bind((bind, port))
- server.listen(5)
- return server
- create_serversocket = staticmethod(create_serversocket)
-
- def create_udpsocket(port, bind='', reuse=False, tos=0):
- server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- if reuse and os.name != 'nt':
- server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server.setblocking(0)
- if tos != 0:
- try:
- server.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, tos)
- except:
- pass
- server.bind((bind, port))
- return server
- create_udpsocket = staticmethod(create_udpsocket)
-
- def start_listening(self, serversocket, handler, context=None):
- self.listening_handlers[serversocket.fileno()] = (handler, context)
- self.serversockets[serversocket.fileno()] = serversocket
- self.poll.register(serversocket, POLLIN)
-
- def start_listening_udp(self, serversocket, handler, context=None):
- self.listening_handlers[serversocket.fileno()] = (handler, context)
- nss = SingleSocket(self, serversocket, handler, context)
- self.single_sockets[serversocket.fileno()] = nss
- self.udp_sockets[nss] = 1
- self.poll.register(serversocket, POLLIN)
-
- def stop_listening(self, serversocket):
- del self.listening_handlers[serversocket.fileno()]
- del self.serversockets[serversocket.fileno()]
- self.poll.unregister(serversocket)
-
- def stop_listening_udp(self, serversocket):
- del self.listening_handlers[serversocket.fileno()]
- del self.single_sockets[serversocket.fileno()]
- l = [s for s in self.udp_sockets.keys() if s.socket == serversocket]
- del self.udp_sockets[l[0]]
- self.poll.unregister(serversocket)
-
- def start_connection(self, dns, handler=None, context=None, do_bind=True):
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setblocking(0)
- bindaddr = do_bind and self.config['bind']
- if bindaddr:
- sock.bind((bindaddr, 0))
- if self.tos != 0:
- try:
- sock.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, self.tos)
- except:
- pass
- try:
- sock.connect_ex(dns)
- except socket.error:
- sock.close()
- raise
- except Exception, e:
- sock.close()
- raise socket.error(str(e))
- self.poll.register(sock, POLLIN)
- s = SingleSocket(self, sock, handler, context, dns)
- self.single_sockets[sock.fileno()] = s
- return s
-
- def _add_pending_connection(self, addr):
- pass
-
- def _remove_pending_connection(self, addr):
- pass
-
- def async_start_connection(self, dns, handler=None, context=None, do_bind=True):
- self.to_start.insert(0, (dns, handler, context, do_bind))
- self._start_connection()
- return True
-
- def _start_connection(self):
- dns, handler, context, do_bind = self.to_start.pop()
- try:
- s = self.start_connection(dns, handler, context, do_bind)
- except Exception, e:
- handler.connection_failed(dns, e)
- else:
- handler.connection_started(s)
-
- def wrap_socket(self, sock, handler, context=None, ip=None, port=None):
- sock.setblocking(0)
- self.poll.register(sock, POLLIN)
- s = SingleSocket(self, sock, handler, context, (ip, port))
- self.single_sockets[sock.fileno()] = s
- return s
-
- # must be called from the main thread
- def install_sigint_handler(self):
- signal.signal(signal.SIGINT, self._handler)
-
- def _handler(self, signum, frame):
- self.external_add_task(self.doneflag.set, 0)
- # Allow pressing ctrl-c multiple times to raise KeyboardInterrupt,
- # in case the program is in an infinite loop
- signal.signal(signal.SIGINT, signal.default_int_handler)
-
- def _handle_events(self, events):
- for sock, event in events:
- if sock in self.serversockets:
- s = self.serversockets[sock]
- if event & (POLLHUP | POLLERR) != 0:
- try:
- self.poll.unregister(s)
- s.close()
- except socket.error, e:
- self.errorfunc(WARNING, _("failed to unregister or close server socket: %s") % str(e))
- self.errorfunc(CRITICAL, _("lost server socket"))
- else:
- handler, context = self.listening_handlers[sock]
- try:
- newsock, addr = s.accept()
- except socket.error, e:
- continue
- try:
- newsock.setblocking(0)
- nss = SingleSocket(self, newsock, handler, context, addr)
- self.single_sockets[newsock.fileno()] = nss
- self.poll.register(newsock, POLLIN)
- self._make_wrapped_call(handler. \
- connection_made, (nss,), context=context)
- except socket.error, e:
- self.errorfunc(WARNING,
- _("Error handling accepted connection: ") +
- str(e))
- else:
- s = self.single_sockets.get(sock)
- if s is None:
- if sock == self.wakeupfds[0]:
- # Another thread wrote this just to wake us up.
- os.read(sock, 1)
- continue
- s.connected = True
- if event & POLLERR:
- self._close_socket(s)
- continue
- if event & (POLLIN | POLLHUP):
- s.last_hit = bttime()
- try:
- data, addr = s.socket.recvfrom(100000)
- except socket.error, e:
- code, msg = e
- if code != EWOULDBLOCK:
- self._close_socket(s)
- continue
- if data == '' and not self.udp_sockets.has_key(s):
- self._close_socket(s)
- else:
- if not self.udp_sockets.has_key(s):
- self._make_wrapped_call(s.handler.data_came_in,
- (s, data), s)
- else:
- self._make_wrapped_call(s.handler.data_came_in,
- (addr, data), s)
-
- # data_came_in could have closed the socket (s.socket = None)
- if event & POLLOUT and s.socket is not None:
- s.try_write()
- if s.is_flushed():
- self._make_wrapped_call(s.handler.connection_flushed,
- (s,), s)
-
- def _pop_externally_added(self):
- while self.externally_added_tasks:
- task = self.externally_added_tasks.pop(0)
- self.add_task(*task)
-
- def listen_forever(self):
- ret = 0
- self.ident = thread.get_ident()
- while not self.doneflag.isSet() and not ret:
- ret = self.listen_once()
-
- def listen_once(self, period=1e9):
- try:
- self._pop_externally_added()
- if self.funcs:
- period = self.funcs[0][0] - bttime()
- if period < 0:
- period = 0
- events = self.poll.poll(period * timemult)
- if self.doneflag.isSet():
- return 0
- while self.funcs and self.funcs[0][0] <= bttime():
- garbage, func, args, context = self.funcs.pop(0)
- self._make_wrapped_call(func, args, context=context)
- self._close_dead()
- self._handle_events(events)
- if self.doneflag.isSet():
- return 0
- self._close_dead()
- except error, e:
- if self.doneflag.isSet():
- return 0
- # I can't find a coherent explanation for what the behavior
- # should be here, and people report conflicting behavior,
- # so I'll just try all the possibilities
- code = None
- if hasattr(e, '__getitem__'):
- code = e[0]
- else:
- code = e
- if code == ENOBUFS:
- # log the traceback so we can see where the exception is coming from
- print_exc(file = sys.stderr)
- self.errorfunc(CRITICAL,
- _("Have to exit due to the TCP stack flaking "
- "out. Please see the FAQ at %s") % FAQ_URL)
- return -1
- elif code in (EINTR,):
- # add other ignorable error codes here
- pass
- else:
- self.errorfunc(CRITICAL, str(e))
- return 0
- except KeyboardInterrupt:
- print_exc()
- return -1
- except:
- data = StringIO()
- print_exc(file=data)
- self.errorfunc(CRITICAL, data.getvalue())
- return 0
-
- def _make_wrapped_call(self, function, args, socket=None, context=None):
- try:
- function(*args)
- except KeyboardInterrupt:
- raise
- except Exception, e: # hopefully nothing raises strings
- # Incoming sockets can be assigned to a particular torrent during
- # a data_came_in call, and it's possible (though not likely) that
- # there could be a torrent-specific exception during the same call.
- # Therefore read the context after the call.
- if socket is not None:
- context = socket.context
- if self.noisy and context is None:
- data = StringIO()
- print_exc(file=data)
- self.errorfunc(CRITICAL, data.getvalue())
- if context is not None:
- context.got_exception(e)
-
- def _close_dead(self):
- while len(self.dead_from_write) > 0:
- old = self.dead_from_write
- self.dead_from_write = []
- for s in old:
- if s.socket is not None:
- self._close_socket(s)
-
- def _close_socket(self, s):
- sock = s.socket.fileno()
- if self.config['close_with_rst']:
- s.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, NOLINGER)
- s.socket.close()
- self.poll.unregister(sock)
- del self.single_sockets[sock]
- s.socket = None
- self._make_wrapped_call(s.handler.connection_lost, (s,), s)
- s.handler = None
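
The deleted RawServer kept a wakeup pipe so that other threads could hand it work through external_add_task. The Twisted counterpart, which the RawServer_twisted wrapper (also deleted in this commit) already used internally, is reactor.callFromThread. A sketch of that pattern, assuming a hypothetical worker thread (names are illustrative):

import threading
from twisted.internet import reactor

def announce_done(name):
    print 'finished', name  # runs in the reactor thread

def worker():
    # ... blocking work that must stay off the reactor thread ...
    reactor.callFromThread(announce_done, 'worker-1')  # thread-safe hand-off to the reactor

threading.Thread(target=worker).start()
reactor.run()
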
diff --git a/BitTorrent/RawServer_magic.py b/BitTorrent/RawServer_magic.py
deleted file mode 100644
index 198e237..0000000
--- a/BitTorrent/RawServer_magic.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# The contents of this file are subject to the BitTorrent Open Source License
-# Version 1.1 (the License). You may not copy or use this file, in either
-# source code or executable form, except in compliance with the License. You
-# may obtain a copy of the License at http://www.bittorrent.com/license/.
-#
-# Software distributed under the License is distributed on an AS IS basis,
-# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
-# for the specific language governing rights and limitations under the
-# License.
-
-# Written by Greg Hazel
-
-class BaseMagic:
- base = None
- too_late = False
-
-magic = BaseMagic()
-
-from BitTorrent import BTFailure
-
-try:
- import RawServer_twisted
- magic.base = RawServer_twisted.RawServer
- Handler = RawServer_twisted.Handler
-except ImportError:
- import RawServer
- magic.base = RawServer.RawServer
- Handler = RawServer.Handler
-
-def switch_rawserver(choice):
- if magic.too_late:
- raise BTFailure(_("Too late to switch RawServer backends, %s has already been used.") % str(magic.base))
-
- if choice.lower() == 'twisted':
- import RawServer_twisted
- magic.base = RawServer_twisted.RawServer
- else:
- import RawServer
- magic.base = RawServer.RawServer
-
-class _RawServerMetaclass:
- def __init__(self, *args):
- pass
-
- def __getattr__(self, name):
- magic.too_late = True
- try:
- return getattr(magic.base, name)
- except:
- raise AttributeError, name
-
-class RawServer:
- __metaclass__ = _RawServerMetaclass
- def __init__(self, *args, **kwargs):
- magic.too_late = True
- self.instance = magic.base(*args, **kwargs)
-
- def __getattr__(self, name):
- try:
- return getattr(self.instance, name)
- except:
- raise AttributeError, name
-
diff --git a/BitTorrent/RawServer_twisted.py b/BitTorrent/RawServer_twisted.py
deleted file mode 100644
index 53fc000..0000000
--- a/BitTorrent/RawServer_twisted.py
+++ /dev/null
@@ -1,771 +0,0 @@
-# The contents of this file are subject to the BitTorrent Open Source License
-# Version 1.1 (the License). You may not copy or use this file, in either
-# source code or executable form, except in compliance with the License. You
-# may obtain a copy of the License at http://www.bittorrent.com/license/.
-#
-# Software distributed under the License is distributed on an AS IS basis,
-# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
-# for the specific language governing rights and limitations under the
-# License.
-
-# Written by Greg Hazel
-
-import os
-import sys
-import socket
-import signal
-import struct
-import thread
-import threading
-from cStringIO import StringIO
-from traceback import print_exc, print_stack
-
-from BitTorrent import BTFailure, WARNING, CRITICAL, FAQ_URL
-
-from twisted.python import threadable
-# needed for twisted 1.3, otherwise the thread safe functions are not thread safe
-threadable.init(1)
-
-noSignals = True
-
-if os.name == 'nt':
- try:
- from twisted.internet import iocpreactor
- iocpreactor.proactor.install()
- noSignals = False
- except:
- # just as limited (if not more) as select, and also (supposedly) buggy
- #try:
- # from twisted.internet import win32eventreactor
- # win32eventreactor.install()
- #except:
- # pass
- pass
-else:
- try:
- from twisted.internet import kqreactor
- kqreactor.install()
- except:
- try:
- from twisted.internet import pollreactor
- pollreactor.install()
- except:
- pass
-
-#the default reactor is select-based, and will be install()ed if another has not
-from twisted.internet import reactor, task, error
-
-# as far as I know, we work with twisted 1.3 and >= 2.0
-#import twisted.copyright
-#if twisted.copyright.version.split('.') < 2:
-# raise ImportError(_("RawServer_twisted requires twisted 2.0.0 or greater"))
-
-from twisted.internet.protocol import DatagramProtocol, Protocol, Factory, ClientFactory
-from twisted.protocols.policies import TimeoutMixin
-
-NOLINGER = struct.pack('ii', 1, 0)
-
-class Handler(object):
-
- # there is only a semantic difference between "made" and "started".
- # I prefer "started"
- def connection_started(self, s):
- self.connection_made(s)
- def connection_made(self, s):
- pass
-
- def connection_lost(self, s):
- pass
-
- # Maybe connection_lost should just have a default 'None' exception parameter
- def connection_failed(self, addr, exception):
- pass
-
- def connection_flushed(self, s):
- pass
- def data_came_in(self, addr, datagram):
- pass
-
-class ConnectionWrapper(object):
- def __init__(self, rawserver, handler, context, tos=0):
- self.dying = False
- self.ip = None
- self.port = None
- self.transport = None
- self.reset_timeout = None
- self.callback_connection = None
-
- self.post_init(rawserver, handler, context)
-
- self.tos = tos
-
- self.buffer = OutputBuffer(self)
-
- def post_init(self, rawserver, handler, context):
- self.rawserver = rawserver
- self.handler = handler
- self.context = context
- if self.rawserver:
- self.rawserver.single_sockets[self] = self
-
- def get_socket(self):
- s = None
- try:
- s = self.transport.getHandle()
- except:
- try:
- # iocpreactor doesn't implement ISystemHandle like it should
- s = self.transport.socket
- except:
- pass
- return s
-
- def attach_transport(self, callback_connection, transport, reset_timeout):
- self.transport = transport
- self.callback_connection = callback_connection
- self.reset_timeout = reset_timeout
-
- try:
- address = self.transport.getPeer()
- except:
- try:
- # udp, for example
- address = self.transport.getHost()
- except:
- if not self.transport.__dict__.has_key("state"):
- self.transport.state = 'NO STATE!'
- sys.stderr.write('UNKNOWN HOST/PEER: %s:%s:%s\n' % (str(self.transport), str(self.transport.state), str(self.handler)))
- print_stack()
- # fallback incase the unknown happens,
- # there's no use raising an exception
- address = ('unknown', -1)
- pass
-
- try:
- self.ip = address.host
- self.port = address.port
- except:
- #unix sockets, for example
- pass
-
- if self.tos != 0:
- s = self.get_socket()
-
- try:
- s.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, self.tos)
- except:
- pass
-
- def sendto(self, packet, flags, addr):
- # all this can go away once we pin down the bug
- if not hasattr(self.transport, "listening"):
- self.rawserver.errorfunc(WARNING, "UDP port never setup properly when asked to write")
- elif not self.transport.listening:
- self.rawserver.errorfunc(WARNING, "UDP port cleaned up already when asked to write")
-
- ret = None
- try:
- ret = self.transport.write(packet, addr)
- except Exception, e:
- self.rawserver.errorfunc(WARNING, "UDP sendto failed: %s" % str(e))
-
- return ret
-
- def write(self, b):
- self.buffer.add(b)
-
- def _flushed(self):
- s = self
- #why do you tease me so?
- if s.handler is not None:
- #calling flushed from the write is bad form
- self.rawserver.add_task(s.handler.connection_flushed, 0, (s,))
-
- def is_flushed(self):
- return self.buffer.is_flushed()
-
- def shutdown(self, how):
- if how == socket.SHUT_WR:
- self.transport.loseWriteConnection()
- self.buffer.stopWriting()
- elif how == socket.SHUT_RD:
- self.transport.stopListening()
- else:
- self.close()
-
- def close(self):
- self.buffer.stopWriting()
-
- if self.rawserver.config['close_with_rst']:
- try:
- s = self.get_socket()
- s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, NOLINGER)
- except:
- pass
-
- if self.rawserver.udp_sockets.has_key(self):
- # udp connections should only call stopListening
- self.transport.stopListening()
- else:
- self.transport.loseConnection()
-
- def _cleanup(self):
-
- self.buffer.connection = None
- del self.buffer
-
- self.handler = None
-
- del self.transport
-
- if self.callback_connection:
- if self.callback_connection.can_timeout:
- self.callback_connection.setTimeout(None)
- self.callback_connection.connection = None
- del self.callback_connection
-
-
-
-class OutputBuffer(object):
-
- def __init__(self, connection):
- self.connection = connection
- self.consumer = None
- self._buffer = StringIO()
-
- def is_flushed(self):
- return (self._buffer.tell() == 0)
-
- def add(self, b):
- # sometimes we get strings, sometimes we get buffers. ugg.
- if (isinstance(b, buffer)):
- b = str(b)
- self._buffer.write(b)
-
- if self.consumer is None:
- self.beginWriting()
-
- def beginWriting(self):
- self.stopWriting()
- self.consumer = self.connection.transport
- self.consumer.registerProducer(self, False)
-
- def stopWriting(self):
- if self.consumer is not None:
- self.consumer.unregisterProducer()
- self.consumer = None
-
- def resumeProducing(self):
- if self.consumer is None:
- return
-
- if self._buffer.tell() > 0:
- self.consumer.write(self._buffer.getvalue())
- self._buffer.seek(0)
- self._buffer.truncate(0)
- self.connection._flushed()
- else:
- self.stopWriting()
-
-
- def pauseProducing(self):
- pass
-
- def stopProducing(self):
- pass
-
-class CallbackConnection(object):
-
- def attachTransport(self, transport, connection, *args):
- s = connection
- if s is None:
- s = ConnectionWrapper(*args)
-
- s.attach_transport(self, transport=transport, reset_timeout=self.optionalResetTimeout)
- self.connection = s
-
- def connectionMade(self):
- s = self.connection
- s.handler.connection_started(s)
- self.optionalResetTimeout()
-
- def connectionLost(self, reason):
- reactor.callLater(0, self.post_connectionLost, reason)
-
- #twisted api inconsistancy workaround
- #sometimes connectionLost is called (not fired) from inside write()
- def post_connectionLost(self, reason):
- #print "Connection Lost", str(reason).split(":")[-1]
- #hack to try and dig up the connection if one was ever made
- if not self.__dict__.has_key("connection"):
- self.connection = self.factory.connection
- if self.connection is not None:
- self.factory.rawserver._remove_socket(self.connection)
-
- def dataReceived(self, data):
- self.optionalResetTimeout()
-
- s = self.connection
- s.rawserver._make_wrapped_call(s.handler.data_came_in,
- (s, data), s)
-
- def datagramReceived(self, data, (host, port)):
- s = self.connection
- s.rawserver._make_wrapped_call(s.handler.data_came_in,
- ((host, port), data), s)
-
- def connectionRefused(self):
- s = self.connection
- dns = (s.ip, s.port)
- reason = _("connection refused")
-
- #print "Connection Refused"
-
- if not s.dying:
- # this might not work - reason is not an exception
- s.handler.connection_failed(dns, reason)
-
- s.dying = True
-
- s.rawserver._remove_socket(s)
-
- def optionalResetTimeout(self):
- if self.can_timeout:
- self.resetTimeout()
-
-class CallbackProtocol(CallbackConnection, TimeoutMixin, Protocol):
-
- def makeConnection(self, transport):
- if isinstance(self.factory, OutgoingConnectionFactory):
- self.factory.rawserver._remove_pending_connection(self.factory.addr)
- self.can_timeout = 1
- self.setTimeout(self.factory.rawserver.config['socket_timeout'])
- self.attachTransport(transport, self.factory.connection, *self.factory.connection_args)
- Protocol.makeConnection(self, transport)
-
-class CallbackDatagramProtocol(CallbackConnection, DatagramProtocol):
-
- def startProtocol(self):
- self.can_timeout = 0
- self.attachTransport(self.transport, self.connection, ())
- DatagramProtocol.startProtocol(self)
-
- def connectionRefused(self):
- # we don't use these at all for udp, so skip the CallbackConnection
- DatagramProtocol.connectionRefused(self)
-
-class OutgoingConnectionFactory(ClientFactory):
-
- def clientConnectionFailed(self, connector, reason):
- #print "Client connection failed", str(reason).split(":")[-1]
- peer = connector.getDestination()
- dns = (peer.host, peer.port)
-
- s = self.connection
- # opt-out
- if not s.dying:
- # this might not work - reason is not an exception
- s.handler.connection_failed(dns, reason)
-
- s.dying = True
-
- self.rawserver._remove_pending_connection(peer.host)
- self.rawserver._remove_socket(self.connection)
-
-def UnimplementedWarning(msg):
- #ok, I think people get the message
- #print "UnimplementedWarning: " + str(msg)
- pass
-
-#Encoder calls stop_listening(socket) then socket.close()
-#to us they mean the same thing, so swallow the second call
-class CloseSwallower:
- def close(self):
- pass
-
-#storage for socket creation requestions, and proxy once the connection is made
-class SocketProxy(object):
- def __init__(self, port, bind, reuse, tos, protocol):
- self.port = port
- self.bind = bind
- self.reuse = reuse
- self.tos = tos
- self.protocol = protocol
- self.connection = None
-
- def __getattr__(self, name):
- try:
- return getattr(self.connection, name)
- except:
- raise AttributeError, name
-
-def default_error_handler(level, message):
- print message
-
-class RawServerMixin(object):
-
- def __init__(self, doneflag, config, noisy=True,
- errorfunc=default_error_handler, tos=0):
- self.doneflag = doneflag
- self.noisy = noisy
- self.errorfunc = errorfunc
- self.config = config
- self.tos = tos
- self.ident = thread.get_ident()
-
- def _make_wrapped_call(self, function, args, wrapper=None, context=None):
- try:
- function(*args)
- except KeyboardInterrupt:
- raise
- except Exception, e: # hopefully nothing raises strings
- # Incoming sockets can be assigned to a particular torrent during
- # a data_came_in call, and it's possible (though not likely) that
- # there could be a torrent-specific exception during the same call.
- # Therefore read the context after the call.
- if wrapper is not None:
- context = wrapper.context
- if self.noisy and context is None:
- data = StringIO()
- print_exc(file=data)
- data.seek(-1)
- self.errorfunc(CRITICAL, data.read())
- if context is not None:
- context.got_exception(e)
-
- # must be called from the main thread
- def install_sigint_handler(self):
- signal.signal(signal.SIGINT, self._handler)
-
- def _handler(self, signum, frame):
- self.external_add_task(self.doneflag.set, 0)
- # Allow pressing ctrl-c multiple times to raise KeyboardInterrupt,
- # in case the program is in an infinite loop
- signal.signal(signal.SIGINT, signal.default_int_handler)
-
-def _sl(x):
- if hasattr(x, "__len__"):
- return str(len(x))
- else:
- return str(x)
-
-class RawServer(RawServerMixin):
-
- def __init__(self, doneflag, config, noisy=True,
- errorfunc=default_error_handler, tos=0):
- RawServerMixin.__init__(self, doneflag, config, noisy, errorfunc, tos)
-
- self.listening_handlers = {}
- self.single_sockets = {}
- self.udp_sockets = {}
- self.live_contexts = {None : 1}
- self.listened = 0
-
- # for connection rate limiting
- self.pending_sockets = {}
- # this can go away when urllib does
- self.pending_sockets_lock = threading.Lock()
-
- #l2 = task.LoopingCall(self._print_connection_count)
- #l2.start(10)
-
- def _log(self, msg):
- f = open("connections.txt", "a")
- print str(msg)
- f.write(str(msg) + "\n")
- f.close()
-
- def _print_connection_count(self):
- c = len(self.single_sockets)
- u = len(self.udp_sockets)
- c -= u
- #s = "Connections(" + str(id(self)) + "): tcp(" + str(c) + ") upd(" + str(u) + ")"
- #self._log(s)
-
- d = dict()
- for s in self.single_sockets:
- state = "None"
- if not s.dying and s.transport:
- try:
- state = s.transport.state
- except:
- state = "has transport"
- else:
- state = "No transport"
- if not d.has_key(state):
- d[state] = 0
- d[state] += 1
- self._log(d)
-
- sizes = "ps(" + _sl(self.pending_sockets)
- sizes += ") lh(" + _sl(self.listening_handlers)
- sizes += ") ss(" + _sl(self.single_sockets)
- sizes += ") us(" + _sl(self.udp_sockets)
- sizes += ") lc(" + _sl(self.live_contexts) + ")"
- self._log(sizes)
-
- def add_context(self, context):
- self.live_contexts[context] = 1
-
- def remove_context(self, context):
- del self.live_contexts[context]
-
- def autoprune(self, f, *a, **kw):
- if self.live_contexts.has_key(kw['context']):
- f(*a, **kw)
-
- def add_task(self, func, delay, args=(), context=None):
- assert thread.get_ident() == self.ident
- assert type(args) == list or type(args) == tuple
-
- #we're going to check again later anyway
- #if self.live_contexts.has_key(context):
- reactor.callLater(delay, self.autoprune, self._make_wrapped_call,
- func, args, context=context)
-
- def external_add_task(self, func, delay, args=(), context=None):
- assert type(args) == list or type(args) == tuple
- reactor.callFromThread(self.add_task, func, delay, args, context)
-
- def create_unixserversocket(filename):
- s = SocketProxy(0, filename, True, 0, 'tcp')
- s.factory = Factory()
-
- if s.reuse == False:
- UnimplementedWarning(_("You asked for reuse to be off when binding. Sorry, I can't do that."))
-
- listening_port = reactor.listenUNIX(s.bind, s.factory)
- listening_port.listening = 1
- s.listening_port = listening_port
-
- return s
- create_unixserversocket = staticmethod(create_unixserversocket)
-
- def create_serversocket(port, bind='', reuse=False, tos=0):
- s = SocketProxy(port, bind, reuse, tos, 'tcp')
- s.factory = Factory()
-
- if s.reuse == False:
- UnimplementedWarning(_("You asked for reuse to be off when binding. Sorry, I can't do that."))
-
- try:
- listening_port = reactor.listenTCP(s.port, s.factory, interface=s.bind)
- except error.CannotListenError, e:
- if e[0] != 0:
- raise e.socketError
- else:
- raise
- listening_port.listening = 1
- s.listening_port = listening_port
-
- return s
- create_serversocket = staticmethod(create_serversocket)
-
- def create_udpsocket(port, bind='', reuse=False, tos=0):
- s = SocketProxy(port, bind, reuse, tos, 'udp')
- s.protocol = CallbackDatagramProtocol()
- c = ConnectionWrapper(None, None, None, tos)
- s.connection = c
- s.protocol.connection = c
-
- if s.reuse == False:
- UnimplementedWarning(_("You asked for reuse to be off when binding. Sorry, I can't do that."))
-
- try:
- listening_port = reactor.listenUDP(s.port, s.protocol, interface=s.bind)
- except error.CannotListenError, e:
- raise e.socketError
- listening_port.listening = 1
- s.listening_port = listening_port
-
- return s
- create_udpsocket = staticmethod(create_udpsocket)
-
- def create_multicastsocket(port, bind='', reuse=False, tos=0):
- s = SocketProxy(port, bind, reuse, tos, 'udp')
- s.protocol = CallbackDatagramProtocol()
- c = ConnectionWrapper(None, None, None, tos)
- s.connection = c
- s.protocol.connection = c
-
- if s.reuse == False:
- UnimplementedWarning("You asked for reuse to be off when binding. Sorry, I can't do that.")
-
- try:
- listening_port = reactor.listenMulticast(s.port, s.protocol, interface=s.bind)
- except error.CannotListenError, e:
- raise e.socketError
- listening_port.listening = 1
- s.listening_port = listening_port
-
- return s
- create_multicastsocket = staticmethod(create_multicastsocket)
-
- def start_listening(self, serversocket, handler, context=None):
- s = serversocket
- s.factory.rawserver = self
- s.factory.protocol = CallbackProtocol
- s.factory.connection = None
- s.factory.connection_args = (self, handler, context, serversocket.tos)
-
- if not s.listening_port.listening:
- s.listening_port.startListening()
- s.listening_port.listening = 1
-
- self.listening_handlers[s] = s.listening_port
-
- #provides a harmless close() method
- s.connection = CloseSwallower()
-
- def start_listening_udp(self, serversocket, handler, context=None):
- s = serversocket
-
- c = s.connection
- c.post_init(self, handler, context)
-
- if not s.listening_port.listening:
- s.listening_port.startListening()
- s.listening_port.listening = 1
-
- self.listening_handlers[serversocket] = s.listening_port
-
- self.udp_sockets[c] = c
-
- def start_listening_multicast(self, serversocket, handler, context=None):
- s = serversocket
-
- c = s.connection
- c.post_init(self, handler, context)
-
- if not s.listening_port.listening:
- s.listening_port.startListening()
- s.listening_port.listening = 1
-
- self.listening_handlers[serversocket] = s.listening_port
-
- self.udp_sockets[c] = c
-
- def stop_listening(self, serversocket):
- listening_port = self.listening_handlers[serversocket]
- try:
- listening_port.stopListening()
- listening_port.listening = 0
- except error.NotListeningError:
- pass
- del self.listening_handlers[serversocket]
-
- def stop_listening_udp(self, serversocket):
- listening_port = self.listening_handlers[serversocket]
- listening_port.stopListening()
- del self.listening_handlers[serversocket]
- del self.udp_sockets[serversocket.connection]
- del self.single_sockets[serversocket.connection]
-
- def stop_listening_multicast(self, serversocket):
- self.stop_listening_udp(serversocket)
-
- def start_connection(self, dns, handler, context=None, do_bind=True):
- addr = dns[0]
- port = int(dns[1])
-
- self._add_pending_connection(addr)
-
- bindaddr = None
- if do_bind:
- bindaddr = self.config['bind']
- if bindaddr and len(bindaddr) >= 0:
- bindaddr = (bindaddr, 0)
- else:
- bindaddr = None
-
- factory = OutgoingConnectionFactory()
- # maybe this can be had from elsewhere
- factory.addr = addr
- factory.protocol = CallbackProtocol
- factory.rawserver = self
-
- c = ConnectionWrapper(self, handler, context, self.tos)
-
- factory.connection = c
- factory.connection_args = ()
-
- connector = reactor.connectTCP(addr, port, factory, bindAddress=bindaddr)
-
- self.single_sockets[c] = c
- return c
-
- def _add_pending_connection(self, addr):
- # the XP connection rate limiting is unique at the IP level
- assert isinstance(addr, str)
- self.pending_sockets_lock.acquire()
- self.__add_pending_connection(addr)
- self.pending_sockets_lock.release()
-
- def __add_pending_connection(self, addr):
- if addr not in self.pending_sockets:
- self.pending_sockets[addr] = 1
- else:
- self.pending_sockets[addr] += 1
-
- def _remove_pending_connection(self, addr):
- self.pending_sockets_lock.acquire()
- self.__remove_pending_connection(addr)
- self.pending_sockets_lock.release()
-
- def __remove_pending_connection(self, addr):
- self.pending_sockets[addr] -= 1
- if self.pending_sockets[addr] <= 0:
- del self.pending_sockets[addr]
-
- def async_start_connection(self, dns, handler, context=None, do_bind=True):
-
- # the XP connection rate limiting is unique at the IP level
- addr = dns[0]
- if (len(self.pending_sockets) >= self.config['max_incomplete'] and
- addr not in self.pending_sockets):
- return False
-
- self.start_connection(dns, handler, context, do_bind)
- return True
-
-
- def listen_forever(self):
- self.ident = thread.get_ident()
- if self.listened:
- UnimplementedWarning(_("listen_forever() should only be called once per reactor."))
- self.listened = 1
-
- l = task.LoopingCall(self.stop)
- l.start(1)#, now = False)
-
- if noSignals:
- reactor.run(installSignalHandlers=False)
- else:
- reactor.run()
-
- def listen_once(self, period=1e9):
- UnimplementedWarning(_("listen_once() might not return until there is activity, and might not process the event you want. Use listen_forever()."))
- reactor.iterate(period)
-
- def stop(self):
- if (self.doneflag.isSet()):
-
- for connection in self.single_sockets.values():
- try:
- #I think this still sends all the data
- connection.close()
- except:
- pass
-
- reactor.suggestThreadPoolSize(0)
- reactor.stop()
-
- def _remove_socket(self, s):
- # opt-out
- if not s.dying:
- self._make_wrapped_call(s.handler.connection_lost, (s,), s)
-
- s._cleanup()
-
- del self.single_sockets[s]
- \ No newline at end of file
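
RawServer_twisted shut itself down by polling doneflag once a second in a LoopingCall and then calling reactor.stop(). When the reactor is used directly, the usual pattern is to register cleanup with addSystemEventTrigger and stop the reactor from whatever event ends the program. The sketch below is illustrative, not code from this commit:

from twisted.internet import reactor

def cleanup():
    print 'flushing tracker state before exit'  # e.g. one last save of the dfile

reactor.addSystemEventTrigger('before', 'shutdown', cleanup)
reactor.callLater(60, reactor.stop)  # here: stop after 60 seconds; normally a signal or API call would do this
reactor.run()
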
diff --git a/BitTorrent/__init__.py b/BitTorrent/__init__.py
index 877e27f..cae9c77 100644
--- a/BitTorrent/__init__.py
+++ b/BitTorrent/__init__.py
@@ -1,4 +1,4 @@
-# -*- coding: UTF-8 -*-
+# -*- coding: utf-8 -*-
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License). You may not copy or use this file, in either
# source code or executable form, except in compliance with the License. You
@@ -9,14 +9,8 @@
# for the specific language governing rights and limitations under the
# License.
-app_name = 'BitTorrent'
-version = '4.4.0'
-
-URL = 'http://www.bittorrent.com/'
-DONATE_URL = URL + 'donate.html'
-FAQ_URL = URL + 'FAQ.html'
-HELP_URL = URL + 'documentation.html'
-SEARCH_URL = 'http://search.bittorrent.com/search.jsp?client=%(client)s&query=%(query)s'
+app_name = 'NoGooee'
+version = '1.0.0'
import sys
assert sys.version_info >= (2, 2, 1), _("Python %s or newer required") % '2.2.1'
@@ -35,39 +29,6 @@ if os.name == 'posix':
from BitTorrent.platform import install_translation
install_translation()
-# hackery to get around bug in py2exe that tries to write log files to
-# application directories, which may not be writable by non-admin users
-if is_frozen_exe:
- baseclass = sys.stderr.__class__
- class Stderr(baseclass):
- logroot = get_home_dir()
-
- if logroot is None:
- logroot = os.path.splitdrive(sys.executable)[0]
- if logroot[-1] != os.sep:
- logroot += os.sep
- logname = os.path.splitext(os.path.split(sys.executable)[1])[0] + '_errors.log'
- logpath = os.path.join(logroot, logname)
-
- def __init__(self):
- self.just_wrote_newline = True
-
- def write(self, text, alert=None, fname=logpath):
- output = text
-
- if self.just_wrote_newline and not text.startswith('[%s ' % version):
- output = '[%s %s] %s' % (version, time.strftime('%Y-%m-%d %H:%M:%S'), text)
-
- if 'GtkWarning' not in text:
- baseclass.write(self, output, fname=fname)
-
- if output[-1] == '\n':
- self.just_wrote_newline = True
- else:
- self.just_wrote_newline = False
-
- sys.stderr = Stderr()
-
del sys, get_home_dir, is_frozen_exe
INFO = 0
diff --git a/BitTorrent/parseargs.py b/BitTorrent/parseargs.py
index 5289aff..bd38558 100644
--- a/BitTorrent/parseargs.py
+++ b/BitTorrent/parseargs.py
@@ -17,7 +17,6 @@ from BitTorrent.defaultargs import MyBool, MYTRUE
from BitTorrent import BTFailure
from BitTorrent.bencode import bdecode
from BitTorrent.platform import is_frozen_exe
-from BitTorrent.RawServer_magic import switch_rawserver
def makeHelp(uiname, defaults):
ret = ''
@@ -146,12 +145,6 @@ def parseargs(argv, options, minargs=None, maxargs=None, presets=None):
if maxargs is not None and len(args) > maxargs:
usage(_("Too many arguments - %d maximum.") % maxargs)
- if config.has_key('twisted'):
- if config['twisted'] == 0:
- switch_rawserver('untwisted')
- elif config['twisted'] == 1:
- switch_rawserver('twisted')
-
return (config, args)
def parse_options(defaults, newvalues):
diff --git a/BitTorrent/track.py b/BitTorrent/track.py
index b6ec2db..fe4788f 100644
--- a/BitTorrent/track.py
+++ b/BitTorrent/track.py
@@ -12,20 +12,17 @@
import sys
import os
-import signal
import re
-from threading import Event
-from urlparse import urlparse
-from traceback import print_exc
from time import time, gmtime, strftime, localtime
from random import shuffle
from types import StringType, IntType, LongType, ListType, DictType
-from binascii import b2a_hex
-from cStringIO import StringIO
+
+from twisted.web import server
+from twisted.web.resource import Resource
+from twisted.internet import reactor
+from twisted.python import log
from BitTorrent.parseargs import parseargs, formatDefinitions
-from BitTorrent.RawServer_magic import RawServer
-from BitTorrent.HTTPHandler import HTTPHandler, months, weekdays
from BitTorrent.parsedir import parsedir
from BitTorrent.NatCheck import NatCheck
from BitTorrent.bencode import bencode, bdecode, Bencached
@@ -36,7 +33,7 @@ from BitTorrent import version
defaults = [
('port', 80,
_("Port to listen on.")),
- ('dfile', None,
+ ('dfile', '/tmp/dfile.txt',
_("file to store recent downloader info in")),
('bind', '',
_("ip to bind to locally")),
@@ -76,8 +73,6 @@ defaults = [
('allowed_controls', 0,
_("allow special keys in torrents in the allowed_dir to affect "
"tracker access")),
- ('hupmonitor', 0,
- _("whether to reopen the log file upon receipt of HUP signal")),
('show_infopage', 1,
_("whether to display an info page when the tracker's root dir "
"is loaded")),
@@ -93,8 +88,6 @@ defaults = [
"local network IPs (0 = never, 1 = always, 2 = ignore if NAT "
"checking is not enabled). HTTP proxy headers giving address "
"of original client are treated the same as --ip.")),
- ('logfile', '',
- _("file to write the tracker logs, use - for stdout (default)")),
('allow_get', 0,
_("use with allowed_dir; adds a /file?hash={hash} url that "
"allows users to download the torrent file")),
@@ -105,10 +98,6 @@ defaults = [
_("scrape access allowed (can be none, specific or full)")),
('max_give', 200,
_("maximum number of peers to give with any one request")),
- ('twisted', -1,
- _("Use Twisted network libraries for network connections. 1 means use twisted, 0 means do not use twisted, -1 means autodetect, and prefer twisted")),
- ('pid', '/var/run/bittorrent-tracker.pid',
- "Path to PID file")
]
def statefiletemplate(x):
@@ -159,7 +148,6 @@ def statefiletemplate(x):
raise ValueError
dirkeys[y[1]] = 1
-
alas = _("your file may exist elsewhere in the universe\nbut alas, not here\n")
def isotime(secs = None):
@@ -169,9 +157,9 @@ def isotime(secs = None):
http_via_filter = re.compile(' for ([0-9.]+)\Z')
-def _get_forwarded_ip(headers):
- if headers.has_key('http_x_forwarded_for'):
- header = headers['http_x_forwarded_for']
+def _get_forwarded_ip(request):
+ header = request.getHeader('X-Forwarded-For')
+ if header:
try:
x,y = header.split(',')
except:
@@ -179,20 +167,22 @@ def _get_forwarded_ip(headers):
if not is_local_ip(x):
return x
return y
- if headers.has_key('http_client_ip'):
- return headers['http_client_ip']
- if headers.has_key('http_via'):
- x = http_via_filter.search(headers['http_via'])
- try:
+ header = request.getHeader('Client-IP')
+ if header:
+ return header
+ header = request.getHeader('Via')
+ if header:
+ x = http_via_filter.search(header)
+ if x:
return x.group(1)
- except:
- pass
- if headers.has_key('http_from'):
- return headers['http_from']
+
+ header = request.getHeader('From')
+ if header:
+ return header
return None
-def get_forwarded_ip(headers):
- x = _get_forwarded_ip(headers)
+def get_forwarded_ip(request):
+ x = _get_forwarded_ip(request)
if x is None or not is_valid_ipv4(x) or is_local_ip(x):
return None
return x
@@ -222,16 +212,17 @@ def is_local_ip(ip):
try:
v = [int(x) for x in ip.split('.')]
if v[0] == 10 or v[0] == 127 or v[:2] in ([192, 168], [169, 254]):
- return 1
+ return True
if v[0] == 172 and v[1] >= 16 and v[1] <= 31:
- return 1
+ return True
except ValueError:
- return 0
-
+ return False
class Tracker(object):
- def __init__(self, config, rawserver):
+ def __init__(self):
+
+ config, files = parseargs([], defaults, 0, 0)
self.config = config
self.response_size = config['response_size']
self.max_give = config['max_give']
@@ -245,8 +236,7 @@ class Tracker(object):
self.favicon = h.read()
h.close()
except:
- print _("**warning** specified favicon file -- %s -- does not exist.") % favicon
- self.rawserver = rawserver
+ log.msg(_("**warning** specified favicon file -- %s -- does not exist.") % favicon)
self.cached = {} # format: infohash: [[time1, l1, s1], [time2, l2, s2], [time3, l3, s3]]
self.cached_t = {} # format: infohash: [time, cache]
self.times = {}
@@ -268,8 +258,7 @@ class Tracker(object):
statefiletemplate(tempstate)
self.state = tempstate
except:
- print _("**warning** statefile %s corrupt; resetting") % \
- self.dfile
+ log.msg(_("**warning** statefile %s corrupt; resetting") % self.dfile)
self.downloads = self.state.setdefault('peers', {})
self.completed = self.state.setdefault('completed', {})
@@ -293,32 +282,10 @@ class Tracker(object):
self.reannounce_interval = config['reannounce_interval']
self.save_dfile_interval = config['save_dfile_interval']
self.show_names = config['show_names']
- rawserver.add_task(self.save_dfile, self.save_dfile_interval)
+ reactor.callLater(self.save_dfile_interval, self.save_dfile)
self.prevtime = time()
self.timeout_downloaders_interval = config['timeout_downloaders_interval']
- rawserver.add_task(self.expire_downloaders, self.timeout_downloaders_interval)
- self.logfile = None
- self.log = None
- if (config['logfile'] != '') and (config['logfile'] != '-'):
- try:
- self.logfile = config['logfile']
- self.log = open(self.logfile,'a')
- sys.stdout = self.log
- print _("# Log Started: "), isotime()
- except:
- print _("**warning** could not redirect stdout to log file: "), sys.exc_info()[0]
-
- if config['hupmonitor']:
- def huphandler(signum, frame, self = self):
- try:
- self.log.close ()
- self.log = open(self.logfile,'a')
- sys.stdout = self.log
- print _("# Log reopened: "), isotime()
- except:
- print _("**warning** could not reopen logfile")
-
- signal.signal(signal.SIGHUP, huphandler)
+ reactor.callLater(self.timeout_downloaders_interval, self.expire_downloaders)
self.allow_get = config['allow_get']
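The two reactor.callLater calls above take over the scheduling that rawserver.add_task used to do. callLater(delay, fn) fires fn exactly once after delay seconds, so each periodic method re-arms itself when it runs, just as save_dfile, parse_allowed and expire_downloaders do further down. A minimal sketch of that pattern, not taken from this patch:

    # Self-rescheduling timer with twisted: callLater fires once, so the
    # callback re-arms itself before doing its work.
    from twisted.internet import reactor

    def periodic(interval):
        reactor.callLater(interval, periodic, interval)  # re-arm first
        print 'tick'                                     # then do the real work

    reactor.callLater(5, periodic, 5)
    reactor.run()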
@@ -347,99 +314,6 @@ class Tracker(object):
return is_valid_ipv4(given_ip) and (
not self.only_local_override_ip or is_local_ip(ip) )
- def get_infopage(self):
- try:
- if not self.config['show_infopage']:
- return (404, 'Not Found', {'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, alas)
- red = self.config['infopage_redirect']
- if red != '':
- return (302, 'Found', {'Content-Type': 'text/html', 'Location': red},
- '<A HREF="'+red+'">Click Here</A>')
-
- s = StringIO()
- s.write('<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">\n' \
- '<html><head><title>BitTorrent download info</title>\n')
- if self.favicon is not None:
- s.write('<link rel="shortcut icon" href="/favicon.ico">\n')
- s.write('</head>\n<body>\n' \
- '<h3>BitTorrent download info</h3>\n'\
- '<ul>\n'
- '<li><strong>tracker version:</strong> %s</li>\n' \
- '<li><strong>server time:</strong> %s</li>\n' \
- '</ul>\n' % (version, isotime()))
- if self.allowed is not None:
- if self.show_names:
- names = [ (value['name'], infohash)
- for infohash, value in self.allowed.iteritems()]
- else:
- names = [(None, infohash) for infohash in self.allowed]
- else:
- names = [ (None, infohash) for infohash in self.downloads]
- if not names:
- s.write('<p>not tracking any files yet...</p>\n')
- else:
- names.sort()
- tn = 0
- tc = 0
- td = 0
- tt = 0 # Total transferred
- ts = 0 # Total size
- nf = 0 # Number of files displayed
- if self.allowed is not None and self.show_names:
- s.write('<table summary="files" border="1">\n' \
- '<tr><th>info hash</th><th>torrent name</th><th align="right">size</th><th align="right">complete</th><th align="right">downloading</th><th align="right">downloaded</th><th align="right">transferred</th></tr>\n')
- else:
- s.write('<table summary="files">\n' \
- '<tr><th>info hash</th><th align="right">complete</th><th align="right">downloading</th><th align="right">downloaded</th></tr>\n')
- for name, infohash in names:
- l = self.downloads[infohash]
- n = self.completed.get(infohash, 0)
- tn = tn + n
- c = self.seedcount[infohash]
- tc = tc + c
- d = len(l) - c
- td = td + d
- nf = nf + 1
- if self.allowed is not None and self.show_names:
- if self.allowed.has_key(infohash):
- sz = self.allowed[infohash]['length'] # size
- ts = ts + sz
- szt = sz * n # Transferred for this torrent
- tt = tt + szt
- if self.allow_get == 1:
- linkname = '<a href="/file?info_hash=' + quote(infohash) + '">' + name + '</a>'
- else:
- linkname = name
- s.write('<tr><td><code>%s</code></td><td>%s</td><td align="right">%s</td><td align="right">%i</td><td align="right">%i</td><td align="right">%i</td><td align="right">%s</td></tr>\n' \
- % (b2a_hex(infohash), linkname, size_format(sz), c, d, n, size_format(szt)))
- else:
- s.write('<tr><td><code>%s</code></td><td align="right"><code>%i</code></td><td align="right"><code>%i</code></td><td align="right"><code>%i</code></td></tr>\n' \
- % (b2a_hex(infohash), c, d, n))
- ttn = 0
- for i in self.completed.values():
- ttn = ttn + i
- if self.allowed is not None and self.show_names:
- s.write('<tr><td align="right" colspan="2">%i files</td><td align="right">%s</td><td align="right">%i</td><td align="right">%i</td><td align="right">%i/%i</td><td align="right">%s</td></tr>\n'
- % (nf, size_format(ts), tc, td, tn, ttn, size_format(tt)))
- else:
- s.write('<tr><td align="right">%i files</td><td align="right">%i</td><td align="right">%i</td><td align="right">%i/%i</td></tr>\n'
- % (nf, tc, td, tn, ttn))
- s.write('</table>\n' \
- '<ul>\n' \
- '<li><em>info hash:</em> SHA1 hash of the "info" section of the metainfo (*.torrent)</li>\n' \
- '<li><em>complete:</em> number of connected clients with the complete file</li>\n' \
- '<li><em>downloading:</em> number of connected clients still downloading</li>\n' \
- '<li><em>downloaded:</em> reported complete downloads (total: current/all)</li>\n' \
- '<li><em>transferred:</em> torrent size * total downloaded (does not include partial transfers)</li>\n' \
- '</ul>\n')
-
- s.write('</body>\n' \
- '</html>\n')
- return (200, 'OK', {'Content-Type': 'text/html; charset=iso-8859-1'}, s.getvalue())
- except:
- print_exc()
- return (500, 'Internal Server Error', {'Content-Type': 'text/html; charset=iso-8859-1'}, 'Server Error')
-
def scrapedata(self, infohash, return_name = True):
l = self.downloads[infohash]
n = self.completed.get(infohash, 0)
@@ -450,57 +324,6 @@ class Tracker(object):
f['name'] = self.allowed[infohash]['name']
return (f)
- def get_scrape(self, paramslist):
- fs = {}
- if paramslist.has_key('info_hash'):
- if self.config['scrape_allowed'] not in ['specific', 'full']:
- return (400, 'Not Authorized', {'Content-Type': 'text/plain', 'Pragma': 'no-cache'},
- bencode({'failure reason':
- _("specific scrape function is not available with this tracker.")}))
- for infohash in paramslist['info_hash']:
- if self.allowed is not None and infohash not in self.allowed:
- continue
- if infohash in self.downloads:
- fs[infohash] = self.scrapedata(infohash)
- else:
- if self.config['scrape_allowed'] != 'full':
- return (400, 'Not Authorized', {'Content-Type': 'text/plain', 'Pragma': 'no-cache'},
- bencode({'failure reason':
- _("full scrape function is not available with this tracker.")}))
- if self.allowed is not None:
- hashes = self.allowed
- else:
- hashes = self.downloads
- for infohash in hashes:
- fs[infohash] = self.scrapedata(infohash)
-
- return (200, 'OK', {'Content-Type': 'text/plain'}, bencode({'files': fs}))
-
- def get_file(self, infohash):
- if not self.allow_get:
- return (400, 'Not Authorized', {'Content-Type': 'text/plain', 'Pragma': 'no-cache'},
- _("get function is not available with this tracker."))
- if not self.allowed.has_key(infohash):
- return (404, 'Not Found', {'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, alas)
- fname = self.allowed[infohash]['file']
- fpath = self.allowed[infohash]['path']
- return (200, 'OK', {'Content-Type': 'application/x-bittorrent',
- 'Content-Disposition': 'attachment; filename=' + fname},
- open(fpath, 'rb').read())
-
- def check_allowed(self, infohash, paramslist):
- if self.allowed is not None:
- if not self.allowed.has_key(infohash):
- return (200, 'Not Authorized', {'Content-Type': 'text/plain', 'Pragma': 'no-cache'},
- bencode({'failure reason':
- _("Requested download is not authorized for use with this tracker.")}))
- if self.config['allowed_controls']:
- if self.allowed[infohash].has_key('failure reason'):
- return (200, 'Not Authorized', {'Content-Type': 'text/plain', 'Pragma': 'no-cache'},
- bencode({'failure reason': self.allowed[infohash]['failure reason']}))
-
- return None
-
def add_data(self, infohash, event, ip, paramslist):
peers = self.downloads.setdefault(infohash, {})
ts = self.times.setdefault(infohash, {})
@@ -558,7 +381,7 @@ class Tracker(object):
peer['nat'] = 0
self.natcheckOK(infohash,myid,ip1,port,left)
else:
- NatCheck(self.connectback_result,infohash,myid,ip1,port,self.rawserver)
+ NatCheck(self.connectback_result,infohash,myid,ip1,port)
else:
peer['nat'] = 2**30
if event == 'completed':
@@ -613,7 +436,7 @@ class Tracker(object):
recheck = True
if recheck:
- NatCheck(self.connectback_result,infohash,myid,ip1,port,self.rawserver)
+ NatCheck(self.connectback_result,infohash,myid,ip1,port)
return rsize
@@ -682,74 +505,6 @@ class Tracker(object):
data['peers'] = peerdata
return data
- def get(self, connection, path, headers):
- ip = connection.get_ip()
-
- nip = get_forwarded_ip(headers)
- if nip and not self.only_local_override_ip:
- ip = nip
-
- paramslist = {}
- def params(key, default = None, l = paramslist):
- if l.has_key(key):
- return l[key][0]
- return default
-
- try:
- (scheme, netloc, path, pars, query, fragment) = urlparse(path)
- if self.uq_broken == 1:
- path = path.replace('+',' ')
- query = query.replace('+',' ')
- path = unquote(path)[1:]
- for s in query.split('&'):
- if s != '':
- i = s.index('=')
- kw = unquote(s[:i])
- paramslist.setdefault(kw, [])
- paramslist[kw] += [unquote(s[i+1:])]
-
- if path == '' or path == 'index.html':
- return self.get_infopage()
- if path == 'scrape':
- return self.get_scrape(paramslist)
- if (path == 'file'):
- return self.get_file(params('info_hash'))
- if path == 'favicon.ico' and self.favicon is not None:
- return (200, 'OK', {'Content-Type' : 'image/x-icon'}, self.favicon)
- if path != 'announce':
- return (404, 'Not Found', {'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, alas)
-
- # main tracker function
- infohash = params('info_hash')
- if not infohash:
- raise ValueError, 'no info hash'
-
- notallowed = self.check_allowed(infohash, paramslist)
- if notallowed:
- return notallowed
-
- event = params('event')
-
- rsize = self.add_data(infohash, event, ip, paramslist)
-
- except ValueError, e:
- return (400, 'Bad Request', {'Content-Type': 'text/plain'},
- 'you sent me garbage - ' + str(e))
-
- if params('compact'):
- return_type = 2
- elif params('no_peer_id'):
- return_type = 1
- else:
- return_type = 0
-
- data = self.peerlist(infohash, event=='stopped', not params('left'),
- return_type, rsize)
-
- if paramslist.has_key('scrape'):
- data['scrape'] = self.scrapedata(infohash, False)
-
- return (200, 'OK', {'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, bencode(data))
def natcheckOK(self, infohash, peerid, ip, port, not_seed):
bc = self.becache.setdefault(infohash,[[{}, {}], [{}, {}], [{}, {}]])
@@ -760,9 +515,9 @@ class Tracker(object):
def natchecklog(self, peerid, ip, port, result):
year, month, day, hour, minute, second, a, b, c = localtime(time())
- print '%s - %s [%02d/%3s/%04d:%02d:%02d:%02d] "!natcheck-%s:%i" %i 0 - -' % (
- ip, quote(peerid), day, months[month], year, hour, minute, second,
- ip, port, result)
+ log.msg('%s - %s [%02d/%3s/%04d:%02d:%02d:%02d] "!natcheck-%s:%i" %i 0 - -' % (
+ ip, quote(peerid), day, months[month], year, hour, minute, second,
+ ip, port, result))
def connectback_result(self, result, downloadid, peerid, ip, port):
record = self.downloads.get(downloadid, {}).get(peerid)
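Note that natchecklog still indexes months[month], but the only definition of months came from the BitTorrent.HTTPHandler import that this patch removes, so calling natchecklog as converted would raise a NameError. A module-level table along these lines would keep the Apache-style timestamp working; this is a sketch, and the exact spelling is an assumption, using localtime()'s 1-based month so index 0 is a placeholder:

    # Assumed replacement for the `months` name formerly imported from
    # the deleted HTTPHandler module; natchecklog() indexes it 1..12.
    months = [None, 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
              'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']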
@@ -789,13 +544,16 @@ class Tracker(object):
record['nat'] += 1
def save_dfile(self):
- self.rawserver.add_task(self.save_dfile, self.save_dfile_interval)
+ # need to arrange for this to be called just before shutdown
+ log.msg('save_dfile')
+ reactor.callLater(self.save_dfile_interval, self.save_dfile)
h = open(self.dfile, 'wb')
h.write(bencode(self.state))
h.close()
def parse_allowed(self):
- self.rawserver.add_task(self.parse_allowed, self.parse_dir_interval)
+ log.msg('parse_allowed')
+ reactor.callLater(self.parse_dir_interval, self.parse_allowed)
# logging broken .torrent files would be useful but could confuse
# programs parsing log files, so errors are just ignored for now
@@ -828,6 +586,8 @@ class Tracker(object):
del dls[peerid]
def expire_downloaders(self):
+ log.msg('expire_downloaders')
+ reactor.callLater(self.timeout_downloaders_interval, self.expire_downloaders)
for infohash, peertimes in self.times.items():
for myid, t in peertimes.items():
if t < self.prevtime:
@@ -840,26 +600,267 @@ class Tracker(object):
del self.times[key]
del self.downloads[key]
del self.seedcount[key]
- self.rawserver.add_task(self.expire_downloaders, self.timeout_downloaders_interval)
-def track(args):
- if len(args) == 0:
- print formatDefinitions(defaults, 80)
- return
- try:
- config, files = parseargs(args, defaults, 0, 0)
- except ValueError, e:
- print _("error: ") + str(e)
- print _("run with no arguments for parameter explanations")
- return
- file(config['pid'], 'w').write(str(os.getpid()))
- r = RawServer(Event(), config)
- t = Tracker(config, r)
- s = r.create_serversocket(config['port'], config['bind'], True)
- r.start_listening(s, HTTPHandler(t.get, config['min_time_between_log_flushes']))
- r.listen_forever()
- t.save_dfile()
- print _("# Shutting down: ") + isotime()
+class InfoPage(Resource):
+ def __init__(self, tracker):
+ Resource.__init__(self)
+ self.tracker = tracker
+
+ def getChild(self, name, request):
+ if name in ['', 'index.html', 'index.htm']:
+ return self
+ return Resource.getChild(self, name, request)
+
+ def render_GET(self, request):
+ try:
+ if not self.tracker.config['show_infopage']:
+ request.setResponseCode(404, 'Not Found')
+ request.setHeader('Content-Type', 'text/plain')
+ request.setHeader('Pragma', 'no-cache')
+ return alas
+
+ red = self.tracker.config['infopage_redirect']
+ if red != '':
+ request.redirect(red)
+ request.finish()
+ return server.NOT_DONE_YET
+
+ request.write('<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">\n' \
+ '<html><head><title>BitTorrent download info</title>\n')
+ if self.tracker.favicon is not None:
+ request.write('<link rel="shortcut icon" href="/favicon.ico">\n')
+ request.write('</head>\n<body>\n' \
+ '<h3>BitTorrent download info</h3>\n'\
+ '<ul>\n'
+ '<li><strong>tracker version:</strong> %s</li>\n' \
+ '<li><strong>server time:</strong> %s</li>\n' \
+ '</ul>\n' % (version, isotime()))
+ if self.tracker.allowed is not None:
+ if self.tracker.show_names:
+ names = [ (value['name'], infohash)
+ for infohash, value in self.tracker.allowed.iteritems()]
+ else:
+ names = [(None, infohash) for infohash in self.tracker.allowed]
+ else:
+ names = [ (None, infohash) for infohash in self.tracker.downloads]
+ if not names:
+ request.write('<p>not tracking any files yet...</p>\n')
+ else:
+ names.sort()
+ tn = 0
+ tc = 0
+ td = 0
+ tt = 0 # Total transferred
+ ts = 0 # Total size
+ nf = 0 # Number of files displayed
+ if self.tracker.allowed is not None and self.tracker.show_names:
+ request.write('<table summary="files" border="1">\n' \
+ '<tr><th>info hash</th><th>torrent name</th><th align="right">size</th><th align="right">complete</th><th align="right">downloading</th><th align="right">downloaded</th><th align="right">transferred</th></tr>\n')
+ else:
+ request.write('<table summary="files">\n' \
+ '<tr><th>info hash</th><th align="right">complete</th><th align="right">downloading</th><th align="right">downloaded</th></tr>\n')
+ for name, infohash in names:
+ l = self.tracker.downloads[infohash]
+ n = self.tracker.completed.get(infohash, 0)
+ tn = tn + n
+ c = self.tracker.seedcount[infohash]
+ tc = tc + c
+ d = len(l) - c
+ td = td + d
+ nf = nf + 1
+ if self.tracker.allowed is not None and self.tracker.show_names:
+ if self.tracker.allowed.has_key(infohash):
+ sz = self.tracker.allowed[infohash]['length'] # size
+ ts = ts + sz
+ szt = sz * n # Transferred for this torrent
+ tt = tt + szt
+ if self.tracker.allow_get == 1:
+ linkname = '<a href="/file?info_hash=' + quote(infohash) + '">' + name + '</a>'
+ else:
+ linkname = name
+ request.write('<tr><td><code>%s</code></td><td>%s</td><td align="right">%s</td><td align="right">%i</td><td align="right">%i</td><td align="right">%i</td><td align="right">%s</td></tr>\n' \
+ % (infohash.encode('hex'), linkname, size_format(sz), c, d, n, size_format(szt)))
+ else:
+ request.write('<tr><td><code>%s</code></td><td align="right"><code>%i</code></td><td align="right"><code>%i</code></td><td align="right"><code>%i</code></td></tr>\n' \
+ % (infohash.encode('hex'), c, d, n))
+ ttn = 0
+ for i in self.tracker.completed.values():
+ ttn = ttn + i
+ if self.tracker.allowed is not None and self.tracker.show_names:
+ request.write('<tr><td align="right" colspan="2">%i files</td><td align="right">%s</td><td align="right">%i</td><td align="right">%i</td><td align="right">%i/%i</td><td align="right">%s</td></tr>\n'
+ % (nf, size_format(ts), tc, td, tn, ttn, size_format(tt)))
+ else:
+ request.write('<tr><td align="right">%i files</td><td align="right">%i</td><td align="right">%i</td><td align="right">%i/%i</td></tr>\n'
+ % (nf, tc, td, tn, ttn))
+ request.write('</table>\n' \
+ '<ul>\n' \
+ '<li><em>info hash:</em> SHA1 hash of the "info" section of the metainfo (*.torrent)</li>\n' \
+ '<li><em>complete:</em> number of connected clients with the complete file</li>\n' \
+ '<li><em>downloading:</em> number of connected clients still downloading</li>\n' \
+ '<li><em>downloaded:</em> reported complete downloads (total: current/all)</li>\n' \
+ '<li><em>transferred:</em> torrent size * total downloaded (does not include partial transfers)</li>\n' \
+ '</ul>\n')
+
+ request.write('</body>\n' \
+ '</html>\n')
+ request.finish()
+ return server.NOT_DONE_YET
+
+ except:
+ request.setResponseCode(500, 'Internal Server Error')
+ log.err()
+ return 'Server Error'
+
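InfoPage.render_GET streams its body with request.write() and request.finish(), then hands back server.NOT_DONE_YET; the other resources below simply return a string and let twisted.web write and close the response. A minimal side-by-side sketch of the two render styles this patch relies on (class names are illustrative only):

    # Two ways a twisted.web Resource can answer a GET.
    from twisted.web import server
    from twisted.web.resource import Resource

    class SyncPage(Resource):
        isLeaf = True
        def render_GET(self, request):
            request.setHeader('Content-Type', 'text/plain')
            return 'body written and closed by twisted.web\n'

    class StreamedPage(Resource):
        isLeaf = True
        def render_GET(self, request):
            request.setHeader('Content-Type', 'text/plain')
            request.write('first chunk\n')
            request.write('second chunk\n')
            request.finish()
            return server.NOT_DONE_YET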
+class Scrape(Resource):
+ isLeaf = True
+
+ def __init__(self, tracker):
+ Resource.__init__(self)
+ self.tracker = tracker
+
+ def render_GET(self, request):
+ fs = {}
+ if request.args.has_key('info_hash'):
+ if self.tracker.config['scrape_allowed'] not in ['specific', 'full']:
+ request.setResponseCode(400, 'Not Authorized')
+ request.setHeader('Content-Type', 'text/plain')
+ request.setHeader('Pragma', 'no-cache')
+ return bencode({'failure reason':
+ _("specific scrape function is not available with this tracker.")})
+ for infohash in request.args['info_hash']:
+ if self.tracker.allowed is not None and infohash not in self.tracker.allowed:
+ continue
+ if infohash in self.tracker.downloads:
+ fs[infohash] = self.tracker.scrapedata(infohash)
+ else:
+ if self.tracker.config['scrape_allowed'] != 'full':
+ request.setResponseCode(400, 'Not Authorized')
+ request.setHeader('Content-Type', 'text/plain')
+ request.setHeader('Pragma', 'no-cache')
+ return bencode({'failure reason':
+ _("full scrape function is not available with this tracker.")})
+ if self.tracker.allowed is not None:
+ hashes = self.tracker.allowed
+ else:
+ hashes = self.tracker.downloads
+ for infohash in hashes:
+ fs[infohash] = self.tracker.scrapedata(infohash)
+
+ request.setHeader('Content-Type', 'text/plain')
+ return bencode({'files': fs})
+
+class File(Resource):
+ isLeaf = True
+
+ def __init__(self, tracker):
+ Resource.__init__(self)
+ self.tracker = tracker
+
+ def render_GET(self, request):
+ if not self.tracker.allow_get:
+ request.setResponseCode(400, 'Not Authorized')
+ request.setHeader('Content-Type', 'text/plain')
+ request.setHeader('Pragma', 'no-cache')
+ return _("get function is not available with this tracker.")
+
+ infohash = None
+ if request.args.has_key('info_hash'):
+ infohash = request.args['info_hash'][0]
+
+ if not self.tracker.allowed.has_key(infohash):
+ request.setResponseCode(400, 'Not Authorized')
+ request.setHeader('Content-Type', 'text/plain')
+ request.setHeader('Pragma', 'no-cache')
+ return alas
+
+ fname = self.tracker.allowed[infohash]['file']
+ fpath = self.tracker.allowed[infohash]['path']
+
+ request.setHeader('Content-Type', 'application/x-bittorrent')
+ request.setHeader('Content-Disposition', 'attachment; filename=' + fname)
+ return open(fpath, 'rb').read()
+
+class Announce(Resource):
+ isLeaf = True
+
+ def __init__(self, tracker):
+ Resource.__init__(self)
+ self.tracker = tracker
+
+ def render_GET(self, request):
+ ip = request.getClientIP()
+
+ nip = get_forwarded_ip(request)
+ if nip and not self.tracker.only_local_override_ip:
+ ip = nip
+
+ infohash = request.args.get('info_hash', [None])[0]
+
+ if infohash is None:
+ request.setResponseCode(400, 'Bad Request')
+ request.setHeader('Content-Type', 'text/plain')
+ request.setHeader('Pragma', 'no-cache')
+ return 'info_hash not specified'
+
+ if self.tracker.allowed is not None:
+ if not self.tracker.allowed.has_key(infohash):
+ # is 200 really right?
+ request.setResponseCode(200, 'Not Authorized')
+ request.setHeader('Content-Type', 'text/plain')
+ request.setHeader('Pragma', 'no-cache')
+ return bencode({'failure reason':
+ _("Requested download is not authorized for use with this tracker.")})
+
+ if self.tracker.config['allowed_controls']:
+ if self.tracker.allowed[infohash].has_key('failure reason'):
+ # is 200 really right?
+ request.setResponseCode(200, 'Not Authorized')
+ request.setHeader('Content-Type', 'text/plain')
+ request.setHeader('Pragma', 'no-cache')
+ return bencode({'failure reason': self.tracker.allowed[infohash]['failure reason']})
+
+ event = request.args.get('event', [None])[0]
+
+ rsize = self.tracker.add_data(infohash, event, ip, request.args)
+
+ compact = request.args.get('compact', [None])[0]
+
+ no_peer_id = request.args.get('no_peer_id', [None])[0]
+
+ if compact:
+ return_type = 2
+ elif no_peer_id:
+ return_type = 1
+ else:
+ return_type = 0
+
+ left = request.args.get('left', [None])[0]
+
+ data = self.tracker.peerlist(infohash, event == 'stopped', not left, return_type, rsize)
+
+ if request.args.has_key('scrape'):
+ data['scrape'] = self.tracker.scrapedata(infohash, False)
+
+ request.setHeader('Content-Type', 'text/plain')
+ request.setHeader('Pragma', 'no-cache')
+ return bencode(data)
+
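Announce is the endpoint the old Tracker.get() handled; every parameter now arrives through request.args, so the conversion can be exercised end to end with an ordinary HTTP GET and a bdecode of the reply. A rough client-side sketch (the host, port and all parameter values are assumptions, and the tracker must be running without an allowed_dir so arbitrary info hashes are accepted):

    # Hit /announce on a locally running tracker and decode the response.
    import urllib, urllib2
    from BitTorrent.bencode import bdecode

    infohash = '\x12' * 20                    # placeholder 20-byte info hash
    query = urllib.urlencode({'info_hash': infohash,
                              'peer_id': '-XX0001-123456789012',
                              'port': 6881, 'uploaded': 0, 'downloaded': 0,
                              'left': 0, 'compact': 1})
    reply = bdecode(urllib2.urlopen('http://localhost:8080/announce?' + query).read())
    print reply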
+class FavIcon(Resource):
+ isLeaf = True
+
+ def __init__(self, tracker):
+ Resource.__init__(self)
+ self.tracker = tracker
+
+ def render_GET(self, request):
+ if self.tracker.favicon is None:
+ request.setResponseCode(404, 'Not Found')
+ request.setHeader('Content-Type', 'text/plain')
+ request.setHeader('Pragma', 'no-cache')
+ return 'Not Found!'
+
+ request.setHeader('Content-Type', 'image/x-icon')
+ return self.tracker.favicon
def size_format(s):
if (s < 1024):
diff --git a/bittorrent-tracker.py b/bittorrent-tracker.py
index 52dace4..4c176da 100755
--- a/bittorrent-tracker.py
+++ b/bittorrent-tracker.py
@@ -12,13 +12,23 @@
# Written by Bram Cohen
-if __name__ == '__main__':
- from BitTorrent.platform import install_translation
- install_translation()
+from BitTorrent.platform import install_translation
+install_translation()
-import os
-from sys import argv
-from BitTorrent.track import track
+from BitTorrent.track import Tracker, InfoPage, Scrape, File, Announce, FavIcon
-if __name__ == '__main__':
- track(argv[1:])
+from twisted.application import internet, service
+from twisted.web import static, server
+
+tracker = Tracker()
+root = InfoPage(tracker)
+root.putChild('scrape', Scrape(tracker))
+root.putChild('file', File(tracker))
+root.putChild('announce', Announce(tracker))
+root.putChild('favicon.ico', FavIcon(tracker))
+
+application = service.Application('NoGooee BitTorrent Tracker')
+site = server.Site(root)
+sc = service.IServiceCollection(application)
+i = internet.TCPServer(8080, site)
+i.setServiceParent(sc)
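With the __main__ block gone, bittorrent-tracker.py now just builds a twisted.application Application, which is the shape twistd expects (for example, something like twistd -noy bittorrent-tracker.py runs it in the foreground). Note that the TCPServer above listens on a hard-coded 8080, while the tracker's own 'port' option defaults to 80 and 'bind' is never consulted; below is a sketch of feeding the parsed config back into the service instead, assuming Tracker keeps exposing its config dict as its __init__ does today:

    # Assumed variant: honour the tracker's --port and --bind options
    # instead of the hard-coded 8080 above.
    i = internet.TCPServer(tracker.config['port'], site,
                           interface=tracker.config['bind'])
    i.setServiceParent(sc)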