summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/api/openstack/test_servers.py37
-rw-r--r--nova/tests/auth_unittest.py27
-rw-r--r--nova/tests/cloud_unittest.py30
-rw-r--r--nova/tests/compute_unittest.py9
-rw-r--r--nova/tests/middleware_unittest.py86
-rw-r--r--nova/tests/network_unittest.py8
-rw-r--r--nova/tests/rpc_unittest.py29
-rw-r--r--nova/tests/scheduler_unittest.py1
-rw-r--r--nova/tests/virt_unittest.py145
9 files changed, 303 insertions, 69 deletions
diff --git a/nova/tests/api/openstack/test_servers.py b/nova/tests/api/openstack/test_servers.py
index 8444b6fce..3820f5f27 100644
--- a/nova/tests/api/openstack/test_servers.py
+++ b/nova/tests/api/openstack/test_servers.py
@@ -56,11 +56,16 @@ def instance_address(context, instance_id):
def stub_instance(id, user_id=1):
- return Instance(id=id + 123456, state=0, image_id=10, user_id=user_id,
+ return Instance(id=int(id) + 123456, state=0, image_id=10, user_id=user_id,
display_name='server%s' % id, internal_id=id)
+def fake_compute_api(cls, req, id):
+ return True
+
+
class ServersTest(unittest.TestCase):
+
def setUp(self):
self.stubs = stubout.StubOutForTesting()
fakes.FakeAuthManager.auth_data = {}
@@ -82,9 +87,15 @@ class ServersTest(unittest.TestCase):
instance_address)
self.stubs.Set(nova.db.api, 'instance_get_floating_address',
instance_address)
+ self.stubs.Set(nova.compute.api.ComputeAPI, 'pause',
+ fake_compute_api)
+ self.stubs.Set(nova.compute.api.ComputeAPI, 'unpause',
+ fake_compute_api)
+ self.allow_admin = FLAGS.allow_admin_api
def tearDown(self):
self.stubs.UnsetAll()
+ FLAGS.allow_admin_api = self.allow_admin
def test_get_server_by_id(self):
req = webob.Request.blank('/v1.0/servers/1')
@@ -211,6 +222,30 @@ class ServersTest(unittest.TestCase):
self.assertEqual(s['imageId'], 10)
i += 1
+ def test_server_pause(self):
+ FLAGS.allow_admin_api = True
+ body = dict(server=dict(
+ name='server_test', imageId=2, flavorId=2, metadata={},
+ personality={}))
+ req = webob.Request.blank('/v1.0/servers/1/pause')
+ req.method = 'POST'
+ req.content_type = 'application/json'
+ req.body = json.dumps(body)
+ res = req.get_response(nova.api.API('os'))
+ self.assertEqual(res.status_int, 202)
+
+ def test_server_unpause(self):
+ FLAGS.allow_admin_api = True
+ body = dict(server=dict(
+ name='server_test', imageId=2, flavorId=2, metadata={},
+ personality={}))
+ req = webob.Request.blank('/v1.0/servers/1/unpause')
+ req.method = 'POST'
+ req.content_type = 'application/json'
+ req.body = json.dumps(body)
+ res = req.get_response(nova.api.API('os'))
+ self.assertEqual(res.status_int, 202)
+
def test_server_reboot(self):
body = dict(server=dict(
name='server_test', imageId=2, flavorId=2, metadata={},
diff --git a/nova/tests/auth_unittest.py b/nova/tests/auth_unittest.py
index 4508d6721..15d40bc53 100644
--- a/nova/tests/auth_unittest.py
+++ b/nova/tests/auth_unittest.py
@@ -208,17 +208,13 @@ class AuthManagerTestCase(object):
# so it probably belongs in crypto_unittest
# but I'm leaving it where I found it.
with user_and_project_generator(self.manager) as (user, project):
- # NOTE(todd): Should mention why we must setup controller first
- # (somebody please clue me in)
- cloud_controller = cloud.CloudController()
- cloud_controller.setup()
- _key, cert_str = self.manager._generate_x509_cert('test1',
- 'testproj')
+ # NOTE(vish): Setup runs genroot.sh if it hasn't been run
+ cloud.CloudController().setup()
+ _key, cert_str = crypto.generate_x509_cert(user.id, project.id)
logging.debug(cert_str)
- # Need to verify that it's signed by the right intermediate CA
- full_chain = crypto.fetch_ca(project_id='testproj', chain=True)
- int_cert = crypto.fetch_ca(project_id='testproj', chain=False)
+ full_chain = crypto.fetch_ca(project_id=project.id, chain=True)
+ int_cert = crypto.fetch_ca(project_id=project.id, chain=False)
cloud_cert = crypto.fetch_ca()
logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain)
signed_cert = X509.load_cert_string(cert_str)
@@ -227,7 +223,8 @@ class AuthManagerTestCase(object):
cloud_cert = X509.load_cert_string(cloud_cert)
self.assertTrue(signed_cert.verify(chain_cert.get_pubkey()))
self.assertTrue(signed_cert.verify(int_cert.get_pubkey()))
- if not FLAGS.use_intermediate_ca:
+
+ if not FLAGS.use_project_ca:
self.assertTrue(signed_cert.verify(cloud_cert.get_pubkey()))
else:
self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey()))
@@ -333,14 +330,10 @@ class AuthManagerLdapTestCase(AuthManagerTestCase, test.TestCase):
AuthManagerTestCase.__init__(self)
test.TestCase.__init__(self, *args, **kwargs)
import nova.auth.fakeldap as fakeldap
- FLAGS.redis_db = 8
if FLAGS.flush_db:
- logging.info("Flushing redis datastore")
- try:
- r = fakeldap.Redis.instance()
- r.flushdb()
- except:
- self.skip = True
+ logging.info("Flushing datastore")
+ r = fakeldap.Store.instance()
+ r.flushdb()
class AuthManagerDbTestCase(AuthManagerTestCase, test.TestCase):
diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py
index 53a762310..70d2c44da 100644
--- a/nova/tests/cloud_unittest.py
+++ b/nova/tests/cloud_unittest.py
@@ -22,20 +22,18 @@ import logging
from M2Crypto import BIO
from M2Crypto import RSA
import os
-import StringIO
import tempfile
import time
from eventlet import greenthread
-from xml.etree import ElementTree
from nova import context
from nova import crypto
from nova import db
from nova import flags
from nova import rpc
+from nova import service
from nova import test
-from nova import utils
from nova.auth import manager
from nova.compute import power_state
from nova.api.ec2 import cloud
@@ -54,7 +52,8 @@ os.makedirs(IMAGES_PATH)
class CloudTestCase(test.TestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
- self.flags(connection_type='fake', images_path=IMAGES_PATH)
+ self.flags(connection_type='fake',
+ images_path=IMAGES_PATH)
self.conn = rpc.Connection.instance()
logging.getLogger().setLevel(logging.DEBUG)
@@ -62,27 +61,23 @@ class CloudTestCase(test.TestCase):
# set up our cloud
self.cloud = cloud.CloudController()
- # set up a service
- self.compute = utils.import_object(FLAGS.compute_manager)
- self.compute_consumer = rpc.AdapterConsumer(connection=self.conn,
- topic=FLAGS.compute_topic,
- proxy=self.compute)
- self.compute_consumer.attach_to_eventlet()
- self.network = utils.import_object(FLAGS.network_manager)
- self.network_consumer = rpc.AdapterConsumer(connection=self.conn,
- topic=FLAGS.network_topic,
- proxy=self.network)
- self.network_consumer.attach_to_eventlet()
+ # set up services
+ self.compute = service.Service.create(binary='nova-compute')
+ self.compute.start()
+ self.network = service.Service.create(binary='nova-network')
+ self.network.start()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('admin', 'admin', 'admin', True)
self.project = self.manager.create_project('proj', 'admin', 'proj')
self.context = context.RequestContext(user=self.user,
- project=self.project)
+ project=self.project)
def tearDown(self):
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
+ self.compute.kill()
+ self.network.kill()
super(CloudTestCase, self).tearDown()
def _create_key(self, name):
@@ -109,12 +104,13 @@ class CloudTestCase(test.TestCase):
{'address': address,
'host': FLAGS.host})
self.cloud.allocate_address(self.context)
- inst = db.instance_create(self.context, {})
+ inst = db.instance_create(self.context, {'host': FLAGS.host})
fixed = self.network.allocate_fixed_ip(self.context, inst['id'])
ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id'])
self.cloud.associate_address(self.context,
instance_id=ec2_id,
public_ip=address)
+ greenthread.sleep(0.3)
self.cloud.disassociate_address(self.context,
public_ip=address)
self.cloud.release_address(self.context,
diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py
index c6353d357..348bb3351 100644
--- a/nova/tests/compute_unittest.py
+++ b/nova/tests/compute_unittest.py
@@ -41,6 +41,7 @@ class ComputeTestCase(test.TestCase):
logging.getLogger().setLevel(logging.DEBUG)
super(ComputeTestCase, self).setUp()
self.flags(connection_type='fake',
+ stub_network=True,
network_manager='nova.network.manager.FlatManager')
self.compute = utils.import_object(FLAGS.compute_manager)
self.compute_api = compute_api.ComputeAPI()
@@ -127,6 +128,14 @@ class ComputeTestCase(test.TestCase):
self.assert_(instance_ref['launched_at'] < terminate)
self.assert_(instance_ref['deleted_at'] > terminate)
+ def test_pause(self):
+ """Ensure instance can be paused"""
+ instance_id = self._create_instance()
+ self.compute.run_instance(self.context, instance_id)
+ self.compute.pause_instance(self.context, instance_id)
+ self.compute.unpause_instance(self.context, instance_id)
+ self.compute.terminate_instance(self.context, instance_id)
+
def test_reboot(self):
"""Ensure instance can be rebooted"""
instance_id = self._create_instance()
diff --git a/nova/tests/middleware_unittest.py b/nova/tests/middleware_unittest.py
new file mode 100644
index 000000000..0febf52d6
--- /dev/null
+++ b/nova/tests/middleware_unittest.py
@@ -0,0 +1,86 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import webob
+import webob.dec
+import webob.exc
+
+from nova.api import ec2
+from nova import flags
+from nova import test
+from nova import utils
+
+
+FLAGS = flags.FLAGS
+
+
+@webob.dec.wsgify
+def conditional_forbid(req):
+ """Helper wsgi app returns 403 if param 'die' is 1."""
+ if 'die' in req.params and req.params['die'] == '1':
+ raise webob.exc.HTTPForbidden()
+ return 'OK'
+
+
+class LockoutTestCase(test.TrialTestCase):
+ """Test case for the Lockout middleware."""
+ def setUp(self): # pylint: disable-msg=C0103
+ super(LockoutTestCase, self).setUp()
+ utils.set_time_override()
+ self.lockout = ec2.Lockout(conditional_forbid)
+
+ def tearDown(self): # pylint: disable-msg=C0103
+ utils.clear_time_override()
+ super(LockoutTestCase, self).tearDown()
+
+ def _send_bad_attempts(self, access_key, num_attempts=1):
+ """Fail x."""
+ for i in xrange(num_attempts):
+ req = webob.Request.blank('/?AWSAccessKeyId=%s&die=1' % access_key)
+ self.assertEqual(req.get_response(self.lockout).status_int, 403)
+
+ def _is_locked_out(self, access_key):
+ """Sends a test request to see if key is locked out."""
+ req = webob.Request.blank('/?AWSAccessKeyId=%s' % access_key)
+ return (req.get_response(self.lockout).status_int == 403)
+
+ def test_lockout(self):
+ self._send_bad_attempts('test', FLAGS.lockout_attempts)
+ self.assertTrue(self._is_locked_out('test'))
+
+ def test_timeout(self):
+ self._send_bad_attempts('test', FLAGS.lockout_attempts)
+ self.assertTrue(self._is_locked_out('test'))
+ utils.advance_time_seconds(FLAGS.lockout_minutes * 60)
+ self.assertFalse(self._is_locked_out('test'))
+
+ def test_multiple_keys(self):
+ self._send_bad_attempts('test1', FLAGS.lockout_attempts)
+ self.assertTrue(self._is_locked_out('test1'))
+ self.assertFalse(self._is_locked_out('test2'))
+ utils.advance_time_seconds(FLAGS.lockout_minutes * 60)
+ self.assertFalse(self._is_locked_out('test1'))
+ self.assertFalse(self._is_locked_out('test2'))
+
+ def test_window_timeout(self):
+ self._send_bad_attempts('test', FLAGS.lockout_attempts - 1)
+ self.assertFalse(self._is_locked_out('test'))
+ utils.advance_time_seconds(FLAGS.lockout_window * 60)
+ self._send_bad_attempts('test', FLAGS.lockout_attempts - 1)
+ self.assertFalse(self._is_locked_out('test'))
diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py
index bcac20585..96473ac7c 100644
--- a/nova/tests/network_unittest.py
+++ b/nova/tests/network_unittest.py
@@ -26,6 +26,7 @@ from nova import context
from nova import db
from nova import exception
from nova import flags
+from nova import service
from nova import test
from nova import utils
from nova.auth import manager
@@ -40,6 +41,7 @@ class NetworkTestCase(test.TestCase):
# NOTE(vish): if you change these flags, make sure to change the
# flags in the corresponding section in nova-dhcpbridge
self.flags(connection_type='fake',
+ fake_call=True,
fake_network=True,
network_size=16,
num_networks=5)
@@ -56,16 +58,13 @@ class NetworkTestCase(test.TestCase):
# create the necessary network data for the project
user_context = context.RequestContext(project=self.projects[i],
user=self.user)
- network_ref = self.network.get_network(user_context)
- self.network.set_network_host(context.get_admin_context(),
- network_ref['id'])
+ host = self.network.get_network_host(user_context.elevated())
instance_ref = self._create_instance(0)
self.instance_id = instance_ref['id']
instance_ref = self._create_instance(1)
self.instance2_id = instance_ref['id']
def tearDown(self):
- super(NetworkTestCase, self).tearDown()
# TODO(termie): this should really be instantiating clean datastores
# in between runs, one failure kills all the tests
db.instance_destroy(context.get_admin_context(), self.instance_id)
@@ -73,6 +72,7 @@ class NetworkTestCase(test.TestCase):
for project in self.projects:
self.manager.delete_project(project)
self.manager.delete_user(self.user)
+ super(NetworkTestCase, self).tearDown()
def _create_instance(self, project_num, mac=None):
if not mac:
diff --git a/nova/tests/rpc_unittest.py b/nova/tests/rpc_unittest.py
index a2495e65a..6ea2edcab 100644
--- a/nova/tests/rpc_unittest.py
+++ b/nova/tests/rpc_unittest.py
@@ -33,7 +33,7 @@ class RpcTestCase(test.TestCase):
"""Test cases for rpc"""
def setUp(self):
super(RpcTestCase, self).setUp()
- self.conn = rpc.Connection.instance()
+ self.conn = rpc.Connection.instance(True)
self.receiver = TestReceiver()
self.consumer = rpc.AdapterConsumer(connection=self.conn,
topic='test',
@@ -79,6 +79,33 @@ class RpcTestCase(test.TestCase):
except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value)
+ def test_nested_calls(self):
+ """Test that we can do an rpc.call inside another call"""
+ class Nested(object):
+ @staticmethod
+ def echo(context, queue, value):
+ """Calls echo in the passed queue"""
+ logging.debug("Nested received %s, %s", queue, value)
+ ret = rpc.call(context,
+ queue,
+ {"method": "echo",
+ "args": {"value": value}})
+ logging.debug("Nested return %s", ret)
+ return value
+
+ nested = Nested()
+ conn = rpc.Connection.instance(True)
+ consumer = rpc.AdapterConsumer(connection=conn,
+ topic='nested',
+ proxy=nested)
+ consumer.attach_to_eventlet()
+ value = 42
+ result = rpc.call(self.context,
+ 'nested', {"method": "echo",
+ "args": {"queue": "test",
+ "value": value}})
+ self.assertEqual(value, result)
+
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call
diff --git a/nova/tests/scheduler_unittest.py b/nova/tests/scheduler_unittest.py
index d1756b8fb..df5e7afa5 100644
--- a/nova/tests/scheduler_unittest.py
+++ b/nova/tests/scheduler_unittest.py
@@ -78,6 +78,7 @@ class SimpleDriverTestCase(test.TestCase):
def setUp(self):
super(SimpleDriverTestCase, self).setUp()
self.flags(connection_type='fake',
+ stub_network=True,
max_cores=4,
max_gigabytes=4,
network_manager='nova.network.manager.FlatManager',
diff --git a/nova/tests/virt_unittest.py b/nova/tests/virt_unittest.py
index 85e569858..9ad009510 100644
--- a/nova/tests/virt_unittest.py
+++ b/nova/tests/virt_unittest.py
@@ -33,6 +33,7 @@ flags.DECLARE('instances_path', 'nova.compute.manager')
class LibvirtConnTestCase(test.TestCase):
def setUp(self):
super(LibvirtConnTestCase, self).setUp()
+ self.flags(fake_call=True)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
@@ -40,33 +41,66 @@ class LibvirtConnTestCase(test.TestCase):
self.network = utils.import_object(FLAGS.network_manager)
FLAGS.instances_path = ''
- def test_get_uri_and_template(self):
- ip = '10.11.12.13'
-
- instance = {'internal_id': 1,
- 'memory_kb': '1024000',
- 'basepath': '/some/path',
- 'bridge_name': 'br100',
- 'mac_address': '02:12:34:46:56:67',
- 'vcpus': 2,
- 'project_id': 'fake',
- 'bridge': 'br101',
- 'instance_type': 'm1.small'}
-
+ test_ip = '10.11.12.13'
+ test_instance = {'memory_kb': '1024000',
+ 'basepath': '/some/path',
+ 'bridge_name': 'br100',
+ 'mac_address': '02:12:34:46:56:67',
+ 'vcpus': 2,
+ 'project_id': 'fake',
+ 'bridge': 'br101',
+ 'instance_type': 'm1.small'}
+
+ def test_xml_and_uri_no_ramdisk_no_kernel(self):
+ instance_data = dict(self.test_instance)
+ self.do_test_xml_and_uri(instance_data,
+ expect_kernel=False, expect_ramdisk=False)
+
+ def test_xml_and_uri_no_ramdisk(self):
+ instance_data = dict(self.test_instance)
+ instance_data['kernel_id'] = 'aki-deadbeef'
+ self.do_test_xml_and_uri(instance_data,
+ expect_kernel=True, expect_ramdisk=False)
+
+ def test_xml_and_uri_no_kernel(self):
+ instance_data = dict(self.test_instance)
+ instance_data['ramdisk_id'] = 'ari-deadbeef'
+ self.do_test_xml_and_uri(instance_data,
+ expect_kernel=False, expect_ramdisk=False)
+
+ def test_xml_and_uri(self):
+ instance_data = dict(self.test_instance)
+ instance_data['ramdisk_id'] = 'ari-deadbeef'
+ instance_data['kernel_id'] = 'aki-deadbeef'
+ self.do_test_xml_and_uri(instance_data,
+ expect_kernel=True, expect_ramdisk=True)
+
+ def test_xml_and_uri_rescue(self):
+ instance_data = dict(self.test_instance)
+ instance_data['ramdisk_id'] = 'ari-deadbeef'
+ instance_data['kernel_id'] = 'aki-deadbeef'
+ self.do_test_xml_and_uri(instance_data,
+ expect_kernel=True, expect_ramdisk=True,
+ rescue=True)
+
+ def do_test_xml_and_uri(self, instance,
+ expect_ramdisk, expect_kernel,
+ rescue=False):
user_context = context.RequestContext(project=self.project,
user=self.user)
instance_ref = db.instance_create(user_context, instance)
- network_ref = self.network.get_network(user_context)
- self.network.set_network_host(context.get_admin_context(),
- network_ref['id'])
+ host = self.network.get_network_host(user_context.elevated())
+ network_ref = db.project_get_network(context.get_admin_context(),
+ self.project.id)
- fixed_ip = {'address': ip,
+ fixed_ip = {'address': self.test_ip,
'network_id': network_ref['id']}
ctxt = context.get_admin_context()
fixed_ip_ref = db.fixed_ip_create(ctxt, fixed_ip)
- db.fixed_ip_update(ctxt, ip, {'allocated': True,
- 'instance_id': instance_ref['id']})
+ db.fixed_ip_update(ctxt, self.test_ip,
+ {'allocated': True,
+ 'instance_id': instance_ref['id']})
type_uri_map = {'qemu': ('qemu:///system',
[(lambda t: t.find('.').get('type'), 'qemu'),
@@ -78,23 +112,73 @@ class LibvirtConnTestCase(test.TestCase):
(lambda t: t.find('./devices/emulator'), None)]),
'uml': ('uml:///system',
[(lambda t: t.find('.').get('type'), 'uml'),
- (lambda t: t.find('./os/type').text, 'uml')])}
+ (lambda t: t.find('./os/type').text, 'uml')]),
+ 'xen': ('xen:///',
+ [(lambda t: t.find('.').get('type'), 'xen'),
+ (lambda t: t.find('./os/type').text, 'linux')]),
+ }
+
+ for hypervisor_type in ['qemu', 'kvm', 'xen']:
+ check_list = type_uri_map[hypervisor_type][1]
+
+ if rescue:
+ check = (lambda t: t.find('./os/kernel').text.split('/')[1],
+ 'rescue-kernel')
+ check_list.append(check)
+ check = (lambda t: t.find('./os/initrd').text.split('/')[1],
+ 'rescue-ramdisk')
+ check_list.append(check)
+ else:
+ if expect_kernel:
+ check = (lambda t: t.find('./os/kernel').text.split(
+ '/')[1], 'kernel')
+ else:
+ check = (lambda t: t.find('./os/kernel'), None)
+ check_list.append(check)
+
+ if expect_ramdisk:
+ check = (lambda t: t.find('./os/initrd').text.split(
+ '/')[1], 'ramdisk')
+ else:
+ check = (lambda t: t.find('./os/initrd'), None)
+ check_list.append(check)
common_checks = [
(lambda t: t.find('.').tag, 'domain'),
- (lambda t: t.find('./devices/interface/filterref/parameter').\
- get('name'), 'IP'),
- (lambda t: t.find('./devices/interface/filterref/parameter').\
- get('value'), '10.11.12.13')]
+ (lambda t: t.find(
+ './devices/interface/filterref/parameter').get('name'), 'IP'),
+ (lambda t: t.find(
+ './devices/interface/filterref/parameter').get(
+ 'value'), '10.11.12.13'),
+ (lambda t: t.findall(
+ './devices/interface/filterref/parameter')[1].get(
+ 'name'), 'DHCPSERVER'),
+ (lambda t: t.findall(
+ './devices/interface/filterref/parameter')[1].get(
+ 'value'), '10.0.0.1'),
+ (lambda t: t.find('./devices/serial/source').get(
+ 'path').split('/')[1], 'console.log'),
+ (lambda t: t.find('./memory').text, '2097152')]
+
+ if rescue:
+ common_checks += [
+ (lambda t: t.findall('./devices/disk/source')[0].get(
+ 'file').split('/')[1], 'rescue-disk'),
+ (lambda t: t.findall('./devices/disk/source')[1].get(
+ 'file').split('/')[1], 'disk')]
+ else:
+ common_checks += [(lambda t: t.findall(
+ './devices/disk/source')[0].get('file').split('/')[1],
+ 'disk')]
for (libvirt_type, (expected_uri, checks)) in type_uri_map.iteritems():
FLAGS.libvirt_type = libvirt_type
conn = libvirt_conn.LibvirtConnection(True)
- uri, _template, _rescue = conn.get_uri_and_templates()
+ uri = conn.get_uri()
self.assertEquals(uri, expected_uri)
- xml = conn.to_xml(instance_ref)
+ xml = conn.to_xml(instance_ref, rescue)
tree = xml_to_tree(xml)
for i, (check, expected_result) in enumerate(checks):
self.assertEqual(check(tree),
@@ -106,6 +190,9 @@ class LibvirtConnTestCase(test.TestCase):
expected_result,
'%s failed common check %d' % (xml, i))
+ # This test is supposed to make sure we don't override a specifically
+ # set uri
+ #
# Deliberately not just assigning this string to FLAGS.libvirt_uri and
# checking against that later on. This way we make sure the
# implementation doesn't fiddle around with the FLAGS.
@@ -114,7 +201,7 @@ class LibvirtConnTestCase(test.TestCase):
for (libvirt_type, (expected_uri, checks)) in type_uri_map.iteritems():
FLAGS.libvirt_type = libvirt_type
conn = libvirt_conn.LibvirtConnection(True)
- uri, _template, _rescue = conn.get_uri_and_templates()
+ uri = conn.get_uri()
self.assertEquals(uri, testuri)
def tearDown(self):
@@ -252,7 +339,7 @@ class NWFilterTestCase(test.TestCase):
self.security_group.id)
instance = db.instance_get(self.context, inst_id)
- d = self.fw.setup_nwfilters_for_instance(instance)
+ self.fw.setup_base_nwfilters()
+ self.fw.setup_nwfilters_for_instance(instance)
_ensure_all_called()
self.teardown_security_group()
- return d