diff options
Diffstat (limited to 'khashmir')
-rw-r--r-- | khashmir/KRateLimiter.py | 75 | ||||
-rw-r--r-- | khashmir/__init__.py | 0 | ||||
-rw-r--r-- | khashmir/actions.py | 349 | ||||
-rw-r--r-- | khashmir/cache.py | 52 | ||||
-rw-r--r-- | khashmir/const.py | 74 | ||||
-rw-r--r-- | khashmir/hammerlock.py | 37 | ||||
-rw-r--r-- | khashmir/inserter.py | 49 | ||||
-rw-r--r-- | khashmir/khash.py | 120 | ||||
-rw-r--r-- | khashmir/khashmir.py | 442 | ||||
-rw-r--r-- | khashmir/knet.py | 76 | ||||
-rw-r--r-- | khashmir/knode.py | 82 | ||||
-rw-r--r-- | khashmir/krpc.py | 240 | ||||
-rw-r--r-- | khashmir/kstore.py | 119 | ||||
-rw-r--r-- | khashmir/ktable.py | 338 | ||||
-rw-r--r-- | khashmir/node.py | 95 | ||||
-rw-r--r-- | khashmir/setup.py | 70 | ||||
-rw-r--r-- | khashmir/test.py | 21 | ||||
-rw-r--r-- | khashmir/test_khashmir.py | 166 | ||||
-rw-r--r-- | khashmir/test_krpc.py | 161 | ||||
-rw-r--r-- | khashmir/test_kstore.py | 91 | ||||
-rw-r--r-- | khashmir/unet.py | 84 | ||||
-rw-r--r-- | khashmir/util.py | 69 | ||||
-rw-r--r-- | khashmir/utkhashmir.py | 224 |
23 files changed, 3034 insertions, 0 deletions
# diff --git a/khashmir/KRateLimiter.py b/khashmir/KRateLimiter.py new file mode 100644 index 0000000..a9a9cdc --- /dev/null +++ b/khashmir/KRateLimiter.py @@ -0,0 +1,75 @@
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

from BitTorrent.platform import bttime as time
from BitTorrent.CurrentRateMeasure import Measure
from const import *
from random import randrange, shuffle
from traceback import print_exc


class KRateLimiter:
    """Rate-limited datagram sender that drops stale queued packets.

    Packets are queued by sendto() and flushed by run().  Entries that have
    been sitting in the queue for longer than ``self.age`` seconds are dropped;
    by default we toss anything that has less than 5 seconds to live
    (``KRPC_TIMEOUT - 5``).
    """

    def __init__(self, transport, rate, call_later, rlcount, rate_period,
                 age=(KRPC_TIMEOUT - 5)):
        """
        transport   -- object whose sendto(s, i, addr) actually sends
        rate        -- budget per second, in units of len(s); 0 means
                       "effectively unlimited" (replaced by 1e10)
        call_later  -- scheduler, called as call_later(func, delay)
        rlcount     -- callable invoked with the size of every packet sent
        rate_period -- passed through to Measure()
        age         -- maximum seconds a packet may wait in the queue
        """
        self.q = []
        self.transport = transport
        self.rate = rate
        self.curr = 0
        self.running = False
        self.age = age
        self.last = 0
        self.call_later = call_later
        self.rlcount = rlcount
        self.measure = Measure(rate_period)
        self.sent = self.dropped = 0
        if self.rate == 0:
            self.rate = 1e10

    def sendto(self, s, i, addr):
        """Queue a packet and kick the sender if it is not already scheduled."""
        self.q.append((time(), (s, i, addr)))
        if not self.running:
            self.run(check=True)

    def run(self, check=False):
        """Expire stale packets, send while budget allows, then reschedule."""
        t = time()
        self.expire(t)
        # pay off the debt accumulated since the last run
        self.curr -= (t - self.last) * self.rate
        self.last = t
        if check:
            # never bank more than one second's worth of credit
            self.curr = max(self.curr, 0 - self.rate)

        # send in random order so no queued packet is systematically favoured
        shuffle(self.q)
        while self.q and self.curr <= 0:
            x, tup = self.q.pop()
            size = len(tup[0])
            self.curr += size
            try:
                self.transport.sendto(*tup)
                self.sent += 1
                self.rlcount(size)
                self.measure.update_rate(size)
            except Exception:
                # best effort: a failed send must not stop the queue, but do
                # not swallow KeyboardInterrupt/SystemExit like a bare except
                if tup[2][1] != 0:
                    print(">>> sendto exception %s" % (tup,))
                    print_exc()
        # restore oldest-first order, which expire() relies on
        self.q.sort()
        if self.q or self.curr > 0:
            self.running = True
            # sleep for at least a half second
            self.call_later(self.run, max(self.curr / self.rate, 0.5))
        else:
            self.running = False

    def expire(self, t=None):
        """Drop queued packets older than ``self.age`` seconds relative to t.

        t defaults to the current time.  (The default used to be ``time()``
        in the signature, which is evaluated once at import and would have
        frozen "now" at module load.)
        """
        if t is None:
            t = time()
        if self.q:
            expire_time = t - self.age
            while self.q and self.q[0][0] < expire_time:
                self.q.pop(0)
                self.dropped += 1


# diff --git a/khashmir/__init__.py b/khashmir/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/khashmir/__init__.py
# diff --git a/khashmir/actions.py b/khashmir/actions.py new file mode 100644 index 0000000..2bbe94b --- /dev/null +++ b/khashmir/actions.py @@ -0,0 +1,349 @@
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

from BitTorrent.platform import bttime as time

import const

from khash import intify
from ktable import KTable, K
from util import unpackNodes
from krpc import KRPCProtocolError, KRPCSelfNodeError
from bisect import insort


class NodeWrap(object):
    """Pairs a node with the lookup target so wrappers sort by XOR distance."""

    def __init__(self, node, target):
        self.num = target
        self.node = node

    def _distance(self):
        # XOR distance between the wrapped node and the target id
        return self.num ^ self.node.num

    def __cmp__(self, o):
        """ this function is for sorting nodes relative to the ID we are looking for """
        # The original compared ``self.num ^ o.num`` (always 0 when both
        # wrappers share a target) with our own distance, so every wrapper
        # compared "less than" every other and insort() degenerated into
        # "insert at the front".  Compare the two nodes' distances instead.
        x, y = self._distance(), o.num ^ o.node.num
        if x > y:
            return 1
        elif x < y:
            return -1
        return 0

    def __lt__(self, o):
        # bisect.insort only needs "<"; this also keeps the ordering working
        # on interpreters that ignore __cmp__.
        return self._distance() < (o.num ^ o.node.num)


class ActionBase(object):
    """ base class for some long running asynchronous processes like finding nodes or values """

    def __init__(self, table, target, callback, callLater):
        # table:     object exposing .node, .Node() and .insertNode()
        # target:    20-byte id string being searched for / stored under
        # callback:  called with the result list when the action completes
        # callLater: scheduler, called as callLater(func, delay, args)
        self.table = table
        self.target = target
        self.callLater = callLater
        self.num = intify(target)
        self.found = {}
        self.foundq = []
        self.queried = {}
        self.queriedip = {}
        self.answered = {}
        self.callback = callback
        self.outstanding = 0
        self.finished = 0

    def sort(self, a, b):
        """ this function is for sorting nodes relative to the ID we are looking for """
        x, y = self.num ^ a.num, self.num ^ b.num
        if x > y:
            return 1
        elif x < y:
            return -1
        return 0

    def shouldQuery(self, node):
        """Return True once per (host, port)/id pair, never for our own node."""
        if node.id == self.table.node.id:
            return False
        elif (node.host, node.port) not in self.queriedip and node.id not in self.queried:
            self.queriedip[(node.host, node.port)] = 1
            self.queried[node.id] = 1
            return True
        return False

    def _cleanup(self):
        # drop references held by a finished action
        self.foundq = None
        self.found = None
        self.queried = None
        self.queriedip = None

    def goWithNodes(self, t):
        pass


FIND_NODE_TIMEOUT = 15


class FindNode(ActionBase):
    """ find node action merits it's own class as it is a long running stateful process """

    def handleGotNodes(self, dict):
        """Deferred callback: record the responder and queue the nodes it returned."""
        _krpc_sender = dict['_krpc_sender']
        dict = dict['rsp']
        sender = {'id': dict["id"]}
        sender['port'] = _krpc_sender[1]
        sender['host'] = _krpc_sender[0]
        sender = self.table.Node().initWithDict(sender)
        try:
            l = unpackNodes(dict.get("nodes", []))
            if sender.id not in self.answered:
                self.answered[sender.id] = sender
        except Exception:
            # malformed node list: ignore its contents and penalise the sender
            l = []
            # NOTE(review): StoreValue calls self.table.table.invalidateNode();
            # confirm that self.table itself exposes invalidateNode().
            self.table.invalidateNode(sender)

        if self.finished:
            # a day late and a dollar short
            return
        self.outstanding = self.outstanding - 1
        for node in l:
            n = self.table.Node().initWithDict(node)
            if n.id not in self.found:
                self.found[n.id] = n
                insort(self.foundq, NodeWrap(n, self.num))
                self.table.insertNode(n, contacted=0)
        self.schedule()

    def schedule(self):
        """
        send messages to new peers, if necessary
        """
        if self.finished:
            return
        l = [wrapper.node for wrapper in self.foundq[:K]]
        for node in l:
            if node.id == self.target:
                self.finished = 1
                return self.callback([node])
            if self.shouldQuery(node):
                #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
                try:
                    df = node.findNode(self.target, self.table.node.id)
                except KRPCSelfNodeError:
                    pass
                else:
                    df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                    self.outstanding = self.outstanding + 1
            if self.outstanding >= const.CONCURRENT_REQS:
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            ## all done!!
            self.finished = 1
            self._cleanup()
            self.callLater(self.callback, 0, (l[:K],))

    def makeMsgFailed(self, node):
        return self._defaultGotNodes

    def _defaultGotNodes(self, err):
        # errback: the request failed, free its slot and keep going
        self.outstanding = self.outstanding - 1
        self.schedule()

    def goWithNodes(self, nodes):
        """
        this starts the process, our argument is the list of nodes to start from
        """
        for node in nodes:
            if node.id == self.table.node.id:
                continue
            else:
                self.found[node.id] = node
                insort(self.foundq, NodeWrap(node, self.num))
        self.schedule()


get_value_timeout = 15


class GetValue(FindNode):
    """ get value task """

    def __init__(self, table, target, callback, callLater, find="findValue"):
        FindNode.__init__(self, table, target, callback, callLater)
        # name of the node method used to issue the query
        self.findValue = find

    def handleGotNodes(self, dict):
        """Deferred callback: follow closer nodes or report newly seen values."""
        _krpc_sender = dict['_krpc_sender']
        dict = dict['rsp']
        sender = {'id': dict["id"]}
        sender['port'] = _krpc_sender[1]
        sender['host'] = _krpc_sender[0]
        sender = self.table.Node().initWithDict(sender)

        if self.finished or sender.id in self.answered:
            # a day late and a dollar short
            return
        self.outstanding = self.outstanding - 1

        self.answered[sender.id] = sender
        # go through nodes
        # if we have any closer than what we already got, query them
        if 'nodes' in dict:
            try:
                l = unpackNodes(dict.get('nodes', []))
            except Exception:
                l = []
                del self.answered[sender.id]

            for node in l:
                n = self.table.Node().initWithDict(node)
                if n.id not in self.found:
                    self.table.insertNode(n)
                    self.found[n.id] = n
                    insort(self.foundq, NodeWrap(n, self.num))
        elif 'values' in dict:
            # report only values not seen before; every value is marked as
            # seen, and falsy values are never reported (as before)
            v = []
            for value in dict.get('values', []):
                if value not in self.results:
                    self.results[value] = 1
                    if value:
                        v.append(value)
            if v:
                self.callLater(self.callback, 0, (v,))
        self.schedule()

    ## get value
    def schedule(self):
        """Query up to CONCURRENT_REQS of the K best known nodes."""
        if self.finished:
            return
        for node in [wrapper.node for wrapper in self.foundq[:K]]:
            if self.shouldQuery(node):
                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
                try:
                    f = getattr(node, self.findValue)
                except AttributeError:
                    print(">>> findValue %s doesn't have a %s method!" % (node, self.findValue))
                else:
                    try:
                        df = f(self.target, self.table.node.id)
                        df.addCallback(self.handleGotNodes)
                        df.addErrback(self.makeMsgFailed(node))
                        self.outstanding = self.outstanding + 1
                        self.queried[node.id] = 1
                    except KRPCSelfNodeError:
                        pass
            if self.outstanding >= const.CONCURRENT_REQS:
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            ## all done, didn't find it!!
            self.finished = 1
            self._cleanup()
            self.callLater(self.callback, 0, ([],))

    ## get value
    def goWithNodes(self, nodes, found=None):
        """Start the search; ``found`` lists values the caller already has."""
        self.results = {}
        if found:
            for n in found:
                self.results[n] = 1
        for node in nodes:
            if node.id == self.table.node.id:
                continue
            else:
                self.found[node.id] = node
                insort(self.foundq, NodeWrap(node, self.num))
        self.schedule()


