summaryrefslogtreecommitdiffstats
path: root/khashmir
diff options
context:
space:
mode:
Diffstat (limited to 'khashmir')
-rw-r--r--khashmir/KRateLimiter.py75
-rw-r--r--khashmir/__init__.py0
-rw-r--r--khashmir/actions.py349
-rw-r--r--khashmir/cache.py52
-rw-r--r--khashmir/const.py74
-rw-r--r--khashmir/hammerlock.py37
-rw-r--r--khashmir/inserter.py49
-rw-r--r--khashmir/khash.py120
-rw-r--r--khashmir/khashmir.py442
-rw-r--r--khashmir/knet.py76
-rw-r--r--khashmir/knode.py82
-rw-r--r--khashmir/krpc.py240
-rw-r--r--khashmir/kstore.py119
-rw-r--r--khashmir/ktable.py338
-rw-r--r--khashmir/node.py95
-rw-r--r--khashmir/setup.py70
-rw-r--r--khashmir/test.py21
-rw-r--r--khashmir/test_khashmir.py166
-rw-r--r--khashmir/test_krpc.py161
-rw-r--r--khashmir/test_kstore.py91
-rw-r--r--khashmir/unet.py84
-rw-r--r--khashmir/util.py69
-rw-r--r--khashmir/utkhashmir.py224
23 files changed, 3034 insertions, 0 deletions
diff --git a/khashmir/KRateLimiter.py b/khashmir/KRateLimiter.py
new file mode 100644
index 0000000..a9a9cdc
--- /dev/null
+++ b/khashmir/KRateLimiter.py
@@ -0,0 +1,75 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+from BitTorrent.platform import bttime as time
+from BitTorrent.CurrentRateMeasure import Measure
+from const import *
+from random import randrange, shuffle
+from traceback import print_exc
+
class KRateLimiter:
    """Rate limiter for outgoing packets that also drops stale queue entries.

    Entries that have been sitting in the queue for longer than self.age
    seconds are discarded instead of sent; by default anything with less
    than 5 seconds of its KRPC timeout left is tossed.
    """

    def __init__(self, transport, rate, call_later, rlcount, rate_period, age=(KRPC_TIMEOUT - 5)):
        """
        transport   -- object whose sendto(s, i, addr) performs the real send
        rate        -- budget in len(s) units per second; 0 is replaced by 1e10
        call_later  -- scheduler, invoked as call_later(func, delay)
        rlcount     -- callback invoked with the size of every packet sent
        rate_period -- handed to Measure for the rate estimate
        age         -- maximum number of seconds an entry may wait in the queue
        """
        self.q = []
        self.transport = transport
        self.rate = rate
        self.curr = 0
        self.running = False
        self.age = age
        self.last = 0
        self.call_later = call_later
        self.rlcount = rlcount
        self.measure = Measure(rate_period)
        self.sent = self.dropped = 0
        if self.rate == 0:
            self.rate = 1e10

    def sendto(self, s, i, addr):
        """Queue a packet; kick off the send loop if it is not already scheduled."""
        self.q.append((time(), (s, i, addr)))
        if not self.running:
            self.run(check=True)

    def run(self, check=False):
        """Expire stale entries, then send queued packets while budget remains.

        check=True (used when restarting after idle) caps the accumulated
        credit at one second's worth of rate.
        """
        t = time()
        self.expire(t)
        self.curr -= (t - self.last) * self.rate
        self.last = t
        if check:
            self.curr = max(self.curr, 0 - self.rate)

        # packets go out in random order; the queue is re-sorted afterwards
        shuffle(self.q)
        while self.q and self.curr <= 0:
            x, tup = self.q.pop()
            size = len(tup[0])
            self.curr += size
            try:
                self.transport.sendto(*tup)
                self.sent += 1
                self.rlcount(size)
                self.measure.update_rate(size)
            except Exception:
                # best effort: a failed send is reported (unless the port is 0)
                # and the packet is simply lost
                if tup[2][1] != 0:
                    print(">>> sendto exception %s" % (tup,))
                    print_exc()
        # restore oldest-first order, which expire() relies on
        self.q.sort()
        if self.q or self.curr > 0:
            self.running = True
            # sleep for at least a half second
            self.call_later(self.run, max(self.curr / self.rate, 0.5))
        else:
            self.running = False

    def expire(self, t=None):
        """Drop queued entries older than self.age seconds relative to t.

        t defaults to the current time, evaluated per call. The queue is
        expected to be sorted oldest-first; scanning stops at the first
        entry that is still fresh.
        """
        if t is None:
            t = time()
        expire_time = t - self.age
        stale = 0
        while stale < len(self.q) and self.q[stale][0] < expire_time:
            stale += 1
        if stale:
            del self.q[:stale]
            self.dropped += stale
diff --git a/khashmir/__init__.py b/khashmir/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/khashmir/__init__.py
diff --git a/khashmir/actions.py b/khashmir/actions.py
new file mode 100644
index 0000000..2bbe94b
--- /dev/null
+++ b/khashmir/actions.py
@@ -0,0 +1,349 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+from BitTorrent.platform import bttime as time
+
+import const
+
+from khash import intify
+from ktable import KTable, K
+from util import unpackNodes
+from krpc import KRPCProtocolError, KRPCSelfNodeError
+from bisect import insort
+
class NodeWrap(object):
    """Pairs a node with the search target so wrappers order by XOR distance.

    node   -- object exposing an integer `num` (its ID as a number)
    target -- integer form of the ID being searched for
    """
    def __init__(self, node, target):
        self.num = target
        self.node = node

    def _distance(self):
        """XOR distance between the wrapped node and the target."""
        return self.num ^ self.node.num

    def __cmp__(self, o):
        """ this function is for sorting nodes relative to the ID we are looking for """
        # Compare each wrapper's own distance. XOR-ing the target with the
        # other wrapper's `num` (which is the same target) always yields 0.
        x, y = self._distance(), o._distance()
        if x > y:
            return 1
        elif x < y:
            return -1
        return 0

    def __lt__(self, o):
        """Closer-to-target wrappers sort first (this is what bisect.insort uses)."""
        return self._distance() < o._distance()
+
class ActionBase(object):
    """ base class for some long running asynchronous processes like finding nodes or values """
    def __init__(self, table, target, callback, callLater):
        self.table = table
        self.target = target
        self.callback = callback
        self.callLater = callLater
        # integer form of the target, used for XOR-distance comparisons
        self.num = intify(target)
        # bookkeeping of nodes seen, asked and heard from
        self.found = {}
        self.foundq = []
        self.queried = {}
        self.queriedip = {}
        self.answered = {}
        self.outstanding = 0
        self.finished = 0

    def sort(self, a, b):
        """ cmp-style ordering of two nodes by XOR distance to the ID we are looking for """
        dist_a = self.num ^ a.num
        dist_b = self.num ^ b.num
        return (dist_a > dist_b) - (dist_a < dist_b)

    def shouldQuery(self, node):
        """ True exactly once per node id / address pair, never for our own node """
        if node.id == self.table.node.id:
            return False
        addr = (node.host, node.port)
        if addr in self.queriedip or node.id in self.queried:
            return False
        self.queriedip[addr] = 1
        self.queried[node.id] = 1
        return True

    def _cleanup(self):
        """ release the per-search bookkeeping """
        self.foundq = None
        self.found = None
        self.queried = None
        self.queriedip = None

    def goWithNodes(self, t):
        """ hook for subclasses; the base class does nothing """
        pass
+
+
+
+FIND_NODE_TIMEOUT = 15
+
class FindNode(ActionBase):
    """ find node action merits its own class as it is a long running stateful process """
    def handleGotNodes(self, dict):
        """
        callback for a find_node reply: remember the sender, merge the nodes
        it returned into the candidate queue and schedule further queries
        """
        _krpc_sender = dict['_krpc_sender']
        rsp = dict['rsp']
        sender = {'id' : rsp["id"]}
        sender['port'] = _krpc_sender[1]
        sender['host'] = _krpc_sender[0]
        sender = self.table.Node().initWithDict(sender)
        try:
            l = unpackNodes(rsp.get("nodes", []))
            if sender.id not in self.answered:
                self.answered[sender.id] = sender
        except Exception:
            # a reply we cannot process marks the sender as invalid
            l = []
            self.table.invalidateNode(sender)

        if self.finished:
            # a day late and a dollar short
            return
        self.outstanding = self.outstanding - 1
        for node in l:
            n = self.table.Node().initWithDict(node)
            if n.id not in self.found:
                self.found[n.id] = n
                insort(self.foundq, NodeWrap(n, self.num))
                self.table.insertNode(n, contacted=0)
        self.schedule()

    def schedule(self):
        """
        send messages to new peers, if necessary
        """
        if self.finished:
            return
        l = [wrapper.node for wrapper in self.foundq[:K]]
        for node in l:
            if node.id == self.target:
                # exact match: report it alone and stop
                self.finished = 1
                return self.callback([node])
            if self.shouldQuery(node):
                #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
                try:
                    df = node.findNode(self.target, self.table.node.id)
                except KRPCSelfNodeError:
                    pass
                else:
                    df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                    self.outstanding = self.outstanding + 1
            if self.outstanding >= const.CONCURRENT_REQS:
                break
        assert(self.outstanding) >= 0
        if self.outstanding == 0:
            ## all done!!
            self.finished = 1
            self._cleanup()
            self.callLater(self.callback, 0, (l[:K],))

    def makeMsgFailed(self, node):
        """ errback to attach to the query sent to node """
        return self._defaultGotNodes

    def _defaultGotNodes(self, err):
        """ a query failed: account for it and keep the search going """
        self.outstanding = self.outstanding - 1
        self.schedule()

    def goWithNodes(self, nodes):
        """
        this starts the process; nodes is the initial list of candidates
        (our own node is skipped)
        """
        for node in nodes:
            if node.id == self.table.node.id:
                continue
            else:
                self.found[node.id] = node
                insort(self.foundq, NodeWrap(node, self.num))
        self.schedule()
+
+
+get_value_timeout = 15
class GetValue(FindNode):
    """ get value task """
    def __init__(self, table, target, callback, callLater, find="findValue"):
        """ find is the name of the node method used to issue the lookup """
        FindNode.__init__(self, table, target, callback, callLater)
        self.findValue = find

    def handleGotNodes(self, dict):
        """
        callback for a lookup reply: a 'nodes' reply extends the candidate
        queue, a 'values' reply reports the values not reported before
        """
        _krpc_sender = dict['_krpc_sender']
        rsp = dict['rsp']
        sender = {'id' : rsp["id"]}
        sender['port'] = _krpc_sender[1]
        sender['host'] = _krpc_sender[0]
        sender = self.table.Node().initWithDict(sender)

        if self.finished or sender.id in self.answered:
            # a day late and a dollar short
            return
        self.outstanding = self.outstanding - 1

        self.answered[sender.id] = sender
        # go through nodes
        # if we have any closer than what we already got, query them
        if 'nodes' in rsp:
            try:
                l = unpackNodes(rsp.get('nodes', []))
            except Exception:
                # an unusable reply does not count as an answer
                l = []
                del(self.answered[sender.id])

            for node in l:
                n = self.table.Node().initWithDict(node)
                if n.id not in self.found:
                    self.table.insertNode(n)
                    self.found[n.id] = n
                    insort(self.foundq, NodeWrap(n, self.num))
        elif 'values' in rsp:
            # every value is recorded as seen; only new, non-empty ones are reported
            fresh = []
            for value in rsp.get('values', []):
                if value not in self.results:
                    self.results[value] = 1
                    if value:
                        fresh.append(value)
            if fresh:
                self.callLater(self.callback, 0, (fresh,))
        self.schedule()

    ## get value
    def schedule(self):
        """ query further candidates; report [] once nothing is outstanding """
        if self.finished:
            return
        for node in [wrapper.node for wrapper in self.foundq[:K]]:
            if self.shouldQuery(node):
                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
                try:
                    f = getattr(node, self.findValue)
                except AttributeError:
                    print(">>> findValue %s doesn't have a %s method!" % (node, self.findValue))
                else:
                    try:
                        df = f(self.target, self.table.node.id)
                        df.addCallback(self.handleGotNodes)
                        df.addErrback(self.makeMsgFailed(node))
                        self.outstanding = self.outstanding + 1
                        self.queried[node.id] = 1
                    except KRPCSelfNodeError:
                        pass
            if self.outstanding >= const.CONCURRENT_REQS:
                break
        assert(self.outstanding) >= 0
        if self.outstanding == 0:
            ## all done, didn't find it!!
            self.finished = 1
            self._cleanup()
            self.callLater(self.callback, 0, ([],))

    ## get value
    def goWithNodes(self, nodes, found=None):
        """
        start the lookup from nodes; values listed in found are treated
        as already reported
        """
        self.results = {}
        if found:
            for n in found:
                self.results[n] = 1
        for node in nodes:
            if node.id == self.table.node.id:
                continue
            else:
                self.found[node.id] = node
                insort(self.foundq, NodeWrap(node, self.num))
        self.schedule()
