diff options
| author | Ilya Alekseyev <ialekseev@griddynamics.com> | 2010-12-23 22:58:20 +0300 |
|---|---|---|
| committer | Ilya Alekseyev <ialekseev@griddynamics.com> | 2010-12-23 22:58:20 +0300 |
| commit | 57ead438d06dd5c6e98e971670f397bed5d7e29c (patch) | |
| tree | 6a94815ea5d66930e4638769e463b7ec60c9cdc2 /nova/tests | |
| parent | d88817a360676173ac31566e13201d56f1e2b0b0 (diff) | |
| parent | 75e2cbec9eb5132a49446f1b6d563d5f43d007de (diff) | |
Merge with trunk
Diffstat (limited to 'nova/tests')
| -rw-r--r-- | nova/tests/api/__init__.py | 81 | ||||
| -rw-r--r-- | nova/tests/api/openstack/__init__.py | 13 | ||||
| -rw-r--r-- | nova/tests/api/openstack/fakes.py | 22 | ||||
| -rw-r--r-- | nova/tests/api/openstack/test_auth.py | 4 | ||||
| -rw-r--r-- | nova/tests/api/test.py | 81 | ||||
| -rw-r--r-- | nova/tests/api_integration.py | 54 | ||||
| -rw-r--r-- | nova/tests/db/__init__.py | 20 | ||||
| -rw-r--r-- | nova/tests/db/fakes.py | 75 | ||||
| -rw-r--r-- | nova/tests/test_access.py (renamed from nova/tests/access_unittest.py) | 0 | ||||
| -rw-r--r-- | nova/tests/test_api.py (renamed from nova/tests/api_unittest.py) | 0 | ||||
| -rw-r--r-- | nova/tests/test_auth.py (renamed from nova/tests/auth_unittest.py) | 0 | ||||
| -rw-r--r-- | nova/tests/test_cloud.py (renamed from nova/tests/cloud_unittest.py) | 30 | ||||
| -rw-r--r-- | nova/tests/test_compute.py (renamed from nova/tests/compute_unittest.py) | 1 | ||||
| -rw-r--r-- | nova/tests/test_flags.py (renamed from nova/tests/flags_unittest.py) | 0 | ||||
| -rw-r--r-- | nova/tests/test_middleware.py | 86 | ||||
| -rw-r--r-- | nova/tests/test_misc.py (renamed from nova/tests/misc_unittest.py) | 8 | ||||
| -rw-r--r-- | nova/tests/test_network.py (renamed from nova/tests/network_unittest.py) | 8 | ||||
| -rw-r--r-- | nova/tests/test_quota.py (renamed from nova/tests/quota_unittest.py) | 0 | ||||
| -rw-r--r-- | nova/tests/test_rpc.py (renamed from nova/tests/rpc_unittest.py) | 29 | ||||
| -rw-r--r-- | nova/tests/test_scheduler.py (renamed from nova/tests/scheduler_unittest.py) | 3 | ||||
| -rw-r--r-- | nova/tests/test_service.py (renamed from nova/tests/service_unittest.py) | 14 | ||||
| -rw-r--r-- | nova/tests/test_twistd.py (renamed from nova/tests/twistd_unittest.py) | 0 | ||||
| -rw-r--r-- | nova/tests/test_virt.py (renamed from nova/tests/virt_unittest.py) | 37 | ||||
| -rw-r--r-- | nova/tests/test_volume.py (renamed from nova/tests/volume_unittest.py) | 0 | ||||
| -rw-r--r-- | nova/tests/test_xenapi.py | 219 | ||||
| -rw-r--r-- | nova/tests/xenapi/__init__.py | 20 | ||||
| -rw-r--r-- | nova/tests/xenapi/stubs.py | 94 |
27 files changed, 697 insertions, 202 deletions
diff --git a/nova/tests/api/__init__.py b/nova/tests/api/__init__.py index 9caa8c9d0..e69de29bb 100644 --- a/nova/tests/api/__init__.py +++ b/nova/tests/api/__init__.py @@ -1,81 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 OpenStack LLC. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Test for the root WSGI middleware for all API controllers. -""" - -import unittest - -import stubout -import webob -import webob.dec - -import nova.exception -from nova import api -from nova.tests.api.fakes import APIStub - - -class Test(unittest.TestCase): - - def setUp(self): - self.stubs = stubout.StubOutForTesting() - - def tearDown(self): - self.stubs.UnsetAll() - - def _request(self, url, subdomain, **kwargs): - environ_keys = {'HTTP_HOST': '%s.example.com' % subdomain} - environ_keys.update(kwargs) - req = webob.Request.blank(url, environ_keys) - return req.get_response(api.API('ec2')) - - def test_openstack(self): - self.stubs.Set(api.openstack, 'API', APIStub) - result = self._request('/v1.0/cloud', 'api') - self.assertEqual(result.body, "/cloud") - - def test_ec2(self): - self.stubs.Set(api.ec2, 'API', APIStub) - result = self._request('/services/cloud', 'ec2') - self.assertEqual(result.body, "/cloud") - - def test_not_found(self): - self.stubs.Set(api.ec2, 'API', APIStub) - self.stubs.Set(api.openstack, 'API', APIStub) - result = self._request('/test/cloud', 'ec2') - self.assertNotEqual(result.body, "/cloud") - - def test_query_api_versions(self): - result = self._request('/', 'api') - self.assertTrue('CURRENT' in result.body) - - def test_metadata(self): - def go(url): - result = self._request(url, 'ec2', REMOTE_ADDR='128.192.151.2') - # Each should get to the ORM layer and fail to find the IP - self.assertRaises(nova.exception.NotFound, go, '/latest/') - self.assertRaises(nova.exception.NotFound, go, '/2009-04-04/') - self.assertRaises(nova.exception.NotFound, go, '/1.0/') - - def test_ec2_root(self): - result = self._request('/', 'ec2') - self.assertTrue('2007-12-15\n' in result.body) - - -if __name__ == '__main__': - unittest.main() diff --git a/nova/tests/api/openstack/__init__.py b/nova/tests/api/openstack/__init__.py index 2e357febe..9e183bd0d 100644 --- a/nova/tests/api/openstack/__init__.py +++ b/nova/tests/api/openstack/__init__.py @@ -17,11 +17,16 @@ import unittest -from nova.api.openstack import limited -from nova.api.openstack import RateLimitingMiddleware +from nova import context +from nova import flags +from nova.api.openstack.ratelimiting import RateLimitingMiddleware +from nova.api.openstack.common import limited from nova.tests.api.fakes import APIStub +from nova import utils from webob import Request +FLAGS = flags.FLAGS + class RateLimitingMiddlewareTest(unittest.TestCase): @@ -46,6 +51,8 @@ class RateLimitingMiddlewareTest(unittest.TestCase): def exhaust(self, middleware, method, url, username, times): req = Request.blank(url, dict(REQUEST_METHOD=method), headers={'X-Auth-User': username}) + req.environ['nova.context'] = context.RequestContext(username, + username) for i in range(times): resp = req.get_response(middleware) self.assertEqual(resp.status_int, 200) @@ -62,7 +69,7 @@ class RateLimitingMiddlewareTest(unittest.TestCase): middleware = RateLimitingMiddleware(APIStub()) self.exhaust(middleware, 'POST', '/servers/4', 'usr1', 10) self.exhaust(middleware, 'POST', '/images/4', 'usr2', 10) - self.assertTrue(set(middleware.limiter._levels) == + self.assertTrue(set(middleware.limiter._levels) == \ set(['usr1:POST', 'usr1:POST servers', 'usr2:POST'])) def test_POST_servers_action_correctly_ratelimited(self): diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py index 21b8aac1c..79663e43a 100644 --- a/nova/tests/api/openstack/fakes.py +++ b/nova/tests/api/openstack/fakes.py @@ -29,8 +29,11 @@ from nova import exception as exc from nova import flags from nova import utils import nova.api.openstack.auth -from nova.image import service +from nova.api.openstack import auth +from nova.api.openstack import ratelimiting from nova.image import glance +from nova.image import local +from nova.image import service from nova.tests import fake_flags from nova.wsgi import Router @@ -51,10 +54,11 @@ class FakeRouter(Router): return res -def fake_auth_init(self): +def fake_auth_init(self, application): self.db = FakeAuthDatabase() self.context = Context() self.auth = FakeAuthManager() + self.application = application @webob.dec.wsgify @@ -75,28 +79,28 @@ def stub_out_image_service(stubs): def fake_image_show(meh, context, id): return dict(kernelId=1, ramdiskId=1) - stubs.Set(nova.image.local.LocalImageService, 'show', fake_image_show) + stubs.Set(local.LocalImageService, 'show', fake_image_show) def stub_out_auth(stubs): def fake_auth_init(self, app): self.application = app - stubs.Set(nova.api.openstack.AuthMiddleware, + stubs.Set(nova.api.openstack.auth.AuthMiddleware, '__init__', fake_auth_init) - stubs.Set(nova.api.openstack.AuthMiddleware, + stubs.Set(nova.api.openstack.auth.AuthMiddleware, '__call__', fake_wsgi) def stub_out_rate_limiting(stubs): def fake_rate_init(self, app): - super(nova.api.openstack.RateLimitingMiddleware, self).__init__(app) + super(ratelimiting.RateLimitingMiddleware, self).__init__(app) self.application = app - stubs.Set(nova.api.openstack.RateLimitingMiddleware, + stubs.Set(nova.api.openstack.ratelimiting.RateLimitingMiddleware, '__init__', fake_rate_init) - stubs.Set(nova.api.openstack.RateLimitingMiddleware, + stubs.Set(nova.api.openstack.ratelimiting.RateLimitingMiddleware, '__call__', fake_wsgi) @@ -173,7 +177,7 @@ class FakeToken(object): class FakeRequestContext(object): - def __init__(self, user, project): + def __init__(self, user, project, *args, **kwargs): self.user_id = 1 self.project_id = 1 diff --git a/nova/tests/api/openstack/test_auth.py b/nova/tests/api/openstack/test_auth.py index 7b427c2db..489a1dfbf 100644 --- a/nova/tests/api/openstack/test_auth.py +++ b/nova/tests/api/openstack/test_auth.py @@ -34,7 +34,7 @@ class Test(unittest.TestCase): def setUp(self): self.stubs = stubout.StubOutForTesting() - self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager, + self.stubs.Set(nova.api.openstack.auth.AuthMiddleware, '__init__', fakes.fake_auth_init) self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext) fakes.FakeAuthManager.auth_data = {} @@ -131,7 +131,7 @@ class Test(unittest.TestCase): class TestLimiter(unittest.TestCase): def setUp(self): self.stubs = stubout.StubOutForTesting() - self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager, + self.stubs.Set(nova.api.openstack.auth.AuthMiddleware, '__init__', fakes.fake_auth_init) self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext) fakes.FakeAuthManager.auth_data = {} diff --git a/nova/tests/api/test.py b/nova/tests/api/test.py new file mode 100644 index 000000000..9caa8c9d0 --- /dev/null +++ b/nova/tests/api/test.py @@ -0,0 +1,81 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Test for the root WSGI middleware for all API controllers. +""" + +import unittest + +import stubout +import webob +import webob.dec + +import nova.exception +from nova import api +from nova.tests.api.fakes import APIStub + + +class Test(unittest.TestCase): + + def setUp(self): + self.stubs = stubout.StubOutForTesting() + + def tearDown(self): + self.stubs.UnsetAll() + + def _request(self, url, subdomain, **kwargs): + environ_keys = {'HTTP_HOST': '%s.example.com' % subdomain} + environ_keys.update(kwargs) + req = webob.Request.blank(url, environ_keys) + return req.get_response(api.API('ec2')) + + def test_openstack(self): + self.stubs.Set(api.openstack, 'API', APIStub) + result = self._request('/v1.0/cloud', 'api') + self.assertEqual(result.body, "/cloud") + + def test_ec2(self): + self.stubs.Set(api.ec2, 'API', APIStub) + result = self._request('/services/cloud', 'ec2') + self.assertEqual(result.body, "/cloud") + + def test_not_found(self): + self.stubs.Set(api.ec2, 'API', APIStub) + self.stubs.Set(api.openstack, 'API', APIStub) + result = self._request('/test/cloud', 'ec2') + self.assertNotEqual(result.body, "/cloud") + + def test_query_api_versions(self): + result = self._request('/', 'api') + self.assertTrue('CURRENT' in result.body) + + def test_metadata(self): + def go(url): + result = self._request(url, 'ec2', REMOTE_ADDR='128.192.151.2') + # Each should get to the ORM layer and fail to find the IP + self.assertRaises(nova.exception.NotFound, go, '/latest/') + self.assertRaises(nova.exception.NotFound, go, '/2009-04-04/') + self.assertRaises(nova.exception.NotFound, go, '/1.0/') + + def test_ec2_root(self): + result = self._request('/', 'ec2') + self.assertTrue('2007-12-15\n' in result.body) + + +if __name__ == '__main__': + unittest.main() diff --git a/nova/tests/api_integration.py b/nova/tests/api_integration.py deleted file mode 100644 index 54403c655..000000000 --- a/nova/tests/api_integration.py +++ /dev/null @@ -1,54 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -import boto -from boto.ec2.regioninfo import RegionInfo -import unittest - - -ACCESS_KEY = 'fake' -SECRET_KEY = 'fake' -CLC_IP = '127.0.0.1' -CLC_PORT = 8773 -REGION = 'test' - - -def get_connection(): - return boto.connect_ec2( - aws_access_key_id=ACCESS_KEY, - aws_secret_access_key=SECRET_KEY, - is_secure=False, - region=RegionInfo(None, REGION, CLC_IP), - port=CLC_PORT, - path='/services/Cloud', - debug=99) - - -class APIIntegrationTests(unittest.TestCase): - def test_001_get_all_images(self): - conn = get_connection() - res = conn.get_all_images() - - -if __name__ == '__main__': - unittest.main() - -#print conn.get_all_key_pairs() -#print conn.create_key_pair -#print conn.create_security_group('name', 'description') diff --git a/nova/tests/db/__init__.py b/nova/tests/db/__init__.py new file mode 100644 index 000000000..2d43aac42 --- /dev/null +++ b/nova/tests/db/__init__.py @@ -0,0 +1,20 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +:mod:`db` -- Stubs for DB API +============================= +""" diff --git a/nova/tests/db/fakes.py b/nova/tests/db/fakes.py new file mode 100644 index 000000000..05bdd172e --- /dev/null +++ b/nova/tests/db/fakes.py @@ -0,0 +1,75 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 OpenStack, LLC +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Stubouts, mocks and fixtures for the test suite""" + +import time + +from nova import db +from nova import utils +from nova.compute import instance_types + + +def stub_out_db_instance_api(stubs): + """ Stubs out the db API for creating Instances """ + + class FakeModel(object): + """ Stubs out for model """ + def __init__(self, values): + self.values = values + + def __getattr__(self, name): + return self.values[name] + + def __getitem__(self, key): + if key in self.values: + return self.values[key] + else: + raise NotImplementedError() + + def fake_instance_create(values): + """ Stubs out the db.instance_create method """ + + type_data = instance_types.INSTANCE_TYPES[values['instance_type']] + + base_options = { + 'name': values['name'], + 'id': values['id'], + 'reservation_id': utils.generate_uid('r'), + 'image_id': values['image_id'], + 'kernel_id': values['kernel_id'], + 'ramdisk_id': values['ramdisk_id'], + 'state_description': 'scheduling', + 'user_id': values['user_id'], + 'project_id': values['project_id'], + 'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), + 'instance_type': values['instance_type'], + 'memory_mb': type_data['memory_mb'], + 'mac_address': values['mac_address'], + 'vcpus': type_data['vcpus'], + 'local_gb': type_data['local_gb'], + } + return FakeModel(base_options) + + def fake_network_get_by_instance(context, instance_id): + fields = { + 'bridge': 'xenbr0', + } + return FakeModel(fields) + + stubs.Set(db, 'instance_create', fake_instance_create) + stubs.Set(db, 'network_get_by_instance', fake_network_get_by_instance) diff --git a/nova/tests/access_unittest.py b/nova/tests/test_access.py index 58fdea3b5..58fdea3b5 100644 --- a/nova/tests/access_unittest.py +++ b/nova/tests/test_access.py diff --git a/nova/tests/api_unittest.py b/nova/tests/test_api.py index 33d4cb294..33d4cb294 100644 --- a/nova/tests/api_unittest.py +++ b/nova/tests/test_api.py diff --git a/nova/tests/auth_unittest.py b/nova/tests/test_auth.py index 15d40bc53..15d40bc53 100644 --- a/nova/tests/auth_unittest.py +++ b/nova/tests/test_auth.py diff --git a/nova/tests/cloud_unittest.py b/nova/tests/test_cloud.py index 53a762310..70d2c44da 100644 --- a/nova/tests/cloud_unittest.py +++ b/nova/tests/test_cloud.py @@ -22,20 +22,18 @@ import logging from M2Crypto import BIO from M2Crypto import RSA import os -import StringIO import tempfile import time from eventlet import greenthread -from xml.etree import ElementTree from nova import context from nova import crypto from nova import db from nova import flags from nova import rpc +from nova import service from nova import test -from nova import utils from nova.auth import manager from nova.compute import power_state from nova.api.ec2 import cloud @@ -54,7 +52,8 @@ os.makedirs(IMAGES_PATH) class CloudTestCase(test.TestCase): def setUp(self): super(CloudTestCase, self).setUp() - self.flags(connection_type='fake', images_path=IMAGES_PATH) + self.flags(connection_type='fake', + images_path=IMAGES_PATH) self.conn = rpc.Connection.instance() logging.getLogger().setLevel(logging.DEBUG) @@ -62,27 +61,23 @@ class CloudTestCase(test.TestCase): # set up our cloud self.cloud = cloud.CloudController() - # set up a service - self.compute = utils.import_object(FLAGS.compute_manager) - self.compute_consumer = rpc.AdapterConsumer(connection=self.conn, - topic=FLAGS.compute_topic, - proxy=self.compute) - self.compute_consumer.attach_to_eventlet() - self.network = utils.import_object(FLAGS.network_manager) - self.network_consumer = rpc.AdapterConsumer(connection=self.conn, - topic=FLAGS.network_topic, - proxy=self.network) - self.network_consumer.attach_to_eventlet() + # set up services + self.compute = service.Service.create(binary='nova-compute') + self.compute.start() + self.network = service.Service.create(binary='nova-network') + self.network.start() self.manager = manager.AuthManager() self.user = self.manager.create_user('admin', 'admin', 'admin', True) self.project = self.manager.create_project('proj', 'admin', 'proj') self.context = context.RequestContext(user=self.user, - project=self.project) + project=self.project) def tearDown(self): self.manager.delete_project(self.project) self.manager.delete_user(self.user) + self.compute.kill() + self.network.kill() super(CloudTestCase, self).tearDown() def _create_key(self, name): @@ -109,12 +104,13 @@ class CloudTestCase(test.TestCase): {'address': address, 'host': FLAGS.host}) self.cloud.allocate_address(self.context) - inst = db.instance_create(self.context, {}) + inst = db.instance_create(self.context, {'host': FLAGS.host}) fixed = self.network.allocate_fixed_ip(self.context, inst['id']) ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id']) self.cloud.associate_address(self.context, instance_id=ec2_id, public_ip=address) + greenthread.sleep(0.3) self.cloud.disassociate_address(self.context, public_ip=address) self.cloud.release_address(self.context, diff --git a/nova/tests/compute_unittest.py b/nova/tests/test_compute.py index 187ca31de..348bb3351 100644 --- a/nova/tests/compute_unittest.py +++ b/nova/tests/test_compute.py @@ -41,6 +41,7 @@ class ComputeTestCase(test.TestCase): logging.getLogger().setLevel(logging.DEBUG) super(ComputeTestCase, self).setUp() self.flags(connection_type='fake', + stub_network=True, network_manager='nova.network.manager.FlatManager') self.compute = utils.import_object(FLAGS.compute_manager) self.compute_api = compute_api.ComputeAPI() diff --git a/nova/tests/flags_unittest.py b/nova/tests/test_flags.py index 707300fcf..707300fcf 100644 --- a/nova/tests/flags_unittest.py +++ b/nova/tests/test_flags.py diff --git a/nova/tests/test_middleware.py b/nova/tests/test_middleware.py new file mode 100644 index 000000000..0febf52d6 --- /dev/null +++ b/nova/tests/test_middleware.py @@ -0,0 +1,86 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import webob +import webob.dec +import webob.exc + +from nova.api import ec2 +from nova import flags +from nova import test +from nova import utils + + +FLAGS = flags.FLAGS + + +@webob.dec.wsgify +def conditional_forbid(req): + """Helper wsgi app returns 403 if param 'die' is 1.""" + if 'die' in req.params and req.params['die'] == '1': + raise webob.exc.HTTPForbidden() + return 'OK' + + +class LockoutTestCase(test.TrialTestCase): + """Test case for the Lockout middleware.""" + def setUp(self): # pylint: disable-msg=C0103 + super(LockoutTestCase, self).setUp() + utils.set_time_override() + self.lockout = ec2.Lockout(conditional_forbid) + + def tearDown(self): # pylint: disable-msg=C0103 + utils.clear_time_override() + super(LockoutTestCase, self).tearDown() + + def _send_bad_attempts(self, access_key, num_attempts=1): + """Fail x.""" + for i in xrange(num_attempts): + req = webob.Request.blank('/?AWSAccessKeyId=%s&die=1' % access_key) + self.assertEqual(req.get_response(self.lockout).status_int, 403) + + def _is_locked_out(self, access_key): + """Sends a test request to see if key is locked out.""" + req = webob.Request.blank('/?AWSAccessKeyId=%s' % access_key) + return (req.get_response(self.lockout).status_int == 403) + + def test_lockout(self): + self._send_bad_attempts('test', FLAGS.lockout_attempts) + self.assertTrue(self._is_locked_out('test')) + + def test_timeout(self): + self._send_bad_attempts('test', FLAGS.lockout_attempts) + self.assertTrue(self._is_locked_out('test')) + utils.advance_time_seconds(FLAGS.lockout_minutes * 60) + self.assertFalse(self._is_locked_out('test')) + + def test_multiple_keys(self): + self._send_bad_attempts('test1', FLAGS.lockout_attempts) + self.assertTrue(self._is_locked_out('test1')) + self.assertFalse(self._is_locked_out('test2')) + utils.advance_time_seconds(FLAGS.lockout_minutes * 60) + self.assertFalse(self._is_locked_out('test1')) + self.assertFalse(self._is_locked_out('test2')) + + def test_window_timeout(self): + self._send_bad_attempts('test', FLAGS.lockout_attempts - 1) + self.assertFalse(self._is_locked_out('test')) + utils.advance_time_seconds(FLAGS.lockout_window * 60) + self._send_bad_attempts('test', FLAGS.lockout_attempts - 1) + self.assertFalse(self._is_locked_out('test')) diff --git a/nova/tests/misc_unittest.py b/nova/tests/test_misc.py index 3d947427a..33c1777d5 100644 --- a/nova/tests/misc_unittest.py +++ b/nova/tests/test_misc.py @@ -22,13 +22,13 @@ from nova.utils import parse_mailmap, str_dict_replace class ProjectTestCase(test.TestCase): def test_authors_up_to_date(self): - if os.path.exists('../.bzr'): + if os.path.exists('.bzr'): contributors = set() - mailmap = parse_mailmap('../.mailmap') + mailmap = parse_mailmap('.mailmap') import bzrlib.workingtree - tree = bzrlib.workingtree.WorkingTree.open('..') + tree = bzrlib.workingtree.WorkingTree.open('.') tree.lock_read() try: parents = tree.get_parent_ids() @@ -42,7 +42,7 @@ class ProjectTestCase(test.TestCase): email = author.split(' ')[-1] contributors.add(str_dict_replace(email, mailmap)) - authors_file = open('../Authors', 'r').read() + authors_file = open('Authors', 'r').read() missing = set() for contributor in contributors: diff --git a/nova/tests/network_unittest.py b/nova/tests/test_network.py index bcac20585..96473ac7c 100644 --- a/nova/tests/network_unittest.py +++ b/nova/tests/test_network.py @@ -26,6 +26,7 @@ from nova import context from nova import db from nova import exception from nova import flags +from nova import service from nova import test from nova import utils from nova.auth import manager @@ -40,6 +41,7 @@ class NetworkTestCase(test.TestCase): # NOTE(vish): if you change these flags, make sure to change the # flags in the corresponding section in nova-dhcpbridge self.flags(connection_type='fake', + fake_call=True, fake_network=True, network_size=16, num_networks=5) @@ -56,16 +58,13 @@ class NetworkTestCase(test.TestCase): # create the necessary network data for the project user_context = context.RequestContext(project=self.projects[i], user=self.user) - network_ref = self.network.get_network(user_context) - self.network.set_network_host(context.get_admin_context(), - network_ref['id']) + host = self.network.get_network_host(user_context.elevated()) instance_ref = self._create_instance(0) self.instance_id = instance_ref['id'] instance_ref = self._create_instance(1) self.instance2_id = instance_ref['id'] def tearDown(self): - super(NetworkTestCase, self).tearDown() # TODO(termie): this should really be instantiating clean datastores # in between runs, one failure kills all the tests db.instance_destroy(context.get_admin_context(), self.instance_id) @@ -73,6 +72,7 @@ class NetworkTestCase(test.TestCase): for project in self.projects: self.manager.delete_project(project) self.manager.delete_user(self.user) + super(NetworkTestCase, self).tearDown() def _create_instance(self, project_num, mac=None): if not mac: diff --git a/nova/tests/quota_unittest.py b/nova/tests/test_quota.py index 8cf2a5e54..8cf2a5e54 100644 --- a/nova/tests/quota_unittest.py +++ b/nova/tests/test_quota.py diff --git a/nova/tests/rpc_unittest.py b/nova/tests/test_rpc.py index a2495e65a..6ea2edcab 100644 --- a/nova/tests/rpc_unittest.py +++ b/nova/tests/test_rpc.py @@ -33,7 +33,7 @@ class RpcTestCase(test.TestCase): """Test cases for rpc""" def setUp(self): super(RpcTestCase, self).setUp() - self.conn = rpc.Connection.instance() + self.conn = rpc.Connection.instance(True) self.receiver = TestReceiver() self.consumer = rpc.AdapterConsumer(connection=self.conn, topic='test', @@ -79,6 +79,33 @@ class RpcTestCase(test.TestCase): except rpc.RemoteError as exc: self.assertEqual(int(exc.value), value) + def test_nested_calls(self): + """Test that we can do an rpc.call inside another call""" + class Nested(object): + @staticmethod + def echo(context, queue, value): + """Calls echo in the passed queue""" + logging.debug("Nested received %s, %s", queue, value) + ret = rpc.call(context, + queue, + {"method": "echo", + "args": {"value": value}}) + logging.debug("Nested return %s", ret) + return value + + nested = Nested() + conn = rpc.Connection.instance(True) + consumer = rpc.AdapterConsumer(connection=conn, + topic='nested', + proxy=nested) + consumer.attach_to_eventlet() + value = 42 + result = rpc.call(self.context, + 'nested', {"method": "echo", + "args": {"queue": "test", + "value": value}}) + self.assertEqual(value, result) + class TestReceiver(object): """Simple Proxy class so the consumer has methods to call diff --git a/nova/tests/scheduler_unittest.py b/nova/tests/test_scheduler.py index dbc195a6e..487e7b9e6 100644 --- a/nova/tests/scheduler_unittest.py +++ b/nova/tests/test_scheduler.py @@ -51,7 +51,7 @@ class SchedulerTestCase(test.TestCase): """Test case for scheduler""" def setUp(self): super(SchedulerTestCase, self).setUp() - self.flags(scheduler_driver='nova.tests.scheduler_unittest.TestDriver') + self.flags(scheduler_driver='nova.tests.test_scheduler.TestDriver') def test_fallback(self): scheduler = manager.SchedulerManager() @@ -121,6 +121,7 @@ class SimpleDriverTestCase(test.TestCase): def setUp(self): super(SimpleDriverTestCase, self).setUp() self.flags(connection_type='fake', + stub_network=True, max_cores=4, max_gigabytes=4, network_manager='nova.network.manager.FlatManager', diff --git a/nova/tests/service_unittest.py b/nova/tests/test_service.py index 47c092f8e..b30838ad7 100644 --- a/nova/tests/service_unittest.py +++ b/nova/tests/test_service.py @@ -30,7 +30,7 @@ from nova import service from nova import manager FLAGS = flags.FLAGS -flags.DEFINE_string("fake_manager", "nova.tests.service_unittest.FakeManager", +flags.DEFINE_string("fake_manager", "nova.tests.test_service.FakeManager", "Manager for testing") @@ -52,14 +52,14 @@ class ServiceManagerTestCase(test.TestCase): serv = service.Service('test', 'test', 'test', - 'nova.tests.service_unittest.FakeManager') + 'nova.tests.test_service.FakeManager') self.assertRaises(AttributeError, getattr, serv, 'test_method') def test_message_gets_to_manager(self): serv = service.Service('test', 'test', 'test', - 'nova.tests.service_unittest.FakeManager') + 'nova.tests.test_service.FakeManager') serv.start() self.assertEqual(serv.test_method(), 'manager') @@ -67,7 +67,7 @@ class ServiceManagerTestCase(test.TestCase): serv = ExtendedService('test', 'test', 'test', - 'nova.tests.service_unittest.FakeManager') + 'nova.tests.test_service.FakeManager') serv.start() self.assertEqual(serv.test_method(), 'service') @@ -156,7 +156,7 @@ class ServiceTestCase(test.TestCase): serv = service.Service(host, binary, topic, - 'nova.tests.service_unittest.FakeManager') + 'nova.tests.test_service.FakeManager') serv.start() serv.report_state() @@ -186,7 +186,7 @@ class ServiceTestCase(test.TestCase): serv = service.Service(host, binary, topic, - 'nova.tests.service_unittest.FakeManager') + 'nova.tests.test_service.FakeManager') serv.start() serv.report_state() self.assert_(serv.model_disconnected) @@ -219,7 +219,7 @@ class ServiceTestCase(test.TestCase): serv = service.Service(host, binary, topic, - 'nova.tests.service_unittest.FakeManager') + 'nova.tests.test_service.FakeManager') serv.start() serv.model_disconnected = True serv.report_state() diff --git a/nova/tests/twistd_unittest.py b/nova/tests/test_twistd.py index 75007b9c8..75007b9c8 100644 --- a/nova/tests/twistd_unittest.py +++ b/nova/tests/test_twistd.py diff --git a/nova/tests/virt_unittest.py b/nova/tests/test_virt.py index cb35db1e1..8dab8de2f 100644 --- a/nova/tests/virt_unittest.py +++ b/nova/tests/test_virt.py @@ -33,6 +33,7 @@ flags.DECLARE('instances_path', 'nova.compute.manager') class LibvirtConnTestCase(test.TestCase): def setUp(self): super(LibvirtConnTestCase, self).setUp() + self.flags(fake_call=True) self.manager = manager.AuthManager() self.user = self.manager.create_user('fake', 'fake', 'fake', admin=True) @@ -52,45 +53,43 @@ class LibvirtConnTestCase(test.TestCase): def test_xml_and_uri_no_ramdisk_no_kernel(self): instance_data = dict(self.test_instance) - self.do_test_xml_and_uri(instance_data, - expect_kernel=False, expect_ramdisk=False) + self._check_xml_and_uri(instance_data, + expect_kernel=False, expect_ramdisk=False) def test_xml_and_uri_no_ramdisk(self): instance_data = dict(self.test_instance) instance_data['kernel_id'] = 'aki-deadbeef' - self.do_test_xml_and_uri(instance_data, - expect_kernel=True, expect_ramdisk=False) + self._check_xml_and_uri(instance_data, + expect_kernel=True, expect_ramdisk=False) def test_xml_and_uri_no_kernel(self): instance_data = dict(self.test_instance) instance_data['ramdisk_id'] = 'ari-deadbeef' - self.do_test_xml_and_uri(instance_data, - expect_kernel=False, expect_ramdisk=False) + self._check_xml_and_uri(instance_data, + expect_kernel=False, expect_ramdisk=False) def test_xml_and_uri(self): instance_data = dict(self.test_instance) instance_data['ramdisk_id'] = 'ari-deadbeef' instance_data['kernel_id'] = 'aki-deadbeef' - self.do_test_xml_and_uri(instance_data, - expect_kernel=True, expect_ramdisk=True) + self._check_xml_and_uri(instance_data, + expect_kernel=True, expect_ramdisk=True) def test_xml_and_uri_rescue(self): instance_data = dict(self.test_instance) instance_data['ramdisk_id'] = 'ari-deadbeef' instance_data['kernel_id'] = 'aki-deadbeef' - self.do_test_xml_and_uri(instance_data, - expect_kernel=True, expect_ramdisk=True, - rescue=True) + self._check_xml_and_uri(instance_data, expect_kernel=True, + expect_ramdisk=True, rescue=True) - def do_test_xml_and_uri(self, instance, - expect_ramdisk, expect_kernel, - rescue=False): + def _check_xml_and_uri(self, instance, expect_ramdisk, expect_kernel, + rescue=False): user_context = context.RequestContext(project=self.project, user=self.user) instance_ref = db.instance_create(user_context, instance) - network_ref = self.network.get_network(user_context) - self.network.set_network_host(context.get_admin_context(), - network_ref['id']) + host = self.network.get_network_host(user_context.elevated()) + network_ref = db.project_get_network(context.get_admin_context(), + self.project.id) fixed_ip = {'address': self.test_ip, 'network_id': network_ref['id']} @@ -338,7 +337,7 @@ class NWFilterTestCase(test.TestCase): self.security_group.id) instance = db.instance_get(self.context, inst_id) - d = self.fw.setup_nwfilters_for_instance(instance) + self.fw.setup_base_nwfilters() + self.fw.setup_nwfilters_for_instance(instance) _ensure_all_called() self.teardown_security_group() - return d diff --git a/nova/tests/volume_unittest.py b/nova/tests/test_volume.py index b13455fb0..b13455fb0 100644 --- a/nova/tests/volume_unittest.py +++ b/nova/tests/test_volume.py diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py new file mode 100644 index 000000000..b5d3ea395 --- /dev/null +++ b/nova/tests/test_xenapi.py @@ -0,0 +1,219 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Test suite for XenAPI +""" + +import stubout + +from nova import db +from nova import context +from nova import flags +from nova import test +from nova import utils +from nova.auth import manager +from nova.compute import instance_types +from nova.compute import power_state +from nova.virt import xenapi_conn +from nova.virt.xenapi import fake +from nova.virt.xenapi import volume_utils +from nova.tests.db import fakes +from nova.tests.xenapi import stubs + +FLAGS = flags.FLAGS + + +class XenAPIVolumeTestCase(test.TestCase): + """ + Unit tests for Volume operations + """ + def setUp(self): + super(XenAPIVolumeTestCase, self).setUp() + self.stubs = stubout.StubOutForTesting() + FLAGS.target_host = '127.0.0.1' + FLAGS.xenapi_connection_url = 'test_url' + FLAGS.xenapi_connection_password = 'test_pass' + fakes.stub_out_db_instance_api(self.stubs) + fake.reset() + self.values = {'name': 1, 'id': 1, + 'project_id': 'fake', + 'user_id': 'fake', + 'image_id': 1, + 'kernel_id': 2, + 'ramdisk_id': 3, + 'instance_type': 'm1.large', + 'mac_address': 'aa:bb:cc:dd:ee:ff', + } + + def _create_volume(self, size='0'): + """Create a volume object.""" + vol = {} + vol['size'] = size + vol['user_id'] = 'fake' + vol['project_id'] = 'fake' + vol['host'] = 'localhost' + vol['availability_zone'] = FLAGS.storage_availability_zone + vol['status'] = "creating" + vol['attach_status'] = "detached" + return db.volume_create(context.get_admin_context(), vol) + + def test_create_iscsi_storage(self): + """ This shows how to test helper classes' methods """ + stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests) + session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass') + helper = volume_utils.VolumeHelper + helper.XenAPI = session.get_imported_xenapi() + vol = self._create_volume() + info = helper.parse_volume_info(vol['ec2_id'], '/dev/sdc') + label = 'SR-%s' % vol['ec2_id'] + description = 'Test-SR' + sr_ref = helper.create_iscsi_storage(session, info, label, description) + srs = fake.get_all('SR') + self.assertEqual(sr_ref, srs[0]) + db.volume_destroy(context.get_admin_context(), vol['id']) + + def test_parse_volume_info_raise_exception(self): + """ This shows how to test helper classes' methods """ + stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests) + session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass') + helper = volume_utils.VolumeHelper + helper.XenAPI = session.get_imported_xenapi() + vol = self._create_volume() + # oops, wrong mount point! + self.assertRaises(volume_utils.StorageError, + helper.parse_volume_info, + vol['ec2_id'], + '/dev/sd') + db.volume_destroy(context.get_admin_context(), vol['id']) + + def test_attach_volume(self): + """ This shows how to test Ops classes' methods """ + stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests) + conn = xenapi_conn.get_connection(False) + volume = self._create_volume() + instance = db.instance_create(self.values) + fake.create_vm(instance.name, 'Running') + result = conn.attach_volume(instance.name, volume['ec2_id'], + '/dev/sdc') + + def check(): + # check that the VM has a VBD attached to it + # Get XenAPI reference for the VM + vms = fake.get_all('VM') + # Get XenAPI record for VBD + vbds = fake.get_all('VBD') + vbd = fake.get_record('VBD', vbds[0]) + vm_ref = vbd['VM'] + self.assertEqual(vm_ref, vms[0]) + + check() + + def test_attach_volume_raise_exception(self): + """ This shows how to test when exceptions are raised """ + stubs.stubout_session(self.stubs, + stubs.FakeSessionForVolumeFailedTests) + conn = xenapi_conn.get_connection(False) + volume = self._create_volume() + instance = db.instance_create(self.values) + fake.create_vm(instance.name, 'Running') + self.assertRaises(Exception, + conn.attach_volume, + instance.name, + volume['ec2_id'], + '/dev/sdc') + + def tearDown(self): + super(XenAPIVolumeTestCase, self).tearDown() + self.stubs.UnsetAll() + + +class XenAPIVMTestCase(test.TestCase): + """ + Unit tests for VM operations + """ + def setUp(self): + super(XenAPIVMTestCase, self).setUp() + self.manager = manager.AuthManager() + self.user = self.manager.create_user('fake', 'fake', 'fake', + admin=True) + self.project = self.manager.create_project('fake', 'fake', 'fake') + self.network = utils.import_object(FLAGS.network_manager) + self.stubs = stubout.StubOutForTesting() + FLAGS.xenapi_connection_url = 'test_url' + FLAGS.xenapi_connection_password = 'test_pass' + fake.reset() + fakes.stub_out_db_instance_api(self.stubs) + fake.create_network('fake', FLAGS.flat_network_bridge) + + def test_list_instances_0(self): + stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) + conn = xenapi_conn.get_connection(False) + instances = conn.list_instances() + self.assertEquals(instances, []) + + def test_spawn(self): + stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) + values = {'name': 1, 'id': 1, + 'project_id': self.project.id, + 'user_id': self.user.id, + 'image_id': 1, + 'kernel_id': 2, + 'ramdisk_id': 3, + 'instance_type': 'm1.large', + 'mac_address': 'aa:bb:cc:dd:ee:ff', + } + conn = xenapi_conn.get_connection(False) + instance = db.instance_create(values) + conn.spawn(instance) + + def check(): + instances = conn.list_instances() + self.assertEquals(instances, [1]) + + # Get Nova record for VM + vm_info = conn.get_info(1) + + # Get XenAPI record for VM + vms = fake.get_all('VM') + vm = fake.get_record('VM', vms[0]) + + # Check that m1.large above turned into the right thing. + instance_type = instance_types.INSTANCE_TYPES['m1.large'] + mem_kib = long(instance_type['memory_mb']) << 10 + mem_bytes = str(mem_kib << 10) + vcpus = instance_type['vcpus'] + self.assertEquals(vm_info['max_mem'], mem_kib) + self.assertEquals(vm_info['mem'], mem_kib) + self.assertEquals(vm['memory_static_max'], mem_bytes) + self.assertEquals(vm['memory_dynamic_max'], mem_bytes) + self.assertEquals(vm['memory_dynamic_min'], mem_bytes) + self.assertEquals(vm['VCPUs_max'], str(vcpus)) + self.assertEquals(vm['VCPUs_at_startup'], str(vcpus)) + + # Check that the VM is running according to Nova + self.assertEquals(vm_info['state'], power_state.RUNNING) + + # Check that the VM is running according to XenAPI. + self.assertEquals(vm['power_state'], 'Running') + + check() + + def tearDown(self): + super(XenAPIVMTestCase, self).tearDown() + self.manager.delete_project(self.project) + self.manager.delete_user(self.user) + self.stubs.UnsetAll() diff --git a/nova/tests/xenapi/__init__.py b/nova/tests/xenapi/__init__.py new file mode 100644 index 000000000..1dd02bdc1 --- /dev/null +++ b/nova/tests/xenapi/__init__.py @@ -0,0 +1,20 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +:mod:`xenapi` -- Stubs for XenAPI +================================= +""" diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py new file mode 100644 index 000000000..1dacad6a3 --- /dev/null +++ b/nova/tests/xenapi/stubs.py @@ -0,0 +1,94 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Stubouts, mocks and fixtures for the test suite""" + +from nova.virt import xenapi_conn +from nova.virt.xenapi import fake + + +def stubout_session(stubs, cls): + """ Stubs out two methods from XenAPISession """ + def fake_import(self): + """ Stubs out get_imported_xenapi of XenAPISession """ + fake_module = 'nova.virt.xenapi.fake' + from_list = ['fake'] + return __import__(fake_module, globals(), locals(), from_list, -1) + + stubs.Set(xenapi_conn.XenAPISession, '_create_session', + lambda s, url: cls(url)) + stubs.Set(xenapi_conn.XenAPISession, 'get_imported_xenapi', + fake_import) + + +class FakeSessionForVMTests(fake.SessionBase): + """ Stubs out a XenAPISession for VM tests """ + def __init__(self, uri): + super(FakeSessionForVMTests, self).__init__(uri) + + def network_get_all_records_where(self, _1, _2): + return self.xenapi.network.get_all_records() + + def host_call_plugin(self, _1, _2, _3, _4, _5): + return '' + + def VM_start(self, _1, ref, _2, _3): + vm = fake.get_record('VM', ref) + if vm['power_state'] != 'Halted': + raise fake.Failure(['VM_BAD_POWER_STATE', ref, 'Halted', + vm['power_state']]) + vm['power_state'] = 'Running' + vm['is_a_template'] = False + vm['is_control_domain'] = False + + +class FakeSessionForVolumeTests(fake.SessionBase): + """ Stubs out a XenAPISession for Volume tests """ + def __init__(self, uri): + super(FakeSessionForVolumeTests, self).__init__(uri) + + def VBD_plug(self, _1, ref): + rec = fake.get_record('VBD', ref) + rec['currently-attached'] = True + + def VDI_introduce(self, _1, uuid, _2, _3, _4, _5, + _6, _7, _8, _9, _10, _11): + valid_vdi = False + refs = fake.get_all('VDI') + for ref in refs: + rec = fake.get_record('VDI', ref) + if rec['uuid'] == uuid: + valid_vdi = True + if not valid_vdi: + raise fake.Failure([['INVALID_VDI', 'session', self._session]]) + + +class FakeSessionForVolumeFailedTests(FakeSessionForVolumeTests): + """ Stubs out a XenAPISession for Volume tests: it injects failures """ + def __init__(self, uri): + super(FakeSessionForVolumeFailedTests, self).__init__(uri) + + def VDI_introduce(self, _1, uuid, _2, _3, _4, _5, + _6, _7, _8, _9, _10, _11): + # This is for testing failure + raise fake.Failure([['INVALID_VDI', 'session', self._session]]) + + def PBD_unplug(self, _1, ref): + rec = fake.get_record('PBD', ref) + rec['currently-attached'] = False + + def SR_forget(self, _1, ref): + pass |
