From a3ca587654095ffd4b97103302fb0744e505e332 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Mon, 19 Jul 2010 13:19:26 -0500 Subject: Massive refactor of users.py Split users.py into manager.py and ldpadriver.py Added tons of docstrings Cleaned up public methods Simplified manager singleton handling --- nova/tests/access_unittest.py | 6 +- nova/tests/api_unittest.py | 4 +- nova/tests/auth_unittest.py | 207 +++++++++++++++++++++++++++++++++++++ nova/tests/cloud_unittest.py | 12 +-- nova/tests/network_unittest.py | 4 +- nova/tests/objectstore_unittest.py | 6 +- nova/tests/users_unittest.py | 207 ------------------------------------- 7 files changed, 223 insertions(+), 223 deletions(-) create mode 100644 nova/tests/auth_unittest.py delete mode 100644 nova/tests/users_unittest.py (limited to 'nova/tests') diff --git a/nova/tests/access_unittest.py b/nova/tests/access_unittest.py index 8500dd0cb..832a4b279 100644 --- a/nova/tests/access_unittest.py +++ b/nova/tests/access_unittest.py @@ -22,7 +22,7 @@ import logging from nova import exception from nova import flags from nova import test -from nova.auth.users import UserManager +from nova.auth import manager from nova.auth import rbac @@ -35,7 +35,7 @@ class AccessTestCase(test.BaseTestCase): super(AccessTestCase, self).setUp() FLAGS.fake_libvirt = True FLAGS.fake_storage = True - um = UserManager.instance() + um = manager.AuthManager() # Make test users try: self.testadmin = um.create_user('testadmin') @@ -79,7 +79,7 @@ class AccessTestCase(test.BaseTestCase): #user is set in each test def tearDown(self): - um = UserManager.instance() + um = manager.AuthManager() # Delete the test project um.delete_project('testproj') # Delete the test user diff --git a/nova/tests/api_unittest.py b/nova/tests/api_unittest.py index e5e2afe26..5c26192bd 100644 --- a/nova/tests/api_unittest.py +++ b/nova/tests/api_unittest.py @@ -26,7 +26,7 @@ from twisted.internet import defer from nova import flags from nova import test -from nova.auth import users +from nova.auth import manager from nova.endpoint import api from nova.endpoint import cloud @@ -150,7 +150,7 @@ class ApiEc2TestCase(test.BaseTestCase): def setUp(self): super(ApiEc2TestCase, self).setUp() - self.users = users.UserManager.instance() + self.users = manager.AuthManager() self.cloud = cloud.CloudController() self.host = '127.0.0.1' diff --git a/nova/tests/auth_unittest.py b/nova/tests/auth_unittest.py new file mode 100644 index 000000000..000f6bf17 --- /dev/null +++ b/nova/tests/auth_unittest.py @@ -0,0 +1,207 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +from M2Crypto import BIO +from M2Crypto import RSA +from M2Crypto import X509 +import unittest + +from nova import crypto +from nova import flags +from nova import test +from nova.auth import manager +from nova.endpoint import cloud + +FLAGS = flags.FLAGS + + +class AuthTestCase(test.BaseTestCase): + flush_db = False + def setUp(self): + super(AuthTestCase, self).setUp() + self.flags(fake_libvirt=True, + fake_storage=True) + self.users = manager.AuthManager() + + def test_001_can_create_users(self): + self.users.create_user('test1', 'access', 'secret') + self.users.create_user('test2') + + def test_002_can_get_user(self): + user = self.users.get_user('test1') + + def test_003_can_retreive_properties(self): + user = self.users.get_user('test1') + self.assertEqual('test1', user.id) + self.assertEqual('access', user.access) + self.assertEqual('secret', user.secret) + + def test_004_signature_is_valid(self): + #self.assertTrue(self.users.authenticate( **boto.generate_url ... ? ? ? )) + pass + #raise NotImplementedError + + def test_005_can_get_credentials(self): + return + credentials = self.users.get_user('test1').get_credentials() + self.assertEqual(credentials, + 'export EC2_ACCESS_KEY="access"\n' + + 'export EC2_SECRET_KEY="secret"\n' + + 'export EC2_URL="http://127.0.0.1:8773/services/Cloud"\n' + + 'export S3_URL="http://127.0.0.1:3333/"\n' + + 'export EC2_USER_ID="test1"\n') + + def test_006_test_key_storage(self): + user = self.users.get_user('test1') + user.create_key_pair('public', 'key', 'fingerprint') + key = user.get_key_pair('public') + self.assertEqual('key', key.public_key) + self.assertEqual('fingerprint', key.fingerprint) + + def test_007_test_key_generation(self): + user = self.users.get_user('test1') + private_key, fingerprint = user.generate_key_pair('public2') + key = RSA.load_key_string(private_key, callback=lambda: None) + bio = BIO.MemoryBuffer() + public_key = user.get_key_pair('public2').public_key + key.save_pub_key_bio(bio) + converted = crypto.ssl_pub_to_ssh_pub(bio.read()) + # assert key fields are equal + self.assertEqual(public_key.split(" ")[1].strip(), + converted.split(" ")[1].strip()) + + def test_008_can_list_key_pairs(self): + keys = self.users.get_user('test1').get_key_pairs() + self.assertTrue(filter(lambda k: k.name == 'public', keys)) + self.assertTrue(filter(lambda k: k.name == 'public2', keys)) + + def test_009_can_delete_key_pair(self): + self.users.get_user('test1').delete_key_pair('public') + keys = self.users.get_user('test1').get_key_pairs() + self.assertFalse(filter(lambda k: k.name == 'public', keys)) + + def test_010_can_list_users(self): + users = self.users.get_users() + logging.warn(users) + self.assertTrue(filter(lambda u: u.id == 'test1', users)) + + def test_101_can_add_user_role(self): + self.assertFalse(self.users.has_role('test1', 'itsec')) + self.users.add_role('test1', 'itsec') + self.assertTrue(self.users.has_role('test1', 'itsec')) + + def test_199_can_remove_user_role(self): + self.assertTrue(self.users.has_role('test1', 'itsec')) + self.users.remove_role('test1', 'itsec') + self.assertFalse(self.users.has_role('test1', 'itsec')) + + def test_201_can_create_project(self): + project = self.users.create_project('testproj', 'test1', 'A test project', ['test1']) + self.assertTrue(filter(lambda p: p.name == 'testproj', self.users.get_projects())) + self.assertEqual(project.name, 'testproj') + self.assertEqual(project.description, 'A test project') + self.assertEqual(project.project_manager_id, 'test1') + self.assertTrue(project.has_member('test1')) + + def test_202_user1_is_project_member(self): + self.assertTrue(self.users.get_user('test1').is_project_member('testproj')) + + def test_203_user2_is_not_project_member(self): + self.assertFalse(self.users.get_user('test2').is_project_member('testproj')) + + def test_204_user1_is_project_manager(self): + self.assertTrue(self.users.get_user('test1').is_project_manager('testproj')) + + def test_205_user2_is_not_project_manager(self): + self.assertFalse(self.users.get_user('test2').is_project_manager('testproj')) + + def test_206_can_add_user_to_project(self): + self.users.add_to_project('test2', 'testproj') + self.assertTrue(self.users.get_project('testproj').has_member('test2')) + + def test_208_can_remove_user_from_project(self): + self.users.remove_from_project('test2', 'testproj') + self.assertFalse(self.users.get_project('testproj').has_member('test2')) + + def test_209_can_generate_x509(self): + # MUST HAVE RUN CLOUD SETUP BY NOW + self.cloud = cloud.CloudController() + self.cloud.setup() + private_key, signed_cert_string = self.users.get_project('testproj').generate_x509_cert('test1') + logging.debug(signed_cert_string) + + # Need to verify that it's signed by the right intermediate CA + full_chain = crypto.fetch_ca(project_id='testproj', chain=True) + int_cert = crypto.fetch_ca(project_id='testproj', chain=False) + cloud_cert = crypto.fetch_ca() + logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain) + signed_cert = X509.load_cert_string(signed_cert_string) + chain_cert = X509.load_cert_string(full_chain) + int_cert = X509.load_cert_string(int_cert) + cloud_cert = X509.load_cert_string(cloud_cert) + self.assertTrue(signed_cert.verify(chain_cert.get_pubkey())) + self.assertTrue(signed_cert.verify(int_cert.get_pubkey())) + + if not FLAGS.use_intermediate_ca: + self.assertTrue(signed_cert.verify(cloud_cert.get_pubkey())) + else: + self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey())) + + def test_210_can_add_project_role(self): + project = self.users.get_project('testproj') + self.assertFalse(project.has_role('test1', 'sysadmin')) + self.users.add_role('test1', 'sysadmin') + self.assertFalse(project.has_role('test1', 'sysadmin')) + project.add_role('test1', 'sysadmin') + self.assertTrue(project.has_role('test1', 'sysadmin')) + + def test_211_can_remove_project_role(self): + project = self.users.get_project('testproj') + self.assertTrue(project.has_role('test1', 'sysadmin')) + project.remove_role('test1', 'sysadmin') + self.assertFalse(project.has_role('test1', 'sysadmin')) + self.users.remove_role('test1', 'sysadmin') + self.assertFalse(project.has_role('test1', 'sysadmin')) + + def test_212_vpn_ip_and_port_looks_valid(self): + project = self.users.get_project('testproj') + self.assert_(project.vpn_ip) + self.assert_(project.vpn_port >= FLAGS.vpn_start_port) + self.assert_(project.vpn_port <= FLAGS.vpn_end_port) + + def test_213_too_many_vpns(self): + for i in xrange(users.Vpn.num_ports_for_ip(FLAGS.vpn_ip)): + users.Vpn.create("vpnuser%s" % i) + self.assertRaises(users.NoMorePorts, users.Vpn.create, "boom") + + def test_299_can_delete_project(self): + self.users.delete_project('testproj') + self.assertFalse(filter(lambda p: p.name == 'testproj', self.users.get_projects())) + + def test_999_can_delete_users(self): + self.users.delete_user('test1') + users = self.users.get_users() + self.assertFalse(filter(lambda u: u.id == 'test1', users)) + self.users.delete_user('test2') + self.assertEqual(self.users.get_user('test2'), None) + + +if __name__ == "__main__": + # TODO: Implement use_fake as an option + unittest.main() diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py index b8614fdc8..3abef28a1 100644 --- a/nova/tests/cloud_unittest.py +++ b/nova/tests/cloud_unittest.py @@ -27,7 +27,7 @@ from xml.etree import ElementTree from nova import flags from nova import rpc from nova import test -from nova.auth import users +from nova.auth import manager from nova.compute import node from nova.endpoint import api from nova.endpoint import cloud @@ -61,15 +61,15 @@ class CloudTestCase(test.BaseTestCase): self.injected.append(self.node_consumer.attach_to_tornado(self.ioloop)) try: - users.UserManager.instance().create_user('admin', 'admin', 'admin') + manager.AuthManager().create_user('admin', 'admin', 'admin') except: pass - admin = users.UserManager.instance().get_user('admin') - project = users.UserManager.instance().create_project('proj', 'admin', 'proj') + admin = manager.AuthManager().get_user('admin') + project = manager.AuthManager().create_project('proj', 'admin', 'proj') self.context = api.APIRequestContext(handler=None,project=project,user=admin) def tearDown(self): - users.UserManager.instance().delete_project('proj') - users.UserManager.instance().delete_user('admin') + manager.AuthManager().delete_project('proj') + manager.AuthManager().delete_user('admin') def test_console_output(self): if FLAGS.fake_libvirt: diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py index a822cc1d9..fd0e64724 100644 --- a/nova/tests/network_unittest.py +++ b/nova/tests/network_unittest.py @@ -26,7 +26,7 @@ from nova import test from nova import exception from nova.compute.exception import NoMoreAddresses from nova.compute import network -from nova.auth import users +from nova.auth import manager from nova import utils @@ -38,7 +38,7 @@ class NetworkTestCase(test.TrialTestCase): fake_network=True, network_size=32) logging.getLogger().setLevel(logging.DEBUG) - self.manager = users.UserManager.instance() + self.manager = manager.AuthManager() self.dnsmasq = FakeDNSMasq() try: self.manager.create_user('netuser', 'netuser', 'netuser') diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py index f47ca7f00..85bcd7c67 100644 --- a/nova/tests/objectstore_unittest.py +++ b/nova/tests/objectstore_unittest.py @@ -26,7 +26,7 @@ import tempfile from nova import flags from nova import objectstore from nova import test -from nova.auth import users +from nova.auth import manager FLAGS = flags.FLAGS @@ -57,7 +57,7 @@ class ObjectStoreTestCase(test.BaseTestCase): ca_path=os.path.join(os.path.dirname(__file__), 'CA')) logging.getLogger().setLevel(logging.DEBUG) - self.um = users.UserManager.instance() + self.um = manager.AuthManager() try: self.um.create_user('user1') except: pass @@ -177,7 +177,7 @@ class ObjectStoreTestCase(test.BaseTestCase): # FLAGS.images_path = os.path.join(tempdir, 'images') # FLAGS.ca_path = os.path.join(os.path.dirname(__file__), 'CA') # -# self.users = users.UserManager.instance() +# self.users = manager.AuthManager() # self.app = handler.Application(self.users) # # self.host = '127.0.0.1' diff --git a/nova/tests/users_unittest.py b/nova/tests/users_unittest.py deleted file mode 100644 index 301721075..000000000 --- a/nova/tests/users_unittest.py +++ /dev/null @@ -1,207 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -from M2Crypto import BIO -from M2Crypto import RSA -from M2Crypto import X509 -import unittest - -from nova import crypto -from nova import flags -from nova import test -from nova.auth import users -from nova.endpoint import cloud - -FLAGS = flags.FLAGS - - -class UserTestCase(test.BaseTestCase): - flush_db = False - def setUp(self): - super(UserTestCase, self).setUp() - self.flags(fake_libvirt=True, - fake_storage=True) - self.users = users.UserManager.instance() - - def test_001_can_create_users(self): - self.users.create_user('test1', 'access', 'secret') - self.users.create_user('test2') - - def test_002_can_get_user(self): - user = self.users.get_user('test1') - - def test_003_can_retreive_properties(self): - user = self.users.get_user('test1') - self.assertEqual('test1', user.id) - self.assertEqual('access', user.access) - self.assertEqual('secret', user.secret) - - def test_004_signature_is_valid(self): - #self.assertTrue(self.users.authenticate( **boto.generate_url ... ? ? ? )) - pass - #raise NotImplementedError - - def test_005_can_get_credentials(self): - return - credentials = self.users.get_user('test1').get_credentials() - self.assertEqual(credentials, - 'export EC2_ACCESS_KEY="access"\n' + - 'export EC2_SECRET_KEY="secret"\n' + - 'export EC2_URL="http://127.0.0.1:8773/services/Cloud"\n' + - 'export S3_URL="http://127.0.0.1:3333/"\n' + - 'export EC2_USER_ID="test1"\n') - - def test_006_test_key_storage(self): - user = self.users.get_user('test1') - user.create_key_pair('public', 'key', 'fingerprint') - key = user.get_key_pair('public') - self.assertEqual('key', key.public_key) - self.assertEqual('fingerprint', key.fingerprint) - - def test_007_test_key_generation(self): - user = self.users.get_user('test1') - private_key, fingerprint = user.generate_key_pair('public2') - key = RSA.load_key_string(private_key, callback=lambda: None) - bio = BIO.MemoryBuffer() - public_key = user.get_key_pair('public2').public_key - key.save_pub_key_bio(bio) - converted = crypto.ssl_pub_to_ssh_pub(bio.read()) - # assert key fields are equal - self.assertEqual(public_key.split(" ")[1].strip(), - converted.split(" ")[1].strip()) - - def test_008_can_list_key_pairs(self): - keys = self.users.get_user('test1').get_key_pairs() - self.assertTrue(filter(lambda k: k.name == 'public', keys)) - self.assertTrue(filter(lambda k: k.name == 'public2', keys)) - - def test_009_can_delete_key_pair(self): - self.users.get_user('test1').delete_key_pair('public') - keys = self.users.get_user('test1').get_key_pairs() - self.assertFalse(filter(lambda k: k.name == 'public', keys)) - - def test_010_can_list_users(self): - users = self.users.get_users() - logging.warn(users) - self.assertTrue(filter(lambda u: u.id == 'test1', users)) - - def test_101_can_add_user_role(self): - self.assertFalse(self.users.has_role('test1', 'itsec')) - self.users.add_role('test1', 'itsec') - self.assertTrue(self.users.has_role('test1', 'itsec')) - - def test_199_can_remove_user_role(self): - self.assertTrue(self.users.has_role('test1', 'itsec')) - self.users.remove_role('test1', 'itsec') - self.assertFalse(self.users.has_role('test1', 'itsec')) - - def test_201_can_create_project(self): - project = self.users.create_project('testproj', 'test1', 'A test project', ['test1']) - self.assertTrue(filter(lambda p: p.name == 'testproj', self.users.get_projects())) - self.assertEqual(project.name, 'testproj') - self.assertEqual(project.description, 'A test project') - self.assertEqual(project.project_manager_id, 'test1') - self.assertTrue(project.has_member('test1')) - - def test_202_user1_is_project_member(self): - self.assertTrue(self.users.get_user('test1').is_project_member('testproj')) - - def test_203_user2_is_not_project_member(self): - self.assertFalse(self.users.get_user('test2').is_project_member('testproj')) - - def test_204_user1_is_project_manager(self): - self.assertTrue(self.users.get_user('test1').is_project_manager('testproj')) - - def test_205_user2_is_not_project_manager(self): - self.assertFalse(self.users.get_user('test2').is_project_manager('testproj')) - - def test_206_can_add_user_to_project(self): - self.users.add_to_project('test2', 'testproj') - self.assertTrue(self.users.get_project('testproj').has_member('test2')) - - def test_208_can_remove_user_from_project(self): - self.users.remove_from_project('test2', 'testproj') - self.assertFalse(self.users.get_project('testproj').has_member('test2')) - - def test_209_can_generate_x509(self): - # MUST HAVE RUN CLOUD SETUP BY NOW - self.cloud = cloud.CloudController() - self.cloud.setup() - private_key, signed_cert_string = self.users.get_project('testproj').generate_x509_cert('test1') - logging.debug(signed_cert_string) - - # Need to verify that it's signed by the right intermediate CA - full_chain = crypto.fetch_ca(project_id='testproj', chain=True) - int_cert = crypto.fetch_ca(project_id='testproj', chain=False) - cloud_cert = crypto.fetch_ca() - logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain) - signed_cert = X509.load_cert_string(signed_cert_string) - chain_cert = X509.load_cert_string(full_chain) - int_cert = X509.load_cert_string(int_cert) - cloud_cert = X509.load_cert_string(cloud_cert) - self.assertTrue(signed_cert.verify(chain_cert.get_pubkey())) - self.assertTrue(signed_cert.verify(int_cert.get_pubkey())) - - if not FLAGS.use_intermediate_ca: - self.assertTrue(signed_cert.verify(cloud_cert.get_pubkey())) - else: - self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey())) - - def test_210_can_add_project_role(self): - project = self.users.get_project('testproj') - self.assertFalse(project.has_role('test1', 'sysadmin')) - self.users.add_role('test1', 'sysadmin') - self.assertFalse(project.has_role('test1', 'sysadmin')) - project.add_role('test1', 'sysadmin') - self.assertTrue(project.has_role('test1', 'sysadmin')) - - def test_211_can_remove_project_role(self): - project = self.users.get_project('testproj') - self.assertTrue(project.has_role('test1', 'sysadmin')) - project.remove_role('test1', 'sysadmin') - self.assertFalse(project.has_role('test1', 'sysadmin')) - self.users.remove_role('test1', 'sysadmin') - self.assertFalse(project.has_role('test1', 'sysadmin')) - - def test_212_vpn_ip_and_port_looks_valid(self): - project = self.users.get_project('testproj') - self.assert_(project.vpn_ip) - self.assert_(project.vpn_port >= FLAGS.vpn_start_port) - self.assert_(project.vpn_port <= FLAGS.vpn_end_port) - - def test_213_too_many_vpns(self): - for i in xrange(users.Vpn.num_ports_for_ip(FLAGS.vpn_ip)): - users.Vpn.create("vpnuser%s" % i) - self.assertRaises(users.NoMorePorts, users.Vpn.create, "boom") - - def test_299_can_delete_project(self): - self.users.delete_project('testproj') - self.assertFalse(filter(lambda p: p.name == 'testproj', self.users.get_projects())) - - def test_999_can_delete_users(self): - self.users.delete_user('test1') - users = self.users.get_users() - self.assertFalse(filter(lambda u: u.id == 'test1', users)) - self.users.delete_user('test2') - self.assertEqual(self.users.get_user('test2'), None) - - -if __name__ == "__main__": - # TODO: Implement use_fake as an option - unittest.main() -- cgit From 0f9be756f44e831545bf5c31606e0419b61d6ddd Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Tue, 20 Jul 2010 09:15:36 -0500 Subject: Test cleanup, make driver return dictionaries and construct objects in manager --- nova/tests/api_unittest.py | 23 +++++----- nova/tests/auth_unittest.py | 95 ++++++++++++++++++++++-------------------- nova/tests/network_unittest.py | 79 ++++++++++++++++++----------------- 3 files changed, 100 insertions(+), 97 deletions(-) (limited to 'nova/tests') diff --git a/nova/tests/api_unittest.py b/nova/tests/api_unittest.py index 5c26192bd..4477a1fe6 100644 --- a/nova/tests/api_unittest.py +++ b/nova/tests/api_unittest.py @@ -150,7 +150,7 @@ class ApiEc2TestCase(test.BaseTestCase): def setUp(self): super(ApiEc2TestCase, self).setUp() - self.users = manager.AuthManager() + self.manager = manager.AuthManager() self.cloud = cloud.CloudController() self.host = '127.0.0.1' @@ -175,25 +175,22 @@ class ApiEc2TestCase(test.BaseTestCase): def test_describe_instances(self): self.expect_http() self.mox.ReplayAll() - try: - self.users.create_user('fake', 'fake', 'fake') - except Exception, _err: - pass # User may already exist + user = self.manager.create_user('fake', 'fake', 'fake') + project = self.manager.create_project('fake', 'fake', 'fake') self.assertEqual(self.ec2.get_all_instances(), []) - self.users.delete_user('fake') + self.manager.delete_project(project) + self.manager.delete_user(user) def test_get_all_key_pairs(self): self.expect_http() self.mox.ReplayAll() keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd") for x in range(random.randint(4, 8))) - try: - self.users.create_user('fake', 'fake', 'fake') - except Exception, _err: - pass # User may already exist - self.users.generate_key_pair('fake', keyname) + user = self.manager.create_user('fake', 'fake', 'fake') + project = self.manager.create_project('fake', 'fake', 'fake') + self.manager.generate_key_pair(user.id, keyname) rv = self.ec2.get_all_key_pairs() self.assertTrue(filter(lambda k: k.name == keyname, rv)) - self.users.delete_user('fake') - + self.manager.delete_project(project) + self.manager.delete_user(user) diff --git a/nova/tests/auth_unittest.py b/nova/tests/auth_unittest.py index 000f6bf17..0cd377b70 100644 --- a/nova/tests/auth_unittest.py +++ b/nova/tests/auth_unittest.py @@ -37,29 +37,29 @@ class AuthTestCase(test.BaseTestCase): super(AuthTestCase, self).setUp() self.flags(fake_libvirt=True, fake_storage=True) - self.users = manager.AuthManager() + self.manager = manager.AuthManager() def test_001_can_create_users(self): - self.users.create_user('test1', 'access', 'secret') - self.users.create_user('test2') + self.manager.create_user('test1', 'access', 'secret') + self.manager.create_user('test2') def test_002_can_get_user(self): - user = self.users.get_user('test1') + user = self.manager.get_user('test1') def test_003_can_retreive_properties(self): - user = self.users.get_user('test1') + user = self.manager.get_user('test1') self.assertEqual('test1', user.id) self.assertEqual('access', user.access) self.assertEqual('secret', user.secret) def test_004_signature_is_valid(self): - #self.assertTrue(self.users.authenticate( **boto.generate_url ... ? ? ? )) + #self.assertTrue(self.manager.authenticate( **boto.generate_url ... ? ? ? )) pass #raise NotImplementedError def test_005_can_get_credentials(self): return - credentials = self.users.get_user('test1').get_credentials() + credentials = self.manager.get_user('test1').get_credentials() self.assertEqual(credentials, 'export EC2_ACCESS_KEY="access"\n' + 'export EC2_SECRET_KEY="secret"\n' + @@ -68,14 +68,14 @@ class AuthTestCase(test.BaseTestCase): 'export EC2_USER_ID="test1"\n') def test_006_test_key_storage(self): - user = self.users.get_user('test1') + user = self.manager.get_user('test1') user.create_key_pair('public', 'key', 'fingerprint') key = user.get_key_pair('public') self.assertEqual('key', key.public_key) self.assertEqual('fingerprint', key.fingerprint) def test_007_test_key_generation(self): - user = self.users.get_user('test1') + user = self.manager.get_user('test1') private_key, fingerprint = user.generate_key_pair('public2') key = RSA.load_key_string(private_key, callback=lambda: None) bio = BIO.MemoryBuffer() @@ -87,71 +87,71 @@ class AuthTestCase(test.BaseTestCase): converted.split(" ")[1].strip()) def test_008_can_list_key_pairs(self): - keys = self.users.get_user('test1').get_key_pairs() + keys = self.manager.get_user('test1').get_key_pairs() self.assertTrue(filter(lambda k: k.name == 'public', keys)) self.assertTrue(filter(lambda k: k.name == 'public2', keys)) def test_009_can_delete_key_pair(self): - self.users.get_user('test1').delete_key_pair('public') - keys = self.users.get_user('test1').get_key_pairs() + self.manager.get_user('test1').delete_key_pair('public') + keys = self.manager.get_user('test1').get_key_pairs() self.assertFalse(filter(lambda k: k.name == 'public', keys)) def test_010_can_list_users(self): - users = self.users.get_users() + users = self.manager.get_users() logging.warn(users) self.assertTrue(filter(lambda u: u.id == 'test1', users)) def test_101_can_add_user_role(self): - self.assertFalse(self.users.has_role('test1', 'itsec')) - self.users.add_role('test1', 'itsec') - self.assertTrue(self.users.has_role('test1', 'itsec')) + self.assertFalse(self.manager.has_role('test1', 'itsec')) + self.manager.add_role('test1', 'itsec') + self.assertTrue(self.manager.has_role('test1', 'itsec')) def test_199_can_remove_user_role(self): - self.assertTrue(self.users.has_role('test1', 'itsec')) - self.users.remove_role('test1', 'itsec') - self.assertFalse(self.users.has_role('test1', 'itsec')) + self.assertTrue(self.manager.has_role('test1', 'itsec')) + self.manager.remove_role('test1', 'itsec') + self.assertFalse(self.manager.has_role('test1', 'itsec')) def test_201_can_create_project(self): - project = self.users.create_project('testproj', 'test1', 'A test project', ['test1']) - self.assertTrue(filter(lambda p: p.name == 'testproj', self.users.get_projects())) + project = self.manager.create_project('testproj', 'test1', 'A test project', ['test1']) + self.assertTrue(filter(lambda p: p.name == 'testproj', self.manager.get_projects())) self.assertEqual(project.name, 'testproj') self.assertEqual(project.description, 'A test project') self.assertEqual(project.project_manager_id, 'test1') self.assertTrue(project.has_member('test1')) def test_202_user1_is_project_member(self): - self.assertTrue(self.users.get_user('test1').is_project_member('testproj')) + self.assertTrue(self.manager.get_user('test1').is_project_member('testproj')) def test_203_user2_is_not_project_member(self): - self.assertFalse(self.users.get_user('test2').is_project_member('testproj')) + self.assertFalse(self.manager.get_user('test2').is_project_member('testproj')) def test_204_user1_is_project_manager(self): - self.assertTrue(self.users.get_user('test1').is_project_manager('testproj')) + self.assertTrue(self.manager.get_user('test1').is_project_manager('testproj')) def test_205_user2_is_not_project_manager(self): - self.assertFalse(self.users.get_user('test2').is_project_manager('testproj')) + self.assertFalse(self.manager.get_user('test2').is_project_manager('testproj')) def test_206_can_add_user_to_project(self): - self.users.add_to_project('test2', 'testproj') - self.assertTrue(self.users.get_project('testproj').has_member('test2')) + self.manager.add_to_project('test2', 'testproj') + self.assertTrue(self.manager.get_project('testproj').has_member('test2')) def test_208_can_remove_user_from_project(self): - self.users.remove_from_project('test2', 'testproj') - self.assertFalse(self.users.get_project('testproj').has_member('test2')) + self.manager.remove_from_project('test2', 'testproj') + self.assertFalse(self.manager.get_project('testproj').has_member('test2')) def test_209_can_generate_x509(self): # MUST HAVE RUN CLOUD SETUP BY NOW self.cloud = cloud.CloudController() self.cloud.setup() - private_key, signed_cert_string = self.users.get_project('testproj').generate_x509_cert('test1') - logging.debug(signed_cert_string) + _key, cert_str = self.manager._generate_x509_cert('test1', 'testproj') + logging.debug(cert_str) # Need to verify that it's signed by the right intermediate CA full_chain = crypto.fetch_ca(project_id='testproj', chain=True) int_cert = crypto.fetch_ca(project_id='testproj', chain=False) cloud_cert = crypto.fetch_ca() logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain) - signed_cert = X509.load_cert_string(signed_cert_string) + signed_cert = X509.load_cert_string(cert_str) chain_cert = X509.load_cert_string(full_chain) int_cert = X509.load_cert_string(int_cert) cloud_cert = X509.load_cert_string(cloud_cert) @@ -164,42 +164,45 @@ class AuthTestCase(test.BaseTestCase): self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey())) def test_210_can_add_project_role(self): - project = self.users.get_project('testproj') + project = self.manager.get_project('testproj') self.assertFalse(project.has_role('test1', 'sysadmin')) - self.users.add_role('test1', 'sysadmin') + self.manager.add_role('test1', 'sysadmin') self.assertFalse(project.has_role('test1', 'sysadmin')) project.add_role('test1', 'sysadmin') self.assertTrue(project.has_role('test1', 'sysadmin')) def test_211_can_remove_project_role(self): - project = self.users.get_project('testproj') + project = self.manager.get_project('testproj') self.assertTrue(project.has_role('test1', 'sysadmin')) project.remove_role('test1', 'sysadmin') self.assertFalse(project.has_role('test1', 'sysadmin')) - self.users.remove_role('test1', 'sysadmin') + self.manager.remove_role('test1', 'sysadmin') self.assertFalse(project.has_role('test1', 'sysadmin')) def test_212_vpn_ip_and_port_looks_valid(self): - project = self.users.get_project('testproj') + project = self.manager.get_project('testproj') self.assert_(project.vpn_ip) self.assert_(project.vpn_port >= FLAGS.vpn_start_port) self.assert_(project.vpn_port <= FLAGS.vpn_end_port) def test_213_too_many_vpns(self): - for i in xrange(users.Vpn.num_ports_for_ip(FLAGS.vpn_ip)): - users.Vpn.create("vpnuser%s" % i) - self.assertRaises(users.NoMorePorts, users.Vpn.create, "boom") + vpns = [] + for i in xrange(manager.Vpn.num_ports_for_ip(FLAGS.vpn_ip)): + vpns.append(manager.Vpn.create("vpnuser%s" % i)) + self.assertRaises(manager.NoMorePorts, manager.Vpn.create, "boom") + for vpn in vpns: + vpn.destroy() def test_299_can_delete_project(self): - self.users.delete_project('testproj') - self.assertFalse(filter(lambda p: p.name == 'testproj', self.users.get_projects())) + self.manager.delete_project('testproj') + self.assertFalse(filter(lambda p: p.name == 'testproj', self.manager.get_projects())) def test_999_can_delete_users(self): - self.users.delete_user('test1') - users = self.users.get_users() + self.manager.delete_user('test1') + users = self.manager.get_users() self.assertFalse(filter(lambda u: u.id == 'test1', users)) - self.users.delete_user('test2') - self.assertEqual(self.users.get_user('test2'), None) + self.manager.delete_user('test2') + self.assertEqual(self.manager.get_user('test2'), None) if __name__ == "__main__": diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py index 68cd488be..0e1b55065 100644 --- a/nova/tests/network_unittest.py +++ b/nova/tests/network_unittest.py @@ -39,59 +39,61 @@ class NetworkTestCase(test.TrialTestCase): logging.getLogger().setLevel(logging.DEBUG) self.manager = manager.AuthManager() self.dnsmasq = FakeDNSMasq() - try: - self.manager.create_user('netuser', 'netuser', 'netuser') - except: pass + self.user = self.manager.create_user('netuser', 'netuser', 'netuser') + self.projects = [] + self.projects.append(self.manager.create_project('netuser', + 'netuser', + 'netuser')) for i in range(0, 6): name = 'project%s' % i - if not self.manager.get_project(name): - self.manager.create_project(name, 'netuser', name) + self.projects.append(self.manager.create_project(name, + 'netuser', + name)) self.network = network.PublicNetworkController() def tearDown(self): super(NetworkTestCase, self).tearDown() - for i in range(0, 6): - name = 'project%s' % i - self.manager.delete_project(name) - self.manager.delete_user('netuser') + for project in self.projects: + self.manager.delete_project(project) + self.manager.delete_user(self.user) def test_public_network_allocation(self): pubnet = IPy.IP(flags.FLAGS.public_range) - address = self.network.allocate_ip("netuser", "project0", "public") + address = self.network.allocate_ip(self.user.id, self.projects[0].id, "public") self.assertTrue(IPy.IP(address) in pubnet) self.assertTrue(IPy.IP(address) in self.network.network) def test_allocate_deallocate_ip(self): address = network.allocate_ip( - "netuser", "project0", utils.generate_mac()) + self.user.id, self.projects[0].id, utils.generate_mac()) logging.debug("Was allocated %s" % (address)) - net = network.get_project_network("project0", "default") - self.assertEqual(True, is_in_project(address, "project0")) + net = network.get_project_network(self.projects[0].id, "default") + self.assertEqual(True, is_in_project(address, self.projects[0].id)) mac = utils.generate_mac() hostname = "test-host" self.dnsmasq.issue_ip(mac, address, hostname, net.bridge_name) rv = network.deallocate_ip(address) # Doesn't go away until it's dhcp released - self.assertEqual(True, is_in_project(address, "project0")) + self.assertEqual(True, is_in_project(address, self.projects[0].id)) self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name) - self.assertEqual(False, is_in_project(address, "project0")) + self.assertEqual(False, is_in_project(address, self.projects[0].id)) def test_range_allocation(self): mac = utils.generate_mac() secondmac = utils.generate_mac() hostname = "test-host" address = network.allocate_ip( - "netuser", "project0", mac) + self.user.id, self.projects[0].id, mac) secondaddress = network.allocate_ip( - "netuser", "project1", secondmac) - net = network.get_project_network("project0", "default") - secondnet = network.get_project_network("project1", "default") + self.user, "project1", secondmac) + net = network.get_project_network(self.projects[0].id, "default") + secondnet = network.get_project_network(self.projects[1].id, "default") - self.assertEqual(True, is_in_project(address, "project0")) - self.assertEqual(True, is_in_project(secondaddress, "project1")) - self.assertEqual(False, is_in_project(address, "project1")) + self.assertEqual(True, is_in_project(address, self.projects[0].id)) + self.assertEqual(True, is_in_project(secondaddress, self.projects[1].id)) + self.assertEqual(False, is_in_project(address, self.projects[1].id)) # Addresses are allocated before they're issued self.dnsmasq.issue_ip(mac, address, hostname, net.bridge_name) @@ -100,34 +102,34 @@ class NetworkTestCase(test.TrialTestCase): rv = network.deallocate_ip(address) self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name) - self.assertEqual(False, is_in_project(address, "project0")) + self.assertEqual(False, is_in_project(address, self.projects[0].id)) # First address release shouldn't affect the second - self.assertEqual(True, is_in_project(secondaddress, "project1")) + self.assertEqual(True, is_in_project(secondaddress, self.projects[1].id)) rv = network.deallocate_ip(secondaddress) self.dnsmasq.release_ip(secondmac, secondaddress, hostname, secondnet.bridge_name) - self.assertEqual(False, is_in_project(secondaddress, "project1")) + self.assertEqual(False, is_in_project(secondaddress, self.projects[1].id)) def test_subnet_edge(self): - secondaddress = network.allocate_ip("netuser", "project0", + secondaddress = network.allocate_ip(self.user.id, self.projects[0].id, utils.generate_mac()) hostname = "toomany-hosts" - for project in range(1,5): - project_id = "project%s" % (project) + for i in range(1,5): + project_id = self.projects[i].id mac = utils.generate_mac() mac2 = utils.generate_mac() mac3 = utils.generate_mac() address = network.allocate_ip( - "netuser", project_id, mac) + self.user, project_id, mac) address2 = network.allocate_ip( - "netuser", project_id, mac2) + self.user, project_id, mac2) address3 = network.allocate_ip( - "netuser", project_id, mac3) - self.assertEqual(False, is_in_project(address, "project0")) - self.assertEqual(False, is_in_project(address2, "project0")) - self.assertEqual(False, is_in_project(address3, "project0")) + self.user, project_id, mac3) + self.assertEqual(False, is_in_project(address, self.projects[0].id)) + self.assertEqual(False, is_in_project(address2, self.projects[0].id)) + self.assertEqual(False, is_in_project(address3, self.projects[0].id)) rv = network.deallocate_ip(address) rv = network.deallocate_ip(address2) rv = network.deallocate_ip(address3) @@ -135,7 +137,7 @@ class NetworkTestCase(test.TrialTestCase): self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name) self.dnsmasq.release_ip(mac2, address2, hostname, net.bridge_name) self.dnsmasq.release_ip(mac3, address3, hostname, net.bridge_name) - net = network.get_project_network("project0", "default") + net = network.get_project_network(self.projects[0].id, "default") rv = network.deallocate_ip(secondaddress) self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name) @@ -150,22 +152,23 @@ class NetworkTestCase(test.TrialTestCase): Network size is 32, there are 5 addresses reserved for VPN. So we should get 23 usable addresses """ - net = network.get_project_network("project0", "default") + net = network.get_project_network(self.projects[0].id, "default") hostname = "toomany-hosts" macs = {} addresses = {} for i in range(0, 22): macs[i] = utils.generate_mac() - addresses[i] = network.allocate_ip("netuser", "project0", macs[i]) + addresses[i] = network.allocate_ip(self.user.id, self.projects[0].id, macs[i]) self.dnsmasq.issue_ip(macs[i], addresses[i], hostname, net.bridge_name) - self.assertRaises(NoMoreAddresses, network.allocate_ip, "netuser", "project0", utils.generate_mac()) + self.assertRaises(NoMoreAddresses, network.allocate_ip, self.user.id, self.projects[0].id, utils.generate_mac()) for i in range(0, 22): rv = network.deallocate_ip(addresses[i]) self.dnsmasq.release_ip(macs[i], addresses[i], hostname, net.bridge_name) def is_in_project(address, project_id): + print address, list(network.get_project_network(project_id).list_addresses()) return address in network.get_project_network(project_id).list_addresses() def _get_project_addresses(project_id): -- cgit From 79b5ab9a9e18fdee3d65311b6ff16cc39d7d2513 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Tue, 20 Jul 2010 09:22:53 -0500 Subject: network unittest clean up --- nova/tests/network_unittest.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'nova/tests') diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py index 0e1b55065..237750d7f 100644 --- a/nova/tests/network_unittest.py +++ b/nova/tests/network_unittest.py @@ -87,7 +87,7 @@ class NetworkTestCase(test.TrialTestCase): address = network.allocate_ip( self.user.id, self.projects[0].id, mac) secondaddress = network.allocate_ip( - self.user, "project1", secondmac) + self.user, self.projects[1].id, secondmac) net = network.get_project_network(self.projects[0].id, "default") secondnet = network.get_project_network(self.projects[1].id, "default") @@ -168,7 +168,6 @@ class NetworkTestCase(test.TrialTestCase): self.dnsmasq.release_ip(macs[i], addresses[i], hostname, net.bridge_name) def is_in_project(address, project_id): - print address, list(network.get_project_network(project_id).list_addresses()) return address in network.get_project_network(project_id).list_addresses() def _get_project_addresses(project_id): -- cgit From 898102508a1c2a1087b3ffce36b3fb890f5d3775 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Wed, 21 Jul 2010 19:56:08 -0500 Subject: refactoring of imports for fakeldapdriver --- nova/tests/cloud_unittest.py | 3 +-- nova/tests/fake_flags.py | 2 +- nova/tests/model_unittest.py | 3 +-- nova/tests/node_unittest.py | 3 +-- nova/tests/objectstore_unittest.py | 3 +-- nova/tests/real_flags.py | 1 - 6 files changed, 5 insertions(+), 10 deletions(-) (limited to 'nova/tests') diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py index 3abef28a1..741973201 100644 --- a/nova/tests/cloud_unittest.py +++ b/nova/tests/cloud_unittest.py @@ -40,8 +40,7 @@ class CloudTestCase(test.BaseTestCase): def setUp(self): super(CloudTestCase, self).setUp() self.flags(fake_libvirt=True, - fake_storage=True, - fake_users=True) + fake_storage=True) self.conn = rpc.Connection.instance() logging.getLogger().setLevel(logging.DEBUG) diff --git a/nova/tests/fake_flags.py b/nova/tests/fake_flags.py index d32f40d8f..57575b44b 100644 --- a/nova/tests/fake_flags.py +++ b/nova/tests/fake_flags.py @@ -24,5 +24,5 @@ FLAGS.fake_libvirt = True FLAGS.fake_storage = True FLAGS.fake_rabbit = True FLAGS.fake_network = True -FLAGS.fake_users = True +FLAGS.auth_driver = 'nova.auth.fakeldapdriver' FLAGS.verbose = True diff --git a/nova/tests/model_unittest.py b/nova/tests/model_unittest.py index 1bd7e527f..1b94e5798 100644 --- a/nova/tests/model_unittest.py +++ b/nova/tests/model_unittest.py @@ -35,8 +35,7 @@ class ModelTestCase(test.TrialTestCase): def setUp(self): super(ModelTestCase, self).setUp() self.flags(fake_libvirt=True, - fake_storage=True, - fake_users=True) + fake_storage=True) def tearDown(self): model.Instance('i-test').destroy() diff --git a/nova/tests/node_unittest.py b/nova/tests/node_unittest.py index 93942d79e..55c957696 100644 --- a/nova/tests/node_unittest.py +++ b/nova/tests/node_unittest.py @@ -58,8 +58,7 @@ class NodeConnectionTestCase(test.TrialTestCase): logging.getLogger().setLevel(logging.DEBUG) super(NodeConnectionTestCase, self).setUp() self.flags(fake_libvirt=True, - fake_storage=True, - fake_users=True) + fake_storage=True) self.node = node.Node() def create_instance(self): diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py index 85bcd7c67..1703adb62 100644 --- a/nova/tests/objectstore_unittest.py +++ b/nova/tests/objectstore_unittest.py @@ -51,8 +51,7 @@ os.makedirs(os.path.join(oss_tempdir, 'buckets')) class ObjectStoreTestCase(test.BaseTestCase): def setUp(self): super(ObjectStoreTestCase, self).setUp() - self.flags(fake_users=True, - buckets_path=os.path.join(oss_tempdir, 'buckets'), + self.flags(buckets_path=os.path.join(oss_tempdir, 'buckets'), images_path=os.path.join(oss_tempdir, 'images'), ca_path=os.path.join(os.path.dirname(__file__), 'CA')) logging.getLogger().setLevel(logging.DEBUG) diff --git a/nova/tests/real_flags.py b/nova/tests/real_flags.py index 9e106f227..f054a8f19 100644 --- a/nova/tests/real_flags.py +++ b/nova/tests/real_flags.py @@ -24,5 +24,4 @@ FLAGS.fake_libvirt = False FLAGS.fake_storage = False FLAGS.fake_rabbit = False FLAGS.fake_network = False -FLAGS.fake_users = False FLAGS.verbose = False -- cgit From f7962c73aa9835c76857005ab56f512fbc9eebfd Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Sun, 25 Jul 2010 11:20:09 -0700 Subject: More Cleanup of code Moved code in AuthManager init to new so it isn't called multiple times Changed AuthManager flag to specify class name as well as module name Added exception for missing auth_driver Changed import to use "recommended" style for nested imports http://docs.python.org/dev/library/functions.html#__import__ --- nova/tests/fake_flags.py | 2 +- nova/tests/network_unittest.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'nova/tests') diff --git a/nova/tests/fake_flags.py b/nova/tests/fake_flags.py index 57575b44b..304f24841 100644 --- a/nova/tests/fake_flags.py +++ b/nova/tests/fake_flags.py @@ -24,5 +24,5 @@ FLAGS.fake_libvirt = True FLAGS.fake_storage = True FLAGS.fake_rabbit = True FLAGS.fake_network = True -FLAGS.auth_driver = 'nova.auth.fakeldapdriver' +FLAGS.auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver' FLAGS.verbose = True diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py index 12840e736..9e17bf155 100644 --- a/nova/tests/network_unittest.py +++ b/nova/tests/network_unittest.py @@ -37,7 +37,7 @@ class NetworkTestCase(test.TrialTestCase): self.flags(fake_libvirt=True, fake_storage=True, fake_network=True, - auth_driver='nova.auth.fakeldapdriver', + auth_driver='nova.auth.ldapdriver.FakeLdapDriver', network_size=32) logging.getLogger().setLevel(logging.DEBUG) self.manager = manager.AuthManager() -- cgit From a2cf8a6f6038062cf343322acdbde66456b73dfb Mon Sep 17 00:00:00 2001 From: "jaypipes@gmail.com" <> Date: Mon, 26 Jul 2010 23:28:59 -0400 Subject: Fixes bug#610140. Thanks to Vish and Muharem for the patch --- nova/tests/api_unittest.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'nova/tests') diff --git a/nova/tests/api_unittest.py b/nova/tests/api_unittest.py index e5e2afe26..45ae50b2e 100644 --- a/nova/tests/api_unittest.py +++ b/nova/tests/api_unittest.py @@ -43,7 +43,11 @@ def boto_to_tornado(method, path, headers, data, host, connection=None): connection should be a FakeTornadoHttpConnection instance """ - headers = httpserver.HTTPHeaders() + try: + headers = httpserver.HTTPHeaders() + except AttributeError: + from tornado import httputil + headers = httputil.HTTPHeaders() for k, v in headers.iteritems(): headers[k] = v -- cgit