summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
authorEwan Mellor <ewan.mellor@citrix.com>2010-07-29 00:11:02 +0100
committerEwan Mellor <ewan.mellor@citrix.com>2010-07-29 00:11:02 +0100
commit9f4996e8738991a95a23cba2caa660f7002f94cd (patch)
treee40bc2e56ea96131200d5e12918bf5a429bb5cc1 /nova/tests
parentc5edaa2186add12947185cb1fd47e0a48eccafa9 (diff)
parentf61b62983f62aa10d7bed3bc1c406717663be923 (diff)
Merge with trunk, including fixing up conflicts with the removal of fake_users
and the reworking of node.py -> service.py.
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/access_unittest.py6
-rw-r--r--nova/tests/api_unittest.py31
-rw-r--r--nova/tests/auth_unittest.py (renamed from nova/tests/users_unittest.py)101
-rw-r--r--nova/tests/cloud_unittest.py33
-rw-r--r--nova/tests/compute_unittest.py (renamed from nova/tests/node_unittest.py)43
-rw-r--r--nova/tests/fake_flags.py2
-rw-r--r--nova/tests/future_unittest.py75
-rw-r--r--nova/tests/model_unittest.py99
-rw-r--r--nova/tests/network_unittest.py95
-rw-r--r--nova/tests/objectstore_unittest.py184
-rw-r--r--nova/tests/real_flags.py1
-rw-r--r--nova/tests/volume_unittest.py115
12 files changed, 495 insertions, 290 deletions
diff --git a/nova/tests/access_unittest.py b/nova/tests/access_unittest.py
index 6cf7e893d..fa0a090a0 100644
--- a/nova/tests/access_unittest.py
+++ b/nova/tests/access_unittest.py
@@ -22,7 +22,7 @@ import logging
from nova import exception
from nova import flags
from nova import test
-from nova.auth.users import UserManager
+from nova.auth import manager
from nova.auth import rbac
@@ -35,7 +35,7 @@ class AccessTestCase(test.BaseTestCase):
super(AccessTestCase, self).setUp()
FLAGS.connection_type = 'fake'
FLAGS.fake_storage = True
- um = UserManager.instance()
+ um = manager.AuthManager()
# Make test users
try:
self.testadmin = um.create_user('testadmin')
@@ -79,7 +79,7 @@ class AccessTestCase(test.BaseTestCase):
#user is set in each test
def tearDown(self):
- um = UserManager.instance()
+ um = manager.AuthManager()
# Delete the test project
um.delete_project('testproj')
# Delete the test user
diff --git a/nova/tests/api_unittest.py b/nova/tests/api_unittest.py
index e5e2afe26..9d072866c 100644
--- a/nova/tests/api_unittest.py
+++ b/nova/tests/api_unittest.py
@@ -26,7 +26,7 @@ from twisted.internet import defer
from nova import flags
from nova import test
-from nova.auth import users
+from nova.auth import manager
from nova.endpoint import api
from nova.endpoint import cloud
@@ -43,7 +43,11 @@ def boto_to_tornado(method, path, headers, data, host, connection=None):
connection should be a FakeTornadoHttpConnection instance
"""
- headers = httpserver.HTTPHeaders()
+ try:
+ headers = httpserver.HTTPHeaders()
+ except AttributeError:
+ from tornado import httputil
+ headers = httputil.HTTPHeaders()
for k, v in headers.iteritems():
headers[k] = v
@@ -150,7 +154,7 @@ class ApiEc2TestCase(test.BaseTestCase):
def setUp(self):
super(ApiEc2TestCase, self).setUp()
- self.users = users.UserManager.instance()
+ self.manager = manager.AuthManager()
self.cloud = cloud.CloudController()
self.host = '127.0.0.1'
@@ -175,25 +179,22 @@ class ApiEc2TestCase(test.BaseTestCase):
def test_describe_instances(self):
self.expect_http()
self.mox.ReplayAll()
- try:
- self.users.create_user('fake', 'fake', 'fake')
- except Exception, _err:
- pass # User may already exist
+ user = self.manager.create_user('fake', 'fake', 'fake')
+ project = self.manager.create_project('fake', 'fake', 'fake')
self.assertEqual(self.ec2.get_all_instances(), [])
- self.users.delete_user('fake')
+ self.manager.delete_project(project)
+ self.manager.delete_user(user)
def test_get_all_key_pairs(self):
self.expect_http()
self.mox.ReplayAll()
keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd") for x in range(random.randint(4, 8)))
- try:
- self.users.create_user('fake', 'fake', 'fake')
- except Exception, _err:
- pass # User may already exist
- self.users.generate_key_pair('fake', keyname)
+ user = self.manager.create_user('fake', 'fake', 'fake')
+ project = self.manager.create_project('fake', 'fake', 'fake')
+ self.manager.generate_key_pair(user.id, keyname)
rv = self.ec2.get_all_key_pairs()
self.assertTrue(filter(lambda k: k.name == keyname, rv))
- self.users.delete_user('fake')
-
+ self.manager.delete_project(project)
+ self.manager.delete_user(user)
diff --git a/nova/tests/users_unittest.py b/nova/tests/auth_unittest.py
index 824d5cff6..073ff71d2 100644
--- a/nova/tests/users_unittest.py
+++ b/nova/tests/auth_unittest.py
@@ -25,41 +25,41 @@ import unittest
from nova import crypto
from nova import flags
from nova import test
-from nova.auth import users
+from nova.auth import manager
from nova.endpoint import cloud
FLAGS = flags.FLAGS
-class UserTestCase(test.BaseTestCase):
+class AuthTestCase(test.BaseTestCase):
flush_db = False
def setUp(self):
- super(UserTestCase, self).setUp()
+ super(AuthTestCase, self).setUp()
self.flags(connection_type='fake',
fake_storage=True)
- self.users = users.UserManager.instance()
+ self.manager = manager.AuthManager()
def test_001_can_create_users(self):
- self.users.create_user('test1', 'access', 'secret')
- self.users.create_user('test2')
+ self.manager.create_user('test1', 'access', 'secret')
+ self.manager.create_user('test2')
def test_002_can_get_user(self):
- user = self.users.get_user('test1')
+ user = self.manager.get_user('test1')
def test_003_can_retreive_properties(self):
- user = self.users.get_user('test1')
+ user = self.manager.get_user('test1')
self.assertEqual('test1', user.id)
self.assertEqual('access', user.access)
self.assertEqual('secret', user.secret)
def test_004_signature_is_valid(self):
- #self.assertTrue(self.users.authenticate( **boto.generate_url ... ? ? ? ))
+ #self.assertTrue(self.manager.authenticate( **boto.generate_url ... ? ? ? ))
pass
#raise NotImplementedError
def test_005_can_get_credentials(self):
return
- credentials = self.users.get_user('test1').get_credentials()
+ credentials = self.manager.get_user('test1').get_credentials()
self.assertEqual(credentials,
'export EC2_ACCESS_KEY="access"\n' +
'export EC2_SECRET_KEY="secret"\n' +
@@ -68,14 +68,14 @@ class UserTestCase(test.BaseTestCase):
'export EC2_USER_ID="test1"\n')
def test_006_test_key_storage(self):
- user = self.users.get_user('test1')
+ user = self.manager.get_user('test1')
user.create_key_pair('public', 'key', 'fingerprint')
key = user.get_key_pair('public')
self.assertEqual('key', key.public_key)
self.assertEqual('fingerprint', key.fingerprint)
def test_007_test_key_generation(self):
- user = self.users.get_user('test1')
+ user = self.manager.get_user('test1')
private_key, fingerprint = user.generate_key_pair('public2')
key = RSA.load_key_string(private_key, callback=lambda: None)
bio = BIO.MemoryBuffer()
@@ -87,71 +87,71 @@ class UserTestCase(test.BaseTestCase):
converted.split(" ")[1].strip())
def test_008_can_list_key_pairs(self):
- keys = self.users.get_user('test1').get_key_pairs()
+ keys = self.manager.get_user('test1').get_key_pairs()
self.assertTrue(filter(lambda k: k.name == 'public', keys))
self.assertTrue(filter(lambda k: k.name == 'public2', keys))
def test_009_can_delete_key_pair(self):
- self.users.get_user('test1').delete_key_pair('public')
- keys = self.users.get_user('test1').get_key_pairs()
+ self.manager.get_user('test1').delete_key_pair('public')
+ keys = self.manager.get_user('test1').get_key_pairs()
self.assertFalse(filter(lambda k: k.name == 'public', keys))
def test_010_can_list_users(self):
- users = self.users.get_users()
+ users = self.manager.get_users()
logging.warn(users)
self.assertTrue(filter(lambda u: u.id == 'test1', users))
def test_101_can_add_user_role(self):
- self.assertFalse(self.users.has_role('test1', 'itsec'))
- self.users.add_role('test1', 'itsec')
- self.assertTrue(self.users.has_role('test1', 'itsec'))
+ self.assertFalse(self.manager.has_role('test1', 'itsec'))
+ self.manager.add_role('test1', 'itsec')
+ self.assertTrue(self.manager.has_role('test1', 'itsec'))
def test_199_can_remove_user_role(self):
- self.assertTrue(self.users.has_role('test1', 'itsec'))
- self.users.remove_role('test1', 'itsec')
- self.assertFalse(self.users.has_role('test1', 'itsec'))
+ self.assertTrue(self.manager.has_role('test1', 'itsec'))
+ self.manager.remove_role('test1', 'itsec')
+ self.assertFalse(self.manager.has_role('test1', 'itsec'))
def test_201_can_create_project(self):
- project = self.users.create_project('testproj', 'test1', 'A test project', ['test1'])
- self.assertTrue(filter(lambda p: p.name == 'testproj', self.users.get_projects()))
+ project = self.manager.create_project('testproj', 'test1', 'A test project', ['test1'])
+ self.assertTrue(filter(lambda p: p.name == 'testproj', self.manager.get_projects()))
self.assertEqual(project.name, 'testproj')
self.assertEqual(project.description, 'A test project')
self.assertEqual(project.project_manager_id, 'test1')
self.assertTrue(project.has_member('test1'))
def test_202_user1_is_project_member(self):
- self.assertTrue(self.users.get_user('test1').is_project_member('testproj'))
+ self.assertTrue(self.manager.get_user('test1').is_project_member('testproj'))
def test_203_user2_is_not_project_member(self):
- self.assertFalse(self.users.get_user('test2').is_project_member('testproj'))
+ self.assertFalse(self.manager.get_user('test2').is_project_member('testproj'))
def test_204_user1_is_project_manager(self):
- self.assertTrue(self.users.get_user('test1').is_project_manager('testproj'))
+ self.assertTrue(self.manager.get_user('test1').is_project_manager('testproj'))
def test_205_user2_is_not_project_manager(self):
- self.assertFalse(self.users.get_user('test2').is_project_manager('testproj'))
+ self.assertFalse(self.manager.get_user('test2').is_project_manager('testproj'))
def test_206_can_add_user_to_project(self):
- self.users.add_to_project('test2', 'testproj')
- self.assertTrue(self.users.get_project('testproj').has_member('test2'))
+ self.manager.add_to_project('test2', 'testproj')
+ self.assertTrue(self.manager.get_project('testproj').has_member('test2'))
def test_208_can_remove_user_from_project(self):
- self.users.remove_from_project('test2', 'testproj')
- self.assertFalse(self.users.get_project('testproj').has_member('test2'))
+ self.manager.remove_from_project('test2', 'testproj')
+ self.assertFalse(self.manager.get_project('testproj').has_member('test2'))
def test_209_can_generate_x509(self):
# MUST HAVE RUN CLOUD SETUP BY NOW
self.cloud = cloud.CloudController()
self.cloud.setup()
- private_key, signed_cert_string = self.users.get_project('testproj').generate_x509_cert('test1')
- logging.debug(signed_cert_string)
+ _key, cert_str = self.manager._generate_x509_cert('test1', 'testproj')
+ logging.debug(cert_str)
# Need to verify that it's signed by the right intermediate CA
full_chain = crypto.fetch_ca(project_id='testproj', chain=True)
int_cert = crypto.fetch_ca(project_id='testproj', chain=False)
cloud_cert = crypto.fetch_ca()
logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain)
- signed_cert = X509.load_cert_string(signed_cert_string)
+ signed_cert = X509.load_cert_string(cert_str)
chain_cert = X509.load_cert_string(full_chain)
int_cert = X509.load_cert_string(int_cert)
cloud_cert = X509.load_cert_string(cloud_cert)
@@ -164,42 +164,45 @@ class UserTestCase(test.BaseTestCase):
self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey()))
def test_210_can_add_project_role(self):
- project = self.users.get_project('testproj')
+ project = self.manager.get_project('testproj')
self.assertFalse(project.has_role('test1', 'sysadmin'))
- self.users.add_role('test1', 'sysadmin')
+ self.manager.add_role('test1', 'sysadmin')
self.assertFalse(project.has_role('test1', 'sysadmin'))
project.add_role('test1', 'sysadmin')
self.assertTrue(project.has_role('test1', 'sysadmin'))
def test_211_can_remove_project_role(self):
- project = self.users.get_project('testproj')
+ project = self.manager.get_project('testproj')
self.assertTrue(project.has_role('test1', 'sysadmin'))
project.remove_role('test1', 'sysadmin')
self.assertFalse(project.has_role('test1', 'sysadmin'))
- self.users.remove_role('test1', 'sysadmin')
+ self.manager.remove_role('test1', 'sysadmin')
self.assertFalse(project.has_role('test1', 'sysadmin'))
def test_212_vpn_ip_and_port_looks_valid(self):
- project = self.users.get_project('testproj')
+ project = self.manager.get_project('testproj')
self.assert_(project.vpn_ip)
self.assert_(project.vpn_port >= FLAGS.vpn_start_port)
self.assert_(project.vpn_port <= FLAGS.vpn_end_port)
def test_213_too_many_vpns(self):
- for i in xrange(users.Vpn.num_ports_for_ip(FLAGS.vpn_ip)):
- users.Vpn.create("vpnuser%s" % i)
- self.assertRaises(users.NoMorePorts, users.Vpn.create, "boom")
+ vpns = []
+ for i in xrange(manager.Vpn.num_ports_for_ip(FLAGS.vpn_ip)):
+ vpns.append(manager.Vpn.create("vpnuser%s" % i))
+ self.assertRaises(manager.NoMorePorts, manager.Vpn.create, "boom")
+ for vpn in vpns:
+ vpn.destroy()
def test_299_can_delete_project(self):
- self.users.delete_project('testproj')
- self.assertFalse(filter(lambda p: p.name == 'testproj', self.users.get_projects()))
+ self.manager.delete_project('testproj')
+ self.assertFalse(filter(lambda p: p.name == 'testproj', self.manager.get_projects()))
def test_999_can_delete_users(self):
- self.users.delete_user('test1')
- users = self.users.get_users()
+ self.manager.delete_user('test1')
+ users = self.manager.get_users()
self.assertFalse(filter(lambda u: u.id == 'test1', users))
- self.users.delete_user('test2')
- self.assertEqual(self.users.get_user('test2'), None)
+ self.manager.delete_user('test2')
+ self.assertEqual(self.manager.get_user('test2'), None)
if __name__ == "__main__":
diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py
index 8040f6331..40837405c 100644
--- a/nova/tests/cloud_unittest.py
+++ b/nova/tests/cloud_unittest.py
@@ -27,8 +27,8 @@ from xml.etree import ElementTree
from nova import flags
from nova import rpc
from nova import test
-from nova.auth import users
-from nova.compute import node
+from nova.auth import manager
+from nova.compute import service
from nova.endpoint import api
from nova.endpoint import cloud
@@ -40,8 +40,7 @@ class CloudTestCase(test.BaseTestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
self.flags(connection_type='fake',
- fake_storage=True,
- fake_users=True)
+ fake_storage=True)
self.conn = rpc.Connection.instance()
logging.getLogger().setLevel(logging.DEBUG)
@@ -53,34 +52,34 @@ class CloudTestCase(test.BaseTestCase):
proxy=self.cloud)
self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop))
- # set up a node
- self.node = node.Node()
- self.node_consumer = rpc.AdapterConsumer(connection=self.conn,
+ # set up a service
+ self.compute = service.ComputeService()
+ self.compute_consumer = rpc.AdapterConsumer(connection=self.conn,
topic=FLAGS.compute_topic,
- proxy=self.node)
- self.injected.append(self.node_consumer.attach_to_tornado(self.ioloop))
+ proxy=self.compute)
+ self.injected.append(self.compute_consumer.attach_to_tornado(self.ioloop))
try:
- users.UserManager.instance().create_user('admin', 'admin', 'admin')
+ manager.AuthManager().create_user('admin', 'admin', 'admin')
except: pass
- admin = users.UserManager.instance().get_user('admin')
- project = users.UserManager.instance().create_project('proj', 'admin', 'proj')
+ admin = manager.AuthManager().get_user('admin')
+ project = manager.AuthManager().create_project('proj', 'admin', 'proj')
self.context = api.APIRequestContext(handler=None,project=project,user=admin)
def tearDown(self):
- users.UserManager.instance().delete_project('proj')
- users.UserManager.instance().delete_user('admin')
+ manager.AuthManager().delete_project('proj')
+ manager.AuthManager().delete_user('admin')
def test_console_output(self):
if FLAGS.connection_type == 'fake':
logging.debug("Can't test instances without a real virtual env.")
return
instance_id = 'foo'
- inst = yield self.node.run_instance(instance_id)
+ inst = yield self.compute.run_instance(instance_id)
output = yield self.cloud.get_console_output(self.context, [instance_id])
logging.debug(output)
self.assert_(output)
- rv = yield self.node.terminate_instance(instance_id)
+ rv = yield self.compute.terminate_instance(instance_id)
def test_run_instances(self):
if FLAGS.connection_type == 'fake':
@@ -112,7 +111,7 @@ class CloudTestCase(test.BaseTestCase):
# for instance in reservations[res_id]:
for instance in reservations[reservations.keys()[0]]:
logging.debug("Terminating instance %s" % instance['instance_id'])
- rv = yield self.node.terminate_instance(instance['instance_id'])
+ rv = yield self.compute.terminate_instance(instance['instance_id'])
def test_instance_update_state(self):
def instance(num):
diff --git a/nova/tests/node_unittest.py b/nova/tests/compute_unittest.py
index 86d9775fd..da0f82e3a 100644
--- a/nova/tests/node_unittest.py
+++ b/nova/tests/compute_unittest.py
@@ -26,7 +26,7 @@ from nova import flags
from nova import test
from nova import utils
from nova.compute import model
-from nova.compute import node
+from nova.compute import service
FLAGS = flags.FLAGS
@@ -53,14 +53,13 @@ class InstanceXmlTestCase(test.TrialTestCase):
# rv = yield first_node.terminate_instance(instance_id)
-class NodeConnectionTestCase(test.TrialTestCase):
+class ComputeConnectionTestCase(test.TrialTestCase):
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
- super(NodeConnectionTestCase, self).setUp()
+ super(ComputeConnectionTestCase, self).setUp()
self.flags(connection_type='fake',
- fake_storage=True,
- fake_users=True)
- self.node = node.Node()
+ fake_storage=True)
+ self.compute = service.ComputeService()
def create_instance(self):
instdir = model.InstanceDirectory()
@@ -81,48 +80,48 @@ class NodeConnectionTestCase(test.TrialTestCase):
def test_run_describe_terminate(self):
instance_id = self.create_instance()
- rv = yield self.node.run_instance(instance_id)
+ rv = yield self.compute.run_instance(instance_id)
- rv = yield self.node.describe_instances()
+ rv = yield self.compute.describe_instances()
logging.info("Running instances: %s", rv)
self.assertEqual(rv[instance_id].name, instance_id)
- rv = yield self.node.terminate_instance(instance_id)
+ rv = yield self.compute.terminate_instance(instance_id)
- rv = yield self.node.describe_instances()
+ rv = yield self.compute.describe_instances()
logging.info("After terminating instances: %s", rv)
self.assertEqual(rv, {})
@defer.inlineCallbacks
def test_reboot(self):
instance_id = self.create_instance()
- rv = yield self.node.run_instance(instance_id)
+ rv = yield self.compute.run_instance(instance_id)
- rv = yield self.node.describe_instances()
+ rv = yield self.compute.describe_instances()
self.assertEqual(rv[instance_id].name, instance_id)
- yield self.node.reboot_instance(instance_id)
+ yield self.compute.reboot_instance(instance_id)
- rv = yield self.node.describe_instances()
+ rv = yield self.compute.describe_instances()
self.assertEqual(rv[instance_id].name, instance_id)
- rv = yield self.node.terminate_instance(instance_id)
+ rv = yield self.compute.terminate_instance(instance_id)
@defer.inlineCallbacks
def test_console_output(self):
instance_id = self.create_instance()
- rv = yield self.node.run_instance(instance_id)
+ rv = yield self.compute.run_instance(instance_id)
- console = yield self.node.get_console_output(instance_id)
+ console = yield self.compute.get_console_output(instance_id)
self.assert_(console)
- rv = yield self.node.terminate_instance(instance_id)
+ rv = yield self.compute.terminate_instance(instance_id)
@defer.inlineCallbacks
def test_run_instance_existing(self):
instance_id = self.create_instance()
- rv = yield self.node.run_instance(instance_id)
+ rv = yield self.compute.run_instance(instance_id)
- rv = yield self.node.describe_instances()
+ rv = yield self.compute.describe_instances()
self.assertEqual(rv[instance_id].name, instance_id)
- self.assertRaises(exception.Error, self.node.run_instance, instance_id)
- rv = yield self.node.terminate_instance(instance_id)
+ self.assertRaises(exception.Error, self.compute.run_instance, instance_id)
+ rv = yield self.compute.terminate_instance(instance_id)
diff --git a/nova/tests/fake_flags.py b/nova/tests/fake_flags.py
index 5fcd2bcac..a7310fb26 100644
--- a/nova/tests/fake_flags.py
+++ b/nova/tests/fake_flags.py
@@ -24,5 +24,5 @@ FLAGS.connection_type = 'fake'
FLAGS.fake_storage = True
FLAGS.fake_rabbit = True
FLAGS.fake_network = True
-FLAGS.fake_users = True
+FLAGS.auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver'
FLAGS.verbose = True
diff --git a/nova/tests/future_unittest.py b/nova/tests/future_unittest.py
deleted file mode 100644
index 31ec83065..000000000
--- a/nova/tests/future_unittest.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-import mox
-import StringIO
-import time
-from tornado import ioloop
-from twisted.internet import defer
-import unittest
-from xml.etree import ElementTree
-
-from nova import cloud
-from nova import exception
-from nova import flags
-from nova import node
-from nova import rpc
-from nova import test
-
-
-FLAGS = flags.FLAGS
-
-
-class AdminTestCase(test.BaseTestCase):
- def setUp(self):
- super(AdminTestCase, self).setUp()
- self.flags(connection_type='fake',
- fake_rabbit=True)
-
- self.conn = rpc.Connection.instance()
-
- logging.getLogger().setLevel(logging.INFO)
-
- # set up our cloud
- self.cloud = cloud.CloudController()
- self.cloud_consumer = rpc.AdapterConsumer(connection=self.conn,
- topic=FLAGS.cloud_topic,
- proxy=self.cloud)
- self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop))
-
- # set up a node
- self.node = node.Node()
- self.node_consumer = rpc.AdapterConsumer(connection=self.conn,
- topic=FLAGS.compute_topic,
- proxy=self.node)
- self.injected.append(self.node_consumer.attach_to_tornado(self.ioloop))
-
- def test_flush_terminated(self):
- # Launch an instance
-
- # Wait until it's running
-
- # Terminate it
-
- # Wait until it's terminated
-
- # Flush terminated nodes
-
- # ASSERT that it's gone
- pass
diff --git a/nova/tests/model_unittest.py b/nova/tests/model_unittest.py
index b9eb2ac96..6825cfe2a 100644
--- a/nova/tests/model_unittest.py
+++ b/nova/tests/model_unittest.py
@@ -16,6 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+from datetime import datetime, timedelta
import logging
import time
from twisted.internet import defer
@@ -25,7 +26,6 @@ from nova import flags
from nova import test
from nova import utils
from nova.compute import model
-from nova.compute import node
FLAGS = flags.FLAGS
@@ -35,8 +35,7 @@ class ModelTestCase(test.TrialTestCase):
def setUp(self):
super(ModelTestCase, self).setUp()
self.flags(connection_type='fake',
- fake_storage=True,
- fake_users=True)
+ fake_storage=True)
def tearDown(self):
model.Instance('i-test').destroy()
@@ -66,6 +65,12 @@ class ModelTestCase(test.TrialTestCase):
daemon.save()
return daemon
+ def create_session_token(self):
+ session_token = model.SessionToken('tk12341234')
+ session_token['user'] = 'testuser'
+ session_token.save()
+ return session_token
+
@defer.inlineCallbacks
def test_create_instance(self):
"""store with create_instace, then test that a load finds it"""
@@ -204,3 +209,91 @@ class ModelTestCase(test.TrialTestCase):
if x.identifier == 'testhost:nova-testdaemon':
found = True
self.assertTrue(found)
+
+ @defer.inlineCallbacks
+ def test_create_session_token(self):
+ """create"""
+ d = yield self.create_session_token()
+ d = model.SessionToken(d.token)
+ self.assertFalse(d.is_new_record())
+
+ @defer.inlineCallbacks
+ def test_delete_session_token(self):
+ """create, then destroy, then make sure loads a new record"""
+ instance = yield self.create_session_token()
+ yield instance.destroy()
+ newinst = yield model.SessionToken(instance.token)
+ self.assertTrue(newinst.is_new_record())
+
+ @defer.inlineCallbacks
+ def test_session_token_added_to_set(self):
+ """create, then check that it is included in list"""
+ instance = yield self.create_session_token()
+ found = False
+ for x in model.SessionToken.all():
+ if x.identifier == instance.token:
+ found = True
+ self.assert_(found)
+
+ @defer.inlineCallbacks
+ def test_session_token_associates_user(self):
+ """create, then check that it is listed for the user"""
+ instance = yield self.create_session_token()
+ found = False
+ for x in model.SessionToken.associated_to('user', 'testuser'):
+ if x.identifier == instance.identifier:
+ found = True
+ self.assertTrue(found)
+
+ @defer.inlineCallbacks
+ def test_session_token_generation(self):
+ instance = yield model.SessionToken.generate('username', 'TokenType')
+ self.assertFalse(instance.is_new_record())
+
+ @defer.inlineCallbacks
+ def test_find_generated_session_token(self):
+ instance = yield model.SessionToken.generate('username', 'TokenType')
+ found = yield model.SessionToken.lookup(instance.identifier)
+ self.assert_(found)
+
+ def test_update_session_token_expiry(self):
+ instance = model.SessionToken('tk12341234')
+ oldtime = datetime.utcnow()
+ instance['expiry'] = oldtime.strftime(utils.TIME_FORMAT)
+ instance.update_expiry()
+ expiry = utils.parse_isotime(instance['expiry'])
+ self.assert_(expiry > datetime.utcnow())
+
+ @defer.inlineCallbacks
+ def test_session_token_lookup_when_expired(self):
+ instance = yield model.SessionToken.generate("testuser")
+ instance['expiry'] = datetime.utcnow().strftime(utils.TIME_FORMAT)
+ instance.save()
+ inst = model.SessionToken.lookup(instance.identifier)
+ self.assertFalse(inst)
+
+ @defer.inlineCallbacks
+ def test_session_token_lookup_when_not_expired(self):
+ instance = yield model.SessionToken.generate("testuser")
+ inst = model.SessionToken.lookup(instance.identifier)
+ self.assert_(inst)
+
+ @defer.inlineCallbacks
+ def test_session_token_is_expired_when_expired(self):
+ instance = yield model.SessionToken.generate("testuser")
+ instance['expiry'] = datetime.utcnow().strftime(utils.TIME_FORMAT)
+ self.assert_(instance.is_expired())
+
+ @defer.inlineCallbacks
+ def test_session_token_is_expired_when_not_expired(self):
+ instance = yield model.SessionToken.generate("testuser")
+ self.assertFalse(instance.is_expired())
+
+ @defer.inlineCallbacks
+ def test_session_token_ttl(self):
+ instance = yield model.SessionToken.generate("testuser")
+ now = datetime.utcnow()
+ delta = timedelta(hours=1)
+ instance['expiry'] = (now + delta).strftime(utils.TIME_FORMAT)
+ # give 5 seconds of fuzziness
+ self.assert_(abs(instance.ttl() - FLAGS.auth_token_ttl) < 5)
diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py
index 69278e896..f24eefb0d 100644
--- a/nova/tests/network_unittest.py
+++ b/nova/tests/network_unittest.py
@@ -23,7 +23,7 @@ import logging
from nova import flags
from nova import test
from nova import utils
-from nova.auth import users
+from nova.auth import manager
from nova.compute import network
from nova.compute.exception import NoMoreAddresses
@@ -32,66 +32,71 @@ FLAGS = flags.FLAGS
class NetworkTestCase(test.TrialTestCase):
def setUp(self):
super(NetworkTestCase, self).setUp()
+ # NOTE(vish): if you change these flags, make sure to change the
+ # flags in the corresponding section in nova-dhcpbridge
self.flags(connection_type='fake',
fake_storage=True,
fake_network=True,
+ auth_driver='nova.auth.ldapdriver.FakeLdapDriver',
network_size=32)
logging.getLogger().setLevel(logging.DEBUG)
- self.manager = users.UserManager.instance()
+ self.manager = manager.AuthManager()
self.dnsmasq = FakeDNSMasq()
- try:
- self.manager.create_user('netuser', 'netuser', 'netuser')
- except: pass
+ self.user = self.manager.create_user('netuser', 'netuser', 'netuser')
+ self.projects = []
+ self.projects.append(self.manager.create_project('netuser',
+ 'netuser',
+ 'netuser'))
for i in range(0, 6):
name = 'project%s' % i
- if not self.manager.get_project(name):
- self.manager.create_project(name, 'netuser', name)
+ self.projects.append(self.manager.create_project(name,
+ 'netuser',
+ name))
self.network = network.PublicNetworkController()
def tearDown(self):
super(NetworkTestCase, self).tearDown()
- for i in range(0, 6):
- name = 'project%s' % i
- self.manager.delete_project(name)
- self.manager.delete_user('netuser')
+ for project in self.projects:
+ self.manager.delete_project(project)
+ self.manager.delete_user(self.user)
def test_public_network_allocation(self):
pubnet = IPy.IP(flags.FLAGS.public_range)
- address = self.network.allocate_ip("netuser", "project0", "public")
+ address = self.network.allocate_ip(self.user.id, self.projects[0].id, "public")
self.assertTrue(IPy.IP(address) in pubnet)
self.assertTrue(IPy.IP(address) in self.network.network)
def test_allocate_deallocate_ip(self):
address = network.allocate_ip(
- "netuser", "project0", utils.generate_mac())
+ self.user.id, self.projects[0].id, utils.generate_mac())
logging.debug("Was allocated %s" % (address))
- net = network.get_project_network("project0", "default")
- self.assertEqual(True, is_in_project(address, "project0"))
+ net = network.get_project_network(self.projects[0].id, "default")
+ self.assertEqual(True, is_in_project(address, self.projects[0].id))
mac = utils.generate_mac()
hostname = "test-host"
self.dnsmasq.issue_ip(mac, address, hostname, net.bridge_name)
rv = network.deallocate_ip(address)
# Doesn't go away until it's dhcp released
- self.assertEqual(True, is_in_project(address, "project0"))
+ self.assertEqual(True, is_in_project(address, self.projects[0].id))
self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name)
- self.assertEqual(False, is_in_project(address, "project0"))
+ self.assertEqual(False, is_in_project(address, self.projects[0].id))
def test_range_allocation(self):
mac = utils.generate_mac()
secondmac = utils.generate_mac()
hostname = "test-host"
address = network.allocate_ip(
- "netuser", "project0", mac)
+ self.user.id, self.projects[0].id, mac)
secondaddress = network.allocate_ip(
- "netuser", "project1", secondmac)
- net = network.get_project_network("project0", "default")
- secondnet = network.get_project_network("project1", "default")
+ self.user, self.projects[1].id, secondmac)
+ net = network.get_project_network(self.projects[0].id, "default")
+ secondnet = network.get_project_network(self.projects[1].id, "default")
- self.assertEqual(True, is_in_project(address, "project0"))
- self.assertEqual(True, is_in_project(secondaddress, "project1"))
- self.assertEqual(False, is_in_project(address, "project1"))
+ self.assertEqual(True, is_in_project(address, self.projects[0].id))
+ self.assertEqual(True, is_in_project(secondaddress, self.projects[1].id))
+ self.assertEqual(False, is_in_project(address, self.projects[1].id))
# Addresses are allocated before they're issued
self.dnsmasq.issue_ip(mac, address, hostname, net.bridge_name)
@@ -100,34 +105,34 @@ class NetworkTestCase(test.TrialTestCase):
rv = network.deallocate_ip(address)
self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name)
- self.assertEqual(False, is_in_project(address, "project0"))
+ self.assertEqual(False, is_in_project(address, self.projects[0].id))
# First address release shouldn't affect the second
- self.assertEqual(True, is_in_project(secondaddress, "project1"))
+ self.assertEqual(True, is_in_project(secondaddress, self.projects[1].id))
rv = network.deallocate_ip(secondaddress)
self.dnsmasq.release_ip(secondmac, secondaddress,
hostname, secondnet.bridge_name)
- self.assertEqual(False, is_in_project(secondaddress, "project1"))
+ self.assertEqual(False, is_in_project(secondaddress, self.projects[1].id))
def test_subnet_edge(self):
- secondaddress = network.allocate_ip("netuser", "project0",
+ secondaddress = network.allocate_ip(self.user.id, self.projects[0].id,
utils.generate_mac())
hostname = "toomany-hosts"
- for project in range(1,5):
- project_id = "project%s" % (project)
+ for i in range(1,5):
+ project_id = self.projects[i].id
mac = utils.generate_mac()
mac2 = utils.generate_mac()
mac3 = utils.generate_mac()
address = network.allocate_ip(
- "netuser", project_id, mac)
+ self.user, project_id, mac)
address2 = network.allocate_ip(
- "netuser", project_id, mac2)
+ self.user, project_id, mac2)
address3 = network.allocate_ip(
- "netuser", project_id, mac3)
- self.assertEqual(False, is_in_project(address, "project0"))
- self.assertEqual(False, is_in_project(address2, "project0"))
- self.assertEqual(False, is_in_project(address3, "project0"))
+ self.user, project_id, mac3)
+ self.assertEqual(False, is_in_project(address, self.projects[0].id))
+ self.assertEqual(False, is_in_project(address2, self.projects[0].id))
+ self.assertEqual(False, is_in_project(address3, self.projects[0].id))
rv = network.deallocate_ip(address)
rv = network.deallocate_ip(address2)
rv = network.deallocate_ip(address3)
@@ -135,7 +140,7 @@ class NetworkTestCase(test.TrialTestCase):
self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name)
self.dnsmasq.release_ip(mac2, address2, hostname, net.bridge_name)
self.dnsmasq.release_ip(mac3, address3, hostname, net.bridge_name)
- net = network.get_project_network("project0", "default")
+ net = network.get_project_network(self.projects[0].id, "default")
rv = network.deallocate_ip(secondaddress)
self.dnsmasq.release_ip(mac, secondaddress, hostname, net.bridge_name)
@@ -153,34 +158,36 @@ class NetworkTestCase(test.TrialTestCase):
environment's setup.
Network size is set in test fixture's setUp method.
-
+
There are FLAGS.cnt_vpn_clients addresses reserved for VPN (NUM_RESERVED_VPN_IPS)
And there are NUM_STATIC_IPS that are always reserved by Nova for the necessary
services (gateway, CloudPipe, etc)
- So we should get flags.network_size - (NUM_STATIC_IPS +
- NUM_PREALLOCATED_IPS +
+ So we should get flags.network_size - (NUM_STATIC_IPS +
+ NUM_PREALLOCATED_IPS +
NUM_RESERVED_VPN_IPS)
usable addresses
"""
- net = network.get_project_network("project0", "default")
+ net = network.get_project_network(self.projects[0].id, "default")
# Determine expected number of available IP addresses
num_static_ips = net.num_static_ips
num_preallocated_ips = len(net.hosts.keys())
num_reserved_vpn_ips = flags.FLAGS.cnt_vpn_clients
- num_available_ips = flags.FLAGS.network_size - (num_static_ips + num_preallocated_ips + num_reserved_vpn_ips)
+ num_available_ips = flags.FLAGS.network_size - (num_static_ips +
+ num_preallocated_ips +
+ num_reserved_vpn_ips)
hostname = "toomany-hosts"
macs = {}
addresses = {}
for i in range(0, (num_available_ips - 1)):
macs[i] = utils.generate_mac()
- addresses[i] = network.allocate_ip("netuser", "project0", macs[i])
+ addresses[i] = network.allocate_ip(self.user.id, self.projects[0].id, macs[i])
self.dnsmasq.issue_ip(macs[i], addresses[i], hostname, net.bridge_name)
- self.assertRaises(NoMoreAddresses, network.allocate_ip, "netuser", "project0", utils.generate_mac())
+ self.assertRaises(NoMoreAddresses, network.allocate_ip, self.user.id, self.projects[0].id, utils.generate_mac())
for i in range(0, (num_available_ips - 1)):
rv = network.deallocate_ip(addresses[i])
diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py
index f22256aaf..dd00377e7 100644
--- a/nova/tests/objectstore_unittest.py
+++ b/nova/tests/objectstore_unittest.py
@@ -16,6 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import boto
import glob
import hashlib
import logging
@@ -28,8 +29,13 @@ from nova import objectstore
from nova.objectstore import bucket # for buckets_path flag
from nova.objectstore import image # for images_path flag
from nova import test
-from nova.auth import users
+from nova.auth import manager
+from nova.objectstore.handler import S3
+from nova.exception import NotEmpty, NotFound, NotAuthorized
+from boto.s3.connection import S3Connection, OrdinaryCallingFormat
+from twisted.internet import reactor, threads, defer
+from twisted.web import http, server
FLAGS = flags.FLAGS
@@ -53,13 +59,12 @@ os.makedirs(os.path.join(oss_tempdir, 'buckets'))
class ObjectStoreTestCase(test.BaseTestCase):
def setUp(self):
super(ObjectStoreTestCase, self).setUp()
- self.flags(fake_users=True,
- buckets_path=os.path.join(oss_tempdir, 'buckets'),
+ self.flags(buckets_path=os.path.join(oss_tempdir, 'buckets'),
images_path=os.path.join(oss_tempdir, 'images'),
ca_path=os.path.join(os.path.dirname(__file__), 'CA'))
logging.getLogger().setLevel(logging.DEBUG)
- self.um = users.UserManager.instance()
+ self.um = manager.AuthManager()
try:
self.um.create_user('user1')
except: pass
@@ -98,49 +103,37 @@ class ObjectStoreTestCase(test.BaseTestCase):
# another user is not authorized
self.context.user = self.um.get_user('user2')
self.context.project = self.um.get_project('proj2')
- self.assert_(bucket.is_authorized(self.context) == False)
+ self.assertFalse(bucket.is_authorized(self.context))
# admin is authorized to use bucket
self.context.user = self.um.get_user('admin_user')
self.context.project = None
- self.assert_(bucket.is_authorized(self.context))
+ self.assertTrue(bucket.is_authorized(self.context))
# new buckets are empty
- self.assert_(bucket.list_keys()['Contents'] == [])
+ self.assertTrue(bucket.list_keys()['Contents'] == [])
# storing keys works
bucket['foo'] = "bar"
- self.assert_(len(bucket.list_keys()['Contents']) == 1)
+ self.assertEquals(len(bucket.list_keys()['Contents']), 1)
- self.assert_(bucket['foo'].read() == 'bar')
+ self.assertEquals(bucket['foo'].read(), 'bar')
# md5 of key works
- self.assert_(bucket['foo'].md5 == hashlib.md5('bar').hexdigest())
-
- # deleting non-empty bucket throws exception
- exception = False
- try:
- bucket.delete()
- except:
- exception = True
+ self.assertEquals(bucket['foo'].md5, hashlib.md5('bar').hexdigest())
- self.assert_(exception)
+ # deleting non-empty bucket should throw a NotEmpty exception
+ self.assertRaises(NotEmpty, bucket.delete)
# deleting key
del bucket['foo']
- # deleting empty button
+ # deleting empty bucket
bucket.delete()
# accessing deleted bucket throws exception
- exception = False
- try:
- objectstore.bucket.Bucket('new_bucket')
- except:
- exception = True
-
- self.assert_(exception)
+ self.assertRaises(NotFound, objectstore.bucket.Bucket, 'new_bucket')
def test_images(self):
self.context.user = self.um.get_user('user1')
@@ -169,37 +162,108 @@ class ObjectStoreTestCase(test.BaseTestCase):
# verify image permissions
self.context.user = self.um.get_user('user2')
self.context.project = self.um.get_project('proj2')
- self.assert_(my_img.is_authorized(self.context) == False)
-
-# class ApiObjectStoreTestCase(test.BaseTestCase):
-# def setUp(self):
-# super(ApiObjectStoreTestCase, self).setUp()
-# FLAGS.fake_users = True
-# FLAGS.buckets_path = os.path.join(tempdir, 'buckets')
-# FLAGS.images_path = os.path.join(tempdir, 'images')
-# FLAGS.ca_path = os.path.join(os.path.dirname(__file__), 'CA')
-#
-# self.users = users.UserManager.instance()
-# self.app = handler.Application(self.users)
-#
-# self.host = '127.0.0.1'
-#
-# self.conn = boto.s3.connection.S3Connection(
-# aws_access_key_id=user.access,
-# aws_secret_access_key=user.secret,
-# is_secure=False,
-# calling_format=boto.s3.connection.OrdinaryCallingFormat(),
-# port=FLAGS.s3_port,
-# host=FLAGS.s3_host)
-#
-# self.mox.StubOutWithMock(self.ec2, 'new_http_connection')
-#
-# def tearDown(self):
-# FLAGS.Reset()
-# super(ApiObjectStoreTestCase, self).tearDown()
-#
-# def test_describe_instances(self):
-# self.expect_http()
-# self.mox.ReplayAll()
-#
-# self.assertEqual(self.ec2.get_all_instances(), [])
+ self.assertFalse(my_img.is_authorized(self.context))
+
+
+class TestHTTPChannel(http.HTTPChannel):
+ # Otherwise we end up with an unclean reactor
+ def checkPersistence(self, _, __):
+ return False
+
+
+class TestSite(server.Site):
+ protocol = TestHTTPChannel
+
+
+class S3APITestCase(test.TrialTestCase):
+ def setUp(self):
+ super(S3APITestCase, self).setUp()
+
+ FLAGS.auth_driver='nova.auth.ldapdriver.FakeLdapDriver',
+ FLAGS.buckets_path = os.path.join(oss_tempdir, 'buckets')
+
+ self.um = manager.AuthManager()
+ self.admin_user = self.um.create_user('admin', admin=True)
+ self.admin_project = self.um.create_project('admin', self.admin_user)
+
+ shutil.rmtree(FLAGS.buckets_path)
+ os.mkdir(FLAGS.buckets_path)
+
+ root = S3()
+ self.site = TestSite(root)
+ self.listening_port = reactor.listenTCP(0, self.site, interface='127.0.0.1')
+ self.tcp_port = self.listening_port.getHost().port
+
+
+ if not boto.config.has_section('Boto'):
+ boto.config.add_section('Boto')
+ boto.config.set('Boto', 'num_retries', '0')
+ self.conn = S3Connection(aws_access_key_id=self.admin_user.access,
+ aws_secret_access_key=self.admin_user.secret,
+ host='127.0.0.1',
+ port=self.tcp_port,
+ is_secure=False,
+ calling_format=OrdinaryCallingFormat())
+
+ # Don't attempt to reuse connections
+ def get_http_connection(host, is_secure):
+ return self.conn.new_http_connection(host, is_secure)
+ self.conn.get_http_connection = get_http_connection
+
+ def _ensure_empty_list(self, l):
+ self.assertEquals(len(l), 0, "List was not empty")
+ return True
+
+ def _ensure_only_bucket(self, l, name):
+ self.assertEquals(len(l), 1, "List didn't have exactly one element in it")
+ self.assertEquals(l[0].name, name, "Wrong name")
+
+ def test_000_list_buckets(self):
+ d = threads.deferToThread(self.conn.get_all_buckets)
+ d.addCallback(self._ensure_empty_list)
+ return d
+
+ def test_001_create_and_delete_bucket(self):
+ bucket_name = 'testbucket'
+
+ d = threads.deferToThread(self.conn.create_bucket, bucket_name)
+ d.addCallback(lambda _:threads.deferToThread(self.conn.get_all_buckets))
+
+ def ensure_only_bucket(l, name):
+ self.assertEquals(len(l), 1, "List didn't have exactly one element in it")
+ self.assertEquals(l[0].name, name, "Wrong name")
+ d.addCallback(ensure_only_bucket, bucket_name)
+
+ d.addCallback(lambda _:threads.deferToThread(self.conn.delete_bucket, bucket_name))
+ d.addCallback(lambda _:threads.deferToThread(self.conn.get_all_buckets))
+ d.addCallback(self._ensure_empty_list)
+ return d
+
+ def test_002_create_bucket_and_key_and_delete_key_again(self):
+ bucket_name = 'testbucket'
+ key_name = 'somekey'
+ key_contents = 'somekey'
+
+ d = threads.deferToThread(self.conn.create_bucket, bucket_name)
+ d.addCallback(lambda b:threads.deferToThread(b.new_key, key_name))
+ d.addCallback(lambda k:threads.deferToThread(k.set_contents_from_string, key_contents))
+ def ensure_key_contents(bucket_name, key_name, contents):
+ bucket = self.conn.get_bucket(bucket_name)
+ key = bucket.get_key(key_name)
+ self.assertEquals(key.get_contents_as_string(), contents, "Bad contents")
+ d.addCallback(lambda _:threads.deferToThread(ensure_key_contents, bucket_name, key_name, key_contents))
+ def delete_key(bucket_name, key_name):
+ bucket = self.conn.get_bucket(bucket_name)
+ key = bucket.get_key(key_name)
+ key.delete()
+ d.addCallback(lambda _:threads.deferToThread(delete_key, bucket_name, key_name))
+ d.addCallback(lambda _:threads.deferToThread(self.conn.get_bucket, bucket_name))
+ d.addCallback(lambda b:threads.deferToThread(b.get_all_keys))
+ d.addCallback(self._ensure_empty_list)
+ return d
+
+ def tearDown(self):
+ self.um.delete_user('admin')
+ self.um.delete_project('admin')
+ return defer.DeferredList([defer.maybeDeferred(self.listening_port.stopListening)])
+ super(S3APITestCase, self).tearDown()
diff --git a/nova/tests/real_flags.py b/nova/tests/real_flags.py
index 690fb640a..121f4eb41 100644
--- a/nova/tests/real_flags.py
+++ b/nova/tests/real_flags.py
@@ -24,5 +24,4 @@ FLAGS.connection_type = 'libvirt'
FLAGS.fake_storage = False
FLAGS.fake_rabbit = False
FLAGS.fake_network = False
-FLAGS.fake_users = False
FLAGS.verbose = False
diff --git a/nova/tests/volume_unittest.py b/nova/tests/volume_unittest.py
new file mode 100644
index 000000000..b536ac383
--- /dev/null
+++ b/nova/tests/volume_unittest.py
@@ -0,0 +1,115 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from nova import compute
+from nova import exception
+from nova import flags
+from nova import test
+from nova.volume import service as volume_service
+
+
+FLAGS = flags.FLAGS
+
+
+class VolumeTestCase(test.TrialTestCase):
+ def setUp(self):
+ logging.getLogger().setLevel(logging.DEBUG)
+ super(VolumeTestCase, self).setUp()
+ self.compute = compute.service.ComputeService()
+ self.volume = None
+ self.flags(connection_type='fake',
+ fake_storage=True)
+ self.volume = volume_service.VolumeService()
+
+ def test_run_create_volume(self):
+ vol_size = '0'
+ user_id = 'fake'
+ project_id = 'fake'
+ volume_id = self.volume.create_volume(vol_size, user_id, project_id)
+ # TODO(termie): get_volume returns differently than create_volume
+ self.assertEqual(volume_id,
+ volume_service.get_volume(volume_id)['volume_id'])
+
+ rv = self.volume.delete_volume(volume_id)
+ self.assertRaises(exception.Error,
+ volume_service.get_volume,
+ volume_id)
+
+ def test_too_big_volume(self):
+ vol_size = '1001'
+ user_id = 'fake'
+ project_id = 'fake'
+ self.assertRaises(TypeError,
+ self.volume.create_volume,
+ vol_size, user_id, project_id)
+
+ def test_too_many_volumes(self):
+ vol_size = '1'
+ user_id = 'fake'
+ project_id = 'fake'
+ num_shelves = FLAGS.last_shelf_id - FLAGS.first_shelf_id + 1
+ total_slots = FLAGS.slots_per_shelf * num_shelves
+ vols = []
+ for i in xrange(total_slots):
+ vid = self.volume.create_volume(vol_size, user_id, project_id)
+ vols.append(vid)
+ self.assertRaises(volume_service.NoMoreVolumes,
+ self.volume.create_volume,
+ vol_size, user_id, project_id)
+ for id in vols:
+ self.volume.delete_volume(id)
+
+ def test_run_attach_detach_volume(self):
+ # Create one volume and one compute to test with
+ instance_id = "storage-test"
+ vol_size = "5"
+ user_id = "fake"
+ project_id = 'fake'
+ mountpoint = "/dev/sdf"
+ volume_id = self.volume.create_volume(vol_size, user_id, project_id)
+
+ volume_obj = volume_service.get_volume(volume_id)
+ volume_obj.start_attach(instance_id, mountpoint)
+ rv = yield self.compute.attach_volume(volume_id,
+ instance_id,
+ mountpoint)
+ self.assertEqual(volume_obj['status'], "in-use")
+ self.assertEqual(volume_obj['attachStatus'], "attached")
+ self.assertEqual(volume_obj['instance_id'], instance_id)
+ self.assertEqual(volume_obj['mountpoint'], mountpoint)
+
+ self.assertRaises(exception.Error,
+ self.volume.delete_volume,
+ volume_id)
+
+ rv = yield self.volume.detach_volume(volume_id)
+ volume_obj = volume_service.get_volume(volume_id)
+ self.assertEqual(volume_obj['status'], "available")
+
+ rv = self.volume.delete_volume(volume_id)
+ self.assertRaises(exception.Error,
+ volume_service.get_volume,
+ volume_id)
+
+ def test_multi_node(self):
+ # TODO(termie): Figure out how to test with two nodes,
+ # each of them having a different FLAG for storage_node
+ # This will allow us to test cross-node interactions
+ pass