diff options
| author | Ewan Mellor <ewan.mellor@citrix.com> | 2010-07-29 00:11:02 +0100 |
|---|---|---|
| committer | Ewan Mellor <ewan.mellor@citrix.com> | 2010-07-29 00:11:02 +0100 |
| commit | 9f4996e8738991a95a23cba2caa660f7002f94cd (patch) | |
| tree | e40bc2e56ea96131200d5e12918bf5a429bb5cc1 /nova/tests | |
| parent | c5edaa2186add12947185cb1fd47e0a48eccafa9 (diff) | |
| parent | f61b62983f62aa10d7bed3bc1c406717663be923 (diff) | |
Merge with trunk, including fixing up conflicts with the removal of fake_users
and the reworking of node.py -> service.py.
Diffstat (limited to 'nova/tests')
| -rw-r--r-- | nova/tests/access_unittest.py | 6 | ||||
| -rw-r--r-- | nova/tests/api_unittest.py | 31 | ||||
| -rw-r--r-- | nova/tests/auth_unittest.py (renamed from nova/tests/users_unittest.py) | 101 | ||||
| -rw-r--r-- | nova/tests/cloud_unittest.py | 33 | ||||
| -rw-r--r-- | nova/tests/compute_unittest.py (renamed from nova/tests/node_unittest.py) | 43 | ||||
| -rw-r--r-- | nova/tests/fake_flags.py | 2 | ||||
| -rw-r--r-- | nova/tests/future_unittest.py | 75 | ||||
| -rw-r--r-- | nova/tests/model_unittest.py | 99 | ||||
| -rw-r--r-- | nova/tests/network_unittest.py | 95 | ||||
| -rw-r--r-- | nova/tests/objectstore_unittest.py | 184 | ||||
| -rw-r--r-- | nova/tests/real_flags.py | 1 | ||||
| -rw-r--r-- | nova/tests/volume_unittest.py | 115 |
12 files changed, 495 insertions, 290 deletions
diff --git a/nova/tests/access_unittest.py b/nova/tests/access_unittest.py index 6cf7e893d..fa0a090a0 100644 --- a/nova/tests/access_unittest.py +++ b/nova/tests/access_unittest.py @@ -22,7 +22,7 @@ import logging from nova import exception from nova import flags from nova import test -from nova.auth.users import UserManager +from nova.auth import manager from nova.auth import rbac @@ -35,7 +35,7 @@ class AccessTestCase(test.BaseTestCase): super(AccessTestCase, self).setUp() FLAGS.connection_type = 'fake' FLAGS.fake_storage = True - um = UserManager.instance() + um = manager.AuthManager() # Make test users try: self.testadmin = um.create_user('testadmin') @@ -79,7 +79,7 @@ class AccessTestCase(test.BaseTestCase): #user is set in each test def tearDown(self): - um = UserManager.instance() + um = manager.AuthManager() # Delete the test project um.delete_project('testproj') # Delete the test user diff --git a/nova/tests/api_unittest.py b/nova/tests/api_unittest.py index e5e2afe26..9d072866c 100644 --- a/nova/tests/api_unittest.py +++ b/nova/tests/api_unittest.py @@ -26,7 +26,7 @@ from twisted.internet import defer from nova import flags from nova import test -from nova.auth import users +from nova.auth import manager from nova.endpoint import api from nova.endpoint import cloud @@ -43,7 +43,11 @@ def boto_to_tornado(method, path, headers, data, host, connection=None): connection should be a FakeTornadoHttpConnection instance """ - headers = httpserver.HTTPHeaders() + try: + headers = httpserver.HTTPHeaders() + except AttributeError: + from tornado import httputil + headers = httputil.HTTPHeaders() for k, v in headers.iteritems(): headers[k] = v @@ -150,7 +154,7 @@ class ApiEc2TestCase(test.BaseTestCase): def setUp(self): super(ApiEc2TestCase, self).setUp() - self.users = users.UserManager.instance() + self.manager = manager.AuthManager() self.cloud = cloud.CloudController() self.host = '127.0.0.1' @@ -175,25 +179,22 @@ class ApiEc2TestCase(test.BaseTestCase): def test_describe_instances(self): self.expect_http() self.mox.ReplayAll() - try: - self.users.create_user('fake', 'fake', 'fake') - except Exception, _err: - pass # User may already exist + user = self.manager.create_user('fake', 'fake', 'fake') + project = self.manager.create_project('fake', 'fake', 'fake') self.assertEqual(self.ec2.get_all_instances(), []) - self.users.delete_user('fake') + self.manager.delete_project(project) + self.manager.delete_user(user) def test_get_all_key_pairs(self): self.expect_http() self.mox.ReplayAll() keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd") for x in range(random.randint(4, 8))) - try: - self.users.create_user('fake', 'fake', 'fake') - except Exception, _err: - pass # User may already exist - self.users.generate_key_pair('fake', keyname) + user = self.manager.create_user('fake', 'fake', 'fake') + project = self.manager.create_project('fake', 'fake', 'fake') + self.manager.generate_key_pair(user.id, keyname) rv = self.ec2.get_all_key_pairs() self.assertTrue(filter(lambda k: k.name == keyname, rv)) - self.users.delete_user('fake') - + self.manager.delete_project(project) + self.manager.delete_user(user) diff --git a/nova/tests/users_unittest.py b/nova/tests/auth_unittest.py index 824d5cff6..073ff71d2 100644 --- a/nova/tests/users_unittest.py +++ b/nova/tests/auth_unittest.py @@ -25,41 +25,41 @@ import unittest from nova import crypto from nova import flags from nova import test -from nova.auth import users +from nova.auth import manager from nova.endpoint import cloud FLAGS = flags.FLAGS -class UserTestCase(test.BaseTestCase): +class AuthTestCase(test.BaseTestCase): flush_db = False def setUp(self): - super(UserTestCase, self).setUp() + super(AuthTestCase, self).setUp() self.flags(connection_type='fake', fake_storage=True) - self.users = users.UserManager.instance() + self.manager = manager.AuthManager() def test_001_can_create_users(self): - self.users.create_user('test1', 'access', 'secret') - self.users.create_user('test2') + self.manager.create_user('test1', 'access', 'secret') + self.manager.create_user('test2') def test_002_can_get_user(self): - user = self.users.get_user('test1') + user = self.manager.get_user('test1') def test_003_can_retreive_properties(self): - user = self.users.get_user('test1') + user = self.manager.get_user('test1') self.assertEqual('test1', user.id) self.assertEqual('access', user.access) self.assertEqual('secret', user.secret) def test_004_signature_is_valid(self): - #self.assertTrue(self.users.authenticate( **boto.generate_url ... ? ? ? )) + #self.assertTrue(self.manager.authenticate( **boto.generate_url ... ? ? ? )) pass #raise NotImplementedError def test_005_can_get_credentials(self): return - credentials = self.users.get_user('test1').get_credentials() + credentials = self.manager.get_user('test1').get_credentials() self.assertEqual(credentials, 'export EC2_ACCESS_KEY="access"\n' + 'export EC2_SECRET_KEY="secret"\n' + @@ -68,14 +68,14 @@ class UserTestCase(test.BaseTestCase): 'export EC2_USER_ID="test1"\n') def test_006_test_key_storage(self): - user = self.users.get_user('test1') + user = self.manager.get_user('test1') user.create_key_pair('public', 'key', 'fingerprint') key = user.get_key_pair('public') self.assertEqual('key', key.public_key) self.assertEqual('fingerprint', key.fingerprint) def test_007_test_key_generation(self): - user = self.users.get_user('test1') + user = self.manager.get_user('test1') private_key, fingerprint = user.generate_key_pair('public2') key = RSA.load_key_string(private_key, callback=lambda: None) bio = BIO.MemoryBuffer() @@ -87,71 +87,71 @@ class UserTestCase(test.BaseTestCase): converted.split(" ")[1].strip()) def test_008_can_list_key_pairs(self): - keys = self.users.get_user('test1').get_key_pairs() + keys = self.manager.get_user('test1').get_key_pairs() self.assertTrue(filter(lambda k: k.name == 'public', keys)) self.assertTrue(filter(lambda k: k.name == 'public2', keys)) def test_009_can_delete_key_pair(self): - self.users.get_user('test1').delete_key_pair('public') - keys = self.users.get_user('test1').get_key_pairs() + self.manager.get_user('test1').delete_key_pair('public') + keys = self.manager.get_user('test1').get_key_pairs() self.assertFalse(filter(lambda k: k.name == 'public', keys)) def test_010_can_list_users(self): - users = self.users.get_users() + users = self.manager.get_users() logging.warn(users) self.assertTrue(filter(lambda u: u.id == 'test1', users)) def test_101_can_add_user_role(self): - self.assertFalse(self.users.has_role('test1', 'itsec')) - self.users.add_role('test1', 'itsec') - self.assertTrue(self.users.has_role('test1', 'itsec')) + self.assertFalse(self.manager.has_role('test1', 'itsec')) + self.manager.add_role('test1', 'itsec') + self.assertTrue(self.manager.has_role('test1', 'itsec')) def test_199_can_remove_user_role(self): - self.assertTrue(self.users.has_role('test1', 'itsec')) - self.users.remove_role('test1', 'itsec') - self.assertFalse(self.users.has_role('test1', 'itsec')) + self.assertTrue(self.manager.has_role('test1', 'itsec')) + self.manager.remove_role('test1', 'itsec') + self.assertFalse(self.manager.has_role('test1', 'itsec')) def test_201_can_create_project(self): - project = self.users.create_project('testproj', 'test1', 'A test project', ['test1']) - self.assertTrue(filter(lambda p: p.name == 'testproj', self.users.get_projects())) + project = self.manager.create_project('testproj', 'test1', 'A test project', ['test1']) + self.assertTrue(filter(lambda p: p.name == 'testproj', self.manager.get_projects())) self.assertEqual(project.name, 'testproj') self.assertEqual(project.description, 'A test project') self.assertEqual(project.project_manager_id, 'test1') self.assertTrue(project.has_member('test1')) def test_202_user1_is_project_member(self): - self.assertTrue(self.users.get_user('test1').is_project_member('testproj')) + self.assertTrue(self.manager.get_user('test1').is_project_member('testproj')) def test_203_user2_is_not_project_member(self): - self.assertFalse(self.users.get_user('test2').is_project_member('testproj')) + self.assertFalse(self.manager.get_user('test2').is_project_member('testproj')) def test_204_user1_is_project_manager(self): - self.assertTrue(self.users.get_user('test1').is_project_manager('testproj')) + self.assertTrue(self.manager.get_user('test1').is_project_manager('testproj')) def test_205_user2_is_not_project_manager(self): - self.assertFalse(self.users.get_user('test2').is_project_manager('testproj')) + self.assertFalse(self.manager.get_user('test2').is_project_manager('testproj')) def test_206_can_add_user_to_project(self): - self.users.add_to_project('test2', 'testproj') - self.assertTrue(self.users.get_project('testproj').has_member('test2')) + self.manager.add_to_project('test2', 'testproj') + self.assertTrue(self.manager.get_project('testproj').has_member('test2')) def test_208_can_remove_user_from_project(self): - self.users.remove_from_project('test2', 'testproj') - self.assertFalse(self.users.get_project('testproj').has_member('test2')) + self.manager.remove_from_project('test2', 'testproj') + self.assertFalse(self.manager.get_project('testproj').has_member('test2')) def test_209_can_generate_x509(self): # MUST HAVE RUN CLOUD SETUP BY NOW self.cloud = cloud.CloudController() self.cloud.setup() - private_key, signed_cert_string = self.users.get_project('testproj').generate_x509_cert('test1') - logging.debug(signed_cert_string) + _key, cert_str = self.manager._generate_x509_cert('test1', 'testproj') + logging.debug(cert_str) # Need to verify that it's signed by the right intermediate CA full_chain = crypto.fetch_ca(project_id='testproj', chain=True) int_cert = crypto.fetch_ca(project_id='testproj', chain=False) cloud_cert = crypto.fetch_ca() logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain) - signed_cert = X509.load_cert_string(signed_cert_string) + signed_cert = X509.load_cert_string(cert_str) chain_cert = X509.load_cert_string(full_chain) int_cert = X509.load_cert_string(int_cert) cloud_cert = X509.load_cert_string(cloud_cert) @@ -164,42 +164,45 @@ class UserTestCase(test.BaseTestCase): self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey())) def test_210_can_add_project_role(self): - project = self.users.get_project('testproj') + project = self.manager.get_project('testproj') self.assertFalse(project.has_role('test1', 'sysadmin')) - self.users.add_role('test1', 'sysadmin') + self.manager.add_role('test1', 'sysadmin') self.assertFalse(project.has_role('test1', 'sysadmin')) project.add_role('test1', 'sysadmin') self.assertTrue(project.has_role('test1', 'sysadmin')) def test_211_can_remove_project_role(self): - project = self.users.get_project('testproj') + project = self.manager.get_project('testproj') self.assertTrue(project.has_role('test1', 'sysadmin')) project.remove_role('test1', 'sysadmin') self.assertFalse(project.has_role('test1', 'sysadmin')) - self.users.remove_role('test1', 'sysadmin') + self.manager.remove_role('test1', 'sysadmin') self.assertFalse(project.has_role('test1', 'sysadmin')) def test_212_vpn_ip_and_port_looks_valid(self): - project = self.users.get_project('testproj') + project = self.manager.get_project('testproj') self.assert_(project.vpn_ip) self.assert_(project.vpn_port >= FLAGS.vpn_start_port) self.assert_(project.vpn_port <= FLAGS.vpn_end_port) def test_213_too_many_vpns(self): - for i in xrange(users.Vpn.num_ports_for_ip(FLAGS.vpn_ip)): - users.Vpn.create("vpnuser%s" % i) - self.assertRaises(users.NoMorePorts, users.Vpn.create, "boom") + vpns = [] + for i in xrange(manager.Vpn.num_ports_for_ip(FLAGS.vpn_ip)): + vpns.append(manager.Vpn.create("vpnuser%s" % i)) + self.assertRaises(manager.NoMorePorts, manager.Vpn.create, "boom") + for vpn in vpns: + vpn.destroy() def test_299_can_delete_project(self): - self.users.delete_project('testproj') - self.assertFalse(filter(lambda p: p.name == 'testproj', self.users.get_projects())) + self.manager.delete_project('testproj') + self.assertFalse(filter(lambda p: p.name == 'testproj', self.manager.get_projects())) def test_999_can_delete_users(self): - self.users.delete_user('test1') - users = self.users.get_users() + self.manager.delete_user('test1') + users = self.manager.get_users() self.assertFalse(filter(lambda u: u.id == 'test1', users)) - self.users.delete_user('test2') - self.assertEqual(self.users.get_user('test2'), None) + self.manager.delete_user('test2') + self.assertEqual(self.manager.get_user('test2'), None) if __name__ == "__main__": diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py index 8040f6331..40837405c 100644 --- a/nova/tests/cloud_unittest.py +++ b/nova/tests/cloud_unittest.py @@ -27,8 +27,8 @@ from xml.etree import ElementTree from nova import flags from nova import rpc from nova import test -from nova.auth import users -from nova.compute import node +from nova.auth import manager +from nova.compute import service from nova.endpoint import api from nova.endpoint import cloud @@ -40,8 +40,7 @@ class CloudTestCase(test.BaseTestCase): def setUp(self): super(CloudTestCase, self).setUp() self.flags(connection_type='fake', - fake_storage=True, - fake_users=True) + fake_storage=True) self.conn = rpc.Connection.instance() logging.getLogger().setLevel(logging.DEBUG) @@ -53,34 +52,34 @@ class CloudTestCase(test.BaseTestCase): proxy=self.cloud) self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop)) - # set up a node - self.node = node.Node() - self.node_consumer = rpc.AdapterConsumer(connection=self.conn, + # set up a service + self.compute = service.ComputeService() + self.compute_consumer = rpc.AdapterConsumer(connection=self.conn, topic=FLAGS.compute_topic, - proxy=self.node) - self.injected.append(self.node_consumer.attach_to_tornado(self.ioloop)) + proxy=self.compute) + self.injected.append(self.compute_consumer.attach_to_tornado(self.ioloop)) try: - users.UserManager.instance().create_user('admin', 'admin', 'admin') + manager.AuthManager().create_user('admin', 'admin', 'admin') except: pass - admin = users.UserManager.instance().get_user('admin') - project = users.UserManager.instance().create_project('proj', 'admin', 'proj') + admin = manager.AuthManager().get_user('admin') + project = manager.AuthManager().create_project('proj', 'admin', 'proj') self.context = api.APIRequestContext(handler=None,project=project,user=admin) def tearDown(self): - users.UserManager.instance().delete_project('proj') - users.UserManager.instance().delete_user('admin') + manager.AuthManager().delete_project('proj') + manager.AuthManager().delete_user('admin') def test_console_output(self): if FLAGS.connection_type == 'fake': logging.debug("Can't test instances without a real virtual env.") return instance_id = 'foo' - inst = yield self.node.run_instance(instance_id) + inst = yield self.compute.run_instance(instance_id) output = yield self.cloud.get_console_output(self.context, [instance_id]) logging.debug(output) self.assert_(output) - rv = yield self.node.terminate_instance(instance_id) + rv = yield self.compute.terminate_instance(instance_id) def test_run_instances(self): if FLAGS.connection_type == 'fake': @@ -112,7 +111,7 @@ class CloudTestCase(test.BaseTestCase): # for instance in reservations[res_id]: for instance in reservations[reservations.keys()[0]]: logging.debug("Terminating instance %s" % instance['instance_id']) - rv = yield self.node.terminate_instance(instance['instance_id']) + rv = yield self.compute.terminate_instance(instance['instance_id']) def test_instance_update_state(self): def instance(num): diff --git a/nova/tests/node_unittest.py b/nova/tests/compute_unittest.py index 86d9775fd..da0f82e3a 100644 --- a/nova/tests/node_unittest.py +++ b/nova/tests/compute_unittest.py @@ -26,7 +26,7 @@ from nova import flags from nova import test from nova import utils from nova.compute import model -from nova.compute import node +from nova.compute import service FLAGS = flags.FLAGS @@ -53,14 +53,13 @@ class InstanceXmlTestCase(test.TrialTestCase): # rv = yield first_node.terminate_instance(instance_id) -class NodeConnectionTestCase(test.TrialTestCase): +class ComputeConnectionTestCase(test.TrialTestCase): def setUp(self): logging.getLogger().setLevel(logging.DEBUG) - super(NodeConnectionTestCase, self).setUp() + super(ComputeConnectionTestCase, self).setUp() self.flags(connection_type='fake', - fake_storage=True, - fake_users=True) - self.node = node.Node() + fake_storage=True) + self.compute = service.ComputeService() def create_instance(self): instdir = model.InstanceDirectory() @@ -81,48 +80,48 @@ class NodeConnectionTestCase(test.TrialTestCase): def test_run_describe_terminate(self): instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) + rv = yield self.compute.run_instance(instance_id) - rv = yield self.node.describe_instances() + rv = yield self.compute.describe_instances() logging.info("Running instances: %s", rv) self.assertEqual(rv[instance_id].name, instance_id) - rv = yield self.node.terminate_instance(instance_id) + rv = yield self.compute.terminate_instance(instance_id) - rv = yield self.node.describe_instances() + rv = yield self.compute.describe_instances() logging.info("After terminating instances: %s", rv) self.assertEqual(rv, {}) @defer.inlineCallbacks def test_reboot(self): instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) + rv = yield self.compute.run_instance(instance_id) - rv = yield self.node.describe_instances() + rv = yield self.compute.describe_instances() self.assertEqual(rv[instance_id].name, instance_id) - yield self.node.reboot_instance(instance_id) + yield self.compute.reboot_instance(instance_id) - rv = yield self.node.describe_instances() + rv = yield self.compute.describe_instances() self.assertEqual(rv[instance_id].name, instance_id) - rv = yield self.node.terminate_instance(instance_id) + rv = yield self.compute.terminate_instance(instance_id) @defer.inlineCallbacks def test_console_output(self): instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) + rv = yield self.compute.run_instance(instance_id) - console = yield self.node.get_console_output(instance_id) + console = yield self.compute.get_console_output(instance_id) self.assert_(console) - rv = yield self.node.terminate_instance(instance_id) + rv = yield self.compute.terminate_instance(instance_id) @defer.inlineCallbacks def test_run_instance_existing(self): instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) + rv = yield self.compute.run_instance(instance_id) - rv = yield self.node.describe_instances() + rv = yield self.compute.describe_instances() self.assertEqual(rv[instance_id].name, instance_id) - self.assertRaises(exception.Error, self.node.run_instance, instance_id) - rv = yield self.node.terminate_instance(instance_id) + self.assertRaises(exception.Error, self.compute.run_instance, instance_id) + rv = yield self.compute.terminate_instance(instance_id) diff --git a/nova/tests/fake_flags.py b/nova/tests/fake_flags.py index 5fcd2bcac..a7310fb26 100644 --- a/nova/tests/fake_flags.py +++ b/nova/tests/fake_flags.py @@ -24,5 +24,5 @@ FLAGS.connection_type = 'fake' FLAGS.fake_storage = True FLAGS.fake_rabbit = True FLAGS.fake_network = True -FLAGS.fake_users = True +FLAGS.auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver' FLAGS.verbose = True diff --git a/nova/tests/future_unittest.py b/nova/tests/future_unittest.py deleted file mode 100644 index 31ec83065..000000000 --- a/nova/tests/future_unittest.py +++ /dev/null @@ -1,75 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import mox -import StringIO -import time -from tornado import ioloop -from twisted.internet import defer -import unittest -from xml.etree import ElementTree - -from nova import cloud -from nova import exception -from nova import flags -from nova import node -from nova import rpc -from nova import test - - -FLAGS = flags.FLAGS - - -class AdminTestCase(test.BaseTestCase): - def setUp(self): - super(AdminTestCase, self).setUp() - self.flags(connection_type='fake', - fake_rabbit=True) - - self.conn = rpc.Connection.instance() - - logging.getLogger().setLevel(logging.INFO) - - # set up our cloud - self.cloud = cloud.CloudController() - self.cloud_consumer = rpc.AdapterConsumer(connection=self.conn, - topic=FLAGS.cloud_topic, - proxy=self.cloud) - self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop)) - - # set up a node - self.node = node.Node() - self.node_consumer = rpc.AdapterConsumer(connection=self.conn, - topic=FLAGS.compute_topic, - proxy=self.node) - self.injected.append(self.node_consumer.attach_to_tornado(self.ioloop)) - - def test_flush_terminated(self): - # Launch an instance - - # Wait until it's running - - # Terminate it - - # Wait until it's terminated - - # Flush terminated nodes - - # ASSERT that it's gone - pass diff --git a/nova/tests/model_unittest.py b/nova/tests/model_unittest.py index b9eb2ac96..6825cfe2a 100644 --- a/nova/tests/model_unittest.py +++ b/nova/tests/model_unittest.py @@ -16,6 +16,7 @@ # License for the specific language governing permissions and limitations # under the License. +from datetime import datetime, timedelta import logging import time from twisted.internet import defer @@ -25,7 +26,6 @@ from nova import flags from nova import test from nova import utils from nova.compute import model -from nova.compute import node FLAGS = flags.FLAGS @@ -35,8 +35,7 @@ class ModelTestCase(test.TrialTestCase): def setUp(self): super(ModelTestCase, self).setUp() self.flags(connection_type='fake', - fake_storage=True, - fake_users=True) + fake_storage=True) def tearDown(self): model.Instance('i-test').destroy() @@ -66,6 +65,12 @@ class ModelTestCase(test.TrialTestCase): daemon.save() return daemon + def create_session_token(self): + session_token = model.SessionToken('tk12341234') + session_token['user'] = 'testuser' + session_token.save() + return session_token + @defer.inlineCallbacks def test_create_instance(self): """store with create_instace, then test that a load finds it""" @@ -204,3 +209,91 @@ class ModelTestCase(test.TrialTestCase): if x.identifier == 'testhost:nova-testdaemon': found = True self.assertTrue(found) + + @defer.inlineCallbacks + def test_create_session_token(self): + """create""" + d = yield self.create_session_token() + d = model.SessionToken(d.token) + self.assertFalse(d.is_new_record()) + + @defer.inlineCallbacks + def test_delete_session_token(self): + """create, then destroy, then make sure loads a new record""" + instance = yield self.create_session_token() + yield instance.destroy() + newinst = yield model.SessionToken(instance.token) + self.assertTrue(newinst.is_new_record()) + + @defer.inlineCallbacks + def test_session_token_added_to_set(self): + """create, then check that it is included in list""" + instance = yield self.create_session_token() + found = False + for x in model.SessionToken.all(): + if x.identifier == instance.token: + found = True + self.assert_(found) + + @defer.inlineCallbacks + def test_session_token_associates_user(self): + """create, then check that it is listed for the user""" + instance = yield self.create_session_token() + found = False + for x in model.SessionToken.associated_to('user', 'testuser'): + if x.identifier == instance.identifier: + found = True + self.assertTrue(found) + + @defer.inlineCallbacks + def test_session_token_generation(self): + instance = yield model.SessionToken.generate('username', 'TokenType') + self.assertFalse(instance.is_new_record()) + + @defer.inlineCallbacks + def test_find_generated_session_token(self): + instance = yield model.SessionToken.generate('username', 'TokenType') + found = yield model.SessionToken.lookup(instance.identifier) + self.assert_(found) + + def test_update_session_token_expiry(self): + instance = model.SessionToken('tk12341234') + oldtime = datetime.utcnow() + instance['expiry'] = oldtime.strftime(utils.TIME_FORMAT) + instance.update_expiry() + expiry = utils.parse_isotime(instance['expiry']) + self.assert_(expiry > datetime.utcnow()) + + @defer.inlineCallbacks + def test_session_token_lookup_when_expired(self): + instance = yield model.SessionToken.generate("testuser") + instance['expiry'] = datetime.utcnow().strftime(utils.TIME_FORMAT) + instance.save() + inst = model.SessionToken.lookup(instance.identifier) + self.assertFalse(inst) + + @defer.inlineCallbacks + def test_session_token_lookup_when_not_expired(self): + instance = yield model.SessionToken.generate("testuser") + inst = model.SessionToken.lookup(instance.identifier) + self.assert_(inst) + + @defer.inlineCallbacks + def test_session_token_is_expired_when_expired(self): + instance = yield model.SessionToken.generate("testuser") + instance['expiry'] = datetime.utcnow().strftime(utils.TIME_FORMAT) + self.assert_(instance.is_expired()) + + @defer.inlineCallbacks + def test_session_token_is_expired_when_not_expired(self): + instance = yield model.SessionToken.generate("testuser") + self.assertFalse(instance.is_expired()) + + @defer.inlineCallbacks + def test_session_token_ttl(self): + instance = yield model.SessionToken.generate("testuser") + now = datetime.utcnow() + delta = timedelta(hours=1) + instance['expiry'] = (now + delta).strftime(utils.TIME_FORMAT) + # give 5 seconds of fuzziness + self.assert_(abs(instance.ttl() - FLAGS.auth_token_ttl) < 5) diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py index 69278e896..f24eefb0d 100644 --- a/nova/tests/network_unittest.py +++ b/nova/tests/network_unittest.py @@ -23,7 +23,7 @@ import logging from nova import flags from nova import test from nova import utils -from nova.auth import users +from nova.auth import manager from nova.compute import network from nova.compute.exception import NoMoreAddresses @@ -32,66 +32,71 @@ FLAGS = flags.FLAGS class NetworkTestCase(test.TrialTestCase): def setUp(self): super(NetworkTestCase, self).setUp() + # NOTE(vish): if you change these flags, make sure to change the + # flags in the corresponding section in nova-dhcpbridge self.flags(connection_type='fake', fake_storage=True, fake_network=True, + auth_driver='nova.auth.ldapdriver.FakeLdapDriver', network_size=32) logging.getLogger().setLevel(logging.DEBUG) - self.manager = users.UserManager.instance() + self.manager = manager.AuthManager() self.dnsmasq = FakeDNSMasq() - try: - self.manager.create_user('netuser', 'netuser', 'netuser') - except: pass + self.user = self.manager.create_user('netuser', 'netuser', 'netuser') + self.projects = [] + self.projects.append(self.manager.create_project('netuser', + 'netuser', + 'netuser')) for i in range(0, 6): name = 'project%s' % i - if not self.manager.get_project(name): - self.manager.create_project(name, 'netuser', name) + self.projects.append(self.manager.create_project(name, + 'netuser', + name)) self.network = network.PublicNetworkController() def tearDown(self): super(NetworkTestCase, self).tearDown() - for i in range(0, 6): - name = 'project%s' % i - self.manager.delete_project(name) - self.manager.delete_user('netuser') + for project in self.projects: + self.manager.delete_project(project) + self.manager.delete_user(self.user) def test_public_network_allocation(self): pubnet = IPy.IP(flags.FLAGS.public_range) - address = self.network.allocate_ip("netuser", "project0", "public") + address = self.network.allocate_ip(self.user.id, self.projects[0].id, "public") self.assertTrue(IPy.IP(address) in pubnet) self.assertTrue(IPy.IP(address) in self.network.network) def test_allocate_deallocate_ip(self): address = network.allocate_ip( - "netuser", "project0", utils.generate_mac()) + self.user.id, self.projects[0].id, utils.generate_mac()) logging.debug("Was allocated %s" % (address)) - net = network.get_project_network("project0", "default") - self.assertEqual(True, is_in_project(address, "project0")) + net = network.get_project_network(self.projects[0].id, "default") + self.assertEqual(True, is_in_project(address, self.projects[0].id)) mac = utils.generate_mac() hostname = "test-host" self.dnsmasq.issue_ip(mac, address, hostname, net.bridge_name) rv = network.deallocate_ip(address) # Doesn't go away until it's dhcp released - self.assertEqual(True, is_in_project(address, "project0")) + self.assertEqual(True, is_in_project(address, self.projects[0].id)) self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name) - self.assertEqual(False, is_in_project(address, "project0")) + self.assertEqual(False, is_in_project(address, self.projects[0].id)) def test_range_allocation(self): mac = utils.generate_mac() secondmac = utils.generate_mac() hostname = "test-host" address = network.allocate_ip( - "netuser", "project0", mac) + self.user.id, self.projects[0].id, mac) secondaddress = network.allocate_ip( - "netuser", "project1", secondmac) - net = network.get_project_network("project0", "default") - secondnet = network.get_project_network("project1", "default") + self.user, self.projects[1].id, secondmac) + net = network.get_project_network(self.projects[0].id, "default") + secondnet = network.get_project_network(self.projects[1].id, "default") - self.assertEqual(True, is_in_project(address, "project0")) - self.assertEqual(True, is_in_project(secondaddress, "project1")) - self.assertEqual(False, is_in_project(address, "project1")) + self.assertEqual(True, is_in_project(address, self.projects[0].id)) + self.assertEqual(True, is_in_project(secondaddress, self.projects[1].id)) + self.assertEqual(False, is_in_project(address, self.projects[1].id)) # Addresses are allocated before they're issued self.dnsmasq.issue_ip(mac, address, hostname, net.bridge_name) @@ -100,34 +105,34 @@ class NetworkTestCase(test.TrialTestCase): rv = network.deallocate_ip(address) self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name) - self.assertEqual(False, is_in_project(address, "project0")) + self.assertEqual(False, is_in_project(address, self.projects[0].id)) # First address release shouldn't affect the second - self.assertEqual(True, is_in_project(secondaddress, "project1")) + self.assertEqual(True, is_in_project(secondaddress, self.projects[1].id)) rv = network.deallocate_ip(secondaddress) self.dnsmasq.release_ip(secondmac, secondaddress, hostname, secondnet.bridge_name) - self.assertEqual(False, is_in_project(secondaddress, "project1")) + self.assertEqual(False, is_in_project(secondaddress, self.projects[1].id)) def test_subnet_edge(self): - secondaddress = network.allocate_ip("netuser", "project0", + secondaddress = network.allocate_ip(self.user.id, self.projects[0].id, utils.generate_mac()) hostname = "toomany-hosts" - for project in range(1,5): - project_id = "project%s" % (project) + for i in range(1,5): + project_id = self.projects[i].id mac = utils.generate_mac() mac2 = utils.generate_mac() mac3 = utils.generate_mac() address = network.allocate_ip( - "netuser", project_id, mac) + self.user, project_id, mac) address2 = network.allocate_ip( - "netuser", project_id, mac2) + self.user, project_id, mac2) address3 = network.allocate_ip( - "netuser", project_id, mac3) - self.assertEqual(False, is_in_project(address, "project0")) - self.assertEqual(False, is_in_project(address2, "project0")) - self.assertEqual(False, is_in_project(address3, "project0")) + self.user, project_id, mac3) + self.assertEqual(False, is_in_project(address, self.projects[0].id)) + self.assertEqual(False, is_in_project(address2, self.projects[0].id)) + self.assertEqual(False, is_in_project(address3, self.projects[0].id)) rv = network.deallocate_ip(address) rv = network.deallocate_ip(address2) rv = network.deallocate_ip(address3) @@ -135,7 +140,7 @@ class NetworkTestCase(test.TrialTestCase): self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name) self.dnsmasq.release_ip(mac2, address2, hostname, net.bridge_name) self.dnsmasq.release_ip(mac3, address3, hostname, net.bridge_name) - net = network.get_project_network("project0", "default") + net = network.get_project_network(self.projects[0].id, "default") rv = network.deallocate_ip(secondaddress) self.dnsmasq.release_ip(mac, secondaddress, hostname, net.bridge_name) @@ -153,34 +158,36 @@ class NetworkTestCase(test.TrialTestCase): environment's setup. Network size is set in test fixture's setUp method. - + There are FLAGS.cnt_vpn_clients addresses reserved for VPN (NUM_RESERVED_VPN_IPS) And there are NUM_STATIC_IPS that are always reserved by Nova for the necessary services (gateway, CloudPipe, etc) - So we should get flags.network_size - (NUM_STATIC_IPS + - NUM_PREALLOCATED_IPS + + So we should get flags.network_size - (NUM_STATIC_IPS + + NUM_PREALLOCATED_IPS + NUM_RESERVED_VPN_IPS) usable addresses """ - net = network.get_project_network("project0", "default") + net = network.get_project_network(self.projects[0].id, "default") # Determine expected number of available IP addresses num_static_ips = net.num_static_ips num_preallocated_ips = len(net.hosts.keys()) num_reserved_vpn_ips = flags.FLAGS.cnt_vpn_clients - num_available_ips = flags.FLAGS.network_size - (num_static_ips + num_preallocated_ips + num_reserved_vpn_ips) + num_available_ips = flags.FLAGS.network_size - (num_static_ips + + num_preallocated_ips + + num_reserved_vpn_ips) hostname = "toomany-hosts" macs = {} addresses = {} for i in range(0, (num_available_ips - 1)): macs[i] = utils.generate_mac() - addresses[i] = network.allocate_ip("netuser", "project0", macs[i]) + addresses[i] = network.allocate_ip(self.user.id, self.projects[0].id, macs[i]) self.dnsmasq.issue_ip(macs[i], addresses[i], hostname, net.bridge_name) - self.assertRaises(NoMoreAddresses, network.allocate_ip, "netuser", "project0", utils.generate_mac()) + self.assertRaises(NoMoreAddresses, network.allocate_ip, self.user.id, self.projects[0].id, utils.generate_mac()) for i in range(0, (num_available_ips - 1)): rv = network.deallocate_ip(addresses[i]) diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py index f22256aaf..dd00377e7 100644 --- a/nova/tests/objectstore_unittest.py +++ b/nova/tests/objectstore_unittest.py @@ -16,6 +16,7 @@ # License for the specific language governing permissions and limitations # under the License. +import boto import glob import hashlib import logging @@ -28,8 +29,13 @@ from nova import objectstore from nova.objectstore import bucket # for buckets_path flag from nova.objectstore import image # for images_path flag from nova import test -from nova.auth import users +from nova.auth import manager +from nova.objectstore.handler import S3 +from nova.exception import NotEmpty, NotFound, NotAuthorized +from boto.s3.connection import S3Connection, OrdinaryCallingFormat +from twisted.internet import reactor, threads, defer +from twisted.web import http, server FLAGS = flags.FLAGS @@ -53,13 +59,12 @@ os.makedirs(os.path.join(oss_tempdir, 'buckets')) class ObjectStoreTestCase(test.BaseTestCase): def setUp(self): super(ObjectStoreTestCase, self).setUp() - self.flags(fake_users=True, - buckets_path=os.path.join(oss_tempdir, 'buckets'), + self.flags(buckets_path=os.path.join(oss_tempdir, 'buckets'), images_path=os.path.join(oss_tempdir, 'images'), ca_path=os.path.join(os.path.dirname(__file__), 'CA')) logging.getLogger().setLevel(logging.DEBUG) - self.um = users.UserManager.instance() + self.um = manager.AuthManager() try: self.um.create_user('user1') except: pass @@ -98,49 +103,37 @@ class ObjectStoreTestCase(test.BaseTestCase): # another user is not authorized self.context.user = self.um.get_user('user2') self.context.project = self.um.get_project('proj2') - self.assert_(bucket.is_authorized(self.context) == False) + self.assertFalse(bucket.is_authorized(self.context)) # admin is authorized to use bucket self.context.user = self.um.get_user('admin_user') self.context.project = None - self.assert_(bucket.is_authorized(self.context)) + self.assertTrue(bucket.is_authorized(self.context)) # new buckets are empty - self.assert_(bucket.list_keys()['Contents'] == []) + self.assertTrue(bucket.list_keys()['Contents'] == []) # storing keys works bucket['foo'] = "bar" - self.assert_(len(bucket.list_keys()['Contents']) == 1) + self.assertEquals(len(bucket.list_keys()['Contents']), 1) - self.assert_(bucket['foo'].read() == 'bar') + self.assertEquals(bucket['foo'].read(), 'bar') # md5 of key works - self.assert_(bucket['foo'].md5 == hashlib.md5('bar').hexdigest()) - - # deleting non-empty bucket throws exception - exception = False - try: - bucket.delete() - except: - exception = True + self.assertEquals(bucket['foo'].md5, hashlib.md5('bar').hexdigest()) - self.assert_(exception) + # deleting non-empty bucket should throw a NotEmpty exception + self.assertRaises(NotEmpty, bucket.delete) # deleting key del bucket['foo'] - # deleting empty button + # deleting empty bucket bucket.delete() # accessing deleted bucket throws exception - exception = False - try: - objectstore.bucket.Bucket('new_bucket') - except: - exception = True - - self.assert_(exception) + self.assertRaises(NotFound, objectstore.bucket.Bucket, 'new_bucket') def test_images(self): self.context.user = self.um.get_user('user1') @@ -169,37 +162,108 @@ class ObjectStoreTestCase(test.BaseTestCase): # verify image permissions self.context.user = self.um.get_user('user2') self.context.project = self.um.get_project('proj2') - self.assert_(my_img.is_authorized(self.context) == False) - -# class ApiObjectStoreTestCase(test.BaseTestCase): -# def setUp(self): -# super(ApiObjectStoreTestCase, self).setUp() -# FLAGS.fake_users = True -# FLAGS.buckets_path = os.path.join(tempdir, 'buckets') -# FLAGS.images_path = os.path.join(tempdir, 'images') -# FLAGS.ca_path = os.path.join(os.path.dirname(__file__), 'CA') -# -# self.users = users.UserManager.instance() -# self.app = handler.Application(self.users) -# -# self.host = '127.0.0.1' -# -# self.conn = boto.s3.connection.S3Connection( -# aws_access_key_id=user.access, -# aws_secret_access_key=user.secret, -# is_secure=False, -# calling_format=boto.s3.connection.OrdinaryCallingFormat(), -# port=FLAGS.s3_port, -# host=FLAGS.s3_host) -# -# self.mox.StubOutWithMock(self.ec2, 'new_http_connection') -# -# def tearDown(self): -# FLAGS.Reset() -# super(ApiObjectStoreTestCase, self).tearDown() -# -# def test_describe_instances(self): -# self.expect_http() -# self.mox.ReplayAll() -# -# self.assertEqual(self.ec2.get_all_instances(), []) + self.assertFalse(my_img.is_authorized(self.context)) + + +class TestHTTPChannel(http.HTTPChannel): + # Otherwise we end up with an unclean reactor + def checkPersistence(self, _, __): + return False + + +class TestSite(server.Site): + protocol = TestHTTPChannel + + +class S3APITestCase(test.TrialTestCase): + def setUp(self): + super(S3APITestCase, self).setUp() + + FLAGS.auth_driver='nova.auth.ldapdriver.FakeLdapDriver', + FLAGS.buckets_path = os.path.join(oss_tempdir, 'buckets') + + self.um = manager.AuthManager() + self.admin_user = self.um.create_user('admin', admin=True) + self.admin_project = self.um.create_project('admin', self.admin_user) + + shutil.rmtree(FLAGS.buckets_path) + os.mkdir(FLAGS.buckets_path) + + root = S3() + self.site = TestSite(root) + self.listening_port = reactor.listenTCP(0, self.site, interface='127.0.0.1') + self.tcp_port = self.listening_port.getHost().port + + + if not boto.config.has_section('Boto'): + boto.config.add_section('Boto') + boto.config.set('Boto', 'num_retries', '0') + self.conn = S3Connection(aws_access_key_id=self.admin_user.access, + aws_secret_access_key=self.admin_user.secret, + host='127.0.0.1', + port=self.tcp_port, + is_secure=False, + calling_format=OrdinaryCallingFormat()) + + # Don't attempt to reuse connections + def get_http_connection(host, is_secure): + return self.conn.new_http_connection(host, is_secure) + self.conn.get_http_connection = get_http_connection + + def _ensure_empty_list(self, l): + self.assertEquals(len(l), 0, "List was not empty") + return True + + def _ensure_only_bucket(self, l, name): + self.assertEquals(len(l), 1, "List didn't have exactly one element in it") + self.assertEquals(l[0].name, name, "Wrong name") + + def test_000_list_buckets(self): + d = threads.deferToThread(self.conn.get_all_buckets) + d.addCallback(self._ensure_empty_list) + return d + + def test_001_create_and_delete_bucket(self): + bucket_name = 'testbucket' + + d = threads.deferToThread(self.conn.create_bucket, bucket_name) + d.addCallback(lambda _:threads.deferToThread(self.conn.get_all_buckets)) + + def ensure_only_bucket(l, name): + self.assertEquals(len(l), 1, "List didn't have exactly one element in it") + self.assertEquals(l[0].name, name, "Wrong name") + d.addCallback(ensure_only_bucket, bucket_name) + + d.addCallback(lambda _:threads.deferToThread(self.conn.delete_bucket, bucket_name)) + d.addCallback(lambda _:threads.deferToThread(self.conn.get_all_buckets)) + d.addCallback(self._ensure_empty_list) + return d + + def test_002_create_bucket_and_key_and_delete_key_again(self): + bucket_name = 'testbucket' + key_name = 'somekey' + key_contents = 'somekey' + + d = threads.deferToThread(self.conn.create_bucket, bucket_name) + d.addCallback(lambda b:threads.deferToThread(b.new_key, key_name)) + d.addCallback(lambda k:threads.deferToThread(k.set_contents_from_string, key_contents)) + def ensure_key_contents(bucket_name, key_name, contents): + bucket = self.conn.get_bucket(bucket_name) + key = bucket.get_key(key_name) + self.assertEquals(key.get_contents_as_string(), contents, "Bad contents") + d.addCallback(lambda _:threads.deferToThread(ensure_key_contents, bucket_name, key_name, key_contents)) + def delete_key(bucket_name, key_name): + bucket = self.conn.get_bucket(bucket_name) + key = bucket.get_key(key_name) + key.delete() + d.addCallback(lambda _:threads.deferToThread(delete_key, bucket_name, key_name)) + d.addCallback(lambda _:threads.deferToThread(self.conn.get_bucket, bucket_name)) + d.addCallback(lambda b:threads.deferToThread(b.get_all_keys)) + d.addCallback(self._ensure_empty_list) + return d + + def tearDown(self): + self.um.delete_user('admin') + self.um.delete_project('admin') + return defer.DeferredList([defer.maybeDeferred(self.listening_port.stopListening)]) + super(S3APITestCase, self).tearDown() diff --git a/nova/tests/real_flags.py b/nova/tests/real_flags.py index 690fb640a..121f4eb41 100644 --- a/nova/tests/real_flags.py +++ b/nova/tests/real_flags.py @@ -24,5 +24,4 @@ FLAGS.connection_type = 'libvirt' FLAGS.fake_storage = False FLAGS.fake_rabbit = False FLAGS.fake_network = False -FLAGS.fake_users = False FLAGS.verbose = False diff --git a/nova/tests/volume_unittest.py b/nova/tests/volume_unittest.py new file mode 100644 index 000000000..b536ac383 --- /dev/null +++ b/nova/tests/volume_unittest.py @@ -0,0 +1,115 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from nova import compute +from nova import exception +from nova import flags +from nova import test +from nova.volume import service as volume_service + + +FLAGS = flags.FLAGS + + +class VolumeTestCase(test.TrialTestCase): + def setUp(self): + logging.getLogger().setLevel(logging.DEBUG) + super(VolumeTestCase, self).setUp() + self.compute = compute.service.ComputeService() + self.volume = None + self.flags(connection_type='fake', + fake_storage=True) + self.volume = volume_service.VolumeService() + + def test_run_create_volume(self): + vol_size = '0' + user_id = 'fake' + project_id = 'fake' + volume_id = self.volume.create_volume(vol_size, user_id, project_id) + # TODO(termie): get_volume returns differently than create_volume + self.assertEqual(volume_id, + volume_service.get_volume(volume_id)['volume_id']) + + rv = self.volume.delete_volume(volume_id) + self.assertRaises(exception.Error, + volume_service.get_volume, + volume_id) + + def test_too_big_volume(self): + vol_size = '1001' + user_id = 'fake' + project_id = 'fake' + self.assertRaises(TypeError, + self.volume.create_volume, + vol_size, user_id, project_id) + + def test_too_many_volumes(self): + vol_size = '1' + user_id = 'fake' + project_id = 'fake' + num_shelves = FLAGS.last_shelf_id - FLAGS.first_shelf_id + 1 + total_slots = FLAGS.slots_per_shelf * num_shelves + vols = [] + for i in xrange(total_slots): + vid = self.volume.create_volume(vol_size, user_id, project_id) + vols.append(vid) + self.assertRaises(volume_service.NoMoreVolumes, + self.volume.create_volume, + vol_size, user_id, project_id) + for id in vols: + self.volume.delete_volume(id) + + def test_run_attach_detach_volume(self): + # Create one volume and one compute to test with + instance_id = "storage-test" + vol_size = "5" + user_id = "fake" + project_id = 'fake' + mountpoint = "/dev/sdf" + volume_id = self.volume.create_volume(vol_size, user_id, project_id) + + volume_obj = volume_service.get_volume(volume_id) + volume_obj.start_attach(instance_id, mountpoint) + rv = yield self.compute.attach_volume(volume_id, + instance_id, + mountpoint) + self.assertEqual(volume_obj['status'], "in-use") + self.assertEqual(volume_obj['attachStatus'], "attached") + self.assertEqual(volume_obj['instance_id'], instance_id) + self.assertEqual(volume_obj['mountpoint'], mountpoint) + + self.assertRaises(exception.Error, + self.volume.delete_volume, + volume_id) + + rv = yield self.volume.detach_volume(volume_id) + volume_obj = volume_service.get_volume(volume_id) + self.assertEqual(volume_obj['status'], "available") + + rv = self.volume.delete_volume(volume_id) + self.assertRaises(exception.Error, + volume_service.get_volume, + volume_id) + + def test_multi_node(self): + # TODO(termie): Figure out how to test with two nodes, + # each of them having a different FLAG for storage_node + # This will allow us to test cross-node interactions + pass |