class StoreValue(ActionBase):
    """Store a value on nodes until STORE_REDUNDANCY of them have answered."""

    def __init__(self, table, target, value, callback, callLater, store="storeValue"):
        ActionBase.__init__(self, table, target, callback, callLater)
        self.value = value
        self.stored = []
        # name of the node method used to issue the store
        self.store = store

    def storedValue(self, t, node):
        """Callback for a successful store on ``node``."""
        self.outstanding -= 1
        if self.finished:
            return
        self.stored.append(t)
        if len(self.stored) >= const.STORE_REDUNDANCY:
            self.finished = 1
            self.callback(self.stored)
        else:
            # only ask more nodes if the in-flight requests cannot reach
            # the redundancy target on their own
            if len(self.stored) + self.outstanding < const.STORE_REDUNDANCY:
                self.schedule()
        return t

    def storeFailed(self, t, node):
        """Errback for a failed store on ``node``."""
        self.outstanding -= 1
        if self.finished:
            return t
        self.schedule()
        return t

    def schedule(self):
        """Issue as many store requests as concurrency and redundancy allow."""
        if self.finished:
            return
        num = const.CONCURRENT_REQS - self.outstanding
        if num > const.STORE_REDUNDANCY - len(self.stored):
            num = const.STORE_REDUNDANCY - len(self.stored)
        if num == 0 and not self.finished:
            self.finished = 1
            self.callback(self.stored)
        while num > 0:
            try:
                # NOTE(review): goWithNodes() sorts ascending by distance, so
                # pop() takes the *farthest* remaining node first -- confirm
                # this is intended.
                node = self.nodes.pop()
            except IndexError:
                # out of candidates: finish now unless replies are pending
                if self.outstanding == 0:
                    self.finished = 1
                    self._cleanup()
                    self.callback(self.stored)
                return
            else:
                if not node.id == self.table.node.id:
                    try:
                        f = getattr(node, self.store)
                    except AttributeError:
                        print(">>> %s doesn't have a %s method!" % (node, self.store))
                    else:
                        try:
                            df = f(self.target, self.value, self.table.node.id)
                        except KRPCProtocolError:
                            self.table.table.invalidateNode(node)
                        except KRPCSelfNodeError:
                            pass
                        else:
                            df.addCallback(self.storedValue, (), {'node': node})
                            df.addErrback(self.storeFailed, (), {'node': node})
                            self.outstanding += 1
                            num -= 1

    def goWithNodes(self, nodes):
        """Start storing on ``nodes`` (sorted in place by distance to target)."""
        # cmp_to_key keeps honouring an overridden sort() while avoiding the
        # positional cmp argument that newer list.sort() no longer accepts
        from functools import cmp_to_key
        self.nodes = nodes
        self.nodes.sort(key=cmp_to_key(self.sort))
        self.schedule()


class GetAndStore(GetValue):
    """Look a key up and, if nothing was returned, store ``value`` under it."""

    def __init__(self, table, target, value, callback, storecallback, callLater,
                 find="findValue", store="storeValue"):
        self.store = store
        self.value = value
        self.cb2 = callback
        self.storecallback = storecallback

        def cb(res):
            self.cb2(res)
            if not res:
                n = StoreValue(self.table, self.target, self.value, self.doneStored,
                               self.callLater, self.store)
                # StoreValue sorts its node list in place, so hand it a list
                n.goWithNodes(list(self.answered.values()))
        GetValue.__init__(self, table, target, cb, callLater, find)

    def doneStored(self, dict):
        self.storecallback(dict)


class KeyExpirer:
    """Periodically expires store entries older than const.KE_AGE seconds."""

    def __init__(self, store, callLater):
        self.store = store
        self.callLater = callLater
        self.callLater(self.doExpire, const.KEINITIAL_DELAY)

    def doExpire(self):
        self.cut = time() - const.KE_AGE
        self.store.expire(self.cut)
        self.callLater(self.doExpire, const.KE_DELAY)


# diff --git a/khashmir/cache.py b/khashmir/cache.py new file mode 100644 index 0000000..ab4df30 --- /dev/null +++ b/khashmir/cache.py @@ -0,0 +1,52 @@
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).
# You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

from collections import deque

try:
    from BitTorrent.platform import bttime as time
except ImportError:
    # allow the cache to be used (and tested) outside the BitTorrent tree
    from time import time


class Cache:
    """Mapping whose entries can be expired by insertion time.

    ``data`` maps key -> (timestamp, value).  ``q`` is a queue of
    (timestamp, key, value) records, newest on the left, that expire()
    consumes from the right (oldest first).
    """

    def __init__(self, touch_on_access=False):
        # touch_on_access: when true, reading a key refreshes its timestamp
        self.data = {}
        # deque: O(1) insertion at the front, where list.insert(0, ...) was O(n)
        self.q = deque()
        self.touch = touch_on_access

    def __getitem__(self, key):
        """Return the value for key; raises KeyError if it is absent."""
        value = self.data[key][1]
        if self.touch:
            self[key] = value
        return value

    def __setitem__(self, key, value):
        t = time()
        self.data[key] = (t, value)
        self.q.appendleft((t, key, value))

    def __delitem__(self, key):
        # the matching queue record is left behind; expire() skips it
        del self.data[key]

    def __contains__(self, key):
        return key in self.data

    def has_key(self, key):
        return key in self.data

    def keys(self):
        return self.data.keys()

    def expire(self, expire_time):
        """Remove every entry whose timestamp is older than expire_time."""
        q = self.q
        while q and q[-1][0] < expire_time:
            t, key, value = q.pop()
            try:
                current_t, current_value = self.data[key]
            except KeyError:
                # already deleted explicitly
                continue
            # only delete if the key has not been rewritten/touched since
            # this queue record was created
            if current_value == value and current_t == t:
                del self.data[key]


# diff --git a/khashmir/const.py b/khashmir/const.py new file mode 100644 index 0000000..eabe0e8 --- /dev/null +++ b/khashmir/const.py @@ -0,0 +1,74 @@
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

# magic id to use before we know a peer's id
NULL_ID = 20 * '\0'

# Kademlia "K" constant, this should be an even number
K = 8

# SHA1 is 160 bits long
HASH_LENGTH = 160

# checkpoint every this many seconds
CHECKPOINT_INTERVAL = 60 * 5 # five minutes

# how often to find our own nodes
FIND_CLOSE_INTERVAL = 60 * 15 # fifteen minutes

### SEARCHING/STORING
# concurrent krpc calls per find node/value request!
CONCURRENT_REQS = K

# how many hosts to post to
STORE_REDUNDANCY = 3


### ROUTING TABLE STUFF
# how many times in a row a node can fail to respond before it's booted from the routing table
MAX_FAILURES = 3

# never ping a node more often than this
MIN_PING_INTERVAL = 60 * 15 # fifteen minutes

# refresh buckets that haven't been touched in this long
BUCKET_STALENESS = 60 * 15 # fifteen minutes


### KEY EXPIRER
# time before expirer starts running
KEINITIAL_DELAY = 15 # 15 seconds - to clean out old stuff in persistent db

# time between expirer runs
KE_DELAY = 60 * 5 # 5 minutes

# expire entries older than this
KE_AGE = 60 * 30 # 30 minutes


## krpc timeout, in seconds
KRPC_TIMEOUT = 20

## krpc errback codes
KRPC_ERROR = 1
KRPC_ERROR_METHOD_UNKNOWN = 2
KRPC_ERROR_RECEIVED_UNKNOWN = 3
KRPC_ERROR_TIMEOUT = 4
KRPC_SOCKET_ERROR = 5

KRPC_CONNECTION_CACHE_TIME = KRPC_TIMEOUT * 2


## krpc error response codes, as (code, message) pairs
KERR_ERROR = (201, "Generic Error")
KERR_SERVER_ERROR = (202, "Server Error")
KERR_PROTOCOL_ERROR = (203, "Protocol Error")
KERR_METHOD_UNKNOWN = (204, "Method Unknown")
KERR_INVALID_ARGS = (205, "Invalid Arguments")  # message was misspelled "Argements"
KERR_INVALID_TOKEN = (206, "Invalid Token")


# diff --git a/khashmir/hammerlock.py b/khashmir/hammerlock.py new file mode 100644 index 0000000..dd4b5a1 --- /dev/null +++ b/khashmir/hammerlock.py @@ -0,0 +1,37 @@
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).
# You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

INTERVAL = 60
PERIODS = 5


class Hammerlock:
    """Sliding-window hit counter used to throttle individual addresses.

    Hits are counted per address in ``periods`` buckets.  Every ``interval``
    (the delay handed to call_later) the oldest bucket is cleared and becomes
    the current one, so check() always sums the last ``periods`` intervals.
    """

    def __init__(self, rate, call_later, interval=INTERVAL, periods=PERIODS):
        """
        rate       -- check() fails once an address reaches this many hits
                      within the window
        call_later -- scheduler, called as call_later(func, delay)
        interval   -- delay between bucket rotations (default INTERVAL)
        periods    -- number of buckets in the window (default PERIODS)
        """
        self.rate = rate
        self.call_later = call_later
        self.interval = interval
        self.periods = periods
        self.curr = 0
        self.buckets = [{} for _ in range(periods)]
        self.call_later(self._cycle, interval)

    def _cycle(self):
        # advance to the next bucket, discarding whatever it held, and
        # re-arm the timer
        self.curr = (self.curr + 1) % self.periods
        self.buckets[self.curr] = {}
        self.call_later(self._cycle, self.interval)

    def check(self, addr):
        """Record a hit from addr; return True while it is below the rate.

        The hit is counted even when the answer is False.
        """
        current = self.buckets[self.curr]
        current[addr] = current.get(addr, 0) + 1
        total = sum(bucket.get(addr, 0) for bucket in self.buckets)
        return total < self.rate


# diff --git a/khashmir/inserter.py b/khashmir/inserter.py new file mode 100644 index 0000000..d45ff04 --- /dev/null +++ b/khashmir/inserter.py @@ -0,0 +1,49 @@
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

### generate a bunch of nodes that use a single contact point
usage = "usage: inserter.py <contact host> <contact port>"

from utkhashmir import UTKhashmir
from BitTorrent.RawServer_magic import RawServer
from BitTorrent.defaultargs import common_options, rare_options
from khashmir.khash import newID
from random import randrange
from threading import Event
import sys, os

from khashmir.krpc import KRPC
KRPC.noisy = 1

# number of completion callbacks received so far; module-level, so no
# ``global`` statement is needed (or, on newer interpreters, allowed) here
done = 0


def d(n):
    """Completion callback: count one more finished operation."""
    global done
    done = done + 1


def main(argv):
    """Join the DHT through one contact, then announce ten random keys."""
    global done
    if len(argv) != 2:
        # the usage string used to be defined but never shown
        print(usage)
        return 1
    host, port = argv
    x = UTKhashmir("", 22038, "/tmp/cgcgcgc")
    x.addContact(host, int(port))
    x.rawserver.listen_once()
    x.findCloseNodes(d)
    while not done:
        x.rawserver.listen_once()
    l = []
    for i in range(10):
        k = newID()
        v = randrange(10000, 20000)
        l.append((k, v))
        x.announcePeer(k, v, d)
    # NOTE(review): starting at 1 means the loop below exits after nine of
    # the ten announce callbacks -- confirm whether that slack is intended.
    done = 1
    while done < 10:
        x.rawserver.listen_once(1)
    for k, v in l:
        print(">>> %r %s" % (k, v))
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))


# diff --git a/khashmir/khash.py b/khashmir/khash.py new file mode 100644 index 0000000..2750a7d --- /dev/null +++ b/khashmir/khash.py @@ -0,0 +1,120 @@
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

try:
    # hashlib supersedes the long-deprecated ``sha`` module
    from hashlib import sha1 as sha
except ImportError:
    from sha import sha
from random import randint  # kept for importers; no longer used in this module
from binascii import hexlify, unhexlify

# prefer the optional ``entropy`` extension, otherwise use the OS random
# source (the old fallback built the seed from the non-cryptographic
# ``random`` module one character at a time)
try:
    from entropy import entropy
except ImportError:
    from os import urandom as entropy


def intify(hstr):
    """20 byte hash, big-endian -> long python integer"""
    assert len(hstr) == 20
    return int(hexlify(hstr), 16)


def stringify(num):
    """long int -> 20-character string (big-endian, zero padded on the left)"""
    digits = '%x' % num
    if len(digits) % 2 != 0:
        digits = '0' + digits
    raw = unhexlify(digits)
    return (20 - len(raw)) * b'\x00' + raw


def distance(a, b):
    """distance between two 160-bit hashes expressed as 20-character strings"""
    return intify(a) ^ intify(b)


