summaryrefslogtreecommitdiffstats
path: root/khashmir/utkhashmir.py
diff options
context:
space:
mode:
Diffstat (limited to 'khashmir/utkhashmir.py')
-rw-r--r--khashmir/utkhashmir.py224
1 files changed, 224 insertions, 0 deletions
diff --git a/khashmir/utkhashmir.py b/khashmir/utkhashmir.py
new file mode 100644
index 0000000..66c7441
--- /dev/null
+++ b/khashmir/utkhashmir.py
@@ -0,0 +1,224 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+import khashmir, knode
+from actions import *
+from khash import newID
+from krpc import KRPCProtocolError, KRPCFailSilently
+from cache import Cache
+from sha import sha
+from util import *
+from threading import Thread
+from socket import gethostbyname
+from const import *
+from kstore import sample
+
+TOKEN_UPDATE_INTERVAL = 5 * 60 # five minutes
+NUM_PEERS = 50 # number of peers to return
+
+class UTNode(knode.KNodeBase):
+ def announcePeer(self, info_hash, port, khashmir_id):
+ assert type(port) == type(1)
+ assert type(info_hash) == type('')
+ assert type(khashmir_id) == type('')
+ assert len(info_hash) == 20
+ assert len(khashmir_id) == 20
+
+ try:
+ token = self.table.tcache[self.id]
+ except:
+ token = None
+ if token:
+ assert type(token) == type("")
+ assert len(token) == 20
+ df = self.conn().sendRequest('announce_peer', {'info_hash':info_hash,
+ 'port':port,
+ 'id':khashmir_id,
+ 'token':token})
+ else:
+ raise KRPCProtocolError("no write token for node")
+ df.addErrback(self.errBack)
+ df.addCallback(self.checkSender)
+ return df
+
+ def getPeers(self, info_hash, khashmir_id):
+ df = self.conn().sendRequest('get_peers', {'info_hash':info_hash, 'id':khashmir_id})
+ df.addErrback(self.errBack)
+ df.addCallback(self.checkSender)
+ return df
+
+ def checkSender(self, dict):
+ d = knode.KNodeBase.checkSender(self, dict)
+ try:
+ self.table.tcache[d['rsp']['id']] = d['rsp']['token']
+ except KeyError:
+ pass
+ return d
+
+class UTStoreValue(StoreValue):
+ def callNode(self, node, f):
+ return f(self.target, self.value, node.token, self.table.node.id)
+
+class UTKhashmir(khashmir.KhashmirBase):
+ _Node = UTNode
+
+ def setup(self, host, port, data_dir, rlcount, checkpoint=True):
+ khashmir.KhashmirBase.setup(self, host, port,data_dir, rlcount, checkpoint)
+ self.cur_token = self.last_token = sha('')
+ self.tcache = Cache()
+ self.gen_token(loop=True)
+ self.expire_cached_tokens(loop=True)
+
+ def expire_cached_tokens(self, loop=False):
+ self.tcache.expire(time() - TOKEN_UPDATE_INTERVAL)
+ if loop:
+ self.rawserver.external_add_task(self.expire_cached_tokens, TOKEN_UPDATE_INTERVAL, (True,))
+
+ def gen_token(self, loop=False):
+ self.last_token = self.cur_token
+ self.cur_token = sha(newID())
+ if loop:
+ self.rawserver.external_add_task(self.gen_token, TOKEN_UPDATE_INTERVAL, (True,))
+
+ def get_token(self, host, port):
+ x = self.cur_token.copy()
+ x.update("%s%s" % (host, port))
+ h = x.digest()
+ return h
+
+
+ def val_token(self, token, host, port):
+ x = self.cur_token.copy()
+ x.update("%s%s" % (host, port))
+ a = x.digest()
+ if token == a:
+ return True
+
+ x = self.last_token.copy()
+ x.update("%s%s" % (host, port))
+ b = x.digest()
+ if token == b:
+ return True
+
+ return False
+
+ def addContact(self, host, port, callback=None):
+ # use dns on host, then call khashmir.addContact
+ Thread(target=self._get_host, args=[host, port, callback]).start()
+
+ def _get_host(self, host, port, callback):
+
+ # this exception catch can go away once we actually fix the bug
+ try:
+ ip = gethostbyname(host)
+ except TypeError, e:
+ raise TypeError(str(e) + (": host(%s) port(%s)" % (repr(host), repr(port))))
+
+ self.rawserver.external_add_task(self._got_host, 0, (ip, port, callback))
+
+ def _got_host(self, host, port, callback):
+ khashmir.KhashmirBase.addContact(self, host, port, callback)
+
+ def announcePeer(self, info_hash, port, callback=None):
+ """ stores the value for key in the global table, returns immediately, no status
+ in this implementation, peers respond but don't indicate status to storing values
+ a key can have many values
+ """
+ def _storeValueForKey(nodes, key=info_hash, value=port, response=callback , table=self.table):
+ if not response:
+ # default callback
+ def _storedValueHandler(sender):
+ pass
+ response=_storedValueHandler
+ action = UTStoreValue(self, key, value, response, self.rawserver.add_task, "announcePeer")
+ self.rawserver.external_add_task(action.goWithNodes, 0, (nodes,))
+
+ # this call is asynch
+ self.findNode(info_hash, _storeValueForKey)
+
+ def krpc_announce_peer(self, info_hash, port, id, token, _krpc_sender):
+ sender = {'id' : id}
+ sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
+ if not self.val_token(token, sender['host'], sender['port']):
+ raise KRPCProtocolError("Invalid Write Token")
+ value = compact_peer_info(_krpc_sender[0], port)
+ self.store[info_hash] = value
+ n = self.Node().initWithDict(sender)
+ self.insertNode(n, contacted=0)
+ return {"id" : self.node.id}
+
+ def retrieveValues(self, key):
+ try:
+ l = self.store.sample(key, NUM_PEERS)
+ except KeyError:
+ l = []
+ return l
+
+ def getPeers(self, info_hash, callback, searchlocal = 1):
+ """ returns the values found for key in global table
+ callback will be called with a list of values for each peer that returns unique values
+ final callback will be an empty list - probably should change to 'more coming' arg
+ """
+ nodes = self.table.findNodes(info_hash, invalid=True)
+ l = [x for x in nodes if x.invalid]
+ if len(l) > 4:
+ nodes = sample(l , 4) + self.table.findNodes(info_hash, invalid=False)[:4]
+
+ # get locals
+ if searchlocal:
+ l = self.retrieveValues(info_hash)
+ if len(l) > 0:
+ self.rawserver.external_add_task(callback, 0, ([reducePeers(l)],))
+ else:
+ l = []
+ # create our search state
+ state = GetValue(self, info_hash, callback, self.rawserver.add_task, 'getPeers')
+ self.rawserver.external_add_task(state.goWithNodes, 0, (nodes, l))
+
+ def getPeersAndAnnounce(self, info_hash, port, callback, searchlocal = 1):
+ """ returns the values found for key in global table
+ callback will be called with a list of values for each peer that returns unique values
+ final callback will be an empty list - probably should change to 'more coming' arg
+ """
+ nodes = self.table.findNodes(info_hash, invalid=False)
+ nodes += self.table.findNodes(info_hash, invalid=True)
+
+ # get locals
+ if searchlocal:
+ l = self.retrieveValues(info_hash)
+ if len(l) > 0:
+ self.rawserver.external_add_task(callback, 0, ([reducePeers(l)],))
+ else:
+ l = []
+ # create our search state
+ x = lambda a: a
+ state = GetAndStore(self, info_hash, port, callback, x, self.rawserver.add_task, 'getPeers', "announcePeer")
+ self.rawserver.external_add_task(state.goWithNodes, 0, (nodes, l))
+
+ def krpc_get_peers(self, info_hash, id, _krpc_sender):
+ sender = {'id' : id}
+ sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
+ n = self.Node().initWithDict(sender)
+ self.insertNode(n, contacted=0)
+
+ l = self.retrieveValues(info_hash)
+ if len(l) > 0:
+ return {'values' : [reducePeers(l)],
+ "id": self.node.id,
+ "token" : self.get_token(sender['host'], sender['port'])}
+ else:
+ nodes = self.table.findNodes(info_hash, invalid=False)
+ nodes = [node.senderDict() for node in nodes]
+ return {'nodes' : packNodes(nodes),
+ "id": self.node.id,
+ "token" : self.get_token(sender['host'], sender['port'])}
+