From 9a15c0d070db086111cbe5eff4f19dcb419b32bc Mon Sep 17 00:00:00 2001 From: "Kevin L. Mitchell" Date: Mon, 17 Oct 2011 14:19:25 -0500 Subject: Add XML templates. Creates the concept of an XML template, which is a description of how to serialize an object into XML. XML templates are split into master and slave templates, and slave templates can be attached to master templates to augment the serialization. The expected use case is with extensions, allowing extensions to not only add data to the object, but to also ensure that the new data gets serialized into the output XML representation. Also includes lazy serialization, for use by extensions. Change-Id: Ifd8493be04c73bbb2a850080b687c5e799c48239 --- nova/tests/api/openstack/extensions/foxinsocks.py | 12 +- nova/tests/api/openstack/fakes.py | 12 +- nova/tests/api/openstack/test_accounts.py | 52 ++ nova/tests/api/openstack/test_consoles.py | 66 ++ nova/tests/api/openstack/test_extensions.py | 46 +- nova/tests/api/openstack/test_limits.py | 4 +- nova/tests/api/openstack/test_users.py | 76 +++ nova/tests/api/openstack/test_wsgi.py | 85 ++- nova/tests/api/openstack/test_xmlutil.py | 763 ++++++++++++++++++++++ nova/tests/api/openstack/test_zones.py | 140 +++- 10 files changed, 1210 insertions(+), 46 deletions(-) create mode 100644 nova/tests/api/openstack/test_xmlutil.py (limited to 'nova/tests') diff --git a/nova/tests/api/openstack/extensions/foxinsocks.py b/nova/tests/api/openstack/extensions/foxinsocks.py index 2d8313cf6..70556a28d 100644 --- a/nova/tests/api/openstack/extensions/foxinsocks.py +++ b/nova/tests/api/openstack/extensions/foxinsocks.py @@ -64,12 +64,10 @@ class Foxinsocks(object): def get_request_extensions(self): request_exts = [] - def _goose_handler(req, res): + def _goose_handler(req, res, body): #NOTE: This only handles JSON responses. # You can use content type header to test for XML. - data = json.loads(res.body) - data['flavor']['googoose'] = req.GET.get('chewing') - res.body = json.dumps(data) + body['flavor']['googoose'] = req.GET.get('chewing') return res req_ext1 = extensions.RequestExtension('GET', @@ -77,12 +75,10 @@ class Foxinsocks(object): _goose_handler) request_exts.append(req_ext1) - def _bands_handler(req, res): + def _bands_handler(req, res, body): #NOTE: This only handles JSON responses. # You can use content type header to test for XML. - data = json.loads(res.body) - data['big_bands'] = 'Pig Bands!' - res.body = json.dumps(data) + body['big_bands'] = 'Pig Bands!' return res req_ext2 = extensions.RequestExtension('GET', diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py index 1c6359ff1..88cfb8d7a 100644 --- a/nova/tests/api/openstack/fakes.py +++ b/nova/tests/api/openstack/fakes.py @@ -29,9 +29,10 @@ from nova.api import openstack from nova.api import auth as api_auth from nova.api.openstack import auth from nova.api.openstack import extensions -from nova.api.openstack import versions from nova.api.openstack import limits from nova.api.openstack import urlmap +from nova.api.openstack import versions +from nova.api.openstack import wsgi as os_wsgi from nova.auth.manager import User, Project import nova.image.fake from nova.tests.glance import stubs as glance_stubs @@ -65,7 +66,8 @@ def fake_wsgi(self, req): return self.application -def wsgi_app(inner_app11=None, fake_auth=True, fake_auth_context=None): +def wsgi_app(inner_app11=None, fake_auth=True, fake_auth_context=None, + serialization=os_wsgi.LazySerializationMiddleware): if not inner_app11: inner_app11 = openstack.APIRouter() @@ -76,11 +78,13 @@ def wsgi_app(inner_app11=None, fake_auth=True, fake_auth_context=None): ctxt = context.RequestContext('fake', 'fake', auth_token=True) api11 = openstack.FaultWrapper(api_auth.InjectContext(ctxt, limits.RateLimitingMiddleware( - extensions.ExtensionMiddleware(inner_app11)))) + serialization( + extensions.ExtensionMiddleware(inner_app11))))) else: api11 = openstack.FaultWrapper(auth.AuthMiddleware( limits.RateLimitingMiddleware( - extensions.ExtensionMiddleware(inner_app11)))) + serialization( + extensions.ExtensionMiddleware(inner_app11))))) Auth = auth mapper = urlmap.URLMap() mapper['/v1.1'] = api11 diff --git a/nova/tests/api/openstack/test_accounts.py b/nova/tests/api/openstack/test_accounts.py index 125ab8ea0..ea96e1348 100644 --- a/nova/tests/api/openstack/test_accounts.py +++ b/nova/tests/api/openstack/test_accounts.py @@ -16,6 +16,7 @@ import json +from lxml import etree import webob from nova import test @@ -59,10 +60,21 @@ class AccountsTest(test.TestCase): res = req.get_response(fakes.wsgi_app()) res_dict = json.loads(res.body) + self.assertEqual(res.status_int, 200) self.assertEqual(res_dict['account']['id'], 'test1') self.assertEqual(res_dict['account']['name'], 'test1') self.assertEqual(res_dict['account']['manager'], 'id1') + + def test_get_account_xml(self): + req = webob.Request.blank('/v1.1/fake/accounts/test1.xml') + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + self.assertEqual(res.status_int, 200) + self.assertEqual('account', res_tree.tag) + self.assertEqual('test1', res_tree.get('id')) + self.assertEqual('test1', res_tree.get('name')) + self.assertEqual('id1', res_tree.get('manager')) def test_account_delete(self): req = webob.Request.blank('/v1.1/fake/accounts/test1') @@ -91,6 +103,27 @@ class AccountsTest(test.TestCase): fakes.FakeAuthManager.projects) self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 3) + def test_account_create_xml(self): + body = dict(account=dict(description='test account', + manager='id1')) + req = webob.Request.blank('/v1.1/fake/accounts/newacct.xml') + req.headers["Content-Type"] = "application/json" + req.method = 'PUT' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, 'account') + self.assertEqual(res_tree.get('id'), 'newacct') + self.assertEqual(res_tree.get('name'), 'newacct') + self.assertEqual(res_tree.get('description'), 'test account') + self.assertEqual(res_tree.get('manager'), 'id1') + self.assertTrue('newacct' in + fakes.FakeAuthManager.projects) + self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 3) + def test_account_update(self): body = dict(account=dict(description='test account', manager='id2')) @@ -108,3 +141,22 @@ class AccountsTest(test.TestCase): self.assertEqual(res_dict['account']['description'], 'test account') self.assertEqual(res_dict['account']['manager'], 'id2') self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 2) + + def test_account_update_xml(self): + body = dict(account=dict(description='test account', + manager='id2')) + req = webob.Request.blank('/v1.1/fake/accounts/test1.xml') + req.headers["Content-Type"] = "application/json" + req.method = 'PUT' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, 'account') + self.assertEqual(res_tree.get('id'), 'test1') + self.assertEqual(res_tree.get('name'), 'test1') + self.assertEqual(res_tree.get('description'), 'test account') + self.assertEqual(res_tree.get('manager'), 'id2') + self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 2) diff --git a/nova/tests/api/openstack/test_consoles.py b/nova/tests/api/openstack/test_consoles.py index 25414bc7d..75b55942b 100644 --- a/nova/tests/api/openstack/test_consoles.py +++ b/nova/tests/api/openstack/test_consoles.py @@ -18,6 +18,8 @@ import datetime import json + +from lxml import etree import webob from nova.api.openstack import consoles @@ -142,6 +144,30 @@ class ConsolesTest(test.TestCase): res_dict = json.loads(res.body) self.assertDictMatch(res_dict, expected) + def test_show_console_xml(self): + def fake_get_console(cons_self, context, instance_id, console_id): + self.assertEqual(instance_id, 10) + self.assertEqual(console_id, 20) + pool = dict(console_type='fake_type', + public_hostname='fake_hostname') + return dict(id=console_id, password='fake_password', + port='fake_port', pool=pool) + + self.stubs.Set(console.API, 'get_console', fake_get_console) + + req = webob.Request.blank('/v1.1/fake/servers/10/consoles/20.xml') + res = req.get_response(fakes.wsgi_app()) + self.assertEqual(res.status_int, 200) + + res_tree = etree.fromstring(res.body) + self.assertEqual(res_tree.tag, 'console') + self.assertEqual(res_tree.xpath('id')[0].text, '20') + self.assertEqual(res_tree.xpath('port')[0].text, 'fake_port') + self.assertEqual(res_tree.xpath('host')[0].text, 'fake_hostname') + self.assertEqual(res_tree.xpath('password')[0].text, 'fake_password') + self.assertEqual(res_tree.xpath('console_type')[0].text, + 'fake_type') + def test_show_console_unknown_console(self): def fake_get_console(cons_self, context, instance_id, console_id): raise exception.ConsoleNotFound(console_id=console_id) @@ -188,6 +214,46 @@ class ConsolesTest(test.TestCase): res_dict = json.loads(res.body) self.assertDictMatch(res_dict, expected) + def test_list_consoles_xml(self): + def fake_get_consoles(cons_self, context, instance_id): + self.assertEqual(instance_id, 10) + + pool1 = dict(console_type='fake_type', + public_hostname='fake_hostname') + cons1 = dict(id=10, password='fake_password', + port='fake_port', pool=pool1) + pool2 = dict(console_type='fake_type2', + public_hostname='fake_hostname2') + cons2 = dict(id=11, password='fake_password2', + port='fake_port2', pool=pool2) + return [cons1, cons2] + + expected = {'consoles': + [{'console': {'id': 10, 'console_type': 'fake_type'}}, + {'console': {'id': 11, 'console_type': 'fake_type2'}}]} + + self.stubs.Set(console.API, 'get_consoles', fake_get_consoles) + + req = webob.Request.blank('/v1.1/fake/servers/10/consoles.xml') + res = req.get_response(fakes.wsgi_app()) + self.assertEqual(res.status_int, 200) + + res_tree = etree.fromstring(res.body) + self.assertEqual(res_tree.tag, 'consoles') + self.assertEqual(len(res_tree), 2) + self.assertEqual(res_tree[0].tag, 'console') + self.assertEqual(res_tree[1].tag, 'console') + self.assertEqual(len(res_tree[0]), 1) + self.assertEqual(res_tree[0][0].tag, 'console') + self.assertEqual(len(res_tree[1]), 1) + self.assertEqual(res_tree[1][0].tag, 'console') + self.assertEqual(res_tree[0][0].xpath('id')[0].text, '10') + self.assertEqual(res_tree[1][0].xpath('id')[0].text, '11') + self.assertEqual(res_tree[0][0].xpath('console_type')[0].text, + 'fake_type') + self.assertEqual(res_tree[1][0].xpath('console_type')[0].text, + 'fake_type2') + def test_delete_console(self): def fake_get_console(cons_self, context, instance_id, console_id): self.assertEqual(instance_id, 10) diff --git a/nova/tests/api/openstack/test_extensions.py b/nova/tests/api/openstack/test_extensions.py index e3fe0e878..92e74e545 100644 --- a/nova/tests/api/openstack/test_extensions.py +++ b/nova/tests/api/openstack/test_extensions.py @@ -22,6 +22,7 @@ from lxml import etree from nova import context from nova import test +from nova import wsgi as base_wsgi from nova.api import openstack from nova.api.openstack import extensions from nova.api.openstack import flavors @@ -111,8 +112,9 @@ class ExtensionControllerTest(test.TestCase): def test_list_extensions_json(self): app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/123/extensions") - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) # Make sure we have all the extensions. @@ -137,8 +139,9 @@ class ExtensionControllerTest(test.TestCase): def test_get_extension_json(self): app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/123/extensions/FOXNSOX") - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) data = json.loads(response.body) @@ -160,9 +163,10 @@ class ExtensionControllerTest(test.TestCase): def test_list_extensions_xml(self): app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/123/extensions") request.accept = "application/xml" - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) print response.body @@ -187,9 +191,10 @@ class ExtensionControllerTest(test.TestCase): def test_get_extension_xml(self): app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/123/extensions/FOXNSOX") request.accept = "application/xml" - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) xml = response.body print xml @@ -218,8 +223,9 @@ class ResourceExtensionTest(test.TestCase): manager = StubExtensionManager(None) app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app, manager) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/blah") - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(404, response.status_int) def test_get_resources(self): @@ -228,8 +234,9 @@ class ResourceExtensionTest(test.TestCase): manager = StubExtensionManager(res_ext) app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app, manager) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/123/tweedles") - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) self.assertEqual(response_body, response.body) @@ -239,8 +246,9 @@ class ResourceExtensionTest(test.TestCase): manager = StubExtensionManager(res_ext) app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app, manager) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/123/tweedles") - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) self.assertEqual(response_body, response.body) @@ -263,12 +271,15 @@ class ExtensionManagerTest(test.TestCase): def test_get_resources(self): app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/123/foxnsocks") - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) self.assertEqual(response_body, response.body) def test_invalid_extensions(self): + # Don't need the serialization middleware here because we're + # not testing any serialization app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app) ext_mgr = ext_midware.ext_mgr @@ -287,11 +298,12 @@ class ActionExtensionTest(test.TestCase): def _send_server_action_request(self, url, body): app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank(url) request.method = 'POST' request.content_type = 'application/json' request.body = json.dumps(body) - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) return response def test_extended_action(self): @@ -328,11 +340,9 @@ class RequestExtensionTest(test.TestCase): def test_get_resources_with_stub_mgr(self): - def _req_handler(req, res): + def _req_handler(req, res, body): # only handle JSON responses - data = json.loads(res.body) - data['flavor']['googoose'] = req.GET.get('chewing') - res.body = json.dumps(data) + body['flavor']['googoose'] = req.GET.get('chewing') return res req_ext = extensions.RequestExtension('GET', @@ -340,22 +350,24 @@ class RequestExtensionTest(test.TestCase): _req_handler) manager = StubExtensionManager(None, None, req_ext) - app = fakes.wsgi_app() + app = fakes.wsgi_app(serialization=base_wsgi.Middleware) ext_midware = extensions.ExtensionMiddleware(app, manager) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/v1.1/123/flavors/1?chewing=bluegoo") request.environ['api.version'] = '1.1' - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) response_data = json.loads(response.body) self.assertEqual('bluegoo', response_data['flavor']['googoose']) def test_get_resources_with_mgr(self): - app = fakes.wsgi_app() + app = fakes.wsgi_app(serialization=base_wsgi.Middleware) ext_midware = extensions.ExtensionMiddleware(app) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/v1.1/123/flavors/1?chewing=newblue") request.environ['api.version'] = '1.1' - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) response_data = json.loads(response.body) self.assertEqual('newblue', response_data['flavor']['googoose']) diff --git a/nova/tests/api/openstack/test_limits.py b/nova/tests/api/openstack/test_limits.py index 51a97f920..96e30f756 100644 --- a/nova/tests/api/openstack/test_limits.py +++ b/nova/tests/api/openstack/test_limits.py @@ -30,6 +30,7 @@ from xml.dom import minidom import nova.context from nova.api.openstack import limits from nova.api.openstack import views +from nova.api.openstack import wsgi from nova.api.openstack import xmlutil from nova import test @@ -80,7 +81,8 @@ class LimitsControllerTest(BaseLimitTestSuite): def setUp(self): """Run before each test.""" BaseLimitTestSuite.setUp(self) - self.controller = limits.create_resource() + self.controller = wsgi.LazySerializationMiddleware( + limits.create_resource()) self.maxDiff = None def _get_index_request(self, accept_header="application/json"): diff --git a/nova/tests/api/openstack/test_users.py b/nova/tests/api/openstack/test_users.py index c9af530cf..cc77d7d26 100644 --- a/nova/tests/api/openstack/test_users.py +++ b/nova/tests/api/openstack/test_users.py @@ -15,6 +15,7 @@ import json +from lxml import etree import webob from nova import test @@ -63,6 +64,19 @@ class UsersTest(test.TestCase): self.assertEqual(res.status_int, 200) self.assertEqual(len(res_dict['users']), 2) + def test_get_user_list_xml(self): + req = webob.Request.blank('/v1.1/fake/users.xml') + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, 'users') + self.assertEqual(len(res_tree), 2) + self.assertEqual(res_tree[0].tag, 'user') + self.assertEqual(res_tree[0].get('id'), 'id1') + self.assertEqual(res_tree[1].tag, 'user') + self.assertEqual(res_tree[1].get('id'), 'id2') + def test_get_user_by_id(self): req = webob.Request.blank('/v1.1/fake/users/id2') res = req.get_response(fakes.wsgi_app()) @@ -74,6 +88,18 @@ class UsersTest(test.TestCase): self.assertEqual(res_dict['user']['admin'], True) self.assertEqual(res.status_int, 200) + def test_get_user_by_id_xml(self): + req = webob.Request.blank('/v1.1/fake/users/id2.xml') + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, 'user') + self.assertEqual(res_tree.get('id'), 'id2') + self.assertEqual(res_tree.get('name'), 'guy2') + self.assertEqual(res_tree.get('secret'), 'secret2') + self.assertEqual(res_tree.get('admin'), 'True') + def test_user_delete(self): # Check the user exists req = webob.Request.blank('/v1.1/fake/users/id1') @@ -125,6 +151,35 @@ class UsersTest(test.TestCase): fakes.FakeAuthManager.auth_data]) self.assertEqual(len(fakes.FakeAuthManager.auth_data), 3) + def test_user_create_xml(self): + secret = utils.generate_password() + body = dict(user=dict(name='test_guy', + access='acc3', + secret=secret, + admin=True)) + req = webob.Request.blank('/v1.1/fake/users.xml') + req.headers["Content-Type"] = "application/json" + req.method = 'POST' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + + # NOTE(justinsb): This is a questionable assertion in general + # fake sets id=name, but others might not... + self.assertEqual(res_tree.tag, 'user') + self.assertEqual(res_tree.get('id'), 'test_guy') + + self.assertEqual(res_tree.get('name'), 'test_guy') + self.assertEqual(res_tree.get('access'), 'acc3') + self.assertEqual(res_tree.get('secret'), secret) + self.assertEqual(res_tree.get('admin'), 'True') + self.assertTrue('test_guy' in [u.id for u in + fakes.FakeAuthManager.auth_data]) + self.assertEqual(len(fakes.FakeAuthManager.auth_data), 3) + def test_user_update(self): new_secret = utils.generate_password() body = dict(user=dict(name='guy2', @@ -144,3 +199,24 @@ class UsersTest(test.TestCase): self.assertEqual(res_dict['user']['access'], 'acc2') self.assertEqual(res_dict['user']['secret'], new_secret) self.assertEqual(res_dict['user']['admin'], True) + + def test_user_update_xml(self): + new_secret = utils.generate_password() + body = dict(user=dict(name='guy2', + access='acc2', + secret=new_secret)) + req = webob.Request.blank('/v1.1/fake/users/id2.xml') + req.headers["Content-Type"] = "application/json" + req.method = 'PUT' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, 'user') + self.assertEqual(res_tree.get('id'), 'id2') + self.assertEqual(res_tree.get('name'), 'guy2') + self.assertEqual(res_tree.get('access'), 'acc2') + self.assertEqual(res_tree.get('secret'), new_secret) + self.assertEqual(res_tree.get('admin'), 'True') diff --git a/nova/tests/api/openstack/test_wsgi.py b/nova/tests/api/openstack/test_wsgi.py index 74b9ce853..5aea8275d 100644 --- a/nova/tests/api/openstack/test_wsgi.py +++ b/nova/tests/api/openstack/test_wsgi.py @@ -215,20 +215,23 @@ class RequestHeadersDeserializerTest(test.TestCase): self.assertEqual(deserializer.deserialize(req, 'update'), {'a': 'b'}) -class ResponseSerializerTest(test.TestCase): - def setUp(self): - class JSONSerializer(object): - def serialize(self, data, action='default'): - return 'pew_json' +class JSONSerializer(object): + def serialize(self, data, action='default'): + return 'pew_json' - class XMLSerializer(object): - def serialize(self, data, action='default'): - return 'pew_xml' - class HeadersSerializer(object): - def serialize(self, response, data, action): - response.status_int = 404 +class XMLSerializer(object): + def serialize(self, data, action='default'): + return 'pew_xml' + +class HeadersSerializer(object): + def serialize(self, response, data, action): + response.status_int = 404 + + +class ResponseSerializerTest(test.TestCase): + def setUp(self): self.body_serializers = { 'application/json': JSONSerializer(), 'application/xml': XMLSerializer(), @@ -253,7 +256,8 @@ class ResponseSerializerTest(test.TestCase): def test_serialize_response_json(self): for content_type in ('application/json', 'application/vnd.openstack.compute+json'): - response = self.serializer.serialize({}, content_type) + request = wsgi.Request.blank('/') + response = self.serializer.serialize(request, {}, content_type) self.assertEqual(response.headers['Content-Type'], content_type) self.assertEqual(response.body, 'pew_json') self.assertEqual(response.status_int, 404) @@ -261,21 +265,72 @@ class ResponseSerializerTest(test.TestCase): def test_serialize_response_xml(self): for content_type in ('application/xml', 'application/vnd.openstack.compute+xml'): - response = self.serializer.serialize({}, content_type) + request = wsgi.Request.blank('/') + response = self.serializer.serialize(request, {}, content_type) self.assertEqual(response.headers['Content-Type'], content_type) self.assertEqual(response.body, 'pew_xml') self.assertEqual(response.status_int, 404) def test_serialize_response_None(self): - response = self.serializer.serialize(None, 'application/json') + request = wsgi.Request.blank('/') + response = self.serializer.serialize(request, None, 'application/json') self.assertEqual(response.headers['Content-Type'], 'application/json') self.assertEqual(response.body, '') self.assertEqual(response.status_int, 404) def test_serialize_response_dict_to_unknown_content_type(self): + request = wsgi.Request.blank('/') self.assertRaises(exception.InvalidContentType, self.serializer.serialize, - {}, 'application/unknown') + request, {}, 'application/unknown') + + +class LazySerializationTest(test.TestCase): + def setUp(self): + self.body_serializers = { + 'application/json': JSONSerializer(), + 'application/xml': XMLSerializer(), + } + + self.serializer = wsgi.ResponseSerializer(self.body_serializers, + HeadersSerializer()) + + def tearDown(self): + pass + + def test_serialize_response_json(self): + for content_type in ('application/json', + 'application/vnd.openstack.compute+json'): + request = wsgi.Request.blank('/') + request.environ['nova.lazy_serialize'] = True + response = self.serializer.serialize(request, {}, content_type) + self.assertEqual(response.headers['Content-Type'], content_type) + self.assertEqual(response.status_int, 404) + body = json.loads(response.body) + self.assertEqual(body, {}) + serializer = request.environ['nova.serializer'] + self.assertEqual(serializer.serialize(body), 'pew_json') + + def test_serialize_response_xml(self): + for content_type in ('application/xml', + 'application/vnd.openstack.compute+xml'): + request = wsgi.Request.blank('/') + request.environ['nova.lazy_serialize'] = True + response = self.serializer.serialize(request, {}, content_type) + self.assertEqual(response.headers['Content-Type'], content_type) + self.assertEqual(response.status_int, 404) + body = json.loads(response.body) + self.assertEqual(body, {}) + serializer = request.environ['nova.serializer'] + self.assertEqual(serializer.serialize(body), 'pew_xml') + + def test_serialize_response_None(self): + request = wsgi.Request.blank('/') + request.environ['nova.lazy_serialize'] = True + response = self.serializer.serialize(request, None, 'application/json') + self.assertEqual(response.headers['Content-Type'], 'application/json') + self.assertEqual(response.status_int, 404) + self.assertEqual(response.body, '') class RequestDeserializerTest(test.TestCase): diff --git a/nova/tests/api/openstack/test_xmlutil.py b/nova/tests/api/openstack/test_xmlutil.py new file mode 100644 index 000000000..6d224967f --- /dev/null +++ b/nova/tests/api/openstack/test_xmlutil.py @@ -0,0 +1,763 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from lxml import etree + +from nova import test +from nova.api.openstack import xmlutil + + +class SelectorTest(test.TestCase): + obj_for_test = { + 'test': { + 'name': 'test', + 'values': [1, 2, 3], + 'attrs': { + 'foo': 1, + 'bar': 2, + 'baz': 3, + }, + }, + } + + def test_empty_selector(self): + sel = xmlutil.Selector() + self.assertEqual(len(sel.chain), 0) + self.assertEqual(sel(self.obj_for_test), self.obj_for_test) + + def test_dict_selector(self): + sel = xmlutil.Selector('test') + self.assertEqual(len(sel.chain), 1) + self.assertEqual(sel.chain[0], 'test') + self.assertEqual(sel(self.obj_for_test), + self.obj_for_test['test']) + + def test_datum_selector(self): + sel = xmlutil.Selector('test', 'name') + self.assertEqual(len(sel.chain), 2) + self.assertEqual(sel.chain[0], 'test') + self.assertEqual(sel.chain[1], 'name') + self.assertEqual(sel(self.obj_for_test), 'test') + + def test_list_selector(self): + sel = xmlutil.Selector('test', 'values', 0) + self.assertEqual(len(sel.chain), 3) + self.assertEqual(sel.chain[0], 'test') + self.assertEqual(sel.chain[1], 'values') + self.assertEqual(sel.chain[2], 0) + self.assertEqual(sel(self.obj_for_test), 1) + + def test_items_selector(self): + sel = xmlutil.Selector('test', 'attrs', xmlutil.get_items) + self.assertEqual(len(sel.chain), 3) + self.assertEqual(sel.chain[2], xmlutil.get_items) + for key, val in sel(self.obj_for_test): + self.assertEqual(self.obj_for_test['test']['attrs'][key], val) + + def test_missing_key_selector(self): + sel = xmlutil.Selector('test2', 'attrs') + self.assertEqual(sel(self.obj_for_test), None) + self.assertRaises(KeyError, sel, self.obj_for_test, True) + + def test_constant_selector(self): + sel = xmlutil.ConstantSelector('Foobar') + self.assertEqual(sel.value, 'Foobar') + self.assertEqual(sel(self.obj_for_test), 'Foobar') + + +class TemplateElementTest(test.TestCase): + def test_element_initial_attributes(self): + # Create a template element with some attributes + elem = xmlutil.TemplateElement('test', attrib=dict(a=1, b=2, c=3), + c=4, d=5, e=6) + + # Verify all the attributes are as expected + expected = dict(a=1, b=2, c=4, d=5, e=6) + for k, v in expected.items(): + self.assertEqual(elem.attrib[k].chain[0], v) + + def test_element_get_attributes(self): + expected = dict(a=1, b=2, c=3) + + # Create a template element with some attributes + elem = xmlutil.TemplateElement('test', attrib=expected) + + # Verify that get() retrieves the attributes + for k, v in expected.items(): + self.assertEqual(elem.get(k).chain[0], v) + + def test_element_set_attributes(self): + attrs = dict(a=None, b='foo', c=xmlutil.Selector('foo', 'bar')) + + # Create a bare template element with no attributes + elem = xmlutil.TemplateElement('test') + + # Set the attribute values + for k, v in attrs.items(): + elem.set(k, v) + + # Now verify what got set + self.assertEqual(len(elem.attrib['a'].chain), 1) + self.assertEqual(elem.attrib['a'].chain[0], 'a') + self.assertEqual(len(elem.attrib['b'].chain), 1) + self.assertEqual(elem.attrib['b'].chain[0], 'foo') + self.assertEqual(elem.attrib['c'], attrs['c']) + + def test_element_attribute_keys(self): + attrs = dict(a=1, b=2, c=3, d=4) + expected = set(attrs.keys()) + + # Create a template element with some attributes + elem = xmlutil.TemplateElement('test', attrib=attrs) + + # Now verify keys + self.assertEqual(set(elem.keys()), expected) + + def test_element_attribute_items(self): + expected = dict(a=xmlutil.Selector(1), + b=xmlutil.Selector(2), + c=xmlutil.Selector(3)) + keys = set(expected.keys()) + + # Create a template element with some attributes + elem = xmlutil.TemplateElement('test', attrib=expected) + + # Now verify items + for k, v in elem.items(): + self.assertEqual(expected[k], v) + keys.remove(k) + + # Did we visit all keys? + self.assertEqual(len(keys), 0) + + def test_element_selector_none(self): + # Create a template element with no selector + elem = xmlutil.TemplateElement('test') + + self.assertEqual(len(elem.selector.chain), 0) + + def test_element_selector_string(self): + # Create a template element with a string selector + elem = xmlutil.TemplateElement('test', selector='test') + + self.assertEqual(len(elem.selector.chain), 1) + self.assertEqual(elem.selector.chain[0], 'test') + + def test_element_selector(self): + sel = xmlutil.Selector('a', 'b') + + # Create a template element with an explicit selector + elem = xmlutil.TemplateElement('test', selector=sel) + + self.assertEqual(elem.selector, sel) + + def test_element_append_child(self): + # Create an element + elem = xmlutil.TemplateElement('test') + + # Make sure the element starts off empty + self.assertEqual(len(elem), 0) + + # Create a child element + child = xmlutil.TemplateElement('child') + + # Append the child to the parent + elem.append(child) + + # Verify that the child was added + self.assertEqual(len(elem), 1) + self.assertEqual(elem[0], child) + self.assertEqual('child' in elem, True) + self.assertEqual(elem['child'], child) + + # Ensure that multiple children of the same name are rejected + child2 = xmlutil.TemplateElement('child') + self.assertRaises(KeyError, elem.append, child2) + + def test_element_extend_children(self): + # Create an element + elem = xmlutil.TemplateElement('test') + + # Make sure the element starts off empty + self.assertEqual(len(elem), 0) + + # Create a few children + children = [ + xmlutil.TemplateElement('child1'), + xmlutil.TemplateElement('child2'), + xmlutil.TemplateElement('child3'), + ] + + # Extend the parent by those children + elem.extend(children) + + # Verify that the children were added + self.assertEqual(len(elem), 3) + for idx in range(len(elem)): + self.assertEqual(children[idx], elem[idx]) + self.assertEqual(children[idx].tag in elem, True) + self.assertEqual(elem[children[idx].tag], children[idx]) + + # Ensure that multiple children of the same name are rejected + children2 = [ + xmlutil.TemplateElement('child4'), + xmlutil.TemplateElement('child1'), + ] + self.assertRaises(KeyError, elem.extend, children2) + + # Also ensure that child4 was not added + self.assertEqual(len(elem), 3) + self.assertEqual(elem[-1].tag, 'child3') + + def test_element_insert_child(self): + # Create an element + elem = xmlutil.TemplateElement('test') + + # Make sure the element starts off empty + self.assertEqual(len(elem), 0) + + # Create a few children + children = [ + xmlutil.TemplateElement('child1'), + xmlutil.TemplateElement('child2'), + xmlutil.TemplateElement('child3'), + ] + + # Extend the parent by those children + elem.extend(children) + + # Create a child to insert + child = xmlutil.TemplateElement('child4') + + # Insert it + elem.insert(1, child) + + # Ensure the child was inserted in the right place + self.assertEqual(len(elem), 4) + children.insert(1, child) + for idx in range(len(elem)): + self.assertEqual(children[idx], elem[idx]) + self.assertEqual(children[idx].tag in elem, True) + self.assertEqual(elem[children[idx].tag], children[idx]) + + # Ensure that multiple children of the same name are rejected + child2 = xmlutil.TemplateElement('child2') + self.assertRaises(KeyError, elem.insert, 2, child2) + + def test_element_remove_child(self): + # Create an element + elem = xmlutil.TemplateElement('test') + + # Make sure the element starts off empty + self.assertEqual(len(elem), 0) + + # Create a few children + children = [ + xmlutil.TemplateElement('child1'), + xmlutil.TemplateElement('child2'), + xmlutil.TemplateElement('child3'), + ] + + # Extend the parent by those children + elem.extend(children) + + # Create a test child to remove + child = xmlutil.TemplateElement('child2') + + # Try to remove it + self.assertRaises(ValueError, elem.remove, child) + + # Ensure that no child was removed + self.assertEqual(len(elem), 3) + + # Now remove a legitimate child + elem.remove(children[1]) + + # Ensure that the child was removed + self.assertEqual(len(elem), 2) + self.assertEqual(elem[0], children[0]) + self.assertEqual(elem[1], children[2]) + self.assertEqual('child2' in elem, False) + + # Ensure the child cannot be retrieved by name + def get_key(elem, key): + return elem[key] + self.assertRaises(KeyError, get_key, elem, 'child2') + + def test_element_text(self): + # Create an element + elem = xmlutil.TemplateElement('test') + + # Ensure that it has no text + self.assertEqual(elem.text, None) + + # Try setting it to a string and ensure it becomes a selector + elem.text = 'test' + self.assertEqual(hasattr(elem.text, 'chain'), True) + self.assertEqual(len(elem.text.chain), 1) + self.assertEqual(elem.text.chain[0], 'test') + + # Try resetting the text to None + elem.text = None + self.assertEqual(elem.text, None) + + # Now make up a selector and try setting the text to that + sel = xmlutil.Selector() + elem.text = sel + self.assertEqual(elem.text, sel) + + # Finally, try deleting the text and see what happens + del elem.text + self.assertEqual(elem.text, None) + + def test_apply_attrs(self): + # Create a template element + attrs = dict(attr1=xmlutil.ConstantSelector(1), + attr2=xmlutil.ConstantSelector(2)) + tmpl_elem = xmlutil.TemplateElement('test', attrib=attrs) + + # Create an etree element + elem = etree.Element('test') + + # Apply the template to the element + tmpl_elem.apply(elem, None) + + # Now, verify the correct attributes were set + for k, v in elem.items(): + self.assertEqual(str(attrs[k].value), v) + + def test_apply_text(self): + # Create a template element + tmpl_elem = xmlutil.TemplateElement('test') + tmpl_elem.text = xmlutil.ConstantSelector(1) + + # Create an etree element + elem = etree.Element('test') + + # Apply the template to the element + tmpl_elem.apply(elem, None) + + # Now, verify the text was set + self.assertEqual(str(tmpl_elem.text.value), elem.text) + + def test__render(self): + attrs = dict(attr1=xmlutil.ConstantSelector(1), + attr2=xmlutil.ConstantSelector(2), + attr3=xmlutil.ConstantSelector(3)) + + # Create a master template element + master_elem = xmlutil.TemplateElement('test', attr1=attrs['attr1']) + + # Create a couple of slave template element + slave_elems = [ + xmlutil.TemplateElement('test', attr2=attrs['attr2']), + xmlutil.TemplateElement('test', attr3=attrs['attr3']), + ] + + # Try the render + elem = master_elem._render(None, None, slave_elems, None) + + # Verify the particulars of the render + self.assertEqual(elem.tag, 'test') + self.assertEqual(len(elem.nsmap), 0) + for k, v in elem.items(): + self.assertEqual(str(attrs[k].value), v) + + # Create a parent for the element to be rendered + parent = etree.Element('parent') + + # Try the render again... + elem = master_elem._render(parent, None, slave_elems, dict(a='foo')) + + # Verify the particulars of the render + self.assertEqual(len(parent), 1) + self.assertEqual(parent[0], elem) + self.assertEqual(len(elem.nsmap), 1) + self.assertEqual(elem.nsmap['a'], 'foo') + + def test_render(self): + # Create a template element + tmpl_elem = xmlutil.TemplateElement('test') + tmpl_elem.text = xmlutil.Selector() + + # Create the object we're going to render + obj = ['elem1', 'elem2', 'elem3', 'elem4'] + + # Try a render with no object + elems = tmpl_elem.render(None, None) + self.assertEqual(len(elems), 0) + + # Try a render with one object + elems = tmpl_elem.render(None, 'foo') + self.assertEqual(len(elems), 1) + self.assertEqual(elems[0][0].text, 'foo') + self.assertEqual(elems[0][1], 'foo') + + # Now, try rendering an object with multiple entries + parent = etree.Element('parent') + elems = tmpl_elem.render(parent, obj) + self.assertEqual(len(elems), 4) + + # Check the results + for idx in range(len(obj)): + self.assertEqual(elems[idx][0].text, obj[idx]) + self.assertEqual(elems[idx][1], obj[idx]) + + def test_subelement(self): + # Try the SubTemplateElement constructor + parent = xmlutil.SubTemplateElement(None, 'parent') + self.assertEqual(parent.tag, 'parent') + self.assertEqual(len(parent), 0) + + # Now try it with a parent element + child = xmlutil.SubTemplateElement(parent, 'child') + self.assertEqual(child.tag, 'child') + self.assertEqual(len(parent), 1) + self.assertEqual(parent[0], child) + + def test_wrap(self): + # These are strange methods, but they make things easier + elem = xmlutil.TemplateElement('test') + self.assertEqual(elem.unwrap(), elem) + self.assertEqual(elem.wrap().root, elem) + + def test_dyntag(self): + obj = ['a', 'b', 'c'] + + # Create a template element with a dynamic tag + tmpl_elem = xmlutil.TemplateElement(xmlutil.Selector()) + + # Try the render + parent = etree.Element('parent') + elems = tmpl_elem.render(parent, obj) + + # Verify the particulars of the render + self.assertEqual(len(elems), len(obj)) + for idx in range(len(obj)): + self.assertEqual(elems[idx][0].tag, obj[idx]) + + +class TemplateTest(test.TestCase): + def test_wrap(self): + # These are strange methods, but they make things easier + elem = xmlutil.TemplateElement('test') + tmpl = xmlutil.Template(elem) + self.assertEqual(tmpl.unwrap(), elem) + self.assertEqual(tmpl.wrap(), tmpl) + + def test__siblings(self): + # Set up a basic template + elem = xmlutil.TemplateElement('test') + tmpl = xmlutil.Template(elem) + + # Check that we get the right siblings + siblings = tmpl._siblings() + self.assertEqual(len(siblings), 1) + self.assertEqual(siblings[0], elem) + + def test__nsmap(self): + # Set up a basic template + elem = xmlutil.TemplateElement('test') + tmpl = xmlutil.Template(elem, nsmap=dict(a="foo")) + + # Check out that we get the right namespace dictionary + nsmap = tmpl._nsmap() + self.assertNotEqual(id(nsmap), id(tmpl.nsmap)) + self.assertEqual(len(nsmap), 1) + self.assertEqual(nsmap['a'], 'foo') + + def test_master_attach(self): + # Set up a master template + elem = xmlutil.TemplateElement('test') + tmpl = xmlutil.MasterTemplate(elem, 1) + + # Make sure it has a root but no slaves + self.assertEqual(tmpl.root, elem) + self.assertEqual(len(tmpl.slaves), 0) + + # Try to attach an invalid slave + bad_elem = xmlutil.TemplateElement('test2') + self.assertRaises(ValueError, tmpl.attach, bad_elem) + self.assertEqual(len(tmpl.slaves), 0) + + # Try to attach an invalid and a valid slave + good_elem = xmlutil.TemplateElement('test') + self.assertRaises(ValueError, tmpl.attach, good_elem, bad_elem) + self.assertEqual(len(tmpl.slaves), 0) + + # Try to attach an inapplicable template + class InapplicableTemplate(xmlutil.Template): + def apply(self, master): + return False + inapp_tmpl = InapplicableTemplate(good_elem) + tmpl.attach(inapp_tmpl) + self.assertEqual(len(tmpl.slaves), 0) + + # Now try attaching an applicable template + tmpl.attach(good_elem) + self.assertEqual(len(tmpl.slaves), 1) + self.assertEqual(tmpl.slaves[0].root, good_elem) + + def test_master_copy(self): + # Construct a master template + elem = xmlutil.TemplateElement('test') + tmpl = xmlutil.MasterTemplate(elem, 1, nsmap=dict(a='foo')) + + # Give it a slave + slave = xmlutil.TemplateElement('test') + tmpl.attach(slave) + + # Construct a copy + copy = tmpl.copy() + + # Check to see if we actually managed a copy + self.assertNotEqual(tmpl, copy) + self.assertEqual(tmpl.root, copy.root) + self.assertEqual(tmpl.version, copy.version) + self.assertEqual(id(tmpl.nsmap), id(copy.nsmap)) + self.assertNotEqual(id(tmpl.slaves), id(copy.slaves)) + self.assertEqual(len(tmpl.slaves), len(copy.slaves)) + self.assertEqual(tmpl.slaves[0], copy.slaves[0]) + + def test_slave_apply(self): + # Construct a master template + elem = xmlutil.TemplateElement('test') + master = xmlutil.MasterTemplate(elem, 3) + + # Construct a slave template with applicable minimum version + slave = xmlutil.SlaveTemplate(elem, 2) + self.assertEqual(slave.apply(master), True) + + # Construct a slave template with equal minimum version + slave = xmlutil.SlaveTemplate(elem, 3) + self.assertEqual(slave.apply(master), True) + + # Construct a slave template with inapplicable minimum version + slave = xmlutil.SlaveTemplate(elem, 4) + self.assertEqual(slave.apply(master), False) + + # Construct a slave template with applicable version range + slave = xmlutil.SlaveTemplate(elem, 2, 4) + self.assertEqual(slave.apply(master), True) + + # Construct a slave template with low version range + slave = xmlutil.SlaveTemplate(elem, 1, 2) + self.assertEqual(slave.apply(master), False) + + # Construct a slave template with high version range + slave = xmlutil.SlaveTemplate(elem, 4, 5) + self.assertEqual(slave.apply(master), False) + + # Construct a slave template with matching version range + slave = xmlutil.SlaveTemplate(elem, 3, 3) + self.assertEqual(slave.apply(master), True) + + def test__serialize(self): + # Our test object to serialize + obj = { + 'test': { + 'name': 'foobar', + 'values': [1, 2, 3, 4], + 'attrs': { + 'a': 1, + 'b': 2, + 'c': 3, + 'd': 4, + }, + 'image': { + 'name': 'image_foobar', + 'id': 42, + }, + }, + } + + # Set up our master template + root = xmlutil.TemplateElement('test', selector='test', + name='name') + value = xmlutil.SubTemplateElement(root, 'value', selector='values') + value.text = xmlutil.Selector() + attrs = xmlutil.SubTemplateElement(root, 'attrs', selector='attrs') + xmlutil.SubTemplateElement(attrs, 'attr', selector=xmlutil.get_items, + key=0, value=1) + master = xmlutil.MasterTemplate(root, 1, nsmap=dict(f='foo')) + + # Set up our slave template + root_slave = xmlutil.TemplateElement('test', selector='test') + image = xmlutil.SubTemplateElement(root_slave, 'image', + selector='image', id='id') + image.text = xmlutil.Selector('name') + slave = xmlutil.SlaveTemplate(root_slave, 1, nsmap=dict(b='bar')) + + # Attach the slave to the master... + master.attach(slave) + + # Try serializing our object + siblings = master._siblings() + nsmap = master._nsmap() + result = master._serialize(None, obj, siblings, nsmap) + + # Now we get to manually walk the element tree... + self.assertEqual(result.tag, 'test') + self.assertEqual(len(result.nsmap), 2) + self.assertEqual(result.nsmap['f'], 'foo') + self.assertEqual(result.nsmap['b'], 'bar') + self.assertEqual(result.get('name'), obj['test']['name']) + for idx, val in enumerate(obj['test']['values']): + self.assertEqual(result[idx].tag, 'value') + self.assertEqual(result[idx].text, str(val)) + idx += 1 + self.assertEqual(result[idx].tag, 'attrs') + for attr in result[idx]: + self.assertEqual(attr.tag, 'attr') + self.assertEqual(attr.get('value'), + str(obj['test']['attrs'][attr.get('key')])) + idx += 1 + self.assertEqual(result[idx].tag, 'image') + self.assertEqual(result[idx].get('id'), + str(obj['test']['image']['id'])) + self.assertEqual(result[idx].text, obj['test']['image']['name']) + + +class MasterTemplateBuilder(xmlutil.TemplateBuilder): + def construct(self): + elem = xmlutil.TemplateElement('test') + return xmlutil.MasterTemplate(elem, 1) + + +class SlaveTemplateBuilder(xmlutil.TemplateBuilder): + def construct(self): + elem = xmlutil.TemplateElement('test') + return xmlutil.SlaveTemplate(elem, 1) + + +class TemplateBuilderTest(test.TestCase): + def test_master_template_builder(self): + # Make sure the template hasn't been built yet + self.assertEqual(MasterTemplateBuilder._tmpl, None) + + # Now, construct the template + tmpl1 = MasterTemplateBuilder() + + # Make sure that there is a template cached... + self.assertNotEqual(MasterTemplateBuilder._tmpl, None) + + # Make sure it wasn't what was returned... + self.assertNotEqual(MasterTemplateBuilder._tmpl, tmpl1) + + # Make sure it doesn't get rebuilt + cached = MasterTemplateBuilder._tmpl + tmpl2 = MasterTemplateBuilder() + self.assertEqual(MasterTemplateBuilder._tmpl, cached) + + # Make sure we're always getting fresh copies + self.assertNotEqual(tmpl1, tmpl2) + + # Make sure we can override the copying behavior + tmpl3 = MasterTemplateBuilder(False) + self.assertEqual(MasterTemplateBuilder._tmpl, tmpl3) + + def test_slave_template_builder(self): + # Make sure the template hasn't been built yet + self.assertEqual(SlaveTemplateBuilder._tmpl, None) + + # Now, construct the template + tmpl1 = SlaveTemplateBuilder() + + # Make sure there is a template cached... + self.assertNotEqual(SlaveTemplateBuilder._tmpl, None) + + # Make sure it was what was returned... + self.assertEqual(SlaveTemplateBuilder._tmpl, tmpl1) + + # Make sure it doesn't get rebuilt + tmpl2 = SlaveTemplateBuilder() + self.assertEqual(SlaveTemplateBuilder._tmpl, tmpl1) + + # Make sure we're always getting the cached copy + self.assertEqual(tmpl1, tmpl2) + + +class SerializerTest(xmlutil.XMLTemplateSerializer): + def test(self): + root = xmlutil.TemplateElement('servers') + a = xmlutil.SubTemplateElement(root, 'a', selector='servers') + a.text = xmlutil.Selector('a') + return xmlutil.MasterTemplate(root, 1, nsmap={None: "asdf"}) + + +class XMLTemplateSerializerTest(test.TestCase): + def setUp(self): + self.tmpl_serializer = SerializerTest() + self.data = dict(servers=dict(a=(2, 3))) + self.data_multi = dict(servers=[dict(a=(2, 3)), dict(a=(3, 4))]) + super(XMLTemplateSerializerTest, self).setUp() + + def test_get_template(self): + # First, check what happens when we fall back on the default + # option + self.assertEqual(self.tmpl_serializer.get_template(), None) + self.assertEqual(self.tmpl_serializer.get_template('nosuch'), None) + + # Now, check that we get back a template + tmpl = self.tmpl_serializer.get_template('test') + self.assertNotEqual(tmpl, None) + self.assertEqual(tmpl.root.tag, 'servers') + + def test_serialize_default(self): + expected_xml = '(2,3)' + result = self.tmpl_serializer.serialize(self.data) + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_xml) + + def test_serialize_multi_default(self): + expected_xml = ('(2,3)' + '(3,4)') + result = self.tmpl_serializer.serialize(self.data_multi) + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_xml) + + def test_serialize_explicit(self): + expected_xml = ("" + '(2,3)') + tmpl = self.tmpl_serializer.get_template('test') + result = self.tmpl_serializer.serialize(self.data, template=tmpl) + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_xml) + + def test_serialize_multi_explicit(self): + expected_xml = ("" + '(2,3)' + '(3,4)') + tmpl = self.tmpl_serializer.get_template('test') + result = self.tmpl_serializer.serialize(self.data_multi, template=tmpl) + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_xml) + + def test_serialize(self): + expected_xml = ("" + '(2,3)') + result = self.tmpl_serializer.serialize(self.data, 'test') + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_xml) + + def test_serialize_multi(self): + expected_xml = ("" + '(2,3)' + '(3,4)') + result = self.tmpl_serializer.serialize(self.data_multi, 'test') + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_xml) diff --git a/nova/tests/api/openstack/test_zones.py b/nova/tests/api/openstack/test_zones.py index 869f13183..af762d3d6 100644 --- a/nova/tests/api/openstack/test_zones.py +++ b/nova/tests/api/openstack/test_zones.py @@ -14,15 +14,18 @@ # under the License. +import json + +from lxml import etree import stubout import webob -import json import nova.db from nova import context from nova import crypto from nova import flags from nova import test +from nova.api.openstack import xmlutil from nova.api.openstack import zones from nova.tests.api.openstack import fakes from nova.scheduler import api @@ -112,6 +115,18 @@ class ZonesTest(test.TestCase): self.assertEqual(res.status_int, 200) self.assertEqual(len(res_dict['zones']), 2) + def test_get_zone_list_scheduler_xml(self): + self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler) + req = webob.Request.blank('/v1.1/fake/zones.xml') + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, '{%s}zones' % xmlutil.XMLNS_V10) + self.assertEqual(len(res_tree), 2) + self.assertEqual(res_tree[0].tag, '{%s}zone' % xmlutil.XMLNS_V10) + self.assertEqual(res_tree[1].tag, '{%s}zone' % xmlutil.XMLNS_V10) + def test_get_zone_list_db(self): self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler_empty) self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db) @@ -123,6 +138,20 @@ class ZonesTest(test.TestCase): res_dict = json.loads(res.body) self.assertEqual(len(res_dict['zones']), 2) + def test_get_zone_list_db_xml(self): + self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler_empty) + self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db) + req = webob.Request.blank('/v1.1/fake/zones.xml') + req.headers["Content-Type"] = "application/json" + res = req.get_response(fakes.wsgi_app()) + + self.assertEqual(res.status_int, 200) + res_tree = etree.fromstring(res.body) + self.assertEqual(res_tree.tag, '{%s}zones' % xmlutil.XMLNS_V10) + self.assertEqual(len(res_tree), 2) + self.assertEqual(res_tree[0].tag, '{%s}zone' % xmlutil.XMLNS_V10) + self.assertEqual(res_tree[1].tag, '{%s}zone' % xmlutil.XMLNS_V10) + def test_get_zone_by_id(self): req = webob.Request.blank('/v1.1/fake/zones/1') req.headers["Content-Type"] = "application/json" @@ -134,6 +163,18 @@ class ZonesTest(test.TestCase): self.assertEqual(res_dict['zone']['api_url'], 'http://example.com') self.assertFalse('password' in res_dict['zone']) + def test_get_zone_by_id_xml(self): + req = webob.Request.blank('/v1.1/fake/zones/1.xml') + req.headers["Content-Type"] = "application/json" + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10) + self.assertEqual(res_tree.get('id'), '1') + self.assertEqual(res_tree.get('api_url'), 'http://example.com') + self.assertEqual(res_tree.get('password'), None) + def test_zone_delete(self): req = webob.Request.blank('/v1.1/fake/zones/1') req.headers["Content-Type"] = "application/json" @@ -157,6 +198,23 @@ class ZonesTest(test.TestCase): self.assertEqual(res_dict['zone']['api_url'], 'http://example.com') self.assertFalse('username' in res_dict['zone']) + def test_zone_create_xml(self): + body = dict(zone=dict(api_url='http://example.com', username='fred', + password='fubar')) + req = webob.Request.blank('/v1.1/fake/zones.xml') + req.headers["Content-Type"] = "application/json" + req.method = 'POST' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + + self.assertEqual(res.status_int, 200) + res_tree = etree.fromstring(res.body) + self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10) + self.assertEqual(res_tree.get('id'), '1') + self.assertEqual(res_tree.get('api_url'), 'http://example.com') + self.assertEqual(res_tree.get('username'), None) + def test_zone_update(self): body = dict(zone=dict(username='zeb', password='sneaky')) req = webob.Request.blank('/v1.1/fake/zones/1') @@ -172,6 +230,22 @@ class ZonesTest(test.TestCase): self.assertEqual(res_dict['zone']['api_url'], 'http://example.com') self.assertFalse('username' in res_dict['zone']) + def test_zone_update_xml(self): + body = dict(zone=dict(username='zeb', password='sneaky')) + req = webob.Request.blank('/v1.1/fake/zones/1.xml') + req.headers["Content-Type"] = "application/json" + req.method = 'PUT' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + + self.assertEqual(res.status_int, 200) + res_tree = etree.fromstring(res.body) + self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10) + self.assertEqual(res_tree.get('id'), '1') + self.assertEqual(res_tree.get('api_url'), 'http://example.com') + self.assertEqual(res_tree.get('username'), None) + def test_zone_info(self): caps = ['cap1=a;b', 'cap2=c;d'] self.flags(zone_name='darksecret', zone_capabilities=caps) @@ -187,6 +261,28 @@ class ZonesTest(test.TestCase): self.assertEqual(res_dict['zone']['cap1'], 'a;b') self.assertEqual(res_dict['zone']['cap2'], 'c;d') + def test_zone_info_xml(self): + caps = ['cap1=a;b', 'cap2=c;d'] + self.flags(zone_name='darksecret', zone_capabilities=caps) + self.stubs.Set(api, '_call_scheduler', zone_capabilities) + + body = dict(zone=dict(username='zeb', password='sneaky')) + req = webob.Request.blank('/v1.1/fake/zones/info.xml') + + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10) + self.assertEqual(res_tree.get('name'), 'darksecret') + for elem in res_tree: + self.assertEqual(elem.tag in ('{%s}cap1' % xmlutil.XMLNS_V10, + '{%s}cap2' % xmlutil.XMLNS_V10), + True) + if elem.tag == '{%s}cap1' % xmlutil.XMLNS_V10: + self.assertEqual(elem.text, 'a;b') + elif elem.tag == '{%s}cap2' % xmlutil.XMLNS_V10: + self.assertEqual(elem.text, 'c;d') + def test_zone_select(self): key = 'c286696d887c9aa0611bbb3e2025a45a' self.flags(build_plan_encryption_key=key) @@ -220,3 +316,45 @@ class ZonesTest(test.TestCase): self.assertTrue(found) self.assertEqual(len(item), 2) self.assertTrue('weight' in item) + + def test_zone_select_xml(self): + key = 'c286696d887c9aa0611bbb3e2025a45a' + self.flags(build_plan_encryption_key=key) + self.stubs.Set(api, 'select', zone_select) + + req = webob.Request.blank('/v1.1/fake/zones/select.xml') + req.method = 'POST' + req.headers["Content-Type"] = "application/json" + # Select queries end up being JSON encoded twice. + # Once to a string and again as an HTTP POST Body + req.body = json.dumps(json.dumps({})) + + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + self.assertEqual(res.status_int, 200) + + self.assertEqual(res_tree.tag, '{%s}weights' % xmlutil.XMLNS_V10) + + for item in res_tree: + self.assertEqual(item.tag, '{%s}weight' % xmlutil.XMLNS_V10) + blob = None + weight = None + for chld in item: + if chld.tag.endswith('blob'): + blob = chld.text + elif chld.tag.endswith('weight'): + weight = chld.text + + decrypt = crypto.decryptor(FLAGS.build_plan_encryption_key) + secret_item = json.loads(decrypt(blob)) + found = False + for original_item in GLOBAL_BUILD_PLAN: + if original_item['name'] != secret_item['name']: + continue + found = True + for key in ('weight', 'ip', 'zone'): + self.assertEqual(secret_item[key], original_item[key]) + + self.assertTrue(found) + self.assertEqual(len(item), 2) + self.assertTrue(weight) -- cgit