def newID():
    """returns a new pseudorandom globally unique ID string"""
    h = sha()
    h.update(entropy(20))
    return h.digest()


def newIDInRange(min, max):
    """returns a pseudorandom ID string whose integer value is in [min, max)"""
    return stringify(randRange(min, max))


def randRange(min, max):
    """returns a pseudorandom integer in [min, max); requires min != max"""
    return min + intify(newID()) % (max - min)


def newTID():
    return randRange(-2**30, 2**30)


### Test Cases ###
import unittest


class NewID(unittest.TestCase):
    def testLength(self):
        self.assertEqual(len(newID()), 20)

    def testHundreds(self):
        for x in range(100):
            # was ``self.testLength`` without the call, which tested nothing
            self.testLength()


class Intify(unittest.TestCase):
    known = [(b'\0' * 20, 0),
             (b'\xff' * 20, 2**160 - 1),
             ]

    def testKnown(self):
        for str, value in self.known:
            self.assertEqual(intify(str), value)

    def testEndianessOnce(self):
        h = newID()
        while h[-1:] == b'\xff':
            h = newID()
        # bump the last byte by one; bytearray works on both str and bytes
        k = h[:-1] + bytes(bytearray([bytearray(h)[-1] + 1]))
        self.assertEqual(intify(k) - intify(h), 1)

    def testEndianessLots(self):
        for x in range(100):
            self.testEndianessOnce()


class Disantance(unittest.TestCase):
    known = [
        ((b"\0" * 20, b"\xff" * 20), 2**160 - 1),
        ((sha(b"foo").digest(), sha(b"foo").digest()), 0),
        ((sha(b"bar").digest(), sha(b"bar").digest()), 0)
    ]

    def testKnown(self):
        for pair, dist in self.known:
            self.assertEqual(distance(pair[0], pair[1]), dist)

    def testCommutitive(self):
        for i in range(100):
            x, y, z = newID(), newID(), newID()
            self.assertEqual(distance(x, y) ^ distance(y, z), distance(x, z))


class RandRange(unittest.TestCase):
    def testOnce(self):
        a = intify(newID())
        b = intify(newID())
        if a < b:
            c = randRange(a, b)
            self.assertEqual(a <= c < b, 1, "output out of range %d %d %d" % (b, c, a))
        else:
            c = randRange(b, a)
            assert b <= c < a, "output out of range %d %d %d" % (b, c, a)

    def testOneHundredTimes(self):
        for i in range(100):
            self.testOnce()


if __name__ == '__main__':
    unittest.main()


# diff --git a/khashmir/khashmir.py b/khashmir/khashmir.py new file mode 100644 index 0000000..d939f67 --- /dev/null +++ b/khashmir/khashmir.py @@ -0,0 +1,442 @@
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

import const
from socket import gethostbyname

from BitTorrent.platform import bttime as time

from sha import sha
import re
from BitTorrent.defaultargs import common_options, rare_options
from BitTorrent.RawServer_magic import RawServer

from ktable import KTable, K
from knode import *
from kstore import KStore
from khash import newID, newIDInRange

from util import packNodes
from actions import FindNode, GetValue, KeyExpirer, StoreValue
import krpc

import sys
import os
import traceback

from BitTorrent.bencode import bencode, bdecode

from BitTorrent.defer import Deferred
from random import randrange
from kstore import sample

from threading import Event, Thread

# Matches a complete dotted quad.  The pattern used to be unanchored at the
# end, so a host name such as "1.2.3.4.example.com" was mistaken for an IP.
ip_pat = re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$')


class KhashmirDBExcept(Exception):
    pass


def foo(bytes):
    """Default no-op ``rlcount`` callback."""
    pass


def _contactedFirst(entry):
    # sort key for (node, contacted) pairs: contacted entries come first
    if entry[1] == 1:
        return 0
    return 1


# this is the base class, has base functionality and find node, no key-value mappings
class KhashmirBase:
    _Node = KNodeBase

    def __init__(self, host, port, data_dir, rawserver=None, max_ul_rate=1024,
                 checkpoint=True, errfunc=None, rlcount=foo, config=None):
        """
        host, port  -- address the UDP socket is created on
        data_dir    -- directory holding the saved "routing_table" file
        rawserver   -- event loop to use; a private RawServer is built if omitted
        max_ul_rate -- passed to krpc.hostbroker
        checkpoint  -- schedule periodic findCloseNodes/checkpoint tasks
        rlcount     -- passed to krpc.hostbroker
        config      -- dict with 'pause' and 'max_rate_period'; defaults to
                       {'pause': False, 'max_rate_period': 20}
        """
        if rawserver:
            self.rawserver = rawserver
        else:
            self.flag = Event()
            d = dict([(x[0], x[1]) for x in common_options + rare_options])
            self.rawserver = RawServer(self.flag, d)
        self.max_ul_rate = max_ul_rate
        self.socket = None
        if config is None:
            # built per instance: a dict literal in the signature would be
            # shared (and mutated) across every instance
            config = {'pause': False, 'max_rate_period': 20}
        self.config = config
        self.setup(host, port, data_dir, rlcount, checkpoint)

    def setup(self, host, port, data_dir, rlcount, checkpoint=True):
        """Create the socket and transport, load saved state, start the timers."""
        self.host = host
        self.port = port
        self.ddir = data_dir
        self.store = KStore()
        self.pingcache = {}
        self.socket = self.rawserver.create_udpsocket(self.port, self.host, False)
        self.udp = krpc.hostbroker(self, (self.host, self.port), self.socket,
                                   self.rawserver.add_task, self.max_ul_rate,
                                   self.config, rlcount)
        self._load()
        self.rawserver.start_listening_udp(self.socket, self.udp)
        self.last = time()
        KeyExpirer(self.store, self.rawserver.add_task)
        self.refreshTable(force=1)
        if checkpoint:
            self.rawserver.add_task(self.findCloseNodes, 30, (lambda a: a, True))
            self.rawserver.add_task(self.checkpoint, 60, (1,))

    def Node(self):
        """Return a new, uninitialised node bound to this table."""
        n = self._Node(self.udp.connectionForAddr)
        n.table = self
        return n

    def __del__(self):
        # getattr: __init__ may have failed before self.socket was assigned
        if getattr(self, 'socket', None) is not None:
            self.rawserver.stop_listening_udp(self.socket)
            self.socket.close()

    def _load(self):
        """Load our id and routing table from disk, or start with a fresh id."""
        do_load = False
        try:
            # binary mode to match checkpoint(), and close the handle
            f = open(os.path.join(self.ddir, "routing_table"), 'rb')
            try:
                s = f.read()
            finally:
                f.close()
            dict = bdecode(s)
        except Exception:
            # missing or unreadable state: deliberately start from scratch
            id = newID()
        else:
            id = dict['id']
            do_load = True

        self.node = self._Node(self.udp.connectionForAddr).init(id, self.host, self.port)
        self.table = KTable(self.node)
        if do_load:
            self._loadRoutingTable(dict['rt'])

    def checkpoint(self, auto=0):
        """Save our id and routing table; reschedule itself when auto is true."""
        d = {}
        d['id'] = self.node.id
        d['rt'] = self._dumpRoutingTable()
        try:
            f = open(os.path.join(self.ddir, "routing_table"), 'wb')
            try:
                f.write(bencode(d))
            finally:
                f.close()
        except Exception as e:
            #XXX real error here
            print(">>> unable to dump routing table! %s" % str(e))

        if auto:
            self.rawserver.add_task(self.checkpoint,
                                    randrange(int(const.CHECKPOINT_INTERVAL * .9),
                                              int(const.CHECKPOINT_INTERVAL * 1.1)),
                                    (1,))

    def _loadRoutingTable(self, nodes):
        """
        load routing table nodes from database
        it's usually a good idea to call refreshTable(force=1) after loading the table
        """
        for rec in nodes:
            n = self.Node().initWithDict(rec)
            self.table.insertNode(n, contacted=0, nocheck=True)

    def _dumpRoutingTable(self):
        """
        save routing table nodes to the database
        """
        l = []
        for bucket in self.table.buckets:
            for node in bucket.l:
                l.append({'id': node.id, 'host': node.host, 'port': node.port,
                          'age': int(node.age)})
        return l

    def _addContact(self, host, port, callback=None):
        """
        ping this node and add the contact info to the table on pong!
        """
        n = self.Node().init(const.NULL_ID, host, port)
        try:
            self.sendPing(n, callback=callback)
        except krpc.KRPCSelfNodeError:
            # our own node
            pass

    def _senderNode(self, id, _krpc_sender):
        # build a node from the id and (host, port) of an incoming request
        sender = {'id': id}
        sender['host'] = _krpc_sender[0]
        sender['port'] = _krpc_sender[1]
        return self.Node().initWithDict(sender)

    #######
    #######  LOCAL INTERFACE    - use these methods!
    def addContact(self, ip, port, callback=None):
        """
        ping this node and add the contact info to the table on pong!

        ``ip`` may be a dotted quad or a host name; host names are resolved
        on a helper thread.  ``callback`` is called with no arguments after
        the ping completes or fails (it used to be silently dropped here).
        """
        if ip_pat.match(ip):
            self._addContact(ip, port, callback)
        else:
            def go(ip=ip, port=port, callback=callback):
                ip = gethostbyname(ip)
                self.rawserver.external_add_task(self._addContact, 0, (ip, port, callback))
            t = Thread(target=go)
            t.start()

    ## this call is async!
    def findNode(self, id, callback, errback=None):
        """ returns the contact info for node, or the k closest nodes, from the global table """
        # get K nodes out of local table/cache, or the node we want
        nodes = self.table.findNodes(id, invalid=True)
        l = [x for x in nodes if x.invalid]
        if len(l) > 4:
            nodes = sample(l, 4) + self.table.findNodes(id, invalid=False)[:4]

        d = Deferred()
        if errback:
            d.addCallbacks(callback, errback)
        else:
            d.addCallback(callback)
        if len(nodes) == 1 and nodes[0].id == id:
            d.callback(nodes)
        else:
            # create our search state
            state = FindNode(self, id, d.callback, self.rawserver.add_task)
            self.rawserver.external_add_task(state.goWithNodes, 0, (nodes,))

    def insertNode(self, n, contacted=1):
        """
        insert a node in our local table, pinging oldest contact in bucket, if necessary

        If all you have is a host/port, then use addContact, which calls this method after
        receiving the PONG from the remote node.  The reason for the separation is we can't insert
        a node into the table without its peer-ID.  That means of course the node passed into this
        method needs to be a properly formed Node object with a valid ID.
        """
        old = self.table.insertNode(n, contacted=contacted)
        if old and old != n:
            if not old.inPing():
                self.checkOldNode(old, n, contacted)
            else:
                # a ping to ``old`` is already in flight: park the newcomer
                l = self.pingcache.get(old.id, [])
                if len(l) < 10 or contacted:
                    l.append((n, contacted))
                    self.pingcache[old.id] = l

    def checkOldNode(self, old, new, contacted=False):
        """Ping ``old``; replace it with ``new`` if it turns out to be stale."""
        ## these are the callbacks used when we ping the oldest node in a bucket

        def _staleNodeHandler(dict, old=old, new=new, contacted=contacted):
            """ called if the pinged node never responds """
            if old.fails >= 2:
                l = self.pingcache.get(old.id, [])
                l.sort(key=_contactedFirst)
                if l:
                    n, nc = l[0]
                    if (not contacted) and nc:
                        # prefer a parked node we have actually heard from
                        l = l[1:] + [(new, contacted)]
                        new = n
                        contacted = nc
                o = self.table.replaceStaleNode(old, new)
                if o and o != new:
                    self.checkOldNode(o, new)
                    try:
                        self.pingcache[o.id] = self.pingcache[old.id]
                        del self.pingcache[old.id]
                    except KeyError:
                        pass
                else:
                    if l:
                        del self.pingcache[old.id]
                        l.sort(key=_contactedFirst)
                        for node in l:
                            self.insertNode(node[0], node[1])
            else:
                l = self.pingcache.get(old.id, [])
                if l:
                    del self.pingcache[old.id]
                self.insertNode(new, contacted)
                for node in l:
                    self.insertNode(node[0], node[1])

        def _notStaleNodeHandler(dict, old=old, new=new, contacted=contacted):
            """ called when we get a pong from the old node """
            self.table.insertNode(old, True)
            self.insertNode(new, contacted)
            l = self.pingcache.get(old.id, [])
            l.sort(key=_contactedFirst)
            for node in l:
                self.insertNode(node[0], node[1])
            try:
                del self.pingcache[old.id]
            except KeyError:
                pass

        try:
            df = old.ping(self.node.id)
        except krpc.KRPCSelfNodeError:
            # nothing to wait for; ``df`` used to be referenced here anyway
            pass
        else:
            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)

    def sendPing(self, node, callback=None):
        """
        ping a node
        """
        try:
            df = node.ping(self.node.id)
        except krpc.KRPCSelfNodeError:
            pass
        else:
            ## these are the callbacks we use when we issue a PING
            def _pongHandler(dict, node=node, table=self.table, callback=callback):
                _krpc_sender = dict['_krpc_sender']
                dict = dict['rsp']
                sender = {'id': dict['id']}
                sender['host'] = _krpc_sender[0]
                sender['port'] = _krpc_sender[1]
                n = self.Node().initWithDict(sender)
                table.insertNode(n)
                if callback:
                    callback()

            def _defaultPong(err, node=node, table=self.table, callback=callback):
                if callback:
                    callback()

            df.addCallbacks(_pongHandler, _defaultPong)

    def findCloseNodes(self, callback=lambda a: a, auto=False):
        """
        This does a findNode on the ID one away from our own.
        This will allow us to populate our table with nodes on our network closest to our own.
        This is called as soon as we start up with an empty table
        """
        if not self.config['pause']:
            id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
            self.findNode(id, callback)
        if auto:
            if not self.config['pause']:
                self.refreshTable()
            self.rawserver.external_add_task(
                self.findCloseNodes,
                randrange(int(const.FIND_CLOSE_INTERVAL * 0.9),
                          int(const.FIND_CLOSE_INTERVAL * 1.1)),
                (lambda a: True, True))

    def refreshTable(self, force=0):
        """
        force=1 will refresh table regardless of last bucket access time
        """
        def callback(nodes):
            pass

        def needsRefresh(bucket):
            # short, containing invalid nodes, or not touched recently
            if len(bucket.l) < K:
                return True
            for node in bucket.l:
                if node.invalid:
                    return True
            return time() - bucket.lastAccessed > const.BUCKET_STALENESS

        refresh = [bucket for bucket in self.table.buckets
                   if force or needsRefresh(bucket)]
        for bucket in refresh:
            id = newIDInRange(bucket.min, bucket.max)
            self.findNode(id, callback)

    def stats(self):
        """
        Returns a dict with keys 'num_contacts' and 'num_nodes'
        num_contacts: number contacts in our routing table
        num_nodes: number of nodes estimated in the entire dht
        """
        num_contacts = sum([len(bucket.l) for bucket in self.table.buckets])
        num_nodes = const.K * (2**(len(self.table.buckets) - 1))
        return {'num_contacts': num_contacts, 'num_nodes': num_nodes}

    def krpc_ping(self, id, _krpc_sender):
        n = self._senderNode(id, _krpc_sender)
        self.insertNode(n, contacted=0)
        return {"id": self.node.id}

    def krpc_find_node(self, target, id, _krpc_sender):
        # look up before inserting the sender, as before
        nodes = self.table.findNodes(target, invalid=False)
        nodes = [node.senderDict() for node in nodes]
        n = self._senderNode(id, _krpc_sender)
        self.insertNode(n, contacted=0)
        return {"nodes": packNodes(nodes), "id": self.node.id}


