summaryrefslogtreecommitdiffstats
path: root/BitTorrent/RawServer_twisted.py
diff options
context:
space:
mode:
Diffstat (limited to 'BitTorrent/RawServer_twisted.py')
-rw-r--r--BitTorrent/RawServer_twisted.py771
1 files changed, 0 insertions, 771 deletions
diff --git a/BitTorrent/RawServer_twisted.py b/BitTorrent/RawServer_twisted.py
deleted file mode 100644
index 53fc000..0000000
--- a/BitTorrent/RawServer_twisted.py
+++ /dev/null
@@ -1,771 +0,0 @@
-# The contents of this file are subject to the BitTorrent Open Source License
-# Version 1.1 (the License). You may not copy or use this file, in either
-# source code or executable form, except in compliance with the License. You
-# may obtain a copy of the License at http://www.bittorrent.com/license/.
-#
-# Software distributed under the License is distributed on an AS IS basis,
-# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
-# for the specific language governing rights and limitations under the
-# License.
-
-# Written by Greg Hazel
-
-import os
-import sys
-import socket
-import signal
-import struct
-import thread
-import threading
-from cStringIO import StringIO
-from traceback import print_exc, print_stack
-
-from BitTorrent import BTFailure, WARNING, CRITICAL, FAQ_URL
-
-from twisted.python import threadable
-# needed for twisted 1.3, otherwise the thread safe functions are not thread safe
-threadable.init(1)
-
-noSignals = True
-
-if os.name == 'nt':
- try:
- from twisted.internet import iocpreactor
- iocpreactor.proactor.install()
- noSignals = False
- except:
- # just as limited (if not more) as select, and also (supposedly) buggy
- #try:
- # from twisted.internet import win32eventreactor
- # win32eventreactor.install()
- #except:
- # pass
- pass
-else:
- try:
- from twisted.internet import kqreactor
- kqreactor.install()
- except:
- try:
- from twisted.internet import pollreactor
- pollreactor.install()
- except:
- pass
-
-#the default reactor is select-based, and will be install()ed if another has not
-from twisted.internet import reactor, task, error
-
-# as far as I know, we work with twisted 1.3 and >= 2.0
-#import twisted.copyright
-#if twisted.copyright.version.split('.') < 2:
-# raise ImportError(_("RawServer_twisted requires twisted 2.0.0 or greater"))
-
-from twisted.internet.protocol import DatagramProtocol, Protocol, Factory, ClientFactory
-from twisted.protocols.policies import TimeoutMixin
-
-NOLINGER = struct.pack('ii', 1, 0)
-
-class Handler(object):
-
- # there is only a semantic difference between "made" and "started".
- # I prefer "started"
- def connection_started(self, s):
- self.connection_made(s)
- def connection_made(self, s):
- pass
-
- def connection_lost(self, s):
- pass
-
- # Maybe connection_lost should just have a default 'None' exception parameter
- def connection_failed(self, addr, exception):
- pass
-
- def connection_flushed(self, s):
- pass
- def data_came_in(self, addr, datagram):
- pass
-
-class ConnectionWrapper(object):
- def __init__(self, rawserver, handler, context, tos=0):
- self.dying = False
- self.ip = None
- self.port = None
- self.transport = None
- self.reset_timeout = None
- self.callback_connection = None
-
- self.post_init(rawserver, handler, context)
-
- self.tos = tos
-
- self.buffer = OutputBuffer(self)
-
- def post_init(self, rawserver, handler, context):
- self.rawserver = rawserver
- self.handler = handler
- self.context = context
- if self.rawserver:
- self.rawserver.single_sockets[self] = self
-
- def get_socket(self):
- s = None
- try:
- s = self.transport.getHandle()
- except:
- try:
- # iocpreactor doesn't implement ISystemHandle like it should
- s = self.transport.socket
- except:
- pass
- return s
-
- def attach_transport(self, callback_connection, transport, reset_timeout):
- self.transport = transport
- self.callback_connection = callback_connection
- self.reset_timeout = reset_timeout
-
- try:
- address = self.transport.getPeer()
- except:
- try:
- # udp, for example
- address = self.transport.getHost()
- except:
- if not self.transport.__dict__.has_key("state"):
- self.transport.state = 'NO STATE!'
- sys.stderr.write('UNKNOWN HOST/PEER: %s:%s:%s\n' % (str(self.transport), str(self.transport.state), str(self.handler)))
- print_stack()
- # fallback incase the unknown happens,
- # there's no use raising an exception
- address = ('unknown', -1)
- pass
-
- try:
- self.ip = address.host
- self.port = address.port
- except:
- #unix sockets, for example
- pass
-
- if self.tos != 0:
- s = self.get_socket()
-
- try:
- s.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, self.tos)
- except:
- pass
-
- def sendto(self, packet, flags, addr):
- # all this can go away once we pin down the bug
- if not hasattr(self.transport, "listening"):
- self.rawserver.errorfunc(WARNING, "UDP port never setup properly when asked to write")
- elif not self.transport.listening:
- self.rawserver.errorfunc(WARNING, "UDP port cleaned up already when asked to write")
-
- ret = None
- try:
- ret = self.transport.write(packet, addr)
- except Exception, e:
- self.rawserver.errorfunc(WARNING, "UDP sendto failed: %s" % str(e))
-
- return ret
-
- def write(self, b):
- self.buffer.add(b)
-
- def _flushed(self):
- s = self
- #why do you tease me so?
- if s.handler is not None:
- #calling flushed from the write is bad form
- self.rawserver.add_task(s.handler.connection_flushed, 0, (s,))
-
- def is_flushed(self):
- return self.buffer.is_flushed()
-
- def shutdown(self, how):
- if how == socket.SHUT_WR:
- self.transport.loseWriteConnection()
- self.buffer.stopWriting()
- elif how == socket.SHUT_RD:
- self.transport.stopListening()
- else:
- self.close()
-
- def close(self):
- self.buffer.stopWriting()
-
- if self.rawserver.config['close_with_rst']:
- try:
- s = self.get_socket()
- s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, NOLINGER)
- except:
- pass
-
- if self.rawserver.udp_sockets.has_key(self):
- # udp connections should only call stopListening
- self.transport.stopListening()
- else:
- self.transport.loseConnection()
-
- def _cleanup(self):
-
- self.buffer.connection = None
- del self.buffer
-
- self.handler = None
-
- del self.transport
-
- if self.callback_connection:
- if self.callback_connection.can_timeout:
- self.callback_connection.setTimeout(None)
- self.callback_connection.connection = None
- del self.callback_connection
-
-
-
-class OutputBuffer(object):
-
- def __init__(self, connection):
- self.connection = connection
- self.consumer = None
- self._buffer = StringIO()
-
- def is_flushed(self):
- return (self._buffer.tell() == 0)
-
- def add(self, b):
- # sometimes we get strings, sometimes we get buffers. ugg.
- if (isinstance(b, buffer)):
- b = str(b)
- self._buffer.write(b)
-
- if self.consumer is None:
- self.beginWriting()
-
- def beginWriting(self):
- self.stopWriting()
- self.consumer = self.connection.transport
- self.consumer.registerProducer(self, False)
-
- def stopWriting(self):
- if self.consumer is not None:
- self.consumer.unregisterProducer()
- self.consumer = None
-
- def resumeProducing(self):
- if self.consumer is None:
- return
-
- if self._buffer.tell() > 0:
- self.consumer.write(self._buffer.getvalue())
- self._buffer.seek(0)
- self._buffer.truncate(0)
- self.connection._flushed()
- else:
- self.stopWriting()
-
-
- def pauseProducing(self):
- pass
-
- def stopProducing(self):
- pass
-
-class CallbackConnection(object):
-
- def attachTransport(self, transport, connection, *args):
- s = connection
- if s is None:
- s = ConnectionWrapper(*args)
-
- s.attach_transport(self, transport=transport, reset_timeout=self.optionalResetTimeout)
- self.connection = s
-
- def connectionMade(self):
- s = self.connection
- s.handler.connection_started(s)
- self.optionalResetTimeout()
-
- def connectionLost(self, reason):
- reactor.callLater(0, self.post_connectionLost, reason)
-
- #twisted api inconsistancy workaround
- #sometimes connectionLost is called (not fired) from inside write()
- def post_connectionLost(self, reason):
- #print "Connection Lost", str(reason).split(":")[-1]
- #hack to try and dig up the connection if one was ever made
- if not self.__dict__.has_key("connection"):
- self.connection = self.factory.connection
- if self.connection is not None:
- self.factory.rawserver._remove_socket(self.connection)
-
- def dataReceived(self, data):
- self.optionalResetTimeout()
-
- s = self.connection
- s.rawserver._make_wrapped_call(s.handler.data_came_in,
- (s, data), s)
-
- def datagramReceived(self, data, (host, port)):
- s = self.connection
- s.rawserver._make_wrapped_call(s.handler.data_came_in,
- ((host, port), data), s)
-
- def connectionRefused(self):
- s = self.connection
- dns = (s.ip, s.port)
- reason = _("connection refused")
-
- #print "Connection Refused"
-
- if not s.dying:
- # this might not work - reason is not an exception
- s.handler.connection_failed(dns, reason)
-
- s.dying = True
-
- s.rawserver._remove_socket(s)
-
- def optionalResetTimeout(self):
- if self.can_timeout:
- self.resetTimeout()
-
-class CallbackProtocol(CallbackConnection, TimeoutMixin, Protocol):
-
- def makeConnection(self, transport):
- if isinstance(self.factory, OutgoingConnectionFactory):
- self.factory.rawserver._remove_pending_connection(self.factory.addr)
- self.can_timeout = 1
- self.setTimeout(self.factory.rawserver.config['socket_timeout'])
- self.attachTransport(transport, self.factory.connection, *self.factory.connection_args)
- Protocol.makeConnection(self, transport)
-
-class CallbackDatagramProtocol(CallbackConnection, DatagramProtocol):
-
- def startProtocol(self):
- self.can_timeout = 0
- self.attachTransport(self.transport, self.connection, ())
- DatagramProtocol.startProtocol(self)
-
- def connectionRefused(self):
- # we don't use these at all for udp, so skip the CallbackConnection
- DatagramProtocol.connectionRefused(self)
-
-class OutgoingConnectionFactory(ClientFactory):
-
- def clientConnectionFailed(self, connector, reason):
- #print "Client connection failed", str(reason).split(":")[-1]
- peer = connector.getDestination()
- dns = (peer.host, peer.port)
-
- s = self.connection
- # opt-out
- if not s.dying:
- # this might not work - reason is not an exception
- s.handler.connection_failed(dns, reason)
-
- s.dying = True
-
- self.rawserver._remove_pending_connection(peer.host)
- self.rawserver._remove_socket(self.connection)
-
-def UnimplementedWarning(msg):
- #ok, I think people get the message
- #print "UnimplementedWarning: " + str(msg)
- pass
-
-#Encoder calls stop_listening(socket) then socket.close()
-#to us they mean the same thing, so swallow the second call
-class CloseSwallower:
- def close(self):
- pass
-
-#storage for socket creation requestions, and proxy once the connection is made
-class SocketProxy(object):
- def __init__(self, port, bind, reuse, tos, protocol):
- self.port = port
- self.bind = bind
- self.reuse = reuse
- self.tos = tos
- self.protocol = protocol
- self.connection = None
-
- def __getattr__(self, name):
- try:
- return getattr(self.connection, name)
- except:
- raise AttributeError, name
-
-def default_error_handler(level, message):
- print message
-
-class RawServerMixin(object):
-
- def __init__(self, doneflag, config, noisy=True,
- errorfunc=default_error_handler, tos=0):
- self.doneflag = doneflag
- self.noisy = noisy
- self.errorfunc = errorfunc
- self.config = config
- self.tos = tos
- self.ident = thread.get_ident()
-
- def _make_wrapped_call(self, function, args, wrapper=None, context=None):
- try:
- function(*args)
- except KeyboardInterrupt:
- raise
- except Exception, e: # hopefully nothing raises strings
- # Incoming sockets can be assigned to a particular torrent during
- # a data_came_in call, and it's possible (though not likely) that
- # there could be a torrent-specific exception during the same call.
- # Therefore read the context after the call.
- if wrapper is not None:
- context = wrapper.context
- if self.noisy and context is None:
- data = StringIO()
- print_exc(file=data)
- data.seek(-1)
- self.errorfunc(CRITICAL, data.read())
- if context is not None:
- context.got_exception(e)
-
- # must be called from the main thread
- def install_sigint_handler(self):
- signal.signal(signal.SIGINT, self._handler)
-
- def _handler(self, signum, frame):
- self.external_add_task(self.doneflag.set, 0)
- # Allow pressing ctrl-c multiple times to raise KeyboardInterrupt,
- # in case the program is in an infinite loop
- signal.signal(signal.SIGINT, signal.default_int_handler)
-
-def _sl(x):
- if hasattr(x, "__len__"):
- return str(len(x))
- else:
- return str(x)
-
-class RawServer(RawServerMixin):
-
- def __init__(self, doneflag, config, noisy=True,
- errorfunc=default_error_handler, tos=0):
- RawServerMixin.__init__(self, doneflag, config, noisy, errorfunc, tos)
-
- self.listening_handlers = {}
- self.single_sockets = {}
- self.udp_sockets = {}
- self.live_contexts = {None : 1}
- self.listened = 0
-
- # for connection rate limiting
- self.pending_sockets = {}
- # this can go away when urllib does
- self.pending_sockets_lock = threading.Lock()
-
- #l2 = task.LoopingCall(self._print_connection_count)
- #l2.start(10)
-
- def _log(self, msg):
- f = open("connections.txt", "a")
- print str(msg)
- f.write(str(msg) + "\n")
- f.close()
-
- def _print_connection_count(self):
- c = len(self.single_sockets)
- u = len(self.udp_sockets)
- c -= u
- #s = "Connections(" + str(id(self)) + "): tcp(" + str(c) + ") upd(" + str(u) + ")"
- #self._log(s)
-
- d = dict()
- for s in self.single_sockets:
- state = "None"
- if not s.dying and s.transport:
- try:
- state = s.transport.state
- except:
- state = "has transport"
- else:
- state = "No transport"
- if not d.has_key(state):
- d[state] = 0
- d[state] += 1
- self._log(d)
-
- sizes = "ps(" + _sl(self.pending_sockets)
- sizes += ") lh(" + _sl(self.listening_handlers)
- sizes += ") ss(" + _sl(self.single_sockets)
- sizes += ") us(" + _sl(self.udp_sockets)
- sizes += ") lc(" + _sl(self.live_contexts) + ")"
- self._log(sizes)
-
- def add_context(self, context):
- self.live_contexts[context] = 1
-
- def remove_context(self, context):
- del self.live_contexts[context]
-
- def autoprune(self, f, *a, **kw):
- if self.live_contexts.has_key(kw['context']):
- f(*a, **kw)
-
- def add_task(self, func, delay, args=(), context=None):
- assert thread.get_ident() == self.ident
- assert type(args) == list or type(args) == tuple
-
- #we're going to check again later anyway
- #if self.live_contexts.has_key(context):
- reactor.callLater(delay, self.autoprune, self._make_wrapped_call,
- func, args, context=context)
-
- def external_add_task(self, func, delay, args=(), context=None):
- assert type(args) == list or type(args) == tuple
- reactor.callFromThread(self.add_task, func, delay, args, context)
-
- def create_unixserversocket(filename):
- s = SocketProxy(0, filename, True, 0, 'tcp')
- s.factory = Factory()
-
- if s.reuse == False:
- UnimplementedWarning(_("You asked for reuse to be off when binding. Sorry, I can't do that."))
-
- listening_port = reactor.listenUNIX(s.bind, s.factory)
- listening_port.listening = 1
- s.listening_port = listening_port
-
- return s
- create_unixserversocket = staticmethod(create_unixserversocket)
-
- def create_serversocket(port, bind='', reuse=False, tos=0):
- s = SocketProxy(port, bind, reuse, tos, 'tcp')
- s.factory = Factory()
-
- if s.reuse == False:
- UnimplementedWarning(_("You asked for reuse to be off when binding. Sorry, I can't do that."))
-
- try:
- listening_port = reactor.listenTCP(s.port, s.factory, interface=s.bind)
- except error.CannotListenError, e:
- if e[0] != 0:
- raise e.socketError
- else:
- raise
- listening_port.listening = 1
- s.listening_port = listening_port
-
- return s
- create_serversocket = staticmethod(create_serversocket)
-
- def create_udpsocket(port, bind='', reuse=False, tos=0):
- s = SocketProxy(port, bind, reuse, tos, 'udp')
- s.protocol = CallbackDatagramProtocol()
- c = ConnectionWrapper(None, None, None, tos)
- s.connection = c
- s.protocol.connection = c
-
- if s.reuse == False:
- UnimplementedWarning(_("You asked for reuse to be off when binding. Sorry, I can't do that."))
-
- try:
- listening_port = reactor.listenUDP(s.port, s.protocol, interface=s.bind)
- except error.CannotListenError, e:
- raise e.socketError
- listening_port.listening = 1
- s.listening_port = listening_port
-
- return s
- create_udpsocket = staticmethod(create_udpsocket)
-
- def create_multicastsocket(port, bind='', reuse=False, tos=0):
- s = SocketProxy(port, bind, reuse, tos, 'udp')
- s.protocol = CallbackDatagramProtocol()
- c = ConnectionWrapper(None, None, None, tos)
- s.connection = c
- s.protocol.connection = c
-
- if s.reuse == False:
- UnimplementedWarning("You asked for reuse to be off when binding. Sorry, I can't do that.")
-
- try:
- listening_port = reactor.listenMulticast(s.port, s.protocol, interface=s.bind)
- except error.CannotListenError, e:
- raise e.socketError
- listening_port.listening = 1
- s.listening_port = listening_port
-
- return s
- create_multicastsocket = staticmethod(create_multicastsocket)
-
- def start_listening(self, serversocket, handler, context=None):
- s = serversocket
- s.factory.rawserver = self
- s.factory.protocol = CallbackProtocol
- s.factory.connection = None
- s.factory.connection_args = (self, handler, context, serversocket.tos)
-
- if not s.listening_port.listening:
- s.listening_port.startListening()
- s.listening_port.listening = 1
-
- self.listening_handlers[s] = s.listening_port
-
- #provides a harmless close() method
- s.connection = CloseSwallower()
-
- def start_listening_udp(self, serversocket, handler, context=None):
- s = serversocket
-
- c = s.connection
- c.post_init(self, handler, context)
-
- if not s.listening_port.listening:
- s.listening_port.startListening()
- s.listening_port.listening = 1
-
- self.listening_handlers[serversocket] = s.listening_port
-
- self.udp_sockets[c] = c
-
- def start_listening_multicast(self, serversocket, handler, context=None):
- s = serversocket
-
- c = s.connection
- c.post_init(self, handler, context)
-
- if not s.listening_port.listening:
- s.listening_port.startListening()
- s.listening_port.listening = 1
-
- self.listening_handlers[serversocket] = s.listening_port
-
- self.udp_sockets[c] = c
-
- def stop_listening(self, serversocket):
- listening_port = self.listening_handlers[serversocket]
- try:
- listening_port.stopListening()
- listening_port.listening = 0
- except error.NotListeningError:
- pass
- del self.listening_handlers[serversocket]
-
- def stop_listening_udp(self, serversocket):
- listening_port = self.listening_handlers[serversocket]
- listening_port.stopListening()
- del self.listening_handlers[serversocket]
- del self.udp_sockets[serversocket.connection]
- del self.single_sockets[serversocket.connection]
-
- def stop_listening_multicast(self, serversocket):
- self.stop_listening_udp(serversocket)
-
- def start_connection(self, dns, handler, context=None, do_bind=True):
- addr = dns[0]
- port = int(dns[1])
-
- self._add_pending_connection(addr)
-
- bindaddr = None
- if do_bind:
- bindaddr = self.config['bind']
- if bindaddr and len(bindaddr) >= 0:
- bindaddr = (bindaddr, 0)
- else:
- bindaddr = None
-
- factory = OutgoingConnectionFactory()
- # maybe this can be had from elsewhere
- factory.addr = addr
- factory.protocol = CallbackProtocol
- factory.rawserver = self
-
- c = ConnectionWrapper(self, handler, context, self.tos)
-
- factory.connection = c
- factory.connection_args = ()
-
- connector = reactor.connectTCP(addr, port, factory, bindAddress=bindaddr)
-
- self.single_sockets[c] = c
- return c
-
- def _add_pending_connection(self, addr):
- # the XP connection rate limiting is unique at the IP level
- assert isinstance(addr, str)
- self.pending_sockets_lock.acquire()
- self.__add_pending_connection(addr)
- self.pending_sockets_lock.release()
-
- def __add_pending_connection(self, addr):
- if addr not in self.pending_sockets:
- self.pending_sockets[addr] = 1
- else:
- self.pending_sockets[addr] += 1
-
- def _remove_pending_connection(self, addr):
- self.pending_sockets_lock.acquire()
- self.__remove_pending_connection(addr)
- self.pending_sockets_lock.release()
-
- def __remove_pending_connection(self, addr):
- self.pending_sockets[addr] -= 1
- if self.pending_sockets[addr] <= 0:
- del self.pending_sockets[addr]
-
- def async_start_connection(self, dns, handler, context=None, do_bind=True):
-
- # the XP connection rate limiting is unique at the IP level
- addr = dns[0]
- if (len(self.pending_sockets) >= self.config['max_incomplete'] and
- addr not in self.pending_sockets):
- return False
-
- self.start_connection(dns, handler, context, do_bind)
- return True
-
-
- def listen_forever(self):
- self.ident = thread.get_ident()
- if self.listened:
- UnimplementedWarning(_("listen_forever() should only be called once per reactor."))
- self.listened = 1
-
- l = task.LoopingCall(self.stop)
- l.start(1)#, now = False)
-
- if noSignals:
- reactor.run(installSignalHandlers=False)
- else:
- reactor.run()
-
- def listen_once(self, period=1e9):
- UnimplementedWarning(_("listen_once() might not return until there is activity, and might not process the event you want. Use listen_forever()."))
- reactor.iterate(period)
-
- def stop(self):
- if (self.doneflag.isSet()):
-
- for connection in self.single_sockets.values():
- try:
- #I think this still sends all the data
- connection.close()
- except:
- pass
-
- reactor.suggestThreadPoolSize(0)
- reactor.stop()
-
- def _remove_socket(self, s):
- # opt-out
- if not s.dying:
- self._make_wrapped_call(s.handler.connection_lost, (s,), s)
-
- s._cleanup()
-
- del self.single_sockets[s]
- \ No newline at end of file