From 8d3099e1cfd8f5cc43d3549ecf839f6303b45600 Mon Sep 17 00:00:00 2001 From: Rick Harris Date: Tue, 21 May 2013 16:24:39 +0000 Subject: virt: Move generic virt tests to nova/tests/virt/ In the interest of better organization, this patch moves generic virt testing code into nova/tests/virt. Change-Id: I79473726a848f0abbcb13865efe30e02aea09ae5 --- nova/tests/test_driver.py | 60 --- nova/tests/test_virt.py | 139 ------ nova/tests/test_virt_disk.py | 219 --------- nova/tests/test_virt_disk_vfs_guestfs.py | 203 -------- nova/tests/test_virt_disk_vfs_localfs.py | 387 ---------------- nova/tests/test_virt_drivers.py | 642 -------------------------- nova/tests/virt/test_driver.py | 60 +++ nova/tests/virt/test_virt.py | 139 ++++++ nova/tests/virt/test_virt_disk.py | 219 +++++++++ nova/tests/virt/test_virt_disk_vfs_guestfs.py | 203 ++++++++ nova/tests/virt/test_virt_disk_vfs_localfs.py | 387 ++++++++++++++++ nova/tests/virt/test_virt_drivers.py | 642 ++++++++++++++++++++++++++ 12 files changed, 1650 insertions(+), 1650 deletions(-) delete mode 100644 nova/tests/test_driver.py delete mode 100644 nova/tests/test_virt.py delete mode 100644 nova/tests/test_virt_disk.py delete mode 100644 nova/tests/test_virt_disk_vfs_guestfs.py delete mode 100644 nova/tests/test_virt_disk_vfs_localfs.py delete mode 100644 nova/tests/test_virt_drivers.py create mode 100644 nova/tests/virt/test_driver.py create mode 100644 nova/tests/virt/test_virt.py create mode 100644 nova/tests/virt/test_virt_disk.py create mode 100644 nova/tests/virt/test_virt_disk_vfs_guestfs.py create mode 100644 nova/tests/virt/test_virt_disk_vfs_localfs.py create mode 100644 nova/tests/virt/test_virt_drivers.py diff --git a/nova/tests/test_driver.py b/nova/tests/test_driver.py deleted file mode 100644 index 6fd8b00b1..000000000 --- a/nova/tests/test_driver.py +++ /dev/null @@ -1,60 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (c) 2013 Citrix Systems, Inc. -# Copyright 2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from nova import test -from nova.virt import driver - - -class FakeDriver(object): - def __init__(self, *args, **kwargs): - self.args = args - self.kwargs = kwargs - - -class FakeDriver2(FakeDriver): - pass - - -class ToDriverRegistryTestCase(test.TestCase): - - def assertDriverInstance(self, inst, class_, *args, **kwargs): - self.assertEquals(class_, inst.__class__) - self.assertEquals(args, inst.args) - self.assertEquals(kwargs, inst.kwargs) - - def test_driver_dict_from_config(self): - drvs = driver.driver_dict_from_config( - [ - 'key1=nova.tests.test_driver.FakeDriver', - 'key2=nova.tests.test_driver.FakeDriver2', - ], 'arg1', 'arg2', param1='value1', param2='value2' - ) - - self.assertEquals( - sorted(['key1', 'key2']), - sorted(drvs.keys()) - ) - - self.assertDriverInstance( - drvs['key1'], - FakeDriver, 'arg1', 'arg2', param1='value1', - param2='value2') - - self.assertDriverInstance( - drvs['key2'], - FakeDriver2, 'arg1', 'arg2', param1='value1', - param2='value2') diff --git a/nova/tests/test_virt.py b/nova/tests/test_virt.py deleted file mode 100644 index 452277c54..000000000 --- a/nova/tests/test_virt.py +++ /dev/null @@ -1,139 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011 Isaku Yamahata -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os - -from nova import test -from nova import utils -from nova.virt.disk import api as disk_api -from nova.virt import driver - - -class TestVirtDriver(test.TestCase): - def test_block_device(self): - swap = {'device_name': '/dev/sdb', - 'swap_size': 1} - ephemerals = [{'num': 0, - 'virtual_name': 'ephemeral0', - 'device_name': '/dev/sdc1', - 'size': 1}] - block_device_mapping = [{'mount_device': '/dev/sde', - 'device_path': 'fake_device'}] - block_device_info = { - 'root_device_name': '/dev/sda', - 'swap': swap, - 'ephemerals': ephemerals, - 'block_device_mapping': block_device_mapping} - - empty_block_device_info = {} - - self.assertEqual( - driver.block_device_info_get_root(block_device_info), '/dev/sda') - self.assertEqual( - driver.block_device_info_get_root(empty_block_device_info), None) - self.assertEqual( - driver.block_device_info_get_root(None), None) - - self.assertEqual( - driver.block_device_info_get_swap(block_device_info), swap) - self.assertEqual(driver.block_device_info_get_swap( - empty_block_device_info)['device_name'], None) - self.assertEqual(driver.block_device_info_get_swap( - empty_block_device_info)['swap_size'], 0) - self.assertEqual( - driver.block_device_info_get_swap({'swap': None})['device_name'], - None) - self.assertEqual( - driver.block_device_info_get_swap({'swap': None})['swap_size'], - 0) - self.assertEqual( - driver.block_device_info_get_swap(None)['device_name'], None) - self.assertEqual( - driver.block_device_info_get_swap(None)['swap_size'], 0) - - self.assertEqual( - driver.block_device_info_get_ephemerals(block_device_info), - ephemerals) - self.assertEqual( - driver.block_device_info_get_ephemerals(empty_block_device_info), - []) - self.assertEqual( - driver.block_device_info_get_ephemerals(None), - []) - - def test_swap_is_usable(self): - self.assertFalse(driver.swap_is_usable(None)) - self.assertFalse(driver.swap_is_usable({'device_name': None})) - self.assertFalse(driver.swap_is_usable({'device_name': '/dev/sdb', - 'swap_size': 0})) - self.assertTrue(driver.swap_is_usable({'device_name': '/dev/sdb', - 'swap_size': 1})) - - -class TestVirtDisk(test.TestCase): - def setUp(self): - super(TestVirtDisk, self).setUp() - self.executes = [] - - def fake_execute(*cmd, **kwargs): - self.executes.append(cmd) - return None, None - - self.stubs.Set(utils, 'execute', fake_execute) - - def test_lxc_teardown_container(self): - - def proc_mounts(self, mount_point): - mount_points = { - '/mnt/loop/nopart': '/dev/loop0', - '/mnt/loop/part': '/dev/mapper/loop0p1', - '/mnt/nbd/nopart': '/dev/nbd15', - '/mnt/nbd/part': '/dev/mapper/nbd15p1', - } - return mount_points[mount_point] - - self.stubs.Set(os.path, 'exists', lambda _: True) - self.stubs.Set(disk_api._DiskImage, '_device_for_path', proc_mounts) - expected_commands = [] - - disk_api.teardown_container('/mnt/loop/nopart') - expected_commands += [ - ('umount', '/dev/loop0'), - ('losetup', '--detach', '/dev/loop0'), - ] - - disk_api.teardown_container('/mnt/loop/part') - expected_commands += [ - ('umount', '/dev/mapper/loop0p1'), - ('kpartx', '-d', '/dev/loop0'), - ('losetup', '--detach', '/dev/loop0'), - ] - - disk_api.teardown_container('/mnt/nbd/nopart') - expected_commands += [ - ('umount', '/dev/nbd15'), - ('qemu-nbd', '-d', '/dev/nbd15'), - ] - - disk_api.teardown_container('/mnt/nbd/part') - expected_commands += [ - ('umount', '/dev/mapper/nbd15p1'), - ('kpartx', '-d', '/dev/nbd15'), - ('qemu-nbd', '-d', '/dev/nbd15'), - ] - - self.assertEqual(self.executes, expected_commands) diff --git a/nova/tests/test_virt_disk.py b/nova/tests/test_virt_disk.py deleted file mode 100644 index 0c51e8267..000000000 --- a/nova/tests/test_virt_disk.py +++ /dev/null @@ -1,219 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright (C) 2012 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import sys - -from nova import exception -from nova import test -from nova.tests import fakeguestfs -from nova.virt.disk import api as diskapi -from nova.virt.disk.vfs import guestfs as vfsguestfs - - -class VirtDiskTest(test.TestCase): - - def setUp(self): - super(VirtDiskTest, self).setUp() - sys.modules['guestfs'] = fakeguestfs - vfsguestfs.guestfs = fakeguestfs - - def test_inject_data(self): - - self.assertTrue(diskapi.inject_data("/some/file", use_cow=True)) - - self.assertTrue(diskapi.inject_data("/some/file", - mandatory=('files',))) - - self.assertTrue(diskapi.inject_data("/some/file", key="mysshkey", - mandatory=('key',))) - - os_name = os.name - os.name = 'nt' # Cause password injection to fail - self.assertRaises(exception.NovaException, - diskapi.inject_data, - "/some/file", admin_password="p", - mandatory=('admin_password',)) - self.assertFalse(diskapi.inject_data("/some/file", admin_password="p")) - os.name = os_name - - def test_inject_data_key(self): - - vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") - vfs.setup() - - diskapi._inject_key_into_fs("mysshkey", vfs) - - self.assertTrue("/root/.ssh" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/root/.ssh"], - {'isdir': True, 'gid': 0, 'uid': 0, 'mode': 0700}) - self.assertTrue("/root/.ssh/authorized_keys" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/root/.ssh/authorized_keys"], - {'isdir': False, - 'content': "Hello World\n# The following ssh " + - "key was injected by Nova\nmysshkey\n", - 'gid': 100, - 'uid': 100, - 'mode': 0600}) - - vfs.teardown() - - def test_inject_data_key_with_selinux(self): - - vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") - vfs.setup() - - vfs.make_path("etc/selinux") - vfs.make_path("etc/rc.d") - diskapi._inject_key_into_fs("mysshkey", vfs) - - self.assertTrue("/etc/rc.d/rc.local" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/etc/rc.d/rc.local"], - {'isdir': False, - 'content': "Hello World#!/bin/sh\n# Added by " + - "Nova to ensure injected ssh keys " + - "have the right context\nrestorecon " + - "-RF root/.ssh 2>/dev/null || :\n", - 'gid': 100, - 'uid': 100, - 'mode': 0700}) - - self.assertTrue("/root/.ssh" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/root/.ssh"], - {'isdir': True, 'gid': 0, 'uid': 0, 'mode': 0700}) - self.assertTrue("/root/.ssh/authorized_keys" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/root/.ssh/authorized_keys"], - {'isdir': False, - 'content': "Hello World\n# The following ssh " + - "key was injected by Nova\nmysshkey\n", - 'gid': 100, - 'uid': 100, - 'mode': 0600}) - - vfs.teardown() - - def test_inject_data_key_with_selinux_append_with_newline(self): - - vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") - vfs.setup() - - vfs.replace_file("/etc/rc.d/rc.local", "#!/bin/sh\necho done") - vfs.make_path("etc/selinux") - vfs.make_path("etc/rc.d") - diskapi._inject_key_into_fs("mysshkey", vfs) - - self.assertTrue("/etc/rc.d/rc.local" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/etc/rc.d/rc.local"], - {'isdir': False, - 'content': "#!/bin/sh\necho done\n# Added " - "by Nova to ensure injected ssh keys have " - "the right context\nrestorecon -RF " - "root/.ssh 2>/dev/null || :\n", - 'gid': 100, - 'uid': 100, - 'mode': 0700}) - vfs.teardown() - - def test_inject_net(self): - - vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") - vfs.setup() - - diskapi._inject_net_into_fs("mynetconfig", vfs) - - self.assertTrue("/etc/network/interfaces" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/etc/network/interfaces"], - {'content': 'mynetconfig', - 'gid': 100, - 'isdir': False, - 'mode': 0700, - 'uid': 100}) - vfs.teardown() - - def test_inject_metadata(self): - vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") - vfs.setup() - - diskapi._inject_metadata_into_fs([{"key": "foo", - "value": "bar"}, - {"key": "eek", - "value": "wizz"}], vfs) - - self.assertTrue("/meta.js" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/meta.js"], - {'content': '{"foo": "bar", ' + - '"eek": "wizz"}', - 'gid': 100, - 'isdir': False, - 'mode': 0700, - 'uid': 100}) - vfs.teardown() - - def test_inject_admin_password(self): - vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") - vfs.setup() - - def fake_salt(): - return "1234567890abcdef" - - self.stubs.Set(diskapi, '_generate_salt', fake_salt) - - vfs.handle.write("/etc/shadow", - "root:$1$12345678$xxxxx:14917:0:99999:7:::\n" + - "bin:*:14495:0:99999:7:::\n" + - "daemon:*:14495:0:99999:7:::\n") - - vfs.handle.write("/etc/passwd", - "root:x:0:0:root:/root:/bin/bash\n" + - "bin:x:1:1:bin:/bin:/sbin/nologin\n" + - "daemon:x:2:2:daemon:/sbin:/sbin/nologin\n") - - diskapi._inject_admin_password_into_fs("123456", vfs) - - self.assertEquals(vfs.handle.files["/etc/passwd"], - {'content': "root:x:0:0:root:/root:/bin/bash\n" + - "bin:x:1:1:bin:/bin:/sbin/nologin\n" + - "daemon:x:2:2:daemon:/sbin:" + - "/sbin/nologin\n", - 'gid': 100, - 'isdir': False, - 'mode': 0700, - 'uid': 100}) - shadow = vfs.handle.files["/etc/shadow"] - - # if the encrypted password is only 13 characters long, then - # nova.virt.disk.api:_set_password fell back to DES. - if len(shadow['content']) == 91: - self.assertEquals(shadow, - {'content': "root:12tir.zIbWQ3c" + - ":14917:0:99999:7:::\n" + - "bin:*:14495:0:99999:7:::\n" + - "daemon:*:14495:0:99999:7:::\n", - 'gid': 100, - 'isdir': False, - 'mode': 0700, - 'uid': 100}) - else: - self.assertEquals(shadow, - {'content': "root:$1$12345678$a4ge4d5iJ5vw" + - "vbFS88TEN0:14917:0:99999:7:::\n" + - "bin:*:14495:0:99999:7:::\n" + - "daemon:*:14495:0:99999:7:::\n", - 'gid': 100, - 'isdir': False, - 'mode': 0700, - 'uid': 100}) - vfs.teardown() diff --git a/nova/tests/test_virt_disk_vfs_guestfs.py b/nova/tests/test_virt_disk_vfs_guestfs.py deleted file mode 100644 index 16c85f815..000000000 --- a/nova/tests/test_virt_disk_vfs_guestfs.py +++ /dev/null @@ -1,203 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright (C) 2012 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import sys - -from nova import exception -from nova import test - -from nova.tests import fakeguestfs -from nova.virt.disk.vfs import guestfs as vfsimpl - - -class VirtDiskVFSGuestFSTest(test.TestCase): - - def setUp(self): - super(VirtDiskVFSGuestFSTest, self).setUp() - sys.modules['guestfs'] = fakeguestfs - vfsimpl.guestfs = fakeguestfs - - def test_appliance_setup_inspect(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", - imgfmt="qcow2", - partition=-1) - vfs.setup() - - self.assertEqual(vfs.handle.running, True) - self.assertEqual(len(vfs.handle.mounts), 2) - self.assertEqual(vfs.handle.mounts[0][1], - "/dev/mapper/guestvgf-lv_root") - self.assertEqual(vfs.handle.mounts[1][1], "/dev/vda1") - self.assertEqual(vfs.handle.mounts[0][2], "/") - self.assertEqual(vfs.handle.mounts[1][2], "/boot") - - handle = vfs.handle - vfs.teardown() - - self.assertEqual(vfs.handle, None) - self.assertEqual(handle.running, False) - self.assertEqual(handle.closed, True) - self.assertEqual(len(handle.mounts), 0) - - def test_appliance_setup_inspect_no_root_raises(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", - imgfmt="qcow2", - partition=-1) - # call setup to init the handle so we can stub it - vfs.setup() - - def fake_inspect_os(): - return [] - - self.stubs.Set(vfs.handle, 'inspect_os', fake_inspect_os) - self.assertRaises(exception.NovaException, vfs.setup_os_inspect) - - def test_appliance_setup_inspect_multi_boots_raises(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", - imgfmt="qcow2", - partition=-1) - # call setup to init the handle so we can stub it - vfs.setup() - - def fake_inspect_os(): - return ['fake1', 'fake2'] - - self.stubs.Set(vfs.handle, 'inspect_os', fake_inspect_os) - self.assertRaises(exception.NovaException, vfs.setup_os_inspect) - - def test_appliance_setup_static_nopart(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", - imgfmt="qcow2", - partition=None) - vfs.setup() - - self.assertEqual(vfs.handle.running, True) - self.assertEqual(len(vfs.handle.mounts), 1) - self.assertEqual(vfs.handle.mounts[0][1], "/dev/sda") - self.assertEqual(vfs.handle.mounts[0][2], "/") - - handle = vfs.handle - vfs.teardown() - - self.assertEqual(vfs.handle, None) - self.assertEqual(handle.running, False) - self.assertEqual(handle.closed, True) - self.assertEqual(len(handle.mounts), 0) - - def test_appliance_setup_static_part(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", - imgfmt="qcow2", - partition=2) - vfs.setup() - - self.assertEqual(vfs.handle.running, True) - self.assertEqual(len(vfs.handle.mounts), 1) - self.assertEqual(vfs.handle.mounts[0][1], "/dev/sda2") - self.assertEqual(vfs.handle.mounts[0][2], "/") - - handle = vfs.handle - vfs.teardown() - - self.assertEqual(vfs.handle, None) - self.assertEqual(handle.running, False) - self.assertEqual(handle.closed, True) - self.assertEqual(len(handle.mounts), 0) - - def test_makepath(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.setup() - vfs.make_path("/some/dir") - vfs.make_path("/other/dir") - - self.assertTrue("/some/dir" in vfs.handle.files) - self.assertTrue("/other/dir" in vfs.handle.files) - self.assertTrue(vfs.handle.files["/some/dir"]["isdir"]) - self.assertTrue(vfs.handle.files["/other/dir"]["isdir"]) - - vfs.teardown() - - def test_append_file(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.setup() - vfs.append_file("/some/file", " Goodbye") - - self.assertTrue("/some/file" in vfs.handle.files) - self.assertEqual(vfs.handle.files["/some/file"]["content"], - "Hello World Goodbye") - - vfs.teardown() - - def test_replace_file(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.setup() - vfs.replace_file("/some/file", "Goodbye") - - self.assertTrue("/some/file" in vfs.handle.files) - self.assertEqual(vfs.handle.files["/some/file"]["content"], - "Goodbye") - - vfs.teardown() - - def test_read_file(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.setup() - self.assertEqual(vfs.read_file("/some/file"), "Hello World") - - vfs.teardown() - - def test_has_file(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.setup() - vfs.read_file("/some/file") - - self.assertTrue(vfs.has_file("/some/file")) - self.assertFalse(vfs.has_file("/other/file")) - - vfs.teardown() - - def test_set_permissions(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.setup() - vfs.read_file("/some/file") - - self.assertEquals(vfs.handle.files["/some/file"]["mode"], 0700) - - vfs.set_permissions("/some/file", 0777) - self.assertEquals(vfs.handle.files["/some/file"]["mode"], 0777) - - vfs.teardown() - - def test_set_ownership(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.setup() - vfs.read_file("/some/file") - - self.assertEquals(vfs.handle.files["/some/file"]["uid"], 100) - self.assertEquals(vfs.handle.files["/some/file"]["gid"], 100) - - vfs.set_ownership("/some/file", "fred", None) - self.assertEquals(vfs.handle.files["/some/file"]["uid"], 105) - self.assertEquals(vfs.handle.files["/some/file"]["gid"], 100) - - vfs.set_ownership("/some/file", None, "users") - self.assertEquals(vfs.handle.files["/some/file"]["uid"], 105) - self.assertEquals(vfs.handle.files["/some/file"]["gid"], 500) - - vfs.set_ownership("/some/file", "joe", "admins") - self.assertEquals(vfs.handle.files["/some/file"]["uid"], 110) - self.assertEquals(vfs.handle.files["/some/file"]["gid"], 600) - - vfs.teardown() diff --git a/nova/tests/test_virt_disk_vfs_localfs.py b/nova/tests/test_virt_disk_vfs_localfs.py deleted file mode 100644 index b52817b18..000000000 --- a/nova/tests/test_virt_disk_vfs_localfs.py +++ /dev/null @@ -1,387 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright (C) 2012 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo.config import cfg - -from nova import exception -from nova.openstack.common import processutils -from nova import test -from nova.tests import utils as tests_utils - -from nova.virt.disk.vfs import localfs as vfsimpl - -CONF = cfg.CONF - -dirs = [] -files = {} -commands = [] - - -def fake_execute(*args, **kwargs): - commands.append({"args": args, "kwargs": kwargs}) - - if args[0] == "readlink": - if args[1] == "-nm": - if args[2] in ["/scratch/dir/some/file", - "/scratch/dir/some/dir", - "/scratch/dir/other/dir", - "/scratch/dir/other/file"]: - return args[2], "" - elif args[1] == "-e": - if args[2] in files: - return args[2], "" - - return "", "No such file" - elif args[0] == "mkdir": - dirs.append(args[2]) - elif args[0] == "chown": - owner = args[1] - path = args[2] - if path not in files: - raise Exception("No such file: " + path) - - sep = owner.find(':') - if sep != -1: - user = owner[0:sep] - group = owner[sep + 1:] - else: - user = owner - group = None - - if user: - if user == "fred": - uid = 105 - else: - uid = 110 - files[path]["uid"] = uid - if group: - if group == "users": - gid = 500 - else: - gid = 600 - files[path]["gid"] = gid - elif args[0] == "chgrp": - group = args[1] - path = args[2] - if path not in files: - raise Exception("No such file: " + path) - - if group == "users": - gid = 500 - else: - gid = 600 - files[path]["gid"] = gid - elif args[0] == "chmod": - mode = args[1] - path = args[2] - if path not in files: - raise Exception("No such file: " + path) - - files[path]["mode"] = int(mode, 8) - elif args[0] == "cat": - path = args[1] - if path not in files: - files[path] = { - "content": "Hello World", - "gid": 100, - "uid": 100, - "mode": 0700 - } - return files[path]["content"], "" - elif args[0] == "tee": - if args[1] == "-a": - path = args[2] - append = True - else: - path = args[1] - append = False - if path not in files: - files[path] = { - "content": "Hello World", - "gid": 100, - "uid": 100, - "mode": 0700, - } - if append: - files[path]["content"] += kwargs["process_input"] - else: - files[path]["content"] = kwargs["process_input"] - - -class VirtDiskVFSLocalFSTestPaths(test.TestCase): - def setUp(self): - super(VirtDiskVFSLocalFSTestPaths, self).setUp() - - real_execute = processutils.execute - - def nonroot_execute(*cmd_parts, **kwargs): - kwargs.pop('run_as_root', None) - return real_execute(*cmd_parts, **kwargs) - - self.stubs.Set(processutils, 'execute', nonroot_execute) - - def test_check_safe_path(self): - if tests_utils.is_osx(): - self.skipTest("Unable to test on OSX") - vfs = vfsimpl.VFSLocalFS("dummy.img") - vfs.imgdir = "/foo" - ret = vfs._canonical_path('etc/something.conf') - self.assertEquals(ret, '/foo/etc/something.conf') - - def test_check_unsafe_path(self): - if tests_utils.is_osx(): - self.skipTest("Unable to test on OSX") - vfs = vfsimpl.VFSLocalFS("dummy.img") - vfs.imgdir = "/foo" - self.assertRaises(exception.Invalid, - vfs._canonical_path, - 'etc/../../../something.conf') - - -class VirtDiskVFSLocalFSTest(test.TestCase): - def test_makepath(self): - global dirs, commands - dirs = [] - commands = [] - self.stubs.Set(processutils, 'execute', fake_execute) - - vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.imgdir = "/scratch/dir" - vfs.make_path("/some/dir") - vfs.make_path("/other/dir") - - self.assertEqual(dirs, - ["/scratch/dir/some/dir", "/scratch/dir/other/dir"]), - - root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config - self.assertEqual(commands, - [{'args': ('readlink', '-nm', - '/scratch/dir/some/dir'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('mkdir', '-p', - '/scratch/dir/some/dir'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-nm', - '/scratch/dir/other/dir'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('mkdir', '-p', - '/scratch/dir/other/dir'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}]) - - def test_append_file(self): - global files, commands - files = {} - commands = [] - self.stubs.Set(processutils, 'execute', fake_execute) - - vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.imgdir = "/scratch/dir" - vfs.append_file("/some/file", " Goodbye") - - self.assertTrue("/scratch/dir/some/file" in files) - self.assertEquals(files["/scratch/dir/some/file"]["content"], - "Hello World Goodbye") - - root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config - self.assertEqual(commands, - [{'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('tee', '-a', - '/scratch/dir/some/file'), - 'kwargs': {'process_input': ' Goodbye', - 'run_as_root': True, - 'root_helper': root_helper}}]) - - def test_replace_file(self): - global files, commands - files = {} - commands = [] - self.stubs.Set(processutils, 'execute', fake_execute) - - vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.imgdir = "/scratch/dir" - vfs.replace_file("/some/file", "Goodbye") - - self.assertTrue("/scratch/dir/some/file" in files) - self.assertEquals(files["/scratch/dir/some/file"]["content"], - "Goodbye") - - root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config - self.assertEqual(commands, - [{'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('tee', '/scratch/dir/some/file'), - 'kwargs': {'process_input': 'Goodbye', - 'run_as_root': True, - 'root_helper': root_helper}}]) - - def test_read_file(self): - global commands, files - files = {} - commands = [] - self.stubs.Set(processutils, 'execute', fake_execute) - - vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.imgdir = "/scratch/dir" - self.assertEqual(vfs.read_file("/some/file"), "Hello World") - - root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config - self.assertEqual(commands, - [{'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('cat', '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}]) - - def test_has_file(self): - global commands, files - files = {} - commands = [] - self.stubs.Set(processutils, 'execute', fake_execute) - - vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.imgdir = "/scratch/dir" - vfs.read_file("/some/file") - - self.assertTrue(vfs.has_file("/some/file")) - self.assertFalse(vfs.has_file("/other/file")) - - root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config - self.assertEqual(commands, - [{'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('cat', '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-e', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-nm', - '/scratch/dir/other/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-e', - '/scratch/dir/other/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - ]) - - def test_set_permissions(self): - global commands, files - commands = [] - files = {} - self.stubs.Set(processutils, 'execute', fake_execute) - - vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.imgdir = "/scratch/dir" - vfs.read_file("/some/file") - - vfs.set_permissions("/some/file", 0777) - self.assertEquals(files["/scratch/dir/some/file"]["mode"], 0777) - - root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config - self.assertEqual(commands, - [{'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('cat', '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('chmod', '777', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}]) - - def test_set_ownership(self): - global commands, files - commands = [] - files = {} - self.stubs.Set(processutils, 'execute', fake_execute) - - vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.imgdir = "/scratch/dir" - vfs.read_file("/some/file") - - self.assertEquals(files["/scratch/dir/some/file"]["uid"], 100) - self.assertEquals(files["/scratch/dir/some/file"]["gid"], 100) - - vfs.set_ownership("/some/file", "fred", None) - self.assertEquals(files["/scratch/dir/some/file"]["uid"], 105) - self.assertEquals(files["/scratch/dir/some/file"]["gid"], 100) - - vfs.set_ownership("/some/file", None, "users") - self.assertEquals(files["/scratch/dir/some/file"]["uid"], 105) - self.assertEquals(files["/scratch/dir/some/file"]["gid"], 500) - - vfs.set_ownership("/some/file", "joe", "admins") - self.assertEquals(files["/scratch/dir/some/file"]["uid"], 110) - self.assertEquals(files["/scratch/dir/some/file"]["gid"], 600) - - root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config - self.assertEqual(commands, - [{'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('cat', '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('chown', 'fred', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('chgrp', 'users', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('chown', 'joe:admins', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}]) diff --git a/nova/tests/test_virt_drivers.py b/nova/tests/test_virt_drivers.py deleted file mode 100644 index c054b9624..000000000 --- a/nova/tests/test_virt_drivers.py +++ /dev/null @@ -1,642 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright 2010 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import base64 -import fixtures -import netaddr -import sys -import traceback - -from nova.compute import manager -from nova import exception -from nova.openstack.common import importutils -from nova.openstack.common import log as logging -from nova import test -from nova.tests.image import fake as fake_image -from nova.tests import utils as test_utils -from nova.tests.virt.libvirt import fake_libvirt_utils -from nova.virt import event as virtevent -from nova.virt import fake - -LOG = logging.getLogger(__name__) - - -def catch_notimplementederror(f): - """Decorator to simplify catching drivers raising NotImplementedError - - If a particular call makes a driver raise NotImplementedError, we - log it so that we can extract this information afterwards to - automatically generate a hypervisor/feature support matrix.""" - def wrapped_func(self, *args, **kwargs): - try: - return f(self, *args, **kwargs) - except NotImplementedError: - frame = traceback.extract_tb(sys.exc_info()[2])[-1] - LOG.error('%(driver)s does not implement %(method)s' % { - 'driver': type(self.connection), - 'method': frame[2]}) - - wrapped_func.__name__ = f.__name__ - wrapped_func.__doc__ = f.__doc__ - return wrapped_func - - -class _FakeDriverBackendTestCase(object): - def _setup_fakelibvirt(self): - # So that the _supports_direct_io does the test based - # on the current working directory, instead of the - # default instances_path which doesn't exist - self.flags(instances_path='') - - # Put fakelibvirt in place - if 'libvirt' in sys.modules: - self.saved_libvirt = sys.modules['libvirt'] - else: - self.saved_libvirt = None - - import nova.tests.virt.libvirt.fake_imagebackend as fake_imagebackend - import nova.tests.virt.libvirt.fake_libvirt_utils as fake_libvirt_utils - import nova.tests.virt.libvirt.fakelibvirt as fakelibvirt - - sys.modules['libvirt'] = fakelibvirt - import nova.virt.libvirt.driver - import nova.virt.libvirt.firewall - - self.useFixture(fixtures.MonkeyPatch( - 'nova.virt.libvirt.driver.imagebackend', - fake_imagebackend)) - self.useFixture(fixtures.MonkeyPatch( - 'nova.virt.libvirt.driver.libvirt', - fakelibvirt)) - self.useFixture(fixtures.MonkeyPatch( - 'nova.virt.libvirt.driver.libvirt_utils', - fake_libvirt_utils)) - self.useFixture(fixtures.MonkeyPatch( - 'nova.virt.libvirt.imagebackend.libvirt_utils', - fake_libvirt_utils)) - self.useFixture(fixtures.MonkeyPatch( - 'nova.virt.libvirt.firewall.libvirt', - fakelibvirt)) - - self.flags(rescue_image_id="2", - rescue_kernel_id="3", - rescue_ramdisk_id=None, - libvirt_snapshots_directory='./') - - def fake_extend(image, size): - pass - - def fake_migrateToURI(*a): - pass - - def fake_make_drive(_self, _path): - pass - - def fake_get_instance_disk_info(_self, instance, xml=None): - return '[]' - - self.stubs.Set(nova.virt.libvirt.driver.LibvirtDriver, - 'get_instance_disk_info', - fake_get_instance_disk_info) - - self.stubs.Set(nova.virt.libvirt.driver.disk, - 'extend', fake_extend) - - # Like the existing fakelibvirt.migrateToURI, do nothing, - # but don't fail for these tests. - self.stubs.Set(nova.virt.libvirt.driver.libvirt.Domain, - 'migrateToURI', fake_migrateToURI) - - # We can't actually make a config drive v2 because ensure_tree has - # been faked out - self.stubs.Set(nova.virt.configdrive.ConfigDriveBuilder, - 'make_drive', fake_make_drive) - - def _teardown_fakelibvirt(self): - # Restore libvirt - if self.saved_libvirt: - sys.modules['libvirt'] = self.saved_libvirt - - def setUp(self): - super(_FakeDriverBackendTestCase, self).setUp() - # TODO(sdague): it would be nice to do this in a way that only - # the relevant backends where replaced for tests, though this - # should not harm anything by doing it for all backends - fake_image.stub_out_image_service(self.stubs) - self._setup_fakelibvirt() - - def tearDown(self): - fake_image.FakeImageService_reset() - self._teardown_fakelibvirt() - super(_FakeDriverBackendTestCase, self).tearDown() - - -class VirtDriverLoaderTestCase(_FakeDriverBackendTestCase, test.TestCase): - """Test that ComputeManager can successfully load both - old style and new style drivers and end up with the correct - final class""" - - # if your driver supports being tested in a fake way, it can go here - # - # both long form and short form drivers are supported - new_drivers = { - 'nova.virt.fake.FakeDriver': 'FakeDriver', - 'nova.virt.libvirt.LibvirtDriver': 'LibvirtDriver', - 'fake.FakeDriver': 'FakeDriver', - 'libvirt.LibvirtDriver': 'LibvirtDriver' - } - - def test_load_new_drivers(self): - for cls, driver in self.new_drivers.iteritems(): - self.flags(compute_driver=cls) - # NOTE(sdague) the try block is to make it easier to debug a - # failure by knowing which driver broke - try: - cm = manager.ComputeManager() - except Exception as e: - self.fail("Couldn't load driver %s - %s" % (cls, e)) - - self.assertEqual(cm.driver.__class__.__name__, driver, - "Could't load driver %s" % cls) - - def test_fail_to_load_new_drivers(self): - self.flags(compute_driver='nova.virt.amiga') - - def _fake_exit(error): - raise test.TestingException() - - self.stubs.Set(sys, 'exit', _fake_exit) - self.assertRaises(test.TestingException, manager.ComputeManager) - - -class _VirtDriverTestCase(_FakeDriverBackendTestCase): - def setUp(self): - super(_VirtDriverTestCase, self).setUp() - - self.flags(instances_path=self.useFixture(fixtures.TempDir()).path) - self.connection = importutils.import_object(self.driver_module, - fake.FakeVirtAPI()) - self.ctxt = test_utils.get_test_admin_context() - self.image_service = fake_image.FakeImageService() - - def _get_running_instance(self): - instance_ref = test_utils.get_test_instance() - network_info = test_utils.get_test_network_info() - image_info = test_utils.get_test_image_info(None, instance_ref) - self.connection.spawn(self.ctxt, instance_ref, image_info, - [], 'herp', network_info=network_info) - return instance_ref, network_info - - @catch_notimplementederror - def test_init_host(self): - self.connection.init_host('myhostname') - - @catch_notimplementederror - def test_list_instances(self): - self.connection.list_instances() - - @catch_notimplementederror - def test_spawn(self): - instance_ref, network_info = self._get_running_instance() - domains = self.connection.list_instances() - self.assertIn(instance_ref['name'], domains) - - num_instances = self.connection.get_num_instances() - self.assertEqual(1, num_instances) - - @catch_notimplementederror - def test_snapshot_not_running(self): - instance_ref = test_utils.get_test_instance() - img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'}) - self.assertRaises(exception.InstanceNotRunning, - self.connection.snapshot, - self.ctxt, instance_ref, img_ref['id'], - lambda *args, **kwargs: None) - - @catch_notimplementederror - def test_snapshot_running(self): - img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'}) - instance_ref, network_info = self._get_running_instance() - self.connection.snapshot(self.ctxt, instance_ref, img_ref['id'], - lambda *args, **kwargs: None) - - @catch_notimplementederror - def test_reboot(self): - reboot_type = "SOFT" - instance_ref, network_info = self._get_running_instance() - self.connection.reboot(self.ctxt, instance_ref, network_info, - reboot_type) - - @catch_notimplementederror - def test_get_host_ip_addr(self): - host_ip = self.connection.get_host_ip_addr() - - # Will raise an exception if it's not a valid IP at all - ip = netaddr.IPAddress(host_ip) - - # For now, assume IPv4. - self.assertEquals(ip.version, 4) - - @catch_notimplementederror - def test_set_admin_password(self): - instance_ref, network_info = self._get_running_instance() - self.connection.set_admin_password(instance_ref, 'p4ssw0rd') - - @catch_notimplementederror - def test_inject_file(self): - instance_ref, network_info = self._get_running_instance() - self.connection.inject_file(instance_ref, - base64.b64encode('/testfile'), - base64.b64encode('testcontents')) - - @catch_notimplementederror - def test_resume_state_on_host_boot(self): - instance_ref, network_info = self._get_running_instance() - self.connection.resume_state_on_host_boot(self.ctxt, instance_ref, - network_info) - - @catch_notimplementederror - def test_rescue(self): - instance_ref, network_info = self._get_running_instance() - self.connection.rescue(self.ctxt, instance_ref, network_info, None, '') - - @catch_notimplementederror - def test_unrescue_unrescued_instance(self): - instance_ref, network_info = self._get_running_instance() - self.connection.unrescue(instance_ref, network_info) - - @catch_notimplementederror - def test_unrescue_rescued_instance(self): - instance_ref, network_info = self._get_running_instance() - self.connection.rescue(self.ctxt, instance_ref, network_info, None, '') - self.connection.unrescue(instance_ref, network_info) - - @catch_notimplementederror - def test_poll_rebooting_instances(self): - instances = [self._get_running_instance()] - self.connection.poll_rebooting_instances(10, instances) - - @catch_notimplementederror - def test_migrate_disk_and_power_off(self): - instance_ref, network_info = self._get_running_instance() - instance_type_ref = test_utils.get_test_instance_type() - self.connection.migrate_disk_and_power_off( - self.ctxt, instance_ref, 'dest_host', instance_type_ref, - network_info) - - @catch_notimplementederror - def test_power_off(self): - instance_ref, network_info = self._get_running_instance() - self.connection.power_off(instance_ref) - - @catch_notimplementederror - def test_power_on_running(self): - instance_ref, network_info = self._get_running_instance() - self.connection.power_on(instance_ref) - - @catch_notimplementederror - def test_power_on_powered_off(self): - instance_ref, network_info = self._get_running_instance() - self.connection.power_off(instance_ref) - self.connection.power_on(instance_ref) - - @catch_notimplementederror - def test_soft_delete(self): - instance_ref, network_info = self._get_running_instance() - self.connection.soft_delete(instance_ref) - - @catch_notimplementederror - def test_restore_running(self): - instance_ref, network_info = self._get_running_instance() - self.connection.restore(instance_ref) - - @catch_notimplementederror - def test_restore_soft_deleted(self): - instance_ref, network_info = self._get_running_instance() - self.connection.soft_delete(instance_ref) - self.connection.restore(instance_ref) - - @catch_notimplementederror - def test_pause(self): - instance_ref, network_info = self._get_running_instance() - self.connection.pause(instance_ref) - - @catch_notimplementederror - def test_unpause_unpaused_instance(self): - instance_ref, network_info = self._get_running_instance() - self.connection.unpause(instance_ref) - - @catch_notimplementederror - def test_unpause_paused_instance(self): - instance_ref, network_info = self._get_running_instance() - self.connection.pause(instance_ref) - self.connection.unpause(instance_ref) - - @catch_notimplementederror - def test_suspend(self): - instance_ref, network_info = self._get_running_instance() - self.connection.suspend(instance_ref) - - @catch_notimplementederror - def test_resume_unsuspended_instance(self): - instance_ref, network_info = self._get_running_instance() - self.connection.resume(instance_ref, network_info) - - @catch_notimplementederror - def test_resume_suspended_instance(self): - instance_ref, network_info = self._get_running_instance() - self.connection.suspend(instance_ref) - self.connection.resume(instance_ref, network_info) - - @catch_notimplementederror - def test_destroy_instance_nonexistent(self): - fake_instance = {'id': 42, 'name': 'I just made this up!', - 'uuid': 'bda5fb9e-b347-40e8-8256-42397848cb00'} - network_info = test_utils.get_test_network_info() - self.connection.destroy(fake_instance, network_info) - - @catch_notimplementederror - def test_destroy_instance(self): - instance_ref, network_info = self._get_running_instance() - self.assertIn(instance_ref['name'], - self.connection.list_instances()) - self.connection.destroy(instance_ref, network_info) - self.assertNotIn(instance_ref['name'], - self.connection.list_instances()) - - @catch_notimplementederror - def test_get_volume_connector(self): - result = self.connection.get_volume_connector({'id': 'fake'}) - self.assertTrue('ip' in result) - self.assertTrue('initiator' in result) - self.assertTrue('host' in result) - - @catch_notimplementederror - def test_attach_detach_volume(self): - instance_ref, network_info = self._get_running_instance() - self.connection.attach_volume({'driver_volume_type': 'fake'}, - instance_ref, - '/dev/sda') - self.connection.detach_volume({'driver_volume_type': 'fake'}, - instance_ref, - '/dev/sda') - - @catch_notimplementederror - def test_attach_detach_different_power_states(self): - instance_ref, network_info = self._get_running_instance() - self.connection.power_off(instance_ref) - self.connection.attach_volume({'driver_volume_type': 'fake'}, - instance_ref, - '/dev/sda') - self.connection.power_on(instance_ref) - self.connection.detach_volume({'driver_volume_type': 'fake'}, - instance_ref, - '/dev/sda') - - @catch_notimplementederror - def test_get_info(self): - instance_ref, network_info = self._get_running_instance() - info = self.connection.get_info(instance_ref) - self.assertIn('state', info) - self.assertIn('max_mem', info) - self.assertIn('mem', info) - self.assertIn('num_cpu', info) - self.assertIn('cpu_time', info) - - @catch_notimplementederror - def test_get_info_for_unknown_instance(self): - self.assertRaises(exception.NotFound, - self.connection.get_info, - {'name': 'I just made this name up'}) - - @catch_notimplementederror - def test_get_diagnostics(self): - instance_ref, network_info = self._get_running_instance() - self.connection.get_diagnostics(instance_ref) - - @catch_notimplementederror - def test_block_stats(self): - instance_ref, network_info = self._get_running_instance() - stats = self.connection.block_stats(instance_ref['name'], 'someid') - self.assertEquals(len(stats), 5) - - @catch_notimplementederror - def test_interface_stats(self): - instance_ref, network_info = self._get_running_instance() - stats = self.connection.interface_stats(instance_ref['name'], 'someid') - self.assertEquals(len(stats), 8) - - @catch_notimplementederror - def test_get_console_output(self): - fake_libvirt_utils.files['dummy.log'] = '' - instance_ref, network_info = self._get_running_instance() - console_output = self.connection.get_console_output(instance_ref) - self.assertTrue(isinstance(console_output, basestring)) - - @catch_notimplementederror - def test_get_vnc_console(self): - instance_ref, network_info = self._get_running_instance() - vnc_console = self.connection.get_vnc_console(instance_ref) - self.assertIn('internal_access_path', vnc_console) - self.assertIn('host', vnc_console) - self.assertIn('port', vnc_console) - - @catch_notimplementederror - def test_get_spice_console(self): - instance_ref, network_info = self._get_running_instance() - spice_console = self.connection.get_spice_console(instance_ref) - self.assertIn('internal_access_path', spice_console) - self.assertIn('host', spice_console) - self.assertIn('port', spice_console) - self.assertIn('tlsPort', spice_console) - - @catch_notimplementederror - def test_get_console_pool_info(self): - instance_ref, network_info = self._get_running_instance() - console_pool = self.connection.get_console_pool_info(instance_ref) - self.assertIn('address', console_pool) - self.assertIn('username', console_pool) - self.assertIn('password', console_pool) - - @catch_notimplementederror - def test_refresh_security_group_rules(self): - # FIXME: Create security group and add the instance to it - instance_ref, network_info = self._get_running_instance() - self.connection.refresh_security_group_rules(1) - - @catch_notimplementederror - def test_refresh_security_group_members(self): - # FIXME: Create security group and add the instance to it - instance_ref, network_info = self._get_running_instance() - self.connection.refresh_security_group_members(1) - - @catch_notimplementederror - def test_refresh_provider_fw_rules(self): - instance_ref, network_info = self._get_running_instance() - self.connection.refresh_provider_fw_rules() - - @catch_notimplementederror - def test_ensure_filtering_for_instance(self): - instance_ref = test_utils.get_test_instance() - network_info = test_utils.get_test_network_info() - self.connection.ensure_filtering_rules_for_instance(instance_ref, - network_info) - - @catch_notimplementederror - def test_unfilter_instance(self): - instance_ref = test_utils.get_test_instance() - network_info = test_utils.get_test_network_info() - self.connection.unfilter_instance(instance_ref, network_info) - - @catch_notimplementederror - def test_live_migration(self): - instance_ref, network_info = self._get_running_instance() - self.connection.live_migration(self.ctxt, instance_ref, 'otherhost', - lambda *a: None, lambda *a: None) - - @catch_notimplementederror - def _check_host_status_fields(self, host_status): - self.assertIn('disk_total', host_status) - self.assertIn('disk_used', host_status) - self.assertIn('host_memory_total', host_status) - self.assertIn('host_memory_free', host_status) - - @catch_notimplementederror - def test_get_host_stats(self): - host_status = self.connection.get_host_stats() - self._check_host_status_fields(host_status) - - @catch_notimplementederror - def test_set_host_enabled(self): - self.connection.set_host_enabled('a useless argument?', True) - - @catch_notimplementederror - def test_get_host_uptime(self): - self.connection.get_host_uptime('a useless argument?') - - @catch_notimplementederror - def test_host_power_action_reboot(self): - self.connection.host_power_action('a useless argument?', 'reboot') - - @catch_notimplementederror - def test_host_power_action_shutdown(self): - self.connection.host_power_action('a useless argument?', 'shutdown') - - @catch_notimplementederror - def test_host_power_action_startup(self): - self.connection.host_power_action('a useless argument?', 'startup') - - @catch_notimplementederror - def test_add_to_aggregate(self): - self.connection.add_to_aggregate(self.ctxt, 'aggregate', 'host') - - @catch_notimplementederror - def test_remove_from_aggregate(self): - self.connection.remove_from_aggregate(self.ctxt, 'aggregate', 'host') - - def test_events(self): - got_events = [] - - def handler(event): - got_events.append(event) - - self.connection.register_event_listener(handler) - - event1 = virtevent.LifecycleEvent( - "cef19ce0-0ca2-11df-855d-b19fbce37686", - virtevent.EVENT_LIFECYCLE_STARTED) - event2 = virtevent.LifecycleEvent( - "cef19ce0-0ca2-11df-855d-b19fbce37686", - virtevent.EVENT_LIFECYCLE_PAUSED) - - self.connection.emit_event(event1) - self.connection.emit_event(event2) - want_events = [event1, event2] - self.assertEqual(want_events, got_events) - - event3 = virtevent.LifecycleEvent( - "cef19ce0-0ca2-11df-855d-b19fbce37686", - virtevent.EVENT_LIFECYCLE_RESUMED) - event4 = virtevent.LifecycleEvent( - "cef19ce0-0ca2-11df-855d-b19fbce37686", - virtevent.EVENT_LIFECYCLE_STOPPED) - - self.connection.emit_event(event3) - self.connection.emit_event(event4) - - want_events = [event1, event2, event3, event4] - self.assertEqual(want_events, got_events) - - def test_event_bad_object(self): - # Passing in something which does not inherit - # from virtevent.Event - - def handler(event): - pass - - self.connection.register_event_listener(handler) - - badevent = { - "foo": "bar" - } - - self.assertRaises(ValueError, - self.connection.emit_event, - badevent) - - def test_event_bad_callback(self): - # Check that if a callback raises an exception, - # it does not propagate back out of the - # 'emit_event' call - - def handler(event): - raise Exception("Hit Me!") - - self.connection.register_event_listener(handler) - - event1 = virtevent.LifecycleEvent( - "cef19ce0-0ca2-11df-855d-b19fbce37686", - virtevent.EVENT_LIFECYCLE_STARTED) - - self.connection.emit_event(event1) - - -class AbstractDriverTestCase(_VirtDriverTestCase, test.TestCase): - def setUp(self): - self.driver_module = "nova.virt.driver.ComputeDriver" - super(AbstractDriverTestCase, self).setUp() - - -class FakeConnectionTestCase(_VirtDriverTestCase, test.TestCase): - def setUp(self): - self.driver_module = 'nova.virt.fake.FakeDriver' - super(FakeConnectionTestCase, self).setUp() - - -class LibvirtConnTestCase(_VirtDriverTestCase, test.TestCase): - def setUp(self): - # Point _VirtDriverTestCase at the right module - self.driver_module = 'nova.virt.libvirt.LibvirtDriver' - super(LibvirtConnTestCase, self).setUp() - - def test_force_hard_reboot(self): - self.flags(libvirt_wait_soft_reboot_seconds=0) - self.test_reboot() - - def test_migrate_disk_and_power_off(self): - # there is lack of fake stuff to execute this method. so pass. - self.skipTest("Test nothing, but this method" - " needed to override superclass.") diff --git a/nova/tests/virt/test_driver.py b/nova/tests/virt/test_driver.py new file mode 100644 index 000000000..0416bbdd5 --- /dev/null +++ b/nova/tests/virt/test_driver.py @@ -0,0 +1,60 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 Citrix Systems, Inc. +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from nova import test +from nova.virt import driver + + +class FakeDriver(object): + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + +class FakeDriver2(FakeDriver): + pass + + +class ToDriverRegistryTestCase(test.TestCase): + + def assertDriverInstance(self, inst, class_, *args, **kwargs): + self.assertEquals(class_, inst.__class__) + self.assertEquals(args, inst.args) + self.assertEquals(kwargs, inst.kwargs) + + def test_driver_dict_from_config(self): + drvs = driver.driver_dict_from_config( + [ + 'key1=nova.tests.virt.test_driver.FakeDriver', + 'key2=nova.tests.virt.test_driver.FakeDriver2', + ], 'arg1', 'arg2', param1='value1', param2='value2' + ) + + self.assertEquals( + sorted(['key1', 'key2']), + sorted(drvs.keys()) + ) + + self.assertDriverInstance( + drvs['key1'], + FakeDriver, 'arg1', 'arg2', param1='value1', + param2='value2') + + self.assertDriverInstance( + drvs['key2'], + FakeDriver2, 'arg1', 'arg2', param1='value1', + param2='value2') diff --git a/nova/tests/virt/test_virt.py b/nova/tests/virt/test_virt.py new file mode 100644 index 000000000..452277c54 --- /dev/null +++ b/nova/tests/virt/test_virt.py @@ -0,0 +1,139 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 Isaku Yamahata +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +from nova import test +from nova import utils +from nova.virt.disk import api as disk_api +from nova.virt import driver + + +class TestVirtDriver(test.TestCase): + def test_block_device(self): + swap = {'device_name': '/dev/sdb', + 'swap_size': 1} + ephemerals = [{'num': 0, + 'virtual_name': 'ephemeral0', + 'device_name': '/dev/sdc1', + 'size': 1}] + block_device_mapping = [{'mount_device': '/dev/sde', + 'device_path': 'fake_device'}] + block_device_info = { + 'root_device_name': '/dev/sda', + 'swap': swap, + 'ephemerals': ephemerals, + 'block_device_mapping': block_device_mapping} + + empty_block_device_info = {} + + self.assertEqual( + driver.block_device_info_get_root(block_device_info), '/dev/sda') + self.assertEqual( + driver.block_device_info_get_root(empty_block_device_info), None) + self.assertEqual( + driver.block_device_info_get_root(None), None) + + self.assertEqual( + driver.block_device_info_get_swap(block_device_info), swap) + self.assertEqual(driver.block_device_info_get_swap( + empty_block_device_info)['device_name'], None) + self.assertEqual(driver.block_device_info_get_swap( + empty_block_device_info)['swap_size'], 0) + self.assertEqual( + driver.block_device_info_get_swap({'swap': None})['device_name'], + None) + self.assertEqual( + driver.block_device_info_get_swap({'swap': None})['swap_size'], + 0) + self.assertEqual( + driver.block_device_info_get_swap(None)['device_name'], None) + self.assertEqual( + driver.block_device_info_get_swap(None)['swap_size'], 0) + + self.assertEqual( + driver.block_device_info_get_ephemerals(block_device_info), + ephemerals) + self.assertEqual( + driver.block_device_info_get_ephemerals(empty_block_device_info), + []) + self.assertEqual( + driver.block_device_info_get_ephemerals(None), + []) + + def test_swap_is_usable(self): + self.assertFalse(driver.swap_is_usable(None)) + self.assertFalse(driver.swap_is_usable({'device_name': None})) + self.assertFalse(driver.swap_is_usable({'device_name': '/dev/sdb', + 'swap_size': 0})) + self.assertTrue(driver.swap_is_usable({'device_name': '/dev/sdb', + 'swap_size': 1})) + + +class TestVirtDisk(test.TestCase): + def setUp(self): + super(TestVirtDisk, self).setUp() + self.executes = [] + + def fake_execute(*cmd, **kwargs): + self.executes.append(cmd) + return None, None + + self.stubs.Set(utils, 'execute', fake_execute) + + def test_lxc_teardown_container(self): + + def proc_mounts(self, mount_point): + mount_points = { + '/mnt/loop/nopart': '/dev/loop0', + '/mnt/loop/part': '/dev/mapper/loop0p1', + '/mnt/nbd/nopart': '/dev/nbd15', + '/mnt/nbd/part': '/dev/mapper/nbd15p1', + } + return mount_points[mount_point] + + self.stubs.Set(os.path, 'exists', lambda _: True) + self.stubs.Set(disk_api._DiskImage, '_device_for_path', proc_mounts) + expected_commands = [] + + disk_api.teardown_container('/mnt/loop/nopart') + expected_commands += [ + ('umount', '/dev/loop0'), + ('losetup', '--detach', '/dev/loop0'), + ] + + disk_api.teardown_container('/mnt/loop/part') + expected_commands += [ + ('umount', '/dev/mapper/loop0p1'), + ('kpartx', '-d', '/dev/loop0'), + ('losetup', '--detach', '/dev/loop0'), + ] + + disk_api.teardown_container('/mnt/nbd/nopart') + expected_commands += [ + ('umount', '/dev/nbd15'), + ('qemu-nbd', '-d', '/dev/nbd15'), + ] + + disk_api.teardown_container('/mnt/nbd/part') + expected_commands += [ + ('umount', '/dev/mapper/nbd15p1'), + ('kpartx', '-d', '/dev/nbd15'), + ('qemu-nbd', '-d', '/dev/nbd15'), + ] + + self.assertEqual(self.executes, expected_commands) diff --git a/nova/tests/virt/test_virt_disk.py b/nova/tests/virt/test_virt_disk.py new file mode 100644 index 000000000..0c51e8267 --- /dev/null +++ b/nova/tests/virt/test_virt_disk.py @@ -0,0 +1,219 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright (C) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import sys + +from nova import exception +from nova import test +from nova.tests import fakeguestfs +from nova.virt.disk import api as diskapi +from nova.virt.disk.vfs import guestfs as vfsguestfs + + +class VirtDiskTest(test.TestCase): + + def setUp(self): + super(VirtDiskTest, self).setUp() + sys.modules['guestfs'] = fakeguestfs + vfsguestfs.guestfs = fakeguestfs + + def test_inject_data(self): + + self.assertTrue(diskapi.inject_data("/some/file", use_cow=True)) + + self.assertTrue(diskapi.inject_data("/some/file", + mandatory=('files',))) + + self.assertTrue(diskapi.inject_data("/some/file", key="mysshkey", + mandatory=('key',))) + + os_name = os.name + os.name = 'nt' # Cause password injection to fail + self.assertRaises(exception.NovaException, + diskapi.inject_data, + "/some/file", admin_password="p", + mandatory=('admin_password',)) + self.assertFalse(diskapi.inject_data("/some/file", admin_password="p")) + os.name = os_name + + def test_inject_data_key(self): + + vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") + vfs.setup() + + diskapi._inject_key_into_fs("mysshkey", vfs) + + self.assertTrue("/root/.ssh" in vfs.handle.files) + self.assertEquals(vfs.handle.files["/root/.ssh"], + {'isdir': True, 'gid': 0, 'uid': 0, 'mode': 0700}) + self.assertTrue("/root/.ssh/authorized_keys" in vfs.handle.files) + self.assertEquals(vfs.handle.files["/root/.ssh/authorized_keys"], + {'isdir': False, + 'content': "Hello World\n# The following ssh " + + "key was injected by Nova\nmysshkey\n", + 'gid': 100, + 'uid': 100, + 'mode': 0600}) + + vfs.teardown() + + def test_inject_data_key_with_selinux(self): + + vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") + vfs.setup() + + vfs.make_path("etc/selinux") + vfs.make_path("etc/rc.d") + diskapi._inject_key_into_fs("mysshkey", vfs) + + self.assertTrue("/etc/rc.d/rc.local" in vfs.handle.files) + self.assertEquals(vfs.handle.files["/etc/rc.d/rc.local"], + {'isdir': False, + 'content': "Hello World#!/bin/sh\n# Added by " + + "Nova to ensure injected ssh keys " + + "have the right context\nrestorecon " + + "-RF root/.ssh 2>/dev/null || :\n", + 'gid': 100, + 'uid': 100, + 'mode': 0700}) + + self.assertTrue("/root/.ssh" in vfs.handle.files) + self.assertEquals(vfs.handle.files["/root/.ssh"], + {'isdir': True, 'gid': 0, 'uid': 0, 'mode': 0700}) + self.assertTrue("/root/.ssh/authorized_keys" in vfs.handle.files) + self.assertEquals(vfs.handle.files["/root/.ssh/authorized_keys"], + {'isdir': False, + 'content': "Hello World\n# The following ssh " + + "key was injected by Nova\nmysshkey\n", + 'gid': 100, + 'uid': 100, + 'mode': 0600}) + + vfs.teardown() + + def test_inject_data_key_with_selinux_append_with_newline(self): + + vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") + vfs.setup() + + vfs.replace_file("/etc/rc.d/rc.local", "#!/bin/sh\necho done") + vfs.make_path("etc/selinux") + vfs.make_path("etc/rc.d") + diskapi._inject_key_into_fs("mysshkey", vfs) + + self.assertTrue("/etc/rc.d/rc.local" in vfs.handle.files) + self.assertEquals(vfs.handle.files["/etc/rc.d/rc.local"], + {'isdir': False, + 'content': "#!/bin/sh\necho done\n# Added " + "by Nova to ensure injected ssh keys have " + "the right context\nrestorecon -RF " + "root/.ssh 2>/dev/null || :\n", + 'gid': 100, + 'uid': 100, + 'mode': 0700}) + vfs.teardown() + + def test_inject_net(self): + + vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") + vfs.setup() + + diskapi._inject_net_into_fs("mynetconfig", vfs) + + self.assertTrue("/etc/network/interfaces" in vfs.handle.files) + self.assertEquals(vfs.handle.files["/etc/network/interfaces"], + {'content': 'mynetconfig', + 'gid': 100, + 'isdir': False, + 'mode': 0700, + 'uid': 100}) + vfs.teardown() + + def test_inject_metadata(self): + vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") + vfs.setup() + + diskapi._inject_metadata_into_fs([{"key": "foo", + "value": "bar"}, + {"key": "eek", + "value": "wizz"}], vfs) + + self.assertTrue("/meta.js" in vfs.handle.files) + self.assertEquals(vfs.handle.files["/meta.js"], + {'content': '{"foo": "bar", ' + + '"eek": "wizz"}', + 'gid': 100, + 'isdir': False, + 'mode': 0700, + 'uid': 100}) + vfs.teardown() + + def test_inject_admin_password(self): + vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") + vfs.setup() + + def fake_salt(): + return "1234567890abcdef" + + self.stubs.Set(diskapi, '_generate_salt', fake_salt) + + vfs.handle.write("/etc/shadow", + "root:$1$12345678$xxxxx:14917:0:99999:7:::\n" + + "bin:*:14495:0:99999:7:::\n" + + "daemon:*:14495:0:99999:7:::\n") + + vfs.handle.write("/etc/passwd", + "root:x:0:0:root:/root:/bin/bash\n" + + "bin:x:1:1:bin:/bin:/sbin/nologin\n" + + "daemon:x:2:2:daemon:/sbin:/sbin/nologin\n") + + diskapi._inject_admin_password_into_fs("123456", vfs) + + self.assertEquals(vfs.handle.files["/etc/passwd"], + {'content': "root:x:0:0:root:/root:/bin/bash\n" + + "bin:x:1:1:bin:/bin:/sbin/nologin\n" + + "daemon:x:2:2:daemon:/sbin:" + + "/sbin/nologin\n", + 'gid': 100, + 'isdir': False, + 'mode': 0700, + 'uid': 100}) + shadow = vfs.handle.files["/etc/shadow"] + + # if the encrypted password is only 13 characters long, then + # nova.virt.disk.api:_set_password fell back to DES. + if len(shadow['content']) == 91: + self.assertEquals(shadow, + {'content': "root:12tir.zIbWQ3c" + + ":14917:0:99999:7:::\n" + + "bin:*:14495:0:99999:7:::\n" + + "daemon:*:14495:0:99999:7:::\n", + 'gid': 100, + 'isdir': False, + 'mode': 0700, + 'uid': 100}) + else: + self.assertEquals(shadow, + {'content': "root:$1$12345678$a4ge4d5iJ5vw" + + "vbFS88TEN0:14917:0:99999:7:::\n" + + "bin:*:14495:0:99999:7:::\n" + + "daemon:*:14495:0:99999:7:::\n", + 'gid': 100, + 'isdir': False, + 'mode': 0700, + 'uid': 100}) + vfs.teardown() diff --git a/nova/tests/virt/test_virt_disk_vfs_guestfs.py b/nova/tests/virt/test_virt_disk_vfs_guestfs.py new file mode 100644 index 000000000..16c85f815 --- /dev/null +++ b/nova/tests/virt/test_virt_disk_vfs_guestfs.py @@ -0,0 +1,203 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright (C) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sys + +from nova import exception +from nova import test + +from nova.tests import fakeguestfs +from nova.virt.disk.vfs import guestfs as vfsimpl + + +class VirtDiskVFSGuestFSTest(test.TestCase): + + def setUp(self): + super(VirtDiskVFSGuestFSTest, self).setUp() + sys.modules['guestfs'] = fakeguestfs + vfsimpl.guestfs = fakeguestfs + + def test_appliance_setup_inspect(self): + vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", + imgfmt="qcow2", + partition=-1) + vfs.setup() + + self.assertEqual(vfs.handle.running, True) + self.assertEqual(len(vfs.handle.mounts), 2) + self.assertEqual(vfs.handle.mounts[0][1], + "/dev/mapper/guestvgf-lv_root") + self.assertEqual(vfs.handle.mounts[1][1], "/dev/vda1") + self.assertEqual(vfs.handle.mounts[0][2], "/") + self.assertEqual(vfs.handle.mounts[1][2], "/boot") + + handle = vfs.handle + vfs.teardown() + + self.assertEqual(vfs.handle, None) + self.assertEqual(handle.running, False) + self.assertEqual(handle.closed, True) + self.assertEqual(len(handle.mounts), 0) + + def test_appliance_setup_inspect_no_root_raises(self): + vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", + imgfmt="qcow2", + partition=-1) + # call setup to init the handle so we can stub it + vfs.setup() + + def fake_inspect_os(): + return [] + + self.stubs.Set(vfs.handle, 'inspect_os', fake_inspect_os) + self.assertRaises(exception.NovaException, vfs.setup_os_inspect) + + def test_appliance_setup_inspect_multi_boots_raises(self): + vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", + imgfmt="qcow2", + partition=-1) + # call setup to init the handle so we can stub it + vfs.setup() + + def fake_inspect_os(): + return ['fake1', 'fake2'] + + self.stubs.Set(vfs.handle, 'inspect_os', fake_inspect_os) + self.assertRaises(exception.NovaException, vfs.setup_os_inspect) + + def test_appliance_setup_static_nopart(self): + vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", + imgfmt="qcow2", + partition=None) + vfs.setup() + + self.assertEqual(vfs.handle.running, True) + self.assertEqual(len(vfs.handle.mounts), 1) + self.assertEqual(vfs.handle.mounts[0][1], "/dev/sda") + self.assertEqual(vfs.handle.mounts[0][2], "/") + + handle = vfs.handle + vfs.teardown() + + self.assertEqual(vfs.handle, None) + self.assertEqual(handle.running, False) + self.assertEqual(handle.closed, True) + self.assertEqual(len(handle.mounts), 0) + + def test_appliance_setup_static_part(self): + vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", + imgfmt="qcow2", + partition=2) + vfs.setup() + + self.assertEqual(vfs.handle.running, True) + self.assertEqual(len(vfs.handle.mounts), 1) + self.assertEqual(vfs.handle.mounts[0][1], "/dev/sda2") + self.assertEqual(vfs.handle.mounts[0][2], "/") + + handle = vfs.handle + vfs.teardown() + + self.assertEqual(vfs.handle, None) + self.assertEqual(handle.running, False) + self.assertEqual(handle.closed, True) + self.assertEqual(len(handle.mounts), 0) + + def test_makepath(self): + vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") + vfs.setup() + vfs.make_path("/some/dir") + vfs.make_path("/other/dir") + + self.assertTrue("/some/dir" in vfs.handle.files) + self.assertTrue("/other/dir" in vfs.handle.files) + self.assertTrue(vfs.handle.files["/some/dir"]["isdir"]) + self.assertTrue(vfs.handle.files["/other/dir"]["isdir"]) + + vfs.teardown() + + def test_append_file(self): + vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") + vfs.setup() + vfs.append_file("/some/file", " Goodbye") + + self.assertTrue("/some/file" in vfs.handle.files) + self.assertEqual(vfs.handle.files["/some/file"]["content"], + "Hello World Goodbye") + + vfs.teardown() + + def test_replace_file(self): + vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") + vfs.setup() + vfs.replace_file("/some/file", "Goodbye") + + self.assertTrue("/some/file" in vfs.handle.files) + self.assertEqual(vfs.handle.files["/some/file"]["content"], + "Goodbye") + + vfs.teardown() + + def test_read_file(self): + vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") + vfs.setup() + self.assertEqual(vfs.read_file("/some/file"), "Hello World") + + vfs.teardown() + + def test_has_file(self): + vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") + vfs.setup() + vfs.read_file("/some/file") + + self.assertTrue(vfs.has_file("/some/file")) + self.assertFalse(vfs.has_file("/other/file")) + + vfs.teardown() + + def test_set_permissions(self): + vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") + vfs.setup() + vfs.read_file("/some/file") + + self.assertEquals(vfs.handle.files["/some/file"]["mode"], 0700) + + vfs.set_permissions("/some/file", 0777) + self.assertEquals(vfs.handle.files["/some/file"]["mode"], 0777) + + vfs.teardown() + + def test_set_ownership(self): + vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") + vfs.setup() + vfs.read_file("/some/file") + + self.assertEquals(vfs.handle.files["/some/file"]["uid"], 100) + self.assertEquals(vfs.handle.files["/some/file"]["gid"], 100) + + vfs.set_ownership("/some/file", "fred", None) + self.assertEquals(vfs.handle.files["/some/file"]["uid"], 105) + self.assertEquals(vfs.handle.files["/some/file"]["gid"], 100) + + vfs.set_ownership("/some/file", None, "users") + self.assertEquals(vfs.handle.files["/some/file"]["uid"], 105) + self.assertEquals(vfs.handle.files["/some/file"]["gid"], 500) + + vfs.set_ownership("/some/file", "joe", "admins") + self.assertEquals(vfs.handle.files["/some/file"]["uid"], 110) + self.assertEquals(vfs.handle.files["/some/file"]["gid"], 600) + + vfs.teardown() diff --git a/nova/tests/virt/test_virt_disk_vfs_localfs.py b/nova/tests/virt/test_virt_disk_vfs_localfs.py new file mode 100644 index 000000000..b52817b18 --- /dev/null +++ b/nova/tests/virt/test_virt_disk_vfs_localfs.py @@ -0,0 +1,387 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright (C) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo.config import cfg + +from nova import exception +from nova.openstack.common import processutils +from nova import test +from nova.tests import utils as tests_utils + +from nova.virt.disk.vfs import localfs as vfsimpl + +CONF = cfg.CONF + +dirs = [] +files = {} +commands = [] + + +def fake_execute(*args, **kwargs): + commands.append({"args": args, "kwargs": kwargs}) + + if args[0] == "readlink": + if args[1] == "-nm": + if args[2] in ["/scratch/dir/some/file", + "/scratch/dir/some/dir", + "/scratch/dir/other/dir", + "/scratch/dir/other/file"]: + return args[2], "" + elif args[1] == "-e": + if args[2] in files: + return args[2], "" + + return "", "No such file" + elif args[0] == "mkdir": + dirs.append(args[2]) + elif args[0] == "chown": + owner = args[1] + path = args[2] + if path not in files: + raise Exception("No such file: " + path) + + sep = owner.find(':') + if sep != -1: + user = owner[0:sep] + group = owner[sep + 1:] + else: + user = owner + group = None + + if user: + if user == "fred": + uid = 105 + else: + uid = 110 + files[path]["uid"] = uid + if group: + if group == "users": + gid = 500 + else: + gid = 600 + files[path]["gid"] = gid + elif args[0] == "chgrp": + group = args[1] + path = args[2] + if path not in files: + raise Exception("No such file: " + path) + + if group == "users": + gid = 500 + else: + gid = 600 + files[path]["gid"] = gid + elif args[0] == "chmod": + mode = args[1] + path = args[2] + if path not in files: + raise Exception("No such file: " + path) + + files[path]["mode"] = int(mode, 8) + elif args[0] == "cat": + path = args[1] + if path not in files: + files[path] = { + "content": "Hello World", + "gid": 100, + "uid": 100, + "mode": 0700 + } + return files[path]["content"], "" + elif args[0] == "tee": + if args[1] == "-a": + path = args[2] + append = True + else: + path = args[1] + append = False + if path not in files: + files[path] = { + "content": "Hello World", + "gid": 100, + "uid": 100, + "mode": 0700, + } + if append: + files[path]["content"] += kwargs["process_input"] + else: + files[path]["content"] = kwargs["process_input"] + + +class VirtDiskVFSLocalFSTestPaths(test.TestCase): + def setUp(self): + super(VirtDiskVFSLocalFSTestPaths, self).setUp() + + real_execute = processutils.execute + + def nonroot_execute(*cmd_parts, **kwargs): + kwargs.pop('run_as_root', None) + return real_execute(*cmd_parts, **kwargs) + + self.stubs.Set(processutils, 'execute', nonroot_execute) + + def test_check_safe_path(self): + if tests_utils.is_osx(): + self.skipTest("Unable to test on OSX") + vfs = vfsimpl.VFSLocalFS("dummy.img") + vfs.imgdir = "/foo" + ret = vfs._canonical_path('etc/something.conf') + self.assertEquals(ret, '/foo/etc/something.conf') + + def test_check_unsafe_path(self): + if tests_utils.is_osx(): + self.skipTest("Unable to test on OSX") + vfs = vfsimpl.VFSLocalFS("dummy.img") + vfs.imgdir = "/foo" + self.assertRaises(exception.Invalid, + vfs._canonical_path, + 'etc/../../../something.conf') + + +class VirtDiskVFSLocalFSTest(test.TestCase): + def test_makepath(self): + global dirs, commands + dirs = [] + commands = [] + self.stubs.Set(processutils, 'execute', fake_execute) + + vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") + vfs.imgdir = "/scratch/dir" + vfs.make_path("/some/dir") + vfs.make_path("/other/dir") + + self.assertEqual(dirs, + ["/scratch/dir/some/dir", "/scratch/dir/other/dir"]), + + root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config + self.assertEqual(commands, + [{'args': ('readlink', '-nm', + '/scratch/dir/some/dir'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('mkdir', '-p', + '/scratch/dir/some/dir'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('readlink', '-nm', + '/scratch/dir/other/dir'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('mkdir', '-p', + '/scratch/dir/other/dir'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}]) + + def test_append_file(self): + global files, commands + files = {} + commands = [] + self.stubs.Set(processutils, 'execute', fake_execute) + + vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") + vfs.imgdir = "/scratch/dir" + vfs.append_file("/some/file", " Goodbye") + + self.assertTrue("/scratch/dir/some/file" in files) + self.assertEquals(files["/scratch/dir/some/file"]["content"], + "Hello World Goodbye") + + root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config + self.assertEqual(commands, + [{'args': ('readlink', '-nm', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('tee', '-a', + '/scratch/dir/some/file'), + 'kwargs': {'process_input': ' Goodbye', + 'run_as_root': True, + 'root_helper': root_helper}}]) + + def test_replace_file(self): + global files, commands + files = {} + commands = [] + self.stubs.Set(processutils, 'execute', fake_execute) + + vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") + vfs.imgdir = "/scratch/dir" + vfs.replace_file("/some/file", "Goodbye") + + self.assertTrue("/scratch/dir/some/file" in files) + self.assertEquals(files["/scratch/dir/some/file"]["content"], + "Goodbye") + + root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config + self.assertEqual(commands, + [{'args': ('readlink', '-nm', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('tee', '/scratch/dir/some/file'), + 'kwargs': {'process_input': 'Goodbye', + 'run_as_root': True, + 'root_helper': root_helper}}]) + + def test_read_file(self): + global commands, files + files = {} + commands = [] + self.stubs.Set(processutils, 'execute', fake_execute) + + vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") + vfs.imgdir = "/scratch/dir" + self.assertEqual(vfs.read_file("/some/file"), "Hello World") + + root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config + self.assertEqual(commands, + [{'args': ('readlink', '-nm', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('cat', '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}]) + + def test_has_file(self): + global commands, files + files = {} + commands = [] + self.stubs.Set(processutils, 'execute', fake_execute) + + vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") + vfs.imgdir = "/scratch/dir" + vfs.read_file("/some/file") + + self.assertTrue(vfs.has_file("/some/file")) + self.assertFalse(vfs.has_file("/other/file")) + + root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config + self.assertEqual(commands, + [{'args': ('readlink', '-nm', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('cat', '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('readlink', '-nm', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('readlink', '-e', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('readlink', '-nm', + '/scratch/dir/other/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('readlink', '-e', + '/scratch/dir/other/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + ]) + + def test_set_permissions(self): + global commands, files + commands = [] + files = {} + self.stubs.Set(processutils, 'execute', fake_execute) + + vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") + vfs.imgdir = "/scratch/dir" + vfs.read_file("/some/file") + + vfs.set_permissions("/some/file", 0777) + self.assertEquals(files["/scratch/dir/some/file"]["mode"], 0777) + + root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config + self.assertEqual(commands, + [{'args': ('readlink', '-nm', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('cat', '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('readlink', '-nm', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('chmod', '777', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}]) + + def test_set_ownership(self): + global commands, files + commands = [] + files = {} + self.stubs.Set(processutils, 'execute', fake_execute) + + vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") + vfs.imgdir = "/scratch/dir" + vfs.read_file("/some/file") + + self.assertEquals(files["/scratch/dir/some/file"]["uid"], 100) + self.assertEquals(files["/scratch/dir/some/file"]["gid"], 100) + + vfs.set_ownership("/some/file", "fred", None) + self.assertEquals(files["/scratch/dir/some/file"]["uid"], 105) + self.assertEquals(files["/scratch/dir/some/file"]["gid"], 100) + + vfs.set_ownership("/some/file", None, "users") + self.assertEquals(files["/scratch/dir/some/file"]["uid"], 105) + self.assertEquals(files["/scratch/dir/some/file"]["gid"], 500) + + vfs.set_ownership("/some/file", "joe", "admins") + self.assertEquals(files["/scratch/dir/some/file"]["uid"], 110) + self.assertEquals(files["/scratch/dir/some/file"]["gid"], 600) + + root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config + self.assertEqual(commands, + [{'args': ('readlink', '-nm', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('cat', '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('readlink', '-nm', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('chown', 'fred', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('readlink', '-nm', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('chgrp', 'users', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('readlink', '-nm', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}, + {'args': ('chown', 'joe:admins', + '/scratch/dir/some/file'), + 'kwargs': {'run_as_root': True, + 'root_helper': root_helper}}]) diff --git a/nova/tests/virt/test_virt_drivers.py b/nova/tests/virt/test_virt_drivers.py new file mode 100644 index 000000000..c054b9624 --- /dev/null +++ b/nova/tests/virt/test_virt_drivers.py @@ -0,0 +1,642 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2010 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import base64 +import fixtures +import netaddr +import sys +import traceback + +from nova.compute import manager +from nova import exception +from nova.openstack.common import importutils +from nova.openstack.common import log as logging +from nova import test +from nova.tests.image import fake as fake_image +from nova.tests import utils as test_utils +from nova.tests.virt.libvirt import fake_libvirt_utils +from nova.virt import event as virtevent +from nova.virt import fake + +LOG = logging.getLogger(__name__) + + +def catch_notimplementederror(f): + """Decorator to simplify catching drivers raising NotImplementedError + + If a particular call makes a driver raise NotImplementedError, we + log it so that we can extract this information afterwards to + automatically generate a hypervisor/feature support matrix.""" + def wrapped_func(self, *args, **kwargs): + try: + return f(self, *args, **kwargs) + except NotImplementedError: + frame = traceback.extract_tb(sys.exc_info()[2])[-1] + LOG.error('%(driver)s does not implement %(method)s' % { + 'driver': type(self.connection), + 'method': frame[2]}) + + wrapped_func.__name__ = f.__name__ + wrapped_func.__doc__ = f.__doc__ + return wrapped_func + + +class _FakeDriverBackendTestCase(object): + def _setup_fakelibvirt(self): + # So that the _supports_direct_io does the test based + # on the current working directory, instead of the + # default instances_path which doesn't exist + self.flags(instances_path='') + + # Put fakelibvirt in place + if 'libvirt' in sys.modules: + self.saved_libvirt = sys.modules['libvirt'] + else: + self.saved_libvirt = None + + import nova.tests.virt.libvirt.fake_imagebackend as fake_imagebackend + import nova.tests.virt.libvirt.fake_libvirt_utils as fake_libvirt_utils + import nova.tests.virt.libvirt.fakelibvirt as fakelibvirt + + sys.modules['libvirt'] = fakelibvirt + import nova.virt.libvirt.driver + import nova.virt.libvirt.firewall + + self.useFixture(fixtures.MonkeyPatch( + 'nova.virt.libvirt.driver.imagebackend', + fake_imagebackend)) + self.useFixture(fixtures.MonkeyPatch( + 'nova.virt.libvirt.driver.libvirt', + fakelibvirt)) + self.useFixture(fixtures.MonkeyPatch( + 'nova.virt.libvirt.driver.libvirt_utils', + fake_libvirt_utils)) + self.useFixture(fixtures.MonkeyPatch( + 'nova.virt.libvirt.imagebackend.libvirt_utils', + fake_libvirt_utils)) + self.useFixture(fixtures.MonkeyPatch( + 'nova.virt.libvirt.firewall.libvirt', + fakelibvirt)) + + self.flags(rescue_image_id="2", + rescue_kernel_id="3", + rescue_ramdisk_id=None, + libvirt_snapshots_directory='./') + + def fake_extend(image, size): + pass + + def fake_migrateToURI(*a): + pass + + def fake_make_drive(_self, _path): + pass + + def fake_get_instance_disk_info(_self, instance, xml=None): + return '[]' + + self.stubs.Set(nova.virt.libvirt.driver.LibvirtDriver, + 'get_instance_disk_info', + fake_get_instance_disk_info) + + self.stubs.Set(nova.virt.libvirt.driver.disk, + 'extend', fake_extend) + + # Like the existing fakelibvirt.migrateToURI, do nothing, + # but don't fail for these tests. + self.stubs.Set(nova.virt.libvirt.driver.libvirt.Domain, + 'migrateToURI', fake_migrateToURI) + + # We can't actually make a config drive v2 because ensure_tree has + # been faked out + self.stubs.Set(nova.virt.configdrive.ConfigDriveBuilder, + 'make_drive', fake_make_drive) + + def _teardown_fakelibvirt(self): + # Restore libvirt + if self.saved_libvirt: + sys.modules['libvirt'] = self.saved_libvirt + + def setUp(self): + super(_FakeDriverBackendTestCase, self).setUp() + # TODO(sdague): it would be nice to do this in a way that only + # the relevant backends where replaced for tests, though this + # should not harm anything by doing it for all backends + fake_image.stub_out_image_service(self.stubs) + self._setup_fakelibvirt() + + def tearDown(self): + fake_image.FakeImageService_reset() + self._teardown_fakelibvirt() + super(_FakeDriverBackendTestCase, self).tearDown() + + +class VirtDriverLoaderTestCase(_FakeDriverBackendTestCase, test.TestCase): + """Test that ComputeManager can successfully load both + old style and new style drivers and end up with the correct + final class""" + + # if your driver supports being tested in a fake way, it can go here + # + # both long form and short form drivers are supported + new_drivers = { + 'nova.virt.fake.FakeDriver': 'FakeDriver', + 'nova.virt.libvirt.LibvirtDriver': 'LibvirtDriver', + 'fake.FakeDriver': 'FakeDriver', + 'libvirt.LibvirtDriver': 'LibvirtDriver' + } + + def test_load_new_drivers(self): + for cls, driver in self.new_drivers.iteritems(): + self.flags(compute_driver=cls) + # NOTE(sdague) the try block is to make it easier to debug a + # failure by knowing which driver broke + try: + cm = manager.ComputeManager() + except Exception as e: + self.fail("Couldn't load driver %s - %s" % (cls, e)) + + self.assertEqual(cm.driver.__class__.__name__, driver, + "Could't load driver %s" % cls) + + def test_fail_to_load_new_drivers(self): + self.flags(compute_driver='nova.virt.amiga') + + def _fake_exit(error): + raise test.TestingException() + + self.stubs.Set(sys, 'exit', _fake_exit) + self.assertRaises(test.TestingException, manager.ComputeManager) + + +class _VirtDriverTestCase(_FakeDriverBackendTestCase): + def setUp(self): + super(_VirtDriverTestCase, self).setUp() + + self.flags(instances_path=self.useFixture(fixtures.TempDir()).path) + self.connection = importutils.import_object(self.driver_module, + fake.FakeVirtAPI()) + self.ctxt = test_utils.get_test_admin_context() + self.image_service = fake_image.FakeImageService() + + def _get_running_instance(self): + instance_ref = test_utils.get_test_instance() + network_info = test_utils.get_test_network_info() + image_info = test_utils.get_test_image_info(None, instance_ref) + self.connection.spawn(self.ctxt, instance_ref, image_info, + [], 'herp', network_info=network_info) + return instance_ref, network_info + + @catch_notimplementederror + def test_init_host(self): + self.connection.init_host('myhostname') + + @catch_notimplementederror + def test_list_instances(self): + self.connection.list_instances() + + @catch_notimplementederror + def test_spawn(self): + instance_ref, network_info = self._get_running_instance() + domains = self.connection.list_instances() + self.assertIn(instance_ref['name'], domains) + + num_instances = self.connection.get_num_instances() + self.assertEqual(1, num_instances) + + @catch_notimplementederror + def test_snapshot_not_running(self): + instance_ref = test_utils.get_test_instance() + img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'}) + self.assertRaises(exception.InstanceNotRunning, + self.connection.snapshot, + self.ctxt, instance_ref, img_ref['id'], + lambda *args, **kwargs: None) + + @catch_notimplementederror + def test_snapshot_running(self): + img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'}) + instance_ref, network_info = self._get_running_instance() + self.connection.snapshot(self.ctxt, instance_ref, img_ref['id'], + lambda *args, **kwargs: None) + + @catch_notimplementederror + def test_reboot(self): + reboot_type = "SOFT" + instance_ref, network_info = self._get_running_instance() + self.connection.reboot(self.ctxt, instance_ref, network_info, + reboot_type) + + @catch_notimplementederror + def test_get_host_ip_addr(self): + host_ip = self.connection.get_host_ip_addr() + + # Will raise an exception if it's not a valid IP at all + ip = netaddr.IPAddress(host_ip) + + # For now, assume IPv4. + self.assertEquals(ip.version, 4) + + @catch_notimplementederror + def test_set_admin_password(self): + instance_ref, network_info = self._get_running_instance() + self.connection.set_admin_password(instance_ref, 'p4ssw0rd') + + @catch_notimplementederror + def test_inject_file(self): + instance_ref, network_info = self._get_running_instance() + self.connection.inject_file(instance_ref, + base64.b64encode('/testfile'), + base64.b64encode('testcontents')) + + @catch_notimplementederror + def test_resume_state_on_host_boot(self): + instance_ref, network_info = self._get_running_instance() + self.connection.resume_state_on_host_boot(self.ctxt, instance_ref, + network_info) + + @catch_notimplementederror + def test_rescue(self): + instance_ref, network_info = self._get_running_instance() + self.connection.rescue(self.ctxt, instance_ref, network_info, None, '') + + @catch_notimplementederror + def test_unrescue_unrescued_instance(self): + instance_ref, network_info = self._get_running_instance() + self.connection.unrescue(instance_ref, network_info) + + @catch_notimplementederror + def test_unrescue_rescued_instance(self): + instance_ref, network_info = self._get_running_instance() + self.connection.rescue(self.ctxt, instance_ref, network_info, None, '') + self.connection.unrescue(instance_ref, network_info) + + @catch_notimplementederror + def test_poll_rebooting_instances(self): + instances = [self._get_running_instance()] + self.connection.poll_rebooting_instances(10, instances) + + @catch_notimplementederror + def test_migrate_disk_and_power_off(self): + instance_ref, network_info = self._get_running_instance() + instance_type_ref = test_utils.get_test_instance_type() + self.connection.migrate_disk_and_power_off( + self.ctxt, instance_ref, 'dest_host', instance_type_ref, + network_info) + + @catch_notimplementederror + def test_power_off(self): + instance_ref, network_info = self._get_running_instance() + self.connection.power_off(instance_ref) + + @catch_notimplementederror + def test_power_on_running(self): + instance_ref, network_info = self._get_running_instance() + self.connection.power_on(instance_ref) + + @catch_notimplementederror + def test_power_on_powered_off(self): + instance_ref, network_info = self._get_running_instance() + self.connection.power_off(instance_ref) + self.connection.power_on(instance_ref) + + @catch_notimplementederror + def test_soft_delete(self): + instance_ref, network_info = self._get_running_instance() + self.connection.soft_delete(instance_ref) + + @catch_notimplementederror + def test_restore_running(self): + instance_ref, network_info = self._get_running_instance() + self.connection.restore(instance_ref) + + @catch_notimplementederror + def test_restore_soft_deleted(self): + instance_ref, network_info = self._get_running_instance() + self.connection.soft_delete(instance_ref) + self.connection.restore(instance_ref) + + @catch_notimplementederror + def test_pause(self): + instance_ref, network_info = self._get_running_instance() + self.connection.pause(instance_ref) + + @catch_notimplementederror + def test_unpause_unpaused_instance(self): + instance_ref, network_info = self._get_running_instance() + self.connection.unpause(instance_ref) + + @catch_notimplementederror + def test_unpause_paused_instance(self): + instance_ref, network_info = self._get_running_instance() + self.connection.pause(instance_ref) + self.connection.unpause(instance_ref) + + @catch_notimplementederror + def test_suspend(self): + instance_ref, network_info = self._get_running_instance() + self.connection.suspend(instance_ref) + + @catch_notimplementederror + def test_resume_unsuspended_instance(self): + instance_ref, network_info = self._get_running_instance() + self.connection.resume(instance_ref, network_info) + + @catch_notimplementederror + def test_resume_suspended_instance(self): + instance_ref, network_info = self._get_running_instance() + self.connection.suspend(instance_ref) + self.connection.resume(instance_ref, network_info) + + @catch_notimplementederror + def test_destroy_instance_nonexistent(self): + fake_instance = {'id': 42, 'name': 'I just made this up!', + 'uuid': 'bda5fb9e-b347-40e8-8256-42397848cb00'} + network_info = test_utils.get_test_network_info() + self.connection.destroy(fake_instance, network_info) + + @catch_notimplementederror + def test_destroy_instance(self): + instance_ref, network_info = self._get_running_instance() + self.assertIn(instance_ref['name'], + self.connection.list_instances()) + self.connection.destroy(instance_ref, network_info) + self.assertNotIn(instance_ref['name'], + self.connection.list_instances()) + + @catch_notimplementederror + def test_get_volume_connector(self): + result = self.connection.get_volume_connector({'id': 'fake'}) + self.assertTrue('ip' in result) + self.assertTrue('initiator' in result) + self.assertTrue('host' in result) + + @catch_notimplementederror + def test_attach_detach_volume(self): + instance_ref, network_info = self._get_running_instance() + self.connection.attach_volume({'driver_volume_type': 'fake'}, + instance_ref, + '/dev/sda') + self.connection.detach_volume({'driver_volume_type': 'fake'}, + instance_ref, + '/dev/sda') + + @catch_notimplementederror + def test_attach_detach_different_power_states(self): + instance_ref, network_info = self._get_running_instance() + self.connection.power_off(instance_ref) + self.connection.attach_volume({'driver_volume_type': 'fake'}, + instance_ref, + '/dev/sda') + self.connection.power_on(instance_ref) + self.connection.detach_volume({'driver_volume_type': 'fake'}, + instance_ref, + '/dev/sda') + + @catch_notimplementederror + def test_get_info(self): + instance_ref, network_info = self._get_running_instance() + info = self.connection.get_info(instance_ref) + self.assertIn('state', info) + self.assertIn('max_mem', info) + self.assertIn('mem', info) + self.assertIn('num_cpu', info) + self.assertIn('cpu_time', info) + + @catch_notimplementederror + def test_get_info_for_unknown_instance(self): + self.assertRaises(exception.NotFound, + self.connection.get_info, + {'name': 'I just made this name up'}) + + @catch_notimplementederror + def test_get_diagnostics(self): + instance_ref, network_info = self._get_running_instance() + self.connection.get_diagnostics(instance_ref) + + @catch_notimplementederror + def test_block_stats(self): + instance_ref, network_info = self._get_running_instance() + stats = self.connection.block_stats(instance_ref['name'], 'someid') + self.assertEquals(len(stats), 5) + + @catch_notimplementederror + def test_interface_stats(self): + instance_ref, network_info = self._get_running_instance() + stats = self.connection.interface_stats(instance_ref['name'], 'someid') + self.assertEquals(len(stats), 8) + + @catch_notimplementederror + def test_get_console_output(self): + fake_libvirt_utils.files['dummy.log'] = '' + instance_ref, network_info = self._get_running_instance() + console_output = self.connection.get_console_output(instance_ref) + self.assertTrue(isinstance(console_output, basestring)) + + @catch_notimplementederror + def test_get_vnc_console(self): + instance_ref, network_info = self._get_running_instance() + vnc_console = self.connection.get_vnc_console(instance_ref) + self.assertIn('internal_access_path', vnc_console) + self.assertIn('host', vnc_console) + self.assertIn('port', vnc_console) + + @catch_notimplementederror + def test_get_spice_console(self): + instance_ref, network_info = self._get_running_instance() + spice_console = self.connection.get_spice_console(instance_ref) + self.assertIn('internal_access_path', spice_console) + self.assertIn('host', spice_console) + self.assertIn('port', spice_console) + self.assertIn('tlsPort', spice_console) + + @catch_notimplementederror + def test_get_console_pool_info(self): + instance_ref, network_info = self._get_running_instance() + console_pool = self.connection.get_console_pool_info(instance_ref) + self.assertIn('address', console_pool) + self.assertIn('username', console_pool) + self.assertIn('password', console_pool) + + @catch_notimplementederror + def test_refresh_security_group_rules(self): + # FIXME: Create security group and add the instance to it + instance_ref, network_info = self._get_running_instance() + self.connection.refresh_security_group_rules(1) + + @catch_notimplementederror + def test_refresh_security_group_members(self): + # FIXME: Create security group and add the instance to it + instance_ref, network_info = self._get_running_instance() + self.connection.refresh_security_group_members(1) + + @catch_notimplementederror + def test_refresh_provider_fw_rules(self): + instance_ref, network_info = self._get_running_instance() + self.connection.refresh_provider_fw_rules() + + @catch_notimplementederror + def test_ensure_filtering_for_instance(self): + instance_ref = test_utils.get_test_instance() + network_info = test_utils.get_test_network_info() + self.connection.ensure_filtering_rules_for_instance(instance_ref, + network_info) + + @catch_notimplementederror + def test_unfilter_instance(self): + instance_ref = test_utils.get_test_instance() + network_info = test_utils.get_test_network_info() + self.connection.unfilter_instance(instance_ref, network_info) + + @catch_notimplementederror + def test_live_migration(self): + instance_ref, network_info = self._get_running_instance() + self.connection.live_migration(self.ctxt, instance_ref, 'otherhost', + lambda *a: None, lambda *a: None) + + @catch_notimplementederror + def _check_host_status_fields(self, host_status): + self.assertIn('disk_total', host_status) + self.assertIn('disk_used', host_status) + self.assertIn('host_memory_total', host_status) + self.assertIn('host_memory_free', host_status) + + @catch_notimplementederror + def test_get_host_stats(self): + host_status = self.connection.get_host_stats() + self._check_host_status_fields(host_status) + + @catch_notimplementederror + def test_set_host_enabled(self): + self.connection.set_host_enabled('a useless argument?', True) + + @catch_notimplementederror + def test_get_host_uptime(self): + self.connection.get_host_uptime('a useless argument?') + + @catch_notimplementederror + def test_host_power_action_reboot(self): + self.connection.host_power_action('a useless argument?', 'reboot') + + @catch_notimplementederror + def test_host_power_action_shutdown(self): + self.connection.host_power_action('a useless argument?', 'shutdown') + + @catch_notimplementederror + def test_host_power_action_startup(self): + self.connection.host_power_action('a useless argument?', 'startup') + + @catch_notimplementederror + def test_add_to_aggregate(self): + self.connection.add_to_aggregate(self.ctxt, 'aggregate', 'host') + + @catch_notimplementederror + def test_remove_from_aggregate(self): + self.connection.remove_from_aggregate(self.ctxt, 'aggregate', 'host') + + def test_events(self): + got_events = [] + + def handler(event): + got_events.append(event) + + self.connection.register_event_listener(handler) + + event1 = virtevent.LifecycleEvent( + "cef19ce0-0ca2-11df-855d-b19fbce37686", + virtevent.EVENT_LIFECYCLE_STARTED) + event2 = virtevent.LifecycleEvent( + "cef19ce0-0ca2-11df-855d-b19fbce37686", + virtevent.EVENT_LIFECYCLE_PAUSED) + + self.connection.emit_event(event1) + self.connection.emit_event(event2) + want_events = [event1, event2] + self.assertEqual(want_events, got_events) + + event3 = virtevent.LifecycleEvent( + "cef19ce0-0ca2-11df-855d-b19fbce37686", + virtevent.EVENT_LIFECYCLE_RESUMED) + event4 = virtevent.LifecycleEvent( + "cef19ce0-0ca2-11df-855d-b19fbce37686", + virtevent.EVENT_LIFECYCLE_STOPPED) + + self.connection.emit_event(event3) + self.connection.emit_event(event4) + + want_events = [event1, event2, event3, event4] + self.assertEqual(want_events, got_events) + + def test_event_bad_object(self): + # Passing in something which does not inherit + # from virtevent.Event + + def handler(event): + pass + + self.connection.register_event_listener(handler) + + badevent = { + "foo": "bar" + } + + self.assertRaises(ValueError, + self.connection.emit_event, + badevent) + + def test_event_bad_callback(self): + # Check that if a callback raises an exception, + # it does not propagate back out of the + # 'emit_event' call + + def handler(event): + raise Exception("Hit Me!") + + self.connection.register_event_listener(handler) + + event1 = virtevent.LifecycleEvent( + "cef19ce0-0ca2-11df-855d-b19fbce37686", + virtevent.EVENT_LIFECYCLE_STARTED) + + self.connection.emit_event(event1) + + +class AbstractDriverTestCase(_VirtDriverTestCase, test.TestCase): + def setUp(self): + self.driver_module = "nova.virt.driver.ComputeDriver" + super(AbstractDriverTestCase, self).setUp() + + +class FakeConnectionTestCase(_VirtDriverTestCase, test.TestCase): + def setUp(self): + self.driver_module = 'nova.virt.fake.FakeDriver' + super(FakeConnectionTestCase, self).setUp() + + +class LibvirtConnTestCase(_VirtDriverTestCase, test.TestCase): + def setUp(self): + # Point _VirtDriverTestCase at the right module + self.driver_module = 'nova.virt.libvirt.LibvirtDriver' + super(LibvirtConnTestCase, self).setUp() + + def test_force_hard_reboot(self): + self.flags(libvirt_wait_soft_reboot_seconds=0) + self.test_reboot() + + def test_migrate_disk_and_power_off(self): + # there is lack of fake stuff to execute this method. so pass. + self.skipTest("Test nothing, but this method" + " needed to override superclass.") -- cgit