diff options
Diffstat (limited to 'tests/storage_test')
-rw-r--r-- | tests/storage_test/Makefile.am | 24 | ||||
-rw-r--r-- | tests/storage_test/action_test.py | 926 | ||||
-rw-r--r-- | tests/storage_test/devicelibs_test/Makefile.am | 22 | ||||
-rw-r--r-- | tests/storage_test/devicelibs_test/baseclass.py | 80 | ||||
-rwxr-xr-x | tests/storage_test/devicelibs_test/crypto_test.py | 135 | ||||
-rw-r--r-- | tests/storage_test/devicelibs_test/edd_test.py | 212 | ||||
-rwxr-xr-x | tests/storage_test/devicelibs_test/lvm_test.py | 239 | ||||
-rwxr-xr-x | tests/storage_test/devicelibs_test/mdraid_test.py | 114 | ||||
-rwxr-xr-x | tests/storage_test/devicelibs_test/mpath_test.py | 86 | ||||
-rwxr-xr-x | tests/storage_test/devicelibs_test/swap_test.py | 74 | ||||
-rw-r--r-- | tests/storage_test/partitioning_test.py | 130 | ||||
-rw-r--r-- | tests/storage_test/size_test.py | 86 | ||||
-rw-r--r-- | tests/storage_test/storagetestcase.py | 287 | ||||
-rw-r--r-- | tests/storage_test/tsort_test.py | 62 | ||||
-rw-r--r-- | tests/storage_test/udev_test.py | 139 |
15 files changed, 0 insertions, 2616 deletions
diff --git a/tests/storage_test/Makefile.am b/tests/storage_test/Makefile.am deleted file mode 100644 index 23565abb8..000000000 --- a/tests/storage_test/Makefile.am +++ /dev/null @@ -1,24 +0,0 @@ -# tests/storage/Makefile.am for anaconda -# -# Copyright (C) 2009 Red Hat, Inc. -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU Lesser General Public License as published -# by the Free Software Foundation; either version 2.1 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# -# Author: David Cantrell <dcantrell@redhat.com> - -SUBDIRS = devicelibs_test - -EXTRA_DIST = *.py - -MAINTAINERCLEANFILES = Makefile.in diff --git a/tests/storage_test/action_test.py b/tests/storage_test/action_test.py deleted file mode 100644 index b07a44b23..000000000 --- a/tests/storage_test/action_test.py +++ /dev/null @@ -1,926 +0,0 @@ -#!/usr/bin/python - -import unittest -from mock import Mock -from mock import TestCase - -from storagetestcase import StorageTestCase -import pyanaconda.storage as storage -from pyanaconda.storage.formats import getFormat - -# device classes for brevity's sake -- later on, that is -from pyanaconda.storage.devices import StorageDevice -from pyanaconda.storage.devices import DiskDevice -from pyanaconda.storage.devices import PartitionDevice -from pyanaconda.storage.devices import MDRaidArrayDevice -from pyanaconda.storage.devices import DMDevice -from pyanaconda.storage.devices import LUKSDevice -from pyanaconda.storage.devices import LVMVolumeGroupDevice -from pyanaconda.storage.devices import LVMLogicalVolumeDevice -from pyanaconda.storage.devices import FileDevice - -# action classes -from pyanaconda.storage.deviceaction import ActionCreateDevice -from pyanaconda.storage.deviceaction import ActionResizeDevice -from pyanaconda.storage.deviceaction import ActionDestroyDevice -from pyanaconda.storage.deviceaction import ActionCreateFormat -from pyanaconda.storage.deviceaction import ActionResizeFormat -from pyanaconda.storage.deviceaction import ActionMigrateFormat -from pyanaconda.storage.deviceaction import ActionDestroyFormat - -""" DeviceActionTestSuite """ - -class DeviceActionTestCase(StorageTestCase): - def setUp(self): - """ Create something like a preexisting autopart on two disks (sda,sdb). - - The other two disks (sdc,sdd) are left for individual tests to use. - """ - self.setUpAnaconda() - - for name in ["sda", "sdb", "sdc", "sdd"]: - disk = self.newDevice(device_class=DiskDevice, - name=name, size=100000) - disk.format = self.newFormat("disklabel", path=disk.path, - exists=True) - self.storage.devicetree._addDevice(disk) - - # create a layout similar to autopart as a starting point - sda = self.storage.devicetree.getDeviceByName("sda") - sdb = self.storage.devicetree.getDeviceByName("sdb") - - sda1 = self.newDevice(device_class=PartitionDevice, - exists=True, name="sda1", parents=[sda], size=500) - sda1.format = self.newFormat("ext4", mountpoint="/boot", - device_instance=sda1, - device=sda1.path, exists=True) - self.storage.devicetree._addDevice(sda1) - - sda2 = self.newDevice(device_class=PartitionDevice, - size=99500, name="sda2", parents=[sda], exists=True) - sda2.format = self.newFormat("lvmpv", device=sda2.path, exists=True) - self.storage.devicetree._addDevice(sda2) - - sdb1 = self.newDevice(device_class=PartitionDevice, - size=99999, name="sdb1", parents=[sdb], exists=True) - sdb1.format = self.newFormat("lvmpv", device=sdb1.path, exists=True) - self.storage.devicetree._addDevice(sdb1) - - vg = self.newDevice(device_class=LVMVolumeGroupDevice, - name="VolGroup", parents=[sda2, sdb1], - exists=True) - self.storage.devicetree._addDevice(vg) - - lv_root = self.newDevice(device_class=LVMLogicalVolumeDevice, - name="lv_root", vgdev=vg, size=160000, - exists=True) - lv_root.format = self.newFormat("ext4", mountpoint="/", - device_instance=lv_root, - device=lv_root.path, exists=True) - self.storage.devicetree._addDevice(lv_root) - - lv_swap = self.newDevice(device_class=LVMLogicalVolumeDevice, - name="lv_swap", vgdev=vg, size=4000, - exists=True) - lv_swap.format = self.newFormat("swap", device=lv_swap.path, - device_instance=lv_swap, - exists=True) - self.storage.devicetree._addDevice(lv_swap) - - def testActions(self, *args, **kwargs): - """ Verify correct management of actions. - - - action creation/registration/cancellation - - ActionCreateDevice adds device to tree - - ActionDestroyDevice removes device from tree - - ActionCreateFormat sets device.format in tree - - ActionDestroyFormat unsets device.format in tree - - cancelled action's registration side-effects reversed - - failure to register destruction of non-leaf device - - failure to register creation of device already in tree? - - failure to register destruction of device not in tree? - - - action pruning - - non-existent-device create..destroy cycles removed - - all actions on this device should get removed - - all actions pruned from to-be-destroyed devices - - resize, format, migrate, &c - - redundant resize/migrate/format actions pruned - - last one registered stays - - - action sorting - - destroy..resize..migrate..create - - creation - - leaves-last, including formatting - - destruction - - leaves-first - """ - devicetree = self.storage.devicetree - - # clear the disks - self.destroyAllDevices() - self.assertEqual(devicetree.getDevicesByType("lvmlv"), []) - self.assertEqual(devicetree.getDevicesByType("lvmvg"), []) - self.assertEqual(devicetree.getDevicesByType("partition"), []) - - sda = devicetree.getDeviceByName("sda") - self.assertNotEqual(sda, None, "failed to find disk 'sda'") - - sda1 = self.newDevice(device_class=PartitionDevice, - name="sda1", size=500, parents=[sda]) - self.scheduleCreateDevice(device=sda1) - - sda2 = self.newDevice(device_class=PartitionDevice, - name="sda2", size=100000, parents=[sda]) - self.scheduleCreateDevice(device=sda2) - format = self.newFormat("lvmpv", device=sda2.path) - self.scheduleCreateFormat(device=sda2, format=format) - - vg = self.newDevice(device_class=LVMVolumeGroupDevice, - name="vg", parents=[sda2]) - self.scheduleCreateDevice(device=vg) - - lv_root = self.newDevice(device_class=LVMLogicalVolumeDevice, - name="lv_root", vgdev=vg, size=60000) - self.scheduleCreateDevice(device=lv_root) - format = self.newFormat("ext4", device=lv_root.path, mountpoint="/") - self.scheduleCreateFormat(device=lv_root, format=format) - - lv_swap = self.newDevice(device_class=LVMLogicalVolumeDevice, - name="lv_swap", vgdev=vg, size=4000) - self.scheduleCreateDevice(device=lv_swap) - format = self.newFormat("swap", device=lv_swap.path) - self.scheduleCreateFormat(device=lv_swap, format=format) - - sda3 = self.newDevice(device_class=PartitionDevice, - name="sda3", parents=[sda], size=40000) - self.scheduleCreateDevice(device=sda3) - format = self.newFormat("mdmember", device=sda3.path) - self.scheduleCreateFormat(device=sda3, format=format) - - sdb = devicetree.getDeviceByName("sdb") - self.assertNotEqual(sdb, None, "failed to find disk 'sdb'") - - sdb1 = self.newDevice(device_class=PartitionDevice, - name="sdb1", parents=[sdb], size=40000) - self.scheduleCreateDevice(device=sdb1) - format = self.newFormat("mdmember", device=sdb1.path,) - self.scheduleCreateFormat(device=sdb1, format=format) - - md0 = self.newDevice(device_class=MDRaidArrayDevice, - name="md0", level="raid0", minor=0, size=80000, - memberDevices=2, totalDevices=2, - parents=[sdb1, sda3]) - self.scheduleCreateDevice(device=md0) - - format = self.newFormat("ext4", device=md0.path, mountpoint="/home") - self.scheduleCreateFormat(device=md0, format=format) - - format = self.newFormat("ext4", mountpoint="/boot", device=sda1.path) - self.scheduleCreateFormat(device=sda1, format=format) - - def testActionCreation(self, *args, **kwargs): - """ Verify correct operation of action class constructors. """ - # instantiation of device resize action for non-existent device should - # fail - # XXX resizable depends on existence, so this is covered implicitly - sdd = self.storage.devicetree.getDeviceByName("sdd") - p = self.newDevice(device_class=PartitionDevice, - name="sdd1", size=32768, parents=[sdd]) - self.failUnlessRaises(ValueError, - storage.deviceaction.ActionResizeDevice, - p, - p.size + 7232) - - # instantiation of device resize action for non-resizable device - # should fail - vg = self.storage.devicetree.getDeviceByName("VolGroup") - self.assertNotEqual(vg, None) - self.failUnlessRaises(ValueError, - storage.deviceaction.ActionResizeDevice, - vg, - vg.size + 32) - - # instantiation of format resize action for non-resizable format type - # should fail - lv_swap = self.storage.devicetree.getDeviceByName("VolGroup-lv_swap") - self.assertNotEqual(lv_swap, None) - self.failUnlessRaises(ValueError, - storage.deviceaction.ActionResizeFormat, - lv_swap, - lv_swap.size + 32) - - # instantiation of format resize action for non-existent format - # should fail - lv_root = self.storage.devicetree.getDeviceByName("VolGroup-lv_root") - self.assertNotEqual(lv_root, None) - lv_root.format.exists = False - self.failUnlessRaises(ValueError, - storage.deviceaction.ActionResizeFormat, - lv_root, - lv_root.size - 1000) - lv_root.format.exists = True - - # instantiation of format migrate action for non-migratable format - # type should fail - lv_swap = self.storage.devicetree.getDeviceByName("VolGroup-lv_swap") - self.assertNotEqual(lv_swap, None) - self.assertEqual(lv_swap.exists, True) - self.failUnlessRaises(ValueError, - storage.deviceaction.ActionMigrateFormat, - lv_swap) - - # instantiation of format migrate for non-existent format should fail - lv_root = self.storage.devicetree.getDeviceByName("VolGroup-lv_root") - self.assertNotEqual(lv_root, None) - orig_format = lv_root.format - lv_root.format = getFormat("ext3", device=lv_root.path) - self.failUnlessRaises(ValueError, - storage.deviceaction.ActionMigrateFormat, - lv_root) - lv_root.format = orig_format - - # instantiation of device create action for existing device should - # fail - lv_swap = self.storage.devicetree.getDeviceByName("VolGroup-lv_swap") - self.assertNotEqual(lv_swap, None) - self.assertEqual(lv_swap.exists, True) - self.failUnlessRaises(ValueError, - storage.deviceaction.ActionCreateDevice, - lv_swap) - - # instantiation of format destroy action for device causes device's - # format attribute to be a DeviceFormat instance - lv_swap = self.storage.devicetree.getDeviceByName("VolGroup-lv_swap") - self.assertNotEqual(lv_swap, None) - orig_format = lv_swap.format - self.assertEqual(lv_swap.format.type, "swap") - a = storage.deviceaction.ActionDestroyFormat(lv_swap) - self.assertEqual(lv_swap.format.type, None) - - # instantiation of format create action for device causes new format - # to be accessible via device's format attribute - new_format = getFormat("vfat", device=lv_swap.path) - a = storage.deviceaction.ActionCreateFormat(lv_swap, new_format) - self.assertEqual(lv_swap.format, new_format) - lv_swap.format = orig_format - - def testActionRegistration(self, *args, **kwargs): - """ Verify correct operation of action registration and cancelling. """ - # self.setUp has just been run, so we should have something like - # a preexisting autopart config in the devicetree. - - # registering a destroy action for a non-leaf device should fail - vg = self.storage.devicetree.getDeviceByName("VolGroup") - self.assertNotEqual(vg, None) - self.assertEqual(vg.isleaf, False) - a = storage.deviceaction.ActionDestroyDevice(vg) - self.failUnlessRaises(ValueError, - self.storage.devicetree.registerAction, - a) - - # registering any action other than create for a device that's not in - # the devicetree should fail - sdc = self.storage.devicetree.getDeviceByName("sdc") - self.assertNotEqual(sdc, None) - sdc1 = self.newDevice(device_class=PartitionDevice, - name="sdc1", size=100000, parents=[sdc], - exists=True) - - sdc1_format = self.newFormat("ext2", device=sdc1.path, mountpoint="/") - create_sdc1_format = ActionCreateFormat(sdc1, sdc1_format) - self.failUnlessRaises(storage.errors.DeviceTreeError, - self.storage.devicetree.registerAction, - create_sdc1_format) - - sdc1_format.exists = True - - migrate_sdc1 = ActionMigrateFormat(sdc1) - self.failUnlessRaises(storage.errors.DeviceTreeError, - self.storage.devicetree.registerAction, - migrate_sdc1) - migrate_sdc1.cancel() - - resize_sdc1_format = ActionResizeFormat(sdc1, sdc1.size - 10000) - self.failUnlessRaises(storage.errors.DeviceTreeError, - self.storage.devicetree.registerAction, - resize_sdc1_format) - - resize_sdc1 = ActionResizeDevice(sdc1, sdc1.size - 10000) - self.failUnlessRaises(storage.errors.DeviceTreeError, - self.storage.devicetree.registerAction, - resize_sdc1) - - resize_sdc1.cancel() - resize_sdc1_format.cancel() - - destroy_sdc1_format = ActionDestroyFormat(sdc1) - self.failUnlessRaises(storage.errors.DeviceTreeError, - self.storage.devicetree.registerAction, - destroy_sdc1_format) - - - destroy_sdc1 = ActionDestroyDevice(sdc1) - self.failUnlessRaises(storage.errors.DeviceTreeError, - self.storage.devicetree.registerAction, - resize_sdc1) - - # registering a device destroy action should cause the device to be - # removed from the devicetree - lv_root = self.storage.devicetree.getDeviceByName("VolGroup-lv_root") - self.assertNotEqual(lv_root, None) - a = ActionDestroyDevice(lv_root) - self.storage.devicetree.registerAction(a) - lv_root = self.storage.devicetree.getDeviceByName("VolGroup-lv_root") - self.assertEqual(lv_root, None) - self.storage.devicetree.cancelAction(a) - - # registering a device create action should cause the device to be - # added to the devicetree - sdd = self.storage.devicetree.getDeviceByName("sdd") - self.assertNotEqual(sdd, None) - sdd1 = self.storage.devicetree.getDeviceByName("sdd1") - self.assertEqual(sdd1, None) - sdd1 = self.newDevice(device_class=PartitionDevice, - name="sdd1", size=100000, parents=[sdd]) - a = ActionCreateDevice(sdd1) - self.storage.devicetree.registerAction(a) - sdd1 = self.storage.devicetree.getDeviceByName("sdd1") - self.assertNotEqual(sdd1, None) - - def testActionObsoletes(self, *args, **kwargs): - """ Verify correct operation of DeviceAction.obsoletes. """ - self.destroyAllDevices(disks=["sdc"]) - sdc = self.storage.devicetree.getDeviceByName("sdc") - self.assertNotEqual(sdc, None) - - sdc1 = self.newDevice(device_class=PartitionDevice, - name="sdc1", parents=[sdc], size=40000) - - # ActionCreateDevice - # - # - obsoletes other ActionCreateDevice instances w/ lower id and same - # device - create_device_1 = ActionCreateDevice(sdc1) - create_device_2 = ActionCreateDevice(sdc1) - self.assertEqual(create_device_2.obsoletes(create_device_1), True) - self.assertEqual(create_device_1.obsoletes(create_device_2), False) - - # ActionCreateFormat - # - # - obsoletes other ActionCreateFormat instances w/ lower id and same - # device - format_1 = self.newFormat("ext3", mountpoint="/home", device=sdc1.path) - format_2 = self.newFormat("ext3", mountpoint="/opt", device=sdc1.path) - create_format_1 = ActionCreateFormat(sdc1, format_1) - create_format_2 = ActionCreateFormat(sdc1, format_2) - self.assertEqual(create_format_2.obsoletes(create_format_1), True) - self.assertEqual(create_format_1.obsoletes(create_format_2), False) - - # ActionMigrateFormat - # - # - obsoletes other ActionMigrateFormat instances w/ lower id and same - # device - sdc1.format = self.newFormat("ext2", mountpoint="/", device=sdc1.path, - device_instance=sdc1, - exists=True) - migrate_1 = ActionMigrateFormat(sdc1) - migrate_2 = ActionMigrateFormat(sdc1) - self.assertEqual(migrate_2.obsoletes(migrate_1), True) - self.assertEqual(migrate_1.obsoletes(migrate_2), False) - - # ActionResizeFormat - # - # - obsoletes other ActionResizeFormat instances w/ lower id and same - # device - resize_format_1 = ActionResizeFormat(sdc1, sdc1.size - 1000) - resize_format_2 = ActionResizeFormat(sdc1, sdc1.size - 5000) - self.assertEqual(resize_format_2.obsoletes(resize_format_1), True) - self.assertEqual(resize_format_1.obsoletes(resize_format_2), False) - - # ActionCreateFormat - # - # - obsoletes migrate, resize format actions w/ lower id on same device - new_format = self.newFormat("ext4", mountpoint="/foo", device=sdc1.path) - create_format_3 = ActionCreateFormat(sdc1, new_format) - self.assertEqual(create_format_3.obsoletes(resize_format_1), True) - self.assertEqual(create_format_3.obsoletes(resize_format_2), True) - self.assertEqual(create_format_3.obsoletes(migrate_1), True) - self.assertEqual(create_format_3.obsoletes(migrate_2), True) - - # ActionResizeDevice - # - # - obsoletes other ActionResizeDevice instances w/ lower id and same - # device - sdc1.exists = True - sdc1.format.exists = True - resize_device_1 = ActionResizeDevice(sdc1, sdc1.size + 10000) - resize_device_2 = ActionResizeDevice(sdc1, sdc1.size - 10000) - self.assertEqual(resize_device_2.obsoletes(resize_device_1), True) - self.assertEqual(resize_device_1.obsoletes(resize_device_2), False) - sdc1.exists = False - sdc1.format.exists = False - - # ActionDestroyFormat - # - # - obsoletes all format actions w/ lower id on same device (including - # self if format does not exist) - destroy_format_1 = ActionDestroyFormat(sdc1) - self.assertEqual(destroy_format_1.obsoletes(create_format_1), True) - self.assertEqual(destroy_format_1.obsoletes(migrate_2), True) - self.assertEqual(destroy_format_1.obsoletes(resize_format_1), True) - self.assertEqual(destroy_format_1.obsoletes(destroy_format_1), True) - - # ActionDestroyDevice - # - # - obsoletes all actions w/ lower id that act on the same non-existent - # device (including self) - # sdc1 does not exist - destroy_sdc1 = ActionDestroyDevice(sdc1) - self.assertEqual(destroy_sdc1.obsoletes(create_format_2), True) - self.assertEqual(destroy_sdc1.obsoletes(migrate_1), True) - self.assertEqual(destroy_sdc1.obsoletes(resize_format_2), True) - self.assertEqual(destroy_sdc1.obsoletes(create_device_1), True) - self.assertEqual(destroy_sdc1.obsoletes(resize_device_1), True) - self.assertEqual(destroy_sdc1.obsoletes(destroy_sdc1), True) - - # ActionDestroyDevice - # - # - obsoletes all but ActionDestroyFormat actions w/ lower id on the - # same existing device - # sda1 exists - sda1 = self.storage.devicetree.getDeviceByName("sda1") - self.assertNotEqual(sda1, None) - resize_sda1_format = ActionResizeFormat(sda1, sda1.size - 50) - resize_sda1 = ActionResizeDevice(sda1, sda1.size - 50) - destroy_sda1_format = ActionDestroyFormat(sda1) - destroy_sda1 = ActionDestroyDevice(sda1) - self.assertEqual(destroy_sda1.obsoletes(resize_sda1_format), True) - self.assertEqual(destroy_sda1.obsoletes(resize_sda1), True) - self.assertEqual(destroy_sda1.obsoletes(destroy_sda1), False) - self.assertEqual(destroy_sda1.obsoletes(destroy_sda1_format), False) - - def testActionPruning(self, *args, **kwargs): - """ Verify correct functioning of action pruning. """ - self.destroyAllDevices() - - sda = self.storage.devicetree.getDeviceByName("sda") - self.assertNotEqual(sda, None, "failed to find disk 'sda'") - - sda1 = self.newDevice(device_class=PartitionDevice, - name="sda1", size=500, parents=[sda]) - self.scheduleCreateDevice(device=sda1) - - sda2 = self.newDevice(device_class=PartitionDevice, - name="sda2", size=100000, parents=[sda]) - self.scheduleCreateDevice(device=sda2) - format = self.newFormat("lvmpv", device=sda2.path) - self.scheduleCreateFormat(device=sda2, format=format) - - vg = self.newDevice(device_class=LVMVolumeGroupDevice, - name="vg", parents=[sda2]) - self.scheduleCreateDevice(device=vg) - - lv_root = self.newDevice(device_class=LVMLogicalVolumeDevice, - name="lv_root", vgdev=vg, size=60000) - self.scheduleCreateDevice(device=lv_root) - format = self.newFormat("ext4", device=lv_root.path, mountpoint="/") - self.scheduleCreateFormat(device=lv_root, format=format) - - lv_swap = self.newDevice(device_class=LVMLogicalVolumeDevice, - name="lv_swap", vgdev=vg, size=4000) - self.scheduleCreateDevice(device=lv_swap) - format = self.newFormat("swap", device=lv_swap.path) - self.scheduleCreateFormat(device=lv_swap, format=format) - - # we'll soon schedule destroy actions for these members and the array, - # which will test pruning. the whole mess should reduce to nothing - sda3 = self.newDevice(device_class=PartitionDevice, - name="sda3", parents=[sda], size=40000) - self.scheduleCreateDevice(device=sda3) - format = self.newFormat("mdmember", device=sda3.path) - self.scheduleCreateFormat(device=sda3, format=format) - - sdb = self.storage.devicetree.getDeviceByName("sdb") - self.assertNotEqual(sdb, None, "failed to find disk 'sdb'") - - sdb1 = self.newDevice(device_class=PartitionDevice, - name="sdb1", parents=[sdb], size=40000) - self.scheduleCreateDevice(device=sdb1) - format = self.newFormat("mdmember", device=sdb1.path,) - self.scheduleCreateFormat(device=sdb1, format=format) - - md0 = self.newDevice(device_class=MDRaidArrayDevice, - name="md0", level="raid0", minor=0, size=80000, - memberDevices=2, totalDevices=2, - parents=[sdb1, sda3]) - self.scheduleCreateDevice(device=md0) - - format = self.newFormat("ext4", device=md0.path, mountpoint="/home") - self.scheduleCreateFormat(device=md0, format=format) - - # now destroy the md and its components - self.scheduleDestroyFormat(device=md0) - self.scheduleDestroyDevice(device=md0) - self.scheduleDestroyDevice(device=sdb1) - self.scheduleDestroyDevice(device=sda3) - - format = self.newFormat("ext4", mountpoint="/boot", device=sda1.path) - self.scheduleCreateFormat(device=sda1, format=format) - - # verify the md actions are present prior to pruning - md0_actions = self.storage.devicetree.findActions(devid=md0.id) - self.assertNotEqual(len(md0_actions), 0) - - sdb1_actions = self.storage.devicetree.findActions(devid=sdb1.id) - self.assertNotEqual(len(sdb1_actions), 0) - - sda3_actions = self.storage.devicetree.findActions(devid=sda3.id) - self.assertNotEqual(len(sda3_actions), 0) - - self.storage.devicetree.pruneActions() - - # verify the md actions are gone after pruning - md0_actions = self.storage.devicetree.findActions(devid=md0.id) - self.assertEqual(len(md0_actions), 0) - - sdb1_actions = self.storage.devicetree.findActions(devid=sdb1.id) - self.assertEqual(len(sdb1_actions), 0) - - sda3_actions = self.storage.devicetree.findActions(sda3.id) - self.assertEqual(len(sda3_actions), 0) - - def testActionDependencies(self, *args, **kwargs): - """ Verify correct functioning of action dependencies. """ - # ActionResizeDevice - # an action that shrinks a device should require the action that - # shrinks the device's format - lv_root = self.storage.devicetree.getDeviceByName("VolGroup-lv_root") - self.assertNotEqual(lv_root, None) - shrink_format = ActionResizeFormat(lv_root, lv_root.size - 5000) - shrink_device = ActionResizeDevice(lv_root, lv_root.size - 5000) - self.assertEqual(shrink_device.requires(shrink_format), True) - self.assertEqual(shrink_format.requires(shrink_device), False) - shrink_format.cancel() - shrink_device.cancel() - - # ActionResizeDevice - # an action that grows a format should require the action that - # grows the device - orig_size = lv_root.currentSize - grow_device = ActionResizeDevice(lv_root, orig_size + 100) - grow_format = ActionResizeFormat(lv_root, orig_size + 100) - self.assertEqual(grow_format.requires(grow_device), True) - self.assertEqual(grow_device.requires(grow_format), False) - - # create something like uncommitted autopart - self.destroyAllDevices() - sda = self.storage.devicetree.getDeviceByName("sda") - sdb = self.storage.devicetree.getDeviceByName("sdb") - sda1 = self.newDevice(device_class=PartitionDevice, - name="sda1", size=500, parents=[sda]) - sda1_format = self.newFormat("ext4", mountpoint="/boot", - device=sda1.path) - self.scheduleCreateDevice(device=sda1) - self.scheduleCreateFormat(device=sda1, format=sda1_format) - - sda2 = self.newDevice(device_class=PartitionDevice, - name="sda2", size=99500, parents=[sda]) - sda2_format = self.newFormat("lvmpv", device=sda1.path) - self.scheduleCreateDevice(device=sda2) - self.scheduleCreateFormat(device=sda2, format=sda2_format) - - sdb1 = self.newDevice(device_class=PartitionDevice, - name="sdb1", size=100000, parents=[sdb]) - sdb1_format = self.newFormat("lvmpv", device=sdb1.path) - self.scheduleCreateDevice(device=sdb1) - self.scheduleCreateFormat(device=sdb1, format=sdb1_format) - - vg = self.newDevice(device_class=LVMVolumeGroupDevice, - name="VolGroup", parents=[sda2, sdb1]) - self.scheduleCreateDevice(device=vg) - - lv_root = self.newDevice(device_class=LVMLogicalVolumeDevice, - name="lv_root", vgdev=vg, size=160000) - self.scheduleCreateDevice(device=lv_root) - format = self.newFormat("ext4", device=lv_root.path, mountpoint="/") - self.scheduleCreateFormat(device=lv_root, format=format) - - lv_swap = self.newDevice(device_class=LVMLogicalVolumeDevice, - name="lv_swap", vgdev=vg, size=4000) - self.scheduleCreateDevice(device=lv_swap) - format = self.newFormat("swap", device=lv_swap.path) - self.scheduleCreateFormat(device=lv_swap, format=format) - - # ActionCreateDevice - # creation of an LV should require the actions that create the VG, - # its PVs, and the devices that contain the PVs - lv_root = self.storage.devicetree.getDeviceByName("VolGroup-lv_root") - self.assertNotEqual(lv_root, None) - actions = self.storage.devicetree.findActions(type="create", - object="device", - device=lv_root) - self.assertEqual(len(actions), 1, - "wrong number of device create actions for lv_root: " - "%d" % len(actions)) - create_lv_action = actions[0] - - vgs = [d for d in self.storage.vgs if d.name == "VolGroup"] - self.assertNotEqual(vgs, []) - vg = vgs[0] - actions = self.storage.devicetree.findActions(type="create", - object="device", - device=vg) - self.assertEqual(len(actions), 1, - "wrong number of device create actions for VolGroup") - create_vg_action = actions[0] - - self.assertEqual(create_lv_action.requires(create_vg_action), True) - - create_pv_actions = [] - pvs = [d for d in self.storage.pvs if d in vg.pvs] - self.assertNotEqual(pvs, []) - for pv in pvs: - # include device and format create actions for each pv - actions = self.storage.devicetree.findActions(type="create", - device=pv) - self.assertEqual(len(actions), 2, - "wrong number of device create actions for " - "pv %s" % pv.name) - create_pv_actions.append(actions[0]) - - for pv_action in create_pv_actions: - self.assertEqual(create_lv_action.requires(pv_action), True) - # also check that the vg create action requires the pv actions - self.assertEqual(create_vg_action.requires(pv_action), True) - - # ActionCreateDevice - # the higher numbered partition of two that are scheduled to be - # created on a single disk should require the action that creates the - # lower numbered of the two, eg: create sda2 before creating sda3 - sdc = self.storage.devicetree.getDeviceByName("sdc") - self.assertNotEqual(sdc, None) - - sdc1 = self.newDevice(device_class=PartitionDevice, - name="sdc1", parents=[sdc], size=50000) - create_sdc1 = self.scheduleCreateDevice(device=sdc1) - self.assertEqual(isinstance(create_sdc1, ActionCreateDevice), True) - - sdc2 = self.newDevice(device_class=PartitionDevice, - name="sdc2", parents=[sdc], size=50000) - create_sdc2 = self.scheduleCreateDevice(device=sdc2) - self.assertEqual(isinstance(create_sdc2, ActionCreateDevice), True) - - self.assertEqual(create_sdc2.requires(create_sdc1), True) - self.assertEqual(create_sdc1.requires(create_sdc2), False) - - # ActionCreateDevice - # actions that create partitions on two separate disks should not - # require each other, regardless of the partitions' numbers - sda1 = self.storage.devicetree.getDeviceByName("sda1") - self.assertNotEqual(sda1, None) - actions = self.storage.devicetree.findActions(type="create", - object="device", - device=sda1) - self.assertEqual(len(actions), 1, - "wrong number of create actions found for sda1") - create_sda1 = actions[0] - self.assertEqual(create_sdc2.requires(create_sda1), False) - self.assertEqual(create_sda1.requires(create_sdc1), False) - - # ActionDestroyDevice - # an action that destroys a device containing an mdmember format - # should require the action that destroys the md array it is a - # member of if an array is defined - self.destroyAllDevices(disks=["sdc", "sdd"]) - sdc = self.storage.devicetree.getDeviceByName("sdc") - self.assertNotEqual(sdc, None) - sdd = self.storage.devicetree.getDeviceByName("sdd") - self.assertNotEqual(sdd, None) - - sdc1 = self.newDevice(device_class=PartitionDevice, - name="sdc1", parents=[sdc], size=40000) - self.scheduleCreateDevice(device=sdc1) - format = self.newFormat("mdmember", device=sdc1.path) - self.scheduleCreateFormat(device=sdc1, format=format) - - sdd1 = self.newDevice(device_class=PartitionDevice, - name="sdd1", parents=[sdd], size=40000) - self.scheduleCreateDevice(device=sdd1) - format = self.newFormat("mdmember", device=sdd1.path,) - self.scheduleCreateFormat(device=sdd1, format=format) - - md0 = self.newDevice(device_class=MDRaidArrayDevice, - name="md0", level="raid0", minor=0, size=80000, - memberDevices=2, totalDevices=2, - parents=[sdc1, sdd1]) - self.scheduleCreateDevice(device=md0) - format = self.newFormat("ext4", device=md0.path, mountpoint="/home") - self.scheduleCreateFormat(device=md0, format=format) - - destroy_md0_format = self.scheduleDestroyFormat(device=md0) - destroy_md0 = self.scheduleDestroyDevice(device=md0) - destroy_members = [self.scheduleDestroyDevice(device=sdc1)] - destroy_members.append(self.scheduleDestroyDevice(device=sdd1)) - - for member in destroy_members: - # device and format destroy actions for md members should require - # both device and format destroy actions for the md array - for array in [destroy_md0_format, destroy_md0]: - self.assertEqual(member.requires(array), True) - - # ActionDestroyDevice - # when there are two actions that will each destroy a partition on the - # same disk, the action that will destroy the lower-numbered - # partition should require the action that will destroy the higher- - # numbered partition, eg: destroy sda2 before destroying sda1 - self.destroyAllDevices(disks=["sdc", "sdd"]) - sdc1 = self.newDevice(device_class=PartitionDevice, - name="sdc1", parents=[sdc], size=50000) - self.scheduleCreateDevice(device=sdc1) - - sdc2 = self.newDevice(device_class=PartitionDevice, - name="sdc2", parents=[sdc], size=40000) - self.scheduleCreateDevice(device=sdc2) - - destroy_sdc1 = self.scheduleDestroyDevice(device=sdc1) - destroy_sdc2 = self.scheduleDestroyDevice(device=sdc2) - self.assertEqual(destroy_sdc1.requires(destroy_sdc2), True) - self.assertEqual(destroy_sdc2.requires(destroy_sdc1), False) - - self.destroyAllDevices(disks=["sdc", "sdd"]) - sdc = self.storage.devicetree.getDeviceByName("sdc") - self.assertNotEqual(sdc, None) - sdd = self.storage.devicetree.getDeviceByName("sdd") - self.assertNotEqual(sdd, None) - - sdc1 = self.newDevice(device_class=PartitionDevice, - name="sdc1", parents=[sdc], size=50000) - create_pv = self.scheduleCreateDevice(device=sdc1) - format = self.newFormat("lvmpv", device=sdc1.path) - create_pv_format = self.scheduleCreateFormat(device=sdc1, format=format) - - testvg = self.newDevice(device_class=LVMVolumeGroupDevice, - name="testvg", parents=[sdc1], size=50000) - create_vg = self.scheduleCreateDevice(device=testvg) - testlv = self.newDevice(device_class=LVMLogicalVolumeDevice, - name="testlv", vgdev=testvg, size=30000) - create_lv = self.scheduleCreateDevice(device=testlv) - format = self.newFormat("ext4", device=testlv.path) - create_lv_format = self.scheduleCreateFormat(device=testlv, format=format) - - # ActionCreateFormat - # creation of a format on a non-existent device should require the - # action that creates the device - self.assertEqual(create_lv_format.requires(create_lv), True) - - # ActionCreateFormat - # an action that creates a format on a device should require an action - # that creates a device that the format's device depends on - self.assertEqual(create_lv_format.requires(create_pv), True) - self.assertEqual(create_lv_format.requires(create_vg), True) - - # ActionCreateFormat - # an action that creates a format on a device should require an action - # that creates a format on a device that the format's device depends on - self.assertEqual(create_lv_format.requires(create_pv_format), True) - - # XXX from here on, the devices are existing but not in the tree, so - # we instantiate and use actions directly - self.destroyAllDevices(disks=["sdc", "sdd"]) - sdc1 = self.newDevice(device_class=PartitionDevice, exists=True, - name="sdc1", parents=[sdc], size=50000) - sdc1.format = self.newFormat("lvmpv", device=sdc1.path, exists=True, - device_instance=sdc1) - testvg = self.newDevice(device_class=LVMVolumeGroupDevice, exists=True, - name="testvg", parents=[sdc1], size=50000) - testlv = self.newDevice(device_class=LVMLogicalVolumeDevice, - exists=True, - name="testlv", vgdev=testvg, size=30000) - testlv.format = self.newFormat("ext4", device=testlv.path, - exists=True, device_instance=testlv) - - # ActionResizeDevice - # an action that resizes a device should require an action that grows - # a device that the first action's device depends on, eg: grow - # device containing PV before resize of VG or LVs - tmp = sdc1.format - sdc1.format = None # since lvmpv format is not resizable - grow_pv = ActionResizeDevice(sdc1, sdc1.size + 10000) - grow_lv = ActionResizeDevice(testlv, testlv.size + 5000) - grow_lv_format = ActionResizeFormat(testlv, testlv.size + 5000) - - self.assertEqual(grow_lv.requires(grow_pv), True) - self.assertEqual(grow_pv.requires(grow_lv), False) - - # ActionResizeFormat - # an action that grows a format should require the action that grows - # the format's device - self.assertEqual(grow_lv_format.requires(grow_lv), True) - self.assertEqual(grow_lv.requires(grow_lv_format), False) - - # ActionResizeFormat - # an action that resizes a device's format should depend on an action - # that grows a device the first device depends on - self.assertEqual(grow_lv_format.requires(grow_pv), True) - self.assertEqual(grow_pv.requires(grow_lv_format), False) - - # ActionResizeFormat - # an action that resizes a device's format should depend on an action - # that grows a format on a device the first device depends on - # XXX resize of PV format is not allowed, so there's no real-life - # example of this to test - - grow_lv_format.cancel() - grow_lv.cancel() - grow_pv.cancel() - - # ActionResizeDevice - # an action that resizes a device should require an action that grows - # a format on a device that the first action's device depends on, eg: - # grow PV format before resize of VG or LVs - # XXX resize of PV format is not allowed, so there's no real-life - # example of this to test - - # ActionResizeDevice - # an action that resizes a device should require an action that - # shrinks a device that depends on the first action's device, eg: - # shrink LV before resizing VG or PV devices - shrink_lv = ActionResizeDevice(testlv, testlv.size - 10000) - shrink_pv = ActionResizeDevice(sdc1, sdc1.size - 5000) - - self.assertEqual(shrink_pv.requires(shrink_lv), True) - self.assertEqual(shrink_lv.requires(shrink_pv), False) - - # ActionResizeDevice - # an action that resizes a device should require an action that - # shrinks a format on a device that depends on the first action's - # device, eg: shrink LV format before resizing VG or PV devices - shrink_lv_format = ActionResizeFormat(testlv, testlv.size) - self.assertEqual(shrink_pv.requires(shrink_lv_format), True) - self.assertEqual(shrink_lv_format.requires(shrink_pv), False) - - # ActionResizeFormat - # an action that resizes a device's format should depend on an action - # that shrinks a device that depends on the first device - # XXX can't think of a real-world example of this since PVs and MD - # member devices are not resizable in anaconda - - # ActionResizeFormat - # an action that resizes a device's format should depend on an action - # that shrinks a format on a device that depends on the first device - # XXX can't think of a real-world example of this since PVs and MD - # member devices are not resizable in anaconda - - shrink_lv_format.cancel() - shrink_lv.cancel() - shrink_pv.cancel() - sdc1.format = tmp # restore pv's lvmpv format - - # ActionCreateFormat - # an action that creates a format on a device should require an action - # that resizes a device that the format's device depends on - # XXX Really? Is this always so? - - # ActionCreateFormat - # an action that creates a format on a device should require an action - # that resizes a format on a device that the format's device depends on - # XXX Same as above. - - # ActionCreateFormat - # an action that creates a format on a device should require an action - # that resizes the device that will contain the format - grow_lv = ActionResizeDevice(testlv, testlv.size + 1000) - format = self.newFormat("msdos", device=testlv.path) - format_lv = ActionCreateFormat(testlv, format) - self.assertEqual(format_lv.requires(grow_lv), True) - self.assertEqual(grow_lv.requires(format_lv), False) - - # ActionDestroyFormat - # an action that destroys a format should require an action that - # destroys a device that depends on the format's device - destroy_pv_format = ActionDestroyFormat(sdc1) - destroy_lv_format = ActionDestroyFormat(testlv) - destroy_lv = ActionDestroyDevice(testlv) - self.assertEqual(destroy_pv_format.requires(destroy_lv), True) - self.assertEqual(destroy_lv.requires(destroy_pv_format), False) - - # ActionDestroyFormat - # an action that destroys a format should require an action that - # destroys a format on a device that depends on the first format's - # device - self.assertEqual(destroy_pv_format.requires(destroy_lv_format), True) - self.assertEqual(destroy_lv_format.requires(destroy_pv_format), False) - - def testActionSorting(self, *args, **kwargs): - """ Verify correct functioning of action sorting. """ - pass - - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(DeviceActionTestCase) - - -if __name__ == "__main__": - unittest.main() - diff --git a/tests/storage_test/devicelibs_test/Makefile.am b/tests/storage_test/devicelibs_test/Makefile.am deleted file mode 100644 index eeeabbdc4..000000000 --- a/tests/storage_test/devicelibs_test/Makefile.am +++ /dev/null @@ -1,22 +0,0 @@ -# tests/storage/devicelibs/Makefile.am for anaconda -# -# Copyright (C) 2009 Red Hat, Inc. -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU Lesser General Public License as published -# by the Free Software Foundation; either version 2.1 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# -# Author: David Cantrell <dcantrell@redhat.com> - -EXTRA_DIST = *.py - -MAINTAINERCLEANFILES = Makefile.in diff --git a/tests/storage_test/devicelibs_test/baseclass.py b/tests/storage_test/devicelibs_test/baseclass.py deleted file mode 100644 index 06adef989..000000000 --- a/tests/storage_test/devicelibs_test/baseclass.py +++ /dev/null @@ -1,80 +0,0 @@ -import unittest -import os -import subprocess - - -def makeLoopDev(device_name, file_name): - proc = subprocess.Popen(["dd", "if=/dev/zero", "of=%s" % file_name, - "bs=1024", "count=102400"], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - while True: - (out, err) = proc.communicate() - if proc.returncode is not None: - rc = proc.returncode - break - if rc: - raise OSError, "dd failed creating the file %s" % file_name - - proc = subprocess.Popen(["losetup", device_name, file_name], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - while True: - (out, err) = proc.communicate() - if proc.returncode is not None: - rc = proc.returncode - break - if rc: - raise OSError, "losetup failed setting up the loop device %s" % device_name - -def removeLoopDev(device_name, file_name): - proc = subprocess.Popen(["losetup", "-d", device_name], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - while True: - (out, err) = proc.communicate() - if proc.returncode is not None: - rc = proc.returncode - break - if rc: - raise OSError, "losetup failed removing the loop device %s" % device_name - - os.unlink(file_name) - -def getFreeLoopDev(): - # There's a race condition here where another process could grab the loop - # device losetup gives us before we have time to set it up, but that's just - # a chance we'll have to take. - proc = subprocess.Popen(["losetup", "-f"], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out = None - - while True: - (out, err) = proc.communicate() - if proc.returncode is not None: - rc = proc.returncode - out = out.strip() - break - - if rc: - raise OSError, "losetup failed to find a free device" - - return out - -class DevicelibsTestCase(unittest.TestCase): - - _LOOP_DEVICES = ["/tmp/test-virtdev0", "/tmp/test-virtdev1"] - - def __init__(self, *args, **kwargs): - import pyanaconda.anaconda_log - pyanaconda.anaconda_log.init() - - unittest.TestCase.__init__(self, *args, **kwargs) - self._loopMap = {} - - def setUp(self): - for file in self._LOOP_DEVICES: - dev = getFreeLoopDev() - makeLoopDev(dev, file) - self._loopMap[file] = dev - - def tearDown(self): - for (file, dev) in self._loopMap.iteritems(): - removeLoopDev(dev, file) diff --git a/tests/storage_test/devicelibs_test/crypto_test.py b/tests/storage_test/devicelibs_test/crypto_test.py deleted file mode 100755 index 3e9e3a667..000000000 --- a/tests/storage_test/devicelibs_test/crypto_test.py +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/python -import baseclass -import unittest -from mock import acceptance - -import tempfile -import os - -class CryptoTestCase(baseclass.DevicelibsTestCase): - - def testCrypto(self): - _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]] - _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]] - - import storage.devicelibs.crypto as crypto - - - @acceptance - def testCrypto(self): - ## - ## is_luks - ## - # pass - self.assertEqual(crypto.is_luks(_LOOP_DEV0), -22) - self.assertEqual(crypto.is_luks("/not/existing/device"), -22) - - ## - ## luks_format - ## - # pass - self.assertEqual(crypto.luks_format(_LOOP_DEV0, passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256), None) - - # make a key file - handle, keyfile = tempfile.mkstemp(prefix="key", text=False) - os.write(handle, "nobodyknows") - os.close(handle) - - # format with key file - self.assertEqual(crypto.luks_format(_LOOP_DEV1, key_file=keyfile), None) - - # fail - self.assertRaises(crypto.CryptoError, crypto.luks_format, "/not/existing/device", passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256) - # no passhprase or key file - self.assertRaises(ValueError, crypto.luks_format, _LOOP_DEV1, cipher="aes-cbc-essiv:sha256", key_size=256) - - ## - ## is_luks - ## - # pass - self.assertEqual(crypto.is_luks(_LOOP_DEV0), 0) # 0 = is luks - self.assertEqual(crypto.is_luks(_LOOP_DEV1), 0) - - ## - ## luks_add_key - ## - # pass - self.assertEqual(crypto.luks_add_key(_LOOP_DEV0, new_passphrase="another-secret", passphrase="secret"), None) - - # make another key file - handle, new_keyfile = tempfile.mkstemp(prefix="key", text=False) - os.write(handle, "area51") - os.close(handle) - - # add new key file - self.assertEqual(crypto.luks_add_key(_LOOP_DEV1, new_key_file=new_keyfile, key_file=keyfile), None) - - # fail - self.assertRaises(crypto.CryptoError, crypto.luks_add_key, _LOOP_DEV0, new_passphrase="another-secret", passphrase="wrong-passphrase") - - ## - ## luks_remove_key - ## - # fail - self.assertRaises(RuntimeError, crypto.luks_remove_key, _LOOP_DEV0, del_passphrase="another-secret", passphrase="wrong-pasphrase") - - # pass - self.assertEqual(crypto.luks_remove_key(_LOOP_DEV0, del_passphrase="another-secret", passphrase="secret"), None) - - # remove key file - self.assertEqual(crypto.luks_remove_key(LOOP_DEV1, del_key_file=new_keyfile, key_file=keyfile), None) - - ## - ## luks_open - ## - # pass - self.assertEqual(crypto.luks_open(_LOOP_DEV0, "crypted", passphrase="secret"), None) - self.assertEqual(crypto.luks_open(_LOOP_DEV1, "encrypted", key_file=keyfile), None) - - # fail - self.assertRaises(crypto.CryptoError, crypto.luks_open, "/not/existing/device", "another-crypted", passphrase="secret") - self.assertRaises(crypto.CryptoError, crypto.luks_open, "/not/existing/device", "another-crypted", key_file=keyfile) - # no passhprase or key file - self.assertRaises(ValueError, crypto.luks_open, _LOOP_DEV1, "another-crypted") - - ## - ## luks_status - ## - # pass - self.assertEqual(crypto.luks_status("crypted"), True) - self.assertEqual(crypto.luks_status("encrypted"), True) - self.assertEqual(crypto.luks_status("another-crypted"), False) - - ## - ## luks_uuid - ## - # pass - uuid = crypto.luks_uuid(_LOOP_DEV0) - self.assertEqual(crypto.luks_uuid(_LOOP_DEV0), uuid) - uuid = crypto.luks_uuid(_LOOP_DEV1) - self.assertEqual(crypto.luks_uuid(_LOOP_DEV1), uuid) - - ## - ## luks_close - ## - # pass - self.assertEqual(crypto.luks_close("crypted"), None) - self.assertEqual(crypto.luks_close("encrypted"), None) - - # fail - self.assertRaises(crypto.CryptoError, crypto.luks_close, "wrong-name") - # already closed - self.assertRaises(crypto.CryptoError, crypto.luks_close, "crypted") - self.assertRaises(crypto.CryptoError, crypto.luks_close, "encrypted") - - # cleanup - os.unlink(keyfile) - os.unlink(new_keyfile) - - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(CryptoTestCase) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/storage_test/devicelibs_test/edd_test.py b/tests/storage_test/devicelibs_test/edd_test.py deleted file mode 100644 index 449395508..000000000 --- a/tests/storage_test/devicelibs_test/edd_test.py +++ /dev/null @@ -1,212 +0,0 @@ -import mock - -class EddTestCase(mock.TestCase): - def setUp(self): - self.setupModules( - ['_isys', 'logging', 'pyanaconda.anaconda_log', 'block']) - - def tearDown(self): - self.tearDownModules() - - def test_biosdev_to_edd_dir(self): - from pyanaconda.storage.devicelibs import edd - path = edd.biosdev_to_edd_dir(138) - self.assertEqual("/sys/firmware/edd/int13_dev8a", path) - - def test_collect_edd_data(self): - from pyanaconda.storage.devicelibs import edd - - # test with vda, vdb - fs = EddTestFS(self, edd).vda_vdb() - edd_dict = edd.collect_edd_data() - self.assertEqual(len(edd_dict), 2) - self.assertEqual(edd_dict[0x80].type, "SCSI") - self.assertEqual(edd_dict[0x80].scsi_id, 0) - self.assertEqual(edd_dict[0x80].scsi_lun, 0) - self.assertEqual(edd_dict[0x80].pci_dev, "00:05.0") - self.assertEqual(edd_dict[0x80].channel, 0) - self.assertEqual(edd_dict[0x80].sectors, 16777216) - self.assertEqual(edd_dict[0x81].pci_dev, "00:06.0") - - # test with sda, vda - fs = EddTestFS(self, edd).sda_vda() - edd_dict = edd.collect_edd_data() - self.assertEqual(len(edd_dict), 2) - self.assertEqual(edd_dict[0x80].type, "ATA") - self.assertEqual(edd_dict[0x80].scsi_id, None) - self.assertEqual(edd_dict[0x80].scsi_lun, None) - self.assertEqual(edd_dict[0x80].pci_dev, "00:01.1") - self.assertEqual(edd_dict[0x80].channel, 0) - self.assertEqual(edd_dict[0x80].sectors, 2097152) - self.assertEqual(edd_dict[0x80].ata_device, 0) - self.assertEqual(edd_dict[0x80].mbr_signature, "0x000ccb01") - - def test_collect_edd_data_cciss(self): - from pyanaconda.storage.devicelibs import edd - fs = EddTestFS(self, edd).sda_cciss() - edd_dict = edd.collect_edd_data() - - self.assertEqual(edd_dict[0x80].pci_dev, None) - self.assertEqual(edd_dict[0x80].channel, None) - - def test_edd_entry_str(self): - from pyanaconda.storage.devicelibs import edd - fs = EddTestFS(self, edd).sda_vda() - edd_dict = edd.collect_edd_data() - expected_output = """\ttype: ATA, ata_device: 0 -\tchannel: 0, mbr_signature: 0x000ccb01 -\tpci_dev: 00:01.1, scsi_id: None -\tscsi_lun: None, sectors: 2097152""" - self.assertEqual(str(edd_dict[0x80]), expected_output) - - def test_matcher_device_path(self): - from pyanaconda.storage.devicelibs import edd - fs = EddTestFS(self, edd).sda_vda() - edd_dict = edd.collect_edd_data() - - analyzer = edd.EddMatcher(edd_dict[0x80]) - path = analyzer.devname_from_pci_dev() - self.assertEqual(path, "sda") - - analyzer = edd.EddMatcher(edd_dict[0x81]) - path = analyzer.devname_from_pci_dev() - self.assertEqual(path, "vda") - - def test_bad_device_path(self): - from pyanaconda.storage.devicelibs import edd - fs = EddTestFS(self, edd).sda_vda_no_pcidev() - edd_dict = edd.collect_edd_data() - - analyzer = edd.EddMatcher(edd_dict[0x80]) - path = analyzer.devname_from_pci_dev() - self.assertEqual(path, None) - - def test_bad_host_bus(self): - from pyanaconda.storage.devicelibs import edd - fs = EddTestFS(self, edd).sda_vda_no_host_bus() - - edd_dict = edd.collect_edd_data() - - # 0x80 entry is basted so fail without an exception - analyzer = edd.EddMatcher(edd_dict[0x80]) - devname = analyzer.devname_from_pci_dev() - self.assertEqual(devname, None) - - # but still succeed on 0x81 - analyzer = edd.EddMatcher(edd_dict[0x81]) - devname = analyzer.devname_from_pci_dev() - self.assertEqual(devname, "vda") - - def test_get_edd_dict_1(self): - """ Test get_edd_dict()'s pci_dev matching. """ - from pyanaconda.storage.devicelibs import edd - fs = EddTestFS(self, edd).sda_vda() - self.assertEqual(edd.get_edd_dict([]), - {'sda' : 0x80, - 'vda' : 0x81}) - - def test_get_edd_dict_2(self): - """ Test get_edd_dict()'s pci_dev matching. """ - from pyanaconda.storage.devicelibs import edd - edd.collect_mbrs = mock.Mock(return_value = { - 'sda' : '0x000ccb01', - 'vda' : '0x0006aef1'}) - fs = EddTestFS(self, edd).sda_vda_missing_details() - self.assertEqual(edd.get_edd_dict([]), - {'sda' : 0x80, - 'vda' : 0x81}) - - def test_get_edd_dict_3(self): - """ Test scenario when the 0x80 and 0x81 edd directories contain the - same data and give no way to distinguish among the two devices. - """ - from pyanaconda.storage.devicelibs import edd - edd.log = mock.Mock() - edd.collect_mbrs = mock.Mock(return_value={'sda' : '0x000ccb01', - 'vda' : '0x0006aef1'}) - fs = EddTestFS(self, edd).sda_sdb_same() - self.assertEqual(edd.get_edd_dict([]), {}) - self.assertIn((('edd: both edd entries 0x80 and 0x81 seem to map to sda',), {}), - edd.log.info.call_args_list) - -class EddTestFS(object): - def __init__(self, test_case, target_module): - self.fs = mock.DiskIO() - test_case.take_over_io(self.fs, target_module) - - def sda_vda_missing_details(self): - self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01\n" - self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev81/mbr_signature"] = "0x0006aef1\n" - - def sda_vda(self): - self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:01.1 channel: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/interface"] = "ATA device: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01\n" - self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "2097152\n" - - self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev81/host_bus"] = "PCI 00:05.0 channel: 0\n" - self.fs["/sys/firmware/edd/int13_dev81/interface"] = "SCSI id: 0 lun: 0\n" - self.fs["/sys/firmware/edd/int13_dev81/mbr_signature"] = "0x0006aef1\n" - self.fs["/sys/firmware/edd/int13_dev81/sectors"] = "16777216\n" - - self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block"] = self.fs.Dir() - self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block/sda"] = self.fs.Dir() - - self.fs["/sys/devices/pci0000:00/0000:00:05.0/virtio2/block"] = self.fs.Dir() - self.fs["/sys/devices/pci0000:00/0000:00:05.0/virtio2/block/vda"] = self.fs.Dir() - - return self.fs - - def sda_vda_no_pcidev(self): - self.sda_vda() - entries = [e for e in self.fs.fs if e.startswith("/sys/devices/pci")] - map(self.fs.os_remove, entries) - return self.fs - - def sda_vda_no_host_bus(self): - self.sda_vda() - self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:01.1 channel: \n" - self.fs.os_remove("/sys/firmware/edd/int13_dev80/mbr_signature") - self.fs.os_remove("/sys/firmware/edd/int13_dev81/mbr_signature") - - def sda_cciss(self): - self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCIX 05:00.0 channel: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/interface"] = "RAID identity_tag: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01\n" - self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "2097152\n" - - return self.fs - - def vda_vdb(self): - self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:05.0 channel: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/interface"] = "SCSI id: 0 lun: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "16777216\n" - - self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev81/host_bus"] = "PCI 00:06.0 channel: 0\n" - self.fs["/sys/firmware/edd/int13_dev81/interface"] = "SCSI id: 0 lun: 0\n" - self.fs["/sys/firmware/edd/int13_dev81/sectors"] = "4194304\n" - - return self.fs - - def sda_sdb_same(self): - self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:01.1 channel: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/interface"] = "ATA device: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01" - self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "2097152\n" - - self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev81/host_bus"] = "PCI 00:01.1 channel: 0\n" - self.fs["/sys/firmware/edd/int13_dev81/interface"] = "ATA device: 0\n" - self.fs["/sys/firmware/edd/int13_dev81/mbr_signature"] = "0x0006aef1" - self.fs["/sys/firmware/edd/int13_dev81/sectors"] = "2097152\n" - - self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block"] = self.fs.Dir() - self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block/sda"] = self.fs.Dir() diff --git a/tests/storage_test/devicelibs_test/lvm_test.py b/tests/storage_test/devicelibs_test/lvm_test.py deleted file mode 100755 index e639a2137..000000000 --- a/tests/storage_test/devicelibs_test/lvm_test.py +++ /dev/null @@ -1,239 +0,0 @@ -#!/usr/bin/python -import baseclass -import unittest -from mock import acceptance - -class LVMTestCase(baseclass.DevicelibsTestCase): - - def testLVM(self): - _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]] - _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]] - - import storage.devicelibs.lvm as lvm - - - @acceptance - def testLVM(self): - ## - ## pvcreate - ## - # pass - for dev, file in self._loopMap.iteritems(): - self.assertEqual(lvm.pvcreate(dev), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.pvcreate, "/not/existing/device") - - ## - ## pvresize - ## - # pass - for dev, file in self._loopMap.iteritems(): - self.assertEqual(lvm.pvresize(dev, 50), None) - self.assertEqual(lvm.pvresize(dev, 100), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.pvresize, "/not/existing/device", 50) - - ## - ## vgcreate - ## - # pass - self.assertEqual(lvm.vgcreate("test-vg", [_LOOP_DEV0, _LOOP_DEV1], 4), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", ["/not/existing/device"], 4) - # vg already exists - self.assertRaises(lvm.LVMError, lvm.vgcreate, "test-vg", [_LOOP_DEV0], 4) - # pe size must be power of 2 - self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", [_LOOP_DEV0], 5) - - ## - ## pvremove - ## - # fail - # cannot remove pv now with vg created - self.assertRaises(lvm.LVMError, lvm.pvremove, _LOOP_DEV0) - - ## - ## vgdeactivate - ## - # pass - self.assertEqual(lvm.vgdeactivate("test-vg"), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.vgdeactivate, "wrong-vg-name") - - ## - ## vgreduce - ## - # pass - self.assertEqual(lvm.vgreduce("test-vg", [_LOOP_DEV1]), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.vgreduce, "wrong-vg-name", [_LOOP_DEV1]) - self.assertRaises(lvm.LVMError, lvm.vgreduce, "test-vg", ["/not/existing/device"]) - - ## - ## vgactivate - ## - # pass - self.assertEqual(lvm.vgactivate("test-vg"), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.vgactivate, "wrong-vg-name") - - ## - ## pvinfo - ## - # pass - self.assertEqual(lvm.pvinfo(_LOOP_DEV0)["pv_name"], _LOOP_DEV0) - # no vg - self.assertEqual(lvm.pvinfo(_LOOP_DEV1)["pv_name"], _LOOP_DEV1) - - # fail - self.assertRaises(lvm.LVMError, lvm.pvinfo, "/not/existing/device") - - ## - ## vginfo - ## - # pass - self.assertEqual(lvm.vginfo("test-vg")["pe_size"], "4.00") - - # fail - self.assertRaises(lvm.LVMError, lvm.vginfo, "wrong-vg-name") - - ## - ## lvcreate - ## - # pass - self.assertEqual(lvm.lvcreate("test-vg", "test-lv", 10), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.lvcreate, "wrong-vg-name", "another-lv", 10) - - ## - ## lvdeactivate - ## - # pass - self.assertEqual(lvm.lvdeactivate("test-vg", "test-lv"), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "test-vg", "wrong-lv-name") - self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "wrong-vg-name", "test-lv") - self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "wrong-vg-name", "wrong-lv-name") - - ## - ## lvresize - ## - # pass - self.assertEqual(lvm.lvresize("test-vg", "test-lv", 60), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.lvresize, "test-vg", "wrong-lv-name", 80) - self.assertRaises(lvm.LVMError, lvm.lvresize, "wrong-vg-name", "test-lv", 80) - self.assertRaises(lvm.LVMError, lvm.lvresize, "wrong-vg-name", "wrong-lv-name", 80) - # changing to same size - self.assertRaises(lvm.LVMError, lvm.lvresize, "test-vg", "test-lv", 60) - - ## - ## lvactivate - ## - # pass - self.assertEqual(lvm.lvactivate("test-vg", "test-lv"), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.lvactivate, "test-vg", "wrong-lv-name") - self.assertRaises(lvm.LVMError, lvm.lvactivate, "wrong-vg-name", "test-lv") - self.assertRaises(lvm.LVMError, lvm.lvactivate, "wrong-vg-name", "wrong-lv-name") - - ## - ## lvs - ## - # pass - self.assertEqual(lvm.lvs("test-vg")["test-lv"]["size"], "60.00") - - # fail - self.assertRaises(lvm.LVMError, lvm.lvs, "wrong-vg-name") - - ## - ## has_lvm - ## - # pass - self.assertEqual(lvm.has_lvm(), True) - - # fail - # TODO - - ## - ## lvremove - ## - # pass - self.assertEqual(lvm.lvdeactivate("test-vg", "test-lv"), None) # is deactivation needed? - self.assertEqual(lvm.lvremove("test-vg", "test-lv"), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.lvremove, "test-vg", "wrong-lv-name") - self.assertRaises(lvm.LVMError, lvm.lvremove, "wrong-vg-name", "test-lv") - self.assertRaises(lvm.LVMError, lvm.lvremove, "wrong-vg-name", "wrong-lv-name") - # lv already removed - self.assertRaises(lvm.LVMError, lvm.lvremove, "test-vg", "test-lv") - - ## - ## vgremove - ## - # pass - self.assertEqual(lvm.vgremove("test-vg"), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.vgremove, "wrong-vg-name") - # vg already removed - self.assertRaises(lvm.LVMError, lvm.vgremove, "test-vg") - - ## - ## pvremove - ## - # pass - for dev, file in self._loopMap.iteritems(): - self.assertEqual(lvm.pvremove(dev), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.pvremove, "/not/existing/device") - # pv already removed - self.assertRaises(lvm.LVMError, lvm.pvremove, _LOOP_DEV0) - - #def testGetPossiblePhysicalExtents(self): - # pass - self.assertEqual(lvm.getPossiblePhysicalExtents(4), - filter(lambda pe: pe > 4, map(lambda power: 2**power, xrange(3, 25)))) - self.assertEqual(lvm.getPossiblePhysicalExtents(100000), - filter(lambda pe: pe > 100000, map(lambda power: 2**power, xrange(3, 25)))) - - #def testGetMaxLVSize(self): - # pass - self.assertEqual(lvm.getMaxLVSize(), 16*1024**2) - - #def testSafeLVMName(self): - # pass - self.assertEqual(lvm.safeLvmName("/strange/lv*name5"), "strange_lvname5") - - #def testClampSize(self): - # pass - self.assertEqual(lvm.clampSize(10, 4), 8L) - self.assertEqual(lvm.clampSize(10, 4, True), 12L) - - #def testVGUsedSpace(self): - # TODO - pass - - #def testVGFreeSpace(self): - # TODO - pass - - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(LVMTestCase) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/storage_test/devicelibs_test/mdraid_test.py b/tests/storage_test/devicelibs_test/mdraid_test.py deleted file mode 100755 index 9083bd162..000000000 --- a/tests/storage_test/devicelibs_test/mdraid_test.py +++ /dev/null @@ -1,114 +0,0 @@ -#!/usr/bin/python -import baseclass -import unittest -import time -from mock import acceptance - -class MDRaidTestCase(baseclass.DevicelibsTestCase): - - def testMDRaid(self): - _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]] - _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]] - - import storage.devicelibs.mdraid as mdraid - - @acceptance - def testMDRaid(self): - ## - ## getRaidLevels - ## - # pass - self.assertEqual(mdraid.getRaidLevels(), mdraid.getRaidLevels()) - - ## - ## get_raid_min_members - ## - # pass - self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID0), 2) - self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID1), 2) - self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID5), 3) - self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID6), 4) - self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID10), 2) - - # fail - # unsupported raid - self.assertRaises(ValueError, mdraid.get_raid_min_members, 8) - - ## - ## get_raid_max_spares - ## - # pass - self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID0, 5), 0) - self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID1, 5), 3) - self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID5, 5), 2) - self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID6, 5), 1) - self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID10, 5), 3) - - # fail - # unsupported raid - self.assertRaises(ValueError, mdraid.get_raid_max_spares, 8, 5) - - ## - ## mdcreate - ## - # pass - self.assertEqual(mdraid.mdcreate("/dev/md0", 1, [_LOOP_DEV0, _LOOP_DEV1]), None) - # wait for raid to settle - time.sleep(2) - - # fail - self.assertRaises(mdraid.MDRaidError, mdraid.mdcreate, "/dev/md1", 1, ["/not/existing/dev0", "/not/existing/dev1"]) - - ## - ## mddeactivate - ## - # pass - self.assertEqual(mdraid.mddeactivate("/dev/md0"), None) - - # fail - self.assertRaises(mdraid.MDRaidError, mdraid.mddeactivate, "/not/existing/md") - - ## - ## mdadd - ## - # pass - # TODO - - # fail - self.assertRaises(mdraid.MDRaidError, mdraid.mdadd, "/not/existing/device") - - ## - ## mdactivate - ## - # pass - self.assertEqual(mdraid.mdactivate("/dev/md0", [_LOOP_DEV0, _LOOP_DEV1], super_minor=0), None) - # wait for raid to settle - time.sleep(2) - - # fail - self.assertRaises(mdraid.MDRaidError, mdraid.mdactivate, "/not/existing/md", super_minor=1) - # requires super_minor or uuid - self.assertRaises(ValueError, mdraid.mdactivate, "/dev/md1") - - ## - ## mddestroy - ## - # pass - # deactivate first - self.assertEqual(mdraid.mddeactivate("/dev/md0"), None) - - self.assertEqual(mdraid.mddestroy(_LOOP_DEV0), None) - self.assertEqual(mdraid.mddestroy(_LOOP_DEV1), None) - - # fail - # not a component - self.assertRaises(mdraid.MDRaidError, mdraid.mddestroy, "/dev/md0") - self.assertRaises(mdraid.MDRaidError, mdraid.mddestroy, "/not/existing/device") - - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(MDRaidTestCase) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/storage_test/devicelibs_test/mpath_test.py b/tests/storage_test/devicelibs_test/mpath_test.py deleted file mode 100755 index 5a160d981..000000000 --- a/tests/storage_test/devicelibs_test/mpath_test.py +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/python -import mock - -class MPathTestCase(mock.TestCase): - - # creating devices, user_friendly_names set to yes - output1 = """\ -create: mpathb (1ATA ST3120026AS 5M) undef ATA,ST3120026AS -size=112G features='0' hwhandler='0' wp=undef -`-+- policy='round-robin 0' prio=1 status=undef - `- 2:0:0:0 sda 8:0 undef ready running -create: mpatha (36006016092d21800703762872c60db11) undef DGC,RAID 5 -size=10G features='1 queue_if_no_path' hwhandler='1 emc' wp=undef -`-+- policy='round-robin 0' prio=2 status=undef - |- 6:0:0:0 sdb 8:16 undef ready running - `- 7:0:0:0 sdc 8:32 undef ready running\ -""" - - # listing existing devices, user_friendly_names set to yes - output2 = """\ -mpathb (3600a0b800067fcc9000001f34d23ff88) dm-1 IBM,1726-4xx FAStT -size=100G features='0' hwhandler='1 rdac' wp=rw -`-+- policy='round-robin 0' prio=-1 status=active - |- 1:0:0:0 sda 8:0 active undef running - `- 2:0:0:0 sdc 8:32 active undef running -mpatha (3600a0b800067fabc000067694d23fe6e) dm-0 IBM,1726-4xx FAStT -size=100G features='0' hwhandler='1 rdac' wp=rw -`-+- policy='round-robin 0' prio=-1 status=active - |- 1:0:0:1 sdb 8:16 active undef running - `- 2:0:0:1 sdd 8:48 active undef running -""" - - # creating devices, user_friendly_names set to no - output3 = """\ -create: 3600a0b800067fabc000067694d23fe6e undef IBM,1726-4xx FAStT -size=100G features='1 queue_if_no_path' hwhandler='1 rdac' wp=undef -`-+- policy='round-robin 0' prio=6 status=undef - |- 1:0:0:1 sdb 8:16 undef ready running - `- 2:0:0:1 sdd 8:48 undef ready running -create: 3600a0b800067fcc9000001f34d23ff88 undef IBM,1726-4xx FAStT -size=100G features='1 queue_if_no_path' hwhandler='1 rdac' wp=undef -`-+- policy='round-robin 0' prio=3 status=undef - |- 1:0:0:0 sda 8:0 undef ready running - `- 2:0:0:0 sdc 8:32 undef ready running\ -""" - - # listing existing devices, user_friendly_names set to no - output4 = """\ -3600a0b800067fcc9000001f34d23ff88 dm-1 IBM,1726-4xx FAStT -size=100G features='0' hwhandler='1 rdac' wp=rw -`-+- policy='round-robin 0' prio=-1 status=active - |- 1:0:0:0 sda 8:0 active undef running - `- 2:0:0:0 sdc 8:32 active undef running -3600a0b800067fabc000067694d23fe6e dm-0 IBM,1726-4xx FAStT -size=100G features='0' hwhandler='1 rdac' wp=rw -`-+- policy='round-robin 0' prio=-1 status=active - |- 1:0:0:1 sdb 8:16 active undef running - `- 2:0:0:1 sdd 8:48 active undef running -""" - - def setUp(self): - self.setupModules( - ['_isys', 'logging', 'anaconda_log', 'block']) - - def tearDown(self): - self.tearDownModules() - - def testParse(self): - from pyanaconda.storage.devicelibs import mpath - topology = mpath.parseMultipathOutput(self.output1) - self.assertEqual(topology, - {'mpatha':['sdb','sdc'], 'mpathb':['sda']}) - topology = mpath.parseMultipathOutput(self.output2) - self.assertEqual(topology, - {'mpathb':['sda','sdc'], 'mpatha':['sdb', 'sdd']}) - topology = mpath.parseMultipathOutput(self.output3) - self.assertEqual(topology, - {'3600a0b800067fabc000067694d23fe6e' : ['sdb','sdd'], - '3600a0b800067fcc9000001f34d23ff88' : ['sda', 'sdc']}) - topology = mpath.parseMultipathOutput(self.output4) - self.assertEqual(topology, - {'3600a0b800067fabc000067694d23fe6e' : ['sdb','sdd'], - '3600a0b800067fcc9000001f34d23ff88' : ['sda', 'sdc']}) - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(MPathTestCase) diff --git a/tests/storage_test/devicelibs_test/swap_test.py b/tests/storage_test/devicelibs_test/swap_test.py deleted file mode 100755 index 3808943cc..000000000 --- a/tests/storage_test/devicelibs_test/swap_test.py +++ /dev/null @@ -1,74 +0,0 @@ -#!/usr/bin/python -import baseclass -import unittest -from mock import acceptance - -class SwapTestCase(baseclass.DevicelibsTestCase): - - def testSwap(self): - _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]] - _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]] - - import storage.devicelibs.swap as swap - - @acceptance - def testSwap(self): - ## - ## mkswap - ## - # pass - self.assertEqual(swap.mkswap(_LOOP_DEV0, "swap"), None) - - # fail - self.assertRaises(swap.SwapError, swap.mkswap, "/not/existing/device") - - ## - ## swapon - ## - # pass - self.assertEqual(swap.swapon(_LOOP_DEV0, 1), None) - - # fail - self.assertRaises(swap.SwapError, swap.swapon, "/not/existing/device") - # not a swap partition - self.assertRaises(swap.SwapError, swap.swapon, _LOOP_DEV1) - - # pass - # make another swap - self.assertEqual(swap.mkswap(_LOOP_DEV1, "another-swap"), None) - self.assertEqual(swap.swapon(_LOOP_DEV1), None) - - ## - ## swapstatus - ## - # pass - self.assertEqual(swap.swapstatus(_LOOP_DEV0), True) - self.assertEqual(swap.swapstatus(_LOOP_DEV1), True) - - # does not fail - self.assertEqual(swap.swapstatus("/not/existing/device"), False) - - ## - ## swapoff - ## - # pass - self.assertEqual(swap.swapoff(_LOOP_DEV1), None) - - # check status - self.assertEqual(swap.swapstatus(_LOOP_DEV0), True) - self.assertEqual(swap.swapstatus(_LOOP_DEV1), False) - - self.assertEqual(swap.swapoff(_LOOP_DEV0), None) - - # fail - self.assertRaises(swap.SwapError, swap.swapoff, "/not/existing/device") - # already off - self.assertRaises(swap.SwapError, swap.swapoff, _LOOP_DEV0) - - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(SwapTestCase) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/storage_test/partitioning_test.py b/tests/storage_test/partitioning_test.py deleted file mode 100644 index 0a8cc75a5..000000000 --- a/tests/storage_test/partitioning_test.py +++ /dev/null @@ -1,130 +0,0 @@ -#!/usr/bin/python - -import unittest -from mock import Mock - -import parted - -import pyanaconda.anaconda_log -pyanaconda.anaconda_log.init() - -from pyanaconda.storage.partitioning import getNextPartitionType - -# disklabel-type-specific constants -# keys: disklabel type string -# values: 3-tuple of (max_primary_count, supports_extended, max_logical_count) -disklabel_types = {'dos': (4, True, 11), - 'gpt': (128, False, 0), - 'mac': (62, False, 0)} - -class PartitioningTestCase(unittest.TestCase): - def getDisk(self, disk_type, primary_count=0, - has_extended=False, logical_count=0): - """ Return a mock representing a parted.Disk. """ - disk = Mock() - - disk.type = disk_type - label_type_info = disklabel_types[disk_type] - (max_primaries, supports_extended, max_logicals) = label_type_info - - # primary partitions - disk.primaryPartitionCount = primary_count - disk.maxPrimaryPartitionCount = max_primaries - - # extended partitions - disk.supportsFeature = Mock(return_value=supports_extended) - disk.getExtendedPartition = Mock(return_value=has_extended) - - # logical partitions - disk.getMaxLogicalPartitions = Mock(return_value=max_logicals) - disk.getLogicalPartitions = Mock(return_value=[0]*logical_count) - - return disk - - def testNextPartitionType(self): - # - # DOS - # - - # empty disk, any type - disk = self.getDisk(disk_type="dos") - self.assertEqual(getNextPartitionType(disk), parted.PARTITION_NORMAL) - - # three primaries and no extended -> extended - disk = self.getDisk(disk_type="dos", primary_count=3) - self.assertEqual(getNextPartitionType(disk), parted.PARTITION_EXTENDED) - - # three primaries and an extended -> primary - disk = self.getDisk(disk_type="dos", primary_count=3, has_extended=True) - self.assertEqual(getNextPartitionType(disk), parted.PARTITION_NORMAL) - - # three primaries and an extended w/ no_primary -> logical - disk = self.getDisk(disk_type="dos", primary_count=3, has_extended=True) - self.assertEqual(getNextPartitionType(disk, no_primary=True), - parted.PARTITION_LOGICAL) - - # four primaries and an extended, available logical -> logical - disk = self.getDisk(disk_type="dos", primary_count=4, has_extended=True, - logical_count=9) - self.assertEqual(getNextPartitionType(disk), parted.PARTITION_LOGICAL) - - # four primaries and an extended, no available logical -> None - disk = self.getDisk(disk_type="dos", primary_count=4, has_extended=True, - logical_count=11) - self.assertEqual(getNextPartitionType(disk), None) - - # four primaries and no extended -> None - disk = self.getDisk(disk_type="dos", primary_count=4, - has_extended=False) - self.assertEqual(getNextPartitionType(disk), None) - - # free primary slot, extended, no free logical slot -> primary - disk = self.getDisk(disk_type="dos", primary_count=3, has_extended=True, - logical_count=11) - self.assertEqual(getNextPartitionType(disk), parted.PARTITION_NORMAL) - - # free primary slot, extended, no free logical slot w/ no_primary - # -> None - disk = self.getDisk(disk_type="dos", primary_count=3, has_extended=True, - logical_count=11) - self.assertEqual(getNextPartitionType(disk, no_primary=True), None) - - # - # GPT - # - - # empty disk, any partition type - disk = self.getDisk(disk_type="gpt") - self.assertEqual(getNextPartitionType(disk), parted.PARTITION_NORMAL) - - # no empty slots -> None - disk = self.getDisk(disk_type="gpt", primary_count=128) - self.assertEqual(getNextPartitionType(disk), None) - - # no_primary -> None - disk = self.getDisk(disk_type="gpt") - self.assertEqual(getNextPartitionType(disk, no_primary=True), None) - - # - # MAC - # - - # empty disk, any partition type - disk = self.getDisk(disk_type="mac") - self.assertEqual(getNextPartitionType(disk), parted.PARTITION_NORMAL) - - # no empty slots -> None - disk = self.getDisk(disk_type="mac", primary_count=62) - self.assertEqual(getNextPartitionType(disk), None) - - # no_primary -> None - disk = self.getDisk(disk_type="mac") - self.assertEqual(getNextPartitionType(disk, no_primary=True), None) - - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(PartitioningTestCase) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/storage_test/size_test.py b/tests/storage_test/size_test.py deleted file mode 100644 index 4f002f298..000000000 --- a/tests/storage_test/size_test.py +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/python -# -# tests/storage/size_tests.py -# Size test cases for the pyanaconda.storage module -# -# Copyright (C) 2010 Red Hat, Inc. -# -# This copyrighted material is made available to anyone wishing to use, -# modify, copy, or redistribute it subject to the terms and conditions of -# the GNU General Public License v.2, or (at your option) any later version. -# This program is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY expressed or implied, including the implied warranties of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General -# Public License for more details. You should have received a copy of the -# GNU General Public License along with this program; if not, write to the -# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA -# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the -# source code or documentation are not subject to the GNU General Public -# License and may only be used or replicated with the express permission of -# Red Hat, Inc. -# -# Red Hat Author(s): David Cantrell <dcantrell@redhat.com> - -import unittest - -from pyanaconda import anaconda_log -anaconda_log.init() -from pyanaconda.storage.errors import * -from pyanaconda.storage.size import Size, _prefixes - -class SizeTestCase(unittest.TestCase): - def testExceptions(self): - self.assertRaises(SizeParamsError, Size) - self.assertRaises(SizeParamsError, Size, bytes=500, spec="45GB") - - self.assertRaises(SizeNotPositiveError, Size, bytes=-1) - - self.assertRaises(SizeNotPositiveError, Size, spec="0") - self.assertRaises(SizeNotPositiveError, Size, spec="-1 TB") - self.assertRaises(SizeNotPositiveError, Size, spec="-47kb") - - s = Size(bytes=500) - self.assertRaises(SizePlacesError, s.humanReadable, places=0) - - def _prefixTestHelper(self, bytes, factor, prefix, abbr): - c = bytes * factor - - s = Size(bytes=c) - self.assertEquals(s, c) - - if prefix: - u = "%sbytes" % prefix - s = Size(spec="%ld %s" % (bytes, u)) - self.assertEquals(s, c) - self.assertEquals(s.convertTo(spec=u), bytes) - - if abbr: - u = "%sb" % abbr - s = Size(spec="%ld %s" % (bytes, u)) - self.assertEquals(s, c) - self.assertEquals(s.convertTo(spec=u), bytes) - - if not prefix and not abbr: - s = Size(spec="%ld" % bytes) - self.assertEquals(s, c) - self.assertEquals(s.convertTo(), bytes) - - def testPrefixes(self): - bytes = 47L - self._prefixTestHelper(bytes, 1, None, None) - - for factor, prefix, abbr in _prefixes: - self._prefixTestHelper(bytes, factor, prefix, abbr) - - def testHumanReadable(self): - s = Size(bytes=58929971L) - self.assertEquals(s.humanReadable(), "58.9 Mb") - - s = Size(bytes=478360371L) - self.assertEquals(s.humanReadable(), "0.48 Gb") - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(SizeTestCase) - -if __name__ == "__main__": - unittest.main() diff --git a/tests/storage_test/storagetestcase.py b/tests/storage_test/storagetestcase.py deleted file mode 100644 index 8e5df1db4..000000000 --- a/tests/storage_test/storagetestcase.py +++ /dev/null @@ -1,287 +0,0 @@ -#!/usr/bin/python - -import unittest -from mock import Mock -from mock import TestCase - -import parted - -import pyanaconda.anaconda_log -pyanaconda.anaconda_log.init() - -import pyanaconda.iutil -import pyanaconda.storage as storage -from pyanaconda.storage.formats import getFormat - -# device classes for brevity's sake -- later on, that is -from pyanaconda.storage.devices import StorageDevice -from pyanaconda.storage.devices import DiskDevice -from pyanaconda.storage.devices import PartitionDevice -from pyanaconda.storage.devices import MDRaidArrayDevice -from pyanaconda.storage.devices import DMDevice -from pyanaconda.storage.devices import LUKSDevice -from pyanaconda.storage.devices import LVMVolumeGroupDevice -from pyanaconda.storage.devices import LVMLogicalVolumeDevice -from pyanaconda.storage.devices import FileDevice - - -class StorageTestCase(TestCase): - """ StorageTestCase - - This is a base class for storage test cases. It sets up imports of - the storage package, along with an Anaconda instance and a Storage - instance. There are lots of little patches to prevent various pieces - of code from trying to access filesystems and/or devices on the host - system, along with a couple of convenience methods. - - """ - def __init__(self, *args, **kwargs): - TestCase.__init__(self, *args, **kwargs) - - self.setUpAnaconda() - - def setUpAnaconda(self): - pyanaconda.iutil.execWithRedirect = Mock() - pyanaconda.iutil.execWithCapture = Mock() - pyanaconda.iutil.execWithPulseProgress = Mock() - pyanaconda.storage.udev = Mock() - - self.anaconda = Mock() - self.setUpStorage() - - def setUpStorage(self): - self.setUpDeviceLibs() - self.storage = storage.Storage(self.anaconda) - - # device status - pyanaconda.storage.devices.StorageDevice.status = False - pyanaconda.storage.devices.DMDevice.status = False - pyanaconda.storage.devices.LUKSDevice.status = False - pyanaconda.storage.devices.LVMVolumeGroupDevice.status = False - pyanaconda.storage.devices.MDRaidArrayDevice.status = False - pyanaconda.storage.devices.FileDevice.status = False - - # prevent PartitionDevice from trying to dig around in the partition's - # geometry - pyanaconda.storage.devices.PartitionDevice._setTargetSize = StorageDevice._setTargetSize - - # prevent Ext2FS from trying to run resize2fs to get a filesystem's - # minimum size - storage.formats.fs.Ext2FS.minSize = storage.formats.DeviceFormat.minSize - storage.formats.fs.FS.migratable = storage.formats.DeviceFormat.migratable - - def setUpDeviceLibs(self): - # devicelibs shouldn't be touching or looking at the host system - - # lvm is easy because all calls to /sbin/lvm are via lvm() - storage.devicelibs.lvm.lvm = Mock() - - # mdraid is easy because all calls to /sbin/mdadm are via mdadm() - storage.devicelibs.mdraid.mdadm = Mock() - - # swap - storage.devicelibs.swap.swapstatus = Mock(return_value=False) - storage.devicelibs.swap.swapon = Mock() - storage.devicelibs.swap.swapoff = Mock() - - # dm - storage.devicelibs.dm = Mock() - - # crypto/luks - storage.devicelibs.crypto.luks_status = Mock(return_value=False) - storage.devicelibs.crypto.luks_uuid = Mock() - storage.devicelibs.crypto.luks_format = Mock() - storage.devicelibs.crypto.luks_open = Mock() - storage.devicelibs.crypto.luks_close = Mock() - storage.devicelibs.crypto.luks_add_key = Mock() - storage.devicelibs.crypto.luks_remove_key = Mock() - - # this list would normally be obtained by parsing /proc/mdstat - storage.devicelibs.mdraid.raid_levels = \ - [storage.devicelibs.mdraid.RAID10, - storage.devicelibs.mdraid.RAID0, - storage.devicelibs.mdraid.RAID1, - storage.devicelibs.mdraid.RAID4, - storage.devicelibs.mdraid.RAID5, - storage.devicelibs.mdraid.RAID6] - - def newDevice(*args, **kwargs): - """ Return a new Device instance suitable for testing. """ - args = args[1:] # drop self arg - device_class = kwargs.pop("device_class") - exists = kwargs.pop("exists", False) - part_type = kwargs.pop("part_type", parted.PARTITION_NORMAL) - device = device_class(*args, **kwargs) - - if exists: - # set up mock parted.Device w/ correct size - device._partedDevice = Mock() - device._partedDevice.getSize = Mock(return_value=float(device.size)) - device._partedDevice.sectorSize = 512 - - if isinstance(device, pyanaconda.storage.devices.PartitionDevice): - #if exists: - # device.parents = device.req_disks - device.parents = device.req_disks - - partedPartition = Mock() - - if device.disk: - part_num = device.name[len(device.disk.name):].split("p")[-1] - partedPartition.number = int(part_num) - - partedPartition.type = part_type - partedPartition.path = device.path - partedPartition.getDeviceNodeName = Mock(return_value=device.name) - partedPartition.getSize = Mock(return_value=float(device.size)) - device._partedPartition = partedPartition - - device.exists = exists - device.format.exists = exists - - if isinstance(device, pyanaconda.storage.devices.PartitionDevice): - # PartitionDevice.probe sets up data needed for resize operations - device.probe() - - return device - - def newFormat(*args, **kwargs): - """ Return a new DeviceFormat instance suitable for testing. - - Keyword Arguments: - - device_instance - StorageDevice instance this format will be - created on. This is needed for setup of - resizable formats. - - All other arguments are passed directly to - pyanaconda.storage.formats.getFormat. - """ - args = args[1:] # drop self arg - exists = kwargs.pop("exists", False) - device_instance = kwargs.pop("device_instance", None) - format = getFormat(*args, **kwargs) - if isinstance(format, storage.formats.disklabel.DiskLabel): - format._partedDevice = Mock() - format._partedDisk = Mock() - - format.exists = exists - - if format.resizable and device_instance: - format._size = device_instance.currentSize - - return format - - def destroyAllDevices(self, disks=None): - """ Remove all devices from the devicetree. - - Keyword Arguments: - - disks - a list of names of disks to remove partitions from - - Note: this is largely ripped off from partitioning.clearPartitions. - - """ - partitions = self.storage.partitions - - # Sort partitions by descending partition number to minimize confusing - # things like multiple "destroy sda5" actions due to parted renumbering - # partitions. This can still happen through the UI but it makes sense to - # avoid it where possible. - partitions.sort(key=lambda p: p.partedPartition.number, reverse=True) - for part in partitions: - if disks and part.disk.name not in disks: - continue - - devices = self.storage.deviceDeps(part) - while devices: - leaves = [d for d in devices if d.isleaf] - for leaf in leaves: - self.storage.destroyDevice(leaf) - devices.remove(leaf) - - self.storage.destroyDevice(part) - - def scheduleCreateDevice(self, *args, **kwargs): - """ Schedule an action to create the specified device. - - Verify that the device is not already in the tree and that the - act of scheduling/registering the action also adds the device to - the tree. - - Return the DeviceAction instance. - """ - device = kwargs.pop("device") - if hasattr(device, "req_disks") and \ - len(device.req_disks) == 1 and \ - not device.parents: - device.parents = device.req_disks - - devicetree = self.storage.devicetree - - self.assertEqual(devicetree.getDeviceByName(device.name), None) - action = storage.deviceaction.ActionCreateDevice(device) - devicetree.registerAction(action) - self.assertEqual(devicetree.getDeviceByName(device.name), device) - return action - - def scheduleDestroyDevice(self, *args, **kwargs): - """ Schedule an action to destroy the specified device. - - Verify that the device exists initially and that the act of - scheduling/registering the action also removes the device from - the tree. - - Return the DeviceAction instance. - """ - device = kwargs.pop("device") - devicetree = self.storage.devicetree - - self.assertEqual(devicetree.getDeviceByName(device.name), device) - action = storage.deviceaction.ActionDestroyDevice(device) - devicetree.registerAction(action) - self.assertEqual(devicetree.getDeviceByName(device.name), None) - return action - - def scheduleCreateFormat(self, *args, **kwargs): - """ Schedule an action to write a new format to a device. - - Verify that the device is already in the tree, that it is not - already set up to contain the specified format, and that the act - of registering/scheduling the action causes the new format to be - reflected in the tree. - - Return the DeviceAction instance. - """ - device = kwargs.pop("device") - format = kwargs.pop("format") - devicetree = self.storage.devicetree - - self.assertNotEqual(device.format, format) - self.assertEqual(devicetree.getDeviceByName(device.name), device) - action = storage.deviceaction.ActionCreateFormat(device, format) - devicetree.registerAction(action) - _device = devicetree.getDeviceByName(device.name) - self.assertEqual(_device.format, format) - return action - - def scheduleDestroyFormat(self, *args, **kwargs): - """ Schedule an action to remove a format from a device. - - Verify that the device is already in the tree and that the act - of registering/scheduling the action causes the new format to be - reflected in the tree. - - Return the DeviceAction instance. - """ - device = kwargs.pop("device") - devicetree = self.storage.devicetree - - self.assertEqual(devicetree.getDeviceByName(device.name), device) - action = storage.deviceaction.ActionDestroyFormat(device) - devicetree.registerAction(action) - _device = devicetree.getDeviceByName(device.name) - self.assertEqual(_device.format.type, None) - return action - - diff --git a/tests/storage_test/tsort_test.py b/tests/storage_test/tsort_test.py deleted file mode 100644 index cb96997bf..000000000 --- a/tests/storage_test/tsort_test.py +++ /dev/null @@ -1,62 +0,0 @@ - -import unittest -import pyanaconda.storage.tsort - -class TopologicalSortTestCase(unittest.TestCase): - def runTest(self): - items = [1, 2, 3, 4, 5] - edges = [(5, 4), (4, 3), (3, 2), (2, 1)] - graph = pyanaconda.storage.tsort.create_graph(items, edges) - self._tsortTest(graph) - - edges = [(5, 4), (2, 3), (1, 5)] - graph = pyanaconda.storage.tsort.create_graph(items, edges) - self._tsortTest(graph) - - edges = [(5, 4), (4, 3), (3, 2), (2, 1), (3, 5)] - graph = pyanaconda.storage.tsort.create_graph(items, edges) - self.failUnlessRaises(pyanaconda.storage.tsort.CyclicGraphError, - pyanaconda.storage.tsort.tsort, - graph) - - edges = [(5, 4), (4, 3), (3, 2), (2, 1), (2, 3)] - graph = pyanaconda.storage.tsort.create_graph(items, edges) - self.failUnlessRaises(pyanaconda.storage.tsort.CyclicGraphError, - pyanaconda.storage.tsort.tsort, - graph) - - items = ['a', 'b', 'c', 'd'] - edges = [('a', 'c'), ('c', 'b')] - graph = pyanaconda.storage.tsort.create_graph(items, edges) - self._tsortTest(graph) - - def _tsortTest(self, graph): - def check_order(order, graph): - # since multiple solutions can potentially exist, just verify - # that the ordering constraints are satisfied - for parent, child in graph['edges']: - if order.index(parent) > order.index(child): - return False - return True - - try: - order = pyanaconda.storage.tsort.tsort(graph) - except Exception as e: - self.fail(e) - - # verify output list is of the correct length - self.failIf(len(order) != len(graph['items']), - "sorted list length is incorrect") - - # verify that all ordering constraints are satisfied - self.failUnless(check_order(order, graph), - "ordering constraints not satisfied") - - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(TopologicalSortTestCase) - - -if __name__ == "__main__": - unittest.main() - diff --git a/tests/storage_test/udev_test.py b/tests/storage_test/udev_test.py deleted file mode 100644 index 5d48279b4..000000000 --- a/tests/storage_test/udev_test.py +++ /dev/null @@ -1,139 +0,0 @@ -#!/usr/bin/python - -import mock -import os - -class UdevTest(mock.TestCase): - - def setUp(self): - self.setupModules(["_isys", "block", "ConfigParser"]) - self.fs = mock.DiskIO() - - import pyanaconda.storage.udev - pyanaconda.storage.udev.os = mock.Mock() - pyanaconda.storage.udev.log = mock.Mock() - pyanaconda.storage.udev.open = self.fs.open - - def tearDown(self): - self.tearDownModules() - - def udev_enumerate_devices_test(self): - import pyanaconda.storage.udev - ENUMERATE_LIST = [ - '/sys/devices/pci0000:00/0000:00:1f.2/host0/target0:0:0/0:0:0:0/block/sda', - '/sys/devices/virtual/block/loop0', - '/sys/devices/virtual/block/loop1', - '/sys/devices/virtual/block/ram0', - '/sys/devices/virtual/block/ram1', - '/sys/devices/virtual/block/dm-0', - ] - - pyanaconda.storage.udev.global_udev.enumerate_devices = mock.Mock(return_value=ENUMERATE_LIST) - ret = pyanaconda.storage.udev.udev_enumerate_devices() - self.assertEqual(set(ret), - set(['/devices/pci0000:00/0000:00:1f.2/host0/target0:0:0/0:0:0:0/block/sda', - '/devices/virtual/block/loop0', '/devices/virtual/block/loop1', - '/devices/virtual/block/ram0', '/devices/virtual/block/ram1', - '/devices/virtual/block/dm-0']) - ) - - def udev_get_device_1_test(self): - import pyanaconda.storage.udev - - class Device(object): - def __init__(self): - self.sysname = 'loop1' - self.dict = {'symlinks': ['/dev/block/7:1'], - 'SUBSYSTEM': 'block', - 'MAJOR': '7', - 'DEVPATH': '/devices/virtual/block/loop1', - 'UDISKS_PRESENTATION_NOPOLICY': '1', - 'UDEV_LOG': '3', - 'DEVNAME': '/dev/loop1', - 'DEVTYPE': 'disk', - 'DEVLINKS': '/dev/block/7:1', - 'MINOR': '1' - } - - def __getitem__(self, key): - return self.dict[key] - - def __setitem__(self, key, value): - self.dict[key] = value - - pyanaconda.storage.udev.os.path.exists.return_value = True - DEV_PATH = '/devices/virtual/block/loop1' - dev = Device() - pyanaconda.storage.udev.global_udev = mock.Mock() - pyanaconda.storage.udev.global_udev.create_device.return_value = dev - pyanaconda.storage.udev.udev_parse_uevent_file = mock.Mock(return_value=dev) - - ret = pyanaconda.storage.udev.udev_get_device(DEV_PATH) - self.assertTrue(isinstance(ret, Device)) - self.assertEqual(ret['name'], ret.sysname) - self.assertEqual(ret['sysfs_path'], DEV_PATH) - self.assertTrue(pyanaconda.storage.udev.udev_parse_uevent_file.called) - - def udev_get_device_2_test(self): - import pyanaconda.storage.udev - pyanaconda.storage.udev.os.path.exists.return_value = False - ret = pyanaconda.storage.udev.udev_get_device('') - self.assertEqual(ret, None) - - def udev_get_device_3_test(self): - import pyanaconda.storage.udev - pyanaconda.storage.udev.os.path.exists.return_value = True - pyanaconda.storage.udev.global_udev = mock.Mock() - pyanaconda.storage.udev.global_udev.create_device.return_value = None - ret = pyanaconda.storage.udev.udev_get_device('') - self.assertEqual(ret, None) - - def udev_get_devices_test(self): - import pyanaconda.storage.udev - pyanaconda.storage.udev.udev_settle = mock.Mock() - DEVS = \ - ['/devices/pci0000:00/0000:00:1f.2/host0/target0:0:0/0:0:0:0/block/sda', - '/devices/virtual/block/loop0', '/devices/virtual/block/loop1', - '/devices/virtual/block/ram0', '/devices/virtual/block/ram1', - '/devices/virtual/block/dm-0'] - pyanaconda.storage.udev.udev_enumerate_devices = mock.Mock(return_value=DEVS) - pyanaconda.storage.udev.udev_get_device = lambda x: x - ret = pyanaconda.storage.udev.udev_get_devices() - self.assertEqual(ret, DEVS) - - def udev_parse_uevent_file_1_test(self): - import pyanaconda.storage.udev - pyanaconda.storage.udev.os.path.normpath = os.path.normpath - pyanaconda.storage.udev.os.access.return_value = True - - FILE_CONTENT = "MAJOR=7\nMINOR=1\nDEVNAME=loop1\nDEVTYPE=disk\n" - self.fs.open('/sys/devices/virtual/block/loop1/uevent', 'w').write(FILE_CONTENT) - dev = {'sysfs_path': '/devices/virtual/block/loop1'} - ret = pyanaconda.storage.udev.udev_parse_uevent_file(dev) - self.assertEqual(ret, - {'sysfs_path': '/devices/virtual/block/loop1', - 'DEVNAME': 'loop1', - 'DEVTYPE': 'disk', - 'MAJOR': '7', - 'MINOR': '1'}) - - def udev_parse_uevent_file_2_test(self): - import pyanaconda.storage.udev - pyanaconda.storage.udev.os.path.normpath = os.path.normpath - pyanaconda.storage.udev.os.access.return_value = False - - dev = {'sysfs_path': '/devices/virtual/block/loop1'} - ret = pyanaconda.storage.udev.udev_parse_uevent_file(dev) - self.assertEqual(ret, {'sysfs_path': '/devices/virtual/block/loop1'}) - - def udev_settle_test(self): - import pyanaconda.storage.udev - pyanaconda.storage.udev.util = mock.Mock() - pyanaconda.storage.udev.udev_settle() - self.assertTrue(pyanaconda.storage.udev.util.run_program.called) - - def udev_trigger_test(self): - import pyanaconda.storage.udev - pyanaconda.storage.udev.util = mock.Mock() - pyanaconda.storage.udev.udev_trigger() - self.assertTrue(pyanaconda.storage.udev.util.run_program.called) |