diff options
-rw-r--r-- | nova/compute/model.py | 36 | ||||
-rw-r--r-- | nova/exception.py | 3 | ||||
-rw-r--r-- | nova/tests/model_unittest.py | 52 |
3 files changed, 91 insertions, 0 deletions
diff --git a/nova/compute/model.py b/nova/compute/model.py index cda188183..331b68349 100644 --- a/nova/compute/model.py +++ b/nova/compute/model.py @@ -43,6 +43,7 @@ True import logging import time import redis +import uuid from nova import datastore from nova import exception @@ -228,6 +229,41 @@ class Daemon(datastore.BasicModel): for x in cls.associated_to("host", hostname): yield x +class SessionToken(datastore.BasicModel): + """This is a short-lived auth token that is passed through web requests""" + + def __init__(self, session_token): + self.token = session_token + super(SessionToken, self).__init__() + + @property + def identifier(self): + return self.token + + def default_state(self): + return {'user': None, 'session_type': None, 'token': self.token} + + @classmethod + def generate(cls, userid, session_type=None): + token = str(uuid.uuid4()) + while cls.lookup(token): + token = str(uuid.uuid4()) + instance = cls(token) + instance['user'] = userid + instance['session_type'] = session_type + instance.save() + return instance + + def save(self): + """Call into superclass to save object, then save associations""" + if not self['user']: + raise exception.Invalid("SessionToken requires a User association") + success = super(SessionToken, self).save() + if success: + self.associate_with("user", self['user']) + return True + + if __name__ == "__main__": import doctest doctest.testmod() diff --git a/nova/exception.py b/nova/exception.py index 2108123de..52497a19e 100644 --- a/nova/exception.py +++ b/nova/exception.py @@ -47,6 +47,9 @@ class NotAuthorized(Error): class NotEmpty(Error): pass +class Invalid(Error): + pass + def wrap_exception(f): def _wrap(*args, **kw): try: diff --git a/nova/tests/model_unittest.py b/nova/tests/model_unittest.py index 1bd7e527f..7823991b9 100644 --- a/nova/tests/model_unittest.py +++ b/nova/tests/model_unittest.py @@ -66,6 +66,12 @@ class ModelTestCase(test.TrialTestCase): daemon.save() return daemon + def create_session_token(self): + session_token = model.SessionToken('tk12341234') + session_token['user'] = 'testuser' + session_token.save() + return session_token + @defer.inlineCallbacks def test_create_instance(self): """store with create_instace, then test that a load finds it""" @@ -204,3 +210,49 @@ class ModelTestCase(test.TrialTestCase): if x.identifier == 'testhost:nova-testdaemon': found = True self.assertTrue(found) + + @defer.inlineCallbacks + def test_create_session_token(self): + """create""" + d = yield self.create_session_token() + d = model.SessionToken(d.token) + self.assertFalse(d.is_new_record()) + + @defer.inlineCallbacks + def test_delete_session_token(self): + """create, then destroy, then make sure loads a new record""" + instance = yield self.create_session_token() + yield instance.destroy() + newinst = yield model.SessionToken(instance.token) + self.assertTrue(newinst.is_new_record()) + + @defer.inlineCallbacks + def test_session_token_added_to_set(self): + """create, then check that it is included in list""" + instance = yield self.create_session_token() + found = False + for x in model.SessionToken.all(): + if x.identifier == instance.token: + found = True + self.assert_(found) + + @defer.inlineCallbacks + def test_session_token_associates_user(self): + """create, then check that it is listed for the user""" + instance = yield self.create_session_token() + found = False + for x in model.SessionToken.associated_to('user', 'testuser'): + if x.identifier == instance.identifier: + found = True + self.assertTrue(found) + + @defer.inlineCallbacks + def test_session_token_generation(self): + instance = yield model.SessionToken.generate('username', 'TokenType') + self.assertFalse(instance.is_new_record()) + + @defer.inlineCallbacks + def test_find_generated_session_token(self): + instance = yield model.SessionToken.generate('username', 'TokenType') + found = yield model.SessionToken.lookup(instance.identifier) + self.assert_(found) |