diff options
author | Michal Minar <miminar@redhat.com> | 2013-11-20 10:07:41 +0100 |
---|---|---|
committer | Michal Minar <miminar@redhat.com> | 2013-11-29 09:26:56 +0100 |
commit | 85b821bbabf16b4f96a636ae19817941445c37b0 (patch) | |
tree | ce5b6f373dac939869782cc49c65d6b9c8750255 /src/software/test | |
parent | 35847aa0b11566154f0db5e2ef3673bccb138211 (diff) | |
download | openlmi-providers-85b821bbabf16b4f96a636ae19817941445c37b0.tar.gz openlmi-providers-85b821bbabf16b4f96a636ae19817941445c37b0.tar.xz openlmi-providers-85b821bbabf16b4f96a636ae19817941445c37b0.zip |
software: rewritten tests
Software tests now create custom testing repositories and packages.
This makes them more robust and efficient (nothing needs to be
downloaded).
Diffstat (limited to 'src/software/test')
19 files changed, 3404 insertions, 1740 deletions
diff --git a/src/software/test/README b/src/software/test/README index cb1acce..4a059bd 100644 --- a/src/software/test/README +++ b/src/software/test/README @@ -2,38 +2,47 @@ Testing ======= Tests must be run as root on machine with cimom broker. There are number of environment variables, that affect test running: - LMI_CIMOM_USERNAME - LMI_CIMOM_PASSWORD - LMI_CIMOM_BROKER - LMI_CIMOM_URL - LMI_RUN_DANGEROUS - valid values: 0, 1 - LMI_SOFTWARE_USE_CACHE - valid values: 0, 1 - - says, whether to use cache to download rpm packages, which will - be used to speed up testing - LMI_SOFTWARE_CACHE_DIR - - directory, where to store rpm packages and other information - - if not set and LMI_SOFTWARE_USE_CACHE == '1', a temporary - directory will be created and deleted, when tests are - complete + ``LMI_CIMOM_USERNAME`` + ``LMI_CIMOM_PASSWORD`` + ``LMI_CIMOM_BROKER`` + ``LMI_CIMOM_URL`` + ``LMI_RUN_DANGEROUS`` - valid values: 0, 1 + ``LMI_RUN_TEDIOUS`` - valid values: 0, 1 + ``LMI_SOFTWARE_DB_CACHE`` + Where to store the file with information about generated testing + database. If not set, temporary file will be created and whole cache + will be removed after tests are complete. If set and the file does not + yet exist its created and user may choose if he wants to preserve + generated cache for another run (with ``LMI_SOFTWARE_CLEANUP_CACHE``). + If set and the file exists, database is reused, which speeds up the + intialization. + ``LMI_SOFTWARE_CLEANUP_CACHE`` - valid values: 0, 1 + Whether to preserve generated reposities and database file (cache) for + next runs to speed them up. Defaults to False. + ``LMI_SOFTWARE_DB_CACHE`` must be set for this to have any effect. + +Dependencies +------------ + * @development-tools (rpmbuild, createrepo, etc.) Preparation ----------- -Tests are installed together with source in single python egg. They are -accessible from ``lmi.test.software`` package. +``lmi.test`` module must be on python path. The easiest way to get it there is +to export ``PYTHONPATH`` environment variable like this: :: + + export PYTHONPATH="${PATH_TO_GIT_ROOT}/src/python" Running tests ------------- -They can be run with a script ``lmi-test-software``. - Each test module can be run as a stand-alone script or with run.py script: :: - ./run.py -c --cache-dir=/var/tmp + ./run.py To pass any arguments to underlying ``unittest.main()`` function, append them after "--" swith like this: :: - ./run.py -c --cache-dir=/var/tmp -- -v TestSoftwarePackage + ./run.py -- -v TestSoftwarePackage All environment variables defined above can be overriden by command-line -arguments. +arguments. See help message of ``run.py`` script. diff --git a/src/software/test/base.py b/src/software/test/base.py deleted file mode 100644 index 6b6f7f9..0000000 --- a/src/software/test/base.py +++ /dev/null @@ -1,191 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -# -# Authors: Michal Minar <miminar@redhat.com> -# -""" -Common utilities and base class for all software tests. -""" - -import itertools -import os -import tempfile -from subprocess import check_output - -from lmi.test.lmibase import LmiTestCase -import repository -import rpmcache - -def get_pkg_files(pkg): - """ - Tries to make a the heterogenous and smallest set of test files from - package. - @param pkg must be installed package - @return list of few files installed by pkg - """ - cmd = ['rpm', '-ql', pkg.name] - output = check_output(cmd) - configs = set() - docs = set() - dirs = set() - files = set() - symlinks = set() - for fpath in output.splitlines(): #pylint: disable=E1103 - if ( len(dirs) == 0 - and not os.path.islink(fpath) - and os.path.isdir(fpath)): - dirs.add(fpath) - elif len(symlinks) == 0 and os.path.islink(fpath): - symlinks.add(fpath) - elif not os.path.islink(fpath) and os.path.isfile(fpath): - if len(configs) == 0 and rpmcache.has_pkg_config_file(pkg, fpath): - configs.add(fpath) - elif len(docs) == 0 and rpmcache.has_pkg_doc_file(pkg, fpath): - docs.add(fpath) - elif len(files) == 0: - files.add(fpath) - out = list(configs) + list(docs) + list(dirs) + list(symlinks) - if len(files) > 0 and len(docs) == 0 and len(symlinks) == 0: - out += list(files) - return out - -class SoftwareBaseTestCase(LmiTestCase): #pylint: disable=R0904 - """ - Base class for all LMI Software test classes. - """ - - CLASS_NAME = "Define in subclass" - KEYS = tuple() - - # will be filled when first needed - # it's a dictionary with items (pkg_name, [file_path1, ...]) - PKGDB_FILES = None - REPODB = None - - @classmethod - def get_pkgdb_files(cls): - """ - @return dictionary { pkg_name: ["file_path1, ...] } - """ - if cls.PKGDB_FILES is not None: - return cls.PKGDB_FILES - SoftwareBaseTestCase.PKGDB_FILES = res = dict( - (pkg.name, get_pkg_files(pkg)) for pkg in itertools.chain( - cls.safe_pkgs, cls.dangerous_pkgs)) - return res - - @classmethod - def get_repodb(cls): - """ - @return list of Repository instances - """ - if cls.REPODB is not None: - return cls.REPODB - SoftwareBaseTestCase.REPODB = res = repository.get_repo_database() - return res - - @classmethod - def needs_pkgdb(cls): - """subclass may override this, if it does not need PKGDB database""" - return True - - @classmethod - def needs_pkgdb_files(cls): - """subclass may override this, if it needs PKGDB_FILES database""" - return False - - @classmethod - def needs_repodb(cls): - """subclass may override this, if it needs REPODB database""" - return False - - def __init__(self, *args, **kwargs): - LmiTestCase.__init__(self, *args, **kwargs) - self.longMessage = True #pylint: disable=C0103 - - def install_pkg(self, pkg, newer=True, repolist=None): - """ - Use this method instead of function in rpmcache in tests. - """ - if repolist is None: - repolist = self.test_repos - return rpmcache.install_pkg(pkg, newer, repolist) - - def ensure_pkg_installed(self, pkg, newer=True, repolist=None): - """ - Use this method instead of function in rpmcache in tests. - """ - if repolist is None: - repolist = self.test_repos - return rpmcache.ensure_pkg_installed(pkg, newer, repolist) - - @classmethod - def setUpClass(cls): #pylint: disable=C0103 - LmiTestCase.setUpClass.im_func(cls) - - # TODO: make dangerous tests work reliably - # this is just a temporary solution - cls.run_dangerous = False - - cls.test_repos = os.environ.get( - 'LMI_SOFTWARE_TEST_REPOS', '').split(',') - use_cache = os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1' - cls.cache_dir = None - if use_cache: - cls.cache_dir = os.environ.get('LMI_SOFTWARE_CACHE_DIR', None) - if cls.cache_dir is None: - cls.cache_dir = tempfile.mkdtemp(suffix="software_database") - if cls.cache_dir: - cls.prev_dir = os.getcwd() - if not os.path.exists(cls.cache_dir): - os.makedirs(cls.cache_dir) - # rpm packages are expected to be in CWD - os.chdir(cls.cache_dir) - if not hasattr(cls, 'safe_pkgs') or not hasattr(cls, 'dangerous_pkgs'): - if cls.needs_pkgdb(): - safe, dangerous = rpmcache.get_pkg_database( - use_cache=use_cache, - dangerous=cls.run_dangerous, - repolist=cls.test_repos, - cache_dir=cls.cache_dir if use_cache else None) - SoftwareBaseTestCase.safe_pkgs = safe - SoftwareBaseTestCase.dangerous_pkgs = dangerous - else: - cls.safe_pkgs = [] - cls.dangerous_pkgs = [] - if cls.needs_pkgdb_files() and not hasattr(cls, 'pkgdb_files'): - for pkg in cls.dangerous_pkgs: - if not rpmcache.is_pkg_installed(pkg.name): - rpmcache.install_pkg(pkg, repolist=cls.test_repos) - SoftwareBaseTestCase.pkgdb_files = cls.get_pkgdb_files() - if cls.needs_repodb() and not hasattr(cls, 'repodb'): - SoftwareBaseTestCase.repodb = cls.get_repodb() - - @classmethod - def tearDownClass(cls): #pylint: disable=C0103 - if hasattr(cls, "repodb") and cls.repodb: - # set the enabled states to original values - for repo in cls.repodb: - if repository.is_repo_enabled(repo) != repo.status: - repository.set_repo_enabled(repo, repo.status) - if cls.run_dangerous: - for pkg in cls.dangerous_pkgs: - if rpmcache.is_pkg_installed(pkg.name): - rpmcache.remove_pkg(pkg.name) - if hasattr(cls, "prev_dir"): - os.chdir(cls.prev_dir) - diff --git a/src/software/test/package.py b/src/software/test/package.py index d7b2288..500f8b5 100644 --- a/src/software/test/package.py +++ b/src/software/test/package.py @@ -21,111 +21,191 @@ Abstraction for RPM package for test purposes. """ -import json +import subprocess import util -class Package(object): #pylint: disable=R0902 +class Package(object): """ Element of test package database. It's a container for package - informations. It contains two sets of versions for single package. - That's meant for updating tests. + informations. """ - def __init__(self, name, epoch, ver, rel, arch, repo, - **kwargs): - """ - Arguments prefixed with 'up_' are for newer package. - """ + def __init__(self, name, epoch, ver, rel, arch, repoid, + rpm_path, files=None): self._name = name - if not epoch or epoch.lower() == "(none)": + if not epoch or (isinstance(epoch, basestring) + and epoch.lower() == "(none)"): epoch = "0" - self._epoch = epoch + self._epoch = int(epoch) self._ver = ver self._rel = rel self._arch = arch - self._repo = repo - safe = kwargs.get('safe', False) - self._up_epoch = epoch if safe else kwargs.get('up_epoch', epoch) - self._up_ver = ver if safe else kwargs.get('up_ver' , ver) - self._up_rel = rel if safe else kwargs.get('up_rel' , rel) - self._up_repo = repo if safe else kwargs.get('up_repo', repo) + self._repoid = repoid + self._rpm_path = rpm_path + if files is None: + files = set() + else: + files = set(files) + self._files = files def __str__(self): return self.get_nevra() @property - def name(self): return self._name #pylint: disable=C0111,C0321 - @property - def epoch(self): return self._epoch #pylint: disable=C0111,C0321 - @property - def ver(self): return self._ver #pylint: disable=C0111,C0321 + def name(self): + """ :returns: Package name. """ + return self._name @property - def rel(self): return self._rel #pylint: disable=C0111,C0321 + def epoch(self): + """ :returns: Package epoch as an integer. """ + return self._epoch @property - def arch(self): return self._arch #pylint: disable=C0111,C0321 + def ver(self): + """ :returns: Version string of package. """ + return self._ver @property - def repo(self): return self._repo #pylint: disable=C0111,C0321 + def rel(self): + """ :returns: Release string of package. """ + return self._rel @property - def nevra(self): #pylint: disable=C0111,C0321 - return self.get_nevra(False) + def arch(self): + """ :returns: Architecture string of package. """ + return self._arch @property - def evra(self): #pylint: disable=C0111,C0321 - return self.get_evra(False) - - @property - def up_epoch(self): return self._up_epoch #pylint: disable=C0111,C0321 - @property - def up_ver(self): return self._up_ver #pylint: disable=C0111,C0321 + def repoid(self): + """ :returns: Repository id of package, where it is available. """ + return self._repoid @property - def up_rel(self): return self._up_rel #pylint: disable=C0111,C0321 - @property - def up_repo(self): return self._up_repo #pylint: disable=C0111,C0321 + def nevra(self): + """ + :returns: Nevra string of package with epoch part always present. + """ + return self.get_nevra('ALWAYS') @property - def up_nevra(self): #pylint: disable=C0111,C0321 - return self.get_nevra(True) + def evra(self): + """ + :returns: Evra string of package. That's the same as *nevra* without a + name. + """ + attrs = ('epoch', 'ver', 'rel', 'arch') + return util.make_evra(*[getattr(self, '_'+a) for a in attrs]) @property - def up_evra(self): #pylint: disable=C0111,C0321 - return self.get_evra(True) + def rpm_path(self): + """ + :returns: Absolute path to rpm package. + """ + return self._rpm_path @property - def is_safe(self): + def summary(self): """ - @return True if properties prefixed with up_ matches those without - it. In that case a package is suited for non-dangerous tests. + :returns: Package summary string. """ - return all( getattr(self, a) == getattr(self, 'up_' + a) - for a in ('epoch', 'ver', 'rel', 'repo')) + return subprocess.check_output( + ['/usr/bin/rpm', '-q', '--qf', '%{SUMMARY}', '-p', + self.rpm_path]) - def get_nevra(self, newer=True, with_epoch='NOT_ZERO'): + def get_nevra(self, with_epoch='NOT_ZERO'): """ - @newer if True, evr part is made from properties prefixed with 'up_' - @return pkg nevra string + :param string with_epoch: Says when the epoch part should appear + in resulting string. There are following possible values: + + * NOT_ZERO - epoch shall be present if it's greater than 0 + * ALWAYS - epoch will be present + * NEVER - epoch won't be present + + :returns: Package nevra string. + :rtype: string """ - if newer: - attrs = ['name', 'up_epoch', 'up_ver', 'up_rel', 'arch'] - else: - attrs = ['name', 'epoch', 'ver', 'rel', 'arch'] + attrs = ('name', 'epoch', 'ver', 'rel', 'arch') return util.make_nevra(*[getattr(self, '_'+a) for a in attrs], with_epoch=with_epoch) - def get_evra(self, newer=True): + def __contains__(self, path): + return path in self._files + def __iter__(self): + for pkg in self._files.__iter__(): + yield pkg + def __len__(self): + return len(self._files) + @property + def files(self): """ - @newer if True, evr part is made from properties prefixed with 'up_' - @return pkg nevra string + :returns: Set of files and directories installed by this package. + :rtype: set """ - attrs = [('up_' if newer else '')+a for a in ('epoch', 'ver', 'rel')] - attrs.append('arch') - return util.make_evra(*[getattr(self, '_'+a) for a in attrs]) + return self._files.copy() -class PackageEncoder(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, Package): - return obj.__dict__ - return json.JSONEncoder.default(self, obj) +def to_json(_encoder, pkg): + """ + Converts package object to dictionary which json encoder can handle. + + :param _encoder: Instance of json encoder. + :param pkg: Package object to convert. + :type pkg: :py:class:`Package` + :returns: Dictionary with package attributes. + :rtype: dictionary + """ + if not isinstance(pkg, Package): + raise TypeError("not a Package object") + return pkg.__dict__ def from_json(json_object): - if '_arch' in json_object: + """ + Constructs a package object from dictionary loaded from json text. + Inverse function to :py:func:`to_json`. + + :param dictionary json_object: Deserialized package as a dictionary. + :returns: Package object. + :rtype: :py:class:`Package` + """ + if isinstance(json_object, dict) and '_arch' in json_object: kwargs = {k[1:]: v for k, v in json_object.items()} return Package(**kwargs) return json_object +def is_pkg_installed(pkg): + """ + Check, whether package is installed. + """ + if not isinstance(pkg, Package): + return subprocess.call(["rpm", "--quiet", "-q", pkg]) == 0 + else: + cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}\n", pkg.get_nevra( + with_epoch='NEVER') ] + try: + out = subprocess.check_output(cmd).splitlines()[0] + epoch, _ = out.split(':') + if not epoch or epoch.lower() == "(none)": + epoch = "0" + return int(epoch) == int(pkg.epoch) + except subprocess.CalledProcessError: + return False + +def remove_pkg(pkg, *args): + """ + Remove package with rpm command. + + :param pkg: Either an instance of :py:class:`Package` or package name. + If it's a name, any version will be removed. Otherwise the exact + version must be installed for command to be successful. + :param list args: List of parameters for rpm command. + """ + cmd = ["rpm", "--quiet"] + list(args) + if isinstance(pkg, Package): + cmd.extend(["-e", pkg.get_nevra()]) + else: + cmd.extend(["-e", pkg]) + subprocess.call(cmd) + +def install_pkg(pkg): + """ + Install a specific package. + + :param pkg: Package object. + :type pkg: :py:class:`Package` + """ + if not isinstance(pkg, Package): + raise TypeError("pkg must be a Package instance") + subprocess.call(["rpm", "--quiet", "-i", pkg.rpm_path]) + diff --git a/src/software/test/reposetup.py b/src/software/test/reposetup.py new file mode 100644 index 0000000..93f4429 --- /dev/null +++ b/src/software/test/reposetup.py @@ -0,0 +1,658 @@ +#!/usr/bin/env python +# +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Module defining and creating testing packages and repositories for +OpenLMI Software provider. +""" + +import collections +import datetime +import functools +import json +import os +import shutil +import stat +import subprocess +import tempfile +import time +import yum + +import package +import repository +import util + +REGULAR, DOC, CONFIG, LINK, CHARDEV, BLOCKDEV, FIFO = [ + 2**i for i in range(7) ] + +#: Holds variables for yum config. +#: * ``releasever`` - version of distribution ('18' for *Fedora 18*). +#: * ``disttag`` - dist tag of distribution. This is a part of release string +#: of packages build for particular distribution +#: ('.fc18' for *Fedora 18*). +#: * ``basearch`` - target architecture ('i686', 'x86_64', etc.). +#: * ``arch`` - system architecture ('ia32e') +YumVars = collections.namedtuple('YumVars', + ('releasever', 'disttag', 'basearch', 'arch')) + +#: Dictionary with shortned package names as keys with assigned contents. +#: Content represented with recursive dictionaries where dictionaries +#: represent folders and tuples files, pipes, symlinks, etc. +#: Directories whose names end with '/' will be owned. +PKG_FILE_ENTRIES = { + 'pkg1' : { + 'usr/share/openlmi-sw-test-pkg1/' : { + 'README' : (REGULAR | DOC, 0644, 'This is a test pkg 1\n'), + 'Changelog' : (REGULAR | DOC, 0644, 'Nothing new\n'), + 'data/' : { 'file.txt' : (REGULAR, 0644, 'content\n') } + }, + 'etc/openlmi/software/test/' : { + 'dummy_config.cfg' : + (REGULAR | CONFIG, 0644, '# This is a dummy cfg file\n'), + } + }, + + 'pkg2' : { + 'usr/share/openlmi-sw-test-pkg2/' : { + 'README' : (REGULAR | DOC, 0644, 'This is a test pkg 2\n'), + 'Changelog' : (REGULAR | DOC, 0644, 'Nothing new\n'), + 'data/' : { + 'target.txt' : (REGULAR, 0644, 'Target of symlinks\n') + }, + 'symlinks/' : { + 'relative' : (LINK, 0644, '../data/target.txt'), + 'absolute' : (LINK, 0644, + 'usr/share/openlmi-sw-text-pkg1/data/target.txt') + } + }, + }, + + 'pkg3' : { + 'usr/share/openlmi-sw-test-pkg3/' : { + 'README' : (REGULAR | DOC, 0644, 'This is a test pkg 3\n'), + 'Changelog' : (REGULAR | DOC, 0644, 'Nothing new\n'), + 'fifos/' : { + 'pipe' : (FIFO, 0644, None) + }, + 'devs/' : { + 'char12' : (CHARDEV, 0644, (1, 2)), + 'char34' : (CHARDEV, 0644, (3, 4)), + 'block56' : (BLOCKDEV, 0644, (5, 6)), + 'block78' : (BLOCKDEV, 0644, (7, 8)) + } + } + }, + + 'pkg4' : { + 'usr/bin' : { + 'openlmi-sw-test-script' : + (REGULAR, 0755, '#!/bin/sh\necho "OpenLMI rules!"') + }, + 'usr/share/openlmi-sw-test-pkg4/' : { + 'README' : (REGULAR | DOC, 0644, 'This is a test pkg 4\n'), + 'Changelog' : (REGULAR | DOC, 0644, 'Nothing new\n'), + 'perms/' : { # this will be given 777 perms with the sticky bit + 'rwxrwxrwx' : (REGULAR, 0777, ''), + 'rw-rw-rw-' : (REGULAR, 0666, ''), + 'r--r--r--' : (REGULAR, 0444, ''), + 'r-xr-xr-x' : (REGULAR, 0555, ''), + 'r---w---x' : (REGULAR, 0421, ''), + 'rwSr--r--' : (REGULAR, 04644, ''), + 'rw-r-Sr--' : (REGULAR, 02644, ''), + 'rw-r--r-T' : (REGULAR, 01644, ''), + 'rwsr-sr-t' : (REGULAR, 07755, ''), + } + }, + } +} + +#: Repositories with assigned packages. +REPOS = { + 'openlmi-sw-test-repo-stable' : { + # grows by revision + 'openlmi-sw-test-pkg1-0:1.2.3-1%(disttag)s.noarch' : { + 'files' : PKG_FILE_ENTRIES['pkg1'] + }, + # grows by epoch + 'openlmi-sw-test-pkg2-1:1.2.3-1%(disttag)s.noarch' : { + 'files' : PKG_FILE_ENTRIES['pkg2'] + }, + # grows by version + 'openlmi-sw-test-pkg3-0:1.2.1-3%(disttag)s.noarch' : { + 'files' : PKG_FILE_ENTRIES['pkg3'] + }, + # no update available + 'openlmi-sw-test-pkg4-1:1.2.3-1%(disttag)s.noarch' : { + 'files' : PKG_FILE_ENTRIES['pkg4'] + } + }, + + 'openlmi-sw-test-repo-updates' : { + 'openlmi-sw-test-pkg1-0:1.2.3-2%(disttag)s.noarch' : { + 'files' : PKG_FILE_ENTRIES['pkg1'] + }, + 'openlmi-sw-test-pkg2-2:1.2.2-1%(disttag)s.noarch' : { + 'files' : PKG_FILE_ENTRIES['pkg2'] + }, + 'openlmi-sw-test-pkg3-0:1.2.2-2%(disttag)s.noarch' : { + 'files' : PKG_FILE_ENTRIES['pkg3'] + }, + # exactly the same version as in repo stable + 'openlmi-sw-test-pkg4-1:1.2.3-1%(disttag)s.noarch' : { + 'files' : PKG_FILE_ENTRIES['pkg4'] + } + }, + + 'openlmi-sw-test-repo-updates-testing' : { + 'openlmi-sw-test-pkg1-0:1.2.3-3%(disttag)s.noarch' : { + 'files' : PKG_FILE_ENTRIES['pkg1'] + }, + 'openlmi-sw-test-pkg2-3:1.2.0-1%(disttag)s.noarch' : { + 'files' : PKG_FILE_ENTRIES['pkg2'] + }, + }, + + 'openlmi-sw-test-repo-misc' : { + # this package conflicts with pkg1 (explicitely) + 'openlmi-sw-test-conflict1-0:1.2.3-1%(disttag)s.noarch' : { + 'conflicts' : ['openlmi-sw-test-pkg1'] + }, + # this package conflicts with pkg2 (by files) + 'openlmi-sw-test-conflict2-0:1.2.3-1%(disttag)s.noarch' : { + 'files' : PKG_FILE_ENTRIES['pkg1'] + }, + # this package depends on few packages above + 'openlmi-sw-test-depend1-0:1.2.3-1%(disttag)s.noarch' : { + 'requires' : ['openlmi-sw-test-pkg1', 'openlmi-sw-test-pkg2'] + }, + # same as previous one, with version requirements + 'openlmi-sw-test-depend2-0:1.2.3-1%(disttag)s.noarch' : { + 'requires' : [ + 'openlmi-sw-test-pkg1 = 1.2.3-1%(disttag)s', + 'openlmi-sw-test-pkg2 = 2:1.2.2-1%(disttag)s'] + }, + # same package as pkg2 except for arch and shared directory + 'openlmi-sw-test-pkg1-0:1.2.3-1%(disttag)s.%(basearch)s' : { + 'files' : { + 'usr/share/openlmi-sw-test-pkg2-arch/' : + PKG_FILE_ENTRIES['pkg2'].values()[0] + } + }, + # funny (but valid) version string + 'openlmi-sw-test-funny-version-0:.1bA.+~-1%(disttag)s.noarch' : { + }, + # funny (but valid) release string + 'openlmi-sw-test-funny-release-0:1.2.3-+.rel%(disttag)s.noarch' : { + }, + }, +} + +SPEC_TEMPLATE = """\ +Name: %(pkg_name)s +Epoch: %(epoch)s +Version: %(version)s +Release: %(release)s +License: LGPLv2+ +Source0: %(pkg_name)s-%(epoch)s:%(version)s-%(release)s.%(pkg_arch)s.tar.gz +%(arch_string)s +Requires: openlmi-software +%(requires_string)s +Summary: This is a test package %(pkg_name)s + +%%description +Test package for OpenLMI Software provider named %(pkg_name)s. + +%%prep +%%setup -q + +%%build +# no build necessary + +%%install +%(install_string)s + +%%files +%(files_string)s + +%%changelog +* Mon Nov 04 2013 Michal Minar <miminar@redhat.com> 0.4.0-2 +- initial commit +""" + +REPO_CONFIG_TEMPLATE = """\ +[%(repoid)s] +name=%(name)s +baseurl=file://%(repos_dir)s/%(repoid)s +gpgcheck=0 +enabled=1 +""" + +def full_repo_name(repoid): + """ + Convenient function returning full repository name in case it's in + shortened form. Full repository name starts with ``openlmi-sw-test-repo-``. + + :param string repoid: Repository id either in shortened or full form. + Must be present in ``REPOS``. + :returns: Full repository name. + :rtype: string + """ + if not repoid in REPOS: + if not repoid.startswith('openlmi-sw-test-repo-'): + repoid = 'openlmi-sw-test-repo-' + repoid + if not repoid in REPOS: + raise ValueError('invalid repoid: %s' % repoid) + return repoid + +def full_pkg_name(pkg_name): + """ + Convenient function returning full package name in case it's in + shortened form. Full package name starts with ``openlmi-sw-test-``. + + :param string pkg_name: Package name either in shortened or full form. + Must be present in one of testing repositories. + :returns: full_pkg_name + :rtype: string + """ + if not pkg_name.startswith('openlmi-sw-test-'): + pkg_name = 'openlmi-sw-test-' + pkg_name + return pkg_name + +def _unwind_files(path, files_dict): + """ + Turns package files dictionary into a list of installed files. + Called recursively until all entries from *files_dict* are handled. + + :param string path: Path where files are nested. If called the first time, + it should be ``'/'``. + :param dictionary files_dict: One of files dictionary from + ``PKG_FILE_ENTRIES`` or one with the same format. + :returns: List of absolute file paths of all package entries. + :rtype: list + """ + result = [] + for name, entry in files_dict.items(): + full_path = os.path.join(path, name) + if isinstance(entry, dict): + if name.endswith('/'): + result.append(full_path[:-1]) + result.extend(_unwind_files(full_path, entry)) + else: + result.append(full_path) + return result + +def _make_package_object(repo, pkg_nevra, rpm_path, pkg_dict): + """ + Create package object with supplied informations. RPM package needs + to be created first. + + :param repo: Instance of repository. + :param string pkg_nevra: NEVRA of package to create. + :param string rpm_path: Absolute path to rpm file. + :param dictionary pkg_dict: One of ``REPOS`` values. + :returns: Package object. + :rtype: :py:class:`package.Package` + """ + match = util.RE_NEVRA.match(pkg_nevra) + pkg = package.Package(match.group('name'), + epoch=match.group('epoch'), + ver=match.group('ver'), + rel=match.group('rel'), + arch=match.group('arch'), + repoid=repo.repoid, + rpm_path=rpm_path, + files=_unwind_files('/', pkg_dict.get('files', {}))) + return pkg + +def _write_package_directory(path, entries=None): + """ + Write contents of package to some directory. This content may then be + archived into a source tarball. + + :param string path: Path to directory where the package contents shall + be written. + :param dictionary entries: File dictionary. + """ + if entries is None: + entries = {} + os.makedirs(path) + if path.endswith('perms/'): + os.chmod(path, 0777 | stat.S_ISVTX) + for name, entry in entries.items(): + full_path = os.path.join(path, name) + if isinstance(entry, dict): + _write_package_directory(full_path, entry) + elif isinstance(entry, tuple): + (ftype, perms, content) = entry + if ftype & REGULAR: + with open(full_path, 'w') as outf: + outf.write(content) + elif ftype & LINK: + os.symlink(content, full_path) + elif ftype & (CHARDEV | BLOCKDEV): + mode = ( (stat.S_IFCHR if ftype & CHARDEV else stat.S_IFBLK) + | perms) + os.mknod(full_path, mode, os.makedev(content[0], content[1])) + elif ftype & FIFO: + os.mkfifo(full_path, perms) + else: + raise ValueError('file type 0%o unknown for file "%s"' + % (ftype, full_path)) + if not ftype & (CHARDEV | BLOCKDEV | FIFO | LINK): + os.chmod(full_path, perms) + else: + raise TypeError('enexpected entry in package dictionary: "%s"' + % repr(entry)) + +def _make_source_tarball(rpmbuild_dir, pkg_nevra, pkg_dict): + """ + Create a source tarball for rpm. + + :param string rpmbuild_dir: Path to directory where ``rpmbuild`` shall + operate. + :param string pkg_nevra: NEVRA string of package to build. + :param dictionary pkg_dict: One of values of ``REPOS`` dictionary. + :returns: Absolute file path to created tarball. + :rtype: string + """ + sources_dir = os.path.join(rpmbuild_dir, 'SOURCES') + if not os.path.exists(sources_dir): + os.mkdir(sources_dir) + match = util.RE_NEVRA.match(pkg_nevra) + tarball_path = os.path.join(sources_dir, pkg_nevra + '.tar.gz') + archive_dir = tempfile.mkdtemp() + try: + src_dir = os.path.join(archive_dir, + '%s-%s' % (match.group('name'), match.group('ver'))) + _write_package_directory(src_dir, pkg_dict.get('files', {})) + subprocess.call(['/usr/bin/tar', '-C', archive_dir, + '-czf', tarball_path, os.path.basename(src_dir)]) + return tarball_path + finally: + shutil.rmtree(archive_dir) + +def _make_spec_files_string(path, entries): + """ + Make a string used in spec file template to define a list of files + belonging to package. Called recursively. + + :param string path: Absolute path to an entry to be added. + :param dictionary entries: Dictionary representing directory content to + include in resulting list. + :returns: List of lines. Each representing one file or directory. + :rtype: list + """ + flag_to_prefix = { + DOC : '%doc ', + CONFIG : '%config(noreplace) ', + 0 : '' + } + res = [] + for name, entry in entries.items(): + full_path = os.path.join(path, name) + if isinstance(entry, dict): + if name.endswith('perms/'): + res.append('%dir %attr(1777, -, -) ' + full_path) + elif name.endswith('/'): + res.append('%dir ' + full_path) + res.extend(_make_spec_files_string(full_path, entry)) + elif isinstance(entry, tuple): + if REGULAR & entry[0]: + attrs = '%%attr(%o, -, -) ' % entry[1] + else: + attrs = '' + res.append(flag_to_prefix[entry[0] & (DOC | CONFIG)] + + attrs + full_path) + else: + raise TypeError('enexpected entry in package dictionary: "%s"' + % repr(entry)) + return res + +def _get_rpm_name_from_spec(spec_file_path): + """ + :param string spec_file_path: Absolute file path to spec file. + :returns: Name of rpm file builded out of particular spec file. + :rtype: string + """ + return subprocess.check_output(['/usr/bin/rpm', '-q', '--specfile', + spec_file_path]).splitlines()[0] + '.rpm' + +def _build_pkg(rpmbuild_dir, pkg_nevra, pkg_dict): + """ + Build RPM package. + + :param string rpmbuild_dir: Absolute path to directory where + the build takes place. + :param string pkg_nevra: Nevra string of package to build. + :param dictionary pkg_dict: One of ``REPOS`` values. + :returns: Absolute file path to created rpm. + :rtype: string + """ + config = get_yum_config() + specs_dir = os.path.join(rpmbuild_dir, 'SPECS') + build_dir = os.path.join(rpmbuild_dir, 'BUILD') + rpm_dir = os.path.join(rpmbuild_dir, 'RPMS') + if not os.path.exists(specs_dir): + os.mkdir(specs_dir) + if not os.path.exists(build_dir): + os.mkdir(build_dir) + spec_path = os.path.join(specs_dir, pkg_nevra + '.spec') + match = util.RE_NEVRA.match(pkg_nevra) + if not match: + raise ValueError('pkg_nevra is not a valid nevra: "%s"' % pkg_nevra) + with open(spec_path, 'w') as spec: + spec_args = { + "pkg_name" : match.group('name'), + "epoch" : match.group('epoch'), + "version" : match.group('ver'), + "release" : match.group('rel'), + "pkg_arch" : match.group('arch') + } + if match.group('arch') == 'noarch': + spec_args['arch_string'] = 'BuildArch: %s\n' % match.group('arch') + else: + spec_args['arch_string'] = '' + for deps in ('requires', 'conflicts'): + if deps in pkg_dict and pkg_dict[deps]: + spec_args[deps + '_string'] = "\n".join( + ('%s: %s' % (deps.capitalize(), r)) + for r in pkg_dict[deps]) % config._asdict() + else: + spec_args[deps + '_string'] = '' + spec_args['files_string'] = "\n".join( + _make_spec_files_string('/', pkg_dict.get('files', {}))) + if pkg_dict.get('files', []): + spec_args['install_string'] = 'cp -a ./* ${RPM_BUILD_ROOT}/' + else: + spec_args['install_string'] = '' + spec.write(SPEC_TEMPLATE % spec_args) + + subprocess.call(['/usr/bin/rpmbuild', '--quiet', + '-ba', # build rpm package + '-D', '_topdir %s' % rpmbuild_dir, # build in rpmbuild_dir + spec_path # from this spec + ]) + return os.path.join(rpm_dir, match.group('arch'), + _get_rpm_name_from_spec(spec_path)) + +def _make_pkg(rpmbuild_dir, pkg_nevra, pkg_dict): + """ + :returns: Absolute path to created rpm package. + :rtype: string + """ + _make_source_tarball(rpmbuild_dir, pkg_nevra, pkg_dict) + return _build_pkg(rpmbuild_dir, pkg_nevra, pkg_dict) + +def _create_repo(repos_dir, repoid, packages): + """ + :returns: Absolute path to repository configuration file. + :rtype: string + """ + config_path = os.path.join('/etc/yum.repos.d', repoid + '.repo') + repo_dir = os.path.join(repos_dir, repoid) + with open(config_path, 'w') as config_file: + config_file.write(REPO_CONFIG_TEMPLATE % { + 'repoid' : repoid, + 'repos_dir' : repos_dir, + 'name' : 'OpenLMI Software Test Repository - ' + ' '.join( + r.capitalize() + for r in repoid[len('openlmi-sw-test-repo-'):].split('-')) + }) + os.makedirs(repo_dir) + for rpm_path in packages: + shutil.move(rpm_path, repo_dir) + subprocess.call(['/usr/bin/createrepo', '--quiet', repo_dir]) + return config_path + +def get_yum_config(): + """ + :returns: Yum configuration variables for this system. + :rtype: :py:class:`YumVars` + """ + if not hasattr(get_yum_config, '_yum_vars'): + yumvar_dict = yum.YumBase().conf.yumvar + kwargs = { f: yumvar_dict[f] + for f in YumVars._fields if f in yumvar_dict} + kwargs['disttag'] = '.' + subprocess.check_output( + ['/usr/bin/rpm', '-q', '--qf', '%{RELEASE}\n' + , 'kernel']).splitlines()[0].split('.')[-1] + get_yum_config._yum_vars = YumVars(**kwargs) + return get_yum_config._yum_vars + +def make_repositories(repos_dir): + """ + :returns: ``{ repoid : (config_path, rpms), ... }`` + Where rpms is a list of pairs: + ``[ (pkg_nevra, rpm_path), ... ]`` + :rtype: dictionary + """ + rpmbuild_dir = tempfile.mkdtemp() + config = get_yum_config() + try: + # (repoid, (config_file_path, rpm_package_list)) + result = {} + for repoid, packages in REPOS.items(): + rpms = [] + for pkg_tmpl, pkg_dict in packages.items(): + pkg_nevra = pkg_tmpl % config._asdict() + rpms.append( + ( pkg_nevra + , _make_pkg(rpmbuild_dir, pkg_nevra, pkg_dict))) + result[repoid] = ( + _create_repo(repos_dir, repoid, [r for _, r in rpms]), + rpms) + finally: + shutil.rmtree(rpmbuild_dir) + return result + +def make_object_database(repos_dir): + """ + :returns: ``(test_repo_db, other_repo_db)`` where + both items have the following structure: ``{ repoid : repo, ... }``. + First item contains repositories that shall be used for testing. + Others are currently present on system and will be disabled during + tests. + :rtype: tuple + """ + repos = make_repositories(repos_dir) + db = dict((r.repoid, r) for r in repository.get_repo_database()) + result = {} + for repoid, (_config_path, rpms) in repos.items(): + repo = db.pop(repoid) + for pkg_nevra, rpm_path in rpms: + pkg_name = util.RE_NEVRA.match(pkg_nevra).group('name') + pkg_dict = [ d for p, d in REPOS[repoid].items() + if p.startswith(pkg_name)][0] + pkg = _make_package_object(repo, pkg_nevra, + os.path.join(repo.local_path, os.path.basename(rpm_path)), + pkg_dict) + repo.packages.add(pkg) + result[repoid] = repo + return (result, db) + +class ObjectEncoder(json.JSONEncoder): + """ + Takes care of encoding repository and package objects for json to handle + library to handle. + """ + def default(self, obj): + if isinstance(obj, repository.Repository): + return repository.to_json(self, obj) + elif isinstance(obj, package.Package): + return package.to_json(self, obj) + elif isinstance(obj, datetime.datetime): + return int(time.mktime(obj.timetuple())) + elif isinstance(obj, set): + return list(obj) + return json.JSONEncoder.default(self, obj) + +def save_test_database(repos_dir, test_repo_db, other_repo_db, dest_path=None): + """ + Writes package database into a file. + + :param string repos_dir: Directory were repositories are stored. + :param dictionary test_repo_db: Dictionary with testing repositories. + It's the first item returned from :py:func:`make_object_database`. + :param dictionary other_repo_db: Dictionary with other repositories. + It's the second item returned from :py:func:`make_object_database`. + :param string dest_path: Absolute file path, where the given objects will be + serialized. If ``None``, some random, temporary file will be created. + :returns: File path of written data. + :rtype: string + """ + if dest_path is None: + dest_path = tempfile.mkstemp() + db_file = os.fdopen(dest_path[0], 'w') + dest_path = dest_path[1] + else: + db_file = open(dest_path, 'w') + try: + data = (repos_dir, test_repo_db, other_repo_db) + json.dump(data, db_file, cls=ObjectEncoder, + sort_keys=True, indent=4, separators=(',', ': ')) + finally: + db_file.close() + return dest_path + +def load_test_database(src_path): + """ + This is inverse function to :py:func:`save_pkg_database`. + + :returns: ``(repos_dir, test_repo_db, other_repo_db)`` package lists loaded + from file. + :rtype: tuple + """ + with open(src_path, 'r') as db_file: + repos_dir, test_db, other_db = json.load( + db_file, + object_hook=functools.partial( + repository.from_json, package.from_json)) + return repos_dir, test_db, other_db + +def main(): + repos_dir = tempfile.mkdtemp() + print "created repos dir: %s" % repos_dir + db = make_object_database(repos_dir) + import pprint + pprint.pprint(db) + +if __name__ == '__main__': + main() + diff --git a/src/software/test/repository.py b/src/software/test/repository.py index bb9b068..d963d43 100644 --- a/src/software/test/repository.py +++ b/src/software/test/repository.py @@ -23,6 +23,9 @@ Abstraction of YUM repository for test purposes. from collections import defaultdict, namedtuple from datetime import datetime +import functools +import inspect +import json import os import re from subprocess import call, check_output @@ -40,23 +43,207 @@ RE_REPO_CONFIG = re.compile( RE_REPO_ENABLED = re.compile( r'^enabled\s*=\s*(true|false|0|1|yes|no)\s*$', re.I | re.M) -Repository = namedtuple("Repository", #pylint: disable=C0103 - # repo properties - [ "repoid" - , "name" - , "status" - , "revision" - , "tags" - , "last_updated" - , "pkg_count" - , "base_urls" - , "metalink" - , "mirror_list" - , "filename" - , "cost" - , "gpg_check" - , "repo_gpg_check" - ]) +class Repository(object): + + def __init__(self, repoid, name, status, revision, + tags, last_updated, pkg_count, base_urls, + metalink, mirror_list, filename, cost, timeout, + gpg_check, repo_gpg_check, metadata_expire, + config_path=None, packages=None): + for attr in ('repoid', 'name', 'tags', 'metalink', 'filename'): + setattr(self, attr, locals()[attr]) + for attr in ('gpg_check', 'repo_gpg_check', 'cost', 'status', + 'revision', 'last_updated', 'pkg_count', + 'base_urls', 'mirror_list', 'timeout', 'metadata_expire'): + setattr(self, '_'+attr, None) + self.base_urls = base_urls + self.cost = cost + self.gpg_check = gpg_check + self.last_updated = last_updated + self.metadata_expire = metadata_expire + self.mirror_list = mirror_list + self.pkg_count = pkg_count + self.repo_gpg_check = repo_gpg_check + self.revision = revision + self.status = status + self.timeout = timeout + if config_path is None: + repos_dir = os.environ.get('LMI_SOFTWARE_YUM_REPOS_DIR', + '/etc/yum.repos.d') + config_path = os.path.join(repos_dir, '%s.repo' % self.repoid) + self.config_path = config_path + if packages is None: + packages = set() + self._packages = set(packages) + + @property + def base_urls(self): + return self._base_urls + @base_urls.setter + def base_urls(self, base_urls): + if not isinstance(base_urls, list): + raise TypeError('base_urls must be a list') + self._base_urls = base_urls + + @property + def local_path(self): + if ( len(self._base_urls) == 1 + and self._base_urls[0].startswith('file://')): + return self._base_urls[0][len('file://'):] + + @property + def cost(self): + return self._cost + @cost.setter + def cost(self, cost): + self._cost = int(cost) + + @property + def gpg_check(self): + return self._gpg_check + @gpg_check.setter + def gpg_check(self, value): + self._gpg_check = bool(value) + + @property + def last_updated(self): + return self._last_updated + @last_updated.setter + def last_updated(self, last_updated): + if isinstance(last_updated, int): + last_updated = datetime.fromtimestamp(last_updated) + if last_updated is not None and not isinstance(last_updated, datetime): + raise TypeError('last_updated must be a datetime object') + self._last_updated = last_updated + + @property + def metadata_expire(self): + return self._metadata_expire + @metadata_expire.setter + def metadata_expire(self, metadata_expire): + if not isinstance(metadata_expire, (int, long, basestring)): + raise TypeError('metadata_expire must be either integer or datetime') + self._metadata_expire = ( + int(metadata_expire.strip().split(' ')[0]) + if isinstance(metadata_expire, basestring) + else metadata_expire) + + @property + def mirror_list(self): + return self._mirror_list + @mirror_list.setter + def mirror_list(self, mirror_list): + if mirror_list is not None and not isinstance(mirror_list, basestring): + raise TypeError('mirror list must be a string') + if not mirror_list: + mirror_list = None + self._mirror_list = mirror_list + + @property + def packages(self): + return self._packages + + @property + def pkg_count(self): + return self._pkg_count + @pkg_count.setter + def pkg_count(self, pkg_count): + self._pkg_count = None if pkg_count is None else int(pkg_count) + + @property + def repo_gpg_check(self): + return self._repo_gpg_check + @repo_gpg_check.setter + def repo_gpg_check(self, value): + self._repo_gpg_check = bool(value) + + @property + def revision(self): + return self._revision + @revision.setter + def revision(self, value): + self._revision = None if value is None else int(value) + + @property + def status(self): + return self._status + @status.setter + def status(self, value): + self._status = bool(value) + + @property + def timeout(self): + return self._timeout + @timeout.setter + def timeout(self, timeout): + self._timeout = float(timeout) + + def __getitem__(self, pkg_name): + pkg_dict = {p.name : p for p in self._packages} + try: + return pkg_dict[pkg_name] + except KeyError: + try: + return pkg_dict['openlmi-sw-test-' + pkg_name] + except KeyError: + raise KeyError(pkg_name) + + def refresh(self): + updated_attrs = set() + for attr, value in _parse_repo_file(self.repoid).items(): + try: + setattr(self, attr, value) + updated_attrs.add(attr) + except AttributeError: + print 'can\'t set attribute "%s"' % attr + if not self.status: + for attr in ('revision', 'tags', 'last_updated'): + if attr not in updated_attrs and getattr(self, attr) is not None: + setattr(self, attr, None) + +def to_json(encoder, repo): + """ + Converts repository object to dictionary which json encoder can handle. + + :param encoder: Instance of json encoder. + :param repo: Package object to convert. + :type repo: :py:class:`Repository` + :returns: Dictionary with repository attributes. + :rtype: dictionary + """ + if not isinstance(repo, Repository): + raise TypeError("not a repo object") + def transform_value(val): + if not isinstance(val, (int, long, float, tuple, list, + dict, bool, basestring)) and val is not None: + return encoder.default(val) + if isinstance(val, (list, tuple)): + return type(val)((transform_value(v) for v in val)) + return val + argspec = inspect.getargspec(repo.__init__) + return { a: transform_value(getattr(repo, a)) for a in argspec.args[1:] } + +def from_json(package_deserializer, json_object): + """ + Constructs a repository object from dictionary loaded from json text. + Inverse function to :py:func:`to_json`. + + :param callable package_deserializer: Function taking encoder and package + dictionary as arguments, returning new package object. + :param dictionary json_object: Deserialized repository as a dictionary. + :returns: Package object. + :rtype: :py:class:`Package` + """ + if 'gpg_check' in json_object: + kwargs = {} + for key, value in json_object.items(): + if isinstance(value, (tuple, list)): + value = type(value)((package_deserializer(v) for v in value)) + else: + value = package_deserializer(value) + kwargs[key] = value + return Repository(**kwargs) + return json_object # example of repo information #Repo-id : updates-testing/18/x86_64 @@ -73,9 +260,12 @@ Repository = namedtuple("Repository", #pylint: disable=C0103 def _parse_repo_file(repo_name): """ - Parse output of yum-config-manager command for single repository. - @return dictionary with key-value pairs suitable for passing to Repository - constructor. + Parse output of ``yum-config-manager`` command for single repository. + + :param string repo_name: Repository id. + :returns: Dictionary with key-value pairs suitable for passing to + :py:class:`Repository` constructor. + :rtype: dictionary """ cmd = ["yum-config-manager", repo_name] out = check_output(cmd).decode('utf-8') @@ -93,17 +283,23 @@ def _parse_repo_file(repo_name): result[tag] = int(value) elif tag == "cost": result[tag] = int(value) - else: + elif tag == 'baseurl': + result['base_urls'] = [value] + elif tag == 'enabled': + result['status'] = value.lower() in ('true', 'yes', 1) + elif tag == 'mirrorlist': + result['mirror_list'] = value + elif tag in {p for p in Repository.__dict__ if not p.startswith('_')}: continue return result -def make_repo(repo_info): +def make_repo(repo_info, packages=None): """ Makes a Repository instance from string dumped by yum repoinfo command. """ metadata = defaultdict(lambda : None) metadata["base_urls"] = [] - fields = set(Repository._fields) + argspec = inspect.getargspec(Repository.__init__) for match in RE_REPO_TAG.finditer(repo_info): tag = match.group('tag') value = match.group('value') @@ -116,7 +312,7 @@ def make_repo(repo_info): , 'mirrors' : 'mirror_list' }[tag] except KeyError: - if tag not in fields: + if tag not in argspec.args: continue # unexpeted, or unwanted tag found new_tag = tag if new_tag == 'repoid': @@ -135,26 +331,41 @@ def make_repo(repo_info): metadata[new_tag] = int(value) else: metadata[new_tag] = value + if packages is None: + packages = set() + metadata['packages'] = packages config_items = _parse_repo_file(metadata['repoid']) - for field in Repository._fields: + for field in argspec.args[1:]: # leave out self argument if not field in metadata: metadata[field] = config_items.get(field, None) - return Repository(**dict((f, metadata[f]) for f in Repository._fields)) + return Repository(**metadata) + +def get_repo_list(kind='all'): + if kind.lower() not in ('all', 'enabled', 'disabled'): + raise ValueError('kind must be on of {"all", "enabled", "disabled"}') + cmd = ["yum", "-q", "repoinfo", "all"] + return list( + RE_REPO_TAG.search(m.group(1)).group('value').split('/')[0] + for m in RE_REPO_INFOS.finditer( + check_output(cmd).decode('utf-8'))) def get_repo_database(): """ - @return list of Repository instances for all configured repositories. + :returns: List of repository objects for all configured repositories. + :rtype: list """ result = [] cmd = ["yum", "-q", "repoinfo", "all"] - repo_infos = check_output(cmd, env={'LANG': 'C'}).decode('utf-8') + repo_infos = check_output(cmd).decode('utf-8') for match in RE_REPO_INFOS.finditer(repo_infos): result.append(make_repo(match.group(1))) return result def is_repo_enabled(repo): """ - @return True, if repository is enabled + :param repo: Either a repository id or instance of :py:class:`Repository`. + :returns: Whether the repository is enabled. + :rtype: boolean """ if isinstance(repo, Repository): repo = repo.repoid @@ -166,6 +377,9 @@ def is_repo_enabled(repo): def set_repo_enabled(repo, enable): """ Enables or disables repository in its configuration file. + + :param repo: Eiether a repository id or instance of :py:class:`Repository`. + :param boolean enable: New state of repository. """ if isinstance(repo, Repository): repo = repo.repoid diff --git a/src/software/test/rpmcache.py b/src/software/test/rpmcache.py deleted file mode 100644 index b4be748..0000000 --- a/src/software/test/rpmcache.py +++ /dev/null @@ -1,541 +0,0 @@ -#!/usr/bin/python -# -*- Coding:utf-8 -*- -# -# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. # -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -# -# Authors: Radek Novacek <rnovacek@redhat.com> -# Authors: Michal Minar <miminar@redhat.com> - -""" -Creation and manipulation utilities with rpm cache for software tests. -""" - -import copy -import os -import json -import random -import re -from collections import defaultdict -from subprocess import call, check_output, CalledProcessError - -import util -from package import Package, PackageEncoder, from_json - -DB_BACKUP_FILE = 'lmi_software_test_cache' - -RE_AVAIL_PKG = re.compile( - r'^(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)' - r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)' - r'-(?P<rel>[a-zA-Z0-9_.]+)\s+' - r'(?P<repo>[a-zA-Z0-9_-]+)\s*$', re.MULTILINE) -# this won't match the last entry, unless "package\n" is not appended -# at the end of the string -RE_PKG_DEPS = re.compile( - r'^package:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)' - r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)' - r'-(?P<rel>[a-zA-Z0-9_.]+)\s+(?P<dep_list>.*?)' - r'(?=^package|\Z)', re.MULTILINE | re.DOTALL) -RE_DEPS_PROVIDERS = re.compile( - r'^\s*dependency:\s*(?P<dependency>.+?)\s*' - r'(?P<providers>(^\s+unsatisfied\s+dependency$)|(^\s+provider:.+?$)+)', - re.MULTILINE | re.IGNORECASE) -RE_DEPS_UNSATISFIED = re.compile(r'^\s+unsatisfied\s+dep.*$', re.IGNORECASE) -RE_PROVIDER = re.compile( - r'^\s+provider:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)' - r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)' - r'-(?P<rel>[a-zA-Z0-9_.]+)\s*$', re.IGNORECASE | re.MULTILINE) -RE_PKG_INFO = re.compile( - r'^Name\s*:\s*(?P<name>[^\s]+).*?' - r'^(Epoch\s*:\s*(?P<epoch>[0-9]+)\s+)?' - r'^Version\s*:\s*(?P<ver>[a-zA-Z0-9._+-]+)\s+' - r'^Release\s*:\s*(?P<rel>[^\s]+)\s+.*?' - r'^Size\s*:\s*(?P<size>\d+(\.\d+)?)( *(?P<units>[kMG]))?', - re.MULTILINE | re.DOTALL | re.IGNORECASE) -# matching packages won't be selected for dangerous tests -RE_AVAILABLE_EXCLUDE = re.compile( - r'^(kernel|tog-pegasus|sblim|.*openlmi).*') - -# Maximum number of packages, that will be selected for testing / 2 -# There are 2 sets of test packages (safe and dangerous). When running -# in dangerous mode, the resulting number will be twice as much. -MAX_PKG_DB_SIZE = 3 -# step used to iterate over package names used to check for thery dependencies -# it's a number of packages, that will be passed to yum command at once -PKG_DEPS_ITER_STEP = 50 - -class InvalidTestCache(Exception): - """Exception saying, that rpm test cache is not valiid.""" - pass -class MissingRPM(InvalidTestCache): - """ - Raised, when requested rpm for package is not contained in - rpm test cache. - """ - def __init__(self, pkg_name): - InvalidTestCache.__init__(self, - "Missing package '%s' in test cache!"%pkg_name) - -def _is_sane_package(pkg_name): - """ - :returns: Whether the given package can be included among tested ones. - :rtype: boolean - """ - return RE_AVAILABLE_EXCLUDE.match(pkg_name) is None - -def _match_nevr(match): - """ - @param match is a regexp match object with parsed rpm package - @return tuple (name, epoch, version, release) - """ - epoch = match.group('epoch') - return ( match.group('name') - , epoch if epoch and epoch.lower() != "(none)" else "0" - , match.group('ver') - , match.group('rel')) - -def _match2pkg(match, safe=False): - """ - @param match is a regexp match object with attributes: - name, epoch, version, release, arch - @param safe if True, the packe will have up_* properties equal to - non-prefixed ones, otherwise they will be set to None - @return instance of Package - """ - kwargs = {} - if safe is True: - kwargs['safe'] = True - else: - for attr in ('epoch', 'ver', 'rel', 'repo'): - kwargs['up_'+attr] = None - epoch = match.group('epoch') - if not epoch or epoch.lower() == "(none)": - epoch = '0' - return Package( - match.group('name'), - epoch, - match.group('ver'), match.group('rel'), - match.group('arch'), match.groupdict().get('repo', None), - **kwargs) - -def _filter_duplicates(installed, avail_str): - """ - Parse output of "yum list available" command and retuns only those - packages occuring in multiple versions. - @param installed is a set of installed package names - @param avail_str yum command output - @return [ [pkg1v1, pkg1v2, ...], [pkg2v1, pkg2v2, ...], ... ] - Each sublist of result contain at least 2 elements, that are instances - of Package. - """ - dups_list = [] - cur_package_matches = [] - prev_match = None - system_arch = util.get_system_architecture() - for match in RE_AVAIL_PKG.finditer(avail_str): - if ( _match_nevr(match) in [ _match_nevr(m) - for m in cur_package_matches] - or ( ( not prev_match - or prev_match.group('name') in - [m.group('name') for m in cur_package_matches]) - and match.group('arch') not in ('noarch', system_arch)) - or not _is_sane_package(match.group('name'))): - continue - if prev_match and prev_match.group('name') != match.group('name'): - if ( len(cur_package_matches) > 1 - and not cur_package_matches[0].group('name') in installed): - pkgs = [ _match2pkg(m) for m in cur_package_matches ] - dups_list.append(pkgs) - cur_package_matches = [] - cur_package_matches.append(match) - prev_match = match - if len(cur_package_matches) > 1: - dups_list.append([ _match2pkg(m) for m in cur_package_matches ]) - return dups_list - -def _check_single_pkg_deps( - installed, - dependencies_str): - """ - Each package has zero or more dependencies. Each dependency - has at least one provider. One of these provider must be installed - for each such dependency. - @param dependencies of single package - @return True if all dependencies have at least one provider installed - """ - for match_deps in RE_DEPS_PROVIDERS.finditer(dependencies_str): - providers = [] - if RE_DEPS_UNSATISFIED.search(match_deps.group('providers')): - return False - for match_dep in RE_PROVIDER.finditer(match_deps.group('providers')): - if match_dep.group('name') not in installed: - continue - providers.append(_match2pkg(match_dep)) - for provider in providers: - if is_pkg_installed(provider, False): - break - else: # no provider is installed - return False - return True - -def _check_pkg_dependencies( - installed, - dup_list, - number_of_packages=MAX_PKG_DB_SIZE, - repolist=None): - """ - Finds packages from dup_list with satisfied (installed) dependencies. - @param installed is a set of installed package names - @return filtered dup_list with at least number_of_packages elements. - """ - yum_params = ['deplist'] - dups_no_deps = [] - for i in range(0, len(dup_list), PKG_DEPS_ITER_STEP): - dups_part = dup_list[i:i+PKG_DEPS_ITER_STEP] - yum_params = yum_params[:1] - for dups in dups_part: - yum_params.extend([d.get_nevra(newer=False) for d in dups]) - deplist_str = util.run_yum(*yum_params, repolist=repolist) - matches = RE_PKG_DEPS.finditer(deplist_str) - prev_match = None - for pkgs in dups_part: - satisfied = True - remains = len(pkgs) - for match_pkg in matches: - if ( prev_match - and _match_nevr(prev_match) == _match_nevr(match_pkg)): - # there sometimes appear duplicates in yum deplist - # output, so let's skip them - continue - if satisfied and not _check_single_pkg_deps( - installed, match_pkg.group('dep_list')): - satisfied = False - prev_match = match_pkg - remains -= 1 - if remains <= 0: - break - if satisfied: - # all packages from pkgs have satisfied dependencies - dups_no_deps.append(pkgs) - if len(dups_no_deps) >= number_of_packages: - break - return dups_no_deps - -def _sorted_db_by_size(pkgdb, repolist=None): - """ - @param pkgdb is a list of lists of packages with common name - @return sorted instances of Package according to their size - """ - yum_params = ['info', '--showduplicates'] - yum_params.extend([ps[0].name for ps in pkgdb]) - info_str = util.run_yum(*yum_params, repolist=repolist) - pkg_sizes = {} - # to get correct ordering from "yum info" command - # { pkg_name : [(epoch, version, release), ... ] } - pkg_version_order = defaultdict(list) - try: - header = "Available Packages\n" - info_str = info_str[info_str.index(header)+len(header):] - except ValueError: - pass - for info_match in RE_PKG_INFO.finditer(info_str): - pkg_name = info_match.group('name') - size = float(info_match.group('size')) - units = info_match.group('units') - if units: - size *= defaultdict(lambda: 1, - {'k':10**3, 'm':10**6, 'g':10**9})[units.lower()] - pkg_sizes[pkg_name] = size - epoch = info_match.group('epoch') - if not epoch: - epoch = "0" - pkg_version_order[pkg_name].append(( - epoch, info_match.group('ver'), info_match.group('rel'))) - pkgdb = sorted(pkgdb, key=lambda pkgs: pkg_sizes[pkgs[0].name])[ - :MAX_PKG_DB_SIZE] - - for i, pkgs in enumerate(pkgdb): - pkgs = sorted(pkgs, key=lambda p: - pkg_version_order[pkgs[0].name].index((p.epoch, p.ver, p.rel))) - pkg_kwargs = dict((k, getattr(pkgs[0], k)) for k in ('name', 'arch') ) - for attr in ('epoch', 'ver', 'rel', 'repo'): - pkg_kwargs[attr] = getattr(pkgs[0], attr) - pkg_kwargs['up_'+attr] = getattr(pkgs[-1], attr) - pkgdb[i] = Package(**pkg_kwargs) - return pkgdb - -def _download_dangerous(repolist, pkgdb, cache_dir=None): - """ - Downloads all rpm packages (old and newer versions) from package database - to current directory. - """ - repo_pkgs = defaultdict(list) - for pkg in pkgdb: - repo_pkgs[pkg.repo].append(pkg.name) - repo_pkgs[pkg.up_repo].append(pkg.name) - base_cmd = ['yumdownloader'] - if cache_dir: - base_cmd.extend(['--destdir', cache_dir]) - for repo, pkgs in repo_pkgs.items(): - cmd = copy.copy(base_cmd) - repos = set(repolist) - try: - repos.remove(repo) - except KeyError: # there may be duplicates - continue - for not_allowed_repo in repos: - cmd.append('--disablerepo='+not_allowed_repo) - cmd.append('--enablerepo='+repo) - cmd.extend(pkgs) - call(cmd) - -def _make_rpm_path(pkg, cache_dir='', newer=True, without_epoch=False): - """ - @param newer says, whether to use EVR of package to update - (in this case, all epoch/ver/rel attributes will be prefixed with "up_") - @param without_epoch if True, epoch will be left out of package name - @return path to rpm package made from instance of Package - """ - if not isinstance(pkg, Package): - raise TypeError("pkg must be an instance of Package ") - nevra = pkg.get_nevra(newer, - with_epoch='NEVER' if without_epoch else 'NOT_ZERO') - return os.path.join(cache_dir, nevra) + '.rpm' - -def get_rpm_name(pkg, cache_dir='', newer=True): - """ - Some packages do not have epoch in their name, even if it's higher than - zero. That's why it's necessary to try more variants of rpm name. - @return rpm path to package in cache - """ - path = _make_rpm_path(pkg, cache_dir, newer) - if os.path.exists(path): - return path - path = _make_rpm_path(pkg, cache_dir, newer, True) - if os.path.exists(path): - return path - raise MissingRPM(pkg.name) - -def rpm_exists(pkg, cache_dir='', newer=True): - """ - @return True, when rpm package is in cache. - """ - return ( os.path.exists(_make_rpm_path(pkg, cache_dir, newer)) - or os.path.exists(_make_rpm_path(pkg, cache_dir, newer, True))) - - -def remove_pkg(pkg, *args): - """ - Remove package with rpm command. - @param pkg is either instance of Package or package name - @param args is a list of parameters for rpm command - """ - if isinstance(pkg, Package): - pkg = pkg.name - call(["rpm", "--quiet"] + list(args) + ["-e", pkg]) - -def install_pkg(pkg, newer=True, repolist=None): - """ - Install a specific package. - @param pkg is either package name or instance of Package - In latter case, a specific version is installed. - @param repolist is a list of repositories, that should be - used for downloading, if using yum - when empty, all enabled repositories are used - """ - if isinstance(pkg, Package): - try: - rpm_name = get_rpm_name(pkg, newer=newer) - call(["rpm", "--quiet", "-i", rpm_name]) - return - except MissingRPM: - pass - pkg = pkg.name - util.run_yum('-q', '-y', 'install', pkg, repolist=repolist) - -def ensure_pkg_installed(pkg, newer=True, repolist=None): - """ - Ensures, that specific version of package is installed. If other - version is installed, it is removed and reinstalled. - """ - if not isinstance(pkg, Package): - raise TypeError("pkg must be a Package instance") - if not is_pkg_installed(pkg, newer): - if is_pkg_installed(pkg.name): - remove_pkg(pkg.name) - install_pkg(pkg, newer, repolist) - -def verify_pkg(pkg): - """ - @return output of command rpm, with verification output for package - """ - if isinstance(pkg, basestring): - name = pkg - elif isinstance(pkg, Package): - name = pkg.name - else: - raise TypeError("pkg must be either package name or Package instance") - return call(["rpm", "--quiet", "-Va", name]) == 0 - -def has_pkg_config_file(pkg, file_path): - """ - @return True, if file_path is a configuration file of package pkg. - """ - cmd = ['rpm', '-qc', pkg.name] - out = check_output(cmd) - return file_path in set(out.splitlines()) #pylint: disable=E1103 - -def has_pkg_doc_file(pkg, file_path): - """ - @return True, if file_path is a documentation file of package pkg. - """ - cmd = ['rpm', '-qd', pkg.name] - out = check_output(cmd) - return file_path in set(out.splitlines()) #pylint: disable=E1103 - -def is_pkg_installed(pkg, newer=True): - """ - Check, whether package is installed. - """ - if not isinstance(pkg, Package): - return call(["rpm", "--quiet", "-q", pkg]) == 0 - else: - cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}\n", pkg.get_nevra( - newer, with_epoch='NEVER') ] - try: - out = check_output(cmd).splitlines()[0] - epoch, nvra = out.split(':') #pylint: disable=E1103 - if not epoch or epoch.lower() == "(none)": - epoch = "0" - return ( epoch == getattr(pkg, 'up_epoch' if newer else 'epoch') - and nvra == pkg.get_nevra(newer=newer, with_epoch="NEVER")) - except CalledProcessError: - return False - -def write_pkgdb(safe_pkgs, dangerous_pkgs, cache_dir=''): - """ - Writes package database into a file named DB_BACKUP_FILE. - """ - with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'w') as db_file: - data = (safe_pkgs, dangerous_pkgs) - json.dump(data, db_file, cls=PackageEncoder, - sort_keys=True, indent=4, separators=(',', ': ')) - -def load_pkgdb(cache_dir=''): - """ - This is inverse function to _write_pkgdb(). - @return (safe, dangerous) package lists loaded from file - """ - with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'r') as db_file: - safe, dangerous = json.load(db_file, object_hook=from_json) - #print "Loaded package database from: %s" % date_time - return safe, dangerous - -def make_dangerous_list(installed, repolist=None): - """ - This makes a list of instances of Package for dangerous tests. - """ - avail_str = util.run_yum('list', 'available', '--showduplicates', - repolist=repolist) - # list of lists of packages with the same name, longer than 2 - dups_list = _filter_duplicates(installed, avail_str) - selected = _check_pkg_dependencies(installed, dups_list, - number_of_packages=MAX_PKG_DB_SIZE*5, - repolist=repolist) - return _sorted_db_by_size(selected, repolist=repolist) - -def make_safe_list(installed, exclude=None): - """ - Makes list of installed packages for non-dangerous tests. - @param installed is a list of installed packages names - @param exclude is a list of package names, that won't appear in result - """ - if exclude is None: - exclude = set() - base = list(installed) - random.shuffle(base) - res = [] - i = 0 - while len(res) < MAX_PKG_DB_SIZE and i < len(base): - name = base[i] - i += 1 - if name in exclude or not _is_sane_package(name): - continue - cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}\n", name ] - envra = check_output(cmd).splitlines()[0] - res.append(_match2pkg(util.RE_ENVRA.match(envra), safe=True)) - return res - -def get_pkg_database( - force_update=False, - use_cache=True, - dangerous=False, - cache_dir='', - repolist=None): - """ - Checks yum database for available packages, that have at least two - different versions in repositories. Only not installed ones with - all of their dependencies intalled are selected. - And from those, few of the smallest are downloaded as rpms. - @param dangerous whether to load available, not installed - packages for dangerous tests - @return (safe, dangerous) - where - safe is a list of instances of Package, representing installed - software, these should be used for not-dangerous tests; - both older - dangerous is a list of instances of Package of selected packages, - that are not installed, but available; instances contain - both newer and older version of package - """ - dangerous_pkgs = [] - if ( use_cache and not force_update - and os.path.exists(os.path.join(cache_dir, DB_BACKUP_FILE))): - safe_pkgs, dangerous_pkgs = load_pkgdb(cache_dir) - valid_db = True - if len(safe_pkgs) < MAX_PKG_DB_SIZE: - valid_db = False - elif not dangerous and len(dangerous_pkgs) > 0: - dangerous_pkgs = [] - elif dangerous and len(dangerous_pkgs) == 0: - valid_db = False - else: - for pkg in safe_pkgs: - if not is_pkg_installed(pkg): - valid_db = False - break - for pkg in dangerous_pkgs: - if ( not rpm_exists(pkg, cache_dir, False) - or not rpm_exists(pkg, cache_dir, True)): - valid_db = False - #print "Old package database is not valid" - break - if valid_db: - return (safe_pkgs, dangerous_pkgs) - #print "Getting installed packages" - installed = set(check_output( #pylint: disable=E1103 - ['rpm', '-qa', '--qf=%{NAME}\n']).splitlines()) - if dangerous: - dangerous_pkgs = make_dangerous_list(installed) - safe_pkgs = make_safe_list(installed, exclude=set( - pkg.name for pkg in dangerous_pkgs)) - - if use_cache: - repolist = util.get_repo_list() if repolist in (None, []) else repolist - _download_dangerous(repolist, dangerous_pkgs, cache_dir) - #print "Backing up database information" - write_pkgdb(safe_pkgs, dangerous_pkgs, cache_dir) - return (safe_pkgs, dangerous_pkgs) - diff --git a/src/software/test/run.py b/src/software/test/run.py index f673d8a..4dfdf4c 100755 --- a/src/software/test/run.py +++ b/src/software/test/run.py @@ -23,8 +23,8 @@ Runs all defined tests. Test modules are expected to be in directory with this script and should contain subclasses of unittest.TestCase, that will be loaded. -Preferably a suite() function should be defined there as well. -They must be named according to shell regexp: "test_*.py". +Preferably a ``suite()`` function should be defined there as well. +They must be named according to shell regexp: ``"test_*.py"``. """ import argparse @@ -33,16 +33,12 @@ import getpass import inspect import pywbem import os -import shutil import sys import tempfile import unittest -import rpmcache - -# this is a global variable, that can be modified by environment or program -# argument -CACHE_DIR = '' +import reposetup +import swbase class NotFound(Exception): """Raised when test of particular name could no be found.""" @@ -52,7 +48,9 @@ class NotFound(Exception): def parse_cmd_line(): """ Use ArgumentParser to parse command line arguments. - @return (parsed arguments object, arguments for unittest.main()) + + :returns: ``(parsed arguments object, arguments for unittest.main())``. + :rtype: tuple """ parser = argparse.ArgumentParser( add_help=False, # handle help message ourselves @@ -90,37 +88,22 @@ def parse_cmd_line(): default=os.environ.get('LMI_RUN_TEDIOUS', '0') == '1', help="Disable tedious tests.") - cache_group = parser.add_mutually_exclusive_group() - cache_group.add_argument("-c", "--use-cache", action="store_true", - default=(os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'), - help="Use a cache directory to download rpm packages for" - " testing purposes. It greatly speeds up testing." - " Also a database file \"${LMI_SOFTWARE_CACHE_DIR}/%s\"" - " will be created to store information" - " for this system. Overrides environment variable" - " LMI_SOFTWARE_USE_CACHE." % rpmcache.DB_BACKUP_FILE) - cache_group.add_argument("--no-cache", action="store_false", - dest="use_cache", - default=(os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'), - help="Do not cache rpm packages for speeding up tests." - " Overrides environment variable LMI_SOFTWARE_USE_CACHE.") - parser.add_argument('--cache-dir', - default=os.environ.get('LMI_SOFTWARE_CACHE_DIR', ''), - help="Use particular directory, instead of temporary one, to store" - " rpm packages for testing purposes. Overrides environment" - " variable LMI_SOFTWARE_CACHE_DIR.") - parser.add_argument('--test-repos', - default=os.environ.get('LMI_SOFTWARE_REPOSITORIES', ''), - help="Use this option to specify list of repositories, that" - " alone should be used for testing. Overrides environment" - " variable LMI_SOFTWARE_REPOSITORIES.") - parser.add_argument('--test-packages', - default=os.environ.get('LMI_SOFTWARE_PACKAGES', ''), - help="Specify packages for dangerous tests. If empty" - " and cache is enabled, some will be picked up by algorithm") - parser.add_argument('--force-update', action="store_true", - help="Force update of package database. Otherwise an old" - " one will be used (if any exists).") + cleanup_group = parser.add_mutually_exclusive_group() + cleanup_group.add_argument("--cleanup-cache", action="store_true", + default=os.environ.get('LMI_SOFTWARE_CLEANUP_CACHE', '1') == '1', + help="Clean up all temporary files created for testing purposes." + " If LMI_SOFTWARE_DB_CACHE is set and this is set to False, next" + " run will be much faster because testing database won't need to" + " be generated again. Overrides environment variable" + " LMI_SOFTWARE_CLEANUP_CACHE.") + cleanup_group.add_argument('--no-cleanup', action="store_false", + dest="cleanup_cache", + default=os.environ.get('LMI_SOFTWARE_CLEANUP_CACHE', '1') == '1', + help="Do not delete database cache file and created repositories." + " This speeds up testing because testing repositories don't need" + " to be recreated upon next run. This applies only when" + " LMI_SOFTWARE_DB_CACHE is set.") + parser.add_argument('-l', '--list-tests', action="store_true", help="List all possible test names.") parser.add_argument('-h', '--help', action="store_true", @@ -147,7 +130,9 @@ def try_connection(args): """ Try to connect to cim broker. If authentication fails, ask the user for credentials in loop. - @return (user, password) + + :returns: ``(user, password)`` + :rtype: tuple """ user = args.user password = args.password @@ -166,6 +151,9 @@ def try_connection(args): def prepare_environment(args): """ Set the environment for test scripts. + + :returns: Whether the database cache needs to be deleted. + :rtype: boolean """ os.environ['LMI_CIMOM_URL'] = args.url os.environ['LMI_CIMOM_USERNAME'] = args.user @@ -174,16 +162,23 @@ def prepare_environment(args): '1' if args.run_dangerous else '0') os.environ["LMI_RUN_TEDIOUS"] = ( '1' if args.run_tedious else '0') - os.environ['LMI_SOFTWARE_USE_CACHE'] = '1' if args.use_cache else '0' - if args.use_cache: - os.environ['LMI_SOFTWARE_CACHE_DIR'] = CACHE_DIR - os.environ['LMI_SOFTWARE_REPOSITORIES'] = args.test_repos - os.environ['LMI_SOFTWARE_PACKAGES'] = args.test_packages + db_cache = os.environ.get("LMI_SOFTWARE_DB_CACHE", None) + needs_cleanup = args.cleanup_cache or not bool(db_cache) + if not db_cache or not os.path.exists(db_cache): + repos_dir = tempfile.mkdtemp() + repodb, other_repos = reposetup.make_object_database(repos_dir) + db_cache = reposetup.save_test_database( + repos_dir, repodb, other_repos, db_cache) + os.environ["LMI_SOFTWARE_DB_CACHE"] = db_cache + os.environ['LMI_SOFTWARE_CLEANUP_CACHE'] = ( + '1' if needs_cleanup else '0') + return needs_cleanup def load_tests(loader, standard_tests, pattern): """ - Helper function for unittest.main() test loader. - @return TestSuite instance + Helper function for ``unittest.main()`` test loader. + + :returns: :py:class:`unittest.TestSuite` instance. """ this_dir = os.path.dirname(__file__) if standard_tests: @@ -206,10 +201,12 @@ class LMITestLoader(unittest.TestLoader): def find_in_test_nodes(node, name): """ Traverses suite tree nodes to find a test named name. - @param name is a name of test to find - @return desired TestCase or test function + + :param string name: Name of test to find. + :returns: Desired :py:class:`TestCase` or test function. """ if ( isinstance(node, unittest.TestSuite) + and node.countTestCases() > 0 and isinstance(next(iter(node)), unittest.TestCase) and next(iter(node)).__class__.__name__ == name): return node @@ -242,8 +239,10 @@ class LMITestLoader(unittest.TestLoader): def unwind_test_suite_tree(node): """ Make a list of test names out of TestSuite. - @param node is a suite - @return [ test_name, ... ] + + :param node: A test suite. + :type node: :py:class:`unittest.TestSuite` + :returns: ``[ test_name, ... ]`` """ result = [] for subnode in node: @@ -269,32 +268,17 @@ def main(): """ Main functionality of script. """ - global CACHE_DIR args, ut_args = parse_cmd_line() if args.list_tests: list_tests() - if args.use_cache and not args.cache_dir: - CACHE_DIR = tempfile.mkdtemp(suffix="software_database") - elif args.use_cache: - CACHE_DIR = args.cache_dir - if not os.path.exists(args.cache_dir): - os.makedirs(args.cache_dir) try_connection(args) - if args.test_repos: - repolist = args.test_repos.split(',') - else: - repolist = [] - rpmcache.get_pkg_database( - args.force_update, - args.use_cache, - dangerous=args.run_dangerous, - cache_dir=CACHE_DIR, - repolist=repolist) - prepare_environment(args) - test_program = unittest.main(__name__, argv=ut_args, - testLoader=LMITestLoader(), exit=False) - if args.use_cache and not args.cache_dir: - shutil.rmtree(CACHE_DIR) + cleanup = prepare_environment(args) + swbase.SwTestCase.setUpClass() + try: + test_program = unittest.main(__name__, argv=ut_args, + testLoader=LMITestLoader(), exit=False) + finally: + swbase.SwTestCase.tearDownClass() sys.exit(0 if test_program.result.wasSuccessful() else 1) if __name__ == '__main__': diff --git a/src/software/test/swbase.py b/src/software/test/swbase.py new file mode 100644 index 0000000..ccf2b78 --- /dev/null +++ b/src/software/test/swbase.py @@ -0,0 +1,288 @@ +# -*- encoding: utf-8 -*- +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Common utilities and base class for all software tests. +""" + +import itertools +import functools +import os +import re +import shutil +import subprocess +import tempfile + +from lmi.test.lmibase import LmiTestCase +import package +import repository +import reposetup + +RE_REPOPKG = re.compile(r'^(?P<repo>[a-z-]+)#(?P<pkg_name>\w+.*|\*)', re.I) + +def test_with_repos(*enable_repos, **repos_dict): + """ + Decorator factory for test case methods. It enables or disables + specified repositories for the time of method execution. + + It accepts shortened form of test repositories e.g. *stable* instead of + *openlmi-sw-test-repo-stable*. + + Usage: :: + + @test_with_repos('stable', **{'updates' : False}) + def test_method(self): + pass + + Repository names that shall be enabled can be passed as string arguments. + Those that shall be disabled need to be passed as keyword arguments with + ``False`` assigned to them. + """ + + def _decorator(func): + """ Returns wrapped function. """ + @functools.wraps(func) + def _wrapper(self, *args, **kwargs): + """ This is a wrapper taking care of repository enablement. """ + to_enable = set(reposetup.full_repo_name(r) for r in enable_repos) + to_enable.update( + set(reposetup.full_repo_name(r) + for r, v in repos_dict.items() if v)) + for repoid in self.repodb: + enable = ( repoid in to_enable + or bool(repos_dict.get(repoid, False))) + repo = self.repodb[repoid] + repo.refresh() + if enable != repo.status: + repository.set_repo_enabled(repo, enable) + repo.refresh() + return func(self, *args, **kwargs) + + return _wrapper + + return _decorator + +def test_with_packages(*install_pkgs, **enable_dict): + """ + Decorator factory for test case methods. Returned decorator takes care of + installation or removal of specified packages for the time of method's + execution. + + Packages needs to be selected from testing database. They are specified in + form: :: + + repository#package + + Where *repository* marks its full or shortened name. The same applies to + packages ( *pkg1* instead of *openlmi-sw-test-pkg1* can be written). + + Star ``*`` is also accepted in place of package name marking any package + present in particular repository. + + When supplied as a key-value pairs, value denote whether package (key) + shall be installed or removed. Packages to be removed may also omit + repository part. + + Usage: :: + + @test_with_packages('stable#pkg1', **{ + 'updates#pkg2' : True, + 'pkg3' : False + }) + def test_method(self): + pass + + Example above shows two possible ways of specifying packages to install. + Those are: + * openlmi-sw-test-pkg1 from stable repository + * openlmi-sw-test-pkg2 from updates repository + Package openlmi-sw-test-pkg3 will be removed regardless of the source + repository. + """ + + def _decorator(func): + @functools.wraps(func) + def _wrapper(self, *args, **kwargs): + to_install = [] + to_remove = set() + for repopkg in itertools.chain(install_pkgs, enable_dict.keys()): + match = RE_REPOPKG.match(repopkg) + if match: + repo = self.repodb[ + reposetup.full_repo_name(match.group('repo'))] + pkg_name = match.group('pkg_name') + if pkg_name == '*': + pkgs = list(repo.packages) + else: + pkgs = [repo[reposetup.full_pkg_name(pkg_name)]] + elif enable_dict.get(repopkg, True) is False: + # package is given as string (with just name) + repopkg = reposetup.full_pkg_name(repopkg) + if package.is_pkg_installed(repopkg): + present = [] + # remove any instances of Package with the same name + # from *to_remove* set + for pkg in to_remove: + if ( isinstance(pkg, package.Package) + and pkg.name == repopkg): + present.append(pkg) + for pkg in present: + to_remove.remove(pkg) + # now let's add the pkg name + to_remove.add(repopkg) + continue + else: + raise ValueError( + 'invalid format of repo#package string: "%s"' + % repopkg) + for pkg in pkgs: + installed = package.is_pkg_installed(pkg) + if installed and not enable_dict.get(repopkg, True): + if pkg.name not in to_remove: + to_remove.add(pkg) + elif not installed and enable_dict.get(repopkg, True): + to_install.append(pkg) + for pkg in to_remove: + package.remove_pkg(pkg) + for pkg in to_install: + if package.is_pkg_installed(pkg.name): + package.remove_pkg(pkg.name) + package.install_pkg(pkg) + return func(self, *args, **kwargs) + + return _wrapper + + return _decorator + +class SwTestCase(LmiTestCase): + """ + Base class for all LMI Software test classes. + + There are few important class properties to note: + + ``repodb`` : dictionary + Is a dictionary of testing repositories. These are created upon + set up of this class. It has following format: :: + + ( repoid, repository) + + Where repository is an instance of + ;py:class:`repository.Repository`. + ``other_repos`` : dictionary + Has the same format but contains other repositories not present in + ``repodb``. These are disabled upon class's set up and re-enabled + on its tear down. + + These additional environment variables affects the execution: + + ``LMI_SOFTWARE_YUM_REPOS_DIR`` + Absolute path to directory used to store yum repository + configuration files. + ``LMI_SOFTWARE_DB_CACHE`` + File path to a serialized testing database which was cached in + previous runs. + ``LMI_SOFTWARE_CLEANUP_CACHE`` : defaults to '1' + Boolean flag indicating whether, the cache shall be deleted. This + includes temporary directory with testing repositories and their + configuration files and serialized database file. + """ + #: Define in subclass. + KEYS = tuple() + #: When running multiple test modules under same process, the test + #: repository will be cached until last test case is destroyed. This + #: greatly speeds up the testing. + TEST_CASES_INSTANTIATED = 0 + + def __init__(self, *args, **kwargs): + LmiTestCase.__init__(self, *args, **kwargs) + self.longMessage = True + + def get_repo(self, repoid): + """ + Get repository object from its name. Prefer testin repositories. + Accepts shortened names of testing repositories. + + :param string repoid: Repository id which may be shortened in case of + testing one. + :returns: Repository object. + :rtype: :py:class:`repository.Repository` + """ + try: + return self.repodb[repoid] + except KeyError: + try: + return self.repodb[reposetup.full_repo_name(repoid)] + except KeyError: + return self.other_repos[repoid] + + @classmethod + def setUpClass(cls): + LmiTestCase.setUpClass.im_func(cls) + # This applies to all the commands whose output needs to be parsed. + # Make sure we don't need to deal with localisations. + os.environ['LANG'] = 'C' + SwTestCase.yum_repos_dir = os.environ.get( + 'LMI_SOFTWARE_YUM_REPOS_DIR', '/etc/yum.repos.d') + SwTestCase._cleanup_db = ( + os.environ.get('LMI_SOFTWARE_CLEANUP_CACHE', '1') + in ('1', 'yes', 'on', 'true')) + if not getattr(SwTestCase, 'repodb', False): + if os.environ.get('LMI_SOFTWARE_DB_CACHE', None): + db_cache = os.environ['LMI_SOFTWARE_DB_CACHE'] + SwTestCase.repos_dir, SwTestCase.repodb, \ + SwTestCase.other_repos = \ + reposetup.load_test_database(db_cache) + else: + SwTestCase.repos_dir = tempfile.mkdtemp() + SwTestCase.repodb, SwTestCase.other_repos = \ + reposetup.make_object_database(SwTestCase.repos_dir) + if not getattr(SwTestCase, '_restore_repos', []): + SwTestCase._restore_repos = [] + for repoid in repository.get_repo_list('enabled'): + if repoid in SwTestCase.other_repos: + SwTestCase._restore_repos.append(repoid) + repository.set_repo_enabled(repoid, False) + SwTestCase.other_repos[repoid].refresh() + SwTestCase.TEST_CASES_INSTANTIATED += 1 + + @classmethod + def tearDownClass(cls): + if SwTestCase.TEST_CASES_INSTANTIATED <= 1: + to_uninstall = set() + for repo in SwTestCase.repodb.values(): + for pkg in repo.packages: + to_uninstall.add(pkg.name) + if not SwTestCase._cleanup_db: + repository.set_repo_enabled(repo.repoid, False) + else: + os.remove(repo.config_path) + to_uninstall = [ p for p in to_uninstall + if package.is_pkg_installed(p)] + if to_uninstall: + subprocess.call([ + '/usr/bin/rpm', '--quiet', '-e'] + to_uninstall) + for repoid in cls._restore_repos: + repository.set_repo_enabled(repoid, True) + if SwTestCase._cleanup_db: + shutil.rmtree(cls.repos_dir) + if os.environ.get('LMI_SOFTWARE_DB_CACHE', None): + os.remove(os.environ['LMI_SOFTWARE_DB_CACHE']) + cls.repodb.clear() + SwTestCase.TEST_CASES_INSTANTIATED -= 1 + diff --git a/src/software/test/test_hosted_software_collection.py b/src/software/test/test_hosted_software_collection.py index 12db36e..2b05c2d 100644 --- a/src/software/test/test_hosted_software_collection.py +++ b/src/software/test/test_hosted_software_collection.py @@ -19,30 +19,26 @@ # Authors: Michal Minar <miminar@redhat.com> # """ -Unit tests for LMI_MemberOfSoftwareCollection provider. +Unit tests for ``LMI_MemberOfSoftwareCollection`` provider. """ import pywbem import unittest -import base +import swbase -class TestHostedSoftwareCollection(base.SoftwareBaseTestCase): +class TestHostedSoftwareCollection(swbase.SwTestCase): """ - Basic cim operations test. + Basic cim operations test on ``LMI_HostedSoftwareCollection``. """ CLASS_NAME = "LMI_HostedSoftwareCollection" KEYS = ("Antecedent", "Dependent") - @classmethod - def needs_pkgdb(cls): - return False - def make_op(self): """ - @param ses SoftwareElementState property value - @return object path of SoftwareIdentity + :returns Object path of ``LMI_HostedSoftwareCollection``. + :rtype: :py:class:`lmi.shell.LMIInstanceName` """ return self.cim_class.new_instance_name({ "Antecedent" : self.system_iname, @@ -54,7 +50,7 @@ class TestHostedSoftwareCollection(base.SoftwareBaseTestCase): def test_get_instance(self): """ - Tests GetInstance call on packages from our rpm cache. + Test ``GetInstance()`` call on ``LMI_HostedSoftwareCollection``. """ objpath = self.make_op() inst = objpath.to_instance() @@ -80,7 +76,7 @@ class TestHostedSoftwareCollection(base.SoftwareBaseTestCase): def test_enum_instances(self): """ - Tests EnumInstances call on installed packages. + Test ``EnumInstances()`` call on ``LMI_HostedSoftwareCollection``. """ objpath = self.make_op() insts = self.cim_class.instances() @@ -93,16 +89,16 @@ class TestHostedSoftwareCollection(base.SoftwareBaseTestCase): def test_enum_instance_names(self): """ - Tests EnumInstanceNames call on installed packages. + Test ``EnumInstanceNames`` call on ``LMI_HostedSoftwareCollection``. """ objpath = self.make_op() inames = self.cim_class.instance_names() self.assertEqual(1, len(inames)) self.assertCIMNameEqual(objpath, inames[0]) - def test_get_antecedent_referents(self): + def test_get_computer_system_collections(self): """ - Test ReferenceNames for ComputerSystem. + Try to get collection associated to computer system. """ objpath = self.make_op() refs = objpath.Antecedent.to_instance().associator_names( @@ -114,9 +110,9 @@ class TestHostedSoftwareCollection(base.SoftwareBaseTestCase): ref = refs[0] self.assertCIMNameEqual(objpath.Dependent, ref) - def test_get_dependent_referents(self): + def test_get_collection_computer_systems(self): """ - Test ReferenceNames for SystemSoftwareCollection. + Try to get computer system associated to software collection. """ objpath = self.make_op() refs = objpath.Dependent.to_instance().associator_names( diff --git a/src/software/test/test_hosted_software_identity_resource.py b/src/software/test/test_hosted_software_identity_resource.py index 09e09f2..77c9342 100644 --- a/src/software/test/test_hosted_software_identity_resource.py +++ b/src/software/test/test_hosted_software_identity_resource.py @@ -19,33 +19,28 @@ # Authors: Michal Minar <miminar@redhat.com> # """ -Unit tests for LMI_HostedSoftwareIdentityResource provider. +Unit tests for ``LMI_HostedSoftwareIdentityResource`` provider. """ -import pywbem import unittest -import base +import swbase -class TestHostedSoftwareIdentityResource(base.SoftwareBaseTestCase): +ENABLED_STATE_DISABLED = 3 +ENABLED_STATE_ENABLED = 2 + +class TestHostedSoftwareIdentityResource(swbase.SwTestCase): """ - Basic cim operations test. + Basic cim operations test on ``LMI_HostedSoftwareIdentityResource.`` """ CLASS_NAME = "LMI_HostedSoftwareIdentityResource" KEYS = ("Antecedent", "Dependent") - @classmethod - def needs_pkgdb(cls): - return False - - @classmethod - def needs_repodb(cls): - return True - def make_op(self, repo): """ - @return object path of HostedSoftwareIdentityResource association + :returns Object path of ``LMI_HostedSoftwareIdentityResource``. + :rtype: :py:class:`lmi.shell.LMIInstanceName` """ return self.cim_class.new_instance_name({ "Antecedent" : self.system_iname, @@ -58,19 +53,25 @@ class TestHostedSoftwareIdentityResource(base.SoftwareBaseTestCase): }) }) + @swbase.test_with_repos(**{ + 'stable' : True, + 'updates' : True, + 'updates-testing' : False, + 'misc' : False + }) def test_get_instance(self): """ - Tests GetInstance call on repositories from our rpm cache. + Test ``GetInstance()`` call on ``LMI_HostedSoftwareIdentityResource``. """ - for repo in self.repodb: + for repo in self.repodb.values(): objpath = self.make_op(repo) inst = objpath.to_instance() + self.assertNotEqual(inst, None, + "GetInstance succeeds for repo %s" % repo.repoid) self.assertCIMNameEqual(objpath, inst.path, "Object paths should match for repo %s" % repo.repoid) + self.assertEqual(set(inst.path.key_properties()), set(self.KEYS)) for key in self.KEYS: - self.assertIn(key, inst.properties(), - 'OP is missing \"%s\" key for repo "%s".' % - (key, repo.repoid)) self.assertCIMNameEqual( getattr(inst, key), getattr(inst.path, key), 'Key property "%s" does not match for repo "%s"!' % @@ -78,63 +79,158 @@ class TestHostedSoftwareIdentityResource(base.SoftwareBaseTestCase): def test_enum_instance_names(self): """ - Tests EnumInstanceNames call on repositories from our cache. + Test ``EnumInstanceNames()`` call on + ``LMI_HostedSoftwareIdentityResource``. """ inames = self.cim_class.instance_names() - repos = { r.repoid: r for r in self.repodb } - self.assertEqual(len(repos), len(inames)) + repos = set(r for r in self.repodb) + repos.update([r for r in self.other_repos]) + self.assertEqual(len(inames), len(repos)) for iname in inames: self.assertIn(iname.Dependent.Name, repos) - objpath = self.make_op(repos[iname.Dependent.Name]) - self.assertCIMNameEqual(objpath, iname) + repoid = iname.Dependent.Name + if repoid in self.repodb: + repo = self.repodb[repoid] + else: + repo = self.other_repos[repoid] + objpath = self.make_op(repo) + self.assertCIMNameEqual(iname, objpath) + self.assertEqual(set(iname.key_properties()), set(self.KEYS)) + repos.remove(iname.Dependent.Name) + self.assertEqual(len(repos), 0) def test_enum_instances(self): """ - Tests EnumInstanceNames call on repositories from our cache. + Test ``EnumInstanceNames()`` call on + ``LMI_HostedSoftwareIdentityResource``. """ - inames = self.cim_class.instances() - repos = { r.repoid: r for r in self.repodb } - self.assertEqual(len(repos), len(inames)) - for inst in inames: + insts = self.cim_class.instances() + repos = set(r for r in self.repodb) + repos.update([r for r in self.other_repos]) + self.assertEqual(len(insts), len(repos)) + for inst in insts: repoid = inst.Dependent.Name self.assertIn(repoid, repos) - objpath = self.make_op(repos[repoid]) - self.assertCIMNameEqual(objpath, inst.path) + if repoid in self.repodb: + repo = self.repodb[repoid] + else: + repo = self.other_repos[repoid] + objpath = self.make_op(repo) + self.assertCIMNameEqual(inst.path, objpath) + self.assertEqual(set(inst.path.key_properties()), set(self.KEYS)) for key in self.KEYS: - self.assertIn(key, inst.properties(), - 'OP is missing \"%s\" key for repo "%s".' % (key, repoid)) self.assertCIMNameEqual( getattr(inst, key), getattr(inst.path, key), 'Key property "%s" does not match for repo "%s"!' % (key, repoid)) + repos.remove(repoid) + self.assertEqual(len(repos), 0) - def test_get_antecedent_referents(self): + @swbase.test_with_repos(**{ + 'stable' : True, + 'updates' : True, + 'updates-testing' : False, + 'misc' : False + }) + def test_get_computer_system_repositories(self): """ - Test ReferenceNames for ComputerSystem. + Try to get repositories associated with computer system. """ - if not self.repodb: - return - repo = self.repodb[0] - objpath = self.make_op(repo) - refs = objpath.Antecedent.to_instance().associator_names( + objpath = self.make_op(self.get_repo('stable')) + refs = objpath.Antecedent.to_instance().associators( AssocClass=self.CLASS_NAME, Role="Antecedent", ResultRole="Dependent", ResultClass="LMI_SoftwareIdentityResource") - repos = {r.repoid: r for r in self.repodb} + repos = set(r for r in self.repodb) + repos.update(self.other_repos.keys()) + self.assertEqual(len(refs), len(repos)) for ref in refs: - self.assertEqual(ref.namespace, 'root/cimv2') + self.assertEqual(ref.path.namespace, 'root/cimv2') self.assertEqual(ref.classname, "LMI_SoftwareIdentityResource") - objpath = self.make_op(repos[ref.Name]) + repoid = ref.Name + if repoid in self.repodb: + repo = self.repodb[repoid] + else: + repo = self.other_repos[repoid] + objpath = self.make_op(repo) + self.assertCIMNameEqual(ref.path, objpath.Dependent) + self.assertEqual(set(ref.path.key_properties()), + set(["CreationClassName", "Name", + "SystemCreationClassName", "SystemName"])) + if repo.status: + self.assertEqual(ref.EnabledState, ENABLED_STATE_ENABLED, + "EnabledState does not match for repo %s" % repo.repoid) + else: + self.assertEqual(ref.EnabledState, ENABLED_STATE_DISABLED, + "EnabledState does not match for repo %s" % repo.repoid) + repos.remove(ref.Name) + self.assertEqual(len(repos), 0) + + @swbase.test_with_repos(**{ + 'stable' : True, + 'updates' : True, + 'updates-testing' : False, + 'misc' : False + }) + def test_get_computer_system_repository_names(self): + """ + Try to get repository names associated with computer system. + """ + objpath = self.make_op(self.get_repo('stable')) + refs = objpath.Antecedent.to_instance().associator_names( + AssocClass=self.CLASS_NAME, + Role="Antecedent", + ResultRole="Dependent", + ResultClass="LMI_SoftwareIdentityResource") + repos = set(r for r in self.repodb) + repos.update(self.other_repos.keys()) + self.assertEqual(len(refs), len(repos)) + for ref in refs: + repoid = ref.Name + if repoid in self.repodb: + repo = self.repodb[repoid] + else: + repo = self.other_repos[repoid] + objpath = self.make_op(repo) self.assertCIMNameEqual(objpath.Dependent, ref) - del repos[ref.Name] - self.assertEqual(0, len(repos)) + self.assertEqual(set(ref.key_properties()), + set(["CreationClassName", "Name", + "SystemCreationClassName", "SystemName"])) + repos.remove(ref.Name) + self.assertEqual(len(repos), 0) + + @swbase.test_with_repos(**{ + 'stable' : True, + 'updates' : True, + 'updates-testing' : False, + 'misc' : False + }) + def test_get_repository_computer_systems(self): + """ + Try to get computer system associated with repository. + """ + for repo in self.repodb.values(): + objpath = self.make_op(repo=repo) + refs = objpath.Dependent.to_instance().associators( + AssocClass=self.CLASS_NAME, + Role="Dependent", + ResultRole="Antecedent", + ResultClass=self.system_cs_name) + self.assertEqual(len(refs), 1) + self.assertCIMNameEqual(refs[0].path, objpath.Antecedent) - def test_get_dependent_referents(self): + @swbase.test_with_repos(**{ + 'stable' : True, + 'updates' : True, + 'updates-testing' : False, + 'misc' : False + }) + def test_get_repository_computer_system_names(self): """ - Test ReferenceNames for repository. + Try to get computer system names associated with repository. """ - for repo in self.repodb: + for repo in self.repodb.values(): objpath = self.make_op(repo=repo) refs = objpath.Dependent.to_instance().associator_names( AssocClass=self.CLASS_NAME, @@ -142,7 +238,7 @@ class TestHostedSoftwareIdentityResource(base.SoftwareBaseTestCase): ResultRole="Antecedent", ResultClass=self.system_cs_name) self.assertEqual(len(refs), 1) - self.assertCIMNameEqual(objpath.Antecedent, refs[0]) + self.assertCIMNameEqual(refs[0], objpath.Antecedent) def suite(): """For unittest loaders.""" diff --git a/src/software/test/test_installed_software_identity.py b/src/software/test/test_installed_software_identity.py index ee79318..3535294 100644 --- a/src/software/test/test_installed_software_identity.py +++ b/src/software/test/test_installed_software_identity.py @@ -19,93 +19,144 @@ # Authors: Michal Minar <miminar@redhat.com> # """ -Unit tests for LMI_InstalledSoftwareIdentity provider. +Unit tests for ``LMI_InstalledSoftwareIdentity`` provider. """ import pywbem import unittest +from lmi.shell import LMIDeletedObjectError from lmi.test.lmibase import enable_lmi_exceptions -from lmi.test.util import mark_dangerous from lmi.test.util import mark_tedious -import base -import rpmcache +import package +import swbase -class TestInstalledSoftwareIdentity(base.SoftwareBaseTestCase): +class TestInstalledSoftwareIdentity(swbase.SwTestCase): """ - Basic cim operations test. + Basic cim operations test on `LMI_InstalledSoftwareIdentity``. """ CLASS_NAME = "LMI_InstalledSoftwareIdentity" KEYS = ("InstalledSoftware", "System") - def make_op(self, pkg, newer=True): + def make_op(self, pkg): """ - @return object path of InstalledSoftwareIdentity association + :returns Object path of ``LMI_InstalledSoftwareIdentity``. + :rtype: :py:class:`lmi.shell.LMIInstanceName` """ return self.cim_class.new_instance_name({ "System" : self.system_iname, "InstalledSoftware" : self.ns.LMI_SoftwareIdentity. \ new_instance_name({ - "InstanceID" : 'LMI:LMI_SoftwareIdentity:' - + pkg.get_nevra(newer=newer, with_epoch="ALWAYS") + "InstanceID" : 'LMI:LMI_SoftwareIdentity:' + pkg.nevra }) }) + @swbase.test_with_packages('stable#pkg1') def test_get_instance(self): """ - Tests GetInstance call on packages from our rpm cache. + Test ``GetInstance()`` call on ``LMI_InstalledSoftwareIdentity``. """ - for pkg in self.safe_pkgs: - objpath = self.make_op(pkg) - inst = objpath.to_instance() - self.assertCIMNameEqual(inst.path, objpath, - "Object paths should match for package %s"%pkg) - for key in self.KEYS: - self.assertIn(key, inst.properties(), - "OP is missing \"%s\" key for package %s"%(key, pkg)) - self.assertCIMNameEqual( - getattr(inst, key), getattr(inst.path, key)) - self.assertCIMNameEqual(objpath, inst.path, - "Object paths should match for package %s"%pkg) - - # try it now with CIM_ComputerSystem - system = objpath.System.copy() - objpath.System.wrapped_object.classname = "CIM_ComputerSystem" - objpath.System.wrapped_object.CreationClassName = \ - "CIM_ComputerSystem" - - inst = objpath.to_instance() - self.assertCIMNameEqual(objpath, inst.path, - "Object paths should match for package %s"%pkg) - for key in self.KEYS: - self.assertIn(key, inst.properties(), - "OP is missing \"%s\" key for package %s"%(key, pkg)) - self.assertCIMNameEqual(system, inst.System) - self.assertCIMNameEqual(objpath, inst.path) + repo = self.get_repo('stable') + pkg = repo['pkg1'] + objpath = self.make_op(pkg) + inst = objpath.to_instance() + self.assertEqual(set(inst.path.key_properties()), set(self.KEYS)) + for key in self.KEYS: + self.assertCIMNameEqual( + getattr(inst, key), getattr(inst.path, key)) + self.assertCIMNameEqual(inst.path, objpath, + "Object paths should match for package %s" % pkg) + + # try it now with CIM_ComputerSystem + system = objpath.System.copy() + objpath.System.wrapped_object.classname = "CIM_ComputerSystem" + objpath.System.wrapped_object.CreationClassName = \ + "CIM_ComputerSystem" + + inst = objpath.to_instance() + self.assertCIMNameEqual(inst.path, objpath, + "Object paths should match for package %s" % pkg) + for key in self.KEYS: + self.assertIn(key, inst.properties(), + "OP is missing \"%s\" key for package %s" % (key, pkg)) + self.assertCIMNameEqual(inst.System, system) + self.assertCIMNameEqual(inst.path, objpath) + + @enable_lmi_exceptions + @swbase.test_with_packages(**{'stable#pkg2' : False }) + def test_get_instance_not_installed(self): + """ + Test ``GetInstance()`` call on ``LMI_InstalledSoftwareIdentity``. + """ + repo = self.get_repo('stable') + pkg = repo['pkg2'] + objpath = self.make_op(pkg) + self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath.to_instance) - def test_enum_instance_names_safe(self): + @mark_tedious + @swbase.test_with_packages(**{ + 'pkg3' : False, + 'pkg4' : False, + 'stable#pkg1' : True, + 'stable#pkg2' : False, + 'updates#*' : False, + 'updates-testing#pkg1' : False, + 'updates-testing#pkg2' : True, + 'misc#conflict1' : False, + 'misc#conflict2' : False, + 'misc#depend1' : False, + 'misc#depend2' : False, + 'misc#pkg1' : False, + 'misc#funny-version' : True, + 'misc#funny-release' : True, + }) + def test_enum_instance_names(self): """ - Tests EnumInstanceNames call on installed packages. + Test ``EnumInstanceNames()`` call on ``LMI_InstalledSoftwareIdentity``. """ inames = self.cim_class.instance_names() self.assertGreater(len(inames), 0) - objpath = self.make_op(self.safe_pkgs[0]) + objpath = self.make_op(self.get_repo('stable')['pkg1']) for iname in inames: self.assertEqual(iname.namespace, 'root/cimv2') self.assertEqual(iname.classname, self.CLASS_NAME) self.assertEqual(sorted(iname.key_properties()), sorted(self.KEYS)) self.assertCIMNameEqual(objpath.System, iname.System) - nevra_set = set(i.InstalledSoftware.InstanceID for i in inames) - for pkg in self.safe_pkgs: - nevra = 'LMI:LMI_SoftwareIdentity:' \ - + pkg.get_nevra(with_epoch="ALWAYS") - self.assertTrue(nevra in nevra_set, 'Missing nevra "%s".' % nevra) + nevra_set = set(i.InstalledSoftware.InstanceID[ + len('LMI:LMI_SoftwareIdentity:'):] for i in inames) + db_installed = 0 + for repo in self.repodb.values(): + for pkg in repo.packages: + if package.is_pkg_installed(pkg): + self.assertTrue(pkg.nevra in nevra_set) + db_installed += 1 + else: + self.assertFalse(pkg.nevra in nevra_set) + # test that number of tested packages installed really match number + # of requested packages + self.assertEqual(db_installed, 4) @mark_tedious + @swbase.test_with_packages(**{ + 'pkg3' : False, + 'pkg4' : False, + 'stable#pkg1' : True, + 'stable#pkg2' : False, + 'updates#*' : False, + 'updates-testing#pkg1' : False, + 'updates-testing#pkg2' : True, + 'misc#conflict1' : False, + 'misc#conflict2' : False, + 'misc#depend1' : False, + 'misc#depend2' : False, + 'misc#pkg1' : False, + 'misc#funny-version' : True, + 'misc#funny-release' : True, + }) def test_enum_instances(self): """ - Tests EnumInstances call on installed packages. + Test ``EnumInstances() call on ``LMI_InstalledSoftwareIdentity``. """ insts = self.cim_class.instances() self.assertGreater(len(insts), 0) @@ -114,94 +165,96 @@ class TestInstalledSoftwareIdentity(base.SoftwareBaseTestCase): for key in self.KEYS: self.assertCIMNameEqual( getattr(inst, key), getattr(inst.path, key)) - nevra_set = set(i.InstalledSoftware.InstanceID for i in insts) - for pkg in self.safe_pkgs: - nevra = 'LMI:LMI_SoftwareIdentity:' \ - + pkg.get_nevra(with_epoch="ALWAYS") - self.assertTrue(nevra in nevra_set, "Missing pkg %s in nevra_set." - % nevra) - - @unittest.skip("dangerous tests are not reliable") - @mark_tedious - @mark_dangerous - def test_enum_instance_names(self): + nevra_set = set(i.InstalledSoftware.InstanceID[ + len('LMI:LMI_SoftwareIdentity:'):] for i in insts) + db_installed = 0 + for repo in self.repodb.values(): + for pkg in repo.packages: + if package.is_pkg_installed(pkg): + self.assertTrue(pkg.nevra in nevra_set) + db_installed += 1 + else: + self.assertFalse(pkg.nevra in nevra_set) + # test that number of tested packages installed really match number + # of requested packages + self.assertEqual(db_installed, 4) + + @swbase.test_with_repos('stable', + **{'updates' : False, 'updates-testing' : False, 'misc' : False }) + @swbase.test_with_packages(**{'stable#pkg1' : False}) + def test_install_package(self): """ - Tests EnumInstanceNames call on dangerous packages. + Test package installation with ``LMI_InstalledSoftwareIdentity``. """ - pkg = self.dangerous_pkgs[0] + repo = self.get_repo('stable') + pkg = repo['pkg1'] + self.assertFalse(package.is_pkg_installed(pkg)) + objpath = self.make_op(pkg) - rpmcache.ensure_pkg_installed(pkg) + properties = { + "InstalledSoftware" : objpath.InstalledSoftware, + "System" : objpath.System + } - inames1 = self.cim_class.instance_names() - self.assertGreater(len(inames1), 1) - self.assertIn('LMI:LMI_SoftwareIdentity:' - + pkg.get_nevra(with_epoch="ALWAYS"), - set(i.InstalledSoftware.InstanceID for i in inames1)) - - rpmcache.remove_pkg(pkg.name) - inames2 = self.cim_class.instance_names() - self.assertEqual(len(inames1), len(inames2) + 1) - self.assertNotIn('LMI:LMI_SoftwareIdentity:' - + pkg.get_nevra(with_epoch="ALWAYS"), - set(i.InstalledSoftware.InstanceID for i in inames2)) - - rpmcache.install_pkg(pkg) - inames3 = self.cim_class.instance_names() - self.assertEqual(len(inames1), len(inames3)) - self.assertIn('LMI:LMI_SoftwareIdentity:' - + pkg.get_nevra(with_epoch="ALWAYS"), - set(i.InstalledSoftware.InstanceID for i in inames3)) + inst = self.cim_class.create_instance(properties) + self.assertTrue(package.is_pkg_installed(pkg)) + self.assertCIMNameEqual(inst.path, objpath, + 'Object path does not match for %s.' % pkg) @enable_lmi_exceptions - @mark_dangerous - def test_create_instance(self): + @swbase.test_with_repos('stable', + **{'updates' : False, 'updates-testing' : False, 'misc' : False }) + @swbase.test_with_packages('stable#pkg1') + def test_install_already_installed(self): """ - Tests new instance creation for dangerous packages. + Try to install package that is already installed with + ``LMI_InstalledSoftwareIdentity``. """ - for pkg in self.dangerous_pkgs: - if rpmcache.is_pkg_installed(pkg.name): - rpmcache.remove_pkg(pkg.name) - objpath = self.make_op(pkg) - self.assertFalse(rpmcache.is_pkg_installed(pkg)) - - properties = { - "InstalledSoftware" : objpath.InstalledSoftware, - "System" : objpath.System - } + pkg = self.get_repo('stable')['pkg1'] + self.assertTrue(package.is_pkg_installed(pkg)) + objpath = self.make_op(pkg) - inst = self.cim_class.create_instance(properties) - self.assertTrue(rpmcache.is_pkg_installed(pkg)) - self.assertCIMNameEqual(objpath, inst.path, - 'Object path does not match for %s.' % pkg) - - # try to install second time - self.assertRaisesCIM(pywbem.CIM_ERR_ALREADY_EXISTS, - self.cim_class.create_instance, inst.properties_dict()) + properties = { + "InstalledSoftware" : objpath.InstalledSoftware, + "System" : objpath.System + } + # try to install second time + self.assertRaisesCIM(pywbem.CIM_ERR_ALREADY_EXISTS, + self.cim_class.create_instance, properties) @enable_lmi_exceptions - @mark_dangerous - def test_delete_instance(self): + @swbase.test_with_packages('stable#pkg2') + def test_remove_installed(self): """ - Tests removing of dangerous packages by deleting instance name. + Test package removal with ``LMI_InstalledSoftwareIdentity``. """ - for pkg in self.dangerous_pkgs: - self.ensure_pkg_installed(pkg) - self.assertTrue(rpmcache.is_pkg_installed(pkg)) - objpath = self.make_op(pkg) - inst = objpath.to_instance() - inst.delete() - self.assertFalse(rpmcache.is_pkg_installed(pkg), - "Failed to delete instance for %s." % pkg) + pkg = self.get_repo('stable')['pkg2'] + self.assertTrue(package.is_pkg_installed(pkg)) + objpath = self.make_op(pkg) + inst = objpath.to_instance() + inst.delete() + self.assertFalse(package.is_pkg_installed(pkg)) - # try to delete second time - self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, inst.delete) + # try to delete second time + self.assertRaises(LMIDeletedObjectError, inst.delete) + self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath.to_instance) @mark_tedious - def test_get_system_referents(self): + @swbase.test_with_packages(**{ + 'updates#pkg3' : True, + 'stable#pkg1' : True, + 'pkg2' : False, + 'pkg4' : False, + 'updates-testing#*' : False, + 'misc#*' : False + }) + def test_get_installed_package_names(self): """ - Test ReferenceNames for ComputerSystem. + Try to get installed packages associated with computer system + via ``LMI_InstalledSoftwareIdentity``. """ - objpath = self.make_op(self.safe_pkgs[0]) + pkg = self.get_repo('stable')['pkg1'] + objpath = self.make_op(pkg) refs = objpath.System.to_instance().associator_names( AssocClass=self.CLASS_NAME, Role="System", @@ -214,27 +267,38 @@ class TestInstalledSoftwareIdentity(base.SoftwareBaseTestCase): self.assertEqual(ref.key_properties(), ["InstanceID"]) self.assertTrue(ref.InstanceID.startswith( "LMI:LMI_SoftwareIdentity:")) + nevra_set = set( i.InstanceID[len('LMI:LMI_SoftwareIdentity:'):] + for i in refs) - nevra_set = set(i.InstanceID for i in refs) - for pkg in self.safe_pkgs: - nevra = 'LMI:LMI_SoftwareIdentity:' \ - + pkg.get_nevra(with_epoch="ALWAYS") - self.assertTrue(nevra in nevra_set, 'Missing nevra "%s".' % nevra) + db_installed = 0 + for repo in self.repodb.values(): + for pkg in repo.packages: + if package.is_pkg_installed(pkg): + self.assertTrue(pkg.nevra in nevra_set) + db_installed += 1 + else: + self.assertFalse(pkg.nevra in nevra_set) + # test that number of tested packages installed really match number + # of requested packages + self.assertEqual(db_installed, 2) + @swbase.test_with_packages('stable#pkg1') def test_get_installed_software_referents(self): """ - Test ReferenceNames for SoftwareIdentity. - """ - for pkg in self.safe_pkgs: - objpath = self.make_op(pkg) - refs = objpath.InstalledSoftware.to_instance().associator_names( - AssocClass='LMI_InstalledSoftwareIdentity', - Role="InstalledSoftware", - ResultRole="System", - ResultClass=self.system_cs_name) - self.assertEqual(len(refs), 1) - ref = refs[0] - self.assertCIMNameEqual(objpath.System, ref) + Try to get computer system associated with installed package via + ``LMI_InstalledSoftwareIdentity``. + """ + pkg = self.get_repo('stable')['pkg1'] + self.assertTrue(package.is_pkg_installed(pkg)) + objpath = self.make_op(pkg) + refs = objpath.InstalledSoftware.to_instance().associator_names( + AssocClass='LMI_InstalledSoftwareIdentity', + Role="InstalledSoftware", + ResultRole="System", + ResultClass=self.system_cs_name) + self.assertEqual(len(refs), 1) + ref = refs[0] + self.assertCIMNameEqual(objpath.System, ref) def suite(): """For unittest loaders.""" diff --git a/src/software/test/test_member_of_software_collection.py b/src/software/test/test_member_of_software_collection.py index e51f56d..6acb9e0 100644 --- a/src/software/test/test_member_of_software_collection.py +++ b/src/software/test/test_member_of_software_collection.py @@ -19,25 +19,25 @@ # Authors: Michal Minar <miminar@redhat.com> # """ -Unit tests for LMI_MemberOfSoftwareCollection provider. +Unit tests for ``LMI_MemberOfSoftwareCollection`` provider. """ -import pywbem import unittest -import base +import swbase -class TestMemberOfSoftwareCollection(base.SoftwareBaseTestCase): +class TestMemberOfSoftwareCollection(swbase.SwTestCase): """ - Basic cim operations test. + Basic cim operations test for ``LMI_MemberOfSoftwareCollection``. """ CLASS_NAME = "LMI_MemberOfSoftwareCollection" KEYS = ("Collection", "Member") - def make_op(self, pkg, newer=True): + def make_op(self, pkg): """ - @return object path of MembeOfSoftwareCollection association + :returns Object path of ``LMI_MemberOfSoftwareCollection``. + :rtype: :py:class:`lmi.shell.LMIInstanceName` """ return self.cim_class.new_instance_name({ "Collection" : self.ns.LMI_SystemSoftwareCollection. \ @@ -45,46 +45,228 @@ class TestMemberOfSoftwareCollection(base.SoftwareBaseTestCase): "InstanceID" : "LMI:LMI_SystemSoftwareCollection" }), "Member" : self.ns.LMI_SoftwareIdentity.new_instance_name({ - "InstanceID" : 'LMI:LMI_SoftwareIdentity:' \ - + pkg.get_nevra(newer=newer, with_epoch="ALWAYS") + "InstanceID" : 'LMI:LMI_SoftwareIdentity:' + pkg.nevra }) }) + @swbase.test_with_repos('stable', 'misc', 'updates-testing') def test_get_instance(self): """ - Tests GetInstance call on packages from our rpm cache. + Test ``GetInstance()`` call on ``LMI_MemberOfSoftwareCollection``. """ - for pkg in self.safe_pkgs: + for repoid in ('stable', 'misc', 'updates-testing'): + repo = self.get_repo(repoid) + for pkg in repo.packages: + objpath = self.make_op(pkg) + inst = objpath.to_instance() + self.assertNotEqual(inst, None, + 'Package "%s" is available in SystemSoftwareCollection' + % pkg) + self.assertCIMNameEqual(inst.path, objpath, + "Object paths should match for package %s" % pkg) + self.assertEqual(set(inst.path.key_properties()), + set(self.KEYS)) + + @swbase.test_with_repos(misc=False) + def test_get_instance_with_disabled_repo(self): + """ + Test ``GetInstance()`` call on ``LMI_MemberOfSoftwareCollection`` + with package from disabled repository. + """ + repo = self.get_repo('misc') + for pkg in repo.packages: objpath = self.make_op(pkg) inst = objpath.to_instance() - self.assertNotEqual(inst, None, - 'Package "%s" not available in SystemSoftwareCollection' - % str(pkg)) - self.assertCIMNameEqual(inst.path, objpath, - "Object paths should match for package %s"%pkg) + self.assertEqual(inst, None, + 'package should not be accessible from disabled repository.') + + @swbase.test_with_repos(**{ + 'stable' : True, + 'updates': False, + 'updates-testing': True, + 'misc' : False + }) + @swbase.test_with_packages(**{ + 'stable#pkg1' : True, + 'updates#pkg2' : True, + 'pkg3' : False, + 'pkg4' : False + }) + def test_enum_instances(self): + """ + Test ``EnumInstances()`` call on ``LMI_MemberOfSoftwareCollection``. + """ + insts = self.cim_class.instances() + self.assertGreater(len(insts), 0) + for inst in insts: + self.assertEqual(set(inst.path.key_properties()), set(self.KEYS)) for key in self.KEYS: - self.assertIn(key, inst.properties(), - "OP is missing \"%s\" key for package %s"%(key, pkg)) + self.assertCIMNameEqual( + getattr(inst, key), getattr(inst.path, key)) + nevra_set = set(i.Member.InstanceID[ + len('LMI:LMI_SoftwareIdentity:'):] for i in insts) + for repo in self.repodb.values(): + if not repo.repoid in ( + 'openlmi-sw-test-repo-stable', + 'openlmi-sw-test-repo-updates-testing'): + continue + for pkg in repo.packages: + self.assertIn(pkg.nevra, nevra_set) + nevra_set.remove(pkg.nevra) + self.assertEqual(len(nevra_set), 0) + + @swbase.test_with_repos(**{ + 'stable' : True, + 'updates': False, + 'updates-testing': True, + 'misc' : False + }) + def test_enum_instance_names(self): + """ + Test ``EnumInstanceNames()`` call on ``LMI_MemberOfSoftwareCollection``. + """ + inames = self.cim_class.instance_names() + self.assertGreater(len(inames), 0) + objpath = self.make_op(self.get_repo('stable')['pkg1']) + for iname in inames: + self.assertEqual(iname.namespace, 'root/cimv2') + self.assertEqual(iname.classname, self.CLASS_NAME) + self.assertEqual(sorted(iname.key_properties()), sorted(self.KEYS)) + self.assertCIMNameEqual(objpath.Collection, iname.Collection) + nevra_set = set(i.Member.InstanceID[ + len('LMI:LMI_SoftwareIdentity:'):] for i in inames) + for repo in self.repodb.values(): + if not repo.repoid in ( + 'openlmi-sw-test-repo-stable', + 'openlmi-sw-test-repo-updates-testing'): + continue + for pkg in repo.packages: + self.assertIn(pkg.nevra, nevra_set) + nevra_set.remove(pkg.nevra) + self.assertEqual(len(nevra_set), 0) + + + @swbase.test_with_repos('stable') + def test_get_package_collections(self): + """ + Try to get instance of software collection associated with package. + """ + repo = self.get_repo('stable') + for pkg in repo.packages: + objpath = self.make_op(pkg) + inst = objpath.Member.to_instance() + self.assertNotEqual(inst, None, + "package %s is available" % pkg) + refs = inst.associators( + AssocClass=self.CLASS_NAME, + Role="Member", + ResultRole="Collection", + ResultClass="LMI_SystemSoftwareCollection") + self.assertEqual(len(refs), 1, + "exactly one collection is associated with package %s" % pkg) + self.assertCIMNameEqual(refs[0].path, objpath.Collection) + self.assertEqual(refs[0].InstanceID, objpath.Collection.InstanceID) - def test_get_member_referents(self): + @swbase.test_with_repos('stable') + def test_get_package_collection_names(self): """ - Test ReferenceNames for SoftwareIdentity. + Try to get instance of software collection associated with package. """ - for pkg in self.safe_pkgs: + repo = self.get_repo('stable') + for pkg in repo.packages: objpath = self.make_op(pkg) - member = objpath.Member.to_instance() - self.assertNotEqual(member, None, - 'Package "%s" not found' % str(pkg)) - refs = member.associator_names( + inst = objpath.Member.to_instance() + self.assertNotEqual(inst, None, + "package %s is available" % pkg) + refs = inst.associator_names( AssocClass=self.CLASS_NAME, Role="Member", ResultRole="Collection", ResultClass="LMI_SystemSoftwareCollection") self.assertEqual(len(refs), 1, - 'Package "%s" not available in SystemSoftwareCollection' - % str(pkg)) - ref = refs[0] - self.assertCIMNameEqual(objpath.Collection, ref) + "exactly one collection is associated with package %s" % pkg) + self.assertCIMNameEqual(refs[0], objpath.Collection) + + @swbase.test_with_repos(**{ + 'stable' : True, + 'updates' : False, + 'updates-testing' : True, + 'misc' : False + }) + def test_get_collection_package_names(self): + """ + Try to get package names associated to software collection. + """ + pkg = self.get_repo('stable')['pkg1'] + objpath = self.make_op(pkg) + collection = objpath.Collection.to_instance() + self.assertNotEqual(collection, None, + "GetInstance call succeeds on collection") + refs = collection.associator_names( + AssocClass=self.CLASS_NAME, + Role="Collection", + ResultRole="Member", + ResultClass="LMI_SoftwareIdentity") + self.assertGreater(len(refs), 0) + + for ref in refs: + self.assertEqual(ref.namespace, 'root/cimv2') + self.assertEqual(ref.classname, "LMI_SoftwareIdentity") + self.assertEqual(ref.key_properties(), ["InstanceID"]) + self.assertTrue(ref.InstanceID.startswith( + "LMI:LMI_SoftwareIdentity:")) + + nevra_set = set( i.InstanceID[len('LMI:LMI_SoftwareIdentity:'):] + for i in refs) + for repo in self.repodb.values(): + if not repo.repoid in ( + 'openlmi-sw-test-repo-stable', + 'openlmi-sw-test-repo-updates-testing'): + continue + for pkg in repo.packages: + self.assertIn(pkg.nevra, nevra_set) + nevra_set.remove(pkg.nevra) + self.assertEqual(len(nevra_set), 0) + + @swbase.test_with_repos(**{ + 'stable' : True, + 'updates' : False, + 'updates-testing' : True, + 'misc' : False + }) + def test_get_collection_packages(self): + """ + Try to get packages associated to software collection. + """ + pkg = self.get_repo('stable')['pkg1'] + objpath = self.make_op(pkg) + collection = objpath.Collection.to_instance() + self.assertNotEqual(collection, None, + "GetInstance call succeeds on collection") + refs = collection.associators( + AssocClass=self.CLASS_NAME, + Role="Collection", + ResultRole="Member", + ResultClass="LMI_SoftwareIdentity") + self.assertGreater(len(refs), 0) + + for ref in refs: + self.assertEqual(ref.namespace, 'root/cimv2') + self.assertEqual(ref.classname, "LMI_SoftwareIdentity") + self.assertEqual(ref.path.key_properties(), ["InstanceID"]) + self.assertTrue(ref.InstanceID.startswith( + "LMI:LMI_SoftwareIdentity:")) + + nevra_set = set(i.ElementName for i in refs) + for repo in self.repodb.values(): + if not repo.repoid in ( + 'openlmi-sw-test-repo-stable', + 'openlmi-sw-test-repo-updates-testing'): + continue + for pkg in repo.packages: + self.assertIn(pkg.nevra, nevra_set) + nevra_set.remove(pkg.nevra) + self.assertEqual(len(nevra_set), 0) def suite(): """For unittest loaders.""" diff --git a/src/software/test/test_resource_for_software_identity.py b/src/software/test/test_resource_for_software_identity.py index faeecb8..e44f0db 100644 --- a/src/software/test/test_resource_for_software_identity.py +++ b/src/software/test/test_resource_for_software_identity.py @@ -19,17 +19,16 @@ # Authors: Michal Minar <miminar@redhat.com> # """ -Unit tests for LMI_ResourceForSoftwareIdentity provider. +Unit tests for ``LMI_ResourceForSoftwareIdentity`` provider. """ -import pywbem import unittest -from lmi.test.util import mark_dangerous -from lmi.test.util import mark_tedious -import base +import swbase -class TestResourceForSoftwareIdentity(base.SoftwareBaseTestCase): +ENABLED_STATE_ENABLED = 2 + +class TestResourceForSoftwareIdentity(swbase.SwTestCase): """ Basic cim operations test. """ @@ -37,13 +36,10 @@ class TestResourceForSoftwareIdentity(base.SoftwareBaseTestCase): CLASS_NAME = "LMI_ResourceForSoftwareIdentity" KEYS = ("ManagedElement", "AvailableSAP") - @classmethod - def needs_repodb(cls): - return True - - def make_op(self, pkg=None, newer=True, repo=None): + def make_op(self, pkg=None, repo=None): """ - @return object path of ResourceForSoftwareIdentity association + :returns: Object path of ``LMI_ResourceForSoftwareIdentity`` + :rtype: :py:class:`lmi.shell.LMIInstanceName` """ objpath = self.cim_class.new_instance_name({ "AvailableSAP" : self.ns.LMI_SoftwareIdentityResource. \ @@ -52,125 +48,217 @@ class TestResourceForSoftwareIdentity(base.SoftwareBaseTestCase): "SystemCreationClassName" : self.system_cs_name, "SystemName" : self.SYSTEM_NAME }), - "ManagedElement" : self.ns.LMI_SoftwareIdentity.new_instance_name({}) + "ManagedElement" : + self.ns.LMI_SoftwareIdentity.new_instance_name({}) }) if repo is not None: objpath.AvailableSAP.wrapped_object["Name"] = repo.repoid elif pkg is not None: - objpath.AvailableSAP.wrapped_object["Name"] = getattr(pkg, - 'up_repo' if newer else 'repo') + objpath.AvailableSAP.wrapped_object["Name"] = pkg.repoid if pkg is not None: - objpath.ManagedElement.wrappe_object["InstanceID"] = ( - 'LMI:LMI_SoftwareIdentity:' - + pkg.get_nevra(newer=newer, with_epoch="ALWAYS")) + objpath.ManagedElement.wrapped_object["InstanceID"] = ( + 'LMI:LMI_SoftwareIdentity:' + pkg.nevra) return objpath - @mark_dangerous + @swbase.test_with_repos('stable', 'updates', 'misc', + **{'updates-testing' : True}) def test_get_instance(self): """ - Tests GetInstance call on packages from our rpm cache. + Test ``GetInstance()`` call on ``LMI_ResourceForSoftwareIdentity``. + """ + for repo in self.repodb.values(): + self.assertTrue(repo.status) + for pkg in repo.packages: + objpath = self.make_op(pkg, repo) + inst = objpath.to_instance() + self.assertNotEqual(inst, None) + self.assertEqual(set(inst.path.key_properties()), + set(self.KEYS)) + for key in self.KEYS: + self.assertCIMNameEqual( + getattr(inst, key), getattr(inst.path, key), + 'Key property "%s" does not match for package "%s#%s"!' + % (key, repo.repoid, pkg)) + + @swbase.test_with_repos(**{'updates' : False}) + def test_get_instance_disabled_repo(self): + """ + Test ``GetInstance()`` call on ``LMI_ResourceForSoftwareIdentity`` + with disabled repository. """ - for pkg in self.dangerous_pkgs: - objpath = self.make_op(pkg) + repo = self.get_repo('updates') + self.assertFalse(repo.status) + for pkg in repo.packages: + objpath = self.make_op(pkg, repo) inst = objpath.to_instance() - self.assertCIMNameEqual(objpath, inst.path, - "Object paths should match for package %s"%pkg) + self.assertNotEqual(inst, None) + self.assertEqual(set(inst.path.key_properties()), + set(self.KEYS)) for key in self.KEYS: - self.assertIn(key, inst.properties(), - 'OP is missing \"%s\" key for package "%s".' % (key, pkg)) self.assertCIMNameEqual( - getattr(inst, key), getattr(inst.path, key), - 'Key property "%s" does not match for package "%s"!' % - (key, pkg)) + getattr(inst, key), getattr(inst.path, key), + 'Key property "%s" does not match for package "%s#%s"!' % + (key, repo.repoid, pkg)) - @mark_tedious - def test_get_resource_referents(self): + @swbase.test_with_repos('stable') + def test_repo_identity_names(self): """ - Test ReferenceNames for AvailableSAP. + Test ``AssociatorNames()`` call on ``LMI_ResourceForSoftwareIdentity``. """ - for repo in self.repodb: - objpath = self.make_op(repo=repo) - if not repo.pkg_count or repo.pkg_count > 10000: - # do not test too big repositories - continue - refs = objpath.AvailableSAP.to_instance().associator_names( - AssocClass=self.CLASS_NAME, - Role="AvailableSAP", - ResultRole="ManagedElement", - ResultClass="LMI_SoftwareIdentity") - if repo.pkg_count > 0: - self.assertGreater(len(refs), 0, - 'repository "%s" is missing software identities' - % repo.name) - for ref in refs: - self.assertEqual(ref.namespace, 'root/cimv2') - self.assertEqual(ref.classname, "LMI_SoftwareIdentity") - self.assertEqual(ref.key_properties(), ["InstanceID"]) - self.assertTrue( - ref.InstanceID.startswith("LMI:LMI_SoftwareIdentity:")) - - nevra_set = set(i.InstanceID for i in refs) - # NOTE: installed packages might not be available - for pkg, up in ((pkg, up) for pkg in self.dangerous_pkgs - for up in (True, False)): - nevra = 'LMI:LMI_SoftwareIdentity:'+pkg.get_nevra( - newer=up, with_epoch="ALWAYS") - reponame = getattr(pkg, 'up_repo' if up else 'repo') - if reponame == repo.repoid: - self.assertTrue(nevra in nevra_set, - 'Missing nevra "%s" for repo "%s".' % (nevra, - reponame)) - - @mark_tedious - def test_get_resource_referents_for_disabled_repo(self): - """ - Test ReferenceNames for AvailableSAP, which is disabled. - """ - for repo in self.repodb: - if repo.status: - continue # test only disabled repositories - objpath = self.make_op(repo=repo) - refs = objpath.AvailableSAP.to_instance().associator_names( + repo = self.get_repo('stable') + self.assertTrue(repo.status) + self.assertGreater(repo.pkg_count, 0) + + objpath = self.make_op(repo=repo) + refs = objpath.AvailableSAP.to_instance().associator_names( + AssocClass=self.CLASS_NAME, + Role="AvailableSAP", + ResultRole="ManagedElement", + ResultClass="LMI_SoftwareIdentity") + self.assertEqual(len(refs), repo.pkg_count, + 'repository "%s" is missing software identities' + % repo.name) + for ref in refs: + self.assertEqual(ref.namespace, 'root/cimv2') + self.assertEqual(ref.classname, "LMI_SoftwareIdentity") + self.assertEqual(ref.key_properties(), ["InstanceID"]) + self.assertTrue( + ref.InstanceID.startswith("LMI:LMI_SoftwareIdentity:")) + + nevra_set = set(i.InstanceID for i in refs) + for pkg in repo.packages: + nevra = 'LMI:LMI_SoftwareIdentity:'+pkg.nevra + self.assertTrue(nevra in nevra_set, + 'Missing nevra "%s" for repo "%s".' % (nevra, + repo.repoid)) + nevra_set.remove(nevra) + self.assertEqual(len(nevra_set), 0, + "all packages from repository have been listed") + + @swbase.test_with_repos(stable=False) + def test_disabled_repo_identity_names(self): + """ + Test ``AssociatorNames()`` call on ``LMI_ResourceForSoftwareIdentity``. + """ + repo = self.get_repo('stable') + self.assertFalse(repo.status) + self.assertGreater(repo.pkg_count, 0) + + objpath = self.make_op(repo=repo) + refs = objpath.AvailableSAP.to_instance().associator_names( + AssocClass=self.CLASS_NAME, + Role="AvailableSAP", + ResultRole="ManagedElement", + ResultClass="LMI_SoftwareIdentity") + self.assertEqual(len(refs), repo.pkg_count, + 'repository "%s" is missing software identities' + % repo.name) + for ref in refs: + self.assertEqual(ref.namespace, 'root/cimv2') + self.assertEqual(ref.classname, "LMI_SoftwareIdentity") + self.assertEqual(ref.key_properties(), ["InstanceID"]) + self.assertTrue( + ref.InstanceID.startswith("LMI:LMI_SoftwareIdentity:")) + + @swbase.test_with_repos('updates') + def test_repo_indentities(self): + """ + Test ``Associators()`` call on ``LMI_ResourceForSoftwareIdentity``. + """ + repo = self.get_repo('updates') + self.assertTrue(repo.status) + self.assertGreater(repo.pkg_count, 0) + + objpath = self.make_op(repo=repo) + refs = objpath.AvailableSAP.to_instance().associators( + AssocClass=self.CLASS_NAME, + Role="AvailableSAP", + ResultRole="ManagedElement", + ResultClass="LMI_SoftwareIdentity") + if repo.pkg_count: + self.assertGreater(len(refs), 0, + 'no software identities associated to repo "%s"' + % repo.repoid) + for ref in refs: + self.assertEqual(ref.namespace, 'root/cimv2') + self.assertEqual(ref.classname, "LMI_SoftwareIdentity") + self.assertEqual(ref.path.key_properties(), ["InstanceID"]) + + nevra_set = set(i.ElementName for i in refs) + for pkg in repo.packages: + self.assertTrue(pkg.nevra in nevra_set, + 'Missing package "%s" in repo "%s".' % (pkg, repo.repoid)) + nevra_set.remove(pkg.nevra) + self.assertEqual(len(nevra_set), 0, + "all packages from repository have been listed") + + @swbase.test_with_repos('stable') + def test_get_pkg_repository_name(self): + """ + Test ``AssociatorNames()`` call on ``LMI_SoftwareIdentity``. + """ + repo = self.get_repo('stable') + self.assertTrue(repo.status) + for pkg in repo.packages: + objpath = self.make_op(pkg, repo) + inst = objpath.ManagedElement.to_instance() + self.assertNotEqual(inst, None, + "GetInstance() succeeds for package %s" % pkg) + refs = inst.associator_names( AssocClass=self.CLASS_NAME, - Role="AvailableSAP", - ResultRole="ManagedElement", - ResultClass="LMI_SoftwareIdentity") - if repo.pkg_count: - self.assertGreater(len(refs), 0, - 'no software identities associated to repo "%s"' % repo.name) - for ref in refs: - self.assertEqual(ref.namespace, 'root/cimv2') - self.assertEqual(ref.classname, "LMI_SoftwareIdentity") - self.assertEqual(ref.key_properties(), ["InstanceID"]) - self.assertTrue( - ref.InstanceID.startswith("LMI:LMI_SoftwareIdentity:")) - - @mark_dangerous - def test_get_managed_element_referents(self): - """ - Test ReferenceNames for SoftwareIdentity. - """ - for pkg, up in ((pkg, up) for pkg in self.dangerous_pkgs - for up in (True, False)): - objpath = self.make_op(pkg, newer=up) - refs = objpath.ManagedElement.to_instance().associator_names( + Role="ManagedElement", + ResultRole="AvailableSAP", + ResultClass="LMI_SoftwareIdentityResource") + self.assertEqual(1, len(refs), + 'No repo found for pkg "%s".' % pkg.nevra) + ref = refs[0] + self.assertEqual(ref.namespace, 'root/cimv2') + self.assertEqual(ref.classname, "LMI_SoftwareIdentityResource") + self.assertEqual(set(ref.key_properties()), + set(["SystemCreationClassName", "SystemName", + "Name", "CreationClassName"])) + self.assertEqual(ref.Name, pkg.repoid, + 'Repository name does not match for pkg "%s"' + % pkg.nevra) + + @swbase.test_with_repos('updates') + @swbase.test_with_packages(**{ + 'updates#pkg1' : True, + 'updates#pkg2' : True, + 'updates#pkg3' : False, + 'updates#pkg4' : False + }) + def test_get_pkg_repository(self): + """ + Test ``AssociatorNames()`` call on ``LMI_SoftwareIdentity``. + + Try to get repository for installed and not installed repository. + """ + repo = self.get_repo('updates') + self.assertTrue(repo.status) + for pkg in repo.packages: + objpath = self.make_op(pkg, repo) + inst = objpath.ManagedElement + self.assertNotEqual(inst, None, + "GetInstance() succeeds for package %s" % pkg) + refs = inst.to_instance().associators( AssocClass=self.CLASS_NAME, Role="ManagedElement", ResultRole="AvailableSAP", ResultClass="LMI_SoftwareIdentityResource") self.assertEqual(1, len(refs), - 'No repo found for pkg "%s".' % pkg.get_nevra(newer=up, - with_epoch="ALWAYS")) + 'No repo found for pkg "%s".' % pkg.nevra) ref = refs[0] self.assertEqual(ref.namespace, 'root/cimv2') self.assertEqual(ref.classname, "LMI_SoftwareIdentityResource") - self.assertEqual(sorted(ref.key_properties()), - sorted(["SystemCreationClassName", "SystemName", + self.assertEqual(set(ref.path.key_properties()), + set(["SystemCreationClassName", "SystemName", "Name", "CreationClassName"])) - self.assertEqual( - getattr(pkg, 'up_repo' if up else 'repo'), - ref.Name, 'Repository name does not match for pkg "%s"'% - pkg.get_nevra(newer=up, with_epoch="ALWAYS")) + self.assertEqual(ref.Name, pkg.repoid, + 'Repository name does not match for pkg "%s"' + % pkg.nevra) + self.assertEqual(ref.ElementName, pkg.repoid) + self.assertEqual(ref.EnabledState, ENABLED_STATE_ENABLED) def suite(): """For unittest loaders.""" diff --git a/src/software/test/test_software_identity.py b/src/software/test/test_software_identity.py index c4c5524..05ce92d 100644 --- a/src/software/test/test_software_identity.py +++ b/src/software/test/test_software_identity.py @@ -19,93 +19,228 @@ # Authors: Michal Minar <miminar@redhat.com> # """ -Unit tests for LMI_SoftwareIdentity provider. +Unit tests for ``LMI_SoftwareIdentity`` provider. """ from datetime import datetime, timedelta import pywbem import unittest -from lmi.shell.LMIUtil import lmi_wrap_cim_instance_name -from lmi.test.util import mark_dangerous -import base -import rpmcache -import util +from lmi.test.lmibase import enable_lmi_exceptions +import package +import swbase -class TestSoftwareIdentity(base.SoftwareBaseTestCase): #pylint: disable=R0904 +class TestSoftwareIdentity(swbase.SwTestCase): """ - Basic cim operations test. + Basic cim operations test on ``LMI_SoftwareIdentity``. """ CLASS_NAME = "LMI_SoftwareIdentity" KEYS = ("InstanceID", ) - def make_op(self, pkg, newer=True): + def make_op(self, pkg): """ - @return object path of SoftwareIdentity + :returns: Object path of ``LMI_SoftwareIdentity`` + :rtype: :py:class:`lmi.shell.LMIInstanceName` """ + if isinstance(pkg, package.Package): + nevra = pkg.nevra + else: + nevra = pkg return self.cim_class.new_instance_name({ - "InstanceID" : 'LMI:LMI_SoftwareIdentity:' - + pkg.get_nevra(newer, "ALWAYS") + "InstanceID" : 'LMI:LMI_SoftwareIdentity:' + nevra }) - @mark_dangerous - def test_get_instance(self): - """ - Dangerous test, making sure, that properties are set correctly - for installed and removed packages. - """ - for pkg in self.dangerous_pkgs: - if rpmcache.is_pkg_installed(pkg.name): - rpmcache.remove_pkg(pkg.name) - objpath = self.make_op(pkg) - inst = objpath.to_instance() - self.assertIsNone(inst.InstallDate) - time_stamp = datetime.now(pywbem.cim_types.MinutesFromUTC(0)) \ - - timedelta(seconds=0.01) - rpmcache.install_pkg(pkg) - inst.refresh() - self.assertIsNotNone(inst.InstallDate) - self.assertGreater(inst.InstallDate.datetime, time_stamp) - - def test_get_instance_safe(self): - """ - Tests GetInstance call on packages from our rpm cache. - """ - for pkg in self.safe_pkgs: - objpath = self.make_op(pkg) - inst = objpath.to_instance() - self.assertCIMNameEqual(inst.path, objpath, - "Object paths should match for package %s"%pkg) - for key in self.KEYS: - self.assertIn(key, inst.properties(), - "OP is missing \"%s\" key for package %s"%(key, pkg)) - self.assertIsInstance(inst.Caption, basestring) - self.assertIsInstance(inst.Description, basestring) - self.assertEqual(pkg.up_evra, inst.VersionString, - "VersionString does not match evra for pkg %s" % pkg) - self.assertTrue(inst.IsEntity) - self.assertEqual(pkg.name, inst.Name, - "Name does not match for pkg %s" % pkg) - self.assertIsInstance(inst.Epoch, pywbem.Uint32, - "Epoch does not match for pkg %s" % pkg) - self.assertEqual(int(pkg.epoch), inst.Epoch) - self.assertEqual(pkg.ver, inst.Version, - "Version does not match for pkg %s" % pkg) - self.assertEqual(pkg.rel, inst.Release, - "Release does not match for pkg %s" % pkg) - self.assertEqual(pkg.arch, inst.Architecture, - "Architecture does not match for pkg %s" % pkg) - - # try to leave out epoch part - if pkg.epoch == "0": - op_no_epoch = objpath.copy() - op_no_epoch.wrapped_object["SoftwareElementID"] = \ - util.make_nevra(pkg.name, pkg.up_epoch, pkg.up_ver, - pkg.up_rel, pkg.arch, "NEVER") - self.assertNotIn(":", op_no_epoch.SoftwareElementID) - inst = op_no_epoch.to_instance() - self.assertCIMNameIn(inst.path, (objpath, op_no_epoch)) + def _check_package_instance(self, pkg, inst): + """ + Check properties of ``LMI_SoftwareIdentity`` instance and its + correspondence to represented *pkg*. + """ + self.assertNotEqual(inst, None, + "instance must exist for package %s" % pkg) + objpath = self.make_op(pkg) + self.assertCIMNameEqual(objpath, inst.path) + for key in self.KEYS: + self.assertEqual(getattr(inst, key), getattr(inst.path, key)) + self.assertEqual(set(inst.path.key_properties()), set(self.KEYS)) + self.assertEqual(pkg.name, inst.Name) + + self.assertEqual(inst.InstanceID, 'LMI:LMI_SoftwareIdentity:' + + pkg.nevra) + self.assertEqual(inst.Caption, pkg.summary) + self.assertIsInstance(inst.Description, basestring) + self.assertEqual(inst.VersionString, pkg.evra, + "VersionString does not match evra for pkg %s" % pkg) + self.assertTrue(inst.IsEntity) + self.assertEqual(inst.Name, pkg.name, + "Name does not match for pkg %s" % pkg) + self.assertIsInstance(inst.Epoch, pywbem.Uint32, + "Epoch does not match for pkg %s" % pkg) + self.assertEqual(inst.Epoch, pkg.epoch) + self.assertEqual(inst.Version, pkg.ver, + "Version does not match for pkg %s" % pkg) + self.assertEqual(inst.Release, pkg.rel, + "Release does not match for pkg %s" % pkg) + self.assertEqual(inst.Architecture, pkg.arch, + "Architecture does not match for pkg %s" % pkg) + self.assertEqual(inst.ElementName, pkg.nevra, + "ElementName must match nevra of pkg %s" % pkg) + self.assertTrue(inst.InstanceID.endswith(inst.ElementName), + "InstanceID has package nevra at its end for pkg %s" % pkg) + + if package.is_pkg_installed(pkg): + self.assertNotEqual(inst.InstallDate, None) + else: + self.assertEqual(inst.InstallDate, None) + + @swbase.test_with_packages(**{'stable#pkg1' : False}) + def test_get_instance_installed(self): + """ + Test ``GetInstance()`` call on ``LMI_SoftwareIdentity`` with + installed package. + """ + pkg = self.get_repo('stable')['pkg1'] + package.remove_pkg(pkg.name) + package.install_pkg(pkg) + self.assertTrue(package.is_pkg_installed(pkg)) + + objpath = self.make_op(pkg) + inst = objpath.to_instance() + self.assertNotEqual(inst, None, "GetInstance is successful on %s" % pkg) + self.assertIsNotNone(inst.InstallDate) + time_stamp = datetime.now(pywbem.cim_types.MinutesFromUTC(0)) \ + - timedelta(seconds=5) + self.assertGreater(inst.InstallDate.datetime, time_stamp) + self._check_package_instance(pkg, inst) + + @swbase.test_with_repos('stable') + @swbase.test_with_packages(**{'stable#pkg2' : False}) + def test_get_instance_not_installed(self): + """ + Test ``GetInstance()`` call on ``LMI_SoftwareIdentity`` with not + installed package. + """ + pkg = self.get_repo('stable')['pkg2'] + self.assertFalse(package.is_pkg_installed(pkg)) + + self._check_package_instance(pkg, self.make_op(pkg).to_instance()) + + @swbase.test_with_packages('stable#pkg1', 'stable#pkg4') + def test_get_instance_without_epoch(self): + """ + Test ``GetInstance()`` on ``LMI_SoftwareIdentity`` with epoch part + omitted. + """ + pkg = self.get_repo('stable')['pkg1'] + objpath = self.make_op(pkg) + objpath.wrapped_object['InstanceID'] = 'LMI:LMI_SoftwareIdentity:' \ + + pkg.get_nevra('NEVER') + self.assertFalse('0:' in objpath.InstanceID) + inst = objpath.to_instance() + self.assertNotEqual(inst, None) + self.assertTrue('0:' in pkg.nevra) + self.assertTrue(inst.InstanceID.endswith(pkg.nevra)) + + @swbase.test_with_repos('stable') + @swbase.test_with_packages(**{'stable#pkg3' : False}) + def test_refresh_after_install(self): + """ + Test instance refresh of ``LMI_SoftwareIdentity`` after package + installation. + """ + pkg = self.get_repo('stable')['pkg3'] + self.assertFalse(package.is_pkg_installed(pkg)) + inst = self.make_op(pkg).to_instance() + self.assertEqual(inst.InstallDate, None) + self._check_package_instance(pkg, inst) + + # install it + package.install_pkg(pkg) + self.assertTrue(package.is_pkg_installed(pkg)) + inst.refresh() + self.assertNotEqual(inst.InstallDate, None) + self._check_package_instance(pkg, inst) + + @swbase.test_with_repos('stable') + @swbase.test_with_packages('stable#pkg4') + def test_refresh_after_uninstall(self): + """ + Test instance refresh of ``LMI_SoftwareIdentity`` after package + uninstallation. + """ + pkg = self.get_repo('stable')['pkg4'] + self.assertTrue(package.is_pkg_installed(pkg)) + inst = self.make_op(pkg).to_instance() + self.assertNotEqual(inst.InstallDate, None) + self._check_package_instance(pkg, inst) + + # install it + package.remove_pkg(pkg.name) + self.assertFalse(package.is_pkg_installed(pkg)) + inst.refresh() + self.assertEqual(inst.InstallDate, None) + self._check_package_instance(pkg, inst) + + @enable_lmi_exceptions + @swbase.test_with_packages('stable#pkg3') + def test_get_instance_invalid(self): + """ + Test ``GetInstance()`` call on invalid object path. + """ + pkg = self.get_repo('stable')['pkg3'] + # leave out arch part + nvr = '%s-%s-%s' % (pkg.name, pkg.ver, pkg.rel) + objpath = self.make_op(nvr) + self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath.to_instance) + + # leave out version part + nra = '%s-%s.%s' % (pkg.name, pkg.rel, pkg.arch) + objpath = self.make_op(nra) + self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath.to_instance) + + # leave out release part + nva = '%s-%s.%s' % (pkg.name, pkg.ver, pkg.arch) + objpath = self.make_op(nva) + self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath.to_instance) + + # leave out name part + vra = '%s-%s.%s' % (pkg.ver, pkg.rel, pkg.arch) + objpath = self.make_op(vra) + self.assertRaisesCIM(pywbem.CIM_ERR_INVALID_PARAMETER, + objpath.to_instance) + + # leave out just epoch + nvra = '%s-%s-%s.%s' % (pkg.name, pkg.ver, pkg.rel, pkg.arch) + objpath = self.make_op(nvra) + inst = objpath.to_instance() + self.assertNotEqual(inst, None) + + @swbase.test_with_packages('misc#funny-version', 'misc#funny-release') + def test_get_instance_funny(self): + """ + Test ``GetInstance()`` on package with quite odd nevra. + """ + pkg = self.get_repo('misc')['funny-version'] + self.assertTrue(package.is_pkg_installed(pkg)) + objpath = self.make_op(pkg) + inst = objpath.to_instance() + self.assertNotEqual(inst, None, "GetInstance is successful on %s" % pkg) + self._check_package_instance(pkg, inst) + + pkg = self.get_repo('misc')['funny-release'] + self.assertTrue(package.is_pkg_installed(pkg)) + objpath = self.make_op(pkg) + inst = objpath.to_instance() + self.assertNotEqual(inst, None, "GetInstance is successful on %s" % pkg) + self._check_package_instance(pkg, inst) + + @enable_lmi_exceptions + def test_enum_instance_names(self): + """ + Test ``EnumInstanceNames()`` call on ``LMI_SoftwareIdentity``. + """ + self.assertRaisesCIM(pywbem.CIM_ERR_NOT_SUPPORTED, + self.cim_class.instance_names) def suite(): """For unittest loaders.""" diff --git a/src/software/test/test_software_identity_checks.py b/src/software/test/test_software_identity_checks.py index 20d4fd1..c29fa47 100644 --- a/src/software/test/test_software_identity_checks.py +++ b/src/software/test/test_software_identity_checks.py @@ -19,17 +19,22 @@ # Authors: Michal Minar <miminar@redhat.com> # """ -Unit tests for LMI_SoftwareIdentityChecks provider. +Unit tests for ``LMI_SoftwareIdentityChecks`` provider. """ +import os import pywbem +import subprocess import unittest from lmi.test.lmibase import enable_lmi_exceptions -import base +import package +import swbase import util -class TestSoftwareIdentityChecks(base.SoftwareBaseTestCase): +SOFTWARE_ELEMENT_STATE_EXECUTABLE = 2 + +class TestSoftwareIdentityChecks(swbase.SwTestCase): """ Basic cim operations test. """ @@ -37,104 +42,160 @@ class TestSoftwareIdentityChecks(base.SoftwareBaseTestCase): CLASS_NAME = "LMI_SoftwareIdentityChecks" KEYS = ("Check", "Element") - @classmethod - def needs_pkgdb_files(cls): - return True - - def make_op(self, pkg, filepath, newer=True): + def make_op(self, pkg, filepath): """ - @return object path of SoftwareIdentityChecks association + :returns: Object path of ``LMI_SoftwareIdentityChecks`` + :rtype: :py:class:`lmi.shell.LMIInstanceName` """ return self.cim_class.new_instance_name({ "Check" : self.ns.LMI_SoftwareIdentityFileCheck.new_instance_name({ "CheckID" : 'LMI:LMI_SoftwareIdentityFileCheck', "Name" : filepath, - "SoftwareElementID" : pkg.get_nevra( - newer=newer, with_epoch="ALWAYS"), - "SoftwareElementState" : pywbem.Uint16(2), #Executable + "SoftwareElementID" : pkg.nevra, + "SoftwareElementState" : SOFTWARE_ELEMENT_STATE_EXECUTABLE, "TargetOperatingSystem" : util.get_target_operating_system(), - "Version" : getattr(pkg, 'up_evra' if newer else 'evra') + "Version" : pkg.evra }), "Element" : self.ns.LMI_SoftwareIdentity.new_instance_name({ - "InstanceID" : 'LMI:LMI_SoftwareIdentity:' - + pkg.get_nevra(newer=newer, with_epoch="ALWAYS") + "InstanceID" : 'LMI:LMI_SoftwareIdentity:' + pkg.nevra }) }) - @unittest.skip("unrealiable test on random packages") + def setUp(self): + to_uninstall = set() + for repo in self.repodb.values(): + for pkg in repo.packages: + to_uninstall.add(pkg.name) + to_uninstall = [p for p in to_uninstall if package.is_pkg_installed(p)] + if to_uninstall: + subprocess.call(['/usr/bin/rpm', '--quiet', '-e'] + to_uninstall) + + @swbase.test_with_packages('stable#pkg1') def test_get_instance(self): """ - Tests GetInstance call on packages from our rpm cache. + Test ``GetInstance()`` call on ``LMI_SoftwareIdentityChecks``. + """ + pkg = self.get_repo('stable')['pkg1'] + count = 0 + for filepath in pkg: + objpath = self.make_op(pkg, filepath) + inst = objpath.to_instance() + self.assertNotEqual(inst, None, + "failed to get instance for %s:%s" % (pkg, filepath)) + self.assertCIMNameEqual(inst.path, objpath) + for key in self.KEYS: + self.assertCIMNameEqual( + getattr(inst, key), getattr(inst.path, key)) + count += 1 + self.assertEqual(count, 7, "there are 7 files in pkg1") + + @enable_lmi_exceptions + @swbase.test_with_packages('stable#pkg1') + def test_get_instance_invalid(self): + """ + Test ``GetInstance()`` call on ``LMI_SoftwareIdentityChecks`` + on not installed file. """ - for pkg in self.safe_pkgs: - for filepath in self.pkgdb_files[pkg.name]: - objpath = self.make_op(pkg, filepath) - inst = objpath.to_instance() - self.assertCIMNameEqual(inst.path, objpath, - "Object paths should match for package %s"%pkg) - for key in self.KEYS: - self.assertIn(key, inst.properties(), - "OP is missing \"%s\" key for package %s"%(key, pkg)) - if key == "SoftwareElementID": - setattr(inst, key, getattr(inst, key)[:-1]) - self.assertCIMNameEqual( - getattr(inst, key), getattr(inst.path, key)) - self.assertCIMNameEqual(objpath, inst.path, - "Object paths should match for package %s"%pkg) + pkg = self.get_repo('stable')['pkg1'] + objpath = self.make_op(pkg, '/not-installed-file') + self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath.to_instance) @enable_lmi_exceptions def test_enum_instance_names(self): """ - Tests EnumInstances call on identity checks - should not be supported. + Test ``EnumInstanceNames()`` call on ``LMI_SoftwareIdentityChecks``. + Should not be supported. """ self.assertRaisesCIM(pywbem.CIM_ERR_NOT_SUPPORTED, self.cim_class.instance_names) - def test_get_identity_referents(self): + @swbase.test_with_packages('stable#pkg3') + def test_package_file_checks(self): + """ + Try to get file checks associated with package. + """ + pkg = self.get_repo('stable')['pkg3'] + filepath = '/usr/share/openlmi-sw-test-pkg3' + objpath = self.make_op(pkg, filepath) + inst = objpath.Element.to_instance() + self.assertNotEqual(inst, None, + "get instance on %s:%s succeeds" % (pkg, filepath)) + refs = inst.associators( + AssocClass=self.CLASS_NAME, + Role="Element", + ResultRole="Check", + ResultClass="LMI_SoftwareIdentityFileCheck") + self.assertEqual(len(refs), len(pkg)) + self.assertEqual(len(refs), 10, "there are 10 files in %s" % pkg.name) + pkg_names = set(pkg.files) + for ref in refs: + self.assertIn(ref.Name, set(r.Name for r in refs)) + self.assertEqual(sorted(ref.path.key_properties()), + ["CheckID", "Name", "SoftwareElementID", + "SoftwareElementState", + "TargetOperatingSystem", "Version"]) + self.assertEqual(ref.SoftwareElementID, pkg.nevra) + self.assertIn(ref.Name, pkg_names) + self.assertEqual(ref.FailedFlags, []) + pkg_names.remove(ref.Name) + self.assertEqual(len(pkg_names), 0) + + @swbase.test_with_packages('stable#pkg3') + def test_package_file_check_names(self): """ - Test AssociatorNames for Element(SoftwareIdentity). + Try to get file check names associated with package. """ - for pkg in self.safe_pkgs: - files = self.pkgdb_files[pkg.name] - if not files: - # package may not have any file - continue - objpath = self.make_op(pkg, files[0]) - refs = objpath.Element.to_instance().associator_names( - AssocClass=self.CLASS_NAME, - Role="Element", - ResultRole="Check", - ResultClass="LMI_SoftwareIdentityFileCheck") - for ref in refs: - self.assertEqual(ref.namespace, 'root/cimv2') - self.assertEqual(ref.classname, - "LMI_SoftwareIdentityFileCheck") - self.assertIn(ref.Name, set(r.Name for r in refs)) - self.assertEqual(sorted(ref.key_properties()), - ["CheckID", "Name", "SoftwareElementID", - "SoftwareElementState", - "TargetOperatingSystem", "Version"]) - self.assertEqual( - pkg.get_nevra(newer=True, with_epoch='ALWAYS'), - ref.SoftwareElementID) - - @unittest.skip("unrealiable test on random packages") - def test_get_check_referents(self): + pkg = self.get_repo('stable')['pkg3'] + objpath = self.make_op(pkg, '/usr/share/openlmi-sw-test-pkg3') + refs = objpath.Element.to_instance().associator_names( + AssocClass=self.CLASS_NAME, + Role="Element", + ResultRole="Check", + ResultClass="LMI_SoftwareIdentityFileCheck") + self.assertEqual(len(refs), len(pkg)) + self.assertEqual(len(refs), 10, "there are 10 files in %s" % pkg.name) + pkg_names = set(pkg.files) + for ref in refs: + self.assertIn(ref.Name, set(r.Name for r in refs)) + self.assertEqual(sorted(ref.key_properties()), + ["CheckID", "Name", "SoftwareElementID", + "SoftwareElementState", + "TargetOperatingSystem", "Version"]) + self.assertEqual(ref.SoftwareElementID, pkg.nevra) + self.assertIn(ref.Name, pkg_names) + pkg_names.remove(ref.Name) + self.assertEqual(len(pkg_names), 0) + + @swbase.test_with_packages('stable#pkg3') + def test_file_check_package(self): """ - Test AssociatorNames for Check(SoftwareIdentityFileCheck). + Try to get package assocatied to file check. """ - for pkg in self.safe_pkgs: - files = self.pkgdb_files[pkg.name] - for filepath in files: - objpath = self.make_op(pkg, filepath) - refs = objpath.Check.to_instance().associator_names( - AssocClass=self.CLASS_NAME, - Role="Check", - ResultRole="Element", - ResultClass="LMI_SoftwareIdentity") - self.assertEqual(len(refs), 1) - ref = refs[0] - self.assertCIMNameEqual(objpath.Element, ref) + pkg = self.get_repo('stable')['pkg3'] + filepath = '/usr/share/openlmi-sw-test-pkg3/README' + objpath = self.make_op(pkg, filepath) + inst = objpath.Check.to_instance() + self.assertNotEqual(inst, None, + "get instance for %s:%s succeeds" % (pkg, filepath)) + refs = inst.associators( + AssocClass=self.CLASS_NAME, + Role="Check", + ResultRole="Element", + ResultClass="LMI_SoftwareIdentity") + self.assertEqual(len(refs), 1) + ref = refs[0] + self.assertCIMNameEqual(ref.path, objpath.Element) + self.assertEqual(ref.ElementName, pkg.nevra) + self.assertEqual(ref.VersionString, pkg.evra) + + # try with removed file + os.remove(filepath) + refs = objpath.Check.to_instance().associators( + AssocClass=self.CLASS_NAME, + Role="Check", + ResultRole="Element", + ResultClass="LMI_SoftwareIdentity") + self.assertEqual(len(refs), 1) def suite(): """For unittest loaders.""" diff --git a/src/software/test/test_software_identity_file_check.py b/src/software/test/test_software_identity_file_check.py index 485a970..4783076 100644 --- a/src/software/test/test_software_identity_file_check.py +++ b/src/software/test/test_software_identity_file_check.py @@ -19,25 +19,77 @@ # Authors: Michal Minar <miminar@redhat.com> # """ -Unit tests for LMI_SoftwareIdentity provider. +Unit tests for ``LMI_SoftwareIdentityFileCheck`` provider. """ +import shutil import os import pywbem import re +import stat import subprocess import unittest from lmi.test.lmibase import enable_lmi_exceptions -from lmi.test.util import mark_dangerous -import base -import rpmcache +import package +import swbase import util RE_CHECKSUM = re.compile(r'^([0-9a-fA-F]+)\s+.*') -class TestSoftwareIdentityFileCheck( - base.SoftwareBaseTestCase): #pylint: disable=R0904 +SOFTWARE_ELEMENT_STATE_EXECUTABLE = 2 +FILE_TYPE_UNKNOWN = 0 +FILE_TYPE_FILE = 1 +FILE_TYPE_DIRECTORY = 2 +FILE_TYPE_SYMLINK = 3 +FILE_TYPE_FIFO = 4 +FILE_TYPE_CHARDEV = 5 +FILE_TYPE_BLOCKDEV = 6 +FAILED_FLAGS_EXISTENCE = 0 +FAILED_FLAGS_SIZE = 1 +FAILED_FLAGS_MODE = 2 +FAILED_FLAGS_CHECKSUM = 3 +FAILED_FLAGS_DEVNUM = 4 +FAILED_FLAGS_TARGET = 5 +FAILED_FLAGS_UID = 6 +FAILED_FLAGS_GID = 7 +FAILED_FLAGS_MTIME = 8 +FILE_MODE_XOTH = 0 +FILE_MODE_WOTH = 1 +FILE_MODE_ROTH = 2 +FILE_MODE_XGRP = 3 +FILE_MODE_WGRP = 4 +FILE_MODE_RGRP = 5 +FILE_MODE_XUSR = 6 +FILE_MODE_WUSR = 7 +FILE_MODE_RUSR = 8 +FILE_MODE_SVTX = 9 # sticky bit +FILE_MODE_SGID = 10 +FILE_MODE_SUID = 11 +INVOKE_SATISFIED = 0 +INVOKE_NOT_SATISFIED = 2 + +# maps algorithm id to the shell command +HASH_COMMAND = { + 1 : "md5sum", + 2 : "sha1sum", + 8 : "sha256sum", + 9 : "sha384sum", + 10 : "sha512sum", + 11 : "sha224sum" +} + +# maps algorithm id to the length of digest string +HASH_DIGEST_LENGTH = { + 1 : 32, #MD5 + 2 : 40, #SHA1 + 8 : 64, #SHA256 + 9 : 96, #SHA384 + 10 : 128, #SHA512 + 11 : 56 #SHA224 +} + +class TestSoftwareIdentityFileCheck(swbase.SwTestCase): """ Basic cim operations test. """ @@ -45,103 +97,84 @@ class TestSoftwareIdentityFileCheck( CLASS_NAME = "LMI_SoftwareIdentityFileCheck" KEYS = ("CheckID", "Name", "SoftwareElementID", "SoftwareElementState") - # maps algorithm id to the shell command - HASH_COMMAND = { - 1 : "md5sum", - 2 : "sha1sum", - 8 : "sha256sum", - 9 : "sha384sum", - 10 : "sha512sum", - 11 : "sha224sum" - } - - # maps algorithm id to the length of digest string - HASH_DIGEST_LENGTH = { - 1 : 32, #MD5 - 2 : 40, #SHA1 - 8 : 64, #SHA256 - 9 : 96, #SHA384 - 10 : 128, #SHA512 - 11 : 56 #SHA224 - } - - @classmethod - def needs_pkgdb_files(cls): - return True - def make_checksum_str(self, csumnum, filename): """ - @return checksum of installed file + :returns: Checksum of installed file. + :rtype: string """ return RE_CHECKSUM.match(subprocess.check_output([ - self.HASH_COMMAND[csumnum], filename])).group(1).lower() + HASH_COMMAND[csumnum], filename])).group(1).lower() - def make_op(self, pkg, file_name, newer=True): + def make_op(self, pkg, file_name): """ - :returns: object path of `LMI_SoftwareIdentityFileCheck` + :returns: Object path of ``LMI_SoftwareIdentityFileCheck`` + :rtype: :py:class:`lmi.shell.LMIInstanceName` """ return self.cim_class.new_instance_name({ "CheckID" : 'LMI:LMI_SoftwareIdentityFileCheck', "Name" : file_name, - "SoftwareElementID" : pkg.get_nevra( - newer=newer, with_epoch="ALWAYS"), - "SoftwareElementState" : pywbem.Uint16(2), #Executable + "SoftwareElementID" : pkg.nevra, + "SoftwareElementState" : pywbem.Uint16( + SOFTWARE_ELEMENT_STATE_EXECUTABLE), "TargetOperatingSystem" : util.get_target_operating_system(), - "Version" : getattr(pkg, 'up_evra' if newer else 'evra') + "Version" : pkg.evra }) - def do_check_symlink(self, pkg, filepath, inst, safe=False): + def setUp(self): + to_uninstall = set() + for repo in self.repodb.values(): + for pkg in repo.packages: + to_uninstall.add(pkg.name) + to_uninstall = [p for p in to_uninstall if package.is_pkg_installed(p)] + if to_uninstall: + subprocess.call(['/usr/bin/rpm', '--quiet', '-e'] + to_uninstall) + + def do_check_symlink(self, pkg, filepath, inst): """ - Assert some details about symlink. - :param safe: (``bool``) If True, no modification is done to file. + Check symbolink link. """ target = os.readlink(filepath) stats = os.lstat(filepath) - self.assertEqual(pywbem.Uint16(3), inst.FileType, - "Unexpected file type of symlink for %s:%s"%( - pkg.name, filepath)) - self.assertEqual(stats.st_uid, inst.UserID, - "Unexpected uid of symlink for %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_gid, inst.GroupID, - "Unexpected gid of symlink for %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_mode, inst.FileMode, - "Unexpected mode of symlink for %s:%s" %(pkg.name, filepath)) - self.assertEqual(stats.st_size, inst.FileSize, - "Unexpected size of symlink for %s:%s"%(pkg.name, filepath)) - self.assertEqual(target, inst.LinkTarget, - "Unexpected size of symlink for %s:%s"%(pkg.name, filepath)) - self.assertEqual(int(stats.st_mtime), inst.LastModificationTime, - "Unexpected mtime of symlink for %s:%s"%(pkg.name, filepath)) + self.assertEqual(inst.FileType, pywbem.Uint16(FILE_TYPE_SYMLINK), + "Unexpected file type of symlink for %s:%s" + % (pkg.name, filepath)) + self.assertEqual(inst.UserID, stats.st_uid, + "Unexpected uid of symlink for %s:%s" % (pkg.name, filepath)) + self.assertEqual(inst.GroupID, stats.st_gid, + "Unexpected gid of symlink for %s:%s" % (pkg.name, filepath)) + self.assertEqual(inst.FileMode, stats.st_mode, + "Unexpected mode of symlink for %s:%s" % (pkg.name, filepath)) + self.assertEqual(inst.FileSize, stats.st_size, + "Unexpected size of symlink for %s:%s" % (pkg.name, filepath)) + self.assertEqual(inst.LinkTarget, target, + "Unexpected target of symlink for %s:%s" % (pkg.name, filepath)) + self.assertEqual(inst.LastModificationTime, int(stats.st_mtime), + "Unexpected mtime of symlink for %s:%s" % (pkg.name, filepath)) self.assertIsNone(inst.Checksum, - "Checksum should be None for symlink %s:%s"%( - pkg.name, filepath)) + "Checksum should be None for symlink %s:%s" + % (pkg.name, filepath)) self.assertIsNone(inst.FileChecksum, - "FileChecksum should be None for symlink %s:%s"%( - pkg.name, filepath)) + "FileChecksum should be None for symlink %s:%s" + % (pkg.name, filepath)) self.assertIsNone(inst.MD5Checksum, - "MD5Checksum should be None for symlink %s:%s"%( - pkg.name, filepath)) - - if safe: - return + "MD5Checksum should be None for symlink %s:%s" + % (pkg.name, filepath)) # modify owner prev_user = inst.UserID os.lchown(filepath, stats.st_uid + 1, -1) inst.refresh() - self.assertEqual(inst.UserIDOriginal + 1, inst.UserID, + self.assertEqual(inst.UserID, inst.UserIDOriginal + 1, "Unexpected uid of modified symlink for %s:%s"%( pkg.name, filepath)) - self.assertEqual(prev_user + 1, inst.UserID, + self.assertEqual(inst.UserID, prev_user + 1, "Unexpected uid of modified symlink for %s:%s"%( pkg.name, filepath)) - self.assertEqual(stats.st_gid, inst.GroupID, + self.assertEqual(inst.GroupID, stats.st_gid, "Unexpected gid of modified symlink for %s:%s"%( pkg.name, filepath)) - self.assertEqual([ - pywbem.Uint16(7) #UserID - ], inst.FailedFlags) + self.assertEqual(inst.FailedFlags, [FAILED_FLAGS_UID]) # modify link_target os.remove(filepath) @@ -150,95 +183,87 @@ class TestSoftwareIdentityFileCheck( os.lchown(filepath, inst.UserIDOriginal, inst.GroupIDOriginal) inst.refresh() - self.assertEqual(set([ - pywbem.Uint16(1), #FileSize - pywbem.Uint16(6), #LinkTarget - ]), set(inst.FailedFlags)) - self.assertGreater(len(inst.LinkTarget), - len(inst.LinkTargetOriginal)) + self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_TARGET])) + self.assertGreater(len(inst.LinkTarget), len(inst.LinkTargetOriginal)) self.assertTrue(inst.FileExists, "File %s:%s should exist"%(pkg.name, filepath)) - self.assertEqual(inst.FileTypeOriginal, inst.FileType, + self.assertEqual(inst.FileType, inst.FileTypeOriginal, "File type should match for symlink %s:%s"%(pkg.name, filepath)) self.assertNotEqual(inst.FileSizeOriginal, inst.FileSize, "File size should not match for symlink %s:%s"%( pkg.name, filepath)) - self.assertEqual(inst.FileModeOriginal, inst.FileMode, + self.assertEqual(inst.FileMode, inst.FileModeOriginal, "File mode should match for symlink %s:%s"%(pkg.name, filepath)) - self.assertEqual(lt_modif, inst.LinkTarget, + self.assertEqual(inst.LinkTarget, lt_modif, "Link target should match modified path %s:%s"%( pkg.name, filepath)) - self.assertNotEqual(inst.LinkTargetOriginal, inst.ListTarget, + self.assertNotEqual(inst.LinkTargetOriginal, inst.LinkTarget, "Link target should not match for symlink %s:%s"%( pkg.name, filepath)) - self.assertEqual(inst.UserIDOriginal, inst.UserID, + self.assertEqual(inst.UserID, inst.UserIDOriginal, "File uid should match for symlink %s:%s"%(pkg.name, filepath)) - self.assertEqual(inst.GroupIDOriginal, inst.GroupID, + self.assertEqual(inst.GroupID, inst.GroupIDOriginal, "File gid should match for symlink %s:%s"%(pkg.name, filepath)) def do_check_directory(self, pkg, filepath, inst): """ - Assert some details about directory. + Check directory. """ stats = os.lstat(filepath) - self.assertEqual(pywbem.Uint16(2), inst.FileType, + self.assertEqual(inst.FileType, pywbem.Uint16(FILE_TYPE_DIRECTORY), "Unexpected type for directory %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_uid, inst.UserID, + self.assertEqual(inst.UserID, stats.st_uid, "Unexpected uid for directory %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_gid, inst.GroupID, + self.assertEqual(inst.GroupID, stats.st_gid, "Unexpected gid for directory %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_mode, inst.FileMode, + self.assertEqual(inst.FileMode, stats.st_mode, "Unexpected mode for directory %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_size, inst.FileSize, + self.assertEqual(inst.FileSize, stats.st_size, "Unexpected size for directory %s:%s"%(pkg.name, filepath)) self.assertIs(inst.LinkTarget, None) self.assertIsNone(inst.FileChecksum, "Unexpected checksum for directory %s:%s"%(pkg.name, filepath)) - self.assertEqual(int(stats.st_mtime), inst.LastModificationTime, + self.assertEqual(inst.LastModificationTime, int(stats.st_mtime), "Unexpected mtime for directory %s:%s"%(pkg.name, filepath)) - self.assertEqual([], inst.FailedFlags) + self.assertEqual(inst.FailedFlags, []) - def do_check_file(self, pkg, filepath, inst, safe=False): + def do_check_file(self, pkg, filepath, inst): """ - Assert some details about regular file. - :param safe: (``bool``) If True, no modification is done to file. + Check regular file. """ stats = os.lstat(filepath) - self.assertEqual(pywbem.Uint16(1), inst.FileType, + self.assertEqual(inst.FileType, pywbem.Uint16(FILE_TYPE_FILE), "Unexpected file type for %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_uid, inst.UserID, + self.assertEqual(inst.UserID, stats.st_uid, "Unexpected file uid for %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_gid, inst.GroupID, + self.assertEqual(inst.GroupID, stats.st_gid, "Unexpected gid for regular file %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_mode, inst.FileMode, + self.assertEqual(inst.FileMode, stats.st_mode, "Unexpected mode for regular file %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_size, inst.FileSize, + self.assertEqual(inst.FileSize, stats.st_size, "Unexpected size for regular file %s:%s"%(pkg.name, filepath)) self.assertIs(inst.LinkTarget, None) csum = self.make_checksum_str(inst.ChecksumType, filepath) - self.assertEqual(csum, inst.FileChecksum.lower(), + self.assertEqual(inst.FileChecksum.lower(), csum, "Unexpected checksum for regular file %s:%s"%(pkg.name, filepath)) self.assertEqual(inst.LastModificationTime, inst.LastModificationTimeOriginal, "Unexpected mtime for regular file %s:%s"%(pkg.name, filepath)) - self.assertEqual(int(stats.st_mtime), inst.LastModificationTime, + self.assertEqual(inst.LastModificationTime, int(stats.st_mtime), "Unexpected mtime for regular file %s:%s"%(pkg.name, filepath)) - if safe: - return - # make it longer with open(filepath, "a+") as fobj: fobj.write("data\n") inst.refresh() - self.assertEqual(set([ - pywbem.Uint16(1), #FileSize - pywbem.Uint16(3), #Checksum - pywbem.Uint16(9), #LastModificationTime - ]), set(inst.FailedFlags)) + self.assertEqual(set(inst.FailedFlags), set([ + FAILED_FLAGS_SIZE, + FAILED_FLAGS_CHECKSUM, + FAILED_FLAGS_MTIME, + ])) self.assertGreater(inst.FileSize, inst.FileSizeOriginal, "File size should be greater, then expected for regular file" @@ -266,15 +291,14 @@ class TestSoftwareIdentityFileCheck( pkg.name, filepath)) self.assertNotEqual(inst.FileTypeOriginal, inst.FileType, "File type should not match for %s:%s"%(pkg.name, filepath)) - self.assertEqual(pywbem.Uint16(3), inst.FileType, + self.assertEqual(inst.FileType, pywbem.Uint16(FILE_TYPE_SYMLINK), "File type should match for %s:%s"%(pkg.name, filepath)) - self.assertIn(pywbem.Uint16(2), #FileMode - inst.FailedFlags) + self.assertIn(pywbem.Uint16(FAILED_FLAGS_MODE), inst.FailedFlags) # remove it os.remove(filepath) inst.refresh() - self.assertEqual(inst.LinkTargetOriginal, inst.LinkTarget, + self.assertEqual(inst.LinkTarget, inst.LinkTargetOriginal, "Link target does not match for regular file %s:%s"%( pkg.name, filepath)) self.assertNotEqual(inst.FileSizeOriginal, inst.FileSize, @@ -286,20 +310,58 @@ class TestSoftwareIdentityFileCheck( self.assertIsNone(inst.FileMode) self.assertIsNone(inst.UserID) self.assertIsNone(inst.GroupID) - self.assertEqual([ - pywbem.Uint16(0), #exists - ], inst.FileExists) + self.assertFalse(inst.FileExists) + self.assertEqual(inst.FailedFlags, [FAILED_FLAGS_EXISTENCE]) - def check_filepath(self, pkg, filepath, safe=False): + def do_check_dev(self, pkg, filepath, inst): + """ + Check device file. + """ + stats = os.lstat(filepath) + + is_block = stat.S_ISBLK(stats.st_mode) + self.assertEqual(inst.FileType, pywbem.Uint16( + FILE_TYPE_BLOCKDEV if is_block else FILE_TYPE_CHARDEV), + "Unexpected file type of dev file for %s:%s" + % (pkg.name, filepath)) + self.assertEqual(inst.UserID, stats.st_uid, + "Unexpected uid of dev file for %s:%s" % (pkg.name, filepath)) + self.assertEqual(inst.GroupID, stats.st_gid, + "Unexpected gid of dev file for %s:%s" % (pkg.name, filepath)) + self.assertEqual(inst.FileMode, stats.st_mode, + "Unexpected mode of dev file for %s:%s" % (pkg.name, filepath)) + self.assertEqual(inst.FileSize, stats.st_size, + "Unexpected size of dev file for %s:%s" % (pkg.name, filepath)) + self.assertEqual(inst.LinkTarget, None) + self.assertEqual(inst.LastModificationTime, int(stats.st_mtime), + "Unexpected mtime of dev file for %s:%s" % (pkg.name, filepath)) + self.assertIsNone(inst.Checksum, + "Checksum should be None for dev file %s:%s" + % (pkg.name, filepath)) + self.assertIsNone(inst.FileChecksum, + "FileChecksum should be None for dev file %s:%s" + % (pkg.name, filepath)) + self.assertIsNone(inst.MD5Checksum, + "MD5Checksum should be None for dev file %s:%s" + % (pkg.name, filepath)) + + os.remove(filepath) + subprocess.call(['/usr/bin/mknod', filepath, 'b' if is_block else 'c', + str(os.major(stats.st_rdev) + 1), str(os.minor(stats.st_rdev) + 1)]) + self.assertTrue(os.path.exists(filepath)) + inst.refresh() + self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_DEVNUM])) + self.assertEqual(inst.FileMode, stats.st_mode) + + def check_filepath(self, pkg, filepath): """ Make a check of particular file of package. All files are expected to have no flaw. - - :param safe: (``bool``) says, whether modifications to file can be done """ objpath = self.make_op(pkg, filepath) inst = objpath.to_instance() - self.assertCIMNameEqual(objpath, inst.path, + self.assertNotEqual(inst, None) + self.assertCIMNameEqual(inst.path, objpath, msg="Object paths of instance must match for %s:%s"%( pkg.name, filepath)) for key in self.KEYS: @@ -312,8 +374,9 @@ class TestSoftwareIdentityFileCheck( self.assertTrue(inst.FileExists, "File %s:%s must exist"%(pkg.name, filepath)) - self.assertEqual(0, len(inst.FailedFlags), - "FailedFlags must be empty") + self.assertEqual(len(inst.FailedFlags), 0, + "FailedFlags must be empty for %s:%s, not: %s" % ( + str(pkg), filepath, inst.FailedFlags)) for prop in ( "FileType", "UserID", "GroupID" , "FileMode", "FileSize", "LinkTarget" @@ -327,91 +390,435 @@ class TestSoftwareIdentityFileCheck( getattr(inst, prop+"Original"), getattr(inst, prop), "%s should match for %s:%s"%(prop, pkg.name, filepath)) if os.path.islink(filepath): - self.do_check_symlink(pkg, filepath, inst, safe=safe) + self.do_check_symlink(pkg, filepath, inst) elif os.path.isdir(filepath): self.do_check_directory(pkg, filepath, inst) elif os.path.isfile(filepath): - self.do_check_file(pkg, filepath, inst, safe=safe) + self.do_check_file(pkg, filepath, inst) - @unittest.skip("not reliable test for random packages") - def test_get_instance_safe(self): + @swbase.test_with_packages('stable#pkg1') + def test_get_directory(self): """ - Tests GetInstance call. + Test ``GetInstance()`` call on ``LMI_SoftwareIdentityFileCheck`` with + directory. """ - for pkg in self.safe_pkgs: - files = self.pkgdb_files[pkg.name] - for filepath in files: - self.check_filepath(pkg, filepath, safe=True) + pkg = self.get_repo('stable')['pkg1'] + filepath = '/usr/share/openlmi-sw-test-pkg1' + objpath = self.make_op(pkg, filepath) + inst = objpath.to_instance() + self.assertEqual(inst.FileType, FILE_TYPE_DIRECTORY) + self.check_filepath(pkg, filepath) + + @swbase.test_with_packages('stable#pkg1') + def test_check_config(self): + """ + Test ``GetInstance()`` call on ``LMI_SoftwareIdentityFileCheck`` with + config file.. + """ + pkg = self.get_repo('stable')['pkg1'] + filepath = '/etc/openlmi/software/test/dummy_config.cfg' + objpath = self.make_op(pkg, filepath) + inst = objpath.to_instance() + self.assertEqual(inst.FileType, FILE_TYPE_FILE) + self.check_filepath(pkg, filepath) + + @enable_lmi_exceptions + @swbase.test_with_packages('stable#pkg1', 'stable#pkg2') + def test_get_instance_invalid(self): + """ + Test ``GetInstance()`` call on ``LMI_SoftwareIdentityFileCheck`` with + file not belonging to particular package. + """ + stable = self.get_repo('stable') + pkg1 = stable['pkg1'] + pkg2 = stable['pkg2'] + fp1 = '/etc/openlmi/software/test/dummy_config.cfg' + fp2 = '/usr/share/openlmi-sw-test-pkg2' + objpath1 = self.make_op(pkg1, fp2) + self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath1.to_instance) + objpath2 = self.make_op(pkg2, fp1) + self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath2.to_instance) + objpath1.wrapped_object["Name"] = fp1 + self.assertNotEqual(objpath1.to_instance(), None) + objpath2.wrapped_object["Name"] = fp2 + self.assertNotEqual(objpath2.to_instance(), None) + + @swbase.test_with_packages('stable#pkg2') + def test_check_symlink(self): + """ + Test ``GetInstance()`` call on ``LMI_SoftwareIdentityFileCheck`` with + symbolic link. + """ + pkg = self.get_repo('stable')['pkg2'] + filepath = '/usr/share/openlmi-sw-test-pkg2/symlinks/absolute' + objpath = self.make_op(pkg, filepath) + inst = objpath.to_instance() + self.assertEqual(inst.FileType, FILE_TYPE_SYMLINK) + self.check_filepath(pkg, filepath) + + @swbase.test_with_packages('stable#pkg3') + def test_check_chardev(self): + """ + Test ``GetInstance()`` call on ``LMI_SoftwareIdentityFileCheck`` with + block and character device files. + """ + pkg = self.get_repo('stable')['pkg3'] + filepath = '/usr/share/openlmi-sw-test-pkg3/devs/char12' + objpath = self.make_op(pkg, filepath) + inst = objpath.to_instance() + self.assertNotEqual(inst, None) + self.assertEqual(inst.FileType, FILE_TYPE_CHARDEV) + self.check_filepath(pkg, filepath) + self.do_check_dev(pkg, filepath, inst) + + @swbase.test_with_packages('stable#pkg3') + def test_check_blockdev(self): + """ + Test ``GetInstance()`` call on ``LMI_SoftwareIdentityFileCheck`` with + block and character device files. + """ + pkg = self.get_repo('stable')['pkg3'] + filepath = '/usr/share/openlmi-sw-test-pkg3/devs/block56' + objpath = self.make_op(pkg, filepath) + inst = objpath.to_instance() + self.assertNotEqual(inst, None) + self.assertEqual(inst.FileType, FILE_TYPE_BLOCKDEV) + self.check_filepath(pkg, filepath) + self.do_check_dev(pkg, filepath, inst) + + @swbase.test_with_packages('stable#pkg4') + def test_permissions(self): + """ + Test ``FileMode`` property of ``LMI_SoftwareIdentityFileCheck``. + """ + pkg = self.get_repo('stable')['pkg4'] + permsdir = '/usr/share/openlmi-sw-test-pkg4/perms' + objpath = self.make_op(pkg, permsdir) + inst = objpath.to_instance() + self.assertNotEqual(inst, None) + permsdir += '/' + self.assertEqual(inst.FileMode, stat.S_IFDIR | stat.S_ISVTX | 0777) + self.assertEqual(set(inst.FileModeFlags), set(range(10))) + + objpath.wrapped_object["Name"] = permsdir + 'rwxrwxrwx' + inst = objpath.to_instance() + self.assertEqual(inst.FileMode, stat.S_IFREG | 0777) + self.assertEqual(set(inst.FileModeFlags), set(range(9))) + + objpath.wrapped_object["Name"] = permsdir + 'rw-rw-rw-' + inst = objpath.to_instance() + self.assertEqual(inst.FileMode, stat.S_IFREG | 0666) + self.assertEqual(set(inst.FileModeFlags), set([ + FILE_MODE_ROTH, FILE_MODE_RGRP, FILE_MODE_RUSR, + FILE_MODE_WOTH, FILE_MODE_WGRP, FILE_MODE_WUSR])) + + objpath.wrapped_object["Name"] = permsdir + 'r--r--r--' + inst = objpath.to_instance() + self.assertEqual(inst.FileMode, stat.S_IFREG | 0444) + self.assertEqual(set(inst.FileModeFlags), set([ + FILE_MODE_ROTH, FILE_MODE_RGRP, FILE_MODE_RUSR])) + + objpath.wrapped_object["Name"] = permsdir + 'r-xr-xr-x' + inst = objpath.to_instance() + self.assertEqual(inst.FileMode, stat.S_IFREG | 0555) + self.assertEqual(set(inst.FileModeFlags), set([ + FILE_MODE_ROTH, FILE_MODE_RGRP, FILE_MODE_RUSR, + FILE_MODE_XOTH, FILE_MODE_XGRP, FILE_MODE_XUSR])) + + objpath.wrapped_object["Name"] = permsdir + 'r---w---x' + inst = objpath.to_instance() + self.assertEqual(inst.FileMode, stat.S_IFREG | 0421) + self.assertEqual(set(inst.FileModeFlags), set([ + FILE_MODE_RUSR, FILE_MODE_WGRP, FILE_MODE_XOTH])) + + objpath.wrapped_object["Name"] = permsdir + 'rwSr--r--' + inst = objpath.to_instance() + self.assertEqual(inst.FileMode, stat.S_IFREG | 04644) + self.assertEqual(set(inst.FileModeFlags), set([ + FILE_MODE_RUSR, FILE_MODE_WUSR, + FILE_MODE_RGRP, FILE_MODE_ROTH, FILE_MODE_SUID])) + + objpath.wrapped_object["Name"] = permsdir + 'rw-r-Sr--' + inst = objpath.to_instance() + self.assertEqual(inst.FileMode, stat.S_IFREG | 02644) + self.assertEqual(set(inst.FileModeFlags), set([ + FILE_MODE_RUSR, FILE_MODE_WUSR, + FILE_MODE_RGRP, FILE_MODE_ROTH, FILE_MODE_SGID])) + + objpath.wrapped_object["Name"] = permsdir + 'rw-r--r-T' + inst = objpath.to_instance() + self.assertEqual(inst.FileMode, stat.S_IFREG | 01644) + self.assertEqual(set(inst.FileModeFlags), set([ + FILE_MODE_RUSR, FILE_MODE_WUSR, + FILE_MODE_RGRP, FILE_MODE_ROTH, FILE_MODE_SVTX])) + + objpath.wrapped_object["Name"] = permsdir + 'rwsr-sr-t' + inst = objpath.to_instance() + self.assertEqual(inst.FileMode, stat.S_IFREG | 07755) + self.assertEqual(set(inst.FileModeFlags), set([ + FILE_MODE_ROTH, FILE_MODE_RGRP, FILE_MODE_RUSR, + FILE_MODE_XOTH, FILE_MODE_XGRP, FILE_MODE_XUSR, + FILE_MODE_WUSR, FILE_MODE_SUID, FILE_MODE_SGID, FILE_MODE_SVTX])) @enable_lmi_exceptions def test_enum_instance_names(self): """ - Tests EnumInstanceNames - which should not be supported. + Test ``EnumInstanceNames`` call on ``LMI_SoftwareIdentityFileCheck`` + that should not be supported. """ self.assertRaisesCIM(pywbem.CIM_ERR_NOT_SUPPORTED, self.cim_class.instance_names) - @enable_lmi_exceptions - @unittest.skip("not reliable test for random packages") - def test_invoke_method_safe(self): - """ - Test Invoke method in a safe manner. - """ - for pkg in self.safe_pkgs: - files = self.pkgdb_files[pkg.name] - for filepath in files: - objpath = self.make_op(pkg, filepath) - inst = objpath.to_instance() - - (rval, _, _) = inst.Invoke() - self.assertEqual(pywbem.Uint32(0), #Satisfied - rval, - msg="Invoke method should be successful for %s:%s"%( - pkg.name, filepath)) - - (rval, _, _) = inst.InvokeOnSystem( - TargetSystem=self.system_iname) - - self.assertEqual(pywbem.Uint32(0), #Satisfied - rval, msg="InvokeOnSystem method should be successful" - " for %s:%s"%(pkg.name, filepath)) - - system_iname = self.system_iname.wrapped_object - system_iname["CreationClassName"] = "Bad class name" - system_iname["Name"] = "some random name" - self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, - inst.InvokeOnSystem, TargetSystem=system_iname) - - @mark_dangerous - def test_invoke_method(self): - """ - Tests Invoke method invocation. - """ - for pkg in self.dangerous_pkgs: - if ( rpmcache.is_pkg_installed(pkg.name) - and not util.verify_pkg(pkg.name)): - rpmcache.remove_pkg(pkg.name) - if not rpmcache.is_pkg_installed(pkg.name): - rpmcache.install_pkg(pkg) - for filepath in self.pkgdb_files[pkg.name]: - objpath = self.make_op(pkg, filepath) - inst = objpath.to_instance() - - (rval, _) = inst.Invoke() - self.assertEqual(pywbem.Uint32(0), #Satisfied - rval, "Invoke method should be successful for %s:%s" - % (pkg.name, filepath)) - - # modify file - if os.path.isfile(filepath): - os.remove(filepath) - else: - os.lchown(filepath, os.stat(filepath).st_uid + 1, -1) - (rval, _) = inst.Invoke() - self.assertEqual(pywbem.Uint32(2), #Not Satisfied - rval, - "Invoke method should not pass for modified file %s:%s" - % (pkg.name, filepath)) + @swbase.test_with_packages('stable#pkg1') + def test_method_invoke_on_file(self): + """ + Test ``Invoke`` method of ``LMI_SoftwareIdentityFileCheck`` on regular + file. + """ + pkg = self.get_repo('stable')['pkg1'] + fp = '/usr/share/openlmi-sw-test-pkg1/README' + objpath = self.make_op(pkg, fp) + inst = objpath.to_instance() + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_SATISFIED, + "Invoke method should be successful for %s:%s" % (pkg, fp)) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_SATISFIED, + "InvokeOnSystem method should be successful for %s:%s" + % (pkg.name, fp)) + + stats = os.stat(fp) + backup = fp + '.bak' + shutil.copy(fp, backup) + + # change mode + os.chmod(fp, 0600) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + inst.refresh() + self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_MODE])) + + # restore it + shutil.copy(backup, fp) + os.utime(fp, (stats.st_atime, stats.st_mtime)) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_SATISFIED) + + # modify contents + with open(fp, 'a') as fobj: + fobj.write('new line\n') + os.utime(fp, (stats.st_atime, stats.st_mtime)) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + inst.refresh() + self.assertEqual(set(inst.FailedFlags), + set([FAILED_FLAGS_SIZE, FAILED_FLAGS_CHECKSUM])) + + # restore it + shutil.copy(backup, fp) + os.utime(fp, (stats.st_atime, stats.st_mtime)) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_SATISFIED) + + # touch it + os.utime(fp, None) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + inst.refresh() + self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_MTIME])) + + @swbase.test_with_packages('stable#pkg2') + def test_method_invoke_on_symlink(self): + """ + Test ``Invoke`` method of ``LMI_SoftwareIdentityFileCheck`` on symbolic + link. + """ + pkg = self.get_repo('stable')['pkg2'] + fp = '/usr/share/openlmi-sw-test-pkg2/symlinks/relative' + objpath = self.make_op(pkg, fp) + inst = objpath.to_instance() + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_SATISFIED, + "Invoke method should be successful for %s:%s" % (pkg, fp)) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_SATISFIED, + "InvokeOnSystem method should be successful for %s:%s" + % (pkg.name, fp)) + stats = os.stat(fp) + backup = fp + '.bak' + shutil.copy(fp, backup) + + # touch it (mtime) + os.utime(fp, None) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_SATISFIED) + + # change owner + os.lchown(fp, 1, -1) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + inst.refresh() + self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_UID])) + + # remove symlink + os.remove(fp) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + inst.refresh() + self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_EXISTENCE])) + + # restore it (mtime) + os.symlink("../data/target.txt", fp) + os.utime(fp, (stats.st_atime, stats.st_mtime)) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_SATISFIED) + + # point to different target + os.remove(fp) + os.symlink("../data/target.xtx", fp) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + inst.refresh() + self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_TARGET])) + + @swbase.test_with_packages('stable#pkg3') + def test_method_invoke_on_fifo(self): + """ + Test ``Invoke`` method of ``LMI_SoftwareIdentityFileCheck`` on fifo. + """ + pkg = self.get_repo('stable')['pkg3'] + fp = '/usr/share/openlmi-sw-test-pkg3/fifos/pipe' + objpath = self.make_op(pkg, fp) + inst = objpath.to_instance() + self.assertEqual(inst.FileType, FILE_TYPE_FIFO) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_SATISFIED, + "Invoke method should be successful for %s:%s" % (pkg, fp)) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_SATISFIED, + "InvokeOnSystem method should be successful for %s:%s" + % (pkg.name, fp)) + stats = os.stat(fp) + + # touch it (mtime) + os.utime(fp, None) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_SATISFIED) + + # change owner + os.chown(fp, 1, -1) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + inst.refresh() + self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_UID])) + + # remove fifo + os.remove(fp) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + inst.refresh() + self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_EXISTENCE])) + + # restore it + os.mkfifo(fp, 0644) + os.utime(fp, (stats.st_atime, stats.st_mtime)) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_SATISFIED) + + # change mode + os.chmod(fp, 0400) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + inst.refresh() + self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_MODE])) + + # replace it with regular file + os.remove(fp) + with open(fp, 'w') as fobj: + fobj.write('dummy file') + os.utime(fp, (stats.st_atime, stats.st_mtime)) + inst = objpath.to_instance() + self.assertNotEqual(inst, None) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + inst.refresh() + self.assertEqual(inst.FileType, FILE_TYPE_FILE) + self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_MODE])) + + @swbase.test_with_packages('stable#pkg4') + def test_method_invoke_on_directory(self): + """ + Test ``Invoke`` method of ``LMI_SoftwareIdentityFileCheck`` + on directory. + """ + pkg = self.get_repo('stable')['pkg4'] + fp = '/usr/share/openlmi-sw-test-pkg4/perms' + objpath = self.make_op(pkg, fp) + inst = objpath.to_instance() + self.assertEqual(inst.FileType, FILE_TYPE_DIRECTORY) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_SATISFIED, + "Invoke method should be successful for %s:%s" % (pkg, fp)) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_SATISFIED, + "InvokeOnSystem method should be successful for %s:%s" + % (pkg.name, fp)) + + # touch it (mtime) + os.utime(fp, None) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_SATISFIED) + + # remove child + os.remove(os.path.join(fp, 'rwxrwxrwx')) + os.utime(fp, None) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_SATISFIED) + + # chmod + os.chmod(fp, 0755) + (rval, _, _) = inst.Invoke() + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname) + self.assertEqual(rval, INVOKE_NOT_SATISFIED) + inst.refresh() + self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_MODE])) def suite(): """For unittest loaders.""" diff --git a/src/software/test/test_software_identity_resource.py b/src/software/test/test_software_identity_resource.py index e6b5f66..8c18f62 100644 --- a/src/software/test/test_software_identity_resource.py +++ b/src/software/test/test_software_identity_resource.py @@ -19,42 +19,46 @@ # Authors: Michal Minar <miminar@redhat.com> # """ -Unit tests for LMI_SoftwareIdentityResource provider. +Unit tests for ``LMI_SoftwareIdentityResource`` provider. """ import pywbem import time import unittest -from lmi.software.core.IdentityResource import Values -from lmi.test.lmibase import enable_lmi_exceptions -from lmi.test.util import mark_dangerous -import base +import swbase import repository -import util -class TestSoftwareIdentityResource( - base.SoftwareBaseTestCase): #pylint: disable=R0904 +ENABLED_DEFAULT_NOT_APPLICABLE = 5 +ENABLED_STATE_DISABLED = 3 +ENABLED_STATE_ENABLED = 2 +EXTENDED_RESOURCE_TYPE_RPM = 3 +HEALTH_STATE_MAJOR_FAILURE = 20 +HEALTH_STATE_OK = 5 +INFO_FORMAT_URL = 200 +OPERATIONAL_STATUS_ERROR = 6 +OPERATIONAL_STATUS_OK = 2 +REQUESTED_STATE_ENABLED = 2 +REQUESTED_STATE_DISABLED = 3 +REQUEST_STATE_CHANGE_SUCCESSFUL = 0 +RESOURCE_TYPE_OTHER = 1 +PRIMARY_STATUS_ERROR = 3 +PRIMARY_STATUS_OK = 1 +TRANSITIONING_TO_STATE_NOT_APPLICABLE = 12 + +class TestSoftwareIdentityResource(swbase.SwTestCase): """ - Basic cim operations test. + Basic cim operations test on ``LMI_SoftwareIdentityResource``. """ CLASS_NAME = "LMI_SoftwareIdentityResource" KEYS = ("CreationClassName", "Name", "SystemCreationClassName", "SystemName") - @classmethod - def needs_pkgdb(cls): - return False - - @classmethod - def needs_repodb(cls): - return True - def make_op(self, repo): """ - @param ses SoftwareElementState property value - @return object path of SoftwareIdentityResource + :returns: Object path of ``LMI_SoftwareIdentityResource`` + :rtype: :py:class:`lmi.shell.LMIInstanceName` """ if isinstance(repo, repository.Repository): repo = repo.repoid @@ -65,11 +69,6 @@ class TestSoftwareIdentityResource( "CreationClassName" : self.CLASS_NAME }) - def tearDown(self): - for repo in self.repodb: - if repository.is_repo_enabled(repo) != repo.status: - repository.set_repo_enabled(repo, repo.status) - def _check_repo_instance(self, repo, inst): """ Checks instance properties of repository. @@ -78,9 +77,9 @@ class TestSoftwareIdentityResource( self.assertCIMNameEqual(objpath, inst.path) for key in self.KEYS: self.assertEqual(getattr(inst, key), getattr(inst.path, key)) - self.assertEqual(repo.repoid, inst.Name) - self.assertIsInstance(inst.AccessContext, pywbem.Uint16) + self.assertEqual(inst.Name, repo.repoid) + self.assertIsInstance(inst.AccessContext, pywbem.Uint16) access_info = inst.AccessInfo if access_info is not None: access_info = access_info.rstrip('/') @@ -99,157 +98,293 @@ class TestSoftwareIdentityResource( "AccessInfo missing in base_urls for repo %s" % repo.repoid) self.assertIsInstance(inst.AvailableRequestedStates, list) - self.assertEqual(repo.name, inst.Caption) + self.assertEqual(inst.Caption, repo.name) self.assertIsInstance(inst.Cost, pywbem.Sint32) self.assertIsInstance(inst.Description, basestring) - self.assertEqual(repo.repoid, inst.ElementName) - self.assertEqual(5, inst.EnabledDefault) - self.assertEqual(2 if repo.status else 3, inst.EnabledState, + self.assertEqual(inst.ElementName, repo.repoid) + self.assertEqual(inst.EnabledDefault, ENABLED_DEFAULT_NOT_APPLICABLE) + self.assertEqual( + ENABLED_STATE_ENABLED + if repo.status else ENABLED_STATE_DISABLED, + inst.EnabledState, "EnabledState does not match for repo %s" % repo.repoid) - self.assertEqual(3, inst.ExtendedResourceType) - if repo.revision is None: - self.assertIsNone(inst.Generation) - else: - self.assertTrue(isinstance(inst.Generation, (int, long))) + self.assertEqual(inst.ExtendedResourceType, EXTENDED_RESOURCE_TYPE_RPM) + if repo.revision is not None: + self.assertTrue(isinstance(inst.Generation, (int, long)), + 'Generation must be integer for repo %s' % repo.repoid) if repo.status: - self.assertEqual(5, inst.HealthState) # OK - self.assertEqual([2], inst.OperationalStatus) # OK - self.assertEqual(1, inst.PrimaryStatus) # OK + self.assertEqual(inst.HealthState, HEALTH_STATE_OK) + self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK]) + self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK) else: # repo may not be enabled, but still it can be ready # (which is not documented), so let's check both posibilities - self.assertIn(inst.HealthState, [5, 20]) # OK, Major Failure - self.assertIn(inst.OperationalStatus, [[2], [6]]) # [OK], [Error] - self.assertIn(inst.PrimaryStatus, [1, 3]) # OK, Error + # TODO: health state should be OK for disabled repository + self.assertIn(inst.HealthState, + [HEALTH_STATE_OK, HEALTH_STATE_MAJOR_FAILURE]) + self.assertIn(inst.OperationalStatus, + [[OPERATIONAL_STATUS_OK], [OPERATIONAL_STATUS_ERROR]]) + self.assertIn(inst.PrimaryStatus, [PRIMARY_STATUS_OK, PRIMARY_STATUS_ERROR]) self.assertIsInstance(inst.GPGCheck, bool) - self.assertEqual(200, inst.InfoFormat) + self.assertEqual(inst.InfoFormat, INFO_FORMAT_URL) self.assertEqual("LMI:LMI_SoftwareIdentityResource:"+repo.repoid, inst.InstanceID) - if repo.mirror_list is None: - self.assertIsNone(inst.MirrorList) - else: - self.assertEqual( - repo.metalink if repo.metalink else repo.mirror_list, - inst.MirrorList) + self.assertEqual(inst.MirrorList, repo.mirror_list, + 'MirrorList must match for repo %s' % repo.repoid) self.assertIsInstance(inst.OtherAccessContext, basestring) - #self.assertEqual(repo.pkg_count, inst.PackageCount, - #"PackageCount does not match for repo %s" % repo.repoid) self.assertIsInstance(inst.RepoGPGCheck, bool) - self.assertEqual(2 if repo.status else 3, inst.RequestedState) - self.assertEqual(1, inst.ResourceType) + self.assertEqual( + ENABLED_STATE_ENABLED + if repo.status else ENABLED_STATE_DISABLED, + inst.RequestedState) + self.assertEqual(inst.ResourceType, RESOURCE_TYPE_OTHER) self.assertIsInstance(inst.StatusDescriptions, list) self.assertEqual(1, len(inst.StatusDescriptions)) - if repo.last_updated is None: - self.assertIsNone(inst.TimeOfLastUpdate) - else: + if repo.last_updated is not None: time_stamp = repo.last_updated.replace( microsecond=0, tzinfo=pywbem.cim_types.MinutesFromUTC(0)) self.assertGreaterEqual(inst.TimeOfLastUpdate.datetime, time_stamp) - self.assertEqual(12, inst.TransitioningToState) + self.assertEqual(TRANSITIONING_TO_STATE_NOT_APPLICABLE, + inst.TransitioningToState) - def test_get_instance_safe(self): - """ - Tests GetInstance call. - """ - for repo in self.repodb: - objpath = self.make_op(repo) - inst = objpath.to_instance() - self._check_repo_instance(repo, inst) - - @mark_dangerous + @swbase.test_with_repos(stable=True, updates=False) def test_get_instance(self): """ - Dangerous test, making sure, that properties are set correctly - for enabled and disabled repositories. + Test ``GetInstance()`` call on ``LMI_SoftwareIdentityResource``. """ - for repo in self.repodb: - objpath = self.make_op(repo) - self.assertIs(repository.is_repo_enabled(repo), repo.status) - inst = objpath.to_instance() - self.assertCIMNameEqual(objpath, inst.path) - for key in self.KEYS: - self.assertEqual(getattr(inst, key), getattr(inst.path, key)) - self.assertEqual(repo.repoid, inst.Name) - self.assertEqual(2 if repo.status else 3, inst.EnabledState) - self.assertEqual(2 if repo.status else 3, inst.RequestedState) - for repo in self.repodb: - objpath = self.make_op(repo) - time.sleep(1) # to make sure, that change will be noticed - repository.set_repo_enabled(repo, not repo.status) - self.assertIs(repository.is_repo_enabled(repo), not repo.status) - inst = objpath.to_instance() - self.assertEqual(repo.repoid, inst.Name) - self.assertEqual(3 if repo.status else 2, inst.EnabledState) - self.assertEqual(3 if repo.status else 2, inst.RequestedState) + stablerepo = self.get_repo('stable') + self.assertTrue(stablerepo.status, "stable repository is enabled") + objpath = self.make_op(stablerepo) + inst = objpath.to_instance() + self.assertNotEqual(inst, None, + "GetInstance() call on stable repository is successful" + ' with path: "%s"' % str(objpath)) + self._check_repo_instance(stablerepo, inst) + self.assertEqual(set(self.KEYS), set(inst.path.key_properties())) + + updaterepo = self.get_repo('updates') + objpath = self.make_op(updaterepo) + self.assertFalse(updaterepo.status, "updates repository is disabled") + inst = objpath.to_instance() + self.assertNotEqual(inst, None, + 'GetInstance() call on updates repository is successful' + ' with path "%s"' % str(objpath)) + self._check_repo_instance(updaterepo, inst) + self.assertEqual(set(self.KEYS), set(inst.path.key_properties())) def test_enum_instance_names(self): """ - Tests EnumInstanceNames call on all repositories. + Test ``EnumInstanceNames()`` call on ``LMI_SoftwareIdentityResource``. """ + all_repo_names = set(self.repodb.keys()).union( + set(self.other_repos.keys())) inames = self.cim_class.instance_names() - self.assertEqual(len(inames), len(self.repodb)) - repoids = set(r.repoid for r in self.repodb) for iname in inames: - self.assertEqual(iname.namespace, 'root/cimv2') self.assertEqual(set(iname.key_properties()), set(self.KEYS)) - self.assertIn(iname.Name, repoids) - objpath = self.make_op(iname.Name) - self.assertCIMNameEqual(objpath, iname) + self.assertIn(iname.Name, all_repo_names) + all_repo_names.remove(iname.Name) + self.assertEqual(iname.SystemCreationClassName, self.system_cs_name) + self.assertEqual(iname.CreationClassName, self.CLASS_NAME) + self.assertEqual(len(all_repo_names), 0) def test_enum_instances(self): """ - Tests EnumInstances call on all repositories. + Test ``EnumInstances()`` call on ``LMI_SoftwareIdentityResource``. """ + all_repo_names = set(self.repodb.keys()).union( + set(self.other_repos.keys())) insts = self.cim_class.instances() - self.assertGreater(len(insts), 0) - repoids = dict((r.repoid, r) for r in self.repodb) for inst in insts: - self.assertIn(inst.Name, repoids) - self._check_repo_instance(repoids[inst.Name], inst) + self.assertEqual(set(inst.path.key_properties()), set(self.KEYS)) + self.assertIn(inst.Name, all_repo_names) + all_repo_names.remove(inst.Name) + if inst.Name in self.repodb: + repo = self.repodb[inst.Name] + else: + repo = self.other_repos[inst.Name] + self._check_repo_instance(repo, inst) + self.assertEqual(len(all_repo_names), 0) - @enable_lmi_exceptions - @mark_dangerous - def test_request_state_change(self): + @swbase.test_with_repos('updates') + def test_disable_repo(self): """ - Tests InvokeMethod on RequestStateChange(). + Test whether ``LMI_SoftwareIdentityResource`` provider recognizes + that repository has been disabled. """ - for repo in self.repodb: - objpath = self.make_op(repo) - self.assertIs(repository.is_repo_enabled(repo), repo.status) + repo = self.get_repo('updates') + self.assertTrue(repo.status) + objpath = self.make_op(repo) + inst = objpath.to_instance() + self.assertNotEqual(inst, None, + 'GetInstance() call on updates repository is successful' + ' with path "%s"' % str(objpath)) + self.assertEqual(inst.EnabledState, ENABLED_STATE_ENABLED) + self.assertEqual(inst.HealthState, HEALTH_STATE_OK) + self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK]) + self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK) - # change state of repository (enabled/disabled) - req_state = ( Values.RequestStateChange.RequestedState.Disabled - if repo.status else - Values.RequestStateChange.RequestedState.Enabled) - inst = objpath.to_instance() - (res, oparms, _) = inst.RequestStateChange(RequestedState=req_state) - self.assertEqual(0, res) #Completed with no error - self.assertEqual(0, len(oparms)) - inst.refresh() - self.assertEqual(inst.EnabledState, req_state) - for pkg in self.dangerous_pkgs: - if pkg.up_repo == repo: - pkg_iname = util.make_identity_path(pkg) - self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, - objpath.to_instance) + repository.set_repo_enabled(repo, False) + repo.refresh() + self.assertFalse(repo.status) + time.sleep(0.1) + inst.refresh() + self.assertNotEqual(inst, None, + 'GetInstance() call on updates repository is successful' + ' with path "%s"' % str(objpath)) + self.assertEqual(inst.EnabledState, ENABLED_STATE_DISABLED) + self.assertEqual(inst.HealthState, HEALTH_STATE_OK) + self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK]) + self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK) + + + @swbase.test_with_repos(updates=False) + def test_enable_repo(self): + """ + Test whether ``LMI_SoftwareIdentityResource`` provider recognizes that + repository has been enabled. + """ + repo = self.get_repo('updates') + self.assertFalse(repo.status) + objpath = self.make_op(repo) + inst = objpath.to_instance() + self.assertNotEqual(inst, None, + 'GetInstance() call on updates repository is successful' + ' with path "%s"' % str(objpath)) + self.assertEqual(inst.EnabledState, ENABLED_STATE_DISABLED) + self.assertEqual(inst.HealthState, HEALTH_STATE_OK) + self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK]) + self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK) + + repository.set_repo_enabled(repo, True) + repo.refresh() + self.assertTrue(repo.status) + inst.refresh() + self.assertNotEqual(inst, None, + 'GetInstance() call on updates repository is successful' + ' with path "%s"' % str(objpath)) + self.assertEqual(inst.EnabledState, ENABLED_STATE_ENABLED) + self.assertEqual(inst.HealthState, HEALTH_STATE_OK) + self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK]) + self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK) + + @swbase.test_with_repos('stable') + def test_disable_enabled_repo(self): + """ + Test ``RequestStateChange()`` method invocation on + ``LMI_SoftwareIdentityResource`` by requesting disablement of + repository. + """ + repo = self.get_repo('stable') + self.assertTrue(repo.status) + objpath = self.make_op(repo) + inst = objpath.to_instance() + self.assertNotEqual(inst, None, + 'GetInstance() call on stable repository is successful' + ' with path "%s"' % str(objpath)) + self.assertEqual(inst.EnabledState, ENABLED_STATE_ENABLED) + + # try to disable it with method invocation + (rval, oparms, _) = inst.RequestStateChange( + RequestedState=REQUESTED_STATE_DISABLED) + self.assertEqual(rval, REQUEST_STATE_CHANGE_SUCCESSFUL) + self.assertEqual(len(oparms), 0) + repo.refresh() + self.assertFalse(repo.status) + inst.refresh() + self.assertEqual(inst.EnabledState, ENABLED_STATE_DISABLED) + self.assertEqual(inst.HealthState, HEALTH_STATE_OK) + self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK]) + self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK) + + @swbase.test_with_repos(stable=False) + def test_enable_disabled_repo(self): + """ + Test ``RequestStateChange()`` method invocation on + ``LMI_SoftwareIdentityResource`` by requesting enablement of + repository. + """ + repo = self.get_repo('stable') + self.assertFalse(repo.status) + objpath = self.make_op(repo) + inst = objpath.to_instance() + self.assertNotEqual(inst, None, + 'GetInstance() call on stable repository is successful' + ' with path "%s"' % str(objpath)) + self.assertEqual(inst.EnabledState, ENABLED_STATE_DISABLED) + + # try to enable it with method invocation + (rval, oparms, _) = inst.RequestStateChange( + RequestedState=REQUESTED_STATE_ENABLED) + self.assertEqual(rval, REQUEST_STATE_CHANGE_SUCCESSFUL) + self.assertEqual(len(oparms), 0) + repo.refresh() + self.assertTrue(repo.status) + inst.refresh() + self.assertEqual(inst.EnabledState, ENABLED_STATE_ENABLED) + self.assertEqual(inst.HealthState, HEALTH_STATE_OK) + self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK]) + self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK) + + @swbase.test_with_repos('stable') + def test_enable_enabled_repo(self): + """ + Test ``RequestStateChange()`` method invocation on + ``LMI_SoftwareIdentityResource`` by requesting enablement of + enabled repository. + """ + repo = self.get_repo('stable') + self.assertTrue(repo.status) + objpath = self.make_op(repo) + inst = objpath.to_instance() + self.assertNotEqual(inst, None, + 'GetInstance() call on stable repository is successful' + ' with path "%s"' % str(objpath)) + self.assertEqual(inst.EnabledState, ENABLED_STATE_ENABLED) + + # try to enable it with method invocation + (rval, oparms, _) = inst.RequestStateChange( + RequestedState=REQUESTED_STATE_ENABLED) + self.assertEqual(rval, REQUEST_STATE_CHANGE_SUCCESSFUL) + self.assertEqual(len(oparms), 0) + repo.refresh() + self.assertTrue(repo.status) + inst.refresh() + self.assertEqual(inst.EnabledState, ENABLED_STATE_ENABLED) + self.assertEqual(inst.HealthState, HEALTH_STATE_OK) + self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK]) + self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK) + + @swbase.test_with_repos(stable=False) + def test_disable_disabled_repo(self): + """ + Test ``RequestStateChange()`` method invocation on + ``LMI_SoftwareIdentityResource`` by requesting disablement of + disabled repository. + """ + repo = self.get_repo('stable') + self.assertFalse(repo.status) + objpath = self.make_op(repo) + inst = objpath.to_instance() + self.assertNotEqual(inst, None, + 'GetInstance() call on stable repository is successful' + ' with path "%s"' % str(objpath)) + self.assertEqual(inst.EnabledState, ENABLED_STATE_DISABLED) - # revert back to original configuration - req_state = ( Values.RequestStateChange.RequestedState.Enabled - if repo.status else - Values.RequestStateChange.RequestedState.Disabled) - (res, oparms, _) = inst.RequestStateChange(RequestedState=req_state) - self.assertEqual(0, res) #Completed with no error - self.assertEqual(0, len(oparms)) - inst.refresh() - self.assertEqual(inst.EnabledState, req_state) - for pkg in self.dangerous_pkgs: - if pkg.up_repo == repo: - pkg_iname = util.make_identity_path(pkg) - inst = pkg_iname.to_instance() - self.assertNotEqual(inst, None) + # try to enable it with method invocation + (rval, oparms, _) = inst.RequestStateChange( + RequestedState=REQUESTED_STATE_DISABLED) + self.assertEqual(rval, REQUEST_STATE_CHANGE_SUCCESSFUL) + self.assertEqual(len(oparms), 0) + repo.refresh() + self.assertFalse(repo.status) + inst.refresh() + self.assertEqual(inst.EnabledState, ENABLED_STATE_DISABLED) + self.assertEqual(inst.HealthState, HEALTH_STATE_OK) + self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK]) + self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK) def suite(): - """For unittest loaders.""" + """ For unittest loaders. """ return unittest.TestLoader().loadTestsFromTestCase( TestSoftwareIdentityResource) diff --git a/src/software/test/test_system_software_collection.py b/src/software/test/test_system_software_collection.py index ae3236a..bcd184e 100644 --- a/src/software/test/test_system_software_collection.py +++ b/src/software/test/test_system_software_collection.py @@ -19,17 +19,16 @@ # Authors: Michal Minar <miminar@redhat.com> # """ -Unit tests for LMI_SoftwareIdentity provider. +Unit tests for ``LMI_SystemSoftwareCollection`` provider. """ import unittest -import base +import swbase -class TestSystemSoftwareCollection( - base.SoftwareBaseTestCase): #pylint: disable=R0904 +class TestSystemSoftwareCollection(swbase.SwTestCase): """ - Basic cim operations test. + Basic cim operations test on ``LMI_SystemSoftwareCollection``. """ CLASS_NAME = "LMI_SystemSoftwareCollection" @@ -37,8 +36,8 @@ class TestSystemSoftwareCollection( def make_op(self): """ - @param ses SoftwareElementState property value - @return object path of SoftwareIdentity + :returns Object path of ``LMI_SystemSoftwareCollection``. + :rtype: :py:class:`lmi.shell.LMIInstanceName` """ return self.cim_class.new_instance_name({ "InstanceID" : "LMI:LMI_SystemSoftwareCollection" @@ -46,7 +45,7 @@ class TestSystemSoftwareCollection( def test_get_instance(self): """ - Tests GetInstance call on packages from our rpm cache. + Test ``GetInstance()`` call on ``LMI_SystemSoftwareCollection``. """ objpath = self.make_op() inst = objpath.to_instance() @@ -58,7 +57,7 @@ class TestSystemSoftwareCollection( def test_enum_instance_names(self): """ - Tests EnumInstanceNames call on installed packages. + Test ``EnumInstanceNames()`` call on ``LMI_SystemSoftwareCollection``. """ inames = self.cim_class.instance_names() self.assertEqual(len(inames), 1) @@ -67,7 +66,7 @@ class TestSystemSoftwareCollection( def test_enum_instances(self): """ - Tests EnumInstances call on installed packages. + Test ``EnumInstances()`` call on ``LMI_SystemSoftwareCollection``. """ insts = self.cim_class.instances() self.assertEqual(len(insts), 1) diff --git a/src/software/test/util.py b/src/software/test/util.py index c0cb4f0..3c63be0 100644 --- a/src/software/test/util.py +++ b/src/software/test/util.py @@ -49,17 +49,17 @@ def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'): """ estr = '' if with_epoch.lower() == "always": - estr = epoch + estr = str(epoch) elif with_epoch.lower() == "not_zero": - if epoch and epoch.lower() not in {"0", "(none)"}: - estr = epoch + if epoch and str(epoch).lower() not in {"0", "(none)"}: + estr = str(epoch) if len(estr): estr += ":" return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch) def make_evra(epoch, ver, rel, arch): """ @return evra string """ - if not epoch or epoch.lower() == "(none)": + if not epoch or str(epoch).lower() == "(none)": epoch = "0" return "%s:%s-%s.%s" % (epoch, ver, rel, arch) |