# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2012 Grid Dynamics # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import fixtures import os from nova.openstack.common import cfg from nova.openstack.common import uuidutils from nova import test from nova.tests import fake_libvirt_utils from nova.virt.libvirt import imagebackend CONF = cfg.CONF class _ImageTestCase(object): INSTANCES_PATH = '/instances_path' def mock_create_image(self, image): def create_image(fn, base, size, *args, **kwargs): fn(target=base, *args, **kwargs) image.create_image = create_image def setUp(self): super(_ImageTestCase, self).setUp() self.flags(disable_process_locking=True, instances_path=self.INSTANCES_PATH) self.INSTANCE = {'name': 'instance', 'uuid': uuidutils.generate_uuid()} self.NAME = 'fake.vm' self.TEMPLATE = 'template' self.OLD_STYLE_INSTANCE_PATH = \ fake_libvirt_utils.get_instance_path(self.INSTANCE, forceold=True) self.PATH = os.path.join( fake_libvirt_utils.get_instance_path(self.INSTANCE), self.NAME) # TODO(mikal): rename template_dir to base_dir and template_path # to cached_image_path. This will be less confusing. self.TEMPLATE_DIR = os.path.join(CONF.instances_path, '_base') self.TEMPLATE_PATH = os.path.join(self.TEMPLATE_DIR, 'template') self.useFixture(fixtures.MonkeyPatch( 'nova.virt.libvirt.imagebackend.libvirt_utils', fake_libvirt_utils)) def test_cache(self): self.mox.StubOutWithMock(os.path, 'exists') if self.OLD_STYLE_INSTANCE_PATH: os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False) os.path.exists(self.TEMPLATE_DIR).AndReturn(False) os.path.exists(self.PATH).AndReturn(False) os.path.exists(self.TEMPLATE_PATH).AndReturn(False) fn = self.mox.CreateMockAnything() fn(target=self.TEMPLATE_PATH) self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree') imagebackend.fileutils.ensure_tree(self.TEMPLATE_DIR) self.mox.ReplayAll() image = self.image_class(self.INSTANCE, self.NAME) self.mock_create_image(image) image.cache(fn, self.TEMPLATE) self.mox.VerifyAll() def test_cache_image_exists(self): self.mox.StubOutWithMock(os.path, 'exists') if self.OLD_STYLE_INSTANCE_PATH: os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False) os.path.exists(self.TEMPLATE_DIR).AndReturn(True) os.path.exists(self.PATH).AndReturn(True) os.path.exists(self.TEMPLATE_PATH).AndReturn(True) self.mox.ReplayAll() image = self.image_class(self.INSTANCE, self.NAME) image.cache(None, self.TEMPLATE) self.mox.VerifyAll() def test_cache_base_dir_exists(self): self.mox.StubOutWithMock(os.path, 'exists') if self.OLD_STYLE_INSTANCE_PATH: os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False) os.path.exists(self.TEMPLATE_DIR).AndReturn(True) os.path.exists(self.PATH).AndReturn(False) os.path.exists(self.TEMPLATE_PATH).AndReturn(False) fn = self.mox.CreateMockAnything() fn(target=self.TEMPLATE_PATH) self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree') self.mox.ReplayAll() image = self.image_class(self.INSTANCE, self.NAME) self.mock_create_image(image) image.cache(fn, self.TEMPLATE) self.mox.VerifyAll() def test_cache_template_exists(self): self.mox.StubOutWithMock(os.path, 'exists') if self.OLD_STYLE_INSTANCE_PATH: os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False) os.path.exists(self.TEMPLATE_DIR).AndReturn(True) os.path.exists(self.PATH).AndReturn(False) os.path.exists(self.TEMPLATE_PATH).AndReturn(True) fn = self.mox.CreateMockAnything() self.mox.ReplayAll() image = self.image_class(self.INSTANCE, self.NAME) self.mock_create_image(image) image.cache(fn, self.TEMPLATE) self.mox.VerifyAll() class RawTestCase(_ImageTestCase, test.TestCase): SIZE = 1024 def setUp(self): self.image_class = imagebackend.Raw super(RawTestCase, self).setUp() def prepare_mocks(self): fn = self.mox.CreateMockAnything() self.mox.StubOutWithMock(imagebackend.lockutils.synchronized, '__call__') self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image') self.mox.StubOutWithMock(imagebackend.disk, 'extend') return fn def test_create_image(self): fn = self.prepare_mocks() fn(target=self.TEMPLATE_PATH, image_id=None) imagebackend.libvirt_utils.copy_image(self.TEMPLATE_PATH, self.PATH) self.mox.ReplayAll() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, None, image_id=None) self.mox.VerifyAll() def test_create_image_generated(self): fn = self.prepare_mocks() fn(target=self.PATH) self.mox.ReplayAll() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, None) self.mox.VerifyAll() def test_create_image_extend(self): fn = self.prepare_mocks() fn(target=self.TEMPLATE_PATH, image_id=None) imagebackend.libvirt_utils.copy_image(self.TEMPLATE_PATH, self.PATH) imagebackend.disk.extend(self.PATH, self.SIZE) self.mox.ReplayAll() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, self.SIZE, image_id=None) self.mox.VerifyAll() class Qcow2TestCase(_ImageTestCase, test.TestCase): SIZE = 1024 * 1024 * 1024 def setUp(self): self.image_class = imagebackend.Qcow2 super(Qcow2TestCase, self).setUp() self.QCOW2_BASE = (self.TEMPLATE_PATH + '_%d' % (self.SIZE / (1024 * 1024 * 1024))) def prepare_mocks(self): fn = self.mox.CreateMockAnything() self.mox.StubOutWithMock(imagebackend.lockutils.synchronized, '__call__') self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'create_cow_image') self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image') self.mox.StubOutWithMock(imagebackend.disk, 'extend') return fn def test_create_image(self): fn = self.prepare_mocks() fn(target=self.TEMPLATE_PATH) imagebackend.libvirt_utils.create_cow_image(self.TEMPLATE_PATH, self.PATH) self.mox.ReplayAll() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, None) self.mox.VerifyAll() def test_create_image_with_size(self): fn = self.prepare_mocks() fn(target=self.TEMPLATE_PATH) self.mox.StubOutWithMock(os.path, 'exists') if self.OLD_STYLE_INSTANCE_PATH: os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False) os.path.exists(self.TEMPLATE_PATH).AndReturn(False) os.path.exists(self.PATH).AndReturn(False) imagebackend.libvirt_utils.create_cow_image(self.TEMPLATE_PATH, self.PATH) imagebackend.disk.extend(self.PATH, self.SIZE) self.mox.ReplayAll() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, self.SIZE) self.mox.VerifyAll() class LvmTestCase(_ImageTestCase, test.TestCase): VG = 'FakeVG' TEMPLATE_SIZE = 512 SIZE = 1024 def setUp(self): self.image_class = imagebackend.Lvm super(LvmTestCase, self).setUp() self.flags(libvirt_images_volume_group=self.VG) self.LV = '%s_%s' % (self.INSTANCE['name'], self.NAME) self.OLD_STYLE_INSTANCE_PATH = None self.PATH = os.path.join('/dev', self.VG, self.LV) self.disk = imagebackend.disk self.utils = imagebackend.utils self.libvirt_utils = imagebackend.libvirt_utils def prepare_mocks(self): fn = self.mox.CreateMockAnything() self.mox.StubOutWithMock(self.disk, 'resize2fs') self.mox.StubOutWithMock(self.libvirt_utils, 'create_lvm_image') self.mox.StubOutWithMock(self.disk, 'get_disk_size') self.mox.StubOutWithMock(self.utils, 'execute') return fn def _create_image(self, sparse): fn = self.prepare_mocks() fn(target=self.TEMPLATE_PATH) self.libvirt_utils.create_lvm_image(self.VG, self.LV, self.TEMPLATE_SIZE, sparse=sparse) self.disk.get_disk_size(self.TEMPLATE_PATH ).AndReturn(self.TEMPLATE_SIZE) cmd = ('qemu-img', 'convert', '-O', 'raw', self.TEMPLATE_PATH, self.PATH) self.utils.execute(*cmd, run_as_root=True) self.mox.ReplayAll() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, None) self.mox.VerifyAll() def _create_image_generated(self, sparse): fn = self.prepare_mocks() self.libvirt_utils.create_lvm_image(self.VG, self.LV, self.SIZE, sparse=sparse) fn(target=self.PATH, ephemeral_size=None) self.mox.ReplayAll() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, self.SIZE, ephemeral_size=None) self.mox.VerifyAll() def _create_image_resize(self, sparse): fn = self.prepare_mocks() fn(target=self.TEMPLATE_PATH) self.libvirt_utils.create_lvm_image(self.VG, self.LV, self.SIZE, sparse=sparse) self.disk.get_disk_size(self.TEMPLATE_PATH ).AndReturn(self.TEMPLATE_SIZE) cmd = ('qemu-img', 'convert', '-O', 'raw', self.TEMPLATE_PATH, self.PATH) self.utils.execute(*cmd, run_as_root=True) self.disk.resize2fs(self.PATH, run_as_root=True) self.mox.ReplayAll() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, self.SIZE) self.mox.VerifyAll() def test_create_image(self): self._create_image(False) def test_create_image_sparsed(self): self.flags(libvirt_sparse_logical_volumes=True) self._create_image(True) def test_create_image_generated(self): self._create_image_generated(False) def test_create_image_generated_sparsed(self): self.flags(libvirt_sparse_logical_volumes=True) self._create_image_generated(True) def test_create_image_resize(self): self._create_image_resize(False) def test_create_image_resize_sparsed(self): self.flags(libvirt_sparse_logical_volumes=True) self._create_image_resize(True) def test_create_image_negative(self): fn = self.prepare_mocks() fn(target=self.TEMPLATE_PATH) self.libvirt_utils.create_lvm_image(self.VG, self.LV, self.SIZE, sparse=False ).AndRaise(RuntimeError()) self.disk.get_disk_size(self.TEMPLATE_PATH ).AndReturn(self.TEMPLATE_SIZE) self.mox.StubOutWithMock(self.libvirt_utils, 'remove_logical_volumes') self.libvirt_utils.remove_logical_volumes(self.PATH) self.mox.ReplayAll() image = self.image_class(self.INSTANCE, self.NAME) self.assertRaises(RuntimeError, image.create_image, fn, self.TEMPLATE_PATH, self.SIZE) self.mox.VerifyAll() def test_create_image_generated_negative(self): fn = self.prepare_mocks() fn(target=self.PATH, ephemeral_size=None).AndRaise(RuntimeError()) self.libvirt_utils.create_lvm_image(self.VG, self.LV, self.SIZE, sparse=False) self.mox.StubOutWithMock(self.libvirt_utils, 'remove_logical_volumes') self.libvirt_utils.remove_logical_volumes(self.PATH) self.mox.ReplayAll() image = self.image_class(self.INSTANCE, self.NAME) self.assertRaises(RuntimeError, image.create_image, fn, self.TEMPLATE_PATH, self.SIZE, ephemeral_size=None) self.mox.VerifyAll() class BackendTestCase(test.TestCase): INSTANCE = {'name': 'fake-instance', 'uuid': uuidutils.generate_uuid()} NAME = 'fake-name.suffix' def get_image(self, use_cow, image_type): return imagebackend.Backend(use_cow).image(self.INSTANCE, self.NAME, image_type) def _test_image(self, image_type, image_not_cow, image_cow): image1 = self.get_image(False, image_type) image2 = self.get_image(True, image_type) def assertIsInstance(instance, class_object): failure = ('Expected %s,' + ' but got %s.') % (class_object.__name__, instance.__class__.__name__) self.assertTrue(isinstance(instance, class_object), failure) assertIsInstance(image1, image_not_cow) assertIsInstance(image2, image_cow) def test_image_raw(self): self._test_image('raw', imagebackend.Raw, imagebackend.Raw) def test_image_qcow2(self): self._test_image('qcow2', imagebackend.Qcow2, imagebackend.Qcow2) def test_image_lvm(self): self.flags(libvirt_images_volume_group='FakeVG') self._test_image('lvm', imagebackend.Lvm, imagebackend.Lvm) def test_image_default(self): self._test_image('default', imagebackend.Raw, imagebackend.Qcow2)