# someday: http://files.dns-sd.org/draft-nat-port-mapping.txt # today: http://www.upnp.org/ debug = False import sys import socket import os if os.name == 'nt': import pywintypes import win32com.client has_set = False try: # python 2.4 s = set() del s has_set = True except NameError: try: # python 2.3 from sets import Set set = Set has_set = True except ImportError: # python 2.2 pass from BitTorrent import app_name, defer from BitTorrent import INFO, WARNING, ERROR from BitTorrent.platform import os_version from BitTorrent.RawServer_magic import RawServer, Handler from BitTorrent.BeautifulSupe import BeautifulSupe, Tag from urllib2 import URLError, HTTPError, Request #bleh from urllib import urlopen, FancyURLopener, addinfourl from httplib import HTTPResponse import threading import Queue import urlparse import random from traceback import print_stack, print_tb, print_exc def UnsupportedWarning(logfunc, s): logfunc(WARNING, "NAT Traversal warning " + ("(%s: %s)." % (os_version, s))) def UPNPError(logfunc, s): logfunc(ERROR, "UPnP ERROR: " + ("(%s: %s)." % (os_version, s))) class UPnPException(Exception): pass __host_ip = None import thread def get_host_ip(): global __host_ip if __host_ip is not None: return __host_ip #try: # ip = socket.gethostbyname(socket.gethostname()) #except socket.error, e: # mac sometimes throws an error, so they can just wait. # plus, complicated /etc/hosts will return invalid IPs try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(("bittorrent.com", 80)) endpoint = s.getsockname() __host_ip = endpoint[0] except socket.error, e: __host_ip = socket.gethostbyname(socket.gethostname()) return __host_ip class InfoFileHandle(object): def __init__(self, logfunc): self.logfunc = logfunc def write(self, s): self.logfunc(INFO, s) class NATEventLoop(threading.Thread): def __init__(self, logfunc): threading.Thread.__init__(self) self.log = InfoFileHandle(logfunc) self.queue = Queue.Queue() self.killswitch = 0 self.setDaemon(True) def run(self): while (self.killswitch == 0): event = self.queue.get() try: (f, a, kw) = event f(*a, **kw) except: # sys can be none during interpritter shutdown if sys is None: break # this prints the whole thing. print_exc(file = self.log) # this prints just the traceback to the application log #print_tb(sys.exc_info()[2], file = self.log) # this just prints the exception #self.logfunc(INFO, str(sys.exc_info()[0]) + ": " + str(sys.exc_info()[1].__str__())) class NatTraverser(object): def __init__(self, rawserver, logfunc): self.rawserver = rawserver def log_severity_filter(level, s, optional=True): global debug if level >= ERROR or debug or not optional: logfunc(level, s) self.logfunc = log_severity_filter self.register_requests = [] self.unregister_requests = [] self.list_requests = [] self.service = None self.services = [] self.current_service = 0 if self.rawserver.config['upnp']: if os.name == 'nt': self.services.append(WindowsUPnP) self.services.append(ManualUPnP) self.event_loop = NATEventLoop(self.logfunc) self.event_loop.start() self.resume_init_services() def add_task(self, f, *a, **kw): self.event_loop.queue.put((f, a, kw)) def init_services(self): # this loop is a little funny so a service can resume the init if it fails later if not self.rawserver.config['upnp']: return while self.current_service < len(self.services): service = self.services[self.current_service] self.current_service += 1 try: self.logfunc(INFO, ("Trying: %s" % service.__name__)) service(self) break except Exception, e: self.logfunc(WARNING, str(e)) else: UnsupportedWarning(self.logfunc, "Unable to map a port using any service.") def resume_init_services(self): self.add_task(self.init_services) def attach_service(self, service): self.logfunc(INFO, ("Using: %s" % type(service).__name__)) self.service = service self.add_task(self._flush_queue) def detach_service(self, service): if service != self.service: self.logfunc(ERROR, ("Service: %s is not in use!" % type(service).__name__)) return self.logfunc(INFO, ("Detached: %s" % type(service).__name__)) self.service = None def _flush_queue(self): if self.service: for mapping in self.register_requests: self.add_task(self.service.safe_register_port, mapping) self.register_requests = [] for request in self.unregister_requests: # unregisters can block, because they occur at shutdown self.service.unregister_port(*request) self.unregister_requests = [] for request in self.list_requests: self.add_task(self._list_ports, request) self.list_requests = [] def register_port(self, external_port, internal_port, protocol, host = None, service_name = None): mapping = UPnPPortMapping(external_port, internal_port, protocol, host, service_name) self.register_requests.append(mapping) self.add_task(self._flush_queue) return mapping.d def unregister_port(self, external_port, protocol): self.unregister_requests.append((external_port, protocol)) # unregisters can block, because they occur at shutdown self._flush_queue() def _list_ports(self, d): matches = self.service._list_ports() d.callback(matches) def list_ports(self): d = defer.Deferred() self.list_requests.append(d) self.add_task(self._flush_queue) return d class NATBase(object): def __init__(self, logfunc): self.logfunc = logfunc self.log = InfoFileHandle(logfunc) def safe_register_port(self, new_mapping): # check for the host now, while we're in the thread and before # we need to read it. new_mapping.populate_host() self.logfunc(INFO, "You asked for: " + str(new_mapping)) new_mapping.original_external_port = new_mapping.external_port mappings = self._list_ports() used_ports = [] for mapping in mappings: #only consider ports which match the same protocol if mapping.protocol == new_mapping.protocol: # look for exact matches if (mapping.host == new_mapping.host and mapping.internal_port == new_mapping.internal_port): # the service name could not match, that's ok. new_mapping.d.callback(mapping.external_port) self.logfunc(INFO, "Already effectively mapped: " + str(new_mapping), optional=False) return # otherwise, add it to the list of used external ports used_ports.append(mapping.external_port) if has_set: used_ports = set(used_ports) if (not has_set) or (len(used_ports) < 1000): # for small sets we can just guess around a little while new_mapping.external_port in used_ports: new_mapping.external_port += random.randint(1, 10) # maybe this happens, I really doubt it if new_mapping.external_port > 65535: new_mapping.external_port = 1025 else: # for larger sets we don't want to guess forever all_ports = set(range(1024, 65535)) free_ports = all_ports - used_ports new_mapping.external_port = random.choice(free_ports) self.logfunc(INFO, "I'll give you: " + str(new_mapping)) self.register_port(new_mapping) def register_port(self, port): pass def unregister_port(self, external_port, protocol): pass def _list_ports(self): pass class UPnPPortMapping(object): def __init__(self, external_port, internal_port, protocol, host = None, service_name = None): self.external_port = int(external_port) self.internal_port = int(internal_port) self.protocol = protocol self.host = host if service_name: self.service_name = service_name else: self.service_name = app_name self.d = defer.Deferred() def populate_host(self): # throw out '' or None or ints, also look for semi-valid IPs if (not isinstance(self.host, str)) or (self.host.count('.') < 3): self.host = get_host_ip() def __str__(self): return "%s %s external:%d %s:%d" % (self.service_name, self.protocol, self.external_port, self.host, self.internal_port) def VerifySOAPResponse(request, response): if response.code != 200: raise HTTPError(request.get_full_url(), response.code, str(response.msg) + " (unexpected SOAP response code)", response.info(), response) data = response.read() bs = BeautifulSupe(data) soap_response = bs.scour("m:", "Response") if not soap_response: # maybe I should read the SOAP spec. soap_response = bs.scour("u:", "Response") if not soap_response: raise HTTPError(request.get_full_url(), response.code, str(response.msg) + " (incorrect SOAP response method)", response.info(), response) return soap_response[0] def SOAPResponseToDict(soap_response): result = {} for tag in soap_response.child_elements(): value = None if tag.contents: value = str(tag.contents[0]) result[tag.name] = value return result def SOAPErrorToString(response): if not isinstance(response, Exception): data = response.read() bs = BeautifulSupe(data) error = bs.first('errorDescription') if error: return str(error.contents[0]) return str(response) _urlopener = None def urlopen_custom(req, rawserver): global _urlopener if not _urlopener: opener = FancyURLopener() _urlopener = opener #remove User-Agent del _urlopener.addheaders[:] if not isinstance(req, str): #for header in r.headers: # _urlopener.addheaders.append((header, r.headers[header])) #return _urlopener.open(r.get_full_url(), r.data) # All this has to be done manually, since httplib and urllib 1 and 2 # add headers to the request that some routers do not accept. # A minimal, functional request includes the headers: # Content-Length # Soapaction # I have found the following to be specifically disallowed: # User-agent # Connection # Accept-encoding s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(req.get_full_url()) if not scheme.startswith("http"): raise ValueError("UPnP URL scheme is not http: " + req.get_full_url()) if len(path) == 0: path = '/' if netloc.count(":") > 0: host, port = netloc.split(':', 1) try: port = int(port) except: raise ValueError("UPnP URL port is not int: " + req.get_full_url()) else: host = netloc port = 80 header_str = '' data = '' method = '' header_str = " " + path + " HTTP/1.0\r\n" if req.has_data(): method = 'POST' header_str = method + header_str header_str += "Content-Length: " + str(len(req.data)) + "\r\n" data = req.data + "\r\n" else: method = 'GET' header_str = method + header_str header_str += "Host: " + host + ":" + str(port) + "\r\n" for header in req.headers: header_str += header + ": " + str(req.headers[header]) + "\r\n" header_str += "\r\n" data = header_str + data try: rawserver._add_pending_connection(host) s.connect((host, port)) finally: rawserver._remove_pending_connection(host) s.send(data) r = HTTPResponse(s, method=method) r.begin() r.recv = r.read fp = socket._fileobject(r) resp = addinfourl(fp, r.msg, req.get_full_url()) resp.code = r.status resp.msg = r.reason return resp return _urlopener.open(req) class ManualUPnP(NATBase, Handler): upnp_addr = ('239.255.255.250', 1900) search_string = ('M-SEARCH * HTTP/1.1\r\n' + 'Host:239.255.255.250:1900\r\n' + 'ST:urn:schemas-upnp-org:device:InternetGatewayDevice:1\r\n' + 'Man:"ssdp:discover"\r\n' + 'MX:3\r\n' + '\r\n') # if you think for one second that I'm going to implement SOAP in any fashion, you're crazy get_mapping_template = ('' + '' + '' + '' + '%d' + '' + '' + '') add_mapping_template = ('' + '' + '' + '' + '1' + '' + '0' + '%d' + '%d' + '%s' + '%s' + '%s' + '' + '' + '') delete_mapping_template = ('' + '' + '' + '' + '' + '%d' + '%s' + '' + '' + '') def _pretify(self, body): # I actually found a router that needed one tag per line body = body.replace('><', '>\r\n<') body = body.encode('utf-8') return body def _build_get_mapping_request(self, pmi): body = (self.get_mapping_template % (pmi)) body = self._pretify(body) headers = {'SOAPAction': '"urn:schemas-upnp-org:service:WANIPConnection:1#' + 'GetGenericPortMappingEntry"'} return Request(self.controlURL, body, headers) def _build_add_mapping_request(self, mapping): body = (self.add_mapping_template % (mapping.internal_port, mapping.external_port, mapping.protocol, mapping.host, mapping.service_name)) body = self._pretify(body) headers = {'SOAPAction': '"urn:schemas-upnp-org:service:WANIPConnection:1#' + 'AddPortMapping"'} return Request(self.controlURL, body, headers) def _build_delete_mapping_request(self, external_port, protocol): body = (self.delete_mapping_template % (external_port, protocol)) body = self._pretify(body) headers = {'SOAPAction': '"urn:schemas-upnp-org:service:WANIPConnection:1#' + 'DeletePortMapping"'} return Request(self.controlURL, body, headers) def __init__(self, traverser): NATBase.__init__(self, traverser.logfunc) self.controlURL = None self.transport = None self.traverser = traverser self.rawserver = traverser.rawserver # this service can only be provided if rawserver supports multicast if not hasattr(self.rawserver, "create_multicastsocket"): raise AttributeError, "rawserver.create_multicastsocket" self.rawserver.external_add_task(self.begin_discovery, 0, ()) def begin_discovery(self): # bind to an available port, and join the multicast group for p in xrange(self.upnp_addr[1], self.upnp_addr[1]+5000): try: # Original RawServer cannot do this! s = self.rawserver.create_multicastsocket(p, get_host_ip()) self.transport = s self.rawserver.start_listening_multicast(s, self) s.listening_port.joinGroup(self.upnp_addr[0], socket.INADDR_ANY) break except socket.error, e: pass if not self.transport: # resume init services, because we couldn't bind to a port self.traverser.resume_init_services() else: self.transport.sendto(self.search_string, 0, self.upnp_addr) self.transport.sendto(self.search_string, 0, self.upnp_addr) self.rawserver.add_task(self._discovery_timedout, 6, ()) def _discovery_timedout(self): if self.transport: self.logfunc(WARNING, "Discovery timed out") self.rawserver.stop_listening_multicast(self.transport) self.transport = None # resume init services, because we know we've failed self.traverser.resume_init_services() def register_port(self, mapping): request = self._build_add_mapping_request(mapping) try: response = urlopen_custom(request, self.rawserver) response = VerifySOAPResponse(request, response) mapping.d.callback(mapping.external_port) self.logfunc(INFO, "registered: " + str(mapping), optional=False) except Exception, e: #HTTPError, URLError, BadStatusLine, you name it. error = SOAPErrorToString(e) mapping.d.errback(error) def unregister_port(self, external_port, protocol): request = self._build_delete_mapping_request(external_port, protocol) try: response = urlopen_custom(request, self.rawserver) response = VerifySOAPResponse(request, response) self.logfunc(INFO, ("unregisterd: %s, %s" % (external_port, protocol)), optional=False) except Exception, e: #HTTPError, URLError, BadStatusLine, you name it. error = SOAPErrorToString(e) self.logfunc(ERROR, error) def data_came_in(self, addr, datagram): if self.transport is None: return statusline, response = datagram.split('\r\n', 1) httpversion, statuscode, reasonline = statusline.split(None, 2) if (not httpversion.startswith('HTTP')) or (statuscode != '200'): return headers = response.split('\r\n') location = None for header in headers: prefix = 'location:' if header.lower().startswith(prefix): location = header[len(prefix):] location = location.strip() if location: self.rawserver.stop_listening_multicast(self.transport) self.transport = None self.traverser.add_task(self._got_location, location) def _got_location(self, location): if self.controlURL is not None: return URLBase = location data = urlopen_custom(location, self.rawserver).read() bs = BeautifulSupe(data) URLBase_tag = bs.first('URLBase') if URLBase_tag and URLBase_tag.contents: URLBase = str(URLBase_tag.contents[0]) wanservices = bs.fetch('service', dict(serviceType= 'urn:schemas-upnp-org:service:WANIPConnection:')) wanservices += bs.fetch('service', dict(serviceType= 'urn:schemas-upnp-org:service:WANPPPConnection:')) for service in wanservices: controlURL = service.get('controlURL') if controlURL: self.controlURL = urlparse.urljoin(URLBase, controlURL) break if self.controlURL is None: # resume init services, because we know we've failed self.traverser.resume_init_services() return # attach service, so the queue gets flushed self.traverser.attach_service(self) def _list_ports(self): mappings = [] index = 0 if self.controlURL is None: raise UPnPException("ManualUPnP is not prepared") while True: request = self._build_get_mapping_request(index) try: response = urlopen_custom(request, self.rawserver) soap_response = VerifySOAPResponse(request, response) results = SOAPResponseToDict(soap_response) mapping = UPnPPortMapping(results['NewExternalPort'], results['NewInternalPort'], results['NewProtocol'], results['NewInternalClient'], results['NewPortMappingDescription']) mappings.append(mapping) index += 1 except URLError, e: # SpecifiedArrayIndexInvalid, for example break except (HTTPError, BadStatusLine), e: self.logfunc(ERROR, ("list_ports failed with: %s" % (e))) return mappings class WindowsUPnPException(UPnPException): def __init__(self, msg, *args): msg += " (%s)" % os_version a = [msg] + list(args) UPnPException.__init__(self, *a) class WindowsUPnP(NATBase): def __init__(self, traverser): NATBase.__init__(self, traverser.logfunc) self.upnpnat = None self.port_collection = None self.traverser = traverser win32com.client.pythoncom.CoInitialize() try: self.upnpnat = win32com.client.Dispatch("HNetCfg.NATUPnP") except pywintypes.com_error, e: if (e[2][5] == -2147221005): raise WindowsUPnPException("invalid class string") else: raise try: self.port_collection = self.upnpnat.StaticPortMappingCollection if self.port_collection is None: raise WindowsUPnPException("none port_collection") except pywintypes.com_error, e: #if e[1].lower() == "exception occurred.": if (e[2][5] == -2147221164): #I think this is Class Not Registered #it happens on Windows 98 after the XP ICS wizard has been run raise WindowsUPnPException("exception occurred, class not registered") else: raise # attach service, so the queue gets flushed self.traverser.attach_service(self) def register_port(self, mapping): try: self.port_collection.Add(mapping.external_port, mapping.protocol, mapping.internal_port, mapping.host, True, mapping.service_name) self.logfunc(INFO, "registered: " + str(mapping), optional=False) mapping.d.callback(mapping.external_port) except pywintypes.com_error, e: # host == 'fake' or address already bound #if (e[2][5] == -2147024726): # host == '', or I haven't a clue #e.args[0] == -2147024894 #mapping.d.errback(e) # detach self so the queue isn't flushed self.traverser.detach_service(self) if hasattr(mapping, 'original_external_port'): mapping.external_port = mapping.original_external_port del mapping.original_external_port # push this mapping back on the queue self.traverser.register_requests.append(mapping) # resume init services, because we know we've failed self.traverser.resume_init_services() def unregister_port(self, external_port, protocol): try: self.port_collection.Remove(external_port, protocol) self.logfunc(INFO, ("unregisterd: %s, %s" % (external_port, protocol)), optional=False) except pywintypes.com_error, e: if (e[2][5] == -2147352567): UPNPError(self.logfunc, ("Port %d:%s not bound" % (external_port, protocol))) elif (e[2][5] == -2147221008): UPNPError(self.logfunc, ("Port %d:%s is bound and is not ours to remove" % (external_port, protocol))) elif (e[2][5] == -2147024894): UPNPError(self.logfunc, ("Port %d:%s not bound (2)" % (external_port, protocol))) else: raise def _list_ports(self): mappings = [] try: for mp in self.port_collection: mapping = UPnPPortMapping(mp.ExternalPort, mp.InternalPort, mp.Protocol, mp.InternalClient, mp.Description) mappings.append(mapping) except pywintypes.com_error, e: # it's the "for mp in self.port_collection" iter that can throw # an exception. # com_error: (-2147220976, 'The owner of the PerUser subscription is # not logged on to the system specified', # None, None) pass return mappings