+
+
class StoreValue(ActionBase):
    """ store a value on nodes until const.STORE_REDUNDANCY stores have succeeded """
    def __init__(self, table, target, value, callback, callLater, store="storeValue"):
        ActionBase.__init__(self, table, target, callback, callLater)
        self.value = value
        self.stored = []
        # name of the node method used to issue the store
        self.store = store

    def storedValue(self, t, node):
        """ a store succeeded; finish when enough copies exist, else keep going """
        self.outstanding -= 1
        if self.finished:
            return
        self.stored.append(t)
        if len(self.stored) >= const.STORE_REDUNDANCY:
            self.finished = 1
            self.callback(self.stored)
        elif len(self.stored) + self.outstanding < const.STORE_REDUNDANCY:
            self.schedule()
        return t

    def storeFailed(self, t, node):
        """ a store failed; try another node unless we are already done """
        self.outstanding -= 1
        if not self.finished:
            self.schedule()
        return t

    def schedule(self):
        """ issue as many store requests as the concurrency and redundancy limits allow """
        if self.finished:
            return
        num = min(const.CONCURRENT_REQS - self.outstanding,
                  const.STORE_REDUNDANCY - len(self.stored))
        if num == 0 and not self.finished:
            self.finished = 1
            self.callback(self.stored)
        while num > 0:
            try:
                node = self.nodes.pop()
            except IndexError:
                # out of candidates: done as soon as nothing is in flight
                if self.outstanding == 0:
                    self.finished = 1
                    self._cleanup()
                    self.callback(self.stored)
                return
            if node.id == self.table.node.id:
                continue
            try:
                f = getattr(node, self.store)
            except AttributeError:
                print(">>> %s doesn't have a %s method!" % (node, self.store))
                continue
            try:
                df = f(self.target, self.value, self.table.node.id)
            except KRPCProtocolError:
                self.table.table.invalidateNode(node)
            except KRPCSelfNodeError:
                pass
            else:
                df.addCallback(self.storedValue, (), {'node': node})
                df.addErrback(self.storeFailed, (), {'node': node})
                self.outstanding += 1
                num -= 1

    def goWithNodes(self, nodes):
        """ start storing on nodes; the list is sorted in place and consumed """
        self.nodes = nodes
        # NOTE(review): the list is sorted by ascending distance and schedule()
        # pops from the end, so the farthest node is tried first -- confirm
        # this is intended
        self.nodes.sort(self.sort)
        self.schedule()
+
+
class GetAndStore(GetValue):
    """ value lookup that starts a StoreValue whenever the lookup callback fires empty """
    def __init__(self, table, target, value, callback, storecallback, callLater, find="findValue", store="storeValue"):
        self.store = store
        self.value = value
        self.cb2 = callback
        self.storecallback = storecallback
        GetValue.__init__(self, table, target, self._lookupResult, callLater, find)

    def _lookupResult(self, res):
        """ forward the result; on an empty result store on the nodes that answered """
        self.cb2(res)
        if res:
            return
        storer = StoreValue(self.table, self.target, self.value, self.doneStored, self.callLater, self.store)
        storer.goWithNodes(self.answered.values())

    def doneStored(self, dict):
        """ hand the store result to the store callback """
        self.storecallback(dict)
+
class KeyExpirer:
    """ periodically asks a store to expire entries older than const.KE_AGE """
    def __init__(self, store, callLater):
        self.store = store
        self.callLater = callLater
        # first run after the initial delay, later runs reschedule themselves
        callLater(self.doExpire, const.KEINITIAL_DELAY)

    def doExpire(self):
        """ expire old entries and schedule the next run """
        cutoff = time() - const.KE_AGE
        self.cut = cutoff
        self.store.expire(cutoff)
        self.callLater(self.doExpire, const.KE_DELAY)
diff --git a/khashmir/cache.py b/khashmir/cache.py
new file mode 100644
index 0000000..ab4df30
--- /dev/null
+++ b/khashmir/cache.py
@@ -0,0 +1,52 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+from BitTorrent.platform import bttime as time
+
class Cache:
    """ dict-like store that timestamps every entry so old ones can be expired """
    def __init__(self, touch_on_access = False):
        # key -> (timestamp, value)
        self.data = {}
        # (timestamp, key, value) records, newest first
        self.q = []
        self.touch = touch_on_access

    def __getitem__(self, key):
        value = self.data[key][1]
        if self.touch:
            # reading re-stores the entry with a fresh timestamp
            self[key] = value
        return value

    def __setitem__(self, key, value):
        now = time()
        self.data[key] = (now, value)
        self.q.insert(0, (now, key, value))

    def __delitem__(self, key):
        # the matching queue records are left for expire() to discard
        del self.data[key]

    def has_key(self, key):
        return key in self.data

    def keys(self):
        return self.data.keys()

    def expire(self, expire_time):
        """ drop every entry whose latest timestamp is older than expire_time """
        queue = self.q
        while queue and queue[-1][0] < expire_time:
            stamp, key, value = queue.pop()
            entry = self.data.get(key)
            # only delete when the record still describes the live entry
            if entry is not None and entry[1] == value and entry[0] == stamp:
                del self.data[key]
+
diff --git a/khashmir/const.py b/khashmir/const.py
new file mode 100644
index 0000000..eabe0e8
--- /dev/null
+++ b/khashmir/const.py
@@ -0,0 +1,74 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+# magic id to use before we know a peer's id
+NULL_ID = 20 * '\0'
+
+# Kademlia "K" constant, this should be an even number
+K = 8
+
+# SHA1 is 160 bits long
+HASH_LENGTH = 160
+
+# checkpoint every this many seconds
+CHECKPOINT_INTERVAL = 60 * 5 # five minutes
+
+# how often to find our own nodes
+FIND_CLOSE_INTERVAL = 60 * 15 # fifteen minutes
+
+### SEARCHING/STORING
+# concurrent krpc calls per find node/value request!
+CONCURRENT_REQS = K
+
+# how many hosts to post to
+STORE_REDUNDANCY = 3
+
+
+### ROUTING TABLE STUFF
+# how many times in a row a node can fail to respond before it's booted from the routing table
+MAX_FAILURES = 3
+
+# never ping a node more often than this
+MIN_PING_INTERVAL = 60 * 15 # fifteen minutes
+
+# refresh buckets that haven't been touched in this long
+BUCKET_STALENESS = 60 * 15 # fifteen minutes
+
+
+### KEY EXPIRER
+# time before expirer starts running
+KEINITIAL_DELAY = 15 # 15 seconds - to clean out old stuff in persistent db
+
+# time between expirer runs
+KE_DELAY = 60 * 5 # 5 minutes
+
+# expire entries older than this
+KE_AGE = 60 * 30 # 30 minutes
+
+
+## krpc errback codes
+KRPC_TIMEOUT = 20
+
+KRPC_ERROR = 1
+KRPC_ERROR_METHOD_UNKNOWN = 2
+KRPC_ERROR_RECEIVED_UNKNOWN = 3
+KRPC_ERROR_TIMEOUT = 4
+KRPC_SOCKET_ERROR = 5
+
+KRPC_CONNECTION_CACHE_TIME = KRPC_TIMEOUT * 2
+
+
+## krpc error response codes
+KERR_ERROR = (201, "Generic Error")
+KERR_SERVER_ERROR = (202, "Server Error")
+KERR_PROTOCOL_ERROR = (203, "Protocol Error")
+KERR_METHOD_UNKNOWN = (204, "Method Unknown")
+KERR_INVALID_ARGS = (205, "Invalid Argements")
+KERR_INVALID_TOKEN = (206, "Invalid Token")
diff --git a/khashmir/hammerlock.py b/khashmir/hammerlock.py
new file mode 100644
index 0000000..dd4b5a1
--- /dev/null
+++ b/khashmir/hammerlock.py
@@ -0,0 +1,37 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
# length of one counting period, as passed to call_later
INTERVAL = 60
# number of periods kept in the sliding window
PERIODS = 5

class Hammerlock:
    """ per-address request counter over a sliding window of PERIODS buckets """
    def __init__(self, rate, call_later):
        self.rate = rate
        self.call_later = call_later
        self.curr = 0
        self.buckets = [{} for x in range(PERIODS)]
        self.call_later(self._cycle, INTERVAL)

    def _cycle(self):
        """ advance to the next bucket, clearing whatever it held, and reschedule """
        nxt = (self.curr + 1) % PERIODS
        self.curr = nxt
        self.buckets[nxt] = {}
        self.call_later(self._cycle, INTERVAL)

    def check(self, addr):
        """ count one hit for addr; True while its total across all buckets is below rate """
        current = self.buckets[self.curr]
        current[addr] = current.get(addr, 0) + 1
        total = sum([bucket.get(addr, 0) for bucket in self.buckets])
        return total < self.rate
+
diff --git a/khashmir/inserter.py b/khashmir/inserter.py
new file mode 100644
index 0000000..d45ff04
--- /dev/null
+++ b/khashmir/inserter.py
@@ -0,0 +1,49 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+### generate a bunch of nodes that use a single contact point
+usage = "usage: inserter.py <contact host> <contact port>"
+
+from utkhashmir import UTKhashmir
+from BitTorrent.RawServer_magic import RawServer
+from BitTorrent.defaultargs import common_options, rare_options
+from khashmir.khash import newID
+from random import randrange
+from threading import Event
+import sys, os
+
+from khashmir.krpc import KRPC
+KRPC.noisy = 1
# number of completed operations, bumped by d()
done = 0

def d(n):
    """ completion callback: count one more finished operation """
    global done
    done = done + 1

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print(usage)
        sys.exit(1)
    host, port = sys.argv[1:]
    x = UTKhashmir("", 22038, "/tmp/cgcgcgc")
    x.addContact(host, int(port))
    x.rawserver.listen_once()
    x.findCloseNodes(d)
    # pump the event loop until the initial find has completed
    while not done:
        x.rawserver.listen_once()
    l = []
    for i in range(10):
        k = newID()
        v = randrange(10000, 20000)
        l.append((k, v))
        x.announcePeer(k, v, d)
    # NOTE(review): the counter restarts at 1, so the loop below exits after
    # nine of the ten announce callbacks -- confirm whether 0 was intended
    done = 1
    while done < 10:
        x.rawserver.listen_once(1)
    for k, v in l:
        print(">>> %s %s" % (repr(k), v))
diff --git a/khashmir/khash.py b/khashmir/khash.py
new file mode 100644
index 0000000..2750a7d
--- /dev/null
+++ b/khashmir/khash.py
@@ -0,0 +1,120 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+from sha import sha
+from random import randint
+
+#this is ugly, hopefully os.entropy will be in 2.4
try:
    from entropy import entropy
except ImportError:
    def entropy(n):
        """fallback: n pseudorandom characters built from random.randint"""
        return ''.join([chr(randint(0, 255)) for i in range(n)])
+
def intify(hstr):
    """20-byte (160-bit) hash, big-endian -> python integer

    Raises AssertionError when hstr is not exactly 20 bytes long.
    """
    # imported locally so the block does not depend on a file-level import
    from binascii import hexlify
    assert len(hstr) == 20
    return int(hexlify(hstr), 16)
+
def stringify(num):
    """non-negative integer -> 20-character big-endian string

    Values below 2**160 are left-padded with NUL bytes to 20 characters;
    larger values produce a longer string.
    """
    # imported locally so the block does not depend on a file-level import
    from binascii import unhexlify
    digits = '%x' % num
    if len(digits) % 2 != 0:
        digits = '0' + digits
    # 40 hex digits == 20 bytes; a negative repeat count yields no padding
    digits = '0' * (40 - len(digits)) + digits
    return unhexlify(digits)
+
def distance(a, b):
    """XOR distance between two 160-bit hashes given as 20-character strings"""
    left = intify(a)
    right = intify(b)
    return left ^ right
+
+
def newID():
    """returns a new pseudorandom ID string: the SHA-1 digest of 20 entropy characters"""
    return sha(entropy(20)).digest()
+
def newIDInRange(min, max):
    """returns a pseudorandom ID string whose integer value lies in [min, max)"""
    picked = randRange(min, max)
    return stringify(picked)
+
def randRange(min, max):
    """returns a pseudorandom integer in [min, max), derived from a fresh ID"""
    offset = intify(newID()) % (max - min)
    return min + offset
+
def newTID():
    """returns a pseudorandom transaction id in [-2**30, 2**30)"""
    limit = 2 ** 30
    return randRange(-limit, limit)
+
+### Test Cases ###
+import unittest
+
class NewID(unittest.TestCase):
    """newID() must always produce 20-character IDs"""
    def testLength(self):
        self.assertEqual(len(newID()), 20)
    def testHundreds(self):
        # repeat the length check; the bare attribute reference used before
        # never actually ran it
        for x in range(100):
            self.testLength()
+
class Intify(unittest.TestCase):
    """intify() against fixed values and for big-endian byte order"""
    known = [('\0' * 20, 0),
             ('\xff' * 20, 2 ** 160 - 1),
             ]
    def testKnown(self):
        for hashed, expected in self.known:
            self.assertEqual(intify(hashed), expected)
    def testEndianessOnce(self):
        # bumping the last character must raise the integer value by one
        h = newID()
        while h[-1] == '\xff':
            h = newID()
        bumped = h[:-1] + chr(ord(h[-1]) + 1)
        self.assertEqual(intify(bumped) - intify(h), 1)
    def testEndianessLots(self):
        for attempt in range(100):
            self.testEndianessOnce()
