summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
authorJustin Santa Barbara <justin@fathomdb.com>2011-03-30 09:17:46 -0700
committerJustin Santa Barbara <justin@fathomdb.com>2011-03-30 09:17:46 -0700
commit8a56ff7268b936a0b559e9a548cb587ff6aa5907 (patch)
treef807d47aee5e5cde8ef95291999e37b0b27ca5e0 /nova/tests
parent9686b3a296c53486a64a949ae2f7430e25df2dcb (diff)
parentf77c58ce317f9674671a1b44563ef3645533c815 (diff)
Merged with trunk
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/api/openstack/extensions/__init__.py15
-rw-r--r--nova/tests/api/openstack/extensions/foxinsocks.py98
-rw-r--r--nova/tests/api/openstack/fakes.py72
-rw-r--r--nova/tests/api/openstack/test_auth.py6
-rw-r--r--nova/tests/api/openstack/test_extensions.py236
-rw-r--r--nova/tests/api/openstack/test_flavors.py158
-rw-r--r--nova/tests/api/openstack/test_image_metadata.py166
-rw-r--r--nova/tests/api/openstack/test_images.py655
-rw-r--r--nova/tests/api/openstack/test_server_metadata.py164
-rw-r--r--nova/tests/api/openstack/test_servers.py390
-rw-r--r--nova/tests/api/openstack/test_shared_ip_groups.py30
-rw-r--r--nova/tests/api/openstack/test_versions.py123
-rw-r--r--nova/tests/api/openstack/test_zones.py29
-rw-r--r--nova/tests/db/fakes.py99
-rw-r--r--nova/tests/fake_utils.py109
-rw-r--r--nova/tests/image/__init__.py16
-rw-r--r--nova/tests/image/test_glance.py236
-rw-r--r--nova/tests/integrated/api/client.py44
-rw-r--r--nova/tests/integrated/integrated_helpers.py221
-rw-r--r--nova/tests/integrated/test_extensions.py44
-rw-r--r--nova/tests/integrated/test_login.py68
-rw-r--r--nova/tests/integrated/test_servers.py184
-rw-r--r--nova/tests/integrated/test_volumes.py295
-rw-r--r--nova/tests/network/__init__.py67
-rw-r--r--nova/tests/network/base.py154
-rw-r--r--nova/tests/objectstore_unittest.py315
-rw-r--r--nova/tests/test_auth.py8
-rw-r--r--nova/tests/test_cloud.py49
-rw-r--r--nova/tests/test_compute.py104
-rw-r--r--nova/tests/test_direct.py27
-rw-r--r--nova/tests/test_flat_network.py161
-rw-r--r--nova/tests/test_localization.py3
-rw-r--r--nova/tests/test_misc.py43
-rw-r--r--nova/tests/test_network.py371
-rw-r--r--nova/tests/test_objectstore.py148
-rw-r--r--nova/tests/test_rpc.py4
-rw-r--r--nova/tests/test_scheduler.py161
-rw-r--r--nova/tests/test_service.py24
-rw-r--r--nova/tests/test_test.py2
-rw-r--r--nova/tests/test_virt.py66
-rw-r--r--nova/tests/test_vlan_network.py242
-rw-r--r--nova/tests/test_vmwareapi.py252
-rw-r--r--nova/tests/test_volume.py4
-rw-r--r--nova/tests/test_xenapi.py242
-rw-r--r--nova/tests/test_zones.py34
-rw-r--r--nova/tests/vmwareapi/__init__.py21
-rw-r--r--nova/tests/vmwareapi/db_fakes.py109
-rw-r--r--nova/tests/vmwareapi/stubs.py46
-rw-r--r--nova/tests/xenapi/stubs.py36
49 files changed, 5099 insertions, 1052 deletions
diff --git a/nova/tests/api/openstack/extensions/__init__.py b/nova/tests/api/openstack/extensions/__init__.py
new file mode 100644
index 000000000..848908a95
--- /dev/null
+++ b/nova/tests/api/openstack/extensions/__init__.py
@@ -0,0 +1,15 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
diff --git a/nova/tests/api/openstack/extensions/foxinsocks.py b/nova/tests/api/openstack/extensions/foxinsocks.py
new file mode 100644
index 000000000..0860b51ac
--- /dev/null
+++ b/nova/tests/api/openstack/extensions/foxinsocks.py
@@ -0,0 +1,98 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+from nova import wsgi
+
+from nova.api.openstack import extensions
+
+
+class FoxInSocksController(wsgi.Controller):
+
+ def index(self, req):
+ return "Try to say this Mr. Knox, sir..."
+
+
+class Foxinsocks(object):
+
+ def __init__(self):
+ pass
+
+ def get_name(self):
+ return "Fox In Socks"
+
+ def get_alias(self):
+ return "FOXNSOX"
+
+ def get_description(self):
+ return "The Fox In Socks Extension"
+
+ def get_namespace(self):
+ return "http://www.fox.in.socks/api/ext/pie/v1.0"
+
+ def get_updated(self):
+ return "2011-01-22T13:25:27-06:00"
+
+ def get_resources(self):
+ resources = []
+ resource = extensions.ResourceExtension('foxnsocks',
+ FoxInSocksController())
+ resources.append(resource)
+ return resources
+
+ def get_actions(self):
+ actions = []
+ actions.append(extensions.ActionExtension('servers', 'add_tweedle',
+ self._add_tweedle))
+ actions.append(extensions.ActionExtension('servers', 'delete_tweedle',
+ self._delete_tweedle))
+ return actions
+
+ def get_response_extensions(self):
+ response_exts = []
+
+ def _goose_handler(res):
+ #NOTE: This only handles JSON responses.
+ # You can use content type header to test for XML.
+ data = json.loads(res.body)
+ data['flavor']['googoose'] = "Gooey goo for chewy chewing!"
+ return data
+
+ resp_ext = extensions.ResponseExtension('GET', '/v1.1/flavors/:(id)',
+ _goose_handler)
+ response_exts.append(resp_ext)
+
+ def _bands_handler(res):
+ #NOTE: This only handles JSON responses.
+ # You can use content type header to test for XML.
+ data = json.loads(res.body)
+ data['big_bands'] = 'Pig Bands!'
+ return data
+
+ resp_ext2 = extensions.ResponseExtension('GET', '/v1.1/flavors/:(id)',
+ _bands_handler)
+ response_exts.append(resp_ext2)
+ return response_exts
+
+ def _add_tweedle(self, input_dict, req, id):
+
+ return "Tweedle Beetle Added."
+
+ def _delete_tweedle(self, input_dict, req, id):
+
+ return "Tweedle Beetle Deleted."
diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py
index 75eade4d0..8b0729c35 100644
--- a/nova/tests/api/openstack/fakes.py
+++ b/nova/tests/api/openstack/fakes.py
@@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
import datetime
import json
import random
@@ -34,6 +35,7 @@ from nova import utils
import nova.api.openstack.auth
from nova.api import openstack
from nova.api.openstack import auth
+from nova.api.openstack import versions
from nova.api.openstack import limits
from nova.auth.manager import User, Project
from nova.image import glance
@@ -72,15 +74,19 @@ def fake_wsgi(self, req):
return self.application
-def wsgi_app(inner_application=None):
- if not inner_application:
- inner_application = openstack.APIRouter()
+def wsgi_app(inner_app10=None, inner_app11=None):
+ if not inner_app10:
+ inner_app10 = openstack.APIRouterV10()
+ if not inner_app11:
+ inner_app11 = openstack.APIRouterV11()
mapper = urlmap.URLMap()
- api = openstack.FaultWrapper(auth.AuthMiddleware(
- limits.RateLimitingMiddleware(inner_application)))
- mapper['/v1.0'] = api
- mapper['/v1.1'] = api
- mapper['/'] = openstack.FaultWrapper(openstack.Versions())
+ api10 = openstack.FaultWrapper(auth.AuthMiddleware(
+ limits.RateLimitingMiddleware(inner_app10)))
+ api11 = openstack.FaultWrapper(auth.AuthMiddleware(
+ limits.RateLimitingMiddleware(inner_app11)))
+ mapper['/v1.0'] = api10
+ mapper['/v1.1'] = api11
+ mapper['/'] = openstack.FaultWrapper(versions.Versions())
return mapper
@@ -138,6 +144,21 @@ def stub_out_compute_api_snapshot(stubs):
stubs.Set(nova.compute.API, 'snapshot', snapshot)
+def stub_out_glance_add_image(stubs, sent_to_glance):
+ """
+ We return the metadata sent to glance by modifying the sent_to_glance dict
+ in place.
+ """
+ orig_add_image = glance_client.Client.add_image
+
+ def fake_add_image(context, metadata, data=None):
+ sent_to_glance['metadata'] = metadata
+ sent_to_glance['data'] = data
+ return orig_add_image(metadata, data)
+
+ stubs.Set(glance_client.Client, 'add_image', fake_add_image)
+
+
def stub_out_glance(stubs, initial_fixtures=None):
class FakeGlanceClient:
@@ -150,37 +171,46 @@ def stub_out_glance(stubs, initial_fixtures=None):
for f in self.fixtures]
def fake_get_images_detailed(self):
- return self.fixtures
+ return copy.deepcopy(self.fixtures)
def fake_get_image_meta(self, image_id):
- for f in self.fixtures:
- if f['id'] == image_id:
- return f
+ image = self._find_image(image_id)
+ if image:
+ return copy.deepcopy(image)
raise glance_exc.NotFound
def fake_add_image(self, image_meta, data=None):
- id = ''.join(random.choice(string.letters) for _ in range(20))
- image_meta['id'] = id
+ image_meta = copy.deepcopy(image_meta)
+ image_id = ''.join(random.choice(string.letters)
+ for _ in range(20))
+ image_meta['id'] = image_id
self.fixtures.append(image_meta)
- return image_meta
+ return copy.deepcopy(image_meta)
def fake_update_image(self, image_id, image_meta, data=None):
- f = self.fake_get_image_meta(image_id)
+ for attr in ('created_at', 'updated_at', 'deleted_at', 'deleted'):
+ if attr in image_meta:
+ del image_meta[attr]
+
+ f = self._find_image(image_id)
if not f:
raise glance_exc.NotFound
f.update(image_meta)
- return f
+ return copy.deepcopy(f)
def fake_delete_image(self, image_id):
- f = self.fake_get_image_meta(image_id)
+ f = self._find_image(image_id)
if not f:
raise glance_exc.NotFound
self.fixtures.remove(f)
- ##def fake_delete_all(self):
- ## self.fixtures = []
+ def _find_image(self, image_id):
+ for f in self.fixtures:
+ if f['id'] == image_id:
+ return f
+ return None
GlanceClient = glance_client.Client
fake = FakeGlanceClient(initial_fixtures)
@@ -192,10 +222,10 @@ def stub_out_glance(stubs, initial_fixtures=None):
stubs.Set(GlanceClient, 'add_image', fake.fake_add_image)
stubs.Set(GlanceClient, 'update_image', fake.fake_update_image)
stubs.Set(GlanceClient, 'delete_image', fake.fake_delete_image)
- #stubs.Set(GlanceClient, 'delete_all', fake.fake_delete_all)
class FakeToken(object):
+ # FIXME(sirp): let's not use id here
id = 0
def __init__(self, **kwargs):
diff --git a/nova/tests/api/openstack/test_auth.py b/nova/tests/api/openstack/test_auth.py
index 21596fb25..8f189c744 100644
--- a/nova/tests/api/openstack/test_auth.py
+++ b/nova/tests/api/openstack/test_auth.py
@@ -83,8 +83,7 @@ class Test(test.TestCase):
self.assertEqual(result.headers['X-Storage-Url'], "")
token = result.headers['X-Auth-Token']
- self.stubs.Set(nova.api.openstack, 'APIRouter',
- fakes.FakeRouter)
+ self.stubs.Set(nova.api.openstack, 'APIRouterV10', fakes.FakeRouter)
req = webob.Request.blank('/v1.0/fake')
req.headers['X-Auth-Token'] = token
result = req.get_response(fakes.wsgi_app())
@@ -201,8 +200,7 @@ class TestLimiter(test.TestCase):
self.assertEqual(len(result.headers['X-Auth-Token']), 40)
token = result.headers['X-Auth-Token']
- self.stubs.Set(nova.api.openstack, 'APIRouter',
- fakes.FakeRouter)
+ self.stubs.Set(nova.api.openstack, 'APIRouterV10', fakes.FakeRouter)
req = webob.Request.blank('/v1.0/fake')
req.method = 'POST'
req.headers['X-Auth-Token'] = token
diff --git a/nova/tests/api/openstack/test_extensions.py b/nova/tests/api/openstack/test_extensions.py
new file mode 100644
index 000000000..481d34ed1
--- /dev/null
+++ b/nova/tests/api/openstack/test_extensions.py
@@ -0,0 +1,236 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import stubout
+import unittest
+import webob
+import os.path
+
+from nova import context
+from nova import flags
+from nova.api import openstack
+from nova.api.openstack import extensions
+from nova.api.openstack import flavors
+from nova.tests.api.openstack import fakes
+import nova.wsgi
+
+FLAGS = flags.FLAGS
+
+response_body = "Try to say this Mr. Knox, sir..."
+
+
+class StubController(nova.wsgi.Controller):
+
+ def __init__(self, body):
+ self.body = body
+
+ def index(self, req):
+ return self.body
+
+
+class StubExtensionManager(object):
+
+ def __init__(self, resource_ext=None, action_ext=None, response_ext=None):
+ self.resource_ext = resource_ext
+ self.action_ext = action_ext
+ self.response_ext = response_ext
+
+ def get_name(self):
+ return "Tweedle Beetle Extension"
+
+ def get_alias(self):
+ return "TWDLBETL"
+
+ def get_description(self):
+ return "Provides access to Tweedle Beetles"
+
+ def get_resources(self):
+ resource_exts = []
+ if self.resource_ext:
+ resource_exts.append(self.resource_ext)
+ return resource_exts
+
+ def get_actions(self):
+ action_exts = []
+ if self.action_ext:
+ action_exts.append(self.action_ext)
+ return action_exts
+
+ def get_response_extensions(self):
+ response_exts = []
+ if self.response_ext:
+ response_exts.append(self.response_ext)
+ return response_exts
+
+
+class ExtensionControllerTest(unittest.TestCase):
+
+ def test_index(self):
+ app = openstack.APIRouterV11()
+ ext_midware = extensions.ExtensionMiddleware(app)
+ request = webob.Request.blank("/extensions")
+ response = request.get_response(ext_midware)
+ self.assertEqual(200, response.status_int)
+
+ def test_get_by_alias(self):
+ app = openstack.APIRouterV11()
+ ext_midware = extensions.ExtensionMiddleware(app)
+ request = webob.Request.blank("/extensions/FOXNSOX")
+ response = request.get_response(ext_midware)
+ self.assertEqual(200, response.status_int)
+
+
+class ResourceExtensionTest(unittest.TestCase):
+
+ def test_no_extension_present(self):
+ manager = StubExtensionManager(None)
+ app = openstack.APIRouterV11()
+ ext_midware = extensions.ExtensionMiddleware(app, manager)
+ request = webob.Request.blank("/blah")
+ response = request.get_response(ext_midware)
+ self.assertEqual(404, response.status_int)
+
+ def test_get_resources(self):
+ res_ext = extensions.ResourceExtension('tweedles',
+ StubController(response_body))
+ manager = StubExtensionManager(res_ext)
+ app = openstack.APIRouterV11()
+ ext_midware = extensions.ExtensionMiddleware(app, manager)
+ request = webob.Request.blank("/tweedles")
+ response = request.get_response(ext_midware)
+ self.assertEqual(200, response.status_int)
+ self.assertEqual(response_body, response.body)
+
+ def test_get_resources_with_controller(self):
+ res_ext = extensions.ResourceExtension('tweedles',
+ StubController(response_body))
+ manager = StubExtensionManager(res_ext)
+ app = openstack.APIRouterV11()
+ ext_midware = extensions.ExtensionMiddleware(app, manager)
+ request = webob.Request.blank("/tweedles")
+ response = request.get_response(ext_midware)
+ self.assertEqual(200, response.status_int)
+ self.assertEqual(response_body, response.body)
+
+
+class ExtensionManagerTest(unittest.TestCase):
+
+ response_body = "Try to say this Mr. Knox, sir..."
+
+ def setUp(self):
+ FLAGS.osapi_extensions_path = os.path.join(os.path.dirname(__file__),
+ "extensions")
+
+ def test_get_resources(self):
+ app = openstack.APIRouterV11()
+ ext_midware = extensions.ExtensionMiddleware(app)
+ request = webob.Request.blank("/foxnsocks")
+ response = request.get_response(ext_midware)
+ self.assertEqual(200, response.status_int)
+ self.assertEqual(response_body, response.body)
+
+
+class ActionExtensionTest(unittest.TestCase):
+
+ def setUp(self):
+ FLAGS.osapi_extensions_path = os.path.join(os.path.dirname(__file__),
+ "extensions")
+
+ def _send_server_action_request(self, url, body):
+ app = openstack.APIRouterV11()
+ ext_midware = extensions.ExtensionMiddleware(app)
+ request = webob.Request.blank(url)
+ request.method = 'POST'
+ request.content_type = 'application/json'
+ request.body = json.dumps(body)
+ response = request.get_response(ext_midware)
+ return response
+
+ def test_extended_action(self):
+ body = dict(add_tweedle=dict(name="test"))
+ response = self._send_server_action_request("/servers/1/action", body)
+ self.assertEqual(200, response.status_int)
+ self.assertEqual("Tweedle Beetle Added.", response.body)
+
+ body = dict(delete_tweedle=dict(name="test"))
+ response = self._send_server_action_request("/servers/1/action", body)
+ self.assertEqual(200, response.status_int)
+ self.assertEqual("Tweedle Beetle Deleted.", response.body)
+
+ def test_invalid_action_body(self):
+ body = dict(blah=dict(name="test")) # Doesn't exist
+ response = self._send_server_action_request("/servers/1/action", body)
+ self.assertEqual(501, response.status_int)
+
+ def test_invalid_action(self):
+ body = dict(blah=dict(name="test"))
+ response = self._send_server_action_request("/asdf/1/action", body)
+ self.assertEqual(404, response.status_int)
+
+
+class ResponseExtensionTest(unittest.TestCase):
+
+ def setUp(self):
+ super(ResponseExtensionTest, self).setUp()
+ self.stubs = stubout.StubOutForTesting()
+ fakes.FakeAuthManager.reset_fake_data()
+ fakes.FakeAuthDatabase.data = {}
+ fakes.stub_out_auth(self.stubs)
+ self.context = context.get_admin_context()
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+ super(ResponseExtensionTest, self).tearDown()
+
+ def test_get_resources_with_stub_mgr(self):
+
+ test_resp = "Gooey goo for chewy chewing!"
+
+ def _resp_handler(res):
+ # only handle JSON responses
+ data = json.loads(res.body)
+ data['flavor']['googoose'] = test_resp
+ return data
+
+ resp_ext = extensions.ResponseExtension('GET',
+ '/v1.1/flavors/:(id)',
+ _resp_handler)
+
+ manager = StubExtensionManager(None, None, resp_ext)
+ app = fakes.wsgi_app()
+ ext_midware = extensions.ExtensionMiddleware(app, manager)
+ request = webob.Request.blank("/v1.1/flavors/1")
+ request.environ['api.version'] = '1.1'
+ response = request.get_response(ext_midware)
+ self.assertEqual(200, response.status_int)
+ response_data = json.loads(response.body)
+ self.assertEqual(test_resp, response_data['flavor']['googoose'])
+
+ def test_get_resources_with_mgr(self):
+
+ test_resp = "Gooey goo for chewy chewing!"
+
+ app = fakes.wsgi_app()
+ ext_midware = extensions.ExtensionMiddleware(app)
+ request = webob.Request.blank("/v1.1/flavors/1")
+ request.environ['api.version'] = '1.1'
+ response = request.get_response(ext_midware)
+ self.assertEqual(200, response.status_int)
+ response_data = json.loads(response.body)
+ self.assertEqual(test_resp, response_data['flavor']['googoose'])
+ self.assertEqual("Pig Bands!", response_data['big_bands'])
diff --git a/nova/tests/api/openstack/test_flavors.py b/nova/tests/api/openstack/test_flavors.py
index 4f504808c..954d72adf 100644
--- a/nova/tests/api/openstack/test_flavors.py
+++ b/nova/tests/api/openstack/test_flavors.py
@@ -19,11 +19,10 @@ import json
import stubout
import webob
-from nova import test
-import nova.api
+import nova.db.api
from nova import context
-from nova.api.openstack import flavors
-from nova import db
+from nova import exception
+from nova import test
from nova.tests.api.openstack import fakes
@@ -48,6 +47,10 @@ def return_instance_types(context, num=2):
return instance_types
+def return_instance_type_not_found(context, flavorid):
+ raise exception.NotFound()
+
+
class FlavorsTest(test.TestCase):
def setUp(self):
super(FlavorsTest, self).setUp()
@@ -67,7 +70,7 @@ class FlavorsTest(test.TestCase):
self.stubs.UnsetAll()
super(FlavorsTest, self).tearDown()
- def test_get_flavor_list(self):
+ def test_get_flavor_list_v1_0(self):
req = webob.Request.blank('/v1.0/flavors')
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
@@ -84,7 +87,7 @@ class FlavorsTest(test.TestCase):
]
self.assertEqual(flavors, expected)
- def test_get_flavor_list_detail(self):
+ def test_get_flavor_list_detail_v1_0(self):
req = webob.Request.blank('/v1.0/flavors/detail')
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
@@ -105,7 +108,7 @@ class FlavorsTest(test.TestCase):
]
self.assertEqual(flavors, expected)
- def test_get_flavor_by_id(self):
+ def test_get_flavor_by_id_v1_0(self):
req = webob.Request.blank('/v1.0/flavors/12')
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
@@ -117,3 +120,144 @@ class FlavorsTest(test.TestCase):
"disk": "10",
}
self.assertEqual(flavor, expected)
+
+ def test_get_flavor_by_invalid_id(self):
+ self.stubs.Set(nova.db.api, "instance_type_get_by_flavor_id",
+ return_instance_type_not_found)
+ req = webob.Request.blank('/v1.0/flavors/asdf')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 404)
+
+ def test_get_flavor_by_id_v1_1(self):
+ req = webob.Request.blank('/v1.1/flavors/12')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
+ flavor = json.loads(res.body)["flavor"]
+ expected = {
+ "id": "12",
+ "name": "flavor 12",
+ "ram": "256",
+ "disk": "10",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/flavors/12",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/flavors/12",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/flavors/12",
+ },
+ ],
+ }
+ self.assertEqual(flavor, expected)
+
+ def test_get_flavor_list_v1_1(self):
+ req = webob.Request.blank('/v1.1/flavors')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
+ flavor = json.loads(res.body)["flavors"]
+ expected = [
+ {
+ "id": "1",
+ "name": "flavor 1",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/flavors/1",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/flavors/1",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/flavors/1",
+ },
+ ],
+ },
+ {
+ "id": "2",
+ "name": "flavor 2",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/flavors/2",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/flavors/2",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/flavors/2",
+ },
+ ],
+ },
+ ]
+ self.assertEqual(flavor, expected)
+
+ def test_get_flavor_list_detail_v1_1(self):
+ req = webob.Request.blank('/v1.1/flavors/detail')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
+ flavor = json.loads(res.body)["flavors"]
+ expected = [
+ {
+ "id": "1",
+ "name": "flavor 1",
+ "ram": "256",
+ "disk": "10",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/flavors/1",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/flavors/1",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/flavors/1",
+ },
+ ],
+ },
+ {
+ "id": "2",
+ "name": "flavor 2",
+ "ram": "256",
+ "disk": "10",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/flavors/2",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/flavors/2",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/flavors/2",
+ },
+ ],
+ },
+ ]
+ self.assertEqual(flavor, expected)
diff --git a/nova/tests/api/openstack/test_image_metadata.py b/nova/tests/api/openstack/test_image_metadata.py
new file mode 100644
index 000000000..9be753f84
--- /dev/null
+++ b/nova/tests/api/openstack/test_image_metadata.py
@@ -0,0 +1,166 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import stubout
+import unittest
+import webob
+
+
+from nova import flags
+from nova.api import openstack
+from nova.tests.api.openstack import fakes
+import nova.wsgi
+
+
+FLAGS = flags.FLAGS
+
+
+class ImageMetaDataTest(unittest.TestCase):
+
+ IMAGE_FIXTURES = [
+ {'status': 'active',
+ 'name': 'image1',
+ 'deleted': False,
+ 'container_format': None,
+ 'created_at': '2011-03-22T17:40:15',
+ 'disk_format': None,
+ 'updated_at': '2011-03-22T17:40:15',
+ 'id': '1',
+ 'location': 'file:///var/lib/glance/images/1',
+ 'is_public': True,
+ 'deleted_at': None,
+ 'properties': {
+ 'type': 'ramdisk',
+ 'key1': 'value1',
+ 'key2': 'value2'
+ },
+ 'size': 5882349},
+ {'status': 'active',
+ 'name': 'image2',
+ 'deleted': False,
+ 'container_format': None,
+ 'created_at': '2011-03-22T17:40:15',
+ 'disk_format': None,
+ 'updated_at': '2011-03-22T17:40:15',
+ 'id': '2',
+ 'location': 'file:///var/lib/glance/images/2',
+ 'is_public': True,
+ 'deleted_at': None,
+ 'properties': {
+ 'type': 'ramdisk',
+ 'key1': 'value1',
+ 'key2': 'value2'
+ },
+ 'size': 5882349},
+ ]
+
+ def setUp(self):
+ super(ImageMetaDataTest, self).setUp()
+ self.stubs = stubout.StubOutForTesting()
+ self.orig_image_service = FLAGS.image_service
+ FLAGS.image_service = 'nova.image.glance.GlanceImageService'
+ fakes.FakeAuthManager.auth_data = {}
+ fakes.FakeAuthDatabase.data = {}
+ fakes.stub_out_auth(self.stubs)
+ fakes.stub_out_glance(self.stubs, self.IMAGE_FIXTURES)
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+ FLAGS.image_service = self.orig_image_service
+ super(ImageMetaDataTest, self).tearDown()
+
+ def test_index(self):
+ req = webob.Request.blank('/v1.1/images/1/meta')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_int)
+ self.assertEqual('value1', res_dict['metadata']['key1'])
+
+ def test_show(self):
+ req = webob.Request.blank('/v1.1/images/1/meta/key1')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_int)
+ self.assertEqual('value1', res_dict['key1'])
+
+ def test_show_not_found(self):
+ req = webob.Request.blank('/v1.1/images/1/meta/key9')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(404, res.status_int)
+
+ def test_create(self):
+ req = webob.Request.blank('/v1.1/images/2/meta')
+ req.environ['api.version'] = '1.1'
+ req.method = 'POST'
+ req.body = '{"metadata": {"key9": "value9"}}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_int)
+ self.assertEqual('value9', res_dict['metadata']['key9'])
+ # other items should not be modified
+ self.assertEqual('value1', res_dict['metadata']['key1'])
+ self.assertEqual('value2', res_dict['metadata']['key2'])
+ self.assertEqual(1, len(res_dict))
+
+ def test_update_item(self):
+ req = webob.Request.blank('/v1.1/images/1/meta/key1')
+ req.environ['api.version'] = '1.1'
+ req.method = 'PUT'
+ req.body = '{"key1": "zz"}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(200, res.status_int)
+ res_dict = json.loads(res.body)
+ self.assertEqual('zz', res_dict['key1'])
+
+ def test_update_item_too_many_keys(self):
+ req = webob.Request.blank('/v1.1/images/1/meta/key1')
+ req.environ['api.version'] = '1.1'
+ req.method = 'PUT'
+ req.body = '{"key1": "value1", "key2": "value2"}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(400, res.status_int)
+
+ def test_update_item_body_uri_mismatch(self):
+ req = webob.Request.blank('/v1.1/images/1/meta/bad')
+ req.environ['api.version'] = '1.1'
+ req.method = 'PUT'
+ req.body = '{"key1": "value1"}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(400, res.status_int)
+
+ def test_delete(self):
+ req = webob.Request.blank('/v1.1/images/2/meta/key1')
+ req.environ['api.version'] = '1.1'
+ req.method = 'DELETE'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(200, res.status_int)
+
+ def test_delete_not_found(self):
+ req = webob.Request.blank('/v1.1/images/2/meta/blah')
+ req.environ['api.version'] = '1.1'
+ req.method = 'DELETE'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(404, res.status_int)
diff --git a/nova/tests/api/openstack/test_images.py b/nova/tests/api/openstack/test_images.py
index a674ccefe..57e447dce 100644
--- a/nova/tests/api/openstack/test_images.py
+++ b/nova/tests/api/openstack/test_images.py
@@ -20,15 +20,18 @@ Tests of the new image services, both as a service layer,
and as a WSGI layer
"""
+import copy
import json
import datetime
import os
import shutil
import tempfile
+import xml.dom.minidom as minidom
import stubout
import webob
+from glance import client as glance_client
from nova import context
from nova import exception
from nova import flags
@@ -42,79 +45,50 @@ from nova.tests.api.openstack import fakes
FLAGS = flags.FLAGS
-class BaseImageServiceTests(object):
-
+class _BaseImageServiceTests(test.TestCase):
"""Tasks to test for all image services"""
- def test_create(self):
-
- fixture = {'name': 'test image',
- 'updated': None,
- 'created': None,
- 'status': None,
- 'instance_id': None,
- 'progress': None}
+ def __init__(self, *args, **kwargs):
+ super(_BaseImageServiceTests, self).__init__(*args, **kwargs)
+ self.service = None
+ self.context = None
+ def test_create(self):
+ fixture = self._make_fixture('test image')
num_images = len(self.service.index(self.context))
- id = self.service.create(self.context, fixture)['id']
+ image_id = self.service.create(self.context, fixture)['id']
- self.assertNotEquals(None, id)
+ self.assertNotEquals(None, image_id)
self.assertEquals(num_images + 1,
len(self.service.index(self.context)))
def test_create_and_show_non_existing_image(self):
-
- fixture = {'name': 'test image',
- 'updated': None,
- 'created': None,
- 'status': None,
- 'instance_id': None,
- 'progress': None}
-
+ fixture = self._make_fixture('test image')
num_images = len(self.service.index(self.context))
- id = self.service.create(self.context, fixture)['id']
-
- self.assertNotEquals(None, id)
+ image_id = self.service.create(self.context, fixture)['id']
+ self.assertNotEquals(None, image_id)
self.assertRaises(exception.NotFound,
self.service.show,
self.context,
'bad image id')
def test_update(self):
-
- fixture = {'name': 'test image',
- 'updated': None,
- 'created': None,
- 'status': None,
- 'instance_id': None,
- 'progress': None}
-
- id = self.service.create(self.context, fixture)['id']
-
+ fixture = self._make_fixture('test image')
+ image_id = self.service.create(self.context, fixture)['id']
fixture['status'] = 'in progress'
- self.service.update(self.context, id, fixture)
- new_image_data = self.service.show(self.context, id)
+ self.service.update(self.context, image_id, fixture)
+
+ new_image_data = self.service.show(self.context, image_id)
self.assertEquals('in progress', new_image_data['status'])
def test_delete(self):
-
- fixtures = [
- {'name': 'test image 1',
- 'updated': None,
- 'created': None,
- 'status': None,
- 'instance_id': None,
- 'progress': None},
- {'name': 'test image 2',
- 'updated': None,
- 'created': None,
- 'status': None,
- 'instance_id': None,
- 'progress': None}]
+ fixture1 = self._make_fixture('test image 1')
+ fixture2 = self._make_fixture('test image 2')
+ fixtures = [fixture1, fixture2]
num_images = len(self.service.index(self.context))
self.assertEquals(0, num_images, str(self.service.index(self.context)))
@@ -132,9 +106,24 @@ class BaseImageServiceTests(object):
num_images = len(self.service.index(self.context))
self.assertEquals(1, num_images)
+ def test_index(self):
+ fixture = self._make_fixture('test image')
+ image_id = self.service.create(self.context, fixture)['id']
+ image_metas = self.service.index(self.context)
+ expected = [{'id': 'DONTCARE', 'name': 'test image'}]
+ self.assertDictListMatch(image_metas, expected)
-class LocalImageServiceTest(test.TestCase,
- BaseImageServiceTests):
+ @staticmethod
+ def _make_fixture(name):
+ fixture = {'name': 'test image',
+ 'updated': None,
+ 'created': None,
+ 'status': None,
+ 'is_public': True}
+ return fixture
+
+
+class LocalImageServiceTest(_BaseImageServiceTests):
"""Tests the local image service"""
@@ -164,11 +153,19 @@ class LocalImageServiceTest(test.TestCase,
self.assertEqual(3, len(found_image_ids), len(found_image_ids))
-class GlanceImageServiceTest(test.TestCase,
- BaseImageServiceTests):
+class GlanceImageServiceTest(_BaseImageServiceTests):
- """Tests the local image service"""
+ """Tests the Glance image service, in particular that metadata translation
+ works properly.
+ At a high level, the translations involved are:
+
+ 1. Glance -> ImageService - This is needed so we can support
+ multple ImageServices (Glance, Local, etc)
+
+ 2. ImageService -> API - This is needed so we can support multple
+ APIs (OpenStack, EC2)
+ """
def setUp(self):
super(GlanceImageServiceTest, self).setUp()
self.stubs = stubout.StubOutForTesting()
@@ -176,43 +173,57 @@ class GlanceImageServiceTest(test.TestCase,
fakes.stub_out_compute_api_snapshot(self.stubs)
service_class = 'nova.image.glance.GlanceImageService'
self.service = utils.import_object(service_class)
- self.context = context.RequestContext(None, None)
+ self.context = context.RequestContext(1, None)
self.service.delete_all()
+ self.sent_to_glance = {}
+ fakes.stub_out_glance_add_image(self.stubs, self.sent_to_glance)
def tearDown(self):
self.stubs.UnsetAll()
super(GlanceImageServiceTest, self).tearDown()
+ def test_create_with_instance_id(self):
+ """Ensure instance_id is persisted as an image-property"""
+ fixture = {'name': 'test image',
+ 'is_public': False,
+ 'properties': {'instance_id': '42', 'user_id': '1'}}
-class ImageControllerWithGlanceServiceTest(test.TestCase):
+ image_id = self.service.create(self.context, fixture)['id']
+ expected = fixture
+ self.assertDictMatch(self.sent_to_glance['metadata'], expected)
+
+ image_meta = self.service.show(self.context, image_id)
+ expected = {'id': image_id,
+ 'name': 'test image',
+ 'is_public': False,
+ 'properties': {'instance_id': '42', 'user_id': '1'}}
+ self.assertDictMatch(image_meta, expected)
- """Test of the OpenStack API /images application controller"""
-
- # Registered images at start of each test.
-
- IMAGE_FIXTURES = [
- {'id': '23g2ogk23k4hhkk4k42l',
- 'imageId': '23g2ogk23k4hhkk4k42l',
- 'name': 'public image #1',
- 'created_at': str(datetime.datetime.utcnow()),
- 'updated_at': str(datetime.datetime.utcnow()),
- 'deleted_at': None,
- 'deleted': False,
- 'is_public': True,
- 'status': 'available',
- 'image_type': 'kernel'},
- {'id': 'slkduhfas73kkaskgdas',
- 'imageId': 'slkduhfas73kkaskgdas',
- 'name': 'public image #2',
- 'created_at': str(datetime.datetime.utcnow()),
- 'updated_at': str(datetime.datetime.utcnow()),
- 'deleted_at': None,
- 'deleted': False,
- 'is_public': True,
- 'status': 'available',
- 'image_type': 'ramdisk'}]
+ image_metas = self.service.detail(self.context)
+ self.assertDictMatch(image_metas[0], expected)
+
+ def test_create_without_instance_id(self):
+ """
+ Ensure we can create an image without having to specify an
+ instance_id. Public images are an example of an image not tied to an
+ instance.
+ """
+ fixture = {'name': 'test image'}
+ image_id = self.service.create(self.context, fixture)['id']
+
+ expected = {'name': 'test image', 'properties': {}}
+ self.assertDictMatch(self.sent_to_glance['metadata'], expected)
+
+
+class ImageControllerWithGlanceServiceTest(test.TestCase):
+ """
+ Test of the OpenStack API /images application controller w/Glance.
+ """
+ NOW_GLANCE_FORMAT = "2010-10-11T10:30:22"
+ NOW_API_FORMAT = "2010-10-11T10:30:22Z"
def setUp(self):
+ """Run before each test."""
super(ImageControllerWithGlanceServiceTest, self).setUp()
self.orig_image_service = FLAGS.image_service
FLAGS.image_service = 'nova.image.glance.GlanceImageService'
@@ -223,44 +234,464 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
fakes.stub_out_rate_limiting(self.stubs)
fakes.stub_out_auth(self.stubs)
fakes.stub_out_key_pair_funcs(self.stubs)
- fakes.stub_out_glance(self.stubs, initial_fixtures=self.IMAGE_FIXTURES)
+ self.fixtures = self._make_image_fixtures()
+ fakes.stub_out_glance(self.stubs, initial_fixtures=self.fixtures)
def tearDown(self):
+ """Run after each test."""
self.stubs.UnsetAll()
FLAGS.image_service = self.orig_image_service
super(ImageControllerWithGlanceServiceTest, self).tearDown()
- def test_get_image_index(self):
- req = webob.Request.blank('/v1.0/images')
- res = req.get_response(fakes.wsgi_app())
- res_dict = json.loads(res.body)
-
- fixture_index = [dict(id=f['id'], name=f['name']) for f
- in self.IMAGE_FIXTURES]
+ def _applicable_fixture(self, fixture, user_id):
+ """Determine if this fixture is applicable for given user id."""
+ is_public = fixture["is_public"]
+ try:
+ uid = int(fixture["properties"]["user_id"])
+ except KeyError:
+ uid = None
+ return uid == user_id or is_public
- for image in res_dict['images']:
- self.assertEquals(1, fixture_index.count(image),
- "image %s not in fixture index!" % str(image))
+ def test_get_image_index(self):
+ request = webob.Request.blank('/v1.0/images')
+ response = request.get_response(fakes.wsgi_app())
+
+ response_dict = json.loads(response.body)
+ response_list = response_dict["images"]
+
+ expected = [{'id': 123, 'name': 'public image'},
+ {'id': 124, 'name': 'queued backup'},
+ {'id': 125, 'name': 'saving backup'},
+ {'id': 126, 'name': 'active backup'},
+ {'id': 127, 'name': 'killed backup'}]
+
+ self.assertDictListMatch(response_list, expected)
+
+ def test_get_image(self):
+ request = webob.Request.blank('/v1.0/images/123')
+ response = request.get_response(fakes.wsgi_app())
+
+ self.assertEqual(200, response.status_int)
+
+ actual_image = json.loads(response.body)
+
+ expected_image = {
+ "image": {
+ "id": 123,
+ "name": "public image",
+ "updated": self.NOW_API_FORMAT,
+ "created": self.NOW_API_FORMAT,
+ "status": "ACTIVE",
+ },
+ }
+
+ self.assertEqual(expected_image, actual_image)
+
+ def test_get_image_v1_1(self):
+ request = webob.Request.blank('/v1.1/images/123')
+ response = request.get_response(fakes.wsgi_app())
+
+ actual_image = json.loads(response.body)
+
+ href = "http://localhost/v1.1/images/123"
+
+ expected_image = {
+ "image": {
+ "id": 123,
+ "name": "public image",
+ "updated": self.NOW_API_FORMAT,
+ "created": self.NOW_API_FORMAT,
+ "status": "ACTIVE",
+ "links": [{
+ "rel": "self",
+ "href": href,
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": href,
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": href,
+ }],
+ },
+ }
+
+ self.assertEqual(expected_image, actual_image)
+
+ def test_get_image_xml(self):
+ request = webob.Request.blank('/v1.0/images/123')
+ request.accept = "application/xml"
+ response = request.get_response(fakes.wsgi_app())
+
+ actual_image = minidom.parseString(response.body.replace(" ", ""))
+
+ expected_now = self.NOW_API_FORMAT
+ expected_image = minidom.parseString("""
+ <image id="123"
+ name="public image"
+ updated="%(expected_now)s"
+ created="%(expected_now)s"
+ status="ACTIVE" />
+ """ % (locals()))
+
+ self.assertEqual(expected_image.toxml(), actual_image.toxml())
+
+ def test_get_image_v1_1_xml(self):
+ request = webob.Request.blank('/v1.1/images/123')
+ request.accept = "application/xml"
+ response = request.get_response(fakes.wsgi_app())
+
+ actual_image = minidom.parseString(response.body.replace(" ", ""))
+
+ expected_href = "http://localhost/v1.1/images/123"
+ expected_now = self.NOW_API_FORMAT
+ expected_image = minidom.parseString("""
+ <image id="123"
+ name="public image"
+ updated="%(expected_now)s"
+ created="%(expected_now)s"
+ status="ACTIVE">
+ <links>
+ <link href="%(expected_href)s" rel="self"/>
+ <link href="%(expected_href)s" rel="bookmark"
+ type="application/json" />
+ <link href="%(expected_href)s" rel="bookmark"
+ type="application/xml" />
+ </links>
+ </image>
+ """.replace(" ", "") % (locals()))
+
+ self.assertEqual(expected_image.toxml(), actual_image.toxml())
+
+ def test_get_image_404_json(self):
+ request = webob.Request.blank('/v1.0/images/NonExistantImage')
+ response = request.get_response(fakes.wsgi_app())
+ self.assertEqual(404, response.status_int)
+
+ expected = {
+ "itemNotFound": {
+ "message": "Image not found.",
+ "code": 404,
+ },
+ }
+
+ actual = json.loads(response.body)
+
+ self.assertEqual(expected, actual)
+
+ def test_get_image_404_xml(self):
+ request = webob.Request.blank('/v1.0/images/NonExistantImage')
+ request.accept = "application/xml"
+ response = request.get_response(fakes.wsgi_app())
+ self.assertEqual(404, response.status_int)
+
+ expected = minidom.parseString("""
+ <itemNotFound code="404">
+ <message>
+ Image not found.
+ </message>
+ </itemNotFound>
+ """.replace(" ", ""))
+
+ actual = minidom.parseString(response.body.replace(" ", ""))
+
+ self.assertEqual(expected.toxml(), actual.toxml())
+
+ def test_get_image_404_v1_1_json(self):
+ request = webob.Request.blank('/v1.1/images/NonExistantImage')
+ response = request.get_response(fakes.wsgi_app())
+ self.assertEqual(404, response.status_int)
+
+ expected = {
+ "itemNotFound": {
+ "message": "Image not found.",
+ "code": 404,
+ },
+ }
+
+ actual = json.loads(response.body)
+
+ self.assertEqual(expected, actual)
+
+ def test_get_image_404_v1_1_xml(self):
+ request = webob.Request.blank('/v1.1/images/NonExistantImage')
+ request.accept = "application/xml"
+ response = request.get_response(fakes.wsgi_app())
+ self.assertEqual(404, response.status_int)
+
+ expected = minidom.parseString("""
+ <itemNotFound code="404">
+ <message>
+ Image not found.
+ </message>
+ </itemNotFound>
+ """.replace(" ", ""))
+
+ actual = minidom.parseString(response.body.replace(" ", ""))
+
+ self.assertEqual(expected.toxml(), actual.toxml())
+
+ def test_get_image_index_v1_1(self):
+ request = webob.Request.blank('/v1.1/images')
+ response = request.get_response(fakes.wsgi_app())
+
+ response_dict = json.loads(response.body)
+ response_list = response_dict["images"]
+
+ fixtures = copy.copy(self.fixtures)
+
+ for image in fixtures:
+ if not self._applicable_fixture(image, 1):
+ fixtures.remove(image)
+ continue
+
+ href = "http://localhost/v1.1/images/%s" % image["id"]
+ test_image = {
+ "id": image["id"],
+ "name": image["name"],
+ "links": [{
+ "rel": "self",
+ "href": "http://localhost/v1.1/images/%s" % image["id"],
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": href,
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": href,
+ }],
+ }
+ self.assertTrue(test_image in response_list)
+
+ self.assertEqual(len(response_list), len(fixtures))
def test_get_image_details(self):
- req = webob.Request.blank('/v1.0/images/detail')
+ request = webob.Request.blank('/v1.0/images/detail')
+ response = request.get_response(fakes.wsgi_app())
+
+ response_dict = json.loads(response.body)
+ response_list = response_dict["images"]
+
+ expected = [{
+ 'id': 123,
+ 'name': 'public image',
+ 'updated': self.NOW_API_FORMAT,
+ 'created': self.NOW_API_FORMAT,
+ 'status': 'ACTIVE',
+ },
+ {
+ 'id': 124,
+ 'name': 'queued backup',
+ 'serverId': 42,
+ 'updated': self.NOW_API_FORMAT,
+ 'created': self.NOW_API_FORMAT,
+ 'status': 'QUEUED',
+ },
+ {
+ 'id': 125,
+ 'name': 'saving backup',
+ 'serverId': 42,
+ 'updated': self.NOW_API_FORMAT,
+ 'created': self.NOW_API_FORMAT,
+ 'status': 'SAVING',
+ 'progress': 0,
+ },
+ {
+ 'id': 126,
+ 'name': 'active backup',
+ 'serverId': 42,
+ 'updated': self.NOW_API_FORMAT,
+ 'created': self.NOW_API_FORMAT,
+ 'status': 'ACTIVE'
+ },
+ {
+ 'id': 127,
+ 'name': 'killed backup', 'serverId': 42,
+ 'updated': self.NOW_API_FORMAT,
+ 'created': self.NOW_API_FORMAT,
+ 'status': 'FAILED',
+ }]
+
+ self.assertDictListMatch(expected, response_list)
+
+ def test_get_image_details_v1_1(self):
+ request = webob.Request.blank('/v1.1/images/detail')
+ response = request.get_response(fakes.wsgi_app())
+
+ response_dict = json.loads(response.body)
+ response_list = response_dict["images"]
+
+ expected = [{
+ 'id': 123,
+ 'name': 'public image',
+ 'updated': self.NOW_API_FORMAT,
+ 'created': self.NOW_API_FORMAT,
+ 'status': 'ACTIVE',
+ "links": [{
+ "rel": "self",
+ "href": "http://localhost/v1.1/images/123",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/images/123",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/images/123",
+ }],
+ },
+ {
+ 'id': 124,
+ 'name': 'queued backup',
+ 'serverId': 42,
+ 'updated': self.NOW_API_FORMAT,
+ 'created': self.NOW_API_FORMAT,
+ 'status': 'QUEUED',
+ "links": [{
+ "rel": "self",
+ "href": "http://localhost/v1.1/images/124",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/images/124",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/images/124",
+ }],
+ },
+ {
+ 'id': 125,
+ 'name': 'saving backup',
+ 'serverId': 42,
+ 'updated': self.NOW_API_FORMAT,
+ 'created': self.NOW_API_FORMAT,
+ 'status': 'SAVING',
+ 'progress': 0,
+ "links": [{
+ "rel": "self",
+ "href": "http://localhost/v1.1/images/125",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/images/125",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/images/125",
+ }],
+ },
+ {
+ 'id': 126,
+ 'name': 'active backup',
+ 'serverId': 42,
+ 'updated': self.NOW_API_FORMAT,
+ 'created': self.NOW_API_FORMAT,
+ 'status': 'ACTIVE',
+ "links": [{
+ "rel": "self",
+ "href": "http://localhost/v1.1/images/126",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/images/126",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/images/126",
+ }],
+ },
+ {
+ 'id': 127,
+ 'name': 'killed backup', 'serverId': 42,
+ 'updated': self.NOW_API_FORMAT,
+ 'created': self.NOW_API_FORMAT,
+ 'status': 'FAILED',
+ "links": [{
+ "rel": "self",
+ "href": "http://localhost/v1.1/images/127",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/images/127",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/images/127",
+ }],
+ }]
+
+ self.assertDictListMatch(expected, response_list)
+
+ def test_get_image_found(self):
+ req = webob.Request.blank('/v1.0/images/123')
+ res = req.get_response(fakes.wsgi_app())
+ image_meta = json.loads(res.body)['image']
+ expected = {'id': 123, 'name': 'public image',
+ 'updated': self.NOW_API_FORMAT,
+ 'created': self.NOW_API_FORMAT, 'status': 'ACTIVE'}
+ self.assertDictMatch(image_meta, expected)
+
+ def test_get_image_non_existent(self):
+ req = webob.Request.blank('/v1.0/images/4242')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 404)
+
+ def test_get_image_not_owned(self):
+ """We should return a 404 if we request an image that doesn't belong
+ to us
+ """
+ req = webob.Request.blank('/v1.0/images/128')
res = req.get_response(fakes.wsgi_app())
- res_dict = json.loads(res.body)
-
- def _is_equivalent_subset(x, y):
- if set(x) <= set(y):
- for k, v in x.iteritems():
- if x[k] != y[k]:
- if x[k] == 'active' and y[k] == 'available':
- continue
- return False
- return True
- return False
-
- for image in res_dict['images']:
- for image_fixture in self.IMAGE_FIXTURES:
- if _is_equivalent_subset(image, image_fixture):
- break
- else:
- self.assertEquals(1, 2, "image %s not in fixtures!" %
- str(image))
+ self.assertEqual(res.status_int, 404)
+
+ @classmethod
+ def _make_image_fixtures(cls):
+ image_id = 123
+ base_attrs = {'created_at': cls.NOW_GLANCE_FORMAT,
+ 'updated_at': cls.NOW_GLANCE_FORMAT,
+ 'deleted_at': None,
+ 'deleted': False}
+
+ fixtures = []
+
+ def add_fixture(**kwargs):
+ kwargs.update(base_attrs)
+ fixtures.append(kwargs)
+
+ # Public image
+ add_fixture(id=image_id, name='public image', is_public=True,
+ status='active', properties={})
+ image_id += 1
+
+ # Backup for User 1
+ backup_properties = {'instance_id': '42', 'user_id': '1'}
+ for status in ('queued', 'saving', 'active', 'killed'):
+ add_fixture(id=image_id, name='%s backup' % status,
+ is_public=False, status=status,
+ properties=backup_properties)
+ image_id += 1
+
+ # Backup for User 2
+ other_backup_properties = {'instance_id': '43', 'user_id': '2'}
+ add_fixture(id=image_id, name='someone elses backup', is_public=False,
+ status='active', properties=other_backup_properties)
+ image_id += 1
+
+ return fixtures
diff --git a/nova/tests/api/openstack/test_server_metadata.py b/nova/tests/api/openstack/test_server_metadata.py
new file mode 100644
index 000000000..c8d456472
--- /dev/null
+++ b/nova/tests/api/openstack/test_server_metadata.py
@@ -0,0 +1,164 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import stubout
+import unittest
+import webob
+
+
+from nova.api import openstack
+from nova.tests.api.openstack import fakes
+import nova.wsgi
+
+
+def return_create_instance_metadata(context, server_id, metadata):
+ return stub_server_metadata()
+
+
+def return_server_metadata(context, server_id):
+ return stub_server_metadata()
+
+
+def return_empty_server_metadata(context, server_id):
+ return {}
+
+
+def delete_server_metadata(context, server_id, key):
+ pass
+
+
+def stub_server_metadata():
+ metadata = {
+ "key1": "value1",
+ "key2": "value2",
+ "key3": "value3",
+ "key4": "value4",
+ "key5": "value5"
+ }
+ return metadata
+
+
+class ServerMetaDataTest(unittest.TestCase):
+
+ def setUp(self):
+ super(ServerMetaDataTest, self).setUp()
+ self.stubs = stubout.StubOutForTesting()
+ fakes.FakeAuthManager.auth_data = {}
+ fakes.FakeAuthDatabase.data = {}
+ fakes.stub_out_auth(self.stubs)
+ fakes.stub_out_key_pair_funcs(self.stubs)
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+ super(ServerMetaDataTest, self).tearDown()
+
+ def test_index(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_get',
+ return_server_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_int)
+ self.assertEqual('value1', res_dict['metadata']['key1'])
+
+ def test_index_no_data(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_get',
+ return_empty_server_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_int)
+ self.assertEqual(0, len(res_dict['metadata']))
+
+ def test_show(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_get',
+ return_server_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta/key5')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_int)
+ self.assertEqual('value5', res_dict['key5'])
+
+ def test_show_meta_not_found(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_get',
+ return_empty_server_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta/key6')
+ req.environ['api.version'] = '1.1'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(404, res.status_int)
+
+ def test_delete(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_delete',
+ delete_server_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta/key5')
+ req.environ['api.version'] = '1.1'
+ req.method = 'DELETE'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(200, res.status_int)
+
+ def test_create(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_update_or_create',
+ return_create_instance_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta')
+ req.environ['api.version'] = '1.1'
+ req.method = 'POST'
+ req.body = '{"metadata": {"key1": "value1"}}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_int)
+ self.assertEqual('value1', res_dict['metadata']['key1'])
+
+ def test_update_item(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_update_or_create',
+ return_create_instance_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta/key1')
+ req.environ['api.version'] = '1.1'
+ req.method = 'PUT'
+ req.body = '{"key1": "value1"}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(200, res.status_int)
+ res_dict = json.loads(res.body)
+ self.assertEqual('value1', res_dict['key1'])
+
+ def test_update_item_too_many_keys(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_update_or_create',
+ return_create_instance_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta/key1')
+ req.environ['api.version'] = '1.1'
+ req.method = 'PUT'
+ req.body = '{"key1": "value1", "key2": "value2"}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(400, res.status_int)
+
+ def test_update_item_body_uri_mismatch(self):
+ self.stubs.Set(nova.db.api, 'instance_metadata_update_or_create',
+ return_create_instance_metadata)
+ req = webob.Request.blank('/v1.1/servers/1/meta/bad')
+ req.environ['api.version'] = '1.1'
+ req.method = 'PUT'
+ req.body = '{"key1": "value1"}'
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(400, res.status_int)
diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py
index efba2970f..130b8c5d5 100644
--- a/nova/tests/api/openstack/test_servers.py
+++ b/nova/tests/api/openstack/test_servers.py
@@ -26,6 +26,7 @@ import webob
from nova import context
from nova import db
+from nova import exception
from nova import flags
from nova import test
import nova.api.openstack
@@ -161,9 +162,36 @@ class ServersTest(test.TestCase):
req = webob.Request.blank('/v1.0/servers/1')
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
- self.assertEqual(res_dict['server']['id'], '1')
+ self.assertEqual(res_dict['server']['id'], 1)
self.assertEqual(res_dict['server']['name'], 'server1')
+ def test_get_server_by_id_v11(self):
+ req = webob.Request.blank('/v1.1/servers/1')
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(res_dict['server']['id'], 1)
+ self.assertEqual(res_dict['server']['name'], 'server1')
+
+ expected_links = [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/servers/1",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/servers/1",
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/servers/1",
+ },
+ ]
+
+ print res_dict['server']
+ self.assertEqual(res_dict['server']['links'], expected_links)
+
def test_get_server_by_id_with_addresses(self):
private = "192.168.0.3"
public = ["1.2.3.4"]
@@ -172,7 +200,7 @@ class ServersTest(test.TestCase):
req = webob.Request.blank('/v1.0/servers/1')
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
- self.assertEqual(res_dict['server']['id'], '1')
+ self.assertEqual(res_dict['server']['id'], 1)
self.assertEqual(res_dict['server']['name'], 'server1')
addresses = res_dict['server']['addresses']
self.assertEqual(len(addresses["public"]), len(public))
@@ -180,16 +208,15 @@ class ServersTest(test.TestCase):
self.assertEqual(len(addresses["private"]), 1)
self.assertEqual(addresses["private"][0], private)
- def test_get_server_by_id_with_addresses_v1_1(self):
+ def test_get_server_by_id_with_addresses_v11(self):
private = "192.168.0.3"
public = ["1.2.3.4"]
new_return_server = return_server_with_addresses(private, public)
self.stubs.Set(nova.db.api, 'instance_get', new_return_server)
req = webob.Request.blank('/v1.1/servers/1')
- req.environ['api.version'] = '1.1'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
- self.assertEqual(res_dict['server']['id'], '1')
+ self.assertEqual(res_dict['server']['id'], 1)
self.assertEqual(res_dict['server']['name'], 'server1')
addresses = res_dict['server']['addresses']
self.assertEqual(len(addresses["public"]), len(public))
@@ -211,6 +238,35 @@ class ServersTest(test.TestCase):
self.assertEqual(s.get('imageId', None), None)
i += 1
+ def test_get_server_list_v11(self):
+ req = webob.Request.blank('/v1.1/servers')
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+
+ for i, s in enumerate(res_dict['servers']):
+ self.assertEqual(s['id'], i)
+ self.assertEqual(s['name'], 'server%d' % i)
+ self.assertEqual(s.get('imageId', None), None)
+
+ expected_links = [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1/servers/%d" % (i,),
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/json",
+ "href": "http://localhost/v1.1/servers/%d" % (i,),
+ },
+ {
+ "rel": "bookmark",
+ "type": "application/xml",
+ "href": "http://localhost/v1.1/servers/%d" % (i,),
+ },
+ ]
+
+ self.assertEqual(s['links'], expected_links)
+
def test_get_servers_with_limit(self):
req = webob.Request.blank('/v1.0/servers?limit=3')
res = req.get_response(fakes.wsgi_app())
@@ -239,7 +295,37 @@ class ServersTest(test.TestCase):
servers = json.loads(res.body)['servers']
self.assertEqual([s['id'] for s in servers], [1, 2])
- def _test_create_instance_helper(self):
+ def test_get_servers_with_bad_limit(self):
+ req = webob.Request.blank('/v1.0/servers?limit=asdf&offset=1')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+ self.assertTrue(res.body.find('limit param') > -1)
+
+ def test_get_servers_with_bad_offset(self):
+ req = webob.Request.blank('/v1.0/servers?limit=2&offset=asdf')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+ self.assertTrue(res.body.find('offset param') > -1)
+
+ def test_get_servers_with_marker(self):
+ req = webob.Request.blank('/v1.1/servers?marker=2')
+ res = req.get_response(fakes.wsgi_app())
+ servers = json.loads(res.body)['servers']
+ self.assertEqual([s['id'] for s in servers], [3, 4])
+
+ def test_get_servers_with_limit_and_marker(self):
+ req = webob.Request.blank('/v1.1/servers?limit=2&marker=1')
+ res = req.get_response(fakes.wsgi_app())
+ servers = json.loads(res.body)['servers']
+ self.assertEqual([s['id'] for s in servers], [2, 3])
+
+ def test_get_servers_with_bad_marker(self):
+ req = webob.Request.blank('/v1.1/servers?limit=2&marker=asdf')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+ self.assertTrue(res.body.find('marker param') > -1)
+
+ def _setup_for_create_instance(self):
"""Shared implementation for tests below that create instance"""
def instance_create(context, inst):
return {'id': '1', 'display_name': 'server_test'}
@@ -276,14 +362,17 @@ class ServersTest(test.TestCase):
self.stubs.Set(nova.api.openstack.common,
"get_image_id_from_image_hash", image_id_from_hash)
+ def _test_create_instance_helper(self):
+ self._setup_for_create_instance()
+
body = dict(server=dict(
- name='server_test', imageId=2, flavorId=2,
+ name='server_test', imageId=3, flavorId=2,
metadata={'hello': 'world', 'open': 'stack'},
personality={}))
req = webob.Request.blank('/v1.0/servers')
req.method = 'POST'
req.body = json.dumps(body)
- req.headers["Content-Type"] = "application/json"
+ req.headers["content-type"] = "application/json"
res = req.get_response(fakes.wsgi_app())
@@ -291,8 +380,9 @@ class ServersTest(test.TestCase):
self.assertEqual('serv', server['adminPass'][:4])
self.assertEqual(16, len(server['adminPass']))
self.assertEqual('server_test', server['name'])
- self.assertEqual('1', server['id'])
-
+ self.assertEqual(1, server['id'])
+ self.assertEqual(2, server['flavorId'])
+ self.assertEqual(3, server['imageId'])
self.assertEqual(res.status_int, 200)
def test_create_instance(self):
@@ -302,62 +392,233 @@ class ServersTest(test.TestCase):
fakes.stub_out_key_pair_funcs(self.stubs, have_key_pair=False)
self._test_create_instance_helper()
+ def test_create_instance_no_name(self):
+ self._setup_for_create_instance()
+
+ body = {
+ 'server': {
+ 'imageId': 3,
+ 'flavorId': 1,
+ 'metadata': {
+ 'hello': 'world',
+ 'open': 'stack',
+ },
+ 'personality': {},
+ },
+ }
+
+ req = webob.Request.blank('/v1.0/servers')
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+
+ def test_create_instance_nonstring_name(self):
+ self._setup_for_create_instance()
+
+ body = {
+ 'server': {
+ 'name': 12,
+ 'imageId': 3,
+ 'flavorId': 1,
+ 'metadata': {
+ 'hello': 'world',
+ 'open': 'stack',
+ },
+ 'personality': {},
+ },
+ }
+
+ req = webob.Request.blank('/v1.0/servers')
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+
+ def test_create_instance_whitespace_name(self):
+ self._setup_for_create_instance()
+
+ body = {
+ 'server': {
+ 'name': ' ',
+ 'imageId': 3,
+ 'flavorId': 1,
+ 'metadata': {
+ 'hello': 'world',
+ 'open': 'stack',
+ },
+ 'personality': {},
+ },
+ }
+
+ req = webob.Request.blank('/v1.0/servers')
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+
+ def test_create_instance_v11(self):
+ self._setup_for_create_instance()
+
+ imageRef = 'http://localhost/v1.1/images/2'
+ flavorRef = 'http://localhost/v1.1/flavors/3'
+ body = {
+ 'server': {
+ 'name': 'server_test',
+ 'imageRef': imageRef,
+ 'flavorRef': flavorRef,
+ 'metadata': {
+ 'hello': 'world',
+ 'open': 'stack',
+ },
+ 'personality': {},
+ },
+ }
+
+ req = webob.Request.blank('/v1.1/servers')
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ res = req.get_response(fakes.wsgi_app())
+
+ server = json.loads(res.body)['server']
+ self.assertEqual('serv', server['adminPass'][:4])
+ self.assertEqual(16, len(server['adminPass']))
+ self.assertEqual('server_test', server['name'])
+ self.assertEqual(1, server['id'])
+ self.assertEqual(flavorRef, server['flavorRef'])
+ self.assertEqual(imageRef, server['imageRef'])
+ self.assertEqual(res.status_int, 200)
+
+ def test_create_instance_v11_bad_href(self):
+ self._setup_for_create_instance()
+
+ imageRef = 'http://localhost/v1.1/images/asdf'
+ flavorRef = 'http://localhost/v1.1/flavors/3'
+ body = dict(server=dict(
+ name='server_test', imageRef=imageRef, flavorRef=flavorRef,
+ metadata={'hello': 'world', 'open': 'stack'},
+ personality={}))
+ req = webob.Request.blank('/v1.1/servers')
+ req.method = 'POST'
+ req.body = json.dumps(body)
+ req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+
def test_update_no_body(self):
req = webob.Request.blank('/v1.0/servers/1')
req.method = 'PUT'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 422)
- def test_update_bad_params(self):
+ def test_update_nonstring_name(self):
+ """ Confirm that update is filtering params """
+ inst_dict = dict(name=12, adminPass='bacon')
+ self.body = json.dumps(dict(server=inst_dict))
+
+ req = webob.Request.blank('/v1.0/servers/1')
+ req.method = 'PUT'
+ req.content_type = "application/json"
+ req.body = self.body
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+
+ def test_update_whitespace_name(self):
""" Confirm that update is filtering params """
- inst_dict = dict(cat='leopard', name='server_test', adminPass='bacon')
+ inst_dict = dict(name=' ', adminPass='bacon')
+ self.body = json.dumps(dict(server=inst_dict))
+
+ req = webob.Request.blank('/v1.0/servers/1')
+ req.method = 'PUT'
+ req.content_type = "application/json"
+ req.body = self.body
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+
+ def test_update_null_name(self):
+ """ Confirm that update is filtering params """
+ inst_dict = dict(name='', adminPass='bacon')
+ self.body = json.dumps(dict(server=inst_dict))
+
+ req = webob.Request.blank('/v1.0/servers/1')
+ req.method = 'PUT'
+ req.content_type = "application/json"
+ req.body = self.body
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+
+ def test_update_server_v10(self):
+ inst_dict = dict(name='server_test', adminPass='bacon')
self.body = json.dumps(dict(server=inst_dict))
def server_update(context, id, params):
- self.update_called = True
- filtered_dict = dict(name='server_test', admin_pass='bacon')
+ filtered_dict = dict(
+ display_name='server_test',
+ admin_pass='bacon',
+ )
self.assertEqual(params, filtered_dict)
+ return filtered_dict
self.stubs.Set(nova.db.api, 'instance_update',
server_update)
req = webob.Request.blank('/v1.0/servers/1')
req.method = 'PUT'
+ req.content_type = "application/json"
req.body = self.body
- req.get_response(fakes.wsgi_app())
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 204)
- def test_update_server(self):
+ def test_update_server_adminPass_ignored_v11(self):
inst_dict = dict(name='server_test', adminPass='bacon')
self.body = json.dumps(dict(server=inst_dict))
def server_update(context, id, params):
- filtered_dict = dict(name='server_test', admin_pass='bacon')
+ filtered_dict = dict(display_name='server_test')
self.assertEqual(params, filtered_dict)
+ return filtered_dict
self.stubs.Set(nova.db.api, 'instance_update',
server_update)
- req = webob.Request.blank('/v1.0/servers/1')
+ req = webob.Request.blank('/v1.1/servers/1')
req.method = 'PUT'
+ req.content_type = "application/json"
req.body = self.body
- req.get_response(fakes.wsgi_app())
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 204)
def test_create_backup_schedules(self):
- req = webob.Request.blank('/v1.0/servers/1/backup_schedules')
+ req = webob.Request.blank('/v1.0/servers/1/backup_schedule')
req.method = 'POST'
res = req.get_response(fakes.wsgi_app())
- self.assertEqual(res.status, '404 Not Found')
+ self.assertEqual(res.status_int, 501)
def test_delete_backup_schedules(self):
- req = webob.Request.blank('/v1.0/servers/1/backup_schedules')
+ req = webob.Request.blank('/v1.0/servers/1/backup_schedule/1')
req.method = 'DELETE'
res = req.get_response(fakes.wsgi_app())
- self.assertEqual(res.status, '404 Not Found')
+ self.assertEqual(res.status_int, 501)
def test_get_server_backup_schedules(self):
- req = webob.Request.blank('/v1.0/servers/1/backup_schedules')
+ req = webob.Request.blank('/v1.0/servers/1/backup_schedule')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 501)
+
+ def test_get_server_backup_schedule(self):
+ req = webob.Request.blank('/v1.0/servers/1/backup_schedule/1')
res = req.get_response(fakes.wsgi_app())
- self.assertEqual(res.status, '404 Not Found')
+ self.assertEqual(res.status_int, 501)
+
+ def test_server_backup_schedule_deprecated_v11(self):
+ req = webob.Request.blank('/v1.1/servers/1/backup_schedule')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 404)
def test_get_all_server_details_v1_0(self):
req = webob.Request.blank('/v1.0/servers/detail')
@@ -374,7 +635,6 @@ class ServersTest(test.TestCase):
def test_get_all_server_details_v1_1(self):
req = webob.Request.blank('/v1.1/servers/detail')
- req.environ['api.version'] = '1.1'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
@@ -524,16 +784,6 @@ class ServersTest(test.TestCase):
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
- def test_server_resize(self):
- body = dict(server=dict(
- name='server_test', imageId=2, flavorId=2, metadata={},
- personality={}))
- req = webob.Request.blank('/v1.0/servers/1/action')
- req.method = 'POST'
- req.content_type = 'application/json'
- req.body = json.dumps(body)
- res = req.get_response(fakes.wsgi_app())
-
def test_delete_server_instance(self):
req = webob.Request.blank('/v1.0/servers/1')
req.method = 'DELETE'
@@ -589,6 +839,18 @@ class ServersTest(test.TestCase):
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 400)
+ def test_resized_server_has_correct_status(self):
+ req = self.webreq('/1', 'GET', dict(resize=dict(flavorId=3)))
+
+ def fake_migration_get(*args):
+ return {}
+
+ self.stubs.Set(nova.db, 'migration_get_by_instance_and_status',
+ fake_migration_get)
+ res = req.get_response(fakes.wsgi_app())
+ body = json.loads(res.body)
+ self.assertEqual(body['server']['status'], 'resize-confirm')
+
def test_confirm_resize_server(self):
req = self.webreq('/1/action', 'POST', dict(confirmResize=None))
@@ -943,7 +1205,7 @@ class TestServerInstanceCreation(test.TestCase):
def _setup_mock_compute_api_for_personality(self):
- class MockComputeAPI(object):
+ class MockComputeAPI(nova.compute.API):
def __init__(self):
self.injected_files = None
@@ -1174,3 +1436,57 @@ class TestServerInstanceCreation(test.TestCase):
server = dom.childNodes[0]
self.assertEquals(server.nodeName, 'server')
self.assertTrue(server.getAttribute('adminPass').startswith('fake'))
+
+
+class TestGetKernelRamdiskFromImage(test.TestCase):
+ """
+ If we're building from an AMI-style image, we need to be able to fetch the
+ kernel and ramdisk associated with the machine image. This information is
+ stored with the image metadata and return via the ImageService.
+
+ These tests ensure that we parse the metadata return the ImageService
+ correctly and that we handle failure modes appropriately.
+ """
+
+ def test_status_not_active(self):
+ """We should only allow fetching of kernel and ramdisk information if
+ we have a 'fully-formed' image, aka 'active'
+ """
+ image_meta = {'id': 1, 'status': 'queued'}
+ self.assertRaises(exception.Invalid, self._get_k_r, image_meta)
+
+ def test_not_ami(self):
+ """Anything other than ami should return no kernel and no ramdisk"""
+ image_meta = {'id': 1, 'status': 'active',
+ 'properties': {'disk_format': 'vhd'}}
+ kernel_id, ramdisk_id = self._get_k_r(image_meta)
+ self.assertEqual(kernel_id, None)
+ self.assertEqual(ramdisk_id, None)
+
+ def test_ami_no_kernel(self):
+ """If an ami is missing a kernel it should raise NotFound"""
+ image_meta = {'id': 1, 'status': 'active',
+ 'properties': {'disk_format': 'ami', 'ramdisk_id': 1}}
+ self.assertRaises(exception.NotFound, self._get_k_r, image_meta)
+
+ def test_ami_no_ramdisk(self):
+ """If an ami is missing a ramdisk it should raise NotFound"""
+ image_meta = {'id': 1, 'status': 'active',
+ 'properties': {'disk_format': 'ami', 'kernel_id': 1}}
+ self.assertRaises(exception.NotFound, self._get_k_r, image_meta)
+
+ def test_ami_kernel_ramdisk_present(self):
+ """Return IDs if both kernel and ramdisk are present"""
+ image_meta = {'id': 1, 'status': 'active',
+ 'properties': {'disk_format': 'ami', 'kernel_id': 1,
+ 'ramdisk_id': 2}}
+ kernel_id, ramdisk_id = self._get_k_r(image_meta)
+ self.assertEqual(kernel_id, 1)
+ self.assertEqual(ramdisk_id, 2)
+
+ @staticmethod
+ def _get_k_r(image_meta):
+ """Rebinding function to a shorter name for convenience"""
+ kernel_id, ramdisk_id = \
+ servers.Controller._do_get_kernel_ramdisk_from_image(image_meta)
+ return kernel_id, ramdisk_id
diff --git a/nova/tests/api/openstack/test_shared_ip_groups.py b/nova/tests/api/openstack/test_shared_ip_groups.py
index b4de2ef41..c2bd7e45a 100644
--- a/nova/tests/api/openstack/test_shared_ip_groups.py
+++ b/nova/tests/api/openstack/test_shared_ip_groups.py
@@ -16,25 +16,49 @@
# under the License.
import stubout
+import webob
from nova import test
from nova.api.openstack import shared_ip_groups
+from nova.tests.api.openstack import fakes
class SharedIpGroupsTest(test.TestCase):
def setUp(self):
super(SharedIpGroupsTest, self).setUp()
self.stubs = stubout.StubOutForTesting()
+ fakes.FakeAuthManager.reset_fake_data()
+ fakes.FakeAuthDatabase.data = {}
+ fakes.stub_out_auth(self.stubs)
def tearDown(self):
self.stubs.UnsetAll()
super(SharedIpGroupsTest, self).tearDown()
def test_get_shared_ip_groups(self):
- pass
+ req = webob.Request.blank('/v1.0/shared_ip_groups')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 501)
def test_create_shared_ip_group(self):
- pass
+ req = webob.Request.blank('/v1.0/shared_ip_groups')
+ req.method = 'POST'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 501)
+
+ def test_update_shared_ip_group(self):
+ req = webob.Request.blank('/v1.0/shared_ip_groups/12')
+ req.method = 'PUT'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 501)
def test_delete_shared_ip_group(self):
- pass
+ req = webob.Request.blank('/v1.0/shared_ip_groups/12')
+ req.method = 'DELETE'
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 501)
+
+ def test_deprecated_v11(self):
+ req = webob.Request.blank('/v1.1/shared_ip_groups')
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 404)
diff --git a/nova/tests/api/openstack/test_versions.py b/nova/tests/api/openstack/test_versions.py
new file mode 100644
index 000000000..2640a4ddb
--- /dev/null
+++ b/nova/tests/api/openstack/test_versions.py
@@ -0,0 +1,123 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010-2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import webob
+
+from nova import context
+from nova import test
+from nova.tests.api.openstack import fakes
+from nova.api.openstack import views
+
+
+class VersionsTest(test.TestCase):
+ def setUp(self):
+ super(VersionsTest, self).setUp()
+ self.context = context.get_admin_context()
+
+ def tearDown(self):
+ super(VersionsTest, self).tearDown()
+
+ def test_get_version_list(self):
+ req = webob.Request.blank('/')
+ req.accept = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res.content_type, "application/json")
+ versions = json.loads(res.body)["versions"]
+ expected = [
+ {
+ "id": "v1.1",
+ "status": "CURRENT",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.1",
+ }
+ ],
+ },
+ {
+ "id": "v1.0",
+ "status": "DEPRECATED",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://localhost/v1.0",
+ }
+ ],
+ },
+ ]
+ self.assertEqual(versions, expected)
+
+ def test_get_version_list_xml(self):
+ req = webob.Request.blank('/')
+ req.accept = "application/xml"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res.content_type, "application/xml")
+
+ expected = """<versions>
+ <version id="v1.1" status="CURRENT">
+ <links>
+ <link href="http://localhost/v1.1" rel="self"/>
+ </links>
+ </version>
+ <version id="v1.0" status="DEPRECATED">
+ <links>
+ <link href="http://localhost/v1.0" rel="self"/>
+ </links>
+ </version>
+ </versions>""".replace(" ", "").replace("\n", "")
+
+ actual = res.body.replace(" ", "").replace("\n", "")
+
+ self.assertEqual(expected, actual)
+
+ def test_view_builder(self):
+ base_url = "http://example.org/"
+
+ version_data = {
+ "id": "3.2.1",
+ "status": "CURRENT",
+ }
+
+ expected = {
+ "id": "3.2.1",
+ "status": "CURRENT",
+ "links": [
+ {
+ "rel": "self",
+ "href": "http://example.org/3.2.1",
+ },
+ ],
+ }
+
+ builder = views.versions.ViewBuilder(base_url)
+ output = builder.build(version_data)
+
+ self.assertEqual(output, expected)
+
+ def test_generate_href(self):
+ base_url = "http://example.org/app/"
+ version_number = "v1.4.6"
+
+ expected = "http://example.org/app/v1.4.6"
+
+ builder = views.versions.ViewBuilder(base_url)
+ actual = builder.generate_href(version_number)
+
+ self.assertEqual(actual, expected)
diff --git a/nova/tests/api/openstack/test_zones.py b/nova/tests/api/openstack/test_zones.py
index 38399bb3f..a3f191aaa 100644
--- a/nova/tests/api/openstack/test_zones.py
+++ b/nova/tests/api/openstack/test_zones.py
@@ -75,6 +75,10 @@ def zone_get_all_db(context):
]
+def zone_capabilities(method, context, params):
+ return dict()
+
+
class ZonesTest(test.TestCase):
def setUp(self):
super(ZonesTest, self).setUp()
@@ -93,13 +97,18 @@ class ZonesTest(test.TestCase):
self.stubs.Set(nova.db, 'zone_create', zone_create)
self.stubs.Set(nova.db, 'zone_delete', zone_delete)
+ self.old_zone_name = FLAGS.zone_name
+ self.old_zone_capabilities = FLAGS.zone_capabilities
+
def tearDown(self):
self.stubs.UnsetAll()
FLAGS.allow_admin_api = self.allow_admin
+ FLAGS.zone_name = self.old_zone_name
+ FLAGS.zone_capabilities = self.old_zone_capabilities
super(ZonesTest, self).tearDown()
def test_get_zone_list_scheduler(self):
- self.stubs.Set(api.API, '_call_scheduler', zone_get_all_scheduler)
+ self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler)
req = webob.Request.blank('/v1.0/zones')
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
@@ -108,8 +117,7 @@ class ZonesTest(test.TestCase):
self.assertEqual(len(res_dict['zones']), 2)
def test_get_zone_list_db(self):
- self.stubs.Set(api.API, '_call_scheduler',
- zone_get_all_scheduler_empty)
+ self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler_empty)
self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db)
req = webob.Request.blank('/v1.0/zones')
req.headers["Content-Type"] = "application/json"
@@ -167,3 +175,18 @@ class ZonesTest(test.TestCase):
self.assertEqual(res_dict['zone']['id'], 1)
self.assertEqual(res_dict['zone']['api_url'], 'http://example.com')
self.assertFalse('username' in res_dict['zone'])
+
+ def test_zone_info(self):
+ FLAGS.zone_name = 'darksecret'
+ FLAGS.zone_capabilities = ['cap1=a;b', 'cap2=c;d']
+ self.stubs.Set(api, '_call_scheduler', zone_capabilities)
+
+ body = dict(zone=dict(username='zeb', password='sneaky'))
+ req = webob.Request.blank('/v1.0/zones/info')
+
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res_dict['zone']['name'], 'darksecret')
+ self.assertEqual(res_dict['zone']['cap1'], 'a;b')
+ self.assertEqual(res_dict['zone']['cap2'], 'c;d')
diff --git a/nova/tests/db/fakes.py b/nova/tests/db/fakes.py
index 2d25d5fc5..7ddfe377a 100644
--- a/nova/tests/db/fakes.py
+++ b/nova/tests/db/fakes.py
@@ -24,8 +24,8 @@ from nova import test
from nova import utils
-def stub_out_db_instance_api(stubs):
- """ Stubs out the db API for creating Instances """
+def stub_out_db_instance_api(stubs, injected=True):
+ """Stubs out the db API for creating Instances."""
INSTANCE_TYPES = {
'm1.tiny': dict(memory_mb=512,
@@ -56,8 +56,39 @@ def stub_out_db_instance_api(stubs):
flavorid=5,
rxtx_cap=5)}
+ flat_network_fields = {'id': 'fake_flat',
+ 'bridge': 'xenbr0',
+ 'label': 'fake_flat_network',
+ 'netmask': '255.255.255.0',
+ 'cidr_v6': 'fe80::a00:0/120',
+ 'netmask_v6': '120',
+ 'gateway': '10.0.0.1',
+ 'gateway_v6': 'fe80::a00:1',
+ 'broadcast': '10.0.0.255',
+ 'dns': '10.0.0.2',
+ 'ra_server': None,
+ 'injected': injected}
+
+ vlan_network_fields = {'id': 'fake_vlan',
+ 'bridge': 'br111',
+ 'label': 'fake_vlan_network',
+ 'netmask': '255.255.255.0',
+ 'cidr_v6': 'fe80::a00:0/120',
+ 'netmask_v6': '120',
+ 'gateway': '10.0.0.1',
+ 'gateway_v6': 'fe80::a00:1',
+ 'broadcast': '10.0.0.255',
+ 'dns': '10.0.0.2',
+ 'ra_server': None,
+ 'vlan': 111,
+ 'injected': False}
+
+ fixed_ip_fields = {'address': '10.0.0.3',
+ 'address_v6': 'fe80::a00:3',
+ 'network_id': 'fake_flat'}
+
class FakeModel(object):
- """ Stubs out for model """
+ """Stubs out for model."""
def __init__(self, values):
self.values = values
@@ -76,38 +107,40 @@ def stub_out_db_instance_api(stubs):
def fake_instance_type_get_by_name(context, name):
return INSTANCE_TYPES[name]
- def fake_instance_create(values):
- """ Stubs out the db.instance_create method """
-
- type_data = INSTANCE_TYPES[values['instance_type']]
-
- base_options = {
- 'name': values['name'],
- 'id': values['id'],
- 'reservation_id': utils.generate_uid('r'),
- 'image_id': values['image_id'],
- 'kernel_id': values['kernel_id'],
- 'ramdisk_id': values['ramdisk_id'],
- 'state_description': 'scheduling',
- 'user_id': values['user_id'],
- 'project_id': values['project_id'],
- 'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
- 'instance_type': values['instance_type'],
- 'memory_mb': type_data['memory_mb'],
- 'mac_address': values['mac_address'],
- 'vcpus': type_data['vcpus'],
- 'local_gb': type_data['local_gb'],
- 'os_type': values['os_type']}
-
- return FakeModel(base_options)
-
def fake_network_get_by_instance(context, instance_id):
- fields = {
- 'bridge': 'xenbr0',
- }
- return FakeModel(fields)
+ # Even instance numbers are on vlan networks
+ if instance_id % 2 == 0:
+ return FakeModel(vlan_network_fields)
+ else:
+ return FakeModel(flat_network_fields)
+ return FakeModel(network_fields)
+
+ def fake_network_get_all_by_instance(context, instance_id):
+ # Even instance numbers are on vlan networks
+ if instance_id % 2 == 0:
+ return [FakeModel(vlan_network_fields)]
+ else:
+ return [FakeModel(flat_network_fields)]
+
+ def fake_instance_get_fixed_address(context, instance_id):
+ return FakeModel(fixed_ip_fields).address
+
+ def fake_instance_get_fixed_address_v6(context, instance_id):
+ return FakeModel(fixed_ip_fields).address
+
+ def fake_fixed_ip_get_all_by_instance(context, instance_id):
+ return [FakeModel(fixed_ip_fields)]
- stubs.Set(db, 'instance_create', fake_instance_create)
stubs.Set(db, 'network_get_by_instance', fake_network_get_by_instance)
+ stubs.Set(db, 'network_get_all_by_instance',
+ fake_network_get_all_by_instance)
stubs.Set(db, 'instance_type_get_all', fake_instance_type_get_all)
stubs.Set(db, 'instance_type_get_by_name', fake_instance_type_get_by_name)
+ stubs.Set(db, 'instance_get_fixed_address',
+ fake_instance_get_fixed_address)
+ stubs.Set(db, 'instance_get_fixed_address_v6',
+ fake_instance_get_fixed_address_v6)
+ stubs.Set(db, 'network_get_all_by_instance',
+ fake_network_get_all_by_instance)
+ stubs.Set(db, 'fixed_ip_get_all_by_instance',
+ fake_fixed_ip_get_all_by_instance)
diff --git a/nova/tests/fake_utils.py b/nova/tests/fake_utils.py
new file mode 100644
index 000000000..be59970c9
--- /dev/null
+++ b/nova/tests/fake_utils.py
@@ -0,0 +1,109 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""This modules stubs out functions in nova.utils."""
+
+import re
+import types
+
+from eventlet import greenthread
+
+from nova import exception
+from nova import log as logging
+from nova import utils
+
+LOG = logging.getLogger('nova.tests.fake_utils')
+
+_fake_execute_repliers = []
+_fake_execute_log = []
+
+
+def fake_execute_get_log():
+ return _fake_execute_log
+
+
+def fake_execute_clear_log():
+ global _fake_execute_log
+ _fake_execute_log = []
+
+
+def fake_execute_set_repliers(repliers):
+ """Allows the client to configure replies to commands."""
+ global _fake_execute_repliers
+ _fake_execute_repliers = repliers
+
+
+def fake_execute_default_reply_handler(*ignore_args, **ignore_kwargs):
+ """A reply handler for commands that haven't been added to the reply list.
+
+ Returns empty strings for stdout and stderr.
+
+ """
+ return '', ''
+
+
+def fake_execute(*cmd_parts, **kwargs):
+ """This function stubs out execute.
+
+ It optionally executes a preconfigued function to return expected data.
+
+ """
+ global _fake_execute_repliers
+
+ process_input = kwargs.get('process_input', None)
+ addl_env = kwargs.get('addl_env', None)
+ check_exit_code = kwargs.get('check_exit_code', 0)
+ cmd_str = ' '.join(str(part) for part in cmd_parts)
+
+ LOG.debug(_("Faking execution of cmd (subprocess): %s"), cmd_str)
+ _fake_execute_log.append(cmd_str)
+
+ reply_handler = fake_execute_default_reply_handler
+
+ for fake_replier in _fake_execute_repliers:
+ if re.match(fake_replier[0], cmd_str):
+ reply_handler = fake_replier[1]
+ LOG.debug(_('Faked command matched %s') % fake_replier[0])
+ break
+
+ if isinstance(reply_handler, basestring):
+ # If the reply handler is a string, return it as stdout
+ reply = reply_handler, ''
+ else:
+ try:
+ # Alternative is a function, so call it
+ reply = reply_handler(cmd_parts,
+ process_input=process_input,
+ addl_env=addl_env,
+ check_exit_code=check_exit_code)
+ except exception.ProcessExecutionError as e:
+ LOG.debug(_('Faked command raised an exception %s' % str(e)))
+ raise
+
+ stdout = reply[0]
+ stderr = reply[1]
+ LOG.debug(_("Reply to faked command is stdout='%(stdout)s' "
+ "stderr='%(stderr)s'") % locals())
+
+ # Replicate the sleep call in the real function
+ greenthread.sleep(0)
+ return reply
+
+
+def stub_out_utils_execute(stubs):
+ fake_execute_set_repliers([])
+ fake_execute_clear_log()
+ stubs.Set(utils, 'execute', fake_execute)
diff --git a/nova/tests/image/__init__.py b/nova/tests/image/__init__.py
new file mode 100644
index 000000000..b94e2e54e
--- /dev/null
+++ b/nova/tests/image/__init__.py
@@ -0,0 +1,16 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Openstack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
diff --git a/nova/tests/image/test_glance.py b/nova/tests/image/test_glance.py
new file mode 100644
index 000000000..9d0b14613
--- /dev/null
+++ b/nova/tests/image/test_glance.py
@@ -0,0 +1,236 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Openstack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import unittest
+
+from nova import context
+from nova import test
+from nova.image import glance
+
+
+class StubGlanceClient(object):
+
+ def __init__(self, images, add_response=None, update_response=None):
+ self.images = images
+ self.add_response = add_response
+ self.update_response = update_response
+
+ def get_image_meta(self, image_id):
+ return self.images[image_id]
+
+ def get_images_detailed(self):
+ return self.images.itervalues()
+
+ def get_image(self, image_id):
+ return self.images[image_id], []
+
+ def add_image(self, metadata, data):
+ return self.add_response
+
+ def update_image(self, image_id, metadata, data):
+ return self.update_response
+
+
+class NullWriter(object):
+ """Used to test ImageService.get which takes a writer object"""
+
+ def write(self, *arg, **kwargs):
+ pass
+
+
+class BaseGlanceTest(unittest.TestCase):
+ NOW_GLANCE_OLD_FORMAT = "2010-10-11T10:30:22"
+ NOW_GLANCE_FORMAT = "2010-10-11T10:30:22.000000"
+ NOW_DATETIME = datetime.datetime(2010, 10, 11, 10, 30, 22)
+
+ def setUp(self):
+ # FIXME(sirp): we can probably use stubs library here rather than
+ # dependency injection
+ self.client = StubGlanceClient(None)
+ self.service = glance.GlanceImageService(self.client)
+ self.context = context.RequestContext(None, None)
+
+ def assertDateTimesFilled(self, image_meta):
+ self.assertEqual(image_meta['created_at'], self.NOW_DATETIME)
+ self.assertEqual(image_meta['updated_at'], self.NOW_DATETIME)
+ self.assertEqual(image_meta['deleted_at'], self.NOW_DATETIME)
+
+ def assertDateTimesEmpty(self, image_meta):
+ self.assertEqual(image_meta['updated_at'], None)
+ self.assertEqual(image_meta['deleted_at'], None)
+
+ def assertDateTimesBlank(self, image_meta):
+ self.assertEqual(image_meta['updated_at'], '')
+ self.assertEqual(image_meta['deleted_at'], '')
+
+
+class TestGlanceImageServiceProperties(BaseGlanceTest):
+ def test_show_passes_through_to_client(self):
+ """Ensure attributes which aren't BASE_IMAGE_ATTRS are stored in the
+ properties dict
+ """
+ fixtures = {'image1': {'name': 'image1', 'is_public': True,
+ 'foo': 'bar',
+ 'properties': {'prop1': 'propvalue1'}}}
+ self.client.images = fixtures
+ image_meta = self.service.show(self.context, 'image1')
+
+ expected = {'name': 'image1', 'is_public': True,
+ 'properties': {'prop1': 'propvalue1', 'foo': 'bar'}}
+ self.assertEqual(image_meta, expected)
+
+ def test_detail_passes_through_to_client(self):
+ fixtures = {'image1': {'name': 'image1', 'is_public': True,
+ 'foo': 'bar',
+ 'properties': {'prop1': 'propvalue1'}}}
+ self.client.images = fixtures
+ image_meta = self.service.detail(self.context)
+ expected = [{'name': 'image1', 'is_public': True,
+ 'properties': {'prop1': 'propvalue1', 'foo': 'bar'}}]
+ self.assertEqual(image_meta, expected)
+
+
+class TestGetterDateTimeNoneTests(BaseGlanceTest):
+
+ def test_show_handles_none_datetimes(self):
+ self.client.images = self._make_none_datetime_fixtures()
+ image_meta = self.service.show(self.context, 'image1')
+ self.assertDateTimesEmpty(image_meta)
+
+ def test_show_handles_blank_datetimes(self):
+ self.client.images = self._make_blank_datetime_fixtures()
+ image_meta = self.service.show(self.context, 'image1')
+ self.assertDateTimesBlank(image_meta)
+
+ def test_detail_handles_none_datetimes(self):
+ self.client.images = self._make_none_datetime_fixtures()
+ image_meta = self.service.detail(self.context)[0]
+ self.assertDateTimesEmpty(image_meta)
+
+ def test_detail_handles_blank_datetimes(self):
+ self.client.images = self._make_blank_datetime_fixtures()
+ image_meta = self.service.detail(self.context)[0]
+ self.assertDateTimesBlank(image_meta)
+
+ def test_get_handles_none_datetimes(self):
+ self.client.images = self._make_none_datetime_fixtures()
+ writer = NullWriter()
+ image_meta = self.service.get(self.context, 'image1', writer)
+ self.assertDateTimesEmpty(image_meta)
+
+ def test_get_handles_blank_datetimes(self):
+ self.client.images = self._make_blank_datetime_fixtures()
+ writer = NullWriter()
+ image_meta = self.service.get(self.context, 'image1', writer)
+ self.assertDateTimesBlank(image_meta)
+
+ def test_show_makes_datetimes(self):
+ self.client.images = self._make_datetime_fixtures()
+ image_meta = self.service.show(self.context, 'image1')
+ self.assertDateTimesFilled(image_meta)
+ image_meta = self.service.show(self.context, 'image2')
+ self.assertDateTimesFilled(image_meta)
+
+ def test_detail_makes_datetimes(self):
+ self.client.images = self._make_datetime_fixtures()
+ image_meta = self.service.detail(self.context)[0]
+ self.assertDateTimesFilled(image_meta)
+ image_meta = self.service.detail(self.context)[1]
+ self.assertDateTimesFilled(image_meta)
+
+ def test_get_makes_datetimes(self):
+ self.client.images = self._make_datetime_fixtures()
+ writer = NullWriter()
+ image_meta = self.service.get(self.context, 'image1', writer)
+ self.assertDateTimesFilled(image_meta)
+ image_meta = self.service.get(self.context, 'image2', writer)
+ self.assertDateTimesFilled(image_meta)
+
+ def _make_datetime_fixtures(self):
+ fixtures = {
+ 'image1': {
+ 'name': 'image1',
+ 'is_public': True,
+ 'created_at': self.NOW_GLANCE_FORMAT,
+ 'updated_at': self.NOW_GLANCE_FORMAT,
+ 'deleted_at': self.NOW_GLANCE_FORMAT,
+ },
+ 'image2': {
+ 'name': 'image2',
+ 'is_public': True,
+ 'created_at': self.NOW_GLANCE_OLD_FORMAT,
+ 'updated_at': self.NOW_GLANCE_OLD_FORMAT,
+ 'deleted_at': self.NOW_GLANCE_OLD_FORMAT,
+ },
+ }
+ return fixtures
+
+ def _make_none_datetime_fixtures(self):
+ fixtures = {'image1': {'name': 'image1', 'is_public': True,
+ 'updated_at': None,
+ 'deleted_at': None}}
+ return fixtures
+
+ def _make_blank_datetime_fixtures(self):
+ fixtures = {'image1': {'name': 'image1', 'is_public': True,
+ 'updated_at': '',
+ 'deleted_at': ''}}
+ return fixtures
+
+
+class TestMutatorDateTimeTests(BaseGlanceTest):
+ """Tests create(), update()"""
+
+ def test_create_handles_datetimes(self):
+ self.client.add_response = self._make_datetime_fixture()
+ image_meta = self.service.create(self.context, {})
+ self.assertDateTimesFilled(image_meta)
+
+ def test_create_handles_none_datetimes(self):
+ self.client.add_response = self._make_none_datetime_fixture()
+ dummy_meta = {}
+ image_meta = self.service.create(self.context, dummy_meta)
+ self.assertDateTimesEmpty(image_meta)
+
+ def test_update_handles_datetimes(self):
+ self.client.update_response = self._make_datetime_fixture()
+ dummy_id = 'dummy_id'
+ dummy_meta = {}
+ image_meta = self.service.update(self.context, 'dummy_id', dummy_meta)
+ self.assertDateTimesFilled(image_meta)
+
+ def test_update_handles_none_datetimes(self):
+ self.client.update_response = self._make_none_datetime_fixture()
+ dummy_id = 'dummy_id'
+ dummy_meta = {}
+ image_meta = self.service.update(self.context, 'dummy_id', dummy_meta)
+ self.assertDateTimesEmpty(image_meta)
+
+ def _make_datetime_fixture(self):
+ fixture = {'id': 'image1', 'name': 'image1', 'is_public': True,
+ 'created_at': self.NOW_GLANCE_FORMAT,
+ 'updated_at': self.NOW_GLANCE_FORMAT,
+ 'deleted_at': self.NOW_GLANCE_FORMAT}
+ return fixture
+
+ def _make_none_datetime_fixture(self):
+ fixture = {'id': 'image1', 'name': 'image1', 'is_public': True,
+ 'updated_at': None,
+ 'deleted_at': None}
+ return fixture
diff --git a/nova/tests/integrated/api/client.py b/nova/tests/integrated/api/client.py
index 245eb8c69..7e20c9b00 100644
--- a/nova/tests/integrated/api/client.py
+++ b/nova/tests/integrated/api/client.py
@@ -56,8 +56,12 @@ class OpenStackApiNotFoundException(OpenStackApiException):
class TestOpenStackClient(object):
- """ A really basic OpenStack API client that is under our control,
- so we can make changes / insert hooks for testing"""
+ """Simple OpenStack API Client.
+
+ This is a really basic OpenStack API client that is under our control,
+ so we can make changes / insert hooks for testing
+
+ """
def __init__(self, auth_user, auth_key, auth_uri):
super(TestOpenStackClient, self).__init__()
@@ -90,6 +94,7 @@ class TestOpenStackClient(object):
LOG.info(_("Doing %(method)s on %(relative_url)s") % locals())
if body:
LOG.info(_("Body: %s") % body)
+ headers.setdefault('Content-Type', 'application/json')
conn.request(method, relative_url, body, headers)
response = conn.getresponse()
@@ -108,9 +113,7 @@ class TestOpenStackClient(object):
http_status = response.status
LOG.debug(_("%(auth_uri)s => code %(http_status)s") % locals())
- # Until bug732866 is fixed, we can't check this properly...
- #if http_status == 401:
- if http_status != 204:
+ if http_status == 401:
raise OpenStackApiAuthenticationException(response=response)
auth_headers = {}
@@ -123,7 +126,7 @@ class TestOpenStackClient(object):
def api_request(self, relative_uri, check_response_status=None, **kwargs):
auth_result = self._authenticate()
- #NOTE(justinsb): httplib 'helpfully' converts headers to lower case
+ # NOTE(justinsb): httplib 'helpfully' converts headers to lower case
base_uri = auth_result['x-server-management-url']
full_uri = base_uri + relative_uri
@@ -210,3 +213,32 @@ class TestOpenStackClient(object):
def delete_flavor(self, flavor_id):
return self.api_delete('/flavors/%s' % flavor_id)
+
+ def get_volume(self, volume_id):
+ return self.api_get('/volumes/%s' % volume_id)['volume']
+
+ def get_volumes(self, detail=True):
+ rel_url = '/volumes/detail' if detail else '/volumes'
+ return self.api_get(rel_url)['volumes']
+
+ def post_volume(self, volume):
+ return self.api_post('/volumes', volume)['volume']
+
+ def delete_volume(self, volume_id):
+ return self.api_delete('/volumes/%s' % volume_id)
+
+ def get_server_volume(self, server_id, attachment_id):
+ return self.api_get('/servers/%s/volume_attachments/%s' %
+ (server_id, attachment_id))['volumeAttachment']
+
+ def get_server_volumes(self, server_id):
+ return self.api_get('/servers/%s/volume_attachments' %
+ (server_id))['volumeAttachments']
+
+ def post_server_volume(self, server_id, volume_attachment):
+ return self.api_post('/servers/%s/volume_attachments' %
+ (server_id), volume_attachment)['volumeAttachment']
+
+ def delete_server_volume(self, server_id, attachment_id):
+ return self.api_delete('/servers/%s/volume_attachments/%s' %
+ (server_id, attachment_id))
diff --git a/nova/tests/integrated/integrated_helpers.py b/nova/tests/integrated/integrated_helpers.py
new file mode 100644
index 000000000..2e5d67017
--- /dev/null
+++ b/nova/tests/integrated/integrated_helpers.py
@@ -0,0 +1,221 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Provides common functionality for integrated unit tests
+"""
+
+import random
+import string
+
+from nova import exception
+from nova import flags
+from nova import service
+from nova import test # For the flags
+from nova.auth import manager
+from nova.log import logging
+from nova.tests.integrated.api import client
+
+
+FLAGS = flags.FLAGS
+
+LOG = logging.getLogger('nova.tests.integrated')
+
+
+def generate_random_alphanumeric(length):
+ """Creates a random alphanumeric string of specified length."""
+ return ''.join(random.choice(string.ascii_uppercase + string.digits)
+ for _x in range(length))
+
+
+def generate_random_numeric(length):
+ """Creates a random numeric string of specified length."""
+ return ''.join(random.choice(string.digits)
+ for _x in range(length))
+
+
+def generate_new_element(items, prefix, numeric=False):
+ """Creates a random string with prefix, that is not in 'items' list."""
+ while True:
+ if numeric:
+ candidate = prefix + generate_random_numeric(8)
+ else:
+ candidate = prefix + generate_random_alphanumeric(8)
+ if not candidate in items:
+ return candidate
+ LOG.debug("Random collision on %s" % candidate)
+
+
+class TestUser(object):
+ def __init__(self, name, secret, auth_url):
+ self.name = name
+ self.secret = secret
+ self.auth_url = auth_url
+
+ if not auth_url:
+ raise exception.Error("auth_url is required")
+ self.openstack_api = client.TestOpenStackClient(self.name,
+ self.secret,
+ self.auth_url)
+
+ def get_unused_server_name(self):
+ servers = self.openstack_api.get_servers()
+ server_names = [server['name'] for server in servers]
+ return generate_new_element(server_names, 'server')
+
+ def get_invalid_image(self):
+ images = self.openstack_api.get_images()
+ image_ids = [image['id'] for image in images]
+ return generate_new_element(image_ids, '', numeric=True)
+
+ def get_valid_image(self, create=False):
+ images = self.openstack_api.get_images()
+ if create and not images:
+ # TODO(justinsb): No way currently to create an image through API
+ #created_image = self.openstack_api.post_image(image)
+ #images.append(created_image)
+ raise exception.Error("No way to create an image through API")
+
+ if images:
+ return images[0]
+ return None
+
+
+class IntegratedUnitTestContext(object):
+ def __init__(self, auth_url):
+ self.auth_manager = manager.AuthManager()
+
+ self.auth_url = auth_url
+ self.project_name = None
+
+ self.test_user = None
+
+ self.setup()
+
+ def setup(self):
+ self._create_test_user()
+
+ def _create_test_user(self):
+ self.test_user = self._create_unittest_user()
+
+ # No way to currently pass this through the OpenStack API
+ self.project_name = 'openstack'
+ self._configure_project(self.project_name, self.test_user)
+
+ def cleanup(self):
+ self.test_user = None
+
+ def _create_unittest_user(self):
+ users = self.auth_manager.get_users()
+ user_names = [user.name for user in users]
+ auth_name = generate_new_element(user_names, 'unittest_user_')
+ auth_key = generate_random_alphanumeric(16)
+
+ # Right now there's a bug where auth_name and auth_key are reversed
+ # bug732907
+ auth_key = auth_name
+
+ self.auth_manager.create_user(auth_name, auth_name, auth_key, False)
+ return TestUser(auth_name, auth_key, self.auth_url)
+
+ def _configure_project(self, project_name, user):
+ projects = self.auth_manager.get_projects()
+ project_names = [project.name for project in projects]
+ if not project_name in project_names:
+ project = self.auth_manager.create_project(project_name,
+ user.name,
+ description=None,
+ member_users=None)
+ else:
+ self.auth_manager.add_to_project(user.name, project_name)
+
+
+class _IntegratedTestBase(test.TestCase):
+ def setUp(self):
+ super(_IntegratedTestBase, self).setUp()
+
+ f = self._get_flags()
+ self.flags(**f)
+
+ # set up services
+ self.start_service('compute')
+ self.start_service('volume')
+ # NOTE(justinsb): There's a bug here which is eluding me...
+ # If we start the network_service, all is good, but then subsequent
+ # tests fail: CloudTestCase.test_ajax_console in particular.
+ #self.start_service('network')
+ self.start_service('scheduler')
+
+ self.auth_url = self._start_api_service()
+
+ self.context = IntegratedUnitTestContext(self.auth_url)
+
+ self.user = self.context.test_user
+ self.api = self.user.openstack_api
+
+ def _start_api_service(self):
+ api_service = service.ApiService.create()
+ api_service.start()
+
+ if not api_service:
+ raise Exception("API Service was None")
+
+ auth_url = 'http://localhost:8774/v1.1'
+ return auth_url
+
+ def tearDown(self):
+ self.context.cleanup()
+ super(_IntegratedTestBase, self).tearDown()
+
+ def _get_flags(self):
+ """An opportunity to setup flags, before the services are started."""
+ f = {}
+ f['image_service'] = 'nova.image.fake.FakeImageService'
+ f['fake_network'] = True
+ return f
+
+ def _build_minimal_create_server_request(self):
+ server = {}
+
+ image = self.user.get_valid_image(create=True)
+ LOG.debug("Image: %s" % image)
+
+ if 'imageRef' in image:
+ image_ref = image['imageRef']
+ else:
+ # NOTE(justinsb): The imageRef code hasn't yet landed
+ LOG.warning("imageRef not yet in images output")
+ image_ref = image['id']
+
+ # TODO(justinsb): This is FUBAR
+ image_ref = abs(hash(image_ref))
+
+ image_ref = 'http://fake.server/%s' % image_ref
+
+ # We now have a valid imageId
+ server['imageRef'] = image_ref
+
+ # Set a valid flavorId
+ flavor = self.api.get_flavors()[0]
+ LOG.debug("Using flavor: %s" % flavor)
+ server['flavorRef'] = 'http://fake.server/%s' % flavor['id']
+
+ # Set a valid server name
+ server_name = self.user.get_unused_server_name()
+ server['name'] = server_name
+
+ return server
diff --git a/nova/tests/integrated/test_extensions.py b/nova/tests/integrated/test_extensions.py
new file mode 100644
index 000000000..0d4ee8cab
--- /dev/null
+++ b/nova/tests/integrated/test_extensions.py
@@ -0,0 +1,44 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from nova import flags
+from nova.log import logging
+from nova.tests.integrated import integrated_helpers
+
+
+LOG = logging.getLogger('nova.tests.integrated')
+
+
+FLAGS = flags.FLAGS
+FLAGS.verbose = True
+
+
+class ExtensionsTest(integrated_helpers._IntegratedTestBase):
+ def _get_flags(self):
+ f = super(ExtensionsTest, self)._get_flags()
+ f['osapi_extensions_path'] = os.path.join(os.path.dirname(__file__),
+ "../api/openstack/extensions")
+ return f
+
+ def test_get_foxnsocks(self):
+ """Simple check that fox-n-socks works."""
+ response = self.api.api_request('/foxnsocks')
+ foxnsocks = response.read()
+ LOG.debug("foxnsocks: %s" % foxnsocks)
+ self.assertEqual('Try to say this Mr. Knox, sir...', foxnsocks)
diff --git a/nova/tests/integrated/test_login.py b/nova/tests/integrated/test_login.py
new file mode 100644
index 000000000..a5180b6bc
--- /dev/null
+++ b/nova/tests/integrated/test_login.py
@@ -0,0 +1,68 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from nova import flags
+from nova.log import logging
+from nova.tests.integrated import integrated_helpers
+from nova.tests.integrated.api import client
+
+
+LOG = logging.getLogger('nova.tests.integrated')
+
+FLAGS = flags.FLAGS
+FLAGS.verbose = True
+
+
+class LoginTest(integrated_helpers._IntegratedTestBase):
+ def test_login(self):
+ """Simple check - we list flavors - so we know we're logged in."""
+ flavors = self.api.get_flavors()
+ for flavor in flavors:
+ LOG.debug(_("flavor: %s") % flavor)
+
+ def test_bad_login_password(self):
+ """Test that I get a 401 with a bad username."""
+ bad_credentials_api = client.TestOpenStackClient(self.user.name,
+ "notso_password",
+ self.user.auth_url)
+
+ self.assertRaises(client.OpenStackApiAuthenticationException,
+ bad_credentials_api.get_flavors)
+
+ def test_bad_login_username(self):
+ """Test that I get a 401 with a bad password."""
+ bad_credentials_api = client.TestOpenStackClient("notso_username",
+ self.user.secret,
+ self.user.auth_url)
+
+ self.assertRaises(client.OpenStackApiAuthenticationException,
+ bad_credentials_api.get_flavors)
+
+ def test_bad_login_both_bad(self):
+ """Test that I get a 401 with both bad username and bad password."""
+ bad_credentials_api = client.TestOpenStackClient("notso_username",
+ "notso_password",
+ self.user.auth_url)
+
+ self.assertRaises(client.OpenStackApiAuthenticationException,
+ bad_credentials_api.get_flavors)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/nova/tests/integrated/test_servers.py b/nova/tests/integrated/test_servers.py
new file mode 100644
index 000000000..749ea8955
--- /dev/null
+++ b/nova/tests/integrated/test_servers.py
@@ -0,0 +1,184 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+import unittest
+
+from nova import flags
+from nova.log import logging
+from nova.tests.integrated import integrated_helpers
+from nova.tests.integrated.api import client
+
+
+LOG = logging.getLogger('nova.tests.integrated')
+
+
+FLAGS = flags.FLAGS
+FLAGS.verbose = True
+
+
+class ServersTest(integrated_helpers._IntegratedTestBase):
+ def test_get_servers(self):
+ """Simple check that listing servers works."""
+ servers = self.api.get_servers()
+ for server in servers:
+ LOG.debug("server: %s" % server)
+
+ def test_create_and_delete_server(self):
+ """Creates and deletes a server."""
+
+ # Create server
+
+ # Build the server data gradually, checking errors along the way
+ server = {}
+ good_server = self._build_minimal_create_server_request()
+
+ post = {'server': server}
+
+ # Without an imageRef, this throws 500.
+ # TODO(justinsb): Check whatever the spec says should be thrown here
+ self.assertRaises(client.OpenStackApiException,
+ self.api.post_server, post)
+
+ # With an invalid imageRef, this throws 500.
+ server['imageRef'] = self.user.get_invalid_image()
+ # TODO(justinsb): Check whatever the spec says should be thrown here
+ self.assertRaises(client.OpenStackApiException,
+ self.api.post_server, post)
+
+ # Add a valid imageId/imageRef
+ server['imageId'] = good_server.get('imageId')
+ server['imageRef'] = good_server.get('imageRef')
+
+ # Without flavorId, this throws 500
+ # TODO(justinsb): Check whatever the spec says should be thrown here
+ self.assertRaises(client.OpenStackApiException,
+ self.api.post_server, post)
+
+ # Set a valid flavorId/flavorRef
+ server['flavorRef'] = good_server.get('flavorRef')
+ server['flavorId'] = good_server.get('flavorId')
+
+ # Without a name, this throws 500
+ # TODO(justinsb): Check whatever the spec says should be thrown here
+ self.assertRaises(client.OpenStackApiException,
+ self.api.post_server, post)
+
+ # Set a valid server name
+ server['name'] = good_server['name']
+
+ created_server = self.api.post_server(post)
+ LOG.debug("created_server: %s" % created_server)
+ self.assertTrue(created_server['id'])
+ created_server_id = created_server['id']
+
+ # Check it's there
+ found_server = self.api.get_server(created_server_id)
+ self.assertEqual(created_server_id, found_server['id'])
+
+ # It should also be in the all-servers list
+ servers = self.api.get_servers()
+ server_ids = [server['id'] for server in servers]
+ self.assertTrue(created_server_id in server_ids)
+
+ # Wait (briefly) for creation
+ retries = 0
+ while found_server['status'] == 'build':
+ LOG.debug("found server: %s" % found_server)
+ time.sleep(1)
+ found_server = self.api.get_server(created_server_id)
+ retries = retries + 1
+ if retries > 5:
+ break
+
+ # It should be available...
+ # TODO(justinsb): Mock doesn't yet do this...
+ #self.assertEqual('available', found_server['status'])
+
+ self._delete_server(created_server_id)
+
+ def _delete_server(self, server_id):
+ # Delete the server
+ self.api.delete_server(server_id)
+
+ # Wait (briefly) for deletion
+ for _retries in range(5):
+ try:
+ found_server = self.api.get_server(server_id)
+ except client.OpenStackApiNotFoundException:
+ found_server = None
+ LOG.debug("Got 404, proceeding")
+ break
+
+ LOG.debug("Found_server=%s" % found_server)
+
+ # TODO(justinsb): Mock doesn't yet do accurate state changes
+ #if found_server['status'] != 'deleting':
+ # break
+ time.sleep(1)
+
+ # Should be gone
+ self.assertFalse(found_server)
+
+# TODO(justinsb): Enable this unit test when the metadata bug is fixed
+# def test_create_server_with_metadata(self):
+# """Creates a server with metadata"""
+#
+# # Build the server data gradually, checking errors along the way
+# server = self._build_minimal_create_server_request()
+#
+# for metadata_count in range(30):
+# metadata = {}
+# for i in range(metadata_count):
+# metadata['key_%s' % i] = 'value_%s' % i
+# server['metadata'] = metadata
+#
+# post = {'server': server}
+# created_server = self.api.post_server(post)
+# LOG.debug("created_server: %s" % created_server)
+# self.assertTrue(created_server['id'])
+# created_server_id = created_server['id']
+# # Reenable when bug fixed
+# # self.assertEqual(metadata, created_server.get('metadata'))
+#
+# # Check it's there
+# found_server = self.api.get_server(created_server_id)
+# self.assertEqual(created_server_id, found_server['id'])
+# self.assertEqual(metadata, found_server.get('metadata'))
+#
+# # The server should also be in the all-servers details list
+# servers = self.api.get_servers(detail=True)
+# server_map = dict((server['id'], server) for server in servers)
+# found_server = server_map.get(created_server_id)
+# self.assertTrue(found_server)
+# # Details do include metadata
+# self.assertEqual(metadata, found_server.get('metadata'))
+#
+# # The server should also be in the all-servers summary list
+# servers = self.api.get_servers(detail=False)
+# server_map = dict((server['id'], server) for server in servers)
+# found_server = server_map.get(created_server_id)
+# self.assertTrue(found_server)
+# # Summary should not include metadata
+# self.assertFalse(found_server.get('metadata'))
+#
+# # Cleanup
+# self._delete_server(created_server_id)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/nova/tests/integrated/test_volumes.py b/nova/tests/integrated/test_volumes.py
new file mode 100644
index 000000000..e9fb3c4d1
--- /dev/null
+++ b/nova/tests/integrated/test_volumes.py
@@ -0,0 +1,295 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import unittest
+import time
+
+from nova import flags
+from nova.log import logging
+from nova.tests.integrated import integrated_helpers
+from nova.tests.integrated.api import client
+from nova.volume import driver
+
+
+LOG = logging.getLogger('nova.tests.integrated')
+
+
+FLAGS = flags.FLAGS
+FLAGS.verbose = True
+
+
+class VolumesTest(integrated_helpers._IntegratedTestBase):
+ def setUp(self):
+ super(VolumesTest, self).setUp()
+ driver.LoggingVolumeDriver.clear_logs()
+
+ def _get_flags(self):
+ f = super(VolumesTest, self)._get_flags()
+ f['use_local_volumes'] = False # Avoids calling local_path
+ f['volume_driver'] = 'nova.volume.driver.LoggingVolumeDriver'
+ return f
+
+ def test_get_volumes_summary(self):
+ """Simple check that listing volumes works."""
+ volumes = self.api.get_volumes(False)
+ for volume in volumes:
+ LOG.debug("volume: %s" % volume)
+
+ def test_get_volumes(self):
+ """Simple check that listing volumes works."""
+ volumes = self.api.get_volumes()
+ for volume in volumes:
+ LOG.debug("volume: %s" % volume)
+
+ def _poll_while(self, volume_id, continue_states, max_retries=5):
+ """Poll (briefly) while the state is in continue_states."""
+ retries = 0
+ while True:
+ try:
+ found_volume = self.api.get_volume(volume_id)
+ except client.OpenStackApiNotFoundException:
+ found_volume = None
+ LOG.debug("Got 404, proceeding")
+ break
+
+ LOG.debug("Found %s" % found_volume)
+
+ self.assertEqual(volume_id, found_volume['id'])
+
+ if not found_volume['status'] in continue_states:
+ break
+
+ time.sleep(1)
+ retries = retries + 1
+ if retries > max_retries:
+ break
+ return found_volume
+
+ def test_create_and_delete_volume(self):
+ """Creates and deletes a volume."""
+
+ # Create volume
+ created_volume = self.api.post_volume({'volume': {'size': 1}})
+ LOG.debug("created_volume: %s" % created_volume)
+ self.assertTrue(created_volume['id'])
+ created_volume_id = created_volume['id']
+
+ # Check it's there
+ found_volume = self.api.get_volume(created_volume_id)
+ self.assertEqual(created_volume_id, found_volume['id'])
+
+ # It should also be in the all-volume list
+ volumes = self.api.get_volumes()
+ volume_names = [volume['id'] for volume in volumes]
+ self.assertTrue(created_volume_id in volume_names)
+
+ # Wait (briefly) for creation. Delay is due to the 'message queue'
+ found_volume = self._poll_while(created_volume_id, ['creating'])
+
+ # It should be available...
+ self.assertEqual('available', found_volume['status'])
+
+ # Delete the volume
+ self.api.delete_volume(created_volume_id)
+
+ # Wait (briefly) for deletion. Delay is due to the 'message queue'
+ found_volume = self._poll_while(created_volume_id, ['deleting'])
+
+ # Should be gone
+ self.assertFalse(found_volume)
+
+ LOG.debug("Logs: %s" % driver.LoggingVolumeDriver.all_logs())
+
+ create_actions = driver.LoggingVolumeDriver.logs_like(
+ 'create_volume',
+ id=created_volume_id)
+ LOG.debug("Create_Actions: %s" % create_actions)
+
+ self.assertEquals(1, len(create_actions))
+ create_action = create_actions[0]
+ self.assertEquals(create_action['id'], created_volume_id)
+ self.assertEquals(create_action['availability_zone'], 'nova')
+ self.assertEquals(create_action['size'], 1)
+
+ export_actions = driver.LoggingVolumeDriver.logs_like(
+ 'create_export',
+ id=created_volume_id)
+ self.assertEquals(1, len(export_actions))
+ export_action = export_actions[0]
+ self.assertEquals(export_action['id'], created_volume_id)
+ self.assertEquals(export_action['availability_zone'], 'nova')
+
+ delete_actions = driver.LoggingVolumeDriver.logs_like(
+ 'delete_volume',
+ id=created_volume_id)
+ self.assertEquals(1, len(delete_actions))
+ delete_action = export_actions[0]
+ self.assertEquals(delete_action['id'], created_volume_id)
+
+ def test_attach_and_detach_volume(self):
+ """Creates, attaches, detaches and deletes a volume."""
+
+ # Create server
+ server_req = {'server': self._build_minimal_create_server_request()}
+ # NOTE(justinsb): Create an extra server so that server_id != volume_id
+ self.api.post_server(server_req)
+ created_server = self.api.post_server(server_req)
+ LOG.debug("created_server: %s" % created_server)
+ server_id = created_server['id']
+
+ # Create volume
+ created_volume = self.api.post_volume({'volume': {'size': 1}})
+ LOG.debug("created_volume: %s" % created_volume)
+ volume_id = created_volume['id']
+ self._poll_while(volume_id, ['creating'])
+
+ # Check we've got different IDs
+ self.assertNotEqual(server_id, volume_id)
+
+ # List current server attachments - should be none
+ attachments = self.api.get_server_volumes(server_id)
+ self.assertEquals([], attachments)
+
+ # Template attach request
+ device = '/dev/sdc'
+ attach_req = {'device': device}
+ post_req = {'volumeAttachment': attach_req}
+
+ # Try to attach to a non-existent volume; should fail
+ attach_req['volumeId'] = 3405691582
+ self.assertRaises(client.OpenStackApiNotFoundException,
+ self.api.post_server_volume, server_id, post_req)
+
+ # Try to attach to a non-existent server; should fail
+ attach_req['volumeId'] = volume_id
+ self.assertRaises(client.OpenStackApiNotFoundException,
+ self.api.post_server_volume, 3405691582, post_req)
+
+ # Should still be no attachments...
+ attachments = self.api.get_server_volumes(server_id)
+ self.assertEquals([], attachments)
+
+ # Do a real attach
+ attach_req['volumeId'] = volume_id
+ attach_result = self.api.post_server_volume(server_id, post_req)
+ LOG.debug(_("Attachment = %s") % attach_result)
+
+ attachment_id = attach_result['id']
+ self.assertEquals(volume_id, attach_result['volumeId'])
+
+ # These fields aren't set because it's async
+ #self.assertEquals(server_id, attach_result['serverId'])
+ #self.assertEquals(device, attach_result['device'])
+
+ # This is just an implementation detail, but let's check it...
+ self.assertEquals(volume_id, attachment_id)
+
+ # NOTE(justinsb): There's an issue with the attach code, in that
+ # it's currently asynchronous and not recorded until the attach
+ # completes. So the caller must be 'smart', like this...
+ attach_done = None
+ retries = 0
+ while True:
+ try:
+ attach_done = self.api.get_server_volume(server_id,
+ attachment_id)
+ break
+ except client.OpenStackApiNotFoundException:
+ LOG.debug("Got 404, waiting")
+
+ time.sleep(1)
+ retries = retries + 1
+ if retries > 10:
+ break
+
+ expect_attach = {}
+ expect_attach['id'] = volume_id
+ expect_attach['volumeId'] = volume_id
+ expect_attach['serverId'] = server_id
+ expect_attach['device'] = device
+
+ self.assertEqual(expect_attach, attach_done)
+
+ # Should be one attachemnt
+ attachments = self.api.get_server_volumes(server_id)
+ self.assertEquals([expect_attach], attachments)
+
+ # Should be able to get details
+ attachment_info = self.api.get_server_volume(server_id, attachment_id)
+ self.assertEquals(expect_attach, attachment_info)
+
+ # Getting details on a different id should fail
+ self.assertRaises(client.OpenStackApiNotFoundException,
+ self.api.get_server_volume, server_id, 3405691582)
+ self.assertRaises(client.OpenStackApiNotFoundException,
+ self.api.get_server_volume,
+ 3405691582, attachment_id)
+
+ # Trying to detach a different id should fail
+ self.assertRaises(client.OpenStackApiNotFoundException,
+ self.api.delete_server_volume, server_id, 3405691582)
+
+ # Detach should work
+ self.api.delete_server_volume(server_id, attachment_id)
+
+ # Again, it's async, so wait...
+ retries = 0
+ while True:
+ try:
+ attachment = self.api.get_server_volume(server_id,
+ attachment_id)
+ LOG.debug("Attachment still there: %s" % attachment)
+ except client.OpenStackApiNotFoundException:
+ LOG.debug("Got 404, delete done")
+ break
+
+ time.sleep(1)
+ retries = retries + 1
+ self.assertTrue(retries < 10)
+
+ # Should be no attachments again
+ attachments = self.api.get_server_volumes(server_id)
+ self.assertEquals([], attachments)
+
+ LOG.debug("Logs: %s" % driver.LoggingVolumeDriver.all_logs())
+
+ # Discover_volume and undiscover_volume are called from compute
+ # on attach/detach
+
+ disco_moves = driver.LoggingVolumeDriver.logs_like(
+ 'discover_volume',
+ id=volume_id)
+ LOG.debug("discover_volume actions: %s" % disco_moves)
+
+ self.assertEquals(1, len(disco_moves))
+ disco_move = disco_moves[0]
+ self.assertEquals(disco_move['id'], volume_id)
+
+ last_days_of_disco_moves = driver.LoggingVolumeDriver.logs_like(
+ 'undiscover_volume',
+ id=volume_id)
+ LOG.debug("undiscover_volume actions: %s" % last_days_of_disco_moves)
+
+ self.assertEquals(1, len(last_days_of_disco_moves))
+ undisco_move = last_days_of_disco_moves[0]
+ self.assertEquals(undisco_move['id'], volume_id)
+ self.assertEquals(undisco_move['mountpoint'], device)
+ self.assertEquals(undisco_move['instance_id'], server_id)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/nova/tests/network/__init__.py b/nova/tests/network/__init__.py
new file mode 100644
index 000000000..97f96b6fa
--- /dev/null
+++ b/nova/tests/network/__init__.py
@@ -0,0 +1,67 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Utility methods
+"""
+import os
+
+from nova import context
+from nova import db
+from nova import flags
+from nova import log as logging
+from nova import utils
+
+FLAGS = flags.FLAGS
+LOG = logging.getLogger('nova.tests.network')
+
+
+def binpath(script):
+ """Returns the absolute path to a script in bin"""
+ return os.path.abspath(os.path.join(__file__, "../../../../bin", script))
+
+
+def lease_ip(private_ip):
+ """Run add command on dhcpbridge"""
+ network_ref = db.fixed_ip_get_network(context.get_admin_context(),
+ private_ip)
+ instance_ref = db.fixed_ip_get_instance(context.get_admin_context(),
+ private_ip)
+ cmd = (binpath('nova-dhcpbridge'), 'add',
+ instance_ref['mac_address'],
+ private_ip, 'fake')
+ env = {'DNSMASQ_INTERFACE': network_ref['bridge'],
+ 'TESTING': '1',
+ 'FLAGFILE': FLAGS.dhcpbridge_flagfile}
+ (out, err) = utils.execute(*cmd, addl_env=env)
+ LOG.debug("ISSUE_IP: %s, %s ", out, err)
+
+
+def release_ip(private_ip):
+ """Run del command on dhcpbridge"""
+ network_ref = db.fixed_ip_get_network(context.get_admin_context(),
+ private_ip)
+ instance_ref = db.fixed_ip_get_instance(context.get_admin_context(),
+ private_ip)
+ cmd = (binpath('nova-dhcpbridge'), 'del',
+ instance_ref['mac_address'],
+ private_ip, 'fake')
+ env = {'DNSMASQ_INTERFACE': network_ref['bridge'],
+ 'TESTING': '1',
+ 'FLAGFILE': FLAGS.dhcpbridge_flagfile}
+ (out, err) = utils.execute(*cmd, addl_env=env)
+ LOG.debug("RELEASE_IP: %s, %s ", out, err)
diff --git a/nova/tests/network/base.py b/nova/tests/network/base.py
new file mode 100644
index 000000000..988a1de72
--- /dev/null
+++ b/nova/tests/network/base.py
@@ -0,0 +1,154 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Base class of Unit Tests for all network models
+"""
+import IPy
+import os
+
+from nova import context
+from nova import db
+from nova import exception
+from nova import flags
+from nova import log as logging
+from nova import test
+from nova import utils
+from nova.auth import manager
+
+FLAGS = flags.FLAGS
+LOG = logging.getLogger('nova.tests.network')
+
+
+class NetworkTestCase(test.TestCase):
+ """Test cases for network code"""
+ def setUp(self):
+ super(NetworkTestCase, self).setUp()
+ # NOTE(vish): if you change these flags, make sure to change the
+ # flags in the corresponding section in nova-dhcpbridge
+ self.flags(connection_type='fake',
+ fake_call=True,
+ fake_network=True)
+ self.manager = manager.AuthManager()
+ self.user = self.manager.create_user('netuser', 'netuser', 'netuser')
+ self.projects = []
+ self.network = utils.import_object(FLAGS.network_manager)
+ self.context = context.RequestContext(project=None, user=self.user)
+ for i in range(FLAGS.num_networks):
+ name = 'project%s' % i
+ project = self.manager.create_project(name, 'netuser', name)
+ self.projects.append(project)
+ # create the necessary network data for the project
+ user_context = context.RequestContext(project=self.projects[i],
+ user=self.user)
+ host = self.network.get_network_host(user_context.elevated())
+ instance_ref = self._create_instance(0)
+ self.instance_id = instance_ref['id']
+ instance_ref = self._create_instance(1)
+ self.instance2_id = instance_ref['id']
+
+ def tearDown(self):
+ # TODO(termie): this should really be instantiating clean datastores
+ # in between runs, one failure kills all the tests
+ db.instance_destroy(context.get_admin_context(), self.instance_id)
+ db.instance_destroy(context.get_admin_context(), self.instance2_id)
+ for project in self.projects:
+ self.manager.delete_project(project)
+ self.manager.delete_user(self.user)
+ super(NetworkTestCase, self).tearDown()
+
+ def _create_instance(self, project_num, mac=None):
+ if not mac:
+ mac = utils.generate_mac()
+ project = self.projects[project_num]
+ self.context._project = project
+ self.context.project_id = project.id
+ return db.instance_create(self.context,
+ {'project_id': project.id,
+ 'mac_address': mac})
+
+ def _create_address(self, project_num, instance_id=None):
+ """Create an address in given project num"""
+ if instance_id is None:
+ instance_id = self.instance_id
+ self.context._project = self.projects[project_num]
+ self.context.project_id = self.projects[project_num].id
+ return self.network.allocate_fixed_ip(self.context, instance_id)
+
+ def _deallocate_address(self, project_num, address):
+ self.context._project = self.projects[project_num]
+ self.context.project_id = self.projects[project_num].id
+ self.network.deallocate_fixed_ip(self.context, address)
+
+ def _is_allocated_in_project(self, address, project_id):
+ """Returns true if address is in specified project"""
+ project_net = db.network_get_by_bridge(context.get_admin_context(),
+ FLAGS.flat_network_bridge)
+ network = db.fixed_ip_get_network(context.get_admin_context(),
+ address)
+ instance = db.fixed_ip_get_instance(context.get_admin_context(),
+ address)
+ # instance exists until release
+ return instance is not None and network['id'] == project_net['id']
+
+ def test_private_ipv6(self):
+ """Make sure ipv6 is OK"""
+ if FLAGS.use_ipv6:
+ instance_ref = self._create_instance(0)
+ address = self._create_address(0, instance_ref['id'])
+ network_ref = db.project_get_network(
+ context.get_admin_context(),
+ self.context.project_id)
+ address_v6 = db.instance_get_fixed_address_v6(
+ context.get_admin_context(),
+ instance_ref['id'])
+ self.assertEqual(instance_ref['mac_address'],
+ utils.to_mac(address_v6))
+ instance_ref2 = db.fixed_ip_get_instance_v6(
+ context.get_admin_context(),
+ address_v6)
+ self.assertEqual(instance_ref['id'], instance_ref2['id'])
+ self.assertEqual(address_v6,
+ utils.to_global_ipv6(
+ network_ref['cidr_v6'],
+ instance_ref['mac_address']))
+ self._deallocate_address(0, address)
+ db.instance_destroy(context.get_admin_context(),
+ instance_ref['id'])
+
+ def test_available_ips(self):
+ """Make sure the number of available ips for the network is correct
+
+ The number of available IP addresses depends on the test
+ environment's setup.
+
+ Network size is set in test fixture's setUp method.
+
+ There are ips reserved at the bottom and top of the range.
+ services (network, gateway, CloudPipe, broadcast)
+ """
+ network = db.project_get_network(context.get_admin_context(),
+ self.projects[0].id)
+ net_size = flags.FLAGS.network_size
+ admin_context = context.get_admin_context()
+ total_ips = (db.network_count_available_ips(admin_context,
+ network['id']) +
+ db.network_count_reserved_ips(admin_context,
+ network['id']) +
+ db.network_count_allocated_ips(admin_context,
+ network['id']))
+ self.assertEqual(total_ips, net_size)
diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py
deleted file mode 100644
index 4e2ac205e..000000000
--- a/nova/tests/objectstore_unittest.py
+++ /dev/null
@@ -1,315 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Unittets for S3 objectstore clone.
-"""
-
-import boto
-import glob
-import hashlib
-import os
-import shutil
-import tempfile
-
-from boto.s3.connection import S3Connection, OrdinaryCallingFormat
-from twisted.internet import reactor, threads, defer
-from twisted.web import http, server
-
-from nova import context
-from nova import flags
-from nova import objectstore
-from nova import test
-from nova.auth import manager
-from nova.exception import NotEmpty, NotFound
-from nova.objectstore import image
-from nova.objectstore.handler import S3
-
-
-FLAGS = flags.FLAGS
-
-# Create a unique temporary directory. We don't delete after test to
-# allow checking the contents after running tests. Users and/or tools
-# running the tests need to remove the tests directories.
-OSS_TEMPDIR = tempfile.mkdtemp(prefix='test_oss-')
-
-# Create bucket/images path
-os.makedirs(os.path.join(OSS_TEMPDIR, 'images'))
-os.makedirs(os.path.join(OSS_TEMPDIR, 'buckets'))
-
-
-class ObjectStoreTestCase(test.TestCase):
- """Test objectstore API directly."""
-
- def setUp(self):
- """Setup users and projects."""
- super(ObjectStoreTestCase, self).setUp()
- self.flags(buckets_path=os.path.join(OSS_TEMPDIR, 'buckets'),
- images_path=os.path.join(OSS_TEMPDIR, 'images'),
- ca_path=os.path.join(os.path.dirname(__file__), 'CA'))
-
- self.auth_manager = manager.AuthManager()
- self.auth_manager.create_user('user1')
- self.auth_manager.create_user('user2')
- self.auth_manager.create_user('admin_user', admin=True)
- self.auth_manager.create_project('proj1', 'user1', 'a proj', ['user1'])
- self.auth_manager.create_project('proj2', 'user2', 'a proj', ['user2'])
- self.context = context.RequestContext('user1', 'proj1')
-
- def tearDown(self):
- """Tear down users and projects."""
- self.auth_manager.delete_project('proj1')
- self.auth_manager.delete_project('proj2')
- self.auth_manager.delete_user('user1')
- self.auth_manager.delete_user('user2')
- self.auth_manager.delete_user('admin_user')
- super(ObjectStoreTestCase, self).tearDown()
-
- def test_buckets(self):
- """Test the bucket API."""
- objectstore.bucket.Bucket.create('new_bucket', self.context)
- bucket = objectstore.bucket.Bucket('new_bucket')
-
- # creator is authorized to use bucket
- self.assert_(bucket.is_authorized(self.context))
-
- # another user is not authorized
- context2 = context.RequestContext('user2', 'proj2')
- self.assertFalse(bucket.is_authorized(context2))
-
- # admin is authorized to use bucket
- admin_context = context.RequestContext('admin_user', None)
- self.assertTrue(bucket.is_authorized(admin_context))
-
- # new buckets are empty
- self.assertTrue(bucket.list_keys()['Contents'] == [])
-
- # storing keys works
- bucket['foo'] = "bar"
-
- self.assertEquals(len(bucket.list_keys()['Contents']), 1)
-
- self.assertEquals(bucket['foo'].read(), 'bar')
-
- # md5 of key works
- self.assertEquals(bucket['foo'].md5, hashlib.md5('bar').hexdigest())
-
- # deleting non-empty bucket should throw a NotEmpty exception
- self.assertRaises(NotEmpty, bucket.delete)
-
- # deleting key
- del bucket['foo']
-
- # deleting empty bucket
- bucket.delete()
-
- # accessing deleted bucket throws exception
- self.assertRaises(NotFound, objectstore.bucket.Bucket, 'new_bucket')
-
- def test_images(self):
- self.do_test_images('1mb.manifest.xml', True,
- 'image_bucket1', 'i-testing1')
-
- def test_images_no_kernel_or_ramdisk(self):
- self.do_test_images('1mb.no_kernel_or_ramdisk.manifest.xml',
- False, 'image_bucket2', 'i-testing2')
-
- def do_test_images(self, manifest_file, expect_kernel_and_ramdisk,
- image_bucket, image_name):
- "Test the image API."
-
- # create a bucket for our bundle
- objectstore.bucket.Bucket.create(image_bucket, self.context)
- bucket = objectstore.bucket.Bucket(image_bucket)
-
- # upload an image manifest/parts
- bundle_path = os.path.join(os.path.dirname(__file__), 'bundle')
- for path in glob.glob(bundle_path + '/*'):
- bucket[os.path.basename(path)] = open(path, 'rb').read()
-
- # register an image
- image.Image.register_aws_image(image_name,
- '%s/%s' % (image_bucket, manifest_file),
- self.context)
-
- # verify image
- my_img = image.Image(image_name)
- result_image_file = os.path.join(my_img.path, 'image')
- self.assertEqual(os.stat(result_image_file).st_size, 1048576)
-
- sha = hashlib.sha1(open(result_image_file).read()).hexdigest()
- self.assertEqual(sha, '3b71f43ff30f4b15b5cd85dd9e95ebc7e84eb5a3')
-
- if expect_kernel_and_ramdisk:
- # Verify the default kernel and ramdisk are set
- self.assertEqual(my_img.metadata['kernelId'], 'aki-test')
- self.assertEqual(my_img.metadata['ramdiskId'], 'ari-test')
- else:
- # Verify that the default kernel and ramdisk (the one from FLAGS)
- # doesn't get embedded in the metadata
- self.assertFalse('kernelId' in my_img.metadata)
- self.assertFalse('ramdiskId' in my_img.metadata)
-
- # verify image permissions
- context2 = context.RequestContext('user2', 'proj2')
- self.assertFalse(my_img.is_authorized(context2))
-
- # change user-editable fields
- my_img.update_user_editable_fields({'display_name': 'my cool image'})
- self.assertEqual('my cool image', my_img.metadata['displayName'])
- my_img.update_user_editable_fields({'display_name': ''})
- self.assert_(not my_img.metadata['displayName'])
-
-
-class TestHTTPChannel(http.HTTPChannel):
- """Dummy site required for twisted.web"""
-
- def checkPersistence(self, _, __): # pylint: disable=C0103
- """Otherwise we end up with an unclean reactor."""
- return False
-
-
-class TestSite(server.Site):
- """Dummy site required for twisted.web"""
- protocol = TestHTTPChannel
-
-
-class S3APITestCase(test.TestCase):
- """Test objectstore through S3 API."""
-
- def setUp(self):
- """Setup users, projects, and start a test server."""
- super(S3APITestCase, self).setUp()
-
- FLAGS.auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver'
- FLAGS.buckets_path = os.path.join(OSS_TEMPDIR, 'buckets')
-
- self.auth_manager = manager.AuthManager()
- self.admin_user = self.auth_manager.create_user('admin', admin=True)
- self.admin_project = self.auth_manager.create_project('admin',
- self.admin_user)
-
- shutil.rmtree(FLAGS.buckets_path)
- os.mkdir(FLAGS.buckets_path)
-
- root = S3()
- self.site = TestSite(root)
- # pylint: disable=E1101
- self.listening_port = reactor.listenTCP(0, self.site,
- interface='127.0.0.1')
- # pylint: enable=E1101
- self.tcp_port = self.listening_port.getHost().port
-
- if not boto.config.has_section('Boto'):
- boto.config.add_section('Boto')
- boto.config.set('Boto', 'num_retries', '0')
- self.conn = S3Connection(aws_access_key_id=self.admin_user.access,
- aws_secret_access_key=self.admin_user.secret,
- host='127.0.0.1',
- port=self.tcp_port,
- is_secure=False,
- calling_format=OrdinaryCallingFormat())
-
- def get_http_connection(host, is_secure):
- """Get a new S3 connection, don't attempt to reuse connections."""
- return self.conn.new_http_connection(host, is_secure)
-
- self.conn.get_http_connection = get_http_connection
-
- def _ensure_no_buckets(self, buckets): # pylint: disable=C0111
- self.assertEquals(len(buckets), 0, "Bucket list was not empty")
- return True
-
- def _ensure_one_bucket(self, buckets, name): # pylint: disable=C0111
- self.assertEquals(len(buckets), 1,
- "Bucket list didn't have exactly one element in it")
- self.assertEquals(buckets[0].name, name, "Wrong name")
- return True
-
- def test_000_list_buckets(self):
- """Make sure we are starting with no buckets."""
- deferred = threads.deferToThread(self.conn.get_all_buckets)
- deferred.addCallback(self._ensure_no_buckets)
- return deferred
-
- def test_001_create_and_delete_bucket(self):
- """Test bucket creation and deletion."""
- bucket_name = 'testbucket'
-
- deferred = threads.deferToThread(self.conn.create_bucket, bucket_name)
- deferred.addCallback(lambda _:
- threads.deferToThread(self.conn.get_all_buckets))
-
- deferred.addCallback(self._ensure_one_bucket, bucket_name)
-
- deferred.addCallback(lambda _:
- threads.deferToThread(self.conn.delete_bucket,
- bucket_name))
- deferred.addCallback(lambda _:
- threads.deferToThread(self.conn.get_all_buckets))
- deferred.addCallback(self._ensure_no_buckets)
- return deferred
-
- def test_002_create_bucket_and_key_and_delete_key_again(self):
- """Test key operations on buckets."""
- bucket_name = 'testbucket'
- key_name = 'somekey'
- key_contents = 'somekey'
-
- deferred = threads.deferToThread(self.conn.create_bucket, bucket_name)
- deferred.addCallback(lambda b:
- threads.deferToThread(b.new_key, key_name))
- deferred.addCallback(lambda k:
- threads.deferToThread(k.set_contents_from_string,
- key_contents))
-
- def ensure_key_contents(bucket_name, key_name, contents):
- """Verify contents for a key in the given bucket."""
- bucket = self.conn.get_bucket(bucket_name)
- key = bucket.get_key(key_name)
- self.assertEquals(key.get_contents_as_string(), contents,
- "Bad contents")
-
- deferred.addCallback(lambda _:
- threads.deferToThread(ensure_key_contents,
- bucket_name, key_name,
- key_contents))
-
- def delete_key(bucket_name, key_name):
- """Delete a key for the given bucket."""
- bucket = self.conn.get_bucket(bucket_name)
- key = bucket.get_key(key_name)
- key.delete()
-
- deferred.addCallback(lambda _:
- threads.deferToThread(delete_key, bucket_name,
- key_name))
- deferred.addCallback(lambda _:
- threads.deferToThread(self.conn.get_bucket,
- bucket_name))
- deferred.addCallback(lambda b: threads.deferToThread(b.get_all_keys))
- deferred.addCallback(self._ensure_no_buckets)
- return deferred
-
- def tearDown(self):
- """Tear down auth and test server."""
- self.auth_manager.delete_user('admin')
- self.auth_manager.delete_project('admin')
- stop_listening = defer.maybeDeferred(self.listening_port.stopListening)
- super(S3APITestCase, self).tearDown()
- return defer.DeferredList([stop_listening])
diff --git a/nova/tests/test_auth.py b/nova/tests/test_auth.py
index 885596f56..f8a1b1564 100644
--- a/nova/tests/test_auth.py
+++ b/nova/tests/test_auth.py
@@ -80,10 +80,10 @@ class user_and_project_generator(object):
self.manager.delete_project(self.project)
-class AuthManagerTestCase(object):
+class _AuthManagerBaseTestCase(test.TestCase):
def setUp(self):
FLAGS.auth_driver = self.auth_driver
- super(AuthManagerTestCase, self).setUp()
+ super(_AuthManagerBaseTestCase, self).setUp()
self.flags(connection_type='fake')
self.manager = manager.AuthManager(new=True)
@@ -331,11 +331,11 @@ class AuthManagerTestCase(object):
self.assertTrue(user.is_admin())
-class AuthManagerLdapTestCase(AuthManagerTestCase, test.TestCase):
+class AuthManagerLdapTestCase(_AuthManagerBaseTestCase):
auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver'
-class AuthManagerDbTestCase(AuthManagerTestCase, test.TestCase):
+class AuthManagerDbTestCase(_AuthManagerBaseTestCase):
auth_driver = 'nova.auth.dbdriver.DbDriver'
diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py
index cf8ee7eff..00803d0ad 100644
--- a/nova/tests/test_cloud.py
+++ b/nova/tests/test_cloud.py
@@ -35,31 +35,22 @@ from nova import log as logging
from nova import rpc
from nova import service
from nova import test
+from nova import utils
from nova.auth import manager
from nova.compute import power_state
from nova.api.ec2 import cloud
from nova.api.ec2 import ec2utils
from nova.image import local
-from nova.objectstore import image
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.tests.cloud')
-# Temp dirs for working with image attributes through the cloud controller
-# (stole this from objectstore_unittest.py)
-OSS_TEMPDIR = tempfile.mkdtemp(prefix='test_oss-')
-IMAGES_PATH = os.path.join(OSS_TEMPDIR, 'images')
-os.makedirs(IMAGES_PATH)
-
-# TODO(termie): these tests are rather fragile, they should at the lest be
-# wiping database state after each run
class CloudTestCase(test.TestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
- self.flags(connection_type='fake',
- images_path=IMAGES_PATH)
+ self.flags(connection_type='fake')
self.conn = rpc.Connection.instance()
@@ -70,6 +61,7 @@ class CloudTestCase(test.TestCase):
self.compute = self.start_service('compute')
self.scheduter = self.start_service('scheduler')
self.network = self.start_service('network')
+ self.image_service = utils.import_object(FLAGS.image_service)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('admin', 'admin', 'admin', True)
@@ -318,41 +310,6 @@ class CloudTestCase(test.TestCase):
LOG.debug(_("Terminating instance %s"), instance_id)
rv = self.compute.terminate_instance(instance_id)
- @staticmethod
- def _fake_set_image_description(ctxt, image_id, description):
- from nova.objectstore import handler
-
- class req:
- pass
-
- request = req()
- request.context = ctxt
- request.args = {'image_id': [image_id],
- 'description': [description]}
-
- resource = handler.ImagesResource()
- resource.render_POST(request)
-
- def test_user_editable_image_endpoint(self):
- pathdir = os.path.join(FLAGS.images_path, 'ami-testing')
- os.mkdir(pathdir)
- info = {'isPublic': False}
- with open(os.path.join(pathdir, 'info.json'), 'w') as f:
- json.dump(info, f)
- img = image.Image('ami-testing')
- # self.cloud.set_image_description(self.context, 'ami-testing',
- # 'Foo Img')
- # NOTE(vish): Above won't work unless we start objectstore or create
- # a fake version of api/ec2/images.py conn that can
- # call methods directly instead of going through boto.
- # for now, just cheat and call the method directly
- self._fake_set_image_description(self.context, 'ami-testing',
- 'Foo Img')
- self.assertEqual('Foo Img', img.metadata['description'])
- self._fake_set_image_description(self.context, 'ami-testing', '')
- self.assertEqual('', img.metadata['description'])
- shutil.rmtree(pathdir)
-
def test_update_of_instance_display_fields(self):
inst = db.instance_create(self.context, {})
ec2_id = ec2utils.id_to_ec2_id(inst['id'])
diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py
index 3651f4cef..1b0f426d2 100644
--- a/nova/tests/test_compute.py
+++ b/nova/tests/test_compute.py
@@ -44,6 +44,14 @@ flags.DECLARE('stub_network', 'nova.compute.manager')
flags.DECLARE('live_migration_retry_count', 'nova.compute.manager')
+class FakeTime(object):
+ def __init__(self):
+ self.counter = 0
+
+ def sleep(self, t):
+ self.counter += t
+
+
class ComputeTestCase(test.TestCase):
"""Test case for compute"""
def setUp(self):
@@ -82,6 +90,21 @@ class ComputeTestCase(test.TestCase):
inst.update(params)
return db.instance_create(self.context, inst)['id']
+ def _create_instance_type(self, params={}):
+ """Create a test instance"""
+ context = self.context.elevated()
+ inst = {}
+ inst['name'] = 'm1.small'
+ inst['memory_mb'] = '1024'
+ inst['vcpus'] = '1'
+ inst['local_gb'] = '20'
+ inst['flavorid'] = '1'
+ inst['swap'] = '2048'
+ inst['rxtx_quota'] = 100
+ inst['rxtx_cap'] = 200
+ inst.update(params)
+ return db.instance_type_create(context, inst)['id']
+
def _create_group(self):
values = {'name': 'testgroup',
'description': 'testgroup',
@@ -263,6 +286,16 @@ class ComputeTestCase(test.TestCase):
console = self.compute.get_ajax_console(self.context,
instance_id)
+ self.assert_(set(['token', 'host', 'port']).issubset(console.keys()))
+ self.compute.terminate_instance(self.context, instance_id)
+
+ def test_vnc_console(self):
+ """Make sure we can a vnc console for an instance."""
+ instance_id = self._create_instance()
+ self.compute.run_instance(self.context, instance_id)
+
+ console = self.compute.get_vnc_console(self.context,
+ instance_id)
self.assert_(console)
self.compute.terminate_instance(self.context, instance_id)
@@ -299,15 +332,53 @@ class ComputeTestCase(test.TestCase):
"""Ensure instance can be migrated/resized"""
instance_id = self._create_instance()
context = self.context.elevated()
+
self.compute.run_instance(self.context, instance_id)
db.instance_update(self.context, instance_id, {'host': 'foo'})
- self.compute.prep_resize(context, instance_id)
+ self.compute.prep_resize(context, instance_id, 1)
migration_ref = db.migration_get_by_instance_and_status(context,
instance_id, 'pre-migrating')
self.compute.resize_instance(context, instance_id,
migration_ref['id'])
self.compute.terminate_instance(context, instance_id)
+ def test_resize_invalid_flavor_fails(self):
+ """Ensure invalid flavors raise"""
+ instance_id = self._create_instance()
+ context = self.context.elevated()
+ self.compute.run_instance(self.context, instance_id)
+
+ self.assertRaises(exception.NotFound, self.compute_api.resize,
+ context, instance_id, 200)
+
+ self.compute.terminate_instance(context, instance_id)
+
+ def test_resize_down_fails(self):
+ """Ensure resizing down raises and fails"""
+ context = self.context.elevated()
+ instance_id = self._create_instance()
+
+ self.compute.run_instance(self.context, instance_id)
+ db.instance_update(self.context, instance_id,
+ {'instance_type': 'm1.xlarge'})
+
+ self.assertRaises(exception.ApiError, self.compute_api.resize,
+ context, instance_id, 1)
+
+ self.compute.terminate_instance(context, instance_id)
+
+ def test_resize_same_size_fails(self):
+ """Ensure invalid flavors raise"""
+ context = self.context.elevated()
+ instance_id = self._create_instance()
+
+ self.compute.run_instance(self.context, instance_id)
+
+ self.assertRaises(exception.ApiError, self.compute_api.resize,
+ context, instance_id, 1)
+
+ self.compute.terminate_instance(context, instance_id)
+
def test_get_by_flavor_id(self):
type = instance_types.get_by_flavor_id(1)
self.assertEqual(type, 'm1.tiny')
@@ -318,10 +389,8 @@ class ComputeTestCase(test.TestCase):
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
self.assertRaises(exception.Error, self.compute.prep_resize,
- self.context, instance_id)
+ self.context, instance_id, 1)
self.compute.terminate_instance(self.context, instance_id)
- type = instance_types.get_by_flavor_id("1")
- self.assertEqual(type, 'm1.tiny')
def _setup_other_managers(self):
self.volume_manager = utils.import_object(FLAGS.volume_manager)
@@ -342,7 +411,7 @@ class ComputeTestCase(test.TestCase):
self.mox.ReplayAll()
self.assertRaises(exception.NotFound,
self.compute.pre_live_migration,
- c, instance_ref['id'])
+ c, instance_ref['id'], time=FakeTime())
def test_pre_live_migration_instance_has_volume(self):
"""Confirm setup_compute_volume is called when volume is mounted."""
@@ -395,7 +464,7 @@ class ComputeTestCase(test.TestCase):
self.compute.driver = drivermock
self.mox.ReplayAll()
- ret = self.compute.pre_live_migration(c, i_ref['id'])
+ ret = self.compute.pre_live_migration(c, i_ref['id'], time=FakeTime())
self.assertEqual(ret, None)
def test_pre_live_migration_setup_compute_node_fail(self):
@@ -428,7 +497,7 @@ class ComputeTestCase(test.TestCase):
self.mox.ReplayAll()
self.assertRaises(exception.ProcessExecutionError,
self.compute.pre_live_migration,
- c, i_ref['id'])
+ c, i_ref['id'], time=FakeTime())
def test_live_migration_works_correctly_with_volume(self):
"""Confirm check_for_export to confirm volume health check."""
@@ -575,3 +644,24 @@ class ComputeTestCase(test.TestCase):
db.instance_destroy(c, instance_id)
db.volume_destroy(c, v_ref['id'])
db.floating_ip_destroy(c, flo_addr)
+
+ def test_run_kill_vm(self):
+ """Detect when a vm is terminated behind the scenes"""
+ instance_id = self._create_instance()
+
+ self.compute.run_instance(self.context, instance_id)
+
+ instances = db.instance_get_all(context.get_admin_context())
+ LOG.info(_("Running instances: %s"), instances)
+ self.assertEqual(len(instances), 1)
+
+ instance_name = instances[0].name
+ self.compute.driver.test_remove_vm(instance_name)
+
+ # Force the compute manager to do its periodic poll
+ error_list = self.compute.periodic_tasks(context.get_admin_context())
+ self.assertFalse(error_list)
+
+ instances = db.instance_get_all(context.get_admin_context())
+ LOG.info(_("After force-killing instances: %s"), instances)
+ self.assertEqual(len(instances), 0)
diff --git a/nova/tests/test_direct.py b/nova/tests/test_direct.py
index 80e4d2e1f..588a24b35 100644
--- a/nova/tests/test_direct.py
+++ b/nova/tests/test_direct.py
@@ -25,12 +25,18 @@ import webob
from nova import compute
from nova import context
from nova import exception
+from nova import network
from nova import test
+from nova import volume
from nova import utils
from nova.api import direct
from nova.tests import test_cloud
+class ArbitraryObject(object):
+ pass
+
+
class FakeService(object):
def echo(self, context, data):
return {'data': data}
@@ -39,6 +45,9 @@ class FakeService(object):
return {'user': context.user_id,
'project': context.project_id}
+ def invalid_return(self, context):
+ return ArbitraryObject()
+
class DirectTestCase(test.TestCase):
def setUp(self):
@@ -84,6 +93,12 @@ class DirectTestCase(test.TestCase):
resp_parsed = json.loads(resp.body)
self.assertEqual(resp_parsed['data'], 'foo')
+ def test_invalid(self):
+ req = webob.Request.blank('/fake/invalid_return')
+ req.environ['openstack.context'] = self.context
+ req.method = 'POST'
+ self.assertRaises(exception.Error, req.get_response, self.router)
+
def test_proxy(self):
proxy = direct.Proxy(self.router)
rv = proxy.fake.echo(self.context, data='baz')
@@ -93,12 +108,20 @@ class DirectTestCase(test.TestCase):
class DirectCloudTestCase(test_cloud.CloudTestCase):
def setUp(self):
super(DirectCloudTestCase, self).setUp()
- compute_handle = compute.API(network_api=self.cloud.network_api,
- volume_api=self.cloud.volume_api)
+ compute_handle = compute.API(image_service=self.cloud.image_service)
+ volume_handle = volume.API()
+ network_handle = network.API()
direct.register_service('compute', compute_handle)
+ direct.register_service('volume', volume_handle)
+ direct.register_service('network', network_handle)
+
self.router = direct.JsonParamsMiddleware(direct.Router())
proxy = direct.Proxy(self.router)
self.cloud.compute_api = proxy.compute
+ self.cloud.volume_api = proxy.volume
+ self.cloud.network_api = proxy.network
+ compute_handle.volume_api = proxy.volume
+ compute_handle.network_api = proxy.network
def tearDown(self):
super(DirectCloudTestCase, self).tearDown()
diff --git a/nova/tests/test_flat_network.py b/nova/tests/test_flat_network.py
new file mode 100644
index 000000000..dcc617e25
--- /dev/null
+++ b/nova/tests/test_flat_network.py
@@ -0,0 +1,161 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Unit Tests for flat network code
+"""
+import IPy
+import os
+import unittest
+
+from nova import context
+from nova import db
+from nova import exception
+from nova import flags
+from nova import log as logging
+from nova import test
+from nova import utils
+from nova.auth import manager
+from nova.tests.network import base
+
+
+FLAGS = flags.FLAGS
+LOG = logging.getLogger('nova.tests.network')
+
+
+class FlatNetworkTestCase(base.NetworkTestCase):
+ """Test cases for network code"""
+ def test_public_network_association(self):
+ """Makes sure that we can allocate a public ip"""
+ # TODO(vish): better way of adding floating ips
+
+ self.context._project = self.projects[0]
+ self.context.project_id = self.projects[0].id
+ pubnet = IPy.IP(flags.FLAGS.floating_range)
+ address = str(pubnet[0])
+ try:
+ db.floating_ip_get_by_address(context.get_admin_context(), address)
+ except exception.NotFound:
+ db.floating_ip_create(context.get_admin_context(),
+ {'address': address,
+ 'host': FLAGS.host})
+
+ self.assertRaises(NotImplementedError,
+ self.network.allocate_floating_ip,
+ self.context, self.projects[0].id)
+
+ fix_addr = self._create_address(0)
+ float_addr = address
+ self.assertRaises(NotImplementedError,
+ self.network.associate_floating_ip,
+ self.context, float_addr, fix_addr)
+
+ address = db.instance_get_floating_address(context.get_admin_context(),
+ self.instance_id)
+ self.assertEqual(address, None)
+
+ self.assertRaises(NotImplementedError,
+ self.network.disassociate_floating_ip,
+ self.context, float_addr)
+
+ address = db.instance_get_floating_address(context.get_admin_context(),
+ self.instance_id)
+ self.assertEqual(address, None)
+
+ self.assertRaises(NotImplementedError,
+ self.network.deallocate_floating_ip,
+ self.context, float_addr)
+
+ self.network.deallocate_fixed_ip(self.context, fix_addr)
+ db.floating_ip_destroy(context.get_admin_context(), float_addr)
+
+ def test_allocate_deallocate_fixed_ip(self):
+ """Makes sure that we can allocate and deallocate a fixed ip"""
+ address = self._create_address(0)
+ self.assertTrue(self._is_allocated_in_project(address,
+ self.projects[0].id))
+ self._deallocate_address(0, address)
+
+ # check if the fixed ip address is really deallocated
+ self.assertFalse(self._is_allocated_in_project(address,
+ self.projects[0].id))
+
+ def test_side_effects(self):
+ """Ensures allocating and releasing has no side effects"""
+ address = self._create_address(0)
+ address2 = self._create_address(1, self.instance2_id)
+
+ self.assertTrue(self._is_allocated_in_project(address,
+ self.projects[0].id))
+ self.assertTrue(self._is_allocated_in_project(address2,
+ self.projects[1].id))
+
+ self._deallocate_address(0, address)
+ self.assertFalse(self._is_allocated_in_project(address,
+ self.projects[0].id))
+
+ # First address release shouldn't affect the second
+ self.assertTrue(self._is_allocated_in_project(address2,
+ self.projects[0].id))
+
+ self._deallocate_address(1, address2)
+ self.assertFalse(self._is_allocated_in_project(address2,
+ self.projects[1].id))
+
+ def test_ips_are_reused(self):
+ """Makes sure that ip addresses that are deallocated get reused"""
+ address = self._create_address(0)
+ self.network.deallocate_fixed_ip(self.context, address)
+
+ address2 = self._create_address(0)
+ self.assertEqual(address, address2)
+
+ self.network.deallocate_fixed_ip(self.context, address2)
+
+ def test_too_many_addresses(self):
+ """Test for a NoMoreAddresses exception when all fixed ips are used.
+ """
+ admin_context = context.get_admin_context()
+ network = db.project_get_network(admin_context, self.projects[0].id)
+ num_available_ips = db.network_count_available_ips(admin_context,
+ network['id'])
+ addresses = []
+ instance_ids = []
+ for i in range(num_available_ips):
+ instance_ref = self._create_instance(0)
+ instance_ids.append(instance_ref['id'])
+ address = self._create_address(0, instance_ref['id'])
+ addresses.append(address)
+
+ ip_count = db.network_count_available_ips(context.get_admin_context(),
+ network['id'])
+ self.assertEqual(ip_count, 0)
+ self.assertRaises(db.NoMoreAddresses,
+ self.network.allocate_fixed_ip,
+ self.context,
+ 'foo')
+
+ for i in range(num_available_ips):
+ self.network.deallocate_fixed_ip(self.context, addresses[i])
+ db.instance_destroy(context.get_admin_context(), instance_ids[i])
+ ip_count = db.network_count_available_ips(context.get_admin_context(),
+ network['id'])
+ self.assertEqual(ip_count, num_available_ips)
+
+ def run(self, result=None):
+ if(FLAGS.network_manager == 'nova.network.manager.FlatManager'):
+ super(FlatNetworkTestCase, self).run(result)
diff --git a/nova/tests/test_localization.py b/nova/tests/test_localization.py
index 393d71038..a25809a79 100644
--- a/nova/tests/test_localization.py
+++ b/nova/tests/test_localization.py
@@ -21,9 +21,10 @@ import sys
import unittest
import nova
+from nova import test
-class LocalizationTestCase(unittest.TestCase):
+class LocalizationTestCase(test.TestCase):
def test_multiple_positional_format_placeholders(self):
pat = re.compile("\W_\(")
single_pat = re.compile("\W%\W")
diff --git a/nova/tests/test_misc.py b/nova/tests/test_misc.py
index 1fbaf304f..4e17e1ce0 100644
--- a/nova/tests/test_misc.py
+++ b/nova/tests/test_misc.py
@@ -18,8 +18,12 @@ import errno
import os
import select
+from eventlet import greenpool
+from eventlet import greenthread
+
from nova import test
-from nova.utils import parse_mailmap, str_dict_replace, synchronized
+from nova import utils
+from nova.utils import parse_mailmap, str_dict_replace
class ProjectTestCase(test.TestCase):
@@ -63,7 +67,7 @@ class ProjectTestCase(test.TestCase):
class LockTestCase(test.TestCase):
def test_synchronized_wrapped_function_metadata(self):
- @synchronized('whatever')
+ @utils.synchronized('whatever')
def foo():
"""Bar"""
pass
@@ -72,11 +76,42 @@ class LockTestCase(test.TestCase):
self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "
"got mangled")
- def test_synchronized(self):
+ def test_synchronized_internally(self):
+ """We can lock across multiple green threads"""
+ saved_sem_num = len(utils._semaphores)
+ seen_threads = list()
+
+ @utils.synchronized('testlock2', external=False)
+ def f(id):
+ for x in range(10):
+ seen_threads.append(id)
+ greenthread.sleep(0)
+
+ threads = []
+ pool = greenpool.GreenPool(10)
+ for i in range(10):
+ threads.append(pool.spawn(f, i))
+
+ for thread in threads:
+ thread.wait()
+
+ self.assertEquals(len(seen_threads), 100)
+ # Looking at the seen threads, split it into chunks of 10, and verify
+ # that the last 9 match the first in each chunk.
+ for i in range(10):
+ for j in range(9):
+ self.assertEquals(seen_threads[i * 10],
+ seen_threads[i * 10 + 1 + j])
+
+ self.assertEqual(saved_sem_num, len(utils._semaphores),
+ "Semaphore leak detected")
+
+ def test_synchronized_externally(self):
+ """We can lock across multiple processes"""
rpipe1, wpipe1 = os.pipe()
rpipe2, wpipe2 = os.pipe()
- @synchronized('testlock')
+ @utils.synchronized('testlock1', external=True)
def f(rpipe, wpipe):
try:
os.write(wpipe, "foo")
diff --git a/nova/tests/test_network.py b/nova/tests/test_network.py
index 1e634b388..77f6aaff3 100644
--- a/nova/tests/test_network.py
+++ b/nova/tests/test_network.py
@@ -20,21 +20,10 @@ Unit Tests for network code
"""
import IPy
import os
-import time
-from nova import context
-from nova import db
-from nova import exception
-from nova import flags
-from nova import log as logging
from nova import test
-from nova import utils
-from nova.auth import manager
from nova.network import linux_net
-FLAGS = flags.FLAGS
-LOG = logging.getLogger('nova.tests.network')
-
class IptablesManagerTestCase(test.TestCase):
sample_filter = ['#Generated by iptables-save on Fri Feb 18 15:17:05 2011',
@@ -175,363 +164,3 @@ class IptablesManagerTestCase(test.TestCase):
self.assertTrue('-A %s -j run_tests.py-%s' \
% (chain, chain) in new_lines,
"Built-in chain %s not wrapped" % (chain,))
-
-
-class NetworkTestCase(test.TestCase):
- """Test cases for network code"""
- def setUp(self):
- super(NetworkTestCase, self).setUp()
- # NOTE(vish): if you change these flags, make sure to change the
- # flags in the corresponding section in nova-dhcpbridge
- self.flags(connection_type='fake',
- fake_call=True,
- fake_network=True)
- self.manager = manager.AuthManager()
- self.user = self.manager.create_user('netuser', 'netuser', 'netuser')
- self.projects = []
- self.network = utils.import_object(FLAGS.network_manager)
- self.context = context.RequestContext(project=None, user=self.user)
- for i in range(FLAGS.num_networks):
- name = 'project%s' % i
- project = self.manager.create_project(name, 'netuser', name)
- self.projects.append(project)
- # create the necessary network data for the project
- user_context = context.RequestContext(project=self.projects[i],
- user=self.user)
- host = self.network.get_network_host(user_context.elevated())
- instance_ref = self._create_instance(0)
- self.instance_id = instance_ref['id']
- instance_ref = self._create_instance(1)
- self.instance2_id = instance_ref['id']
-
- def tearDown(self):
- # TODO(termie): this should really be instantiating clean datastores
- # in between runs, one failure kills all the tests
- db.instance_destroy(context.get_admin_context(), self.instance_id)
- db.instance_destroy(context.get_admin_context(), self.instance2_id)
- for project in self.projects:
- self.manager.delete_project(project)
- self.manager.delete_user(self.user)
- super(NetworkTestCase, self).tearDown()
-
- def _create_instance(self, project_num, mac=None):
- if not mac:
- mac = utils.generate_mac()
- project = self.projects[project_num]
- self.context._project = project
- self.context.project_id = project.id
- return db.instance_create(self.context,
- {'project_id': project.id,
- 'mac_address': mac})
-
- def _create_address(self, project_num, instance_id=None):
- """Create an address in given project num"""
- if instance_id is None:
- instance_id = self.instance_id
- self.context._project = self.projects[project_num]
- self.context.project_id = self.projects[project_num].id
- return self.network.allocate_fixed_ip(self.context, instance_id)
-
- def _deallocate_address(self, project_num, address):
- self.context._project = self.projects[project_num]
- self.context.project_id = self.projects[project_num].id
- self.network.deallocate_fixed_ip(self.context, address)
-
- def test_private_ipv6(self):
- """Make sure ipv6 is OK"""
- if FLAGS.use_ipv6:
- instance_ref = self._create_instance(0)
- address = self._create_address(0, instance_ref['id'])
- network_ref = db.project_get_network(
- context.get_admin_context(),
- self.context.project_id)
- address_v6 = db.instance_get_fixed_address_v6(
- context.get_admin_context(),
- instance_ref['id'])
- self.assertEqual(instance_ref['mac_address'],
- utils.to_mac(address_v6))
- instance_ref2 = db.fixed_ip_get_instance_v6(
- context.get_admin_context(),
- address_v6)
- self.assertEqual(instance_ref['id'], instance_ref2['id'])
- self.assertEqual(address_v6,
- utils.to_global_ipv6(
- network_ref['cidr_v6'],
- instance_ref['mac_address']))
- self._deallocate_address(0, address)
- db.instance_destroy(context.get_admin_context(),
- instance_ref['id'])
-
- def test_public_network_association(self):
- """Makes sure that we can allocaate a public ip"""
- # TODO(vish): better way of adding floating ips
- self.context._project = self.projects[0]
- self.context.project_id = self.projects[0].id
- pubnet = IPy.IP(flags.FLAGS.floating_range)
- address = str(pubnet[0])
- try:
- db.floating_ip_get_by_address(context.get_admin_context(), address)
- except exception.NotFound:
- db.floating_ip_create(context.get_admin_context(),
- {'address': address,
- 'host': FLAGS.host})
- float_addr = self.network.allocate_floating_ip(self.context,
- self.projects[0].id)
- fix_addr = self._create_address(0)
- lease_ip(fix_addr)
- self.assertEqual(float_addr, str(pubnet[0]))
- self.network.associate_floating_ip(self.context, float_addr, fix_addr)
- address = db.instance_get_floating_address(context.get_admin_context(),
- self.instance_id)
- self.assertEqual(address, float_addr)
- self.network.disassociate_floating_ip(self.context, float_addr)
- address = db.instance_get_floating_address(context.get_admin_context(),
- self.instance_id)
- self.assertEqual(address, None)
- self.network.deallocate_floating_ip(self.context, float_addr)
- self.network.deallocate_fixed_ip(self.context, fix_addr)
- release_ip(fix_addr)
- db.floating_ip_destroy(context.get_admin_context(), float_addr)
-
- def test_allocate_deallocate_fixed_ip(self):
- """Makes sure that we can allocate and deallocate a fixed ip"""
- address = self._create_address(0)
- self.assertTrue(is_allocated_in_project(address, self.projects[0].id))
- lease_ip(address)
- self._deallocate_address(0, address)
-
- # Doesn't go away until it's dhcp released
- self.assertTrue(is_allocated_in_project(address, self.projects[0].id))
-
- release_ip(address)
- self.assertFalse(is_allocated_in_project(address, self.projects[0].id))
-
- def test_side_effects(self):
- """Ensures allocating and releasing has no side effects"""
- address = self._create_address(0)
- address2 = self._create_address(1, self.instance2_id)
-
- self.assertTrue(is_allocated_in_project(address, self.projects[0].id))
- self.assertTrue(is_allocated_in_project(address2, self.projects[1].id))
- self.assertFalse(is_allocated_in_project(address, self.projects[1].id))
-
- # Addresses are allocated before they're issued
- lease_ip(address)
- lease_ip(address2)
-
- self._deallocate_address(0, address)
- release_ip(address)
- self.assertFalse(is_allocated_in_project(address, self.projects[0].id))
-
- # First address release shouldn't affect the second
- self.assertTrue(is_allocated_in_project(address2, self.projects[1].id))
-
- self._deallocate_address(1, address2)
- release_ip(address2)
- self.assertFalse(is_allocated_in_project(address2,
- self.projects[1].id))
-
- def test_subnet_edge(self):
- """Makes sure that private ips don't overlap"""
- first = self._create_address(0)
- lease_ip(first)
- instance_ids = []
- for i in range(1, FLAGS.num_networks):
- instance_ref = self._create_instance(i, mac=utils.generate_mac())
- instance_ids.append(instance_ref['id'])
- address = self._create_address(i, instance_ref['id'])
- instance_ref = self._create_instance(i, mac=utils.generate_mac())
- instance_ids.append(instance_ref['id'])
- address2 = self._create_address(i, instance_ref['id'])
- instance_ref = self._create_instance(i, mac=utils.generate_mac())
- instance_ids.append(instance_ref['id'])
- address3 = self._create_address(i, instance_ref['id'])
- lease_ip(address)
- lease_ip(address2)
- lease_ip(address3)
- self.context._project = self.projects[i]
- self.context.project_id = self.projects[i].id
- self.assertFalse(is_allocated_in_project(address,
- self.projects[0].id))
- self.assertFalse(is_allocated_in_project(address2,
- self.projects[0].id))
- self.assertFalse(is_allocated_in_project(address3,
- self.projects[0].id))
- self.network.deallocate_fixed_ip(self.context, address)
- self.network.deallocate_fixed_ip(self.context, address2)
- self.network.deallocate_fixed_ip(self.context, address3)
- release_ip(address)
- release_ip(address2)
- release_ip(address3)
- for instance_id in instance_ids:
- db.instance_destroy(context.get_admin_context(), instance_id)
- self.context._project = self.projects[0]
- self.context.project_id = self.projects[0].id
- self.network.deallocate_fixed_ip(self.context, first)
- self._deallocate_address(0, first)
- release_ip(first)
-
- def test_vpn_ip_and_port_looks_valid(self):
- """Ensure the vpn ip and port are reasonable"""
- self.assert_(self.projects[0].vpn_ip)
- self.assert_(self.projects[0].vpn_port >= FLAGS.vpn_start)
- self.assert_(self.projects[0].vpn_port <= FLAGS.vpn_start +
- FLAGS.num_networks)
-
- def test_too_many_networks(self):
- """Ensure error is raised if we run out of networks"""
- projects = []
- networks_left = (FLAGS.num_networks -
- db.network_count(context.get_admin_context()))
- for i in range(networks_left):
- project = self.manager.create_project('many%s' % i, self.user)
- projects.append(project)
- db.project_get_network(context.get_admin_context(), project.id)
- project = self.manager.create_project('last', self.user)
- projects.append(project)
- self.assertRaises(db.NoMoreNetworks,
- db.project_get_network,
- context.get_admin_context(),
- project.id)
- for project in projects:
- self.manager.delete_project(project)
-
- def test_ips_are_reused(self):
- """Makes sure that ip addresses that are deallocated get reused"""
- address = self._create_address(0)
- lease_ip(address)
- self.network.deallocate_fixed_ip(self.context, address)
- release_ip(address)
-
- address2 = self._create_address(0)
- self.assertEqual(address, address2)
- lease_ip(address)
- self.network.deallocate_fixed_ip(self.context, address2)
- release_ip(address)
-
- def test_available_ips(self):
- """Make sure the number of available ips for the network is correct
-
- The number of available IP addresses depends on the test
- environment's setup.
-
- Network size is set in test fixture's setUp method.
-
- There are ips reserved at the bottom and top of the range.
- services (network, gateway, CloudPipe, broadcast)
- """
- network = db.project_get_network(context.get_admin_context(),
- self.projects[0].id)
- net_size = flags.FLAGS.network_size
- admin_context = context.get_admin_context()
- total_ips = (db.network_count_available_ips(admin_context,
- network['id']) +
- db.network_count_reserved_ips(admin_context,
- network['id']) +
- db.network_count_allocated_ips(admin_context,
- network['id']))
- self.assertEqual(total_ips, net_size)
-
- def test_too_many_addresses(self):
- """Test for a NoMoreAddresses exception when all fixed ips are used.
- """
- admin_context = context.get_admin_context()
- network = db.project_get_network(admin_context, self.projects[0].id)
- num_available_ips = db.network_count_available_ips(admin_context,
- network['id'])
- addresses = []
- instance_ids = []
- for i in range(num_available_ips):
- instance_ref = self._create_instance(0)
- instance_ids.append(instance_ref['id'])
- address = self._create_address(0, instance_ref['id'])
- addresses.append(address)
- lease_ip(address)
-
- ip_count = db.network_count_available_ips(context.get_admin_context(),
- network['id'])
- self.assertEqual(ip_count, 0)
- self.assertRaises(db.NoMoreAddresses,
- self.network.allocate_fixed_ip,
- self.context,
- 'foo')
-
- for i in range(num_available_ips):
- self.network.deallocate_fixed_ip(self.context, addresses[i])
- release_ip(addresses[i])
- db.instance_destroy(context.get_admin_context(), instance_ids[i])
- ip_count = db.network_count_available_ips(context.get_admin_context(),
- network['id'])
- self.assertEqual(ip_count, num_available_ips)
-
- def test_dhcp_lease_output(self):
- admin_ctxt = context.get_admin_context()
- address = self._create_address(0, self.instance_id)
- lease_ip(address)
- network_ref = db.network_get_by_instance(admin_ctxt, self.instance_id)
- leases = linux_net.get_dhcp_leases(context.get_admin_context(),
- network_ref['id'])
- for line in leases.split('\n'):
- seconds, mac, ip, hostname, client_id = line.split(' ')
- self.assertTrue(int(seconds) > time.time(), 'Lease expires in '
- 'the past')
- octets = mac.split(':')
- self.assertEqual(len(octets), 6, "Wrong number of octets "
- "in %s" % (max,))
- for octet in octets:
- self.assertEqual(len(octet), 2, "Oddly sized octet: %s"
- % (octet,))
- # This will throw an exception if the octet is invalid
- int(octet, 16)
-
- # And this will raise an exception in case of an invalid IP
- IPy.IP(ip)
-
- release_ip(address)
-
-
-def is_allocated_in_project(address, project_id):
- """Returns true if address is in specified project"""
- project_net = db.project_get_network(context.get_admin_context(),
- project_id)
- network = db.fixed_ip_get_network(context.get_admin_context(), address)
- instance = db.fixed_ip_get_instance(context.get_admin_context(), address)
- # instance exists until release
- return instance is not None and network['id'] == project_net['id']
-
-
-def binpath(script):
- """Returns the absolute path to a script in bin"""
- return os.path.abspath(os.path.join(__file__, "../../../bin", script))
-
-
-def lease_ip(private_ip):
- """Run add command on dhcpbridge"""
- network_ref = db.fixed_ip_get_network(context.get_admin_context(),
- private_ip)
- instance_ref = db.fixed_ip_get_instance(context.get_admin_context(),
- private_ip)
- cmd = (binpath('nova-dhcpbridge'), 'add',
- instance_ref['mac_address'],
- private_ip, 'fake')
- env = {'DNSMASQ_INTERFACE': network_ref['bridge'],
- 'TESTING': '1',
- 'FLAGFILE': FLAGS.dhcpbridge_flagfile}
- (out, err) = utils.execute(*cmd, addl_env=env)
- LOG.debug("ISSUE_IP: %s, %s ", out, err)
-
-
-def release_ip(private_ip):
- """Run del command on dhcpbridge"""
- network_ref = db.fixed_ip_get_network(context.get_admin_context(),
- private_ip)
- instance_ref = db.fixed_ip_get_instance(context.get_admin_context(),
- private_ip)
- cmd = (binpath('nova-dhcpbridge'), 'del',
- instance_ref['mac_address'],
- private_ip, 'fake')
- env = {'DNSMASQ_INTERFACE': network_ref['bridge'],
- 'TESTING': '1',
- 'FLAGFILE': FLAGS.dhcpbridge_flagfile}
- (out, err) = utils.execute(*cmd, addl_env=env)
- LOG.debug("RELEASE_IP: %s, %s ", out, err)
diff --git a/nova/tests/test_objectstore.py b/nova/tests/test_objectstore.py
new file mode 100644
index 000000000..c78772f27
--- /dev/null
+++ b/nova/tests/test_objectstore.py
@@ -0,0 +1,148 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Unittets for S3 objectstore clone.
+"""
+
+import boto
+import glob
+import hashlib
+import os
+import shutil
+import tempfile
+
+from boto import exception as boto_exception
+from boto.s3 import connection as s3
+
+from nova import context
+from nova import exception
+from nova import flags
+from nova import wsgi
+from nova import test
+from nova.auth import manager
+from nova.objectstore import s3server
+
+
+FLAGS = flags.FLAGS
+
+# Create a unique temporary directory. We don't delete after test to
+# allow checking the contents after running tests. Users and/or tools
+# running the tests need to remove the tests directories.
+OSS_TEMPDIR = tempfile.mkdtemp(prefix='test_oss-')
+
+# Create bucket/images path
+os.makedirs(os.path.join(OSS_TEMPDIR, 'images'))
+os.makedirs(os.path.join(OSS_TEMPDIR, 'buckets'))
+
+
+class S3APITestCase(test.TestCase):
+ """Test objectstore through S3 API."""
+
+ def setUp(self):
+ """Setup users, projects, and start a test server."""
+ super(S3APITestCase, self).setUp()
+ self.flags(auth_driver='nova.auth.ldapdriver.FakeLdapDriver',
+ buckets_path=os.path.join(OSS_TEMPDIR, 'buckets'),
+ s3_host='127.0.0.1')
+
+ self.auth_manager = manager.AuthManager()
+ self.admin_user = self.auth_manager.create_user('admin', admin=True)
+ self.admin_project = self.auth_manager.create_project('admin',
+ self.admin_user)
+
+ shutil.rmtree(FLAGS.buckets_path)
+ os.mkdir(FLAGS.buckets_path)
+
+ router = s3server.S3Application(FLAGS.buckets_path)
+ server = wsgi.Server()
+ server.start(router, FLAGS.s3_port, host=FLAGS.s3_host)
+
+ if not boto.config.has_section('Boto'):
+ boto.config.add_section('Boto')
+ boto.config.set('Boto', 'num_retries', '0')
+ conn = s3.S3Connection(aws_access_key_id=self.admin_user.access,
+ aws_secret_access_key=self.admin_user.secret,
+ host=FLAGS.s3_host,
+ port=FLAGS.s3_port,
+ is_secure=False,
+ calling_format=s3.OrdinaryCallingFormat())
+ self.conn = conn
+
+ def get_http_connection(host, is_secure):
+ """Get a new S3 connection, don't attempt to reuse connections."""
+ return self.conn.new_http_connection(host, is_secure)
+
+ self.conn.get_http_connection = get_http_connection
+
+ def _ensure_no_buckets(self, buckets): # pylint: disable=C0111
+ self.assertEquals(len(buckets), 0, "Bucket list was not empty")
+ return True
+
+ def _ensure_one_bucket(self, buckets, name): # pylint: disable=C0111
+ self.assertEquals(len(buckets), 1,
+ "Bucket list didn't have exactly one element in it")
+ self.assertEquals(buckets[0].name, name, "Wrong name")
+ return True
+
+ def test_000_list_buckets(self):
+ """Make sure we are starting with no buckets."""
+ self._ensure_no_buckets(self.conn.get_all_buckets())
+
+ def test_001_create_and_delete_bucket(self):
+ """Test bucket creation and deletion."""
+ bucket_name = 'testbucket'
+
+ self.conn.create_bucket(bucket_name)
+ self._ensure_one_bucket(self.conn.get_all_buckets(), bucket_name)
+ self.conn.delete_bucket(bucket_name)
+ self._ensure_no_buckets(self.conn.get_all_buckets())
+
+ def test_002_create_bucket_and_key_and_delete_key_again(self):
+ """Test key operations on buckets."""
+ bucket_name = 'testbucket'
+ key_name = 'somekey'
+ key_contents = 'somekey'
+
+ b = self.conn.create_bucket(bucket_name)
+ k = b.new_key(key_name)
+ k.set_contents_from_string(key_contents)
+
+ bucket = self.conn.get_bucket(bucket_name)
+
+ # make sure the contents are correct
+ key = bucket.get_key(key_name)
+ self.assertEquals(key.get_contents_as_string(), key_contents,
+ "Bad contents")
+
+ # delete the key
+ key.delete()
+
+ self._ensure_no_buckets(bucket.get_all_keys())
+
+ def test_unknown_bucket(self):
+ bucket_name = 'falalala'
+ self.assertRaises(boto_exception.S3ResponseError,
+ self.conn.get_bucket,
+ bucket_name)
+
+ def tearDown(self):
+ """Tear down auth and test server."""
+ self.auth_manager.delete_user('admin')
+ self.auth_manager.delete_project('admin')
+ super(S3APITestCase, self).tearDown()
diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py
index 4820e04fb..44d7c91eb 100644
--- a/nova/tests/test_rpc.py
+++ b/nova/tests/test_rpc.py
@@ -36,7 +36,7 @@ class RpcTestCase(test.TestCase):
super(RpcTestCase, self).setUp()
self.conn = rpc.Connection.instance(True)
self.receiver = TestReceiver()
- self.consumer = rpc.AdapterConsumer(connection=self.conn,
+ self.consumer = rpc.TopicAdapterConsumer(connection=self.conn,
topic='test',
proxy=self.receiver)
self.consumer.attach_to_eventlet()
@@ -97,7 +97,7 @@ class RpcTestCase(test.TestCase):
nested = Nested()
conn = rpc.Connection.instance(True)
- consumer = rpc.AdapterConsumer(connection=conn,
+ consumer = rpc.TopicAdapterConsumer(connection=conn,
topic='nested',
proxy=nested)
consumer.attach_to_eventlet()
diff --git a/nova/tests/test_scheduler.py b/nova/tests/test_scheduler.py
index 244e43bd9..6df74dd61 100644
--- a/nova/tests/test_scheduler.py
+++ b/nova/tests/test_scheduler.py
@@ -21,6 +21,9 @@ Tests For Scheduler
import datetime
import mox
+import novaclient.exceptions
+import stubout
+import webob
from mox import IgnoreArg
from nova import context
@@ -32,6 +35,7 @@ from nova import test
from nova import rpc
from nova import utils
from nova.auth import manager as auth_manager
+from nova.scheduler import api
from nova.scheduler import manager
from nova.scheduler import driver
from nova.compute import power_state
@@ -937,3 +941,160 @@ class SimpleDriverTestCase(test.TestCase):
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
db.service_destroy(self.context, s_ref2['id'])
+
+
+class FakeZone(object):
+ def __init__(self, api_url, username, password):
+ self.api_url = api_url
+ self.username = username
+ self.password = password
+
+
+def zone_get_all(context):
+ return [
+ FakeZone('http://example.com', 'bob', 'xxx'),
+ ]
+
+
+class FakeRerouteCompute(api.reroute_compute):
+ def _call_child_zones(self, zones, function):
+ return []
+
+ def get_collection_context_and_id(self, args, kwargs):
+ return ("servers", None, 1)
+
+ def unmarshall_result(self, zone_responses):
+ return dict(magic="found me")
+
+
+def go_boom(self, context, instance):
+ raise exception.InstanceNotFound("boom message", instance)
+
+
+def found_instance(self, context, instance):
+ return dict(name='myserver')
+
+
+class FakeResource(object):
+ def __init__(self, attribute_dict):
+ for k, v in attribute_dict.iteritems():
+ setattr(self, k, v)
+
+ def pause(self):
+ pass
+
+
+class ZoneRedirectTest(test.TestCase):
+ def setUp(self):
+ super(ZoneRedirectTest, self).setUp()
+ self.stubs = stubout.StubOutForTesting()
+
+ self.stubs.Set(db, 'zone_get_all', zone_get_all)
+
+ self.enable_zone_routing = FLAGS.enable_zone_routing
+ FLAGS.enable_zone_routing = True
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+ FLAGS.enable_zone_routing = self.enable_zone_routing
+ super(ZoneRedirectTest, self).tearDown()
+
+ def test_trap_found_locally(self):
+ decorator = FakeRerouteCompute("foo")
+ try:
+ result = decorator(found_instance)(None, None, 1)
+ except api.RedirectResult, e:
+ self.fail(_("Successful database hit should succeed"))
+
+ def test_trap_not_found_locally(self):
+ decorator = FakeRerouteCompute("foo")
+ try:
+ result = decorator(go_boom)(None, None, 1)
+ self.assertFail(_("Should have rerouted."))
+ except api.RedirectResult, e:
+ self.assertEquals(e.results['magic'], 'found me')
+
+ def test_routing_flags(self):
+ FLAGS.enable_zone_routing = False
+ decorator = FakeRerouteCompute("foo")
+ try:
+ result = decorator(go_boom)(None, None, 1)
+ self.assertFail(_("Should have thrown exception."))
+ except exception.InstanceNotFound, e:
+ self.assertEquals(e.message, 'boom message')
+
+ def test_get_collection_context_and_id(self):
+ decorator = api.reroute_compute("foo")
+ self.assertEquals(decorator.get_collection_context_and_id(
+ (None, 10, 20), {}), ("servers", 10, 20))
+ self.assertEquals(decorator.get_collection_context_and_id(
+ (None, 11,), dict(instance_id=21)), ("servers", 11, 21))
+ self.assertEquals(decorator.get_collection_context_and_id(
+ (None,), dict(context=12, instance_id=22)), ("servers", 12, 22))
+
+ def test_unmarshal_single_server(self):
+ decorator = api.reroute_compute("foo")
+ self.assertEquals(decorator.unmarshall_result([]), {})
+ self.assertEquals(decorator.unmarshall_result(
+ [FakeResource(dict(a=1, b=2)), ]),
+ dict(server=dict(a=1, b=2)))
+ self.assertEquals(decorator.unmarshall_result(
+ [FakeResource(dict(a=1, _b=2)), ]),
+ dict(server=dict(a=1,)))
+ self.assertEquals(decorator.unmarshall_result(
+ [FakeResource(dict(a=1, manager=2)), ]),
+ dict(server=dict(a=1,)))
+ self.assertEquals(decorator.unmarshall_result(
+ [FakeResource(dict(_a=1, manager=2)), ]),
+ dict(server={}))
+
+
+class FakeServerCollection(object):
+ def get(self, instance_id):
+ return FakeResource(dict(a=10, b=20))
+
+ def find(self, name):
+ return FakeResource(dict(a=11, b=22))
+
+
+class FakeEmptyServerCollection(object):
+ def get(self, f):
+ raise novaclient.NotFound(1)
+
+ def find(self, name):
+ raise novaclient.NotFound(2)
+
+
+class FakeNovaClient(object):
+ def __init__(self, collection):
+ self.servers = collection
+
+
+class DynamicNovaClientTest(test.TestCase):
+ def test_issue_novaclient_command_found(self):
+ zone = FakeZone('http://example.com', 'bob', 'xxx')
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeServerCollection()),
+ zone, "servers", "get", 100).a, 10)
+
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeServerCollection()),
+ zone, "servers", "find", "name").b, 22)
+
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeServerCollection()),
+ zone, "servers", "pause", 100), None)
+
+ def test_issue_novaclient_command_not_found(self):
+ zone = FakeZone('http://example.com', 'bob', 'xxx')
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeEmptyServerCollection()),
+ zone, "servers", "get", 100), None)
+
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeEmptyServerCollection()),
+ zone, "servers", "find", "name"), None)
+
+ self.assertEquals(api._issue_novaclient_command(
+ FakeNovaClient(FakeEmptyServerCollection()),
+ zone, "servers", "any", "name"), None)
diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py
index 393f9d20b..d48de2057 100644
--- a/nova/tests/test_service.py
+++ b/nova/tests/test_service.py
@@ -109,20 +109,29 @@ class ServiceTestCase(test.TestCase):
app = service.Service.create(host=host, binary=binary)
self.mox.StubOutWithMock(rpc,
- 'AdapterConsumer',
+ 'TopicAdapterConsumer',
use_mock_anything=True)
- rpc.AdapterConsumer(connection=mox.IgnoreArg(),
+ self.mox.StubOutWithMock(rpc,
+ 'FanoutAdapterConsumer',
+ use_mock_anything=True)
+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
- rpc.AdapterConsumer)
+ rpc.TopicAdapterConsumer)
- rpc.AdapterConsumer(connection=mox.IgnoreArg(),
+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
topic='%s.%s' % (topic, host),
proxy=mox.IsA(service.Service)).AndReturn(
- rpc.AdapterConsumer)
+ rpc.TopicAdapterConsumer)
+
+ rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
+ topic=topic,
+ proxy=mox.IsA(service.Service)).AndReturn(
+ rpc.FanoutAdapterConsumer)
- rpc.AdapterConsumer.attach_to_eventlet()
- rpc.AdapterConsumer.attach_to_eventlet()
+ rpc.TopicAdapterConsumer.attach_to_eventlet()
+ rpc.TopicAdapterConsumer.attach_to_eventlet()
+ rpc.FanoutAdapterConsumer.attach_to_eventlet()
service_create = {'host': host,
'binary': binary,
@@ -279,6 +288,7 @@ class ServiceTestCase(test.TestCase):
self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
service.rpc.Connection.instance(new=mox.IgnoreArg())
service.rpc.Connection.instance(new=mox.IgnoreArg())
+ service.rpc.Connection.instance(new=mox.IgnoreArg())
self.mox.StubOutWithMock(serv.manager.driver,
'update_available_resource')
serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)
diff --git a/nova/tests/test_test.py b/nova/tests/test_test.py
index e237674e6..35c838065 100644
--- a/nova/tests/test_test.py
+++ b/nova/tests/test_test.py
@@ -34,7 +34,7 @@ class IsolationTestCase(test.TestCase):
def test_rpc_consumer_isolation(self):
connection = rpc.Connection.instance(new=True)
- consumer = rpc.TopicConsumer(connection, topic='compute')
+ consumer = rpc.TopicAdapterConsumer(connection, topic='compute')
consumer.register_callback(
lambda x, y: self.fail('I should never be called'))
consumer.attach_to_eventlet()
diff --git a/nova/tests/test_virt.py b/nova/tests/test_virt.py
index b214f5ce7..958c8e3e2 100644
--- a/nova/tests/test_virt.py
+++ b/nova/tests/test_virt.py
@@ -77,13 +77,11 @@ class CacheConcurrencyTestCase(test.TestCase):
eventlet.sleep(0)
try:
self.assertFalse(done2.ready())
- self.assertTrue('fname' in conn._image_sems)
finally:
wait1.send()
done1.wait()
eventlet.sleep(0)
self.assertTrue(done2.ready())
- self.assertFalse('fname' in conn._image_sems)
def test_different_fname_concurrency(self):
"""Ensures that two different fname caches are concurrent"""
@@ -227,6 +225,49 @@ class LibvirtConnTestCase(test.TestCase):
self._check_xml_and_uri(instance_data, expect_kernel=True,
expect_ramdisk=True, rescue=True)
+ def test_lxc_container_and_uri(self):
+ instance_data = dict(self.test_instance)
+ self._check_xml_and_container(instance_data)
+
+ def _check_xml_and_container(self, instance):
+ user_context = context.RequestContext(project=self.project,
+ user=self.user)
+ instance_ref = db.instance_create(user_context, instance)
+ host = self.network.get_network_host(user_context.elevated())
+ network_ref = db.project_get_network(context.get_admin_context(),
+ self.project.id)
+
+ fixed_ip = {'address': self.test_ip,
+ 'network_id': network_ref['id']}
+
+ ctxt = context.get_admin_context()
+ fixed_ip_ref = db.fixed_ip_create(ctxt, fixed_ip)
+ db.fixed_ip_update(ctxt, self.test_ip,
+ {'allocated': True,
+ 'instance_id': instance_ref['id']})
+
+ self.flags(libvirt_type='lxc')
+ conn = libvirt_conn.LibvirtConnection(True)
+
+ uri = conn.get_uri()
+ self.assertEquals(uri, 'lxc:///')
+
+ xml = conn.to_xml(instance_ref)
+ tree = xml_to_tree(xml)
+
+ check = [
+ (lambda t: t.find('.').get('type'), 'lxc'),
+ (lambda t: t.find('./os/type').text, 'exe'),
+ (lambda t: t.find('./devices/filesystem/target').get('dir'), '/')]
+
+ for i, (check, expected_result) in enumerate(check):
+ self.assertEqual(check(tree),
+ expected_result,
+ '%s failed common check %d' % (xml, i))
+
+ target = tree.find('./devices/filesystem/source').get('dir')
+ self.assertTrue(len(target) > 0)
+
def _check_xml_and_uri(self, instance, expect_ramdisk, expect_kernel,
rescue=False):
user_context = context.RequestContext(project=self.project,
@@ -429,6 +470,15 @@ class LibvirtConnTestCase(test.TestCase):
def fake_raise(self):
raise libvirt.libvirtError('ERR')
+ class FakeTime(object):
+ def __init__(self):
+ self.counter = 0
+
+ def sleep(self, t):
+ self.counter += t
+
+ fake_timer = FakeTime()
+
self.create_fake_libvirt_mock(nwfilterLookupByName=fake_raise)
instance_ref = db.instance_create(self.context, self.test_instance)
@@ -438,11 +488,15 @@ class LibvirtConnTestCase(test.TestCase):
conn = libvirt_conn.LibvirtConnection(False)
conn.firewall_driver.setattr('setup_basic_filtering', fake_none)
conn.firewall_driver.setattr('prepare_instance_filter', fake_none)
- conn.ensure_filtering_rules_for_instance(instance_ref)
+ conn.ensure_filtering_rules_for_instance(instance_ref,
+ time=fake_timer)
except exception.Error, e:
c1 = (0 <= e.message.find('Timeout migrating for'))
self.assertTrue(c1)
+ self.assertEqual(29, fake_timer.counter, "Didn't wait the expected "
+ "amount of time")
+
db.instance_destroy(self.context, instance_ref['id'])
def test_live_migration_raises_exception(self):
@@ -785,7 +839,8 @@ class NWFilterTestCase(test.TestCase):
instance_ref = db.instance_create(self.context,
{'user_id': 'fake',
- 'project_id': 'fake'})
+ 'project_id': 'fake',
+ 'mac_address': '00:A0:C9:14:C8:29'})
inst_id = instance_ref['id']
ip = '10.11.12.13'
@@ -802,7 +857,8 @@ class NWFilterTestCase(test.TestCase):
'instance_id': instance_ref['id']})
def _ensure_all_called():
- instance_filter = 'nova-instance-%s' % instance_ref['name']
+ instance_filter = 'nova-instance-%s-%s' % (instance_ref['name'],
+ '00A0C914C829')
secgroup_filter = 'nova-secgroup-%s' % self.security_group['id']
for required in [secgroup_filter, 'allow-dhcp-server',
'no-arp-spoofing', 'no-ip-spoofing',
diff --git a/nova/tests/test_vlan_network.py b/nova/tests/test_vlan_network.py
new file mode 100644
index 000000000..063b81832
--- /dev/null
+++ b/nova/tests/test_vlan_network.py
@@ -0,0 +1,242 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Unit Tests for vlan network code
+"""
+import IPy
+import os
+
+from nova import context
+from nova import db
+from nova import exception
+from nova import flags
+from nova import log as logging
+from nova import test
+from nova import utils
+from nova.auth import manager
+from nova.tests.network import base
+from nova.tests.network import binpath,\
+ lease_ip, release_ip
+
+FLAGS = flags.FLAGS
+LOG = logging.getLogger('nova.tests.network')
+
+
+class VlanNetworkTestCase(base.NetworkTestCase):
+ """Test cases for network code"""
+ def test_public_network_association(self):
+ """Makes sure that we can allocaate a public ip"""
+ # TODO(vish): better way of adding floating ips
+ self.context._project = self.projects[0]
+ self.context.project_id = self.projects[0].id
+ pubnet = IPy.IP(flags.FLAGS.floating_range)
+ address = str(pubnet[0])
+ try:
+ db.floating_ip_get_by_address(context.get_admin_context(), address)
+ except exception.NotFound:
+ db.floating_ip_create(context.get_admin_context(),
+ {'address': address,
+ 'host': FLAGS.host})
+ float_addr = self.network.allocate_floating_ip(self.context,
+ self.projects[0].id)
+ fix_addr = self._create_address(0)
+ lease_ip(fix_addr)
+ self.assertEqual(float_addr, str(pubnet[0]))
+ self.network.associate_floating_ip(self.context, float_addr, fix_addr)
+ address = db.instance_get_floating_address(context.get_admin_context(),
+ self.instance_id)
+ self.assertEqual(address, float_addr)
+ self.network.disassociate_floating_ip(self.context, float_addr)
+ address = db.instance_get_floating_address(context.get_admin_context(),
+ self.instance_id)
+ self.assertEqual(address, None)
+ self.network.deallocate_floating_ip(self.context, float_addr)
+ self.network.deallocate_fixed_ip(self.context, fix_addr)
+ release_ip(fix_addr)
+ db.floating_ip_destroy(context.get_admin_context(), float_addr)
+
+ def test_allocate_deallocate_fixed_ip(self):
+ """Makes sure that we can allocate and deallocate a fixed ip"""
+ address = self._create_address(0)
+ self.assertTrue(self._is_allocated_in_project(address,
+ self.projects[0].id))
+ lease_ip(address)
+ self._deallocate_address(0, address)
+
+ # Doesn't go away until it's dhcp released
+ self.assertTrue(self._is_allocated_in_project(address,
+ self.projects[0].id))
+
+ release_ip(address)
+ self.assertFalse(self._is_allocated_in_project(address,
+ self.projects[0].id))
+
+ def test_side_effects(self):
+ """Ensures allocating and releasing has no side effects"""
+ address = self._create_address(0)
+ address2 = self._create_address(1, self.instance2_id)
+
+ self.assertTrue(self._is_allocated_in_project(address,
+ self.projects[0].id))
+ self.assertTrue(self._is_allocated_in_project(address2,
+ self.projects[1].id))
+ self.assertFalse(self._is_allocated_in_project(address,
+ self.projects[1].id))
+
+ # Addresses are allocated before they're issued
+ lease_ip(address)
+ lease_ip(address2)
+
+ self._deallocate_address(0, address)
+ release_ip(address)
+ self.assertFalse(self._is_allocated_in_project(address,
+ self.projects[0].id))
+
+ # First address release shouldn't affect the second
+ self.assertTrue(self._is_allocated_in_project(address2,
+ self.projects[1].id))
+
+ self._deallocate_address(1, address2)
+ release_ip(address2)
+ self.assertFalse(self._is_allocated_in_project(address2,
+ self.projects[1].id))
+
+ def test_subnet_edge(self):
+ """Makes sure that private ips don't overlap"""
+ first = self._create_address(0)
+ lease_ip(first)
+ instance_ids = []
+ for i in range(1, FLAGS.num_networks):
+ instance_ref = self._create_instance(i, mac=utils.generate_mac())
+ instance_ids.append(instance_ref['id'])
+ address = self._create_address(i, instance_ref['id'])
+ instance_ref = self._create_instance(i, mac=utils.generate_mac())
+ instance_ids.append(instance_ref['id'])
+ address2 = self._create_address(i, instance_ref['id'])
+ instance_ref = self._create_instance(i, mac=utils.generate_mac())
+ instance_ids.append(instance_ref['id'])
+ address3 = self._create_address(i, instance_ref['id'])
+ lease_ip(address)
+ lease_ip(address2)
+ lease_ip(address3)
+ self.context._project = self.projects[i]
+ self.context.project_id = self.projects[i].id
+ self.assertFalse(self._is_allocated_in_project(address,
+ self.projects[0].id))
+ self.assertFalse(self._is_allocated_in_project(address2,
+ self.projects[0].id))
+ self.assertFalse(self._is_allocated_in_project(address3,
+ self.projects[0].id))
+ self.network.deallocate_fixed_ip(self.context, address)
+ self.network.deallocate_fixed_ip(self.context, address2)
+ self.network.deallocate_fixed_ip(self.context, address3)
+ release_ip(address)
+ release_ip(address2)
+ release_ip(address3)
+ for instance_id in instance_ids:
+ db.instance_destroy(context.get_admin_context(), instance_id)
+ self.context._project = self.projects[0]
+ self.context.project_id = self.projects[0].id
+ self.network.deallocate_fixed_ip(self.context, first)
+ self._deallocate_address(0, first)
+ release_ip(first)
+
+ def test_vpn_ip_and_port_looks_valid(self):
+ """Ensure the vpn ip and port are reasonable"""
+ self.assert_(self.projects[0].vpn_ip)
+ self.assert_(self.projects[0].vpn_port >= FLAGS.vpn_start)
+ self.assert_(self.projects[0].vpn_port <= FLAGS.vpn_start +
+ FLAGS.num_networks)
+
+ def test_too_many_networks(self):
+ """Ensure error is raised if we run out of networks"""
+ projects = []
+ networks_left = (FLAGS.num_networks -
+ db.network_count(context.get_admin_context()))
+ for i in range(networks_left):
+ project = self.manager.create_project('many%s' % i, self.user)
+ projects.append(project)
+ db.project_get_network(context.get_admin_context(), project.id)
+ project = self.manager.create_project('last', self.user)
+ projects.append(project)
+ self.assertRaises(db.NoMoreNetworks,
+ db.project_get_network,
+ context.get_admin_context(),
+ project.id)
+ for project in projects:
+ self.manager.delete_project(project)
+
+ def test_ips_are_reused(self):
+ """Makes sure that ip addresses that are deallocated get reused"""
+ address = self._create_address(0)
+ lease_ip(address)
+ self.network.deallocate_fixed_ip(self.context, address)
+ release_ip(address)
+
+ address2 = self._create_address(0)
+ self.assertEqual(address, address2)
+ lease_ip(address)
+ self.network.deallocate_fixed_ip(self.context, address2)
+ release_ip(address)
+
+ def test_too_many_addresses(self):
+ """Test for a NoMoreAddresses exception when all fixed ips are used.
+ """
+ admin_context = context.get_admin_context()
+ network = db.project_get_network(admin_context, self.projects[0].id)
+ num_available_ips = db.network_count_available_ips(admin_context,
+ network['id'])
+ addresses = []
+ instance_ids = []
+ for i in range(num_available_ips):
+ instance_ref = self._create_instance(0)
+ instance_ids.append(instance_ref['id'])
+ address = self._create_address(0, instance_ref['id'])
+ addresses.append(address)
+ lease_ip(address)
+
+ ip_count = db.network_count_available_ips(context.get_admin_context(),
+ network['id'])
+ self.assertEqual(ip_count, 0)
+ self.assertRaises(db.NoMoreAddresses,
+ self.network.allocate_fixed_ip,
+ self.context,
+ 'foo')
+
+ for i in range(num_available_ips):
+ self.network.deallocate_fixed_ip(self.context, addresses[i])
+ release_ip(addresses[i])
+ db.instance_destroy(context.get_admin_context(), instance_ids[i])
+ ip_count = db.network_count_available_ips(context.get_admin_context(),
+ network['id'])
+ self.assertEqual(ip_count, num_available_ips)
+
+ def _is_allocated_in_project(self, address, project_id):
+ """Returns true if address is in specified project"""
+ project_net = db.project_get_network(context.get_admin_context(),
+ project_id)
+ network = db.fixed_ip_get_network(context.get_admin_context(),
+ address)
+ instance = db.fixed_ip_get_instance(context.get_admin_context(),
+ address)
+ # instance exists until release
+ return instance is not None and network['id'] == project_net['id']
+
+ def run(self, result=None):
+ if(FLAGS.network_manager == 'nova.network.manager.VlanManager'):
+ super(VlanNetworkTestCase, self).run(result)
diff --git a/nova/tests/test_vmwareapi.py b/nova/tests/test_vmwareapi.py
new file mode 100644
index 000000000..22b66010a
--- /dev/null
+++ b/nova/tests/test_vmwareapi.py
@@ -0,0 +1,252 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 Citrix Systems, Inc.
+# Copyright 2011 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Test suite for VMWareAPI.
+"""
+
+import stubout
+
+from nova import context
+from nova import db
+from nova import flags
+from nova import test
+from nova import utils
+from nova.auth import manager
+from nova.compute import power_state
+from nova.tests.glance import stubs as glance_stubs
+from nova.tests.vmwareapi import db_fakes
+from nova.tests.vmwareapi import stubs
+from nova.virt import vmwareapi_conn
+from nova.virt.vmwareapi import fake as vmwareapi_fake
+
+
+FLAGS = flags.FLAGS
+
+
+class VMWareAPIVMTestCase(test.TestCase):
+ """Unit tests for Vmware API connection calls."""
+
+ def setUp(self):
+ super(VMWareAPIVMTestCase, self).setUp()
+ self.flags(vmwareapi_host_ip='test_url',
+ vmwareapi_host_username='test_username',
+ vmwareapi_host_password='test_pass')
+ self.manager = manager.AuthManager()
+ self.user = self.manager.create_user('fake', 'fake', 'fake',
+ admin=True)
+ self.project = self.manager.create_project('fake', 'fake', 'fake')
+ self.network = utils.import_object(FLAGS.network_manager)
+ self.stubs = stubout.StubOutForTesting()
+ vmwareapi_fake.reset()
+ db_fakes.stub_out_db_instance_api(self.stubs)
+ stubs.set_stubs(self.stubs)
+ glance_stubs.stubout_glance_client(self.stubs,
+ glance_stubs.FakeGlance)
+ self.conn = vmwareapi_conn.get_connection(False)
+
+ def _create_instance_in_the_db(self):
+ values = {'name': 1,
+ 'id': 1,
+ 'project_id': self.project.id,
+ 'user_id': self.user.id,
+ 'image_id': "1",
+ 'kernel_id': "1",
+ 'ramdisk_id': "1",
+ 'instance_type': 'm1.large',
+ 'mac_address': 'aa:bb:cc:dd:ee:ff',
+ }
+ self.instance = db.instance_create(values)
+
+ def _create_vm(self):
+ """Create and spawn the VM."""
+ self._create_instance_in_the_db()
+ self.type_data = db.instance_type_get_by_name(None, 'm1.large')
+ self.conn.spawn(self.instance)
+ self._check_vm_record()
+
+ def _check_vm_record(self):
+ """
+ Check if the spawned VM's properties correspond to the instance in
+ the db.
+ """
+ instances = self.conn.list_instances()
+ self.assertEquals(len(instances), 1)
+
+ # Get Nova record for VM
+ vm_info = self.conn.get_info(1)
+
+ # Get record for VM
+ vms = vmwareapi_fake._get_objects("VirtualMachine")
+ vm = vms[0]
+
+ # Check that m1.large above turned into the right thing.
+ mem_kib = long(self.type_data['memory_mb']) << 10
+ vcpus = self.type_data['vcpus']
+ self.assertEquals(vm_info['max_mem'], mem_kib)
+ self.assertEquals(vm_info['mem'], mem_kib)
+ self.assertEquals(vm.get("summary.config.numCpu"), vcpus)
+ self.assertEquals(vm.get("summary.config.memorySizeMB"),
+ self.type_data['memory_mb'])
+
+ # Check that the VM is running according to Nova
+ self.assertEquals(vm_info['state'], power_state.RUNNING)
+
+ # Check that the VM is running according to vSphere API.
+ self.assertEquals(vm.get("runtime.powerState"), 'poweredOn')
+
+ def _check_vm_info(self, info, pwr_state=power_state.RUNNING):
+ """
+ Check if the get_info returned values correspond to the instance
+ object in the db.
+ """
+ mem_kib = long(self.type_data['memory_mb']) << 10
+ self.assertEquals(info["state"], pwr_state)
+ self.assertEquals(info["max_mem"], mem_kib)
+ self.assertEquals(info["mem"], mem_kib)
+ self.assertEquals(info["num_cpu"], self.type_data['vcpus'])
+
+ def test_list_instances(self):
+ instances = self.conn.list_instances()
+ self.assertEquals(len(instances), 0)
+
+ def test_list_instances_1(self):
+ self._create_vm()
+ instances = self.conn.list_instances()
+ self.assertEquals(len(instances), 1)
+
+ def test_spawn(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+
+ def test_snapshot(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+ self.conn.snapshot(self.instance, "Test-Snapshot")
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+
+ def test_snapshot_non_existent(self):
+ self._create_instance_in_the_db()
+ self.assertRaises(Exception, self.conn.snapshot, self.instance,
+ "Test-Snapshot")
+
+ def test_reboot(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+ self.conn.reboot(self.instance)
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+
+ def test_reboot_non_existent(self):
+ self._create_instance_in_the_db()
+ self.assertRaises(Exception, self.conn.reboot, self.instance)
+
+ def test_reboot_not_poweredon(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+ self.conn.suspend(self.instance, self.dummy_callback_handler)
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.PAUSED)
+ self.assertRaises(Exception, self.conn.reboot, self.instance)
+
+ def test_suspend(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+ self.conn.suspend(self.instance, self.dummy_callback_handler)
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.PAUSED)
+
+ def test_suspend_non_existent(self):
+ self._create_instance_in_the_db()
+ self.assertRaises(Exception, self.conn.suspend, self.instance,
+ self.dummy_callback_handler)
+
+ def test_resume(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+ self.conn.suspend(self.instance, self.dummy_callback_handler)
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.PAUSED)
+ self.conn.resume(self.instance, self.dummy_callback_handler)
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+
+ def test_resume_non_existent(self):
+ self._create_instance_in_the_db()
+ self.assertRaises(Exception, self.conn.resume, self.instance,
+ self.dummy_callback_handler)
+
+ def test_resume_not_suspended(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+ self.assertRaises(Exception, self.conn.resume, self.instance,
+ self.dummy_callback_handler)
+
+ def test_get_info(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+
+ def test_destroy(self):
+ self._create_vm()
+ info = self.conn.get_info(1)
+ self._check_vm_info(info, power_state.RUNNING)
+ instances = self.conn.list_instances()
+ self.assertEquals(len(instances), 1)
+ self.conn.destroy(self.instance)
+ instances = self.conn.list_instances()
+ self.assertEquals(len(instances), 0)
+
+ def test_destroy_non_existent(self):
+ self._create_instance_in_the_db()
+ self.assertEquals(self.conn.destroy(self.instance), None)
+
+ def test_pause(self):
+ pass
+
+ def test_unpause(self):
+ pass
+
+ def test_diagnostics(self):
+ pass
+
+ def test_get_console_output(self):
+ pass
+
+ def test_get_ajax_console(self):
+ pass
+
+ def dummy_callback_handler(self, ret):
+ """
+ Dummy callback function to be passed to suspend, resume, etc., calls.
+ """
+ pass
+
+ def tearDown(self):
+ super(VMWareAPIVMTestCase, self).tearDown()
+ vmwareapi_fake.cleanup()
+ self.manager.delete_project(self.project)
+ self.manager.delete_user(self.user)
+ self.stubs.UnsetAll()
diff --git a/nova/tests/test_volume.py b/nova/tests/test_volume.py
index 5d68ca2ae..d71b75f3f 100644
--- a/nova/tests/test_volume.py
+++ b/nova/tests/test_volume.py
@@ -356,8 +356,8 @@ class ISCSITestCase(DriverTestCase):
tid = db.volume_get_iscsi_target_num(self.context, volume_id_list[0])
self.mox.StubOutWithMock(self.volume.driver, '_execute')
self.volume.driver._execute("sudo", "ietadm", "--op", "show",
- "--tid=%(tid)d" % locals()
- ).AndRaise(exception.ProcessExecutionError())
+ "--tid=%(tid)d" % locals()).AndRaise(
+ exception.ProcessExecutionError())
self.mox.ReplayAll()
self.assertRaises(exception.ProcessExecutionError,
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index 66a973a78..17e3f55e9 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -14,16 +14,18 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Test suite for XenAPI
-"""
+"""Test suite for XenAPI."""
import functools
+import os
+import re
import stubout
+import ast
from nova import db
from nova import context
from nova import flags
+from nova import log as logging
from nova import test
from nova import utils
from nova.auth import manager
@@ -38,6 +40,9 @@ from nova.virt.xenapi.vmops import VMOps
from nova.tests.db import fakes as db_fakes
from nova.tests.xenapi import stubs
from nova.tests.glance import stubs as glance_stubs
+from nova.tests import fake_utils
+
+LOG = logging.getLogger('nova.tests.test_xenapi')
FLAGS = flags.FLAGS
@@ -58,19 +63,18 @@ def stub_vm_utils_with_vdi_attached_here(function, should_return=True):
class XenAPIVolumeTestCase(test.TestCase):
- """
- Unit tests for Volume operations
- """
+ """Unit tests for Volume operations."""
def setUp(self):
super(XenAPIVolumeTestCase, self).setUp()
self.stubs = stubout.StubOutForTesting()
+ self.context = context.RequestContext('fake', 'fake', False)
FLAGS.target_host = '127.0.0.1'
FLAGS.xenapi_connection_url = 'test_url'
FLAGS.xenapi_connection_password = 'test_pass'
db_fakes.stub_out_db_instance_api(self.stubs)
stubs.stub_out_get_target(self.stubs)
xenapi_fake.reset()
- self.values = {'name': 1, 'id': 1,
+ self.values = {'id': 1,
'project_id': 'fake',
'user_id': 'fake',
'image_id': 1,
@@ -90,10 +94,10 @@ class XenAPIVolumeTestCase(test.TestCase):
vol['availability_zone'] = FLAGS.storage_availability_zone
vol['status'] = "creating"
vol['attach_status'] = "detached"
- return db.volume_create(context.get_admin_context(), vol)
+ return db.volume_create(self.context, vol)
def test_create_iscsi_storage(self):
- """ This shows how to test helper classes' methods """
+ """This shows how to test helper classes' methods."""
stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests)
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass')
helper = volume_utils.VolumeHelper
@@ -108,7 +112,7 @@ class XenAPIVolumeTestCase(test.TestCase):
db.volume_destroy(context.get_admin_context(), vol['id'])
def test_parse_volume_info_raise_exception(self):
- """ This shows how to test helper classes' methods """
+ """This shows how to test helper classes' methods."""
stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests)
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass')
helper = volume_utils.VolumeHelper
@@ -122,11 +126,11 @@ class XenAPIVolumeTestCase(test.TestCase):
db.volume_destroy(context.get_admin_context(), vol['id'])
def test_attach_volume(self):
- """ This shows how to test Ops classes' methods """
+ """This shows how to test Ops classes' methods."""
stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests)
conn = xenapi_conn.get_connection(False)
volume = self._create_volume()
- instance = db.instance_create(self.values)
+ instance = db.instance_create(self.context, self.values)
vm = xenapi_fake.create_vm(instance.name, 'Running')
result = conn.attach_volume(instance.name, volume['id'], '/dev/sdc')
@@ -141,12 +145,12 @@ class XenAPIVolumeTestCase(test.TestCase):
check()
def test_attach_volume_raise_exception(self):
- """ This shows how to test when exceptions are raised """
+ """This shows how to test when exceptions are raised."""
stubs.stubout_session(self.stubs,
stubs.FakeSessionForVolumeFailedTests)
conn = xenapi_conn.get_connection(False)
volume = self._create_volume()
- instance = db.instance_create(self.values)
+ instance = db.instance_create(self.context, self.values)
xenapi_fake.create_vm(instance.name, 'Running')
self.assertRaises(Exception,
conn.attach_volume,
@@ -164,9 +168,7 @@ def reset_network(*args):
class XenAPIVMTestCase(test.TestCase):
- """
- Unit tests for VM operations
- """
+ """Unit tests for VM operations."""
def setUp(self):
super(XenAPIVMTestCase, self).setUp()
self.manager = manager.AuthManager()
@@ -175,10 +177,12 @@ class XenAPIVMTestCase(test.TestCase):
self.project = self.manager.create_project('fake', 'fake', 'fake')
self.network = utils.import_object(FLAGS.network_manager)
self.stubs = stubout.StubOutForTesting()
- FLAGS.xenapi_connection_url = 'test_url'
- FLAGS.xenapi_connection_password = 'test_pass'
+ self.flags(xenapi_connection_url='test_url',
+ xenapi_connection_password='test_pass',
+ instance_name_template='%d')
xenapi_fake.reset()
xenapi_fake.create_local_srs()
+ xenapi_fake.create_local_pifs()
db_fakes.stub_out_db_instance_api(self.stubs)
xenapi_fake.create_network('fake', FLAGS.flat_network_bridge)
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
@@ -186,8 +190,11 @@ class XenAPIVMTestCase(test.TestCase):
stubs.stubout_stream_disk(self.stubs)
stubs.stubout_is_vdi_pv(self.stubs)
self.stubs.Set(VMOps, 'reset_network', reset_network)
+ stubs.stub_out_vm_methods(self.stubs)
glance_stubs.stubout_glance_client(self.stubs,
glance_stubs.FakeGlance)
+ fake_utils.stub_out_utils_execute(self.stubs)
+ self.context = context.RequestContext('fake', 'fake', False)
self.conn = xenapi_conn.get_connection(False)
def test_list_instances_0(self):
@@ -212,7 +219,7 @@ class XenAPIVMTestCase(test.TestCase):
if not vm_rec["is_control_domain"]:
vm_labels.append(vm_rec["name_label"])
- self.assertEquals(vm_labels, [1])
+ self.assertEquals(vm_labels, ['1'])
def ensure_vbd_was_torn_down():
vbd_labels = []
@@ -220,7 +227,7 @@ class XenAPIVMTestCase(test.TestCase):
vbd_rec = xenapi_fake.get_record('VBD', vbd_ref)
vbd_labels.append(vbd_rec["vm_name_label"])
- self.assertEquals(vbd_labels, [1])
+ self.assertEquals(vbd_labels, ['1'])
def ensure_vdi_was_torn_down():
for vdi_ref in xenapi_fake.get_all('VDI'):
@@ -235,13 +242,12 @@ class XenAPIVMTestCase(test.TestCase):
check()
- def create_vm_record(self, conn, os_type):
+ def create_vm_record(self, conn, os_type, instance_id=1):
instances = conn.list_instances()
- self.assertEquals(instances, [1])
+ self.assertEquals(instances, [str(instance_id)])
# Get Nova record for VM
- vm_info = conn.get_info(1)
-
+ vm_info = conn.get_info(instance_id)
# Get XenAPI record for VM
vms = [rec for ref, rec
in xenapi_fake.get_all_records('VM').iteritems()
@@ -250,7 +256,7 @@ class XenAPIVMTestCase(test.TestCase):
self.vm_info = vm_info
self.vm = vm
- def check_vm_record(self, conn):
+ def check_vm_record(self, conn, check_injection=False):
# Check that m1.large above turned into the right thing.
instance_type = db.instance_type_get_by_name(conn, 'm1.large')
mem_kib = long(instance_type['memory_mb']) << 10
@@ -270,6 +276,25 @@ class XenAPIVMTestCase(test.TestCase):
# Check that the VM is running according to XenAPI.
self.assertEquals(self.vm['power_state'], 'Running')
+ if check_injection:
+ xenstore_data = self.vm['xenstore_data']
+ key = 'vm-data/networking/aabbccddeeff'
+ xenstore_value = xenstore_data[key]
+ tcpip_data = ast.literal_eval(xenstore_value)
+ self.assertEquals(tcpip_data,
+ {'label': 'fake_flat_network',
+ 'broadcast': '10.0.0.255',
+ 'ips': [{'ip': '10.0.0.3',
+ 'netmask':'255.255.255.0',
+ 'enabled':'1'}],
+ 'ip6s': [{'ip': 'fe80::a8bb:ccff:fedd:eeff',
+ 'netmask': '120',
+ 'enabled': '1',
+ 'gateway': 'fe80::a00:1'}],
+ 'mac': 'aa:bb:cc:dd:ee:ff',
+ 'dns': ['10.0.0.2'],
+ 'gateway': '10.0.0.1'})
+
def check_vm_params_for_windows(self):
self.assertEquals(self.vm['platform']['nx'], 'true')
self.assertEquals(self.vm['HVM_boot_params'], {'order': 'dc'})
@@ -303,10 +328,10 @@ class XenAPIVMTestCase(test.TestCase):
self.assertEquals(self.vm['HVM_boot_policy'], '')
def _test_spawn(self, image_id, kernel_id, ramdisk_id,
- instance_type="m1.large", os_type="linux"):
- stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
- values = {'name': 1,
- 'id': 1,
+ instance_type="m1.large", os_type="linux",
+ instance_id=1, check_injection=False):
+ stubs.stubout_loopingcall_start(self.stubs)
+ values = {'id': instance_id,
'project_id': self.project.id,
'user_id': self.user.id,
'image_id': image_id,
@@ -315,12 +340,10 @@ class XenAPIVMTestCase(test.TestCase):
'instance_type': instance_type,
'mac_address': 'aa:bb:cc:dd:ee:ff',
'os_type': os_type}
-
- conn = xenapi_conn.get_connection(False)
- instance = db.instance_create(values)
- conn.spawn(instance)
- self.create_vm_record(conn, os_type)
- self.check_vm_record(conn)
+ instance = db.instance_create(self.context, values)
+ self.conn.spawn(instance)
+ self.create_vm_record(self.conn, os_type, instance_id)
+ self.check_vm_record(self.conn, check_injection)
def test_spawn_not_enough_memory(self):
FLAGS.xenapi_image_service = 'glance'
@@ -361,6 +384,107 @@ class XenAPIVMTestCase(test.TestCase):
glance_stubs.FakeGlance.IMAGE_RAMDISK)
self.check_vm_params_for_linux_with_external_kernel()
+ def test_spawn_netinject_file(self):
+ FLAGS.xenapi_image_service = 'glance'
+ db_fakes.stub_out_db_instance_api(self.stubs, injected=True)
+
+ self._tee_executed = False
+
+ def _tee_handler(cmd, **kwargs):
+ input = kwargs.get('process_input', None)
+ self.assertNotEqual(input, None)
+ config = [line.strip() for line in input.split("\n")]
+ # Find the start of eth0 configuration and check it
+ index = config.index('auto eth0')
+ self.assertEquals(config[index + 1:index + 8], [
+ 'iface eth0 inet static',
+ 'address 10.0.0.3',
+ 'netmask 255.255.255.0',
+ 'broadcast 10.0.0.255',
+ 'gateway 10.0.0.1',
+ 'dns-nameservers 10.0.0.2',
+ ''])
+ self._tee_executed = True
+ return '', ''
+
+ fake_utils.fake_execute_set_repliers([
+ # Capture the sudo tee .../etc/network/interfaces command
+ (r'(sudo\s+)?tee.*interfaces', _tee_handler),
+ ])
+ FLAGS.xenapi_image_service = 'glance'
+ self._test_spawn(glance_stubs.FakeGlance.IMAGE_MACHINE,
+ glance_stubs.FakeGlance.IMAGE_KERNEL,
+ glance_stubs.FakeGlance.IMAGE_RAMDISK,
+ check_injection=True)
+ self.assertTrue(self._tee_executed)
+
+ def test_spawn_netinject_xenstore(self):
+ FLAGS.xenapi_image_service = 'glance'
+ db_fakes.stub_out_db_instance_api(self.stubs, injected=True)
+
+ self._tee_executed = False
+
+ def _mount_handler(cmd, *ignore_args, **ignore_kwargs):
+ # When mounting, create real files under the mountpoint to simulate
+ # files in the mounted filesystem
+
+ # mount point will be the last item of the command list
+ self._tmpdir = cmd[len(cmd) - 1]
+ LOG.debug(_('Creating files in %s to simulate guest agent' %
+ self._tmpdir))
+ os.makedirs(os.path.join(self._tmpdir, 'usr', 'sbin'))
+ # Touch the file using open
+ open(os.path.join(self._tmpdir, 'usr', 'sbin',
+ 'xe-update-networking'), 'w').close()
+ return '', ''
+
+ def _umount_handler(cmd, *ignore_args, **ignore_kwargs):
+ # Umount would normall make files in the m,ounted filesystem
+ # disappear, so do that here
+ LOG.debug(_('Removing simulated guest agent files in %s' %
+ self._tmpdir))
+ os.remove(os.path.join(self._tmpdir, 'usr', 'sbin',
+ 'xe-update-networking'))
+ os.rmdir(os.path.join(self._tmpdir, 'usr', 'sbin'))
+ os.rmdir(os.path.join(self._tmpdir, 'usr'))
+ return '', ''
+
+ def _tee_handler(cmd, *ignore_args, **ignore_kwargs):
+ self._tee_executed = True
+ return '', ''
+
+ fake_utils.fake_execute_set_repliers([
+ (r'(sudo\s+)?mount', _mount_handler),
+ (r'(sudo\s+)?umount', _umount_handler),
+ (r'(sudo\s+)?tee.*interfaces', _tee_handler)])
+ self._test_spawn(1, 2, 3, check_injection=True)
+
+ # tee must not run in this case, where an injection-capable
+ # guest agent is detected
+ self.assertFalse(self._tee_executed)
+
+ def test_spawn_vlanmanager(self):
+ self.flags(xenapi_image_service='glance',
+ network_manager='nova.network.manager.VlanManager',
+ network_driver='nova.network.xenapi_net',
+ vlan_interface='fake0')
+ # Reset network table
+ xenapi_fake.reset_table('network')
+ # Instance id = 2 will use vlan network (see db/fakes.py)
+ fake_instance_id = 2
+ network_bk = self.network
+ # Ensure we use xenapi_net driver
+ self.network = utils.import_object(FLAGS.network_manager)
+ self.network.setup_compute_network(None, fake_instance_id)
+ self._test_spawn(glance_stubs.FakeGlance.IMAGE_MACHINE,
+ glance_stubs.FakeGlance.IMAGE_KERNEL,
+ glance_stubs.FakeGlance.IMAGE_RAMDISK,
+ instance_id=fake_instance_id)
+ # TODO(salvatore-orlando): a complete test here would require
+ # a check for making sure the bridge for the VM's VIF is
+ # consistent with bridge specified in nova db
+ self.network = network_bk
+
def test_spawn_with_network_qos(self):
self._create_instance()
for vif_ref in xenapi_fake.get_all('VIF'):
@@ -369,6 +493,18 @@ class XenAPIVMTestCase(test.TestCase):
self.assertEquals(vif_rec['qos_algorithm_params']['kbps'],
str(4 * 1024))
+ def test_rescue(self):
+ self.flags(xenapi_inject_image=False)
+ instance = self._create_instance()
+ conn = xenapi_conn.get_connection(False)
+ conn.rescue(instance, None)
+
+ def test_unrescue(self):
+ instance = self._create_instance()
+ conn = xenapi_conn.get_connection(False)
+ # Ensure that it will not unrescue a non-rescued instance.
+ self.assertRaises(Exception, conn.unrescue, instance, None)
+
def tearDown(self):
super(XenAPIVMTestCase, self).tearDown()
self.manager.delete_project(self.project)
@@ -378,9 +514,9 @@ class XenAPIVMTestCase(test.TestCase):
self.stubs.UnsetAll()
def _create_instance(self):
- """Creates and spawns a test instance"""
+ """Creates and spawns a test instance."""
+ stubs.stubout_loopingcall_start(self.stubs)
values = {
- 'name': 1,
'id': 1,
'project_id': self.project.id,
'user_id': self.user.id,
@@ -390,15 +526,13 @@ class XenAPIVMTestCase(test.TestCase):
'instance_type': 'm1.large',
'mac_address': 'aa:bb:cc:dd:ee:ff',
'os_type': 'linux'}
- instance = db.instance_create(values)
+ instance = db.instance_create(self.context, values)
self.conn.spawn(instance)
return instance
class XenAPIDiffieHellmanTestCase(test.TestCase):
- """
- Unit tests for Diffie-Hellman code
- """
+ """Unit tests for Diffie-Hellman code."""
def setUp(self):
super(XenAPIDiffieHellmanTestCase, self).setUp()
self.alice = SimpleDH()
@@ -422,9 +556,7 @@ class XenAPIDiffieHellmanTestCase(test.TestCase):
class XenAPIMigrateInstance(test.TestCase):
- """
- Unit test for verifying migration-related actions
- """
+ """Unit test for verifying migration-related actions."""
def setUp(self):
super(XenAPIMigrateInstance, self).setUp()
@@ -435,21 +567,26 @@ class XenAPIMigrateInstance(test.TestCase):
db_fakes.stub_out_db_instance_api(self.stubs)
stubs.stub_out_get_target(self.stubs)
xenapi_fake.reset()
+ xenapi_fake.create_network('fake', FLAGS.flat_network_bridge)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
self.project = self.manager.create_project('fake', 'fake', 'fake')
- self.values = {'name': 1, 'id': 1,
+ self.context = context.RequestContext('fake', 'fake', False)
+ self.values = {'id': 1,
'project_id': self.project.id,
'user_id': self.user.id,
'image_id': 1,
'kernel_id': None,
'ramdisk_id': None,
+ 'local_gb': 5,
'instance_type': 'm1.large',
'mac_address': 'aa:bb:cc:dd:ee:ff',
'os_type': 'linux'}
+ fake_utils.stub_out_utils_execute(self.stubs)
stubs.stub_out_migration_methods(self.stubs)
+ stubs.stubout_get_this_vm_uuid(self.stubs)
glance_stubs.stubout_glance_client(self.stubs,
glance_stubs.FakeGlance)
@@ -460,22 +597,21 @@ class XenAPIMigrateInstance(test.TestCase):
self.stubs.UnsetAll()
def test_migrate_disk_and_power_off(self):
- instance = db.instance_create(self.values)
+ instance = db.instance_create(self.context, self.values)
stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
conn = xenapi_conn.get_connection(False)
conn.migrate_disk_and_power_off(instance, '127.0.0.1')
def test_finish_resize(self):
- instance = db.instance_create(self.values)
+ instance = db.instance_create(self.context, self.values)
stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
+ stubs.stubout_loopingcall_start(self.stubs)
conn = xenapi_conn.get_connection(False)
conn.finish_resize(instance, dict(base_copy='hurr', cow='durr'))
class XenAPIDetermineDiskImageTestCase(test.TestCase):
- """
- Unit tests for code that detects the ImageType
- """
+ """Unit tests for code that detects the ImageType."""
def setUp(self):
super(XenAPIDetermineDiskImageTestCase, self).setUp()
glance_stubs.stubout_glance_client(self.stubs,
@@ -494,9 +630,7 @@ class XenAPIDetermineDiskImageTestCase(test.TestCase):
self.assertEqual(disk_type, dt)
def test_instance_disk(self):
- """
- If a kernel is specified then the image type is DISK (aka machine)
- """
+ """If a kernel is specified, the image type is DISK (aka machine)."""
FLAGS.xenapi_image_service = 'objectstore'
self.fake_instance.image_id = glance_stubs.FakeGlance.IMAGE_MACHINE
self.fake_instance.kernel_id = glance_stubs.FakeGlance.IMAGE_KERNEL
diff --git a/nova/tests/test_zones.py b/nova/tests/test_zones.py
index 5a52a0506..688dc704d 100644
--- a/nova/tests/test_zones.py
+++ b/nova/tests/test_zones.py
@@ -76,6 +76,40 @@ class ZoneManagerTestCase(test.TestCase):
self.assertEquals(len(zm.zone_states), 1)
self.assertEquals(zm.zone_states[1].username, 'user1')
+ def test_service_capabilities(self):
+ zm = zone_manager.ZoneManager()
+ caps = zm.get_zone_capabilities(self, None)
+ self.assertEquals(caps, {})
+
+ zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
+ caps = zm.get_zone_capabilities(self, None)
+ self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2)))
+
+ zm.update_service_capabilities("svc1", "host1", dict(a=2, b=3))
+ caps = zm.get_zone_capabilities(self, None)
+ self.assertEquals(caps, dict(svc1_a=(2, 2), svc1_b=(3, 3)))
+
+ zm.update_service_capabilities("svc1", "host2", dict(a=20, b=30))
+ caps = zm.get_zone_capabilities(self, None)
+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30)))
+
+ zm.update_service_capabilities("svc10", "host1", dict(a=99, b=99))
+ caps = zm.get_zone_capabilities(self, None)
+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
+ svc10_a=(99, 99), svc10_b=(99, 99)))
+
+ zm.update_service_capabilities("svc1", "host3", dict(c=5))
+ caps = zm.get_zone_capabilities(self, None)
+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
+ svc1_c=(5, 5), svc10_a=(99, 99),
+ svc10_b=(99, 99)))
+
+ caps = zm.get_zone_capabilities(self, 'svc1')
+ self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
+ svc1_c=(5, 5)))
+ caps = zm.get_zone_capabilities(self, 'svc10')
+ self.assertEquals(caps, dict(svc10_a=(99, 99), svc10_b=(99, 99)))
+
def test_refresh_from_db_replace_existing(self):
zm = zone_manager.ZoneManager()
zone_state = zone_manager.ZoneState()
diff --git a/nova/tests/vmwareapi/__init__.py b/nova/tests/vmwareapi/__init__.py
new file mode 100644
index 000000000..478ee742b
--- /dev/null
+++ b/nova/tests/vmwareapi/__init__.py
@@ -0,0 +1,21 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 Citrix Systems, Inc.
+# Copyright 2011 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+:mod:`vmwareapi` -- Stubs for VMware API
+=======================================
+"""
diff --git a/nova/tests/vmwareapi/db_fakes.py b/nova/tests/vmwareapi/db_fakes.py
new file mode 100644
index 000000000..0addd5573
--- /dev/null
+++ b/nova/tests/vmwareapi/db_fakes.py
@@ -0,0 +1,109 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 Citrix Systems, Inc.
+# Copyright 2011 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Stubouts, mocks and fixtures for the test suite
+"""
+
+import time
+
+from nova import db
+from nova import utils
+
+
+def stub_out_db_instance_api(stubs):
+ """Stubs out the db API for creating Instances."""
+
+ INSTANCE_TYPES = {
+ 'm1.tiny': dict(memory_mb=512, vcpus=1, local_gb=0, flavorid=1),
+ 'm1.small': dict(memory_mb=2048, vcpus=1, local_gb=20, flavorid=2),
+ 'm1.medium':
+ dict(memory_mb=4096, vcpus=2, local_gb=40, flavorid=3),
+ 'm1.large': dict(memory_mb=8192, vcpus=4, local_gb=80, flavorid=4),
+ 'm1.xlarge':
+ dict(memory_mb=16384, vcpus=8, local_gb=160, flavorid=5)}
+
+ class FakeModel(object):
+ """Stubs out for model."""
+
+ def __init__(self, values):
+ self.values = values
+
+ def __getattr__(self, name):
+ return self.values[name]
+
+ def __getitem__(self, key):
+ if key in self.values:
+ return self.values[key]
+ else:
+ raise NotImplementedError()
+
+ def fake_instance_create(values):
+ """Stubs out the db.instance_create method."""
+
+ type_data = INSTANCE_TYPES[values['instance_type']]
+
+ base_options = {
+ 'name': values['name'],
+ 'id': values['id'],
+ 'reservation_id': utils.generate_uid('r'),
+ 'image_id': values['image_id'],
+ 'kernel_id': values['kernel_id'],
+ 'ramdisk_id': values['ramdisk_id'],
+ 'state_description': 'scheduling',
+ 'user_id': values['user_id'],
+ 'project_id': values['project_id'],
+ 'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
+ 'instance_type': values['instance_type'],
+ 'memory_mb': type_data['memory_mb'],
+ 'mac_address': values['mac_address'],
+ 'vcpus': type_data['vcpus'],
+ 'local_gb': type_data['local_gb'],
+ }
+ return FakeModel(base_options)
+
+ def fake_network_get_by_instance(context, instance_id):
+ """Stubs out the db.network_get_by_instance method."""
+
+ fields = {
+ 'bridge': 'vmnet0',
+ 'netmask': '255.255.255.0',
+ 'gateway': '10.10.10.1',
+ 'vlan': 100}
+ return FakeModel(fields)
+
+ def fake_instance_action_create(context, action):
+ """Stubs out the db.instance_action_create method."""
+ pass
+
+ def fake_instance_get_fixed_address(context, instance_id):
+ """Stubs out the db.instance_get_fixed_address method."""
+ return '10.10.10.10'
+
+ def fake_instance_type_get_all(context, inactive=0):
+ return INSTANCE_TYPES
+
+ def fake_instance_type_get_by_name(context, name):
+ return INSTANCE_TYPES[name]
+
+ stubs.Set(db, 'instance_create', fake_instance_create)
+ stubs.Set(db, 'network_get_by_instance', fake_network_get_by_instance)
+ stubs.Set(db, 'instance_action_create', fake_instance_action_create)
+ stubs.Set(db, 'instance_get_fixed_address',
+ fake_instance_get_fixed_address)
+ stubs.Set(db, 'instance_type_get_all', fake_instance_type_get_all)
+ stubs.Set(db, 'instance_type_get_by_name', fake_instance_type_get_by_name)
diff --git a/nova/tests/vmwareapi/stubs.py b/nova/tests/vmwareapi/stubs.py
new file mode 100644
index 000000000..a648efb16
--- /dev/null
+++ b/nova/tests/vmwareapi/stubs.py
@@ -0,0 +1,46 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 Citrix Systems, Inc.
+# Copyright 2011 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Stubouts for the test suite
+"""
+
+from nova.virt import vmwareapi_conn
+from nova.virt.vmwareapi import fake
+from nova.virt.vmwareapi import vmware_images
+
+
+def fake_get_vim_object(arg):
+ """Stubs out the VMWareAPISession's get_vim_object method."""
+ return fake.FakeVim()
+
+
+def fake_is_vim_object(arg, module):
+ """Stubs out the VMWareAPISession's is_vim_object method."""
+ return isinstance(module, fake.FakeVim)
+
+
+def set_stubs(stubs):
+ """Set the stubs."""
+ stubs.Set(vmware_images, 'fetch_image', fake.fake_fetch_image)
+ stubs.Set(vmware_images, 'get_vmdk_size_and_properties',
+ fake.fake_get_vmdk_size_and_properties)
+ stubs.Set(vmware_images, 'upload_image', fake.fake_upload_image)
+ stubs.Set(vmwareapi_conn.VMWareAPISession, "_get_vim_object",
+ fake_get_vim_object)
+ stubs.Set(vmwareapi_conn.VMWareAPISession, "_is_vim_object",
+ fake_is_vim_object)
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
index 70d46a1fb..205f6c902 100644
--- a/nova/tests/xenapi/stubs.py
+++ b/nova/tests/xenapi/stubs.py
@@ -21,6 +21,7 @@ from nova.virt.xenapi import fake
from nova.virt.xenapi import volume_utils
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi import vmops
+from nova import utils
def stubout_instance_snapshot(stubs):
@@ -137,14 +138,17 @@ def stubout_is_vdi_pv(stubs):
stubs.Set(vm_utils, '_is_vdi_pv', f)
+def stubout_loopingcall_start(stubs):
+ def fake_start(self, interval, now=True):
+ self.f(*self.args, **self.kw)
+ stubs.Set(utils.LoopingCall, 'start', fake_start)
+
+
class FakeSessionForVMTests(fake.SessionBase):
""" Stubs out a XenAPISession for VM tests """
def __init__(self, uri):
super(FakeSessionForVMTests, self).__init__(uri)
- def network_get_all_records_where(self, _1, _2):
- return self.xenapi.network.get_all_records()
-
def host_call_plugin(self, _1, _2, _3, _4, _5):
sr_ref = fake.get_all('SR')[0]
vdi_ref = fake.create_vdi('', False, sr_ref, False)
@@ -185,6 +189,25 @@ class FakeSessionForVMTests(fake.SessionBase):
pass
+def stub_out_vm_methods(stubs):
+ def fake_shutdown(self, inst, vm, method="clean"):
+ pass
+
+ def fake_acquire_bootlock(self, vm):
+ pass
+
+ def fake_release_bootlock(self, vm):
+ pass
+
+ def fake_spawn_rescue(self, inst):
+ inst._rescue = False
+
+ stubs.Set(vmops.VMOps, "_shutdown", fake_shutdown)
+ stubs.Set(vmops.VMOps, "_acquire_bootlock", fake_acquire_bootlock)
+ stubs.Set(vmops.VMOps, "_release_bootlock", fake_release_bootlock)
+ stubs.Set(vmops.VMOps, "spawn_rescue", fake_spawn_rescue)
+
+
class FakeSessionForVolumeTests(fake.SessionBase):
""" Stubs out a XenAPISession for Volume tests """
def __init__(self, uri):
@@ -228,6 +251,9 @@ class FakeSessionForMigrationTests(fake.SessionBase):
def VDI_get_by_uuid(*args):
return 'hurr'
+ def VDI_resize_online(*args):
+ pass
+
def VM_start(self, _1, ref, _2, _3):
vm = fake.get_record('VM', ref)
if vm['power_state'] != 'Halted':
@@ -240,7 +266,7 @@ class FakeSessionForMigrationTests(fake.SessionBase):
def stub_out_migration_methods(stubs):
def fake_get_snapshot(self, instance):
- return 'foo', 'bar'
+ return 'vm_ref', dict(image='foo', snap='bar')
@classmethod
def fake_get_vdi(cls, session, vm_ref):
@@ -249,7 +275,7 @@ def stub_out_migration_methods(stubs):
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
return vdi_ref, {'uuid': vdi_rec['uuid'], }
- def fake_shutdown(self, inst, vm, method='clean'):
+ def fake_shutdown(self, inst, vm, hard=True):
pass
@classmethod