summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/__init__.py5
-rw-r--r--nova/tests/access_unittest.py2
-rw-r--r--nova/tests/api/openstack/fakes.py13
-rw-r--r--nova/tests/api/openstack/test_adminapi.py61
-rw-r--r--nova/tests/api/openstack/test_auth.py3
-rw-r--r--nova/tests/api/openstack/test_servers.py37
-rw-r--r--nova/tests/auth_unittest.py33
-rw-r--r--nova/tests/cloud_unittest.py49
-rw-r--r--nova/tests/compute_unittest.py50
-rw-r--r--nova/tests/flags_unittest.py2
-rw-r--r--nova/tests/middleware_unittest.py86
-rw-r--r--nova/tests/misc_unittest.py45
-rw-r--r--nova/tests/network_unittest.py10
-rw-r--r--nova/tests/objectstore_unittest.py4
-rw-r--r--nova/tests/process_unittest.py132
-rw-r--r--nova/tests/quota_unittest.py2
-rw-r--r--nova/tests/rpc_unittest.py65
-rw-r--r--nova/tests/scheduler_unittest.py25
-rw-r--r--nova/tests/service_unittest.py46
-rw-r--r--nova/tests/validator_unittest.py42
-rw-r--r--nova/tests/virt_unittest.py156
-rw-r--r--nova/tests/volume_unittest.py58
22 files changed, 522 insertions, 404 deletions
diff --git a/nova/tests/__init__.py b/nova/tests/__init__.py
index aaf213923..8dc87d0e2 100644
--- a/nova/tests/__init__.py
+++ b/nova/tests/__init__.py
@@ -29,3 +29,8 @@
.. moduleauthor:: Manish Singh <yosh@gimp.org>
.. moduleauthor:: Andy Smith <andy@anarkystic.com>
"""
+
+# See http://code.google.com/p/python-nose/issues/detail?id=373
+# The code below enables nosetests to work with i18n _() blocks
+import __builtin__
+setattr(__builtin__, '_', lambda x: x)
diff --git a/nova/tests/access_unittest.py b/nova/tests/access_unittest.py
index 0f66c0a26..58fdea3b5 100644
--- a/nova/tests/access_unittest.py
+++ b/nova/tests/access_unittest.py
@@ -35,7 +35,7 @@ class Context(object):
pass
-class AccessTestCase(test.TrialTestCase):
+class AccessTestCase(test.TestCase):
def setUp(self):
super(AccessTestCase, self).setUp()
um = manager.AuthManager()
diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py
index c3f129a32..21b8aac1c 100644
--- a/nova/tests/api/openstack/fakes.py
+++ b/nova/tests/api/openstack/fakes.py
@@ -24,9 +24,10 @@ import webob
import webob.dec
from nova import auth
-from nova import utils
-from nova import flags
+from nova import context
from nova import exception as exc
+from nova import flags
+from nova import utils
import nova.api.openstack.auth
from nova.image import service
from nova.image import glance
@@ -58,7 +59,7 @@ def fake_auth_init(self):
@webob.dec.wsgify
def fake_wsgi(self, req):
- req.environ['nova.context'] = dict(user=dict(id=1))
+ req.environ['nova.context'] = context.RequestContext(1, 1)
if req.body:
req.environ['inst_dict'] = json.loads(req.body)
return self.application
@@ -171,6 +172,12 @@ class FakeToken(object):
setattr(self, k, v)
+class FakeRequestContext(object):
+ def __init__(self, user, project):
+ self.user_id = 1
+ self.project_id = 1
+
+
class FakeAuthDatabase(object):
data = {}
diff --git a/nova/tests/api/openstack/test_adminapi.py b/nova/tests/api/openstack/test_adminapi.py
new file mode 100644
index 000000000..1b2e1654d
--- /dev/null
+++ b/nova/tests/api/openstack/test_adminapi.py
@@ -0,0 +1,61 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+import stubout
+import webob
+
+import nova.api
+from nova import flags
+from nova.tests.api.openstack import fakes
+
+FLAGS = flags.FLAGS
+
+
+class AdminAPITest(unittest.TestCase):
+ def setUp(self):
+ self.stubs = stubout.StubOutForTesting()
+ fakes.FakeAuthManager.auth_data = {}
+ fakes.FakeAuthDatabase.data = {}
+ fakes.stub_out_networking(self.stubs)
+ fakes.stub_out_rate_limiting(self.stubs)
+ fakes.stub_out_auth(self.stubs)
+ self.allow_admin = FLAGS.allow_admin_api
+
+ def tearDown(self):
+ self.stubs.UnsetAll()
+ FLAGS.allow_admin_api = self.allow_admin
+
+ def test_admin_enabled(self):
+ FLAGS.allow_admin_api = True
+ # We should still be able to access public operations.
+ req = webob.Request.blank('/v1.0/flavors')
+ res = req.get_response(nova.api.API('os'))
+ self.assertEqual(res.status_int, 200)
+ # TODO: Confirm admin operations are available.
+
+ def test_admin_disabled(self):
+ FLAGS.allow_admin_api = False
+ # We should still be able to access public operations.
+ req = webob.Request.blank('/v1.0/flavors')
+ res = req.get_response(nova.api.API('os'))
+ self.assertEqual(res.status_int, 200)
+ # TODO: Confirm admin operations are unavailable.
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/nova/tests/api/openstack/test_auth.py b/nova/tests/api/openstack/test_auth.py
index 14e720be4..7b427c2db 100644
--- a/nova/tests/api/openstack/test_auth.py
+++ b/nova/tests/api/openstack/test_auth.py
@@ -26,6 +26,7 @@ import nova.api
import nova.api.openstack.auth
import nova.auth.manager
from nova import auth
+from nova import context
from nova.tests.api.openstack import fakes
@@ -35,6 +36,7 @@ class Test(unittest.TestCase):
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager,
'__init__', fakes.fake_auth_init)
+ self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext)
fakes.FakeAuthManager.auth_data = {}
fakes.FakeAuthDatabase.data = {}
fakes.stub_out_rate_limiting(self.stubs)
@@ -131,6 +133,7 @@ class TestLimiter(unittest.TestCase):
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(nova.api.openstack.auth.BasicApiAuthManager,
'__init__', fakes.fake_auth_init)
+ self.stubs.Set(context, 'RequestContext', fakes.FakeRequestContext)
fakes.FakeAuthManager.auth_data = {}
fakes.FakeAuthDatabase.data = {}
fakes.stub_out_networking(self.stubs)
diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py
index 8444b6fce..3820f5f27 100644
--- a/nova/tests/api/openstack/test_servers.py
+++ b/nova/tests/api/openstack/test_servers.py
@@ -56,11 +56,16 @@ def instance_address(context, instance_id):
def stub_instance(id, user_id=1):
- return Instance(id=id + 123456, state=0, image_id=10, user_id=user_id,
+ return Instance(id=int(id) + 123456, state=0, image_id=10, user_id=user_id,
display_name='server%s' % id, internal_id=id)
+def fake_compute_api(cls, req, id):
+ return True
+
+
class ServersTest(unittest.TestCase):
+
def setUp(self):
self.stubs = stubout.StubOutForTesting()
fakes.FakeAuthManager.auth_data = {}
@@ -82,9 +87,15 @@ class ServersTest(unittest.TestCase):
instance_address)
self.stubs.Set(nova.db.api, 'instance_get_floating_address',
instance_address)
+ self.stubs.Set(nova.compute.api.ComputeAPI, 'pause',
+ fake_compute_api)
+ self.stubs.Set(nova.compute.api.ComputeAPI, 'unpause',
+ fake_compute_api)
+ self.allow_admin = FLAGS.allow_admin_api
def tearDown(self):
self.stubs.UnsetAll()
+ FLAGS.allow_admin_api = self.allow_admin
def test_get_server_by_id(self):
req = webob.Request.blank('/v1.0/servers/1')
@@ -211,6 +222,30 @@ class ServersTest(unittest.TestCase):
self.assertEqual(s['imageId'], 10)
i += 1
+ def test_server_pause(self):
+ FLAGS.allow_admin_api = True
+ body = dict(server=dict(
+ name='server_test', imageId=2, flavorId=2, metadata={},
+ personality={}))
+ req = webob.Request.blank('/v1.0/servers/1/pause')
+ req.method = 'POST'
+ req.content_type = 'application/json'
+ req.body = json.dumps(body)
+ res = req.get_response(nova.api.API('os'))
+ self.assertEqual(res.status_int, 202)
+
+ def test_server_unpause(self):
+ FLAGS.allow_admin_api = True
+ body = dict(server=dict(
+ name='server_test', imageId=2, flavorId=2, metadata={},
+ personality={}))
+ req = webob.Request.blank('/v1.0/servers/1/unpause')
+ req.method = 'POST'
+ req.content_type = 'application/json'
+ req.body = json.dumps(body)
+ res = req.get_response(nova.api.API('os'))
+ self.assertEqual(res.status_int, 202)
+
def test_server_reboot(self):
body = dict(server=dict(
name='server_test', imageId=2, flavorId=2, metadata={},
diff --git a/nova/tests/auth_unittest.py b/nova/tests/auth_unittest.py
index fe891beee..15d40bc53 100644
--- a/nova/tests/auth_unittest.py
+++ b/nova/tests/auth_unittest.py
@@ -208,17 +208,13 @@ class AuthManagerTestCase(object):
# so it probably belongs in crypto_unittest
# but I'm leaving it where I found it.
with user_and_project_generator(self.manager) as (user, project):
- # NOTE(todd): Should mention why we must setup controller first
- # (somebody please clue me in)
- cloud_controller = cloud.CloudController()
- cloud_controller.setup()
- _key, cert_str = self.manager._generate_x509_cert('test1',
- 'testproj')
+ # NOTE(vish): Setup runs genroot.sh if it hasn't been run
+ cloud.CloudController().setup()
+ _key, cert_str = crypto.generate_x509_cert(user.id, project.id)
logging.debug(cert_str)
- # Need to verify that it's signed by the right intermediate CA
- full_chain = crypto.fetch_ca(project_id='testproj', chain=True)
- int_cert = crypto.fetch_ca(project_id='testproj', chain=False)
+ full_chain = crypto.fetch_ca(project_id=project.id, chain=True)
+ int_cert = crypto.fetch_ca(project_id=project.id, chain=False)
cloud_cert = crypto.fetch_ca()
logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain)
signed_cert = X509.load_cert_string(cert_str)
@@ -227,7 +223,8 @@ class AuthManagerTestCase(object):
cloud_cert = X509.load_cert_string(cloud_cert)
self.assertTrue(signed_cert.verify(chain_cert.get_pubkey()))
self.assertTrue(signed_cert.verify(int_cert.get_pubkey()))
- if not FLAGS.use_intermediate_ca:
+
+ if not FLAGS.use_project_ca:
self.assertTrue(signed_cert.verify(cloud_cert.get_pubkey()))
else:
self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey()))
@@ -326,24 +323,20 @@ class AuthManagerTestCase(object):
self.assertTrue(user.is_admin())
-class AuthManagerLdapTestCase(AuthManagerTestCase, test.TrialTestCase):
+class AuthManagerLdapTestCase(AuthManagerTestCase, test.TestCase):
auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver'
def __init__(self, *args, **kwargs):
AuthManagerTestCase.__init__(self)
- test.TrialTestCase.__init__(self, *args, **kwargs)
+ test.TestCase.__init__(self, *args, **kwargs)
import nova.auth.fakeldap as fakeldap
- FLAGS.redis_db = 8
if FLAGS.flush_db:
- logging.info("Flushing redis datastore")
- try:
- r = fakeldap.Redis.instance()
- r.flushdb()
- except:
- self.skip = True
+ logging.info("Flushing datastore")
+ r = fakeldap.Store.instance()
+ r.flushdb()
-class AuthManagerDbTestCase(AuthManagerTestCase, test.TrialTestCase):
+class AuthManagerDbTestCase(AuthManagerTestCase, test.TestCase):
auth_driver = 'nova.auth.dbdriver.DbDriver'
diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py
index 9886a2449..70d2c44da 100644
--- a/nova/tests/cloud_unittest.py
+++ b/nova/tests/cloud_unittest.py
@@ -22,22 +22,18 @@ import logging
from M2Crypto import BIO
from M2Crypto import RSA
import os
-import StringIO
import tempfile
import time
from eventlet import greenthread
-from twisted.internet import defer
-import unittest
-from xml.etree import ElementTree
from nova import context
from nova import crypto
from nova import db
from nova import flags
from nova import rpc
+from nova import service
from nova import test
-from nova import utils
from nova.auth import manager
from nova.compute import power_state
from nova.api.ec2 import cloud
@@ -53,10 +49,11 @@ IMAGES_PATH = os.path.join(OSS_TEMPDIR, 'images')
os.makedirs(IMAGES_PATH)
-class CloudTestCase(test.TrialTestCase):
+class CloudTestCase(test.TestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
- self.flags(connection_type='fake', images_path=IMAGES_PATH)
+ self.flags(connection_type='fake',
+ images_path=IMAGES_PATH)
self.conn = rpc.Connection.instance()
logging.getLogger().setLevel(logging.DEBUG)
@@ -64,27 +61,23 @@ class CloudTestCase(test.TrialTestCase):
# set up our cloud
self.cloud = cloud.CloudController()
- # set up a service
- self.compute = utils.import_object(FLAGS.compute_manager)
- self.compute_consumer = rpc.AdapterConsumer(connection=self.conn,
- topic=FLAGS.compute_topic,
- proxy=self.compute)
- self.compute_consumer.attach_to_eventlet()
- self.network = utils.import_object(FLAGS.network_manager)
- self.network_consumer = rpc.AdapterConsumer(connection=self.conn,
- topic=FLAGS.network_topic,
- proxy=self.network)
- self.network_consumer.attach_to_eventlet()
+ # set up services
+ self.compute = service.Service.create(binary='nova-compute')
+ self.compute.start()
+ self.network = service.Service.create(binary='nova-network')
+ self.network.start()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('admin', 'admin', 'admin', True)
self.project = self.manager.create_project('proj', 'admin', 'proj')
self.context = context.RequestContext(user=self.user,
- project=self.project)
+ project=self.project)
def tearDown(self):
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
+ self.compute.kill()
+ self.network.kill()
super(CloudTestCase, self).tearDown()
def _create_key(self, name):
@@ -111,12 +104,13 @@ class CloudTestCase(test.TrialTestCase):
{'address': address,
'host': FLAGS.host})
self.cloud.allocate_address(self.context)
- inst = db.instance_create(self.context, {})
+ inst = db.instance_create(self.context, {'host': FLAGS.host})
fixed = self.network.allocate_fixed_ip(self.context, inst['id'])
ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id'])
self.cloud.associate_address(self.context,
instance_id=ec2_id,
public_ip=address)
+ greenthread.sleep(0.3)
self.cloud.disassociate_address(self.context,
public_ip=address)
self.cloud.release_address(self.context,
@@ -126,6 +120,19 @@ class CloudTestCase(test.TrialTestCase):
db.instance_destroy(self.context, inst['id'])
db.floating_ip_destroy(self.context, address)
+ def test_describe_volumes(self):
+ """Makes sure describe_volumes works and filters results."""
+ vol1 = db.volume_create(self.context, {})
+ vol2 = db.volume_create(self.context, {})
+ result = self.cloud.describe_volumes(self.context)
+ self.assertEqual(len(result['volumeSet']), 2)
+ result = self.cloud.describe_volumes(self.context,
+ volume_id=[vol2['ec2_id']])
+ self.assertEqual(len(result['volumeSet']), 1)
+ self.assertEqual(result['volumeSet'][0]['volumeId'], vol2['ec2_id'])
+ db.volume_destroy(self.context, vol1['id'])
+ db.volume_destroy(self.context, vol2['id'])
+
def test_console_output(self):
image_id = FLAGS.default_image
instance_type = FLAGS.default_instance_type
@@ -186,7 +193,7 @@ class CloudTestCase(test.TrialTestCase):
logging.debug("Need to watch instance %s until it's running..." %
instance['instance_id'])
while True:
- rv = yield defer.succeed(time.sleep(1))
+ greenthread.sleep(1)
info = self.cloud._get_instance(instance['instance_id'])
logging.debug(info['state'])
if info['state'] == power_state.RUNNING:
diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py
index 6f3ef96cb..348bb3351 100644
--- a/nova/tests/compute_unittest.py
+++ b/nova/tests/compute_unittest.py
@@ -22,8 +22,6 @@ Tests For Compute
import datetime
import logging
-from twisted.internet import defer
-
from nova import context
from nova import db
from nova import exception
@@ -33,15 +31,17 @@ from nova import utils
from nova.auth import manager
from nova.compute import api as compute_api
+
FLAGS = flags.FLAGS
-class ComputeTestCase(test.TrialTestCase):
+class ComputeTestCase(test.TestCase):
"""Test case for compute"""
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(ComputeTestCase, self).setUp()
self.flags(connection_type='fake',
+ stub_network=True,
network_manager='nova.network.manager.FlatManager')
self.compute = utils.import_object(FLAGS.compute_manager)
self.compute_api = compute_api.ComputeAPI()
@@ -94,24 +94,22 @@ class ComputeTestCase(test.TrialTestCase):
db.security_group_destroy(self.context, group['id'])
db.instance_destroy(self.context, ref[0]['id'])
- @defer.inlineCallbacks
def test_run_terminate(self):
"""Make sure it is possible to run and terminate instance"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
logging.info("Running instances: %s", instances)
self.assertEqual(len(instances), 1)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
logging.info("After terminating instances: %s", instances)
self.assertEqual(len(instances), 0)
- @defer.inlineCallbacks
def test_run_terminate_timestamps(self):
"""Make sure timestamps are set for launched and destroyed"""
instance_id = self._create_instance()
@@ -119,42 +117,48 @@ class ComputeTestCase(test.TrialTestCase):
self.assertEqual(instance_ref['launched_at'], None)
self.assertEqual(instance_ref['deleted_at'], None)
launch = datetime.datetime.utcnow()
- yield self.compute.run_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
instance_ref = db.instance_get(self.context, instance_id)
self.assert_(instance_ref['launched_at'] > launch)
self.assertEqual(instance_ref['deleted_at'], None)
terminate = datetime.datetime.utcnow()
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
self.context = self.context.elevated(True)
instance_ref = db.instance_get(self.context, instance_id)
self.assert_(instance_ref['launched_at'] < terminate)
self.assert_(instance_ref['deleted_at'] > terminate)
- @defer.inlineCallbacks
+ def test_pause(self):
+ """Ensure instance can be paused"""
+ instance_id = self._create_instance()
+ self.compute.run_instance(self.context, instance_id)
+ self.compute.pause_instance(self.context, instance_id)
+ self.compute.unpause_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
+
def test_reboot(self):
"""Ensure instance can be rebooted"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
- yield self.compute.reboot_instance(self.context, instance_id)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
+ self.compute.reboot_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
- @defer.inlineCallbacks
def test_console_output(self):
"""Make sure we can get console output from instance"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
- console = yield self.compute.get_console_output(self.context,
+ console = self.compute.get_console_output(self.context,
instance_id)
self.assert_(console)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
- @defer.inlineCallbacks
def test_run_instance_existing(self):
"""Ensure failure when running an instance that already exists"""
instance_id = self._create_instance()
- yield self.compute.run_instance(self.context, instance_id)
- self.assertFailure(self.compute.run_instance(self.context,
- instance_id),
- exception.Error)
- yield self.compute.terminate_instance(self.context, instance_id)
+ self.compute.run_instance(self.context, instance_id)
+ self.assertRaises(exception.Error,
+ self.compute.run_instance,
+ self.context,
+ instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
diff --git a/nova/tests/flags_unittest.py b/nova/tests/flags_unittest.py
index b97df075d..707300fcf 100644
--- a/nova/tests/flags_unittest.py
+++ b/nova/tests/flags_unittest.py
@@ -24,7 +24,7 @@ FLAGS = flags.FLAGS
flags.DEFINE_string('flags_unittest', 'foo', 'for testing purposes only')
-class FlagsTestCase(test.TrialTestCase):
+class FlagsTestCase(test.TestCase):
def setUp(self):
super(FlagsTestCase, self).setUp()
diff --git a/nova/tests/middleware_unittest.py b/nova/tests/middleware_unittest.py
new file mode 100644
index 000000000..0febf52d6
--- /dev/null
+++ b/nova/tests/middleware_unittest.py
@@ -0,0 +1,86 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import webob
+import webob.dec
+import webob.exc
+
+from nova.api import ec2
+from nova import flags
+from nova import test
+from nova import utils
+
+
+FLAGS = flags.FLAGS
+
+
+@webob.dec.wsgify
+def conditional_forbid(req):
+ """Helper wsgi app returns 403 if param 'die' is 1."""
+ if 'die' in req.params and req.params['die'] == '1':
+ raise webob.exc.HTTPForbidden()
+ return 'OK'
+
+
+class LockoutTestCase(test.TrialTestCase):
+ """Test case for the Lockout middleware."""
+ def setUp(self): # pylint: disable-msg=C0103
+ super(LockoutTestCase, self).setUp()
+ utils.set_time_override()
+ self.lockout = ec2.Lockout(conditional_forbid)
+
+ def tearDown(self): # pylint: disable-msg=C0103
+ utils.clear_time_override()
+ super(LockoutTestCase, self).tearDown()
+
+ def _send_bad_attempts(self, access_key, num_attempts=1):
+ """Fail x."""
+ for i in xrange(num_attempts):
+ req = webob.Request.blank('/?AWSAccessKeyId=%s&die=1' % access_key)
+ self.assertEqual(req.get_response(self.lockout).status_int, 403)
+
+ def _is_locked_out(self, access_key):
+ """Sends a test request to see if key is locked out."""
+ req = webob.Request.blank('/?AWSAccessKeyId=%s' % access_key)
+ return (req.get_response(self.lockout).status_int == 403)
+
+ def test_lockout(self):
+ self._send_bad_attempts('test', FLAGS.lockout_attempts)
+ self.assertTrue(self._is_locked_out('test'))
+
+ def test_timeout(self):
+ self._send_bad_attempts('test', FLAGS.lockout_attempts)
+ self.assertTrue(self._is_locked_out('test'))
+ utils.advance_time_seconds(FLAGS.lockout_minutes * 60)
+ self.assertFalse(self._is_locked_out('test'))
+
+ def test_multiple_keys(self):
+ self._send_bad_attempts('test1', FLAGS.lockout_attempts)
+ self.assertTrue(self._is_locked_out('test1'))
+ self.assertFalse(self._is_locked_out('test2'))
+ utils.advance_time_seconds(FLAGS.lockout_minutes * 60)
+ self.assertFalse(self._is_locked_out('test1'))
+ self.assertFalse(self._is_locked_out('test2'))
+
+ def test_window_timeout(self):
+ self._send_bad_attempts('test', FLAGS.lockout_attempts - 1)
+ self.assertFalse(self._is_locked_out('test'))
+ utils.advance_time_seconds(FLAGS.lockout_window * 60)
+ self._send_bad_attempts('test', FLAGS.lockout_attempts - 1)
+ self.assertFalse(self._is_locked_out('test'))
diff --git a/nova/tests/misc_unittest.py b/nova/tests/misc_unittest.py
index 667c63ad0..3d947427a 100644
--- a/nova/tests/misc_unittest.py
+++ b/nova/tests/misc_unittest.py
@@ -20,7 +20,7 @@ from nova import test
from nova.utils import parse_mailmap, str_dict_replace
-class ProjectTestCase(test.TrialTestCase):
+class ProjectTestCase(test.TestCase):
def test_authors_up_to_date(self):
if os.path.exists('../.bzr'):
contributors = set()
@@ -30,23 +30,26 @@ class ProjectTestCase(test.TrialTestCase):
import bzrlib.workingtree
tree = bzrlib.workingtree.WorkingTree.open('..')
tree.lock_read()
- parents = tree.get_parent_ids()
- g = tree.branch.repository.get_graph()
- for p in parents[1:]:
- rev_ids = [r for r, _ in g.iter_ancestry(parents)
- if r != "null:"]
- revs = tree.branch.repository.get_revisions(rev_ids)
- for r in revs:
- for author in r.get_apparent_authors():
- email = author.split(' ')[-1]
- contributors.add(str_dict_replace(email, mailmap))
-
- authors_file = open('../Authors', 'r').read()
-
- missing = set()
- for contributor in contributors:
- if not contributor in authors_file:
- missing.add(contributor)
-
- self.assertTrue(len(missing) == 0,
- '%r not listed in Authors' % missing)
+ try:
+ parents = tree.get_parent_ids()
+ g = tree.branch.repository.get_graph()
+ for p in parents[1:]:
+ rev_ids = [r for r, _ in g.iter_ancestry(parents)
+ if r != "null:"]
+ revs = tree.branch.repository.get_revisions(rev_ids)
+ for r in revs:
+ for author in r.get_apparent_authors():
+ email = author.split(' ')[-1]
+ contributors.add(str_dict_replace(email, mailmap))
+
+ authors_file = open('../Authors', 'r').read()
+
+ missing = set()
+ for contributor in contributors:
+ if not contributor in authors_file:
+ missing.add(contributor)
+
+ self.assertTrue(len(missing) == 0,
+ '%r not listed in Authors' % missing)
+ finally:
+ tree.unlock()
diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py
index 6f4705719..96473ac7c 100644
--- a/nova/tests/network_unittest.py
+++ b/nova/tests/network_unittest.py
@@ -26,6 +26,7 @@ from nova import context
from nova import db
from nova import exception
from nova import flags
+from nova import service
from nova import test
from nova import utils
from nova.auth import manager
@@ -33,13 +34,14 @@ from nova.auth import manager
FLAGS = flags.FLAGS
-class NetworkTestCase(test.TrialTestCase):
+class NetworkTestCase(test.TestCase):
"""Test cases for network code"""
def setUp(self):
super(NetworkTestCase, self).setUp()
# NOTE(vish): if you change these flags, make sure to change the
# flags in the corresponding section in nova-dhcpbridge
self.flags(connection_type='fake',
+ fake_call=True,
fake_network=True,
network_size=16,
num_networks=5)
@@ -56,16 +58,13 @@ class NetworkTestCase(test.TrialTestCase):
# create the necessary network data for the project
user_context = context.RequestContext(project=self.projects[i],
user=self.user)
- network_ref = self.network.get_network(user_context)
- self.network.set_network_host(context.get_admin_context(),
- network_ref['id'])
+ host = self.network.get_network_host(user_context.elevated())
instance_ref = self._create_instance(0)
self.instance_id = instance_ref['id']
instance_ref = self._create_instance(1)
self.instance2_id = instance_ref['id']
def tearDown(self):
- super(NetworkTestCase, self).tearDown()
# TODO(termie): this should really be instantiating clean datastores
# in between runs, one failure kills all the tests
db.instance_destroy(context.get_admin_context(), self.instance_id)
@@ -73,6 +72,7 @@ class NetworkTestCase(test.TrialTestCase):
for project in self.projects:
self.manager.delete_project(project)
self.manager.delete_user(self.user)
+ super(NetworkTestCase, self).tearDown()
def _create_instance(self, project_num, mac=None):
if not mac:
diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py
index 061799923..ceac17adb 100644
--- a/nova/tests/objectstore_unittest.py
+++ b/nova/tests/objectstore_unittest.py
@@ -54,7 +54,7 @@ os.makedirs(os.path.join(OSS_TEMPDIR, 'images'))
os.makedirs(os.path.join(OSS_TEMPDIR, 'buckets'))
-class ObjectStoreTestCase(test.TrialTestCase):
+class ObjectStoreTestCase(test.TestCase):
"""Test objectstore API directly."""
def setUp(self):
@@ -191,7 +191,7 @@ class TestSite(server.Site):
protocol = TestHTTPChannel
-class S3APITestCase(test.TrialTestCase):
+class S3APITestCase(test.TestCase):
"""Test objectstore through S3 API."""
def setUp(self):
diff --git a/nova/tests/process_unittest.py b/nova/tests/process_unittest.py
deleted file mode 100644
index 67245af03..000000000
--- a/nova/tests/process_unittest.py
+++ /dev/null
@@ -1,132 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-from twisted.internet import defer
-from twisted.internet import reactor
-from xml.etree import ElementTree
-
-from nova import exception
-from nova import flags
-from nova import process
-from nova import test
-from nova import utils
-
-FLAGS = flags.FLAGS
-
-
-class ProcessTestCase(test.TrialTestCase):
- def setUp(self):
- logging.getLogger().setLevel(logging.DEBUG)
- super(ProcessTestCase, self).setUp()
-
- def test_execute_stdout(self):
- pool = process.ProcessPool(2)
- d = pool.simple_execute('echo test')
-
- def _check(rv):
- self.assertEqual(rv[0], 'test\n')
- self.assertEqual(rv[1], '')
-
- d.addCallback(_check)
- d.addErrback(self.fail)
- return d
-
- def test_execute_stderr(self):
- pool = process.ProcessPool(2)
- d = pool.simple_execute('cat BAD_FILE', check_exit_code=False)
-
- def _check(rv):
- self.assertEqual(rv[0], '')
- self.assert_('No such file' in rv[1])
-
- d.addCallback(_check)
- d.addErrback(self.fail)
- return d
-
- def test_execute_unexpected_stderr(self):
- pool = process.ProcessPool(2)
- d = pool.simple_execute('cat BAD_FILE')
- d.addCallback(lambda x: self.fail('should have raised an error'))
- d.addErrback(lambda failure: failure.trap(IOError))
- return d
-
- def test_max_processes(self):
- pool = process.ProcessPool(2)
- d1 = pool.simple_execute('sleep 0.01')
- d2 = pool.simple_execute('sleep 0.01')
- d3 = pool.simple_execute('sleep 0.005')
- d4 = pool.simple_execute('sleep 0.005')
-
- called = []
-
- def _called(rv, name):
- called.append(name)
-
- d1.addCallback(_called, 'd1')
- d2.addCallback(_called, 'd2')
- d3.addCallback(_called, 'd3')
- d4.addCallback(_called, 'd4')
-
- # Make sure that d3 and d4 had to wait on the other two and were called
- # in order
- # NOTE(termie): there may be a race condition in this test if for some
- # reason one of the sleeps takes longer to complete
- # than it should
- d4.addCallback(lambda x: self.assertEqual(called[2], 'd3'))
- d4.addCallback(lambda x: self.assertEqual(called[3], 'd4'))
- d4.addErrback(self.fail)
- return d4
-
- def test_kill_long_process(self):
- pool = process.ProcessPool(2)
-
- d1 = pool.simple_execute('sleep 1')
- d2 = pool.simple_execute('sleep 0.005')
-
- timeout = reactor.callLater(0.1, self.fail, 'should have been killed')
-
- # kill d1 and wait on it to end then cancel the timeout
- d2.addCallback(lambda _: d1.process.signalProcess('KILL'))
- d2.addCallback(lambda _: d1)
- d2.addBoth(lambda _: timeout.active() and timeout.cancel())
- d2.addErrback(self.fail)
- return d2
-
- def test_process_exit_is_contained(self):
- pool = process.ProcessPool(2)
-
- d1 = pool.simple_execute('sleep 1')
- d1.addCallback(lambda x: self.fail('should have errbacked'))
- d1.addErrback(lambda fail: fail.trap(IOError))
- reactor.callLater(0.05, d1.process.signalProcess, 'KILL')
-
- return d1
-
- def test_shared_pool_is_singleton(self):
- pool1 = process.SharedPool()
- pool2 = process.SharedPool()
- self.assertEqual(id(pool1._instance), id(pool2._instance))
-
- def test_shared_pool_works_as_singleton(self):
- d1 = process.simple_execute('sleep 1')
- d2 = process.simple_execute('sleep 0.005')
- # lp609749: would have failed with
- # exceptions.AssertionError: Someone released me too many times:
- # too many tokens!
- return d1
diff --git a/nova/tests/quota_unittest.py b/nova/tests/quota_unittest.py
index 1966b51f7..8cf2a5e54 100644
--- a/nova/tests/quota_unittest.py
+++ b/nova/tests/quota_unittest.py
@@ -32,7 +32,7 @@ from nova.api.ec2 import cloud
FLAGS = flags.FLAGS
-class QuotaTestCase(test.TrialTestCase):
+class QuotaTestCase(test.TestCase):
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(QuotaTestCase, self).setUp()
diff --git a/nova/tests/rpc_unittest.py b/nova/tests/rpc_unittest.py
index f35b65a39..6ea2edcab 100644
--- a/nova/tests/rpc_unittest.py
+++ b/nova/tests/rpc_unittest.py
@@ -20,8 +20,6 @@ Unit Tests for remote procedure calls using queue
"""
import logging
-from twisted.internet import defer
-
from nova import context
from nova import flags
from nova import rpc
@@ -31,32 +29,31 @@ from nova import test
FLAGS = flags.FLAGS
-class RpcTestCase(test.TrialTestCase):
+class RpcTestCase(test.TestCase):
"""Test cases for rpc"""
def setUp(self):
super(RpcTestCase, self).setUp()
- self.conn = rpc.Connection.instance()
+ self.conn = rpc.Connection.instance(True)
self.receiver = TestReceiver()
self.consumer = rpc.AdapterConsumer(connection=self.conn,
topic='test',
proxy=self.receiver)
- self.consumer.attach_to_twisted()
+ self.consumer.attach_to_eventlet()
self.context = context.get_admin_context()
def test_call_succeed(self):
"""Get a value through rpc call"""
value = 42
- result = yield rpc.call_twisted(self.context,
- 'test', {"method": "echo",
+ result = rpc.call(self.context, 'test', {"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
def test_context_passed(self):
"""Makes sure a context is passed through rpc call"""
value = 42
- result = yield rpc.call_twisted(self.context,
- 'test', {"method": "context",
- "args": {"value": value}})
+ result = rpc.call(self.context,
+ 'test', {"method": "context",
+ "args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
def test_call_exception(self):
@@ -67,18 +64,48 @@ class RpcTestCase(test.TrialTestCase):
to an int in the test.
"""
value = 42
- self.assertFailure(rpc.call_twisted(self.context, 'test',
- {"method": "fail",
- "args": {"value": value}}),
- rpc.RemoteError)
+ self.assertRaises(rpc.RemoteError,
+ rpc.call,
+ self.context,
+ 'test',
+ {"method": "fail",
+ "args": {"value": value}})
try:
- yield rpc.call_twisted(self.context,
- 'test', {"method": "fail",
- "args": {"value": value}})
+ rpc.call(self.context,
+ 'test',
+ {"method": "fail",
+ "args": {"value": value}})
self.fail("should have thrown rpc.RemoteError")
except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value)
+ def test_nested_calls(self):
+ """Test that we can do an rpc.call inside another call"""
+ class Nested(object):
+ @staticmethod
+ def echo(context, queue, value):
+ """Calls echo in the passed queue"""
+ logging.debug("Nested received %s, %s", queue, value)
+ ret = rpc.call(context,
+ queue,
+ {"method": "echo",
+ "args": {"value": value}})
+ logging.debug("Nested return %s", ret)
+ return value
+
+ nested = Nested()
+ conn = rpc.Connection.instance(True)
+ consumer = rpc.AdapterConsumer(connection=conn,
+ topic='nested',
+ proxy=nested)
+ consumer.attach_to_eventlet()
+ value = 42
+ result = rpc.call(self.context,
+ 'nested', {"method": "echo",
+ "args": {"queue": "test",
+ "value": value}})
+ self.assertEqual(value, result)
+
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call
@@ -89,13 +116,13 @@ class TestReceiver(object):
def echo(context, value):
"""Simply returns whatever value is sent in"""
logging.debug("Received %s", value)
- return defer.succeed(value)
+ return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context"""
logging.debug("Received %s", context)
- return defer.succeed(context.to_dict())
+ return context.to_dict()
@staticmethod
def fail(context, value):
diff --git a/nova/tests/scheduler_unittest.py b/nova/tests/scheduler_unittest.py
index cb5fe6b9c..df5e7afa5 100644
--- a/nova/tests/scheduler_unittest.py
+++ b/nova/tests/scheduler_unittest.py
@@ -44,7 +44,7 @@ class TestDriver(driver.Scheduler):
return 'named_host'
-class SchedulerTestCase(test.TrialTestCase):
+class SchedulerTestCase(test.TestCase):
"""Test case for scheduler"""
def setUp(self):
super(SchedulerTestCase, self).setUp()
@@ -73,11 +73,12 @@ class SchedulerTestCase(test.TrialTestCase):
scheduler.named_method(ctxt, 'topic', num=7)
-class SimpleDriverTestCase(test.TrialTestCase):
+class SimpleDriverTestCase(test.TestCase):
"""Test case for simple driver"""
def setUp(self):
super(SimpleDriverTestCase, self).setUp()
self.flags(connection_type='fake',
+ stub_network=True,
max_cores=4,
max_gigabytes=4,
network_manager='nova.network.manager.FlatManager',
@@ -122,12 +123,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute1.startService()
+ compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute2.startService()
+ compute2.start()
hosts = self.scheduler.driver.hosts_up(self.context, 'compute')
self.assertEqual(len(hosts), 2)
compute1.kill()
@@ -139,12 +140,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute1.startService()
+ compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute2.startService()
+ compute2.start()
instance_id1 = self._create_instance()
compute1.run_instance(self.context, instance_id1)
instance_id2 = self._create_instance()
@@ -162,12 +163,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute1.startService()
+ compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
- compute2.startService()
+ compute2.start()
instance_ids1 = []
instance_ids2 = []
for index in xrange(FLAGS.max_cores):
@@ -195,12 +196,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume1.startService()
+ volume1.start()
volume2 = service.Service('host2',
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume2.startService()
+ volume2.start()
volume_id1 = self._create_volume()
volume1.create_volume(self.context, volume_id1)
volume_id2 = self._create_volume()
@@ -218,12 +219,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume1.startService()
+ volume1.start()
volume2 = service.Service('host2',
'nova-volume',
'volume',
FLAGS.volume_manager)
- volume2.startService()
+ volume2.start()
volume_ids1 = []
volume_ids2 = []
for index in xrange(FLAGS.max_gigabytes):
diff --git a/nova/tests/service_unittest.py b/nova/tests/service_unittest.py
index a268bc4fe..47c092f8e 100644
--- a/nova/tests/service_unittest.py
+++ b/nova/tests/service_unittest.py
@@ -22,9 +22,6 @@ Unit Tests for remote procedure calls using queue
import mox
-from twisted.application.app import startApplication
-from twisted.internet import defer
-
from nova import exception
from nova import flags
from nova import rpc
@@ -48,7 +45,7 @@ class ExtendedService(service.Service):
return 'service'
-class ServiceManagerTestCase(test.TrialTestCase):
+class ServiceManagerTestCase(test.TestCase):
"""Test cases for Services"""
def test_attribute_error_for_no_manager(self):
@@ -63,7 +60,7 @@ class ServiceManagerTestCase(test.TrialTestCase):
'test',
'test',
'nova.tests.service_unittest.FakeManager')
- serv.startService()
+ serv.start()
self.assertEqual(serv.test_method(), 'manager')
def test_override_manager_method(self):
@@ -71,11 +68,11 @@ class ServiceManagerTestCase(test.TrialTestCase):
'test',
'test',
'nova.tests.service_unittest.FakeManager')
- serv.startService()
+ serv.start()
self.assertEqual(serv.test_method(), 'service')
-class ServiceTestCase(test.TrialTestCase):
+class ServiceTestCase(test.TestCase):
"""Test cases for Services"""
def setUp(self):
@@ -94,8 +91,6 @@ class ServiceTestCase(test.TrialTestCase):
self.mox.StubOutWithMock(rpc,
'AdapterConsumer',
use_mock_anything=True)
- self.mox.StubOutWithMock(
- service.task, 'LoopingCall', use_mock_anything=True)
rpc.AdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
@@ -106,19 +101,8 @@ class ServiceTestCase(test.TrialTestCase):
proxy=mox.IsA(service.Service)).AndReturn(
rpc.AdapterConsumer)
- rpc.AdapterConsumer.attach_to_twisted()
- rpc.AdapterConsumer.attach_to_twisted()
-
- # Stub out looping call a bit needlessly since we don't have an easy
- # way to cancel it (yet) when the tests finishes
- service.task.LoopingCall(mox.IgnoreArg()).AndReturn(
- service.task.LoopingCall)
- service.task.LoopingCall.start(interval=mox.IgnoreArg(),
- now=mox.IgnoreArg())
- service.task.LoopingCall(mox.IgnoreArg()).AndReturn(
- service.task.LoopingCall)
- service.task.LoopingCall.start(interval=mox.IgnoreArg(),
- now=mox.IgnoreArg())
+ rpc.AdapterConsumer.attach_to_eventlet()
+ rpc.AdapterConsumer.attach_to_eventlet()
service_create = {'host': host,
'binary': binary,
@@ -136,14 +120,14 @@ class ServiceTestCase(test.TrialTestCase):
service_create).AndReturn(service_ref)
self.mox.ReplayAll()
- startApplication(app, False)
+ app.start()
+ app.stop()
self.assert_(app)
# We're testing sort of weird behavior in how report_state decides
# whether it is disconnected, it looks for a variable on itself called
# 'model_disconnected' and report_state doesn't really do much so this
# these are mostly just for coverage
- @defer.inlineCallbacks
def test_report_state_no_service(self):
host = 'foo'
binary = 'bar'
@@ -173,10 +157,9 @@ class ServiceTestCase(test.TrialTestCase):
binary,
topic,
'nova.tests.service_unittest.FakeManager')
- serv.startService()
- yield serv.report_state()
+ serv.start()
+ serv.report_state()
- @defer.inlineCallbacks
def test_report_state_newly_disconnected(self):
host = 'foo'
binary = 'bar'
@@ -204,11 +187,10 @@ class ServiceTestCase(test.TrialTestCase):
binary,
topic,
'nova.tests.service_unittest.FakeManager')
- serv.startService()
- yield serv.report_state()
+ serv.start()
+ serv.report_state()
self.assert_(serv.model_disconnected)
- @defer.inlineCallbacks
def test_report_state_newly_connected(self):
host = 'foo'
binary = 'bar'
@@ -238,8 +220,8 @@ class ServiceTestCase(test.TrialTestCase):
binary,
topic,
'nova.tests.service_unittest.FakeManager')
- serv.startService()
+ serv.start()
serv.model_disconnected = True
- yield serv.report_state()
+ serv.report_state()
self.assert_(not serv.model_disconnected)
diff --git a/nova/tests/validator_unittest.py b/nova/tests/validator_unittest.py
deleted file mode 100644
index b5f1c0667..000000000
--- a/nova/tests/validator_unittest.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-import unittest
-
-from nova import flags
-from nova import test
-from nova import validate
-
-
-class ValidationTestCase(test.TrialTestCase):
- def setUp(self):
- super(ValidationTestCase, self).setUp()
-
- def tearDown(self):
- super(ValidationTestCase, self).tearDown()
-
- def test_type_validation(self):
- self.assertTrue(type_case("foo", 5, 1))
- self.assertRaises(TypeError, type_case, "bar", "5", 1)
- self.assertRaises(TypeError, type_case, None, 5, 1)
-
-
-@validate.typetest(instanceid=str, size=int, number_of_instances=int)
-def type_case(instanceid, size, number_of_instances):
- return True
diff --git a/nova/tests/virt_unittest.py b/nova/tests/virt_unittest.py
index d49383fb7..9ad009510 100644
--- a/nova/tests/virt_unittest.py
+++ b/nova/tests/virt_unittest.py
@@ -30,9 +30,10 @@ FLAGS = flags.FLAGS
flags.DECLARE('instances_path', 'nova.compute.manager')
-class LibvirtConnTestCase(test.TrialTestCase):
+class LibvirtConnTestCase(test.TestCase):
def setUp(self):
super(LibvirtConnTestCase, self).setUp()
+ self.flags(fake_call=True)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
@@ -40,33 +41,66 @@ class LibvirtConnTestCase(test.TrialTestCase):
self.network = utils.import_object(FLAGS.network_manager)
FLAGS.instances_path = ''
- def test_get_uri_and_template(self):
- ip = '10.11.12.13'
-
- instance = {'internal_id': 1,
- 'memory_kb': '1024000',
- 'basepath': '/some/path',
- 'bridge_name': 'br100',
- 'mac_address': '02:12:34:46:56:67',
- 'vcpus': 2,
- 'project_id': 'fake',
- 'bridge': 'br101',
- 'instance_type': 'm1.small'}
-
+ test_ip = '10.11.12.13'
+ test_instance = {'memory_kb': '1024000',
+ 'basepath': '/some/path',
+ 'bridge_name': 'br100',
+ 'mac_address': '02:12:34:46:56:67',
+ 'vcpus': 2,
+ 'project_id': 'fake',
+ 'bridge': 'br101',
+ 'instance_type': 'm1.small'}
+
+ def test_xml_and_uri_no_ramdisk_no_kernel(self):
+ instance_data = dict(self.test_instance)
+ self.do_test_xml_and_uri(instance_data,
+ expect_kernel=False, expect_ramdisk=False)
+
+ def test_xml_and_uri_no_ramdisk(self):
+ instance_data = dict(self.test_instance)
+ instance_data['kernel_id'] = 'aki-deadbeef'
+ self.do_test_xml_and_uri(instance_data,
+ expect_kernel=True, expect_ramdisk=False)
+
+ def test_xml_and_uri_no_kernel(self):
+ instance_data = dict(self.test_instance)
+ instance_data['ramdisk_id'] = 'ari-deadbeef'
+ self.do_test_xml_and_uri(instance_data,
+ expect_kernel=False, expect_ramdisk=False)
+
+ def test_xml_and_uri(self):
+ instance_data = dict(self.test_instance)
+ instance_data['ramdisk_id'] = 'ari-deadbeef'
+ instance_data['kernel_id'] = 'aki-deadbeef'
+ self.do_test_xml_and_uri(instance_data,
+ expect_kernel=True, expect_ramdisk=True)
+
+ def test_xml_and_uri_rescue(self):
+ instance_data = dict(self.test_instance)
+ instance_data['ramdisk_id'] = 'ari-deadbeef'
+ instance_data['kernel_id'] = 'aki-deadbeef'
+ self.do_test_xml_and_uri(instance_data,
+ expect_kernel=True, expect_ramdisk=True,
+ rescue=True)
+
+ def do_test_xml_and_uri(self, instance,
+ expect_ramdisk, expect_kernel,
+ rescue=False):
user_context = context.RequestContext(project=self.project,
user=self.user)
instance_ref = db.instance_create(user_context, instance)
- network_ref = self.network.get_network(user_context)
- self.network.set_network_host(context.get_admin_context(),
- network_ref['id'])
+ host = self.network.get_network_host(user_context.elevated())
+ network_ref = db.project_get_network(context.get_admin_context(),
+ self.project.id)
- fixed_ip = {'address': ip,
+ fixed_ip = {'address': self.test_ip,
'network_id': network_ref['id']}
ctxt = context.get_admin_context()
fixed_ip_ref = db.fixed_ip_create(ctxt, fixed_ip)
- db.fixed_ip_update(ctxt, ip, {'allocated': True,
- 'instance_id': instance_ref['id']})
+ db.fixed_ip_update(ctxt, self.test_ip,
+ {'allocated': True,
+ 'instance_id': instance_ref['id']})
type_uri_map = {'qemu': ('qemu:///system',
[(lambda t: t.find('.').get('type'), 'qemu'),
@@ -78,23 +112,73 @@ class LibvirtConnTestCase(test.TrialTestCase):
(lambda t: t.find('./devices/emulator'), None)]),
'uml': ('uml:///system',
[(lambda t: t.find('.').get('type'), 'uml'),
- (lambda t: t.find('./os/type').text, 'uml')])}
+ (lambda t: t.find('./os/type').text, 'uml')]),
+ 'xen': ('xen:///',
+ [(lambda t: t.find('.').get('type'), 'xen'),
+ (lambda t: t.find('./os/type').text, 'linux')]),
+ }
+
+ for hypervisor_type in ['qemu', 'kvm', 'xen']:
+ check_list = type_uri_map[hypervisor_type][1]
+
+ if rescue:
+ check = (lambda t: t.find('./os/kernel').text.split('/')[1],
+ 'rescue-kernel')
+ check_list.append(check)
+ check = (lambda t: t.find('./os/initrd').text.split('/')[1],
+ 'rescue-ramdisk')
+ check_list.append(check)
+ else:
+ if expect_kernel:
+ check = (lambda t: t.find('./os/kernel').text.split(
+ '/')[1], 'kernel')
+ else:
+ check = (lambda t: t.find('./os/kernel'), None)
+ check_list.append(check)
+
+ if expect_ramdisk:
+ check = (lambda t: t.find('./os/initrd').text.split(
+ '/')[1], 'ramdisk')
+ else:
+ check = (lambda t: t.find('./os/initrd'), None)
+ check_list.append(check)
common_checks = [
(lambda t: t.find('.').tag, 'domain'),
- (lambda t: t.find('./devices/interface/filterref/parameter').\
- get('name'), 'IP'),
- (lambda t: t.find('./devices/interface/filterref/parameter').\
- get('value'), '10.11.12.13')]
+ (lambda t: t.find(
+ './devices/interface/filterref/parameter').get('name'), 'IP'),
+ (lambda t: t.find(
+ './devices/interface/filterref/parameter').get(
+ 'value'), '10.11.12.13'),
+ (lambda t: t.findall(
+ './devices/interface/filterref/parameter')[1].get(
+ 'name'), 'DHCPSERVER'),
+ (lambda t: t.findall(
+ './devices/interface/filterref/parameter')[1].get(
+ 'value'), '10.0.0.1'),
+ (lambda t: t.find('./devices/serial/source').get(
+ 'path').split('/')[1], 'console.log'),
+ (lambda t: t.find('./memory').text, '2097152')]
+
+ if rescue:
+ common_checks += [
+ (lambda t: t.findall('./devices/disk/source')[0].get(
+ 'file').split('/')[1], 'rescue-disk'),
+ (lambda t: t.findall('./devices/disk/source')[1].get(
+ 'file').split('/')[1], 'disk')]
+ else:
+ common_checks += [(lambda t: t.findall(
+ './devices/disk/source')[0].get('file').split('/')[1],
+ 'disk')]
for (libvirt_type, (expected_uri, checks)) in type_uri_map.iteritems():
FLAGS.libvirt_type = libvirt_type
conn = libvirt_conn.LibvirtConnection(True)
- uri, _template, _rescue = conn.get_uri_and_templates()
+ uri = conn.get_uri()
self.assertEquals(uri, expected_uri)
- xml = conn.to_xml(instance_ref)
+ xml = conn.to_xml(instance_ref, rescue)
tree = xml_to_tree(xml)
for i, (check, expected_result) in enumerate(checks):
self.assertEqual(check(tree),
@@ -106,6 +190,9 @@ class LibvirtConnTestCase(test.TrialTestCase):
expected_result,
'%s failed common check %d' % (xml, i))
+ # This test is supposed to make sure we don't override a specifically
+ # set uri
+ #
# Deliberately not just assigning this string to FLAGS.libvirt_uri and
# checking against that later on. This way we make sure the
# implementation doesn't fiddle around with the FLAGS.
@@ -114,7 +201,7 @@ class LibvirtConnTestCase(test.TrialTestCase):
for (libvirt_type, (expected_uri, checks)) in type_uri_map.iteritems():
FLAGS.libvirt_type = libvirt_type
conn = libvirt_conn.LibvirtConnection(True)
- uri, _template, _rescue = conn.get_uri_and_templates()
+ uri = conn.get_uri()
self.assertEquals(uri, testuri)
def tearDown(self):
@@ -123,7 +210,7 @@ class LibvirtConnTestCase(test.TrialTestCase):
self.manager.delete_user(self.user)
-class NWFilterTestCase(test.TrialTestCase):
+class NWFilterTestCase(test.TestCase):
def setUp(self):
super(NWFilterTestCase, self).setUp()
@@ -235,7 +322,7 @@ class NWFilterTestCase(test.TrialTestCase):
'project_id': 'fake'})
inst_id = instance_ref['id']
- def _ensure_all_called(_):
+ def _ensure_all_called():
instance_filter = 'nova-instance-%s' % instance_ref['name']
secgroup_filter = 'nova-secgroup-%s' % self.security_group['id']
for required in [secgroup_filter, 'allow-dhcp-server',
@@ -252,8 +339,7 @@ class NWFilterTestCase(test.TrialTestCase):
self.security_group.id)
instance = db.instance_get(self.context, inst_id)
- d = self.fw.setup_nwfilters_for_instance(instance)
- d.addCallback(_ensure_all_called)
- d.addCallback(lambda _: self.teardown_security_group())
-
- return d
+ self.fw.setup_base_nwfilters()
+ self.fw.setup_nwfilters_for_instance(instance)
+ _ensure_all_called()
+ self.teardown_security_group()
diff --git a/nova/tests/volume_unittest.py b/nova/tests/volume_unittest.py
index 12321a96f..b13455fb0 100644
--- a/nova/tests/volume_unittest.py
+++ b/nova/tests/volume_unittest.py
@@ -21,8 +21,6 @@ Tests for Volume Code.
"""
import logging
-from twisted.internet import defer
-
from nova import context
from nova import exception
from nova import db
@@ -33,7 +31,7 @@ from nova import utils
FLAGS = flags.FLAGS
-class VolumeTestCase(test.TrialTestCase):
+class VolumeTestCase(test.TestCase):
"""Test Case for volumes."""
def setUp(self):
@@ -56,51 +54,48 @@ class VolumeTestCase(test.TrialTestCase):
vol['attach_status'] = "detached"
return db.volume_create(context.get_admin_context(), vol)['id']
- @defer.inlineCallbacks
def test_create_delete_volume(self):
"""Test volume can be created and deleted."""
volume_id = self._create_volume()
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
self.assertEqual(volume_id, db.volume_get(context.get_admin_context(),
volume_id).id)
- yield self.volume.delete_volume(self.context, volume_id)
+ self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.NotFound,
db.volume_get,
self.context,
volume_id)
- @defer.inlineCallbacks
def test_too_big_volume(self):
"""Ensure failure if a too large of a volume is requested."""
# FIXME(vish): validation needs to move into the data layer in
# volume_create
- defer.returnValue(True)
+ return True
try:
volume_id = self._create_volume('1001')
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
self.fail("Should have thrown TypeError")
except TypeError:
pass
- @defer.inlineCallbacks
def test_too_many_volumes(self):
"""Ensure that NoMoreTargets is raised when we run out of volumes."""
vols = []
total_slots = FLAGS.iscsi_num_targets
for _index in xrange(total_slots):
volume_id = self._create_volume()
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
vols.append(volume_id)
volume_id = self._create_volume()
- self.assertFailure(self.volume.create_volume(self.context,
- volume_id),
- db.NoMoreTargets)
+ self.assertRaises(db.NoMoreTargets,
+ self.volume.create_volume,
+ self.context,
+ volume_id)
db.volume_destroy(context.get_admin_context(), volume_id)
for volume_id in vols:
- yield self.volume.delete_volume(self.context, volume_id)
+ self.volume.delete_volume(self.context, volume_id)
- @defer.inlineCallbacks
def test_run_attach_detach_volume(self):
"""Make sure volume can be attached and detached from instance."""
inst = {}
@@ -115,15 +110,15 @@ class VolumeTestCase(test.TrialTestCase):
instance_id = db.instance_create(self.context, inst)['id']
mountpoint = "/dev/sdf"
volume_id = self._create_volume()
- yield self.volume.create_volume(self.context, volume_id)
+ self.volume.create_volume(self.context, volume_id)
if FLAGS.fake_tests:
db.volume_attached(self.context, volume_id, instance_id,
mountpoint)
else:
- yield self.compute.attach_volume(self.context,
- instance_id,
- volume_id,
- mountpoint)
+ self.compute.attach_volume(self.context,
+ instance_id,
+ volume_id,
+ mountpoint)
vol = db.volume_get(context.get_admin_context(), volume_id)
self.assertEqual(vol['status'], "in-use")
self.assertEqual(vol['attach_status'], "attached")
@@ -131,25 +126,26 @@ class VolumeTestCase(test.TrialTestCase):
instance_ref = db.volume_get_instance(self.context, volume_id)
self.assertEqual(instance_ref['id'], instance_id)
- self.assertFailure(self.volume.delete_volume(self.context, volume_id),
- exception.Error)
+ self.assertRaises(exception.Error,
+ self.volume.delete_volume,
+ self.context,
+ volume_id)
if FLAGS.fake_tests:
db.volume_detached(self.context, volume_id)
else:
- yield self.compute.detach_volume(self.context,
- instance_id,
- volume_id)
+ self.compute.detach_volume(self.context,
+ instance_id,
+ volume_id)
vol = db.volume_get(self.context, volume_id)
self.assertEqual(vol['status'], "available")
- yield self.volume.delete_volume(self.context, volume_id)
+ self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.Error,
db.volume_get,
self.context,
volume_id)
db.instance_destroy(self.context, instance_id)
- @defer.inlineCallbacks
def test_concurrent_volumes_get_different_targets(self):
"""Ensure multiple concurrent volumes get different targets."""
volume_ids = []
@@ -164,15 +160,11 @@ class VolumeTestCase(test.TrialTestCase):
self.assert_(iscsi_target not in targets)
targets.append(iscsi_target)
logging.debug("Target %s allocated", iscsi_target)
- deferreds = []
total_slots = FLAGS.iscsi_num_targets
for _index in xrange(total_slots):
volume_id = self._create_volume()
d = self.volume.create_volume(self.context, volume_id)
- d.addCallback(_check)
- d.addErrback(self.fail)
- deferreds.append(d)
- yield defer.DeferredList(deferreds)
+ _check(d)
for volume_id in volume_ids:
self.volume.delete_volume(self.context, volume_id)