summaryrefslogtreecommitdiffstats
path: root/khashmir/krpc.py
diff options
context:
space:
mode:
Diffstat (limited to 'khashmir/krpc.py')
-rw-r--r--khashmir/krpc.py240
1 files changed, 240 insertions, 0 deletions
diff --git a/khashmir/krpc.py b/khashmir/krpc.py
new file mode 100644
index 0000000..86c9973
--- /dev/null
+++ b/khashmir/krpc.py
@@ -0,0 +1,240 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+from BitTorrent.defer import Deferred
+from BitTorrent.bencode import bencode, bdecode
+import socket
+from BitTorrent.RawServer_magic import Handler
+from BitTorrent.platform import bttime
+import time
+from math import log10
+
+import sys
+from traceback import print_exc
+
+from khash import distance
+from cache import Cache
+from KRateLimiter import KRateLimiter
+from hammerlock import Hammerlock
+
+from const import *
+
+# commands
+TID = 't'
+REQ = 'q'
+RSP = 'r'
+TYP = 'y'
+ARG = 'a'
+ERR = 'e'
+
+class KRPCFailSilently(Exception):
+ pass
+
+class KRPCProtocolError(Exception):
+ pass
+
+class KRPCServerError(Exception):
+ pass
+
+class KRPCSelfNodeError(Exception):
+ pass
+
+class hostbroker(Handler):
+ def __init__(self, server, addr, transport, call_later, max_ul_rate, config, rlcount):
+ self.server = server
+ self.addr = addr
+ self.transport = transport
+ self.rltransport = KRateLimiter(transport, max_ul_rate, call_later, rlcount, config['max_rate_period'])
+ self.call_later = call_later
+ self.connections = Cache(touch_on_access=True)
+ self.hammerlock = Hammerlock(100, call_later)
+ self.expire_connections(loop=True)
+ self.config = config
+ if not self.config.has_key('pause'):
+ self.config['pause'] = False
+
+ def expire_connections(self, loop=False):
+ self.connections.expire(bttime() - KRPC_CONNECTION_CACHE_TIME)
+ if loop:
+ self.call_later(self.expire_connections, KRPC_CONNECTION_CACHE_TIME, (True,))
+
+ def data_came_in(self, addr, datagram):
+ #if addr != self.addr:
+ if not self.config['pause'] and self.hammerlock.check(addr):
+ c = self.connectionForAddr(addr)
+ c.datagramReceived(datagram, addr)
+
+ def connection_lost(self, socket):
+ ## this is like, bad
+ print ">>> connection lost!", socket
+
+ def connectionForAddr(self, addr):
+ if addr == self.addr:
+ raise KRPCSelfNodeError()
+ if not self.connections.has_key(addr):
+ conn = KRPC(addr, self.server, self.transport, self.rltransport, self.call_later)
+ self.connections[addr] = conn
+ else:
+ conn = self.connections[addr]
+ return conn
+
+
+## connection
+class KRPC:
+ noisy = 0
+ def __init__(self, addr, server, transport, rltransport, call_later):
+ self.call_later = call_later
+ self.transport = transport
+ self.rltransport = rltransport
+ self.factory = server
+ self.addr = addr
+ self.tids = {}
+ self.mtid = 0
+ self.pinging = False
+
+ def sendErr(self, addr, tid, code, msg):
+ ## send error
+ out = bencode({TID:tid, TYP:ERR, ERR :(code, msg)})
+ olen = len(out)
+ self.rltransport.sendto(out, 0, addr)
+ return olen
+
+ def datagramReceived(self, str, addr):
+ # bdecode
+ try:
+ msg = bdecode(str)
+ except Exception, e:
+ if self.noisy:
+ print "response decode error: " + `e`, `str`
+ else:
+ #if self.noisy:
+ # print msg
+ # look at msg type
+ if msg[TYP] == REQ:
+ ilen = len(str)
+ # if request
+ # tell factory to handle
+ f = getattr(self.factory ,"krpc_" + msg[REQ], None)
+ msg[ARG]['_krpc_sender'] = self.addr
+ if f and callable(f):
+ try:
+ ret = apply(f, (), msg[ARG])
+ except KRPCFailSilently:
+ pass
+ except KRPCServerError, e:
+ olen = self.sendErr(addr, msg[TID], 202, "Server Error: %s" % e.args[0])
+ except KRPCProtocolError, e:
+ olen = self.sendErr(addr, msg[TID], 204, "Protocol Error: %s" % e.args[0])
+ except Exception, e:
+ print_exc(20)
+ olen = self.sendErr(addr, msg[TID], 202, "Server Error")
+ else:
+ if ret:
+ # make response
+ out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
+ else:
+ out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
+ # send response
+ olen = len(out)
+ self.rltransport.sendto(out, 0, addr)
+
+ else:
+ if self.noisy:
+ #print "don't know about method %s" % msg[REQ]
+ pass
+ # unknown method
+ olen = self.sendErr(addr, msg[TID], *KERR_METHOD_UNKNOWN)
+ if self.noisy:
+ try:
+ ndist = 10 * log10(2**160 * 1.0 / distance(self.factory.node.id, msg[ARG]['id']))
+ ndist = int(ndist)
+ except OverflowError:
+ ndist = 999
+
+ h = None
+ if msg[ARG].has_key('target'):
+ h = msg[ARG]['target']
+ elif msg[ARG].has_key('info_hash'):
+ h = msg[ARG]['info_hash']
+ else:
+ tdist = '-'
+
+ if h != None:
+ try:
+ tdist = 10 * log10(2**160 * 1.0 / distance(self.factory.node.id, h))
+ tdist = int(tdist)
+ except OverflowError:
+ tdist = 999
+
+ t = time.localtime()
+ t = "%2d-%2d-%2d %2d:%2d:%2d" % (t[0], t[1], t[2], t[3], t[4], t[5])
+ print "%s %s %s >>> %s - %s %s %s - %s %s" % (t,
+ msg[ARG]['id'].encode('base64')[:4],
+ addr,
+ self.factory.node.port,
+ ilen,
+ msg[REQ],
+ olen,
+ ndist,
+ tdist)
+ elif msg[TYP] == RSP:
+ # if response
+ # lookup tid
+ if self.tids.has_key(msg[TID]):
+ df = self.tids[msg[TID]]
+ # callback
+ del(self.tids[msg[TID]])
+ df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
+ else:
+ # no tid, this transaction timed out already...
+ pass
+
+ elif msg[TYP] == ERR:
+ # if error
+ # lookup tid
+ if self.tids.has_key(msg[TID]):
+ df = self.tids[msg[TID]]
+ # callback
+ df.errback(msg[ERR])
+ del(self.tids[msg[TID]])
+ else:
+ # day late and dollar short
+ pass
+ else:
+ # unknown message type
+ df = self.tids[msg[TID]]
+ # callback
+ df.errback((KRPC_ERROR_RECEIVED_UNKNOWN, _("received unknown message type")))
+ del(self.tids[msg[TID]])
+
+ def sendRequest(self, method, args):
+ # make message
+ # send it
+ msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args}
+ self.mtid = (self.mtid + 1) % 256
+ s = bencode(msg)
+ d = Deferred()
+ self.tids[msg[TID]] = d
+ self.call_later(self.timeOut, KRPC_TIMEOUT, (msg[TID],))
+ self.call_later(self._send, 0, (s, d))
+ return d
+
+ def timeOut(self, id):
+ if self.tids.has_key(id):
+ df = self.tids[id]
+ del(self.tids[id])
+ df.errback((KRPC_ERROR_TIMEOUT, _("timeout")))
+
+ def _send(self, s, d):
+ try:
+ self.transport.sendto(s, 0, self.addr)
+ except socket.error:
+ d.errback((KRPC_SOCKET_ERROR, _("socket error")))
+