diff options
Diffstat (limited to 'tests/storage_test/devicelibs_test')
-rw-r--r-- | tests/storage_test/devicelibs_test/Makefile.am | 22 | ||||
-rw-r--r-- | tests/storage_test/devicelibs_test/baseclass.py | 80 | ||||
-rwxr-xr-x | tests/storage_test/devicelibs_test/crypto_test.py | 135 | ||||
-rw-r--r-- | tests/storage_test/devicelibs_test/edd_test.py | 212 | ||||
-rwxr-xr-x | tests/storage_test/devicelibs_test/lvm_test.py | 239 | ||||
-rwxr-xr-x | tests/storage_test/devicelibs_test/mdraid_test.py | 114 | ||||
-rwxr-xr-x | tests/storage_test/devicelibs_test/mpath_test.py | 86 | ||||
-rwxr-xr-x | tests/storage_test/devicelibs_test/swap_test.py | 74 |
8 files changed, 0 insertions, 962 deletions
diff --git a/tests/storage_test/devicelibs_test/Makefile.am b/tests/storage_test/devicelibs_test/Makefile.am deleted file mode 100644 index eeeabbdc4..000000000 --- a/tests/storage_test/devicelibs_test/Makefile.am +++ /dev/null @@ -1,22 +0,0 @@ -# tests/storage/devicelibs/Makefile.am for anaconda -# -# Copyright (C) 2009 Red Hat, Inc. -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU Lesser General Public License as published -# by the Free Software Foundation; either version 2.1 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# -# Author: David Cantrell <dcantrell@redhat.com> - -EXTRA_DIST = *.py - -MAINTAINERCLEANFILES = Makefile.in diff --git a/tests/storage_test/devicelibs_test/baseclass.py b/tests/storage_test/devicelibs_test/baseclass.py deleted file mode 100644 index 06adef989..000000000 --- a/tests/storage_test/devicelibs_test/baseclass.py +++ /dev/null @@ -1,80 +0,0 @@ -import unittest -import os -import subprocess - - -def makeLoopDev(device_name, file_name): - proc = subprocess.Popen(["dd", "if=/dev/zero", "of=%s" % file_name, - "bs=1024", "count=102400"], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - while True: - (out, err) = proc.communicate() - if proc.returncode is not None: - rc = proc.returncode - break - if rc: - raise OSError, "dd failed creating the file %s" % file_name - - proc = subprocess.Popen(["losetup", device_name, file_name], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - while True: - (out, err) = proc.communicate() - if proc.returncode is not None: - rc = proc.returncode - break - if rc: - raise OSError, "losetup failed setting up the loop device %s" % device_name - -def removeLoopDev(device_name, file_name): - proc = subprocess.Popen(["losetup", "-d", device_name], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - while True: - (out, err) = proc.communicate() - if proc.returncode is not None: - rc = proc.returncode - break - if rc: - raise OSError, "losetup failed removing the loop device %s" % device_name - - os.unlink(file_name) - -def getFreeLoopDev(): - # There's a race condition here where another process could grab the loop - # device losetup gives us before we have time to set it up, but that's just - # a chance we'll have to take. - proc = subprocess.Popen(["losetup", "-f"], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out = None - - while True: - (out, err) = proc.communicate() - if proc.returncode is not None: - rc = proc.returncode - out = out.strip() - break - - if rc: - raise OSError, "losetup failed to find a free device" - - return out - -class DevicelibsTestCase(unittest.TestCase): - - _LOOP_DEVICES = ["/tmp/test-virtdev0", "/tmp/test-virtdev1"] - - def __init__(self, *args, **kwargs): - import pyanaconda.anaconda_log - pyanaconda.anaconda_log.init() - - unittest.TestCase.__init__(self, *args, **kwargs) - self._loopMap = {} - - def setUp(self): - for file in self._LOOP_DEVICES: - dev = getFreeLoopDev() - makeLoopDev(dev, file) - self._loopMap[file] = dev - - def tearDown(self): - for (file, dev) in self._loopMap.iteritems(): - removeLoopDev(dev, file) diff --git a/tests/storage_test/devicelibs_test/crypto_test.py b/tests/storage_test/devicelibs_test/crypto_test.py deleted file mode 100755 index 3e9e3a667..000000000 --- a/tests/storage_test/devicelibs_test/crypto_test.py +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/python -import baseclass -import unittest -from mock import acceptance - -import tempfile -import os - -class CryptoTestCase(baseclass.DevicelibsTestCase): - - def testCrypto(self): - _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]] - _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]] - - import storage.devicelibs.crypto as crypto - - - @acceptance - def testCrypto(self): - ## - ## is_luks - ## - # pass - self.assertEqual(crypto.is_luks(_LOOP_DEV0), -22) - self.assertEqual(crypto.is_luks("/not/existing/device"), -22) - - ## - ## luks_format - ## - # pass - self.assertEqual(crypto.luks_format(_LOOP_DEV0, passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256), None) - - # make a key file - handle, keyfile = tempfile.mkstemp(prefix="key", text=False) - os.write(handle, "nobodyknows") - os.close(handle) - - # format with key file - self.assertEqual(crypto.luks_format(_LOOP_DEV1, key_file=keyfile), None) - - # fail - self.assertRaises(crypto.CryptoError, crypto.luks_format, "/not/existing/device", passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256) - # no passhprase or key file - self.assertRaises(ValueError, crypto.luks_format, _LOOP_DEV1, cipher="aes-cbc-essiv:sha256", key_size=256) - - ## - ## is_luks - ## - # pass - self.assertEqual(crypto.is_luks(_LOOP_DEV0), 0) # 0 = is luks - self.assertEqual(crypto.is_luks(_LOOP_DEV1), 0) - - ## - ## luks_add_key - ## - # pass - self.assertEqual(crypto.luks_add_key(_LOOP_DEV0, new_passphrase="another-secret", passphrase="secret"), None) - - # make another key file - handle, new_keyfile = tempfile.mkstemp(prefix="key", text=False) - os.write(handle, "area51") - os.close(handle) - - # add new key file - self.assertEqual(crypto.luks_add_key(_LOOP_DEV1, new_key_file=new_keyfile, key_file=keyfile), None) - - # fail - self.assertRaises(crypto.CryptoError, crypto.luks_add_key, _LOOP_DEV0, new_passphrase="another-secret", passphrase="wrong-passphrase") - - ## - ## luks_remove_key - ## - # fail - self.assertRaises(RuntimeError, crypto.luks_remove_key, _LOOP_DEV0, del_passphrase="another-secret", passphrase="wrong-pasphrase") - - # pass - self.assertEqual(crypto.luks_remove_key(_LOOP_DEV0, del_passphrase="another-secret", passphrase="secret"), None) - - # remove key file - self.assertEqual(crypto.luks_remove_key(LOOP_DEV1, del_key_file=new_keyfile, key_file=keyfile), None) - - ## - ## luks_open - ## - # pass - self.assertEqual(crypto.luks_open(_LOOP_DEV0, "crypted", passphrase="secret"), None) - self.assertEqual(crypto.luks_open(_LOOP_DEV1, "encrypted", key_file=keyfile), None) - - # fail - self.assertRaises(crypto.CryptoError, crypto.luks_open, "/not/existing/device", "another-crypted", passphrase="secret") - self.assertRaises(crypto.CryptoError, crypto.luks_open, "/not/existing/device", "another-crypted", key_file=keyfile) - # no passhprase or key file - self.assertRaises(ValueError, crypto.luks_open, _LOOP_DEV1, "another-crypted") - - ## - ## luks_status - ## - # pass - self.assertEqual(crypto.luks_status("crypted"), True) - self.assertEqual(crypto.luks_status("encrypted"), True) - self.assertEqual(crypto.luks_status("another-crypted"), False) - - ## - ## luks_uuid - ## - # pass - uuid = crypto.luks_uuid(_LOOP_DEV0) - self.assertEqual(crypto.luks_uuid(_LOOP_DEV0), uuid) - uuid = crypto.luks_uuid(_LOOP_DEV1) - self.assertEqual(crypto.luks_uuid(_LOOP_DEV1), uuid) - - ## - ## luks_close - ## - # pass - self.assertEqual(crypto.luks_close("crypted"), None) - self.assertEqual(crypto.luks_close("encrypted"), None) - - # fail - self.assertRaises(crypto.CryptoError, crypto.luks_close, "wrong-name") - # already closed - self.assertRaises(crypto.CryptoError, crypto.luks_close, "crypted") - self.assertRaises(crypto.CryptoError, crypto.luks_close, "encrypted") - - # cleanup - os.unlink(keyfile) - os.unlink(new_keyfile) - - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(CryptoTestCase) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/storage_test/devicelibs_test/edd_test.py b/tests/storage_test/devicelibs_test/edd_test.py deleted file mode 100644 index 449395508..000000000 --- a/tests/storage_test/devicelibs_test/edd_test.py +++ /dev/null @@ -1,212 +0,0 @@ -import mock - -class EddTestCase(mock.TestCase): - def setUp(self): - self.setupModules( - ['_isys', 'logging', 'pyanaconda.anaconda_log', 'block']) - - def tearDown(self): - self.tearDownModules() - - def test_biosdev_to_edd_dir(self): - from pyanaconda.storage.devicelibs import edd - path = edd.biosdev_to_edd_dir(138) - self.assertEqual("/sys/firmware/edd/int13_dev8a", path) - - def test_collect_edd_data(self): - from pyanaconda.storage.devicelibs import edd - - # test with vda, vdb - fs = EddTestFS(self, edd).vda_vdb() - edd_dict = edd.collect_edd_data() - self.assertEqual(len(edd_dict), 2) - self.assertEqual(edd_dict[0x80].type, "SCSI") - self.assertEqual(edd_dict[0x80].scsi_id, 0) - self.assertEqual(edd_dict[0x80].scsi_lun, 0) - self.assertEqual(edd_dict[0x80].pci_dev, "00:05.0") - self.assertEqual(edd_dict[0x80].channel, 0) - self.assertEqual(edd_dict[0x80].sectors, 16777216) - self.assertEqual(edd_dict[0x81].pci_dev, "00:06.0") - - # test with sda, vda - fs = EddTestFS(self, edd).sda_vda() - edd_dict = edd.collect_edd_data() - self.assertEqual(len(edd_dict), 2) - self.assertEqual(edd_dict[0x80].type, "ATA") - self.assertEqual(edd_dict[0x80].scsi_id, None) - self.assertEqual(edd_dict[0x80].scsi_lun, None) - self.assertEqual(edd_dict[0x80].pci_dev, "00:01.1") - self.assertEqual(edd_dict[0x80].channel, 0) - self.assertEqual(edd_dict[0x80].sectors, 2097152) - self.assertEqual(edd_dict[0x80].ata_device, 0) - self.assertEqual(edd_dict[0x80].mbr_signature, "0x000ccb01") - - def test_collect_edd_data_cciss(self): - from pyanaconda.storage.devicelibs import edd - fs = EddTestFS(self, edd).sda_cciss() - edd_dict = edd.collect_edd_data() - - self.assertEqual(edd_dict[0x80].pci_dev, None) - self.assertEqual(edd_dict[0x80].channel, None) - - def test_edd_entry_str(self): - from pyanaconda.storage.devicelibs import edd - fs = EddTestFS(self, edd).sda_vda() - edd_dict = edd.collect_edd_data() - expected_output = """\ttype: ATA, ata_device: 0 -\tchannel: 0, mbr_signature: 0x000ccb01 -\tpci_dev: 00:01.1, scsi_id: None -\tscsi_lun: None, sectors: 2097152""" - self.assertEqual(str(edd_dict[0x80]), expected_output) - - def test_matcher_device_path(self): - from pyanaconda.storage.devicelibs import edd - fs = EddTestFS(self, edd).sda_vda() - edd_dict = edd.collect_edd_data() - - analyzer = edd.EddMatcher(edd_dict[0x80]) - path = analyzer.devname_from_pci_dev() - self.assertEqual(path, "sda") - - analyzer = edd.EddMatcher(edd_dict[0x81]) - path = analyzer.devname_from_pci_dev() - self.assertEqual(path, "vda") - - def test_bad_device_path(self): - from pyanaconda.storage.devicelibs import edd - fs = EddTestFS(self, edd).sda_vda_no_pcidev() - edd_dict = edd.collect_edd_data() - - analyzer = edd.EddMatcher(edd_dict[0x80]) - path = analyzer.devname_from_pci_dev() - self.assertEqual(path, None) - - def test_bad_host_bus(self): - from pyanaconda.storage.devicelibs import edd - fs = EddTestFS(self, edd).sda_vda_no_host_bus() - - edd_dict = edd.collect_edd_data() - - # 0x80 entry is basted so fail without an exception - analyzer = edd.EddMatcher(edd_dict[0x80]) - devname = analyzer.devname_from_pci_dev() - self.assertEqual(devname, None) - - # but still succeed on 0x81 - analyzer = edd.EddMatcher(edd_dict[0x81]) - devname = analyzer.devname_from_pci_dev() - self.assertEqual(devname, "vda") - - def test_get_edd_dict_1(self): - """ Test get_edd_dict()'s pci_dev matching. """ - from pyanaconda.storage.devicelibs import edd - fs = EddTestFS(self, edd).sda_vda() - self.assertEqual(edd.get_edd_dict([]), - {'sda' : 0x80, - 'vda' : 0x81}) - - def test_get_edd_dict_2(self): - """ Test get_edd_dict()'s pci_dev matching. """ - from pyanaconda.storage.devicelibs import edd - edd.collect_mbrs = mock.Mock(return_value = { - 'sda' : '0x000ccb01', - 'vda' : '0x0006aef1'}) - fs = EddTestFS(self, edd).sda_vda_missing_details() - self.assertEqual(edd.get_edd_dict([]), - {'sda' : 0x80, - 'vda' : 0x81}) - - def test_get_edd_dict_3(self): - """ Test scenario when the 0x80 and 0x81 edd directories contain the - same data and give no way to distinguish among the two devices. - """ - from pyanaconda.storage.devicelibs import edd - edd.log = mock.Mock() - edd.collect_mbrs = mock.Mock(return_value={'sda' : '0x000ccb01', - 'vda' : '0x0006aef1'}) - fs = EddTestFS(self, edd).sda_sdb_same() - self.assertEqual(edd.get_edd_dict([]), {}) - self.assertIn((('edd: both edd entries 0x80 and 0x81 seem to map to sda',), {}), - edd.log.info.call_args_list) - -class EddTestFS(object): - def __init__(self, test_case, target_module): - self.fs = mock.DiskIO() - test_case.take_over_io(self.fs, target_module) - - def sda_vda_missing_details(self): - self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01\n" - self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev81/mbr_signature"] = "0x0006aef1\n" - - def sda_vda(self): - self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:01.1 channel: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/interface"] = "ATA device: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01\n" - self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "2097152\n" - - self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev81/host_bus"] = "PCI 00:05.0 channel: 0\n" - self.fs["/sys/firmware/edd/int13_dev81/interface"] = "SCSI id: 0 lun: 0\n" - self.fs["/sys/firmware/edd/int13_dev81/mbr_signature"] = "0x0006aef1\n" - self.fs["/sys/firmware/edd/int13_dev81/sectors"] = "16777216\n" - - self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block"] = self.fs.Dir() - self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block/sda"] = self.fs.Dir() - - self.fs["/sys/devices/pci0000:00/0000:00:05.0/virtio2/block"] = self.fs.Dir() - self.fs["/sys/devices/pci0000:00/0000:00:05.0/virtio2/block/vda"] = self.fs.Dir() - - return self.fs - - def sda_vda_no_pcidev(self): - self.sda_vda() - entries = [e for e in self.fs.fs if e.startswith("/sys/devices/pci")] - map(self.fs.os_remove, entries) - return self.fs - - def sda_vda_no_host_bus(self): - self.sda_vda() - self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:01.1 channel: \n" - self.fs.os_remove("/sys/firmware/edd/int13_dev80/mbr_signature") - self.fs.os_remove("/sys/firmware/edd/int13_dev81/mbr_signature") - - def sda_cciss(self): - self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCIX 05:00.0 channel: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/interface"] = "RAID identity_tag: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01\n" - self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "2097152\n" - - return self.fs - - def vda_vdb(self): - self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:05.0 channel: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/interface"] = "SCSI id: 0 lun: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "16777216\n" - - self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev81/host_bus"] = "PCI 00:06.0 channel: 0\n" - self.fs["/sys/firmware/edd/int13_dev81/interface"] = "SCSI id: 0 lun: 0\n" - self.fs["/sys/firmware/edd/int13_dev81/sectors"] = "4194304\n" - - return self.fs - - def sda_sdb_same(self): - self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:01.1 channel: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/interface"] = "ATA device: 0\n" - self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01" - self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "2097152\n" - - self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir() - self.fs["/sys/firmware/edd/int13_dev81/host_bus"] = "PCI 00:01.1 channel: 0\n" - self.fs["/sys/firmware/edd/int13_dev81/interface"] = "ATA device: 0\n" - self.fs["/sys/firmware/edd/int13_dev81/mbr_signature"] = "0x0006aef1" - self.fs["/sys/firmware/edd/int13_dev81/sectors"] = "2097152\n" - - self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block"] = self.fs.Dir() - self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block/sda"] = self.fs.Dir() diff --git a/tests/storage_test/devicelibs_test/lvm_test.py b/tests/storage_test/devicelibs_test/lvm_test.py deleted file mode 100755 index e639a2137..000000000 --- a/tests/storage_test/devicelibs_test/lvm_test.py +++ /dev/null @@ -1,239 +0,0 @@ -#!/usr/bin/python -import baseclass -import unittest -from mock import acceptance - -class LVMTestCase(baseclass.DevicelibsTestCase): - - def testLVM(self): - _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]] - _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]] - - import storage.devicelibs.lvm as lvm - - - @acceptance - def testLVM(self): - ## - ## pvcreate - ## - # pass - for dev, file in self._loopMap.iteritems(): - self.assertEqual(lvm.pvcreate(dev), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.pvcreate, "/not/existing/device") - - ## - ## pvresize - ## - # pass - for dev, file in self._loopMap.iteritems(): - self.assertEqual(lvm.pvresize(dev, 50), None) - self.assertEqual(lvm.pvresize(dev, 100), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.pvresize, "/not/existing/device", 50) - - ## - ## vgcreate - ## - # pass - self.assertEqual(lvm.vgcreate("test-vg", [_LOOP_DEV0, _LOOP_DEV1], 4), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", ["/not/existing/device"], 4) - # vg already exists - self.assertRaises(lvm.LVMError, lvm.vgcreate, "test-vg", [_LOOP_DEV0], 4) - # pe size must be power of 2 - self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", [_LOOP_DEV0], 5) - - ## - ## pvremove - ## - # fail - # cannot remove pv now with vg created - self.assertRaises(lvm.LVMError, lvm.pvremove, _LOOP_DEV0) - - ## - ## vgdeactivate - ## - # pass - self.assertEqual(lvm.vgdeactivate("test-vg"), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.vgdeactivate, "wrong-vg-name") - - ## - ## vgreduce - ## - # pass - self.assertEqual(lvm.vgreduce("test-vg", [_LOOP_DEV1]), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.vgreduce, "wrong-vg-name", [_LOOP_DEV1]) - self.assertRaises(lvm.LVMError, lvm.vgreduce, "test-vg", ["/not/existing/device"]) - - ## - ## vgactivate - ## - # pass - self.assertEqual(lvm.vgactivate("test-vg"), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.vgactivate, "wrong-vg-name") - - ## - ## pvinfo - ## - # pass - self.assertEqual(lvm.pvinfo(_LOOP_DEV0)["pv_name"], _LOOP_DEV0) - # no vg - self.assertEqual(lvm.pvinfo(_LOOP_DEV1)["pv_name"], _LOOP_DEV1) - - # fail - self.assertRaises(lvm.LVMError, lvm.pvinfo, "/not/existing/device") - - ## - ## vginfo - ## - # pass - self.assertEqual(lvm.vginfo("test-vg")["pe_size"], "4.00") - - # fail - self.assertRaises(lvm.LVMError, lvm.vginfo, "wrong-vg-name") - - ## - ## lvcreate - ## - # pass - self.assertEqual(lvm.lvcreate("test-vg", "test-lv", 10), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.lvcreate, "wrong-vg-name", "another-lv", 10) - - ## - ## lvdeactivate - ## - # pass - self.assertEqual(lvm.lvdeactivate("test-vg", "test-lv"), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "test-vg", "wrong-lv-name") - self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "wrong-vg-name", "test-lv") - self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "wrong-vg-name", "wrong-lv-name") - - ## - ## lvresize - ## - # pass - self.assertEqual(lvm.lvresize("test-vg", "test-lv", 60), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.lvresize, "test-vg", "wrong-lv-name", 80) - self.assertRaises(lvm.LVMError, lvm.lvresize, "wrong-vg-name", "test-lv", 80) - self.assertRaises(lvm.LVMError, lvm.lvresize, "wrong-vg-name", "wrong-lv-name", 80) - # changing to same size - self.assertRaises(lvm.LVMError, lvm.lvresize, "test-vg", "test-lv", 60) - - ## - ## lvactivate - ## - # pass - self.assertEqual(lvm.lvactivate("test-vg", "test-lv"), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.lvactivate, "test-vg", "wrong-lv-name") - self.assertRaises(lvm.LVMError, lvm.lvactivate, "wrong-vg-name", "test-lv") - self.assertRaises(lvm.LVMError, lvm.lvactivate, "wrong-vg-name", "wrong-lv-name") - - ## - ## lvs - ## - # pass - self.assertEqual(lvm.lvs("test-vg")["test-lv"]["size"], "60.00") - - # fail - self.assertRaises(lvm.LVMError, lvm.lvs, "wrong-vg-name") - - ## - ## has_lvm - ## - # pass - self.assertEqual(lvm.has_lvm(), True) - - # fail - # TODO - - ## - ## lvremove - ## - # pass - self.assertEqual(lvm.lvdeactivate("test-vg", "test-lv"), None) # is deactivation needed? - self.assertEqual(lvm.lvremove("test-vg", "test-lv"), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.lvremove, "test-vg", "wrong-lv-name") - self.assertRaises(lvm.LVMError, lvm.lvremove, "wrong-vg-name", "test-lv") - self.assertRaises(lvm.LVMError, lvm.lvremove, "wrong-vg-name", "wrong-lv-name") - # lv already removed - self.assertRaises(lvm.LVMError, lvm.lvremove, "test-vg", "test-lv") - - ## - ## vgremove - ## - # pass - self.assertEqual(lvm.vgremove("test-vg"), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.vgremove, "wrong-vg-name") - # vg already removed - self.assertRaises(lvm.LVMError, lvm.vgremove, "test-vg") - - ## - ## pvremove - ## - # pass - for dev, file in self._loopMap.iteritems(): - self.assertEqual(lvm.pvremove(dev), None) - - # fail - self.assertRaises(lvm.LVMError, lvm.pvremove, "/not/existing/device") - # pv already removed - self.assertRaises(lvm.LVMError, lvm.pvremove, _LOOP_DEV0) - - #def testGetPossiblePhysicalExtents(self): - # pass - self.assertEqual(lvm.getPossiblePhysicalExtents(4), - filter(lambda pe: pe > 4, map(lambda power: 2**power, xrange(3, 25)))) - self.assertEqual(lvm.getPossiblePhysicalExtents(100000), - filter(lambda pe: pe > 100000, map(lambda power: 2**power, xrange(3, 25)))) - - #def testGetMaxLVSize(self): - # pass - self.assertEqual(lvm.getMaxLVSize(), 16*1024**2) - - #def testSafeLVMName(self): - # pass - self.assertEqual(lvm.safeLvmName("/strange/lv*name5"), "strange_lvname5") - - #def testClampSize(self): - # pass - self.assertEqual(lvm.clampSize(10, 4), 8L) - self.assertEqual(lvm.clampSize(10, 4, True), 12L) - - #def testVGUsedSpace(self): - # TODO - pass - - #def testVGFreeSpace(self): - # TODO - pass - - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(LVMTestCase) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/storage_test/devicelibs_test/mdraid_test.py b/tests/storage_test/devicelibs_test/mdraid_test.py deleted file mode 100755 index 9083bd162..000000000 --- a/tests/storage_test/devicelibs_test/mdraid_test.py +++ /dev/null @@ -1,114 +0,0 @@ -#!/usr/bin/python -import baseclass -import unittest -import time -from mock import acceptance - -class MDRaidTestCase(baseclass.DevicelibsTestCase): - - def testMDRaid(self): - _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]] - _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]] - - import storage.devicelibs.mdraid as mdraid - - @acceptance - def testMDRaid(self): - ## - ## getRaidLevels - ## - # pass - self.assertEqual(mdraid.getRaidLevels(), mdraid.getRaidLevels()) - - ## - ## get_raid_min_members - ## - # pass - self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID0), 2) - self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID1), 2) - self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID5), 3) - self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID6), 4) - self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID10), 2) - - # fail - # unsupported raid - self.assertRaises(ValueError, mdraid.get_raid_min_members, 8) - - ## - ## get_raid_max_spares - ## - # pass - self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID0, 5), 0) - self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID1, 5), 3) - self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID5, 5), 2) - self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID6, 5), 1) - self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID10, 5), 3) - - # fail - # unsupported raid - self.assertRaises(ValueError, mdraid.get_raid_max_spares, 8, 5) - - ## - ## mdcreate - ## - # pass - self.assertEqual(mdraid.mdcreate("/dev/md0", 1, [_LOOP_DEV0, _LOOP_DEV1]), None) - # wait for raid to settle - time.sleep(2) - - # fail - self.assertRaises(mdraid.MDRaidError, mdraid.mdcreate, "/dev/md1", 1, ["/not/existing/dev0", "/not/existing/dev1"]) - - ## - ## mddeactivate - ## - # pass - self.assertEqual(mdraid.mddeactivate("/dev/md0"), None) - - # fail - self.assertRaises(mdraid.MDRaidError, mdraid.mddeactivate, "/not/existing/md") - - ## - ## mdadd - ## - # pass - # TODO - - # fail - self.assertRaises(mdraid.MDRaidError, mdraid.mdadd, "/not/existing/device") - - ## - ## mdactivate - ## - # pass - self.assertEqual(mdraid.mdactivate("/dev/md0", [_LOOP_DEV0, _LOOP_DEV1], super_minor=0), None) - # wait for raid to settle - time.sleep(2) - - # fail - self.assertRaises(mdraid.MDRaidError, mdraid.mdactivate, "/not/existing/md", super_minor=1) - # requires super_minor or uuid - self.assertRaises(ValueError, mdraid.mdactivate, "/dev/md1") - - ## - ## mddestroy - ## - # pass - # deactivate first - self.assertEqual(mdraid.mddeactivate("/dev/md0"), None) - - self.assertEqual(mdraid.mddestroy(_LOOP_DEV0), None) - self.assertEqual(mdraid.mddestroy(_LOOP_DEV1), None) - - # fail - # not a component - self.assertRaises(mdraid.MDRaidError, mdraid.mddestroy, "/dev/md0") - self.assertRaises(mdraid.MDRaidError, mdraid.mddestroy, "/not/existing/device") - - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(MDRaidTestCase) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/storage_test/devicelibs_test/mpath_test.py b/tests/storage_test/devicelibs_test/mpath_test.py deleted file mode 100755 index 5a160d981..000000000 --- a/tests/storage_test/devicelibs_test/mpath_test.py +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/python -import mock - -class MPathTestCase(mock.TestCase): - - # creating devices, user_friendly_names set to yes - output1 = """\ -create: mpathb (1ATA ST3120026AS 5M) undef ATA,ST3120026AS -size=112G features='0' hwhandler='0' wp=undef -`-+- policy='round-robin 0' prio=1 status=undef - `- 2:0:0:0 sda 8:0 undef ready running -create: mpatha (36006016092d21800703762872c60db11) undef DGC,RAID 5 -size=10G features='1 queue_if_no_path' hwhandler='1 emc' wp=undef -`-+- policy='round-robin 0' prio=2 status=undef - |- 6:0:0:0 sdb 8:16 undef ready running - `- 7:0:0:0 sdc 8:32 undef ready running\ -""" - - # listing existing devices, user_friendly_names set to yes - output2 = """\ -mpathb (3600a0b800067fcc9000001f34d23ff88) dm-1 IBM,1726-4xx FAStT -size=100G features='0' hwhandler='1 rdac' wp=rw -`-+- policy='round-robin 0' prio=-1 status=active - |- 1:0:0:0 sda 8:0 active undef running - `- 2:0:0:0 sdc 8:32 active undef running -mpatha (3600a0b800067fabc000067694d23fe6e) dm-0 IBM,1726-4xx FAStT -size=100G features='0' hwhandler='1 rdac' wp=rw -`-+- policy='round-robin 0' prio=-1 status=active - |- 1:0:0:1 sdb 8:16 active undef running - `- 2:0:0:1 sdd 8:48 active undef running -""" - - # creating devices, user_friendly_names set to no - output3 = """\ -create: 3600a0b800067fabc000067694d23fe6e undef IBM,1726-4xx FAStT -size=100G features='1 queue_if_no_path' hwhandler='1 rdac' wp=undef -`-+- policy='round-robin 0' prio=6 status=undef - |- 1:0:0:1 sdb 8:16 undef ready running - `- 2:0:0:1 sdd 8:48 undef ready running -create: 3600a0b800067fcc9000001f34d23ff88 undef IBM,1726-4xx FAStT -size=100G features='1 queue_if_no_path' hwhandler='1 rdac' wp=undef -`-+- policy='round-robin 0' prio=3 status=undef - |- 1:0:0:0 sda 8:0 undef ready running - `- 2:0:0:0 sdc 8:32 undef ready running\ -""" - - # listing existing devices, user_friendly_names set to no - output4 = """\ -3600a0b800067fcc9000001f34d23ff88 dm-1 IBM,1726-4xx FAStT -size=100G features='0' hwhandler='1 rdac' wp=rw -`-+- policy='round-robin 0' prio=-1 status=active - |- 1:0:0:0 sda 8:0 active undef running - `- 2:0:0:0 sdc 8:32 active undef running -3600a0b800067fabc000067694d23fe6e dm-0 IBM,1726-4xx FAStT -size=100G features='0' hwhandler='1 rdac' wp=rw -`-+- policy='round-robin 0' prio=-1 status=active - |- 1:0:0:1 sdb 8:16 active undef running - `- 2:0:0:1 sdd 8:48 active undef running -""" - - def setUp(self): - self.setupModules( - ['_isys', 'logging', 'anaconda_log', 'block']) - - def tearDown(self): - self.tearDownModules() - - def testParse(self): - from pyanaconda.storage.devicelibs import mpath - topology = mpath.parseMultipathOutput(self.output1) - self.assertEqual(topology, - {'mpatha':['sdb','sdc'], 'mpathb':['sda']}) - topology = mpath.parseMultipathOutput(self.output2) - self.assertEqual(topology, - {'mpathb':['sda','sdc'], 'mpatha':['sdb', 'sdd']}) - topology = mpath.parseMultipathOutput(self.output3) - self.assertEqual(topology, - {'3600a0b800067fabc000067694d23fe6e' : ['sdb','sdd'], - '3600a0b800067fcc9000001f34d23ff88' : ['sda', 'sdc']}) - topology = mpath.parseMultipathOutput(self.output4) - self.assertEqual(topology, - {'3600a0b800067fabc000067694d23fe6e' : ['sdb','sdd'], - '3600a0b800067fcc9000001f34d23ff88' : ['sda', 'sdc']}) - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(MPathTestCase) diff --git a/tests/storage_test/devicelibs_test/swap_test.py b/tests/storage_test/devicelibs_test/swap_test.py deleted file mode 100755 index 3808943cc..000000000 --- a/tests/storage_test/devicelibs_test/swap_test.py +++ /dev/null @@ -1,74 +0,0 @@ -#!/usr/bin/python -import baseclass -import unittest -from mock import acceptance - -class SwapTestCase(baseclass.DevicelibsTestCase): - - def testSwap(self): - _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]] - _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]] - - import storage.devicelibs.swap as swap - - @acceptance - def testSwap(self): - ## - ## mkswap - ## - # pass - self.assertEqual(swap.mkswap(_LOOP_DEV0, "swap"), None) - - # fail - self.assertRaises(swap.SwapError, swap.mkswap, "/not/existing/device") - - ## - ## swapon - ## - # pass - self.assertEqual(swap.swapon(_LOOP_DEV0, 1), None) - - # fail - self.assertRaises(swap.SwapError, swap.swapon, "/not/existing/device") - # not a swap partition - self.assertRaises(swap.SwapError, swap.swapon, _LOOP_DEV1) - - # pass - # make another swap - self.assertEqual(swap.mkswap(_LOOP_DEV1, "another-swap"), None) - self.assertEqual(swap.swapon(_LOOP_DEV1), None) - - ## - ## swapstatus - ## - # pass - self.assertEqual(swap.swapstatus(_LOOP_DEV0), True) - self.assertEqual(swap.swapstatus(_LOOP_DEV1), True) - - # does not fail - self.assertEqual(swap.swapstatus("/not/existing/device"), False) - - ## - ## swapoff - ## - # pass - self.assertEqual(swap.swapoff(_LOOP_DEV1), None) - - # check status - self.assertEqual(swap.swapstatus(_LOOP_DEV0), True) - self.assertEqual(swap.swapstatus(_LOOP_DEV1), False) - - self.assertEqual(swap.swapoff(_LOOP_DEV0), None) - - # fail - self.assertRaises(swap.SwapError, swap.swapoff, "/not/existing/device") - # already off - self.assertRaises(swap.SwapError, swap.swapoff, _LOOP_DEV0) - - -def suite(): - return unittest.TestLoader().loadTestsFromTestCase(SwapTestCase) - - -if __name__ == "__main__": - unittest.main() |