summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
authorNachi Ueno <nati.ueno@gmail.com>2011-01-12 17:47:54 +0900
committerNachi Ueno <nati.ueno@gmail.com>2011-01-12 17:47:54 +0900
commit6a4b4f0767f8518e57384ff88efafaa853d642a4 (patch)
tree3a734da93c2161533dae571f862001cfc626cba9 /nova/tests
parent7a6b7c32ed25d1edc58b924ce5621dc0d8de9686 (diff)
parent78882d496b94915b8a6e2f2edce13e8129299982 (diff)
Merged with r548
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/api/openstack/fakes.py4
-rw-r--r--nova/tests/api/openstack/test_images.py3
-rw-r--r--nova/tests/api/openstack/test_servers.py39
-rw-r--r--nova/tests/api/openstack/test_shared_ip_groups.py (renamed from nova/tests/api/openstack/test_sharedipgroups.py)2
-rw-r--r--nova/tests/hyperv_unittest.py71
-rw-r--r--nova/tests/objectstore_unittest.py2
-rw-r--r--nova/tests/test_access.py1
-rw-r--r--nova/tests/test_auth.py9
-rw-r--r--nova/tests/test_cloud.py53
-rw-r--r--nova/tests/test_compute.py37
-rw-r--r--nova/tests/test_console.py129
-rw-r--r--nova/tests/test_log.py110
-rw-r--r--nova/tests/test_network.py9
-rw-r--r--nova/tests/test_quota.py14
-rw-r--r--nova/tests/test_rpc.py11
-rw-r--r--nova/tests/test_virt.py142
-rw-r--r--nova/tests/test_volume.py6
-rw-r--r--nova/tests/test_xenapi.py11
-rw-r--r--nova/tests/xenapi/stubs.py24
19 files changed, 591 insertions, 86 deletions
diff --git a/nova/tests/api/openstack/fakes.py b/nova/tests/api/openstack/fakes.py
index 961431154..194304e79 100644
--- a/nova/tests/api/openstack/fakes.py
+++ b/nova/tests/api/openstack/fakes.py
@@ -107,13 +107,13 @@ def stub_out_rate_limiting(stubs):
def stub_out_networking(stubs):
def get_my_ip():
return '127.0.0.1'
- stubs.Set(nova.utils, 'get_my_ip', get_my_ip)
+ stubs.Set(nova.flags, '_get_my_ip', get_my_ip)
def stub_out_compute_api_snapshot(stubs):
def snapshot(self, context, instance_id, name):
return 123
- stubs.Set(nova.compute.api.ComputeAPI, 'snapshot', snapshot)
+ stubs.Set(nova.compute.API, 'snapshot', snapshot)
def stub_out_glance(stubs, initial_fixtures=[]):
diff --git a/nova/tests/api/openstack/test_images.py b/nova/tests/api/openstack/test_images.py
index 0f274bd15..00ca739a5 100644
--- a/nova/tests/api/openstack/test_images.py
+++ b/nova/tests/api/openstack/test_images.py
@@ -22,7 +22,6 @@ and as a WSGI layer
import json
import datetime
-import logging
import unittest
import stubout
@@ -173,6 +172,7 @@ class ImageControllerWithGlanceServiceTest(unittest.TestCase):
IMAGE_FIXTURES = [
{'id': '23g2ogk23k4hhkk4k42l',
+ 'imageId': '23g2ogk23k4hhkk4k42l',
'name': 'public image #1',
'created_at': str(datetime.datetime.utcnow()),
'updated_at': str(datetime.datetime.utcnow()),
@@ -182,6 +182,7 @@ class ImageControllerWithGlanceServiceTest(unittest.TestCase):
'status': 'available',
'image_type': 'kernel'},
{'id': 'slkduhfas73kkaskgdas',
+ 'imageId': 'slkduhfas73kkaskgdas',
'name': 'public image #2',
'created_at': str(datetime.datetime.utcnow()),
'updated_at': str(datetime.datetime.utcnow()),
diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py
index 70ff714e6..0396daf98 100644
--- a/nova/tests/api/openstack/test_servers.py
+++ b/nova/tests/api/openstack/test_servers.py
@@ -56,8 +56,8 @@ def instance_address(context, instance_id):
def stub_instance(id, user_id=1):
- return Instance(id=int(id) + 123456, state=0, image_id=10, user_id=user_id,
- display_name='server%s' % id, internal_id=id)
+ return Instance(id=id, state=0, image_id=10, user_id=user_id,
+ display_name='server%s' % id)
def fake_compute_api(cls, req, id):
@@ -76,8 +76,7 @@ class ServersTest(unittest.TestCase):
fakes.stub_out_key_pair_funcs(self.stubs)
fakes.stub_out_image_service(self.stubs)
self.stubs.Set(nova.db.api, 'instance_get_all', return_servers)
- self.stubs.Set(nova.db.api, 'instance_get_by_internal_id',
- return_server)
+ self.stubs.Set(nova.db.api, 'instance_get_by_id', return_server)
self.stubs.Set(nova.db.api, 'instance_get_all_by_user',
return_servers)
self.stubs.Set(nova.db.api, 'instance_add_security_group',
@@ -87,18 +86,12 @@ class ServersTest(unittest.TestCase):
instance_address)
self.stubs.Set(nova.db.api, 'instance_get_floating_address',
instance_address)
- self.stubs.Set(nova.compute.api.ComputeAPI, 'pause',
- fake_compute_api)
- self.stubs.Set(nova.compute.api.ComputeAPI, 'unpause',
- fake_compute_api)
- self.stubs.Set(nova.compute.api.ComputeAPI, 'suspend',
- fake_compute_api)
- self.stubs.Set(nova.compute.api.ComputeAPI, 'resume',
- fake_compute_api)
- self.stubs.Set(nova.compute.api.ComputeAPI, "get_diagnostics",
- fake_compute_api)
- self.stubs.Set(nova.compute.api.ComputeAPI, "get_actions",
- fake_compute_api)
+ self.stubs.Set(nova.compute.API, 'pause', fake_compute_api)
+ self.stubs.Set(nova.compute.API, 'unpause', fake_compute_api)
+ self.stubs.Set(nova.compute.API, 'suspend', fake_compute_api)
+ self.stubs.Set(nova.compute.API, 'resume', fake_compute_api)
+ self.stubs.Set(nova.compute.API, "get_diagnostics", fake_compute_api)
+ self.stubs.Set(nova.compute.API, "get_actions", fake_compute_api)
self.allow_admin = FLAGS.allow_admin_api
def tearDown(self):
@@ -109,7 +102,7 @@ class ServersTest(unittest.TestCase):
req = webob.Request.blank('/v1.0/servers/1')
res = req.get_response(nova.api.API('os'))
res_dict = json.loads(res.body)
- self.assertEqual(res_dict['server']['id'], 1)
+ self.assertEqual(res_dict['server']['id'], '1')
self.assertEqual(res_dict['server']['name'], 'server1')
def test_get_server_list(self):
@@ -126,7 +119,7 @@ class ServersTest(unittest.TestCase):
def test_create_instance(self):
def instance_create(context, inst):
- return {'id': 1, 'internal_id': 1, 'display_name': ''}
+ return {'id': '1', 'display_name': ''}
def server_update(context, id, params):
return instance_create(context, id)
@@ -140,6 +133,12 @@ class ServersTest(unittest.TestCase):
def queue_get_for(context, *args):
return 'network_topic'
+ def kernel_ramdisk_mapping(*args, **kwargs):
+ return (1, 1)
+
+ def image_id_from_hash(*args, **kwargs):
+ return 2
+
self.stubs.Set(nova.db.api, 'project_get_network', project_get_network)
self.stubs.Set(nova.db.api, 'instance_create', instance_create)
self.stubs.Set(nova.rpc, 'cast', fake_method)
@@ -149,6 +148,10 @@ class ServersTest(unittest.TestCase):
self.stubs.Set(nova.db.api, 'queue_get_for', queue_get_for)
self.stubs.Set(nova.network.manager.VlanManager, 'allocate_fixed_ip',
fake_method)
+ self.stubs.Set(nova.api.openstack.servers.Controller,
+ "_get_kernel_ramdisk_from_image", kernel_ramdisk_mapping)
+ self.stubs.Set(nova.api.openstack.common,
+ "get_image_id_from_image_hash", image_id_from_hash)
body = dict(server=dict(
name='server_test', imageId=2, flavorId=2, metadata={},
diff --git a/nova/tests/api/openstack/test_sharedipgroups.py b/nova/tests/api/openstack/test_shared_ip_groups.py
index d199951d8..c2fc3a203 100644
--- a/nova/tests/api/openstack/test_sharedipgroups.py
+++ b/nova/tests/api/openstack/test_shared_ip_groups.py
@@ -19,7 +19,7 @@ import unittest
import stubout
-from nova.api.openstack import sharedipgroups
+from nova.api.openstack import shared_ip_groups
class SharedIpGroupsTest(unittest.TestCase):
diff --git a/nova/tests/hyperv_unittest.py b/nova/tests/hyperv_unittest.py
new file mode 100644
index 000000000..3980ae3cb
--- /dev/null
+++ b/nova/tests/hyperv_unittest.py
@@ -0,0 +1,71 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2010 Cloud.com, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For Hyper-V driver
+"""
+
+import random
+
+from nova import context
+from nova import db
+from nova import flags
+from nova import test
+from nova.auth import manager
+from nova.virt import hyperv
+
+FLAGS = flags.FLAGS
+FLAGS.connection_type = 'hyperv'
+
+
+class HyperVTestCase(test.TestCase):
+ """Test cases for the Hyper-V driver"""
+ def setUp(self):
+ super(HyperVTestCase, self).setUp()
+ self.manager = manager.AuthManager()
+ self.user = self.manager.create_user('fake', 'fake', 'fake',
+ admin=True)
+ self.project = self.manager.create_project('fake', 'fake', 'fake')
+ self.context = context.RequestContext(self.user, self.project)
+
+ def test_create_destroy(self):
+ """Create a VM and destroy it"""
+ instance = {'internal_id': random.randint(1, 1000000),
+ 'memory_mb': '1024',
+ 'mac_address': '02:12:34:46:56:67',
+ 'vcpus': 2,
+ 'project_id': 'fake',
+ 'instance_type': 'm1.small'}
+ instance_ref = db.instance_create(self.context, instance)
+
+ conn = hyperv.get_connection(False)
+ conn._create_vm(instance_ref) # pylint: disable-msg=W0212
+ found = [n for n in conn.list_instances()
+ if n == instance_ref['name']]
+ self.assertTrue(len(found) == 1)
+ info = conn.get_info(instance_ref['name'])
+ #Unfortunately since the vm is not running at this point,
+ #we cannot obtain memory information from get_info
+ self.assertEquals(info['num_cpu'], instance_ref['vcpus'])
+
+ conn.destroy(instance_ref)
+ found = [n for n in conn.list_instances()
+ if n == instance_ref['name']]
+ self.assertTrue(len(found) == 0)
+
+ def tearDown(self):
+ super(HyperVTestCase, self).tearDown()
+ self.manager.delete_project(self.project)
+ self.manager.delete_user(self.user)
diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py
index ceac17adb..da86e6e11 100644
--- a/nova/tests/objectstore_unittest.py
+++ b/nova/tests/objectstore_unittest.py
@@ -23,7 +23,6 @@ Unittets for S3 objectstore clone.
import boto
import glob
import hashlib
-import logging
import os
import shutil
import tempfile
@@ -63,7 +62,6 @@ class ObjectStoreTestCase(test.TestCase):
self.flags(buckets_path=os.path.join(OSS_TEMPDIR, 'buckets'),
images_path=os.path.join(OSS_TEMPDIR, 'images'),
ca_path=os.path.join(os.path.dirname(__file__), 'CA'))
- logging.getLogger().setLevel(logging.DEBUG)
self.auth_manager = manager.AuthManager()
self.auth_manager.create_user('user1')
diff --git a/nova/tests/test_access.py b/nova/tests/test_access.py
index 58fdea3b5..0929903cf 100644
--- a/nova/tests/test_access.py
+++ b/nova/tests/test_access.py
@@ -17,7 +17,6 @@
# under the License.
import unittest
-import logging
import webob
from nova import context
diff --git a/nova/tests/test_auth.py b/nova/tests/test_auth.py
index 15d40bc53..35ffffb67 100644
--- a/nova/tests/test_auth.py
+++ b/nova/tests/test_auth.py
@@ -16,17 +16,18 @@
# License for the specific language governing permissions and limitations
# under the License.
-import logging
from M2Crypto import X509
import unittest
from nova import crypto
from nova import flags
+from nova import log as logging
from nova import test
from nova.auth import manager
from nova.api.ec2 import cloud
FLAGS = flags.FLAGS
+LOG = logging.getLogger('nova.tests.auth_unittest')
class user_generator(object):
@@ -211,12 +212,12 @@ class AuthManagerTestCase(object):
# NOTE(vish): Setup runs genroot.sh if it hasn't been run
cloud.CloudController().setup()
_key, cert_str = crypto.generate_x509_cert(user.id, project.id)
- logging.debug(cert_str)
+ LOG.debug(cert_str)
full_chain = crypto.fetch_ca(project_id=project.id, chain=True)
int_cert = crypto.fetch_ca(project_id=project.id, chain=False)
cloud_cert = crypto.fetch_ca()
- logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain)
+ LOG.debug("CA chain:\n\n =====\n%s\n\n=====", full_chain)
signed_cert = X509.load_cert_string(cert_str)
chain_cert = X509.load_cert_string(full_chain)
int_cert = X509.load_cert_string(int_cert)
@@ -331,7 +332,7 @@ class AuthManagerLdapTestCase(AuthManagerTestCase, test.TestCase):
test.TestCase.__init__(self, *args, **kwargs)
import nova.auth.fakeldap as fakeldap
if FLAGS.flush_db:
- logging.info("Flushing datastore")
+ LOG.info("Flushing datastore")
r = fakeldap.Store.instance()
r.flushdb()
diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py
index 70d2c44da..b8a15c7b2 100644
--- a/nova/tests/test_cloud.py
+++ b/nova/tests/test_cloud.py
@@ -18,7 +18,6 @@
from base64 import b64decode
import json
-import logging
from M2Crypto import BIO
from M2Crypto import RSA
import os
@@ -31,6 +30,7 @@ from nova import context
from nova import crypto
from nova import db
from nova import flags
+from nova import log as logging
from nova import rpc
from nova import service
from nova import test
@@ -41,6 +41,7 @@ from nova.objectstore import image
FLAGS = flags.FLAGS
+LOG = logging.getLogger('nova.tests.cloud')
# Temp dirs for working with image attributes through the cloud controller
# (stole this from objectstore_unittest.py)
@@ -56,7 +57,6 @@ class CloudTestCase(test.TestCase):
images_path=IMAGES_PATH)
self.conn = rpc.Connection.instance()
- logging.getLogger().setLevel(logging.DEBUG)
# set up our cloud
self.cloud = cloud.CloudController()
@@ -106,7 +106,7 @@ class CloudTestCase(test.TestCase):
self.cloud.allocate_address(self.context)
inst = db.instance_create(self.context, {'host': FLAGS.host})
fixed = self.network.allocate_fixed_ip(self.context, inst['id'])
- ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id'])
+ ec2_id = cloud.id_to_ec2_id(inst['id'])
self.cloud.associate_address(self.context,
instance_id=ec2_id,
public_ip=address)
@@ -127,12 +127,29 @@ class CloudTestCase(test.TestCase):
result = self.cloud.describe_volumes(self.context)
self.assertEqual(len(result['volumeSet']), 2)
result = self.cloud.describe_volumes(self.context,
- volume_id=[vol2['ec2_id']])
+ volume_id=[vol2['id']])
self.assertEqual(len(result['volumeSet']), 1)
- self.assertEqual(result['volumeSet'][0]['volumeId'], vol2['ec2_id'])
+ self.assertEqual(result['volumeSet'][0]['volumeId'], vol2['id'])
db.volume_destroy(self.context, vol1['id'])
db.volume_destroy(self.context, vol2['id'])
+ def test_describe_instances(self):
+ """Makes sure describe_instances works and filters results."""
+ inst1 = db.instance_create(self.context, {'reservation_id': 'a'})
+ inst2 = db.instance_create(self.context, {'reservation_id': 'a'})
+ result = self.cloud.describe_instances(self.context)
+ result = result['reservationSet'][0]
+ self.assertEqual(len(result['instancesSet']), 2)
+ instance_id = cloud.id_to_ec2_id(inst2['id'])
+ result = self.cloud.describe_instances(self.context,
+ instance_id=[instance_id])
+ result = result['reservationSet'][0]
+ self.assertEqual(len(result['instancesSet']), 1)
+ self.assertEqual(result['instancesSet'][0]['instanceId'],
+ instance_id)
+ db.instance_destroy(self.context, inst1['id'])
+ db.instance_destroy(self.context, inst2['id'])
+
def test_console_output(self):
image_id = FLAGS.default_image
instance_type = FLAGS.default_instance_type
@@ -140,15 +157,15 @@ class CloudTestCase(test.TestCase):
kwargs = {'image_id': image_id,
'instance_type': instance_type,
'max_count': max_count}
- rv = yield self.cloud.run_instances(self.context, **kwargs)
+ rv = self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId']
- output = yield self.cloud.get_console_output(context=self.context,
+ output = self.cloud.get_console_output(context=self.context,
instance_id=[instance_id])
self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE OUTPUT')
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
greenthread.sleep(0.3)
- rv = yield self.cloud.terminate_instances(self.context, [instance_id])
+ rv = self.cloud.terminate_instances(self.context, [instance_id])
def test_key_generation(self):
result = self._create_key('test')
@@ -178,7 +195,7 @@ class CloudTestCase(test.TestCase):
def test_run_instances(self):
if FLAGS.connection_type == 'fake':
- logging.debug("Can't test instances without a real virtual env.")
+ LOG.debug(_("Can't test instances without a real virtual env."))
return
image_id = FLAGS.default_image
instance_type = FLAGS.default_instance_type
@@ -186,30 +203,30 @@ class CloudTestCase(test.TestCase):
kwargs = {'image_id': image_id,
'instance_type': instance_type,
'max_count': max_count}
- rv = yield self.cloud.run_instances(self.context, **kwargs)
+ rv = self.cloud.run_instances(self.context, **kwargs)
# TODO: check for proper response
instance_id = rv['reservationSet'][0].keys()[0]
instance = rv['reservationSet'][0][instance_id][0]
- logging.debug("Need to watch instance %s until it's running..." %
- instance['instance_id'])
+ LOG.debug(_("Need to watch instance %s until it's running..."),
+ instance['instance_id'])
while True:
greenthread.sleep(1)
info = self.cloud._get_instance(instance['instance_id'])
- logging.debug(info['state'])
+ LOG.debug(info['state'])
if info['state'] == power_state.RUNNING:
break
self.assert_(rv)
- if connection_type != 'fake':
+ if FLAGS.connection_type != 'fake':
time.sleep(45) # Should use boto for polling here
for reservations in rv['reservationSet']:
# for res_id in reservations.keys():
- # logging.debug(reservations[res_id])
+ # LOG.debug(reservations[res_id])
# for instance in reservations[res_id]:
for instance in reservations[reservations.keys()[0]]:
instance_id = instance['instance_id']
- logging.debug("Terminating instance %s" % instance_id)
- rv = yield self.compute.terminate_instance(instance_id)
+ LOG.debug(_("Terminating instance %s"), instance_id)
+ rv = self.compute.terminate_instance(instance_id)
def test_instance_update_state(self):
def instance(num):
@@ -296,7 +313,7 @@ class CloudTestCase(test.TestCase):
def test_update_of_instance_display_fields(self):
inst = db.instance_create(self.context, {})
- ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id'])
+ ec2_id = cloud.id_to_ec2_id(inst['id'])
self.cloud.update_instance(self.context, ec2_id,
display_name='c00l 1m4g3')
inst = db.instance_get(self.context, inst['id'])
diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py
index 1fb9143f1..1d407c5a3 100644
--- a/nova/tests/test_compute.py
+++ b/nova/tests/test_compute.py
@@ -20,31 +20,31 @@ Tests For Compute
"""
import datetime
-import logging
+from nova import compute
from nova import context
from nova import db
from nova import exception
from nova import flags
+from nova import log as logging
from nova import test
from nova import utils
from nova.auth import manager
-from nova.compute import api as compute_api
FLAGS = flags.FLAGS
+LOG = logging.getLogger('nova.tests.compute')
class ComputeTestCase(test.TestCase):
"""Test case for compute"""
def setUp(self):
- logging.getLogger().setLevel(logging.DEBUG)
super(ComputeTestCase, self).setUp()
self.flags(connection_type='fake',
stub_network=True,
network_manager='nova.network.manager.FlatManager')
self.compute = utils.import_object(FLAGS.compute_manager)
- self.compute_api = compute_api.ComputeAPI()
+ self.compute_api = compute.API()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake')
self.project = self.manager.create_project('fake', 'fake', 'fake')
@@ -72,7 +72,7 @@ class ComputeTestCase(test.TestCase):
"""Verify that an instance cannot be created without a display_name."""
cases = [dict(), dict(display_name=None)]
for instance in cases:
- ref = self.compute_api.create_instances(self.context,
+ ref = self.compute_api.create(self.context,
FLAGS.default_instance_type, None, **instance)
try:
self.assertNotEqual(ref[0].display_name, None)
@@ -80,13 +80,13 @@ class ComputeTestCase(test.TestCase):
db.instance_destroy(self.context, ref[0]['id'])
def test_create_instance_associates_security_groups(self):
- """Make sure create_instances associates security groups"""
+ """Make sure create associates security groups"""
values = {'name': 'default',
'description': 'default',
'user_id': self.user.id,
'project_id': self.project.id}
group = db.security_group_create(self.context, values)
- ref = self.compute_api.create_instances(self.context,
+ ref = self.compute_api.create(self.context,
FLAGS.default_instance_type, None, security_group=['default'])
try:
self.assertEqual(len(ref[0]['security_groups']), 1)
@@ -101,13 +101,13 @@ class ComputeTestCase(test.TestCase):
self.compute.run_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
- logging.info(_("Running instances: %s"), instances)
+ LOG.info(_("Running instances: %s"), instances)
self.assertEqual(len(instances), 1)
self.compute.terminate_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
- logging.info(_("After terminating instances: %s"), instances)
+ LOG.info(_("After terminating instances: %s"), instances)
self.assertEqual(len(instances), 0)
def test_run_terminate_timestamps(self):
@@ -178,3 +178,22 @@ class ComputeTestCase(test.TestCase):
self.context,
instance_id)
self.compute.terminate_instance(self.context, instance_id)
+
+ def test_lock(self):
+ """ensure locked instance cannot be changed"""
+ instance_id = self._create_instance()
+ self.compute.run_instance(self.context, instance_id)
+
+ non_admin_context = context.RequestContext(None, None, False, False)
+
+ # decorator should return False (fail) with locked nonadmin context
+ self.compute.lock_instance(self.context, instance_id)
+ ret_val = self.compute.reboot_instance(non_admin_context, instance_id)
+ self.assertEqual(ret_val, False)
+
+ # decorator should return None (success) with unlocked nonadmin context
+ self.compute.unlock_instance(self.context, instance_id)
+ ret_val = self.compute.reboot_instance(non_admin_context, instance_id)
+ self.assertEqual(ret_val, None)
+
+ self.compute.terminate_instance(self.context, instance_id)
diff --git a/nova/tests/test_console.py b/nova/tests/test_console.py
new file mode 100644
index 000000000..31b5ca79c
--- /dev/null
+++ b/nova/tests/test_console.py
@@ -0,0 +1,129 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Openstack, LLC.
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Tests For Console proxy.
+"""
+
+import datetime
+import logging
+
+from nova import context
+from nova import db
+from nova import exception
+from nova import flags
+from nova import test
+from nova import utils
+from nova.auth import manager
+from nova.console import manager as console_manager
+
+FLAGS = flags.FLAGS
+
+
+class ConsoleTestCase(test.TestCase):
+ """Test case for console proxy"""
+ def setUp(self):
+ logging.getLogger().setLevel(logging.DEBUG)
+ super(ConsoleTestCase, self).setUp()
+ self.flags(console_driver='nova.console.fake.FakeConsoleProxy',
+ stub_compute=True)
+ self.console = utils.import_object(FLAGS.console_manager)
+ self.manager = manager.AuthManager()
+ self.user = self.manager.create_user('fake', 'fake', 'fake')
+ self.project = self.manager.create_project('fake', 'fake', 'fake')
+ self.context = context.get_admin_context()
+ self.host = 'test_compute_host'
+
+ def tearDown(self):
+ self.manager.delete_user(self.user)
+ self.manager.delete_project(self.project)
+ super(ConsoleTestCase, self).tearDown()
+
+ def _create_instance(self):
+ """Create a test instance"""
+ inst = {}
+ #inst['host'] = self.host
+ #inst['name'] = 'instance-1234'
+ inst['image_id'] = 'ami-test'
+ inst['reservation_id'] = 'r-fakeres'
+ inst['launch_time'] = '10'
+ inst['user_id'] = self.user.id
+ inst['project_id'] = self.project.id
+ inst['instance_type'] = 'm1.tiny'
+ inst['mac_address'] = utils.generate_mac()
+ inst['ami_launch_index'] = 0
+ return db.instance_create(self.context, inst)['id']
+
+ def test_get_pool_for_instance_host(self):
+ pool = self.console.get_pool_for_instance_host(self.context, self.host)
+ self.assertEqual(pool['compute_host'], self.host)
+
+ def test_get_pool_creates_new_pool_if_needed(self):
+ self.assertRaises(exception.NotFound,
+ db.console_pool_get_by_host_type,
+ self.context,
+ self.host,
+ self.console.host,
+ self.console.driver.console_type)
+ pool = self.console.get_pool_for_instance_host(self.context,
+ self.host)
+ pool2 = db.console_pool_get_by_host_type(self.context,
+ self.host,
+ self.console.host,
+ self.console.driver.console_type)
+ self.assertEqual(pool['id'], pool2['id'])
+
+ def test_get_pool_does_not_create_new_pool_if_exists(self):
+ pool_info = {'address': '127.0.0.1',
+ 'username': 'test',
+ 'password': '1234pass',
+ 'host': self.console.host,
+ 'console_type': self.console.driver.console_type,
+ 'compute_host': 'sometesthostname'}
+ new_pool = db.console_pool_create(self.context, pool_info)
+ pool = self.console.get_pool_for_instance_host(self.context,
+ 'sometesthostname')
+ self.assertEqual(pool['id'], new_pool['id'])
+
+ def test_add_console(self):
+ instance_id = self._create_instance()
+ self.console.add_console(self.context, instance_id)
+ instance = db.instance_get(self.context, instance_id)
+ pool = db.console_pool_get_by_host_type(self.context,
+ instance['host'],
+ self.console.host,
+ self.console.driver.console_type)
+
+ console_instances = [con['instance_id'] for con in pool.consoles]
+ self.assert_(instance_id in console_instances)
+
+ def test_add_console_does_not_duplicate(self):
+ instance_id = self._create_instance()
+ cons1 = self.console.add_console(self.context, instance_id)
+ cons2 = self.console.add_console(self.context, instance_id)
+ self.assertEqual(cons1, cons2)
+
+ def test_remove_console(self):
+ instance_id = self._create_instance()
+ console_id = self.console.add_console(self.context, instance_id)
+ self.console.remove_console(self.context, console_id)
+
+ self.assertRaises(exception.NotFound,
+ db.console_get,
+ self.context,
+ console_id)
diff --git a/nova/tests/test_log.py b/nova/tests/test_log.py
new file mode 100644
index 000000000..beb1d97cf
--- /dev/null
+++ b/nova/tests/test_log.py
@@ -0,0 +1,110 @@
+import cStringIO
+
+from nova import context
+from nova import log
+from nova import test
+
+
+def _fake_context():
+ return context.RequestContext(1, 1)
+
+
+class RootLoggerTestCase(test.TrialTestCase):
+ def setUp(self):
+ super(RootLoggerTestCase, self).setUp()
+ self.log = log.logging.root
+
+ def tearDown(self):
+ super(RootLoggerTestCase, self).tearDown()
+ log.NovaLogger.manager.loggerDict = {}
+
+ def test_is_nova_instance(self):
+ self.assert_(isinstance(self.log, log.NovaLogger))
+
+ def test_name_is_nova_root(self):
+ self.assertEqual("nova.root", self.log.name)
+
+ def test_handlers_have_nova_formatter(self):
+ formatters = []
+ for h in self.log.handlers:
+ f = h.formatter
+ if isinstance(f, log.NovaFormatter):
+ formatters.append(f)
+ self.assert_(formatters)
+ self.assertEqual(len(formatters), len(self.log.handlers))
+
+ def test_handles_context_kwarg(self):
+ self.log.info("foo", context=_fake_context())
+ self.assert_(True) # didn't raise exception
+
+ def test_module_level_methods_handle_context_arg(self):
+ log.info("foo", context=_fake_context())
+ self.assert_(True) # didn't raise exception
+
+ def test_module_level_audit_handles_context_arg(self):
+ log.audit("foo", context=_fake_context())
+ self.assert_(True) # didn't raise exception
+
+
+class NovaFormatterTestCase(test.TrialTestCase):
+ def setUp(self):
+ super(NovaFormatterTestCase, self).setUp()
+ self.flags(logging_context_format_string="HAS CONTEXT "\
+ "[%(request_id)s]: %(message)s",
+ logging_default_format_string="NOCTXT: %(message)s",
+ logging_debug_format_suffix="--DBG")
+ self.log = log.logging.root
+ self.stream = cStringIO.StringIO()
+ handler = log.StreamHandler(self.stream)
+ self.log.addHandler(handler)
+ self.log.setLevel(log.DEBUG)
+
+ def tearDown(self):
+ super(NovaFormatterTestCase, self).tearDown()
+ log.NovaLogger.manager.loggerDict = {}
+
+ def test_uncontextualized_log(self):
+ self.log.info("foo")
+ self.assertEqual("NOCTXT: foo\n", self.stream.getvalue())
+
+ def test_contextualized_log(self):
+ ctxt = _fake_context()
+ self.log.info("bar", context=ctxt)
+ expected = "HAS CONTEXT [%s]: bar\n" % ctxt.request_id
+ self.assertEqual(expected, self.stream.getvalue())
+
+ def test_debugging_log(self):
+ self.log.debug("baz")
+ self.assertEqual("NOCTXT: baz --DBG\n", self.stream.getvalue())
+
+
+class NovaLoggerTestCase(test.TrialTestCase):
+ def setUp(self):
+ super(NovaLoggerTestCase, self).setUp()
+ self.flags(default_log_levels=["nova-test=AUDIT"], verbose=False)
+ self.log = log.getLogger('nova-test')
+
+ def tearDown(self):
+ super(NovaLoggerTestCase, self).tearDown()
+ log.NovaLogger.manager.loggerDict = {}
+
+ def test_has_level_from_flags(self):
+ self.assertEqual(log.AUDIT, self.log.level)
+
+ def test_child_log_has_level_of_parent_flag(self):
+ l = log.getLogger('nova-test.foo')
+ self.assertEqual(log.AUDIT, l.level)
+
+
+class VerboseLoggerTestCase(test.TrialTestCase):
+ def setUp(self):
+ super(VerboseLoggerTestCase, self).setUp()
+ self.flags(default_log_levels=["nova.test=AUDIT"], verbose=True)
+ self.log = log.getLogger('nova.test')
+
+ def tearDown(self):
+ super(VerboseLoggerTestCase, self).tearDown()
+ log.NovaLogger.manager.loggerDict = {}
+
+ def test_will_be_verbose_if_named_nova_and_verbose_flag_set(self):
+ self.assertEqual(log.DEBUG, self.log.level)
diff --git a/nova/tests/test_network.py b/nova/tests/test_network.py
index 32da6be90..407dbc769 100644
--- a/nova/tests/test_network.py
+++ b/nova/tests/test_network.py
@@ -20,18 +20,18 @@ Unit Tests for network code
"""
import IPy
import os
-import logging
from nova import context
from nova import db
from nova import exception
from nova import flags
-from nova import service
+from nova import log as logging
from nova import test
from nova import utils
from nova.auth import manager
FLAGS = flags.FLAGS
+LOG = logging.getLogger('nova.tests.network')
class NetworkTestCase(test.TestCase):
@@ -45,7 +45,6 @@ class NetworkTestCase(test.TestCase):
fake_network=True,
network_size=16,
num_networks=5)
- logging.getLogger().setLevel(logging.DEBUG)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('netuser', 'netuser', 'netuser')
self.projects = []
@@ -349,7 +348,7 @@ def lease_ip(private_ip):
'TESTING': '1',
'FLAGFILE': FLAGS.dhcpbridge_flagfile}
(out, err) = utils.execute(cmd, addl_env=env)
- logging.debug("ISSUE_IP: %s, %s ", out, err)
+ LOG.debug("ISSUE_IP: %s, %s ", out, err)
def release_ip(private_ip):
@@ -365,4 +364,4 @@ def release_ip(private_ip):
'TESTING': '1',
'FLAGFILE': FLAGS.dhcpbridge_flagfile}
(out, err) = utils.execute(cmd, addl_env=env)
- logging.debug("RELEASE_IP: %s, %s ", out, err)
+ LOG.debug("RELEASE_IP: %s, %s ", out, err)
diff --git a/nova/tests/test_quota.py b/nova/tests/test_quota.py
index 8cf2a5e54..9548a8c13 100644
--- a/nova/tests/test_quota.py
+++ b/nova/tests/test_quota.py
@@ -16,17 +16,15 @@
# License for the specific language governing permissions and limitations
# under the License.
-import logging
-
from nova import context
from nova import db
-from nova import exception
from nova import flags
from nova import quota
from nova import test
from nova import utils
from nova.auth import manager
from nova.api.ec2 import cloud
+from nova.compute import instance_types
FLAGS = flags.FLAGS
@@ -34,7 +32,6 @@ FLAGS = flags.FLAGS
class QuotaTestCase(test.TestCase):
def setUp(self):
- logging.getLogger().setLevel(logging.DEBUG)
super(QuotaTestCase, self).setUp()
self.flags(connection_type='fake',
quota_instances=2,
@@ -78,14 +75,17 @@ class QuotaTestCase(test.TestCase):
def test_quota_overrides(self):
"""Make sure overriding a projects quotas works"""
- num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
+ num_instances = quota.allowed_instances(self.context, 100,
+ instance_types.INSTANCE_TYPES['m1.small'])
self.assertEqual(num_instances, 2)
db.quota_create(self.context, {'project_id': self.project.id,
'instances': 10})
- num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
+ num_instances = quota.allowed_instances(self.context, 100,
+ instance_types.INSTANCE_TYPES['m1.small'])
self.assertEqual(num_instances, 4)
db.quota_update(self.context, self.project.id, {'cores': 100})
- num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
+ num_instances = quota.allowed_instances(self.context, 100,
+ instance_types.INSTANCE_TYPES['m1.small'])
self.assertEqual(num_instances, 10)
db.quota_destroy(self.context, self.project.id)
diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py
index 6ea2edcab..85593ab46 100644
--- a/nova/tests/test_rpc.py
+++ b/nova/tests/test_rpc.py
@@ -18,15 +18,16 @@
"""
Unit Tests for remote procedure calls using queue
"""
-import logging
from nova import context
from nova import flags
+from nova import log as logging
from nova import rpc
from nova import test
FLAGS = flags.FLAGS
+LOG = logging.getLogger('nova.tests.rpc')
class RpcTestCase(test.TestCase):
@@ -85,12 +86,12 @@ class RpcTestCase(test.TestCase):
@staticmethod
def echo(context, queue, value):
"""Calls echo in the passed queue"""
- logging.debug("Nested received %s, %s", queue, value)
+ LOG.debug(_("Nested received %s, %s"), queue, value)
ret = rpc.call(context,
queue,
{"method": "echo",
"args": {"value": value}})
- logging.debug("Nested return %s", ret)
+ LOG.debug(_("Nested return %s"), ret)
return value
nested = Nested()
@@ -115,13 +116,13 @@ class TestReceiver(object):
@staticmethod
def echo(context, value):
"""Simply returns whatever value is sent in"""
- logging.debug("Received %s", value)
+ LOG.debug(_("Received %s"), value)
return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context"""
- logging.debug("Received %s", context)
+ LOG.debug(_("Received %s"), context)
return context.to_dict()
@staticmethod
diff --git a/nova/tests/test_virt.py b/nova/tests/test_virt.py
index 4aa489d08..afdc89ba2 100644
--- a/nova/tests/test_virt.py
+++ b/nova/tests/test_virt.py
@@ -208,8 +208,141 @@ class LibvirtConnTestCase(test.TestCase):
self.manager.delete_user(self.user)
-class NWFilterTestCase(test.TestCase):
+class IptablesFirewallTestCase(test.TestCase):
+ def setUp(self):
+ super(IptablesFirewallTestCase, self).setUp()
+ self.manager = manager.AuthManager()
+ self.user = self.manager.create_user('fake', 'fake', 'fake',
+ admin=True)
+ self.project = self.manager.create_project('fake', 'fake', 'fake')
+ self.context = context.RequestContext('fake', 'fake')
+ self.network = utils.import_object(FLAGS.network_manager)
+ self.fw = libvirt_conn.IptablesFirewallDriver()
+
+ def tearDown(self):
+ self.manager.delete_project(self.project)
+ self.manager.delete_user(self.user)
+ super(IptablesFirewallTestCase, self).tearDown()
+
+ def _p(self, *args, **kwargs):
+ if 'iptables-restore' in args:
+ print ' '.join(args), kwargs['stdin']
+ if 'iptables-save' in args:
+ return
+
+ in_rules = [
+ '# Generated by iptables-save v1.4.4 on Mon Dec 6 11:54:13 2010',
+ '*filter',
+ ':INPUT ACCEPT [969615:281627771]',
+ ':FORWARD ACCEPT [0:0]',
+ ':OUTPUT ACCEPT [915599:63811649]',
+ ':nova-block-ipv4 - [0:0]',
+ '-A INPUT -i virbr0 -p udp -m udp --dport 53 -j ACCEPT ',
+ '-A INPUT -i virbr0 -p tcp -m tcp --dport 53 -j ACCEPT ',
+ '-A INPUT -i virbr0 -p udp -m udp --dport 67 -j ACCEPT ',
+ '-A INPUT -i virbr0 -p tcp -m tcp --dport 67 -j ACCEPT ',
+ '-A FORWARD -d 192.168.122.0/24 -o virbr0 -m state --state RELATED'
+ ',ESTABLISHED -j ACCEPT ',
+ '-A FORWARD -s 192.168.122.0/24 -i virbr0 -j ACCEPT ',
+ '-A FORWARD -i virbr0 -o virbr0 -j ACCEPT ',
+ '-A FORWARD -o virbr0 -j REJECT --reject-with icmp-port-unreachable ',
+ '-A FORWARD -i virbr0 -j REJECT --reject-with icmp-port-unreachable ',
+ 'COMMIT',
+ '# Completed on Mon Dec 6 11:54:13 2010',
+ ]
+
+ def test_static_filters(self):
+ self.fw.execute = self._p
+ instance_ref = db.instance_create(self.context,
+ {'user_id': 'fake',
+ 'project_id': 'fake'})
+ ip = '10.11.12.13'
+
+ network_ref = db.project_get_network(self.context,
+ 'fake')
+
+ fixed_ip = {'address': ip,
+ 'network_id': network_ref['id']}
+
+ admin_ctxt = context.get_admin_context()
+ db.fixed_ip_create(admin_ctxt, fixed_ip)
+ db.fixed_ip_update(admin_ctxt, ip, {'allocated': True,
+ 'instance_id': instance_ref['id']})
+
+ secgroup = db.security_group_create(admin_ctxt,
+ {'user_id': 'fake',
+ 'project_id': 'fake',
+ 'name': 'testgroup',
+ 'description': 'test group'})
+
+ db.security_group_rule_create(admin_ctxt,
+ {'parent_group_id': secgroup['id'],
+ 'protocol': 'icmp',
+ 'from_port': -1,
+ 'to_port': -1,
+ 'cidr': '192.168.11.0/24'})
+
+ db.security_group_rule_create(admin_ctxt,
+ {'parent_group_id': secgroup['id'],
+ 'protocol': 'icmp',
+ 'from_port': 8,
+ 'to_port': -1,
+ 'cidr': '192.168.11.0/24'})
+
+ db.security_group_rule_create(admin_ctxt,
+ {'parent_group_id': secgroup['id'],
+ 'protocol': 'tcp',
+ 'from_port': 80,
+ 'to_port': 81,
+ 'cidr': '192.168.10.0/24'})
+
+ db.instance_add_security_group(admin_ctxt, instance_ref['id'],
+ secgroup['id'])
+ instance_ref = db.instance_get(admin_ctxt, instance_ref['id'])
+
+ self.fw.add_instance(instance_ref)
+
+ out_rules = self.fw.modify_rules(self.in_rules)
+
+ in_rules = filter(lambda l: not l.startswith('#'), self.in_rules)
+ for rule in in_rules:
+ if not 'nova' in rule:
+ self.assertTrue(rule in out_rules,
+ 'Rule went missing: %s' % rule)
+
+ instance_chain = None
+ for rule in out_rules:
+ # This is pretty crude, but it'll do for now
+ if '-d 10.11.12.13 -j' in rule:
+ instance_chain = rule.split(' ')[-1]
+ break
+ self.assertTrue(instance_chain, "The instance chain wasn't added")
+
+ security_group_chain = None
+ for rule in out_rules:
+ # This is pretty crude, but it'll do for now
+ if '-A %s -j' % instance_chain in rule:
+ security_group_chain = rule.split(' ')[-1]
+ break
+ self.assertTrue(security_group_chain,
+ "The security group chain wasn't added")
+
+ self.assertTrue('-A %s -p icmp -s 192.168.11.0/24 -j ACCEPT' % \
+ security_group_chain in out_rules,
+ "ICMP acceptance rule wasn't added")
+
+ self.assertTrue('-A %s -p icmp -s 192.168.11.0/24 -m icmp --icmp-type'
+ ' 8 -j ACCEPT' % security_group_chain in out_rules,
+ "ICMP Echo Request acceptance rule wasn't added")
+
+ self.assertTrue('-A %s -p tcp -s 192.168.10.0/24 -m multiport '
+ '--dports 80:81 -j ACCEPT' % security_group_chain \
+ in out_rules,
+ "TCP port 80/81 acceptance rule wasn't added")
+
+
+class NWFilterTestCase(test.TestCase):
def setUp(self):
super(NWFilterTestCase, self).setUp()
@@ -224,7 +357,8 @@ class NWFilterTestCase(test.TestCase):
self.fake_libvirt_connection = Mock()
- self.fw = libvirt_conn.NWFilterFirewall(self.fake_libvirt_connection)
+ self.fw = libvirt_conn.NWFilterFirewall(
+ lambda: self.fake_libvirt_connection)
def tearDown(self):
self.manager.delete_project(self.project)
@@ -337,7 +471,7 @@ class NWFilterTestCase(test.TestCase):
self.security_group.id)
instance = db.instance_get(self.context, inst_id)
- self.fw.setup_base_nwfilters()
- self.fw.setup_nwfilters_for_instance(instance)
+ self.fw.setup_basic_filtering(instance)
+ self.fw.prepare_instance_filter(instance)
_ensure_all_called()
self.teardown_security_group()
diff --git a/nova/tests/test_volume.py b/nova/tests/test_volume.py
index b13455fb0..b40ca004b 100644
--- a/nova/tests/test_volume.py
+++ b/nova/tests/test_volume.py
@@ -19,23 +19,23 @@
Tests for Volume Code.
"""
-import logging
from nova import context
from nova import exception
from nova import db
from nova import flags
+from nova import log as logging
from nova import test
from nova import utils
FLAGS = flags.FLAGS
+LOG = logging.getLogger('nova.tests.volume')
class VolumeTestCase(test.TestCase):
"""Test Case for volumes."""
def setUp(self):
- logging.getLogger().setLevel(logging.DEBUG)
super(VolumeTestCase, self).setUp()
self.compute = utils.import_object(FLAGS.compute_manager)
self.flags(connection_type='fake')
@@ -159,7 +159,7 @@ class VolumeTestCase(test.TestCase):
volume_id)
self.assert_(iscsi_target not in targets)
targets.append(iscsi_target)
- logging.debug("Target %s allocated", iscsi_target)
+ LOG.debug(_("Target %s allocated"), iscsi_target)
total_slots = FLAGS.iscsi_num_targets
for _index in xrange(total_slots):
volume_id = self._create_volume()
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index 33571dad0..ec9462ada 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -79,8 +79,8 @@ class XenAPIVolumeTestCase(test.TestCase):
helper = volume_utils.VolumeHelper
helper.XenAPI = session.get_imported_xenapi()
vol = self._create_volume()
- info = helper.parse_volume_info(vol['ec2_id'], '/dev/sdc')
- label = 'SR-%s' % vol['ec2_id']
+ info = helper.parse_volume_info(vol['id'], '/dev/sdc')
+ label = 'SR-%s' % vol['id']
description = 'Test-SR'
sr_ref = helper.create_iscsi_storage(session, info, label, description)
srs = xenapi_fake.get_all('SR')
@@ -97,7 +97,7 @@ class XenAPIVolumeTestCase(test.TestCase):
# oops, wrong mount point!
self.assertRaises(volume_utils.StorageError,
helper.parse_volume_info,
- vol['ec2_id'],
+ vol['id'],
'/dev/sd')
db.volume_destroy(context.get_admin_context(), vol['id'])
@@ -108,8 +108,7 @@ class XenAPIVolumeTestCase(test.TestCase):
volume = self._create_volume()
instance = db.instance_create(self.values)
xenapi_fake.create_vm(instance.name, 'Running')
- result = conn.attach_volume(instance.name, volume['ec2_id'],
- '/dev/sdc')
+ result = conn.attach_volume(instance.name, volume['id'], '/dev/sdc')
def check():
# check that the VM has a VBD attached to it
@@ -134,7 +133,7 @@ class XenAPIVolumeTestCase(test.TestCase):
self.assertRaises(Exception,
conn.attach_volume,
instance.name,
- volume['ec2_id'],
+ volume['id'],
'/dev/sdc')
def tearDown(self):
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
index 55f751f11..292bd9ba9 100644
--- a/nova/tests/xenapi/stubs.py
+++ b/nova/tests/xenapi/stubs.py
@@ -41,9 +41,33 @@ def stubout_instance_snapshot(stubs):
rv = done.wait()
return rv
+ def fake_loop(self):
+ pass
+
stubs.Set(xenapi_conn.XenAPISession, 'wait_for_task',
fake_wait_for_task)
+ stubs.Set(xenapi_conn.XenAPISession, '_stop_loop', fake_loop)
+
+ from nova.virt.xenapi.fake import create_vdi
+ name_label = "instance-%s" % instance_id
+ #TODO: create fake SR record
+ sr_ref = "fakesr"
+ vdi_ref = create_vdi(name_label=name_label, read_only=False,
+ sr_ref=sr_ref, sharable=False)
+ vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
+ vdi_uuid = vdi_rec['uuid']
+ return vdi_uuid
+
+ stubs.Set(vm_utils.VMHelper, 'fetch_image', fake_fetch_image)
+
+ def fake_parse_xmlrpc_value(val):
+ return val
+
+ stubs.Set(xenapi_conn, '_parse_xmlrpc_value', fake_parse_xmlrpc_value)
+
+ def fake_wait_for_vhd_coalesce(session, instance_id, sr_ref, vdi_ref,
+ original_parent_uuid):
from nova.virt.xenapi.fake import create_vdi
name_label = "instance-%s" % instance_id
#TODO: create fake SR record