summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
authorIlya Alekseyev <ialekseev@griddynamics.com>2010-12-23 22:58:20 +0300
committerIlya Alekseyev <ialekseev@griddynamics.com>2010-12-23 22:58:20 +0300
commit57ead438d06dd5c6e98e971670f397bed5d7e29c (patch)
tree6a94815ea5d66930e4638769e463b7ec60c9cdc2 /nova/tests
parentd88817a360676173ac31566e13201d56f1e2b0b0 (diff)
parent75e2cbec9eb5132a49446f1b6d563d5f43d007de (diff)
Merge with trunk
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/api/__init__.py81
-rw-r--r--nova/tests/api/openstack/__init__.py13
-rw-r--r--nova/tests/api/openstack/fakes.py22
-rw-r--r--nova/tests/api/openstack/test_auth.py4
-rw-r--r--nova/tests/api/test.py81
-rw-r--r--nova/tests/api_integration.py54
-rw-r--r--nova/tests/db/__init__.py20
-rw-r--r--nova/tests/db/fakes.py75
-rw-r--r--nova/tests/test_access.py (renamed from nova/tests/access_unittest.py)0
-rw-r--r--nova/tests/test_api.py (renamed from nova/tests/api_unittest.py)0
-rw-r--r--nova/tests/test_auth.py (renamed from nova/tests/auth_unittest.py)0
-rw-r--r--nova/tests/test_cloud.py (renamed from nova/tests/cloud_unittest.py)30
-rw-r--r--nova/tests/test_compute.py (renamed from nova/tests/compute_unittest.py)1
-rw-r--r--nova/tests/test_flags.py (renamed from nova/tests/flags_unittest.py)0
-rw-r--r--nova/tests/test_middleware.py86
-rw-r--r--nova/tests/test_misc.py (renamed from nova/tests/misc_unittest.py)8
-rw-r--r--nova/tests/test_network.py (renamed from nova/tests/network_unittest.py)8
-rw-r--r--nova/tests/test_quota.py (renamed from nova/tests/quota_unittest.py)0
-rw-r--r--nova/tests/test_rpc.py (renamed from nova/tests/rpc_unittest.py)29
-rw-r--r--nova/tests/test_scheduler.py (renamed from nova/tests/scheduler_unittest.py)3
-rw-r--r--nova/tests/test_service.py (renamed from nova/tests/service_unittest.py)14
-rw-r--r--nova/tests/test_twistd.py (renamed from nova/tests/twistd_unittest.py)0
-rw-r--r--nova/tests/test_virt.py (renamed from nova/tests/virt_unittest.py)37
-rw-r--r--nova/tests/test_volume.py (renamed from nova/tests/volume_unittest.py)0
-rw-r--r--nova/tests/test_xenapi.py219
-rw-r--r--nova/tests/xenapi/__init__.py20
-rw-r--r--nova/tests/xenapi/stubs.py94
27 files changed, 697 insertions, 202 deletions
diff --git a/nova/tests/api/__init__.py b/nova/tests/api/__init__.py
index 9caa8c9d0..e69de29bb 100644
--- a/nova/tests/api/__init__.py
+++ b/nova/tests/api/__init__.py
@@ -1,81 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 OpenStack LLC.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Test for the root WSGI middleware for all API controllers.
-"""
-
-import unittest
-
-import stubout
-import webob
-import webob.dec
-
-import nova.exception
-from nova import api
-from nova.tests.api.fakes import APIStub
-
-
-class Test(unittest.TestCase):
-
- def setUp(self):
- self.stubs = stubout.StubOutForTesting()
-
- def tearDown(self):
- self.stubs.UnsetAll()
-
- def _request(self, url, subdomain, **kwargs):
- environ_keys = {'HTTP_HOST': '%s.example.com' % subdomain}
- environ_keys.update(kwargs)
- req = webob.Request.blank(url, environ_keys)
- return req.get_response(api.API('ec2'))
-
- def test_openstack(self):
- self.stubs.Set(api.openstack, 'API', APIStub)
- result = self._request('/v1.0/cloud', 'api')
- self.assertEqual(result.body, "/cloud")
-
- def test_ec2(self):
- self.stubs.Set(api.ec2, 'API', APIStub)
- result = self._request('/services/cloud', 'ec2')
- self.assertEqual(result.body, "/cloud")
-
- def test_not_found(self):
- self.stubs.Set(api.ec2, 'API', APIStub)
- self.stubs.Set(api.openstack, 'API', APIStub)
- result = self._request('/test/cloud', 'ec2')
- self.assertNotEqual(result.body, "/cloud")
-
- def test_query_api_versions(self):
- result = self._request('/', 'api')
- self.assertTrue('CURRENT' in result.body)
-
- def test_metadata(self):
- def go(url):
- result = self._request(url, 'ec2', REMOTE_ADDR='128.192.151.2')
- # Each should get to the ORM layer and fail to find the IP
- self.assertRaises(nova.exception.NotFound, go, '/latest/')
- self.assertRaises(nova.exception.NotFound, go, '/2009-04-04/')
- self.assertRaises(nova.exception.NotFound, go, '/1.0/')
-
- def test_ec2_root(self):
- result = self._request('/', 'ec2')
- self.assertTrue('2007-12-15\n' in result.body)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/nova/tests/api/openstack/__init__.py b/nova/tests/api/openstack/__init__.py
index 2e357febe..9e183bd0d 100644
--- a/nova/tests/api/openstack/__init__.py
+++ b/nova/tests/api/openstack/__init__.py
@@ -17,11 +17,16 @@
import unittest
-from nova.api.openstack import limited
-from nova.api.openstack import RateLimitingMiddleware
+from nova import context
+from nova import flags
+from nova.api.openstack.ratelimiting import RateLimitingMiddleware
+from nova.api.openstack.common import limited
from nova.tests.api.fakes import APIStub
+from nova import utils
from webob import Request
+FLAGS = flags.FLAGS
+
class RateLimitingMiddlewareTest(unittest.TestCase):
@@ -46,6 +51,8 @@ class RateLimitingMiddlewareTest(unittest.TestCase):
def exhaust(self, middleware, method, url, username, times):
req = Request.blank(url, dict(REQUEST_METHOD=method),
headers={'X-Auth-User': username})
+ req.environ['nova.context'] = context.RequestContext(username,
+ username)
for i in range(times):
resp = req.get_response(middleware)
self.assertEqual(resp.status_int, 200)
@@ -62,7 +69,7 @@ class RateLimitingMiddlewareTest(unittest.TestCase):
middleware = RateLimitingMiddleware(APIStub())
self.exhaust(middleware, 'POST', '/servers/4', 'usr1', 10)
self.exhaust(middleware, 'POST', '/images/4', 'usr2', 10)
- self.assertTrue(set(middleware.limiter._levels) ==
+ self.assertTrue(set(middleware.limiter._levels) == \
set(['usr1:POST', 'usr1:POST servers', 'usr2:POST']))
def test_POST_servers_action_correctly_ratelimited(self):
diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py
index 21b8aac1c..79663e43a 100644
--- a/nova/tests/api/openstack/fakes.py
+++ b/nova/tests/api/openstack/fakes.py
@@ -29,8 +29,11 @@ from nova import exception as exc
from nova import flags
from nova import utils
import nova.api.openstack.auth
-from nova.image import service
+from nova.api.openstack import auth
+from nova.api.openstack import ratelimiting
from nova.image import glance
+from nova.image import local
+from nova.image import service
from nova.tests import fake_flags
from nova.wsgi import Router
@@ -51,10 +54,11 @@ class FakeRouter(Router):
return res
-def fake_auth_init(self):
+def fake_auth_init(self, application):
self.db = FakeAuthDatabase()
self.context = Context()
self.auth = FakeAuthManager()
+ self.application = application
@webob.dec.wsgify
@@ -75,28 +79,28 @@ def stub_out_image_service(stubs):
def fake_image_show(meh, context, id):
return dict(kernelId=1, ramdiskId=1)
- stubs.Set(nova.image.local.LocalImageService, 'show', fake_image_show)
+ stubs.Set(local.LocalImageService, 'show', fake_image_show)
def stub_out_auth(stubs):
def fake_auth_init(self, app):
self.application = app
- stubs.Set(nova.api.openstack.AuthMiddleware,
+ stubs.Set(nova.api.openstack.auth.AuthMiddleware,
'__init__', fake_auth_init)
- stubs.Set(nova.api.openstack.AuthMiddleware,
+ stubs.Set(nova.api.openstack.auth.AuthMiddleware,
'__call__', fake_wsgi)
def stub_out_rate_limiting(stubs):
def fake_rate_init(self, app):
- super(nova.api.openstack.RateLimitingMiddleware, self).__init__(app)
+ super(ratelimiting.RateLimitingMiddleware, self).__init__(app)
self.application = app
- stubs.Set(nova.api.openstack.RateLimitingMiddleware,
+ stubs.Set(nova.api.openstack.ratelimiting.RateLimitingMiddleware,
'__init__', fake_rate_init)
- stubs.Set(nova.api.openstack.RateLimitingMiddleware,
+ stubs.Set(nova.api.openstack.ratelimiting.RateLimitingMiddleware,
'__call__', fake_wsgi)
@@ -173,7 +177,7 @@ class FakeToken(object):
class FakeRequestContext(object):
- def __init__(self, user, project):
+ def __init__(self, user, project, *args, **kwargs):
self.user_id = 1
self.project_id = 1
diff --git a/nova/tests/api/openstack/test_auth.py b/nova/tests/api/openstack/test_auth.py
index 7b427c2db..489a1dfbf 100644
--- a/nova/tests/api/openstack/test_auth.py
+++ b/nova/tests/api/openstack/test_auth.py
@@ -34,7 +34,7 @@ class Test(unittest.TestCase):
def setUp(self):
self.stubs = stubout.StubOutForTesting()
- self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager,
+ self.stubs.Set(nova.api.openstack.auth.AuthMiddleware,
'__init__', fakes.fake_auth_init)
self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext)
fakes.FakeAuthManager.auth_data = {}
@@ -131,7 +131,7 @@ class Test(unittest.TestCase):
class TestLimiter(unittest.TestCase):
def setUp(self):
self.stubs = stubout.StubOutForTesting()
- self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager,
+ self.stubs.Set(nova.api.openstack.auth.AuthMiddleware,
'__init__', fakes.fake_auth_init)
self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext)
fakes.FakeAuthManager.auth_data = {}
diff --git a/nova/tests/api/test.py b/nova/tests/api/test.py
new file mode 100644
index 000000000..9caa8c9d0
--- /dev/null
+++ b/nova/tests/api/test.py
@@ -0,0 +1,81 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Test for the root WSGI middleware for all API controllers.
+"""
+
+import unittest
+
+import stubout
+import webob
+import webob.dec
+
+import nova.exception
+from nova import api
+from nova.tests.api.fakes import APIStub
+
+
+class Test(unittest.TestCase):
+
+ def setUp(self):
+ self.stubs = stubout.StubOutForTesting()
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+
+ def _request(self, url, subdomain, **kwargs):
+ environ_keys = {'HTTP_HOST': '%s.example.com' % subdomain}
+ environ_keys.update(kwargs)
+ req = webob.Request.blank(url, environ_keys)
+ return req.get_response(api.API('ec2'))
+
+ def test_openstack(self):
+ self.stubs.Set(api.openstack, 'API', APIStub)
+ result = self._request('/v1.0/cloud', 'api')
+ self.assertEqual(result.body, "/cloud")
+
+ def test_ec2(self):
+ self.stubs.Set(api.ec2, 'API', APIStub)
+ result = self._request('/services/cloud', 'ec2')
+ self.assertEqual(result.body, "/cloud")
+
+ def test_not_found(self):
+ self.stubs.Set(api.ec2, 'API', APIStub)
+ self.stubs.Set(api.openstack, 'API', APIStub)
+ result = self._request('/test/cloud', 'ec2')
+ self.assertNotEqual(result.body, "/cloud")
+
+ def test_query_api_versions(self):
+ result = self._request('/', 'api')
+ self.assertTrue('CURRENT' in result.body)
+
+ def test_metadata(self):
+ def go(url):
+ result = self._request(url, 'ec2', REMOTE_ADDR='128.192.151.2')
+ # Each should get to the ORM layer and fail to find the IP
+ self.assertRaises(nova.exception.NotFound, go, '/latest/')
+ self.assertRaises(nova.exception.NotFound, go, '/2009-04-04/')
+ self.assertRaises(nova.exception.NotFound, go, '/1.0/')
+
+ def test_ec2_root(self):
+ result = self._request('/', 'ec2')
+ self.assertTrue('2007-12-15\n' in result.body)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/nova/tests/api_integration.py b/nova/tests/api_integration.py
deleted file mode 100644
index 54403c655..000000000
--- a/nova/tests/api_integration.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-import boto
-from boto.ec2.regioninfo import RegionInfo
-import unittest
-
-
-ACCESS_KEY = 'fake'
-SECRET_KEY = 'fake'
-CLC_IP = '127.0.0.1'
-CLC_PORT = 8773
-REGION = 'test'
-
-
-def get_connection():
- return boto.connect_ec2(
- aws_access_key_id=ACCESS_KEY,
- aws_secret_access_key=SECRET_KEY,
- is_secure=False,
- region=RegionInfo(None, REGION, CLC_IP),
- port=CLC_PORT,
- path='/services/Cloud',
- debug=99)
-
-
-class APIIntegrationTests(unittest.TestCase):
- def test_001_get_all_images(self):
- conn = get_connection()
- res = conn.get_all_images()
-
-
-if __name__ == '__main__':
- unittest.main()
-
-#print conn.get_all_key_pairs()
-#print conn.create_key_pair
-#print conn.create_security_group('name', 'description')
diff --git a/nova/tests/db/__init__.py b/nova/tests/db/__init__.py
new file mode 100644
index 000000000..2d43aac42
--- /dev/null
+++ b/nova/tests/db/__init__.py
@@ -0,0 +1,20 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+:mod:`db` -- Stubs for DB API
+=============================
+"""
diff --git a/nova/tests/db/fakes.py b/nova/tests/db/fakes.py
new file mode 100644
index 000000000..05bdd172e
--- /dev/null
+++ b/nova/tests/db/fakes.py
@@ -0,0 +1,75 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 OpenStack, LLC
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Stubouts, mocks and fixtures for the test suite"""
+
+import time
+
+from nova import db
+from nova import utils
+from nova.compute import instance_types
+
+
+def stub_out_db_instance_api(stubs):
+ """ Stubs out the db API for creating Instances """
+
+ class FakeModel(object):
+ """ Stubs out for model """
+ def __init__(self, values):
+ self.values = values
+
+ def __getattr__(self, name):
+ return self.values[name]
+
+ def __getitem__(self, key):
+ if key in self.values:
+ return self.values[key]
+ else:
+ raise NotImplementedError()
+
+ def fake_instance_create(values):
+ """ Stubs out the db.instance_create method """
+
+ type_data = instance_types.INSTANCE_TYPES[values['instance_type']]
+
+ base_options = {
+ 'name': values['name'],
+ 'id': values['id'],
+ 'reservation_id': utils.generate_uid('r'),
+ 'image_id': values['image_id'],
+ 'kernel_id': values['kernel_id'],
+ 'ramdisk_id': values['ramdisk_id'],
+ 'state_description': 'scheduling',
+ 'user_id': values['user_id'],
+ 'project_id': values['project_id'],
+ 'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
+ 'instance_type': values['instance_type'],
+ 'memory_mb': type_data['memory_mb'],
+ 'mac_address': values['mac_address'],
+ 'vcpus': type_data['vcpus'],
+ 'local_gb': type_data['local_gb'],
+ }
+ return FakeModel(base_options)
+
+ def fake_network_get_by_instance(context, instance_id):
+ fields = {
+ 'bridge': 'xenbr0',
+ }
+ return FakeModel(fields)
+
+ stubs.Set(db, 'instance_create', fake_instance_create)
+ stubs.Set(db, 'network_get_by_instance', fake_network_get_by_instance)
diff --git a/nova/tests/access_unittest.py b/nova/tests/test_access.py
index 58fdea3b5..58fdea3b5 100644
--- a/nova/tests/access_unittest.py
+++ b/nova/tests/test_access.py
diff --git a/nova/tests/api_unittest.py b/nova/tests/test_api.py
index 33d4cb294..33d4cb294 100644
--- a/nova/tests/api_unittest.py
+++ b/nova/tests/test_api.py
diff --git a/nova/tests/auth_unittest.py b/nova/tests/test_auth.py
index 15d40bc53..15d40bc53 100644
--- a/nova/tests/auth_unittest.py
+++ b/nova/tests/test_auth.py
diff --git a/nova/tests/cloud_unittest.py b/nova/tests/test_cloud.py
index 53a762310..70d2c44da 100644
--- a/nova/tests/cloud_unittest.py
+++ b/nova/tests/test_cloud.py
@@ -22,20 +22,18 @@ import logging
from M2Crypto import BIO
from M2Crypto import RSA
import os
-import StringIO
import tempfile
import time
from eventlet import greenthread
-from xml.etree import ElementTree
from nova import context
from nova import crypto
from nova import db
from nova import flags
from nova import rpc
+from nova import service
from nova import test
-from nova import utils
from nova.auth import manager
from nova.compute import power_state
from nova.api.ec2 import cloud
@@ -54,7 +52,8 @@ os.makedirs(IMAGES_PATH)
class CloudTestCase(test.TestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
- self.flags(connection_type='fake', images_path=IMAGES_PATH)
+ self.flags(connection_type='fake',
+ images_path=IMAGES_PATH)
self.conn = rpc.Connection.instance()
logging.getLogger().setLevel(logging.DEBUG)
@@ -62,27 +61,23 @@ class CloudTestCase(test.TestCase):
# set up our cloud
self.cloud = cloud.CloudController()
- # set up a service
- self.compute = utils.import_object(FLAGS.compute_manager)
- self.compute_consumer = rpc.AdapterConsumer(connection=self.conn,
- topic=FLAGS.compute_topic,
- proxy=self.compute)
- self.compute_consumer.attach_to_eventlet()
- self.network = utils.import_object(FLAGS.network_manager)
- self.network_consumer = rpc.AdapterConsumer(connection=self.conn,
- topic=FLAGS.network_topic,
- proxy=self.network)
- self.network_consumer.attach_to_eventlet()
+ # set up services
+ self.compute = service.Service.create(binary='nova-compute')
+ self.compute.start()
+ self.network = service.Service.create(binary='nova-network')
+ self.network.start()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('admin', 'admin', 'admin', True)
self.project = self.manager.create_project('proj', 'admin', 'proj')
self.context = context.RequestContext(user=self.user,
- project=self.project)
+ project=self.project)
def tearDown(self):
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
+ self.compute.kill()
+ self.network.kill()
super(CloudTestCase, self).tearDown()
def _create_key(self, name):
@@ -109,12 +104,13 @@ class CloudTestCase(test.TestCase):
{'address': address,
'host': FLAGS.host})
self.cloud.allocate_address(self.context)
- inst = db.instance_create(self.context, {})
+ inst = db.instance_create(self.context, {'host': FLAGS.host})
fixed = self.network.allocate_fixed_ip(self.context, inst['id'])
ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id'])
self.cloud.associate_address(self.context,
instance_id=ec2_id,
public_ip=address)
+ greenthread.sleep(0.3)
self.cloud.disassociate_address(self.context,
public_ip=address)
self.cloud.release_address(self.context,
diff --git a/nova/tests/compute_unittest.py b/nova/tests/test_compute.py
index 187ca31de..348bb3351 100644
--- a/nova/tests/compute_unittest.py
+++ b/nova/tests/test_compute.py
@@ -41,6 +41,7 @@ class ComputeTestCase(test.TestCase):
logging.getLogger().setLevel(logging.DEBUG)
super(ComputeTestCase, self).setUp()
self.flags(connection_type='fake',
+ stub_network=True,
network_manager='nova.network.manager.FlatManager')
self.compute = utils.import_object(FLAGS.compute_manager)
self.compute_api = compute_api.ComputeAPI()
diff --git a/nova/tests/flags_unittest.py b/nova/tests/test_flags.py
index 707300fcf..707300fcf 100644
--- a/nova/tests/flags_unittest.py
+++ b/nova/tests/test_flags.py
diff --git a/nova/tests/test_middleware.py b/nova/tests/test_middleware.py
new file mode 100644
index 000000000..0febf52d6
--- /dev/null
+++ b/nova/tests/test_middleware.py
@@ -0,0 +1,86 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import webob
+import webob.dec
+import webob.exc
+
+from nova.api import ec2
+from nova import flags
+from nova import test
+from nova import utils
+
+
+FLAGS = flags.FLAGS
+
+
+@webob.dec.wsgify
+def conditional_forbid(req):
+ """Helper wsgi app returns 403 if param 'die' is 1."""
+ if 'die' in req.params and req.params['die'] == '1':
+ raise webob.exc.HTTPForbidden()
+ return 'OK'
+
+
+class LockoutTestCase(test.TrialTestCase):
+ """Test case for the Lockout middleware."""
+ def setUp(self): # pylint: disable-msg=C0103
+ super(LockoutTestCase, self).setUp()
+ utils.set_time_override()
+ self.lockout = ec2.Lockout(conditional_forbid)
+
+ def tearDown(self): # pylint: disable-msg=C0103
+ utils.clear_time_override()
+ super(LockoutTestCase, self).tearDown()
+
+ def _send_bad_attempts(self, access_key, num_attempts=1):
+ """Fail x."""
+ for i in xrange(num_attempts):
+ req = webob.Request.blank('/?AWSAccessKeyId=%s&die=1' % access_key)
+ self.assertEqual(req.get_response(self.lockout).status_int, 403)
+
+ def _is_locked_out(self, access_key):
+ """Sends a test request to see if key is locked out."""
+ req = webob.Request.blank('/?AWSAccessKeyId=%s' % access_key)
+ return (req.get_response(self.lockout).status_int == 403)
+
+ def test_lockout(self):
+ self._send_bad_attempts('test', FLAGS.lockout_attempts)
+ self.assertTrue(self._is_locked_out('test'))
+
+ def test_timeout(self):
+ self._send_bad_attempts('test', FLAGS.lockout_attempts)
+ self.assertTrue(self._is_locked_out('test'))
+ utils.advance_time_seconds(FLAGS.lockout_minutes * 60)
+ self.assertFalse(self._is_locked_out('test'))
+
+ def test_multiple_keys(self):
+ self._send_bad_attempts('test1', FLAGS.lockout_attempts)
+ self.assertTrue(self._is_locked_out('test1'))
+ self.assertFalse(self._is_locked_out('test2'))
+ utils.advance_time_seconds(FLAGS.lockout_minutes * 60)
+ self.assertFalse(self._is_locked_out('test1'))
+ self.assertFalse(self._is_locked_out('test2'))
+
+ def test_window_timeout(self):
+ self._send_bad_attempts('test', FLAGS.lockout_attempts - 1)
+ self.assertFalse(self._is_locked_out('test'))
+ utils.advance_time_seconds(FLAGS.lockout_window * 60)
+ self._send_bad_attempts('test', FLAGS.lockout_attempts - 1)
+ self.assertFalse(self._is_locked_out('test'))
diff --git a/nova/tests/misc_unittest.py b/nova/tests/test_misc.py
index 3d947427a..33c1777d5 100644
--- a/nova/tests/misc_unittest.py
+++ b/nova/tests/test_misc.py
@@ -22,13 +22,13 @@ from nova.utils import parse_mailmap, str_dict_replace
class ProjectTestCase(test.TestCase):
def test_authors_up_to_date(self):
- if os.path.exists('../.bzr'):
+ if os.path.exists('.bzr'):
contributors = set()
- mailmap = parse_mailmap('../.mailmap')
+ mailmap = parse_mailmap('.mailmap')
import bzrlib.workingtree
- tree = bzrlib.workingtree.WorkingTree.open('..')
+ tree = bzrlib.workingtree.WorkingTree.open('.')
tree.lock_read()
try:
parents = tree.get_parent_ids()
@@ -42,7 +42,7 @@ class ProjectTestCase(test.TestCase):
email = author.split(' ')[-1]
contributors.add(str_dict_replace(email, mailmap))
- authors_file = open('../Authors', 'r').read()
+ authors_file = open('Authors', 'r').read()
missing = set()
for contributor in contributors:
diff --git a/nova/tests/network_unittest.py b/nova/tests/test_network.py
index bcac20585..96473ac7c 100644
--- a/nova/tests/network_unittest.py
+++ b/nova/tests/test_network.py
@@ -26,6 +26,7 @@ from nova import context
from nova import db
from nova import exception
from nova import flags
+from nova import service
from nova import test
from nova import utils
from nova.auth import manager
@@ -40,6 +41,7 @@ class NetworkTestCase(test.TestCase):
# NOTE(vish): if you change these flags, make sure to change the
# flags in the corresponding section in nova-dhcpbridge
self.flags(connection_type='fake',
+ fake_call=True,
fake_network=True,
network_size=16,
num_networks=5)
@@ -56,16 +58,13 @@ class NetworkTestCase(test.TestCase):
# create the necessary network data for the project
user_context = context.RequestContext(project=self.projects[i],
user=self.user)
- network_ref = self.network.get_network(user_context)
- self.network.set_network_host(context.get_admin_context(),
- network_ref['id'])
+ host = self.network.get_network_host(user_context.elevated())
instance_ref = self._create_instance(0)
self.instance_id = instance_ref['id']
instance_ref = self._create_instance(1)
self.instance2_id = instance_ref['id']
def tearDown(self):
- super(NetworkTestCase, self).tearDown()
# TODO(termie): this should really be instantiating clean datastores
# in between runs, one failure kills all the tests
db.instance_destroy(context.get_admin_context(), self.instance_id)
@@ -73,6 +72,7 @@ class NetworkTestCase(test.TestCase):
for project in self.projects:
self.manager.delete_project(project)
self.manager.delete_user(self.user)
+ super(NetworkTestCase, self).tearDown()
def _create_instance(self, project_num, mac=None):
if not mac:
diff --git a/nova/tests/quota_unittest.py b/nova/tests/test_quota.py
index 8cf2a5e54..8cf2a5e54 100644
--- a/nova/tests/quota_unittest.py
+++ b/nova/tests/test_quota.py
diff --git a/nova/tests/rpc_unittest.py b/nova/tests/test_rpc.py
index a2495e65a..6ea2edcab 100644
--- a/nova/tests/rpc_unittest.py
+++ b/nova/tests/test_rpc.py
@@ -33,7 +33,7 @@ class RpcTestCase(test.TestCase):
"""Test cases for rpc"""
def setUp(self):
super(RpcTestCase, self).setUp()
- self.conn = rpc.Connection.instance()
+ self.conn = rpc.Connection.instance(True)
self.receiver = TestReceiver()
self.consumer = rpc.AdapterConsumer(connection=self.conn,
topic='test',
@@ -79,6 +79,33 @@ class RpcTestCase(test.TestCase):
except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value)
+ def test_nested_calls(self):
+ """Test that we can do an rpc.call inside another call"""
+ class Nested(object):
+ @staticmethod
+ def echo(context, queue, value):
+ """Calls echo in the passed queue"""
+ logging.debug("Nested received %s, %s", queue, value)
+ ret = rpc.call(context,
+ queue,
+ {"method": "echo",
+ "args": {"value": value}})
+ logging.debug("Nested return %s", ret)
+ return value
+
+ nested = Nested()
+ conn = rpc.Connection.instance(True)
+ consumer = rpc.AdapterConsumer(connection=conn,
+ topic='nested',
+ proxy=nested)
+ consumer.attach_to_eventlet()
+ value = 42
+ result = rpc.call(self.context,
+ 'nested', {"method": "echo",
+ "args": {"queue": "test",
+ "value": value}})
+ self.assertEqual(value, result)
+
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call
diff --git a/nova/tests/scheduler_unittest.py b/nova/tests/test_scheduler.py
index dbc195a6e..487e7b9e6 100644
--- a/nova/tests/scheduler_unittest.py
+++ b/nova/tests/test_scheduler.py
@@ -51,7 +51,7 @@ class SchedulerTestCase(test.TestCase):
"""Test case for scheduler"""
def setUp(self):
super(SchedulerTestCase, self).setUp()
- self.flags(scheduler_driver='nova.tests.scheduler_unittest.TestDriver')
+ self.flags(scheduler_driver='nova.tests.test_scheduler.TestDriver')
def test_fallback(self):
scheduler = manager.SchedulerManager()
@@ -121,6 +121,7 @@ class SimpleDriverTestCase(test.TestCase):
def setUp(self):
super(SimpleDriverTestCase, self).setUp()
self.flags(connection_type='fake',
+ stub_network=True,
max_cores=4,
max_gigabytes=4,
network_manager='nova.network.manager.FlatManager',
diff --git a/nova/tests/service_unittest.py b/nova/tests/test_service.py
index 47c092f8e..b30838ad7 100644
--- a/nova/tests/service_unittest.py
+++ b/nova/tests/test_service.py
@@ -30,7 +30,7 @@ from nova import service
from nova import manager
FLAGS = flags.FLAGS
-flags.DEFINE_string("fake_manager", "nova.tests.service_unittest.FakeManager",
+flags.DEFINE_string("fake_manager", "nova.tests.test_service.FakeManager",
"Manager for testing")
@@ -52,14 +52,14 @@ class ServiceManagerTestCase(test.TestCase):
serv = service.Service('test',
'test',
'test',
- 'nova.tests.service_unittest.FakeManager')
+ 'nova.tests.test_service.FakeManager')
self.assertRaises(AttributeError, getattr, serv, 'test_method')
def test_message_gets_to_manager(self):
serv = service.Service('test',
'test',
'test',
- 'nova.tests.service_unittest.FakeManager')
+ 'nova.tests.test_service.FakeManager')
serv.start()
self.assertEqual(serv.test_method(), 'manager')
@@ -67,7 +67,7 @@ class ServiceManagerTestCase(test.TestCase):
serv = ExtendedService('test',
'test',
'test',
- 'nova.tests.service_unittest.FakeManager')
+ 'nova.tests.test_service.FakeManager')
serv.start()
self.assertEqual(serv.test_method(), 'service')
@@ -156,7 +156,7 @@ class ServiceTestCase(test.TestCase):
serv = service.Service(host,
binary,
topic,
- 'nova.tests.service_unittest.FakeManager')
+ 'nova.tests.test_service.FakeManager')
serv.start()
serv.report_state()
@@ -186,7 +186,7 @@ class ServiceTestCase(test.TestCase):
serv = service.Service(host,
binary,
topic,
- 'nova.tests.service_unittest.FakeManager')
+ 'nova.tests.test_service.FakeManager')
serv.start()
serv.report_state()
self.assert_(serv.model_disconnected)
@@ -219,7 +219,7 @@ class ServiceTestCase(test.TestCase):
serv = service.Service(host,
binary,
topic,
- 'nova.tests.service_unittest.FakeManager')
+ 'nova.tests.test_service.FakeManager')
serv.start()
serv.model_disconnected = True
serv.report_state()
diff --git a/nova/tests/twistd_unittest.py b/nova/tests/test_twistd.py
index 75007b9c8..75007b9c8 100644
--- a/nova/tests/twistd_unittest.py
+++ b/nova/tests/test_twistd.py
diff --git a/nova/tests/virt_unittest.py b/nova/tests/test_virt.py
index cb35db1e1..8dab8de2f 100644
--- a/nova/tests/virt_unittest.py
+++ b/nova/tests/test_virt.py
@@ -33,6 +33,7 @@ flags.DECLARE('instances_path', 'nova.compute.manager')
class LibvirtConnTestCase(test.TestCase):
def setUp(self):
super(LibvirtConnTestCase, self).setUp()
+ self.flags(fake_call=True)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
@@ -52,45 +53,43 @@ class LibvirtConnTestCase(test.TestCase):
def test_xml_and_uri_no_ramdisk_no_kernel(self):
instance_data = dict(self.test_instance)
- self.do_test_xml_and_uri(instance_data,
- expect_kernel=False, expect_ramdisk=False)
+ self._check_xml_and_uri(instance_data,
+ expect_kernel=False, expect_ramdisk=False)
def test_xml_and_uri_no_ramdisk(self):
instance_data = dict(self.test_instance)
instance_data['kernel_id'] = 'aki-deadbeef'
- self.do_test_xml_and_uri(instance_data,
- expect_kernel=True, expect_ramdisk=False)
+ self._check_xml_and_uri(instance_data,
+ expect_kernel=True, expect_ramdisk=False)
def test_xml_and_uri_no_kernel(self):
instance_data = dict(self.test_instance)
instance_data['ramdisk_id'] = 'ari-deadbeef'
- self.do_test_xml_and_uri(instance_data,
- expect_kernel=False, expect_ramdisk=False)
+ self._check_xml_and_uri(instance_data,
+ expect_kernel=False, expect_ramdisk=False)
def test_xml_and_uri(self):
instance_data = dict(self.test_instance)
instance_data['ramdisk_id'] = 'ari-deadbeef'
instance_data['kernel_id'] = 'aki-deadbeef'
- self.do_test_xml_and_uri(instance_data,
- expect_kernel=True, expect_ramdisk=True)
+ self._check_xml_and_uri(instance_data,
+ expect_kernel=True, expect_ramdisk=True)
def test_xml_and_uri_rescue(self):
instance_data = dict(self.test_instance)
instance_data['ramdisk_id'] = 'ari-deadbeef'
instance_data['kernel_id'] = 'aki-deadbeef'
- self.do_test_xml_and_uri(instance_data,
- expect_kernel=True, expect_ramdisk=True,
- rescue=True)
+ self._check_xml_and_uri(instance_data, expect_kernel=True,
+ expect_ramdisk=True, rescue=True)
- def do_test_xml_and_uri(self, instance,
- expect_ramdisk, expect_kernel,
- rescue=False):
+ def _check_xml_and_uri(self, instance, expect_ramdisk, expect_kernel,
+ rescue=False):
user_context = context.RequestContext(project=self.project,
user=self.user)
instance_ref = db.instance_create(user_context, instance)
- network_ref = self.network.get_network(user_context)
- self.network.set_network_host(context.get_admin_context(),
- network_ref['id'])
+ host = self.network.get_network_host(user_context.elevated())
+ network_ref = db.project_get_network(context.get_admin_context(),
+ self.project.id)
fixed_ip = {'address': self.test_ip,
'network_id': network_ref['id']}
@@ -338,7 +337,7 @@ class NWFilterTestCase(test.TestCase):
self.security_group.id)
instance = db.instance_get(self.context, inst_id)
- d = self.fw.setup_nwfilters_for_instance(instance)
+ self.fw.setup_base_nwfilters()
+ self.fw.setup_nwfilters_for_instance(instance)
_ensure_all_called()
self.teardown_security_group()
- return d
diff --git a/nova/tests/volume_unittest.py b/nova/tests/test_volume.py
index b13455fb0..b13455fb0 100644
--- a/nova/tests/volume_unittest.py
+++ b/nova/tests/test_volume.py
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
new file mode 100644
index 000000000..b5d3ea395
--- /dev/null
+++ b/nova/tests/test_xenapi.py
@@ -0,0 +1,219 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Test suite for XenAPI
+"""
+
+import stubout
+
+from nova import db
+from nova import context
+from nova import flags
+from nova import test
+from nova import utils
+from nova.auth import manager
+from nova.compute import instance_types
+from nova.compute import power_state
+from nova.virt import xenapi_conn
+from nova.virt.xenapi import fake
+from nova.virt.xenapi import volume_utils
+from nova.tests.db import fakes
+from nova.tests.xenapi import stubs
+
+FLAGS = flags.FLAGS
+
+
+class XenAPIVolumeTestCase(test.TestCase):
+ """
+ Unit tests for Volume operations
+ """
+ def setUp(self):
+ super(XenAPIVolumeTestCase, self).setUp()
+ self.stubs = stubout.StubOutForTesting()
+ FLAGS.target_host = '127.0.0.1'
+ FLAGS.xenapi_connection_url = 'test_url'
+ FLAGS.xenapi_connection_password = 'test_pass'
+ fakes.stub_out_db_instance_api(self.stubs)
+ fake.reset()
+ self.values = {'name': 1, 'id': 1,
+ 'project_id': 'fake',
+ 'user_id': 'fake',
+ 'image_id': 1,
+ 'kernel_id': 2,
+ 'ramdisk_id': 3,
+ 'instance_type': 'm1.large',
+ 'mac_address': 'aa:bb:cc:dd:ee:ff',
+ }
+
+ def _create_volume(self, size='0'):
+ """Create a volume object."""
+ vol = {}
+ vol['size'] = size
+ vol['user_id'] = 'fake'
+ vol['project_id'] = 'fake'
+ vol['host'] = 'localhost'
+ vol['availability_zone'] = FLAGS.storage_availability_zone
+ vol['status'] = "creating"
+ vol['attach_status'] = "detached"
+ return db.volume_create(context.get_admin_context(), vol)
+
+ def test_create_iscsi_storage(self):
+ """ This shows how to test helper classes' methods """
+ stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests)
+ session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass')
+ helper = volume_utils.VolumeHelper
+ helper.XenAPI = session.get_imported_xenapi()
+ vol = self._create_volume()
+ info = helper.parse_volume_info(vol['ec2_id'], '/dev/sdc')
+ label = 'SR-%s' % vol['ec2_id']
+ description = 'Test-SR'
+ sr_ref = helper.create_iscsi_storage(session, info, label, description)
+ srs = fake.get_all('SR')
+ self.assertEqual(sr_ref, srs[0])
+ db.volume_destroy(context.get_admin_context(), vol['id'])
+
+ def test_parse_volume_info_raise_exception(self):
+ """ This shows how to test helper classes' methods """
+ stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests)
+ session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass')
+ helper = volume_utils.VolumeHelper
+ helper.XenAPI = session.get_imported_xenapi()
+ vol = self._create_volume()
+ # oops, wrong mount point!
+ self.assertRaises(volume_utils.StorageError,
+ helper.parse_volume_info,
+ vol['ec2_id'],
+ '/dev/sd')
+ db.volume_destroy(context.get_admin_context(), vol['id'])
+
+ def test_attach_volume(self):
+ """ This shows how to test Ops classes' methods """
+ stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests)
+ conn = xenapi_conn.get_connection(False)
+ volume = self._create_volume()
+ instance = db.instance_create(self.values)
+ fake.create_vm(instance.name, 'Running')
+ result = conn.attach_volume(instance.name, volume['ec2_id'],
+ '/dev/sdc')
+
+ def check():
+ # check that the VM has a VBD attached to it
+ # Get XenAPI reference for the VM
+ vms = fake.get_all('VM')
+ # Get XenAPI record for VBD
+ vbds = fake.get_all('VBD')
+ vbd = fake.get_record('VBD', vbds[0])
+ vm_ref = vbd['VM']
+ self.assertEqual(vm_ref, vms[0])
+
+ check()
+
+ def test_attach_volume_raise_exception(self):
+ """ This shows how to test when exceptions are raised """
+ stubs.stubout_session(self.stubs,
+ stubs.FakeSessionForVolumeFailedTests)
+ conn = xenapi_conn.get_connection(False)
+ volume = self._create_volume()
+ instance = db.instance_create(self.values)
+ fake.create_vm(instance.name, 'Running')
+ self.assertRaises(Exception,
+ conn.attach_volume,
+ instance.name,
+ volume['ec2_id'],
+ '/dev/sdc')
+
+ def tearDown(self):
+ super(XenAPIVolumeTestCase, self).tearDown()
+ self.stubs.UnsetAll()
+
+
+class XenAPIVMTestCase(test.TestCase):
+ """
+ Unit tests for VM operations
+ """
+ def setUp(self):
+ super(XenAPIVMTestCase, self).setUp()
+ self.manager = manager.AuthManager()
+ self.user = self.manager.create_user('fake', 'fake', 'fake',
+ admin=True)
+ self.project = self.manager.create_project('fake', 'fake', 'fake')
+ self.network = utils.import_object(FLAGS.network_manager)
+ self.stubs = stubout.StubOutForTesting()
+ FLAGS.xenapi_connection_url = 'test_url'
+ FLAGS.xenapi_connection_password = 'test_pass'
+ fake.reset()
+ fakes.stub_out_db_instance_api(self.stubs)
+ fake.create_network('fake', FLAGS.flat_network_bridge)
+
+ def test_list_instances_0(self):
+ stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
+ conn = xenapi_conn.get_connection(False)
+ instances = conn.list_instances()
+ self.assertEquals(instances, [])
+
+ def test_spawn(self):
+ stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
+ values = {'name': 1, 'id': 1,
+ 'project_id': self.project.id,
+ 'user_id': self.user.id,
+ 'image_id': 1,
+ 'kernel_id': 2,
+ 'ramdisk_id': 3,
+ 'instance_type': 'm1.large',
+ 'mac_address': 'aa:bb:cc:dd:ee:ff',
+ }
+ conn = xenapi_conn.get_connection(False)
+ instance = db.instance_create(values)
+ conn.spawn(instance)
+
+ def check():
+ instances = conn.list_instances()
+ self.assertEquals(instances, [1])
+
+ # Get Nova record for VM
+ vm_info = conn.get_info(1)
+
+ # Get XenAPI record for VM
+ vms = fake.get_all('VM')
+ vm = fake.get_record('VM', vms[0])
+
+ # Check that m1.large above turned into the right thing.
+ instance_type = instance_types.INSTANCE_TYPES['m1.large']
+ mem_kib = long(instance_type['memory_mb']) << 10
+ mem_bytes = str(mem_kib << 10)
+ vcpus = instance_type['vcpus']
+ self.assertEquals(vm_info['max_mem'], mem_kib)
+ self.assertEquals(vm_info['mem'], mem_kib)
+ self.assertEquals(vm['memory_static_max'], mem_bytes)
+ self.assertEquals(vm['memory_dynamic_max'], mem_bytes)
+ self.assertEquals(vm['memory_dynamic_min'], mem_bytes)
+ self.assertEquals(vm['VCPUs_max'], str(vcpus))
+ self.assertEquals(vm['VCPUs_at_startup'], str(vcpus))
+
+ # Check that the VM is running according to Nova
+ self.assertEquals(vm_info['state'], power_state.RUNNING)
+
+ # Check that the VM is running according to XenAPI.
+ self.assertEquals(vm['power_state'], 'Running')
+
+ check()
+
+ def tearDown(self):
+ super(XenAPIVMTestCase, self).tearDown()
+ self.manager.delete_project(self.project)
+ self.manager.delete_user(self.user)
+ self.stubs.UnsetAll()
diff --git a/nova/tests/xenapi/__init__.py b/nova/tests/xenapi/__init__.py
new file mode 100644
index 000000000..1dd02bdc1
--- /dev/null
+++ b/nova/tests/xenapi/__init__.py
@@ -0,0 +1,20 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+:mod:`xenapi` -- Stubs for XenAPI
+=================================
+"""
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
new file mode 100644
index 000000000..1dacad6a3
--- /dev/null
+++ b/nova/tests/xenapi/stubs.py
@@ -0,0 +1,94 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Stubouts, mocks and fixtures for the test suite"""
+
+from nova.virt import xenapi_conn
+from nova.virt.xenapi import fake
+
+
+def stubout_session(stubs, cls):
+ """ Stubs out two methods from XenAPISession """
+ def fake_import(self):
+ """ Stubs out get_imported_xenapi of XenAPISession """
+ fake_module = 'nova.virt.xenapi.fake'
+ from_list = ['fake']
+ return __import__(fake_module, globals(), locals(), from_list, -1)
+
+ stubs.Set(xenapi_conn.XenAPISession, '_create_session',
+ lambda s, url: cls(url))
+ stubs.Set(xenapi_conn.XenAPISession, 'get_imported_xenapi',
+ fake_import)
+
+
+class FakeSessionForVMTests(fake.SessionBase):
+ """ Stubs out a XenAPISession for VM tests """
+ def __init__(self, uri):
+ super(FakeSessionForVMTests, self).__init__(uri)
+
+ def network_get_all_records_where(self, _1, _2):
+ return self.xenapi.network.get_all_records()
+
+ def host_call_plugin(self, _1, _2, _3, _4, _5):
+ return ''
+
+ def VM_start(self, _1, ref, _2, _3):
+ vm = fake.get_record('VM', ref)
+ if vm['power_state'] != 'Halted':
+ raise fake.Failure(['VM_BAD_POWER_STATE', ref, 'Halted',
+ vm['power_state']])
+ vm['power_state'] = 'Running'
+ vm['is_a_template'] = False
+ vm['is_control_domain'] = False
+
+
+class FakeSessionForVolumeTests(fake.SessionBase):
+ """ Stubs out a XenAPISession for Volume tests """
+ def __init__(self, uri):
+ super(FakeSessionForVolumeTests, self).__init__(uri)
+
+ def VBD_plug(self, _1, ref):
+ rec = fake.get_record('VBD', ref)
+ rec['currently-attached'] = True
+
+ def VDI_introduce(self, _1, uuid, _2, _3, _4, _5,
+ _6, _7, _8, _9, _10, _11):
+ valid_vdi = False
+ refs = fake.get_all('VDI')
+ for ref in refs:
+ rec = fake.get_record('VDI', ref)
+ if rec['uuid'] == uuid:
+ valid_vdi = True
+ if not valid_vdi:
+ raise fake.Failure([['INVALID_VDI', 'session', self._session]])
+
+
+class FakeSessionForVolumeFailedTests(FakeSessionForVolumeTests):
+ """ Stubs out a XenAPISession for Volume tests: it injects failures """
+ def __init__(self, uri):
+ super(FakeSessionForVolumeFailedTests, self).__init__(uri)
+
+ def VDI_introduce(self, _1, uuid, _2, _3, _4, _5,
+ _6, _7, _8, _9, _10, _11):
+ # This is for testing failure
+ raise fake.Failure([['INVALID_VDI', 'session', self._session]])
+
+ def PBD_unplug(self, _1, ref):
+ rec = fake.get_record('PBD', ref)
+ rec['currently-attached'] = False
+
+ def SR_forget(self, _1, ref):
+ pass