diff options
| author | Eric Day <eday@oddments.org> | 2010-10-22 00:48:27 -0700 |
|---|---|---|
| committer | Eric Day <eday@oddments.org> | 2010-10-22 00:48:27 -0700 |
| commit | a53aea8fedcd9cfab8b54eaaff6a1ae1d2c60684 (patch) | |
| tree | b4ec125f23c515d03eff8d2112672ccdbe5ff9fd | |
| parent | 8c6ef1380426aca6d7c82e3489789ccde8ee047c (diff) | |
| download | nova-a53aea8fedcd9cfab8b54eaaff6a1ae1d2c60684.tar.gz nova-a53aea8fedcd9cfab8b54eaaff6a1ae1d2c60684.tar.xz nova-a53aea8fedcd9cfab8b54eaaff6a1ae1d2c60684.zip | |
PEP8 cleanup in nova/tests, except for tests. There should be no functional changes here, just style changes to get violations down.
26 files changed, 290 insertions, 188 deletions
diff --git a/nova/tests/access_unittest.py b/nova/tests/access_unittest.py index 8167259c4..0f66c0a26 100644 --- a/nova/tests/access_unittest.py +++ b/nova/tests/access_unittest.py @@ -29,9 +29,12 @@ from nova.auth import manager FLAGS = flags.FLAGS + + class Context(object): pass + class AccessTestCase(test.TrialTestCase): def setUp(self): super(AccessTestCase, self).setUp() @@ -56,9 +59,11 @@ class AccessTestCase(test.TrialTestCase): self.project.add_role(self.testnet, 'netadmin') self.project.add_role(self.testsys, 'sysadmin') #user is set in each test + def noopWSGIApp(environ, start_response): start_response('200 OK', []) return [''] + self.mw = ec2.Authorizer(noopWSGIApp) self.mw.action_roles = {'str': { '_allow_all': ['all'], @@ -80,7 +85,7 @@ class AccessTestCase(test.TrialTestCase): def response_status(self, user, methodName): ctxt = context.RequestContext(user, self.project) - environ = {'ec2.context' : ctxt, + environ = {'ec2.context': ctxt, 'ec2.controller': 'some string', 'ec2.action': methodName} req = webob.Request.blank('/', environ) diff --git a/nova/tests/api/__init__.py b/nova/tests/api/__init__.py index a0681f414..46f09e906 100644 --- a/nova/tests/api/__init__.py +++ b/nova/tests/api/__init__.py @@ -66,8 +66,7 @@ class Test(unittest.TestCase): def test_metadata(self): def go(url): - result = self._request(url, 'ec2', - REMOTE_ADDR='128.192.151.2') + result = self._request(url, 'ec2', REMOTE_ADDR='128.192.151.2') # Each should get to the ORM layer and fail to find the IP self.assertRaises(nova.exception.NotFound, go, '/latest/') self.assertRaises(nova.exception.NotFound, go, '/2009-04-04/') @@ -78,6 +77,5 @@ class Test(unittest.TestCase): self.assertTrue('2007-12-15\n' in result.body) - if __name__ == '__main__': unittest.main() diff --git a/nova/tests/api/fakes.py b/nova/tests/api/fakes.py index d0a2cc027..0aedcaff0 100644 --- a/nova/tests/api/fakes.py +++ b/nova/tests/api/fakes.py @@ -1,6 +1,24 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + import webob.dec from nova import wsgi + class APIStub(object): """Class to verify request and mark it was called.""" @webob.dec.wsgify diff --git a/nova/tests/api/openstack/__init__.py b/nova/tests/api/openstack/__init__.py index b534897f5..2e357febe 100644 --- a/nova/tests/api/openstack/__init__.py +++ b/nova/tests/api/openstack/__init__.py @@ -27,11 +27,13 @@ class RateLimitingMiddlewareTest(unittest.TestCase): def test_get_action_name(self): middleware = RateLimitingMiddleware(APIStub()) + def verify(method, url, action_name): req = Request.blank(url) req.method = method action = middleware.get_action_name(req) self.assertEqual(action, action_name) + verify('PUT', '/servers/4', 'PUT') verify('DELETE', '/servers/4', 'DELETE') verify('POST', '/images/4', 'POST') @@ -60,7 +62,7 @@ class RateLimitingMiddlewareTest(unittest.TestCase): middleware = RateLimitingMiddleware(APIStub()) self.exhaust(middleware, 'POST', '/servers/4', 'usr1', 10) self.exhaust(middleware, 'POST', '/images/4', 'usr2', 10) - self.assertTrue(set(middleware.limiter._levels) == + self.assertTrue(set(middleware.limiter._levels) == set(['usr1:POST', 'usr1:POST servers', 'usr2:POST'])) def test_POST_servers_action_correctly_ratelimited(self): @@ -85,19 +87,19 @@ class LimiterTest(unittest.TestCase): def test_limiter(self): items = range(2000) req = Request.blank('/') - self.assertEqual(limited(items, req), items[ :1000]) + self.assertEqual(limited(items, req), items[:1000]) req = Request.blank('/?offset=0') - self.assertEqual(limited(items, req), items[ :1000]) + self.assertEqual(limited(items, req), items[:1000]) req = Request.blank('/?offset=3') self.assertEqual(limited(items, req), items[3:1003]) req = Request.blank('/?offset=2005') self.assertEqual(limited(items, req), []) req = Request.blank('/?limit=10') - self.assertEqual(limited(items, req), items[ :10]) + self.assertEqual(limited(items, req), items[:10]) req = Request.blank('/?limit=0') - self.assertEqual(limited(items, req), items[ :1000]) + self.assertEqual(limited(items, req), items[:1000]) req = Request.blank('/?limit=3000') - self.assertEqual(limited(items, req), items[ :1000]) + self.assertEqual(limited(items, req), items[:1000]) req = Request.blank('/?offset=1&limit=3') self.assertEqual(limited(items, req), items[1:4]) req = Request.blank('/?offset=3&limit=0') diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py index 14170fbb2..7ecb72ab3 100644 --- a/nova/tests/api/openstack/fakes.py +++ b/nova/tests/api/openstack/fakes.py @@ -36,7 +36,7 @@ from nova.wsgi import Router FLAGS = flags.FLAGS -class Context(object): +class Context(object): pass @@ -84,11 +84,11 @@ def stub_out_image_service(stubs): def stub_out_auth(stubs): def fake_auth_init(self, app): self.application = app - - stubs.Set(nova.api.openstack.AuthMiddleware, - '__init__', fake_auth_init) - stubs.Set(nova.api.openstack.AuthMiddleware, - '__call__', fake_wsgi) + + stubs.Set(nova.api.openstack.AuthMiddleware, + '__init__', fake_auth_init) + stubs.Set(nova.api.openstack.AuthMiddleware, + '__call__', fake_wsgi) def stub_out_rate_limiting(stubs): @@ -105,7 +105,7 @@ def stub_out_rate_limiting(stubs): def stub_out_networking(stubs): def get_my_ip(): - return '127.0.0.1' + return '127.0.0.1' stubs.Set(nova.utils, 'get_my_ip', get_my_ip) FLAGS.FAKE_subdomain = 'api' @@ -137,7 +137,6 @@ def stub_out_glance(stubs, initial_fixtures=[]): return id def fake_update_image_metadata(self, image_id, image_data): - f = self.fake_get_image_metadata(image_id) if not f: raise exc.NotFound @@ -145,7 +144,6 @@ def stub_out_glance(stubs, initial_fixtures=[]): f.update(image_data) def fake_delete_image_metadata(self, image_id): - f = self.fake_get_image_metadata(image_id) if not f: raise exc.NotFound @@ -164,9 +162,11 @@ def stub_out_glance(stubs, initial_fixtures=[]): fake_parallax_client.fake_get_image_metadata) stubs.Set(nova.image.services.glance.ParallaxClient, 'add_image_metadata', fake_parallax_client.fake_add_image_metadata) - stubs.Set(nova.image.services.glance.ParallaxClient, 'update_image_metadata', + stubs.Set(nova.image.services.glance.ParallaxClient, + 'update_image_metadata', fake_parallax_client.fake_update_image_metadata) - stubs.Set(nova.image.services.glance.ParallaxClient, 'delete_image_metadata', + stubs.Set(nova.image.services.glance.ParallaxClient, + 'delete_image_metadata', fake_parallax_client.fake_delete_image_metadata) stubs.Set(nova.image.services.glance.GlanceImageService, 'delete_all', fake_parallax_client.fake_delete_all) @@ -174,7 +174,7 @@ def stub_out_glance(stubs, initial_fixtures=[]): class FakeToken(object): def __init__(self, **kwargs): - for k,v in kwargs.iteritems(): + for k, v in kwargs.iteritems(): setattr(self, k, v) @@ -200,7 +200,7 @@ class FakeAuthDatabase(object): class FakeAuthManager(object): auth_data = {} - def add_user(self, key, user): + def add_user(self, key, user): FakeAuthManager.auth_data[key] = user def get_user(self, uid): diff --git a/nova/tests/api/openstack/test_auth.py b/nova/tests/api/openstack/test_auth.py index 61d17d7e8..b63da187f 100644 --- a/nova/tests/api/openstack/test_auth.py +++ b/nova/tests/api/openstack/test_auth.py @@ -1,3 +1,20 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + import datetime import unittest @@ -11,7 +28,9 @@ import nova.auth.manager from nova import auth from nova.tests.api.openstack import fakes + class Test(unittest.TestCase): + def setUp(self): self.stubs = stubout.StubOutForTesting() self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager, @@ -42,7 +61,7 @@ class Test(unittest.TestCase): def test_authorize_token(self): f = fakes.FakeAuthManager() f.add_user('derp', nova.auth.manager.User(1, 'herp', None, None, None)) - + req = webob.Request.blank('/v1.0/') req.headers['X-Auth-User'] = 'herp' req.headers['X-Auth-Key'] = 'derp' @@ -63,14 +82,14 @@ class Test(unittest.TestCase): result = req.get_response(nova.api.API()) self.assertEqual(result.status, '200 OK') self.assertEqual(result.headers['X-Test-Success'], 'True') - + def test_token_expiry(self): self.destroy_called = False token_hash = 'bacon' def destroy_token_mock(meh, context, token): self.destroy_called = True - + def bad_token(meh, context, token_hash): return fakes.FakeToken( token_hash=token_hash, diff --git a/nova/tests/api/openstack/test_faults.py b/nova/tests/api/openstack/test_faults.py index 70a811469..fda2b5ede 100644 --- a/nova/tests/api/openstack/test_faults.py +++ b/nova/tests/api/openstack/test_faults.py @@ -1,3 +1,20 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + import unittest import webob import webob.dec @@ -5,6 +22,7 @@ import webob.exc from nova.api.openstack import faults + class TestFaults(unittest.TestCase): def test_fault_parts(self): @@ -19,7 +37,7 @@ class TestFaults(unittest.TestCase): def test_retry_header(self): req = webob.Request.blank('/.xml') - exc = webob.exc.HTTPRequestEntityTooLarge(explanation='sorry', + exc = webob.exc.HTTPRequestEntityTooLarge(explanation='sorry', headers={'Retry-After': 4}) f = faults.Fault(exc) resp = req.get_response(f) diff --git a/nova/tests/api/openstack/test_images.py b/nova/tests/api/openstack/test_images.py index e156c0957..d61c3a99b 100644 --- a/nova/tests/api/openstack/test_images.py +++ b/nova/tests/api/openstack/test_images.py @@ -90,7 +90,7 @@ class BaseImageServiceTests(object): id = self.service.create(fixture) fixture['status'] = 'in progress' - + self.service.update(id, fixture) new_image_data = self.service.show(id) self.assertEquals('in progress', new_image_data['status']) @@ -121,7 +121,7 @@ class BaseImageServiceTests(object): num_images = len(self.service.index()) self.assertEquals(2, num_images, str(self.service.index())) - + self.service.delete(ids[0]) num_images = len(self.service.index()) @@ -135,7 +135,8 @@ class LocalImageServiceTest(unittest.TestCase, def setUp(self): self.stubs = stubout.StubOutForTesting() - self.service = utils.import_object('nova.image.service.LocalImageService') + service_class = 'nova.image.service.LocalImageService' + self.service = utils.import_object(service_class) def tearDown(self): self.service.delete_all() @@ -150,7 +151,8 @@ class GlanceImageServiceTest(unittest.TestCase, def setUp(self): self.stubs = stubout.StubOutForTesting() fakes.stub_out_glance(self.stubs) - self.service = utils.import_object('nova.image.services.glance.GlanceImageService') + service_class = 'nova.image.services.glance.GlanceImageService' + self.service = utils.import_object(service_class) self.service.delete_all() def tearDown(self): @@ -172,8 +174,7 @@ class ImageControllerWithGlanceServiceTest(unittest.TestCase): 'deleted': False, 'is_public': True, 'status': 'available', - 'image_type': 'kernel' - }, + 'image_type': 'kernel'}, {'id': 'slkduhfas73kkaskgdas', 'name': 'public image #2', 'created_at': str(datetime.datetime.utcnow()), @@ -182,9 +183,7 @@ class ImageControllerWithGlanceServiceTest(unittest.TestCase): 'deleted': False, 'is_public': True, 'status': 'available', - 'image_type': 'ramdisk' - }, - ] + 'image_type': 'ramdisk'}] def setUp(self): self.orig_image_service = FLAGS.image_service @@ -211,7 +210,8 @@ class ImageControllerWithGlanceServiceTest(unittest.TestCase): in self.IMAGE_FIXTURES] for image in res_dict['images']: - self.assertEquals(1, fixture_index.count(image), "image %s not in fixture index!" % str(image)) + self.assertEquals(1, fixture_index.count(image), + "image %s not in fixture index!" % str(image)) def test_get_image_details(self): req = webob.Request.blank('/v1.0/images/detail') @@ -219,4 +219,5 @@ class ImageControllerWithGlanceServiceTest(unittest.TestCase): res_dict = json.loads(res.body) for image in res_dict['images']: - self.assertEquals(1, self.IMAGE_FIXTURES.count(image), "image %s not in fixtures!" % str(image)) + self.assertEquals(1, self.IMAGE_FIXTURES.count(image), + "image %s not in fixtures!" % str(image)) diff --git a/nova/tests/api/openstack/test_ratelimiting.py b/nova/tests/api/openstack/test_ratelimiting.py index ad9e67454..4c9d6bc23 100644 --- a/nova/tests/api/openstack/test_ratelimiting.py +++ b/nova/tests/api/openstack/test_ratelimiting.py @@ -6,6 +6,7 @@ import webob import nova.api.openstack.ratelimiting as ratelimiting + class LimiterTest(unittest.TestCase): def setUp(self): @@ -66,13 +67,16 @@ class LimiterTest(unittest.TestCase): class FakeLimiter(object): """Fake Limiter class that you can tell how to behave.""" + def __init__(self, test): self._action = self._username = self._delay = None self.test = test + def mock(self, action, username, delay): self._action = action self._username = username self._delay = delay + def perform(self, action, username): self.test.assertEqual(action, self._action) self.test.assertEqual(username, self._username) @@ -88,7 +92,7 @@ class WSGIAppTest(unittest.TestCase): def test_invalid_methods(self): requests = [] for method in ['GET', 'PUT', 'DELETE']: - req = webob.Request.blank('/limits/michael/breakdance', + req = webob.Request.blank('/limits/michael/breakdance', dict(REQUEST_METHOD=method)) requests.append(req) for req in requests: @@ -180,7 +184,7 @@ def wire_HTTPConnection_to_WSGI(host, app): the connection object will be a fake. Its requests will be sent directly to the given WSGI app rather than through a socket. - + Code connecting to hosts other than host will not be affected. This method may be called multiple times to map different hosts to @@ -189,13 +193,16 @@ def wire_HTTPConnection_to_WSGI(host, app): class HTTPConnectionDecorator(object): """Wraps the real HTTPConnection class so that when you instantiate the class you might instead get a fake instance.""" + def __init__(self, wrapped): self.wrapped = wrapped + def __call__(self, connection_host, *args, **kwargs): if connection_host == host: return FakeHttplibConnection(app, host) else: return self.wrapped(connection_host, *args, **kwargs) + httplib.HTTPConnection = HTTPConnectionDecorator(httplib.HTTPConnection) diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py index d1ee533b6..55efcf733 100644 --- a/nova/tests/api/openstack/test_servers.py +++ b/nova/tests/api/openstack/test_servers.py @@ -32,9 +32,9 @@ from nova.tests.api.openstack import fakes FLAGS = flags.FLAGS - FLAGS.verbose = True + def return_server(context, id): return stub_instance(id) @@ -44,10 +44,8 @@ def return_servers(context, user_id=1): def stub_instance(id, user_id=1): - return Instance( - id=id, state=0, image_id=10, server_name='server%s'%id, - user_id=user_id - ) + return Instance(id=id, state=0, image_id=10, server_name='server%s' % id, + user_id=user_id) class ServersTest(unittest.TestCase): @@ -61,9 +59,10 @@ class ServersTest(unittest.TestCase): fakes.stub_out_key_pair_funcs(self.stubs) fakes.stub_out_image_service(self.stubs) self.stubs.Set(nova.db.api, 'instance_get_all', return_servers) - self.stubs.Set(nova.db.api, 'instance_get_by_internal_id', return_server) - self.stubs.Set(nova.db.api, 'instance_get_all_by_user', - return_servers) + self.stubs.Set(nova.db.api, 'instance_get_by_internal_id', + return_server) + self.stubs.Set(nova.db.api, 'instance_get_all_by_user', + return_servers) def tearDown(self): self.stubs.UnsetAll() @@ -79,17 +78,17 @@ class ServersTest(unittest.TestCase): req = webob.Request.blank('/v1.0/servers') res = req.get_response(nova.api.API()) res_dict = json.loads(res.body) - + i = 0 for s in res_dict['servers']: self.assertEqual(s['id'], i) - self.assertEqual(s['name'], 'server%d'%i) + self.assertEqual(s['name'], 'server%d' % i) self.assertEqual(s.get('imageId', None), None) i += 1 def test_create_instance(self): def server_update(context, id, params): - pass + pass def instance_create(context, inst): class Foo(object): @@ -98,9 +97,9 @@ class ServersTest(unittest.TestCase): def fake_method(*args, **kwargs): pass - + def project_get_network(context, user_id): - return dict(id='1', host='localhost') + return dict(id='1', host='localhost') def queue_get_for(context, *args): return 'network_topic' @@ -114,11 +113,10 @@ class ServersTest(unittest.TestCase): self.stubs.Set(nova.db.api, 'queue_get_for', queue_get_for) self.stubs.Set(nova.network.manager.VlanManager, 'allocate_fixed_ip', fake_method) - + body = dict(server=dict( name='server_test', imageId=2, flavorId=2, metadata={}, - personality = {} - )) + personality={})) req = webob.Request.blank('/v1.0/servers') req.method = 'POST' req.body = json.dumps(body) @@ -188,44 +186,41 @@ class ServersTest(unittest.TestCase): req = webob.Request.blank('/v1.0/servers/detail') res = req.get_response(nova.api.API()) res_dict = json.loads(res.body) - + i = 0 for s in res_dict['servers']: self.assertEqual(s['id'], i) - self.assertEqual(s['name'], 'server%d'%i) + self.assertEqual(s['name'], 'server%d' % i) self.assertEqual(s['imageId'], 10) i += 1 def test_server_reboot(self): body = dict(server=dict( name='server_test', imageId=2, flavorId=2, metadata={}, - personality = {} - )) + personality={})) req = webob.Request.blank('/v1.0/servers/1/action') req.method = 'POST' - req.content_type= 'application/json' + req.content_type = 'application/json' req.body = json.dumps(body) res = req.get_response(nova.api.API()) def test_server_rebuild(self): body = dict(server=dict( name='server_test', imageId=2, flavorId=2, metadata={}, - personality = {} - )) + personality={})) req = webob.Request.blank('/v1.0/servers/1/action') req.method = 'POST' - req.content_type= 'application/json' + req.content_type = 'application/json' req.body = json.dumps(body) res = req.get_response(nova.api.API()) def test_server_resize(self): body = dict(server=dict( name='server_test', imageId=2, flavorId=2, metadata={}, - personality = {} - )) + personality={})) req = webob.Request.blank('/v1.0/servers/1/action') req.method = 'POST' - req.content_type= 'application/json' + req.content_type = 'application/json' req.body = json.dumps(body) res = req.get_response(nova.api.API()) @@ -234,8 +229,9 @@ class ServersTest(unittest.TestCase): req.method = 'DELETE' self.server_delete_called = False + def instance_destroy_mock(context, id): - self.server_delete_called = True + self.server_delete_called = True self.stubs.Set(nova.db.api, 'instance_destroy', instance_destroy_mock) diff --git a/nova/tests/api/test_wsgi.py b/nova/tests/api/test_wsgi.py index 9425b01d0..604d0cb66 100644 --- a/nova/tests/api/test_wsgi.py +++ b/nova/tests/api/test_wsgi.py @@ -72,7 +72,7 @@ class Test(unittest.TestCase): """Test controller to call from router.""" test = self - def show(self, req, id): # pylint: disable-msg=W0622,C0103 + def show(self, req, id): # pylint: disable-msg=W0622,C0103 """Default action called for requests with an ID.""" self.test.assertEqual(req.path_info, '/tests/123') self.test.assertEqual(id, '123') @@ -95,7 +95,7 @@ class Test(unittest.TestCase): class SerializerTest(unittest.TestCase): def match(self, url, accept, expect): - input_dict = dict(servers=dict(a=(2,3))) + input_dict = dict(servers=dict(a=(2, 3))) expected_xml = '<servers><a>(2,3)</a></servers>' expected_json = '{"servers":{"a":[2,3]}}' req = webob.Request.blank(url, headers=dict(Accept=accept)) diff --git a/nova/tests/api_integration.py b/nova/tests/api_integration.py index 23a88f083..54403c655 100644 --- a/nova/tests/api_integration.py +++ b/nova/tests/api_integration.py @@ -28,16 +28,17 @@ CLC_IP = '127.0.0.1' CLC_PORT = 8773 REGION = 'test' + def get_connection(): - return boto.connect_ec2 ( + return boto.connect_ec2( aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, is_secure=False, region=RegionInfo(None, REGION, CLC_IP), port=CLC_PORT, path='/services/Cloud', - debug=99 - ) + debug=99) + class APIIntegrationTests(unittest.TestCase): def test_001_get_all_images(self): @@ -51,4 +52,3 @@ if __name__ == '__main__': #print conn.get_all_key_pairs() #print conn.create_key_pair #print conn.create_security_group('name', 'description') - diff --git a/nova/tests/api_unittest.py b/nova/tests/api_unittest.py index 55a7ff32f..80493b10a 100644 --- a/nova/tests/api_unittest.py +++ b/nova/tests/api_unittest.py @@ -99,6 +99,7 @@ class XmlConversionTestCase(test.BaseTestCase): self.assertEqual(conv('-'), '-') self.assertEqual(conv('-0'), 0) + class ApiEc2TestCase(test.BaseTestCase): """Unit test for the cloud controller on an EC2 API""" def setUp(self): @@ -138,7 +139,6 @@ class ApiEc2TestCase(test.BaseTestCase): self.manager.delete_project(project) self.manager.delete_user(user) - def test_get_all_key_pairs(self): """Test that, after creating a user and project and generating a key pair, that the API call to list key pairs works properly""" @@ -183,7 +183,7 @@ class ApiEc2TestCase(test.BaseTestCase): self.manager.add_role('fake', 'netadmin') project.add_role('fake', 'netadmin') - security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \ + security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") for x in range(random.randint(4, 8))) self.ec2.create_security_group(security_group_name, 'test group') @@ -217,10 +217,11 @@ class ApiEc2TestCase(test.BaseTestCase): self.manager.add_role('fake', 'netadmin') project.add_role('fake', 'netadmin') - security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \ + security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") for x in range(random.randint(4, 8))) - group = self.ec2.create_security_group(security_group_name, 'test group') + group = self.ec2.create_security_group(security_group_name, + 'test group') self.expect_http() self.mox.ReplayAll() @@ -282,12 +283,14 @@ class ApiEc2TestCase(test.BaseTestCase): self.manager.add_role('fake', 'netadmin') project.add_role('fake', 'netadmin') - security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \ + rand_string = 'sdiuisudfsdcnpaqwertasd' + security_group_name = "".join(random.choice(rand_string) + for x in range(random.randint(4, 8))) + other_security_group_name = "".join(random.choice(rand_string) for x in range(random.randint(4, 8))) - other_security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \ - for x in range(random.randint(4, 8))) - group = self.ec2.create_security_group(security_group_name, 'test group') + group = self.ec2.create_security_group(security_group_name, + 'test group') self.expect_http() self.mox.ReplayAll() @@ -313,9 +316,8 @@ class ApiEc2TestCase(test.BaseTestCase): if group.name == security_group_name: self.assertEquals(len(group.rules), 1) self.assertEquals(len(group.rules[0].grants), 1) - self.assertEquals(str(group.rules[0].grants[0]), - '%s-%s' % (other_security_group_name, 'fake')) - + self.assertEquals(str(group.rules[0].grants[0]), '%s-%s' % + (other_security_group_name, 'fake')) self.expect_http() self.mox.ReplayAll() diff --git a/nova/tests/auth_unittest.py b/nova/tests/auth_unittest.py index 97d22d702..fe891beee 100644 --- a/nova/tests/auth_unittest.py +++ b/nova/tests/auth_unittest.py @@ -28,6 +28,7 @@ from nova.api.ec2 import cloud FLAGS = flags.FLAGS + class user_generator(object): def __init__(self, manager, **user_state): if 'name' not in user_state: @@ -41,6 +42,7 @@ class user_generator(object): def __exit__(self, value, type, trace): self.manager.delete_user(self.user) + class project_generator(object): def __init__(self, manager, **project_state): if 'name' not in project_state: @@ -56,6 +58,7 @@ class project_generator(object): def __exit__(self, value, type, trace): self.manager.delete_project(self.project) + class user_and_project_generator(object): def __init__(self, manager, user_state={}, project_state={}): self.manager = manager @@ -75,6 +78,7 @@ class user_and_project_generator(object): self.manager.delete_user(self.user) self.manager.delete_project(self.project) + class AuthManagerTestCase(object): def setUp(self): FLAGS.auth_driver = self.auth_driver @@ -96,7 +100,7 @@ class AuthManagerTestCase(object): self.assertEqual('private-party', u.access) def test_004_signature_is_valid(self): - #self.assertTrue(self.manager.authenticate( **boto.generate_url ... ? ? ? )) + #self.assertTrue(self.manager.authenticate(**boto.generate_url ...? )) pass #raise NotImplementedError @@ -127,7 +131,7 @@ class AuthManagerTestCase(object): self.assertFalse(self.manager.has_role('test1', 'itsec')) def test_can_create_and_get_project(self): - with user_and_project_generator(self.manager) as (u,p): + with user_and_project_generator(self.manager) as (u, p): self.assert_(self.manager.get_user('test1')) self.assert_(self.manager.get_user('test1')) self.assert_(self.manager.get_project('testproj')) @@ -321,6 +325,7 @@ class AuthManagerTestCase(object): self.assertEqual('secret', user.secret) self.assertTrue(user.is_admin()) + class AuthManagerLdapTestCase(AuthManagerTestCase, test.TrialTestCase): auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver' @@ -337,6 +342,7 @@ class AuthManagerLdapTestCase(AuthManagerTestCase, test.TrialTestCase): except: self.skip = True + class AuthManagerDbTestCase(AuthManagerTestCase, test.TrialTestCase): auth_driver = 'nova.auth.dbdriver.DbDriver' diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py index 835bfdf49..2d61d2675 100644 --- a/nova/tests/cloud_unittest.py +++ b/nova/tests/cloud_unittest.py @@ -46,13 +46,13 @@ from nova.objectstore import image FLAGS = flags.FLAGS - # Temp dirs for working with image attributes through the cloud controller # (stole this from objectstore_unittest.py) OSS_TEMPDIR = tempfile.mkdtemp(prefix='test_oss-') IMAGES_PATH = os.path.join(OSS_TEMPDIR, 'images') os.makedirs(IMAGES_PATH) + class CloudTestCase(test.TrialTestCase): def setUp(self): super(CloudTestCase, self).setUp() @@ -97,17 +97,17 @@ class CloudTestCase(test.TrialTestCase): max_count = 1 kwargs = {'image_id': image_id, 'instance_type': instance_type, - 'max_count': max_count } + 'max_count': max_count} rv = yield self.cloud.run_instances(self.context, **kwargs) instance_id = rv['instancesSet'][0]['instanceId'] - output = yield self.cloud.get_console_output(context=self.context, instance_id=[instance_id]) + output = yield self.cloud.get_console_output(context=self.context, + instance_id=[instance_id]) self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE OUTPUT') # TODO(soren): We need this until we can stop polling in the rpc code # for unit tests. greenthread.sleep(0.3) rv = yield self.cloud.terminate_instances(self.context, [instance_id]) - def test_key_generation(self): result = self._create_key('test') private_key = result['private_key'] @@ -146,8 +146,10 @@ class CloudTestCase(test.TrialTestCase): 'max_count': max_count} rv = yield self.cloud.run_instances(self.context, **kwargs) # TODO: check for proper response - instance = rv['reservationSet'][0][rv['reservationSet'][0].keys()[0]][0] - logging.debug("Need to watch instance %s until it's running..." % instance['instance_id']) + instance_id = rv['reservationSet'][0].keys()[0] + instance = rv['reservationSet'][0][instance_id][0] + logging.debug("Need to watch instance %s until it's running..." % + instance['instance_id']) while True: rv = yield defer.succeed(time.sleep(1)) info = self.cloud._get_instance(instance['instance_id']) @@ -157,14 +159,15 @@ class CloudTestCase(test.TrialTestCase): self.assert_(rv) if connection_type != 'fake': - time.sleep(45) # Should use boto for polling here + time.sleep(45) # Should use boto for polling here for reservations in rv['reservationSet']: # for res_id in reservations.keys(): - # logging.debug(reservations[res_id]) - # for instance in reservations[res_id]: - for instance in reservations[reservations.keys()[0]]: - logging.debug("Terminating instance %s" % instance['instance_id']) - rv = yield self.compute.terminate_instance(instance['instance_id']) + # logging.debug(reservations[res_id]) + # for instance in reservations[res_id]: + for instance in reservations[reservations.keys()[0]]: + instance_id = instance['instance_id'] + logging.debug("Terminating instance %s" % instance_id) + rv = yield self.compute.terminate_instance(instance_id) def test_instance_update_state(self): def instance(num): @@ -183,8 +186,7 @@ class CloudTestCase(test.TrialTestCase): 'groups': ['default'], 'product_codes': None, 'state': 0x01, - 'user_data': '' - } + 'user_data': ''} rv = self.cloud._format_describe_instances(self.context) self.assert_(len(rv['reservationSet']) == 0) @@ -199,7 +201,9 @@ class CloudTestCase(test.TrialTestCase): #self.assert_(len(rv['reservationSet'][0]['instances_set']) == 5) # report 4 nodes each having 1 of the instances #for i in xrange(4): - # self.cloud.update_state('instances', {('node-%s' % i): {('i-%s' % i): instance(i)}}) + # self.cloud.update_state('instances', + # {('node-%s' % i): {('i-%s' % i): + # instance(i)}}) # one instance should be pending still #self.assert_(len(self.cloud.instances['pending'].keys()) == 1) @@ -217,8 +221,10 @@ class CloudTestCase(test.TrialTestCase): @staticmethod def _fake_set_image_description(ctxt, image_id, description): from nova.objectstore import handler + class req: pass + request = req() request.context = ctxt request.args = {'image_id': [image_id], diff --git a/nova/tests/flags_unittest.py b/nova/tests/flags_unittest.py index 714170e5e..b97df075d 100644 --- a/nova/tests/flags_unittest.py +++ b/nova/tests/flags_unittest.py @@ -23,7 +23,9 @@ from nova import test FLAGS = flags.FLAGS flags.DEFINE_string('flags_unittest', 'foo', 'for testing purposes only') + class FlagsTestCase(test.TrialTestCase): + def setUp(self): super(FlagsTestCase, self).setUp() self.FLAGS = flags.FlagValues() @@ -35,7 +37,8 @@ class FlagsTestCase(test.TrialTestCase): self.assert_('false' not in self.FLAGS) self.assert_('true' not in self.FLAGS) - flags.DEFINE_string('string', 'default', 'desc', flag_values=self.FLAGS) + flags.DEFINE_string('string', 'default', 'desc', + flag_values=self.FLAGS) flags.DEFINE_integer('int', 1, 'desc', flag_values=self.FLAGS) flags.DEFINE_bool('false', False, 'desc', flag_values=self.FLAGS) flags.DEFINE_bool('true', True, 'desc', flag_values=self.FLAGS) diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py index 44cf47407..b7caed4fd 100644 --- a/nova/tests/network_unittest.py +++ b/nova/tests/network_unittest.py @@ -98,7 +98,6 @@ class NetworkTestCase(test.TrialTestCase): self.context.project_id = self.projects[project_num].id self.network.deallocate_fixed_ip(self.context, address) - def test_public_network_association(self): """Makes sure that we can allocaate a public ip""" # TODO(vish): better way of adding floating ips @@ -118,10 +117,12 @@ class NetworkTestCase(test.TrialTestCase): lease_ip(fix_addr) self.assertEqual(float_addr, str(pubnet[0])) self.network.associate_floating_ip(self.context, float_addr, fix_addr) - address = db.instance_get_floating_address(context.get_admin_context(), self.instance_id) + address = db.instance_get_floating_address(context.get_admin_context(), + self.instance_id) self.assertEqual(address, float_addr) self.network.disassociate_floating_ip(self.context, float_addr) - address = db.instance_get_floating_address(context.get_admin_context(), self.instance_id) + address = db.instance_get_floating_address(context.get_admin_context(), + self.instance_id) self.assertEqual(address, None) self.network.deallocate_floating_ip(self.context, float_addr) self.network.deallocate_fixed_ip(self.context, fix_addr) @@ -254,18 +255,24 @@ class NetworkTestCase(test.TrialTestCase): There are ips reserved at the bottom and top of the range. services (network, gateway, CloudPipe, broadcast) """ - network = db.project_get_network(context.get_admin_context(), self.projects[0].id) + network = db.project_get_network(context.get_admin_context(), + self.projects[0].id) net_size = flags.FLAGS.network_size - total_ips = (db.network_count_available_ips(context.get_admin_context(), network['id']) + - db.network_count_reserved_ips(context.get_admin_context(), network['id']) + - db.network_count_allocated_ips(context.get_admin_context(), network['id'])) + admin_context = context.get_admin_context() + total_ips = (db.network_count_available_ips(admin_context, + network['id']) + + db.network_count_reserved_ips(admin_context, + network['id']) + + db.network_count_allocated_ips(admin_context, + network['id'])) self.assertEqual(total_ips, net_size) def test_too_many_addresses(self): """Test for a NoMoreAddresses exception when all fixed ips are used. """ - network = db.project_get_network(context.get_admin_context(), self.projects[0].id) - num_available_ips = db.network_count_available_ips(context.get_admin_context(), + admin_context = context.get_admin_context() + network = db.project_get_network(admin_context, self.projects[0].id) + num_available_ips = db.network_count_available_ips(admin_context, network['id']) addresses = [] instance_ids = [] @@ -276,8 +283,9 @@ class NetworkTestCase(test.TrialTestCase): addresses.append(address) lease_ip(address) - self.assertEqual(db.network_count_available_ips(context.get_admin_context(), - network['id']), 0) + ip_count = db.network_count_available_ips(context.get_admin_context(), + network['id']) + self.assertEqual(ip_count, 0) self.assertRaises(db.NoMoreAddresses, self.network.allocate_fixed_ip, self.context, @@ -287,14 +295,15 @@ class NetworkTestCase(test.TrialTestCase): self.network.deallocate_fixed_ip(self.context, addresses[i]) release_ip(addresses[i]) db.instance_destroy(context.get_admin_context(), instance_ids[i]) - self.assertEqual(db.network_count_available_ips(context.get_admin_context(), - network['id']), - num_available_ips) + ip_count = db.network_count_available_ips(context.get_admin_context(), + network['id']) + self.assertEqual(ip_count, num_available_ips) def is_allocated_in_project(address, project_id): """Returns true if address is in specified project""" - project_net = db.project_get_network(context.get_admin_context(), project_id) + project_net = db.project_get_network(context.get_admin_context(), + project_id) network = db.fixed_ip_get_network(context.get_admin_context(), address) instance = db.fixed_ip_get_instance(context.get_admin_context(), address) # instance exists until release @@ -308,8 +317,10 @@ def binpath(script): def lease_ip(private_ip): """Run add command on dhcpbridge""" - network_ref = db.fixed_ip_get_network(context.get_admin_context(), private_ip) - instance_ref = db.fixed_ip_get_instance(context.get_admin_context(), private_ip) + network_ref = db.fixed_ip_get_network(context.get_admin_context(), + private_ip) + instance_ref = db.fixed_ip_get_instance(context.get_admin_context(), + private_ip) cmd = "%s add %s %s fake" % (binpath('nova-dhcpbridge'), instance_ref['mac_address'], private_ip) @@ -322,8 +333,10 @@ def lease_ip(private_ip): def release_ip(private_ip): """Run del command on dhcpbridge""" - network_ref = db.fixed_ip_get_network(context.get_admin_context(), private_ip) - instance_ref = db.fixed_ip_get_instance(context.get_admin_context(), private_ip) + network_ref = db.fixed_ip_get_network(context.get_admin_context(), + private_ip) + instance_ref = db.fixed_ip_get_instance(context.get_admin_context(), + private_ip) cmd = "%s del %s %s fake" % (binpath('nova-dhcpbridge'), instance_ref['mac_address'], private_ip) diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py index d1604c4e5..061799923 100644 --- a/nova/tests/objectstore_unittest.py +++ b/nova/tests/objectstore_unittest.py @@ -181,7 +181,7 @@ class ObjectStoreTestCase(test.TrialTestCase): class TestHTTPChannel(http.HTTPChannel): """Dummy site required for twisted.web""" - def checkPersistence(self, _, __): # pylint: disable-msg=C0103 + def checkPersistence(self, _, __): # pylint: disable-msg=C0103 """Otherwise we end up with an unclean reactor.""" return False @@ -217,7 +217,6 @@ class S3APITestCase(test.TrialTestCase): # pylint: enable-msg=E1101 self.tcp_port = self.listening_port.getHost().port - if not boto.config.has_section('Boto'): boto.config.add_section('Boto') boto.config.set('Boto', 'num_retries', '0') @@ -234,11 +233,11 @@ class S3APITestCase(test.TrialTestCase): self.conn.get_http_connection = get_http_connection - def _ensure_no_buckets(self, buckets): # pylint: disable-msg=C0111 + def _ensure_no_buckets(self, buckets): # pylint: disable-msg=C0111 self.assertEquals(len(buckets), 0, "Bucket list was not empty") return True - def _ensure_one_bucket(self, buckets, name): # pylint: disable-msg=C0111 + def _ensure_one_bucket(self, buckets, name): # pylint: disable-msg=C0111 self.assertEquals(len(buckets), 1, "Bucket list didn't have exactly one element in it") self.assertEquals(buckets[0].name, name, "Wrong name") diff --git a/nova/tests/process_unittest.py b/nova/tests/process_unittest.py index 25c60c616..67245af03 100644 --- a/nova/tests/process_unittest.py +++ b/nova/tests/process_unittest.py @@ -38,6 +38,7 @@ class ProcessTestCase(test.TrialTestCase): def test_execute_stdout(self): pool = process.ProcessPool(2) d = pool.simple_execute('echo test') + def _check(rv): self.assertEqual(rv[0], 'test\n') self.assertEqual(rv[1], '') @@ -49,6 +50,7 @@ class ProcessTestCase(test.TrialTestCase): def test_execute_stderr(self): pool = process.ProcessPool(2) d = pool.simple_execute('cat BAD_FILE', check_exit_code=False) + def _check(rv): self.assertEqual(rv[0], '') self.assert_('No such file' in rv[1]) @@ -72,6 +74,7 @@ class ProcessTestCase(test.TrialTestCase): d4 = pool.simple_execute('sleep 0.005') called = [] + def _called(rv, name): called.append(name) diff --git a/nova/tests/quota_unittest.py b/nova/tests/quota_unittest.py index 92f3be508..9e3afbf4e 100644 --- a/nova/tests/quota_unittest.py +++ b/nova/tests/quota_unittest.py @@ -141,12 +141,13 @@ class QuotaTestCase(test.TrialTestCase): try: db.floating_ip_get_by_address(context.get_admin_context(), address) except exception.NotFound: - db.floating_ip_create(context.get_admin_context(), {'address': address, - 'host': FLAGS.host}) + db.floating_ip_create(context.get_admin_context(), + {'address': address, 'host': FLAGS.host}) float_addr = self.network.allocate_floating_ip(self.context, self.project.id) # NOTE(vish): This assert never fails. When cloud attempts to # make an rpc.call, the test just finishes with OK. It # appears to be something in the magic inline callbacks # that is breaking. - self.assertRaises(cloud.QuotaError, self.cloud.allocate_address, self.context) + self.assertRaises(cloud.QuotaError, self.cloud.allocate_address, + self.context) diff --git a/nova/tests/rpc_unittest.py b/nova/tests/rpc_unittest.py index c380393a0..f35b65a39 100644 --- a/nova/tests/rpc_unittest.py +++ b/nova/tests/rpc_unittest.py @@ -41,7 +41,7 @@ class RpcTestCase(test.TrialTestCase): topic='test', proxy=self.receiver) self.consumer.attach_to_twisted() - self.context= context.get_admin_context() + self.context = context.get_admin_context() def test_call_succeed(self): """Get a value through rpc call""" @@ -67,9 +67,9 @@ class RpcTestCase(test.TrialTestCase): to an int in the test. """ value = 42 - self.assertFailure(rpc.call_twisted(self.context, - 'test', {"method": "fail", - "args": {"value": value}}), + self.assertFailure(rpc.call_twisted(self.context, 'test', + {"method": "fail", + "args": {"value": value}}), rpc.RemoteError) try: yield rpc.call_twisted(self.context, @@ -101,4 +101,3 @@ class TestReceiver(object): def fail(context, value): """Raises an exception with the value sent in""" raise Exception(value) - diff --git a/nova/tests/scheduler_unittest.py b/nova/tests/scheduler_unittest.py index 166a32702..27345d055 100644 --- a/nova/tests/scheduler_unittest.py +++ b/nova/tests/scheduler_unittest.py @@ -34,6 +34,7 @@ from nova.scheduler import driver FLAGS = flags.FLAGS flags.DECLARE('max_cores', 'nova.scheduler.simple') + class TestDriver(driver.Scheduler): """Scheduler Driver for Tests""" def schedule(context, topic, *args, **kwargs): @@ -42,6 +43,7 @@ class TestDriver(driver.Scheduler): def schedule_named_method(context, topic, num): return 'named_host' + class SchedulerTestCase(test.TrialTestCase): """Test case for scheduler""" def setUp(self): diff --git a/nova/tests/service_unittest.py b/nova/tests/service_unittest.py index 6df108a98..e74e0f726 100644 --- a/nova/tests/service_unittest.py +++ b/nova/tests/service_unittest.py @@ -179,7 +179,8 @@ class ServiceTestCase(test.BaseTestCase): binary).AndRaise(exception.NotFound()) service.db.service_create(self.context, service_create).AndReturn(service_ref) - service.db.service_get(self.context, service_ref['id']).AndReturn(service_ref) + service.db.service_get(self.context, + service_ref['id']).AndReturn(service_ref) service.db.service_update(self.context, service_ref['id'], mox.ContainsKeyValue('report_count', 1)) @@ -227,4 +228,3 @@ class ServiceTestCase(test.BaseTestCase): rv = yield s.report_state(host, binary) self.assert_(not s.model_disconnected) - diff --git a/nova/tests/validator_unittest.py b/nova/tests/validator_unittest.py index 84daa135e..b5f1c0667 100644 --- a/nova/tests/validator_unittest.py +++ b/nova/tests/validator_unittest.py @@ -35,7 +35,8 @@ class ValidationTestCase(test.TrialTestCase): self.assertTrue(type_case("foo", 5, 1)) self.assertRaises(TypeError, type_case, "bar", "5", 1) self.assertRaises(TypeError, type_case, None, 5, 1) - + + @validate.typetest(instanceid=str, size=int, number_of_instances=int) def type_case(instanceid, size, number_of_instances): return True diff --git a/nova/tests/virt_unittest.py b/nova/tests/virt_unittest.py index 76af5cabd..ce78d450c 100644 --- a/nova/tests/virt_unittest.py +++ b/nova/tests/virt_unittest.py @@ -29,11 +29,13 @@ from nova.virt import libvirt_conn FLAGS = flags.FLAGS flags.DECLARE('instances_path', 'nova.compute.manager') + class LibvirtConnTestCase(test.TrialTestCase): def setUp(self): super(LibvirtConnTestCase, self).setUp() self.manager = manager.AuthManager() - self.user = self.manager.create_user('fake', 'fake', 'fake', admin=True) + self.user = self.manager.create_user('fake', 'fake', 'fake', + admin=True) self.project = self.manager.create_project('fake', 'fake', 'fake') self.network = utils.import_object(FLAGS.network_manager) FLAGS.instances_path = '' @@ -41,15 +43,15 @@ class LibvirtConnTestCase(test.TrialTestCase): def test_get_uri_and_template(self): ip = '10.11.12.13' - instance = { 'internal_id' : 1, - 'memory_kb' : '1024000', - 'basepath' : '/some/path', - 'bridge_name' : 'br100', - 'mac_address' : '02:12:34:46:56:67', - 'vcpus' : 2, - 'project_id' : 'fake', - 'bridge' : 'br101', - 'instance_type' : 'm1.small'} + instance = {'internal_id': 1, + 'memory_kb': '1024000', + 'basepath': '/some/path', + 'bridge_name': 'br100', + 'mac_address': '02:12:34:46:56:67', + 'vcpus': 2, + 'project_id': 'fake', + 'bridge': 'br101', + 'instance_type': 'm1.small'} user_context = context.RequestContext(project=self.project, user=self.user) @@ -58,36 +60,34 @@ class LibvirtConnTestCase(test.TrialTestCase): self.network.set_network_host(context.get_admin_context(), network_ref['id']) - fixed_ip = { 'address' : ip, - 'network_id' : network_ref['id'] } + fixed_ip = {'address': ip, + 'network_id': network_ref['id']} ctxt = context.get_admin_context() fixed_ip_ref = db.fixed_ip_create(ctxt, fixed_ip) db.fixed_ip_update(ctxt, ip, {'allocated': True, - 'instance_id': instance_ref['id'] }) - - type_uri_map = { 'qemu' : ('qemu:///system', - [(lambda t: t.find('.').get('type'), 'qemu'), - (lambda t: t.find('./os/type').text, 'hvm'), - (lambda t: t.find('./devices/emulator'), None)]), - 'kvm' : ('qemu:///system', - [(lambda t: t.find('.').get('type'), 'kvm'), - (lambda t: t.find('./os/type').text, 'hvm'), - (lambda t: t.find('./devices/emulator'), None)]), - 'uml' : ('uml:///system', - [(lambda t: t.find('.').get('type'), 'uml'), - (lambda t: t.find('./os/type').text, 'uml')]), - } - - common_checks = [(lambda t: t.find('.').tag, 'domain'), - (lambda t: \ - t.find('./devices/interface/filterref/parameter') \ - .get('name'), 'IP'), - (lambda t: \ - t.find('./devices/interface/filterref/parameter') \ - .get('value'), '10.11.12.13')] - - for (libvirt_type,(expected_uri, checks)) in type_uri_map.iteritems(): + 'instance_id': instance_ref['id']}) + + type_uri_map = {'qemu': ('qemu:///system', + [(lambda t: t.find('.').get('type'), 'qemu'), + (lambda t: t.find('./os/type').text, 'hvm'), + (lambda t: t.find('./devices/emulator'), None)]), + 'kvm': ('qemu:///system', + [(lambda t: t.find('.').get('type'), 'kvm'), + (lambda t: t.find('./os/type').text, 'hvm'), + (lambda t: t.find('./devices/emulator'), None)]), + 'uml': ('uml:///system', + [(lambda t: t.find('.').get('type'), 'uml'), + (lambda t: t.find('./os/type').text, 'uml')])} + + common_checks = [ + (lambda t: t.find('.').tag, 'domain'), + (lambda t: t.find('./devices/interface/filterref/parameter').\ + get('name'), 'IP'), + (lambda t: t.find('./devices/interface/filterref/parameter').\ + get('value'), '10.11.12.13')] + + for (libvirt_type, (expected_uri, checks)) in type_uri_map.iteritems(): FLAGS.libvirt_type = libvirt_type conn = libvirt_conn.LibvirtConnection(True) @@ -111,19 +111,20 @@ class LibvirtConnTestCase(test.TrialTestCase): # implementation doesn't fiddle around with the FLAGS. testuri = 'something completely different' FLAGS.libvirt_uri = testuri - for (libvirt_type,(expected_uri, checks)) in type_uri_map.iteritems(): + for (libvirt_type, (expected_uri, checks)) in type_uri_map.iteritems(): FLAGS.libvirt_type = libvirt_type conn = libvirt_conn.LibvirtConnection(True) uri, template = conn.get_uri_and_template() self.assertEquals(uri, testuri) - def tearDown(self): super(LibvirtConnTestCase, self).tearDown() self.manager.delete_project(self.project) self.manager.delete_user(self.user) + class NWFilterTestCase(test.TrialTestCase): + def setUp(self): super(NWFilterTestCase, self).setUp() @@ -131,7 +132,8 @@ class NWFilterTestCase(test.TrialTestCase): pass self.manager = manager.AuthManager() - self.user = self.manager.create_user('fake', 'fake', 'fake', admin=True) + self.user = self.manager.create_user('fake', 'fake', 'fake', + admin=True) self.project = self.manager.create_project('fake', 'fake', 'fake') self.context = context.RequestContext(self.user, self.project) @@ -143,7 +145,6 @@ class NWFilterTestCase(test.TrialTestCase): self.manager.delete_project(self.project) self.manager.delete_user(self.user) - def test_cidr_rule_nwfilter_xml(self): cloud_controller = cloud.CloudController() cloud_controller.create_security_group(self.context, @@ -156,7 +157,6 @@ class NWFilterTestCase(test.TrialTestCase): ip_protocol='tcp', cidr_ip='0.0.0.0/0') - security_group = db.security_group_get_by_name(self.context, 'fake', 'testgroup') @@ -182,15 +182,12 @@ class NWFilterTestCase(test.TrialTestCase): self.assertEqual(ip_conditions[0].getAttribute('srcipmask'), '0.0.0.0') self.assertEqual(ip_conditions[0].getAttribute('dstportstart'), '80') self.assertEqual(ip_conditions[0].getAttribute('dstportend'), '81') - - self.teardown_security_group() def teardown_security_group(self): cloud_controller = cloud.CloudController() cloud_controller.delete_security_group(self.context, 'testgroup') - def setup_and_return_security_group(self): cloud_controller = cloud.CloudController() cloud_controller.create_security_group(self.context, @@ -244,16 +241,19 @@ class NWFilterTestCase(test.TrialTestCase): for required in [secgroup_filter, 'allow-dhcp-server', 'no-arp-spoofing', 'no-ip-spoofing', 'no-mac-spoofing']: - self.assertTrue(required in self.recursive_depends[instance_filter], - "Instance's filter does not include %s" % required) + self.assertTrue(required in + self.recursive_depends[instance_filter], + "Instance's filter does not include %s" % + required) self.security_group = self.setup_and_return_security_group() - db.instance_add_security_group(self.context, inst_id, self.security_group.id) + db.instance_add_security_group(self.context, inst_id, + self.security_group.id) instance = db.instance_get(self.context, inst_id) d = self.fw.setup_nwfilters_for_instance(instance) d.addCallback(_ensure_all_called) - d.addCallback(lambda _:self.teardown_security_group()) + d.addCallback(lambda _: self.teardown_security_group()) return d diff --git a/nova/tests/volume_unittest.py b/nova/tests/volume_unittest.py index 20bae8a7f..fdee30b48 100644 --- a/nova/tests/volume_unittest.py +++ b/nova/tests/volume_unittest.py @@ -59,7 +59,8 @@ class VolumeTestCase(test.TrialTestCase): """Test volume can be created and deleted""" volume_id = self._create_volume() yield self.volume.create_volume(self.context, volume_id) - self.assertEqual(volume_id, db.volume_get(context.get_admin_context(), volume_id).id) + self.assertEqual(volume_id, db.volume_get(context.get_admin_context(), + volume_id).id) yield self.volume.delete_volume(self.context, volume_id) self.assertRaises(exception.NotFound, @@ -114,7 +115,8 @@ class VolumeTestCase(test.TrialTestCase): volume_id = self._create_volume() yield self.volume.create_volume(self.context, volume_id) if FLAGS.fake_tests: - db.volume_attached(self.context, volume_id, instance_id, mountpoint) + db.volume_attached(self.context, volume_id, instance_id, + mountpoint) else: yield self.compute.attach_volume(self.context, instance_id, @@ -154,7 +156,8 @@ class VolumeTestCase(test.TrialTestCase): def _check(volume_id): """Make sure blades aren't duplicated""" volume_ids.append(volume_id) - (shelf_id, blade_id) = db.volume_get_shelf_and_blade(context.get_admin_context(), + admin_context = context.get_admin_context() + (shelf_id, blade_id) = db.volume_get_shelf_and_blade(admin_context, volume_id) shelf_blade = '%s.%s' % (shelf_id, blade_id) self.assert_(shelf_blade not in shelf_blades) |
