summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/Makefile.am2
-rw-r--r--tests/storage_test/Makefile.am24
-rw-r--r--tests/storage_test/action_test.py926
-rw-r--r--tests/storage_test/devicelibs_test/Makefile.am22
-rw-r--r--tests/storage_test/devicelibs_test/baseclass.py80
-rwxr-xr-xtests/storage_test/devicelibs_test/crypto_test.py135
-rw-r--r--tests/storage_test/devicelibs_test/edd_test.py212
-rwxr-xr-xtests/storage_test/devicelibs_test/lvm_test.py239
-rwxr-xr-xtests/storage_test/devicelibs_test/mdraid_test.py114
-rwxr-xr-xtests/storage_test/devicelibs_test/mpath_test.py86
-rwxr-xr-xtests/storage_test/devicelibs_test/swap_test.py74
-rw-r--r--tests/storage_test/partitioning_test.py130
-rw-r--r--tests/storage_test/size_test.py86
-rw-r--r--tests/storage_test/storagetestcase.py287
-rw-r--r--tests/storage_test/tsort_test.py62
-rw-r--r--tests/storage_test/udev_test.py139
16 files changed, 1 insertions, 2617 deletions
diff --git a/tests/Makefile.am b/tests/Makefile.am
index df244c9ce..cb4c70115 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -17,6 +17,6 @@
#
# Author: David Cantrell <dcantrell@redhat.com>
-SUBDIRS = mock kickstart_test storage_test regex pyanaconda_test pylint logpicker_test
+SUBDIRS = mock kickstart_test regex pyanaconda_test pylint logpicker_test
MAINTAINERCLEANFILES = Makefile.in
diff --git a/tests/storage_test/Makefile.am b/tests/storage_test/Makefile.am
deleted file mode 100644
index 23565abb8..000000000
--- a/tests/storage_test/Makefile.am
+++ /dev/null
@@ -1,24 +0,0 @@
-# tests/storage/Makefile.am for anaconda
-#
-# Copyright (C) 2009 Red Hat, Inc.
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License as published
-# by the Free Software Foundation; either version 2.1 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# Author: David Cantrell <dcantrell@redhat.com>
-
-SUBDIRS = devicelibs_test
-
-EXTRA_DIST = *.py
-
-MAINTAINERCLEANFILES = Makefile.in
diff --git a/tests/storage_test/action_test.py b/tests/storage_test/action_test.py
deleted file mode 100644
index b07a44b23..000000000
--- a/tests/storage_test/action_test.py
+++ /dev/null
@@ -1,926 +0,0 @@
-#!/usr/bin/python
-
-import unittest
-from mock import Mock
-from mock import TestCase
-
-from storagetestcase import StorageTestCase
-import pyanaconda.storage as storage
-from pyanaconda.storage.formats import getFormat
-
-# device classes for brevity's sake -- later on, that is
-from pyanaconda.storage.devices import StorageDevice
-from pyanaconda.storage.devices import DiskDevice
-from pyanaconda.storage.devices import PartitionDevice
-from pyanaconda.storage.devices import MDRaidArrayDevice
-from pyanaconda.storage.devices import DMDevice
-from pyanaconda.storage.devices import LUKSDevice
-from pyanaconda.storage.devices import LVMVolumeGroupDevice
-from pyanaconda.storage.devices import LVMLogicalVolumeDevice
-from pyanaconda.storage.devices import FileDevice
-
-# action classes
-from pyanaconda.storage.deviceaction import ActionCreateDevice
-from pyanaconda.storage.deviceaction import ActionResizeDevice
-from pyanaconda.storage.deviceaction import ActionDestroyDevice
-from pyanaconda.storage.deviceaction import ActionCreateFormat
-from pyanaconda.storage.deviceaction import ActionResizeFormat
-from pyanaconda.storage.deviceaction import ActionMigrateFormat
-from pyanaconda.storage.deviceaction import ActionDestroyFormat
-
-""" DeviceActionTestSuite """
-
-class DeviceActionTestCase(StorageTestCase):
- def setUp(self):
- """ Create something like a preexisting autopart on two disks (sda,sdb).
-
- The other two disks (sdc,sdd) are left for individual tests to use.
- """
- self.setUpAnaconda()
-
- for name in ["sda", "sdb", "sdc", "sdd"]:
- disk = self.newDevice(device_class=DiskDevice,
- name=name, size=100000)
- disk.format = self.newFormat("disklabel", path=disk.path,
- exists=True)
- self.storage.devicetree._addDevice(disk)
-
- # create a layout similar to autopart as a starting point
- sda = self.storage.devicetree.getDeviceByName("sda")
- sdb = self.storage.devicetree.getDeviceByName("sdb")
-
- sda1 = self.newDevice(device_class=PartitionDevice,
- exists=True, name="sda1", parents=[sda], size=500)
- sda1.format = self.newFormat("ext4", mountpoint="/boot",
- device_instance=sda1,
- device=sda1.path, exists=True)
- self.storage.devicetree._addDevice(sda1)
-
- sda2 = self.newDevice(device_class=PartitionDevice,
- size=99500, name="sda2", parents=[sda], exists=True)
- sda2.format = self.newFormat("lvmpv", device=sda2.path, exists=True)
- self.storage.devicetree._addDevice(sda2)
-
- sdb1 = self.newDevice(device_class=PartitionDevice,
- size=99999, name="sdb1", parents=[sdb], exists=True)
- sdb1.format = self.newFormat("lvmpv", device=sdb1.path, exists=True)
- self.storage.devicetree._addDevice(sdb1)
-
- vg = self.newDevice(device_class=LVMVolumeGroupDevice,
- name="VolGroup", parents=[sda2, sdb1],
- exists=True)
- self.storage.devicetree._addDevice(vg)
-
- lv_root = self.newDevice(device_class=LVMLogicalVolumeDevice,
- name="lv_root", vgdev=vg, size=160000,
- exists=True)
- lv_root.format = self.newFormat("ext4", mountpoint="/",
- device_instance=lv_root,
- device=lv_root.path, exists=True)
- self.storage.devicetree._addDevice(lv_root)
-
- lv_swap = self.newDevice(device_class=LVMLogicalVolumeDevice,
- name="lv_swap", vgdev=vg, size=4000,
- exists=True)
- lv_swap.format = self.newFormat("swap", device=lv_swap.path,
- device_instance=lv_swap,
- exists=True)
- self.storage.devicetree._addDevice(lv_swap)
-
- def testActions(self, *args, **kwargs):
- """ Verify correct management of actions.
-
- - action creation/registration/cancellation
- - ActionCreateDevice adds device to tree
- - ActionDestroyDevice removes device from tree
- - ActionCreateFormat sets device.format in tree
- - ActionDestroyFormat unsets device.format in tree
- - cancelled action's registration side-effects reversed
- - failure to register destruction of non-leaf device
- - failure to register creation of device already in tree?
- - failure to register destruction of device not in tree?
-
- - action pruning
- - non-existent-device create..destroy cycles removed
- - all actions on this device should get removed
- - all actions pruned from to-be-destroyed devices
- - resize, format, migrate, &c
- - redundant resize/migrate/format actions pruned
- - last one registered stays
-
- - action sorting
- - destroy..resize..migrate..create
- - creation
- - leaves-last, including formatting
- - destruction
- - leaves-first
- """
- devicetree = self.storage.devicetree
-
- # clear the disks
- self.destroyAllDevices()
- self.assertEqual(devicetree.getDevicesByType("lvmlv"), [])
- self.assertEqual(devicetree.getDevicesByType("lvmvg"), [])
- self.assertEqual(devicetree.getDevicesByType("partition"), [])
-
- sda = devicetree.getDeviceByName("sda")
- self.assertNotEqual(sda, None, "failed to find disk 'sda'")
-
- sda1 = self.newDevice(device_class=PartitionDevice,
- name="sda1", size=500, parents=[sda])
- self.scheduleCreateDevice(device=sda1)
-
- sda2 = self.newDevice(device_class=PartitionDevice,
- name="sda2", size=100000, parents=[sda])
- self.scheduleCreateDevice(device=sda2)
- format = self.newFormat("lvmpv", device=sda2.path)
- self.scheduleCreateFormat(device=sda2, format=format)
-
- vg = self.newDevice(device_class=LVMVolumeGroupDevice,
- name="vg", parents=[sda2])
- self.scheduleCreateDevice(device=vg)
-
- lv_root = self.newDevice(device_class=LVMLogicalVolumeDevice,
- name="lv_root", vgdev=vg, size=60000)
- self.scheduleCreateDevice(device=lv_root)
- format = self.newFormat("ext4", device=lv_root.path, mountpoint="/")
- self.scheduleCreateFormat(device=lv_root, format=format)
-
- lv_swap = self.newDevice(device_class=LVMLogicalVolumeDevice,
- name="lv_swap", vgdev=vg, size=4000)
- self.scheduleCreateDevice(device=lv_swap)
- format = self.newFormat("swap", device=lv_swap.path)
- self.scheduleCreateFormat(device=lv_swap, format=format)
-
- sda3 = self.newDevice(device_class=PartitionDevice,
- name="sda3", parents=[sda], size=40000)
- self.scheduleCreateDevice(device=sda3)
- format = self.newFormat("mdmember", device=sda3.path)
- self.scheduleCreateFormat(device=sda3, format=format)
-
- sdb = devicetree.getDeviceByName("sdb")
- self.assertNotEqual(sdb, None, "failed to find disk 'sdb'")
-
- sdb1 = self.newDevice(device_class=PartitionDevice,
- name="sdb1", parents=[sdb], size=40000)
- self.scheduleCreateDevice(device=sdb1)
- format = self.newFormat("mdmember", device=sdb1.path,)
- self.scheduleCreateFormat(device=sdb1, format=format)
-
- md0 = self.newDevice(device_class=MDRaidArrayDevice,
- name="md0", level="raid0", minor=0, size=80000,
- memberDevices=2, totalDevices=2,
- parents=[sdb1, sda3])
- self.scheduleCreateDevice(device=md0)
-
- format = self.newFormat("ext4", device=md0.path, mountpoint="/home")
- self.scheduleCreateFormat(device=md0, format=format)
-
- format = self.newFormat("ext4", mountpoint="/boot", device=sda1.path)
- self.scheduleCreateFormat(device=sda1, format=format)
-
- def testActionCreation(self, *args, **kwargs):
- """ Verify correct operation of action class constructors. """
- # instantiation of device resize action for non-existent device should
- # fail
- # XXX resizable depends on existence, so this is covered implicitly
- sdd = self.storage.devicetree.getDeviceByName("sdd")
- p = self.newDevice(device_class=PartitionDevice,
- name="sdd1", size=32768, parents=[sdd])
- self.failUnlessRaises(ValueError,
- storage.deviceaction.ActionResizeDevice,
- p,
- p.size + 7232)
-
- # instantiation of device resize action for non-resizable device
- # should fail
- vg = self.storage.devicetree.getDeviceByName("VolGroup")
- self.assertNotEqual(vg, None)
- self.failUnlessRaises(ValueError,
- storage.deviceaction.ActionResizeDevice,
- vg,
- vg.size + 32)
-
- # instantiation of format resize action for non-resizable format type
- # should fail
- lv_swap = self.storage.devicetree.getDeviceByName("VolGroup-lv_swap")
- self.assertNotEqual(lv_swap, None)
- self.failUnlessRaises(ValueError,
- storage.deviceaction.ActionResizeFormat,
- lv_swap,
- lv_swap.size + 32)
-
- # instantiation of format resize action for non-existent format
- # should fail
- lv_root = self.storage.devicetree.getDeviceByName("VolGroup-lv_root")
- self.assertNotEqual(lv_root, None)
- lv_root.format.exists = False
- self.failUnlessRaises(ValueError,
- storage.deviceaction.ActionResizeFormat,
- lv_root,
- lv_root.size - 1000)
- lv_root.format.exists = True
-
- # instantiation of format migrate action for non-migratable format
- # type should fail
- lv_swap = self.storage.devicetree.getDeviceByName("VolGroup-lv_swap")
- self.assertNotEqual(lv_swap, None)
- self.assertEqual(lv_swap.exists, True)
- self.failUnlessRaises(ValueError,
- storage.deviceaction.ActionMigrateFormat,
- lv_swap)
-
- # instantiation of format migrate for non-existent format should fail
- lv_root = self.storage.devicetree.getDeviceByName("VolGroup-lv_root")
- self.assertNotEqual(lv_root, None)
- orig_format = lv_root.format
- lv_root.format = getFormat("ext3", device=lv_root.path)
- self.failUnlessRaises(ValueError,
- storage.deviceaction.ActionMigrateFormat,
- lv_root)
- lv_root.format = orig_format
-
- # instantiation of device create action for existing device should
- # fail
- lv_swap = self.storage.devicetree.getDeviceByName("VolGroup-lv_swap")
- self.assertNotEqual(lv_swap, None)
- self.assertEqual(lv_swap.exists, True)
- self.failUnlessRaises(ValueError,
- storage.deviceaction.ActionCreateDevice,
- lv_swap)
-
- # instantiation of format destroy action for device causes device's
- # format attribute to be a DeviceFormat instance
- lv_swap = self.storage.devicetree.getDeviceByName("VolGroup-lv_swap")
- self.assertNotEqual(lv_swap, None)
- orig_format = lv_swap.format
- self.assertEqual(lv_swap.format.type, "swap")
- a = storage.deviceaction.ActionDestroyFormat(lv_swap)
- self.assertEqual(lv_swap.format.type, None)
-
- # instantiation of format create action for device causes new format
- # to be accessible via device's format attribute
- new_format = getFormat("vfat", device=lv_swap.path)
- a = storage.deviceaction.ActionCreateFormat(lv_swap, new_format)
- self.assertEqual(lv_swap.format, new_format)
- lv_swap.format = orig_format
-
- def testActionRegistration(self, *args, **kwargs):
- """ Verify correct operation of action registration and cancelling. """
- # self.setUp has just been run, so we should have something like
- # a preexisting autopart config in the devicetree.
-
- # registering a destroy action for a non-leaf device should fail
- vg = self.storage.devicetree.getDeviceByName("VolGroup")
- self.assertNotEqual(vg, None)
- self.assertEqual(vg.isleaf, False)
- a = storage.deviceaction.ActionDestroyDevice(vg)
- self.failUnlessRaises(ValueError,
- self.storage.devicetree.registerAction,
- a)
-
- # registering any action other than create for a device that's not in
- # the devicetree should fail
- sdc = self.storage.devicetree.getDeviceByName("sdc")
- self.assertNotEqual(sdc, None)
- sdc1 = self.newDevice(device_class=PartitionDevice,
- name="sdc1", size=100000, parents=[sdc],
- exists=True)
-
- sdc1_format = self.newFormat("ext2", device=sdc1.path, mountpoint="/")
- create_sdc1_format = ActionCreateFormat(sdc1, sdc1_format)
- self.failUnlessRaises(storage.errors.DeviceTreeError,
- self.storage.devicetree.registerAction,
- create_sdc1_format)
-
- sdc1_format.exists = True
-
- migrate_sdc1 = ActionMigrateFormat(sdc1)
- self.failUnlessRaises(storage.errors.DeviceTreeError,
- self.storage.devicetree.registerAction,
- migrate_sdc1)
- migrate_sdc1.cancel()
-
- resize_sdc1_format = ActionResizeFormat(sdc1, sdc1.size - 10000)
- self.failUnlessRaises(storage.errors.DeviceTreeError,
- self.storage.devicetree.registerAction,
- resize_sdc1_format)
-
- resize_sdc1 = ActionResizeDevice(sdc1, sdc1.size - 10000)
- self.failUnlessRaises(storage.errors.DeviceTreeError,
- self.storage.devicetree.registerAction,
- resize_sdc1)
-
- resize_sdc1.cancel()
- resize_sdc1_format.cancel()
-
- destroy_sdc1_format = ActionDestroyFormat(sdc1)
- self.failUnlessRaises(storage.errors.DeviceTreeError,
- self.storage.devicetree.registerAction,
- destroy_sdc1_format)
-
-
- destroy_sdc1 = ActionDestroyDevice(sdc1)
- self.failUnlessRaises(storage.errors.DeviceTreeError,
- self.storage.devicetree.registerAction,
- resize_sdc1)
-
- # registering a device destroy action should cause the device to be
- # removed from the devicetree
- lv_root = self.storage.devicetree.getDeviceByName("VolGroup-lv_root")
- self.assertNotEqual(lv_root, None)
- a = ActionDestroyDevice(lv_root)
- self.storage.devicetree.registerAction(a)
- lv_root = self.storage.devicetree.getDeviceByName("VolGroup-lv_root")
- self.assertEqual(lv_root, None)
- self.storage.devicetree.cancelAction(a)
-
- # registering a device create action should cause the device to be
- # added to the devicetree
- sdd = self.storage.devicetree.getDeviceByName("sdd")
- self.assertNotEqual(sdd, None)
- sdd1 = self.storage.devicetree.getDeviceByName("sdd1")
- self.assertEqual(sdd1, None)
- sdd1 = self.newDevice(device_class=PartitionDevice,
- name="sdd1", size=100000, parents=[sdd])
- a = ActionCreateDevice(sdd1)
- self.storage.devicetree.registerAction(a)
- sdd1 = self.storage.devicetree.getDeviceByName("sdd1")
- self.assertNotEqual(sdd1, None)
-
- def testActionObsoletes(self, *args, **kwargs):
- """ Verify correct operation of DeviceAction.obsoletes. """
- self.destroyAllDevices(disks=["sdc"])
- sdc = self.storage.devicetree.getDeviceByName("sdc")
- self.assertNotEqual(sdc, None)
-
- sdc1 = self.newDevice(device_class=PartitionDevice,
- name="sdc1", parents=[sdc], size=40000)
-
- # ActionCreateDevice
- #
- # - obsoletes other ActionCreateDevice instances w/ lower id and same
- # device
- create_device_1 = ActionCreateDevice(sdc1)
- create_device_2 = ActionCreateDevice(sdc1)
- self.assertEqual(create_device_2.obsoletes(create_device_1), True)
- self.assertEqual(create_device_1.obsoletes(create_device_2), False)
-
- # ActionCreateFormat
- #
- # - obsoletes other ActionCreateFormat instances w/ lower id and same
- # device
- format_1 = self.newFormat("ext3", mountpoint="/home", device=sdc1.path)
- format_2 = self.newFormat("ext3", mountpoint="/opt", device=sdc1.path)
- create_format_1 = ActionCreateFormat(sdc1, format_1)
- create_format_2 = ActionCreateFormat(sdc1, format_2)
- self.assertEqual(create_format_2.obsoletes(create_format_1), True)
- self.assertEqual(create_format_1.obsoletes(create_format_2), False)
-
- # ActionMigrateFormat
- #
- # - obsoletes other ActionMigrateFormat instances w/ lower id and same
- # device
- sdc1.format = self.newFormat("ext2", mountpoint="/", device=sdc1.path,
- device_instance=sdc1,
- exists=True)
- migrate_1 = ActionMigrateFormat(sdc1)
- migrate_2 = ActionMigrateFormat(sdc1)
- self.assertEqual(migrate_2.obsoletes(migrate_1), True)
- self.assertEqual(migrate_1.obsoletes(migrate_2), False)
-
- # ActionResizeFormat
- #
- # - obsoletes other ActionResizeFormat instances w/ lower id and same
- # device
- resize_format_1 = ActionResizeFormat(sdc1, sdc1.size - 1000)
- resize_format_2 = ActionResizeFormat(sdc1, sdc1.size - 5000)
- self.assertEqual(resize_format_2.obsoletes(resize_format_1), True)
- self.assertEqual(resize_format_1.obsoletes(resize_format_2), False)
-
- # ActionCreateFormat
- #
- # - obsoletes migrate, resize format actions w/ lower id on same device
- new_format = self.newFormat("ext4", mountpoint="/foo", device=sdc1.path)
- create_format_3 = ActionCreateFormat(sdc1, new_format)
- self.assertEqual(create_format_3.obsoletes(resize_format_1), True)
- self.assertEqual(create_format_3.obsoletes(resize_format_2), True)
- self.assertEqual(create_format_3.obsoletes(migrate_1), True)
- self.assertEqual(create_format_3.obsoletes(migrate_2), True)
-
- # ActionResizeDevice
- #
- # - obsoletes other ActionResizeDevice instances w/ lower id and same
- # device
- sdc1.exists = True
- sdc1.format.exists = True
- resize_device_1 = ActionResizeDevice(sdc1, sdc1.size + 10000)
- resize_device_2 = ActionResizeDevice(sdc1, sdc1.size - 10000)
- self.assertEqual(resize_device_2.obsoletes(resize_device_1), True)
- self.assertEqual(resize_device_1.obsoletes(resize_device_2), False)
- sdc1.exists = False
- sdc1.format.exists = False
-
- # ActionDestroyFormat
- #
- # - obsoletes all format actions w/ lower id on same device (including
- # self if format does not exist)
- destroy_format_1 = ActionDestroyFormat(sdc1)
- self.assertEqual(destroy_format_1.obsoletes(create_format_1), True)
- self.assertEqual(destroy_format_1.obsoletes(migrate_2), True)
- self.assertEqual(destroy_format_1.obsoletes(resize_format_1), True)
- self.assertEqual(destroy_format_1.obsoletes(destroy_format_1), True)
-
- # ActionDestroyDevice
- #
- # - obsoletes all actions w/ lower id that act on the same non-existent
- # device (including self)
- # sdc1 does not exist
- destroy_sdc1 = ActionDestroyDevice(sdc1)
- self.assertEqual(destroy_sdc1.obsoletes(create_format_2), True)
- self.assertEqual(destroy_sdc1.obsoletes(migrate_1), True)
- self.assertEqual(destroy_sdc1.obsoletes(resize_format_2), True)
- self.assertEqual(destroy_sdc1.obsoletes(create_device_1), True)
- self.assertEqual(destroy_sdc1.obsoletes(resize_device_1), True)
- self.assertEqual(destroy_sdc1.obsoletes(destroy_sdc1), True)
-
- # ActionDestroyDevice
- #
- # - obsoletes all but ActionDestroyFormat actions w/ lower id on the
- # same existing device
- # sda1 exists
- sda1 = self.storage.devicetree.getDeviceByName("sda1")
- self.assertNotEqual(sda1, None)
- resize_sda1_format = ActionResizeFormat(sda1, sda1.size - 50)
- resize_sda1 = ActionResizeDevice(sda1, sda1.size - 50)
- destroy_sda1_format = ActionDestroyFormat(sda1)
- destroy_sda1 = ActionDestroyDevice(sda1)
- self.assertEqual(destroy_sda1.obsoletes(resize_sda1_format), True)
- self.assertEqual(destroy_sda1.obsoletes(resize_sda1), True)
- self.assertEqual(destroy_sda1.obsoletes(destroy_sda1), False)
- self.assertEqual(destroy_sda1.obsoletes(destroy_sda1_format), False)
-
- def testActionPruning(self, *args, **kwargs):
- """ Verify correct functioning of action pruning. """
- self.destroyAllDevices()
-
- sda = self.storage.devicetree.getDeviceByName("sda")
- self.assertNotEqual(sda, None, "failed to find disk 'sda'")
-
- sda1 = self.newDevice(device_class=PartitionDevice,
- name="sda1", size=500, parents=[sda])
- self.scheduleCreateDevice(device=sda1)
-
- sda2 = self.newDevice(device_class=PartitionDevice,
- name="sda2", size=100000, parents=[sda])
- self.scheduleCreateDevice(device=sda2)
- format = self.newFormat("lvmpv", device=sda2.path)
- self.scheduleCreateFormat(device=sda2, format=format)
-
- vg = self.newDevice(device_class=LVMVolumeGroupDevice,
- name="vg", parents=[sda2])
- self.scheduleCreateDevice(device=vg)
-
- lv_root = self.newDevice(device_class=LVMLogicalVolumeDevice,
- name="lv_root", vgdev=vg, size=60000)
- self.scheduleCreateDevice(device=lv_root)
- format = self.newFormat("ext4", device=lv_root.path, mountpoint="/")
- self.scheduleCreateFormat(device=lv_root, format=format)
-
- lv_swap = self.newDevice(device_class=LVMLogicalVolumeDevice,
- name="lv_swap", vgdev=vg, size=4000)
- self.scheduleCreateDevice(device=lv_swap)
- format = self.newFormat("swap", device=lv_swap.path)
- self.scheduleCreateFormat(device=lv_swap, format=format)
-
- # we'll soon schedule destroy actions for these members and the array,
- # which will test pruning. the whole mess should reduce to nothing
- sda3 = self.newDevice(device_class=PartitionDevice,
- name="sda3", parents=[sda], size=40000)
- self.scheduleCreateDevice(device=sda3)
- format = self.newFormat("mdmember", device=sda3.path)
- self.scheduleCreateFormat(device=sda3, format=format)
-
- sdb = self.storage.devicetree.getDeviceByName("sdb")
- self.assertNotEqual(sdb, None, "failed to find disk 'sdb'")
-
- sdb1 = self.newDevice(device_class=PartitionDevice,
- name="sdb1", parents=[sdb], size=40000)
- self.scheduleCreateDevice(device=sdb1)
- format = self.newFormat("mdmember", device=sdb1.path,)
- self.scheduleCreateFormat(device=sdb1, format=format)
-
- md0 = self.newDevice(device_class=MDRaidArrayDevice,
- name="md0", level="raid0", minor=0, size=80000,
- memberDevices=2, totalDevices=2,
- parents=[sdb1, sda3])
- self.scheduleCreateDevice(device=md0)
-
- format = self.newFormat("ext4", device=md0.path, mountpoint="/home")
- self.scheduleCreateFormat(device=md0, format=format)
-
- # now destroy the md and its components
- self.scheduleDestroyFormat(device=md0)
- self.scheduleDestroyDevice(device=md0)
- self.scheduleDestroyDevice(device=sdb1)
- self.scheduleDestroyDevice(device=sda3)
-
- format = self.newFormat("ext4", mountpoint="/boot", device=sda1.path)
- self.scheduleCreateFormat(device=sda1, format=format)
-
- # verify the md actions are present prior to pruning
- md0_actions = self.storage.devicetree.findActions(devid=md0.id)
- self.assertNotEqual(len(md0_actions), 0)
-
- sdb1_actions = self.storage.devicetree.findActions(devid=sdb1.id)
- self.assertNotEqual(len(sdb1_actions), 0)
-
- sda3_actions = self.storage.devicetree.findActions(devid=sda3.id)
- self.assertNotEqual(len(sda3_actions), 0)
-
- self.storage.devicetree.pruneActions()
-
- # verify the md actions are gone after pruning
- md0_actions = self.storage.devicetree.findActions(devid=md0.id)
- self.assertEqual(len(md0_actions), 0)
-
- sdb1_actions = self.storage.devicetree.findActions(devid=sdb1.id)
- self.assertEqual(len(sdb1_actions), 0)
-
- sda3_actions = self.storage.devicetree.findActions(sda3.id)
- self.assertEqual(len(sda3_actions), 0)
-
- def testActionDependencies(self, *args, **kwargs):
- """ Verify correct functioning of action dependencies. """
- # ActionResizeDevice
- # an action that shrinks a device should require the action that
- # shrinks the device's format
- lv_root = self.storage.devicetree.getDeviceByName("VolGroup-lv_root")
- self.assertNotEqual(lv_root, None)
- shrink_format = ActionResizeFormat(lv_root, lv_root.size - 5000)
- shrink_device = ActionResizeDevice(lv_root, lv_root.size - 5000)
- self.assertEqual(shrink_device.requires(shrink_format), True)
- self.assertEqual(shrink_format.requires(shrink_device), False)
- shrink_format.cancel()
- shrink_device.cancel()
-
- # ActionResizeDevice
- # an action that grows a format should require the action that
- # grows the device
- orig_size = lv_root.currentSize
- grow_device = ActionResizeDevice(lv_root, orig_size + 100)
- grow_format = ActionResizeFormat(lv_root, orig_size + 100)
- self.assertEqual(grow_format.requires(grow_device), True)
- self.assertEqual(grow_device.requires(grow_format), False)
-
- # create something like uncommitted autopart
- self.destroyAllDevices()
- sda = self.storage.devicetree.getDeviceByName("sda")
- sdb = self.storage.devicetree.getDeviceByName("sdb")
- sda1 = self.newDevice(device_class=PartitionDevice,
- name="sda1", size=500, parents=[sda])
- sda1_format = self.newFormat("ext4", mountpoint="/boot",
- device=sda1.path)
- self.scheduleCreateDevice(device=sda1)
- self.scheduleCreateFormat(device=sda1, format=sda1_format)
-
- sda2 = self.newDevice(device_class=PartitionDevice,
- name="sda2", size=99500, parents=[sda])
- sda2_format = self.newFormat("lvmpv", device=sda1.path)
- self.scheduleCreateDevice(device=sda2)
- self.scheduleCreateFormat(device=sda2, format=sda2_format)
-
- sdb1 = self.newDevice(device_class=PartitionDevice,
- name="sdb1", size=100000, parents=[sdb])
- sdb1_format = self.newFormat("lvmpv", device=sdb1.path)
- self.scheduleCreateDevice(device=sdb1)
- self.scheduleCreateFormat(device=sdb1, format=sdb1_format)
-
- vg = self.newDevice(device_class=LVMVolumeGroupDevice,
- name="VolGroup", parents=[sda2, sdb1])
- self.scheduleCreateDevice(device=vg)
-
- lv_root = self.newDevice(device_class=LVMLogicalVolumeDevice,
- name="lv_root", vgdev=vg, size=160000)
- self.scheduleCreateDevice(device=lv_root)
- format = self.newFormat("ext4", device=lv_root.path, mountpoint="/")
- self.scheduleCreateFormat(device=lv_root, format=format)
-
- lv_swap = self.newDevice(device_class=LVMLogicalVolumeDevice,
- name="lv_swap", vgdev=vg, size=4000)
- self.scheduleCreateDevice(device=lv_swap)
- format = self.newFormat("swap", device=lv_swap.path)
- self.scheduleCreateFormat(device=lv_swap, format=format)
-
- # ActionCreateDevice
- # creation of an LV should require the actions that create the VG,
- # its PVs, and the devices that contain the PVs
- lv_root = self.storage.devicetree.getDeviceByName("VolGroup-lv_root")
- self.assertNotEqual(lv_root, None)
- actions = self.storage.devicetree.findActions(type="create",
- object="device",
- device=lv_root)
- self.assertEqual(len(actions), 1,
- "wrong number of device create actions for lv_root: "
- "%d" % len(actions))
- create_lv_action = actions[0]
-
- vgs = [d for d in self.storage.vgs if d.name == "VolGroup"]
- self.assertNotEqual(vgs, [])
- vg = vgs[0]
- actions = self.storage.devicetree.findActions(type="create",
- object="device",
- device=vg)
- self.assertEqual(len(actions), 1,
- "wrong number of device create actions for VolGroup")
- create_vg_action = actions[0]
-
- self.assertEqual(create_lv_action.requires(create_vg_action), True)
-
- create_pv_actions = []
- pvs = [d for d in self.storage.pvs if d in vg.pvs]
- self.assertNotEqual(pvs, [])
- for pv in pvs:
- # include device and format create actions for each pv
- actions = self.storage.devicetree.findActions(type="create",
- device=pv)
- self.assertEqual(len(actions), 2,
- "wrong number of device create actions for "
- "pv %s" % pv.name)
- create_pv_actions.append(actions[0])
-
- for pv_action in create_pv_actions:
- self.assertEqual(create_lv_action.requires(pv_action), True)
- # also check that the vg create action requires the pv actions
- self.assertEqual(create_vg_action.requires(pv_action), True)
-
- # ActionCreateDevice
- # the higher numbered partition of two that are scheduled to be
- # created on a single disk should require the action that creates the
- # lower numbered of the two, eg: create sda2 before creating sda3
- sdc = self.storage.devicetree.getDeviceByName("sdc")
- self.assertNotEqual(sdc, None)
-
- sdc1 = self.newDevice(device_class=PartitionDevice,
- name="sdc1", parents=[sdc], size=50000)
- create_sdc1 = self.scheduleCreateDevice(device=sdc1)
- self.assertEqual(isinstance(create_sdc1, ActionCreateDevice), True)
-
- sdc2 = self.newDevice(device_class=PartitionDevice,
- name="sdc2", parents=[sdc], size=50000)
- create_sdc2 = self.scheduleCreateDevice(device=sdc2)
- self.assertEqual(isinstance(create_sdc2, ActionCreateDevice), True)
-
- self.assertEqual(create_sdc2.requires(create_sdc1), True)
- self.assertEqual(create_sdc1.requires(create_sdc2), False)
-
- # ActionCreateDevice
- # actions that create partitions on two separate disks should not
- # require each other, regardless of the partitions' numbers
- sda1 = self.storage.devicetree.getDeviceByName("sda1")
- self.assertNotEqual(sda1, None)
- actions = self.storage.devicetree.findActions(type="create",
- object="device",
- device=sda1)
- self.assertEqual(len(actions), 1,
- "wrong number of create actions found for sda1")
- create_sda1 = actions[0]
- self.assertEqual(create_sdc2.requires(create_sda1), False)
- self.assertEqual(create_sda1.requires(create_sdc1), False)
-
- # ActionDestroyDevice
- # an action that destroys a device containing an mdmember format
- # should require the action that destroys the md array it is a
- # member of if an array is defined
- self.destroyAllDevices(disks=["sdc", "sdd"])
- sdc = self.storage.devicetree.getDeviceByName("sdc")
- self.assertNotEqual(sdc, None)
- sdd = self.storage.devicetree.getDeviceByName("sdd")
- self.assertNotEqual(sdd, None)
-
- sdc1 = self.newDevice(device_class=PartitionDevice,
- name="sdc1", parents=[sdc], size=40000)
- self.scheduleCreateDevice(device=sdc1)
- format = self.newFormat("mdmember", device=sdc1.path)
- self.scheduleCreateFormat(device=sdc1, format=format)
-
- sdd1 = self.newDevice(device_class=PartitionDevice,
- name="sdd1", parents=[sdd], size=40000)
- self.scheduleCreateDevice(device=sdd1)
- format = self.newFormat("mdmember", device=sdd1.path,)
- self.scheduleCreateFormat(device=sdd1, format=format)
-
- md0 = self.newDevice(device_class=MDRaidArrayDevice,
- name="md0", level="raid0", minor=0, size=80000,
- memberDevices=2, totalDevices=2,
- parents=[sdc1, sdd1])
- self.scheduleCreateDevice(device=md0)
- format = self.newFormat("ext4", device=md0.path, mountpoint="/home")
- self.scheduleCreateFormat(device=md0, format=format)
-
- destroy_md0_format = self.scheduleDestroyFormat(device=md0)
- destroy_md0 = self.scheduleDestroyDevice(device=md0)
- destroy_members = [self.scheduleDestroyDevice(device=sdc1)]
- destroy_members.append(self.scheduleDestroyDevice(device=sdd1))
-
- for member in destroy_members:
- # device and format destroy actions for md members should require
- # both device and format destroy actions for the md array
- for array in [destroy_md0_format, destroy_md0]:
- self.assertEqual(member.requires(array), True)
-
- # ActionDestroyDevice
- # when there are two actions that will each destroy a partition on the
- # same disk, the action that will destroy the lower-numbered
- # partition should require the action that will destroy the higher-
- # numbered partition, eg: destroy sda2 before destroying sda1
- self.destroyAllDevices(disks=["sdc", "sdd"])
- sdc1 = self.newDevice(device_class=PartitionDevice,
- name="sdc1", parents=[sdc], size=50000)
- self.scheduleCreateDevice(device=sdc1)
-
- sdc2 = self.newDevice(device_class=PartitionDevice,
- name="sdc2", parents=[sdc], size=40000)
- self.scheduleCreateDevice(device=sdc2)
-
- destroy_sdc1 = self.scheduleDestroyDevice(device=sdc1)
- destroy_sdc2 = self.scheduleDestroyDevice(device=sdc2)
- self.assertEqual(destroy_sdc1.requires(destroy_sdc2), True)
- self.assertEqual(destroy_sdc2.requires(destroy_sdc1), False)
-
- self.destroyAllDevices(disks=["sdc", "sdd"])
- sdc = self.storage.devicetree.getDeviceByName("sdc")
- self.assertNotEqual(sdc, None)
- sdd = self.storage.devicetree.getDeviceByName("sdd")
- self.assertNotEqual(sdd, None)
-
- sdc1 = self.newDevice(device_class=PartitionDevice,
- name="sdc1", parents=[sdc], size=50000)
- create_pv = self.scheduleCreateDevice(device=sdc1)
- format = self.newFormat("lvmpv", device=sdc1.path)
- create_pv_format = self.scheduleCreateFormat(device=sdc1, format=format)
-
- testvg = self.newDevice(device_class=LVMVolumeGroupDevice,
- name="testvg", parents=[sdc1], size=50000)
- create_vg = self.scheduleCreateDevice(device=testvg)
- testlv = self.newDevice(device_class=LVMLogicalVolumeDevice,
- name="testlv", vgdev=testvg, size=30000)
- create_lv = self.scheduleCreateDevice(device=testlv)
- format = self.newFormat("ext4", device=testlv.path)
- create_lv_format = self.scheduleCreateFormat(device=testlv, format=format)
-
- # ActionCreateFormat
- # creation of a format on a non-existent device should require the
- # action that creates the device
- self.assertEqual(create_lv_format.requires(create_lv), True)
-
- # ActionCreateFormat
- # an action that creates a format on a device should require an action
- # that creates a device that the format's device depends on
- self.assertEqual(create_lv_format.requires(create_pv), True)
- self.assertEqual(create_lv_format.requires(create_vg), True)
-
- # ActionCreateFormat
- # an action that creates a format on a device should require an action
- # that creates a format on a device that the format's device depends on
- self.assertEqual(create_lv_format.requires(create_pv_format), True)
-
- # XXX from here on, the devices are existing but not in the tree, so
- # we instantiate and use actions directly
- self.destroyAllDevices(disks=["sdc", "sdd"])
- sdc1 = self.newDevice(device_class=PartitionDevice, exists=True,
- name="sdc1", parents=[sdc], size=50000)
- sdc1.format = self.newFormat("lvmpv", device=sdc1.path, exists=True,
- device_instance=sdc1)
- testvg = self.newDevice(device_class=LVMVolumeGroupDevice, exists=True,
- name="testvg", parents=[sdc1], size=50000)
- testlv = self.newDevice(device_class=LVMLogicalVolumeDevice,
- exists=True,
- name="testlv", vgdev=testvg, size=30000)
- testlv.format = self.newFormat("ext4", device=testlv.path,
- exists=True, device_instance=testlv)
-
- # ActionResizeDevice
- # an action that resizes a device should require an action that grows
- # a device that the first action's device depends on, eg: grow
- # device containing PV before resize of VG or LVs
- tmp = sdc1.format
- sdc1.format = None # since lvmpv format is not resizable
- grow_pv = ActionResizeDevice(sdc1, sdc1.size + 10000)
- grow_lv = ActionResizeDevice(testlv, testlv.size + 5000)
- grow_lv_format = ActionResizeFormat(testlv, testlv.size + 5000)
-
- self.assertEqual(grow_lv.requires(grow_pv), True)
- self.assertEqual(grow_pv.requires(grow_lv), False)
-
- # ActionResizeFormat
- # an action that grows a format should require the action that grows
- # the format's device
- self.assertEqual(grow_lv_format.requires(grow_lv), True)
- self.assertEqual(grow_lv.requires(grow_lv_format), False)
-
- # ActionResizeFormat
- # an action that resizes a device's format should depend on an action
- # that grows a device the first device depends on
- self.assertEqual(grow_lv_format.requires(grow_pv), True)
- self.assertEqual(grow_pv.requires(grow_lv_format), False)
-
- # ActionResizeFormat
- # an action that resizes a device's format should depend on an action
- # that grows a format on a device the first device depends on
- # XXX resize of PV format is not allowed, so there's no real-life
- # example of this to test
-
- grow_lv_format.cancel()
- grow_lv.cancel()
- grow_pv.cancel()
-
- # ActionResizeDevice
- # an action that resizes a device should require an action that grows
- # a format on a device that the first action's device depends on, eg:
- # grow PV format before resize of VG or LVs
- # XXX resize of PV format is not allowed, so there's no real-life
- # example of this to test
-
- # ActionResizeDevice
- # an action that resizes a device should require an action that
- # shrinks a device that depends on the first action's device, eg:
- # shrink LV before resizing VG or PV devices
- shrink_lv = ActionResizeDevice(testlv, testlv.size - 10000)
- shrink_pv = ActionResizeDevice(sdc1, sdc1.size - 5000)
-
- self.assertEqual(shrink_pv.requires(shrink_lv), True)
- self.assertEqual(shrink_lv.requires(shrink_pv), False)
-
- # ActionResizeDevice
- # an action that resizes a device should require an action that
- # shrinks a format on a device that depends on the first action's
- # device, eg: shrink LV format before resizing VG or PV devices
- shrink_lv_format = ActionResizeFormat(testlv, testlv.size)
- self.assertEqual(shrink_pv.requires(shrink_lv_format), True)
- self.assertEqual(shrink_lv_format.requires(shrink_pv), False)
-
- # ActionResizeFormat
- # an action that resizes a device's format should depend on an action
- # that shrinks a device that depends on the first device
- # XXX can't think of a real-world example of this since PVs and MD
- # member devices are not resizable in anaconda
-
- # ActionResizeFormat
- # an action that resizes a device's format should depend on an action
- # that shrinks a format on a device that depends on the first device
- # XXX can't think of a real-world example of this since PVs and MD
- # member devices are not resizable in anaconda
-
- shrink_lv_format.cancel()
- shrink_lv.cancel()
- shrink_pv.cancel()
- sdc1.format = tmp # restore pv's lvmpv format
-
- # ActionCreateFormat
- # an action that creates a format on a device should require an action
- # that resizes a device that the format's device depends on
- # XXX Really? Is this always so?
-
- # ActionCreateFormat
- # an action that creates a format on a device should require an action
- # that resizes a format on a device that the format's device depends on
- # XXX Same as above.
-
- # ActionCreateFormat
- # an action that creates a format on a device should require an action
- # that resizes the device that will contain the format
- grow_lv = ActionResizeDevice(testlv, testlv.size + 1000)
- format = self.newFormat("msdos", device=testlv.path)
- format_lv = ActionCreateFormat(testlv, format)
- self.assertEqual(format_lv.requires(grow_lv), True)
- self.assertEqual(grow_lv.requires(format_lv), False)
-
- # ActionDestroyFormat
- # an action that destroys a format should require an action that
- # destroys a device that depends on the format's device
- destroy_pv_format = ActionDestroyFormat(sdc1)
- destroy_lv_format = ActionDestroyFormat(testlv)
- destroy_lv = ActionDestroyDevice(testlv)
- self.assertEqual(destroy_pv_format.requires(destroy_lv), True)
- self.assertEqual(destroy_lv.requires(destroy_pv_format), False)
-
- # ActionDestroyFormat
- # an action that destroys a format should require an action that
- # destroys a format on a device that depends on the first format's
- # device
- self.assertEqual(destroy_pv_format.requires(destroy_lv_format), True)
- self.assertEqual(destroy_lv_format.requires(destroy_pv_format), False)
-
- def testActionSorting(self, *args, **kwargs):
- """ Verify correct functioning of action sorting. """
- pass
-
-
-def suite():
- return unittest.TestLoader().loadTestsFromTestCase(DeviceActionTestCase)
-
-
-if __name__ == "__main__":
- unittest.main()
-
diff --git a/tests/storage_test/devicelibs_test/Makefile.am b/tests/storage_test/devicelibs_test/Makefile.am
deleted file mode 100644
index eeeabbdc4..000000000
--- a/tests/storage_test/devicelibs_test/Makefile.am
+++ /dev/null
@@ -1,22 +0,0 @@
-# tests/storage/devicelibs/Makefile.am for anaconda
-#
-# Copyright (C) 2009 Red Hat, Inc.
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License as published
-# by the Free Software Foundation; either version 2.1 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# Author: David Cantrell <dcantrell@redhat.com>
-
-EXTRA_DIST = *.py
-
-MAINTAINERCLEANFILES = Makefile.in
diff --git a/tests/storage_test/devicelibs_test/baseclass.py b/tests/storage_test/devicelibs_test/baseclass.py
deleted file mode 100644
index 06adef989..000000000
--- a/tests/storage_test/devicelibs_test/baseclass.py
+++ /dev/null
@@ -1,80 +0,0 @@
-import unittest
-import os
-import subprocess
-
-
-def makeLoopDev(device_name, file_name):
- proc = subprocess.Popen(["dd", "if=/dev/zero", "of=%s" % file_name,
- "bs=1024", "count=102400"],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- while True:
- (out, err) = proc.communicate()
- if proc.returncode is not None:
- rc = proc.returncode
- break
- if rc:
- raise OSError, "dd failed creating the file %s" % file_name
-
- proc = subprocess.Popen(["losetup", device_name, file_name],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- while True:
- (out, err) = proc.communicate()
- if proc.returncode is not None:
- rc = proc.returncode
- break
- if rc:
- raise OSError, "losetup failed setting up the loop device %s" % device_name
-
-def removeLoopDev(device_name, file_name):
- proc = subprocess.Popen(["losetup", "-d", device_name],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- while True:
- (out, err) = proc.communicate()
- if proc.returncode is not None:
- rc = proc.returncode
- break
- if rc:
- raise OSError, "losetup failed removing the loop device %s" % device_name
-
- os.unlink(file_name)
-
-def getFreeLoopDev():
- # There's a race condition here where another process could grab the loop
- # device losetup gives us before we have time to set it up, but that's just
- # a chance we'll have to take.
- proc = subprocess.Popen(["losetup", "-f"],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- out = None
-
- while True:
- (out, err) = proc.communicate()
- if proc.returncode is not None:
- rc = proc.returncode
- out = out.strip()
- break
-
- if rc:
- raise OSError, "losetup failed to find a free device"
-
- return out
-
-class DevicelibsTestCase(unittest.TestCase):
-
- _LOOP_DEVICES = ["/tmp/test-virtdev0", "/tmp/test-virtdev1"]
-
- def __init__(self, *args, **kwargs):
- import pyanaconda.anaconda_log
- pyanaconda.anaconda_log.init()
-
- unittest.TestCase.__init__(self, *args, **kwargs)
- self._loopMap = {}
-
- def setUp(self):
- for file in self._LOOP_DEVICES:
- dev = getFreeLoopDev()
- makeLoopDev(dev, file)
- self._loopMap[file] = dev
-
- def tearDown(self):
- for (file, dev) in self._loopMap.iteritems():
- removeLoopDev(dev, file)
diff --git a/tests/storage_test/devicelibs_test/crypto_test.py b/tests/storage_test/devicelibs_test/crypto_test.py
deleted file mode 100755
index 3e9e3a667..000000000
--- a/tests/storage_test/devicelibs_test/crypto_test.py
+++ /dev/null
@@ -1,135 +0,0 @@
-#!/usr/bin/python
-import baseclass
-import unittest
-from mock import acceptance
-
-import tempfile
-import os
-
-class CryptoTestCase(baseclass.DevicelibsTestCase):
-
- def testCrypto(self):
- _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]]
- _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]]
-
- import storage.devicelibs.crypto as crypto
-
-
- @acceptance
- def testCrypto(self):
- ##
- ## is_luks
- ##
- # pass
- self.assertEqual(crypto.is_luks(_LOOP_DEV0), -22)
- self.assertEqual(crypto.is_luks("/not/existing/device"), -22)
-
- ##
- ## luks_format
- ##
- # pass
- self.assertEqual(crypto.luks_format(_LOOP_DEV0, passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256), None)
-
- # make a key file
- handle, keyfile = tempfile.mkstemp(prefix="key", text=False)
- os.write(handle, "nobodyknows")
- os.close(handle)
-
- # format with key file
- self.assertEqual(crypto.luks_format(_LOOP_DEV1, key_file=keyfile), None)
-
- # fail
- self.assertRaises(crypto.CryptoError, crypto.luks_format, "/not/existing/device", passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256)
- # no passhprase or key file
- self.assertRaises(ValueError, crypto.luks_format, _LOOP_DEV1, cipher="aes-cbc-essiv:sha256", key_size=256)
-
- ##
- ## is_luks
- ##
- # pass
- self.assertEqual(crypto.is_luks(_LOOP_DEV0), 0) # 0 = is luks
- self.assertEqual(crypto.is_luks(_LOOP_DEV1), 0)
-
- ##
- ## luks_add_key
- ##
- # pass
- self.assertEqual(crypto.luks_add_key(_LOOP_DEV0, new_passphrase="another-secret", passphrase="secret"), None)
-
- # make another key file
- handle, new_keyfile = tempfile.mkstemp(prefix="key", text=False)
- os.write(handle, "area51")
- os.close(handle)
-
- # add new key file
- self.assertEqual(crypto.luks_add_key(_LOOP_DEV1, new_key_file=new_keyfile, key_file=keyfile), None)
-
- # fail
- self.assertRaises(crypto.CryptoError, crypto.luks_add_key, _LOOP_DEV0, new_passphrase="another-secret", passphrase="wrong-passphrase")
-
- ##
- ## luks_remove_key
- ##
- # fail
- self.assertRaises(RuntimeError, crypto.luks_remove_key, _LOOP_DEV0, del_passphrase="another-secret", passphrase="wrong-pasphrase")
-
- # pass
- self.assertEqual(crypto.luks_remove_key(_LOOP_DEV0, del_passphrase="another-secret", passphrase="secret"), None)
-
- # remove key file
- self.assertEqual(crypto.luks_remove_key(LOOP_DEV1, del_key_file=new_keyfile, key_file=keyfile), None)
-
- ##
- ## luks_open
- ##
- # pass
- self.assertEqual(crypto.luks_open(_LOOP_DEV0, "crypted", passphrase="secret"), None)
- self.assertEqual(crypto.luks_open(_LOOP_DEV1, "encrypted", key_file=keyfile), None)
-
- # fail
- self.assertRaises(crypto.CryptoError, crypto.luks_open, "/not/existing/device", "another-crypted", passphrase="secret")
- self.assertRaises(crypto.CryptoError, crypto.luks_open, "/not/existing/device", "another-crypted", key_file=keyfile)
- # no passhprase or key file
- self.assertRaises(ValueError, crypto.luks_open, _LOOP_DEV1, "another-crypted")
-
- ##
- ## luks_status
- ##
- # pass
- self.assertEqual(crypto.luks_status("crypted"), True)
- self.assertEqual(crypto.luks_status("encrypted"), True)
- self.assertEqual(crypto.luks_status("another-crypted"), False)
-
- ##
- ## luks_uuid
- ##
- # pass
- uuid = crypto.luks_uuid(_LOOP_DEV0)
- self.assertEqual(crypto.luks_uuid(_LOOP_DEV0), uuid)
- uuid = crypto.luks_uuid(_LOOP_DEV1)
- self.assertEqual(crypto.luks_uuid(_LOOP_DEV1), uuid)
-
- ##
- ## luks_close
- ##
- # pass
- self.assertEqual(crypto.luks_close("crypted"), None)
- self.assertEqual(crypto.luks_close("encrypted"), None)
-
- # fail
- self.assertRaises(crypto.CryptoError, crypto.luks_close, "wrong-name")
- # already closed
- self.assertRaises(crypto.CryptoError, crypto.luks_close, "crypted")
- self.assertRaises(crypto.CryptoError, crypto.luks_close, "encrypted")
-
- # cleanup
- os.unlink(keyfile)
- os.unlink(new_keyfile)
-
-
-def suite():
- return unittest.TestLoader().loadTestsFromTestCase(CryptoTestCase)
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/tests/storage_test/devicelibs_test/edd_test.py b/tests/storage_test/devicelibs_test/edd_test.py
deleted file mode 100644
index 449395508..000000000
--- a/tests/storage_test/devicelibs_test/edd_test.py
+++ /dev/null
@@ -1,212 +0,0 @@
-import mock
-
-class EddTestCase(mock.TestCase):
- def setUp(self):
- self.setupModules(
- ['_isys', 'logging', 'pyanaconda.anaconda_log', 'block'])
-
- def tearDown(self):
- self.tearDownModules()
-
- def test_biosdev_to_edd_dir(self):
- from pyanaconda.storage.devicelibs import edd
- path = edd.biosdev_to_edd_dir(138)
- self.assertEqual("/sys/firmware/edd/int13_dev8a", path)
-
- def test_collect_edd_data(self):
- from pyanaconda.storage.devicelibs import edd
-
- # test with vda, vdb
- fs = EddTestFS(self, edd).vda_vdb()
- edd_dict = edd.collect_edd_data()
- self.assertEqual(len(edd_dict), 2)
- self.assertEqual(edd_dict[0x80].type, "SCSI")
- self.assertEqual(edd_dict[0x80].scsi_id, 0)
- self.assertEqual(edd_dict[0x80].scsi_lun, 0)
- self.assertEqual(edd_dict[0x80].pci_dev, "00:05.0")
- self.assertEqual(edd_dict[0x80].channel, 0)
- self.assertEqual(edd_dict[0x80].sectors, 16777216)
- self.assertEqual(edd_dict[0x81].pci_dev, "00:06.0")
-
- # test with sda, vda
- fs = EddTestFS(self, edd).sda_vda()
- edd_dict = edd.collect_edd_data()
- self.assertEqual(len(edd_dict), 2)
- self.assertEqual(edd_dict[0x80].type, "ATA")
- self.assertEqual(edd_dict[0x80].scsi_id, None)
- self.assertEqual(edd_dict[0x80].scsi_lun, None)
- self.assertEqual(edd_dict[0x80].pci_dev, "00:01.1")
- self.assertEqual(edd_dict[0x80].channel, 0)
- self.assertEqual(edd_dict[0x80].sectors, 2097152)
- self.assertEqual(edd_dict[0x80].ata_device, 0)
- self.assertEqual(edd_dict[0x80].mbr_signature, "0x000ccb01")
-
- def test_collect_edd_data_cciss(self):
- from pyanaconda.storage.devicelibs import edd
- fs = EddTestFS(self, edd).sda_cciss()
- edd_dict = edd.collect_edd_data()
-
- self.assertEqual(edd_dict[0x80].pci_dev, None)
- self.assertEqual(edd_dict[0x80].channel, None)
-
- def test_edd_entry_str(self):
- from pyanaconda.storage.devicelibs import edd
- fs = EddTestFS(self, edd).sda_vda()
- edd_dict = edd.collect_edd_data()
- expected_output = """\ttype: ATA, ata_device: 0
-\tchannel: 0, mbr_signature: 0x000ccb01
-\tpci_dev: 00:01.1, scsi_id: None
-\tscsi_lun: None, sectors: 2097152"""
- self.assertEqual(str(edd_dict[0x80]), expected_output)
-
- def test_matcher_device_path(self):
- from pyanaconda.storage.devicelibs import edd
- fs = EddTestFS(self, edd).sda_vda()
- edd_dict = edd.collect_edd_data()
-
- analyzer = edd.EddMatcher(edd_dict[0x80])
- path = analyzer.devname_from_pci_dev()
- self.assertEqual(path, "sda")
-
- analyzer = edd.EddMatcher(edd_dict[0x81])
- path = analyzer.devname_from_pci_dev()
- self.assertEqual(path, "vda")
-
- def test_bad_device_path(self):
- from pyanaconda.storage.devicelibs import edd
- fs = EddTestFS(self, edd).sda_vda_no_pcidev()
- edd_dict = edd.collect_edd_data()
-
- analyzer = edd.EddMatcher(edd_dict[0x80])
- path = analyzer.devname_from_pci_dev()
- self.assertEqual(path, None)
-
- def test_bad_host_bus(self):
- from pyanaconda.storage.devicelibs import edd
- fs = EddTestFS(self, edd).sda_vda_no_host_bus()
-
- edd_dict = edd.collect_edd_data()
-
- # 0x80 entry is basted so fail without an exception
- analyzer = edd.EddMatcher(edd_dict[0x80])
- devname = analyzer.devname_from_pci_dev()
- self.assertEqual(devname, None)
-
- # but still succeed on 0x81
- analyzer = edd.EddMatcher(edd_dict[0x81])
- devname = analyzer.devname_from_pci_dev()
- self.assertEqual(devname, "vda")
-
- def test_get_edd_dict_1(self):
- """ Test get_edd_dict()'s pci_dev matching. """
- from pyanaconda.storage.devicelibs import edd
- fs = EddTestFS(self, edd).sda_vda()
- self.assertEqual(edd.get_edd_dict([]),
- {'sda' : 0x80,
- 'vda' : 0x81})
-
- def test_get_edd_dict_2(self):
- """ Test get_edd_dict()'s pci_dev matching. """
- from pyanaconda.storage.devicelibs import edd
- edd.collect_mbrs = mock.Mock(return_value = {
- 'sda' : '0x000ccb01',
- 'vda' : '0x0006aef1'})
- fs = EddTestFS(self, edd).sda_vda_missing_details()
- self.assertEqual(edd.get_edd_dict([]),
- {'sda' : 0x80,
- 'vda' : 0x81})
-
- def test_get_edd_dict_3(self):
- """ Test scenario when the 0x80 and 0x81 edd directories contain the
- same data and give no way to distinguish among the two devices.
- """
- from pyanaconda.storage.devicelibs import edd
- edd.log = mock.Mock()
- edd.collect_mbrs = mock.Mock(return_value={'sda' : '0x000ccb01',
- 'vda' : '0x0006aef1'})
- fs = EddTestFS(self, edd).sda_sdb_same()
- self.assertEqual(edd.get_edd_dict([]), {})
- self.assertIn((('edd: both edd entries 0x80 and 0x81 seem to map to sda',), {}),
- edd.log.info.call_args_list)
-
-class EddTestFS(object):
- def __init__(self, test_case, target_module):
- self.fs = mock.DiskIO()
- test_case.take_over_io(self.fs, target_module)
-
- def sda_vda_missing_details(self):
- self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01\n"
- self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev81/mbr_signature"] = "0x0006aef1\n"
-
- def sda_vda(self):
- self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:01.1 channel: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/interface"] = "ATA device: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01\n"
- self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "2097152\n"
-
- self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev81/host_bus"] = "PCI 00:05.0 channel: 0\n"
- self.fs["/sys/firmware/edd/int13_dev81/interface"] = "SCSI id: 0 lun: 0\n"
- self.fs["/sys/firmware/edd/int13_dev81/mbr_signature"] = "0x0006aef1\n"
- self.fs["/sys/firmware/edd/int13_dev81/sectors"] = "16777216\n"
-
- self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block"] = self.fs.Dir()
- self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block/sda"] = self.fs.Dir()
-
- self.fs["/sys/devices/pci0000:00/0000:00:05.0/virtio2/block"] = self.fs.Dir()
- self.fs["/sys/devices/pci0000:00/0000:00:05.0/virtio2/block/vda"] = self.fs.Dir()
-
- return self.fs
-
- def sda_vda_no_pcidev(self):
- self.sda_vda()
- entries = [e for e in self.fs.fs if e.startswith("/sys/devices/pci")]
- map(self.fs.os_remove, entries)
- return self.fs
-
- def sda_vda_no_host_bus(self):
- self.sda_vda()
- self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:01.1 channel: \n"
- self.fs.os_remove("/sys/firmware/edd/int13_dev80/mbr_signature")
- self.fs.os_remove("/sys/firmware/edd/int13_dev81/mbr_signature")
-
- def sda_cciss(self):
- self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCIX 05:00.0 channel: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/interface"] = "RAID identity_tag: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01\n"
- self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "2097152\n"
-
- return self.fs
-
- def vda_vdb(self):
- self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:05.0 channel: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/interface"] = "SCSI id: 0 lun: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "16777216\n"
-
- self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev81/host_bus"] = "PCI 00:06.0 channel: 0\n"
- self.fs["/sys/firmware/edd/int13_dev81/interface"] = "SCSI id: 0 lun: 0\n"
- self.fs["/sys/firmware/edd/int13_dev81/sectors"] = "4194304\n"
-
- return self.fs
-
- def sda_sdb_same(self):
- self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:01.1 channel: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/interface"] = "ATA device: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01"
- self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "2097152\n"
-
- self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev81/host_bus"] = "PCI 00:01.1 channel: 0\n"
- self.fs["/sys/firmware/edd/int13_dev81/interface"] = "ATA device: 0\n"
- self.fs["/sys/firmware/edd/int13_dev81/mbr_signature"] = "0x0006aef1"
- self.fs["/sys/firmware/edd/int13_dev81/sectors"] = "2097152\n"
-
- self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block"] = self.fs.Dir()
- self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block/sda"] = self.fs.Dir()
diff --git a/tests/storage_test/devicelibs_test/lvm_test.py b/tests/storage_test/devicelibs_test/lvm_test.py
deleted file mode 100755
index e639a2137..000000000
--- a/tests/storage_test/devicelibs_test/lvm_test.py
+++ /dev/null
@@ -1,239 +0,0 @@
-#!/usr/bin/python
-import baseclass
-import unittest
-from mock import acceptance
-
-class LVMTestCase(baseclass.DevicelibsTestCase):
-
- def testLVM(self):
- _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]]
- _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]]
-
- import storage.devicelibs.lvm as lvm
-
-
- @acceptance
- def testLVM(self):
- ##
- ## pvcreate
- ##
- # pass
- for dev, file in self._loopMap.iteritems():
- self.assertEqual(lvm.pvcreate(dev), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.pvcreate, "/not/existing/device")
-
- ##
- ## pvresize
- ##
- # pass
- for dev, file in self._loopMap.iteritems():
- self.assertEqual(lvm.pvresize(dev, 50), None)
- self.assertEqual(lvm.pvresize(dev, 100), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.pvresize, "/not/existing/device", 50)
-
- ##
- ## vgcreate
- ##
- # pass
- self.assertEqual(lvm.vgcreate("test-vg", [_LOOP_DEV0, _LOOP_DEV1], 4), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", ["/not/existing/device"], 4)
- # vg already exists
- self.assertRaises(lvm.LVMError, lvm.vgcreate, "test-vg", [_LOOP_DEV0], 4)
- # pe size must be power of 2
- self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", [_LOOP_DEV0], 5)
-
- ##
- ## pvremove
- ##
- # fail
- # cannot remove pv now with vg created
- self.assertRaises(lvm.LVMError, lvm.pvremove, _LOOP_DEV0)
-
- ##
- ## vgdeactivate
- ##
- # pass
- self.assertEqual(lvm.vgdeactivate("test-vg"), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.vgdeactivate, "wrong-vg-name")
-
- ##
- ## vgreduce
- ##
- # pass
- self.assertEqual(lvm.vgreduce("test-vg", [_LOOP_DEV1]), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.vgreduce, "wrong-vg-name", [_LOOP_DEV1])
- self.assertRaises(lvm.LVMError, lvm.vgreduce, "test-vg", ["/not/existing/device"])
-
- ##
- ## vgactivate
- ##
- # pass
- self.assertEqual(lvm.vgactivate("test-vg"), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.vgactivate, "wrong-vg-name")
-
- ##
- ## pvinfo
- ##
- # pass
- self.assertEqual(lvm.pvinfo(_LOOP_DEV0)["pv_name"], _LOOP_DEV0)
- # no vg
- self.assertEqual(lvm.pvinfo(_LOOP_DEV1)["pv_name"], _LOOP_DEV1)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.pvinfo, "/not/existing/device")
-
- ##
- ## vginfo
- ##
- # pass
- self.assertEqual(lvm.vginfo("test-vg")["pe_size"], "4.00")
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.vginfo, "wrong-vg-name")
-
- ##
- ## lvcreate
- ##
- # pass
- self.assertEqual(lvm.lvcreate("test-vg", "test-lv", 10), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.lvcreate, "wrong-vg-name", "another-lv", 10)
-
- ##
- ## lvdeactivate
- ##
- # pass
- self.assertEqual(lvm.lvdeactivate("test-vg", "test-lv"), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "test-vg", "wrong-lv-name")
- self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "wrong-vg-name", "test-lv")
- self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "wrong-vg-name", "wrong-lv-name")
-
- ##
- ## lvresize
- ##
- # pass
- self.assertEqual(lvm.lvresize("test-vg", "test-lv", 60), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.lvresize, "test-vg", "wrong-lv-name", 80)
- self.assertRaises(lvm.LVMError, lvm.lvresize, "wrong-vg-name", "test-lv", 80)
- self.assertRaises(lvm.LVMError, lvm.lvresize, "wrong-vg-name", "wrong-lv-name", 80)
- # changing to same size
- self.assertRaises(lvm.LVMError, lvm.lvresize, "test-vg", "test-lv", 60)
-
- ##
- ## lvactivate
- ##
- # pass
- self.assertEqual(lvm.lvactivate("test-vg", "test-lv"), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.lvactivate, "test-vg", "wrong-lv-name")
- self.assertRaises(lvm.LVMError, lvm.lvactivate, "wrong-vg-name", "test-lv")
- self.assertRaises(lvm.LVMError, lvm.lvactivate, "wrong-vg-name", "wrong-lv-name")
-
- ##
- ## lvs
- ##
- # pass
- self.assertEqual(lvm.lvs("test-vg")["test-lv"]["size"], "60.00")
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.lvs, "wrong-vg-name")
-
- ##
- ## has_lvm
- ##
- # pass
- self.assertEqual(lvm.has_lvm(), True)
-
- # fail
- # TODO
-
- ##
- ## lvremove
- ##
- # pass
- self.assertEqual(lvm.lvdeactivate("test-vg", "test-lv"), None) # is deactivation needed?
- self.assertEqual(lvm.lvremove("test-vg", "test-lv"), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.lvremove, "test-vg", "wrong-lv-name")
- self.assertRaises(lvm.LVMError, lvm.lvremove, "wrong-vg-name", "test-lv")
- self.assertRaises(lvm.LVMError, lvm.lvremove, "wrong-vg-name", "wrong-lv-name")
- # lv already removed
- self.assertRaises(lvm.LVMError, lvm.lvremove, "test-vg", "test-lv")
-
- ##
- ## vgremove
- ##
- # pass
- self.assertEqual(lvm.vgremove("test-vg"), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.vgremove, "wrong-vg-name")
- # vg already removed
- self.assertRaises(lvm.LVMError, lvm.vgremove, "test-vg")
-
- ##
- ## pvremove
- ##
- # pass
- for dev, file in self._loopMap.iteritems():
- self.assertEqual(lvm.pvremove(dev), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.pvremove, "/not/existing/device")
- # pv already removed
- self.assertRaises(lvm.LVMError, lvm.pvremove, _LOOP_DEV0)
-
- #def testGetPossiblePhysicalExtents(self):
- # pass
- self.assertEqual(lvm.getPossiblePhysicalExtents(4),
- filter(lambda pe: pe > 4, map(lambda power: 2**power, xrange(3, 25))))
- self.assertEqual(lvm.getPossiblePhysicalExtents(100000),
- filter(lambda pe: pe > 100000, map(lambda power: 2**power, xrange(3, 25))))
-
- #def testGetMaxLVSize(self):
- # pass
- self.assertEqual(lvm.getMaxLVSize(), 16*1024**2)
-
- #def testSafeLVMName(self):
- # pass
- self.assertEqual(lvm.safeLvmName("/strange/lv*name5"), "strange_lvname5")
-
- #def testClampSize(self):
- # pass
- self.assertEqual(lvm.clampSize(10, 4), 8L)
- self.assertEqual(lvm.clampSize(10, 4, True), 12L)
-
- #def testVGUsedSpace(self):
- # TODO
- pass
-
- #def testVGFreeSpace(self):
- # TODO
- pass
-
-
-def suite():
- return unittest.TestLoader().loadTestsFromTestCase(LVMTestCase)
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/tests/storage_test/devicelibs_test/mdraid_test.py b/tests/storage_test/devicelibs_test/mdraid_test.py
deleted file mode 100755
index 9083bd162..000000000
--- a/tests/storage_test/devicelibs_test/mdraid_test.py
+++ /dev/null
@@ -1,114 +0,0 @@
-#!/usr/bin/python
-import baseclass
-import unittest
-import time
-from mock import acceptance
-
-class MDRaidTestCase(baseclass.DevicelibsTestCase):
-
- def testMDRaid(self):
- _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]]
- _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]]
-
- import storage.devicelibs.mdraid as mdraid
-
- @acceptance
- def testMDRaid(self):
- ##
- ## getRaidLevels
- ##
- # pass
- self.assertEqual(mdraid.getRaidLevels(), mdraid.getRaidLevels())
-
- ##
- ## get_raid_min_members
- ##
- # pass
- self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID0), 2)
- self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID1), 2)
- self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID5), 3)
- self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID6), 4)
- self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID10), 2)
-
- # fail
- # unsupported raid
- self.assertRaises(ValueError, mdraid.get_raid_min_members, 8)
-
- ##
- ## get_raid_max_spares
- ##
- # pass
- self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID0, 5), 0)
- self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID1, 5), 3)
- self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID5, 5), 2)
- self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID6, 5), 1)
- self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID10, 5), 3)
-
- # fail
- # unsupported raid
- self.assertRaises(ValueError, mdraid.get_raid_max_spares, 8, 5)
-
- ##
- ## mdcreate
- ##
- # pass
- self.assertEqual(mdraid.mdcreate("/dev/md0", 1, [_LOOP_DEV0, _LOOP_DEV1]), None)
- # wait for raid to settle
- time.sleep(2)
-
- # fail
- self.assertRaises(mdraid.MDRaidError, mdraid.mdcreate, "/dev/md1", 1, ["/not/existing/dev0", "/not/existing/dev1"])
-
- ##
- ## mddeactivate
- ##
- # pass
- self.assertEqual(mdraid.mddeactivate("/dev/md0"), None)
-
- # fail
- self.assertRaises(mdraid.MDRaidError, mdraid.mddeactivate, "/not/existing/md")
-
- ##
- ## mdadd
- ##
- # pass
- # TODO
-
- # fail
- self.assertRaises(mdraid.MDRaidError, mdraid.mdadd, "/not/existing/device")
-
- ##
- ## mdactivate
- ##
- # pass
- self.assertEqual(mdraid.mdactivate("/dev/md0", [_LOOP_DEV0, _LOOP_DEV1], super_minor=0), None)
- # wait for raid to settle
- time.sleep(2)
-
- # fail
- self.assertRaises(mdraid.MDRaidError, mdraid.mdactivate, "/not/existing/md", super_minor=1)
- # requires super_minor or uuid
- self.assertRaises(ValueError, mdraid.mdactivate, "/dev/md1")
-
- ##
- ## mddestroy
- ##
- # pass
- # deactivate first
- self.assertEqual(mdraid.mddeactivate("/dev/md0"), None)
-
- self.assertEqual(mdraid.mddestroy(_LOOP_DEV0), None)
- self.assertEqual(mdraid.mddestroy(_LOOP_DEV1), None)
-
- # fail
- # not a component
- self.assertRaises(mdraid.MDRaidError, mdraid.mddestroy, "/dev/md0")
- self.assertRaises(mdraid.MDRaidError, mdraid.mddestroy, "/not/existing/device")
-
-
-def suite():
- return unittest.TestLoader().loadTestsFromTestCase(MDRaidTestCase)
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/tests/storage_test/devicelibs_test/mpath_test.py b/tests/storage_test/devicelibs_test/mpath_test.py
deleted file mode 100755
index 5a160d981..000000000
--- a/tests/storage_test/devicelibs_test/mpath_test.py
+++ /dev/null
@@ -1,86 +0,0 @@
-#!/usr/bin/python
-import mock
-
-class MPathTestCase(mock.TestCase):
-
- # creating devices, user_friendly_names set to yes
- output1 = """\
-create: mpathb (1ATA ST3120026AS 5M) undef ATA,ST3120026AS
-size=112G features='0' hwhandler='0' wp=undef
-`-+- policy='round-robin 0' prio=1 status=undef
- `- 2:0:0:0 sda 8:0 undef ready running
-create: mpatha (36006016092d21800703762872c60db11) undef DGC,RAID 5
-size=10G features='1 queue_if_no_path' hwhandler='1 emc' wp=undef
-`-+- policy='round-robin 0' prio=2 status=undef
- |- 6:0:0:0 sdb 8:16 undef ready running
- `- 7:0:0:0 sdc 8:32 undef ready running\
-"""
-
- # listing existing devices, user_friendly_names set to yes
- output2 = """\
-mpathb (3600a0b800067fcc9000001f34d23ff88) dm-1 IBM,1726-4xx FAStT
-size=100G features='0' hwhandler='1 rdac' wp=rw
-`-+- policy='round-robin 0' prio=-1 status=active
- |- 1:0:0:0 sda 8:0 active undef running
- `- 2:0:0:0 sdc 8:32 active undef running
-mpatha (3600a0b800067fabc000067694d23fe6e) dm-0 IBM,1726-4xx FAStT
-size=100G features='0' hwhandler='1 rdac' wp=rw
-`-+- policy='round-robin 0' prio=-1 status=active
- |- 1:0:0:1 sdb 8:16 active undef running
- `- 2:0:0:1 sdd 8:48 active undef running
-"""
-
- # creating devices, user_friendly_names set to no
- output3 = """\
-create: 3600a0b800067fabc000067694d23fe6e undef IBM,1726-4xx FAStT
-size=100G features='1 queue_if_no_path' hwhandler='1 rdac' wp=undef
-`-+- policy='round-robin 0' prio=6 status=undef
- |- 1:0:0:1 sdb 8:16 undef ready running
- `- 2:0:0:1 sdd 8:48 undef ready running
-create: 3600a0b800067fcc9000001f34d23ff88 undef IBM,1726-4xx FAStT
-size=100G features='1 queue_if_no_path' hwhandler='1 rdac' wp=undef
-`-+- policy='round-robin 0' prio=3 status=undef
- |- 1:0:0:0 sda 8:0 undef ready running
- `- 2:0:0:0 sdc 8:32 undef ready running\
-"""
-
- # listing existing devices, user_friendly_names set to no
- output4 = """\
-3600a0b800067fcc9000001f34d23ff88 dm-1 IBM,1726-4xx FAStT
-size=100G features='0' hwhandler='1 rdac' wp=rw
-`-+- policy='round-robin 0' prio=-1 status=active
- |- 1:0:0:0 sda 8:0 active undef running
- `- 2:0:0:0 sdc 8:32 active undef running
-3600a0b800067fabc000067694d23fe6e dm-0 IBM,1726-4xx FAStT
-size=100G features='0' hwhandler='1 rdac' wp=rw
-`-+- policy='round-robin 0' prio=-1 status=active
- |- 1:0:0:1 sdb 8:16 active undef running
- `- 2:0:0:1 sdd 8:48 active undef running
-"""
-
- def setUp(self):
- self.setupModules(
- ['_isys', 'logging', 'anaconda_log', 'block'])
-
- def tearDown(self):
- self.tearDownModules()
-
- def testParse(self):
- from pyanaconda.storage.devicelibs import mpath
- topology = mpath.parseMultipathOutput(self.output1)
- self.assertEqual(topology,
- {'mpatha':['sdb','sdc'], 'mpathb':['sda']})
- topology = mpath.parseMultipathOutput(self.output2)
- self.assertEqual(topology,
- {'mpathb':['sda','sdc'], 'mpatha':['sdb', 'sdd']})
- topology = mpath.parseMultipathOutput(self.output3)
- self.assertEqual(topology,
- {'3600a0b800067fabc000067694d23fe6e' : ['sdb','sdd'],
- '3600a0b800067fcc9000001f34d23ff88' : ['sda', 'sdc']})
- topology = mpath.parseMultipathOutput(self.output4)
- self.assertEqual(topology,
- {'3600a0b800067fabc000067694d23fe6e' : ['sdb','sdd'],
- '3600a0b800067fcc9000001f34d23ff88' : ['sda', 'sdc']})
-
-def suite():
- return unittest.TestLoader().loadTestsFromTestCase(MPathTestCase)
diff --git a/tests/storage_test/devicelibs_test/swap_test.py b/tests/storage_test/devicelibs_test/swap_test.py
deleted file mode 100755
index 3808943cc..000000000
--- a/tests/storage_test/devicelibs_test/swap_test.py
+++ /dev/null
@@ -1,74 +0,0 @@
-#!/usr/bin/python
-import baseclass
-import unittest
-from mock import acceptance
-
-class SwapTestCase(baseclass.DevicelibsTestCase):
-
- def testSwap(self):
- _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]]
- _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]]
-
- import storage.devicelibs.swap as swap
-
- @acceptance
- def testSwap(self):
- ##
- ## mkswap
- ##
- # pass
- self.assertEqual(swap.mkswap(_LOOP_DEV0, "swap"), None)
-
- # fail
- self.assertRaises(swap.SwapError, swap.mkswap, "/not/existing/device")
-
- ##
- ## swapon
- ##
- # pass
- self.assertEqual(swap.swapon(_LOOP_DEV0, 1), None)
-
- # fail
- self.assertRaises(swap.SwapError, swap.swapon, "/not/existing/device")
- # not a swap partition
- self.assertRaises(swap.SwapError, swap.swapon, _LOOP_DEV1)
-
- # pass
- # make another swap
- self.assertEqual(swap.mkswap(_LOOP_DEV1, "another-swap"), None)
- self.assertEqual(swap.swapon(_LOOP_DEV1), None)
-
- ##
- ## swapstatus
- ##
- # pass
- self.assertEqual(swap.swapstatus(_LOOP_DEV0), True)
- self.assertEqual(swap.swapstatus(_LOOP_DEV1), True)
-
- # does not fail
- self.assertEqual(swap.swapstatus("/not/existing/device"), False)
-
- ##
- ## swapoff
- ##
- # pass
- self.assertEqual(swap.swapoff(_LOOP_DEV1), None)
-
- # check status
- self.assertEqual(swap.swapstatus(_LOOP_DEV0), True)
- self.assertEqual(swap.swapstatus(_LOOP_DEV1), False)
-
- self.assertEqual(swap.swapoff(_LOOP_DEV0), None)
-
- # fail
- self.assertRaises(swap.SwapError, swap.swapoff, "/not/existing/device")
- # already off
- self.assertRaises(swap.SwapError, swap.swapoff, _LOOP_DEV0)
-
-
-def suite():
- return unittest.TestLoader().loadTestsFromTestCase(SwapTestCase)
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/tests/storage_test/partitioning_test.py b/tests/storage_test/partitioning_test.py
deleted file mode 100644
index 0a8cc75a5..000000000
--- a/tests/storage_test/partitioning_test.py
+++ /dev/null
@@ -1,130 +0,0 @@
-#!/usr/bin/python
-
-import unittest
-from mock import Mock
-
-import parted
-
-import pyanaconda.anaconda_log
-pyanaconda.anaconda_log.init()
-
-from pyanaconda.storage.partitioning import getNextPartitionType
-
-# disklabel-type-specific constants
-# keys: disklabel type string
-# values: 3-tuple of (max_primary_count, supports_extended, max_logical_count)
-disklabel_types = {'dos': (4, True, 11),
- 'gpt': (128, False, 0),
- 'mac': (62, False, 0)}
-
-class PartitioningTestCase(unittest.TestCase):
- def getDisk(self, disk_type, primary_count=0,
- has_extended=False, logical_count=0):
- """ Return a mock representing a parted.Disk. """
- disk = Mock()
-
- disk.type = disk_type
- label_type_info = disklabel_types[disk_type]
- (max_primaries, supports_extended, max_logicals) = label_type_info
-
- # primary partitions
- disk.primaryPartitionCount = primary_count
- disk.maxPrimaryPartitionCount = max_primaries
-
- # extended partitions
- disk.supportsFeature = Mock(return_value=supports_extended)
- disk.getExtendedPartition = Mock(return_value=has_extended)
-
- # logical partitions
- disk.getMaxLogicalPartitions = Mock(return_value=max_logicals)
- disk.getLogicalPartitions = Mock(return_value=[0]*logical_count)
-
- return disk
-
- def testNextPartitionType(self):
- #
- # DOS
- #
-
- # empty disk, any type
- disk = self.getDisk(disk_type="dos")
- self.assertEqual(getNextPartitionType(disk), parted.PARTITION_NORMAL)
-
- # three primaries and no extended -> extended
- disk = self.getDisk(disk_type="dos", primary_count=3)
- self.assertEqual(getNextPartitionType(disk), parted.PARTITION_EXTENDED)
-
- # three primaries and an extended -> primary
- disk = self.getDisk(disk_type="dos", primary_count=3, has_extended=True)
- self.assertEqual(getNextPartitionType(disk), parted.PARTITION_NORMAL)
-
- # three primaries and an extended w/ no_primary -> logical
- disk = self.getDisk(disk_type="dos", primary_count=3, has_extended=True)
- self.assertEqual(getNextPartitionType(disk, no_primary=True),
- parted.PARTITION_LOGICAL)
-
- # four primaries and an extended, available logical -> logical
- disk = self.getDisk(disk_type="dos", primary_count=4, has_extended=True,
- logical_count=9)
- self.assertEqual(getNextPartitionType(disk), parted.PARTITION_LOGICAL)
-
- # four primaries and an extended, no available logical -> None
- disk = self.getDisk(disk_type="dos", primary_count=4, has_extended=True,
- logical_count=11)
- self.assertEqual(getNextPartitionType(disk), None)
-
- # four primaries and no extended -> None
- disk = self.getDisk(disk_type="dos", primary_count=4,
- has_extended=False)
- self.assertEqual(getNextPartitionType(disk), None)
-
- # free primary slot, extended, no free logical slot -> primary
- disk = self.getDisk(disk_type="dos", primary_count=3, has_extended=True,
- logical_count=11)
- self.assertEqual(getNextPartitionType(disk), parted.PARTITION_NORMAL)
-
- # free primary slot, extended, no free logical slot w/ no_primary
- # -> None
- disk = self.getDisk(disk_type="dos", primary_count=3, has_extended=True,
- logical_count=11)
- self.assertEqual(getNextPartitionType(disk, no_primary=True), None)
-
- #
- # GPT
- #
-
- # empty disk, any partition type
- disk = self.getDisk(disk_type="gpt")
- self.assertEqual(getNextPartitionType(disk), parted.PARTITION_NORMAL)
-
- # no empty slots -> None
- disk = self.getDisk(disk_type="gpt", primary_count=128)
- self.assertEqual(getNextPartitionType(disk), None)
-
- # no_primary -> None
- disk = self.getDisk(disk_type="gpt")
- self.assertEqual(getNextPartitionType(disk, no_primary=True), None)
-
- #
- # MAC
- #
-
- # empty disk, any partition type
- disk = self.getDisk(disk_type="mac")
- self.assertEqual(getNextPartitionType(disk), parted.PARTITION_NORMAL)
-
- # no empty slots -> None
- disk = self.getDisk(disk_type="mac", primary_count=62)
- self.assertEqual(getNextPartitionType(disk), None)
-
- # no_primary -> None
- disk = self.getDisk(disk_type="mac")
- self.assertEqual(getNextPartitionType(disk, no_primary=True), None)
-
-
-def suite():
- return unittest.TestLoader().loadTestsFromTestCase(PartitioningTestCase)
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/tests/storage_test/size_test.py b/tests/storage_test/size_test.py
deleted file mode 100644
index 4f002f298..000000000
--- a/tests/storage_test/size_test.py
+++ /dev/null
@@ -1,86 +0,0 @@
-#!/usr/bin/python
-#
-# tests/storage/size_tests.py
-# Size test cases for the pyanaconda.storage module
-#
-# Copyright (C) 2010 Red Hat, Inc.
-#
-# This copyrighted material is made available to anyone wishing to use,
-# modify, copy, or redistribute it subject to the terms and conditions of
-# the GNU General Public License v.2, or (at your option) any later version.
-# This program is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY expressed or implied, including the implied warranties of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
-# Public License for more details. You should have received a copy of the
-# GNU General Public License along with this program; if not, write to the
-# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
-# source code or documentation are not subject to the GNU General Public
-# License and may only be used or replicated with the express permission of
-# Red Hat, Inc.
-#
-# Red Hat Author(s): David Cantrell <dcantrell@redhat.com>
-
-import unittest
-
-from pyanaconda import anaconda_log
-anaconda_log.init()
-from pyanaconda.storage.errors import *
-from pyanaconda.storage.size import Size, _prefixes
-
-class SizeTestCase(unittest.TestCase):
- def testExceptions(self):
- self.assertRaises(SizeParamsError, Size)
- self.assertRaises(SizeParamsError, Size, bytes=500, spec="45GB")
-
- self.assertRaises(SizeNotPositiveError, Size, bytes=-1)
-
- self.assertRaises(SizeNotPositiveError, Size, spec="0")
- self.assertRaises(SizeNotPositiveError, Size, spec="-1 TB")
- self.assertRaises(SizeNotPositiveError, Size, spec="-47kb")
-
- s = Size(bytes=500)
- self.assertRaises(SizePlacesError, s.humanReadable, places=0)
-
- def _prefixTestHelper(self, bytes, factor, prefix, abbr):
- c = bytes * factor
-
- s = Size(bytes=c)
- self.assertEquals(s, c)
-
- if prefix:
- u = "%sbytes" % prefix
- s = Size(spec="%ld %s" % (bytes, u))
- self.assertEquals(s, c)
- self.assertEquals(s.convertTo(spec=u), bytes)
-
- if abbr:
- u = "%sb" % abbr
- s = Size(spec="%ld %s" % (bytes, u))
- self.assertEquals(s, c)
- self.assertEquals(s.convertTo(spec=u), bytes)
-
- if not prefix and not abbr:
- s = Size(spec="%ld" % bytes)
- self.assertEquals(s, c)
- self.assertEquals(s.convertTo(), bytes)
-
- def testPrefixes(self):
- bytes = 47L
- self._prefixTestHelper(bytes, 1, None, None)
-
- for factor, prefix, abbr in _prefixes:
- self._prefixTestHelper(bytes, factor, prefix, abbr)
-
- def testHumanReadable(self):
- s = Size(bytes=58929971L)
- self.assertEquals(s.humanReadable(), "58.9 Mb")
-
- s = Size(bytes=478360371L)
- self.assertEquals(s.humanReadable(), "0.48 Gb")
-
-def suite():
- return unittest.TestLoader().loadTestsFromTestCase(SizeTestCase)
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/tests/storage_test/storagetestcase.py b/tests/storage_test/storagetestcase.py
deleted file mode 100644
index 8e5df1db4..000000000
--- a/tests/storage_test/storagetestcase.py
+++ /dev/null
@@ -1,287 +0,0 @@
-#!/usr/bin/python
-
-import unittest
-from mock import Mock
-from mock import TestCase
-
-import parted
-
-import pyanaconda.anaconda_log
-pyanaconda.anaconda_log.init()
-
-import pyanaconda.iutil
-import pyanaconda.storage as storage
-from pyanaconda.storage.formats import getFormat
-
-# device classes for brevity's sake -- later on, that is
-from pyanaconda.storage.devices import StorageDevice
-from pyanaconda.storage.devices import DiskDevice
-from pyanaconda.storage.devices import PartitionDevice
-from pyanaconda.storage.devices import MDRaidArrayDevice
-from pyanaconda.storage.devices import DMDevice
-from pyanaconda.storage.devices import LUKSDevice
-from pyanaconda.storage.devices import LVMVolumeGroupDevice
-from pyanaconda.storage.devices import LVMLogicalVolumeDevice
-from pyanaconda.storage.devices import FileDevice
-
-
-class StorageTestCase(TestCase):
- """ StorageTestCase
-
- This is a base class for storage test cases. It sets up imports of
- the storage package, along with an Anaconda instance and a Storage
- instance. There are lots of little patches to prevent various pieces
- of code from trying to access filesystems and/or devices on the host
- system, along with a couple of convenience methods.
-
- """
- def __init__(self, *args, **kwargs):
- TestCase.__init__(self, *args, **kwargs)
-
- self.setUpAnaconda()
-
- def setUpAnaconda(self):
- pyanaconda.iutil.execWithRedirect = Mock()
- pyanaconda.iutil.execWithCapture = Mock()
- pyanaconda.iutil.execWithPulseProgress = Mock()
- pyanaconda.storage.udev = Mock()
-
- self.anaconda = Mock()
- self.setUpStorage()
-
- def setUpStorage(self):
- self.setUpDeviceLibs()
- self.storage = storage.Storage(self.anaconda)
-
- # device status
- pyanaconda.storage.devices.StorageDevice.status = False
- pyanaconda.storage.devices.DMDevice.status = False
- pyanaconda.storage.devices.LUKSDevice.status = False
- pyanaconda.storage.devices.LVMVolumeGroupDevice.status = False
- pyanaconda.storage.devices.MDRaidArrayDevice.status = False
- pyanaconda.storage.devices.FileDevice.status = False
-
- # prevent PartitionDevice from trying to dig around in the partition's
- # geometry
- pyanaconda.storage.devices.PartitionDevice._setTargetSize = StorageDevice._setTargetSize
-
- # prevent Ext2FS from trying to run resize2fs to get a filesystem's
- # minimum size
- storage.formats.fs.Ext2FS.minSize = storage.formats.DeviceFormat.minSize
- storage.formats.fs.FS.migratable = storage.formats.DeviceFormat.migratable
-
- def setUpDeviceLibs(self):
- # devicelibs shouldn't be touching or looking at the host system
-
- # lvm is easy because all calls to /sbin/lvm are via lvm()
- storage.devicelibs.lvm.lvm = Mock()
-
- # mdraid is easy because all calls to /sbin/mdadm are via mdadm()
- storage.devicelibs.mdraid.mdadm = Mock()
-
- # swap
- storage.devicelibs.swap.swapstatus = Mock(return_value=False)
- storage.devicelibs.swap.swapon = Mock()
- storage.devicelibs.swap.swapoff = Mock()
-
- # dm
- storage.devicelibs.dm = Mock()
-
- # crypto/luks
- storage.devicelibs.crypto.luks_status = Mock(return_value=False)
- storage.devicelibs.crypto.luks_uuid = Mock()
- storage.devicelibs.crypto.luks_format = Mock()
- storage.devicelibs.crypto.luks_open = Mock()
- storage.devicelibs.crypto.luks_close = Mock()
- storage.devicelibs.crypto.luks_add_key = Mock()
- storage.devicelibs.crypto.luks_remove_key = Mock()
-
- # this list would normally be obtained by parsing /proc/mdstat
- storage.devicelibs.mdraid.raid_levels = \
- [storage.devicelibs.mdraid.RAID10,
- storage.devicelibs.mdraid.RAID0,
- storage.devicelibs.mdraid.RAID1,
- storage.devicelibs.mdraid.RAID4,
- storage.devicelibs.mdraid.RAID5,
- storage.devicelibs.mdraid.RAID6]
-
- def newDevice(*args, **kwargs):
- """ Return a new Device instance suitable for testing. """
- args = args[1:] # drop self arg
- device_class = kwargs.pop("device_class")
- exists = kwargs.pop("exists", False)
- part_type = kwargs.pop("part_type", parted.PARTITION_NORMAL)
- device = device_class(*args, **kwargs)
-
- if exists:
- # set up mock parted.Device w/ correct size
- device._partedDevice = Mock()
- device._partedDevice.getSize = Mock(return_value=float(device.size))
- device._partedDevice.sectorSize = 512
-
- if isinstance(device, pyanaconda.storage.devices.PartitionDevice):
- #if exists:
- # device.parents = device.req_disks
- device.parents = device.req_disks
-
- partedPartition = Mock()
-
- if device.disk:
- part_num = device.name[len(device.disk.name):].split("p")[-1]
- partedPartition.number = int(part_num)
-
- partedPartition.type = part_type
- partedPartition.path = device.path
- partedPartition.getDeviceNodeName = Mock(return_value=device.name)
- partedPartition.getSize = Mock(return_value=float(device.size))
- device._partedPartition = partedPartition
-
- device.exists = exists
- device.format.exists = exists
-
- if isinstance(device, pyanaconda.storage.devices.PartitionDevice):
- # PartitionDevice.probe sets up data needed for resize operations
- device.probe()
-
- return device
-
- def newFormat(*args, **kwargs):
- """ Return a new DeviceFormat instance suitable for testing.
-
- Keyword Arguments:
-
- device_instance - StorageDevice instance this format will be
- created on. This is needed for setup of
- resizable formats.
-
- All other arguments are passed directly to
- pyanaconda.storage.formats.getFormat.
- """
- args = args[1:] # drop self arg
- exists = kwargs.pop("exists", False)
- device_instance = kwargs.pop("device_instance", None)
- format = getFormat(*args, **kwargs)
- if isinstance(format, storage.formats.disklabel.DiskLabel):
- format._partedDevice = Mock()
- format._partedDisk = Mock()
-
- format.exists = exists
-
- if format.resizable and device_instance:
- format._size = device_instance.currentSize
-
- return format
-
- def destroyAllDevices(self, disks=None):
- """ Remove all devices from the devicetree.
-
- Keyword Arguments:
-
- disks - a list of names of disks to remove partitions from
-
- Note: this is largely ripped off from partitioning.clearPartitions.
-
- """
- partitions = self.storage.partitions
-
- # Sort partitions by descending partition number to minimize confusing
- # things like multiple "destroy sda5" actions due to parted renumbering
- # partitions. This can still happen through the UI but it makes sense to
- # avoid it where possible.
- partitions.sort(key=lambda p: p.partedPartition.number, reverse=True)
- for part in partitions:
- if disks and part.disk.name not in disks:
- continue
-
- devices = self.storage.deviceDeps(part)
- while devices:
- leaves = [d for d in devices if d.isleaf]
- for leaf in leaves:
- self.storage.destroyDevice(leaf)
- devices.remove(leaf)
-
- self.storage.destroyDevice(part)
-
- def scheduleCreateDevice(self, *args, **kwargs):
- """ Schedule an action to create the specified device.
-
- Verify that the device is not already in the tree and that the
- act of scheduling/registering the action also adds the device to
- the tree.
-
- Return the DeviceAction instance.
- """
- device = kwargs.pop("device")
- if hasattr(device, "req_disks") and \
- len(device.req_disks) == 1 and \
- not device.parents:
- device.parents = device.req_disks
-
- devicetree = self.storage.devicetree
-
- self.assertEqual(devicetree.getDeviceByName(device.name), None)
- action = storage.deviceaction.ActionCreateDevice(device)
- devicetree.registerAction(action)
- self.assertEqual(devicetree.getDeviceByName(device.name), device)
- return action
-
- def scheduleDestroyDevice(self, *args, **kwargs):
- """ Schedule an action to destroy the specified device.
-
- Verify that the device exists initially and that the act of
- scheduling/registering the action also removes the device from
- the tree.
-
- Return the DeviceAction instance.
- """
- device = kwargs.pop("device")
- devicetree = self.storage.devicetree
-
- self.assertEqual(devicetree.getDeviceByName(device.name), device)
- action = storage.deviceaction.ActionDestroyDevice(device)
- devicetree.registerAction(action)
- self.assertEqual(devicetree.getDeviceByName(device.name), None)
- return action
-
- def scheduleCreateFormat(self, *args, **kwargs):
- """ Schedule an action to write a new format to a device.
-
- Verify that the device is already in the tree, that it is not
- already set up to contain the specified format, and that the act
- of registering/scheduling the action causes the new format to be
- reflected in the tree.
-
- Return the DeviceAction instance.
- """
- device = kwargs.pop("device")
- format = kwargs.pop("format")
- devicetree = self.storage.devicetree
-
- self.assertNotEqual(device.format, format)
- self.assertEqual(devicetree.getDeviceByName(device.name), device)
- action = storage.deviceaction.ActionCreateFormat(device, format)
- devicetree.registerAction(action)
- _device = devicetree.getDeviceByName(device.name)
- self.assertEqual(_device.format, format)
- return action
-
- def scheduleDestroyFormat(self, *args, **kwargs):
- """ Schedule an action to remove a format from a device.
-
- Verify that the device is already in the tree and that the act
- of registering/scheduling the action causes the new format to be
- reflected in the tree.
-
- Return the DeviceAction instance.
- """
- device = kwargs.pop("device")
- devicetree = self.storage.devicetree
-
- self.assertEqual(devicetree.getDeviceByName(device.name), device)
- action = storage.deviceaction.ActionDestroyFormat(device)
- devicetree.registerAction(action)
- _device = devicetree.getDeviceByName(device.name)
- self.assertEqual(_device.format.type, None)
- return action
-
-
diff --git a/tests/storage_test/tsort_test.py b/tests/storage_test/tsort_test.py
deleted file mode 100644
index cb96997bf..000000000
--- a/tests/storage_test/tsort_test.py
+++ /dev/null
@@ -1,62 +0,0 @@
-
-import unittest
-import pyanaconda.storage.tsort
-
-class TopologicalSortTestCase(unittest.TestCase):
- def runTest(self):
- items = [1, 2, 3, 4, 5]
- edges = [(5, 4), (4, 3), (3, 2), (2, 1)]
- graph = pyanaconda.storage.tsort.create_graph(items, edges)
- self._tsortTest(graph)
-
- edges = [(5, 4), (2, 3), (1, 5)]
- graph = pyanaconda.storage.tsort.create_graph(items, edges)
- self._tsortTest(graph)
-
- edges = [(5, 4), (4, 3), (3, 2), (2, 1), (3, 5)]
- graph = pyanaconda.storage.tsort.create_graph(items, edges)
- self.failUnlessRaises(pyanaconda.storage.tsort.CyclicGraphError,
- pyanaconda.storage.tsort.tsort,
- graph)
-
- edges = [(5, 4), (4, 3), (3, 2), (2, 1), (2, 3)]
- graph = pyanaconda.storage.tsort.create_graph(items, edges)
- self.failUnlessRaises(pyanaconda.storage.tsort.CyclicGraphError,
- pyanaconda.storage.tsort.tsort,
- graph)
-
- items = ['a', 'b', 'c', 'd']
- edges = [('a', 'c'), ('c', 'b')]
- graph = pyanaconda.storage.tsort.create_graph(items, edges)
- self._tsortTest(graph)
-
- def _tsortTest(self, graph):
- def check_order(order, graph):
- # since multiple solutions can potentially exist, just verify
- # that the ordering constraints are satisfied
- for parent, child in graph['edges']:
- if order.index(parent) > order.index(child):
- return False
- return True
-
- try:
- order = pyanaconda.storage.tsort.tsort(graph)
- except Exception as e:
- self.fail(e)
-
- # verify output list is of the correct length
- self.failIf(len(order) != len(graph['items']),
- "sorted list length is incorrect")
-
- # verify that all ordering constraints are satisfied
- self.failUnless(check_order(order, graph),
- "ordering constraints not satisfied")
-
-
-def suite():
- return unittest.TestLoader().loadTestsFromTestCase(TopologicalSortTestCase)
-
-
-if __name__ == "__main__":
- unittest.main()
-
diff --git a/tests/storage_test/udev_test.py b/tests/storage_test/udev_test.py
deleted file mode 100644
index 5d48279b4..000000000
--- a/tests/storage_test/udev_test.py
+++ /dev/null
@@ -1,139 +0,0 @@
-#!/usr/bin/python
-
-import mock
-import os
-
-class UdevTest(mock.TestCase):
-
- def setUp(self):
- self.setupModules(["_isys", "block", "ConfigParser"])
- self.fs = mock.DiskIO()
-
- import pyanaconda.storage.udev
- pyanaconda.storage.udev.os = mock.Mock()
- pyanaconda.storage.udev.log = mock.Mock()
- pyanaconda.storage.udev.open = self.fs.open
-
- def tearDown(self):
- self.tearDownModules()
-
- def udev_enumerate_devices_test(self):
- import pyanaconda.storage.udev
- ENUMERATE_LIST = [
- '/sys/devices/pci0000:00/0000:00:1f.2/host0/target0:0:0/0:0:0:0/block/sda',
- '/sys/devices/virtual/block/loop0',
- '/sys/devices/virtual/block/loop1',
- '/sys/devices/virtual/block/ram0',
- '/sys/devices/virtual/block/ram1',
- '/sys/devices/virtual/block/dm-0',
- ]
-
- pyanaconda.storage.udev.global_udev.enumerate_devices = mock.Mock(return_value=ENUMERATE_LIST)
- ret = pyanaconda.storage.udev.udev_enumerate_devices()
- self.assertEqual(set(ret),
- set(['/devices/pci0000:00/0000:00:1f.2/host0/target0:0:0/0:0:0:0/block/sda',
- '/devices/virtual/block/loop0', '/devices/virtual/block/loop1',
- '/devices/virtual/block/ram0', '/devices/virtual/block/ram1',
- '/devices/virtual/block/dm-0'])
- )
-
- def udev_get_device_1_test(self):
- import pyanaconda.storage.udev
-
- class Device(object):
- def __init__(self):
- self.sysname = 'loop1'
- self.dict = {'symlinks': ['/dev/block/7:1'],
- 'SUBSYSTEM': 'block',
- 'MAJOR': '7',
- 'DEVPATH': '/devices/virtual/block/loop1',
- 'UDISKS_PRESENTATION_NOPOLICY': '1',
- 'UDEV_LOG': '3',
- 'DEVNAME': '/dev/loop1',
- 'DEVTYPE': 'disk',
- 'DEVLINKS': '/dev/block/7:1',
- 'MINOR': '1'
- }
-
- def __getitem__(self, key):
- return self.dict[key]
-
- def __setitem__(self, key, value):
- self.dict[key] = value
-
- pyanaconda.storage.udev.os.path.exists.return_value = True
- DEV_PATH = '/devices/virtual/block/loop1'
- dev = Device()
- pyanaconda.storage.udev.global_udev = mock.Mock()
- pyanaconda.storage.udev.global_udev.create_device.return_value = dev
- pyanaconda.storage.udev.udev_parse_uevent_file = mock.Mock(return_value=dev)
-
- ret = pyanaconda.storage.udev.udev_get_device(DEV_PATH)
- self.assertTrue(isinstance(ret, Device))
- self.assertEqual(ret['name'], ret.sysname)
- self.assertEqual(ret['sysfs_path'], DEV_PATH)
- self.assertTrue(pyanaconda.storage.udev.udev_parse_uevent_file.called)
-
- def udev_get_device_2_test(self):
- import pyanaconda.storage.udev
- pyanaconda.storage.udev.os.path.exists.return_value = False
- ret = pyanaconda.storage.udev.udev_get_device('')
- self.assertEqual(ret, None)
-
- def udev_get_device_3_test(self):
- import pyanaconda.storage.udev
- pyanaconda.storage.udev.os.path.exists.return_value = True
- pyanaconda.storage.udev.global_udev = mock.Mock()
- pyanaconda.storage.udev.global_udev.create_device.return_value = None
- ret = pyanaconda.storage.udev.udev_get_device('')
- self.assertEqual(ret, None)
-
- def udev_get_devices_test(self):
- import pyanaconda.storage.udev
- pyanaconda.storage.udev.udev_settle = mock.Mock()
- DEVS = \
- ['/devices/pci0000:00/0000:00:1f.2/host0/target0:0:0/0:0:0:0/block/sda',
- '/devices/virtual/block/loop0', '/devices/virtual/block/loop1',
- '/devices/virtual/block/ram0', '/devices/virtual/block/ram1',
- '/devices/virtual/block/dm-0']
- pyanaconda.storage.udev.udev_enumerate_devices = mock.Mock(return_value=DEVS)
- pyanaconda.storage.udev.udev_get_device = lambda x: x
- ret = pyanaconda.storage.udev.udev_get_devices()
- self.assertEqual(ret, DEVS)
-
- def udev_parse_uevent_file_1_test(self):
- import pyanaconda.storage.udev
- pyanaconda.storage.udev.os.path.normpath = os.path.normpath
- pyanaconda.storage.udev.os.access.return_value = True
-
- FILE_CONTENT = "MAJOR=7\nMINOR=1\nDEVNAME=loop1\nDEVTYPE=disk\n"
- self.fs.open('/sys/devices/virtual/block/loop1/uevent', 'w').write(FILE_CONTENT)
- dev = {'sysfs_path': '/devices/virtual/block/loop1'}
- ret = pyanaconda.storage.udev.udev_parse_uevent_file(dev)
- self.assertEqual(ret,
- {'sysfs_path': '/devices/virtual/block/loop1',
- 'DEVNAME': 'loop1',
- 'DEVTYPE': 'disk',
- 'MAJOR': '7',
- 'MINOR': '1'})
-
- def udev_parse_uevent_file_2_test(self):
- import pyanaconda.storage.udev
- pyanaconda.storage.udev.os.path.normpath = os.path.normpath
- pyanaconda.storage.udev.os.access.return_value = False
-
- dev = {'sysfs_path': '/devices/virtual/block/loop1'}
- ret = pyanaconda.storage.udev.udev_parse_uevent_file(dev)
- self.assertEqual(ret, {'sysfs_path': '/devices/virtual/block/loop1'})
-
- def udev_settle_test(self):
- import pyanaconda.storage.udev
- pyanaconda.storage.udev.util = mock.Mock()
- pyanaconda.storage.udev.udev_settle()
- self.assertTrue(pyanaconda.storage.udev.util.run_program.called)
-
- def udev_trigger_test(self):
- import pyanaconda.storage.udev
- pyanaconda.storage.udev.util = mock.Mock()
- pyanaconda.storage.udev.udev_trigger()
- self.assertTrue(pyanaconda.storage.udev.util.run_program.called)