## This class provides read-only access to the DHT, valueForKey
## you probably want to use this mixin and provide your own write methods
class KhashmirRead(KhashmirBase):
    _Node = KNodeRead

    def retrieveValues(self, key):
        """Return the locally stored values for key, or [] if there are none."""
        try:
            l = self.store[key]
        except KeyError:
            l = []
        return l

    ## also async
    def valueForKey(self, key, callback, searchlocal=1):
        """ returns the values found for key in global table
            callback will be called with a list of values for each peer that returns unique values
            final callback will be an empty list - probably should change to 'more coming' arg
        """
        nodes = self.table.findNodes(key)

        # get locals
        if searchlocal:
            l = self.retrieveValues(key)
            if len(l) > 0:
                self.rawserver.external_add_task(callback, 0, (l,))
        else:
            l = []

        # create our search state
        state = GetValue(self, key, callback, self.rawserver.add_task)
        self.rawserver.external_add_task(state.goWithNodes, 0, (nodes, l))

    def krpc_find_value(self, key, id, _krpc_sender):
        n = self._senderNode(id, _krpc_sender)
        self.insertNode(n, contacted=0)

        l = self.retrieveValues(key)
        if len(l) > 0:
            return {'values': l, "id": self.node.id}
        else:
            nodes = self.table.findNodes(key, invalid=False)
            nodes = [node.senderDict() for node in nodes]
            return {'nodes': packNodes(nodes), "id": self.node.id}


###  provides a generic write method, you probably don't want to deploy something that allows
###  arbitrary value storage
class KhashmirWrite(KhashmirRead):
    _Node = KNodeWrite

    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
    def storeValueForKey(self, key, value, callback=None):
        """ stores the value for key in the global table, returns immediately, no status
            in this implementation, peers respond but don't indicate status to storing values
            a key can have many values
        """
        def _storeValueForKey(nodes, key=key, value=value, response=callback, table=self.table):
            if not response:
                # default callback
                def _storedValueHandler(sender):
                    pass
                response = _storedValueHandler
            action = StoreValue(self, key, value, response, self.rawserver.add_task)
            self.rawserver.external_add_task(action.goWithNodes, 0, (nodes,))

        # this call is asynch
        self.findNode(key, _storeValueForKey)

    def krpc_store_value(self, key, value, id, _krpc_sender):
        self.store[key] = value
        n = self._senderNode(id, _krpc_sender)
        self.insertNode(n, contacted=0)
        return {"id": self.node.id}


# the whole shebang, for testing
class Khashmir(KhashmirWrite):
    _Node = KNodeWrite


# diff --git a/khashmir/knet.py b/khashmir/knet.py new file mode 100644 index 0000000..3baf1e9 --- /dev/null +++ b/khashmir/knet.py @@ -0,0 +1,76 @@
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.

#
# knet.py
# create a network of khashmir nodes
# usage: knet.py <num_nodes> <start_port> <ip_address>

from khashmir import Khashmir
from random import randrange
import sys, os

# NOTE(review): `reactor` is used throughout this script but is never imported
# or defined in it, so the first reactor.iterate() raises NameError as written.
# Presumably `from twisted.internet import reactor` was intended -- confirm
# before relying on this script.

class Network:
    """A throw-away network of `size` Khashmir nodes on consecutive ports.

    size      -- number of nodes to create
    startport -- port of the first node; node i listens on startport + i
    localip   -- address the nodes use to contact each other
    """
    def __init__(self, size=0, startport=5555, localip='127.0.0.1'):
        self.num = size
        self.startport = startport
        self.localip = localip

    def _done(self, val):
        # completion callback handed to findCloseNodes; the polling loops in
        # setUp spin the reactor until this flag is set
        self.done = 1

    def setUp(self):
        """Create the nodes, seed each with three random contacts, then run
        two rounds of findCloseNodes on every node."""
        self.kfiles()
        self.l = []
        for i in range(self.num):
            self.l.append(Khashmir('', self.startport + i, '/tmp/kh%s.db' % (self.startport + i)))
        reactor.iterate()
        reactor.iterate()

        for i in self.l:
            i.addContact(self.localip, self.l[randrange(0,self.num)].port)
            i.addContact(self.localip, self.l[randrange(0,self.num)].port)
            i.addContact(self.localip, self.l[randrange(0,self.num)].port)
            reactor.iterate()
            reactor.iterate()
            reactor.iterate()

        for i in self.l:
            self.done = 0
            i.findCloseNodes(self._done)
            while not self.done:
                reactor.iterate()
        for i in self.l:
            self.done = 0
            i.findCloseNodes(self._done)
            while not self.done:
                reactor.iterate()

    def tearDown(self):
        """Stop every node's listener and delete the per-node db files."""
        for i in self.l:
            i.listenport.stopListening()
        self.kfiles()

    def kfiles(self):
        """Remove the /tmp/kh<port>.db file of every node, if present."""
        for i in range(self.startport, self.startport+self.num):
            try:
                os.unlink('/tmp/kh%s.db' % i)
            except OSError:
                # a missing file is expected on the first run; anything that
                # is not a filesystem error should still surface
                pass

        reactor.iterate()

if __name__ == "__main__":
    n = Network(int(sys.argv[1]), int(sys.argv[2]), sys.argv[3])
    n.setUp()
    try:
        reactor.run()
    finally:
        n.tearDown()

