summaryrefslogtreecommitdiffstats
path: root/tests/test_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_utils.py')
-rw-r--r--tests/test_utils.py580
1 files changed, 8 insertions, 572 deletions
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 0b6ccc0..c2f55b5 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
# Authors: Radostin Stoyanov <rstoyanov1@gmail.com>
#
# Copyright (C) 2017 Radostin Stoyanov
@@ -19,126 +20,16 @@
"""
Unit tests for functions defined in virtBootstrap.utils
"""
-
-from tests import unittest
-from tests import mock
-from tests import utils
-try:
- # pylint: disable=redefined-builtin
- from importlib import reload
-except ImportError:
- pass
+import unittest
+from . import utils
# pylint: disable=invalid-name
-# pylint: disable=too-many-public-methods
class TestUtils(unittest.TestCase):
"""
Ensures that functions defined in the utils module of virtBootstrap
work as expected.
"""
-
- ###################################
- # Tests for: checksum()
- ###################################
- def test_utils_checksum_return_false_on_invalid_hash(self):
- """
- Ensures that checksum() returns False if the actual and expected
- hash sum of file are not equal.
- """
- with mock.patch.multiple(utils,
- open=mock.DEFAULT,
- logger=mock.DEFAULT,
- hashlib=mock.DEFAULT) as mocked:
- path, sum_type, sum_expected = '/foo', 'sha256', 'bar'
- mocked['hashlib'].sha256.hexdigest.return_value = False
- self.assertFalse(utils.checksum(path, sum_type, sum_expected))
-
- def test_utils_checksum_return_false_if_file_could_not_be_opened(self):
- """
- Ensures that checksum() returns False if the file to be checked
- cannot be open for read.
- """
- with mock.patch.multiple(utils,
- open=mock.DEFAULT,
- logger=mock.DEFAULT,
- hashlib=mock.DEFAULT) as mocked:
- mocked['open'].side_effect = IOError()
- self.assertFalse(utils.checksum('foo', 'sha256', 'bar'))
-
- def test_utils_checksum_return_true_on_valid_hash(self):
- """
- Ensures that checksum() returns True when the actual and expected
- hash sum of file are equal.
- """
- with mock.patch.multiple(utils,
- open=mock.DEFAULT,
- logger=mock.DEFAULT,
- hashlib=mock.DEFAULT) as mocked:
- path, sum_type, sum_expected = '/foo', 'sha256', 'bar'
- mocked['hashlib'].sha256.return_value.hexdigest.return_value \
- = sum_expected
- self.assertTrue(utils.checksum(path, sum_type, sum_expected))
-
- ###################################
- # Tests for: execute()
- ###################################
- def test_utils_execute_logging_on_successful_proc_call(self):
- """
- Ensures that execute() creates log record of cmd, stdout and stderr
- when the exit code of process is 0.
- """
- with mock.patch.multiple(utils,
- logger=mock.DEFAULT,
- subprocess=mock.DEFAULT) as mocked:
- cmd = ['foo']
- output, err = 'test_out', 'test_err'
-
- mocked['subprocess'].Popen.return_value.returncode = 0
- (mocked['subprocess'].Popen.return_value
- .communicate.return_value) = (output.encode(), err.encode())
-
- utils.execute(cmd)
- mocked['logger'].debug.assert_any_call("Call command:\n%s", cmd[0])
- mocked['logger'].debug.assert_any_call("Stdout:\n%s", output)
- mocked['logger'].debug.assert_any_call("Stderr:\n%s", err)
-
- def test_utils_execute_raise_error_on_unsuccessful_proc_call(self):
- """
- Ensures that execute() raise CalledProcessError exception when the
- exit code of process is not 0.
- """
- with mock.patch('virtBootstrap.utils.subprocess.Popen') as m_popen:
- m_popen.return_value.returncode = 1
- m_popen.return_value.communicate.return_value = (b'output', b'err')
- with self.assertRaises(utils.subprocess.CalledProcessError):
- utils.execute(['foo'])
-
- ###################################
- # Tests for: safe_untar()
- ###################################
- def test_utils_safe_untar_calls_execute(self):
- """
- Ensures that safe_untar() calls execute with virt-sandbox
- command to extract source files to destination folder.
- Test for users with EUID 0 and 1000.
- """
- with mock.patch('virtBootstrap.utils.os.geteuid') as m_geteuid:
- for uid in [0, 1000]:
- m_geteuid.return_value = uid
- reload(utils)
- with mock.patch('virtBootstrap.utils.execute') as m_execute:
- src, dest = 'foo', 'bar'
- utils.safe_untar('foo', 'bar')
- cmd = ['virt-sandbox',
- '-c', utils.LIBVIRT_CONN,
- '-m', 'host-bind:/mnt=' + dest,
- '--',
- '/bin/tar', 'xf', src,
- '-C', '/mnt',
- '--exclude', 'dev/*']
- m_execute.assert_called_once_with(cmd)
-
###################################
# Tests for: bytes_to_size()
###################################
@@ -172,241 +63,6 @@ class TestUtils(unittest.TestCase):
i += 1
###################################
- # Tests for: log_layer_extract()
- ###################################
- def test_utils_log_layer_extract(self):
- """
- Ensures that log_layer_extract() updates the progress and creates
- log record with debug level.
- """
- m_progress = mock.Mock()
- layer = ['sum_type', 'sum_value', 'layer_file', 'layer_size']
- with mock.patch.multiple(utils, logger=mock.DEFAULT,
- bytes_to_size=mock.DEFAULT) as mocked:
- utils.log_layer_extract(layer, 'foo', 'bar', m_progress)
- mocked['bytes_to_size'].assert_called_once_with('layer_size')
- mocked['logger'].debug.assert_called_once()
- m_progress.assert_called_once()
-
- ###################################
- # Tests for: get_mime_type()
- ###################################
- @mock.patch('virtBootstrap.utils.subprocess.Popen')
- def test_utils_get_mime_type(self, m_popen):
- """
- Ensures that get_mime_type() returns the detected MIME type
- of /usr/bin/file.
- """
- path = "foo"
- mime = "application/x-gzip"
- stdout = ('%s: %s' % (path, mime)).encode()
- m_popen.return_value.stdout.read.return_value = stdout
- self.assertEqual(utils.get_mime_type(path), mime)
- m_popen.assert_called_once_with(
- ["/usr/bin/file", "--mime-type", path],
- stdout=utils.subprocess.PIPE
- )
-
- ###################################
- # Tests for: untar_layers()
- ###################################
- def test_utils_untar_all_layers_in_order(self):
- """
- Ensures that untar_layers() iterates through all passed layers
- in order.
- """
- layers = ['l1', 'l2', 'l3']
- layers_list = [['', '', layer] for layer in layers]
- dest_dir = '/foo'
- expected_calls = [mock.call(layer, dest_dir) for layer in layers]
- with mock.patch.multiple(utils,
- safe_untar=mock.DEFAULT,
- log_layer_extract=mock.DEFAULT) as mocked:
- utils.untar_layers(layers_list, dest_dir, mock.Mock())
- mocked['safe_untar'].assert_has_calls(expected_calls)
-
- ###################################
- # Tests for: create_qcow2()
- ###################################
- def _apply_test_to_create_qcow2(self, expected_calls, *args):
- """
- This method contains common test pattern used in the next two
- test cases.
- """
- with mock.patch.multiple(utils,
- execute=mock.DEFAULT,
- logger=mock.DEFAULT,
- get_mime_type=mock.DEFAULT) as mocked:
- mocked['get_mime_type'].return_value = 'application/x-gzip'
- utils.create_qcow2(*args)
- mocked['execute'].assert_has_calls(expected_calls)
-
- def test_utils_create_qcow2_base_layer(self):
- """
- Ensures that create_qcow2() creates base layer when
- backing_file = None.
- """
- tar_file = 'foo'
- layer_file = 'bar'
- size = '5G'
- backing_file = None
-
- expected_calls = [
- mock.call(["qemu-img", "create", "-f", "qcow2", layer_file, size]),
-
- mock.call(['virt-format',
- '--format=qcow2',
- '--partition=none',
- '--filesystem=ext3',
- '-a', layer_file]),
-
- mock.call(['guestfish',
- '-a', layer_file,
- '-m', '/dev/sda',
- 'tar-in', tar_file, '/', 'compress:gzip'])
- ]
-
- self._apply_test_to_create_qcow2(expected_calls, tar_file, layer_file,
- backing_file, size)
-
- def test_utils_create_qcow2_layer_with_backing_chain(self):
- """
- Ensures that create_qcow2() creates new layer with backing chains
- when backing_file is specified.
- """
- tar_file = 'foo'
- layer_file = 'bar'
- backing_file = 'base'
- size = '5G'
-
- expected_calls = [
- mock.call(['qemu-img', 'create',
- '-b', backing_file,
- '-f', 'qcow2',
- layer_file, size]),
-
- mock.call(['guestfish',
- '-a', layer_file,
- '-m', '/dev/sda',
- 'tar-in', tar_file, '/', 'compress:gzip'])
- ]
-
- self._apply_test_to_create_qcow2(expected_calls, tar_file, layer_file,
- backing_file, size)
-
- ###################################
- # Tests for: extract_layers_in_qcow2()
- ###################################
- def test_utils_if_all_layers_extracted_in_order_in_qcow2(self):
- """
- Ensures that extract_layers_in_qcow2() iterates through all
- layers in order.
- """
- layers = ['l1', 'l2', 'l3']
- layers_list = [['', '', layer] for layer in layers]
- dest_dir = '/foo'
-
- # Generate expected calls
- expected_calls = []
- qcow2_backing_file = None
- for index, layer in enumerate(layers):
- qcow2_layer_file = dest_dir + "/layer-%s.qcow2" % index
- expected_calls.append(
- mock.call(layer, qcow2_layer_file, qcow2_backing_file))
- qcow2_backing_file = qcow2_layer_file
-
- # Mocking out and execute
- with mock.patch.multiple(utils,
- create_qcow2=mock.DEFAULT,
- log_layer_extract=mock.DEFAULT) as mocked:
- utils.extract_layers_in_qcow2(layers_list, dest_dir, mock.Mock())
-
- # Check actual calls
- mocked['create_qcow2'].assert_has_calls(expected_calls)
-
- ###################################
- # Tests for: get_image_dir()
- ###################################
- def test_utils_getimage_dir(self):
- """
- Ensures that get_image_dir() returns path to DEFAULT_IMG_DIR
- if the no_cache argument is set to False and create it if
- does not exist.
- """
- # Perform this test for UID 0 and 1000
- for uid in [0, 1000]:
- with mock.patch('os.geteuid') as m_geteuid:
- m_geteuid.return_value = uid
- reload(utils)
- with mock.patch('os.makedirs') as m_makedirs:
- with mock.patch('os.path.exists') as m_path_exists:
- m_path_exists.return_value = False
- self.assertEqual(utils.get_image_dir(False),
- utils.DEFAULT_IMG_DIR)
- m_makedirs.assert_called_once_with(utils.DEFAULT_IMG_DIR)
-
- @mock.patch('tempfile.mkdtemp')
- def test_utils_getimage_dir_no_cache(self, m_mkdtemp):
- """
- Ensures that get_image_dir() returns temporary file path created
- by tempfile.mkdtemp.
- """
- m_mkdtemp.return_value = 'foo'
- self.assertEqual(utils.get_image_dir(True), 'foo')
- m_mkdtemp.assert_called_once()
-
- ###################################
- # Tests for: get_image_details()
- ###################################
- @mock.patch('virtBootstrap.utils.subprocess.Popen')
- def test_utils_get_image_details_raise_error_on_fail(self, m_popen):
- """
- Ensures that get_image_details() throws ValueError exception
- when stderr from skopeo is provided.
- """
- src = 'docker://foo'
- m_popen.return_value.communicate.return_value = [b'', b'Error']
- with self.assertRaises(ValueError):
- utils.get_image_details(src)
-
- @mock.patch('virtBootstrap.utils.subprocess.Popen')
- def test_utils_get_image_details_return_json_obj_on_success(self, m_popen):
- """
- Ensures that get_image_details() returns python dictionary which
- represents the data provided from stdout of skopeo when stderr
- is not present.
- """
- src = 'docker://foo'
- json_dict = {'foo': 'bar'}
- stdout = utils.json.dumps(json_dict).encode()
- m_popen.return_value.communicate.return_value = [stdout, '']
- self.assertDictEqual(utils.get_image_details(src), json_dict)
-
- def test_utils_get_image_details_all_argument_passed(self):
- """
- Ensures that get_image_details() pass all argument values to
- skopeo inspect.
- """
- src = 'docker://foo'
- raw, insecure = True, True
- username, password = 'user', 'password'
- cmd = ['skopeo', 'inspect', src,
- '--raw',
- '--tls-verify=false',
- "--creds=%s:%s" % (username, password)]
-
- with mock.patch.multiple(utils.subprocess,
- Popen=mock.DEFAULT,
- PIPE=mock.DEFAULT) as mocked:
- mocked['Popen'].return_value.communicate.return_value = [b'{}',
- b'']
- utils.get_image_details(src, raw, insecure, username, password)
-
- mocked['Popen'].assert_called_once_with(cmd,
- stdout=mocked['PIPE'],
- stderr=mocked['PIPE'])
-
- ###################################
# Tests for: is_new_layer_message()
###################################
def test_utils_is_new_layer_message(self):
@@ -459,10 +115,11 @@ class TestUtils(unittest.TestCase):
Ensures that make_async() sets O_NONBLOCK flag on PIPE.
"""
- pipe = utils.subprocess.Popen(
+ proc = utils.subprocess.Popen(
["echo"],
stdout=utils.subprocess.PIPE
- ).stdout
+ )
+ pipe = proc.stdout
fd = pipe.fileno()
F_GETFL = utils.fcntl.F_GETFL
@@ -471,36 +128,8 @@ class TestUtils(unittest.TestCase):
self.assertFalse(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK)
utils.make_async(fd)
self.assertTrue(utils.fcntl.fcntl(fd, F_GETFL) & O_NONBLOCK)
-
- ###################################
- # Tests for: read_async()
- ###################################
- def test_utils_read_async_successful_read(self):
- """
- Ensures that read_async() calls read() of passed file descriptor.
- """
- m_fd = mock.MagicMock()
- utils.read_async(m_fd)
- m_fd.read.assert_called_once()
-
- def test_utils_read_async_return_empty_str_on_EAGAIN_error(self):
- """
- Ensures that read_async() ignores EAGAIN errors and returns
- empty string.
- """
- m_fd = mock.MagicMock()
- m_fd.read.side_effect = IOError(utils.errno.EAGAIN, '')
- self.assertEqual(utils.read_async(m_fd), '')
-
- def test_utils_read_async_raise_errors(self):
- """
- Ensures that read_async() does not ignore IOError which is different
- than EAGAIN and throws an exception.
- """
- m_fd = mock.MagicMock()
- m_fd.read.side_effect = IOError()
- with self.assertRaises(IOError):
- utils.read_async(m_fd)
+ proc.wait()
+ pipe.close()
###################################
# Tests for: str2float()
@@ -512,196 +141,3 @@ class TestUtils(unittest.TestCase):
test_values = {'1': 1.0, 'test': None, '0': 0.0, '1.25': 1.25}
for test in test_values:
self.assertEqual(utils.str2float(test), test_values[test])
-
- ###################################
- # Tests for: set_root_password_in_rootfs()
- ###################################
- def test_utils_set_root_password_in_rootfs_restore_permissions(self):
- """
- Ensures that set_root_password_in_rootfs() restore shadow
- file permissions after edit.
- """
- permissions = 700
- rootfs_path = '/foo'
- shadow_file = '%s/etc/shadow' % rootfs_path
-
- m_open = mock.mock_open(read_data='')
- with mock.patch('virtBootstrap.utils.open', m_open, create=True):
- with mock.patch('virtBootstrap.utils.os') as m_os:
- m_os.stat.return_value = [permissions]
- m_os.path.join.return_value = shadow_file
- utils.set_root_password_in_rootfs(rootfs_path, 'password')
-
- expected_calls = [
- mock.call.path.join(rootfs_path, 'etc/shadow'),
- mock.call.stat(shadow_file),
- mock.call.chmod(shadow_file, 438),
- mock.call.chmod(shadow_file, permissions)
- ]
- m_os.assert_has_calls(expected_calls)
-
- def test_utils_set_root_password_in_rootfs_restore_permissions_fail(self):
- """
- Ensures that set_root_password_in_rootfs() restore shadow file
- permissions in case of failure.
- """
- permissions = 700
- rootfs_path = '/foo'
- shadow_file = '%s/etc/shadow' % rootfs_path
-
- m_open = mock.mock_open(read_data='')
- with mock.patch('virtBootstrap.utils.open', m_open, create=True):
- with mock.patch('virtBootstrap.utils.os') as m_os:
- m_os.stat.return_value = [permissions]
- m_os.path.join.return_value = shadow_file
-
- with self.assertRaises(Exception):
- m_open.side_effect = Exception
- utils.set_root_password_in_rootfs(rootfs_path, 'password')
-
- expected_calls = [
- mock.call.path.join(rootfs_path, 'etc/shadow'),
- mock.call.stat(shadow_file),
- mock.call.chmod(shadow_file, 438),
- mock.call.chmod(shadow_file, permissions)
- ]
- m_os.assert_has_calls(expected_calls)
-
- def test_utils_set_root_password_in_rootfs_store_hash(self):
- """
- Ensures that set_root_password_in_rootfs() stores the hashed
- root password in shadow file.
- """
- rootfs_path = '/foo'
- password = 'secret'
- initial_value = '!locked'
- hashed_password = 'hashed_password'
- shadow_content = '\n'.join([
- "root:%s::0:99999:7:::",
- "bin:*:17004:0:99999:7:::"
- "daemon:*:17004:0:99999:7:::",
- "adm:*:17004:0:99999:7:::"
- ])
-
- m_open = mock.mock_open(read_data=shadow_content % initial_value)
- with mock.patch('virtBootstrap.utils.open', m_open, create=True):
- with mock.patch('virtBootstrap.utils.os'):
- with mock.patch('passlib.hosts.linux_context.hash') as m_hash:
- m_hash.return_value = hashed_password
- utils.set_root_password_in_rootfs(rootfs_path, password)
-
- m_hash.assert_called_once_with(password)
- m_open().write.assert_called_once_with(shadow_content
- % hashed_password)
-
- ###################################
- # Tests for: set_root_password_in_image()
- ###################################
- @mock.patch('virtBootstrap.utils.execute')
- def test_utils_set_root_password_in_image(self, m_execute):
- """
- Ensures that set_root_password_in_image() calls virt-edit
- with correct arguments.
- """
- image, password = 'foo', 'password'
- password_hash = ('$6$rounds=656000$PaQ/H4c/k8Ix9YOM$'
- 'cyD47r9PtAE2LhnkpdbVzsiQbM0/h2S/1Bv'
- 'u/sXqUtCg.3Ijp7TQy/8tEVstxMy5k5v4mh'
- 'CGFqnVv7S6wd.Ah/')
-
- expected_call = [
- 'virt-edit',
- '-a', image, '/etc/shadow',
- '-e', 's,^root:.*?:,root:%s:,' % utils.re.escape(password_hash)]
-
- hash_function = 'virtBootstrap.utils.passlib.hosts.linux_context.hash'
- with mock.patch(hash_function) as m_hash:
- m_hash.return_value = password_hash
- utils.set_root_password_in_image(image, password)
-
- m_execute.assert_called_once_with(expected_call)
-
- ###################################
- # Tests for: set_root_password()
- ###################################
- @mock.patch('virtBootstrap.utils.set_root_password_in_rootfs')
- def test_utils_set_root_password_dir(self, m_set_root_password_in_rootfs):
- """
- Ensures that set_root_password() calls set_root_password_in_rootfs()
- when the format is set to "dir".
- """
- fmt, dest, root_password = 'dir', 'dest', 'root_password'
- utils.set_root_password(fmt, dest, root_password)
-
- m_set_root_password_in_rootfs.assert_called_once_with(
- dest, root_password
- )
-
- @mock.patch('virtBootstrap.utils.set_root_password_in_image')
- def test_utils_set_root_password_qcow2(self, m_set_root_password_in_image):
- """
- Ensures that set_root_password() calls set_root_password_in_image()
- when the format is set to "qcow2" with the path to the last
- extracted layer.
- """
- fmt, dest, root_password = 'qcow2', 'dest', 'root_password'
- layers = ['layer-0.qcow2', 'layer-1.qcow2']
-
- with mock.patch('os.listdir') as m_listdir:
- m_listdir.return_value = layers
- utils.set_root_password(fmt, dest, root_password)
-
- m_set_root_password_in_image.assert_called_once_with(
- utils.os.path.join(dest, max(layers)),
- root_password
- )
-
- ###################################
- # Tests for: write_progress()
- ###################################
- def test_utils_write_progress_fill_terminal_width(self):
- """
- Ensures that write_progress() outputs a message with length
- equal to terminal width and last symbol '\r'.
- """
- terminal_width = 120
- prog = {'status': 'status', 'value': 0}
- with mock.patch.multiple(utils,
- subprocess=mock.DEFAULT,
- sys=mock.DEFAULT) as mocked:
-
- (mocked['subprocess'].Popen.return_value.stdout
- .read.return_value) = ("20 %s" % terminal_width).encode()
-
- utils.write_progress(prog)
-
- mocked['subprocess'].Popen.assert_called_once_with(
- ["stty", "size"],
- stdout=mocked['subprocess'].PIPE
- )
- output_message = mocked['sys'].stdout.write.call_args[0][0]
- mocked['sys'].stdout.write.assert_called_once()
- self.assertEqual(len(output_message), terminal_width + 1)
- self.assertEqual(output_message[-1], '\r')
-
- def test_utils_write_progress_use_default_term_width_on_failure(self):
- """
- Ensures that write_progress() outputs a message with length equal
- to default terminal width (80) when the detecting terminal width
- has failed.
- """
- default_terminal_width = 80
- prog = {'status': 'status', 'value': 0}
- with mock.patch.multiple(utils,
- subprocess=mock.DEFAULT,
- sys=mock.DEFAULT) as mocked:
- mocked['subprocess'].Popen.side_effect = Exception()
- utils.write_progress(prog)
-
- self.assertEqual(len(mocked['sys'].stdout.write.call_args[0][0]),
- default_terminal_width + 1)
- mocked['sys'].stdout.write.assert_called_once()
-
-
-if __name__ == '__main__':
- unittest.main(exit=False)