summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
authorKei Masumoto <masumotok@nttdata.co.jp>2011-03-01 17:36:24 +0900
committerKei Masumoto <masumotok@nttdata.co.jp>2011-03-01 17:36:24 +0900
commit52285565fb421d34e7efcdb5cf89d10431319f8f (patch)
tree83bff317636b85809811051f8130dccb4d84c8b2 /nova/tests
parent485a6c5a9502679bc5ecf02f8e758170ac0335dc (diff)
parentedf5da85648659b1a7ad105248d69ef9f8c977e4 (diff)
1. merged trunk rev749
2. rpc.call returns '/' as '\/', so nova.compute.manager.mktmpfile, nova.compute.manager.confirm.tmpfile, nova.scheduler.driver.Scheduler.mounted_on_same_shared_storage are modified followed by this changes. 3. nova.tests.test_virt.py is modified so that other teams modification is easily detected since other team is using nova.db.sqlalchemy.models.ComputeService.
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/__init__.py25
-rw-r--r--nova/tests/api/openstack/__init__.py4
-rw-r--r--nova/tests/api/openstack/fakes.py25
-rw-r--r--nova/tests/api/openstack/test_adminapi.py11
-rw-r--r--nova/tests/api/openstack/test_api.py4
-rw-r--r--nova/tests/api/openstack/test_auth.py52
-rw-r--r--nova/tests/api/openstack/test_common.py5
-rw-r--r--nova/tests/api/openstack/test_faults.py4
-rw-r--r--nova/tests/api/openstack/test_flavors.py10
-rw-r--r--nova/tests/api/openstack/test_images.py14
-rw-r--r--nova/tests/api/openstack/test_ratelimiting.py15
-rw-r--r--nova/tests/api/openstack/test_servers.py35
-rw-r--r--nova/tests/api/openstack/test_shared_ip_groups.py7
-rw-r--r--nova/tests/api/openstack/test_zones.py10
-rw-r--r--nova/tests/api/test_wsgi.py6
-rw-r--r--nova/tests/fake_flags.py6
-rw-r--r--nova/tests/objectstore_unittest.py1
-rw-r--r--nova/tests/test_api.py23
-rw-r--r--nova/tests/test_auth.py9
-rw-r--r--nova/tests/test_cloud.py116
-rw-r--r--nova/tests/test_console.py2
-rw-r--r--nova/tests/test_direct.py2
-rw-r--r--nova/tests/test_localization.py1
-rw-r--r--nova/tests/test_log.py63
-rw-r--r--nova/tests/test_network.py11
-rw-r--r--nova/tests/test_quota.py25
-rw-r--r--nova/tests/test_scheduler.py104
-rw-r--r--nova/tests/test_service.py7
-rw-r--r--nova/tests/test_test.py40
-rw-r--r--nova/tests/test_utils.py174
-rw-r--r--nova/tests/test_virt.py442
-rw-r--r--nova/tests/test_xenapi.py1
-rw-r--r--nova/tests/xenapi/stubs.py6
33 files changed, 670 insertions, 590 deletions
diff --git a/nova/tests/__init__.py b/nova/tests/__init__.py
index 592d5bea9..7fba02a93 100644
--- a/nova/tests/__init__.py
+++ b/nova/tests/__init__.py
@@ -37,5 +37,30 @@ setattr(__builtin__, '_', lambda x: x)
def setup():
+ import os
+ import shutil
+
+ from nova import context
+ from nova import flags
from nova.db import migration
+ from nova.network import manager as network_manager
+ from nova.tests import fake_flags
+
+ FLAGS = flags.FLAGS
+
+ testdb = os.path.join(FLAGS.state_path, FLAGS.sqlite_db)
+ if os.path.exists(testdb):
+ os.unlink(testdb)
migration.db_sync()
+ ctxt = context.get_admin_context()
+ network_manager.VlanManager().create_networks(ctxt,
+ FLAGS.fixed_range,
+ FLAGS.num_networks,
+ FLAGS.network_size,
+ FLAGS.fixed_range_v6,
+ FLAGS.vlan_start,
+ FLAGS.vpn_start,
+ )
+
+ cleandb = os.path.join(FLAGS.state_path, FLAGS.sqlite_clean_db)
+ shutil.copyfile(testdb, cleandb)
diff --git a/nova/tests/api/openstack/__init__.py b/nova/tests/api/openstack/__init__.py
index 77b1dd37f..e18120285 100644
--- a/nova/tests/api/openstack/__init__.py
+++ b/nova/tests/api/openstack/__init__.py
@@ -16,7 +16,7 @@
# under the License.
import webob.dec
-import unittest
+from nova import test
from nova import context
from nova import flags
@@ -33,7 +33,7 @@ def simple_wsgi(req):
return ""
-class RateLimitingMiddlewareTest(unittest.TestCase):
+class RateLimitingMiddlewareTest(test.TestCase):
def test_get_action_name(self):
middleware = RateLimitingMiddleware(simple_wsgi)
diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py
index e0b7b8029..49ce8c1b5 100644
--- a/nova/tests/api/openstack/fakes.py
+++ b/nova/tests/api/openstack/fakes.py
@@ -188,7 +188,11 @@ def stub_out_glance(stubs, initial_fixtures=None):
class FakeToken(object):
+ id = 0
+
def __init__(self, **kwargs):
+ FakeToken.id += 1
+ self.id = FakeToken.id
for k, v in kwargs.iteritems():
setattr(self, k, v)
@@ -203,26 +207,28 @@ class FakeAuthDatabase(object):
data = {}
@staticmethod
- def auth_get_token(context, token_hash):
+ def auth_token_get(context, token_hash):
return FakeAuthDatabase.data.get(token_hash, None)
@staticmethod
- def auth_create_token(context, token):
+ def auth_token_create(context, token):
fake_token = FakeToken(created_at=datetime.datetime.now(), **token)
FakeAuthDatabase.data[fake_token.token_hash] = fake_token
+ FakeAuthDatabase.data['id_%i' % fake_token.id] = fake_token
return fake_token
@staticmethod
- def auth_destroy_token(context, token):
- if token.token_hash in FakeAuthDatabase.data:
- del FakeAuthDatabase.data['token_hash']
+ def auth_token_destroy(context, token_id):
+ token = FakeAuthDatabase.data.get('id_%i' % token_id)
+ if token and token.token_hash in FakeAuthDatabase.data:
+ del FakeAuthDatabase.data[token.token_hash]
+ del FakeAuthDatabase.data['id_%i' % token_id]
class FakeAuthManager(object):
auth_data = {}
- def add_user(self, user):
- key = user.id
+ def add_user(self, key, user):
FakeAuthManager.auth_data[key] = user
def get_user(self, uid):
@@ -235,10 +241,7 @@ class FakeAuthManager(object):
return None
def get_user_from_access_key(self, key):
- for k, v in FakeAuthManager.auth_data.iteritems():
- if v.access == key:
- return v
- return None
+ return FakeAuthManager.auth_data.get(key, None)
class FakeRateLimiter(object):
diff --git a/nova/tests/api/openstack/test_adminapi.py b/nova/tests/api/openstack/test_adminapi.py
index 73120c31d..dfce1b127 100644
--- a/nova/tests/api/openstack/test_adminapi.py
+++ b/nova/tests/api/openstack/test_adminapi.py
@@ -15,13 +15,13 @@
# License for the specific language governing permissions and limitations
# under the License.
-import unittest
import stubout
import webob
from paste import urlmap
from nova import flags
+from nova import test
from nova.api import openstack
from nova.api.openstack import ratelimiting
from nova.api.openstack import auth
@@ -30,9 +30,10 @@ from nova.tests.api.openstack import fakes
FLAGS = flags.FLAGS
-class AdminAPITest(unittest.TestCase):
+class AdminAPITest(test.TestCase):
def setUp(self):
+ super(AdminAPITest, self).setUp()
self.stubs = stubout.StubOutForTesting()
fakes.FakeAuthManager.auth_data = {}
fakes.FakeAuthDatabase.data = {}
@@ -44,6 +45,7 @@ class AdminAPITest(unittest.TestCase):
def tearDown(self):
self.stubs.UnsetAll()
FLAGS.allow_admin_api = self.allow_admin
+ super(AdminAPITest, self).tearDown()
def test_admin_enabled(self):
FLAGS.allow_admin_api = True
@@ -58,8 +60,5 @@ class AdminAPITest(unittest.TestCase):
# We should still be able to access public operations.
req = webob.Request.blank('/v1.0/flavors')
res = req.get_response(fakes.wsgi_app())
- self.assertEqual(res.status_int, 200)
# TODO: Confirm admin operations are unavailable.
-
-if __name__ == '__main__':
- unittest.main()
+ self.assertEqual(res.status_int, 200)
diff --git a/nova/tests/api/openstack/test_api.py b/nova/tests/api/openstack/test_api.py
index db0fe1060..5112c486f 100644
--- a/nova/tests/api/openstack/test_api.py
+++ b/nova/tests/api/openstack/test_api.py
@@ -15,17 +15,17 @@
# License for the specific language governing permissions and limitations
# under the License.
-import unittest
import webob.exc
import webob.dec
from webob import Request
+from nova import test
from nova.api import openstack
from nova.api.openstack import faults
-class APITest(unittest.TestCase):
+class APITest(test.TestCase):
def _wsgi_app(self, inner_app):
# simpler version of the app than fakes.wsgi_app
diff --git a/nova/tests/api/openstack/test_auth.py b/nova/tests/api/openstack/test_auth.py
index eab78b50c..ff8d42a14 100644
--- a/nova/tests/api/openstack/test_auth.py
+++ b/nova/tests/api/openstack/test_auth.py
@@ -16,7 +16,6 @@
# under the License.
import datetime
-import unittest
import stubout
import webob
@@ -27,12 +26,15 @@ import nova.api.openstack.auth
import nova.auth.manager
from nova import auth
from nova import context
+from nova import db
+from nova import test
from nova.tests.api.openstack import fakes
-class Test(unittest.TestCase):
+class Test(test.TestCase):
def setUp(self):
+ super(Test, self).setUp()
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(nova.api.openstack.auth.AuthMiddleware,
'__init__', fakes.fake_auth_init)
@@ -45,10 +47,11 @@ class Test(unittest.TestCase):
def tearDown(self):
self.stubs.UnsetAll()
fakes.fake_data_store = {}
+ super(Test, self).tearDown()
def test_authorize_user(self):
f = fakes.FakeAuthManager()
- f.add_user(nova.auth.manager.User(1, 'herp', 'herp', 'derp', None))
+ f.add_user('derp', nova.auth.manager.User(1, 'herp', None, None, None))
req = webob.Request.blank('/v1.0/')
req.headers['X-Auth-User'] = 'herp'
@@ -62,7 +65,7 @@ class Test(unittest.TestCase):
def test_authorize_token(self):
f = fakes.FakeAuthManager()
- f.add_user(nova.auth.manager.User(1, 'herp', 'herp', 'derp', None))
+ f.add_user('derp', nova.auth.manager.User(1, 'herp', None, None, None))
req = webob.Request.blank('/v1.0/', {'HTTP_HOST': 'foo'})
req.headers['X-Auth-User'] = 'herp'
@@ -97,10 +100,10 @@ class Test(unittest.TestCase):
token_hash=token_hash,
created_at=datetime.datetime(1990, 1, 1))
- self.stubs.Set(fakes.FakeAuthDatabase, 'auth_destroy_token',
+ self.stubs.Set(fakes.FakeAuthDatabase, 'auth_token_destroy',
destroy_token_mock)
- self.stubs.Set(fakes.FakeAuthDatabase, 'auth_get_token',
+ self.stubs.Set(fakes.FakeAuthDatabase, 'auth_token_get',
bad_token)
req = webob.Request.blank('/v1.0/')
@@ -128,8 +131,36 @@ class Test(unittest.TestCase):
self.assertEqual(result.status, '401 Unauthorized')
-class TestLimiter(unittest.TestCase):
+class TestFunctional(test.TestCase):
+ def test_token_expiry(self):
+ ctx = context.get_admin_context()
+ tok = db.auth_token_create(ctx, dict(
+ token_hash='bacon',
+ cdn_management_url='',
+ server_management_url='',
+ storage_url='',
+ user_id='ham',
+ ))
+
+ db.auth_token_update(ctx, tok.token_hash, dict(
+ created_at=datetime.datetime(2000, 1, 1, 12, 0, 0),
+ ))
+
+ req = webob.Request.blank('/v1.0/')
+ req.headers['X-Auth-Token'] = 'bacon'
+ result = req.get_response(fakes.wsgi_app())
+ self.assertEqual(result.status, '401 Unauthorized')
+
+ def test_token_doesnotexist(self):
+ req = webob.Request.blank('/v1.0/')
+ req.headers['X-Auth-Token'] = 'ham'
+ result = req.get_response(fakes.wsgi_app())
+ self.assertEqual(result.status, '401 Unauthorized')
+
+
+class TestLimiter(test.TestCase):
def setUp(self):
+ super(TestLimiter, self).setUp()
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(nova.api.openstack.auth.AuthMiddleware,
'__init__', fakes.fake_auth_init)
@@ -141,10 +172,11 @@ class TestLimiter(unittest.TestCase):
def tearDown(self):
self.stubs.UnsetAll()
fakes.fake_data_store = {}
+ super(TestLimiter, self).tearDown()
def test_authorize_token(self):
f = fakes.FakeAuthManager()
- f.add_user(nova.auth.manager.User(1, 'herp', 'herp', 'derp', None))
+ f.add_user('derp', nova.auth.manager.User(1, 'herp', None, None, None))
req = webob.Request.blank('/v1.0/')
req.headers['X-Auth-User'] = 'herp'
@@ -161,7 +193,3 @@ class TestLimiter(unittest.TestCase):
result = req.get_response(fakes.wsgi_app())
self.assertEqual(result.status, '200 OK')
self.assertEqual(result.headers['X-Test-Success'], 'True')
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/nova/tests/api/openstack/test_common.py b/nova/tests/api/openstack/test_common.py
index 9d9837cc9..59d850157 100644
--- a/nova/tests/api/openstack/test_common.py
+++ b/nova/tests/api/openstack/test_common.py
@@ -19,14 +19,14 @@
Test suites for 'common' code used throughout the OpenStack HTTP API.
"""
-import unittest
from webob import Request
+from nova import test
from nova.api.openstack.common import limited
-class LimiterTest(unittest.TestCase):
+class LimiterTest(test.TestCase):
"""
Unit tests for the `nova.api.openstack.common.limited` method which takes
in a list of items and, depending on the 'offset' and 'limit' GET params,
@@ -37,6 +37,7 @@ class LimiterTest(unittest.TestCase):
"""
Run before each test.
"""
+ super(LimiterTest, self).setUp()
self.tiny = range(1)
self.small = range(10)
self.medium = range(1000)
diff --git a/nova/tests/api/openstack/test_faults.py b/nova/tests/api/openstack/test_faults.py
index fda2b5ede..7667753f4 100644
--- a/nova/tests/api/openstack/test_faults.py
+++ b/nova/tests/api/openstack/test_faults.py
@@ -15,15 +15,15 @@
# License for the specific language governing permissions and limitations
# under the License.
-import unittest
import webob
import webob.dec
import webob.exc
+from nova import test
from nova.api.openstack import faults
-class TestFaults(unittest.TestCase):
+class TestFaults(test.TestCase):
def test_fault_parts(self):
req = webob.Request.blank('/.xml')
diff --git a/nova/tests/api/openstack/test_flavors.py b/nova/tests/api/openstack/test_flavors.py
index 1bdaea161..761265965 100644
--- a/nova/tests/api/openstack/test_flavors.py
+++ b/nova/tests/api/openstack/test_flavors.py
@@ -15,18 +15,18 @@
# License for the specific language governing permissions and limitations
# under the License.
-import unittest
-
import stubout
import webob
+from nova import test
import nova.api
from nova.api.openstack import flavors
from nova.tests.api.openstack import fakes
-class FlavorsTest(unittest.TestCase):
+class FlavorsTest(test.TestCase):
def setUp(self):
+ super(FlavorsTest, self).setUp()
self.stubs = stubout.StubOutForTesting()
fakes.FakeAuthManager.auth_data = {}
fakes.FakeAuthDatabase.data = {}
@@ -36,6 +36,7 @@ class FlavorsTest(unittest.TestCase):
def tearDown(self):
self.stubs.UnsetAll()
+ super(FlavorsTest, self).tearDown()
def test_get_flavor_list(self):
req = webob.Request.blank('/v1.0/flavors')
@@ -43,6 +44,3 @@ class FlavorsTest(unittest.TestCase):
def test_get_flavor_by_id(self):
pass
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/nova/tests/api/openstack/test_images.py b/nova/tests/api/openstack/test_images.py
index 8ab4d7569..e232bc3d5 100644
--- a/nova/tests/api/openstack/test_images.py
+++ b/nova/tests/api/openstack/test_images.py
@@ -22,7 +22,6 @@ and as a WSGI layer
import json
import datetime
-import unittest
import stubout
import webob
@@ -30,6 +29,7 @@ import webob
from nova import context
from nova import exception
from nova import flags
+from nova import test
from nova import utils
import nova.api.openstack
from nova.api.openstack import images
@@ -130,12 +130,13 @@ class BaseImageServiceTests(object):
self.assertEquals(1, num_images)
-class LocalImageServiceTest(unittest.TestCase,
+class LocalImageServiceTest(test.TestCase,
BaseImageServiceTests):
"""Tests the local image service"""
def setUp(self):
+ super(LocalImageServiceTest, self).setUp()
self.stubs = stubout.StubOutForTesting()
service_class = 'nova.image.local.LocalImageService'
self.service = utils.import_object(service_class)
@@ -145,14 +146,16 @@ class LocalImageServiceTest(unittest.TestCase,
self.service.delete_all()
self.service.delete_imagedir()
self.stubs.UnsetAll()
+ super(LocalImageServiceTest, self).tearDown()
-class GlanceImageServiceTest(unittest.TestCase,
+class GlanceImageServiceTest(test.TestCase,
BaseImageServiceTests):
"""Tests the local image service"""
def setUp(self):
+ super(GlanceImageServiceTest, self).setUp()
self.stubs = stubout.StubOutForTesting()
fakes.stub_out_glance(self.stubs)
fakes.stub_out_compute_api_snapshot(self.stubs)
@@ -163,9 +166,10 @@ class GlanceImageServiceTest(unittest.TestCase,
def tearDown(self):
self.stubs.UnsetAll()
+ super(GlanceImageServiceTest, self).tearDown()
-class ImageControllerWithGlanceServiceTest(unittest.TestCase):
+class ImageControllerWithGlanceServiceTest(test.TestCase):
"""Test of the OpenStack API /images application controller"""
@@ -194,6 +198,7 @@ class ImageControllerWithGlanceServiceTest(unittest.TestCase):
'image_type': 'ramdisk'}]
def setUp(self):
+ super(ImageControllerWithGlanceServiceTest, self).setUp()
self.orig_image_service = FLAGS.image_service
FLAGS.image_service = 'nova.image.glance.GlanceImageService'
self.stubs = stubout.StubOutForTesting()
@@ -208,6 +213,7 @@ class ImageControllerWithGlanceServiceTest(unittest.TestCase):
def tearDown(self):
self.stubs.UnsetAll()
FLAGS.image_service = self.orig_image_service
+ super(ImageControllerWithGlanceServiceTest, self).tearDown()
def test_get_image_index(self):
req = webob.Request.blank('/v1.0/images')
diff --git a/nova/tests/api/openstack/test_ratelimiting.py b/nova/tests/api/openstack/test_ratelimiting.py
index 4c9d6bc23..9ae90ee20 100644
--- a/nova/tests/api/openstack/test_ratelimiting.py
+++ b/nova/tests/api/openstack/test_ratelimiting.py
@@ -1,15 +1,16 @@
import httplib
import StringIO
import time
-import unittest
import webob
+from nova import test
import nova.api.openstack.ratelimiting as ratelimiting
-class LimiterTest(unittest.TestCase):
+class LimiterTest(test.TestCase):
def setUp(self):
+ super(LimiterTest, self).setUp()
self.limits = {
'a': (5, ratelimiting.PER_SECOND),
'b': (5, ratelimiting.PER_MINUTE),
@@ -83,9 +84,10 @@ class FakeLimiter(object):
return self._delay
-class WSGIAppTest(unittest.TestCase):
+class WSGIAppTest(test.TestCase):
def setUp(self):
+ super(WSGIAppTest, self).setUp()
self.limiter = FakeLimiter(self)
self.app = ratelimiting.WSGIApp(self.limiter)
@@ -206,7 +208,7 @@ def wire_HTTPConnection_to_WSGI(host, app):
httplib.HTTPConnection = HTTPConnectionDecorator(httplib.HTTPConnection)
-class WSGIAppProxyTest(unittest.TestCase):
+class WSGIAppProxyTest(test.TestCase):
def setUp(self):
"""Our WSGIAppProxy is going to call across an HTTPConnection to a
@@ -218,6 +220,7 @@ class WSGIAppProxyTest(unittest.TestCase):
at the WSGIApp. And the limiter isn't real -- it's a fake that
behaves the way we tell it to.
"""
+ super(WSGIAppProxyTest, self).setUp()
self.limiter = FakeLimiter(self)
app = ratelimiting.WSGIApp(self.limiter)
wire_HTTPConnection_to_WSGI('100.100.100.100:80', app)
@@ -238,7 +241,3 @@ class WSGIAppProxyTest(unittest.TestCase):
self.limiter.mock('murder', 'brutus', None)
self.proxy.perform('stab', 'brutus')
self.assertRaises(AssertionError, shouldRaise)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py
index a7be0796e..7a25abe9d 100644
--- a/nova/tests/api/openstack/test_servers.py
+++ b/nova/tests/api/openstack/test_servers.py
@@ -17,17 +17,18 @@
import datetime
import json
-import unittest
import stubout
import webob
from nova import db
from nova import flags
+from nova import test
import nova.api.openstack
from nova.api.openstack import servers
import nova.db.api
from nova.db.sqlalchemy.models import Instance
+from nova.db.sqlalchemy.models import InstanceMetadata
import nova.rpc
from nova.tests.api.openstack import fakes
@@ -64,6 +65,9 @@ def instance_address(context, instance_id):
def stub_instance(id, user_id=1, private_address=None, public_addresses=None):
+ metadata = []
+ metadata.append(InstanceMetadata(key='seq', value=id))
+
if public_addresses == None:
public_addresses = list()
@@ -84,7 +88,7 @@ def stub_instance(id, user_id=1, private_address=None, public_addresses=None):
"vcpus": 0,
"local_gb": 0,
"hostname": "",
- "host": "",
+ "host": None,
"instance_type": "",
"user_data": "",
"reservation_id": "",
@@ -95,7 +99,8 @@ def stub_instance(id, user_id=1, private_address=None, public_addresses=None):
"availability_zone": "",
"display_name": "server%s" % id,
"display_description": "",
- "locked": False}
+ "locked": False,
+ "metadata": metadata}
instance["fixed_ip"] = {
"address": private_address,
@@ -108,9 +113,10 @@ def fake_compute_api(cls, req, id):
return True
-class ServersTest(unittest.TestCase):
+class ServersTest(test.TestCase):
def setUp(self):
+ super(ServersTest, self).setUp()
self.stubs = stubout.StubOutForTesting()
fakes.FakeAuthManager.auth_data = {}
fakes.FakeAuthDatabase.data = {}
@@ -141,6 +147,7 @@ class ServersTest(unittest.TestCase):
def tearDown(self):
self.stubs.UnsetAll()
FLAGS.allow_admin_api = self.allow_admin
+ super(ServersTest, self).tearDown()
def test_get_server_by_id(self):
req = webob.Request.blank('/v1.0/servers/1')
@@ -214,7 +221,8 @@ class ServersTest(unittest.TestCase):
"get_image_id_from_image_hash", image_id_from_hash)
body = dict(server=dict(
- name='server_test', imageId=2, flavorId=2, metadata={},
+ name='server_test', imageId=2, flavorId=2,
+ metadata={'hello': 'world', 'open': 'stack'},
personality={}))
req = webob.Request.blank('/v1.0/servers')
req.method = 'POST'
@@ -291,6 +299,7 @@ class ServersTest(unittest.TestCase):
self.assertEqual(s['id'], i)
self.assertEqual(s['name'], 'server%d' % i)
self.assertEqual(s['imageId'], 10)
+ self.assertEqual(s['metadata']['seq'], i)
i += 1
def test_server_pause(self):
@@ -353,6 +362,18 @@ class ServersTest(unittest.TestCase):
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 202)
+ def test_server_inject_network_info(self):
+ FLAGS.allow_admin_api = True
+ body = dict(server=dict(
+ name='server_test', imageId=2, flavorId=2, metadata={},
+ personality={}))
+ req = webob.Request.blank('/v1.0/servers/1/inject_network_info')
+ req.method = 'POST'
+ req.content_type = 'application/json'
+ req.body = json.dumps(body)
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 202)
+
def test_server_diagnostics(self):
req = webob.Request.blank("/v1.0/servers/1/diagnostics")
req.method = "GET"
@@ -410,7 +431,3 @@ class ServersTest(unittest.TestCase):
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status, '202 Accepted')
self.assertEqual(self.server_delete_called, True)
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/nova/tests/api/openstack/test_shared_ip_groups.py b/nova/tests/api/openstack/test_shared_ip_groups.py
index c2fc3a203..b4de2ef41 100644
--- a/nova/tests/api/openstack/test_shared_ip_groups.py
+++ b/nova/tests/api/openstack/test_shared_ip_groups.py
@@ -15,19 +15,20 @@
# License for the specific language governing permissions and limitations
# under the License.
-import unittest
-
import stubout
+from nova import test
from nova.api.openstack import shared_ip_groups
-class SharedIpGroupsTest(unittest.TestCase):
+class SharedIpGroupsTest(test.TestCase):
def setUp(self):
+ super(SharedIpGroupsTest, self).setUp()
self.stubs = stubout.StubOutForTesting()
def tearDown(self):
self.stubs.UnsetAll()
+ super(SharedIpGroupsTest, self).tearDown()
def test_get_shared_ip_groups(self):
pass
diff --git a/nova/tests/api/openstack/test_zones.py b/nova/tests/api/openstack/test_zones.py
index df497ef1b..555b206b9 100644
--- a/nova/tests/api/openstack/test_zones.py
+++ b/nova/tests/api/openstack/test_zones.py
@@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import unittest
import stubout
import webob
@@ -22,6 +21,7 @@ import json
import nova.db
from nova import context
from nova import flags
+from nova import test
from nova.api.openstack import zones
from nova.tests.api.openstack import fakes
@@ -60,8 +60,9 @@ def zone_get_all(context):
password='qwerty')]
-class ZonesTest(unittest.TestCase):
+class ZonesTest(test.TestCase):
def setUp(self):
+ super(ZonesTest, self).setUp()
self.stubs = stubout.StubOutForTesting()
fakes.FakeAuthManager.auth_data = {}
fakes.FakeAuthDatabase.data = {}
@@ -81,6 +82,7 @@ class ZonesTest(unittest.TestCase):
def tearDown(self):
self.stubs.UnsetAll()
FLAGS.allow_admin_api = self.allow_admin
+ super(ZonesTest, self).tearDown()
def test_get_zone_list(self):
req = webob.Request.blank('/v1.0/zones')
@@ -134,7 +136,3 @@ class ZonesTest(unittest.TestCase):
self.assertEqual(res_dict['zone']['id'], 1)
self.assertEqual(res_dict['zone']['api_url'], 'http://foo.com')
self.assertFalse('username' in res_dict['zone'])
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/nova/tests/api/test_wsgi.py b/nova/tests/api/test_wsgi.py
index 44e2d615c..2c7852214 100644
--- a/nova/tests/api/test_wsgi.py
+++ b/nova/tests/api/test_wsgi.py
@@ -21,7 +21,7 @@
Test WSGI basics and provide some helper functions for other WSGI tests.
"""
-import unittest
+from nova import test
import routes
import webob
@@ -29,7 +29,7 @@ import webob
from nova import wsgi
-class Test(unittest.TestCase):
+class Test(test.TestCase):
def test_debug(self):
@@ -92,7 +92,7 @@ class Test(unittest.TestCase):
self.assertNotEqual(result.body, "123")
-class SerializerTest(unittest.TestCase):
+class SerializerTest(test.TestCase):
def match(self, url, accept, expect):
input_dict = dict(servers=dict(a=(2, 3)))
diff --git a/nova/tests/fake_flags.py b/nova/tests/fake_flags.py
index 1097488ec..cbd949477 100644
--- a/nova/tests/fake_flags.py
+++ b/nova/tests/fake_flags.py
@@ -29,8 +29,8 @@ FLAGS.auth_driver = 'nova.auth.dbdriver.DbDriver'
flags.DECLARE('network_size', 'nova.network.manager')
flags.DECLARE('num_networks', 'nova.network.manager')
flags.DECLARE('fake_network', 'nova.network.manager')
-FLAGS.network_size = 16
-FLAGS.num_networks = 5
+FLAGS.network_size = 8
+FLAGS.num_networks = 2
FLAGS.fake_network = True
flags.DECLARE('num_shelves', 'nova.volume.driver')
flags.DECLARE('blades_per_shelf', 'nova.volume.driver')
@@ -39,5 +39,5 @@ FLAGS.num_shelves = 2
FLAGS.blades_per_shelf = 4
FLAGS.iscsi_num_targets = 8
FLAGS.verbose = True
-FLAGS.sql_connection = 'sqlite:///nova.sqlite'
+FLAGS.sqlite_db = "tests.sqlite"
FLAGS.use_ipv6 = True
diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py
index da86e6e11..5a1be08eb 100644
--- a/nova/tests/objectstore_unittest.py
+++ b/nova/tests/objectstore_unittest.py
@@ -311,4 +311,5 @@ class S3APITestCase(test.TestCase):
self.auth_manager.delete_user('admin')
self.auth_manager.delete_project('admin')
stop_listening = defer.maybeDeferred(self.listening_port.stopListening)
+ super(S3APITestCase, self).tearDown()
return defer.DeferredList([stop_listening])
diff --git a/nova/tests/test_api.py b/nova/tests/test_api.py
index fa27825cd..d5c54a1c3 100644
--- a/nova/tests/test_api.py
+++ b/nova/tests/test_api.py
@@ -20,6 +20,7 @@
import boto
from boto.ec2 import regioninfo
+import datetime
import httplib
import random
import StringIO
@@ -127,6 +128,28 @@ class ApiEc2TestCase(test.TestCase):
self.ec2.new_http_connection(host, is_secure).AndReturn(self.http)
return self.http
+ def test_return_valid_isoformat(self):
+ """
+ Ensure that the ec2 api returns datetime in xs:dateTime
+ (which apparently isn't datetime.isoformat())
+ NOTE(ken-pepple): https://bugs.launchpad.net/nova/+bug/721297
+ """
+ conv = apirequest._database_to_isoformat
+ # sqlite database representation with microseconds
+ time_to_convert = datetime.datetime.strptime(
+ "2011-02-21 20:14:10.634276",
+ "%Y-%m-%d %H:%M:%S.%f")
+ self.assertEqual(
+ conv(time_to_convert),
+ '2011-02-21T20:14:10Z')
+ # mysqlite database representation
+ time_to_convert = datetime.datetime.strptime(
+ "2011-02-21 19:56:18",
+ "%Y-%m-%d %H:%M:%S")
+ self.assertEqual(
+ conv(time_to_convert),
+ '2011-02-21T19:56:18Z')
+
def test_xmlns_version_matches_request_version(self):
self.expect_http(api_version='2010-10-30')
self.mox.ReplayAll()
diff --git a/nova/tests/test_auth.py b/nova/tests/test_auth.py
index 35ffffb67..2a7817032 100644
--- a/nova/tests/test_auth.py
+++ b/nova/tests/test_auth.py
@@ -327,15 +327,6 @@ class AuthManagerTestCase(object):
class AuthManagerLdapTestCase(AuthManagerTestCase, test.TestCase):
auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver'
- def __init__(self, *args, **kwargs):
- AuthManagerTestCase.__init__(self)
- test.TestCase.__init__(self, *args, **kwargs)
- import nova.auth.fakeldap as fakeldap
- if FLAGS.flush_db:
- LOG.info("Flushing datastore")
- r = fakeldap.Store.instance()
- r.flushdb()
-
class AuthManagerDbTestCase(AuthManagerTestCase, test.TestCase):
auth_driver = 'nova.auth.dbdriver.DbDriver'
diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py
index 445cc6e8b..061910013 100644
--- a/nova/tests/test_cloud.py
+++ b/nova/tests/test_cloud.py
@@ -65,18 +65,21 @@ class CloudTestCase(test.TestCase):
self.cloud = cloud.CloudController()
# set up services
- self.compute = service.Service.create(binary='nova-compute')
- self.compute.start()
- self.network = service.Service.create(binary='nova-network')
- self.network.start()
+ self.compute = self.start_service('compute')
+ self.scheduter = self.start_service('scheduler')
+ self.network = self.start_service('network')
self.manager = manager.AuthManager()
self.user = self.manager.create_user('admin', 'admin', 'admin', True)
self.project = self.manager.create_project('proj', 'admin', 'proj')
self.context = context.RequestContext(user=self.user,
project=self.project)
+ host = self.network.get_network_host(self.context.elevated())
def tearDown(self):
+ network_ref = db.project_get_network(self.context,
+ self.project.id)
+ db.network_disassociate(self.context, network_ref['id'])
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
self.compute.kill()
@@ -102,7 +105,7 @@ class CloudTestCase(test.TestCase):
address = "10.10.10.10"
db.floating_ip_create(self.context,
{'address': address,
- 'host': FLAGS.host})
+ 'host': self.network.host})
self.cloud.allocate_address(self.context)
self.cloud.describe_addresses(self.context)
self.cloud.release_address(self.context,
@@ -115,9 +118,9 @@ class CloudTestCase(test.TestCase):
address = "10.10.10.10"
db.floating_ip_create(self.context,
{'address': address,
- 'host': FLAGS.host})
+ 'host': self.network.host})
self.cloud.allocate_address(self.context)
- inst = db.instance_create(self.context, {'host': FLAGS.host})
+ inst = db.instance_create(self.context, {'host': self.compute.host})
fixed = self.network.allocate_fixed_ip(self.context, inst['id'])
ec2_id = cloud.id_to_ec2_id(inst['id'])
self.cloud.associate_address(self.context,
@@ -133,6 +136,22 @@ class CloudTestCase(test.TestCase):
db.instance_destroy(self.context, inst['id'])
db.floating_ip_destroy(self.context, address)
+ def test_describe_security_groups(self):
+ """Makes sure describe_security_groups works and filters results."""
+ sec = db.security_group_create(self.context,
+ {'project_id': self.context.project_id,
+ 'name': 'test'})
+ result = self.cloud.describe_security_groups(self.context)
+ # NOTE(vish): should have the default group as well
+ self.assertEqual(len(result['securityGroupInfo']), 2)
+ result = self.cloud.describe_security_groups(self.context,
+ group_name=[sec['name']])
+ self.assertEqual(len(result['securityGroupInfo']), 1)
+ self.assertEqual(
+ result['securityGroupInfo'][0]['groupName'],
+ sec['name'])
+ db.security_group_destroy(self.context, sec['id'])
+
def test_describe_volumes(self):
"""Makes sure describe_volumes works and filters results."""
vol1 = db.volume_create(self.context, {})
@@ -203,27 +222,32 @@ class CloudTestCase(test.TestCase):
'instance_type': instance_type,
'max_count': max_count}
rv = self.cloud.run_instances(self.context, **kwargs)
+ greenthread.sleep(0.3)
instance_id = rv['instancesSet'][0]['instanceId']
output = self.cloud.get_console_output(context=self.context,
- instance_id=[instance_id])
+ instance_id=[instance_id])
self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE OUTPUT')
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
greenthread.sleep(0.3)
rv = self.cloud.terminate_instances(self.context, [instance_id])
+ greenthread.sleep(0.3)
def test_ajax_console(self):
+ image_id = FLAGS.default_image
kwargs = {'image_id': image_id}
- rv = yield self.cloud.run_instances(self.context, **kwargs)
+ rv = self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId']
- output = yield self.cloud.get_console_output(context=self.context,
- instance_id=[instance_id])
- self.assertEquals(b64decode(output['output']),
- 'http://fakeajaxconsole.com/?token=FAKETOKEN')
+ greenthread.sleep(0.3)
+ output = self.cloud.get_ajax_console(context=self.context,
+ instance_id=[instance_id])
+ self.assertEquals(output['url'],
+ '%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url)
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
greenthread.sleep(0.3)
- rv = yield self.cloud.terminate_instances(self.context, [instance_id])
+ rv = self.cloud.terminate_instances(self.context, [instance_id])
+ greenthread.sleep(0.3)
def test_key_generation(self):
result = self._create_key('test')
@@ -286,70 +310,6 @@ class CloudTestCase(test.TestCase):
LOG.debug(_("Terminating instance %s"), instance_id)
rv = self.compute.terminate_instance(instance_id)
- def test_describe_instances(self):
- """Makes sure describe_instances works."""
- instance1 = db.instance_create(self.context, {'host': 'host2'})
- comp1 = db.service_create(self.context, {'host': 'host2',
- 'availability_zone': 'zone1',
- 'topic': "compute"})
- result = self.cloud.describe_instances(self.context)
- self.assertEqual(result['reservationSet'][0]
- ['instancesSet'][0]
- ['placement']['availabilityZone'], 'zone1')
- db.instance_destroy(self.context, instance1['id'])
- db.service_destroy(self.context, comp1['id'])
-
- def test_instance_update_state(self):
- # TODO(termie): what is this code even testing?
- def instance(num):
- return {
- 'reservation_id': 'r-1',
- 'instance_id': 'i-%s' % num,
- 'image_id': 'ami-%s' % num,
- 'private_dns_name': '10.0.0.%s' % num,
- 'dns_name': '10.0.0%s' % num,
- 'ami_launch_index': str(num),
- 'instance_type': 'fake',
- 'availability_zone': 'fake',
- 'key_name': None,
- 'kernel_id': 'fake',
- 'ramdisk_id': 'fake',
- 'groups': ['default'],
- 'product_codes': None,
- 'state': 0x01,
- 'user_data': ''}
- rv = self.cloud._format_describe_instances(self.context)
- logging.error(str(rv))
- self.assertEqual(len(rv['reservationSet']), 0)
-
- # simulate launch of 5 instances
- # self.cloud.instances['pending'] = {}
- #for i in xrange(5):
- # inst = instance(i)
- # self.cloud.instances['pending'][inst['instance_id']] = inst
-
- #rv = self.cloud._format_instances(self.admin)
- #self.assert_(len(rv['reservationSet']) == 1)
- #self.assert_(len(rv['reservationSet'][0]['instances_set']) == 5)
- # report 4 nodes each having 1 of the instances
- #for i in xrange(4):
- # self.cloud.update_state('instances',
- # {('node-%s' % i): {('i-%s' % i):
- # instance(i)}})
-
- # one instance should be pending still
- #self.assert_(len(self.cloud.instances['pending'].keys()) == 1)
-
- # check that the reservations collapse
- #rv = self.cloud._format_instances(self.admin)
- #self.assert_(len(rv['reservationSet']) == 1)
- #self.assert_(len(rv['reservationSet'][0]['instances_set']) == 5)
-
- # check that we can get metadata for each instance
- #for i in xrange(4):
- # data = self.cloud.get_metadata(instance(i)['private_dns_name'])
- # self.assert_(data['meta-data']['ami-id'] == 'ami-%s' % i)
-
@staticmethod
def _fake_set_image_description(ctxt, image_id, description):
from nova.objectstore import handler
diff --git a/nova/tests/test_console.py b/nova/tests/test_console.py
index 85bf94458..49ff24413 100644
--- a/nova/tests/test_console.py
+++ b/nova/tests/test_console.py
@@ -21,7 +21,6 @@ Tests For Console proxy.
"""
import datetime
-import logging
from nova import context
from nova import db
@@ -38,7 +37,6 @@ FLAGS = flags.FLAGS
class ConsoleTestCase(test.TestCase):
"""Test case for console proxy"""
def setUp(self):
- logging.getLogger().setLevel(logging.DEBUG)
super(ConsoleTestCase, self).setUp()
self.flags(console_driver='nova.console.fake.FakeConsoleProxy',
stub_compute=True)
diff --git a/nova/tests/test_direct.py b/nova/tests/test_direct.py
index 8a74b2296..b6bfab534 100644
--- a/nova/tests/test_direct.py
+++ b/nova/tests/test_direct.py
@@ -19,7 +19,6 @@
"""Tests for Direct API."""
import json
-import logging
import webob
@@ -53,6 +52,7 @@ class DirectTestCase(test.TestCase):
def tearDown(self):
direct.ROUTES = {}
+ super(DirectTestCase, self).tearDown()
def test_delegated_auth(self):
req = webob.Request.blank('/fake/context')
diff --git a/nova/tests/test_localization.py b/nova/tests/test_localization.py
index 6992773f5..393d71038 100644
--- a/nova/tests/test_localization.py
+++ b/nova/tests/test_localization.py
@@ -15,7 +15,6 @@
# under the License.
import glob
-import logging
import os
import re
import sys
diff --git a/nova/tests/test_log.py b/nova/tests/test_log.py
index c2c9d7772..122351ff6 100644
--- a/nova/tests/test_log.py
+++ b/nova/tests/test_log.py
@@ -1,9 +1,12 @@
import cStringIO
from nova import context
+from nova import flags
from nova import log
from nova import test
+FLAGS = flags.FLAGS
+
def _fake_context():
return context.RequestContext(1, 1)
@@ -14,15 +17,11 @@ class RootLoggerTestCase(test.TestCase):
super(RootLoggerTestCase, self).setUp()
self.log = log.logging.root
- def tearDown(self):
- super(RootLoggerTestCase, self).tearDown()
- log.NovaLogger.manager.loggerDict = {}
-
def test_is_nova_instance(self):
self.assert_(isinstance(self.log, log.NovaLogger))
- def test_name_is_nova_root(self):
- self.assertEqual("nova.root", self.log.name)
+ def test_name_is_nova(self):
+ self.assertEqual("nova", self.log.name)
def test_handlers_have_nova_formatter(self):
formatters = []
@@ -45,25 +44,36 @@ class RootLoggerTestCase(test.TestCase):
log.audit("foo", context=_fake_context())
self.assert_(True) # didn't raise exception
+ def test_will_be_verbose_if_verbose_flag_set(self):
+ self.flags(verbose=True)
+ log.reset()
+ self.assertEqual(log.DEBUG, self.log.level)
+
+ def test_will_not_be_verbose_if_verbose_flag_not_set(self):
+ self.flags(verbose=False)
+ log.reset()
+ self.assertEqual(log.INFO, self.log.level)
+
class LogHandlerTestCase(test.TestCase):
def test_log_path_logdir(self):
- self.flags(logdir='/some/path')
- self.assertEquals(log.get_log_file_path(binary='foo-bar'),
+ self.flags(logdir='/some/path', logfile=None)
+ self.assertEquals(log._get_log_file_path(binary='foo-bar'),
'/some/path/foo-bar.log')
def test_log_path_logfile(self):
self.flags(logfile='/some/path/foo-bar.log')
- self.assertEquals(log.get_log_file_path(binary='foo-bar'),
+ self.assertEquals(log._get_log_file_path(binary='foo-bar'),
'/some/path/foo-bar.log')
def test_log_path_none(self):
- self.assertTrue(log.get_log_file_path(binary='foo-bar') is None)
+ self.flags(logdir=None, logfile=None)
+ self.assertTrue(log._get_log_file_path(binary='foo-bar') is None)
def test_log_path_logfile_overrides_logdir(self):
self.flags(logdir='/some/other/path',
logfile='/some/path/foo-bar.log')
- self.assertEquals(log.get_log_file_path(binary='foo-bar'),
+ self.assertEquals(log._get_log_file_path(binary='foo-bar'),
'/some/path/foo-bar.log')
@@ -76,13 +86,15 @@ class NovaFormatterTestCase(test.TestCase):
logging_debug_format_suffix="--DBG")
self.log = log.logging.root
self.stream = cStringIO.StringIO()
- handler = log.StreamHandler(self.stream)
- self.log.addHandler(handler)
+ self.handler = log.StreamHandler(self.stream)
+ self.log.addHandler(self.handler)
+ self.level = self.log.level
self.log.setLevel(log.DEBUG)
def tearDown(self):
+ self.log.setLevel(self.level)
+ self.log.removeHandler(self.handler)
super(NovaFormatterTestCase, self).tearDown()
- log.NovaLogger.manager.loggerDict = {}
def test_uncontextualized_log(self):
self.log.info("foo")
@@ -102,30 +114,15 @@ class NovaFormatterTestCase(test.TestCase):
class NovaLoggerTestCase(test.TestCase):
def setUp(self):
super(NovaLoggerTestCase, self).setUp()
- self.flags(default_log_levels=["nova-test=AUDIT"], verbose=False)
+ levels = FLAGS.default_log_levels
+ levels.append("nova-test=AUDIT")
+ self.flags(default_log_levels=levels,
+ verbose=True)
self.log = log.getLogger('nova-test')
- def tearDown(self):
- super(NovaLoggerTestCase, self).tearDown()
- log.NovaLogger.manager.loggerDict = {}
-
def test_has_level_from_flags(self):
self.assertEqual(log.AUDIT, self.log.level)
def test_child_log_has_level_of_parent_flag(self):
l = log.getLogger('nova-test.foo')
self.assertEqual(log.AUDIT, l.level)
-
-
-class VerboseLoggerTestCase(test.TestCase):
- def setUp(self):
- super(VerboseLoggerTestCase, self).setUp()
- self.flags(default_log_levels=["nova.test=AUDIT"], verbose=True)
- self.log = log.getLogger('nova.test')
-
- def tearDown(self):
- super(VerboseLoggerTestCase, self).tearDown()
- log.NovaLogger.manager.loggerDict = {}
-
- def test_will_be_verbose_if_named_nova_and_verbose_flag_set(self):
- self.assertEqual(log.DEBUG, self.log.level)
diff --git a/nova/tests/test_network.py b/nova/tests/test_network.py
index 00f9323f3..ce1c77210 100644
--- a/nova/tests/test_network.py
+++ b/nova/tests/test_network.py
@@ -42,15 +42,13 @@ class NetworkTestCase(test.TestCase):
# flags in the corresponding section in nova-dhcpbridge
self.flags(connection_type='fake',
fake_call=True,
- fake_network=True,
- network_size=16,
- num_networks=5)
+ fake_network=True)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('netuser', 'netuser', 'netuser')
self.projects = []
self.network = utils.import_object(FLAGS.network_manager)
self.context = context.RequestContext(project=None, user=self.user)
- for i in range(5):
+ for i in range(FLAGS.num_networks):
name = 'project%s' % i
project = self.manager.create_project(name, 'netuser', name)
self.projects.append(project)
@@ -117,6 +115,9 @@ class NetworkTestCase(test.TestCase):
utils.to_global_ipv6(
network_ref['cidr_v6'],
instance_ref['mac_address']))
+ self._deallocate_address(0, address)
+ db.instance_destroy(context.get_admin_context(),
+ instance_ref['id'])
def test_public_network_association(self):
"""Makes sure that we can allocaate a public ip"""
@@ -192,7 +193,7 @@ class NetworkTestCase(test.TestCase):
first = self._create_address(0)
lease_ip(first)
instance_ids = []
- for i in range(1, 5):
+ for i in range(1, FLAGS.num_networks):
instance_ref = self._create_instance(i, mac=utils.generate_mac())
instance_ids.append(instance_ref['id'])
address = self._create_address(i, instance_ref['id'])
diff --git a/nova/tests/test_quota.py b/nova/tests/test_quota.py
index 9548a8c13..1e42fddf3 100644
--- a/nova/tests/test_quota.py
+++ b/nova/tests/test_quota.py
@@ -16,6 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+from nova import compute
from nova import context
from nova import db
from nova import flags
@@ -87,6 +88,18 @@ class QuotaTestCase(test.TestCase):
num_instances = quota.allowed_instances(self.context, 100,
instance_types.INSTANCE_TYPES['m1.small'])
self.assertEqual(num_instances, 10)
+
+ # metadata_items
+ too_many_items = FLAGS.quota_metadata_items + 1000
+ num_metadata_items = quota.allowed_metadata_items(self.context,
+ too_many_items)
+ self.assertEqual(num_metadata_items, FLAGS.quota_metadata_items)
+ db.quota_update(self.context, self.project.id, {'metadata_items': 5})
+ num_metadata_items = quota.allowed_metadata_items(self.context,
+ too_many_items)
+ self.assertEqual(num_metadata_items, 5)
+
+ # Cleanup
db.quota_destroy(self.context, self.project.id)
def test_too_many_instances(self):
@@ -151,3 +164,15 @@ class QuotaTestCase(test.TestCase):
self.assertRaises(quota.QuotaError, self.cloud.allocate_address,
self.context)
db.floating_ip_destroy(context.get_admin_context(), address)
+
+ def test_too_many_metadata_items(self):
+ metadata = {}
+ for i in range(FLAGS.quota_metadata_items + 1):
+ metadata['key%s' % i] = 'value%s' % i
+ self.assertRaises(quota.QuotaError, compute.API().create,
+ self.context,
+ min_count=1,
+ max_count=1,
+ instance_type='m1.small',
+ image_id='fake',
+ metadata=metadata)
diff --git a/nova/tests/test_scheduler.py b/nova/tests/test_scheduler.py
index 71e524bca..c4e4d148e 100644
--- a/nova/tests/test_scheduler.py
+++ b/nova/tests/test_scheduler.py
@@ -249,6 +249,7 @@ class SimpleDriverTestCase(test.TestCase):
def tearDown(self):
self.manager.delete_user(self.user)
self.manager.delete_project(self.project)
+ super(SimpleDriverTestCase, self).tearDown()
def _create_instance(self, **kwargs):
"""Create a test instance"""
@@ -486,18 +487,8 @@ class SimpleDriverTestCase(test.TestCase):
def test_doesnt_report_disabled_hosts_as_up(self):
"""Ensures driver doesn't find hosts before they are enabled"""
- # NOTE(vish): constructing service without create method
- # because we are going to use it without queue
- compute1 = service.Service('host1',
- 'nova-compute',
- 'compute',
- FLAGS.compute_manager)
- compute1.start()
- compute2 = service.Service('host2',
- 'nova-compute',
- 'compute',
- FLAGS.compute_manager)
- compute2.start()
+ compute1 = self.start_service('compute', host='host1')
+ compute2 = self.start_service('compute', host='host2')
s1 = db.service_get_by_args(self.context, 'host1', 'nova-compute')
s2 = db.service_get_by_args(self.context, 'host2', 'nova-compute')
db.service_update(self.context, s1['id'], {'disabled': True})
@@ -509,18 +500,8 @@ class SimpleDriverTestCase(test.TestCase):
def test_reports_enabled_hosts_as_up(self):
"""Ensures driver can find the hosts that are up"""
- # NOTE(vish): constructing service without create method
- # because we are going to use it without queue
- compute1 = service.Service('host1',
- 'nova-compute',
- 'compute',
- FLAGS.compute_manager)
- compute1.start()
- compute2 = service.Service('host2',
- 'nova-compute',
- 'compute',
- FLAGS.compute_manager)
- compute2.start()
+ compute1 = self.start_service('compute', host='host1')
+ compute2 = self.start_service('compute', host='host2')
hosts = self.scheduler.driver.hosts_up(self.context, 'compute')
self.assertEqual(2, len(hosts))
compute1.kill()
@@ -528,16 +509,8 @@ class SimpleDriverTestCase(test.TestCase):
def test_least_busy_host_gets_instance(self):
"""Ensures the host with less cores gets the next one"""
- compute1 = service.Service('host1',
- 'nova-compute',
- 'compute',
- FLAGS.compute_manager)
- compute1.start()
- compute2 = service.Service('host2',
- 'nova-compute',
- 'compute',
- FLAGS.compute_manager)
- compute2.start()
+ compute1 = self.start_service('compute', host='host1')
+ compute2 = self.start_service('compute', host='host2')
instance_id1 = self._create_instance()
compute1.run_instance(self.context, instance_id1)
instance_id2 = self._create_instance()
@@ -551,16 +524,8 @@ class SimpleDriverTestCase(test.TestCase):
def test_specific_host_gets_instance(self):
"""Ensures if you set availability_zone it launches on that zone"""
- compute1 = service.Service('host1',
- 'nova-compute',
- 'compute',
- FLAGS.compute_manager)
- compute1.start()
- compute2 = service.Service('host2',
- 'nova-compute',
- 'compute',
- FLAGS.compute_manager)
- compute2.start()
+ compute1 = self.start_service('compute', host='host1')
+ compute2 = self.start_service('compute', host='host2')
instance_id1 = self._create_instance()
compute1.run_instance(self.context, instance_id1)
instance_id2 = self._create_instance(availability_zone='nova:host1')
@@ -573,11 +538,7 @@ class SimpleDriverTestCase(test.TestCase):
compute2.kill()
def test_wont_sechedule_if_specified_host_is_down(self):
- compute1 = service.Service('host1',
- 'nova-compute',
- 'compute',
- FLAGS.compute_manager)
- compute1.start()
+ compute1 = self.start_service('compute', host='host1')
s1 = db.service_get_by_args(self.context, 'host1', 'nova-compute')
now = datetime.datetime.utcnow()
delta = datetime.timedelta(seconds=FLAGS.service_down_time * 2)
@@ -592,11 +553,7 @@ class SimpleDriverTestCase(test.TestCase):
compute1.kill()
def test_will_schedule_on_disabled_host_if_specified(self):
- compute1 = service.Service('host1',
- 'nova-compute',
- 'compute',
- FLAGS.compute_manager)
- compute1.start()
+ compute1 = self.start_service('compute', host='host1')
s1 = db.service_get_by_args(self.context, 'host1', 'nova-compute')
db.service_update(self.context, s1['id'], {'disabled': True})
instance_id2 = self._create_instance(availability_zone='nova:host1')
@@ -608,16 +565,8 @@ class SimpleDriverTestCase(test.TestCase):
def test_too_many_cores(self):
"""Ensures we don't go over max cores"""
- compute1 = service.Service('host1',
- 'nova-compute',
- 'compute',
- FLAGS.compute_manager)
- compute1.start()
- compute2 = service.Service('host2',
- 'nova-compute',
- 'compute',
- FLAGS.compute_manager)
- compute2.start()
+ compute1 = self.start_service('compute', host='host1')
+ compute2 = self.start_service('compute', host='host2')
instance_ids1 = []
instance_ids2 = []
for index in xrange(FLAGS.max_cores):
@@ -632,6 +581,7 @@ class SimpleDriverTestCase(test.TestCase):
self.scheduler.driver.schedule_run_instance,
self.context,
instance_id)
+ db.instance_destroy(self.context, instance_id)
for instance_id in instance_ids1:
compute1.terminate_instance(self.context, instance_id)
for instance_id in instance_ids2:
@@ -641,16 +591,8 @@ class SimpleDriverTestCase(test.TestCase):
def test_least_busy_host_gets_volume(self):
"""Ensures the host with less gigabytes gets the next one"""
- volume1 = service.Service('host1',
- 'nova-volume',
- 'volume',
- FLAGS.volume_manager)
- volume1.start()
- volume2 = service.Service('host2',
- 'nova-volume',
- 'volume',
- FLAGS.volume_manager)
- volume2.start()
+ volume1 = self.start_service('volume', host='host1')
+ volume2 = self.start_service('volume', host='host2')
volume_id1 = self._create_volume()
volume1.create_volume(self.context, volume_id1)
volume_id2 = self._create_volume()
@@ -664,16 +606,8 @@ class SimpleDriverTestCase(test.TestCase):
def test_too_many_gigabytes(self):
"""Ensures we don't go over max gigabytes"""
- volume1 = service.Service('host1',
- 'nova-volume',
- 'volume',
- FLAGS.volume_manager)
- volume1.start()
- volume2 = service.Service('host2',
- 'nova-volume',
- 'volume',
- FLAGS.volume_manager)
- volume2.start()
+ volume1 = self.start_service('volume', host='host1')
+ volume2 = self.start_service('volume', host='host2')
volume_ids1 = []
volume_ids2 = []
for index in xrange(FLAGS.max_gigabytes):
@@ -893,7 +827,7 @@ class SimpleDriverTestCase(test.TestCase):
{"method": 'mktmpfile'}).AndReturn(fpath)
driver.rpc.call(mox.IgnoreArg(),
db.queue_get_for(mox.IgnoreArg(), topic, i_ref['host']),
- {"method": 'confirm_tmpfile', "args": {'path': fpath}})
+ {"method": 'confirm_tmpfile', "args": {'filename': fpath}})
self.mox.ReplayAll()
try:
diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py
index bbd5c6d92..d17f6a22a 100644
--- a/nova/tests/test_service.py
+++ b/nova/tests/test_service.py
@@ -69,13 +69,6 @@ class ExtendedService(service.Service):
class ServiceManagerTestCase(test.TestCase):
"""Test cases for Services"""
- def test_attribute_error_for_no_manager(self):
- serv = service.Service('test',
- 'test',
- 'test',
- 'nova.tests.test_service.FakeManager')
- self.assertRaises(AttributeError, getattr, serv, 'test_method')
-
def test_message_gets_to_manager(self):
serv = service.Service('test',
'test',
diff --git a/nova/tests/test_test.py b/nova/tests/test_test.py
new file mode 100644
index 000000000..e237674e6
--- /dev/null
+++ b/nova/tests/test_test.py
@@ -0,0 +1,40 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Tests for the testing base code."""
+
+from nova import rpc
+from nova import test
+
+
+class IsolationTestCase(test.TestCase):
+ """Ensure that things are cleaned up after failed tests.
+
+ These tests don't really do much here, but if isolation fails a bunch
+ of other tests should fail.
+
+ """
+ def test_service_isolation(self):
+ self.start_service('compute')
+
+ def test_rpc_consumer_isolation(self):
+ connection = rpc.Connection.instance(new=True)
+ consumer = rpc.TopicConsumer(connection, topic='compute')
+ consumer.register_callback(
+ lambda x, y: self.fail('I should never be called'))
+ consumer.attach_to_eventlet()
diff --git a/nova/tests/test_utils.py b/nova/tests/test_utils.py
new file mode 100644
index 000000000..34a407f1a
--- /dev/null
+++ b/nova/tests/test_utils.py
@@ -0,0 +1,174 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Justin Santa Barbara
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from nova import test
+from nova import utils
+from nova import exception
+
+
+class GetFromPathTestCase(test.TestCase):
+ def test_tolerates_nones(self):
+ f = utils.get_from_path
+
+ input = []
+ self.assertEquals([], f(input, "a"))
+ self.assertEquals([], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = [None]
+ self.assertEquals([], f(input, "a"))
+ self.assertEquals([], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = [{'a': None}]
+ self.assertEquals([], f(input, "a"))
+ self.assertEquals([], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = [{'a': {'b': None}}]
+ self.assertEquals([{'b': None}], f(input, "a"))
+ self.assertEquals([], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = [{'a': {'b': {'c': None}}}]
+ self.assertEquals([{'b': {'c': None}}], f(input, "a"))
+ self.assertEquals([{'c': None}], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = [{'a': {'b': {'c': None}}}, {'a': None}]
+ self.assertEquals([{'b': {'c': None}}], f(input, "a"))
+ self.assertEquals([{'c': None}], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = [{'a': {'b': {'c': None}}}, {'a': {'b': None}}]
+ self.assertEquals([{'b': {'c': None}}, {'b': None}], f(input, "a"))
+ self.assertEquals([{'c': None}], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ def test_does_select(self):
+ f = utils.get_from_path
+
+ input = [{'a': 'a_1'}]
+ self.assertEquals(['a_1'], f(input, "a"))
+ self.assertEquals([], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = [{'a': {'b': 'b_1'}}]
+ self.assertEquals([{'b': 'b_1'}], f(input, "a"))
+ self.assertEquals(['b_1'], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = [{'a': {'b': {'c': 'c_1'}}}]
+ self.assertEquals([{'b': {'c': 'c_1'}}], f(input, "a"))
+ self.assertEquals([{'c': 'c_1'}], f(input, "a/b"))
+ self.assertEquals(['c_1'], f(input, "a/b/c"))
+
+ input = [{'a': {'b': {'c': 'c_1'}}}, {'a': None}]
+ self.assertEquals([{'b': {'c': 'c_1'}}], f(input, "a"))
+ self.assertEquals([{'c': 'c_1'}], f(input, "a/b"))
+ self.assertEquals(['c_1'], f(input, "a/b/c"))
+
+ input = [{'a': {'b': {'c': 'c_1'}}},
+ {'a': {'b': None}}]
+ self.assertEquals([{'b': {'c': 'c_1'}}, {'b': None}], f(input, "a"))
+ self.assertEquals([{'c': 'c_1'}], f(input, "a/b"))
+ self.assertEquals(['c_1'], f(input, "a/b/c"))
+
+ input = [{'a': {'b': {'c': 'c_1'}}},
+ {'a': {'b': {'c': 'c_2'}}}]
+ self.assertEquals([{'b': {'c': 'c_1'}}, {'b': {'c': 'c_2'}}],
+ f(input, "a"))
+ self.assertEquals([{'c': 'c_1'}, {'c': 'c_2'}], f(input, "a/b"))
+ self.assertEquals(['c_1', 'c_2'], f(input, "a/b/c"))
+
+ self.assertEquals([], f(input, "a/b/c/d"))
+ self.assertEquals([], f(input, "c/a/b/d"))
+ self.assertEquals([], f(input, "i/r/t"))
+
+ def test_flattens_lists(self):
+ f = utils.get_from_path
+
+ input = [{'a': [1, 2, 3]}]
+ self.assertEquals([1, 2, 3], f(input, "a"))
+ self.assertEquals([], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = [{'a': {'b': [1, 2, 3]}}]
+ self.assertEquals([{'b': [1, 2, 3]}], f(input, "a"))
+ self.assertEquals([1, 2, 3], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = [{'a': {'b': [1, 2, 3]}}, {'a': {'b': [4, 5, 6]}}]
+ self.assertEquals([1, 2, 3, 4, 5, 6], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = [{'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}]
+ self.assertEquals([1, 2, 3, 4, 5, 6], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = [{'a': [1, 2, {'b': 'b_1'}]}]
+ self.assertEquals([1, 2, {'b': 'b_1'}], f(input, "a"))
+ self.assertEquals(['b_1'], f(input, "a/b"))
+
+ def test_bad_xpath(self):
+ f = utils.get_from_path
+
+ self.assertRaises(exception.Error, f, [], None)
+ self.assertRaises(exception.Error, f, [], "")
+ self.assertRaises(exception.Error, f, [], "/")
+ self.assertRaises(exception.Error, f, [], "/a")
+ self.assertRaises(exception.Error, f, [], "/a/")
+ self.assertRaises(exception.Error, f, [], "//")
+ self.assertRaises(exception.Error, f, [], "//a")
+ self.assertRaises(exception.Error, f, [], "a//a")
+ self.assertRaises(exception.Error, f, [], "a//a/")
+ self.assertRaises(exception.Error, f, [], "a/a/")
+
+ def test_real_failure1(self):
+ # Real world failure case...
+ # We weren't coping when the input was a Dictionary instead of a List
+ # This led to test_accepts_dictionaries
+ f = utils.get_from_path
+
+ inst = {'fixed_ip': {'floating_ips': [{'address': '1.2.3.4'}],
+ 'address': '192.168.0.3'},
+ 'hostname': ''}
+
+ private_ips = f(inst, 'fixed_ip/address')
+ public_ips = f(inst, 'fixed_ip/floating_ips/address')
+ self.assertEquals(['192.168.0.3'], private_ips)
+ self.assertEquals(['1.2.3.4'], public_ips)
+
+ def test_accepts_dictionaries(self):
+ f = utils.get_from_path
+
+ input = {'a': [1, 2, 3]}
+ self.assertEquals([1, 2, 3], f(input, "a"))
+ self.assertEquals([], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = {'a': {'b': [1, 2, 3]}}
+ self.assertEquals([{'b': [1, 2, 3]}], f(input, "a"))
+ self.assertEquals([1, 2, 3], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = {'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}
+ self.assertEquals([1, 2, 3, 4, 5, 6], f(input, "a/b"))
+ self.assertEquals([], f(input, "a/b/c"))
+
+ input = {'a': [1, 2, {'b': 'b_1'}]}
+ self.assertEquals([1, 2, {'b': 'b_1'}], f(input, "a"))
+ self.assertEquals(['b_1'], f(input, "a/b"))
diff --git a/nova/tests/test_virt.py b/nova/tests/test_virt.py
index 91bdfcc5a..f46b5950e 100644
--- a/nova/tests/test_virt.py
+++ b/nova/tests/test_virt.py
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import libvirt
import mox
from xml.etree.ElementTree import fromstring as xml_to_tree
@@ -23,21 +24,16 @@ from nova import context
from nova import db
from nova import exception
from nova import flags
-from nova import logging
from nova import test
from nova import utils
from nova.api.ec2 import cloud
from nova.auth import manager
from nova.db.sqlalchemy import models
-from nova.compute import power_state
from nova.virt import libvirt_conn
FLAGS = flags.FLAGS
flags.DECLARE('instances_path', 'nova.compute.manager')
-libvirt = None
-libxml2 = None
-
class LibvirtConnTestCase(test.TestCase):
def setUp(self):
@@ -65,6 +61,7 @@ class LibvirtConnTestCase(test.TestCase):
self.project = self.manager.create_project('fake', 'fake', 'fake')
self.network = utils.import_object(FLAGS.network_manager)
FLAGS.instances_path = ''
+ self.call_libvirt_dependant_setup = False
test_ip = '10.11.12.13'
test_instance = {'memory_kb': '1024000',
@@ -76,32 +73,22 @@ class LibvirtConnTestCase(test.TestCase):
'bridge': 'br101',
'instance_type': 'm1.small'}
- def _driver_dependant_test_setup(self):
- """Call this method at the top of each testcase method.
-
- Checks if libvirt and cheetah, etc is installed.
- Otherwise, skip testing."""
-
+ def libvirt_dependant_setup(self):
+ """A setup method of LibvirtConnection dependent test."""
+ # try to connect libvirt. if fail, skip test.
+ self.call_libvirt_dependant_setup = True
try:
- global libvirt
- global libxml2
- libvirt_conn.libvirt = __import__('libvirt')
- libvirt_conn.libxml2 = __import__('libxml2')
- libvirt_conn._late_load_cheetah()
- libvirt = __import__('libvirt')
- except ImportError, e:
- logging.warn("""This test has not been done since """
- """using driver-dependent library Cheetah/libvirt/libxml2.""")
- raise
-
- # inebitable mocks for calling
- obj = utils.import_object(FLAGS.firewall_driver)
- fwmock = self.mox.CreateMock(obj)
- self.mox.StubOutWithMock(libvirt_conn, 'utils',
- use_mock_anything=True)
- libvirt_conn.utils.import_object(FLAGS.firewall_driver).\
- AndReturn(fwmock)
- return fwmock
+ libvirt.openReadOnly('qemu:///system')
+ except libvirt.libvirtError:
+ return
+ return libvirt_conn.get_connection(False)
+
+ def libvirt_dependant_teardown(self):
+ """teardown method of LibvirtConnection dependent test."""
+ if self.call_libvirt_dependant_setup:
+ libvirt_conn.libvirt = None
+ libvirt_conn.libxml2 = None
+ self.call_libvirt_dependant_setup = False
def test_xml_and_uri_no_ramdisk_no_kernel(self):
instance_data = dict(self.test_instance)
@@ -255,102 +242,30 @@ class LibvirtConnTestCase(test.TestCase):
conn = libvirt_conn.LibvirtConnection(True)
uri = conn.get_uri()
self.assertEquals(uri, testuri)
-
- def test_get_vcpu_total(self):
- """Check if get_vcpu_total returns appropriate cpu value."""
- try:
- self._driver_dependant_test_setup()
- except:
- return
-
- self.mox.ReplayAll()
- conn = libvirt_conn.LibvirtConnection(False)
- self.assertTrue(0 < conn.get_vcpu_total())
-
- def test_get_memory_mb_total(self):
- """Check if get_memory_mb returns appropriate memory value."""
- try:
- self._driver_dependant_test_setup()
- except:
- return
-
- self.mox.ReplayAll()
- conn = libvirt_conn.LibvirtConnection(False)
- self.assertTrue(0 < conn.get_memory_mb_total())
+ db.instance_destroy(user_context, instance_ref['id'])
def test_get_vcpu_used(self):
"""Check if get_local_gb_total returns appropriate disk value."""
- try:
- self._driver_dependant_test_setup()
- except:
- return
-
- self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection,
- '_conn', use_mock_anything=True)
+ self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, '_conn')
libvirt_conn.LibvirtConnection._conn.listDomainsID().AndReturn([1, 2])
vdmock = self.mox.CreateMock(libvirt.virDomain)
- self.mox.StubOutWithMock(vdmock, "vcpus", use_mock_anything=True)
+ self.mox.StubOutWithMock(vdmock, "vcpus")
vdmock.vcpus().AndReturn(['', [('dummycpu'), ('dummycpu')]])
vdmock.vcpus().AndReturn(['', [('dummycpu'), ('dummycpu')]])
- libvirt_conn.LibvirtConnection._conn.lookupByID(mox.IgnoreArg()).\
- AndReturn(vdmock)
- libvirt_conn.LibvirtConnection._conn.lookupByID(mox.IgnoreArg()).\
- AndReturn(vdmock)
+ arg = mox.IgnoreArg()
+ libvirt_conn.LibvirtConnection._conn.lookupByID(arg).AndReturn(vdmock)
+ libvirt_conn.LibvirtConnection._conn.lookupByID(arg).AndReturn(vdmock)
self.mox.ReplayAll()
conn = libvirt_conn.LibvirtConnection(False)
self.assertTrue(conn.get_vcpu_used() == 4)
- def test_get_memory_mb_used(self):
- """Check if get_memory_mb returns appropriate memory value."""
- try:
- self._driver_dependant_test_setup()
- except:
- return
-
- self.mox.ReplayAll()
- conn = libvirt_conn.LibvirtConnection(False)
- self.assertTrue(0 < conn.get_memory_mb_used())
-
- def test_get_cpu_info_works_correctly(self):
- """Check if get_cpu_info works correctly as expected."""
- xml = """<cpu>
- <arch>x86_64</arch>
- <model>Nehalem</model>
- <vendor>Intel</vendor>
- <topology sockets='2' cores='4' threads='2'/>
- <feature name='rdtscp'/>
- <feature name='dca'/>
- <feature name='xtpr'/>
- <feature name='tm2'/>
- <feature name='est'/>
- <feature name='vmx'/>
- <feature name='ds_cpl'/>
- <feature name='monitor'/>
- <feature name='pbe'/>
- <feature name='tm'/>
- <feature name='ht'/>
- <feature name='ss'/>
- <feature name='acpi'/>
- <feature name='ds'/>
- <feature name='vme'/>
- </cpu>
- """
-
- try:
- self._driver_dependant_test_setup()
- except:
- return
- self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection,
- '_conn', use_mock_anything=True)
- libvirt_conn.LibvirtConnection._conn.getCapabilities().AndReturn(xml)
-
- self.mox.ReplayAll()
- conn = libvirt_conn.LibvirtConnection(False)
- self.assertTrue(0 < len(conn.get_cpu_info()))
-
def test_get_cpu_info_inappropreate_xml(self):
"""Raise exception if given xml is inappropriate."""
+ conn = self.libvirt_dependant_setup()
+ if not conn:
+ return
+
xml = """<cccccpu>
<arch>x86_64</arch>
<model>Nehalem</model>
@@ -374,16 +289,10 @@ class LibvirtConnTestCase(test.TestCase):
</cccccpu>
"""
- try:
- self._driver_dependant_test_setup()
- except:
- return
- self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection,
- '_conn', use_mock_anything=True)
- libvirt_conn.LibvirtConnection._conn.getCapabilities().AndReturn(xml)
+ self.mox.StubOutWithMock(conn._conn, 'getCapabilities')
+ conn._conn.getCapabilities().AndReturn(xml)
self.mox.ReplayAll()
- conn = libvirt_conn.LibvirtConnection(False)
try:
conn.get_cpu_info()
except exception.Invalid, e:
@@ -392,6 +301,9 @@ class LibvirtConnTestCase(test.TestCase):
def test_get_cpu_info_inappropreate_xml2(self):
"""Raise exception if given xml is inappropriate(topology tag)."""
+ conn = self.libvirt_dependant_setup()
+ if not conn:
+ return
xml = """<cpu>
<arch>x86_64</arch>
@@ -414,16 +326,10 @@ class LibvirtConnTestCase(test.TestCase):
<feature name='vme'/>
</cpu>
"""
- try:
- self._driver_dependant_test_setup()
- except:
- return
- self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection,
- '_conn', use_mock_anything=True)
- libvirt_conn.LibvirtConnection._conn.getCapabilities().AndReturn(xml)
+ self.mox.StubOutWithMock(conn._conn, 'getCapabilities')
+ conn._conn.getCapabilities().AndReturn(xml)
self.mox.ReplayAll()
- conn = libvirt_conn.LibvirtConnection(False)
try:
conn.get_cpu_info()
except exception.Invalid, e:
@@ -432,109 +338,86 @@ class LibvirtConnTestCase(test.TestCase):
def test_update_available_resource_works_correctly(self):
"""Confirm compute_service table is updated successfully."""
- try:
- self._driver_dependant_test_setup()
- except:
+ conn = self.libvirt_dependant_setup()
+ if not conn:
return
- def dic_key_check(dic):
- validkey = ['vcpus', 'memory_mb', 'local_gb',
- 'vcpus_used', 'memory_mb_used', 'local_gb_used',
- 'hypervisor_type', 'hypervisor_version', 'cpu_info']
- return (list(set(validkey)) == list(set(dic.keys())))
-
- host = 'foo'
- binary = 'nova-compute'
- service_ref = {'id': 1,
- 'host': host,
- 'binary': binary,
- 'topic': 'compute'}
-
- self.mox.StubOutWithMock(db, 'service_get_all_by_topic')
- db.service_get_all_by_topic(mox.IgnoreMox(), 'compute').\
- AndReturn([service_ref])
- dbmock.service_update(mox.IgnoreArg(),
- service_ref['id'],
- mox.Func(dic_key_check))
-
- self.mox.ReplayAll()
- conn = libvirt_conn.LibvirtConnection(False)
- conn.update_available_resource(host)
+ host = 'dummy'
+ zone = 'dummyzone'
+ ctxt = context.get_admin_context()
+ org_path = FLAGS.instances_path = ''
+ FLAGS.instances_path = '.'
+
+ service_ref = db.service_create(ctxt,
+ {'host': host,
+ 'binary': 'nova-compute',
+ 'topic': 'compute',
+ 'report_count': 0,
+ 'availability_zone': zone})
+ conn.update_available_resource(ctxt, host)
+
+ service_ref = db.service_get(ctxt, service_ref['id'])
+ print service_ref['compute_service']
+ compute_service = service_ref['compute_service'][0]
+ c1 = (compute_service['vcpus'] > 0)
+ c2 = (compute_service['memory_mb'] > 0)
+ c3 = (compute_service['local_gb'] > 0)
+ # vcpu_used is checked at test_get_vcpu_used.
+ c4 = (compute_service['memory_mb_used'] > 0)
+ c5 = (compute_service['local_gb_used'] > 0)
+ c6 = (len(compute_service['hypervisor_type']) > 0)
+ c7 = (compute_service['hypervisor_version'] > 0)
+
+ self.assertTrue(c1 and c2 and c3 and c4 and c5 and c6 and c7)
+
+ db.service_destroy(ctxt, service_ref['id'])
+ FLAGS.instances_path = org_path
def test_update_resource_info_raise_exception(self):
"""Raise exception if no recorde found on services table."""
- try:
- self._driver_dependant_test_setup()
- except:
- return
-
- host = 'foo'
- binary = 'nova-compute'
- dbmock = self.mox.CreateMock(db)
- self.mox.StubOutWithMock(db, 'service_get_all_by_topic')
- db.service_get_all_by_topic(mox.IgnoreMox(), 'compute').\
- AndRaise(exceptin.NotFound())
-
- self.mox.ReplayAll()
+ host = 'dummy'
+ org_path = FLAGS.instances_path = ''
+ FLAGS.instances_path = '.'
try:
conn = libvirt_conn.LibvirtConnection(False)
- conn.update_available_resource(host)
+ conn.update_available_resource(context.get_admin_context(), host)
except exception.Invalid, e:
- msg = 'Cannot insert compute manager specific info'
+ msg = 'Cannot update compute manager specific info'
c1 = (0 <= e.message.find(msg))
- self.assertTrue(c1)
+ self.assertTrue(c1)
+ FLAGS.instances_path = org_path
def test_compare_cpu_works_correctly(self):
"""Calling libvirt.compute_cpu() and works correctly."""
- t = {}
- t['arch'] = 'x86'
- t['model'] = 'model'
- t['vendor'] = 'Intel'
- t['topology'] = {'cores': "2", "threads": "1", "sockets": "4"}
- t['features'] = ["tm"]
- cpu_info = utils.dumps(t)
-
- try:
- self._driver_dependant_test_setup()
- except:
- return
-
- self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection,
- '_conn',
- use_mock_anything=True)
- libvirt_conn.LibvirtConnection._conn.compareCPU(mox.IgnoreArg(),
- 0).AndReturn(1)
-
- self.mox.ReplayAll()
- conn = libvirt_conn.LibvirtConnection(False)
- self.assertTrue(None == conn.compare_cpu(cpu_info))
-
- def test_compare_cpu_raises_exception(self):
- """Libvirt-related exception occurs when calling compare_cpu()."""
- t = {}
- t['arch'] = 'x86'
- t['model'] = 'model'
- t['vendor'] = 'Intel'
- t['topology'] = {'cores': "2", "threads": "1", "sockets": "4"}
- t['features'] = ["tm"]
- cpu_info = utils.dumps(t)
-
- try:
- self._driver_dependant_test_setup()
- except:
+ conn = self.libvirt_dependant_setup()
+ if not conn:
return
+ host = 'dummy'
+ zone = 'dummyzone'
+ ctxt = context.get_admin_context()
+ org_path = FLAGS.instances_path = ''
+ FLAGS.instances_path = '.'
+
+ service_ref = db.service_create(ctxt,
+ {'host': host,
+ 'binary': 'nova-compute',
+ 'topic': 'compute',
+ 'report_count': 0,
+ 'availability_zone': zone})
+ conn.update_available_resource(ctxt, host)
+ service_ref = db.service_get(ctxt, service_ref['id'])
+ ret = conn.compare_cpu(service_ref['compute_service'][0]['cpu_info'])
+ self.assertTrue(ret == None)
- self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, '_conn',
- use_mock_anything=True)
- libvirt_conn.LibvirtConnection._conn.compareCPU(mox.IgnoreArg(), 0).\
- AndRaise(libvirt.libvirtError('ERR'))
-
- self.mox.ReplayAll()
- conn = libvirt_conn.LibvirtConnection(False)
- self.assertRaises(libvirt.libvirtError, conn.compare_cpu, cpu_info)
+ db.service_destroy(ctxt, service_ref['id'])
+ FLAGS.instances_path = org_path
def test_compare_cpu_no_compatibility(self):
"""Libvirt.compare_cpu() return less than 0.(no compatibility)."""
+ conn = self.libvirt_dependant_setup()
+ if not conn:
+ return
+
t = {}
t['arch'] = 'x86'
t['model'] = 'model'
@@ -542,71 +425,66 @@ class LibvirtConnTestCase(test.TestCase):
t['topology'] = {'cores': "2", "threads": "1", "sockets": "4"}
t['features'] = ["tm"]
cpu_info = utils.dumps(t)
-
- try:
- self._driver_dependant_test_setup()
- except:
- return
-
- self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, '_conn',
- use_mock_anything=True)
- libvirt_conn.LibvirtConnection._conn.compareCPU(mox.IgnoreArg(), 0).\
- AndRaise(exception.Invalid('ERR'))
+ self.mox.StubOutWithMock(conn._conn, 'compareCPU')
+ conn._conn.compareCPU(mox.IgnoreArg(), 0).AndReturn(0)
self.mox.ReplayAll()
- conn = libvirt_conn.LibvirtConnection(False)
self.assertRaises(exception.Invalid, conn.compare_cpu, cpu_info)
def test_ensure_filtering_rules_for_instance_works_correctly(self):
"""ensure_filtering_rules_for_instance() works successfully."""
+ conn = self.libvirt_dependant_setup()
+ if not conn:
+ return
+
instance_ref = models.Instance()
instance_ref.__setitem__('id', 1)
+ fwdriver = conn.firewall_driver
- try:
- nwmock, fwmock = self._driver_dependant_test_setup()
- except:
- return
-
- nwmock.setup_basic_filtering(mox.IgnoreArg())
- fwmock.prepare_instance_filter(instance_ref)
- self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, '_conn',
- use_mock_anything=True)
+ self.mox.StubOutWithMock(fwdriver, 'setup_basic_filtering')
+ fwdriver.setup_basic_filtering(instance_ref)
+ self.mox.StubOutWithMock(fwdriver, 'prepare_instance_filter')
+ fwdriver.prepare_instance_filter(instance_ref)
+ self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, '_conn')
n = 'nova-instance-%s' % instance_ref.name
- libvirt_conn.LibvirtConnection._conn.nwfilterLookupByName(n)
+ conn._conn.nwfilterLookupByName(n)
self.mox.ReplayAll()
- conn = libvirt_conn.LibvirtConnection(False)
conn.ensure_filtering_rules_for_instance(instance_ref)
def test_ensure_filtering_rules_for_instance_timeout(self):
"""ensure_filtering_fules_for_instance() finishes with timeout."""
+ conn = self.libvirt_dependant_setup()
+ if not conn:
+ return
+
instance_ref = models.Instance()
instance_ref.__setitem__('id', 1)
+ fwdriver = conn.firewall_driver
- try:
- nwmock, fwmock = self._driver_dependant_test_setup()
- except:
- return
-
- nwmock.setup_basic_filtering(mox.IgnoreArg())
- fwmock.prepare_instance_filter(instance_ref)
- self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, '_conn',
- use_mock_anything=True)
+ self.mox.StubOutWithMock(fwdriver, 'setup_basic_filtering')
+ fwdriver.setup_basic_filtering(instance_ref)
+ self.mox.StubOutWithMock(fwdriver, 'prepare_instance_filter')
+ fwdriver.prepare_instance_filter(instance_ref)
+ self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, '_conn')
n = 'nova-instance-%s' % instance_ref.name
- for i in range(FLAGS.live_migration_timeout_sec * 2):
- libvirt_conn.LibvirtConnection._conn.\
- nwfilterLookupByName(n).AndRaise(libvirt.libvirtError('ERR'))
+ for i in range(FLAGS.live_migration_retry_count):
+ conn._conn.nwfilterLookupByName(n).\
+ AndRaise(libvirt.libvirtError('ERR'))
self.mox.ReplayAll()
- conn = libvirt_conn.LibvirtConnection(False)
try:
conn.ensure_filtering_rules_for_instance(instance_ref)
except exception.Error, e:
c1 = (0 <= e.message.find('Timeout migrating for'))
- self.assertTrue(c1)
+ self.assertTrue(c1)
def test_live_migration_works_correctly(self):
"""_live_migration() works as expected correctly."""
+ conn = self.libvirt_dependant_setup()
+ if not conn:
+ return
+
class dummyCall(object):
f = None
@@ -615,76 +493,57 @@ class LibvirtConnTestCase(test.TestCase):
i_ref = models.Instance()
i_ref.__setitem__('id', 1)
- i_ref.__setitem__('host', 'dummy')
ctxt = context.get_admin_context()
- try:
- self._driver_dependant_test_setup()
- except:
- return
-
- self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, '_conn',
- use_mock_anything=True)
vdmock = self.mox.CreateMock(libvirt.virDomain)
- self.mox.StubOutWithMock(vdmock, "migrateToURI",
- use_mock_anything=True)
- vdmock.migrateToURI(FLAGS.live_migration_uri % i_ref['host'],
+ self.mox.StubOutWithMock(vdmock, "migrateToURI")
+ vdmock.migrateToURI(FLAGS.live_migration_uri % 'dest',
mox.IgnoreArg(),
None, FLAGS.live_migration_bandwidth).\
AndReturn(None)
- libvirt_conn.LibvirtConnection._conn.lookupByName(i_ref.name).\
- AndReturn(vdmock)
+ self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, '_conn')
+ conn._conn.lookupByName(i_ref.name).AndReturn(vdmock)
+ self.mox.StubOutWithMock(libvirt_conn.utils, 'LoopingCall')
libvirt_conn.utils.LoopingCall(f=None).AndReturn(dummyCall())
self.mox.ReplayAll()
- conn = libvirt_conn.LibvirtConnection(False)
- # Not setting post_method/recover_method in this testcase.
- ret = conn._live_migration(ctxt, i_ref, i_ref['host'], '', '')
+ # Nothing to do with setting post_method/recover_method or not.
+ ret = conn._live_migration(ctxt, i_ref, 'dest', '', '')
self.assertTrue(ret == None)
def test_live_migration_raises_exception(self):
"""Confirms recover method is called when exceptions are raised."""
+ conn = self.libvirt_dependant_setup()
+ if not conn:
+ return
+
i_ref = models.Instance()
i_ref.__setitem__('id', 1)
- i_ref.__setitem__('host', 'dummy')
ctxt = context.get_admin_context()
- def dummy_recover_method(self, c, instance):
+ def dummy_recover_method(c, instance, host=None):
pass
- try:
- nwmock, fwmock = self._driver_dependant_test_setup()
- except:
- return
-
- self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, '_conn',
- use_mock_anything=True)
vdmock = self.mox.CreateMock(libvirt.virDomain)
- self.mox.StubOutWithMock(vdmock, "migrateToURI",
- use_mock_anything=True)
- vdmock.migrateToURI(FLAGS.live_migration_uri % dest, mox.IgnoreArg(),
+ self.mox.StubOutWithMock(vdmock, "migrateToURI")
+ vdmock.migrateToURI(FLAGS.live_migration_uri % 'dest',
+ mox.IgnoreArg(),
None, FLAGS.live_migration_bandwidth).\
AndRaise(libvirt.libvirtError('ERR'))
- libvirt_conn.LibvirtConnection._conn.lookupByName(instance_ref.name).\
- AndReturn(vdmock)
- self.mox.StubOutWithMock(db, 'instance_set_state')
- db.instance_set_state(ctxt, instance_ref['id'],
- power_state.RUNNING, 'running')
- self.mox.StubOutWithMock(db, 'volume_update')
- for v in instance_ref.volumes:
- db.volume_update(ctxt, v['id'], {'status': 'in-use'})
+ self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, '_conn')
+ conn._conn.lookupByName(i_ref.name).AndReturn(vdmock)
self.mox.ReplayAll()
- conn = libvirt_conn.LibvirtConnection(False)
self.assertRaises(libvirt.libvirtError,
- conn._mlive_migration,
- ctxt, instance_ref, dest,
+ conn._live_migration,
+ ctxt, i_ref, 'dest',
'', dummy_recover_method)
def tearDown(self):
- super(LibvirtConnTestCase, self).tearDown()
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
+ super(LibvirtConnTestCase, self).tearDown()
+ self.libvirt_dependant_teardown()
class IptablesFirewallTestCase(test.TestCase):
@@ -841,6 +700,7 @@ class IptablesFirewallTestCase(test.TestCase):
'--dports 80:81 -j ACCEPT' % security_group_chain \
in self.out_rules,
"TCP port 80/81 acceptance rule wasn't added")
+ db.instance_destroy(admin_ctxt, instance_ref['id'])
class NWFilterTestCase(test.TestCase):
@@ -864,6 +724,7 @@ class NWFilterTestCase(test.TestCase):
def tearDown(self):
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
+ super(NWFilterTestCase, self).tearDown()
def test_cidr_rule_nwfilter_xml(self):
cloud_controller = cloud.CloudController()
@@ -990,3 +851,4 @@ class NWFilterTestCase(test.TestCase):
self.fw.apply_instance_filter(instance)
_ensure_all_called()
self.teardown_security_group()
+ db.instance_destroy(admin_ctxt, instance_ref['id'])
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index 6b8efc9d8..2cbe58aab 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -167,6 +167,7 @@ class XenAPIVMTestCase(test.TestCase):
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
stubs.stubout_get_this_vm_uuid(self.stubs)
stubs.stubout_stream_disk(self.stubs)
+ stubs.stubout_is_vdi_pv(self.stubs)
self.stubs.Set(VMOps, 'reset_network', reset_network)
glance_stubs.stubout_glance_client(self.stubs,
glance_stubs.FakeGlance)
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
index 624995ada..4fec2bd75 100644
--- a/nova/tests/xenapi/stubs.py
+++ b/nova/tests/xenapi/stubs.py
@@ -130,6 +130,12 @@ def stubout_stream_disk(stubs):
stubs.Set(vm_utils, '_stream_disk', f)
+def stubout_is_vdi_pv(stubs):
+ def f(_1):
+ return False
+ stubs.Set(vm_utils, '_is_vdi_pv', f)
+
+
class FakeSessionForVMTests(fake.SessionBase):
""" Stubs out a XenAPISession for VM tests """
def __init__(self, uri):