diff options
| author | Jenkins <jenkins@review.openstack.org> | 2011-10-17 22:17:19 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2011-10-17 22:17:19 +0000 |
| commit | 821fae95d6aa86ffd14a4e48254da8ee7392c042 (patch) | |
| tree | 35e173df84e1b0fa296c997ca4401ad03fb84cbc /nova/tests | |
| parent | ed0c5731b70771e08e1ae75db0a0a0cf6e72c9e9 (diff) | |
| parent | 9a15c0d070db086111cbe5eff4f19dcb419b32bc (diff) | |
Merge "Add XML templates."
Diffstat (limited to 'nova/tests')
| -rw-r--r-- | nova/tests/api/openstack/extensions/foxinsocks.py | 12 | ||||
| -rw-r--r-- | nova/tests/api/openstack/fakes.py | 12 | ||||
| -rw-r--r-- | nova/tests/api/openstack/test_accounts.py | 52 | ||||
| -rw-r--r-- | nova/tests/api/openstack/test_consoles.py | 66 | ||||
| -rw-r--r-- | nova/tests/api/openstack/test_extensions.py | 46 | ||||
| -rw-r--r-- | nova/tests/api/openstack/test_limits.py | 4 | ||||
| -rw-r--r-- | nova/tests/api/openstack/test_users.py | 76 | ||||
| -rw-r--r-- | nova/tests/api/openstack/test_wsgi.py | 85 | ||||
| -rw-r--r-- | nova/tests/api/openstack/test_xmlutil.py | 763 | ||||
| -rw-r--r-- | nova/tests/api/openstack/test_zones.py | 140 |
10 files changed, 1210 insertions, 46 deletions
diff --git a/nova/tests/api/openstack/extensions/foxinsocks.py b/nova/tests/api/openstack/extensions/foxinsocks.py index 2d8313cf6..70556a28d 100644 --- a/nova/tests/api/openstack/extensions/foxinsocks.py +++ b/nova/tests/api/openstack/extensions/foxinsocks.py @@ -64,12 +64,10 @@ class Foxinsocks(object): def get_request_extensions(self): request_exts = [] - def _goose_handler(req, res): + def _goose_handler(req, res, body): #NOTE: This only handles JSON responses. # You can use content type header to test for XML. - data = json.loads(res.body) - data['flavor']['googoose'] = req.GET.get('chewing') - res.body = json.dumps(data) + body['flavor']['googoose'] = req.GET.get('chewing') return res req_ext1 = extensions.RequestExtension('GET', @@ -77,12 +75,10 @@ class Foxinsocks(object): _goose_handler) request_exts.append(req_ext1) - def _bands_handler(req, res): + def _bands_handler(req, res, body): #NOTE: This only handles JSON responses. # You can use content type header to test for XML. - data = json.loads(res.body) - data['big_bands'] = 'Pig Bands!' - res.body = json.dumps(data) + body['big_bands'] = 'Pig Bands!' return res req_ext2 = extensions.RequestExtension('GET', diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py index cc0e244e1..e57a60d4b 100644 --- a/nova/tests/api/openstack/fakes.py +++ b/nova/tests/api/openstack/fakes.py @@ -30,9 +30,10 @@ from nova.api import openstack from nova.api import auth as api_auth from nova.api.openstack import auth from nova.api.openstack import extensions -from nova.api.openstack import versions from nova.api.openstack import limits from nova.api.openstack import urlmap +from nova.api.openstack import versions +from nova.api.openstack import wsgi as os_wsgi from nova.auth.manager import User, Project import nova.image.fake from nova.tests.glance import stubs as glance_stubs @@ -66,7 +67,8 @@ def fake_wsgi(self, req): return self.application -def wsgi_app(inner_app11=None, fake_auth=True, fake_auth_context=None): +def wsgi_app(inner_app11=None, fake_auth=True, fake_auth_context=None, + serialization=os_wsgi.LazySerializationMiddleware): if not inner_app11: inner_app11 = openstack.APIRouter() @@ -77,11 +79,13 @@ def wsgi_app(inner_app11=None, fake_auth=True, fake_auth_context=None): ctxt = context.RequestContext('fake', 'fake', auth_token=True) api11 = openstack.FaultWrapper(api_auth.InjectContext(ctxt, limits.RateLimitingMiddleware( - extensions.ExtensionMiddleware(inner_app11)))) + serialization( + extensions.ExtensionMiddleware(inner_app11))))) else: api11 = openstack.FaultWrapper(auth.AuthMiddleware( limits.RateLimitingMiddleware( - extensions.ExtensionMiddleware(inner_app11)))) + serialization( + extensions.ExtensionMiddleware(inner_app11))))) Auth = auth mapper = urlmap.URLMap() mapper['/v1.1'] = api11 diff --git a/nova/tests/api/openstack/test_accounts.py b/nova/tests/api/openstack/test_accounts.py index 125ab8ea0..ea96e1348 100644 --- a/nova/tests/api/openstack/test_accounts.py +++ b/nova/tests/api/openstack/test_accounts.py @@ -16,6 +16,7 @@ import json +from lxml import etree import webob from nova import test @@ -59,10 +60,21 @@ class AccountsTest(test.TestCase): res = req.get_response(fakes.wsgi_app()) res_dict = json.loads(res.body) + self.assertEqual(res.status_int, 200) self.assertEqual(res_dict['account']['id'], 'test1') self.assertEqual(res_dict['account']['name'], 'test1') self.assertEqual(res_dict['account']['manager'], 'id1') + + def test_get_account_xml(self): + req = webob.Request.blank('/v1.1/fake/accounts/test1.xml') + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + self.assertEqual(res.status_int, 200) + self.assertEqual('account', res_tree.tag) + self.assertEqual('test1', res_tree.get('id')) + self.assertEqual('test1', res_tree.get('name')) + self.assertEqual('id1', res_tree.get('manager')) def test_account_delete(self): req = webob.Request.blank('/v1.1/fake/accounts/test1') @@ -91,6 +103,27 @@ class AccountsTest(test.TestCase): fakes.FakeAuthManager.projects) self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 3) + def test_account_create_xml(self): + body = dict(account=dict(description='test account', + manager='id1')) + req = webob.Request.blank('/v1.1/fake/accounts/newacct.xml') + req.headers["Content-Type"] = "application/json" + req.method = 'PUT' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, 'account') + self.assertEqual(res_tree.get('id'), 'newacct') + self.assertEqual(res_tree.get('name'), 'newacct') + self.assertEqual(res_tree.get('description'), 'test account') + self.assertEqual(res_tree.get('manager'), 'id1') + self.assertTrue('newacct' in + fakes.FakeAuthManager.projects) + self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 3) + def test_account_update(self): body = dict(account=dict(description='test account', manager='id2')) @@ -108,3 +141,22 @@ class AccountsTest(test.TestCase): self.assertEqual(res_dict['account']['description'], 'test account') self.assertEqual(res_dict['account']['manager'], 'id2') self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 2) + + def test_account_update_xml(self): + body = dict(account=dict(description='test account', + manager='id2')) + req = webob.Request.blank('/v1.1/fake/accounts/test1.xml') + req.headers["Content-Type"] = "application/json" + req.method = 'PUT' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, 'account') + self.assertEqual(res_tree.get('id'), 'test1') + self.assertEqual(res_tree.get('name'), 'test1') + self.assertEqual(res_tree.get('description'), 'test account') + self.assertEqual(res_tree.get('manager'), 'id2') + self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 2) diff --git a/nova/tests/api/openstack/test_consoles.py b/nova/tests/api/openstack/test_consoles.py index 25414bc7d..75b55942b 100644 --- a/nova/tests/api/openstack/test_consoles.py +++ b/nova/tests/api/openstack/test_consoles.py @@ -18,6 +18,8 @@ import datetime import json + +from lxml import etree import webob from nova.api.openstack import consoles @@ -142,6 +144,30 @@ class ConsolesTest(test.TestCase): res_dict = json.loads(res.body) self.assertDictMatch(res_dict, expected) + def test_show_console_xml(self): + def fake_get_console(cons_self, context, instance_id, console_id): + self.assertEqual(instance_id, 10) + self.assertEqual(console_id, 20) + pool = dict(console_type='fake_type', + public_hostname='fake_hostname') + return dict(id=console_id, password='fake_password', + port='fake_port', pool=pool) + + self.stubs.Set(console.API, 'get_console', fake_get_console) + + req = webob.Request.blank('/v1.1/fake/servers/10/consoles/20.xml') + res = req.get_response(fakes.wsgi_app()) + self.assertEqual(res.status_int, 200) + + res_tree = etree.fromstring(res.body) + self.assertEqual(res_tree.tag, 'console') + self.assertEqual(res_tree.xpath('id')[0].text, '20') + self.assertEqual(res_tree.xpath('port')[0].text, 'fake_port') + self.assertEqual(res_tree.xpath('host')[0].text, 'fake_hostname') + self.assertEqual(res_tree.xpath('password')[0].text, 'fake_password') + self.assertEqual(res_tree.xpath('console_type')[0].text, + 'fake_type') + def test_show_console_unknown_console(self): def fake_get_console(cons_self, context, instance_id, console_id): raise exception.ConsoleNotFound(console_id=console_id) @@ -188,6 +214,46 @@ class ConsolesTest(test.TestCase): res_dict = json.loads(res.body) self.assertDictMatch(res_dict, expected) + def test_list_consoles_xml(self): + def fake_get_consoles(cons_self, context, instance_id): + self.assertEqual(instance_id, 10) + + pool1 = dict(console_type='fake_type', + public_hostname='fake_hostname') + cons1 = dict(id=10, password='fake_password', + port='fake_port', pool=pool1) + pool2 = dict(console_type='fake_type2', + public_hostname='fake_hostname2') + cons2 = dict(id=11, password='fake_password2', + port='fake_port2', pool=pool2) + return [cons1, cons2] + + expected = {'consoles': + [{'console': {'id': 10, 'console_type': 'fake_type'}}, + {'console': {'id': 11, 'console_type': 'fake_type2'}}]} + + self.stubs.Set(console.API, 'get_consoles', fake_get_consoles) + + req = webob.Request.blank('/v1.1/fake/servers/10/consoles.xml') + res = req.get_response(fakes.wsgi_app()) + self.assertEqual(res.status_int, 200) + + res_tree = etree.fromstring(res.body) + self.assertEqual(res_tree.tag, 'consoles') + self.assertEqual(len(res_tree), 2) + self.assertEqual(res_tree[0].tag, 'console') + self.assertEqual(res_tree[1].tag, 'console') + self.assertEqual(len(res_tree[0]), 1) + self.assertEqual(res_tree[0][0].tag, 'console') + self.assertEqual(len(res_tree[1]), 1) + self.assertEqual(res_tree[1][0].tag, 'console') + self.assertEqual(res_tree[0][0].xpath('id')[0].text, '10') + self.assertEqual(res_tree[1][0].xpath('id')[0].text, '11') + self.assertEqual(res_tree[0][0].xpath('console_type')[0].text, + 'fake_type') + self.assertEqual(res_tree[1][0].xpath('console_type')[0].text, + 'fake_type2') + def test_delete_console(self): def fake_get_console(cons_self, context, instance_id, console_id): self.assertEqual(instance_id, 10) diff --git a/nova/tests/api/openstack/test_extensions.py b/nova/tests/api/openstack/test_extensions.py index e3fe0e878..92e74e545 100644 --- a/nova/tests/api/openstack/test_extensions.py +++ b/nova/tests/api/openstack/test_extensions.py @@ -22,6 +22,7 @@ from lxml import etree from nova import context from nova import test +from nova import wsgi as base_wsgi from nova.api import openstack from nova.api.openstack import extensions from nova.api.openstack import flavors @@ -111,8 +112,9 @@ class ExtensionControllerTest(test.TestCase): def test_list_extensions_json(self): app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/123/extensions") - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) # Make sure we have all the extensions. @@ -137,8 +139,9 @@ class ExtensionControllerTest(test.TestCase): def test_get_extension_json(self): app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/123/extensions/FOXNSOX") - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) data = json.loads(response.body) @@ -160,9 +163,10 @@ class ExtensionControllerTest(test.TestCase): def test_list_extensions_xml(self): app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/123/extensions") request.accept = "application/xml" - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) print response.body @@ -187,9 +191,10 @@ class ExtensionControllerTest(test.TestCase): def test_get_extension_xml(self): app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/123/extensions/FOXNSOX") request.accept = "application/xml" - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) xml = response.body print xml @@ -218,8 +223,9 @@ class ResourceExtensionTest(test.TestCase): manager = StubExtensionManager(None) app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app, manager) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/blah") - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(404, response.status_int) def test_get_resources(self): @@ -228,8 +234,9 @@ class ResourceExtensionTest(test.TestCase): manager = StubExtensionManager(res_ext) app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app, manager) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/123/tweedles") - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) self.assertEqual(response_body, response.body) @@ -239,8 +246,9 @@ class ResourceExtensionTest(test.TestCase): manager = StubExtensionManager(res_ext) app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app, manager) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/123/tweedles") - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) self.assertEqual(response_body, response.body) @@ -263,12 +271,15 @@ class ExtensionManagerTest(test.TestCase): def test_get_resources(self): app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/123/foxnsocks") - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) self.assertEqual(response_body, response.body) def test_invalid_extensions(self): + # Don't need the serialization middleware here because we're + # not testing any serialization app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app) ext_mgr = ext_midware.ext_mgr @@ -287,11 +298,12 @@ class ActionExtensionTest(test.TestCase): def _send_server_action_request(self, url, body): app = openstack.APIRouter() ext_midware = extensions.ExtensionMiddleware(app) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank(url) request.method = 'POST' request.content_type = 'application/json' request.body = json.dumps(body) - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) return response def test_extended_action(self): @@ -328,11 +340,9 @@ class RequestExtensionTest(test.TestCase): def test_get_resources_with_stub_mgr(self): - def _req_handler(req, res): + def _req_handler(req, res, body): # only handle JSON responses - data = json.loads(res.body) - data['flavor']['googoose'] = req.GET.get('chewing') - res.body = json.dumps(data) + body['flavor']['googoose'] = req.GET.get('chewing') return res req_ext = extensions.RequestExtension('GET', @@ -340,22 +350,24 @@ class RequestExtensionTest(test.TestCase): _req_handler) manager = StubExtensionManager(None, None, req_ext) - app = fakes.wsgi_app() + app = fakes.wsgi_app(serialization=base_wsgi.Middleware) ext_midware = extensions.ExtensionMiddleware(app, manager) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/v1.1/123/flavors/1?chewing=bluegoo") request.environ['api.version'] = '1.1' - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) response_data = json.loads(response.body) self.assertEqual('bluegoo', response_data['flavor']['googoose']) def test_get_resources_with_mgr(self): - app = fakes.wsgi_app() + app = fakes.wsgi_app(serialization=base_wsgi.Middleware) ext_midware = extensions.ExtensionMiddleware(app) + ser_midware = wsgi.LazySerializationMiddleware(ext_midware) request = webob.Request.blank("/v1.1/123/flavors/1?chewing=newblue") request.environ['api.version'] = '1.1' - response = request.get_response(ext_midware) + response = request.get_response(ser_midware) self.assertEqual(200, response.status_int) response_data = json.loads(response.body) self.assertEqual('newblue', response_data['flavor']['googoose']) diff --git a/nova/tests/api/openstack/test_limits.py b/nova/tests/api/openstack/test_limits.py index 51a97f920..96e30f756 100644 --- a/nova/tests/api/openstack/test_limits.py +++ b/nova/tests/api/openstack/test_limits.py @@ -30,6 +30,7 @@ from xml.dom import minidom import nova.context from nova.api.openstack import limits from nova.api.openstack import views +from nova.api.openstack import wsgi from nova.api.openstack import xmlutil from nova import test @@ -80,7 +81,8 @@ class LimitsControllerTest(BaseLimitTestSuite): def setUp(self): """Run before each test.""" BaseLimitTestSuite.setUp(self) - self.controller = limits.create_resource() + self.controller = wsgi.LazySerializationMiddleware( + limits.create_resource()) self.maxDiff = None def _get_index_request(self, accept_header="application/json"): diff --git a/nova/tests/api/openstack/test_users.py b/nova/tests/api/openstack/test_users.py index c9af530cf..cc77d7d26 100644 --- a/nova/tests/api/openstack/test_users.py +++ b/nova/tests/api/openstack/test_users.py @@ -15,6 +15,7 @@ import json +from lxml import etree import webob from nova import test @@ -63,6 +64,19 @@ class UsersTest(test.TestCase): self.assertEqual(res.status_int, 200) self.assertEqual(len(res_dict['users']), 2) + def test_get_user_list_xml(self): + req = webob.Request.blank('/v1.1/fake/users.xml') + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, 'users') + self.assertEqual(len(res_tree), 2) + self.assertEqual(res_tree[0].tag, 'user') + self.assertEqual(res_tree[0].get('id'), 'id1') + self.assertEqual(res_tree[1].tag, 'user') + self.assertEqual(res_tree[1].get('id'), 'id2') + def test_get_user_by_id(self): req = webob.Request.blank('/v1.1/fake/users/id2') res = req.get_response(fakes.wsgi_app()) @@ -74,6 +88,18 @@ class UsersTest(test.TestCase): self.assertEqual(res_dict['user']['admin'], True) self.assertEqual(res.status_int, 200) + def test_get_user_by_id_xml(self): + req = webob.Request.blank('/v1.1/fake/users/id2.xml') + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, 'user') + self.assertEqual(res_tree.get('id'), 'id2') + self.assertEqual(res_tree.get('name'), 'guy2') + self.assertEqual(res_tree.get('secret'), 'secret2') + self.assertEqual(res_tree.get('admin'), 'True') + def test_user_delete(self): # Check the user exists req = webob.Request.blank('/v1.1/fake/users/id1') @@ -125,6 +151,35 @@ class UsersTest(test.TestCase): fakes.FakeAuthManager.auth_data]) self.assertEqual(len(fakes.FakeAuthManager.auth_data), 3) + def test_user_create_xml(self): + secret = utils.generate_password() + body = dict(user=dict(name='test_guy', + access='acc3', + secret=secret, + admin=True)) + req = webob.Request.blank('/v1.1/fake/users.xml') + req.headers["Content-Type"] = "application/json" + req.method = 'POST' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + + # NOTE(justinsb): This is a questionable assertion in general + # fake sets id=name, but others might not... + self.assertEqual(res_tree.tag, 'user') + self.assertEqual(res_tree.get('id'), 'test_guy') + + self.assertEqual(res_tree.get('name'), 'test_guy') + self.assertEqual(res_tree.get('access'), 'acc3') + self.assertEqual(res_tree.get('secret'), secret) + self.assertEqual(res_tree.get('admin'), 'True') + self.assertTrue('test_guy' in [u.id for u in + fakes.FakeAuthManager.auth_data]) + self.assertEqual(len(fakes.FakeAuthManager.auth_data), 3) + def test_user_update(self): new_secret = utils.generate_password() body = dict(user=dict(name='guy2', @@ -144,3 +199,24 @@ class UsersTest(test.TestCase): self.assertEqual(res_dict['user']['access'], 'acc2') self.assertEqual(res_dict['user']['secret'], new_secret) self.assertEqual(res_dict['user']['admin'], True) + + def test_user_update_xml(self): + new_secret = utils.generate_password() + body = dict(user=dict(name='guy2', + access='acc2', + secret=new_secret)) + req = webob.Request.blank('/v1.1/fake/users/id2.xml') + req.headers["Content-Type"] = "application/json" + req.method = 'PUT' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, 'user') + self.assertEqual(res_tree.get('id'), 'id2') + self.assertEqual(res_tree.get('name'), 'guy2') + self.assertEqual(res_tree.get('access'), 'acc2') + self.assertEqual(res_tree.get('secret'), new_secret) + self.assertEqual(res_tree.get('admin'), 'True') diff --git a/nova/tests/api/openstack/test_wsgi.py b/nova/tests/api/openstack/test_wsgi.py index 74b9ce853..5aea8275d 100644 --- a/nova/tests/api/openstack/test_wsgi.py +++ b/nova/tests/api/openstack/test_wsgi.py @@ -215,20 +215,23 @@ class RequestHeadersDeserializerTest(test.TestCase): self.assertEqual(deserializer.deserialize(req, 'update'), {'a': 'b'}) -class ResponseSerializerTest(test.TestCase): - def setUp(self): - class JSONSerializer(object): - def serialize(self, data, action='default'): - return 'pew_json' +class JSONSerializer(object): + def serialize(self, data, action='default'): + return 'pew_json' - class XMLSerializer(object): - def serialize(self, data, action='default'): - return 'pew_xml' - class HeadersSerializer(object): - def serialize(self, response, data, action): - response.status_int = 404 +class XMLSerializer(object): + def serialize(self, data, action='default'): + return 'pew_xml' + +class HeadersSerializer(object): + def serialize(self, response, data, action): + response.status_int = 404 + + +class ResponseSerializerTest(test.TestCase): + def setUp(self): self.body_serializers = { 'application/json': JSONSerializer(), 'application/xml': XMLSerializer(), @@ -253,7 +256,8 @@ class ResponseSerializerTest(test.TestCase): def test_serialize_response_json(self): for content_type in ('application/json', 'application/vnd.openstack.compute+json'): - response = self.serializer.serialize({}, content_type) + request = wsgi.Request.blank('/') + response = self.serializer.serialize(request, {}, content_type) self.assertEqual(response.headers['Content-Type'], content_type) self.assertEqual(response.body, 'pew_json') self.assertEqual(response.status_int, 404) @@ -261,21 +265,72 @@ class ResponseSerializerTest(test.TestCase): def test_serialize_response_xml(self): for content_type in ('application/xml', 'application/vnd.openstack.compute+xml'): - response = self.serializer.serialize({}, content_type) + request = wsgi.Request.blank('/') + response = self.serializer.serialize(request, {}, content_type) self.assertEqual(response.headers['Content-Type'], content_type) self.assertEqual(response.body, 'pew_xml') self.assertEqual(response.status_int, 404) def test_serialize_response_None(self): - response = self.serializer.serialize(None, 'application/json') + request = wsgi.Request.blank('/') + response = self.serializer.serialize(request, None, 'application/json') self.assertEqual(response.headers['Content-Type'], 'application/json') self.assertEqual(response.body, '') self.assertEqual(response.status_int, 404) def test_serialize_response_dict_to_unknown_content_type(self): + request = wsgi.Request.blank('/') self.assertRaises(exception.InvalidContentType, self.serializer.serialize, - {}, 'application/unknown') + request, {}, 'application/unknown') + + +class LazySerializationTest(test.TestCase): + def setUp(self): + self.body_serializers = { + 'application/json': JSONSerializer(), + 'application/xml': XMLSerializer(), + } + + self.serializer = wsgi.ResponseSerializer(self.body_serializers, + HeadersSerializer()) + + def tearDown(self): + pass + + def test_serialize_response_json(self): + for content_type in ('application/json', + 'application/vnd.openstack.compute+json'): + request = wsgi.Request.blank('/') + request.environ['nova.lazy_serialize'] = True + response = self.serializer.serialize(request, {}, content_type) + self.assertEqual(response.headers['Content-Type'], content_type) + self.assertEqual(response.status_int, 404) + body = json.loads(response.body) + self.assertEqual(body, {}) + serializer = request.environ['nova.serializer'] + self.assertEqual(serializer.serialize(body), 'pew_json') + + def test_serialize_response_xml(self): + for content_type in ('application/xml', + 'application/vnd.openstack.compute+xml'): + request = wsgi.Request.blank('/') + request.environ['nova.lazy_serialize'] = True + response = self.serializer.serialize(request, {}, content_type) + self.assertEqual(response.headers['Content-Type'], content_type) + self.assertEqual(response.status_int, 404) + body = json.loads(response.body) + self.assertEqual(body, {}) + serializer = request.environ['nova.serializer'] + self.assertEqual(serializer.serialize(body), 'pew_xml') + + def test_serialize_response_None(self): + request = wsgi.Request.blank('/') + request.environ['nova.lazy_serialize'] = True + response = self.serializer.serialize(request, None, 'application/json') + self.assertEqual(response.headers['Content-Type'], 'application/json') + self.assertEqual(response.status_int, 404) + self.assertEqual(response.body, '') class RequestDeserializerTest(test.TestCase): diff --git a/nova/tests/api/openstack/test_xmlutil.py b/nova/tests/api/openstack/test_xmlutil.py new file mode 100644 index 000000000..6d224967f --- /dev/null +++ b/nova/tests/api/openstack/test_xmlutil.py @@ -0,0 +1,763 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from lxml import etree + +from nova import test +from nova.api.openstack import xmlutil + + +class SelectorTest(test.TestCase): + obj_for_test = { + 'test': { + 'name': 'test', + 'values': [1, 2, 3], + 'attrs': { + 'foo': 1, + 'bar': 2, + 'baz': 3, + }, + }, + } + + def test_empty_selector(self): + sel = xmlutil.Selector() + self.assertEqual(len(sel.chain), 0) + self.assertEqual(sel(self.obj_for_test), self.obj_for_test) + + def test_dict_selector(self): + sel = xmlutil.Selector('test') + self.assertEqual(len(sel.chain), 1) + self.assertEqual(sel.chain[0], 'test') + self.assertEqual(sel(self.obj_for_test), + self.obj_for_test['test']) + + def test_datum_selector(self): + sel = xmlutil.Selector('test', 'name') + self.assertEqual(len(sel.chain), 2) + self.assertEqual(sel.chain[0], 'test') + self.assertEqual(sel.chain[1], 'name') + self.assertEqual(sel(self.obj_for_test), 'test') + + def test_list_selector(self): + sel = xmlutil.Selector('test', 'values', 0) + self.assertEqual(len(sel.chain), 3) + self.assertEqual(sel.chain[0], 'test') + self.assertEqual(sel.chain[1], 'values') + self.assertEqual(sel.chain[2], 0) + self.assertEqual(sel(self.obj_for_test), 1) + + def test_items_selector(self): + sel = xmlutil.Selector('test', 'attrs', xmlutil.get_items) + self.assertEqual(len(sel.chain), 3) + self.assertEqual(sel.chain[2], xmlutil.get_items) + for key, val in sel(self.obj_for_test): + self.assertEqual(self.obj_for_test['test']['attrs'][key], val) + + def test_missing_key_selector(self): + sel = xmlutil.Selector('test2', 'attrs') + self.assertEqual(sel(self.obj_for_test), None) + self.assertRaises(KeyError, sel, self.obj_for_test, True) + + def test_constant_selector(self): + sel = xmlutil.ConstantSelector('Foobar') + self.assertEqual(sel.value, 'Foobar') + self.assertEqual(sel(self.obj_for_test), 'Foobar') + + +class TemplateElementTest(test.TestCase): + def test_element_initial_attributes(self): + # Create a template element with some attributes + elem = xmlutil.TemplateElement('test', attrib=dict(a=1, b=2, c=3), + c=4, d=5, e=6) + + # Verify all the attributes are as expected + expected = dict(a=1, b=2, c=4, d=5, e=6) + for k, v in expected.items(): + self.assertEqual(elem.attrib[k].chain[0], v) + + def test_element_get_attributes(self): + expected = dict(a=1, b=2, c=3) + + # Create a template element with some attributes + elem = xmlutil.TemplateElement('test', attrib=expected) + + # Verify that get() retrieves the attributes + for k, v in expected.items(): + self.assertEqual(elem.get(k).chain[0], v) + + def test_element_set_attributes(self): + attrs = dict(a=None, b='foo', c=xmlutil.Selector('foo', 'bar')) + + # Create a bare template element with no attributes + elem = xmlutil.TemplateElement('test') + + # Set the attribute values + for k, v in attrs.items(): + elem.set(k, v) + + # Now verify what got set + self.assertEqual(len(elem.attrib['a'].chain), 1) + self.assertEqual(elem.attrib['a'].chain[0], 'a') + self.assertEqual(len(elem.attrib['b'].chain), 1) + self.assertEqual(elem.attrib['b'].chain[0], 'foo') + self.assertEqual(elem.attrib['c'], attrs['c']) + + def test_element_attribute_keys(self): + attrs = dict(a=1, b=2, c=3, d=4) + expected = set(attrs.keys()) + + # Create a template element with some attributes + elem = xmlutil.TemplateElement('test', attrib=attrs) + + # Now verify keys + self.assertEqual(set(elem.keys()), expected) + + def test_element_attribute_items(self): + expected = dict(a=xmlutil.Selector(1), + b=xmlutil.Selector(2), + c=xmlutil.Selector(3)) + keys = set(expected.keys()) + + # Create a template element with some attributes + elem = xmlutil.TemplateElement('test', attrib=expected) + + # Now verify items + for k, v in elem.items(): + self.assertEqual(expected[k], v) + keys.remove(k) + + # Did we visit all keys? + self.assertEqual(len(keys), 0) + + def test_element_selector_none(self): + # Create a template element with no selector + elem = xmlutil.TemplateElement('test') + + self.assertEqual(len(elem.selector.chain), 0) + + def test_element_selector_string(self): + # Create a template element with a string selector + elem = xmlutil.TemplateElement('test', selector='test') + + self.assertEqual(len(elem.selector.chain), 1) + self.assertEqual(elem.selector.chain[0], 'test') + + def test_element_selector(self): + sel = xmlutil.Selector('a', 'b') + + # Create a template element with an explicit selector + elem = xmlutil.TemplateElement('test', selector=sel) + + self.assertEqual(elem.selector, sel) + + def test_element_append_child(self): + # Create an element + elem = xmlutil.TemplateElement('test') + + # Make sure the element starts off empty + self.assertEqual(len(elem), 0) + + # Create a child element + child = xmlutil.TemplateElement('child') + + # Append the child to the parent + elem.append(child) + + # Verify that the child was added + self.assertEqual(len(elem), 1) + self.assertEqual(elem[0], child) + self.assertEqual('child' in elem, True) + self.assertEqual(elem['child'], child) + + # Ensure that multiple children of the same name are rejected + child2 = xmlutil.TemplateElement('child') + self.assertRaises(KeyError, elem.append, child2) + + def test_element_extend_children(self): + # Create an element + elem = xmlutil.TemplateElement('test') + + # Make sure the element starts off empty + self.assertEqual(len(elem), 0) + + # Create a few children + children = [ + xmlutil.TemplateElement('child1'), + xmlutil.TemplateElement('child2'), + xmlutil.TemplateElement('child3'), + ] + + # Extend the parent by those children + elem.extend(children) + + # Verify that the children were added + self.assertEqual(len(elem), 3) + for idx in range(len(elem)): + self.assertEqual(children[idx], elem[idx]) + self.assertEqual(children[idx].tag in elem, True) + self.assertEqual(elem[children[idx].tag], children[idx]) + + # Ensure that multiple children of the same name are rejected + children2 = [ + xmlutil.TemplateElement('child4'), + xmlutil.TemplateElement('child1'), + ] + self.assertRaises(KeyError, elem.extend, children2) + + # Also ensure that child4 was not added + self.assertEqual(len(elem), 3) + self.assertEqual(elem[-1].tag, 'child3') + + def test_element_insert_child(self): + # Create an element + elem = xmlutil.TemplateElement('test') + + # Make sure the element starts off empty + self.assertEqual(len(elem), 0) + + # Create a few children + children = [ + xmlutil.TemplateElement('child1'), + xmlutil.TemplateElement('child2'), + xmlutil.TemplateElement('child3'), + ] + + # Extend the parent by those children + elem.extend(children) + + # Create a child to insert + child = xmlutil.TemplateElement('child4') + + # Insert it + elem.insert(1, child) + + # Ensure the child was inserted in the right place + self.assertEqual(len(elem), 4) + children.insert(1, child) + for idx in range(len(elem)): + self.assertEqual(children[idx], elem[idx]) + self.assertEqual(children[idx].tag in elem, True) + self.assertEqual(elem[children[idx].tag], children[idx]) + + # Ensure that multiple children of the same name are rejected + child2 = xmlutil.TemplateElement('child2') + self.assertRaises(KeyError, elem.insert, 2, child2) + + def test_element_remove_child(self): + # Create an element + elem = xmlutil.TemplateElement('test') + + # Make sure the element starts off empty + self.assertEqual(len(elem), 0) + + # Create a few children + children = [ + xmlutil.TemplateElement('child1'), + xmlutil.TemplateElement('child2'), + xmlutil.TemplateElement('child3'), + ] + + # Extend the parent by those children + elem.extend(children) + + # Create a test child to remove + child = xmlutil.TemplateElement('child2') + + # Try to remove it + self.assertRaises(ValueError, elem.remove, child) + + # Ensure that no child was removed + self.assertEqual(len(elem), 3) + + # Now remove a legitimate child + elem.remove(children[1]) + + # Ensure that the child was removed + self.assertEqual(len(elem), 2) + self.assertEqual(elem[0], children[0]) + self.assertEqual(elem[1], children[2]) + self.assertEqual('child2' in elem, False) + + # Ensure the child cannot be retrieved by name + def get_key(elem, key): + return elem[key] + self.assertRaises(KeyError, get_key, elem, 'child2') + + def test_element_text(self): + # Create an element + elem = xmlutil.TemplateElement('test') + + # Ensure that it has no text + self.assertEqual(elem.text, None) + + # Try setting it to a string and ensure it becomes a selector + elem.text = 'test' + self.assertEqual(hasattr(elem.text, 'chain'), True) + self.assertEqual(len(elem.text.chain), 1) + self.assertEqual(elem.text.chain[0], 'test') + + # Try resetting the text to None + elem.text = None + self.assertEqual(elem.text, None) + + # Now make up a selector and try setting the text to that + sel = xmlutil.Selector() + elem.text = sel + self.assertEqual(elem.text, sel) + + # Finally, try deleting the text and see what happens + del elem.text + self.assertEqual(elem.text, None) + + def test_apply_attrs(self): + # Create a template element + attrs = dict(attr1=xmlutil.ConstantSelector(1), + attr2=xmlutil.ConstantSelector(2)) + tmpl_elem = xmlutil.TemplateElement('test', attrib=attrs) + + # Create an etree element + elem = etree.Element('test') + + # Apply the template to the element + tmpl_elem.apply(elem, None) + + # Now, verify the correct attributes were set + for k, v in elem.items(): + self.assertEqual(str(attrs[k].value), v) + + def test_apply_text(self): + # Create a template element + tmpl_elem = xmlutil.TemplateElement('test') + tmpl_elem.text = xmlutil.ConstantSelector(1) + + # Create an etree element + elem = etree.Element('test') + + # Apply the template to the element + tmpl_elem.apply(elem, None) + + # Now, verify the text was set + self.assertEqual(str(tmpl_elem.text.value), elem.text) + + def test__render(self): + attrs = dict(attr1=xmlutil.ConstantSelector(1), + attr2=xmlutil.ConstantSelector(2), + attr3=xmlutil.ConstantSelector(3)) + + # Create a master template element + master_elem = xmlutil.TemplateElement('test', attr1=attrs['attr1']) + + # Create a couple of slave template element + slave_elems = [ + xmlutil.TemplateElement('test', attr2=attrs['attr2']), + xmlutil.TemplateElement('test', attr3=attrs['attr3']), + ] + + # Try the render + elem = master_elem._render(None, None, slave_elems, None) + + # Verify the particulars of the render + self.assertEqual(elem.tag, 'test') + self.assertEqual(len(elem.nsmap), 0) + for k, v in elem.items(): + self.assertEqual(str(attrs[k].value), v) + + # Create a parent for the element to be rendered + parent = etree.Element('parent') + + # Try the render again... + elem = master_elem._render(parent, None, slave_elems, dict(a='foo')) + + # Verify the particulars of the render + self.assertEqual(len(parent), 1) + self.assertEqual(parent[0], elem) + self.assertEqual(len(elem.nsmap), 1) + self.assertEqual(elem.nsmap['a'], 'foo') + + def test_render(self): + # Create a template element + tmpl_elem = xmlutil.TemplateElement('test') + tmpl_elem.text = xmlutil.Selector() + + # Create the object we're going to render + obj = ['elem1', 'elem2', 'elem3', 'elem4'] + + # Try a render with no object + elems = tmpl_elem.render(None, None) + self.assertEqual(len(elems), 0) + + # Try a render with one object + elems = tmpl_elem.render(None, 'foo') + self.assertEqual(len(elems), 1) + self.assertEqual(elems[0][0].text, 'foo') + self.assertEqual(elems[0][1], 'foo') + + # Now, try rendering an object with multiple entries + parent = etree.Element('parent') + elems = tmpl_elem.render(parent, obj) + self.assertEqual(len(elems), 4) + + # Check the results + for idx in range(len(obj)): + self.assertEqual(elems[idx][0].text, obj[idx]) + self.assertEqual(elems[idx][1], obj[idx]) + + def test_subelement(self): + # Try the SubTemplateElement constructor + parent = xmlutil.SubTemplateElement(None, 'parent') + self.assertEqual(parent.tag, 'parent') + self.assertEqual(len(parent), 0) + + # Now try it with a parent element + child = xmlutil.SubTemplateElement(parent, 'child') + self.assertEqual(child.tag, 'child') + self.assertEqual(len(parent), 1) + self.assertEqual(parent[0], child) + + def test_wrap(self): + # These are strange methods, but they make things easier + elem = xmlutil.TemplateElement('test') + self.assertEqual(elem.unwrap(), elem) + self.assertEqual(elem.wrap().root, elem) + + def test_dyntag(self): + obj = ['a', 'b', 'c'] + + # Create a template element with a dynamic tag + tmpl_elem = xmlutil.TemplateElement(xmlutil.Selector()) + + # Try the render + parent = etree.Element('parent') + elems = tmpl_elem.render(parent, obj) + + # Verify the particulars of the render + self.assertEqual(len(elems), len(obj)) + for idx in range(len(obj)): + self.assertEqual(elems[idx][0].tag, obj[idx]) + + +class TemplateTest(test.TestCase): + def test_wrap(self): + # These are strange methods, but they make things easier + elem = xmlutil.TemplateElement('test') + tmpl = xmlutil.Template(elem) + self.assertEqual(tmpl.unwrap(), elem) + self.assertEqual(tmpl.wrap(), tmpl) + + def test__siblings(self): + # Set up a basic template + elem = xmlutil.TemplateElement('test') + tmpl = xmlutil.Template(elem) + + # Check that we get the right siblings + siblings = tmpl._siblings() + self.assertEqual(len(siblings), 1) + self.assertEqual(siblings[0], elem) + + def test__nsmap(self): + # Set up a basic template + elem = xmlutil.TemplateElement('test') + tmpl = xmlutil.Template(elem, nsmap=dict(a="foo")) + + # Check out that we get the right namespace dictionary + nsmap = tmpl._nsmap() + self.assertNotEqual(id(nsmap), id(tmpl.nsmap)) + self.assertEqual(len(nsmap), 1) + self.assertEqual(nsmap['a'], 'foo') + + def test_master_attach(self): + # Set up a master template + elem = xmlutil.TemplateElement('test') + tmpl = xmlutil.MasterTemplate(elem, 1) + + # Make sure it has a root but no slaves + self.assertEqual(tmpl.root, elem) + self.assertEqual(len(tmpl.slaves), 0) + + # Try to attach an invalid slave + bad_elem = xmlutil.TemplateElement('test2') + self.assertRaises(ValueError, tmpl.attach, bad_elem) + self.assertEqual(len(tmpl.slaves), 0) + + # Try to attach an invalid and a valid slave + good_elem = xmlutil.TemplateElement('test') + self.assertRaises(ValueError, tmpl.attach, good_elem, bad_elem) + self.assertEqual(len(tmpl.slaves), 0) + + # Try to attach an inapplicable template + class InapplicableTemplate(xmlutil.Template): + def apply(self, master): + return False + inapp_tmpl = InapplicableTemplate(good_elem) + tmpl.attach(inapp_tmpl) + self.assertEqual(len(tmpl.slaves), 0) + + # Now try attaching an applicable template + tmpl.attach(good_elem) + self.assertEqual(len(tmpl.slaves), 1) + self.assertEqual(tmpl.slaves[0].root, good_elem) + + def test_master_copy(self): + # Construct a master template + elem = xmlutil.TemplateElement('test') + tmpl = xmlutil.MasterTemplate(elem, 1, nsmap=dict(a='foo')) + + # Give it a slave + slave = xmlutil.TemplateElement('test') + tmpl.attach(slave) + + # Construct a copy + copy = tmpl.copy() + + # Check to see if we actually managed a copy + self.assertNotEqual(tmpl, copy) + self.assertEqual(tmpl.root, copy.root) + self.assertEqual(tmpl.version, copy.version) + self.assertEqual(id(tmpl.nsmap), id(copy.nsmap)) + self.assertNotEqual(id(tmpl.slaves), id(copy.slaves)) + self.assertEqual(len(tmpl.slaves), len(copy.slaves)) + self.assertEqual(tmpl.slaves[0], copy.slaves[0]) + + def test_slave_apply(self): + # Construct a master template + elem = xmlutil.TemplateElement('test') + master = xmlutil.MasterTemplate(elem, 3) + + # Construct a slave template with applicable minimum version + slave = xmlutil.SlaveTemplate(elem, 2) + self.assertEqual(slave.apply(master), True) + + # Construct a slave template with equal minimum version + slave = xmlutil.SlaveTemplate(elem, 3) + self.assertEqual(slave.apply(master), True) + + # Construct a slave template with inapplicable minimum version + slave = xmlutil.SlaveTemplate(elem, 4) + self.assertEqual(slave.apply(master), False) + + # Construct a slave template with applicable version range + slave = xmlutil.SlaveTemplate(elem, 2, 4) + self.assertEqual(slave.apply(master), True) + + # Construct a slave template with low version range + slave = xmlutil.SlaveTemplate(elem, 1, 2) + self.assertEqual(slave.apply(master), False) + + # Construct a slave template with high version range + slave = xmlutil.SlaveTemplate(elem, 4, 5) + self.assertEqual(slave.apply(master), False) + + # Construct a slave template with matching version range + slave = xmlutil.SlaveTemplate(elem, 3, 3) + self.assertEqual(slave.apply(master), True) + + def test__serialize(self): + # Our test object to serialize + obj = { + 'test': { + 'name': 'foobar', + 'values': [1, 2, 3, 4], + 'attrs': { + 'a': 1, + 'b': 2, + 'c': 3, + 'd': 4, + }, + 'image': { + 'name': 'image_foobar', + 'id': 42, + }, + }, + } + + # Set up our master template + root = xmlutil.TemplateElement('test', selector='test', + name='name') + value = xmlutil.SubTemplateElement(root, 'value', selector='values') + value.text = xmlutil.Selector() + attrs = xmlutil.SubTemplateElement(root, 'attrs', selector='attrs') + xmlutil.SubTemplateElement(attrs, 'attr', selector=xmlutil.get_items, + key=0, value=1) + master = xmlutil.MasterTemplate(root, 1, nsmap=dict(f='foo')) + + # Set up our slave template + root_slave = xmlutil.TemplateElement('test', selector='test') + image = xmlutil.SubTemplateElement(root_slave, 'image', + selector='image', id='id') + image.text = xmlutil.Selector('name') + slave = xmlutil.SlaveTemplate(root_slave, 1, nsmap=dict(b='bar')) + + # Attach the slave to the master... + master.attach(slave) + + # Try serializing our object + siblings = master._siblings() + nsmap = master._nsmap() + result = master._serialize(None, obj, siblings, nsmap) + + # Now we get to manually walk the element tree... + self.assertEqual(result.tag, 'test') + self.assertEqual(len(result.nsmap), 2) + self.assertEqual(result.nsmap['f'], 'foo') + self.assertEqual(result.nsmap['b'], 'bar') + self.assertEqual(result.get('name'), obj['test']['name']) + for idx, val in enumerate(obj['test']['values']): + self.assertEqual(result[idx].tag, 'value') + self.assertEqual(result[idx].text, str(val)) + idx += 1 + self.assertEqual(result[idx].tag, 'attrs') + for attr in result[idx]: + self.assertEqual(attr.tag, 'attr') + self.assertEqual(attr.get('value'), + str(obj['test']['attrs'][attr.get('key')])) + idx += 1 + self.assertEqual(result[idx].tag, 'image') + self.assertEqual(result[idx].get('id'), + str(obj['test']['image']['id'])) + self.assertEqual(result[idx].text, obj['test']['image']['name']) + + +class MasterTemplateBuilder(xmlutil.TemplateBuilder): + def construct(self): + elem = xmlutil.TemplateElement('test') + return xmlutil.MasterTemplate(elem, 1) + + +class SlaveTemplateBuilder(xmlutil.TemplateBuilder): + def construct(self): + elem = xmlutil.TemplateElement('test') + return xmlutil.SlaveTemplate(elem, 1) + + +class TemplateBuilderTest(test.TestCase): + def test_master_template_builder(self): + # Make sure the template hasn't been built yet + self.assertEqual(MasterTemplateBuilder._tmpl, None) + + # Now, construct the template + tmpl1 = MasterTemplateBuilder() + + # Make sure that there is a template cached... + self.assertNotEqual(MasterTemplateBuilder._tmpl, None) + + # Make sure it wasn't what was returned... + self.assertNotEqual(MasterTemplateBuilder._tmpl, tmpl1) + + # Make sure it doesn't get rebuilt + cached = MasterTemplateBuilder._tmpl + tmpl2 = MasterTemplateBuilder() + self.assertEqual(MasterTemplateBuilder._tmpl, cached) + + # Make sure we're always getting fresh copies + self.assertNotEqual(tmpl1, tmpl2) + + # Make sure we can override the copying behavior + tmpl3 = MasterTemplateBuilder(False) + self.assertEqual(MasterTemplateBuilder._tmpl, tmpl3) + + def test_slave_template_builder(self): + # Make sure the template hasn't been built yet + self.assertEqual(SlaveTemplateBuilder._tmpl, None) + + # Now, construct the template + tmpl1 = SlaveTemplateBuilder() + + # Make sure there is a template cached... + self.assertNotEqual(SlaveTemplateBuilder._tmpl, None) + + # Make sure it was what was returned... + self.assertEqual(SlaveTemplateBuilder._tmpl, tmpl1) + + # Make sure it doesn't get rebuilt + tmpl2 = SlaveTemplateBuilder() + self.assertEqual(SlaveTemplateBuilder._tmpl, tmpl1) + + # Make sure we're always getting the cached copy + self.assertEqual(tmpl1, tmpl2) + + +class SerializerTest(xmlutil.XMLTemplateSerializer): + def test(self): + root = xmlutil.TemplateElement('servers') + a = xmlutil.SubTemplateElement(root, 'a', selector='servers') + a.text = xmlutil.Selector('a') + return xmlutil.MasterTemplate(root, 1, nsmap={None: "asdf"}) + + +class XMLTemplateSerializerTest(test.TestCase): + def setUp(self): + self.tmpl_serializer = SerializerTest() + self.data = dict(servers=dict(a=(2, 3))) + self.data_multi = dict(servers=[dict(a=(2, 3)), dict(a=(3, 4))]) + super(XMLTemplateSerializerTest, self).setUp() + + def test_get_template(self): + # First, check what happens when we fall back on the default + # option + self.assertEqual(self.tmpl_serializer.get_template(), None) + self.assertEqual(self.tmpl_serializer.get_template('nosuch'), None) + + # Now, check that we get back a template + tmpl = self.tmpl_serializer.get_template('test') + self.assertNotEqual(tmpl, None) + self.assertEqual(tmpl.root.tag, 'servers') + + def test_serialize_default(self): + expected_xml = '<servers><a>(2,3)</a></servers>' + result = self.tmpl_serializer.serialize(self.data) + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_xml) + + def test_serialize_multi_default(self): + expected_xml = ('<servers><server><a>(2,3)</a></server>' + '<server><a>(3,4)</a></server></servers>') + result = self.tmpl_serializer.serialize(self.data_multi) + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_xml) + + def test_serialize_explicit(self): + expected_xml = ("<?xmlversion='1.0'encoding='UTF-8'?>" + '<serversxmlns="asdf"><a>(2,3)</a></servers>') + tmpl = self.tmpl_serializer.get_template('test') + result = self.tmpl_serializer.serialize(self.data, template=tmpl) + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_xml) + + def test_serialize_multi_explicit(self): + expected_xml = ("<?xmlversion='1.0'encoding='UTF-8'?>" + '<serversxmlns="asdf"><a>(2,3)</a>' + '<a>(3,4)</a></servers>') + tmpl = self.tmpl_serializer.get_template('test') + result = self.tmpl_serializer.serialize(self.data_multi, template=tmpl) + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_xml) + + def test_serialize(self): + expected_xml = ("<?xmlversion='1.0'encoding='UTF-8'?>" + '<serversxmlns="asdf"><a>(2,3)</a></servers>') + result = self.tmpl_serializer.serialize(self.data, 'test') + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_xml) + + def test_serialize_multi(self): + expected_xml = ("<?xmlversion='1.0'encoding='UTF-8'?>" + '<serversxmlns="asdf"><a>(2,3)</a>' + '<a>(3,4)</a></servers>') + result = self.tmpl_serializer.serialize(self.data_multi, 'test') + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(result, expected_xml) diff --git a/nova/tests/api/openstack/test_zones.py b/nova/tests/api/openstack/test_zones.py index 869f13183..af762d3d6 100644 --- a/nova/tests/api/openstack/test_zones.py +++ b/nova/tests/api/openstack/test_zones.py @@ -14,15 +14,18 @@ # under the License. +import json + +from lxml import etree import stubout import webob -import json import nova.db from nova import context from nova import crypto from nova import flags from nova import test +from nova.api.openstack import xmlutil from nova.api.openstack import zones from nova.tests.api.openstack import fakes from nova.scheduler import api @@ -112,6 +115,18 @@ class ZonesTest(test.TestCase): self.assertEqual(res.status_int, 200) self.assertEqual(len(res_dict['zones']), 2) + def test_get_zone_list_scheduler_xml(self): + self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler) + req = webob.Request.blank('/v1.1/fake/zones.xml') + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, '{%s}zones' % xmlutil.XMLNS_V10) + self.assertEqual(len(res_tree), 2) + self.assertEqual(res_tree[0].tag, '{%s}zone' % xmlutil.XMLNS_V10) + self.assertEqual(res_tree[1].tag, '{%s}zone' % xmlutil.XMLNS_V10) + def test_get_zone_list_db(self): self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler_empty) self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db) @@ -123,6 +138,20 @@ class ZonesTest(test.TestCase): res_dict = json.loads(res.body) self.assertEqual(len(res_dict['zones']), 2) + def test_get_zone_list_db_xml(self): + self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler_empty) + self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db) + req = webob.Request.blank('/v1.1/fake/zones.xml') + req.headers["Content-Type"] = "application/json" + res = req.get_response(fakes.wsgi_app()) + + self.assertEqual(res.status_int, 200) + res_tree = etree.fromstring(res.body) + self.assertEqual(res_tree.tag, '{%s}zones' % xmlutil.XMLNS_V10) + self.assertEqual(len(res_tree), 2) + self.assertEqual(res_tree[0].tag, '{%s}zone' % xmlutil.XMLNS_V10) + self.assertEqual(res_tree[1].tag, '{%s}zone' % xmlutil.XMLNS_V10) + def test_get_zone_by_id(self): req = webob.Request.blank('/v1.1/fake/zones/1') req.headers["Content-Type"] = "application/json" @@ -134,6 +163,18 @@ class ZonesTest(test.TestCase): self.assertEqual(res_dict['zone']['api_url'], 'http://example.com') self.assertFalse('password' in res_dict['zone']) + def test_get_zone_by_id_xml(self): + req = webob.Request.blank('/v1.1/fake/zones/1.xml') + req.headers["Content-Type"] = "application/json" + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10) + self.assertEqual(res_tree.get('id'), '1') + self.assertEqual(res_tree.get('api_url'), 'http://example.com') + self.assertEqual(res_tree.get('password'), None) + def test_zone_delete(self): req = webob.Request.blank('/v1.1/fake/zones/1') req.headers["Content-Type"] = "application/json" @@ -157,6 +198,23 @@ class ZonesTest(test.TestCase): self.assertEqual(res_dict['zone']['api_url'], 'http://example.com') self.assertFalse('username' in res_dict['zone']) + def test_zone_create_xml(self): + body = dict(zone=dict(api_url='http://example.com', username='fred', + password='fubar')) + req = webob.Request.blank('/v1.1/fake/zones.xml') + req.headers["Content-Type"] = "application/json" + req.method = 'POST' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + + self.assertEqual(res.status_int, 200) + res_tree = etree.fromstring(res.body) + self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10) + self.assertEqual(res_tree.get('id'), '1') + self.assertEqual(res_tree.get('api_url'), 'http://example.com') + self.assertEqual(res_tree.get('username'), None) + def test_zone_update(self): body = dict(zone=dict(username='zeb', password='sneaky')) req = webob.Request.blank('/v1.1/fake/zones/1') @@ -172,6 +230,22 @@ class ZonesTest(test.TestCase): self.assertEqual(res_dict['zone']['api_url'], 'http://example.com') self.assertFalse('username' in res_dict['zone']) + def test_zone_update_xml(self): + body = dict(zone=dict(username='zeb', password='sneaky')) + req = webob.Request.blank('/v1.1/fake/zones/1.xml') + req.headers["Content-Type"] = "application/json" + req.method = 'PUT' + req.body = json.dumps(body) + + res = req.get_response(fakes.wsgi_app()) + + self.assertEqual(res.status_int, 200) + res_tree = etree.fromstring(res.body) + self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10) + self.assertEqual(res_tree.get('id'), '1') + self.assertEqual(res_tree.get('api_url'), 'http://example.com') + self.assertEqual(res_tree.get('username'), None) + def test_zone_info(self): caps = ['cap1=a;b', 'cap2=c;d'] self.flags(zone_name='darksecret', zone_capabilities=caps) @@ -187,6 +261,28 @@ class ZonesTest(test.TestCase): self.assertEqual(res_dict['zone']['cap1'], 'a;b') self.assertEqual(res_dict['zone']['cap2'], 'c;d') + def test_zone_info_xml(self): + caps = ['cap1=a;b', 'cap2=c;d'] + self.flags(zone_name='darksecret', zone_capabilities=caps) + self.stubs.Set(api, '_call_scheduler', zone_capabilities) + + body = dict(zone=dict(username='zeb', password='sneaky')) + req = webob.Request.blank('/v1.1/fake/zones/info.xml') + + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + self.assertEqual(res.status_int, 200) + self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10) + self.assertEqual(res_tree.get('name'), 'darksecret') + for elem in res_tree: + self.assertEqual(elem.tag in ('{%s}cap1' % xmlutil.XMLNS_V10, + '{%s}cap2' % xmlutil.XMLNS_V10), + True) + if elem.tag == '{%s}cap1' % xmlutil.XMLNS_V10: + self.assertEqual(elem.text, 'a;b') + elif elem.tag == '{%s}cap2' % xmlutil.XMLNS_V10: + self.assertEqual(elem.text, 'c;d') + def test_zone_select(self): key = 'c286696d887c9aa0611bbb3e2025a45a' self.flags(build_plan_encryption_key=key) @@ -220,3 +316,45 @@ class ZonesTest(test.TestCase): self.assertTrue(found) self.assertEqual(len(item), 2) self.assertTrue('weight' in item) + + def test_zone_select_xml(self): + key = 'c286696d887c9aa0611bbb3e2025a45a' + self.flags(build_plan_encryption_key=key) + self.stubs.Set(api, 'select', zone_select) + + req = webob.Request.blank('/v1.1/fake/zones/select.xml') + req.method = 'POST' + req.headers["Content-Type"] = "application/json" + # Select queries end up being JSON encoded twice. + # Once to a string and again as an HTTP POST Body + req.body = json.dumps(json.dumps({})) + + res = req.get_response(fakes.wsgi_app()) + res_tree = etree.fromstring(res.body) + self.assertEqual(res.status_int, 200) + + self.assertEqual(res_tree.tag, '{%s}weights' % xmlutil.XMLNS_V10) + + for item in res_tree: + self.assertEqual(item.tag, '{%s}weight' % xmlutil.XMLNS_V10) + blob = None + weight = None + for chld in item: + if chld.tag.endswith('blob'): + blob = chld.text + elif chld.tag.endswith('weight'): + weight = chld.text + + decrypt = crypto.decryptor(FLAGS.build_plan_encryption_key) + secret_item = json.loads(decrypt(blob)) + found = False + for original_item in GLOBAL_BUILD_PLAN: + if original_item['name'] != secret_item['name']: + continue + found = True + for key in ('weight', 'ip', 'zone'): + self.assertEqual(secret_item[key], original_item[key]) + + self.assertTrue(found) + self.assertEqual(len(item), 2) + self.assertTrue(weight) |