# diff --git a/khashmir/knode.py b/khashmir/knode.py new file mode 100644 index 0000000..560252a --- /dev/null +++ b/khashmir/knode.py @@ -0,0 +1,82 @@
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).
You may not copy or use this file, in either +# source code or executable form, except in compliance with the License. You +# may obtain a copy of the License at http://www.bittorrent.com/license/. +# +# Software distributed under the License is distributed on an AS IS basis, +# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +# for the specific language governing rights and limitations under the +# License. + +from node import Node +from BitTorrent.defer import Deferred +from const import NULL_ID +from krpc import KRPCProtocolError + +class IDChecker: + def __init__(self, id): + self.id = id + +class KNodeBase(Node): + def __init__(self, cfa): + Node.__init__(self) + self.cfa = cfa + + def conn(self): + return self.cfa((self.host, self.port)) + + def checkSender(self, dict): + try: + senderid = dict['rsp']['id'] + except KeyError: + raise KRPCProtocolError, "No peer id in response." + else: + if self.id != NULL_ID and senderid != self.id: + self.table.table.invalidateNode(self) + else: + if self.id == NULL_ID: + self.id = senderid + self.table.insertNode(self, contacted=1) + return dict + + def errBack(self, err): + self.table.table.nodeFailed(self) + return err[0] + + def ping(self, id): + df = self.conn().sendRequest('ping', {"id":id}) + self.conn().pinging = True + def endping(x): + self.conn().pinging = False + return x + df.addCallbacks(endping, endping) + df.addCallbacks(self.checkSender, self.errBack) + return df + + def findNode(self, target, id): + df = self.conn().sendRequest('find_node', {"target" : target, "id": id}) + df.addErrback(self.errBack) + df.addCallback(self.checkSender) + return df + + def inPing(self): + return self.conn().pinging + +class KNodeRead(KNodeBase): + def findValue(self, key, id): + df = self.conn().sendRequest('find_value', {"key" : key, "id" : id}) + df.addErrback(self.errBack) + df.addCallback(self.checkSender) + return df + +class KNodeWrite(KNodeRead): + def storeValue(self, key, value, id): + df = 
self.conn().sendRequest('store_value', {"key" : key, "value" : value, "id": id}) + df.addErrback(self.errBack) + df.addCallback(self.checkSender) + return df + def storeValues(self, key, value, id): + df = self.conn().sendRequest('store_values', {"key" : key, "values" : value, "id": id}) + df.addErrback(self.errBack) + df.addCallback(self.checkSender) + return df diff --git a/khashmir/krpc.py b/khashmir/krpc.py new file mode 100644 index 0000000..86c9973 --- /dev/null +++ b/khashmir/krpc.py @@ -0,0 +1,240 @@ +# The contents of this file are subject to the BitTorrent Open Source License +# Version 1.1 (the License). You may not copy or use this file, in either +# source code or executable form, except in compliance with the License. You +# may obtain a copy of the License at http://www.bittorrent.com/license/. +# +# Software distributed under the License is distributed on an AS IS basis, +# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +# for the specific language governing rights and limitations under the +# License. 
+ +from BitTorrent.defer import Deferred +from BitTorrent.bencode import bencode, bdecode +import socket +from BitTorrent.RawServer_magic import Handler +from BitTorrent.platform import bttime +import time +from math import log10 + +import sys +from traceback import print_exc + +from khash import distance +from cache import Cache +from KRateLimiter import KRateLimiter +from hammerlock import Hammerlock + +from const import * + +# commands +TID = 't' +REQ = 'q' +RSP = 'r' +TYP = 'y' +ARG = 'a' +ERR = 'e' + +class KRPCFailSilently(Exception): + pass + +class KRPCProtocolError(Exception): + pass + +class KRPCServerError(Exception): + pass + +class KRPCSelfNodeError(Exception): + pass + +class hostbroker(Handler): + def __init__(self, server, addr, transport, call_later, max_ul_rate, config, rlcount): + self.server = server + self.addr = addr + self.transport = transport + self.rltransport = KRateLimiter(transport, max_ul_rate, call_later, rlcount, config['max_rate_period']) + self.call_later = call_later + self.connections = Cache(touch_on_access=True) + self.hammerlock = Hammerlock(100, call_later) + self.expire_connections(loop=True) + self.config = config + if not self.config.has_key('pause'): + self.config['pause'] = False + + def expire_connections(self, loop=False): + self.connections.expire(bttime() - KRPC_CONNECTION_CACHE_TIME) + if loop: + self.call_later(self.expire_connections, KRPC_CONNECTION_CACHE_TIME, (True,)) + + def data_came_in(self, addr, datagram): + #if addr != self.addr: + if not self.config['pause'] and self.hammerlock.check(addr): + c = self.connectionForAddr(addr) + c.datagramReceived(datagram, addr) + + def connection_lost(self, socket): + ## this is like, bad + print ">>> connection lost!", socket + + def connectionForAddr(self, addr): + if addr == self.addr: + raise KRPCSelfNodeError() + if not self.connections.has_key(addr): + conn = KRPC(addr, self.server, self.transport, self.rltransport, self.call_later) + self.connections[addr] = conn 
+ else: + conn = self.connections[addr] + return conn + + +## connection +class KRPC: + noisy = 0 + def __init__(self, addr, server, transport, rltransport, call_later): + self.call_later = call_later + self.transport = transport + self.rltransport = rltransport + self.factory = server + self.addr = addr + self.tids = {} + self.mtid = 0 + self.pinging = False + + def sendErr(self, addr, tid, code, msg): + ## send error + out = bencode({TID:tid, TYP:ERR, ERR :(code, msg)}) + olen = len(out) + self.rltransport.sendto(out, 0, addr) + return olen + + def datagramReceived(self, str, addr): + # bdecode + try: + msg = bdecode(str) + except Exception, e: + if self.noisy: + print "response decode error: " + `e`, `str` + else: + #if self.noisy: + # print msg + # look at msg type + if msg[TYP] == REQ: + ilen = len(str) + # if request + # tell factory to handle + f = getattr(self.factory ,"krpc_" + msg[REQ], None) + msg[ARG]['_krpc_sender'] = self.addr + if f and callable(f): + try: + ret = apply(f, (), msg[ARG]) + except KRPCFailSilently: + pass + except KRPCServerError, e: + olen = self.sendErr(addr, msg[TID], 202, "Server Error: %s" % e.args[0]) + except KRPCProtocolError, e: + olen = self.sendErr(addr, msg[TID], 204, "Protocol Error: %s" % e.args[0]) + except Exception, e: + print_exc(20) + olen = self.sendErr(addr, msg[TID], 202, "Server Error") + else: + if ret: + # make response + out = bencode({TID : msg[TID], TYP : RSP, RSP : ret}) + else: + out = bencode({TID : msg[TID], TYP : RSP, RSP : {}}) + # send response + olen = len(out) + self.rltransport.sendto(out, 0, addr) + + else: + if self.noisy: + #print "don't know about method %s" % msg[REQ] + pass + # unknown method + olen = self.sendErr(addr, msg[TID], *KERR_METHOD_UNKNOWN) + if self.noisy: + try: + ndist = 10 * log10(2**160 * 1.0 / distance(self.factory.node.id, msg[ARG]['id'])) + ndist = int(ndist) + except OverflowError: + ndist = 999 + + h = None + if msg[ARG].has_key('target'): + h = msg[ARG]['target'] + elif 
msg[ARG].has_key('info_hash'): + h = msg[ARG]['info_hash'] + else: + tdist = '-' + + if h != None: + try: + tdist = 10 * log10(2**160 * 1.0 / distance(self.factory.node.id, h)) + tdist = int(tdist) + except OverflowError: + tdist = 999 + + t = time.localtime() + t = "%2d-%2d-%2d %2d:%2d:%2d" % (t[0], t[1], t[2], t[3], t[4], t[5]) + print "%s %s %s >>> %s - %s %s %s - %s %s" % (t, + msg[ARG]['id'].encode('base64')[:4], + addr, + self.factory.node.port, + ilen, + msg[REQ], + olen, + ndist, + tdist) + elif msg[TYP] == RSP: + # if response + # lookup tid + if self.tids.has_key(msg[TID]): + df = self.tids[msg[TID]] + # callback + del(self.tids[msg[TID]]) + df.callback({'rsp' : msg[RSP], '_krpc_sender': addr}) + else: + # no tid, this transaction timed out already... + pass + + elif msg[TYP] == ERR: + # if error + # lookup tid + if self.tids.has_key(msg[TID]): + df = self.tids[msg[TID]] + # callback + df.errback(msg[ERR]) + del(self.tids[msg[TID]]) + else: + # day late and dollar short + pass + else: + # unknown message type + df = self.tids[msg[TID]] + # callback + df.errback((KRPC_ERROR_RECEIVED_UNKNOWN, _("received unknown message type"))) + del(self.tids[msg[TID]]) + + def sendRequest(self, method, args): + # make message + # send it + msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args} + self.mtid = (self.mtid + 1) % 256 + s = bencode(msg) + d = Deferred() + self.tids[msg[TID]] = d + self.call_later(self.timeOut, KRPC_TIMEOUT, (msg[TID],)) + self.call_later(self._send, 0, (s, d)) + return d + + def timeOut(self, id): + if self.tids.has_key(id): + df = self.tids[id] + del(self.tids[id]) + df.errback((KRPC_ERROR_TIMEOUT, _("timeout"))) + + def _send(self, s, d): + try: + self.transport.sendto(s, 0, self.addr) + except socket.error: + d.errback((KRPC_SOCKET_ERROR, _("socket error"))) + diff --git a/khashmir/kstore.py b/khashmir/kstore.py new file mode 100644 index 0000000..5c01200 --- /dev/null +++ b/khashmir/kstore.py @@ -0,0 +1,119 @@ +# The contents of 
this file are subject to the BitTorrent Open Source License +# Version 1.1 (the License). You may not copy or use this file, in either +# source code or executable form, except in compliance with the License. You +# may obtain a copy of the License at http://www.bittorrent.com/license/. +# +# Software distributed under the License is distributed on an AS IS basis, +# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +# for the specific language governing rights and limitations under the +# License. + +try: + from random import sample +except ImportError: + from random import choice + def sample(l, n): + if len(l) <= n: + return l + d = {} + while len(d) < n: + d[choice(l)] = 1 + return d.keys() + +from BitTorrent.platform import bttime as time + +class KItem: + def __init__(self, key, value): + self.t = time() + self.k = key + self.v = value + def __cmp__(self, a): + # same value = same item, only used to keep dupes out of db + if self.v == a.v: + return 0 + + # compare by time + if self.t < a.t: + return -1 + elif self.t > a.t: + return 1 + else: + return 0 + + def __hash__(self): + return self.v.__hash__() + + def __repr__(self): + return `(self.k, self.v, time() - self.t)` + +## in memory data store for distributed tracker +## keeps a list of values per key in dictionary +## keeps expiration for each key in a queue +## can efficiently expire all values older than a given time +## can insert one val at a time, or a list: ks['key'] = 'value' or ks['key'] = ['v1', 'v2', 'v3'] +class KStore: + def __init__(self): + self.d = {} + self.q = [] + + def __getitem__(self, key): + return [x.v for x in self.d[key]] + + def __setitem__(self, key, value): + if type(value) == type([]): + [self.__setitem__(key, v) for v in value] + return + x = KItem(key, value) + try: + l = self.d[key] + except KeyError: + self.d[key] = [x] + else: + # this is slow + try: + i = l.index(x) + del(l[i]) + except ValueError: + pass + l.insert(0, x) + self.q.append(x) + + def 
__delitem__(self, key): + del(self.d[key]) + + def __len__(self): + return len(self.d) + + def keys(self): + return self.d.keys() + + def values(self): + return [self[key] for key in self.keys()] + + def items(self): + return [(key, self[key]) for key in self.keys()] + + def expire(self, t): + #.expire values inserted prior to t + try: + while self.q[0].t <= t: + x = self.q.pop(0) + try: + l = self.d[x.k] + try: + while l[-1].t <= t: + l.pop() + except IndexError: + del(self.d[x.k]) + except KeyError: + pass + except IndexError: + pass + + def sample(self, key, n): + # returns n random values of key, or all values if less than n + try: + l = [x.v for x in sample(self.d[key], n)] + except ValueError: + l = [x.v for x in self.d[key]] + return l diff --git a/khashmir/ktable.py b/khashmir/ktable.py new file mode 100644 index 0000000..e0a07b4 --- /dev/null +++ b/khashmir/ktable.py @@ -0,0 +1,338 @@ +# The contents of this file are subject to the BitTorrent Open Source License +# Version 1.1 (the License). You may not copy or use this file, in either +# source code or executable form, except in compliance with the License. You +# may obtain a copy of the License at http://www.bittorrent.com/license/. +# +# Software distributed under the License is distributed on an AS IS basis, +# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +# for the specific language governing rights and limitations under the +# License. + +from BitTorrent.platform import bttime as time +from bisect import * +from types import * + +import khash as hash +import const +from const import K, HASH_LENGTH, NULL_ID, MAX_FAILURES, MIN_PING_INTERVAL +from node import Node + + +def ls(a, b): + return cmp(a.lastSeen, b.lastSeen) + +class KTable(object): + __slots__ = ('node', 'buckets') + """local routing table for a kademlia like distributed hash table""" + def __init__(self, node): + # this is the root node, a.k.a. US! 
+ self.node = node + self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)] + self.insertNode(node) + + def _bucketIndexForInt(self, num): + """the index of the bucket that should hold int""" + return bisect_left(self.buckets, num) + + def bucketForInt(self, num): + return self.buckets[self._bucketIndexForInt(num)] + + def findNodes(self, id, invalid=True): + """ + return K nodes in our own local table closest to the ID. + """ + + if isinstance(id, str): + num = hash.intify(id) + elif isinstance(id, Node): + num = id.num + elif isinstance(id, int) or isinstance(id, long): + num = id + else: + raise TypeError, "findNodes requires an int, string, or Node" + + nodes = [] + i = self._bucketIndexForInt(num) + + # if this node is already in our table then return it + try: + node = self.buckets[i].getNodeWithInt(num) + except ValueError: + pass + else: + return [node] + + # don't have the node, get the K closest nodes + nodes = nodes + self.buckets[i].l + if not invalid: + nodes = [a for a in nodes if not a.invalid] + if len(nodes) < K: + # need more nodes + min = i - 1 + max = i + 1 + while len(nodes) < K and (min >= 0 or max < len(self.buckets)): + #ASw: note that this requires K be even + if min >= 0: + nodes = nodes + self.buckets[min].l + if max < len(self.buckets): + nodes = nodes + self.buckets[max].l + min = min - 1 + max = max + 1 + if not invalid: + nodes = [a for a in nodes if not a.invalid] + + nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num)) + return nodes[:K] + + def _splitBucket(self, a): + diff = (a.max - a.min) / 2 + b = KBucket([], a.max - diff, a.max) + self.buckets.insert(self.buckets.index(a.min) + 1, b) + a.max = a.max - diff + # transfer nodes to new bucket + for anode in a.l[:]: + if anode.num >= a.max: + a.removeNode(anode) + b.addNode(anode) + + def replaceStaleNode(self, stale, new): + """this is used by clients to replace a node returned by insertNode after + it fails to respond to a Pong message""" + i = 
self._bucketIndexForInt(stale.num) + + if self.buckets[i].hasNode(stale): + self.buckets[i].removeNode(stale) + if new and self.buckets[i].hasNode(new): + self.buckets[i].seenNode(new) + elif new: + self.buckets[i].addNode(new) + + return + + def insertNode(self, node, contacted=1, nocheck=False): + """ + this insert the node, returning None if successful, returns the oldest node in the bucket if it's full + the caller responsible for pinging the returned node and calling replaceStaleNode if it is found to be stale!! + contacted means that yes, we contacted THEM and we know the node is reachable + """ + if node.id == NULL_ID or node.id == self.node.id: + return + + if contacted: + node.updateLastSeen() + + # get the bucket for this node + i = self._bucketIndexForInt(node.num) + # check to see if node is in the bucket already + if self.buckets[i].hasNode(node): + it = self.buckets[i].l.index(node.num) + xnode = self.buckets[i].l[it] + if contacted: + node.age = xnode.age + self.buckets[i].seenNode(node) + elif xnode.lastSeen != 0 and xnode.port == node.port and xnode.host == node.host: + xnode.updateLastSeen() + return + + # we don't have this node, check to see if the bucket is full + if not self.buckets[i].bucketFull(): + # no, append this node and return + self.buckets[i].addNode(node) + return + + # full bucket, check to see if any nodes are invalid + t = time() + invalid = [x for x in self.buckets[i].invalid.values() if x.invalid] + if len(invalid) and not nocheck: + invalid.sort(ls) + while invalid and not self.buckets[i].hasNode(invalid[0]): + del(self.buckets[i].invalid[invalid[0].num]) + invalid = invalid[1:] + if invalid and (invalid[0].lastSeen == 0 and invalid[0].fails < MAX_FAILURES): + return invalid[0] + elif invalid: + self.replaceStaleNode(invalid[0], node) + return + + stale = [n for n in self.buckets[i].l if (t - n.lastSeen) > MIN_PING_INTERVAL] + if len(stale) and not nocheck: + stale.sort(ls) + return stale[0] + + # bucket is full and all nodes 
are valid, check to see if self.node is in the bucket + if not (self.buckets[i].min <= self.node < self.buckets[i].max): + return + + # this bucket is full and contains our node, split the bucket + if len(self.buckets) >= HASH_LENGTH: + # our table is FULL, this is really unlikely + print "Hash Table is FULL! Increase K!" + return + + self._splitBucket(self.buckets[i]) + + # now that the bucket is split and balanced, try to insert the node again + return self.insertNode(node, contacted) + + def justSeenNode(self, id): + """call this any time you get a message from a node + it will update it in the table if it's there """ + try: + n = self.findNodes(id)[0] + except IndexError: + return None + else: + tstamp = n.lastSeen + n.updateLastSeen() + bucket = self.bucketForInt(n.num) + bucket.seenNode(n) + return tstamp + + def invalidateNode(self, n): + """ + forget about node n - use when you know that node is invalid + """ + n.invalid = True + bucket = self.bucketForInt(n.num) + bucket.invalidateNode(n) + + def nodeFailed(self, node): + """ call this when a node fails to respond to a message, to invalidate that node """ + try: + n = self.findNodes(node.num)[0] + except IndexError: + return None + else: + if n.msgFailed() >= const.MAX_FAILURES: + self.invalidateNode(n) + + def numPeers(self): + """ estimated number of connectable nodes in global table """ + return 8 * (2 ** (len(self.buckets) - 1)) + +class KBucket(object): + __slots__ = ('min', 'max', 'lastAccessed', 'l', 'index', 'invalid') + def __init__(self, contents, min, max): + self.l = contents + self.index = {} + self.invalid = {} + self.min = min + self.max = max + self.lastAccessed = time() + + def touch(self): + self.lastAccessed = time() + + def lacmp(self, a, b): + if a.lastSeen > b.lastSeen: + return 1 + elif b.lastSeen > a.lastSeen: + return -1 + return 0 + + def sort(self): + self.l.sort(self.lacmp) + + def getNodeWithInt(self, num): + try: + node = self.index[num] + except KeyError: + raise ValueError + 
return node + + def addNode(self, node): + if len(self.l) >= K: + return + if self.index.has_key(node.num): + return + self.l.append(node) + self.index[node.num] = node + self.touch() + + def removeNode(self, node): + assert self.index.has_key(node.num) + del(self.l[self.l.index(node.num)]) + del(self.index[node.num]) + try: + del(self.invalid[node.num]) + except KeyError: + pass + self.touch() + + def invalidateNode(self, node): + self.invalid[node.num] = node + + def seenNode(self, node): + try: + del(self.invalid[node.num]) + except KeyError: + pass + it = self.l.index(node.num) + del(self.l[it]) + self.l.append(node) + self.index[node.num] = node + + def hasNode(self, node): + return self.index.has_key(node.num) + + def bucketFull(self): + return len(self.l) >= K + + def __repr__(self): + return "<KBucket %d items (%d to %d)>" % (len(self.l), self.min, self.max) + + ## Comparators + # necessary for bisecting list of buckets with a hash expressed as an integer or a distance + # compares integer or node object with the bucket's range + def __lt__(self, a): + if isinstance(a, Node): a = a.num + return self.max <= a + def __le__(self, a): + if isinstance(a, Node): a = a.num + return self.min < a + def __gt__(self, a): + if isinstance(a, Node): a = a.num + return self.min > a + def __ge__(self, a): + if isinstance(a, Node): a = a.num + return self.max >= a + def __eq__(self, a): + if isinstance(a, Node): a = a.num + return self.min <= a and self.max > a + def __ne__(self, a): + if isinstance(a, Node): a = a.num + return self.min >= a or self.max < a + + +### UNIT TESTS ### +import unittest + +class TestKTable(unittest.TestCase): + def setUp(self): + self.a = Node().init(hash.newID(), 'localhost', 2002) + self.t = KTable(self.a) + + def testAddNode(self): + self.b = Node().init(hash.newID(), 'localhost', 2003) + self.t.insertNode(self.b) + self.assertEqual(len(self.t.buckets[0].l), 1) + self.assertEqual(self.t.buckets[0].l[0], self.b) + + def testRemove(self): + 
self.testAddNode() + self.t.invalidateNode(self.b) + self.assertEqual(len(self.t.buckets[0].l), 0) + + def testFail(self): + self.testAddNode() + for i in range(const.MAX_FAILURES - 1): + self.t.nodeFailed(self.b) + self.assertEqual(len(self.t.buckets[0].l), 1) + self.assertEqual(self.t.buckets[0].l[0], self.b) + + self.t.nodeFailed(self.b) + self.assertEqual(len(self.t.buckets[0].l), 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/khashmir/node.py b/khashmir/node.py new file mode 100644 index 0000000..65a48ef --- /dev/null +++ b/khashmir/node.py @@ -0,0 +1,95 @@ +# The contents of this file are subject to the BitTorrent Open Source License +# Version 1.1 (the License). You may not copy or use this file, in either +# source code or executable form, except in compliance with the License. You +# may obtain a copy of the License at http://www.bittorrent.com/license/. +# +# Software distributed under the License is distributed on an AS IS basis, +# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +# for the specific language governing rights and limitations under the +# License. 
import khash
from BitTorrent.platform import bttime as time
from types import *

class Node:
    """encapsulate contact info

    A Node is usable only after init() or initWithDict() has run: those set
    `num`, the integer form of the id that all comparisons work on.
    """
    def __init__(self):
        self.fails = 0
        self.lastSeen = 0
        self.invalid = True
        self.id = self.host = self.port = ''
        self.age = time()

    def init(self, id, host, port):
        """fill in the contact details and return self"""
        self.id = id
        self.num = khash.intify(id)
        self.host = host
        self.port = port
        self._senderDict = {'id': self.id, 'port' : self.port, 'host' : self.host}
        return self

    def initWithDict(self, dict):
        """fill in the contact details from a dict with 'id', 'host' and 'port'
        (and optionally 'age'); the dict itself is kept as the sender dict.
        returns self"""
        self._senderDict = dict
        self.id = dict['id']
        self.num = khash.intify(self.id)
        self.port = dict['port']
        self.host = dict['host']
        self.age = dict.get('age', self.age)
        return self

    def updateLastSeen(self):
        """record that we just heard from this node: stamps lastSeen and
        clears the failure count and the invalid flag"""
        self.lastSeen = time()
        self.fails = 0
        self.invalid = False

    def msgFailed(self):
        """count one more failed message; returns the new failure count"""
        self.fails = self.fails + 1
        return self.fails

    def senderDict(self):
        return self._senderDict

    def __hash__(self):
        return self.id.__hash__()

    def __repr__(self):
        return ">node <%s> %s<" % (self.id.encode('base64')[:4], (self.host, self.port))

    ## these comparators let us bisect/index a list full of nodes with either a node or an int/long
    def _numOf(self, a):
        # reduce the other operand to what self.num is compared against.
        # isinstance(a, Node) rather than type(a) == InstanceType: the latter
        # is true for *any* old-style instance, so comparing with an
        # unrelated object raised AttributeError on its missing `num`
        if isinstance(a, Node):
            return a.num
        return a
    def __lt__(self, a):
        return self.num < self._numOf(a)
    def __le__(self, a):
        return self.num <= self._numOf(a)
    def __gt__(self, a):
        return self.num > self._numOf(a)
    def __ge__(self, a):
        return self.num >= self._numOf(a)
    def __eq__(self, a):
        return self.num == self._numOf(a)
    def __ne__(self, a):
        return self.num != self._numOf(a)


import unittest

class TestNode(unittest.TestCase):
    def setUp(self):
        self.node = Node().init(khash.newID(), 'localhost', 2002)
    def testUpdateLastSeen(self):
        t = self.node.lastSeen
        self.node.updateLastSeen()
        assert t < self.node.lastSeen

# diff --git
# a/khashmir/setup.py b/khashmir/setup.py new file mode 100644 index 0000000..a9e680a --- /dev/null +++ b/khashmir/setup.py @@ -0,0 +1,70 @@
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License). You may not copy or use this file, in either
# source code or executable form, except in compliance with the License. You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.

