summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2011-10-17 22:17:19 +0000
committerGerrit Code Review <review@openstack.org>2011-10-17 22:17:19 +0000
commit821fae95d6aa86ffd14a4e48254da8ee7392c042 (patch)
tree35e173df84e1b0fa296c997ca4401ad03fb84cbc /nova/tests
parented0c5731b70771e08e1ae75db0a0a0cf6e72c9e9 (diff)
parent9a15c0d070db086111cbe5eff4f19dcb419b32bc (diff)
Merge "Add XML templates."
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/api/openstack/extensions/foxinsocks.py12
-rw-r--r--nova/tests/api/openstack/fakes.py12
-rw-r--r--nova/tests/api/openstack/test_accounts.py52
-rw-r--r--nova/tests/api/openstack/test_consoles.py66
-rw-r--r--nova/tests/api/openstack/test_extensions.py46
-rw-r--r--nova/tests/api/openstack/test_limits.py4
-rw-r--r--nova/tests/api/openstack/test_users.py76
-rw-r--r--nova/tests/api/openstack/test_wsgi.py85
-rw-r--r--nova/tests/api/openstack/test_xmlutil.py763
-rw-r--r--nova/tests/api/openstack/test_zones.py140
10 files changed, 1210 insertions, 46 deletions
diff --git a/nova/tests/api/openstack/extensions/foxinsocks.py b/nova/tests/api/openstack/extensions/foxinsocks.py
index 2d8313cf6..70556a28d 100644
--- a/nova/tests/api/openstack/extensions/foxinsocks.py
+++ b/nova/tests/api/openstack/extensions/foxinsocks.py
@@ -64,12 +64,10 @@ class Foxinsocks(object):
def get_request_extensions(self):
request_exts = []
- def _goose_handler(req, res):
+ def _goose_handler(req, res, body):
#NOTE: This only handles JSON responses.
# You can use content type header to test for XML.
- data = json.loads(res.body)
- data['flavor']['googoose'] = req.GET.get('chewing')
- res.body = json.dumps(data)
+ body['flavor']['googoose'] = req.GET.get('chewing')
return res
req_ext1 = extensions.RequestExtension('GET',
@@ -77,12 +75,10 @@ class Foxinsocks(object):
_goose_handler)
request_exts.append(req_ext1)
- def _bands_handler(req, res):
+ def _bands_handler(req, res, body):
#NOTE: This only handles JSON responses.
# You can use content type header to test for XML.
- data = json.loads(res.body)
- data['big_bands'] = 'Pig Bands!'
- res.body = json.dumps(data)
+ body['big_bands'] = 'Pig Bands!'
return res
req_ext2 = extensions.RequestExtension('GET',
diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py
index cc0e244e1..e57a60d4b 100644
--- a/nova/tests/api/openstack/fakes.py
+++ b/nova/tests/api/openstack/fakes.py
@@ -30,9 +30,10 @@ from nova.api import openstack
from nova.api import auth as api_auth
from nova.api.openstack import auth
from nova.api.openstack import extensions
-from nova.api.openstack import versions
from nova.api.openstack import limits
from nova.api.openstack import urlmap
+from nova.api.openstack import versions
+from nova.api.openstack import wsgi as os_wsgi
from nova.auth.manager import User, Project
import nova.image.fake
from nova.tests.glance import stubs as glance_stubs
@@ -66,7 +67,8 @@ def fake_wsgi(self, req):
return self.application
-def wsgi_app(inner_app11=None, fake_auth=True, fake_auth_context=None):
+def wsgi_app(inner_app11=None, fake_auth=True, fake_auth_context=None,
+ serialization=os_wsgi.LazySerializationMiddleware):
if not inner_app11:
inner_app11 = openstack.APIRouter()
@@ -77,11 +79,13 @@ def wsgi_app(inner_app11=None, fake_auth=True, fake_auth_context=None):
ctxt = context.RequestContext('fake', 'fake', auth_token=True)
api11 = openstack.FaultWrapper(api_auth.InjectContext(ctxt,
limits.RateLimitingMiddleware(
- extensions.ExtensionMiddleware(inner_app11))))
+ serialization(
+ extensions.ExtensionMiddleware(inner_app11)))))
else:
api11 = openstack.FaultWrapper(auth.AuthMiddleware(
limits.RateLimitingMiddleware(
- extensions.ExtensionMiddleware(inner_app11))))
+ serialization(
+ extensions.ExtensionMiddleware(inner_app11)))))
Auth = auth
mapper = urlmap.URLMap()
mapper['/v1.1'] = api11
diff --git a/nova/tests/api/openstack/test_accounts.py b/nova/tests/api/openstack/test_accounts.py
index 125ab8ea0..ea96e1348 100644
--- a/nova/tests/api/openstack/test_accounts.py
+++ b/nova/tests/api/openstack/test_accounts.py
@@ -16,6 +16,7 @@
import json
+from lxml import etree
import webob
from nova import test
@@ -59,10 +60,21 @@ class AccountsTest(test.TestCase):
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
+ self.assertEqual(res.status_int, 200)
self.assertEqual(res_dict['account']['id'], 'test1')
self.assertEqual(res_dict['account']['name'], 'test1')
self.assertEqual(res_dict['account']['manager'], 'id1')
+
+ def test_get_account_xml(self):
+ req = webob.Request.blank('/v1.1/fake/accounts/test1.xml')
+ res = req.get_response(fakes.wsgi_app())
+ res_tree = etree.fromstring(res.body)
+
self.assertEqual(res.status_int, 200)
+ self.assertEqual('account', res_tree.tag)
+ self.assertEqual('test1', res_tree.get('id'))
+ self.assertEqual('test1', res_tree.get('name'))
+ self.assertEqual('id1', res_tree.get('manager'))
def test_account_delete(self):
req = webob.Request.blank('/v1.1/fake/accounts/test1')
@@ -91,6 +103,27 @@ class AccountsTest(test.TestCase):
fakes.FakeAuthManager.projects)
self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 3)
+ def test_account_create_xml(self):
+ body = dict(account=dict(description='test account',
+ manager='id1'))
+ req = webob.Request.blank('/v1.1/fake/accounts/newacct.xml')
+ req.headers["Content-Type"] = "application/json"
+ req.method = 'PUT'
+ req.body = json.dumps(body)
+
+ res = req.get_response(fakes.wsgi_app())
+ res_tree = etree.fromstring(res.body)
+
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res_tree.tag, 'account')
+ self.assertEqual(res_tree.get('id'), 'newacct')
+ self.assertEqual(res_tree.get('name'), 'newacct')
+ self.assertEqual(res_tree.get('description'), 'test account')
+ self.assertEqual(res_tree.get('manager'), 'id1')
+ self.assertTrue('newacct' in
+ fakes.FakeAuthManager.projects)
+ self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 3)
+
def test_account_update(self):
body = dict(account=dict(description='test account',
manager='id2'))
@@ -108,3 +141,22 @@ class AccountsTest(test.TestCase):
self.assertEqual(res_dict['account']['description'], 'test account')
self.assertEqual(res_dict['account']['manager'], 'id2')
self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 2)
+
+ def test_account_update_xml(self):
+ body = dict(account=dict(description='test account',
+ manager='id2'))
+ req = webob.Request.blank('/v1.1/fake/accounts/test1.xml')
+ req.headers["Content-Type"] = "application/json"
+ req.method = 'PUT'
+ req.body = json.dumps(body)
+
+ res = req.get_response(fakes.wsgi_app())
+ res_tree = etree.fromstring(res.body)
+
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res_tree.tag, 'account')
+ self.assertEqual(res_tree.get('id'), 'test1')
+ self.assertEqual(res_tree.get('name'), 'test1')
+ self.assertEqual(res_tree.get('description'), 'test account')
+ self.assertEqual(res_tree.get('manager'), 'id2')
+ self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 2)
diff --git a/nova/tests/api/openstack/test_consoles.py b/nova/tests/api/openstack/test_consoles.py
index 25414bc7d..75b55942b 100644
--- a/nova/tests/api/openstack/test_consoles.py
+++ b/nova/tests/api/openstack/test_consoles.py
@@ -18,6 +18,8 @@
import datetime
import json
+
+from lxml import etree
import webob
from nova.api.openstack import consoles
@@ -142,6 +144,30 @@ class ConsolesTest(test.TestCase):
res_dict = json.loads(res.body)
self.assertDictMatch(res_dict, expected)
+ def test_show_console_xml(self):
+ def fake_get_console(cons_self, context, instance_id, console_id):
+ self.assertEqual(instance_id, 10)
+ self.assertEqual(console_id, 20)
+ pool = dict(console_type='fake_type',
+ public_hostname='fake_hostname')
+ return dict(id=console_id, password='fake_password',
+ port='fake_port', pool=pool)
+
+ self.stubs.Set(console.API, 'get_console', fake_get_console)
+
+ req = webob.Request.blank('/v1.1/fake/servers/10/consoles/20.xml')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
+
+ res_tree = etree.fromstring(res.body)
+ self.assertEqual(res_tree.tag, 'console')
+ self.assertEqual(res_tree.xpath('id')[0].text, '20')
+ self.assertEqual(res_tree.xpath('port')[0].text, 'fake_port')
+ self.assertEqual(res_tree.xpath('host')[0].text, 'fake_hostname')
+ self.assertEqual(res_tree.xpath('password')[0].text, 'fake_password')
+ self.assertEqual(res_tree.xpath('console_type')[0].text,
+ 'fake_type')
+
def test_show_console_unknown_console(self):
def fake_get_console(cons_self, context, instance_id, console_id):
raise exception.ConsoleNotFound(console_id=console_id)
@@ -188,6 +214,46 @@ class ConsolesTest(test.TestCase):
res_dict = json.loads(res.body)
self.assertDictMatch(res_dict, expected)
+ def test_list_consoles_xml(self):
+ def fake_get_consoles(cons_self, context, instance_id):
+ self.assertEqual(instance_id, 10)
+
+ pool1 = dict(console_type='fake_type',
+ public_hostname='fake_hostname')
+ cons1 = dict(id=10, password='fake_password',
+ port='fake_port', pool=pool1)
+ pool2 = dict(console_type='fake_type2',
+ public_hostname='fake_hostname2')
+ cons2 = dict(id=11, password='fake_password2',
+ port='fake_port2', pool=pool2)
+ return [cons1, cons2]
+
+ expected = {'consoles':
+ [{'console': {'id': 10, 'console_type': 'fake_type'}},
+ {'console': {'id': 11, 'console_type': 'fake_type2'}}]}
+
+ self.stubs.Set(console.API, 'get_consoles', fake_get_consoles)
+
+ req = webob.Request.blank('/v1.1/fake/servers/10/consoles.xml')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
+
+ res_tree = etree.fromstring(res.body)
+ self.assertEqual(res_tree.tag, 'consoles')
+ self.assertEqual(len(res_tree), 2)
+ self.assertEqual(res_tree[0].tag, 'console')
+ self.assertEqual(res_tree[1].tag, 'console')
+ self.assertEqual(len(res_tree[0]), 1)
+ self.assertEqual(res_tree[0][0].tag, 'console')
+ self.assertEqual(len(res_tree[1]), 1)
+ self.assertEqual(res_tree[1][0].tag, 'console')
+ self.assertEqual(res_tree[0][0].xpath('id')[0].text, '10')
+ self.assertEqual(res_tree[1][0].xpath('id')[0].text, '11')
+ self.assertEqual(res_tree[0][0].xpath('console_type')[0].text,
+ 'fake_type')
+ self.assertEqual(res_tree[1][0].xpath('console_type')[0].text,
+ 'fake_type2')
+
def test_delete_console(self):
def fake_get_console(cons_self, context, instance_id, console_id):
self.assertEqual(instance_id, 10)
diff --git a/nova/tests/api/openstack/test_extensions.py b/nova/tests/api/openstack/test_extensions.py
index e3fe0e878..92e74e545 100644
--- a/nova/tests/api/openstack/test_extensions.py
+++ b/nova/tests/api/openstack/test_extensions.py
@@ -22,6 +22,7 @@ from lxml import etree
from nova import context
from nova import test
+from nova import wsgi as base_wsgi
from nova.api import openstack
from nova.api.openstack import extensions
from nova.api.openstack import flavors
@@ -111,8 +112,9 @@ class ExtensionControllerTest(test.TestCase):
def test_list_extensions_json(self):
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app)
+ ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/123/extensions")
- response = request.get_response(ext_midware)
+ response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
# Make sure we have all the extensions.
@@ -137,8 +139,9 @@ class ExtensionControllerTest(test.TestCase):
def test_get_extension_json(self):
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app)
+ ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/123/extensions/FOXNSOX")
- response = request.get_response(ext_midware)
+ response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
data = json.loads(response.body)
@@ -160,9 +163,10 @@ class ExtensionControllerTest(test.TestCase):
def test_list_extensions_xml(self):
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app)
+ ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/123/extensions")
request.accept = "application/xml"
- response = request.get_response(ext_midware)
+ response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
print response.body
@@ -187,9 +191,10 @@ class ExtensionControllerTest(test.TestCase):
def test_get_extension_xml(self):
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app)
+ ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/123/extensions/FOXNSOX")
request.accept = "application/xml"
- response = request.get_response(ext_midware)
+ response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
xml = response.body
print xml
@@ -218,8 +223,9 @@ class ResourceExtensionTest(test.TestCase):
manager = StubExtensionManager(None)
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app, manager)
+ ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/blah")
- response = request.get_response(ext_midware)
+ response = request.get_response(ser_midware)
self.assertEqual(404, response.status_int)
def test_get_resources(self):
@@ -228,8 +234,9 @@ class ResourceExtensionTest(test.TestCase):
manager = StubExtensionManager(res_ext)
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app, manager)
+ ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/123/tweedles")
- response = request.get_response(ext_midware)
+ response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
self.assertEqual(response_body, response.body)
@@ -239,8 +246,9 @@ class ResourceExtensionTest(test.TestCase):
manager = StubExtensionManager(res_ext)
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app, manager)
+ ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/123/tweedles")
- response = request.get_response(ext_midware)
+ response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
self.assertEqual(response_body, response.body)
@@ -263,12 +271,15 @@ class ExtensionManagerTest(test.TestCase):
def test_get_resources(self):
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app)
+ ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/123/foxnsocks")
- response = request.get_response(ext_midware)
+ response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
self.assertEqual(response_body, response.body)
def test_invalid_extensions(self):
+ # Don't need the serialization middleware here because we're
+ # not testing any serialization
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app)
ext_mgr = ext_midware.ext_mgr
@@ -287,11 +298,12 @@ class ActionExtensionTest(test.TestCase):
def _send_server_action_request(self, url, body):
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app)
+ ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank(url)
request.method = 'POST'
request.content_type = 'application/json'
request.body = json.dumps(body)
- response = request.get_response(ext_midware)
+ response = request.get_response(ser_midware)
return response
def test_extended_action(self):
@@ -328,11 +340,9 @@ class RequestExtensionTest(test.TestCase):
def test_get_resources_with_stub_mgr(self):
- def _req_handler(req, res):
+ def _req_handler(req, res, body):
# only handle JSON responses
- data = json.loads(res.body)
- data['flavor']['googoose'] = req.GET.get('chewing')
- res.body = json.dumps(data)
+ body['flavor']['googoose'] = req.GET.get('chewing')
return res
req_ext = extensions.RequestExtension('GET',
@@ -340,22 +350,24 @@ class RequestExtensionTest(test.TestCase):
_req_handler)
manager = StubExtensionManager(None, None, req_ext)
- app = fakes.wsgi_app()
+ app = fakes.wsgi_app(serialization=base_wsgi.Middleware)
ext_midware = extensions.ExtensionMiddleware(app, manager)
+ ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/v1.1/123/flavors/1?chewing=bluegoo")
request.environ['api.version'] = '1.1'
- response = request.get_response(ext_midware)
+ response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
response_data = json.loads(response.body)
self.assertEqual('bluegoo', response_data['flavor']['googoose'])
def test_get_resources_with_mgr(self):
- app = fakes.wsgi_app()
+ app = fakes.wsgi_app(serialization=base_wsgi.Middleware)
ext_midware = extensions.ExtensionMiddleware(app)
+ ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/v1.1/123/flavors/1?chewing=newblue")
request.environ['api.version'] = '1.1'
- response = request.get_response(ext_midware)
+ response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
response_data = json.loads(response.body)
self.assertEqual('newblue', response_data['flavor']['googoose'])
diff --git a/nova/tests/api/openstack/test_limits.py b/nova/tests/api/openstack/test_limits.py
index 51a97f920..96e30f756 100644
--- a/nova/tests/api/openstack/test_limits.py
+++ b/nova/tests/api/openstack/test_limits.py
@@ -30,6 +30,7 @@ from xml.dom import minidom
import nova.context
from nova.api.openstack import limits
from nova.api.openstack import views
+from nova.api.openstack import wsgi
from nova.api.openstack import xmlutil
from nova import test
@@ -80,7 +81,8 @@ class LimitsControllerTest(BaseLimitTestSuite):
def setUp(self):
"""Run before each test."""
BaseLimitTestSuite.setUp(self)
- self.controller = limits.create_resource()
+ self.controller = wsgi.LazySerializationMiddleware(
+ limits.create_resource())
self.maxDiff = None
def _get_index_request(self, accept_header="application/json"):
diff --git a/nova/tests/api/openstack/test_users.py b/nova/tests/api/openstack/test_users.py
index c9af530cf..cc77d7d26 100644
--- a/nova/tests/api/openstack/test_users.py
+++ b/nova/tests/api/openstack/test_users.py
@@ -15,6 +15,7 @@
import json
+from lxml import etree
import webob
from nova import test
@@ -63,6 +64,19 @@ class UsersTest(test.TestCase):
self.assertEqual(res.status_int, 200)
self.assertEqual(len(res_dict['users']), 2)
+ def test_get_user_list_xml(self):
+ req = webob.Request.blank('/v1.1/fake/users.xml')
+ res = req.get_response(fakes.wsgi_app())
+ res_tree = etree.fromstring(res.body)
+
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res_tree.tag, 'users')
+ self.assertEqual(len(res_tree), 2)
+ self.assertEqual(res_tree[0].tag, 'user')
+ self.assertEqual(res_tree[0].get('id'), 'id1')
+ self.assertEqual(res_tree[1].tag, 'user')
+ self.assertEqual(res_tree[1].get('id'), 'id2')
+
def test_get_user_by_id(self):
req = webob.Request.blank('/v1.1/fake/users/id2')
res = req.get_response(fakes.wsgi_app())
@@ -74,6 +88,18 @@ class UsersTest(test.TestCase):
self.assertEqual(res_dict['user']['admin'], True)
self.assertEqual(res.status_int, 200)
+ def test_get_user_by_id_xml(self):
+ req = webob.Request.blank('/v1.1/fake/users/id2.xml')
+ res = req.get_response(fakes.wsgi_app())
+ res_tree = etree.fromstring(res.body)
+
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res_tree.tag, 'user')
+ self.assertEqual(res_tree.get('id'), 'id2')
+ self.assertEqual(res_tree.get('name'), 'guy2')
+ self.assertEqual(res_tree.get('secret'), 'secret2')
+ self.assertEqual(res_tree.get('admin'), 'True')
+
def test_user_delete(self):
# Check the user exists
req = webob.Request.blank('/v1.1/fake/users/id1')
@@ -125,6 +151,35 @@ class UsersTest(test.TestCase):
fakes.FakeAuthManager.auth_data])
self.assertEqual(len(fakes.FakeAuthManager.auth_data), 3)
+ def test_user_create_xml(self):
+ secret = utils.generate_password()
+ body = dict(user=dict(name='test_guy',
+ access='acc3',
+ secret=secret,
+ admin=True))
+ req = webob.Request.blank('/v1.1/fake/users.xml')
+ req.headers["Content-Type"] = "application/json"
+ req.method = 'POST'
+ req.body = json.dumps(body)
+
+ res = req.get_response(fakes.wsgi_app())
+ res_tree = etree.fromstring(res.body)
+
+ self.assertEqual(res.status_int, 200)
+
+ # NOTE(justinsb): This is a questionable assertion in general
+ # fake sets id=name, but others might not...
+ self.assertEqual(res_tree.tag, 'user')
+ self.assertEqual(res_tree.get('id'), 'test_guy')
+
+ self.assertEqual(res_tree.get('name'), 'test_guy')
+ self.assertEqual(res_tree.get('access'), 'acc3')
+ self.assertEqual(res_tree.get('secret'), secret)
+ self.assertEqual(res_tree.get('admin'), 'True')
+ self.assertTrue('test_guy' in [u.id for u in
+ fakes.FakeAuthManager.auth_data])
+ self.assertEqual(len(fakes.FakeAuthManager.auth_data), 3)
+
def test_user_update(self):
new_secret = utils.generate_password()
body = dict(user=dict(name='guy2',
@@ -144,3 +199,24 @@ class UsersTest(test.TestCase):
self.assertEqual(res_dict['user']['access'], 'acc2')
self.assertEqual(res_dict['user']['secret'], new_secret)
self.assertEqual(res_dict['user']['admin'], True)
+
+ def test_user_update_xml(self):
+ new_secret = utils.generate_password()
+ body = dict(user=dict(name='guy2',
+ access='acc2',
+ secret=new_secret))
+ req = webob.Request.blank('/v1.1/fake/users/id2.xml')
+ req.headers["Content-Type"] = "application/json"
+ req.method = 'PUT'
+ req.body = json.dumps(body)
+
+ res = req.get_response(fakes.wsgi_app())
+ res_tree = etree.fromstring(res.body)
+
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res_tree.tag, 'user')
+ self.assertEqual(res_tree.get('id'), 'id2')
+ self.assertEqual(res_tree.get('name'), 'guy2')
+ self.assertEqual(res_tree.get('access'), 'acc2')
+ self.assertEqual(res_tree.get('secret'), new_secret)
+ self.assertEqual(res_tree.get('admin'), 'True')
diff --git a/nova/tests/api/openstack/test_wsgi.py b/nova/tests/api/openstack/test_wsgi.py
index 74b9ce853..5aea8275d 100644
--- a/nova/tests/api/openstack/test_wsgi.py
+++ b/nova/tests/api/openstack/test_wsgi.py
@@ -215,20 +215,23 @@ class RequestHeadersDeserializerTest(test.TestCase):
self.assertEqual(deserializer.deserialize(req, 'update'), {'a': 'b'})
-class ResponseSerializerTest(test.TestCase):
- def setUp(self):
- class JSONSerializer(object):
- def serialize(self, data, action='default'):
- return 'pew_json'
+class JSONSerializer(object):
+ def serialize(self, data, action='default'):
+ return 'pew_json'
- class XMLSerializer(object):
- def serialize(self, data, action='default'):
- return 'pew_xml'
- class HeadersSerializer(object):
- def serialize(self, response, data, action):
- response.status_int = 404
+class XMLSerializer(object):
+ def serialize(self, data, action='default'):
+ return 'pew_xml'
+
+class HeadersSerializer(object):
+ def serialize(self, response, data, action):
+ response.status_int = 404
+
+
+class ResponseSerializerTest(test.TestCase):
+ def setUp(self):
self.body_serializers = {
'application/json': JSONSerializer(),
'application/xml': XMLSerializer(),
@@ -253,7 +256,8 @@ class ResponseSerializerTest(test.TestCase):
def test_serialize_response_json(self):
for content_type in ('application/json',
'application/vnd.openstack.compute+json'):
- response = self.serializer.serialize({}, content_type)
+ request = wsgi.Request.blank('/')
+ response = self.serializer.serialize(request, {}, content_type)
self.assertEqual(response.headers['Content-Type'], content_type)
self.assertEqual(response.body, 'pew_json')
self.assertEqual(response.status_int, 404)
@@ -261,21 +265,72 @@ class ResponseSerializerTest(test.TestCase):
def test_serialize_response_xml(self):
for content_type in ('application/xml',
'application/vnd.openstack.compute+xml'):
- response = self.serializer.serialize({}, content_type)
+ request = wsgi.Request.blank('/')
+ response = self.serializer.serialize(request, {}, content_type)
self.assertEqual(response.headers['Content-Type'], content_type)
self.assertEqual(response.body, 'pew_xml')
self.assertEqual(response.status_int, 404)
def test_serialize_response_None(self):
- response = self.serializer.serialize(None, 'application/json')
+ request = wsgi.Request.blank('/')
+ response = self.serializer.serialize(request, None, 'application/json')
self.assertEqual(response.headers['Content-Type'], 'application/json')
self.assertEqual(response.body, '')
self.assertEqual(response.status_int, 404)
def test_serialize_response_dict_to_unknown_content_type(self):
+ request = wsgi.Request.blank('/')
self.assertRaises(exception.InvalidContentType,
self.serializer.serialize,
- {}, 'application/unknown')
+ request, {}, 'application/unknown')
+
+
+class LazySerializationTest(test.TestCase):
+ def setUp(self):
+ self.body_serializers = {
+ 'application/json': JSONSerializer(),
+ 'application/xml': XMLSerializer(),
+ }
+
+ self.serializer = wsgi.ResponseSerializer(self.body_serializers,
+ HeadersSerializer())
+
+ def tearDown(self):
+ pass
+
+ def test_serialize_response_json(self):
+ for content_type in ('application/json',
+ 'application/vnd.openstack.compute+json'):
+ request = wsgi.Request.blank('/')
+ request.environ['nova.lazy_serialize'] = True
+ response = self.serializer.serialize(request, {}, content_type)
+ self.assertEqual(response.headers['Content-Type'], content_type)
+ self.assertEqual(response.status_int, 404)
+ body = json.loads(response.body)
+ self.assertEqual(body, {})
+ serializer = request.environ['nova.serializer']
+ self.assertEqual(serializer.serialize(body), 'pew_json')
+
+ def test_serialize_response_xml(self):
+ for content_type in ('application/xml',
+ 'application/vnd.openstack.compute+xml'):
+ request = wsgi.Request.blank('/')
+ request.environ['nova.lazy_serialize'] = True
+ response = self.serializer.serialize(request, {}, content_type)
+ self.assertEqual(response.headers['Content-Type'], content_type)
+ self.assertEqual(response.status_int, 404)
+ body = json.loads(response.body)
+ self.assertEqual(body, {})
+ serializer = request.environ['nova.serializer']
+ self.assertEqual(serializer.serialize(body), 'pew_xml')
+
+ def test_serialize_response_None(self):
+ request = wsgi.Request.blank('/')
+ request.environ['nova.lazy_serialize'] = True
+ response = self.serializer.serialize(request, None, 'application/json')
+ self.assertEqual(response.headers['Content-Type'], 'application/json')
+ self.assertEqual(response.status_int, 404)
+ self.assertEqual(response.body, '')
class RequestDeserializerTest(test.TestCase):
diff --git a/nova/tests/api/openstack/test_xmlutil.py b/nova/tests/api/openstack/test_xmlutil.py
new file mode 100644
index 000000000..6d224967f
--- /dev/null
+++ b/nova/tests/api/openstack/test_xmlutil.py
@@ -0,0 +1,763 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from lxml import etree
+
+from nova import test
+from nova.api.openstack import xmlutil
+
+
+class SelectorTest(test.TestCase):
+ obj_for_test = {
+ 'test': {
+ 'name': 'test',
+ 'values': [1, 2, 3],
+ 'attrs': {
+ 'foo': 1,
+ 'bar': 2,
+ 'baz': 3,
+ },
+ },
+ }
+
+ def test_empty_selector(self):
+ sel = xmlutil.Selector()
+ self.assertEqual(len(sel.chain), 0)
+ self.assertEqual(sel(self.obj_for_test), self.obj_for_test)
+
+ def test_dict_selector(self):
+ sel = xmlutil.Selector('test')
+ self.assertEqual(len(sel.chain), 1)
+ self.assertEqual(sel.chain[0], 'test')
+ self.assertEqual(sel(self.obj_for_test),
+ self.obj_for_test['test'])
+
+ def test_datum_selector(self):
+ sel = xmlutil.Selector('test', 'name')
+ self.assertEqual(len(sel.chain), 2)
+ self.assertEqual(sel.chain[0], 'test')
+ self.assertEqual(sel.chain[1], 'name')
+ self.assertEqual(sel(self.obj_for_test), 'test')
+
+ def test_list_selector(self):
+ sel = xmlutil.Selector('test', 'values', 0)
+ self.assertEqual(len(sel.chain), 3)
+ self.assertEqual(sel.chain[0], 'test')
+ self.assertEqual(sel.chain[1], 'values')
+ self.assertEqual(sel.chain[2], 0)
+ self.assertEqual(sel(self.obj_for_test), 1)
+
+ def test_items_selector(self):
+ sel = xmlutil.Selector('test', 'attrs', xmlutil.get_items)
+ self.assertEqual(len(sel.chain), 3)
+ self.assertEqual(sel.chain[2], xmlutil.get_items)
+ for key, val in sel(self.obj_for_test):
+ self.assertEqual(self.obj_for_test['test']['attrs'][key], val)
+
+ def test_missing_key_selector(self):
+ sel = xmlutil.Selector('test2', 'attrs')
+ self.assertEqual(sel(self.obj_for_test), None)
+ self.assertRaises(KeyError, sel, self.obj_for_test, True)
+
+ def test_constant_selector(self):
+ sel = xmlutil.ConstantSelector('Foobar')
+ self.assertEqual(sel.value, 'Foobar')
+ self.assertEqual(sel(self.obj_for_test), 'Foobar')
+
+
+class TemplateElementTest(test.TestCase):
+ def test_element_initial_attributes(self):
+ # Create a template element with some attributes
+ elem = xmlutil.TemplateElement('test', attrib=dict(a=1, b=2, c=3),
+ c=4, d=5, e=6)
+
+ # Verify all the attributes are as expected
+ expected = dict(a=1, b=2, c=4, d=5, e=6)
+ for k, v in expected.items():
+ self.assertEqual(elem.attrib[k].chain[0], v)
+
+ def test_element_get_attributes(self):
+ expected = dict(a=1, b=2, c=3)
+
+ # Create a template element with some attributes
+ elem = xmlutil.TemplateElement('test', attrib=expected)
+
+ # Verify that get() retrieves the attributes
+ for k, v in expected.items():
+ self.assertEqual(elem.get(k).chain[0], v)
+
+ def test_element_set_attributes(self):
+ attrs = dict(a=None, b='foo', c=xmlutil.Selector('foo', 'bar'))
+
+ # Create a bare template element with no attributes
+ elem = xmlutil.TemplateElement('test')
+
+ # Set the attribute values
+ for k, v in attrs.items():
+ elem.set(k, v)
+
+ # Now verify what got set
+ self.assertEqual(len(elem.attrib['a'].chain), 1)
+ self.assertEqual(elem.attrib['a'].chain[0], 'a')
+ self.assertEqual(len(elem.attrib['b'].chain), 1)
+ self.assertEqual(elem.attrib['b'].chain[0], 'foo')
+ self.assertEqual(elem.attrib['c'], attrs['c'])
+
+ def test_element_attribute_keys(self):
+ attrs = dict(a=1, b=2, c=3, d=4)
+ expected = set(attrs.keys())
+
+ # Create a template element with some attributes
+ elem = xmlutil.TemplateElement('test', attrib=attrs)
+
+ # Now verify keys
+ self.assertEqual(set(elem.keys()), expected)
+
+ def test_element_attribute_items(self):
+ expected = dict(a=xmlutil.Selector(1),
+ b=xmlutil.Selector(2),
+ c=xmlutil.Selector(3))
+ keys = set(expected.keys())
+
+ # Create a template element with some attributes
+ elem = xmlutil.TemplateElement('test', attrib=expected)
+
+ # Now verify items
+ for k, v in elem.items():
+ self.assertEqual(expected[k], v)
+ keys.remove(k)
+
+ # Did we visit all keys?
+ self.assertEqual(len(keys), 0)
+
+ def test_element_selector_none(self):
+ # Create a template element with no selector
+ elem = xmlutil.TemplateElement('test')
+
+ self.assertEqual(len(elem.selector.chain), 0)
+
+ def test_element_selector_string(self):
+ # Create a template element with a string selector
+ elem = xmlutil.TemplateElement('test', selector='test')
+
+ self.assertEqual(len(elem.selector.chain), 1)
+ self.assertEqual(elem.selector.chain[0], 'test')
+
+ def test_element_selector(self):
+ sel = xmlutil.Selector('a', 'b')
+
+ # Create a template element with an explicit selector
+ elem = xmlutil.TemplateElement('test', selector=sel)
+
+ self.assertEqual(elem.selector, sel)
+
+ def test_element_append_child(self):
+ # Create an element
+ elem = xmlutil.TemplateElement('test')
+
+ # Make sure the element starts off empty
+ self.assertEqual(len(elem), 0)
+
+ # Create a child element
+ child = xmlutil.TemplateElement('child')
+
+ # Append the child to the parent
+ elem.append(child)
+
+ # Verify that the child was added
+ self.assertEqual(len(elem), 1)
+ self.assertEqual(elem[0], child)
+ self.assertEqual('child' in elem, True)
+ self.assertEqual(elem['child'], child)
+
+ # Ensure that multiple children of the same name are rejected
+ child2 = xmlutil.TemplateElement('child')
+ self.assertRaises(KeyError, elem.append, child2)
+
+ def test_element_extend_children(self):
+ # Create an element
+ elem = xmlutil.TemplateElement('test')
+
+ # Make sure the element starts off empty
+ self.assertEqual(len(elem), 0)
+
+ # Create a few children
+ children = [
+ xmlutil.TemplateElement('child1'),
+ xmlutil.TemplateElement('child2'),
+ xmlutil.TemplateElement('child3'),
+ ]
+
+ # Extend the parent by those children
+ elem.extend(children)
+
+ # Verify that the children were added
+ self.assertEqual(len(elem), 3)
+ for idx in range(len(elem)):
+ self.assertEqual(children[idx], elem[idx])
+ self.assertEqual(children[idx].tag in elem, True)
+ self.assertEqual(elem[children[idx].tag], children[idx])
+
+ # Ensure that multiple children of the same name are rejected
+ children2 = [
+ xmlutil.TemplateElement('child4'),
+ xmlutil.TemplateElement('child1'),
+ ]
+ self.assertRaises(KeyError, elem.extend, children2)
+
+ # Also ensure that child4 was not added
+ self.assertEqual(len(elem), 3)
+ self.assertEqual(elem[-1].tag, 'child3')
+
+ def test_element_insert_child(self):
+ # Create an element
+ elem = xmlutil.TemplateElement('test')
+
+ # Make sure the element starts off empty
+ self.assertEqual(len(elem), 0)
+
+ # Create a few children
+ children = [
+ xmlutil.TemplateElement('child1'),
+ xmlutil.TemplateElement('child2'),
+ xmlutil.TemplateElement('child3'),
+ ]
+
+ # Extend the parent by those children
+ elem.extend(children)
+
+ # Create a child to insert
+ child = xmlutil.TemplateElement('child4')
+
+ # Insert it
+ elem.insert(1, child)
+
+ # Ensure the child was inserted in the right place
+ self.assertEqual(len(elem), 4)
+ children.insert(1, child)
+ for idx in range(len(elem)):
+ self.assertEqual(children[idx], elem[idx])
+ self.assertEqual(children[idx].tag in elem, True)
+ self.assertEqual(elem[children[idx].tag], children[idx])
+
+ # Ensure that multiple children of the same name are rejected
+ child2 = xmlutil.TemplateElement('child2')
+ self.assertRaises(KeyError, elem.insert, 2, child2)
+
+ def test_element_remove_child(self):
+ # Create an element
+ elem = xmlutil.TemplateElement('test')
+
+ # Make sure the element starts off empty
+ self.assertEqual(len(elem), 0)
+
+ # Create a few children
+ children = [
+ xmlutil.TemplateElement('child1'),
+ xmlutil.TemplateElement('child2'),
+ xmlutil.TemplateElement('child3'),
+ ]
+
+ # Extend the parent by those children
+ elem.extend(children)
+
+ # Create a test child to remove
+ child = xmlutil.TemplateElement('child2')
+
+ # Try to remove it
+ self.assertRaises(ValueError, elem.remove, child)
+
+ # Ensure that no child was removed
+ self.assertEqual(len(elem), 3)
+
+ # Now remove a legitimate child
+ elem.remove(children[1])
+
+ # Ensure that the child was removed
+ self.assertEqual(len(elem), 2)
+ self.assertEqual(elem[0], children[0])
+ self.assertEqual(elem[1], children[2])
+ self.assertEqual('child2' in elem, False)
+
+ # Ensure the child cannot be retrieved by name
+ def get_key(elem, key):
+ return elem[key]
+ self.assertRaises(KeyError, get_key, elem, 'child2')
+
+ def test_element_text(self):
+ # Create an element
+ elem = xmlutil.TemplateElement('test')
+
+ # Ensure that it has no text
+ self.assertEqual(elem.text, None)
+
+ # Try setting it to a string and ensure it becomes a selector
+ elem.text = 'test'
+ self.assertEqual(hasattr(elem.text, 'chain'), True)
+ self.assertEqual(len(elem.text.chain), 1)
+ self.assertEqual(elem.text.chain[0], 'test')
+
+ # Try resetting the text to None
+ elem.text = None
+ self.assertEqual(elem.text, None)
+
+ # Now make up a selector and try setting the text to that
+ sel = xmlutil.Selector()
+ elem.text = sel
+ self.assertEqual(elem.text, sel)
+
+ # Finally, try deleting the text and see what happens
+ del elem.text
+ self.assertEqual(elem.text, None)
+
+ def test_apply_attrs(self):
+ # Create a template element
+ attrs = dict(attr1=xmlutil.ConstantSelector(1),
+ attr2=xmlutil.ConstantSelector(2))
+ tmpl_elem = xmlutil.TemplateElement('test', attrib=attrs)
+
+ # Create an etree element
+ elem = etree.Element('test')
+
+ # Apply the template to the element
+ tmpl_elem.apply(elem, None)
+
+ # Now, verify the correct attributes were set
+ for k, v in elem.items():
+ self.assertEqual(str(attrs[k].value), v)
+
+ def test_apply_text(self):
+ # Create a template element
+ tmpl_elem = xmlutil.TemplateElement('test')
+ tmpl_elem.text = xmlutil.ConstantSelector(1)
+
+ # Create an etree element
+ elem = etree.Element('test')
+
+ # Apply the template to the element
+ tmpl_elem.apply(elem, None)
+
+ # Now, verify the text was set
+ self.assertEqual(str(tmpl_elem.text.value), elem.text)
+
+ def test__render(self):
+ attrs = dict(attr1=xmlutil.ConstantSelector(1),
+ attr2=xmlutil.ConstantSelector(2),
+ attr3=xmlutil.ConstantSelector(3))
+
+ # Create a master template element
+ master_elem = xmlutil.TemplateElement('test', attr1=attrs['attr1'])
+
+ # Create a couple of slave template element
+ slave_elems = [
+ xmlutil.TemplateElement('test', attr2=attrs['attr2']),
+ xmlutil.TemplateElement('test', attr3=attrs['attr3']),
+ ]
+
+ # Try the render
+ elem = master_elem._render(None, None, slave_elems, None)
+
+ # Verify the particulars of the render
+ self.assertEqual(elem.tag, 'test')
+ self.assertEqual(len(elem.nsmap), 0)
+ for k, v in elem.items():
+ self.assertEqual(str(attrs[k].value), v)
+
+ # Create a parent for the element to be rendered
+ parent = etree.Element('parent')
+
+ # Try the render again...
+ elem = master_elem._render(parent, None, slave_elems, dict(a='foo'))
+
+ # Verify the particulars of the render
+ self.assertEqual(len(parent), 1)
+ self.assertEqual(parent[0], elem)
+ self.assertEqual(len(elem.nsmap), 1)
+ self.assertEqual(elem.nsmap['a'], 'foo')
+
+ def test_render(self):
+ # Create a template element
+ tmpl_elem = xmlutil.TemplateElement('test')
+ tmpl_elem.text = xmlutil.Selector()
+
+ # Create the object we're going to render
+ obj = ['elem1', 'elem2', 'elem3', 'elem4']
+
+ # Try a render with no object
+ elems = tmpl_elem.render(None, None)
+ self.assertEqual(len(elems), 0)
+
+ # Try a render with one object
+ elems = tmpl_elem.render(None, 'foo')
+ self.assertEqual(len(elems), 1)
+ self.assertEqual(elems[0][0].text, 'foo')
+ self.assertEqual(elems[0][1], 'foo')
+
+ # Now, try rendering an object with multiple entries
+ parent = etree.Element('parent')
+ elems = tmpl_elem.render(parent, obj)
+ self.assertEqual(len(elems), 4)
+
+ # Check the results
+ for idx in range(len(obj)):
+ self.assertEqual(elems[idx][0].text, obj[idx])
+ self.assertEqual(elems[idx][1], obj[idx])
+
+ def test_subelement(self):
+ # Try the SubTemplateElement constructor
+ parent = xmlutil.SubTemplateElement(None, 'parent')
+ self.assertEqual(parent.tag, 'parent')
+ self.assertEqual(len(parent), 0)
+
+ # Now try it with a parent element
+ child = xmlutil.SubTemplateElement(parent, 'child')
+ self.assertEqual(child.tag, 'child')
+ self.assertEqual(len(parent), 1)
+ self.assertEqual(parent[0], child)
+
+ def test_wrap(self):
+ # These are strange methods, but they make things easier
+ elem = xmlutil.TemplateElement('test')
+ self.assertEqual(elem.unwrap(), elem)
+ self.assertEqual(elem.wrap().root, elem)
+
+ def test_dyntag(self):
+ obj = ['a', 'b', 'c']
+
+ # Create a template element with a dynamic tag
+ tmpl_elem = xmlutil.TemplateElement(xmlutil.Selector())
+
+ # Try the render
+ parent = etree.Element('parent')
+ elems = tmpl_elem.render(parent, obj)
+
+ # Verify the particulars of the render
+ self.assertEqual(len(elems), len(obj))
+ for idx in range(len(obj)):
+ self.assertEqual(elems[idx][0].tag, obj[idx])
+
+
+class TemplateTest(test.TestCase):
+ def test_wrap(self):
+ # These are strange methods, but they make things easier
+ elem = xmlutil.TemplateElement('test')
+ tmpl = xmlutil.Template(elem)
+ self.assertEqual(tmpl.unwrap(), elem)
+ self.assertEqual(tmpl.wrap(), tmpl)
+
+ def test__siblings(self):
+ # Set up a basic template
+ elem = xmlutil.TemplateElement('test')
+ tmpl = xmlutil.Template(elem)
+
+ # Check that we get the right siblings
+ siblings = tmpl._siblings()
+ self.assertEqual(len(siblings), 1)
+ self.assertEqual(siblings[0], elem)
+
+ def test__nsmap(self):
+ # Set up a basic template
+ elem = xmlutil.TemplateElement('test')
+ tmpl = xmlutil.Template(elem, nsmap=dict(a="foo"))
+
+ # Check out that we get the right namespace dictionary
+ nsmap = tmpl._nsmap()
+ self.assertNotEqual(id(nsmap), id(tmpl.nsmap))
+ self.assertEqual(len(nsmap), 1)
+ self.assertEqual(nsmap['a'], 'foo')
+
+ def test_master_attach(self):
+ # Set up a master template
+ elem = xmlutil.TemplateElement('test')
+ tmpl = xmlutil.MasterTemplate(elem, 1)
+
+ # Make sure it has a root but no slaves
+ self.assertEqual(tmpl.root, elem)
+ self.assertEqual(len(tmpl.slaves), 0)
+
+ # Try to attach an invalid slave
+ bad_elem = xmlutil.TemplateElement('test2')
+ self.assertRaises(ValueError, tmpl.attach, bad_elem)
+ self.assertEqual(len(tmpl.slaves), 0)
+
+ # Try to attach an invalid and a valid slave
+ good_elem = xmlutil.TemplateElement('test')
+ self.assertRaises(ValueError, tmpl.attach, good_elem, bad_elem)
+ self.assertEqual(len(tmpl.slaves), 0)
+
+ # Try to attach an inapplicable template
+ class InapplicableTemplate(xmlutil.Template):
+ def apply(self, master):
+ return False
+ inapp_tmpl = InapplicableTemplate(good_elem)
+ tmpl.attach(inapp_tmpl)
+ self.assertEqual(len(tmpl.slaves), 0)
+
+ # Now try attaching an applicable template
+ tmpl.attach(good_elem)
+ self.assertEqual(len(tmpl.slaves), 1)
+ self.assertEqual(tmpl.slaves[0].root, good_elem)
+
+ def test_master_copy(self):
+ # Construct a master template
+ elem = xmlutil.TemplateElement('test')
+ tmpl = xmlutil.MasterTemplate(elem, 1, nsmap=dict(a='foo'))
+
+ # Give it a slave
+ slave = xmlutil.TemplateElement('test')
+ tmpl.attach(slave)
+
+ # Construct a copy
+ copy = tmpl.copy()
+
+ # Check to see if we actually managed a copy
+ self.assertNotEqual(tmpl, copy)
+ self.assertEqual(tmpl.root, copy.root)
+ self.assertEqual(tmpl.version, copy.version)
+ self.assertEqual(id(tmpl.nsmap), id(copy.nsmap))
+ self.assertNotEqual(id(tmpl.slaves), id(copy.slaves))
+ self.assertEqual(len(tmpl.slaves), len(copy.slaves))
+ self.assertEqual(tmpl.slaves[0], copy.slaves[0])
+
+ def test_slave_apply(self):
+ # Construct a master template
+ elem = xmlutil.TemplateElement('test')
+ master = xmlutil.MasterTemplate(elem, 3)
+
+ # Construct a slave template with applicable minimum version
+ slave = xmlutil.SlaveTemplate(elem, 2)
+ self.assertEqual(slave.apply(master), True)
+
+ # Construct a slave template with equal minimum version
+ slave = xmlutil.SlaveTemplate(elem, 3)
+ self.assertEqual(slave.apply(master), True)
+
+ # Construct a slave template with inapplicable minimum version
+ slave = xmlutil.SlaveTemplate(elem, 4)
+ self.assertEqual(slave.apply(master), False)
+
+ # Construct a slave template with applicable version range
+ slave = xmlutil.SlaveTemplate(elem, 2, 4)
+ self.assertEqual(slave.apply(master), True)
+
+ # Construct a slave template with low version range
+ slave = xmlutil.SlaveTemplate(elem, 1, 2)
+ self.assertEqual(slave.apply(master), False)
+
+ # Construct a slave template with high version range
+ slave = xmlutil.SlaveTemplate(elem, 4, 5)
+ self.assertEqual(slave.apply(master), False)
+
+ # Construct a slave template with matching version range
+ slave = xmlutil.SlaveTemplate(elem, 3, 3)
+ self.assertEqual(slave.apply(master), True)
+
+ def test__serialize(self):
+ # Our test object to serialize
+ obj = {
+ 'test': {
+ 'name': 'foobar',
+ 'values': [1, 2, 3, 4],
+ 'attrs': {
+ 'a': 1,
+ 'b': 2,
+ 'c': 3,
+ 'd': 4,
+ },
+ 'image': {
+ 'name': 'image_foobar',
+ 'id': 42,
+ },
+ },
+ }
+
+ # Set up our master template
+ root = xmlutil.TemplateElement('test', selector='test',
+ name='name')
+ value = xmlutil.SubTemplateElement(root, 'value', selector='values')
+ value.text = xmlutil.Selector()
+ attrs = xmlutil.SubTemplateElement(root, 'attrs', selector='attrs')
+ xmlutil.SubTemplateElement(attrs, 'attr', selector=xmlutil.get_items,
+ key=0, value=1)
+ master = xmlutil.MasterTemplate(root, 1, nsmap=dict(f='foo'))
+
+ # Set up our slave template
+ root_slave = xmlutil.TemplateElement('test', selector='test')
+ image = xmlutil.SubTemplateElement(root_slave, 'image',
+ selector='image', id='id')
+ image.text = xmlutil.Selector('name')
+ slave = xmlutil.SlaveTemplate(root_slave, 1, nsmap=dict(b='bar'))
+
+ # Attach the slave to the master...
+ master.attach(slave)
+
+ # Try serializing our object
+ siblings = master._siblings()
+ nsmap = master._nsmap()
+ result = master._serialize(None, obj, siblings, nsmap)
+
+ # Now we get to manually walk the element tree...
+ self.assertEqual(result.tag, 'test')
+ self.assertEqual(len(result.nsmap), 2)
+ self.assertEqual(result.nsmap['f'], 'foo')
+ self.assertEqual(result.nsmap['b'], 'bar')
+ self.assertEqual(result.get('name'), obj['test']['name'])
+ for idx, val in enumerate(obj['test']['values']):
+ self.assertEqual(result[idx].tag, 'value')
+ self.assertEqual(result[idx].text, str(val))
+ idx += 1
+ self.assertEqual(result[idx].tag, 'attrs')
+ for attr in result[idx]:
+ self.assertEqual(attr.tag, 'attr')
+ self.assertEqual(attr.get('value'),
+ str(obj['test']['attrs'][attr.get('key')]))
+ idx += 1
+ self.assertEqual(result[idx].tag, 'image')
+ self.assertEqual(result[idx].get('id'),
+ str(obj['test']['image']['id']))
+ self.assertEqual(result[idx].text, obj['test']['image']['name'])
+
+
+class MasterTemplateBuilder(xmlutil.TemplateBuilder):
+ def construct(self):
+ elem = xmlutil.TemplateElement('test')
+ return xmlutil.MasterTemplate(elem, 1)
+
+
+class SlaveTemplateBuilder(xmlutil.TemplateBuilder):
+ def construct(self):
+ elem = xmlutil.TemplateElement('test')
+ return xmlutil.SlaveTemplate(elem, 1)
+
+
+class TemplateBuilderTest(test.TestCase):
+ def test_master_template_builder(self):
+ # Make sure the template hasn't been built yet
+ self.assertEqual(MasterTemplateBuilder._tmpl, None)
+
+ # Now, construct the template
+ tmpl1 = MasterTemplateBuilder()
+
+ # Make sure that there is a template cached...
+ self.assertNotEqual(MasterTemplateBuilder._tmpl, None)
+
+ # Make sure it wasn't what was returned...
+ self.assertNotEqual(MasterTemplateBuilder._tmpl, tmpl1)
+
+ # Make sure it doesn't get rebuilt
+ cached = MasterTemplateBuilder._tmpl
+ tmpl2 = MasterTemplateBuilder()
+ self.assertEqual(MasterTemplateBuilder._tmpl, cached)
+
+ # Make sure we're always getting fresh copies
+ self.assertNotEqual(tmpl1, tmpl2)
+
+ # Make sure we can override the copying behavior
+ tmpl3 = MasterTemplateBuilder(False)
+ self.assertEqual(MasterTemplateBuilder._tmpl, tmpl3)
+
+ def test_slave_template_builder(self):
+ # Make sure the template hasn't been built yet
+ self.assertEqual(SlaveTemplateBuilder._tmpl, None)
+
+ # Now, construct the template
+ tmpl1 = SlaveTemplateBuilder()
+
+ # Make sure there is a template cached...
+ self.assertNotEqual(SlaveTemplateBuilder._tmpl, None)
+
+ # Make sure it was what was returned...
+ self.assertEqual(SlaveTemplateBuilder._tmpl, tmpl1)
+
+ # Make sure it doesn't get rebuilt
+ tmpl2 = SlaveTemplateBuilder()
+ self.assertEqual(SlaveTemplateBuilder._tmpl, tmpl1)
+
+ # Make sure we're always getting the cached copy
+ self.assertEqual(tmpl1, tmpl2)
+
+
+class SerializerTest(xmlutil.XMLTemplateSerializer):
+ def test(self):
+ root = xmlutil.TemplateElement('servers')
+ a = xmlutil.SubTemplateElement(root, 'a', selector='servers')
+ a.text = xmlutil.Selector('a')
+ return xmlutil.MasterTemplate(root, 1, nsmap={None: "asdf"})
+
+
+class XMLTemplateSerializerTest(test.TestCase):
+ def setUp(self):
+ self.tmpl_serializer = SerializerTest()
+ self.data = dict(servers=dict(a=(2, 3)))
+ self.data_multi = dict(servers=[dict(a=(2, 3)), dict(a=(3, 4))])
+ super(XMLTemplateSerializerTest, self).setUp()
+
+ def test_get_template(self):
+ # First, check what happens when we fall back on the default
+ # option
+ self.assertEqual(self.tmpl_serializer.get_template(), None)
+ self.assertEqual(self.tmpl_serializer.get_template('nosuch'), None)
+
+ # Now, check that we get back a template
+ tmpl = self.tmpl_serializer.get_template('test')
+ self.assertNotEqual(tmpl, None)
+ self.assertEqual(tmpl.root.tag, 'servers')
+
+ def test_serialize_default(self):
+ expected_xml = '<servers><a>(2,3)</a></servers>'
+ result = self.tmpl_serializer.serialize(self.data)
+ result = result.replace('\n', '').replace(' ', '')
+ self.assertEqual(result, expected_xml)
+
+ def test_serialize_multi_default(self):
+ expected_xml = ('<servers><server><a>(2,3)</a></server>'
+ '<server><a>(3,4)</a></server></servers>')
+ result = self.tmpl_serializer.serialize(self.data_multi)
+ result = result.replace('\n', '').replace(' ', '')
+ self.assertEqual(result, expected_xml)
+
+ def test_serialize_explicit(self):
+ expected_xml = ("<?xmlversion='1.0'encoding='UTF-8'?>"
+ '<serversxmlns="asdf"><a>(2,3)</a></servers>')
+ tmpl = self.tmpl_serializer.get_template('test')
+ result = self.tmpl_serializer.serialize(self.data, template=tmpl)
+ result = result.replace('\n', '').replace(' ', '')
+ self.assertEqual(result, expected_xml)
+
+ def test_serialize_multi_explicit(self):
+ expected_xml = ("<?xmlversion='1.0'encoding='UTF-8'?>"
+ '<serversxmlns="asdf"><a>(2,3)</a>'
+ '<a>(3,4)</a></servers>')
+ tmpl = self.tmpl_serializer.get_template('test')
+ result = self.tmpl_serializer.serialize(self.data_multi, template=tmpl)
+ result = result.replace('\n', '').replace(' ', '')
+ self.assertEqual(result, expected_xml)
+
+ def test_serialize(self):
+ expected_xml = ("<?xmlversion='1.0'encoding='UTF-8'?>"
+ '<serversxmlns="asdf"><a>(2,3)</a></servers>')
+ result = self.tmpl_serializer.serialize(self.data, 'test')
+ result = result.replace('\n', '').replace(' ', '')
+ self.assertEqual(result, expected_xml)
+
+ def test_serialize_multi(self):
+ expected_xml = ("<?xmlversion='1.0'encoding='UTF-8'?>"
+ '<serversxmlns="asdf"><a>(2,3)</a>'
+ '<a>(3,4)</a></servers>')
+ result = self.tmpl_serializer.serialize(self.data_multi, 'test')
+ result = result.replace('\n', '').replace(' ', '')
+ self.assertEqual(result, expected_xml)
diff --git a/nova/tests/api/openstack/test_zones.py b/nova/tests/api/openstack/test_zones.py
index 869f13183..af762d3d6 100644
--- a/nova/tests/api/openstack/test_zones.py
+++ b/nova/tests/api/openstack/test_zones.py
@@ -14,15 +14,18 @@
# under the License.
+import json
+
+from lxml import etree
import stubout
import webob
-import json
import nova.db
from nova import context
from nova import crypto
from nova import flags
from nova import test
+from nova.api.openstack import xmlutil
from nova.api.openstack import zones
from nova.tests.api.openstack import fakes
from nova.scheduler import api
@@ -112,6 +115,18 @@ class ZonesTest(test.TestCase):
self.assertEqual(res.status_int, 200)
self.assertEqual(len(res_dict['zones']), 2)
+ def test_get_zone_list_scheduler_xml(self):
+ self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler)
+ req = webob.Request.blank('/v1.1/fake/zones.xml')
+ res = req.get_response(fakes.wsgi_app())
+ res_tree = etree.fromstring(res.body)
+
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res_tree.tag, '{%s}zones' % xmlutil.XMLNS_V10)
+ self.assertEqual(len(res_tree), 2)
+ self.assertEqual(res_tree[0].tag, '{%s}zone' % xmlutil.XMLNS_V10)
+ self.assertEqual(res_tree[1].tag, '{%s}zone' % xmlutil.XMLNS_V10)
+
def test_get_zone_list_db(self):
self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler_empty)
self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db)
@@ -123,6 +138,20 @@ class ZonesTest(test.TestCase):
res_dict = json.loads(res.body)
self.assertEqual(len(res_dict['zones']), 2)
+ def test_get_zone_list_db_xml(self):
+ self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler_empty)
+ self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db)
+ req = webob.Request.blank('/v1.1/fake/zones.xml')
+ req.headers["Content-Type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+
+ self.assertEqual(res.status_int, 200)
+ res_tree = etree.fromstring(res.body)
+ self.assertEqual(res_tree.tag, '{%s}zones' % xmlutil.XMLNS_V10)
+ self.assertEqual(len(res_tree), 2)
+ self.assertEqual(res_tree[0].tag, '{%s}zone' % xmlutil.XMLNS_V10)
+ self.assertEqual(res_tree[1].tag, '{%s}zone' % xmlutil.XMLNS_V10)
+
def test_get_zone_by_id(self):
req = webob.Request.blank('/v1.1/fake/zones/1')
req.headers["Content-Type"] = "application/json"
@@ -134,6 +163,18 @@ class ZonesTest(test.TestCase):
self.assertEqual(res_dict['zone']['api_url'], 'http://example.com')
self.assertFalse('password' in res_dict['zone'])
+ def test_get_zone_by_id_xml(self):
+ req = webob.Request.blank('/v1.1/fake/zones/1.xml')
+ req.headers["Content-Type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ res_tree = etree.fromstring(res.body)
+
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10)
+ self.assertEqual(res_tree.get('id'), '1')
+ self.assertEqual(res_tree.get('api_url'), 'http://example.com')
+ self.assertEqual(res_tree.get('password'), None)
+
def test_zone_delete(self):
req = webob.Request.blank('/v1.1/fake/zones/1')
req.headers["Content-Type"] = "application/json"
@@ -157,6 +198,23 @@ class ZonesTest(test.TestCase):
self.assertEqual(res_dict['zone']['api_url'], 'http://example.com')
self.assertFalse('username' in res_dict['zone'])
+ def test_zone_create_xml(self):
+ body = dict(zone=dict(api_url='http://example.com', username='fred',
+ password='fubar'))
+ req = webob.Request.blank('/v1.1/fake/zones.xml')
+ req.headers["Content-Type"] = "application/json"
+ req.method = 'POST'
+ req.body = json.dumps(body)
+
+ res = req.get_response(fakes.wsgi_app())
+
+ self.assertEqual(res.status_int, 200)
+ res_tree = etree.fromstring(res.body)
+ self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10)
+ self.assertEqual(res_tree.get('id'), '1')
+ self.assertEqual(res_tree.get('api_url'), 'http://example.com')
+ self.assertEqual(res_tree.get('username'), None)
+
def test_zone_update(self):
body = dict(zone=dict(username='zeb', password='sneaky'))
req = webob.Request.blank('/v1.1/fake/zones/1')
@@ -172,6 +230,22 @@ class ZonesTest(test.TestCase):
self.assertEqual(res_dict['zone']['api_url'], 'http://example.com')
self.assertFalse('username' in res_dict['zone'])
+ def test_zone_update_xml(self):
+ body = dict(zone=dict(username='zeb', password='sneaky'))
+ req = webob.Request.blank('/v1.1/fake/zones/1.xml')
+ req.headers["Content-Type"] = "application/json"
+ req.method = 'PUT'
+ req.body = json.dumps(body)
+
+ res = req.get_response(fakes.wsgi_app())
+
+ self.assertEqual(res.status_int, 200)
+ res_tree = etree.fromstring(res.body)
+ self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10)
+ self.assertEqual(res_tree.get('id'), '1')
+ self.assertEqual(res_tree.get('api_url'), 'http://example.com')
+ self.assertEqual(res_tree.get('username'), None)
+
def test_zone_info(self):
caps = ['cap1=a;b', 'cap2=c;d']
self.flags(zone_name='darksecret', zone_capabilities=caps)
@@ -187,6 +261,28 @@ class ZonesTest(test.TestCase):
self.assertEqual(res_dict['zone']['cap1'], 'a;b')
self.assertEqual(res_dict['zone']['cap2'], 'c;d')
+ def test_zone_info_xml(self):
+ caps = ['cap1=a;b', 'cap2=c;d']
+ self.flags(zone_name='darksecret', zone_capabilities=caps)
+ self.stubs.Set(api, '_call_scheduler', zone_capabilities)
+
+ body = dict(zone=dict(username='zeb', password='sneaky'))
+ req = webob.Request.blank('/v1.1/fake/zones/info.xml')
+
+ res = req.get_response(fakes.wsgi_app())
+ res_tree = etree.fromstring(res.body)
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10)
+ self.assertEqual(res_tree.get('name'), 'darksecret')
+ for elem in res_tree:
+ self.assertEqual(elem.tag in ('{%s}cap1' % xmlutil.XMLNS_V10,
+ '{%s}cap2' % xmlutil.XMLNS_V10),
+ True)
+ if elem.tag == '{%s}cap1' % xmlutil.XMLNS_V10:
+ self.assertEqual(elem.text, 'a;b')
+ elif elem.tag == '{%s}cap2' % xmlutil.XMLNS_V10:
+ self.assertEqual(elem.text, 'c;d')
+
def test_zone_select(self):
key = 'c286696d887c9aa0611bbb3e2025a45a'
self.flags(build_plan_encryption_key=key)
@@ -220,3 +316,45 @@ class ZonesTest(test.TestCase):
self.assertTrue(found)
self.assertEqual(len(item), 2)
self.assertTrue('weight' in item)
+
+ def test_zone_select_xml(self):
+ key = 'c286696d887c9aa0611bbb3e2025a45a'
+ self.flags(build_plan_encryption_key=key)
+ self.stubs.Set(api, 'select', zone_select)
+
+ req = webob.Request.blank('/v1.1/fake/zones/select.xml')
+ req.method = 'POST'
+ req.headers["Content-Type"] = "application/json"
+ # Select queries end up being JSON encoded twice.
+ # Once to a string and again as an HTTP POST Body
+ req.body = json.dumps(json.dumps({}))
+
+ res = req.get_response(fakes.wsgi_app())
+ res_tree = etree.fromstring(res.body)
+ self.assertEqual(res.status_int, 200)
+
+ self.assertEqual(res_tree.tag, '{%s}weights' % xmlutil.XMLNS_V10)
+
+ for item in res_tree:
+ self.assertEqual(item.tag, '{%s}weight' % xmlutil.XMLNS_V10)
+ blob = None
+ weight = None
+ for chld in item:
+ if chld.tag.endswith('blob'):
+ blob = chld.text
+ elif chld.tag.endswith('weight'):
+ weight = chld.text
+
+ decrypt = crypto.decryptor(FLAGS.build_plan_encryption_key)
+ secret_item = json.loads(decrypt(blob))
+ found = False
+ for original_item in GLOBAL_BUILD_PLAN:
+ if original_item['name'] != secret_item['name']:
+ continue
+ found = True
+ for key in ('weight', 'ip', 'zone'):
+ self.assertEqual(secret_item[key], original_item[key])
+
+ self.assertTrue(found)
+ self.assertEqual(len(item), 2)
+ self.assertTrue(weight)