+
class Disantance(unittest.TestCase):
    """distance() against fixed pairs and the XOR identity d(x,y) ^ d(y,z) == d(x,z)"""
    known = [
        (("\0" * 20, "\xff" * 20), 2 ** 160 - 1),
        ((sha("foo").digest(), sha("foo").digest()), 0),
        ((sha("bar").digest(), sha("bar").digest()), 0)
        ]
    def testKnown(self):
        for (first, second), expected in self.known:
            self.assertEqual(distance(first, second), expected)
    def testCommutitive(self):
        for attempt in range(100):
            x, y, z = newID(), newID(), newID()
            combined = distance(x, y) ^ distance(y, z)
            self.assertEqual(combined, distance(x, z))
+
class RandRange(unittest.TestCase):
    """randRange(lo, hi) must return a value in [lo, hi)"""
    def testOnce(self):
        a = intify(newID())
        b = intify(newID())
        lo, hi = min(a, b), max(a, b)
        if lo == hi:
            # identical IDs give an empty range; nothing to check
            return
        c = randRange(lo, hi)
        self.assertEqual(lo <= c < hi, True, "output out of range %d %d %d" % (lo, c, hi))

    def testOneHundredTimes(self):
        for i in range(100):
            self.testOnce()
+
+
+
+if __name__ == '__main__':
+ unittest.main()
+
+
diff --git a/khashmir/khashmir.py b/khashmir/khashmir.py
new file mode 100644
index 0000000..d939f67
--- /dev/null
+++ b/khashmir/khashmir.py
@@ -0,0 +1,442 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+import const
+from socket import gethostbyname
+
+from BitTorrent.platform import bttime as time
+
+from sha import sha
+import re
+from BitTorrent.defaultargs import common_options, rare_options
+from BitTorrent.RawServer_magic import RawServer
+
+from ktable import KTable, K
+from knode import *
+from kstore import KStore
+from khash import newID, newIDInRange
+
+from util import packNodes
+from actions import FindNode, GetValue, KeyExpirer, StoreValue
+import krpc
+
+import sys
+import os
+import traceback
+
+from BitTorrent.bencode import bencode, bdecode
+
+from BitTorrent.defer import Deferred
+from random import randrange
+from kstore import sample
+
+from threading import Event, Thread
+
+ip_pat = re.compile('[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}')
+
class KhashmirDBExcept(Exception):
    """exception type declared for khashmir database errors"""
+
def foo(bytes):
    """no-op byte-count callback; the default for KhashmirBase's rlcount argument"""
    return None
