summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
authorSoren Hansen <soren@linux2go.dk>2011-09-16 17:17:34 +0200
committerSoren Hansen <soren@linux2go.dk>2011-09-16 17:17:34 +0200
commit322f5458236a64bc6d12ba29cee6750d452f1ecd (patch)
tree9549137acf4c12c7ebeb7e0a0b5fa7a1449ec7ed /nova/tests
parent1d2a5bd706736bff2e2d82ebeb813438493cea41 (diff)
parent23fefd17727966b0a1397f40b25bedcd668206d7 (diff)
Merge trunk
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/api/ec2/public_key/dummy.fingerprint (renamed from nova/tests/public_key/dummy.fingerprint)0
-rw-r--r--nova/tests/api/ec2/public_key/dummy.pub (renamed from nova/tests/public_key/dummy.pub)0
-rw-r--r--nova/tests/api/ec2/test_cloud.py (renamed from nova/tests/test_cloud.py)8
-rw-r--r--nova/tests/api/openstack/common.py22
-rw-r--r--nova/tests/api/openstack/contrib/test_createserverext.py103
-rw-r--r--nova/tests/api/openstack/contrib/test_floating_ips.py78
-rw-r--r--nova/tests/api/openstack/contrib/test_volumes.py73
-rw-r--r--nova/tests/api/openstack/fakes.py129
-rw-r--r--nova/tests/api/openstack/test_api.py25
-rw-r--r--nova/tests/api/openstack/test_common.py176
-rw-r--r--nova/tests/api/openstack/test_extensions.py1
-rw-r--r--nova/tests/api/openstack/test_flavors.py259
-rw-r--r--nova/tests/api/openstack/test_image_metadata.py163
-rw-r--r--nova/tests/api/openstack/test_images.py1100
-rw-r--r--nova/tests/api/openstack/test_limits.py104
-rw-r--r--nova/tests/api/openstack/test_servers.py349
-rw-r--r--nova/tests/api/openstack/test_versions.py604
-rw-r--r--nova/tests/api/openstack/test_wsgi.py57
-rw-r--r--nova/tests/db/fakes.py4
-rw-r--r--nova/tests/fake_network.py194
-rw-r--r--nova/tests/glance/stubs.py74
-rw-r--r--nova/tests/image/test_glance.py637
-rw-r--r--nova/tests/integrated/integrated_helpers.py2
-rw-r--r--nova/tests/integrated/test_xml.py12
-rw-r--r--nova/tests/scheduler/test_abstract_scheduler.py55
-rw-r--r--nova/tests/scheduler/test_scheduler.py102
-rw-r--r--nova/tests/test_compute.py28
-rw-r--r--nova/tests/test_direct.py2
-rw-r--r--nova/tests/test_libvirt.py282
-rwxr-xr-xnova/tests/test_linux_net.py347
-rw-r--r--nova/tests/test_network.py139
-rw-r--r--nova/tests/test_quantum.py323
-rw-r--r--nova/tests/test_virt_drivers.py3
-rw-r--r--nova/tests/test_vmwareapi.py3
-rw-r--r--nova/tests/test_xenapi.py8
-rw-r--r--nova/tests/vmwareapi/stubs.py2
36 files changed, 3497 insertions, 1971 deletions
diff --git a/nova/tests/public_key/dummy.fingerprint b/nova/tests/api/ec2/public_key/dummy.fingerprint
index 715bca27a..715bca27a 100644
--- a/nova/tests/public_key/dummy.fingerprint
+++ b/nova/tests/api/ec2/public_key/dummy.fingerprint
diff --git a/nova/tests/public_key/dummy.pub b/nova/tests/api/ec2/public_key/dummy.pub
index d4cf2bc0d..d4cf2bc0d 100644
--- a/nova/tests/public_key/dummy.pub
+++ b/nova/tests/api/ec2/public_key/dummy.pub
diff --git a/nova/tests/test_cloud.py b/nova/tests/api/ec2/test_cloud.py
index 96aebdd54..cc85cbd95 100644
--- a/nova/tests/test_cloud.py
+++ b/nova/tests/api/ec2/test_cloud.py
@@ -541,11 +541,9 @@ class CloudTestCase(test.TestCase):
inst2 = db.instance_create(self.context, args2)
db.instance_destroy(self.context, inst1.id)
result = self.cloud.describe_instances(self.context)
+ self.assertEqual(len(result['reservationSet']), 1)
result1 = result['reservationSet'][0]['instancesSet']
self.assertEqual(result1[0]['instanceId'],
- ec2utils.id_to_ec2_id(inst1.id))
- result2 = result['reservationSet'][1]['instancesSet']
- self.assertEqual(result2[0]['instanceId'],
ec2utils.id_to_ec2_id(inst2.id))
def _block_device_mapping_create(self, instance_id, mappings):
@@ -1597,7 +1595,9 @@ class CloudTestCase(test.TestCase):
'ephemeral0': '/dev/sdb',
'swap': '/dev/sdc',
'ephemeral1': '/dev/sdd',
- 'ephemeral2': '/dev/sd3'}
+ 'ephemeral2': '/dev/sd3',
+ 'ebs0': '/dev/sdh',
+ 'ebs1': '/dev/sdi'}
self.assertEqual(self.cloud._format_instance_mapping(ctxt,
instance_ref0),
diff --git a/nova/tests/api/openstack/common.py b/nova/tests/api/openstack/common.py
index 74bb8729a..19515ca67 100644
--- a/nova/tests/api/openstack/common.py
+++ b/nova/tests/api/openstack/common.py
@@ -34,3 +34,25 @@ def webob_factory(url):
req.body = json.dumps(body)
return req
return web_request
+
+
+def compare_links(actual, expected):
+ """Compare xml atom links."""
+
+ return compare_tree_to_dict(actual, expected, ('rel', 'href', 'type'))
+
+
+def compare_media_types(actual, expected):
+ """Compare xml media types."""
+
+ return compare_tree_to_dict(actual, expected, ('base', 'type'))
+
+
+def compare_tree_to_dict(actual, expected, keys):
+ """Compare parts of lxml.etree objects to dicts."""
+
+ for elem, data in zip(actual, expected):
+ for key in keys:
+ if elem.get(key) != data.get(key):
+ return False
+ return True
diff --git a/nova/tests/api/openstack/contrib/test_createserverext.py b/nova/tests/api/openstack/contrib/test_createserverext.py
index 089c8e59d..03c7d1ec5 100644
--- a/nova/tests/api/openstack/contrib/test_createserverext.py
+++ b/nova/tests/api/openstack/contrib/test_createserverext.py
@@ -16,6 +16,7 @@
# under the License.
import base64
+import datetime
import json
import unittest
from xml.dom import minidom
@@ -27,15 +28,7 @@ from nova import db
from nova import exception
from nova import flags
from nova import test
-from nova import utils
import nova.api.openstack
-from nova.api.openstack import servers
-from nova.api.openstack.contrib import createserverext
-import nova.compute.api
-
-import nova.scheduler.api
-import nova.image.fake
-import nova.rpc
from nova.tests.api.openstack import fakes
@@ -52,22 +45,49 @@ DUPLICATE_NETWORKS = [('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', '10.0.1.12'),
INVALID_NETWORKS = [('invalid', 'invalid-ip-address')]
+INSTANCE = {
+ "id": 1,
+ "display_name": "test_server",
+ "uuid": FAKE_UUID,
+ "user_id": 'fake_user_id',
+ "tenant_id": 'fake_tenant_id',
+ "created_at": datetime.datetime(2010, 10, 10, 12, 0, 0),
+ "updated_at": datetime.datetime(2010, 11, 11, 11, 0, 0),
+ "security_groups": [{"id": 1, "name": "test"}],
+ "image_ref": 'http://foo.com/123',
+ "instance_type": {"flavorid": '124'},
+ }
+
+
+def return_server_by_id(context, id, session=None):
+ INSTANCE['id'] = id
+ return INSTANCE
+
+
+def return_security_group_non_existing(context, project_id, group_name):
+ raise exception.SecurityGroupNotFoundForProject(project_id=project_id,
+ security_group_id=group_name)
+
+
+def return_security_group_get_by_name(context, project_id, group_name):
+ return {'id': 1, 'name': group_name}
+
+
+def return_security_group_get(context, security_group_id, session):
+ return {'id': security_group_id}
+
+
+def return_instance_add_security_group(context, instance_id,
+ security_group_id):
+ pass
+
class CreateserverextTest(test.TestCase):
def setUp(self):
super(CreateserverextTest, self).setUp()
- self.stubs = stubout.StubOutForTesting()
- fakes.FakeAuthManager.auth_data = {}
- fakes.FakeAuthDatabase.data = {}
- fakes.stub_out_auth(self.stubs)
- fakes.stub_out_image_service(self.stubs)
- fakes.stub_out_key_pair_funcs(self.stubs)
- self.allow_admin = FLAGS.allow_admin_api
def tearDown(self):
- self.stubs.UnsetAll()
- FLAGS.allow_admin_api = self.allow_admin
super(CreateserverextTest, self).tearDown()
def _setup_mock_compute_api(self):
@@ -96,6 +116,8 @@ class CreateserverextTest(test.TestCase):
return [{'id': '1234', 'display_name': 'fakeinstance',
'uuid': FAKE_UUID,
+ 'user_id': 'fake',
+ 'project_id': 'fake',
'created_at': "",
'updated_at': ""}]
@@ -114,6 +136,18 @@ class CreateserverextTest(test.TestCase):
'_get_kernel_ramdisk_from_image', make_stub_method((1, 1)))
return compute_api
+ def _create_security_group_request_dict(self, security_groups):
+ server = {}
+ server['name'] = 'new-server-test'
+ server['imageRef'] = 1
+ server['flavorRef'] = 1
+ if security_groups is not None:
+ sg_list = []
+ for name in security_groups:
+ sg_list.append({'name': name})
+ server['security_groups'] = sg_list
+ return {'server': server}
+
def _create_networks_request_dict(self, networks):
server = {}
server['name'] = 'new-server-test'
@@ -348,3 +382,38 @@ class CreateserverextTest(test.TestCase):
self._create_instance_with_user_data_json(user_data_contents)
self.assertEquals(response.status_int, 400)
self.assertEquals(user_data, None)
+
+ def test_create_instance_with_security_group_json(self):
+ security_groups = ['test', 'test1']
+ self.stubs.Set(nova.db.api, 'security_group_get_by_name',
+ return_security_group_get_by_name)
+ self.stubs.Set(nova.db.api, 'instance_add_security_group',
+ return_instance_add_security_group)
+ body_dict = self._create_security_group_request_dict(security_groups)
+ request = self._get_create_request_json(body_dict)
+ response = request.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 202)
+
+ def test_get_server_by_id_verify_security_groups_json(self):
+ self.stubs.Set(nova.db.api, 'instance_get', return_server_by_id)
+ req = webob.Request.blank('/v1.1/123/os-create-server-ext/1')
+ req.headers['Content-Type'] = 'application/json'
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 200)
+ res_dict = json.loads(response.body)
+ expected_security_group = [{"name": "test"}]
+ self.assertEquals(res_dict['server']['security_groups'],
+ expected_security_group)
+
+ def test_get_server_by_id_verify_security_groups_xml(self):
+ self.stubs.Set(nova.db.api, 'instance_get', return_server_by_id)
+ req = webob.Request.blank('/v1.1/123/os-create-server-ext/1')
+ req.headers['Accept'] = 'application/xml'
+ response = req.get_response(fakes.wsgi_app())
+ self.assertEquals(response.status_int, 200)
+ dom = minidom.parseString(response.body)
+ server = dom.childNodes[0]
+ sec_groups = server.getElementsByTagName('security_groups')[0]
+ sec_group = sec_groups.getElementsByTagName('security_group')[0]
+ self.assertEqual(INSTANCE['security_groups'][0]['name'],
+ sec_group.getAttribute("name"))
diff --git a/nova/tests/api/openstack/contrib/test_floating_ips.py b/nova/tests/api/openstack/contrib/test_floating_ips.py
index 642f2b841..0744f0a11 100644
--- a/nova/tests/api/openstack/contrib/test_floating_ips.py
+++ b/nova/tests/api/openstack/contrib/test_floating_ips.py
@@ -212,11 +212,45 @@ class FloatingIpTest(test.TestCase):
"fixed_ip": None}
self.assertEqual(ip, expected)
- def test_floating_ip_release(self):
+ def test_floating_ip_release_associated(self):
+ self.disassociated = False
+
+ def get_floating_ip(ignore, context, id):
+ return {'id': 1, 'address': '10.10.10.10',
+ 'fixed_ip': {'id': 1}}
+
+ def disassociate(ignore, context, floating_address):
+ self.disassociated = True
+
+ self.stubs.Set(network.api.API, "get_floating_ip",
+ get_floating_ip)
+ self.stubs.Set(network.api.API, "disassociate_floating_ip",
+ disassociate)
+ req = webob.Request.blank('/v1.1/123/os-floating-ips/1')
+ req.method = 'DELETE'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 202)
+ self.assertTrue(self.disassociated)
+
+ def test_floating_ip_release_disassociated(self):
+ self.disassociated = False
+
+ def fake_get_floating_ip(ignore, context, id):
+ return {'id': 1, 'address': '10.10.10.10',
+ 'fixed_ip': None}
+
+ def fake_disassociate(ignore, context, floating_address):
+ self.disassociated = True
+
+ self.stubs.Set(network.api.API, "get_floating_ip",
+ fake_get_floating_ip)
+ self.stubs.Set(network.api.API, "disassociate_floating_ip",
+ fake_disassociate)
req = webob.Request.blank('/v1.1/123/os-floating-ips/1')
req.method = 'DELETE'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 202)
+ self.assertFalse(self.disassociated)
def test_add_floating_ip_to_instance(self):
self.stubs.Set(network.api.API, "associate_floating_ip",
@@ -289,8 +323,45 @@ class FloatingIpTest(test.TestCase):
self.assertEqual(resp.status_int, 202)
self.assertTrue(self.disassociated)
- def test_remove_floating_ip_from_instance(self):
- body = dict(removeFloatingIp=dict(address='11.0.0.1'))
+ def test_remove_associated_floating_ip_from_instance(self):
+ self.disassociated = False
+
+ def fake_get_floating_ip_by_ip(ignore, context, ip):
+ return {'id': 1, 'address': '10.10.10.10',
+ 'fixed_ip': {'id': 1}}
+
+ def fake_disassociate(ignore, context, floating_address):
+ self.disassociated = True
+
+ self.stubs.Set(network.api.API, "get_floating_ip_by_ip",
+ fake_get_floating_ip_by_ip)
+ self.stubs.Set(network.api.API, "disassociate_floating_ip",
+ fake_disassociate)
+ body = dict(removeFloatingIp=dict(address='10.10.10.10'))
+ req = webob.Request.blank('/v1.1/123/servers/test_inst/action')
+ req.method = "POST"
+ req.body = json.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ resp = req.get_response(fakes.wsgi_app())
+ self.assertEqual(resp.status_int, 202)
+ self.assertTrue(self.disassociated)
+
+ def test_remove_disassociated_floating_ip_from_instance(self):
+ self.disassociated = False
+
+ def fake_get_floating_ip_by_ip(ignore, context, ip):
+ return {'id': 1, 'address': '10.10.10.10',
+ 'fixed_ip': None}
+
+ def fake_disassociate(ignore, context, floating_address):
+ self.disassociated = True
+
+ self.stubs.Set(network.api.API, "get_floating_ip_by_ip",
+ fake_get_floating_ip_by_ip)
+ self.stubs.Set(network.api.API, "disassociate_floating_ip",
+ fake_disassociate)
+ body = dict(removeFloatingIp=dict(address='10.10.10.10'))
req = webob.Request.blank('/v1.1/123/servers/test_inst/action')
req.method = "POST"
req.body = json.dumps(body)
@@ -298,6 +369,7 @@ class FloatingIpTest(test.TestCase):
resp = req.get_response(fakes.wsgi_app())
self.assertEqual(resp.status_int, 202)
+ self.assertFalse(self.disassociated)
def test_bad_address_param_in_remove_floating_ip(self):
body = dict(removeFloatingIp=dict(badparam='11.0.0.1'))
diff --git a/nova/tests/api/openstack/contrib/test_volumes.py b/nova/tests/api/openstack/contrib/test_volumes.py
new file mode 100644
index 000000000..443ec399f
--- /dev/null
+++ b/nova/tests/api/openstack/contrib/test_volumes.py
@@ -0,0 +1,73 @@
+# Copyright 2011 Josh Durgin
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import json
+import webob
+
+import nova
+from nova import context
+from nova import test
+from nova.api.openstack.contrib.volumes import BootFromVolumeController
+from nova.compute import instance_types
+from nova.tests.api.openstack import fakes
+from nova.tests.api.openstack.test_servers import fake_gen_uuid
+
+
+def fake_compute_api_create(cls, context, instance_type, image_href, **kwargs):
+ inst_type = instance_types.get_instance_type_by_flavor_id(2)
+ return [{'id': 1,
+ 'display_name': 'test_server',
+ 'uuid': fake_gen_uuid(),
+ 'instance_type': dict(inst_type),
+ 'access_ip_v4': '1.2.3.4',
+ 'access_ip_v6': 'fead::1234',
+ 'image_ref': 3,
+ 'user_id': 'fake',
+ 'project_id': 'fake',
+ 'created_at': datetime.datetime(2010, 10, 10, 12, 0, 0),
+ 'updated_at': datetime.datetime(2010, 11, 11, 11, 0, 0),
+ }]
+
+
+class BootFromVolumeTest(test.TestCase):
+
+ def setUp(self):
+ super(BootFromVolumeTest, self).setUp()
+ self.stubs.Set(nova.compute.API, 'create', fake_compute_api_create)
+
+ def test_create_root_volume(self):
+ body = dict(server=dict(
+ name='test_server', imageRef=3,
+ flavorRef=2, min_count=1, max_count=1,
+ block_device_mapping=[dict(
+ volume_id=1,
+ device_name='/dev/vda',
+ virtual='root',
+ delete_on_termination=False,
+ )]
+ ))
+ req = webob.Request.blank('/v1.1/fake/os-volumes_boot')
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ req.headers['content-type'] = 'application/json'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
+ server = json.loads(res.body)['server']
+ self.assertEqual(1, server['id'])
+ self.assertEqual(2, int(server['flavor']['id']))
+ self.assertEqual(u'test_server', server['name'])
+ self.assertEqual(3, int(server['image']['id']))
+ self.assertEqual(16, len(server['adminPass']))
diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py
index 44681d395..3a567f0cc 100644
--- a/nova/tests/api/openstack/fakes.py
+++ b/nova/tests/api/openstack/fakes.py
@@ -40,8 +40,8 @@ from nova.api.openstack import limits
from nova.auth.manager import User, Project
import nova.image.fake
from nova.image import glance
-from nova.image import service
from nova.tests import fake_flags
+from nova.tests.glance import stubs as glance_stubs
class Context(object):
@@ -83,7 +83,7 @@ def wsgi_app(inner_app10=None, inner_app11=None, fake_auth=True,
if fake_auth_context is not None:
ctxt = fake_auth_context
else:
- ctxt = context.RequestContext('fake', 'fake')
+ ctxt = context.RequestContext('fake', 'fake', auth_token=True)
api10 = openstack.FaultWrapper(api_auth.InjectContext(ctxt,
limits.RateLimitingMiddleware(inner_app10)))
api11 = openstack.FaultWrapper(api_auth.InjectContext(ctxt,
@@ -124,7 +124,7 @@ def stub_out_key_pair_funcs(stubs, have_key_pair=True):
def stub_out_image_service(stubs):
- def fake_get_image_service(image_href):
+ def fake_get_image_service(context, image_href):
return (nova.image.fake.FakeImageService(), image_href)
stubs.Set(nova.image, 'get_image_service', fake_get_image_service)
stubs.Set(nova.image, 'get_default_image_service',
@@ -177,6 +177,39 @@ def stub_out_compute_api_backup(stubs):
stubs.Set(nova.compute.API, 'backup', backup)
+def _make_image_fixtures():
+ NOW_GLANCE_FORMAT = "2010-10-11T10:30:22"
+
+ image_id = 123
+ base_attrs = {'deleted': False}
+
+ fixtures = []
+
+ def add_fixture(**kwargs):
+ kwargs.update(base_attrs)
+ fixtures.append(kwargs)
+
+ # Public image
+ add_fixture(id=image_id, name='public image', is_public=True,
+ status='active', properties={'key1': 'value1'})
+ image_id += 1
+
+ # Snapshot for User 1
+ server_ref = 'http://localhost/v1.1/servers/42'
+ snapshot_properties = {'instance_ref': server_ref, 'user_id': 'fake'}
+ for status in ('queued', 'saving', 'active', 'killed',
+ 'deleted', 'pending_delete'):
+ add_fixture(id=image_id, name='%s snapshot' % status,
+ is_public=False, status=status,
+ properties=snapshot_properties)
+ image_id += 1
+
+ # Image without a name
+ add_fixture(id=image_id, is_public=True, status='active', properties={})
+
+ return fixtures
+
+
def stub_out_glance_add_image(stubs, sent_to_glance):
"""
We return the metadata sent to glance by modifying the sent_to_glance dict
@@ -192,91 +225,11 @@ def stub_out_glance_add_image(stubs, sent_to_glance):
stubs.Set(glance_client.Client, 'add_image', fake_add_image)
-def stub_out_glance(stubs, initial_fixtures=None):
-
- class FakeGlanceClient:
-
- def __init__(self, initial_fixtures):
- self.fixtures = initial_fixtures or []
-
- def _filter_images(self, filters=None, marker=None, limit=None):
- found = True
- if marker:
- found = False
- if limit == 0:
- limit = None
-
- fixtures = []
- count = 0
- for f in self.fixtures:
- if limit and count >= limit:
- break
- if found:
- fixtures.append(f)
- count = count + 1
- if f['id'] == marker:
- found = True
-
- return fixtures
-
- def fake_get_images(self, filters=None, marker=None, limit=None):
- fixtures = self._filter_images(filters, marker, limit)
- return [dict(id=f['id'], name=f['name'])
- for f in fixtures]
-
- def fake_get_images_detailed(self, filters=None,
- marker=None, limit=None):
- return self._filter_images(filters, marker, limit)
-
- def fake_get_image_meta(self, image_id):
- image = self._find_image(image_id)
- if image:
- return copy.deepcopy(image)
- raise glance_exc.NotFound
-
- def fake_add_image(self, image_meta, data=None):
- image_meta = copy.deepcopy(image_meta)
- image_id = ''.join(random.choice(string.letters)
- for _ in range(20))
- image_meta['id'] = image_id
- self.fixtures.append(image_meta)
- return copy.deepcopy(image_meta)
-
- def fake_update_image(self, image_id, image_meta, data=None):
- for attr in ('created_at', 'updated_at', 'deleted_at', 'deleted'):
- if attr in image_meta:
- del image_meta[attr]
-
- f = self._find_image(image_id)
- if not f:
- raise glance_exc.NotFound
-
- f.update(image_meta)
- return copy.deepcopy(f)
-
- def fake_delete_image(self, image_id):
- f = self._find_image(image_id)
- if not f:
- raise glance_exc.NotFound
-
- self.fixtures.remove(f)
-
- def _find_image(self, image_id):
- for f in self.fixtures:
- if str(f['id']) == str(image_id):
- return f
- return None
-
- GlanceClient = glance_client.Client
- fake = FakeGlanceClient(initial_fixtures)
-
- stubs.Set(GlanceClient, 'get_images', fake.fake_get_images)
- stubs.Set(GlanceClient, 'get_images_detailed',
- fake.fake_get_images_detailed)
- stubs.Set(GlanceClient, 'get_image_meta', fake.fake_get_image_meta)
- stubs.Set(GlanceClient, 'add_image', fake.fake_add_image)
- stubs.Set(GlanceClient, 'update_image', fake.fake_update_image)
- stubs.Set(GlanceClient, 'delete_image', fake.fake_delete_image)
+def stub_out_glance(stubs):
+ def fake_get_image_service():
+ client = glance_stubs.StubGlanceClient(_make_image_fixtures())
+ return nova.image.glance.GlanceImageService(client)
+ stubs.Set(nova.image, 'get_default_image_service', fake_get_image_service)
class FakeToken(object):
diff --git a/nova/tests/api/openstack/test_api.py b/nova/tests/api/openstack/test_api.py
index 7321c329f..b7a0b01ef 100644
--- a/nova/tests/api/openstack/test_api.py
+++ b/nova/tests/api/openstack/test_api.py
@@ -20,6 +20,7 @@ import json
import webob.exc
import webob.dec
+from lxml import etree
from webob import Request
from nova import test
@@ -52,6 +53,30 @@ class APITest(test.TestCase):
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 400)
+ def test_vendor_content_type_json(self):
+ ctype = 'application/vnd.openstack.compute+json'
+
+ req = webob.Request.blank('/')
+ req.headers['Accept'] = ctype
+
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res.content_type, ctype)
+
+ body = json.loads(res.body)
+
+ def test_vendor_content_type_xml(self):
+ ctype = 'application/vnd.openstack.compute+xml'
+
+ req = webob.Request.blank('/')
+ req.headers['Accept'] = ctype
+
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res.content_type, ctype)
+
+ body = etree.XML(res.body)
+
def test_exceptions_are_converted_to_faults(self):
@webob.dec.wsgify
diff --git a/nova/tests/api/openstack/test_common.py b/nova/tests/api/openstack/test_common.py
index b422bc4d1..1628ad1c8 100644
--- a/nova/tests/api/openstack/test_common.py
+++ b/nova/tests/api/openstack/test_common.py
@@ -19,6 +19,7 @@
Test suites for 'common' code used throughout the OpenStack HTTP API.
"""
+from lxml import etree
import webob.exc
import xml.dom.minidom as minidom
@@ -26,6 +27,11 @@ from webob import Request
from nova import test
from nova.api.openstack import common
+from nova.api.openstack import xmlutil
+
+
+NS = "{http://docs.openstack.org/compute/api/v1.1}"
+ATOMNS = "{http://www.w3.org/2005/Atom}"
class LimiterTest(test.TestCase):
@@ -237,21 +243,41 @@ class MiscFunctionsTest(test.TestCase):
common.remove_version_from_href,
fixture)
- def test_get_id_from_href(self):
+ def test_get_id_from_href_with_int_url(self):
fixture = 'http://www.testsite.com/dir/45'
actual = common.get_id_from_href(fixture)
- expected = 45
+ expected = '45'
self.assertEqual(actual, expected)
- def test_get_id_from_href_bad_request(self):
- fixture = 'http://45'
- self.assertRaises(ValueError,
- common.get_id_from_href,
- fixture)
+ def test_get_id_from_href_with_int(self):
+ fixture = '45'
+ actual = common.get_id_from_href(fixture)
+ expected = '45'
+ self.assertEqual(actual, expected)
+
+ def test_get_id_from_href_with_int_url_query(self):
+ fixture = 'http://www.testsite.com/dir/45?asdf=jkl'
+ actual = common.get_id_from_href(fixture)
+ expected = '45'
+ self.assertEqual(actual, expected)
- def test_get_id_from_href_int(self):
- fixture = 1
- self.assertEqual(fixture, common.get_id_from_href(fixture))
+ def test_get_id_from_href_with_uuid_url(self):
+ fixture = 'http://www.testsite.com/dir/abc123'
+ actual = common.get_id_from_href(fixture)
+ expected = "abc123"
+ self.assertEqual(actual, expected)
+
+ def test_get_id_from_href_with_uuid_url_query(self):
+ fixture = 'http://www.testsite.com/dir/abc123?asdf=jkl'
+ actual = common.get_id_from_href(fixture)
+ expected = "abc123"
+ self.assertEqual(actual, expected)
+
+ def test_get_id_from_href_with_uuid(self):
+ fixture = 'abc123'
+ actual = common.get_id_from_href(fixture)
+ expected = 'abc123'
+ self.assertEqual(actual, expected)
def test_get_version_from_href(self):
fixture = 'http://www.testsite.com/v1.1/images'
@@ -314,7 +340,7 @@ class MetadataXMLDeserializationTest(test.TestCase):
class MetadataXMLSerializationTest(test.TestCase):
- def test_index(self):
+ def test_xml_declaration(self):
serializer = common.MetadataXMLSerializer()
fixture = {
'metadata': {
@@ -322,17 +348,31 @@ class MetadataXMLSerializationTest(test.TestCase):
'three': 'four',
},
}
- output = serializer.serialize(fixture, 'index')
- actual = minidom.parseString(output.replace(" ", ""))
- expected = minidom.parseString("""
- <metadata xmlns="http://docs.openstack.org/compute/api/v1.1">
- <meta key="three">four</meta>
- <meta key="one">two</meta>
- </metadata>
- """.replace(" ", "").replace("\n", ""))
+ output = serializer.serialize(fixture, 'index')
+ print output
+ has_dec = output.startswith("<?xml version='1.0' encoding='UTF-8'?>")
+ self.assertTrue(has_dec)
- self.assertEqual(expected.toxml(), actual.toxml())
+ def test_index(self):
+ serializer = common.MetadataXMLSerializer()
+ fixture = {
+ 'metadata': {
+ 'one': 'two',
+ 'three': 'four',
+ },
+ }
+ output = serializer.serialize(fixture, 'index')
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'metadata')
+ metadata_dict = fixture['metadata']
+ metadata_elems = root.findall('{0}meta'.format(NS))
+ self.assertEqual(len(metadata_elems), 2)
+ for i, metadata_elem in enumerate(metadata_elems):
+ (meta_key, meta_value) = metadata_dict.items()[i]
+ self.assertEqual(str(metadata_elem.get('key')), str(meta_key))
+ self.assertEqual(str(metadata_elem.text).strip(), str(meta_value))
def test_index_null(self):
serializer = common.MetadataXMLSerializer()
@@ -342,15 +382,16 @@ class MetadataXMLSerializationTest(test.TestCase):
},
}
output = serializer.serialize(fixture, 'index')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected = minidom.parseString("""
- <metadata xmlns="http://docs.openstack.org/compute/api/v1.1">
- <meta key="None">None</meta>
- </metadata>
- """.replace(" ", "").replace("\n", ""))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'metadata')
+ metadata_dict = fixture['metadata']
+ metadata_elems = root.findall('{0}meta'.format(NS))
+ self.assertEqual(len(metadata_elems), 1)
+ for i, metadata_elem in enumerate(metadata_elems):
+ (meta_key, meta_value) = metadata_dict.items()[i]
+ self.assertEqual(str(metadata_elem.get('key')), str(meta_key))
+ self.assertEqual(str(metadata_elem.text).strip(), str(meta_value))
def test_index_unicode(self):
serializer = common.MetadataXMLSerializer()
@@ -360,15 +401,16 @@ class MetadataXMLSerializationTest(test.TestCase):
},
}
output = serializer.serialize(fixture, 'index')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected = minidom.parseString(u"""
- <metadata xmlns="http://docs.openstack.org/compute/api/v1.1">
- <meta key="three">Jos\xe9</meta>
- </metadata>
- """.encode("UTF-8").replace(" ", "").replace("\n", ""))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'metadata')
+ metadata_dict = fixture['metadata']
+ metadata_elems = root.findall('{0}meta'.format(NS))
+ self.assertEqual(len(metadata_elems), 1)
+ for i, metadata_elem in enumerate(metadata_elems):
+ (meta_key, meta_value) = metadata_dict.items()[i]
+ self.assertEqual(str(metadata_elem.get('key')), str(meta_key))
+ self.assertEqual(metadata_elem.text.strip(), meta_value)
def test_show(self):
serializer = common.MetadataXMLSerializer()
@@ -378,14 +420,12 @@ class MetadataXMLSerializationTest(test.TestCase):
},
}
output = serializer.serialize(fixture, 'show')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected = minidom.parseString("""
- <meta xmlns="http://docs.openstack.org/compute/api/v1.1"
- key="one">two</meta>
- """.replace(" ", "").replace("\n", ""))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ print output
+ root = etree.XML(output)
+ meta_dict = fixture['meta']
+ (meta_key, meta_value) = meta_dict.items()[0]
+ self.assertEqual(str(root.get('key')), str(meta_key))
+ self.assertEqual(root.text.strip(), meta_value)
def test_update_all(self):
serializer = common.MetadataXMLSerializer()
@@ -396,16 +436,16 @@ class MetadataXMLSerializationTest(test.TestCase):
},
}
output = serializer.serialize(fixture, 'update_all')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected = minidom.parseString("""
- <metadata xmlns="http://docs.openstack.org/compute/api/v1.1">
- <meta key="key6">value6</meta>
- <meta key="key4">value4</meta>
- </metadata>
- """.replace(" ", "").replace("\n", ""))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'metadata')
+ metadata_dict = fixture['metadata']
+ metadata_elems = root.findall('{0}meta'.format(NS))
+ self.assertEqual(len(metadata_elems), 2)
+ for i, metadata_elem in enumerate(metadata_elems):
+ (meta_key, meta_value) = metadata_dict.items()[i]
+ self.assertEqual(str(metadata_elem.get('key')), str(meta_key))
+ self.assertEqual(str(metadata_elem.text).strip(), str(meta_value))
def test_update_item(self):
serializer = common.MetadataXMLSerializer()
@@ -415,14 +455,12 @@ class MetadataXMLSerializationTest(test.TestCase):
},
}
output = serializer.serialize(fixture, 'update')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected = minidom.parseString("""
- <meta xmlns="http://docs.openstack.org/compute/api/v1.1"
- key="one">two</meta>
- """.replace(" ", "").replace("\n", ""))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ print output
+ root = etree.XML(output)
+ meta_dict = fixture['meta']
+ (meta_key, meta_value) = meta_dict.items()[0]
+ self.assertEqual(str(root.get('key')), str(meta_key))
+ self.assertEqual(root.text.strip(), meta_value)
def test_create(self):
serializer = common.MetadataXMLSerializer()
@@ -434,6 +472,16 @@ class MetadataXMLSerializationTest(test.TestCase):
},
}
output = serializer.serialize(fixture, 'create')
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'metadata')
+ metadata_dict = fixture['metadata']
+ metadata_elems = root.findall('{0}meta'.format(NS))
+ self.assertEqual(len(metadata_elems), 3)
+ for i, metadata_elem in enumerate(metadata_elems):
+ (meta_key, meta_value) = metadata_dict.items()[i]
+ self.assertEqual(str(metadata_elem.get('key')), str(meta_key))
+ self.assertEqual(str(metadata_elem.text).strip(), str(meta_value))
actual = minidom.parseString(output.replace(" ", ""))
expected = minidom.parseString("""
diff --git a/nova/tests/api/openstack/test_extensions.py b/nova/tests/api/openstack/test_extensions.py
index 31443242b..44f4eb055 100644
--- a/nova/tests/api/openstack/test_extensions.py
+++ b/nova/tests/api/openstack/test_extensions.py
@@ -87,6 +87,7 @@ class ExtensionControllerTest(test.TestCase):
self.ext_list = [
"Createserverext",
"FlavorExtraSpecs",
+ "FlavorExtraData",
"Floating_ips",
"Fox In Socks",
"Hosts",
diff --git a/nova/tests/api/openstack/test_flavors.py b/nova/tests/api/openstack/test_flavors.py
index 812bece42..348042bfe 100644
--- a/nova/tests/api/openstack/test_flavors.py
+++ b/nova/tests/api/openstack/test_flavors.py
@@ -17,16 +17,21 @@
import json
import webob
-import xml.dom.minidom as minidom
+from lxml import etree
from nova.api.openstack import flavors
import nova.db.api
from nova import exception
from nova import test
+from nova.api.openstack import xmlutil
from nova.tests.api.openstack import fakes
from nova import wsgi
+NS = "{http://docs.openstack.org/compute/api/v1.1}"
+ATOMNS = "{http://www.w3.org/2005/Atom}"
+
+
def stub_flavor(flavorid, name, memory_mb="256", local_gb="10"):
return {
"flavorid": str(flavorid),
@@ -107,12 +112,20 @@ class FlavorsTest(test.TestCase):
"name": "flavor 1",
"ram": "256",
"disk": "10",
+ "rxtx_cap": "",
+ "rxtx_quota": "",
+ "swap": "",
+ "vcpus": "",
},
{
"id": "2",
"name": "flavor 2",
"ram": "256",
"disk": "10",
+ "rxtx_cap": "",
+ "rxtx_quota": "",
+ "swap": "",
+ "vcpus": "",
},
]
self.assertEqual(flavors, expected)
@@ -127,6 +140,10 @@ class FlavorsTest(test.TestCase):
"name": "flavor 12",
"ram": "256",
"disk": "10",
+ "rxtx_cap": "",
+ "rxtx_quota": "",
+ "swap": "",
+ "vcpus": "",
}
self.assertEqual(flavor, expected)
@@ -149,6 +166,10 @@ class FlavorsTest(test.TestCase):
"name": "flavor 12",
"ram": "256",
"disk": "10",
+ "rxtx_cap": "",
+ "rxtx_quota": "",
+ "swap": "",
+ "vcpus": "",
"links": [
{
"rel": "self",
@@ -216,6 +237,10 @@ class FlavorsTest(test.TestCase):
"name": "flavor 1",
"ram": "256",
"disk": "10",
+ "rxtx_cap": "",
+ "rxtx_quota": "",
+ "swap": "",
+ "vcpus": "",
"links": [
{
"rel": "self",
@@ -232,6 +257,10 @@ class FlavorsTest(test.TestCase):
"name": "flavor 2",
"ram": "256",
"disk": "10",
+ "rxtx_cap": "",
+ "rxtx_quota": "",
+ "swap": "",
+ "vcpus": "",
"links": [
{
"rel": "self",
@@ -262,15 +291,50 @@ class FlavorsTest(test.TestCase):
class FlavorsXMLSerializationTest(test.TestCase):
+ def test_xml_declaration(self):
+ serializer = flavors.FlavorXMLSerializer()
+
+ fixture = {
+ "flavor": {
+ "id": "12",
+ "name": "asdf",
+ "ram": "256",
+ "disk": "10",
+ "rxtx_cap": "",
+ "rxtx_quota": "",
+ "swap": "",
+ "vcpus": "",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/fake/flavors/12",
+ },
+ {
+ "rel": "bookmark",
+ "href": "http://localhost/fake/flavors/12",
+ },
+ ],
+ },
+ }
+
+ output = serializer.serialize(fixture, 'show')
+ print output
+ has_dec = output.startswith("<?xml version='1.0' encoding='UTF-8'?>")
+ self.assertTrue(has_dec)
+
def test_show(self):
serializer = flavors.FlavorXMLSerializer()
- input = {
+ fixture = {
"flavor": {
"id": "12",
"name": "asdf",
"ram": "256",
"disk": "10",
+ "rxtx_cap": "",
+ "rxtx_quota": "",
+ "swap": "",
+ "vcpus": "",
"links": [
{
"rel": "self",
@@ -284,34 +348,34 @@ class FlavorsXMLSerializationTest(test.TestCase):
},
}
- output = serializer.serialize(input, 'show')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected = minidom.parseString("""
- <flavor xmlns="http://docs.openstack.org/compute/api/v1.1"
- xmlns:atom="http://www.w3.org/2005/Atom"
- id="12"
- name="asdf"
- ram="256"
- disk="10">
- <atom:link href="http://localhost/v1.1/fake/flavors/12"
- rel="self"/>
- <atom:link href="http://localhost/fake/flavors/12"
- rel="bookmark"/>
- </flavor>
- """.replace(" ", ""))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ output = serializer.serialize(fixture, 'show')
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'flavor')
+ flavor_dict = fixture['flavor']
+
+ for key in ['name', 'id', 'ram', 'disk']:
+ self.assertEqual(root.get(key), str(flavor_dict[key]))
+
+ link_nodes = root.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(flavor_dict['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
def test_show_handles_integers(self):
serializer = flavors.FlavorXMLSerializer()
- input = {
+ fixture = {
"flavor": {
"id": 12,
"name": "asdf",
"ram": 256,
"disk": 10,
+ "rxtx_cap": "",
+ "rxtx_quota": "",
+ "swap": "",
+ "vcpus": "",
"links": [
{
"rel": "self",
@@ -325,35 +389,35 @@ class FlavorsXMLSerializationTest(test.TestCase):
},
}
- output = serializer.serialize(input, 'show')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected = minidom.parseString("""
- <flavor xmlns="http://docs.openstack.org/compute/api/v1.1"
- xmlns:atom="http://www.w3.org/2005/Atom"
- id="12"
- name="asdf"
- ram="256"
- disk="10">
- <atom:link href="http://localhost/v1.1/fake/flavors/12"
- rel="self"/>
- <atom:link href="http://localhost/fake/flavors/12"
- rel="bookmark"/>
- </flavor>
- """.replace(" ", ""))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ output = serializer.serialize(fixture, 'show')
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'flavor')
+ flavor_dict = fixture['flavor']
+
+ for key in ['name', 'id', 'ram', 'disk']:
+ self.assertEqual(root.get(key), str(flavor_dict[key]))
+
+ link_nodes = root.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(flavor_dict['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
def test_detail(self):
serializer = flavors.FlavorXMLSerializer()
- input = {
+ fixture = {
"flavors": [
{
"id": "23",
"name": "flavor 23",
"ram": "512",
"disk": "20",
+ "rxtx_cap": "",
+ "rxtx_quota": "",
+ "swap": "",
+ "vcpus": "",
"links": [
{
"rel": "self",
@@ -369,6 +433,10 @@ class FlavorsXMLSerializationTest(test.TestCase):
"name": "flavor 13",
"ram": "256",
"disk": "10",
+ "rxtx_cap": "",
+ "rxtx_quota": "",
+ "swap": "",
+ "vcpus": "",
"links": [
{
"rel": "self",
@@ -383,45 +451,38 @@ class FlavorsXMLSerializationTest(test.TestCase):
],
}
- output = serializer.serialize(input, 'detail')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected = minidom.parseString("""
- <flavors xmlns="http://docs.openstack.org/compute/api/v1.1"
- xmlns:atom="http://www.w3.org/2005/Atom">
- <flavor id="23"
- name="flavor 23"
- ram="512"
- disk="20">
- <atom:link href="http://localhost/v1.1/fake/flavors/23"
- rel="self"/>
- <atom:link href="http://localhost/fake/flavors/23"
- rel="bookmark"/>
- </flavor>
- <flavor id="13"
- name="flavor 13"
- ram="256"
- disk="10">
- <atom:link href="http://localhost/v1.1/fake/flavors/13"
- rel="self"/>
- <atom:link href="http://localhost/fake/flavors/13"
- rel="bookmark"/>
- </flavor>
- </flavors>
- """.replace(" ", "") % locals())
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ output = serializer.serialize(fixture, 'detail')
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'flavors')
+ flavor_elems = root.findall('{0}flavor'.format(NS))
+ self.assertEqual(len(flavor_elems), 2)
+ for i, flavor_elem in enumerate(flavor_elems):
+ flavor_dict = fixture['flavors'][i]
+
+ for key in ['name', 'id', 'ram', 'disk']:
+ self.assertEqual(flavor_elem.get(key), str(flavor_dict[key]))
+
+ link_nodes = flavor_elem.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(flavor_dict['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
def test_index(self):
serializer = flavors.FlavorXMLSerializer()
- input = {
+ fixture = {
"flavors": [
{
"id": "23",
"name": "flavor 23",
"ram": "512",
"disk": "20",
+ "rxtx_cap": "",
+ "rxtx_quota": "",
+ "swap": "",
+ "vcpus": "",
"links": [
{
"rel": "self",
@@ -437,6 +498,10 @@ class FlavorsXMLSerializationTest(test.TestCase):
"name": "flavor 13",
"ram": "256",
"disk": "10",
+ "rxtx_cap": "",
+ "rxtx_quota": "",
+ "swap": "",
+ "vcpus": "",
"links": [
{
"rel": "self",
@@ -451,42 +516,34 @@ class FlavorsXMLSerializationTest(test.TestCase):
],
}
- output = serializer.serialize(input, 'index')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected = minidom.parseString("""
- <flavors xmlns="http://docs.openstack.org/compute/api/v1.1"
- xmlns:atom="http://www.w3.org/2005/Atom">
- <flavor id="23" name="flavor 23">
- <atom:link href="http://localhost/v1.1/fake/flavors/23"
- rel="self"/>
- <atom:link href="http://localhost/fake/flavors/23"
- rel="bookmark"/>
- </flavor>
- <flavor id="13" name="flavor 13">
- <atom:link href="http://localhost/v1.1/fake/flavors/13"
- rel="self"/>
- <atom:link href="http://localhost/fake/flavors/13"
- rel="bookmark"/>
- </flavor>
- </flavors>
- """.replace(" ", "") % locals())
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ output = serializer.serialize(fixture, 'index')
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'flavors_index')
+ flavor_elems = root.findall('{0}flavor'.format(NS))
+ self.assertEqual(len(flavor_elems), 2)
+ for i, flavor_elem in enumerate(flavor_elems):
+ flavor_dict = fixture['flavors'][i]
+
+ for key in ['name', 'id']:
+ self.assertEqual(flavor_elem.get(key), str(flavor_dict[key]))
+
+ link_nodes = flavor_elem.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(flavor_dict['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
def test_index_empty(self):
serializer = flavors.FlavorXMLSerializer()
- input = {
+ fixture = {
"flavors": [],
}
- output = serializer.serialize(input, 'index')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected = minidom.parseString("""
- <flavors xmlns="http://docs.openstack.org/compute/api/v1.1"
- xmlns:atom="http://www.w3.org/2005/Atom" />
- """.replace(" ", "") % locals())
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ output = serializer.serialize(fixture, 'index')
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'flavors_index')
+ flavor_elems = root.findall('{0}flavor'.format(NS))
+ self.assertEqual(len(flavor_elems), 0)
diff --git a/nova/tests/api/openstack/test_image_metadata.py b/nova/tests/api/openstack/test_image_metadata.py
index fe42e35e5..314c3c38e 100644
--- a/nova/tests/api/openstack/test_image_metadata.py
+++ b/nova/tests/api/openstack/test_image_metadata.py
@@ -23,7 +23,6 @@ from nova import flags
from nova.api import openstack
from nova import test
from nova.tests.api.openstack import fakes
-import nova.wsgi
FLAGS = flags.FLAGS
@@ -31,76 +30,20 @@ FLAGS = flags.FLAGS
class ImageMetaDataTest(test.TestCase):
- IMAGE_FIXTURES = [
- {'status': 'active',
- 'name': 'image1',
- 'deleted': False,
- 'container_format': None,
- 'checksum': None,
- 'created_at': '2011-03-22T17:40:15',
- 'disk_format': None,
- 'updated_at': '2011-03-22T17:40:15',
- 'id': '1',
- 'location': 'file:///var/lib/glance/images/1',
- 'is_public': True,
- 'deleted_at': None,
- 'properties': {
- 'key1': 'value1',
- 'key2': 'value2'},
- 'size': 5882349},
- {'status': 'active',
- 'name': 'image2',
- 'deleted': False,
- 'container_format': None,
- 'checksum': None,
- 'created_at': '2011-03-22T17:40:15',
- 'disk_format': None,
- 'updated_at': '2011-03-22T17:40:15',
- 'id': '2',
- 'location': 'file:///var/lib/glance/images/2',
- 'is_public': True,
- 'deleted_at': None,
- 'properties': {
- 'key1': 'value1',
- 'key2': 'value2'},
- 'size': 5882349},
- {'status': 'active',
- 'name': 'image3',
- 'deleted': False,
- 'container_format': None,
- 'checksum': None,
- 'created_at': '2011-03-22T17:40:15',
- 'disk_format': None,
- 'updated_at': '2011-03-22T17:40:15',
- 'id': '3',
- 'location': 'file:///var/lib/glance/images/2',
- 'is_public': True,
- 'deleted_at': None,
- 'properties': {},
- 'size': 5882349},
- ]
-
def setUp(self):
super(ImageMetaDataTest, self).setUp()
- self.flags(image_service='nova.image.glance.GlanceImageService')
- # NOTE(dprince) max out properties/metadata in image 3 for testing
- img3 = self.IMAGE_FIXTURES[2]
- for num in range(FLAGS.quota_metadata_items):
- img3['properties']['key%i' % num] = "blah"
- fakes.stub_out_glance(self.stubs, self.IMAGE_FIXTURES)
+ fakes.stub_out_glance(self.stubs)
def test_index(self):
- req = webob.Request.blank('/v1.1/123/images/1/metadata')
+ req = webob.Request.blank('/v1.1/123/images/123/metadata')
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(200, res.status_int)
- expected = self.IMAGE_FIXTURES[0]['properties']
- self.assertEqual(len(expected), len(res_dict['metadata']))
- for (key, value) in res_dict['metadata'].items():
- self.assertEqual(value, res_dict['metadata'][key])
+ expected = {'metadata': {'key1': 'value1'}}
+ self.assertEqual(res_dict, expected)
def test_show(self):
- req = webob.Request.blank('/v1.1/fake/images/1/metadata/key1')
+ req = webob.Request.blank('/v1.1/fake/images/123/metadata/key1')
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(200, res.status_int)
@@ -109,32 +52,38 @@ class ImageMetaDataTest(test.TestCase):
self.assertEqual('value1', res_dict['meta']['key1'])
def test_show_not_found(self):
- req = webob.Request.blank('/v1.1/fake/images/1/metadata/key9')
+ req = webob.Request.blank('/v1.1/fake/images/123/metadata/key9')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(404, res.status_int)
+
+ def test_show_image_not_found(self):
+ req = webob.Request.blank('/v1.1/fake/images/100/metadata/key1')
res = req.get_response(fakes.wsgi_app())
self.assertEqual(404, res.status_int)
def test_create(self):
- req = webob.Request.blank('/v1.1/fake/images/2/metadata')
+ req = webob.Request.blank('/v1.1/fake/images/123/metadata')
req.method = 'POST'
- req.body = '{"metadata": {"key9": "value9"}}'
+ req.body = '{"metadata": {"key7": "value7"}}'
req.headers["content-type"] = "application/json"
res = req.get_response(fakes.wsgi_app())
self.assertEqual(200, res.status_int)
actual_output = json.loads(res.body)
+ expected_output = {'metadata': {'key1': 'value1', 'key7': 'value7'}}
+ self.assertEqual(expected_output, actual_output)
- expected_output = {
- 'metadata': {
- 'key1': 'value1',
- 'key2': 'value2',
- 'key9': 'value9',
- },
- }
+ def test_create_image_not_found(self):
+ req = webob.Request.blank('/v1.1/fake/images/100/metadata')
+ req.method = 'POST'
+ req.body = '{"metadata": {"key7": "value7"}}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
- self.assertEqual(expected_output, actual_output)
+ self.assertEqual(404, res.status_int)
def test_update_all(self):
- req = webob.Request.blank('/v1.1/fake/images/1/metadata')
+ req = webob.Request.blank('/v1.1/fake/images/123/metadata')
req.method = 'PUT'
req.body = '{"metadata": {"key9": "value9"}}'
req.headers["content-type"] = "application/json"
@@ -142,17 +91,20 @@ class ImageMetaDataTest(test.TestCase):
self.assertEqual(200, res.status_int)
actual_output = json.loads(res.body)
+ expected_output = {'metadata': {'key9': 'value9'}}
+ self.assertEqual(expected_output, actual_output)
- expected_output = {
- 'metadata': {
- 'key9': 'value9',
- },
- }
+ def test_update_all_image_not_found(self):
+ req = webob.Request.blank('/v1.1/fake/images/100/metadata')
+ req.method = 'PUT'
+ req.body = '{"metadata": {"key9": "value9"}}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
- self.assertEqual(expected_output, actual_output)
+ self.assertEqual(404, res.status_int)
def test_update_item(self):
- req = webob.Request.blank('/v1.1/fake/images/1/metadata/key1')
+ req = webob.Request.blank('/v1.1/fake/images/123/metadata/key1')
req.method = 'PUT'
req.body = '{"meta": {"key1": "zz"}}'
req.headers["content-type"] = "application/json"
@@ -160,15 +112,20 @@ class ImageMetaDataTest(test.TestCase):
self.assertEqual(200, res.status_int)
actual_output = json.loads(res.body)
- expected_output = {
- 'meta': {
- 'key1': 'zz',
- },
- }
+ expected_output = {'meta': {'key1': 'zz'}}
self.assertEqual(actual_output, expected_output)
+ def test_update_item_image_not_found(self):
+ req = webob.Request.blank('/v1.1/fake/images/100/metadata/key1')
+ req.method = 'PUT'
+ req.body = '{"meta": {"key1": "zz"}}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+
+ self.assertEqual(404, res.status_int)
+
def test_update_item_bad_body(self):
- req = webob.Request.blank('/v1.1/fake/images/1/metadata/key1')
+ req = webob.Request.blank('/v1.1/fake/images/123/metadata/key1')
req.method = 'PUT'
req.body = '{"key1": "zz"}'
req.headers["content-type"] = "application/json"
@@ -176,15 +133,18 @@ class ImageMetaDataTest(test.TestCase):
self.assertEqual(400, res.status_int)
def test_update_item_too_many_keys(self):
- req = webob.Request.blank('/v1.1/fake/images/1/metadata/key1')
+ req = webob.Request.blank('/v1.1/fake/images/123/metadata/key1')
req.method = 'PUT'
- req.body = '{"meta": {"key1": "value1", "key2": "value2"}}'
+ overload = {}
+ for num in range(FLAGS.quota_metadata_items + 1):
+ overload['key%s' % num] = 'value%s' % num
+ req.body = json.dumps({'meta': overload})
req.headers["content-type"] = "application/json"
res = req.get_response(fakes.wsgi_app())
self.assertEqual(400, res.status_int)
def test_update_item_body_uri_mismatch(self):
- req = webob.Request.blank('/v1.1/fake/images/1/metadata/bad')
+ req = webob.Request.blank('/v1.1/fake/images/123/metadata/bad')
req.method = 'PUT'
req.body = '{"meta": {"key1": "value1"}}'
req.headers["content-type"] = "application/json"
@@ -192,7 +152,7 @@ class ImageMetaDataTest(test.TestCase):
self.assertEqual(400, res.status_int)
def test_update_item_xml(self):
- req = webob.Request.blank('/v1.1/fake/images/1/metadata/key1')
+ req = webob.Request.blank('/v1.1/fake/images/123/metadata/key1')
req.method = 'PUT'
req.body = '<meta key="key1">five</meta>'
req.headers["content-type"] = "application/xml"
@@ -200,22 +160,24 @@ class ImageMetaDataTest(test.TestCase):
self.assertEqual(200, res.status_int)
actual_output = json.loads(res.body)
- expected_output = {
- 'meta': {
- 'key1': 'five',
- },
- }
+ expected_output = {'meta': {'key1': 'five'}}
self.assertEqual(actual_output, expected_output)
def test_delete(self):
- req = webob.Request.blank('/v1.1/fake/images/2/metadata/key1')
+ req = webob.Request.blank('/v1.1/fake/images/123/metadata/key1')
req.method = 'DELETE'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(204, res.status_int)
self.assertEqual('', res.body)
def test_delete_not_found(self):
- req = webob.Request.blank('/v1.1/fake/images/2/metadata/blah')
+ req = webob.Request.blank('/v1.1/fake/images/123/metadata/blah')
+ req.method = 'DELETE'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(404, res.status_int)
+
+ def test_delete_image_not_found(self):
+ req = webob.Request.blank('/v1.1/fake/images/100/metadata/key1')
req.method = 'DELETE'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(404, res.status_int)
@@ -225,7 +187,7 @@ class ImageMetaDataTest(test.TestCase):
for num in range(FLAGS.quota_metadata_items + 1):
data['metadata']['key%i' % num] = "blah"
json_string = str(data).replace("\'", "\"")
- req = webob.Request.blank('/v1.1/fake/images/2/metadata')
+ req = webob.Request.blank('/v1.1/fake/images/123/metadata')
req.method = 'POST'
req.body = json_string
req.headers["content-type"] = "application/json"
@@ -233,7 +195,8 @@ class ImageMetaDataTest(test.TestCase):
self.assertEqual(413, res.status_int)
def test_too_many_metadata_items_on_put(self):
- req = webob.Request.blank('/v1.1/fake/images/3/metadata/blah')
+ FLAGS.quota_metadata_items = 1
+ req = webob.Request.blank('/v1.1/fake/images/123/metadata/blah')
req.method = 'PUT'
req.body = '{"meta": {"blah": "blah"}}'
req.headers["content-type"] = "application/json"
diff --git a/nova/tests/api/openstack/test_images.py b/nova/tests/api/openstack/test_images.py
index 2a7cfc382..e5fd4764a 100644
--- a/nova/tests/api/openstack/test_images.py
+++ b/nova/tests/api/openstack/test_images.py
@@ -22,340 +22,57 @@ and as a WSGI layer
import copy
import json
-import os
-import shutil
-import tempfile
import xml.dom.minidom as minidom
+from lxml import etree
import mox
import stubout
import webob
-from glance import client as glance_client
from nova import context
-from nova import exception
-from nova import test
-from nova import utils
import nova.api.openstack
from nova.api.openstack import images
+from nova.api.openstack import xmlutil
+from nova import test
from nova.tests.api.openstack import fakes
-class _BaseImageServiceTests(test.TestCase):
- """Tasks to test for all image services"""
-
- def __init__(self, *args, **kwargs):
- super(_BaseImageServiceTests, self).__init__(*args, **kwargs)
- self.service = None
- self.context = None
-
- def test_create(self):
- fixture = self._make_fixture('test image')
- num_images = len(self.service.index(self.context))
-
- image_id = self.service.create(self.context, fixture)['id']
-
- self.assertNotEquals(None, image_id)
- self.assertEquals(num_images + 1,
- len(self.service.index(self.context)))
-
- def test_create_and_show_non_existing_image(self):
- fixture = self._make_fixture('test image')
- num_images = len(self.service.index(self.context))
-
- image_id = self.service.create(self.context, fixture)['id']
-
- self.assertNotEquals(None, image_id)
- self.assertRaises(exception.NotFound,
- self.service.show,
- self.context,
- 'bad image id')
-
- def test_create_and_show_non_existing_image_by_name(self):
- fixture = self._make_fixture('test image')
- num_images = len(self.service.index(self.context))
-
- image_id = self.service.create(self.context, fixture)['id']
-
- self.assertNotEquals(None, image_id)
- self.assertRaises(exception.ImageNotFound,
- self.service.show_by_name,
- self.context,
- 'bad image id')
-
- def test_update(self):
- fixture = self._make_fixture('test image')
- image_id = self.service.create(self.context, fixture)['id']
- fixture['status'] = 'in progress'
-
- self.service.update(self.context, image_id, fixture)
-
- new_image_data = self.service.show(self.context, image_id)
- self.assertEquals('in progress', new_image_data['status'])
-
- def test_delete(self):
- fixture1 = self._make_fixture('test image 1')
- fixture2 = self._make_fixture('test image 2')
- fixtures = [fixture1, fixture2]
-
- num_images = len(self.service.index(self.context))
- self.assertEquals(0, num_images, str(self.service.index(self.context)))
-
- ids = []
- for fixture in fixtures:
- new_id = self.service.create(self.context, fixture)['id']
- ids.append(new_id)
+NS = "{http://docs.openstack.org/compute/api/v1.1}"
+ATOMNS = "{http://www.w3.org/2005/Atom}"
+NOW_API_FORMAT = "2010-10-11T10:30:22Z"
- num_images = len(self.service.index(self.context))
- self.assertEquals(2, num_images, str(self.service.index(self.context)))
- self.service.delete(self.context, ids[0])
-
- num_images = len(self.service.index(self.context))
- self.assertEquals(1, num_images)
-
- def test_index(self):
- fixture = self._make_fixture('test image')
- image_id = self.service.create(self.context, fixture)['id']
- image_metas = self.service.index(self.context)
- expected = [{'id': 'DONTCARE', 'name': 'test image'}]
- self.assertDictListMatch(image_metas, expected)
-
- @staticmethod
- def _make_fixture(name):
- fixture = {'name': name,
- 'updated': None,
- 'created': None,
- 'status': None,
- 'is_public': True}
- return fixture
-
-
-class GlanceImageServiceTest(_BaseImageServiceTests):
-
- """Tests the Glance image service, in particular that metadata translation
- works properly.
-
- At a high level, the translations involved are:
-
- 1. Glance -> ImageService - This is needed so we can support
- multple ImageServices (Glance, Local, etc)
-
- 2. ImageService -> API - This is needed so we can support multple
- APIs (OpenStack, EC2)
- """
- def setUp(self):
- super(GlanceImageServiceTest, self).setUp()
- self.stubs = stubout.StubOutForTesting()
- fakes.stub_out_glance(self.stubs)
- fakes.stub_out_compute_api_snapshot(self.stubs)
- service_class = 'nova.image.glance.GlanceImageService'
- self.service = utils.import_object(service_class)
- self.context = context.RequestContext('fake', 'fake')
- self.service.delete_all()
- self.sent_to_glance = {}
- fakes.stub_out_glance_add_image(self.stubs, self.sent_to_glance)
-
- def tearDown(self):
- self.stubs.UnsetAll()
- super(GlanceImageServiceTest, self).tearDown()
-
- def test_create_with_instance_id(self):
- """Ensure instance_id is persisted as an image-property"""
- fixture = {'name': 'test image',
- 'is_public': False,
- 'properties': {'instance_id': '42', 'user_id': 'fake'}}
-
- image_id = self.service.create(self.context, fixture)['id']
- expected = fixture
- self.assertDictMatch(self.sent_to_glance['metadata'], expected)
-
- image_meta = self.service.show(self.context, image_id)
- expected = {'id': image_id,
- 'name': 'test image',
- 'is_public': False,
- 'properties': {'instance_id': '42', 'user_id': 'fake'}}
- self.assertDictMatch(image_meta, expected)
-
- image_metas = self.service.detail(self.context)
- self.assertDictMatch(image_metas[0], expected)
-
- def test_create_without_instance_id(self):
- """
- Ensure we can create an image without having to specify an
- instance_id. Public images are an example of an image not tied to an
- instance.
- """
- fixture = {'name': 'test image'}
- image_id = self.service.create(self.context, fixture)['id']
-
- expected = {'name': 'test image', 'properties': {}}
- self.assertDictMatch(self.sent_to_glance['metadata'], expected)
-
- def test_index_default_limit(self):
- fixtures = []
- ids = []
- for i in range(10):
- fixture = self._make_fixture('TestImage %d' % (i))
- fixtures.append(fixture)
- ids.append(self.service.create(self.context, fixture)['id'])
-
- image_metas = self.service.index(self.context)
- i = 0
- for meta in image_metas:
- expected = {'id': 'DONTCARE',
- 'name': 'TestImage %d' % (i)}
- self.assertDictMatch(meta, expected)
- i = i + 1
-
- def test_index_marker(self):
- fixtures = []
- ids = []
- for i in range(10):
- fixture = self._make_fixture('TestImage %d' % (i))
- fixtures.append(fixture)
- ids.append(self.service.create(self.context, fixture)['id'])
-
- image_metas = self.service.index(self.context, marker=ids[1])
- self.assertEquals(len(image_metas), 8)
- i = 2
- for meta in image_metas:
- expected = {'id': 'DONTCARE',
- 'name': 'TestImage %d' % (i)}
- self.assertDictMatch(meta, expected)
- i = i + 1
-
- def test_index_limit(self):
- fixtures = []
- ids = []
- for i in range(10):
- fixture = self._make_fixture('TestImage %d' % (i))
- fixtures.append(fixture)
- ids.append(self.service.create(self.context, fixture)['id'])
-
- image_metas = self.service.index(self.context, limit=3)
- self.assertEquals(len(image_metas), 3)
-
- def test_index_marker_and_limit(self):
- fixtures = []
- ids = []
- for i in range(10):
- fixture = self._make_fixture('TestImage %d' % (i))
- fixtures.append(fixture)
- ids.append(self.service.create(self.context, fixture)['id'])
-
- image_metas = self.service.index(self.context, marker=ids[3], limit=1)
- self.assertEquals(len(image_metas), 1)
- i = 4
- for meta in image_metas:
- expected = {'id': 'DONTCARE',
- 'name': 'TestImage %d' % (i)}
- self.assertDictMatch(meta, expected)
- i = i + 1
-
- def test_detail_marker(self):
- fixtures = []
- ids = []
- for i in range(10):
- fixture = self._make_fixture('TestImage %d' % (i))
- fixtures.append(fixture)
- ids.append(self.service.create(self.context, fixture)['id'])
-
- image_metas = self.service.detail(self.context, marker=ids[1])
- self.assertEquals(len(image_metas), 8)
- i = 2
- for meta in image_metas:
- expected = {
- 'id': 'DONTCARE',
- 'status': None,
- 'is_public': True,
- 'name': 'TestImage %d' % (i),
- 'properties': {
- 'updated': None,
- 'created': None,
- },
- }
-
- self.assertDictMatch(meta, expected)
- i = i + 1
-
- def test_detail_limit(self):
- fixtures = []
- ids = []
- for i in range(10):
- fixture = self._make_fixture('TestImage %d' % (i))
- fixtures.append(fixture)
- ids.append(self.service.create(self.context, fixture)['id'])
-
- image_metas = self.service.detail(self.context, limit=3)
- self.assertEquals(len(image_metas), 3)
-
- def test_detail_marker_and_limit(self):
- fixtures = []
- ids = []
- for i in range(10):
- fixture = self._make_fixture('TestImage %d' % (i))
- fixtures.append(fixture)
- ids.append(self.service.create(self.context, fixture)['id'])
-
- image_metas = self.service.detail(self.context, marker=ids[3], limit=3)
- self.assertEquals(len(image_metas), 3)
- i = 4
- for meta in image_metas:
- expected = {
- 'id': 'DONTCARE',
- 'status': None,
- 'is_public': True,
- 'name': 'TestImage %d' % (i),
- 'properties': {
- 'updated': None, 'created': None},
- }
- self.assertDictMatch(meta, expected)
- i = i + 1
-
-
-class ImageControllerWithGlanceServiceTest(test.TestCase):
+class ImagesTest(test.TestCase):
"""
Test of the OpenStack API /images application controller w/Glance.
"""
- NOW_GLANCE_FORMAT = "2010-10-11T10:30:22"
- NOW_API_FORMAT = "2010-10-11T10:30:22Z"
def setUp(self):
"""Run before each test."""
- super(ImageControllerWithGlanceServiceTest, self).setUp()
- self.flags(image_service='nova.image.glance.GlanceImageService')
+ super(ImagesTest, self).setUp()
self.stubs = stubout.StubOutForTesting()
fakes.stub_out_networking(self.stubs)
fakes.stub_out_rate_limiting(self.stubs)
fakes.stub_out_key_pair_funcs(self.stubs)
- self.fixtures = self._make_image_fixtures()
- fakes.stub_out_glance(self.stubs, initial_fixtures=self.fixtures)
fakes.stub_out_compute_api_snapshot(self.stubs)
fakes.stub_out_compute_api_backup(self.stubs)
+ fakes.stub_out_glance(self.stubs)
def tearDown(self):
"""Run after each test."""
self.stubs.UnsetAll()
- super(ImageControllerWithGlanceServiceTest, self).tearDown()
+ super(ImagesTest, self).tearDown()
def _get_fake_context(self):
class Context(object):
project_id = 'fake'
+ auth_token = True
return Context()
- def _applicable_fixture(self, fixture, user_id):
- """Determine if this fixture is applicable for given user id."""
- is_public = fixture["is_public"]
- try:
- uid = fixture["properties"]["user_id"]
- except KeyError:
- uid = None
- return uid == user_id or is_public
-
def test_get_image_index(self):
request = webob.Request.blank('/v1.0/images')
- response = request.get_response(fakes.wsgi_app())
+ app = fakes.wsgi_app(fake_auth_context=self._get_fake_context())
+ response = request.get_response(app)
response_dict = json.loads(response.body)
response_list = response_dict["images"]
@@ -365,13 +82,16 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
{'id': 125, 'name': 'saving snapshot'},
{'id': 126, 'name': 'active snapshot'},
{'id': 127, 'name': 'killed snapshot'},
- {'id': 129, 'name': None}]
+ {'id': 128, 'name': 'deleted snapshot'},
+ {'id': 129, 'name': 'pending_delete snapshot'},
+ {'id': 130, 'name': None}]
self.assertDictListMatch(response_list, expected)
def test_get_image(self):
request = webob.Request.blank('/v1.0/images/123')
- response = request.get_response(fakes.wsgi_app())
+ app = fakes.wsgi_app(fake_auth_context=self._get_fake_context())
+ response = request.get_response(app)
self.assertEqual(200, response.status_int)
@@ -381,18 +101,19 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
"image": {
"id": 123,
"name": "public image",
- "updated": self.NOW_API_FORMAT,
- "created": self.NOW_API_FORMAT,
+ "updated": NOW_API_FORMAT,
+ "created": NOW_API_FORMAT,
"status": "ACTIVE",
"progress": 100,
},
}
- self.assertEqual(expected_image, actual_image)
+ self.assertDictMatch(expected_image, actual_image)
def test_get_image_v1_1(self):
request = webob.Request.blank('/v1.1/fake/images/124')
- response = request.get_response(fakes.wsgi_app())
+ app = fakes.wsgi_app(fake_auth_context=self._get_fake_context())
+ response = request.get_response(app)
actual_image = json.loads(response.body)
@@ -403,14 +124,14 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
expected_image = {
"image": {
- "id": 124,
+ "id": "124",
"name": "queued snapshot",
- "updated": self.NOW_API_FORMAT,
- "created": self.NOW_API_FORMAT,
- "status": "QUEUED",
+ "updated": NOW_API_FORMAT,
+ "created": NOW_API_FORMAT,
+ "status": "SAVING",
"progress": 0,
'server': {
- 'id': 42,
+ 'id': '42',
"links": [{
"rel": "self",
"href": server_href,
@@ -440,11 +161,12 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
def test_get_image_xml(self):
request = webob.Request.blank('/v1.0/images/123')
request.accept = "application/xml"
- response = request.get_response(fakes.wsgi_app())
+ app = fakes.wsgi_app(fake_auth_context=self._get_fake_context())
+ response = request.get_response(app)
actual_image = minidom.parseString(response.body.replace(" ", ""))
- expected_now = self.NOW_API_FORMAT
+ expected_now = NOW_API_FORMAT
expected_image = minidom.parseString("""
<image id="123"
name="public image"
@@ -458,15 +180,16 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
self.assertEqual(expected_image.toxml(), actual_image.toxml())
def test_get_image_xml_no_name(self):
- request = webob.Request.blank('/v1.0/images/129')
+ request = webob.Request.blank('/v1.0/images/130')
request.accept = "application/xml"
- response = request.get_response(fakes.wsgi_app())
+ app = fakes.wsgi_app(fake_auth_context=self._get_fake_context())
+ response = request.get_response(app)
actual_image = minidom.parseString(response.body.replace(" ", ""))
- expected_now = self.NOW_API_FORMAT
+ expected_now = NOW_API_FORMAT
expected_image = minidom.parseString("""
- <image id="129"
+ <image id="130"
name="None"
updated="%(expected_now)s"
created="%(expected_now)s"
@@ -501,12 +224,10 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
expected = minidom.parseString("""
<itemNotFound code="404"
- xmlns="http://docs.rackspacecloud.com/servers/api/v1.0">
- <message>
- Image not found.
- </message>
+ xmlns="http://docs.rackspacecloud.com/servers/api/v1.0">
+ <message>Image not found.</message>
</itemNotFound>
- """.replace(" ", ""))
+ """.replace(" ", "").replace("\n", ""))
actual = minidom.parseString(response.body.replace(" ", ""))
@@ -538,12 +259,10 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
# because the element hasn't changed definition
expected = minidom.parseString("""
<itemNotFound code="404"
- xmlns="http://docs.openstack.org/compute/api/v1.1">
- <message>
- Image not found.
- </message>
+ xmlns="http://docs.openstack.org/compute/api/v1.1">
+ <message>Image not found.</message>
</itemNotFound>
- """.replace(" ", ""))
+ """.replace(" ", "").replace("\n", ""))
actual = minidom.parseString(response.body.replace(" ", ""))
@@ -551,41 +270,133 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
def test_get_image_index_v1_1(self):
request = webob.Request.blank('/v1.1/fake/images')
- response = request.get_response(fakes.wsgi_app())
+ app = fakes.wsgi_app(fake_auth_context=self._get_fake_context())
+ response = request.get_response(app)
response_dict = json.loads(response.body)
response_list = response_dict["images"]
- fixtures = copy.copy(self.fixtures)
-
- for image in fixtures:
- if not self._applicable_fixture(image, "fake"):
- fixtures.remove(image)
- continue
-
- href = "http://localhost/v1.1/fake/images/%s" % image["id"]
- bookmark = "http://localhost/fake/images/%s" % image["id"]
- test_image = {
- "id": image["id"],
- "name": image["name"],
+ expected = [
+ {
+ "id": "123",
+ "name": "public image",
"links": [
{
"rel": "self",
- "href": href,
+ "href": "http://localhost/v1.1/fake/images/123",
},
{
"rel": "bookmark",
- "href": bookmark,
+ "href": "http://localhost/fake/images/123",
},
],
- }
- self.assertTrue(test_image in response_list)
+ },
+ {
+ "id": "124",
+ "name": "queued snapshot",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/fake/images/124",
+ },
+ {
+ "rel": "bookmark",
+ "href": "http://localhost/fake/images/124",
+ },
+ ],
+ },
+ {
+ "id": "125",
+ "name": "saving snapshot",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/fake/images/125",
+ },
+ {
+ "rel": "bookmark",
+ "href": "http://localhost/fake/images/125",
+ },
+ ],
+ },
+ {
+ "id": "126",
+ "name": "active snapshot",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/fake/images/126",
+ },
+ {
+ "rel": "bookmark",
+ "href": "http://localhost/fake/images/126",
+ },
+ ],
+ },
+ {
+ "id": "127",
+ "name": "killed snapshot",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/fake/images/127",
+ },
+ {
+ "rel": "bookmark",
+ "href": "http://localhost/fake/images/127",
+ },
+ ],
+ },
+ {
+ "id": "128",
+ "name": "deleted snapshot",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/fake/images/128",
+ },
+ {
+ "rel": "bookmark",
+ "href": "http://localhost/fake/images/128",
+ },
+ ],
+ },
+ {
+ "id": "129",
+ "name": "pending_delete snapshot",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/fake/images/129",
+ },
+ {
+ "rel": "bookmark",
+ "href": "http://localhost/fake/images/129",
+ },
+ ],
+ },
+ {
+ "id": "130",
+ "name": None,
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/fake/images/130",
+ },
+ {
+ "rel": "bookmark",
+ "href": "http://localhost/fake/images/130",
+ },
+ ],
+ },
+ ]
- self.assertEqual(len(response_list), len(fixtures))
+ self.assertDictListMatch(response_list, expected)
def test_get_image_details(self):
request = webob.Request.blank('/v1.0/images/detail')
- response = request.get_response(fakes.wsgi_app())
+ app = fakes.wsgi_app(fake_auth_context=self._get_fake_context())
+ response = request.get_response(app)
response_dict = json.loads(response.body)
response_list = response_dict["images"]
@@ -593,48 +404,64 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
expected = [{
'id': 123,
'name': 'public image',
- 'updated': self.NOW_API_FORMAT,
- 'created': self.NOW_API_FORMAT,
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
'status': 'ACTIVE',
'progress': 100,
},
{
'id': 124,
'name': 'queued snapshot',
- 'updated': self.NOW_API_FORMAT,
- 'created': self.NOW_API_FORMAT,
- 'status': 'QUEUED',
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
+ 'status': 'SAVING',
'progress': 0,
},
{
'id': 125,
'name': 'saving snapshot',
- 'updated': self.NOW_API_FORMAT,
- 'created': self.NOW_API_FORMAT,
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
'status': 'SAVING',
'progress': 0,
},
{
'id': 126,
'name': 'active snapshot',
- 'updated': self.NOW_API_FORMAT,
- 'created': self.NOW_API_FORMAT,
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
'status': 'ACTIVE',
'progress': 100,
},
{
'id': 127,
'name': 'killed snapshot',
- 'updated': self.NOW_API_FORMAT,
- 'created': self.NOW_API_FORMAT,
- 'status': 'FAILED',
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
+ 'status': 'ERROR',
+ 'progress': 0,
+ },
+ {
+ 'id': 128,
+ 'name': 'deleted snapshot',
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
+ 'status': 'DELETED',
'progress': 0,
},
{
'id': 129,
+ 'name': 'pending_delete snapshot',
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
+ 'status': 'DELETED',
+ 'progress': 0,
+ },
+ {
+ 'id': 130,
'name': None,
- 'updated': self.NOW_API_FORMAT,
- 'created': self.NOW_API_FORMAT,
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
'status': 'ACTIVE',
'progress': 100,
}]
@@ -643,7 +470,8 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
def test_get_image_details_v1_1(self):
request = webob.Request.blank('/v1.1/fake/images/detail')
- response = request.get_response(fakes.wsgi_app())
+ app = fakes.wsgi_app(fake_auth_context=self._get_fake_context())
+ response = request.get_response(app)
response_dict = json.loads(response.body)
response_list = response_dict["images"]
@@ -651,11 +479,11 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
server_bookmark = "http://localhost/servers/42"
expected = [{
- 'id': 123,
+ 'id': '123',
'name': 'public image',
- 'metadata': {},
- 'updated': self.NOW_API_FORMAT,
- 'created': self.NOW_API_FORMAT,
+ 'metadata': {'key1': 'value1'},
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
'status': 'ACTIVE',
'progress': 100,
"links": [{
@@ -668,18 +496,18 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
}],
},
{
- 'id': 124,
+ 'id': '124',
'name': 'queued snapshot',
'metadata': {
u'instance_ref': u'http://localhost/v1.1/servers/42',
u'user_id': u'fake',
},
- 'updated': self.NOW_API_FORMAT,
- 'created': self.NOW_API_FORMAT,
- 'status': 'QUEUED',
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
+ 'status': 'SAVING',
'progress': 0,
'server': {
- 'id': 42,
+ 'id': '42',
"links": [{
"rel": "self",
"href": server_href,
@@ -699,18 +527,18 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
}],
},
{
- 'id': 125,
+ 'id': '125',
'name': 'saving snapshot',
'metadata': {
u'instance_ref': u'http://localhost/v1.1/servers/42',
u'user_id': u'fake',
},
- 'updated': self.NOW_API_FORMAT,
- 'created': self.NOW_API_FORMAT,
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
'status': 'SAVING',
'progress': 0,
'server': {
- 'id': 42,
+ 'id': '42',
"links": [{
"rel": "self",
"href": server_href,
@@ -730,18 +558,18 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
}],
},
{
- 'id': 126,
+ 'id': '126',
'name': 'active snapshot',
'metadata': {
u'instance_ref': u'http://localhost/v1.1/servers/42',
u'user_id': u'fake',
},
- 'updated': self.NOW_API_FORMAT,
- 'created': self.NOW_API_FORMAT,
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
'status': 'ACTIVE',
'progress': 100,
'server': {
- 'id': 42,
+ 'id': '42',
"links": [{
"rel": "self",
"href": server_href,
@@ -761,18 +589,18 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
}],
},
{
- 'id': 127,
+ 'id': '127',
'name': 'killed snapshot',
'metadata': {
u'instance_ref': u'http://localhost/v1.1/servers/42',
u'user_id': u'fake',
},
- 'updated': self.NOW_API_FORMAT,
- 'created': self.NOW_API_FORMAT,
- 'status': 'FAILED',
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
+ 'status': 'ERROR',
'progress': 0,
'server': {
- 'id': 42,
+ 'id': '42',
"links": [{
"rel": "self",
"href": server_href,
@@ -792,20 +620,82 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
}],
},
{
- 'id': 129,
+ 'id': '128',
+ 'name': 'deleted snapshot',
+ 'metadata': {
+ u'instance_ref': u'http://localhost/v1.1/servers/42',
+ u'user_id': u'fake',
+ },
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
+ 'status': 'DELETED',
+ 'progress': 0,
+ 'server': {
+ 'id': '42',
+ "links": [{
+ "rel": "self",
+ "href": server_href,
+ },
+ {
+ "rel": "bookmark",
+ "href": server_bookmark,
+ }],
+ },
+ "links": [{
+ "rel": "self",
+ "href": "http://localhost/v1.1/fake/images/128",
+ },
+ {
+ "rel": "bookmark",
+ "href": "http://localhost/fake/images/128",
+ }],
+ },
+ {
+ 'id': '129',
+ 'name': 'pending_delete snapshot',
+ 'metadata': {
+ u'instance_ref': u'http://localhost/v1.1/servers/42',
+ u'user_id': u'fake',
+ },
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
+ 'status': 'DELETED',
+ 'progress': 0,
+ 'server': {
+ 'id': '42',
+ "links": [{
+ "rel": "self",
+ "href": server_href,
+ },
+ {
+ "rel": "bookmark",
+ "href": server_bookmark,
+ }],
+ },
+ "links": [{
+ "rel": "self",
+ "href": "http://localhost/v1.1/fake/images/129",
+ },
+ {
+ "rel": "bookmark",
+ "href": "http://localhost/fake/images/129",
+ }],
+ },
+ {
+ 'id': '130',
'name': None,
'metadata': {},
- 'updated': self.NOW_API_FORMAT,
- 'created': self.NOW_API_FORMAT,
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT,
'status': 'ACTIVE',
'progress': 100,
"links": [{
"rel": "self",
- "href": "http://localhost/v1.1/fake/images/129",
+ "href": "http://localhost/v1.1/fake/images/130",
},
{
"rel": "bookmark",
- "href": "http://localhost/fake/images/129",
+ "href": "http://localhost/fake/images/130",
}],
},
]
@@ -1017,11 +907,12 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
def test_get_image_found(self):
req = webob.Request.blank('/v1.0/images/123')
- res = req.get_response(fakes.wsgi_app())
+ app = fakes.wsgi_app(fake_auth_context=self._get_fake_context())
+ res = req.get_response(app)
image_meta = json.loads(res.body)['image']
expected = {'id': 123, 'name': 'public image',
- 'updated': self.NOW_API_FORMAT,
- 'created': self.NOW_API_FORMAT, 'status': 'ACTIVE',
+ 'updated': NOW_API_FORMAT,
+ 'created': NOW_API_FORMAT, 'status': 'ACTIVE',
'progress': 100}
self.assertDictMatch(image_meta, expected)
@@ -1030,14 +921,6 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 404)
- def test_get_image_not_owned(self):
- """We should return a 404 if we request an image that doesn't belong
- to us
- """
- req = webob.Request.blank('/v1.0/images/128')
- res = req.get_response(fakes.wsgi_app())
- self.assertEqual(res.status_int, 404)
-
def test_create_image(self):
body = dict(image=dict(serverId='123', name='Snapshot 1'))
req = webob.Request.blank('/v1.0/images')
@@ -1080,49 +963,6 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
response = req.get_response(fakes.wsgi_app())
self.assertEqual(400, response.status_int)
- @classmethod
- def _make_image_fixtures(cls):
- image_id = 123
- base_attrs = {'created_at': cls.NOW_GLANCE_FORMAT,
- 'updated_at': cls.NOW_GLANCE_FORMAT,
- 'deleted_at': None,
- 'deleted': False}
-
- fixtures = []
-
- def add_fixture(**kwargs):
- kwargs.update(base_attrs)
- fixtures.append(kwargs)
-
- # Public image
- add_fixture(id=image_id, name='public image', is_public=True,
- status='active', properties={})
- image_id += 1
-
- # Snapshot for User 1
- server_ref = 'http://localhost/v1.1/servers/42'
- snapshot_properties = {'instance_ref': server_ref, 'user_id': 'fake'}
- for status in ('queued', 'saving', 'active', 'killed'):
- add_fixture(id=image_id, name='%s snapshot' % status,
- is_public=False, status=status,
- properties=snapshot_properties)
- image_id += 1
-
- # Snapshot for User 2
- other_snapshot_properties = {'instance_id': '43', 'user_id': 'other'}
- add_fixture(id=image_id, name='someone elses snapshot',
- is_public=False, status='active',
- properties=other_snapshot_properties)
-
- image_id += 1
-
- # Image without a name
- add_fixture(id=image_id, is_public=True, status='active',
- properties={})
- image_id += 1
-
- return fixtures
-
class ImageXMLSerializationTest(test.TestCase):
@@ -1132,7 +972,7 @@ class ImageXMLSerializationTest(test.TestCase):
IMAGE_HREF = 'http://localhost/v1.1/fake/images/%s'
IMAGE_BOOKMARK = 'http://localhost/fake/images/%s'
- def test_show(self):
+ def test_xml_declaration(self):
serializer = images.ImageXMLSerializer()
fixture = {
@@ -1144,7 +984,7 @@ class ImageXMLSerializationTest(test.TestCase):
'status': 'ACTIVE',
'progress': 80,
'server': {
- 'id': 1,
+ 'id': '1',
'links': [
{
'href': self.SERVER_HREF,
@@ -1173,37 +1013,80 @@ class ImageXMLSerializationTest(test.TestCase):
}
output = serializer.serialize(fixture, 'show')
- actual = minidom.parseString(output.replace(" ", ""))
+ print output
+ has_dec = output.startswith("<?xml version='1.0' encoding='UTF-8'?>")
+ self.assertTrue(has_dec)
- expected_server_href = self.SERVER_HREF
- expected_server_bookmark = self.SERVER_BOOKMARK
- expected_href = self.IMAGE_HREF % 1
- expected_bookmark = self.IMAGE_BOOKMARK % 1
- expected_now = self.TIMESTAMP
- expected = minidom.parseString("""
- <image id="1"
- xmlns="http://docs.openstack.org/compute/api/v1.1"
- xmlns:atom="http://www.w3.org/2005/Atom"
- name="Image1"
- updated="%(expected_now)s"
- created="%(expected_now)s"
- status="ACTIVE"
- progress="80">
- <server id="1">
- <atom:link rel="self" href="%(expected_server_href)s"/>
- <atom:link rel="bookmark" href="%(expected_server_bookmark)s"/>
- </server>
- <metadata>
- <meta key="key1">
- value1
- </meta>
- </metadata>
- <atom:link href="%(expected_href)s" rel="self"/>
- <atom:link href="%(expected_bookmark)s" rel="bookmark"/>
- </image>
- """.replace(" ", "") % (locals()))
+ def test_show(self):
+ serializer = images.ImageXMLSerializer()
- self.assertEqual(expected.toxml(), actual.toxml())
+ fixture = {
+ 'image': {
+ 'id': 1,
+ 'name': 'Image1',
+ 'created': self.TIMESTAMP,
+ 'updated': self.TIMESTAMP,
+ 'status': 'ACTIVE',
+ 'progress': 80,
+ 'server': {
+ 'id': '1',
+ 'links': [
+ {
+ 'href': self.SERVER_HREF,
+ 'rel': 'self',
+ },
+ {
+ 'href': self.SERVER_BOOKMARK,
+ 'rel': 'bookmark',
+ },
+ ],
+ },
+ 'metadata': {
+ 'key1': 'value1',
+ },
+ 'links': [
+ {
+ 'href': self.IMAGE_HREF % 1,
+ 'rel': 'self',
+ },
+ {
+ 'href': self.IMAGE_BOOKMARK % 1,
+ 'rel': 'bookmark',
+ },
+ ],
+ },
+ }
+
+ output = serializer.serialize(fixture, 'show')
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'image')
+ image_dict = fixture['image']
+
+ for key in ['name', 'id', 'updated', 'created', 'status', 'progress']:
+ self.assertEqual(root.get(key), str(image_dict[key]))
+
+ link_nodes = root.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(image_dict['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
+
+ metadata_root = root.find('{0}metadata'.format(NS))
+ metadata_elems = metadata_root.findall('{0}meta'.format(NS))
+ self.assertEqual(len(metadata_elems), 1)
+ for i, metadata_elem in enumerate(metadata_elems):
+ (meta_key, meta_value) = image_dict['metadata'].items()[i]
+ self.assertEqual(str(metadata_elem.get('key')), str(meta_key))
+ self.assertEqual(str(metadata_elem.text).strip(), str(meta_value))
+
+ server_root = root.find('{0}server'.format(NS))
+ self.assertEqual(server_root.get('id'), image_dict['server']['id'])
+ link_nodes = server_root.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(image_dict['server']['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
def test_show_zero_metadata(self):
serializer = images.ImageXMLSerializer()
@@ -1216,7 +1099,7 @@ class ImageXMLSerializationTest(test.TestCase):
'updated': self.TIMESTAMP,
'status': 'ACTIVE',
'server': {
- 'id': 1,
+ 'id': '1',
'links': [
{
'href': self.SERVER_HREF,
@@ -1243,31 +1126,31 @@ class ImageXMLSerializationTest(test.TestCase):
}
output = serializer.serialize(fixture, 'show')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected_server_href = self.SERVER_HREF
- expected_server_bookmark = self.SERVER_BOOKMARK
- expected_href = self.IMAGE_HREF % 1
- expected_bookmark = self.IMAGE_BOOKMARK % 1
- expected_now = self.TIMESTAMP
- expected = minidom.parseString("""
- <image id="1"
- xmlns="http://docs.openstack.org/compute/api/v1.1"
- xmlns:atom="http://www.w3.org/2005/Atom"
- name="Image1"
- updated="%(expected_now)s"
- created="%(expected_now)s"
- status="ACTIVE">
- <server id="1">
- <atom:link rel="self" href="%(expected_server_href)s"/>
- <atom:link rel="bookmark" href="%(expected_server_bookmark)s"/>
- </server>
- <atom:link href="%(expected_href)s" rel="self"/>
- <atom:link href="%(expected_bookmark)s" rel="bookmark"/>
- </image>
- """.replace(" ", "") % (locals()))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'image')
+ image_dict = fixture['image']
+
+ for key in ['name', 'id', 'updated', 'created', 'status']:
+ self.assertEqual(root.get(key), str(image_dict[key]))
+
+ link_nodes = root.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(image_dict['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
+
+ metadata_root = root.find('{0}metadata'.format(NS))
+ meta_nodes = root.findall('{0}meta'.format(ATOMNS))
+ self.assertEqual(len(meta_nodes), 0)
+
+ server_root = root.find('{0}server'.format(NS))
+ self.assertEqual(server_root.get('id'), image_dict['server']['id'])
+ link_nodes = server_root.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(image_dict['server']['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
def test_show_image_no_metadata_key(self):
serializer = images.ImageXMLSerializer()
@@ -1280,7 +1163,7 @@ class ImageXMLSerializationTest(test.TestCase):
'updated': self.TIMESTAMP,
'status': 'ACTIVE',
'server': {
- 'id': 1,
+ 'id': '1',
'links': [
{
'href': self.SERVER_HREF,
@@ -1306,31 +1189,31 @@ class ImageXMLSerializationTest(test.TestCase):
}
output = serializer.serialize(fixture, 'show')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected_server_href = self.SERVER_HREF
- expected_server_bookmark = self.SERVER_BOOKMARK
- expected_href = self.IMAGE_HREF % 1
- expected_bookmark = self.IMAGE_BOOKMARK % 1
- expected_now = self.TIMESTAMP
- expected = minidom.parseString("""
- <image id="1"
- xmlns="http://docs.openstack.org/compute/api/v1.1"
- xmlns:atom="http://www.w3.org/2005/Atom"
- name="Image1"
- updated="%(expected_now)s"
- created="%(expected_now)s"
- status="ACTIVE">
- <server id="1">
- <atom:link rel="self" href="%(expected_server_href)s"/>
- <atom:link rel="bookmark" href="%(expected_server_bookmark)s"/>
- </server>
- <atom:link href="%(expected_href)s" rel="self"/>
- <atom:link href="%(expected_bookmark)s" rel="bookmark"/>
- </image>
- """.replace(" ", "") % (locals()))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'image')
+ image_dict = fixture['image']
+
+ for key in ['name', 'id', 'updated', 'created', 'status']:
+ self.assertEqual(root.get(key), str(image_dict[key]))
+
+ link_nodes = root.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(image_dict['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
+
+ metadata_root = root.find('{0}metadata'.format(NS))
+ meta_nodes = root.findall('{0}meta'.format(ATOMNS))
+ self.assertEqual(len(meta_nodes), 0)
+
+ server_root = root.find('{0}server'.format(NS))
+ self.assertEqual(server_root.get('id'), image_dict['server']['id'])
+ link_nodes = server_root.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(image_dict['server']['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
def test_show_no_server(self):
serializer = images.ImageXMLSerializer()
@@ -1359,30 +1242,30 @@ class ImageXMLSerializationTest(test.TestCase):
}
output = serializer.serialize(fixture, 'show')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected_href = self.IMAGE_HREF % 1
- expected_bookmark = self.IMAGE_BOOKMARK % 1
- expected_now = self.TIMESTAMP
- expected = minidom.parseString("""
- <image id="1"
- xmlns="http://docs.openstack.org/compute/api/v1.1"
- xmlns:atom="http://www.w3.org/2005/Atom"
- name="Image1"
- updated="%(expected_now)s"
- created="%(expected_now)s"
- status="ACTIVE">
- <metadata>
- <meta key="key1">
- value1
- </meta>
- </metadata>
- <atom:link href="%(expected_href)s" rel="self"/>
- <atom:link href="%(expected_bookmark)s" rel="bookmark"/>
- </image>
- """.replace(" ", "") % (locals()))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'image')
+ image_dict = fixture['image']
+
+ for key in ['name', 'id', 'updated', 'created', 'status']:
+ self.assertEqual(root.get(key), str(image_dict[key]))
+
+ link_nodes = root.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(image_dict['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
+
+ metadata_root = root.find('{0}metadata'.format(NS))
+ metadata_elems = metadata_root.findall('{0}meta'.format(NS))
+ self.assertEqual(len(metadata_elems), 1)
+ for i, metadata_elem in enumerate(metadata_elems):
+ (meta_key, meta_value) = image_dict['metadata'].items()[i]
+ self.assertEqual(str(metadata_elem.get('key')), str(meta_key))
+ self.assertEqual(str(metadata_elem.text).strip(), str(meta_value))
+
+ server_root = root.find('{0}server'.format(NS))
+ self.assertEqual(server_root, None)
def test_index(self):
serializer = images.ImageXMLSerializer()
@@ -1397,6 +1280,10 @@ class ImageXMLSerializationTest(test.TestCase):
'href': self.IMAGE_HREF % 1,
'rel': 'self',
},
+ {
+ 'href': self.IMAGE_BOOKMARK % 1,
+ 'rel': 'bookmark',
+ },
],
},
{
@@ -1407,35 +1294,32 @@ class ImageXMLSerializationTest(test.TestCase):
'href': self.IMAGE_HREF % 2,
'rel': 'self',
},
+ {
+ 'href': self.IMAGE_BOOKMARK % 2,
+ 'rel': 'bookmark',
+ },
],
},
]
}
output = serializer.serialize(fixture, 'index')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected_server_href = self.SERVER_HREF
- expected_server_bookmark = self.SERVER_BOOKMARK
- expected_href = self.IMAGE_HREF % 1
- expected_bookmark = self.IMAGE_BOOKMARK % 1
- expected_href_two = self.IMAGE_HREF % 2
- expected_bookmark_two = self.IMAGE_BOOKMARK % 2
- expected_now = self.TIMESTAMP
- expected = minidom.parseString("""
- <images
- xmlns="http://docs.openstack.org/compute/api/v1.1"
- xmlns:atom="http://www.w3.org/2005/Atom">
- <image id="1" name="Image1">
- <atom:link href="%(expected_href)s" rel="self"/>
- </image>
- <image id="2" name="Image2">
- <atom:link href="%(expected_href_two)s" rel="self"/>
- </image>
- </images>
- """.replace(" ", "") % (locals()))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'images_index')
+ image_elems = root.findall('{0}image'.format(NS))
+ self.assertEqual(len(image_elems), 2)
+ for i, image_elem in enumerate(image_elems):
+ image_dict = fixture['images'][i]
+
+ for key in ['name', 'id']:
+ self.assertEqual(image_elem.get(key), str(image_dict[key]))
+
+ link_nodes = image_elem.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(image_dict['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
def test_index_zero_images(self):
serializer = images.ImageXMLSerializer()
@@ -1445,15 +1329,11 @@ class ImageXMLSerializationTest(test.TestCase):
}
output = serializer.serialize(fixtures, 'index')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected = minidom.parseString("""
- <images
- xmlns="http://docs.openstack.org/compute/api/v1.1"
- xmlns:atom="http://www.w3.org/2005/Atom" />
- """.replace(" ", "") % (locals()))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'images_index')
+ image_elems = root.findall('{0}image'.format(NS))
+ self.assertEqual(len(image_elems), 0)
def test_detail(self):
serializer = images.ImageXMLSerializer()
@@ -1467,7 +1347,7 @@ class ImageXMLSerializationTest(test.TestCase):
'updated': self.TIMESTAMP,
'status': 'ACTIVE',
'server': {
- 'id': 1,
+ 'id': '1',
'links': [
{
'href': self.SERVER_HREF,
@@ -1491,7 +1371,7 @@ class ImageXMLSerializationTest(test.TestCase):
],
},
{
- 'id': 2,
+ 'id': '2',
'name': 'Image2',
'created': self.TIMESTAMP,
'updated': self.TIMESTAMP,
@@ -1515,46 +1395,22 @@ class ImageXMLSerializationTest(test.TestCase):
}
output = serializer.serialize(fixture, 'detail')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected_server_href = self.SERVER_HREF
- expected_server_bookmark = self.SERVER_BOOKMARK
- expected_href = self.IMAGE_HREF % 1
- expected_bookmark = self.IMAGE_BOOKMARK % 1
- expected_href_two = self.IMAGE_HREF % 2
- expected_bookmark_two = self.IMAGE_BOOKMARK % 2
- expected_now = self.TIMESTAMP
- expected = minidom.parseString("""
- <images
- xmlns="http://docs.openstack.org/compute/api/v1.1"
- xmlns:atom="http://www.w3.org/2005/Atom">
- <image id="1"
- name="Image1"
- updated="%(expected_now)s"
- created="%(expected_now)s"
- status="ACTIVE">
- <server id="1">
- <atom:link rel="self" href="%(expected_server_href)s"/>
- <atom:link rel="bookmark" href="%(expected_server_bookmark)s"/>
- </server>
- <atom:link href="%(expected_href)s" rel="self"/>
- <atom:link href="%(expected_bookmark)s" rel="bookmark"/>
- </image>
- <image id="2"
- name="Image2"
- updated="%(expected_now)s"
- created="%(expected_now)s"
- status="SAVING"
- progress="80">
- <metadata>
- <meta key="key1">
- value1
- </meta>
- </metadata>
- <atom:link href="%(expected_href_two)s" rel="self"/>
- <atom:link href="%(expected_bookmark_two)s" rel="bookmark"/>
- </image>
- </images>
- """.replace(" ", "") % (locals()))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'images')
+ image_elems = root.findall('{0}image'.format(NS))
+ self.assertEqual(len(image_elems), 2)
+ for i, image_elem in enumerate(image_elems):
+ image_dict = fixture['images'][i]
+
+ for key in ['name', 'id', 'updated', 'created', 'status']:
+ self.assertEqual(image_elem.get(key), str(image_dict[key]))
+
+ link_nodes = image_elem.findall('{0}link'.format(ATOMNS))
+ self.assertEqual(len(link_nodes), 2)
+ for i, link in enumerate(image_dict['links']):
+ for key, value in link.items():
+ self.assertEqual(link_nodes[i].get(key), value)
+
+ metadata_root = image_elem.find('{0}metadata'.format(NS))
+ metadata_elems = metadata_root.findall('{0}meta'.format(NS))
diff --git a/nova/tests/api/openstack/test_limits.py b/nova/tests/api/openstack/test_limits.py
index 801b06230..6f0210c27 100644
--- a/nova/tests/api/openstack/test_limits.py
+++ b/nova/tests/api/openstack/test_limits.py
@@ -19,6 +19,7 @@ Tests dealing with HTTP rate-limiting.
import httplib
import json
+from lxml import etree
import StringIO
import stubout
import time
@@ -29,6 +30,7 @@ from xml.dom import minidom
import nova.context
from nova.api.openstack import limits
from nova.api.openstack import views
+from nova.api.openstack import xmlutil
from nova import test
@@ -39,6 +41,10 @@ TEST_LIMITS = [
limits.Limit("PUT", "*", "", 10, limits.PER_MINUTE),
limits.Limit("PUT", "/servers", "^/servers", 5, limits.PER_MINUTE),
]
+NS = {
+ 'atom': 'http://www.w3.org/2005/Atom',
+ 'ns': 'http://docs.openstack.org/compute/api/v1.1'
+}
class BaseLimitTestSuite(unittest.TestCase):
@@ -168,12 +174,11 @@ class LimitsControllerV10Test(BaseLimitTestSuite):
response = request.get_response(self.controller)
expected = minidom.parseString("""
- <limits
- xmlns="http://docs.rackspacecloud.com/servers/api/v1.0">
+ <limits xmlns="http://docs.rackspacecloud.com/servers/api/v1.0">
<rate/>
<absolute/>
</limits>
- """.replace(" ", ""))
+ """.replace(" ", "").replace("\n", ""))
body = minidom.parseString(response.body.replace(" ", ""))
@@ -186,17 +191,16 @@ class LimitsControllerV10Test(BaseLimitTestSuite):
response = request.get_response(self.controller)
expected = minidom.parseString("""
- <limits
- xmlns="http://docs.rackspacecloud.com/servers/api/v1.0">
+ <limits xmlns="http://docs.rackspacecloud.com/servers/api/v1.0">
<rate>
<limit URI="*" regex=".*" remaining="10" resetTime="0"
- unit="MINUTE" value="10" verb="GET"/>
+ unit="MINUTE" value="10" verb="GET"/>
<limit URI="*" regex=".*" remaining="5" resetTime="0"
- unit="HOUR" value="5" verb="POST"/>
+ unit="HOUR" value="5" verb="POST"/>
</rate>
<absolute/>
</limits>
- """.replace(" ", ""))
+ """.replace(" ", "").replace("\n", ""))
body = minidom.parseString(response.body.replace(" ", ""))
self.assertEqual(expected.toxml(), body.toxml())
@@ -980,9 +984,22 @@ class LimitsXMLSerializationTest(test.TestCase):
def tearDown(self):
pass
- def test_index(self):
+ def test_xml_declaration(self):
serializer = limits.LimitsXMLSerializer()
+
fixture = {"limits": {
+ "rate": [],
+ "absolute": {}}}
+
+ output = serializer.serialize(fixture, 'index')
+ print output
+ has_dec = output.startswith("<?xml version='1.0' encoding='UTF-8'?>")
+ self.assertTrue(has_dec)
+
+ def test_index(self):
+ serializer = limits.LimitsXMLSerializer()
+ fixture = {
+ "limits": {
"rate": [{
"uri": "*",
"regex": ".*",
@@ -1006,32 +1023,32 @@ class LimitsXMLSerializationTest(test.TestCase):
"maxPersonalitySize": 10240}}}
output = serializer.serialize(fixture, 'index')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected = minidom.parseString("""
- <limits xmlns="http://docs.openstack.org/compute/api/v1.1">
- <rates>
- <rate uri="*" regex=".*">
- <limit value="10" verb="POST" remaining="2"
- unit="MINUTE"
- next-available="2011-12-15T22:42:45Z"/>
- </rate>
- <rate uri="*/servers" regex="^/servers">
- <limit value="50" verb="POST" remaining="10"
- unit="DAY"
- next-available="2011-12-15T22:42:45Z"/>
- </rate>
- </rates>
- <absolute>
- <limit name="maxServerMeta" value="1"/>
- <limit name="maxPersonality" value="5"/>
- <limit name="maxImageMeta" value="1"/>
- <limit name="maxPersonalitySize" value="10240"/>
- </absolute>
- </limits>
- """.replace(" ", ""))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'limits')
+
+ #verify absolute limits
+ absolutes = root.xpath('ns:absolute/ns:limit', namespaces=NS)
+ self.assertEqual(len(absolutes), 4)
+ for limit in absolutes:
+ name = limit.get('name')
+ value = limit.get('value')
+ self.assertEqual(value, str(fixture['limits']['absolute'][name]))
+
+ #verify rate limits
+ rates = root.xpath('ns:rates/ns:rate', namespaces=NS)
+ self.assertEqual(len(rates), 2)
+ for i, rate in enumerate(rates):
+ for key in ['uri', 'regex']:
+ self.assertEqual(rate.get(key),
+ str(fixture['limits']['rate'][i][key]))
+ rate_limits = rate.xpath('ns:limit', namespaces=NS)
+ self.assertEqual(len(rate_limits), 1)
+ for j, limit in enumerate(rate_limits):
+ for key in ['verb', 'value', 'remaining', 'unit',
+ 'next-available']:
+ self.assertEqual(limit.get(key),
+ str(fixture['limits']['rate'][i]['limit'][j][key]))
def test_index_no_limits(self):
serializer = limits.LimitsXMLSerializer()
@@ -1041,13 +1058,14 @@ class LimitsXMLSerializationTest(test.TestCase):
"absolute": {}}}
output = serializer.serialize(fixture, 'index')
- actual = minidom.parseString(output.replace(" ", ""))
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'limits')
- expected = minidom.parseString("""
- <limits xmlns="http://docs.openstack.org/compute/api/v1.1">
- <rates />
- <absolute />
- </limits>
- """.replace(" ", ""))
+ #verify absolute limits
+ absolutes = root.xpath('ns:absolute/ns:limit', namespaces=NS)
+ self.assertEqual(len(absolutes), 0)
- self.assertEqual(expected.toxml(), actual.toxml())
+ #verify rate limits
+ rates = root.xpath('ns:rates/ns:rate', namespaces=NS)
+ self.assertEqual(len(rates), 0)
diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py
index 1591ea56c..ee7927c64 100644
--- a/nova/tests/api/openstack/test_servers.py
+++ b/nova/tests/api/openstack/test_servers.py
@@ -52,6 +52,10 @@ from nova.tests.api.openstack import fakes
FAKE_UUID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
NS = "{http://docs.openstack.org/compute/api/v1.1}"
ATOMNS = "{http://www.w3.org/2005/Atom}"
+XPATH_NS = {
+ 'atom': 'http://www.w3.org/2005/Atom',
+ 'ns': 'http://docs.openstack.org/compute/api/v1.1'
+}
def fake_gen_uuid():
@@ -347,6 +351,8 @@ class ServersTest(test.TestCase):
"server": {
"id": 1,
"uuid": FAKE_UUID,
+ "user_id": "fake",
+ "tenant_id": "fake",
"updated": "2010-11-11T11:00:00Z",
"created": "2010-10-10T12:00:00Z",
"progress": 0,
@@ -410,12 +416,7 @@ class ServersTest(test.TestCase):
def test_get_server_by_id_v1_1_xml(self):
image_bookmark = "http://localhost/fake/images/10"
- flavor_ref = "http://localhost/v1.1/fake/flavors/1"
- flavor_id = "1"
flavor_bookmark = "http://localhost/fake/flavors/1"
- server_href = "http://localhost/v1.1/fake/servers/1"
- server_bookmark = "http://localhost/fake/servers/1"
-
public_ip = '192.168.0.3'
private_ip = '172.19.0.1'
interfaces = [
@@ -439,48 +440,88 @@ class ServersTest(test.TestCase):
req = webob.Request.blank('/v1.1/fake/servers/1')
req.headers['Accept'] = 'application/xml'
res = req.get_response(fakes.wsgi_app())
- actual = minidom.parseString(res.body.replace(' ', ''))
- expected_uuid = FAKE_UUID
- expected_updated = "2010-11-11T11:00:00Z"
- expected_created = "2010-10-10T12:00:00Z"
- expected = minidom.parseString("""
- <server id="1"
- uuid="%(expected_uuid)s"
- xmlns="http://docs.openstack.org/compute/api/v1.1"
- xmlns:atom="http://www.w3.org/2005/Atom"
- name="server1"
- updated="%(expected_updated)s"
- created="%(expected_created)s"
- hostId=""
- status="BUILD"
- accessIPv4=""
- accessIPv6=""
- progress="0">
- <atom:link href="%(server_href)s" rel="self"/>
- <atom:link href="%(server_bookmark)s" rel="bookmark"/>
- <image id="10">
- <atom:link rel="bookmark" href="%(image_bookmark)s"/>
- </image>
- <flavor id="1">
- <atom:link rel="bookmark" href="%(flavor_bookmark)s"/>
- </flavor>
- <metadata>
- <meta key="seq">
- 1
- </meta>
- </metadata>
- <addresses>
- <network id="public">
- <ip version="4" addr="%(public_ip)s"/>
- </network>
- <network id="private">
- <ip version="4" addr="%(private_ip)s"/>
- </network>
- </addresses>
- </server>
- """.replace(" ", "") % (locals()))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ output = res.body
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'server')
+
+ expected = {
+ 'id': 1,
+ 'uuid': FAKE_UUID,
+ 'user_id': 'fake',
+ 'tenant_id': 'fake',
+ 'updated': '2010-11-11T11:00:00Z',
+ 'created': '2010-10-10T12:00:00Z',
+ 'progress': 0,
+ 'name': 'server1',
+ 'status': 'BUILD',
+ 'accessIPv4': '',
+ 'accessIPv6': '',
+ 'hostId': '',
+ 'key_name': '',
+ 'image': {
+ 'id': '10',
+ 'links': [{'rel': 'bookmark', 'href': image_bookmark}],
+ },
+ 'flavor': {
+ 'id': '1',
+ 'links': [{'rel': 'bookmark', 'href': flavor_bookmark}],
+ },
+ 'addresses': {
+ 'public': [{'version': 4, 'addr': public_ip}],
+ 'private': [{'version': 4, 'addr': private_ip}],
+ },
+ 'metadata': {'seq': '1'},
+ 'config_drive': None,
+ 'links': [
+ {
+ 'rel': 'self',
+ 'href': 'http://localhost/v1.1/fake/servers/1',
+ },
+ {
+ 'rel': 'bookmark',
+ 'href': 'http://localhost/fake/servers/1',
+ },
+ ],
+ }
+
+ self.assertTrue(root.xpath('/ns:server', namespaces=XPATH_NS))
+ for key in ['id', 'uuid', 'created', 'progress', 'name', 'status',
+ 'accessIPv4', 'accessIPv6', 'hostId']:
+ self.assertEqual(root.get(key), str(expected[key]))
+ self.assertEqual(root.get('userId'), str(expected['user_id']))
+ self.assertEqual(root.get('tenantId'), str(expected['tenant_id']))
+
+ (image,) = root.xpath('ns:image', namespaces=XPATH_NS)
+ self.assertEqual(image.get('id'), str(expected['image']['id']))
+
+ links = root.xpath('ns:image/atom:link', namespaces=XPATH_NS)
+ self.assertTrue(common.compare_links(links,
+ expected['image']['links']))
+
+ (flavor,) = root.xpath('ns:flavor', namespaces=XPATH_NS)
+ self.assertEqual(flavor.get('id'), str(expected['flavor']['id']))
+
+ (meta,) = root.xpath('ns:metadata/ns:meta', namespaces=XPATH_NS)
+ self.assertEqual(meta.get('key'), 'seq')
+ self.assertEqual(meta.text, '1')
+
+ (pub_network, priv_network) = root.xpath('ns:addresses/ns:network',
+ namespaces=XPATH_NS)
+ self.assertEqual(pub_network.get('id'), 'public')
+ (pub_ip,) = pub_network.xpath('ns:ip', namespaces=XPATH_NS)
+ (priv_ip,) = priv_network.xpath('ns:ip', namespaces=XPATH_NS)
+ self.assertEqual(pub_ip.get('version'),
+ str(expected['addresses']['public'][0]['version']))
+ self.assertEqual(pub_ip.get('addr'),
+ str(expected['addresses']['public'][0]['addr']))
+ self.assertEqual(priv_ip.get('version'),
+ str(expected['addresses']['private'][0]['version']))
+ self.assertEqual(priv_ip.get('addr'),
+ str(expected['addresses']['private'][0]['addr']))
+
+ links = root.xpath('atom:link', namespaces=XPATH_NS)
+ self.assertTrue(common.compare_links(links, expected['links']))
def test_get_server_with_active_status_by_id_v1_1(self):
image_bookmark = "http://localhost/fake/images/10"
@@ -515,6 +556,8 @@ class ServersTest(test.TestCase):
"server": {
"id": 1,
"uuid": FAKE_UUID,
+ "user_id": "fake",
+ "tenant_id": "fake",
"updated": "2010-11-11T11:00:00Z",
"created": "2010-10-10T12:00:00Z",
"progress": 100,
@@ -610,6 +653,8 @@ class ServersTest(test.TestCase):
"server": {
"id": 1,
"uuid": FAKE_UUID,
+ "user_id": "fake",
+ "tenant_id": "fake",
"updated": "2010-11-11T11:00:00Z",
"created": "2010-10-10T12:00:00Z",
"progress": 100,
@@ -1199,6 +1244,26 @@ class ServersTest(test.TestCase):
self.assertEqual(len(servers), 1)
self.assertEqual(servers[0]['id'], 100)
+ def test_tenant_id_filter_converts_to_project_id_for_admin(self):
+ def fake_get_all(context, filters=None):
+ self.assertNotEqual(filters, None)
+ self.assertEqual(filters['project_id'], 'faketenant')
+ self.assertFalse(filters.get('tenant_id'))
+ return [stub_instance(100)]
+
+ self.stubs.Set(nova.db.api, 'instance_get_all_by_filters',
+ fake_get_all)
+ self.flags(allow_admin_api=True)
+
+ req = webob.Request.blank('/v1.1/fake/servers?tenant_id=faketenant')
+ # Use admin context
+ context = nova.context.RequestContext('testuser', 'testproject',
+ is_admin=True)
+ res = req.get_response(fakes.wsgi_app(fake_auth_context=context))
+ res_dict = json.loads(res.body)
+ # Failure in fake_get_all returns non 200 status code
+ self.assertEqual(res.status_int, 200)
+
def test_get_servers_allows_flavor_v1_1(self):
def fake_get_all(compute_self, context, search_opts=None):
self.assertNotEqual(search_opts, None)
@@ -1455,6 +1520,8 @@ class ServersTest(test.TestCase):
'access_ip_v4': '1.2.3.4',
'access_ip_v6': 'fead::1234',
'image_ref': image_ref,
+ 'user_id': 'fake',
+ 'project_id': 'fake',
"created_at": datetime.datetime(2010, 10, 10, 12, 0, 0),
"updated_at": datetime.datetime(2010, 11, 11, 11, 0, 0),
"config_drive": self.config_drive,
@@ -3103,7 +3170,7 @@ class TestServerCreateRequestXMLDeserializerV11(test.TestCase):
"name": "new-server-test",
"imageRef": "1",
"flavorRef": "1",
- "networks": []
+ "networks": [],
}}
self.assertEquals(request['body'], expected)
@@ -3255,7 +3322,7 @@ class TestAddressesXMLSerialization(test.TestCase):
serializer = nova.api.openstack.ips.IPXMLSerializer()
- def test_show(self):
+ def test_xml_declaration(self):
fixture = {
'network_2': [
{'addr': '192.168.0.1', 'version': 4},
@@ -3263,17 +3330,29 @@ class TestAddressesXMLSerialization(test.TestCase):
],
}
output = self.serializer.serialize(fixture, 'show')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected = minidom.parseString("""
- <network xmlns="http://docs.openstack.org/compute/api/v1.1"
- id="network_2">
- <ip version="4" addr="192.168.0.1"/>
- <ip version="6" addr="fe80::beef"/>
- </network>
- """.replace(" ", ""))
+ print output
+ has_dec = output.startswith("<?xml version='1.0' encoding='UTF-8'?>")
+ self.assertTrue(has_dec)
- self.assertEqual(expected.toxml(), actual.toxml())
+ def test_show(self):
+ fixture = {
+ 'network_2': [
+ {'addr': '192.168.0.1', 'version': 4},
+ {'addr': 'fe80::beef', 'version': 6},
+ ],
+ }
+ output = self.serializer.serialize(fixture, 'show')
+ print output
+ root = etree.XML(output)
+ network = fixture['network_2']
+ self.assertEqual(str(root.get('id')), 'network_2')
+ ip_elems = root.findall('{0}ip'.format(NS))
+ for z, ip_elem in enumerate(ip_elems):
+ ip = network[z]
+ self.assertEqual(str(ip_elem.get('version')),
+ str(ip['version']))
+ self.assertEqual(str(ip_elem.get('addr')),
+ str(ip['addr']))
def test_index(self):
fixture = {
@@ -3289,22 +3368,22 @@ class TestAddressesXMLSerialization(test.TestCase):
},
}
output = self.serializer.serialize(fixture, 'index')
- actual = minidom.parseString(output.replace(" ", ""))
-
- expected = minidom.parseString("""
- <addresses xmlns="http://docs.openstack.org/compute/api/v1.1">
- <network id="network_2">
- <ip version="4" addr="192.168.0.1"/>
- <ip version="6" addr="fe80::beef"/>
- </network>
- <network id="network_1">
- <ip version="4" addr="192.168.0.3"/>
- <ip version="4" addr="192.168.0.5"/>
- </network>
- </addresses>
- """.replace(" ", ""))
-
- self.assertEqual(expected.toxml(), actual.toxml())
+ print output
+ root = etree.XML(output)
+ xmlutil.validate_schema(root, 'addresses')
+ addresses_dict = fixture['addresses']
+ network_elems = root.findall('{0}network'.format(NS))
+ self.assertEqual(len(network_elems), 2)
+ for i, network_elem in enumerate(network_elems):
+ network = addresses_dict.items()[i]
+ self.assertEqual(str(network_elem.get('id')), str(network[0]))
+ ip_elems = network_elem.findall('{0}ip'.format(NS))
+ for z, ip_elem in enumerate(ip_elems):
+ ip = network[1][z]
+ self.assertEqual(str(ip_elem.get('version')),
+ str(ip['version']))
+ self.assertEqual(str(ip_elem.get('addr')),
+ str(ip['addr']))
class TestServerInstanceCreation(test.TestCase):
@@ -3330,6 +3409,8 @@ class TestServerInstanceCreation(test.TestCase):
self.injected_files = None
return [{'id': '1234', 'display_name': 'fakeinstance',
+ 'user_id': 'fake',
+ 'project_id': 'fake',
'uuid': FAKE_UUID}]
def set_admin_password(self, *args, **kwargs):
@@ -3583,10 +3664,14 @@ class TestGetKernelRamdiskFromImage(test.TestCase):
self.assertRaises(exception.NotFound, self._get_k_r, image_meta)
def test_ami_no_ramdisk(self):
- """If an ami is missing a ramdisk it should raise NotFound"""
+ """If an ami is missing a ramdisk, return kernel ID and None for
+ ramdisk ID
+ """
image_meta = {'id': 1, 'status': 'active', 'container_format': 'ami',
'properties': {'kernel_id': 1}}
- self.assertRaises(exception.NotFound, self._get_k_r, image_meta)
+ kernel_id, ramdisk_id = self._get_k_r(image_meta)
+ self.assertEqual(kernel_id, 1)
+ self.assertEqual(ramdisk_id, None)
def test_ami_kernel_ramdisk_present(self):
"""Return IDs if both kernel and ramdisk are present"""
@@ -3621,8 +3706,8 @@ class ServersViewBuilderV11Test(test.TestCase):
"created_at": created_at,
"updated_at": updated_at,
"admin_pass": "",
- "user_id": "",
- "project_id": "",
+ "user_id": "fake",
+ "project_id": "fake",
"image_ref": "5",
"kernel_id": "",
"ramdisk_id": "",
@@ -3647,7 +3732,6 @@ class ServersViewBuilderV11Test(test.TestCase):
"terminated_at": utils.utcnow(),
"availability_zone": "",
"display_name": "test_server",
- "display_description": "",
"locked": False,
"metadata": [],
"accessIPv4": "1.2.3.4",
@@ -3680,7 +3764,6 @@ class ServersViewBuilderV11Test(test.TestCase):
"id": 1,
"uuid": self.instance['uuid'],
"name": "test_server",
- "key_name": '',
"links": [
{
"rel": "self",
@@ -3691,7 +3774,6 @@ class ServersViewBuilderV11Test(test.TestCase):
"href": "http://localhost/servers/1",
},
],
- "config_drive": None,
}
}
@@ -3704,8 +3786,6 @@ class ServersViewBuilderV11Test(test.TestCase):
"id": 1,
"uuid": self.instance['uuid'],
"name": "test_server",
- "key_name": '',
- "config_drive": None,
"links": [
{
"rel": "self",
@@ -3730,6 +3810,8 @@ class ServersViewBuilderV11Test(test.TestCase):
"server": {
"id": 1,
"uuid": self.instance['uuid'],
+ "user_id": "fake",
+ "tenant_id": "fake",
"updated": "2010-11-11T11:00:00Z",
"created": "2010-10-10T12:00:00Z",
"progress": 0,
@@ -3785,6 +3867,8 @@ class ServersViewBuilderV11Test(test.TestCase):
"server": {
"id": 1,
"uuid": self.instance['uuid'],
+ "user_id": "fake",
+ "tenant_id": "fake",
"updated": "2010-11-11T11:00:00Z",
"created": "2010-10-10T12:00:00Z",
"progress": 100,
@@ -3841,6 +3925,8 @@ class ServersViewBuilderV11Test(test.TestCase):
"server": {
"id": 1,
"uuid": self.instance['uuid'],
+ "user_id": "fake",
+ "tenant_id": "fake",
"updated": "2010-11-11T11:00:00Z",
"created": "2010-10-10T12:00:00Z",
"progress": 0,
@@ -3897,6 +3983,8 @@ class ServersViewBuilderV11Test(test.TestCase):
"server": {
"id": 1,
"uuid": self.instance['uuid'],
+ "user_id": "fake",
+ "tenant_id": "fake",
"updated": "2010-11-11T11:00:00Z",
"created": "2010-10-10T12:00:00Z",
"progress": 0,
@@ -3956,6 +4044,8 @@ class ServersViewBuilderV11Test(test.TestCase):
"server": {
"id": 1,
"uuid": self.instance['uuid'],
+ "user_id": "fake",
+ "tenant_id": "fake",
"updated": "2010-11-11T11:00:00Z",
"created": "2010-10-10T12:00:00Z",
"progress": 0,
@@ -4018,12 +4108,93 @@ class ServerXMLSerializationTest(test.TestCase):
self.maxDiff = None
test.TestCase.setUp(self)
+ def test_xml_declaration(self):
+ serializer = servers.ServerXMLSerializer()
+
+ fixture = {
+ "server": {
+ 'id': 1,
+ 'uuid': FAKE_UUID,
+ 'user_id': 'fake_user_id',
+ 'tenant_id': 'fake_tenant_id',
+ 'created': self.TIMESTAMP,
+ 'updated': self.TIMESTAMP,
+ "progress": 0,
+ "name": "test_server",
+ "status": "BUILD",
+ "hostId": 'e4d909c290d0fb1ca068ffaddf22cbd0',
+ "accessIPv4": "1.2.3.4",
+ "accessIPv6": "fead::1234",
+ "image": {
+ "id": "5",
+ "links": [
+ {
+ "rel": "bookmark",
+ "href": self.IMAGE_BOOKMARK,
+ },
+ ],
+ },
+ "flavor": {
+ "id": "1",
+ "links": [
+ {
+ "rel": "bookmark",
+ "href": self.FLAVOR_BOOKMARK,
+ },
+ ],
+ },
+ "addresses": {
+ "network_one": [
+ {
+ "version": 4,
+ "addr": "67.23.10.138",
+ },
+ {
+ "version": 6,
+ "addr": "::babe:67.23.10.138",
+ },
+ ],
+ "network_two": [
+ {
+ "version": 4,
+ "addr": "67.23.10.139",
+ },
+ {
+ "version": 6,
+ "addr": "::babe:67.23.10.139",
+ },
+ ],
+ },
+ "metadata": {
+ "Open": "Stack",
+ "Number": "1",
+ },
+ 'links': [
+ {
+ 'href': self.SERVER_HREF,
+ 'rel': 'self',
+ },
+ {
+ 'href': self.SERVER_BOOKMARK,
+ 'rel': 'bookmark',
+ },
+ ],
+ }
+ }
+
+ output = serializer.serialize(fixture, 'show')
+ print output
+ has_dec = output.startswith("<?xml version='1.0' encoding='UTF-8'?>")
+ self.assertTrue(has_dec)
+
def test_show(self):
serializer = servers.ServerXMLSerializer()
fixture = {
"server": {
"id": 1,
+ "user_id": "fake",
+ "tenant_id": "fake",
"uuid": FAKE_UUID,
'created': self.TIMESTAMP,
'updated': self.TIMESTAMP,
@@ -4161,6 +4332,8 @@ class ServerXMLSerializationTest(test.TestCase):
"server": {
"id": 1,
"uuid": FAKE_UUID,
+ "user_id": "fake",
+ "tenant_id": "fake",
'created': self.TIMESTAMP,
'updated': self.TIMESTAMP,
"progress": 0,
@@ -4361,6 +4534,8 @@ class ServerXMLSerializationTest(test.TestCase):
{
"id": 1,
"uuid": FAKE_UUID,
+ "user_id": "fake",
+ "tenant_id": "fake",
'created': self.TIMESTAMP,
'updated': self.TIMESTAMP,
"progress": 0,
@@ -4416,6 +4591,8 @@ class ServerXMLSerializationTest(test.TestCase):
{
"id": 2,
"uuid": FAKE_UUID,
+ "user_id": 'fake',
+ "tenant_id": 'fake',
'created': self.TIMESTAMP,
'updated': self.TIMESTAMP,
"progress": 100,
@@ -4535,6 +4712,8 @@ class ServerXMLSerializationTest(test.TestCase):
fixture = {
"server": {
"id": 1,
+ "user_id": "fake",
+ "tenant_id": "fake",
"uuid": FAKE_UUID,
'created': self.TIMESTAMP,
'updated': self.TIMESTAMP,
@@ -4671,6 +4850,8 @@ class ServerXMLSerializationTest(test.TestCase):
"server": {
"id": 1,
"uuid": FAKE_UUID,
+ "user_id": "fake",
+ "tenant_id": "fake",
'created': self.TIMESTAMP,
'updated': self.TIMESTAMP,
"progress": 0,
diff --git a/nova/tests/api/openstack/test_versions.py b/nova/tests/api/openstack/test_versions.py
index 1269f13c9..f69dbd316 100644
--- a/nova/tests/api/openstack/test_versions.py
+++ b/nova/tests/api/openstack/test_versions.py
@@ -15,19 +15,24 @@
# License for the specific language governing permissions and limitations
# under the License.
+import feedparser
import json
import stubout
import webob
-import xml.etree.ElementTree
-
+from lxml import etree
from nova import context
from nova import test
-from nova.tests.api.openstack import fakes
from nova.api.openstack import versions
from nova.api.openstack import views
from nova.api.openstack import wsgi
+from nova.tests.api.openstack import common
+from nova.tests.api.openstack import fakes
+NS = {
+ 'atom': 'http://www.w3.org/2005/Atom',
+ 'ns': 'http://docs.openstack.org/compute/api/v1.1'
+}
VERSIONS = {
"v1.0": {
"id": "v1.0",
@@ -113,23 +118,23 @@ class VersionsTest(test.TestCase):
versions = json.loads(res.body)["versions"]
expected = [
{
- "id": "v1.1",
- "status": "CURRENT",
+ "id": "v1.0",
+ "status": "DEPRECATED",
"updated": "2011-01-21T11:33:21Z",
"links": [
{
"rel": "self",
- "href": "http://localhost/v1.1/",
+ "href": "http://localhost/v1.0/",
}],
},
{
- "id": "v1.0",
- "status": "DEPRECATED",
+ "id": "v1.1",
+ "status": "CURRENT",
"updated": "2011-01-21T11:33:21Z",
"links": [
{
"rel": "self",
- "href": "http://localhost/v1.0/",
+ "href": "http://localhost/v1.1/",
}],
},
]
@@ -233,48 +238,20 @@ class VersionsTest(test.TestCase):
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
self.assertEqual(res.content_type, "application/xml")
- root = xml.etree.ElementTree.XML(res.body)
- self.assertEqual(root.tag.split('}')[1], "version")
- self.assertEqual(root.tag.split('}')[0].strip('{'), wsgi.XMLNS_V11)
- children = list(root)
- media_types = children[0]
- media_type_nodes = list(media_types)
- links = (children[1], children[2], children[3])
-
- self.assertEqual(media_types.tag.split('}')[1], 'media-types')
- for media_node in media_type_nodes:
- self.assertEqual(media_node.tag.split('}')[1], 'media-type')
-
- expected = """
- <version id="v1.0" status="DEPRECATED"
- updated="2011-01-21T11:33:21Z"
- xmlns="%s"
- xmlns:atom="http://www.w3.org/2005/Atom">
-
- <media-types>
- <media-type base="application/xml"
- type="application/vnd.openstack.compute-v1.0+xml"/>
- <media-type base="application/json"
- type="application/vnd.openstack.compute-v1.0+json"/>
- </media-types>
-
- <atom:link href="http://localhost/v1.0/"
- rel="self"/>
-
- <atom:link href="http://docs.rackspacecloud.com/servers/
- api/v1.0/cs-devguide-20110125.pdf"
- rel="describedby"
- type="application/pdf"/>
-
- <atom:link href="http://docs.rackspacecloud.com/servers/
- api/v1.0/application.wadl"
- rel="describedby"
- type="application/vnd.sun.wadl+xml"/>
- </version>""".replace(" ", "").replace("\n", "") % wsgi.XMLNS_V11
-
- actual = res.body.replace(" ", "").replace("\n", "")
- self.assertEqual(expected, actual)
+ version = etree.XML(res.body)
+ expected = VERSIONS['v1.0']
+ self.assertTrue(version.xpath('/ns:version', namespaces=NS))
+ media_types = version.xpath('ns:media-types/ns:media-type',
+ namespaces=NS)
+ self.assertTrue(common.compare_media_types(media_types,
+ expected['media-types']))
+ for key in ['id', 'status', 'updated']:
+ self.assertEqual(version.get(key), expected[key])
+ links = version.xpath('atom:link', namespaces=NS)
+ self.assertTrue(common.compare_links(links,
+ [{'rel': 'self', 'href': 'http://localhost/v1.0/'}]
+ + expected['links']))
def test_get_version_1_1_detail_xml(self):
req = webob.Request.blank('/v1.1/')
@@ -282,35 +259,20 @@ class VersionsTest(test.TestCase):
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
self.assertEqual(res.content_type, "application/xml")
- expected = """
- <version id="v1.1" status="CURRENT"
- updated="2011-01-21T11:33:21Z"
- xmlns="%s"
- xmlns:atom="http://www.w3.org/2005/Atom">
-
- <media-types>
- <media-type base="application/xml"
- type="application/vnd.openstack.compute-v1.1+xml"/>
- <media-type base="application/json"
- type="application/vnd.openstack.compute-v1.1+json"/>
- </media-types>
-
- <atom:link href="http://localhost/v1.1/"
- rel="self"/>
-
- <atom:link href="http://docs.rackspacecloud.com/servers/
- api/v1.1/cs-devguide-20110125.pdf"
- rel="describedby"
- type="application/pdf"/>
-
- <atom:link href="http://docs.rackspacecloud.com/servers/
- api/v1.1/application.wadl"
- rel="describedby"
- type="application/vnd.sun.wadl+xml"/>
- </version>""".replace(" ", "").replace("\n", "") % wsgi.XMLNS_V11
-
- actual = res.body.replace(" ", "").replace("\n", "")
- self.assertEqual(expected, actual)
+
+ version = etree.XML(res.body)
+ expected = VERSIONS['v1.1']
+ self.assertTrue(version.xpath('/ns:version', namespaces=NS))
+ media_types = version.xpath('ns:media-types/ns:media-type',
+ namespaces=NS)
+ self.assertTrue(common.compare_media_types(media_types,
+ expected['media-types']))
+ for key in ['id', 'status', 'updated']:
+ self.assertEqual(version.get(key), expected[key])
+ links = version.xpath('atom:link', namespaces=NS)
+ self.assertTrue(common.compare_links(links,
+ [{'rel': 'self', 'href': 'http://localhost/v1.1/'}]
+ + expected['links']))
def test_get_version_list_xml(self):
req = webob.Request.blank('/')
@@ -319,21 +281,19 @@ class VersionsTest(test.TestCase):
self.assertEqual(res.status_int, 200)
self.assertEqual(res.content_type, "application/xml")
- expected = """
- <versions xmlns="%s" xmlns:atom="%s">
- <version id="v1.1" status="CURRENT" updated="2011-01-21T11:33:21Z">
- <atom:link href="http://localhost/v1.1/" rel="self"/>
- </version>
- <version id="v1.0" status="DEPRECATED"
- updated="2011-01-21T11:33:21Z">
- <atom:link href="http://localhost/v1.0/" rel="self"/>
- </version>
- </versions>""".replace(" ", "").replace("\n", "") % (wsgi.XMLNS_V11,
- wsgi.XMLNS_ATOM)
+ root = etree.XML(res.body)
+ self.assertTrue(root.xpath('/ns:versions', namespaces=NS))
+ versions = root.xpath('ns:version', namespaces=NS)
+ self.assertEqual(len(versions), 2)
- actual = res.body.replace(" ", "").replace("\n", "")
-
- self.assertEqual(expected, actual)
+ for i, v in enumerate(['v1.0', 'v1.1']):
+ version = versions[i]
+ expected = VERSIONS[v]
+ for key in ['id', 'status', 'updated']:
+ self.assertEqual(version.get(key), expected[key])
+ (link,) = version.xpath('atom:link', namespaces=NS)
+ self.assertTrue(common.compare_links(link,
+ [{'rel': 'self', 'href': 'http://localhost/%s/' % v}]))
def test_get_version_1_0_detail_atom(self):
req = webob.Request.blank('/v1.0/')
@@ -341,36 +301,38 @@ class VersionsTest(test.TestCase):
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
self.assertEqual("application/atom+xml", res.content_type)
- expected = """
- <feed xmlns="http://www.w3.org/2005/Atom">
- <title type="text">About This Version</title>
- <updated>2011-01-21T11:33:21Z</updated>
- <id>http://localhost/v1.0/</id>
- <author>
- <name>Rackspace</name>
- <uri>http://www.rackspace.com/</uri>
- </author>
- <link href="http://localhost/v1.0/" rel="self"/>
- <entry>
- <id>http://localhost/v1.0/</id>
- <title type="text">Version v1.0</title>
- <updated>2011-01-21T11:33:21Z</updated>
- <link href="http://localhost/v1.0/"
- rel="self"/>
- <link href="http://docs.rackspacecloud.com/servers/
- api/v1.0/cs-devguide-20110125.pdf"
- rel="describedby" type="application/pdf"/>
- <link href="http://docs.rackspacecloud.com/servers/
- api/v1.0/application.wadl"
- rel="describedby" type="application/vnd.sun.wadl+xml"/>
- <content type="text">
- Version v1.0 DEPRECATED (2011-01-21T11:33:21Z)
- </content>
- </entry>
- </feed>""".replace(" ", "").replace("\n", "")
-
- actual = res.body.replace(" ", "").replace("\n", "")
- self.assertEqual(expected, actual)
+
+ f = feedparser.parse(res.body)
+ self.assertEqual(f.feed.title, 'About This Version')
+ self.assertEqual(f.feed.updated, '2011-01-21T11:33:21Z')
+ self.assertEqual(f.feed.id, 'http://localhost/v1.0/')
+ self.assertEqual(f.feed.author, 'Rackspace')
+ self.assertEqual(f.feed.author_detail.href,
+ 'http://www.rackspace.com/')
+ self.assertEqual(f.feed.links[0]['href'], 'http://localhost/v1.0/')
+ self.assertEqual(f.feed.links[0]['rel'], 'self')
+
+ self.assertEqual(len(f.entries), 1)
+ entry = f.entries[0]
+ self.assertEqual(entry.id, 'http://localhost/v1.0/')
+ self.assertEqual(entry.title, 'Version v1.0')
+ self.assertEqual(entry.updated, '2011-01-21T11:33:21Z')
+ self.assertEqual(len(entry.content), 1)
+ self.assertEqual(entry.content[0].value,
+ 'Version v1.0 DEPRECATED (2011-01-21T11:33:21Z)')
+ self.assertEqual(len(entry.links), 3)
+ self.assertEqual(entry.links[0]['href'], 'http://localhost/v1.0/')
+ self.assertEqual(entry.links[0]['rel'], 'self')
+ self.assertEqual(entry.links[1], {
+ 'href': 'http://docs.rackspacecloud.com/servers/api/v1.0/'\
+ 'cs-devguide-20110125.pdf',
+ 'type': 'application/pdf',
+ 'rel': 'describedby'})
+ self.assertEqual(entry.links[2], {
+ 'href': 'http://docs.rackspacecloud.com/servers/api/v1.0/'\
+ 'application.wadl',
+ 'type': 'application/vnd.sun.wadl+xml',
+ 'rel': 'describedby'})
def test_get_version_1_1_detail_atom(self):
req = webob.Request.blank('/v1.1/')
@@ -378,36 +340,38 @@ class VersionsTest(test.TestCase):
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
self.assertEqual("application/atom+xml", res.content_type)
- expected = """
- <feed xmlns="http://www.w3.org/2005/Atom">
- <title type="text">About This Version</title>
- <updated>2011-01-21T11:33:21Z</updated>
- <id>http://localhost/v1.1/</id>
- <author>
- <name>Rackspace</name>
- <uri>http://www.rackspace.com/</uri>
- </author>
- <link href="http://localhost/v1.1/" rel="self"/>
- <entry>
- <id>http://localhost/v1.1/</id>
- <title type="text">Version v1.1</title>
- <updated>2011-01-21T11:33:21Z</updated>
- <link href="http://localhost/v1.1/"
- rel="self"/>
- <link href="http://docs.rackspacecloud.com/servers/
- api/v1.1/cs-devguide-20110125.pdf"
- rel="describedby" type="application/pdf"/>
- <link href="http://docs.rackspacecloud.com/servers/
- api/v1.1/application.wadl"
- rel="describedby" type="application/vnd.sun.wadl+xml"/>
- <content type="text">
- Version v1.1 CURRENT (2011-01-21T11:33:21Z)
- </content>
- </entry>
- </feed>""".replace(" ", "").replace("\n", "")
-
- actual = res.body.replace(" ", "").replace("\n", "")
- self.assertEqual(expected, actual)
+
+ f = feedparser.parse(res.body)
+ self.assertEqual(f.feed.title, 'About This Version')
+ self.assertEqual(f.feed.updated, '2011-01-21T11:33:21Z')
+ self.assertEqual(f.feed.id, 'http://localhost/v1.1/')
+ self.assertEqual(f.feed.author, 'Rackspace')
+ self.assertEqual(f.feed.author_detail.href,
+ 'http://www.rackspace.com/')
+ self.assertEqual(f.feed.links[0]['href'], 'http://localhost/v1.1/')
+ self.assertEqual(f.feed.links[0]['rel'], 'self')
+
+ self.assertEqual(len(f.entries), 1)
+ entry = f.entries[0]
+ self.assertEqual(entry.id, 'http://localhost/v1.1/')
+ self.assertEqual(entry.title, 'Version v1.1')
+ self.assertEqual(entry.updated, '2011-01-21T11:33:21Z')
+ self.assertEqual(len(entry.content), 1)
+ self.assertEqual(entry.content[0].value,
+ 'Version v1.1 CURRENT (2011-01-21T11:33:21Z)')
+ self.assertEqual(len(entry.links), 3)
+ self.assertEqual(entry.links[0]['href'], 'http://localhost/v1.1/')
+ self.assertEqual(entry.links[0]['rel'], 'self')
+ self.assertEqual(entry.links[1], {
+ 'href': 'http://docs.rackspacecloud.com/servers/api/v1.1/'\
+ 'cs-devguide-20110125.pdf',
+ 'type': 'application/pdf',
+ 'rel': 'describedby'})
+ self.assertEqual(entry.links[2], {
+ 'href': 'http://docs.rackspacecloud.com/servers/api/v1.1/'\
+ 'application.wadl',
+ 'type': 'application/vnd.sun.wadl+xml',
+ 'rel': 'describedby'})
def test_get_version_list_atom(self):
req = webob.Request.blank('/')
@@ -416,40 +380,37 @@ class VersionsTest(test.TestCase):
self.assertEqual(res.status_int, 200)
self.assertEqual(res.content_type, "application/atom+xml")
- expected = """
- <feed xmlns="http://www.w3.org/2005/Atom">
- <title type="text">Available API Versions</title>
- <updated>2011-01-21T11:33:21Z</updated>
- <id>http://localhost/</id>
- <author>
- <name>Rackspace</name>
- <uri>http://www.rackspace.com/</uri>
- </author>
- <link href="http://localhost/" rel="self"/>
- <entry>
- <id>http://localhost/v1.1/</id>
- <title type="text">Version v1.1</title>
- <updated>2011-01-21T11:33:21Z</updated>
- <link href="http://localhost/v1.1/" rel="self"/>
- <content type="text">
- Version v1.1 CURRENT (2011-01-21T11:33:21Z)
- </content>
- </entry>
- <entry>
- <id>http://localhost/v1.0/</id>
- <title type="text">Version v1.0</title>
- <updated>2011-01-21T11:33:21Z</updated>
- <link href="http://localhost/v1.0/" rel="self"/>
- <content type="text">
- Version v1.0 DEPRECATED (2011-01-21T11:33:21Z)
- </content>
- </entry>
- </feed>
- """.replace(" ", "").replace("\n", "")
-
- actual = res.body.replace(" ", "").replace("\n", "")
-
- self.assertEqual(expected, actual)
+ f = feedparser.parse(res.body)
+ self.assertEqual(f.feed.title, 'Available API Versions')
+ self.assertEqual(f.feed.updated, '2011-01-21T11:33:21Z')
+ self.assertEqual(f.feed.id, 'http://localhost/')
+ self.assertEqual(f.feed.author, 'Rackspace')
+ self.assertEqual(f.feed.author_detail.href,
+ 'http://www.rackspace.com/')
+ self.assertEqual(f.feed.links[0]['href'], 'http://localhost/')
+ self.assertEqual(f.feed.links[0]['rel'], 'self')
+
+ self.assertEqual(len(f.entries), 2)
+ entry = f.entries[0]
+ self.assertEqual(entry.id, 'http://localhost/v1.0/')
+ self.assertEqual(entry.title, 'Version v1.0')
+ self.assertEqual(entry.updated, '2011-01-21T11:33:21Z')
+ self.assertEqual(len(entry.content), 1)
+ self.assertEqual(entry.content[0].value,
+ 'Version v1.0 DEPRECATED (2011-01-21T11:33:21Z)')
+ self.assertEqual(len(entry.links), 1)
+ self.assertEqual(entry.links[0]['href'], 'http://localhost/v1.0/')
+ self.assertEqual(entry.links[0]['rel'], 'self')
+ entry = f.entries[1]
+ self.assertEqual(entry.id, 'http://localhost/v1.1/')
+ self.assertEqual(entry.title, 'Version v1.1')
+ self.assertEqual(entry.updated, '2011-01-21T11:33:21Z')
+ self.assertEqual(len(entry.content), 1)
+ self.assertEqual(entry.content[0].value,
+ 'Version v1.1 CURRENT (2011-01-21T11:33:21Z)')
+ self.assertEqual(len(entry.links), 1)
+ self.assertEqual(entry.links[0]['href'], 'http://localhost/v1.1/')
+ self.assertEqual(entry.links[0]['rel'], 'self')
def test_multi_choice_image(self):
req = webob.Request.blank('/images/1')
@@ -511,28 +472,32 @@ class VersionsTest(test.TestCase):
self.assertEqual(res.status_int, 300)
self.assertEqual(res.content_type, "application/xml")
- expected = """
- <choices xmlns="%s" xmlns:atom="%s">
- <version id="v1.1" status="CURRENT">
- <media-types>
- <media-type base="application/xml"
- type="application/vnd.openstack.compute-v1.1+xml"/>
- <media-type base="application/json"
- type="application/vnd.openstack.compute-v1.1+json"/>
- </media-types>
- <atom:link href="http://localhost/v1.1/images/1" rel="self"/>
- </version>
- <version id="v1.0" status="DEPRECATED">
- <media-types>
- <media-type base="application/xml"
- type="application/vnd.openstack.compute-v1.0+xml"/>
- <media-type base="application/json"
- type="application/vnd.openstack.compute-v1.0+json"/>
- </media-types>
- <atom:link href="http://localhost/v1.0/images/1" rel="self"/>
- </version>
- </choices>""".replace(" ", "").replace("\n", "") % (wsgi.XMLNS_V11,
- wsgi.XMLNS_ATOM)
+ root = etree.XML(res.body)
+ self.assertTrue(root.xpath('/ns:choices', namespaces=NS))
+ versions = root.xpath('ns:version', namespaces=NS)
+ self.assertEqual(len(versions), 2)
+
+ version = versions[0]
+ self.assertEqual(version.get('id'), 'v1.1')
+ self.assertEqual(version.get('status'), 'CURRENT')
+ media_types = version.xpath('ns:media-types/ns:media-type',
+ namespaces=NS)
+ self.assertTrue(common.compare_media_types(media_types,
+ VERSIONS['v1.1']['media-types']))
+ links = version.xpath('atom:link', namespaces=NS)
+ self.assertTrue(common.compare_links(links,
+ [{'rel': 'self', 'href': 'http://localhost/v1.1/images/1'}]))
+
+ version = versions[1]
+ self.assertEqual(version.get('id'), 'v1.0')
+ self.assertEqual(version.get('status'), 'DEPRECATED')
+ media_types = version.xpath('ns:media-types/ns:media-type',
+ namespaces=NS)
+ self.assertTrue(common.compare_media_types(media_types,
+ VERSIONS['v1.0']['media-types']))
+ links = version.xpath('atom:link', namespaces=NS)
+ self.assertTrue(common.compare_links(links,
+ [{'rel': 'self', 'href': 'http://localhost/v1.0/images/1'}]))
def test_multi_choice_server_atom(self):
"""
@@ -665,22 +630,20 @@ class VersionsSerializerTests(test.TestCase):
serializer = versions.VersionsXMLSerializer()
response = serializer.index(versions_data)
- root = xml.etree.ElementTree.XML(response)
- self.assertEqual(root.tag.split('}')[1], "versions")
- self.assertEqual(root.tag.split('}')[0].strip('{'), wsgi.XMLNS_V11)
- version = list(root)[0]
- self.assertEqual(version.tag.split('}')[1], "version")
- self.assertEqual(version.get('id'),
- versions_data['versions'][0]['id'])
+ root = etree.XML(response)
+ self.assertTrue(root.xpath('/ns:versions', namespaces=NS))
+ version_elems = root.xpath('ns:version', namespaces=NS)
+ self.assertEqual(len(version_elems), 1)
+ version = version_elems[0]
+ self.assertEqual(version.get('id'), versions_data['versions'][0]['id'])
self.assertEqual(version.get('status'),
versions_data['versions'][0]['status'])
- link = list(version)[0]
-
- self.assertEqual(link.tag.split('}')[1], "link")
- self.assertEqual(link.tag.split('}')[0].strip('{'), wsgi.XMLNS_ATOM)
- for key, val in versions_data['versions'][0]['links'][0].items():
- self.assertEqual(link.get(key), val)
+ (link,) = version.xpath('atom:link', namespaces=NS)
+ self.assertTrue(common.compare_links(link, [{
+ 'rel': 'self',
+ 'href': 'http://test/2.7.1',
+ 'type': 'application/atom+xml'}]))
def test_versions_multi_xml_serializer(self):
versions_data = {
@@ -703,11 +666,9 @@ class VersionsSerializerTests(test.TestCase):
serializer = versions.VersionsXMLSerializer()
response = serializer.multi(versions_data)
- root = xml.etree.ElementTree.XML(response)
- self.assertEqual(root.tag.split('}')[1], "choices")
- self.assertEqual(root.tag.split('}')[0].strip('{'), wsgi.XMLNS_V11)
- version = list(root)[0]
- self.assertEqual(version.tag.split('}')[1], "version")
+ root = etree.XML(response)
+ self.assertTrue(root.xpath('/ns:choices', namespaces=NS))
+ (version,) = root.xpath('ns:version', namespaces=NS)
self.assertEqual(version.get('id'), versions_data['choices'][0]['id'])
self.assertEqual(version.get('status'),
versions_data['choices'][0]['status'])
@@ -716,19 +677,14 @@ class VersionsSerializerTests(test.TestCase):
media_type_nodes = list(media_types)
self.assertEqual(media_types.tag.split('}')[1], "media-types")
- set_types = versions_data['choices'][0]['media-types']
- for i, type in enumerate(set_types):
- node = media_type_nodes[i]
- self.assertEqual(node.tag.split('}')[1], "media-type")
- for key, val in set_types[i].items():
- self.assertEqual(node.get(key), val)
-
- link = list(version)[1]
+ media_types = version.xpath('ns:media-types/ns:media-type',
+ namespaces=NS)
+ self.assertTrue(common.compare_media_types(media_types,
+ versions_data['choices'][0]['media-types']))
- self.assertEqual(link.tag.split('}')[1], "link")
- self.assertEqual(link.tag.split('}')[0].strip('{'), wsgi.XMLNS_ATOM)
- for key, val in versions_data['choices'][0]['links'][0].items():
- self.assertEqual(link.get(key), val)
+ (link,) = version.xpath('atom:link', namespaces=NS)
+ self.assertTrue(common.compare_links(link,
+ versions_data['choices'][0]['links']))
def test_version_detail_xml_serializer(self):
version_data = {
@@ -770,7 +726,7 @@ class VersionsSerializerTests(test.TestCase):
serializer = versions.VersionsXMLSerializer()
response = serializer.show(version_data)
- root = xml.etree.ElementTree.XML(response)
+ root = etree.XML(response)
self.assertEqual(root.tag.split('}')[1], "version")
self.assertEqual(root.tag.split('}')[0].strip('{'), wsgi.XMLNS_V11)
@@ -811,59 +767,28 @@ class VersionsSerializerTests(test.TestCase):
serializer = versions.VersionsAtomSerializer()
response = serializer.index(versions_data)
-
- root = xml.etree.ElementTree.XML(response)
- self.assertEqual(root.tag.split('}')[1], "feed")
- self.assertEqual(root.tag.split('}')[0].strip('{'),
- "http://www.w3.org/2005/Atom")
-
- children = list(root)
- title = children[0]
- updated = children[1]
- id = children[2]
- author = children[3]
- link = children[4]
- entry = children[5]
-
- self.assertEqual(title.tag.split('}')[1], 'title')
- self.assertEqual(title.text, 'Available API Versions')
- self.assertEqual(updated.tag.split('}')[1], 'updated')
- self.assertEqual(updated.text, '2011-07-20T11:40:00Z')
- self.assertEqual(id.tag.split('}')[1], 'id')
- self.assertEqual(id.text, 'http://test/')
-
- self.assertEqual(author.tag.split('}')[1], 'author')
- author_name = list(author)[0]
- author_uri = list(author)[1]
- self.assertEqual(author_name.tag.split('}')[1], 'name')
- self.assertEqual(author_name.text, 'Rackspace')
- self.assertEqual(author_uri.tag.split('}')[1], 'uri')
- self.assertEqual(author_uri.text, 'http://www.rackspace.com/')
-
- self.assertEqual(link.get('href'), 'http://test/')
- self.assertEqual(link.get('rel'), 'self')
-
- self.assertEqual(entry.tag.split('}')[1], 'entry')
- entry_children = list(entry)
- entry_id = entry_children[0]
- entry_title = entry_children[1]
- entry_updated = entry_children[2]
- entry_link = entry_children[3]
- entry_content = entry_children[4]
- self.assertEqual(entry_id.tag.split('}')[1], "id")
- self.assertEqual(entry_id.text, "http://test/2.9.8")
- self.assertEqual(entry_title.tag.split('}')[1], "title")
- self.assertEqual(entry_title.get('type'), "text")
- self.assertEqual(entry_title.text, "Version 2.9.8")
- self.assertEqual(entry_updated.tag.split('}')[1], "updated")
- self.assertEqual(entry_updated.text, "2011-07-20T11:40:00Z")
- self.assertEqual(entry_link.tag.split('}')[1], "link")
- self.assertEqual(entry_link.get('href'), "http://test/2.9.8")
- self.assertEqual(entry_link.get('rel'), "self")
- self.assertEqual(entry_content.tag.split('}')[1], "content")
- self.assertEqual(entry_content.get('type'), "text")
- self.assertEqual(entry_content.text,
- "Version 2.9.8 CURRENT (2011-07-20T11:40:00Z)")
+ f = feedparser.parse(response)
+
+ self.assertEqual(f.feed.title, 'Available API Versions')
+ self.assertEqual(f.feed.updated, '2011-07-20T11:40:00Z')
+ self.assertEqual(f.feed.id, 'http://test/')
+ self.assertEqual(f.feed.author, 'Rackspace')
+ self.assertEqual(f.feed.author_detail.href,
+ 'http://www.rackspace.com/')
+ self.assertEqual(f.feed.links[0]['href'], 'http://test/')
+ self.assertEqual(f.feed.links[0]['rel'], 'self')
+
+ self.assertEqual(len(f.entries), 1)
+ entry = f.entries[0]
+ self.assertEqual(entry.id, 'http://test/2.9.8')
+ self.assertEqual(entry.title, 'Version 2.9.8')
+ self.assertEqual(entry.updated, '2011-07-20T11:40:00Z')
+ self.assertEqual(len(entry.content), 1)
+ self.assertEqual(entry.content[0].value,
+ 'Version 2.9.8 CURRENT (2011-07-20T11:40:00Z)')
+ self.assertEqual(len(entry.links), 1)
+ self.assertEqual(entry.links[0]['href'], 'http://test/2.9.8')
+ self.assertEqual(entry.links[0]['rel'], 'self')
def test_version_detail_atom_serializer(self):
versions_data = {
@@ -904,63 +829,36 @@ class VersionsSerializerTests(test.TestCase):
serializer = versions.VersionsAtomSerializer()
response = serializer.show(versions_data)
-
- root = xml.etree.ElementTree.XML(response)
- self.assertEqual(root.tag.split('}')[1], "feed")
- self.assertEqual(root.tag.split('}')[0].strip('{'),
- "http://www.w3.org/2005/Atom")
-
- children = list(root)
- title = children[0]
- updated = children[1]
- id = children[2]
- author = children[3]
- link = children[4]
- entry = children[5]
-
- self.assertEqual(root.tag.split('}')[1], 'feed')
- self.assertEqual(title.tag.split('}')[1], 'title')
- self.assertEqual(title.text, 'About This Version')
- self.assertEqual(updated.tag.split('}')[1], 'updated')
- self.assertEqual(updated.text, '2011-01-21T11:33:21Z')
- self.assertEqual(id.tag.split('}')[1], 'id')
- self.assertEqual(id.text, 'http://localhost/v1.1/')
-
- self.assertEqual(author.tag.split('}')[1], 'author')
- author_name = list(author)[0]
- author_uri = list(author)[1]
- self.assertEqual(author_name.tag.split('}')[1], 'name')
- self.assertEqual(author_name.text, 'Rackspace')
- self.assertEqual(author_uri.tag.split('}')[1], 'uri')
- self.assertEqual(author_uri.text, 'http://www.rackspace.com/')
-
- self.assertEqual(link.get('href'),
- 'http://localhost/v1.1/')
- self.assertEqual(link.get('rel'), 'self')
-
- self.assertEqual(entry.tag.split('}')[1], 'entry')
- entry_children = list(entry)
- entry_id = entry_children[0]
- entry_title = entry_children[1]
- entry_updated = entry_children[2]
- entry_links = (entry_children[3], entry_children[4], entry_children[5])
- entry_content = entry_children[6]
-
- self.assertEqual(entry_id.tag.split('}')[1], "id")
- self.assertEqual(entry_id.text,
- "http://localhost/v1.1/")
- self.assertEqual(entry_title.tag.split('}')[1], "title")
- self.assertEqual(entry_title.get('type'), "text")
- self.assertEqual(entry_title.text, "Version v1.1")
- self.assertEqual(entry_updated.tag.split('}')[1], "updated")
- self.assertEqual(entry_updated.text, "2011-01-21T11:33:21Z")
-
- for i, link in enumerate(versions_data["version"]["links"]):
- self.assertEqual(entry_links[i].tag.split('}')[1], "link")
- for key, val in versions_data["version"]["links"][i].items():
- self.assertEqual(entry_links[i].get(key), val)
-
- self.assertEqual(entry_content.tag.split('}')[1], "content")
- self.assertEqual(entry_content.get('type'), "text")
- self.assertEqual(entry_content.text,
- "Version v1.1 CURRENT (2011-01-21T11:33:21Z)")
+ f = feedparser.parse(response)
+
+ self.assertEqual(f.feed.title, 'About This Version')
+ self.assertEqual(f.feed.updated, '2011-01-21T11:33:21Z')
+ self.assertEqual(f.feed.id, 'http://localhost/v1.1/')
+ self.assertEqual(f.feed.author, 'Rackspace')
+ self.assertEqual(f.feed.author_detail.href,
+ 'http://www.rackspace.com/')
+ self.assertEqual(f.feed.links[0]['href'], 'http://localhost/v1.1/')
+ self.assertEqual(f.feed.links[0]['rel'], 'self')
+
+ self.assertEqual(len(f.entries), 1)
+ entry = f.entries[0]
+ self.assertEqual(entry.id, 'http://localhost/v1.1/')
+ self.assertEqual(entry.title, 'Version v1.1')
+ self.assertEqual(entry.updated, '2011-01-21T11:33:21Z')
+ self.assertEqual(len(entry.content), 1)
+ self.assertEqual(entry.content[0].value,
+ 'Version v1.1 CURRENT (2011-01-21T11:33:21Z)')
+ self.assertEqual(len(entry.links), 3)
+ self.assertEqual(entry.links[0]['href'], 'http://localhost/v1.1/')
+ self.assertEqual(entry.links[0]['rel'], 'self')
+ self.assertEqual(entry.links[1], {
+ 'rel': 'describedby',
+ 'type': 'application/pdf',
+ 'href': 'http://docs.rackspacecloud.com/'
+ 'servers/api/v1.1/cs-devguide-20110125.pdf'})
+ self.assertEqual(entry.links[2], {
+ 'rel': 'describedby',
+ 'type': 'application/vnd.sun.wadl+xml',
+ 'href': 'http://docs.rackspacecloud.com/'
+ 'servers/api/v1.1/application.wadl',
+ })
diff --git a/nova/tests/api/openstack/test_wsgi.py b/nova/tests/api/openstack/test_wsgi.py
index 6dea78d17..74b9ce853 100644
--- a/nova/tests/api/openstack/test_wsgi.py
+++ b/nova/tests/api/openstack/test_wsgi.py
@@ -27,17 +27,17 @@ class RequestTest(test.TestCase):
result = request.get_content_type()
self.assertEqual(result, "application/json")
- def test_content_type_from_accept_xml(self):
- request = wsgi.Request.blank('/tests/123')
- request.headers["Accept"] = "application/xml"
- result = request.best_match_content_type()
- self.assertEqual(result, "application/xml")
-
- request = wsgi.Request.blank('/tests/123')
- request.headers["Accept"] = "application/json"
- result = request.best_match_content_type()
- self.assertEqual(result, "application/json")
-
+ def test_content_type_from_accept(self):
+ for content_type in ('application/xml',
+ 'application/vnd.openstack.compute+xml',
+ 'application/json',
+ 'application/vnd.openstack.compute+json'):
+ request = wsgi.Request.blank('/tests/123')
+ request.headers["Accept"] = content_type
+ result = request.best_match_content_type()
+ self.assertEqual(result, content_type)
+
+ def test_content_type_from_accept_best(self):
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = "application/xml, application/json"
result = request.best_match_content_type()
@@ -231,7 +231,7 @@ class ResponseSerializerTest(test.TestCase):
self.body_serializers = {
'application/json': JSONSerializer(),
- 'application/XML': XMLSerializer(),
+ 'application/xml': XMLSerializer(),
}
self.serializer = wsgi.ResponseSerializer(self.body_serializers,
@@ -250,15 +250,24 @@ class ResponseSerializerTest(test.TestCase):
self.serializer.get_body_serializer,
'application/unknown')
- def test_serialize_response(self):
- response = self.serializer.serialize({}, 'application/json')
- self.assertEqual(response.headers['Content-Type'], 'application/json')
- self.assertEqual(response.body, 'pew_json')
- self.assertEqual(response.status_int, 404)
+ def test_serialize_response_json(self):
+ for content_type in ('application/json',
+ 'application/vnd.openstack.compute+json'):
+ response = self.serializer.serialize({}, content_type)
+ self.assertEqual(response.headers['Content-Type'], content_type)
+ self.assertEqual(response.body, 'pew_json')
+ self.assertEqual(response.status_int, 404)
+
+ def test_serialize_response_xml(self):
+ for content_type in ('application/xml',
+ 'application/vnd.openstack.compute+xml'):
+ response = self.serializer.serialize({}, content_type)
+ self.assertEqual(response.headers['Content-Type'], content_type)
+ self.assertEqual(response.body, 'pew_xml')
+ self.assertEqual(response.status_int, 404)
def test_serialize_response_None(self):
response = self.serializer.serialize(None, 'application/json')
- print response
self.assertEqual(response.headers['Content-Type'], 'application/json')
self.assertEqual(response.body, '')
self.assertEqual(response.status_int, 404)
@@ -281,7 +290,7 @@ class RequestDeserializerTest(test.TestCase):
self.body_deserializers = {
'application/json': JSONDeserializer(),
- 'application/XML': XMLDeserializer(),
+ 'application/xml': XMLDeserializer(),
}
self.deserializer = wsgi.RequestDeserializer(self.body_deserializers)
@@ -290,8 +299,9 @@ class RequestDeserializerTest(test.TestCase):
pass
def test_get_deserializer(self):
- expected = self.deserializer.get_body_deserializer('application/json')
- self.assertEqual(expected, self.body_deserializers['application/json'])
+ ctype = 'application/json'
+ expected = self.deserializer.get_body_deserializer(ctype)
+ self.assertEqual(expected, self.body_deserializers[ctype])
def test_get_deserializer_unknown_content_type(self):
self.assertRaises(exception.InvalidContentType,
@@ -299,10 +309,11 @@ class RequestDeserializerTest(test.TestCase):
'application/unknown')
def test_get_expected_content_type(self):
+ ctype = 'application/json'
request = wsgi.Request.blank('/')
- request.headers['Accept'] = 'application/json'
+ request.headers['Accept'] = ctype
self.assertEqual(self.deserializer.get_expected_content_type(request),
- 'application/json')
+ ctype)
def test_get_action_args(self):
env = {
diff --git a/nova/tests/db/fakes.py b/nova/tests/db/fakes.py
index 19028a451..cdbfba63a 100644
--- a/nova/tests/db/fakes.py
+++ b/nova/tests/db/fakes.py
@@ -125,10 +125,11 @@ def stub_out_db_network_api(stubs):
if ips[0]['fixed_ip']:
fixed_ip_address = ips[0]['fixed_ip']['address']
ips[0]['fixed_ip'] = None
+ ips[0]['host'] = None
return fixed_ip_address
def fake_floating_ip_fixed_ip_associate(context, floating_address,
- fixed_address):
+ fixed_address, host):
float = filter(lambda i: i['address'] == floating_address,
floating_ips)
fixed = filter(lambda i: i['address'] == fixed_address,
@@ -136,6 +137,7 @@ def stub_out_db_network_api(stubs):
if float and fixed:
float[0]['fixed_ip'] = fixed[0]
float[0]['fixed_ip_id'] = fixed[0]['id']
+ float[0]['host'] = host
def fake_floating_ip_get_all_by_host(context, host):
# TODO(jkoelker): Once we get the patches that remove host from
diff --git a/nova/tests/fake_network.py b/nova/tests/fake_network.py
new file mode 100644
index 000000000..142206755
--- /dev/null
+++ b/nova/tests/fake_network.py
@@ -0,0 +1,194 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Rackspace
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from nova import db
+from nova import flags
+from nova import test
+from nova.network import manager as network_manager
+
+
+HOST = "testhost"
+FLAGS = flags.FLAGS
+
+
+class FakeIptablesFirewallDriver(object):
+ def __init__(self, **kwargs):
+ pass
+
+ def setattr(self, key, val):
+ self.__setattr__(key, val)
+
+ def apply_instance_filter(self, instance, network_info):
+ pass
+
+
+class FakeVIFDriver(object):
+
+ def __init__(self, **kwargs):
+ pass
+
+ def setattr(self, key, val):
+ self.__setattr__(key, val)
+
+ def plug(self, instance, network, mapping):
+ return {
+ 'id': 'fake',
+ 'bridge_name': 'fake',
+ 'mac_address': 'fake',
+ 'ip_address': 'fake',
+ 'dhcp_server': 'fake',
+ 'extra_params': 'fake',
+ }
+
+
+class FakeModel(dict):
+ """Represent a model from the db"""
+ def __init__(self, *args, **kwargs):
+ self.update(kwargs)
+
+ def __getattr__(self, name):
+ return self[name]
+
+
+flavor = {'id': 0,
+ 'name': 'fake_flavor',
+ 'memory_mb': 2048,
+ 'vcpus': 2,
+ 'local_gb': 10,
+ 'flavor_id': 0,
+ 'swap': 0,
+ 'rxtx_quota': 0,
+ 'rxtx_cap': 3}
+
+
+def fake_network(network_id, ipv6=None):
+ if ipv6 is None:
+ ipv6 = FLAGS.use_ipv6
+ fake_network = {'id': network_id,
+ 'label': 'test%d' % network_id,
+ 'injected': False,
+ 'multi_host': False,
+ 'cidr': '192.168.%d.0/24' % network_id,
+ 'cidr_v6': None,
+ 'netmask': '255.255.255.0',
+ 'netmask_v6': None,
+ 'bridge': 'fake_br%d' % network_id,
+ 'bridge_interface': 'fake_eth%d' % network_id,
+ 'gateway': '192.168.%d.1' % network_id,
+ 'gateway_v6': None,
+ 'broadcast': '192.168.%d.255' % network_id,
+ 'dns1': '192.168.%d.3' % network_id,
+ 'dns2': '192.168.%d.4' % network_id,
+ 'vlan': None,
+ 'host': None,
+ 'project_id': 'fake_project',
+ 'vpn_public_address': '192.168.%d.2' % network_id}
+ if ipv6:
+ fake_network['cidr_v6'] = '2001:db8:0:%x::/64' % network_id
+ fake_network['gateway_v6'] = '2001:db8:0:%x::1' % network_id
+ fake_network['netmask_v6'] = '64'
+
+ return fake_network
+
+
+def vifs(n):
+ for x in xrange(n):
+ yield {'id': x,
+ 'address': 'DE:AD:BE:EF:00:%02x' % x,
+ 'uuid': '00000000-0000-0000-0000-00000000000000%02d' % x,
+ 'network_id': x,
+ 'network': FakeModel(**fake_network(x)),
+ 'instance_id': 0}
+
+
+def floating_ip_ids():
+ for i in xrange(99):
+ yield i
+
+
+def fixed_ip_ids():
+ for i in xrange(99):
+ yield i
+
+
+floating_ip_id = floating_ip_ids()
+fixed_ip_id = fixed_ip_ids()
+
+
+def next_fixed_ip(network_id, num_floating_ips=0):
+ next_id = fixed_ip_id.next()
+ f_ips = [FakeModel(**next_floating_ip(next_id))
+ for i in xrange(num_floating_ips)]
+ return {'id': next_id,
+ 'network_id': network_id,
+ 'address': '192.168.%d.1%02d' % (network_id, next_id),
+ 'instance_id': 0,
+ 'allocated': False,
+ # and since network_id and vif_id happen to be equivalent
+ 'virtual_interface_id': network_id,
+ 'floating_ips': f_ips}
+
+
+def next_floating_ip(fixed_ip_id):
+ next_id = floating_ip_id.next()
+ return {'id': next_id,
+ 'address': '10.10.10.1%02d' % next_id,
+ 'fixed_ip_id': fixed_ip_id,
+ 'project_id': None,
+ 'auto_assigned': False}
+
+
+def ipv4_like(ip, match_string):
+ ip = ip.split('.')
+ match_octets = match_string.split('.')
+
+ for i, octet in enumerate(match_octets):
+ if octet == '*':
+ continue
+ if octet != ip[i]:
+ return False
+ return True
+
+
+def fake_get_instance_nw_info(stubs, num_networks=1, ips_per_vif=2,
+ floating_ips_per_fixed_ip=0):
+ # stubs is the self.stubs from the test
+ # ips_per_vif is the number of ips each vif will have
+ # num_floating_ips is number of float ips for each fixed ip
+ network = network_manager.FlatManager(host=HOST)
+ network.db = db
+
+ # reset the fixed and floating ip generators
+ global floating_ip_id, fixed_ip_id
+ floating_ip_id = floating_ip_ids()
+ fixed_ip_id = fixed_ip_ids()
+
+ def fixed_ips_fake(*args, **kwargs):
+ return [next_fixed_ip(i, floating_ips_per_fixed_ip)
+ for i in xrange(num_networks) for j in xrange(ips_per_vif)]
+
+ def virtual_interfaces_fake(*args, **kwargs):
+ return [vif for vif in vifs(num_networks)]
+
+ def instance_type_fake(*args, **kwargs):
+ return flavor
+
+ stubs.Set(db, 'fixed_ip_get_by_instance', fixed_ips_fake)
+ stubs.Set(db, 'virtual_interface_get_by_instance', virtual_interfaces_fake)
+ stubs.Set(db, 'instance_type_get', instance_type_fake)
+
+ return network.get_instance_nw_info(None, 0, 0, None)
diff --git a/nova/tests/glance/stubs.py b/nova/tests/glance/stubs.py
index f2a19f22d..1567393e3 100644
--- a/nova/tests/glance/stubs.py
+++ b/nova/tests/glance/stubs.py
@@ -16,14 +16,15 @@
import StringIO
-import nova.image
+from nova import exception
+from nova.image import glance
def stubout_glance_client(stubs):
- def fake_get_glance_client(image_href):
+ def fake_get_glance_client(context, image_href):
image_id = int(str(image_href).split('/')[-1])
return (FakeGlance('foo'), image_id)
- stubs.Set(nova.image, 'get_glance_client', fake_get_glance_client)
+ stubs.Set(glance, 'get_glance_client', fake_get_glance_client)
class FakeGlance(object):
@@ -78,3 +79,70 @@ class FakeGlance(object):
def get_image(self, image_id):
image = self.IMAGE_FIXTURES[int(image_id)]
return image['image_meta'], image['image_data']
+
+
+NOW_GLANCE_FORMAT = "2010-10-11T10:30:22"
+
+
+class StubGlanceClient(object):
+
+ def __init__(self, images=None):
+ self.images = []
+ _images = images or []
+ map(lambda image: self.add_image(image, None), _images)
+
+ def set_auth_token(self, auth_tok):
+ pass
+
+ def get_image_meta(self, image_id):
+ for image in self.images:
+ if image['id'] == str(image_id):
+ return image
+ raise exception.ImageNotFound(image_id=image_id)
+
+ #TODO(bcwaldon): implement filters
+ def get_images_detailed(self, filters=None, marker=None, limit=3):
+ if marker is None:
+ index = 0
+ else:
+ for index, image in enumerate(self.images):
+ if image['id'] == str(marker):
+ index += 1
+ break
+
+ return self.images[index:index + limit]
+
+ def get_image(self, image_id):
+ return self.get_image_meta(image_id), []
+
+ def add_image(self, metadata, data):
+ metadata['created_at'] = NOW_GLANCE_FORMAT
+ metadata['updated_at'] = NOW_GLANCE_FORMAT
+
+ self.images.append(metadata)
+
+ try:
+ image_id = str(metadata['id'])
+ except KeyError:
+ # auto-generate an id if one wasn't provided
+ image_id = str(len(self.images))
+
+ self.images[-1]['id'] = image_id
+
+ return self.images[-1]
+
+ def update_image(self, image_id, metadata, data):
+ for i, image in enumerate(self.images):
+ if image['id'] == str(image_id):
+ if 'id' in metadata:
+ metadata['id'] = str(metadata['id'])
+ self.images[i].update(metadata)
+ return self.images[i]
+ raise exception.ImageNotFound(image_id=image_id)
+
+ def delete_image(self, image_id):
+ for i, image in enumerate(self.images):
+ if image['id'] == image_id:
+ del self.images[i]
+ return
+ raise exception.ImageNotFound(image_id=image_id)
diff --git a/nova/tests/image/test_glance.py b/nova/tests/image/test_glance.py
index b1ebd8436..290c9a04a 100644
--- a/nova/tests/image/test_glance.py
+++ b/nova/tests/image/test_glance.py
@@ -17,47 +17,14 @@
import datetime
-import unittest
+import stubout
+from nova.tests.api.openstack import fakes
from nova import context
from nova import exception
-from nova import test
from nova.image import glance
-
-
-class StubGlanceClient(object):
-
- def __init__(self, images, add_response=None, update_response=None):
- self.images = images
- self.add_response = add_response
- self.update_response = update_response
-
- def set_auth_token(self, auth_tok):
- pass
-
- def get_image_meta(self, image_id):
- return self.images[image_id]
-
- def get_images_detailed(self, filters=None, marker=None, limit=None):
- images = self.images.values()
- if marker is None:
- index = 0
- else:
- for index, image in enumerate(images):
- if image['id'] == marker:
- index += 1
- break
- # default to a page size of 3 to ensure we flex the pagination code
- return images[index:index + 3]
-
- def get_image(self, image_id):
- return self.images[image_id], []
-
- def add_image(self, metadata, data):
- return self.add_response
-
- def update_image(self, image_id, metadata, data):
- return self.update_response
+from nova import test
+from nova.tests.glance import stubs as glance_stubs
class NullWriter(object):
@@ -67,218 +34,7 @@ class NullWriter(object):
pass
-class BaseGlanceTest(unittest.TestCase):
- NOW_GLANCE_OLD_FORMAT = "2010-10-11T10:30:22"
- NOW_GLANCE_FORMAT = "2010-10-11T10:30:22.000000"
- NOW_DATETIME = datetime.datetime(2010, 10, 11, 10, 30, 22)
-
- def setUp(self):
- self.client = StubGlanceClient(None)
- self.service = glance.GlanceImageService(client=self.client)
- self.context = context.RequestContext(None, None)
-
- def assertDateTimesFilled(self, image_meta):
- self.assertEqual(image_meta['created_at'], self.NOW_DATETIME)
- self.assertEqual(image_meta['updated_at'], self.NOW_DATETIME)
- self.assertEqual(image_meta['deleted_at'], self.NOW_DATETIME)
-
- def assertDateTimesEmpty(self, image_meta):
- self.assertEqual(image_meta['updated_at'], None)
- self.assertEqual(image_meta['deleted_at'], None)
-
- def assertDateTimesBlank(self, image_meta):
- self.assertEqual(image_meta['updated_at'], '')
- self.assertEqual(image_meta['deleted_at'], '')
-
-
-class TestGlanceImageServiceProperties(BaseGlanceTest):
- def test_show_passes_through_to_client(self):
- """Ensure attributes which aren't BASE_IMAGE_ATTRS are stored in the
- properties dict
- """
- fixtures = {'image1': {'id': '1', 'name': 'image1', 'is_public': True,
- 'foo': 'bar',
- 'properties': {'prop1': 'propvalue1'}}}
- self.client.images = fixtures
- image_meta = self.service.show(self.context, 'image1')
-
- expected = {'id': '1', 'name': 'image1', 'is_public': True,
- 'properties': {'prop1': 'propvalue1', 'foo': 'bar'}}
- self.assertEqual(image_meta, expected)
-
- def test_show_raises_when_no_authtoken_in_the_context(self):
- fixtures = {'image1': {'name': 'image1', 'is_public': False,
- 'foo': 'bar',
- 'properties': {'prop1': 'propvalue1'}}}
- self.client.images = fixtures
- self.context.auth_token = False
-
- expected = {'name': 'image1', 'is_public': True,
- 'properties': {'prop1': 'propvalue1', 'foo': 'bar'}}
- self.assertRaises(exception.ImageNotFound,
- self.service.show, self.context, 'image1')
-
- def test_show_passes_through_to_client_with_authtoken_in_context(self):
- fixtures = {'image1': {'name': 'image1', 'is_public': False,
- 'foo': 'bar',
- 'properties': {'prop1': 'propvalue1'}}}
- self.client.images = fixtures
- self.context.auth_token = True
-
- expected = {'name': 'image1', 'is_public': False,
- 'properties': {'prop1': 'propvalue1', 'foo': 'bar'}}
-
- image_meta = self.service.show(self.context, 'image1')
- self.assertEqual(image_meta, expected)
-
- def test_detail_passes_through_to_client(self):
- fixtures = {'image1': {'id': '1', 'name': 'image1', 'is_public': True,
- 'foo': 'bar',
- 'properties': {'prop1': 'propvalue1'}}}
- self.client.images = fixtures
- image_meta = self.service.detail(self.context)
- expected = [{'id': '1', 'name': 'image1', 'is_public': True,
- 'properties': {'prop1': 'propvalue1', 'foo': 'bar'}}]
- self.assertEqual(image_meta, expected)
-
-
-class TestGetterDateTimeNoneTests(BaseGlanceTest):
-
- def test_show_handles_none_datetimes(self):
- self.client.images = self._make_none_datetime_fixtures()
- image_meta = self.service.show(self.context, 'image1')
- self.assertDateTimesEmpty(image_meta)
-
- def test_show_handles_blank_datetimes(self):
- self.client.images = self._make_blank_datetime_fixtures()
- image_meta = self.service.show(self.context, 'image1')
- self.assertDateTimesBlank(image_meta)
-
- def test_detail_handles_none_datetimes(self):
- self.client.images = self._make_none_datetime_fixtures()
- image_meta = self.service.detail(self.context)[0]
- self.assertDateTimesEmpty(image_meta)
-
- def test_detail_handles_blank_datetimes(self):
- self.client.images = self._make_blank_datetime_fixtures()
- image_meta = self.service.detail(self.context)[0]
- self.assertDateTimesBlank(image_meta)
-
- def test_get_handles_none_datetimes(self):
- self.client.images = self._make_none_datetime_fixtures()
- writer = NullWriter()
- image_meta = self.service.get(self.context, 'image1', writer)
- self.assertDateTimesEmpty(image_meta)
-
- def test_get_handles_blank_datetimes(self):
- self.client.images = self._make_blank_datetime_fixtures()
- writer = NullWriter()
- image_meta = self.service.get(self.context, 'image1', writer)
- self.assertDateTimesBlank(image_meta)
-
- def test_show_makes_datetimes(self):
- self.client.images = self._make_datetime_fixtures()
- image_meta = self.service.show(self.context, 'image1')
- self.assertDateTimesFilled(image_meta)
- image_meta = self.service.show(self.context, 'image2')
- self.assertDateTimesFilled(image_meta)
-
- def test_detail_makes_datetimes(self):
- self.client.images = self._make_datetime_fixtures()
- image_meta = self.service.detail(self.context)[0]
- self.assertDateTimesFilled(image_meta)
- image_meta = self.service.detail(self.context)[1]
- self.assertDateTimesFilled(image_meta)
-
- def test_get_makes_datetimes(self):
- self.client.images = self._make_datetime_fixtures()
- writer = NullWriter()
- image_meta = self.service.get(self.context, 'image1', writer)
- self.assertDateTimesFilled(image_meta)
- image_meta = self.service.get(self.context, 'image2', writer)
- self.assertDateTimesFilled(image_meta)
-
- def _make_datetime_fixtures(self):
- fixtures = {
- 'image1': {
- 'id': '1',
- 'name': 'image1',
- 'is_public': True,
- 'created_at': self.NOW_GLANCE_FORMAT,
- 'updated_at': self.NOW_GLANCE_FORMAT,
- 'deleted_at': self.NOW_GLANCE_FORMAT,
- },
- 'image2': {
- 'id': '2',
- 'name': 'image2',
- 'is_public': True,
- 'created_at': self.NOW_GLANCE_OLD_FORMAT,
- 'updated_at': self.NOW_GLANCE_OLD_FORMAT,
- 'deleted_at': self.NOW_GLANCE_OLD_FORMAT,
- },
- }
- return fixtures
-
- def _make_none_datetime_fixtures(self):
- fixtures = {'image1': {'id': '1',
- 'name': 'image1',
- 'is_public': True,
- 'updated_at': None,
- 'deleted_at': None}}
- return fixtures
-
- def _make_blank_datetime_fixtures(self):
- fixtures = {'image1': {'id': '1',
- 'name': 'image1',
- 'is_public': True,
- 'updated_at': '',
- 'deleted_at': ''}}
- return fixtures
-
-
-class TestMutatorDateTimeTests(BaseGlanceTest):
- """Tests create(), update()"""
-
- def test_create_handles_datetimes(self):
- self.client.add_response = self._make_datetime_fixture()
- image_meta = self.service.create(self.context, {})
- self.assertDateTimesFilled(image_meta)
-
- def test_create_handles_none_datetimes(self):
- self.client.add_response = self._make_none_datetime_fixture()
- dummy_meta = {}
- image_meta = self.service.create(self.context, dummy_meta)
- self.assertDateTimesEmpty(image_meta)
-
- def test_update_handles_datetimes(self):
- self.client.images = {'image1': self._make_datetime_fixture()}
- self.client.update_response = self._make_datetime_fixture()
- dummy_meta = {}
- image_meta = self.service.update(self.context, 'image1', dummy_meta)
- self.assertDateTimesFilled(image_meta)
-
- def test_update_handles_none_datetimes(self):
- self.client.images = {'image1': self._make_datetime_fixture()}
- self.client.update_response = self._make_none_datetime_fixture()
- dummy_meta = {}
- image_meta = self.service.update(self.context, 'image1', dummy_meta)
- self.assertDateTimesEmpty(image_meta)
-
- def _make_datetime_fixture(self):
- fixture = {'id': 'image1', 'name': 'image1', 'is_public': True,
- 'created_at': self.NOW_GLANCE_FORMAT,
- 'updated_at': self.NOW_GLANCE_FORMAT,
- 'deleted_at': self.NOW_GLANCE_FORMAT}
- return fixture
-
- def _make_none_datetime_fixture(self):
- fixture = {'id': 'image1', 'name': 'image1', 'is_public': True,
- 'updated_at': None,
- 'deleted_at': None}
- return fixture
-
-
-class TestGlanceSerializer(unittest.TestCase):
+class TestGlanceSerializer(test.TestCase):
def test_serialize(self):
metadata = {'name': 'image1',
'is_public': True,
@@ -312,3 +68,386 @@ class TestGlanceSerializer(unittest.TestCase):
converted = glance._convert_to_string(metadata)
self.assertEqual(converted, converted_expected)
self.assertEqual(glance._convert_from_string(converted), metadata)
+
+
+class TestGlanceImageService(test.TestCase):
+ """
+ Tests the Glance image service.
+
+ At a high level, the translations involved are:
+
+ 1. Glance -> ImageService - This is needed so we can support
+ multple ImageServices (Glance, Local, etc)
+
+ 2. ImageService -> API - This is needed so we can support multple
+ APIs (OpenStack, EC2)
+
+ """
+ NOW_GLANCE_OLD_FORMAT = "2010-10-11T10:30:22"
+ NOW_GLANCE_FORMAT = "2010-10-11T10:30:22.000000"
+ NOW_DATETIME = datetime.datetime(2010, 10, 11, 10, 30, 22)
+
+ def setUp(self):
+ super(TestGlanceImageService, self).setUp()
+ self.stubs = stubout.StubOutForTesting()
+ fakes.stub_out_compute_api_snapshot(self.stubs)
+ client = glance_stubs.StubGlanceClient()
+ self.service = glance.GlanceImageService(client=client)
+ self.context = context.RequestContext('fake', 'fake', auth_token=True)
+ self.service.delete_all()
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+ super(TestGlanceImageService, self).tearDown()
+
+ @staticmethod
+ def _make_fixture(**kwargs):
+ fixture = {'name': None,
+ 'properties': {},
+ 'status': None,
+ 'is_public': None}
+ fixture.update(kwargs)
+ return fixture
+
+ def _make_datetime_fixture(self):
+ return self._make_fixture(created_at=self.NOW_GLANCE_FORMAT,
+ updated_at=self.NOW_GLANCE_FORMAT,
+ deleted_at=self.NOW_GLANCE_FORMAT)
+
+ def test_create_with_instance_id(self):
+ """Ensure instance_id is persisted as an image-property"""
+ fixture = {'name': 'test image',
+ 'is_public': False,
+ 'properties': {'instance_id': '42', 'user_id': 'fake'}}
+
+ image_id = self.service.create(self.context, fixture)['id']
+ image_meta = self.service.show(self.context, image_id)
+ expected = {
+ 'id': image_id,
+ 'name': 'test image',
+ 'is_public': False,
+ 'size': None,
+ 'location': None,
+ 'disk_format': None,
+ 'container_format': None,
+ 'checksum': None,
+ 'created_at': self.NOW_DATETIME,
+ 'updated_at': self.NOW_DATETIME,
+ 'deleted_at': None,
+ 'deleted': None,
+ 'status': None,
+ 'properties': {'instance_id': '42', 'user_id': 'fake'},
+ }
+ self.assertDictMatch(image_meta, expected)
+
+ image_metas = self.service.detail(self.context)
+ self.assertDictMatch(image_metas[0], expected)
+
+ def test_create_without_instance_id(self):
+ """
+ Ensure we can create an image without having to specify an
+ instance_id. Public images are an example of an image not tied to an
+ instance.
+ """
+ fixture = {'name': 'test image', 'is_public': False}
+ image_id = self.service.create(self.context, fixture)['id']
+
+ expected = {
+ 'id': image_id,
+ 'name': 'test image',
+ 'is_public': False,
+ 'size': None,
+ 'location': None,
+ 'disk_format': None,
+ 'container_format': None,
+ 'checksum': None,
+ 'created_at': self.NOW_DATETIME,
+ 'updated_at': self.NOW_DATETIME,
+ 'deleted_at': None,
+ 'deleted': None,
+ 'status': None,
+ 'properties': {},
+ }
+ actual = self.service.show(self.context, image_id)
+ self.assertDictMatch(actual, expected)
+
+ def test_create(self):
+ fixture = self._make_fixture(name='test image')
+ num_images = len(self.service.index(self.context))
+ image_id = self.service.create(self.context, fixture)['id']
+
+ self.assertNotEquals(None, image_id)
+ self.assertEquals(num_images + 1,
+ len(self.service.index(self.context)))
+
+ def test_create_and_show_non_existing_image(self):
+ fixture = self._make_fixture(name='test image')
+ image_id = self.service.create(self.context, fixture)['id']
+
+ self.assertNotEquals(None, image_id)
+ self.assertRaises(exception.NotFound,
+ self.service.show,
+ self.context,
+ 'bad image id')
+
+ def test_create_and_show_non_existing_image_by_name(self):
+ fixture = self._make_fixture(name='test image')
+ image_id = self.service.create(self.context, fixture)['id']
+
+ self.assertNotEquals(None, image_id)
+ self.assertRaises(exception.ImageNotFound,
+ self.service.show_by_name,
+ self.context,
+ 'bad image id')
+
+ def test_index(self):
+ fixture = self._make_fixture(name='test image')
+ image_id = self.service.create(self.context, fixture)['id']
+ image_metas = self.service.index(self.context)
+ expected = [{'id': image_id, 'name': 'test image'}]
+ self.assertDictListMatch(image_metas, expected)
+
+ def test_index_default_limit(self):
+ fixtures = []
+ ids = []
+ for i in range(10):
+ fixture = self._make_fixture(name='TestImage %d' % (i))
+ fixtures.append(fixture)
+ ids.append(self.service.create(self.context, fixture)['id'])
+
+ image_metas = self.service.index(self.context)
+ i = 0
+ for meta in image_metas:
+ expected = {'id': 'DONTCARE',
+ 'name': 'TestImage %d' % (i)}
+ self.assertDictMatch(meta, expected)
+ i = i + 1
+
+ def test_index_marker(self):
+ fixtures = []
+ ids = []
+ for i in range(10):
+ fixture = self._make_fixture(name='TestImage %d' % (i))
+ fixtures.append(fixture)
+ ids.append(self.service.create(self.context, fixture)['id'])
+
+ image_metas = self.service.index(self.context, marker=ids[1])
+ self.assertEquals(len(image_metas), 8)
+ i = 2
+ for meta in image_metas:
+ expected = {'id': 'DONTCARE',
+ 'name': 'TestImage %d' % (i)}
+ self.assertDictMatch(meta, expected)
+ i = i + 1
+
+ def test_index_limit(self):
+ fixtures = []
+ ids = []
+ for i in range(10):
+ fixture = self._make_fixture(name='TestImage %d' % (i))
+ fixtures.append(fixture)
+ ids.append(self.service.create(self.context, fixture)['id'])
+
+ image_metas = self.service.index(self.context, limit=5)
+ self.assertEquals(len(image_metas), 5)
+
+ def test_index_marker_and_limit(self):
+ fixtures = []
+ ids = []
+ for i in range(10):
+ fixture = self._make_fixture(name='TestImage %d' % (i))
+ fixtures.append(fixture)
+ ids.append(self.service.create(self.context, fixture)['id'])
+
+ image_metas = self.service.index(self.context, marker=ids[3], limit=1)
+ self.assertEquals(len(image_metas), 1)
+ i = 4
+ for meta in image_metas:
+ expected = {'id': ids[i],
+ 'name': 'TestImage %d' % (i)}
+ self.assertDictMatch(meta, expected)
+ i = i + 1
+
+ def test_detail_marker(self):
+ fixtures = []
+ ids = []
+ for i in range(10):
+ fixture = self._make_fixture(name='TestImage %d' % (i))
+ fixtures.append(fixture)
+ ids.append(self.service.create(self.context, fixture)['id'])
+
+ image_metas = self.service.detail(self.context, marker=ids[1])
+ self.assertEquals(len(image_metas), 8)
+ i = 2
+ for meta in image_metas:
+ expected = {
+ 'id': ids[i],
+ 'status': None,
+ 'is_public': None,
+ 'name': 'TestImage %d' % (i),
+ 'properties': {},
+ 'size': None,
+ 'location': None,
+ 'disk_format': None,
+ 'container_format': None,
+ 'checksum': None,
+ 'created_at': self.NOW_DATETIME,
+ 'updated_at': self.NOW_DATETIME,
+ 'deleted_at': None,
+ 'deleted': None
+ }
+
+ self.assertDictMatch(meta, expected)
+ i = i + 1
+
+ def test_detail_limit(self):
+ fixtures = []
+ ids = []
+ for i in range(10):
+ fixture = self._make_fixture(name='TestImage %d' % (i))
+ fixtures.append(fixture)
+ ids.append(self.service.create(self.context, fixture)['id'])
+
+ image_metas = self.service.detail(self.context, limit=5)
+ self.assertEquals(len(image_metas), 5)
+
+ def test_detail_marker_and_limit(self):
+ fixtures = []
+ ids = []
+ for i in range(10):
+ fixture = self._make_fixture(name='TestImage %d' % (i))
+ fixtures.append(fixture)
+ ids.append(self.service.create(self.context, fixture)['id'])
+
+ image_metas = self.service.detail(self.context, marker=ids[3], limit=5)
+ self.assertEquals(len(image_metas), 5)
+ i = 4
+ for meta in image_metas:
+ expected = {
+ 'id': ids[i],
+ 'status': None,
+ 'is_public': None,
+ 'name': 'TestImage %d' % (i),
+ 'properties': {},
+ 'size': None,
+ 'location': None,
+ 'disk_format': None,
+ 'container_format': None,
+ 'checksum': None,
+ 'created_at': self.NOW_DATETIME,
+ 'updated_at': self.NOW_DATETIME,
+ 'deleted_at': None,
+ 'deleted': None
+ }
+ self.assertDictMatch(meta, expected)
+ i = i + 1
+
+ def test_update(self):
+ fixture = self._make_fixture(name='test image')
+ image_id = self.service.create(self.context, fixture)['id']
+ fixture['name'] = 'new image name'
+ self.service.update(self.context, image_id, fixture)
+
+ new_image_data = self.service.show(self.context, image_id)
+ self.assertEquals('new image name', new_image_data['name'])
+
+ def test_delete(self):
+ fixture1 = self._make_fixture(name='test image 1')
+ fixture2 = self._make_fixture(name='test image 2')
+ fixtures = [fixture1, fixture2]
+
+ num_images = len(self.service.index(self.context))
+ self.assertEquals(0, num_images, str(self.service.index(self.context)))
+
+ ids = []
+ for fixture in fixtures:
+ new_id = self.service.create(self.context, fixture)['id']
+ ids.append(new_id)
+
+ num_images = len(self.service.index(self.context))
+ self.assertEquals(2, num_images, str(self.service.index(self.context)))
+
+ self.service.delete(self.context, ids[0])
+
+ num_images = len(self.service.index(self.context))
+ self.assertEquals(1, num_images)
+
+ def test_show_passes_through_to_client(self):
+ fixture = self._make_fixture(name='image1', is_public=True)
+ image_id = self.service.create(self.context, fixture)['id']
+
+ image_meta = self.service.show(self.context, image_id)
+ expected = {
+ 'id': image_id,
+ 'name': 'image1',
+ 'is_public': True,
+ 'size': None,
+ 'location': None,
+ 'disk_format': None,
+ 'container_format': None,
+ 'checksum': None,
+ 'created_at': self.NOW_DATETIME,
+ 'updated_at': self.NOW_DATETIME,
+ 'deleted_at': None,
+ 'deleted': None,
+ 'status': None,
+ 'properties': {},
+ }
+ self.assertEqual(image_meta, expected)
+
+ def test_show_raises_when_no_authtoken_in_the_context(self):
+ fixture = self._make_fixture(name='image1',
+ is_public=False,
+ properties={'one': 'two'})
+ image_id = self.service.create(self.context, fixture)['id']
+ self.context.auth_token = False
+ self.assertRaises(exception.ImageNotFound,
+ self.service.show,
+ self.context,
+ image_id)
+
+ def test_detail_passes_through_to_client(self):
+ fixture = self._make_fixture(name='image10', is_public=True)
+ image_id = self.service.create(self.context, fixture)['id']
+ image_metas = self.service.detail(self.context)
+ expected = [
+ {
+ 'id': image_id,
+ 'name': 'image10',
+ 'is_public': True,
+ 'size': None,
+ 'location': None,
+ 'disk_format': None,
+ 'container_format': None,
+ 'checksum': None,
+ 'created_at': self.NOW_DATETIME,
+ 'updated_at': self.NOW_DATETIME,
+ 'deleted_at': None,
+ 'deleted': None,
+ 'status': None,
+ 'properties': {},
+ },
+ ]
+ self.assertEqual(image_metas, expected)
+
+ def test_show_makes_datetimes(self):
+ fixture = self._make_datetime_fixture()
+ image_id = self.service.create(self.context, fixture)['id']
+ image_meta = self.service.show(self.context, image_id)
+ self.assertEqual(image_meta['created_at'], self.NOW_DATETIME)
+ self.assertEqual(image_meta['updated_at'], self.NOW_DATETIME)
+
+ def test_detail_makes_datetimes(self):
+ fixture = self._make_datetime_fixture()
+ self.service.create(self.context, fixture)
+ image_meta = self.service.detail(self.context)[0]
+ self.assertEqual(image_meta['created_at'], self.NOW_DATETIME)
+ self.assertEqual(image_meta['updated_at'], self.NOW_DATETIME)
+
+ def test_get_makes_datetimes(self):
+ fixture = self._make_datetime_fixture()
+ image_id = self.service.create(self.context, fixture)['id']
+ writer = NullWriter()
+ image_meta = self.service.get(self.context, image_id, writer)
+ self.assertEqual(image_meta['created_at'], self.NOW_DATETIME)
+ self.assertEqual(image_meta['updated_at'], self.NOW_DATETIME)
diff --git a/nova/tests/integrated/integrated_helpers.py b/nova/tests/integrated/integrated_helpers.py
index 343190427..49de9c854 100644
--- a/nova/tests/integrated/integrated_helpers.py
+++ b/nova/tests/integrated/integrated_helpers.py
@@ -64,7 +64,7 @@ class _IntegratedTestBase(test.TestCase):
self.flags(**f)
self.flags(verbose=True)
- def fake_get_image_service(image_href):
+ def fake_get_image_service(context, image_href):
image_id = int(str(image_href).split('/')[-1])
return (nova.image.fake.FakeImageService(), image_id)
self.stubs.Set(nova.image, 'get_image_service', fake_get_image_service)
diff --git a/nova/tests/integrated/test_xml.py b/nova/tests/integrated/test_xml.py
index 74baaacc2..cf013da1d 100644
--- a/nova/tests/integrated/test_xml.py
+++ b/nova/tests/integrated/test_xml.py
@@ -15,6 +15,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+from lxml import etree
+
from nova.log import logging
from nova.tests.integrated import integrated_helpers
from nova.api.openstack import common
@@ -34,9 +36,8 @@ class XmlTests(integrated_helpers._IntegratedTestBase):
response = self.api.api_request('/limits', headers=headers)
data = response.read()
LOG.debug("data: %s" % data)
-
- prefix = '<limits xmlns="%s"' % common.XML_NS_V11
- self.assertTrue(data.startswith(prefix))
+ root = etree.XML(data)
+ self.assertEqual(root.nsmap.get(None), common.XML_NS_V11)
def test_namespace_servers(self):
"""/servers should have v1.1 namespace (has changed in 1.1)."""
@@ -46,6 +47,5 @@ class XmlTests(integrated_helpers._IntegratedTestBase):
response = self.api.api_request('/servers', headers=headers)
data = response.read()
LOG.debug("data: %s" % data)
-
- prefix = '<servers xmlns="%s"' % common.XML_NS_V11
- self.assertTrue(data.startswith(prefix))
+ root = etree.XML(data)
+ self.assertEqual(root.nsmap.get(None), common.XML_NS_V11)
diff --git a/nova/tests/scheduler/test_abstract_scheduler.py b/nova/tests/scheduler/test_abstract_scheduler.py
index aa97e2344..5549ea453 100644
--- a/nova/tests/scheduler/test_abstract_scheduler.py
+++ b/nova/tests/scheduler/test_abstract_scheduler.py
@@ -26,6 +26,7 @@ from nova import test
from nova.compute import api as compute_api
from nova.scheduler import driver
from nova.scheduler import abstract_scheduler
+from nova.scheduler import base_scheduler
from nova.scheduler import zone_manager
@@ -65,6 +66,11 @@ class FakeAbstractScheduler(abstract_scheduler.AbstractScheduler):
pass
+class FakeBaseScheduler(base_scheduler.BaseScheduler):
+ # No need to stub anything at the moment
+ pass
+
+
class FakeZoneManager(zone_manager.ZoneManager):
def __init__(self):
self.service_states = {
@@ -365,3 +371,52 @@ class AbstractSchedulerTestCase(test.TestCase):
self.assertEqual(fixture._decrypt_blob(test_data),
json.dumps(test_data))
+
+ def test_empty_local_hosts(self):
+ """
+ Create a nested set of FakeZones, try to build multiple instances
+ and ensure that a select call returns the appropriate build plan.
+ """
+ sched = FakeAbstractScheduler()
+ self.stubs.Set(sched, '_call_zone_method', fake_call_zone_method)
+ self.stubs.Set(nova.db, 'zone_get_all', fake_zone_get_all)
+
+ zm = FakeZoneManager()
+ # patch this to have no local hosts
+ zm.service_states = {}
+ sched.set_zone_manager(zm)
+
+ fake_context = {}
+ build_plan = sched.select(fake_context,
+ {'instance_type': {'memory_mb': 512},
+ 'num_instances': 4})
+
+ # 0 from local zones, 12 from remotes
+ self.assertEqual(12, len(build_plan))
+
+
+class BaseSchedulerTestCase(test.TestCase):
+ """Test case for Base Scheduler."""
+
+ def test_weigh_hosts(self):
+ """
+ Try to weigh a short list of hosts and make sure enough
+ entries for a larger number instances are returned.
+ """
+
+ sched = FakeBaseScheduler()
+
+ # Fake out a list of hosts
+ zm = FakeZoneManager()
+ hostlist = [(host, services['compute'])
+ for host, services in zm.service_states.items()
+ if 'compute' in services]
+
+ # Call weigh_hosts()
+ num_instances = len(hostlist) * 2 + len(hostlist) / 2
+ instlist = sched.weigh_hosts('compute',
+ dict(num_instances=num_instances),
+ hostlist)
+
+ # Should be enough entries to cover all instances
+ self.assertEqual(len(instlist), num_instances)
diff --git a/nova/tests/scheduler/test_scheduler.py b/nova/tests/scheduler/test_scheduler.py
index a52dd041a..890348192 100644
--- a/nova/tests/scheduler/test_scheduler.py
+++ b/nova/tests/scheduler/test_scheduler.py
@@ -963,9 +963,14 @@ class FakeZone(object):
self.password = password
+ZONE_API_URL1 = "http://1.example.com"
+ZONE_API_URL2 = "http://2.example.com"
+
+
def zone_get_all(context):
return [
- FakeZone(1, 'http://example.com', 'bob', 'xxx'),
+ FakeZone(1, ZONE_API_URL1, 'bob', 'xxx'),
+ FakeZone(2, ZONE_API_URL2, 'bob', 'xxx'),
]
@@ -1065,7 +1070,9 @@ class ZoneRedirectTest(test.TestCase):
def test_unmarshal_single_server(self):
decorator = api.reroute_compute("foo")
- self.assertEquals(decorator.unmarshall_result([]), {})
+ decorator.item_uuid = 'fake_uuid'
+ self.assertRaises(exception.InstanceNotFound,
+ decorator.unmarshall_result, [])
self.assertEquals(decorator.unmarshall_result(
[FakeResource(dict(a=1, b=2)), ]),
dict(server=dict(a=1, b=2)))
@@ -1079,6 +1086,90 @@ class ZoneRedirectTest(test.TestCase):
[FakeResource(dict(_a=1, manager=2)), ]),
dict(server={}))
+ def test_one_zone_down_no_instances(self):
+
+ def _fake_issue_novaclient_command(nova, zone, *args, **kwargs):
+ return None
+
+ class FakeNovaClientWithFailure(object):
+ def __init__(self, username, password, method, api_url):
+ self.api_url = api_url
+
+ def authenticate(self):
+ if self.api_url == ZONE_API_URL2:
+ raise novaclient_exceptions.BadRequest('foo')
+
+ self.stubs.Set(api, '_issue_novaclient_command',
+ _fake_issue_novaclient_command)
+ self.stubs.Set(api.novaclient, 'Client', FakeNovaClientWithFailure)
+
+ @api.reroute_compute("get")
+ def do_get(self, context, uuid):
+ pass
+
+ self.assertRaises(exception.ZoneRequestError,
+ do_get, None, {}, FAKE_UUID)
+
+ def test_one_zone_down_got_instance(self):
+
+ def _fake_issue_novaclient_command(nova, zone, *args, **kwargs):
+ class FakeServer(object):
+ def __init__(self):
+ self.id = FAKE_UUID
+ self.test = '1234'
+ return FakeServer()
+
+ class FakeNovaClientWithFailure(object):
+ def __init__(self, username, password, method, api_url):
+ self.api_url = api_url
+
+ def authenticate(self):
+ if self.api_url == ZONE_API_URL2:
+ raise novaclient_exceptions.BadRequest('foo')
+
+ self.stubs.Set(api, '_issue_novaclient_command',
+ _fake_issue_novaclient_command)
+ self.stubs.Set(api.novaclient, 'Client', FakeNovaClientWithFailure)
+
+ @api.reroute_compute("get")
+ def do_get(self, context, uuid):
+ pass
+
+ try:
+ do_get(None, {}, FAKE_UUID)
+ except api.RedirectResult, e:
+ results = e.results
+ self.assertIn('server', results)
+ self.assertEqual(results['server']['id'], FAKE_UUID)
+ self.assertEqual(results['server']['test'], '1234')
+ except Exception, e:
+ self.fail(_("RedirectResult should have been raised"))
+ else:
+ self.fail(_("RedirectResult should have been raised"))
+
+ def test_zones_up_no_instances(self):
+
+ def _fake_issue_novaclient_command(nova, zone, *args, **kwargs):
+ return None
+
+ class FakeNovaClientNoFailure(object):
+ def __init__(self, username, password, method, api_url):
+ pass
+
+ def authenticate(self):
+ return
+
+ self.stubs.Set(api, '_issue_novaclient_command',
+ _fake_issue_novaclient_command)
+ self.stubs.Set(api.novaclient, 'Client', FakeNovaClientNoFailure)
+
+ @api.reroute_compute("get")
+ def do_get(self, context, uuid):
+ pass
+
+ self.assertRaises(exception.InstanceNotFound,
+ do_get, None, {}, FAKE_UUID)
+
class FakeServerCollection(object):
def get(self, instance_id):
@@ -1097,7 +1188,7 @@ class FakeEmptyServerCollection(object):
class FakeNovaClient(object):
- def __init__(self, collection):
+ def __init__(self, collection, *args, **kwargs):
self.servers = collection
@@ -1162,8 +1253,9 @@ class CallZoneMethodTest(test.TestCase):
context = {}
method = 'do_something'
results = api.call_zone_method(context, method)
- expected = [(1, 42)]
- self.assertEqual(expected, results)
+ self.assertEqual(len(results), 2)
+ self.assertIn((1, 42), results)
+ self.assertIn((2, 42), results)
def test_call_zone_method_not_present(self):
context = {}
diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py
index 766a7da9b..4d463572b 100644
--- a/nova/tests/test_compute.py
+++ b/nova/tests/test_compute.py
@@ -161,6 +161,19 @@ class ComputeTestCase(test.TestCase):
db.security_group_destroy(self.context, group['id'])
db.instance_destroy(self.context, ref[0]['id'])
+ def test_create_instance_with_invalid_security_group_raises(self):
+ instance_type = instance_types.get_default_instance_type()
+
+ pre_build_len = len(db.instance_get_all(context.get_admin_context()))
+ self.assertRaises(exception.SecurityGroupNotFoundForProject,
+ self.compute_api.create,
+ self.context,
+ instance_type=instance_type,
+ image_href=None,
+ security_group=['this_is_a_fake_sec_group'])
+ self.assertEqual(pre_build_len,
+ len(db.instance_get_all(context.get_admin_context())))
+
def test_create_instance_associates_config_drive(self):
"""Make sure create associates a config drive."""
@@ -287,11 +300,20 @@ class ComputeTestCase(test.TestCase):
self.compute.resume_instance(self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
- def test_reboot(self):
- """Ensure instance can be rebooted"""
+ def test_soft_reboot(self):
+ """Ensure instance can be soft rebooted"""
+ instance_id = self._create_instance()
+ reboot_type = "SOFT"
+ self.compute.run_instance(self.context, instance_id)
+ self.compute.reboot_instance(self.context, instance_id, reboot_type)
+ self.compute.terminate_instance(self.context, instance_id)
+
+ def test_hard_reboot(self):
+ """Ensure instance can be hard rebooted"""
instance_id = self._create_instance()
+ reboot_type = "HARD"
self.compute.run_instance(self.context, instance_id)
- self.compute.reboot_instance(self.context, instance_id)
+ self.compute.reboot_instance(self.context, instance_id, reboot_type)
self.compute.terminate_instance(self.context, instance_id)
def test_set_admin_password(self):
diff --git a/nova/tests/test_direct.py b/nova/tests/test_direct.py
index 4ed0c2aa5..8d856dc4b 100644
--- a/nova/tests/test_direct.py
+++ b/nova/tests/test_direct.py
@@ -30,7 +30,7 @@ from nova import test
from nova import volume
from nova import utils
from nova.api import direct
-from nova.tests import test_cloud
+from nova.tests.api.ec2 import test_cloud
class ArbitraryObject(object):
diff --git a/nova/tests/test_libvirt.py b/nova/tests/test_libvirt.py
index 1254c3299..769bc5291 100644
--- a/nova/tests/test_libvirt.py
+++ b/nova/tests/test_libvirt.py
@@ -35,61 +35,47 @@ from nova import utils
from nova.api.ec2 import cloud
from nova.compute import power_state
from nova.compute import vm_states
+from nova.virt import driver
from nova.virt.libvirt import connection
from nova.virt.libvirt import firewall
+from nova.tests import fake_network
libvirt = None
FLAGS = flags.FLAGS
+_fake_network_info = fake_network.fake_get_instance_nw_info
+_ipv4_like = fake_network.ipv4_like
+
def _concurrency(wait, done, target):
wait.wait()
done.send()
-def _create_network_info(count=1, ipv6=None):
- if ipv6 is None:
- ipv6 = FLAGS.use_ipv6
- fake = 'fake'
- fake_ip = '10.11.12.13'
- fake_ip_2 = '0.0.0.1'
- fake_ip_3 = '0.0.0.1'
- fake_vlan = 100
- fake_bridge_interface = 'eth0'
- network = {'bridge': fake,
- 'cidr': fake_ip,
- 'cidr_v6': fake_ip,
- 'gateway_v6': fake,
- 'vlan': fake_vlan,
- 'bridge_interface': fake_bridge_interface}
- mapping = {'mac': fake,
- 'dhcp_server': '10.0.0.1',
- 'gateway': fake,
- 'gateway6': fake,
- 'ips': [{'ip': fake_ip}, {'ip': fake_ip}]}
- if ipv6:
- mapping['ip6s'] = [{'ip': fake_ip},
- {'ip': fake_ip_2},
- {'ip': fake_ip_3}]
- return [(network, mapping) for x in xrange(0, count)]
-
-
-def _setup_networking(instance_id, ip='1.2.3.4', mac='56:12:12:12:12:12'):
- ctxt = context.get_admin_context()
- network_ref = db.project_get_networks(ctxt,
- 'fake',
- associate=True)[0]
- vif = {'address': mac,
- 'network_id': network_ref['id'],
- 'instance_id': instance_id}
- vif_ref = db.virtual_interface_create(ctxt, vif)
-
- fixed_ip = {'address': ip,
- 'network_id': network_ref['id'],
- 'virtual_interface_id': vif_ref['id']}
- db.fixed_ip_create(ctxt, fixed_ip)
- db.fixed_ip_update(ctxt, ip, {'allocated': True,
- 'instance_id': instance_id})
+class FakeVirtDomain(object):
+
+ def __init__(self, fake_xml=None):
+ if fake_xml:
+ self._fake_dom_xml = fake_xml
+ else:
+ self._fake_dom_xml = """
+ <domain type='kvm'>
+ <devices>
+ <disk type='file'>
+ <source file='filename'/>
+ </disk>
+ </devices>
+ </domain>
+ """
+
+ def snapshotCreateXML(self, *args):
+ return None
+
+ def createWithFlags(self, launch_flags):
+ pass
+
+ def XMLDesc(self, *args):
+ return self._fake_dom_xml
class CacheConcurrencyTestCase(test.TestCase):
@@ -163,7 +149,6 @@ class LibvirtConnTestCase(test.TestCase):
self.context = context.get_admin_context()
self.flags(instances_path='')
self.call_libvirt_dependant_setup = False
- self.test_ip = '10.11.12.13'
test_instance = {'memory_kb': '1024000',
'basepath': '/some/path',
@@ -194,70 +179,24 @@ class LibvirtConnTestCase(test.TestCase):
# A fake libvirt.virConnect
class FakeLibvirtConnection(object):
- pass
-
- # A fake connection.IptablesFirewallDriver
- class FakeIptablesFirewallDriver(object):
-
- def __init__(self, **kwargs):
- pass
-
- def setattr(self, key, val):
- self.__setattr__(key, val)
-
- # A fake VIF driver
- class FakeVIFDriver(object):
-
- def __init__(self, **kwargs):
- pass
-
- def setattr(self, key, val):
- self.__setattr__(key, val)
-
- def plug(self, instance, network, mapping):
- return {
- 'id': 'fake',
- 'bridge_name': 'fake',
- 'mac_address': 'fake',
- 'ip_address': 'fake',
- 'dhcp_server': 'fake',
- 'extra_params': 'fake',
- }
+ def defineXML(self, xml):
+ return FakeVirtDomain()
# Creating mocks
fake = FakeLibvirtConnection()
- fakeip = FakeIptablesFirewallDriver
- fakevif = FakeVIFDriver()
# Customizing above fake if necessary
for key, val in kwargs.items():
fake.__setattr__(key, val)
- # Inevitable mocks for connection.LibvirtConnection
- self.mox.StubOutWithMock(connection.utils, 'import_class')
- connection.utils.import_class(mox.IgnoreArg()).AndReturn(fakeip)
- self.mox.StubOutWithMock(connection.utils, 'import_object')
- connection.utils.import_object(mox.IgnoreArg()).AndReturn(fakevif)
+ self.flags(image_service='nova.image.fake.FakeImageService')
+ fw_driver = "nova.tests.fake_network.FakeIptablesFirewallDriver"
+ self.flags(firewall_driver=fw_driver)
+ self.flags(libvirt_vif_driver="nova.tests.fake_network.FakeVIFDriver")
+
self.mox.StubOutWithMock(connection.LibvirtConnection, '_conn')
connection.LibvirtConnection._conn = fake
def fake_lookup(self, instance_name):
-
- class FakeVirtDomain(object):
-
- def snapshotCreateXML(self, *args):
- return None
-
- def XMLDesc(self, *args):
- return """
- <domain type='kvm'>
- <devices>
- <disk type='file'>
- <source file='filename'/>
- </disk>
- </devices>
- </domain>
- """
-
return FakeVirtDomain()
def fake_execute(self, *args):
@@ -277,12 +216,12 @@ class LibvirtConnTestCase(test.TestCase):
instance_ref = db.instance_create(self.context, self.test_instance)
result = conn._prepare_xml_info(instance_ref,
- _create_network_info(),
+ _fake_network_info(self.stubs, 1),
False)
self.assertTrue(len(result['nics']) == 1)
result = conn._prepare_xml_info(instance_ref,
- _create_network_info(2),
+ _fake_network_info(self.stubs, 2),
False)
self.assertTrue(len(result['nics']) == 2)
@@ -407,7 +346,7 @@ class LibvirtConnTestCase(test.TestCase):
def test_multi_nic(self):
instance_data = dict(self.test_instance)
- network_info = _create_network_info(2)
+ network_info = _fake_network_info(self.stubs, 2)
conn = connection.LibvirtConnection(True)
instance_ref = db.instance_create(self.context, instance_data)
xml = conn.to_xml(instance_ref, network_info, False)
@@ -417,15 +356,14 @@ class LibvirtConnTestCase(test.TestCase):
parameters = interfaces[0].findall('./filterref/parameter')
self.assertEquals(interfaces[0].get('type'), 'bridge')
self.assertEquals(parameters[0].get('name'), 'IP')
- self.assertEquals(parameters[0].get('value'), '10.11.12.13')
+ self.assertTrue(_ipv4_like(parameters[0].get('value'), '192.168'))
self.assertEquals(parameters[1].get('name'), 'DHCPSERVER')
- self.assertEquals(parameters[1].get('value'), '10.0.0.1')
+ self.assertTrue(_ipv4_like(parameters[1].get('value'), '192.168.*.1'))
def _check_xml_and_container(self, instance):
user_context = context.RequestContext(self.user_id,
self.project_id)
instance_ref = db.instance_create(user_context, instance)
- _setup_networking(instance_ref['id'], self.test_ip)
self.flags(libvirt_type='lxc')
conn = connection.LibvirtConnection(True)
@@ -433,7 +371,7 @@ class LibvirtConnTestCase(test.TestCase):
uri = conn.get_uri()
self.assertEquals(uri, 'lxc:///')
- network_info = _create_network_info()
+ network_info = _fake_network_info(self.stubs, 1)
xml = conn.to_xml(instance_ref, network_info)
tree = xml_to_tree(xml)
@@ -457,8 +395,6 @@ class LibvirtConnTestCase(test.TestCase):
network_ref = db.project_get_networks(context.get_admin_context(),
self.project_id)[0]
- _setup_networking(instance_ref['id'], self.test_ip)
-
type_uri_map = {'qemu': ('qemu:///system',
[(lambda t: t.find('.').get('type'), 'qemu'),
(lambda t: t.find('./os/type').text, 'hvm'),
@@ -504,9 +440,11 @@ class LibvirtConnTestCase(test.TestCase):
common_checks = [
(lambda t: t.find('.').tag, 'domain'),
(lambda t: t.find(parameter).get('name'), 'IP'),
- (lambda t: t.find(parameter).get('value'), '10.11.12.13'),
+ (lambda t: _ipv4_like(t.find(parameter).get('value'), '192.168'),
+ True),
(lambda t: t.findall(parameter)[1].get('name'), 'DHCPSERVER'),
- (lambda t: t.findall(parameter)[1].get('value'), '10.0.0.1'),
+ (lambda t: _ipv4_like(t.findall(parameter)[1].get('value'),
+ '192.168.*.1'), True),
(lambda t: t.find('./devices/serial/source').get(
'path').split('/')[1], 'console.log'),
(lambda t: t.find('./memory').text, '2097152')]
@@ -531,7 +469,7 @@ class LibvirtConnTestCase(test.TestCase):
uri = conn.get_uri()
self.assertEquals(uri, expected_uri)
- network_info = _create_network_info()
+ network_info = _fake_network_info(self.stubs, 1)
xml = conn.to_xml(instance_ref, network_info, rescue)
tree = xml_to_tree(xml)
for i, (check, expected_result) in enumerate(checks):
@@ -646,7 +584,7 @@ class LibvirtConnTestCase(test.TestCase):
self.create_fake_libvirt_mock()
instance_ref = db.instance_create(self.context, self.test_instance)
- network_info = _create_network_info()
+ network_info = _fake_network_info(self.stubs, 1)
# Start test
self.mox.ReplayAll()
@@ -743,7 +681,7 @@ class LibvirtConnTestCase(test.TestCase):
# qemu-img should be mockd since test environment might not have
# large disk space.
self.mox.StubOutWithMock(utils, "execute")
- utils.execute('sudo', 'qemu-img', 'create', '-f', 'raw',
+ utils.execute('qemu-img', 'create', '-f', 'raw',
'%s/%s/disk' % (tmpdir, instance_ref.name), '10G')
self.mox.ReplayAll()
@@ -795,7 +733,7 @@ class LibvirtConnTestCase(test.TestCase):
os.path.getsize("/test/disk").AndReturn(10 * 1024 * 1024 * 1024)
# another is qcow image, so qemu-img should be mocked.
self.mox.StubOutWithMock(utils, "execute")
- utils.execute('sudo', 'qemu-img', 'info', '/test/disk.local').\
+ utils.execute('qemu-img', 'info', '/test/disk.local').\
AndReturn((ret, ''))
self.mox.ReplayAll()
@@ -830,7 +768,7 @@ class LibvirtConnTestCase(test.TestCase):
conn.firewall_driver.setattr('setup_basic_filtering', fake_none)
conn.firewall_driver.setattr('prepare_instance_filter', fake_none)
- network_info = _create_network_info()
+ network_info = _fake_network_info(self.stubs, 1)
try:
conn.spawn(self.context, instance, network_info)
@@ -840,8 +778,6 @@ class LibvirtConnTestCase(test.TestCase):
shutil.rmtree(os.path.join(FLAGS.instances_path, instance.name))
shutil.rmtree(os.path.join(FLAGS.instances_path, '_base'))
- self.assertTrue(count)
-
def test_get_host_ip_addr(self):
conn = connection.LibvirtConnection(False)
ip = conn.get_host_ip_addr()
@@ -883,6 +819,50 @@ class LibvirtConnTestCase(test.TestCase):
_assert_volume_in_mapping('sdg', False)
_assert_volume_in_mapping('sdh1', False)
+ def test_reboot_signature(self):
+ """Test that libvirt driver method sig matches interface"""
+ def fake_reboot_with_correct_sig(ignore, instance,
+ network_info, reboot_type):
+ pass
+
+ def fake_destroy(instance, network_info, cleanup=False):
+ pass
+
+ def fake_plug_vifs(instance, network_info):
+ pass
+
+ def fake_create_new_domain(xml):
+ return
+
+ def fake_none(self, instance):
+ return
+
+ instance = db.instance_create(self.context, self.test_instance)
+ network_info = _fake_network_info(self.stubs, 1)
+
+ self.mox.StubOutWithMock(connection.LibvirtConnection, '_conn')
+ connection.LibvirtConnection._conn.lookupByName = self.fake_lookup
+
+ conn = connection.LibvirtConnection(False)
+ self.stubs.Set(conn, 'destroy', fake_destroy)
+ self.stubs.Set(conn, 'plug_vifs', fake_plug_vifs)
+ self.stubs.Set(conn.firewall_driver,
+ 'setup_basic_filtering',
+ fake_none)
+ self.stubs.Set(conn.firewall_driver,
+ 'prepare_instance_filter',
+ fake_none)
+ self.stubs.Set(conn, '_create_new_domain', fake_create_new_domain)
+ self.stubs.Set(conn.firewall_driver,
+ 'apply_instance_filter',
+ fake_none)
+
+ args = [instance, network_info, 'SOFT']
+ conn.reboot(*args)
+
+ compute_driver = driver.ComputeDriver()
+ self.assertRaises(NotImplementedError, compute_driver.reboot, *args)
+
class NWFilterFakes:
def __init__(self):
@@ -923,7 +903,6 @@ class IptablesFirewallTestCase(test.TestCase):
"""setup_basic_rules in nwfilter calls this."""
pass
self.fake_libvirt_connection = FakeLibvirtConnection()
- self.test_ip = '10.11.12.13'
self.fw = firewall.IptablesFirewallDriver(
get_connection=lambda: self.fake_libvirt_connection)
@@ -987,10 +966,6 @@ class IptablesFirewallTestCase(test.TestCase):
def test_static_filters(self):
instance_ref = self._create_instance_ref()
src_instance_ref = self._create_instance_ref()
- src_ip = '10.11.12.14'
- src_mac = '56:12:12:12:12:13'
- _setup_networking(instance_ref['id'], self.test_ip, src_mac)
- _setup_networking(src_instance_ref['id'], src_ip)
admin_ctxt = context.get_admin_context()
secgroup = db.security_group_create(admin_ctxt,
@@ -1065,10 +1040,17 @@ class IptablesFirewallTestCase(test.TestCase):
return '', ''
print cmd, kwargs
+ def get_fixed_ips(*args, **kwargs):
+ ips = []
+ for network, info in network_info:
+ ips.extend(info['ips'])
+ return [ip['ip'] for ip in ips]
+
from nova.network import linux_net
linux_net.iptables_manager.execute = fake_iptables_execute
- network_info = _create_network_info()
+ network_info = _fake_network_info(self.stubs, 1)
+ self.stubs.Set(db, 'instance_get_fixed_addresses', get_fixed_ips)
self.fw.prepare_instance_filter(instance_ref, network_info)
self.fw.apply_instance_filter(instance_ref, network_info)
@@ -1082,7 +1064,8 @@ class IptablesFirewallTestCase(test.TestCase):
instance_chain = None
for rule in self.out_rules:
# This is pretty crude, but it'll do for now
- if '-d 10.11.12.13 -j' in rule:
+ # last two octets change
+ if re.search('-d 192.168.[0-9]{1,3}.[0-9]{1,3} -j', rule):
instance_chain = rule.split(' ')[-1]
break
self.assertTrue(instance_chain, "The instance chain wasn't added")
@@ -1105,10 +1088,11 @@ class IptablesFirewallTestCase(test.TestCase):
self.assertTrue(len(filter(regex.match, self.out_rules)) > 0,
"ICMP Echo Request acceptance rule wasn't added")
- regex = re.compile('-A .* -j ACCEPT -p tcp -m multiport '
- '--dports 80:81 -s %s' % (src_ip,))
- self.assertTrue(len(filter(regex.match, self.out_rules)) > 0,
- "TCP port 80/81 acceptance rule wasn't added")
+ for ip in get_fixed_ips():
+ regex = re.compile('-A .* -j ACCEPT -p tcp -m multiport '
+ '--dports 80:81 -s %s' % ip)
+ self.assertTrue(len(filter(regex.match, self.out_rules)) > 0,
+ "TCP port 80/81 acceptance rule wasn't added")
regex = re.compile('-A .* -j ACCEPT -s %s' % (src_ip,))
self.assertTrue(len(filter(regex.match, self.out_rules)) > 0,
@@ -1122,24 +1106,27 @@ class IptablesFirewallTestCase(test.TestCase):
def test_filters_for_instance_with_ip_v6(self):
self.flags(use_ipv6=True)
- network_info = _create_network_info()
+ network_info = _fake_network_info(self.stubs, 1)
rulesv4, rulesv6 = self.fw._filters_for_instance("fake", network_info)
self.assertEquals(len(rulesv4), 2)
- self.assertEquals(len(rulesv6), 3)
+ self.assertEquals(len(rulesv6), 1)
def test_filters_for_instance_without_ip_v6(self):
self.flags(use_ipv6=False)
- network_info = _create_network_info()
+ network_info = _fake_network_info(self.stubs, 1)
rulesv4, rulesv6 = self.fw._filters_for_instance("fake", network_info)
self.assertEquals(len(rulesv4), 2)
self.assertEquals(len(rulesv6), 0)
def test_multinic_iptables(self):
- ipv4_rules_per_network = 2
- ipv6_rules_per_network = 3
+ ipv4_rules_per_addr = 1
+ ipv4_addr_per_network = 2
+ ipv6_rules_per_addr = 1
+ ipv6_addr_per_network = 1
networks_count = 5
instance_ref = self._create_instance_ref()
- network_info = _create_network_info(networks_count)
+ network_info = _fake_network_info(self.stubs, networks_count,
+ ipv4_addr_per_network)
ipv4_len = len(self.fw.iptables.ipv4['filter'].rules)
ipv6_len = len(self.fw.iptables.ipv6['filter'].rules)
inst_ipv4, inst_ipv6 = self.fw.instance_rules(instance_ref,
@@ -1150,9 +1137,9 @@ class IptablesFirewallTestCase(test.TestCase):
ipv4_network_rules = len(ipv4) - len(inst_ipv4) - ipv4_len
ipv6_network_rules = len(ipv6) - len(inst_ipv6) - ipv6_len
self.assertEquals(ipv4_network_rules,
- ipv4_rules_per_network * networks_count)
+ ipv4_rules_per_addr * ipv4_addr_per_network * networks_count)
self.assertEquals(ipv6_network_rules,
- ipv6_rules_per_network * networks_count)
+ ipv6_rules_per_addr * ipv6_addr_per_network * networks_count)
def test_do_refresh_security_group_rules(self):
instance_ref = self._create_instance_ref()
@@ -1178,8 +1165,7 @@ class IptablesFirewallTestCase(test.TestCase):
fakefilter.nwfilterLookupByName
instance_ref = self._create_instance_ref()
- _setup_networking(instance_ref['id'], self.test_ip)
- network_info = _create_network_info()
+ network_info = _fake_network_info(self.stubs, 1)
self.fw.setup_basic_filtering(instance_ref, network_info)
self.fw.prepare_instance_filter(instance_ref, network_info)
self.fw.apply_instance_filter(instance_ref, network_info)
@@ -1194,13 +1180,12 @@ class IptablesFirewallTestCase(test.TestCase):
def test_provider_firewall_rules(self):
# setup basic instance data
instance_ref = self._create_instance_ref()
- _setup_networking(instance_ref['id'], self.test_ip)
# FRAGILE: peeks at how the firewall names chains
chain_name = 'inst-%s' % instance_ref['id']
# create a firewall via setup_basic_filtering like libvirt_conn.spawn
# should have a chain with 0 rules
- network_info = _create_network_info(1)
+ network_info = _fake_network_info(self.stubs, 1)
self.fw.setup_basic_filtering(instance_ref, network_info)
self.assertTrue('provider' in self.fw.iptables.ipv4['filter'].chains)
rules = [rule for rule in self.fw.iptables.ipv4['filter'].rules
@@ -1264,7 +1249,6 @@ class NWFilterTestCase(test.TestCase):
self.fake_libvirt_connection = Mock()
- self.test_ip = '10.11.12.13'
self.fw = firewall.NWFilterFirewall(
lambda: self.fake_libvirt_connection)
@@ -1380,11 +1364,9 @@ class NWFilterTestCase(test.TestCase):
instance_ref = self._create_instance()
inst_id = instance_ref['id']
- _setup_networking(instance_ref['id'], self.test_ip)
-
- def _ensure_all_called():
+ def _ensure_all_called(mac):
instance_filter = 'nova-instance-%s-%s' % (instance_ref['name'],
- 'fake')
+ mac.translate(None, ':'))
secgroup_filter = 'nova-secgroup-%s' % self.security_group['id']
for required in [secgroup_filter, 'allow-dhcp-server',
'no-arp-spoofing', 'no-ip-spoofing',
@@ -1400,17 +1382,22 @@ class NWFilterTestCase(test.TestCase):
self.security_group.id)
instance = db.instance_get(self.context, inst_id)
- network_info = _create_network_info()
+ network_info = _fake_network_info(self.stubs, 1)
+ # since there is one (network_info) there is one vif
+ # pass this vif's mac to _ensure_all_called()
+ # to set the instance_filter properly
+ mac = network_info[0][1]['mac']
+
self.fw.setup_basic_filtering(instance, network_info)
self.fw.prepare_instance_filter(instance, network_info)
self.fw.apply_instance_filter(instance, network_info)
- _ensure_all_called()
+ _ensure_all_called(mac)
self.teardown_security_group()
db.instance_destroy(context.get_admin_context(), instance_ref['id'])
def test_create_network_filters(self):
instance_ref = self._create_instance()
- network_info = _create_network_info(3)
+ network_info = _fake_network_info(self.stubs, 3)
result = self.fw._create_network_filters(instance_ref,
network_info,
"fake")
@@ -1433,8 +1420,7 @@ class NWFilterTestCase(test.TestCase):
instance = db.instance_get(self.context, inst_id)
- _setup_networking(instance_ref['id'], self.test_ip)
- network_info = _create_network_info()
+ network_info = _fake_network_info(self.stubs, 1)
self.fw.setup_basic_filtering(instance, network_info)
self.fw.prepare_instance_filter(instance, network_info)
self.fw.apply_instance_filter(instance, network_info)
diff --git a/nova/tests/test_linux_net.py b/nova/tests/test_linux_net.py
new file mode 100755
index 000000000..99577b88e
--- /dev/null
+++ b/nova/tests/test_linux_net.py
@@ -0,0 +1,347 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 NTT
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from nova import context
+from nova import db
+from nova import exception
+from nova import flags
+from nova import log as logging
+from nova import test
+from nova import utils
+from nova.network import manager as network_manager
+from nova.network import linux_net
+
+import mox
+
+FLAGS = flags.FLAGS
+
+LOG = logging.getLogger('nova.tests.network')
+
+
+HOST = "testhost"
+
+instances = [{'id': 0,
+ 'host': 'fake_instance00',
+ 'hostname': 'fake_instance00'},
+ {'id': 1,
+ 'host': 'fake_instance01',
+ 'hostname': 'fake_instance01'}]
+
+
+addresses = [{"address": "10.0.0.1"},
+ {"address": "10.0.0.2"},
+ {"address": "10.0.0.3"},
+ {"address": "10.0.0.4"},
+ {"address": "10.0.0.5"},
+ {"address": "10.0.0.6"}]
+
+
+networks = [{'id': 0,
+ 'uuid': "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
+ 'label': 'test0',
+ 'injected': False,
+ 'multi_host': False,
+ 'cidr': '192.168.0.0/24',
+ 'cidr_v6': '2001:db8::/64',
+ 'gateway_v6': '2001:db8::1',
+ 'netmask_v6': '64',
+ 'netmask': '255.255.255.0',
+ 'bridge': 'fa0',
+ 'bridge_interface': 'fake_fa0',
+ 'gateway': '192.168.0.1',
+ 'broadcast': '192.168.0.255',
+ 'dns1': '192.168.0.1',
+ 'dns2': '192.168.0.2',
+ 'dhcp_server': '0.0.0.0',
+ 'dhcp_start': '192.168.100.1',
+ 'vlan': None,
+ 'host': None,
+ 'project_id': 'fake_project',
+ 'vpn_public_address': '192.168.0.2'},
+ {'id': 1,
+ 'uuid': "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb",
+ 'label': 'test1',
+ 'injected': False,
+ 'multi_host': False,
+ 'cidr': '192.168.1.0/24',
+ 'cidr_v6': '2001:db9::/64',
+ 'gateway_v6': '2001:db9::1',
+ 'netmask_v6': '64',
+ 'netmask': '255.255.255.0',
+ 'bridge': 'fa1',
+ 'bridge_interface': 'fake_fa1',
+ 'gateway': '192.168.1.1',
+ 'broadcast': '192.168.1.255',
+ 'dns1': '192.168.0.1',
+ 'dns2': '192.168.0.2',
+ 'dhcp_server': '0.0.0.0',
+ 'dhcp_start': '192.168.100.1',
+ 'vlan': None,
+ 'host': None,
+ 'project_id': 'fake_project',
+ 'vpn_public_address': '192.168.1.2'}]
+
+
+fixed_ips = [{'id': 0,
+ 'network_id': 0,
+ 'address': '192.168.0.100',
+ 'instance_id': 0,
+ 'allocated': True,
+ 'virtual_interface_id': 0,
+ 'virtual_interface': addresses[0],
+ 'instance': instances[0],
+ 'floating_ips': []},
+ {'id': 1,
+ 'network_id': 1,
+ 'address': '192.168.1.100',
+ 'instance_id': 0,
+ 'allocated': True,
+ 'virtual_interface_id': 1,
+ 'virtual_interface': addresses[1],
+ 'instance': instances[0],
+ 'floating_ips': []},
+ {'id': 2,
+ 'network_id': 1,
+ 'address': '192.168.0.101',
+ 'instance_id': 1,
+ 'allocated': True,
+ 'virtual_interface_id': 2,
+ 'virtual_interface': addresses[2],
+ 'instance': instances[1],
+ 'floating_ips': []},
+ {'id': 3,
+ 'network_id': 0,
+ 'address': '192.168.1.101',
+ 'instance_id': 1,
+ 'allocated': True,
+ 'virtual_interface_id': 3,
+ 'virtual_interface': addresses[3],
+ 'instance': instances[1],
+ 'floating_ips': []},
+ {'id': 4,
+ 'network_id': 0,
+ 'address': '192.168.0.102',
+ 'instance_id': 0,
+ 'allocated': True,
+ 'virtual_interface_id': 4,
+ 'virtual_interface': addresses[4],
+ 'instance': instances[0],
+ 'floating_ips': []},
+ {'id': 5,
+ 'network_id': 1,
+ 'address': '192.168.1.102',
+ 'instance_id': 1,
+ 'allocated': True,
+ 'virtual_interface_id': 5,
+ 'virtual_interface': addresses[5],
+ 'instance': instances[1],
+ 'floating_ips': []}]
+
+
+vifs = [{'id': 0,
+ 'address': 'DE:AD:BE:EF:00:00',
+ 'uuid': '00000000-0000-0000-0000-0000000000000000',
+ 'network_id': 0,
+ 'network': networks[0],
+ 'instance_id': 0},
+ {'id': 1,
+ 'address': 'DE:AD:BE:EF:00:01',
+ 'uuid': '00000000-0000-0000-0000-0000000000000001',
+ 'network_id': 1,
+ 'network': networks[1],
+ 'instance_id': 0},
+ {'id': 2,
+ 'address': 'DE:AD:BE:EF:00:02',
+ 'uuid': '00000000-0000-0000-0000-0000000000000002',
+ 'network_id': 1,
+ 'network': networks[1],
+ 'instance_id': 1},
+ {'id': 3,
+ 'address': 'DE:AD:BE:EF:00:03',
+ 'uuid': '00000000-0000-0000-0000-0000000000000003',
+ 'network_id': 0,
+ 'network': networks[0],
+ 'instance_id': 1},
+ {'id': 4,
+ 'address': 'DE:AD:BE:EF:00:04',
+ 'uuid': '00000000-0000-0000-0000-0000000000000004',
+ 'network_id': 0,
+ 'network': networks[0],
+ 'instance_id': 0},
+ {'id': 5,
+ 'address': 'DE:AD:BE:EF:00:05',
+ 'uuid': '00000000-0000-0000-0000-0000000000000005',
+ 'network_id': 1,
+ 'network': networks[1],
+ 'instance_id': 1}]
+
+
+class LinuxNetworkTestCase(test.TestCase):
+
+ def setUp(self):
+ super(LinuxNetworkTestCase, self).setUp()
+ network_driver = FLAGS.network_driver
+ self.driver = utils.import_object(network_driver)
+ self.driver.db = db
+
+ def test_update_dhcp_for_nw00(self):
+ self.flags(use_single_default_gateway=True)
+ self.mox.StubOutWithMock(db, 'network_get_associated_fixed_ips')
+ self.mox.StubOutWithMock(db, 'virtual_interface_get_by_instance')
+
+ db.network_get_associated_fixed_ips(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([fixed_ips[0],
+ fixed_ips[3]])
+
+ db.network_get_associated_fixed_ips(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([fixed_ips[0],
+ fixed_ips[3]])
+ db.virtual_interface_get_by_instance(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([vifs[0], vifs[1]])
+ db.virtual_interface_get_by_instance(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([vifs[2], vifs[3]])
+ self.mox.ReplayAll()
+
+ self.driver.update_dhcp(None, "eth0", networks[0])
+
+ def test_update_dhcp_for_nw01(self):
+ self.flags(use_single_default_gateway=True)
+ self.mox.StubOutWithMock(db, 'network_get_associated_fixed_ips')
+ self.mox.StubOutWithMock(db, 'virtual_interface_get_by_instance')
+
+ db.network_get_associated_fixed_ips(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([fixed_ips[1],
+ fixed_ips[2]])
+
+ db.network_get_associated_fixed_ips(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([fixed_ips[1],
+ fixed_ips[2]])
+ db.virtual_interface_get_by_instance(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([vifs[0], vifs[1]])
+ db.virtual_interface_get_by_instance(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([vifs[2], vifs[3]])
+ self.mox.ReplayAll()
+
+ self.driver.update_dhcp(None, "eth0", networks[0])
+
+ def test_get_dhcp_hosts_for_nw00(self):
+ self.flags(use_single_default_gateway=True)
+ self.mox.StubOutWithMock(db, 'network_get_associated_fixed_ips')
+
+ db.network_get_associated_fixed_ips(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([fixed_ips[0],
+ fixed_ips[3]])
+ self.mox.ReplayAll()
+
+ expected = \
+ "10.0.0.1,fake_instance00.novalocal,"\
+ "192.168.0.100,net:NW-i00000000-0\n"\
+ "10.0.0.4,fake_instance01.novalocal,"\
+ "192.168.1.101,net:NW-i00000001-0"
+ actual_hosts = self.driver.get_dhcp_hosts(None, networks[1])
+
+ self.assertEquals(actual_hosts, expected)
+
+ def test_get_dhcp_hosts_for_nw01(self):
+ self.flags(use_single_default_gateway=True)
+ self.mox.StubOutWithMock(db, 'network_get_associated_fixed_ips')
+
+ db.network_get_associated_fixed_ips(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([fixed_ips[1],
+ fixed_ips[2]])
+ self.mox.ReplayAll()
+
+ expected = \
+ "10.0.0.2,fake_instance00.novalocal,"\
+ "192.168.1.100,net:NW-i00000000-1\n"\
+ "10.0.0.3,fake_instance01.novalocal,"\
+ "192.168.0.101,net:NW-i00000001-1"
+ actual_hosts = self.driver.get_dhcp_hosts(None, networks[0])
+
+ self.assertEquals(actual_hosts, expected)
+
+ def test_get_dhcp_opts_for_nw00(self):
+ self.mox.StubOutWithMock(db, 'network_get_associated_fixed_ips')
+ self.mox.StubOutWithMock(db, 'virtual_interface_get_by_instance')
+
+ db.network_get_associated_fixed_ips(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([fixed_ips[0],
+ fixed_ips[3],
+ fixed_ips[4]])
+ db.virtual_interface_get_by_instance(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([vifs[0],
+ vifs[1],
+ vifs[4]])
+ db.virtual_interface_get_by_instance(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([vifs[2],
+ vifs[3],
+ vifs[5]])
+ self.mox.ReplayAll()
+
+ expected_opts = 'NW-i00000001-0,3'
+ actual_opts = self.driver.get_dhcp_opts(None, networks[0])
+
+ self.assertEquals(actual_opts, expected_opts)
+
+ def test_get_dhcp_opts_for_nw01(self):
+ self.mox.StubOutWithMock(db, 'network_get_associated_fixed_ips')
+ self.mox.StubOutWithMock(db, 'virtual_interface_get_by_instance')
+
+ db.network_get_associated_fixed_ips(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([fixed_ips[1],
+ fixed_ips[2],
+ fixed_ips[5]])
+ db.virtual_interface_get_by_instance(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([vifs[0],
+ vifs[1],
+ vifs[4]])
+ db.virtual_interface_get_by_instance(mox.IgnoreArg(),
+ mox.IgnoreArg())\
+ .AndReturn([vifs[2],
+ vifs[3],
+ vifs[5]])
+ self.mox.ReplayAll()
+
+ expected_opts = "NW-i00000000-1,3"
+ actual_opts = self.driver.get_dhcp_opts(None, networks[1])
+
+ self.assertEquals(actual_opts, expected_opts)
+
+ def test_dhcp_opts_not_default_gateway_network(self):
+ expected = "NW-i00000000-0,3"
+ actual = self.driver._host_dhcp_opts(fixed_ips[0])
+ self.assertEquals(actual, expected)
+
+ def test_host_dhcp_without_default_gateway_network(self):
+ expected = ("10.0.0.1,fake_instance00.novalocal,192.168.0.100")
+ actual = self.driver._host_dhcp(fixed_ips[0])
+ self.assertEquals(actual, expected)
diff --git a/nova/tests/test_network.py b/nova/tests/test_network.py
index 25ff940f0..926ea065a 100644
--- a/nova/tests/test_network.py
+++ b/nova/tests/test_network.py
@@ -14,6 +14,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+import mox
from nova import context
from nova import db
@@ -21,9 +22,7 @@ from nova import exception
from nova import log as logging
from nova import test
from nova.network import manager as network_manager
-
-
-import mox
+from nova.tests import fake_network
LOG = logging.getLogger('nova.tests.network')
@@ -58,7 +57,7 @@ networks = [{'id': 0,
'dns1': '192.168.0.1',
'dns2': '192.168.0.2',
'vlan': None,
- 'host': None,
+ 'host': HOST,
'project_id': 'fake_project',
'vpn_public_address': '192.168.0.2'},
{'id': 1,
@@ -78,7 +77,7 @@ networks = [{'id': 0,
'dns1': '192.168.0.1',
'dns2': '192.168.0.2',
'vlan': None,
- 'host': None,
+ 'host': HOST,
'project_id': 'fake_project',
'vpn_public_address': '192.168.1.2'}]
@@ -118,9 +117,14 @@ vifs = [{'id': 0,
{'id': 1,
'address': 'DE:AD:BE:EF:00:01',
'uuid': '00000000-0000-0000-0000-0000000000000001',
- 'network_id': 0,
'network_id': 1,
'network': FakeModel(**networks[1]),
+ 'instance_id': 0},
+ {'id': 2,
+ 'address': 'DE:AD:BE:EF:00:02',
+ 'uuid': '00000000-0000-0000-0000-0000000000000002',
+ 'network_id': 2,
+ 'network': None,
'instance_id': 0}]
@@ -133,60 +137,50 @@ class FlatNetworkTestCase(test.TestCase):
is_admin=False)
def test_get_instance_nw_info(self):
- self.mox.StubOutWithMock(db, 'fixed_ip_get_by_instance')
- self.mox.StubOutWithMock(db, 'virtual_interface_get_by_instance')
- self.mox.StubOutWithMock(db, 'instance_type_get')
-
- db.fixed_ip_get_by_instance(mox.IgnoreArg(),
- mox.IgnoreArg()).AndReturn(fixed_ips)
- db.virtual_interface_get_by_instance(mox.IgnoreArg(),
- mox.IgnoreArg()).AndReturn(vifs)
- db.instance_type_get(mox.IgnoreArg(),
- mox.IgnoreArg()).AndReturn(flavor)
- self.mox.ReplayAll()
-
- nw_info = self.network.get_instance_nw_info(None, 0, 0, None)
+ fake_get_instance_nw_info = fake_network.fake_get_instance_nw_info
- self.assertTrue(nw_info)
+ nw_info = fake_get_instance_nw_info(self.stubs, 0, 2)
+ self.assertFalse(nw_info)
- for i, nw in enumerate(nw_info):
- i8 = i + 8
- check = {'bridge': 'fa%s' % i,
+ for i, (nw, info) in enumerate(nw_info):
+ check = {'bridge': 'fake_br%d' % i,
'cidr': '192.168.%s.0/24' % i,
- 'cidr_v6': '2001:db%s::/64' % i8,
+ 'cidr_v6': '2001:db8:0:%x::/64' % i,
'id': i,
'multi_host': False,
- 'injected': 'DONTCARE',
- 'bridge_interface': 'fake_fa%s' % i,
+ 'injected': False,
+ 'bridge_interface': 'fake_eth%d' % i,
'vlan': None}
- self.assertDictMatch(nw[0], check)
+ self.assertDictMatch(nw, check)
- check = {'broadcast': '192.168.%s.255' % i,
- 'dhcp_server': '192.168.%s.1' % i,
- 'dns': 'DONTCARE',
- 'gateway': '192.168.%s.1' % i,
- 'gateway6': '2001:db%s::1' % i8,
+ check = {'broadcast': '192.168.%d.255' % i,
+ 'dhcp_server': '192.168.%d.1' % i,
+ 'dns': ['192.168.%d.3' % n, '192.168.%d.4' % n],
+ 'gateway': '192.168.%d.1' % i,
+ 'gateway6': '2001:db8:0:%x::1' % i,
'ip6s': 'DONTCARE',
'ips': 'DONTCARE',
- 'label': 'test%s' % i,
- 'mac': 'DE:AD:BE:EF:00:0%s' % i,
- 'vif_uuid': ('00000000-0000-0000-0000-000000000000000%s' %
- i),
- 'rxtx_cap': 'DONTCARE',
+ 'label': 'test%d' % i,
+ 'mac': 'DE:AD:BE:EF:00:%02x' % i,
+ 'vif_uuid':
+ '00000000-0000-0000-0000-00000000000000%02d' % i,
+ 'rxtx_cap': 3,
'should_create_vlan': False,
'should_create_bridge': False}
- self.assertDictMatch(nw[1], check)
+ self.assertDictMatch(info, check)
check = [{'enabled': 'DONTCARE',
- 'ip': '2001:db%s::dcad:beff:feef:%s' % (i8, i),
+ 'ip': '2001:db8::dcad:beff:feef:%s' % i,
'netmask': '64'}]
- self.assertDictListMatch(nw[1]['ip6s'], check)
+ self.assertDictListMatch(info['ip6s'], check)
- check = [{'enabled': '1',
- 'ip': '192.168.%s.100' % i,
- 'netmask': '255.255.255.0'}]
- self.assertDictListMatch(nw[1]['ips'], check)
+ num_fixed_ips = len(info['ips'])
+ check = [{'enabled': 'DONTCARE',
+ 'ip': '192.168.%d.1%02d' % (i, ip_num),
+ 'netmask': '255.255.255.0'}
+ for ip_num in xrange(num_fixed_ips)]
+ self.assertDictListMatch(info['ips'], check)
def test_validate_networks(self):
self.mox.StubOutWithMock(db, 'network_get_all_by_uuids')
@@ -247,6 +241,34 @@ class FlatNetworkTestCase(test.TestCase):
self.network.validate_networks(None, requested_networks)
+ def test_add_fixed_ip_instance_without_vpn_requested_networks(self):
+ self.mox.StubOutWithMock(db, 'network_get')
+ self.mox.StubOutWithMock(db, 'network_update')
+ self.mox.StubOutWithMock(db, 'fixed_ip_associate_pool')
+ self.mox.StubOutWithMock(db, 'instance_get')
+ self.mox.StubOutWithMock(db,
+ 'virtual_interface_get_by_instance_and_network')
+ self.mox.StubOutWithMock(db, 'fixed_ip_update')
+
+ db.fixed_ip_update(mox.IgnoreArg(),
+ mox.IgnoreArg(),
+ mox.IgnoreArg())
+ db.virtual_interface_get_by_instance_and_network(mox.IgnoreArg(),
+ mox.IgnoreArg(), mox.IgnoreArg()).AndReturn({'id': 0})
+
+ db.instance_get(mox.IgnoreArg(),
+ mox.IgnoreArg()).AndReturn({'security_groups':
+ [{'id': 0}]})
+ db.fixed_ip_associate_pool(mox.IgnoreArg(),
+ mox.IgnoreArg(),
+ mox.IgnoreArg()).AndReturn('192.168.0.101')
+ db.network_get(mox.IgnoreArg(),
+ mox.IgnoreArg()).AndReturn(networks[0])
+ db.network_update(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
+ self.mox.ReplayAll()
+ self.network.add_fixed_ip_to_instance(self.context, 1, HOST,
+ networks[0]['id'])
+
class VlanNetworkTestCase(test.TestCase):
def setUp(self):
@@ -264,7 +286,8 @@ class VlanNetworkTestCase(test.TestCase):
db.fixed_ip_associate(mox.IgnoreArg(),
mox.IgnoreArg(),
- mox.IgnoreArg()).AndReturn('192.168.0.1')
+ mox.IgnoreArg(),
+ reserved=True).AndReturn('192.168.0.1')
db.fixed_ip_update(mox.IgnoreArg(),
mox.IgnoreArg(),
mox.IgnoreArg())
@@ -387,6 +410,32 @@ class VlanNetworkTestCase(test.TestCase):
mox.IgnoreArg(),
mox.IgnoreArg())
+ def test_add_fixed_ip_instance_without_vpn_requested_networks(self):
+ self.mox.StubOutWithMock(db, 'network_get')
+ self.mox.StubOutWithMock(db, 'fixed_ip_associate_pool')
+ self.mox.StubOutWithMock(db, 'instance_get')
+ self.mox.StubOutWithMock(db,
+ 'virtual_interface_get_by_instance_and_network')
+ self.mox.StubOutWithMock(db, 'fixed_ip_update')
+
+ db.fixed_ip_update(mox.IgnoreArg(),
+ mox.IgnoreArg(),
+ mox.IgnoreArg())
+ db.virtual_interface_get_by_instance_and_network(mox.IgnoreArg(),
+ mox.IgnoreArg(), mox.IgnoreArg()).AndReturn({'id': 0})
+
+ db.instance_get(mox.IgnoreArg(),
+ mox.IgnoreArg()).AndReturn({'security_groups':
+ [{'id': 0}]})
+ db.fixed_ip_associate_pool(mox.IgnoreArg(),
+ mox.IgnoreArg(),
+ mox.IgnoreArg()).AndReturn('192.168.0.101')
+ db.network_get(mox.IgnoreArg(),
+ mox.IgnoreArg()).AndReturn(networks[0])
+ self.mox.ReplayAll()
+ self.network.add_fixed_ip_to_instance(self.context, 1, HOST,
+ networks[0]['id'])
+
class CommonNetworkTestCase(test.TestCase):
diff --git a/nova/tests/test_quantum.py b/nova/tests/test_quantum.py
new file mode 100644
index 000000000..0feec9b99
--- /dev/null
+++ b/nova/tests/test_quantum.py
@@ -0,0 +1,323 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Nicira, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from nova import context
+from nova import db
+from nova.db.sqlalchemy import models
+from nova.db.sqlalchemy.session import get_session
+from nova import exception
+from nova import ipv6
+from nova import log as logging
+from nova.network.quantum import manager as quantum_manager
+from nova import test
+from nova import utils
+
+LOG = logging.getLogger('nova.tests.quantum_network')
+
+
+# this class can be used for unit functional/testing on nova,
+# as it does not actually make remote calls to the Quantum service
+class FakeQuantumClientConnection(object):
+
+ def __init__(self):
+ self.nets = {}
+
+ def get_networks_for_tenant(self, tenant_id):
+ net_ids = []
+ for net_id, n in self.nets.items():
+ if n['tenant-id'] == tenant_id:
+ net_ids.append(net_id)
+ return net_ids
+
+ def create_network(self, tenant_id, network_name):
+
+ uuid = str(utils.gen_uuid())
+ self.nets[uuid] = {'net-name': network_name,
+ 'tenant-id': tenant_id,
+ 'ports': {}}
+ return uuid
+
+ def delete_network(self, tenant_id, net_id):
+ if self.nets[net_id]['tenant-id'] == tenant_id:
+ del self.nets[net_id]
+
+ def network_exists(self, tenant_id, net_id):
+ try:
+ return self.nets[net_id]['tenant-id'] == tenant_id
+ except KeyError:
+ return False
+
+ def _confirm_not_attached(self, interface_id):
+ for n in self.nets.values():
+ for p in n['ports'].values():
+ if p['attachment-id'] == interface_id:
+ raise Exception(_("interface '%s' is already attached" %
+ interface_id))
+
+ def create_and_attach_port(self, tenant_id, net_id, interface_id):
+ if not self.network_exists(tenant_id, net_id):
+ raise Exception(
+ _("network %(net_id)s does not exist for tenant %(tenant_id)"
+ % locals()))
+
+ self._confirm_not_attached(interface_id)
+ uuid = str(utils.gen_uuid())
+ self.nets[net_id]['ports'][uuid] = \
+ {"port-state": "ACTIVE",
+ "attachment-id": interface_id}
+
+ def detach_and_delete_port(self, tenant_id, net_id, port_id):
+ if not self.network_exists(tenant_id, net_id):
+ raise exception.NotFound(
+ _("network %(net_id)s does not exist "
+ "for tenant %(tenant_id)s" % locals()))
+ del self.nets[net_id]['ports'][port_id]
+
+ def get_port_by_attachment(self, tenant_id, attachment_id):
+ for net_id, n in self.nets.items():
+ if n['tenant-id'] == tenant_id:
+ for port_id, p in n['ports'].items():
+ if p['attachment-id'] == attachment_id:
+ return (net_id, port_id)
+
+ return (None, None)
+
+networks = [{'label': 'project1-net1',
+ 'injected': False,
+ 'multi_host': False,
+ 'cidr': '192.168.0.0/24',
+ 'cidr_v6': '2001:1db8::/64',
+ 'gateway_v6': '2001:1db8::1',
+ 'netmask_v6': '64',
+ 'netmask': '255.255.255.0',
+ 'bridge': None,
+ 'bridge_interface': None,
+ 'gateway': '192.168.0.1',
+ 'broadcast': '192.168.0.255',
+ 'dns1': '192.168.0.1',
+ 'dns2': '192.168.0.2',
+ 'vlan': None,
+ 'host': None,
+ 'vpn_public_address': None,
+ 'project_id': 'fake_project1',
+ 'priority': 1},
+ {'label': 'project2-net1',
+ 'injected': False,
+ 'multi_host': False,
+ 'cidr': '192.168.1.0/24',
+ 'cidr_v6': '2001:1db9::/64',
+ 'gateway_v6': '2001:1db9::1',
+ 'netmask_v6': '64',
+ 'netmask': '255.255.255.0',
+ 'bridge': None,
+ 'bridge_interface': None,
+ 'gateway': '192.168.1.1',
+ 'broadcast': '192.168.1.255',
+ 'dns1': '192.168.0.1',
+ 'dns2': '192.168.0.2',
+ 'vlan': None,
+ 'host': None,
+ 'project_id': 'fake_project2',
+ 'priority': 1},
+ {'label': "public",
+ 'injected': False,
+ 'multi_host': False,
+ 'cidr': '10.0.0.0/24',
+ 'cidr_v6': '2001:1dba::/64',
+ 'gateway_v6': '2001:1dba::1',
+ 'netmask_v6': '64',
+ 'netmask': '255.255.255.0',
+ 'bridge': None,
+ 'bridge_interface': None,
+ 'gateway': '10.0.0.1',
+ 'broadcast': '10.0.0.255',
+ 'dns1': '10.0.0.1',
+ 'dns2': '10.0.0.2',
+ 'vlan': None,
+ 'host': None,
+ 'project_id': None,
+ 'priority': 0},
+ {'label': "project2-net2",
+ 'injected': False,
+ 'multi_host': False,
+ 'cidr': '9.0.0.0/24',
+ 'cidr_v6': '2001:1dbb::/64',
+ 'gateway_v6': '2001:1dbb::1',
+ 'netmask_v6': '64',
+ 'netmask': '255.255.255.0',
+ 'bridge': None,
+ 'bridge_interface': None,
+ 'gateway': '9.0.0.1',
+ 'broadcast': '9.0.0.255',
+ 'dns1': '9.0.0.1',
+ 'dns2': '9.0.0.2',
+ 'vlan': None,
+ 'host': None,
+ 'project_id': "fake_project2",
+ 'priority': 2}]
+
+
+# this is a base class to be used by all other Quantum Test classes
+class QuantumTestCaseBase(object):
+
+ def test_create_and_delete_nets(self):
+ self._create_nets()
+ self._delete_nets()
+
+ def _create_nets(self):
+ for n in networks:
+ ctx = context.RequestContext('user1', n['project_id'])
+ self.net_man.create_networks(ctx,
+ label=n['label'], cidr=n['cidr'],
+ multi_host=n['multi_host'],
+ num_networks=1, network_size=256, cidr_v6=n['cidr_v6'],
+ gateway_v6=n['gateway_v6'], bridge=None,
+ bridge_interface=None, dns1=n['dns1'],
+ dns2=n['dns2'], project_id=n['project_id'],
+ priority=n['priority'])
+
+ def _delete_nets(self):
+ for n in networks:
+ ctx = context.RequestContext('user1', n['project_id'])
+ self.net_man.delete_network(ctx, n['cidr'])
+
+ def test_allocate_and_deallocate_instance_static(self):
+ self._create_nets()
+
+ project_id = "fake_project1"
+ ctx = context.RequestContext('user1', project_id)
+
+ instance_ref = db.api.instance_create(ctx,
+ {"project_id": project_id})
+ nw_info = self.net_man.allocate_for_instance(ctx,
+ instance_id=instance_ref['id'], host="",
+ instance_type_id=instance_ref['instance_type_id'],
+ project_id=project_id)
+
+ self.assertEquals(len(nw_info), 2)
+
+ # we don't know which order the NICs will be in until we
+ # introduce the notion of priority
+ # v4 cidr
+ self.assertTrue(nw_info[0][0]['cidr'].startswith("10."))
+ self.assertTrue(nw_info[1][0]['cidr'].startswith("192."))
+
+ # v4 address
+ self.assertTrue(nw_info[0][1]['ips'][0]['ip'].startswith("10."))
+ self.assertTrue(nw_info[1][1]['ips'][0]['ip'].startswith("192."))
+
+ # v6 cidr
+ self.assertTrue(nw_info[0][0]['cidr_v6'].startswith("2001:1dba:"))
+ self.assertTrue(nw_info[1][0]['cidr_v6'].startswith("2001:1db8:"))
+
+ # v6 address
+ self.assertTrue(
+ nw_info[0][1]['ip6s'][0]['ip'].startswith("2001:1dba:"))
+ self.assertTrue(
+ nw_info[1][1]['ip6s'][0]['ip'].startswith("2001:1db8:"))
+
+ self.net_man.deallocate_for_instance(ctx,
+ instance_id=instance_ref['id'],
+ project_id=project_id)
+
+ self._delete_nets()
+
+ def test_allocate_and_deallocate_instance_dynamic(self):
+ self._create_nets()
+ project_id = "fake_project2"
+ ctx = context.RequestContext('user1', project_id)
+
+ net_ids = self.net_man.q_conn.get_networks_for_tenant(project_id)
+ requested_networks = [(net_id, None) for net_id in net_ids]
+
+ self.net_man.validate_networks(ctx, requested_networks)
+
+ instance_ref = db.api.instance_create(ctx,
+ {"project_id": project_id})
+ nw_info = self.net_man.allocate_for_instance(ctx,
+ instance_id=instance_ref['id'], host="",
+ instance_type_id=instance_ref['instance_type_id'],
+ project_id=project_id,
+ requested_networks=requested_networks)
+
+ self.assertEquals(len(nw_info), 2)
+
+ # we don't know which order the NICs will be in until we
+ # introduce the notion of priority
+ # v4 cidr
+ self.assertTrue(nw_info[0][0]['cidr'].startswith("9.") or
+ nw_info[1][0]['cidr'].startswith("9."))
+ self.assertTrue(nw_info[0][0]['cidr'].startswith("192.") or
+ nw_info[1][0]['cidr'].startswith("192."))
+
+ # v4 address
+ self.assertTrue(nw_info[0][1]['ips'][0]['ip'].startswith("9.") or
+ nw_info[1][1]['ips'][0]['ip'].startswith("9."))
+ self.assertTrue(nw_info[0][1]['ips'][0]['ip'].startswith("192.") or
+ nw_info[1][1]['ips'][0]['ip'].startswith("192."))
+
+ # v6 cidr
+ self.assertTrue(nw_info[0][0]['cidr_v6'].startswith("2001:1dbb:") or
+ nw_info[1][0]['cidr_v6'].startswith("2001:1dbb:"))
+ self.assertTrue(nw_info[0][0]['cidr_v6'].startswith("2001:1db9:") or
+ nw_info[1][0]['cidr_v6'].startswith("2001:1db9:"))
+
+ # v6 address
+ self.assertTrue(
+ nw_info[0][1]['ip6s'][0]['ip'].startswith("2001:1dbb:") or
+ nw_info[1][1]['ip6s'][0]['ip'].startswith("2001:1dbb:"))
+ self.assertTrue(
+ nw_info[0][1]['ip6s'][0]['ip'].startswith("2001:1db9:") or
+ nw_info[1][1]['ip6s'][0]['ip'].startswith("2001:1db9:"))
+
+ self.net_man.deallocate_for_instance(ctx,
+ instance_id=instance_ref['id'],
+ project_id=project_id)
+
+ self._delete_nets()
+
+ def test_validate_bad_network(self):
+ ctx = context.RequestContext('user1', 'fake_project1')
+ self.assertRaises(exception.NetworkNotFound,
+ self.net_man.validate_networks, ctx, [("", None)])
+
+
+class QuantumNovaIPAMTestCase(QuantumTestCaseBase, test.TestCase):
+
+ def setUp(self):
+ super(QuantumNovaIPAMTestCase, self).setUp()
+
+ self.net_man = quantum_manager.QuantumManager(
+ ipam_lib="nova.network.quantum.nova_ipam_lib",
+ q_conn=FakeQuantumClientConnection())
+
+ # Tests seem to create some networks by default, which
+ # we don't want. So we delete them.
+
+ ctx = context.RequestContext('user1', 'fake_project1').elevated()
+ for n in db.network_get_all(ctx):
+ db.network_delete_safe(ctx, n['id'])
+
+ # Other unit tests (e.g., test_compute.py) have a nasty
+ # habit of of creating fixed IPs and not cleaning up, which
+ # can confuse these tests, so we remove all existing fixed
+ # ips before starting.
+ session = get_session()
+ result = session.query(models.FixedIp).all()
+ with session.begin():
+ for fip_ref in result:
+ session.delete(fip_ref)
diff --git a/nova/tests/test_virt_drivers.py b/nova/tests/test_virt_drivers.py
index 480247c91..440d3401b 100644
--- a/nova/tests/test_virt_drivers.py
+++ b/nova/tests/test_virt_drivers.py
@@ -103,8 +103,9 @@ class _VirtDriverTestCase(test.TestCase):
def test_reboot(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
+ reboot_type = "SOFT"
self.connection.spawn(self.ctxt, instance_ref, network_info)
- self.connection.reboot(instance_ref, network_info)
+ self.connection.reboot(instance_ref, network_info, reboot_type)
@catch_notimplementederror
def test_get_host_ip_addr(self):
diff --git a/nova/tests/test_vmwareapi.py b/nova/tests/test_vmwareapi.py
index 06daf46e8..e6da1690f 100644
--- a/nova/tests/test_vmwareapi.py
+++ b/nova/tests/test_vmwareapi.py
@@ -170,7 +170,8 @@ class VMWareAPIVMTestCase(test.TestCase):
self._create_vm()
info = self.conn.get_info(1)
self._check_vm_info(info, power_state.RUNNING)
- self.conn.reboot(self.instance, self.network_info)
+ reboot_type = "SOFT"
+ self.conn.reboot(self.instance, self.network_info, reboot_type)
info = self.conn.get_info(1)
self._check_vm_info(info, power_state.RUNNING)
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index 45dad3516..47c6a3c95 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -364,7 +364,7 @@ class XenAPIVMTestCase(test.TestCase):
def _test_spawn(self, image_ref, kernel_id, ramdisk_id,
instance_type_id="3", os_type="linux",
- architecture="x86-64", instance_id=1,
+ hostname="test", architecture="x86-64", instance_id=1,
check_injection=False,
create_record=True, empty_dns=False):
stubs.stubout_loopingcall_start(self.stubs)
@@ -377,6 +377,7 @@ class XenAPIVMTestCase(test.TestCase):
'ramdisk_id': ramdisk_id,
'instance_type_id': instance_type_id,
'os_type': os_type,
+ 'hostname': hostname,
'architecture': architecture}
instance = db.instance_create(self.context, values)
else:
@@ -494,6 +495,7 @@ class XenAPIVMTestCase(test.TestCase):
self.check_vm_params_for_linux_with_external_kernel()
def test_spawn_netinject_file(self):
+ self.flags(flat_injected=True)
db_fakes.stub_out_db_instance_api(self.stubs, injected=True)
self._tee_executed = False
@@ -611,7 +613,6 @@ class XenAPIVMTestCase(test.TestCase):
str(3 * 1024))
def test_rescue(self):
- self.flags(flat_injected=False)
instance = self._create_instance()
conn = xenapi_conn.get_connection(False)
conn.rescue(self.context, instance, None, [])
@@ -932,8 +933,9 @@ class XenAPIDetermineDiskImageTestCase(test.TestCase):
self.fake_instance.architecture = 'x86-64'
def assert_disk_type(self, disk_type):
+ ctx = context.RequestContext('fake', 'fake')
dt = vm_utils.VMHelper.determine_disk_image_type(
- self.fake_instance)
+ self.fake_instance, ctx)
self.assertEqual(disk_type, dt)
def test_instance_disk(self):
diff --git a/nova/tests/vmwareapi/stubs.py b/nova/tests/vmwareapi/stubs.py
index 0ed5e9b68..7de10e612 100644
--- a/nova/tests/vmwareapi/stubs.py
+++ b/nova/tests/vmwareapi/stubs.py
@@ -47,7 +47,5 @@ def set_stubs(stubs):
stubs.Set(vmware_images, 'upload_image', fake.fake_upload_image)
stubs.Set(vmwareapi_conn.VMWareAPISession, "_get_vim_object",
fake_get_vim_object)
- stubs.Set(vmwareapi_conn.VMWareAPISession, "_get_vim_object",
- fake_get_vim_object)
stubs.Set(vmwareapi_conn.VMWareAPISession, "_is_vim_object",
fake_is_vim_object)