summaryrefslogtreecommitdiffstats
path: root/khashmir/actions.py
diff options
context:
space:
mode:
Diffstat (limited to 'khashmir/actions.py')
-rw-r--r--khashmir/actions.py349
1 files changed, 349 insertions, 0 deletions
diff --git a/khashmir/actions.py b/khashmir/actions.py
new file mode 100644
index 0000000..2bbe94b
--- /dev/null
+++ b/khashmir/actions.py
@@ -0,0 +1,349 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+from BitTorrent.platform import bttime as time
+
+import const
+
+from khash import intify
+from ktable import KTable, K
+from util import unpackNodes
+from krpc import KRPCProtocolError, KRPCSelfNodeError
+from bisect import insort
+
+class NodeWrap(object):
+ def __init__(self, node, target):
+ self.num = target
+ self.node = node
+
+ def __cmp__(self, o):
+ """ this function is for sorting nodes relative to the ID we are looking for """
+ x, y = self.num ^ o.num, self.num ^ self.node.num
+ if x > y:
+ return 1
+ elif x < y:
+ return -1
+ return 0
+
+class ActionBase(object):
+ """ base class for some long running asynchronous proccesses like finding nodes or values """
+ def __init__(self, table, target, callback, callLater):
+ self.table = table
+ self.target = target
+ self.callLater = callLater
+ self.num = intify(target)
+ self.found = {}
+ self.foundq = []
+ self.queried = {}
+ self.queriedip = {}
+ self.answered = {}
+ self.callback = callback
+ self.outstanding = 0
+ self.finished = 0
+
+ def sort(self, a, b):
+ """ this function is for sorting nodes relative to the ID we are looking for """
+ x, y = self.num ^ a.num, self.num ^ b.num
+ if x > y:
+ return 1
+ elif x < y:
+ return -1
+ return 0
+
+ def shouldQuery(self, node):
+ if node.id == self.table.node.id:
+ return False
+ elif (node.host, node.port) not in self.queriedip and node.id not in self.queried:
+ self.queriedip[(node.host, node.port)] = 1
+ self.queried[node.id] = 1
+ return True
+ return False
+
+ def _cleanup(self):
+ self.foundq = None
+ self.found = None
+ self.queried = None
+ self.queriedip = None
+
+ def goWithNodes(self, t):
+ pass
+
+
+
+FIND_NODE_TIMEOUT = 15
+
+class FindNode(ActionBase):
+ """ find node action merits it's own class as it is a long running stateful process """
+ def handleGotNodes(self, dict):
+ _krpc_sender = dict['_krpc_sender']
+ dict = dict['rsp']
+ sender = {'id' : dict["id"]}
+ sender['port'] = _krpc_sender[1]
+ sender['host'] = _krpc_sender[0]
+ sender = self.table.Node().initWithDict(sender)
+ try:
+ l = unpackNodes(dict.get("nodes", []))
+ if not self.answered.has_key(sender.id):
+ self.answered[sender.id] = sender
+ except:
+ l = []
+ self.table.invalidateNode(sender)
+
+ if self.finished:
+ # a day late and a dollar short
+ return
+ self.outstanding = self.outstanding - 1
+ for node in l:
+ n = self.table.Node().initWithDict(node)
+ if not self.found.has_key(n.id):
+ self.found[n.id] = n
+ insort(self.foundq, NodeWrap(n, self.num))
+ self.table.insertNode(n, contacted=0)
+ self.schedule()
+
+ def schedule(self):
+ """
+ send messages to new peers, if necessary
+ """
+ if self.finished:
+ return
+ l = [wrapper.node for wrapper in self.foundq[:K]]
+ for node in l:
+ if node.id == self.target:
+ self.finished=1
+ return self.callback([node])
+ if self.shouldQuery(node):
+ #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
+ try:
+ df = node.findNode(self.target, self.table.node.id)
+ except KRPCSelfNodeError:
+ pass
+ else:
+ df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
+ self.outstanding = self.outstanding + 1
+ if self.outstanding >= const.CONCURRENT_REQS:
+ break
+ assert(self.outstanding) >=0
+ if self.outstanding == 0:
+ ## all done!!
+ self.finished=1
+ self._cleanup()
+ self.callLater(self.callback, 0, (l[:K],))
+
+ def makeMsgFailed(self, node):
+ return self._defaultGotNodes
+
+ def _defaultGotNodes(self, err):
+ self.outstanding = self.outstanding - 1
+ self.schedule()
+
+ def goWithNodes(self, nodes):
+ """
+ this starts the process, our argument is a transaction with t.extras being our list of nodes
+ it's a transaction since we got called from the dispatcher
+ """
+ for node in nodes:
+ if node.id == self.table.node.id:
+ continue
+ else:
+ self.found[node.id] = node
+ insort(self.foundq, NodeWrap(node, self.num))
+ self.schedule()
+
+
+get_value_timeout = 15
+class GetValue(FindNode):
+ def __init__(self, table, target, callback, callLater, find="findValue"):
+ FindNode.__init__(self, table, target, callback, callLater)
+ self.findValue = find
+
+ """ get value task """
+ def handleGotNodes(self, dict):
+ _krpc_sender = dict['_krpc_sender']
+ dict = dict['rsp']
+ sender = {'id' : dict["id"]}
+ sender['port'] = _krpc_sender[1]
+ sender['host'] = _krpc_sender[0]
+ sender = self.table.Node().initWithDict(sender)
+
+ if self.finished or self.answered.has_key(sender.id):
+ # a day late and a dollar short
+ return
+ self.outstanding = self.outstanding - 1
+
+ self.answered[sender.id] = sender
+ # go through nodes
+ # if we have any closer than what we already got, query them
+ if dict.has_key('nodes'):
+ try:
+ l = unpackNodes(dict.get('nodes',[]))
+ except:
+ l = []
+ del(self.answered[sender.id])
+
+ for node in l:
+ n = self.table.Node().initWithDict(node)
+ if not self.found.has_key(n.id):
+ self.table.insertNode(n)
+ self.found[n.id] = n
+ insort(self.foundq, NodeWrap(n, self.num))
+ elif dict.has_key('values'):
+ def x(y, z=self.results):
+ if not z.has_key(y):
+ z[y] = 1
+ return y
+ else:
+ return None
+ z = len(dict.get('values', []))
+ v = filter(None, map(x, dict.get('values',[])))
+ if(len(v)):
+ self.callLater(self.callback, 0, (v,))
+ self.schedule()
+
+ ## get value
+ def schedule(self):
+ if self.finished:
+ return
+ for node in [wrapper.node for wrapper in self.foundq[:K]]:
+ if self.shouldQuery(node):
+ #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
+ try:
+ f = getattr(node, self.findValue)
+ except AttributeError:
+ print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
+ else:
+ try:
+ df = f(self.target, self.table.node.id)
+ df.addCallback(self.handleGotNodes)
+ df.addErrback(self.makeMsgFailed(node))
+ self.outstanding = self.outstanding + 1
+ self.queried[node.id] = 1
+ except KRPCSelfNodeError:
+ pass
+ if self.outstanding >= const.CONCURRENT_REQS:
+ break
+ assert(self.outstanding) >=0
+ if self.outstanding == 0:
+ ## all done, didn't find it!!
+ self.finished=1
+ self._cleanup()
+ self.callLater(self.callback,0, ([],))
+
+ ## get value
+ def goWithNodes(self, nodes, found=None):
+ self.results = {}
+ if found:
+ for n in found:
+ self.results[n] = 1
+ for node in nodes:
+ if node.id == self.table.node.id:
+ continue
+ else:
+ self.found[node.id] = node
+ insort(self.foundq, NodeWrap(node, self.num))
+ self.schedule()
+
+
+class StoreValue(ActionBase):
+ def __init__(self, table, target, value, callback, callLater, store="storeValue"):
+ ActionBase.__init__(self, table, target, callback, callLater)
+ self.value = value
+ self.stored = []
+ self.store = store
+
+ def storedValue(self, t, node):
+ self.outstanding -= 1
+ if self.finished:
+ return
+ self.stored.append(t)
+ if len(self.stored) >= const.STORE_REDUNDANCY:
+ self.finished=1
+ self.callback(self.stored)
+ else:
+ if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
+ self.schedule()
+ return t
+
+ def storeFailed(self, t, node):
+ self.outstanding -= 1
+ if self.finished:
+ return t
+ self.schedule()
+ return t
+
+ def schedule(self):
+ if self.finished:
+ return
+ num = const.CONCURRENT_REQS - self.outstanding
+ if num > const.STORE_REDUNDANCY - len(self.stored):
+ num = const.STORE_REDUNDANCY - len(self.stored)
+ if num == 0 and not self.finished:
+ self.finished=1
+ self.callback(self.stored)
+ while num > 0:
+ try:
+ node = self.nodes.pop()
+ except IndexError:
+ if self.outstanding == 0:
+ self.finished = 1
+ self._cleanup()
+ self.callback(self.stored)
+ return
+ else:
+ if not node.id == self.table.node.id:
+ try:
+ f = getattr(node, self.store)
+ except AttributeError:
+ print ">>> %s doesn't have a %s method!" % (node, self.store)
+ else:
+ try:
+ df = f(self.target, self.value, self.table.node.id)
+ except KRPCProtocolError:
+ self.table.table.invalidateNode(node)
+ except KRPCSelfNodeError:
+ pass
+ else:
+ df.addCallback(self.storedValue,(),{'node':node})
+ df.addErrback(self.storeFailed, (), {'node':node})
+ self.outstanding += 1
+ num -= 1
+
+ def goWithNodes(self, nodes):
+ self.nodes = nodes
+ self.nodes.sort(self.sort)
+ self.schedule()
+
+
+class GetAndStore(GetValue):
+ def __init__(self, table, target, value, callback, storecallback, callLater, find="findValue", store="storeValue"):
+ self.store = store
+ self.value = value
+ self.cb2 = callback
+ self.storecallback = storecallback
+ def cb(res):
+ self.cb2(res)
+ if not(res):
+ n = StoreValue(self.table, self.target, self.value, self.doneStored, self.callLater, self.store)
+ n.goWithNodes(self.answered.values())
+ GetValue.__init__(self, table, target, cb, callLater, find)
+
+ def doneStored(self, dict):
+ self.storecallback(dict)
+
+class KeyExpirer:
+ def __init__(self, store, callLater):
+ self.store = store
+ self.callLater = callLater
+ self.callLater(self.doExpire, const.KEINITIAL_DELAY)
+
+ def doExpire(self):
+ self.cut = time() - const.KE_AGE
+ self.store.expire(self.cut)
+ self.callLater(self.doExpire, const.KE_DELAY)