#!/usr/bin/env python

# Installer for the khashmir package.  Each dependency is probed up front so
# that a missing one ends the script with installation hints.

import os
import sys

try:
    import distutils.core
    import distutils.command.build_ext
except ImportError:
    raise SystemExit, """\
You don't have the python development modules installed.

If you have Debian you can install it by running
    apt-get install python-dev

If you have RedHat and know how to install this from an RPM please
email us so we can put instructions here.
"""

try:
    import twisted
except ImportError:
    raise SystemExit, """\
You don't have Twisted installed.

Twisted can be downloaded from
    http://twistedmatrix.com/products/download

Anything later that version 1.0.3 should work
"""

try:
    import sqlite
except ImportError:
    raise SystemExit, """\
You don't have PySQLite installed.

PySQLite can be downloaded from
    http://sourceforge.net/project/showfiles.php?group_id=54058&release_id=139482
"""

# keyword arguments handed to distutils.core.setup() below
# NOTE(review): 'licence' says MIT while the file header above cites the
# BitTorrent Open Source License -- confirm which applies.
setup_args = {
    'name': 'khashmir',
    'author': 'Andrew Loewenstern',
    'author_email': 'burris@users.sourceforge.net',
    'licence': 'MIT',
    'package_dir': {'khashmir': '.'},
    'packages': [
        'khashmir',
    ],
}

# keywords/platforms are only passed when this distutils knows about them
if hasattr(distutils.dist.DistributionMetadata, 'get_keywords'):
    setup_args['keywords'] = "internet tcp p2p"

if hasattr(distutils.dist.DistributionMetadata, 'get_platforms'):
    setup_args['platforms'] = "win32 posix"

if __name__ == '__main__':
    apply(distutils.core.setup, (), setup_args)

# diff --git a/khashmir/test.py b/khashmir/test.py new file mode 100644 index 0000000..a6c8a9c --- /dev/null +++ b/khashmir/test.py @@ -0,0 +1,21 @@
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License). You may not copy or use this file, in either
# source code or executable form, except in compliance with the License. You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.

# Runs the unit tests of the modules named in the list below in one go.

import unittest

import ktable, khashmir
import khash, node, knode
import actions
import test_krpc
import test_khashmir
import kstore

# the tests are loaded by module name and run as soon as this file is executed
tests = unittest.defaultTestLoader.loadTestsFromNames(['kstore', 'khash', 'node', 'knode', 'actions', 'ktable', 'test_krpc', 'test_khashmir'])
result = unittest.TextTestRunner().run(tests)

# diff --git a/khashmir/test_khashmir.py b/khashmir/test_khashmir.py new file mode 100644 index 0000000..e22028e --- /dev/null +++ b/khashmir/test_khashmir.py @@ -0,0 +1,166 @@
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).
# You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

# NOTE: layout reconstructed from a whitespace-collapsed diff listing; the
# nesting of the listen_once() calls inside the set-up loops is inferred.

from unittest import *

# Explicit imports for names the tests use directly.  They come first so that
# anything the star import below also provides still takes precedence, exactly
# as before.
from BitTorrent import RawServer_magic
from BitTorrent.RawServer_magic import RawServer
from BitTorrent.defaultargs import common_options, rare_options
from sha import sha
from threading import Event

from khashmir import *
import khash
from copy import copy

from random import randrange
from krpc import KRPC

KRPC.noisy=0
import os
import sys

if __name__ =="__main__":
    # Derive the module name from the script path so that running the file
    # via a relative or absolute path still works (argv[0][:-3] did not).
    _module_name = os.path.splitext(os.path.basename(sys.argv[0]))[0]
    tests = defaultTestLoader.loadTestsFromNames([_module_name])
    result = TextTestRunner().run(tests)


def _default_options():
    """Return the BitTorrent default options as a name -> value dict."""
    return dict([(x[0], x[1]) for x in common_options + rare_options])


class MultiTest(TestCase):
    """Store/retrieve round trips across a network of `num` local nodes."""
    num = 25

    def _done(self, val):
        self.done = 1

    def _pump(self, rounds):
        """Run the RawServer event loop `rounds` times (1 s timeout each)."""
        for _ in range(rounds):
            self.r.listen_once(1)

    def _wait_done(self):
        """Pump the event loop until a callback sets self.done."""
        while not self.done:
            self.r.listen_once(1)

    def setUp(self):
        self.l = []
        self.startport = 10088
        self.r = RawServer(Event(), _default_options())
        for offset in range(self.num):
            port = self.startport + offset
            self.l.append(Khashmir('127.0.0.1', port, '/tmp/%s.test' % port, self.r))
        self._pump(2)

        # Introduce every node to three randomly chosen peers.
        for node in self.l:
            for _ in range(3):
                try:
                    node.addContact('127.0.0.1', self.l[randrange(0, self.num)].port)
                except Exception:
                    # Best effort: a failed introduction must not abort set-up.
                    pass
            self._pump(3)

        # Two rounds of close-node discovery to populate the routing tables.
        for _ in range(2):
            for node in self.l:
                self.done = 0
                node.findCloseNodes(self._done)
                self._wait_done()

    def tearDown(self):
        for node in self.l:
            self.r.stop_listening_udp(node.socket)
            node.socket.close()

        self._pump(1)

    def testStoreRetrieve(self):
        for i in range(10):
            K = khash.newID()
            V = khash.newID()

            def _scb(val):
                self.done = 1

            # Store the pair through three random nodes.
            for a in range(3):
                self.done = 0
                self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
                self._wait_done()

            def _rcb(val):
                # An empty result marks the end of the lookup; by then the
                # stored value must have been seen.
                if not val:
                    self.done = 1
                    self.assertEqual(self.got, 1)
                elif V in val:
                    self.got = 1

            # Read it back through three random nodes.
            for x in range(3):
                self.got = 0
                self.done = 0
                self.l[randrange(0, self.num)].valueForKey(K, _rcb)
                self._wait_done()


