summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
authorTrey Morris <trey.morris@rackspace.com>2010-12-23 21:53:33 +0000
committerTrey Morris <trey.morris@rackspace.com>2010-12-23 21:53:33 +0000
commit5f8d02b39fb8917b34b68bbbf450656e1b68211c (patch)
tree4a8489e10fecea511e3fffece42de81c9fdc7837 /nova/tests
parent1c26d2b2ce824dbc64525eea699efbfa8bf04617 (diff)
parent75e2cbec9eb5132a49446f1b6d563d5f43d007de (diff)
downloadnova-5f8d02b39fb8917b34b68bbbf450656e1b68211c.tar.gz
nova-5f8d02b39fb8917b34b68bbbf450656e1b68211c.tar.xz
nova-5f8d02b39fb8917b34b68bbbf450656e1b68211c.zip
fixed merge conflict
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/api/__init__.py81
-rw-r--r--nova/tests/api/openstack/__init__.py13
-rw-r--r--nova/tests/api/openstack/fakes.py22
-rw-r--r--nova/tests/api/openstack/test_auth.py4
-rw-r--r--nova/tests/api/openstack/test_servers.py37
-rw-r--r--nova/tests/api/test.py81
-rw-r--r--nova/tests/api_integration.py54
-rw-r--r--nova/tests/db/__init__.py20
-rw-r--r--nova/tests/db/fakes.py75
-rw-r--r--nova/tests/objectstore_unittest.py4
-rw-r--r--nova/tests/process_unittest.py132
-rw-r--r--nova/tests/test_access.py (renamed from nova/tests/access_unittest.py)2
-rw-r--r--nova/tests/test_api.py (renamed from nova/tests/api_unittest.py)0
-rw-r--r--nova/tests/test_auth.py (renamed from nova/tests/auth_unittest.py)33
-rw-r--r--nova/tests/test_cloud.py (renamed from nova/tests/cloud_unittest.py)36
-rw-r--r--nova/tests/test_compute.py (renamed from nova/tests/compute_unittest.py)50
-rw-r--r--nova/tests/test_flags.py (renamed from nova/tests/flags_unittest.py)2
-rw-r--r--nova/tests/test_middleware.py86
-rw-r--r--nova/tests/test_misc.py (renamed from nova/tests/misc_unittest.py)10
-rw-r--r--nova/tests/test_network.py (renamed from nova/tests/network_unittest.py)10
-rw-r--r--nova/tests/test_quota.py (renamed from nova/tests/quota_unittest.py)2
-rw-r--r--nova/tests/test_rpc.py (renamed from nova/tests/rpc_unittest.py)65
-rw-r--r--nova/tests/test_scheduler.py (renamed from nova/tests/scheduler_unittest.py)27
-rw-r--r--nova/tests/test_service.py (renamed from nova/tests/service_unittest.py)60
-rw-r--r--nova/tests/test_twistd.py (renamed from nova/tests/twistd_unittest.py)0
-rw-r--r--nova/tests/test_virt.py (renamed from nova/tests/virt_unittest.py)154
-rw-r--r--nova/tests/test_volume.py (renamed from nova/tests/volume_unittest.py)58
-rw-r--r--nova/tests/test_xenapi.py219
-rw-r--r--nova/tests/validator_unittest.py42
-rw-r--r--nova/tests/xenapi/__init__.py20
-rw-r--r--nova/tests/xenapi/stubs.py94
31 files changed, 951 insertions, 542 deletions
diff --git a/nova/tests/api/__init__.py b/nova/tests/api/__init__.py
index 9caa8c9d0..e69de29bb 100644
--- a/nova/tests/api/__init__.py
+++ b/nova/tests/api/__init__.py
@@ -1,81 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 OpenStack LLC.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Test for the root WSGI middleware for all API controllers.
-"""
-
-import unittest
-
-import stubout
-import webob
-import webob.dec
-
-import nova.exception
-from nova import api
-from nova.tests.api.fakes import APIStub
-
-
-class Test(unittest.TestCase):
-
- def setUp(self):
- self.stubs = stubout.StubOutForTesting()
-
- def tearDown(self):
- self.stubs.UnsetAll()
-
- def _request(self, url, subdomain, **kwargs):
- environ_keys = {'HTTP_HOST': '%s.example.com' % subdomain}
- environ_keys.update(kwargs)
- req = webob.Request.blank(url, environ_keys)
- return req.get_response(api.API('ec2'))
-
- def test_openstack(self):
- self.stubs.Set(api.openstack, 'API', APIStub)
- result = self._request('/v1.0/cloud', 'api')
- self.assertEqual(result.body, "/cloud")
-
- def test_ec2(self):
- self.stubs.Set(api.ec2, 'API', APIStub)
- result = self._request('/services/cloud', 'ec2')
- self.assertEqual(result.body, "/cloud")
-
- def test_not_found(self):
- self.stubs.Set(api.ec2, 'API', APIStub)
- self.stubs.Set(api.openstack, 'API', APIStub)
- result = self._request('/test/cloud', 'ec2')
- self.assertNotEqual(result.body, "/cloud")
-
- def test_query_api_versions(self):
- result = self._request('/', 'api')
- self.assertTrue('CURRENT' in result.body)
-
- def test_metadata(self):
- def go(url):
- result = self._request(url, 'ec2', REMOTE_ADDR='128.192.151.2')
- # Each should get to the ORM layer and fail to find the IP
- self.assertRaises(nova.exception.NotFound, go, '/latest/')
- self.assertRaises(nova.exception.NotFound, go, '/2009-04-04/')
- self.assertRaises(nova.exception.NotFound, go, '/1.0/')
-
- def test_ec2_root(self):
- result = self._request('/', 'ec2')
- self.assertTrue('2007-12-15\n' in result.body)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/nova/tests/api/openstack/__init__.py b/nova/tests/api/openstack/__init__.py
index 2e357febe..9e183bd0d 100644
--- a/nova/tests/api/openstack/__init__.py
+++ b/nova/tests/api/openstack/__init__.py
@@ -17,11 +17,16 @@
import unittest
-from nova.api.openstack import limited
-from nova.api.openstack import RateLimitingMiddleware
+from nova import context
+from nova import flags
+from nova.api.openstack.ratelimiting import RateLimitingMiddleware
+from nova.api.openstack.common import limited
from nova.tests.api.fakes import APIStub
+from nova import utils
from webob import Request
+FLAGS = flags.FLAGS
+
class RateLimitingMiddlewareTest(unittest.TestCase):
@@ -46,6 +51,8 @@ class RateLimitingMiddlewareTest(unittest.TestCase):
def exhaust(self, middleware, method, url, username, times):
req = Request.blank(url, dict(REQUEST_METHOD=method),
headers={'X-Auth-User': username})
+ req.environ['nova.context'] = context.RequestContext(username,
+ username)
for i in range(times):
resp = req.get_response(middleware)
self.assertEqual(resp.status_int, 200)
@@ -62,7 +69,7 @@ class RateLimitingMiddlewareTest(unittest.TestCase):
middleware = RateLimitingMiddleware(APIStub())
self.exhaust(middleware, 'POST', '/servers/4', 'usr1', 10)
self.exhaust(middleware, 'POST', '/images/4', 'usr2', 10)
- self.assertTrue(set(middleware.limiter._levels) ==
+ self.assertTrue(set(middleware.limiter._levels) == \
set(['usr1:POST', 'usr1:POST servers', 'usr2:POST']))
def test_POST_servers_action_correctly_ratelimited(self):
diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py
index 21b8aac1c..79663e43a 100644
--- a/nova/tests/api/openstack/fakes.py
+++ b/nova/tests/api/openstack/fakes.py
@@ -29,8 +29,11 @@ from nova import exception as exc
from nova import flags
from nova import utils
import nova.api.openstack.auth
-from nova.image import service
+from nova.api.openstack import auth
+from nova.api.openstack import ratelimiting
from nova.image import glance
+from nova.image import local
+from nova.image import service
from nova.tests import fake_flags
from nova.wsgi import Router
@@ -51,10 +54,11 @@ class FakeRouter(Router):
return res
-def fake_auth_init(self):
+def fake_auth_init(self, application):
self.db = FakeAuthDatabase()
self.context = Context()
self.auth = FakeAuthManager()
+ self.application = application
@webob.dec.wsgify
@@ -75,28 +79,28 @@ def stub_out_image_service(stubs):
def fake_image_show(meh, context, id):
return dict(kernelId=1, ramdiskId=1)
- stubs.Set(nova.image.local.LocalImageService, 'show', fake_image_show)
+ stubs.Set(local.LocalImageService, 'show', fake_image_show)
def stub_out_auth(stubs):
def fake_auth_init(self, app):
self.application = app
- stubs.Set(nova.api.openstack.AuthMiddleware,
+ stubs.Set(nova.api.openstack.auth.AuthMiddleware,
'__init__', fake_auth_init)
- stubs.Set(nova.api.openstack.AuthMiddleware,
+ stubs.Set(nova.api.openstack.auth.AuthMiddleware,
'__call__', fake_wsgi)
def stub_out_rate_limiting(stubs):
def fake_rate_init(self, app):
- super(nova.api.openstack.RateLimitingMiddleware, self).__init__(app)
+ super(ratelimiting.RateLimitingMiddleware, self).__init__(app)
self.application = app
- stubs.Set(nova.api.openstack.RateLimitingMiddleware,
+ stubs.Set(nova.api.openstack.ratelimiting.RateLimitingMiddleware,
'__init__', fake_rate_init)
- stubs.Set(nova.api.openstack.RateLimitingMiddleware,
+ stubs.Set(nova.api.openstack.ratelimiting.RateLimitingMiddleware,
'__call__', fake_wsgi)
@@ -173,7 +177,7 @@ class FakeToken(object):
class FakeRequestContext(object):
- def __init__(self, user, project):
+ def __init__(self, user, project, *args, **kwargs):
self.user_id = 1
self.project_id = 1
diff --git a/nova/tests/api/openstack/test_auth.py b/nova/tests/api/openstack/test_auth.py
index 7b427c2db..489a1dfbf 100644
--- a/nova/tests/api/openstack/test_auth.py
+++ b/nova/tests/api/openstack/test_auth.py
@@ -34,7 +34,7 @@ class Test(unittest.TestCase):
def setUp(self):
self.stubs = stubout.StubOutForTesting()
- self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager,
+ self.stubs.Set(nova.api.openstack.auth.AuthMiddleware,
'__init__', fakes.fake_auth_init)
self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext)
fakes.FakeAuthManager.auth_data = {}
@@ -131,7 +131,7 @@ class Test(unittest.TestCase):
class TestLimiter(unittest.TestCase):
def setUp(self):
self.stubs = stubout.StubOutForTesting()
- self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager,
+ self.stubs.Set(nova.api.openstack.auth.AuthMiddleware,
'__init__', fakes.fake_auth_init)
self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext)
fakes.FakeAuthManager.auth_data = {}
diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py
index 8444b6fce..3820f5f27 100644
--- a/nova/tests/api/openstack/test_servers.py
+++ b/nova/tests/api/openstack/test_servers.py
@@ -56,11 +56,16 @@ def instance_address(context, instance_id):
def stub_instance(id, user_id=1):
- return Instance(id=id + 123456, state=0, image_id=10, user_id=user_id,
+ return Instance(id=int(id) + 123456, state=0, image_id=10, user_id=user_id,
display_name='server%s' % id, internal_id=id)
+def fake_compute_api(cls, req, id):
+ return True
+
+
class ServersTest(unittest.TestCase):
+
def setUp(self):
self.stubs = stubout.StubOutForTesting()
fakes.FakeAuthManager.auth_data = {}
@@ -82,9 +87,15 @@ class ServersTest(unittest.TestCase):
instance_address)
self.stubs.Set(nova.db.api, 'instance_get_floating_address',
instance_address)
+ self.stubs.Set(nova.compute.api.ComputeAPI, 'pause',
+ fake_compute_api)
+ self.stubs.Set(nova.compute.api.ComputeAPI, 'unpause',
+ fake_compute_api)
+ self.allow_admin = FLAGS.allow_admin_api
def tearDown(self):
self.stubs.UnsetAll()
+ FLAGS.allow_admin_api = self.allow_admin
def test_get_server_by_id(self):
req = webob.Request.blank('/v1.0/servers/1')
@@ -211,6 +222,30 @@ class ServersTest(unittest.TestCase):
self.assertEqual(s['imageId'], 10)
i += 1
+ def test_server_pause(self):
+ FLAGS.allow_admin_api = True
+ body = dict(server=dict(
+ name='server_test', imageId=2, flavorId=2, metadata={},
+ personality={}))
+ req = webob.Request.blank('/v1.0/servers/1/pause')
+ req.method = 'POST'
+ req.content_type = 'application/json'
+ req.body = json.dumps(body)
+ res = req.get_response(nova.api.API('os'))
+ self.assertEqual(res.status_int, 202)
+
+ def test_server_unpause(self):
+ FLAGS.allow_admin_api = True
+ body = dict(server=dict(
+ name='server_test', imageId=2, flavorId=2, metadata={},
+ personality={}))
+ req = webob.Request.blank('/v1.0/servers/1/unpause')
+ req.method = 'POST'
+ req.content_type = 'application/json'
+ req.body = json.dumps(body)
+ res = req.get_response(nova.api.API('os'))
+ self.assertEqual(res.status_int, 202)
+
def test_server_reboot(self):
body = dict(server=dict(
name='server_test', imageId=2, flavorId=2, metadata={},
diff --git a/nova/tests/api/test.py b/nova/tests/api/test.py
new file mode 100644
index 000000000..9caa8c9d0
--- /dev/null
+++ b/nova/tests/api/test.py
@@ -0,0 +1,81 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Test for the root WSGI middleware for all API controllers.
+"""
+
+import unittest
+
+import stubout
+import webob
+import webob.dec
+
+import nova.exception
+from nova import api
+from nova.tests.api.fakes import APIStub
+
+
+class Test(unittest.TestCase):
+
+ def setUp(self):
+ self.stubs = stubout.StubOutForTesting()
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+
+ def _request(self, url, subdomain, **kwargs):
+ environ_keys = {'HTTP_HOST': '%s.example.com' % subdomain}
+ environ_keys.update(kwargs)
+ req = webob.Request.blank(url, environ_keys)
+ return req.get_response(api.API('ec2'))
+
+ def test_openstack(self):
+ self.stubs.Set(api.openstack, 'API', APIStub)
+ result = self._request('/v1.0/cloud', 'api')
+ self.assertEqual(result.body, "/cloud")
+
+ def test_ec2(self):
+ self.stubs.Set(api.ec2, 'API', APIStub)
+ result = self._request('/services/cloud', 'ec2')
+ self.assertEqual(result.body, "/cloud")
+
+ def test_not_found(self):
+ self.stubs.Set(api.ec2, 'API', APIStub)
+ self.stubs.Set(api.openstack, 'API', APIStub)
+ result = self._request('/test/cloud', 'ec2')
+ self.assertNotEqual(result.body, "/cloud")
+
+ def test_query_api_versions(self):
+ result = self._request('/', 'api')
+ self.assertTrue('CURRENT' in result.body)
+
+ def test_metadata(self):
+ def go(url):
+ result = self._request(url, 'ec2', REMOTE_ADDR='128.192.151.2')
+ # Each should get to the ORM layer and fail to find the IP
+ self.assertRaises(nova.exception.NotFound, go, '/latest/')
+ self.assertRaises(nova.exception.NotFound, go, '/2009-04-04/')
+ self.assertRaises(nova.exception.NotFound, go, '/1.0/')
+
+ def test_ec2_root(self):
+ result = self._request('/', 'ec2')
+ self.assertTrue('2007-12-15\n' in result.body)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/nova/tests/api_integration.py b/nova/tests/api_integration.py
deleted file mode 100644
index 54403c655..000000000
--- a/nova/tests/api_integration.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-import boto
-from boto.ec2.regioninfo import RegionInfo
-import unittest
-
-
-ACCESS_KEY = 'fake'
-SECRET_KEY = 'fake'
-CLC_IP = '127.0.0.1'
-CLC_PORT = 8773
-REGION = 'test'
-
-
-def get_connection():
- return boto.connect_ec2(
- aws_access_key_id=ACCESS_KEY,
- aws_secret_access_key=SECRET_KEY,
- is_secure=False,
- region=RegionInfo(None, REGION, CLC_IP),
- port=CLC_PORT,
- path='/services/Cloud',
- debug=99)
-
-
-class APIIntegrationTests(unittest.TestCase):
- def test_001_get_all_images(self):
- conn = get_connection()
- res = conn.get_all_images()
-
-
-if __name__ == '__main__':
- unittest.main()
-
-#print conn.get_all_key_pairs()
-#print conn.create_key_pair
-#print conn.create_security_group('name', 'description')
diff --git a/nova/tests/db/__init__.py b/nova/tests/db/__init__.py
new file mode 100644
index 000000000..2d43aac42
--- /dev/null
+++ b/nova/tests/db/__init__.py
@@ -0,0 +1,20 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+:mod:`db` -- Stubs for DB API
+=============================
+"""
diff --git a/nova/tests/db/fakes.py b/nova/tests/db/fakes.py
new file mode 100644
index 000000000..05bdd172e
--- /dev/null
+++ b/nova/tests/db/fakes.py
@@ -0,0 +1,75 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 OpenStack, LLC
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Stubouts, mocks and fixtures for the test suite"""
+
+import time
+
+from nova import db
+from nova import utils
+from nova.compute import instance_types
+
+
+def stub_out_db_instance_api(stubs):
+ """ Stubs out the db API for creating Instances """
+
+ class FakeModel(object):
+ """ Stubs out for model """
+ def __init__(self, values):
+ self.values = values
+
+ def __getattr__(self, name):
+ return self.values[name]
+
+ def __getitem__(self, key):
+ if key in self.values:
+ return self.values[key]
+ else:
+ raise NotImplementedError()
+
+ def fake_instance_create(values):
+ """ Stubs out the db.instance_create method """
+
+ type_data = instance_types.INSTANCE_TYPES[values['instance_type']]
+
+ base_options = {
+ 'name': values['name'],
+ 'id': values['id'],
+ 'reservation_id': utils.generate_uid('r'),
+ 'image_id': values['image_id'],
+ 'kernel_id': values['kernel_id'],
+ 'ramdisk_id': values['ramdisk_id'],
+ 'state_description': 'scheduling',
+ 'user_id': values['user_id'],
+ 'project_id': values['project_id'],
+ 'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
+ 'instance_type': values['instance_type'],
+ 'memory_mb': type_data['memory_mb'],
+ 'mac_address': values['mac_address'],
+ 'vcpus': type_data['vcpus'],
+ 'local_gb': type_data['local_gb'],
+ }
+ return FakeModel(base_options)
+
+ def fake_network_get_by_instance(context, instance_id):
+ fields = {
+ 'bridge': 'xenbr0',
+ }
+ return FakeModel(fields)
+
+ stubs.Set(db, 'instance_create', fake_instance_create)
+ stubs.Set(db, 'network_get_by_instance', fake_network_get_by_instance)
diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py
index 061799923..ceac17adb 100644
--- a/nova/tests/objectstore_unittest.py
+++ b/nova/tests/objectstore_unittest.py
@@ -54,7 +54,7 @@ os.makedirs(os.path.join(OSS_TEMPDIR, 'images'))
os.makedirs(os.path.join(OSS_TEMPDIR, 'buckets'))
-class ObjectStoreTestCase(test.TrialTestCase):
+class ObjectStoreTestCase(test.TestCase):
"""Test objectstore API directly."""
def setUp(self):
@@ -191,7 +191,7 @@ class TestSite(server.Site):
protocol = TestHTTPChannel
-class S3APITestCase(test.TrialTestCase):
+class S3APITestCase(test.TestCase):
"""Test objectstore through S3 API."""
def setUp(self):
diff --git a/nova/tests/process_unittest.py b/nova/tests/process_unittest.py
deleted file mode 100644
index 67245af03..000000000
--- a/nova/tests/process_unittest.py
+++ /dev/null
@@ -1,132 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-from twisted.internet import defer
-from twisted.internet import reactor
-from xml.etree import ElementTree
-
-from nova import exception
-from nova import flags
-from nova import process
-from nova import test
-from nova import utils
-
-FLAGS = flags.FLAGS
-
-
-class ProcessTestCase(test.TrialTestCase):
- def setUp(self):
- logging.getLogger().setLevel(logging.DEBUG)
- super(ProcessTestCase, self).setUp()
-
- def test_execute_stdout(self):
- pool = process.ProcessPool(2)
- d = pool.simple_execute('echo test')
-
- def _check(rv):
- self.assertEqual(rv[0], 'test\n')
- self.assertEqual(rv[1], '')
-
- d.addCallback(_check)
- d.addErrback(self.fail)
- return d
-
- def test_execute_stderr(self):
- pool = process.ProcessPool(2)
- d = pool.simple_execute('cat BAD_FILE', check_exit_code=False)
-
- def _check(rv):
- self.assertEqual(rv[0], '')
- self.assert_('No such file' in rv[1])
-
- d.addCallback(_check)
- d.addErrback(self.fail)
- return d
-
- def test_execute_unexpected_stderr(self):
- pool = process.ProcessPool(2)
- d = pool.simple_execute('cat BAD_FILE')
- d.addCallback(lambda x: self.fail('should have raised an error'))
- d.addErrback(lambda failure: failure.trap(IOError))
- return d
-
- def test_max_processes(self):
- pool = process.ProcessPool(2)
- d1 = pool.simple_execute('sleep 0.01')
- d2 = pool.simple_execute('sleep 0.01')
- d3 = pool.simple_execute('sleep 0.005')
- d4 = pool.simple_execute('sleep 0.005')
-
- called = []
-
- def _called(rv, name):
- called.append(name)
-
- d1.addCallback(_called, 'd1')
- d2.addCallback(_called, 'd2')
- d3.addCallback(_called, 'd3')
- d4.addCallback(_called, 'd4')
-
- # Make sure that d3 and d4 had to wait on the other two and were called
- # in order
- # NOTE(termie): there may be a race condition in this test if for some
- # reason one of the sleeps takes longer to complete
- # than it should
- d4.addCallback(lambda x: self.assertEqual(called[2], 'd3'))
- d4.addCallback(lambda x: self.assertEqual(called[3], 'd4'))
- d4.addErrback(self.fail)
- return d4
-
- def test_kill_long_process(self):
- pool = process.ProcessPool(2)
-
- d1 = pool.simple_execute('sleep 1')
- d2 = pool.simple_execute('sleep 0.005')
-
- timeout = reactor.callLater(0.1, self.fail, 'should have been killed')
-
- # kill d1 and wait on it to end then cancel the timeout
- d2.addCallback(lambda _: d1.process.signalProcess('KILL'))
- d2.addCallback(lambda _: d1)
- d2.addBoth(lambda _: timeout.active() and timeout.cancel())
- d2.addErrback(self.fail)
- return d2
-
- def test_process_exit_is_contained(self):
- pool = process.ProcessPool(2)
-
- d1 = pool.simple_execute('sleep 1')
- d1.addCallback(lambda x: self.fail('should have errbacked'))
- d1.addErrback(lambda fail: fail.trap(IOError))
- reactor.callLater(0.05, d1.process.signalProcess, 'KILL')
-
- return d1
-
- def test_shared_pool_is_singleton(self):
- pool1 = process.SharedPool()
- pool2 = process.SharedPool()
- self.assertEqual(id(pool1._instance), id(pool2._instance))
-
- def test_shared_pool_works_as_singleton(self):
- d1 = process.simple_execute('sleep 1')
- d2 = process.simple_execute('sleep 0.005')
- # lp609749: would have failed with
- # exceptions.AssertionError: Someone released me too many times:
- # too many tokens!
- return d1
diff --git a/nova/tests/access_unittest.py b/nova/tests/test_access.py
index 0f66c0a26..58fdea3b5 100644
--- a/nova/tests/access_unittest.py
+++ b/nova/tests/test_access.py
@@ -35,7 +35,7 @@ class Context(object):
pass
-class AccessTestCase(test.TrialTestCase):
+class AccessTestCase(test.TestCase):
def setUp(self):
super(AccessTestCase, self).setUp()
um = manager.AuthManager()
diff --git a/nova/tests/api_unittest.py b/nova/tests/test_api.py
index 33d4cb294..33d4cb294 100644
--- a/nova/tests/api_unittest.py
+++ b/nova/tests/test_api.py
diff --git a/nova/tests/auth_unittest.py b/nova/tests/test_auth.py
index fe891beee..15d40bc53 100644
--- a/nova/tests/auth_unittest.py
+++ b/nova/tests/test_auth.py
@@ -208,17 +208,13 @@ class AuthManagerTestCase(object):
# so it probably belongs in crypto_unittest
# but I'm leaving it where I found it.
with user_and_project_generator(self.manager) as (user, project):
- # NOTE(todd): Should mention why we must setup controller first
- # (somebody please clue me in)
- cloud_controller = cloud.CloudController()
- cloud_controller.setup()
- _key, cert_str = self.manager._generate_x509_cert('test1',
- 'testproj')
+ # NOTE(vish): Setup runs genroot.sh if it hasn't been run
+ cloud.CloudController().setup()
+ _key, cert_str = crypto.generate_x509_cert(user.id, project.id)
logging.debug(cert_str)
- # Need to verify that it's signed by the right intermediate CA
- full_chain = crypto.fetch_ca(project_id='testproj', chain=True)
- int_cert = crypto.fetch_ca(project_id='testproj', chain=False)
+ full_chain = crypto.fetch_ca(project_id=project.id, chain=True)
+ int_cert = crypto.fetch_ca(project_id=project.id, chain=False)
cloud_cert = crypto.fetch_ca()
logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain)
signed_cert = X509.load_cert_string(cert_str)
@@ -227,7 +223,8 @@ class AuthManagerTestCase(object):
cloud_cert = X509.load_cert_string(cloud_cert)
self.assertTrue(signed_cert.verify(chain_cert.get_pubkey()))
self.assertTrue(signed_cert.verify(int_cert.get_pubkey()))
- if not FLAGS.use_intermediate_ca:
+
+ if not FLAGS.use_project_ca:
self.assertTrue(signed_cert.verify(cloud_cert.get_pubkey()))
else:
self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey()))
@@ -326,24 +323,20 @@ class AuthManagerTestCase(object):
self.assertTrue(user.is_admin())
-class AuthManagerLdapTestCase(AuthManagerTestCase, test.TrialTestCase):
+class AuthManagerLdapTestCase(AuthManagerTestCase, test.TestCase):
auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver'
def __init__(self, *args, **kwargs):
AuthManagerTestCase.__init__(self)
- test.TrialTestCase.__init__(self, *args, **kwargs)
+ test.TestCase.__init__(self, *args, **kwargs)
import nova.auth.fakeldap as fakeldap
- FLAGS.redis_db = 8
if FLAGS.flush_db:
- logging.info("Flushing redis datastore")
- try:
- r = fakeldap.Redis.instance()
- r.flushdb()
- except:
- self.skip = True
+ logging.info("Flushing datastore")
+ r = fakeldap.Store.instance()
+ r.flushdb()
-class AuthManagerDbTestCase(AuthManagerTestCase, test.TrialTestCase):
+class AuthManagerDbTestCase(AuthManagerTestCase, test.TestCase):
auth_driver = 'nova.auth.dbdriver.DbDriver'
diff --git a/nova/tests/cloud_unittest.py b/nova/tests/test_cloud.py
index 770c94219..70d2c44da 100644
--- a/nova/tests/cloud_unittest.py
+++ b/nova/tests/test_cloud.py
@@ -22,22 +22,18 @@ import logging
from M2Crypto import BIO
from M2Crypto import RSA
import os
-import StringIO
import tempfile
import time
from eventlet import greenthread
-from twisted.internet import defer
-import unittest
-from xml.etree import ElementTree
from nova import context
from nova import crypto
from nova import db
from nova import flags
from nova import rpc
+from nova import service
from nova import test
-from nova import utils
from nova.auth import manager
from nova.compute import power_state
from nova.api.ec2 import cloud
@@ -53,10 +49,11 @@ IMAGES_PATH = os.path.join(OSS_TEMPDIR, 'images')
os.makedirs(IMAGES_PATH)
-class CloudTestCase(test.TrialTestCase):
+class CloudTestCase(test.TestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
- self.flags(connection_type='fake', images_path=IMAGES_PATH)
+ self.flags(connection_type='fake',
+ images_path=IMAGES_PATH)
self.conn = rpc.Connection.instance()
logging.getLogger().setLevel(logging.DEBUG)
@@ -64,27 +61,23 @@ class CloudTestCase(test.TrialTestCase):
# set up our cloud
self.cloud = cloud.CloudController()
- # set up a service
- self.compute = utils.import_object(FLAGS.compute_manager)
- self.compute_consumer = rpc.AdapterConsumer(connection=self.conn,
- topic=FLAGS.compute_topic,
- proxy=self.compute)
- self.compute_consumer.attach_to_eventlet()
- self.network = utils.import_object(FLAGS.network_manager)
- self.network_consumer = rpc.AdapterConsumer(connection=self.conn,
- topic=FLAGS.network_topic,
- proxy=self.network)
- self.network_consumer.attach_to_eventlet()
+ # set up services
+ self.compute = service.Service.create(binary='nova-compute')
+ self.compute.start()
+ self.network = service.Service.create(binary='nova-network')
+ self.network.start()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('admin', 'admin', 'admin', True)
self.project = self.manager.create_project('proj', 'admin', 'proj')
self.context = context.RequestContext(user=self.user,
- project=self.project)
+ project=self.project)
def tearDown(self):
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
+ self.compute.kill()
+ self.network.kill()
super(CloudTestCase, self).tearDown()
def _create_key(self, name):
@@ -111,12 +104,13 @@ class CloudTestCase(test.TrialTestCase):
{'address': address,
'host': FLAGS.host})
self.cloud.allocate_address(self.context)
- inst = db.instance_create(self.context, {})
+ inst = db.instance_create(self.context, {'host': FLAGS.host})
fixed = self.network.allocate_fixed_ip(self.context, inst['id'])
ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id'])
self.cloud.associate_address(self.context,
instance_id=ec2_id,
public_ip=address)
+ greenthread.sleep(0.3)
self.cloud.disassociate_address(self.context,
public_ip=address)
self.cloud.release_address(self.context,
@@ -199,7 +193,7 @@ class CloudTestCase(test.TrialTestCase):
logging.debug("Need to watch instance %s until it's running..." %
instance['instance_id'])
while True:
- rv = yield defer.succeed(time.sleep(1))
+ greenthread.sleep(1)
info = self.cloud._get_instance(instance['instance_id'])
logging.debug(info['state'])
if info['state'] == power_state.RUNNING:
diff --git a/nova/tests/compute_unittest.py b/nova/tests/test_compute.py
index 6f3ef96cb..348bb3351 100644
--- a/nova/tests/compute_unittest.py
+++ b/nova/tests/test_compute.py
@@ -22,8 +22,6 @@ Tests For Compute
import datetime
import logging
-from twisted.internet import defer
-
from nova import context
from nova import db
from nova import exception
@@ -33,15 +31,17 @@ from nova import utils
from nova.auth import manager
from nova.compute import api as compute_api
+
FLAGS = flags.FLAGS
-class ComputeTestCase(test.TrialTestCase):
+class ComputeTestCase(test.TestCase):
"""Test case for compute"""
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(ComputeTestCase, self).setUp()
self.flags(connection_type='fake',
+ stub_network=True,
network_manager='nova.network.manager.FlatManager')
self.compute = utils.import_object(FLAGS.compute_manager)
self.compute_api = compute_api.ComputeAPI()
@@ -94,24 +94,22 @@ class ComputeTestCase(test.TrialTestCase):
db.security_group_destroy(self.context, group['id'])
db.instance_destroy(self.context, ref[0]['id'])
- @defer.inlineCallbacks
def test_run_terminate(self):
"""Make sure it is possible to run and terminate instance"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
logging.info("Running instances: %s", instances)
self.assertEqual(len(instances), 1)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
logging.info("After terminating instances: %s", instances)
self.assertEqual(len(instances), 0)
- @defer.inlineCallbacks
def test_run_terminate_timestamps(self):
"""Make sure timestamps are set for launched and destroyed"""
instance_id = self._create_instance()
@@ -119,42 +117,48 @@ class ComputeTestCase(test.TrialTestCase):
self.assertEqual(instance_ref['launched_at'], None)
self.assertEqual(instance_ref['deleted_at'], None)
launch = datetime.datetime.utcnow()
- yield self.compute.run_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
instance_ref = db.instance_get(self.context, instance_id)
self.assert_(instance_ref['launched_at'] > launch)
self.assertEqual(instance_ref['deleted_at'], None)
terminate = datetime.datetime.utcnow()
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
self.context = self.context.elevated(True)
instance_ref = db.instance_get(self.context, instance_id)
self.assert_(instance_ref['launched_at'] < terminate)
self.assert_(instance_ref['deleted_at'] > terminate)
- @defer.inlineCallbacks
+ def test_pause(self):
+ """Ensure instance can be paused"""
+ instance_id = self._create_instance()
+ self.compute.run_instance(self.context, instance_id)
+ self.compute.pause_instance(self.context, instance_id)
+ self.compute.unpause_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
+
def test_reboot(self):
"""Ensure instance can be rebooted"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
- yield self.compute.reboot_instance(self.context, instance_id)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
+ self.compute.reboot_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
- @defer.inlineCallbacks
def test_console_output(self):
"""Make sure we can get console output from instance"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
- console = yield self.compute.get_console_output(self.context,
+ console = self.compute.get_console_output(self.context,
instance_id)
self.assert_(console)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
- @defer.inlineCallbacks
def test_run_instance_existing(self):
"""Ensure failure when running an instance that already exists"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
- self.assertFailure(self.compute.run_instance(self.context,
- instance_id),
- exception.Error)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
+ self.assertRaises(exception.Error,
+ self.compute.run_instance,
+ self.context,
+ instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
diff --git a/nova/tests/flags_unittest.py b/nova/tests/test_flags.py
index b97df075d..707300fcf 100644
--- a/nova/tests/flags_unittest.py
+++ b/nova/tests/test_flags.py
@@ -24,7 +24,7 @@ FLAGS = flags.FLAGS
flags.DEFINE_string('flags_unittest', 'foo', 'for testing purposes only')
-class FlagsTestCase(test.TrialTestCase):
+class FlagsTestCase(test.TestCase):
def setUp(self):
super(FlagsTestCase, self).setUp()
diff --git a/nova/tests/test_middleware.py b/nova/tests/test_middleware.py
new file mode 100644
index 000000000..0febf52d6
--- /dev/null
+++ b/nova/tests/test_middleware.py
@@ -0,0 +1,86 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import webob
+import webob.dec
+import webob.exc
+
+from nova.api import ec2
+from nova import flags
+from nova import test
+from nova import utils
+
+
+FLAGS = flags.FLAGS
+
+
+@webob.dec.wsgify
+def conditional_forbid(req):
+ """Helper wsgi app returns 403 if param 'die' is 1."""
+ if 'die' in req.params and req.params['die'] == '1':
+ raise webob.exc.HTTPForbidden()
+ return 'OK'
+
+
+class LockoutTestCase(test.TrialTestCase):
+ """Test case for the Lockout middleware."""
+ def setUp(self): # pylint: disable-msg=C0103
+ super(LockoutTestCase, self).setUp()
+ utils.set_time_override()
+ self.lockout = ec2.Lockout(conditional_forbid)
+
+ def tearDown(self): # pylint: disable-msg=C0103
+ utils.clear_time_override()
+ super(LockoutTestCase, self).tearDown()
+
+ def _send_bad_attempts(self, access_key, num_attempts=1):
+ """Fail x."""
+ for i in xrange(num_attempts):
+ req = webob.Request.blank('/?AWSAccessKeyId=%s&die=1' % access_key)
+ self.assertEqual(req.get_response(self.lockout).status_int, 403)
+
+ def _is_locked_out(self, access_key):
+ """Sends a test request to see if key is locked out."""
+ req = webob.Request.blank('/?AWSAccessKeyId=%s' % access_key)
+ return (req.get_response(self.lockout).status_int == 403)
+
+ def test_lockout(self):
+ self._send_bad_attempts('test', FLAGS.lockout_attempts)
+ self.assertTrue(self._is_locked_out('test'))
+
+ def test_timeout(self):
+ self._send_bad_attempts('test', FLAGS.lockout_attempts)
+ self.assertTrue(self._is_locked_out('test'))
+ utils.advance_time_seconds(FLAGS.lockout_minutes * 60)
+ self.assertFalse(self._is_locked_out('test'))
+
+ def test_multiple_keys(self):
+ self._send_bad_attempts('test1', FLAGS.lockout_attempts)
+ self.assertTrue(self._is_locked_out('test1'))
+ self.assertFalse(self._is_locked_out('test2'))
+ utils.advance_time_seconds(FLAGS.lockout_minutes * 60)
+ self.assertFalse(self._is_locked_out('test1'))
+ self.assertFalse(self._is_locked_out('test2'))
+
+ def test_window_timeout(self):
+ self._send_bad_attempts('test', FLAGS.lockout_attempts - 1)
+ self.assertFalse(self._is_locked_out('test'))
+ utils.advance_time_seconds(FLAGS.lockout_window * 60)
+ self._send_bad_attempts('test', FLAGS.lockout_attempts - 1)
+ self.assertFalse(self._is_locked_out('test'))
diff --git a/nova/tests/misc_unittest.py b/nova/tests/test_misc.py
index 2758276e5..33c1777d5 100644
--- a/nova/tests/misc_unittest.py
+++ b/nova/tests/test_misc.py
@@ -20,15 +20,15 @@ from nova import test
from nova.utils import parse_mailmap, str_dict_replace
-class ProjectTestCase(test.TrialTestCase):
+class ProjectTestCase(test.TestCase):
def test_authors_up_to_date(self):
- if os.path.exists('../.bzr'):
+ if os.path.exists('.bzr'):
contributors = set()
- mailmap = parse_mailmap('../.mailmap')
+ mailmap = parse_mailmap('.mailmap')
import bzrlib.workingtree
- tree = bzrlib.workingtree.WorkingTree.open('..')
+ tree = bzrlib.workingtree.WorkingTree.open('.')
tree.lock_read()
try:
parents = tree.get_parent_ids()
@@ -42,7 +42,7 @@ class ProjectTestCase(test.TrialTestCase):
email = author.split(' ')[-1]
contributors.add(str_dict_replace(email, mailmap))
- authors_file = open('../Authors', 'r').read()
+ authors_file = open('Authors', 'r').read()
missing = set()
for contributor in contributors:
diff --git a/nova/tests/network_unittest.py b/nova/tests/test_network.py
index 6f4705719..96473ac7c 100644
--- a/nova/tests/network_unittest.py
+++ b/nova/tests/test_network.py
@@ -26,6 +26,7 @@ from nova import context
from nova import db
from nova import exception
from nova import flags
+from nova import service
from nova import test
from nova import utils
from nova.auth import manager
@@ -33,13 +34,14 @@ from nova.auth import manager
FLAGS = flags.FLAGS
-class NetworkTestCase(test.TrialTestCase):
+class NetworkTestCase(test.TestCase):
"""Test cases for network code"""
def setUp(self):
super(NetworkTestCase, self).setUp()
# NOTE(vish): if you change these flags, make sure to change the
# flags in the corresponding section in nova-dhcpbridge
self.flags(connection_type='fake',
+ fake_call=True,
fake_network=True,
network_size=16,
num_networks=5)
@@ -56,16 +58,13 @@ class NetworkTestCase(test.TrialTestCase):
# create the necessary network data for the project
user_context = context.RequestContext(project=self.projects[i],
user=self.user)
- network_ref = self.network.get_network(user_context)
- self.network.set_network_host(context.get_admin_context(),
- network_ref['id'])
+ host = self.network.get_network_host(user_context.elevated())
instance_ref = self._create_instance(0)
self.instance_id = instance_ref['id']
instance_ref = self._create_instance(1)
self.instance2_id = instance_ref['id']
def tearDown(self):
- super(NetworkTestCase, self).tearDown()
# TODO(termie): this should really be instantiating clean datastores
# in between runs, one failure kills all the tests
db.instance_destroy(context.get_admin_context(), self.instance_id)
@@ -73,6 +72,7 @@ class NetworkTestCase(test.TrialTestCase):
for project in self.projects:
self.manager.delete_project(project)
self.manager.delete_user(self.user)
+ super(NetworkTestCase, self).tearDown()
def _create_instance(self, project_num, mac=None):
if not mac:
diff --git a/nova/tests/quota_unittest.py b/nova/tests/test_quota.py
index 1966b51f7..8cf2a5e54 100644
--- a/nova/tests/quota_unittest.py
+++ b/nova/tests/test_quota.py
@@ -32,7 +32,7 @@ from nova.api.ec2 import cloud
FLAGS = flags.FLAGS
-class QuotaTestCase(test.TrialTestCase):
+class QuotaTestCase(test.TestCase):
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(QuotaTestCase, self).setUp()
diff --git a/nova/tests/rpc_unittest.py b/nova/tests/test_rpc.py
index f35b65a39..6ea2edcab 100644
--- a/nova/tests/rpc_unittest.py
+++ b/nova/tests/test_rpc.py
@@ -20,8 +20,6 @@ Unit Tests for remote procedure calls using queue
"""
import logging
-from twisted.internet import defer
-
from nova import context
from nova import flags
from nova import rpc
@@ -31,32 +29,31 @@ from nova import test
FLAGS = flags.FLAGS
-class RpcTestCase(test.TrialTestCase):
+class RpcTestCase(test.TestCase):
"""Test cases for rpc"""
def setUp(self):
super(RpcTestCase, self).setUp()
- self.conn = rpc.Connection.instance()
+ self.conn = rpc.Connection.instance(True)
self.receiver = TestReceiver()
self.consumer = rpc.AdapterConsumer(connection=self.conn,
topic='test',
proxy=self.receiver)
- self.consumer.attach_to_twisted()
+ self.consumer.attach_to_eventlet()
self.context = context.get_admin_context()
def test_call_succeed(self):
"""Get a value through rpc call"""
value = 42
- result = yield rpc.call_twisted(self.context,
- 'test', {"method": "echo",
+ result = rpc.call(self.context, 'test', {"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
def test_context_passed(self):
"""Makes sure a context is passed through rpc call"""
value = 42
- result = yield rpc.call_twisted(self.context,
- 'test', {"method": "context",
- "args": {"value": value}})
+ result = rpc.call(self.context,
+ 'test', {"method": "context",
+ "args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
def test_call_exception(self):
@@ -67,18 +64,48 @@ class RpcTestCase(test.TrialTestCase):
to an int in the test.
"""
value = 42
- self.assertFailure(rpc.call_twisted(self.context, 'test',
- {"method": "fail",
- "args": {"value": value}}),
- rpc.RemoteError)
+ self.assertRaises(rpc.RemoteError,
+ rpc.call,
+ self.context,
+ 'test',
+ {"method": "fail",
+ "args": {"value": value}})
try:
- yield rpc.call_twisted(self.context,
- 'test', {"method": "fail",
- "args": {"value": value}})
+ rpc.call(self.context,
+ 'test',
+ {"method": "fail",
+ "args": {"value": value}})
self.fail("should have thrown rpc.RemoteError")
except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value)
+ def test_nested_calls(self):
+ """Test that we can do an rpc.call inside another call"""
+ class Nested(object):
+ @staticmethod
+ def echo(context, queue, value):
+ """Calls echo in the passed queue"""
+ logging.debug("Nested received %s, %s", queue, value)
+ ret = rpc.call(context,
+ queue,
+ {"method": "echo",
+ "args": {"value": value}})
+ logging.debug("Nested return %s", ret)
+ return value
+
+ nested = Nested()
+ conn = rpc.Connection.instance(True)
+ consumer = rpc.AdapterConsumer(connection=conn,
+ topic='nested',
+ proxy=nested)
+ consumer.attach_to_eventlet()
+ value = 42
+ result = rpc.call(self.context,
+ 'nested', {"method": "echo",
+ "args": {"queue": "test",
+ "value": value}})
+ self.assertEqual(value, result)
+
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call
@@ -89,13 +116,13 @@ class TestReceiver(object):
def echo(context, value):
"""Simply returns whatever value is sent in"""
logging.debug("Received %s", value)
- return defer.succeed(value)
+ return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context"""
logging.debug("Received %s", context)
- return defer.succeed(context.to_dict())
+ return context.to_dict()
@staticmethod
def fail(context, value):
diff --git a/nova/tests/scheduler_unittest.py b/nova/tests/test_scheduler.py
index cb5fe6b9c..91517cc5d 100644
--- a/nova/tests/scheduler_unittest.py
+++ b/nova/tests/test_scheduler.py
@@ -44,11 +44,11 @@ class TestDriver(driver.Scheduler):
return 'named_host'
-class SchedulerTestCase(test.TrialTestCase):
+class SchedulerTestCase(test.TestCase):
"""Test case for scheduler"""
def setUp(self):
super(SchedulerTestCase, self).setUp()
- self.flags(scheduler_driver='nova.tests.scheduler_unittest.TestDriver')
+ self.flags(scheduler_driver='nova.tests.test_scheduler.TestDriver')
def test_fallback(self):
scheduler = manager.SchedulerManager()
@@ -73,11 +73,12 @@ class SchedulerTestCase(test.TrialTestCase):
scheduler.named_method(ctxt, 'topic', num=7)
-class SimpleDriverTestCase(test.TrialTestCase):
+class SimpleDriverTestCase(test.TestCase):
"""Test case for simple driver"""
def setUp(self):
super(SimpleDriverTestCase, self).setUp()
self.flags(connection_type='fake',
+ stub_network=True,
max_cores=4,
max_gigabytes=4,
network_manager='nova.network.manager.FlatManager',
@@ -122,12 +123,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute1.startService()
+ compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute2.startService()
+ compute2.start()
hosts = self.scheduler.driver.hosts_up(self.context, 'compute')
self.assertEqual(len(hosts), 2)
compute1.kill()
@@ -139,12 +140,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute1.startService()
+ compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute2.startService()
+ compute2.start()
instance_id1 = self._create_instance()
compute1.run_instance(self.context, instance_id1)
instance_id2 = self._create_instance()
@@ -162,12 +163,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute1.startService()
+ compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute2.startService()
+ compute2.start()
instance_ids1 = []
instance_ids2 = []
for index in xrange(FLAGS.max_cores):
@@ -195,12 +196,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume1.startService()
+ volume1.start()
volume2 = service.Service('host2',
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume2.startService()
+ volume2.start()
volume_id1 = self._create_volume()
volume1.create_volume(self.context, volume_id1)
volume_id2 = self._create_volume()
@@ -218,12 +219,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume1.startService()
+ volume1.start()
volume2 = service.Service('host2',
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume2.startService()
+ volume2.start()
volume_ids1 = []
volume_ids2 = []
for index in xrange(FLAGS.max_gigabytes):
diff --git a/nova/tests/service_unittest.py b/nova/tests/test_service.py
index a268bc4fe..b30838ad7 100644
--- a/nova/tests/service_unittest.py
+++ b/nova/tests/test_service.py
@@ -22,9 +22,6 @@ Unit Tests for remote procedure calls using queue
import mox
-from twisted.application.app import startApplication
-from twisted.internet import defer
-
from nova import exception
from nova import flags
from nova import rpc
@@ -33,7 +30,7 @@ from nova import service
from nova import manager
FLAGS = flags.FLAGS
-flags.DEFINE_string("fake_manager", "nova.tests.service_unittest.FakeManager",
+flags.DEFINE_string("fake_manager", "nova.tests.test_service.FakeManager",
"Manager for testing")
@@ -48,34 +45,34 @@ class ExtendedService(service.Service):
return 'service'
-class ServiceManagerTestCase(test.TrialTestCase):
+class ServiceManagerTestCase(test.TestCase):
"""Test cases for Services"""
def test_attribute_error_for_no_manager(self):
serv = service.Service('test',
'test',
'test',
- 'nova.tests.service_unittest.FakeManager')
+ 'nova.tests.test_service.FakeManager')
self.assertRaises(AttributeError, getattr, serv, 'test_method')
def test_message_gets_to_manager(self):
serv = service.Service('test',
'test',
'test',
- 'nova.tests.service_unittest.FakeManager')
- serv.startService()
+ 'nova.tests.test_service.FakeManager')
+ serv.start()
self.assertEqual(serv.test_method(), 'manager')
def test_override_manager_method(self):
serv = ExtendedService('test',
'test',
'test',
- 'nova.tests.service_unittest.FakeManager')
- serv.startService()
+ 'nova.tests.test_service.FakeManager')
+ serv.start()
self.assertEqual(serv.test_method(), 'service')
-class ServiceTestCase(test.TrialTestCase):
+class ServiceTestCase(test.TestCase):
"""Test cases for Services"""
def setUp(self):
@@ -94,8 +91,6 @@ class ServiceTestCase(test.TrialTestCase):
self.mox.StubOutWithMock(rpc,
'AdapterConsumer',
use_mock_anything=True)
- self.mox.StubOutWithMock(
- service.task, 'LoopingCall', use_mock_anything=True)
rpc.AdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
@@ -106,19 +101,8 @@ class ServiceTestCase(test.TrialTestCase):
proxy=mox.IsA(service.Service)).AndReturn(
rpc.AdapterConsumer)
- rpc.AdapterConsumer.attach_to_twisted()
- rpc.AdapterConsumer.attach_to_twisted()
-
- # Stub out looping call a bit needlessly since we don't have an easy
- # way to cancel it (yet) when the tests finishes
- service.task.LoopingCall(mox.IgnoreArg()).AndReturn(
- service.task.LoopingCall)
- service.task.LoopingCall.start(interval=mox.IgnoreArg(),
- now=mox.IgnoreArg())
- service.task.LoopingCall(mox.IgnoreArg()).AndReturn(
- service.task.LoopingCall)
- service.task.LoopingCall.start(interval=mox.IgnoreArg(),
- now=mox.IgnoreArg())
+ rpc.AdapterConsumer.attach_to_eventlet()
+ rpc.AdapterConsumer.attach_to_eventlet()
service_create = {'host': host,
'binary': binary,
@@ -136,14 +120,14 @@ class ServiceTestCase(test.TrialTestCase):
service_create).AndReturn(service_ref)
self.mox.ReplayAll()
- startApplication(app, False)
+ app.start()
+ app.stop()
self.assert_(app)
# We're testing sort of weird behavior in how report_state decides
# whether it is disconnected, it looks for a variable on itself called
# 'model_disconnected' and report_state doesn't really do much so this
# these are mostly just for coverage
- @defer.inlineCallbacks
def test_report_state_no_service(self):
host = 'foo'
binary = 'bar'
@@ -172,11 +156,10 @@ class ServiceTestCase(test.TrialTestCase):
serv = service.Service(host,
binary,
topic,
- 'nova.tests.service_unittest.FakeManager')
- serv.startService()
- yield serv.report_state()
+ 'nova.tests.test_service.FakeManager')
+ serv.start()
+ serv.report_state()
- @defer.inlineCallbacks
def test_report_state_newly_disconnected(self):
host = 'foo'
binary = 'bar'
@@ -203,12 +186,11 @@ class ServiceTestCase(test.TrialTestCase):
serv = service.Service(host,
binary,
topic,
- 'nova.tests.service_unittest.FakeManager')
- serv.startService()
- yield serv.report_state()
+ 'nova.tests.test_service.FakeManager')
+ serv.start()
+ serv.report_state()
self.assert_(serv.model_disconnected)
- @defer.inlineCallbacks
def test_report_state_newly_connected(self):
host = 'foo'
binary = 'bar'
@@ -237,9 +219,9 @@ class ServiceTestCase(test.TrialTestCase):
serv = service.Service(host,
binary,
topic,
- 'nova.tests.service_unittest.FakeManager')
- serv.startService()
+ 'nova.tests.test_service.FakeManager')
+ serv.start()
serv.model_disconnected = True
- yield serv.report_state()
+ serv.report_state()
self.assert_(not serv.model_disconnected)
diff --git a/nova/tests/twistd_unittest.py b/nova/tests/test_twistd.py
index 75007b9c8..75007b9c8 100644
--- a/nova/tests/twistd_unittest.py
+++ b/nova/tests/test_twistd.py
diff --git a/nova/tests/virt_unittest.py b/nova/tests/test_virt.py
index d49383fb7..8dab8de2f 100644
--- a/nova/tests/virt_unittest.py
+++ b/nova/tests/test_virt.py
@@ -30,9 +30,10 @@ FLAGS = flags.FLAGS
flags.DECLARE('instances_path', 'nova.compute.manager')
-class LibvirtConnTestCase(test.TrialTestCase):
+class LibvirtConnTestCase(test.TestCase):
def setUp(self):
super(LibvirtConnTestCase, self).setUp()
+ self.flags(fake_call=True)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
@@ -40,33 +41,64 @@ class LibvirtConnTestCase(test.TrialTestCase):
self.network = utils.import_object(FLAGS.network_manager)
FLAGS.instances_path = ''
- def test_get_uri_and_template(self):
- ip = '10.11.12.13'
-
- instance = {'internal_id': 1,
- 'memory_kb': '1024000',
- 'basepath': '/some/path',
- 'bridge_name': 'br100',
- 'mac_address': '02:12:34:46:56:67',
- 'vcpus': 2,
- 'project_id': 'fake',
- 'bridge': 'br101',
- 'instance_type': 'm1.small'}
-
+ test_ip = '10.11.12.13'
+ test_instance = {'memory_kb': '1024000',
+ 'basepath': '/some/path',
+ 'bridge_name': 'br100',
+ 'mac_address': '02:12:34:46:56:67',
+ 'vcpus': 2,
+ 'project_id': 'fake',
+ 'bridge': 'br101',
+ 'instance_type': 'm1.small'}
+
+ def test_xml_and_uri_no_ramdisk_no_kernel(self):
+ instance_data = dict(self.test_instance)
+ self._check_xml_and_uri(instance_data,
+ expect_kernel=False, expect_ramdisk=False)
+
+ def test_xml_and_uri_no_ramdisk(self):
+ instance_data = dict(self.test_instance)
+ instance_data['kernel_id'] = 'aki-deadbeef'
+ self._check_xml_and_uri(instance_data,
+ expect_kernel=True, expect_ramdisk=False)
+
+ def test_xml_and_uri_no_kernel(self):
+ instance_data = dict(self.test_instance)
+ instance_data['ramdisk_id'] = 'ari-deadbeef'
+ self._check_xml_and_uri(instance_data,
+ expect_kernel=False, expect_ramdisk=False)
+
+ def test_xml_and_uri(self):
+ instance_data = dict(self.test_instance)
+ instance_data['ramdisk_id'] = 'ari-deadbeef'
+ instance_data['kernel_id'] = 'aki-deadbeef'
+ self._check_xml_and_uri(instance_data,
+ expect_kernel=True, expect_ramdisk=True)
+
+ def test_xml_and_uri_rescue(self):
+ instance_data = dict(self.test_instance)
+ instance_data['ramdisk_id'] = 'ari-deadbeef'
+ instance_data['kernel_id'] = 'aki-deadbeef'
+ self._check_xml_and_uri(instance_data, expect_kernel=True,
+ expect_ramdisk=True, rescue=True)
+
+ def _check_xml_and_uri(self, instance, expect_ramdisk, expect_kernel,
+ rescue=False):
user_context = context.RequestContext(project=self.project,
user=self.user)
instance_ref = db.instance_create(user_context, instance)
- network_ref = self.network.get_network(user_context)
- self.network.set_network_host(context.get_admin_context(),
- network_ref['id'])
+ host = self.network.get_network_host(user_context.elevated())
+ network_ref = db.project_get_network(context.get_admin_context(),
+ self.project.id)
- fixed_ip = {'address': ip,
+ fixed_ip = {'address': self.test_ip,
'network_id': network_ref['id']}
ctxt = context.get_admin_context()
fixed_ip_ref = db.fixed_ip_create(ctxt, fixed_ip)
- db.fixed_ip_update(ctxt, ip, {'allocated': True,
- 'instance_id': instance_ref['id']})
+ db.fixed_ip_update(ctxt, self.test_ip,
+ {'allocated': True,
+ 'instance_id': instance_ref['id']})
type_uri_map = {'qemu': ('qemu:///system',
[(lambda t: t.find('.').get('type'), 'qemu'),
@@ -78,23 +110,73 @@ class LibvirtConnTestCase(test.TrialTestCase):
(lambda t: t.find('./devices/emulator'), None)]),
'uml': ('uml:///system',
[(lambda t: t.find('.').get('type'), 'uml'),
- (lambda t: t.find('./os/type').text, 'uml')])}
+ (lambda t: t.find('./os/type').text, 'uml')]),
+ 'xen': ('xen:///',
+ [(lambda t: t.find('.').get('type'), 'xen'),
+ (lambda t: t.find('./os/type').text, 'linux')]),
+ }
+
+ for hypervisor_type in ['qemu', 'kvm', 'xen']:
+ check_list = type_uri_map[hypervisor_type][1]
+
+ if rescue:
+ check = (lambda t: t.find('./os/kernel').text.split('/')[1],
+ 'rescue-kernel')
+ check_list.append(check)
+ check = (lambda t: t.find('./os/initrd').text.split('/')[1],
+ 'rescue-ramdisk')
+ check_list.append(check)
+ else:
+ if expect_kernel:
+ check = (lambda t: t.find('./os/kernel').text.split(
+ '/')[1], 'kernel')
+ else:
+ check = (lambda t: t.find('./os/kernel'), None)
+ check_list.append(check)
+
+ if expect_ramdisk:
+ check = (lambda t: t.find('./os/initrd').text.split(
+ '/')[1], 'ramdisk')
+ else:
+ check = (lambda t: t.find('./os/initrd'), None)
+ check_list.append(check)
common_checks = [
(lambda t: t.find('.').tag, 'domain'),
- (lambda t: t.find('./devices/interface/filterref/parameter').\
- get('name'), 'IP'),
- (lambda t: t.find('./devices/interface/filterref/parameter').\
- get('value'), '10.11.12.13')]
+ (lambda t: t.find(
+ './devices/interface/filterref/parameter').get('name'), 'IP'),
+ (lambda t: t.find(
+ './devices/interface/filterref/parameter').get(
+ 'value'), '10.11.12.13'),
+ (lambda t: t.findall(
+ './devices/interface/filterref/parameter')[1].get(
+ 'name'), 'DHCPSERVER'),
+ (lambda t: t.findall(
+ './devices/interface/filterref/parameter')[1].get(
+ 'value'), '10.0.0.1'),
+ (lambda t: t.find('./devices/serial/source').get(
+ 'path').split('/')[1], 'console.log'),
+ (lambda t: t.find('./memory').text, '2097152')]
+
+ if rescue:
+ common_checks += [
+ (lambda t: t.findall('./devices/disk/source')[0].get(
+ 'file').split('/')[1], 'rescue-disk'),
+ (lambda t: t.findall('./devices/disk/source')[1].get(
+ 'file').split('/')[1], 'disk')]
+ else:
+ common_checks += [(lambda t: t.findall(
+ './devices/disk/source')[0].get('file').split('/')[1],
+ 'disk')]
for (libvirt_type, (expected_uri, checks)) in type_uri_map.iteritems():
FLAGS.libvirt_type = libvirt_type
conn = libvirt_conn.LibvirtConnection(True)
- uri, _template, _rescue = conn.get_uri_and_templates()
+ uri = conn.get_uri()
self.assertEquals(uri, expected_uri)
- xml = conn.to_xml(instance_ref)
+ xml = conn.to_xml(instance_ref, rescue)
tree = xml_to_tree(xml)
for i, (check, expected_result) in enumerate(checks):
self.assertEqual(check(tree),
@@ -106,6 +188,9 @@ class LibvirtConnTestCase(test.TrialTestCase):
expected_result,
'%s failed common check %d' % (xml, i))
+ # This test is supposed to make sure we don't override a specifically
+ # set uri
+ #
# Deliberately not just assigning this string to FLAGS.libvirt_uri and
# checking against that later on. This way we make sure the
# implementation doesn't fiddle around with the FLAGS.
@@ -114,7 +199,7 @@ class LibvirtConnTestCase(test.TrialTestCase):
for (libvirt_type, (expected_uri, checks)) in type_uri_map.iteritems():
FLAGS.libvirt_type = libvirt_type
conn = libvirt_conn.LibvirtConnection(True)
- uri, _template, _rescue = conn.get_uri_and_templates()
+ uri = conn.get_uri()
self.assertEquals(uri, testuri)
def tearDown(self):
@@ -123,7 +208,7 @@ class LibvirtConnTestCase(test.TrialTestCase):
self.manager.delete_user(self.user)
-class NWFilterTestCase(test.TrialTestCase):
+class NWFilterTestCase(test.TestCase):
def setUp(self):
super(NWFilterTestCase, self).setUp()
@@ -235,7 +320,7 @@ class NWFilterTestCase(test.TrialTestCase):
'project_id': 'fake'})
inst_id = instance_ref['id']
- def _ensure_all_called(_):
+ def _ensure_all_called():
instance_filter = 'nova-instance-%s' % instance_ref['name']
secgroup_filter = 'nova-secgroup-%s' % self.security_group['id']
for required in [secgroup_filter, 'allow-dhcp-server',
@@ -252,8 +337,7 @@ class NWFilterTestCase(test.TrialTestCase):
self.security_group.id)
instance = db.instance_get(self.context, inst_id)
- d = self.fw.setup_nwfilters_for_instance(instance)
- d.addCallback(_ensure_all_called)
- d.addCallback(lambda _: self.teardown_security_group())
-
- return d
+ self.fw.setup_base_nwfilters()
+ self.fw.setup_nwfilters_for_instance(instance)
+ _ensure_all_called()
+ self.teardown_security_group()
diff --git a/nova/tests/volume_unittest.py b/nova/tests/test_volume.py
index 12321a96f..b13455fb0 100644
--- a/nova/tests/volume_unittest.py
+++ b/nova/tests/test_volume.py
@@ -21,8 +21,6 @@ Tests for Volume Code.
"""
import logging
-from twisted.internet import defer
-
from nova import context
from nova import exception
from nova import db
@@ -33,7 +31,7 @@ from nova import utils
FLAGS = flags.FLAGS
-class VolumeTestCase(test.TrialTestCase):
+class VolumeTestCase(test.TestCase):
"""Test Case for volumes."""
def setUp(self):
@@ -56,51 +54,48 @@ class VolumeTestCase(test.TrialTestCase):
vol['attach_status'] = "detached"
return db.volume_create(context.get_admin_context(), vol)['id']
- @defer.inlineCallbacks
def test_create_delete_volume(self):
"""Test volume can be created and deleted."""
volume_id = self._create_volume()
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
self.assertEqual(volume_id, db.volume_get(context.get_admin_context(),
volume_id).id)
- yield self.volume.delete_volume(self.context, volume_id)
+ self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.NotFound,
db.volume_get,
self.context,
volume_id)
- @defer.inlineCallbacks
def test_too_big_volume(self):
"""Ensure failure if a too large of a volume is requested."""
# FIXME(vish): validation needs to move into the data layer in
# volume_create
- defer.returnValue(True)
+ return True
try:
volume_id = self._create_volume('1001')
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
self.fail("Should have thrown TypeError")
except TypeError:
pass
- @defer.inlineCallbacks
def test_too_many_volumes(self):
"""Ensure that NoMoreTargets is raised when we run out of volumes."""
vols = []
total_slots = FLAGS.iscsi_num_targets
for _index in xrange(total_slots):
volume_id = self._create_volume()
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
vols.append(volume_id)
volume_id = self._create_volume()
- self.assertFailure(self.volume.create_volume(self.context,
- volume_id),
- db.NoMoreTargets)
+ self.assertRaises(db.NoMoreTargets,
+ self.volume.create_volume,
+ self.context,
+ volume_id)
db.volume_destroy(context.get_admin_context(), volume_id)
for volume_id in vols:
- yield self.volume.delete_volume(self.context, volume_id)
+ self.volume.delete_volume(self.context, volume_id)
- @defer.inlineCallbacks
def test_run_attach_detach_volume(self):
"""Make sure volume can be attached and detached from instance."""
inst = {}
@@ -115,15 +110,15 @@ class VolumeTestCase(test.TrialTestCase):
instance_id = db.instance_create(self.context, inst)['id']
mountpoint = "/dev/sdf"
volume_id = self._create_volume()
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
if FLAGS.fake_tests:
db.volume_attached(self.context, volume_id, instance_id,
mountpoint)
else:
- yield self.compute.attach_volume(self.context,
- instance_id,
- volume_id,
- mountpoint)
+ self.compute.attach_volume(self.context,
+ instance_id,
+ volume_id,
+ mountpoint)
vol = db.volume_get(context.get_admin_context(), volume_id)
self.assertEqual(vol['status'], "in-use")
self.assertEqual(vol['attach_status'], "attached")
@@ -131,25 +126,26 @@ class VolumeTestCase(test.TrialTestCase):
instance_ref = db.volume_get_instance(self.context, volume_id)
self.assertEqual(instance_ref['id'], instance_id)
- self.assertFailure(self.volume.delete_volume(self.context, volume_id),
- exception.Error)
+ self.assertRaises(exception.Error,
+ self.volume.delete_volume,
+ self.context,
+ volume_id)
if FLAGS.fake_tests:
db.volume_detached(self.context, volume_id)
else:
- yield self.compute.detach_volume(self.context,
- instance_id,
- volume_id)
+ self.compute.detach_volume(self.context,
+ instance_id,
+ volume_id)
vol = db.volume_get(self.context, volume_id)
self.assertEqual(vol['status'], "available")
- yield self.volume.delete_volume(self.context, volume_id)
+ self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.Error,
db.volume_get,
self.context,
volume_id)
db.instance_destroy(self.context, instance_id)
- @defer.inlineCallbacks
def test_concurrent_volumes_get_different_targets(self):
"""Ensure multiple concurrent volumes get different targets."""
volume_ids = []
@@ -164,15 +160,11 @@ class VolumeTestCase(test.TrialTestCase):
self.assert_(iscsi_target not in targets)
targets.append(iscsi_target)
logging.debug("Target %s allocated", iscsi_target)
- deferreds = []
total_slots = FLAGS.iscsi_num_targets
for _index in xrange(total_slots):
volume_id = self._create_volume()
d = self.volume.create_volume(self.context, volume_id)
- d.addCallback(_check)
- d.addErrback(self.fail)
- deferreds.append(d)
- yield defer.DeferredList(deferreds)
+ _check(d)
for volume_id in volume_ids:
self.volume.delete_volume(self.context, volume_id)
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
new file mode 100644
index 000000000..b5d3ea395
--- /dev/null
+++ b/nova/tests/test_xenapi.py
@@ -0,0 +1,219 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Test suite for XenAPI
+"""
+
+import stubout
+
+from nova import db
+from nova import context
+from nova import flags
+from nova import test
+from nova import utils
+from nova.auth import manager
+from nova.compute import instance_types
+from nova.compute import power_state
+from nova.virt import xenapi_conn
+from nova.virt.xenapi import fake
+from nova.virt.xenapi import volume_utils
+from nova.tests.db import fakes
+from nova.tests.xenapi import stubs
+
+FLAGS = flags.FLAGS
+
+
+class XenAPIVolumeTestCase(test.TestCase):
+ """
+ Unit tests for Volume operations
+ """
+ def setUp(self):
+ super(XenAPIVolumeTestCase, self).setUp()
+ self.stubs = stubout.StubOutForTesting()
+ FLAGS.target_host = '127.0.0.1'
+ FLAGS.xenapi_connection_url = 'test_url'
+ FLAGS.xenapi_connection_password = 'test_pass'
+ fakes.stub_out_db_instance_api(self.stubs)
+ fake.reset()
+ self.values = {'name': 1, 'id': 1,
+ 'project_id': 'fake',
+ 'user_id': 'fake',
+ 'image_id': 1,
+ 'kernel_id': 2,
+ 'ramdisk_id': 3,
+ 'instance_type': 'm1.large',
+ 'mac_address': 'aa:bb:cc:dd:ee:ff',
+ }
+
+ def _create_volume(self, size='0'):
+ """Create a volume object."""
+ vol = {}
+ vol['size'] = size
+ vol['user_id'] = 'fake'
+ vol['project_id'] = 'fake'
+ vol['host'] = 'localhost'
+ vol['availability_zone'] = FLAGS.storage_availability_zone
+ vol['status'] = "creating"
+ vol['attach_status'] = "detached"
+ return db.volume_create(context.get_admin_context(), vol)
+
+ def test_create_iscsi_storage(self):
+ """ This shows how to test helper classes' methods """
+ stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests)
+ session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass')
+ helper = volume_utils.VolumeHelper
+ helper.XenAPI = session.get_imported_xenapi()
+ vol = self._create_volume()
+ info = helper.parse_volume_info(vol['ec2_id'], '/dev/sdc')
+ label = 'SR-%s' % vol['ec2_id']
+ description = 'Test-SR'
+ sr_ref = helper.create_iscsi_storage(session, info, label, description)
+ srs = fake.get_all('SR')
+ self.assertEqual(sr_ref, srs[0])
+ db.volume_destroy(context.get_admin_context(), vol['id'])
+
+ def test_parse_volume_info_raise_exception(self):
+ """ This shows how to test helper classes' methods """
+ stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests)
+ session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass')
+ helper = volume_utils.VolumeHelper
+ helper.XenAPI = session.get_imported_xenapi()
+ vol = self._create_volume()
+ # oops, wrong mount point!
+ self.assertRaises(volume_utils.StorageError,
+ helper.parse_volume_info,
+ vol['ec2_id'],
+ '/dev/sd')
+ db.volume_destroy(context.get_admin_context(), vol['id'])
+
+ def test_attach_volume(self):
+ """ This shows how to test Ops classes' methods """
+ stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests)
+ conn = xenapi_conn.get_connection(False)
+ volume = self._create_volume()
+ instance = db.instance_create(self.values)
+ fake.create_vm(instance.name, 'Running')
+ result = conn.attach_volume(instance.name, volume['ec2_id'],
+ '/dev/sdc')
+
+ def check():
+ # check that the VM has a VBD attached to it
+ # Get XenAPI reference for the VM
+ vms = fake.get_all('VM')
+ # Get XenAPI record for VBD
+ vbds = fake.get_all('VBD')
+ vbd = fake.get_record('VBD', vbds[0])
+ vm_ref = vbd['VM']
+ self.assertEqual(vm_ref, vms[0])
+
+ check()
+
+ def test_attach_volume_raise_exception(self):
+ """ This shows how to test when exceptions are raised """
+ stubs.stubout_session(self.stubs,
+ stubs.FakeSessionForVolumeFailedTests)
+ conn = xenapi_conn.get_connection(False)
+ volume = self._create_volume()
+ instance = db.instance_create(self.values)
+ fake.create_vm(instance.name, 'Running')
+ self.assertRaises(Exception,
+ conn.attach_volume,
+ instance.name,
+ volume['ec2_id'],
+ '/dev/sdc')
+
+ def tearDown(self):
+ super(XenAPIVolumeTestCase, self).tearDown()
+ self.stubs.UnsetAll()
+
+
+class XenAPIVMTestCase(test.TestCase):
+ """
+ Unit tests for VM operations
+ """
+ def setUp(self):
+ super(XenAPIVMTestCase, self).setUp()
+ self.manager = manager.AuthManager()
+ self.user = self.manager.create_user('fake', 'fake', 'fake',
+ admin=True)
+ self.project = self.manager.create_project('fake', 'fake', 'fake')
+ self.network = utils.import_object(FLAGS.network_manager)
+ self.stubs = stubout.StubOutForTesting()
+ FLAGS.xenapi_connection_url = 'test_url'
+ FLAGS.xenapi_connection_password = 'test_pass'
+ fake.reset()
+ fakes.stub_out_db_instance_api(self.stubs)
+ fake.create_network('fake', FLAGS.flat_network_bridge)
+
+ def test_list_instances_0(self):
+ stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
+ conn = xenapi_conn.get_connection(False)
+ instances = conn.list_instances()
+ self.assertEquals(instances, [])
+
+ def test_spawn(self):
+ stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
+ values = {'name': 1, 'id': 1,
+ 'project_id': self.project.id,
+ 'user_id': self.user.id,
+ 'image_id': 1,
+ 'kernel_id': 2,
+ 'ramdisk_id': 3,
+ 'instance_type': 'm1.large',
+ 'mac_address': 'aa:bb:cc:dd:ee:ff',
+ }
+ conn = xenapi_conn.get_connection(False)
+ instance = db.instance_create(values)
+ conn.spawn(instance)
+
+ def check():
+ instances = conn.list_instances()
+ self.assertEquals(instances, [1])
+
+ # Get Nova record for VM
+ vm_info = conn.get_info(1)
+
+ # Get XenAPI record for VM
+ vms = fake.get_all('VM')
+ vm = fake.get_record('VM', vms[0])
+
+ # Check that m1.large above turned into the right thing.
+ instance_type = instance_types.INSTANCE_TYPES['m1.large']
+ mem_kib = long(instance_type['memory_mb']) << 10
+ mem_bytes = str(mem_kib << 10)
+ vcpus = instance_type['vcpus']
+ self.assertEquals(vm_info['max_mem'], mem_kib)
+ self.assertEquals(vm_info['mem'], mem_kib)
+ self.assertEquals(vm['memory_static_max'], mem_bytes)
+ self.assertEquals(vm['memory_dynamic_max'], mem_bytes)
+ self.assertEquals(vm['memory_dynamic_min'], mem_bytes)
+ self.assertEquals(vm['VCPUs_max'], str(vcpus))
+ self.assertEquals(vm['VCPUs_at_startup'], str(vcpus))
+
+ # Check that the VM is running according to Nova
+ self.assertEquals(vm_info['state'], power_state.RUNNING)
+
+ # Check that the VM is running according to XenAPI.
+ self.assertEquals(vm['power_state'], 'Running')
+
+ check()
+
+ def tearDown(self):
+ super(XenAPIVMTestCase, self).tearDown()
+ self.manager.delete_project(self.project)
+ self.manager.delete_user(self.user)
+ self.stubs.UnsetAll()
diff --git a/nova/tests/validator_unittest.py b/nova/tests/validator_unittest.py
deleted file mode 100644
index b5f1c0667..000000000
--- a/nova/tests/validator_unittest.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-import unittest
-
-from nova import flags
-from nova import test
-from nova import validate
-
-
-class ValidationTestCase(test.TrialTestCase):
- def setUp(self):
- super(ValidationTestCase, self).setUp()
-
- def tearDown(self):
- super(ValidationTestCase, self).tearDown()
-
- def test_type_validation(self):
- self.assertTrue(type_case("foo", 5, 1))
- self.assertRaises(TypeError, type_case, "bar", "5", 1)
- self.assertRaises(TypeError, type_case, None, 5, 1)
-
-
-@validate.typetest(instanceid=str, size=int, number_of_instances=int)
-def type_case(instanceid, size, number_of_instances):
- return True
diff --git a/nova/tests/xenapi/__init__.py b/nova/tests/xenapi/__init__.py
new file mode 100644
index 000000000..1dd02bdc1
--- /dev/null
+++ b/nova/tests/xenapi/__init__.py
@@ -0,0 +1,20 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+:mod:`xenapi` -- Stubs for XenAPI
+=================================
+"""
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
new file mode 100644
index 000000000..1dacad6a3
--- /dev/null
+++ b/nova/tests/xenapi/stubs.py
@@ -0,0 +1,94 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Citrix Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Stubouts, mocks and fixtures for the test suite"""
+
+from nova.virt import xenapi_conn
+from nova.virt.xenapi import fake
+
+
+def stubout_session(stubs, cls):
+ """ Stubs out two methods from XenAPISession """
+ def fake_import(self):
+ """ Stubs out get_imported_xenapi of XenAPISession """
+ fake_module = 'nova.virt.xenapi.fake'
+ from_list = ['fake']
+ return __import__(fake_module, globals(), locals(), from_list, -1)
+
+ stubs.Set(xenapi_conn.XenAPISession, '_create_session',
+ lambda s, url: cls(url))
+ stubs.Set(xenapi_conn.XenAPISession, 'get_imported_xenapi',
+ fake_import)
+
+
+class FakeSessionForVMTests(fake.SessionBase):
+ """ Stubs out a XenAPISession for VM tests """
+ def __init__(self, uri):
+ super(FakeSessionForVMTests, self).__init__(uri)
+
+ def network_get_all_records_where(self, _1, _2):
+ return self.xenapi.network.get_all_records()
+
+ def host_call_plugin(self, _1, _2, _3, _4, _5):
+ return ''
+
+ def VM_start(self, _1, ref, _2, _3):
+ vm = fake.get_record('VM', ref)
+ if vm['power_state'] != 'Halted':
+ raise fake.Failure(['VM_BAD_POWER_STATE', ref, 'Halted',
+ vm['power_state']])
+ vm['power_state'] = 'Running'
+ vm['is_a_template'] = False
+ vm['is_control_domain'] = False
+
+
+class FakeSessionForVolumeTests(fake.SessionBase):
+ """ Stubs out a XenAPISession for Volume tests """
+ def __init__(self, uri):
+ super(FakeSessionForVolumeTests, self).__init__(uri)
+
+ def VBD_plug(self, _1, ref):
+ rec = fake.get_record('VBD', ref)
+ rec['currently-attached'] = True
+
+ def VDI_introduce(self, _1, uuid, _2, _3, _4, _5,
+ _6, _7, _8, _9, _10, _11):
+ valid_vdi = False
+ refs = fake.get_all('VDI')
+ for ref in refs:
+ rec = fake.get_record('VDI', ref)
+ if rec['uuid'] == uuid:
+ valid_vdi = True
+ if not valid_vdi:
+ raise fake.Failure([['INVALID_VDI', 'session', self._session]])
+
+
+class FakeSessionForVolumeFailedTests(FakeSessionForVolumeTests):
+ """ Stubs out a XenAPISession for Volume tests: it injects failures """
+ def __init__(self, uri):
+ super(FakeSessionForVolumeFailedTests, self).__init__(uri)
+
+ def VDI_introduce(self, _1, uuid, _2, _3, _4, _5,
+ _6, _7, _8, _9, _10, _11):
+ # This is for testing failure
+ raise fake.Failure([['INVALID_VDI', 'session', self._session]])
+
+ def PBD_unplug(self, _1, ref):
+ rec = fake.get_record('PBD', ref)
+ rec['currently-attached'] = False
+
+ def SR_forget(self, _1, ref):
+ pass