summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
authorJustin Santa Barbara <justin@fathomdb.com>2011-03-25 14:59:01 -0700
committerJustin Santa Barbara <justin@fathomdb.com>2011-03-25 14:59:01 -0700
commit2d07440385f18e08cbf833f11a7d356ba430fabc (patch)
tree0983b8bc17a81f1070933733b32baa0b4206285d /nova/tests
parentd98b212ebb49990fbe053be5f45014d3466589ce (diff)
parented12a2cd2beef77d1c7e9d16771e766aa068530d (diff)
Merged with trunk, resolved conflicts & code-flicts
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/api/openstack/extensions/foxinsocks.py98
-rw-r--r--nova/tests/api/openstack/fakes.py36
-rw-r--r--nova/tests/api/openstack/test_extensions.py236
-rw-r--r--nova/tests/api/openstack/test_flavors.py158
-rw-r--r--nova/tests/api/openstack/test_image_metadata.py166
-rw-r--r--nova/tests/api/openstack/test_images.py263
-rw-r--r--nova/tests/api/openstack/test_server_metadata.py164
-rw-r--r--nova/tests/api/openstack/test_servers.py143
-rw-r--r--nova/tests/api/openstack/test_versions.py97
-rw-r--r--nova/tests/db/fakes.py72
-rw-r--r--nova/tests/fake_utils.py106
-rw-r--r--nova/tests/image/test_glance.py265
-rw-r--r--nova/tests/integrated/integrated_helpers.py67
-rw-r--r--nova/tests/objectstore_unittest.py315
-rw-r--r--nova/tests/test_cloud.py49
-rw-r--r--nova/tests/test_direct.py27
-rw-r--r--nova/tests/test_objectstore.py148
-rw-r--r--nova/tests/test_scheduler.py161
-rw-r--r--nova/tests/test_virt.py6
-rw-r--r--nova/tests/test_vmwareapi.py252
-rw-r--r--nova/tests/test_xenapi.py177
-rw-r--r--nova/tests/vmwareapi/__init__.py21
-rw-r--r--nova/tests/vmwareapi/db_fakes.py109
-rw-r--r--nova/tests/vmwareapi/stubs.py46
-rw-r--r--nova/tests/xenapi/stubs.py29
25 files changed, 2483 insertions, 728 deletions
diff --git a/nova/tests/api/openstack/extensions/foxinsocks.py b/nova/tests/api/openstack/extensions/foxinsocks.py
new file mode 100644
index 000000000..0860b51ac
--- /dev/null
+++ b/nova/tests/api/openstack/extensions/foxinsocks.py
@@ -0,0 +1,98 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+from nova import wsgi
+
+from nova.api.openstack import extensions
+
+
+class FoxInSocksController(wsgi.Controller):
+
+ def index(self, req):
+ return "Try to say this Mr. Knox, sir..."
+
+
+class Foxinsocks(object):
+
+ def __init__(self):
+ pass
+
+ def get_name(self):
+ return "Fox In Socks"
+
+ def get_alias(self):
+ return "FOXNSOX"
+
+ def get_description(self):
+ return "The Fox In Socks Extension"
+
+ def get_namespace(self):
+ return "http://www.fox.in.socks/api/ext/pie/v1.0"
+
+ def get_updated(self):
+ return "2011-01-22T13:25:27-06:00"
+
+ def get_resources(self):
+ resources = []
+ resource = extensions.ResourceExtension('foxnsocks',
+ FoxInSocksController())
+ resources.append(resource)
+ return resources
+
+ def get_actions(self):
+ actions = []
+ actions.append(extensions.ActionExtension('servers', 'add_tweedle',
+ self._add_tweedle))
+ actions.append(extensions.ActionExtension('servers', 'delete_tweedle',
+ self._delete_tweedle))
+ return actions
+
+ def get_response_extensions(self):
+ response_exts = []
+
+ def _goose_handler(res):
+ #NOTE: This only handles JSON responses.
+ # You can use content type header to test for XML.
+ data = json.loads(res.body)
+ data['flavor']['googoose'] = "Gooey goo for chewy chewing!"
+ return data
+
+ resp_ext = extensions.ResponseExtension('GET', '/v1.1/flavors/:(id)',
+ _goose_handler)
+ response_exts.append(resp_ext)
+
+ def _bands_handler(res):
+ #NOTE: This only handles JSON responses.
+ # You can use content type header to test for XML.
+ data = json.loads(res.body)
+ data['big_bands'] = 'Pig Bands!'
+ return data
+
+ resp_ext2 = extensions.ResponseExtension('GET', '/v1.1/flavors/:(id)',
+ _bands_handler)
+ response_exts.append(resp_ext2)
+ return response_exts
+
+ def _add_tweedle(self, input_dict, req, id):
+
+ return "Tweedle Beetle Added."
+
+ def _delete_tweedle(self, input_dict, req, id):
+
+ return "Tweedle Beetle Deleted."
diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py
index 56143114d..8b0729c35 100644
--- a/nova/tests/api/openstack/fakes.py
+++ b/nova/tests/api/openstack/fakes.py
@@ -35,6 +35,7 @@ from nova import utils
import nova.api.openstack.auth
from nova.api import openstack
from nova.api.openstack import auth
+from nova.api.openstack import versions
from nova.api.openstack import limits
from nova.auth.manager import User, Project
from nova.image import glance
@@ -85,7 +86,7 @@ def wsgi_app(inner_app10=None, inner_app11=None):
limits.RateLimitingMiddleware(inner_app11)))
mapper['/v1.0'] = api10
mapper['/v1.1'] = api11
- mapper['/'] = openstack.FaultWrapper(openstack.Versions())
+ mapper['/'] = openstack.FaultWrapper(versions.Versions())
return mapper
@@ -143,6 +144,21 @@ def stub_out_compute_api_snapshot(stubs):
stubs.Set(nova.compute.API, 'snapshot', snapshot)
+def stub_out_glance_add_image(stubs, sent_to_glance):
+ """
+ We return the metadata sent to glance by modifying the sent_to_glance dict
+ in place.
+ """
+ orig_add_image = glance_client.Client.add_image
+
+ def fake_add_image(context, metadata, data=None):
+ sent_to_glance['metadata'] = metadata
+ sent_to_glance['data'] = data
+ return orig_add_image(metadata, data)
+
+ stubs.Set(glance_client.Client, 'add_image', fake_add_image)
+
+
def stub_out_glance(stubs, initial_fixtures=None):
class FakeGlanceClient:
@@ -165,18 +181,23 @@ def stub_out_glance(stubs, initial_fixtures=None):
def fake_add_image(self, image_meta, data=None):
image_meta = copy.deepcopy(image_meta)
- id = ''.join(random.choice(string.letters) for _ in range(20))
- image_meta['id'] = id
+ image_id = ''.join(random.choice(string.letters)
+ for _ in range(20))
+ image_meta['id'] = image_id
self.fixtures.append(image_meta)
- return image_meta
+ return copy.deepcopy(image_meta)
def fake_update_image(self, image_id, image_meta, data=None):
+ for attr in ('created_at', 'updated_at', 'deleted_at', 'deleted'):
+ if attr in image_meta:
+ del image_meta[attr]
+
f = self._find_image(image_id)
if not f:
raise glance_exc.NotFound
f.update(image_meta)
- return f
+ return copy.deepcopy(f)
def fake_delete_image(self, image_id):
f = self._find_image(image_id)
@@ -185,9 +206,6 @@ def stub_out_glance(stubs, initial_fixtures=None):
self.fixtures.remove(f)
- ##def fake_delete_all(self):
- ## self.fixtures = []
-
def _find_image(self, image_id):
for f in self.fixtures:
if f['id'] == image_id:
@@ -204,10 +222,10 @@ def stub_out_glance(stubs, initial_fixtures=None):
stubs.Set(GlanceClient, 'add_image', fake.fake_add_image)
stubs.Set(GlanceClient, 'update_image', fake.fake_update_image)
stubs.Set(GlanceClient, 'delete_image', fake.fake_delete_image)
- #stubs.Set(GlanceClient, 'delete_all', fake.fake_delete_all)
class FakeToken(object):
+ # FIXME(sirp): let's not use id here
id = 0
def __init__(self, **kwargs):
diff --git a/nova/tests/api/openstack/test_extensions.py b/nova/tests/api/openstack/test_extensions.py
new file mode 100644
index 000000000..481d34ed1
--- /dev/null
+++ b/nova/tests/api/openstack/test_extensions.py
@@ -0,0 +1,236 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import stubout
+import unittest
+import webob
+import os.path
+
+from nova import context
+from nova import flags
+from nova.api import openstack
+from nova.api.openstack import extensions
+from nova.api.openstack import flavors
+from nova.tests.api.openstack import fakes
+import nova.wsgi
+
+FLAGS = flags.FLAGS
+
+response_body = "Try to say this Mr. Knox, sir..."
+
+
+class StubController(nova.wsgi.Controller):
+
+ def __init__(self, body):
+ self.body = body
+
+ def index(self, req):
+ return self.body
+
+
+class StubExtensionManager(object):
+
+ def __init__(self, resource_ext=None, action_ext=None, response_ext=None):
+ self.resource_ext = resource_ext
+ self.action_ext = action_ext
+ self.response_ext = response_ext
+
+ def get_name(self):
+ return "Tweedle Beetle Extension"
+
+ def get_alias(self):
+ return "TWDLBETL"
+
+ def get_description(self):
+ return "Provides access to Tweedle Beetles"
+
+ def get_resources(self):
+ resource_exts = []
+ if self.resource_ext:
+ resource_exts.append(self.resource_ext)
+ return resource_exts
+
+ def get_actions(self):
+ action_exts = []
+ if self.action_ext:
+ action_exts.append(self.action_ext)
+ return action_exts
+
+ def get_response_extensions(self):
+ response_exts = []
+ if self.response_ext:
+ response_exts.append(self.response_ext)
+ return response_exts
+
+
+class ExtensionControllerTest(unittest.TestCase):
+
+ def test_index(self):
+ app = openstack.APIRouterV11()
+ ext_midware = extensions.ExtensionMiddleware(app)
+ request = webob.Request.blank("/extensions")
+ response = request.get_response(ext_midware)
+ self.assertEqual(200, response.status_int)
+
+ def test_get_by_alias(self):
+ app = openstack.APIRouterV11()
+ ext_midware = extensions.ExtensionMiddleware(app)
+ request = webob.Request.blank("/extensions/FOXNSOX")
+ response = request.get_response(ext_midware)
+ self.assertEqual(200, response.status_int)
+
+
+class ResourceExtensionTest(unittest.TestCase):
+
+ def test_no_extension_present(self):
+ manager = StubExtensionManager(None)
+ app = openstack.APIRouterV11()
+ ext_midware = extensions.ExtensionMiddleware(app, manager)
+ request = webob.Request.blank("/blah")
+ response = request.get_response(ext_midware)
+ self.assertEqual(404, response.status_int)
+
+ def test_get_resources(self):
+ res_ext = extensions.ResourceExtension('tweedles',
+ StubController(response_body))
+ manager = StubExtensionManager(res_ext)
+ app = openstack.APIRouterV11()
+ ext_midware = extensions.ExtensionMiddleware(app, manager)
+ request = webob.Request.blank("/tweedles")
+ response = request.get_response(ext_midware)
+ self.assertEqual(200, response.status_int)
+ self.assertEqual(response_body, response.body)
+
+ def test_get_resources_with_controller(self):
+ res_ext = extensions.ResourceExtension('tweedles',
+ StubController(response_body))
+ manager = StubExtensionManager(res_ext)
+ app = openstack.APIRouterV11()
+ ext_midware = extensions.ExtensionMiddleware(app, manager)
+ request = webob.Request.blank("/tweedles")
+ response = request.get_response(ext_midware)
+ self.assertEqual(200, response.status_int)
+ self.assertEqual(response_body, response.body)
+
+
+class ExtensionManagerTest(unittest.TestCase):
+
+ response_body = "Try to say this Mr. Knox, sir..."
+
+ def setUp(self):
+ FLAGS.osapi_extensions_path = os.path.join(os.path.dirname(__file__),
+ "extensions")
+
+ def test_get_resources(self):
+ app = openstack.APIRouterV11()
+ ext_midware = extensions.ExtensionMiddleware(app)
+ request = webob.Request.blank("/foxnsocks")
+ response = request.get_response(ext_midware)
+ self.assertEqual(200, response.status_int)
+ self.assertEqual(response_body, response.body)
+
+
+class ActionExtensionTest(unittest.TestCase):
+
+ def setUp(self):
+ FLAGS.osapi_extensions_path = os.path.join(os.path.dirname(__file__),
+ "extensions")
+
+ def _send_server_action_request(self, url, body):
+ app = openstack.APIRouterV11()
+ ext_midware = extensions.ExtensionMiddleware(app)
+ request = webob.Request.blank(url)
+ request.method = 'POST'
+ request.content_type = 'application/json'
+ request.body = json.dumps(body)
+ response = request.get_response(ext_midware)
+ return response
+
+ def test_extended_action(self):
+ body = dict(add_tweedle=dict(name="test"))
+ response = self._send_server_action_request("/servers/1/action", body)
+ self.assertEqual(200, response.status_int)
+ self.assertEqual("Tweedle Beetle Added.", response.body)
+
+ body = dict(delete_tweedle=dict(name="test"))
+ response = self._send_server_action_request("/servers/1/action", body)
+ self.assertEqual(200, response.status_int)
+ self.assertEqual("Tweedle Beetle Deleted.", response.body)
+
+ def test_invalid_action_body(self):
+ body = dict(blah=dict(name="test")) # Doesn't exist
+ response = self._send_server_action_request("/servers/1/action", body)
+ self.assertEqual(501, response.status_int)
+
+ def test_invalid_action(self):
+ body = dict(blah=dict(name="test"))
+ response = self._send_server_action_request("/asdf/1/action", body)
+ self.assertEqual(404, response.status_int)
+
+
+class ResponseExtensionTest(unittest.TestCase):
+
+ def setUp(self):
+ super(ResponseExtensionTest, self).setUp()
+ self.stubs = stubout.StubOutForTesting()
+ fakes.FakeAuthManager.reset_fake_data()
+ fakes.FakeAuthDatabase.data = {}
+ fakes.stub_out_auth(self.stubs)
+ self.context = context.get_admin_context()
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+ super(ResponseExtensionTest, self).tearDown()
+
+ def test_get_resources_with_stub_mgr(self):
+
+ test_resp = "Gooey goo for chewy chewing!"
+
+ def _resp_handler(res):
+ # only handle JSON responses
+ data = json.loads(res.body)
+ data['flavor']['googoose'] = test_resp
+ return data
+
+ resp_ext = extensions.ResponseExtension('GET',
+ '/v1.1/flavors/:(id)',
+ _resp_handler)
+
+ manager = StubExtensionManager(None, None, resp_ext)
+ app = fakes.wsgi_app()
+ ext_midware = extensions.ExtensionMiddleware(app, manager)
+ request = webob.Request.blank("/v1.1/flavors/1")
+ request.environ['api.version'] = '1.1'
+ response = request.get_response(ext_midware)
+ self.assertEqual(200, response.status_int)
+ response_data = json.loads(response.body)
+ self.assertEqual(test_resp, response_data['flavor']['googoose'])
+
+ def test_get_resources_with_mgr(self):
+
+ test_resp = "Gooey goo for chewy chewing!"
+
+ app = fakes.wsgi_app()
+ ext_midware = extensions.ExtensionMiddleware(app)
+ request = webob.Request.blank("/v1.1/flavors/1")
+ request.environ['api.version'] = '1.1'
+ response = request.get_response(ext_midware)
+ self.assertEqual(200, response.status_int)
+ response_data = json.loads(response.body)
+ self.assertEqual(test_resp, response_data['flavor']['googoose'])
+ self.assertEqual("Pig Bands!", response_data['big_bands'])
diff --git a/nova/tests/api/openstack/test_flavors.py b/nova/tests/api/openstack/test_flavors.py
index 4f504808c..954d72adf 100644
--- a/nova/tests/api/openstack/test_flavors.py
+++ b/nova/tests/api/openstack/test_flavors.py
@@ -19,11 +19,10 @@ import json
import stubout
import webob
-from nova import test
-import nova.api
+import nova.db.api
from nova import context
-from nova.api.openstack import flavors
-from nova import db
+from nova import exception
+from nova import test
from nova.tests.api.openstack import fakes
@@ -48,6 +47,10 @@ def return_instance_types(context, num=2):
return instance_types
+def return_instance_type_not_found(context, flavorid):
+ raise exception.NotFound()
+
+
class FlavorsTest(test.TestCase):
def setUp(self):
super(FlavorsTest, self).setUp()
@@ -67,7 +70,7 @@ class FlavorsTest(test.TestCase):
self.stubs.UnsetAll()
super(FlavorsTest, self).tearDown()
- def test_get_flavor_list(self):
+ def test_get_flavor_list_v1_0(self):
req = webob.Request.blank('/v1.0/flavors')
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
@@ -84,7 +87,7 @@ class FlavorsTest(test.TestCase):
]
self.assertEqual(flavors, expected)
- def test_get_flavor_list_detail(self):
+ def test_get_flavor_list_detail_v1_0(self):
req = webob.Request.blank('/v1.0/flavors/detail')
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
@@ -105,7 +108,7 @@ class FlavorsTest(test.TestCase):
]
self.assertEqual(flavors, expected)
- def test_get_flavor_by_id(self):
+ def test_get_flavor_by_id_v1_0(self):
req = webob.Request.blank('/v1.0/flavors/12')
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
@@ -117,3 +120,144 @@ class FlavorsTest(test.TestCase):
"disk": "10",
}
self.assertEqual(flavor, expected)
+
+ def test_get_flavor_by_invalid_id(self):
+ self.stubs.Set(nova.db.api, "instance_type_get_by_flavor_id",
+ return_instance_type_not_found)
+ req = webob.Request.blank('/v1.0/flavors/asdf')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 404)
+
+ def test_get_flavor_by_id_v1_1(self):
+ req = webob.Request.blank('/v1.1/flavors/12')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
+ flavor = json.loads(res.body)["flavor"]
+ expected = {
+ "id": "12",
+ "name": "flavor 12",
+ "ram": "256",
+ "disk": "10",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/flavors/12",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/flavors/12",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/flavors/12",
+ },
+ ],
+ }
+ self.assertEqual(flavor, expected)
+
+ def test_get_flavor_list_v1_1(self):
+ req = webob.Request.blank('/v1.1/flavors')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
+ flavor = json.loads(res.body)["flavors"]
+ expected = [
+ {
+ "id": "1",
+ "name": "flavor 1",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/flavors/1",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/flavors/1",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/flavors/1",
+ },
+ ],
+ },
+ {
+ "id": "2",
+ "name": "flavor 2",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/flavors/2",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/flavors/2",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/flavors/2",
+ },
+ ],
+ },
+ ]
+ self.assertEqual(flavor, expected)
+
+ def test_get_flavor_list_detail_v1_1(self):
+ req = webob.Request.blank('/v1.1/flavors/detail')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
+ flavor = json.loads(res.body)["flavors"]
+ expected = [
+ {
+ "id": "1",
+ "name": "flavor 1",
+ "ram": "256",
+ "disk": "10",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/flavors/1",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/flavors/1",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/flavors/1",
+ },
+ ],
+ },
+ {
+ "id": "2",
+ "name": "flavor 2",
+ "ram": "256",
+ "disk": "10",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/flavors/2",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/flavors/2",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/flavors/2",
+ },
+ ],
+ },
+ ]
+ self.assertEqual(flavor, expected)
diff --git a/nova/tests/api/openstack/test_image_metadata.py b/nova/tests/api/openstack/test_image_metadata.py
new file mode 100644
index 000000000..9be753f84
--- /dev/null
+++ b/nova/tests/api/openstack/test_image_metadata.py
@@ -0,0 +1,166 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import stubout
+import unittest
+import webob
+
+
+from nova import flags
+from nova.api import openstack
+from nova.tests.api.openstack import fakes
+import nova.wsgi
+
+
+FLAGS = flags.FLAGS
+
+
+class ImageMetaDataTest(unittest.TestCase):
+
+ IMAGE_FIXTURES = [
+ {'status': 'active',
+ 'name': 'image1',
+ 'deleted': False,
+ 'container_format': None,
+ 'created_at': '2011-03-22T17:40:15',
+ 'disk_format': None,
+ 'updated_at': '2011-03-22T17:40:15',
+ 'id': '1',
+ 'location': 'file:///var/lib/glance/images/1',
+ 'is_public': True,
+ 'deleted_at': None,
+ 'properties': {
+ 'type': 'ramdisk',
+ 'key1': 'value1',
+ 'key2': 'value2'
+ },
+ 'size': 5882349},
+ {'status': 'active',
+ 'name': 'image2',
+ 'deleted': False,
+ 'container_format': None,
+ 'created_at': '2011-03-22T17:40:15',
+ 'disk_format': None,
+ 'updated_at': '2011-03-22T17:40:15',
+ 'id': '2',
+ 'location': 'file:///var/lib/glance/images/2',
+ 'is_public': True,
+ 'deleted_at': None,
+ 'properties': {
+ 'type': 'ramdisk',
+ 'key1': 'value1',
+ 'key2': 'value2'
+ },
+ 'size': 5882349},
+ ]
+
+ def setUp(self):
+ super(ImageMetaDataTest, self).setUp()
+ self.stubs = stubout.StubOutForTesting()
+ self.orig_image_service = FLAGS.image_service
+ FLAGS.image_service = 'nova.image.glance.GlanceImageService'
+ fakes.FakeAuthManager.auth_data = {}
+ fakes.FakeAuthDatabase.data = {}
+ fakes.stub_out_auth(self.stubs)
+ fakes.stub_out_glance(self.stubs, self.IMAGE_FIXTURES)
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+ FLAGS.image_service = self.orig_image_service
+ super(ImageMetaDataTest, self).tearDown()
+
+ def test_index(self):
+ req = webob.Request.blank('/v1.1/images/1/meta')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_int)
+ self.assertEqual('value1', res_dict['metadata']['key1'])
+
+ def test_show(self):
+ req = webob.Request.blank('/v1.1/images/1/meta/key1')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_int)
+ self.assertEqual('value1', res_dict['key1'])
+
+ def test_show_not_found(self):
+ req = webob.Request.blank('/v1.1/images/1/meta/key9')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(404, res.status_int)
+
+ def test_create(self):
+ req = webob.Request.blank('/v1.1/images/2/meta')
+ req.environ['api.version'] = '1.1'
+ req.method = 'POST'
+ req.body = '{"metadata": {"key9": "value9"}}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_int)
+ self.assertEqual('value9', res_dict['metadata']['key9'])
+ # other items should not be modified
+ self.assertEqual('value1', res_dict['metadata']['key1'])
+ self.assertEqual('value2', res_dict['metadata']['key2'])
+ self.assertEqual(1, len(res_dict))
+
+ def test_update_item(self):
+ req = webob.Request.blank('/v1.1/images/1/meta/key1')
+ req.environ['api.version'] = '1.1'
+ req.method = 'PUT'
+ req.body = '{"key1": "zz"}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(200, res.status_int)
+ res_dict = json.loads(res.body)
+ self.assertEqual('zz', res_dict['key1'])
+
+ def test_update_item_too_many_keys(self):
+ req = webob.Request.blank('/v1.1/images/1/meta/key1')
+ req.environ['api.version'] = '1.1'
+ req.method = 'PUT'
+ req.body = '{"key1": "value1", "key2": "value2"}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(400, res.status_int)
+
+ def test_update_item_body_uri_mismatch(self):
+ req = webob.Request.blank('/v1.1/images/1/meta/bad')
+ req.environ['api.version'] = '1.1'
+ req.method = 'PUT'
+ req.body = '{"key1": "value1"}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(400, res.status_int)
+
+ def test_delete(self):
+ req = webob.Request.blank('/v1.1/images/2/meta/key1')
+ req.environ['api.version'] = '1.1'
+ req.method = 'DELETE'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(200, res.status_int)
+
+ def test_delete_not_found(self):
+ req = webob.Request.blank('/v1.1/images/2/meta/blah')
+ req.environ['api.version'] = '1.1'
+ req.method = 'DELETE'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(404, res.status_int)
diff --git a/nova/tests/api/openstack/test_images.py b/nova/tests/api/openstack/test_images.py
index feb32ed9f..1cdccadd6 100644
--- a/nova/tests/api/openstack/test_images.py
+++ b/nova/tests/api/openstack/test_images.py
@@ -29,6 +29,7 @@ import tempfile
import stubout
import webob
+from glance import client as glance_client
from nova import context
from nova import exception
from nova import flags
@@ -43,78 +44,44 @@ FLAGS = flags.FLAGS
class BaseImageServiceTests(object):
-
"""Tasks to test for all image services"""
def test_create(self):
-
- fixture = {'name': 'test image',
- 'updated': None,
- 'created': None,
- 'status': None,
- 'instance_id': None,
- 'progress': None}
-
+ fixture = self._make_fixture('test image')
num_images = len(self.service.index(self.context))
- id = self.service.create(self.context, fixture)['id']
+ image_id = self.service.create(self.context, fixture)['id']
- self.assertNotEquals(None, id)
+ self.assertNotEquals(None, image_id)
self.assertEquals(num_images + 1,
len(self.service.index(self.context)))
def test_create_and_show_non_existing_image(self):
-
- fixture = {'name': 'test image',
- 'updated': None,
- 'created': None,
- 'status': None,
- 'instance_id': None,
- 'progress': None}
-
+ fixture = self._make_fixture('test image')
num_images = len(self.service.index(self.context))
- id = self.service.create(self.context, fixture)['id']
-
- self.assertNotEquals(None, id)
+ image_id = self.service.create(self.context, fixture)['id']
+ self.assertNotEquals(None, image_id)
self.assertRaises(exception.NotFound,
self.service.show,
self.context,
'bad image id')
def test_update(self):
-
- fixture = {'name': 'test image',
- 'updated': None,
- 'created': None,
- 'status': None,
- 'instance_id': None,
- 'progress': None}
-
- id = self.service.create(self.context, fixture)['id']
-
+ fixture = self._make_fixture('test image')
+ image_id = self.service.create(self.context, fixture)['id']
fixture['status'] = 'in progress'
- self.service.update(self.context, id, fixture)
- new_image_data = self.service.show(self.context, id)
+ self.service.update(self.context, image_id, fixture)
+
+ new_image_data = self.service.show(self.context, image_id)
self.assertEquals('in progress', new_image_data['status'])
def test_delete(self):
-
- fixtures = [
- {'name': 'test image 1',
- 'updated': None,
- 'created': None,
- 'status': None,
- 'instance_id': None,
- 'progress': None},
- {'name': 'test image 2',
- 'updated': None,
- 'created': None,
- 'status': None,
- 'instance_id': None,
- 'progress': None}]
+ fixture1 = self._make_fixture('test image 1')
+ fixture2 = self._make_fixture('test image 2')
+ fixtures = [fixture1, fixture2]
num_images = len(self.service.index(self.context))
self.assertEquals(0, num_images, str(self.service.index(self.context)))
@@ -132,6 +99,22 @@ class BaseImageServiceTests(object):
num_images = len(self.service.index(self.context))
self.assertEquals(1, num_images)
+ def test_index(self):
+ fixture = self._make_fixture('test image')
+ image_id = self.service.create(self.context, fixture)['id']
+ image_metas = self.service.index(self.context)
+ expected = [{'id': 'DONTCARE', 'name': 'test image'}]
+ self.assertDictListMatch(image_metas, expected)
+
+ @staticmethod
+ def _make_fixture(name):
+ fixture = {'name': 'test image',
+ 'updated': None,
+ 'created': None,
+ 'status': None,
+ 'is_public': True}
+ return fixture
+
class LocalImageServiceTest(test.TestCase,
BaseImageServiceTests):
@@ -167,8 +150,17 @@ class LocalImageServiceTest(test.TestCase,
class GlanceImageServiceTest(test.TestCase,
BaseImageServiceTests):
- """Tests the local image service"""
+ """Tests the Glance image service, in particular that metadata translation
+ works properly.
+
+ At a high level, the translations involved are:
+ 1. Glance -> ImageService - This is needed so we can support
+ multple ImageServices (Glance, Local, etc)
+
+ 2. ImageService -> API - This is needed so we can support multple
+ APIs (OpenStack, EC2)
+ """
def setUp(self):
super(GlanceImageServiceTest, self).setUp()
self.stubs = stubout.StubOutForTesting()
@@ -176,41 +168,53 @@ class GlanceImageServiceTest(test.TestCase,
fakes.stub_out_compute_api_snapshot(self.stubs)
service_class = 'nova.image.glance.GlanceImageService'
self.service = utils.import_object(service_class)
- self.context = context.RequestContext(None, None)
+ self.context = context.RequestContext(1, None)
self.service.delete_all()
+ self.sent_to_glance = {}
+ fakes.stub_out_glance_add_image(self.stubs, self.sent_to_glance)
def tearDown(self):
self.stubs.UnsetAll()
super(GlanceImageServiceTest, self).tearDown()
+ def test_create_with_instance_id(self):
+ """Ensure instance_id is persisted as an image-property"""
+ fixture = {'name': 'test image',
+ 'is_public': False,
+ 'properties': {'instance_id': '42', 'user_id': '1'}}
-class ImageControllerWithGlanceServiceTest(test.TestCase):
+ image_id = self.service.create(self.context, fixture)['id']
+ expected = fixture
+ self.assertDictMatch(self.sent_to_glance['metadata'], expected)
+
+ image_meta = self.service.show(self.context, image_id)
+ expected = {'id': image_id,
+ 'name': 'test image',
+ 'is_public': False,
+ 'properties': {'instance_id': '42', 'user_id': '1'}}
+ self.assertDictMatch(image_meta, expected)
+
+ image_metas = self.service.detail(self.context)
+ self.assertDictMatch(image_metas[0], expected)
+
+ def test_create_without_instance_id(self):
+ """
+ Ensure we can create an image without having to specify an
+ instance_id. Public images are an example of an image not tied to an
+ instance.
+ """
+ fixture = {'name': 'test image'}
+ image_id = self.service.create(self.context, fixture)['id']
+
+ expected = {'name': 'test image', 'properties': {}}
+ self.assertDictMatch(self.sent_to_glance['metadata'], expected)
+
+class ImageControllerWithGlanceServiceTest(test.TestCase):
"""Test of the OpenStack API /images application controller"""
- # Registered images at start of each test.
- now = datetime.datetime.utcnow()
- IMAGE_FIXTURES = [
- {'id': '23g2ogk23k4hhkk4k42l',
- 'imageId': '23g2ogk23k4hhkk4k42l',
- 'name': 'public image #1',
- 'created_at': now.isoformat(),
- 'updated_at': now.isoformat(),
- 'deleted_at': None,
- 'deleted': False,
- 'is_public': True,
- 'status': 'available',
- 'image_type': 'kernel'},
- {'id': 'slkduhfas73kkaskgdas',
- 'imageId': 'slkduhfas73kkaskgdas',
- 'name': 'public image #2',
- 'created_at': now.isoformat(),
- 'updated_at': now.isoformat(),
- 'deleted_at': None,
- 'deleted': False,
- 'is_public': True,
- 'status': 'available',
- 'image_type': 'ramdisk'}]
+ NOW_GLANCE_FORMAT = "2010-10-11T10:30:22"
+ NOW_API_FORMAT = "2010-10-11T10:30:22Z"
def setUp(self):
super(ImageControllerWithGlanceServiceTest, self).setUp()
@@ -223,7 +227,8 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
fakes.stub_out_rate_limiting(self.stubs)
fakes.stub_out_auth(self.stubs)
fakes.stub_out_key_pair_funcs(self.stubs)
- fakes.stub_out_glance(self.stubs, initial_fixtures=self.IMAGE_FIXTURES)
+ fixtures = self._make_image_fixtures()
+ fakes.stub_out_glance(self.stubs, initial_fixtures=fixtures)
def tearDown(self):
self.stubs.UnsetAll()
@@ -233,34 +238,94 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
def test_get_image_index(self):
req = webob.Request.blank('/v1.0/images')
res = req.get_response(fakes.wsgi_app())
- res_dict = json.loads(res.body)
+ image_metas = json.loads(res.body)['images']
- fixture_index = [dict(id=f['id'], name=f['name']) for f
- in self.IMAGE_FIXTURES]
+ expected = [{'id': 123, 'name': 'public image'},
+ {'id': 124, 'name': 'queued backup'},
+ {'id': 125, 'name': 'saving backup'},
+ {'id': 126, 'name': 'active backup'},
+ {'id': 127, 'name': 'killed backup'}]
- for image in res_dict['images']:
- self.assertEquals(1, fixture_index.count(image),
- "image %s not in fixture index!" % str(image))
+ self.assertDictListMatch(image_metas, expected)
def test_get_image_details(self):
req = webob.Request.blank('/v1.0/images/detail')
res = req.get_response(fakes.wsgi_app())
- res_dict = json.loads(res.body)
-
- for image in self.IMAGE_FIXTURES:
- expected = {
- 'id': abs(hash(image['imageId'])),
- 'name': image['name'],
- 'status': 'active',
- }
- self.assertTrue(expected in res_dict['images'])
-
- def test_show_image(self):
- expected = self.IMAGE_FIXTURES[0]
- id = abs(hash(expected['id']))
- expected_time = self.now.strftime('%Y-%m-%dT%H:%M:%SZ')
- req = webob.Request.blank('/v1.0/images/%s' % id)
+ image_metas = json.loads(res.body)['images']
+
+ now = self.NOW_API_FORMAT
+ expected = [
+ {'id': 123, 'name': 'public image', 'updated': now,
+ 'created': now, 'status': 'ACTIVE'},
+ {'id': 124, 'name': 'queued backup', 'serverId': 42,
+ 'updated': now, 'created': now,
+ 'status': 'QUEUED'},
+ {'id': 125, 'name': 'saving backup', 'serverId': 42,
+ 'updated': now, 'created': now,
+ 'status': 'SAVING', 'progress': 0},
+ {'id': 126, 'name': 'active backup', 'serverId': 42,
+ 'updated': now, 'created': now,
+ 'status': 'ACTIVE'},
+ {'id': 127, 'name': 'killed backup', 'serverId': 42,
+ 'updated': now, 'created': now,
+ 'status': 'FAILED'}
+ ]
+
+ self.assertDictListMatch(image_metas, expected)
+
+ def test_get_image_found(self):
+ req = webob.Request.blank('/v1.0/images/123')
+ res = req.get_response(fakes.wsgi_app())
+ image_meta = json.loads(res.body)['image']
+ expected = {'id': 123, 'name': 'public image',
+ 'updated': self.NOW_API_FORMAT,
+ 'created': self.NOW_API_FORMAT, 'status': 'ACTIVE'}
+ self.assertDictMatch(image_meta, expected)
+
+ def test_get_image_non_existent(self):
+ req = webob.Request.blank('/v1.0/images/4242')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 404)
+
+ def test_get_image_not_owned(self):
+ """We should return a 404 if we request an image that doesn't belong
+ to us
+ """
+ req = webob.Request.blank('/v1.0/images/128')
res = req.get_response(fakes.wsgi_app())
- actual = json.loads(res.body)['image']
- self.assertEqual(expected_time, actual['created_at'])
- self.assertEqual(expected_time, actual['updated_at'])
+ self.assertEqual(res.status_int, 404)
+
+ @classmethod
+ def _make_image_fixtures(cls):
+ image_id = 123
+ base_attrs = {'created_at': cls.NOW_GLANCE_FORMAT,
+ 'updated_at': cls.NOW_GLANCE_FORMAT,
+ 'deleted_at': None,
+ 'deleted': False}
+
+ fixtures = []
+
+ def add_fixture(**kwargs):
+ kwargs.update(base_attrs)
+ fixtures.append(kwargs)
+
+ # Public image
+ add_fixture(id=image_id, name='public image', is_public=True,
+ status='active', properties={})
+ image_id += 1
+
+ # Backup for User 1
+ backup_properties = {'instance_id': '42', 'user_id': '1'}
+ for status in ('queued', 'saving', 'active', 'killed'):
+ add_fixture(id=image_id, name='%s backup' % status,
+ is_public=False, status=status,
+ properties=backup_properties)
+ image_id += 1
+
+ # Backup for User 2
+ other_backup_properties = {'instance_id': '43', 'user_id': '2'}
+ add_fixture(id=image_id, name='someone elses backup', is_public=False,
+ status='active', properties=other_backup_properties)
+ image_id += 1
+
+ return fixtures
diff --git a/nova/tests/api/openstack/test_server_metadata.py b/nova/tests/api/openstack/test_server_metadata.py
new file mode 100644
index 000000000..c8d456472
--- /dev/null
+++ b/nova/tests/api/openstack/test_server_metadata.py
@@ -0,0 +1,164 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import stubout
+import unittest
+import webob
+
+
+from nova.api import openstack
+from nova.tests.api.openstack import fakes
+import nova.wsgi
+
+
+def return_create_instance_metadata(context, server_id, metadata):
+ return stub_server_metadata()
+
+
+def return_server_metadata(context, server_id):
+ return stub_server_metadata()
+
+
+def return_empty_server_metadata(context, server_id):
+ return {}
+
+
+def delete_server_metadata(context, server_id, key):
+ pass
+
+
+def stub_server_metadata():
+ metadata = {
+ "key1": "value1",
+ "key2": "value2",
+ "key3": "value3",
+ "key4": "value4",
+ "key5": "value5"
+ }
+ return metadata
+
+
+class ServerMetaDataTest(unittest.TestCase):
+
+ def setUp(self):
+ super(ServerMetaDataTest, self).setUp()
+ self.stubs = stubout.StubOutForTesting()
+ fakes.FakeAuthManager.auth_data = {}
+ fakes.FakeAuthDatabase.data = {}
+ fakes.stub_out_auth(self.stubs)
+ fakes.stub_out_key_pair_funcs(self.stubs)
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+ super(ServerMetaDataTest, self).tearDown()
+
+ def test_index(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_get',
+ return_server_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_int)
+ self.assertEqual('value1', res_dict['metadata']['key1'])
+
+ def test_index_no_data(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_get',
+ return_empty_server_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_int)
+ self.assertEqual(0, len(res_dict['metadata']))
+
+ def test_show(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_get',
+ return_server_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta/key5')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_int)
+ self.assertEqual('value5', res_dict['key5'])
+
+ def test_show_meta_not_found(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_get',
+ return_empty_server_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta/key6')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(404, res.status_int)
+
+ def test_delete(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_delete',
+ delete_server_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta/key5')
+ req.environ['api.version'] = '1.1'
+ req.method = 'DELETE'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(200, res.status_int)
+
+ def test_create(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_update_or_create',
+ return_create_instance_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta')
+ req.environ['api.version'] = '1.1'
+ req.method = 'POST'
+ req.body = '{"metadata": {"key1": "value1"}}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_int)
+ self.assertEqual('value1', res_dict['metadata']['key1'])
+
+ def test_update_item(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_update_or_create',
+ return_create_instance_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta/key1')
+ req.environ['api.version'] = '1.1'
+ req.method = 'PUT'
+ req.body = '{"key1": "value1"}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(200, res.status_int)
+ res_dict = json.loads(res.body)
+ self.assertEqual('value1', res_dict['key1'])
+
+ def test_update_item_too_many_keys(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_update_or_create',
+ return_create_instance_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta/key1')
+ req.environ['api.version'] = '1.1'
+ req.method = 'PUT'
+ req.body = '{"key1": "value1", "key2": "value2"}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(400, res.status_int)
+
+ def test_update_item_body_uri_mismatch(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_update_or_create',
+ return_create_instance_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta/bad')
+ req.environ['api.version'] = '1.1'
+ req.method = 'PUT'
+ req.body = '{"key1": "value1"}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(400, res.status_int)
diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py
index 9462bd40b..737b43c7b 100644
--- a/nova/tests/api/openstack/test_servers.py
+++ b/nova/tests/api/openstack/test_servers.py
@@ -26,6 +26,7 @@ import webob
from nova import context
from nova import db
+from nova import exception
from nova import flags
from nova import test
import nova.api.openstack
@@ -164,6 +165,33 @@ class ServersTest(test.TestCase):
self.assertEqual(res_dict['server']['id'], 1)
self.assertEqual(res_dict['server']['name'], 'server1')
+ def test_get_server_by_id_v11(self):
+ req = webob.Request.blank('/v1.1/servers/1')
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(res_dict['server']['id'], 1)
+ self.assertEqual(res_dict['server']['name'], 'server1')
+
+ expected_links = [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/servers/1",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/servers/1",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/servers/1",
+ },
+ ]
+
+ print res_dict['server']
+ self.assertEqual(res_dict['server']['links'], expected_links)
+
def test_get_server_by_id_with_addresses(self):
private = "192.168.0.3"
public = ["1.2.3.4"]
@@ -186,7 +214,6 @@ class ServersTest(test.TestCase):
new_return_server = return_server_with_addresses(private, public)
self.stubs.Set(nova.db.api, 'instance_get', new_return_server)
req = webob.Request.blank('/v1.1/servers/1')
- req.environ['api.version'] = '1.1'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res_dict['server']['id'], 1)
@@ -211,6 +238,35 @@ class ServersTest(test.TestCase):
self.assertEqual(s.get('imageId', None), None)
i += 1
+ def test_get_server_list_v11(self):
+ req = webob.Request.blank('/v1.1/servers')
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+
+ for i, s in enumerate(res_dict['servers']):
+ self.assertEqual(s['id'], i)
+ self.assertEqual(s['name'], 'server%d' % i)
+ self.assertEqual(s.get('imageId', None), None)
+
+ expected_links = [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/servers/%d" % (i,),
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/servers/%d" % (i,),
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/servers/%d" % (i,),
+ },
+ ]
+
+ self.assertEqual(s['links'], expected_links)
+
def test_get_servers_with_limit(self):
req = webob.Request.blank('/v1.0/servers?limit=3')
res = req.get_response(fakes.wsgi_app())
@@ -239,6 +295,36 @@ class ServersTest(test.TestCase):
servers = json.loads(res.body)['servers']
self.assertEqual([s['id'] for s in servers], [1, 2])
+ def test_get_servers_with_bad_limit(self):
+ req = webob.Request.blank('/v1.0/servers?limit=asdf&offset=1')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+ self.assertTrue(res.body.find('limit param') > -1)
+
+ def test_get_servers_with_bad_offset(self):
+ req = webob.Request.blank('/v1.0/servers?limit=2&offset=asdf')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+ self.assertTrue(res.body.find('offset param') > -1)
+
+ def test_get_servers_with_marker(self):
+ req = webob.Request.blank('/v1.1/servers?marker=2')
+ res = req.get_response(fakes.wsgi_app())
+ servers = json.loads(res.body)['servers']
+ self.assertEqual([s['id'] for s in servers], [3, 4])
+
+ def test_get_servers_with_limit_and_marker(self):
+ req = webob.Request.blank('/v1.1/servers?limit=2&marker=1')
+ res = req.get_response(fakes.wsgi_app())
+ servers = json.loads(res.body)['servers']
+ self.assertEqual([s['id'] for s in servers], [2, 3])
+
+ def test_get_servers_with_bad_marker(self):
+ req = webob.Request.blank('/v1.1/servers?limit=2&marker=asdf')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+ self.assertTrue(res.body.find('marker param') > -1)
+
def _setup_for_create_instance(self):
"""Shared implementation for tests below that create instance"""
def instance_create(context, inst):
@@ -428,7 +514,6 @@ class ServersTest(test.TestCase):
def test_get_all_server_details_v1_1(self):
req = webob.Request.blank('/v1.1/servers/detail')
- req.environ['api.version'] = '1.1'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
@@ -1230,3 +1315,57 @@ class TestServerInstanceCreation(test.TestCase):
server = dom.childNodes[0]
self.assertEquals(server.nodeName, 'server')
self.assertTrue(server.getAttribute('adminPass').startswith('fake'))
+
+
+class TestGetKernelRamdiskFromImage(test.TestCase):
+ """
+ If we're building from an AMI-style image, we need to be able to fetch the
+ kernel and ramdisk associated with the machine image. This information is
+ stored with the image metadata and return via the ImageService.
+
+ These tests ensure that we parse the metadata return the ImageService
+ correctly and that we handle failure modes appropriately.
+ """
+
+ def test_status_not_active(self):
+ """We should only allow fetching of kernel and ramdisk information if
+ we have a 'fully-formed' image, aka 'active'
+ """
+ image_meta = {'id': 1, 'status': 'queued'}
+ self.assertRaises(exception.Invalid, self._get_k_r, image_meta)
+
+ def test_not_ami(self):
+ """Anything other than ami should return no kernel and no ramdisk"""
+ image_meta = {'id': 1, 'status': 'active',
+ 'properties': {'disk_format': 'vhd'}}
+ kernel_id, ramdisk_id = self._get_k_r(image_meta)
+ self.assertEqual(kernel_id, None)
+ self.assertEqual(ramdisk_id, None)
+
+ def test_ami_no_kernel(self):
+ """If an ami is missing a kernel it should raise NotFound"""
+ image_meta = {'id': 1, 'status': 'active',
+ 'properties': {'disk_format': 'ami', 'ramdisk_id': 1}}
+ self.assertRaises(exception.NotFound, self._get_k_r, image_meta)
+
+ def test_ami_no_ramdisk(self):
+ """If an ami is missing a ramdisk it should raise NotFound"""
+ image_meta = {'id': 1, 'status': 'active',
+ 'properties': {'disk_format': 'ami', 'kernel_id': 1}}
+ self.assertRaises(exception.NotFound, self._get_k_r, image_meta)
+
+ def test_ami_kernel_ramdisk_present(self):
+ """Return IDs if both kernel and ramdisk are present"""
+ image_meta = {'id': 1, 'status': 'active',
+ 'properties': {'disk_format': 'ami', 'kernel_id': 1,
+ 'ramdisk_id': 2}}
+ kernel_id, ramdisk_id = self._get_k_r(image_meta)
+ self.assertEqual(kernel_id, 1)
+ self.assertEqual(ramdisk_id, 2)
+
+ @staticmethod
+ def _get_k_r(image_meta):
+ """Rebinding function to a shorter name for convenience"""
+ kernel_id, ramdisk_id = \
+ servers.Controller._do_get_kernel_ramdisk_from_image(image_meta)
+ return kernel_id, ramdisk_id
diff --git a/nova/tests/api/openstack/test_versions.py b/nova/tests/api/openstack/test_versions.py
new file mode 100644
index 000000000..ebb59a9a6
--- /dev/null
+++ b/nova/tests/api/openstack/test_versions.py
@@ -0,0 +1,97 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010-2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import webob
+
+from nova import context
+from nova import test
+from nova.tests.api.openstack import fakes
+from nova.api.openstack import views
+
+
+class VersionsTest(test.TestCase):
+ def setUp(self):
+ super(VersionsTest, self).setUp()
+ self.context = context.get_admin_context()
+
+ def tearDown(self):
+ super(VersionsTest, self).tearDown()
+
+ def test_get_version_list(self):
+ req = webob.Request.blank('/')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
+ versions = json.loads(res.body)["versions"]
+ expected = [
+ {
+ "id": "v1.1",
+ "status": "CURRENT",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1",
+ }
+ ],
+ },
+ {
+ "id": "v1.0",
+ "status": "DEPRECATED",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.0",
+ }
+ ],
+ },
+ ]
+ self.assertEqual(versions, expected)
+
+ def test_view_builder(self):
+ base_url = "http://example.org/"
+
+ version_data = {
+ "id": "3.2.1",
+ "status": "CURRENT",
+ }
+
+ expected = {
+ "id": "3.2.1",
+ "status": "CURRENT",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://example.org/3.2.1",
+ },
+ ],
+ }
+
+ builder = views.versions.ViewBuilder(base_url)
+ output = builder.build(version_data)
+
+ self.assertEqual(output, expected)
+
+ def test_generate_href(self):
+ base_url = "http://example.org/app/"
+ version_number = "v1.4.6"
+
+ expected = "http://example.org/app/v1.4.6"
+
+ builder = views.versions.ViewBuilder(base_url)
+ actual = builder.generate_href(version_number)
+
+ self.assertEqual(actual, expected)
diff --git a/nova/tests/db/fakes.py b/nova/tests/db/fakes.py
index 2d25d5fc5..21a5481bd 100644
--- a/nova/tests/db/fakes.py
+++ b/nova/tests/db/fakes.py
@@ -24,7 +24,7 @@ from nova import test
from nova import utils
-def stub_out_db_instance_api(stubs):
+def stub_out_db_instance_api(stubs, injected=True):
""" Stubs out the db API for creating Instances """
INSTANCE_TYPES = {
@@ -56,6 +56,25 @@ def stub_out_db_instance_api(stubs):
flavorid=5,
rxtx_cap=5)}
+ network_fields = {
+ 'id': 'test',
+ 'bridge': 'xenbr0',
+ 'label': 'test_network',
+ 'netmask': '255.255.255.0',
+ 'cidr_v6': 'fe80::a00:0/120',
+ 'netmask_v6': '120',
+ 'gateway': '10.0.0.1',
+ 'gateway_v6': 'fe80::a00:1',
+ 'broadcast': '10.0.0.255',
+ 'dns': '10.0.0.2',
+ 'ra_server': None,
+ 'injected': injected}
+
+ fixed_ip_fields = {
+ 'address': '10.0.0.3',
+ 'address_v6': 'fe80::a00:3',
+ 'network_id': 'test'}
+
class FakeModel(object):
""" Stubs out for model """
def __init__(self, values):
@@ -76,38 +95,29 @@ def stub_out_db_instance_api(stubs):
def fake_instance_type_get_by_name(context, name):
return INSTANCE_TYPES[name]
- def fake_instance_create(values):
- """ Stubs out the db.instance_create method """
-
- type_data = INSTANCE_TYPES[values['instance_type']]
-
- base_options = {
- 'name': values['name'],
- 'id': values['id'],
- 'reservation_id': utils.generate_uid('r'),
- 'image_id': values['image_id'],
- 'kernel_id': values['kernel_id'],
- 'ramdisk_id': values['ramdisk_id'],
- 'state_description': 'scheduling',
- 'user_id': values['user_id'],
- 'project_id': values['project_id'],
- 'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
- 'instance_type': values['instance_type'],
- 'memory_mb': type_data['memory_mb'],
- 'mac_address': values['mac_address'],
- 'vcpus': type_data['vcpus'],
- 'local_gb': type_data['local_gb'],
- 'os_type': values['os_type']}
-
- return FakeModel(base_options)
-
def fake_network_get_by_instance(context, instance_id):
- fields = {
- 'bridge': 'xenbr0',
- }
- return FakeModel(fields)
+ return FakeModel(network_fields)
+
+ def fake_network_get_all_by_instance(context, instance_id):
+ return [FakeModel(network_fields)]
+
+ def fake_instance_get_fixed_address(context, instance_id):
+ return FakeModel(fixed_ip_fields).address
+
+ def fake_instance_get_fixed_address_v6(context, instance_id):
+ return FakeModel(fixed_ip_fields).address
+
+ def fake_fixed_ip_get_all_by_instance(context, instance_id):
+ return [FakeModel(fixed_ip_fields)]
- stubs.Set(db, 'instance_create', fake_instance_create)
stubs.Set(db, 'network_get_by_instance', fake_network_get_by_instance)
stubs.Set(db, 'instance_type_get_all', fake_instance_type_get_all)
stubs.Set(db, 'instance_type_get_by_name', fake_instance_type_get_by_name)
+ stubs.Set(db, 'instance_get_fixed_address',
+ fake_instance_get_fixed_address)
+ stubs.Set(db, 'instance_get_fixed_address_v6',
+ fake_instance_get_fixed_address_v6)
+ stubs.Set(db, 'network_get_all_by_instance',
+ fake_network_get_all_by_instance)
+ stubs.Set(db, 'fixed_ip_get_all_by_instance',
+ fake_fixed_ip_get_all_by_instance)
diff --git a/nova/tests/fake_utils.py b/nova/tests/fake_utils.py
new file mode 100644
index 000000000..823c775cb
--- /dev/null
+++ b/nova/tests/fake_utils.py
@@ -0,0 +1,106 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""This modules stubs out functions in nova.utils
+"""
+
+import re
+import types
+
+from eventlet import greenthread
+
+from nova import exception
+from nova import log as logging
+from nova import utils
+
+LOG = logging.getLogger('nova.tests.fake_utils')
+
+_fake_execute_repliers = []
+_fake_execute_log = []
+
+
+def fake_execute_get_log():
+ return _fake_execute_log
+
+
+def fake_execute_clear_log():
+ global _fake_execute_log
+ _fake_execute_log = []
+
+
+def fake_execute_set_repliers(repliers):
+ """Allows the client to configure replies to commands"""
+ global _fake_execute_repliers
+ _fake_execute_repliers = repliers
+
+
+def fake_execute_default_reply_handler(*ignore_args, **ignore_kwargs):
+ """A reply handler for commands that haven't been added to the reply
+ list. Returns empty strings for stdout and stderr
+ """
+ return '', ''
+
+
+def fake_execute(*cmd_parts, **kwargs):
+ """This function stubs out execute, optionally executing
+ a preconfigued function to return expected data
+ """
+ global _fake_execute_repliers
+
+ process_input = kwargs.get('process_input', None)
+ addl_env = kwargs.get('addl_env', None)
+ check_exit_code = kwargs.get('check_exit_code', 0)
+ cmd_str = ' '.join(str(part) for part in cmd_parts)
+
+ LOG.debug(_("Faking execution of cmd (subprocess): %s"), cmd_str)
+ _fake_execute_log.append(cmd_str)
+
+ reply_handler = fake_execute_default_reply_handler
+
+ for fake_replier in _fake_execute_repliers:
+ if re.match(fake_replier[0], cmd_str):
+ reply_handler = fake_replier[1]
+ LOG.debug(_('Faked command matched %s') % fake_replier[0])
+ break
+
+ if isinstance(reply_handler, basestring):
+ # If the reply handler is a string, return it as stdout
+ reply = reply_handler, ''
+ else:
+ try:
+ # Alternative is a function, so call it
+ reply = reply_handler(cmd_parts,
+ process_input=process_input,
+ addl_env=addl_env,
+ check_exit_code=check_exit_code)
+ except exception.ProcessExecutionError as e:
+ LOG.debug(_('Faked command raised an exception %s' % str(e)))
+ raise
+
+ stdout = reply[0]
+ stderr = reply[1]
+ LOG.debug(_("Reply to faked command is stdout='%(stdout)s' "
+ "stderr='%(stderr)s'") % locals())
+
+ # Replicate the sleep call in the real function
+ greenthread.sleep(0)
+ return reply
+
+
+def stub_out_utils_execute(stubs):
+ fake_execute_set_repliers([])
+ fake_execute_clear_log()
+ stubs.Set(utils, 'execute', fake_execute)
diff --git a/nova/tests/image/test_glance.py b/nova/tests/image/test_glance.py
index f1f8504f3..d03aa9cc8 100644
--- a/nova/tests/image/test_glance.py
+++ b/nova/tests/image/test_glance.py
@@ -19,6 +19,8 @@
import datetime
import unittest
+from nova import context
+from nova import test
from nova.image import glance
@@ -29,14 +31,14 @@ class StubGlanceClient(object):
self.add_response = add_response
self.update_response = update_response
- def get_image_meta(self, id):
- return self.images[id]
+ def get_image_meta(self, image_id):
+ return self.images[image_id]
def get_images_detailed(self):
return self.images.itervalues()
- def get_image(self, id):
- return self.images[id], []
+ def get_image(self, image_id):
+ return self.images[image_id], []
def add_image(self, metadata, data):
return self.add_response
@@ -46,143 +48,144 @@ class StubGlanceClient(object):
class NullWriter(object):
+ """Used to test ImageService.get which takes a writer object"""
def write(self, *arg, **kwargs):
pass
-class TestGlanceImageServiceDatetimes(unittest.TestCase):
+class BaseGlanceTest(unittest.TestCase):
+ NOW_GLANCE_FORMAT = "2010-10-11T10:30:22"
+ NOW_DATETIME = datetime.datetime(2010, 10, 11, 10, 30, 22)
def setUp(self):
+ # FIXME(sirp): we can probably use stubs library here rather than
+ # dependency injection
self.client = StubGlanceClient(None)
self.service = glance.GlanceImageService(self.client)
+ self.context = context.RequestContext(None, None)
+ def assertDateTimesFilled(self, image_meta):
+ self.assertEqual(image_meta['created_at'], self.NOW_DATETIME)
+ self.assertEqual(image_meta['updated_at'], self.NOW_DATETIME)
+ self.assertEqual(image_meta['deleted_at'], self.NOW_DATETIME)
+
+ def assertDateTimesEmpty(self, image_meta):
+ self.assertEqual(image_meta['updated_at'], None)
+ self.assertEqual(image_meta['deleted_at'], None)
+
+
+class TestGlanceImageServiceProperties(BaseGlanceTest):
def test_show_passes_through_to_client(self):
- self.client.images = {'xyz': {'foo': 'bar'}}
- self.assertEqual(self.service.show({}, 'xyz'), {'foo': 'bar'})
+ """Ensure attributes which aren't BASE_IMAGE_ATTRS are stored in the
+ properties dict
+ """
+ fixtures = {'image1': {'name': 'image1', 'is_public': True,
+ 'foo': 'bar',
+ 'properties': {'prop1': 'propvalue1'}}}
+ self.client.images = fixtures
+ image_meta = self.service.show(self.context, 'image1')
+
+ expected = {'name': 'image1', 'is_public': True,
+ 'properties': {'prop1': 'propvalue1', 'foo': 'bar'}}
+ self.assertEqual(image_meta, expected)
def test_detail_passes_through_to_client(self):
- self.client.images = {1: {'foo': 'bar'}}
- self.assertEqual(list(self.service.detail({})), [{'foo': 'bar'}])
-
- def test_show_makes_create_datetimes(self):
- create_time = datetime.datetime.utcnow()
- self.client.images = {'xyz': {
- 'id': "id",
- 'name': "my awesome image",
- 'created_at': create_time.isoformat(),
- }}
- actual = self.service.show({}, 'xyz')
- self.assertEqual(actual['created_at'], create_time)
-
- def test_show_makes_update_datetimes(self):
- update_time = datetime.datetime.utcnow()
- self.client.images = {'abc': {
- 'id': "id",
- 'name': "my okay image",
- 'updated_at': update_time.isoformat(),
- }}
- actual = self.service.show({}, 'abc')
- self.assertEqual(actual['updated_at'], update_time)
-
- def test_show_makes_delete_datetimes(self):
- delete_time = datetime.datetime.utcnow()
- self.client.images = {'123': {
- 'id': "123",
- 'name': "my lame image",
- 'deleted_at': delete_time.isoformat(),
- }}
- actual = self.service.show({}, '123')
- self.assertEqual(actual['deleted_at'], delete_time)
-
- def test_show_handles_deleted_at_none(self):
- self.client.images = {'747': {
- 'id': "747",
- 'name': "not deleted",
- 'deleted_at': None,
- }}
- actual = self.service.show({}, '747')
- self.assertEqual(actual['deleted_at'], None)
-
- def test_detail_handles_timestamps(self):
- now = datetime.datetime.utcnow()
- image1 = {
- 'id': 1,
- 'name': 'image 1',
- 'created_at': now.isoformat(),
- 'updated_at': now.isoformat(),
- 'deleted_at': None,
- }
- image2 = {
- 'id': 2,
- 'name': 'image 2',
- 'deleted_at': now.isoformat(),
- }
- self.client.images = {1: image1, 2: image2}
- i1, i2 = self.service.detail({})
- self.assertEqual(i1['created_at'], now)
- self.assertEqual(i1['updated_at'], now)
- self.assertEqual(i1['deleted_at'], None)
- self.assertEqual(i2['deleted_at'], now)
-
- def test_get_handles_timestamps(self):
- now = datetime.datetime.utcnow()
- self.client.images = {'abcd': {
- 'id': 'abcd',
- 'name': 'nifty image',
- 'created_at': now.isoformat(),
- 'updated_at': now.isoformat(),
- 'deleted_at': now.isoformat(),
- }}
- actual = self.service.get({}, 'abcd', NullWriter())
- for attr in ('created_at', 'updated_at', 'deleted_at'):
- self.assertEqual(actual[attr], now)
-
- def test_get_handles_deleted_at_none(self):
- self.client.images = {'abcd': {'deleted_at': None}}
- actual = self.service.get({}, 'abcd', NullWriter())
- self.assertEqual(actual['deleted_at'], None)
-
- def test_create_handles_timestamps(self):
- now = datetime.datetime.utcnow()
- self.client.add_response = {
- 'id': 'abcd',
- 'name': 'blah',
- 'created_at': now.isoformat(),
- 'updated_at': now.isoformat(),
- 'deleted_at': now.isoformat(),
- }
- actual = self.service.create({}, {})
- for attr in ('created_at', 'updated_at', 'deleted_at'):
- self.assertEqual(actual[attr], now)
-
- def test_create_handles_deleted_at_none(self):
- self.client.add_response = {
- 'id': 'abcd',
- 'name': 'blah',
- 'deleted_at': None,
- }
- actual = self.service.create({}, {})
- self.assertEqual(actual['deleted_at'], None)
-
- def test_update_handles_timestamps(self):
- now = datetime.datetime.utcnow()
- self.client.update_response = {
- 'id': 'abcd',
- 'name': 'blah',
- 'created_at': now.isoformat(),
- 'updated_at': now.isoformat(),
- 'deleted_at': now.isoformat(),
- }
- actual = self.service.update({}, 'dummy_id', {})
- for attr in ('created_at', 'updated_at', 'deleted_at'):
- self.assertEqual(actual[attr], now)
-
- def test_create_handles_deleted_at_none(self):
- self.client.update_response = {
- 'id': 'abcd',
- 'name': 'blah',
- 'deleted_at': None,
- }
- actual = self.service.update({}, 'dummy_id', {})
- self.assertEqual(actual['deleted_at'], None)
+ fixtures = {'image1': {'name': 'image1', 'is_public': True,
+ 'foo': 'bar',
+ 'properties': {'prop1': 'propvalue1'}}}
+ self.client.images = fixtures
+ image_meta = self.service.detail(self.context)
+ expected = [{'name': 'image1', 'is_public': True,
+ 'properties': {'prop1': 'propvalue1', 'foo': 'bar'}}]
+ self.assertEqual(image_meta, expected)
+
+
+class TestGetterDateTimeNoneTests(BaseGlanceTest):
+
+ def test_show_handles_none_datetimes(self):
+ self.client.images = self._make_none_datetime_fixtures()
+ image_meta = self.service.show(self.context, 'image1')
+ self.assertDateTimesEmpty(image_meta)
+
+ def test_detail_handles_none_datetimes(self):
+ self.client.images = self._make_none_datetime_fixtures()
+ image_meta = self.service.detail(self.context)[0]
+ self.assertDateTimesEmpty(image_meta)
+
+ def test_get_handles_none_datetimes(self):
+ self.client.images = self._make_none_datetime_fixtures()
+ writer = NullWriter()
+ image_meta = self.service.get(self.context, 'image1', writer)
+ self.assertDateTimesEmpty(image_meta)
+
+ def test_show_makes_datetimes(self):
+ self.client.images = self._make_datetime_fixtures()
+ image_meta = self.service.show(self.context, 'image1')
+ self.assertDateTimesFilled(image_meta)
+
+ def test_detail_makes_datetimes(self):
+ self.client.images = self._make_datetime_fixtures()
+ image_meta = self.service.detail(self.context)[0]
+ self.assertDateTimesFilled(image_meta)
+
+ def test_get_makes_datetimes(self):
+ self.client.images = self._make_datetime_fixtures()
+ writer = NullWriter()
+ image_meta = self.service.get(self.context, 'image1', writer)
+ self.assertDateTimesFilled(image_meta)
+
+ def _make_datetime_fixtures(self):
+ fixtures = {'image1': {'name': 'image1', 'is_public': True,
+ 'created_at': self.NOW_GLANCE_FORMAT,
+ 'updated_at': self.NOW_GLANCE_FORMAT,
+ 'deleted_at': self.NOW_GLANCE_FORMAT}}
+ return fixtures
+
+ def _make_none_datetime_fixtures(self):
+ fixtures = {'image1': {'name': 'image1', 'is_public': True,
+ 'updated_at': None,
+ 'deleted_at': None}}
+ return fixtures
+
+
+class TestMutatorDateTimeTests(BaseGlanceTest):
+ """Tests create(), update()"""
+
+ def test_create_handles_datetimes(self):
+ self.client.add_response = self._make_datetime_fixture()
+ image_meta = self.service.create(self.context, {})
+ self.assertDateTimesFilled(image_meta)
+
+ def test_create_handles_none_datetimes(self):
+ self.client.add_response = self._make_none_datetime_fixture()
+ dummy_meta = {}
+ image_meta = self.service.create(self.context, dummy_meta)
+ self.assertDateTimesEmpty(image_meta)
+
+ def test_update_handles_datetimes(self):
+ self.client.update_response = self._make_datetime_fixture()
+ dummy_id = 'dummy_id'
+ dummy_meta = {}
+ image_meta = self.service.update(self.context, 'dummy_id', dummy_meta)
+ self.assertDateTimesFilled(image_meta)
+
+ def test_update_handles_none_datetimes(self):
+ self.client.update_response = self._make_none_datetime_fixture()
+ dummy_id = 'dummy_id'
+ dummy_meta = {}
+ image_meta = self.service.update(self.context, 'dummy_id', dummy_meta)
+ self.assertDateTimesEmpty(image_meta)
+
+ def _make_datetime_fixture(self):
+ fixture = {'id': 'image1', 'name': 'image1', 'is_public': True,
+ 'created_at': self.NOW_GLANCE_FORMAT,
+ 'updated_at': self.NOW_GLANCE_FORMAT,
+ 'deleted_at': self.NOW_GLANCE_FORMAT}
+ return fixture
+
+ def _make_none_datetime_fixture(self):
+ fixture = {'id': 'image1', 'name': 'image1', 'is_public': True,
+ 'updated_at': None,
+ 'deleted_at': None}
+ return fixture
diff --git a/nova/tests/integrated/integrated_helpers.py b/nova/tests/integrated/integrated_helpers.py
index b68d34fd9..99d597f9a 100644
--- a/nova/tests/integrated/integrated_helpers.py
+++ b/nova/tests/integrated/integrated_helpers.py
@@ -19,7 +19,6 @@
Provides common functionality for integrated unit tests
"""
-import os
import random
import string
@@ -28,7 +27,6 @@ from nova import flags
from nova import service
from nova import test # For the flags
from nova.auth import manager
-from nova.exception import Error
from nova.log import logging
from nova.tests.integrated.api import client
@@ -98,24 +96,20 @@ class TestUser(object):
class IntegratedUnitTestContext(object):
- __INSTANCE = None
-
def __init__(self):
self.auth_manager = manager.AuthManager()
- self.wsgi_server = None
- self.wsgi_apps = []
self.api_service = None
-
self.services = []
self.auth_url = None
self.project_name = None
+ self.test_user = None
+
self.setup()
def setup(self):
self._start_services()
-
self._create_test_user()
def _create_test_user(self):
@@ -129,16 +123,13 @@ class IntegratedUnitTestContext(object):
self._start_compute_service()
self._start_volume_service()
self._start_scheduler_service()
-
+
# NOTE(justinsb): There's a bug here which is eluding me...
# If we start the network_service, all is good, but then subsequent
# tests fail: CloudTestCase.test_ajax_console in particular.
#self._start_network_service()
- # WSGI shutdown broken :-(
- # bug731668
- if not self.api_service:
- self._start_api_service()
+ self._start_api_service()
def _start_compute_service(self):
compute_service = service.Service.create(binary='nova-compute')
@@ -165,15 +156,10 @@ class IntegratedUnitTestContext(object):
return scheduler_service
def cleanup(self):
- for service in self.services:
- service.kill()
- self.services = []
- # TODO(justinsb): Shutdown WSGI & anything else we startup
- # bug731668
- # WSGI shutdown broken :-(
- # self.wsgi_server.terminate()
- # self.wsgi_server = None
self.test_user = None
+ for svc in self.services:
+ svc.kill()
+ self.services = []
def _create_unittest_user(self):
users = self.auth_manager.get_users()
@@ -206,43 +192,14 @@ class IntegratedUnitTestContext(object):
if not api_service:
raise Exception("API Service was None")
- # WSGI shutdown broken :-(
- #self.services.append(volume_service)
+ # NOTE(justinsb): The API service doesn't have a kill method yet,
+ # so we treat it separately
self.api_service = api_service
self.auth_url = 'http://localhost:8774/v1.0'
return api_service
- # WSGI shutdown broken :-(
- # bug731668
- #@staticmethod
- #def get():
- # if not IntegratedUnitTestContext.__INSTANCE:
- # IntegratedUnitTestContext.startup()
- # #raise Error("Must call IntegratedUnitTestContext::startup")
- # return IntegratedUnitTestContext.__INSTANCE
-
- @staticmethod
- def startup():
- # Because WSGI shutdown is broken at the moment, we have to recycle
- # bug731668
- if IntegratedUnitTestContext.__INSTANCE:
- #raise Error("Multiple calls to IntegratedUnitTestContext.startup")
- IntegratedUnitTestContext.__INSTANCE.setup()
- else:
- IntegratedUnitTestContext.__INSTANCE = IntegratedUnitTestContext()
- return IntegratedUnitTestContext.__INSTANCE
-
- @staticmethod
- def shutdown():
- if not IntegratedUnitTestContext.__INSTANCE:
- raise Error("Must call IntegratedUnitTestContext::startup")
- IntegratedUnitTestContext.__INSTANCE.cleanup()
- # WSGI shutdown broken :-(
- # bug731668
- #IntegratedUnitTestContext.__INSTANCE = None
-
class _IntegratedTestBase(test.TestCase):
def setUp(self):
@@ -251,12 +208,12 @@ class _IntegratedTestBase(test.TestCase):
f = self._get_flags()
self.flags(**f)
- context = IntegratedUnitTestContext.startup()
- self.user = context.test_user
+ self.context = IntegratedUnitTestContext()
+ self.user = self.context.test_user
self.api = self.user.openstack_api
def tearDown(self):
- IntegratedUnitTestContext.shutdown()
+ self.context.cleanup()
super(_IntegratedTestBase, self).tearDown()
def _get_flags(self):
diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py
deleted file mode 100644
index 4e2ac205e..000000000
--- a/nova/tests/objectstore_unittest.py
+++ /dev/null
@@ -1,315 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Unittets for S3 objectstore clone.
-"""
-
-import boto
-import glob
-import hashlib
-import os
-import shutil
-import tempfile
-
-from boto.s3.connection import S3Connection, OrdinaryCallingFormat
-from twisted.internet import reactor, threads, defer
-from twisted.web import http, server
-
-from nova import context
-from nova import flags
-from nova import objectstore
-from nova import test
-from nova.auth import manager
-from nova.exception import NotEmpty, NotFound
-from nova.objectstore import image
-from nova.objectstore.handler import S3
-
-
-FLAGS = flags.FLAGS
-
-# Create a unique temporary directory. We don't delete after test to
-# allow checking the contents after running tests. Users and/or tools
-# running the tests need to remove the tests directories.
-OSS_TEMPDIR = tempfile.mkdtemp(prefix='test_oss-')
-
-# Create bucket/images path
-os.makedirs(os.path.join(OSS_TEMPDIR, 'images'))
-os.makedirs(os.path.join(OSS_TEMPDIR, 'buckets'))
-
-
-class ObjectStoreTestCase(test.TestCase):
- """Test objectstore API directly."""
-
- def setUp(self):
- """Setup users and projects."""
- super(ObjectStoreTestCase, self).setUp()
- self.flags(buckets_path=os.path.join(OSS_TEMPDIR, 'buckets'),
- images_path=os.path.join(OSS_TEMPDIR, 'images'),
- ca_path=os.path.join(os.path.dirname(__file__), 'CA'))
-
- self.auth_manager = manager.AuthManager()
- self.auth_manager.create_user('user1')
- self.auth_manager.create_user('user2')
- self.auth_manager.create_user('admin_user', admin=True)
- self.auth_manager.create_project('proj1', 'user1', 'a proj', ['user1'])
- self.auth_manager.create_project('proj2', 'user2', 'a proj', ['user2'])
- self.context = context.RequestContext('user1', 'proj1')
-
- def tearDown(self):
- """Tear down users and projects."""
- self.auth_manager.delete_project('proj1')
- self.auth_manager.delete_project('proj2')
- self.auth_manager.delete_user('user1')
- self.auth_manager.delete_user('user2')
- self.auth_manager.delete_user('admin_user')
- super(ObjectStoreTestCase, self).tearDown()
-
- def test_buckets(self):
- """Test the bucket API."""
- objectstore.bucket.Bucket.create('new_bucket', self.context)
- bucket = objectstore.bucket.Bucket('new_bucket')
-
- # creator is authorized to use bucket
- self.assert_(bucket.is_authorized(self.context))
-
- # another user is not authorized
- context2 = context.RequestContext('user2', 'proj2')
- self.assertFalse(bucket.is_authorized(context2))
-
- # admin is authorized to use bucket
- admin_context = context.RequestContext('admin_user', None)
- self.assertTrue(bucket.is_authorized(admin_context))
-
- # new buckets are empty
- self.assertTrue(bucket.list_keys()['Contents'] == [])
-
- # storing keys works
- bucket['foo'] = "bar"
-
- self.assertEquals(len(bucket.list_keys()['Contents']), 1)
-
- self.assertEquals(bucket['foo'].read(), 'bar')
-
- # md5 of key works
- self.assertEquals(bucket['foo'].md5, hashlib.md5('bar').hexdigest())
-
- # deleting non-empty bucket should throw a NotEmpty exception
- self.assertRaises(NotEmpty, bucket.delete)
-
- # deleting key
- del bucket['foo']
-
- # deleting empty bucket
- bucket.delete()
-
- # accessing deleted bucket throws exception
- self.assertRaises(NotFound, objectstore.bucket.Bucket, 'new_bucket')
-
- def test_images(self):
- self.do_test_images('1mb.manifest.xml', True,
- 'image_bucket1', 'i-testing1')
-
- def test_images_no_kernel_or_ramdisk(self):
- self.do_test_images('1mb.no_kernel_or_ramdisk.manifest.xml',
- False, 'image_bucket2', 'i-testing2')
-
- def do_test_images(self, manifest_file, expect_kernel_and_ramdisk,
- image_bucket, image_name):
- "Test the image API."
-
- # create a bucket for our bundle
- objectstore.bucket.Bucket.create(image_bucket, self.context)
- bucket = objectstore.bucket.Bucket(image_bucket)
-
- # upload an image manifest/parts
- bundle_path = os.path.join(os.path.dirname(__file__), 'bundle')
- for path in glob.glob(bundle_path + '/*'):
- bucket[os.path.basename(path)] = open(path, 'rb').read()
-
- # register an image
- image.Image.register_aws_image(image_name,
- '%s/%s' % (image_bucket, manifest_file),
- self.context)
-
- # verify image
- my_img = image.Image(image_name)
- result_image_file = os.path.join(my_img.path, 'image')
- self.assertEqual(os.stat(result_image_file).st_size, 1048576)
-
- sha = hashlib.sha1(open(result_image_file).read()).hexdigest()
- self.assertEqual(sha, '3b71f43ff30f4b15b5cd85dd9e95ebc7e84eb5a3')
-
- if expect_kernel_and_ramdisk:
- # Verify the default kernel and ramdisk are set
- self.assertEqual(my_img.metadata['kernelId'], 'aki-test')
- self.assertEqual(my_img.metadata['ramdiskId'], 'ari-test')
- else:
- # Verify that the default kernel and ramdisk (the one from FLAGS)
- # doesn't get embedded in the metadata
- self.assertFalse('kernelId' in my_img.metadata)
- self.assertFalse('ramdiskId' in my_img.metadata)
-
- # verify image permissions
- context2 = context.RequestContext('user2', 'proj2')
- self.assertFalse(my_img.is_authorized(context2))
-
- # change user-editable fields
- my_img.update_user_editable_fields({'display_name': 'my cool image'})
- self.assertEqual('my cool image', my_img.metadata['displayName'])
- my_img.update_user_editable_fields({'display_name': ''})
- self.assert_(not my_img.metadata['displayName'])
-
-
-class TestHTTPChannel(http.HTTPChannel):
- """Dummy site required for twisted.web"""
-
- def checkPersistence(self, _, __): # pylint: disable=C0103
- """Otherwise we end up with an unclean reactor."""
- return False
-
-
-class TestSite(server.Site):
- """Dummy site required for twisted.web"""
- protocol = TestHTTPChannel
-
-
-class S3APITestCase(test.TestCase):
- """Test objectstore through S3 API."""
-
- def setUp(self):
- """Setup users, projects, and start a test server."""
- super(S3APITestCase, self).setUp()
-
- FLAGS.auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver'
- FLAGS.buckets_path = os.path.join(OSS_TEMPDIR, 'buckets')
-
- self.auth_manager = manager.AuthManager()
- self.admin_user = self.auth_manager.create_user('admin', admin=True)
- self.admin_project = self.auth_manager.create_project('admin',
- self.admin_user)
-
- shutil.rmtree(FLAGS.buckets_path)
- os.mkdir(FLAGS.buckets_path)
-
- root = S3()
- self.site = TestSite(root)
- # pylint: disable=E1101
- self.listening_port = reactor.listenTCP(0, self.site,
- interface='127.0.0.1')
- # pylint: enable=E1101
- self.tcp_port = self.listening_port.getHost().port
-
- if not boto.config.has_section('Boto'):
- boto.config.add_section('Boto')
- boto.config.set('Boto', 'num_retries', '0')
- self.conn = S3Connection(aws_access_key_id=self.admin_user.access,
- aws_secret_access_key=self.admin_user.secret,
- host='127.0.0.1',
- port=self.tcp_port,
- is_secure=False,
- calling_format=OrdinaryCallingFormat())
-
- def get_http_connection(host, is_secure):
- """Get a new S3 connection, don't attempt to reuse connections."""
- return self.conn.new_http_connection(host, is_secure)
-
- self.conn.get_http_connection = get_http_connection
-
- def _ensure_no_buckets(self, buckets): # pylint: disable=C0111
- self.assertEquals(len(buckets), 0, "Bucket list was not empty")
- return True
-
- def _ensure_one_bucket(self, buckets, name): # pylint: disable=C0111
- self.assertEquals(len(buckets), 1,
- "Bucket list didn't have exactly one element in it")
- self.assertEquals(buckets[0].name, name, "Wrong name")
- return True
-
- def test_000_list_buckets(self):
- """Make sure we are starting with no buckets."""
- deferred = threads.deferToThread(self.conn.get_all_buckets)
- deferred.addCallback(self._ensure_no_buckets)
- return deferred
-
- def test_001_create_and_delete_bucket(self):
- """Test bucket creation and deletion."""
- bucket_name = 'testbucket'
-
- deferred = threads.deferToThread(self.conn.create_bucket, bucket_name)
- deferred.addCallback(lambda _:
- threads.deferToThread(self.conn.get_all_buckets))
-
- deferred.addCallback(self._ensure_one_bucket, bucket_name)
-
- deferred.addCallback(lambda _:
- threads.deferToThread(self.conn.delete_bucket,
- bucket_name))
- deferred.addCallback(lambda _:
- threads.deferToThread(self.conn.get_all_buckets))
- deferred.addCallback(self._ensure_no_buckets)
- return deferred
-
- def test_002_create_bucket_and_key_and_delete_key_again(self):
- """Test key operations on buckets."""
- bucket_name = 'testbucket'
- key_name = 'somekey'
- key_contents = 'somekey'
-
- deferred = threads.deferToThread(self.conn.create_bucket, bucket_name)
- deferred.addCallback(lambda b:
- threads.deferToThread(b.new_key, key_name))
- deferred.addCallback(lambda k:
- threads.deferToThread(k.set_contents_from_string,
- key_contents))
-
- def ensure_key_contents(bucket_name, key_name, contents):
- """Verify contents for a key in the given bucket."""
- bucket = self.conn.get_bucket(bucket_name)
- key = bucket.get_key(key_name)
- self.assertEquals(key.get_contents_as_string(), contents,
- "Bad contents")
-
- deferred.addCallback(lambda _:
- threads.deferToThread(ensure_key_contents,
- bucket_name, key_name,
- key_contents))
-
- def delete_key(bucket_name, key_name):
- """Delete a key for the given bucket."""
- bucket = self.conn.get_bucket(bucket_name)
- key = bucket.get_key(key_name)
- key.delete()
-
- deferred.addCallback(lambda _:
- threads.deferToThread(delete_key, bucket_name,
- key_name))
- deferred.addCallback(lambda _:
- threads.deferToThread(self.conn.get_bucket,
- bucket_name))
- deferred.addCallback(lambda b: threads.deferToThread(b.get_all_keys))
- deferred.addCallback(self._ensure_no_buckets)
- return deferred
-
- def tearDown(self):
- """Tear down auth and test server."""
- self.auth_manager.delete_user('admin')
- self.auth_manager.delete_project('admin')
- stop_listening = defer.maybeDeferred(self.listening_port.stopListening)
- super(S3APITestCase, self).tearDown()
- return defer.DeferredList([stop_listening])
diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py
index cf8ee7eff..00803d0ad 100644
--- a/nova/tests/test_cloud.py
+++ b/nova/tests/test_cloud.py
@@ -35,31 +35,22 @@ from nova import log as logging
from nova import rpc
from nova import service
from nova import test
+from nova import utils
from nova.auth import manager
from nova.compute import power_state
from nova.api.ec2 import cloud
from nova.api.ec2 import ec2utils
from nova.image import local
-from nova.objectstore import image
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.tests.cloud')
-# Temp dirs for working with image attributes through the cloud controller
-# (stole this from objectstore_unittest.py)
-OSS_TEMPDIR = tempfile.mkdtemp(prefix='test_oss-')
-IMAGES_PATH = os.path.join(OSS_TEMPDIR, 'images')
-os.makedirs(IMAGES_PATH)
-
-# TODO(termie): these tests are rather fragile, they should at the lest be
-# wiping database state after each run
class CloudTestCase(test.TestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
- self.flags(connection_type='fake',
- images_path=IMAGES_PATH)
+ self.flags(connection_type='fake')
self.conn = rpc.Connection.instance()
@@ -70,6 +61,7 @@ class CloudTestCase(test.TestCase):
self.compute = self.start_service('compute')
self.scheduter = self.start_service('scheduler')
self.network = self.start_service('network')
+ self.image_service = utils.import_object(FLAGS.image_service)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('admin', 'admin', 'admin', True)
@@ -318,41 +310,6 @@ class CloudTestCase(test.TestCase):
LOG.debug(_("Terminating instance %s"), instance_id)
rv = self.compute.terminate_instance(instance_id)
- @staticmethod
- def _fake_set_image_description(ctxt, image_id, description):
- from nova.objectstore import handler
-
- class req:
- pass
-
- request = req()
- request.context = ctxt
- request.args = {'image_id': [image_id],
- 'description': [description]}
-
- resource = handler.ImagesResource()
- resource.render_POST(request)
-
- def test_user_editable_image_endpoint(self):
- pathdir = os.path.join(FLAGS.images_path, 'ami-testing')
- os.mkdir(pathdir)
- info = {'isPublic': False}
- with open(os.path.join(pathdir, 'info.json'), 'w') as f:
- json.dump(info, f)
- img = image.Image('ami-testing')
- # self.cloud.set_image_description(self.context, 'ami-testing',
- # 'Foo Img')
- # NOTE(vish): Above won't work unless we start objectstore or create
- # a fake version of api/ec2/images.py conn that can
- # call methods directly instead of going through boto.
- # for now, just cheat and call the method directly
- self._fake_set_image_description(self.context, 'ami-testing',
- 'Foo Img')
- self.assertEqual('Foo Img', img.metadata['description'])
- self._fake_set_image_description(self.context, 'ami-testing', '')
- self.assertEqual('', img.metadata['description'])
- shutil.rmtree(pathdir)
-
def test_update_of_instance_display_fields(self):
inst = db.instance_create(self.context, {})
ec2_id = ec2utils.id_to_ec2_id(inst['id'])
diff --git a/nova/tests/test_direct.py b/nova/tests/test_direct.py
index 80e4d2e1f..588a24b35 100644
--- a/nova/tests/test_direct.py
+++ b/nova/tests/test_direct.py
@@ -25,12 +25,18 @@ import webob
from nova import compute
from nova import context
from nova import exception
+from nova import network
from nova import test
+from nova import volume
from nova import utils
from nova.api import direct
from nova.tests import test_cloud
+class ArbitraryObject(object):
+ pass
+
+
class FakeService(object):
def echo(self, context, data):
return {'data': data}
@@ -39,6 +45,9 @@ class FakeService(object):
return {'user': context.user_id,
'project': context.project_id}
+ def invalid_return(self, context):
+ return ArbitraryObject()
+
class DirectTestCase(test.TestCase):
def setUp(self):
@@ -84,6 +93,12 @@ class DirectTestCase(test.TestCase):
resp_parsed = json.loads(resp.body)
self.assertEqual(resp_parsed['data'], 'foo')
+ def test_invalid(self):
+ req = webob.Request.blank('/fake/invalid_return')
+ req.environ['openstack.context'] = self.context
+ req.method = 'POST'
+ self.assertRaises(exception.Error, req.get_response, self.router)
+
def test_proxy(self):
proxy = direct.Proxy(self.router)
rv = proxy.fake.echo(self.context, data='baz')
@@ -93,12 +108,20 @@ class DirectTestCase(test.TestCase):
class DirectCloudTestCase(test_cloud.CloudTestCase):
def setUp(self):
super(DirectCloudTestCase, self).setUp()
- compute_handle = compute.API(network_api=self.cloud.network_api,
- volume_api=self.cloud.volume_api)
+ compute_handle = compute.API(image_service=self.cloud.image_service)
+ volume_handle = volume.API()
+ network_handle = network.API()
direct.register_service('compute', compute_handle)
+ direct.register_service('volume', volume_handle)
+ direct.register_service('network', network_handle)
+
self.router = direct.JsonParamsMiddleware(direct.Router())
proxy = direct.Proxy(self.router)
self.cloud.compute_api = proxy.compute
+ self.cloud.volume_api = proxy.volume
+ self.cloud.network_api = proxy.network
+ compute_handle.volume_api = proxy.volume
+ compute_handle.network_api = proxy.network
def tearDown(self):
super(DirectCloudTestCase, self).tearDown()
diff --git a/nova/tests/test_objectstore.py b/nova/tests/test_objectstore.py
new file mode 100644
index 000000000..c78772f27
--- /dev/null
+++ b/nova/tests/test_objectstore.py
@@ -0,0 +1,148 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Unittets for S3 objectstore clone.
+"""
+
+import boto
+import glob
+import hashlib
+import os
+import shutil
+import tempfile
+
+from boto import exception as boto_exception
+from boto.s3 import connection as s3
+
+from nova import context
+from nova import exception
+from nova import flags
+from nova import wsgi
+from nova import test
+from nova.auth import manager
+from nova.objectstore import s3server
+
+
+FLAGS = flags.FLAGS
+
+# Create a unique temporary directory. We don't delete after test to
+# allow checking the contents after running tests. Users and/or tools
+# running the tests need to remove the tests directories.
+OSS_TEMPDIR = tempfile.mkdtemp(prefix='test_oss-')
+
+# Create bucket/images path
+os.makedirs(os.path.join(OSS_TEMPDIR, 'images'))
+os.makedirs(os.path.join(OSS_TEMPDIR, 'buckets'))
+
+
+class S3APITestCase(test.TestCase):
+ """Test objectstore through S3 API."""
+
+ def setUp(self):
+ """Setup users, projects, and start a test server."""
+ super(S3APITestCase, self).setUp()
+ self.flags(auth_driver='nova.auth.ldapdriver.FakeLdapDriver',
+ buckets_path=os.path.join(OSS_TEMPDIR, 'buckets'),
+ s3_host='127.0.0.1')
+
+ self.auth_manager = manager.AuthManager()
+ self.admin_user = self.auth_manager.create_user('admin', admin=True)
+ self.admin_project = self.auth_manager.create_project('admin',
+ self.admin_user)
+
+ shutil.rmtree(FLAGS.buckets_path)
+ os.mkdir(FLAGS.buckets_path)
+
+ router = s3server.S3Application(FLAGS.buckets_path)
+ server = wsgi.Server()
+ server.start(router, FLAGS.s3_port, host=FLAGS.s3_host)
+
+ if not boto.config.has_section('Boto'):
+ boto.config.add_section('Boto')
+ boto.config.set('Boto', 'num_retries', '0')
+ conn = s3.S3Connection(aws_access_key_id=self.admin_user.access,
+ aws_secret_access_key=self.admin_user.secret,
+ host=FLAGS.s3_host,
+ port=FLAGS.s3_port,
+ is_secure=False,
+ calling_format=s3.OrdinaryCallingFormat())
+ self.conn = conn
+
+ def get_http_connection(host, is_secure):
+ """Get a new S3 connection, don't attempt to reuse connections."""
+ return self.conn.new_http_connection(host, is_secure)
+
+ self.conn.get_http_connection = get_http_connection
+
+ def _ensure_no_buckets(self, buckets): # pylint: disable=C0111
+ self.assertEquals(len(buckets), 0, "Bucket list was not empty")
+ return True
+
+ def _ensure_one_bucket(self, buckets, name): # pylint: disable=C0111
+ self.assertEquals(len(buckets), 1,
+ "Bucket list didn't have exactly one element in it")
+ self.assertEquals(buckets[0].name, name, "Wrong name")
+ return True
+
+ def test_000_list_buckets(self):
+ """Make sure we are starting with no buckets."""
+ self._ensure_no_buckets(self.conn.get_all_buckets())
+
+ def test_001_create_and_delete_bucket(self):
+ """Test bucket creation and deletion."""
+ bucket_name = 'testbucket'
+
+ self.conn.create_bucket(bucket_name)
+ self._ensure_one_bucket(self.conn.get_all_buckets(), bucket_name)
+ self.conn.delete_bucket(bucket_name)
+ self._ensure_no_buckets(self.conn.get_all_buckets())
+
+ def test_002_create_bucket_and_key_and_delete_key_again(self):
+ """Test key operations on buckets."""
+ bucket_name = 'testbucket'
+ key_name = 'somekey'
+ key_contents = 'somekey'
+
+ b = self.conn.create_bucket(bucket_name)
+ k = b.new_key(key_name)
+ k.set_contents_from_string(key_contents)
+
+ bucket = self.conn.get_bucket(bucket_name)
+
+ # make sure the contents are correct
+ key = bucket.get_key(key_name)
+ self.assertEquals(key.get_contents_as_string(), key_contents,
+ "Bad contents")
+
+ # delete the key
+ key.delete()
+
+ self._ensure_no_buckets(bucket.get_all_keys())
+
+ def test_unknown_bucket(self):
+ bucket_name = 'falalala'
+ self.assertRaises(boto_exception.S3ResponseError,
+ self.conn.get_bucket,
+ bucket_name)
+
+ def tearDown(self):
+ """Tear down auth and test server."""
+ self.auth_manager.delete_user('admin')
+ self.auth_manager.delete_project('admin')
+ super(S3APITestCase, self).tearDown()
diff --git a/nova/tests/test_scheduler.py b/nova/tests/test_scheduler.py
index 244e43bd9..6df74dd61 100644
--- a/nova/tests/test_scheduler.py
+++ b/nova/tests/test_scheduler.py
@@ -21,6 +21,9 @@ Tests For Scheduler
import datetime
import mox
+import novaclient.exceptions
+import stubout
+import webob
from mox import IgnoreArg
from nova import context
@@ -32,6 +35,7 @@ from nova import test
from nova import rpc
from nova import utils
from nova.auth import manager as auth_manager
+from nova.scheduler import api
from nova.scheduler import manager
from nova.scheduler import driver
from nova.compute import power_state
@@ -937,3 +941,160 @@ class SimpleDriverTestCase(test.TestCase):
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
db.service_destroy(self.context, s_ref2['id'])
+
+
+class FakeZone(object):
+ def __init__(self, api_url, username, password):
+ self.api_url = api_url
+ self.username = username
+ self.password = password
+
+
+def zone_get_all(context):
+ return [
+ FakeZone('http://example.com', 'bob', 'xxx'),
+ ]
+
+
+class FakeRerouteCompute(api.reroute_compute):
+ def _call_child_zones(self, zones, function):
+ return []
+
+ def get_collection_context_and_id(self, args, kwargs):
+ return ("servers", None, 1)
+
+ def unmarshall_result(self, zone_responses):
+ return dict(magic="found me")
+
+
+def go_boom(self, context, instance):
+ raise exception.InstanceNotFound("boom message", instance)
+
+
+def found_instance(self, context, instance):
+ return dict(name='myserver')
+
+
+class FakeResource(object):
+ def __init__(self, attribute_dict):
+ for k, v in attribute_dict.iteritems():
+ setattr(self, k, v)
+
+ def pause(self):
+ pass
+
+
+class ZoneRedirectTest(test.TestCase):
+ def setUp(self):
+ super(ZoneRedirectTest, self).setUp()
+ self.stubs = stubout.StubOutForTesting()
+
+ self.stubs.Set(db, 'zone_get_all', zone_get_all)
+
+ self.enable_zone_routing = FLAGS.enable_zone_routing
+ FLAGS.enable_zone_routing = True
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+ FLAGS.enable_zone_routing = self.enable_zone_routing
+ super(ZoneRedirectTest, self).tearDown()
+
+ def test_trap_found_locally(self):
+ decorator = FakeRerouteCompute("foo")
+ try:
+ result = decorator(found_instance)(None, None, 1)
+ except api.RedirectResult, e:
+ self.fail(_("Successful database hit should succeed"))
+
+ def test_trap_not_found_locally(self):
+ decorator = FakeRerouteCompute("foo")
+ try:
+ result = decorator(go_boom)(None, None, 1)
+ self.assertFail(_("Should have rerouted."))
+ except api.RedirectResult, e:
+ self.assertEquals(e.results['magic'], 'found me')
+
+ def test_routing_flags(self):
+ FLAGS.enable_zone_routing = False
+ decorator = FakeRerouteCompute("foo")
+ try:
+ result = decorator(go_boom)(None, None, 1)
+ self.assertFail(_("Should have thrown exception."))
+ except exception.InstanceNotFound, e:
+ self.assertEquals(e.message, 'boom message')
+
+ def test_get_collection_context_and_id(self):
+ decorator = api.reroute_compute("foo")
+ self.assertEquals(decorator.get_collection_context_and_id(
+ (None, 10, 20), {}), ("servers", 10, 20))
+ self.assertEquals(decorator.get_collection_context_and_id(
+ (None, 11,), dict(instance_id=21)), ("servers", 11, 21))
+ self.assertEquals(decorator.get_collection_context_and_id(
+ (None,), dict(context=12, instance_id=22)), ("servers", 12, 22))
+
+ def test_unmarshal_single_server(self):
+ decorator = api.reroute_compute("foo")
+ self.assertEquals(decorator.unmarshall_result([]), {})
+ self.assertEquals(decorator.unmarshall_result(
+ [FakeResource(dict(a=1, b=2)), ]),
+ dict(server=dict(a=1, b=2)))
+ self.assertEquals(decorator.unmarshall_result(
+ [FakeResource(dict(a=1, _b=2)), ]),
+ dict(server=dict(a=1,)))
+ self.assertEquals(decorator.unmarshall_result(
+ [FakeResource(dict(a=1, manager=2)), ]),
+ dict(server=dict(a=1,)))
+ self.assertEquals(decorator.unmarshall_result(
+ [FakeResource(dict(_a=1, manager=2)), ]),
+ dict(server={}))
+
+
+class FakeServerCollection(object):
+ def get(self, instance_id):
+ return FakeResource(dict(a=10, b=20))
+
+ def find(self, name):
+ return FakeResource(dict(a=11, b=22))
+
+
+class FakeEmptyServerCollection(object):
+ def get(self, f):
+ raise novaclient.NotFound(1)
+
+ def find(self, name):
+ raise novaclient.NotFound(2)
+
+
+class FakeNovaClient(object):
+ def __init__(self, collection):
+ self.servers = collection
+
+
+class DynamicNovaClientTest(test.TestCase):
+ def test_issue_novaclient_command_found(self):
+ zone = FakeZone('http://example.com', 'bob', 'xxx')
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeServerCollection()),
+ zone, "servers", "get", 100).a, 10)
+
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeServerCollection()),
+ zone, "servers", "find", "name").b, 22)
+
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeServerCollection()),
+ zone, "servers", "pause", 100), None)
+
+ def test_issue_novaclient_command_not_found(self):
+ zone = FakeZone('http://example.com', 'bob', 'xxx')
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeEmptyServerCollection()),
+ zone, "servers", "get", 100), None)
+
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeEmptyServerCollection()),
+ zone, "servers", "find", "name"), None)
+
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeEmptyServerCollection()),
+ zone, "servers", "any", "name"), None)
diff --git a/nova/tests/test_virt.py b/nova/tests/test_virt.py
index fb0ba53b1..3a03159ff 100644
--- a/nova/tests/test_virt.py
+++ b/nova/tests/test_virt.py
@@ -796,7 +796,8 @@ class NWFilterTestCase(test.TestCase):
instance_ref = db.instance_create(self.context,
{'user_id': 'fake',
- 'project_id': 'fake'})
+ 'project_id': 'fake',
+ 'mac_address': '00:A0:C9:14:C8:29'})
inst_id = instance_ref['id']
ip = '10.11.12.13'
@@ -813,7 +814,8 @@ class NWFilterTestCase(test.TestCase):
'instance_id': instance_ref['id']})
def _ensure_all_called():
- instance_filter = 'nova-instance-%s' % instance_ref['name']
+ instance_filter = 'nova-instance-%s-%s' % (instance_ref['name'],
+ '00A0C914C829')
secgroup_filter = 'nova-secgroup-%s' % self.security_group['id']
for required in [secgroup_filter, 'allow-dhcp-server',
'no-arp-spoofing', 'no-ip-spoofing',
diff --git a/nova/tests/test_vmwareapi.py b/nova/tests/test_vmwareapi.py
new file mode 100644
index 000000000..22b66010a
--- /dev/null
+++ b/nova/tests/test_vmwareapi.py
@@ -0,0 +1,252 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 Citrix Systems, Inc.
+# Copyright 2011 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Test suite for VMWareAPI.
+"""
+
+import stubout
+
+from nova import context
+from nova import db
+from nova import flags
+from nova import test
+from nova import utils
+from nova.auth import manager
+from nova.compute import power_state
+from nova.tests.glance import stubs as glance_stubs
+from nova.tests.vmwareapi import db_fakes
+from nova.tests.vmwareapi import stubs
+from nova.virt import vmwareapi_conn
+from nova.virt.vmwareapi import fake as vmwareapi_fake
+
+
+FLAGS = flags.FLAGS
+
+
+class VMWareAPIVMTestCase(test.TestCase):
+ """Unit tests for Vmware API connection calls."""
+
+ def setUp(self):
+ super(VMWareAPIVMTestCase, self).setUp()
+ self.flags(vmwareapi_host_ip='test_url',
+ vmwareapi_host_username='test_username',
+ vmwareapi_host_password='test_pass')
+ self.manager = manager.AuthManager()
+ self.user = self.manager.create_user('fake', 'fake', 'fake',
+ admin=True)
+ self.project = self.manager.create_project('fake', 'fake', 'fake')
+ self.network = utils.import_object(FLAGS.network_manager)
+ self.stubs = stubout.StubOutForTesting()
+ vmwareapi_fake.reset()
+ db_fakes.stub_out_db_instance_api(self.stubs)
+ stubs.set_stubs(self.stubs)
+ glance_stubs.stubout_glance_client(self.stubs,
+ glance_stubs.FakeGlance)
+ self.conn = vmwareapi_conn.get_connection(False)
+
+ def _create_instance_in_the_db(self):
+ values = {'name': 1,
+ 'id': 1,
+ 'project_id': self.project.id,
+ 'user_id': self.user.id,
+ 'image_id': "1",
+ 'kernel_id': "1",
+ 'ramdisk_id': "1",
+ 'instance_type': 'm1.large',
+ 'mac_address': 'aa:bb:cc:dd:ee:ff',
+ }
+ self.instance = db.instance_create(values)
+
+ def _create_vm(self):
+ """Create and spawn the VM."""
+ self._create_instance_in_the_db()
+ self.type_data = db.instance_type_get_by_name(None, 'm1.large')
+ self.conn.spawn(self.instance)
+ self._check_vm_record()
+
+ def _check_vm_record(self):
+ """
+ Check if the spawned VM's properties correspond to the instance in
+ the db.
+ """
+ instances = self.conn.list_instances()
+ self.assertEquals(len(instances), 1)
+
+ # Get Nova record for VM
+ vm_info = self.conn.get_info(1)
+
+ # Get record for VM
+ vms = vmwareapi_fake._get_objects("VirtualMachine")
+ vm = vms[0]
+
+ # Check that m1.large above turned into the right thing.
+ mem_kib = long(self.type_data['memory_mb']) << 10
+ vcpus = self.type_data['vcpus']
+ self.assertEquals(vm_info['max_mem'], mem_kib)
+ self.assertEquals(vm_info['mem'], mem_kib)
+ self.assertEquals(vm.get("summary.config.numCpu"), vcpus)
+ self.assertEquals(vm.get("summary.config.memorySizeMB"),
+ self.type_data['memory_mb'])
+
+ # Check that the VM is running according to Nova
+ self.assertEquals(vm_info['state'], power_state.RUNNING)
+
+ # Check that the VM is running according to vSphere API.
+ self.assertEquals(vm.get("runtime.powerState"), 'poweredOn')
+
+ def _check_vm_info(self, info, pwr_state=power_state.RUNNING):
+ """
+ Check if the get_info returned values correspond to the instance
+ object in the db.
+ """
+ mem_kib = long(self.type_data['memory_mb']) << 10
+ self.assertEquals(info["state"], pwr_state)
+ self.assertEquals(info["max_mem"], mem_kib)
+ self.assertEquals(info["mem"], mem_kib)
+ self.assertEquals(info["num_cpu"], self.type_data['vcpus'])
+
+ def test_list_instances(self):
+ instances = self.conn.list_instances()
+ self.assertEquals(len(instances), 0)
+
+ def test_list_instances_1(self):
+ self._create_vm()
+ instances = self.conn.list_instances()
+ self.assertEquals(len(instances), 1)
+
+ def test_spawn(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+
+ def test_snapshot(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+ self.conn.snapshot(self.instance, "Test-Snapshot")
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+
+ def test_snapshot_non_existent(self):
+ self._create_instance_in_the_db()
+ self.assertRaises(Exception, self.conn.snapshot, self.instance,
+ "Test-Snapshot")
+
+ def test_reboot(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+ self.conn.reboot(self.instance)
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+
+ def test_reboot_non_existent(self):
+ self._create_instance_in_the_db()
+ self.assertRaises(Exception, self.conn.reboot, self.instance)
+
+ def test_reboot_not_poweredon(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+ self.conn.suspend(self.instance, self.dummy_callback_handler)
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.PAUSED)
+ self.assertRaises(Exception, self.conn.reboot, self.instance)
+
+ def test_suspend(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+ self.conn.suspend(self.instance, self.dummy_callback_handler)
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.PAUSED)
+
+ def test_suspend_non_existent(self):
+ self._create_instance_in_the_db()
+ self.assertRaises(Exception, self.conn.suspend, self.instance,
+ self.dummy_callback_handler)
+
+ def test_resume(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+ self.conn.suspend(self.instance, self.dummy_callback_handler)
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.PAUSED)
+ self.conn.resume(self.instance, self.dummy_callback_handler)
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+
+ def test_resume_non_existent(self):
+ self._create_instance_in_the_db()
+ self.assertRaises(Exception, self.conn.resume, self.instance,
+ self.dummy_callback_handler)
+
+ def test_resume_not_suspended(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+ self.assertRaises(Exception, self.conn.resume, self.instance,
+ self.dummy_callback_handler)
+
+ def test_get_info(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+
+ def test_destroy(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+ instances = self.conn.list_instances()
+ self.assertEquals(len(instances), 1)
+ self.conn.destroy(self.instance)
+ instances = self.conn.list_instances()
+ self.assertEquals(len(instances), 0)
+
+ def test_destroy_non_existent(self):
+ self._create_instance_in_the_db()
+ self.assertEquals(self.conn.destroy(self.instance), None)
+
+ def test_pause(self):
+ pass
+
+ def test_unpause(self):
+ pass
+
+ def test_diagnostics(self):
+ pass
+
+ def test_get_console_output(self):
+ pass
+
+ def test_get_ajax_console(self):
+ pass
+
+ def dummy_callback_handler(self, ret):
+ """
+ Dummy callback function to be passed to suspend, resume, etc., calls.
+ """
+ pass
+
+ def tearDown(self):
+ super(VMWareAPIVMTestCase, self).tearDown()
+ vmwareapi_fake.cleanup()
+ self.manager.delete_project(self.project)
+ self.manager.delete_user(self.user)
+ self.stubs.UnsetAll()
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index 66a973a78..36c88b020 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -19,11 +19,15 @@ Test suite for XenAPI
"""
import functools
+import os
+import re
import stubout
+import ast
from nova import db
from nova import context
from nova import flags
+from nova import log as logging
from nova import test
from nova import utils
from nova.auth import manager
@@ -38,6 +42,9 @@ from nova.virt.xenapi.vmops import VMOps
from nova.tests.db import fakes as db_fakes
from nova.tests.xenapi import stubs
from nova.tests.glance import stubs as glance_stubs
+from nova.tests import fake_utils
+
+LOG = logging.getLogger('nova.tests.test_xenapi')
FLAGS = flags.FLAGS
@@ -64,13 +71,14 @@ class XenAPIVolumeTestCase(test.TestCase):
def setUp(self):
super(XenAPIVolumeTestCase, self).setUp()
self.stubs = stubout.StubOutForTesting()
+ self.context = context.RequestContext('fake', 'fake', False)
FLAGS.target_host = '127.0.0.1'
FLAGS.xenapi_connection_url = 'test_url'
FLAGS.xenapi_connection_password = 'test_pass'
db_fakes.stub_out_db_instance_api(self.stubs)
stubs.stub_out_get_target(self.stubs)
xenapi_fake.reset()
- self.values = {'name': 1, 'id': 1,
+ self.values = {'id': 1,
'project_id': 'fake',
'user_id': 'fake',
'image_id': 1,
@@ -90,7 +98,7 @@ class XenAPIVolumeTestCase(test.TestCase):
vol['availability_zone'] = FLAGS.storage_availability_zone
vol['status'] = "creating"
vol['attach_status'] = "detached"
- return db.volume_create(context.get_admin_context(), vol)
+ return db.volume_create(self.context, vol)
def test_create_iscsi_storage(self):
""" This shows how to test helper classes' methods """
@@ -126,7 +134,7 @@ class XenAPIVolumeTestCase(test.TestCase):
stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests)
conn = xenapi_conn.get_connection(False)
volume = self._create_volume()
- instance = db.instance_create(self.values)
+ instance = db.instance_create(self.context, self.values)
vm = xenapi_fake.create_vm(instance.name, 'Running')
result = conn.attach_volume(instance.name, volume['id'], '/dev/sdc')
@@ -146,7 +154,7 @@ class XenAPIVolumeTestCase(test.TestCase):
stubs.FakeSessionForVolumeFailedTests)
conn = xenapi_conn.get_connection(False)
volume = self._create_volume()
- instance = db.instance_create(self.values)
+ instance = db.instance_create(self.context, self.values)
xenapi_fake.create_vm(instance.name, 'Running')
self.assertRaises(Exception,
conn.attach_volume,
@@ -175,8 +183,9 @@ class XenAPIVMTestCase(test.TestCase):
self.project = self.manager.create_project('fake', 'fake', 'fake')
self.network = utils.import_object(FLAGS.network_manager)
self.stubs = stubout.StubOutForTesting()
- FLAGS.xenapi_connection_url = 'test_url'
- FLAGS.xenapi_connection_password = 'test_pass'
+ self.flags(xenapi_connection_url='test_url',
+ xenapi_connection_password='test_pass',
+ instance_name_template='%d')
xenapi_fake.reset()
xenapi_fake.create_local_srs()
db_fakes.stub_out_db_instance_api(self.stubs)
@@ -186,8 +195,11 @@ class XenAPIVMTestCase(test.TestCase):
stubs.stubout_stream_disk(self.stubs)
stubs.stubout_is_vdi_pv(self.stubs)
self.stubs.Set(VMOps, 'reset_network', reset_network)
+ stubs.stub_out_vm_methods(self.stubs)
glance_stubs.stubout_glance_client(self.stubs,
glance_stubs.FakeGlance)
+ fake_utils.stub_out_utils_execute(self.stubs)
+ self.context = context.RequestContext('fake', 'fake', False)
self.conn = xenapi_conn.get_connection(False)
def test_list_instances_0(self):
@@ -212,7 +224,7 @@ class XenAPIVMTestCase(test.TestCase):
if not vm_rec["is_control_domain"]:
vm_labels.append(vm_rec["name_label"])
- self.assertEquals(vm_labels, [1])
+ self.assertEquals(vm_labels, ['1'])
def ensure_vbd_was_torn_down():
vbd_labels = []
@@ -220,7 +232,7 @@ class XenAPIVMTestCase(test.TestCase):
vbd_rec = xenapi_fake.get_record('VBD', vbd_ref)
vbd_labels.append(vbd_rec["vm_name_label"])
- self.assertEquals(vbd_labels, [1])
+ self.assertEquals(vbd_labels, ['1'])
def ensure_vdi_was_torn_down():
for vdi_ref in xenapi_fake.get_all('VDI'):
@@ -237,11 +249,10 @@ class XenAPIVMTestCase(test.TestCase):
def create_vm_record(self, conn, os_type):
instances = conn.list_instances()
- self.assertEquals(instances, [1])
+ self.assertEquals(instances, ['1'])
# Get Nova record for VM
vm_info = conn.get_info(1)
-
# Get XenAPI record for VM
vms = [rec for ref, rec
in xenapi_fake.get_all_records('VM').iteritems()
@@ -250,7 +261,7 @@ class XenAPIVMTestCase(test.TestCase):
self.vm_info = vm_info
self.vm = vm
- def check_vm_record(self, conn):
+ def check_vm_record(self, conn, check_injection=False):
# Check that m1.large above turned into the right thing.
instance_type = db.instance_type_get_by_name(conn, 'm1.large')
mem_kib = long(instance_type['memory_mb']) << 10
@@ -270,6 +281,25 @@ class XenAPIVMTestCase(test.TestCase):
# Check that the VM is running according to XenAPI.
self.assertEquals(self.vm['power_state'], 'Running')
+ if check_injection:
+ xenstore_data = self.vm['xenstore_data']
+ key = 'vm-data/networking/aabbccddeeff'
+ xenstore_value = xenstore_data[key]
+ tcpip_data = ast.literal_eval(xenstore_value)
+ self.assertEquals(tcpip_data, {
+ 'label': 'test_network',
+ 'broadcast': '10.0.0.255',
+ 'ips': [{'ip': '10.0.0.3',
+ 'netmask':'255.255.255.0',
+ 'enabled':'1'}],
+ 'ip6s': [{'ip': 'fe80::a8bb:ccff:fedd:eeff',
+ 'netmask': '120',
+ 'enabled': '1',
+ 'gateway': 'fe80::a00:1'}],
+ 'mac': 'aa:bb:cc:dd:ee:ff',
+ 'dns': ['10.0.0.2'],
+ 'gateway': '10.0.0.1'})
+
def check_vm_params_for_windows(self):
self.assertEquals(self.vm['platform']['nx'], 'true')
self.assertEquals(self.vm['HVM_boot_params'], {'order': 'dc'})
@@ -303,10 +333,10 @@ class XenAPIVMTestCase(test.TestCase):
self.assertEquals(self.vm['HVM_boot_policy'], '')
def _test_spawn(self, image_id, kernel_id, ramdisk_id,
- instance_type="m1.large", os_type="linux"):
- stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
- values = {'name': 1,
- 'id': 1,
+ instance_type="m1.large", os_type="linux",
+ check_injection=False):
+ stubs.stubout_loopingcall_start(self.stubs)
+ values = {'id': 1,
'project_id': self.project.id,
'user_id': self.user.id,
'image_id': image_id,
@@ -315,12 +345,10 @@ class XenAPIVMTestCase(test.TestCase):
'instance_type': instance_type,
'mac_address': 'aa:bb:cc:dd:ee:ff',
'os_type': os_type}
-
- conn = xenapi_conn.get_connection(False)
- instance = db.instance_create(values)
- conn.spawn(instance)
- self.create_vm_record(conn, os_type)
- self.check_vm_record(conn)
+ instance = db.instance_create(self.context, values)
+ self.conn.spawn(instance)
+ self.create_vm_record(self.conn, os_type)
+ self.check_vm_record(self.conn, check_injection)
def test_spawn_not_enough_memory(self):
FLAGS.xenapi_image_service = 'glance'
@@ -361,6 +389,85 @@ class XenAPIVMTestCase(test.TestCase):
glance_stubs.FakeGlance.IMAGE_RAMDISK)
self.check_vm_params_for_linux_with_external_kernel()
+ def test_spawn_netinject_file(self):
+ FLAGS.xenapi_image_service = 'glance'
+ db_fakes.stub_out_db_instance_api(self.stubs, injected=True)
+
+ self._tee_executed = False
+
+ def _tee_handler(cmd, **kwargs):
+ input = kwargs.get('process_input', None)
+ self.assertNotEqual(input, None)
+ config = [line.strip() for line in input.split("\n")]
+ # Find the start of eth0 configuration and check it
+ index = config.index('auto eth0')
+ self.assertEquals(config[index + 1:index + 8], [
+ 'iface eth0 inet static',
+ 'address 10.0.0.3',
+ 'netmask 255.255.255.0',
+ 'broadcast 10.0.0.255',
+ 'gateway 10.0.0.1',
+ 'dns-nameservers 10.0.0.2',
+ ''])
+ self._tee_executed = True
+ return '', ''
+
+ fake_utils.fake_execute_set_repliers([
+ # Capture the sudo tee .../etc/network/interfaces command
+ (r'(sudo\s+)?tee.*interfaces', _tee_handler),
+ ])
+ FLAGS.xenapi_image_service = 'glance'
+ self._test_spawn(glance_stubs.FakeGlance.IMAGE_MACHINE,
+ glance_stubs.FakeGlance.IMAGE_KERNEL,
+ glance_stubs.FakeGlance.IMAGE_RAMDISK,
+ check_injection=True)
+ self.assertTrue(self._tee_executed)
+
+ def test_spawn_netinject_xenstore(self):
+ FLAGS.xenapi_image_service = 'glance'
+ db_fakes.stub_out_db_instance_api(self.stubs, injected=True)
+
+ self._tee_executed = False
+
+ def _mount_handler(cmd, *ignore_args, **ignore_kwargs):
+ # When mounting, create real files under the mountpoint to simulate
+ # files in the mounted filesystem
+
+ # mount point will be the last item of the command list
+ self._tmpdir = cmd[len(cmd) - 1]
+ LOG.debug(_('Creating files in %s to simulate guest agent' %
+ self._tmpdir))
+ os.makedirs(os.path.join(self._tmpdir, 'usr', 'sbin'))
+ # Touch the file using open
+ open(os.path.join(self._tmpdir, 'usr', 'sbin',
+ 'xe-update-networking'), 'w').close()
+ return '', ''
+
+ def _umount_handler(cmd, *ignore_args, **ignore_kwargs):
+ # Umount would normall make files in the m,ounted filesystem
+ # disappear, so do that here
+ LOG.debug(_('Removing simulated guest agent files in %s' %
+ self._tmpdir))
+ os.remove(os.path.join(self._tmpdir, 'usr', 'sbin',
+ 'xe-update-networking'))
+ os.rmdir(os.path.join(self._tmpdir, 'usr', 'sbin'))
+ os.rmdir(os.path.join(self._tmpdir, 'usr'))
+ return '', ''
+
+ def _tee_handler(cmd, *ignore_args, **ignore_kwargs):
+ self._tee_executed = True
+ return '', ''
+
+ fake_utils.fake_execute_set_repliers([
+ (r'(sudo\s+)?mount', _mount_handler),
+ (r'(sudo\s+)?umount', _umount_handler),
+ (r'(sudo\s+)?tee.*interfaces', _tee_handler)])
+ self._test_spawn(1, 2, 3, check_injection=True)
+
+ # tee must not run in this case, where an injection-capable
+ # guest agent is detected
+ self.assertFalse(self._tee_executed)
+
def test_spawn_with_network_qos(self):
self._create_instance()
for vif_ref in xenapi_fake.get_all('VIF'):
@@ -369,6 +476,18 @@ class XenAPIVMTestCase(test.TestCase):
self.assertEquals(vif_rec['qos_algorithm_params']['kbps'],
str(4 * 1024))
+ def test_rescue(self):
+ self.flags(xenapi_inject_image=False)
+ instance = self._create_instance()
+ conn = xenapi_conn.get_connection(False)
+ conn.rescue(instance, None)
+
+ def test_unrescue(self):
+ instance = self._create_instance()
+ conn = xenapi_conn.get_connection(False)
+ # Ensure that it will not unrescue a non-rescued instance.
+ self.assertRaises(Exception, conn.unrescue, instance, None)
+
def tearDown(self):
super(XenAPIVMTestCase, self).tearDown()
self.manager.delete_project(self.project)
@@ -379,8 +498,8 @@ class XenAPIVMTestCase(test.TestCase):
def _create_instance(self):
"""Creates and spawns a test instance"""
+ stubs.stubout_loopingcall_start(self.stubs)
values = {
- 'name': 1,
'id': 1,
'project_id': self.project.id,
'user_id': self.user.id,
@@ -390,7 +509,7 @@ class XenAPIVMTestCase(test.TestCase):
'instance_type': 'm1.large',
'mac_address': 'aa:bb:cc:dd:ee:ff',
'os_type': 'linux'}
- instance = db.instance_create(values)
+ instance = db.instance_create(self.context, values)
self.conn.spawn(instance)
return instance
@@ -435,21 +554,26 @@ class XenAPIMigrateInstance(test.TestCase):
db_fakes.stub_out_db_instance_api(self.stubs)
stubs.stub_out_get_target(self.stubs)
xenapi_fake.reset()
+ xenapi_fake.create_network('fake', FLAGS.flat_network_bridge)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
self.project = self.manager.create_project('fake', 'fake', 'fake')
- self.values = {'name': 1, 'id': 1,
+ self.context = context.RequestContext('fake', 'fake', False)
+ self.values = {'id': 1,
'project_id': self.project.id,
'user_id': self.user.id,
'image_id': 1,
'kernel_id': None,
'ramdisk_id': None,
+ 'local_gb': 5,
'instance_type': 'm1.large',
'mac_address': 'aa:bb:cc:dd:ee:ff',
'os_type': 'linux'}
+ fake_utils.stub_out_utils_execute(self.stubs)
stubs.stub_out_migration_methods(self.stubs)
+ stubs.stubout_get_this_vm_uuid(self.stubs)
glance_stubs.stubout_glance_client(self.stubs,
glance_stubs.FakeGlance)
@@ -460,14 +584,15 @@ class XenAPIMigrateInstance(test.TestCase):
self.stubs.UnsetAll()
def test_migrate_disk_and_power_off(self):
- instance = db.instance_create(self.values)
+ instance = db.instance_create(self.context, self.values)
stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
conn = xenapi_conn.get_connection(False)
conn.migrate_disk_and_power_off(instance, '127.0.0.1')
def test_finish_resize(self):
- instance = db.instance_create(self.values)
+ instance = db.instance_create(self.context, self.values)
stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
+ stubs.stubout_loopingcall_start(self.stubs)
conn = xenapi_conn.get_connection(False)
conn.finish_resize(instance, dict(base_copy='hurr', cow='durr'))
diff --git a/nova/tests/vmwareapi/__init__.py b/nova/tests/vmwareapi/__init__.py
new file mode 100644
index 000000000..478ee742b
--- /dev/null
+++ b/nova/tests/vmwareapi/__init__.py
@@ -0,0 +1,21 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 Citrix Systems, Inc.
+# Copyright 2011 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+:mod:`vmwareapi` -- Stubs for VMware API
+=======================================
+"""
diff --git a/nova/tests/vmwareapi/db_fakes.py b/nova/tests/vmwareapi/db_fakes.py
new file mode 100644
index 000000000..0addd5573
--- /dev/null
+++ b/nova/tests/vmwareapi/db_fakes.py
@@ -0,0 +1,109 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 Citrix Systems, Inc.
+# Copyright 2011 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Stubouts, mocks and fixtures for the test suite
+"""
+
+import time
+
+from nova import db
+from nova import utils
+
+
+def stub_out_db_instance_api(stubs):
+ """Stubs out the db API for creating Instances."""
+
+ INSTANCE_TYPES = {
+ 'm1.tiny': dict(memory_mb=512, vcpus=1, local_gb=0, flavorid=1),
+ 'm1.small': dict(memory_mb=2048, vcpus=1, local_gb=20, flavorid=2),
+ 'm1.medium':
+ dict(memory_mb=4096, vcpus=2, local_gb=40, flavorid=3),
+ 'm1.large': dict(memory_mb=8192, vcpus=4, local_gb=80, flavorid=4),
+ 'm1.xlarge':
+ dict(memory_mb=16384, vcpus=8, local_gb=160, flavorid=5)}
+
+ class FakeModel(object):
+ """Stubs out for model."""
+
+ def __init__(self, values):
+ self.values = values
+
+ def __getattr__(self, name):
+ return self.values[name]
+
+ def __getitem__(self, key):
+ if key in self.values:
+ return self.values[key]
+ else:
+ raise NotImplementedError()
+
+ def fake_instance_create(values):
+ """Stubs out the db.instance_create method."""
+
+ type_data = INSTANCE_TYPES[values['instance_type']]
+
+ base_options = {
+ 'name': values['name'],
+ 'id': values['id'],
+ 'reservation_id': utils.generate_uid('r'),
+ 'image_id': values['image_id'],
+ 'kernel_id': values['kernel_id'],
+ 'ramdisk_id': values['ramdisk_id'],
+ 'state_description': 'scheduling',
+ 'user_id': values['user_id'],
+ 'project_id': values['project_id'],
+ 'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
+ 'instance_type': values['instance_type'],
+ 'memory_mb': type_data['memory_mb'],
+ 'mac_address': values['mac_address'],
+ 'vcpus': type_data['vcpus'],
+ 'local_gb': type_data['local_gb'],
+ }
+ return FakeModel(base_options)
+
+ def fake_network_get_by_instance(context, instance_id):
+ """Stubs out the db.network_get_by_instance method."""
+
+ fields = {
+ 'bridge': 'vmnet0',
+ 'netmask': '255.255.255.0',
+ 'gateway': '10.10.10.1',
+ 'vlan': 100}
+ return FakeModel(fields)
+
+ def fake_instance_action_create(context, action):
+ """Stubs out the db.instance_action_create method."""
+ pass
+
+ def fake_instance_get_fixed_address(context, instance_id):
+ """Stubs out the db.instance_get_fixed_address method."""
+ return '10.10.10.10'
+
+ def fake_instance_type_get_all(context, inactive=0):
+ return INSTANCE_TYPES
+
+ def fake_instance_type_get_by_name(context, name):
+ return INSTANCE_TYPES[name]
+
+ stubs.Set(db, 'instance_create', fake_instance_create)
+ stubs.Set(db, 'network_get_by_instance', fake_network_get_by_instance)
+ stubs.Set(db, 'instance_action_create', fake_instance_action_create)
+ stubs.Set(db, 'instance_get_fixed_address',
+ fake_instance_get_fixed_address)
+ stubs.Set(db, 'instance_type_get_all', fake_instance_type_get_all)
+ stubs.Set(db, 'instance_type_get_by_name', fake_instance_type_get_by_name)
diff --git a/nova/tests/vmwareapi/stubs.py b/nova/tests/vmwareapi/stubs.py
new file mode 100644
index 000000000..a648efb16
--- /dev/null
+++ b/nova/tests/vmwareapi/stubs.py
@@ -0,0 +1,46 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 Citrix Systems, Inc.
+# Copyright 2011 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Stubouts for the test suite
+"""
+
+from nova.virt import vmwareapi_conn
+from nova.virt.vmwareapi import fake
+from nova.virt.vmwareapi import vmware_images
+
+
+def fake_get_vim_object(arg):
+ """Stubs out the VMWareAPISession's get_vim_object method."""
+ return fake.FakeVim()
+
+
+def fake_is_vim_object(arg, module):
+ """Stubs out the VMWareAPISession's is_vim_object method."""
+ return isinstance(module, fake.FakeVim)
+
+
+def set_stubs(stubs):
+ """Set the stubs."""
+ stubs.Set(vmware_images, 'fetch_image', fake.fake_fetch_image)
+ stubs.Set(vmware_images, 'get_vmdk_size_and_properties',
+ fake.fake_get_vmdk_size_and_properties)
+ stubs.Set(vmware_images, 'upload_image', fake.fake_upload_image)
+ stubs.Set(vmwareapi_conn.VMWareAPISession, "_get_vim_object",
+ fake_get_vim_object)
+ stubs.Set(vmwareapi_conn.VMWareAPISession, "_is_vim_object",
+ fake_is_vim_object)
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
index 7f9706a3d..205f6c902 100644
--- a/nova/tests/xenapi/stubs.py
+++ b/nova/tests/xenapi/stubs.py
@@ -21,6 +21,7 @@ from nova.virt.xenapi import fake
from nova.virt.xenapi import volume_utils
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi import vmops
+from nova import utils
def stubout_instance_snapshot(stubs):
@@ -137,14 +138,17 @@ def stubout_is_vdi_pv(stubs):
stubs.Set(vm_utils, '_is_vdi_pv', f)
+def stubout_loopingcall_start(stubs):
+ def fake_start(self, interval, now=True):
+ self.f(*self.args, **self.kw)
+ stubs.Set(utils.LoopingCall, 'start', fake_start)
+
+
class FakeSessionForVMTests(fake.SessionBase):
""" Stubs out a XenAPISession for VM tests """
def __init__(self, uri):
super(FakeSessionForVMTests, self).__init__(uri)
- def network_get_all_records_where(self, _1, _2):
- return self.xenapi.network.get_all_records()
-
def host_call_plugin(self, _1, _2, _3, _4, _5):
sr_ref = fake.get_all('SR')[0]
vdi_ref = fake.create_vdi('', False, sr_ref, False)
@@ -185,6 +189,25 @@ class FakeSessionForVMTests(fake.SessionBase):
pass
+def stub_out_vm_methods(stubs):
+ def fake_shutdown(self, inst, vm, method="clean"):
+ pass
+
+ def fake_acquire_bootlock(self, vm):
+ pass
+
+ def fake_release_bootlock(self, vm):
+ pass
+
+ def fake_spawn_rescue(self, inst):
+ inst._rescue = False
+
+ stubs.Set(vmops.VMOps, "_shutdown", fake_shutdown)
+ stubs.Set(vmops.VMOps, "_acquire_bootlock", fake_acquire_bootlock)
+ stubs.Set(vmops.VMOps, "_release_bootlock", fake_release_bootlock)
+ stubs.Set(vmops.VMOps, "spawn_rescue", fake_spawn_rescue)
+
+
class FakeSessionForVolumeTests(fake.SessionBase):
""" Stubs out a XenAPISession for Volume tests """
def __init__(self, uri):