class AASimpleTests(TestCase):
    """Two-node sanity checks (contact exchange, one store/retrieve)."""

    def setUp(self):
        self.r = RawServer(Event(), _default_options())
        self.a = Khashmir('127.0.0.1', 4044, '/tmp/a.test', self.r)
        self.b = Khashmir('127.0.0.1', 4045, '/tmp/b.test', self.r)

    def tearDown(self):
        self.r.stop_listening_udp(self.a.socket)
        self.r.stop_listening_udp(self.b.socket)
        self.a.socket.close()
        self.b.socket.close()

    def _pump(self, rounds):
        """Run the RawServer event loop `rounds` times (1 s timeout each)."""
        for _ in range(rounds):
            self.r.listen_once(1)

    def addContacts(self):
        self.a.addContact('127.0.0.1', 4045)
        self._pump(2)

    def testStoreRetrieve(self):
        self.addContacts()
        self.got = 0
        self.a.storeValueForKey(sha('foo').digest(), 'foobar')
        self._pump(6)
        self.a.valueForKey(sha('foo').digest(), self._cb)
        self._pump(4)

    def _cb(self, val):
        if not val:
            self.assertEqual(self.got, 1)
        elif 'foobar' in val:
            self.got = 1

    def testAddContact(self):
        # Before the introduction both tables hold one empty bucket ...
        for node in (self.a, self.b):
            self.assertEqual(len(node.table.buckets), 1)
            self.assertEqual(len(node.table.buckets[0].l), 0)

        self.addContacts()

        # ... afterwards each node knows exactly the other one.
        for node in (self.a, self.b):
            self.assertEqual(len(node.table.buckets), 1)
            self.assertEqual(len(node.table.buckets[0].l), 1)


# ---------------------------------------------------------------------------
# khashmir/test_krpc.py   (new file, mode 100644, index 0000000..2ba4343)
# ---------------------------------------------------------------------------
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

from unittest import *
# Explicit import first; a RawServer exported by the star import below still
# wins, as before.
from BitTorrent.RawServer_magic import RawServer
from krpc import *
from BitTorrent.defaultargs import common_options, rare_options
from threading import Event
from node import Node

KRPC.noisy = 0

import os
import sys

if __name__ =="__main__":
    _module_name = os.path.splitext(os.path.basename(sys.argv[0]))[0]
    tests = defaultTestLoader.loadTestsFromNames([_module_name])
    result = TextTestRunner().run(tests)


def connectionForAddr(host, port):
    return host


class Receiver(object):
    """Minimal KRPC handler: records 'store' messages, echoes 'echo' ones."""
    protocol = KRPC

    def __init__(self, addr):
        self.buf = []
        self.node = Node().init('0'*20, addr[0], addr[1])

    def krpc_store(self, msg, _krpc_sender):
        self.buf += [msg]

    def krpc_echo(self, msg, _krpc_sender):
        return msg


class KRPCTests(TestCase):
    """Request/response tests between two local KRPC endpoints (a -> b)."""

    ADDR_A = ('127.0.0.1', 1180)
    ADDR_B = ('127.0.0.1', 1181)
    TEXT = "This is a test."

    def _listen(self, addr):
        """Open a UDP endpoint on `addr`; return (socket, receiver, broker)."""
        sock = self.r.create_udpsocket(addr[1], addr[0], True)
        receiver = Receiver(addr)
        broker = hostbroker(receiver, addr, sock, self.r.add_task)
        self.r.start_listening_udp(sock, broker)
        return sock, receiver, broker

    def setUp(self):
        self.noisy = 0
        # Initialised so a reply that never arrives fails the assertion
        # instead of raising AttributeError.
        self.msg = None
        self.err = None
        d = dict([(x[0],x[1]) for x in common_options + rare_options])
        self.r = RawServer(Event(), d)

        # The sockets used to be stored as self.as / self.bs; `as` is a
        # reserved word from Python 2.6 on, so they are renamed.
        self.a_sock, self.af, self.a = self._listen(self.ADDR_A)
        self.b_sock, self.bf, self.b = self._listen(self.ADDR_B)

    def tearDown(self):
        self.a_sock.close()
        self.b_sock.close()

    def _pump(self, rounds):
        """Run the RawServer event loop `rounds` times (10 ms timeout each)."""
        for _ in range(rounds):
            self.r.listen_once(0.01)

    def _request(self, method, text):
        """Send `method` with message `text` from a to b; return the deferred."""
        return self.a.connectionForAddr(self.ADDR_B).sendRequest(method, {'msg' : text})

    def _check_echo(self, text):
        """Echo `text` through b and assert that it comes back unchanged."""
        self.msg = None  # never accept a stale reply from an earlier request
        df = self._request('echo', text)
        df.addCallback(self.gotMsg)
        self._pump(2)
        self.assertEqual(self.msg, text)

    def testSimpleMessage(self):
        self.noisy = 0
        self._request('store', self.TEXT)
        self._pump(1)
        self.assertEqual(self.bf.buf, [self.TEXT])

    def testMessageBlast(self):
        self._request('store', self.TEXT)
        self._pump(1)
        self.assertEqual(self.bf.buf, [self.TEXT])
        self.bf.buf = []

        for i in range(100):
            self._request('store', self.TEXT)
            self._pump(1)
        self.assertEqual(self.bf.buf, [self.TEXT] * 100)

    def testEcho(self):
        self._check_echo(self.TEXT)

    def gotMsg(self, dict):
        _krpc_sender = dict['_krpc_sender']
        msg = dict['rsp']
        self.msg = msg

    def testManyEcho(self):
        self._check_echo(self.TEXT)
        for i in range(100):
            self._check_echo(self.TEXT)

    def testMultiEcho(self):
        self.noisy = 0
        self._check_echo("This is a test.")
        self._check_echo("This is another test.")
        self._check_echo("This is yet another test.")

    def testEchoReset(self):
        self.noisy = 0
        self._check_echo("This is a test.")
        self._check_echo("This is another test.")

        # Drop the cached connection; the next request must set up a new one.
        del(self.a.connections[('127.0.0.1', 1181)])
        self._check_echo("This is yet another test.")

    def testLotsofEchoReset(self):
        for i in range(100):
            self.testEchoReset()

    def testUnknownMeth(self):
        self.noisy = 0
        df = self._request('blahblah', self.TEXT)
        df.addErrback(self.gotErr)
        self._pump(2)
        self.assertEqual(self.err, KRPC_ERROR_METHOD_UNKNOWN)

    def gotErr(self, err):
        self.err = err


# ---------------------------------------------------------------------------
# khashmir/test_kstore.py   (new file, mode 100644, index 0000000..d2f2b15)
# ---------------------------------------------------------------------------
# The contents of this file are subject to the BitTorrent Open Source License
#
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

# NOTE: layout reconstructed from a whitespace-collapsed diff listing.

import unittest
from BitTorrent.platform import bttime
from time import sleep

from kstore import KStore
if __name__ =="__main__":
    tests = unittest.defaultTestLoader.loadTestsFromNames(['test_kstore'])
    result = unittest.TextTestRunner().run(tests)


class BasicTests(unittest.TestCase):
    """Unit tests for KStore: a multi-valued store with timed expiry."""

    def setUp(self):
        self.k = KStore()

    def _sorted_values(self, key):
        """Return the values stored under `key`, sorted."""
        values = self.k[key]
        values.sort()
        return values

    def testNoKeys(self):
        self.assertEqual(self.k.keys(), [])

    def testKey(self):
        self.k['foo'] = 'bar'
        self.assertEqual(self.k.keys(), ['foo'])

    def testKeys(self):
        self.k['foo'] = 'bar'
        self.k['wing'] = 'wang'
        names = self.k.keys()
        names.sort()
        self.assertEqual(names, ['foo', 'wing'])

    def testInsert(self):
        self.k['foo'] = 'bar'
        self.assertEqual(self.k['foo'], ['bar'])

    def testInsertTwo(self):
        self.k['foo'] = 'bar'
        self.k['foo'] = 'bing'
        self.assertEqual(self._sorted_values('foo'), ['bar', 'bing'])

    def testExpire(self):
        # Expiring with a cutoff one second in the past keeps fresh values.
        self.k['foo'] = 'bar'
        self.k.expire(bttime() - 1)
        self.assertEqual(self._sorted_values('foo'), ['bar'])
        self.k['foo'] = 'bing'
        t = bttime()
        self.k.expire(bttime() - 1)
        self.assertEqual(self._sorted_values('foo'), ['bar', 'bing'])
        self.k['foo'] = 'ding'
        self.k['foo'] = 'dang'
        self.assertEqual(self._sorted_values('foo'), ['bar', 'bing', 'dang', 'ding'])
        # Expiring at `t` drops everything inserted before `t` was taken.
        self.k.expire(t)
        self.assertEqual(self._sorted_values('foo'), ['dang', 'ding'])

    def testDup(self):
        self.k['foo'] = 'bar'
        self.k['foo'] = 'bar'
        self.assertEqual(self.k['foo'], ['bar'])

    def testSample(self):
        # Asking for more values than exist returns all of them.
        for value in range(2):
            self.k['foo'] = value
        self.assertEqual(sorted(self.k.sample('foo', 5)), [0, 1])

        # A sample of 5 out of 10 has exactly 5 distinct members.  Distinct
        # loop variables replace the three nested loops that all reused `i`.
        for attempt in range(10):
            for value in range(10):
                self.k['bar'] = value
            picked = self.k.sample('bar', 5)
            self.assertEqual(len(picked), 5)
            for item in picked:
                self.assertEqual(picked.count(item), 1)


# ---------------------------------------------------------------------------
# khashmir/unet.py   (new file, mode 100644, index 0000000..e163a3f)
# ---------------------------------------------------------------------------
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

#
# unet.py
# create a network of UTKhashmir nodes
# usage: unet.py <num_nodes> <start_port> [<ip_address>]

from utkhashmir import UTKhashmir
from BitTorrent.RawServer_magic import RawServer
from BitTorrent.defaultargs import common_options, rare_options
from random import randrange
from threading import Event
import sys, os

from krpc import KRPC
KRPC.noisy = 1

class Network:
    """A set of UTKhashmir nodes on consecutive ports sharing one RawServer."""

    def __init__(self, size=0, startport=5555, localip='127.0.0.1'):
        self.num = size
        self.startport = startport
        self.localip = localip
        # Lets tearDown() run safely even if simpleSetUp() was never reached.
        self.l = []

    def _done(self, val):
        self.done = 1

    def _pump(self, rounds):
        """Run the RawServer event loop `rounds` times (1 s timeout each)."""
        for _ in range(rounds):
            self.r.listen_once(1)

    def simpleSetUp(self):
        """Start the nodes, introduce them to each other, fill the tables."""
        #self.kfiles()
        d = dict([(x[0],x[1]) for x in common_options + rare_options])
        self.r = RawServer(Event(), d)
        self.l = []
        for offset in range(self.num):
            port = self.startport + offset
            self.l.append(UTKhashmir('', port, 'kh%s.db' % port, self.r))

        # Introduce every node to three randomly chosen peers.
        for node in self.l:
            for _ in range(3):
                node.addContact(self.localip, self.l[randrange(0, self.num)].port)
            self._pump(3)

        # Two rounds of close-node discovery.
        for _ in range(2):
            for node in self.l:
                self.done = 0
                node.findCloseNodes(self._done)
                while not self.done:
                    self.r.listen_once(1)

    def tearDown(self):
        for node in self.l:
            node.rawserver.stop_listening_udp(node.socket)
            node.socket.close()
        #self.kfiles()

    def kfiles(self):
        """Delete the per-node database files, then run the loop once."""
        for port in range(self.startport, self.startport + self.num):
            try:
                os.unlink('kh%s.db' % port)
            except OSError:
                # A database that was never created is fine.
                pass

        self.r.listen_once(1)

if __name__ == "__main__":
    size, startport = int(sys.argv[1]), int(sys.argv[2])
    if len(sys.argv) > 3:
        # The usage line documents an ip_address argument that was ignored.
        n = Network(size, startport, sys.argv[3])
    else:
        n = Network(size, startport)
    n.simpleSetUp()
    print(">>> network ready")
    try:
        n.r.listen_forever()
    finally:
        n.tearDown()

# ---------------------------------------------------------------------------
# khashmir/util.py   (new file, mode 100644, index 0000000..3bc456e)
# ---------------------------------------------------------------------------
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.
"""Helpers for khashmir: routing-table statistics and compact peer/node codecs."""

