summaryrefslogtreecommitdiffstats
path: root/tests/storage_test/devicelibs_test
diff options
context:
space:
mode:
Diffstat (limited to 'tests/storage_test/devicelibs_test')
-rw-r--r--tests/storage_test/devicelibs_test/Makefile.am22
-rw-r--r--tests/storage_test/devicelibs_test/baseclass.py80
-rwxr-xr-xtests/storage_test/devicelibs_test/crypto_test.py135
-rw-r--r--tests/storage_test/devicelibs_test/edd_test.py212
-rwxr-xr-xtests/storage_test/devicelibs_test/lvm_test.py239
-rwxr-xr-xtests/storage_test/devicelibs_test/mdraid_test.py114
-rwxr-xr-xtests/storage_test/devicelibs_test/mpath_test.py86
-rwxr-xr-xtests/storage_test/devicelibs_test/swap_test.py74
8 files changed, 0 insertions, 962 deletions
diff --git a/tests/storage_test/devicelibs_test/Makefile.am b/tests/storage_test/devicelibs_test/Makefile.am
deleted file mode 100644
index eeeabbdc4..000000000
--- a/tests/storage_test/devicelibs_test/Makefile.am
+++ /dev/null
@@ -1,22 +0,0 @@
-# tests/storage/devicelibs/Makefile.am for anaconda
-#
-# Copyright (C) 2009 Red Hat, Inc.
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License as published
-# by the Free Software Foundation; either version 2.1 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# Author: David Cantrell <dcantrell@redhat.com>
-
-EXTRA_DIST = *.py
-
-MAINTAINERCLEANFILES = Makefile.in
diff --git a/tests/storage_test/devicelibs_test/baseclass.py b/tests/storage_test/devicelibs_test/baseclass.py
deleted file mode 100644
index 06adef989..000000000
--- a/tests/storage_test/devicelibs_test/baseclass.py
+++ /dev/null
@@ -1,80 +0,0 @@
-import unittest
-import os
-import subprocess
-
-
-def makeLoopDev(device_name, file_name):
- proc = subprocess.Popen(["dd", "if=/dev/zero", "of=%s" % file_name,
- "bs=1024", "count=102400"],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- while True:
- (out, err) = proc.communicate()
- if proc.returncode is not None:
- rc = proc.returncode
- break
- if rc:
- raise OSError, "dd failed creating the file %s" % file_name
-
- proc = subprocess.Popen(["losetup", device_name, file_name],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- while True:
- (out, err) = proc.communicate()
- if proc.returncode is not None:
- rc = proc.returncode
- break
- if rc:
- raise OSError, "losetup failed setting up the loop device %s" % device_name
-
-def removeLoopDev(device_name, file_name):
- proc = subprocess.Popen(["losetup", "-d", device_name],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- while True:
- (out, err) = proc.communicate()
- if proc.returncode is not None:
- rc = proc.returncode
- break
- if rc:
- raise OSError, "losetup failed removing the loop device %s" % device_name
-
- os.unlink(file_name)
-
-def getFreeLoopDev():
- # There's a race condition here where another process could grab the loop
- # device losetup gives us before we have time to set it up, but that's just
- # a chance we'll have to take.
- proc = subprocess.Popen(["losetup", "-f"],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- out = None
-
- while True:
- (out, err) = proc.communicate()
- if proc.returncode is not None:
- rc = proc.returncode
- out = out.strip()
- break
-
- if rc:
- raise OSError, "losetup failed to find a free device"
-
- return out
-
-class DevicelibsTestCase(unittest.TestCase):
-
- _LOOP_DEVICES = ["/tmp/test-virtdev0", "/tmp/test-virtdev1"]
-
- def __init__(self, *args, **kwargs):
- import pyanaconda.anaconda_log
- pyanaconda.anaconda_log.init()
-
- unittest.TestCase.__init__(self, *args, **kwargs)
- self._loopMap = {}
-
- def setUp(self):
- for file in self._LOOP_DEVICES:
- dev = getFreeLoopDev()
- makeLoopDev(dev, file)
- self._loopMap[file] = dev
-
- def tearDown(self):
- for (file, dev) in self._loopMap.iteritems():
- removeLoopDev(dev, file)
diff --git a/tests/storage_test/devicelibs_test/crypto_test.py b/tests/storage_test/devicelibs_test/crypto_test.py
deleted file mode 100755
index 3e9e3a667..000000000
--- a/tests/storage_test/devicelibs_test/crypto_test.py
+++ /dev/null
@@ -1,135 +0,0 @@
-#!/usr/bin/python
-import baseclass
-import unittest
-from mock import acceptance
-
-import tempfile
-import os
-
-class CryptoTestCase(baseclass.DevicelibsTestCase):
-
- def testCrypto(self):
- _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]]
- _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]]
-
- import storage.devicelibs.crypto as crypto
-
-
- @acceptance
- def testCrypto(self):
- ##
- ## is_luks
- ##
- # pass
- self.assertEqual(crypto.is_luks(_LOOP_DEV0), -22)
- self.assertEqual(crypto.is_luks("/not/existing/device"), -22)
-
- ##
- ## luks_format
- ##
- # pass
- self.assertEqual(crypto.luks_format(_LOOP_DEV0, passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256), None)
-
- # make a key file
- handle, keyfile = tempfile.mkstemp(prefix="key", text=False)
- os.write(handle, "nobodyknows")
- os.close(handle)
-
- # format with key file
- self.assertEqual(crypto.luks_format(_LOOP_DEV1, key_file=keyfile), None)
-
- # fail
- self.assertRaises(crypto.CryptoError, crypto.luks_format, "/not/existing/device", passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256)
- # no passhprase or key file
- self.assertRaises(ValueError, crypto.luks_format, _LOOP_DEV1, cipher="aes-cbc-essiv:sha256", key_size=256)
-
- ##
- ## is_luks
- ##
- # pass
- self.assertEqual(crypto.is_luks(_LOOP_DEV0), 0) # 0 = is luks
- self.assertEqual(crypto.is_luks(_LOOP_DEV1), 0)
-
- ##
- ## luks_add_key
- ##
- # pass
- self.assertEqual(crypto.luks_add_key(_LOOP_DEV0, new_passphrase="another-secret", passphrase="secret"), None)
-
- # make another key file
- handle, new_keyfile = tempfile.mkstemp(prefix="key", text=False)
- os.write(handle, "area51")
- os.close(handle)
-
- # add new key file
- self.assertEqual(crypto.luks_add_key(_LOOP_DEV1, new_key_file=new_keyfile, key_file=keyfile), None)
-
- # fail
- self.assertRaises(crypto.CryptoError, crypto.luks_add_key, _LOOP_DEV0, new_passphrase="another-secret", passphrase="wrong-passphrase")
-
- ##
- ## luks_remove_key
- ##
- # fail
- self.assertRaises(RuntimeError, crypto.luks_remove_key, _LOOP_DEV0, del_passphrase="another-secret", passphrase="wrong-pasphrase")
-
- # pass
- self.assertEqual(crypto.luks_remove_key(_LOOP_DEV0, del_passphrase="another-secret", passphrase="secret"), None)
-
- # remove key file
- self.assertEqual(crypto.luks_remove_key(LOOP_DEV1, del_key_file=new_keyfile, key_file=keyfile), None)
-
- ##
- ## luks_open
- ##
- # pass
- self.assertEqual(crypto.luks_open(_LOOP_DEV0, "crypted", passphrase="secret"), None)
- self.assertEqual(crypto.luks_open(_LOOP_DEV1, "encrypted", key_file=keyfile), None)
-
- # fail
- self.assertRaises(crypto.CryptoError, crypto.luks_open, "/not/existing/device", "another-crypted", passphrase="secret")
- self.assertRaises(crypto.CryptoError, crypto.luks_open, "/not/existing/device", "another-crypted", key_file=keyfile)
- # no passhprase or key file
- self.assertRaises(ValueError, crypto.luks_open, _LOOP_DEV1, "another-crypted")
-
- ##
- ## luks_status
- ##
- # pass
- self.assertEqual(crypto.luks_status("crypted"), True)
- self.assertEqual(crypto.luks_status("encrypted"), True)
- self.assertEqual(crypto.luks_status("another-crypted"), False)
-
- ##
- ## luks_uuid
- ##
- # pass
- uuid = crypto.luks_uuid(_LOOP_DEV0)
- self.assertEqual(crypto.luks_uuid(_LOOP_DEV0), uuid)
- uuid = crypto.luks_uuid(_LOOP_DEV1)
- self.assertEqual(crypto.luks_uuid(_LOOP_DEV1), uuid)
-
- ##
- ## luks_close
- ##
- # pass
- self.assertEqual(crypto.luks_close("crypted"), None)
- self.assertEqual(crypto.luks_close("encrypted"), None)
-
- # fail
- self.assertRaises(crypto.CryptoError, crypto.luks_close, "wrong-name")
- # already closed
- self.assertRaises(crypto.CryptoError, crypto.luks_close, "crypted")
- self.assertRaises(crypto.CryptoError, crypto.luks_close, "encrypted")
-
- # cleanup
- os.unlink(keyfile)
- os.unlink(new_keyfile)
-
-
-def suite():
- return unittest.TestLoader().loadTestsFromTestCase(CryptoTestCase)
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/tests/storage_test/devicelibs_test/edd_test.py b/tests/storage_test/devicelibs_test/edd_test.py
deleted file mode 100644
index 449395508..000000000
--- a/tests/storage_test/devicelibs_test/edd_test.py
+++ /dev/null
@@ -1,212 +0,0 @@
-import mock
-
-class EddTestCase(mock.TestCase):
- def setUp(self):
- self.setupModules(
- ['_isys', 'logging', 'pyanaconda.anaconda_log', 'block'])
-
- def tearDown(self):
- self.tearDownModules()
-
- def test_biosdev_to_edd_dir(self):
- from pyanaconda.storage.devicelibs import edd
- path = edd.biosdev_to_edd_dir(138)
- self.assertEqual("/sys/firmware/edd/int13_dev8a", path)
-
- def test_collect_edd_data(self):
- from pyanaconda.storage.devicelibs import edd
-
- # test with vda, vdb
- fs = EddTestFS(self, edd).vda_vdb()
- edd_dict = edd.collect_edd_data()
- self.assertEqual(len(edd_dict), 2)
- self.assertEqual(edd_dict[0x80].type, "SCSI")
- self.assertEqual(edd_dict[0x80].scsi_id, 0)
- self.assertEqual(edd_dict[0x80].scsi_lun, 0)
- self.assertEqual(edd_dict[0x80].pci_dev, "00:05.0")
- self.assertEqual(edd_dict[0x80].channel, 0)
- self.assertEqual(edd_dict[0x80].sectors, 16777216)
- self.assertEqual(edd_dict[0x81].pci_dev, "00:06.0")
-
- # test with sda, vda
- fs = EddTestFS(self, edd).sda_vda()
- edd_dict = edd.collect_edd_data()
- self.assertEqual(len(edd_dict), 2)
- self.assertEqual(edd_dict[0x80].type, "ATA")
- self.assertEqual(edd_dict[0x80].scsi_id, None)
- self.assertEqual(edd_dict[0x80].scsi_lun, None)
- self.assertEqual(edd_dict[0x80].pci_dev, "00:01.1")
- self.assertEqual(edd_dict[0x80].channel, 0)
- self.assertEqual(edd_dict[0x80].sectors, 2097152)
- self.assertEqual(edd_dict[0x80].ata_device, 0)
- self.assertEqual(edd_dict[0x80].mbr_signature, "0x000ccb01")
-
- def test_collect_edd_data_cciss(self):
- from pyanaconda.storage.devicelibs import edd
- fs = EddTestFS(self, edd).sda_cciss()
- edd_dict = edd.collect_edd_data()
-
- self.assertEqual(edd_dict[0x80].pci_dev, None)
- self.assertEqual(edd_dict[0x80].channel, None)
-
- def test_edd_entry_str(self):
- from pyanaconda.storage.devicelibs import edd
- fs = EddTestFS(self, edd).sda_vda()
- edd_dict = edd.collect_edd_data()
- expected_output = """\ttype: ATA, ata_device: 0
-\tchannel: 0, mbr_signature: 0x000ccb01
-\tpci_dev: 00:01.1, scsi_id: None
-\tscsi_lun: None, sectors: 2097152"""
- self.assertEqual(str(edd_dict[0x80]), expected_output)
-
- def test_matcher_device_path(self):
- from pyanaconda.storage.devicelibs import edd
- fs = EddTestFS(self, edd).sda_vda()
- edd_dict = edd.collect_edd_data()
-
- analyzer = edd.EddMatcher(edd_dict[0x80])
- path = analyzer.devname_from_pci_dev()
- self.assertEqual(path, "sda")
-
- analyzer = edd.EddMatcher(edd_dict[0x81])
- path = analyzer.devname_from_pci_dev()
- self.assertEqual(path, "vda")
-
- def test_bad_device_path(self):
- from pyanaconda.storage.devicelibs import edd
- fs = EddTestFS(self, edd).sda_vda_no_pcidev()
- edd_dict = edd.collect_edd_data()
-
- analyzer = edd.EddMatcher(edd_dict[0x80])
- path = analyzer.devname_from_pci_dev()
- self.assertEqual(path, None)
-
- def test_bad_host_bus(self):
- from pyanaconda.storage.devicelibs import edd
- fs = EddTestFS(self, edd).sda_vda_no_host_bus()
-
- edd_dict = edd.collect_edd_data()
-
- # 0x80 entry is basted so fail without an exception
- analyzer = edd.EddMatcher(edd_dict[0x80])
- devname = analyzer.devname_from_pci_dev()
- self.assertEqual(devname, None)
-
- # but still succeed on 0x81
- analyzer = edd.EddMatcher(edd_dict[0x81])
- devname = analyzer.devname_from_pci_dev()
- self.assertEqual(devname, "vda")
-
- def test_get_edd_dict_1(self):
- """ Test get_edd_dict()'s pci_dev matching. """
- from pyanaconda.storage.devicelibs import edd
- fs = EddTestFS(self, edd).sda_vda()
- self.assertEqual(edd.get_edd_dict([]),
- {'sda' : 0x80,
- 'vda' : 0x81})
-
- def test_get_edd_dict_2(self):
- """ Test get_edd_dict()'s pci_dev matching. """
- from pyanaconda.storage.devicelibs import edd
- edd.collect_mbrs = mock.Mock(return_value = {
- 'sda' : '0x000ccb01',
- 'vda' : '0x0006aef1'})
- fs = EddTestFS(self, edd).sda_vda_missing_details()
- self.assertEqual(edd.get_edd_dict([]),
- {'sda' : 0x80,
- 'vda' : 0x81})
-
- def test_get_edd_dict_3(self):
- """ Test scenario when the 0x80 and 0x81 edd directories contain the
- same data and give no way to distinguish among the two devices.
- """
- from pyanaconda.storage.devicelibs import edd
- edd.log = mock.Mock()
- edd.collect_mbrs = mock.Mock(return_value={'sda' : '0x000ccb01',
- 'vda' : '0x0006aef1'})
- fs = EddTestFS(self, edd).sda_sdb_same()
- self.assertEqual(edd.get_edd_dict([]), {})
- self.assertIn((('edd: both edd entries 0x80 and 0x81 seem to map to sda',), {}),
- edd.log.info.call_args_list)
-
-class EddTestFS(object):
- def __init__(self, test_case, target_module):
- self.fs = mock.DiskIO()
- test_case.take_over_io(self.fs, target_module)
-
- def sda_vda_missing_details(self):
- self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01\n"
- self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev81/mbr_signature"] = "0x0006aef1\n"
-
- def sda_vda(self):
- self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:01.1 channel: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/interface"] = "ATA device: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01\n"
- self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "2097152\n"
-
- self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev81/host_bus"] = "PCI 00:05.0 channel: 0\n"
- self.fs["/sys/firmware/edd/int13_dev81/interface"] = "SCSI id: 0 lun: 0\n"
- self.fs["/sys/firmware/edd/int13_dev81/mbr_signature"] = "0x0006aef1\n"
- self.fs["/sys/firmware/edd/int13_dev81/sectors"] = "16777216\n"
-
- self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block"] = self.fs.Dir()
- self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block/sda"] = self.fs.Dir()
-
- self.fs["/sys/devices/pci0000:00/0000:00:05.0/virtio2/block"] = self.fs.Dir()
- self.fs["/sys/devices/pci0000:00/0000:00:05.0/virtio2/block/vda"] = self.fs.Dir()
-
- return self.fs
-
- def sda_vda_no_pcidev(self):
- self.sda_vda()
- entries = [e for e in self.fs.fs if e.startswith("/sys/devices/pci")]
- map(self.fs.os_remove, entries)
- return self.fs
-
- def sda_vda_no_host_bus(self):
- self.sda_vda()
- self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:01.1 channel: \n"
- self.fs.os_remove("/sys/firmware/edd/int13_dev80/mbr_signature")
- self.fs.os_remove("/sys/firmware/edd/int13_dev81/mbr_signature")
-
- def sda_cciss(self):
- self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCIX 05:00.0 channel: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/interface"] = "RAID identity_tag: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01\n"
- self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "2097152\n"
-
- return self.fs
-
- def vda_vdb(self):
- self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:05.0 channel: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/interface"] = "SCSI id: 0 lun: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "16777216\n"
-
- self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev81/host_bus"] = "PCI 00:06.0 channel: 0\n"
- self.fs["/sys/firmware/edd/int13_dev81/interface"] = "SCSI id: 0 lun: 0\n"
- self.fs["/sys/firmware/edd/int13_dev81/sectors"] = "4194304\n"
-
- return self.fs
-
- def sda_sdb_same(self):
- self.fs["/sys/firmware/edd/int13_dev80"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev80/host_bus"] = "PCI 00:01.1 channel: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/interface"] = "ATA device: 0\n"
- self.fs["/sys/firmware/edd/int13_dev80/mbr_signature"] = "0x000ccb01"
- self.fs["/sys/firmware/edd/int13_dev80/sectors"] = "2097152\n"
-
- self.fs["/sys/firmware/edd/int13_dev81"] = self.fs.Dir()
- self.fs["/sys/firmware/edd/int13_dev81/host_bus"] = "PCI 00:01.1 channel: 0\n"
- self.fs["/sys/firmware/edd/int13_dev81/interface"] = "ATA device: 0\n"
- self.fs["/sys/firmware/edd/int13_dev81/mbr_signature"] = "0x0006aef1"
- self.fs["/sys/firmware/edd/int13_dev81/sectors"] = "2097152\n"
-
- self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block"] = self.fs.Dir()
- self.fs["/sys/devices/pci0000:00/0000:00:01.1/host0/target0:0:0/0:0:0:0/block/sda"] = self.fs.Dir()
diff --git a/tests/storage_test/devicelibs_test/lvm_test.py b/tests/storage_test/devicelibs_test/lvm_test.py
deleted file mode 100755
index e639a2137..000000000
--- a/tests/storage_test/devicelibs_test/lvm_test.py
+++ /dev/null
@@ -1,239 +0,0 @@
-#!/usr/bin/python
-import baseclass
-import unittest
-from mock import acceptance
-
-class LVMTestCase(baseclass.DevicelibsTestCase):
-
- def testLVM(self):
- _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]]
- _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]]
-
- import storage.devicelibs.lvm as lvm
-
-
- @acceptance
- def testLVM(self):
- ##
- ## pvcreate
- ##
- # pass
- for dev, file in self._loopMap.iteritems():
- self.assertEqual(lvm.pvcreate(dev), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.pvcreate, "/not/existing/device")
-
- ##
- ## pvresize
- ##
- # pass
- for dev, file in self._loopMap.iteritems():
- self.assertEqual(lvm.pvresize(dev, 50), None)
- self.assertEqual(lvm.pvresize(dev, 100), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.pvresize, "/not/existing/device", 50)
-
- ##
- ## vgcreate
- ##
- # pass
- self.assertEqual(lvm.vgcreate("test-vg", [_LOOP_DEV0, _LOOP_DEV1], 4), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", ["/not/existing/device"], 4)
- # vg already exists
- self.assertRaises(lvm.LVMError, lvm.vgcreate, "test-vg", [_LOOP_DEV0], 4)
- # pe size must be power of 2
- self.assertRaises(lvm.LVMError, lvm.vgcreate, "another-vg", [_LOOP_DEV0], 5)
-
- ##
- ## pvremove
- ##
- # fail
- # cannot remove pv now with vg created
- self.assertRaises(lvm.LVMError, lvm.pvremove, _LOOP_DEV0)
-
- ##
- ## vgdeactivate
- ##
- # pass
- self.assertEqual(lvm.vgdeactivate("test-vg"), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.vgdeactivate, "wrong-vg-name")
-
- ##
- ## vgreduce
- ##
- # pass
- self.assertEqual(lvm.vgreduce("test-vg", [_LOOP_DEV1]), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.vgreduce, "wrong-vg-name", [_LOOP_DEV1])
- self.assertRaises(lvm.LVMError, lvm.vgreduce, "test-vg", ["/not/existing/device"])
-
- ##
- ## vgactivate
- ##
- # pass
- self.assertEqual(lvm.vgactivate("test-vg"), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.vgactivate, "wrong-vg-name")
-
- ##
- ## pvinfo
- ##
- # pass
- self.assertEqual(lvm.pvinfo(_LOOP_DEV0)["pv_name"], _LOOP_DEV0)
- # no vg
- self.assertEqual(lvm.pvinfo(_LOOP_DEV1)["pv_name"], _LOOP_DEV1)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.pvinfo, "/not/existing/device")
-
- ##
- ## vginfo
- ##
- # pass
- self.assertEqual(lvm.vginfo("test-vg")["pe_size"], "4.00")
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.vginfo, "wrong-vg-name")
-
- ##
- ## lvcreate
- ##
- # pass
- self.assertEqual(lvm.lvcreate("test-vg", "test-lv", 10), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.lvcreate, "wrong-vg-name", "another-lv", 10)
-
- ##
- ## lvdeactivate
- ##
- # pass
- self.assertEqual(lvm.lvdeactivate("test-vg", "test-lv"), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "test-vg", "wrong-lv-name")
- self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "wrong-vg-name", "test-lv")
- self.assertRaises(lvm.LVMError, lvm.lvdeactivate, "wrong-vg-name", "wrong-lv-name")
-
- ##
- ## lvresize
- ##
- # pass
- self.assertEqual(lvm.lvresize("test-vg", "test-lv", 60), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.lvresize, "test-vg", "wrong-lv-name", 80)
- self.assertRaises(lvm.LVMError, lvm.lvresize, "wrong-vg-name", "test-lv", 80)
- self.assertRaises(lvm.LVMError, lvm.lvresize, "wrong-vg-name", "wrong-lv-name", 80)
- # changing to same size
- self.assertRaises(lvm.LVMError, lvm.lvresize, "test-vg", "test-lv", 60)
-
- ##
- ## lvactivate
- ##
- # pass
- self.assertEqual(lvm.lvactivate("test-vg", "test-lv"), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.lvactivate, "test-vg", "wrong-lv-name")
- self.assertRaises(lvm.LVMError, lvm.lvactivate, "wrong-vg-name", "test-lv")
- self.assertRaises(lvm.LVMError, lvm.lvactivate, "wrong-vg-name", "wrong-lv-name")
-
- ##
- ## lvs
- ##
- # pass
- self.assertEqual(lvm.lvs("test-vg")["test-lv"]["size"], "60.00")
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.lvs, "wrong-vg-name")
-
- ##
- ## has_lvm
- ##
- # pass
- self.assertEqual(lvm.has_lvm(), True)
-
- # fail
- # TODO
-
- ##
- ## lvremove
- ##
- # pass
- self.assertEqual(lvm.lvdeactivate("test-vg", "test-lv"), None) # is deactivation needed?
- self.assertEqual(lvm.lvremove("test-vg", "test-lv"), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.lvremove, "test-vg", "wrong-lv-name")
- self.assertRaises(lvm.LVMError, lvm.lvremove, "wrong-vg-name", "test-lv")
- self.assertRaises(lvm.LVMError, lvm.lvremove, "wrong-vg-name", "wrong-lv-name")
- # lv already removed
- self.assertRaises(lvm.LVMError, lvm.lvremove, "test-vg", "test-lv")
-
- ##
- ## vgremove
- ##
- # pass
- self.assertEqual(lvm.vgremove("test-vg"), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.vgremove, "wrong-vg-name")
- # vg already removed
- self.assertRaises(lvm.LVMError, lvm.vgremove, "test-vg")
-
- ##
- ## pvremove
- ##
- # pass
- for dev, file in self._loopMap.iteritems():
- self.assertEqual(lvm.pvremove(dev), None)
-
- # fail
- self.assertRaises(lvm.LVMError, lvm.pvremove, "/not/existing/device")
- # pv already removed
- self.assertRaises(lvm.LVMError, lvm.pvremove, _LOOP_DEV0)
-
- #def testGetPossiblePhysicalExtents(self):
- # pass
- self.assertEqual(lvm.getPossiblePhysicalExtents(4),
- filter(lambda pe: pe > 4, map(lambda power: 2**power, xrange(3, 25))))
- self.assertEqual(lvm.getPossiblePhysicalExtents(100000),
- filter(lambda pe: pe > 100000, map(lambda power: 2**power, xrange(3, 25))))
-
- #def testGetMaxLVSize(self):
- # pass
- self.assertEqual(lvm.getMaxLVSize(), 16*1024**2)
-
- #def testSafeLVMName(self):
- # pass
- self.assertEqual(lvm.safeLvmName("/strange/lv*name5"), "strange_lvname5")
-
- #def testClampSize(self):
- # pass
- self.assertEqual(lvm.clampSize(10, 4), 8L)
- self.assertEqual(lvm.clampSize(10, 4, True), 12L)
-
- #def testVGUsedSpace(self):
- # TODO
- pass
-
- #def testVGFreeSpace(self):
- # TODO
- pass
-
-
-def suite():
- return unittest.TestLoader().loadTestsFromTestCase(LVMTestCase)
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/tests/storage_test/devicelibs_test/mdraid_test.py b/tests/storage_test/devicelibs_test/mdraid_test.py
deleted file mode 100755
index 9083bd162..000000000
--- a/tests/storage_test/devicelibs_test/mdraid_test.py
+++ /dev/null
@@ -1,114 +0,0 @@
-#!/usr/bin/python
-import baseclass
-import unittest
-import time
-from mock import acceptance
-
-class MDRaidTestCase(baseclass.DevicelibsTestCase):
-
- def testMDRaid(self):
- _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]]
- _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]]
-
- import storage.devicelibs.mdraid as mdraid
-
- @acceptance
- def testMDRaid(self):
- ##
- ## getRaidLevels
- ##
- # pass
- self.assertEqual(mdraid.getRaidLevels(), mdraid.getRaidLevels())
-
- ##
- ## get_raid_min_members
- ##
- # pass
- self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID0), 2)
- self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID1), 2)
- self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID5), 3)
- self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID6), 4)
- self.assertEqual(mdraid.get_raid_min_members(mdraid.RAID10), 2)
-
- # fail
- # unsupported raid
- self.assertRaises(ValueError, mdraid.get_raid_min_members, 8)
-
- ##
- ## get_raid_max_spares
- ##
- # pass
- self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID0, 5), 0)
- self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID1, 5), 3)
- self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID5, 5), 2)
- self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID6, 5), 1)
- self.assertEqual(mdraid.get_raid_max_spares(mdraid.RAID10, 5), 3)
-
- # fail
- # unsupported raid
- self.assertRaises(ValueError, mdraid.get_raid_max_spares, 8, 5)
-
- ##
- ## mdcreate
- ##
- # pass
- self.assertEqual(mdraid.mdcreate("/dev/md0", 1, [_LOOP_DEV0, _LOOP_DEV1]), None)
- # wait for raid to settle
- time.sleep(2)
-
- # fail
- self.assertRaises(mdraid.MDRaidError, mdraid.mdcreate, "/dev/md1", 1, ["/not/existing/dev0", "/not/existing/dev1"])
-
- ##
- ## mddeactivate
- ##
- # pass
- self.assertEqual(mdraid.mddeactivate("/dev/md0"), None)
-
- # fail
- self.assertRaises(mdraid.MDRaidError, mdraid.mddeactivate, "/not/existing/md")
-
- ##
- ## mdadd
- ##
- # pass
- # TODO
-
- # fail
- self.assertRaises(mdraid.MDRaidError, mdraid.mdadd, "/not/existing/device")
-
- ##
- ## mdactivate
- ##
- # pass
- self.assertEqual(mdraid.mdactivate("/dev/md0", [_LOOP_DEV0, _LOOP_DEV1], super_minor=0), None)
- # wait for raid to settle
- time.sleep(2)
-
- # fail
- self.assertRaises(mdraid.MDRaidError, mdraid.mdactivate, "/not/existing/md", super_minor=1)
- # requires super_minor or uuid
- self.assertRaises(ValueError, mdraid.mdactivate, "/dev/md1")
-
- ##
- ## mddestroy
- ##
- # pass
- # deactivate first
- self.assertEqual(mdraid.mddeactivate("/dev/md0"), None)
-
- self.assertEqual(mdraid.mddestroy(_LOOP_DEV0), None)
- self.assertEqual(mdraid.mddestroy(_LOOP_DEV1), None)
-
- # fail
- # not a component
- self.assertRaises(mdraid.MDRaidError, mdraid.mddestroy, "/dev/md0")
- self.assertRaises(mdraid.MDRaidError, mdraid.mddestroy, "/not/existing/device")
-
-
-def suite():
- return unittest.TestLoader().loadTestsFromTestCase(MDRaidTestCase)
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/tests/storage_test/devicelibs_test/mpath_test.py b/tests/storage_test/devicelibs_test/mpath_test.py
deleted file mode 100755
index 5a160d981..000000000
--- a/tests/storage_test/devicelibs_test/mpath_test.py
+++ /dev/null
@@ -1,86 +0,0 @@
-#!/usr/bin/python
-import mock
-
-class MPathTestCase(mock.TestCase):
-
- # creating devices, user_friendly_names set to yes
- output1 = """\
-create: mpathb (1ATA ST3120026AS 5M) undef ATA,ST3120026AS
-size=112G features='0' hwhandler='0' wp=undef
-`-+- policy='round-robin 0' prio=1 status=undef
- `- 2:0:0:0 sda 8:0 undef ready running
-create: mpatha (36006016092d21800703762872c60db11) undef DGC,RAID 5
-size=10G features='1 queue_if_no_path' hwhandler='1 emc' wp=undef
-`-+- policy='round-robin 0' prio=2 status=undef
- |- 6:0:0:0 sdb 8:16 undef ready running
- `- 7:0:0:0 sdc 8:32 undef ready running\
-"""
-
- # listing existing devices, user_friendly_names set to yes
- output2 = """\
-mpathb (3600a0b800067fcc9000001f34d23ff88) dm-1 IBM,1726-4xx FAStT
-size=100G features='0' hwhandler='1 rdac' wp=rw
-`-+- policy='round-robin 0' prio=-1 status=active
- |- 1:0:0:0 sda 8:0 active undef running
- `- 2:0:0:0 sdc 8:32 active undef running
-mpatha (3600a0b800067fabc000067694d23fe6e) dm-0 IBM,1726-4xx FAStT
-size=100G features='0' hwhandler='1 rdac' wp=rw
-`-+- policy='round-robin 0' prio=-1 status=active
- |- 1:0:0:1 sdb 8:16 active undef running
- `- 2:0:0:1 sdd 8:48 active undef running
-"""
-
- # creating devices, user_friendly_names set to no
- output3 = """\
-create: 3600a0b800067fabc000067694d23fe6e undef IBM,1726-4xx FAStT
-size=100G features='1 queue_if_no_path' hwhandler='1 rdac' wp=undef
-`-+- policy='round-robin 0' prio=6 status=undef
- |- 1:0:0:1 sdb 8:16 undef ready running
- `- 2:0:0:1 sdd 8:48 undef ready running
-create: 3600a0b800067fcc9000001f34d23ff88 undef IBM,1726-4xx FAStT
-size=100G features='1 queue_if_no_path' hwhandler='1 rdac' wp=undef
-`-+- policy='round-robin 0' prio=3 status=undef
- |- 1:0:0:0 sda 8:0 undef ready running
- `- 2:0:0:0 sdc 8:32 undef ready running\
-"""
-
- # listing existing devices, user_friendly_names set to no
- output4 = """\
-3600a0b800067fcc9000001f34d23ff88 dm-1 IBM,1726-4xx FAStT
-size=100G features='0' hwhandler='1 rdac' wp=rw
-`-+- policy='round-robin 0' prio=-1 status=active
- |- 1:0:0:0 sda 8:0 active undef running
- `- 2:0:0:0 sdc 8:32 active undef running
-3600a0b800067fabc000067694d23fe6e dm-0 IBM,1726-4xx FAStT
-size=100G features='0' hwhandler='1 rdac' wp=rw
-`-+- policy='round-robin 0' prio=-1 status=active
- |- 1:0:0:1 sdb 8:16 active undef running
- `- 2:0:0:1 sdd 8:48 active undef running
-"""
-
- def setUp(self):
- self.setupModules(
- ['_isys', 'logging', 'anaconda_log', 'block'])
-
- def tearDown(self):
- self.tearDownModules()
-
- def testParse(self):
- from pyanaconda.storage.devicelibs import mpath
- topology = mpath.parseMultipathOutput(self.output1)
- self.assertEqual(topology,
- {'mpatha':['sdb','sdc'], 'mpathb':['sda']})
- topology = mpath.parseMultipathOutput(self.output2)
- self.assertEqual(topology,
- {'mpathb':['sda','sdc'], 'mpatha':['sdb', 'sdd']})
- topology = mpath.parseMultipathOutput(self.output3)
- self.assertEqual(topology,
- {'3600a0b800067fabc000067694d23fe6e' : ['sdb','sdd'],
- '3600a0b800067fcc9000001f34d23ff88' : ['sda', 'sdc']})
- topology = mpath.parseMultipathOutput(self.output4)
- self.assertEqual(topology,
- {'3600a0b800067fabc000067694d23fe6e' : ['sdb','sdd'],
- '3600a0b800067fcc9000001f34d23ff88' : ['sda', 'sdc']})
-
-def suite():
- return unittest.TestLoader().loadTestsFromTestCase(MPathTestCase)
diff --git a/tests/storage_test/devicelibs_test/swap_test.py b/tests/storage_test/devicelibs_test/swap_test.py
deleted file mode 100755
index 3808943cc..000000000
--- a/tests/storage_test/devicelibs_test/swap_test.py
+++ /dev/null
@@ -1,74 +0,0 @@
-#!/usr/bin/python
-import baseclass
-import unittest
-from mock import acceptance
-
-class SwapTestCase(baseclass.DevicelibsTestCase):
-
- def testSwap(self):
- _LOOP_DEV0 = self._loopMap[self._LOOP_DEVICES[0]]
- _LOOP_DEV1 = self._loopMap[self._LOOP_DEVICES[1]]
-
- import storage.devicelibs.swap as swap
-
- @acceptance
- def testSwap(self):
- ##
- ## mkswap
- ##
- # pass
- self.assertEqual(swap.mkswap(_LOOP_DEV0, "swap"), None)
-
- # fail
- self.assertRaises(swap.SwapError, swap.mkswap, "/not/existing/device")
-
- ##
- ## swapon
- ##
- # pass
- self.assertEqual(swap.swapon(_LOOP_DEV0, 1), None)
-
- # fail
- self.assertRaises(swap.SwapError, swap.swapon, "/not/existing/device")
- # not a swap partition
- self.assertRaises(swap.SwapError, swap.swapon, _LOOP_DEV1)
-
- # pass
- # make another swap
- self.assertEqual(swap.mkswap(_LOOP_DEV1, "another-swap"), None)
- self.assertEqual(swap.swapon(_LOOP_DEV1), None)
-
- ##
- ## swapstatus
- ##
- # pass
- self.assertEqual(swap.swapstatus(_LOOP_DEV0), True)
- self.assertEqual(swap.swapstatus(_LOOP_DEV1), True)
-
- # does not fail
- self.assertEqual(swap.swapstatus("/not/existing/device"), False)
-
- ##
- ## swapoff
- ##
- # pass
- self.assertEqual(swap.swapoff(_LOOP_DEV1), None)
-
- # check status
- self.assertEqual(swap.swapstatus(_LOOP_DEV0), True)
- self.assertEqual(swap.swapstatus(_LOOP_DEV1), False)
-
- self.assertEqual(swap.swapoff(_LOOP_DEV0), None)
-
- # fail
- self.assertRaises(swap.SwapError, swap.swapoff, "/not/existing/device")
- # already off
- self.assertRaises(swap.SwapError, swap.swapoff, _LOOP_DEV0)
-
-
-def suite():
- return unittest.TestLoader().loadTestsFromTestCase(SwapTestCase)
-
-
-if __name__ == "__main__":
- unittest.main()