summaryrefslogtreecommitdiffstats
path: root/BitTorrent/RawServer.py
diff options
context:
space:
mode:
Diffstat (limited to 'BitTorrent/RawServer.py')
-rw-r--r--BitTorrent/RawServer.py501
1 files changed, 0 insertions, 501 deletions
diff --git a/BitTorrent/RawServer.py b/BitTorrent/RawServer.py
deleted file mode 100644
index efa067d..0000000
--- a/BitTorrent/RawServer.py
+++ /dev/null
@@ -1,501 +0,0 @@
-# The contents of this file are subject to the BitTorrent Open Source License
-# Version 1.1 (the License). You may not copy or use this file, in either
-# source code or executable form, except in compliance with the License. You
-# may obtain a copy of the License at http://www.bittorrent.com/license/.
-#
-# Software distributed under the License is distributed on an AS IS basis,
-# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
-# for the specific language governing rights and limitations under the
-# License.
-
-# Written by Bram Cohen, Uoti Urpala
-
-import os
-import sys
-import socket
-import signal
-import struct
-import thread
-from bisect import insort
-from cStringIO import StringIO
-from traceback import print_exc
-from errno import EWOULDBLOCK, ENOBUFS, EINTR
-
-from BitTorrent.platform import bttime
-from BitTorrent import WARNING, CRITICAL, FAQ_URL
-from BitTorrent.defer import Deferred
-
-try:
- from select import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP
- timemult = 1000
-except ImportError:
- from BitTorrent.selectpoll import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP
- timemult = 1
-
-NOLINGER = struct.pack('ii', 1, 0)
-
-
-class Handler(object):
-
- # there is only a semantic difference between "made" and "started".
- # I prefer "started"
- def connection_started(self, s):
- self.connection_made(s)
- def connection_made(self, s):
- pass
-
- def connection_lost(self, s):
- pass
-
- # Maybe connection_lost should just have a default 'None' exception parameter
- def connection_failed(self, addr, exception):
- pass
-
- def connection_flushed(self, s):
- pass
- def data_came_in(self, addr, datagram):
- pass
-
-
-class SingleSocket(object):
-
- def __init__(self, rawserver, sock, handler, context, addr=None):
- self.rawserver = rawserver
- self.socket = sock
- self.handler = handler
- self.buffer = []
- self.last_hit = bttime()
- self.fileno = sock.fileno()
- self.connected = False
- self.context = context
- self.ip = None
- self.port = None
-
-
- if isinstance(addr, basestring):
- # UNIX socket, not really ip
- self.ip = addr
- else:
- peername = (None, None)
- try:
- peername = self.socket.getpeername()
- except socket.error, e:
- # UDP raises (107, 'Transport endpoint is not connected')
- # but so can a TCP socket we just got from start_connection,
- # in which case addr is set and we use it later.
- if (e[0] == 107) and (addr == None):
- # lies.
- # the peer endpoint should be gathered from the
- # tuple passed to data_came_in
- try:
- peername = self.socket.getsockname()
- except socket.error, e:
- pass
-
- # this is awesome!
- # max prefers a value over None, so this is a common case:
- # max(('ip', None), ('ip', 1234)) => ('ip', 1234)
- # or the default case:
- # max(('ip', None), None) => ('ip', None)
- self.ip, self.port = max(peername, addr)
-
-
- def close(self):
- sock = self.socket
- self.socket = None
- self.buffer = []
- del self.rawserver.single_sockets[self.fileno]
- self.rawserver.poll.unregister(sock)
- self.handler = None
- if self.rawserver.config['close_with_rst']:
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, NOLINGER)
- sock.close()
-
- def shutdown(self, val):
- self.socket.shutdown(val)
-
- def is_flushed(self):
- return len(self.buffer) == 0
-
- def write(self, s):
- assert self.socket is not None
- self.buffer.append(s)
- if len(self.buffer) == 1:
- self.try_write()
-
- def try_write(self):
- if self.connected:
- try:
- while self.buffer != []:
- amount = self.socket.send(self.buffer[0])
- if amount != len(self.buffer[0]):
- if amount != 0:
- self.buffer[0] = self.buffer[0][amount:]
- break
- del self.buffer[0]
- except socket.error, e:
- code, msg = e
- if code != EWOULDBLOCK:
- self.rawserver.dead_from_write.append(self)
- return
- if self.buffer == []:
- self.rawserver.poll.register(self.socket, POLLIN)
- else:
- self.rawserver.poll.register(self.socket, POLLIN | POLLOUT)
-
-
-def default_error_handler(level, message):
- print message
-
-class RawServer(object):
-
- def __init__(self, doneflag, config, noisy=True,
- errorfunc=default_error_handler, tos=0):
- self.config = config
- self.tos = tos
- self.poll = poll()
- # {socket: SingleSocket}
- self.single_sockets = {}
- self.udp_sockets = {}
- self.dead_from_write = []
- self.doneflag = doneflag
- self.noisy = noisy
- self.errorfunc = errorfunc
- self.funcs = []
- self.externally_added_tasks = []
- self.listening_handlers = {}
- self.serversockets = {}
- self.live_contexts = {None : True}
- self.ident = thread.get_ident()
- self.to_start = []
- self.add_task(self.scan_for_timeouts, config['timeout_check_interval'])
- if sys.platform.startswith('win'):
- # Windows doesn't support pipes with select(). Just prevent sleeps
- # longer than a second instead of proper wakeup for now.
- self.wakeupfds = (None, None)
- self._wakeup()
- else:
- self.wakeupfds = os.pipe()
- self.poll.register(self.wakeupfds[0], POLLIN)
-
- def _wakeup(self):
- self.add_task(self._wakeup, 1)
-
- def add_context(self, context):
- self.live_contexts[context] = True
-
- def remove_context(self, context):
- del self.live_contexts[context]
- self.funcs = [x for x in self.funcs if x[3] != context]
-
- def add_task(self, func, delay, args=(), context=None):
- assert thread.get_ident() == self.ident
- assert type(args) == list or type(args) == tuple
- if context in self.live_contexts:
- insort(self.funcs, (bttime() + delay, func, args, context))
-
- def external_add_task(self, func, delay, args=(), context=None):
- assert type(args) == list or type(args) == tuple
- self.externally_added_tasks.append((func, delay, args, context))
- # Wake up the RawServer thread in case it's sleeping in poll()
- if self.wakeupfds[1] is not None:
- os.write(self.wakeupfds[1], 'X')
-
- def scan_for_timeouts(self):
- self.add_task(self.scan_for_timeouts,
- self.config['timeout_check_interval'])
- t = bttime() - self.config['socket_timeout']
- tokill = []
- for s in [s for s in self.single_sockets.values() if s not in self.udp_sockets.keys()]:
- if s.last_hit < t:
- tokill.append(s)
- for k in tokill:
- if k.socket is not None:
- self._close_socket(k)
-
- def create_unixserversocket(filename):
- server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- server.setblocking(0)
- server.bind(filename)
- server.listen(5)
- return server
- create_unixserversocket = staticmethod(create_unixserversocket)
-
- def create_serversocket(port, bind='', reuse=False, tos=0):
- server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- if reuse and os.name != 'nt':
- server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server.setblocking(0)
- if tos != 0:
- try:
- server.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, tos)
- except:
- pass
- server.bind((bind, port))
- server.listen(5)
- return server
- create_serversocket = staticmethod(create_serversocket)
-
- def create_udpsocket(port, bind='', reuse=False, tos=0):
- server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- if reuse and os.name != 'nt':
- server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server.setblocking(0)
- if tos != 0:
- try:
- server.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, tos)
- except:
- pass
- server.bind((bind, port))
- return server
- create_udpsocket = staticmethod(create_udpsocket)
-
- def start_listening(self, serversocket, handler, context=None):
- self.listening_handlers[serversocket.fileno()] = (handler, context)
- self.serversockets[serversocket.fileno()] = serversocket
- self.poll.register(serversocket, POLLIN)
-
- def start_listening_udp(self, serversocket, handler, context=None):
- self.listening_handlers[serversocket.fileno()] = (handler, context)
- nss = SingleSocket(self, serversocket, handler, context)
- self.single_sockets[serversocket.fileno()] = nss
- self.udp_sockets[nss] = 1
- self.poll.register(serversocket, POLLIN)
-
- def stop_listening(self, serversocket):
- del self.listening_handlers[serversocket.fileno()]
- del self.serversockets[serversocket.fileno()]
- self.poll.unregister(serversocket)
-
- def stop_listening_udp(self, serversocket):
- del self.listening_handlers[serversocket.fileno()]
- del self.single_sockets[serversocket.fileno()]
- l = [s for s in self.udp_sockets.keys() if s.socket == serversocket]
- del self.udp_sockets[l[0]]
- self.poll.unregister(serversocket)
-
- def start_connection(self, dns, handler=None, context=None, do_bind=True):
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setblocking(0)
- bindaddr = do_bind and self.config['bind']
- if bindaddr:
- sock.bind((bindaddr, 0))
- if self.tos != 0:
- try:
- sock.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, self.tos)
- except:
- pass
- try:
- sock.connect_ex(dns)
- except socket.error:
- sock.close()
- raise
- except Exception, e:
- sock.close()
- raise socket.error(str(e))
- self.poll.register(sock, POLLIN)
- s = SingleSocket(self, sock, handler, context, dns)
- self.single_sockets[sock.fileno()] = s
- return s
-
- def _add_pending_connection(self, addr):
- pass
-
- def _remove_pending_connection(self, addr):
- pass
-
- def async_start_connection(self, dns, handler=None, context=None, do_bind=True):
- self.to_start.insert(0, (dns, handler, context, do_bind))
- self._start_connection()
- return True
-
- def _start_connection(self):
- dns, handler, context, do_bind = self.to_start.pop()
- try:
- s = self.start_connection(dns, handler, context, do_bind)
- except Exception, e:
- handler.connection_failed(dns, e)
- else:
- handler.connection_started(s)
-
- def wrap_socket(self, sock, handler, context=None, ip=None, port=None):
- sock.setblocking(0)
- self.poll.register(sock, POLLIN)
- s = SingleSocket(self, sock, handler, context, (ip, port))
- self.single_sockets[sock.fileno()] = s
- return s
-
- # must be called from the main thread
- def install_sigint_handler(self):
- signal.signal(signal.SIGINT, self._handler)
-
- def _handler(self, signum, frame):
- self.external_add_task(self.doneflag.set, 0)
- # Allow pressing ctrl-c multiple times to raise KeyboardInterrupt,
- # in case the program is in an infinite loop
- signal.signal(signal.SIGINT, signal.default_int_handler)
-
- def _handle_events(self, events):
- for sock, event in events:
- if sock in self.serversockets:
- s = self.serversockets[sock]
- if event & (POLLHUP | POLLERR) != 0:
- try:
- self.poll.unregister(s)
- s.close()
- except socket.error, e:
- self.errorfunc(WARNING, _("failed to unregister or close server socket: %s") % str(e))
- self.errorfunc(CRITICAL, _("lost server socket"))
- else:
- handler, context = self.listening_handlers[sock]
- try:
- newsock, addr = s.accept()
- except socket.error, e:
- continue
- try:
- newsock.setblocking(0)
- nss = SingleSocket(self, newsock, handler, context, addr)
- self.single_sockets[newsock.fileno()] = nss
- self.poll.register(newsock, POLLIN)
- self._make_wrapped_call(handler. \
- connection_made, (nss,), context=context)
- except socket.error, e:
- self.errorfunc(WARNING,
- _("Error handling accepted connection: ") +
- str(e))
- else:
- s = self.single_sockets.get(sock)
- if s is None:
- if sock == self.wakeupfds[0]:
- # Another thread wrote this just to wake us up.
- os.read(sock, 1)
- continue
- s.connected = True
- if event & POLLERR:
- self._close_socket(s)
- continue
- if event & (POLLIN | POLLHUP):
- s.last_hit = bttime()
- try:
- data, addr = s.socket.recvfrom(100000)
- except socket.error, e:
- code, msg = e
- if code != EWOULDBLOCK:
- self._close_socket(s)
- continue
- if data == '' and not self.udp_sockets.has_key(s):
- self._close_socket(s)
- else:
- if not self.udp_sockets.has_key(s):
- self._make_wrapped_call(s.handler.data_came_in,
- (s, data), s)
- else:
- self._make_wrapped_call(s.handler.data_came_in,
- (addr, data), s)
-
- # data_came_in could have closed the socket (s.socket = None)
- if event & POLLOUT and s.socket is not None:
- s.try_write()
- if s.is_flushed():
- self._make_wrapped_call(s.handler.connection_flushed,
- (s,), s)
-
- def _pop_externally_added(self):
- while self.externally_added_tasks:
- task = self.externally_added_tasks.pop(0)
- self.add_task(*task)
-
- def listen_forever(self):
- ret = 0
- self.ident = thread.get_ident()
- while not self.doneflag.isSet() and not ret:
- ret = self.listen_once()
-
- def listen_once(self, period=1e9):
- try:
- self._pop_externally_added()
- if self.funcs:
- period = self.funcs[0][0] - bttime()
- if period < 0:
- period = 0
- events = self.poll.poll(period * timemult)
- if self.doneflag.isSet():
- return 0
- while self.funcs and self.funcs[0][0] <= bttime():
- garbage, func, args, context = self.funcs.pop(0)
- self._make_wrapped_call(func, args, context=context)
- self._close_dead()
- self._handle_events(events)
- if self.doneflag.isSet():
- return 0
- self._close_dead()
- except error, e:
- if self.doneflag.isSet():
- return 0
- # I can't find a coherent explanation for what the behavior
- # should be here, and people report conflicting behavior,
- # so I'll just try all the possibilities
- code = None
- if hasattr(e, '__getitem__'):
- code = e[0]
- else:
- code = e
- if code == ENOBUFS:
- # log the traceback so we can see where the exception is coming from
- print_exc(file = sys.stderr)
- self.errorfunc(CRITICAL,
- _("Have to exit due to the TCP stack flaking "
- "out. Please see the FAQ at %s") % FAQ_URL)
- return -1
- elif code in (EINTR,):
- # add other ignorable error codes here
- pass
- else:
- self.errorfunc(CRITICAL, str(e))
- return 0
- except KeyboardInterrupt:
- print_exc()
- return -1
- except:
- data = StringIO()
- print_exc(file=data)
- self.errorfunc(CRITICAL, data.getvalue())
- return 0
-
- def _make_wrapped_call(self, function, args, socket=None, context=None):
- try:
- function(*args)
- except KeyboardInterrupt:
- raise
- except Exception, e: # hopefully nothing raises strings
- # Incoming sockets can be assigned to a particular torrent during
- # a data_came_in call, and it's possible (though not likely) that
- # there could be a torrent-specific exception during the same call.
- # Therefore read the context after the call.
- if socket is not None:
- context = socket.context
- if self.noisy and context is None:
- data = StringIO()
- print_exc(file=data)
- self.errorfunc(CRITICAL, data.getvalue())
- if context is not None:
- context.got_exception(e)
-
- def _close_dead(self):
- while len(self.dead_from_write) > 0:
- old = self.dead_from_write
- self.dead_from_write = []
- for s in old:
- if s.socket is not None:
- self._close_socket(s)
-
- def _close_socket(self, s):
- sock = s.socket.fileno()
- if self.config['close_with_rst']:
- s.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, NOLINGER)
- s.socket.close()
- self.poll.unregister(sock)
- del self.single_sockets[sock]
- s.socket = None
- self._make_wrapped_call(s.handler.connection_lost, (s,), s)
- s.handler = None