from struct import pack, unpack

# Whatever string type struct.pack() produces on the running interpreter
# (str on Python 2, bytes on Python 3); used for joining and type checks so
# the codecs behave the same on both.
_EMPTY = pack('')
_PACKED_TYPE = type(_EMPTY)

_PEER_FORMAT = '!BBBBH'   # four address octets + big-endian port
_PEER_SIZE = 6
_ID_SIZE = 20
_NODE_SIZE = _ID_SIZE + _PEER_SIZE


def bucket_stats(l):
    """Return min, max and average routing-table size over khashmir instances.

    `l` is a list of objects exposing `table.buckets`, each bucket having a
    list attribute `l`.  The result is {'min': ..., 'max': ..., 'avg': ...};
    the average uses floor division.  An empty `l` yields min None and
    max/avg 0 instead of raising ZeroDivisionError.
    """
    counts = [sum([len(bucket.l) for bucket in node.table.buckets]) for node in l]
    if not counts:
        return {'min': None, 'max': 0, 'avg': 0}
    return {'min': min(counts), 'max': max(counts), 'avg': sum(counts) // len(counts)}


def compact_peer_info(ip, port):
    """Pack a dotted-quad `ip` and a `port` into the 6-byte compact form."""
    octets = [int(part) for part in ip.split('.')]
    return pack(_PEER_FORMAT, *(octets + [port]))


def packPeers(peers):
    """Return a list with the compact form of every (ip, port, ...) in `peers`."""
    return [compact_peer_info(peer[0], peer[1]) for peer in peers]


def reducePeers(peers):
    """Concatenate packed peer strings into one string (linear-time join)."""
    return _EMPTY.join(peers)


def _unpack_endpoint(raw):
    """Decode 6 packed bytes into (dotted-quad ip, port).

    Raises struct.error when `raw` is not exactly 6 bytes long.
    """
    a, b, c, d, port = unpack(_PEER_FORMAT, raw)
    return '%d.%d.%d.%d' % (a, b, c, d), port


def unpackPeers(p):
    """Decode peers into a list of (ip, port, peer_id) tuples.

    `p` is either a packed string of 6-byte records (peer_id is then None) or
    an iterable of dicts with 'ip', 'port' and optionally 'peer id' keys.
    """
    peers = []
    if isinstance(p, _PACKED_TYPE):
        for start in range(0, len(p), _PEER_SIZE):
            ip, port = _unpack_endpoint(p[start:start + _PEER_SIZE])
            peers.append((ip, port, None))
    else:
        for entry in p:
            peers.append((entry['ip'], entry['port'], entry.get('peer id')))
    return peers


def compact_node_info(id, ip, port):
    """Return the 20-byte node `id` followed by the compact peer info."""
    return id + compact_peer_info(ip, port)


def packNodes(nodes):
    """Pack dicts with 'id', 'host' and 'port' keys into one 26-byte-per-node string."""
    return _EMPTY.join([compact_node_info(x['id'], x['host'], x['port']) for x in nodes])


def unpackNodes(n):
    """Decode a packed node string into dicts with 'id', 'host' and 'port' keys."""
    nodes = []
    for start in range(0, len(n), _NODE_SIZE):
        node_id = n[start:start + _ID_SIZE]
        ip, port = _unpack_endpoint(n[start + _ID_SIZE:start + _NODE_SIZE])
        nodes.append({'id': node_id, 'host': ip, 'port': port})
    return nodes


# ---------------------------------------------------------------------------
# khashmir/utkhashmir.py   (new file, mode 100644, index 0000000..66c7441)
# ---------------------------------------------------------------------------
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

# NOTE: layout reconstructed from a whitespace-collapsed diff listing.

import sys

import khashmir, knode
from actions import *
from khash import newID
from krpc import KRPCProtocolError, KRPCFailSilently
from cache import Cache
from sha import sha
from util import *
from threading import Thread
from socket import gethostbyname
from const import *
from kstore import sample

TOKEN_UPDATE_INTERVAL = 5 * 60 # five minutes
NUM_PEERS = 50 # number of peers to return

class UTNode(knode.KNodeBase):
    """Remote node that speaks the get_peers / announce_peer requests."""

    def announcePeer(self, info_hash, port, khashmir_id):
        """Send announce_peer to this node using its cached write token.

        Raises KRPCProtocolError when no token for the node is cached.
        Returns the request's deferred.
        """
        assert type(port) == type(1)
        assert type(info_hash) == type('')
        assert type(khashmir_id) == type('')
        assert len(info_hash) == 20
        assert len(khashmir_id) == 20

        try:
            token = self.table.tcache[self.id]
        except Exception:
            # Any lookup failure simply means "no token"; unlike the former
            # bare except this lets KeyboardInterrupt/SystemExit through.
            token = None
        if not token:
            raise KRPCProtocolError("no write token for node")

        assert type(token) == type("")
        assert len(token) == 20
        df = self.conn().sendRequest('announce_peer', {'info_hash':info_hash,
                                                       'port':port,
                                                       'id':khashmir_id,
                                                       'token':token})
        df.addErrback(self.errBack)
        df.addCallback(self.checkSender)
        return df

    def getPeers(self, info_hash, khashmir_id):
        """Send get_peers for `info_hash` to this node; return the deferred."""
        df = self.conn().sendRequest('get_peers', {'info_hash':info_hash, 'id':khashmir_id})
        df.addErrback(self.errBack)
        df.addCallback(self.checkSender)
        return df

    def checkSender(self, dict):
        """Run the base sender check and cache the token from the response."""
        d = knode.KNodeBase.checkSender(self, dict)
        try:
            self.table.tcache[d['rsp']['id']] = d['rsp']['token']
        except KeyError:
            # Responses without an id or token leave the cache untouched.
            pass
        return d

class UTStoreValue(StoreValue):
    def callNode(self, node, f):
        # NOTE(review): this passes four arguments, while UTNode.announcePeer
        # above takes (info_hash, port, khashmir_id).  Confirm against
        # StoreValue in actions.py which callable `f` really is.
        return f(self.target, self.value, node.token, self.table.node.id)

class UTKhashmir(khashmir.KhashmirBase):
    """Khashmir node with token-protected peer announcements."""
    _Node = UTNode

    def setup(self, host, port, data_dir, rlcount, checkpoint=True):
        khashmir.KhashmirBase.setup(self, host, port,data_dir, rlcount, checkpoint)
        self.cur_token = self.last_token = sha('')
        self.tcache = Cache()
        self.gen_token(loop=True)
        self.expire_cached_tokens(loop=True)

    def expire_cached_tokens(self, loop=False):
        """Expire cached tokens older than TOKEN_UPDATE_INTERVAL.

        With `loop` true the method re-schedules itself at that interval.
        """
        # NOTE(review): time() is not imported by name in this file; it
        # presumably arrives through one of the star imports - confirm.
        self.tcache.expire(time() - TOKEN_UPDATE_INTERVAL)
        if loop:
            self.rawserver.external_add_task(self.expire_cached_tokens, TOKEN_UPDATE_INTERVAL, (True,))

    def gen_token(self, loop=False):
        """Rotate the token secret, keeping the previous one for validation."""
        self.last_token = self.cur_token
        self.cur_token = sha(newID())
        if loop:
            self.rawserver.external_add_task(self.gen_token, TOKEN_UPDATE_INTERVAL, (True,))

    def _token_from(self, secret, host, port):
        """Return the digest of `secret` updated with the sender's host and port."""
        digest = secret.copy()
        digest.update("%s%s" % (host, port))
        return digest.digest()

    def get_token(self, host, port):
        """Return the current write token for the given sender address."""
        return self._token_from(self.cur_token, host, port)

    def val_token(self, token, host, port):
        """Return True if `token` matches the current or the previous secret."""
        if token == self._token_from(self.cur_token, host, port):
            return True
        if token == self._token_from(self.last_token, host, port):
            return True
        return False

    def addContact(self, host, port, callback=None):
        # use dns on host, then call khashmir.addContact
        Thread(target=self._get_host, args=[host, port, callback]).start()

    def _get_host(self, host, port, callback):
        """Resolve `host` (runs in a worker thread) and hand off to _got_host."""
        # this exception catch can go away once we actually fix the bug
        try:
            ip = gethostbyname(host)
        except TypeError:
            # sys.exc_info() replaces the Python-2-only "except TypeError, e".
            e = sys.exc_info()[1]
            raise TypeError(str(e) + (": host(%s) port(%s)" % (repr(host), repr(port))))

        self.rawserver.external_add_task(self._got_host, 0, (ip, port, callback))

    def _got_host(self, host, port, callback):
        khashmir.KhashmirBase.addContact(self, host, port, callback)

    def announcePeer(self, info_hash, port, callback=None):
        """ stores the value for key in the global table, returns immediately, no status
            in this implementation, peers respond but don't indicate status to storing values
            a key can have many values
        """
        def _storeValueForKey(nodes, key=info_hash, value=port, response=callback , table=self.table):
            if not response:
                # default callback
                def _storedValueHandler(sender):
                    pass
                response=_storedValueHandler
            action = UTStoreValue(self, key, value, response, self.rawserver.add_task, "announcePeer")
            self.rawserver.external_add_task(action.goWithNodes, 0, (nodes,))

        # this call is asynch
        self.findNode(info_hash, _storeValueForKey)

    def krpc_announce_peer(self, info_hash, port, id, token, _krpc_sender):
        """Handle announce_peer: validate the token, store the peer, reply with our id."""
        sender = {'id' : id}
        sender['host'] = _krpc_sender[0]
        sender['port'] = _krpc_sender[1]
        if not self.val_token(token, sender['host'], sender['port']):
            raise KRPCProtocolError("Invalid Write Token")
        # The stored address is the sender's host with the announced port.
        value = compact_peer_info(_krpc_sender[0], port)
        self.store[info_hash] = value
        n = self.Node().initWithDict(sender)
        self.insertNode(n, contacted=0)
        return {"id" : self.node.id}

    def retrieveValues(self, key):
        """Return up to NUM_PEERS stored values for `key` ([] if unknown)."""
        try:
            l = self.store.sample(key, NUM_PEERS)
        except KeyError:
            l = []
        return l

    def _report_local_values(self, info_hash, callback, searchlocal):
        """Return locally stored values for `info_hash`, scheduling `callback` for them.

        Returns [] without looking when `searchlocal` is false; the callback
        is only scheduled when at least one value exists.
        """
        if not searchlocal:
            return []
        found = self.retrieveValues(info_hash)
        if len(found) > 0:
            self.rawserver.external_add_task(callback, 0, ([reducePeers(found)],))
        return found

    def getPeers(self, info_hash, callback, searchlocal = 1):
        """ returns the values found for key in global table
            callback will be called with a list of values for each peer that returns unique values
            final callback will be an empty list - probably should change to 'more coming' arg
        """
        nodes = self.table.findNodes(info_hash, invalid=True)
        invalid_nodes = [x for x in nodes if x.invalid]
        if len(invalid_nodes) > 4:
            # Too many invalid nodes: probe four of them plus four valid ones.
            nodes = sample(invalid_nodes , 4) + self.table.findNodes(info_hash, invalid=False)[:4]

        # get locals
        l = self._report_local_values(info_hash, callback, searchlocal)
        # create our search state
        state = GetValue(self, info_hash, callback, self.rawserver.add_task, 'getPeers')
        self.rawserver.external_add_task(state.goWithNodes, 0, (nodes, l))

    def getPeersAndAnnounce(self, info_hash, port, callback, searchlocal = 1):
        """ returns the values found for key in global table
            callback will be called with a list of values for each peer that returns unique values
            final callback will be an empty list - probably should change to 'more coming' arg
        """
        nodes = self.table.findNodes(info_hash, invalid=False)
        nodes += self.table.findNodes(info_hash, invalid=True)

        # get locals
        l = self._report_local_values(info_hash, callback, searchlocal)
        # create our search state
        x = lambda a: a
        state = GetAndStore(self, info_hash, port, callback, x, self.rawserver.add_task, 'getPeers', "announcePeer")
        self.rawserver.external_add_task(state.goWithNodes, 0, (nodes, l))

    def krpc_get_peers(self, info_hash, id, _krpc_sender):
        """Handle get_peers: reply with stored peers or, failing that, close nodes.

        Every reply carries our id and a write token for the sender.
        """
        sender = {'id' : id}
        sender['host'] = _krpc_sender[0]
        sender['port'] = _krpc_sender[1]
        n = self.Node().initWithDict(sender)
        self.insertNode(n, contacted=0)

        reply = {"id": self.node.id,
                 "token" : self.get_token(sender['host'], sender['port'])}
        l = self.retrieveValues(info_hash)
        if len(l) > 0:
            reply['values'] = [reducePeers(l)]
        else:
            nodes = self.table.findNodes(info_hash, invalid=False)
            nodes = [node.senderDict() for node in nodes]
            reply['nodes'] = packNodes(nodes)
        return reply