+
+# this is the base class, has base functionality and find node, no key-value mappings
+class KhashmirBase:
+ _Node = KNodeBase
+ def __init__(self, host, port, data_dir, rawserver=None, max_ul_rate=1024, checkpoint=True, errfunc=None, rlcount=foo, config={'pause':False, 'max_rate_period':20}):
+ if rawserver:
+ self.rawserver = rawserver
+ else:
+ self.flag = Event()
+ d = dict([(x[0],x[1]) for x in common_options + rare_options])
+ self.rawserver = RawServer(self.flag, d)
+ self.max_ul_rate = max_ul_rate
+ self.socket = None
+ self.config = config
+ self.setup(host, port, data_dir, rlcount, checkpoint)
+
+ def setup(self, host, port, data_dir, rlcount, checkpoint=True):
+ self.host = host
+ self.port = port
+ self.ddir = data_dir
+ self.store = KStore()
+ self.pingcache = {}
+ self.socket = self.rawserver.create_udpsocket(self.port, self.host, False)
+ self.udp = krpc.hostbroker(self, (self.host, self.port), self.socket, self.rawserver.add_task, self.max_ul_rate, self.config, rlcount)
+ self._load()
+ self.rawserver.start_listening_udp(self.socket, self.udp)
+ self.last = time()
+ KeyExpirer(self.store, self.rawserver.add_task)
+ self.refreshTable(force=1)
+ if checkpoint:
+ self.rawserver.add_task(self.findCloseNodes, 30, (lambda a: a, True))
+ self.rawserver.add_task(self.checkpoint, 60, (1,))
+
+ def Node(self):
+ n = self._Node(self.udp.connectionForAddr)
+ n.table = self
+ return n
+
+ def __del__(self):
+ if self.socket is not None:
+ self.rawserver.stop_listening_udp(self.socket)
+ self.socket.close()
+
+ def _load(self):
+ do_load = False
+ try:
+ s = open(os.path.join(self.ddir, "routing_table"), 'r').read()
+ dict = bdecode(s)
+ except:
+ id = newID()
+ else:
+ id = dict['id']
+ do_load = True
+
+ self.node = self._Node(self.udp.connectionForAddr).init(id, self.host, self.port)
+ self.table = KTable(self.node)
+ if do_load:
+ self._loadRoutingTable(dict['rt'])
+
+
+ def checkpoint(self, auto=0):
+ d = {}
+ d['id'] = self.node.id
+ d['rt'] = self._dumpRoutingTable()
+ try:
+ f = open(os.path.join(self.ddir, "routing_table"), 'wb')
+ f.write(bencode(d))
+ f.close()
+ except Exception, e:
+ #XXX real error here
+ print ">>> unable to dump routing table!", str(e)
+ pass
+
+
+ if auto:
+ self.rawserver.add_task(self.checkpoint,
+ randrange(int(const.CHECKPOINT_INTERVAL * .9),
+ int(const.CHECKPOINT_INTERVAL * 1.1)),
+ (1,))
+
+ def _loadRoutingTable(self, nodes):
+ """
+ load routing table nodes from database
+ it's usually a good idea to call refreshTable(force=1) after loading the table
+ """
+ for rec in nodes:
+ n = self.Node().initWithDict(rec)
+ self.table.insertNode(n, contacted=0, nocheck=True)
+
+ def _dumpRoutingTable(self):
+ """
+ save routing table nodes to the database
+ """
+ l = []
+ for bucket in self.table.buckets:
+ for node in bucket.l:
+ l.append({'id':node.id, 'host':node.host, 'port':node.port, 'age':int(node.age)})
+ return l
+
+
+ def _addContact(self, host, port, callback=None):
+ """
+ ping this node and add the contact info to the table on pong!
+ """
+ n =self.Node().init(const.NULL_ID, host, port)
+ try:
+ self.sendPing(n, callback=callback)
+ except krpc.KRPCSelfNodeError:
+ # our own node
+ pass
+
+
+ #######
+ ####### LOCAL INTERFACE - use these methods!
+ def addContact(self, ip, port, callback=None):
+ """
+ ping this node and add the contact info to the table on pong!
+ """
+ if ip_pat.match(ip):
+ self._addContact(ip, port)
+ else:
+ def go(ip=ip, port=port):
+ ip = gethostbyname(ip)
+ self.rawserver.external_add_task(self._addContact, 0, (ip, port))
+ t = Thread(target=go)
+ t.start()
+
+
+ ## this call is async!
+ def findNode(self, id, callback, errback=None):
+ """ returns the contact info for node, or the k closest nodes, from the global table """
+ # get K nodes out of local table/cache, or the node we want
+ nodes = self.table.findNodes(id, invalid=True)
+ l = [x for x in nodes if x.invalid]
+ if len(l) > 4:
+ nodes = sample(l , 4) + self.table.findNodes(id, invalid=False)[:4]
+
+ d = Deferred()
+ if errback:
+ d.addCallbacks(callback, errback)
+ else:
+ d.addCallback(callback)
+ if len(nodes) == 1 and nodes[0].id == id :
+ d.callback(nodes)
+ else:
+ # create our search state
+ state = FindNode(self, id, d.callback, self.rawserver.add_task)
+ self.rawserver.external_add_task(state.goWithNodes, 0, (nodes,))
+
+ def insertNode(self, n, contacted=1):
+ """
+ insert a node in our local table, pinging oldest contact in bucket, if necessary
+
+ If all you have is a host/port, then use addContact, which calls this method after
+ receiving the PONG from the remote node. The reason for the seperation is we can't insert
+ a node into the table without it's peer-ID. That means of course the node passed into this
+ method needs to be a properly formed Node object with a valid ID.
+ """
+ old = self.table.insertNode(n, contacted=contacted)
+ if old and old != n:
+ if not old.inPing():
+ self.checkOldNode(old, n, contacted)
+ else:
+ l = self.pingcache.get(old.id, [])
+ if len(l) < 10 or contacted:
+ l.append((n, contacted))
+ self.pingcache[old.id] = l
+
+
+
+ def checkOldNode(self, old, new, contacted=False):
+ ## these are the callbacks used when we ping the oldest node in a bucket
+
+ def cmp(a, b):
+ if a[1] == 1 and b[1] == 0:
+ return -1
+ elif b[1] == 1 and a[1] == 0:
+ return 1
+ else:
+ return 0
+
+ def _staleNodeHandler(dict, old=old, new=new, contacted=contacted):
+ """ called if the pinged node never responds """
+ if old.fails >= 2:
+ l = self.pingcache.get(old.id, [])
+ l.sort(cmp)
+ if l:
+ n, nc = l[0]
+ if (not contacted) and nc:
+ l = l[1:] + [(new, contacted)]
+ new = n
+ contacted = nc
+ o = self.table.replaceStaleNode(old, new)
+ if o and o != new:
+ self.checkOldNode(o, new)
+ try:
+ self.pingcache[o.id] = self.pingcache[old.id]
+ del(self.pingcache[old.id])
+ except KeyError:
+ pass
+ else:
+ if l:
+ del(self.pingcache[old.id])
+ l.sort(cmp)
+ for node in l:
+ self.insertNode(node[0], node[1])
+ else:
+ l = self.pingcache.get(old.id, [])
+ if l:
+ del(self.pingcache[old.id])
+ self.insertNode(new, contacted)
+ for node in l:
+ self.insertNode(node[0], node[1])
+
+ def _notStaleNodeHandler(dict, old=old, new=new, contacted=contacted):
+ """ called when we get a pong from the old node """
+ self.table.insertNode(old, True)
+ self.insertNode(new, contacted)
+ l = self.pingcache.get(old.id, [])
+ l.sort(cmp)
+ for node in l:
+ self.insertNode(node[0], node[1])
+ try:
+ del(self.pingcache[old.id])
+ except KeyError:
+ pass
+ try:
+ df = old.ping(self.node.id)
+ except krpc.KRPCSelfNodeError:
+ pass
+ df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
+
+ def sendPing(self, node, callback=None):
+ """
+ ping a node
+
+ node: object whose ping() method is called with our own node id
+ callback: optional callable, invoked without arguments once the ping has
+ been answered or has failed; it is not invoked when ping() raises
+ krpc.KRPCSelfNodeError
+ """
+ try:
+ df = node.ping(self.node.id)
+ except krpc.KRPCSelfNodeError:
+ pass
+ else:
+ ## these are the callbacks we use when we issue a PING
+ def _pongHandler(dict, node=node, table=self.table, callback=callback):
+ # build a fresh node from the id in the response and the address the reply came from
+ _krpc_sender = dict['_krpc_sender']
+ dict = dict['rsp']
+ sender = {'id' : dict['id']}
+ sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
+ n = self.Node().initWithDict(sender)
+ table.insertNode(n)
+ if callback:
+ callback()
+ def _defaultPong(err, node=node, table=self.table, callback=callback):
+ # errback: the failure itself is ignored here, only the caller is notified
+ if callback:
+ callback()
+
+ df.addCallbacks(_pongHandler,_defaultPong)
+
+ def findCloseNodes(self, callback=lambda a: a, auto=False):
+ """
+ This does a findNode on the ID one away from our own.
+ This will allow us to populate our table with nodes on our network closest to our own.
+ This is called as soon as we start up with an empty table
+
+ callback: handed to findNode() together with the target id
+ auto: when true, also refresh the table and schedule this method again
+ after a randomized delay of 0.9 to 1.1 times const.FIND_CLOSE_INTERVAL
+ """
+ # nothing is looked up while config['pause'] is set
+ if not self.config['pause']:
+ # target id: our own id with the last byte incremented (wrapping at 256)
+ id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
+ self.findNode(id, callback)
+ if auto:
+ if not self.config['pause']:
+ self.refreshTable()
+ # the rescheduling happens even while paused
+ self.rawserver.external_add_task(self.findCloseNodes, randrange(int(const.FIND_CLOSE_INTERVAL *0.9),
+ int(const.FIND_CLOSE_INTERVAL *1.1)), (lambda a: True, True))
+
+ def refreshTable(self, force=0):
+ """
+ force=1 will refresh table regardless of last bucket access time
+
+ Without force, a bucket is refreshed when it holds fewer than K nodes,
+ holds at least one node flagged invalid, or was last accessed more than
+ const.BUCKET_STALENESS ago.
+ """
+ def callback(nodes):
+ # the lookup result is not used here
+ pass
+
+ refresh = [bucket for bucket in self.table.buckets if force or (len(bucket.l) < K) or len(filter(lambda a: a.invalid, bucket.l)) or (time() - bucket.lastAccessed > const.BUCKET_STALENESS)]
+ for bucket in refresh:
+ # look up an id produced by newIDInRange() from the bucket's bounds
+ id = newIDInRange(bucket.min, bucket.max)
+ self.findNode(id, callback)
+
+ def stats(self):
+ """
+ Returns a dict with the keys 'num_contacts' and 'num_nodes'
+ num_contacts: number contacts in our routing table
+ num_nodes: number of nodes estimated in the entire dht
+ """
+ # sum of the lengths of all bucket lists
+ num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
+ # estimate: const.K doubled once for every bucket beyond the first
+ num_nodes = const.K * (2**(len(self.table.buckets) - 1))
+ return {'num_contacts':num_contacts, 'num_nodes':num_nodes}
+
+ def krpc_ping(self, id, _krpc_sender):
+ sender = {'id' : id}
+ sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
+ n = self.Node().initWithDict(sender)
+ self.insertNode(n, contacted=0)
+ return {"id" : self.node.id}
+
+ def krpc_find_node(self, target, id, _krpc_sender):
+ nodes = self.table.findNodes(target, invalid=False)
+ nodes = map(lambda node: node.senderDict(), nodes)
+ sender = {'id' : id}
+ sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
+ n = self.Node().initWithDict(sender)
+ self.insertNode(n, contacted=0)
+ return {"nodes" : packNodes(nodes), "id" : self.node.id}
+
+
+## This class provides read-only access to the DHT through valueForKey.
+## You probably want to use this mixin and provide your own write methods.
+class KhashmirRead(KhashmirBase):
+ """KhashmirBase plus value lookup: valueForKey() for callers, krpc_find_value() for peers"""
+ _Node = KNodeRead
+ def retrieveValues(self, key):
+ """return what self.store holds for key, or an empty list when the key is unknown"""
+ try:
+ l = self.store[key]
+ except KeyError:
+ l = []
+ return l
+ ## also async
+ def valueForKey(self, key, callback, searchlocal = 1):
+ """ returns the values found for key in global table
+ callback will be called with a list of values for each peer that returns unique values
+ final callback will be an empty list - probably should change to 'more coming' arg
+
+ searchlocal: when true, locally stored values are reported through callback first
+ """
+ nodes = self.table.findNodes(key)
+
+ # get locals
+ if searchlocal:
+ l = self.retrieveValues(key)
+ if len(l) > 0:
+ self.rawserver.external_add_task(callback, 0, (l,))
+ else:
+ l = []
+
+ # create our search state
+ state = GetValue(self, key, callback, self.rawserver.add_task)
+ # the local values are handed to the search as well
+ self.rawserver.external_add_task(state.goWithNodes, 0, (nodes, l))
+
+ def krpc_find_value(self, key, id, _krpc_sender):
+ """answer a 'find_value' request: local values when there are any, otherwise packed nodes for key"""
+ sender = {'id' : id}
+ sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
+ n = self.Node().initWithDict(sender)
+ # unlike krpc_find_node, the requester is offered to the table before the lookup
+ self.insertNode(n, contacted=0)
+
+ l = self.retrieveValues(key)
+ if len(l) > 0:
+ return {'values' : l, "id": self.node.id}
+ else:
+ nodes = self.table.findNodes(key, invalid=False)
+ nodes = map(lambda node: node.senderDict(), nodes)
+ return {'nodes' : packNodes(nodes), "id": self.node.id}
+
+### provides a generic write method, you probably don't want to deploy something that allows
+### arbitrary value storage
+class KhashmirWrite(KhashmirRead):
+ _Node = KNodeWrite
+ ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
+ def storeValueForKey(self, key, value, callback=None):
+ """ stores the value for key in the global table, returns immediately, no status
+ in this implementation, peers respond but don't indicate status to storing values
+ a key can have many values
+ """
+ def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+ if not response:
+ # default callback
+ def _storedValueHandler(sender):
+ pass
+ response=_storedValueHandler
+ action = StoreValue(self, key, value, response, self.rawserver.add_task)
+ self.rawserver.external_add_task(action.goWithNodes, 0, (nodes,))
+
+ # this call is asynch
+ self.findNode(key, _storeValueForKey)
+
+ def krpc_store_value(self, key, value, id, _krpc_sender):
+ t = "%0.6f" % time()
+ self.store[key] = value
+ sender = {'id' : id}
+ sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
+ n = self.Node().initWithDict(sender)
+ self.insertNode(n, contacted=0)
+ return {"id" : self.node.id}
+
+# the whole shebang, for testing
+class Khashmir(KhashmirWrite):
+ """KhashmirWrite under a short name; adds nothing beyond restating the node class"""
+ _Node = KNodeWrite
diff --git a/khashmir/knet.py b/khashmir/knet.py
new file mode 100644
index 0000000..3baf1e9
--- /dev/null
+++ b/khashmir/knet.py
@@ -0,0 +1,76 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+#
+# knet.py
+# create a network of khashmir nodes
+# usage: knet.py <num_nodes> <start_port> <ip_address>
+
+from khashmir import Khashmir
+from random import randrange
+import sys, os
+
+class Network:
+ """Builds a group of Khashmir instances on consecutive ports and introduces them to each other.
+
+ NOTE(review): `reactor` is used throughout but is not imported by the
+ import lines of this file - confirm where it is expected to come from.
+ """
+ def __init__(self, size=0, startport=5555, localip='127.0.0.1'):
+ # size: number of instances; startport: port of the first one; localip: address used for contacts
+ self.num = size
+ self.startport = startport
+ self.localip = localip
+
+ def _done(self, val):
+ # callback handed to findCloseNodes(); ends the wait loops in setUp()
+ self.done = 1
+
+ def setUp(self):
+ """create the instances, give each three random contacts, then run findCloseNodes twice on each"""
+ self.kfiles()
+ self.l = []
+ for i in range(self.num):
+ # NOTE(review): the Khashmir constructor is not visible here - the arguments look like host, port, db path; confirm
+ self.l.append(Khashmir('', self.startport + i, '/tmp/kh%s.db' % (self.startport + i)))
+ reactor.iterate()
+ reactor.iterate()
+
+ for i in self.l:
+ # the random pick may be the instance itself or the same peer more than once
+ i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+ i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+ i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+
+ for i in self.l:
+ self.done = 0
+ i.findCloseNodes(self._done)
+ while not self.done:
+ reactor.iterate()
+ for i in self.l:
+ self.done = 0
+ i.findCloseNodes(self._done)
+ while not self.done:
+ reactor.iterate()
+
+ def tearDown(self):
+ """stop listening on every instance and delete the database files"""
+ for i in self.l:
+ i.listenport.stopListening()
+ self.kfiles()
+
+ def kfiles(self):
+ """delete /tmp/kh<port>.db for every port of this network"""
+ for i in range(self.startport, self.startport+self.num):
+ try:
+ os.unlink('/tmp/kh%s.db' % i)
+ except:
+ # NOTE(review): bare except - a missing file is the expected case, but every other error is hidden too
+ pass
+
+ reactor.iterate()
+
+if __name__ == "__main__":
+ # argv: <num_nodes> <start_port> <ip_address>, as in the usage line at the top of this file
+ n = Network(int(sys.argv[1]), int(sys.argv[2]), sys.argv[3])
+ n.setUp()
+ try:
+ reactor.run()
+ finally:
+ # tearDown() runs however reactor.run() ends
+ n.tearDown()
diff --git a/khashmir/knode.py b/khashmir/knode.py
new file mode 100644
index 0000000..560252a
--- /dev/null
+++ b/khashmir/knode.py
@@ -0,0 +1,82 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+from node import Node
+from BitTorrent.defer import Deferred
+from const import NULL_ID
+from krpc import KRPCProtocolError
+
+class IDChecker:
+ """holds a single id value; nothing else in this file uses it"""
+ def __init__(self, id):
+ self.id = id
+
+class KNodeBase(Node):
+ """A Node that can send requests to the peer it describes.
+
+ cfa is a callable that takes a (host, port) tuple and returns the
+ connection object used for sendRequest().
+ NOTE(review): self.table is used below but never assigned in this class -
+ presumably set by the code that creates nodes; it needs an insertNode()
+ method and a .table attribute with invalidateNode()/nodeFailed().
+ """
+ def __init__(self, cfa):
+ Node.__init__(self)
+ self.cfa = cfa
+
+ def conn(self):
+ """return the connection object for this node's (host, port)"""
+ return self.cfa((self.host, self.port))
+
+ def checkSender(self, dict):
+ """callback for responses: check the responder's id against the id we hold for this node
+
+ Raises KRPCProtocolError when the response carries no id; returns dict unchanged otherwise.
+ """
+ try:
+ senderid = dict['rsp']['id']
+ except KeyError:
+ raise KRPCProtocolError, "No peer id in response."
+ else:
+ # an id is already known and the responder claims a different one
+ if self.id != NULL_ID and senderid != self.id:
+ self.table.table.invalidateNode(self)
+ else:
+ # first response of a node whose id was unknown: adopt the id it reports
+ if self.id == NULL_ID:
+ self.id = senderid
+ self.table.insertNode(self, contacted=1)
+ return dict
+
+ def errBack(self, err):
+ """errback for failed requests: record the failure in the table and return the first item of err"""
+ self.table.table.nodeFailed(self)
+ return err[0]
+
+ def ping(self, id):
+ """send a 'ping' carrying our id; returns the deferred of the request"""
+ df = self.conn().sendRequest('ping', {"id":id})
+ # the connection's pinging flag is raised until the request has succeeded or failed
+ self.conn().pinging = True
+ def endping(x):
+ self.conn().pinging = False
+ return x
+ df.addCallbacks(endping, endping)
+ df.addCallbacks(self.checkSender, self.errBack)
+ return df
+
+ def findNode(self, target, id):
+ """send a 'find_node' for target carrying our id; returns the deferred of the request"""
+ df = self.conn().sendRequest('find_node', {"target" : target, "id": id})
+ df.addErrback(self.errBack)
+ df.addCallback(self.checkSender)
+ return df
+
+ def inPing(self):
+ """return the pinging flag of this node's connection"""
+ return self.conn().pinging
+
+class KNodeRead(KNodeBase):
+ def findValue(self, key, id):
+ df = self.conn().sendRequest('find_value', {"key" : key, "id" : id})
+ df.addErrback(self.errBack)
+ df.addCallback(self.checkSender)
+ return df
+
+class KNodeWrite(KNodeRead):
+ def storeValue(self, key, value, id):
+ df = self.conn().sendRequest('store_value', {"key" : key, "value" : value, "id": id})
+ df.addErrback(self.errBack)
+ df.addCallback(self.checkSender)
+ return df
+ def storeValues(self, key, value, id):
+ df = self.conn().sendRequest('store_values', {"key" : key, "values" : value, "id": id})
+ df.addErrback(self.errBack)
+ df.addCallback(self.checkSender)
+ return df
diff --git a/khashmir/krpc.py b/khashmir/krpc.py
new file mode 100644
index 0000000..86c9973
--- /dev/null
+++ b/khashmir/krpc.py
@@ -0,0 +1,240 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+from BitTorrent.defer import Deferred
+from BitTorrent.bencode import bencode, bdecode
+import socket
+from BitTorrent.RawServer_magic import Handler
+from BitTorrent.platform import bttime
+import time
+from math import log10
+
+import sys
+from traceback import print_exc
+
+from khash import distance
+from cache import Cache
+from KRateLimiter import KRateLimiter
+from hammerlock import Hammerlock
+
+from const import *
+
+# keys (and message-type tags) of a KRPC message dictionary
+TID = 't' # key: transaction id
+REQ = 'q' # type tag of a request; also the key holding the method name
+RSP = 'r' # type tag of a response; also the key holding the response payload
+TYP = 'y' # key: message type (REQ, RSP or ERR)
+ARG = 'a' # key: argument dictionary of a request
+ERR = 'e' # type tag of an error; also the key holding the error payload
+
+class KRPCFailSilently(Exception):
+ """raised by a request handler to have KRPC.datagramReceived send no reply at all"""
+ pass
+
+class KRPCProtocolError(Exception):
+ """raised by a request handler; KRPC.datagramReceived answers it with error code 204"""
+ pass
+
+class KRPCServerError(Exception):
+ """raised by a request handler; KRPC.datagramReceived answers it with error code 202"""
+ pass
+
+class KRPCSelfNodeError(Exception):
+ """raised by hostbroker.connectionForAddr when asked for a connection to our own address"""
+ pass
+
+class hostbroker(Handler):
+ def __init__(self, server, addr, transport, call_later, max_ul_rate, config, rlcount):
+ self.server = server
+ self.addr = addr
+ self.transport = transport
+ self.rltransport = KRateLimiter(transport, max_ul_rate, call_later, rlcount, config['max_rate_period'])
+ self.call_later = call_later
+ self.connections = Cache(touch_on_access=True)
+ self.hammerlock = Hammerlock(100, call_later)
+ self.expire_connections(loop=True)
+ self.config = config
+ if not self.config.has_key('pause'):
+ self.config['pause'] = False
+
+ def expire_connections(self, loop=False):
+ self.connections.expire(bttime() - KRPC_CONNECTION_CACHE_TIME)
+ if loop:
+ self.call_later(self.expire_connections, KRPC_CONNECTION_CACHE_TIME, (True,))
+
+ def data_came_in(self, addr, datagram):
+ #if addr != self.addr:
+ if not self.config['pause'] and self.hammerlock.check(addr):
+ c = self.connectionForAddr(addr)
+ c.datagramReceived(datagram, addr)
+
+ def connection_lost(self, socket):
+ ## this is like, bad
+ print ">>> connection lost!", socket
+
+ def connectionForAddr(self, addr):
+ if addr == self.addr:
+ raise KRPCSelfNodeError()
+ if not self.connections.has_key(addr):
+ conn = KRPC(addr, self.server, self.transport, self.rltransport, self.call_later)
+ self.connections[addr] = conn
+ else:
+ conn = self.connections[addr]
+ return conn
+
+
+## connection
+class KRPC:
+ noisy = 0
+ def __init__(self, addr, server, transport, rltransport, call_later):
+ self.call_later = call_later
+ self.transport = transport
+ self.rltransport = rltransport
+ self.factory = server
+ self.addr = addr
+ self.tids = {}
+ self.mtid = 0
+ self.pinging = False
+
+ def sendErr(self, addr, tid, code, msg):
+ ## send error
+ out = bencode({TID:tid, TYP:ERR, ERR :(code, msg)})
+ olen = len(out)
+ self.rltransport.sendto(out, 0, addr)
+ return olen
+
+ def datagramReceived(self, str, addr):
+ # bdecode
+ try:
+ msg = bdecode(str)
+ except Exception, e:
+ if self.noisy:
+ print "response decode error: " + `e`, `str`
+ else:
+ #if self.noisy:
+ # print msg
+ # look at msg type
+ if msg[TYP] == REQ:
+ ilen = len(str)
+ # if request
+ # tell factory to handle
+ f = getattr(self.factory ,"krpc_" + msg[REQ], None)
+ msg[ARG]['_krpc_sender'] = self.addr
+ if f and callable(f):
+ try:
+ ret = apply(f, (), msg[ARG])
+ except KRPCFailSilently:
+ pass
+ except KRPCServerError, e:
+ olen = self.sendErr(addr, msg[TID], 202, "Server Error: %s" % e.args[0])
+ except KRPCProtocolError, e:
+ olen = self.sendErr(addr, msg[TID], 204, "Protocol Error: %s" % e.args[0])
+ except Exception, e:
+ print_exc(20)
+ olen = self.sendErr(addr, msg[TID], 202, "Server Error")
+ else:
+ if ret:
+ # make response
+ out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
+ else:
+ out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
+ # send response
+ olen = len(out)
+ self.rltransport.sendto(out, 0, addr)
+
+ else:
+ if self.noisy:
+ #print "don't know about method %s" % msg[REQ]
+ pass
+ # unknown method
+ olen = self.sendErr(addr, msg[TID], *KERR_METHOD_UNKNOWN)
+ if self.noisy:
+ try:
+ ndist = 10 * log10(2**160 * 1.0 / distance(self.factory.node.id, msg[ARG]['id']))
+ ndist = int(ndist)
+ except OverflowError:
+ ndist = 999
+
+ h = None
+ if msg[ARG].has_key('target'):
+ h = msg[ARG]['target']
+ elif msg[ARG].has_key('info_hash'):
+ h = msg[ARG]['info_hash']
+ else:
+ tdist = '-'
+
+ if h != None:
+ try:
+ tdist = 10 * log10(2**160 * 1.0 / distance(self.factory.node.id, h))
+ tdist = int(tdist)
+ except OverflowError:
+ tdist = 999
+
+ t = time.localtime()
+ t = "%2d-%2d-%2d %2d:%2d:%2d" % (t[0], t[1], t[2], t[3], t[4], t[5])
+ print "%s %s %s >>> %s - %s %s %s - %s %s" % (t,
+ msg[ARG]['id'].encode('base64')[:4],
+ addr,
+ self.factory.node.port,
+ ilen,
+ msg[REQ],
+ olen,
+ ndist,
+ tdist)
+ elif msg[TYP] == RSP:
+ # if response
+ # lookup tid
+ if self.tids.has_key(msg[TID]):
+ df = self.tids[msg[TID]]
+ # callback
+ del(self.tids[msg[TID]])
+ df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
+ else:
+ # no tid, this transaction timed out already...
+ pass
+
+ elif msg[TYP] == ERR:
+ # if error
+ # lookup tid
+ if self.tids.has_key(msg[TID]):
+ df = self.tids[msg[TID]]
+ # callback
+ df.errback(msg[ERR])
+ del(self.tids[msg[TID]])
+ else:
+ # day late and dollar short
+ pass
+ else:
+ # unknown message type
+ df = self.tids[msg[TID]]
+ # callback
+ df.errback((KRPC_ERROR_RECEIVED_UNKNOWN, _("received unknown message type")))
+ del(self.tids[msg[TID]])
+
+ def sendRequest(self, method, args):
+ # make message
+ # send it
+ msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args}
+ self.mtid = (self.mtid + 1) % 256
+ s = bencode(msg)
+ d = Deferred()
+ self.tids[msg[TID]] = d
+ self.call_later(self.timeOut, KRPC_TIMEOUT, (msg[TID],))
+ self.call_later(self._send, 0, (s, d))
+ return d
+
+ def timeOut(self, id):
+ if self.tids.has_key(id):
+ df = self.tids[id]
+ del(self.tids[id])
+ df.errback((KRPC_ERROR_TIMEOUT, _("timeout")))
+
+ def _send(self, s, d):
+ try:
+ self.transport.sendto(s, 0, self.addr)
+ except socket.error:
+ d.errback((KRPC_SOCKET_ERROR, _("socket error")))
+
diff --git a/khashmir/kstore.py b/khashmir/kstore.py
new file mode 100644
index 0000000..5c01200
--- /dev/null
+++ b/khashmir/kstore.py
@@ -0,0 +1,119 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+try:
+ from random import sample
+except ImportError:
+ # fallback used when the random module offers no sample()
+ from random import choice
+ def sample(l, n):
+ """pick n distinct items of l at random; returns l itself when it has n items or fewer"""
+ if len(l) <= n:
+ return l
+ # picks are collected as dict keys, so items must be hashable and repeated picks collapse
+ d = {}
+ while len(d) < n:
+ d[choice(l)] = 1
+ return d.keys()
+
+from BitTorrent.platform import bttime as time
+
+class KItem:
+ def __init__(self, key, value):
+ self.t = time()
+ self.k = key
+ self.v = value
+ def __cmp__(self, a):
+ # same value = same item, only used to keep dupes out of db
+ if self.v == a.v:
+ return 0
+
+ # compare by time
+ if self.t < a.t:
+ return -1
+ elif self.t > a.t:
+ return 1
+ else:
+ return 0
+
+ def __hash__(self):
+ return self.v.__hash__()
+
+ def __repr__(self):
+ return `(self.k, self.v, time() - self.t)`
+
+## in memory data store for distributed tracker
+## keeps a list of values per key in dictionary
+## keeps expiration for each key in a queue
+## can efficiently expire all values older than a given time
+## can insert one val at a time, or a list: ks['key'] = 'value' or ks['key'] = ['v1', 'v2', 'v3']
+class KStore:
+ def __init__(self):
+ self.d = {}
+ self.q = []
+
+ def __getitem__(self, key):
+ return [x.v for x in self.d[key]]
+
+ def __setitem__(self, key, value):
+ if type(value) == type([]):
+ [self.__setitem__(key, v) for v in value]
+ return
+ x = KItem(key, value)
+ try:
+ l = self.d[key]
+ except KeyError:
+ self.d[key] = [x]
+ else:
+ # this is slow
+ try:
+ i = l.index(x)
+ del(l[i])
+ except ValueError:
+ pass
+ l.insert(0, x)
+ self.q.append(x)
+
+ def __delitem__(self, key):
+ del(self.d[key])
+
+ def __len__(self):
+ return len(self.d)
+
+ def keys(self):
+ return self.d.keys()
+
+ def values(self):
+ return [self[key] for key in self.keys()]
+
+ def items(self):
+ return [(key, self[key]) for key in self.keys()]
+
+ def expire(self, t):
+ #.expire values inserted prior to t
+ try:
+ while self.q[0].t <= t:
+ x = self.q.pop(0)
+ try:
+ l = self.d[x.k]
+ try:
+ while l[-1].t <= t:
+ l.pop()
+ except IndexError:
+ del(self.d[x.k])
+ except KeyError:
+ pass
+ except IndexError:
+ pass
+
+ def sample(self, key, n):
+ # returns n random values of key, or all values if less than n
+ try:
+ l = [x.v for x in sample(self.d[key], n)]
+ except ValueError:
+ l = [x.v for x in self.d[key]]
+ return l
diff --git a/khashmir/ktable.py b/khashmir/ktable.py
new file mode 100644
index 0000000..e0a07b4
--- /dev/null
+++ b/khashmir/ktable.py
@@ -0,0 +1,338 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+from BitTorrent.platform import bttime as time
+from bisect import *
+from types import *
+
+import khash as hash
+import const
+from const import K, HASH_LENGTH, NULL_ID, MAX_FAILURES, MIN_PING_INTERVAL
+from node import Node
+
+
+def ls(a, b):
+ """cmp-style comparator: orders nodes by ascending lastSeen"""
+ return cmp(a.lastSeen, b.lastSeen)
+
+class KTable(object):
+ __slots__ = ('node', 'buckets')
+ """local routing table for a kademlia like distributed hash table"""
+ def __init__(self, node):
+ # this is the root node, a.k.a. US!
+ self.node = node
+ self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)]
+ self.insertNode(node)
+
+ def _bucketIndexForInt(self, num):
+ """the index of the bucket that should hold int"""
+ return bisect_left(self.buckets, num)
+
+ def bucketForInt(self, num):
+ return self.buckets[self._bucketIndexForInt(num)]
+
+ def findNodes(self, id, invalid=True):
+ """
+ return K nodes in our own local table closest to the ID.
+ """
+
+ if isinstance(id, str):
+ num = hash.intify(id)
+ elif isinstance(id, Node):
+ num = id.num
+ elif isinstance(id, int) or isinstance(id, long):
+ num = id
+ else:
+ raise TypeError, "findNodes requires an int, string, or Node"
+
+ nodes = []
+ i = self._bucketIndexForInt(num)
+
+ # if this node is already in our table then return it
+ try:
+ node = self.buckets[i].getNodeWithInt(num)
+ except ValueError:
+ pass
+ else:
+ return [node]
+
+ # don't have the node, get the K closest nodes
+ nodes = nodes + self.buckets[i].l
+ if not invalid:
+ nodes = [a for a in nodes if not a.invalid]
+ if len(nodes) < K:
+ # need more nodes
+ min = i - 1
+ max = i + 1
+ while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
+ #ASw: note that this requires K be even
+ if min >= 0:
+ nodes = nodes + self.buckets[min].l
+ if max < len(self.buckets):
+ nodes = nodes + self.buckets[max].l
+ min = min - 1
+ max = max + 1
+ if not invalid:
+ nodes = [a for a in nodes if not a.invalid]
+
+ nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
+ return nodes[:K]
+
+ def _splitBucket(self, a):
+ diff = (a.max - a.min) / 2
+ b = KBucket([], a.max - diff, a.max)
+ self.buckets.insert(self.buckets.index(a.min) + 1, b)
+ a.max = a.max - diff
+ # transfer nodes to new bucket
+ for anode in a.l[:]:
+ if anode.num >= a.max:
+ a.removeNode(anode)
+ b.addNode(anode)
+
+ def replaceStaleNode(self, stale, new):
+ """this is used by clients to replace a node returned by insertNode after
+ it fails to respond to a Pong message"""
+ i = self._bucketIndexForInt(stale.num)
+
+ if self.buckets[i].hasNode(stale):
+ self.buckets[i].removeNode(stale)
+ if new and self.buckets[i].hasNode(new):
+ self.buckets[i].seenNode(new)
+ elif new:
+ self.buckets[i].addNode(new)
+
+ return
+
+ def insertNode(self, node, contacted=1, nocheck=False):
+ """
+ this insert the node, returning None if successful, returns the oldest node in the bucket if it's full
+ the caller responsible for pinging the returned node and calling replaceStaleNode if it is found to be stale!!
+ contacted means that yes, we contacted THEM and we know the node is reachable
+ """
+ if node.id == NULL_ID or node.id == self.node.id:
+ return
+
+ if contacted:
+ node.updateLastSeen()
+
+ # get the bucket for this node
+ i = self._bucketIndexForInt(node.num)
+ # check to see if node is in the bucket already
+ if self.buckets[i].hasNode(node):
+ it = self.buckets[i].l.index(node.num)
+ xnode = self.buckets[i].l[it]
+ if contacted:
+ node.age = xnode.age
+ self.buckets[i].seenNode(node)
+ elif xnode.lastSeen != 0 and xnode.port == node.port and xnode.host == node.host:
+ xnode.updateLastSeen()
+ return
+
+ # we don't have this node, check to see if the bucket is full
+ if not self.buckets[i].bucketFull():
+ # no, append this node and return
+ self.buckets[i].addNode(node)
+ return
+
+ # full bucket, check to see if any nodes are invalid
+ t = time()
+ invalid = [x for x in self.buckets[i].invalid.values() if x.invalid]
+ if len(invalid) and not nocheck:
+ invalid.sort(ls)
+ while invalid and not self.buckets[i].hasNode(invalid[0]):
+ del(self.buckets[i].invalid[invalid[0].num])
+ invalid = invalid[1:]
+ if invalid and (invalid[0].lastSeen == 0 and invalid[0].fails < MAX_FAILURES):
+ return invalid[0]
+ elif invalid:
+ self.replaceStaleNode(invalid[0], node)
+ return
+
+ stale = [n for n in self.buckets[i].l if (t - n.lastSeen) > MIN_PING_INTERVAL]
+ if len(stale) and not nocheck:
+ stale.sort(ls)
+ return stale[0]
+
+ # bucket is full and all nodes are valid, check to see if self.node is in the bucket
+ if not (self.buckets[i].min <= self.node < self.buckets[i].max):
+ return
+
+ # this bucket is full and contains our node, split the bucket
+ if len(self.buckets) >= HASH_LENGTH:
+ # our table is FULL, this is really unlikely
+ print "Hash Table is FULL! Increase K!"
+ return
+
+ self._splitBucket(self.buckets[i])
+
+ # now that the bucket is split and balanced, try to insert the node again
+ return self.insertNode(node, contacted)
+
+ def justSeenNode(self, id):
+ """call this any time you get a message from a node
+ it will update it in the table if it's there """
+ try:
+ n = self.findNodes(id)[0]
+ except IndexError:
+ return None
+ else:
+ tstamp = n.lastSeen
+ n.updateLastSeen()
+ bucket = self.bucketForInt(n.num)
+ bucket.seenNode(n)
+ return tstamp
+
+ def invalidateNode(self, n):
+ """
+ forget about node n - use when you know that node is invalid
+ """
+ n.invalid = True
+ bucket = self.bucketForInt(n.num)
+ bucket.invalidateNode(n)
+
+ def nodeFailed(self, node):
+ """ call this when a node fails to respond to a message, to invalidate that node """
+ try:
+ n = self.findNodes(node.num)[0]
+ except IndexError:
+ return None
+ else:
+ if n.msgFailed() >= const.MAX_FAILURES:
+ self.invalidateNode(n)
+
+ def numPeers(self):
+ """ estimated number of connectable nodes in global table """
+ return 8 * (2 ** (len(self.buckets) - 1))
+
+class KBucket(object):
+ __slots__ = ('min', 'max', 'lastAccessed', 'l', 'index', 'invalid')
+ def __init__(self, contents, min, max):
+ self.l = contents
+ self.index = {}
+ self.invalid = {}
+ self.min = min
+ self.max = max
+ self.lastAccessed = time()
+
+ def touch(self):
+ self.lastAccessed = time()
+
+ def lacmp(self, a, b):
+ if a.lastSeen > b.lastSeen:
+ return 1
+ elif b.lastSeen > a.lastSeen:
+ return -1
+ return 0
+
+ def sort(self):
+ self.l.sort(self.lacmp)
+
+ def getNodeWithInt(self, num):
+ try:
+ node = self.index[num]
+ except KeyError:
+ raise ValueError
+ return node
+
+ def addNode(self, node):
+ if len(self.l) >= K:
+ return
+ if self.index.has_key(node.num):
+ return
+ self.l.append(node)
+ self.index[node.num] = node
+ self.touch()
+
+ def removeNode(self, node):
+ assert self.index.has_key(node.num)
+ del(self.l[self.l.index(node.num)])
+ del(self.index[node.num])
+ try:
+ del(self.invalid[node.num])
+ except KeyError:
+ pass
+ self.touch()
+
+ def invalidateNode(self, node):
+ self.invalid[node.num] = node
+
+ def seenNode(self, node):
+ try:
+ del(self.invalid[node.num])
+ except KeyError:
+ pass
+ it = self.l.index(node.num)
+ del(self.l[it])
+ self.l.append(node)
+ self.index[node.num] = node
+
+ def hasNode(self, node):
+ return self.index.has_key(node.num)
+
+ def bucketFull(self):
+ return len(self.l) >= K
+
+ def __repr__(self):
+ return "<KBucket %d items (%d to %d)>" % (len(self.l), self.min, self.max)
+
+ ## Comparators
+ # necessary for bisecting list of buckets with a hash expressed as an integer or a distance
+ # compares integer or node object with the bucket's range
+ def __lt__(self, a):
+ if isinstance(a, Node): a = a.num
+ return self.max <= a
+ def __le__(self, a):
+ if isinstance(a, Node): a = a.num
+ return self.min < a
+ def __gt__(self, a):
+ if isinstance(a, Node): a = a.num
+ return self.min > a
+ def __ge__(self, a):
+ if isinstance(a, Node): a = a.num
+ return self.max >= a
+ def __eq__(self, a):
+ if isinstance(a, Node): a = a.num
+ return self.min <= a and self.max > a
+ def __ne__(self, a):
+ if isinstance(a, Node): a = a.num
+ return self.min >= a or self.max < a
+
+
+### UNIT TESTS ###
+import unittest
+
+class TestKTable(unittest.TestCase):
+ # exercises KTable with a freshly generated own node (self.a) and one peer (self.b)
+ def setUp(self):
+ self.a = Node().init(hash.newID(), 'localhost', 2002)
+ self.t = KTable(self.a)
+
+ def testAddNode(self):
+ # a single inserted peer ends up as the only entry of the only bucket
+ self.b = Node().init(hash.newID(), 'localhost', 2003)
+ self.t.insertNode(self.b)
+ self.assertEqual(len(self.t.buckets[0].l), 1)
+ self.assertEqual(self.t.buckets[0].l[0], self.b)
+
+ def testRemove(self):
+ self.testAddNode()
+ self.t.invalidateNode(self.b)
+ # NOTE(review): KTable.invalidateNode only flags the node and records it in
+ # bucket.invalid; it does not remove it from bucket.l, so this assertion
+ # does not match the implementation in this file - confirm the intent
+ self.assertEqual(len(self.t.buckets[0].l), 0)
+
+ def testFail(self):
+ self.testAddNode()
+ # one failure short of the limit: the node must still be there
+ for i in range(const.MAX_FAILURES - 1):
+ self.t.nodeFailed(self.b)
+ self.assertEqual(len(self.t.buckets[0].l), 1)
+ self.assertEqual(self.t.buckets[0].l[0], self.b)
+
+ self.t.nodeFailed(self.b)
+ # NOTE(review): reaching the limit calls invalidateNode, which leaves the
+ # node in bucket.l (see testRemove) - confirm the expected length
+ self.assertEqual(len(self.t.buckets[0].l), 0)
+
+
+if __name__ == "__main__":
+ # running this module directly runs the unit tests above
+ unittest.main()
diff --git a/khashmir/node.py b/khashmir/node.py
new file mode 100644
index 0000000..65a48ef
--- /dev/null
+++ b/khashmir/node.py
@@ -0,0 +1,95 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+import khash
+from BitTorrent.platform import bttime as time
+from types import *
+
+class Node:
+ """encapsulate contact info"""
+ def __init__(self):
+ # a new node has no failures, was never seen and counts as invalid until updateLastSeen()
+ self.fails = 0
+ self.lastSeen = 0
+ self.invalid = True
+ self.id = self.host = self.port = ''
+ self.age = time()
+ # self.num is only set by init()/initWithDict(); the comparators below need it
+
+ def init(self, id, host, port):
+ """fill in id, host and port; returns self so it can be chained onto the constructor"""
+ self.id = id
+ # integer form of the id, as produced by khash.intify
+ self.num = khash.intify(id)
+ self.host = host
+ self.port = port
+ self._senderDict = {'id': self.id, 'port' : self.port, 'host' : self.host}
+ return self
+
+ def initWithDict(self, dict):
+ """fill in the node from a dict with 'id', 'port', 'host' and optionally 'age'; returns self"""
+ # the dict itself is kept (not copied) and handed out again by senderDict()
+ self._senderDict = dict
+ self.id = dict['id']
+ self.num = khash.intify(self.id)
+ self.port = dict['port']
+ self.host = dict['host']
+ self.age = dict.get('age', self.age)
+ return self
+
+ def updateLastSeen(self):
+ """record that the node was just seen: resets the failure count and the invalid flag"""
+ self.lastSeen = time()
+ self.fails = 0
+ self.invalid = False
+
+ def msgFailed(self):
+ """count one more failure; returns the new failure count"""
+ self.fails = self.fails + 1
+ return self.fails
+
+ def senderDict(self):
+ return self._senderDict
+
+ def __hash__(self):
+ return self.id.__hash__()
+
+ def __repr__(self):
+ return ">node <%s> %s<" % (self.id.encode('base64')[:4], (self.host, self.port))
+
+ ## these comparators let us bisect/index a list full of nodes with either a node or an int/long
+ # an operand of InstanceType (an old-style class instance) is replaced by its num first
+ def __lt__(self, a):
+ if type(a) == InstanceType:
+ a = a.num
+ return self.num < a
+ def __le__(self, a):
+ if type(a) == InstanceType:
+ a = a.num
+ return self.num <= a
+ def __gt__(self, a):
+ if type(a) == InstanceType:
+ a = a.num
+ return self.num > a
+ def __ge__(self, a):
+ if type(a) == InstanceType:
+ a = a.num
+ return self.num >= a
+ def __eq__(self, a):
+ if type(a) == InstanceType:
+ a = a.num
+ return self.num == a
+ def __ne__(self, a):
+ if type(a) == InstanceType:
+ a = a.num
+ return self.num != a
+
+
+import unittest
+
+class TestNode(unittest.TestCase):
+ def setUp(self):
+ self.node = Node().init(khash.newID(), 'localhost', 2002)
+ def testUpdateLastSeen(self):
+ # lastSeen starts at 0 and must move forward once the node has been seen
+ t = self.node.lastSeen
+ self.node.updateLastSeen()
+ assert t < self.node.lastSeen
+
diff --git a/khashmir/setup.py b/khashmir/setup.py
new file mode 100644
index 0000000..a9e680a
--- /dev/null
+++ b/khashmir/setup.py
@@ -0,0 +1,70 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+#!/usr/bin/env python
+
+import os
+import sys
+
+try:
+ import distutils.core
+ import distutils.command.build_ext
+except ImportError:
+ raise SystemExit, """\
+You don't have the python development modules installed.
+
+If you have Debian you can install it by running
+ apt-get install python-dev
+
+If you have RedHat and know how to install this from an RPM please
+email us so we can put instructions here.
+"""
+
+try:
+ import twisted
+except ImportError:
+ raise SystemExit, """\
+You don't have Twisted installed.
+
+Twisted can be downloaded from
+ http://twistedmatrix.com/products/download
+
+Anything later that version 1.0.3 should work
+"""
+
+try:
+ import sqlite
+except ImportError:
+ raise SystemExit, """\
+You don't have PySQLite installed.
+
+PySQLite can be downloaded from
+ http://sourceforge.net/project/showfiles.php?group_id=54058&release_id=139482
+"""
+
+setup_args = {
+ 'name': 'khashmir',
+ 'author': 'Andrew Loewenstern',
+ 'author_email': 'burris@users.sourceforge.net',
+ 'licence': 'MIT',
+ 'package_dir': {'khashmir': '.'},
+ 'packages': [
+ 'khashmir',
+ ],
+}
+
+if hasattr(distutils.dist.DistributionMetadata, 'get_keywords'):
+ setup_args['keywords'] = "internet tcp p2p"
+
+if hasattr(distutils.dist.DistributionMetadata, 'get_platforms'):
+ setup_args['platforms'] = "win32 posix"
+
+if __name__ == '__main__':
+ apply(distutils.core.setup, (), setup_args)
diff --git a/khashmir/test.py b/khashmir/test.py
new file mode 100644
index 0000000..a6c8a9c
--- /dev/null
+++ b/khashmir/test.py
@@ -0,0 +1,21 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+import unittest
+
+import ktable, khashmir
+import khash, node, knode
+import actions
+import test_krpc
+import test_khashmir
+import kstore
+
+tests = unittest.defaultTestLoader.loadTestsFromNames(['kstore', 'khash', 'node', 'knode', 'actions', 'ktable', 'test_krpc', 'test_khashmir'])
+result = unittest.TextTestRunner().run(tests)
diff --git a/khashmir/test_khashmir.py b/khashmir/test_khashmir.py
new file mode 100644
index 0000000..e22028e
--- /dev/null
+++ b/khashmir/test_khashmir.py
@@ -0,0 +1,166 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+from unittest import *
+
+from BitTorrent import RawServer_magic
+
+from khashmir import *
+import khash
+from copy import copy
+
+from random import randrange
+from krpc import KRPC
+
+KRPC.noisy=0
+import os
+
+if __name__ =="__main__":
+ tests = defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]])
+ result = TextTestRunner().run(tests)
+
+# Store/retrieve test run against a small network of Khashmir instances that
+# all share one RawServer.
+# NOTE(review): indentation was lost in this listing, so the loop nesting
+# described in the comments below is inferred - confirm against the real file.
+class MultiTest(TestCase):
+ # number of Khashmir instances created by setUp
+ num = 25
+ # callback handed to findCloseNodes; flips the flag the wait loops poll
+ def _done(self, val):
+ self.done = 1
+
+ def setUp(self):
+ self.l = []
+ self.startport = 10088
+ # option dict built from the first two fields of every default option entry
+ d = dict([(x[0],x[1]) for x in common_options + rare_options])
+ self.r = RawServer(Event(), d)
+ # one instance per consecutive port, each with its own /tmp/<port>.test path
+ for i in range(self.num):
+ self.l.append(Khashmir('127.0.0.1', self.startport + i, '/tmp/%s.test' % (self.startport + i), self.r))
+ self.r.listen_once(1)
+ self.r.listen_once(1)
+
+ # give every instance three randomly chosen contacts; any exception
+ # raised by addContact is deliberately ignored
+ for i in self.l:
+ try:
+ i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+ except:
+ pass
+ try:
+ i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+ except:
+ pass
+ try:
+ i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+ except:
+ pass
+ self.r.listen_once(1)
+ self.r.listen_once(1)
+ self.r.listen_once(1)
+
+ # two passes of findCloseNodes over every instance, pumping the server
+ # until the _done callback has fired
+ for i in self.l:
+ self.done = 0
+ i.findCloseNodes(self._done)
+ while not self.done:
+ self.r.listen_once(1)
+ for i in self.l:
+ self.done = 0
+ i.findCloseNodes(self._done)
+ while not self.done:
+ self.r.listen_once(1)
+
+ def tearDown(self):
+ # detach every UDP socket from the server before closing it
+ for i in self.l:
+ self.r.stop_listening_udp(i.socket)
+ i.socket.close()
+
+ self.r.listen_once(1)
+
+ def testStoreRetrieve(self):
+ # ten rounds, each with a fresh random key K and value V
+ for i in range(10):
+ K = khash.newID()
+ V = khash.newID()
+
+ # store (K, V) three times, each time through a randomly chosen instance,
+ # waiting for the store callback before continuing
+ for a in range(3):
+ self.done = 0
+ def _scb(val):
+ self.done = 1
+ self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
+ while not self.done:
+ self.r.listen_once(1)
+
+
+ # retrieval callback: a false/empty val ends the wait and requires that V
+ # was seen beforehand; a val containing V records the hit
+ def _rcb(val):
+ if not val:
+ self.done = 1
+ self.assertEqual(self.got, 1)
+ elif V in val:
+ self.got = 1
+ # look the key up three times, each time from a randomly chosen instance
+ for x in range(3):
+ self.got = 0
+ self.done = 0
+ self.l[randrange(0, self.num)].valueForKey(K, _rcb)
+ while not self.done:
+ self.r.listen_once(1)
+
+class AASimpleTests(TestCase):
+ def setUp(self):
+ d = dict([(x[0],x[1]) for x in common_options + rare_options])
+ self.r = RawServer(Event(), d)
+ self.a = Khashmir('127.0.0.1', 4044, '/tmp/a.test', self.r)
+ self.b = Khashmir('127.0.0.1', 4045, '/tmp/b.test', self.r)
+
+ def tearDown(self):
+ self.r.stop_listening_udp(self.a.socket)
+ self.r.stop_listening_udp(self.b.socket)
+ self.a.socket.close()
+ self.b.socket.close()
+
+ def addContacts(self):
+ self.a.addContact('127.0.0.1', 4045)
+ self.r.listen_once(1)
+ self.r.listen_once(1)
+
+ def testStoreRetrieve(self):
+ self.addContacts()
+ self.got = 0
+ self.a.storeValueForKey(sha('foo').digest(), 'foobar')
+ self.r.listen_once(1)
+ self.r.listen_once(1)
+ self.r.listen_once(1)
+ self.r.listen_once(1)
+ self.r.listen_once(1)
+ self.r.listen_once(1)
+ self.a.valueForKey(sha('foo').digest(), self._cb)
+ self.r.listen_once(1)
+ self.r.listen_once(1)
+ self.r.listen_once(1)
+ self.r.listen_once(1)
+
+ def _cb(self, val):
+ if not val:
+ self.assertEqual(self.got, 1)
+ elif 'foobar' in val:
+ self.got = 1
+
+ def testAddContact(self):
+ self.assertEqual(len(self.a.table.buckets), 1)
+ self.assertEqual(len(self.a.table.buckets[0].l), 0)
+
+ self.assertEqual(len(self.b.table.buckets), 1)
+ self.assertEqual(len(self.b.table.buckets[0].l), 0)
+
+ self.addContacts()
+
+ self.assertEqual(len(self.a.table.buckets), 1)
+ self.assertEqual(len(self.a.table.buckets[0].l), 1)
+ self.assertEqual(len(self.b.table.buckets), 1)
+ self.assertEqual(len(self.b.table.buckets[0].l), 1)
+
+
+
+
+
+
+
+
diff --git a/khashmir/test_krpc.py b/khashmir/test_krpc.py
new file mode 100644
index 0000000..2ba4343
--- /dev/null
+++ b/khashmir/test_krpc.py
@@ -0,0 +1,161 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+from unittest import *
+from krpc import *
+from BitTorrent.defaultargs import common_options, rare_options
+from threading import Event
+from node import Node
+
+KRPC.noisy = 0
+
+import sys
+
+if __name__ =="__main__":
+ tests = defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]])
+ result = TextTestRunner().run(tests)
+
+
+def connectionForAddr(host, port):
+ return host
+
+
+class Receiver(object):
+ protocol = KRPC
+ def __init__(self, addr):
+ self.buf = []
+ self.node = Node().init('0'*20, addr[0], addr[1])
+ def krpc_store(self, msg, _krpc_sender):
+ self.buf += [msg]
+ def krpc_echo(self, msg, _krpc_sender):
+ return msg
+
+class KRPCTests(TestCase):
+ def setUp(self):
+ self.noisy = 0
+ d = dict([(x[0],x[1]) for x in common_options + rare_options])
+ self.r = RawServer(Event(), d)
+
+ addr = ('127.0.0.1', 1180)
+ self.as = self.r.create_udpsocket(addr[1], addr[0], True)
+ self.af = Receiver(addr)
+ self.a = hostbroker(self.af, addr, self.as, self.r.add_task)
+ self.r.start_listening_udp(self.as, self.a)
+
+ addr = ('127.0.0.1', 1181)
+ self.bs = self.r.create_udpsocket(addr[1], addr[0], True)
+ self.bf = Receiver(addr)
+ self.b = hostbroker(self.bf, addr, self.bs, self.r.add_task)
+ self.r.start_listening_udp(self.bs, self.b)
+
+ def tearDown(self):
+ self.as.close()
+ self.bs.close()
+
+ def testSimpleMessage(self):
+ self.noisy = 0
+ self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
+ self.r.listen_once(0.01)
+ self.assertEqual(self.bf.buf, ["This is a test."])
+
+ def testMessageBlast(self):
+ self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
+ self.r.listen_once(0.01)
+ self.assertEqual(self.bf.buf, ["This is a test."])
+ self.bf.buf = []
+
+ for i in range(100):
+ self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
+ self.r.listen_once(0.01)
+ #self.bf.buf = []
+ self.assertEqual(self.bf.buf, ["This is a test."] * 100)
+
+ def testEcho(self):
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
+ df.addCallback(self.gotMsg)
+ self.r.listen_once(0.01)
+ self.r.listen_once(0.01)
+ self.assertEqual(self.msg, "This is a test.")
+
+ def gotMsg(self, dict):
+ _krpc_sender = dict['_krpc_sender']
+ msg = dict['rsp']
+ self.msg = msg
+
+ def testManyEcho(self):
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
+ df.addCallback(self.gotMsg)
+ self.r.listen_once(0.01)
+ self.r.listen_once(0.01)
+ self.assertEqual(self.msg, "This is a test.")
+ for i in xrange(100):
+ self.msg = None
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
+ df.addCallback(self.gotMsg)
+ self.r.listen_once(0.01)
+ self.r.listen_once(0.01)
+ self.assertEqual(self.msg, "This is a test.")
+
+ def testMultiEcho(self):
+ self.noisy = 0
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
+ df.addCallback(self.gotMsg)
+ self.r.listen_once(0.01)
+ self.r.listen_once(0.01)
+ self.assertEqual(self.msg, "This is a test.")
+
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
+ df.addCallback(self.gotMsg)
+ self.r.listen_once(0.01)
+ self.r.listen_once(0.01)
+ self.assertEqual(self.msg, "This is another test.")
+
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
+ df.addCallback(self.gotMsg)
+ self.r.listen_once(0.01)
+ self.r.listen_once(0.01)
+ self.assertEqual(self.msg, "This is yet another test.")
+
+ def testEchoReset(self):
+ self.noisy = 0
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
+ df.addCallback(self.gotMsg)
+ self.r.listen_once(0.01)
+ self.r.listen_once(0.01)
+ self.assertEqual(self.msg, "This is a test.")
+
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
+ df.addCallback(self.gotMsg)
+ self.r.listen_once(0.01)
+ self.r.listen_once(0.01)
+ self.assertEqual(self.msg, "This is another test.")
+
+ del(self.a.connections[('127.0.0.1', 1181)])
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
+ df.addCallback(self.gotMsg)
+ self.r.listen_once(0.01)
+ self.r.listen_once(0.01)
+ self.assertEqual(self.msg, "This is yet another test.")
+
+ def testLotsofEchoReset(self):
+ for i in range(100):
+ self.testEchoReset()
+
+ def testUnknownMeth(self):
+ self.noisy = 0
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
+ df.addErrback(self.gotErr)
+ self.r.listen_once(0.01)
+ self.r.listen_once(0.01)
+ self.assertEqual(self.err, KRPC_ERROR_METHOD_UNKNOWN)
+
+ def gotErr(self, err):
+ self.err = err
+
diff --git a/khashmir/test_kstore.py b/khashmir/test_kstore.py
new file mode 100644
index 0000000..d2f2b15
--- /dev/null
+++ b/khashmir/test_kstore.py
@@ -0,0 +1,91 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+import unittest
+from BitTorrent.platform import bttime
+from time import sleep
+
+from kstore import KStore
+if __name__ =="__main__":
+ tests = unittest.defaultTestLoader.loadTestsFromNames(['test_kstore'])
+ result = unittest.TextTestRunner().run(tests)
+
+
+# Behavioural tests for KStore, driven through its mapping-style interface
+# (k[key] = value, k[key], keys()) plus expire() and sample().
+# NOTE(review): indentation was lost in this listing; the loop nesting in
+# testSample is inferred - confirm against the real file.
+class BasicTests(unittest.TestCase):
+ def setUp(self):
+ self.k = KStore()
+
+ # a fresh store reports no keys
+ def testNoKeys(self):
+ self.assertEqual(self.k.keys(), [])
+
+ def testKey(self):
+ self.k['foo'] = 'bar'
+ self.assertEqual(self.k.keys(), ['foo'])
+
+ # keys() order is not relied on: the result is sorted before comparing
+ def testKeys(self):
+ self.k['foo'] = 'bar'
+ self.k['wing'] = 'wang'
+ l = self.k.keys()
+ l.sort()
+ self.assertEqual(l, ['foo', 'wing'])
+
+ # reading a key yields a list of the values stored under it
+ def testInsert(self):
+ self.k['foo'] = 'bar'
+ self.assertEqual(self.k['foo'], ['bar'])
+
+ # assigning twice to one key keeps both values
+ def testInsertTwo(self):
+ self.k['foo'] = 'bar'
+ self.k['foo'] = 'bing'
+ l = self.k['foo']
+ l.sort()
+ self.assertEqual(l, ['bar', 'bing'])
+
+ def testExpire(self):
+ self.k['foo'] = 'bar'
+ # a cutoff one second in the past leaves the fresh value in place
+ self.k.expire(bttime() - 1)
+ l = self.k['foo']
+ l.sort()
+ self.assertEqual(l, ['bar'])
+ self.k['foo'] = 'bing'
+ # t is taken after 'bar' and 'bing' were stored, before 'ding' and 'dang'
+ t = bttime()
+ self.k.expire(bttime() - 1)
+ l = self.k['foo']
+ l.sort()
+ self.assertEqual(l, ['bar', 'bing'])
+ self.k['foo'] = 'ding'
+ self.k['foo'] = 'dang'
+ l = self.k['foo']
+ l.sort()
+ self.assertEqual(l, ['bar', 'bing', 'dang', 'ding'])
+ # expiring at t removes the two values stored before t was taken
+ self.k.expire(t)
+ l = self.k['foo']
+ l.sort()
+ self.assertEqual(l, ['dang', 'ding'])
+
+ # storing the same value twice under one key keeps a single copy
+ def testDup(self):
+ self.k['foo'] = 'bar'
+ self.k['foo'] = 'bar'
+ self.assertEqual(self.k['foo'], ['bar'])
+
+ def testSample(self):
+ # asking for 5 samples when only 2 values exist returns both values
+ for i in xrange(2):
+ self.k['foo'] = i
+ l = self.k.sample('foo', 5)
+ l.sort()
+ self.assertEqual(l, [0, 1])
+
+ # with ten values available a sample of 5 has exactly 5 entries, none repeated
+ # NOTE(review): both loops below reuse the name i
+ for i in xrange(10):
+ for i in xrange(10):
+ self.k['bar'] = i
+ l = self.k.sample('bar', 5)
+ self.assertEqual(len(l), 5)
+ for i in xrange(len(l)):
+ self.assert_(l[i] not in l[i+1:])
+
diff --git a/khashmir/unet.py b/khashmir/unet.py
new file mode 100644
index 0000000..e163a3f
--- /dev/null
+++ b/khashmir/unet.py
@@ -0,0 +1,84 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+#
+# unet.py
+# create a network of UTKhashmir nodes
+# usage: unet.py <num_nodes> <start_port> <ip_address>
+
+from utkhashmir import UTKhashmir
+from BitTorrent.RawServer_magic import RawServer
+from BitTorrent.defaultargs import common_options, rare_options
+from random import randrange
+from threading import Event
+import sys, os
+
+from krpc import KRPC
+KRPC.noisy = 1
+
+# Builds a set of UTKhashmir instances on consecutive ports, all driven by one
+# RawServer, and wires them together.
+# NOTE(review): indentation was lost in this listing, so the loop nesting
+# described in the comments below is inferred - confirm against the real file.
+class Network:
+ # size: number of instances; startport: port of the first instance;
+ # localip: address the instances use to contact each other
+ def __init__(self, size=0, startport=5555, localip='127.0.0.1'):
+ self.num = size
+ self.startport = startport
+ self.localip = localip
+
+ # callback handed to findCloseNodes; flips the flag the wait loops poll
+ def _done(self, val):
+ self.done = 1
+
+ def simpleSetUp(self):
+ #self.kfiles()
+ # option dict built from the first two fields of every default option entry
+ d = dict([(x[0],x[1]) for x in common_options + rare_options])
+ self.r = RawServer(Event(), d)
+ self.l = []
+ # one instance per port, each with its own kh<port>.db file name
+ for i in range(self.num):
+ self.l.append(UTKhashmir('', self.startport + i, 'kh%s.db' % (self.startport + i), self.r))
+
+ # give every instance three randomly chosen contacts
+ for i in self.l:
+ i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+ i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+ i.addContact(self.localip, self.l[randrange(0,self.num)].port)
+ self.r.listen_once(1)
+ self.r.listen_once(1)
+ self.r.listen_once(1)
+
+ # two passes of findCloseNodes over every instance, pumping the server
+ # until the _done callback has fired
+ for i in self.l:
+ self.done = 0
+ i.findCloseNodes(self._done)
+ while not self.done:
+ self.r.listen_once(1)
+ for i in self.l:
+ self.done = 0
+ i.findCloseNodes(self._done)
+ while not self.done:
+ self.r.listen_once(1)
+
+ def tearDown(self):
+ # detach every UDP socket from its server, then close it
+ for i in self.l:
+ i.rawserver.stop_listening_udp(i.socket)
+ i.socket.close()
+ #self.kfiles()
+
+ # delete the kh<port>.db files of this network, ignoring any failure
+ def kfiles(self):
+ for i in range(self.startport, self.startport+self.num):
+ try:
+ os.unlink('kh%s.db' % i)
+ except:
+ pass
+
+ # NOTE(review): with the indentation lost it is unclear whether this call
+ # runs once after the loop or belongs elsewhere - confirm
+ self.r.listen_once(1)
+
+if __name__ == "__main__":
+ n = Network(int(sys.argv[1]), int(sys.argv[2]))
+ n.simpleSetUp()
+ print ">>> network ready"
+ try:
+ n.r.listen_forever()
+ finally:
+ n.tearDown()
diff --git a/khashmir/util.py b/khashmir/util.py
new file mode 100644
index 0000000..3bc456e
--- /dev/null
+++ b/khashmir/util.py
@@ -0,0 +1,69 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+from struct import pack, unpack
+
+def bucket_stats(l):
+ """given a list of khashmir instances, finds min, max, and average number of nodes in tables"""
+ max = avg = 0
+ min = None
+ def count(buckets):
+ c = 0
+ for bucket in buckets:
+ c = c + len(bucket.l)
+ return c
+ for node in l:
+ c = count(node.table.buckets)
+ if min == None:
+ min = c
+ elif c < min:
+ min = c
+ if c > max:
+ max = c
+ avg = avg + c
+ avg = avg / len(l)
+ return {'min':min, 'max':max, 'avg':avg}
+
+def compact_peer_info(ip, port):
+ return pack('!BBBBH', *([int(i) for i in ip.split('.')] + [port]))
+
+def packPeers(peers):
+ return map(lambda a: compact_peer_info(a[0], a[1]), peers)
+
+def reducePeers(peers):
+ return reduce(lambda a, b: a + b, peers, '')
+
+def unpackPeers(p):
+ peers = []
+ if type(p) == type(''):
+ for x in xrange(0, len(p), 6):
+ ip = '.'.join([str(ord(i)) for i in p[x:x+4]])
+ port = unpack('!H', p[x+4:x+6])[0]
+ peers.append((ip, port, None))
+ else:
+ for x in p:
+ peers.append((x['ip'], x['port'], x.get('peer id')))
+ return peers
+
+
+def compact_node_info(id, ip, port):
+ return id + compact_peer_info(ip, port)
+
+def packNodes(nodes):
+ return ''.join([compact_node_info(x['id'], x['host'], x['port']) for x in nodes])
+
+def unpackNodes(n):
+ nodes = []
+ for x in xrange(0, len(n), 26):
+ id = n[x:x+20]
+ ip = '.'.join([str(ord(i)) for i in n[x+20:x+24]])
+ port = unpack('!H', n[x+24:x+26])[0]
+ nodes.append({'id':id, 'host':ip, 'port': port})
+ return nodes
diff --git a/khashmir/utkhashmir.py b/khashmir/utkhashmir.py
new file mode 100644
index 0000000..66c7441
--- /dev/null
+++ b/khashmir/utkhashmir.py
@@ -0,0 +1,224 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+import khashmir, knode
+from actions import *
+from khash import newID
+from krpc import KRPCProtocolError, KRPCFailSilently
+from cache import Cache
+from sha import sha
+from util import *
+from threading import Thread
+from socket import gethostbyname
+from const import *
+from kstore import sample
+
+TOKEN_UPDATE_INTERVAL = 5 * 60 # five minutes
+NUM_PEERS = 50 # number of peers to return
+
+class UTNode(knode.KNodeBase):
+ def announcePeer(self, info_hash, port, khashmir_id):
+ assert type(port) == type(1)
+ assert type(info_hash) == type('')
+ assert type(khashmir_id) == type('')
+ assert len(info_hash) == 20
+ assert len(khashmir_id) == 20
+
+ try:
+ token = self.table.tcache[self.id]
+ except:
+ token = None
+ if token:
+ assert type(token) == type("")
+ assert len(token) == 20
+ df = self.conn().sendRequest('announce_peer', {'info_hash':info_hash,
+ 'port':port,
+ 'id':khashmir_id,
+ 'token':token})
+ else:
+ raise KRPCProtocolError("no write token for node")
+ df.addErrback(self.errBack)
+ df.addCallback(self.checkSender)
+ return df
+
+ def getPeers(self, info_hash, khashmir_id):
+ df = self.conn().sendRequest('get_peers', {'info_hash':info_hash, 'id':khashmir_id})
+ df.addErrback(self.errBack)
+ df.addCallback(self.checkSender)
+ return df
+
+ def checkSender(self, dict):
+ d = knode.KNodeBase.checkSender(self, dict)
+ try:
+ self.table.tcache[d['rsp']['id']] = d['rsp']['token']
+ except KeyError:
+ pass
+ return d
+
+class UTStoreValue(StoreValue):
+ def callNode(self, node, f):
+ return f(self.target, self.value, node.token, self.table.node.id)
+
+class UTKhashmir(khashmir.KhashmirBase):
+ _Node = UTNode
+
+ def setup(self, host, port, data_dir, rlcount, checkpoint=True):
+ khashmir.KhashmirBase.setup(self, host, port,data_dir, rlcount, checkpoint)
+ self.cur_token = self.last_token = sha('')
+ self.tcache = Cache()
+ self.gen_token(loop=True)
+ self.expire_cached_tokens(loop=True)
+
+ def expire_cached_tokens(self, loop=False):
+ self.tcache.expire(time() - TOKEN_UPDATE_INTERVAL)
+ if loop:
+ self.rawserver.external_add_task(self.expire_cached_tokens, TOKEN_UPDATE_INTERVAL, (True,))
+
+ def gen_token(self, loop=False):
+ self.last_token = self.cur_token
+ self.cur_token = sha(newID())
+ if loop:
+ self.rawserver.external_add_task(self.gen_token, TOKEN_UPDATE_INTERVAL, (True,))
+
+ def get_token(self, host, port):
+ x = self.cur_token.copy()
+ x.update("%s%s" % (host, port))
+ h = x.digest()
+ return h
+
+
+ def val_token(self, token, host, port):
+ x = self.cur_token.copy()
+ x.update("%s%s" % (host, port))
+ a = x.digest()
+ if token == a:
+ return True
+
+ x = self.last_token.copy()
+ x.update("%s%s" % (host, port))
+ b = x.digest()
+ if token == b:
+ return True
+
+ return False
+
+ def addContact(self, host, port, callback=None):
+ # use dns on host, then call khashmir.addContact
+ Thread(target=self._get_host, args=[host, port, callback]).start()
+
+ def _get_host(self, host, port, callback):
+
+ # this exception catch can go away once we actually fix the bug
+ try:
+ ip = gethostbyname(host)
+ except TypeError, e:
+ raise TypeError(str(e) + (": host(%s) port(%s)" % (repr(host), repr(port))))
+
+ self.rawserver.external_add_task(self._got_host, 0, (ip, port, callback))
+
+ def _got_host(self, host, port, callback):
+ khashmir.KhashmirBase.addContact(self, host, port, callback)
+
+ def announcePeer(self, info_hash, port, callback=None):
+ """ stores the value for key in the global table, returns immediately, no status
+ in this implementation, peers respond but don't indicate status to storing values
+ a key can have many values
+ """
+ def _storeValueForKey(nodes, key=info_hash, value=port, response=callback , table=self.table):
+ if not response:
+ # default callback
+ def _storedValueHandler(sender):
+ pass
+ response=_storedValueHandler
+ action = UTStoreValue(self, key, value, response, self.rawserver.add_task, "announcePeer")
+ self.rawserver.external_add_task(action.goWithNodes, 0, (nodes,))
+
+ # this call is asynch
+ self.findNode(info_hash, _storeValueForKey)
+
+ def krpc_announce_peer(self, info_hash, port, id, token, _krpc_sender):
+ sender = {'id' : id}
+ sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
+ if not self.val_token(token, sender['host'], sender['port']):
+ raise KRPCProtocolError("Invalid Write Token")
+ value = compact_peer_info(_krpc_sender[0], port)
+ self.store[info_hash] = value
+ n = self.Node().initWithDict(sender)
+ self.insertNode(n, contacted=0)
+ return {"id" : self.node.id}
+
+ def retrieveValues(self, key):
+ try:
+ l = self.store.sample(key, NUM_PEERS)
+ except KeyError:
+ l = []
+ return l
+
+ def getPeers(self, info_hash, callback, searchlocal = 1):
+ """ returns the values found for key in global table
+ callback will be called with a list of values for each peer that returns unique values
+ final callback will be an empty list - probably should change to 'more coming' arg
+ """
+ nodes = self.table.findNodes(info_hash, invalid=True)
+ l = [x for x in nodes if x.invalid]
+ if len(l) > 4:
+ nodes = sample(l , 4) + self.table.findNodes(info_hash, invalid=False)[:4]
+
+ # get locals
+ if searchlocal:
+ l = self.retrieveValues(info_hash)
+ if len(l) > 0:
+ self.rawserver.external_add_task(callback, 0, ([reducePeers(l)],))
+ else:
+ l = []
+ # create our search state
+ state = GetValue(self, info_hash, callback, self.rawserver.add_task, 'getPeers')
+ self.rawserver.external_add_task(state.goWithNodes, 0, (nodes, l))
+
+ def getPeersAndAnnounce(self, info_hash, port, callback, searchlocal = 1):
+ """ returns the values found for key in global table
+ callback will be called with a list of values for each peer that returns unique values
+ final callback will be an empty list - probably should change to 'more coming' arg
+ """
+ nodes = self.table.findNodes(info_hash, invalid=False)
+ nodes += self.table.findNodes(info_hash, invalid=True)
+
+ # get locals
+ if searchlocal:
+ l = self.retrieveValues(info_hash)
+ if len(l) > 0:
+ self.rawserver.external_add_task(callback, 0, ([reducePeers(l)],))
+ else:
+ l = []
+ # create our search state
+ x = lambda a: a
+ state = GetAndStore(self, info_hash, port, callback, x, self.rawserver.add_task, 'getPeers', "announcePeer")
+ self.rawserver.external_add_task(state.goWithNodes, 0, (nodes, l))
+
+ def krpc_get_peers(self, info_hash, id, _krpc_sender):
+ sender = {'id' : id}
+ sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
+ n = self.Node().initWithDict(sender)
+ self.insertNode(n, contacted=0)
+
+ l = self.retrieveValues(info_hash)
+ if len(l) > 0:
+ return {'values' : [reducePeers(l)],
+ "id": self.node.id,
+ "token" : self.get_token(sender['host'], sender['port'])}
+ else:
+ nodes = self.table.findNodes(info_hash, invalid=False)
+ nodes = [node.senderDict() for node in nodes]
+ return {'nodes' : packNodes(nodes),
+ "id": self.node.id,
+ "token" : self.get_token(sender['host'], sender['port'])}
+